A tour of CPPZMQ, the C++ bindings to libzmq

Table of Contents

Introduction

The ZeroMQ project cppzmq provides C++ bindings for libzmq. With them we can exercise the power of ZeroMQ with idiomatic, modern C++. The bindings provide type safety, exception-based error reporting, the RAII approach to resource management. Most cppzmq functionality will work with C++11 and even older and here we consider features requiring C++17 and we ignore deprecated interfaces.

This document

In this document, I tour the main parts of the cppzmq package so I can learn them better. I write down what I learn along the way so maybe others can learn something too.

Brevity is attempted and so the general concepts of ZeroMQ are assumed to already be understood. If you, the curious reader, have not yet done so please do yourself a big favor and read the entire ZeroMQ Guide. Read it at least twice. I think gets even better each time I read it as I find something new and useful that I had previously missed.

This tour may not be needed by everyone. We may make effective use of cppzmq simply by reading its header files and maybe checking the unit tests. In some cases we may also need to refer to the libzmq API documentation which should be installed on your system as Unix man pages (man zmq to start) and which are available online. This document attempts to distill what was learned in the process of just such a reading. I hope it may serve as a gentle narrative or tour that will help others quickly feel comfortable developing with cppzmq.

The source for this document may be found at https://github.com/brettviren/cppzmq-tour. Corrections, particularly in the form of pull requests, are welcome.

Project contents

The cppzmq package core is a header-only library providing the C++ namespace zmq::. Currently the library consists of two headers and here is a summary of what they provide:

zmq.hpp
(single part) message, context, buffer, socket, monitor, poller
zmq_addon.hpp
multipart and related functions and another form of poller

Most of the goodies are in zmq.hpp and we go through them in order they appear in that file. The goodies in zmq_addon.hpp are just as good and I feel there's no need for a separation. All my cppzmq projects benefit from both.

The package also provides substantial unit tests which can be mined for programming examples. The package README file gives instructions for formal installation but one very simple option is copying the zmq.hpp and zmq_addon.hpp files right into our own package.

Messages

The message_t class from zmq.hpp is a C++ facade over an opaque zmq_msg_t from libzmq. It is conceptually equivalent to the zframe(3) from CZMQ, the high level C bindings to libzmq. In ZeroMQ terms, message_t is a single-part message in that it holds a contiguous block of memory. Later, (sections Socket and Multipart) the idea of multiple single-part messages and multipart messages are covered.

Creating messages

Here are some examples of constructing a message_t:

// Default constructs an empty message of size zero.
zmq::message_t msg;

// Have message allocate some memory internally.
const size_t size = 1024;
zmq::message_t msg(size);

// Initialize internal memory with a copy of external data.
const char bytes[s] = {0};
zmq::message_t msg(bytes, size);

// Or, from string literal (see also str_buffer).
zmq::message_t msg("hello world!", 12);

// Initialize through iteration on a container.
std::vector<int> ivec = {1,2,3};
zmq::message_t msg(ivec.begin(), ivec.end());
// or more simply
zmq::message_t msg(ivec);

// Zero-copy provision with custom deallocation callback.
// Wraps zmq_msg_init_data(3), see man page for details.
zmq::message_t msg(byte, size, myfree, nullptr);

// Move/swap the data of one message into a new one
zmq::message_t msg2(msg);
// or
zmq::message_t msg2 = msg;

Resetting the message

We can also "rebuild" an existing message. The methods to do this mirror the constructor prototypes:

// Empty the message
msg.rebuild();

// Allocate a new size
msg.rebuild(size);

// Allocate and set with a copy
msg.rebuild(bytes, size);

Not included here is a zero-copy version similar to the zero-copy constructor shown above.

Accessing data in a message

The message is a block of bytes and we can get at those bytes in a few ways:

// Size of the data in bytes 
size_t msg.size();

// Low level, type free access
const void* vptr = msg.data();

// As above but type cast
const int* iptr = msg.data<int>();

// With a copy into a non-const collection
std::vector ints(msg.data<int>(), msg.size()/sizeof(int));

// If the data makes sense as a string.
std::string str = msg.to_string();

// And we can get a zero copy view.
std::string_view strv = msg.to_string_view();

// An artistic string representation of the contents.
std::cout << msg.str() << std::endl;

Message metadata

