Benchmarking Message Queue Latency

About a year and a half ago, I published Dissecting Message Queues, which broke down a few different messaging systems and did some performance benchmarking. It was a naive attempt and had a lot of problems, but it was also my first time doing any kind of system benchmarking. It turns out benchmarking systems correctly is actually pretty difficult and many folks get it wrong. I don’t claim to have gotten it right, but over the past year and a half I’ve learned a lot, tried to build some better tools, and improve my methodology.

Tooling and Methodology

The Dissecting Message Queues benchmarks used a framework I wrote which published a specified number of messages effectively as fast as possible, received them, and recorded the end-to-end latency. There are several problems with this. First, load generation and consumption run on the same machine. Second, the system under test runs on the same machine as the benchmark client—both of these confound measurements. Third, running “pedal to the metal” and looking at the resulting latency isn’t a very useful benchmark because it’s not representative of a production environment (as Gil Tene likes to say, this is like driving your car as fast as possible, crashing it into a pole, and looking at the shape of the bumper afterwards—it’s always going to look bad). Lastly, the benchmark recorded average latency, which, for all intents and purposes, is a useless metric to look at.

I wrote Flotilla to automate “scaled-up” benchmarking—running the broker and benchmark clients on separate, distributed VMs. Flotilla also attempted to capture a better view of latency by looking at the latency distribution, though it only went up to the 99th percentile, which can sweep a lot of really bad things under the rug as we’ll see later. However, it still ran tests at full throttle, which isn’t great.

Bench is an attempt to get back to basics. It’s a simple, generic benchmarking library for measuring latency. It provides a straightforward Requester interface which can be implemented for various systems under test. Bench works by attempting to issue a fixed rate of requests per second and measuring the latency of each request issued synchronously. Latencies are captured using HDR Histogram, which observes the complete latency distribution and allows us to look, for example, at “six nines” latency.

Introducing a request schedule allows us to measure latency for different configurations of request rate and message size, but in a “closed-loop” test, it creates another problem called coordinated omission. The problem with a lot of benchmarks is that they end up measuring service time rather than response time, but the latter is likely what you care about because it’s what your users experience.

The best way to describe service time vs. response time is to think of a cash register. The cashier might be able to ring up a customer in under 30 seconds 99% of the time, but 1% of the time it takes three minutes. The time it takes to ring up a customer is the service time, while the response time consists of the service time plus the time the customer waited in line. Thus, the response time is dependent upon the variation in both service time and the rate of arrival. When we measure latency, we really want to measure response time.

Now, let’s think about how most latency benchmarks work. They usually do this:

  1. Note timestamp before request, t0.
  2. Make synchronous request.
  3. Note timestamp after request, t1.
  4. Record latency t1t0.
  5. Repeat as needed for request schedule.

What’s the problem with this? Nothing, as long as our requests fit within the specified request schedule.  For example, if we’re issuing 100 requests per second and each request takes 10 ms to complete, we’re good. However, if one request takes 100 ms to complete, that means we issued only one request during those 100 ms when, according to our schedule, we should have issued 10 requests in that window. Nine other requests should have been issued, but the benchmark effectively coordinated with the system under test by backing off. In reality, those nine requests waited in line—one for 100 ms, one for 90 ms, one for 80 ms, etc. Most benchmarks don’t capture this time spent waiting in line, yet it can have a dramatic effect on the results. The graph below shows the same benchmark with coordinated omission both uncorrected (red) and corrected (blue):
coordinated_omission

HDR Histogram attempts to correct coordinated omission by filling in additional samples when a request falls outside of its expected interval. We can also deal with coordinated omission by simply avoiding it altogether—always issue requests according to the schedule.

Message Queue Benchmarks

I benchmarked several messaging systems using bench—RabbitMQ (3.6.0), Kafka (0.8.2.2 and 0.9.0.0), Redis (2.8.4) pub/sub, and NATS (0.7.3). In this context, a “request” consists of publishing a message to the server and waiting for a response (i.e. a roundtrip). We attempt to issue requests at a fixed rate and correct for coordinated omission, then plot the complete latency distribution all the way up to the 99.9999th percentile. We repeat this for several configurations of request rate and request size. It’s also important to note that each message going to and coming back from the server are of the specified size, i.e. the “response” is the same size as the “request.”

