A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)

Earlier this month, I explored ZeroMQ and how it proves to be a promising solution for building fast, high-throughput, and scalable distributed systems. Despite lending itself quite well to these types of problems, ZeroMQ is not without its flaws. Its creators have attempted to rectify many of these shortcomings through spiritual successors Crossroads I/O and nanomsg.

The now-defunct Crossroads I/O is a proper fork of ZeroMQ with the true intention being to build a viable commercial ecosystem around it. Nanomsg, however, is a reimagining of ZeroMQ—a complete rewrite in C ((The author explains why he should have originally written ZeroMQ in C instead of C++.)). It builds upon ZeroMQ’s rock-solid performance characteristics while providing several vital improvements, both internal and external. It also attempts to address many of the strange behaviors that ZeroMQ can often exhibit. Today, I’ll take a look at what differentiates nanomsg from its predecessor and implement a use case for it in the form of service discovery.

Nanomsg vs. ZeroMQ

A common gripe people have with ZeroMQ is that it doesn’t provide an API for new transport protocols, which essentially limits you to TCP, PGM, IPC, and ITC. Nanomsg addresses this problem by providing a pluggable interface for transports and messaging protocols. This means support for new transports (e.g. WebSockets) and new messaging patterns beyond the standard set of PUB/SUB, REQ/REP, etc.

Nanomsg is also fully POSIX-compliant, giving it a cleaner API and better compatibility. No longer are sockets represented as void pointers and tied to a context—simply initialize a new socket and begin using it in one step. With ZeroMQ, the context internally acts as a storage mechanism for global state and, to the user, as a pool of I/O threads. This concept has been completely removed from nanomsg.

In addition to POSIX compliance, nanomsg is hoping to be interoperable at the API and protocol levels, which would allow it to be a drop-in replacement for, or otherwise interoperate with, ZeroMQ and other libraries which implement ZMTP/1.0 and ZMTP/2.0. It has yet to reach full parity, however.

ZeroMQ has a fundamental flaw in its architecture. Its sockets are not thread-safe. In and of itself, this is not problematic and, in fact, is beneficial in some cases. By isolating each object in its own thread, the need for semaphores and mutexes is removed. Threads don’t touch each other and, instead, concurrency is achieved with message passing. This pattern works well for objects managed by worker threads but breaks down when objects are managed in user threads. If the thread is executing another task, the object is blocked. Nanomsg does away with the one-to-one relationship between objects and threads. Rather than relying on message passing, interactions are modeled as sets of state machines. Consequently, nanomsg sockets are thread-safe.

Nanomsg has a number of other internal optimizations aimed at improving memory and CPU efficiency. ZeroMQ uses a simple trie structure to store and match PUB/SUB subscriptions, which performs nicely for sub-10,000 subscriptions but quickly becomes unreasonable for anything beyond that number. Nanomsg uses a space-optimized trie called a radix tree to store subscriptions. Unlike its predecessor, the library also offers a true zero-copy API which greatly improves performance by allowing memory to be copied from machine to machine while completely bypassing the CPU.

ZeroMQ implements load balancing using a round-robin algorithm. While it provides equal distribution of work, it has its limitations. Suppose you have two datacenters, one in New York and one in London, and each site hosts instances of “foo” services. Ideally, a request made for foo from New York shouldn’t get routed to the London datacenter and vice versa. With ZeroMQ’s round-robin balancing, this is entirely possible unfortunately. One of the new user-facing features that nanomsg offers is priority routing for outbound traffic. We avoid this latency problem by assigning priority one to foo services hosted in New York for applications also hosted there. Priority two is then assigned to foo services hosted in London, giving us a failover in the event that foos in New York are unavailable.

Additionally, nanomsg offers a command-line tool for interfacing with the system called nanocat. This tool lets you send and receive data via nanomsg sockets, which is useful for debugging and health checks.

Scalability Protocols

