9 std::string make_tcp_address(std::string hostname, 
int port)
    12     ss << 
"tcp://" << hostname << 
":";
    25         zio::debug(
"DirectBinder {}", address.c_str());
    36         if (tcpportnum != 0) {
    37             std::string 
address = make_tcp_address(hostname, tcpportnum);
    41         for (
int port = 49152; port < 65535; ++port) {
    42             std::string 
address = make_tcp_address(hostname, port);
    48                 zio::debug(
"failed to bind({})", address.c_str());
    52         throw std::runtime_error(
"exaused ephemeral ports");
    57     : m_name(name), m_sock(m_ctx , stype), m_hostname(hostname), m_online(false)
    71     zio::debug(
"[port {}] bind default", m_name.c_str());
    78                m_name.c_str(), hostname.c_str(), port);
    80     m_binders.push_back(binder);
    86                m_name.c_str(), address.c_str());
    92     m_connect_addresses.push_back(address);
    97     m_connect_nodeports.push_back(std::make_pair(node,port));
   103         m_sock.
setsockopt(ZMQ_SUBSCRIBE, prefix.c_str(), prefix.size());
   109     std::string key = 
"zio.port." + m_name + 
"." + leafname;
   110     m_headers[key] = value;
   115     std::stringstream 
ss;
   116     std::string comma = 
"";
   118     zio::debug(
"DEBUG: binders: {}", m_binders.size());
   120     for (
auto& binder : m_binders) {
   121         auto address = binder();
   122         ss << comma << address;
   124         m_bound.push_back(address);
   126     std::string addresses = ss.str();
   129     for (
const auto& hh : m_headers) {
   131                    m_name.c_str(), hh.first.c_str(), hh.second.c_str());
   139     if (m_online) { 
return; }
   143         zio::debug(
"[port {}]: going online with {}({}+{}) connects, {} binds",
   145                    m_connect_nodeports.size()+m_connect_addresses.size(),
   146                    m_connect_nodeports.size(),
   147                    m_connect_addresses.size(),
   151     for (
const auto& 
addr : m_connect_addresses) {
   154                        m_name.c_str(), 
addr.c_str());
   156         m_connected.push_back(
addr);
   159     for (
const auto& nh : m_connect_nodeports) {
   162                        m_name.c_str(), nh.first.c_str());
   163         auto uuids = peer.
waitfor(nh.first);
   164         assert(uuids.size());
   167                        m_name.c_str(), uuids.size(), nh.first.c_str());
   169         for (
auto uuid : uuids) {
   172             std::string maybe = pi.
branch(
"zio.port." + nh.second)[
".address"];
   174                 zio::warn(
"[port {}]: found {}:{} ({}) lacking address header",
   175                           m_name.c_str(), nh.first.c_str(), nh.second.c_str(), uuid.c_str());
   178             std::stringstream 
ss(maybe);
   180             while(std::getline(ss, addr, 
' ')) {
   181                 if (addr.empty() or addr[0] == 
' ') 
continue;
   183                     zio::debug(
"[port {}]: connect to {}:{} at {}",
   185                                nh.first.c_str(), nh.second.c_str(), addr.c_str());
   187                 m_connected.push_back(addr);
   197     if (!m_online) 
return;
   200     for (
const auto& 
addr : m_connected) {
   204     for (
const auto &
addr : m_bound) {
   210 static bool needs_codec(
int stype)
   213         stype == ZMQ_SERVER ||
   214         stype == ZMQ_CLIENT ||
   215         stype == ZMQ_RADIO ||
   223                m_name.c_str(), msg.
form().c_str(), msg.
seqno(), msg.
label().c_str());
   226     if (needs_codec(stype)) {
   228         zio::debug(
"[port {}] send single-part {} rid:{}",
   229                    m_name.c_str(), spmsg.
size(), spmsg.routing_id());
   230         auto rc = m_sock.
send(spmsg, zio::send_flags::none);
   233     zio::multipart_t mpmsg = msg.
toparts();
   235                m_name.c_str(), mpmsg.size());
   242                m_name.c_str(), timeout);
   244     int item = 
zio::poll(&items[0], 1, timeout);
   245     if (!item) 
return false;
   248     if (needs_codec(stype)) {
   250         auto res = m_sock.recv(spmsg);
   251         if (!res) 
return false;
   252         zio::debug(
"[port {}] recv single-part {} rid:{}",
   253                    m_name.c_str(), spmsg.
size(), spmsg.routing_id());
   258     zio::debug(
"[port {}] recving multipart", m_name.c_str());
   259     zio::multipart_t mpmsg;
   260     bool ok = mpmsg.recv(m_sock);
   261     if (!ok) 
return false;
 
const char * addr
inproc hangs. no messages ever get received by server. tcp/ipc okay. 
 
zmq_pollitem_t pollitem_t
 
void setsockopt(int option_, T const &optval)
 
std::string label() const
 
size_t send(const void *buf_, size_t len_, int flags_=0)
 
void unbind(std::string const &addr)
 
bool recv(Message &msg, int timeout=-1)
Recieve a message, return false if timeout occurred. 
 
std::map< header_key_t, header_value_t > headerset_t
 
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
 
void online(Peer &peer)
Make any previously requested connections. 
 
void offline()
Disconnect and unbind. 
 
multipart_t toparts() const
Serialize self to multipart. 
 
void set_coord(origin_t origin=0, granule_t gran=0)
 
void subscribe(const std::string &prefix="")
Subscribe to a PUB topic. 
 
std::string sock_type_name(int stype)
 
void decode(const message_t &dat)
 
void connect(std::string const &addr)
 
void send(Message &msg)
Send a message. 
 
void bind()
Request a default bind. 
 
size_t size() const ZMQ_NOTHROW
 
void bind(std::string const &addr)
 
void disconnect(std::string const &addr)
 
int sock_type(const socket_t &sock)
Return the ZeroMQ socket type number for the socket. 
 
void connect(const address_t &address)
Request connect to fully qualified ZeroMQ address string. 
 
peer_info_t peer_info(const uuid_t &uuid)
Return info about peer. If unknown, return default structure. 
 
headerset_t do_binds()
Perform any requested binds. 
 
headerset_t branch(const std::string &prefix)
 
Port(const std::string &name, int stype, const std::string &hostname="127.0.0.1")
Create a port of given name and socket type. 
 
Peer at the network to discover peers and advertise self. 
 
void set_header(const std::string &leafname, const std::string &value)
Set an extra port header. 
 
void fromparts(const multipart_t &allparts)
Set self from multipart. Nullifyies routing ID. 
 
std::vector< uuid_t > waitfor(const nickname_t &nickname, timeout_t timeout=-1)
Wait for a peer of a given nickname to be discovered.