8 log = logging.getLogger(
"zio")
    14     ZIO FLOW command line interface    31     Return dictionary of attribute values parsed from list of "k=v" strings.    36             k,v = attr.split(
'=', 1)
    38             log.error(f
'failed to parse {attr}')
    43 @cli.command(
"test-ruleset")
    44 @click.option(
"-r",
"--ruleset",type=click.Path(), required=
True,
    45               help=
"A file in JSON or Jsonnet format providing the ruleset")
    46 @click.option(
"-v",
"--verbosity", default=
"info",
    47               help=
"Set logging level (debug, info, warning, error, critical)")
    48 @click.argument(
"attrs", nargs=-1)
    51     Test a rule set by giving attributes as key=value on command line.    53     log.level = getattr(logging, verbosity.upper(), 
"INFO")
    57     rs = jsonnet.load(ruleset)
    59     for ind,robj 
in enumerate(rs):
    60         attr = dict(robj.get(
'attr',{}), **msg_attr)
    61         log.debug(f
'attr: {attr}')
    63         def dump(errtype, err):
    64             log.error(f
'{errtype} "{err}"\n{rule}\n{attr}')
    68             parsed = rules.parse(robj, **attr)
    69         except KeyError 
as ke:
    75         log.debug(f
'parsed expression: {parsed}')
    76         expr = rules.Rule(parsed) 
    77         log.debug(f
'rule expression: {expr}')
    79         log.debug(f
'rule evaluation: {tf}')
    82         filepat = robj[
'filepat']
    84             path = filepat.format(**attr)
    85         except KeyError 
as ke:
    86             dump(f
'key error "{filepat}"', ke)
    89         grouppat = robj[
'grouppat']
    91             group = grouppat.format(**attr)
    92         except KeyError 
as ke:
    93             dump(f
'key error "{grouppat}"', ke)
    97         tf = 