Here we have a few details that are specific to certain socket types. We still haven't gotten to sockets, but they are coming. You may feel comfortable to skip this section for now.

SERVER Routing ID

When we receive and send messages via a socket of type SERVER our application must manage a "routing ID" in order to associate the messages with a remote CLIENT socket. We do this by getting and setting this ID from/on the message as:

// After we receive a message, remember its routing ID:
uint32_t rid = msg.routing_id();

// Later, just before sending, we make sure to set the ID:
zmq::message_t msg2;
msg2.set_routing_id(rid);

In this example, two messages are used. If the received message is reused for the subsequent send, and we have not rebuilt it, the routing ID is retained and no explicit get/set is required.

Broadcast Groups

The RADIO/DISH sockets have a concept similar to SERVER/CLIENT routing ID and PUB/SUB topics which is that of a named group to which messages are associated. This group name may be set on and retrieved from the message.

// 16 byte max including null '\0' terminator char
const char* grp = "hello world!";
msg.set_group(grp);

// Get the group name.
grp = msg.group();

Metadata Properties

More generally, messages may carry per-connection metadata "properties". The key and values for these are of type string. We'll describe how these may be set through socket options later but for now here is an example of how properties may be retrieved.

// Get system property 
const char* stype = msg.gets("Socket-Type");

// Get an application property
const char* color = msg.gets("X-Favorite-Color");

This gets() method wraps zmq_msg_gets(3) so see that man page for details.

Buffers

In cppzmq, a buffer is like a message, but different. It allows us another way to transmit data without creating a message or to gives us easier ways to create a message from our data.

Buffer types

There are two variants of buffers, mutable_buffer and const_buffer. As the name suggests, the data given to the first may be modified while the data given to the second may not be.

Buffer construction

Either type of buffer may be constructed directly as in this example:

// Empty 
mutable_buffer mbuf();
const_buffer cbuf();

// Fodder data
const size_t size = 1024;
void* ptr = malloc(size);
const void* cptr = ptr;

// With data
mutable_buffer mbuf(ptr, size);
const_buffer cbuf(cptr, size);

We are also given a variety of functions named buffer() to construct buffers in useful ways. We give some examples next and will see later some examples of how to use buffers in the sections Socket and Multipart.

// Fodder data from previous example

// Basic construction
zmq::mutable_buffer mbuf = zmq::buffer(ptr, size);
zmq::const_buffer cbuf = zmq::buffer(cptr, size);

// C array. 
int data[1024];
mbuf = zmq::buffer(data);

// C++ vector, std::array is similar
std::vector<int> data(size);
mbuf = zmq::buffer(data);

// C++ string and string literal
std::string str = "hello world";
mbuf = zmq::buffer(str);
cbuf = zmq::str_buffer("hello world");

Buffer operations

Once constructed, buffers are rather simple but we can operate on them to narrow their view of the underlying data.

// Truncate tail half, same works with const_buffer
mbuf = zmq::buffer(mbuf, mbuf.size()/2);
// Truncate front half, etc cbuf.
mbuf += mbuf.size()/2;

Why do this narrowing? One very useful pattern is for an application to take some action based on the prefix or postfix of a message. This narrowing can be performed after this information is used and the remainder can be forwarded to an output socket. No copying needed.

Access buffer data

Unlike a message, a buffer has only a basic pair of methods to get back the original data and its size, reflecting any narrowing that occurred.

void *vptr = mbuf.data();
size_t size = mbuf.size();

And, that's about it.

Context

The context_t from zmq.hpp embodies an opaque libzmq context such as created by zmq_ctx_new(3). A context is used by ZeroMQ to collect and manage a set of sockets which the application creates on/in the context. The context is thread safe (unlike some sockets) and may be shared between threads without concern for locking at the application level.

Creating a context

Almost always we create the default context:

zmq::context_t ctx;

A move constructor is also available. After construction, various context options may be set and queried and cppzmq provides ctx.set() and ctx.get() to do this. I have yet to use them but we may check zmq_ctx_set(3) for anything interesting.

Context life cycle

We can not copy a context but we can move. Typically, we will construct it, keep it alive as long as we need the sockets that we have created with it and if we need it in some other code context we may pass it by reference. Here is a contrived example.

void run_app(zmq::context_t& ctx);

