s3.cc
// Copyright 2019, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//

#include "util/aws/s3.h"

#include <libxml/xpath.h>
#include <libxml/xpathInternals.h>

#include <boost/asio/ssl/error.hpp>
#include <boost/beast/http/dynamic_body.hpp>
#include <boost/beast/http/empty_body.hpp>
#include <boost/beast/http/string_body.hpp>

#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include "base/logging.h"
#include "base/walltime.h"
#include "strings/escaping.h"
#include "util/asio/io_context.h"
#include "util/aws/aws.h"
#include "util/http/http_common.h"
#include "util/http/https_client.h"
#include "util/http/https_client_pool.h"

namespace util {

DEFINE_uint32(s3_upload_buf_mb, 5, "Upload buffer size in MB. Must be at least 5.");

using file::ReadonlyFile;
using http::HttpsClientPool;

using namespace boost;
namespace h2 = beast::http;
using std::string;

using bb_str_view = ::boost::beast::string_view;

namespace {

constexpr char kS3Url[] = "s3://";

// TODO: the same as in gcs_utils.h.
inline Status ToStatus(const ::boost::system::error_code& ec) {
  return ec ? Status(StatusCode::IO_ERROR, absl::StrCat(ec.value(), ": ", ec.message()))
            : Status::OK;
}

inline absl::string_view absl_sv(const bb_str_view s) {
  return absl::string_view{s.data(), s.size()};
}

std::ostream& operator<<(std::ostream& os, const h2::response<h2::buffer_body>& msg) {
  os << msg.reason() << std::endl;
  for (const auto& f : msg) {
    os << f.name_string() << " : " << f.value() << std::endl;
  }
  os << "-------------------------";

  return os;
}

// TODO: the same as in GCS. Could be implemented with a static 64-byte buffer.
inline void SetRange(size_t from, size_t to, h2::fields* flds) {
  string tmp = absl::StrCat("bytes=", from, "-");
  if (to < kuint64max) {
    absl::StrAppend(&tmp, to - 1);
  }
  flds->set(h2::field::range, std::move(tmp));
}
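// For example, SetRange(100, 200, &flds) sets "Range: bytes=100-199", while
// SetRange(100, kuint64max, &flds) sets the open-ended "Range: bytes=100-".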

inline const char* as_char(const xmlChar* var) {
  return reinterpret_cast<const char*>(var);
}

class S3ReadFile : public ReadonlyFile {
 public:
  using error_code = ::boost::system::error_code;
  using Parser = h2::response_parser<h2::buffer_body>;

  // Does not own the pool object; only wraps it with the ReadonlyFile interface.
  S3ReadFile(const AWS& aws, HttpsClientPool* pool, string read_obj_url)
      : aws_(aws), pool_(pool), read_obj_url_(std::move(read_obj_url)) {
  }

  virtual ~S3ReadFile() final;

  // Reads up to length bytes and updates the result to point to the data.
  // May use the buffer for storing data. If EOF is reached, sets result.size() < length but
  // still returns Status::OK.
  StatusObject<size_t> Read(size_t offset, const strings::MutableByteRange& range) final;

  // Releases the system handle for this file.
  Status Close() final;

  size_t Size() const final {
    return size_;
  }

  int Handle() const final {
    return -1;
  }

  Status Open();

 private:
  Parser* parser() {
    return &parser_;
  }

  const AWS& aws_;
  HttpsClientPool* pool_;

  const string read_obj_url_;
  HttpsClientPool::ClientHandle https_handle_;

  Parser parser_;
  size_t size_ = 0, offs_ = 0;
};

class S3WriteFile : public file::WriteFile {
 public:
  S3WriteFile(absl::string_view name, const AWS& aws, string upload_id, HttpsClientPool* pool);

  bool Close() final;

  bool Open() final;

  Status Write(const uint8* buffer, uint64 length) final;

 private:
  size_t FillBuf(const uint8* buffer, size_t length);

  Status Upload();

  const AWS& aws_;

  string upload_id_;
  beast::multi_buffer body_mb_;
  size_t uploaded_ = 0;
  HttpsClientPool* pool_;
  std::vector<string> parts_;
  std::vector<fibers::fiber> uploads_;
};
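
// A rough sketch of the multipart-upload flow implemented below (see OpenS3WriteFile,
// S3WriteFile::Upload and S3WriteFile::Close):
//
//   POST /<key>?uploads=                       -> response carries an UploadId.
//   PUT  /<key>?uploadId=<id>&partNumber=<n>   -> one request per buffered part, each on its
//                                                 own fiber; the response ETag identifies it.
//   POST /<key>?uploadId=<id>                  -> CompleteMultipartUpload XML body listing
//                                                 the collected (PartNumber, ETag) pairs.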

S3ReadFile::~S3ReadFile() {
}

Status S3ReadFile::Open() {
  string url = absl::StrCat("/", read_obj_url_);
  h2::request<h2::empty_body> req{h2::verb::get, url, 11};

  if (offs_)
    SetRange(offs_, kuint64max, &req);

  VLOG(1) << "Unsigned request: " << req;

  aws_.SignEmpty(pool_->domain(), &req);

  // TODO: wrap IO operations with a retry mechanism like in GCS.
  HttpsClientPool::ClientHandle handle = pool_->GetHandle();

  system::error_code ec = handle->Send(req);

  if (ec) {
    return ToStatus(ec);
  }

  parser_.body_limit(kuint64max);
  ec = handle->ReadHeader(&parser_);
  if (ec) {
    return ToStatus(ec);
  }

  CHECK(parser_.keep_alive()) << "TBD";
  const auto& msg = parser_.get();

  if (msg.result() != h2::status::ok) {
    LOG(INFO) << "OpenError: " << msg;

    return Status(StatusCode::IO_ERROR, string(msg.reason()));
  }
  VLOG(1) << "HeaderResp(" << handle->native_handle() << "): " << msg;

  auto content_len_it = msg.find(h2::field::content_length);
  if (content_len_it != msg.end()) {
    size_t content_sz = 0;
    CHECK(absl::SimpleAtoi(absl_sv(content_len_it->value()), &content_sz));

    if (size_) {
      CHECK_EQ(size_, content_sz + offs_) << "File size has changed underneath during reopen";
    } else {
      size_ = content_sz;
    }
  }
  https_handle_ = std::move(handle);
  return Status::OK;
}

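// Read() streams sequentially into the caller's range. If the TLS stream is truncated
// mid-body (the server dropped the connection), Open() is called again with a Range
// request starting at offs_, and the loop keeps filling the same destination buffer.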
StatusObject<size_t> S3ReadFile::Read(size_t offset, const strings::MutableByteRange& range) {
  CHECK(!range.empty());

  if (offset != offs_) {
    return Status(StatusCode::INVALID_ARGUMENT, "Only sequential access supported");
  }

  // We cannot cache parser() in a local variable because Open() below recreates the parser
  // instance.
  if (parser_.is_done()) {
    return 0;
  }

  size_t read_sofar = 0;
  while (read_sofar < range.size()) {
    // We keep the body references inside the loop because Open(), which might be called below,
    // recreates the parser from the point where the connection disconnected.
    auto& body = parser()->get().body();
    auto& left_available = body.size;
    body.data = range.data() + read_sofar;
    left_available = range.size() - read_sofar;

    error_code ec = https_handle_->Read(parser());  // Decreases left_available.
    size_t http_read = (range.size() - read_sofar) - left_available;

    if (!ec || ec == h2::error::need_buffer) {  // Success.
      DVLOG(2) << "Read " << http_read << " bytes from " << offset << " with capacity "
               << range.size() << " ec: " << ec;

      // This check does not happen. See here why: https://github.com/boostorg/beast/issues/1662
      // DCHECK_EQ(sz_read, http_read) << " " << range.size() << "/" << left_available;
      offs_ += http_read;

      CHECK(left_available == 0 || !ec);
      return http_read + read_sofar;
    }

    if (ec == h2::error::partial_message) {
      offs_ += http_read;
      VLOG(1) << "Got partial_message, socket status: "
              << https_handle_->client()->next_layer().status() << ", socket "
              << https_handle_->native_handle();

      // Advance the destination buffer as well.
      read_sofar += http_read;
      ec = asio::ssl::error::stream_truncated;
    }

    if (ec == asio::ssl::error::stream_truncated) {
      VLOG(1) << "Stream " << read_obj_url_ << " truncated at " << offs_ << "/" << size_;
      https_handle_.reset();

      RETURN_IF_ERROR(Open());
      VLOG(1) << "Reopened the file, new size: " << size_;
      // TODO: validate that the file version has not changed between retries.
      continue;
    } else {
      LOG(ERROR) << "ec: " << ec << "/" << ec.message() << " at " << offset << "/" << size_;
      LOG(ERROR) << "FiberSocket status: " << https_handle_->client()->next_layer().status();

      return ToStatus(ec);
    }
  }

  return read_sofar;
}

// Releases the system handle for this file.
Status S3ReadFile::Close() {
  if (https_handle_ && parser()) {
    if (!parser()->is_done()) {
      // We prefer closing the connection to draining.
      https_handle_->schedule_reconnect();
    }
  }
  https_handle_.reset();

  return Status::OK;
}

S3WriteFile::S3WriteFile(absl::string_view name, const AWS& aws, string upload_id,
                         HttpsClientPool* pool)
    : file::WriteFile(name), aws_(aws), upload_id_(std::move(upload_id)),
      body_mb_(FLAGS_s3_upload_buf_mb * (1 << 20)), pool_(pool) {
}

bool S3WriteFile::Close() {
  CHECK(pool_->io_context().InContextThread());

  auto status = Upload();
  if (!status.ok()) {
    LOG(ERROR) << "Error uploading " << status;
    return false;
  }

  VLOG(1) << "Joining with " << uploads_.size() << " fibers";
  for (auto& f : uploads_) {
    f.join();
  }
  uploads_.clear();

  if (parts_.empty())
    return true;

  string url("/");

  strings::AppendEncodedUrl(create_file_name_, &url);

  // Complete the multipart upload by posting the collected part list to "?uploadId=<id>".
  absl::StrAppend(&url, "?uploadId=", upload_id_);

  h2::request<h2::string_body> req{h2::verb::post, url, 11};
  h2::response<h2::string_body> resp;

  req.set(h2::field::content_type, http::kXmlMime);
  auto& body = req.body();
  body = R"(<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">)";

  for (size_t i = 0; i < parts_.size(); ++i) {
    absl::StrAppend(&body, "<Part><ETag>\"", parts_[i], "\"</ETag><PartNumber>", i + 1);
    absl::StrAppend(&body, "</PartNumber></Part>\n");
  }
  body.append("</CompleteMultipartUpload>");
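
  // For illustration, with two parts the body would look roughly like this (ETag values
  // here are made up):
  //
  //   <?xml version="1.0" encoding="UTF-8"?>
  //   <CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  //   <Part><ETag>"etag-1"</ETag><PartNumber>1</PartNumber></Part>
  //   <Part><ETag>"etag-2"</ETag><PartNumber>2</PartNumber></Part>
  //   </CompleteMultipartUpload>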

  req.prepare_payload();
  char sha256[65];

  detail::Sha256String(req.body(), sha256);
  aws_.Sign(pool_->domain(), absl::string_view{sha256, 64}, &req);

  uint64_t start = base::GetMonotonicMicrosFast();
  HttpsClientPool::ClientHandle handle = pool_->GetHandle();
  system::error_code ec = handle->Send(req, &resp);

  if (ec) {
    VLOG(1) << "Error sending to socket " << handle->native_handle() << " " << ec;
    return false;
  }

  if (resp.result() != h2::status::ok) {
    LOG(ERROR) << "S3WriteFile::Close: " << req << "/ " << resp;

    return false;
  }
  VLOG(1) << "S3Close took " << base::GetMonotonicMicrosFast() - start << " micros";
  parts_.clear();

  return true;
}

bool S3WriteFile::Open() {
  LOG(FATAL) << "Should not be called";

  return true;
}

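// Write() buffers incoming bytes in body_mb_ and flushes a part via Upload() each time the
// buffer reaches its FLAGS_s3_upload_buf_mb capacity. S3 requires every part except the last
// to be at least 5 MB, hence the lower bound on the flag.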
Status S3WriteFile::Write(const uint8* buffer, uint64 length) {
  while (length) {
    size_t written = FillBuf(buffer, length);
    if (body_mb_.size() < body_mb_.max_size())
      break;
    length -= written;
    buffer += written;
    RETURN_IF_ERROR(Upload());
  }

  return Status::OK;
}

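// Copies as much of the input as fits into body_mb_ using the beast dynamic-buffer
// prepare()/commit() protocol and returns the number of bytes consumed.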
size_t S3WriteFile::FillBuf(const uint8* buffer, size_t length) {
  size_t prepare_size = std::min(length, body_mb_.max_size() - body_mb_.size());
  auto mbs = body_mb_.prepare(prepare_size);
  size_t offs = 0;
  for (auto mb : mbs) {
    memcpy(mb.data(), buffer + offs, mb.size());
    offs += mb.size();
  }
  CHECK_EQ(offs, prepare_size);
  body_mb_.commit(prepare_size);

  return offs;
}

Status S3WriteFile::Upload() {
  size_t body_size = body_mb_.size();
  if (body_size == 0)
    return Status::OK;

  string url("/");

  // TODO: figure out why SHA256 is so slow. Until then we send the unsigned-payload marker:
  // char sha256[65];
  // detail::Sha256String(body_mb_, sha256);
  const char* kFakeSha = "UNSIGNED-PAYLOAD";
  strings::AppendEncodedUrl(create_file_name_, &url);
  absl::StrAppend(&url, "?uploadId=", upload_id_);
  absl::StrAppend(&url, "&partNumber=", parts_.size() + 1);

  h2::request<h2::dynamic_body> req{h2::verb::put, url, 11};
  req.set(h2::field::content_type, http::kBinMime);

  req.body() = std::move(body_mb_);
  req.prepare_payload();

  aws_.Sign(pool_->domain(), absl::string_view{kFakeSha}, &req);

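  // Each part is uploaded on its own fiber so subsequent Write() calls can proceed while
  // earlier parts are still in flight. The slot for the resulting ETag is reserved in
  // parts_ below, before the fiber launches; Close() joins all upload fibers.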
  auto up_cb = [this, req = std::move(req), id = parts_.size()] {
    VLOG(2) << "StartUpCb";
    h2::response<h2::string_body> resp;
    HttpsClientPool::ClientHandle handle = pool_->GetHandle();

    VLOG(2) << "BeforeSendUpCb";
    uint64_t start = base::GetMonotonicMicrosFast();
    system::error_code ec = handle->Send(req, &resp);
    CHECK(!ec) << "Error sending to socket " << handle->native_handle() << " " << ec;

    VLOG(2) << "Upload: " << resp;
    CHECK(resp.result() == h2::status::ok) << "S3WriteFile::Upload: " << resp;

    VLOG(1) << "S3Upload took " << base::GetMonotonicMicrosFast() - start << " micros";

    auto it = resp.find(h2::field::etag);
    CHECK(it != resp.end());

    // S3 returns the ETag quoted; strip the quotes since Close() re-quotes the values when
    // building the CompleteMultipartUpload body.
    absl::string_view etag = absl_sv(it->value());
    absl::ConsumePrefix(&etag, "\"");
    absl::ConsumeSuffix(&etag, "\"");
    parts_[id] = string(etag.data(), etag.size());

    if (!resp.keep_alive()) {
      handle->schedule_reconnect();
    }
  };
  parts_.emplace_back();

  // Launch with the dispatch policy so the upload starts running immediately.
  fibers::fiber fb(fibers::launch::dispatch, std::move(up_cb));
  uploads_.emplace_back(std::move(fb));

  return Status::OK;
}

// ******************** Helper utilities

inline xmlDocPtr XmlRead(absl::string_view xml) {
  return xmlReadMemory(xml.data(), xml.size(), NULL, NULL, XML_PARSE_COMPACT | XML_PARSE_NOBLANKS);
}

std::pair<size_t, absl::string_view> ParseXmlObjContents(xmlNodePtr node) {
  std::pair<size_t, absl::string_view> res;

  for (xmlNodePtr child = node->children; child; child = child->next) {
    if (child->type == XML_ELEMENT_NODE) {
      xmlNodePtr grand = child->children;

      if (!strcmp(as_char(child->name), "Key")) {
        CHECK(grand && grand->type == XML_TEXT_NODE);
        res.second = absl::string_view(as_char(grand->content));
      } else if (!strcmp(as_char(child->name), "Size")) {
        CHECK(grand && grand->type == XML_TEXT_NODE);
        CHECK(absl::SimpleAtoi(as_char(grand->content), &res.first));
      }
    }
  }
  return res;
}

void ParseXmlStartUpload(absl::string_view xml_resp, string* upload_id) {
  xmlDocPtr doc = XmlRead(xml_resp);
  CHECK(doc);

  xmlNodePtr root = xmlDocGetRootElement(doc);
  CHECK_STREQ("InitiateMultipartUploadResult", as_char(root->name));

  for (xmlNodePtr child = root->children; child; child = child->next) {
    if (child->type == XML_ELEMENT_NODE) {
      xmlNodePtr grand = child->children;
      if (!strcmp(as_char(child->name), "UploadId")) {
        CHECK(grand && grand->type == XML_TEXT_NODE);
        upload_id->assign(as_char(grand->content));
      }
    }
  }
  xmlFreeDoc(doc);
}

}  // namespace

namespace detail {

std::vector<std::string> ParseXmlListBuckets(absl::string_view xml_resp) {
  xmlDocPtr doc = XmlRead(xml_resp);
  CHECK(doc);

  xmlXPathContextPtr xpathCtx = xmlXPathNewContext(doc);

  auto register_res = xmlXPathRegisterNs(xpathCtx, BAD_CAST "NS",
                                         BAD_CAST "http://s3.amazonaws.com/doc/2006-03-01/");
  CHECK_EQ(register_res, 0);

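  // The XPath below pulls the bucket names out of a response shaped like (illustrative):
  //
  //   <ListAllMyBucketsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  //     <Buckets>
  //       <Bucket><Name>bucket-1</Name></Bucket>
  //       <Bucket><Name>bucket-2</Name></Bucket>
  //     </Buckets>
  //   </ListAllMyBucketsResult>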
  xmlXPathObjectPtr xpathObj = xmlXPathEvalExpression(
      BAD_CAST "/NS:ListAllMyBucketsResult/NS:Buckets/NS:Bucket/NS:Name", xpathCtx);
  CHECK(xpathObj);
  xmlNodeSetPtr nodes = xpathObj->nodesetval;
  std::vector<std::string> res;
  if (nodes) {
    int size = nodes->nodeNr;
    for (int i = 0; i < size; ++i) {
      xmlNodePtr cur = nodes->nodeTab[i];
      CHECK_EQ(XML_ELEMENT_NODE, cur->type);
      CHECK(cur->ns);
      CHECK(nullptr == cur->content);

      if (cur->children && cur->last == cur->children && cur->children->type == XML_TEXT_NODE) {
        CHECK(cur->children->content);
        res.push_back(as_char(cur->children->content));
      }
    }
  }

  xmlXPathFreeObject(xpathObj);
  xmlXPathFreeContext(xpathCtx);
  xmlFreeDoc(doc);

  return res;
}

void ParseXmlListObj(absl::string_view xml_obj, S3Bucket::ListObjectCb cb) {
  xmlDocPtr doc = XmlRead(xml_obj);
  CHECK(doc);

  xmlNodePtr root = xmlDocGetRootElement(doc);
  CHECK_STREQ("ListBucketResult", as_char(root->name));

  for (xmlNodePtr child = root->children; child; child = child->next) {
    if (child->type == XML_ELEMENT_NODE) {
      xmlNodePtr grand = child->children;
      if (!strcmp(as_char(child->name), "IsTruncated")) {
        CHECK(grand && grand->type == XML_TEXT_NODE);
        CHECK_STREQ("false", as_char(grand->content)) << "TBD";
      } else if (!strcmp(as_char(child->name), "Marker")) {
      } else if (!strcmp(as_char(child->name), "Contents")) {
        auto sz_name = ParseXmlObjContents(child);
        cb(sz_name.first, sz_name.second);
      }
    }
  }
  xmlFreeDoc(doc);
}

}  // namespace detail

const char* S3Bucket::kRootDomain = "s3.amazonaws.com";

S3Bucket::S3Bucket(const AWS& aws, http::HttpsClientPool* pool) : aws_(aws), pool_(pool) {
}

auto S3Bucket::List(absl::string_view glob, bool fs_mode, ListObjectCb cb) -> ListObjectResult {
  HttpsClientPool::ClientHandle handle = pool_->GetHandle();

  string url{"/?"};

  if (!glob.empty()) {
    url.append("&prefix=");
    strings::AppendEncodedUrl(glob, &url);
  }

  if (fs_mode) {
    url.append("&delimiter=");
    strings::AppendEncodedUrl("/", &url);
  }

  h2::request<h2::empty_body> req{h2::verb::get, url, 11};
  h2::response<h2::string_body> resp;

  aws_.SignEmpty(pool_->domain(), &req);
  VLOG(1) << "Req: " << req;

  system::error_code ec = handle->Send(req, &resp);

  if (ec) {
    return ToStatus(ec);
  }

  if (resp.result() != h2::status::ok) {
    LOG(INFO) << "ListError: " << resp;

    return Status(StatusCode::IO_ERROR, string(resp.reason()));
  }
  VLOG(1) << "ListResp: " << resp;
  detail::ParseXmlListObj(resp.body(), std::move(cb));

  return Status::OK;
}

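// Example usage of S3Bucket::List with a hypothetical callback:
//
//   bucket.List("images/", true, [](size_t sz, absl::string_view key) {
//     LOG(INFO) << key << " " << sz;
//   });
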
bool S3Bucket::SplitToBucketPath(absl::string_view input, absl::string_view* bucket,
                                 absl::string_view* path) {
  if (!absl::ConsumePrefix(&input, kS3Url))
    return false;

  auto pos = input.find('/');
  *bucket = input.substr(0, pos);
  *path = (pos == absl::string_view::npos) ? absl::string_view{} : input.substr(pos + 1);
  return true;
}
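// For example, SplitToBucketPath("s3://my-bucket/a/b.txt", &bucket, &path) yields
// bucket == "my-bucket" and path == "a/b.txt"; "s3://my-bucket" alone yields an empty path.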

string S3Bucket::ToFullPath(absl::string_view bucket, absl::string_view key_path) {
  return absl::StrCat(kS3Url, bucket, "/", key_path);
}

ListS3BucketResult ListS3Buckets(const AWS& aws, http::HttpsClientPool* pool) {
  HttpsClientPool::ClientHandle handle = pool->GetHandle();

  h2::request<h2::empty_body> req{h2::verb::get, "/", 11};
  h2::response<h2::string_body> resp;

  aws.SignEmpty(S3Bucket::kRootDomain, &req);

  VLOG(1) << "Req: " << req;

  system::error_code ec = handle->Send(req, &resp);

  if (ec) {
    return ToStatus(ec);
  }

  if (resp.result() != h2::status::ok) {
    LOG(INFO) << "Error: " << resp;

    return Status(StatusCode::IO_ERROR, string(resp.reason()));
  }

  VLOG(1) << "ListS3Buckets: " << resp;

  return detail::ParseXmlListBuckets(resp.body());
}

StatusObject<file::ReadonlyFile*> OpenS3ReadFile(absl::string_view key_path, const AWS& aws,
                                                 http::HttpsClientPool* pool,
                                                 const file::ReadonlyFile::Options& opts) {
  CHECK(opts.sequential && pool);

  string read_obj_url{key_path};
  std::unique_ptr<S3ReadFile> fl(new S3ReadFile(aws, pool, std::move(read_obj_url)));
  RETURN_IF_ERROR(fl->Open());

  return fl.release();
}

StatusObject<file::WriteFile*> OpenS3WriteFile(absl::string_view key_path, const AWS& aws,
                                               http::HttpsClientPool* pool) {
  string url("/");

  strings::AppendEncodedUrl(key_path, &url);

  // Signed params must look like key/value pairs, so instead of handling key-only params
  // in the signing code we just pass an empty value here.
  absl::StrAppend(&url, "?uploads=");

  h2::request<h2::empty_body> req{h2::verb::post, url, 11};
  h2::response<h2::string_body> resp;

  aws.SignEmpty(pool->domain(), &req);

  HttpsClientPool::ClientHandle handle = pool->GetHandle();
  system::error_code ec = handle->Send(req, &resp);

  if (ec) {
    return ToStatus(ec);
  }

  if (resp.result() != h2::status::ok) {
    LOG(ERROR) << "OpenWriteFile Error: " << resp;

    return Status(StatusCode::IO_ERROR, string(resp.reason()));
  }
  string upload_id;
  ParseXmlStartUpload(resp.body(), &upload_id);

  VLOG(1) << "OpenS3WriteFile: " << req << "/" << resp << " UploadId: " << upload_id;

  return new S3WriteFile(key_path, aws, std::move(upload_id), pool);
}

bool IsS3Path(absl::string_view path) {
  return absl::StartsWith(path, kS3Url);
}

}  // namespace util