The configurations used are listed below. Each configuration is run for a sustained 30 seconds.

  • 256B requests at 3,000 requests/sec (768 KB/s)
  • 1KB requests at 3,000 requests/sec (3 MB/s)
  • 5KB requests at 2,000 requests/sec (10 MB/s)
  • 1KB requests at 20,000 requests/sec (20.48 MB/s)
  • 1MB requests at 100 requests/sec (100 MB/s)

These message sizes are mostly arbitrary, and there might be a better way to go about this. Though I think it’s worth pointing out that the Ethernet MTU is 1500 bytes, so accounting for headers, the maximum amount of data you’ll get in a single TCP packet will likely be between 1400 and 1500 bytes.

The system under test and benchmarking client are on two different m4.xlarge EC2 instances (2.4 GHz Intel Xeon Haswell, 16GB RAM) with enhanced networking enabled.

Redis and NATS

Redis pub/sub and NATS have similar performance characteristics. Both offer very lightweight, non-transactional messaging with no persistence options (discounting Redis’ RDB and AOF persistence, which don’t apply to pub/sub), and both support some level of topic pattern matching. I’m hesitant to call either a “message queue” in the traditional sense, so I usually just refer to them as message brokers or buses. Because of their ephemeral nature, both are a nice choice for low-latency, lossy messaging.

Redis tail latency peaks around 1.5 ms.

Redis_latency

NATS performance looks comparable to Redis. Latency peaks around 1.2 ms.

NATS_latency

The resemblance becomes more apparent when we overlay the two distributions for the 1KB and 5KB runs. NATS tends to be about 0.1 to 0.4 ms faster.

Redis_NATS_latency

The 1KB, 20,000 requests/sec run uses 25 concurrent connections. With concurrent load, tail latencies jump up, peaking around 90 and 120 ms at the 99.9999th percentile in NATS and Redis, respectively.

Redis_NATS_1KB_20000_latency

Large messages (1MB) don’t hold up nearly as well, exhibiting large tail latencies starting around the 95th and 97th percentiles in NATS and Redis, respectively. 1MB is the default maximum message size in NATS. The latency peaks around 214 ms. Again, keep in mind these are synchronous, roundtrip latencies.

Redis_NATS_1MB_latency

Apcera’s Ivan Kozlovic pointed out that the version of the NATS client I was using didn’t include a recent performance optimization. Before, the protocol parser scanned over each byte in the payload, but the newer version skips to the end (the previous benchmarks were updated to use the newer version). The optimization does have a noticeable effect, illustrated below. There was about a 30% improvement with the 5KB latencies.

NATS_optimization_latency

The difference is even more pronounced in the 1MB case, which has roughly a 90% improvement up to the 90th percentile. The linear scale in the graph below hides this fact, but at the 90th percentile, for example, the pre-optimization latency is 10 ms and the optimized latency is 3.8 ms. Clearly, the large tail is mostly unaffected, however.

NATS_1MB_optimization_latency

In general, this shows that NATS and Redis are better suited to smaller messages (well below 1MB), in which latency tends to be sub-millisecond up to four nines.

RabbitMQ and Kafka

RabbitMQ is a popular AMQP implementation. Unlike NATS, it’s a more traditional message queue in the sense that it supports binding queues and transactional-delivery semantics. Consequently, RabbitMQ is a more “heavyweight” queuing solution and tends to pay an additional premium with latency. In this benchmark, non-durable queues were used. As a result, we should see reduced latencies since we aren’t going to disk.

RabbitMQ_latency

Latency tends to be sub-millisecond up to the 99.7th percentile, but we can see that it doesn’t hold up to NATS beyond that point for the 1KB and 5KB payloads.

RabbitMQ_NATS_latency

Kafka, on the other hand, requires disk persistence, but this doesn’t have a dramatic effect on latency until we look at the 94th percentile and beyond, when compared to RabbitMQ. Writes should be to page cache with flushes to disk happening asynchronously. The graphs below are for 0.8.2.2.

Kafka_latency

RabbitMQ_Kafka_latency

