From 81fb33de30700ddb7f56d2d5523df4b908e317d8 Mon Sep 17 00:00:00 2001 From: Michael Vandeberg Date: Fri, 20 Mar 2026 15:42:26 -0600 Subject: [PATCH] Replace coroutine_handle with continuation in executor interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The executor concept's dispatch() and post() now accept continuation& instead of std::coroutine_handle<>. A continuation wraps a coroutine handle with an intrusive next_ pointer, enabling executors to queue work without per-post heap allocation. The thread pool's post() previously allocated a work node on every call (new work(h)). It now links the caller's continuation directly into an intrusive queue — zero allocation on the hot path. The continuation lives in the I/O awaitable for caller-handle posting (delay, async_mutex, async_event, stream), and in combinator/trampoline state for parent-dispatch and child-launch patterns (when_all, when_any, timeout, run, run_async). The IoAwaitable concept and io_awaitable_promise_base are unchanged. Breaking change: all Executor implementations must update dispatch() and post() signatures from coroutine_handle<> to continuation&. 
--- bench/bench.cpp | 8 +- doc/continuation-rationale.md | 529 ++++++++++++++++++ .../ROOT/pages/4.coroutines/4c.executors.adoc | 18 +- .../pages/7.examples/7n.custom-executor.adoc | 22 +- .../ROOT/pages/8.design/8k.Executor.adoc | 141 ++--- doc/unlisted/execution-executors.adoc | 45 +- doc/unlisted/io-awaitables-executor.adoc | 36 +- example/asio/api/capy_streams.cpp | 18 +- example/asio/api/uni_stream.hpp | 6 +- example/custom-executor/custom_executor.cpp | 10 +- include/boost/capy/concept/executor.hpp | 48 +- include/boost/capy/concept/io_awaitable.hpp | 9 +- include/boost/capy/continuation.hpp | 78 +++ include/boost/capy/delay.hpp | 13 +- include/boost/capy/ex/any_executor.hpp | 38 +- include/boost/capy/ex/async_event.hpp | 11 +- include/boost/capy/ex/async_mutex.hpp | 11 +- include/boost/capy/ex/executor_ref.hpp | 40 +- include/boost/capy/ex/io_env.hpp | 21 +- include/boost/capy/ex/run.hpp | 8 +- include/boost/capy/ex/run_async.hpp | 11 +- include/boost/capy/ex/strand.hpp | 44 +- include/boost/capy/ex/thread_pool.hpp | 23 +- include/boost/capy/test/run_blocking.hpp | 10 +- include/boost/capy/test/stream.hpp | 37 +- include/boost/capy/timeout.hpp | 8 +- include/boost/capy/when_all.hpp | 31 +- include/boost/capy/when_any.hpp | 31 +- src/ex/detail/strand_service.cpp | 19 +- src/ex/thread_pool.cpp | 97 ++-- src/test/run_blocking.cpp | 8 +- test/unit/ex/any_executor.cpp | 68 ++- test/unit/ex/executor_ref.cpp | 34 +- test/unit/ex/frame_cb.cpp | 13 +- test/unit/ex/run_async.cpp | 16 +- test/unit/ex/strand.cpp | 21 +- test/unit/ex/thread_pool.cpp | 48 +- test/unit/ex/work_guard.cpp | 6 +- test/unit/task.cpp | 8 +- test/unit/test_helpers.hpp | 21 +- 40 files changed, 1206 insertions(+), 458 deletions(-) create mode 100644 doc/continuation-rationale.md create mode 100644 include/boost/capy/continuation.hpp diff --git a/bench/bench.cpp b/bench/bench.cpp index 3c1720f1..b8062419 100644 --- a/bench/bench.cpp +++ b/bench/bench.cpp @@ -81,14 +81,14 @@ class 
bench_io_context::executor_type { } - std::coroutine_handle<> dispatch(std::coroutine_handle<> h) const + std::coroutine_handle<> dispatch(continuation& c) const { - return h; + return c.h; } - void post(std::coroutine_handle<> h) const + void post(continuation& c) const { - h.resume(); + c.h.resume(); } void defer(std::coroutine_handle<> h) const diff --git a/doc/continuation-rationale.md b/doc/continuation-rationale.md new file mode 100644 index 00000000..544e30d1 --- /dev/null +++ b/doc/continuation-rationale.md @@ -0,0 +1,529 @@ +# Design Rationale: Continuation Type in the Executor Interface + +## Context + +This document captures the design space and trade-offs around replacing +`std::coroutine_handle<>` with a first-class `continuation` type in +capy's executor interface. The central question is whether the executor +concept should traffic in raw coroutine handles or in a richer type that +carries intrusive queue metadata. Secondary questions address where the +`continuation` object lives, how it is passed, and what this means for +the promise base, the `IoAwaitable` protocol, and downstream consumers +like corosio. + +The consensus was reached through discussion and prototyping. The +implementation ships as a breaking change to the `Executor` concept. + +## Current Consensus + +The executor concept adopts `continuation&` as the parameter type for +`dispatch` and `post`: + +```cpp +struct continuation +{ + std::coroutine_handle<> h; + continuation* next = nullptr; +}; + +concept Executor = requires(E& e, continuation c) { + { e.dispatch(c) } -> std::same_as>; + { e.post(c) }; + // ... +}; +``` + +Both fields are public. The `continuation` lives in the I/O awaitable +for caller-handle posting, and in combinator/trampoline state for +parent-dispatch and child-launch patterns. The `IoAwaitable` concept +is unchanged. The promise base (`io_awaitable_promise_base`) is +unchanged. 
The `dispatch` return type remains `std::coroutine_handle<>` +for symmetric transfer. + +The rationale for these choices follows. + +## Background + +### The Executor Bottleneck + +Every coroutine resumption in capy funnels through the executor's +`dispatch` or `post`. I/O completions, combinator child launches, +cancel callbacks, and cross-executor trampolines all converge on +these two operations. The executor interface is the narrowest +bottleneck in the library. + +### The Allocation Problem + +With `std::coroutine_handle<>` as the parameter, executors that queue +work must allocate a node to hold the handle. The thread pool wraps +every posted handle in a heap-allocated `work` struct: + +```cpp +struct work : intrusive_queue::node +{ + std::coroutine_handle<> h_; + // ... +}; + +void post(std::coroutine_handle<> h) { + auto* w = new work(h); // per-post allocation + q_.push(w); +} +``` + +Corosio's reactor scheduler has the same pattern: a `post_handler` +that inherits from `scheduler_op` and is heap-allocated for every +`post(coroutine_handle<>)` call. Corosio solved this for I/O +operations by using `scheduler_op*` (an intrusive node embedded in +the awaitable), but the executor-level `post(coroutine_handle<>)` +path remained allocating. + +Frame allocation is already recycled via `recycling_memory_resource`. +Queue-node allocation is the last steady-state allocation in the hot +path. + +### The Safety Problem + +Users can obtain a `std::coroutine_handle<>` and call +`executor.post(h)` directly. Misuse of raw coroutine handles +(double resume, use-after-destroy, resuming on the wrong thread) +causes silent UB. The type system does nothing to prevent it. + +### Concepts vs. Concrete Types + +A concept specifies the least set of requirements that generic code +may rely on. The executor concept determines what `dispatch` and +`post` accept. Changing this parameter type is a breaking change to +the concept and all conforming executor implementations. 
+ +## The Parameter Type Question + +Three options exist for how `dispatch` and `post` receive the +continuation: + +### Option P1: By Value + +```cpp +void post(continuation c) const; +``` + +**Arguments for:** + +1. Simplest signature. Matches the original sketch. +2. No aliasing concerns — the executor gets its own copy. + +**Arguments against:** + +1. Breaks zero-allocation queuing. The executor links the + continuation into an intrusive queue via `next`. If `c` is a + stack-local copy, the copy is destroyed when `post` returns and + the queue has a dangling pointer. The whole point of the intrusive + `next` is that the executor queues the *original object*, not a + copy. +2. For `dispatch`, the inline case (return `c.h` for symmetric + transfer) works, but the fallback to `post` has the same problem. + +### Option P2: By Reference (chosen) + +```cpp +void post(continuation& c) const; +std::coroutine_handle<> dispatch(continuation& c) const; +``` + +**Arguments for:** + +1. The executor links the original object into the queue. No copy, + no dangling pointer. +2. The caller guarantees address stability — the `continuation` + must outlive the queue residency. This is the same guarantee + already required for coroutine frames and awaitable objects. +3. Cleaner than pointer — no null state to handle. + +**Arguments against:** + +1. Requires the caller to ensure the `continuation` is an lvalue + with sufficient lifetime. A `continuation` constructed as a + temporary cannot be passed. + +### Option P3: By Pointer + +```cpp +void post(continuation* c) const; +``` + +**Arguments for:** + +1. Traditional for intrusive data structures. Nullable. + +**Arguments against:** + +1. Nullable without reason — a null continuation is meaningless for + `post` and `dispatch`. +2. Pointer syntax at every call site (`&c` vs. `c`). + +**Recommendation:** Option P2. By-reference is the only option that +supports zero-allocation intrusive queuing without introducing null +states. 
The address-stability requirement is inherent to intrusive +data structures and is already a property of the objects that embed +continuations (awaitables, combinator state). + +## The Placement Question + +The `continuation` needs a stable address while it sits in an +executor's queue. Two locations were considered: + +### Option L1: In the Promise + +`io_awaitable_promise_base` gains a `continuation` member. One +`continuation` per coroutine, reused across all suspension points. + +**Arguments for:** + +1. One canonical location per coroutine. No question about where + it lives. +2. The promise outlives every suspension point, so the address is + always stable. +3. `final_suspend` can dispatch the parent's continuation directly + without any additional state. + +**Arguments against:** + +1. Changes the `IoAwaitable` concept. `await_suspend` must receive + `continuation&` instead of `coroutine_handle<>`, or the awaitable + must reach into the caller's promise to get the continuation. Both + are protocol changes. +2. Burdens task authors. Every promise type that inherits from + `io_awaitable_promise_base` grows by a pointer (the `next` + field) even though most suspension points never queue the + continuation (they use symmetric transfer inline). +3. Conflates two concerns. The promise stores "who resumes me when + I'm done" — a parent-child relationship. The `continuation` with + `next` means "I'm a queueable unit of work." These are different + concepts. The parent's continuation is only queued when the child + finishes and the parent must be posted to a different executor. + In the common case (same executor, symmetric transfer), it is + never queued. + +### Option L2: In the Awaitable (chosen) + +Each I/O awaitable embeds its own `continuation`. The awaitable +receives `coroutine_handle<>` in `await_suspend` as it does today, +wraps it in the embedded `continuation`, and passes that to +`post()`/`dispatch()`. + +**Arguments for:** + +1. 
No change to the `IoAwaitable` concept. The `continuation` is + an implementation detail of the awaitable, not a protocol concern. +2. The awaitable has a stable address for the duration of the + suspension (the compiler guarantees this for the operand of + `co_await`). +3. Aligns with corosio's pattern, where I/O services already embed + their operation state (`scheduler_op`) in the awaitable. +4. Zero burden on task authors. `task`, `quitter`, and future + task types are unchanged. +5. Cancel callbacks store `continuation*` pointing into the + awaitable, which outlives the suspension. + +**Arguments against:** + +1. A new `continuation` is initialized at every `co_await`. Not an + allocation (it is embedded), but `next` and `h` are set each + time. +2. Combinator and trampoline patterns (parent dispatch, child + launch) do not have an I/O awaitable in scope. These sites need + their own `continuation` storage in the combinator state or + trampoline promise. + +### Comparison + +| Property | Promise (L1) | Awaitable (L2) | +|---|---|---| +| Changes `IoAwaitable` concept? | Yes | No | +| Continuations per coroutine | One, reused | One per `co_await` | +| Init cost per suspension | None (already set) | Set `h` and `next` | +| Alignment with corosio `scheduler_op` | Separate patterns | Same pattern | +| Burden on task authors | Yes — inherits extra pointer | None | +| Combinator / trampoline sites | Free (in promise) | Need explicit storage | +| `io_awaitable_promise_base` size | +8 bytes per coroutine | Unchanged | + +**Recommendation:** Option L2. The `continuation` is about how an +I/O operation interacts with the executor's queue — that is the +awaitable's concern. The handful of combinator and trampoline sites +that need their own `continuation` storage are internal to the library +and explicitly annotated. The promise base stays lean, the IoAwaitable +protocol is untouched, and task authors see no change. 
+ +## The Dispatch Return Type Question + +`dispatch` returns `std::coroutine_handle<>` for symmetric transfer. +Two options exist for what `dispatch` returns now that it accepts +`continuation&`: + +### Option D1: Return `std::coroutine_handle<>` (chosen) + +```cpp +std::coroutine_handle<> dispatch(continuation& c) const; +``` + +**Arguments for:** + +1. Symmetric transfer is a language-level mechanism. `await_suspend` + must return `std::coroutine_handle<>`. The return type of + `dispatch` feeds directly into `await_suspend`'s return value. +2. The inline case returns `c.h` (the wrapped handle). The posted + case returns `std::noop_coroutine()`. Both are already + `coroutine_handle<>`. +3. No new type needed in the return position. + +**Arguments against:** + +None identified. + +### Option D2: Return `continuation&` or `continuation*` + +**Arguments for:** + +1. Symmetry with the parameter type. + +**Arguments against:** + +1. `await_suspend` cannot return `continuation&`. The language + requires `coroutine_handle<>`, `bool`, or `void`. +2. The caller would have to unwrap `.h` at every return site. +3. Returning a reference to the input parameter is semantically + confusing — the executor may have queued the continuation and + returned `noop_coroutine()`, in which case the reference points + to a queued object. + +**Recommendation:** Option D1. The return type stays +`std::coroutine_handle<>`. Symmetric transfer is a language +mechanism that operates on handles, not continuations. + +## The Address Stability Invariant + +A `continuation` must not move or be destroyed while it is linked +into an executor's queue. When `post(c)` is called, the executor +stores `&c` in an intrusive list via `c.next_`. If `c` moves or is +destroyed before the executor dequeues it, the list has a dangling +pointer. + +This is not a new class of obligation. 
A `coroutine_handle<>` posted +to an executor has the same requirement: the coroutine frame it +points to must remain alive until the handle is resumed. The +difference is that the old executor interface hid this behind a +per-post heap allocation — `new work(h)` copied the handle into +owned storage, so the caller never had to think about it. With +`continuation&`, the queue node is the caller's object, making the +lifetime discipline explicit rather than hidden behind an allocation. + +In coroutine code, the invariant is satisfied automatically: + +- **I/O awaitables** are alive for the duration of the suspension + (guaranteed by the compiler for the operand of `co_await`). +- **Combinator state** outlives all child runners by construction. +- **Trampoline promises** live inside heap-allocated coroutine frames. + +The invariant is only visible in non-coroutine code (tests, manual +executor interaction), where the caller must ensure the `continuation` +is declared before the executor or otherwise outlives the queue +residency. This is the same care required when holding a raw +`coroutine_handle<>` — the handle must not dangle. The continuation +merely surfaces an obligation that was always present. + +Practical guidelines: + +- **Do not store continuations in containers that reallocate.** + `std::vector` is unsafe if the vector grows after + any continuation has been posted. Use + `std::unique_ptr` (allocated once, never + reallocated) or `std::array`. + +- **Declaration order matters in non-coroutine code.** A + stack-local `continuation` posted to a `thread_pool` must be + declared before the pool, so that C++ LIFO destruction destroys + the pool (joining its threads) before destroying the continuation. + +ASAN builds catch most violations. + +## The Strand Question + +The strand wraps an inner executor and provides serialized execution. +Its internal mechanism uses `strand_op` wrapper coroutines with frame +recycling. 
Two options exist for how the strand interacts with +`continuation`: + +### Option S1: Strand Queues Continuations Directly + +Replace `strand_op` with direct `continuation` queueing via `next`. + +**Arguments for:** + +1. Eliminates the wrapper coroutine and frame recycling machinery. +2. Consistent with the thread pool's approach. + +**Arguments against:** + +1. The strand_op wrapper exists for dispatch-loop control, not just + queuing. When the strand resumes a coroutine, the coroutine may + complete and its `final_suspend` may do symmetric transfer. The + wrapper coroutine catches this: it calls `target.resume()`, and + when the target suspends or the wrapper's own `final_suspend` + fires, control returns to the dispatch loop. Without the wrapper, + symmetric transfer from the target's `final_suspend` would escape + the strand's dispatch loop entirely. +2. Frame recycling amortizes allocation to once per strand lifetime. + Removing it does not save allocations — it moves them. + +### Option S2: Strand Keeps Its Wrapper, Changes Input Signature (chosen) + +The strand's `post(continuation& c)` extracts `c.h` and wraps it in +a `strand_op` as before. Only the public signature changes. + +**Arguments for:** + +1. Minimal change. The strand's proven serialization mechanism is + untouched. +2. The `strand_op` wrapper and frame recycling continue to work + exactly as before. + +**Arguments against:** + +1. The strand does not benefit from zero-allocation posting. Each + `post` still creates a wrapper coroutine. (But the wrapper frames + are recycled, so the steady-state allocation count is zero.) + +**Recommendation:** Option S2. The strand's wrapper mechanism solves +a problem (`continuation` does not: dispatch-loop control). Changing +only the input signature is the minimal, safe approach. + +## The Promise Base Question + +`io_awaitable_promise_base` stores the parent's coroutine handle via +`set_continuation(coroutine_handle<>)` / `continuation()`. 
Should +this internal storage change from `coroutine_handle<>` to the +`continuation` struct? + +### Option B1: Change Internal Storage + +`cont_` becomes `continuation`. `set_continuation` still accepts +`coroutine_handle<>` and constructs the struct internally. Task +authors see no change. + +**Arguments for:** + +1. `final_suspend` can dispatch the parent's continuation directly + to the executor without extra state. +2. Invisible to task authors — the conversion is internal. + +**Arguments against:** + +1. Every coroutine frame grows by 8 bytes (the `next` pointer), + even though the parent's continuation is rarely queued. The common + case (same executor, symmetric transfer) returns `c.h` inline — + `next` is dead weight. +2. Conflates "who resumes me" with "I'm a queueable unit." + +### Option B2: Keep Promise Base Unchanged (chosen) + +`cont_` stays as `coroutine_handle<>`. Only the specific internal +types that dispatch through an executor at `final_suspend` store +their own `continuation`: + +- `when_all_core::continuation_` (parent handle for combinator) +- `when_any_core::continuation_` (same) +- `dispatch_trampoline::parent_` (cross-executor trampoline) +- `run_awaitable_ex::task_cont_` (initial task dispatch) +- `run_async_trampoline::task_cont_` (same) + +**Arguments for:** + +1. Zero size increase for all coroutine frames. +2. Clean separation: the promise stores a handle for symmetric + transfer; the `continuation` struct is only used where queuing + actually occurs. +3. The affected sites are all library-internal, not user-facing. + +**Arguments against:** + +1. More explicit storage declarations in combinator and trampoline + code. (But these are few and clearly annotated.) + +**Recommendation:** Option B2. The 8-byte-per-frame cost is +unnecessary. The handful of internal sites that need a `continuation` +for executor dispatch are explicit about it. + +## Impact on Corosio + +Corosio is a separate library that consumes capy's executor interface. 
+The `continuation` change requires updates in corosio: + +1. **`io_context::executor_type`** — `dispatch` and `post` signatures + change. The fast-path logic (return `c.h` if on scheduler thread, + else post) is structurally identical. + +2. **`dispatch_coro`** — The single dispatch point for all + reactor-based I/O completions. Currently takes `coroutine_handle<>` + from the reactor_op; will take `continuation&`. The fast-path + (`target()` check) extracts `c.h` for + symmetric transfer. + +3. **`scheduler::post(coroutine_handle<>)`** — Currently + heap-allocates a `post_handler`. With `continuation`, the scheduler + can queue the continuation directly via `next`, eliminating the + allocation. Whether `continuation::next_` and `scheduler_op`'s + intrusive queue unify or coexist is a corosio-internal design + question. + +4. **I/O operation types** (`reactor_op`, `overlapped_op`, + `waiter_node`) — These store `coroutine_handle<>` and + `executor_ref`. They would embed a `continuation` instead. + +5. **IOCP constraint** — `overlapped_op` must remain an `OVERLAPPED` + for the Windows API. `continuation` must coexist with `OVERLAPPED` + inheritance, not replace it. + +## Areas of Agreement + +1. **The executor interface should not traffic in raw + `coroutine_handle<>`.** The allocation cost and safety risk are + both real. + +2. **The `IoAwaitable` concept should not change.** Awaitables + receive `coroutine_handle<>` in `await_suspend` and manage the + `continuation` internally. + +3. **The promise base should not carry a `continuation`.** The + per-frame overhead is unjustified for a field that is rarely + used for queuing. + +4. **`dispatch` returns `std::coroutine_handle<>`.** Symmetric + transfer is a language mechanism. + +5. **Address stability is the caller's responsibility.** The + `continuation` must outlive the queue residency. This is + documented and enforced by ASAN. 
+ +## Summary + +| Property | `coroutine_handle<>` (old) | `continuation&` (new) | +|---|---|---| +| Per-post allocation (thread_pool) | `new work(h)` every call | None (intrusive queue) | +| Per-post allocation (strand) | `strand_op` wrapper (recycled) | Same (wrapper retained) | +| Type safety | Raw handle, easy to misuse | Struct, harder to fabricate | +| `IoAwaitable` concept | `await_suspend(handle, env)` | Unchanged | +| Promise base | `coroutine_handle<>` | Unchanged | +| Combinator state | `coroutine_handle<>` fields | `continuation` fields | +| Symmetric transfer | `dispatch` returns handle | Same | +| Lifetime invariant | Frame must outlive handle (hidden by allocation) | Same obligation, explicit (no allocation) | +| Breaking change | — | Yes (executor concept) | + +The core trade-off is between the simplicity of raw handles (freely +copyable, lifetime hidden behind per-post allocation) and the +performance and safety benefits of intrusive continuations +(zero-allocation posting, type system barrier against misuse). The +lifetime discipline is not new — a `coroutine_handle<>` always +required the frame to outlive the handle — but it becomes the +caller's explicit responsibility instead of being absorbed by a +heap allocation. In coroutine code, the existing lifetime guarantees +of awaitables and combinator state satisfy this automatically. diff --git a/doc/modules/ROOT/pages/4.coroutines/4c.executors.adoc b/doc/modules/ROOT/pages/4.coroutines/4c.executors.adoc index 2c0b43b9..4839686f 100644 --- a/doc/modules/ROOT/pages/4.coroutines/4c.executors.adoc +++ b/doc/modules/ROOT/pages/4.coroutines/4c.executors.adoc @@ -13,9 +13,9 @@ An *executor* is an object that can schedule work for execution. 
In Capy, execut [source,cpp] ---- -concept Executor = requires(E ex, std::coroutine_handle<> h) { - { ex.dispatch(h) } -> std::same_as>; - { ex.post(h) } -> std::same_as; +concept Executor = requires(E ex, continuation& c) { + { ex.dispatch(c) } -> std::same_as>; + { ex.post(c) } -> std::same_as; { ex.context() } -> std::convertible_to; }; ---- @@ -24,11 +24,11 @@ concept Executor = requires(E ex, std::coroutine_handle<> h) { Both methods schedule a coroutine for execution, but with different semantics: -`dispatch(h)`:: -May execute `h` inline if the current thread is already associated with the executor. Returns a coroutine handle—either `h` if execution was deferred, or `std::noop_coroutine()` if `h` was executed immediately. This enables symmetric transfer optimization. +`dispatch(c)`:: +May execute inline if the current thread is already associated with the executor. Returns a coroutine handle—either `c.h` for inline resumption via symmetric transfer, or `std::noop_coroutine()` if the work was queued. This enables symmetric transfer optimization. -`post(h)`:: -Always queues `h` for later execution. Never executes inline. Returns void. Use when you need guaranteed asynchrony. +`post(c)`:: +Always queues the continuation for later execution. Never executes inline. Returns void. Use when you need guaranteed asynchrony. === context() @@ -40,9 +40,9 @@ Returns a reference to the execution context that owns this executor. 
The contex [source,cpp] ---- -void schedule_work(executor_ref ex, std::coroutine_handle<> h) +void schedule_work(executor_ref ex, continuation& c) { - ex.post(h); // Works with any executor + ex.post(c); // Works with any executor } int main() diff --git a/doc/modules/ROOT/pages/7.examples/7n.custom-executor.adoc b/doc/modules/ROOT/pages/7.examples/7n.custom-executor.adoc index 8581934a..c248941a 100644 --- a/doc/modules/ROOT/pages/7.examples/7n.custom-executor.adoc +++ b/doc/modules/ROOT/pages/7.examples/7n.custom-executor.adoc @@ -95,17 +95,17 @@ public: void on_work_finished() const noexcept {} std::coroutine_handle<> dispatch( - std::coroutine_handle<> h) const + capy::continuation& c) const { if (loop_->is_running_on_this_thread()) - return h; - loop_->enqueue(h); + return c.h; + loop_->enqueue(c.h); return std::noop_coroutine(); } - void post(std::coroutine_handle<> h) const + void post(capy::continuation& c) const { - loop_->enqueue(h); + loop_->enqueue(c.h); } bool operator==(executor_type const& other) const noexcept @@ -192,8 +192,8 @@ The nested `executor_type` must provide: * `context()` -- returns a reference to the owning `execution_context` * `on_work_started()` / `on_work_finished()` -- work-tracking hooks -* `dispatch(h)` -- resume immediately if already on this context, otherwise enqueue -* `post(h)` -- always enqueue for later execution +* `dispatch(c)` -- resume immediately if already on this context, otherwise enqueue. Takes a `continuation&` and returns `std::coroutine_handle<>`. +* `post(c)` -- always enqueue for later execution. Takes a `continuation&`. 
* `operator==` -- compare two executors for identity [source,cpp] @@ -208,16 +208,16 @@ The `static_assert` verifies at compile time that all concept requirements are m [source,cpp] ---- std::coroutine_handle<> dispatch( - std::coroutine_handle<> h) const + capy::continuation& c) const { if (loop_->is_running_on_this_thread()) - return h; // resume inline - loop_->enqueue(h); + return c.h; // resume inline + loop_->enqueue(c.h); return std::noop_coroutine(); // defer } ---- -`dispatch` checks whether the caller is already running on the loop's thread. If so, it returns the handle directly for inline resumption. Otherwise it enqueues and returns `noop_coroutine` so the caller continues without blocking. +`dispatch` takes a `continuation&` and checks whether the caller is already running on the loop's thread. If so, it returns `c.h` directly for inline resumption via symmetric transfer. Otherwise it enqueues `c.h` and returns `noop_coroutine` so the caller continues without blocking. `post` always enqueues, even if already on the right thread. diff --git a/doc/modules/ROOT/pages/8.design/8k.Executor.adoc b/doc/modules/ROOT/pages/8.design/8k.Executor.adoc index 1060fa8b..92fc06a1 100644 --- a/doc/modules/ROOT/pages/8.design/8k.Executor.adoc +++ b/doc/modules/ROOT/pages/8.design/8k.Executor.adoc @@ -2,7 +2,7 @@ == Overview -This document describes the design of the `Executor` concept: the interface through which coroutines are scheduled for execution. It explains the relationship to Asio's executor model, why `dispatch` returns `void`, why `defer` was dropped, how `executor_ref` achieves zero-allocation type erasure, and the I/O completion pattern that motivates the entire design. +This document describes the design of the `Executor` concept: the interface through which coroutines are scheduled for execution. 
It explains the relationship to Asio's executor model, why `dispatch` returns `std::coroutine_handle<>`, why `defer` was dropped, how `executor_ref` achieves zero-allocation type erasure, and the I/O completion pattern that motivates the entire design. The `Executor` concept exists to answer one question: when a coroutine is ready to run, _where_ does it run? The concept captures the rules for scheduling coroutine resumption, tracking outstanding work for graceful shutdown, and accessing the execution context that owns the executor. Every I/O awaitable in Corosio -- sockets, acceptors, timers, resolvers -- depends on this concept to dispatch completions back to the correct executor. @@ -15,7 +15,7 @@ concept Executor = std::is_nothrow_copy_constructible_v && std::is_nothrow_move_constructible_v && requires(E& e, E const& ce, E const& ce2, - std::coroutine_handle<> h) + continuation c) { { ce == ce2 } noexcept -> std::convertible_to; { ce.context() } noexcept; @@ -28,20 +28,22 @@ concept Executor = { ce.on_work_started() } noexcept; { ce.on_work_finished() } noexcept; - { ce.dispatch(h) }; - { ce.post(h) }; + { ce.dispatch(c) } -> std::same_as>; + { ce.post(c) }; }; ---- -An `Executor` provides exactly two operations on a coroutine handle: +An `Executor` provides exactly two scheduling operations: -=== `dispatch(h)` -- Execute If Safe +=== `dispatch(c)` -- Execute If Safe -If the executor determines it is safe (e.g., the current thread is already associated with the executor's context), resumes the coroutine inline via `h.resume()`. Otherwise, posts the coroutine for later execution. Returns `void`. +If the executor determines it is safe (e.g., the current thread is already associated with the executor's context), returns `c.h` for symmetric transfer. Otherwise, posts the continuation for later execution and returns `std::noop_coroutine()`. 
The caller uses the returned handle for symmetric transfer from `await_suspend`, or calls `.resume()` at the event loop pump level. -=== `post(h)` -- Always Queue +=== `post(c)` -- Always Queue -Queues the coroutine for later execution without ever executing it inline. Never blocks. Use when guaranteed asynchrony is required. +Queues the continuation for later execution without ever executing it inline. Never blocks. The continuation is linked into the executor's internal queue via its intrusive `next` pointer -- no per-post heap allocation. + +Both operations accept `continuation&` rather than `std::coroutine_handle<>`. A `continuation` wraps a coroutine handle with an intrusive list pointer, enabling zero-allocation queuing. See xref:../continuation-rationale.adoc[Continuation Rationale] for the design of this type. The remaining operations support context access, lifecycle management, and identity: @@ -77,52 +79,36 @@ Capy retains the core elements of this model: Capy removes or changes: - **`defer`.** Dropped entirely. See <>. -- **Function object submission.** Asio executors accept arbitrary callables. Capy executors accept only `std::coroutine_handle<>`. This removes the need for allocator-aware function erasure and enables a simpler, cheaper type-erased wrapper (`executor_ref`). -- **`dispatch` return type.** Asio's `dispatch` returns void for the same reason Capy's does, but Capy also considered and rejected a `coroutine_handle<>` return for symmetric transfer. See <>. +- **Function object submission.** Asio executors accept arbitrary callables. Capy executors accept `continuation&` -- a coroutine handle wrapped with an intrusive queue pointer. This removes the need for allocator-aware function erasure, eliminates per-post heap allocation, and enables a simpler, cheaper type-erased wrapper (`executor_ref`). +- **`dispatch` return type.** Asio's `dispatch` returns void. Capy's `dispatch` returns `std::coroutine_handle<>` for symmetric transfer. See <>. 
The result is a concept that preserves Asio's proven execution model while removing the machinery that a coroutine-native library does not need. -[[why-dispatch-returns-void]] -== Why `dispatch` Returns `void` - -An earlier design had `dispatch` return `std::coroutine_handle<>` so that callers could use it for symmetric transfer from `await_suspend`. This was rejected because it violates a fundamental constraint of the I/O layer. - -=== The Problem: Synchronous Completion During `await_suspend` - -When an I/O awaitable initiates an operation inside `await_suspend`, the I/O might complete immediately. If it does, the completion path would call `dispatch(h)` while the caller's `await_suspend` is still on the call stack. If `dispatch` resumed the coroutine inline via `h.resume()`, the coroutine would execute while `await_suspend` has not yet returned -- resuming a coroutine from inside `await_suspend` before the suspension machinery completes risks undefined behavior. - -The {cpp} standard describes the sequencing in https://eel.is/c++draft/expr.await[[expr.await]/5.1]: - -[quote] -____ -If the result of await-ready is false, the coroutine is -considered suspended. Then, await-suspend is evaluated. -____ +[[why-dispatch-returns-handle]] +== Why `dispatch` Returns `std::coroutine_handle<>` -Although the standard considers the coroutine suspended before `await_suspend` is called, resuming it from _within_ `await_suspend` creates a nested resumption on the same call stack. The resumed coroutine runs, potentially suspends again or completes, and then control returns into the middle of `await_suspend`. If the coroutine was destroyed during resumption, `await_suspend` returns into a destroyed frame. +`dispatch` returns a `std::coroutine_handle<>` so that callers can use it for symmetric transfer from `await_suspend`. 
When the executor determines that inline resumption is safe, it returns `c.h` -- the caller returns this from `await_suspend` and the compiler performs a tail-call transfer to the target coroutine. When inline resumption is not safe, the executor queues the continuation and returns `std::noop_coroutine()`, which suspends the caller without resuming anything. -=== Why I/O Awaitables Return `void` or `std::noop_coroutine()` - -To avoid this, all I/O awaitables return `void` or `std::noop_coroutine()` from `await_suspend`. Both forms guarantee that the caller is fully suspended and the call stack has unwound before any completion handler can resume the coroutine. The I/O operation is initiated during `await_suspend`, but the completion is dispatched later -- from the event loop, after `await_suspend` has returned. - -https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0913r1.html[P0913R1] introduced the `coroutine_handle` return type for symmetric transfer, which is the correct mechanism for coroutine-to-coroutine control transfer (as used by `task` internally). But I/O awaitables cannot use it because the I/O completion is asynchronous relative to `await_suspend` -- it comes from the reactor or proactor, not from the awaitable itself. - -=== Consequence for `dispatch` - -Since the primary consumer of `dispatch` is I/O completion -- called _after_ the coroutine is suspended, from the event loop -- `dispatch` does not need to participate in symmetric transfer. It calls `h.resume()` inline when safe and returns `void`. A conforming implementation looks like: +A conforming implementation: [source,cpp] ---- -void dispatch(std::coroutine_handle<> h) const +std::coroutine_handle<> dispatch(continuation& c) const { if(ctx_.running_in_this_thread()) - h.resume(); - else - post(h); + return c.h; // symmetric transfer + post(c); + return std::noop_coroutine(); } ---- -After `dispatch` returns, the state of `h` is unspecified. 
The coroutine may have completed, been destroyed, or suspended at a different point. Callers must not use `h` after calling `dispatch`. +This design enables the common fast path -- same-executor dispatch at `final_suspend` -- to avoid queuing entirely, transferring control directly via symmetric transfer. + +=== I/O Awaitables and Symmetric Transfer + +I/O awaitables return `std::noop_coroutine()` from `await_suspend` rather than a handle for symmetric transfer. The I/O operation is initiated during `await_suspend`, but completion comes from the reactor or proactor asynchronously. The awaitable cannot know which coroutine to transfer to at suspension time. + +Symmetric transfer from `dispatch` is used at a different level: when a child coroutine completes and its `final_suspend` dispatches the parent's continuation through the executor. If the parent is on the same executor, `dispatch` returns the parent's handle for direct symmetric transfer. If not, it queues the continuation and returns `std::noop_coroutine()`. [[why-not-defer]] == Why Two Operations, Not Three @@ -171,17 +157,28 @@ When `task::await_suspend` returns the parent's coroutine handle, the compile Corosio confirms this in practice: its entire I/O layer -- sockets, acceptors, timers, resolvers, signals -- across all three backends (epoll, IOCP, select) uses only `dispatch` and `post`. No code path requires `defer`. -== Why `std::coroutine_handle<>`, Not Typed Handles +== Why `continuation`, Not Raw `coroutine_handle<>` + +The executor accepts `continuation&` rather than `std::coroutine_handle<>`. A `continuation` wraps the handle with an intrusive `next` pointer for zero-allocation queuing: -The executor accepts `std::coroutine_handle<>` -- the type-erased handle -- rather than `std::coroutine_handle
<P>
` for a specific promise type `P`. +[source,cpp] +---- +struct continuation +{ + std::coroutine_handle<> h; + continuation* next = nullptr; +}; +---- + +This design has three consequences: -This decision has three consequences: +- **Zero-allocation posting.** The thread pool links the `continuation` directly into its work queue via `next`. No `new work(h)` per post. The queue node is embedded in the thing being queued -- the awaitable, combinator state, or trampoline promise that owns the continuation. -- **Type erasure is possible.** `executor_ref` wraps any executor behind a uniform interface. If `dispatch` and `post` were templated on the promise type, the vtable would need to be generic over all promise types, making type erasure impractical. +- **Type erasure remains possible.** `executor_ref` wraps any executor behind a uniform vtable. The vtable function pointers accept `continuation&`, which is a concrete type. No templates on promise type are needed. -- **Executor implementations are independent of coroutine internals.** An executor schedules resumption. It does not need to know what the coroutine's promise type is, what value it produces, or how it handles exceptions. The type-erased handle provides exactly the right interface: `resume()` and nothing else. +- **I/O operation structures stay simple.** Every I/O awaitable embeds a `continuation` for the caller's handle and an `executor_ref` for the executor. Both are non-templated, keeping I/O backend code non-generic and out of headers. -- **I/O operation structures stay simple.** Every pending I/O operation in Corosio stores two fields: `std::coroutine_handle<> h` (a typedef for `std::coroutine_handle<>`) and `capy::executor_ref ex`. Both are type-erased. The operation structure does not need to be templated on the coroutine's promise type, which keeps the I/O backend code non-generic and out of headers. 
+The handle within the `continuation` is still type-erased (`std::coroutine_handle<>`) for the same reasons that applied before: executor implementations are independent of coroutine internals, and the type-erased handle provides exactly the right interface (`resume()` and nothing else). == Why Nothrow Copy and Move @@ -297,49 +294,33 @@ The executor concept is designed around a single use case: I/O completion dispat === Capture at Initiation -When a coroutine `co_await`s an I/O awaitable, the awaitable's `await_suspend` receives the caller's executor and stores it as `executor_ref`: +When a coroutine `co_await`s an I/O awaitable, the awaitable's `await_suspend` receives the caller's handle and executor. The awaitable embeds a `continuation` for the caller's handle: [source,cpp] ---- -template -auto await_suspend( +std::coroutine_handle<> +await_suspend( std::coroutine_handle<> h, - Ex const& ex) -> std::coroutine_handle<> + io_env const* env) noexcept { - // ex is captured as executor_ref in the operation - return socket_.connect(h, ex, endpoint_, token_, &ec_); + cont_.h = h; + ex_ = env->executor; + // ... initiate I/O operation ... + return std::noop_coroutine(); } ---- -The operation structure stores both the coroutine handle and the executor reference: - -[source,cpp] ----- -struct io_op : scheduler_op -{ - std::coroutine_handle<> h; - capy::executor_ref ex; - // ... error codes, buffers, etc. -}; ----- - === Dispatch at Completion -When the I/O completes (from the reactor thread for epoll, the completion port for IOCP, or the select loop), the operation uses the stored executor to resume the coroutine: +When the I/O completes (from the reactor thread for epoll, the completion port for IOCP, or the select loop), the awaitable uses the stored executor to resume the coroutine via the embedded continuation: [source,cpp] ---- -void operator()() override -{ - // ... set error codes ... 
- capy::executor_ref saved_ex(std::move(ex)); - std::coroutine_handle<> saved_h(std::move(h)); - impl_ptr.reset(); - saved_ex.dispatch(saved_h); -} +// Timer fires or I/O completes: +ex_.post(cont_); ---- -`dispatch` checks whether the current thread is already running on the executor's context. If so, the coroutine resumes inline. If not, the coroutine is posted for later execution on the correct context. +`post` links the continuation into the executor's work queue via `cont_.next`. No heap allocation occurs -- the continuation is embedded in the awaitable, which is alive for the duration of the suspension. A worker thread dequeues the continuation and calls `cont_.h.resume()`. === Platform Independence @@ -376,8 +357,8 @@ public: void on_work_started() const noexcept; void on_work_finished() const noexcept; - void dispatch(std::coroutine_handle<> h) const; - void post(std::coroutine_handle<> h) const; + std::coroutine_handle<> dispatch(continuation& c) const; + void post(continuation& c) const; bool operator==(my_executor const&) const noexcept; }; @@ -385,6 +366,6 @@ public: == Summary -The `Executor` concept provides `dispatch` and `post` for coroutine scheduling, work tracking for event loop lifetime, and `context()` for service access. The design descends from Asio's executor model but is adapted for coroutines: `defer` is replaced by symmetric transfer, function objects are replaced by `std::coroutine_handle<>`, and `dispatch` returns `void` because I/O completions are dispatched after suspension, not during it. +The `Executor` concept provides `dispatch` and `post` for coroutine scheduling, work tracking for event loop lifetime, and `context()` for service access. 
The design descends from Asio's executor model but is adapted for coroutines: `defer` is replaced by symmetric transfer, function objects are replaced by `continuation&` for zero-allocation intrusive queuing, and `dispatch` returns `std::coroutine_handle<>` for symmetric transfer at `final_suspend`. -`executor_ref` type-erases any executor into two pointers, enabling platform-independent I/O completion dispatch with zero allocation and predictable cache behavior. The capture-at-initiation / dispatch-at-completion pattern is the fundamental use case the concept serves. +`executor_ref` type-erases any executor into two pointers, enabling platform-independent I/O completion dispatch with zero allocation and predictable cache behavior. The capture-at-initiation / dispatch-at-completion pattern is the fundamental use case the concept serves. \ No newline at end of file diff --git a/doc/unlisted/execution-executors.adoc b/doc/unlisted/execution-executors.adoc index 81ee0dc5..315ab113 100644 --- a/doc/unlisted/execution-executors.adoc +++ b/doc/unlisted/execution-executors.adoc @@ -40,15 +40,15 @@ A dispatcher is a callable that schedules coroutine resumption: [source,cpp] ---- -template -concept dispatcher = requires(D const& d, std::coroutine_handle
<>
h) { - { d(h) } -> std::convertible_to>; +template +concept dispatcher = requires(D const& d, continuation& c) { + { d(c) } -> std::convertible_to>; }; ---- -When invoked with a coroutine handle, the dispatcher: +When invoked with a `continuation&`, the dispatcher: -1. Schedules the handle for resumption (inline or queued) +1. Schedules the continuation for resumption (inline or queued) 2. Returns a handle suitable for symmetric transfer === Example Dispatcher @@ -57,9 +57,9 @@ When invoked with a coroutine handle, the dispatcher: ---- struct inline_dispatcher { - std::coroutine_handle<> operator()(std::coroutine_handle<> h) const + std::coroutine_handle<> operator()(continuation& c) const { - return h; // Resume inline via symmetric transfer + return c.h; // Resume inline via symmetric transfer } }; @@ -67,9 +67,9 @@ struct queuing_dispatcher { work_queue* queue_; - std::coroutine_handle<> operator()(std::coroutine_handle<> h) const + std::coroutine_handle<> operator()(continuation& c) const { - queue_->push(h); + queue_->push(c.h); return std::noop_coroutine(); // Caller returns to event loop } }; @@ -85,13 +85,12 @@ template concept executor = std::copy_constructible && std::equality_comparable && - requires(E const& ce, std::coroutine_handle<> h) { + requires(E const& ce, continuation& c) { { ce.context() } -> /* reference to execution context */; { ce.on_work_started() } noexcept; { ce.on_work_finished() } noexcept; - { ce(h) } -> std::convertible_to>; - { ce.post(h) }; - { ce.defer(h) }; + { ce.dispatch(c) } -> std::same_as>; + { ce.post(c) }; }; ---- @@ -101,21 +100,17 @@ concept executor = |=== | Operation | Behavior -| `operator()(h)` -| Run inline if safe, otherwise queue. Cheapest path. Also serves as dispatcher interface. +| `dispatch(c)` +| Run inline if safe, otherwise queue. Returns `std::coroutine_handle<>` for symmetric transfer. -| `post(h)` +| `post(c)` | Always queue, never inline. Guaranteed asynchrony. 
- -| `defer(h)` -| Always queue with "this is my continuation" hint. Enables optimizations. |=== **When to use each:** -* `operator()` — Default choice. Allows the executor to optimize. +* `dispatch` — Default choice. Allows the executor to optimize with inline resumption. * `post` — When you need guaranteed asynchrony (e.g., releasing a lock first). -* `defer` — When posting your own continuation (enables thread-local queuing). === Work Tracking @@ -141,7 +136,7 @@ Awaitables that participate in affinity propagation implement `affine_awaitable` ---- template concept affine_awaitable = - dispatcher && + dispatcher && requires(A a, std::coroutine_handle
<>
h, D const& d) { a.await_suspend(h, d); }; @@ -158,7 +153,7 @@ Awaitables with cancellation support implement `stoppable_awaitable`: ---- template concept stoppable_awaitable = - affine_awaitable && + affine_awaitable && requires(A a, std::coroutine_handle
<>
h, D const& d, std::stop_token t) { a.await_suspend(h, d, t); }; @@ -171,7 +166,7 @@ Stoppable awaitables provide _both_ overloads of `await_suspend`. Executors have specific thread safety guarantees: * Copy constructor, comparison, `context()` — always thread-safe -* `operator()`, `post`, `defer` — thread-safe for concurrent calls +* `dispatch`, `post` — thread-safe for concurrent calls * `on_work_started`, `on_work_finished` — thread-safe, must not throw == Executor Validity @@ -184,7 +179,7 @@ thread_pool pool; auto ex = pool.get_executor(); // When pool is destroyed, ex becomes invalid -// WARNING: Calling ex() after pool destruction is undefined behavior +// WARNING: Calling ex.dispatch() after pool destruction is undefined behavior ---- The copy constructor and `context()` remain valid until the context is diff --git a/doc/unlisted/io-awaitables-executor.adoc b/doc/unlisted/io-awaitables-executor.adoc index 7394163e..d5644a2c 100644 --- a/doc/unlisted/io-awaitables-executor.adoc +++ b/doc/unlisted/io-awaitables-executor.adoc @@ -20,9 +20,9 @@ the executor determines _where_ and _when_ the coroutine resumes: [source,cpp] ---- // Completion arrives on I/O thread -void on_io_complete(std::coroutine_handle<> h) +void on_io_complete(continuation& c) { - executor.dispatch(h).resume(); // Resume on executor's context + executor.dispatch(c).resume(); // Resume on executor's context } ---- @@ -36,10 +36,10 @@ Capy's `Executor` concept requires two operations: [source,cpp] ---- template -concept Executor = requires(Ex const& ex, std::coroutine_handle<> h) +concept Executor = requires(Ex const& ex, continuation& c) { - { ex.dispatch(h) } -> std::same_as>; - { ex.post(h) } -> std::same_as; + { ex.dispatch(c) } -> std::same_as>; + { ex.post(c) } -> std::same_as; }; ---- @@ -52,11 +52,11 @@ Returns a handle to resume. 
The implementation decides whether to: [source,cpp] ---- -std::coroutine_handle<> dispatch(std::coroutine_handle<> h) const +std::coroutine_handle<> dispatch(continuation& c) const { if (running_in_this_context()) - return h; // Resume inline - queue_work(h); + return c.h; // Resume inline + queue_work(c.h); return std::noop_coroutine(); // Don't resume now } ---- @@ -69,9 +69,9 @@ Always queues, never executes inline: [source,cpp] ---- -void post(std::coroutine_handle<> h) const +void post(continuation& c) const { - queue_work(h); // Always queue, never inline + queue_work(c.h); // Always queue, never inline } ---- @@ -92,8 +92,8 @@ thread_pool pool(4); executor_ref ex = pool.get_executor(); // Use uniformly -ex.dispatch(handle); -ex.post(handle); +ex.dispatch(cont); +ex.post(cont); ---- === Why Type Erasure? @@ -128,7 +128,7 @@ executor_ref ex = some_executor; // Check if valid if (ex) - ex.dispatch(h); + ex.dispatch(c); // Compare (pointer equality on underlying executor) executor_ref ex2 = some_executor; @@ -288,7 +288,7 @@ task example() } // Use for manual dispatch - ex.post(some_handle); + ex.post(some_continuation); } ---- @@ -307,11 +307,11 @@ actually suspends—`await_ready()` returns `true`. | `executor_ref` | Type-erased executor wrapper -| `dispatch(h)` -| Resume inline if safe, queue otherwise +| `dispatch(c)` +| Resume inline if safe, queue otherwise. Takes `continuation&`, returns `std::coroutine_handle<>`. -| `post(h)` -| Always queue, never inline +| `post(c)` +| Always queue, never inline. Takes `continuation&`. 
| `run(ex)(task)` | Override inherited executor for a subtask diff --git a/example/asio/api/capy_streams.cpp b/example/asio/api/capy_streams.cpp index 253e0626..ccbd14b8 100644 --- a/example/asio/api/capy_streams.cpp +++ b/example/asio/api/capy_streams.cpp @@ -83,6 +83,7 @@ class asio_socket MB buffers_; capy::io_result result_; std::shared_ptr cancel_; + capy::continuation cont_; public: read_awaitable( @@ -103,19 +104,20 @@ class asio_socket std::coroutine_handle<> h, capy::io_env const* env) { + cont_.h = h; cancel_ = std::make_shared(env->stop_token); self_->socket_.async_read_some( capy::to_asio(capy::mutable_buffer_array<8>(buffers_)), net::bind_cancellation_slot( cancel_->signal.slot(), - [this, h, ex = env->executor]( + [this, ex = env->executor]( boost::system::error_code ec, std::size_t n) mutable { result_.ec = ec; std::get<0>(result_.values) = n; - ex.post(h); + ex.post(cont_); })); return std::noop_coroutine(); @@ -145,6 +147,7 @@ class asio_socket CB buffers_; capy::io_result result_; std::shared_ptr cancel_; + capy::continuation cont_; public: write_awaitable( @@ -165,19 +168,20 @@ class asio_socket std::coroutine_handle<> h, capy::io_env const* env) { + cont_.h = h; cancel_ = std::make_shared(env->stop_token); self_->socket_.async_write_some( capy::to_asio(capy::const_buffer_array<8>(buffers_)), net::bind_cancellation_slot( cancel_->signal.slot(), - [this, h, ex = env->executor]( + [this, ex = env->executor]( boost::system::error_code ec, std::size_t n) mutable { result_.ec = ec; std::get<0>(result_.values) = n; - ex.post(h); + ex.post(cont_); })); return std::noop_coroutine(); @@ -235,14 +239,16 @@ class asio_executor void on_work_started() const noexcept {} void on_work_finished() const noexcept {} - std::coroutine_handle<> dispatch(std::coroutine_handle<> h) const + std::coroutine_handle<> dispatch(continuation& c) const { + auto h = c.h; net::post(ex_, [h]{ h.resume(); }); return std::noop_coroutine(); } - void post(std::coroutine_handle<> h) 
const + void post(continuation& c) const { + auto h = c.h; net::post(ex_, [h]{ h.resume(); }); } }; diff --git a/example/asio/api/uni_stream.hpp b/example/asio/api/uni_stream.hpp index b1a2962c..93964cde 100644 --- a/example/asio/api/uni_stream.hpp +++ b/example/asio/api/uni_stream.hpp @@ -55,14 +55,16 @@ class asio_executor_wrapper void on_work_started() const noexcept {} void on_work_finished() const noexcept {} - std::coroutine_handle<> dispatch(std::coroutine_handle<> h) const + std::coroutine_handle<> dispatch(continuation& c) const { + auto h = c.h; net::post(ex_, [h]{ h.resume(); }); return std::noop_coroutine(); } - void post(std::coroutine_handle<> h) const + void post(continuation& c) const { + auto h = c.h; net::post(ex_, [h]{ h.resume(); }); } }; diff --git a/example/custom-executor/custom_executor.cpp b/example/custom-executor/custom_executor.cpp index f90bc203..a7ecdb89 100644 --- a/example/custom-executor/custom_executor.cpp +++ b/example/custom-executor/custom_executor.cpp @@ -95,17 +95,17 @@ class run_loop::executor_type void on_work_finished() const noexcept {} std::coroutine_handle<> dispatch( - std::coroutine_handle<> h) const + capy::continuation& c) const { if (loop_->is_running_on_this_thread()) - return h; - loop_->enqueue(h); + return c.h; + loop_->enqueue(c.h); return std::noop_coroutine(); } - void post(std::coroutine_handle<> h) const + void post(capy::continuation& c) const { - loop_->enqueue(h); + loop_->enqueue(c.h); } bool operator==(executor_type const& other) const noexcept diff --git a/include/boost/capy/concept/executor.hpp b/include/boost/capy/concept/executor.hpp index dc9d1a20..94608b26 100644 --- a/include/boost/capy/concept/executor.hpp +++ b/include/boost/capy/concept/executor.hpp @@ -11,6 +11,7 @@ #define BOOST_CAPY_CONCEPT_EXECUTOR_HPP #include +#include #include #include @@ -39,13 +40,13 @@ class execution_context; @par Syntactic Requirements @li `E` must be nothrow copy and move constructible - @li `e1 == e2` must 
return a type convertible to `bool`, `noexcept` - @li `e.context()` must return an lvalue reference to a type derived + @li `ce == ce2` must return a type convertible to `bool`, `noexcept` + @li `ce.context()` must return an lvalue reference to a type derived from `execution_context`, `noexcept` - @li `e.on_work_started()` must be valid and `noexcept` - @li `e.on_work_finished()` must be valid and `noexcept` - @li `e.dispatch(h)` must return `std::coroutine_handle<>` - @li `e.post(h)` must be valid + @li `ce.on_work_started()` must be valid and `noexcept` + @li `ce.on_work_finished()` must be valid and `noexcept` + @li `ce.dispatch(c)` must return `std::coroutine_handle<>` + @li `ce.post(c)` must be valid @par Semantic Requirements @@ -75,9 +76,9 @@ class execution_context; resumed coroutine. @li If the executor determines it is safe to resume inline - (e.g., already on the correct thread), returns `h` for + (e.g., already on the correct thread), returns `c.h` for the caller to use in symmetric transfer - @li Otherwise, posts the coroutine for later execution and + @li Otherwise, posts the continuation for later execution and returns `std::noop_coroutine()` @li The caller is responsible for using the returned handle appropriately: returning it from `await_suspend` for @@ -88,11 +89,11 @@ class execution_context; @code std::coroutine_handle<> dispatch( - std::coroutine_handle<> h ) const + continuation& c ) const { if( ctx_.is_running_on_this_thread() ) - return h; // symmetric transfer - post( h ); + return c.h; // symmetric transfer + post( c ); return std::noop_coroutine(); } @endcode @@ -102,6 +103,21 @@ class execution_context; @li Never blocks the caller @li The coroutine executes on the executor's associated context + @par Continuation Lifetime + + Both `dispatch` and `post` operate on the caller's + continuation object by reference. 
The continuation must + remain at a stable address and must not be moved or + destroyed until the executor has dequeued and resumed it. + Destroying or moving a continuation while it is linked + into an executor's queue is undefined behavior. + + When `dispatch` returns `c.h` (the inline case), the + continuation is not enqueued and may be reused or + destroyed immediately. When `dispatch` falls through to + `post`, the continuation is enqueued and the lifetime + requirement applies. + @par Executor Validity An executor becomes invalid when the first call to @@ -126,8 +142,8 @@ class execution_context; void on_work_finished() const noexcept; std::coroutine_handle<> dispatch( - std::coroutine_handle<> h ) const; - void post( std::coroutine_handle<> h ) const; + continuation& c ) const; + void post( continuation& c ) const; bool operator==( E const& ) const noexcept; }; @@ -139,7 +155,7 @@ template concept Executor = std::is_nothrow_copy_constructible_v && std::is_nothrow_move_constructible_v && - requires(E& e, E const& ce, E const& ce2, std::coroutine_handle<> h) { + requires(E& e, E const& ce, E const& ce2, continuation c) { { ce == ce2 } noexcept -> std::convertible_to; { ce.context() } noexcept; requires std::is_lvalue_reference_v && @@ -149,8 +165,8 @@ concept Executor = { ce.on_work_started() } noexcept; { ce.on_work_finished() } noexcept; - { ce.dispatch(h) } -> std::same_as>; - { ce.post(h) }; + { ce.dispatch(c) } -> std::same_as>; + { ce.post(c) }; }; } // capy diff --git a/include/boost/capy/concept/io_awaitable.hpp b/include/boost/capy/concept/io_awaitable.hpp index 5ab4e327..a1942e37 100644 --- a/include/boost/capy/concept/io_awaitable.hpp +++ b/include/boost/capy/concept/io_awaitable.hpp @@ -79,16 +79,19 @@ namespace capy { struct my_io_op { io_env const* env_ = nullptr; - std::coroutine_handle<> cont_; + continuation cont_; auto await_suspend( std::coroutine_handle<> h, io_env const* env ) { env_ = env; - cont_ = h; + cont_ = continuation{h}; // Pass 
members by value; capturing this - // risks use-after-free in async callbacks + // risks use-after-free in async callbacks. + // When the async operation completes, resume + // via executor.post(cont_) or executor.dispatch(cont_) + // rather than calling h.resume() directly. start_async( env_->stop_token, env_->executor, diff --git a/include/boost/capy/continuation.hpp b/include/boost/capy/continuation.hpp new file mode 100644 index 00000000..765d0585 --- /dev/null +++ b/include/boost/capy/continuation.hpp @@ -0,0 +1,78 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_CONTINUATION_HPP +#define BOOST_CAPY_CONTINUATION_HPP + +#include + +#include + +namespace boost { +namespace capy { + +/** Executor-facing schedulable unit. + + Wraps a `std::coroutine_handle<>` with an intrusive list + pointer so executors can queue continuations without + per-post heap allocation. + + @par Fields + + @li `h` — the coroutine handle to resume. Set by the + code that creates or reuses the continuation (typically + an I/O awaitable or combinator). Read by the executor + when it dequeues the continuation. + + @li `next` — intrusive linked-list pointer, owned and + managed exclusively by executor implementations. Users + must not read or write `next` while the continuation + is enqueued. + + @par Ownership and Lifetime + + The continuation is owned by the site that embeds it (an + I/O awaitable, combinator state, or trampoline promise). + The executor borrows it by reference for the duration of + the queue residency. + + A continuation must have a **stable address** while it is + linked into an executor's queue. It must not be moved, + destroyed, or enqueued in more than one queue concurrently. 
+ + @par Copy and Move + + Trivially copyable and movable (aggregate of a handle and + a pointer). However, copying or moving a queued + continuation produces a second object whose `next` is + stale — the executor still points to the original. Copy + and move are safe only when the continuation is not + enqueued. + + @par Thread Safety + + A single continuation must not be accessed concurrently + without external synchronization. In practice, the + creating thread sets `h` and calls `executor.post(c)`; + the executor's worker thread later reads `h` and calls + `h.resume()`. The executor's internal locking provides + the necessary synchronization between these two accesses. + + @see Executor, executor_ref +*/ +struct continuation +{ + std::coroutine_handle<> h; + continuation* next = nullptr; +}; + +} // namespace capy +} // namespace boost + +#endif diff --git a/include/boost/capy/delay.hpp b/include/boost/capy/delay.hpp index 94340398..279fd9f4 100644 --- a/include/boost/capy/delay.hpp +++ b/include/boost/capy/delay.hpp @@ -11,6 +11,7 @@ #define BOOST_CAPY_DELAY_HPP #include +#include #include #include #include @@ -71,6 +72,7 @@ class delay_awaitable // Declared before stop_cb_buf_: the callback // accesses these members, so they must still be // alive if the stop_cb_ destructor blocks. 
+ continuation cont_; std::atomic claimed_{false}; bool canceled_ = false; bool stop_cb_active_ = false; @@ -79,7 +81,6 @@ class delay_awaitable { delay_awaitable* self_; executor_ref ex_; - std::coroutine_handle<> h_; void operator()() const noexcept { @@ -87,7 +88,7 @@ class delay_awaitable true, std::memory_order_acq_rel)) { self_->canceled_ = true; - ex_.post(h_); + ex_.post(self_->cont_); } } }; @@ -120,6 +121,7 @@ class delay_awaitable : dur_(o.dur_) , ts_(o.ts_) , tid_(o.tid_) + , cont_(o.cont_) , claimed_(o.claimed_.load(std::memory_order_relaxed)) , canceled_(o.canceled_) , stop_cb_active_(std::exchange(o.stop_cb_active_, false)) @@ -155,23 +157,24 @@ class delay_awaitable return h; } + cont_.h = h; ts_ = &env->executor.context().use_service(); // Schedule timer (won't fire inline since deadline is in the future) tid_ = ts_->schedule_after(dur_, - [this, h, ex = env->executor]() + [this, ex = env->executor]() { if(!claimed_.exchange( true, std::memory_order_acq_rel)) { - ex.post(h); + ex.post(cont_); } }); // Register stop callback (may fire inline) ::new(stop_cb_buf_) stop_cb_t( env->stop_token, - cancel_fn{this, env->executor, h}); + cancel_fn{this, env->executor}); stop_cb_active_ = true; return std::noop_coroutine(); diff --git a/include/boost/capy/ex/any_executor.hpp b/include/boost/capy/ex/any_executor.hpp index de95df3c..fa8cad49 100644 --- a/include/boost/capy/ex/any_executor.hpp +++ b/include/boost/capy/ex/any_executor.hpp @@ -11,6 +11,7 @@ #define BOOST_CAPY_ANY_EXECUTOR_HPP #include +#include #include #include #include @@ -91,8 +92,8 @@ class any_executor virtual execution_context& context() const noexcept = 0; virtual void on_work_started() const noexcept = 0; virtual void on_work_finished() const noexcept = 0; - virtual std::coroutine_handle<> dispatch(std::coroutine_handle<>) const = 0; - virtual void post(std::coroutine_handle<>) const = 0; + virtual std::coroutine_handle<> dispatch(continuation&) const = 0; + virtual void 
post(continuation&) const = 0; virtual bool equals(impl_base const*) const noexcept = 0; virtual std::type_info const& target_type() const noexcept = 0; }; @@ -123,14 +124,14 @@ class any_executor ex_.on_work_finished(); } - std::coroutine_handle<> dispatch(std::coroutine_handle<> h) const override + std::coroutine_handle<> dispatch(continuation& c) const override { - return ex_.dispatch(h); + return ex_.dispatch(c); } - void post(std::coroutine_handle<> h) const override + void post(continuation& c) const override { - ex_.post(h); + ex_.post(c); } bool equals(impl_base const* other) const noexcept override @@ -240,36 +241,39 @@ class any_executor p_->on_work_finished(); } - /** Dispatches a coroutine handle through the wrapped executor. + /** Dispatches a continuation through the wrapped executor. Returns a handle for symmetric transfer. If running in the - executor's thread, returns `h`. Otherwise, posts the coroutine - for later execution and returns `std::noop_coroutine()`. + executor's thread, returns `c.h`. Otherwise, posts the + continuation for later execution and returns + `std::noop_coroutine()`. - @param h The coroutine handle to dispatch for resumption. + @param c The continuation to dispatch for resumption. + Must remain at a stable address until dequeued. @return A handle for symmetric transfer or `std::noop_coroutine()`. @pre This instance holds a valid executor. */ - std::coroutine_handle<> dispatch(std::coroutine_handle<> h) const + std::coroutine_handle<> dispatch(continuation& c) const { - return p_->dispatch(h); + return p_->dispatch(c); } - /** Posts a coroutine handle to the wrapped executor. + /** Posts a continuation to the wrapped executor. - Posts the coroutine handle to the executor for later execution + Posts the continuation to the executor for later execution and returns. The caller should transfer to `std::noop_coroutine()` after calling this. - @param h The coroutine handle to post for resumption. 
+ @param c The continuation to post for resumption. + Must remain at a stable address until dequeued. @pre This instance holds a valid executor. */ - void post(std::coroutine_handle<> h) const + void post(continuation& c) const { - p_->post(h); + p_->post(c); } /** Compares two executor wrappers for equality. diff --git a/include/boost/capy/ex/async_event.hpp b/include/boost/capy/ex/async_event.hpp index db89053a..ed254b2b 100644 --- a/include/boost/capy/ex/async_event.hpp +++ b/include/boost/capy/ex/async_event.hpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -113,7 +114,7 @@ class async_event friend class async_event; async_event* e_; - std::coroutine_handle<> h_; + continuation cont_; executor_ref ex_; // Declared before stop_cb_buf_: the callback @@ -134,7 +135,7 @@ class async_event true, std::memory_order_acq_rel)) { self_->canceled_ = true; - self_->ex_.post(self_->h_); + self_->ex_.post(self_->cont_); } } }; @@ -173,7 +174,7 @@ class async_event wait_awaiter(wait_awaiter&& o) noexcept : e_(o.e_) - , h_(o.h_) + , cont_(o.cont_) , ex_(o.ex_) , claimed_(o.claimed_.load( std::memory_order_relaxed)) @@ -203,7 +204,7 @@ class async_event canceled_ = true; return h; } - h_ = h; + cont_.h = h; ex_ = env->executor; e_->waiters_.push_back(this); in_list_ = true; @@ -278,7 +279,7 @@ class async_event if(!w->claimed_.exchange( true, std::memory_order_acq_rel)) { - w->ex_.post(w->h_); + w->ex_.post(w->cont_); } } } diff --git a/include/boost/capy/ex/async_mutex.hpp b/include/boost/capy/ex/async_mutex.hpp index 26acd313..90c9a4a1 100644 --- a/include/boost/capy/ex/async_mutex.hpp +++ b/include/boost/capy/ex/async_mutex.hpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -164,7 +165,7 @@ class async_mutex friend class async_mutex; async_mutex* m_; - std::coroutine_handle<> h_; + continuation cont_; executor_ref ex_; // These members must be declared before stop_cb_ @@ -183,7 +184,7 @@ class async_mutex true, 
std::memory_order_acq_rel)) { self_->canceled_ = true; - self_->ex_.post(self_->h_); + self_->ex_.post(self_->cont_); } } }; @@ -223,7 +224,7 @@ class async_mutex lock_awaiter(lock_awaiter&& o) noexcept : m_(o.m_) - , h_(o.h_) + , cont_(o.cont_) , ex_(o.ex_) , claimed_(o.claimed_.load( std::memory_order_relaxed)) @@ -257,7 +258,7 @@ class async_mutex canceled_ = true; return h; } - h_ = h; + cont_.h = h; ex_ = env->executor; m_->waiters_.push_back(this); ::new(stop_cb_buf_) stop_cb_t( @@ -422,7 +423,7 @@ class async_mutex if(!waiter->claimed_.exchange( true, std::memory_order_acq_rel)) { - waiter->ex_.post(waiter->h_); + waiter->ex_.post(waiter->cont_); return; } } diff --git a/include/boost/capy/ex/executor_ref.hpp b/include/boost/capy/ex/executor_ref.hpp index 3810939c..f6010b7b 100644 --- a/include/boost/capy/ex/executor_ref.hpp +++ b/include/boost/capy/ex/executor_ref.hpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -30,8 +31,8 @@ struct executor_vtable execution_context& (*context)(void const*) noexcept; void (*on_work_started)(void const*) noexcept; void (*on_work_finished)(void const*) noexcept; - void (*post)(void const*, std::coroutine_handle<>); - std::coroutine_handle<> (*dispatch)(void const*, std::coroutine_handle<>); + void (*post)(void const*, continuation&); + std::coroutine_handle<> (*dispatch)(void const*, continuation&); bool (*equals)(void const*, void const*) noexcept; detail::type_info const* type_id; }; @@ -52,12 +53,12 @@ inline constexpr executor_vtable vtable_for = { const_cast(static_cast(p))->on_work_finished(); }, // post - [](void const* p, std::coroutine_handle<> h) { - static_cast(p)->post(h); + [](void const* p, continuation& c) { + static_cast(p)->post(c); }, // dispatch - [](void const* p, std::coroutine_handle<> h) -> std::coroutine_handle<> { - return static_cast(p)->dispatch(h); + [](void const* p, continuation& c) -> std::coroutine_handle<> { + return static_cast(p)->dispatch(c); }, // equals 
[](void const* a, void const* b) noexcept -> bool { @@ -97,7 +98,7 @@ inline constexpr executor_vtable vtable_for = { void store_executor(executor_ref ex) { if(ex) - ex.post(my_coroutine); + ex.post(my_continuation); } io_context ctx; @@ -198,36 +199,39 @@ class executor_ref vt_->on_work_finished(ex_); } - /** Dispatches a coroutine handle through the wrapped executor. + /** Dispatches a continuation through the wrapped executor. Returns a handle for symmetric transfer. If running in the - executor's thread, returns `h`. Otherwise, posts the coroutine - for later execution and returns `std::noop_coroutine()`. + executor's thread, returns `c.h`. Otherwise, posts the + continuation for later execution and returns + `std::noop_coroutine()`. - @param h The coroutine handle to dispatch for resumption. + @param c The continuation to dispatch for resumption. + Must remain at a stable address until dequeued. @return A handle for symmetric transfer or `std::noop_coroutine()`. @pre This instance was constructed with a valid executor. */ - std::coroutine_handle<> dispatch(std::coroutine_handle<> h) const + std::coroutine_handle<> dispatch(continuation& c) const { - return vt_->dispatch(ex_, h); + return vt_->dispatch(ex_, c); } - /** Posts a coroutine handle to the wrapped executor. + /** Posts a continuation to the wrapped executor. - Posts the coroutine handle to the executor for later execution + Posts the continuation to the executor for later execution and returns. The caller should transfer to `std::noop_coroutine()` after calling this. - @param h The coroutine handle to post for resumption. + @param c The continuation to post for resumption. + Must remain at a stable address until dequeued. @pre This instance was constructed with a valid executor. */ - void post(std::coroutine_handle<> h) const + void post(continuation& c) const { - vt_->post(ex_, h); + vt_->post(ex_, c); } /** Compares two executor references for equality. 
diff --git a/include/boost/capy/ex/io_env.hpp b/include/boost/capy/ex/io_env.hpp index 77696af3..1c42feb3 100644 --- a/include/boost/capy/ex/io_env.hpp +++ b/include/boost/capy/ex/io_env.hpp @@ -20,11 +20,11 @@ namespace boost { namespace capy { -/** Callable that posts a coroutine handle to an executor. +/** Callable that posts a continuation to an executor. Use this as the callback type for `std::stop_callback` instead - of a raw `std::coroutine_handle<>`. Raw handles resume the - coroutine inline on whatever thread calls `request_stop()`, + of resuming a coroutine handle directly. Direct resumption runs + the coroutine inline on whatever thread calls `request_stop()`, which bypasses the executor and corrupts the thread-local frame allocator. @@ -36,13 +36,13 @@ namespace capy { struct resume_via_post { executor_ref ex; - std::coroutine_handle<> h; + mutable continuation cont; // post() must not throw; stop_callback requires a // non-throwing invocable. void operator()() const noexcept { - ex.post(h); + ex.post(cont); } }; @@ -99,17 +99,20 @@ struct io_env /** Create a resume_via_post callable for this environment. Convenience method for registering @ref stop_resume_callback - instances. Equivalent to `resume_via_post{executor, h}`. + instances. Wraps the coroutine handle in a @ref continuation + and pairs it with this environment's executor. Equivalent to + `resume_via_post{executor, continuation{h}}`. @par Example @code stop_cb_.emplace(env->stop_token, env->post_resume(h)); @endcode - @param h The coroutine handle to post on cancellation. + @param h The coroutine handle to wrap in a continuation + and post on cancellation. @return A @ref resume_via_post callable that holds a - non-owning @ref executor_ref and the coroutine handle. + non-owning @ref executor_ref and a @ref continuation. The callable must not outlive the executor it references. 
@see resume_via_post, stop_resume_callback @@ -117,7 +120,7 @@ struct io_env resume_via_post post_resume(std::coroutine_handle<> h) const noexcept { - return resume_via_post{executor, h}; + return resume_via_post{executor, continuation{h}}; } }; diff --git a/include/boost/capy/ex/run.hpp b/include/boost/capy/ex/run.hpp index 40d13aad..d6210e9e 100644 --- a/include/boost/capy/ex/run.hpp +++ b/include/boost/capy/ex/run.hpp @@ -74,7 +74,7 @@ struct dispatch_trampoline struct promise_type { executor_ref caller_ex_; - std::coroutine_handle<> parent_; + continuation parent_; dispatch_trampoline get_return_object() noexcept { @@ -169,6 +169,7 @@ struct [[nodiscard]] run_awaitable_ex std::conditional_t st_; io_env env_; dispatch_trampoline tr_; + continuation task_cont_; Task inner_; // Last: destroyed first, while env_ is still valid // void allocator, inherit stop token @@ -223,7 +224,7 @@ struct [[nodiscard]] run_awaitable_ex { tr_ = make_dispatch_trampoline(); tr_.h_.promise().caller_ex_ = caller_env->executor; - tr_.h_.promise().parent_ = cont; + tr_.h_.promise().parent_.h = cont; auto h = inner_.handle(); auto& p = h.promise(); @@ -241,7 +242,8 @@ struct [[nodiscard]] run_awaitable_ex env_.frame_allocator = caller_env->frame_allocator; p.set_environment(&env_); - return ex_.dispatch(h); + task_cont_.h = h; + return ex_.dispatch(task_cont_); } // Non-copyable diff --git a/include/boost/capy/ex/run_async.hpp b/include/boost/capy/ex/run_async.hpp index 4f559c5d..c4627d53 100644 --- a/include/boost/capy/ex/run_async.hpp +++ b/include/boost/capy/ex/run_async.hpp @@ -94,7 +94,11 @@ struct run_async_trampoline io_env env_; invoke_fn invoke_ = nullptr; void* task_promise_ = nullptr; + // task_h_: raw handle for frame_guard cleanup in make_trampoline. + // task_cont_: continuation wrapping the same handle for executor dispatch. + // Both must reference the same coroutine and be kept in sync. 
std::coroutine_handle<> task_h_; + continuation task_cont_; promise_type(Ex& ex, Handlers& h, Alloc& a) noexcept : wg_(std::move(ex)) @@ -201,7 +205,11 @@ struct run_async_trampoline io_env env_; invoke_fn invoke_ = nullptr; void* task_promise_ = nullptr; + // task_h_: raw handle for frame_guard cleanup in make_trampoline. + // task_cont_: continuation wrapping the same handle for executor dispatch. + // Both must reference the same coroutine and be kept in sync. std::coroutine_handle<> task_h_; + continuation task_cont_; promise_type( Ex& ex, Handlers& h, std::pmr::memory_resource* mr) noexcept @@ -404,7 +412,8 @@ class [[nodiscard]] run_async_wrapper task_promise.set_environment(&p.env_); // Start task through executor - p.wg_.executor().dispatch(task_h).resume(); + p.task_cont_.h = task_h; + p.wg_.executor().dispatch(p.task_cont_).resume(); } }; diff --git a/include/boost/capy/ex/strand.hpp b/include/boost/capy/ex/strand.hpp index 9c75b239..1f7753a4 100644 --- a/include/boost/capy/ex/strand.hpp +++ b/include/boost/capy/ex/strand.hpp @@ -11,6 +11,7 @@ #define BOOST_CAPY_EX_STRAND_HPP #include +#include #include #include @@ -45,8 +46,8 @@ namespace capy { This class satisfies the `Executor` concept, providing: - `context()` - Returns the underlying execution context - `on_work_started()` / `on_work_finished()` - Work tracking - - `dispatch(h)` - May run immediately if strand is idle - - `post(h)` - Always queues for later execution + - `dispatch(continuation&)` - May run immediately if strand is idle + - `post(continuation&)` - Always queues for later execution @par Thread Safety Distinct objects: Safe. 
@@ -57,10 +58,11 @@ namespace capy { thread_pool pool(4); auto strand = make_strand(pool.get_executor()); - // These coroutines will never run concurrently - strand.post(coro1); - strand.post(coro2); - strand.post(coro3); + // These continuations will never run concurrently + continuation c1{h1}, c2{h2}, c3{h3}; + strand.post(c1); + strand.post(c2); + strand.post(c3); @endcode @tparam E The type of the underlying executor. Must @@ -199,43 +201,47 @@ class strand return impl_ == other.impl_; } - /** Post a coroutine to the strand. + /** Post a continuation to the strand. - The coroutine is always queued for execution, never resumed + The continuation is always queued for execution, never resumed immediately. When the strand becomes available, queued - coroutines execute in FIFO order on the underlying executor. + work executes in FIFO order on the underlying executor. @par Ordering Guarantees strict FIFO ordering relative to other post() calls. Use this instead of dispatch() when ordering matters. - @param h The coroutine handle to post. + @param c The continuation to post. The caller retains + ownership; the continuation must remain valid until + it is dequeued and resumed. */ void - post(std::coroutine_handle<> h) const + post(continuation& c) const { - detail::strand_service::post(*impl_, executor_ref(ex_), h); + detail::strand_service::post(*impl_, executor_ref(ex_), c.h); } - /** Dispatch a coroutine through the strand. + /** Dispatch a continuation through the strand. Returns a handle for symmetric transfer. If the calling - thread is already executing within this strand, returns `h`. - Otherwise, the coroutine is queued and + thread is already executing within this strand, returns `c.h`. + Otherwise, the continuation is queued and `std::noop_coroutine()` is returned. @par Ordering Callers requiring strict FIFO ordering should use post() - instead, which always queues the coroutine. + instead, which always queues the continuation. 
- @param h The coroutine handle to dispatch. + @param c The continuation to dispatch. The caller retains + ownership; the continuation must remain valid until + it is dequeued and resumed. @return A handle for symmetric transfer or `std::noop_coroutine()`. */ std::coroutine_handle<> - dispatch(std::coroutine_handle<> h) const + dispatch(continuation& c) const { - return detail::strand_service::dispatch(*impl_, executor_ref(ex_), h); + return detail::strand_service::dispatch(*impl_, executor_ref(ex_), c.h); } }; diff --git a/include/boost/capy/ex/thread_pool.hpp b/include/boost/capy/ex/thread_pool.hpp index 6a644ab0..45eb6087 100644 --- a/include/boost/capy/ex/thread_pool.hpp +++ b/include/boost/capy/ex/thread_pool.hpp @@ -12,6 +12,7 @@ #define BOOST_CAPY_EX_THREAD_POOL_HPP #include +#include #include #include #include @@ -193,34 +194,36 @@ class thread_pool::executor_type void on_work_finished() const noexcept; - /** Dispatch a coroutine for execution. + /** Dispatch a continuation for execution. - Posts the coroutine to the thread pool for execution on a + Posts the continuation to the thread pool for execution on a worker thread and returns `std::noop_coroutine()`. Thread pools never execute inline because no single thread "owns" the pool. - @param h The coroutine handle to execute. + @param c The continuation to execute. Must remain at a + stable address until dequeued and resumed. @return `std::noop_coroutine()` always. */ std::coroutine_handle<> - dispatch(std::coroutine_handle<> h) const + dispatch(continuation& c) const { - post(h); + post(c); return std::noop_coroutine(); } - /** Post a coroutine to the thread pool. + /** Post a continuation to the thread pool. - The coroutine will be resumed on one of the pool's - worker threads. + The continuation will be resumed on one of the pool's + worker threads. The continuation must remain at a stable + address until it is dequeued and resumed. - @param h The coroutine handle to execute. 
+ @param c The continuation to execute. */ BOOST_CAPY_DECL void - post(std::coroutine_handle<> h) const; + post(continuation& c) const; /// Return true if two executors refer to the same thread pool. bool diff --git a/include/boost/capy/test/run_blocking.hpp b/include/boost/capy/test/run_blocking.hpp index 0e63fd6d..14667c87 100644 --- a/include/boost/capy/test/run_blocking.hpp +++ b/include/boost/capy/test/run_blocking.hpp @@ -77,12 +77,12 @@ struct BOOST_CAPY_DECL blocking_executor Returns the handle for symmetric transfer. The caller resumes the coroutine via the returned handle. - @param h The coroutine handle to execute. + @param c The continuation to execute. - @return `h` for symmetric transfer. + @return `c.h` for symmetric transfer. */ std::coroutine_handle<> - dispatch(std::coroutine_handle<> h) const; + dispatch(continuation& c) const; /** Post work for deferred execution. @@ -90,10 +90,10 @@ struct BOOST_CAPY_DECL blocking_executor queue. The handle is resumed when the blocking event loop processes it. - @param h The coroutine handle to enqueue. + @param c The continuation to enqueue. 
*/ void - post(std::coroutine_handle<> h) const; + post(continuation& c) const; private: blocking_context* ctx_; diff --git a/include/boost/capy/test/stream.hpp b/include/boost/capy/test/stream.hpp index 441ebdbc..32078e67 100644 --- a/include/boost/capy/test/stream.hpp +++ b/include/boost/capy/test/stream.hpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -86,7 +87,7 @@ class stream { std::string buf; std::size_t max_read_size = std::size_t(-1); - std::coroutine_handle<> pending_h{}; + continuation pending_cont_; executor_ref pending_ex; bool eof = false; }; @@ -109,13 +110,11 @@ class stream closed = true; for(auto& side : sides) { - if(side.pending_h) + if(side.pending_cont_.h) { - auto h = side.pending_h; - side.pending_h = {}; - auto ex = side.pending_ex; + side.pending_ex.post(side.pending_cont_); + side.pending_cont_.h = {}; side.pending_ex = {}; - ex.post(h); } } } @@ -167,13 +166,11 @@ class stream int peer = 1 - index_; auto& side = state_->sides[peer]; side.eof = true; - if(side.pending_h) + if(side.pending_cont_.h) { - auto h = side.pending_h; - side.pending_h = {}; - auto ex = side.pending_ex; + side.pending_ex.post(side.pending_cont_); + side.pending_cont_.h = {}; side.pending_ex = {}; - ex.post(h); } } @@ -234,7 +231,7 @@ class stream { auto& side = self_->state_->sides[ self_->index_]; - side.pending_h = h; + side.pending_cont_.h = h; side.pending_ex = env->executor; return std::noop_coroutine(); } @@ -336,13 +333,11 @@ class stream side.buf.data() + old_size, n), buffers_, n); - if(side.pending_h) + if(side.pending_cont_.h) { - auto h = side.pending_h; - side.pending_h = {}; - auto ex = side.pending_ex; + side.pending_ex.post(side.pending_cont_); + side.pending_cont_.h = {}; side.pending_ex = {}; - ex.post(h); } return {{}, n}; @@ -368,13 +363,11 @@ class stream int peer = 1 - index_; auto& side = state_->sides[peer]; side.buf.append(sv); - if(side.pending_h) + if(side.pending_cont_.h) { - auto h = 
side.pending_h; - side.pending_h = {}; - auto ex = side.pending_ex; + side.pending_ex.post(side.pending_cont_); + side.pending_cont_.h = {}; side.pending_ex = {}; - ex.post(h); } } diff --git a/include/boost/capy/timeout.hpp b/include/boost/capy/timeout.hpp index 6cdb0c73..d6132bf0 100644 --- a/include/boost/capy/timeout.hpp +++ b/include/boost/capy/timeout.hpp @@ -35,7 +35,7 @@ struct timeout_state std::atomic winner_{-1}; // -1=none, 0=inner, 1=delay std::optional inner_result_; std::exception_ptr inner_exception_; - std::array, 2> runner_handles_{}; + std::array runner_handles_{}; timeout_state() : core_(2) @@ -103,7 +103,7 @@ class timeout_launcher std::coroutine_handle<> continuation, io_env const* caller_env) { - state_->core_.continuation_ = continuation; + state_->core_.continuation_.h = continuation; state_->core_.caller_env_ = caller_env; if(caller_env->stop_token.stop_possible()) @@ -126,7 +126,7 @@ class timeout_launcher h0.promise().env_ = io_env{ caller_env->executor, token, caller_env->frame_allocator}; - state_->runner_handles_[0] = + state_->runner_handles_[0].h = std::coroutine_handle<>{h0}; auto r1 = make_timeout_delay_runner( @@ -136,7 +136,7 @@ class timeout_launcher h1.promise().env_ = io_env{ caller_env->executor, token, caller_env->frame_allocator}; - state_->runner_handles_[1] = + state_->runner_handles_[1].h = std::coroutine_handle<>{h1}; caller_env->executor.post( diff --git a/include/boost/capy/when_all.hpp b/include/boost/capy/when_all.hpp index b073465b..6d171c2d 100644 --- a/include/boost/capy/when_all.hpp +++ b/include/boost/capy/when_all.hpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -22,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -82,7 +84,7 @@ struct when_all_core using stop_callback_t = std::stop_callback; std::optional parent_stop_callback_; - std::coroutine_handle<> continuation_; + continuation continuation_; io_env const* caller_env_ = nullptr; explicit 
when_all_core(std::size_t count) noexcept @@ -111,7 +113,7 @@ struct when_all_state when_all_core core_; std::tuple...> results_; - std::array, task_count> runner_handles_{}; + std::array runner_handles_{}; std::atomic has_error_{false}; std::error_code first_error_; @@ -143,7 +145,7 @@ struct when_all_homogeneous_state { when_all_core core_; std::vector> results_; - std::vector> runner_handles_; + std::unique_ptr runner_handles_; std::atomic has_error_{false}; std::error_code first_error_; @@ -151,7 +153,7 @@ struct when_all_homogeneous_state explicit when_all_homogeneous_state(std::size_t count) : core_(count) , results_(count) - , runner_handles_(count) + , runner_handles_(std::make_unique(count)) { } @@ -175,14 +177,14 @@ template<> struct when_all_homogeneous_state> { when_all_core core_; - std::vector> runner_handles_; + std::unique_ptr runner_handles_; std::atomic has_error_{false}; std::error_code first_error_; explicit when_all_homogeneous_state(std::size_t count) : core_(count) - , runner_handles_(count) + , runner_handles_(std::make_unique(count)) { } @@ -235,7 +237,7 @@ struct when_all_runner auto& core = p_->state_->core_; auto* counter = &core.remaining_count_; auto* caller_env = core.caller_env_; - auto cont = core.continuation_; + auto& cont = core.continuation_; h.destroy(); @@ -386,7 +388,7 @@ class when_all_io_launcher std::coroutine_handle<> await_suspend( std::coroutine_handle<> continuation, io_env const* caller_env) { - state_->core_.continuation_ = continuation; + state_->core_.continuation_.h = continuation; state_->core_.caller_env_ = caller_env; if(caller_env->stop_token.stop_possible()) @@ -421,9 +423,8 @@ class when_all_io_launcher h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator}; - std::coroutine_handle<> ch{h}; - state_->runner_handles_[I] = ch; - state_->core_.caller_env_->executor.post(ch); + state_->runner_handles_[I].h = std::coroutine_handle<>{h}; + 
state_->core_.caller_env_->executor.post(state_->runner_handles_[I]); } }; @@ -477,7 +478,7 @@ class when_all_homogeneous_launcher std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env) { - state_->core_.continuation_ = continuation; + state_->core_.continuation_.h = continuation; state_->core_.caller_env_ = caller_env; if(caller_env->stop_token.stop_possible()) @@ -504,14 +505,14 @@ class when_all_homogeneous_launcher h.promise().index_ = index; h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator}; - state_->runner_handles_[index] = std::coroutine_handle<>{h}; + state_->runner_handles_[index].h = std::coroutine_handle<>{h}; ++index; } // Phase 2: Post all runners. Any may complete synchronously. // After last post, state_ and this may be destroyed. - std::coroutine_handle<>* handles = state_->runner_handles_.data(); - std::size_t count = state_->runner_handles_.size(); + auto* handles = state_->runner_handles_.get(); + std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed); for(std::size_t i = 0; i < count; ++i) caller_env->executor.post(handles[i]); diff --git a/include/boost/capy/when_any.hpp b/include/boost/capy/when_any.hpp index 7693118f..bf17c904 100644 --- a/include/boost/capy/when_any.hpp +++ b/include/boost/capy/when_any.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -24,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -138,7 +140,7 @@ struct when_any_core using stop_callback_t = std::stop_callback; std::optional parent_stop_callback_; - std::coroutine_handle<> continuation_; + continuation continuation_; io_env const* caller_env_ = nullptr; // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members) @@ -185,7 +187,7 @@ struct when_any_io_state when_any_core core_; std::optional result_; - std::array, task_count> runner_handles_{}; + std::array runner_handles_{}; // 
Last failure (error or exception) for the all-fail case. // Last writer wins — no priority between errors and exceptions. @@ -243,7 +245,7 @@ struct when_any_io_runner auto& core = p_->state_->core_; auto* counter = &core.remaining_count_; auto* caller_env = core.caller_env_; - auto cont = core.continuation_; + auto& cont = core.continuation_; h.destroy(); @@ -381,7 +383,7 @@ class when_any_io_launcher std::coroutine_handle<> await_suspend( std::coroutine_handle<> continuation, io_env const* caller_env) { - state_->core_.continuation_ = continuation; + state_->core_.continuation_.h = continuation; state_->core_.caller_env_ = caller_env; if(caller_env->stop_token.stop_possible()) @@ -417,9 +419,8 @@ class when_any_io_launcher h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator}; - std::coroutine_handle<> ch{h}; - state_->runner_handles_[I] = ch; - caller_ex.post(ch); + state_->runner_handles_[I].h = std::coroutine_handle<>{h}; + caller_ex.post(state_->runner_handles_[I]); } }; @@ -432,7 +433,7 @@ struct when_any_io_homogeneous_state { when_any_core core_; std::optional result_; - std::vector> runner_handles_; + std::unique_ptr runner_handles_; std::mutex failure_mu_; std::error_code last_error_; @@ -440,7 +441,7 @@ struct when_any_io_homogeneous_state explicit when_any_io_homogeneous_state(std::size_t count) : core_(count) - , runner_handles_(count) + , runner_handles_(std::make_unique(count)) { } @@ -464,7 +465,7 @@ template<> struct when_any_io_homogeneous_state> { when_any_core core_; - std::vector> runner_handles_; + std::unique_ptr runner_handles_; std::mutex failure_mu_; std::error_code last_error_; @@ -472,7 +473,7 @@ struct when_any_io_homogeneous_state> explicit when_any_io_homogeneous_state(std::size_t count) : core_(count) - , runner_handles_(count) + , runner_handles_(std::make_unique(count)) { } @@ -556,7 +557,7 @@ class when_any_io_homogeneous_launcher std::coroutine_handle<> await_suspend( std::coroutine_handle<> 
continuation, io_env const* caller_env) { - state_->core_.continuation_ = continuation; + state_->core_.continuation_.h = continuation; state_->core_.caller_env_ = caller_env; if(caller_env->stop_token.stop_possible()) @@ -584,13 +585,13 @@ class when_any_io_homogeneous_launcher h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator}; - state_->runner_handles_[index] = std::coroutine_handle<>{h}; + state_->runner_handles_[index].h = std::coroutine_handle<>{h}; ++index; } // Phase 2: Post all runners. Any may complete synchronously. - std::coroutine_handle<>* handles = state_->runner_handles_.data(); - std::size_t count = state_->runner_handles_.size(); + auto* handles = state_->runner_handles_.get(); + std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed); for(std::size_t i = 0; i < count; ++i) caller_env->executor.post(handles[i]); diff --git a/src/ex/detail/strand_service.cpp b/src/ex/detail/strand_service.cpp index 9b93a3f9..82932925 100644 --- a/src/ex/detail/strand_service.cpp +++ b/src/ex/detail/strand_service.cpp @@ -9,6 +9,7 @@ #include "src/ex/detail/strand_queue.hpp" #include +#include #include #include #include @@ -50,6 +51,11 @@ struct strand_invoker { struct promise_type { + // Used to post the invoker through the inner executor. + // Lives in the coroutine frame (heap-allocated), so has + // a stable address for the duration of the queue residency. 
+ continuation self_; + void* operator new(std::size_t n, strand_impl& impl) { constexpr auto A = alignof(strand_impl*); @@ -205,6 +211,15 @@ class strand_service_impl : public strand_service } } + static void + post_invoker(strand_impl& impl, executor_ref ex) + { + auto invoker = make_invoker(impl); + auto& self = invoker.h_.promise().self_; + self.h = invoker.h_; + ex.post(self); + } + friend class strand_service; }; @@ -234,7 +249,7 @@ dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) return h; if(strand_service_impl::enqueue(impl, h)) - ex.post(strand_service_impl::make_invoker(impl).h_); + strand_service_impl::post_invoker(impl, ex); return std::noop_coroutine(); } @@ -243,7 +258,7 @@ strand_service:: post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) { if(strand_service_impl::enqueue(impl, h)) - ex.post(strand_service_impl::make_invoker(impl).h_); + strand_service_impl::post_invoker(impl, ex); } strand_service& diff --git a/src/ex/thread_pool.cpp b/src/ex/thread_pool.cpp index 69f5764d..e7b1ffd4 100644 --- a/src/ex/thread_pool.cpp +++ b/src/ex/thread_pool.cpp @@ -9,7 +9,7 @@ // #include -#include +#include #include #include #include @@ -22,9 +22,11 @@ /* Thread pool implementation using a shared work queue. - Work items are coroutine handles wrapped in intrusive list nodes, stored - in a single queue protected by a mutex. Worker threads wait on a - condition_variable until work is available or stop is requested. + Work items are continuations linked via their intrusive next pointer, + stored in a single queue protected by a mutex. No per-post heap + allocation: the continuation is owned by the caller and linked + directly. Worker threads wait on a condition_variable until work + is available or stop is requested. Threads are started lazily on first post() via std::call_once to avoid spawning threads for pools that are constructed but never used. 
Each @@ -48,34 +50,39 @@ namespace capy { class thread_pool::impl { - struct work : detail::intrusive_queue::node - { - std::coroutine_handle<> h_; + // Intrusive queue of continuations via continuation::next. + // No per-post allocation: the continuation is owned by the caller. + continuation* head_ = nullptr; + continuation* tail_ = nullptr; - explicit work(std::coroutine_handle<> h) noexcept - : h_(h) - { - } + void push(continuation* c) noexcept + { + c->next = nullptr; + if(tail_) + tail_->next = c; + else + head_ = c; + tail_ = c; + } - void run() - { - auto h = h_; - delete this; - h.resume(); - } + continuation* pop() noexcept + { + if(!head_) + return nullptr; + continuation* c = head_; + head_ = head_->next; + if(!head_) + tail_ = nullptr; + return c; + } - void destroy() - { - auto h = h_; - delete this; - if(h && h != std::noop_coroutine()) - h.destroy(); - } - }; + bool empty() const noexcept + { + return head_ == nullptr; + } std::mutex mutex_; std::condition_variable cv_; - detail::intrusive_queue q_; std::vector threads_; std::atomic outstanding_work_{0}; bool stop_{false}; @@ -85,10 +92,22 @@ class thread_pool::impl std::once_flag start_flag_; public: - ~impl() + ~impl() = default; + + // Destroy abandoned coroutine frames. Must be called + // before execution_context::shutdown()/destroy() so + // that suspended-frame destructors (e.g. delay_awaitable + // calling timer_service::cancel()) run while services + // are still valid. 
+ void + drain_abandoned() noexcept { - while(auto* w = q_.pop()) - w->destroy(); + while(auto* c = pop()) + { + auto h = c->h; + if(h && h != std::noop_coroutine()) + h.destroy(); + } } impl(std::size_t num_threads, std::string_view thread_name_prefix) @@ -104,13 +123,12 @@ class thread_pool::impl } void - post(std::coroutine_handle<> h) + post(continuation& c) { ensure_started(); - auto* w = new work(h); { std::lock_guard lock(mutex_); - q_.push(w); + push(&c); } cv_.notify_one(); } @@ -193,19 +211,19 @@ class thread_pool::impl for(;;) { - work* w = nullptr; + continuation* c = nullptr; { std::unique_lock lock(mutex_); cv_.wait(lock, [this]{ - return !q_.empty() || + return !empty() || stop_; }); if(stop_) return; - w = q_.pop(); + c = pop(); } - if(w) - w->run(); + if(c) + c->h.resume(); } } }; @@ -217,6 +235,7 @@ thread_pool:: { impl_->stop(); impl_->join(); + impl_->drain_abandoned(); shutdown(); destroy(); delete impl_; @@ -269,9 +288,9 @@ on_work_finished() const noexcept void thread_pool::executor_type:: -post(std::coroutine_handle<> h) const +post(continuation& c) const { - pool_->impl_->post(h); + pool_->impl_->post(c); } } // capy diff --git a/src/test/run_blocking.cpp b/src/test/run_blocking.cpp index b9cb9eb8..5603725f 100644 --- a/src/test/run_blocking.cpp +++ b/src/test/run_blocking.cpp @@ -120,16 +120,16 @@ blocking_executor::on_work_finished() const noexcept std::coroutine_handle<> blocking_executor::dispatch( - std::coroutine_handle<> h) const + continuation& c) const { - return h; + return c.h; } void blocking_executor::post( - std::coroutine_handle<> h) const + continuation& c) const { - ctx_->enqueue(h); + ctx_->enqueue(c.h); } } // namespace test diff --git a/test/unit/ex/any_executor.cpp b/test/unit/ex/any_executor.cpp index 6dcf96dc..4c059fc3 100644 --- a/test/unit/ex/any_executor.cpp +++ b/test/unit/ex/any_executor.cpp @@ -250,7 +250,8 @@ struct any_executor_test std::atomic counter{0}; auto coro = make_counter_coro(counter); - 
ex.dispatch(coro.handle()); + continuation c{coro.handle()}; + ex.dispatch(c); coro.release(); BOOST_TEST(wait_for([&]{ return counter.load() >= 1; })); @@ -266,7 +267,8 @@ struct any_executor_test std::atomic counter{0}; auto coro = make_counter_coro(counter); - ex.post(coro.handle()); + continuation c{coro.handle()}; + ex.post(c); coro.release(); BOOST_TEST(wait_for([&]{ return counter.load() >= 1; })); @@ -276,18 +278,34 @@ struct any_executor_test void testMultiplePost() { + std::atomic counter{0}; + constexpr int N = 10; + + // continuations must outlive pool to avoid + // dangling pointers in the executor queue. + counter_coro coros[N] = { + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + }; + continuation conts[N] = {}; + thread_pool pool(2); auto executor = pool.get_executor(); any_executor ex(executor); - std::atomic counter{0}; - constexpr int N = 10; - for(int i = 0; i < N; ++i) { - auto coro = make_counter_coro(counter); - ex.post(coro.handle()); - coro.release(); + conts[i] = continuation{coros[i].handle()}; + ex.post(conts[i]); + coros[i].release(); } BOOST_TEST(wait_for([&]{ return counter.load() >= N; })); @@ -297,11 +315,20 @@ struct any_executor_test void testSharedOwnership() { + std::atomic counter{0}; + + // continuations must outlive pool to avoid + // dangling pointers in the executor queue. 
+ auto coro1 = make_counter_coro(counter); + auto coro2 = make_counter_coro(counter); + auto coro3 = make_counter_coro(counter); + continuation c1{coro1.handle()}; + continuation c2{coro2.handle()}; + continuation c3{coro3.handle()}; + thread_pool pool(1); auto executor = pool.get_executor(); - std::atomic counter{0}; - // Create any_executor and make copies any_executor ex1(executor); any_executor ex2 = ex1; @@ -312,21 +339,12 @@ struct any_executor_test BOOST_TEST(ex2 == ex3); // Post through different copies - { - auto coro = make_counter_coro(counter); - ex1.post(coro.handle()); - coro.release(); - } - { - auto coro = make_counter_coro(counter); - ex2.post(coro.handle()); - coro.release(); - } - { - auto coro = make_counter_coro(counter); - ex3.post(coro.handle()); - coro.release(); - } + ex1.post(c1); + coro1.release(); + ex2.post(c2); + coro2.release(); + ex3.post(c3); + coro3.release(); BOOST_TEST(wait_for([&]{ return counter.load() >= 3; })); BOOST_TEST_EQ(counter.load(), 3); diff --git a/test/unit/ex/executor_ref.cpp b/test/unit/ex/executor_ref.cpp index ae93e4d2..f01360ac 100644 --- a/test/unit/ex/executor_ref.cpp +++ b/test/unit/ex/executor_ref.cpp @@ -192,7 +192,8 @@ struct executor_ref_test std::atomic counter{0}; auto coro = make_counter_coro(counter); - ex.dispatch(coro.handle()); + continuation c{coro.handle()}; + ex.dispatch(c); coro.release(); BOOST_TEST(wait_for([&]{ return counter.load() >= 1; })); @@ -208,7 +209,8 @@ struct executor_ref_test std::atomic counter{0}; auto coro = make_counter_coro(counter); - ex.post(coro.handle()); + continuation c{coro.handle()}; + ex.post(c); coro.release(); BOOST_TEST(wait_for([&]{ return counter.load() >= 1; })); @@ -218,18 +220,34 @@ struct executor_ref_test void testMultiplePost() { + std::atomic counter{0}; + constexpr int N = 10; + + // continuations must outlive pool to avoid + // dangling pointers in the executor queue. 
+ counter_coro coros[N] = { + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + make_counter_coro(counter), + }; + continuation conts[N] = {}; + thread_pool pool(2); auto executor = pool.get_executor(); executor_ref ex(executor); - std::atomic counter{0}; - constexpr int N = 10; - for(int i = 0; i < N; ++i) { - auto coro = make_counter_coro(counter); - ex.post(coro.handle()); - coro.release(); + conts[i] = continuation{coros[i].handle()}; + ex.post(conts[i]); + coros[i].release(); } BOOST_TEST(wait_for([&]{ return counter.load() >= N; })); diff --git a/test/unit/ex/frame_cb.cpp b/test/unit/ex/frame_cb.cpp index caa0d8ec..60c8aa16 100644 --- a/test/unit/ex/frame_cb.cpp +++ b/test/unit/ex/frame_cb.cpp @@ -8,6 +8,7 @@ // #include +#include #include #include #include @@ -20,6 +21,7 @@ #include #include +#include namespace boost { namespace capy { @@ -129,6 +131,7 @@ struct frame_cb_test struct async_awaitable { int value; + continuation cont_; bool await_ready() const noexcept { @@ -140,7 +143,8 @@ struct frame_cb_test std::coroutine_handle<> h, io_env const* env) noexcept { - env->executor.post(h); + cont_.h = h; + env->executor.post(cont_); return std::noop_coroutine(); } @@ -155,18 +159,19 @@ struct frame_cb_test static task await_async(int v) { - auto [ec, result] = co_await async_awaitable{v}; + auto [ec, result] = co_await async_awaitable{v, {}}; co_return result; } void testWithAsyncAwaitable() { - thread_pool pool(1); + auto pool = std::make_unique(1); + auto ex = pool->get_executor(); std::latch done(1); int result = 0; - run_async(pool.get_executor(), + run_async(ex, [&](int v) { result = v; done.count_down(); diff --git a/test/unit/ex/run_async.cpp b/test/unit/ex/run_async.cpp index cab12622..8816b036 100644 --- a/test/unit/ex/run_async.cpp 
+++ b/test/unit/ex/run_async.cpp @@ -78,16 +78,16 @@ struct sync_executor void on_work_started() const noexcept {} void on_work_finished() const noexcept {} - std::coroutine_handle<> dispatch(std::coroutine_handle<> h) const + std::coroutine_handle<> dispatch(continuation& c) const { if(dispatch_count_) ++(*dispatch_count_); - return h; + return c.h; } - void post(std::coroutine_handle<> h) const + void post(continuation& c) const { - h.resume(); + c.h.resume(); } }; @@ -120,15 +120,15 @@ struct queue_executor void on_work_started() const noexcept {} void on_work_finished() const noexcept {} - std::coroutine_handle<> dispatch(std::coroutine_handle<> h) const + std::coroutine_handle<> dispatch(continuation& c) const { - queue_->push(h); + queue_->push(c.h); return std::noop_coroutine(); } - void post(std::coroutine_handle<> h) const + void post(continuation& c) const { - queue_->push(h); + queue_->push(c.h); } }; diff --git a/test/unit/ex/strand.cpp b/test/unit/ex/strand.cpp index edd362ca..38ecaa61 100644 --- a/test/unit/ex/strand.cpp +++ b/test/unit/ex/strand.cpp @@ -352,7 +352,8 @@ struct strand_test std::atomic counter{0}; auto coro = make_counter_coro(counter); - s.post(coro.handle()); + continuation c{coro.handle()}; + s.post(c); coro.release(); BOOST_TEST(wait_for([&]{ return counter.load() >= 1; })); @@ -368,7 +369,8 @@ struct strand_test std::atomic counter{0}; auto coro = make_counter_coro(counter); - s.dispatch(coro.handle()); + continuation c{coro.handle()}; + s.dispatch(c); coro.release(); BOOST_TEST(wait_for([&]{ return counter.load() >= 1; })); @@ -390,7 +392,8 @@ struct strand_test for(int i = 0; i < N; ++i) { coros.push_back(make_counter_coro(counter)); - s.post(coros.back().handle()); + continuation c{coros.back().handle()}; + s.post(c); coros.back().release(); } @@ -417,7 +420,8 @@ struct strand_test for(int j = 0; j < per_thread; ++j) { auto coro = make_counter_coro(counter); - s.post(coro.handle()); + continuation c{coro.handle()}; + s.post(c); 
coro.release(); } }); @@ -448,7 +452,8 @@ struct strand_test for(int i = 0; i < N; ++i) { coros.push_back(make_order_coro(log, log_mutex, i)); - s.post(coros.back().handle()); + continuation c{coros.back().handle()}; + s.post(c); coros.back().release(); } @@ -549,7 +554,8 @@ struct strand_test for(int i = 0; i < N; ++i) { coros.push_back(make_tracking_coro()); - s.post(coros.back().handle()); + continuation c{coros.back().handle()}; + s.post(c); coros.back().release(); } @@ -595,7 +601,8 @@ struct strand_test for(int i = 0; i < N; ++i) { coros.push_back(make_counter_coro(counter)); - s.post(coros.back().handle()); + continuation c{coros.back().handle()}; + s.post(c); coros.back().release(); } diff --git a/test/unit/ex/thread_pool.cpp b/test/unit/ex/thread_pool.cpp index fc6857c6..fb012dec 100644 --- a/test/unit/ex/thread_pool.cpp +++ b/test/unit/ex/thread_pool.cpp @@ -18,6 +18,7 @@ #include "test_helpers.hpp" +#include #include #include #include @@ -160,11 +161,13 @@ struct thread_pool_test void testPostWork() { + // continuation must outlive pool (LIFO destruction order) + continuation c{std::noop_coroutine()}; thread_pool pool(1); auto ex = pool.get_executor(); // Post a noop coroutine and verify no exceptions - ex.post(std::noop_coroutine()); + ex.post(c); // Basic test: pool constructs and destructs without issue (void)ex; @@ -186,11 +189,12 @@ struct thread_pool_test void testDispatch() { + continuation c{std::noop_coroutine()}; thread_pool pool(1); auto ex = pool.get_executor(); // dispatch() always posts for thread_pool (returns void) - ex.dispatch(std::noop_coroutine()); + ex.dispatch(c); } void @@ -233,10 +237,18 @@ struct thread_pool_test void testConcurrentPost() { + // Pre-allocate continuations: must outlive the pool + // (LIFO destruction order). 
+ constexpr int num_threads = 8; + constexpr int posts_per_thread = 10; + std::vector> all_conts(num_threads); + for(auto& arr : all_conts) + for(auto& c : arr) + c.h = std::noop_coroutine(); + thread_pool pool(4); auto ex = pool.get_executor(); - constexpr int num_threads = 8; std::atomic post_count{0}; std::vector threads; @@ -244,11 +256,11 @@ struct thread_pool_test for(int i = 0; i < num_threads; ++i) { - threads.emplace_back([&ex, &post_count]{ + threads.emplace_back([&ex, &post_count, conts = all_conts[i].data()]{ // Multiple threads posting concurrently - for(int j = 0; j < 10; ++j) + for(int j = 0; j < posts_per_thread; ++j) { - ex.post(std::noop_coroutine()); + ex.post(conts[j]); ++post_count; } }); @@ -290,9 +302,11 @@ struct thread_pool_test #if defined(BOOST_CAPY_TEST_CAN_GET_THREAD_NAME) // Verify default thread name from within pool thread { - thread_pool pool(1); name_check_result result; - pool.get_executor().post(check_thread_name(result, "capy-pool-")); + auto nc = check_thread_name(result, "capy-pool-"); + continuation c{nc.h}; + thread_pool pool(1); + pool.get_executor().post(c); BOOST_TEST(wait_for([&]{ return result.done.load(); })); BOOST_TEST(result.matches.load()); @@ -300,9 +314,11 @@ struct thread_pool_test // Verify custom thread name from within pool thread { - thread_pool pool(1, "mypool-"); name_check_result result; - pool.get_executor().post(check_thread_name(result, "mypool-")); + auto nc = check_thread_name(result, "mypool-"); + continuation c{nc.h}; + thread_pool pool(1, "mypool-"); + pool.get_executor().post(c); BOOST_TEST(wait_for([&]{ return result.done.load(); })); BOOST_TEST(result.matches.load()); @@ -310,9 +326,11 @@ struct thread_pool_test // Verify thread naming works with index suffix { - thread_pool pool(1, "idx-"); name_check_result result; - pool.get_executor().post(check_thread_name(result, "idx-0")); + auto nc = check_thread_name(result, "idx-0"); + continuation c{nc.h}; + thread_pool pool(1, "idx-"); + 
pool.get_executor().post(c); BOOST_TEST(wait_for([&]{ return result.done.load(); })); BOOST_TEST(result.matches.load()); @@ -411,6 +429,7 @@ struct thread_pool_test { std::atomic busy{false}; std::atomic release{false}; + std::array conts; thread_pool pool(1); auto ex = pool.get_executor(); @@ -428,7 +447,10 @@ struct thread_pool_test // Queue items that can't be processed yet for(int i = 0; i < 50; ++i) - ex.post(std::noop_coroutine()); + { + conts[i].h = std::noop_coroutine(); + ex.post(conts[i]); + } // Release worker, then pool destructs immediately. // stop() races with the worker — pending items diff --git a/test/unit/ex/work_guard.cpp b/test/unit/ex/work_guard.cpp index a5f04901..fe0ba778 100644 --- a/test/unit/ex/work_guard.cpp +++ b/test/unit/ex/work_guard.cpp @@ -64,13 +64,13 @@ struct guard_test_executor } std::coroutine_handle<> - dispatch(std::coroutine_handle<> h) const + dispatch(continuation& c) const { - return h; + return c.h; } void - post(std::coroutine_handle<>) const + post(continuation&) const { } }; diff --git a/test/unit/task.cpp b/test/unit/task.cpp index 125823a5..65ca6a01 100644 --- a/test/unit/task.cpp +++ b/test/unit/task.cpp @@ -75,17 +75,17 @@ struct tracking_executor void on_work_started() const noexcept {} void on_work_finished() const noexcept {} - std::coroutine_handle<> dispatch(std::coroutine_handle<> h) const + std::coroutine_handle<> dispatch(continuation& c) const { ++(*dispatch_count_); if (dispatch_log) dispatch_log->push_back(id); - return h; + return c.h; } - void post(std::coroutine_handle<> h) const + void post(continuation& c) const { - h.resume(); + c.h.resume(); } }; diff --git a/test/unit/test_helpers.hpp b/test/unit/test_helpers.hpp index 4e9c268f..ef070f9a 100644 --- a/test/unit/test_helpers.hpp +++ b/test/unit/test_helpers.hpp @@ -108,17 +108,17 @@ struct test_executor void on_work_finished() const noexcept {} std::coroutine_handle<> - dispatch(std::coroutine_handle<> h) const + dispatch(continuation& c) const { 
if(dispatch_count_) ++(*dispatch_count_); - return h; + return c.h; } void - post(std::coroutine_handle<> h) const + post(continuation& c) const { - h.resume(); + c.h.resume(); } }; @@ -394,15 +394,15 @@ struct queuing_executor void on_work_started() const noexcept {} void on_work_finished() const noexcept {} - std::coroutine_handle<> dispatch(std::coroutine_handle<> h) const + std::coroutine_handle<> dispatch(continuation& c) const { - queue_->push(h); + queue_->push(c.h); return std::noop_coroutine(); } - void post(std::coroutine_handle<> h) const + void post(continuation& c) const { - queue_->push(h); + queue_->push(c.h); } }; @@ -420,6 +420,8 @@ static_assert(Executor); */ struct yield_awaitable { + continuation cont_; + bool await_ready() const noexcept { return false; @@ -428,7 +430,8 @@ struct yield_awaitable std::coroutine_handle<> await_suspend(std::coroutine_handle<> h, io_env const* env) { // Post ourselves back to the queue - env->executor.post(h); + cont_.h = h; + env->executor.post(cont_); return std::noop_coroutine(); }