ZIO
Python and C++ interface to ZeroMQ and Zyre
flow.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 import json
3 from .. import jsonnet
4 import click
5 from .. import rules
6 
7 import logging
# Module-wide logger, named "zio", used by every command in this file; each
# command sets its level from its own --verbosity option.
log = logging.getLogger("zio")
9 
@click.group("flow")
@click.pass_context
def cli(ctx):
    '''
    ZIO FLOW command line interface
    '''
    # Group entry point only: there is no body beyond the docstring, and
    # ``ctx`` (supplied by ``click.pass_context``) is not used here.
    # Sub-commands register themselves on this group via ``@cli.command``.
    # NOTE: the docstring above is also the text click prints for --help,
    # so treat it as user-facing output.
16 
17 
def typeit(v):
    '''
    Return v converted to a number if it looks like one, else v itself.

    Conversion to int is attempted first, then to float.  Only a
    ValueError from a conversion is treated as "not that type"; any
    other exception is passed on to the caller.
    '''
    for convert in (int, float):
        try:
            return convert(v)
        except ValueError:
            continue
    # fixme (carried over from the original): should recurse; nothing but
    # the two scalar conversions above is attempted.
    return v
28 
def attrify(attrs):
    '''
    Build a dictionary from an iterable of "key=value" strings.

    Each string is cut at its first "=".  The left part becomes the key
    and the right part is passed through typeit() to become the value.
    A string without any "=" is reported with log.error() and skipped.
    When a key appears more than once the last occurrence wins.
    '''
    parsed = {}
    for item in attrs:
        key, sep, value = item.partition('=')
        if not sep:
            # No "=" present, so there is nothing to assign.
            log.error(f'failed to parse {item}')
            continue
        parsed[key] = typeit(value)
    return parsed
42 
@cli.command("test-ruleset")
@click.option("-r","--ruleset",type=click.Path(), required=True,
              help="A file in JSON or Jsonnet format providing the ruleset")
@click.option("-v","--verbosity", default="info",
              help="Set logging level (debug, info, warning, error, critical)")
@click.argument("attrs", nargs=-1)
def test_ruleset(ruleset, verbosity, attrs):
    '''
    Test a rule set by giving attributes as key=value on command line.
    '''
    # Maintainer notes (kept out of the docstring, which click prints as
    # the command's --help text):
    #
    # ruleset   -- path handed to jsonnet.load(); the result is iterated
    #              and each element is used like a dict with the keys
    #              'attr' (optional), 'filepat', 'grouppat' and 'rw'.
    # verbosity -- name of a logging level; anything that does not name
    #              an integer level in the logging module means INFO.
    # attrs     -- "key=value" strings; they are merged over each rule's
    #              own 'attr' entry, command line values winning.
    #
    # One line is logged at INFO level per rule that could be processed.
    # A KeyError from rules.parse() or from pattern interpolation is
    # logged and that rule is skipped.

    # Go through Logger.setLevel() rather than assigning log.level, and
    # fall back to a real integer level instead of the string "INFO".
    level = getattr(logging, verbosity.upper(), None)
    if not isinstance(level, int):
        level = logging.INFO
    log.setLevel(level)

    msg_attr = attrify(attrs)

    rs = jsonnet.load(ruleset)
    # fixme: move this to a module
    for ind, robj in enumerate(rs):
        attr = dict(robj.get('attr', {}), **msg_attr)
        log.debug(f'attr: {attr}')

        def dump(errtype, err):
            # Report a failure together with the rule object and the
            # attribute set that were being processed.
            log.error(f'{errtype} "{err}"\n{robj}\n{attr}')

        # do parsing
        try:
            parsed = rules.parse(robj, **attr)
        except KeyError as ke:
            dump("key error", ke)
            continue

        # do evaluating
        # can call rules.evaluate() but we want to print extra stuff here
        log.debug(f'parsed expression: {parsed}')
        expr = rules.Rule(parsed) #, return_bool = True)
        log.debug(f'rule expression: {expr}')
        matched = expr.match()
        log.debug(f'rule evaluation: {matched}')

        # do string interpolation on the "pat" patterns
        filepat = robj['filepat']
        try:
            path = filepat.format(**attr)
        except KeyError as ke:
            dump(f'key error "{filepat}"', ke)
            continue

        grouppat = robj['grouppat']
        try:
            group = grouppat.format(**attr)
        except KeyError as ke:
            dump(f'key error "{grouppat}"', ke)
            continue

        rw = robj['rw']
        verdict = "TRUE" if matched else "FALSE"
        log.info(f'#{ind} {verdict:5s} {rw} {path}:/{group}')
