4 #include "util/rpc/rpc_test_utils.h" 6 #include <boost/fiber/operations.hpp> 8 #include "absl/strings/numbers.h" 9 #include "absl/strings/strip.h" 10 #include "base/logging.h" 12 #include "util/asio/accept_server.h" 13 #include "util/asio/fiber_socket.h" 16 using namespace boost;
17 using namespace chrono;
19 DEFINE_uint32(rpc_test_io_pool, 0,
"Number of IO loops");
24 TestBridge::~TestBridge() {
27 void TestBridge::Join() {
28 this_fiber::sleep_for(milliseconds(10));
31 void TestBridge::HandleEnvelope(uint64_t rpc_id, Envelope* envelope, EnvelopeWriter writer) {
32 VLOG(1) <<
"Got " << rpc_id <<
", hs=" << envelope->header.size()
33 <<
", ls=" << envelope->letter.size();
37 absl::string_view header(strings::charptr(envelope->header.data()), envelope->header.size());
39 if (absl::ConsumePrefix(&header,
"repeat")) {
41 CHECK(absl::SimpleAtoi(header, &repeat));
42 for (uint32_t i = 0; i < repeat; ++i) {
44 Copy(envelope->letter, &tmp.letter);
45 string h = string(
"cont:") + to_string(i + 1 < repeat);
47 writer(std::move(tmp));
52 if (absl::ConsumePrefix(&header,
"sleep")) {
54 CHECK(absl::SimpleAtoi(header, &msec));
55 this_fiber::sleep_for(milliseconds(msec));
58 writer(std::move(*envelope));
61 ServerTest::ServerTest() {}
63 void ServerTest::SetUp() {
64 pool_.reset(
new IoContextPool(FLAGS_rpc_test_io_pool));
66 service_.reset(
new TestInterface);
67 server_.reset(
new AcceptServer(pool_.get()));
68 port_ = server_->AddListener(0, service_.get());
72 sock2_ = std::make_unique<FiberSyncSocket>(
"localhost", std::to_string(port_),
73 &pool_->GetNextContext());
75 ec_ = sock2_->ClientWaitToConnect(1000);
76 CHECK(!ec_) << ec_.message();
77 VLOG(1) <<
"Sock2 created " << sock2_->native_handle();
80 void ServerTest::TearDown() {
81 VLOG(1) <<
"ServerTest::TearDown Start";
85 VLOG(1) <<
"ServerTest::TearDown End";