// Project and third-party headers used by this file, followed by the gcs_dry_write flag.
8 #include "util/gce/gcs.h" 10 #include <boost/beast/http/dynamic_body.hpp> 11 #include <boost/fiber/operations.hpp> 13 #include "absl/strings/strip.h" 14 #include "base/logging.h" 15 #include "base/walltime.h" 16 #include "strings/escaping.h" 18 #include "util/asio/io_context.h" 19 #include "util/gce/detail/gcs_utils.h" 21 #include "util/http/http_common.h" 22 #include "util/http/https_client.h" 23 #include "util/http/https_client_pool.h" 28 DEFINE_bool(gcs_dry_write,
false,
// Flag gcs_dry_write (default false): when set, GcsWriteFile::Close() and
// GcsWriteFile::Upload() in this file skip the SendGeneric() call.
// The first literal ends with a space so that the two adjacent literals do not run
// together into "requests.Still" in the generated help text.
29 "If set true do not really perform upload requests. " 30 "Still creates gcs connections for upload.");
// Flag gcs_upload_buf_log_size (default 20): base-2 logarithm of the upload buffer size.
// The GcsWriteFile constructor in this file passes 1 << <this value> to its multi_buffer
// and CHECKs that the value is at least 18.
32 DEFINE_uint32(gcs_upload_buf_log_size, 20,
"Upload buffer size is 2^k of this parameter.");
// File-wide namespace imports and shorthand names used by the definitions below.
34 using namespace boost;
35 using namespace ::std;
// h2 abbreviates detail::h2.
// NOTE(review): presumably an alias of the Boost.Beast HTTP namespace - confirm in gcs_utils.h.
36 namespace h2 = detail::h2;
// Used by Upload() to time a request.
37 using base::GetMonotonicMicrosFast;
38 using file::WriteFile;
39 using http::HttpsClientPool;
// Formats a content-range style value into `tmp` from the range start `from`, the range
// end `to` and the overall size `total`.
// NOTE(review): only part of this function is visible here - the declaration of `tmp`,
// the conditions that choose between the two formats below and the return statement are
// not shown.
44 string ContentRangeHeader(
size_t from,
size_t to, ssize_t total) {
// Range form "<from>-<to - 1>/": the emitted upper bound is to - 1, so `to` itself is
// not part of the range.
49 absl::StrAppend(&tmp, from,
"-", to - 1,
"/");
// Appends the total size.
51 absl::StrAppend(&tmp, total);
// Form without explicit byte positions: "*/<total>".
58 absl::StrAppend(&tmp,
"*/", total);
// ApiSenderBase subclass whose response parser is a response_parser over h2::dynamic_body.
// NOTE(review): access specifiers and the closing brace of the class are not visible in
// this excerpt.
64 class ApiSenderDynamicBody :
public detail::ApiSenderBase {
66 using Parser = h2::response_parser<h2::dynamic_body>;
// Inherit the base class constructors.
68 using ApiSenderBase::ApiSenderBase;
// Returns a pointer to the stored parser, or nullptr while parser_ holds no value.
71 Parser* parser() {
return parser_.has_value() ? &parser_.value() :
nullptr; }
// Sends `req` through `client` and returns an error_code; declared final.
// The definition appears further down in this file.
74 error_code SendRequestIterative(
const Request& req, http::HttpsClient* client)
final;
// Response parser; optional, so it may be empty (see parser()).
75 absl::optional<Parser> parser_;
// WriteFile implementation that collects written bytes in a beast::multi_buffer and sends
// them with PUT requests to obj_url_ (see Upload() and PrepareRequest() in this file).
// ApiSenderDynamicBody is inherited as protected.
// NOTE(review): several member declarations (e.g. Close, Open, Upload, obj_url_) and the
// access specifiers are not visible in this excerpt.
78 class GcsWriteFile :
public WriteFile,
protected ApiSenderDynamicBody {
// `obj_url` is the URL that upload requests are sent to; `pool` is handed to the sender base.
87 GcsWriteFile(absl::string_view name,
const GCE& gce,
string obj_url, HttpsClientPool* pool);
// Buffers `length` bytes from `buffer`; see the definition for the upload trigger.
93 Status Write(const uint8* buffer, uint64 length) final;
// Copies bytes from `buffer` into body_mb_, limited by the free space left in it.
96 size_t FillBuf(const uint8* buffer,
size_t length);
// Builds the PUT request for the range starting at uploaded_ and ending at `to`.
99 Request PrepareRequest(
size_t to, ssize_t total);
// Bytes accepted by Write() that have not been moved into a request yet.
102 beast::multi_buffer body_mb_;
// Start offset used for the next content range; starts at 0.
103 size_t uploaded_ = 0;
// Constructs the file: forwards `name` to WriteFile, creates the sender base with the
// name "write", takes ownership of `obj_url` by move and constructs body_mb_ with
// 1 << FLAGS_gcs_upload_buf_log_size.
// NOTE(review): presumably that constructor argument is the buffer's maximum size -
// confirm against the Boost.Beast multi_buffer documentation.
106 GcsWriteFile::GcsWriteFile(absl::string_view name, const GCE& gce,
string obj_url,
107 HttpsClientPool* pool)
108 : WriteFile(name), ApiSenderDynamicBody("write", gce, pool), obj_url_(std::move(obj_url)),
109 body_mb_(1 << FLAGS_gcs_upload_buf_log_size) {
// An empty upload URL is treated as a fatal programming error.
110 CHECK(!obj_url_.empty());
// Buffer must be at least 2^18 bytes; Upload() CHECKs that the buffered size is a
// multiple of 2^18. Note this check runs after body_mb_ was already constructed above.
111 CHECK_GE(FLAGS_gcs_upload_buf_log_size, 18);
// Finishes the upload by sending whatever is still buffered as the last range.
// Must run on the pool's io_context thread (enforced by CHECK).
// NOTE(review): the declaration of `res`, the condition that selects between the two log
// statements below and the returned value are not visible in this excerpt.
114 bool GcsWriteFile::Close() {
115 CHECK(pool_->io_context().InContextThread());
// End offset of the final range: uploaded_ plus the bytes currently held in body_mb_.
117 size_t to = uploaded_ + body_mb_.size();
// The same value is passed as the range end and as the total size.
118 Request req = PrepareRequest(to, to);
// Copy of the request header kept for logging; `req` itself is moved into SendGeneric.
119 Request::header_type header = req;
// With --gcs_dry_write set the request is not sent.
// NOTE(review): the first argument 3 is presumably a retry/attempt count - confirm in
// ApiSenderBase.
122 if (!FLAGS_gcs_dry_write) {
123 res = SendGeneric(3, std::move(req)).status;
127 VLOG(1) <<
"Closed file " << header;
// Failure log: prints the parsed response, the request header and the status.
// NOTE(review): parser() can return nullptr; it is dereferenced here without a check.
129 LOG(ERROR) <<
"Error closing GCS file " << parser()->get() <<
" for request: \n" 130 << header <<
", status " << res;
// Not supported for this file type: calling it aborts via LOG(FATAL).
// NOTE(review): any statement after the log and the closing brace are not visible here.
137 bool GcsWriteFile::Open() {
138 LOG(FATAL) <<
"Should not be called";
// Appends bytes from `buffer` to the upload buffer and calls Upload().
// Must run on the pool's io_context thread (enforced by CHECK).
// NOTE(review): the surrounding loop (if any), the statement guarded by the `if` below,
// the handling of `written` and the success return are not visible in this excerpt;
// presumably Upload() is only reached once the buffer is full - confirm.
143 Status GcsWriteFile::Write(
const uint8* buffer, uint64 length) {
144 CHECK(pool_->io_context().InContextThread());
// FillBuf may take fewer than `length` bytes when the buffer has less free space.
147 size_t written = FillBuf(buffer, length);
// Buffer still has free space.
148 if (body_mb_.size() < body_mb_.max_size())
// A failed Upload() is returned to the caller.
152 RETURN_IF_ERROR(Upload());
// Copies up to `length` bytes from `buffer` into body_mb_ and commits them.
// NOTE(review): the declaration and advancement of `offs` and the return statement are
// not visible in this excerpt.
158 size_t GcsWriteFile::FillBuf(
const uint8* buffer,
size_t length) {
// Never take more than the free space left before body_mb_ reaches max_size().
159 size_t prepare_size = std::min(length, body_mb_.max_size() - body_mb_.size());
// prepare() yields a sequence of writable buffers totalling prepare_size bytes.
160 auto mbs = body_mb_.prepare(prepare_size);
// Fill each writable buffer from the input, starting at the running offset `offs`.
162 for (
auto mb : mbs) {
163 memcpy(mb.data(), buffer + offs, mb.size());
// All prepared bytes must have been filled.
166 CHECK_EQ(offs, prepare_size);
// Make the copied bytes part of the readable contents of body_mb_.
167 body_mb_.commit(prepare_size);
// Sends the currently buffered bytes as one range and verifies the range reported back
// by the server.
// NOTE(review): the declaration of `res`, any check of it, the update of uploaded_, the
// closing braces and the return statement are not visible in this excerpt.
172 Status GcsWriteFile::Upload() {
173 size_t body_size = body_mb_.size();
// The buffer must be non-empty and a multiple of 2^18 bytes (256 KiB).
174 CHECK_GT(body_size, 0);
175 CHECK_EQ(0, body_size % (1U << 18)) << body_size;
// End offset of this range.
177 size_t to = uploaded_ + body_size;
// total = -1 for a non-final range.
// NOTE(review): presumably -1 means "total size unknown" - the branch in
// ContentRangeHeader that handles it is not visible.
179 Request req = PrepareRequest(to, -1);
// With --gcs_dry_write set nothing is sent.
182 if (!FLAGS_gcs_dry_write) {
// Start time for the latency metric recorded below.
183 uint64_t start = GetMonotonicMicrosFast();
184 res = SendGeneric(3, std::move(req)).status;
185 VLOG(1) <<
"Uploaded range " << uploaded_ <<
"/" << to <<
" for " << obj_url_;
// A response parser must exist after the request; its message must carry a Range header.
187 Parser* upload_parser = CHECK_NOTNULL(parser());
188 const auto& resp_msg = upload_parser->get();
189 auto it = resp_msg.find(h2::field::range);
190 CHECK(it != resp_msg.end()) << resp_msg;
// The header value is expected to look like "bytes=<first>-<last>".
192 absl::string_view range = detail::absl_sv(it->value());
193 CHECK(absl::ConsumePrefix(&range,
"bytes="));
194 size_t pos = range.find(
'-');
// Also fails when '-' is absent, because find() then returns npos.
195 CHECK_LT(pos, range.size());
196 size_t uploaded_pos = 0;
197 CHECK(absl::SimpleAtoi(range.substr(pos + 1), &uploaded_pos));
// The reported last byte position is inclusive, so last + 1 must equal `to`.
198 CHECK_EQ(uploaded_pos + 1, to);
// Metrics: count the write and add the elapsed microseconds under the "write" key.
200 detail::gcs_writes->Inc();
201 detail::gcs_latency->IncBy(
"write", GetMonotonicMicrosFast() - start);
// Builds a PUT request to obj_url_ whose body is the buffered data and whose
// Content-Range header covers the range from uploaded_ to `to` with the given `total`.
// NOTE(review): the return statement is not visible in this excerpt.
212 auto GcsWriteFile::PrepareRequest(
size_t to, ssize_t total) -> Request {
// 11 is the HTTP version argument (HTTP/1.1 in Beast's encoding).
213 Request req(h2::verb::put, obj_url_, 11);
// The buffered data is moved into the request rather than copied.
214 req.body() = std::move(body_mb_);
215 req.set(h2::field::content_range, ContentRangeHeader(uploaded_, to, total));
216 req.set(h2::field::content_type, http::kBinMime);
217 req.prepare_payload();
// Debug check: the moved-from buffer is expected to be empty afterwards.
219 DCHECK_EQ(0, body_mb_.size());
// Performs one send/receive round trip of `req` on `client` and maps the response status
// to an error_code.
// NOTE(review): the trailing return type, the code that places a value into parser_,
// the error checks after Send/Read, several braces and the success return are not
// visible in this excerpt.
224 auto ApiSenderDynamicBody::SendRequestIterative(
const Request& req, http::HttpsClient* client)
226 system::error_code ec = client->Send(req);
// Logged when sending failed (the guarding condition is not visible).
228 VLOG(1) <<
"Error sending to socket " << client->native_handle() <<
" " << ec;
// Reads the response into the stored parser; parser_ must hold a value at this point.
233 ec = client->Read(&parser_.value());
// The server does not keep the connection alive: reconnect before the next use.
238 if (!parser_->keep_alive()) {
239 client->schedule_reconnect();
240 LOG(INFO) <<
"Scheduling reconnect due to conn-close header";
243 const auto& msg = parser_->get();
244 VLOG(1) <<
"HeaderResp(" << client->native_handle() <<
"): " << msg;
// Both `ok` and `permanent_redirect` take the same branch (its body is not visible).
// NOTE(review): presumably permanent_redirect (308) is how the server acknowledges a
// non-final chunk of a resumable upload - confirm.
247 if (msg.result() == h2::status::ok || msg.result() == h2::status::permanent_redirect) {
// Server pushback: wait one second on this fiber and report try_again.
251 if (detail::DoesServerPushback(msg.result())) {
252 LOG(INFO) <<
"Retrying(" << client->native_handle() <<
") with " << msg;
254 this_fiber::sleep_for(1s);
255 return asio::error::try_again;
// Unauthorized response: report no_permission.
258 if (detail::IsUnauthorized(msg)) {
259 return asio::error::no_permission;
260 }
// `gone` status; the statements that follow log the response, sleep one second on this
// fiber and return connection_refused.
else if (msg.result() == h2::status::gone) {
261 const Request::header_type& header = req;
263 LOG(INFO) <<
"Closing(" << client->native_handle() <<
") with " << msg <<
" for request " 266 this_fiber::sleep_for(1s);
268 return system::errc::make_error_code(system::errc::connection_refused);
// Any other status is logged as an error and reported as bad_status.
271 LOG(ERROR) <<
"Unexpected status " << msg;
273 return h2::error::bad_status;
// Starts an upload session for `full_path` and returns a newly allocated GcsWriteFile
// that writes to the URL taken from the response's Location header. Returns a
// PARSE_ERROR status when that header is missing.
// NOTE(review): the code that fills `bucket` and `obj_path` from `full_path`, the check
// of `res` and the closing braces are not visible in this excerpt. The file is created
// with `new`; presumably the caller owns the returned pointer - confirm.
278 StatusObject<file::WriteFile*> OpenGcsWriteFile(absl::string_view full_path,
const GCE& gce,
279 http::HttpsClientPool* pool) {
280 absl::string_view bucket, obj_path;
// Request path: /upload/storage/v1/b/<bucket>/o?uploadType=resumable&name=<url-encoded object path>
283 string url =
"/upload/storage/v1/b/";
284 absl::StrAppend(&url, bucket,
"/o?uploadType=resumable&name=");
285 strings::AppendEncodedUrl(obj_path, &url);
286 string token = gce.access_token();
// An empty access token is treated as a fatal error.
288 CHECK(!token.empty());
// POST request built by the shared helper from the url and the token.
290 auto req = detail::PrepareGenericRequest(h2::verb::post, url, token);
291 req.prepare_payload();
// Dedicated sender (named "start_write") used only for this request.
293 ApiSenderDynamicBody sender(
"start_write", gce, pool);
294 auto res = sender.SendGeneric(3, std::move(req));
298 const auto& resp = sender.parser()->get();
// The Location header value is used as the upload URL of the new file.
302 auto it = resp.find(h2::field::location);
303 if (it == resp.end()) {
304 return Status(StatusCode::PARSE_ERROR,
"Can not find location header");
306 string upload_id = string(it->value());
308 return new GcsWriteFile(full_path, gce, std::move(upload_id), pool);
static bool SplitToBucketPath(absl::string_view input, absl::string_view *bucket, absl::string_view *path)