"TRUE" if tf 
else "FALSE"    98         log.info(f
'#{ind} {tf:5s} {rw} {path}:/{group}')
   101 @cli.command(
"flow-file-server")
   102 @click.option(
"-b",
"--bind", default=
"tcp://127.0.0.1:5555",
   103               help=
"An address to which the server shall bind")
   104 @click.option(
"-f",
"--format", default=
"hdf", type=click.Choice([
"hdf"]),
   106 @click.option(
"-n",
"--name", default=
"file-server",
   107               help=
"The Zyre node name for the server")
   108 @click.option(
"-p",
"--port", default=
"flow",
   109               help=
"The port name for the server socket")
   110 @click.option(
"-v",
"--verbosity", default=
"info",
   111               help=
"Set logging level (debug, info, warning, error, critical)")
   112 @click.argument(
"ruleset")
   114     '''Source and sink ZIO flow protocol to file   116     This brings back-end reader and writer handlers to external   117     clients via the flow broker.  A ruleset factory dynamically spawns   118     handlers based on incoming BOT flow messages.  The file format   119     maps to different flavors of handlers.   123     from zio 
import Port, Message, Node
   131     assert(format == 
"hdf")
   133     log.level = getattr(logging, verbosity.upper(), 
"INFO")
   135     ruleset = jsonnet_load(ruleset)
   139     factory = Factory(ctx, ruleset, 
   140                       wactors=((writer.file_handler,
   141                                 (
"inproc://"+format+
"writer{port}")),
   142                                (writer.client_handler,
   144                       ractor=(reader.handler,(bind,)))
   147     sport = node.port(port, zmq.SERVER)
   150     log.info(f
'domo-broker {name}:{port} online at {bind}')
   153     broker = Broker(sport, factory)
   155     log.info(f
'domo-broker {name} entering loop')
   161             log.debug(f
'domo-broker {name} is lonely')
   162             log.debug(node.peer.peers)
   163         except Exception 
as e:
   169 @cli.command(
"flow-send-tens")
   170 @click.option(
"-n",
"--number",default=10,
   171               help=
"Number of TENS messages to generate")
   172 @click.option(
"-c",
"--connect", default=
"tcp://127.0.0.1:5555",
   173               help=
"An address to which this client shall connect")
   174 @click.option(
"-s",
"--shape", default=
"6000,800",
   175               help=
"Comma separated list of integers giving tensor shape")
   176 @click.option(
"-v",
"--verbosity", default=
"info",
   177               help=
"Set logging level (debug, info, warning, error, critical)")
   178 @click.argument(
"attrs", nargs=-1)
   179 def send_tens(number, connect, shape, verbosity, attrs):
   181     Generate and flow some TENS messages.   184     from zio 
import Port, Message, Node
   187     log.level = getattr(logging, verbosity.upper(), 
"INFO")
   191     cnode = 
Node(
"flow-send-tens")
   192     cport = cnode.port(
"output", zmq.CLIENT)
   193     cport.connect(connect)
   197     shape = list(map(int, shape.split(
',')))
   202     attr = dict(credit=2, direction=
"extract", **msg_attr)
   203     bot = 
Message(label=json.dumps(attr))
   205     bot = cflow.recv_bot(5000);
   206     log.debug(
'flow-send-tens: BOT handshake done')
   209     tens_attr = dict(shape=shape, word=1, dtype=
'u') # unsigned char   210     attr["TENS"] = dict(tensors=[tens_attr], metadata=dict(source=
"gen-tens"))
   211     label = json.dumps(attr)
   212     payload = [b
'X'*size]
   214     for count 
in range(number):
   215         msg = 
Message(label=label,payload=payload)
   217         log.debug(f
'flow-send-tens: {count}: {msg}')
   219     log.debug(f
'flow-send-tens: send EOT')
   221     log.debug(f
'flow-send-tens: recv EOT (waiting)')
   223     log.debug(f
'flow-send-tens: going offline')
   225     log.debug(f
'flow-send-tens: end')
   227 @cli.command(
"flow-recv-tens")
   228 @click.option(
"-n",
"--number",default=10,
   229               help=
"Number of TENS messages to recv before exiting, 0=forever")
   230 @click.option(
"-c",
"--connect", default=
"tcp://127.0.0.1:5555",
   231               help=
"An address to which this client shall connect")
   232 @click.option(
"-v",
"--verbosity", default=
"info",
   233               help=
"Set logging level (debug, info, warning, error, critical)")
   234 @click.argument(
"attrs", nargs=-1)
   237     Client to recv flow of TENS messages.   240     from zio 
import Port, Message, Node
   243     log.level = getattr(logging, verbosity.upper(), 
"INFO")
   247     cnode = 
Node(
"flow-recv-tens")
   248     cport = cnode.port(
"input", zmq.CLIENT)
   249     cport.connect(connect)
   253     attr = dict(credit=2, direction=
"inject", **msg_attr)
   254     bot = 
Message(label=json.dumps(attr))
   256     bot = cflow.recv_bot(5000);
   257     log.debug(
'flow-recv-tens: BOT handshake done')
   262         if number > 0 
and count == number:
   266         log.info(f
'flow-recv-tens: {count}: {msg}')
   270             log.debug(
'flow-recv-tens: EOT whille receiving')
   273     log.debug(f
'flow-recv-tens: send EOT')
   275     log.debug(f
'flow-recv-tens: recv EOT (waiting)')
   277     log.debug(f
'flow-recv-tens: going offline')
   279     log.debug(f
'flow-recv-tens: end')
   284 @cli.command(
"flow-recv-server")
   285 @click.option(
"-n",
"--number",default=10,
   286               help=
"Number of TENS messages to generate")
   287 @click.option(
"-b",
"--bind", default=
"tcp://127.0.0.1:5555",
   288               help=
"An address to which this client shall bind")
   289 @click.option(
"-s",
"--shape", default=
"6000,800",
   290               help=
"Comma separated list of integers giving tensor shape")
   291 @click.option(
"-v",
"--verbosity", default=
"info",
   292               help=
"Set logging level (debug, info, warning, error, critical)")
   293 @click.argument(
"attrs", nargs=-1)
   296     A simple server to catch some TENS messages from flow and dump them.   299     from zio 
import Port, Message, Node
   302     log.level = getattr(logging, verbosity.upper(), 
"INFO")
   306     snode = 
Node(
"server")
   307     sport = snode.port(
"sport", zmq.SERVER)
   312     bot = sflow.recv_bot();
   314     lobj = bot.label_object
   315     lobj[
"direction"] = 
"inject"   316     bot.label_object = lobj
   318     log.debug(
'flow-recv-server: BOT handshake done')
   320     log.debug(
'flow-recv-server: looping')
   322         msg = sflow.get(1000)
   323         log.info(f
'flow-recv-server: {msg}')
   324         if not msg 
or 'EOT' == msg.label_object[
'flow']:
   325             log.debug(
'flow-recv-server: got EOT')
   330     log.debug(f
'flow-recv-server: end')    
 def test_ruleset(ruleset, verbosity, attrs)
 
An identified vertex in a ported, directed graph. 
 
def recv_server(number, bind, shape, verbosity, attrs)
 
def send_tens(number, connect, shape, verbosity, attrs)
 
def recv_tens(number, connect, verbosity, attrs)
 
def file_server(bind, format, name, port, verbosity, ruleset)
 
void dump(const SM &) noexcept