Perhaps most interesting is nanomsg’s philosophical departure from ZeroMQ. Instead of acting as a generic networking library, nanomsg intends to provide the “Lego bricks” for building scalable and performant distributed systems by implementing what it refers to as “scalability protocols.” These scalability protocols are communication patterns which are an abstraction on top of the network stack’s transport layer. The protocols are fully separated from each other such that each can embody a well-defined distributed algorithm. The intention, as stated by nanomsg’s author Martin Sustrik, is to have the protocol specifications standardized through the IETF.

Nanomsg currently defines six different scalability protocols: PAIR, REQREP, PIPELINE, BUS, PUBSUB, and SURVEY.

PAIR (Bidirectional Communication)

PAIR implements simple one-to-one, bidirectional communication between two endpoints. Two nodes can send messages back and forth to each other.

REQREP (Client Requests, Server Replies)

The REQREP protocol defines a pattern for building stateless services to process user requests. A client sends a request, the server receives the request, does some processing, and returns a response.

PIPELINE (One-Way Dataflow)

PIPELINE provides unidirectional dataflow which is useful for creating load-balanced processing pipelines. A producer node submits work that is distributed among consumer nodes.

BUS (Many-to-Many Communication)

BUS allows messages sent from each peer to be delivered to every other peer in the group.

PUBSUB (Topic Broadcasting)

PUBSUB allows publishers to multicast messages to zero or more subscribers. Subscribers, which can connect to multiple publishers, can subscribe to specific topics, allowing them to receive only messages that are relevant to them.

SURVEY (Ask Group a Question)

The last scalability protocol, and the one in which I will further examine by implementing a use case with, is SURVEY. The SURVEY pattern is similar to PUBSUB in that a message from one node is broadcasted to the entire group, but where it differs is that each node in the group responds to the message. This opens up a wide variety of applications because it allows you to quickly and easily query the state of a large number of systems in one go. The survey respondents must respond within a time window configured by the surveyor.

Implementing Service Discovery

As I pointed out, the SURVEY protocol has a lot of interesting applications. For example:

  • What data do you have for this record?
  • What price will you offer for this item?
  • Who can handle this request?

To continue exploring it, I will implement a basic service-discovery pattern. Service discovery is a pretty simple question that’s well-suited for SURVEY: what services are out there? Our solution will work by periodically submitting the question. As services spin up, they will connect with our service discovery system so they can identify themselves. We can tweak parameters like how often we survey the group to ensure we have an accurate list of services and how long services have to respond.

This is great because 1) the discovery system doesn’t need to be aware of what services there are—it just blindly submits the survey—and 2) when a service spins up, it will be discovered and if it dies, it will be “undiscovered.”

Here is the ServiceDiscovery class:

The discover method submits the survey and then collects the responses. Notice we construct a SURVEYOR socket and set the SURVEYOR_DEADLINE option on it. This deadline is the number of milliseconds from when a survey is submitted to when a response must be received—adjust it accordingly based on your network topology. Once the survey deadline has been reached, a NanoMsgAPIError is raised and we break the loop. The resolve method will take the name of a service and randomly select an available provider from our discovered services.

We can then wrap ServiceDiscovery with a daemon that will periodically run discover.

The discovery parameters are configured through environment variables which I inject into a Docker container.

Services must connect to the discovery system when they start up. When they receive a survey, they should respond by identifying what service they provide and where the service is located. One such service might look like the following:

Once again, we configure parameters through environment variables set on a container. Note that we connect to the discovery system with a RESPONDENT socket which then responds to service queries with the service name and address. The service itself uses a REP socket that simply responds to any requests with “The answer is 42,” but it could take any number of forms such as HTTP, raw socket, etc.

The full code for this example, including Dockerfiles, can be found on GitHub.

Nanomsg or ZeroMQ?

Based on all the improvements that nanomsg makes on top of ZeroMQ, you might be wondering why you would use the latter at all. Nanomsg is still relatively young. Although it has numerous language bindings, it hasn’t reached the maturity of ZeroMQ which has a thriving development community. ZeroMQ has extensive documentation and other resources to help developers make use of the library, while nanomsg has very little. Doing a quick Google search will give you an idea of the difference (about 500,000 results for ZeroMQ to nanomsg’s 13,500).

