Scaling Shared Data in Distributed Systems

Sharing mutable data at large scale is an exceedingly difficult problem. In their seminal paper CRDTs: Consistency without concurrency control, Shapiro et al. describe why the CAP theorem demands a give and take between scalability and consistency. In general, CAP requires us to choose between CP and AP. The former requires serializing every write, which doesn’t scale beyond a small cluster. The latter ensures scalability by giving up consistency.

Sharing Data in Centralized Systems

We tend to prefer weaker consistency models because they mean lower latency and higher availability. To highlight this point, consider the fact that the memory models for most programming languages are not serializable by default. More concisely, programs with shared memory are not inherently thread-safe. This is a conscious design decision because enforcing memory serializability incurs a significant latency penalty. Instead, programming languages require explicit memory barriers which can be used around the critical sections which need this property.

For example, the Java memory model uses within-thread as-if-serial semantics. This means the execution of a thread in isolation, regardless of runtime optimizations, is guaranteed to be the same as it would have been had all statements been run in program order. The implication of as-if-serial semantics is that it gives up consistency—different threads can and will have different views of the data. Java requires the use of memory barriers, either through explicit locking or the volatile keyword, in order to establish a happens-before relationship between statements in different threads.

This can be thought of as scaling shared data! We have multiple threads (systems) accessing the same data. While not distributed, many of the same ideas apply. Consistency, by definition, requires linearizability. In multi-threaded programs, we achieve this with mutexes. In distributed systems, we use transactions and distributed locking. Intuitively, both involve performance trade-offs.
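To make the mutex point concrete, here is a minimal sketch in Python of a counter shared between threads. The Counter class and run helper are hypothetical names used only for illustration. The lock turns each read-modify-write into a critical section, which buys correctness at the cost of serializing every increment.

```python
import threading


class Counter:
    """A shared counter whose increments are serialized by a mutex."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        # The lock makes the read-modify-write below a critical section.
        with self._lock:
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value


def run(threads=8, increments=10_000):
    """Increment one counter from several threads and return the total."""
    counter = Counter()

    def worker():
        for _ in range(increments):
            counter.increment()

    pool = [threading.Thread(target=worker) for _ in range(threads)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()
    return counter.value
```

Every thread contends for the same lock, so adding threads does not add throughput. That is the same performance trade-off distributed locking makes, only with network round trips in place of memory barriers.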

Sharing Data in Distributed Systems

Consider a shared, global counter used to measure ad impressions on a website accessed by millions of users around the world.

shared_data

Traditionally, we might implement this using transactions—get the value, increment it by one, then save it back atomically. The problem is transactions will not scale. In order to guarantee consistency, they must be serialized. This results in high latency and low availability if a lot of writes are occurring. We lose some of the key advantages of distributed systems: parallel computation and availability.

So CAP makes it difficult to scale mutable, shared data. How do we do it then? There are several different strategies, each with their own pros and cons.

Immutable Data

Scaling shared read-only data is easy using replication techniques. This means the simplest solution for sharing data in a distributed system is to use immutable data. If we don’t have to worry about writes, then scaling is trivial. Unfortunately, this isn’t always possible, but if your use case allows for it, it’s the best option.

Last-Write Wins

From a set of conflicting writes, LWW selects the one with the most recent timestamp. Clock drift happens, so LWW will inevitably lead to data loss with enough concurrent writes. It’s critical to accept this reality, but it’s often acceptable for some use cases. LWW trades consistency for availability.
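As a rough illustration of how clock drift causes LWW to drop data, consider the sketch below. The Write record and lww_merge function are assumptions made for this example and are not taken from any particular database.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Write:
    value: str
    timestamp: float  # wall-clock time reported by the writing node
    node: str


def lww_merge(a, b):
    """Keep the write with the larger timestamp, breaking ties by node id."""
    return max(a, b, key=lambda w: (w.timestamp, w.node))


# Hypothetical scenario: node-b's clock runs 5 seconds slow, so a write that
# really happened at t=102 is stamped t=97 and looks older than the first one.
first = Write("cart=[book]", timestamp=100.0, node="node-a")
second = Write("cart=[book, pen]", timestamp=97.0, node="node-b")

winner = lww_merge(first, second)
```

Here the later update is silently discarded because its timestamp is smaller. The merge is deterministic and needs no coordination, which is exactly why LWW stays available.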

Application-Level Conflict Resolution

Oftentimes, the best way to ensure safety is to resolve write conflicts at the application level. When there are conflicting writes on a piece of data, applications can apply business rules to determine the canonical update. An example of this is Riak’s application-side conflict resolution strategy.

Causal Ordering

Rather than relying on LWW, which has a high probability of data loss, we can use the causal relationships between writes in order to determine which one to apply. Unlike timestamps, which attempt to provide a total order, causal ordering establishes a partial order. We can approximate a causal ordering by using techniques like Lamport timestamps or vector clocks. By storing a causal history with each write and reading that history before each write, we can make informed decisions on the correctness of updates. The trade-off here is the added overhead of storing this additional metadata and the extra round trip.
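The partial order that vector clocks give us can be sketched in a few lines of Python. The function names below are hypothetical, and clocks are modeled as plain dictionaries mapping a node id to a counter.

```python
def increment(clock, node):
    """Return a copy of the clock with this node's counter advanced by one."""
    updated = dict(clock)
    updated[node] = updated.get(node, 0) + 1
    return updated


def merge(a, b):
    """Combine two clocks by taking the per-node maximum."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in a.keys() | b.keys()}


def compare(a, b):
    """Classify a relative to b: 'before', 'after', 'equal', or 'concurrent'."""
    nodes = a.keys() | b.keys()
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"


v1 = increment({}, "a")   # a writes first
v2 = increment(v1, "b")   # b writes after reading v1
v3 = increment(v1, "c")   # c also writes after reading v1, unaware of b
```

In this example v1 happened before v2, while v2 and v3 are concurrent. A concurrent result is the signal that a real conflict exists and something (LWW, business rules, or a merge function) has to resolve it.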

Distributed Data Types

CRDTs, or convergent/commutative replicated data types, are the new, up-and-coming solution for scaling shared data, but they aren’t at all new. In fact, the theory behind CRDTs has been in use for hundreds of years. CRDTs are grounded in mathematics. Operations or updates on a CRDT always converge. Because the operations must be commutative, associative, and idempotent, they can be applied in any order and the outcome will always be the same. This means we don’t care about causal ordering—it doesn’t matter.

CRDTs are generally modeled after common data structures like sets, maps, lists, and counters, just in a distributed sense. What they provide us are highly available, eventually consistent data structures in which we don’t have to worry about write coordination.

Aside from the operation requirements, the other drawback of CRDTs is that they require knowledge of all clients. Each client has a replica of the CRDT, so the global state is determined by merging them. And although CRDTs can be applied to a wide variety of use cases, they typically require some interpretation and specialization of common data structures. These interpretations tend to be more limited in capability.
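A grow-only counter (often called a G-Counter) is one of the simplest CRDTs, and it maps nicely onto the ad impression example from earlier. The sketch below is a simplified state-based version, and the class and replica names are hypothetical. Each replica only increments its own slot, and merging takes the per-replica maximum.

```python
class GCounter:
    """A state-based grow-only counter with one slot per replica."""

    def __init__(self, replica_id, counts=None):
        self.replica_id = replica_id
        self.counts = dict(counts or {})

    def increment(self, amount=1):
        # A replica only ever touches its own slot.
        current = self.counts.get(self.replica_id, 0)
        self.counts[self.replica_id] = current + amount

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        """Return a new counter holding the per-replica maximum of both states."""
        replicas = self.counts.keys() | other.counts.keys()
        merged = {
            r: max(self.counts.get(r, 0), other.counts.get(r, 0))
            for r in replicas
        }
        return GCounter(self.replica_id, merged)


us = GCounter("us")
eu = GCounter("eu")
us.increment(3)  # three impressions recorded in one region
eu.increment(2)  # two impressions recorded in another
```

Because max is commutative, associative, and idempotent, us.merge(eu) and eu.merge(us) agree, and merging the same state twice changes nothing. The cost is visible too: the state carries one entry per replica, which is the "knowledge of all clients" drawback described above.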

In Summary

Scaling mutable data is hard. On the other hand, scaling immutable data is easy, so if you can get away with it, do it. There are a number of ways to approach the problem, but as with anything, it all comes down to your use case. The solutions are all about trade-offs—namely the trade-off between consistency and availability. Use weakly consistent models when you can because they afford you high availability and low latency, and rely on stronger models only when absolutely necessary. Do what makes sense for your system.

Understanding Consensus

A classical problem presented within the field of distributed systems is the Two Generals’ Problem, the simpler precursor to the Byzantine Generals Problem. In it, we observe two allied armies positioned on either side of a valley. Within the valley is a fortified city. Each army has a general with one acting as commander. Both armies must attack at the same time or face defeat by the city’s defenders. In order to come to an agreement on when to attack, messengers must be sent through the valley, risking capture by the city’s patrols. Consider the diagram below illustrating this problem.

byzantine_generals

In the above scenario, Army A has sent a messenger to Army B with a message saying “Attack at 0700.” Army B receives this message and dispatches a messenger carrying an acknowledgement of the attack plans; however, our ill-fated messenger has been intercepted by the city’s defenders.

How do our armies come to an agreement on when to attack? Perhaps Army A sends 100 messengers and attacks regardless. Unfortunately, if all of the messengers are captured, this would result in a swift defeat because A would attack without B. What if, instead, A sends 100 messengers, waits for acknowledgements of those messages, and only attacks if it reaches a certain level of confidence, say receiving 75 or more confirmations? Yet again, this could very well end in defeat, this time with B attacking without A.

We also need to bear in mind that sending messages has a certain amount of overhead. We can’t, in good conscience, send a million messengers to their potential demise. Or maybe we can, but it’s more than the number of soldiers in our army.

In fact, we can’t reliably make a decision. It’s provably impossible. In the face of Byzantine failures, the problem is complicated even further by the possibility of traitors or forged messages.

Now replace two generals with N generals. Coming to a perfectly reliable agreement between two generals was already impossible, and it becomes dramatically more complicated with N. This problem is more commonly referred to as distributed consensus, and it’s the focus of an army of researchers.

The problem of consensus is blissfully simple, but the solution is far from trivial. Consensus is the basis of distributed coordination services, locking protocols, and databases. A monolithic system (think a MySQL server) can enforce ACID constraints with consistent reads but exhibits generally poor availability and fault tolerance. The original Google App Engine datastore relied on a master/slave architecture where a single data center held the primary copy of data which was replicated to backup sites asynchronously. This offered applications strong consistency and low latency with the implied trade-off of availability. The health of an application was directly tied to the health of a data center. Beyond transient losses, it also meant periods of planned unavailability and read-only access while Google performed data center maintenance. App Engine has since transitioned to a high-replication datastore which relies on distributed consensus to replicate data across sites. This allows the datastore to continue operating in the presence of failures and at greater availability. In agreement with CAP, this naturally means higher latency on writes.

There are a number of solutions to distributed consensus, but most of them share similar characteristics. We will look at some of these solutions, including multi-phase commit and state-replication approaches.

Two-Phase Commit

Two-phase commit (2PC) is the simplest multi-phase commit protocol. In two-phase commit, all transactions go through a coordinator who is responsible for ensuring a transaction occurs across one or more remote sites (cohorts).

2pc

When the coordinator receives a request, it asks each of its cohorts to vote yes or no. During this phase, each cohort performs the transaction up to the point of committing it. The coordinator then waits for all votes. If the vote is unanimously “yes,” it sends a message to its cohorts to commit the transaction. If one or more votes are “no,” it sends a message to roll back. The cohorts then acknowledge whether the transaction was committed or rolled back, and the process is complete.
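The happy path of two-phase commit can be sketched as follows. This is a deliberately simplified, in-process model: the Cohort class and two_phase_commit function are hypothetical, method calls stand in for network messages, and there are no timeouts or failures.

```python
class Cohort:
    """A participant that votes on a transaction and then applies the outcome."""

    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit
        self.state = "init"

    def prepare(self, txn):
        # Perform the transaction up to the point of committing, then vote.
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self, txn):
        self.state = "committed"

    def rollback(self, txn):
        self.state = "aborted"


def two_phase_commit(txn, cohorts):
    """Run both phases and return the outcome of the transaction."""
    # Phase 1 (voting): collect a yes/no vote from every cohort.
    votes = [cohort.prepare(txn) for cohort in cohorts]

    # Phase 2 (completion): commit only on a unanimous "yes".
    if all(votes):
        for cohort in cohorts:
            cohort.commit(txn)
        return "committed"
    for cohort in cohorts:
        cohort.rollback(txn)
    return "rolled back"
```

Notice that every call in this sketch is synchronous. In a real deployment each of those calls is a network round trip that can hang indefinitely, which is where the trouble starts.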

Two-phase commit is a blocking protocol. The coordinator blocks waiting for votes from its cohorts, and cohorts block waiting for a commit/rollback message from the coordinator. Unfortunately, this means 2PC can, in some circumstances, result in a deadlock, e.g. the coordinator dies while cohorts wait or a cohort dies while the coordinator waits. Another problematic scenario is when a coordinator and cohort simultaneously fail. Even if another coordinator takes its place, it won’t be able to determine whether to commit or roll back.

Three-Phase Commit

Three-phase commit (3PC) is designed to solve the problems identified in two-phase by implementing a non-blocking protocol with an added “prepare” phase. Like 2PC, it relies on a coordinator which relays messages to its cohorts.

3pc

Unlike 2PC, cohorts do not execute a transaction during the voting phase. Rather, they simply indicate if they are prepared to perform the transaction. If cohorts time out during this phase or there are one or more “no” votes, the transaction is aborted. If the vote is unanimously “yes,” the coordinator moves on to the “prepare” phase, sending a message to its cohorts to acknowledge the transaction will be committed. Again, if an ack times out, the transaction is aborted. Once all cohorts have acknowledged the commit, we are guaranteed to be in a state where all cohorts have agreed to commit. At this point, if the commit message from the coordinator is not received in the third phase, the cohort will go ahead and commit anyway. This solves the deadlocking problems described earlier. However, 3PC is still susceptible to network partitions. If a partition occurs, the coordinator will time out and progress will not be made.

State Replication

Protocols like Raft, Paxos, and Zab are popular and widely used solutions to the problem of distributed consensus. These implement state replication or primary-backup using leaders, quorums, and replicas of operation logs or incremental delta states.

consensus quorum

These protocols work by electing a leader (coordinator). Like multi-phase commit, all changes must go through that leader, who then broadcasts the changes to the group. Changes occur by appending a log entry, and each node has its own log replica. Where multi-phase commit falls down in the face of network partitions, these protocols are able to continue working by relying on a quorum (majority). The leader commits the change once the quorum has acknowledged it.
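The quorum rule itself is easy to sketch. The code below is not Raft, Paxos, or Zab: it leaves out leader election, terms, and log reconciliation, and the Replica class and replicate function are hypothetical. It only shows how a leader decides whether an entry is committed.

```python
class Replica:
    """A follower holding its own copy of the log."""

    def __init__(self, reachable=True):
        self.reachable = reachable
        self.log = []

    def append(self, entry):
        if not self.reachable:
            return False  # stands in for a timed-out or partitioned node
        self.log.append(entry)
        return True


def replicate(entry, leader_log, followers):
    """Append on the leader, then report whether a majority holds the entry."""
    leader_log.append(entry)
    acks = 1 + sum(f.append(entry) for f in followers)  # the leader counts itself
    cluster_size = 1 + len(followers)
    quorum = cluster_size // 2 + 1
    return acks >= quorum
```

In a five-node cluster the quorum is three, so the leader can commit with two followers unreachable but not with three. A leader stranded in a minority partition therefore cannot commit anything.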

The use of quorums provides partition tolerance by fencing minority partitions while the majority continues to operate. This is the pessimistic approach to solving split-brain, so it comes with an inherent availability trade-off. This problem is mitigated by the fact that each node hosts a replicated state machine which can be rebuilt or reconciled once the partition is healed.

Google relies on Paxos for its high-replication datastore in App Engine as well as its Chubby lock service. The distributed key-value store etcd uses Raft to manage highly available replicated logs. Zab, which differentiates itself from the former by implementing a primary-backup protocol, was designed for the ZooKeeper coordination service. In general, there are several different implementations of these protocols, such as the Go implementation of Raft.

Distributed consensus is a difficult thing to get right, but it’s important to frame it within the context of CAP. We can ensure stronger consistency at the cost of higher latency and lower availability. On the other hand, we can achieve higher availability with decreased latency while giving up strong consistency. The trade-offs really depend on what your needs are.

Iris Decentralized Cloud Messaging

A couple weeks ago, I published a rather extensive analysis of numerous message queues, both brokered and brokerless. Brokerless messaging is really just another name for peer-to-peer communication. As we saw, the difference in message latency and throughput between peer-to-peer systems and brokered ones is several orders of magnitude. ZeroMQ and nanomsg are able to reliably transmit millions of messages per second at the expense of guaranteed delivery.

Peer-to-peer messaging is decentralized, scalable, and fast, but it brings with it an inherent complexity. There is a dichotomy between how brokerless messaging is conceptualized and how distributed systems are actually built. Distributed systems are composed of services like applications, databases, caches, etc. Services are composed of instances or nodes: individually addressable hosts, either physical or virtual. The key observation is that, conceptually, the unit of interaction lies at the service level, not the instance level. We don’t care about which database server we interact with; we just want to talk to a database server (or perhaps multiple). We’re concerned with logical groups of nodes.

While traditional socket-queuing systems like ZeroMQ solve the problem of scaling, they bring about a certain coupling between components. System designers are forced to build applications which communicate with nodes, not services. We can introduce load balancers like HAProxy, but we’re still addressing specific locations while creating potential single points of failure. With lightweight VMs and the pervasiveness of elastic clouds, IP addresses are becoming less and less static—they come and go. The canonical way of dealing with this problem is to use distributed coordination and service discovery via ZooKeeper, et al., but this introduces more configuration, more moving parts, and more headaches.

The reality is that distributed systems are not built with the instance as the smallest unit of composition in mind; they’re built with services in mind. As discussed earlier, a service is simply a logical grouping of nodes. This abstraction is what we attempt to mimic with things like etcd, ZooKeeper, and HAProxy. These assemblies are proven, but there are alternative solutions that offer zero configuration, minimal network management, and overall less complexity. One such solution that I want to explore is a distributed messaging framework called Iris.

Decentralized Messaging with Iris

Iris is posited as a decentralized approach to backend messaging middleware. It looks to address several of the fundamental issues with traditional brokerless systems, like tight coupling and security.

In order to avoid the problem of addressing instances, Iris considers clusters to be the smallest logical blocks of which systems are composed. A cluster is a collection of zero or more nodes which are responsible for a certain service sub-task. Clusters are then assembled into services such that they can communicate with each other without any regard as to which instance is servicing their requests or where it’s located. Lastly, services are composed into federations, which allow them to communicate across different clouds.

instances_vs_clusters

This form of composition allows Iris to use semantic or logical addressing instead of the standard physical addressing. Nodes specify the name of the cluster they wish to participate in, while Iris handles the intricacies of routing and balancing. For example, you might have three database servers which belong to a single cluster called “databases.” The cluster is reached by its name and requests are distributed across the three nodes. Iris also takes care of service discovery, detecting new clusters as they are created on the same cloud.
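To illustrate the idea of semantic addressing, here is a toy router in Python. It is not the Iris API. The Router class, its method names, and the round-robin balancing policy are all assumptions made for this sketch.

```python
class Router:
    """Maps a cluster name to its member nodes and balances requests."""

    def __init__(self):
        self._members = {}  # cluster name -> list of node addresses
        self._next = {}     # cluster name -> index of the next node to use

    def join(self, cluster, address):
        """Register a node under the cluster name it wants to participate in."""
        self._members.setdefault(cluster, []).append(address)
        self._next.setdefault(cluster, 0)

    def request(self, cluster):
        """Pick the node that should serve the next request to this cluster."""
        members = self._members.get(cluster)
        if not members:
            raise LookupError(f"no nodes in cluster {cluster!r}")
        index = self._next[cluster]
        self._next[cluster] = (index + 1) % len(members)
        return members[index]


router = Router()
for address in ("10.0.0.1", "10.0.0.2", "10.0.0.3"):  # hypothetical addresses
    router.join("databases", address)
```

Callers only ever name "databases" and never see an address. Nodes can come and go behind the name without any client being reconfigured.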

physical_vs_semantic

With libraries like ZeroMQ, security tends to be an afterthought. Iris has been built from the ground-up with security in mind, and it provides a security model that is simple and fast.

Iris uses a relaxed security model that provides perfect secrecy whilst at the same time requiring effectively zero configuration. This is achieved through the observation that if a node of a service is compromised, the whole system is considered undermined. Hence, the unit of security is a service – opposed to individual instances – where any successfully authenticated node is trusted by all. This enables full data protection whilst maintaining the loosely coupled nature of the system.

In practice, what this means is that each cluster uses a single private key. This encryption scheme not only makes deployment trivial, it minimizes the effect security has on speed.

authentication_and_encryption

Like ZeroMQ and nanomsg, Iris offers a few different messaging patterns. It provides the standard request-reply and publish-subscribe schemes, but it’s important to remember that the smallest addressable unit is the cluster, not the node. As such, requests are targeted at a cluster and subsequently relayed on to a member in a load-balanced fashion. Publish-subscribe, on the other hand, is not targeted at a single cluster. It allows members of any cluster to subscribe and publish to a topic.

Iris also implements two patterns called “broadcast” and “tunnel.” While request-reply forwards a message to one member of a cluster, broadcast forwards it to all members. The caveat is that there is no way to listen for responses to a broadcast.

Tunnel is designed to address the problem of stateful or streaming transactions where a communication between two endpoints may consist of multiple data exchanges which need to occur as an atomic operation. It provides the guarantee of in-order and throttled message delivery by establishing a channel between a client and a node.

schemes

Performance Characteristics

According to its author, Iris is still in a “feature phase” and hasn’t been optimized for speed. Since it’s written in Go, I’ve compared its pub/sub benchmark performance with other Go messaging libraries, NATS and NSQ. As before, these benchmarks shouldn’t be taken as gospel, the code is available here, and pull requests are welcome.

We can see that Iris is comparable to NSQ on the sending side and about 4x on the receiving side, at least out of the box.

Conclusion

Brokerless systems like ZeroMQ and nanomsg offer considerably higher throughput and less latency than classical message-oriented middleware but require greater orchestration of network topologies. They offer high scalability but can lead to tighter coupling between components. Traditional brokered message queues, like those of the AMQP variety, tend to be slower while providing more guarantees and reduced coupling. However, they are also more prone to scale problems like availability and partitioning.

In terms of its qualities, Iris appears to be a reasonable compromise between the decentralized nature of the brokerless systems and the minimal configuration and management of the brokered ones. Its intrinsic value lies in its ability to hide the complexities of the underlying infrastructure behind distributed systems. As a result, Iris lends itself to building large-scale systems the way we conceptualize and reason about them: by using services as the building blocks, not instances.

Dissecting Message Queues

Continuing my series on message queues, I spent this weekend dissecting various libraries for performing distributed messaging. In this analysis, I look at a few different aspects, including API characteristics, ease of deployment and maintenance, and performance qualities. The message queues have been categorized into two groups: brokerless and brokered. Brokerless message queues are peer-to-peer such that there is no middleman involved in the transmission of messages, while brokered queues have some sort of server in between endpoints.

The systems I’ll be analyzing are:

Brokerless
nanomsg
ZeroMQ

Brokered
ActiveMQ
NATS
Kafka
Kestrel
NSQ
RabbitMQ
Redis
ruby-nats

To start, let’s look at the performance metrics since this is arguably what people care the most about. I’ve measured two key metrics: throughput and latency. All tests were run on a MacBook Pro 2.6 GHz i7, 16GB RAM. These tests are evaluating a publish-subscribe topology with a single producer and single consumer. This provides a good baseline. It would be interesting to benchmark a scaled-up topology, but that requires more instrumentation.

The code used for benchmarking, written in Go, is available on GitHub. The results below shouldn’t be taken as gospel as there are likely optimizations that can be made to squeeze out performance gains. Pull requests are welcome.

Throughput Benchmarks

Throughput is the number of messages per second the system is able to process, but what’s important to note here is that there is no single “throughput” that a queue might have. We’re sending messages between two different endpoints, so what we observe is a “sender” throughput and a “receiver” throughput—that is, the number of messages that can be sent per second and the number of messages that can be received per second.
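The distinction between sender and receiver throughput can be sketched with an in-process queue. This is not the benchmark code used for the results in this post. The measure function is hypothetical, and Python’s queue.Queue merely stands in for a real messaging library.

```python
import queue
import threading
import time


def measure(n_messages=10_000, size=1024):
    """Return sender and receiver throughput in messages per second."""
    q = queue.Queue()
    payload = b"x" * size
    result = {}

    def receiver():
        start = time.perf_counter()
        for _ in range(n_messages):
            q.get()
        result["recv"] = n_messages / (time.perf_counter() - start)

    consumer = threading.Thread(target=receiver)
    consumer.start()

    # Time the sending side independently of the receiving side.
    start = time.perf_counter()
    for _ in range(n_messages):
        q.put(payload)
    result["send"] = n_messages / (time.perf_counter() - start)

    consumer.join()
    return result
```

The two numbers are timed separately, so they can differ: the sender finishes as soon as the last message is enqueued, while the receiver is only done once the last message has been dequeued.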

This test was performed by sending 1,000,000 1KB messages and measuring the time to send and receive on each side. Many performance tests tend to use smaller messages in the range of 100 to 500 bytes. I chose 1KB because it’s more representative of what you might see in a production environment, although this varies case by case. For message-oriented middleware systems, only one broker was used. In most cases, a clustered environment would yield much better results.

Unsurprisingly, there’s higher throughput on the sending side. What’s interesting, however, is the disparity in the sender-to-receiver ratios. ZeroMQ is capable of sending over 5,000,000 messages per second but is only able to receive about 600,000/second. In contrast, nanomsg sends shy of 3,000,000/second but can receive almost 2,000,000.

Now let’s take a look at the brokered message queues. Intuitively, we observe that brokered message queues have dramatically less throughput than their brokerless counterparts by a couple orders of magnitude for the most part. Half the brokered queues have a throughput below 25,000 messages/second. The numbers for Redis might be a bit misleading though. Despite providing pub/sub functionality, it’s not really designed to operate as a robust messaging queue. In a similar fashion to ZeroMQ, Redis disconnects slow clients, and it’s important to point out that it was not able to reliably handle this volume of messaging. As such, we consider it an outlier. Kafka and ruby-nats have similar performance characteristics to Redis but were able to reliably handle the message volume without intermittent failures. The Go implementation of NATS, gnatsd, has exceptional throughput for a brokered message queue.

Outliers aside, we see that the brokered queues have fairly uniform throughputs. Unlike the brokerless libraries, there is little-to-no disparity in the sender-to-receiver ratios, which themselves are all very close to one.

Latency Benchmarks

The second key performance metric is message latency. This measures how long it takes for a message to be transmitted between endpoints. Intuition might tell us that this is simply the inverse of throughput, i.e. if throughput is messages/second, latency is seconds/message. However, by looking closely at this image borrowed from a ZeroMQ white paper, we can see that this isn’t quite the case.

latency

The reality is that the latency per message sent over the wire is not uniform. It can vary wildly for each one. In truth, the relationship between latency and throughput is a bit more involved. Unlike throughput, however, latency is not measured at the sender or the receiver but rather as a whole. But since each message has its own latency, we will look at the averages of all of them. Going further, we will see how the average message latency fluctuates in relation to the number of messages sent. Again, intuition tells us that more messages means more queueing, which means higher latency.
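A small worked example shows why mean latency is not simply the inverse of throughput. The numbers below are made up for illustration, and the summarize function is a hypothetical helper: 1,000 messages are sent 1 ms apart, and each takes 50 ms to arrive.

```python
def summarize(send_times, recv_times):
    """Return (mean latency in seconds, throughput in messages per second)."""
    latencies = [recv - sent for sent, recv in zip(send_times, recv_times)]
    mean_latency = sum(latencies) / len(latencies)
    elapsed = max(recv_times) - min(send_times)
    throughput = len(latencies) / elapsed
    return mean_latency, throughput


# Hypothetical timestamps: messages leave 1 ms apart and spend 50 ms in flight.
send_times = [i * 0.001 for i in range(1000)]
recv_times = [sent + 0.050 for sent in send_times]

mean_latency, throughput = summarize(send_times, recv_times)
```

Because many messages are in flight at once, throughput works out to roughly 950 messages/second while each message still takes about 50 ms end to end. Inverting the throughput would suggest a latency of about 1 ms, which is off by a factor of nearly fifty.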

As we did before, we’ll start by looking at the brokerless systems.

In general, our hypothesis proves correct in that, as more messages are sent through the system, the latency of each message increases. What’s interesting is the tapering at the 500,000-message point, after which latency appears to increase at a slower rate as we approach 1,000,000 messages. Another interesting observation is the initial spike in latency between 1,000 and 5,000 messages, which is more pronounced with nanomsg. It’s difficult to pinpoint causation, but these changes might be indicative of how message batching and other network-stack traversal optimizations are implemented in each library. More data points may provide better visibility.

We see some similar patterns with brokered queues and also some interesting new ones.

Redis behaves in a similar manner as before, with an initial latency spike and then a quick tapering off. It differs in that the tapering becomes essentially constant right after 5,000 messages. NSQ doesn’t exhibit the same spike in latency and behaves, more or less, linearly. Kestrel fits our hypothesis.

Notice that ruby-nats and NATS hardly even register on the chart. They exhibited surprisingly low latencies and unexpected relationships with the number of messages.

Remarkably, the message latencies for ruby-nats and NATS appear to be constant. This is counterintuitive to our hypothesis.

You may have noticed that Kafka, ActiveMQ, and RabbitMQ were absent from the above charts. This is because their latencies tended to be orders of magnitude higher than those of the other brokered message queues, so ActiveMQ and RabbitMQ were grouped into their own AMQP category. I’ve also included Kafka since it’s in the same ballpark.

Here we see that RabbitMQ’s latency is constant, while ActiveMQ and Kafka are linear. What’s unclear is the apparent disconnect between their throughput and mean latencies.

Qualitative Analysis

Now that we’ve seen some empirical data on how these different libraries perform, I’ll take a look at how they work from a pragmatic point of view. Message throughput and speed are important, but they don’t count for much if the library is difficult to use, deploy, or maintain.

ZeroMQ and Nanomsg

Technically speaking, nanomsg isn’t a message queue but rather a socket-style library for performing distributed messaging through a variety of convenient patterns. As a result, there’s nothing to deploy aside from embedding the library itself within your application. This makes deployment a non-issue.

Nanomsg is written by one of the ZeroMQ authors, and as I discussed before, works in a very similar way to that library. From a development standpoint, nanomsg provides an overall cleaner API. Unlike ZeroMQ, there is no notion of a context to which sockets are bound. Furthermore, nanomsg provides pluggable transport and messaging protocols, which make it more open to extension. Its additional built-in scalability protocols also make it quite appealing.

Like ZeroMQ, it guarantees that messages will be delivered atomically intact and ordered but does not guarantee the delivery of them. Partial messages will not be delivered, and it’s possible that some messages won’t be delivered at all. The library’s author, Martin Sustrik, makes this abundantly clear:

Guaranteed delivery is a myth. Nothing is 100% guaranteed. That’s the nature of the world we live in. What we should do instead is to build an internet-like system that is resilient in face of failures and routes around damage.

The philosophy is to use a combination of topologies to build resilient systems that add in these guarantees in a best-effort sort of way.

On the other hand, nanomsg is still in beta and may not be considered production-ready. Consequently, there aren’t a lot of resources available and not much of a development community around it.

ZeroMQ is a battle-tested messaging library that’s been around since 2007. Some may perceive it as a predecessor to nanomsg, but what nano lacks is where ZeroMQ thrives—a flourishing developer community and a deluge of resources and supporting material. For many, it’s the de facto tool for building fast, asynchronous distributed messaging systems that scale.

Like nanomsg, ZeroMQ is not a message-oriented middleware and simply operates as a socket abstraction. In terms of usability, it’s very much the same as nanomsg, although its API is marginally more involved.

ActiveMQ and RabbitMQ

ActiveMQ and RabbitMQ are implementations of AMQP. They act as brokers which ensure messages are delivered. ActiveMQ and RabbitMQ support both persistent and non-persistent delivery. By default, messages are written to disk such that they survive a broker restart. They also support synchronous and asynchronous sending of messages, with the former having a substantial impact on latency. To guarantee delivery, these brokers use message acknowledgements, which also incur a massive latency penalty.

As far as availability and fault tolerance go, these brokers support clustering through shared storage or shared nothing. Queues can be replicated across clustered nodes so there is no single point of failure or message loss.

AMQP is a non-trivial protocol which its creators claim to be over-engineered. These additional guarantees are made at the expense of major complexity and performance trade-offs. Fundamentally, clients are more difficult to implement and use.

Since they’re message brokers, ActiveMQ and RabbitMQ are additional moving parts that need to be managed in your distributed system, which brings deployment and maintenance costs. The same is true for the remaining message queues being discussed.

NATS and Ruby-NATS

NATS (gnatsd) is a pure Go implementation of the ruby-nats messaging system. NATS is distributed messaging rethought to be less enterprisey and more lightweight (this is in direct contrast to systems like ActiveMQ, RabbitMQ, and others). Apcera’s Derek Collison, the library’s author and former TIBCO architect, describes NATS as “more like a nervous system” than an enterprise message queue. It doesn’t do persistence or message transactions, but it’s fast and easy to use. Clustering is supported, so systems built on top of it can be designed with high availability and failover in mind, and clients can be sharded. Unfortunately, TLS and SSL are not yet supported in NATS (they are in ruby-nats), but they are on the roadmap.

As we observed earlier, NATS performs far better than the original Ruby implementation. Clients can be used interchangeably with NATS and ruby-nats.

Kafka

Originally developed by LinkedIn, Kafka implements publish-subscribe messaging through a distributed commit log. It’s designed to operate as a cluster that can be consumed by a large number of clients. Horizontal scaling is done effortlessly using ZooKeeper so that additional consumers and brokers can be introduced seamlessly. It also transparently takes care of cluster rebalancing.

Kafka uses a persistent commit log to store messages on the broker. Unlike other durable queues which usually remove persisted messages on consumption, Kafka retains them for a configured period of time. This means that messages can be “replayed” in the event that a consumer fails.
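To make the replay idea concrete, here is a minimal in-memory sketch of a commit log whose records stay in place after they are read, with each consumer tracking its own offset. The names CommitLog and Consumer are hypothetical and this is not Kafka's API; time-based retention, partitions, and persistence are deliberately left out.

```python
from dataclasses import dataclass, field


@dataclass
class CommitLog:
    """Append-only log; reading a record does not remove it."""
    records: list = field(default_factory=list)

    def append(self, message):
        self.records.append(message)
        return len(self.records) - 1  # offset of the record just written

    def read(self, offset, max_records=10):
        """Return up to max_records starting at offset, leaving the log intact."""
        return self.records[offset:offset + max_records]


class Consumer:
    """Keeps its own position in the log, so it can rewind and replay."""

    def __init__(self, log):
        self.log = log
        self.offset = 0

    def poll(self):
        batch = self.log.read(self.offset)
        self.offset += len(batch)
        return batch

    def seek(self, offset):
        self.offset = offset


log = CommitLog()
for event in ["a", "b", "c"]:
    log.append(event)

consumer = Consumer(log)
first_pass = consumer.poll()
consumer.seek(0)            # simulate recovering after a consumer failure
replayed = consumer.poll()  # the same records are still available
```

Because the position lives with the consumer rather than the log, a second consumer could read the same records independently without affecting the first.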

ZooKeeper makes managing Kafka clusters relatively easy, but it does introduce yet another element that needs to be maintained. That said, Kafka exposes a great API and Shopify has an excellent Go client called Sarama that makes interfacing with Kafka very accessible.

Kestrel

Kestrel is a distributed message queue open sourced by Twitter. It’s intended to be fast and lightweight. Because of this, it has no concept of clustering or failover. While Kafka is built from the ground up to be clustered through ZooKeeper, the onus of message partitioning is put upon the clients of Kestrel. There is no cross-communication between nodes. It makes this trade-off in the name of simplicity. It features durable queues, item expiration, transactional reads, and fanout queues while operating over Thrift or memcache protocols.
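Putting the onus of partitioning on the client might look something like the sketch below: the client hashes a key to pick one of several independent nodes, and the nodes never talk to each other. PartitioningClient and the node names are hypothetical, and plain lists stand in for the real servers; this does not use Kestrel's Thrift or memcache interfaces.

```python
import hashlib


class PartitioningClient:
    """Chooses a node per key on the client side; nodes are fully independent."""

    def __init__(self, nodes):
        self.nodes = list(nodes)
        # Stand-in for separate queue servers: one local list per node.
        self.queues = {node: [] for node in self.nodes}

    def node_for(self, key):
        # A stable hash keeps the same key on the same node across runs.
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        index = int.from_bytes(digest[:8], "big") % len(self.nodes)
        return self.nodes[index]

    def put(self, key, item):
        node = self.node_for(key)
        self.queues[node].append(item)
        return node


client = PartitioningClient(["kestrel-1", "kestrel-2", "kestrel-3"])
chosen = client.put("user:42", "welcome email")
```

Simple modulo hashing like this reshuffles most keys when a node is added or removed, which is part of the extra work left to the developer.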

Kestrel is designed to be small, but this means that more work must be done by the developer to build out a robust messaging system on top of it. Kafka seems to be a more “all-in-one” solution.

NSQ

NSQ is a messaging platform built by Bitly. I use the word platform because there’s a lot of tooling built around NSQ to make it useful for real-time distributed messaging. The daemon that receives, queues, and delivers messages to clients is called nsqd. The daemon can run standalone, but NSQ is designed to run as a distributed, decentralized topology. To achieve this, it leverages another daemon called nsqlookupd. Nsqlookupd acts as a service-discovery mechanism for nsqd instances. NSQ also provides nsqadmin, which is a web UI that displays real-time cluster statistics and acts as a way to perform various administrative tasks like clearing queues and managing topics.

By default, messages in NSQ are not durable. It’s primarily designed to be an in-memory message queue, but queue sizes can be configured such that after a certain point, messages will be written to disk. Despite this, there is no built-in replication. NSQ uses acknowledgements to guarantee message delivery, but the order of delivery is not guaranteed. Messages can also be delivered more than once, so it’s the developer’s responsibility to introduce idempotence.
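One common way to introduce idempotence on the consumer side is to remember which message IDs have already been applied and skip repeats. The sketch below assumes every message carries a unique ID; IdempotentHandler is a hypothetical name and no NSQ client library is involved.

```python
class IdempotentHandler:
    """Applies each message at most once, keyed by its message ID."""

    def __init__(self, apply):
        self.apply = apply
        self.seen = set()  # unbounded here; a real system would bound or persist this

    def handle(self, message_id, body):
        if message_id in self.seen:
            return False  # duplicate delivery, safely ignored
        self.apply(body)
        self.seen.add(message_id)
        return True


balance = {"total": 0}


def credit(amount):
    balance["total"] += amount


handler = IdempotentHandler(credit)
# "m1" is delivered twice, as can happen with at-least-once delivery.
deliveries = [("m1", 10), ("m2", 5), ("m1", 10)]
results = [handler.handle(message_id, body) for message_id, body in deliveries]
```

The duplicate is acknowledged but has no effect, so the total reflects each message exactly once.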

Similar to Kafka, additional nodes can be added to an NSQ cluster seamlessly. It also exposes both an HTTP and TCP API, which means you don’t actually need a client library to push messages into the system. Despite all the moving parts, it’s actually quite easy to deploy. Its API is also easy to use and there are a number of client libraries available.

Redis

Last up is Redis. While Redis is great for lightweight messaging and transient storage, I can’t advocate its use as the backbone of a distributed messaging system. Its pub/sub is fast but its capabilities are limited. It would require a lot of work to build a robust system. There are solutions better suited to the problem, such as those described above, and there are also some scaling concerns with it.

These matters aside, Redis is easy to use, it’s easy to deploy and manage, and it has a relatively small footprint. Depending on the use case, it can be a great choice for real-time messaging as I’ve explored before.

Conclusion

The purpose of this analysis is not to present some sort of “winner” but instead showcase a few different options for distributed messaging. There is no “one-size-fits-all” option because it depends entirely on your needs. Some use cases require fast, fire-and-forget messages, others require delivery guarantees. In fact, many systems will call for a combination of these. My hope is that this dissection will offer some insight into which solutions work best for a given problem so that you can make an intelligent decision.

A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)

Earlier this month, I explored ZeroMQ and how it proves to be a promising solution for building fast, high-throughput, and scalable distributed systems. Despite lending itself quite well to these types of problems, ZeroMQ is not without its flaws. Its creators have attempted to rectify many of these shortcomings through spiritual successors Crossroads I/O and nanomsg.

The now-defunct Crossroads I/O is a proper fork of ZeroMQ with the true intention being to build a viable commercial ecosystem around it. Nanomsg, however, is a reimagining of ZeroMQ: a complete rewrite in C [1]. It builds upon ZeroMQ’s rock-solid performance characteristics while providing several vital improvements, both internal and external. It also attempts to address many of the strange behaviors that ZeroMQ can often exhibit. Today, I’ll take a look at what differentiates nanomsg from its predecessor and implement a use case for it in the form of service discovery.

Nanomsg vs. ZeroMQ

A common gripe people have with ZeroMQ is that it doesn’t provide an API for new transport protocols, which essentially limits you to TCP, PGM, IPC, and ITC. Nanomsg addresses this problem by providing a pluggable interface for transports and messaging protocols. This means support for new transports (e.g. WebSockets) and new messaging patterns beyond the standard set of PUB/SUB, REQ/REP, etc.

Nanomsg is also fully POSIX-compliant, giving it a cleaner API and better compatibility. No longer are sockets represented as void pointers and tied to a context—simply initialize a new socket and begin using it in one step. With ZeroMQ, the context internally acts as a storage mechanism for global state and, to the user, as a pool of I/O threads. This concept has been completely removed from nanomsg.

In addition to POSIX compliance, nanomsg is hoping to be interoperable at the API and protocol levels, which would allow it to be a drop-in replacement for, or otherwise interoperate with, ZeroMQ and other libraries which implement ZMTP/1.0 and ZMTP/2.0. It has yet to reach full parity, however.

ZeroMQ has a fundamental flaw in its architecture. Its sockets are not thread-safe. In and of itself, this is not problematic and, in fact, is beneficial in some cases. By isolating each object in its own thread, the need for semaphores and mutexes is removed. Threads don’t touch each other and, instead, concurrency is achieved with message passing. This pattern works well for objects managed by worker threads but breaks down when objects are managed in user threads. If the thread is executing another task, the object is blocked. Nanomsg does away with the one-to-one relationship between objects and threads. Rather than relying on message passing, interactions are modeled as sets of state machines. Consequently, nanomsg sockets are thread-safe.

Nanomsg has a number of other internal optimizations aimed at improving memory and CPU efficiency. ZeroMQ uses a simple trie structure to store and match PUB/SUB subscriptions, which performs nicely for sub-10,000 subscriptions but quickly becomes unreasonable for anything beyond that number. Nanomsg uses a space-optimized trie called a radix tree to store subscriptions. Unlike its predecessor, the library also offers a true zero-copy API which greatly improves performance by allowing memory to be copied from machine to machine while completely bypassing the CPU.
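For a feel of how subscription matching works, here is a sketch of the simpler of the two structures: a plain trie with one character per node, where a topic matches if any stored subscription is a prefix of it. SubscriptionTrie is a hypothetical name and this is not code from either library; a radix tree would additionally merge chains of single-child nodes to save space.

```python
class SubscriptionTrie:
    """Character trie: a topic matches if some subscription is a prefix of it."""

    def __init__(self):
        self.children = {}
        self.terminal = False  # True when a subscription ends at this node

    def subscribe(self, prefix):
        node = self
        for ch in prefix:
            node = node.children.setdefault(ch, SubscriptionTrie())
        node.terminal = True

    def matches(self, topic):
        node = self
        if node.terminal:  # an empty subscription matches every topic
            return True
        for ch in topic:
            node = node.children.get(ch)
            if node is None:
                return False
            if node.terminal:
                return True
        return False


subscriptions = SubscriptionTrie()
subscriptions.subscribe("weather.")
subscriptions.subscribe("stocks.nyse.")
```

Matching walks at most one node per character of the topic, regardless of how many subscriptions are stored; the cost that grows with subscription count is memory, which is what the space-optimized variant targets.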

ZeroMQ implements load balancing using a round-robin algorithm. While it provides equal distribution of work, it has its limitations. Suppose you have two datacenters, one in New York and one in London, and each site hosts instances of “foo” services. Ideally, a request made for foo from New York shouldn’t get routed to the London datacenter and vice versa. Unfortunately, with ZeroMQ’s round-robin balancing, this is entirely possible. One of the new user-facing features that nanomsg offers is priority routing for outbound traffic. We avoid this latency problem by assigning priority one to foo services hosted in New York for applications also hosted there. Priority two is then assigned to foo services hosted in London, giving us a failover in the event that foos in New York are unavailable.
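The New York and London scenario can be sketched as a small router that always prefers the lowest-numbered priority group with a live endpoint and round-robins within that group. PriorityRouter and the endpoint names are hypothetical; this only models the routing decision and does not use nanomsg's socket options.

```python
class PriorityRouter:
    """Routes to the best-priority group that has a live endpoint."""

    def __init__(self):
        self.endpoints = {}     # priority -> endpoint names, in insertion order
        self.cursors = {}       # priority -> count of requests routed so far
        self.available = set()

    def add(self, endpoint, priority):
        self.endpoints.setdefault(priority, []).append(endpoint)
        self.available.add(endpoint)

    def mark_down(self, endpoint):
        self.available.discard(endpoint)

    def mark_up(self, endpoint):
        self.available.add(endpoint)

    def route(self):
        for priority in sorted(self.endpoints):  # 1 is tried before 2
            live = [e for e in self.endpoints[priority] if e in self.available]
            if live:
                count = self.cursors.get(priority, 0)
                self.cursors[priority] = count + 1
                return live[count % len(live)]  # round-robin inside the group
        return None  # nothing is reachable


router = PriorityRouter()
router.add("foo-ny-1", priority=1)
router.add("foo-ny-2", priority=1)
router.add("foo-london-1", priority=2)
```

As long as any New York instance is up, London never receives traffic; it only takes over once the whole priority-one group is unavailable.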

Additionally, nanomsg offers a command-line tool for interfacing with the system called nanocat. This tool lets you send and receive data via nanomsg sockets, which is useful for debugging and health checks.

Scalability Protocols

Perhaps most interesting is nanomsg’s philosophical departure from ZeroMQ. Instead of acting as a generic networking library, nanomsg intends to provide the “Lego bricks” for building scalable and performant distributed systems by implementing what it refers to as “scalability protocols.” These scalability protocols are communication patterns which are an abstraction on top of the network stack’s transport layer. The protocols are fully separated from each other such that each can embody a well-defined distributed algorithm. The intention, as stated by nanomsg’s author Martin Sustrik, is to have the protocol specifications standardized through the IETF.

Nanomsg currently defines six different scalability protocols: PAIR, REQREP, PIPELINE, BUS, PUBSUB, and SURVEY.

PAIR (Bidirectional Communication)

PAIR implements simple one-to-one, bidirectional communication between two endpoints. Two nodes can send messages back and forth to each other.

REQREP (Client Requests, Server Replies)

The REQREP protocol defines a pattern for building stateless services to process user requests. A client sends a request, the server receives the request, does some processing, and returns a response.

PIPELINE (One-Way Dataflow)

PIPELINE provides unidirectional dataflow which is useful for creating load-balanced processing pipelines. A producer node submits work that is distributed among consumer nodes.

BUS (Many-to-Many Communication)

BUS allows messages sent from each peer to be delivered to every other peer in the group.

PUBSUB (Topic Broadcasting)

PUBSUB allows publishers to multicast messages to zero or more subscribers. Subscribers, which can connect to multiple publishers, can subscribe to specific topics, allowing them to receive only messages that are relevant to them.

SURVEY (Ask Group a Question)

The last scalability protocol, and the one I will examine further by implementing a use case with it, is SURVEY. The SURVEY pattern is similar to PUBSUB in that a message from one node is broadcast to the entire group, but it differs in that each node in the group responds to the message. This opens up a wide variety of applications because it allows you to quickly and easily query the state of a large number of systems in one go. The survey respondents must respond within a time window configured by the surveyor.
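The deadline behavior is the interesting part, so here is a rough simulation of it using threads and a queue instead of real sockets. The survey function and the fast and slow respondents are hypothetical stand-ins; the point is only that replies arriving after the window closes are never collected.

```python
import queue
import threading
import time


def survey(question, respondents, deadline_s):
    """Send question to every respondent and collect replies until the deadline."""
    replies = queue.Queue()

    def ask(respondent):
        replies.put(respondent(question))

    for respondent in respondents:
        # Daemon threads so a late respondent cannot keep the program alive.
        threading.Thread(target=ask, args=(respondent,), daemon=True).start()

    collected = []
    expires = time.monotonic() + deadline_s
    while True:
        remaining = expires - time.monotonic()
        if remaining <= 0:
            break
        try:
            collected.append(replies.get(timeout=remaining))
        except queue.Empty:
            break  # the window closed with no further replies
    return collected


def fast(question):
    return "fast: ready"


def slow(question):
    time.sleep(1.0)  # answers well after the deadline below
    return "slow: ready"


answers = survey("status?", [fast, slow], deadline_s=0.2)
```

Like a surveyor that does not know how many respondents exist, the loop waits out the full window rather than stopping after a fixed number of replies.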

Implementing Service Discovery

As I pointed out, the SURVEY protocol has a lot of interesting applications. For example:

  • What data do you have for this record?
  • What price will you offer for this item?
  • Who can handle this request?

To continue exploring it, I will implement a basic service-discovery pattern. Service discovery is a pretty simple question that’s well-suited for SURVEY: what services are out there? Our solution will work by periodically submitting the question. As services spin up, they will connect with our service discovery system so they can identify themselves. We can tweak parameters like how often we survey the group to ensure we have an accurate list of services and how long services have to respond.

This is great because 1) the discovery system doesn’t need to be aware of what services there are—it just blindly submits the survey—and 2) when a service spins up, it will be discovered and if it dies, it will be “undiscovered.”

Here is the ServiceDiscovery class:

The discover method submits the survey and then collects the responses. Notice we construct a SURVEYOR socket and set the SURVEYOR_DEADLINE option on it. This deadline is the number of milliseconds from when a survey is submitted to when a response must be received—adjust it accordingly based on your network topology. Once the survey deadline has been reached, a NanoMsgAPIError is raised and we break the loop. The resolve method will take the name of a service and randomly select an available provider from our discovered services.
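A minimal sketch of the discover and resolve logic described above, with the SURVEYOR socket swapped out for an injected callable so that it runs without nanomsg installed. The run_survey callable and the "name|address" reply format are assumptions made for illustration, not the nanomsg API or necessarily the format used in the full example.

```python
import random
from collections import defaultdict


class ServiceDiscovery:
    """Tracks which addresses currently provide which service."""

    def __init__(self, run_survey):
        # run_survey is a hypothetical callable: it submits one survey and
        # returns the replies that arrived before the deadline.
        self.run_survey = run_survey
        self.services = defaultdict(set)

    def discover(self):
        found = defaultdict(set)
        for reply in self.run_survey():
            name, address = reply.split("|", 1)  # assumed reply format
            found[name].add(address)
        # Replace the old view wholesale so silent services are "undiscovered".
        self.services = found

    def resolve(self, name):
        providers = self.services.get(name)
        if not providers:
            return None
        return random.choice(sorted(providers))


replies = [
    "foo|tcp://10.0.0.1:9000",
    "foo|tcp://10.0.0.2:9000",
    "bar|tcp://10.0.0.3:9000",
]
discovery = ServiceDiscovery(lambda: list(replies))
discovery.discover()
provider = discovery.resolve("foo")
```

Rebuilding the table on every survey is the simplest way to drop dead services, at the cost of briefly forgetting a healthy service whose reply misses a single deadline.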

We can then wrap ServiceDiscovery with a daemon that will periodically run discover.

The discovery parameters are configured through environment variables which I inject into a Docker container.

Services must connect to the discovery system when they start up. When they receive a survey, they should respond by identifying what service they provide and where the service is located. One such service might look like the following:

Once again, we configure parameters through environment variables set on a container. Note that we connect to the discovery system with a RESPONDENT socket which then responds to service queries with the service name and address. The service itself uses a REP socket that simply responds to any requests with “The answer is 42,” but it could take any number of forms such as HTTP, raw socket, etc.

The full code for this example, including Dockerfiles, can be found on GitHub.

Nanomsg or ZeroMQ?

Based on all the improvements that nanomsg makes on top of ZeroMQ, you might be wondering why you would use the latter at all. Nanomsg is still relatively young. Although it has numerous language bindings, it hasn’t reached the maturity of ZeroMQ which has a thriving development community. ZeroMQ has extensive documentation and other resources to help developers make use of the library, while nanomsg has very little. Doing a quick Google search will give you an idea of the difference (about 500,000 results for ZeroMQ to nanomsg’s 13,500).

That said, nanomsg’s improvements and, in particular, its scalability protocols make it very appealing. A lot of the strange behaviors that ZeroMQ exposes have been resolved completely or at least mitigated. It’s actively being developed and is quickly gaining more and more traction. Technically, nanomsg has been in beta since March, but it’s starting to look production-ready if it’s not there already.

  1. The author explains why he should have originally written ZeroMQ in C instead of C++.

Distributed Messaging with ZeroMQ

“A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable.” -Leslie Lamport

With the increased prevalence and accessibility of cloud computing, distributed systems architecture has largely supplanted more monolithic constructs. The implication of using a service-oriented architecture, of course, is that you now have to deal with a myriad of difficulties that previously never existed, such as fault tolerance, availability, and horizontal scaling. Another interesting layer of complexity is providing consistency across nodes, which itself is a problem surrounded with endless research. Algorithms like Paxos and Raft attempt to provide solutions for managing replicated data, while other solutions offer eventual consistency.

Building scalable, distributed systems is not a trivial feat, but it pales in comparison to building real-time systems of a similar nature. Distributed architecture is a well-understood problem and the fact is, most applications have a high tolerance for latency. Few systems have a demonstrable need for real-time communication, but the few that do present an interesting challenge for developers. In this article, I explore the use of ZeroMQ to approach the problem of distributed, real-time messaging in a scalable manner while also considering the notion of eventual consistency.

The Intelligent Transport Layer

ZeroMQ is a high-performance asynchronous messaging library written in C++. It’s not a dedicated message broker but rather an embeddable concurrency framework with support for direct and fan-out endpoint connections over a variety of transports. ZeroMQ implements a number of different communication patterns like request-reply, pub-sub, and push-pull through TCP, PGM (multicast), in-process, and inter-process channels. The glaring lack of UDP support is, more or less, by design because ZeroMQ was conceived to provide guaranteed-ish delivery of atomic messages. The library makes no actual guarantee of delivery, but it does make a best effort. What ZeroMQ does guarantee, however, is that you will never receive a partial message, and messages will be received in order. This is important because UDP’s performance gains really only manifest themselves in lossy or congested environments.

The comprehensive list of messaging patterns and transports alone makes ZeroMQ an appealing choice for building distributed applications, but it particularly excels due to its reliability, scalability and high throughput. ZeroMQ and related technologies are popular within high-frequency trading, where packet loss of financial data is often unacceptable [1]. In 2011, CERN actually performed a study comparing CORBA, Ice, Thrift, ZeroMQ, and several other protocols for use in its particle accelerators and ranked ZeroMQ the highest.

ZeroMQ uses some tricks that allow it to actually outperform TCP sockets in terms of throughput such as intelligent message batching, minimizing network-stack traversals, and disabling Nagle’s algorithm. By default (and when possible), messages are queued on the subscriber, which attempts to avoid the problem of slow subscribers. However, when this isn’t sufficient, ZeroMQ employs a pattern called the “Suicidal Snail.” When a subscriber is running slow and is unable to keep up with incoming messages, ZeroMQ convinces the subscriber to kill itself. “Slow” is determined by a configurable high-water mark. The idea here is that it’s better to fail fast and allow the issue to be resolved quickly than to potentially allow stale data to flow downstream. Again, think about the high-frequency trading use case.
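Following the description above, the fail-fast idea can be sketched as a subscriber with a bounded backlog that raises an error once the backlog reaches its high-water mark, rather than silently falling further behind. Subscriber and SlowSubscriberError are hypothetical names, and this is a simplified model of the behavior rather than ZeroMQ's implementation or API.

```python
from collections import deque


class SlowSubscriberError(RuntimeError):
    """Raised when a subscriber has fallen too far behind to continue."""


class Subscriber:
    def __init__(self, high_water_mark):
        self.high_water_mark = high_water_mark
        self.pending = deque()

    def enqueue(self, message):
        # Fail fast instead of letting stale data pile up and flow downstream.
        if len(self.pending) >= self.high_water_mark:
            raise SlowSubscriberError(
                "backlog reached %d messages" % self.high_water_mark
            )
        self.pending.append(message)

    def process_one(self):
        return self.pending.popleft()


subscriber = Subscriber(high_water_mark=2)
subscriber.enqueue("tick-1")
subscriber.enqueue("tick-2")
oldest = subscriber.process_one()  # keeping up frees room in the backlog
subscriber.enqueue("tick-3")
```

A supervisor process would typically catch the failure and restart the subscriber, which then resumes with fresh data.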

A Distributed, Scalable, and Fast Messaging Architecture

ZeroMQ makes a convincing case for use as a transport layer. Let’s explore a little deeper to see how it could be used to build a messaging framework for use in a real-time system. ZeroMQ is fairly intuitive to use and offers a plethora of bindings for various languages, so we’ll focus more on the architecture and messaging paradigms than the actual code.

About a year ago, when I first started investigating ZeroMQ, I built a framework to perform real-time messaging and document syncing called Zinc. A “document,” in this sense, is any well-structured and mutable piece of data (think text document, spreadsheet, canvas, etc.). While purely academic, the goal was to provide developers with a framework for building rich, collaborative experiences in a distributed manner.

The framework actually had two implementations, one backed by the native ZeroMQ, and one backed by the pure Java implementation, JeroMQ [2]. It was really designed to allow any transport layer to be used though.

Zinc is structured around just a few core concepts: Endpoints, ChannelListeners, MessageHandlers, and Messages. An Endpoint represents a single node in an application cluster and provides functionality for sending and receiving messages to and from other Endpoints. It has outbound and inbound channels for transmitting messages to peers and receiving them, respectively.

ChannelListeners essentially act as daemons listening for incoming messages when the inbound channel is open on an Endpoint. When a message is received, it’s passed to a thread pool to be processed by a MessageHandler. Therefore, Messages are processed asynchronously in the order they are received, and as mentioned earlier, ZeroMQ guarantees in-order message delivery. As an aside, this is before I began learning Go, which would make for an ideal replacement for Java here as it’s quite well-suited to the problem :)

Messages are simply the data being exchanged between Endpoints, upon which we can build Documents and DocumentFragments. A Document is the structured data defined by an application, while a DocumentFragment represents a partial Document, or delta, which can be as fine- or coarse-grained as needed.

Zinc is built around the publish-subscribe and push-pull messaging patterns. One Endpoint will act as the host of a cluster, while the others act as clients. With this architecture, the host acts as a publisher and the clients as subscribers. Thus, when a host fires off a Message, it’s delivered to every subscribing client in a multicast-like fashion. Conversely, clients also act as “push” Endpoints with the host being a “pull” Endpoint. Clients can then push Messages into the host’s Message queue, from which the host pulls in a first-in-first-out manner.

This architecture allows Messages to be propagated across the entire cluster—a client makes a change which is sent to the host, who propagates this delta to all clients. This means that the client who initiated the change will receive an “echo” delta, but it will be discarded by checking the Message origin, a UUID which uniquely identifies an Endpoint. Clients are then responsible for preserving data consistency if necessary, perhaps through operational transformation or by maintaining a single source of truth from which clients can reconcile.
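The propagation and echo handling can be sketched in a few lines with in-process objects standing in for sockets. Host and Client here are hypothetical illustrations of the flow, not Zinc's actual classes, and direct method calls replace the push-pull and publish-subscribe channels.

```python
import uuid
from collections import deque


class Host:
    """Pulls deltas from clients in FIFO order and publishes them to everyone."""

    def __init__(self):
        self.clients = []
        self.inbox = deque()

    def subscribe(self, client):
        self.clients.append(client)

    def push(self, message):
        self.inbox.append(message)

    def pump(self):
        while self.inbox:
            message = self.inbox.popleft()
            for client in self.clients:
                client.on_publish(message)


class Client:
    def __init__(self, host):
        self.id = str(uuid.uuid4())  # origin ID stamped on every outgoing Message
        self.host = host
        self.applied = []
        host.subscribe(self)

    def change(self, delta):
        self.applied.append(delta)  # apply locally, then push to the host
        self.host.push({"origin": self.id, "delta": delta})

    def on_publish(self, message):
        if message["origin"] == self.id:
            return  # our own change echoed back by the host; discard it
        self.applied.append(message["delta"])


host = Host()
alice = Client(host)
bob = Client(host)
alice.change("insert x")
host.pump()
```

Without the origin check, the initiating client would apply its own delta twice. Conflicting concurrent changes are out of scope here and would still need something like operational transformation.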

One of the advantages of this architecture is that it scales reasonably well due to its composability. Specifically, we can construct our cluster as a tree of clients with arbitrary breadth and depth. Obviously, the more we scale horizontally or vertically, the more latency we introduce between edge nodes. Coupled with eventual consistency, this can cause problems for some applications but might be acceptable to others.

The downside is this inherently introduces a single point of failure characterized by the client-server model. One solution might be to promote another node when the host fails and balance the tree.

Once again, this framework was mostly academic and acted as a way for me to test-drive ZeroMQ, although there are some other interesting applications of it. Since the framework supports multicast message delivery via push-pull or publish-subscribe mechanisms, one such use case is autonomous load balancing.

Paired with something like ZooKeeper, etcd, or some other service-discovery protocol, clients would be capable of discovering hosts, who act as load balancers. Once a client has discovered a host, it can request to become a part of that host’s cluster. If the host accepts the request, the client can begin to send messages to the host (and, as a result, to the rest of the cluster) and, likewise, receive messages from the host (and the rest of the cluster). This enables clients and hosts to submit work to the cluster such that it’s processed in an evenly distributed way, and workers can determine whether to pass work on further down the tree or process it themselves. Clients can choose to participate in load-balancing clusters at their own will and when they become available, making them mostly autonomous. Clients could then be quickly spun up and spun down using, for example, Docker containers.

ZeroMQ is great for achieving reliable, fast, and scalable distributed messaging, but it’s equally useful for performing parallel computation on a single machine or several locally networked ones by facilitating in- and inter-process communication using the same patterns. It also scales in the sense that it can effortlessly leverage multiple cores on each machine. ZeroMQ is not a replacement for a message broker, but it can work in unison with traditional message-oriented middleware. Combined with Protocol Buffers and other serialization methods, ZeroMQ makes it easy to build extremely high-throughput messaging frameworks.

  1. ZeroMQ’s founder, iMatix, was responsible for moving JPMorgan Chase and the Dow Jones Industrial Average trading platforms to OpenAMQ.
  2. In systems where near real-time is sufficient, JeroMQ is adequate and benefits by not requiring any native linking.