4 from zmq.error
import ZMQError
5 from .util
import socket_names, needs_codec
6 from .message
import Message
9 """Bind socket to address 11 Any error is printed and reraised. 13 :param sock: ZeroMQ socket object 14 :param addr: address in ZeroMQ format 22 print(f
'failed to bind {addr}')
27 ephemeral_port_range = (49152, 65535)
30 """Bind socket to TCP host and port 32 :param sock: ZeroMQ socket object 33 :param host: name or IP address of host to bind 34 :param port: TCP port number 35 :returns: ZeroMQ address string 39 if type(port)
is int
and port > 0:
41 addr =
"tcp://%s"%host
44 port = sock.bind_to_random_port(addr,
45 *ephemeral_port_range)
47 print(f
'failed to bind {addr}')
49 return "tcp://%s:%d" % (host, port)
53 def __init__(self, name, sock, hostname='127.0.0.1'):
55 Create a Port with a name and a ZeroMQ socket type. 57 Ports are typically not meant to be constructed directly by 58 application code but instead through a managing zio.Node 63 self.
ctx = zmq.Context()
78 return "[port %s]: type:%s binds:%d(todo:%d) conns:%d(todo:%d)" % \
79 (self.
name, socket_names[self.
sock.type],
87 If spec is None, do a default/ephemeral bind. 88 If spec is a string, it is assumed to be a ZeroMQ address. 89 If spec is a tuple then it is a (hostname,portnumber). 96 raise ValueError(
"unsupported bind specification")
104 If a single string is given it is a ZeroMQ address. 105 If two strings are given they are (nodename, portname). 107 if not spec
or len(spec) > 2:
108 raise ValueError(
"unsupported connect specification")
113 Subscribe to a PUB/SUB topic. 115 This method is only meaningful if our socket is a SUB and then 116 it MUST be called if messages are expected to be received. 118 if self.
sock.type
is not zmq.SUB:
120 self.
sock.setsockopt_string(zmq.SUBSCRIBE, prefix)
125 Add one or more headers or dictionaries of headers. 127 Every key will be wrapped into ZIO port header convention. 128 The headers will appear to the network as 130 zio.port.<portname>.<key> = <value> 132 The self.headers collects these. 134 for d
in list(args) + [kwargs]:
135 for k,v
in d.items():
136 key =
"zio.port.%s.%s" % (self.
name, k)
142 Actually perform binds. 144 Return dictionary suitable for use as peer headers that give 145 information about the binds. 147 This must be called prior to any call of .online() and is 148 intended to be used by a zio.Node which holds this zio.Port. 166 Bring this port online. 168 If no peer is given then indirect connects will fail. 170 This method is intended for use by a zio.Node which holds this 176 if len(conn) == 1
and type(conn[0])
is str:
183 err=
"[port %s]: no peer given, can not connect %s" % \
185 raise ValueError(err)
187 nodename,portname = conn
188 uuids = peer.waitfor(nodename)
190 raise RuntimeError(
'failed to wait for "%s"' % nodename)
192 pi = peer.peers[uuid]
193 key = f
"zio.port.{portname}.address" 194 addr = pi.headers.get(key,
False)
206 Bring this port offline. 208 This unbinds and disconnects and forgets their addresses 214 for addr
in self.
bound:
215 self.
sock.unbind(addr)
223 This modifies the message prior to sending to set the origin 224 if this port has one. 226 if self.
origin is not None:
227 msg.coord.origin = self.
origin 230 if (self.
sock.type == zmq.SERVER):
231 self.
sock.
send(data, routing_id = msg.routing_id)
235 parts = msg.toparts()
236 self.
sock.send_multipart(parts)
241 Receive and return a zio.Message waiting up to a timeout 243 If timeout is reached then None is returned. 246 if not self.
sock in which:
250 if (self.
sock.type == zmq.SERVER):
259 parts = self.
sock.recv_multipart()
def bind_address(sock, addr)
def online(self, peer=None)
def add_headers(self, args, kwargs)
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
def subscribe(self, prefix="")
def recv(self, timeout=None)
def bind_hostport(sock, host, port)
void append(Message &msg, message_t &&data, const std::vector< size_t > &shape, size_t word_size, const char *tn)
def __init__(self, name, sock, hostname='127.0.0.1')