ZIO
Python and C++ interface to ZeroMQ and Zyre
util.cpp
Go to the documentation of this file.
1 #include "zio/util.hpp"
2 // #include "zio/logging.hpp"
3 #include <chrono>
4 #include <thread>
5 #include <signal.h>
6 
7 using namespace zio;
8 
9 
11  zio::multipart_t& mmsg)
12 {
13  int stype = sock.getsockopt<int>(ZMQ_TYPE);
14  if (ZMQ_SERVER == stype) {
15  return recv_server(sock, mmsg);
16  }
17  if(ZMQ_ROUTER == stype) {
18  return recv_router(sock, mmsg);
19  }
20  throw std::runtime_error("recv requires SERVER or ROUTER socket");
21 }
22 
23 
25  zio::multipart_t& mmsg)
26 {
27  zio::message_t msg;
28  auto res = server_socket.recv(msg, zio::recv_flags::none);
29  uint32_t routing_id = msg.routing_id();
31  rid.push_back((0xff000000&routing_id) >> 24);
32  rid.push_back((0x00ff0000&routing_id) >> 16);
33  rid.push_back((0x0000ff00&routing_id) >> 8);
34  rid.push_back((0x000000ff&routing_id));
35 
36 
37  mmsg.decode_append(msg);
38  // {
39  // std::stringstream ss;
40  // ss << "zio::recv SERVER msg size " << msg.size()
41  // << ", " << mmsg.size() << " parts \"" << rid << "\"";
42  // zio::debug(ss.str());
43  // }
44  return rid;
45 }
46 
48  zio::multipart_t& mmsg)
49 {
50  mmsg.recv(router_socket);
51  remote_identity_t rid = mmsg.popstr();
52  mmsg.pop(); // empty
53  return rid;
54 }
55 
56 
58  zio::multipart_t& mmsg, remote_identity_t rid)
59 {
60  int stype = sock.getsockopt<int>(ZMQ_TYPE);
61  if (ZMQ_SERVER == stype) {
62  return send_server(sock, mmsg, rid);
63  }
64  if(ZMQ_ROUTER == stype) {
65  return send_router(sock, mmsg, rid);
66  }
67  throw std::runtime_error("send requires SERVER or ROUTER socket");
68 }
69 
70 void zio::send_server(zio::socket_t& server_socket,
71  zio::multipart_t& mmsg, remote_identity_t rid)
72 {
73  zio::message_t msg = mmsg.encode();
74  uint32_t routing_id =
75  0xff000000&(rid[0] << 24) |
76  0x00ff0000&(rid[1] << 16) |
77  0x0000ff00&(rid[2] << 8) |
78  0x000000ff&rid[3];
79  msg.set_routing_id(routing_id);
80  // {
81  // std::stringstream ss;
82  // ss << "zio::send SERVER msg size " << msg.size()
83  // << ", " << mmsg.size() << " parts \"" << rid << "\"";
84  // zio::debug(ss.str());
85  // }
86  server_socket.send(msg, zio::send_flags::none);
87 }
88 
89 void zio::send_router(zio::socket_t& router_socket,
90  zio::multipart_t& mmsg, remote_identity_t rid)
91 {
92  mmsg.pushmem(NULL, 0);
93  mmsg.pushstr(rid);
94  mmsg.send(router_socket);
95 }
96 
97 
99  zio::multipart_t& mmsg)
100 {
101  int stype = socket.getsockopt<int>(ZMQ_TYPE);
102  if (ZMQ_CLIENT == stype) {
103  recv_client(socket, mmsg);
104  return;
105  }
106  if(ZMQ_DEALER == stype) {
107  recv_dealer(socket, mmsg);
108  return;
109  }
110  throw std::runtime_error("recv requires CLIENT or DEALER socket");
111 }
112 
113 void zio::recv_client(zio::socket_t& client_socket,
114  zio::multipart_t& mmsg)
115 {
116  zio::message_t msg;
117  auto res = client_socket.recv(msg, zio::recv_flags::none);
118  mmsg.decode_append(msg);
119  // {
120  // std::stringstream ss;
121  // ss << "zio::recv CLIENT msg size " << msg.size()
122  // << ", " << mmsg.size() << " parts";
123  // zio::debug(ss.str());
124  // }
125  return;
126 }
127 
128 
129 void zio::recv_dealer(zio::socket_t& dealer_socket,
130  zio::multipart_t& mmsg)
131 {
132  mmsg.recv(dealer_socket);
133  mmsg.pop(); // fake being REQ
134  return;
135 }
136 
137 
139  zio::multipart_t& mmsg)
140 {
141  int stype = socket.getsockopt<int>(ZMQ_TYPE);
142  if (ZMQ_CLIENT == stype) {
143  send_client(socket, mmsg);
144  return;
145  }
146  if(ZMQ_DEALER == stype) {
147  send_dealer(socket, mmsg);
148  return;
149  }
150  throw std::runtime_error("send requires CLIENT or DEALER socket");
151 }
152 
153 void zio::send_client(zio::socket_t& client_socket,
154  zio::multipart_t& mmsg)
155 {
156  zio::message_t msg = mmsg.encode();
157  // {
158  // std::stringstream ss;
159  // ss << "zio::send CLIENT msg size " << msg.size()
160  // << ", " << mmsg.size() << " parts";
161  // zio::debug(ss.str());
162  // }
163  client_socket.send(msg, zio::send_flags::none);
164 }
165 
166 void zio::send_dealer(zio::socket_t& dealer_socket,
167  zio::multipart_t& mmsg)
168 {
169  mmsg.pushmem(NULL,0); // pretend to be REQ
170  mmsg.send(dealer_socket);
171 }
172 
173 
174 std::chrono::milliseconds zio::now_ms()
175 {
176  return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
177 }
178 std::chrono::microseconds zio::now_us()
179 {
180  return std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch());
181 }
182 
183 void zio::sleep_ms(std::chrono::milliseconds zzz)
184 {
185  std::this_thread::sleep_for(zzz);
186 }
187 
188 static int s_interrupted = 0;
189 static
190 void s_signal_handler (int signal_value)
191 {
192  s_interrupted = 1;
193 }
195 {
196  return s_interrupted==1;
197 }
198 
199 // Call from main()
201 {
202  struct sigaction action;
203  action.sa_handler = s_signal_handler;
204  action.sa_flags = 0;
205  sigemptyset (&action.sa_mask);
206  sigaction (SIGINT, &action, NULL);
207  sigaction (SIGTERM, &action, NULL);
208 }
void send_serverish(socket_t &socket, multipart_t &mmsg, remote_identity_t rid)
void sleep_ms(std::chrono::milliseconds zzz)
Definition: util.cpp:183
void send_router(socket_t &router_socket, multipart_t &mmsg, remote_identity_t rid)
void send_client(socket_t &client_socket, multipart_t &mmsg)
def now()
Definition: test_ugly.py:10
void catch_signals()
Definition: util.cpp:200
bool interrupted()
Definition: util.cpp:194
size_t send(const void *buf_, size_t len_, int flags_=0)
Definition: zmq.hpp:1345
void send_server(socket_t &server_socket, multipart_t &mmsg, remote_identity_t rid)
remote_identity_t recv_router(socket_t &router_socket, multipart_t &mmsg)
std::chrono::milliseconds now_ms()
Definition: util.cpp:174
remote_identity_t recv_serverish(socket_t &socket, multipart_t &mmsg)
void getsockopt(int option_, void *optval_, size_t *optvallen_) const
Definition: zmq.hpp:1291
void send_dealer(socket_t &dealer_socket, multipart_t &mmsg)
void recv_client(socket_t &client_socket, multipart_t &mmsg)
std::chrono::microseconds now_us()
Definition: util.cpp:178
void recv_dealer(socket_t &dealer_socket, multipart_t &mmsg)
void send_clientish(socket_t &socket, multipart_t &mmsg)
void recv_clientish(socket_t &socket, multipart_t &mmsg)
remote_identity_t recv_server(socket_t &server_socket, multipart_t &mmsg)
std::string remote_identity_t
Definition: util.hpp:24
implementation of ZIO data flow protocol endpoints
Definition: actor.hpp:14