// gcs_write_file.cc
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 // Copyright 2019, Beeri 15. All rights reserved.
5 // Author: Roman Gershman (romange@gmail.com)
6 //
7 
8 #include "util/gce/gcs.h"
9 
10 #include <boost/beast/http/dynamic_body.hpp>
11 #include <boost/fiber/operations.hpp>
12 
13 #include "absl/strings/strip.h"
14 #include "base/logging.h"
15 #include "base/walltime.h"
16 #include "strings/escaping.h"
17 
18 #include "util/asio/io_context.h"
19 #include "util/gce/detail/gcs_utils.h"
20 
21 #include "util/http/http_common.h"
22 #include "util/http/https_client.h"
23 #include "util/http/https_client_pool.h"
24 
25 
26 namespace util {
27 
28 DEFINE_bool(gcs_dry_write, false,
29  "If set true do not really perform upload requests."
30  "Still creates gcs connections for upload.");
31 
32 DEFINE_uint32(gcs_upload_buf_log_size, 20, "Upload buffer size is 2^k of this parameter.");
33 
34 using namespace boost;
35 using namespace ::std;
36 namespace h2 = detail::h2;
37 using base::GetMonotonicMicrosFast;
38 using file::WriteFile;
39 using http::HttpsClientPool;
40 
41 namespace {
42 
44 string ContentRangeHeader(size_t from, size_t to, ssize_t total) {
45  CHECK_LE(from, to);
46  string tmp{"bytes "};
47 
48  if (from < to) { // common case.
49  absl::StrAppend(&tmp, from, "-", to - 1, "/"); // content-range is inclusive.
50  if (total >= 0) {
51  absl::StrAppend(&tmp, total);
52  } else {
53  tmp.push_back('*');
54  }
55  } else {
56  // We can write empty ranges only when we finalize the file and total is known.
57  CHECK_GE(total, 0);
58  absl::StrAppend(&tmp, "*/", total);
59  }
60 
61  return tmp;
62 }
63 
64 class ApiSenderDynamicBody : public detail::ApiSenderBase {
65  public:
66  using Parser = h2::response_parser<h2::dynamic_body>;
67 
68  using ApiSenderBase::ApiSenderBase;
69 
71  Parser* parser() { return parser_.has_value() ? &parser_.value() : nullptr; }
72 
73  private:
74  error_code SendRequestIterative(const Request& req, http::HttpsClient* client) final;
75  absl::optional<Parser> parser_;
76 };
77 
78 class GcsWriteFile : public WriteFile, protected ApiSenderDynamicBody {
79  public:
87  GcsWriteFile(absl::string_view name, const GCE& gce, string obj_url, HttpsClientPool* pool);
88 
89  bool Close() final;
90 
91  bool Open() final;
92 
93  Status Write(const uint8* buffer, uint64 length) final;
94 
95  private:
96  size_t FillBuf(const uint8* buffer, size_t length);
97 
98  Status Upload();
99  Request PrepareRequest(size_t to, ssize_t total);
100 
101  string obj_url_;
102  beast::multi_buffer body_mb_;
103  size_t uploaded_ = 0;
104 };
105 
106 GcsWriteFile::GcsWriteFile(absl::string_view name, const GCE& gce, string obj_url,
107  HttpsClientPool* pool)
108  : WriteFile(name), ApiSenderDynamicBody("write", gce, pool), obj_url_(std::move(obj_url)),
109  body_mb_(1 << FLAGS_gcs_upload_buf_log_size) {
110  CHECK(!obj_url_.empty());
111  CHECK_GE(FLAGS_gcs_upload_buf_log_size, 18);
112 }
113 
114 bool GcsWriteFile::Close() {
115  CHECK(pool_->io_context().InContextThread());
116 
117  size_t to = uploaded_ + body_mb_.size();
118  Request req = PrepareRequest(to, to);
119  Request::header_type header = req;
120 
121  Status res;
122  if (!FLAGS_gcs_dry_write) {
123  res = SendGeneric(3, std::move(req)).status;
124  }
125 
126  if (res.ok()) {
127  VLOG(1) << "Closed file " << header;
128  } else {
129  LOG(ERROR) << "Error closing GCS file " << parser()->get() << " for request: \n"
130  << header << ", status " << res;
131  }
132  delete this;
133 
134  return res.ok();
135 }
136 
137 bool GcsWriteFile::Open() {
138  LOG(FATAL) << "Should not be called";
139 
140  return true;
141 }
142 
143 Status GcsWriteFile::Write(const uint8* buffer, uint64 length) {
144  CHECK(pool_->io_context().InContextThread());
145 
146  while (length) {
147  size_t written = FillBuf(buffer, length);
148  if (body_mb_.size() < body_mb_.max_size())
149  break;
150  length -= written;
151  buffer += written;
152  RETURN_IF_ERROR(Upload());
153  }
154 
155  return Status::OK;
156 }
157 
158 size_t GcsWriteFile::FillBuf(const uint8* buffer, size_t length) {
159  size_t prepare_size = std::min(length, body_mb_.max_size() - body_mb_.size());
160  auto mbs = body_mb_.prepare(prepare_size);
161  size_t offs = 0;
162  for (auto mb : mbs) {
163  memcpy(mb.data(), buffer + offs, mb.size());
164  offs += mb.size();
165  }
166  CHECK_EQ(offs, prepare_size);
167  body_mb_.commit(prepare_size);
168 
169  return offs;
170 }
171 
172 Status GcsWriteFile::Upload() {
173  size_t body_size = body_mb_.size();
174  CHECK_GT(body_size, 0);
175  CHECK_EQ(0, body_size % (1U << 18)) << body_size; // Must be multiple of 256KB.
176 
177  size_t to = uploaded_ + body_size;
178 
179  Request req = PrepareRequest(to, -1);
180 
181  Status res;
182  if (!FLAGS_gcs_dry_write) {
183  uint64_t start = GetMonotonicMicrosFast();
184  res = SendGeneric(3, std::move(req)).status;
185  VLOG(1) << "Uploaded range " << uploaded_ << "/" << to << " for " << obj_url_;
186 
187  Parser* upload_parser = CHECK_NOTNULL(parser());
188  const auto& resp_msg = upload_parser->get();
189  auto it = resp_msg.find(h2::field::range);
190  CHECK(it != resp_msg.end()) << resp_msg;
191 
192  absl::string_view range = detail::absl_sv(it->value());
193  CHECK(absl::ConsumePrefix(&range, "bytes="));
194  size_t pos = range.find('-');
195  CHECK_LT(pos, range.size());
196  size_t uploaded_pos = 0;
197  CHECK(absl::SimpleAtoi(range.substr(pos + 1), &uploaded_pos));
198  CHECK_EQ(uploaded_pos + 1, to);
199 
200  detail::gcs_writes->Inc();
201  detail::gcs_latency->IncBy("write", GetMonotonicMicrosFast() - start);
202 
203  if (!res.ok())
204  return res;
205  }
206 
207  uploaded_ = to;
208 
209  return Status::OK;
210 }
211 
212 auto GcsWriteFile::PrepareRequest(size_t to, ssize_t total) -> Request {
213  Request req(h2::verb::put, obj_url_, 11);
214  req.body() = std::move(body_mb_);
215  req.set(h2::field::content_range, ContentRangeHeader(uploaded_, to, total));
216  req.set(h2::field::content_type, http::kBinMime);
217  req.prepare_payload();
218 
219  DCHECK_EQ(0, body_mb_.size());
220 
221  return req;
222 }
223 
224 auto ApiSenderDynamicBody::SendRequestIterative(const Request& req, http::HttpsClient* client)
225  -> error_code {
226  system::error_code ec = client->Send(req);
227  if (ec) {
228  VLOG(1) << "Error sending to socket " << client->native_handle() << " " << ec;
229  return ec;
230  }
231 
232  parser_.emplace(); // .body_limit(kuint64max);
233  ec = client->Read(&parser_.value());
234  if (ec) {
235  return ec;
236  }
237 
238  if (!parser_->keep_alive()) {
239  client->schedule_reconnect();
240  LOG(INFO) << "Scheduling reconnect due to conn-close header";
241  }
242 
243  const auto& msg = parser_->get();
244  VLOG(1) << "HeaderResp(" << client->native_handle() << "): " << msg;
245 
246  // 308 or http ok are both good responses.
247  if (msg.result() == h2::status::ok || msg.result() == h2::status::permanent_redirect) {
248  return error_code{}; // all is good.
249  }
250 
251  if (detail::DoesServerPushback(msg.result())) {
252  LOG(INFO) << "Retrying(" << client->native_handle() << ") with " << msg;
253 
254  this_fiber::sleep_for(1s);
255  return asio::error::try_again; // retry
256  }
257 
258  if (detail::IsUnauthorized(msg)) {
259  return asio::error::no_permission;
260  } else if (msg.result() == h2::status::gone) {
261  const Request::header_type& header = req;
262 
263  LOG(INFO) << "Closing(" << client->native_handle() << ") with " << msg << " for request "
264  << header;
265 
266  this_fiber::sleep_for(1s);
267 
268  return system::errc::make_error_code(system::errc::connection_refused);
269  }
270 
271  LOG(ERROR) << "Unexpected status " << msg;
272 
273  return h2::error::bad_status;
274 }
275 
276 } // namespace
277 
278 StatusObject<file::WriteFile*> OpenGcsWriteFile(absl::string_view full_path, const GCE& gce,
279  http::HttpsClientPool* pool) {
280  absl::string_view bucket, obj_path;
281  CHECK(GCS::SplitToBucketPath(full_path, &bucket, &obj_path));
282 
283  string url = "/upload/storage/v1/b/";
284  absl::StrAppend(&url, bucket, "/o?uploadType=resumable&name=");
285  strings::AppendEncodedUrl(obj_path, &url);
286  string token = gce.access_token();
287 
288  CHECK(!token.empty());
289 
290  auto req = detail::PrepareGenericRequest(h2::verb::post, url, token);
291  req.prepare_payload();
292 
293  ApiSenderDynamicBody sender("start_write", gce, pool);
294  auto res = sender.SendGeneric(3, std::move(req));
295  if (!res.ok())
296  return res.status;
297 
298  const auto& resp = sender.parser()->get();
299 
300  // HttpsClientPool::ClientHandle handle = std::move(res.obj);
301 
302  auto it = resp.find(h2::field::location);
303  if (it == resp.end()) {
304  return Status(StatusCode::PARSE_ERROR, "Can not find location header");
305  }
306  string upload_id = string(it->value());
307 
308  return new GcsWriteFile(full_path, gce, std::move(upload_id), pool);
309 }
310 
311 } // namespace util
// Referenced declaration (defined in gcs.cc:331):
//   static bool SplitToBucketPath(absl::string_view input, absl::string_view *bucket, absl::string_view *path)