5 #include "util/gce/detail/gcs_utils.h" 7 #include <boost/fiber/operations.hpp> 9 #include "base/logging.h" 10 #include "base/walltime.h" 11 #include "util/http/https_client.h" 16 using namespace boost;
17 using http::HttpsClientPool;
18 using namespace ::std;
20 unique_ptr<VarzQps> gcs_writes;
21 unique_ptr<VarzMapAverage5m> gcs_latency;
23 std::ostream& operator<<(std::ostream& os,
const h2::response<h2::buffer_body>& msg) {
24 os << msg.reason() << endl;
25 for (
const auto& f : msg) {
26 os << f.name_string() <<
" : " << f.value() << endl;
28 os <<
"-------------------------";
33 h2::request<h2::dynamic_body> PrepareGenericRequest(h2::verb req_verb,
const bb_str_view url,
34 const bb_str_view token) {
35 h2::request<h2::dynamic_body> req(req_verb, url, 11);
36 req.set(h2::field::host, GCE::kApiDomain);
38 AddBearer(absl_sv(token), &req);
42 ApiSenderBase::ApiSenderBase(
const char* name,
const GCE& gce, http::HttpsClientPool* pool)
43 : name_(name), gce_(gce), pool_(pool) {
47 ApiSenderBase::~ApiSenderBase() {}
49 StatusObject<HttpsClientPool::ClientHandle> ApiSenderBase::SendGeneric(
unsigned num_iterations,
51 system::error_code ec;
52 HttpsClientPool::ClientHandle handle;
54 uint64_t start = base::GetMonotonicMicrosFast();
58 for (
unsigned iters = 0; iters < num_iterations; ++iters) {
60 handle = pool_->GetHandle();
61 ec = handle->status();
66 const Request::header_type& header = req;
68 VLOG(1) <<
"ReqIter " << iters <<
": socket " << handle->native_handle() <<
" " << header;
70 ec = SendRequestIterative(req, handle.get());
73 gcs_latency->IncBy(name_, base::GetMonotonicMicrosFast() - start);
77 if (ec == system::errc::operation_not_permitted) {
78 return Status(StatusCode::IO_ERROR,
"Disabled operation");
81 if (ec == asio::error::no_permission) {
82 auto token_res = gce_.RefreshAccessToken(&pool_->io_context());
84 return token_res.status;
86 AddBearer(token_res.obj, &req);
87 }
else if (ec == asio::error::try_again) {
89 LOG(INFO) <<
"RespIter " << iters <<
": socket " << handle->native_handle() <<
" retrying";
91 LOG(INFO) <<
"RespIter " << iters <<
": socket " << handle->native_handle()
92 <<
" failed with error " << ec <<
"/" << ec.message() <<
" " 93 << ERR_GET_REASON(ec.value());
95 if (ec.category() == asio::error::get_ssl_category()) {
102 auto status = handle->status();
104 VLOG(1) <<
"Resetting connection due to " << status;
110 return Status(StatusCode::IO_ERROR,
"Maximum iterations reached");
113 auto ApiSenderBufferBody::SendRequestIterative(
const Request& req, http::HttpsClient* client)
115 system::error_code ec = client->Send(req);
117 LOG(INFO) <<
"Error sending request " << ec;
121 parser_.emplace().body_limit(kuint64max);
122 ec = client->ReadHeader(&parser_.value());
125 LOG(INFO) <<
"Error reading response " << ec;
129 if (!parser_->keep_alive()) {
130 client->schedule_reconnect();
131 LOG(INFO) <<
"Scheduling reconnect due to conn-close header";
134 const auto& msg = parser_->get();
135 VLOG(1) <<
"HeaderResp(" << client->native_handle() <<
"): " << msg;
138 if (msg.result() == h2::status::ok || msg.result() == h2::status::partial_content) {
142 string err_str(2048U,
'\0');
143 auto& body = parser_->get().body();
144 body.data = &err_str.front();
145 body.size = err_str.size() - 1;
146 ec = client->Read(&parser_.value());
153 ec = client->DrainResponse(&parser_.value());
158 if (DoesServerPushback(msg.result())) {
159 LOG(INFO) <<
"Retrying(" << client->native_handle() <<
") with " << msg;
161 this_fiber::sleep_for(1s);
162 return asio::error::try_again;
165 if (IsUnauthorized(msg)) {
166 return asio::error::no_permission;
169 if (msg.result() == h2::status::forbidden) {
170 LOG(ERROR) <<
"Error accessing GCS: " << err_str.c_str();
172 return system::errc::make_error_code(system::errc::operation_not_permitted);
175 LOG(ERROR) <<
"Unexpected status " << msg << msg.result_int() <<
"\n" << err_str.c_str() <<
"\n";
177 return h2::error::bad_status;
180 static once_flag gcs_write_set_flag;
182 void InitVarzStats() {
183 std::call_once(gcs_write_set_flag, [] {
184 gcs_writes.reset(
new VarzQps(
"gcs-writes"));
185 gcs_latency.reset(
new VarzMapAverage5m(
"gcs-latency"));