gcs_read_file.cc
1 // Copyright 2019, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #include "util/gce/gcs.h"
5 
6 #include <boost/beast/http/buffer_body.hpp>
7 #include <boost/beast/http/parser.hpp>
8 
9 #include "base/logging.h"
10 #include "strings/escaping.h"
11 
12 #include "util/gce/detail/gcs_utils.h"
13 #include "util/http/https_client.h"
14 #include "util/http/https_client_pool.h"
15 
16 namespace util {
17 
18 using namespace boost;
19 using namespace ::std;
20 namespace h2 = beast::http;
21 using file::ReadonlyFile;
22 using http::HttpsClientPool;
23 
24 namespace {
25 
26 string BuildGetObjUrl(absl::string_view bucket, absl::string_view obj_path) {
27  string read_obj_url{"/storage/v1/b/"};
28  absl::StrAppend(&read_obj_url, bucket, "/o/");
29  strings::AppendEncodedUrl(obj_path, &read_obj_url);
30  absl::StrAppend(&read_obj_url, "?alt=media");
31 
32  return read_obj_url;
33 }
34 
35 inline void SetRange(size_t from, size_t to, h2::fields* flds) {
36  string tmp = absl::StrCat("bytes=", from, "-");
37  if (to < kuint64max) {
38  absl::StrAppend(&tmp, to - 1);
39  }
40  flds->set(h2::field::range, std::move(tmp));
41 }
42 
43 class GcsReadFile : public ReadonlyFile, private detail::ApiSenderBufferBody {
44  public:
45  using error_code = ::boost::system::error_code;
46 
47  // does not own gcs object, only wraps it with ReadonlyFile interface.
48  GcsReadFile(const GCE& gce, HttpsClientPool* pool, string read_obj_url)
49  : detail::ApiSenderBufferBody("read", gce, pool), read_obj_url_(std::move(read_obj_url)) {}
50 
51  virtual ~GcsReadFile() final;
52 
53  // Reads upto length bytes and updates the result to point to the data.
54  // May use buffer for storing data. In case, EOF reached sets result.size() < length but still
55  // returns Status::OK.
56  StatusObject<size_t> Read(size_t offset, const strings::MutableByteRange& range) final;
57 
58  // releases the system handle for this file.
59  Status Close() final;
60 
61  size_t Size() const final { return size_; }
62 
63  int Handle() const final { return -1; }
64 
65  Status Open();
66 
67  private:
68  const string read_obj_url_;
69  HttpsClientPool::ClientHandle https_handle_;
70 
71  size_t size_ = 0,offs_ = 0;
72 };
73 
74 GcsReadFile::~GcsReadFile() {}
75 
76 Status GcsReadFile::Open() {
77  string token = gce_.access_token();
78 
79  auto req = detail::PrepareGenericRequest(h2::verb::get, read_obj_url_, token);
80  if (offs_)
81  SetRange(offs_, kuint64max, &req);
82 
83  auto handle_res = SendGeneric(3, req);
84  if (!handle_res.ok())
85  return handle_res.status;
86 
87  const auto& msg = parser()->get();
88  auto content_len_it = msg.find(h2::field::content_length);
89  if (content_len_it != msg.end()) {
90  size_t content_sz = 0;
91  CHECK(absl::SimpleAtoi(detail::absl_sv(content_len_it->value()), &content_sz));
92 
93  if (size_) {
94  CHECK_EQ(size_, content_sz + offs_) << "File size has changed underneath during reopen";
95  } else {
96  size_ = content_sz;
97  }
98  }
99  https_handle_ = std::move(handle_res.obj);
100  return Status::OK;
101 }
102 
103 StatusObject<size_t> GcsReadFile::Read(size_t offset, const strings::MutableByteRange& range) {
104  CHECK(!range.empty());
105 
106  if (offset != offs_) {
107  return Status(StatusCode::INVALID_ARGUMENT, "Only sequential access supported");
108  }
109 
110  // We can not cache parser() into local var because Open() below recreates the parser instance.
111  if (parser()->is_done()) {
112  return 0;
113  }
114 
115  size_t read_sofar = 0;
116  while(read_sofar < range.size()) {
117  // We keep body references inside the loop because Open() that might be called here,
118  // will recreate the parser from the point the connections disconnected.
119  auto& body = parser()->get().body();
120  auto& left_available = body.size;
121  body.data = range.data() + read_sofar;
122  left_available = range.size() - read_sofar;
123 
124  error_code ec = https_handle_->Read(parser()); // decreases left_available.
125  size_t http_read = (range.size() - read_sofar) - left_available;
126 
127  if (!ec || ec == h2::error::need_buffer) { // Success
128  DVLOG(2) << "Read " << http_read << " bytes from " << offset << " with capacity "
129  << range.size() << "ec: " << ec;
130 
131  // This check does not happen. See here why: https://github.com/boostorg/beast/issues/1662
132  // DCHECK_EQ(sz_read, http_read) << " " << range.size() << "/" << left_available;
133  offs_ += http_read;
134 
135  CHECK(left_available == 0 || !ec);
136  return http_read + read_sofar;
137  }
138 
139  if (ec == h2::error::partial_message) {
140  offs_ += http_read;
141  VLOG(1) << "Got partial_message, socket status: "
142  << https_handle_->client()->next_layer().status() << ", socket "
143  << https_handle_->native_handle();
144 
145  // advance the destination buffer as well.
146  read_sofar += http_read;
147  ec = asio::ssl::error::stream_truncated;
148  }
149 
150  if (ec == asio::ssl::error::stream_truncated) {
151  VLOG(1) << "Stream " << read_obj_url_ << " truncated at " << offs_ << "/" << size_;
152  https_handle_.reset();
153 
154  RETURN_IF_ERROR(Open());
155  VLOG(1) << "Reopened the file, new size: " << size_;
156  // TODO: to validate that file version has not been changed between retries.
157  continue;
158  } else {
159  LOG(ERROR) << "ec: " << ec << "/" << ec.message() << " at " << offset << "/" << size_;
160  LOG(ERROR) << "FiberSocket status: " << https_handle_->client()->next_layer().status();
161 
162  return detail::ToStatus(ec);
163  }
164  }
165 
166  return read_sofar;
167 }
168 
169 // releases the system handle for this file.
170 Status GcsReadFile::Close() {
171  if (https_handle_ && parser()) {
172  if (!parser()->is_done()) {
173  // We prefer closing the connection to draining.
174  https_handle_->schedule_reconnect();
175  }
176  }
177  https_handle_.reset();
178 
179  return Status::OK;
180 }
181 
182 } // namespace
183 
184 StatusObject<ReadonlyFile*> OpenGcsReadFile(absl::string_view full_path, const GCE& gce,
185  HttpsClientPool* pool,
186  const ReadonlyFile::Options& opts) {
187  CHECK(opts.sequential && pool);
188  CHECK(IsGcsPath(full_path));
189 
190  absl::string_view bucket, obj_path;
191  CHECK(GCS::SplitToBucketPath(full_path, &bucket, &obj_path));
192 
193  string read_obj_url = BuildGetObjUrl(bucket, obj_path);
194 
195  std::unique_ptr<GcsReadFile> fl(new GcsReadFile(gce, pool, std::move(read_obj_url)));
196  RETURN_IF_ERROR(fl->Open());
197 
198  return fl.release();
199 }
200 
201 } // namespace util
static bool SplitToBucketPath(absl::string_view input, absl::string_view *bucket, absl::string_view *path)
Definition: gcs.cc:331