ZIO
Python and C++ interface to ZeroMQ and Zyre
broker.py
Go to the documentation of this file.
1 """
2 Majordomo Protocol broker
3 A minimal implementation of http:#rfc.zeromq.org/spec:7 and spec:8
4 
5 Author: Min RK <benjaminrk@gmail.com>
6 Based on Java example by Arkadiusz Orzechowski
7 
8 Originally from Zguide examples, generalized to use SERVER
9 or ROUTER by brett.viren@gmail.com
10 """
11 
12 import logging
13 import sys
14 import time
15 from binascii import hexlify
16 
17 import zmq
18 
19 # local
20 from . import MDP
21 from .zhelpers import dump, serverish_recv, serverish_send
22 
23 def hash_sender(sender):
24  if isinstance(sender, int):
25  return sender.to_bytes(4, sys.byteorder)
26  return hexlify(sender)
27 
28 
29 class Service(object):
30  """a single Service"""
31  name = None # Service name
32  requests = None # List of client requests
33  waiting = None # List of waiting workers
34 
35  def __init__(self, name):
36  self.name = name
37  self.requests = []
38  self.waiting = []
39 
40 class Worker(object):
41  """a Worker, idle or active"""
42  identity = None # hex Identity of worker
43  address = None # Address to route to
44  service = None # Owning service, if known
45  expiry = None # expires at this point, unless heartbeat
46 
47  def __init__(self, identity, address, lifetime):
48  self.identity = identity
49  self.address = address
50  self.expiry = time.time() + 1e-3*lifetime
51 
52 class Broker(object):
53  """
54  Majordomo Protocol broker
55  A minimal implementation of http:#rfc.zeromq.org/spec:7 and spec:8
56  """
57 
58  # We'd normally pull these from config data
59  INTERNAL_SERVICE_PREFIX = b"mmi."
60  HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
61  HEARTBEAT_INTERVAL = 2500 # msecs
62  HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
63 
64  # ---------------------------------------------------------------------
65 
66  ctx = None # Our context
67  socket = None # Socket for clients & workers
68  poller = None # our Poller
69 
70  heartbeat_at = None# When to send HEARTBEAT
71  services = None # known services
72  workers = None # known workers
73  waiting = None # idle workers
74 
75  verbose = False # Print activity to stdout
76 
77  # ---------------------------------------------------------------------
78 
79 
80  def __init__(self, socket_type = zmq.ROUTER, verbose=False):
81  """Initialize broker state."""
82  self.verbose = verbose
83  self.services = {}
84  self.workers = {}
85  self.waiting = []
86  self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL
87  self.ctx = zmq.Context()
88  self.socket = self.ctx.socket(socket_type)
89  self.socket.linger = 0
90  self.poller = zmq.Poller()
91  self.poller.register(self.socket, zmq.POLLIN)
92  logging.basicConfig(format="%(asctime)s %(message)s",
93  datefmt="%Y-%m-%d %H:%M:%S",
94  level=logging.INFO)
95 
96 
97 
98  # ---------------------------------------------------------------------
99 
100  def mediate(self):
101  """Main broker work happens here"""
102  while True:
103  try:
104  items = self.poller.poll(self.HEARTBEAT_INTERVAL)
105  except KeyboardInterrupt:
106  break # Interrupted
107  if items:
108  sender, msg = serverish_recv(self.socket)
109  if self.verbose:
110  logging.info("I: received message:")
111  dump(msg)
112 
113  header = msg.pop(0)
114 
115  if (MDP.C_CLIENT == header):
116  self.process_client(sender, msg)
117  elif (MDP.W_WORKER == header):
118  self.process_worker(sender, msg)
119  else:
120  logging.error("E: invalid message:")
121  dump(msg)
122 
123  self.purge_workers()
124  self.send_heartbeats()
125 
126  def destroy(self):
127  """Disconnect all workers, destroy context."""
128  while self.workers:
129  self.delete_worker(self.workers.values()[0], True)
130  self.ctx.destroy(0)
131 
132 
133  def process_client(self, sender, msg):
134  """Process a request coming from a client."""
135  assert len(msg) >= 2 # Service name + body
136  service = msg.pop(0)
137  # Set reply return address to client sender
138  msg = [hash_sender(sender), b''] + msg
139  if service.startswith(self.INTERNAL_SERVICE_PREFIX):
140  self.service_internal(service, msg)
141  else:
142  self.dispatch(self.require_service(service), msg)
143 
144 
145  def process_worker(self, sender, msg):
146  """Process message sent to us by a worker."""
147  assert len(msg) >= 1 # At least, command
148 
149  command = msg.pop(0)
150 
151  worker_ready = hash_sender(sender) in self.workers
152 
153  worker = self.require_worker(sender)
154 
155  if (MDP.W_READY == command):
156  assert len(msg) >= 1 # At least, a service name
157  service = msg.pop(0)
158  # Not first command in session or Reserved service name
159  if (worker_ready or service.startswith(self.INTERNAL_SERVICE_PREFIX)):
160  self.delete_worker(worker, True)
161  else:
162  # Attach worker to service and mark as idle
163  worker.service = self.require_service(service)
164  self.worker_waiting(worker)
165 
166  elif (MDP.W_REPLY == command):
167  if (worker_ready):
168  # Remove & save client return envelope and insert the
169  # protocol header and service name, then rewrap envelope.
170  client = msg.pop(0)
171  empty = msg.pop(0) # ?
172  #msg = [client, b'', MDP.C_CLIENT, worker.service.name] + msg
173  msg = [MDP.C_CLIENT, worker.service.name] + msg
174  #self.socket.send_multipart(msg)
175  serverish_send(self.socket, client, msg)
176  self.worker_waiting(worker)
177  else:
178  self.delete_worker(worker, True)
179 
180  elif (MDP.W_HEARTBEAT == command):
181  if (worker_ready):
182  worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY
183  else:
184  self.delete_worker(worker, True)
185 
186  elif (MDP.W_DISCONNECT == command):
187  self.delete_worker(worker, False)
188  else:
189  logging.error("E: invalid message:")
190  dump(msg)
191 
192  def delete_worker(self, worker, disconnect):
193  """Deletes worker from all data structures, and deletes worker."""
194  assert worker is not None
195  if disconnect:
196  self.send_to_worker(worker, MDP.W_DISCONNECT, None, None)
197 
198  if worker.service is not None:
199  worker.service.waiting.remove(worker)
200  self.workers.pop(worker.identity)
201 
202  def require_worker(self, address):
203  """Finds the worker (creates if necessary)."""
204  assert (address is not None)
205  identity = hash_sender(address)
206  worker = self.workers.get(identity)
207  if (worker is None):
208  worker = Worker(identity, address, self.HEARTBEAT_EXPIRY)
209  self.workers[identity] = worker
210  if self.verbose:
211  logging.info("I: registering new worker: %s", identity)
212 
213  return worker
214 
215  def require_service(self, name):
216  """Locates the service (creates if necessary)."""
217  assert (name is not None)
218  service = self.services.get(name)
219  if (service is None):
220  service = Service(name)
221  self.services[name] = service
222 
223  return service
224 
225  def bind(self, endpoint):
226  """Bind broker to endpoint, can call this multiple times.
227 
228  We use a single socket for both clients and workers.
229  """
230  self.socket.bind(endpoint)
231  logging.info("I: MDP broker/0.1.1 is active at %s", endpoint)
232 
233  def service_internal(self, service, msg):
234  """Handle internal service according to 8/MMI specification"""
235  returncode = b"501"
236  if b"mmi.service" == service:
237  name = msg[-1]
238  returncode = b"200" if name in self.services else b"404"
239  msg[-1] = returncode
240 
241  # insert the protocol header and service name after the routing envelope ([client, ''])
242  cid = msg[0]
243  #msg = msg[:2] + [MDP.C_CLIENT, service] + msg[2:]
244  msg = [MDP.C_CLIENT, service] + msg[2:]
245  #self.socket.send_multipart(msg)
246  serverish_send(self.socket, cid, msg)
247 
248  def send_heartbeats(self):
249  """Send heartbeats to idle workers if it's time"""
250  if (time.time() > self.heartbeat_at):
251  for worker in self.waiting:
252  self.send_to_worker(worker, MDP.W_HEARTBEAT, None, None)
253 
254  self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL
255 
256  def purge_workers(self):
257  """Look for & kill expired workers.
258 
259  Workers are oldest to most recent, so we stop at the first alive worker.
260  """
261  while self.waiting:
262  w = self.waiting[0]
263  if w.expiry < time.time():
264  logging.info("I: deleting expired worker: %s", w.identity)
265  self.delete_worker(w,False)
266  self.waiting.pop(0)
267  else:
268  break
269 
270  def worker_waiting(self, worker):
271  """This worker is now waiting for work."""
272  # Queue to broker and service waiting lists
273  self.waiting.append(worker)
274  worker.service.waiting.append(worker)
275  worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY
276  self.dispatch(worker.service, None)
277 
278  def dispatch(self, service, msg):
279  """Dispatch requests to waiting workers as possible"""
280  assert (service is not None)
281  if msg is not None:# Queue message if any
282  service.requests.append(msg)
283  self.purge_workers()
284  while service.waiting and service.requests:
285  msg = service.requests.pop(0)
286  worker = service.waiting.pop(0)
287  self.waiting.remove(worker)
288  self.send_to_worker(worker, MDP.W_REQUEST, None, msg)
289 
290  def send_to_worker(self, worker, command, option, msg=None):
291  """Send message to worker.
292 
293  If message is provided, sends that message.
294  """
295 
296  if msg is None:
297  msg = []
298  elif not isinstance(msg, list):
299  msg = [msg]
300 
301  # Stack routing and protocol envelopes to start of message
302  # and routing envelope
303  if option is not None:
304  msg = [option] + msg
305  #msg = [worker.address, b'', MDP.W_WORKER, command] + msg
306  msg = [MDP.W_WORKER, command] + msg
307 
308  if self.verbose:
309  logging.info("I: sending %r to worker", command)
310  dump(msg)
311 
312  #self.socket.send_multipart(msg)
313  serverish_send(self.socket, worker.address, msg)
314 
315 
316 def main():
317  """create and start new broker"""
318  verbose = '-v' in sys.argv
319  broker = MajorDomoBroker(verbose)
320  broker.bind("tcp://*:5555")
321  broker.mediate()
322 
323 if __name__ == '__main__':
324  main()
def process_worker(self, sender, msg)
Definition: broker.py:145
def delete_worker(self, worker, disconnect)
Definition: broker.py:192
def hash_sender(sender)
Definition: broker.py:23
def require_worker(self, address)
Definition: broker.py:202
def __init__(self, name)
Definition: broker.py:35
def process_client(self, sender, msg)
Definition: broker.py:133
def send_to_worker(self, worker, command, option, msg=None)
Definition: broker.py:290
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
Definition: zmq.hpp:259
def send_heartbeats(self)
Definition: broker.py:248
def serverish_send(sock, cid, msg, args, kwds)
Definition: zhelpers.py:156
def dispatch(self, service, msg)
Definition: broker.py:278
def purge_workers(self)
Definition: broker.py:256
def service_internal(self, service, msg)
Definition: broker.py:233
def serverish_recv(sock, args, kwds)
Definition: zhelpers.py:134
def __init__(self, socket_type=zmq.ROUTER, verbose=False)
Definition: broker.py:80
string INTERNAL_SERVICE_PREFIX
Definition: broker.py:59
void append(Message &msg, message_t &&data, const std::vector< size_t > &shape, size_t word_size, const char *tn)
Definition: tens.cpp:34
def worker_waiting(self, worker)
Definition: broker.py:270
void dump(const SM &) noexcept
Definition: _test_hsm.cpp:179
def __init__(self, identity, address, lifetime)
Definition: broker.py:47
def bind(self, endpoint)
Definition: broker.py:225
def require_service(self, name)
Definition: broker.py:215