Once again, the 1KB, 20,000 requests/sec run is distributed across 25 concurrent connections. With RabbitMQ, we see the dramatic increase in tail latencies as we did with Redis and NATS. The RabbitMQ latencies in the concurrent case stay in line with the previous latencies up to about the 99th percentile. Interestingly, Kafka, doesn’t appear to be significantly affected. The latencies of 20,000 requests/sec at 1KB per request are not terribly different than the latencies of 3,000 requests/sec at 1KB per request, both peaking around 250 ms.

RabbitMQ_Kafka_1KB_20000_latency

What’s particularly interesting is the behavior of 1MB messages vs. the rest. With RabbitMQ, there’s almost a 14x difference in max latencies between the 5KB and 1MB runs with 1MB being the faster. With Kafka 0.8.2.2, the difference is over 126x in the same direction. We can plot the 1MB latencies for RabbitMQ and Kafka since it’s difficult to discern them with a linear scale.

RabbitMQ_Kafka_1MB_latency

tried to understand what was causing this behavior. I’ve yet to find a reasonable explanation for RabbitMQ. Intuition tells me it’s a result of buffering—either at the OS level or elsewhere—and the large messages cause more frequent flushing. Remember that these benchmarks were with transient publishes. There should be no disk accesses occurring, though my knowledge of Rabbit’s internals are admittedly limited. The fact that this behavior occurs in RabbitMQ and not Redis or NATS seems odd. Nagle’s algorithm is disabled in all of the benchmarks (TCP_NODELAY). After inspecting packets with Wireshark, it doesn’t appear to be a problem with delayed acks.

To show just how staggering the difference is, we can plot Kafka 0.8.2.2 and RabbitMQ 1MB latencies alongside Redis and NATS 5KB latencies. They are all within the same ballpark. Whatever the case may be, both RabbitMQ and Kafka appear to handle large messages extremely well in contrast to Redis and NATS.

RabbitMQ_Kafka_NATS_Redis_latency

This leads me to believe you’ll see better overall throughput, in terms of raw data, with RabbitMQ and Kafka, but more predictable, tighter tail latencies with Redis and NATS. Where SLAs are important, it’s hard to beat NATS. Of course, it’s unfair to compare Kafka with something like NATS or Redis or even RabbitMQ since they are very different (and sometimes complementary), but it’s also worth pointing out that the former is much more operationally complex.

However, benchmarking Kafka 0.9.0.0 (blue and red) shows an astounding difference in tail latencies compared to 0.8.2.2 (orange and green).

Kafka_0_8_0_9_latency

Kafka 0.9’s performance is much more in line with RabbitMQ’s at high percentiles as seen below.

RabbitMQ_Kafka_0_9_latency

Likewise, it’s a much closer comparison to NATS when looking at the 1KB and 5KB runs.

Kafka_NATS_latency

As with 0.8, Kafka 0.9 does an impressive job dealing with 1MB messages in comparison to NATS, especially when looking at the 92nd percentile and beyond. It’s hard to decipher in the graph below, but Kafka 0.9’s 99th, 99.9th, and 99.99th percentile latencies are 0.66, 0.78, and 1.35 ms, respectively.

Kafka_0_9_NATS_1MB

My initial thought was that the difference between Kafka 0.8 and 0.9 was attributed to a change in fsync behavior. To quote the Kafka documentation:

Kafka always immediately writes all data to the filesystem and supports the ability to configure the flush policy that controls when data is forced out of the OS cache and onto disk using the and flush. This flush policy can be controlled to force data to disk after a period of time or after a certain number of messages has been written.

However, there don’t appear to be any changes in the default flushing configuration between 0.8 and 0.9. The default configuration disables application fsync entirely, instead relying on the OS’s background flush. Jay Kreps indicates it’s a result of several “high percentile latency issues” that were fixed in 0.9. After scanning the 0.9 release notes, I was unable to determine specifically what those fixes might be. Either way, the difference is certainly not something to scoff at.

Conclusion

As always, interpret these benchmark results with a critical eye and perform your own tests if you’re evaluating these systems. This was more an exercise in benchmark methodology and tooling than an actual system analysis (and, as always, there’s still a lot of room for improvement). If anything, I think these results show how much we can miss by not looking beyond the 99th percentile. In almost all cases, everything looks pretty good up to that point, but after that things can get really bad. This is important to be conscious of when discussing SLAs.

