TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_READ_UNTIL_HPP
11 : #define BOOST_CAPY_READ_UNTIL_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/cond.hpp>
16 : #include <coroutine>
17 : #include <boost/capy/error.hpp>
18 : #include <boost/capy/io_result.hpp>
19 : #include <boost/capy/io_task.hpp>
20 : #include <boost/capy/concept/dynamic_buffer.hpp>
21 : #include <boost/capy/concept/match_condition.hpp>
22 : #include <boost/capy/concept/read_stream.hpp>
23 : #include <boost/capy/ex/io_env.hpp>
24 :
25 : #include <algorithm>
26 : #include <cstddef>
27 : #include <optional>
28 : #include <stop_token>
29 : #include <string_view>
30 : #include <type_traits>
31 :
32 : namespace boost {
33 : namespace capy {
34 :
35 : namespace detail {
36 :
37 : // Copy every buffer of `data`, in iteration order, into one contiguous std::string and return it
38 : inline
39 : std::string // NOTE(review): <string> is not among the includes visible above - confirm it arrives transitively
40 HIT 2 : linearize_buffers(ConstBufferSequence auto const& data)
41 : {
42 2 : std::string linear;
43 2 : linear.reserve(buffer_size(data)); // reserve buffer_size(data) chars before the first append
44 2 : auto const end_ = end(data); // end iterator is computed once, outside the loop
45 6 : for(auto it = begin(data); it != end_; ++it)
46 : {
47 4 : const_buffer b = *it;
48 8 : linear.append( // append b.size() chars starting at b.data()
49 4 : static_cast<char const*>(b.data()),
50 : b.size());
51 : }
52 4 : return linear;
53 : } // LCOV_EXCL_LINE gcov brace artifact (linearize_buffers is exercised)
54 :
55 : // Run `match` over the bytes of `data` and return its result unchanged; `hint` is forwarded to `match` as-is
56 : template<MatchCondition M>
57 : std::size_t
58 263 : search_buffer_for_match(
59 : ConstBufferSequence auto const& data,
60 : M const& match,
61 : std::size_t* hint = nullptr) // nullptr by default: callers that omit it give `match` no place to write a hint
62 : {
63 : // Fast path: exactly one buffer - view its bytes in place, no copy
64 263 : if(buffer_length(data) == 1)
65 : {
66 262 : auto const& buf = *begin(data);
67 786 : return match(std::string_view(
68 262 : static_cast<char const*>(buf.data()),
69 262 : buf.size()), hint);
70 : }
71 : // Any other buffer count (including zero) - copy into a temporary string and match on that
72 1 : return match(linearize_buffers(data), hint); // the temporary string lives until the end of this full expression
73 : }
74 :
75 : // Read loop behind read_until: read_some into `buffers` until `match` finds a position, read_some reports an error, or max_size() is reached
76 : template<class Stream, class B, MatchCondition M>
77 : io_task<std::size_t>
78 136 : read_until_match_impl(
79 : Stream& stream,
80 : B& buffers,
81 : M match,
82 : std::size_t initial_amount)
83 : {
84 : std::size_t amount = initial_amount; // bytes requested from prepare() per iteration; may grow below
85 :
86 : for(;;)
87 : {
88 : // Buffer already at (or beyond) max_size(): give up with not_found and n == 0
89 : if(buffers.size() >= buffers.max_size())
90 : co_return {error::not_found, 0};
91 :
92 : // Prepare at most `amount` bytes, clamped to the room left below max_size()
93 : std::size_t const available = buffers.max_size() - buffers.size(); // > 0 because of the check above
94 : std::size_t const to_prepare = (std::min)(amount, available);
95 : if(to_prepare == 0) // with available > 0 this means `amount` is 0
96 : co_return {error::not_found, 0};
97 :
98 : auto mb = buffers.prepare(to_prepare);
99 : auto [ec, n] = co_await stream.read_some(mb);
100 : buffers.commit(n); // committed even when ec is set
101 :
102 : if(!ec) // NOTE(review): bytes committed by a read that also reports an error are never searched - confirm read_some cannot return n > 0 together with ec
103 : {
104 : auto pos = search_buffer_for_match(buffers.data(), match); // no hint pointer is passed; all of buffers.data() is searched after every successful read
105 : if(pos != std::string_view::npos)
106 : co_return {{}, pos}; // success: n is the position returned by `match`
107 : }
108 :
109 : if(ec == cond::eof)
110 : co_return {error::eof, buffers.size()}; // eof is reported as error::eof; n is the total buffer size, not a match position
111 : if(ec)
112 : co_return {ec, buffers.size()}; // any other error is passed through unchanged, again with the total buffer size
113 :
114 : // Request 1.5x as much next time, but only when this read filled the prepared space completely
115 : if(n == buffer_size(mb))
116 : amount = amount / 2 + amount;
117 : }
118 272 : }
119 :
120 : template<class Stream, class B, MatchCondition M, bool OwnsBuffer>
121 : struct read_until_awaitable // Awaitable built by read_until: completes at once if the buffer already matches, otherwise runs read_until_match_impl
122 : {
123 : Stream* stream_; // set from std::addressof(stream) in both constructors
124 : M match_; // passed by value (copied) to the impl coroutine in await_suspend
125 : std::size_t initial_amount_;
126 : std::optional<io_result<std::size_t>> immediate_; // engaged by the constructors when the existing data already matches
127 : std::optional<io_task<std::size_t>> inner_; // engaged in await_suspend; stays empty on the immediate path
128 :
129 : using storage_type = std::conditional_t<OwnsBuffer, B, B*>; // B held by value when OwnsBuffer, otherwise a non-owning B*
130 : storage_type buffers_storage_;
131 :
132 136 : B& buffers() noexcept // uniform access to the buffer for either storage mode
133 : {
134 : if constexpr(OwnsBuffer)
135 126 : return buffers_storage_;
136 : else
137 10 : return *buffers_storage_;
138 : }
139 :
140 : // Constructor for lvalue buffers: stores the pointer only, then searches the existing data once
141 14 : read_until_awaitable(
142 : Stream& stream,
143 : B* buffers,
144 : M match,
145 : std::size_t initial_amount)
146 : requires (!OwnsBuffer)
147 14 : : stream_(std::addressof(stream)) // NOTE(review): std::addressof is declared in <memory>, std::move in <utility>; neither is among the visible includes - confirm they arrive transitively
148 14 : , match_(std::move(match))
149 14 : , initial_amount_(initial_amount)
150 14 : , buffers_storage_(buffers)
151 : {
152 14 : auto pos = search_buffer_for_match(
153 14 : buffers_storage_->data(), match_);
154 14 : if(pos != std::string_view::npos)
155 4 : immediate_.emplace(io_result<std::size_t>{{}, pos}); // match already present: record a success result so no read is issued
156 14 : }
157 :
158 : // Constructor for rvalue buffers: moves the buffer object into this awaitable, then searches the existing data once
159 132 : read_until_awaitable(
160 : Stream& stream,
161 : B&& buffers,
162 : M match,
163 : std::size_t initial_amount)
164 : requires OwnsBuffer
165 132 : : stream_(std::addressof(stream))
166 132 : , match_(std::move(match))
167 132 : , initial_amount_(initial_amount)
168 132 : , buffers_storage_(std::move(buffers))
169 : {
170 132 : auto pos = search_buffer_for_match(
171 132 : buffers_storage_.data(), match_);
172 132 : if(pos != std::string_view::npos)
173 6 : immediate_.emplace(io_result<std::size_t>{{}, pos}); // match already present: record a success result so no read is issued
174 132 : }
175 :
176 : bool
177 146 : await_ready() const noexcept // true exactly when a constructor stored an immediate result
178 : {
179 146 : return immediate_.has_value();
180 : }
181 :
182 : std::coroutine_handle<>
183 136 : await_suspend(std::coroutine_handle<> h, io_env const* env)
184 : {
185 272 : inner_.emplace(read_until_match_impl( // the coroutine takes *stream_ and the buffer by reference and match_ by value
186 136 : *stream_, buffers(), match_, initial_amount_)); // NOTE(review): with OwnsBuffer the reference points into this object - presumably the awaitable is not moved after this point; confirm
187 136 : return inner_->await_suspend(h, env); // h and env are forwarded to the inner task, whose returned handle is returned as-is
188 : }
189 :
190 : io_result<std::size_t>
191 146 : await_resume()
192 : {
193 146 : if(immediate_)
194 10 : return *immediate_; // immediate path: the result recorded by the constructor
195 136 : return inner_->await_resume(); // otherwise the result produced by the inner task
196 : }
197 : };
198 :
199 : template<ReadStream Stream, class B, MatchCondition M>
200 : using read_until_return_t = read_until_awaitable< // read_until_awaitable instantiation for a buffer parameter deduced as B&&
201 : Stream,
202 : std::remove_reference_t<B>, // bare buffer type, reference stripped
203 : M,
204 : !std::is_lvalue_reference_v<B&&>>; // OwnsBuffer: true exactly when B&& is not an lvalue reference
205 :
206 : } // namespace detail
207 :
208 : /** Match condition that searches for a delimiter string.
209 :
210 : Satisfies @ref MatchCondition. Returns the position just past the
211 : first occurrence of the delimiter, or `npos` when absent. On a miss,
212 : stores an overlap hint of `delim.size() - 1` through a non-null `hint`.
213 :
214 : @see MatchCondition, read_until
215 : */
216 : struct match_delim
217 : {
218 : /** The delimiter string to search for.
219 :
220 : @note The referenced characters must remain valid
221 : for the lifetime of this object and any pending
222 : read operation.
223 : */
224 : std::string_view delim;
225 :
226 : /** Search for the delimiter in `data`.
227 :
228 : @param data The data to search.
229 : @param hint If non-null and the delimiter is not found, set to
230 : `delim.size() - 1`; left untouched in every other case.
231 : @return `0` if `delim` is empty; otherwise the position
232 : just past the delimiter, or `npos` if not found.
233 : */
234 : std::size_t
235 226 : operator()(
236 : std::string_view data,
237 : std::size_t* hint) const noexcept
238 : {
239 226 : if(delim.empty()) // an empty delimiter matches immediately at position 0
240 2 : return 0;
241 224 : auto pos = data.find(delim); // index of the first occurrence, or npos
242 224 : if(pos != std::string_view::npos)
243 27 : return pos + delim.size(); // one past the last delimiter character
244 197 : if(hint)
245 1 : *hint = delim.size() > 1 ? delim.size() - 1 : 0; // equals delim.size() - 1, since delim is non-empty here
246 197 : return std::string_view::npos;
247 : }
248 : };
249 :
250 : /** Asynchronously read until a match condition is satisfied.
251 :
252 : Reads data from `stream` and appends it to `dynbuf` via calling
253 : `stream.read_some` zero or more times and using the prepare/commit
254 : interface until:
255 :
256 : @li either @c match returns a valid position,
257 : @li or @c dynbuf.size() == @c dynbuf.max_size() ,
258 : @li or a contingency on @c stream.read_some occurs.
259 :
260 : If the match condition is satisfied by data in `dynbuf.data()` upon entry,
261 : no call to `stream.read_some` is performed.
262 :
263 :
264 : @par Await-returns
265 :
266 : An object of type `io_result<std::size_t>` destructuring as `[ec, n]`.
267 :
268 : If `!ec`, `n` is the position returned by the match condition
269 : (bytes up to and including the matched delimiter).
270 :
271 :
272 : Contingencies:
273 :
274 : @li The first contingency, reported from awaiting @c stream.read_some .
275 : @li @c cond::not_found -- when @c dynbuf.size() == @c dynbuf.max_size()
276 : and the match condition is not satisfied by data in @c dynbuf.data() .
277 :
278 : @param stream The stream to read from. The caller retains ownership.
279 : @param dynbuf The dynamic buffer to append data to. Must remain
280 : valid until the operation completes.
281 : @param match The match condition callable. Copied into the awaitable.
282 : @param initial_amount Initial bytes to read per iteration (default
283 : 2048). Grows by 1.5x when filled.
284 :
285 :
286 :
287 :
288 : @par Await-throws
289 :
290 : Whatever operations on @c dynbuf throw.
291 :
292 : (Note: types modeling @c DynamicBufferParam provided by Capy throw
293 : @c std::bad_alloc from member function
294 : @c prepare .)
295 :
296 : @par Remarks
297 : Supports _IoAwaitable cancellation_.
298 :
299 : @par Example
300 :
301 : @code
302 : task<> read_http_header( ReadStream auto& stream )
303 : {
304 : std::string header;
305 : auto [ec, n] = co_await read_until(
306 : stream,
307 : string_dynamic_buffer( &header ),
308 : []( std::string_view data, std::size_t* hint ) {
309 : auto pos = data.find( "\r\n\r\n" );
310 : if( pos != std::string_view::npos )
311 : return pos + 4;
312 : if( hint )
313 : (*hint) = 3; // partial "\r\n\r" possible
314 : return std::string_view::npos;
315 : } );
316 : if( ec )
317 : detail::throw_system_error( ec );
318 : // header contains data through "\r\n\r\n"
319 : }
320 : @endcode
321 :
322 : @see read_some, MatchCondition, DynamicBufferParam
323 : */
324 : template<ReadStream Stream, class B, MatchCondition M>
325 : requires DynamicBufferParam<B&&>
326 : detail::read_until_return_t<Stream, B, M>
327 146 : read_until(
328 : Stream& stream,
329 : B&& dynbuf,
330 : M match,
331 : std::size_t initial_amount = 2048)
332 : {
333 146 : constexpr bool is_lvalue = std::is_lvalue_reference_v<B&&>; // selects pointer storage (lvalue) or owned storage (rvalue) below
334 : using BareB = std::remove_reference_t<B>;
335 :
336 : if constexpr(is_lvalue)
337 : return detail::read_until_awaitable<Stream, BareB, M, false>( // lvalue buffer: the awaitable keeps only its address
338 14 : stream, std::addressof(dynbuf), std::move(match), initial_amount);
339 : else
340 : return detail::read_until_awaitable<Stream, BareB, M, true>( // rvalue buffer: the buffer object is moved into the awaitable
341 132 : stream, std::move(dynbuf), std::move(match), initial_amount);
342 : }
343 :
344 : /** Asynchronously read until a delimiter string is found.
345 :
346 : Reads data from the stream until the delimiter is found. This is
347 : a convenience overload equivalent to calling `read_until` with
348 : `match_delim{delim}`. If the delimiter already exists in the
349 : buffer, returns immediately without I/O.
350 :
351 : The operation completes when:
352 : @li The delimiter string is found
353 : @li End-of-stream is reached (`cond::eof`)
354 : @li The buffer's `max_size()` is reached (`cond::not_found`)
355 : @li An error occurs
356 : @li The operation is cancelled
357 :
358 : @par Cancellation
359 : Supports cancellation via `stop_token` propagated through the
360 : IoAwaitable protocol. When cancelled, returns with `cond::canceled`.
361 :
362 : @param stream The stream to read from. The caller retains ownership.
363 : @param buffers The dynamic buffer to append data to. Must remain
364 : valid until the operation completes.
365 : @param delim The delimiter string to search for.
366 : @param initial_amount Initial bytes to read per iteration (default
367 : 2048). Grows by 1.5x when filled.
368 :
369 : @return An awaitable that await-returns `(error_code, std::size_t)`.
370 : On success, `n` is bytes up to and including the delimiter.
371 : Compare error codes to conditions:
372 : @li `cond::eof` - EOF before delimiter; `n` is buffer size
373 : @li `cond::not_found` - `max_size()` reached before delimiter
374 : @li `cond::canceled` - Operation was cancelled
375 :
376 : @par Example
377 :
378 : @code
379 : task<std::string> read_line( ReadStream auto& stream )
380 : {
381 : std::string line;
382 : auto [ec, n] = co_await read_until(
383 : stream, string_dynamic_buffer( &line ), "\r\n" );
384 : if( ec == cond::eof )
385 : co_return line; // partial line at EOF
386 : if( ec )
387 : detail::throw_system_error( ec );
388 : line.resize( n - 2 ); // remove "\r\n"
389 : co_return line;
390 : }
391 : @endcode
392 :
393 : @see read_until, match_delim, DynamicBufferParam
394 : */
395 : template<ReadStream Stream, class B>
396 : requires DynamicBufferParam<B&&>
397 : detail::read_until_return_t<Stream, B, match_delim>
398 118 : read_until(
399 : Stream& stream,
400 : B&& buffers,
401 : std::string_view delim,
402 : std::size_t initial_amount = 2048)
403 : {
404 : return read_until( // delegates to the match-condition overload
405 : stream,
406 : std::forward<B>(buffers), // preserves the lvalue/rvalue category, and with it the storage mode chosen by that overload
407 : match_delim{delim}, // match_delim stores only the view; the characters behind `delim` are not copied
408 118 : initial_amount);
409 : }
410 :
411 : } // namespace capy
412 : } // namespace boost
413 :
414 : #endif
|