4 #include "util/gce/gcs.h" 6 #include <boost/beast/http/buffer_body.hpp> 7 #include <boost/beast/http/parser.hpp> 9 #include "base/logging.h" 10 #include "strings/escaping.h" 12 #include "util/gce/detail/gcs_utils.h" 13 #include "util/http/https_client.h" 14 #include "util/http/https_client_pool.h" 18 using namespace boost;
19 using namespace ::std;
20 namespace h2 = beast::http;
21 using file::ReadonlyFile;
22 using http::HttpsClientPool;
26 string BuildGetObjUrl(absl::string_view bucket, absl::string_view obj_path) {
27 string read_obj_url{
"/storage/v1/b/"};
28 absl::StrAppend(&read_obj_url, bucket,
"/o/");
29 strings::AppendEncodedUrl(obj_path, &read_obj_url);
30 absl::StrAppend(&read_obj_url,
"?alt=media");
35 inline void SetRange(
size_t from,
size_t to, h2::fields* flds) {
36 string tmp = absl::StrCat(
"bytes=", from,
"-");
37 if (to < kuint64max) {
38 absl::StrAppend(&tmp, to - 1);
40 flds->set(h2::field::range, std::move(tmp));
43 class GcsReadFile :
public ReadonlyFile,
private detail::ApiSenderBufferBody {
45 using error_code = ::boost::system::error_code;
48 GcsReadFile(
const GCE& gce, HttpsClientPool* pool,
string read_obj_url)
49 : detail::ApiSenderBufferBody(
"read", gce, pool), read_obj_url_(std::move(read_obj_url)) {}
51 virtual ~GcsReadFile() final;
56 StatusObject<
size_t> Read(
size_t offset, const strings::MutableByteRange& range) final;
61 size_t Size() const final {
return size_; }
63 int Handle() const final {
return -1; }
68 const string read_obj_url_;
69 HttpsClientPool::ClientHandle https_handle_;
71 size_t size_ = 0,offs_ = 0;
74 GcsReadFile::~GcsReadFile() {}
76 Status GcsReadFile::Open() {
77 string token = gce_.access_token();
79 auto req = detail::PrepareGenericRequest(h2::verb::get, read_obj_url_, token);
81 SetRange(offs_, kuint64max, &req);
83 auto handle_res = SendGeneric(3, req);
85 return handle_res.status;
87 const auto& msg = parser()->get();
88 auto content_len_it = msg.find(h2::field::content_length);
89 if (content_len_it != msg.end()) {
90 size_t content_sz = 0;
91 CHECK(absl::SimpleAtoi(detail::absl_sv(content_len_it->value()), &content_sz));
94 CHECK_EQ(size_, content_sz + offs_) <<
"File size has changed underneath during reopen";
99 https_handle_ = std::move(handle_res.obj);
// Reads up to range.size() bytes of the object into `range`, starting at `offset`.
// Only sequential access is supported: `offset` must equal offs_, otherwise
// INVALID_ARGUMENT is returned. On the success path it returns the number of
// bytes copied into `range`.
// NOTE(review): several statements of this function are not visible in this view
// (see the notes below, and no return after the loop is shown). These comments
// describe only the lines that are present.
103 StatusObject<size_t> GcsReadFile::Read(
size_t offset,
const strings::MutableByteRange& range) {
// An empty destination buffer is treated as a caller bug.
104 CHECK(!range.empty());
106 if (offset != offs_) {
107 return Status(StatusCode::INVALID_ARGUMENT,
"Only sequential access supported");
// NOTE(review): the body of this branch is not visible. It presumably reports EOF
// once the parser has consumed the whole response — confirm.
111 if (parser()->is_done()) {
// Bytes already written into `range` by earlier loop iterations.
115 size_t read_sofar = 0;
116 while(read_sofar < range.size()) {
// Point the parser's buffer body at the still-unfilled tail of `range`.
// The body is fetched via parser() on every iteration, presumably because the
// Open() call below may replace the parser — confirm.
119 auto& body = parser()->get().body();
120 auto& left_available = body.size;
121 body.data = range.data() + read_sofar;
122 left_available = range.size() - read_sofar;
// left_available aliases body.size. The bytes received in this iteration are
// computed as how much it decreased during Read().
124 error_code ec = https_handle_->Read(parser());
125 size_t http_read = (range.size() - read_sofar) - left_available;
// Success, or need_buffer (in Beast, the supplied body buffer is exhausted).
// Either way, return everything copied so far.
127 if (!ec || ec == h2::error::need_buffer) {
128 DVLOG(2) <<
"Read " << http_read <<
" bytes from " << offset <<
" with capacity " 129 << range.size() <<
"ec: " << ec;
// NOTE(review): no update of offs_ is visible here. Unless it happens in lines
// not shown, the next sequential Read(offset + n) is rejected by the check above.
135 CHECK(left_available == 0 || !ec);
136 return http_read + read_sofar;
// The stream ended before the whole message arrived (Beast partial_message).
// Keep the bytes already copied and handle it as a truncated stream below.
139 if (ec == h2::error::partial_message) {
141 VLOG(1) <<
"Got partial_message, socket status: " 142 << https_handle_->client()->next_layer().status() <<
", socket " 143 << https_handle_->native_handle();
146 read_sofar += http_read;
147 ec = asio::ssl::error::stream_truncated;
// Truncated stream: drop the current connection and reopen the object.
// NOTE(review): Open() resumes from offs_, which is never advanced in this view,
// while read_sofar has moved on — confirm offs_ is updated in lines not shown.
150 if (ec == asio::ssl::error::stream_truncated) {
151 VLOG(1) <<
"Stream " << read_obj_url_ <<
" truncated at " << offs_ <<
"/" << size_;
152 https_handle_.reset();
154 RETURN_IF_ERROR(Open());
155 VLOG(1) <<
"Reopened the file, new size: " << size_;
// NOTE(review): no `continue`/`else` is visible. As shown, a successful reopen
// falls through into the error path below — confirm against the full source.
// Any other error: log it and convert it to a Status.
159 LOG(ERROR) <<
"ec: " << ec <<
"/" << ec.message() <<
" at " << offset <<
"/" << size_;
160 LOG(ERROR) <<
"FiberSocket status: " << https_handle_->client()->next_layer().status();
162 return detail::ToStatus(ec);
170 Status GcsReadFile::Close() {
171 if (https_handle_ && parser()) {
172 if (!parser()->is_done()) {
174 https_handle_->schedule_reconnect();
177 https_handle_.reset();
184 StatusObject<ReadonlyFile*> OpenGcsReadFile(absl::string_view full_path,
const GCE& gce,
185 HttpsClientPool* pool,
186 const ReadonlyFile::Options& opts) {
187 CHECK(opts.sequential && pool);
188 CHECK(IsGcsPath(full_path));
190 absl::string_view bucket, obj_path;
193 string read_obj_url = BuildGetObjUrl(bucket, obj_path);
195 std::unique_ptr<GcsReadFile> fl(
new GcsReadFile(gce, pool, std::move(read_obj_url)));
196 RETURN_IF_ERROR(fl->Open());
static bool SplitToBucketPath(absl::string_view input, absl::string_view *bucket, absl::string_view *path)