Libosmium  2.20.0
Fast and flexible C++ library for working with OpenStreetMap data
reader.hpp
Go to the documentation of this file.
1#ifndef OSMIUM_IO_READER_HPP
2#define OSMIUM_IO_READER_HPP
3
4/*
5
6This file is part of Osmium (https://osmcode.org/libosmium).
7
8Copyright 2013-2023 Jochen Topf <jochen@topf.org> and others (see README).
9
10Boost Software License - Version 1.0 - August 17th, 2003
11
12Permission is hereby granted, free of charge, to any person or organization
13obtaining a copy of the software and accompanying documentation covered by
14this license (the "Software") to use, reproduce, display, distribute,
15execute, and transmit the Software, and to prepare derivative works of the
16Software, and to permit third-parties to whom the Software is furnished to
17do so, all subject to the following:
18
19The copyright notices in the Software and this entire statement, including
20the above license grant, this restriction and the following disclaimer,
21must be included in all copies of the Software, in whole or in part, and
22all derivative works of the Software, unless such copies or derivative
23works are solely in the form of machine-executable object code generated by
24a source language processor.
25
26THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32DEALINGS IN THE SOFTWARE.
33
34*/
35
37#include <osmium/io/detail/input_format.hpp>
38#include <osmium/io/detail/queue_util.hpp>
39#include <osmium/io/detail/read_thread.hpp>
40#include <osmium/io/detail/read_write.hpp>
41#include <osmium/io/error.hpp>
42#include <osmium/io/file.hpp>
43#include <osmium/io/header.hpp>
49
50#include <cerrno>
51#include <cstdlib>
52#include <fcntl.h>
53#include <future>
54#include <memory>
55#include <string>
56#include <system_error>
57#include <thread>
58#include <utility>
59
60#ifndef _WIN32
61# include <sys/wait.h>
62#endif
63
64#ifndef _MSC_VER
65# include <unistd.h>
66#endif
67
68namespace osmium {
69
70 namespace io {
71
72 namespace detail {
73
74 inline std::size_t get_input_queue_size() noexcept {
75 return osmium::config::get_max_queue_size("INPUT", 20);
76 }
77
78 inline std::size_t get_osmdata_queue_size() noexcept {
79 return osmium::config::get_max_queue_size("OSMDATA", 20);
80 }
81
82 } // namespace detail
83
90 class Reader {
91
92 // The Reader::read() function reads from a queue of buffers which
93 // can contain nested buffers. These nested buffers will be in
94 // here, because read() can only return a single unnested buffer.
95 osmium::memory::Buffer m_back_buffers{};
96
98
100
101 std::atomic<std::size_t> m_offset{0};
102
103 detail::ParserFactory::create_parser_type m_creator;
104
105 enum class status {
106 okay = 0, // normal reading
107 error = 1, // some error occurred while reading
108 closed = 2, // close() called
109 eof = 3 // eof of file was reached without error
111
112 int m_childpid = 0;
113
114 detail::future_string_queue_type m_input_queue;
115
116 int m_fd = -1;
117
118 std::size_t m_file_size = 0;
119
120 std::unique_ptr<osmium::io::Decompressor> m_decompressor;
121
122 osmium::io::detail::ReadThreadManager m_read_thread_manager;
123
124 detail::future_buffer_queue_type m_osmdata_queue;
125 detail::queue_wrapper<osmium::memory::Buffer> m_osmdata_queue_wrapper;
126
127 std::future<osmium::io::Header> m_header_future{};
129
131
135
136 void set_option(osmium::thread::Pool& pool) noexcept {
137 m_pool = &pool;
138 }
139
141 m_read_which_entities = value;
142 }
143
144 void set_option(osmium::io::read_meta value) noexcept {
145 // Ignore this setting if we have a history/change file,
146 // because if this is set to "no", we don't see the difference
147 // between visible and deleted objects.
149 m_read_metadata = value;
150 }
151 }
152
154 m_buffers_kind = value;
155 }
156
157 // This function will run in a separate thread.
159 int fd,
160 const detail::ParserFactory::create_parser_type& creator,
161 detail::future_string_queue_type& input_queue,
162 detail::future_buffer_queue_type& osmdata_queue,
163 std::promise<osmium::io::Header>&& header_promise,
164 std::atomic<std::size_t>* offset_ptr,
165 osmium::osm_entity_bits::type read_which_entities,
166 osmium::io::read_meta read_metadata,
167 osmium::io::buffers_type buffers_kind,
168 bool want_buffered_pages_removed) {
169 std::promise<osmium::io::Header> promise{std::move(header_promise)};
170 osmium::io::detail::parser_arguments args = {
171 pool,
172 fd,
173 input_queue,
174 osmdata_queue,
175 promise,
176 offset_ptr,
177 read_which_entities,
178 read_metadata,
179 buffers_kind,
180 want_buffered_pages_removed};
181 creator(args)->parse();
182 }
183
184#ifndef _WIN32
/**
 * Fork a child process, execute the given command in it and return a
 * file descriptor from which the parent can read everything the child
 * writes to its stdout.
 *
 * The command is called as `command -g filename`. In the child stdin
 * and stderr are connected to /dev/null.
 *
 * @param command Name of the command to run (resolved by execlp()).
 * @param filename Last argument handed to the command.
 * @param childpid Out parameter, set to the process id of the child.
 * @returns File descriptor of the read end of the pipe.
 * @throws std::system_error if creating the pipe or forking fails.
 */