That said, nanomsg’s improvements and, in particular, its scalability protocols make it very appealing. A lot of the strange behaviors that ZeroMQ exposes have been resolved completely or at least mitigated. It’s actively being developed and is quickly gaining more and more traction. Technically, nanomsg has been in beta since March, but it’s starting to look production-ready if it’s not there already.

Distributed Messaging with ZeroMQ

“A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable.” -Leslie Lamport

With the increased prevalence and accessibility of cloud computing, distributed systems architecture has largely supplanted more monolithic constructs. The implication of using a service-oriented architecture, of course, is that you now have to deal with a myriad of difficulties that previously never existed, such as fault tolerance, availability, and horizontal scaling. Another interesting layer of complexity is providing consistency across nodes, which itself is a problem surrounded with endless research. Algorithms like Paxos and Raft attempt to provide solutions for managing replicated data, while other solutions offer eventual consistency.

Building scalable, distributed systems is not a trivial feat, but it pales in comparison to building real-time systems of a similar nature. Distributed architecture is a well-understood problem and the fact is, most applications have a high tolerance for latency. Few systems have a demonstrable need for real-time communication, but the few that do present an interesting challenge for developers. In this article, I explore the use of ZeroMQ to approach the problem of distributed, real-time messaging in a scalable manner while also considering the notion of eventual consistency.

The Intelligent Transport Layer

ZeroMQ is a high-performance asynchronous messaging library written in C++. It’s not a dedicated message broker but rather an embeddable concurrency framework with support for direct and fan-out endpoint connections over a variety of transports. ZeroMQ implements a number of different communication patterns like request-reply, pub-sub, and push-pull through TCP, PGM (multicast), in-process, and inter-process channels. The glaring lack of UDP support is, more or less, by design because ZeroMQ was conceived to provide guaranteed-ish delivery of atomic messages. The library makes no actual guarantee of delivery, but it does make a best effort. What ZeroMQ does guarantee, however, is that you will never receive a partial message, and messages will be received in order. This is important because UDP’s performance gains really only manifest themselves in lossy or congested environments.

The comprehensive list of messaging patterns and transports alone make ZeroMQ an appealing choice for building distributed applications, but it particularly excels due to its reliability, scalability and high throughput. ZeroMQ and related technologies are popular within high-frequency trading, where packet loss of financial data is often unacceptable ((ZeroMQ’s founder, iMatix, was responsible for moving JPMorgan Chase and the Dow Jones Industrial Average trading platforms to OpenAMQ)). In 2011, CERN actually performed a study comparing CORBA, Ice, Thrift, ZeroMQ, and several other protocols for use in its particle accelerators and ranked ZeroMQ the highest.

cern

ZeroMQ uses some tricks that allow it to actually outperform TCP sockets in terms of throughput such as intelligent message batching, minimizing network-stack traversals, and disabling Nagle’s algorithm. By default (and when possible), messages are queued on the subscriber, which attempts to avoid the problem of slow subscribers. However, when this isn’t sufficient, ZeroMQ employs a pattern called the “Suicidal Snail.” When a subscriber is running slow and is unable to keep up with incoming messages, ZeroMQ convinces the subscriber to kill itself. “Slow” is determined by a configurable high-water mark. The idea here is that it’s better to fail fast and allow the issue to be resolved quickly than to potentially allow stale data to flow downstream. Again, think about the high-frequency trading use case.

A Distributed, Scalable, and Fast Messaging Architecture

ZeroMQ makes a convincing case for use as a transport layer. Let’s explore a little deeper to see how it could be used to build a messaging framework for use in a real-time system. ZeroMQ is fairly intuitive to use and offers a plethora of bindings for various languages, so we’ll focus more on the architecture and messaging paradigms than the actual code.

About a year ago, while I first started investigating ZeroMQ, I built a framework to perform real-time messaging and document syncing called Zinc. A “document,” in this sense, is any well-structured and mutable piece of data—think text document, spreadsheet, canvas, etc. While purely academic, the goal was to provide developers with a framework for building rich, collaborative experiences in a distributed manner.