I think the key takeaway is to consider your expected load in production, benchmark configurations around that, determine your allowable service levels, and iterate or provision more resources until you’re within those limits. The other important takeaway with respect to benchmarking is to look at the complete latency distribution. Otherwise, you’re not getting a clear picture of how your system actually behaves.

Not Invented Here

Engineers love engineering things. The reason is self-evident (and maybe self-fulfilling—why else would you be an engineer?). We like to think we’re pretty good at solving problems. Unfortunately, this mindset can, on occasion, yield undesirable consequences which might not be immediately apparent but all the while damaging.

Developers are all in tune with the idea of “don’t reinvent the wheel,” but it seems to be eschewed sometimes, deliberately or otherwise. People don’t generally write their own merge sort, so why would they write their own consensus protocol? Anecdotally speaking, they do.

Not-Invented-Here Syndrome is a very real thing. In many cases, consciously or not, it’s a cultural problem. In others, it’s an engineering one. Camille Fournier’s blog post on ZooKeeper helps to illustrate this point and provide some context. In it, she describes why some distributed systems choose to rely on external services, such as ZooKeeper, for distributed coordination, while others build in their own coordination logic.

We draw a parallel between distributed systems and traditional RDBMSs, which typically implement their own file system and other low-level facilities. Why? Because it’s their competitive advantage. SQL databases sell because they offer finely tuned performance, and in order to do that, they need to control these things that the OS otherwise provides. Distributed databases like Riak sell because they own the coordination logic, which helps promote their competitive advantage. This follows what Joel Spolsky says about NIH Syndrome in that “if it’s a core business function—do it yourself, no matter what.”

If you’re developing a computer game where the plot is your competitive advantage, it’s OK to use a third party 3D library. But if cool 3D effects are going to be your distinguishing feature, you had better roll your own.

This makes a lot of sense. My sorting algorithm is unlikely to provide me with a competitive edge, but something else might, even if it’s not particularly novel.

So in some situations, homegrown is justifiable, but that’s not always the case. Redis’ competitive advantage is its predictably low latencies and data structures. Does it make sense for it to implement its own clustering and leader election protocols? Maybe, but this is where NIH can bite you. If what you’re doing is important and there’s precedent, lean on existing research and solutions. Most would argue write safety is important, and there is certainly precedent for leader election. Why not leverage that work? Things like Raft, Paxos, and Zab provide solutions which are proven using formal methods and are peer reviewed. That doesn’t mean new solutions can’t be developed, but they generally require model checking and further scrutiny to ensure correctness. Otherwise, you’ll inevitably run into problems. Implementing our own solutions can provide valuable insight, but leave them at home if they’re not rigorously approached. Rolling your own and calling it “good enough” is dishonest to your users if it’s not properly communicated.

Elasticsearch is another interesting case to look at. You might say Elasticsearch’s competitive advantage is its full-text search engine, but it’s not. Like Solr, it’s built on Lucene. Elasticsearch was designed from the ground-up to be distributed. This is what gives it a leg up over Solr and other similar search servers where horizontal scaling and fault tolerance were essentially tacked on. In a way, this resembles what happened with Redis, where failover and clustering were introduced as an afterthought. However, unlike Redis, which chose to implement its own failover coordination and cluster-membership protocol, Solr opted to use ZooKeeper as an external coordinator.

We see that Elasticsearch’s core advantage is its distributed nature. Following that notion, it makes sense for it to own that coordination, which is why its designers chose to implement their own internal cluster membership, ZenDisco. But it turns out writing cluster-membership protocols is really fucking hard, and unless you’ve written proofs for it, you probably shouldn’t do it at all. The analogy here would be writing your own encryption algorithm—there’s tons of institutional knowledge which has laid the groundwork for solutions which are well-researched and well-understood. That knowledge should be embraced in situations like this.

I don’t mean to pick on Redis and Elasticsearch. They’re both excellent systems, but they serve as good examples for this discussion. The problem is that users of these systems tend to overlook the issues exposed by this mentality. Frankly, few people would know problems exist unless they are clearly documented by vendors (and not sales people) and even then, how many people actually read the docs cover-to-cover? It’s essential we know a system’s shortcomings and edge cases so we can recognize which situations to apply it and, more important, which we should not.

