gcs_utils.cc
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #include "util/gce/detail/gcs_utils.h"
6 
7 #include <boost/fiber/operations.hpp>
8 
9 #include "base/logging.h"
10 #include "base/walltime.h"
11 #include "util/http/https_client.h"
12 
13 namespace util {
14 namespace detail {
15 
16 using namespace boost;
17 using http::HttpsClientPool;
18 using namespace ::std;
19 
20 unique_ptr<VarzQps> gcs_writes;
21 unique_ptr<VarzMapAverage5m> gcs_latency;
22 
23 std::ostream& operator<<(std::ostream& os, const h2::response<h2::buffer_body>& msg) {
24  os << msg.reason() << endl;
25  for (const auto& f : msg) {
26  os << f.name_string() << " : " << f.value() << endl;
27  }
28  os << "-------------------------";
29 
30  return os;
31 }
32 
33 h2::request<h2::dynamic_body> PrepareGenericRequest(h2::verb req_verb, const bb_str_view url,
34  const bb_str_view token) {
35  h2::request<h2::dynamic_body> req(req_verb, url, 11);
36  req.set(h2::field::host, GCE::kApiDomain);
37 
38  AddBearer(absl_sv(token), &req);
39  return req;
40 }
41 
42 ApiSenderBase::ApiSenderBase(const char* name, const GCE& gce, http::HttpsClientPool* pool)
43  : name_(name), gce_(gce), pool_(pool) {
44  InitVarzStats();
45 }
46 
47 ApiSenderBase::~ApiSenderBase() {}
48 
49 StatusObject<HttpsClientPool::ClientHandle> ApiSenderBase::SendGeneric(unsigned num_iterations,
50  Request req) {
51  system::error_code ec;
52  HttpsClientPool::ClientHandle handle;
53 
54  uint64_t start = base::GetMonotonicMicrosFast();
55 
56  // for now we may increase num_iterations indefinitely in some cases.
57  // TODO: to refine this logic.
58  for (unsigned iters = 0; iters < num_iterations; ++iters) {
59  if (!handle) {
60  handle = pool_->GetHandle();
61  ec = handle->status();
62  if (ec) {
63  return ToStatus(ec);
64  }
65  }
66  const Request::header_type& header = req;
67 
68  VLOG(1) << "ReqIter " << iters << ": socket " << handle->native_handle() << " " << header;
69 
70  ec = SendRequestIterative(req, handle.get());
71 
72  if (!ec) { // Success and fast path
73  gcs_latency->IncBy(name_, base::GetMonotonicMicrosFast() - start);
74  return handle;
75  }
76 
77  if (ec == system::errc::operation_not_permitted) {
78  return Status(StatusCode::IO_ERROR, "Disabled operation");
79  }
80 
81  if (ec == asio::error::no_permission) {
82  auto token_res = gce_.RefreshAccessToken(&pool_->io_context());
83  if (!token_res.ok())
84  return token_res.status;
85 
86  AddBearer(token_res.obj, &req);
87  } else if (ec == asio::error::try_again) {
88  ++num_iterations;
89  LOG(INFO) << "RespIter " << iters << ": socket " << handle->native_handle() << " retrying";
90  } else {
91  LOG(INFO) << "RespIter " << iters << ": socket " << handle->native_handle()
92  << " failed with error " << ec << "/" << ec.message() << " "
93  << ERR_GET_REASON(ec.value());
94  handle.reset();
95  if (ec.category() == asio::error::get_ssl_category()) {
96  // if (ERR_GET_REASON(ec.value()) == SSL_R_WRONG_VERSION_NUMBER) {
97  ++num_iterations;
98  //}
99  }
100  }
101  if (handle) {
102  auto status = handle->status();
103  if (status) { // Should cover reconnect header request.
104  VLOG(1) << "Resetting connection due to " << status;
105  handle.reset();
106  }
107  }
108  }
109 
110  return Status(StatusCode::IO_ERROR, "Maximum iterations reached");
111 }
112 
113 auto ApiSenderBufferBody::SendRequestIterative(const Request& req, http::HttpsClient* client)
114  -> error_code {
115  system::error_code ec = client->Send(req);
116  if (ec) {
117  LOG(INFO) << "Error sending request " << ec;
118  return ec;
119  }
120 
121  parser_.emplace().body_limit(kuint64max);
122  ec = client->ReadHeader(&parser_.value());
123 
124  if (ec) {
125  LOG(INFO) << "Error reading response " << ec;
126  return ec;
127  }
128 
129  if (!parser_->keep_alive()) {
130  client->schedule_reconnect();
131  LOG(INFO) << "Scheduling reconnect due to conn-close header";
132  }
133 
134  const auto& msg = parser_->get();
135  VLOG(1) << "HeaderResp(" << client->native_handle() << "): " << msg;
136 
137  // Partial content can appear because of the previous reconnect.
138  if (msg.result() == h2::status::ok || msg.result() == h2::status::partial_content) {
139  return error_code{}; // all is good.
140  }
141 
142  string err_str(2048U, '\0');
143  auto& body = parser_->get().body();
144  body.data = &err_str.front();
145  body.size = err_str.size() - 1;
146  ec = client->Read(&parser_.value());
147  if (ec) {
148  return ec;
149  }
150 
151  // Parse & drain whatever comes after problematic status.
152  // We must do it as long as we plan to use this connection for more requests.
153  ec = client->DrainResponse(&parser_.value());
154  if (ec) {
155  return ec;
156  }
157 
158  if (DoesServerPushback(msg.result())) {
159  LOG(INFO) << "Retrying(" << client->native_handle() << ") with " << msg;
160 
161  this_fiber::sleep_for(1s);
162  return asio::error::try_again; // retry
163  }
164 
165  if (IsUnauthorized(msg)) {
166  return asio::error::no_permission;
167  }
168 
169  if (msg.result() == h2::status::forbidden) {
170  LOG(ERROR) << "Error accessing GCS: " << err_str.c_str();
171 
172  return system::errc::make_error_code(system::errc::operation_not_permitted);
173  }
174 
175  LOG(ERROR) << "Unexpected status " << msg << msg.result_int() << "\n" << err_str.c_str() << "\n";
176 
177  return h2::error::bad_status;
178 }
179 
180 static once_flag gcs_write_set_flag;
181 
182 void InitVarzStats() {
183  std::call_once(gcs_write_set_flag, [] {
184  gcs_writes.reset(new VarzQps("gcs-writes"));
185  gcs_latency.reset(new VarzMapAverage5m("gcs-latency"));
186  });
187 }
188 
189 } // namespace detail
190 } // namespace util