libs/corosio/src/corosio/src/tcp_server.cpp

63.6% Lines (42/66) 84.6% Functions (11/13) 83.3% Branches (20/24)
libs/corosio/src/corosio/src/tcp_server.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/tcp_server.hpp>
11 #include <boost/corosio/detail/except.hpp>
12 #include <condition_variable>
13 #include <mutex>
14 #include <utility>
15
16 namespace boost::corosio {
17
18 struct tcp_server::impl
19 {
20 std::mutex join_mutex;
21 std::condition_variable join_cv;
22 capy::execution_context& ctx;
23 std::vector<tcp_acceptor> ports;
24 std::stop_source stop;
25
26 7 explicit impl(capy::execution_context& c) noexcept
27 7 : ctx(c)
28 {
29 7 }
30 };
31
32 tcp_server::impl*
33 7 tcp_server::make_impl(capy::execution_context& ctx)
34 {
35 7 return new impl(ctx);
36 }
37
38 7 tcp_server::~tcp_server()
39 {
40
1/2
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
7 delete impl_;
41 7 }
42
43 tcp_server::tcp_server(
44 tcp_server&& o) noexcept
45 : impl_(std::exchange(o.impl_, nullptr))
46 , ex_(std::move(o.ex_))
47 , waiters_(std::exchange(o.waiters_, nullptr))
48 , idle_head_(std::exchange(o.idle_head_, nullptr))
49 , active_head_(std::exchange(o.active_head_, nullptr))
50 , active_tail_(std::exchange(o.active_tail_, nullptr))
51 , active_accepts_(std::exchange(o.active_accepts_, 0))
52 , storage_(std::move(o.storage_))
53 , running_(std::exchange(o.running_, false))
54 {
55 }
56
57 tcp_server&
58 tcp_server::operator=(tcp_server&& o) noexcept
59 {
60 delete impl_;
61 impl_ = std::exchange(o.impl_, nullptr);
62 ex_ = std::move(o.ex_);
63 waiters_ = std::exchange(o.waiters_, nullptr);
64 idle_head_ = std::exchange(o.idle_head_, nullptr);
65 active_head_ = std::exchange(o.active_head_, nullptr);
66 active_tail_ = std::exchange(o.active_tail_, nullptr);
67 active_accepts_ = std::exchange(o.active_accepts_, 0);
68 storage_ = std::move(o.storage_);
69 running_ = std::exchange(o.running_, false);
70 return *this;
71 }
72
73 // Accept loop: wait for idle worker, accept connection, dispatch
74 capy::task<void>
75
1/1
✓ Branch 1 taken 8 times.
8 tcp_server::do_accept(tcp_acceptor& acc)
76 {
77 auto st = co_await capy::this_coro::stop_token;
78 while(! st.stop_requested())
79 {
80 // Wait for an idle worker before blocking on accept
81 auto& w = co_await pop();
82 auto [ec] = co_await acc.accept(w.socket());
83 if(ec)
84 {
85 co_await push(w);
86 continue;
87 }
88 w.run(launcher{*this, w});
89 }
90 16 }
91
92 std::error_code
93 7 tcp_server::bind(endpoint ep)
94 {
95 7 impl_->ports.emplace_back(impl_->ctx);
96 // VFALCO this should return error_code
97 7 impl_->ports.back().listen(ep);
98 7 return {};
99 }
100
101 void
102 10 tcp_server::
103 start()
104 {
105 // Idempotent - only start if not already running
106
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 9 times.
10 if(running_)
107 1 return;
108
109 // Previous session must be fully stopped before restart
110
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 8 times.
9 if(active_accepts_ != 0)
111 1 detail::throw_logic_error(
112 "tcp_server::start: previous session not joined");
113
114 8 running_ = true;
115
116
1/1
✓ Branch 1 taken 8 times.
8 impl_->stop = {}; // Fresh stop source
117 8 auto st = impl_->stop.get_token();
118
119 8 active_accepts_ = impl_->ports.size();
120
121 // Launch with completion handler that decrements counter
122
2/2
✓ Branch 4 taken 8 times.
✓ Branch 5 taken 8 times.
16 for(auto& t : impl_->ports)
123 16 capy::run_async(ex_, st, [this]() {
124
1/1
✓ Branch 1 taken 8 times.
8 std::lock_guard lock(impl_->join_mutex);
125
1/2
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
8 if(--active_accepts_ == 0)
126 8 impl_->join_cv.notify_all();
127
2/2
✓ Branch 1 taken 8 times.
✓ Branch 4 taken 8 times.
16 })(do_accept(t));
128 8 }
129
130 void
131 10 tcp_server::
132 stop()
133 {
134 // Idempotent - only stop if running
135
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 8 times.
10 if(!running_)
136 2 return;
137 8 running_ = false;
138
139 // Stop accept loops
140 8 impl_->stop.request_stop();
141
142 // Launch cancellation coroutine on server executor
143
2/2
✓ Branch 4 taken 8 times.
✓ Branch 7 taken 8 times.
8 capy::run_async(ex_, std::stop_token{})(do_stop());
144 }
145
146 void
147 4 tcp_server::
148 join()
149 {
150
1/1
✓ Branch 1 taken 4 times.
4 std::unique_lock lock(impl_->join_mutex);
151
1/1
✓ Branch 1 taken 4 times.
8 impl_->join_cv.wait(lock, [this] { return active_accepts_ == 0; });
152 4 }
153
154 capy::task<>
155
1/1
✓ Branch 1 taken 8 times.
8 tcp_server::do_stop()
156 {
157 // Running on server executor - safe to iterate active list
158 // Just cancel, don't modify list - workers return themselves when done
159 for(auto* w = active_head_; w; w = w->next_)
160 w->stop_.request_stop();
161 co_return;
162 16 }
163
164 } // namespace boost::corosio
165