ZIO
Python and C++ interface to ZeroMQ and Zyre
port.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 import zmq
4 from zmq.error import ZMQError
5 from .util import socket_names, needs_codec
6 from .message import Message
7 
8 def bind_address(sock, addr):
9  """Bind socket to address
10 
11  Any error is printed and reraised.
12 
13  :param sock: ZeroMQ socket object
14  :param addr: address in ZeroMQ format
15  :returns: the address
16  :rtype: string
17 
18  """
19  try:
20  sock.bind(addr)
21  except ZMQError as e:
22  print(f'failed to bind {addr}')
23  raise
24 
25  return addr
26 
27 ephemeral_port_range = (49152, 65535)
28 
29 def bind_hostport(sock, host, port):
30  """Bind socket TCP host and port
31 
32  :param sock: ZeroMQ socket object
33  :param host: name or IP address of host to bind
34  :param port: TCP port number
35  :returns: ZeroMQ address string
36  :rtype: string
37 
38  """
39  if type(port) is int and port > 0:
40  return bind_address(sock, "tcp://%s:%d" % (host, port))
41  addr = "tcp://%s"%host
42  try:
43  #print ("bind %s:*" % addr)
44  port = sock.bind_to_random_port(addr,
45  *ephemeral_port_range)
46  except ZMQError as e:
47  print(f'failed to bind {addr}')
48  raise
49  return "tcp://%s:%d" % (host, port)
50 
51 
52 class Port:
53  def __init__(self, name, sock, hostname='127.0.0.1'):
54  '''
55  Create a Port with a name and a ZeroMQ socket type.
56 
57  Ports are typically not meant to be constructed directly by
58  application code but instead through a managing zio.Node
59  '''
60  self.name = name
61  self._hostname = hostname
62  if type(sock) is int:
63  self.ctx = zmq.Context()
64  self.sock = self.ctx.socket(sock)
65  else:
66  self.sock = sock
67  self.origin = None
68  self.to_bind = list()
69  self.to_conn = list()
70  self.headers = dict()
71  self.is_online = False
72  self.connected = list()
73  self.bound = list()
74  self.poller = zmq.Poller()
75  self.poller.register(self.sock, zmq.POLLIN)
76 
77  def __str__(self):
78  return "[port %s]: type:%s binds:%d(todo:%d) conns:%d(todo:%d)" % \
79  (self.name, socket_names[self.sock.type],
80  len(self.bound), len(self.to_bind),
81  len(self.connected), len(self.to_conn))
82 
83  def bind(self, *spec):
84  '''
85  Request a bind.
86 
87  If spec is None, do a default/ephemeral bind.
88  If spec is a string, it is assumed to be a ZeroMQ adddress.
89  If spec is a tuple ithen it is a (hostname,portnumber).
90  '''
91  if not spec:
92  #print ("[port %s] request default bind %s" % (self.name, spec,))
93  self.to_bind.append((self._hostname, 0))
94  return
95  if len(spec) > 2:
96  raise ValueError("unsupported bind specification")
97  self.to_bind.append(spec)
98  return
99 
100  def connect(self, *spec):
101  '''
102  Request a connect.
103 
104  If a single string is given it is a ZeroMQ address.
105  If two strings are given they are (nodename, portname).
106  '''
107  if not spec or len(spec) > 2:
108  raise ValueError("unsupported connect specification")
109  self.to_conn.append(spec)
110 
111  def subscribe(self, prefix=""):
112  '''
113  Subscribe to a PUB/SUB topic.
114 
115  This method is only meaningful if our socket is a SUB and then
116  it MUST be called if messages are expected to be received.
117  '''
118  if self.sock.type is not zmq.SUB:
119  return
120  self.sock.setsockopt_string(zmq.SUBSCRIBE, prefix)
121  pass
122 
123  def add_headers(self, *args, **kwargs):
124  '''
125  Add one or more headers or dictionaries of headers.
126 
127  Every key will be wrapped into ZIO port header convention.
128  The headers will appear to the network as
129 
130  zio.port.<portname>.<key> = <value>
131 
132  The self.headers collects these.
133  '''
134  for d in list(args) + [kwargs]:
135  for k,v in d.items():
136  key = "zio.port.%s.%s" % (self.name, k)
137  self.headers[key] = v
138  pass
139 
140  def do_binds(self):
141  '''
142  Actually perform binds.
143 
144  Return dictionary suitable for use as peer headers the give
145  information about the binds.
146 
147  This must be called prior to any call of .online() and is
148  intended to be used by a zio.Node which holds this zio.Port.
149  '''
150  for spec in self.to_bind:
151  if len(spec) == 2:
152  #print ("[port %s] bind host port %s" % (self.name, spec,))
153  addr = bind_hostport(self.sock, *spec)
154  else: # direct
155  #print ("[port %s] bind address %s" % (self.name, spec[0],))
156  addr = bind_address(self.sock, spec[0])
157  self.bound.append(addr)
158  self.add_headers(address=addr)
159  self.to_bind = list()
160  self.add_headers(socket=socket_names[self.sock.type])
161  return self.headers;
162 
163 
164  def online(self, peer = None):
165  '''
166  Bring this port online.
167 
168  If no peer is given then indirect connects will fail.
169 
170  This method is intended for use by a zio.Node which holds this
171  zio.Port.
172  '''
173  if self.is_online: return
174  self.is_online = True
175  for conn in self.to_conn:
176  if len(conn) == 1 and type(conn[0]) is str:
177  conn = conn[0]
178  self.sock.connect(conn)
179  self.connected.append(conn)
180  continue
181 
182  if not peer:
183  err="[port %s]: no peer given, can not connect %s" % \
184  (self.name, conn)
185  raise ValueError(err)
186 
187  nodename,portname = conn
188  uuids = peer.waitfor(nodename)
189  if not uuids:
190  raise RuntimeError('failed to wait for "%s"' % nodename)
191  for uuid in uuids:
192  pi = peer.peers[uuid]
193  key = f"zio.port.{portname}.address"
194  addr = pi.headers.get(key, False)
195  if not addr:
196  continue
197  self.sock.connect(addr)
198  self.connected.append(addr)
199  continue
200  continue
201  self.to_conn = list()
202  return
203 
204  def offline(self):
205  '''
206  Bring this port offline.
207 
208  This unbinds and disconnects and forgets their addresses
209  '''
210  if not self.is_online: return
211  self.is_online = False
212  for addr in self.connected:
213  self.sock.disconnect(addr)
214  for addr in self.bound:
215  self.sock.unbind(addr)
216  m_bound = list()
217  m_connected = list()
218 
219  def send(self, msg):
220  '''
221  Send a zio.Message
222 
223  This modifies the message prior to sending to set the origin
224  if this port has one.
225  '''
226  if self.origin is not None:
227  msg.coord.origin = self.origin
228  if needs_codec(self.sock.type):
229  data = msg.encode()
230  if (self.sock.type == zmq.SERVER):
231  self.sock.send(data, routing_id = msg.routing_id)
232  else:
233  self.sock.send(data)
234  else:
235  parts = msg.toparts()
236  self.sock.send_multipart(parts)
237  return
238 
239  def recv(self, timeout=None):
240  '''
241  Receive and return a zio.Message waiting up to a timeout
242 
243  If timeout is reached then None is returned.
244  '''
245  which = dict(self.poller.poll(timeout))
246  if not self.sock in which:
247  return None
248 
249  if needs_codec(self.sock.type):
250  if (self.sock.type == zmq.SERVER):
251  frame = self.sock.recv(copy=False)
252  if not frame:
253  return None
254  return Message(frame=frame)
255  else:
256  data = self.sock.recv(copy=True)
257  return Message(encoded=data)
258 
259  parts = self.sock.recv_multipart()
260  if not parts:
261  return None
262  return Message(parts=parts)
263 
def bind_address(sock, addr)
Definition: port.py:8
def online(self, peer=None)
Definition: port.py:164
def add_headers(self, args, kwargs)
Definition: port.py:123
const char * disconnect
Definition: protocol.hpp:25
def offline(self)
Definition: port.py:204
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
Definition: zmq.hpp:259
def subscribe(self, prefix="")
Definition: port.py:111
def recv(self, timeout=None)
Definition: port.py:239
def bind_hostport(sock, host, port)
Definition: port.py:29
def __str__(self)
Definition: port.py:77
def bind(self, spec)
Definition: port.py:83
void append(Message &msg, message_t &&data, const std::vector< size_t > &shape, size_t word_size, const char *tn)
Definition: tens.cpp:34
def __init__(self, name, sock, hostname='127.0.0.1')
Definition: port.py:53
def needs_codec(stype)
Definition: util.py:32
def connect(self, spec)
Definition: port.py:100
def send(self, msg)
Definition: port.py:219
def do_binds(self)
Definition: port.py:140