void main()
{
    zmq::context_t ctx;
    run_app(ctx);
}

In most applications, we will let the context_t destruct and that will tear down our sockets. We may also tear down even sooner:

// Cease any blocking operations in progress.
ctx.shutdown();

// Do a shutdown, if needed and destroy the context.
ctx.close();

Socket

The heart of ZeroMQ is the socket and for it cppzmq supplies us with the class socket_t. There exists too much useful information on the intriguing variety of ZeroMQ sockets to repeat here. A concise and definitive source is zmq_socket(3). Really, read that carefully as it answers a large fraction of questions I have had and see asked by others.

Socket life cycle

To be useful, sockets must be created with a context (which we covered in section Context) and a socket type identifier. In libzmq the type is identified with an integer, usually as provided by a CPP macro like ZMQ_PUB. In cppzmq (in C++11) we may still use these macros or bare integers or we may use an enum class socket_type.

using namespace zmq;
context_t ctx;
socket_t pub(ctx, ZMQ_PUB);
socket_t sub(ctx, socket_type::sub);

A default socket may be constructed and later swap guts with another socket through a move assignment. A socket may also be move-constructed. But we can not copy a socket.

Not just for cppzmq, we must take care that all ZeroMQ sockets besides SERVER, CLIENT, RADIO and DISH must not be used from any thread other than the one in which they were created. Some caveats apply but best thing is just don't do it.

When a socket destructs or if its socket_t::close() method is explicitly called, the underlying libzmq socket will be destroyed via zmq_close(3).

Sending

We have several ways to send with cppzmq and several more which are deprecated. Here, let's focus on the preferred methods. The choice is to pass a message_t by lvalue reference or by rvalue reference or to pass a const_buffer by value In all cases we must also provide something called "send flags" which we will cover in a little bit

zmq::message_t msg = ...;

// Pass by reference
auto res = sock.send(msg, zmq::send_flags::none);

// Pass by move
auto res = sock.send(std::move(msg), zmq::send_flags::none);

// Pass by buffer
auto res = sock.send(zmq::str_buffer("hello world"), zmq::send_flags::none);

The first two call zmq_msg_send(3) and the last calls msg_send(3) so see those man pages for any nitty gritty details you may want.

Send flags

ZeroMQ accepts a few "hints" on how it is to treat a message when sending. In cppzmq we may provide these with the enum class called zmq::send_flags. It is rare that I use anything other than zmq::send_flags::none, as shown in the example, but two more are available:

dontwait
if the send may block instead return immediately with error EAGAIN
sndmore
the message is to be sent together with a following message

Details of the applicability and meaning of these flags are found in zmq_msg_send(3).

Send results

That auto res holding the return of a send() in the example above is a zmq::send_result_t and that is a std::optional which may hold a size_t giving the number of bytes sent or nothing if EAGAIN error occurred and that only happens with a dontwait flag (see above).

Regardless of the send flag used we must save the send result in a variable because the send() methods are compiled marked with the C++ attribute [[nodiscard]]. We can be lazy and then do nothing with the res variable, but we should be more rigorous and compile with -Wunused (maybe via -Wall) so we can be told when we fail to use the res send result. Let's be even better and compile with -Werror to really force us to do something with it.

Regardless of what send flag we use, any other error that occurs will lead to a throw of the zmq::error_t exception so we do not require any special compiler flags to be sure all other errors can not go silently unchecked.

One comment to give my preference. In most cases we will use none. The API would be friendlier to us if none was set as a default. However, that simpler call signature was already taken by other (now deprecated) versions of send(). Maybe in a future release, at the cost of a breaking change, this friendliness can be added!

Send polling

At times our application may send messages faster than downstream applications can receive. Eventually, our socket's internal buffer will reach its "high water mark" (HWM) and enter what is called the "mute" state. What happens next depends on the socket types used and is explained in nice detail in zmq_socket(3). We may elect to let that built-in behavior handle the issue or we may develop our application to be more proactive. For that, we can learn if the next message will tip our socket into "mute" step by polling it prior to output (pollout). A pollout is maybe not so commonly used compared to its opposite (pollin) which we will touch on in the next section and we will revisit polling in more detail in section Poller.

Receiving