You don’t have to rely on an existing third-party library or service. Believe it or not, this isn’t a sales pitch for ZooKeeper. If it’s a core business function, it probably makes sense to build it yourself as Joel describes. What doesn’t make sense, however, is to build out whatever that is without being cognizant of conventional wisdom. I’m amazed at how often people are willing to throw away institutional knowledge, either because they don’t seek it out or they think they can do better (without formal verification). If I have seen further, it is by standing on the shoulders of giants.

Dissecting Message Queues

Disclaimer (10/29/20) – The benchmarks and performance analysis presented in this post should not be relied on. This post was written roughly six years ago, and at the time, was just the result of my exploration of various messaging systems. The benchmarks are not implemented in a meaningful way, which I discussed in a follow-up post. This post will remain for posterity and learning purposes, but I do not claim that this information is accurate or useful.

Continuing my series on message queues, I spent this weekend dissecting various libraries for performing distributed messaging. In this analysis, I look at a few different aspects, including API characteristics, ease of deployment and maintenance, and performance qualities. The message queues have been categorized into two groups: brokerless and brokered. Brokerless message queues are peer-to-peer such that there is no middleman involved in the transmission of messages, while brokered queues have some sort of server in between endpoints.

The systems I’ll be analyzing are:

Brokerless
nanomsg
ZeroMQ

Brokered
ActiveMQ
NATS
Kafka
Kestrel
NSQ
RabbitMQ
Redis
ruby-nats

To start, let’s look at the performance metrics since this is arguably what people care the most about. I’ve measured two key metrics: throughput and latency. All tests were run on a MacBook Pro 2.6 GHz i7, 16GB RAM. These tests are evaluating a publish-subscribe topology with a single producer and single consumer. This provides a good baseline. It would be interesting to benchmark a scaled-up topology but requires more instrumentation.

The code used for benchmarking, written in Go, is available on GitHub. The results below shouldn’t be taken as gospel as there are likely optimizations that can be made to squeeze out performance gains. Pull requests are welcome.

Throughput Benchmarks

Throughput is the number of messages per second the system is able to process, but what’s important to note here is that there is no single “throughput” that a queue might have. We’re sending messages between two different endpoints, so what we observe is a “sender” throughput and a “receiver” throughput—that is, the number of messages that can be sent per second and the number of messages that can be received per second.

This test was performed by sending 1,000,000 1KB messages and measuring the time to send and receive on each side. Many performance tests tend to use smaller messages in the range of 100 to 500 bytes. I chose 1KB because it’s more representative of what you might see in a production environment, although this varies case by case. For message-oriented middleware systems, only one broker was used. In most cases, a clustered environment would yield much better results.

Unsurprisingly, there’s higher throughput on the sending side. What’s interesting, however, is the disparity in the sender-to-receiver ratios. ZeroMQ is capable of sending over 5,000,000 messages per second but is only able to receive about 600,000/second. In contrast, nanomsg sends shy of 3,000,000/second but can receive almost 2,000,000.

Now let’s take a look at the brokered message queues.

Intuitively, we observe that brokered message queues have dramatically less throughput than their brokerless counterparts by a couple orders of magnitude for the most part. Half the brokered queues have a throughput below 25,000 messages/second. The numbers for Redis might be a bit misleading though. Despite providing pub/sub functionality, it’s not really designed to operate as a robust messaging queue. In a similar fashion to ZeroMQ, Redis disconnects slow clients, and it’s important to point out that it was not able to reliably handle this volume of messaging. As such, we consider it an outlier. Kafka and ruby-nats have similar performance characteristics to Redis but were able to reliably handle the message volume without intermittent failures. The Go implementation of NATS, gnatsd, has exceptional throughput for a brokered message queue.

Outliers aside, we see that the brokered queues have fairly uniform throughputs. Unlike the brokerless libraries, there is little-to-no disparity in the sender-to-receiver ratios, which themselves are all very close to one.

Latency Benchmarks

The second key performance metric is message latency. This measures how long it takes for a message to be transmitted between endpoints. Intuition might tell us that this is simply the inverse of throughput, i.e. if throughput is messages/second, latency is seconds/message. However, by looking closely at this image borrowed from a ZeroMQ white paper, we can see that this isn’t quite the case.

