gcs.cc
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 
5 #include "util/gce/gcs.h"
6 
7 #include <boost/beast/http/buffer_body.hpp>
8 #include <boost/beast/http/dynamic_body.hpp>
9 #include <boost/beast/http/empty_body.hpp>
10 
11 #include <rapidjson/document.h>
12 #include <rapidjson/error/en.h>
13 
14 #include "absl/strings/strip.h"
15 #include "absl/types/variant.h"
16 
17 #include "base/logging.h"
18 #include "base/walltime.h"
19 #include "strings/escaping.h"
20 #include "util/asio/fiber_socket.h"
21 #include "util/asio/io_context.h"
22 #include "util/gce/detail/gcs_utils.h"
23 #include "util/http/beast_rj_utils.h"
24 #include "util/http/https_client.h"
25 #include "util/stats/varz_stats.h"
26 
27 namespace util {
28 
29 DECLARE_uint32(gcs_upload_buf_log_size);
30 
31 using namespace std;
32 using namespace boost;
33 
34 namespace h2 = beast::http;
35 namespace rj = rapidjson;
36 
37 static constexpr char kDomain[] = "www.googleapis.com";
38 
39 namespace {
40 
41 
42 inline Status ToStatus(const ::boost::system::error_code& ec) {
43  return ec ? Status(StatusCode::IO_ERROR, absl::StrCat(ec.value(), ": ", ec.message()))
44  : Status::OK;
45 }
46 
47 inline Status HttpError(const h2::header<false, h2::fields>& resp) {
48  return Status(StatusCode::IO_ERROR,
49  absl::StrCat("Http error: ", resp.result_int(), " ",
50  detail::absl_sv(resp.reason())));
51 }
52 
// Evaluates the error-code expression `x` exactly once. If the resulting code is set, logs it at
// VLOG(1) and returns ToStatus(code) from the enclosing function.
// The stored code is reused for the return value: re-evaluating `x` would repeat any side effects
// (e.g. a second I/O call) and could yield a different code than the one that was logged.
// The temporary's name avoids double underscores and '$', which are reserved / non-standard in
// C++ identifiers.
#define RETURN_EC_STATUS(x)                                                          \
  do {                                                                               \
    auto return_ec_status_ec_ = (x);                                                 \
    if (return_ec_status_ec_) {                                                      \
      VLOG(1) << "EC: " << return_ec_status_ec_ << " " << return_ec_status_ec_.message(); \
      return ToStatus(return_ec_status_ec_);                                         \
    }                                                                                \
  } while (false)
61 
62 inline h2::request<h2::empty_body> PrepareRequest(h2::verb req_verb, const beast::string_view url,
63  const beast::string_view access_token) {
64  h2::request<h2::empty_body> req(req_verb, url, 11);
65  req.set(h2::field::host, kDomain);
66  req.set(h2::field::authorization, access_token);
67  req.keep_alive(true);
68 
69  return req;
70 }
71 
72 template <typename Msg> inline bool IsUnauthorized(const Msg& msg) {
73  if (msg.result() != h2::status::unauthorized) {
74  return false;
75  }
76  auto it = msg.find("WWW-Authenticate");
77 
78  return it != msg.end();
79 }
80 
81 
83 inline void SetRange(size_t from, size_t to, h2::fields* flds) {
84  string tmp = absl::StrCat("bytes=", from, "-");
85  if (to < kuint64max) {
86  absl::StrAppend(&tmp, to - 1);
87  }
88  flds->set(h2::field::range, std::move(tmp));
89 }
90 
91 } // namespace
92 
93 std::ostream& operator<<(std::ostream& os, const h2::response<h2::buffer_body>& msg) {
94  os << msg.reason() << endl;
95  for (const auto& f : msg) {
96  os << f.name_string() << " : " << f.value() << endl;
97  }
98  os << "-------------------------";
99 
100  return os;
101 }
102 
103 template <typename Body> std::ostream& operator<<(std::ostream& os, const h2::request<Body>& msg) {
104  os << msg.method_string() << " " << msg.target() << endl;
105  for (const auto& f : msg) {
106  os << f.name_string() << " : " << f.value() << endl;
107  }
108  os << "-------------------------";
109 
110  return os;
111 }
112 
113 GCS::GCS(const GCE& gce, asio::ssl::context* ssl_cntx, IoContext* io_context)
114  : gce_(gce), io_context_(*io_context),
115  https_client_(new http::HttpsClient(kDomain, io_context, ssl_cntx)) {
116  https_client_->set_retry_count(3);
117 }
118 
119 GCS::~GCS() {
120  VLOG(1) << "GCS::~GCS";
121  https_client_.reset();
122 }
123 
124 Status GCS::Connect(unsigned msec) {
125  detail::InitVarzStats();
126 
127  auto ec = https_client_->Connect(msec);
128  if (ec) {
129  VLOG(1) << "Error connecting " << ec << " " << https_client_->client()->next_layer().status();
130  return ToStatus(ec);
131  }
132 
133  string token = gce_.access_token();
134  access_token_header_ = absl::StrCat("Bearer ", token);
135 
136  VLOG(1) << "GCS::Connect OK " << native_handle();
137 
138  return Status::OK;
139 }
140 
141 auto GCS::ListBuckets() -> ListBucketResult {
142  RETURN_IF_ERROR(PrepareConnection());
143 
144  string url = absl::StrCat("/storage/v1/b?project=", gce_.project_id());
145  absl::StrAppend(&url, "&fields=items,nextPageToken");
146 
147  auto http_req = PrepareRequest(h2::verb::get, url, access_token_header_);
148 
149  // TODO: to have a handler extracting what we need.
150  rj::Document doc;
151  vector<string> results;
152 
153  while (true) {
154  h2::response<h2::dynamic_body> resp_msg;
155 
156  RETURN_IF_ERROR(SendWithToken(&http_req, &resp_msg));
157  if (resp_msg.result() != h2::status::ok) {
158  return HttpError(resp_msg);
159  }
160 
161  http::RjBufSequenceStream is(resp_msg.body().data());
162 
163  doc.ParseStream<rj::kParseDefaultFlags>(is);
164  if (doc.HasParseError()) {
165  LOG(ERROR) << rj::GetParseError_En(doc.GetParseError()) << resp_msg;
166  return Status(StatusCode::PARSE_ERROR, "Could not parse json response");
167  }
168 
169  auto it = doc.FindMember("items");
170  CHECK(it != doc.MemberEnd()) << resp_msg;
171  const auto& val = it->value;
172  CHECK(val.IsArray());
173  auto array = val.GetArray();
174 
175  for (size_t i = 0; i < array.Size(); ++i) {
176  const auto& item = array[i];
177  auto it = item.FindMember("name");
178  if (it != item.MemberEnd()) {
179  results.emplace_back(it->value.GetString(), it->value.GetStringLength());
180  }
181  }
182  it = doc.FindMember("nextPageToken");
183  if (it == doc.MemberEnd()) {
184  break;
185  }
186  absl::string_view page_token{it->value.GetString(), it->value.GetStringLength()};
187  http_req.target(absl::StrCat(url, "&pageToken=", page_token));
188  }
189  return results;
190 }
191 
192 auto GCS::List(absl::string_view bucket, absl::string_view prefix, bool fs_mode, ListObjectCb cb)
193  -> ListObjectResult {
194  CHECK(!bucket.empty());
195  VLOG(1) << "GCS::List " << native_handle();
196 
197  RETURN_IF_ERROR(PrepareConnection());
198 
199  string url = "/storage/v1/b/";
200  absl::StrAppend(&url, bucket, "/o?prefix=");
201  strings::AppendEncodedUrl(prefix, &url);
202  if (fs_mode) {
203  absl::StrAppend(&url, "&delimiter=%2f");
204  }
205  auto http_req = PrepareRequest(h2::verb::get, url, access_token_header_);
206 
207  // TODO: to have a handler extracting what we need.
208  rj::Document doc;
209  while (true) {
210  h2::response<h2::dynamic_body> resp_msg;
211  RETURN_IF_ERROR(SendWithToken(&http_req, &resp_msg));
212  CHECK_EQ(h2::status::ok, resp_msg.result()) << resp_msg;
213 
214  http::RjBufSequenceStream is(resp_msg.body().data());
215 
216  doc.ParseStream<rj::kParseDefaultFlags>(is);
217 
218  if (doc.HasParseError()) {
219  LOG(ERROR) << rj::GetParseError_En(doc.GetParseError()) << resp_msg;
220  return Status(StatusCode::PARSE_ERROR, "Could not parse json response");
221  }
222 
223  auto it = doc.FindMember("items");
224  if (it == doc.MemberEnd())
225  break;
226 
227  const auto& val = it->value;
228  CHECK(val.IsArray());
229  auto array = val.GetArray();
230 
231  for (size_t i = 0; i < array.Size(); ++i) {
232  const auto& item = array[i];
233  auto it = item.FindMember("name");
234  CHECK(it != item.MemberEnd());
235  absl::string_view key_name(it->value.GetString(), it->value.GetStringLength());
236  it = item.FindMember("size");
237  CHECK(it != item.MemberEnd());
238  absl::string_view sz_str(it->value.GetString(), it->value.GetStringLength());
239  size_t item_size = 0;
240  CHECK(absl::SimpleAtoi(sz_str, &item_size));
241  cb(item_size, key_name);
242  }
243  it = doc.FindMember("nextPageToken");
244  if (it == doc.MemberEnd()) {
245  break;
246  }
247  absl::string_view page_token{it->value.GetString(), it->value.GetStringLength()};
248  http_req.target(absl::StrCat(url, "&pageToken=", page_token));
249  }
250  return Status::OK;
251 }
252 
253 auto GCS::Read(absl::string_view bucket, absl::string_view obj_path, size_t ofs,
254  const strings::MutableByteRange& range) -> ReadObjectResult {
255  CHECK(!range.empty());
256  RETURN_IF_ERROR(PrepareConnection());
257 
258  string read_obj_url = BuildGetObjUrl(bucket, obj_path);
259 
260  auto req = PrepareRequest(h2::verb::get, read_obj_url, access_token_header_);
261  SetRange(ofs, ofs + range.size(), &req);
262 
263  h2::response<h2::buffer_body> resp_msg;
264  auto& body = resp_msg.body();
265  body.data = range.data();
266  body.size = range.size();
267  body.more = false;
268 
269  RETURN_IF_ERROR(SendWithToken(&req, &resp_msg));
270  if (resp_msg.result() != h2::status::partial_content) {
271  return Status(StatusCode::IO_ERROR, string(resp_msg.reason()));
272  }
273 
274  auto left_available = body.size;
275  return range.size() - left_available; // how much written
276 }
277 
278 
279 string GCS::BuildGetObjUrl(absl::string_view bucket, absl::string_view obj_path) {
280  string read_obj_url{"/storage/v1/b/"};
281  absl::StrAppend(&read_obj_url, bucket, "/o/");
282  strings::AppendEncodedUrl(obj_path, &read_obj_url);
283  absl::StrAppend(&read_obj_url, "?alt=media");
284 
285  return read_obj_url;
286 }
287 
288 Status GCS::PrepareConnection() {
289  return Status::OK;
290 }
291 
292 
293 
294 Status GCS::RefreshToken(Request* req) {
295  auto res = gce_.RefreshAccessToken(&io_context_);
296  if (!res.ok())
297  return res.status;
298 
299  access_token_header_ = absl::StrCat("Bearer ", res.obj);
300  req->set(h2::field::authorization, access_token_header_);
301 
302  return Status::OK;
303 }
304 
305 template <typename RespBody> Status GCS::SendWithToken(Request* req, Response<RespBody>* resp) {
306  for (unsigned i = 0; i < 2; ++i) { // Iterate for possible token refresh.
307  VLOG(1) << "HttpReq" << i << ": " << *req << ", socket " << native_handle();
308 
309  error_code ec = https_client_->Send(*req, resp);
310  if (ec) {
311  return ToStatus(ec);
312  }
313  VLOG(1) << "HttpResp" << i << ": " << *resp;
314 
315  if (resp->result() == h2::status::ok) {
316  break;
317  };
318 
319  if (IsUnauthorized(*resp)) {
320  RETURN_IF_ERROR(RefreshToken(req));
321  *resp = Response<RespBody>{};
322  continue;
323  }
324  LOG(FATAL) << "Unexpected response " << *resp;
325  }
326  return Status::OK;
327 }
328 
329 constexpr char kGsUrl[] = "gs://";
330 
331 bool GCS::SplitToBucketPath(absl::string_view input, absl::string_view* bucket,
332  absl::string_view* path) {
333  if (!absl::ConsumePrefix(&input, kGsUrl))
334  return false;
335 
336  auto pos = input.find('/');
337  *bucket = input.substr(0, pos);
338  *path = (pos == absl::string_view::npos) ? absl::string_view{} : input.substr(pos + 1);
339  return true;
340 }
341 
342 std::string GCS::ToGcsPath(absl::string_view bucket, absl::string_view obj_path) {
343  return absl::StrCat(kGsUrl, bucket, "/", obj_path);
344 }
345 
346 uint32_t GCS::native_handle() { return https_client_->client()->next_layer().native_handle(); }
347 
348 bool IsGcsPath(absl::string_view path) { return absl::StartsWith(path, kGsUrl); }
349 
350 } // namespace util
ListObjectResult List(absl::string_view bucket, absl::string_view prefix, bool fs_mode, ListObjectCb cb)
Definition: gcs.cc:192
static std::string ToGcsPath(absl::string_view bucket, absl::string_view obj_path)
Inverse function to SplitToBucketPath. Returns the full GCS URI, which starts with "gs://".
Definition: gcs.cc:342
std::function< void(size_t, absl::string_view)> ListObjectCb
Called with (size, key_name) pairs.
Definition: gcs.h:53
static bool SplitToBucketPath(absl::string_view input, absl::string_view *bucket, absl::string_view *path)
Definition: gcs.cc:331
std::string access_token() const
Definition: gce.cc:215