accept_server.cc
1 // Copyright 2020, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #include "util/uring/accept_server.h"
6 
7 #include <boost/fiber/operations.hpp>
8 
9 #include "base/logging.h"
10 #include "util/uring/fiber_socket.h"
11 #include "util/uring/proactor_pool.h"
12 #include "util/uring/uring_fiber_algo.h"
13 
// Logging helpers that prefix every message with "sock[<native handle>] ".
// VSOCK expands to VLOG and DVSOCK to DVLOG at the requested verbosity. `sock` can be
// any expression whose value exposes native_handle(); it is parenthesized in the
// expansion so that expressions such as `*conn` work as well.
14 #define VSOCK(verbosity, sock) VLOG(verbosity) << "sock[" << (sock).native_handle() << "] "
15 #define DVSOCK(verbosity, sock) DVLOG(verbosity) << "sock[" << (sock).native_handle() << "] "
16 
17 namespace util {
18 namespace uring {
19 
20 using namespace boost;
21 using namespace std;
22 
// Intrusive singly-linked list of Connection objects, chained through the hook
// described by Connection::member_hook_t. constant_time_size<true> makes the list keep
// an element count, so size() does not need to walk the list; cache_last<false> means
// no pointer to the last element is cached.
23 using ListType =
24  intrusive::slist<Connection, Connection::member_hook_t, intrusive::constant_time_size<true>,
25  intrusive::cache_last<false>>;
26 
// Connections that are currently linked into this list.
28  ListType list;
// Guards `list`: Link(), Unlink() and AwaitEmpty() all lock it before touching the list.
29  fibers::mutex mu;
// Notified by Unlink() once the list becomes empty; AwaitEmpty() waits on it.
30  fibers::condition_variable cond;
31 
32  void Link(Connection* c) {
33  std::lock_guard<fibers::mutex> lk(mu);
34  list.push_front(*c);
35  VLOG(2) << "List size " << list.size();
36  }
37 
38  void Unlink(Connection* c) {
39  std::lock_guard<fibers::mutex> lk(mu);
40  auto it = list.iterator_to(*c);
41  list.erase(it);
42  DVLOG(2) << "List size " << list.size();
43 
44  if (list.empty()) {
45  cond.notify_one();
46  }
47  }
48 
49  void AwaitEmpty() {
50  std::unique_lock<fibers::mutex> lk(mu);
51  DVLOG(1) << "AwaitEmpty: List size: " << list.size();
52 
53  cond.wait(lk, [this] { return list.empty(); });
54  }
55 };
56 
57 AcceptServer::AcceptServer(ProactorPool* pool, bool break_on_int)
58  : pool_(pool), ref_bc_(0), break_(break_on_int) {
59  if (break_on_int) {
60  Proactor* proactor = pool_->GetNextProactor();
61  proactor->RegisterSignal({SIGINT, SIGTERM}, [this](int signal) {
62  LOG(INFO) << "Exiting on signal " << signal;
63  BreakListeners();
64  });
65  }
66 }
67 
68 AcceptServer::~AcceptServer() {
69  list_interface_.clear();
70 }
71 
72 void AcceptServer::Run() {
73  if (!list_interface_.empty()) {
74  ref_bc_.Add(list_interface_.size());
75 
76  for (auto& lw : list_interface_) {
77  auto* proactor = lw->listener_.proactor();
78  proactor->AsyncFiber([li = lw.get(), this] {
79  li->RunAcceptLoop();
80  ref_bc_.Dec();
81  });
82  }
83  }
84  was_run_ = true;
85 }
86 
87 // If wait is false - does not wait for the server to stop.
88 // Then you need to run Wait() to wait for proper shutdown.
89 void AcceptServer::Stop(bool wait) {
90  VLOG(1) << "AcceptServer::Stop";
91 
92  BreakListeners();
93  if (wait)
94  Wait();
95 }
96 
97 void AcceptServer::Wait() {
98  VLOG(1) << "AcceptServer::Wait";
99  if (was_run_) {
100  ref_bc_.Wait();
101  VLOG(1) << "AcceptServer::Wait completed";
102  } else {
103  CHECK(list_interface_.empty()) << "Must Call AcceptServer::Run() after adding listeners";
104  }
105 }
106 
107 // Returns the port number to which the listener was bound.
108 unsigned short AcceptServer::AddListener(unsigned short port, ListenerInterface* lii) {
109  CHECK(lii && !lii->listener_.IsOpen());
110 
111  // We can not allow dynamic listener additions because listeners_ might reallocate.
112  CHECK(!was_run_);
113 
114  FiberSocket fs;
115  uint32_t sock_opt_mask = lii->GetSockOptMask();
116  auto ec = fs.Listen(port, backlog_, sock_opt_mask);
117  CHECK(!ec) << "Could not open port " << port << " " << ec << "/" << ec.message();
118 
119  auto ep = fs.LocalEndpoint();
120  lii->RegisterPool(pool_);
121 
122  Proactor* next = pool_->GetNextProactor();
123  fs.set_proactor(next);
124  lii->listener_ = std::move(fs);
125 
126  list_interface_.emplace_back(lii);
127 
128  return ep.port();
129 }
130 
131 void AcceptServer::BreakListeners() {
132  for (auto& lw : list_interface_) {
133  auto* proactor = lw->listener_.proactor();
134  proactor->AsyncBrief([sock = &lw->listener_] { sock->Shutdown(SHUT_RDWR); });
135  }
136  VLOG(1) << "AcceptServer::BreakListeners finished";
137 }
138 
139 // Runs in a dedicated fiber for each listener.
140 void ListenerInterface::RunAcceptLoop() {
141  auto& fiber_props = this_fiber::properties<UringFiberProps>();
142  fiber_props.set_name("AcceptLoop");
143 
144  auto ep = listener_.LocalEndpoint();
145  VSOCK(0, listener_) << "AcceptServer - listening on port " << ep.port();
146  SafeConnList safe_list;
147 
148  PreAcceptLoop(listener_.proactor());
149 
150  while (true) {
151  FiberSocket peer;
152  std::error_code ec = listener_.Accept(&peer);
153  if (ec == errc::connection_aborted)
154  break;
155 
156  if (ec) {
157  LOG(ERROR) << "Error calling accept " << ec << "/" << ec.message();
158  break;
159  }
160  VLOG(2) << "Accepted " << peer.native_handle() << ": " << peer.LocalEndpoint();
161  Proactor* next = pool_->GetNextProactor(); // Could be for another thread.
162 
163  peer.set_proactor(next);
164  Connection* conn = NewConnection(next);
165  conn->SetSocket(std::move(peer));
166  safe_list.Link(conn);
167 
168  // mutable because we move peer.
169  auto cb = [conn, next, &safe_list]() mutable {
170  next->AsyncFiber(&RunSingleConnection, conn, &safe_list);
171  };
172 
173  // Run cb in its Proactor thread.
174  next->AsyncFiber(std::move(cb));
175  }
176 
177  PreShutdown();
178 
179  safe_list.mu.lock();
180  unsigned cnt = 0;
181  for (auto& val : safe_list.list) {
182  val.socket_.Shutdown(SHUT_RDWR);
183  DVSOCK(1, val.socket_) << "Shutdown";
184  ++cnt;
185  }
186 
187  safe_list.mu.unlock();
188 
189  VLOG(1) << "Waiting for " << cnt << " connections to close";
190  safe_list.AwaitEmpty();
191 
192  PostShutdown();
193 
194  LOG(INFO) << "Listener stopped for port " << ep.port();
195 }
196 
197 
198 ListenerInterface::~ListenerInterface() {
199  VLOG(1) << "Destroying ListenerInterface " << this;
200 }
201 
202 void ListenerInterface::RunSingleConnection(Connection* conn, SafeConnList* conns) {
203  VSOCK(2, *conn) << "Running connection";
204 
205  std::unique_ptr<Connection> guard(conn);
206  try {
207  conn->HandleRequests();
208  VSOCK(2, *conn) << "After HandleRequests";
209 
210  } catch (std::exception& e) {
211  LOG(ERROR) << "Uncaught exception " << e.what();
212  }
213  conns->Unlink(conn);
214 }
215 
216 void ListenerInterface::RegisterPool(ProactorPool* pool) {
217  // In tests we might relaunch AcceptServer with the same listener, so we allow
218  // reassigning the same pool.
219  CHECK(pool_ == nullptr || pool_ == pool);
220 
221  pool_ = pool;
222 }
223 
224 } // namespace uring
225 } // namespace util
// Cross-reference notes left over from the generated documentation listing:
//   virtual Connection* NewConnection(Proactor* context) = 0
//   virtual void PreAcceptLoop(Proactor* owner)
//     Definition: accept_server.h:81
//   Proactor* GetNextProactor()
//     Get a Proactor to use. Thread-safe.