The reality is that the latency per message sent over the wire is not uniform. It can vary wildly for each one. In truth, the relationship between latency and throughput is a bit more involved. Unlike throughput, however, latency is not measured at the sender or the receiver but rather as a whole. But since each message has its own latency, we will look at the averages of all of them. Going further, we will see how the average message latency fluctuates in relation to the number of messages sent. Again, intuition tells us that more messages means more queueing, which means higher latency.

As we did before, we’ll start by looking at the brokerless systems.

In general, our hypothesis proves correct in that, as more messages are sent through the system, the latency of each message increases. What’s interesting is the tapering at the 500,000-point in which latency appears to increase at a slower rate as we approach 1,000,000 messages. Another interesting observation is the initial spike in latency between 1,000 and 5,000 messages, which is more pronounced with nanomsg. It’s difficult to pinpoint causation, but these changes might be indicative of how message batching and other network-stack traversal optimizations are implemented in each library. More data points may provide better visibility.

We see some similar patterns with brokered queues and also some interesting new ones.

Redis behaves in a similar manner as before, with an initial latency spike and then a quick tapering off. It differs in that the tapering becomes essentially constant right after 5,000 messages. NSQ doesn’t exhibit the same spike in latency and behaves, more or less, linearly. Kestrel fits our hypothesis.

Notice that ruby-nats and NATS hardly even register on the chart. They exhibited surprisingly low latencies and unexpected relationships with the number of messages.

Interestingly, the message latencies for ruby-nats and NATS appear to be constant. This is counterintuitive to our hypothesis.

You may have noticed that Kafka, ActiveMQ, and RabbitMQ were absent from the above charts. This was because their latencies tended to be orders-of-magnitude higher than the other brokered message queues, so ActiveMQ and RabbitMQ were grouped into their own AMQP category. I’ve also included Kafka since it’s in the same ballpark.

Here we see that RabbitMQ’s latency is constant, while ActiveMQ and Kafka are linear. What’s unclear is the apparent disconnect between their throughput and mean latencies.

Qualitative Analysis

Now that we’ve seen some empirical data on how these different libraries perform, I’ll take a look at how they work from a pragmatic point of view. Message throughput and speed is important, but it isn’t very practical if the library is difficult to use, deploy, or maintain.

ZeroMQ and Nanomsg

Technically speaking, nanomsg isn’t a message queue but rather a socket-style library for performing distributed messaging through a variety of convenient patterns. As a result, there’s nothing to deploy aside from embedding the library itself within your application. This makes deployment a non-issue.

Nanomsg is written by one of the ZeroMQ authors, and as I discussed before, works in a very similar way to that library. From a development standpoint, nanomsg provides an overall cleaner API. Unlike ZeroMQ, there is no notion of a context in which sockets are bound to. Furthermore, nanomsg provides pluggable transport and messaging protocols, which make it more open to extension. Its additional built-in scalability protocols also make it quite appealing.

Like ZeroMQ, it guarantees that messages will be delivered atomically intact and ordered but does not guarantee the delivery of them. Partial messages will not be delivered, and it’s possible that some messages won’t be delivered at all. The library’s author, Martin Sustrik, makes this abundantly clear:

Guaranteed delivery is a myth. Nothing is 100% guaranteed. That’s the nature of the world we live in. What we should do instead is to build an internet-like system that is resilient in face of failures and routes around damage.

The philosophy is to use a combination of topologies to build resilient systems that add in these guarantees in a best-effort sort of way.

On the other hand, nanomsg is still in beta and may not be considered production-ready. Consequently, there aren’t a lot of resources available and not much of a development community around it.

ZeroMQ is a battle-tested messaging library that’s been around since 2007. Some may perceive it as a predecessor to nanomsg, but what nano lacks is where ZeroMQ thrives—a flourishing developer community and a deluge of resources and supporting material. For many, it’s the de facto tool for building fast, asynchronous distributed messaging systems that scale.

Like nanomsg, ZeroMQ is not a message-oriented middleware and simply operates as a socket abstraction. In terms of usability, it’s very much the same as nanomsg, although its API is marginally more involved.

ActiveMQ and RabbitMQ