The framework actually had two implementations, one backed by the native ZeroMQ, and one backed by the pure Java implementation, JeroMQ ((In systems where near real-time is sufficient, JeroMQ is adequate and benefits by not requiring any native linking.)). It was really designed to allow any transport layer to be used though.

Zinc is structured around just a few core concepts: Endpoints, ChannelListeners, MessageHandlers, and Messages. An Endpoint represents a single node in an application cluster and provides functionality for sending and receiving messages to and from other Endpoints. It has outbound and inbound channels for transmitting messages to peers and receiving them, respectively.

endpoint

ChannelListeners essentially act as daemons listening for incoming messages when the inbound channel is open on an Endpoint. When a message is received, it’s passed to a thread pool to be processed by a MessageHandler. Therefore, Messages are processed asynchronously in the order they are received, and as mentioned earlier, ZeroMQ guarantees in-order message delivery. As an aside, this is before I began learning Go, which would make for an ideal replacement for Java here as it’s quite well-suited to the problem :)

Messages are simply the data being exchanged between Endpoints, from which we can build upon with Documents and DocumentFragments. A Document is the structured data defined by an application, while DocumentFragment represents a partial Document, or delta, which can be as fine- or coarse- grained as needed.

Zinc is built around the publish-subscribe and push-pull messaging patterns. One Endpoint will act as the host of a cluster, while the others act as clients. With this architecture, the host acts as a publisher and the clients as subscribers. Thus, when a host fires off a Message, it’s delivered to every subscribing client in a multicast-like fashion. Conversely, clients also act as “push” Endpoints with the host being a “pull” Endpoint. Clients can then push Messages into the host’s Message queue from which the host is pulling from in a first-in-first-out manner.

This architecture allows Messages to be propagated across the entire cluster—a client makes a change which is sent to the host, who propagates this delta to all clients. This means that the client who initiated the change will receive an “echo” delta, but it will be discarded by checking the Message origin, a UUID which uniquely identifies an Endpoint. Clients are then responsible for preserving data consistency if necessary, perhaps through operational transformation or by maintaining a single source of truth from which clients can reconcile.

cluster

One of the advantages of this architecture is that it scales reasonably well due to its composability. Specifically, we can construct our cluster as a tree of clients with arbitrary breadth and depth. Obviously, the more we scale horizontally or vertically, the more latency we introduce between edge nodes. Coupled with eventual consistency, this can cause problems for some applications but might be acceptable to others.

scalability

The downside is this inherently introduces a single point of failure characterized by the client-server model. One solution might be to promote another node when the host fails and balance the tree.

Once again, this framework was mostly academic and acted as a way for me to test-drive ZeroMQ, although there are some other interesting applications of it. Since the framework supports multicast message delivery via push-pull or publish-subscribe mechanisms, one such use case is autonomous load balancing.

Paired with something like ZooKeeper, etcd, or some other service-discovery protocol, clients would be capable of discovering hosts, who act as load balancers. Once a client has discovered a host, it can request to become a part of that host’s cluster. If the host accepts the request, the client can begin to send messages to the host (and, as a result, to the rest of the cluster) and, likewise, receive messages from the host (and the rest of the cluster). This enables clients and hosts to submit work to the cluster such that it’s processed in an evenly distributed way, and workers can determine whether to pass work on further down the tree or process it themselves. Clients can choose to participate in load-balancing clusters at their own will and when they become available, making them mostly autonomous. Clients could then be quickly spun-up and spun-down using, for example, Docker containers.

ZeroMQ is great for achieving reliable, fast, and scalable distributed messaging, but it’s equally useful for performing parallel computation on a single machine or several locally networked ones by facilitating in- and inter- process communication using the same patterns. It also scales in the sense that it can effortlessly leverage multiple cores on each machine. ZeroMQ is not a replacement for a message broker, but it can work in unison with traditional message-oriented middleware. Combined with Protocol Buffers and other serialization methods, ZeroMQ makes it easy to build extremely high-throughput messaging frameworks.