fiber_socket.cc
1 // Copyright 2020, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #include "util/uring/fiber_socket.h"
6 
7 #include <netinet/in.h>
8 #include <sys/poll.h>
9 
10 #include <boost/fiber/context.hpp>
11 
12 #include "base/logging.h"
13 #include "base/stl_util.h"
14 #include "util/uring/proactor.h"
15 
// Logging helpers that prefix each message with "sock[<native handle>] ".
// VSOCK forwards to VLOG and DVSOCK forwards to DVLOG; both expect native_handle()
// to be callable at the point of use.
#define VSOCK(verbosity) \
  VLOG(verbosity) << "sock[" << native_handle() << "] "
#define DVSOCK(verbosity) \
  DVLOG(verbosity) << "sock[" << native_handle() << "] "
18 
19 namespace util {
20 namespace uring {
21 
22 using namespace std;
23 using namespace boost;
24 using IoResult = Proactor::IoResult;
25 
26 namespace {
27 
28 class FiberCall {
29  SubmitEntry se_;
30  fibers::context* me_;
31  IoResult io_res_;
32 
33  public:
34  FiberCall(Proactor* proactor) : me_(fibers::context::active()), io_res_(0) {
35  auto waker = [this](IoResult res, int32_t, Proactor* mgr) {
36  io_res_ = res;
37  fibers::context::active()->schedule(me_);
38  };
39  se_ = proactor->GetSubmitEntry(std::move(waker), 0);
40  }
41 
42  ~FiberCall() {
43  CHECK(!me_) << "Get was not called!";
44  }
45 
46  SubmitEntry* operator->() {
47  return &se_;
48  }
49 
50  IoResult Get() {
51  me_->suspend();
52  me_ = nullptr;
53 
54  return io_res_;
55  }
56 };
57 
58 inline ssize_t posix_err_wrap(ssize_t res, FiberSocket::error_code* ec) {
59  if (res == -1) {
60  *ec = FiberSocket::error_code(errno, std::system_category());
61  } else if (res < 0) {
62  LOG(WARNING) << "Bad posix error " << res;
63  }
64  return res;
65 }
66 
67 } // namespace
68 
69 FiberSocket::~FiberSocket() {
70  error_code ec = Close(); // Quietly close.
71 
72  LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
73 }
74 
75 FiberSocket& FiberSocket::operator=(FiberSocket&& other) noexcept {
76  if (fd_ >= 0) {
77  error_code ec = Close();
78  LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
79  }
80  DCHECK_EQ(-1, fd_);
81 
82  swap(fd_, other.fd_);
83  p_ = other.p_;
84  other.p_ = nullptr;
85 
86  return *this;
87 }
88 
89 auto FiberSocket::Shutdown(int how) -> error_code {
90  CHECK_GE(fd_, 0);
91 
92  // If we shutdown and then try to Send/Recv - the call will stall since no data
93  // is sent/received. Therefore we remember the state to allow consistent API experience.
94  error_code ec;
95  if (fd_ & IS_SHUTDOWN)
96  return ec;
97 
98  posix_err_wrap(::shutdown(fd_ & FD_MASK, how), &ec);
99  fd_ |= IS_SHUTDOWN; // Enter shutdown state unrelated to the success of the call.
100 
101  return ec;
102 }
103 
104 auto FiberSocket::Close() -> error_code {
105  error_code ec;
106  if (fd_ > 0) {
107  DVSOCK(1) << "Closing socket";
108 
109  posix_err_wrap(::close(fd_ & FD_MASK), &ec);
110  fd_ = -1;
111  }
112  return ec;
113 }
114 
115 auto FiberSocket::Listen(unsigned port, unsigned backlog, uint32_t sock_opts_mask) -> error_code {
116  CHECK_EQ(fd_, -1) << "Close socket before!";
117 
118  error_code ec;
119  fd_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
120  if (posix_err_wrap(fd_, &ec) < 0)
121  return ec;
122 
123  const int val = 1;
124  for (int opt = 0; sock_opts_mask; ++opt) {
125  if (sock_opts_mask & 1) {
126  if (setsockopt(fd_, SOL_SOCKET, opt, &val, sizeof(val)) < 0) {
127  LOG(WARNING) << "setsockopt: could not set opt " << opt << ", " << strerror(errno);
128  }
129  }
130  sock_opts_mask >>= 1;
131  }
132 
133  sockaddr_in server_addr;
134  memset(&server_addr, 0, sizeof(server_addr));
135  server_addr.sin_family = AF_INET;
136  server_addr.sin_port = htons(port);
137  server_addr.sin_addr.s_addr = INADDR_ANY;
138 
139  if (posix_err_wrap(bind(fd_, (struct sockaddr*)&server_addr, sizeof(server_addr)), &ec) < 0)
140  return ec;
141 
142  posix_err_wrap(listen(fd_, backlog), &ec);
143  return ec;
144 }
145 
146 auto FiberSocket::Accept(FiberSocket* peer) -> error_code {
147  CHECK(p_);
148 
149  sockaddr_in client_addr;
150  socklen_t addr_len = sizeof(client_addr);
151 
152  error_code ec;
153  int fd = fd_ & FD_MASK;
154 
155  while (true) {
156  int res = accept4(fd, (struct sockaddr*)&client_addr, &addr_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
157  if (res >= 0) {
158  *peer = FiberSocket{res};
159  return ec;
160  }
161 
162  DCHECK_EQ(-1, res);
163 
164  if (errno == EAGAIN) {
165  FiberCall fc(p_);
166  fc->PrepPollAdd(fd, POLLIN);
167  IoResult io_res = fc.Get();
168 
169  if (io_res == POLLERR) {
170  return system::errc::make_error_code(system::errc::connection_aborted);
171  }
172  continue;
173  }
174 
175  posix_err_wrap(res, &ec);
176  return ec;
177  }
178 }
179 
180 auto FiberSocket::Connect(const endpoint_type& ep) -> error_code {
181  CHECK_EQ(fd_, -1);
182  CHECK(p_ && p_->InMyThread());
183 
184  error_code ec;
185 
186  fd_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
187  if (posix_err_wrap(fd_, &ec) < 0)
188  return ec;
189 
190  FiberCall fc(p_);
191  fc->PrepConnect(fd_, ep.data(), ep.size());
192 
193  IoResult io_res = fc.Get();
194  if (io_res < 0) { // In that case connect returns -errno.
195  if (close(fd_) < 0) {
196  LOG(WARNING) << "Could not close fd " << strerror(errno);
197  }
198  fd_ = -1;
199  ec = error_code(-io_res, system::system_category());
200  }
201  return ec;
202 }
203 
204 auto FiberSocket::LocalEndpoint() const -> endpoint_type {
205  endpoint_type endpoint;
206 
207  if (fd_ < 0)
208  return endpoint;
209  socklen_t addr_len = endpoint.capacity();
210  error_code ec;
211  posix_err_wrap(::getsockname(fd_ & FD_MASK, endpoint.data(), &addr_len), &ec);
212  CHECK(!ec) << ec << "/" << ec.message() << " while running getsockname";
213 
214  endpoint.resize(addr_len);
215 
216  return endpoint;
217 }
218 
219 auto FiberSocket::RemoteEndpoint() const -> endpoint_type {
220  endpoint_type endpoint;
221  CHECK_GT(fd_, 0);
222 
223  socklen_t addr_len = endpoint.capacity();
224  error_code ec;
225  if (getpeername(fd_ & FD_MASK, endpoint.data(), &addr_len) == 0)
226  endpoint.resize(addr_len);
227 
228  return endpoint;
229 }
230 
231 auto FiberSocket::Send(const iovec* ptr, size_t len) -> expected_size_t {
232  CHECK(p_);
233  CHECK_GT(len, 0);
234  CHECK_GE(fd_, 0);
235 
236  if (fd_ & IS_SHUTDOWN) {
237  return nonstd::make_unexpected(std::make_error_code(std::errc::connection_aborted));
238  }
239 
240  msghdr msg;
241  memset(&msg, 0, sizeof(msg));
242  msg.msg_iov = const_cast<iovec*>(ptr);
243  msg.msg_iovlen = len;
244 
245  ssize_t res;
246  int fd = fd_ & FD_MASK;
247 
248  while (true) {
249  FiberCall fc(p_);
250  fc->PrepSendMsg(fd, &msg, 0);
251  res = fc.Get(); // Interrupt point
252  if (res >= 0) {
253  return res; // Fastpath
254  }
255  DVSOCK(1) << "Got " << res;
256  res = -res;
257  if (res == EAGAIN || res == EBUSY)
258  continue;
259 
260  if (base::_in(res, {ECONNABORTED, EPIPE, ECONNRESET})) {
261  if (res == EPIPE) // We do not care about EPIPE that can happen when we shutdown our socket.
262  res = ECONNABORTED;
263  break;
264  }
265 
266  LOG(FATAL) << "Unexpected error " << res << "/" << strerror(res);
267  }
268  std::error_code ec(res, std::generic_category());
269  VSOCK(1) << "Error " << ec << " on " << RemoteEndpoint();
270 
271  return nonstd::make_unexpected(std::move(ec));
272 }
273 
274 auto FiberSocket::Recv(iovec* ptr, size_t len) -> expected_size_t {
275  CHECK_GT(len, 0);
276  CHECK(p_);
277  CHECK_GE(fd_, 0);
278 
279  if (fd_ & IS_SHUTDOWN) {
280  return nonstd::make_unexpected(std::make_error_code(std::errc::connection_aborted));
281  }
282 
283  msghdr msg;
284  memset(&msg, 0, sizeof(msg));
285  msg.msg_iov = const_cast<iovec*>(ptr);
286  msg.msg_iovlen = len;
287  int fd = fd_ & FD_MASK;
288 
289  // There is a possible data-race bug since GetSubmitEntry can preempt inside
290  // FiberCall, thus introducing a chain with random SQE not from here.
291  //
292  // The bug is not really interesting in this context here since we handle the use-case of old
293  // kernels without fast-poll, however it's problematic for transactions that require SQE chains.
294  // Added TODO to proactor.h
295  if (!p_->HasFastPoll()) {
296  SubmitEntry se = p_->GetSubmitEntry(nullptr, 0);
297  se.PrepPollAdd(fd, POLLIN);
298  se.sqe()->flags = IOSQE_IO_LINK;
299  }
300 
301  ssize_t res;
302  while (true) {
303  FiberCall fc(p_);
304  fc->PrepRecvMsg(fd, &msg, 0);
305  res = fc.Get();
306 
307  if (res > 0) {
308  return res;
309  }
310  DVSOCK(1) << "Got " << res;
311 
312  res = -res;
313  if (res == EAGAIN || res == EBUSY)
314  continue;
315 
316  if (res == 0)
317  res = ECONNABORTED;
318 
319  if (base::_in(res, {ECONNABORTED, EPIPE, ECONNRESET})) {
320  break;
321  }
322 
323  LOG(FATAL) << "Unexpected error " << res << "/" << strerror(res);
324  }
325  std::error_code ec(res, std::generic_category());
326  VSOCK(1) << "Error " << ec << " on " << RemoteEndpoint();
327  expected_size_t es;
328  es.operator bool();
329 
330  return nonstd::make_unexpected(std::move(ec));
331 }
332 
333 } // namespace uring
334 } // namespace util