Skip to content

Symmetric transfer inlining#130

Merged
sgerbino merged 4 commits intocppalliance:developfrom
sgerbino:pr/symmetric-transfer
Feb 11, 2026
Merged

Symmetric transfer inlining#130
sgerbino merged 4 commits intocppalliance:developfrom
sgerbino:pr/symmetric-transfer

Conversation

@sgerbino
Copy link
Collaborator

@sgerbino sgerbino commented Feb 11, 2026

Speculative inline completion for epoll socket and acceptor I/O, eliminating scheduler round-trips when data is immediately available. Includes Asio benchmark improvements for fairer comparison and a conntrack drain fix for reliable benchmark results.

Speculative inline completion (cb39ca7)

When a socket read/write/connect/accept syscall succeeds immediately (no EAGAIN), the completion can now be dispatched inline via ex.dispatch(h) instead of posting through the scheduler queue. This avoids the mutex lock, queue push/pop, and condvar signal overhead for the common case.

Inline budget: Each completion handler gets a budget of 2 inline completions (max_inline_budget_ = 2). reset_inline_budget() is called at the start of each posted handler; try_consume_inline_budget() gates inline dispatch. This prevents unbounded recursion while still allowing short chains (e.g., write → read → write) to complete without scheduler trips.

Key changes:

  • scheduler.hpp/cpp: Add reset_inline_budget(), try_consume_inline_budget(), max_inline_budget_, and inline_budget field on scheduler_context
  • sockets.cpp: Restructure connect(), read_some(), write_some() to try inline dispatch on speculative success, fall back to svc_.post() when budget is exhausted
  • acceptors.cpp: Same pattern for accept() — inline fast path sets up the peer socket and returns ex.dispatch(h) directly
  • op.hpp: Move epoll_op::operator()() to sockets.cpp so it can call reset_inline_budget() (needs scheduler access); replace peer_impl with peer_addr field on epoll_accept_op to cache the address from accept4() and eliminate redundant getsockname()/getpeername() syscalls
  • sockets.hpp: Replace cached_initiator + do_read_io()/do_write_io() with a unified register_op() helper that handles the EAGAIN async path for all op types
  • acceptors.cpp: Simplify cancel() to delegate to cancel_single_op() (removes duplicated logic)

Asio benchmark improvements (1b17dba)

Use concrete executor types (io_context::executor_type) and deferred completion token in Asio benchmarks instead of any_io_executor and default tokens. This removes type-erasure overhead from the Asio side, making the comparison fairer.

Conntrack drain (3a25f0c)

Add perf::await_conntrack_drain() that polls /proc/sys/net/netfilter/nf_conntrack_count between TCP-heavy benchmark categories. When the conntrack table exceeds 75% capacity, it waits for entries to expire before starting the next benchmark. This prevents silent SYN drops (and ~1s retransmit stalls) that were corrupting burst accept results when running the full --library all suite.

Test plan

  • cmake --build build -j$(nproc) && ctest --test-dir build — all 42 tests pass
  • taskset -c 0-11 build/perf/bench/corosio_bench --library all — no stalls, burst accept at parity with Asio

Summary by CodeRabbit

  • New Features

    • Wait-for-conntrack drain on Linux before benchmark runs to avoid stale state.
  • Performance

    • Per-thread inline budget for I/O completion to enable more efficient inline processing and lower latency.
    • Expanded inline completion opportunities for connect/read/write paths when budget permits.
  • Bug Fixes / Reliability

    • More robust accept/connect/read/write readiness, cancellation, and completion handling for socket operations.

Introduce a per-thread inline budget for speculative I/O completions
in the epoll backend. When a syscall succeeds immediately (no EAGAIN),
the coroutine is resumed via symmetric transfer without posting through
the scheduler. Each scheduler-dispatched completion grants N=2
speculative inlines before forcing a round-trip through the reactor,
balancing throughput with fairness across multiplexed connections.

The inline budget lives in scheduler_context alongside other per-
scheduler per-thread state, accessed via reset_inline_budget() and
try_consume_inline_budget() on epoll_scheduler.

Simplify socket I/O methods:
- Factor out register_op() helper for the EAGAIN registration
  pattern shared by connect, read_some, and write_some
- Merge speculative result branches (n>0, n==0, error) into one
  unified path in read_some and write_some
- Merge connect success and error branches into a single
  non-EINPROGRESS path
- Remove do_read_io/do_write_io and cached_initiator; replaced by
  register_op