ActiveMQ and RabbitMQ are implementations of AMQP. They act as brokers which ensure messages are delivered. ActiveMQ and RabbitMQ support both persistent and non-persistent delivery. By default, messages are written to disk such that they survive a broker restart. They also support synchronous and asynchronous sending of messages with the former having substantial impact on latency. To guarantee delivery, these brokers use message acknowledgements which also incurs a massive latency penalty.

As far as availability and fault tolerance goes, these brokers support clustering through shared storage or shared nothing. Queues can be replicated across clustered nodes so there is no single point of failure or message loss.

AMQP is a non-trivial protocol which its creators claim to be over-engineered. These additional guarantees are made at the expense of major complexity and performance trade-offs. Fundamentally, clients are more difficult to implement and use.

Since they’re message brokers, ActiveMQ and RabbitMQ are additional moving parts that need to be managed in your distributed system, which brings deployment and maintenance costs. The same is true for the remaining message queues being discussed.

NATS and Ruby-NATS

NATS (gnatsd) is a pure Go implementation of the ruby-nats messaging system. NATS is distributed messaging rethought to be less enterprisey and more lightweight (this is in direct contrast to systems like ActiveMQ, RabbitMQ, and others). Apcera’s Derek Collison, the library’s author and former TIBCO architect, describes NATS as “more like a nervous system” than an enterprise message queue. It doesn’t do persistence or message transactions, but it’s fast and easy to use. Clustering is supported so it can be built on top of with high availability and failover in mind, and clients can be sharded. Unfortunately, TLS and SSL are not yet supported in NATS (they are in the ruby-nats) but on the roadmap.

As we observed earlier, NATS performs far better than the original Ruby implementation. Clients can be used interchangeably with NATS and ruby-nats.

Kafka

Originally developed by LinkedIn, Kafka implements publish-subscribe messaging through a distributed commit log. It’s designed to operate as a cluster that can be consumed by large amounts of clients. Horizontal scaling is done effortlessly using ZooKeeper so that additional consumers and brokers can be introduced seamlessly. It also transparently takes care of cluster rebalancing.

Kafka uses a persistent commit log to store messages on the broker. Unlike other durable queues which usually remove persisted messages on consumption, Kafka retains them for a configured period of time. This means that messages can be “replayed” in the event that a consumer fails.

ZooKeeper makes managing Kafka clusters relatively easy, but it does introduce yet another element that needs to be maintained. That said, Kafka exposes a great API and Shopify has an excellent Go client called Sarama that makes interfacing with Kafka very accessible.

Kestrel

Kestrel is a distributed message queue open sourced by Twitter. It’s intended to be fast and lightweight. Because of this, it has no concept of clustering or failover. While Kafka is built from the ground up to be clustered through ZooKeeper, the onus of message partitioning is put upon the clients of Kestrel. There is no cross-communication between nodes. It makes this trade-off in the name of simplicity. It features durable queues, item expiration, transactional reads, and fanout queues while operating over Thrift or memcache protocols.

Kestrel is designed to be small, but this means that more work must be done by the developer to build out a robust messaging system on top of it. Kafka seems to be a more “all-in-one” solution.

NSQ

NSQ is a messaging platform built by Bitly. I use the word platform because there’s a lot of tooling built around NSQ to make it useful for real-time distributed messaging. The daemon that receives, queues, and delivers messages to clients is called nsqd. The daemon can run standalone, but NSQ is designed to run in as a distributed, decentralized topology. To achieve this, it leverages another daemon called nsqlookupd. Nsqlookupd acts as a service-discovery mechanism for nsqd instances. NSQ also provides nsqadmin, which is a web UI that displays real-time cluster statistics and acts as a way to perform various administrative tasks like clearing queues and managing topics.

By default, messages in NSQ are not durable. It’s primarily designed to be an in-memory message queue, but queue sizes can be configured such that after a certain point, messages will be written to disk. Despite this, there is no built-in replication. NSQ uses acknowledgements to guarantee message delivery, but the order of delivery is not guaranteed. Messages can also be delivered more than once, so it’s the developer’s responsibility to introduce idempotence.

Similar to Kafka, additional nodes can be added to an NSQ cluster seamlessly. It also exposes both an HTTP and TCP API, which means you don’t actually need a client library to push messages into the system. Despite all the moving parts, it’s actually quite easy to deploy. Its API is also easy to use and there are a number of client libraries available.

