7 #include "base/type_traits.h" 10 #include "mr/mr_types.h" 15 template <
typename OutT>
class TableImplT;
16 inline bool IsBinary(pb::WireFormat::Type tp) {
return tp == pb::WireFormat::LST; }
21 pb::Output* mutable_msg() {
return out_; }
22 const pb::Output& msg()
const {
return *out_; }
24 bool is_binary()
const {
return detail::IsBinary(out_->format().type()); }
31 void SetCompress(pb::Output::CompressType ct,
int level);
32 void SetShardSpec(pb::ShardSpec::Type st,
unsigned modn = 0);
33 void FailUndefinedShard()
const;
37 friend class detail::TableImplT<T>;
39 using CustomShardingFunc = std::function<std::string(
const T&)>;
40 using ModNShardingFunc = std::function<unsigned(
const T&)>;
42 absl::variant<absl::monostate, ShardId, ModNShardingFunc, CustomShardingFunc> shard_op_;
49 Visitor(
const T& t,
unsigned modn) :t_(t), modn_(modn) {}
52 ShardId operator()(
const ModNShardingFunc& func)
const {
return ShardId{func(t_) % modn_}; }
53 ShardId operator()(
const CustomShardingFunc& func)
const {
return ShardId{func(t_)}; }
// Overload selected when shard_op_ holds absl::monostate, the first
// alternative of the variant (i.e. no sharding policy has been stored).
54 ShardId operator()(absl::monostate ms)
const {
// Installs a user-supplied sharding callable.
//
// `func` must be invocable with `const T&` and produce a result usable as
// std::string; the static_assert rejects anything else at compile time.
// The callable is stored in shard_op_ and the shard spec is switched to
// pb::ShardSpec::USER_DEFINED (SetShardSpec is called with its default modn).
63 template <
typename U>
Output& WithCustomSharding(U&& func) {
64 static_assert(base::is_invocable_r<std::string, U, const T&>::value,
"");
// Forward to preserve the value category of the caller's callable.
65 shard_op_ = std::forward<U>(func);
66 SetShardSpec(pb::ShardSpec::USER_DEFINED);
// Installs a modulo-N sharding callable.
//
// `func` must be invocable with `const T&` and produce a result usable as
// unsigned; the static_assert rejects anything else at compile time.
// The callable is stored in shard_op_ and the shard spec is switched to
// pb::ShardSpec::MODN with modulus `modn`.
// NOTE(review): `modn` is not validated in the visible lines; the visitor
// computes `func(t) % modn_`, so a zero modulus would be undefined -- confirm
// that SetShardSpec or a caller rejects modn == 0.
71 template <
typename U>
Output& WithModNSharding(
unsigned modn, U&& func) {
72 static_assert(base::is_invocable_r<unsigned, U, const T&>::value,
"");
// Forward to preserve the value category of the caller's callable.
73 shard_op_ = std::forward<U>(func);
74 SetShardSpec(pb::ShardSpec::MODN, modn);
80 Output& AndCompress(pb::Output::CompressType ct,
int level = -10000);
// Computes the shard for record `t` by dispatching on the alternative
// currently held in shard_op_.
82 ShardId Shard(
const T& t)
const {
// Visitor carries the record and modn_ so each overload can evaluate its policy.
83 auto res = absl::visit(Visitor{t, modn_}, shard_op_);
// Checks whether the visit yielded the empty (absl::monostate) alternative.
84 if (absl::holds_alternative<absl::monostate>(res)) {
91 void SetConstantShard(
ShardId sid) { shard_op_ = std::move(sid); }
97 template <
typename OutT>
99 SetCompress(ct, level);