The opposite to sending to a socket is receiving from a socket. We may do this with recv(). The misspelling of this verb has a long history in network programming and ZeroMQ and cppzmq cheerfully continues it. Like with send(), we have one recv() method for a message one for a buffer and for both we give recv flags.

// Fill a message passed by reference
auto res = sock.recv(msg, zmq::recv_flags::none);

// Fill a suitably pre-sized mutable_buffer
auto res = sock.recv(buf, zmq::recv_flags::none);

These two examples correspond to the low-level zmq_msg_recv(3) and zmq_recv(3) respectively. When using the buffer to receive the data, take care that it has sufficient capacity to hold the expected message. The care you must take pays back in avoiding a copy.

Receive flags

Like send(), recv() takes flags but with this method we may often want to specify a flag besides none. There is however only one other choice:

dontwait
if the recv may block instead return immediately with error EAGAIN

This flag can be useful if our application wants to do a "quick check" to see if a message is waiting, and to receive it. If no message is sitting in the socket's input queue, the recv() will return immediately. If we use none then recv() may, in principle, wait until the end of the universe before returning.

Receive results

Also like send(), our recv() returns a std::optional result which is empty if EAGAIN was returned by libzmq. Otherwise it returns the number of bytes received. And likewise, this return value also must be saved to a variable. The same comments about the importance to check this value as describe in the Send results section apply here.

When we recv() with message_t, a non-empty receive result will hold the size in bytes of the message. OTOH, when using a buffer, it is extra important to check the receive result. If non-empty it holds a zmq::recv_buffer_size which is a little struct holding two size attributes. The size says how much data we received and the untruncated_size tells us how much data ZeroMQ wanted to give us. If the two values differ we know our application failed to provide a large enough buffer. Oops.

Receive polling

Depending on the receive flags we can either assure an immediate return from recv() or we may risk it never returning. That's a tough dichotomy to live with. Thankfully we can assure that the call waits at most some time in between those extremes using receive polling. This is discussed more below in the section Poller.

Socket options

Usually we develop an application that makes some ZeroMQ sockets, do our bind()/connect() and some send()/recv() and that's all we need for a lot of fun. There's not much detail to worry about. Sometimes we have a tough problem that needs something special and there's very likely a way that past ZeroMQ community members have found a solution. We then just need to find and set the right socket options.

The rather long list of possible options (85 counted today) are given in zmq_setsockopt(3). There they have names like ZMQ_IMMEDIATE or ZMQ_ROUTING_ID which are CPP macros expanding to some integer. For each, cppzmq creates a type tag to use with friendly get()/set() methods on socket_t.

sock.set(zmq::sockopt::immediate, false);
sock.set(zmq::sockopt::routing_id, "me");
auto rid = sock.get(zmq::sockopt::routing_id);

We will still need to carefully read zmq_setsockopt to discover and understand what options may help us, but then applying or querying them with a cppzmq socket is simple.

Socket properties

One special socket option lets us set socket properties. These can be retrieved from messages that pass through the socket as described above in Message metadata.

