ZIO
Python and C++ interface to ZeroMQ and Zyre
writer.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''Support for writing HDF5 files.
3 
4 It provides:
5 
6 - a Writer class with a mapping policy from TENS message to HDF group
7 
8 - a file handler actor function
9 
10 - a data flow client handler actor function
11 
12 '''
13 
14 import json
15 
16 from zio import Port, Message
17 from zio.flow import objectify, Flow
18 from pyre.zactor import ZActor
19 from zmq import CLIENT, PUSH, PULL, Poller, POLLIN
20 import h5py
21 import numpy
22 from ..util import message_to_dict
23 
24 import logging
25 log = logging.getLogger("zio")
26 
27 
class TensWriter:
    '''Write ZIO TENS messages to HDF5.

    Like all flow writers to HDF5, this one will restrict its writing
    to a given base group.

    It will make a subgroup for each message based on the message
    sequence number (named via ``seqno_interp``).  Inside it a
    "tensors" group receives one dataset per tensor, named from its
    payload part index via ``part_interp``.  When the TENS label
    carries "metadata", it is stored as attributes of a "metadata"
    subgroup.
    '''

    # Name of the per-message group, formatted from msg.seqno.
    seqno_interp = "%04d"
    # Name of each tensor dataset, formatted from its payload part index.
    part_interp = "%02d"

    def __init__(self, group):
        '''Create a writer that confines its output to *group*.'''
        self.group = group

    def save(self, msg):
        '''Write one TENS message into a new subgroup of the base group.

        BOT messages are ignored.  These are logged and skipped rather
        than raised, so a bad message does not kill the calling actor:

        - messages whose label lacks a "TENS" object or its "tensors" list;
        - messages whose seqno group already exists;
        - tensors referring to a payload part that is not present.
        '''
        fobj = msg.label_object
        # .get(): a label without "flow" is simply not a BOT.
        if fobj.get("flow") == "BOT":
            return

        try:
            tens_md = fobj.pop("TENS")
            tensors = tens_md.pop("tensors")
        except KeyError:
            log.error('No TENS label attribute')
            log.error(f'{msg}')
            return
        user_md = tens_md.get("metadata", {})

        gn = self.seqno_interp % msg.seqno
        if self.group.get(gn, None) is not None:
            log.error(f'HDF5 TENS writer cowardly refusing to use existing group {gn}')
            log.error(f'{msg}')
            return
        seq = self.group.create_group(gn)
        tgroup = seq.create_group("tensors")

        tgroup.attrs["origin"] = msg.origin
        tgroup.attrs["granule"] = msg.granule

        # Remaining label attributes become attributes of the tensors group.
        for key, val in fobj.items():
            if key == "direction":
                continue
            try:
                tgroup.attrs[key] = val
            except TypeError:
                log.error(f'can not serialize type {type(val)} for key {key}')

        parts = msg.toparts()[2:]  # skip headers
        nparts = len(parts)
        for tenind, tenmd in enumerate(tensors):
            part = int(tenmd.get('part', tenind))
            # Reject out-of-range parts explicitly: a negative index would
            # otherwise silently pick a payload counted from the end.
            if not 0 <= part < nparts:
                log.error(f'TENS tensor {tenind} refers to missing part {part} of {nparts}')
                continue
            ten = parts[part]

            # required
            sword = str(tenmd['word'])
            shape = list(tenmd['shape'])
            dtype = str(tenmd['dtype'])

            log.debug(f'TENS PART: {tenind}/{nparts} {dtype} {sword} {shape}')

            # e.g. dtype "f" + word "4" -> numpy dtype "f4"
            data = numpy.frombuffer(ten, dtype=dtype + sword).reshape(shape)
            tgroup.create_dataset(self.part_interp % part,
                                  data=data,
                                  chunks=True)

        if user_md:
            md = seq.create_group("metadata")
            md.attrs.update(user_md)
105 
106 
def file_handler(ctx, pipe, filename, *wargs):
    '''An actor that marshals messages from socket to file.

    Parameters
    -----------

    ctx : ZeroMQ context

        Used to create the PULL socket.

    pipe : actor pipe

        Signaled once the file is open, then sent the bound address
        (see wargs[0]).  Any message arriving on it makes the actor
        close its socket and file and exit.

    filename : string

        Name of an HDF file in which to write.  It is opened in "w"
        mode, so an existing file is truncated.

    wargs : tuple of args

        wargs[0] : string (address pattern)

            A str.format() pattern with a "port" field that should
            resolve to a legal ZeroMQ socket address.  Ports are tried
            in turn; when a successful bind() can be done on the
            result, the resolved address is returned through the pipe.
            If no successful address can be bound, an empty string is
            returned as an error indicator.

        wargs[1:] are passed on to each TensWriter constructor.

    Every message received must carry an "hdfgroup" label attribute
    naming the HDF group under which it is written; messages without
    one are logged and dropped.
    '''
    from zmq import ZMQError

    wargs = list(wargs)
    addrpat = wargs.pop(0)
    log.debug(f'actor: writer("{filename}", "{addrpat}")')
    fp = h5py.File(filename, 'w')
    log.debug(f'opened {filename}')
    pipe.signal()
    log.debug('make writer PULL socket')
    pull = ctx.socket(PULL)
    try:
        writer_addr = ""
        minport, maxport = 49999, 65000
        # A pattern without a {port} field resolves to one address only,
        # so retrying it for every port in the range would be pointless.
        port_independent = (addrpat.format(port=minport)
                            == addrpat.format(port=minport + 1))
        for port in range(minport, maxport):
            addr = addrpat.format(port=port)
            try:
                pull.bind(addr)
            except ZMQError as err:
                log.debug(f'writer failed to bind {addr}: {err}')
                if port_independent:
                    break
                continue
            writer_addr = addr
            log.debug(f'writer bind to {writer_addr}')
            break
        if not writer_addr:
            log.error(f'writer could not bind any address from "{addrpat}"')
        # Empty string signals failure to the caller (see docstring).
        pipe.send_string(writer_addr)

        flow_writer = dict()  # hdfgroup path -> TensWriter

        poller = Poller()
        poller.register(pipe, POLLIN)
        poller.register(pull, POLLIN)

        while True:
            for which, _ in poller.poll():

                if not which or which == pipe:  # signal exit
                    log.debug(f'writer for {filename} exiting')
                    return

                # o.w. we have flow

                data = pull.recv()
                if not data:
                    continue
                msg = Message(encoded=data)
                fobj = objectify(msg)
                path = fobj.pop("hdfgroup", None)  # must be supplied
                if path is None:
                    log.error(f'{filename}: message lacks "hdfgroup", dropping:\n{msg}')
                    continue
                msg.label = json.dumps(fobj)
                log.debug(f'{filename}:/{path} writing:\n{msg}')

                fw = flow_writer.get(path)
                if fw is None:
                    sg = fp.require_group(path)
                    # note: in principle/future, TensWriter type can be
                    # made an arg to support other message formats.
                    fw = flow_writer[path] = TensWriter(sg, *wargs)

                fw.save(msg)
                log.debug(f'flush {filename}')
                fp.flush()
    finally:
        # Release the socket and close the file on every exit path,
        # including an exception raised while writing.
        pull.close()
        fp.close()
181 
def client_handler(ctx, pipe, bot, rule_object, writer_addr, broker_addr):
    '''Connect to and marshal messages between broker and writer sockets.

    After exchanging BOT with the broker, every flow message received
    is relabeled with an "hdfgroup" attribute and pushed to the
    writer.  The handler finishes when its pipe is hit or when the
    flow delivers EOT, which is answered with an EOT.

    Parameters
    ----------

    bot : zio.Message

        The BOT message

    rule_object : dictionary

        A ruleset rule object.  Its optional "grouppat" (default "/")
        is formatted with its optional "attr" dict merged with the BOT
        label attributes (the latter win on conflicts) to give the HDF
        group path.

    writer_addr : string

        The address of the writer's PULL socket to connect.

    broker_addr : string

        The address of the broker's SERVER socket to connect.

    '''
    mattr = message_to_dict(bot)
    rattr = dict(rule_object.get("attr", {}), **mattr)
    log.info(f'{rattr}')
    # An HDF path to be added to every message we send to writer.
    base_path = rule_object.get("grouppat", "/").format(**rattr)
    log.debug(f'client_handler(msg, "{base_path}", "{broker_addr}", "{writer_addr}")')
    log.debug(bot)
    pipe.signal()

    push = ctx.socket(PUSH)
    push.connect(writer_addr)

    sock = ctx.socket(CLIENT)

    def push_message(m):
        '''Tag m with the HDF group path and forward it to the writer.'''
        log.debug(f'write_handler({base_path}) push {m}')
        attr = message_to_dict(m)
        attr['hdfgroup'] = base_path
        m.label = json.dumps(attr)
        push.send(m.encode())

    try:
        port = Port("write-handler", sock)
        port.connect(broker_addr)
        port.online(None)
        flow = Flow(port)
        log.debug(f'writer({base_path}) send BOT to {broker_addr}')
        flow.send_bot(bot)  # this introduces us to the server
        bot = flow.recv_bot()
        log.debug(f'writer({base_path}) got response:\n{bot}')
        # NOTE(review): assumes recv_bot() returns None when no BOT
        # reply arrives; forwarding None would crash message_to_dict().
        if bot is None:
            log.error(f'writer({base_path}) got no BOT reply from {broker_addr}')
            return
        flow.flush_pay()

        push_message(bot)

        poller = Poller()
        poller.register(pipe, POLLIN)
        poller.register(sock, POLLIN)
        while True:

            for which, _ in poller.poll():
                if not which:
                    return

                if which == pipe:  # signal exit
                    log.debug('write_handler pipe hit')
                    return

                # o.w. we have flow

                try:
                    msg = flow.get()
                except Exception as err:
                    # best-effort: a bad message must not end the flow
                    log.warning('flow.get error: %s %s', type(err), err)
                    continue

                if not msg:
                    log.debug("write_handler: got EOT")
                    flow.send_eot()
                    # fixme: send an EOT also to push socket?.
                    # Leave the handler entirely: a bare `break` here would
                    # only exit the `for` and keep polling forever.
                    log.debug('write_handler exiting')
                    pipe.signal()
                    return

                push_message(msg)
    finally:
        push.close()
        sock.close()
270 
def file_handler(ctx, pipe, filename, *wargs)
Definition: writer.py:107
ZIO data flow.
Definition: flow.hpp:17
def __init__(self, group)
Definition: writer.py:42
def objectify(morl)
Definition: util.py:5
def client_handler(ctx, pipe, bot, rule_object, writer_addr, broker_addr)
Definition: writer.py:182
def message_to_dict(msg)
Definition: util.py:44
A port holds a socket in the context of a node.
Definition: port.hpp:27
a ZIO message
Definition: message.hpp:59