2 '''Support for writing HDF5 files. 6 - a Writer class with a mapping policy from TENS message to HDF group 8 - a file handler actor function 10 - a data flow client handler actor function 16 from zio
import Port, Message
18 from pyre.zactor
import ZActor
19 from zmq
import CLIENT, PUSH, PULL, Poller, POLLIN
22 from ..util
import message_to_dict
25 log = logging.getLogger(
"zio")
29 '''Write ZIO TENS messages to HDF5. 31 Like all flow writers to HDF5, this one will restrict its writing 32 to a given base group. 34 It will make a subgroup for each message based on the message 47 fobj = msg.label_object
48 if fobj[
"flow"] ==
"BOT":
52 tens = fobj.pop(
"TENS")
53 tensors = tens.pop(
"tensors")
55 log.error(
'No TENS label attribute')
58 user_md = tens.get(
"metadata",{})
61 seq = self.
group.get(gn,
None)
63 seq = self.
group.create_group(gn)
65 log.error(f
'HDF5 TENS writer cowardly refusing to use existing group {gn}')
68 tens = seq.create_group(
"tensors")
70 tens.attrs[
"origin"] = msg.origin
71 tens.attrs[
"granule"] = msg.granule
73 for k,v
in fobj.items():
74 if k
in [
"direction"]:
78 except TypeError
as te:
80 log.error(f
'can not serialize type {ot} for key {k}')
86 for tenind, tenmd
in enumerate(tensors):
87 part = int(tenmd.get(
'part', tenind))
91 sword = str(tenmd[
'word'])
92 shape = list(tenmd[
'shape'])
93 dtype = str(tenmd[
'dtype'])
95 log.debug(f
'TENS PART: {tenind}/{nparts} {dtype} {sword} {shape}')
97 data = numpy.frombuffer(ten, dtype=dtype+sword).reshape(shape)
103 md = seq.create_group(
"metadata")
104 md.attrs.update(user_md)
108 '''An actor that marshals messages from socket to file. 115 Name of an HDF file in which to write 117 wargs : tuple of args 119 wargs[0] : string (address pattern) 121 An f-string formatted with a "port" parameter that should 122 resolve to a legal ZeroMQ socket address. When a successful 123 bind() can be done on the result, the resolved address is 124 returned through the pipe. If no successful address can be 125 bound, an empty string is returned as an error indicator. 129 addrpat = wargs.pop(0)
130 log.debug(f
'actor: writer("{filename}", "{addrpat}")')
131 fp = h5py.File(filename,
'w')
132 log.debug(f
'opened {filename}')
134 log.debug(
'make writer PULL socket')
135 pull = ctx.socket(PULL)
136 minport,maxport = 49999,65000
137 for port
in range(minport,maxport):
138 writer_addr = addrpat.format(port=port)
139 pull.bind(writer_addr)
140 log.debug(f
'writer bind to {writer_addr}')
141 pipe.send_string(writer_addr)
147 poller.register(pipe, POLLIN)
148 poller.register(pull, POLLIN)
151 for which,_
in poller.poll():
153 if not which
or which == pipe:
154 log.debug(f
'writer for {filename} exiting')
165 path = fobj.pop(
"hdfgroup")
166 msg.label = json.dumps(fobj)
167 log.debug(f
'{filename}:/{path} writing:\n{msg}')
169 fw = flow_writer.get(path,
None)
171 sg = fp.get(path,
None)
or fp.create_group(path)
174 fw = flow_writer[path] =
TensWriter(sg, *wargs)
177 log.debug(f
'flush {filename}')
183 '''Connect to and marshall messages between broker and writer sockets. 192 rule_object: dicionary 194 A ruleset rule object. 196 writer_addr :: string 198 The address of the writer's PULL socket to connect. 202 The address of the broker's SERVER socket to connect. 207 rattr = dict(rule_object.get(
"attr",{}), **mattr)
209 base_path = rule_object.get(
"grouppat",
"/").
format(**rattr)
210 log.debug(f
'client_handler(msg, "{base_path}", "{broker_addr}", "{writer_addr}")')
214 push = ctx.socket(PUSH)
215 push.connect(writer_addr)
217 sock = ctx.socket(CLIENT)
218 port =
Port(
"write-handler", sock)
219 port.connect(broker_addr)
222 log.debug (f
'writer({base_path}) send BOT to {broker_addr}')
224 bot = flow.recv_bot()
225 log.debug (f
'writer({base_path}) got response:\n{bot}')
229 log.debug (f
'write_handler({base_path}) push {m}')
231 attr[
'hdfgroup'] = base_path
232 m.label = json.dumps(attr)
233 push.send(m.encode())
238 poller.register(pipe, POLLIN)
239 poller.register(sock, POLLIN)
242 for which,_
in poller.poll():
247 log.debug (
'write_handler pipe hit')
254 except Exception
as err:
255 log.warning(
'flow.get error: %s %s' % (
type(err),err))
259 log.debug(
"write_handler: got EOT")
268 log.debug (
'write_handler exiting')
def file_handler(ctx, pipe, filename, wargs)
def __init__(self, group)
def client_handler(ctx, pipe, bot, rule_object, writer_addr, broker_addr)
A port holds a socket in the context of a node.