// Command-line flags:
//   - where to find the message descriptor: --protofiles, --proto_db_file, --type;
//   - which records to print: --where, --sample_key, --sample_factor;
//   - how to print them: --sizes, --json, --raw, --count, --parallel.
4 #include "util/pprint/file_printer.h" 6 #include <google/protobuf/compiler/importer.h> 8 #include "absl/strings/escaping.h" 9 #include "base/flags.h" 10 #include "base/hash.h" 11 #include "base/logging.h" 12 #include "base/map-util.h" 13 #include "file/list_file.h" 14 #include "file/proto_writer.h" 15 #include "util/pb2json.h" 16 #include "util/plang/plang_parser.hh" 17 #include "util/plang/plang_scanner.h" 18 #include "util/pprint/pprint_utils.h" 20 DEFINE_string(protofiles,
// --protofiles: path mapped to the virtual file "START_FILE". When non-empty,
// FindDescriptor imports it so that --type can be resolved from it.
"",
"");
// --proto_db_file: list file of serialized FileDescriptorProto records.
// FindDescriptor loads them into a descriptor database to resolve --type.
21 DEFINE_string(proto_db_file,
"s3://test/roman/proto_db.lst",
"");
// --type: fully-qualified message type name, e.g. foursquare.Category.
// FindDescriptor CHECK-fails when it is empty.
22 DEFINE_string(type,
"",
"");
// --where: plang expression parsed in FilePrinter::Init. Records for which it
// evaluates to false are presumably skipped by the per-record task.
24 DEFINE_string(where,
"",
"boolean constraint in plang language");
// --sizes: presumably enables the SizeSummarizer whose report
// FilePrinter::Run prints after all records are processed.
25 DEFINE_bool(sizes,
false,
"Prints a rough estimation of the size of every field");
// --json: presumably selects Pb2Json output instead of the field printer.
26 DEFINE_bool(json,
false,
"");
// --raw: when set, LoadFile skips reading the proto metadata.
// NOTE(review): it appears to make the task print each record C-escaped,
// unparsed. The guarding condition is not visible here; confirm.
27 DEFINE_bool(raw,
false,
"");
// --sample_key: field path (resolved with FdPath) of a singular string field
// whose value is the sampling key. The per-record task CHECKs this.
28 DEFINE_string(sample_key,
"",
"");
// --sample_factor: used together with --sample_key. A record is kept only
// when the fingerprint of its key is divisible by this value (about 1 in k).
// Values <= 0 presumably disable sampling.
29 DEFINE_int32(sample_factor, 0,
"If bigger than 0 samples and outputs record once in k times");
// --parallel: when true, records go to the task pool (RunTask). When false,
// they are processed inline on the reading thread (RunInline).
30 DEFINE_bool(parallel,
true,
"");
// --count: like --raw, makes LoadFile skip reading the proto metadata.
31 DEFINE_bool(count,
false,
"");
39 namespace gpc = gpb::compiler;
41 using strings::AsString;
// Reports an error found while importing .proto files by writing it to
// std::cerr. This is presumably the callback of the error collector passed to
// gpc::Importer in FindDescriptor; the enclosing class is not visible here.
// NOTE(review): `line` and `column` are accepted but never printed. Printing
// them would make .proto parse errors easier to locate.
// NOTE(review): `filenname` is a typo for `filename`.
44 void AddError(
const string& filenname,
int line,
int column,
const string& message) {
45 std::cerr <<
"Error File : " << filenname <<
" : " << message << std::endl;
// Resolves the protobuf Descriptor for the message type named by --type.
// Lookup sources visible in this function, in order:
//   1. the generated pool (types compiled into the binary);
//   2. --protofiles, imported through gpc::Importer as "START_FILE";
//   3. a descriptor database built from the FileDescriptorProto records
//      stored in --proto_db_file.
// Several lines are missing from this view, including the returns between
// the steps. The exact fallback conditions are therefore assumed, not shown.
// CHECK-fails when:
//   - --type is empty;
//   - importing --protofiles fails;
//   - the final lookup does not find the type.
49 static const gpb::Descriptor* FindDescriptor() {
50 CHECK(!FLAGS_type.empty()) <<
"type must be filled. For example: --type=foursquare.Category";
51 const gpb::DescriptorPool* gen_pool = gpb::DescriptorPool::generated_pool();
52 const gpb::Descriptor* descriptor = gen_pool->FindMessageTypeByName(FLAGS_type);
// NOTE(review): `tree` and `importer` (and presumably `collector`) have
// automatic storage. A descriptor obtained from importer.pool() is owned by
// that pool and dangles once this function returns. If that descriptor is
// returned to the caller, these objects likely need static storage, as
// proto_db does below. Verify.
56 gpc::DiskSourceTree tree;
57 tree.MapPath(
"START_FILE", FLAGS_protofiles);
59 gpc::Importer importer(&tree, &collector);
60 if (!FLAGS_protofiles.empty()) {
62 CHECK(importer.Import(
"START_FILE"));
64 descriptor = importer.pool()->FindMessageTypeByName(FLAGS_type);
// Static storage: descriptors returned from proto_db_pool must stay valid
// after this function returns, so the pool and its database outlive the call.
67 static gpb::SimpleDescriptorDatabase proto_db;
68 static gpb::DescriptorPool proto_db_pool(&proto_db);
69 file::ListReader reader(FLAGS_proto_db_file);
// Each record is a serialized FileDescriptorProto. AddAndOwn transfers
// ownership of `fdp` to proto_db.
// NOTE(review): the bool result of AddAndOwn is ignored, so a conflicting
// file in the list would go unnoticed.
72 while (reader.ReadRecord(&record, &record_buf)) {
73 gpb::FileDescriptorProto* fdp =
new gpb::FileDescriptorProto;
74 CHECK(fdp->ParseFromArray(record.data(), record.size()));
75 proto_db.AddAndOwn(fdp);
77 descriptor = proto_db_pool.FindMessageTypeByName(FLAGS_type);
78 CHECK(descriptor) <<
"Can not find " << FLAGS_type <<
" in the proto pool.";
// Sampling filter: returns true when record `msg` should be dropped.
// Sampling applies only when --sample_factor > 0 and --sample_key is set. The
// early-return line is not visible here; it presumably returns false, which
// keeps every record.
// How the decision is made:
//   - the string field addressed by `fd_path` is read via reflection;
//   - the value is fingerprinted;
//   - the record is kept only when the fingerprint is divisible by
//     --sample_factor.
// The decision depends only on the key value, so all records that share a
// key are kept or dropped together.
82 static bool ShouldSkip(
const gpb::Message& msg,
const FdPath& fd_path) {
83 if (FLAGS_sample_factor <= 0 || FLAGS_sample_key.empty())
85 const string* val =
nullptr;
// Presumably invoked by ExtractValue for the field at fd_path. `buf` is
// declared on a line not visible here; it is scratch storage that
// GetStringReference may use.
87 auto cb = [&val, &buf](
const gpb::Message& msg,
const gpb::FieldDescriptor* fd, int, int) {
88 const gpb::Reflection* refl = msg.GetReflection();
89 val = &refl->GetStringReference(msg, fd, &buf);
91 fd_path.ExtractValue(msg, cb);
// NOTE(review): *val is dereferenced below. Confirm that the hidden lines
// guard against `val` still being nullptr (callback never invoked).
94 uint32 num = base::Fingerprint(*val);
// Assuming the guard above returned early, sample_factor is > 0 here, so
// its conversion to unsigned for the modulo is value-preserving.
96 return (num % FLAGS_sample_factor) != 0;
// NOTE(review): this is a fragment of the per-record task class run by
// TaskPool. Its class header and several lines are outside this view.
// SharedData points at the PrintSharedData filled in FilePrinter::Init:
// mutex `m`, size summarizer, printer and the where-expression.
101 typedef PrintSharedData* SharedData;
// Receives the shared-data pointer; the body is not visible here.
103 void InitShared(SharedData d) {
// Per-task setup: allocate a private message of the prototype's type.
109 local_msg_.reset(to_clone->New());
// With --sample_key, resolve the sampling field path once. CHECKs require it
// to be a singular string field.
111 if (!FLAGS_sample_key.empty()) {
112 fd_path_ =
FdPath(to_clone->GetDescriptor(), FLAGS_sample_key);
113 CHECK(!fd_path_.IsRepeated());
114 CHECK_EQ(gpb::FieldDescriptor::CPPTYPE_STRING, fd_path_.path().back()->cpp_type());
// Processes one serialized record `obj`.
118 void operator()(
const std::string& obj) {
// Raw mode (condition not visible; presumably --raw): print the record bytes
// C-escaped. The shared mutex is held so concurrent tasks do not interleave
// their output.
120 std::lock_guard<mutex> lock(shared_data_->m);
121 std::cout << absl::Utf8SafeCEscape(obj) <<
"\n";
// A record that fails to parse aborts the whole process.
124 CHECK(local_msg_->ParseFromString(obj));
// Drop records rejected by the --where expression or by sampling.
125 if (shared_data_->expr && !plang::EvaluateBoolExpr(*shared_data_->expr, *local_msg_))
128 if (ShouldSkip(*local_msg_, fd_path_))
// Presumably the rest of the function (size accounting and output) runs
// under the shared mutex, serializing writes to std::cout across tasks.
130 std::lock_guard<mutex> lock(shared_data_->m);
133 shared_data_->size_summarizer->AddSizes(*local_msg_);
138 string str = Pb2Json(*local_msg_, options_);
139 std::cout << str <<
"\n";
141 shared_data_->printer->Output(*local_msg_);
// Task-private message, reused to parse each record.
146 std::unique_ptr<gpb::Message> local_msg_;
// Non-owning pointer to the state shared by all tasks.
148 SharedData shared_data_;
152 FilePrinter::FilePrinter() {}
153 FilePrinter::~FilePrinter() {}
// Prepares the printer for file `fname`. Visible steps:
//   1. parse --where into test_expr_ (CHECK-fails on a syntax error);
//   2. create the size summarizer and field printer for descr_msg_'s type;
//   3. create and launch the task pool with the shared state;
//   4. log the thread count in parallel mode.
// NOTE(review): several lines are missing from this view, including where
// `fname` is consumed and any conditions guarding the summarizer/printer.
155 void FilePrinter::Init(
const string& fname) {
158 if (!FLAGS_where.empty()) {
159 std::istringstream istr(FLAGS_where);
161 plang::Parser parser(&scanner, &test_expr_);
162 CHECK_EQ(0, parser.parse()) <<
"Could not parse " << FLAGS_where;
169 size_summarizer_.reset(
new SizeSummarizer(descr_msg_->GetDescriptor()));
170 printer_.reset(
new Printer(descr_msg_->GetDescriptor(), field_printer_cb_));
// NOTE(review): the meaning of 10 (thread count vs. queue size) is not
// visible here. Confirm against TaskPool.
175 pool_.reset(
new TaskPool(
"pool", 10));
// Publish non-owning pointers to the shared state. The pool receives one
// PrintSharedData instance, presumably shared by every task.
177 shared_data_.size_summarizer = size_summarizer_.get();
178 shared_data_.printer = printer_.get();
179 shared_data_.expr = test_expr_.get();
180 pool_->SetSharedData(&shared_data_);
// descr_msg_ is presumably the prototype each task clones for parsing.
181 pool_->Launch(descr_msg_.get(), options_);
183 if (FLAGS_parallel) {
184 LOG(INFO) <<
"Running in parallel " << pool_->thread_count() <<
" threads";
// Returns the descriptor of the message type being printed. Returns nullptr
// when no message prototype has been set up (descr_msg_ is empty).
188 auto FilePrinter::GetDescriptor() const -> const Descriptor* {
189 return descr_msg_ ? descr_msg_->GetDescriptor() :
nullptr;
// Feeds records to the task pool and waits for them to finish.
// Visible behavior:
//   - with --parallel, records are queued via RunTask;
//   - otherwise they are processed synchronously via RunInline;
//   - after all tasks complete, the size summary is printed to stdout if a
//     summarizer was created.
// NOTE(review): the record loop and the returned Status are on lines not
// visible here.
192 Status FilePrinter::Run() {
// AsString presumably converts the record into an owning std::string. This
// matters because a queued task may run after the reader reuses its buffer.
204 if (FLAGS_parallel) {
205 pool_->RunTask(AsString(record));
207 pool_->RunInline(AsString(record));
211 pool_->WaitForTasksToComplete();
213 if (size_summarizer_.get())
214 std::cout << *size_summarizer_ <<
"\n";
// Opens `fname` as a list file and determines the message type to decode.
// Corruption reported by the reader is stored in st_; a later report
// overwrites an earlier one.
// Unless --raw or --count is set:
//   - the file's metadata is read, and failure to read it is fatal;
//   - if the metadata has both a proto type name and a descriptor set
//     (presumably serialized), the message is built from them;
//   - otherwise (the `else` line is not visible) the type is resolved with
//     FindDescriptor(), i.e. from --type and related flags.
219 void ListReaderPrinter::LoadFile(
const std::string& fname) {
// Captures `this`. reader_ is a member, so the reader that presumably stores
// this callback normally cannot outlive the printer.
220 auto corrupt_cb = [
this](
size_t bytes,
const util::Status& status) { st_ = status; };
222 reader_.reset(
new ListReader(fname,
false, corrupt_cb));
224 if (!FLAGS_raw && !FLAGS_count) {
225 std::map<std::string, std::string> meta;
226 if (!reader_->GetMetaData(&meta)) {
227 LOG(FATAL) <<
"Error fetching metadata from " << fname;
229 string ptype = FindValueWithDefault(meta, file::kProtoTypeKey,
string());
230 string fd_set = FindValueWithDefault(meta, file::kProtoSetKey,
string());
231 if (!ptype.empty() && !fd_set.empty())
232 descr_msg_.reset(AllocateMsgByMeta(ptype, fd_set));
234 descr_msg_.reset(AllocateMsgFromDescr(FindDescriptor()));
// Fragment of a record-reading method whose other lines are not visible.
// Reads the next record from reader_ into `record`, passing record_buf_ as
// the reader's scratch buffer.
239 bool res = reader_->ReadRecord(record, &record_buf_);
// Logs how many data bytes and header bytes reader_ has read.
246 void ListReaderPrinter::PostRun() {
247 LOG(INFO) <<
"Data bytes: " << reader_->read_data_bytes()
248 <<
" header bytes: " << reader_->read_header_bytes();