99 
100 
@cli.command("flow-file-server")
@click.option("-b","--bind", default="tcp://127.0.0.1:5555",
              help="An address to which the server shall bind")
@click.option("-f","--format", default="hdf", type=click.Choice(["hdf"]),
              help="File format")
@click.option("-n","--name", default="file-server",
              help="The Zyre node name for the server")
@click.option("-p","--port", default="flow",
              help="The port name for the server socket")
@click.option("-v","--verbosity", default="info",
              help="Set logging level (debug, info, warning, error, critical)")
@click.argument("ruleset")
def file_server(bind, format, name, port, verbosity, ruleset):
    '''Source and sink ZIO flow protocol to file

    This brings back-end reader and writer handlers to external
    clients via the flow broker. A ruleset factory dynamically spawns
    handlers based on incoming BOT flow messages. The file format
    maps to different flavors of handlers.

    '''
    # Maintainer notes (the docstring above is the command's --help text):
    # the function builds a ruleset factory and a Broker on a SERVER port
    # bound to ``bind``, then polls the broker until the process is
    # interrupted.  broker.stop() is called on the way out.
    import zmq
    from zio import Node
    from zio.flow.broker import Broker
    from zio.flow.factories import Ruleset as Factory
    from zio.jsonnet import load as jsonnet_load

    # For now we only support HDF. In future this may be replaced by
    # a mapping from supported format to module providing handlers
    from zio.flow.hdf import writer, reader
    assert(format == "hdf")     # already guaranteed by click.Choice above

    # Go through Logger.setLevel() rather than assigning log.level, and
    # fall back to a real integer level instead of the string "INFO".
    level = getattr(logging, verbosity.upper(), None)
    if not isinstance(level, int):
        level = logging.INFO
    log.setLevel(level)

    ruleset = jsonnet_load(ruleset)

    # NOTE(review): the argument paired with writer.file_handler is a plain
    # string, not a 1-tuple like the other two actor arguments, and its
    # "{port}" is a literal placeholder (this is not an f-string).
    # Presumably the factory fills it in -- confirm against
    # zio.flow.factories before changing it.
    file_writer_addr = "inproc://" + format + "writer{port}"

    # fixme: is it really needed to create a common ctx?
    ctx = zmq.Context()
    factory = Factory(ctx, ruleset,
                      wactors=((writer.file_handler, file_writer_addr),
                               (writer.client_handler, (bind,))),
                      ractor=(reader.handler, (bind,)))

    node = Node(name)
    sport = node.port(port, zmq.SERVER)
    sport.bind(bind)
    node.online()
    log.info(f'domo-broker {name}:{port} online at {bind}')

    # this may throw
    broker = Broker(sport, factory)

    log.info(f'domo-broker {name} entering loop')
    try:
        while True:
            try:
                broker.poll(10000)
            except TimeoutError:
                node.peer.drain()
                log.debug(f'domo-broker {name} is lonely')
                log.debug(node.peer.peers)
            except Exception as e:
                # Keep serving: log the problem and poll again.
                log.error(e)
    finally:
        # The loop only ends through an exception that is not an
        # Exception subclass (e.g. KeyboardInterrupt); make sure the
        # broker is stopped before that exception propagates.
        broker.stop()
168 
@cli.command("flow-send-tens")
@click.option("-n","--number",default=10,
              help="Number of TENS messages to generate")
@click.option("-c","--connect", default="tcp://127.0.0.1:5555",
              help="An address to which this client shall connect")
@click.option("-s","--shape", default="6000,800",
              help="Comma separated list of integers giving tensor shape")
@click.option("-v","--verbosity", default="info",
              help="Set logging level (debug, info, warning, error, critical)")
