Libosmium  2.20.0
Fast and flexible C++ library for working with OpenStreetMap data
pool.hpp
Go to the documentation of this file.
1#ifndef OSMIUM_THREAD_POOL_HPP
2#define OSMIUM_THREAD_POOL_HPP
3
4/*
5
6This file is part of Osmium (https://osmcode.org/libosmium).
7
8Copyright 2013-2023 Jochen Topf <jochen@topf.org> and others (see README).
9
10Boost Software License - Version 1.0 - August 17th, 2003
11
12Permission is hereby granted, free of charge, to any person or organization
13obtaining a copy of the software and accompanying documentation covered by
14this license (the "Software") to use, reproduce, display, distribute,
15execute, and transmit the Software, and to prepare derivative works of the
16Software, and to permit third-parties to whom the Software is furnished to
17do so, all subject to the following:
18
19The copyright notices in the Software and this entire statement, including
20the above license grant, this restriction and the following disclaimer,
21must be included in all copies of the Software, in whole or in part, and
22all derivative works of the Software, unless such copies or derivative
23works are solely in the form of machine-executable object code generated by
24a source language processor.
25
26THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32DEALINGS IN THE SOFTWARE.
33
34*/
35
40
41#include <cstddef>
42#include <future>
43#include <thread>
44#include <type_traits>
45#include <utility>
46#include <vector>
47
48namespace osmium {
49
53 namespace thread {
54
55 namespace detail {
56
57 // Maximum number of allowed pool threads (just to keep the user
58 // from setting something silly).
59 enum {
60 max_pool_threads = 32
61 };
62
63 inline int get_pool_size(int num_threads, int user_setting, unsigned hardware_concurrency) {
64 if (num_threads == 0) {
65 num_threads = user_setting ? user_setting : -2;
66 }
67
68 if (num_threads < 0) {
69 num_threads += static_cast<int>(hardware_concurrency);
70 }
71
72 if (num_threads < 1) {
73 num_threads = 1;
74 } else if (num_threads > max_pool_threads) {
75 num_threads = max_pool_threads;
76 }
77
78 return num_threads;
79 }
80
81 inline std::size_t get_work_queue_size() noexcept {
82 return osmium::config::get_max_queue_size("WORK", 10);
83 }
84
85 } // namespace detail
86
90 class Pool {
91
97
98 std::vector<std::thread>& m_threads;
99
100 public:
101
102 explicit thread_joiner(std::vector<std::thread>& threads) :
103 m_threads(threads) {
104 }
105
106 thread_joiner(const thread_joiner&) = delete;
108
111
113 for (auto& thread : m_threads) {
114 if (thread.joinable()) {
115 thread.join();
116 }
117 }
118 }
119
120 }; // class thread_joiner
121
123 std::vector<std::thread> m_threads{};
126
128 osmium::thread::set_thread_name("_osmium_worker");
129 while (true) {
130 function_wrapper task;
131 m_work_queue.wait_and_pop(task);
132 if (task && task()) {
133 // The called tasks returns true only when the
134 // worker thread should shut down.
135 return;
136 }
137 }
138 }
139
140 public:
141
142 enum {
144 };
145
146 enum {
148 };
149
165 explicit Pool(int num_threads = default_num_threads, std::size_t max_queue_size = default_queue_size) :
166 m_work_queue(max_queue_size > 0 ? max_queue_size : detail::get_work_queue_size(), "work"),
168 m_num_threads(detail::get_pool_size(num_threads, osmium::config::get_pool_threads(), std::thread::hardware_concurrency())) {
169
170 try {
171 for (int i = 0; i < m_num_threads; ++i) {
172 m_threads.emplace_back(&Pool::worker_thread, this);
173 }
174 } catch (...) {
176 throw;
177 }
178 }
179
187 static Pool pool{};
188 return pool;
189 }
190
192 for (int i = 0; i < m_num_threads; ++i) {
193 // The special function wrapper makes a worker shut down.
195 }
196 }
197
198 Pool(const Pool&) = delete;
199 Pool& operator=(const Pool&) = delete;
200
201 Pool(Pool&&) = delete;
202 Pool& operator=(Pool&&) = delete;
203
206 }
207
208 int num_threads() const noexcept {
209 return m_num_threads;
210 }
211
212 std::size_t queue_size() const {
213 return m_work_queue.size();
214 }
215
216 bool queue_empty() const {
217 return m_work_queue.empty();
218 }
219
220#if defined(__cpp_lib_is_invocable) && __cpp_lib_is_invocable >= 201703
221 // std::result_of is deprecated in C++17 and removed in C++20,
222 // so we use std::invoke_result_t.
223 template <typename TFunction>
224 using submit_func_result_type = std::invoke_result_t<TFunction>;
225#else
226 // For C++11 and C++14
227 template <typename TFunction>
228 using submit_func_result_type = typename std::result_of<TFunction()>::type;
229#endif
230
231 template <typename TFunction>
232 std::future<submit_func_result_type<TFunction>> submit(TFunction&& func) {
233 std::packaged_task<submit_func_result_type<TFunction>()> task{std::forward<TFunction>(func)};
234 std::future<submit_func_result_type<TFunction>> future_result{task.get_future()};
235 m_work_queue.push(std::move(task));
236
237 return future_result;
238 }
239
240 }; // class Pool
241
242 } // namespace thread
243
244} // namespace osmium
245
246#endif // OSMIUM_THREAD_POOL_HPP
thread_joiner(std::vector< std::thread > &threads)
Definition: pool.hpp:102
thread_joiner(thread_joiner &&)=delete
std::vector< std::thread > & m_threads
Definition: pool.hpp:98
thread_joiner(const thread_joiner &)=delete
thread_joiner & operator=(thread_joiner &&)=delete
thread_joiner & operator=(const thread_joiner &)=delete
~thread_joiner()
Definition: pool.hpp:112
Definition: pool.hpp:90
void shutdown_all_workers()
Definition: pool.hpp:191
std::vector< std::thread > m_threads
Definition: pool.hpp:123
@ default_queue_size
Definition: pool.hpp:147
~Pool()
Definition: pool.hpp:204
Pool(int num_threads=default_num_threads, std::size_t max_queue_size=default_queue_size)
Definition: pool.hpp:165
Pool(Pool &&)=delete
std::size_t queue_size() const
Definition: pool.hpp:212
typename std::result_of< TFunction()>::type submit_func_result_type
Definition: pool.hpp:228
std::future< submit_func_result_type< TFunction > > submit(TFunction &&func)
Definition: pool.hpp:232
static Pool & default_instance()
Definition: pool.hpp:186
Pool(const Pool &)=delete
@ default_num_threads
Definition: pool.hpp:143
Pool & operator=(Pool &&)=delete
osmium::thread::Queue< function_wrapper > m_work_queue
Definition: pool.hpp:122
thread_joiner m_joiner
Definition: pool.hpp:124
bool queue_empty() const
Definition: pool.hpp:216
void worker_thread()
Definition: pool.hpp:127
int m_num_threads
Definition: pool.hpp:125
Pool & operator=(const Pool &)=delete
int num_threads() const noexcept
Definition: pool.hpp:208
Definition: queue.hpp:57
Definition: function_wrapper.hpp:48
Definition: attr.hpp:342
int get_pool_threads() noexcept
Definition: config.hpp:62
std::size_t get_max_queue_size(const char *queue_name, const std::size_t default_value) noexcept
Definition: config.hpp:83
type
Definition: entity_bits.hpp:63
void set_thread_name(const char *name) noexcept
Definition: util.hpp:78
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:555