5 #include "util/uring/accept_server.h" 7 #include <boost/fiber/operations.hpp> 9 #include "base/logging.h" 10 #include "util/uring/fiber_socket.h" 11 #include "util/uring/proactor_pool.h" 12 #include "util/uring/uring_fiber_algo.h" 14 #define VSOCK(verbosity, sock) VLOG(verbosity) << "sock[" << (sock).native_handle() << "] " 15 #define DVSOCK(verbosity, sock) DVLOG(verbosity) << "sock[" << (sock).native_handle() << "] " 20 using namespace boost;
24 intrusive::slist<Connection, Connection::member_hook_t, intrusive::constant_time_size<true>,
25 intrusive::cache_last<false>>;
30 fibers::condition_variable cond;
33 std::lock_guard<fibers::mutex> lk(mu);
35 VLOG(2) <<
"List size " << list.size();
39 std::lock_guard<fibers::mutex> lk(mu);
40 auto it = list.iterator_to(*c);
42 DVLOG(2) <<
"List size " << list.size();
50 std::unique_lock<fibers::mutex> lk(mu);
51 DVLOG(1) <<
"AwaitEmpty: List size: " << list.size();
53 cond.wait(lk, [
this] {
return list.empty(); });
57 AcceptServer::AcceptServer(
ProactorPool* pool,
bool break_on_int)
58 : pool_(pool), ref_bc_(0), break_(break_on_int) {
60 Proactor* proactor = pool_->GetNextProactor();
61 proactor->RegisterSignal({SIGINT, SIGTERM}, [
this](
int signal) {
62 LOG(INFO) <<
"Exiting on signal " << signal;
68 AcceptServer::~AcceptServer() {
69 list_interface_.clear();
72 void AcceptServer::Run() {
73 if (!list_interface_.empty()) {
74 ref_bc_.Add(list_interface_.size());
76 for (
auto& lw : list_interface_) {
77 auto* proactor = lw->listener_.proactor();
78 proactor->AsyncFiber([li = lw.get(),
this] {
89 void AcceptServer::Stop(
bool wait) {
90 VLOG(1) <<
"AcceptServer::Stop";
97 void AcceptServer::Wait() {
98 VLOG(1) <<
"AcceptServer::Wait";
101 VLOG(1) <<
"AcceptServer::Wait completed";
103 CHECK(list_interface_.empty()) <<
"Must Call AcceptServer::Run() after adding listeners";
108 unsigned short AcceptServer::AddListener(
unsigned short port, ListenerInterface* lii) {
109 CHECK(lii && !lii->listener_.IsOpen());
115 uint32_t sock_opt_mask = lii->GetSockOptMask();
116 auto ec = fs.Listen(port, backlog_, sock_opt_mask);
117 CHECK(!ec) <<
"Could not open port " << port <<
" " << ec <<
"/" << ec.message();
119 auto ep = fs.LocalEndpoint();
120 lii->RegisterPool(pool_);
123 fs.set_proactor(next);
124 lii->listener_ = std::move(fs);
126 list_interface_.emplace_back(lii);
131 void AcceptServer::BreakListeners() {
132 for (
auto& lw : list_interface_) {
133 auto* proactor = lw->listener_.proactor();
134 proactor->AsyncBrief([sock = &lw->listener_] { sock->Shutdown(SHUT_RDWR); });
136 VLOG(1) <<
"AcceptServer::BreakListeners finished";
140 void ListenerInterface::RunAcceptLoop() {
141 auto& fiber_props = this_fiber::properties<UringFiberProps>();
142 fiber_props.set_name(
"AcceptLoop");
144 auto ep = listener_.LocalEndpoint();
145 VSOCK(0, listener_) <<
"AcceptServer - listening on port " << ep.port();
146 SafeConnList safe_list;
152 std::error_code ec = listener_.Accept(&peer);
153 if (ec == errc::connection_aborted)
157 LOG(ERROR) <<
"Error calling accept " << ec <<
"/" << ec.message();
160 VLOG(2) <<
"Accepted " << peer.native_handle() <<
": " << peer.LocalEndpoint();
163 peer.set_proactor(next);
165 conn->SetSocket(std::move(peer));
166 safe_list.Link(conn);
169 auto cb = [conn, next, &safe_list]()
mutable {
170 next->AsyncFiber(&RunSingleConnection, conn, &safe_list);
174 next->AsyncFiber(std::move(cb));
181 for (
auto& val : safe_list.list) {
182 val.socket_.Shutdown(SHUT_RDWR);
183 DVSOCK(1, val.socket_) <<
"Shutdown";
187 safe_list.mu.unlock();
189 VLOG(1) <<
"Waiting for " << cnt <<
" connections to close";
190 safe_list.AwaitEmpty();
194 LOG(INFO) <<
"Listener stopped for port " << ep.port();
198 ListenerInterface::~ListenerInterface() {
199 VLOG(1) <<
"Destroying ListenerInterface " <<
this;
202 void ListenerInterface::RunSingleConnection(Connection* conn, SafeConnList* conns) {
203 VSOCK(2, *conn) <<
"Running connection";
205 std::unique_ptr<Connection> guard(conn);
207 conn->HandleRequests();
208 VSOCK(2, *conn) <<
"After HandleRequests";
210 }
catch (std::exception& e) {
211 LOG(ERROR) <<
"Uncaught exception " << e.what();
216 void ListenerInterface::RegisterPool(ProactorPool* pool) {
219 CHECK(pool_ ==
nullptr || pool_ == pool);
virtual Connection * NewConnection(Proactor *context)=0
virtual void PreAcceptLoop(Proactor *owner)
Proactor * GetNextProactor()
Get a Proactor to use. Thread-safe.