8 log = logging.getLogger(
"zio")
14 ZIO FLOW command line interface 31 Return dictionary of attribute values parsed from list of "k=v" strings. 36 k,v = attr.split(
'=', 1)
38 log.error(f
'failed to parse {attr}')
43 @cli.command(
"test-ruleset")
44 @click.option(
"-r",
"--ruleset",type=click.Path(), required=
True,
45 help=
"A file in JSON or Jsonnet format providing the ruleset")
46 @click.option(
"-v",
"--verbosity", default=
"info",
47 help=
"Set logging level (debug, info, warning, error, critical)")
48 @click.argument(
"attrs", nargs=-1)
51 Test a rule set by giving attributes as key=value on command line. 53 log.level = getattr(logging, verbosity.upper(),
"INFO")
57 rs = jsonnet.load(ruleset)
59 for ind,robj
in enumerate(rs):
60 attr = dict(robj.get(
'attr',{}), **msg_attr)
61 log.debug(f
'attr: {attr}')
63 def dump(errtype, err):
64 log.error(f
'{errtype} "{err}"\n{rule}\n{attr}')
68 parsed = rules.parse(robj, **attr)
69 except KeyError
as ke:
75 log.debug(f
'parsed expression: {parsed}')
76 expr = rules.Rule(parsed)
77 log.debug(f
'rule expression: {expr}')
79 log.debug(f
'rule evaluation: {tf}')
82 filepat = robj[
'filepat']
84 path = filepat.format(**attr)
85 except KeyError
as ke:
86 dump(f
'key error "{filepat}"', ke)
89 grouppat = robj[
'grouppat']
91 group = grouppat.format(**attr)
92 except KeyError
as ke:
93 dump(f
'key error "{grouppat}"', ke)
97 tf =
"TRUE" if tf
else "FALSE" 98 log.info(f
'#{ind} {tf:5s} {rw} {path}:/{group}')
101 @cli.command(
"flow-file-server")
102 @click.option(
"-b",
"--bind", default=
"tcp://127.0.0.1:5555",
103 help=
"An address to which the server shall bind")
104 @click.option(
"-f",
"--format", default=
"hdf", type=click.Choice([
"hdf"]),
106 @click.option(
"-n",
"--name", default=
"file-server",
107 help=
"The Zyre node name for the server")
108 @click.option(
"-p",
"--port", default=
"flow",
109 help=
"The port name for the server socket")
110 @click.option(
"-v",
"--verbosity", default=
"info",
111 help=
"Set logging level (debug, info, warning, error, critical)")
112 @click.argument(
"ruleset")
114 '''Source and sink ZIO flow protocol to file 116 This brings back-end reader and writer handlers to external 117 clients via the flow broker. A ruleset factory dynamically spawns 118 handlers based on incoming BOT flow messages. The file format 119 maps to different flavors of handlers. 123 from zio
import Port, Message, Node
131 assert(format ==
"hdf")
133 log.level = getattr(logging, verbosity.upper(),
"INFO")
135 ruleset = jsonnet_load(ruleset)
139 factory = Factory(ctx, ruleset,
140 wactors=((writer.file_handler,
141 (
"inproc://"+format+
"writer{port}")),
142 (writer.client_handler,
144 ractor=(reader.handler,(bind,)))
147 sport = node.port(port, zmq.SERVER)
150 log.info(f
'domo-broker {name}:{port} online at {bind}')
153 broker = Broker(sport, factory)
155 log.info(f
'domo-broker {name} entering loop')
161 log.debug(f
'domo-broker {name} is lonely')
162 log.debug(node.peer.peers)
163 except Exception
as e:
169 @cli.command(
"flow-send-tens")
170 @click.option(
"-n",
"--number",default=10,
171 help=
"Number of TENS messages to generate")
172 @click.option(
"-c",
"--connect", default=
"tcp://127.0.0.1:5555",
173 help=
"An address to which this client shall connect")
174 @click.option(
"-s",
"--shape", default=
"6000,800",
175 help=
"Comma separated list of integers giving tensor shape")
176 @click.option(
"-v",
"--verbosity", default=
"info",
177 help=
"Set logging level (debug, info, warning, error, critical)")
178 @click.argument(
"attrs", nargs=-1)
179 def send_tens(number, connect, shape, verbosity, attrs):
181 Generate and flow some TENS messages. 184 from zio
import Port, Message, Node
187 log.level = getattr(logging, verbosity.upper(),
"INFO")
191 cnode =
Node(
"flow-send-tens")
192 cport = cnode.port(
"output", zmq.CLIENT)
193 cport.connect(connect)
197 shape = list(map(int, shape.split(
',')))
202 attr = dict(credit=2, direction=
"extract", **msg_attr)
203 bot =
Message(label=json.dumps(attr))
205 bot = cflow.recv_bot(5000);
206 log.debug(
'flow-send-tens: BOT handshake done')
209 tens_attr = dict(shape=shape, word=1, dtype=
'u') # unsigned char 210 attr["TENS"] = dict(tensors=[tens_attr], metadata=dict(source=
"gen-tens"))
211 label = json.dumps(attr)
212 payload = [b
'X'*size]
214 for count
in range(number):
215 msg =
Message(label=label,payload=payload)
217 log.debug(f
'flow-send-tens: {count}: {msg}')
219 log.debug(f
'flow-send-tens: send EOT')
221 log.debug(f
'flow-send-tens: recv EOT (waiting)')
223 log.debug(f
'flow-send-tens: going offline')
225 log.debug(f
'flow-send-tens: end')
227 @cli.command(
"flow-recv-tens")
228 @click.option(
"-n",
"--number",default=10,
229 help=
"Number of TENS messages to recv before exiting, 0=forever")
230 @click.option(
"-c",
"--connect", default=
"tcp://127.0.0.1:5555",
231 help=
"An address to which this client shall connect")
232 @click.option(
"-v",
"--verbosity", default=
"info",
233 help=
"Set logging level (debug, info, warning, error, critical)")
234 @click.argument(
"attrs", nargs=-1)
237 Client to recv flow of TENS messages. 240 from zio
import Port, Message, Node
243 log.level = getattr(logging, verbosity.upper(),
"INFO")
247 cnode =
Node(
"flow-recv-tens")
248 cport = cnode.port(
"input", zmq.CLIENT)
249 cport.connect(connect)
253 attr = dict(credit=2, direction=
"inject", **msg_attr)
254 bot =
Message(label=json.dumps(attr))
256 bot = cflow.recv_bot(5000);
257 log.debug(
'flow-recv-tens: BOT handshake done')
262 if number > 0
and count == number:
266 log.info(f
'flow-recv-tens: {count}: {msg}')
270 log.debug(
'flow-recv-tens: EOT whille receiving')
273 log.debug(f
'flow-recv-tens: send EOT')
275 log.debug(f
'flow-recv-tens: recv EOT (waiting)')
277 log.debug(f
'flow-recv-tens: going offline')
279 log.debug(f
'flow-recv-tens: end')
284 @cli.command(
"flow-recv-server")
285 @click.option(
"-n",
"--number",default=10,
286 help=
"Number of TENS messages to generate")
287 @click.option(
"-b",
"--bind", default=
"tcp://127.0.0.1:5555",
288 help=
"An address to which this client shall bind")
289 @click.option(
"-s",
"--shape", default=
"6000,800",
290 help=
"Comma separated list of integers giving tensor shape")
291 @click.option(
"-v",
"--verbosity", default=
"info",
292 help=
"Set logging level (debug, info, warning, error, critical)")
293 @click.argument(
"attrs", nargs=-1)
296 A simple server to catch some TENS messages from flow and dump them. 299 from zio
import Port, Message, Node
302 log.level = getattr(logging, verbosity.upper(),
"INFO")
306 snode =
Node(
"server")
307 sport = snode.port(
"sport", zmq.SERVER)
312 bot = sflow.recv_bot();
314 lobj = bot.label_object
315 lobj[
"direction"] =
"inject" 316 bot.label_object = lobj
318 log.debug(
'flow-recv-server: BOT handshake done')
320 log.debug(
'flow-recv-server: looping')
322 msg = sflow.get(1000)
323 log.info(f
'flow-recv-server: {msg}')
324 if not msg
or 'EOT' == msg.label_object[
'flow']:
325 log.debug(
'flow-recv-server: got EOT')
330 log.debug(f
'flow-recv-server: end')
def test_ruleset(ruleset, verbosity, attrs)
An identified vertex in a ported, directed graph.
def recv_server(number, bind, shape, verbosity, attrs)
def send_tens(number, connect, shape, verbosity, attrs)
def recv_tens(number, connect, verbosity, attrs)
def file_server(bind, format, name, port, verbosity, ruleset)
void dump(const SM &) noexcept