rpc_test_utils.cc
1 // Copyright 2018, Beeri 15. All rights reserved.
2 // Author: Roman Gershman (romange@gmail.com)
3 //
4 #include "util/rpc/rpc_test_utils.h"
5 
6 #include <boost/fiber/operations.hpp>
7 
8 #include "absl/strings/numbers.h"
9 #include "absl/strings/strip.h"
10 #include "base/logging.h"
11 
12 #include "util/asio/accept_server.h"
13 #include "util/asio/fiber_socket.h"
14 
15 using namespace std;
16 using namespace boost;
17 using namespace chrono;
18 
19 DEFINE_uint32(rpc_test_io_pool, 0, "Number of IO loops");
20 
21 namespace util {
22 namespace rpc {
23 
24 TestBridge::~TestBridge() {
25 }
26 
27 void TestBridge::Join() {
28  this_fiber::sleep_for(milliseconds(10));
29 }
30 
31 void TestBridge::HandleEnvelope(uint64_t rpc_id, Envelope* envelope, EnvelopeWriter writer) {
32  VLOG(1) << "Got " << rpc_id << ", hs=" << envelope->header.size()
33  << ", ls=" << envelope->letter.size();
34  if (clear_) {
35  envelope->Clear();
36  }
37  absl::string_view header(strings::charptr(envelope->header.data()), envelope->header.size());
38 
39  if (absl::ConsumePrefix(&header, "repeat")) {
40  uint32_t repeat = 0;
41  CHECK(absl::SimpleAtoi(header, &repeat));
42  for (uint32_t i = 0; i < repeat; ++i) {
43  Envelope tmp;
44  Copy(envelope->letter, &tmp.letter);
45  string h = string("cont:") + to_string(i + 1 < repeat);
46  Copy(h, &tmp.header);
47  writer(std::move(tmp));
48  }
49  return;
50  }
51 
52  if (absl::ConsumePrefix(&header, "sleep")) {
53  uint32_t msec = 0;
54  CHECK(absl::SimpleAtoi(header, &msec));
55  this_fiber::sleep_for(milliseconds(msec));
56  }
57 
58  writer(std::move(*envelope));
59 }
60 
61 ServerTest::ServerTest() {}
62 
63 void ServerTest::SetUp() {
64  pool_.reset(new IoContextPool(FLAGS_rpc_test_io_pool));
65  pool_->Run();
66  service_.reset(new TestInterface);
67  server_.reset(new AcceptServer(pool_.get()));
68  port_ = server_->AddListener(0, service_.get());
69 
70  server_->Run();
71 
72  sock2_ = std::make_unique<FiberSyncSocket>("localhost", std::to_string(port_),
73  &pool_->GetNextContext());
74 
75  ec_ = sock2_->ClientWaitToConnect(1000);
76  CHECK(!ec_) << ec_.message();
77  VLOG(1) << "Sock2 created " << sock2_->native_handle();
78 }
79 
80 void ServerTest::TearDown() {
81  VLOG(1) << "ServerTest::TearDown Start";
82  server_.reset();
83  sock2_.reset();
84  pool_->Stop();
85  VLOG(1) << "ServerTest::TearDown End";
86 }
87 
88 } // namespace rpc
89 } // namespace util