Clean up epoll acceptor:
- Flatten operator()() nesting with consolidated fd cleanup
- Simplify EAGAIN block using same register_op pattern
- Unify cancel() to delegate to cancel_single_op()
- Remove vestigial peer_impl from epoll_accept_op

Eliminate two redundant syscalls per accepted connection:
- Remove getpeername(): accept4() already returns the peer address
  in its output parameter; store it in epoll_accept_op::peer_addr
- Remove getsockname(): the local endpoint is already cached on
  the acceptor as local_endpoint_
- All three accept completion paths (inline, posted, reactor-
  deferred) now use cached addresses instead of kernel queries
When running the full benchmark suite, heavy TCP categories can
fill the Linux conntrack table causing silent SYN drops and ~1 s
retransmit stalls that corrupt results.
@coderabbitai
Copy link

coderabbitai bot commented Feb 11, 2026

📝 Walkthrough

Walkthrough

Adds a per-context inline-budget mechanism to the epoll scheduler, centralizes op registration and moves op completion logic into sockets.cpp, refactors accept/connect/read/write paths to use inline-budget and register_op, replaces cached_initiator usage, and adds a Linux-only perf::await_conntrack_drain() utility.

Changes

Cohort / File(s) Summary
Scheduler / Inline Budget
src/corosio/src/detail/epoll/scheduler.hpp, src/corosio/src/detail/epoll/scheduler.cpp
Add per-context inline_budget and public methods reset_inline_budget() and try_consume_inline_budget(), plus max_inline_budget_ = 2.
Op relocation / Accept op layout
src/corosio/src/detail/epoll/op.hpp, src/corosio/src/detail/epoll/sockets.cpp
Move epoll_op::operator()() implementation out of header into sockets.cpp; add concrete operator() overloads; replace peer_impl with a persistent peer_addr in epoll_accept_op and adjust reset/perform_io paths.
Sockets / Registration & API surface
src/corosio/src/detail/epoll/sockets.hpp, src/corosio/src/detail/epoll/sockets.cpp
Remove cached_initiator usage and do_read_io/do_write_io helpers; add register_op(epoll_op&, epoll_op*&, bool&) noexcept to centralize op registration and readiness bookkeeping; route read/write/connect flows through register_op and inline-budget checks.
Acceptors / Cancellation & Inline path
src/corosio/src/detail/epoll/acceptors.cpp
Introduce inline-budget accept path using socket_service-created epoll_socket_impl; streamline success/error/cancel signaling; add cancel_single_op(epoll_op&) noexcept and delegate cancel() to it.
Performance Utility
perf/common/perf.hpp
Add Linux-only perf::await_conntrack_drain() that polls /proc/sys/net/netfilter/nf_conntrack_max and /proc/net/nf_conntrack_count, prints status and times out after 30s; add <fstream> and <thread> includes.

Sequence Diagram

sequenceDiagram
    autonumber
    participant Client
    participant Acceptor as Acceptor/Socket
    participant Scheduler
    participant Reactor as Reactor/DescriptorSlot
    participant Dispatcher

    Client->>Acceptor: initiate accept/read/write op
    Acceptor->>Scheduler: reset_inline_budget()
    Acceptor->>Scheduler: try_consume_inline_budget()
    alt budget available
        Scheduler-->>Acceptor: true
        Acceptor->>Acceptor: perform inline IO / produce result
        Acceptor->>Dispatcher: dispatch completion (inline)
        Dispatcher-->>Client: resume coroutine
    else budget exhausted or would-block
        Scheduler-->>Acceptor: false
        Acceptor->>Reactor: register_op(op, desc_slot, ready_flag)
        Reactor->>Reactor: store op until epoll event
        note right of Reactor: wait for epoll readiness
        Reactor->>Dispatcher: post completion when ready
        Dispatcher-->>Client: resume coroutine (posted)
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Benchmark enhancements #107: Modifies perf/common/perf.hpp and public perf APIs — directly related to the new await_conntrack_drain() addition.
  • Add speculative completion fast path for socket I/O #113: Changes epoll socket I/O fast-paths and read/write handling — overlaps with sockets.cpp/register_op and inline-budget changes.
  • Epoll #101: Extensive epoll reactor and scheduler edits that touch scheduler, op registration, and accept/connect flows — strong overlap with these refactors.

Poem

🐰 I watched the reactors hum at dawn,