static int execute(const std::string& command, const std::string& filename, int* childpid) {
    int pipefd[2];
    if (pipe(pipefd) < 0) {
        throw std::system_error{errno, std::system_category(), "opening pipe failed"};
    }
    const pid_t pid = fork();
    if (pid < 0) {
        // Do not leak the pipe. Remember errno first, because close()
        // is allowed to change it.
        const int fork_errno = errno;
        ::close(pipefd[0]);
        ::close(pipefd[1]);
        throw std::system_error{fork_errno, std::system_category(), "fork failed"};
    }
    if (pid == 0) { // child
        // close all file descriptors except one end of the pipe
        for (int i = 0; i < 32; ++i) {
            if (i != pipefd[1]) {
                ::close(i);
            }
        }
        if (dup2(pipefd[1], 1) < 0) { // put end of pipe as stdout
            std::exit(1); // NOLINT(concurrency-mt-unsafe)
        }
        // The duplicate on fd 1 is all the child needs. Releasing the
        // original descriptor also makes sure that the two open() calls
        // below really end up on fd 0 and fd 2.
        if (pipefd[1] != 1) {
            ::close(pipefd[1]);
        }

        ::open("/dev/null", O_RDONLY); // stdin
        ::open("/dev/null", O_WRONLY); // stderr
        // hack: -g switches off globbing in curl which allows [] to be used in file names
        // this is important for XAPI URLs
        // in theory this execute() function could be used for other commands, but it is
        // only used for curl at the moment, so this is okay.
        if (::execlp(command.c_str(), command.c_str(), "-g", filename.c_str(), nullptr) < 0) {
            std::exit(1); // NOLINT(concurrency-mt-unsafe)
        }
    }
    // parent
    *childpid = pid;
    ::close(pipefd[1]);
    return pipefd[0];
}
231#endif
232
241 static int open_input_file_or_url(const std::string& filename, int* childpid) {
242 const std::string protocol{filename.substr(0, filename.find_first_of(':'))};
243 if (protocol == "http" || protocol == "https" || protocol == "ftp" || protocol == "file") {
244#ifndef _WIN32
245 return execute("curl", filename, childpid);
246#else
247 throw io_error{"Reading OSM files from the network currently not supported on Windows."};
248#endif
249 }
250 const int fd = osmium::io::detail::open_for_reading(filename);
251#if defined(__linux__) || defined(__FreeBSD__)
252 if (fd >= 0) {
253 // Tell the kernel we are going to read this file sequentially
254 ::posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
255 }
256#endif
257 return fd;
258 }
259
260 static std::unique_ptr<Decompressor> make_decompressor(const osmium::io::File& file, int fd, std::atomic<std::size_t>* offset_ptr) {
261 const auto& factory = osmium::io::CompressionFactory::instance();
262 std::unique_ptr<Decompressor> decompressor;
263
264 if (file.buffer()) {
265 decompressor = factory.create_decompressor(file.compression(), file.buffer(), file.buffer_size());
266 } else if (file.format() == file_format::pbf) {
267 decompressor = std::unique_ptr<Decompressor>{new DummyDecompressor{}};
268 } else {
269 decompressor = factory.create_decompressor(file.compression(), fd);
270 }
271
272 decompressor->set_offset_ptr(offset_ptr);
273 return decompressor;
274 }
275
276 public:
277
318 template <typename... TArgs>
319 explicit Reader(const osmium::io::File& file, TArgs&&... args) :
320 m_file(file.check()),
321 m_creator(detail::ParserFactory::instance().get_creator_function(m_file)),
322 m_input_queue(detail::get_input_queue_size(), "raw_input"),
323 m_fd(m_file.buffer() ? -1 : open_input_file_or_url(m_file.filename(), &m_childpid)),
324 m_file_size(m_fd > 2 ? osmium::file_size(m_fd) : 0),
327 m_osmdata_queue(detail::get_osmdata_queue_size(), "parser_results"),
329
330 (void)std::initializer_list<int>{(set_option(args), 0)...};
331
332 if (!m_pool) {
334 }
335
336 std::promise<osmium::io::Header> header_promise;
337 m_header_future = header_promise.get_future();
338
340 if (cpc >= 0) {
341 m_decompressor->set_want_buffered_pages_removed(true);
342 }
343
344 const int fd_for_parser = m_decompressor->is_real() ? -1 : m_fd;
345 m_thread = osmium::thread::thread_handler{parser_thread, std::ref(*m_pool), fd_for_parser, std::ref(m_creator),
346 std::ref(m_input_queue), std::ref(m_osmdata_queue),
347 std::move(header_promise), &m_offset, m_read_which_entities,
349 m_decompressor->want_buffered_pages_removed()};
350 }
351
352 template <typename... TArgs>
353 explicit Reader(const std::string& filename, TArgs&&... args) :
354 Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
355 }
356
357 template <typename... TArgs>
358 explicit Reader(const char* filename, TArgs&&... args) :
359 Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
360 }
361
362 Reader(const Reader&) = delete;
363 Reader& operator=(const Reader&) = delete;
364
365 Reader(Reader&&) = delete;
366 Reader& operator=(Reader&&) = delete;
367
368 ~Reader() noexcept {
369 try {
370 close();
371 } catch (...) {
372 // Ignore any exceptions because destructor must not throw.
373 }
374 }
375
384 void close() {
386
388
389 m_osmdata_queue_wrapper.shutdown();
390
391 try {
392 m_read_thread_manager.close();
393 } catch (...) {
394 // Ignore any exceptions.
395 }
396
397#ifndef _WIN32
398 if (m_childpid) {
399 int status = 0;
400 const pid_t pid = ::waitpid(m_childpid, &status, 0);
401#pragma GCC diagnostic push
402#pragma GCC diagnostic ignored "-Wold-style-cast"
403 if (pid < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0) { // NOLINT(hicpp-signed-bitwise)
404 throw std::system_error{errno, std::system_category(), "subprocess returned error"};
405 }
406#pragma GCC diagnostic pop
407 m_childpid = 0;
408 }
409#endif
410 }
411
419 if (m_status == status::error) {
420 throw io_error{"Can not get header from reader when in status 'error'"};
421 }
422
423 try {
424 if (m_header_future.valid()) {
426 }
427 } catch (...) {
428 close();
430 throw;
431 }
432
433 return m_header;
434 }
435
444 osmium::memory::Buffer read() {
445 osmium::memory::Buffer buffer;
446
447 // If there are buffers on the stack, return those first.
448 if (m_back_buffers) {
449 if (m_back_buffers.has_nested_buffers()) {
450 buffer = std::move(*m_back_buffers.get_last_nested());
451 } else {
452 buffer = std::move(m_back_buffers);
453 m_back_buffers = osmium::memory::Buffer{};
454 }
455 return buffer;
456 }
457
458 if (m_status != status::okay) {
459 throw io_error{"Can not read from reader when in status 'closed', 'eof', or 'error'"};
460 }
461
464 return buffer;
465 }
466
467 try {
468 // m_input_format.read() can return an invalid buffer to signal EOF,
469 // or a valid buffer with or without data. A valid buffer
470 // without data is not an error, it just means we have to
471 // keep getting the next buffer until there is one with data.
472 while (true) {
473 buffer = m_osmdata_queue_wrapper.pop();
474 if (detail::at_end_of_data(buffer)) {
476 m_read_thread_manager.close();
477 return buffer;
478 }
479 if (buffer.has_nested_buffers()) {
480 m_back_buffers = std::move(buffer);
481 buffer = std::move(*m_back_buffers.get_last_nested());
482 }
483 if (buffer.committed() > 0) {
484 return buffer;
485 }
486 }
487 } catch (...) {
488 close();
490 throw;
491 }
492 }
493
498 bool eof() const {
500 }
501
506 std::size_t file_size() const noexcept {
507 return m_file_size;
508 }
509
524 std::size_t offset() const noexcept {
525 return m_offset;
526 }
527
528 }; // class Reader
529
538 template <typename... TArgs>
539 osmium::memory::Buffer read_file(TArgs&&... args) {
540 osmium::memory::Buffer buffer{1024UL * 1024UL, osmium::memory::Buffer::auto_grow::yes};
541
542 Reader reader{std::forward<TArgs>(args)...};
543 while (auto read_buffer = reader.read()) {
544 buffer.add_buffer(read_buffer);
545 buffer.commit();
546 }
547
548 return buffer;
549 }
550
551 } // namespace io
552
553} // namespace osmium
554
555#endif // OSMIUM_IO_READER_HPP
static CompressionFactory & instance()
Definition: compression.hpp:191
void set_offset_ptr(std::atomic< std::size_t > *offset_ptr) noexcept
Definition: compression.hpp:121
Definition: compression.hpp:287
Definition: file.hpp:72
size_t buffer_size() const noexcept
Definition: file.hpp:147
bool has_multiple_object_versions() const noexcept
Definition: file.hpp:303
file_compression compression() const noexcept
Definition: file.hpp:294
file_format format() const noexcept
Definition: file.hpp:285
const char * buffer() const noexcept
Definition: file.hpp:143
Definition: header.hpp:68
Definition: reader.hpp:90
osmium::memory::Buffer read()
Definition: reader.hpp:444
osmium::io::buffers_type m_buffers_kind
Definition: reader.hpp:134
detail::future_string_queue_type m_input_queue
Definition: reader.hpp:114
osmium::memory::Buffer m_back_buffers
Definition: reader.hpp:95
static void parser_thread(osmium::thread::Pool &pool, int fd, const detail::ParserFactory::create_parser_type &creator, detail::future_string_queue_type &input_queue, detail::future_buffer_queue_type &osmdata_queue, std::promise< osmium::io::Header > &&header_promise, std::atomic< std::size_t > *offset_ptr, osmium::osm_entity_bits::type read_which_entities, osmium::io::read_meta read_metadata, osmium::io::buffers_type buffers_kind, bool want_buffered_pages_removed)
Definition: reader.hpp:158
Reader & operator=(Reader &&)=delete
int m_childpid
Definition: reader.hpp:112
void set_option(osmium::io::read_meta value) noexcept
Definition: reader.hpp:144
detail::future_buffer_queue_type m_osmdata_queue
Definition: reader.hpp:124
static std::unique_ptr< Decompressor > make_decompressor(const osmium::io::File &file, int fd, std::atomic< std::size_t > *offset_ptr)
Definition: reader.hpp:260
std::size_t m_file_size
Definition: reader.hpp:118
void set_option(osmium::thread::Pool &pool) noexcept
Definition: reader.hpp:136
void set_option(osmium::io::buffers_type value) noexcept
Definition: reader.hpp:153
Reader & operator=(const Reader &)=delete
static int execute(const std::string &command, const std::string &filename, int *childpid)
Definition: reader.hpp:196
enum osmium::io::Reader::status m_status
static int open_input_file_or_url(const std::string &filename, int *childpid)
Definition: reader.hpp:241
std::unique_ptr< osmium::io::Decompressor > m_decompressor
Definition: reader.hpp:120
status
Definition: reader.hpp:105
Reader(const char *filename, TArgs &&... args)
Definition: reader.hpp:358
osmium::io::Header m_header
Definition: reader.hpp:128
detail::ParserFactory::create_parser_type m_creator
Definition: reader.hpp:103
osmium::io::Header header()
Definition: reader.hpp:418
detail::queue_wrapper< osmium::memory::Buffer > m_osmdata_queue_wrapper
Definition: reader.hpp:125
Reader(const osmium::io::File &file, TArgs &&... args)
Definition: reader.hpp:319
std::size_t file_size() const noexcept
Definition: reader.hpp:506
Reader(Reader &&)=delete
void set_option(osmium::osm_entity_bits::type value) noexcept
Definition: reader.hpp:140
std::future< osmium::io::Header > m_header_future
Definition: reader.hpp:127
osmium::io::detail::ReadThreadManager m_read_thread_manager
Definition: reader.hpp:122
bool eof() const
Definition: reader.hpp:498
Reader(const Reader &)=delete
std::size_t offset() const noexcept
Definition: reader.hpp:524
std::atomic< std::size_t > m_offset
Definition: reader.hpp:101
osmium::thread::thread_handler m_thread
Definition: reader.hpp:130
void close()
Definition: reader.hpp:384
~Reader() noexcept
Definition: reader.hpp:368
osmium::io::File m_file
Definition: reader.hpp:97
osmium::osm_entity_bits::type m_read_which_entities
Definition: reader.hpp:132
Reader(const std::string &filename, TArgs &&... args)
Definition: reader.hpp:353
osmium::io::read_meta m_read_metadata
Definition: reader.hpp:133
int m_fd
Definition: reader.hpp:116
osmium::thread::Pool * m_pool
Definition: reader.hpp:99
Definition: pool.hpp:90
static Pool & default_instance()
Definition: pool.hpp:186
Definition: util.hpp:91
Definition: attr.hpp:342
std::size_t get_max_queue_size(const char *queue_name, const std::size_t default_value) noexcept
Definition: config.hpp:83
int8_t clean_page_cache_after_read() noexcept
Definition: config.hpp:106
osmium::memory::Buffer read_file(TArgs &&... args)
Definition: reader.hpp:539
buffers_type
Definition: file_format.hpp:60
read_meta
Definition: file_format.hpp:55
type
Definition: entity_bits.hpp:63
@ all
object or changeset
Definition: entity_bits.hpp:76
@ nothing
Definition: entity_bits.hpp:67
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:555
Definition: error.hpp:46