6 const size_t len = prefix.size();
9 if (hh.first.substr(0,len) != prefix) {
12 ret[hh.first.substr(len)] = hh.second;
19 : m_nick(nickname), m_verbose(verbose), m_zyre(nullptr)
21 m_zyre = zyre_new(m_nick.c_str());
23 std::string s =
"Peer(" + m_nick +
") failed to create zyre";
24 throw std::runtime_error(s);
27 zyre_set_verbose(m_zyre);
29 for (
const auto& header : headers) {
33 header.first.c_str(), header.second.c_str());
34 zyre_set_header(m_zyre, header.first.c_str(),
"%s", header.second.c_str());
38 int rc = zyre_start(m_zyre);
40 throw std::runtime_error(
"failed to start zyre");
45 zio::debug(
"[peer {}]: stop and destroy", m_nick.c_str());
47 zyre_destroy(&m_zyre);
52 const int64_t start = zclock_usecs() / 1000;
55 zpoller_t* poller = zpoller_new(zyre_socket(m_zyre), NULL);
57 void* which = zpoller_wait(poller, timeout);
59 zio::debug(
"[peer {}]: poll timeout", m_nick.c_str());
62 zyre_event_t *
event = zyre_event_new (m_zyre);
64 zio::debug(
"[peer {}]: no zyre event", m_nick.c_str());
67 const char* event_type = zyre_event_type(event);
69 m_nick.c_str(), event_type);
70 zyre_event_print(event);
71 if (streq(event_type,
"ENTER")) {
72 uuid_t uuid = zyre_event_peer_uuid(event);
73 zio::debug(
"[peer {}]: poll ENTER peer \"{}\"",
74 m_nick.c_str(),uuid.c_str());
76 pi.
nick = zyre_event_peer_name(event);
77 zhash_t* hh = zyre_event_headers(event);
78 zlist_t* keys = zhash_keys(hh);
79 void* cursor = zlist_first(keys);
81 void* vptr = zhash_lookup(hh, static_cast<char*>(cursor));
84 pi.headers[key] = val;
85 zio::debug(
"[peer {}]: poll add {} header {}={}",
86 m_nick.c_str(), pi.nick.c_str(),
87 key.c_str(), val.c_str());
88 cursor = zlist_next(keys);
90 m_known_peers[uuid] = pi;
91 zlist_destroy (&keys);
92 zio::debug(
"[peer {}]: poll add \"{}\" ({}), know {}",
93 m_nick.c_str(), pi.nick.c_str(), uuid.c_str(),
94 m_known_peers.size());
98 if (streq(event_type,
"EXIT")) {
99 uuid_t uuid = zyre_event_peer_uuid(event);
100 peerset_t::iterator maybe = m_known_peers.find(uuid);
101 if (maybe != m_known_peers.end()) {
102 m_known_peers.erase(maybe);
104 zio::debug(
"[peer {}]: poll remove {}, know {}",
105 m_nick.c_str(), uuid.c_str(), m_known_peers.size());
108 zyre_event_destroy(&event);
113 zio::debug(
"[peer {}]: poll again with infinite wait",
117 auto timeleft = zclock_usecs() / 1000 - start - timeout;
119 zio::debug(
"[peer {}]: poll again with {} msec left",
120 m_nick.c_str(), timeleft);
124 zio::debug(
"[peer {}]: poll done", m_nick.c_str());
127 zpoller_destroy(&poller);
134 std::vector<uuid_t> ret;
135 for (
const auto& pp : m_known_peers) {
136 zio::debug(
"[peer {}]: check nick '{}' against '{}'",
138 nickname.c_str(), pp.second.nick.c_str());
139 if (pp.second.nick == nickname) {
140 ret.push_back(pp.first);
143 zio::debug(
"[peer {}]: nickmatch found {} instances of {}",
144 m_nick.c_str(), ret.size(), nickname.c_str());
151 const int64_t start = zclock_usecs() / 1000;
152 std::vector<uuid_t> maybe =
nickmatch(nickname);
154 while (maybe.empty()) {
155 zio::debug(
"[peer {}]: waitfor peer \"{}\" to come online",
156 m_nick.c_str(), nickname.c_str());
159 zio::debug(
"[peer {}]: waitfor see \"{}\" after {} ms, have {} match",
160 m_nick.c_str(), nickname.c_str(),
161 zclock_usecs() / 1000 - start,
170 if (zclock_usecs() / 1000 - start >= timeout) {
185 peerset_t::iterator maybe = m_known_peers.find(uuid);
186 return maybe != m_known_peers.end();
192 return m_known_peers;
197 peerset_t::iterator maybe = m_known_peers.find(uuid);
198 if (maybe == m_known_peers.end()) {
199 zio::debug(
"[peer {}]: failed to find peer {} out of {}",
200 m_nick.c_str(), uuid.c_str(), m_known_peers.size());
203 return maybe->second;
209 if (m_verbose and m_zyre) {
211 zyre_set_verbose(m_zyre);
Peer(const nickname_t &nickname, const headerset_t &headers={}, bool verbose=false)
Advertise own nickname and headers.
void drain()
Continually poll until all queued zyre messages are processed.
const peerset_t & peers()
Return known peers as a map from UUID to peer info.
const nickname_t nickname()
Get our nickname.
std::string uuid_t
Uniquely identify a peer.
std::string header_key_t
A "header" is one of a set of key/value pairs asserted by a peer.
bool isknown(const uuid_t &uuid)
Return true if the peer has been seen to ENTER the network and has not yet been seen to EXIT.
std::map< header_key_t, header_value_t > headerset_t
std::map< uuid_t, peer_info_t > peerset_t
void set_verbose(bool verbose=true)
Turn on verbose debugging of the underlying Zyre actor.
std::vector< uuid_t > nickmatch(const nickname_t &nick)
Return all UUIDs with matching nickname.
int timeout_t
A timeout in milliseconds.
std::string nickname_t
A peer asserts a nickname.
peer_info_t peer_info(const uuid_t &uuid)
Return info about peer. If unknown, return default structure.
headerset_t branch(const std::string &prefix)
std::string header_value_t
bool poll(timeout_t timeout=0)
Poll the network for updates, timeout in msec.
std::vector< uuid_t > waitfor(const nickname_t &nickname, timeout_t timeout=-1)
Wait for a peer of a given nickname to be discovered.