sock.set(zmq::sockopt::metadata, zmq::str_buffer("X-color:purple");
auto res = sock.recv(msg, zmq::recv_flags::none);
std::string val = msg.gets("X-color");
assert(val == "purple");

Socket references

Sockets in cppzmq can not be copied (they can be moved) while it is usual that we want various parts of our application code to share access to the same socket. We can pass around an lvalue reference to our socket but that is not always possible or convenient.

To help with this, we are given a zmq::socket_ref which refers to but does not own a socket. With a socket ref our code can do almost everything it would do with a full socket object and it can know if the underlying socket has been closed.

zmq::socket_t sock(ctx, zmq::socket_type::pub);
zmq::socket_ref sref = sock;

// pass ref by value (copy)
do_some_socket_stuff(sref);

// Check if our socket is still there
if (sref == nullptr) {
    respond_to_closure();
}

// Do stuff with collections of socket refs
std::unordered_set<zmq::socket_ref> bag_of_socks;
bag_of_socks.insert(sref);
for (auto s : bag_of_socks) {
    auto res = s.send(bcast, zmq::send_flags::none);
}

Socket handle

Our nice cppzmq socket is a facade over an opaque socket object from libzmq. Rarely do we care about that but there may be cases where we do. For example, if we want to directly call some libzmq function or maybe inter-operate with CZMQ code, we can do so by getting the underlying libzmq socket.

zmq::socket_t sock = ...;
void* libzmq_socket = sock.handle();
// use libzmq to bind.  We would probably never do this as we'd prefer
// to use sock.bind(), but it shows the possibility.
int port = zmq_bind(libzmq_socket, "tcp://*:*");

// Use a CZMQ socket.  We might do something this if we use nice CZMQ
// CLASS like Zyre!
zsock_t czmq_socket = zsock_new_pub("tcp://*:*");
zmq::socket_ref sock(zmq::from_handle, zsock_resolve(czmq_socket));

Most of the time, let's ignore we have a "handle" on the libzmq socket and rest assured that if we ever need it, it will be there.

Monitor

ZeroMQ gives us great power and all it asks is that we stay out of its way. To nag ZeroMQ to tell us every detail about its internal operation is also to slow it down and impede its job. We strive then to relax and use ZeroMQ sockets as they are intended, a sink or a source of messages to and from the cosmos.

However, and zen platitudes aside, sometimes we must get uptight and worry about what our trusty ZeroMQ friend is busy doing. For that, cppzmq provides us a socket monitor, spelled zmq::monitor_t.

The way for our application to make use of a monitor_t is to subclass it and implement some of the many monitor_t:on_event_*() virtual methods. Each such method corresponds to one of the event types listed in zmq_socket_monitor(3). We may also limit the events that our monitor reacts to by giving it a list of events as a bitmap.

class connect_monitor_t : public zmq::monitor_t {
public:
    void on_event_connected(const zmq_event_t& event,
                            const char* addr) override
    {
        std::cout << "got connection from " << addr << std::endl;
    }    
};

// elsewhere
zmq::socket_t sock = ...;
const int events = ZMQ_EVENT_CONNECTED;
// Monitor sock using the given transport for internal communication
connect_monitor_t mon(sock, "inproc://conmon", events);
// mon runs forever....

// Now, try it a different way:
connect_monitor_t mon2;
// here, default is ZMQ_EVENT_ALL
mon2.init(sock, "inproc://conmon2");
// init returns and we poll when we want
if (! mon2.check_event(100)) {
    std::cout << "timeout" << std::endl;
}

In this example, the construction of mon with arguments tells it to run forever. This might make the rest of the code in our application jealous so it would have been better were we to put it running in its own thread. For mon2 we default construct and then after an explicit init() we may check for any activity with a timeout. In this case we ask for all events but since our connect_monitor_t implements but one callback, all the other events result in silent no-op calls to the base class on_event_*() methods.

There is no queue monitoring.

I have seen people asking how to monitor the number of messages waiting in the input or output queue of a socket. I've sometimes thought I absolutely needed this information myself. However, this information is not available for monitoring.

I have come to understand this choice is due to a few reasons. First, to provide these values would require additional processing which would degrade the performance of ZeroMQ. Second, by the time the application gets the answer the value has long gone stale, particularly in the case of a high message rates. Third, there is not much use in an application knowing this information in the first place because if the socket is not in mute state then all is well, if it is in mute, the application can tell in ways described above and more next in section Poller. Forth, there is no forth.

In understanding my own motives to ask for this and trying to understand others, I conclude the design choice is correct and the requests are really uncovering examples of the X-Y problem. There's really some problem and we think peaking under ZeroMQ's skirts will solve it, but the real problem may be something else and there are other solutions.

But, if one tosses aside these words as absurd ramblings, then we may always bolt on some application-managed message queues, eg as simple std::deque. We may then feed a send() and drain a recv() as fast as possible and monitor the .size() of our application-level queues. If there is concern that .size() doesn't count what ZeroMQ holds we may set the socket HWM to, say a value of 1. If you follow this approach, please benchmark throughput, latency and memory usage with and without application buffers and let me know the results.

Poller

In several places above we have alluded to something called polling. It was even used, if maybe not noticed, in the example in the Monitor section when we call check_event(100). This call takes at most 100 milliseconds to return. If an event is waiting or arrives within that timeout, it will return even sooner. Had we used a timeout of -1 the method will never return if no new monitored events were raised.

This act of waiting for an event with a finite or infinite timeout is generally termed polling. Most often, as with check_event() we poll for the event that a message is ready to recv(). This is called pollin ("poll in") or in libzmq spelling ZMQ_POLLIN. Less common, but important for robust applications is to poll for the ability to send. This is called pollout or ZMQ_POLLOUT.

A "poller" then helps application code to respond to a successful poll or to know that instead the poll timed out. In cppzmq we have the zmq::poller_t in zmq.hpp and the active_poller_t from zmq_addon.hpp.

zmq::poller_t

In the following example, we construct two poller_t instances, one to poll on input and one to poll on output add some sockets and their associated events (pollin/pollout). We then use the pollers to wait up to a timeout. Depending on the return, our application reacts.

// We could combine the pollers but here keep input and output polling
// separate.
zmq::poller_t<> in_poller, out_poller;
// Our application has two input sockets and one output.
in_poller.add(input_socket1, zmq::event_flags::pollout);
in_poller.add(input_socket2, zmq::event_flags::pollout);
out_poller.add(output_socket, zmq::event_flags::pollout);

const std::chrono::milliseconds timeout{100};
std::vector<zio::poller_event<>> in_events(2);
std::vector<zio::poller_event<>> out_events(1);
while (true) {
    const auto nin = in_poller.wait_all(in_events, timeout);
    if (!nin) {
        std::cout << "input timeout, try again" << std::endl;
        continue;
    }
    for (int ind=0; ind<nin; ++ind) {
        zmq::message_t msg;
        auto rres = in_events[ind].socket.recv(msg, zmq::recv_flags::none);

        const auto nout = out_poller.wait_all(out_events, timeout);
        if (!nout) {
            std::cout << "output timeout, freakout" << std::endl;
            abort();
        }
        auto sres = out_events[0].socket.send(msg, zmq::send_flags::none);
    }
} 

The code that reacts to timeouts in this example is very good. We will be more thoughtful in our real application.

Poller data

In the example above you notice the poller_t<> declaration. That empty template argument list sure is curious. We may fill it in order to associate some user data to a socket event being polled. This user data is added along with the socket and event and is then made available in any event resulting from a poll. Here is an example where the user data is of type int.

using mypoller_t = zmq::poller_t poller<int>;
mypoller_t poller;
int val = 42;
poller.add(sock, zmq::event_flags::pollin, &val);
std::vector<mypoller_t> events(1);
auto n = poller.wait_all(events, timeout);
if (n) {
    assert(42 == *events[0].val);
}

Each socket/event registered with .add() can have its own user data but they must all be of the same (pointer to) type.

Interlude

So far this tour has taken us through zmq.hpp and here, right in the middle of this section, we move to zmq_addon.hpp. As noted above, this switch of files feels somewhat arbitrary to me. Parts of zmq_addon.hpp are more "core" for my own use while parts of zmq.hpp are not so "core" to me. So, let us not worry that we now switch our focus to a header that on the surface may sound somehow secondary. It's in fact full of more goodies.

zmq::active_poller_t

The active_poller_t is created and filled with sockets in a similar manner as poller_t. In fact it uses a poller_t under the hood. However, its add() method takes a third argument which is a std::function<void(event_flags)> and that function will be called when that event is seen.

This allows our application the option of structuring its response to events differently. Instead of complexity held inside the loop, it can be placed in functions or lambdas.

void chirp(zmq::event_flags ef);
zmq::active_poller_t ap;
ap.add(sock1, zmq::event_flags::pollin, [](zmq::event_flags ef) {
  std::cout << "sock1 got " << ef << std::endl;
});
ap.add(sock2, zmq::event_flags::pollin, chirp);

while (true) {
    auto n = ap.wait(timeout);
    if (n) {
        std::cout << "got " << n << " sockets hit\n";
    }
    else {
        std::cout << "timeout" << std::endl;
    }
}

Multipart

When I first started to learn ZeroMQ I learned lots of interesting things and also struggled with lots of confusion. One thing that confused me extra was multipart messages. Take for example the delightful puzzle from zmq_msg_send(3) which describes

int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags);

