connection_handler.h
1 // Copyright 2018, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #pragma once
5 
6 #include <absl/types/optional.h>
7 
8 #include <boost/asio/ip/tcp.hpp>
9 #include <boost/intrusive/list.hpp>
10 #include <boost/intrusive/slist_hook.hpp>
11 #include <boost/smart_ptr/intrusive_ptr.hpp>
12 
13 #include "util/asio/fiber_socket.h"
14 
15 namespace util {
16 
17 class IoContextPool;
18 class IoContext;
19 class AcceptServer;
20 
21 namespace detail {
22 using namespace ::boost::intrusive;
23 
24 // auto_unlink allows unlinking from the container during the destruction of
25 // of hook without holding the reference to the container itself.
26 // Requires that the container won't have O(1) size function.
27 typedef slist_member_hook<link_mode<auto_unlink>> connection_hook;
28 class Flusher;
29 
30 } // namespace detail
31 
32 // An instance of this class handles a single connection in fiber.
34  friend class AcceptServer;
35  friend class detail::Flusher;
36 
37  using connection_hook_t = detail::connection_hook;
38 
39  connection_hook_t hook_;
40  connection_hook_t flush_hook_;
41 
42  public:
43  using ptr_t = ::boost::intrusive_ptr<ConnectionHandler>;
44  using io_context = ::boost::asio::io_context;
45 
46  using member_hook_t =
47  detail::member_hook<ConnectionHandler, detail::connection_hook, &ConnectionHandler::hook_>;
48 
49  using flush_hook_t = detail::member_hook<ConnectionHandler, detail::connection_hook,
50  &ConnectionHandler::flush_hook_>;
51 
52  explicit ConnectionHandler(IoContext* context) noexcept;
53 
54  virtual ~ConnectionHandler();
55 
56  void Init(::boost::asio::ip::tcp::socket&& sock);
57 
58  void Close();
59 
60  IoContext& context() {
61  return io_context_;
62  }
63 
64  friend void intrusive_ptr_add_ref(ConnectionHandler* ctx) noexcept {
65  ctx->use_count_.fetch_add(1, std::memory_order_relaxed);
66  }
67 
68  friend void intrusive_ptr_release(ConnectionHandler* ctx) noexcept {
69  if (1 == ctx->use_count_.fetch_sub(1, std::memory_order_release)) {
70  // We want to synchronize on all changes to ctx performed in other threads.
71  // ctx is not atomic but we know that whatever was being written - has been written
72  // in other threads and no references to ctx exist anymore.
73  // Therefore acquiring fence is enough to synchronize.
74  // "acquire" requires a release opearation to mark the end of the memory changes we wish
75  // to acquire, and "fetch_sub(std::memory_order_release)" provides this marker.
76  // To summarize: fetch_sub(release) and fence(acquire) needed to order and synchronize
77  // on changes on ctx in most performant way.
78  // See: https://stackoverflow.com/q/27751025/
79  std::atomic_thread_fence(std::memory_order_acquire);
80  delete ctx;
81  }
82  }
83 
84  protected:
86  // false overthise.
87  virtual bool FlushWrites() { return false; }
88 
90  virtual void OnOpenSocket() {
91  }
92 
101  virtual void OnCloseSocket() {
102  }
103 
104  // Should not block the thread. Can fiber-block (fiber friendly).
105  virtual boost::system::error_code HandleRequest() = 0;
106 
107  absl::optional<FiberSyncSocket> socket_;
108 
109  IoContext& io_context_;
110 
113  bool use_flusher_fiber_ = false;
114 
115  private:
116  void RunInIOThread();
117 
118  std::atomic<std::uint32_t> use_count_{0};
119 };
120 
126  public:
127  virtual ~ListenerInterface() {
128  }
129 
130  void RegisterPool(IoContextPool* pool);
131 
132  // Creates a dedicated handler for a new connection.
133  virtual ConnectionHandler* NewConnection(IoContext& context) = 0;
134 
135  // Called by AcceptServer when shutting down start and before all connections are closed.
136  virtual void PreShutdown() {
137  }
138 
139  // Called by AcceptServer when shutting down finalized and after all connections are closed.
140  virtual void PostShutdown() {
141  }
142 
143  protected:
144  IoContextPool* pool() {
145  return pool_;
146  }
147 
148  private:
149  IoContextPool* pool_ = nullptr;
150 };
151 
152 } // namespace util
virtual void OnCloseSocket()
Called before ConnectionHandler destroyed but after the socket was signalled to stop and shutdown.
Abstracts away connections implementation and their life-cycle.
A pool of IoContext objects, representing and managing CPU resources of the system.
virtual void OnOpenSocket()
Called once after connection was initialized. Will run in io context thread of this handler.
virtual bool FlushWrites()
Called to flush pending writes to the socket. Returns true if flush took place,.