9 std::string make_tcp_address(std::string hostname,
int port)
12 ss <<
"tcp://" << hostname <<
":";
25 zio::debug(
"DirectBinder {}", address.c_str());
36 if (tcpportnum != 0) {
37 std::string
address = make_tcp_address(hostname, tcpportnum);
41 for (
int port = 49152; port < 65535; ++port) {
42 std::string
address = make_tcp_address(hostname, port);
48 zio::debug(
"failed to bind({})", address.c_str());
52 throw std::runtime_error(
"exaused ephemeral ports");
57 : m_name(name), m_sock(m_ctx , stype), m_hostname(hostname), m_online(false)
71 zio::debug(
"[port {}] bind default", m_name.c_str());
78 m_name.c_str(), hostname.c_str(), port);
80 m_binders.push_back(binder);
86 m_name.c_str(), address.c_str());
92 m_connect_addresses.push_back(address);
97 m_connect_nodeports.push_back(std::make_pair(node,port));
103 m_sock.
setsockopt(ZMQ_SUBSCRIBE, prefix.c_str(), prefix.size());
109 std::string key =
"zio.port." + m_name +
"." + leafname;
110 m_headers[key] = value;
115 std::stringstream
ss;
116 std::string comma =
"";
118 zio::debug(
"DEBUG: binders: {}", m_binders.size());
120 for (
auto& binder : m_binders) {
121 auto address = binder();
122 ss << comma << address;
124 m_bound.push_back(address);
126 std::string addresses = ss.str();
129 for (
const auto& hh : m_headers) {
131 m_name.c_str(), hh.first.c_str(), hh.second.c_str());
139 if (m_online) {
return; }
143 zio::debug(
"[port {}]: going online with {}({}+{}) connects, {} binds",
145 m_connect_nodeports.size()+m_connect_addresses.size(),
146 m_connect_nodeports.size(),
147 m_connect_addresses.size(),
151 for (
const auto&
addr : m_connect_addresses) {
154 m_name.c_str(),
addr.c_str());
156 m_connected.push_back(
addr);
159 for (
const auto& nh : m_connect_nodeports) {
162 m_name.c_str(), nh.first.c_str());
163 auto uuids = peer.
waitfor(nh.first);
164 assert(uuids.size());
167 m_name.c_str(), uuids.size(), nh.first.c_str());
169 for (
auto uuid : uuids) {
172 std::string maybe = pi.
branch(
"zio.port." + nh.second)[
".address"];
174 zio::warn(
"[port {}]: found {}:{} ({}) lacking address header",
175 m_name.c_str(), nh.first.c_str(), nh.second.c_str(), uuid.c_str());
178 std::stringstream
ss(maybe);
180 while(std::getline(ss, addr,
' ')) {
181 if (addr.empty() or addr[0] ==
' ')
continue;
183 zio::debug(
"[port {}]: connect to {}:{} at {}",
185 nh.first.c_str(), nh.second.c_str(), addr.c_str());
187 m_connected.push_back(addr);
197 if (!m_online)
return;
200 for (
const auto&
addr : m_connected) {
204 for (
const auto &
addr : m_bound) {
210 static bool needs_codec(
int stype)
213 stype == ZMQ_SERVER ||
214 stype == ZMQ_CLIENT ||
215 stype == ZMQ_RADIO ||
223 m_name.c_str(), msg.
form().c_str(), msg.
seqno(), msg.
label().c_str());
226 if (needs_codec(stype)) {
228 zio::debug(
"[port {}] send single-part {} rid:{}",
229 m_name.c_str(), spmsg.
size(), spmsg.routing_id());
230 auto rc = m_sock.
send(spmsg, zio::send_flags::none);
233 zio::multipart_t mpmsg = msg.
toparts();
235 m_name.c_str(), mpmsg.size());
242 m_name.c_str(), timeout);
244 int item =
zio::poll(&items[0], 1, timeout);
245 if (!item)
return false;
248 if (needs_codec(stype)) {
250 auto res = m_sock.recv(spmsg);
251 if (!res)
return false;
252 zio::debug(
"[port {}] recv single-part {} rid:{}",
253 m_name.c_str(), spmsg.
size(), spmsg.routing_id());
258 zio::debug(
"[port {}] recving multipart", m_name.c_str());
259 zio::multipart_t mpmsg;
260 bool ok = mpmsg.recv(m_sock);
261 if (!ok)
return false;
const char * addr
inproc hangs. no messages ever get received by server. tcp/ipc okay.
zmq_pollitem_t pollitem_t
void setsockopt(int option_, T const &optval)
std::string label() const
size_t send(const void *buf_, size_t len_, int flags_=0)
void unbind(std::string const &addr)
bool recv(Message &msg, int timeout=-1)
Recieve a message, return false if timeout occurred.
std::map< header_key_t, header_value_t > headerset_t
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
void online(Peer &peer)
Make any previously requested connections.
void offline()
Disconnect and unbind.
multipart_t toparts() const
Serialize self to multipart.
void set_coord(origin_t origin=0, granule_t gran=0)
void subscribe(const std::string &prefix="")
Subscribe to a PUB topic.
std::string sock_type_name(int stype)
void decode(const message_t &dat)
void connect(std::string const &addr)
void send(Message &msg)
Send a message.
void bind()
Request a default bind.
size_t size() const ZMQ_NOTHROW
void bind(std::string const &addr)
void disconnect(std::string const &addr)
int sock_type(const socket_t &sock)
Return the ZeroMQ socket type number for the socket.
void connect(const address_t &address)
Request connect to fully qualified ZeroMQ address string.
peer_info_t peer_info(const uuid_t &uuid)
Return info about peer. If unknown, return default structure.
headerset_t do_binds()
Perform any requested binds.
headerset_t branch(const std::string &prefix)
Port(const std::string &name, int stype, const std::string &hostname="127.0.0.1")
Create a port of given name and socket type.
Peer at the network to discover peers and advertise self.
void set_header(const std::string &leafname, const std::string &value)
Set an extra port header.
void fromparts(const multipart_t &allparts)
Set self from multipart. Nullifyies routing ID.
std::vector< uuid_t > waitfor(const nickname_t &nickname, timeout_t timeout=-1)
Wait for a peer of a given nickname to be discovered.