ZIO
Python and C++ interface to ZeroMQ and Zyre
peer.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 ZIO peering
4 '''
5 import zmq
6 import pyre
7 import uuid
8 import json
9 import time
10 import logging
11 from collections import namedtuple
12 
log = logging.getLogger(__name__)

# Record kept for each discovered peer: its UUID, its nickname and the
# headers decoded from its ENTER event.
PeerInfo = namedtuple("PeerInfo", "uuid nick headers")


class Peer:
    '''
    Peer at the network to discover others and advertise self.

    This is equivalent to C++ zio::Peer and is a wrapper around Pyre,
    the Python Zyre.

    Attributes set by the constructor:

    - zyre: the underlying pyre.Pyre node (removed again by stop()).
    - peers: dict mapping uuid.UUID to PeerInfo for every peer that
      has entered and not yet exited.
    - poller: a zmq.Poller registered on the Zyre socket.
    '''

    def __init__(self, nickname, **headers):
        '''
        Create a peer with a nickname and start it.

        Extra headers may be given as keyword arguments.  Each value
        is converted with str() before being set on the Zyre node.
        '''
        self.zyre = pyre.Pyre(nickname)
        self.peers = dict()     # by UUID

        for key, value in headers.items():
            self.zyre.set_header(key, str(value))
        self.zyre.start()
        self.poller = zmq.Poller()
        self.poller.register(self.zyre.socket(), zmq.POLLIN)

    def __del__(self):
        self.stop()

    def stop(self):
        '''
        Stop the peer.

        This MUST be called or the application will hang.

        Calling it again after the peer was stopped is a no-op.  Any
        exception raised while draining pending events propagates to
        the caller, but only after the Zyre node has been stopped.
        '''
        # The attribute is missing if __init__ failed early or if the
        # peer was already stopped.
        if not hasattr(self, "zyre"):
            return
        try:
            self.drain()
        finally:
            # Always shut the node down, even when draining failed.
            self.zyre.stop()
            del self.zyre

    def poll(self, timeout=0):
        '''
        Poll the network for an update.

        Return True if an event was received.  True is returned on
        reception of any type of Zyre event.  None is returned if no
        event arrived within the timeout.

        Use timeout in msec to wait for an event.  The value is handed
        to the zmq poller as given (pyzmq treats a negative timeout as
        "wait indefinitely").

        ENTER events add the sender to self.peers and EXIT events
        remove it.  All other event types are consumed and ignored.
        '''
        ready = dict(self.poller.poll(timeout))
        if self.zyre.socket() not in ready:
            return None

        msg = self.zyre.recv()
        if log.isEnabledFor(logging.DEBUG):
            log.debug("[peer %s]: zyre message:", self.zyre.name())
            for ind, part in enumerate(msg):
                if len(part) < 100:
                    log.debug(" part %d: %s", ind, part)
                else:
                    log.debug(" part %d: (long %d bytes)", ind, len(part))

        # Frame layout used here: [event, peer UUID bytes, nickname,
        # JSON encoded headers, ...].
        event = msg[0]
        if event == b'ENTER':
            uid = uuid.UUID(bytes=msg[1])
            nick = msg[2].decode('utf-8')
            headers = json.loads(msg[3].decode('utf-8'))
            self.peers[uid] = PeerInfo(uid, nick, headers)
        elif event == b'EXIT':
            # An EXIT from a peer we never saw enter is simply ignored.
            self.peers.pop(uuid.UUID(bytes=msg[1]), None)

        return True

    def drain(self):
        '''
        Continually poll until all queued Zyre events are processed.
        '''
        while self.poll(0):
            pass

    def matchnick(self, nick):
        '''
        Return a list of UUIDs of all known peers with a matching nick.
        '''
        return [uid for uid, pi in self.peers.items() if pi.nick == nick]

    def waitfor(self, nickname, timeout=-1):
        '''
        Wait for at least one peer with the nickname to be known.

        Return a list of UUIDs of peers discovered to have this
        nickname.  The timeout is in msec.  A negative timeout waits
        without limit.  If a non-negative timeout expires before any
        match is found, an empty list is returned.
        '''
        # Fix the deadline once.  Each pass then derives the remaining
        # time from it instead of repeatedly subtracting the total
        # elapsed time from an already reduced timeout.
        deadline = None
        if timeout >= 0:
            deadline = time.monotonic() + 1e-3 * timeout

        while True:
            self.drain()
            got = self.matchnick(nickname)
            if got:
                return got
            if deadline is not None:
                timeout = int(1000 * (deadline - time.monotonic()))
                if timeout <= 0:
                    return got
            log.debug("[peer %s]: wait for %s (timeout=%d)",
                      self.zyre.name(), nickname, timeout)
            self.poll(timeout)

    def party(self, nicks, timeout=-1):
        '''
        Wait until a set of nicks have all been seen or until timeout.

        Return a peer dictionary (UUID to PeerInfo) holding last seen
        info about peers with matching nicks.  A peer that was seen
        during the wait stays in the result even if it exits again.

        The timeout is in msec.  A negative timeout waits without
        limit.  If a non-negative timeout expires first, the peers
        seen so far are returned.
        '''
        deadline = None
        if timeout >= 0:
            deadline = time.monotonic() + 1e-3 * timeout

        want = set(nicks)
        seen = dict()
        self.drain()
        while True:
            for pi in self.peers.values():
                if pi.nick in want:
                    seen[pi.uuid] = pi
            know = {pi.nick for pi in seen.values()}
            if want == know:
                return seen
            if deadline is not None:
                timeout = int(1000 * (deadline - time.monotonic()))
                if timeout <= 0:
                    return seen
            log.debug("seen: %s", ' '.join(know))
            self.poll(timeout)
156 
157 
158 
159 
160 
def waitfor(self, nickname, timeout=-1)
Definition: peer.py:104
def matchnick(self, nick)
Definition: peer.py:93
def __del__(self)
Definition: peer.py:40
name
Definition: setup.py:4
def drain(self)
Definition: peer.py:86
def stop(self)
Definition: peer.py:43
PeerInfo
Definition: peer.py:14
def __init__(self, nickname, **headers)
Definition: peer.py:24
def party(self, nicks, timeout=-1)
Definition: peer.py:128
def poll(self, timeout=0)
Definition: peer.py:54