Budgets reset, and ops hopped on,
I counted tracks till the noise went thin,
Inline they danced, then posted in,
I thumped with joy — the loop goes on.

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Symmetric transfer inlining' directly and clearly describes the main feature being added: an inline completion fast path that uses symmetric transfer to avoid scheduler round-trips.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@codecov
Copy link

codecov bot commented Feb 11, 2026

Codecov Report

❌ Patch coverage is 72.95918% with 53 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.05%. Comparing base (6a49606) to head (da8835e).
⚠️ Report is 7 commits behind head on develop.

Files with missing lines Patch % Lines
src/corosio/src/detail/epoll/acceptors.cpp 54.23% 27 Missing ⚠️
src/corosio/src/detail/epoll/sockets.cpp 78.86% 26 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff             @@
##           develop     #130      +/-   ##
===========================================
+ Coverage    80.13%   81.05%   +0.91%     
===========================================
  Files           65       64       -1     
  Lines         5639     5557      -82     
===========================================
- Hits          4519     4504      -15     
+ Misses        1120     1053      -67     
Files with missing lines Coverage Δ
src/corosio/src/detail/epoll/op.hpp 81.44% <100.00%> (-2.91%) ⬇️
src/corosio/src/detail/epoll/scheduler.cpp 83.50% <100.00%> (+2.84%) ⬆️
src/corosio/src/detail/epoll/scheduler.hpp 0.00% <ø> (ø)
src/corosio/src/detail/epoll/sockets.hpp 91.66% <ø> (ø)
src/corosio/src/detail/epoll/sockets.cpp 82.56% <78.86%> (+11.10%) ⬆️
src/corosio/src/detail/epoll/acceptors.cpp 80.18% <54.23%> (-1.68%) ⬇️

... and 3 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 6a49606...da8835e. Read the comment docs.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@src/corosio/src/detail/epoll/sockets.cpp`:
- Around line 220-238: The inline fast-paths (e.g., the connect path in
sockets.cpp around the block using svc_.scheduler().try_consume_inline_budget())
dereference ec (and similarly bytes_out in read_some/write_some and ec in
accept) without null-checks while the posted fallback uses op.ec_out/
op.bytes_out guarded by if (ec_out)/if (bytes_out); make these consistent by
guarding dereferences in the inline branch: check ec (and bytes_out where
applicable) before writing to *ec/*bytes_out, or assert non-null (e.g.,
assert(ec) / assert(bytes_out)) if the API guarantees non-null; update the
connect code path (the block that sets *ec = ... and returns ex.dispatch(h)),
read_some, write_some, and the acceptor inline paths to match the posted-path
null checks using the same op.ec_out/op.bytes_out semantics.
🧹 Nitpick comments (3)
perf/common/perf.hpp (1)

277-285: No timeout indication when the deadline expires without draining.

If the 30-second deadline elapses and count is still above threshold, the loop silently falls through and prints the current counts on line 285, indistinguishable from a successful drain. A brief note to the operator would help during triage of anomalous benchmark runs.

Proposed fix
     while( clock::now() < deadline )
     {
         std::this_thread::sleep_for( std::chrono::milliseconds( 200 ) );
         count = read_value( "/proc/sys/net/netfilter/nf_conntrack_count" );
         if( count < 0 || count <= threshold )
             break;
     }

-    std::cout << " " << count << "/" << ct_max << "\n";
+    if( count >= 0 && count > threshold )
+        std::cout << " timed out at " << count << "/" << ct_max << "\n";
+    else
+        std::cout << " drained to " << count << "/" << ct_max << "\n";
src/corosio/src/detail/epoll/scheduler.hpp (1)

251-251: Consider making the inline budget configurable.

The budget ceiling is hardcoded to 2. If this ever needs tuning (e.g., for benchmarks or different workloads), consider accepting it as a constructor parameter or making it a runtime-settable value. Not urgent — just a thought for future flexibility.

src/corosio/src/detail/epoll/acceptors.cpp (1)

61-95: Duplicated peer-socket setup between operator()() and inline accept() path.

The ~20-line block that creates a socket impl, registers its descriptor, and sets endpoints is nearly identical in both locations. Extracting a shared helper would reduce maintenance burden and divergence risk.

♻️ Sketch of a helper
// In epoll_acceptor_impl or as a free function in this TU:
epoll_socket_impl*
setup_accepted_socket(
    epoll_socket_service& svc,
    int fd,
    endpoint local_ep,
    sockaddr_in const& peer) noexcept
{
    auto& impl = static_cast<epoll_socket_impl&>(svc.create_impl());
    impl.set_socket(fd);
    impl.desc_state_.fd = fd;
    {
        std::lock_guard lock(impl.desc_state_.mutex);
        impl.desc_state_.read_op = nullptr;
        impl.desc_state_.write_op = nullptr;
        impl.desc_state_.connect_op = nullptr;
    }
    svc.scheduler().register_descriptor(fd, &impl.desc_state_);
    impl.set_endpoints(local_ep, from_sockaddr_in(peer));
    return &impl;
}

Then both call sites become a one-liner plus the impl_out assignment.

Also applies to: 161-193

@cppalliance-bot
Copy link

cppalliance-bot commented Feb 11, 2026

An automated preview of the documentation is available at https://130.corosio.prtest3.cppalliance.org/index.html

If more commits are pushed to the pull request, the docs will rebuild at the same URL.

2026-02-11 21:26:03 UTC

@cppalliance-bot
Copy link

cppalliance-bot commented Feb 11, 2026

GCOVR code coverage report https://130.corosio.prtest3.cppalliance.org/gcovr/index.html
LCOV code coverage report https://130.corosio.prtest3.cppalliance.org/genhtml/index.html
Coverage Diff Report https://130.corosio.prtest3.cppalliance.org/diff-report/index.html

Build time: 2026-02-11 21:29:21 UTC

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@src/corosio/src/detail/epoll/acceptors.cpp`:
- Around line 86-91: Remove the leftover null check in the error path so it
matches the rest of the PR: unconditionally assign the error to *ec_out (i.e.
replace the conditional "if (ec_out && !*ec_out) *ec_out = make_err(ENOENT);"
with a direct assignment "*ec_out = make_err(ENOENT);") and leave success =
false; intact; this keeps behavior consistent with other places that assume
ec_out is non-null and references the same symbols ec_out, make_err, and success
in acceptors.cpp.
🧹 Nitpick comments (5)
src/corosio/src/detail/epoll/sockets.cpp (2)