@click.argument("attrs", nargs=-1)
def send_tens(number, connect, shape, verbosity, attrs):
    '''
    Generate and flow some TENS messages.
    '''
    # Maintainer notes (the docstring above is the command's --help text):
    #
    # number    -- how many messages are put into the flow.
    # connect   -- address the CLIENT port connects to.
    # shape     -- comma separated integers; their product is the number
    #              of payload bytes in each message.
    # verbosity -- name of a logging level; unknown names mean INFO.
    # attrs     -- "key=value" strings added to the BOT label object.
    #
    # Raises RuntimeError when no BOT reply is obtained from recv_bot().
    import zmq
    from zio import Message, Node
    from zio.flow import Flow

    # Go through Logger.setLevel() rather than assigning log.level, and
    # fall back to a real integer level instead of the string "INFO".
    level = getattr(logging, verbosity.upper(), None)
    if not isinstance(level, int):
        level = logging.INFO
    log.setLevel(level)

    msg_attr = attrify(attrs)

    cnode = Node("flow-send-tens")
    cport = cnode.port("output", zmq.CLIENT)
    cport.connect(connect)
    cnode.online()
    cflow = Flow(cport)

    shape = [int(dim) for dim in shape.split(',')]
    size = 1
    for dim in shape:
        size *= dim

    # Merge instead of dict(credit=..., direction=..., **msg_attr): that
    # form raises TypeError when the command line also supplies "credit"
    # or "direction".  A user supplied credit replaces the default; the
    # direction is fixed by what this command does and is re-imposed.
    attr = {"credit": 2, "direction": "extract"}
    attr.update(msg_attr)
    attr["direction"] = "extract"

    bot = Message(label=json.dumps(attr))
    cflow.send_bot(bot)
    bot = cflow.recv_bot(5000)
    # Explicit check rather than assert: asserts vanish under "python -O".
    if not bot:
        raise RuntimeError('flow-send-tens: BOT handshake failed')
    log.debug('flow-send-tens: BOT handshake done')

    tens_attr = dict(shape=shape, word=1, dtype='u') # unsigned char
    attr["TENS"] = dict(tensors=[tens_attr], metadata=dict(source="gen-tens"))
    label = json.dumps(attr)
    payload = [b'X'*size]

    for count in range(number):
        msg = Message(label=label,payload=payload)
        cflow.put(msg)
        log.debug(f'flow-send-tens: {count}: {msg}')

    log.debug('flow-send-tens: send EOT')
    cflow.send_eot(Message())
    log.debug('flow-send-tens: recv EOT (waiting)')
    cflow.recv_eot()
    log.debug('flow-send-tens: going offline')
    cnode.offline()
    log.debug('flow-send-tens: end')
226 
@cli.command("flow-recv-tens")
@click.option("-n","--number",default=10,
              help="Number of TENS messages to recv before exiting, 0=forever")
@click.option("-c","--connect", default="tcp://127.0.0.1:5555",
              help="An address to which this client shall connect")
@click.option("-v","--verbosity", default="info",
              help="Set logging level (debug, info, warning, error, critical)")
@click.argument("attrs", nargs=-1)
def recv_tens(number, connect, verbosity, attrs):
    '''
    Client to recv flow of TENS messages.
    '''
    # Maintainer notes (the docstring above is the command's --help text):
    #
    # number    -- stop after this many cflow.get() calls; a value that
    #              is not positive means "no limit".
    # connect   -- address the CLIENT port connects to.
    # verbosity -- name of a logging level; unknown names mean INFO.
    # attrs     -- "key=value" strings added to the BOT label object.
    #
    # Raises RuntimeError when no BOT reply is obtained from recv_bot().
    import zmq
    from zio import Message, Node
    from zio.flow import Flow

    # Go through Logger.setLevel() rather than assigning log.level, and
    # fall back to a real integer level instead of the string "INFO".
    level = getattr(logging, verbosity.upper(), None)
    if not isinstance(level, int):
        level = logging.INFO
    log.setLevel(level)

    msg_attr = attrify(attrs)

    cnode = Node("flow-recv-tens")
    cport = cnode.port("input", zmq.CLIENT)
    cport.connect(connect)
    cnode.online()
    cflow = Flow(cport)

    # Merge instead of dict(credit=..., direction=..., **msg_attr): that
    # form raises TypeError when the command line also supplies "credit"
    # or "direction".  A user supplied credit replaces the default; the
    # direction is fixed by what this command does and is re-imposed.
    attr = {"credit": 2, "direction": "inject"}
    attr.update(msg_attr)
    attr["direction"] = "inject"

    bot = Message(label=json.dumps(attr))
    cflow.send_bot(bot)
    bot = cflow.recv_bot(5000)
    # Explicit check rather than assert: asserts vanish under "python -O".
    if not bot:
        raise RuntimeError('flow-recv-tens: BOT handshake failed')
    log.debug('flow-recv-tens: BOT handshake done')

    count = 0
    while number <= 0 or count < number:
        # The original "++count" is two unary plus operators in Python and
        # never changed the counter, so --number could not end the loop.
        count += 1
        msg = cflow.get()
        log.info(f'flow-recv-tens: {count}: {msg}')
        if msg is None:
            cflow.send_eot()
            cnode.offline()
            log.debug('flow-recv-tens: EOT whille receiving')
            return

    log.debug('flow-recv-tens: send EOT')
    cflow.send_eot(Message())
    log.debug('flow-recv-tens: recv EOT (waiting)')
    cflow.recv_eot()
    log.debug('flow-recv-tens: going offline')
    cnode.offline()
    log.debug('flow-recv-tens: end')
