Fast Topic Matching

A common problem in messaging middleware is that of efficiently matching message topics with interested subscribers. For example, assume we have a set of subscribers, numbered 1 to 3:

Subscriber | Match Request
1          | forex.usd
2          | forex.*
3          | stock.nasdaq.msft

And we have a stream of messages, numbered 1 to N:

Message | Topic
1       | forex.gbp
2       | stock.nyse.ibm
3       | stock.nyse.ge
4       | forex.eur
5       | forex.usd
N       | stock.nasdaq.msft

We are then tasked with routing messages whose topics match the respective subscriber requests, where a “*” wildcard matches any word. This is frequently a bottleneck for message-oriented middleware like ZeroMQ, RabbitMQ, ActiveMQ, TIBCO EMS, et al. Because of this, there are a number of well-known solutions to the problem. In this post, I’ll describe some of these solutions, as well as a novel one, and attempt to quantify them through benchmarking. As usual, the code is available on GitHub.

The Naive Solution

The naive solution is pretty simple: use a hashmap that maps topics to subscribers. Subscribing involves adding a new entry to the map (or appending to a list if it already exists). Matching a message to subscribers involves scanning through every entry in the map, checking if the match request matches the message topic, and returning the subscribers for those that do.

Inserts are approximately O(1) and lookups approximately O(n*m) where n is the number of subscriptions and m is the number of words in a topic. This means the performance of this solution is heavily dependent upon how many subscriptions exist in the map and also the access patterns (rate of reads vs. writes). Since most use cases are heavily biased towards searches rather than updates, the naive solution—unsurprisingly—is not a great option.
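To make this concrete, here is a minimal sketch of the naive approach in Go. The names (naiveMatcher and friends) are mine for illustration, not from the benchmarked code.

```go
package match

import "strings"

// naiveMatcher maps subscription patterns (which may contain "*" wildcards)
// to subscriber IDs.
type naiveMatcher struct {
	subs map[string][]string // pattern -> subscriber IDs
}

func newNaiveMatcher() *naiveMatcher {
	return &naiveMatcher{subs: make(map[string][]string)}
}

// Subscribe is approximately O(1): append the subscriber to the pattern's entry.
func (m *naiveMatcher) Subscribe(pattern, subscriber string) {
	m.subs[pattern] = append(m.subs[pattern], subscriber)
}

// Lookup is O(n*m): scan every pattern and compare it word by word.
func (m *naiveMatcher) Lookup(topic string) []string {
	words := strings.Split(topic, ".")
	var matched []string
	for pattern, subscribers := range m.subs {
		if matches(strings.Split(pattern, "."), words) {
			matched = append(matched, subscribers...)
		}
	}
	return matched
}

// matches reports whether a pattern matches a topic, where "*" matches any
// single word.
func matches(pattern, topic []string) bool {
	if len(pattern) != len(topic) {
		return false
	}
	for i, w := range pattern {
		if w != "*" && w != topic[i] {
			return false
		}
	}
	return true
}
```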

The microbenchmark below compares the performance of subscribe, unsubscribe, and lookup (matching) operations, first using an empty hashmap (what we call cold) and then with one containing 1,000 randomly generated 5-word topic subscriptions (what we call hot). With the populated subscription map, lookups are about three orders of magnitude slower, which is why we have to use a log scale in the chart below.

     | subscribe | unsubscribe | lookup
cold | 172ns     | 51.2ns      | 787ns
hot  | 221ns     | 55ns        | 815,787ns


Inverted Bitmap

The inverted bitmap technique builds on the observation that lookups are more frequent than updates and assumes that the search space is finite. Consequently, it shifts some of the cost from the read path to the write path. It works by storing a set of bitmaps, one per topic, or criteria, in the search space. Subscriptions are then assigned an increasing number starting at 0. We analyze each subscription to determine the matching criteria and set the corresponding bits in the criteria bitmaps to 1. For example, assume our search space consists of the following set of topics:

  • forex.usd
  • forex.gbp
  • forex.jpy
  • forex.eur
  • stock.nasdaq
  • stock.nyse

We then have the following subscriptions:

  • 0 = forex.* (matches forex.usd, forex.gbp, forex.jpy, and forex.eur)
  • 1 = stock.nyse (matches stock.nyse)
  • 2 = *.* (matches everything)
  • 3 = stock.* (matches stock.nasdaq and stock.nyse)

When we index the subscriptions above, we get the following set of bitmaps:

Criteria     | 0 | 1 | 2 | 3
forex.usd    | 1 | 0 | 1 | 0
forex.gbp    | 1 | 0 | 1 | 0
forex.jpy    | 1 | 0 | 1 | 0
forex.eur    | 1 | 0 | 1 | 0
stock.nasdaq | 0 | 0 | 1 | 1
stock.nyse   | 0 | 1 | 1 | 1

When we match a message, we simply need to look up the corresponding bitmap and check the set bits. As we see below, subscribe and unsubscribe are quite expensive relative to the naive solution, but lookups now fall well below half a microsecond, which is pretty good (the fact that the chart below doesn’t use a log scale like the one above should be an indictment of the naive hashmap-based solution).

     | subscribe | unsubscribe | lookup
cold | 3,795ns   | 198ns       | 380ns
hot  | 3,863ns   | 198ns       | 395ns
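To make the indexing and matching steps concrete, here is a minimal sketch of the inverted bitmap in Go. It caps subscriptions at 64 so plain uint64s can serve as bitmaps and reuses the matches helper from the naive sketch above; a real implementation would use growable or compressed bitmaps.

```go
package match

import "strings"

// invertedBitmap holds one bitmap per concrete topic in a known, finite
// search space. Bit i in a topic's bitmap means subscription i matches
// that topic.
type invertedBitmap struct {
	bitmaps map[string]uint64 // topic -> subscription bits
	subs    []string          // subscription ID -> subscriber
}

func newInvertedBitmap(topics []string) *invertedBitmap {
	ib := &invertedBitmap{bitmaps: make(map[string]uint64)}
	for _, t := range topics {
		ib.bitmaps[t] = 0
	}
	return ib
}

// Subscribe assigns the next subscription ID and sets its bit on every topic
// in the search space the pattern matches. The write path pays the
// O(topics) cost so the read path doesn't have to.
func (ib *invertedBitmap) Subscribe(pattern, subscriber string) {
	id := uint(len(ib.subs))
	ib.subs = append(ib.subs, subscriber)
	p := strings.Split(pattern, ".")
	for topic := range ib.bitmaps {
		if matches(p, strings.Split(topic, ".")) {
			ib.bitmaps[topic] |= 1 << id
		}
	}
}

// Lookup fetches the topic's bitmap and collects the subscribers for the
// set bits.
func (ib *invertedBitmap) Lookup(topic string) []string {
	bits := ib.bitmaps[topic]
	var matched []string
	for i := 0; bits != 0; i, bits = i+1, bits>>1 {
		if bits&1 == 1 {
			matched = append(matched, ib.subs[i])
		}
	}
	return matched
}
```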

The inverted bitmap is a better option than the hashmap when we have a read-heavy workload. One limitation is that it requires us to know the search space ahead of time; otherwise it requires reindexing, which, frankly, is prohibitively expensive.

Optimized Inverted Bitmap

The inverted bitmap technique works well enough, but only if the topic space is fairly static. It also falls over pretty quickly when the topic space and number of subscriptions are large, say, millions of topics and thousands of subscribers. The main benefit of topic-based routing is it allows for faster matching algorithms in contrast to content-based routing, which can be exponentially slower. The truth is, to be useful, your topics probably consist of stock.nyse.ibm, stock.nyse.ge, stock.nasdaq.msft, stock.nasdaq.aapl, etc., not stock.nyse and stock.nasdaq. We could end up with an explosion of topics and, even with efficient bitmaps, the memory consumption tends to be too high despite the fact that most of the bitmaps are quite sparse.

Fortunately, we can reduce the amount of memory we consume using a fairly straightforward optimization. Rather than requiring the entire search space a priori, we simply require the max topic size, in terms of words, e.g. stock.nyse.ibm has a size of 3. We can handle topics of the max size or less, e.g. stock.nyse.bac, stock.nasdaq.txn, forex.usd, index, etc. If we see a message with more words than the max, we can safely assume there are no matching subscriptions.

The optimized inverted bitmap works by splitting topics into their constituent parts. Each constituent position has a set of bitmaps, and we use a technique similar to the one described above on each part. We end up with a bitmap for each constituent which we perform a logical AND on to give a resulting bitmap. Each 1 in the resulting bitmap corresponds to a subscription. This means if the max topic size is n, we only AND at most n bitmaps. Furthermore, if we come across any empty bitmaps, we can stop early since we know there are no matching subscribers.

Let’s say our max topic size is 2 and we have the following subscriptions:

  • 0 = forex.*
  • 1 = stock.nyse
  • 2 = index
  • 3 = stock.*

The inverted bitmap for the first constituent looks like the following:

      | forex.* | stock.nyse | index | stock.*
null  | 0       | 0          | 0     | 0
forex | 1       | 0          | 0     | 0
stock | 0       | 1          | 0     | 1
index | 0       | 0          | 1     | 0
other | 0       | 0          | 0     | 0

And the second constituent bitmap:

      | forex.* | stock.nyse | index | stock.*
null  | 0       | 0          | 1     | 0
nyse  | 0       | 1          | 0     | 0
other | 1       | 0          | 0     | 1

The “null” and “other” rows are worth pointing out. “Null” simply means the topic has no corresponding constituent. For example, “index” has no second constituent, so “null” is marked. “Other” allows us to limit the number of rows needed such that we only need the ones that appear in subscriptions. For example, if messages are published on forex.eur, forex.usd, and forex.gbp but I merely subscribe to forex.*, there’s no need to index eur, usd, or gbp. Instead, we just mark the “other” row, which will match all of them.

Let’s look at an example using the above bitmaps. Imagine we want to route a message published on forex.eur. We split the topic into its constituents: “forex” and “eur.” We get the row corresponding to “forex” from the first constituent bitmap and, since “eur” isn’t indexed, the “other” row from the second, and then AND the rows.

          | forex.* | stock.nyse | index | stock.*
1 = forex | 1       | 0          | 0     | 0
2 = other | 1       | 0          | 0     | 1
AND       | 1       | 0          | 0     | 0

The forex.* subscription matches.

Let’s try one more example: a message published on stock.nyse.

          | forex.* | stock.nyse | index | stock.*
1 = stock | 0       | 1          | 0     | 1
2 = nyse  | 0       | 1          | 0     | 1
AND       | 0       | 1          | 0     | 1

In this case, we also need to OR the “other” row for the second constituent. This gives us a match for stock.nyse and stock.*.
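Below is a minimal sketch of the optimized inverted bitmap in Go, following the “null”/“other” row scheme from the worked examples above. As with the earlier sketch, it caps subscriptions at 64 by using uint64 bitmaps, and the names are mine rather than from the benchmarked code.

```go
package match

import "strings"

// optimizedInvertedBitmap keeps one map of word -> bitmap per constituent
// position, up to a fixed max topic size, plus special "null" and "other"
// rows. Bit i corresponds to subscription i.
type optimizedInvertedBitmap struct {
	positions []map[string]uint64 // one word->bits map per constituent position
	null      []uint64            // per-position bits for "no constituent here"
	other     []uint64            // per-position bits for wildcard constituents
	subs      []string
}

func newOptimizedInvertedBitmap(maxSize int) *optimizedInvertedBitmap {
	o := &optimizedInvertedBitmap{
		positions: make([]map[string]uint64, maxSize),
		null:      make([]uint64, maxSize),
		other:     make([]uint64, maxSize),
	}
	for i := range o.positions {
		o.positions[i] = make(map[string]uint64)
	}
	return o
}

func (o *optimizedInvertedBitmap) Subscribe(pattern, subscriber string) {
	id := uint(len(o.subs))
	o.subs = append(o.subs, subscriber)
	words := strings.Split(pattern, ".")
	for i := range o.positions {
		switch {
		case i >= len(words):
			o.null[i] |= 1 << id // subscription has no constituent at this position
		case words[i] == "*":
			o.other[i] |= 1 << id // wildcard matches any word at this position
		default:
			o.positions[i][words[i]] |= 1 << id
		}
	}
}

func (o *optimizedInvertedBitmap) Lookup(topic string) []string {
	words := strings.Split(topic, ".")
	if len(words) > len(o.positions) {
		return nil // longer than the max size: nothing can match
	}
	result := ^uint64(0)
	for i := range o.positions {
		var row uint64
		if i < len(words) {
			// Indexed word (if any) OR'd with the "other" row, per the example above.
			row = o.positions[i][words[i]] | o.other[i]
		} else {
			row = o.null[i] // topic has no word at this position
		}
		result &= row
		if result == 0 {
			return nil // stop early on an empty intermediate bitmap
		}
	}
	var matched []string
	for i := 0; result != 0; i, result = i+1, result>>1 {
		if result&1 == 1 {
			matched = append(matched, o.subs[i])
		}
	}
	return matched
}
```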

Subscribe operations are significantly faster with the space-optimized inverted bitmap compared to the normal inverted bitmap, but lookups are much slower. However, the optimized version consumes roughly 4.5x less memory for every subscription. The increased flexibility and improved scalability make the optimized version a better choice for all but the most latency-sensitive use cases.

     | subscribe | unsubscribe | lookup
cold | 1,053ns   | 330ns       | 2,724ns
hot  | 1,076ns   | 371ns       | 3,337ns

Trie

The optimized inverted bitmap improves space complexity, but it does so at the cost of lookup efficiency. Is there a way we can reconcile both time and space complexity? While inverted bitmaps allow for efficient lookups, they are quite wasteful for sparse sets, even when using highly compressed bitmaps like Roaring bitmaps.

Tries can often be more space efficient in these circumstances. When we add a subscription, we descend the trie, adding nodes along the way as necessary, until we run out of words in the topic. Finally, we add some metadata containing the subscription information to the last node in the chain. To match a message topic, we perform a similar traversal. If a node doesn’t exist in the chain, we know there are no subscribers. One downside of this method is, in order to support wildcards, we must backtrack on a literal match and check the “*” branch as well.
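Here is a rough sketch of the trie-based matcher in Go, including the wildcard backtracking on the read path described above. It is a simplified illustration, not the implementation used in the benchmarks.

```go
package match

import "strings"

// trieNode is one level of the subscription trie. Each word in a topic is an
// edge; subscribers are attached to the node where their pattern ends.
type trieNode struct {
	children map[string]*trieNode
	subs     []string
}

func newTrieNode() *trieNode {
	return &trieNode{children: make(map[string]*trieNode)}
}

// Subscribe descends the trie, creating nodes as needed, and attaches the
// subscriber to the final node.
func (n *trieNode) Subscribe(pattern, subscriber string) {
	node := n
	for _, word := range strings.Split(pattern, ".") {
		child, ok := node.children[word]
		if !ok {
			child = newTrieNode()
			node.children[word] = child
		}
		node = child
	}
	node.subs = append(node.subs, subscriber)
}

// Lookup walks the trie for the topic. At each level it follows both the
// literal branch and the "*" branch, which is the backtracking cost the text
// mentions.
func (n *trieNode) Lookup(topic string) []string {
	return n.lookup(strings.Split(topic, "."))
}

func (n *trieNode) lookup(words []string) []string {
	if len(words) == 0 {
		return n.subs
	}
	var matched []string
	if child, ok := n.children[words[0]]; ok {
		matched = append(matched, child.lookup(words[1:])...)
	}
	if wild, ok := n.children["*"]; ok {
		matched = append(matched, wild.lookup(words[1:])...)
	}
	return matched
}
```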

For the given set of subscriptions, the trie would look something like the following:

  • forex.*
  • stock.nyse
  • index
  • stock.*

You might be tempted to ask: why do we even need the “*” nodes? When someone subscribes to stock.*, couldn’t we just follow all branches after “stock” and add the subscriber? This would indeed move the backtracking cost from the read path to the write path, but—like the first inverted bitmap we looked at—it only works if the search space is known ahead of time. It would also largely negate the memory-usage benefits we’re looking for since it would require pre-indexing all topics while requiring a finite search space.

It turns out, this trie technique is how systems like ZeroMQ and RabbitMQ implement their topic matching due to its balance between space and time complexity and overall performance predictability.

     | subscribe | unsubscribe | lookup
cold | 406ns     | 221ns       | 2,145ns
hot  | 443ns     | 257ns       | 2,278ns

We can see that, compared to the optimized inverted bitmap, the trie performs much more predictably with respect to the number of subscriptions held.

Concurrent Subscription Trie

One thing we haven’t paid much attention to so far is concurrency. Indeed, message-oriented middleware is typically highly concurrent since it has to deal with heavy IO (reading messages from the wire, writing messages to the wire, reading messages from disk, writing messages to disk, etc.) and CPU operations (like topic matching and routing). Subscribe, unsubscribe, and lookups are usually all happening in different threads of execution. This is especially important when we want to take advantage of multi-core processors.

Though not shown above, all of the preceding algorithms use a global lock to ensure thread safety between read and write operations, making the data structures safe for concurrent use. However, the microbenchmarks don’t really capture the impact of this lock, which we will see momentarily.

Lock-freedom, which I’ve written about, allows us to increase throughput at the expense of increased tail latency.

Lock-free concurrency means that while a particular thread of execution may be blocked, all CPUs are able to continue processing other work. For example, imagine a program that protects access to some resource using a mutex. If a thread acquires this mutex and is subsequently preempted, no other thread can proceed until this thread is rescheduled by the OS. If the scheduler is adversarial, it may never resume execution of the thread, and the program would be effectively deadlocked. A key point, however, is that the mere lack of a lock does not guarantee a program is lock-free. In this context, “lock” really refers to deadlock, livelock, or the misdeeds of a malevolent scheduler.

The concurrent subscription trie, or CS-trie, is a new take on the trie-based solution described earlier. It combines the idea of the topic-matching trie with that of a Ctrie, or concurrent trie, which is a non-blocking concurrent hash trie.

The fundamental problem with the trie, as it relates to concurrency, is it requires a global lock, which severely limits throughput. To address this, the CS-trie uses indirection nodes, or I-nodes, which remain present in the trie even as the nodes above and below change. Subscriptions are then added or removed by creating a copy of the respective node, and performing a CAS on its parent I-node. This allows us to add, remove, and lookup subscriptions concurrently and in a lock-free, linearizable manner.
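The full CS-trie is too involved to reproduce here, but the copy-and-CAS step on an I-node can be sketched in a few lines of Go. This uses Go 1.19's atomic.Pointer and is only an illustration of the idea, not the actual implementation.

```go
package match

import "sync/atomic"

// csNode is an immutable snapshot of one trie level: the children and the
// subscribers attached here. It is never mutated after publication.
type csNode struct {
	children map[string]*iNode
	subs     []string
}

// iNode is an indirection node. It stays in the trie while the csNode it
// points to is swapped out with a CAS, which is what lets readers and writers
// proceed without a global lock.
type iNode struct {
	main atomic.Pointer[csNode]
}

func newINode() *iNode {
	in := &iNode{}
	in.main.Store(&csNode{children: map[string]*iNode{}})
	return in
}

// addSubscriber attaches a subscriber to this level by publishing a copied
// node via CAS, retrying if another writer got there first. A complete
// CS-trie handles child insertion and removal the same way; this is only a
// sketch of the copy-and-CAS step.
func (in *iNode) addSubscriber(subscriber string) {
	for {
		old := in.main.Load()
		copied := &csNode{
			children: old.children, // shared, not mutated here
			subs:     append(append([]string(nil), old.subs...), subscriber),
		}
		if in.main.CompareAndSwap(old, copied) {
			return
		}
		// Lost the race: loop and retry against the new snapshot.
	}
}

// subscribers is the read side: a single atomic load, no locking.
func (in *iNode) subscribers() []string {
	return in.main.Load().subs
}
```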

For the given set of subscribers, labeled x, y, and z, the CS-trie would look something like the following:

  • x = foo, bar, bar.baz
  • y = foo, bar.qux
  • z = bar.*

Lookups on the CS-trie perform, on average, better than the standard trie, and the CS-trie scales better with respect to concurrent operations.

     | subscribe | unsubscribe | lookup
cold | 412ns     | 245ns       | 1,615ns
hot  | 471ns     | 280ns       | 1,637ns

Latency Comparison

The chart below shows the topic-matching operation latencies for all of the algorithms side-by-side. First, we look at the performance of a cold start (no subscriptions) and then the performance of a hot start (1,000 subscriptions).

Throughput Comparison

So far, we’ve looked at the latency of individual topic-matching operations. Next, we look at overall throughput of each of the algorithms and their memory footprint.

algorithm                 | msg/sec
naive                     | 4,053.48
inverted bitmap           | 1,052,315.02
optimized inverted bitmap | 130,705.98
trie                      | 248,762.10
cs-trie                   | 340,910.64

On the surface, the inverted bitmap looks like the clear winner, clocking in at over 1 million matches per second. However, we know the inverted bitmap does not scale and, indeed, this becomes clear when we look at memory consumption, underscored by the fact that the below chart uses a log scale.

Scalability with Respect to Concurrency

Lastly, we’ll look at how each of these algorithms scales with respect to concurrency. We do this by performing concurrent operations and varying the level of concurrency and number of operations. We start with a 50-50 split between reads and writes. We vary the number of goroutines from 2 to 16 (the benchmark was run using a 2.6 GHz Intel Core i7 processor with 8 logical cores). Each goroutine performs 1,000 reads or 1,000 writes. For example, the 2-goroutine benchmark performs 1,000 reads and 1,000 writes, the 4-goroutine benchmark performs 2,000 reads and 2,000 writes, etc. We then measure the total amount of time needed to complete the workload.
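As a rough illustration, the 50-50 workload can be sketched like this in Go. The matcher interface here is a placeholder for whichever implementation is under test, not the benchmark's actual API.

```go
package benchmarks

import (
	"fmt"
	"sync"
	"time"
)

// matcher is whichever topic-matching implementation is under test.
type matcher interface {
	Subscribe(pattern, subscriber string)
	Lookup(topic string) []string
}

// runMixed launches the given number of goroutines, half doing 1,000
// subscribes and half doing 1,000 lookups, and reports the wall-clock time
// for the whole workload.
func runMixed(m matcher, goroutines int) time.Duration {
	var wg sync.WaitGroup
	start := time.Now()
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func(g int) {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				if g%2 == 0 {
					m.Subscribe(fmt.Sprintf("stock.nyse.%d", i), "sub")
				} else {
					m.Lookup(fmt.Sprintf("stock.nyse.%d", i))
				}
			}
		}(g)
	}
	wg.Wait()
	return time.Since(start)
}
```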

We can see that the tries hardly even register on the scale above, so we’ll plot them separately.

The tries are clearly much more efficient than the other solutions, but the CS-trie in particular scales well to the increased workload and concurrency.

Since most workloads are heavily biased towards reads over writes, we’ll run a separate benchmark that uses a 90-10 split between reads and writes. This should hopefully provide a more realistic result.

The results look, more or less, like what we would expect, with the reduced writes improving the inverted bitmap performance. The CS-trie still scales quite well in comparison to the global-lock trie.

Conclusion

As we’ve seen, there are several approaches to consider when implementing fast topic matching. There are also several aspects to look at: read/write access patterns, time complexity, space complexity, throughput, and latency.

The naive hashmap solution is generally a poor choice due to its prohibitively expensive lookup time. Inverted bitmaps offer a better solution. The standard implementation is reasonable if the search space is finite, small, and known a priori, especially if read latency is critical. The space-optimized version is a better choice for scalability, offering a good balance between read and write performance while keeping a small memory footprint. The trie is an even better choice, providing lower latency than the optimized inverted bitmap and consuming less memory. It’s particularly good if the subscription tree is sparse and topics are not known a priori. Lastly, the concurrent subscription trie is the best option if there is high concurrency and throughput matters. It offers similar performance to the trie but scales better. The only downside is an increase in implementation complexity.

Take It to the Limit: Considerations for Building Reliable Systems

Complex systems usually operate in failure mode. This is because a complex system typically consists of many discrete pieces, each of which can fail in isolation (or in concert). In a microservice architecture where a given function potentially comprises several independent service calls, high availability hinges on the ability to be partially available. This is a core tenet behind resilience engineering. If a function depends on three services, each with a reliability of 90%, 95%, and 99%, respectively, partial availability could be the difference between 99.995% reliability and 84% reliability (assuming failures are independent). Resilience engineering means designing with failure as the norm.

Anticipating failure is the first step to resilience zen, but the second is embracing it. Telling the client “no” and failing on purpose is better than failing in unpredictable or unexpected ways. Backpressure is another critical resilience engineering pattern. Fundamentally, it’s about enforcing limits. This comes in the form of queue lengths, bandwidth throttling, traffic shaping, message rate limits, max payload sizes, etc. Prescribing these restrictions makes the limits explicit when they would otherwise be implicit (eventually your server will exhaust its memory, but since the limit is implicit, it’s unclear exactly when or what the consequences might be). Relying on unbounded queues and other implicit limits is like someone saying they know when to stop drinking because they eventually pass out.
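As a small illustration of making a limit explicit, here is a sketch of a bounded queue in Go that rejects work instead of buffering without bound. The types and names are hypothetical, not from any particular broker.

```go
package limits

import "errors"

// ErrQueueFull is returned when the explicit limit is hit, making the
// backpressure visible to the producer instead of silently growing memory.
var ErrQueueFull = errors.New("queue full")

// BoundedQueue is a fixed-capacity buffer that says "no" when it's full
// rather than accepting unbounded work.
type BoundedQueue struct {
	ch chan []byte
}

func NewBoundedQueue(capacity int) *BoundedQueue {
	return &BoundedQueue{ch: make(chan []byte, capacity)}
}

// Enqueue rejects the message when the queue is at capacity, or when the
// payload exceeds an explicit max size, instead of blocking or buffering
// without bound.
func (q *BoundedQueue) Enqueue(msg []byte, maxSize int) error {
	if len(msg) > maxSize {
		return errors.New("message exceeds max size")
	}
	select {
	case q.ch <- msg:
		return nil
	default:
		return ErrQueueFull
	}
}

// Dequeue blocks until a message is available.
func (q *BoundedQueue) Dequeue() []byte {
	return <-q.ch
}
```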

Rate limiting is important not just to prevent bad actors from DoSing your system, but also yourself. Queue limits and message size limits are especially interesting because they seem to confuse and frustrate developers who haven’t fully internalized the motivation behind them. But really, these are just another form of rate limiting or, more generally, backpressure. Let’s look at max message size as a case study.

Imagine we have a system of distributed actors. An actor can send messages to other actors who, in turn, process the messages and may choose to send messages themselves. Now, as any good software engineer knows, the eighth fallacy of distributed computing is “the network is homogeneous.” This means not all actors are using the same hardware, software, or network configuration. We have servers with 128GB RAM running Ubuntu, laptops with 16GB RAM running macOS, mobile clients with 2GB RAM running Android, IoT edge devices with 512MB RAM, and everything in between, all running a hodgepodge of software and network interfaces.

When we choose not to put an upper bound on message sizes, we are making an implicit assumption (recall the discussion on implicit/explicit limits from earlier). Put another way, you and everyone you interact with (likely unknowingly) enter an unspoken contract from which neither party can opt out. This is because any actor may send a message of arbitrary size. This means any downstream consumers of this message, either directly or indirectly, must also support arbitrarily large messages.

How can we test something that is arbitrary? We can’t. We have two options: either we make the limit explicit or we keep this implicit, arbitrarily binding contract. The former allows us to define our operating boundaries and gives us something to test. The latter requires us to test at some undefined production-level scale. The second option is literally gambling reliability for convenience. The limit is still there, it’s just hidden. When we don’t make it explicit, we make it easy to DoS ourselves in production. Limits become even more important when dealing with cloud infrastructure due to their multitenant nature. They prevent a bad actor (or yourself) from bringing down services or dominating infrastructure and system resources.

In our heterogeneous actor system, we have messages bound for mobile devices and web browsers, which are often single-threaded or memory-constrained consumers. Without an explicit limit on message size, a client could easily doom itself by requesting too much data or simply receiving data outside of its control—this is why the contract is unspoken but binding.

Let’s look at this from a different kind of engineering perspective. Consider another type of system: the US National Highway System. The US Department of Transportation uses the Federal Bridge Gross Weight Formula as a means to prevent heavy vehicles from damaging roads and bridges. It’s really the same engineering problem, just a different discipline and a different type of infrastructure.

The August 2007 collapse of the Interstate 35W Mississippi River bridge in Minneapolis brought renewed attention to the issue of truck weights and their relation to bridge stress. In November 2008, the National Transportation Safety Board determined there had been several reasons for the bridge’s collapse, including (but not limited to): faulty gusset plates, inadequate inspections, and the extra weight of heavy construction equipment combined with the weight of rush hour traffic.

The DOT relies on weigh stations to ensure trucks comply with federal weight regulations, fining those that exceed restrictions without an overweight permit.

The federal maximum weight is set at 80,000 pounds. Trucks exceeding the federal weight limit can still operate on the country’s highways with an overweight permit, but such permits are only issued before the scheduled trip and expire at the end of the trip. Overweight permits are only issued for loads that cannot be broken down to smaller shipments that fall below the federal weight limit, and if there is no other alternative to moving the cargo by truck.

Weight limits need to be enforced so civil engineers have a defined operating range for the roads, bridges, and other infrastructure they build. Computers are no different. This is the reason many systems enforce these types of limits. For example, Amazon clearly publishes the limits for its Simple Queue Service—the max in-flight messages for standard queues is 120,000 messages and 20,000 messages for FIFO queues. Messages are limited to 256KB in size. Amazon Kinesis, Apache Kafka, NATS, and Google App Engine pull queues all limit messages to 1MB in size. These limits allow the system designers to optimize their infrastructure and ameliorate some of the risks of multitenancy—not to mention it makes capacity planning much easier.

Unbounded anything—whether it’s queues, message sizes, queries, or traffic—is a resilience engineering anti-pattern. Without explicit limits, things fail in unexpected and unpredictable ways. Remember, the limits exist, they’re just hidden. By making them explicit, we restrict the failure domain, giving us more predictability, longer mean time between failures, and shorter mean time to recovery at the cost of more upfront work or slightly more complexity.

It’s better to be explicit and handle these limits upfront than to punt on the problem and allow systems to fail in unexpected ways. The latter might seem like less work at first but will lead to more problems long term. By requiring developers to deal with these limitations directly, they will think through their APIs and business logic more thoroughly and design better interactions with respect to stability, scalability, and performance.

Benchmarking Message Queue Latency

About a year and a half ago, I published Dissecting Message Queues, which broke down a few different messaging systems and did some performance benchmarking. It was a naive attempt and had a lot of problems, but it was also my first time doing any kind of system benchmarking. It turns out benchmarking systems correctly is actually pretty difficult and many folks get it wrong. I don’t claim to have gotten it right, but over the past year and a half I’ve learned a lot, tried to build some better tools, and improved my methodology.

Tooling and Methodology

The Dissecting Message Queues benchmarks used a framework I wrote which published a specified number of messages effectively as fast as possible, received them, and recorded the end-to-end latency. There are several problems with this. First, load generation and consumption run on the same machine. Second, the system under test runs on the same machine as the benchmark client—both of these confound measurements. Third, running “pedal to the metal” and looking at the resulting latency isn’t a very useful benchmark because it’s not representative of a production environment (as Gil Tene likes to say, this is like driving your car as fast as possible, crashing it into a pole, and looking at the shape of the bumper afterwards—it’s always going to look bad). Lastly, the benchmark recorded average latency, which, for all intents and purposes, is a useless metric to look at.

I wrote Flotilla to automate “scaled-up” benchmarking—running the broker and benchmark clients on separate, distributed VMs. Flotilla also attempted to capture a better view of latency by looking at the latency distribution, though it only went up to the 99th percentile, which can sweep a lot of really bad things under the rug as we’ll see later. However, it still ran tests at full throttle, which isn’t great.

Bench is an attempt to get back to basics. It’s a simple, generic benchmarking library for measuring latency. It provides a straightforward Requester interface which can be implemented for various systems under test. Bench works by attempting to issue a fixed rate of requests per second and measuring the latency of each request issued synchronously. Latencies are captured using HDR Histogram, which observes the complete latency distribution and allows us to look, for example, at “six nines” latency.

Introducing a request schedule allows us to measure latency for different configurations of request rate and message size, but in a “closed-loop” test, it creates another problem called coordinated omission. The problem with a lot of benchmarks is that they end up measuring service time rather than response time, but the latter is likely what you care about because it’s what your users experience.

The best way to describe service time vs. response time is to think of a cash register. The cashier might be able to ring up a customer in under 30 seconds 99% of the time, but 1% of the time it takes three minutes. The time it takes to ring up a customer is the service time, while the response time consists of the service time plus the time the customer waited in line. Thus, the response time is dependent upon the variation in both service time and the rate of arrival. When we measure latency, we really want to measure response time.

Now, let’s think about how most latency benchmarks work. They usually do this:

  1. Note timestamp before request, t0.
  2. Make synchronous request.
  3. Note timestamp after request, t1.
  4. Record latency t1 − t0.
  5. Repeat as needed for request schedule.

What’s the problem with this? Nothing, as long as our requests fit within the specified request schedule.  For example, if we’re issuing 100 requests per second and each request takes 10 ms to complete, we’re good. However, if one request takes 100 ms to complete, that means we issued only one request during those 100 ms when, according to our schedule, we should have issued 10 requests in that window. Nine other requests should have been issued, but the benchmark effectively coordinated with the system under test by backing off. In reality, those nine requests waited in line—one for 100 ms, one for 90 ms, one for 80 ms, etc. Most benchmarks don’t capture this time spent waiting in line, yet it can have a dramatic effect on the results. The graph below shows the same benchmark with coordinated omission both uncorrected (red) and corrected (blue):
[Figure: coordinated omission, uncorrected (red) vs. corrected (blue)]

HDR Histogram attempts to correct coordinated omission by filling in additional samples when a request falls outside of its expected interval. We can also deal with coordinated omission by simply avoiding it altogether—always issue requests according to the schedule.
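As a sketch of the second approach (not how bench or HDR Histogram are actually implemented), the key idea is to measure each latency from the time the request was scheduled to be sent rather than from when it was actually sent:

```go
package bench

import "time"

// RequestFunc issues one synchronous request; what it does is up to the
// system under test.
type RequestFunc func() error

// runSchedule issues requests at a fixed rate and measures each latency from
// the time the request *should* have been sent. If the system stalls, the
// queueing delay shows up in the recorded latencies instead of being
// silently omitted.
func runSchedule(rate int, duration time.Duration, request RequestFunc) []time.Duration {
	interval := time.Second / time.Duration(rate)
	start := time.Now()
	var latencies []time.Duration
	for i := 0; ; i++ {
		scheduled := start.Add(time.Duration(i) * interval)
		if scheduled.Sub(start) >= duration {
			break
		}
		if wait := time.Until(scheduled); wait > 0 {
			time.Sleep(wait) // keep to the schedule when we're ahead
		}
		_ = request()
		// Measure against the intended send time, which is what accounts for
		// coordinated omission in this sketch.
		latencies = append(latencies, time.Since(scheduled))
	}
	return latencies
}
```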

Message Queue Benchmarks

I benchmarked several messaging systems using bench—RabbitMQ (3.6.0), Kafka (0.8.2.2 and 0.9.0.0), Redis (2.8.4) pub/sub, and NATS (0.7.3). In this context, a “request” consists of publishing a message to the server and waiting for a response (i.e. a roundtrip). We attempt to issue requests at a fixed rate and correct for coordinated omission, then plot the complete latency distribution all the way up to the 99.9999th percentile. We repeat this for several configurations of request rate and request size. It’s also important to note that each message going to and coming back from the server is of the specified size, i.e. the “response” is the same size as the “request.”

The configurations used are listed below. Each configuration is run for a sustained 30 seconds.

  • 256B requests at 3,000 requests/sec (768 KB/s)
  • 1KB requests at 3,000 requests/sec (3 MB/s)
  • 5KB requests at 2,000 requests/sec (10 MB/s)
  • 1KB requests at 20,000 requests/sec (20.48 MB/s)
  • 1MB requests at 100 requests/sec (100 MB/s)

These message sizes are mostly arbitrary, and there might be a better way to go about this. Though I think it’s worth pointing out that the Ethernet MTU is 1500 bytes, so accounting for headers, the maximum amount of data you’ll get in a single TCP packet will likely be between 1400 and 1500 bytes.

The system under test and benchmarking client are on two different m4.xlarge EC2 instances (2.4 GHz Intel Xeon Haswell, 16GB RAM) with enhanced networking enabled.

Redis and NATS

Redis pub/sub and NATS have similar performance characteristics. Both offer very lightweight, non-transactional messaging with no persistence options (discounting Redis’ RDB and AOF persistence, which don’t apply to pub/sub), and both support some level of topic pattern matching. I’m hesitant to call either a “message queue” in the traditional sense, so I usually just refer to them as message brokers or buses. Because of their ephemeral nature, both are a nice choice for low-latency, lossy messaging.

Redis tail latency peaks around 1.5 ms.

[Figure: Redis latency distribution]

NATS performance looks comparable to Redis. Latency peaks around 1.2 ms.

[Figure: NATS latency distribution]

The resemblance becomes more apparent when we overlay the two distributions for the 1KB and 5KB runs. NATS tends to be about 0.1 to 0.4 ms faster.

[Figure: Redis vs. NATS latency distributions, 1KB and 5KB runs]

The 1KB, 20,000 requests/sec run uses 25 concurrent connections. With concurrent load, tail latencies jump up, peaking around 90 and 120 ms at the 99.9999th percentile in NATS and Redis, respectively.

[Figure: Redis vs. NATS latency, 1KB at 20,000 requests/sec]

Large messages (1MB) don’t hold up nearly as well, exhibiting large tail latencies starting around the 95th and 97th percentiles in NATS and Redis, respectively. 1MB is the default maximum message size in NATS. The latency peaks around 214 ms. Again, keep in mind these are synchronous, roundtrip latencies.

[Figure: Redis vs. NATS latency, 1MB requests]

Apcera’s Ivan Kozlovic pointed out that the version of the NATS client I was using didn’t include a recent performance optimization. Before, the protocol parser scanned over each byte in the payload, but the newer version skips to the end (the previous benchmarks were updated to use the newer version). The optimization does have a noticeable effect, illustrated below. There was about a 30% improvement with the 5KB latencies.

[Figure: NATS latency before and after the parser optimization]

The difference is even more pronounced in the 1MB case, which has roughly a 90% improvement up to the 90th percentile. The linear scale in the graph below hides this fact, but at the 90th percentile, for example, the pre-optimization latency is 10 ms and the optimized latency is 3.8 ms. Clearly, the large tail is mostly unaffected, however.

[Figure: NATS 1MB latency before and after the parser optimization]

In general, this shows that NATS and Redis are better suited to smaller messages (well below 1MB), in which latency tends to be sub-millisecond up to four nines.

RabbitMQ and Kafka

RabbitMQ is a popular AMQP implementation. Unlike NATS, it’s a more traditional message queue in the sense that it supports binding queues and transactional-delivery semantics. Consequently, RabbitMQ is a more “heavyweight” queuing solution and tends to pay an additional premium with latency. In this benchmark, non-durable queues were used. As a result, we should see reduced latencies since we aren’t going to disk.

[Figure: RabbitMQ latency distribution]

Latency tends to be sub-millisecond up to the 99.7th percentile, but we can see that it doesn’t hold up to NATS beyond that point for the 1KB and 5KB payloads.

[Figure: RabbitMQ vs. NATS latency distributions]

Kafka, on the other hand, requires disk persistence, but this doesn’t have a dramatic effect on latency until we look at the 94th percentile and beyond, when compared to RabbitMQ. Writes should be to page cache with flushes to disk happening asynchronously. The graphs below are for 0.8.2.2.

[Figure: Kafka 0.8.2.2 latency distribution]

[Figure: RabbitMQ vs. Kafka latency distributions]

Once again, the 1KB, 20,000 requests/sec run is distributed across 25 concurrent connections. With RabbitMQ, we see the dramatic increase in tail latencies as we did with Redis and NATS. The RabbitMQ latencies in the concurrent case stay in line with the previous latencies up to about the 99th percentile. Interestingly, Kafka doesn’t appear to be significantly affected. The latencies of 20,000 requests/sec at 1KB per request are not terribly different from the latencies of 3,000 requests/sec at 1KB per request, both peaking around 250 ms.

[Figure: RabbitMQ vs. Kafka latency, 1KB at 20,000 requests/sec]

What’s particularly interesting is the behavior of 1MB messages vs. the rest. With RabbitMQ, there’s almost a 14x difference in max latencies between the 5KB and 1MB runs, with 1MB being the faster of the two. With Kafka 0.8.2.2, the difference is over 126x in the same direction. We can plot the 1MB latencies for RabbitMQ and Kafka separately since it’s difficult to discern them with a linear scale.

[Figure: RabbitMQ vs. Kafka latency, 1MB requests]

I tried to understand what was causing this behavior. I’ve yet to find a reasonable explanation for RabbitMQ. Intuition tells me it’s a result of buffering—either at the OS level or elsewhere—and the large messages cause more frequent flushing. Remember that these benchmarks were with transient publishes. There should be no disk accesses occurring, though my knowledge of Rabbit’s internals is admittedly limited. The fact that this behavior occurs in RabbitMQ and not Redis or NATS seems odd. Nagle’s algorithm is disabled in all of the benchmarks (TCP_NODELAY). After inspecting packets with Wireshark, it doesn’t appear to be a problem with delayed acks.

To show just how staggering the difference is, we can plot Kafka 0.8.2.2 and RabbitMQ 1MB latencies alongside Redis and NATS 5KB latencies. They are all within the same ballpark. Whatever the case may be, both RabbitMQ and Kafka appear to handle large messages extremely well in contrast to Redis and NATS.

[Figure: RabbitMQ and Kafka 1MB latencies alongside Redis and NATS 5KB latencies]

This leads me to believe you’ll see better overall throughput, in terms of raw data, with RabbitMQ and Kafka, but more predictable, tighter tail latencies with Redis and NATS. Where SLAs are important, it’s hard to beat NATS. Of course, it’s unfair to compare Kafka with something like NATS or Redis or even RabbitMQ since they are very different (and sometimes complementary), but it’s also worth pointing out that the former is much more operationally complex.

However, benchmarking Kafka 0.9.0.0 (blue and red) shows an astounding difference in tail latencies compared to 0.8.2.2 (orange and green).

[Figure: Kafka 0.8.2.2 vs. 0.9.0.0 latency distributions]

Kafka 0.9’s performance is much more in line with RabbitMQ’s at high percentiles as seen below.

[Figure: RabbitMQ vs. Kafka 0.9.0.0 latency distributions]

Likewise, it’s a much closer comparison to NATS when looking at the 1KB and 5KB runs.

[Figure: Kafka 0.9.0.0 vs. NATS latency, 1KB and 5KB runs]

As with 0.8, Kafka 0.9 does an impressive job dealing with 1MB messages in comparison to NATS, especially when looking at the 92nd percentile and beyond. It’s hard to decipher in the graph below, but Kafka 0.9’s 99th, 99.9th, and 99.99th percentile latencies are 0.66, 0.78, and 1.35 ms, respectively.

[Figure: Kafka 0.9.0.0 vs. NATS latency, 1MB requests]

My initial thought was that the difference between Kafka 0.8 and 0.9 was attributed to a change in fsync behavior. To quote the Kafka documentation:

Kafka always immediately writes all data to the filesystem and supports the ability to configure the flush policy that controls when data is forced out of the OS cache and onto disk using the flush. This flush policy can be controlled to force data to disk after a period of time or after a certain number of messages has been written.

However, there don’t appear to be any changes in the default flushing configuration between 0.8 and 0.9. The default configuration disables application fsync entirely, instead relying on the OS’s background flush. Jay Kreps indicates it’s a result of several “high percentile latency issues” that were fixed in 0.9. After scanning the 0.9 release notes, I was unable to determine specifically what those fixes might be. Either way, the difference is certainly not something to scoff at.

Conclusion

As always, interpret these benchmark results with a critical eye and perform your own tests if you’re evaluating these systems. This was more an exercise in benchmark methodology and tooling than an actual system analysis (and, as always, there’s still a lot of room for improvement). If anything, I think these results show how much we can miss by not looking beyond the 99th percentile. In almost all cases, everything looks pretty good up to that point, but after that things can get really bad. This is important to be conscious of when discussing SLAs.

I think the key takeaway is to consider your expected load in production, benchmark configurations around that, determine your allowable service levels, and iterate or provision more resources until you’re within those limits. The other important takeaway with respect to benchmarking is to look at the complete latency distribution. Otherwise, you’re not getting a clear picture of how your system actually behaves.

What You Want Is What You Don’t: Understanding Trade-Offs in Distributed Messaging

If there’s one unifying theme of this blog, it’s that distributed systems are riddled with trade-offs. Specifically, with distributed messaging, you cannot have exactly-once delivery. However, messaging trade-offs don’t stop at delivery semantics. I want to talk about what I mean by this and explain why many developers often have the wrong mindset when it comes to building distributed applications.

The natural tendency is to build distributed systems as if they aren’t distributed at all—assuming data consistency, reliable messaging, and predictability. It’s much easier to reason about, but it’s also blatantly misleading.

The only thing guaranteed in messaging—and distributed systems in general—is that sooner or later, your guarantees are going to break down. If you assume these guarantees as axiomatic, everything built on them becomes unsound. Depending on the situation, this can range from mildly annoying to utterly catastrophic.

I recently ran across a comment from Apcera CEO Derek Collison on this topic which resonated with me:

On systems that do claim some form of guarantee, it’s best to look at what level that guarantee really runs out. Especially around persistence, exactly once delivery semantics, etc. I spent much of my career designing and building messaging systems that have those guarantees, and in turn developed many systems utilizing some of those features. For me, I found that depending on these guarantees was a bad pattern in distributed system design…

You should know how your system behaves when you reach the breaking point, but what’s less obvious is that providing these types of strong guarantees is usually very expensive. What price are we willing to pay, what level do our guarantees hold to, and what happens when they give out? In this sense, a “guarantee” is really no different from a SLA, yet stronger guarantees allow for stronger assumptions.

This all sounds quite vague, so let’s look at a specific example. With messaging, we’re often concerned with delivery reliability. In a perfect world, message delivery would be guaranteed and exactly once. Of course, I’ve talked at length why this is impossible, so let’s anchor ourselves in reality. We can look to TCP/IP for how this works.

IP is an unreliable delivery system which runs on unreliable network infrastructure. Packets can be delivered in order, out of order, or not at all. There are no acknowledgements, so the sender has no way of knowing if what they sent was received. TCP builds on IP by effectively making the transmission stateful and adding a layer of control. Through added complexity and performance costs, we achieve reliable delivery over an unreliable stack.

The key takeaway here is that we start with something primitive, like moving bits from point A to point B, and layer on abstractions to build stronger guarantees.  These abstractions almost always come at a price, tangible or not, which is why it’s important to push the costs up into the layers above. If not every use case demands reliable delivery, why force the cost onto everyone?

Exactly-once delivery is the Holy Grail of distributed messaging, and guaranteed delivery is the unicorn. The irony is that even if they were attainable, you likely wouldn’t want them. These types of strong guarantees demand expensive infrastructure which perform expensive coordination which require expensive administration. But what does all this expensive stuff really buy you at the end of the day?

A key problem is that there is a huge difference between message delivery and message processing. Sure, TCP can more or less ensure that your packet was either delivered or not, but what good is that actually in practice? How does the sender know that its message was successfully processed or that the receiver did what it needed to do? The only way to truly know is for the receiver to send a business-level acknowledgement. The low-level transport protocol doesn’t know about the application semantics, so the only way to go, really, is up. And if we assume that any guarantees will eventually give out, we have to account for that at the business level. To quote from a related article, “if reliability is important on the business level, do it on the business level.” It’s important not to conflate the transport protocol with the business-transaction protocol.

This is why systems like Akka don’t provide a notion of guaranteed delivery—because what does “guaranteed delivery” actually mean? Does it mean the message was handed to the transport layer? Does it mean the remote machine received the message? Does it mean the message was enqueued in the recipient’s mailbox?  Does it mean the recipient has started processing it? Does it mean the recipient has finished processing it? Each of these things has a very different set of requirements, constraints, and costs. Also, what does it even mean for a message to be “processed”? It depends on the business context. As such, it usually doesn’t make sense for the underlying infrastructure to make these decisions because the decisions usually impact the layers above significantly.

By providing only basic guarantees those use cases which do not need stricter guarantees do not pay the cost of their implementation; it is always possible to add stricter guarantees on top of basic ones, but it is not possible to retro-actively remove guarantees in order to gain more performance.

Distributed computation is inherently asynchronous and the network is inherently unreliable, so it’s better to embrace this asynchrony than to build on leaky abstractions. Rather than hide these inconveniences, make them explicit and force users to design around them. What you end up with is a more robust, more reliable, and often more performant system. This trade-off is highlighted in the paper “Exactly-once semantics in a replicated messaging system” by Huang et al. while studying the problem of exactly-once delivery:

Thus, server-centric algorithms cannot achieve exactly-once semantics. Instead, we will strive to achieve a weaker notion of correctness.

By relaxing our requirements, we end up with a solution that has less performance overhead and less complexity. Why bother pursuing the impossible? You’re paying a huge premium for something which is probably less reliable than you think while performing poorly. In many cases, it’s better to let the pendulum swing the other direction.

The network is not reliable, which means message delivery is never truly guaranteed—it can only be best-effort. The Two Generals’ Problem shows that it’s provably impossible for two remote processes to safely agree on a decision. Similarly, the FLP impossibility result shows that, in an asynchronous environment, reliable failure detection is impossible. That is, there’s no way to tell if a process has crashed or is simply taking a long time to respond. Therefore, if it’s possible for a process to crash, it’s impossible for a set of processes to come to an agreement.

If message delivery is not guaranteed and consensus is impossible, is message ordering really that important? Some use cases might actually demand it, but I suspect, more often than not, it’s an artificial constraint. The fact that the network is unreliable, processes are faulty, and distributed communication is asynchronous makes reliable, in-order delivery surprisingly expensive. But doesn’t TCP solve this problem? At the transport level, yes, but that only gets you so far as I’ve been trying to demonstrate.

So you use TCP and process messages with a single thread. Most of the time, it just works. But what happens under heavy load? What happens when message delivery fails? What happens when you need to scale? If you are queuing messages or you have a dead-letter queue or you have network partitions or a crash-recovery model, you’re probably going to encounter duplicate, dropped, or out-of-order messages. Even if the infrastructure provides ordered delivery, these problems will likely manifest themselves at the application level.

If you’re distributed, forget about ordering and start thinking about commutativity. Forget about guaranteed delivery and start thinking about idempotence. Stop thinking about the messaging platform and start thinking about the messaging patterns and business semantics. A pattern which is commutative and idempotent will be far less brittle and more efficient than a system which is totally ordered and “guaranteed.” This is why CRDTs are becoming increasingly popular in the distributed space. Never write code which assumes messages will arrive in order when you can’t write code that will assume they arrive at all.

In the end, think carefully about the business case and what your requirements really are. Can you satisfy them without relying on costly and leaky abstractions or deceptive guarantees? If you can’t, what happens when those guarantees go out the window? This is very similar to understanding what happens when a SLA is not met. Are the performance and complexity trade-offs worth it? What about the operations and business overheads? In my experience, it’s better to confront the intricacies of distributed systems head-on than to sweep them under the rug. Sooner or later, they will rear their ugly heads.

You Cannot Have Exactly-Once Delivery

I’m often surprised that people continually have fundamental misconceptions about how distributed systems behave. I myself shared many of these misconceptions, so I try not to demean or dismiss but rather educate and enlighten, hopefully while sounding less preachy than that just did. I continue to learn only by following in the footsteps of others. In retrospect, it shouldn’t be surprising that folks buy into these fallacies as I once did, but it can be frustrating when trying to communicate certain design decisions and constraints.

Within the context of a distributed system, you cannot have exactly-once message delivery. Web browser and server? Distributed. Server and database? Distributed. Server and message queue? Distributed. You cannot have exactly-once delivery semantics in any of these situations.

As I’ve described in the past, distributed systems are all about trade-offs. This is one of them. There are essentially three types of delivery semantics: at-most-once, at-least-once, and exactly-once. Of the three, the first two are feasible and widely used. If you want to be super anal, you might say at-least-once delivery is also impossible because, technically speaking, network partitions are not strictly time-bound. If the connection from you to the server is interrupted indefinitely, you can’t deliver anything. Practically speaking, you have bigger fish to fry at that point—like calling your ISP—so we consider at-least-once delivery, for all intents and purposes, possible. With this model of thinking, network partitions are finitely bounded in time, however arbitrary this may be.

So where does the trade-off come into play, and why is exactly-once delivery impossible? The answer lies in the Two Generals thought experiment or the more generalized Byzantine Generals Problem, which I’ve looked at extensively. We must also consider the FLP result, which basically says, given the possibility of a faulty process, it’s impossible for a system of processes to agree on a decision.

In the letter I mail you, I ask you to call me once you receive it. You never do. Either you really didn’t care for my letter or it got lost in the mail. That’s the cost of doing business. I can send the one letter and hope you get it, or I can send 10 letters and assume you’ll get at least one of them. The trade-off here is quite clear (postage is expensive!), but sending 10 letters doesn’t really provide any additional guarantees. In a distributed system, we try to guarantee the delivery of a message by waiting for an acknowledgement that it was received, but all sorts of things can go wrong. Did the message get dropped? Did the ack get dropped? Did the receiver crash? Are they just slow? Is the network slow? Am I slow? FLP and the Two Generals Problem are not design complexities, they are impossibility results.

People often bend the meaning of “delivery” in order to make their system fit the semantics of exactly-once, or in other cases, the term is overloaded to mean something entirely different. State-machine replication is a good example of this. Atomic broadcast protocols ensure messages are delivered reliably and in order. The truth is, we can’t deliver messages reliably and in order in the face of network partitions and crashes without a high degree of coordination. This coordination, of course, comes at a cost (latency and availability), while still relying on at-least-once semantics. Zab, the atomic broadcast protocol which lays the foundation for ZooKeeper, enforces idempotent operations.

State changes are idempotent and applying the same state change multiple times does not lead to inconsistencies as long as the application order is consistent with the delivery order. Consequently, guaranteeing at-least once semantics is sufficient and simplifies the implementation.

“Simplifies the implementation” is the authors’ attempt at subtlety. State-machine replication is just that, replicating state. If our messages have side effects, all of this goes out the window.

We’re left with a few options, all equally tenuous. When a message is delivered, it’s acknowledged immediately, before processing. The sender receives the ack and calls it a day. However, if the receiver crashes before or during its processing, that data is lost forever. Customer transaction? Sorry, looks like you’re not getting your order. This is the worldview of at-most-once delivery. To be honest, implementing at-most-once semantics is more complicated than this depending on the situation. If there are multiple workers processing tasks or the work queues are replicated, the broker must be strongly consistent (or CP in CAP theorem parlance) so as to ensure a task is not delivered to any other workers once it’s been acked. Apache Kafka uses ZooKeeper to handle this coordination.

On the other hand, we can acknowledge messages after they are processed. If the process crashes after handling a message but before acking (or the ack isn’t delivered), the sender will redeliver. Hello, at-least-once delivery. Furthermore, if you want to deliver messages in order to more than one site, you need an atomic broadcast which is a huge burden on throughput. Fast or consistent. Welcome to the world of distributed systems.

Every major message queue in existence which provides any guarantees will market itself as at-least-once delivery. If it claims exactly-once, it’s because they are lying to your face in hopes that you will buy it or they themselves do not understand distributed systems. Either way, it’s not a good indicator.

RabbitMQ attempts to provide guarantees along these lines:

When using confirms, producers recovering from a channel or connection failure should retransmit any messages for which an acknowledgement has not been received from the broker. There is a possibility of message duplication here, because the broker might have sent a confirmation that never reached the producer (due to network failures, etc). Therefore consumer applications will need to perform deduplication or handle incoming messages in an idempotent manner.

The way we achieve exactly-once delivery in practice is by faking it. Either the messages themselves should be idempotent, meaning they can be applied more than once without adverse effects, or we remove the need for idempotency through deduplication. Ideally, our messages don’t require strict ordering and are commutative instead. There are design implications and trade-offs involved with whichever route you take, but this is the reality in which we must live.
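For illustration, here is a minimal sketch of consumer-side deduplication in Go. The Message type and its ID field are assumptions; a production version would bound or persist the seen set and record IDs atomically with the side effects of processing.

```go
package consumer

import "sync"

// Message is whatever the broker delivers; the ID field is assumed to be a
// producer-assigned unique identifier.
type Message struct {
	ID   string
	Body []byte
}

// DedupingConsumer fakes exactly-once processing on top of at-least-once
// delivery by remembering which message IDs it has already handled.
type DedupingConsumer struct {
	mu   sync.Mutex
	seen map[string]bool
}

func NewDedupingConsumer() *DedupingConsumer {
	return &DedupingConsumer{seen: make(map[string]bool)}
}

// Handle processes a message at most once per ID; redeliveries are dropped.
func (c *DedupingConsumer) Handle(msg Message, process func(Message) error) error {
	c.mu.Lock()
	if c.seen[msg.ID] {
		c.mu.Unlock()
		return nil // duplicate delivery: drop it
	}
	c.seen[msg.ID] = true
	c.mu.Unlock()

	// NOTE: marking before processing means a crash mid-processing drops the
	// message (at-most-once). Recording the ID atomically with the side
	// effect, e.g. in the same database transaction, avoids that, but that is
	// beyond this sketch.
	return process(msg)
}
```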

Rethinking operations as idempotent actions might be easier said than done, but it mostly requires a change in the way we think about state. This is best described by revisiting the replicated state machine. Rather than distributing operations to apply at various nodes, what if we just distribute the state changes themselves? Rather than mutating state, let’s just report facts at various points in time. This is effectively how Zab works.

Imagine we want to tell a friend to come pick us up. We send him a series of text messages with turn-by-turn directions, but one of the messages is delivered twice! Our friend isn’t too happy when he finds himself in the bad part of town. Instead, let’s just tell him where we are and let him figure it out. If the message gets delivered more than once, it won’t matter. The implications are wider reaching than this, since we’re still concerned with the ordering of messages, which is why solutions like commutative and convergent replicated data types are becoming more popular. That said, we can typically solve this problem through extrinsic means like sequencing, vector clocks, or other partial-ordering mechanisms. It’s usually causal ordering that we’re after anyway. People who say otherwise don’t quite realize that there is no now in a distributed system.

To reiterate, there is no such thing as exactly-once delivery. We must choose between the lesser of two evils, which is at-least-once delivery in most cases. This can be used to simulate exactly-once semantics by ensuring idempotency or otherwise eliminating side effects from operations. Once again, it’s important to understand the trade-offs involved when designing distributed systems. There is asynchrony abound, which means you cannot expect synchronous, guaranteed behavior. Design for failure and resiliency against this asynchronous nature.

Benchmark Responsibly

When I posted my Dissecting Message Queues article last summer, it understandably caused some controversy. I received both praise and scathing comments, emails asking why I didn’t benchmark X and pull requests to bump the numbers of Y. To be honest, that analysis was more of a brain dump from my own test driving of various message queues than any sort of authoritative or scientific study—it was far from the latter, to say the least. The qualitative discussion was pretty innocuous, but the benchmarks and supporting code were the target of a lot of (valid) criticism. In retrospect, it was probably irresponsible to publish them, but I was young and naive back then; now I’m just mostly naive.

Comparing Apples to Other Assorted Fruit

One such criticism was that the benchmarks were divided into two very broad categories: brokerless and brokered. While the brokerless group compared two very similar libraries, ZeroMQ and nanomsg, the second group included a number of distinct message brokers like RabbitMQ, Kafka, NATS, and Redis, to name a few.

The problem is that not all brokers are created equal. They often have different goals and different prescribed use cases. As such, they impose different guarantees, different trade-offs, and different constraints. By grouping these benchmarks together, I implied they were fundamentally equivalent when, in fact, most were fundamentally different. For example, NATS serves a very different purpose than Kafka, and Redis, which offers pub/sub messaging, typically isn’t thought of as a message broker at all.

Measure Right or Don’t Measure at All

Another criticism was the way in which the benchmarks were performed. The tests simply weren’t meaningful: the producer, the consumer, and the message queue itself all ran on the same machine. Even worse, they used just a single publisher and a single subscriber. Not only does this fail to test anything resembling a realistic configuration, it doesn’t even give you a good picture of a trivial one.

To be meaningful, we need to test with more than one producer and consumer, ideally distributed across many machines. We want to see how the system scales to larger workloads. Certainly, the producers and consumers cannot be collocated when we’re measuring discrete throughputs on either end, nor should they be collocated with the broker. This helps reduce confounding variables between the system under test and the load generation.

It’s Not Rocket Science, It’s Computer Science

The third major criticism lay with the measurements themselves. Measuring throughput is fairly straightforward: we look at the number of messages sent per unit of time at both the sender and the receiver. If we think of a pipe carrying water, we might look at a discrete cross section and the rate at which water passes through it.

Latency, as a concept, is equally simple. With the pipe, it’s the time it takes for a drop of water to travel from one end to the other. While throughput is dependent on the pipe’s diameter, latency is dependent upon its length. What this means is that we can’t derive one from the other. In order to properly measure latency, we need to consider the latency of each message sent through the system.
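In practice, that means timestamping each message at the sender and recording an individual latency sample at the receiver, rather than dividing elapsed time by message count. A minimal sketch of the idea (the framing and the channel “transport” here are made up for illustration):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// stamp prepends the send time (in nanoseconds) to the payload.
func stamp(payload []byte) []byte {
	buf := make([]byte, 8+len(payload))
	binary.BigEndian.PutUint64(buf, uint64(time.Now().UnixNano()))
	copy(buf[8:], payload)
	return buf
}

// latencyOf extracts the send time and returns how long the message
// spent in flight, including any time it sat queued.
func latencyOf(msg []byte) time.Duration {
	sent := int64(binary.BigEndian.Uint64(msg))
	return time.Duration(time.Now().UnixNano() - sent)
}

func main() {
	// Stand-in for a real transport: a buffered channel.
	pipe := make(chan []byte, 100)

	go func() {
		for i := 0; i < 100; i++ {
			pipe <- stamp([]byte("hello"))
		}
		close(pipe)
	}()

	var latencies []time.Duration
	for msg := range pipe {
		latencies = append(latencies, latencyOf(msg)) // one sample per message
	}
	fmt.Println("samples recorded:", len(latencies))
}
```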

However, we can’t ignore the relationship between throughput and latency and what the compromise between them means. Generally, we want to make things as fast as possible. Consider a single-cycle CPU: its per-instruction latency is minimal, but compared with a pipelined processor its throughput is abysmal, because each (long) clock cycle completes only one instruction. A pipeline adds a little latency to each individual instruction, yet by overlapping them it completes far more per unit of time—a five-stage pipeline finishes a million instructions in roughly a million short cycles rather than a million long ones. The implication is that if we trade per-operation latency for throughput, we can actually decrease the total latency of the aggregate workload. Unfortunately, the benchmarks eschewed this relationship by requiring separate latency and throughput tests which used different code paths.

The interaction between latency and throughput is easy to confuse, but it often has interesting ramifications, whether you’re looking at message queues, CPUs, or databases. In a general sense, we’d say “optimize for latency” because lower latency means higher throughput, but the reality is it’s almost always easier (and more cost-effective) to increase throughput than it is to decrease latency, especially on commodity hardware.

Capturing this data, in and of itself, isn’t terribly difficult, but what’s more susceptible to error is how it’s represented. This was the main fault of the benchmarks (in addition to the problems described earlier). The most egregious thing they did was report latency as an average—arguably the cardinal sin of benchmarking. An average is practically useless on its own, particularly without any context like a standard deviation.

We know that latency isn’t going to be uniform, but it’s probably not going to follow a normal distribution either. While network latency may be prone to fitting a nice bell curve, system latency almost certainly won’t. Systems exhibit things like GC pauses and other “hiccups,” and averages tend to hide them.


Measuring performance isn’t all that easy, but if you do it, at least do it in a way that disambiguates the results. Look at quantiles, not averages. If you do present a mean, include the standard deviation and max in addition to the 90th or 99th percentile. Plotting latency by percentile distribution is an excellent way to see what your performance behavior actually looks like. Gil Tene has a great talk on measuring latency which I highly recommend.
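To make the point concrete, a small reporting helper like the sketch below (plain Go, nothing library-specific) surfaces quantiles alongside the mean, standard deviation, and max, which is a far more honest summary than an average alone:

```go
package main

import (
	"fmt"
	"math"
	"sort"
	"time"
)

// summarize prints quantiles plus mean, standard deviation, and max for a
// set of latency samples.
func summarize(samples []time.Duration) {
	sort.Slice(samples, func(i, j int) bool { return samples[i] < samples[j] })

	quantile := func(q float64) time.Duration {
		idx := int(q * float64(len(samples)-1))
		return samples[idx]
	}

	var sum, sqSum float64
	for _, s := range samples {
		sum += float64(s)
		sqSum += float64(s) * float64(s)
	}
	mean := sum / float64(len(samples))
	stddev := math.Sqrt(sqSum/float64(len(samples)) - mean*mean)

	fmt.Printf("p50=%v p90=%v p99=%v p99.9=%v\n",
		quantile(0.50), quantile(0.90), quantile(0.99), quantile(0.999))
	fmt.Printf("mean=%v stddev=%v max=%v\n",
		time.Duration(mean), time.Duration(stddev), samples[len(samples)-1])
}

func main() {
	samples := []time.Duration{
		1 * time.Millisecond, 2 * time.Millisecond, 2 * time.Millisecond,
		3 * time.Millisecond, 150 * time.Millisecond, // a "hiccup" an average would hide
	}
	summarize(samples)
}
```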

Working Towards a Better Solution

With all this in mind, we can work towards building a better way to test and measure messaging systems. The discussion above really just gives us three key takeaways:

  1. Don’t compare apples to oranges.
  2. Don’t instrument tests in a way that’s not at all representative of real life.
  3. Don’t present results in a statistically meaningless way.

My first attempt at taking these ideas to heart is a tool I call Flotilla. It’s meant to provide a way to test messaging systems in more realistic configurations, at scale, while offering more useful data. Flotilla allows you to easily spin up producers and consumers on arbitrarily many machines, start a message broker, and run a benchmark against it, all in an automated fashion. It then collects data like producer/consumer throughput and the complete latency distribution and reports back to the user.

Flotilla uses a Go port of HdrHistogram, of which I’m a raving fan, to capture latency data. HdrHistogram uses a bucketed approach to record values across a configured high-dynamic range at a particular resolution. Recording a value takes on the order of nanoseconds, and the memory footprint is constant. It also has support for correcting coordinated omission, which is a common problem in benchmarking. Seriously, if you’re doing anything performance sensitive, give HdrHistogram a look.
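For a rough idea of what recording and reporting looks like with a Go port, here’s a sketch assuming the codahale/hdrhistogram package (one of the Go ports; the exact import path and API surface may differ between versions):

```go
package main

import (
	"fmt"
	"time"

	"github.com/codahale/hdrhistogram"
)

func main() {
	// Track values from 1ns to 10s at three significant figures.
	hist := hdrhistogram.New(1, int64(10*time.Second), 3)

	// Pretend these samples came from timestamped messages.
	for _, lat := range []time.Duration{
		250 * time.Microsecond,
		300 * time.Microsecond,
		40 * time.Millisecond, // a GC pause or network hiccup
	} {
		// RecordCorrectedValue compensates for coordinated omission by
		// back-filling samples when a value exceeds the expected interval.
		if err := hist.RecordCorrectedValue(int64(lat), int64(time.Millisecond)); err != nil {
			fmt.Println("value out of range:", err)
		}
	}

	fmt.Println("p99:", time.Duration(hist.ValueAtQuantile(99)))
	fmt.Println("max:", time.Duration(hist.Max()))
}
```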

Still, Flotilla is not perfect and there’s certainly work to do, but I think it’s a substantial improvement over the previous MQ benchmarking utility. Longer term, it would be great to integrate it with something like Comcast to test workloads under different network conditions. Testing in a vacuum is nice and all, but we know in the real world, the network isn’t perfectly reliable.

So, Where Are the Benchmarks?

Omitted—for now, anyway. My goal really isn’t to rank a hodgepodge of different message queues because there’s really not much value in doing that. There are different use cases for different systems. I might, at some point, look at individual systems in greater detail, but comparing things like message throughput and latency just devolves into a hotly contested pissing contest. My hope is to garner more feedback and improvements to Flotilla before using it to definitively measure anything.

Benchmark responsibly.

Iris Decentralized Cloud Messaging

A couple weeks ago, I published a rather extensive analysis of numerous message queues, both brokered and brokerless. Brokerless messaging is really just another name for peer-to-peer communication. As we saw, the difference in message latency and throughput between peer-to-peer systems and brokered ones is several orders of magnitude. ZeroMQ and nanomsg are able to reliably transmit millions of messages per second at the expense of guaranteed delivery.

Peer-to-peer messaging is decentralized, scalable, and fast, but it brings with it an inherent complexity. There is a dichotomy between how brokerless messaging is conceptualized and how distributed systems are actually built. Distributed systems are composed of services like applications, databases, caches, etc. Services are composed of instances or nodes—individually addressable hosts, either physical or virtual. The key observation is that, conceptually, the unit of interaction lies at the service level, not the instance level. We don’t care about which database server we interact with, we just want to talk to a database server (or perhaps several). We’re concerned with logical groups of nodes.

While traditional socket-queuing systems like ZeroMQ solve the problem of scaling, they bring about a certain coupling between components. System designers are forced to build applications which communicate with nodes, not services. We can introduce load balancers like HAProxy, but we’re still addressing specific locations while creating potential single points of failure. With lightweight VMs and the pervasiveness of elastic clouds, IP addresses are becoming less and less static—they come and go. The canonical way of dealing with this problem is to use distributed coordination and service discovery via ZooKeeper, et al., but this introduces more configuration, more moving parts, and more headaches.

The reality is that distributed systems are not built with the instance as the smallest unit of composition in mind; they’re built with services in mind. As discussed earlier, a service is simply a logical grouping of nodes. This abstraction is what we attempt to mimic with things like etcd, ZooKeeper and HAProxy. These assemblies are proven, but there are alternative solutions that offer zero configuration, minimal network management, and overall less complexity. One such solution that I want to explore is a distributed messaging framework called Iris.

Decentralized Messaging with Iris

Iris is posited as a decentralized approach to backend messaging middleware. It looks to address several of the fundamental issues with traditional brokerless systems, like tight coupling and security.

In order to avoid the problem of addressing instances, Iris considers clusters to be the smallest logical blocks of which systems are composed. A cluster is a collection of zero or more nodes which are responsible for a certain service sub-task. Clusters are then assembled into services such that they can communicate with each other without any regard as to which instance is servicing their requests or where it’s located. Lastly, services are composed into federations, which allow them to communicate across different clouds.

[Figure: instances vs. clusters]

This form of composition allows Iris to use semantic or logical addressing instead of the standard physical addressing. Nodes specify the name of the cluster they wish to participate in, while Iris handles the intricacies of routing and balancing. For example, you might have three database servers which belong to a single cluster called “databases.” The cluster is reached by its name and requests are distributed across the three nodes. Iris also takes care of service discovery, detecting new clusters as they are created on the same cloud.

[Figure: physical vs. semantic addressing]

With libraries like ZeroMQ, security tends to be an afterthought. Iris has been built from the ground-up with security in mind, and it provides a security model that is simple and fast.

Iris uses a relaxed security model that provides perfect secrecy whilst at the same time requiring effectively zero configuration. This is achieved through the observation that if a node of a service is compromised, the whole system is considered undermined. Hence, the unit of security is a service – opposed to individual instances – where any successfully authenticated node is trusted by all. This enables full data protection whilst maintaining the loosely coupled nature of the system.

In practice, what this means is that each cluster uses a single private key. This encryption scheme not only makes deployment trivial, it minimizes the effect security has on speed.

[Figure: authentication and encryption]

Like ZeroMQ and nanomsg, Iris offers a few different messaging patterns. It provides the standard request-reply and publish-subscribe schemes, but it’s important to remember that the smallest addressable unit is the cluster, not the node. As such, requests are targeted at a cluster and subsequently relayed on to a member in a load-balanced fashion. Publish-subscribe, on the other hand, is not targeted at a single cluster. It allows members of any cluster to subscribe and publish to a topic.

Iris also implements two patterns called “broadcast” and “tunnel.” While request-reply forwards a message to one member of a cluster, broadcast forwards it to all members. The caveat is that there is no way to listen for responses to a broadcast.

Tunnel is designed to address the problem of stateful or streaming transactions where a communication between two endpoints may consist of multiple data exchanges which need to occur as an atomic operation. It provides the guarantee of in-order and throttled message delivery by establishing a channel between a client and a node.

[Figure: messaging schemes]

Performance Characteristics

According to its author, Iris is still in a “feature phase” and hasn’t been optimized for speed. Since it’s written in Go, I’ve compared its pub/sub benchmark performance with other Go messaging libraries, NATS and NSQ. As before, these benchmarks shouldn’t be taken as gospel; the code is available here, and pull requests are welcome.

We can see that Iris is comparable to NSQ on the sending side and about 4x slower on the receiving side, at least out of the box.

Conclusion

Brokerless systems like ZeroMQ and nanomsg offer considerably higher throughput and less latency than classical message-oriented middleware but require greater orchestration of network topologies. They offer high scalability but can lead to tighter coupling between components. Traditional brokered message queues, like those of the AMQP variety, tend to be slower while providing more guarantees and reduced coupling. However, they are also more prone to scaling problems around availability and partitioning.

In terms of its qualities, Iris appears to be a reasonable compromise between the decentralized nature of brokerless systems and the minimal configuration and management of brokered ones. Its intrinsic value lies in its ability to hide the complexities of the infrastructure underlying distributed systems. Rather than forcing us to think in terms of individual instances, Iris lends itself to building large-scale systems the way we conceptualize and reason about them—with services as the building blocks, not instances.

Dissecting Message Queues

Continuing my series on message queues, I spent this weekend dissecting various libraries for performing distributed messaging. In this analysis, I look at a few different aspects, including API characteristics, ease of deployment and maintenance, and performance qualities. The message queues have been categorized into two groups: brokerless and brokered. Brokerless message queues are peer-to-peer such that there is no middleman involved in the transmission of messages, while brokered queues have some sort of server in between endpoints.

The systems I’ll be analyzing are:

Brokerless
nanomsg
ZeroMQ

Brokered
ActiveMQ
NATS
Kafka
Kestrel
NSQ
RabbitMQ
Redis
ruby-nats

To start, let’s look at the performance metrics since this is arguably what people care the most about. I’ve measured two key metrics: throughput and latency. All tests were run on a MacBook Pro 2.6 GHz i7, 16GB RAM. These tests are evaluating a publish-subscribe topology with a single producer and single consumer. This provides a good baseline. It would be interesting to benchmark a scaled-up topology, but that requires more instrumentation.

The code used for benchmarking, written in Go, is available on GitHub. The results below shouldn’t be taken as gospel as there are likely optimizations that can be made to squeeze out performance gains. Pull requests are welcome.

Throughput Benchmarks

Throughput is the number of messages per second the system is able to process, but what’s important to note here is that there is no single “throughput” that a queue might have. We’re sending messages between two different endpoints, so what we observe is a “sender” throughput and a “receiver” throughput—that is, the number of messages that can be sent per second and the number of messages that can be received per second.
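To make the distinction concrete, here’s a minimal sketch of measuring the two rates independently, using a buffered channel as a stand-in for the actual transport (the real benchmark code linked below is what produced the numbers discussed here):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const total = 1_000_000
	msgs := make(chan []byte, 1024) // stand-in for the actual transport
	payload := make([]byte, 1024)   // 1KB messages

	done := make(chan time.Duration)

	// Receiver: measure how fast messages can be drained.
	go func() {
		start := time.Now()
		for i := 0; i < total; i++ {
			<-msgs
		}
		done <- time.Since(start)
	}()

	// Sender: measure how fast messages can be handed to the transport.
	start := time.Now()
	for i := 0; i < total; i++ {
		msgs <- payload
	}
	sendElapsed := time.Since(start)
	recvElapsed := <-done

	fmt.Printf("send: %.0f msgs/sec\n", total/sendElapsed.Seconds())
	fmt.Printf("recv: %.0f msgs/sec\n", total/recvElapsed.Seconds())
}
```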

This test was performed by sending 1,000,000 1KB messages and measuring the time to send and receive on each side. Many performance tests tend to use smaller messages in the range of 100 to 500 bytes. I chose 1KB because it’s more representative of what you might see in a production environment, although this varies case by case. For message-oriented middleware systems, only one broker was used. In most cases, a clustered environment would yield much better results.

Unsurprisingly, there’s higher throughput on the sending side. What’s interesting, however, is the disparity in the sender-to-receiver ratios. ZeroMQ is capable of sending over 5,000,000 messages per second but is only able to receive about 600,000/second. In contrast, nanomsg sends just shy of 3,000,000/second but can receive almost 2,000,000.

Now let’s take a look at the brokered message queues. Intuitively, we observe that brokered message queues have dramatically less throughput than their brokerless counterparts by a couple orders of magnitude for the most part. Half the brokered queues have a throughput below 25,000 messages/second. The numbers for Redis might be a bit misleading though. Despite providing pub/sub functionality, it’s not really designed to operate as a robust messaging queue. In a similar fashion to ZeroMQ, Redis disconnects slow clients, and it’s important to point out that it was not able to reliably handle this volume of messaging. As such, we consider it an outlier. Kafka and ruby-nats have similar performance characteristics to Redis but were able to reliably handle the message volume without intermittent failures. The Go implementation of NATS, gnatsd, has exceptional throughput for a brokered message queue.

Outliers aside, we see that the brokered queues have fairly uniform throughputs. Unlike the brokerless libraries, there is little-to-no disparity in the sender-to-receiver ratios, which themselves are all very close to one.

Latency Benchmarks

The second key performance metric is message latency. This measures how long it takes for a message to be transmitted between endpoints. Intuition might tell us that this is simply the inverse of throughput, i.e. if throughput is messages/second, latency is seconds/message. However, by looking closely at this image borrowed from a ZeroMQ white paper, we can see that this isn’t quite the case.

[Figure: latency vs. throughput, borrowed from a ZeroMQ white paper]

The reality is that the latency per message sent over the wire is not uniform. It can vary wildly for each one. In truth, the relationship between latency and throughput is a bit more involved. Unlike throughput, however, latency is not measured at the sender or the receiver but rather as a whole. But since each message has its own latency, we will look at the averages of all of them. Going further, we will see how the average message latency fluctuates in relation to the number of messages sent. Again, intuition tells us that more messages means more queueing, which means higher latency.

As we did before, we’ll start by looking at the brokerless systems.
In general, our hypothesis proves correct in that, as more messages are sent through the system, the latency of each message increases. What’s interesting is the tapering around the 500,000-message mark, where latency appears to increase at a slower rate as we approach 1,000,000 messages. Another interesting observation is the initial spike in latency between 1,000 and 5,000 messages, which is more pronounced with nanomsg. It’s difficult to pinpoint causation, but these changes might be indicative of how message batching and other network-stack traversal optimizations are implemented in each library. More data points may provide better visibility.

We see some similar patterns with brokered queues and also some interesting new ones.

Redis behaves much as it did before, with an initial latency spike and then a quick tapering off. It differs in that the tapering becomes essentially constant right after 5,000 messages. NSQ doesn’t exhibit the same spike in latency and behaves, more or less, linearly. Kestrel fits our hypothesis.

Notice that ruby-nats and NATS hardly even register on the chart. They exhibited surprisingly low latencies, and, remarkably, their message latencies appear to remain roughly constant regardless of the number of messages sent. This is counterintuitive to our hypothesis.

You may have noticed that Kafka, ActiveMQ, and RabbitMQ were absent from the above charts. This was because their latencies tended to be orders of magnitude higher than those of the other brokered message queues, so ActiveMQ and RabbitMQ were grouped into their own AMQP category. I’ve also included Kafka there since it’s in the same ballpark.

Here we see that RabbitMQ’s latency is constant, while ActiveMQ and Kafka are linear. What’s unclear is the apparent disconnect between their throughput and mean latencies.

Qualitative Analysis

Now that we’ve seen some empirical data on how these different libraries perform, I’ll take a look at how they work from a pragmatic point of view. Message throughput and speed are important, but they aren’t very practical if the library is difficult to use, deploy, or maintain.

ZeroMQ and Nanomsg

Technically speaking, nanomsg isn’t a message queue but rather a socket-style library for performing distributed messaging through a variety of convenient patterns. As a result, there’s nothing to deploy aside from embedding the library itself within your application. This makes deployment a non-issue.

Nanomsg is written by one of the ZeroMQ authors, and as I discussed before, works in a very similar way to that library. From a development standpoint, nanomsg provides an overall cleaner API. Unlike ZeroMQ, there is no notion of a context to which sockets are bound. Furthermore, nanomsg provides pluggable transport and messaging protocols, which make it more open to extension. Its additional built-in scalability protocols also make it quite appealing.

Like ZeroMQ, it guarantees that messages will be delivered atomically, intact, and in order, but it does not guarantee that they will be delivered at all. Partial messages will not be delivered, and it’s possible that some messages won’t be delivered whatsoever. The library’s author, Martin Sustrik, makes this abundantly clear:

Guaranteed delivery is a myth. Nothing is 100% guaranteed. That’s the nature of the world we live in. What we should do instead is to build an internet-like system that is resilient in face of failures and routes around damage.

The philosophy is to use a combination of topologies to build resilient systems that add in these guarantees in a best-effort sort of way.

On the other hand, nanomsg is still in beta and may not be considered production-ready. Consequently, there aren’t a lot of resources available and not much of a development community around it.

ZeroMQ is a battle-tested messaging library that’s been around since 2007. Some may perceive it as a predecessor to nanomsg, but what nano lacks is where ZeroMQ thrives—a flourishing developer community and a deluge of resources and supporting material. For many, it’s the de facto tool for building fast, asynchronous distributed messaging systems that scale.

Like nanomsg, ZeroMQ is not a message-oriented middleware and simply operates as a socket abstraction. In terms of usability, it’s very much the same as nanomsg, although its API is marginally more involved.

ActiveMQ and RabbitMQ

ActiveMQ and RabbitMQ are brokers of the AMQP lineage (RabbitMQ implements AMQP natively, while ActiveMQ supports it alongside other protocols like OpenWire). They act as brokers which ensure messages are delivered. ActiveMQ and RabbitMQ support both persistent and non-persistent delivery. By default, messages are written to disk such that they survive a broker restart. They also support synchronous and asynchronous sending of messages, with the former having a substantial impact on latency. To guarantee delivery, these brokers use message acknowledgements, which also incur a massive latency penalty.

As far as availability and fault tolerance goes, these brokers support clustering through shared storage or shared nothing. Queues can be replicated across clustered nodes so there is no single point of failure or message loss.

AMQP is a non-trivial protocol which its creators claim to be over-engineered. These additional guarantees are made at the expense of major complexity and performance trade-offs. Fundamentally, clients are more difficult to implement and use.

Since they’re message brokers, ActiveMQ and RabbitMQ are additional moving parts that need to be managed in your distributed system, which brings deployment and maintenance costs. The same is true for the remaining message queues being discussed.

NATS and Ruby-NATS

NATS (gnatsd) is a pure Go implementation of the ruby-nats messaging system. NATS is distributed messaging rethought to be less enterprisey and more lightweight (this is in direct contrast to systems like ActiveMQ, RabbitMQ, and others). Apcera’s Derek Collison, the library’s author and a former TIBCO architect, describes NATS as “more like a nervous system” than an enterprise message queue. It doesn’t do persistence or message transactions, but it’s fast and easy to use. Clustering is supported, so systems can be built on top of it with high availability and failover in mind, and clients can be sharded. Unfortunately, TLS and SSL are not yet supported in NATS (they are in ruby-nats), but they are on the roadmap.

As we observed earlier, NATS performs far better than the original Ruby implementation. Clients can be used interchangeably with NATS and ruby-nats.

Kafka

Originally developed by LinkedIn, Kafka implements publish-subscribe messaging through a distributed commit log. It’s designed to operate as a cluster that can be consumed by large numbers of clients. Horizontal scaling is done effortlessly using ZooKeeper so that additional consumers and brokers can be introduced seamlessly. It also transparently takes care of cluster rebalancing.

Kafka uses a persistent commit log to store messages on the broker. Unlike other durable queues which usually remove persisted messages on consumption, Kafka retains them for a configured period of time. This means that messages can be “replayed” in the event that a consumer fails.

ZooKeeper makes managing Kafka clusters relatively easy, but it does introduce yet another element that needs to be maintained. That said, Kafka exposes a great API and Shopify has an excellent Go client called Sarama that makes interfacing with Kafka very accessible.

Kestrel

Kestrel is a distributed message queue open sourced by Twitter. It’s intended to be fast and lightweight. Because of this, it has no concept of clustering or failover. While Kafka is built from the ground up to be clustered through ZooKeeper, the onus of message partitioning is put upon the clients of Kestrel. There is no cross-communication between nodes. It makes this trade-off in the name of simplicity. It features durable queues, item expiration, transactional reads, and fanout queues while operating over Thrift or memcache protocols.

Kestrel is designed to be small, but this means that more work must be done by the developer to build out a robust messaging system on top of it. Kafka seems to be a more “all-in-one” solution.

NSQ

NSQ is a messaging platform built by Bitly. I use the word platform because there’s a lot of tooling built around NSQ to make it useful for real-time distributed messaging. The daemon that receives, queues, and delivers messages to clients is called nsqd. The daemon can run standalone, but NSQ is designed to run in a distributed, decentralized topology. To achieve this, it leverages another daemon called nsqlookupd, which acts as a service-discovery mechanism for nsqd instances. NSQ also provides nsqadmin, a web UI that displays real-time cluster statistics and acts as a way to perform various administrative tasks like clearing queues and managing topics.

By default, messages in NSQ are not durable. It’s primarily designed to be an in-memory message queue, but queue sizes can be configured such that after a certain point, messages will be written to disk. Despite this, there is no built-in replication. NSQ uses acknowledgements to guarantee message delivery, but the order of delivery is not guaranteed. Messages can also be delivered more than once, so it’s the developer’s responsibility to introduce idempotence.

Similar to Kafka, additional nodes can be added to an NSQ cluster seamlessly. It also exposes both an HTTP and TCP API, which means you don’t actually need a client library to push messages into the system. Despite all the moving parts, it’s actually quite easy to deploy. Its API is also easy to use and there are a number of client libraries available.

Redis

Last up is Redis. While Redis is great for lightweight messaging and transient storage, I can’t advocate its use as the backbone of a distributed messaging system. Its pub/sub is fast but its capabilities are limited. It would require a lot of work to build a robust system. There are solutions better suited to the problem, such as those described above, and there are also some scaling concerns with it.

These matters aside, Redis is easy to use, it’s easy to deploy and manage, and it has a relatively small footprint. Depending on the use case, it can be a great choice for real-time messaging as I’ve explored before.

Conclusion

The purpose of this analysis is not to present some sort of “winner” but instead showcase a few different options for distributed messaging. There is no “one-size-fits-all” option because it depends entirely on your needs. Some use cases require fast, fire-and-forget messages, others require delivery guarantees. In fact, many systems will call for a combination of these. My hope is that this dissection will offer some insight into which solutions work best for a given problem so that you can make an intelligent decision.

A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)

Earlier this month, I explored ZeroMQ and how it proves to be a promising solution for building fast, high-throughput, and scalable distributed systems. Despite lending itself quite well to these types of problems, ZeroMQ is not without its flaws. Its creators have attempted to rectify many of these shortcomings through spiritual successors Crossroads I/O and nanomsg.

The now-defunct Crossroads I/O is a proper fork of ZeroMQ with the true intention being to build a viable commercial ecosystem around it. Nanomsg, however, is a reimagining of ZeroMQ—a complete rewrite in C [1]. It builds upon ZeroMQ’s rock-solid performance characteristics while providing several vital improvements, both internal and external. It also attempts to address many of the strange behaviors that ZeroMQ can often exhibit. Today, I’ll take a look at what differentiates nanomsg from its predecessor and implement a use case for it in the form of service discovery.

Nanomsg vs. ZeroMQ

A common gripe people have with ZeroMQ is that it doesn’t provide an API for new transport protocols, which essentially limits you to TCP, PGM, IPC, and ITC. Nanomsg addresses this problem by providing a pluggable interface for transports and messaging protocols. This means support for new transports (e.g. WebSockets) and new messaging patterns beyond the standard set of PUB/SUB, REQ/REP, etc.

Nanomsg is also fully POSIX-compliant, giving it a cleaner API and better compatibility. No longer are sockets represented as void pointers and tied to a context—simply initialize a new socket and begin using it in one step. With ZeroMQ, the context internally acts as a storage mechanism for global state and, to the user, as a pool of I/O threads. This concept has been completely removed from nanomsg.

In addition to POSIX compliance, nanomsg is hoping to be interoperable at the API and protocol levels, which would allow it to be a drop-in replacement for, or otherwise interoperate with, ZeroMQ and other libraries which implement ZMTP/1.0 and ZMTP/2.0. It has yet to reach full parity, however.

ZeroMQ has a fundamental flaw in its architecture. Its sockets are not thread-safe. In and of itself, this is not problematic and, in fact, is beneficial in some cases. By isolating each object in its own thread, the need for semaphores and mutexes is removed. Threads don’t touch each other and, instead, concurrency is achieved with message passing. This pattern works well for objects managed by worker threads but breaks down when objects are managed in user threads. If the thread is executing another task, the object is blocked. Nanomsg does away with the one-to-one relationship between objects and threads. Rather than relying on message passing, interactions are modeled as sets of state machines. Consequently, nanomsg sockets are thread-safe.

Nanomsg has a number of other internal optimizations aimed at improving memory and CPU efficiency. ZeroMQ uses a simple trie structure to store and match PUB/SUB subscriptions, which performs nicely for sub-10,000 subscriptions but quickly becomes unreasonable for anything beyond that number. Nanomsg uses a space-optimized trie called a radix tree to store subscriptions. Unlike its predecessor, the library also offers a true zero-copy API which greatly improves performance by allowing memory to be copied from machine to machine while completely bypassing the CPU.

ZeroMQ implements load balancing using a round-robin algorithm. While it provides equal distribution of work, it has its limitations. Suppose you have two datacenters, one in New York and one in London, and each site hosts instances of “foo” services. Ideally, a request made for foo from New York shouldn’t get routed to the London datacenter and vice versa. With ZeroMQ’s round-robin balancing, this is, unfortunately, entirely possible. One of the new user-facing features that nanomsg offers is priority routing for outbound traffic. We avoid this latency problem by assigning priority one to foo services hosted in New York for applications also hosted there. Priority two is then assigned to foo services hosted in London, giving us a failover in the event that foos in New York are unavailable.

Additionally, nanomsg offers a command-line tool for interfacing with the system called nanocat. This tool lets you send and receive data via nanomsg sockets, which is useful for debugging and health checks.

Scalability Protocols

Perhaps most interesting is nanomsg’s philosophical departure from ZeroMQ. Instead of acting as a generic networking library, nanomsg intends to provide the “Lego bricks” for building scalable and performant distributed systems by implementing what it refers to as “scalability protocols.” These scalability protocols are communication patterns which are an abstraction on top of the network stack’s transport layer. The protocols are fully separated from each other such that each can embody a well-defined distributed algorithm. The intention, as stated by nanomsg’s author Martin Sustrik, is to have the protocol specifications standardized through the IETF.

Nanomsg currently defines six different scalability protocols: PAIR, REQREP, PIPELINE, BUS, PUBSUB, and SURVEY.

PAIR (Bidirectional Communication)

PAIR implements simple one-to-one, bidirectional communication between two endpoints. Two nodes can send messages back and forth to each other.

REQREP (Client Requests, Server Replies)

The REQREP protocol defines a pattern for building stateless services to process user requests. A client sends a request, the server receives the request, does some processing, and returns a response.

PIPELINE (One-Way Dataflow)

PIPELINE provides unidirectional dataflow which is useful for creating load-balanced processing pipelines. A producer node submits work that is distributed among consumer nodes.

BUS (Many-to-Many Communication)

BUS allows messages sent from each peer to be delivered to every other peer in the group.

PUBSUB (Topic Broadcasting)

PUBSUB allows publishers to multicast messages to zero or more subscribers. Subscribers, which can connect to multiple publishers, can subscribe to specific topics, allowing them to receive only messages that are relevant to them.

SURVEY (Ask Group a Question)

The last scalability protocol, and the one I’ll examine further by implementing a use case with it, is SURVEY. The SURVEY pattern is similar to PUBSUB in that a message from one node is broadcast to the entire group, but where it differs is that each node in the group responds to the message. This opens up a wide variety of applications because it allows you to quickly and easily query the state of a large number of systems in one go. The survey respondents must respond within a time window configured by the surveyor.

Implementing Service Discovery

As I pointed out, the SURVEY protocol has a lot of interesting applications. For example:

  • What data do you have for this record?
  • What price will you offer for this item?
  • Who can handle this request?

To continue exploring it, I will implement a basic service-discovery pattern. Service discovery is a pretty simple question that’s well-suited for SURVEY: what services are out there? Our solution will work by periodically submitting the question. As services spin up, they will connect with our service discovery system so they can identify themselves. We can tweak parameters like how often we survey the group to ensure we have an accurate list of services and how long services have to respond.

This is great because 1) the discovery system doesn’t need to be aware of what services there are—it just blindly submits the survey—and 2) when a service spins up, it will be discovered, and if it dies, it will eventually be “undiscovered.”

The heart of the solution is a ServiceDiscovery class. Its discover method submits the survey and then collects the responses. We construct a SURVEYOR socket and set the SURVEYOR_DEADLINE option on it. This deadline is the number of milliseconds from when a survey is submitted to when a response must be received—adjust it accordingly based on your network topology. Once the survey deadline has been reached, a NanoMsgAPIError is raised and we break out of the receive loop. The resolve method takes the name of a service and randomly selects an available provider from our discovered services.

We can then wrap ServiceDiscovery with a daemon that will periodically run discover.

The discovery parameters are configured through environment variables which I inject into a Docker container.

Services must connect to the discovery system when they start up. When they receive a survey, they respond by identifying what service they provide and where it’s located. Once again, parameters are configured through environment variables set on the container. Each service connects to the discovery system with a RESPONDENT socket, which then responds to service queries with the service name and address. The service itself uses a REP socket that simply responds to any requests with “The answer is 42,” but it could take any number of forms such as HTTP, raw socket, etc.
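The original listings are in Python (using the nanomsg bindings) and aren’t reproduced here, but the surveyor/respondent interaction looks roughly like the following Go sketch, which assumes mangos, a Go implementation of the scalability protocols (not the library used in the original example; import paths vary by version). OptionSurveyTime plays the role of SURVEYOR_DEADLINE, the addresses and service names are placeholders, and error handling is elided for brevity:

```go
package main

import (
	"fmt"
	"time"

	"go.nanomsg.org/mangos/v3"
	"go.nanomsg.org/mangos/v3/protocol/respondent"
	"go.nanomsg.org/mangos/v3/protocol/surveyor"
	_ "go.nanomsg.org/mangos/v3/transport/all" // register transports
)

const url = "tcp://127.0.0.1:40899" // placeholder discovery address

// discoveryDaemon periodically surveys the group, collecting responses
// until the survey deadline (the analogue of SURVEYOR_DEADLINE) passes.
func discoveryDaemon() {
	sock, _ := surveyor.NewSocket()
	sock.Listen(url)
	sock.SetOption(mangos.OptionSurveyTime, 500*time.Millisecond)

	for {
		sock.Send([]byte("services?"))
		var services []string
		for {
			msg, err := sock.Recv()
			if err != nil {
				break // deadline reached; this survey is over
			}
			services = append(services, string(msg))
		}
		fmt.Println("discovered:", services)
		time.Sleep(2 * time.Second) // how often we re-survey
	}
}

// service answers every survey with its name and address, identifying
// itself to the discovery system.
func service(name, addr string) {
	sock, _ := respondent.NewSocket()
	sock.Dial(url)
	for {
		if _, err := sock.Recv(); err != nil {
			return
		}
		sock.Send([]byte(name + "@" + addr))
	}
}

func main() {
	go service("db", "tcp://10.0.0.5:5555")
	go service("cache", "tcp://10.0.0.6:6666")
	discoveryDaemon()
}
```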

The full code for this example, including Dockerfiles, can be found on GitHub.

Nanomsg or ZeroMQ?

Based on all the improvements that nanomsg makes on top of ZeroMQ, you might be wondering why you would use the latter at all. Nanomsg is still relatively young. Although it has numerous language bindings, it hasn’t reached the maturity of ZeroMQ, which has a thriving development community. ZeroMQ has extensive documentation and other resources to help developers make use of the library, while nanomsg has very little. Doing a quick Google search will give you an idea of the difference (about 500,000 results for ZeroMQ to nanomsg’s 13,500).

That said, nanomsg’s improvements and, in particular, its scalability protocols make it very appealing. A lot of the strange behaviors that ZeroMQ exposes have been resolved completely or at least mitigated. It’s actively being developed and is quickly gaining more and more traction. Technically, nanomsg has been in beta since March, but it’s starting to look production-ready if it’s not there already.

  1. The author explains why he should have originally written ZeroMQ in C instead of C++.

Distributed Messaging with ZeroMQ

“A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable.” -Leslie Lamport

With the increased prevalence and accessibility of cloud computing, distributed systems architecture has largely supplanted more monolithic constructs. The implication of using a service-oriented architecture, of course, is that you now have to deal with a myriad of difficulties that previously never existed, such as fault tolerance, availability, and horizontal scaling. Another interesting layer of complexity is providing consistency across nodes, which itself is a problem surrounded with endless research. Algorithms like Paxos and Raft attempt to provide solutions for managing replicated data, while other solutions offer eventual consistency.

Building scalable, distributed systems is not a trivial feat, but it pales in comparison to building real-time systems of a similar nature. Distributed architecture is a well-understood problem and the fact is, most applications have a high tolerance for latency. Few systems have a demonstrable need for real-time communication, but the few that do present an interesting challenge for developers. In this article, I explore the use of ZeroMQ to approach the problem of distributed, real-time messaging in a scalable manner while also considering the notion of eventual consistency.

The Intelligent Transport Layer

ZeroMQ is a high-performance asynchronous messaging library written in C++. It’s not a dedicated message broker but rather an embeddable concurrency framework with support for direct and fan-out endpoint connections over a variety of transports. ZeroMQ implements a number of different communication patterns like request-reply, pub-sub, and push-pull through TCP, PGM (multicast), in-process, and inter-process channels. The glaring lack of UDP support is, more or less, by design because ZeroMQ was conceived to provide guaranteed-ish delivery of atomic messages. The library makes no actual guarantee of delivery, but it does make a best effort. What ZeroMQ does guarantee, however, is that you will never receive a partial message, and messages will be received in order. This is important because UDP’s performance gains really only manifest themselves in lossy or congested environments.

The comprehensive list of messaging patterns and transports alone makes ZeroMQ an appealing choice for building distributed applications, but it particularly excels due to its reliability, scalability, and high throughput. ZeroMQ and related technologies are popular within high-frequency trading, where packet loss of financial data is often unacceptable [1]. In 2011, CERN actually performed a study comparing CORBA, Ice, Thrift, ZeroMQ, and several other protocols for use in its particle accelerators and ranked ZeroMQ the highest.

[Figure: CERN’s middleware comparison]

ZeroMQ uses some tricks that allow it to actually outperform TCP sockets in terms of throughput, such as intelligent message batching, minimizing network-stack traversals, and disabling Nagle’s algorithm. By default (and when possible), messages are queued on the subscriber, which attempts to avoid the problem of slow subscribers. However, when this isn’t sufficient, ZeroMQ employs a pattern called the “Suicidal Snail.” When a subscriber is running slow and is unable to keep up with incoming messages, ZeroMQ convinces the subscriber to kill itself. “Slow” is determined by a configurable high-water mark. The idea here is that it’s better to fail fast and allow the issue to be resolved quickly than to potentially allow stale data to flow downstream. Again, think about the high-frequency trading use case.

A Distributed, Scalable, and Fast Messaging Architecture

ZeroMQ makes a convincing case for use as a transport layer. Let’s explore a little deeper to see how it could be used to build a messaging framework for use in a real-time system. ZeroMQ is fairly intuitive to use and offers a plethora of bindings for various languages, so we’ll focus more on the architecture and messaging paradigms than the actual code.

About a year ago, while I first started investigating ZeroMQ, I built a framework to perform real-time messaging and document syncing called Zinc. A “document,” in this sense, is any well-structured and mutable piece of data—think text document, spreadsheet, canvas, etc. While purely academic, the goal was to provide developers with a framework for building rich, collaborative experiences in a distributed manner.

The framework actually had two implementations, one backed by the native ZeroMQ, and one backed by the pure Java implementation, JeroMQ [2]. It was really designed to allow any transport layer to be used though.

Zinc is structured around just a few core concepts: Endpoints, ChannelListeners, MessageHandlers, and Messages. An Endpoint represents a single node in an application cluster and provides functionality for sending and receiving messages to and from other Endpoints. It has outbound and inbound channels for transmitting messages to peers and receiving them, respectively.

[Figure: endpoint]

ChannelListeners essentially act as daemons listening for incoming messages when the inbound channel is open on an Endpoint. When a message is received, it’s passed to a thread pool to be processed by a MessageHandler. Therefore, Messages are processed asynchronously in the order they are received, and as mentioned earlier, ZeroMQ guarantees in-order message delivery. As an aside, this is before I began learning Go, which would make for an ideal replacement for Java here as it’s quite well-suited to the problem :)

Messages are simply the data being exchanged between Endpoints, upon which we build Documents and DocumentFragments. A Document is the structured data defined by an application, while a DocumentFragment represents a partial Document, or delta, which can be as fine- or coarse-grained as needed.

Zinc is built around the publish-subscribe and push-pull messaging patterns. One Endpoint will act as the host of a cluster, while the others act as clients. With this architecture, the host acts as a publisher and the clients as subscribers. Thus, when a host fires off a Message, it’s delivered to every subscribing client in a multicast-like fashion. Conversely, clients also act as “push” Endpoints with the host being a “pull” Endpoint. Clients can then push Messages into the host’s Message queue from which the host is pulling from in a first-in-first-out manner.

This architecture allows Messages to be propagated across the entire cluster—a client makes a change which is sent to the host, who propagates this delta to all clients. This means that the client who initiated the change will receive an “echo” delta, but it will be discarded by checking the Message origin, a UUID which uniquely identifies an Endpoint. Clients are then responsible for preserving data consistency if necessary, perhaps through operational transformation or by maintaining a single source of truth from which clients can reconcile.
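Zinc itself is Java, but the origin check is simple enough to sketch in a few lines of Go: tag every Message with the UUID of the Endpoint that produced it, and have each client drop deltas it originated. The types here are stand-ins for illustration, not Zinc’s actual classes:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// Message carries a delta along with the UUID of the Endpoint it came from.
type Message struct {
	Origin string
	Delta  string
}

// Endpoint is a minimal stand-in for a Zinc client.
type Endpoint struct {
	ID string
}

func NewEndpoint() *Endpoint {
	b := make([]byte, 16)
	rand.Read(b)
	return &Endpoint{ID: hex.EncodeToString(b)}
}

// Receive applies a delta unless it is an echo of one we published ourselves.
func (e *Endpoint) Receive(m Message) {
	if m.Origin == e.ID {
		return // our own change coming back around; discard the echo
	}
	fmt.Println(e.ID[:8], "applying delta:", m.Delta)
}

func main() {
	client := NewEndpoint()
	echo := Message{Origin: client.ID, Delta: "insert 'x' at 3"}
	other := Message{Origin: "some-other-endpoint", Delta: "delete 2 chars at 7"}

	client.Receive(echo)  // discarded
	client.Receive(other) // applied
}
```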

[Figure: cluster]

One of the advantages of this architecture is that it scales reasonably well due to its composability. Specifically, we can construct our cluster as a tree of clients with arbitrary breadth and depth. Obviously, the more we scale horizontally or vertically, the more latency we introduce between edge nodes. Coupled with eventual consistency, this can cause problems for some applications but might be acceptable to others.

[Figure: scalability]

The downside is that this inherently introduces a single point of failure, a characteristic of the client-server model. One solution might be to promote another node when the host fails and rebalance the tree.

Once again, this framework was mostly academic and acted as a way for me to test-drive ZeroMQ, although there are some other interesting applications of it. Since the framework supports multicast message delivery via push-pull or publish-subscribe mechanisms, one such use case is autonomous load balancing.

Paired with something like ZooKeeper, etcd, or some other service-discovery protocol, clients would be capable of discovering hosts, who act as load balancers. Once a client has discovered a host, it can request to become a part of that host’s cluster. If the host accepts the request, the client can begin to send messages to the host (and, as a result, to the rest of the cluster) and, likewise, receive messages from the host (and the rest of the cluster). This enables clients and hosts to submit work to the cluster such that it’s processed in an evenly distributed way, and workers can determine whether to pass work on further down the tree or process it themselves. Clients can choose to participate in load-balancing clusters at their own will and when they become available, making them mostly autonomous. Clients could then be quickly spun up and spun down using, for example, Docker containers.

ZeroMQ is great for achieving reliable, fast, and scalable distributed messaging, but it’s equally useful for performing parallel computation on a single machine or several locally networked ones by facilitating in-process and inter-process communication using the same patterns. It also scales in the sense that it can effortlessly leverage multiple cores on each machine. ZeroMQ is not a replacement for a message broker, but it can work in unison with traditional message-oriented middleware. Combined with Protocol Buffers and other serialization methods, ZeroMQ makes it easy to build extremely high-throughput messaging frameworks.

  1. ZeroMQ’s founder, iMatix, was responsible for moving JPMorgan Chase and the Dow Jones Industrial Average trading platforms to OpenAMQ.
  2. In systems where near real-time is sufficient, JeroMQ is adequate and benefits by not requiring any native linking.