5 #include "util/uring/fiber_socket.h" 7 #include <netinet/in.h> 10 #include <boost/fiber/context.hpp> 12 #include "base/logging.h" 13 #include "base/stl_util.h" 14 #include "util/uring/proactor.h" 16 #define VSOCK(verbosity) VLOG(verbosity) << "sock[" << native_handle() << "] " 17 #define DVSOCK(verbosity) DVLOG(verbosity) << "sock[" << native_handle() << "] " 23 using namespace boost;
24 using IoResult = Proactor::IoResult;
// Fragment of the FiberCall helper; the class header and several interior lines are not
// visible in this view.
// Constructor: records the currently active fiber context in me_, initializes io_res_ to 0
// and obtains a submit entry from the proactor with a completion callback attached.
34 FiberCall(Proactor* proactor) : me_(fibers::context::active()), io_res_(0) {
// Completion callback. It captures `this` and asks the context that is active when the
// callback runs to schedule me_.
// NOTE(review): the line between the lambda header and the schedule() call is not visible
// here; presumably it stores `res` — confirm.
35 auto waker = [
this](IoResult res, int32_t, Proactor* mgr) {
37 fibers::context::active()->schedule(me_);
// Hands the callback to the proactor; the second argument is passed as 0.
39 se_ = proactor->GetSubmitEntry(std::move(waker), 0);
// Aborts when me_ is still non-null; the message indicates Get() is expected to have been
// called first. NOTE(review): the enclosing function header is not visible — presumably the
// destructor; confirm.
43 CHECK(!me_) <<
"Get was not called!";
// Arrow operator returning a SubmitEntry*; its body is not visible in this view.
46 SubmitEntry* operator->() {
// Helper that takes the return value of a POSIX call and an output error_code.
// @param res  return value of the POSIX call.
// @param ec   receives an error_code built from errno with std::system_category().
// NOTE(review): the conditions guarding the assignment and the warning, as well as the
// return statement, are on lines not visible here. Callers in this file compare the result
// against 0, so it presumably returns `res` — confirm.
58 inline ssize_t posix_err_wrap(ssize_t res, FiberSocket::error_code* ec) {
// Capture errno as a system-category error.
60 *ec = FiberSocket::error_code(errno, std::system_category());
// Logs the raw result value at WARNING level.
62 LOG(WARNING) <<
"Bad posix error " << res;
// Destructor: closes the socket via Close(). A failure is only reported as a WARNING log;
// nothing is propagated to the caller.
69 FiberSocket::~FiberSocket() {
70 error_code ec = Close();
// LOG_IF emits only when ec holds an error.
72 LOG_IF(WARNING, ec) <<
"Error closing socket " << ec <<
"/" << ec.message();
// Move assignment. The visible part closes whatever this object currently holds via Close()
// and logs a WARNING if that fails.
// NOTE(review): the lines that take over `other`'s state and return *this are not visible
// in this view.
75 FiberSocket& FiberSocket::operator=(FiberSocket&& other) noexcept {
77 error_code ec = Close();
// A close failure is logged, not propagated (the operator is noexcept).
78 LOG_IF(WARNING, ec) <<
"Error closing socket " << ec <<
"/" << ec.message();
// Shuts down the connection using ::shutdown().
// @param how  passed through unchanged as the second argument of ::shutdown().
// @return error_code; the return statements are not visible in this view.
89 auto FiberSocket::Shutdown(
int how) -> error_code {
// fd_ carries flag bits alongside the descriptor; IS_SHUTDOWN is tested here.
// NOTE(review): the action taken when the flag is already set is on a line not visible
// here — presumably an early return; confirm.
95 if (fd_ & IS_SHUTDOWN)
// FD_MASK strips the flag bits to recover the descriptor; the result is translated into ec.
98 posix_err_wrap(::shutdown(fd_ & FD_MASK, how), &ec);
// Closes the underlying descriptor with ::close() and reports the outcome as an error_code.
// NOTE(review): any guard around the close, the reset of fd_ and the return statement are
// on lines not visible in this view.
104 auto FiberSocket::Close() -> error_code {
// Debug-level trace, prefixed with the socket handle by the DVSOCK macro.
107 DVSOCK(1) <<
"Closing socket";
// Mask off the flag bits stored in fd_ before handing the descriptor to ::close().
109 posix_err_wrap(::close(fd_ & FD_MASK), &ec);
// Creates an IPv4 stream socket, applies the requested socket options, binds it to `port`
// on INADDR_ANY and starts listening.
// @param port            port number; converted with htons() before binding.
// @param backlog         passed unchanged to listen().
// @param sock_opts_mask  bitmask; for every set bit, the bit index is used as the
//                        SOL_SOCKET option name passed to setsockopt().
// @return error_code; the return statements are on lines not visible in this view.
115 auto FiberSocket::Listen(
unsigned port,
unsigned backlog, uint32_t sock_opts_mask) -> error_code {
// The object must not already hold a descriptor.
116 CHECK_EQ(fd_, -1) <<
"Close socket before!";
// The socket is created non-blocking and close-on-exec.
119 fd_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
// NOTE(review): the statement executed on failure is not visible — presumably `return ec;`.
120 if (posix_err_wrap(fd_, &ec) < 0)
// Walk the mask from the lowest bit; the loop ends once no set bits remain.
124 for (
int opt = 0; sock_opts_mask; ++opt) {
125 if (sock_opts_mask & 1) {
// `val` is declared on a line not visible here. A setsockopt failure is only logged.
126 if (setsockopt(fd_, SOL_SOCKET, opt, &val,
sizeof(val)) < 0) {
127 LOG(WARNING) <<
"setsockopt: could not set opt " << opt <<
", " << strerror(errno);
130 sock_opts_mask >>= 1;
// Zero-initialized IPv4 address: any local interface, requested port in network byte order.
133 sockaddr_in server_addr;
134 memset(&server_addr, 0,
sizeof(server_addr));
135 server_addr.sin_family = AF_INET;
136 server_addr.sin_port = htons(port);
137 server_addr.sin_addr.s_addr = INADDR_ANY;
// NOTE(review): the statement executed on bind failure is not visible — presumably
// `return ec;`; confirm whether fd_ is released on that path.
139 if (posix_err_wrap(bind(fd_, (
struct sockaddr*)&server_addr,
sizeof(server_addr)), &ec) < 0)
// The outcome of listen() is translated into ec.
142 posix_err_wrap(listen(fd_, backlog), &ec);
// Accepts an incoming connection on this socket and stores it in *peer.
// @param peer  receives a FiberSocket constructed from the accepted descriptor.
// @return error_code; connection_aborted when the poll result equals POLLERR.
// NOTE(review): the enclosing loop, the success check and the declarations of `fc` and `ec`
// are on lines not visible in this view.
146 auto FiberSocket::Accept(FiberSocket* peer) -> error_code {
149 sockaddr_in client_addr;
150 socklen_t addr_len =
sizeof(client_addr);
// Strip the flag bits stored in fd_ to get the raw descriptor.
153 int fd = fd_ & FD_MASK;
// The accepted descriptor is created non-blocking and close-on-exec.
156 int res = accept4(fd, (
struct sockaddr*)&client_addr, &addr_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
158 *peer = FiberSocket{res};
// Nothing to accept yet: register a POLLIN poll on the listening descriptor and fetch its
// result with fc.Get() (presumably suspending this fiber until completion — confirm).
164 if (errno == EAGAIN) {
166 fc->PrepPollAdd(fd, POLLIN);
167 IoResult io_res = fc.Get();
169 if (io_res == POLLERR) {
170 return system::errc::make_error_code(system::errc::connection_aborted);
// Translate the accept4 result into ec.
175 posix_err_wrap(res, &ec);
// Creates a TCP/IPv4 socket and connects it to `ep` through a proactor connect request.
// @param ep  destination endpoint; ep.data() and ep.size() are passed to PrepConnect.
// @return error_code; on the visible failure path it is built from -io_res in the system
//         category.
180 auto FiberSocket::Connect(
const endpoint_type& ep) -> error_code {
// Requires a proactor to be attached and the call to be made from that proactor's thread.
182 CHECK(p_ && p_->InMyThread());
186 fd_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// NOTE(review): the statement executed on failure is not visible — presumably `return ec;`.
187 if (posix_err_wrap(fd_, &ec) < 0)
// `fc` is declared on a line not visible here; Get() yields the result of the request.
191 fc->PrepConnect(fd_, ep.data(), ep.size());
193 IoResult io_res = fc.Get();
// NOTE(review): the condition guarding the lines below is not visible — presumably
// `io_res < 0`. Whether fd_ is reset after close() is also not visible; confirm.
195 if (close(fd_) < 0) {
196 LOG(WARNING) <<
"Could not close fd " << strerror(errno);
// io_res is negated before being used as the error value.
199 ec = error_code(-io_res, system::system_category());
// Returns the local address of the socket as reported by getsockname().
// A getsockname() failure trips the CHECK below instead of being returned to the caller.
204 auto FiberSocket::LocalEndpoint() const -> endpoint_type {
205 endpoint_type endpoint;
// Start from the endpoint's full capacity; getsockname() updates addr_len in place.
209 socklen_t addr_len = endpoint.capacity();
// `ec` is declared on a line not visible here.
211 posix_err_wrap(::getsockname(fd_ & FD_MASK, endpoint.data(), &addr_len), &ec);
212 CHECK(!ec) << ec <<
"/" << ec.message() <<
" while running getsockname";
// Resize the endpoint to the address length that was reported.
214 endpoint.resize(addr_len);
// Returns the peer address of the socket as reported by getpeername().
// Unlike LocalEndpoint(), the visible code does not CHECK on failure: the endpoint is
// resized only when getpeername() succeeds.
219 auto FiberSocket::RemoteEndpoint() const -> endpoint_type {
220 endpoint_type endpoint;
// Start from the endpoint's full capacity; getpeername() updates addr_len in place.
223 socklen_t addr_len = endpoint.capacity();
225 if (getpeername(fd_ & FD_MASK, endpoint.data(), &addr_len) == 0)
226 endpoint.resize(addr_len);
// Sends the buffers described by an iovec array through a proactor sendmsg request.
// @param ptr  array of iovec entries to send.
// @param len  number of entries in `ptr`.
// @return expected_size_t; the visible failure paths return an unexpected error_code.
// NOTE(review): the declarations of `msg`, `fc` and `res`, the retry loop and the success
// return are on lines not visible in this view.
231 auto FiberSocket::Send(
const iovec* ptr,
size_t len) -> expected_size_t {
// A socket flagged as shut down fails immediately with connection_aborted.
236 if (fd_ & IS_SHUTDOWN) {
237 return nonstd::make_unexpected(std::make_error_code(std::errc::connection_aborted));
// Zeroed message header that points at the caller's buffers. msg_iov is not
// const-qualified, hence the const_cast.
241 memset(&msg, 0,
sizeof(msg));
242 msg.msg_iov = const_cast<iovec*>(ptr);
243 msg.msg_iovlen = len;
// Strip the flag bits stored in fd_ to get the raw descriptor.
246 int fd = fd_ & FD_MASK;
250 fc->PrepSendMsg(fd, &msg, 0);
255 DVSOCK(1) <<
"Got " << res;
// `res` is compared against positive errno values here, so it has presumably been negated
// on a line not visible — confirm. The action for EAGAIN/EBUSY is also not visible
// (presumably a retry).
257 if (res == EAGAIN || res == EBUSY)
// Connection-level failures that are singled out from all other errors.
260 if (base::_in(res, {ECONNABORTED, EPIPE, ECONNRESET})) {
// NOTE(review): which branch reaches this fatal log is not visible — presumably every
// error outside the set above.
266 LOG(FATAL) <<
"Unexpected error " << res <<
"/" << strerror(res);
// Report the error in the generic category and log it together with the peer address.
268 std::error_code ec(res, std::generic_category());
269 VSOCK(1) <<
"Error " << ec <<
" on " << RemoteEndpoint();
271 return nonstd::make_unexpected(std::move(ec));
// Receives into the buffers described by an iovec array through a proactor recvmsg request.
// @param ptr  array of iovec entries to fill.
// @param len  number of entries in `ptr`.
// @return expected_size_t; the visible failure paths return an unexpected error_code.
// NOTE(review): the declarations of `msg`, `fc` and `res`, the retry loop and the success
// return are on lines not visible in this view.
274 auto FiberSocket::Recv(iovec* ptr,
size_t len) -> expected_size_t {
// A socket flagged as shut down fails immediately with connection_aborted.
279 if (fd_ & IS_SHUTDOWN) {
280 return nonstd::make_unexpected(std::make_error_code(std::errc::connection_aborted));
// Zeroed message header that points at the caller's buffers.
284 memset(&msg, 0,
sizeof(msg));
285 msg.msg_iov = const_cast<iovec*>(ptr);
286 msg.msg_iovlen = len;
// Strip the flag bits stored in fd_ to get the raw descriptor.
287 int fd = fd_ & FD_MASK;
// When the proactor reports no fast-poll support, a separate POLLIN entry with no callback
// is submitted first and flagged IOSQE_IO_LINK (presumably so the recvmsg entry that
// follows runs only after the poll completes — confirm against io_uring semantics).
295 if (!p_->HasFastPoll()) {
296 SubmitEntry se = p_->GetSubmitEntry(
nullptr, 0);
297 se.PrepPollAdd(fd, POLLIN);
298 se.sqe()->flags = IOSQE_IO_LINK;
304 fc->PrepRecvMsg(fd, &msg, 0);
310 DVSOCK(1) <<
"Got " << res;
// `res` is compared against positive errno values here, so it has presumably been negated
// on a line not visible — confirm. The action for EAGAIN/EBUSY is also not visible
// (presumably a retry).
313 if (res == EAGAIN || res == EBUSY)
// Connection-level failures that are singled out from all other errors.
319 if (base::_in(res, {ECONNABORTED, EPIPE, ECONNRESET})) {
// NOTE(review): which branch reaches this fatal log is not visible — presumably every
// error outside the set above.
323 LOG(FATAL) <<
"Unexpected error " << res <<
"/" << strerror(res);
// Report the error in the generic category and log it together with the peer address.
325 std::error_code ec(res, std::generic_category());
326 VSOCK(1) <<
"Error " << ec <<
" on " << RemoteEndpoint();
330 return nonstd::make_unexpected(std::move(ec));