Multi-part messages: A 0MQ message is composed of 1 or more message parts. Each message part is an independent zmqmsgt in its own right. 0MQ ensures atomic delivery of messages: peers shall receive either all message parts of a message or none at all. The total number of message parts is unlimited except by available memory.

So, we send a zmq_msg_t which is a message, but really it's a message part and we can send many of them but really we send only one. Okay, okay, so, it all kind of makes sense now, but at the start it was all rather confusing.

Thankfully, things are more clear and precise using cppzmq's terms and objects. Previously in this tour, we described a message_t as being of a single part because it held data in a single contiguous block of memory. We will stick to that but then describe how cppzmq allows us to use multiple calls to send() or recv() in order to transmit multiple, single-part messages altogether. Finally, we will describe the multipart_t object of cppzmq which allows us to aggregate multiple, single-part message_t in useful ways and then send() or recv() them with but a single call. Finally we describe how to send and receive multiple parts with out even any message_t at all.

Multiple messages

ZeroMQ has the concept of "more". As we send a single (single-part) message we also tell the socket, "you know what, next I'm gonna send you another message and I want you to wait until I do that and then you can transmit both of them together". This is done with cppzmq like:

zmq::message_t msg1, msg2;
auto res = sock.send(msg1, zmq::send_flags::sndmore);
res = sock.send(msg2, zmq::send_flags::none);