Redis

Last up is Redis. While Redis is great for lightweight messaging and transient storage, I can’t advocate its use as the backbone of a distributed messaging system. Its pub/sub is fast but its capabilities are limited. It would require a lot of work to build a robust system. There are solutions better suited to the problem, such as those described above, and there are also some scaling concerns with it.

These matters aside, Redis is easy to use, it’s easy to deploy and manage, and it has a relatively small footprint. Depending on the use case, it can be a great choice for real-time messaging as I’ve explored before.

Conclusion

The purpose of this analysis is not to present some sort of “winner” but instead showcase a few different options for distributed messaging. There is no “one-size-fits-all” option because it depends entirely on your needs. Some use cases require fast, fire-and-forget messages, others require delivery guarantees. In fact, many systems will call for a combination of these. My hope is that this dissection will offer some insight into which solutions work best for a given problem so that you can make an intelligent decision.

Real-Time Client Notifications Using Redis and Socket.IO

Backbone.js is great for building structured client-side applications. Its declarative event-handling makes it easy to listen for actions in the UI and keep your data model in sync, but what about changes that occur to your data model on the server? Coordinating user interfaces for data consistency isn’t a trivial problem. Take a simple example: users A and B are viewing the same data at the same time, while user A makes a change to that data. How do we propagate those changes to user B? Now, how do we do it at scale, say, several thousand concurrent users? What about external consumers of that data?

One of our products at WebFilings called for real-time notifications for a few reasons:

  1. We needed to keep users’ view of data consistent.
  2. We needed a mechanism that would alert users to changes in the web client (and allow them to subscribe/unsubscribe to certain events).
  3. We needed notifications to be easily consumable (beyond the scope of a web client, e.g. email alerts, monitoring services, etc.).

I worked on developing a pattern that would address each of these concerns while fitting within our platform’s ecosystem, giving us a path of least resistance with maximum payoff.

Polling sucks. Long-polling isn’t much better. Server-Sent Events are an improvement. They provide a less rich API than the WebSocket protocol, which supports bi-directional communication, but they do have some niceties like handling reconnects and operating over traditional HTTP. Socket.IO provides a nice wrapper around WebSockets while falling back to other transport methods when necessary. It has a rich API with features like namespaces, multiplexing, and reconnects, but it’s built on Node.js, which means it doesn’t plug into our Python stack very easily.

The solution I decided on was a library called gevent-socketio, which is a Python implementation of the Socket.IO protocol built on gevent, making it incredibly simple to hook in to our existing Flask app. 

The gevent-socketio solution really only solves a small part of the overarching problem by providing a way to broadcast messages to clients. We still need a way to hook these messages in to our Backbone application and, more important, a way to publish and subscribe to events across threads and processes. The Socket.IO dispatcher is just one of potentially many consumers after all.

The other piece of the solution is to use Redis for its excellent pubsub capabilities. Redis allows us to publish and subscribe to messages from anywhere, even from different machines. Events that occur as a result of user actions, task queues, or cron jobs can all be captured and published to any interested parties as they happen. We’re already using Redis as a caching layer, so we get this for free. The overall architecture looks something like this:

pubsub

Let’s dive into the code.

Hooking gevent-socketio into our Flask app is pretty straightforward. We essentially just wrap it with a SocketIOServer.

The other piece is registering client subscribers for notifications:

NotificationsNamespace is a Socket.IO namespace we will use to broadcast notification messages. We use gevent-socketio’s BroadcastMixin to multicast messages to clients.

When a connection is received, we spawn a greenlet that listens for messages and broadcasts them to clients in the notifications namespace. We can then build a minimal API that can be used across our application to publish notifications.

Wiring notifications up to the UI is equally simple. To facilitate communication between our Backbone components while keeping them decoupled, we use an event-dispatcher pattern relying on Backbone.Events. The pattern looks something like this:

This pattern makes it trivial for us to allow our views, collections, and models to subscribe to our Socket.IO notifications because we just have to pump the messages into the dispatcher pipeline.

Now our UI components can subscribe and react to client- and server-side events as they see fit and in a completely decoupled fashion. This makes it very easy for us to ensure our client-side views and models are updated automatically while also letting other services consume these events.