ZIO
Python and C++ interface to ZeroMQ and Zyre
broker.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 ZIO brokers bring together ZIO clients.
4 
5 '''
6 
7 import json
8 from collections import namedtuple
9 import zmq
10 import zio
11 from .util import objectify, switch_direction
12 import logging
13 log = logging.getLogger("zio")
14 
class Broker:
    '''Route ZIO flow messages between remote clients and their handlers.

    The broker owns one SERVER port.  Every message received on it is
    either forwarded to the already-paired peer of its sender, or, if
    the sender is not yet paired, treated as the BOT that starts the
    flow initiation protocol described in `__init__`.

    '''

    def __init__(self, server, factory):
        '''Create a flow broker.

        Parameters
        ----------
        server : zio.Port
            An online SERVER port
        factory : callable
            See below

        This broker routes messages between a pair of clients: a
        remote client and its handler.  It does this by adding a flow
        initiation protocol to ZIO flow protocol.

        On receipt of a BOT, the broker passes it to the factory which
        should return True if the BOT was accepted else None.  The
        factory call should return as promptly as possible as the
        broker blocks.

        If factory returns True then it is expected that the factory
        has started some other client which contacts the broker with
        the BOT.  The factory or the new client may modify the BOT as
        per flow protocol but must leave intact the `cid` attribute
        placed in the flow object by the broker.

        Note that the flow handler protocol does not communicate the
        location of the broker's SERVER socket.  It is up to handler
        implementations to locate the server.

        The factory may optionally provide a ``stop()`` method, which
        `Broker.stop` calls when present.

        '''
        self.server = server
        self.factory = factory
        # Maps the routing ID of each paired endpoint to the routing
        # ID of its peer; a pair is stored in both directions.
        self.other = {}
        # Routing IDs of the endpoints that registered as handlers.
        self.handlers = set()

    def poll(self, timeout=None):
        '''Poll for at most one message from the SERVER port.

        A message from an already paired endpoint is forwarded to its
        peer.  Any other message must be a flow BOT: one carrying a
        `cid` attribute comes from a handler and completes a pairing,
        one without comes from a new client and is offered to the
        factory.

        Parameters
        ----------
        timeout : optional
            Passed unchanged to the server port's ``recv()``.

        Raises
        ------
        TimeoutError
            If the server port's ``recv()`` returns nothing.
        TypeError
            If a message from an unpaired endpoint is not a flow BOT.
        RuntimeError
            If the factory does not accept a client's BOT (an EOT is
            sent back to the client first).

        '''
        msg = self.server.recv(timeout)
        if not msg:
            log.error(f'broker poll timeout with {timeout}')
            raise TimeoutError(f'broker poll timeout with {timeout}')
        log.debug(f'broker recv {msg}')
        rid = msg.routing_id

        orid = self.other.get(rid)
        if orid:                # we have its other
            self._forward(msg, rid, orid)
            return

        # message is from new client or handler

        fobj = objectify(msg)
        ftype = fobj.get("flow")
        if not ftype:
            raise TypeError('message not type FLOW')
        if ftype != "BOT":
            raise TypeError('flow message not type BOT')

        cid = fobj.get('cid')
        if cid:                 # BOT from handler
            self._pair_handler(msg, fobj, rid, cid)
            return

        # BOT from client
        self._offer_to_factory(msg, fobj, rid)

    def _forward(self, msg, rid, orid):
        '''Send `msg`, received from `rid`, on to its paired peer `orid`.'''
        if orid in self.handlers:
            log.debug(f"broker route c2h {rid} -> {orid}:\n{msg}\n")
        else:
            log.debug(f"broker route h2c {rid} -> {orid}:\n{msg}\n")
        msg.routing_id = orid
        self.server.send(msg)

    def _pair_handler(self, msg, fobj, rid, cid):
        '''Pair handler `rid` with client `cid` and send the BOT to both.'''
        self.handlers.add(rid)
        self.other[rid] = cid
        self.other[cid] = rid
        # The cid is broker bookkeeping; strip it before the BOT is
        # passed on to either side.
        del fobj["cid"]
        log.debug(f'broker route from handler {rid} <--> {cid}')

        # Serialize the client's copy before the direction is
        # switched for the copy that goes back to the handler.
        label_for_client = json.dumps(fobj)

        fobj = switch_direction(fobj)
        msg.label = json.dumps(fobj)
        msg.routing_id = rid
        log.debug(f"broker route c2h {cid} -> {rid}:\n{msg}\n")
        self.server.send(msg)   # to handler

        msg.label = label_for_client
        msg.routing_id = cid
        log.debug(f"broker route h2c {rid} -> {cid}:\n{msg}\n")
        self.server.send(msg)   # to client

    def _offer_to_factory(self, msg, fobj, rid):
        '''Offer the BOT of new client `rid` to the factory.

        If the factory does not accept it, an EOT is sent back to the
        client and RuntimeError is raised.
        '''
        log.debug(f'broker route from client {rid}')
        fobj = switch_direction(fobj)

        # Tag the BOT with the client's routing ID so that the handler
        # can present it when it contacts the broker.
        fobj["cid"] = rid
        msg.label = json.dumps(fobj)
        msg.routing_id = 0
        if self.factory(msg):
            return

        # factory refuses
        fobj['flow'] = 'EOT'
        msg.label = json.dumps(fobj)
        msg.routing_id = rid
        self.server.send(msg)
        raise RuntimeError('broker factory rejects ' + str(msg.label))

    def stop(self):
        '''Stop the factory, if it provides a ``stop()`` method.

        The factory is only required to be callable, so a factory
        without ``stop`` (for example a plain function) is left alone
        instead of raising AttributeError.
        '''
        stop = getattr(self.factory, "stop", None)
        if callable(stop):
            stop()
129 
130 
131 
132 # fixme: broker doesn't handle endpoints disappearing.
133 
134 
def __init__(self, server, factory)
Definition: broker.py:16
def switch_direction(fobj)
Definition: util.py:31
def objectify(morl)
Definition: util.py:5
def poll(self, timeout=None)
Definition: broker.py:51