ZIO
Python and C++ interface to ZeroMQ and Zyre
test_tcs.cpp
Go to the documentation of this file.
1 #include "zio/cppzmq.hpp"
2 #include "zio/logging.hpp"
3 #include "zio/main.hpp"
4 #include <unistd.h>
5 #include <chrono>
6 #include <thread>
7 
8 #include <map>
9 
10 const int server_type = ZMQ_SERVER;
11 const int client_type = ZMQ_CLIENT;
12 
14 //const char* addr = "inproc://test_tcs";
15 //const char* addr = "tcp://127.0.0.1:5678";
16 const char* addr = "ipc://test_tcs.ipc";
17 
18 typedef std::chrono::duration<int64_t,std::micro> microseconds_type;
19 
20 // a very very ugly server
21 static
22 void server(zio::socket_t& s)
23 {
24  std::map<uint32_t,uint32_t> rids;
25  std::map<uint32_t, std::vector<int> > tosend;
26 
27  zio::poller_t<> poller;
28  poller.add(s, zio::event_flags::pollin);
29  zio::info("server: loop starts");
30  const auto wait = std::chrono::milliseconds{2000};
31 
32  int64_t ttot=0, tmin=0, tmax=0;
33  int count = 0;
34 
35  int dead = 0;
36 
37  while (true) {
38  zio::info("server: polling {}", count);
40  std::vector<zio::poller_event<>> events(1);
41  try {
42  int rc = poller.wait_all(events, wait);
43  if (rc == 0) {
44  zio::info("server: poll times out");
45  break;
46  }
47  } catch (zio::error_t e) {
48  zio::info("server: poller exception: {}", e.what());
49  return;
50  }
52  const microseconds_type dtus = std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0);
53  const int64_t dt = dtus.count();
54 
55  if (count == 0) {
56  tmin = tmax = dt;
57  }
58  else {
59  tmin = std::min(tmin, dt);
60  tmax = std::max(tmax, dt);
61  }
62  count += 1;
63  ttot += dt;
64  zio::info("server: #{} [{}, {}] <{}> tot={} dt={} [us]", count,
65  tmin, tmax, (ttot/count), ttot, dt);
66 
67 
68  zio::message_t msg;
69  auto res = s.recv(msg);
70  assert(res);
71  uint32_t rid = msg.routing_id();
72  assert (rid > 0);
73 
74  if (msg.size() == 0) {
75  if (dead) {
76  break; // bail after more than 1
77  }
78  dead += 1;
79  }
80 
81  // the "message"
82  int them = *static_cast<int*>(msg.data());
83 
84  zio::debug("server: recvd {} {}", rid, them);
85 
86  if (rids.empty()) {
87  rids[rid]=0;
88  tosend[rid].push_back(them);
89  continue;
90  }
91  if (rids.size() == 1) {
92  uint32_t orid = rids.begin()->first;
93  if (rid == orid) { // more of the same
94  tosend[rid].push_back(them);
95  continue;
96  }
97  // now have 2
98  rids[rid] = orid;
99  rids[orid] = rid;
100  }
101  tosend[rid].push_back(them);
102 
103  for (const auto& rid_v : tosend) {
104  uint32_t rid = rid_v.first;
105  for (auto them : rid_v.second) {
106  uint32_t orid = rids[rid];
107  zio::message_t msg(&them, sizeof(int));
108  msg.set_routing_id(orid);
109  auto ses = s.send(msg, zio::send_flags::none);
110  assert(ses);
111  }
112  }
113  tosend.clear();
114  }
115 }
116 
117 static
118 void client(zio::socket_t& c, int me)
119 {
120  zio::debug("client {}: starts", me);
121 
122  zio::poller_t<> poller;
123  poller.add(c, zio::event_flags::pollin);
124  const auto wait = std::chrono::milliseconds{2000};
125 
126  int zzz = 1000000;
127  zio::info("client {}: sleeps for {}", me, zzz);
128  usleep(zzz);
129  for (int count=0; count<2000; ++count) {
130  zio::message_t msg(&me, sizeof(int));
131  zio::debug("client {}: send", me);
132  c.send(msg, zio::send_flags::none);
133 
134  zio::debug("client {}: polling", me);
135  std::vector<zio::poller_event<>> events(1);
136  try {
137  int rc = poller.wait_all(events, wait);
138  if (rc == 0) {
139  zio::info("client: poll times out");
140  return;
141  }
142  } catch (zio::error_t e) {
143  zio::info("client: poller exception: {}", e.what());
144  return;
145  }
146 
147  zio::debug("client {}: recv", me);
148  auto res = c.recv(msg);
149  assert(res);
150  int them = *static_cast<int*>(msg.data());
151  zio::debug("client {}: got {}", me, them);
152  }
153  zio::debug("client {}: send final", me);
154  c.send(zio::message_t(), zio::send_flags::none);
155 }
156 int main()
157 {
158  zio::init_all();
159  zio::info("test_tcs starting");
160 
162  zio::socket_t s(ctx, server_type);
163  s.bind(addr);
164 
165  std::thread ser(server, std::ref(s));
166  zio::info("sleeping between server and client thread starts");
167  usleep(100000);
168 
169 
170  zio::socket_t c1(ctx, client_type);
171  c1.connect(addr);
172  zio::debug("client 1: connected");
173 
174  zio::socket_t c2(ctx, client_type);
175  c2.connect(addr);
176  zio::debug("client 2: connected");
177 
178  std::thread cli1(client, std::ref(c1), 1);
179  std::thread cli2(client, std::ref(c2), 2);
180 
181  cli1.join();
182  cli2.join();
183  ser.join();
184 
185 }
virtual const char * what() const ZMQ_NOTHROW ZMQ_OVERRIDE
Definition: zmq.hpp:249
const char * addr
inproc hangs. no messages ever get received by server. tcp/ipc okay.
Definition: test_tcs.cpp:16
def now()
Definition: test_ugly.py:10
void server()
const int server_type
Definition: test_tcs.cpp:10
size_t send(const void *buf_, size_t len_, int flags_=0)
Definition: zmq.hpp:1345
void * data() ZMQ_NOTHROW
Definition: zmq.hpp:481
std::chrono::duration< int64_t, std::micro > microseconds_type
Definition: test_tcs.cpp:18
int main()
Definition: test_tcs.cpp:156
const int client_type
Definition: test_tcs.cpp:11
void client(std::string str)
void init_all()
Definition: main.cpp:22
void connect(std::string const &addr)
Definition: zmq.hpp:1324
size_t size() const ZMQ_NOTHROW
Definition: zmq.hpp:488
void bind(std::string const &addr)
Definition: zmq.hpp:1306