ZIO
Python and C++ interface to ZeroMQ and Zyre
peer.cpp
Go to the documentation of this file.
1 #include "zio/peer.hpp"
2 #include "zio/logging.hpp"
3 
4 zio::headerset_t zio::peer_info_t::branch(const std::string& prefix)
5 {
6  const size_t len = prefix.size();
7  headerset_t ret;
8  for (const auto& hh : headers) {
9  if (hh.first.substr(0,len) != prefix) {
10  continue;
11  }
12  ret[hh.first.substr(len)] = hh.second;
13  }
14  return ret;
15 }
16 
18  bool verbose)
19  : m_nick(nickname), m_verbose(verbose), m_zyre(nullptr)
20 {
21  m_zyre = zyre_new(m_nick.c_str());
22  if (!m_zyre) {
23  std::string s = "Peer(" + m_nick + ") failed to create zyre";
24  throw std::runtime_error(s);
25  }
26  if (m_verbose) {
27  zyre_set_verbose(m_zyre);
28  }
29  for (const auto& header : headers) {
30  if (verbose)
31  zio::debug("[peer {}]: header {} = {}",
32  m_nick.c_str(),
33  header.first.c_str(), header.second.c_str());
34  zyre_set_header(m_zyre, header.first.c_str(), "%s", header.second.c_str());
35  }
36  // fixme: starting this here negates the user doing various useful
37  // things with the zyre node.
38  int rc = zyre_start(m_zyre);
39  if (rc < 0) {
40  throw std::runtime_error("failed to start zyre");
41  }
42 }
44 {
45  zio::debug("[peer {}]: stop and destroy", m_nick.c_str());
46  zyre_stop(m_zyre);
47  zyre_destroy(&m_zyre);
48 }
49 
51 {
52  const int64_t start = zclock_usecs() / 1000;
53 
54  bool got_one = false;
55  zpoller_t* poller = zpoller_new(zyre_socket(m_zyre), NULL);
56  while (true) {
57  void* which = zpoller_wait(poller, timeout);
58  if (!which) {
59  zio::debug("[peer {}]: poll timeout", m_nick.c_str());
60  break; // timeout
61  }
62  zyre_event_t *event = zyre_event_new (m_zyre);
63  if (!event) {
64  zio::debug("[peer {}]: no zyre event", m_nick.c_str());
65  break; // timeout
66  }
67  const char* event_type = zyre_event_type(event);
68  zio::debug("[peer {}]: poll event '{}'",
69  m_nick.c_str(), event_type);
70  zyre_event_print(event);
71  if (streq(event_type, "ENTER")) {
72  uuid_t uuid = zyre_event_peer_uuid(event);
73  zio::debug("[peer {}]: poll ENTER peer \"{}\"",
74  m_nick.c_str(),uuid.c_str());
75  peer_info_t pi;
76  pi.nick = zyre_event_peer_name(event);
77  zhash_t* hh = zyre_event_headers(event);
78  zlist_t* keys = zhash_keys(hh);
79  void* cursor = zlist_first(keys);
80  while (cursor) {
81  void* vptr = zhash_lookup(hh, static_cast<char*>(cursor));
82  header_key_t key = static_cast<char*>(cursor);
83  header_value_t val = static_cast<char*>(vptr);
84  pi.headers[key] = val;
85  zio::debug("[peer {}]: poll add {} header {}={}",
86  m_nick.c_str(), pi.nick.c_str(),
87  key.c_str(), val.c_str());
88  cursor = zlist_next(keys);
89  }
90  m_known_peers[uuid] = pi;
91  zlist_destroy (&keys);
92  zio::debug("[peer {}]: poll add \"{}\" ({}), know {}",
93  m_nick.c_str(), pi.nick.c_str(), uuid.c_str(),
94  m_known_peers.size());
95  got_one = true;
96  }
97  else
98  if (streq(event_type, "EXIT")) {
99  uuid_t uuid = zyre_event_peer_uuid(event);
100  peerset_t::iterator maybe = m_known_peers.find(uuid);
101  if (maybe != m_known_peers.end()) {
102  m_known_peers.erase(maybe);
103  }
104  zio::debug("[peer {}]: poll remove {}, know {}",
105  m_nick.c_str(), uuid.c_str(), m_known_peers.size());
106  got_one = true;
107  }
108  zyre_event_destroy(&event);
109 
110  // Ignore other events but keep going if we've not yet reached timeout
111  if (!got_one) {
112  if (timeout < 0) {
113  zio::debug("[peer {}]: poll again with infinite wait",
114  m_nick.c_str());
115  continue;
116  }
117  auto timeleft = zclock_usecs() / 1000 - start - timeout;
118  if (timeleft > 0) {
119  zio::debug("[peer {}]: poll again with {} msec left",
120  m_nick.c_str(), timeleft);
121  continue;
122  }
123  }
124  zio::debug("[peer {}]: poll done", m_nick.c_str());
125  break;
126  }
127  zpoller_destroy(&poller); // fixme: retain poller?
128  return got_one;
129 }
130 
131 
132 std::vector<zio::uuid_t> zio::Peer::nickmatch(const nickname_t& nickname)
133 {
134  std::vector<uuid_t> ret;
135  for (const auto& pp : m_known_peers) {
136  zio::debug("[peer {}]: check nick '{}' against '{}'",
137  m_nick.c_str(),
138  nickname.c_str(), pp.second.nick.c_str());
139  if (pp.second.nick == nickname) {
140  ret.push_back(pp.first);
141  }
142  }
143  zio::debug("[peer {}]: nickmatch found {} instances of {}",
144  m_nick.c_str(), ret.size(), nickname.c_str());
145  return ret;
146 }
147 
148 std::vector<zio::uuid_t> zio::Peer::waitfor(const nickname_t& nickname,
150 {
151  const int64_t start = zclock_usecs() / 1000;
152  std::vector<uuid_t> maybe = nickmatch(nickname);
153 
154  while (maybe.empty()) {
155  zio::debug("[peer {}]: waitfor peer \"{}\" to come online",
156  m_nick.c_str(), nickname.c_str());
157  bool ok = poll(timeout);
158  maybe = nickmatch(nickname);
159  zio::debug("[peer {}]: waitfor see \"{}\" after {} ms, have {} match",
160  m_nick.c_str(), nickname.c_str(),
161  zclock_usecs() / 1000 - start,
162  maybe.size());
163 
164  if (maybe.size()) {
165  break;
166  }
167  if (timeout < 0) {
168  continue;
169  }
170  if (zclock_usecs() / 1000 - start >= timeout) {
171  break;
172  }
173  }
174  return maybe;
175 }
176 
178 {
179  while (poll()) {};
180 }
181 
182 bool zio::Peer::isknown(const uuid_t& uuid)
183 {
184  drain();
185  peerset_t::iterator maybe = m_known_peers.find(uuid);
186  return maybe != m_known_peers.end();
187 }
188 
190 {
191  drain();
192  return m_known_peers;
193 }
195 {
196  drain();
197  peerset_t::iterator maybe = m_known_peers.find(uuid);
198  if (maybe == m_known_peers.end()) {
199  zio::debug("[peer {}]: failed to find peer {} out of {}",
200  m_nick.c_str(), uuid.c_str(), m_known_peers.size());
201  return peer_info_t{};
202  }
203  return maybe->second;
204 }
205 
207 {
208  m_verbose = verbose;
209  if (m_verbose and m_zyre) {
210  // note, zyre's verbose can't be turned off.
211  zyre_set_verbose(m_zyre);
212  }
213 
214 }
Peer(const nickname_t &nickname, const headerset_t &headers={}, bool verbose=false)
Advertise own nickname and headers.
Definition: peer.cpp:17
void drain()
Continually poll until all queued zyre messages are processed.
Definition: peer.cpp:177
const peerset_t & peers()
Return known peers as a map from UUID to peer info.
Definition: peer.cpp:189
const nickname_t nickname()
Get our nickname.
Definition: peer.hpp:60
std::string uuid_t
Uniquely identify a peer.
Definition: peer.hpp:14
std::string header_key_t
A "header" is one in a set of key/value pairs asserted by a peer.
Definition: peer.hpp:17
bool isknown(const uuid_t &uuid)
Return true if peer has been seen ENTER the network and not yet seen to EXIT.
Definition: peer.cpp:182
std::map< header_key_t, header_value_t > headerset_t
Definition: peer.hpp:22
~Peer()
Definition: peer.cpp:43
nickname_t nick
Definition: peer.hpp:25
headerset_t headers
Definition: peer.hpp:26
std::map< uuid_t, peer_info_t > peerset_t
Definition: peer.hpp:36
void set_verbose(bool verbose=true)
Turn on verbose debugging of the underlying Zyre actor.
Definition: peer.cpp:206
std::vector< uuid_t > nickmatch(const nickname_t &nick)
Return all UUIDs with matching nickname.
Definition: peer.cpp:132
int timeout_t
A timeout in milliseconds.
Definition: peer.hpp:47
std::string nickname_t
A peer asserts a nickname.
Definition: peer.hpp:12
peer_info_t peer_info(const uuid_t &uuid)
Return info about peer. If unknown, return default structure.
Definition: peer.cpp:194
headerset_t branch(const std::string &prefix)
Definition: peer.cpp:4
std::string header_value_t
Definition: peer.hpp:18
bool poll(timeout_t timeout=0)
Poll the network for updates, timeout in msec.
Definition: peer.cpp:50
std::vector< uuid_t > waitfor(const nickname_t &nickname, timeout_t timeout=-1)
Wait for a peer of a given nickname to be discovered.
Definition: peer.cpp:148