4 #include "util/gce/gcs.h"     6 #include <boost/beast/http/buffer_body.hpp>     7 #include <boost/beast/http/parser.hpp>     9 #include "base/logging.h"    10 #include "strings/escaping.h"    12 #include "util/gce/detail/gcs_utils.h"    13 #include "util/http/https_client.h"    14 #include "util/http/https_client_pool.h"    18 using namespace boost;
    19 using namespace ::std;
    20 namespace h2 = beast::http;
    21 using file::ReadonlyFile;
    22 using http::HttpsClientPool;
    // Builds the request path used to download an object's contents:
    //   "/storage/v1/b/" + bucket + "/o/" + <encoded obj_path> + "?alt=media"
    // Only obj_path is passed through strings::AppendEncodedUrl; bucket is appended verbatim.
    // NOTE(review): the return statement and closing brace are not visible in this view —
    // confirm against the full file that read_obj_url is what gets returned.
    26 string BuildGetObjUrl(absl::string_view bucket, absl::string_view obj_path) {
    27   string read_obj_url{
"/storage/v1/b/"};
    28   absl::StrAppend(&read_obj_url, bucket, 
"/o/");
    // Append obj_path through the URL-encoding helper rather than verbatim.
    29   strings::AppendEncodedUrl(obj_path, &read_obj_url);
    30   absl::StrAppend(&read_obj_url, 
"?alt=media");
    // Sets the HTTP Range field on *flds to "bytes=<from>-" and, when `to` is below
    // kuint64max, appends `to - 1` as the last byte position. Since the HTTP Range
    // last-byte-pos is inclusive, `to` acts as an exclusive end offset here.
    // Passing kuint64max as `to` yields an open-ended range ("bytes=<from>-").
    // NOTE(review): `to` is size_t, so `to == 0` would wrap in `to - 1`; callers visible
    // in this view only pass kuint64max.
    35 inline void SetRange(
size_t from, 
size_t to, h2::fields* flds) {
    36   string tmp = absl::StrCat(
"bytes=", from, 
"-");
    37   if (to < kuint64max) {
    38     absl::StrAppend(&tmp, to - 1);
    // Install the assembled value as the Range field of the request fields.
    40   flds->set(h2::field::range, std::move(tmp));
    // ReadonlyFile implementation that reads an object through the HTTPS client pool.
    // Privately inherits detail::ApiSenderBufferBody, which the methods below use for
    // parser(), SendGeneric() and gce_.
    // Reads are sequential only: Read() rejects any offset that differs from offs_.
    // NOTE(review): access specifiers and the Open()/Close() declarations are not visible
    // in this view, although both methods are defined further down.
    43 class GcsReadFile : 
public ReadonlyFile, 
private detail::ApiSenderBufferBody {
    45   using error_code = ::boost::system::error_code;
    // Stores the request path; no network activity happens in the constructor itself.
    // "read" is forwarded to the ApiSenderBufferBody base together with gce and pool.
    48   GcsReadFile(
const GCE& gce, HttpsClientPool* pool, 
string read_obj_url)
    49       : detail::ApiSenderBufferBody(
"read", gce, pool), read_obj_url_(std::move(read_obj_url)) {}
    51   virtual ~GcsReadFile() final;
    // Reads into `range` starting at `offset`; `offset` must equal the current position.
    56   StatusObject<
size_t> Read(
size_t offset, const strings::MutableByteRange& range) final;
    // Returns the stored size_ value (checked against Content-Length in Open()).
    61   size_t Size() const final { 
return size_; }
    // Always -1; presumably signals that no OS-level file handle backs this object.
    63   int Handle() const final { 
return -1; }
    // Request path of the object, fixed at construction.
    68   const string read_obj_url_;
    // Connection handle taken from the pool; assigned in Open(), reset in Close()/Read().
    69   HttpsClientPool::ClientHandle https_handle_;
    // size_: value returned by Size(); offs_: the offset the next Read() call must pass.
    71   size_t size_ = 0,offs_ = 0;
    74 GcsReadFile::~GcsReadFile() {}
    // Issues the GET request for read_obj_url_ and stores the resulting connection handle.
    // Also called from Read() to reopen the stream after a truncation, in which case the
    // request carries a Range starting at offs_.
    // NOTE(review): several guard lines (the conditions around SetRange, the early return
    // and the CHECK_EQ) and the final return are missing from this view — the comments
    // below describe only the statements that are visible.
    76 Status GcsReadFile::Open() {
    77   string token = gce_.access_token();
    79   auto req = detail::PrepareGenericRequest(h2::verb::get, read_obj_url_, token);
    // Open-ended range from the current offset (kuint64max makes SetRange omit the end).
    81     SetRange(offs_, kuint64max, &req);
    // NOTE(review): the literal 3 is presumably a retry/attempt count — confirm in SendGeneric.
    83   auto handle_res = SendGeneric(3, req);
    85     return handle_res.status;
    87   const auto& msg = parser()->get();
    88   auto content_len_it = msg.find(h2::field::content_length);
    89   if (content_len_it != msg.end()) {
    90     size_t content_sz = 0;
    // Aborts the process (CHECK) if the Content-Length value is not a valid integer.
    91     CHECK(absl::SimpleAtoi(detail::absl_sv(content_len_it->value()), &content_sz));
    // The remaining length reported by the server plus the already-consumed offset must
    // match the previously recorded size_; a mismatch is treated as fatal.
    94       CHECK_EQ(size_, content_sz + offs_) << 
"File size has changed underneath during reopen";
    // Keep the connection for subsequent Read() calls.
    99   https_handle_ = std::move(handle_res.obj);
   // Reads up to range.size() bytes of the HTTP response body into `range`.
   // `offset` must equal offs_, otherwise INVALID_ARGUMENT is returned; `range` must be
   // non-empty (CHECK). On success returns the number of bytes placed into `range`.
   // If the stream is cut (partial_message / stream_truncated) the connection is dropped
   // and Open() is called again before continuing; any other error is logged and
   // converted with detail::ToStatus.
   // NOTE(review): a number of lines are missing from this view (closing braces, the
   // body of the is_done() branch, where offs_ is advanced, the loop tail and the final
   // return). The comments below cover only what is visible.
   103 StatusObject<size_t> GcsReadFile::Read(
size_t offset, 
const strings::MutableByteRange& range) {
   104   CHECK(!range.empty());
   106   if (offset != offs_) {
   107     return Status(StatusCode::INVALID_ARGUMENT, 
"Only sequential access supported");
   // parser() is re-queried on every use rather than cached in a local.
   // NOTE(review): presumably because Open() below may replace the parser — confirm.
   111   if (parser()->is_done()) {
   // Bytes already copied into `range` by earlier iterations of the loop.
   115   size_t read_sofar = 0;
   116   while(read_sofar < range.size()) {
   // Point the parser's buffer body at the unfilled tail of `range`.
   119     auto& body = parser()->get().body();
   120     auto& left_available = body.size;
   121     body.data = range.data() + read_sofar;
   122     left_available = range.size() - read_sofar;
   // http_read is derived from how much body.size (left_available) shrank during the call.
   124     error_code ec = https_handle_->Read(parser());  
   125     size_t http_read = (range.size() - read_sofar) - left_available;
   // Success path: either no error, or the parser asked for more buffer space.
   127     if (!ec || ec == h2::error::need_buffer) {  
   128       DVLOG(2) << 
"Read " << http_read << 
" bytes from " << offset << 
" with capacity "   129                << range.size() << 
"ec: " << ec;
   // need_buffer is only acceptable when the supplied buffer was completely filled.
   135       CHECK(left_available == 0 || !ec);
   136       return http_read + read_sofar;
   // A partial message keeps the bytes received so far and is then handled exactly like
   // a truncated stream, by overwriting ec.
   139     if (ec == h2::error::partial_message) {
   141       VLOG(1) << 
"Got partial_message, socket status: "   142               << https_handle_->client()->next_layer().status() << 
", socket "   143               << https_handle_->native_handle();
   146       read_sofar += http_read;
   147       ec = asio::ssl::error::stream_truncated;
   // Truncated stream: drop the connection and reopen; Open() requests a range from offs_.
   150     if (ec == asio::ssl::error::stream_truncated) {
   151       VLOG(1) << 
"Stream " << read_obj_url_ << 
" truncated at " << offs_ << 
"/" << size_;
   152       https_handle_.reset();
   154       RETURN_IF_ERROR(Open());
   155       VLOG(1) << 
"Reopened the file, new size: " << size_;
   // Any other error is logged and returned to the caller.
   159       LOG(ERROR) << 
"ec: " << ec << 
"/" << ec.message() << 
" at " << offset << 
"/" << size_;
   160       LOG(ERROR) << 
"FiberSocket status: " << https_handle_->client()->next_layer().status();
   162       return detail::ToStatus(ec);
   // Releases the connection handle. If a handle and a parser exist and the parser has
   // not finished the response, the connection is first marked for reconnect.
   // NOTE(review): presumably because unread body data makes the connection unsafe to
   // reuse — confirm. The closing braces and the return statement are not visible here.
   170 Status GcsReadFile::Close() {
   171   if (https_handle_ && parser()) {
   172     if (!parser()->is_done()) {
   174       https_handle_->schedule_reconnect();
   177   https_handle_.reset();
   // Creates a GcsReadFile for `full_path` and opens it.
   // Preconditions enforced with CHECK (fatal on violation): opts.sequential is set,
   // pool is non-null, and full_path satisfies IsGcsPath.
   // An error from GcsReadFile::Open() is propagated through RETURN_IF_ERROR.
   // NOTE(review): the statement that fills bucket/obj_path from full_path and the final
   // return (presumably handing the raw pointer to the caller) are not visible here.
   184 StatusObject<ReadonlyFile*> OpenGcsReadFile(absl::string_view full_path, 
const GCE& gce,
   185                                             HttpsClientPool* pool,
   186                                             const ReadonlyFile::Options& opts) {
   187   CHECK(opts.sequential && pool);
   188   CHECK(IsGcsPath(full_path));
   190   absl::string_view bucket, obj_path;
   193   string read_obj_url = BuildGetObjUrl(bucket, obj_path);
   // Held in a unique_ptr so the object is destroyed if Open() fails below.
   195   std::unique_ptr<GcsReadFile> fl(
new GcsReadFile(gce, pool, std::move(read_obj_url)));
   196   RETURN_IF_ERROR(fl->Open());
 static bool SplitToBucketPath(absl::string_view input, absl::string_view *bucket, absl::string_view *path)