ZIO
Python and C++ interface to ZeroMQ and Zyre
domo_broker.cpp
Go to the documentation of this file.
1 // The broker code is implemented closely following the C++ example
2 // for Majordomo in the Zguide (mdbroker.cpp). However, we use cppzmq
3 // and abstract away the differences between ROUTER and SERVER
4 
5 #include "zio/domo/broker.hpp"
6 #include "zio/domo/protocol.hpp"
7 #include "zio/util.hpp"
8 #include "zio/logging.hpp"
9 #include <sstream>
10 
11 using namespace zio::domo;
12 
13 
14 Broker::Service::~Service () {
15 }
16 
17 
19  : m_sock(sock)
20 {
21  int stype = m_sock.getsockopt<int>(ZMQ_TYPE);
22  if (ZMQ_SERVER == stype) {
23  recv = recv_server;
24  send = send_server;
25  zio::debug("zio::domo::Broker with SERVER starting");
26  return;
27  }
28  if(ZMQ_ROUTER == stype) {
29  recv = recv_router;
30  send = send_router;
31  zio::debug("zio::domo::Broker with ROUTER starting");
32  return;
33  }
34  throw std::runtime_error("zio::domo::Broker requires SERVER or ROUTER socket");
35 }
36 
38 {
39  while (! m_services.empty()) {
40  delete m_services.begin()->second;
41  m_services.erase(m_services.begin());
42  }
43  while (! m_workers.empty()) {
44  delete m_workers.begin()->second;
45  m_workers.erase(m_workers.begin());
46  }
47 }
48 
49 
51 {
52  zio::multipart_t mmsg;
53  remote_identity_t sender = recv(m_sock, mmsg);
54  assert(mmsg.size() > 0);
55  std::string header = mmsg.popstr(); // 7/MDP frame 1
56  if (header == mdp::client::ident) {
57  zio::debug("zio::domo::Broker process client");
58  client_process(sender, mmsg);
59  }
60  else if (header == mdp::worker::ident) {
61  zio::debug("zio::domo::Broker process worker");
62  worker_process(sender, mmsg);
63  }
64  else {
65  zio::warn("zio::domo::Broker invalid message from " + sender);
66  }
67 }
68 
70 {
71  auto now = now_ms();
72  if (now < heartbeat_at) {
73  return;
74  }
75  purge_workers();
76  for (auto& wrk : m_waiting) {
77  zio::debug("zio::domo::Broker heartbeat to worker");
78  zio::multipart_t mmsg;
79  mmsg.pushstr(mdp::worker::heartbeat);
80  mmsg.pushstr(mdp::worker::ident);
81  send(m_sock, mmsg, wrk->identity);
82  }
83 }
84 
86 {
88  time_unit_t heartbeat_at = now + m_hb_interval;
89 
90  zio::poller_t<> poller;
91  poller.add(m_sock, zio::event_flags::pollin);
92  while (! interrupted()) {
94  if (heartbeat_at > now ) {
95  timeout = heartbeat_at - now;
96  }
97 
98  std::vector< zio::poller_event<> > events(1);
99  int rc = poller.wait_all(events, timeout);
100  if (rc > 0) { // got one
101  proc_one();
102  }
103  proc_heartbeat(heartbeat_at);
104 
105  heartbeat_at += m_hb_interval;
106  now = now_ms();
107  }
108 }
109 
110 void Broker::purge_workers()
111 {
112  auto now = now_ms();
113  // can't remove from the set while iterating, so make a temp
114  std::vector<Worker*> dead;
115  for (auto wrk : m_waiting) {
116  if (wrk->expiry <= now) {
117  dead.push_back(wrk);
118  }
119  }
120  for (auto wrk : dead) {
121  zio::debug("zio::domo::Broker deleting expired worker: " + wrk->identity);
122  worker_delete(wrk,0); // operates on m_waiting set
123  }
124 }
125 
126 Broker::Service* Broker::service_require(std::string name)
127 {
128  Service* srv = m_services[name];
129  if (!srv) {
130  srv = new Service{name};
131  m_services[name] = srv;
132  zio::debug("zio::domo::Broker registering new service: " + name);
133  }
134  return srv;
135 }
136 
137 void Broker::service_internal(remote_identity_t rid, std::string service_name, zio::multipart_t& mmsg)
138 {
139  zio::multipart_t response;
140 
141  if (service_name == "mmi.service") {
142  std::string sn = mmsg.popstr();
143  Service* srv = m_services[sn];
144  if (srv and srv->nworkers) {
145  response.pushstr("200");
146  }
147  else {
148  response.pushstr("404");
149  }
150  }
151  else {
152  response.pushstr("501");
153  }
154 
155  send(m_sock, response, rid);
156 }
157 
158 void Broker::service_dispatch(Service* srv)
159 {
160  purge_workers();
161  while (srv->waiting.size() and srv->requests.size()) {
162 
163  std::list<Worker*>::iterator wrk_it = srv->waiting.begin();
164  std::list<Worker*>::iterator next = wrk_it;
165  for (++next; next != srv->waiting.end(); ++next) {
166  if ((*next)->expiry > (*wrk_it)->expiry) {
167  wrk_it = next;
168  }
169  }
170 
171  zio::multipart_t& mmsg = srv->requests.front();
172  zio::debug("zio::domo::Broker send work");
173  send(m_sock, mmsg, (*wrk_it)->identity);
174  srv->requests.pop_front();
175  m_waiting.erase(*wrk_it);
176  srv->waiting.erase(wrk_it);
177  }
178 }
179 
180 
181 Broker::Worker* Broker::worker_require(remote_identity_t identity)
182 {
183  Worker* wrk = m_workers[identity];
184  if (!wrk) {
185  wrk = new Worker{identity};
186  m_workers[identity] = wrk;
187  zio::debug("zio::domo::Broker registering new worker");
188  }
189  return wrk;
190 }
191 
192 void Broker::worker_delete(Broker::Worker*& wrk, int disconnect)
193 {
194  if (disconnect) {
195  zio::multipart_t mmsg;
196  mmsg.pushstr(mdp::worker::disconnect);
197  mmsg.pushstr(mdp::worker::ident);
198  zio::debug("zio::domo::Broker disconnect worker");
199  send(m_sock, mmsg, wrk->identity);
200  }
201  if (wrk->service) {
202  for (std::list<Worker*>::iterator it = wrk->service->waiting.begin();
203  it != wrk->service->waiting.end();) {
204  if (*it == wrk) {
205  it = wrk->service->waiting.erase(it);
206  }
207  else {
208  ++it;
209  }
210  }
211  --wrk->service->nworkers;
212  }
213  m_waiting.erase(wrk);
214  m_workers.erase(wrk->identity);
215  delete wrk;
216  wrk=0;
217 }
218 // mmsg holds starting with 7/MDP Frame 2.
219 void Broker::worker_process(remote_identity_t sender, zio::multipart_t& mmsg)
220 {
221  assert(mmsg.size() >= 1);
222  const std::string command = mmsg.popstr(); // 0x01, 0x02, ....
223  bool worker_ready = (m_workers.find(sender) != m_workers.end());
224  Worker* wrk = worker_require(sender);
225 
226  if (mdp::worker::ready == command) {
227  if (worker_ready) { // protocol error
228  zio::error("zio::domo::Broker protocol error (double ready) from: " + sender);
229  worker_delete(wrk, 1);
230  return;
231  }
232  if (sender.size() >= 4 && sender.find_first_of("mmi.") == 0) {
233  zio::error("zio::domo::Broker protocol error (worker mmi) from: " + sender);
234  worker_delete(wrk, 1);
235  return;
236  }
237  // Attach worker to service and mark as idle
238  std::string service_name = mmsg.popstr();
239  wrk->service = service_require(service_name);
240  wrk->service->nworkers++;
241  worker_waiting(wrk);
242  return;
243  }
244  if (mdp::worker::reply == command) {
245  if (!worker_ready) {
246  worker_delete(wrk, 1);
247  return;
248  }
249  remote_identity_t client_id = mmsg.popstr();
250  mmsg.pop();
251  mmsg.pushstr(wrk->service->name);
252  mmsg.pushstr(mdp::client::ident);
253  zio::debug("zio::domo::Broker reply to client");
254  send(m_sock, mmsg, client_id);
255  worker_waiting(wrk);
256  return;
257  }
258  if (mdp::worker::heartbeat == command) {
259  if (!worker_ready) {
260  worker_delete(wrk, 1);
261  return;
262  }
263  wrk->expiry = now_ms() + m_hb_expiry;
264  return;
265  }
266  if (mdp::worker::disconnect == command) {
267  worker_delete(wrk, 0);
268  return;
269  }
270  zio::error("zio::domo::Broker invalid input message " + command);
271 }
272 
273 
274 void Broker::worker_waiting(Broker::Worker* wrk)
275 {
276  m_waiting.insert(wrk);
277  wrk->service->waiting.push_back(wrk);
278  wrk->expiry = now_ms() + m_hb_expiry;
279 
280  service_dispatch(wrk->service);
281 }
282 
283 void Broker::client_process(remote_identity_t client_id, zio::multipart_t& mmsg)
284 {
285  std::string service_name = mmsg.popstr(); // Client REQUEST Frame 2
286  Service* srv = service_require(service_name);
287  if (service_name.size() >= 4 and service_name.find_first_of("mmi.") == 0) {
288  service_internal(client_id, service_name, mmsg);
289  }
290  else {
291  mmsg.pushmem(NULL,0); // frame 4
292  mmsg.pushstr(client_id); // frame 3
293  mmsg.pushstr(mdp::worker::request); // frame 2
294  mmsg.pushstr(mdp::worker::ident); // frame 1
295  srv->requests.emplace_back(std::move(mmsg));
296  service_dispatch(srv);
297  }
298 }
299 
300 
301 // An actor function running a Broker.
302 
303 
304 void zio::domo::broker_actor(zio::socket_t& link, std::string address, int socktype)
305 {
307  zio::socket_t sock(ctx, socktype);
308  sock.bind(address);
309 
310  Broker broker(sock);
311  link.send(zio::message_t{}, zio::send_flags::none);
312 
313  // basically the guts of start() but we also poll on link as well as sock
314 
315  time_unit_t now = now_ms();
316  time_unit_t hb_interval{HEARTBEAT_INTERVAL};
317  time_unit_t heartbeat_at = now + hb_interval;
318 
319  zio::poller_t<> poller;
320  poller.add(link, zio::event_flags::pollin);
321  poller.add(sock, zio::event_flags::pollin);
322 
323  while (! interrupted()) {
324  time_unit_t timeout{0};
325  if (heartbeat_at > now ) {
326  timeout = heartbeat_at - now;
327  }
328 
329  zio::debug("broker actor wait");
330  std::vector< zio::poller_event<> > events(2);
331  int nevents = poller.wait_all(events, timeout);
332  for (int iev=0; iev < nevents; ++iev) {
333 
334  if (events[iev].socket == sock) {
335  zio::debug("broker actor sock hit");
336  broker.proc_one();
337  }
338 
339  if (events[iev].socket == link) {
340  zio::debug("broker actor link hit");
341  zio::message_t msg;
342  auto res = events[0].socket.recv(msg, zio::recv_flags::dontwait);
343  assert(res);
344  std::stringstream ss;
345  ss << "msg: " << msg.size();
346  zio::debug("broker actor link " + ss.str());
347  return; // terminated
348  }
349  }
350  if (!nevents) {
351  zio::debug("broker actor timeout");
352  }
353  broker.proc_heartbeat(heartbeat_at);
354 
355  heartbeat_at += hb_interval;
356  now = now_ms();
357  }
358 
359  zio::message_t die;
360  auto res = link.recv(die);
361 }
362 
void send_router(socket_t &router_socket, multipart_t &mmsg, remote_identity_t rid)
def now()
Definition: test_ugly.py:10
const char * reply
Definition: protocol.hpp:23
bool interrupted()
Definition: util.cpp:194
const char * heartbeat
Definition: protocol.hpp:24
size_t send(const void *buf_, size_t len_, int flags_=0)
Definition: zmq.hpp:1345
const char * disconnect
Definition: protocol.hpp:25
def broker(verbose, socket, address)
Definition: domo.py:58
void send_server(socket_t &server_socket, multipart_t &mmsg, remote_identity_t rid)
name
Definition: setup.py:4
remote_identity_t recv_router(socket_t &router_socket, multipart_t &mmsg)
std::chrono::milliseconds now_ms()
Definition: util.cpp:174
const char * ready
Worker commands as strings.
Definition: protocol.hpp:21
const char * request
Definition: protocol.hpp:22
void proc_heartbeat(time_unit_t heartbeat_at)
Do heartbeat processing given next heartbeat time.
Definition: domo_broker.cpp:69
const char * ident
Definition: protocol.hpp:18
void getsockopt(int option_, void *optval_, size_t *optvallen_) const
Definition: zmq.hpp:1291
void broker_actor(zio::socket_t &link, std::string address, int socktype)
size_t size() const ZMQ_NOTHROW
Definition: zmq.hpp:488
void bind(std::string const &addr)
Definition: zmq.hpp:1306
void proc_one()
Process one input on socket.
Definition: domo_broker.cpp:50
const time_unit_t HEARTBEAT_INTERVAL
Definition: util.hpp:15
const char * ident
Definition: protocol.hpp:14
remote_identity_t recv_server(socket_t &server_socket, multipart_t &mmsg)
std::string remote_identity_t
Definition: util.hpp:24
Broker(zio::socket_t &sock)
Definition: domo_broker.cpp:18
std::chrono::milliseconds time_unit_t
Definition: util.hpp:11