ZIO
Python and C++ interface to ZeroMQ and Zyre
domo_worker.cpp
Go to the documentation of this file.
1 #include "zio/domo/worker.hpp"
2 #include "zio/domo/protocol.hpp"
3 #include "zio/logging.hpp"
4 
5 
6 using namespace zio::domo;
7 
8 Worker::Worker(zio::socket_t& sock, std::string broker_address,
9  std::string service)
10  : m_sock(sock)
11  , m_address(broker_address)
12  , m_service(service)
13 {
14  zio::debug("zio::domo::Worker constructing on " + m_address);
15  int stype = m_sock.getsockopt<int>(ZMQ_TYPE);
16  if (ZMQ_CLIENT == stype) {
17  really_recv = recv_client;
18  really_send = send_client;
19  }
20  else if (ZMQ_DEALER == stype) {
21  really_recv = recv_dealer;
22  really_send = send_dealer;
23  }
24  else {
25  throw std::runtime_error("worker must be given DEALER or CLIENT socket");
26  }
27 
28  connect_to_broker(false);
29 }
30 
32 {
33  zio::debug("zio::domo::Worker destructing");
34  m_sock.disconnect(m_address);
35 }
36 
37 void Worker::connect_to_broker(bool reconnect)
38 {
39  if (reconnect) {
40  zio::debug("zio::domo::Worker disconnect from " + m_address);
41  m_sock.disconnect(m_address);
42  }
43 
44  int linger=0;
45  m_sock.setsockopt(ZMQ_LINGER, linger);
46  // set socket routing ID?
47  m_sock.connect(m_address);
48  zio::debug("zio::domo::Worker connect to " + m_address);
49 
50  zio::multipart_t mmsg;
51  mmsg.pushstr(m_service); // 3
52  mmsg.pushstr(mdp::worker::ready); // 2
53  mmsg.pushstr(mdp::worker::ident); // 1
54  really_send(m_sock, mmsg);
55 
56  m_liveness = HEARTBEAT_LIVENESS;
57  m_heartbeat_at = now_ms() + m_heartbeat;
58 }
59 
60 void Worker::send(zio::multipart_t& reply)
61 {
62  if (reply.empty()) {
63  return;
64  }
65  reply.pushmem(NULL,0); // 4
66  reply.pushstr(m_reply_to); // 3
67  reply.pushstr(mdp::worker::reply); // 2
68  reply.pushstr(mdp::worker::ident); // 1
69  really_send(m_sock, reply);
70 }
71 
72 void Worker::recv(zio::multipart_t& request)
73 {
74  zio::poller_t<> poller;
75  poller.add(m_sock, zio::event_flags::pollin);
76 
77  std::vector< zio::poller_event<> > events(1);
78  int rc = poller.wait_all(events, m_heartbeat);
79  if (rc > 0) { // got one
80  zio::multipart_t mmsg;
81  really_recv(m_sock, mmsg);
82  m_liveness = HEARTBEAT_LIVENESS;
83  std::string header = mmsg.popstr(); // 1
84  assert(header == mdp::worker::ident);
85  std::string command = mmsg.popstr(); // 2
86  if (mdp::worker::request == command) {
87  m_reply_to = mmsg.popstr(); // 3
88  mmsg.pop(); // 4
89  request = std::move(mmsg); // 5+
90  return;
91  }
92  else if (mdp::worker::heartbeat == command) {
93  // nothing
94  }
95  else if (mdp::worker::disconnect == command) {
96  connect_to_broker();
97  }
98  else {
99  zio::warn("zio::domo::Worker invalid command: " + command);
100  }
101  }
102  else { // timeout
103  --m_liveness;
104  if (m_liveness == 0) {
105  zio::debug("zio::domo::Worker disconnect from broker - retrying...");
106  }
107  sleep_ms(m_reconnect);
108  connect_to_broker();
109  }
110  if (now_ms() >= m_heartbeat_at) {
111  zio::multipart_t mmsg;
112  mmsg.pushstr(mdp::worker::heartbeat); // 2
113  mmsg.pushstr(mdp::worker::ident); // 1
114  really_send(m_sock, mmsg);
115  m_heartbeat_at += m_heartbeat;
116  }
117 
118  return;
119 }
120 
121 zio::multipart_t Worker::work(zio::multipart_t& reply)
122 {
123  send(reply);
124 
125  while (! interrupted() ) {
126  zio::multipart_t request;
127  recv(request);
128  if (request.empty()) {
129  continue;
130  }
131  return request;
132  }
133  if (interrupted()) {
134  zio::info("zio::domo::Worker interupt received, killing worker");
135  }
136 
137  return zio::multipart_t{};
138 }
139 
140 
141 void zio::domo::echo_worker(zio::socket_t& link, std::string address, int socktype)
142 {
143  // fixme: should implement BIND actor protocol
145  zio::socket_t sock(ctx, socktype);
146  Worker worker(sock, address, "echo");
147  zio::debug("worker echo created on " + address);
148 
149  zio::poller_t<> poller;
150  poller.add(link, zio::event_flags::pollin);
151  poller.add(sock, zio::event_flags::pollin);
152 
153  // we want to get back to our main loop often enough to check for
154  // our creator to issue a termination (link hit) but not so fast
155  // that the loop spins and wastes CPU.
156  time_unit_t poll_resolution{500};
157 
158  link.send(zio::message_t{}, zio::send_flags::none); // ready
159 
160  zio::debug("worker echo starting");
161  zio::multipart_t reply;
162  while ( ! interrupted() ) {
163 
164  zio::debug("worker check link");
165  std::vector< zio::poller_event<> > events(2);
166  int nevents = poller.wait_all(events, poll_resolution);
167  for (int iev=0; iev < nevents; ++iev) {
168 
169  if (events[iev].socket == link) {
170  zio::debug("worker link hit");
171  return;
172  }
173 
174  if (events[iev].socket == sock) {
175  zio::debug("worker echo work");
176  zio::multipart_t request;
177  worker.recv(request);
178  if (request.empty()) {
179  zio::warn("worker echo got null request");
180  break;
181  }
182  reply = std::move(request);
183  worker.send(reply);
184  }
185  }
186  }
187  // fixme: should poll on link to check for early shutdown
188  zio::debug("worker echo wait for term");
189  zio::message_t die;
190  auto res = link.recv(die, zio::recv_flags::none);
191  zio::debug("worker echo wait for exit");
192 }
void sleep_ms(std::chrono::milliseconds zzz)
Definition: util.cpp:183
void recv(zio::multipart_t &request)
Definition: domo_worker.cpp:72
void send_client(socket_t &client_socket, multipart_t &mmsg)
const char * reply
Definition: protocol.hpp:23
void setsockopt(int option_, T const &optval)
Definition: zmq.hpp:1279
bool interrupted()
Definition: util.cpp:194
const char * heartbeat
Definition: protocol.hpp:24
size_t send(const void *buf_, size_t len_, int flags_=0)
Definition: zmq.hpp:1345
const char * disconnect
Definition: protocol.hpp:25
void echo_worker(zio::socket_t &link, std::string address, int socktype)
std::chrono::milliseconds now_ms()
Definition: util.cpp:174
const char * ready
Worker commands as strings.
Definition: protocol.hpp:21
const char * request
Definition: protocol.hpp:22
const char * ident
Definition: protocol.hpp:18
void getsockopt(int option_, void *optval_, size_t *optvallen_) const
Definition: zmq.hpp:1291
void send_dealer(socket_t &dealer_socket, multipart_t &mmsg)
void connect(std::string const &addr)
Definition: zmq.hpp:1324
void recv_client(socket_t &client_socket, multipart_t &mmsg)
Worker(zio::socket_t &sock, std::string broker_address, std::string service)
Definition: domo_worker.cpp:8
zio::multipart_t work(zio::multipart_t &reply)
void recv_dealer(socket_t &dealer_socket, multipart_t &mmsg)
void disconnect(std::string const &addr)
Definition: zmq.hpp:1333
const int HEARTBEAT_LIVENESS
Definition: util.hpp:14
void send(zio::multipart_t &reply)
Definition: domo_worker.cpp:60
std::chrono::milliseconds time_unit_t
Definition: util.hpp:11