5 #include "util/aws/s3.h" 7 #include <libxml/xpath.h> 8 #include <libxml/xpathInternals.h> 10 #include <boost/asio/ssl/error.hpp> 11 #include <boost/beast/http/dynamic_body.hpp> 12 #include <boost/beast/http/empty_body.hpp> 13 #include <boost/beast/http/string_body.hpp> 15 #include "absl/strings/match.h" 16 #include "absl/strings/str_cat.h" 17 #include "absl/strings/str_format.h" 18 #include "absl/strings/strip.h" 19 #include "absl/types/optional.h" 20 #include "base/logging.h" 21 #include "base/walltime.h" 22 #include "strings/escaping.h" 23 #include "util/asio/io_context.h" 24 #include "util/aws/aws.h" 25 #include "util/http/http_common.h" 26 #include "util/http/https_client.h" 27 #include "util/http/https_client_pool.h" 31 DEFINE_uint32(s3_upload_buf_mb, 5,
"Upload buffer size in MB. must be at least 5MB");
33 using file::ReadonlyFile;
34 using http::HttpsClientPool;
36 using namespace boost;
37 namespace h2 = beast::http;
40 using bb_str_view = ::boost::beast::string_view;
// URL scheme prefix that identifies S3 object paths (see IsS3Path / ToFullPath).
constexpr char kS3Url[] = "s3://";
47 inline Status ToStatus(const ::boost::system::error_code& ec) {
48 return ec ? Status(StatusCode::IO_ERROR, absl::StrCat(ec.value(),
": ", ec.message()))
52 inline absl::string_view absl_sv(
const bb_str_view s) {
53 return absl::string_view{s.data(), s.size()};
56 std::ostream& operator<<(std::ostream& os,
const h2::response<h2::buffer_body>& msg) {
57 os << msg.reason() << std::endl;
58 for (
const auto& f : msg) {
59 os << f.name_string() <<
" : " << f.value() << std::endl;
61 os <<
"-------------------------";
67 inline void SetRange(
size_t from,
size_t to, h2::fields* flds) {
68 string tmp = absl::StrCat(
"bytes=", from,
"-");
69 if (to < kuint64max) {
70 absl::StrAppend(&tmp, to - 1);
72 flds->set(h2::field::range, std::move(tmp));
75 inline const char* as_char(
const xmlChar* var) {
76 return reinterpret_cast<const char*>(var);
79 class S3ReadFile :
public ReadonlyFile {
81 using error_code = ::boost::system::error_code;
82 using Parser = h2::response_parser<h2::buffer_body>;
85 S3ReadFile(
const AWS& aws, HttpsClientPool* pool,
string read_obj_url)
86 : aws_(aws), pool_(pool), read_obj_url_(std::move(read_obj_url)) {
89 virtual ~S3ReadFile() final;
94 StatusObject<
size_t> Read(
size_t offset, const strings::MutableByteRange& range) final;
99 size_t Size() const final {
103 int Handle() const final {
115 HttpsClientPool* pool_;
117 const string read_obj_url_;
118 HttpsClientPool::ClientHandle https_handle_;
121 size_t size_ = 0, offs_ = 0;
124 class S3WriteFile :
public file::WriteFile {
133 S3WriteFile(absl::string_view name,
const AWS& aws,
string upload_id, HttpsClientPool* pool);
139 Status Write(const uint8* buffer, uint64 length) final;
142 size_t FillBuf(const uint8* buffer,
size_t length);
149 beast::multi_buffer body_mb_;
150 size_t uploaded_ = 0;
151 HttpsClientPool* pool_;
152 std::vector<
string> parts_;
153 std::vector<fibers::fiber> uploads_;
156 S3ReadFile::~S3ReadFile() {
159 Status S3ReadFile::Open() {
160 string url = absl::StrCat(
"/", read_obj_url_);
161 h2::request<h2::empty_body> req{h2::verb::get, url, 11};
164 SetRange(offs_, kuint64max, &req);
166 VLOG(1) <<
"Unsigned request: " << req;
168 aws_.SignEmpty(pool_->domain(), &req);
171 HttpsClientPool::ClientHandle handle = pool_->GetHandle();
173 system::error_code ec = handle->Send(req);
179 parser_.body_limit(kuint64max);
180 ec = handle->ReadHeader(&parser_);
185 CHECK(parser_.keep_alive()) <<
"TBD";
186 const auto& msg = parser_.get();
188 if (msg.result() != h2::status::ok) {
189 LOG(INFO) <<
"OpenError: " << msg;
191 return Status(StatusCode::IO_ERROR,
string(msg.reason()));
193 VLOG(1) <<
"HeaderResp(" << handle->native_handle() <<
"): " << msg;
195 auto content_len_it = msg.find(h2::field::content_length);
196 if (content_len_it != msg.end()) {
197 size_t content_sz = 0;
198 CHECK(absl::SimpleAtoi(absl_sv(content_len_it->value()), &content_sz));
201 CHECK_EQ(size_, content_sz + offs_) <<
"File size has changed underneath during reopen";
206 https_handle_ = std::move(handle);
210 StatusObject<size_t> S3ReadFile::Read(
size_t offset,
const strings::MutableByteRange& range) {
211 CHECK(!range.empty());
213 if (offset != offs_) {
214 return Status(StatusCode::INVALID_ARGUMENT,
"Only sequential access supported");
218 if (parser_.is_done()) {
222 size_t read_sofar = 0;
223 while (read_sofar < range.size()) {
226 auto& body = parser()->get().body();
227 auto& left_available = body.size;
228 body.data = range.data() + read_sofar;
229 left_available = range.size() - read_sofar;
231 error_code ec = https_handle_->Read(parser());
232 size_t http_read = (range.size() - read_sofar) - left_available;
234 if (!ec || ec == h2::error::need_buffer) {
235 DVLOG(2) <<
"Read " << http_read <<
" bytes from " << offset <<
" with capacity " 236 << range.size() <<
"ec: " << ec;
242 CHECK(left_available == 0 || !ec);
243 return http_read + read_sofar;
246 if (ec == h2::error::partial_message) {
248 VLOG(1) <<
"Got partial_message, socket status: " 249 << https_handle_->client()->next_layer().status() <<
", socket " 250 << https_handle_->native_handle();
253 read_sofar += http_read;
254 ec = asio::ssl::error::stream_truncated;
257 if (ec == asio::ssl::error::stream_truncated) {
258 VLOG(1) <<
"Stream " << read_obj_url_ <<
" truncated at " << offs_ <<
"/" << size_;
259 https_handle_.reset();
261 RETURN_IF_ERROR(Open());
262 VLOG(1) <<
"Reopened the file, new size: " << size_;
266 LOG(ERROR) <<
"ec: " << ec <<
"/" << ec.message() <<
" at " << offset <<
"/" << size_;
267 LOG(ERROR) <<
"FiberSocket status: " << https_handle_->client()->next_layer().status();
277 Status S3ReadFile::Close() {
278 if (https_handle_ && parser()) {
279 if (!parser()->is_done()) {
281 https_handle_->schedule_reconnect();
284 https_handle_.reset();
289 S3WriteFile::S3WriteFile(absl::string_view name,
const AWS& aws,
string upload_id,
290 HttpsClientPool* pool)
291 : file::WriteFile(name), aws_(aws), upload_id_(std::move(upload_id)),
292 body_mb_(FLAGS_s3_upload_buf_mb * (1 << 20)), pool_(pool) {
295 bool S3WriteFile::Close() {
296 CHECK(pool_->io_context().InContextThread());
298 auto status = Upload();
300 LOG(ERROR) <<
"Error uploading " << status;
304 VLOG(1) <<
"Joining with " << uploads_.size() <<
" fibers";
305 for (
auto& f : uploads_) {
315 strings::AppendEncodedUrl(create_file_name_, &url);
319 absl::StrAppend(&url,
"?uploadId=", upload_id_);
321 h2::request<h2::string_body> req{h2::verb::post, url, 11};
322 h2::response<h2::string_body> resp;
324 req.set(h2::field::content_type, http::kXmlMime);
325 auto& body = req.body();
326 body = R
"(<?xml version="1.0" encoding="UTF-8"?> 327 <CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">)"; 329 for (
size_t i = 0; i < parts_.size(); ++i) {
330 absl::StrAppend(&body,
"<Part><ETag>\"", parts_[i],
"\"</ETag><PartNumber>", i + 1);
331 absl::StrAppend(&body,
"</PartNumber></Part>\n");
333 body.append(
"</CompleteMultipartUpload>");
335 req.prepare_payload();
338 detail::Sha256String(req.body(), sha256);
339 aws_.Sign(pool_->domain(), absl::string_view{sha256, 64}, &req);
341 uint64_t start = base::GetMonotonicMicrosFast();
342 HttpsClientPool::ClientHandle handle = pool_->
GetHandle();
343 system::error_code ec = handle->Send(req, &resp);
346 VLOG(1) <<
"Error sending to socket " << handle->native_handle() <<
" " << ec;
350 if (resp.result() != h2::status::ok) {
351 LOG(ERROR) <<
"S3WriteFile::Close: " << req <<
"/ " << resp;
355 VLOG(1) <<
"S3Close took " << base::GetMonotonicMicrosFast() - start <<
" micros";
361 bool S3WriteFile::Open() {
362 LOG(FATAL) <<
"Should not be called";
367 Status S3WriteFile::Write(
const uint8* buffer, uint64 length) {
369 size_t written = FillBuf(buffer, length);
370 if (body_mb_.size() < body_mb_.max_size())
374 RETURN_IF_ERROR(Upload());
380 size_t S3WriteFile::FillBuf(
const uint8* buffer,
size_t length) {
381 size_t prepare_size = std::min(length, body_mb_.max_size() - body_mb_.size());
382 auto mbs = body_mb_.prepare(prepare_size);
384 for (
auto mb : mbs) {
385 memcpy(mb.data(), buffer + offs, mb.size());
388 CHECK_EQ(offs, prepare_size);
389 body_mb_.commit(prepare_size);
394 Status S3WriteFile::Upload() {
395 size_t body_size = body_mb_.size();
404 const char* kFakeSha =
"UNSIGNED-PAYLOAD";
405 strings::AppendEncodedUrl(create_file_name_, &url);
406 absl::StrAppend(&url,
"?uploadId=", upload_id_);
407 absl::StrAppend(&url,
"&partNumber=", parts_.size() + 1);
409 h2::request<h2::dynamic_body> req{h2::verb::put, url, 11};
410 req.set(h2::field::content_type, http::kBinMime);
412 req.body() = std::move(body_mb_);
413 req.prepare_payload();
415 aws_.Sign(pool_->domain(), absl::string_view{kFakeSha}, &req);
417 auto up_cb = [
this, req = std::move(req),
id = parts_.size()] {
418 VLOG(2) <<
"StartUpCb";
419 h2::response<h2::string_body> resp;
420 HttpsClientPool::ClientHandle handle = pool_->
GetHandle();
422 VLOG(2) <<
"BeforeSendUpCb";
423 uint64_t start = base::GetMonotonicMicrosFast();
424 system::error_code ec = handle->Send(req, &resp);
425 CHECK(!ec) <<
"Error sending to socket " << handle->native_handle() <<
" " << ec;
427 VLOG(2) <<
"Upload: " << resp;
428 CHECK(resp.result() == h2::status::ok) <<
"S3WriteFile::Upload: " << resp;
430 VLOG(1) <<
"S3Upload tool " << base::GetMonotonicMicrosFast() - start <<
" micros";
432 auto it = resp.find(h2::field::etag);
433 CHECK(it != resp.end());
435 parts_[id] = string(it->value());
437 if (!resp.keep_alive()) {
438 handle->schedule_reconnect();
441 parts_.emplace_back();
444 fibers::fiber fb(fibers::launch::dispatch, std::move(up_cb));
445 uploads_.emplace_back(std::move(fb));
452 inline xmlDocPtr XmlRead(absl::string_view xml) {
453 return xmlReadMemory(xml.data(), xml.size(), NULL, NULL, XML_PARSE_COMPACT | XML_PARSE_NOBLANKS);
456 std::pair<size_t, absl::string_view> ParseXmlObjContents(xmlNodePtr node) {
457 std::pair<size_t, absl::string_view> res;
459 for (xmlNodePtr child = node->children; child; child = child->next) {
460 if (child->type == XML_ELEMENT_NODE) {
461 xmlNodePtr grand = child->children;
463 if (!strcmp(as_char(child->name),
"Key")) {
464 CHECK(grand && grand->type == XML_TEXT_NODE);
465 res.second = absl::string_view(as_char(grand->content));
466 }
else if (!strcmp(as_char(child->name),
"Size")) {
467 CHECK(grand && grand->type == XML_TEXT_NODE);
468 CHECK(absl::SimpleAtoi(as_char(grand->content), &res.first));
475 void ParseXmlStartUpload(absl::string_view xml_resp,
string* upload_id) {
476 xmlDocPtr doc = XmlRead(xml_resp);
479 xmlNodePtr root = xmlDocGetRootElement(doc);
480 CHECK_STREQ(
"InitiateMultipartUploadResult", as_char(root->name));
482 for (xmlNodePtr child = root->children; child; child = child->next) {
483 if (child->type == XML_ELEMENT_NODE) {
484 xmlNodePtr grand = child->children;
485 if (!strcmp(as_char(child->name),
"UploadId")) {
486 CHECK(grand && grand->type == XML_TEXT_NODE);
487 upload_id->assign(as_char(grand->content));
498 std::vector<std::string> ParseXmlListBuckets(absl::string_view xml_resp) {
499 xmlDocPtr doc = XmlRead(xml_resp);
502 xmlXPathContextPtr xpathCtx = xmlXPathNewContext(doc);
504 auto register_res = xmlXPathRegisterNs(xpathCtx, BAD_CAST
"NS",
505 BAD_CAST
"http://s3.amazonaws.com/doc/2006-03-01/");
506 CHECK_EQ(register_res, 0);
508 xmlXPathObjectPtr xpathObj = xmlXPathEvalExpression(
509 BAD_CAST
"/NS:ListAllMyBucketsResult/NS:Buckets/NS:Bucket/NS:Name", xpathCtx);
511 xmlNodeSetPtr nodes = xpathObj->nodesetval;
512 std::vector<std::string> res;
514 int size = nodes->nodeNr;
515 for (
int i = 0; i < size; ++i) {
516 xmlNodePtr cur = nodes->nodeTab[i];
517 CHECK_EQ(XML_ELEMENT_NODE, cur->type);
519 CHECK(
nullptr == cur->content);
521 if (cur->children && cur->last == cur->children && cur->children->type == XML_TEXT_NODE) {
522 CHECK(cur->children->content);
523 res.push_back(as_char(cur->children->content));
528 xmlXPathFreeObject(xpathObj);
529 xmlXPathFreeContext(xpathCtx);
535 void ParseXmlListObj(absl::string_view xml_obj, S3Bucket::ListObjectCb cb) {
536 xmlDocPtr doc = XmlRead(xml_obj);
539 xmlNodePtr root = xmlDocGetRootElement(doc);
540 CHECK_STREQ(
"ListBucketResult", as_char(root->name));
542 for (xmlNodePtr child = root->children; child; child = child->next) {
543 if (child->type == XML_ELEMENT_NODE) {
544 xmlNodePtr grand = child->children;
545 if (!strcmp(as_char(child->name),
"IsTruncated")) {
546 CHECK(grand && grand->type == XML_TEXT_NODE);
547 CHECK_STREQ(
"false", as_char(grand->content)) <<
"TBD";
548 }
else if (!strcmp(as_char(child->name),
"Marker")) {
549 }
else if (!strcmp(as_char(child->name),
"Contents")) {
550 auto sz_name = ParseXmlObjContents(child);
551 cb(sz_name.first, sz_name.second);
// Global S3 endpoint domain; used as the signing domain for account-level
// requests such as ListS3Buckets (see aws.SignEmpty(S3Bucket::kRootDomain, ...)).
const char* S3Bucket::kRootDomain = "s3.amazonaws.com";
566 HttpsClientPool::ClientHandle handle = pool_->
GetHandle();
571 url.append(
"&prefix=");
572 strings::AppendEncodedUrl(glob, &url);
576 url.append(
"&delimiter=");
577 strings::AppendEncodedUrl(
"/", &url);
580 h2::request<h2::empty_body> req{h2::verb::get, url, 11};
581 h2::response<h2::string_body> resp;
583 aws_.SignEmpty(pool_->domain(), &req);
584 VLOG(1) <<
"Req: " << req;
586 system::error_code ec = handle->Send(req, &resp);
592 if (resp.result() != h2::status::ok) {
593 LOG(INFO) <<
"ListError: " << resp;
595 return Status(StatusCode::IO_ERROR,
string(resp.reason()));
597 VLOG(1) <<
"ListResp: " << resp;
598 detail::ParseXmlListObj(resp.body(), std::move(cb));
603 bool S3Bucket::SplitToBucketPath(absl::string_view input, absl::string_view* bucket,
604 absl::string_view* path) {
605 if (!absl::ConsumePrefix(&input, kS3Url))
608 auto pos = input.find(
'/');
609 *bucket = input.substr(0, pos);
610 *path = (pos == absl::string_view::npos) ? absl::string_view{} : input.substr(pos + 1);
614 string S3Bucket::ToFullPath(absl::string_view bucket, absl::string_view key_path) {
615 return absl::StrCat(kS3Url, bucket,
"/", key_path);
618 ListS3BucketResult ListS3Buckets(
const AWS& aws, http::HttpsClientPool* pool) {
619 HttpsClientPool::ClientHandle handle = pool->GetHandle();
621 h2::request<h2::empty_body> req{h2::verb::get,
"/", 11};
622 h2::response<h2::string_body> resp;
624 aws.SignEmpty(S3Bucket::kRootDomain, &req);
626 VLOG(1) <<
"Req: " << req;
628 system::error_code ec = handle->Send(req, &resp);
634 if (resp.result() != h2::status::ok) {
635 LOG(INFO) <<
"Error: " << resp;
637 return Status(StatusCode::IO_ERROR,
string(resp.reason()));
640 VLOG(1) <<
"ListS3Buckets: " << resp;
642 return detail::ParseXmlListBuckets(resp.body());
645 StatusObject<file::ReadonlyFile*> OpenS3ReadFile(absl::string_view key_path,
const AWS& aws,
646 http::HttpsClientPool* pool,
647 const file::ReadonlyFile::Options& opts) {
648 CHECK(opts.sequential && pool);
650 absl::string_view bucket, obj_path;
652 string read_obj_url{key_path};
653 std::unique_ptr<S3ReadFile> fl(
new S3ReadFile(aws, pool, std::move(read_obj_url)));
654 RETURN_IF_ERROR(fl->Open());
659 StatusObject<file::WriteFile*> OpenS3WriteFile(absl::string_view key_path,
const AWS& aws,
660 http::HttpsClientPool* pool) {
663 strings::AppendEncodedUrl(key_path, &url);
667 absl::StrAppend(&url,
"?uploads=");
669 h2::request<h2::empty_body> req{h2::verb::post, url, 11};
670 h2::response<h2::string_body> resp;
672 aws.SignEmpty(pool->domain(), &req);
674 HttpsClientPool::ClientHandle handle = pool->GetHandle();
675 system::error_code ec = handle->Send(req, &resp);
681 if (resp.result() != h2::status::ok) {
682 LOG(ERROR) <<
"OpenWriteFile Error: " << resp;
684 return Status(StatusCode::IO_ERROR,
string(resp.reason()));
687 ParseXmlStartUpload(resp.body(), &upload_id);
689 VLOG(1) <<
"OpenS3WriteFile: " << req <<
"/" << resp <<
"UploadId: " << upload_id;
691 return new S3WriteFile(key_path, aws, std::move(upload_id), pool);
694 bool IsS3Path(absl::string_view path) {
695 return absl::StartsWith(path, kS3Url);
// ListObjectCb: std::function<void(size_t, absl::string_view)>;
// called with (size, key_name) pairs.
// ListObjectResult List(absl::string_view glob, bool fs_mode, ListObjectCb cb):
// lists objects for a particular bucket.
// ClientHandle GetHandle():
// returns an https client connection from the pool.