Libosmium  2.20.0
Fast and flexible C++ library for working with OpenStreetMap data
queue.hpp
Go to the documentation of this file.
1#ifndef OSMIUM_THREAD_QUEUE_HPP
2#define OSMIUM_THREAD_QUEUE_HPP
3
4/*
5
6This file is part of Osmium (https://osmcode.org/libosmium).
7
8Copyright 2013-2023 Jochen Topf <jochen@topf.org> and others (see README).
9
10Boost Software License - Version 1.0 - August 17th, 2003
11
12Permission is hereby granted, free of charge, to any person or organization
13obtaining a copy of the software and accompanying documentation covered by
14this license (the "Software") to use, reproduce, display, distribute,
15execute, and transmit the Software, and to prepare derivative works of the
16Software, and to permit third-parties to whom the Software is furnished to
17do so, all subject to the following:
18
19The copyright notices in the Software and this entire statement, including
20the above license grant, this restriction and the following disclaimer,
21must be included in all copies of the Software, in whole or in part, and
22all derivative works of the Software, unless such copies or derivative
23works are solely in the form of machine-executable object code generated by
24a source language processor.
25
26THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32DEALINGS IN THE SOFTWARE.
33
34*/
35
36#include <atomic>
37#include <chrono>
38#include <condition_variable>
39#include <cstddef>
40#include <mutex>
41#include <queue>
42#include <string>
43#include <utility> // IWYU pragma: keep
44
45#ifdef OSMIUM_DEBUG_QUEUE_SIZE
46# include <iostream>
47#endif
48
49namespace osmium {
50
51 namespace thread {
52
56 template <typename T>
57 class Queue {
58
61 const std::size_t m_max_size;
62
64 const std::string m_name;
65
66 mutable std::mutex m_mutex;
67
68 std::queue<T> m_queue;
69
71 std::condition_variable m_data_available;
72
74 std::condition_variable m_space_available;
75
76 std::atomic<bool> m_in_use{true};
77
78#ifdef OSMIUM_DEBUG_QUEUE_SIZE
80 std::size_t m_largest_size;
81
83 std::atomic<int> m_push_counter;
84
87 std::atomic<int> m_full_counter;
88
93 std::atomic<int> m_pop_counter;
94
97 std::atomic<int> m_empty_counter;
98#endif
99
100 public:
101
109 explicit Queue(std::size_t max_size = 0, std::string name = "") :
110 m_max_size(max_size),
111 m_name(std::move(name)),
112 m_queue()
113#ifdef OSMIUM_DEBUG_QUEUE_SIZE
114 ,
115 m_largest_size(0),
116 m_push_counter(0),
117 m_full_counter(0),
118 m_pop_counter(0),
119 m_empty_counter(0)
120#endif
121 {
122 }
123
124 Queue(const Queue&) = delete;
125 Queue& operator=(const Queue&) = delete;
126
127 Queue(Queue&&) = delete;
128 Queue& operator=(Queue&&) = delete;
129
130#ifdef OSMIUM_DEBUG_QUEUE_SIZE
131 ~Queue() {
132 std::cerr << "queue '" << m_name
133 << "' with max_size=" << m_max_size
134 << " had largest size " << m_largest_size
135 << " and was full " << m_full_counter
136 << " times in " << m_push_counter
137 << " push() calls and was empty " << m_empty_counter
138 << " times in " << m_pop_counter
139 << " pop() calls\n";
140 }
141#else
142 ~Queue() = default;
143#endif
144
149 void push(T value) {
150 if (!m_in_use) {
151 return;
152 }
153 constexpr const std::chrono::milliseconds max_wait{10};
154#ifdef OSMIUM_DEBUG_QUEUE_SIZE
155 ++m_push_counter;
156#endif
157 if (m_max_size) {
158 while (size() >= m_max_size) {
159 std::unique_lock<std::mutex> lock{m_mutex};
160 m_space_available.wait_for(lock, max_wait, [this] {
161 return m_queue.size() < m_max_size;
162 });
163#ifdef OSMIUM_DEBUG_QUEUE_SIZE
164 ++m_full_counter;
165#endif
166 }
167 }
168 const std::lock_guard<std::mutex> lock{m_mutex};
169 m_queue.push(std::move(value));
170#ifdef OSMIUM_DEBUG_QUEUE_SIZE
171 if (m_largest_size < m_queue.size()) {
172 m_largest_size = m_queue.size();
173 }
174#endif
175 m_data_available.notify_one();
176 }
177
178 void wait_and_pop(T& value) {
179#ifdef OSMIUM_DEBUG_QUEUE_SIZE
180 ++m_pop_counter;
181#endif
182 std::unique_lock<std::mutex> lock{m_mutex};
183#ifdef OSMIUM_DEBUG_QUEUE_SIZE
184 if (m_queue.empty()) {
185 ++m_empty_counter;
186 }
187#endif
188 m_data_available.wait(lock, [this] {
189 return !m_in_use || !m_queue.empty();
190 });
191 if (!m_queue.empty()) {
192 value = std::move(m_queue.front());
193 m_queue.pop();
194 lock.unlock();
195 if (m_max_size) {
196 m_space_available.notify_one();
197 }
198 }
199 }
200
201 bool try_pop(T& value) {
202#ifdef OSMIUM_DEBUG_QUEUE_SIZE
203 ++m_pop_counter;
204#endif
205 {
206 const std::lock_guard<std::mutex> lock{m_mutex};
207 if (m_queue.empty()) {
208#ifdef OSMIUM_DEBUG_QUEUE_SIZE
209 ++m_empty_counter;
210#endif
211 return false;
212 }
213 value = std::move(m_queue.front());
214 m_queue.pop();
215 }
216 if (m_max_size) {
217 m_space_available.notify_one();
218 }
219 return true;
220 }
221
222 bool empty() const {
223 const std::lock_guard<std::mutex> lock{m_mutex};
224 return m_queue.empty();
225 }
226
227 std::size_t size() const {
228 const std::lock_guard<std::mutex> lock{m_mutex};
229 return m_queue.size();
230 }
231
232 bool in_use() const noexcept {
233 return m_in_use;
234 }
235
236 void shutdown() {
237 m_in_use = false;
238 const std::lock_guard<std::mutex> lock{m_mutex};
239 while (!m_queue.empty()) {
240 m_queue.pop();
241 }
242 m_data_available.notify_all();
243 }
244
245 }; // class Queue
246
247 } // namespace thread
248
249} // namespace osmium
250
251#endif // OSMIUM_THREAD_QUEUE_HPP
Definition: queue.hpp:57
bool try_pop(T &value)
Definition: queue.hpp:201
std::mutex m_mutex
Definition: queue.hpp:66
Queue & operator=(const Queue &)=delete
Queue & operator=(Queue &&)=delete
bool empty() const
Definition: queue.hpp:222
bool in_use() const noexcept
Definition: queue.hpp:232
void push(T value)
Definition: queue.hpp:149
void wait_and_pop(T &value)
Definition: queue.hpp:178
std::atomic< bool > m_in_use
Definition: queue.hpp:76
Queue(const Queue &)=delete
std::condition_variable m_space_available
Used to signal producers when queue is not full.
Definition: queue.hpp:74
Queue(std::size_t max_size=0, std::string name="")
Definition: queue.hpp:109
std::size_t size() const
Definition: queue.hpp:227
std::queue< T > m_queue
Definition: queue.hpp:68
Queue(Queue &&)=delete
const std::size_t m_max_size
Definition: queue.hpp:61
std::condition_variable m_data_available
Used to signal consumers when data is available in the queue.
Definition: queue.hpp:71
void shutdown()
Definition: queue.hpp:236
const std::string m_name
Name of this queue (for debugging only).
Definition: queue.hpp:64
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:555