103-131: Missing high-level implementation overview for the file.

Per the project coding guidelines, files with non-trivial implementation logic should include a /* */ block comment after the includes providing a high-level overview. This file now contains the inline-budget fast-path machinery, the register_op pattern, and the moved operator()() definitions — all fairly intricate. A short paragraph after line 29 summarizing the speculative-I/O → inline-budget → register_op flow would help future maintainers.

📝 Example overview block
 namespace boost::corosio::detail {
 
+/*
+ * Epoll socket I/O implementation with speculative inline completion.
+ *
+ * Each I/O initiator (connect, read_some, write_some) attempts the syscall
+ * speculatively.  If the syscall succeeds (or fails with a non-EAGAIN error)
+ * and the per-scheduler inline budget has not been exhausted, the completion
+ * is dispatched inline via ex.dispatch(h) to avoid a scheduler round-trip.
+ * Otherwise the result is captured in the epoll_op and posted to the
+ * scheduler queue.  When the syscall returns EAGAIN/EWOULDBLOCK, register_op()
+ * checks for a cached edge-triggered readiness event and, if none is
+ * available, parks the op in the descriptor_state slot for the reactor to
+ * complete later.  work_started() / work_finished() bracket the reactor-
+ * managed lifetime; the matching work_finished() is issued either here
+ * (immediate re-try or cancel) or by the reactor / cancel path.
+ */
+
 // Register an op with the reactor, handling cached edge events.

As per coding guidelines: "Files containing non-trivial implementation logic should include a /* */ block comment after the includes that provides a high-level overview of how the implementation works."


264-269: Repeated op-initialization boilerplate across paths.

The six-line op.h = h; op.ex = ex; op.ec_out = ec; …; op.start(token, this); op.impl_ptr = shared_from_this(); block is duplicated across every non-inline branch in read_some, write_some, and (partially) connect. A small private helper like init_op(op, h, ex, ec, bytes_out, token) would deduplicate this and reduce the chance of a field being missed in one path.

Low priority given the performance-sensitive context — fine to defer.

Also applies to: 303-308, 315-321, 345-350, 383-388, 395-401

src/corosio/src/detail/epoll/acceptors.cpp (3)

135-150: op.start() is called before the inline-budget check, unlike the socket paths.

In connect(), read_some(), and write_some() (in sockets.cpp), op.start(token, this) is deliberately deferred to the non-inline branches so the stop callback is never registered when the inline fast path is taken. Here, op.start(token, this) at line 142 runs unconditionally before accept4, meaning the inline path (lines 159-189) dispatches with an active stop callback that is never explicitly reset.

This works correctly today (a spurious cancel is a no-op because desc_state_.read_op is never set), but it adds unnecessary overhead on the hot inline path (registering + later tearing down the stop callback) and is inconsistent with the socket-side pattern.

Consider deferring op.start(token, this) to after the inline-budget check, i.e. move it to the fallback and EAGAIN branches.

Sketch
     auto& op = acc_;
     op.reset();
     op.h = h;
     op.ex = ex;
     op.ec_out = ec;
     op.impl_out = impl_out;
     op.fd = fd_;
-    op.start(token, this);
 
     sockaddr_in addr{};
     ...
     if (accepted >= 0)
     {
         ...
         if (svc_.scheduler().try_consume_inline_budget())
         {
             // ... inline path (no stop_cb needed) ...
             return ex.dispatch(h);
         }
 
+        op.start(token, this);
         op.accepted_fd = accepted;
         op.peer_addr = addr;
         ...
     }
 
     if (errno == EAGAIN || errno == EWOULDBLOCK)
     {
+        op.start(token, this);
         op.impl_ptr = shared_from_this();
         ...
     }
 
+    op.start(token, this);
     op.complete(errno, 0);
     ...

58-83: Duplicated peer-socket setup logic between inline and posted paths.

The create-impl → set_socket → register-descriptor → set_endpoints → assign impl_out sequence at lines 59-83 (posted path in operator()()) and lines 161-180 (inline path in accept()) are nearly identical. Extracting a private helper on epoll_acceptor_impl (or a free function) that takes the accepted fd, peer address, and output pointers would reduce the duplication and the risk of the two paths diverging.

Fine to defer — just noting for future maintenance.

Also applies to: 159-180


46-47: Inline budget reset via static_cast chain is correct but fragile.

The static_cast<epoll_acceptor_impl*>(acceptor_impl_)->service().scheduler().reset_inline_budget() chain works because operator()() is only ever called on an epoll_accept_op whose acceptor_impl_ is always an epoll_acceptor_impl. If this invariant ever changes, the static_cast would silently produce UB. A brief comment or assert(dynamic_cast<...>) in debug builds would guard against future regressions.

This is not blocking — just a defensive coding note.

The ec_out and bytes_out pointers are always non-null — the only
callers are awaitable types that pass addresses of member variables.
@sgerbino sgerbino force-pushed the pr/symmetric-transfer branch from baff3be to da8835e Compare February 11, 2026 21:23
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@src/corosio/src/detail/epoll/sockets.cpp`:
- Around line 210-228: The stop callback installed in start() currently calls
request_cancel() synchronously when the stop_token is already stopped, which
sets cancelled=true before op.complete(err, 0) and causes operator()() to map
the real error to capy::error::canceled; modify the start()/stop callback logic
so that a synchronous already-stopped invocation does not set the cancelled flag
before complete() runs (e.g., detect token already stopped and either 1) defer
setting cancelled until after op.complete() returns, or 2) record a separate
stop_requested flag and only set cancelled=true from the async callback path),
ensuring complete(err, 0) is called with the real err preserved and operator()()
can distinguish real errors from later cancellations; touch the
functions/methods start(), request_cancel(), complete(), and operator()() to
implement this change.
🧹 Nitpick comments (4)
src/corosio/src/detail/epoll/acceptors.cpp (2)

58-91: Duplicated socket-setup logic between operator()() and inline budget path in accept().

The socket creation/configuration sequence (create_impl → set_socket → desc_state init → register_descriptor → set_endpoints → assign impl_out) is repeated nearly verbatim in two places. If one path evolves (e.g., new socket options, additional bookkeeping), the other will silently diverge.

Consider extracting a shared helper, e.g. setup_accepted_socket(int fd, sockaddr_in const& addr, io_object::io_object_impl** impl_out, std::error_code* ec) → bool, that both operator()() and the inline path in accept() can call.

Also applies to: 158-189


199-223: EAGAIN path mirrors register_op but is hand-inlined.

The lock-acquire → check read_readyperform_io → post-or-register pattern here is essentially register_op() from sockets.cpp specialized for the acceptor's read_op slot. This is fine for now since the acceptor only has a single op slot and uses a different service type, but worth noting for future maintainability — if register_op ever gains additional bookkeeping, this path would need to be updated independently.

src/corosio/src/detail/epoll/sockets.cpp (2)

255-273: Repeated op-setup boilerplate in non-inline posted fallbacks.

Each of read_some, write_some, and connect has a nearly identical block for the "inline budget exhausted → set up op fields → complete → post" path (e.g., lines 303–311 for read, 383–391 for write, 218–228 for connect). This is the same pattern as the duplication noted in acceptors.cpp. A small helper (or a lambda) that populates the common op fields (h, ex, ec_out, bytes_out, token, impl_ptr) would reduce the surface area for copy-paste drift.

Also applies to: 303-311, 383-391


10-30: Missing high-level implementation overview comment.

This file now contains the core inline-budget dispatch logic (register_op, operator()() for epoll_op and epoll_connect_op) plus the speculative I/O fast paths. A brief /* */ block comment after the includes explaining the inline-budget flow and the speculative-then-register pattern would help future maintainers orient quickly. As per coding guidelines: "Files containing non-trivial implementation logic should include a /* */ block comment after the includes."

Comment on lines +210 to 228
if (result == 0 || errno != EINPROGRESS)
{
svc_.work_started();
op.impl_ptr = shared_from_this();

std::lock_guard lock(desc_state_.mutex);
if (desc_state_.write_ready)
int err = (result < 0) ? errno : 0;
if (svc_.scheduler().try_consume_inline_budget())
{
desc_state_.write_ready = false;
op.perform_io();
if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
{
op.errn = 0;
if (op.cancelled.load(std::memory_order_acquire))
{
svc_.post(&op);
svc_.work_finished();
}
else
{
desc_state_.connect_op = &op;
}
}
else
{
svc_.post(&op);
svc_.work_finished();
}
}
else
{
if (op.cancelled.load(std::memory_order_acquire))
{
svc_.post(&op);
svc_.work_finished();
}
else
{
desc_state_.connect_op = &op;
}
*ec = err ? make_err(err) : std::error_code{};
return ex.dispatch(h);
}
op.reset();
op.h = h;
op.ex = ex;
op.ec_out = ec;
op.fd = fd_;
op.target_endpoint = ep;
op.start(token, this);
op.impl_ptr = shared_from_this();
op.complete(err, 0);
svc_.post(&op);
return std::noop_coroutine();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

cd src/corosio/src/detail/epoll && \
grep -n "void start\|auto start" sockets.cpp | head -20

Repository: cppalliance/corosio

Length of output: 45


🏁 Script executed:

cd src/corosio/src/detail/epoll && \
grep -n "void complete\|auto complete" sockets.cpp | head -20

Repository: cppalliance/corosio

Length of output: 45


🏁 Script executed:

# Find the operation type definition
rg "struct.*op\s*{|class.*op\s*{" --max-count 10 -A 5

Repository: cppalliance/corosio

Length of output: 187


🏁 Script executed:

# Search for start() method in the codebase to understand what it does
rg "\.start\(" src/corosio/src/detail/epoll/sockets.cpp -B 2 -A 5

Repository: cppalliance/corosio

Length of output: 1873


🏁 Script executed:

# Let me find the full connect() function and see what op type is
cd src/corosio/src/detail/epoll && \
head -250 sockets.cpp | tail -70

Repository: cppalliance/corosio

Length of output: 1817


🏁 Script executed:

# Look for the operation type definition - likely ConnectOp or similar
rg "ConnectOp|struct.*Operation|class.*Operation" src/corosio/src/detail/epoll/ -A 5 | head -50

Repository: cppalliance/corosio

Length of output: 45


🏁 Script executed:

# Search for where "op" is declared in the connect function
cd src/corosio/src/detail/epoll && \
sed -n '180,230p' sockets.cpp

Repository: cppalliance/corosio

Length of output: 1369


🏁 Script executed:

# Find the conn_ member declaration to understand its type
rg "conn_" src/corosio/src/detail/epoll/sockets.hpp -B 2 -A 2 | head -30

Repository: cppalliance/corosio

Length of output: 144


🏁 Script executed:

# Look for the operation type definition - likely in a detail namespace
fd "\.hpp$" src/corosio/src/detail/epoll/ -exec basename {} \; | sort

Repository: cppalliance/corosio

Length of output: 214


🏁 Script executed:

# Search for the operation class definition (ConnectOp or similar)
rg "class.*Op|struct.*Op" src/corosio/src/detail/epoll/ -A 10 | head -100

Repository: cppalliance/corosio

Length of output: 45


🏁 Script executed:

# Find epoll_connect_op definition
rg "epoll_connect_op|class epoll_connect_op|struct epoll_connect_op" src/corosio/src/detail/epoll/ -A 30

Repository: cppalliance/corosio

Length of output: 9436


🏁 Script executed:

# List all header files in the epoll detail directory
fd "\.hpp$" src/corosio/src/detail/epoll/

Repository: cppalliance/corosio

Length of output: 225


🏁 Script executed:

# Check what files exist in epoll directory
ls -la src/corosio/src/detail/epoll/

Repository: cppalliance/corosio

Length of output: 613


🏁 Script executed:

# Get the full epoll_op base class definition including start() and complete()
cat src/corosio/src/detail/epoll/op.hpp

Repository: cppalliance/corosio

Length of output: 10347


🏁 Script executed:

# Find the canceller::operator() implementation in sockets.cpp
rg "canceller::operator" src/corosio/src/detail/epoll/sockets.cpp -A 15

Repository: cppalliance/corosio

Length of output: 45


🏁 Script executed:

# Also find what cancel() methods do for epoll_op derived classes
rg "void.*cancel\(\)" src/corosio/src/detail/epoll/sockets.cpp -A 10 | head -80

Repository: cppalliance/corosio

Length of output: 45


🏁 Script executed:

# Search for the canceller struct implementation
rg "canceller.*operator\(\)" src/corosio/src/detail/epoll/ -A 10

Repository: cppalliance/corosio

Length of output: 45


🏁 Script executed:

# Look for where canceller is defined and called
rg "void.*canceller.*operator\|canceller\(\)" src/corosio/src/detail/epoll/ -B 3 -A 8

Repository: cppalliance/corosio

Length of output: 45


🏁 Script executed:

# Search in all of detail directory
rg "struct canceller" src/corosio/src/detail/ -A 15

Repository: cppalliance/corosio

Length of output: 6537


🏁 Script executed:

# Look at the full sockets.cpp file to understand the flow
wc -l src/corosio/src/detail/epoll/sockets.cpp

Repository: cppalliance/corosio

Length of output: 107


🏁 Script executed:

# Search more broadly for canceller implementation
rg "epoll_op.*canceller\|canceller.*operator" src/corosio/src/detail/epoll/sockets.cpp -A 5

Repository: cppalliance/corosio

Length of output: 45


🏁 Script executed:

# Look at the connect function and the area around line 224 more carefully
sed -n '200,240p' src/corosio/src/detail/epoll/sockets.cpp

Repository: cppalliance/corosio

Length of output: 1166


🏁 Script executed:

# Search for the canceller operator() which is likely inline or in scheduler_op
rg "void.*canceller.*operator\|namespace.*detail.*{" src/corosio/src/detail/ --max-count 5

Repository: cppalliance/corosio

Length of output: 220


🏁 Script executed:

# Look for where canceller::operator() is implemented - it's declared in op.hpp
fd "\.cpp$" src/corosio/src/detail/ --exec grep -l "canceller::operator\|epoll_op.*canceller" {} \;

Repository: cppalliance/corosio

Length of output: 103


🏁 Script executed:

# Search for canceller implementation in sockets.cpp
grep -n "canceller" src/corosio/src/detail/epoll/sockets.cpp

Repository: cppalliance/corosio

Length of output: 87


🏁 Script executed:

# Look at the entire file to find where epoll_op::canceller is implemented
rg "epoll_op" src/corosio/src/detail/epoll/sockets.cpp

Repository: cppalliance/corosio

Length of output: 463


🏁 Script executed:

# Let me check if start() implementation does anything with stop_token
sed -n '1,100p' src/corosio/src/detail/epoll/op.hpp | tail -50

Repository: cppalliance/corosio

Length of output: 1965


🏁 Script executed:

# Find the canceller operator() implementation in sockets.cpp
sed -n '67,100p' src/corosio/src/detail/epoll/sockets.cpp

Repository: cppalliance/corosio

Length of output: 580


🏁 Script executed:

# Look at the full op.hpp to see what start() does
sed -n '190,220p' src/corosio/src/detail/epoll/op.hpp

Repository: cppalliance/corosio

Length of output: 854


🏁 Script executed:

# Look for cancel_single_op implementation to see if it depends on impl_ptr
rg "cancel_single_op" src/corosio/src/detail/epoll/sockets.cpp -A 25

Repository: cppalliance/corosio

Length of output: 1779


🏁 Script executed:

# Let me also check if there's a window where socket_impl_ could be null after start() but before the stop callback is set
# Look at the full start() implementation again and understand the sequence
sed -n '210,250p' src/corosio/src/detail/epoll/op.hpp

Repository: cppalliance/corosio

Length of output: 967


🏁 Script executed:

# Look at the exact sequence in the immediate-completion path again
sed -n '210,230p' src/corosio/src/detail/epoll/sockets.cpp

Repository: cppalliance/corosio

Length of output: 636


🏁 Script executed:

# Check if there's any synchronization between start() and stop_token callback firing
# Look for documentation about stop_callback behavior
rg "stop_callback\|stop_token" src/corosio/src/detail/epoll/op.hpp | head -20

Repository: cppalliance/corosio

Length of output: 45


🏁 Script executed:

# Let me trace the exact flow: when complete() is called, what state is set?
# And when the stop callback fires, what happens?
sed -n '165,250p' src/corosio/src/detail/epoll/sockets.cpp | head -100

Repository: cppalliance/corosio

Length of output: 2157


🏁 Script executed:

# Find the full operator()() implementation for epoll_op
sed -n '96,130p' src/corosio/src/detail/epoll/sockets.cpp

Repository: cppalliance/corosio

Length of output: 1165


🏁 Script executed:

# Let me also check the std::stop_callback semantics - does it fire synchronously?
# Search for any documentation or comments about this
rg "stop_callback\|already stopped" src/corosio/src/detail/epoll/ -B 2 -A 2

Repository: cppalliance/corosio

Length of output: 45


🏁 Script executed:

# Check if there are any tests or examples showing immediate-stop behavior
fd "test" src/corosio/ -type d | head -5

Repository: cppalliance/corosio

Length of output: 233


🏁 Script executed:

# Let me search for any synchronization primitives or comments about the stop_callback behavior
rg "synchronously\|immediately\|std::stop" src/corosio/src/detail/epoll/op.hpp

Repository: cppalliance/corosio

Length of output: 45


🏁 Script executed:

# Check if there are any comments about the timing of stop callbacks
head -150 src/corosio/src/detail/epoll/op.hpp | tail -50

Repository: cppalliance/corosio

Length of output: 1675


🏁 Script executed:

# Check if there are any other places in the codebase that handle this pattern
rg "start\(token" src/corosio/src/detail/ -B 2 -A 5 | head -60

Repository: cppalliance/corosio

Length of output: 4197


🏁 Script executed:

# Look for any comments about the stop_token being already-stopped scenario
rg "already.*stop\|stop.*immediate\|stop.*synchronous" src/corosio/src/

Repository: cppalliance/corosio

Length of output: 45


Result code lost if stop_token already stopped before start() completes.

The stop callback fired by stop_token::emplace in start() (line 224) executes synchronously if the token is already stopped, calling request_cancel() to set cancelled = true before complete(err, 0) on line 226. When operator()() later checks cancelled first (line 104 of sockets.cpp), it discards the actual error code and returns capy::error::canceled instead. This causes operations that completed with a specific error to incorrectly report cancellation if the caller's stop token was already stopped.

🤖 Prompt for AI Agents
In `@src/corosio/src/detail/epoll/sockets.cpp` around lines 210 - 228, The stop
callback installed in start() currently calls request_cancel() synchronously
when the stop_token is already stopped, which sets cancelled=true before
op.complete(err, 0) and causes operator()() to map the real error to
capy::error::canceled; modify the start()/stop callback logic so that a
synchronous already-stopped invocation does not set the cancelled flag before
complete() runs (e.g., detect token already stopped and either 1) defer setting
cancelled until after op.complete() returns, or 2) record a separate
stop_requested flag and only set cancelled=true from the async callback path),
ensuring complete(err, 0) is called with the real err preserved and operator()()
can distinguish real errors from later cancellations; touch the
functions/methods start(), request_cancel(), complete(), and operator()() to
implement this change.

@sgerbino sgerbino merged commit c98acaa into cppalliance:develop Feb 11, 2026
17 checks passed
@sgerbino sgerbino deleted the pr/symmetric-transfer branch February 11, 2026 21:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants