ZIO
Python and C++ interface to ZeroMQ and Zyre
port.cpp
Go to the documentation of this file.
1 #include "zio/port.hpp"
2 #include "zio/logging.hpp"
3 
4 #include <sstream>
5 #include <algorithm>
6 #include <string>
7 
/// Build a ZeroMQ TCP endpoint string of the form "tcp://<hostname>:<port>".
///
/// @param hostname  Host name or IP address, inserted verbatim after "tcp://".
/// @param port      TCP port number.  A value of 0 produces the wildcard
///                  port "*" instead of a number.
/// @return The assembled address string.
static
std::string make_tcp_address(const std::string& hostname, int port)
{
    // std::to_string does not consult the global C++ locale, whereas stream
    // insertion of an int may add digit grouping ("49,152") and so produce
    // an endpoint that cannot be parsed.
    const std::string portpart = port ? std::to_string(port) : std::string("*");
    return "tcp://" + hostname + ":" + portpart;
}
19 
20 struct DirectBinder {
22  std::string address;
23  std::string operator()() {
24  sock.bind(address);
25  zio::debug("DirectBinder {}", address.c_str());
26  return address;
27  }
28 };
29 
30 
33  std::string hostname;
34  int tcpportnum{0};
35  std::string operator()() {
36  if (tcpportnum != 0) {
37  std::string address = make_tcp_address(hostname, tcpportnum);
38  sock.bind(address);
39  return address;
40  }
41  for (int port = 49152; port < 65535; ++port) {
42  std::string address = make_tcp_address(hostname, port);
43  try {
44  sock.bind(address);
45  return address;
46  }
47  catch (zio::error_t& e) {
48  zio::debug("failed to bind({})", address.c_str());
49  continue;
50  }
51  }
52  throw std::runtime_error("exaused ephemeral ports");
53  }
54 };
55 
56 zio::Port::Port(const std::string& name, int stype, const std::string& hostname)
57  : m_name(name), m_sock(m_ctx , stype), m_hostname(hostname), m_online(false)
58 {
59 }
60 
62 {
63  if (m_online) { // fixme: add state checks throughout
64  offline();
65  }
66 }
67 
68 
70 {
71  zio::debug("[port {}] bind default", m_name.c_str());
72  bind(m_hostname, 0);
73 }
74 
75 void zio::Port::bind(const std::string& hostname, int port)
76 {
77  zio::debug("[port {}]: bind host/port: {}:{}",
78  m_name.c_str(), hostname.c_str(), port);
79  HostPortBinder binder{m_sock, hostname, 0};
80  m_binders.push_back(binder);
81 }
82 
83 void zio::Port::bind(const address_t& address)
84 {
85  zio::debug("[port {}]: bind address: {}",
86  m_name.c_str(), address.c_str());
87  m_binders.push_back(DirectBinder{m_sock, address});
88 }
89 
90 void zio::Port::connect(const address_t& address)
91 {
92  m_connect_addresses.push_back(address);
93 }
94 
95 void zio::Port::connect(const nodename_t& node, const portname_t& port)
96 {
97  m_connect_nodeports.push_back(std::make_pair(node,port));
98 }
99 
100 void zio::Port::subscribe(const std::string& prefix)
101 {
102  if (zio::sock_type(m_sock) == ZMQ_SUB)
103  m_sock.setsockopt(ZMQ_SUBSCRIBE, prefix.c_str(), prefix.size());
104 }
105 
106 
107 void zio::Port::set_header(const std::string& leafname, const std::string& value)
108 {
109  std::string key = "zio.port." + m_name + "." + leafname;
110  m_headers[key] = value;
111 }
112 
114 {
115  std::stringstream ss;
116  std::string comma = "";
117 
118  zio::debug("DEBUG: binders: {}", m_binders.size());
119 
120  for (auto& binder : m_binders) {
121  auto address = binder();
122  ss << comma << address;
123  comma = " ";
124  m_bound.push_back(address);
125  }
126  std::string addresses = ss.str();
127  set_header("address", addresses);
128  set_header("socket", zio::sock_type_name(zio::sock_type(m_sock)));
129  for (const auto& hh : m_headers) {
130  zio::debug("[port {}]: {} = {}",
131  m_name.c_str(), hh.first.c_str(), hh.second.c_str());
132  }
133 
134  return m_headers;
135 }
136 
138 {
139  if (m_online) { return; }
140  m_online = true;
141 
142  if (m_verbose) {
143  zio::debug("[port {}]: going online with {}({}+{}) connects, {} binds",
144  m_name.c_str(),
145  m_connect_nodeports.size()+m_connect_addresses.size(),
146  m_connect_nodeports.size(),
147  m_connect_addresses.size(),
148  m_binders.size());
149  }
150 
151  for (const auto& addr : m_connect_addresses) {
152  if (m_verbose)
153  zio::debug("[port {}]: connect to {}",
154  m_name.c_str(), addr.c_str());
155  m_sock.connect(addr);
156  m_connected.push_back(addr);
157  }
158 
159  for (const auto& nh : m_connect_nodeports) {
160  if (m_verbose)
161  zio::debug("[port {}]: wait for {}",
162  m_name.c_str(), nh.first.c_str());
163  auto uuids = peer.waitfor(nh.first);
164  assert(uuids.size());
165  if (m_verbose)
166  zio::debug("[port {}]: {} peers match {}",
167  m_name.c_str(), uuids.size(), nh.first.c_str());
168 
169  for (auto uuid : uuids) {
170  auto pi = peer.peer_info(uuid);
171 
172  std::string maybe = pi.branch("zio.port." + nh.second)[".address"];
173  if (maybe.empty()) {
174  zio::warn("[port {}]: found {}:{} ({}) lacking address header",
175  m_name.c_str(), nh.first.c_str(), nh.second.c_str(), uuid.c_str());
176  continue;
177  }
178  std::stringstream ss(maybe);
179  std::string addr;
180  while(std::getline(ss, addr, ' ')) {
181  if (addr.empty() or addr[0] == ' ') continue;
182  if (m_verbose)
183  zio::debug("[port {}]: connect to {}:{} at {}",
184  m_name.c_str(),
185  nh.first.c_str(), nh.second.c_str(), addr.c_str());
186  m_sock.connect(addr);
187  m_connected.push_back(addr);
188  }
189  }
190  }
191 
192 }
193 
194 
196 {
197  if (!m_online) return;
198  m_online = false;
199 
200  for (const auto& addr : m_connected) {
201  m_sock.disconnect(addr);
202  }
203 
204  for (const auto &addr : m_bound) {
205  m_sock.unbind(addr);
206  }
207  m_connected.clear();
208 }
209 
210 static bool needs_codec(int stype)
211 {
212  return
213  stype == ZMQ_SERVER ||
214  stype == ZMQ_CLIENT ||
215  stype == ZMQ_RADIO ||
216  stype == ZMQ_DISH;
217 }
218 
219 
221 {
222  zio::debug("[port {}] send {} {} {}",
223  m_name.c_str(), msg.form().c_str(), msg.seqno(), msg.label().c_str());
224  msg.set_coord(m_origin);
225  int stype = zio::sock_type(m_sock);
226  if (needs_codec(stype)) {
227  zio::message_t spmsg = msg.encode();
228  zio::debug("[port {}] send single-part {} rid:{}",
229  m_name.c_str(), spmsg.size(), spmsg.routing_id());
230  auto rc = m_sock.send(spmsg, zio::send_flags::none);
231  return;
232  }
233  zio::multipart_t mpmsg = msg.toparts();
234  zio::debug("[port {}] send multi-part {}",
235  m_name.c_str(), mpmsg.size());
236  mpmsg.send(m_sock);
237 }
238 
240 {
241  zio::debug("[port {}] polling for {}",
242  m_name.c_str(), timeout);
243  zio::pollitem_t items[] = {{m_sock, 0, ZMQ_POLLIN, 0}};
244  int item = zio::poll(&items[0], 1, timeout);
245  if (!item) return false;
246 
247  int stype = zio::sock_type(m_sock);
248  if (needs_codec(stype)) {
249  zio::message_t spmsg;
250  auto res = m_sock.recv(spmsg);
251  if (!res) return false;
252  zio::debug("[port {}] recv single-part {} rid:{}",
253  m_name.c_str(), spmsg.size(), spmsg.routing_id());
254  msg.decode(spmsg);
255  return true;
256  }
257 
258  zio::debug("[port {}] recving multipart", m_name.c_str());
259  zio::multipart_t mpmsg;
260  bool ok = mpmsg.recv(m_sock);
261  if (!ok) return false;
262  msg.fromparts(mpmsg);
263 
264  return true;
265 }
266 
267 
zio::socket_t & sock
Definition: port.cpp:21
std::string form() const
Definition: message.cpp:90
const char * addr
inproc hangs. no messages ever get received by server. tcp/ipc okay.
Definition: test_tcs.cpp:16
zmq_pollitem_t pollitem_t
Definition: zmq.hpp:243
void setsockopt(int option_, T const &optval)
Definition: zmq.hpp:1279
std::string label() const
Definition: message.cpp:102
size_t send(const void *buf_, size_t len_, int flags_=0)
Definition: zmq.hpp:1345
std::string portname_t
Definition: port.hpp:31
void unbind(std::string const &addr)
Definition: zmq.hpp:1315
seqno_t seqno() const
Definition: message.hpp:86
message_t encode() const
Definition: message.cpp:118
bool recv(Message &msg, int timeout=-1)
Receive a message, return false if timeout occurred.
Definition: port.cpp:239
name
Definition: setup.py:4
std::map< header_key_t, header_value_t > headerset_t
Definition: peer.hpp:22
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
Definition: zmq.hpp:259
void online(Peer &peer)
Make any previously requested connections.
Definition: port.cpp:137
void offline()
Disconnect and unbind.
Definition: port.cpp:195
multipart_t toparts() const
Serialize self to multipart.
Definition: message.cpp:134
std::string address
Definition: port.cpp:22
zio::socket_t & sock
Definition: port.cpp:32
std::string nodename_t
Definition: port.hpp:30
void set_coord(origin_t origin=0, granule_t gran=0)
Definition: message.cpp:106
std::string operator()()
Definition: port.cpp:23
void subscribe(const std::string &prefix="")
Subscribe to a PUB topic.
Definition: port.cpp:100
std::string sock_type_name(int stype)
Definition: interned.cpp:11
void decode(const message_t &dat)
Definition: message.cpp:128
void connect(std::string const &addr)
Definition: zmq.hpp:1324
void send(Message &msg)
Send a message.
Definition: port.cpp:220
void bind()
Request a default bind.
Definition: port.cpp:69
std::string hostname
Definition: port.cpp:33
size_t size() const ZMQ_NOTHROW
Definition: zmq.hpp:488
void bind(std::string const &addr)
Definition: zmq.hpp:1306
std::string operator()()
Definition: port.cpp:35
void disconnect(std::string const &addr)
Definition: zmq.hpp:1333
int sock_type(const socket_t &sock)
Return the ZeroMQ socket type number for the socket.
Definition: interned.cpp:6
void connect(const address_t &address)
Request connect to fully qualified ZeroMQ address string.
Definition: port.cpp:90
std::string address_t
Definition: port.hpp:29
peer_info_t peer_info(const uuid_t &uuid)
Return info about peer. If unknown, return default structure.
Definition: peer.cpp:194
headerset_t do_binds()
Perform any requested binds.
Definition: port.cpp:113
headerset_t branch(const std::string &prefix)
Definition: peer.cpp:4
a ZIO message
Definition: message.hpp:59
Port(const std::string &name, int stype, const std::string &hostname="127.0.0.1")
Create a port of given name and socket type.
Definition: port.cpp:56
Peer at the network to discover peers and advertise self.
Definition: peer.hpp:44
void set_header(const std::string &leafname, const std::string &value)
Set an extra port header.
Definition: port.cpp:107
void fromparts(const multipart_t &allparts)
Set self from multipart. Nullifies routing ID.
~Port()
Definition: port.cpp:61
std::vector< uuid_t > waitfor(const nickname_t &nickname, timeout_t timeout=-1)
Wait for a peer of a given nickname to be discovered.
Definition: peer.cpp:148