280 
281 
282 
283 
@cli.command("flow-recv-server")
@click.option("-n","--number",default=10,
              help="Number of TENS messages to generate")
@click.option("-b","--bind", default="tcp://127.0.0.1:5555",
              help="An address to which this client shall bind")
@click.option("-s","--shape", default="6000,800",
              help="Comma separated list of integers giving tensor shape")
@click.option("-v","--verbosity", default="info",
              help="Set logging level (debug, info, warning, error, critical)")
@click.argument("attrs", nargs=-1)
def recv_server(number, bind, shape, verbosity, attrs):
    '''
    A simple server to catch some TENS messages from flow and dump them.
    '''
    # Maintainer notes (the docstring above is the command's --help text):
    #
    # bind      -- address the SERVER port binds to.
    # verbosity -- name of a logging level; unknown names mean INFO.
    # number, shape -- accepted as options but not used by this command.
    # attrs     -- parsed by attrify() (so malformed entries are still
    #              reported) but the result is not used any further.
    #
    # Raises RuntimeError when no BOT message is obtained from recv_bot().
    import zmq
    from zio import Node
    from zio.flow import Flow

    # Go through Logger.setLevel() rather than assigning log.level, and
    # fall back to a real integer level instead of the string "INFO".
    level = getattr(logging, verbosity.upper(), None)
    if not isinstance(level, int):
        level = logging.INFO
    log.setLevel(level)

    msg_attr = attrify(attrs)

    snode = Node("server")
    sport = snode.port("sport", zmq.SERVER)
    sport.bind(bind)
    snode.online()
    sflow = Flow(sport)

    bot = sflow.recv_bot()
    # Explicit check rather than assert: asserts vanish under "python -O".
    if not bot:
        raise RuntimeError('flow-recv-server: BOT handshake failed')

    # Send the BOT back with its direction set to "inject".
    lobj = bot.label_object
    lobj["direction"] = "inject"
    bot.label_object = lobj
    sflow.send_bot(bot)
    log.debug('flow-recv-server: BOT handshake done')
    sflow.flush_pay()
    log.debug('flow-recv-server: looping')
    while True:
        msg = sflow.get(1000)
        log.info(f'flow-recv-server: {msg}')
        # A falsy result from get() is handled the same way as an
        # explicit EOT message: answer with EOT and leave the loop.
        if not msg or 'EOT' == msg.label_object['flow']:
            log.debug('flow-recv-server: got EOT')
            sflow.send_eot()    # answer
            break

    snode.offline()
    log.debug('flow-recv-server: end')
def test_ruleset(ruleset, verbosity, attrs)
Definition: flow.py:49
An identified vertex in a ported, directed graph.
Definition: node.hpp:30
def recv_server(number, bind, shape, verbosity, attrs)
Definition: flow.py:294
def send_tens(number, connect, shape, verbosity, attrs)
Definition: flow.py:179
ZIO data flow.
Definition: flow.hpp:17
def recv_tens(number, connect, verbosity, attrs)
Definition: flow.py:235
def file_server(bind, format, name, port, verbosity, ruleset)
Definition: flow.py:113
void dump(const SM &) noexcept
Definition: _test_hsm.cpp:179
def attrify(attrs)
Definition: flow.py:29
def cli(ctx)
Definition: flow.py:12
def typeit(v)
Definition: flow.py:18
a ZIO message
Definition: message.hpp:59