ZIO
Python and C++ interface to ZeroMQ and Zyre
test_peer.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 import logging
4 logger = logging.getLogger("zio.peer")
5 logger.level = logging.DEBUG
6 
7 import zio
8 import unittest
9 
10 class TestPeer(unittest.TestCase):
11 
12  nick1 = "zio_test_peer_nick1"
13  nick2 = "zio_test_peer_nick2"
14 
15  def setUp(self):
16  self.p1 = zio.Peer(self.nick1, a='42', b='hello') #, verbose=True)
17  self.p2 = zio.Peer(self.nick2, a='6.9', b='world') #, verbose=True)
18 
19  def tearDown(self):
20  del self.p2
21  del self.p1
22 
23  def test_01poll(self):
24  ok = self.p2.poll(timeout=1000)
25  assert(ok)
26 
27  def test_02waitfor(self):
28  ok = self.p1.waitfor(self.nick2, timeout=1000)
29  assert(ok)
30  uids = self.p1.matchnick(self.nick2)
31  assert (len(uids)>0)
32  pi = self.p1.peers[uids[0]]
33  assert(pi.nick == self.nick2)
34  assert(pi.headers['b'] == 'world') # may fail if other peers match nick
35 
36  def test_03drain(self):
37  self.p1.drain()
38  self.p2.drain()
39 
40  def test_04stop(self):
41  self.p1.stop()
42 
43 if __name__ == '__main__':
44  unittest.main()
45 
def test_01poll(self)
Definition: test_peer.py:23
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
Definition: zmq.hpp:259
def tearDown(self)
Definition: test_peer.py:19
def test_03drain(self)
Definition: test_peer.py:36
def test_04stop(self)
Definition: test_peer.py:40
def setUp(self)
Definition: test_peer.py:15
def test_02waitfor(self)
Definition: test_peer.py:27
Peer at the network to discover peers and advertise self.
Definition: peer.hpp:44