5 #include "util/gce/gcs.h" 7 #include <boost/beast/http/buffer_body.hpp> 8 #include <boost/beast/http/dynamic_body.hpp> 9 #include <boost/beast/http/empty_body.hpp> 11 #include <rapidjson/document.h> 12 #include <rapidjson/error/en.h> 14 #include "absl/strings/strip.h" 15 #include "absl/types/variant.h" 17 #include "base/logging.h" 18 #include "base/walltime.h" 19 #include "strings/escaping.h" 20 #include "util/asio/fiber_socket.h" 21 #include "util/asio/io_context.h" 22 #include "util/gce/detail/gcs_utils.h" 23 #include "util/http/beast_rj_utils.h" 24 #include "util/http/https_client.h" 25 #include "util/stats/varz_stats.h" 29 DECLARE_uint32(gcs_upload_buf_log_size);
32 using namespace boost;
34 namespace h2 = beast::http;
35 namespace rj = rapidjson;
// Host name of the Google Cloud Storage JSON API endpoint. It is handed to the
// HTTPS client as its target host and set as the Host header of each request.
static constexpr char kDomain[]{"www.googleapis.com"};
// Maps a boost error_code onto a Status. A set error code becomes IO_ERROR with
// the message "<numeric value>: <error message>".
// NOTE(review): the alternative branch of the ternary (taken when `ec` is
// clear) is not visible in this extract; presumably Status::OK — TODO confirm.
42 inline Status ToStatus(const ::boost::system::error_code& ec) {
43 return ec ? Status(StatusCode::IO_ERROR, absl::StrCat(ec.value(),
": ", ec.message()))
47 inline Status HttpError(
const h2::header<false, h2::fields>& resp) {
48 return Status(StatusCode::IO_ERROR,
49 absl::StrCat(
"Http error: ", resp.result_int(),
" ",
50 detail::absl_sv(resp.reason())));
// NOTE(review): this extract fuses the RETURN_EC_STATUS macro with the start of
// PrepareRequest on a single line. Only part of the macro body is visible: it
// logs the error code and its message at VLOG(1).
//
// PrepareRequest builds an HTTP/1.1 request (version 11) with an empty body for
// `req_verb` on `url`. It sets the Host header to kDomain and the Authorization
// header to `access_token`. Callers in this file pass access_token_header_,
// which holds the full "Bearer <token>" string.
// The remaining lines of the function (presumably `return req;`) are not shown.
53 #define RETURN_EC_STATUS(x) \ 57 VLOG(1) << "EC: " << __ec$ << " " << __ec$.message(); \ 62 inline h2::request<h2::empty_body> PrepareRequest(h2::verb req_verb,
const beast::string_view url,
63 const beast::string_view access_token) {
64 h2::request<h2::empty_body> req(req_verb, url, 11);
65 req.set(h2::field::host, kDomain);
66 req.set(h2::field::authorization, access_token);
// Reports whether `msg` is a 401 Unauthorized response that also carries a
// WWW-Authenticate header. SendWithToken uses this to decide whether to refresh
// the access token and retry.
// NOTE(review): the body of the non-401 early exit is not visible here;
// presumably `return false;` — TODO confirm.
72 template <
typename Msg>
inline bool IsUnauthorized(
const Msg& msg) {
73 if (msg.result() != h2::status::unauthorized) {
76 auto it = msg.find(
"WWW-Authenticate");
78 return it != msg.end();
83 inline void SetRange(
size_t from,
size_t to, h2::fields* flds) {
84 string tmp = absl::StrCat(
"bytes=", from,
"-");
85 if (to < kuint64max) {
86 absl::StrAppend(&tmp, to - 1);
88 flds->set(h2::field::range, std::move(tmp));
// Debug printer for buffer_body responses, used when logging responses.
// It writes the reason phrase, then one "name : value" line per header field,
// then a dashed separator. The response body is not printed.
// NOTE(review): std::endl flushes the stream on every line; '\n' would be
// enough for log output. The trailing `return os;` is not visible here.
93 std::ostream& operator<<(std::ostream& os,
const h2::response<h2::buffer_body>& msg) {
94 os << msg.reason() << endl;
95 for (
const auto& f : msg) {
96 os << f.name_string() <<
" : " << f.value() << endl;
98 os <<
"-------------------------";
// Debug printer for requests of any body type. It writes "<METHOD> <target>",
// then one "name : value" line per header field, then a dashed separator.
// NOTE(review): every header field is printed, including the Authorization
// header that PrepareRequest/RefreshToken set to "Bearer <token>".
// SendWithToken streams *req into VLOG(1), so access tokens leak into
// verbose logs. Consider redacting that field.
// NOTE(review): std::endl flushes on each line. The trailing `return os;` is
// not visible in this extract.
103 template <
typename Body> std::ostream& operator<<(std::ostream& os,
const h2::request<Body>& msg) {
104 os << msg.method_string() <<
" " << msg.target() << endl;
105 for (
const auto& f : msg) {
106 os << f.name_string() <<
" : " << f.value() << endl;
108 os <<
"-------------------------";
113 GCS::GCS(
const GCE& gce, asio::ssl::context* ssl_cntx, IoContext* io_context)
114 : gce_(gce), io_context_(*io_context),
115 https_client_(new http::HttpsClient(kDomain, io_context, ssl_cntx)) {
116 https_client_->set_retry_count(3);
// Body of the GCS destructor (its signature line is not visible in this
// extract). It logs at VLOG(1), then explicitly resets https_client_ so the
// HTTPS client is released here rather than in member-destruction order.
120 VLOG(1) <<
"GCS::~GCS";
121 https_client_.reset();
// Initializes the GCS varz statistics and connects the HTTPS client, passing
// `msec` through to HttpsClient::Connect (presumably a timeout in
// milliseconds — TODO confirm against HttpsClient). On success it caches
// "Bearer <token>" in access_token_header_ for later requests.
// NOTE(review): the error branch around the "Error connecting" log, and the
// line that defines `token`, are not visible in this extract.
124 Status GCS::Connect(
unsigned msec) {
125 detail::InitVarzStats();
127 auto ec = https_client_->Connect(msec);
129 VLOG(1) <<
"Error connecting " << ec <<
" " << https_client_->client()->next_layer().status();
134 access_token_header_ = absl::StrCat(
"Bearer ", token);
136 VLOG(1) <<
"GCS::Connect OK " << native_handle();
// Lists the names of all buckets in gce_'s project. It queries
// /storage/v1/b?project=<id>, asking only for the "items" and "nextPageToken"
// fields, and follows nextPageToken by re-targeting the same request.
// Returns HttpError on a non-200 response and PARSE_ERROR on malformed JSON.
// The surrounding loop, the `doc` declaration and the final return are not
// visible in this extract.
141 auto GCS::ListBuckets() -> ListBucketResult {
142 RETURN_IF_ERROR(PrepareConnection());
144 string url = absl::StrCat(
"/storage/v1/b?project=", gce_.project_id());
145 absl::StrAppend(&url,
"&fields=items,nextPageToken");
147 auto http_req = PrepareRequest(h2::verb::get, url, access_token_header_);
151 vector<string> results;
154 h2::response<h2::dynamic_body> resp_msg;
156 RETURN_IF_ERROR(SendWithToken(&http_req, &resp_msg));
157 if (resp_msg.result() != h2::status::ok) {
158 return HttpError(resp_msg);
161 http::RjBufSequenceStream is(resp_msg.body().data());
163 doc.ParseStream<rj::kParseDefaultFlags>(is);
164 if (doc.HasParseError()) {
165 LOG(ERROR) << rj::GetParseError_En(doc.GetParseError()) << resp_msg;
166 return Status(StatusCode::PARSE_ERROR,
"Could not parse json response");
// NOTE(review): this CHECK aborts the process when "items" is absent.
// List() below treats a missing "items" member as a normal case, and the JSON
// API omits empty arrays (e.g. a project with no buckets). Consider handling
// it the same way here instead of crashing.
169 auto it = doc.FindMember(
"items");
170 CHECK(it != doc.MemberEnd()) << resp_msg;
171 const auto& val = it->value;
172 CHECK(val.IsArray());
173 auto array = val.GetArray();
175 for (
size_t i = 0; i < array.Size(); ++i) {
176 const auto& item = array[i];
// Shadows the outer `it`; the outer iterator is reused after the loop.
177 auto it = item.FindMember(
"name");
178 if (it != item.MemberEnd()) {
179 results.emplace_back(it->value.GetString(), it->value.GetStringLength());
182 it = doc.FindMember(
"nextPageToken");
183 if (it == doc.MemberEnd()) {
// More pages: the same request object (and its possibly refreshed
// Authorization header) is re-targeted with the page token.
186 absl::string_view page_token{it->value.GetString(), it->value.GetStringLength()};
187 http_req.target(absl::StrCat(url,
"&pageToken=", page_token));
// Body of GCS::List. Its signature lines are not visible in this extract;
// presumably:
// `ListObjectResult GCS::List(absl::string_view bucket,
//     absl::string_view prefix, bool fs_mode, ListObjectCb cb)`.
// It lists objects in `bucket` whose names start with the URL-encoded `prefix`,
// invokes cb(size, name) for each returned item, and follows nextPageToken.
// The "&delimiter=%2f" ('/') parameter is presumably added only in fs_mode;
// the guarding line is not visible — TODO confirm.
194 CHECK(!bucket.empty());
195 VLOG(1) <<
"GCS::List " << native_handle();
197 RETURN_IF_ERROR(PrepareConnection());
199 string url =
"/storage/v1/b/";
200 absl::StrAppend(&url, bucket,
"/o?prefix=");
201 strings::AppendEncodedUrl(prefix, &url);
203 absl::StrAppend(&url,
"&delimiter=%2f");
205 auto http_req = PrepareRequest(h2::verb::get, url, access_token_header_);
210 h2::response<h2::dynamic_body> resp_msg;
211 RETURN_IF_ERROR(SendWithToken(&http_req, &resp_msg));
// NOTE(review): aborts on any non-200 status, while ListBuckets returns
// HttpError(resp_msg) for the same situation. Consider matching it.
212 CHECK_EQ(h2::status::ok, resp_msg.result()) << resp_msg;
216 doc.ParseStream<rj::kParseDefaultFlags>(is);
218 if (doc.HasParseError()) {
219 LOG(ERROR) << rj::GetParseError_En(doc.GetParseError()) << resp_msg;
220 return Status(StatusCode::PARSE_ERROR,
"Could not parse json response");
// A missing "items" member is tolerated here (the branch body is not
// visible in this extract).
223 auto it = doc.FindMember(
"items");
224 if (it == doc.MemberEnd())
227 const auto& val = it->value;
228 CHECK(val.IsArray());
229 auto array = val.GetArray();
231 for (
size_t i = 0; i < array.Size(); ++i) {
232 const auto& item = array[i];
233 auto it = item.FindMember(
"name");
234 CHECK(it != item.MemberEnd());
235 absl::string_view key_name(it->value.GetString(), it->value.GetStringLength());
// "size" is read as a JSON string and then parsed into an integer.
// NOTE(review): a missing or unparsable size aborts via CHECK rather than
// returning a Status.
236 it = item.FindMember(
"size");
237 CHECK(it != item.MemberEnd());
238 absl::string_view sz_str(it->value.GetString(), it->value.GetStringLength());
239 size_t item_size = 0;
240 CHECK(absl::SimpleAtoi(sz_str, &item_size));
// key_name is a view into `doc`; it is only valid during the callback.
241 cb(item_size, key_name);
243 it = doc.FindMember(
"nextPageToken");
244 if (it == doc.MemberEnd()) {
247 absl::string_view page_token{it->value.GetString(), it->value.GetStringLength()};
248 http_req.target(absl::StrCat(url,
"&pageToken=", page_token));
// Reads up to range.size() bytes of gs://<bucket>/<obj_path>, starting at byte
// `ofs`, directly into `range`'s memory. The buffer_body of the response points
// at range.data(), so no intermediate copy is made.
// Returns the number of bytes written: range.size() minus the space the body
// left unfilled (body.size after the read). Any status other than
// 206 Partial Content yields IO_ERROR carrying only the reason phrase.
// NOTE(review): SendWithToken resets *resp to a default-constructed response
// after a token refresh. That appears to discard the body.data/body.size set
// below, so a Read that triggers a refresh would retry without its
// destination buffer — TODO confirm and re-attach the buffer on retry.
// NOTE(review): consider HttpError(resp_msg) for a message consistent with
// ListBuckets.
253 auto GCS::Read(absl::string_view bucket, absl::string_view obj_path,
size_t ofs,
254 const strings::MutableByteRange& range) -> ReadObjectResult {
255 CHECK(!range.empty());
256 RETURN_IF_ERROR(PrepareConnection());
258 string read_obj_url = BuildGetObjUrl(bucket, obj_path);
260 auto req = PrepareRequest(h2::verb::get, read_obj_url, access_token_header_);
261 SetRange(ofs, ofs + range.size(), &req);
263 h2::response<h2::buffer_body> resp_msg;
264 auto& body = resp_msg.body();
265 body.data = range.data();
266 body.size = range.size();
269 RETURN_IF_ERROR(SendWithToken(&req, &resp_msg));
270 if (resp_msg.result() != h2::status::partial_content) {
271 return Status(StatusCode::IO_ERROR,
string(resp_msg.reason()));
274 auto left_available = body.size;
275 return range.size() - left_available;
// Builds the JSON API media-download target
// "/storage/v1/b/<bucket>/o/<url-encoded obj_path>?alt=media".
// The bucket name is appended verbatim; only the object path is URL-encoded.
// The final `return read_obj_url;` is not visible in this extract.
279 string GCS::BuildGetObjUrl(absl::string_view bucket, absl::string_view obj_path) {
280 string read_obj_url{
"/storage/v1/b/"};
281 absl::StrAppend(&read_obj_url, bucket,
"/o/");
282 strings::AppendEncodedUrl(obj_path, &read_obj_url);
283 absl::StrAppend(&read_obj_url,
"?alt=media");
// Called at the start of ListBuckets, List and Read before any request is sent.
// Its body is not visible in this extract, so its exact behavior (e.g.
// reconnecting a dropped socket) cannot be confirmed here — TODO confirm.
288 Status GCS::PrepareConnection() {
// Obtains a fresh access token via gce_.RefreshAccessToken on io_context_.
// It stores "Bearer <token>" in access_token_header_ and rewrites the
// Authorization header of `req`, so that both a retry of the in-flight request
// and later requests use the new token.
// NOTE(review): the handling of a failed refresh (the lines between the call
// and the assignment) and the final return are not visible in this extract.
294 Status GCS::RefreshToken(Request* req) {
295 auto res = gce_.RefreshAccessToken(&io_context_);
299 access_token_header_ = absl::StrCat(
"Bearer ", res.obj);
300 req->set(h2::field::authorization, access_token_header_);
// Sends *req and reads the reply into *resp, making at most two attempts.
// If the reply is 401 with a WWW-Authenticate header, the token is refreshed,
// *resp is reset, and the request is sent again. Request and response are
// logged at VLOG(1).
// NOTE(review): the handling of `ec` and the body of the 200-OK branch are not
// visible in this extract.
// NOTE(review): any other status reaches LOG(FATAL) and aborts the process,
// which makes the HttpError/partial_content checks in callers unreachable for
// those statuses. Read expects 206, so confirm the hidden lines let it pass.
305 template <
typename RespBody> Status GCS::SendWithToken(Request* req, Response<RespBody>* resp) {
306 for (
unsigned i = 0; i < 2; ++i) {
307 VLOG(1) <<
"HttpReq" << i <<
": " << *req <<
", socket " << native_handle();
309 error_code ec = https_client_->Send(*req, resp);
313 VLOG(1) <<
"HttpResp" << i <<
": " << *resp;
315 if (resp->result() == h2::status::ok) {
319 if (IsUnauthorized(*resp)) {
320 RETURN_IF_ERROR(RefreshToken(req));
// Resetting to a default-constructed response also discards any
// caller-configured body state (e.g. Read's buffer_body data/size).
321 *resp = Response<RespBody>{};
324 LOG(FATAL) <<
"Unexpected response " << *resp;
// URI scheme prefix for Cloud Storage paths, as in "gs://bucket/path/to/obj".
constexpr char kGsUrl[]{"gs://"};
// Tail of GCS::SplitToBucketPath(absl::string_view input,
// absl::string_view* bucket, absl::string_view* path). The first signature line
// is not visible in this extract.
// It strips the "gs://" prefix; the result for input without that prefix is not
// visible here (presumably `return false`). *bucket becomes the text up to the
// first '/'. *path becomes everything after that '/', or empty if there is none.
// Both outputs are views into `input`'s storage and must not outlive it.
332 absl::string_view* path) {
333 if (!absl::ConsumePrefix(&input, kGsUrl))
336 auto pos = input.find(
'/');
337 *bucket = input.substr(0, pos);
338 *path = (pos == absl::string_view::npos) ? absl::string_view{} : input.substr(pos + 1);
342 std::string
GCS::ToGcsPath(absl::string_view bucket, absl::string_view obj_path) {
343 return absl::StrCat(kGsUrl, bucket,
"/", obj_path);
346 uint32_t GCS::native_handle() {
return https_client_->client()->next_layer().native_handle(); }
348 bool IsGcsPath(absl::string_view path) {
return absl::StartsWith(path, kGsUrl); }
ListObjectResult List(absl::string_view bucket, absl::string_view prefix, bool fs_mode, ListObjectCb cb)
static std::string ToGcsPath(absl::string_view bucket, absl::string_view obj_path)
Inverse function to SplitToBucketPath. Returns the full GCS URI, which starts with "gs://".
std::function< void(size_t, absl::string_view)> ListObjectCb
Called with (size, key_name) pairs.
static bool SplitToBucketPath(absl::string_view input, absl::string_view *bucket, absl::string_view *path)
std::string access_token() const