Libosmium  2.20.0
Fast and flexible C++ library for working with OpenStreetMap data
writer.hpp
Go to the documentation of this file.
1#ifndef OSMIUM_IO_WRITER_HPP
2#define OSMIUM_IO_WRITER_HPP
3
4/*
5
6This file is part of Osmium (https://osmcode.org/libosmium).
7
8Copyright 2013-2023 Jochen Topf <jochen@topf.org> and others (see README).
9
10Boost Software License - Version 1.0 - August 17th, 2003
11
12Permission is hereby granted, free of charge, to any person or organization
13obtaining a copy of the software and accompanying documentation covered by
14this license (the "Software") to use, reproduce, display, distribute,
15execute, and transmit the Software, and to prepare derivative works of the
16Software, and to permit third-parties to whom the Software is furnished to
17do so, all subject to the following:
18
19The copyright notices in the Software and this entire statement, including
20the above license grant, this restriction and the following disclaimer,
21must be included in all copies of the Software, in whole or in part, and
22all derivative works of the Software, unless such copies or derivative
23works are solely in the form of machine-executable object code generated by
24a source language processor.
25
26THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32DEALINGS IN THE SOFTWARE.
33
34*/
35
37#include <osmium/io/detail/output_format.hpp>
38#include <osmium/io/detail/queue_util.hpp>
39#include <osmium/io/detail/read_write.hpp>
40#include <osmium/io/detail/write_thread.hpp>
41#include <osmium/io/error.hpp>
42#include <osmium/io/file.hpp>
43#include <osmium/io/header.hpp>
49#include <osmium/version.hpp>
50
51#include <cassert>
52#include <cstddef>
53#include <exception>
54#include <functional>
55#include <future>
56#include <initializer_list>
57#include <memory>
58#include <string>
59#include <utility>
60
61namespace osmium {
62
63 namespace memory {
64 class Item;
65 } //namespace memory
66
67 namespace io {
68
69 namespace detail {
70
71 inline std::size_t get_output_queue_size() noexcept {
72 return osmium::config::get_max_queue_size("OUTPUT", 20);
73 }
74
75 } // namespace detail
76
100 class Writer {
101
102 enum {
103 default_buffer_size = 10UL * 1024UL * 1024UL
104 };
105
107
108 detail::future_string_queue_type m_output_queue{detail::get_output_queue_size(), "raw_output"};
109
110 std::unique_ptr<osmium::io::detail::OutputFormat> m_output{nullptr};
111
112 osmium::memory::Buffer m_buffer{};
113
115
117
118 std::future<std::size_t> m_write_future{};
119
121
122 // Checking the m_write_future is much more expensive then checking
123 // one atomic bool, so we set this bool in the write_thread when
124 // the writer should check the future...
125 std::atomic_bool m_notification{false};
126
127 enum class status {
128 okay = 0, // normal writing
129 error = 1, // some error occurred while writing
130 closed = 2 // close() called successfully
132
133 // Has the header already bin written to the file?
134 bool m_header_written = false;
135
136 // This function will run in a separate thread.
137 static void write_thread(detail::future_string_queue_type& output_queue,
138 std::unique_ptr<osmium::io::Compressor>&& compressor,
139 std::promise<std::size_t>&& write_promise,
140 std::atomic_bool* notification) {
141 detail::WriteThread write_thread{output_queue,
142 std::move(compressor),
143 std::move(write_promise),
144 notification};
145 write_thread();
146 }
147
149 if (m_header.get("generator").empty()) {
150 m_header.set("generator", "libosmium/" LIBOSMIUM_VERSION_STRING);
151 }
152
153 m_output->write_header(m_header);
154
155 m_header_written = true;
156 }
157
158 void do_write(osmium::memory::Buffer&& buffer) {
159 if (!m_header_written) {
160 write_header();
161 }
162 if (buffer && buffer.committed() > 0) {
163 m_output->write_buffer(std::move(buffer));
164 }
165 }
166
167 void do_flush() {
168 if (!m_header_written) {
169 write_header();
170 }
171 if (m_notification) {
173 }
174 if (m_buffer && m_buffer.committed() > 0) {
175 osmium::memory::Buffer buffer{m_buffer_size,
176 osmium::memory::Buffer::auto_grow::no};
177 using std::swap;
178 swap(m_buffer, buffer);
179
180 m_output->write_buffer(std::move(buffer));
181 }
182 }
183
184 template <typename TFunction, typename... TArgs>
185 void ensure_cleanup(TFunction func, TArgs&&... args) {
186 if (m_status != status::okay) {
187 throw io_error("Can not write to writer when in status 'closed' or 'error'");
188 }
189
190 try {
191 func(std::forward<TArgs>(args)...);
192 } catch (...) {
194 detail::add_to_queue(m_output_queue, std::current_exception());
195 detail::add_end_of_data_to_queue(m_output_queue);
196 throw;
197 }
198 }
199
205 };
206
207 static void set_option(options_type& options, osmium::thread::Pool& pool) {
208 options.pool = &pool;
209 }
210
211 static void set_option(options_type& options, const osmium::io::Header& header) {
212 options.header = header;
213 }
214
215 static void set_option(options_type& options, overwrite value) {
216 options.allow_overwrite = value;
217 }
218
219 static void set_option(options_type& options, fsync value) {
220 options.sync = value;
221 }
222
223 void do_close() {
224 if (m_status == status::okay) {
225 ensure_cleanup([&]() {
226 do_write(std::move(m_buffer));
227 m_output->write_end();
229 detail::add_end_of_data_to_queue(m_output_queue);
230 });
231 }
232 }
233
234 public:
235
266 template <typename... TArgs>
267 explicit Writer(const osmium::io::File& file, TArgs&&... args) :
268 m_file(file.check()) {
269 assert(!m_file.buffer()); // XXX can't handle pseudo-files
270
271 options_type options;
272 (void)std::initializer_list<int>{(set_option(options, args), 0)...};
273
274 if (!options.pool) {
276 }
277
278 m_header = options.header;
279
280 m_output = osmium::io::detail::OutputFormatFactory::instance().create_output(*options.pool, m_file, m_output_queue);
281
282 std::unique_ptr<osmium::io::Compressor> compressor =
284 osmium::io::detail::open_for_writing(m_file.filename(), options.allow_overwrite),
285 options.sync);
286
287 std::promise<std::size_t> write_promise;
288 m_write_future = write_promise.get_future();
289 m_thread = osmium::thread::thread_handler{write_thread, std::ref(m_output_queue), std::move(compressor), std::move(write_promise), &m_notification};
290 }
291
292 template <typename... TArgs>
293 explicit Writer(const std::string& filename, TArgs&&... args) :
294 Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) {
295 }
296
297 template <typename... TArgs>
298 explicit Writer(const char* filename, TArgs&&... args) :
299 Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) {
300 }
301
302 Writer(const Writer&) = delete;
303 Writer& operator=(const Writer&) = delete;
304
305 Writer(Writer&&) = delete;
306 Writer& operator=(Writer&&) = delete;
307
308 ~Writer() noexcept {
309 try {
310 do_close();
311 } catch (...) {
312 // Ignore any exceptions because destructor must not throw.
313 }
314 }
315
319 size_t buffer_size() const noexcept {
320 return m_buffer_size;
321 }
322
327 void set_buffer_size(size_t size) noexcept {
328 m_buffer_size = size;
329 }
330
337 void set_header(const osmium::io::Header& header) {
338 m_header = header;
339 }
340
348 void flush() {
349 ensure_cleanup([&]() {
350 do_flush();
351 });
352 }
353
362 void operator()(osmium::memory::Buffer&& buffer) {
363 ensure_cleanup([&]() {
364 do_flush();
365 do_write(std::move(buffer));
366 });
367 }
368
377 ensure_cleanup([&]() {
378 if (!m_buffer) {
379 m_buffer = osmium::memory::Buffer{m_buffer_size,
380 osmium::memory::Buffer::auto_grow::no};
381 }
382 try {
383 m_buffer.push_back(item);
384 } catch (const osmium::buffer_is_full&) {
385 do_flush();
386 m_buffer.push_back(item);
387 }
388 });
389 }
390
402 std::size_t close() {
403 do_close();
404
405 if (m_write_future.valid()) {
406 return m_write_future.get();
407 }
408
409 return 0;
410 }
411
412 }; // class Writer
413
414 } // namespace io
415
416} // namespace osmium
417
418#endif // OSMIUM_IO_WRITER_HPP
std::unique_ptr< osmium::io::Compressor > create_compressor(const osmium::io::file_compression compression, TArgs &&... args) const
Definition: compression.hpp:211
static CompressionFactory & instance()
Definition: compression.hpp:191
Definition: file.hpp:72
File & filename(const std::string &filename)
Definition: file.hpp:312
file_compression compression() const noexcept
Definition: file.hpp:294
const char * buffer() const noexcept
Definition: file.hpp:143
Definition: header.hpp:68
Definition: writer.hpp:100
std::size_t close()
Definition: writer.hpp:402
@ default_buffer_size
Definition: writer.hpp:103
void write_header()
Definition: writer.hpp:148
static void set_option(options_type &options, fsync value)
Definition: writer.hpp:219
static void set_option(options_type &options, const osmium::io::Header &header)
Definition: writer.hpp:211
static void set_option(options_type &options, osmium::thread::Pool &pool)
Definition: writer.hpp:207
size_t m_buffer_size
Definition: writer.hpp:116
Writer(Writer &&)=delete
void do_flush()
Definition: writer.hpp:167
Writer(const osmium::io::File &file, TArgs &&... args)
Definition: writer.hpp:267
Writer(const std::string &filename, TArgs &&... args)
Definition: writer.hpp:293
size_t buffer_size() const noexcept
Definition: writer.hpp:319
status
Definition: writer.hpp:127
osmium::thread::thread_handler m_thread
Definition: writer.hpp:120
void flush()
Definition: writer.hpp:348
void operator()(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:362
bool m_header_written
Definition: writer.hpp:134
osmium::io::Header m_header
Definition: writer.hpp:114
osmium::memory::Buffer m_buffer
Definition: writer.hpp:112
std::atomic_bool m_notification
Definition: writer.hpp:125
~Writer() noexcept
Definition: writer.hpp:308
std::unique_ptr< osmium::io::detail::OutputFormat > m_output
Definition: writer.hpp:110
void set_header(const osmium::io::Header &header)
Definition: writer.hpp:337
static void set_option(options_type &options, overwrite value)
Definition: writer.hpp:215
std::future< std::size_t > m_write_future
Definition: writer.hpp:118
void do_close()
Definition: writer.hpp:223
void do_write(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:158
Writer & operator=(Writer &&)=delete
Writer & operator=(const Writer &)=delete
detail::future_string_queue_type m_output_queue
Definition: writer.hpp:108
void set_buffer_size(size_t size) noexcept
Definition: writer.hpp:327
Writer(const Writer &)=delete
osmium::io::File m_file
Definition: writer.hpp:106
void operator()(const osmium::memory::Item &item)
Definition: writer.hpp:376
void ensure_cleanup(TFunction func, TArgs &&... args)
Definition: writer.hpp:185
static void write_thread(detail::future_string_queue_type &output_queue, std::unique_ptr< osmium::io::Compressor > &&compressor, std::promise< std::size_t > &&write_promise, std::atomic_bool *notification)
Definition: writer.hpp:137
Writer(const char *filename, TArgs &&... args)
Definition: writer.hpp:298
enum osmium::io::Writer::status m_status
Definition: item.hpp:105
Definition: pool.hpp:90
static Pool & default_instance()
Definition: pool.hpp:186
Definition: util.hpp:91
Definition: attr.hpp:342
std::size_t get_max_queue_size(const char *queue_name, const std::size_t default_value) noexcept
Definition: config.hpp:83
fsync
Definition: writer_options.hpp:51
overwrite
Definition: writer_options.hpp:43
void check_for_exception(std::future< T > &future)
Definition: util.hpp:57
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:555
Definition: writer.hpp:200
overwrite allow_overwrite
Definition: writer.hpp:202
osmium::thread::Pool * pool
Definition: writer.hpp:204
fsync sync
Definition: writer.hpp:203
osmium::io::Header header
Definition: writer.hpp:201
Definition: error.hpp:46
#define LIBOSMIUM_VERSION_STRING
Definition: version.hpp:45