That's "more" and of course you can have more "more" messages than just the first one. The chain can continue and, as it does, ZeroMQ is collecting these messages behind the scenes, waiting for that final none. When none comes, ZeroMQ races to do its magic to assure every message in the chain is delivered to all linked sockets or none of them are.

Thus, the zmq_msg_send(3) riddle is understood. There may be multiple (single-part) messages (by which I mean some zmq_msg_t's) that the application provides to a socket for sending. Then, once given the go-ahead, ZeroMQ will transport all their data as a single message.

On the other end of the transport, ZeroMQ receives the data from these multiple "more" messages and presents it to the application through multiple calls to recv(). We must write our application to expect this chain of separate-but-together messages and cppzmq provides the help we need.

zmq::message_t msg1, msg2;
auto res = sock.recv(msg1, zmq::recv_flags::none);
if (!msg1.more()) {
    std::cout << "Oh dear, sheeps are going astray\n";
}
else {
    auto res = sock.recv(msg2, zmq::recv_flags::none);
}

This ZeroMQ concept of "more" is kind of a "micro protocol" and kind of an optimization to allow truly large messages broken up to while still transmitted in a way that their ordering is assured regardless of how many peers may be attempting to otherwise multiplex their streams. This can be needed but it can also add unwanted complication. ZeroMQ is still buffering these messages in memory and still transmitting them sequentially, so why not allow the application to do that packaging itself? Well….

Multipart messages

Enter multipart_t. I hesitate to call this a "message" because it's really multiple messages which are aggregated into a collection (called multipart_t). Behind the scenes a multipart_t::send() and ::recv() are simply handling multiple single-part message_t objects with "more", as described above. Later we will describe how this can be done without even forming a multipart_t by using code which is to send/recv sort of like what buffers are to message_t.

But first multipart_t, which we should think about as if it were an STL collection (and in fact it uses a std::deque<message_t> under the covers). We can construct them in many ways.

multipart_t empty;
multipart_t first_from_recv(sock);
std::vector<int> data(1024,0);
multipart_t first_from_data(data.data(), data.size()*sizeof(int));
multipart_t first_from_string(std::string("hello world"));

As the variable names are meant to imply, these latter constructors fill in an initial element of the multipart_t container. We can then add more message_t elements:

message_t msg;
multipart_t mp, mp2;

// Prefix and postfix concatenation. 
mp.prepend(mp2);
mp.append(mp2);

// push a part in various forms to front, also "add" variants to back.
mp.pushmem(data, size);
mp.pushstr("hello again");
mp.pushtyp(myobject);           // effectively a memcpy
mp.push(msg);
mp.push_back(msg);              // same as push, but STL spelling

We can also remove a message (a part):

// from front
message_t msg = mp.pop();
std::string s = mp.popstr();
MyObject mo = mp.pop<MyObject>();

// from back
message_t msg = mp.remove();

// all
mp.clear();

We can query and iterate on the multipart_t in many ways:

message_t& first = mp.front();
message_t& last = mp.back();
message_t* nth = mp.peek(42);
std::count << mp.size() << " parts:\n";
for (auto& msg : mp) {
    std::cout << msg.size() << " bytes:\n" << std::endl;
    std::cout << msg.str() << std::endl;
}
if (mp.empty()) {
    std::cout << "no parts\n";
}

Finally, getting back to multiple single-part message transmission we may do:

// Clear and receive one or MORE message_t from a socket
mp.recv(sock);

// Send the one or MORE message_t parts to a socket
mp.send(sock);

This happily hides all the "more" business from our delicate eyes.

Multipart send() and recv() without multipart_t

Building up a multipart_t to send and receive data that is already "multipart'ish" can require unwanted code and memory copies. If your application already has some collection of data, why should you go to this extra trouble? The answer is, you may not need to.

The templated recv_multipart() and send_multipart() free functions are provided. They may be used with a multipart_t as a replacement for that classes send() and recv(). But they may work with other collections holding message_t or buffers or other types. First some receives.

std::vector<message_t> msgs;
aut res = recv_multipart(sock, std::back_inserter(msgs));

multipart_t mp;
aut res = recv_multipart(sock, std::back_inserter(mp));

And some sends.

std::vector<zmq::message_t> msgs;
auto res = send_multipart(sock, msgs);

std::vector<const_buffer> cbufs;
auto res = send_multipart(sock, cbufs);

std::vector<mutable_buffer> mbufs;
auto res = send_multipart(sock, mbufs);

Codec

We just saw how cppzmq can transmit a sequence of multiple, single-part messages in the ZeroMQ "more" way and how the multipart_t can be used to bundle up the "more"'ing inside the application. CZMQ introduced another way to handle multiple, single-part messages and that is to encode them into a single, single-part message inside the application.

A bit of nomenclature. As was mentioned, what cppzmq calls a message_t, CZMQ writes zframe_t. What cppzmq calls multipart_t, CZMQ says zmsg_t. As described in zmsg(3), the CZMQ function zmsg_encode(zmsg_t*) returns a zframe_t. That zmsg_t is a multi-part message. Then back in cppzmq we say we can encode one of its multipart_t into one message_t. Of course, an encode is not useful without a decode and together we have a codec.

Why a codec

With the nice multipart_t class methods and free functions for send/recv that were just described, why introduce a codec? The reason is that the new thread-safe sockets such as SERVER/CLIENT do not support the "more" algorithm at all. They send one message_t at a time Something must be done in if application protocol requires transmitting an sequence of messages in an atomic manner. The inspiration for this was generaldomo which generalizes the Majordomo protocol (v1) to use SERVER/CLIENT as well as ROUTER/DEALER.

So, we must do something to squish multiple messages into single one. A codec for this can be invented by any application but interoperability is increased if a common one is shared by various ZeroMQ bindings and so the one used in CZMQ is offered in the cppzmq.

The CZMQ / cppzmq codec is rather simple but not well documented outside the source code. Let this count as documentation until some interested person writes an ZeroMQ RFC.

A number of messages are input to the encoding function and for each message its size is emitted followed by the message data. If the message is smaller than 255 bytes, the size value is stored in a single byte. Otherwise a literal byte value of 0xFF is emitted followed by the message size as a four byte value.

The decode works in reverse. If the first byte is 0xFF the decoder knows the next four bytes provide the message size else the first byte provides the size. The decoder then consumes a number of bytes as given by that size and fills a message_t. This repeats until the byte stream is exhausted or a parse error is encountered.

Using the codec

Okay, enough blah blah about the codec. There are two ways to use it. One may call methods on multipart_t or one may use free functions in analogy to how multipart send()/recv() was described above. First, the multipart_t methods.

multipart_t mp;

message_t msg = mp.encode();
multipart_t mp2;
// in place, decode+append
mp2.decode_append(msg);

// return constructed
multipart_t mp3 = multipart_t::decode(msg);

Take care how this last one is a static class method. You can call mp.decode(msg) and it returns a new multipart_t and does not touch the object mp used to call it.

Here's the codec exercised with free functions. They have the added flexibility to operate on buffers.

std::vector<message_t> msgs, msgs2;
auto msg = encode(msgs);
decode(msg, std::back_inserter(msgs2));

std::vector<const_buffer> cbufs;
auto msg = encode(cbufs);
multipart_t mp;
decode(msg, std::back_inserter(mp));

FIN

That's it. Go forth and make the next great ZeroMQ application in C++!

Author: Brett Viren

Created: 2020-04-15 Wed 19:59

Validate