FIFO, Exactly-Once, and Other Costs

There’s been a lot of discussion about exactly-once semantics lately, sparked by the recent announcement of support for it in Kafka 0.11. I’ve already written at length about strong guarantees in messaging.

My former coworker Kevin Sookocheff recently made a post about ordered and exactly-once message delivery as it relates to Amazon SQS. It does a good job of illustrating what the trade-offs are, and I want to drive home some points.

In the article, Kevin shows how FIFO delivery is really only meaningful when you have one single-threaded publisher and one single-threaded receiver. Amazon’s FIFO queues allow you to control how restrictive this requirement is by applying ordering on a per-group basis. In other words, we can improve throughput if we can partition work into different ordered groups rather than a single totally ordered group. However, FIFO still effectively limits throughput on a group to a single publisher and single subscriber. If there are multiple publishers, they have to coordinate to ensure ordering is preserved with respect to our application’s semantics. On the subscriber side, things are simpler because SQS will only deliver messages in a group one at a time in order amongst subscribers.

Amazon’s FIFO queues also have an exactly-once processing feature which deduplicates messages within a five-minute window. Note, however, that there are some caveats with this, the obvious one being duplicate delivery outside of the five-minute window. A mission-critical system would have to be designed to account for this possibility. My argument here is if you still have to account for it, what’s the point unless the cost of detecting duplicates is prohibitively expensive? But to be fair, five minutes probably reduces the likelihood enough to the point that it’s useful and in those rare cases where it fails, the duplicate is acceptable.

The more interesting caveat is that FIFO queues do not guarantee exactly-once delivery to consumers (which, as we know, is impossible). Rather, they offer exactly-once processing by guaranteeing that once a message has successfully been acknowledged as processed, it won’t be delivered again. It’s up to applications to ack appropriately. When a message is delivered to a consumer, it remains in the queue until it’s acked. The visibility timeout prevents other consumers from processing it. With FIFO queues, this also means head-of-line blocking for other messages in the same group.

Now, let’s assume a subscriber receives a batch of messages from the queue, processes them—perhaps by storing some results to a database—and then sends an acknowledgement back to SQS which removes them from the queue. It’s entirely possible that during that process step a delay happens—a prolonged GC pause, crash, network delay, whatever. When this happens, the visibility timeout expires and the messages are redelivered and, potentially, reprocessed. What has to happen here is essentially cooperation between the queue and processing step. We might do this by using a database transaction to atomically process and acknowledge the messages. An alternative, yet similar, approach might be to use a write-ahead-log-like strategy whereby the consuming system reads messages from SQS and transactionally stores them in a database for future processing. Once the messages have been committed, the consumer deletes the messages from SQS. In either of these approaches, we’re basically shifting the onus of exactly-once processing onto an ACID-compliant relational database.

Note that this is really how Kafka achieves its exactly-once semantics. It requires end-to-end cooperation for exactly-once to work. State changes in your application need to be committed transactionally with your Kafka offsets.

As Kevin points out, FIFO SQS queues offer exactly-once processing only if 1) publishers never publish duplicate messages wider than five minutes apart and 2) consumers never fail to delete messages they have processed from the queue. Solving either of these problems probably requires some kind of coordination between the application and queue, likely in the form of a database transaction. And if you’re using a database either as the message source, sink, or both, what are exactly-once FIFO queues actually buying you? You’re paying a seemingly large cost in throughput for little perceived value. Your messages are already going through some sort of transactional boundary that provides ordering and uniqueness.

Where I see FIFO and exactly-once semantics being useful is when talking to systems which cannot cooperate with the end-to-end transaction. This might be a legacy service or a system with side effects, such as sending an email. Often in the case of these “distributed workflows”, latency is a lower priority and humans can be involved in various steps. Other use cases might be scheduled integrations with legacy batch processes where throughput is known a priori. These can simply be re-run when errors occur.

When people describe a messaging system with FIFO and exactly-once semantics, they’re usually providing a poor description of a relational database with support for ACID transactions. Providing these semantics in a messaging system likely still involves database transactions, it’s just more complicated. It turns out relational databases are really good at ensuring invariants like exactly-once.

I’ve picked on Kafka a bit in the past, especially with the exactly-once announcement, but my issue is not with Kafka itself. Kafka is a fantastic technology. It’s well-architected, battle-tested, and the team behind it is talented and knows the space well. My issue is more with some of the intangible costs associated with it. The same goes for similar systems (like exactly-once FIFO SQS queues). Beyond just the operational complexity (which Confluent is attempting to tackle with its Kafka-as-a-service), you have to get developer understanding. This is harder than it sounds in any modestly-sized organization. That’s not to say that developers are dumb or incapable of understanding, but the fact is your average developer is simply not thinking about all of the edge cases brought on by operating distributed systems at scale. They see “exactly-once FIFO queues” in SQS or “exactly-once delivery” in Kafka and take it at face value. They don’t read beyond the headline. They don’t look for the caveats. That’s why I took issue with how Kafka claimed to do the impossible with exactly-once delivery when it’s really exactly-once processing or, as I’ve come to call it, “atomic processing.” Henry Robinson put it best when talking about the Kafka announcement:

If I were to rewrite the article, I’d structure it thus: “exactly-once looks like atomic broadcast. Atomic broadcast is impossible. Here’s how exactly-once might fail, and here’s why we think you shouldn’t be worried about it.” That’s a harder argument for users to swallow…

Basically “exactly-once” markets better. It’s something developers can latch onto, but it’s also misleading. I know it’s only a matter of time before people start linking me to the Confluent post saying, “see, exactly-once is easy!” But this is just pain deferral. On the contrary, exactly-once semantics require careful construction of your application, assume a closed, transactional world, and do not support the case where I think people want exactly-once the most: side effects.

Interestingly, one of my chief concerns about Kafka’s implementation was what the difficulty of ensuring end-to-end cooperation would be in practice. Side effects into downstream systems with no support for idempotency or transactions could make it difficult. Jay’s counterpoint to this was that the majority of users are using good old-fashioned relational databases, so all you really need to do is commit your offsets and state changes together. It’s not trivial, but it’s not that much harder than avoiding partial updates on failure if you’re updating multiple tables. This brings us back to two of the original points of contention: why not merely use the database for exactly-once in the first place and what about legacy systems?

That’s not to say exactly-once semantics, as offered in systems like SQS and Kafka, are not useful. I think we just need to be more conscientious of the other costs and encourage developers to more deeply understand the solution space—too much sprinkling on of Kafka or exactly-once or FIFO and not enough thinking about the actual business problem. Too much prescribing of solutions and not enough describing of problems.

My thanks to Kevin Sookocheff and Beau Lyddon for reviewing this post.

You Cannot Have Exactly-Once Delivery Redux

A couple years ago I wrote You Cannot Have Exactly-Once Delivery. It stirred up quite a bit of discussion and was even referenced in a book, which I found rather surprising considering I’m not exactly an academic. Recently, the topic of exactly-once delivery has again become a popular point of discussion, particularly with the release of Kafka 0.11, which introduces support for idempotent producers, transactional writes across multiple partitions, and—wait for it—exactly-once semantics.

Naturally, when this hit Hacker News, I received a lot of messages from people asking me, “what gives?” There’s literally a TechCrunch headline titled, Confluent achieves holy grail of “exactly once” delivery on Kafka messaging service (Jay assures me, they don’t write the headlines). The myth has been disproved!

First, let me say what Confluent has accomplished with Kafka is an impressive achievement and one worth celebrating. They made a monumental effort to implement these semantics, and it paid off. The intention of this post is not to minimize any of that work but to try to clarify a few key points and hopefully cut down on some of the misinformation and noise.

“Exactly-once delivery” is a poor term. The word “delivery” is overloaded. Frankly, I think it’s a marketing word. The better term is “exactly-once processing.” Some call the distinction pedantic, but I think it’s important and there is some nuance. Kafka did not solve the Two Generals Problem. Exactly-once delivery, at the transport level, is impossible. It doesn’t exist in any meaningful way and isn’t all that interesting to talk about. “We have a word for infinite packet delay—outage,” as Jay puts it. That’s why TCP exists, but TCP doesn’t care about your application semantics. And in the end, that’s what’s interesting—application semantics. My problem with “exactly-once delivery” is it assumes too much, which causes a lot of folks to make bad assumptions. “Delivery” is a transport semantic. “Processing” is an application semantic.

All is not lost, however. We can still get correct results by having our application cooperate with the processing pipeline. This is essentially what Kafka does, exactly-once processing, and Confluent makes note of that in the blog post towards the end. What does this mean?

To achieve exactly-once processing semantics, we must have a closed system with end-to-end support for modeling input, output, and processor state as a single, atomic operation. Kafka supports this by providing a new transaction API and idempotent producers. Any state changes in your application need to be made atomically in conjunction with Kafka. You must commit your state changes and offsets together. It requires architecting your application in a specific way. State changes in external systems must be part of the Kafka transaction. Confluent’s goal is to make this as easy as possible by providing the platform around Kafka with its streams and connector APIs. The point here is it’s not just a switch you flip and, magically, messages are delivered exactly once. It requires careful construction, application logic coordination, isolating state change and non-determinism, and maintaining a closed system around Kafka. Applications that use the consumer API still have to do this themselves. As Neha puts it in the post, it’s not “magical pixie dust.” This is the most important part of the post and, if it were up to me, would be at the very top.

Exactly-once processing is an end-to-end guarantee and the application has to be designed to not violate the property as well. If you are using the consumer API, this means ensuring that you commit changes to your application state concordant with your offsets as described here.

Side effects into downstream systems with no support for idempotency or distributed transactions make this really difficult in practice I suspect. The argument is that most people are using relational databases that support transactions, but I think there’s still a reasonably large, non-obvious assumption here. Making your event processing atomic might not be easy in all cases. Moreover, every part in your system needs to participate to ensure end-to-end, exactly-once semantics.

Several other messaging systems like TIBCO EMS and Azure Service Bus have provided similar transactional processing guarantees. Kafka, as I understand it, attempts to make it easier and with less performance overhead. That’s a great accomplishment.

What’s really worth drawing attention to is the effort made by Confluent to deliver a correct solution. Achieving exactly-once processing, in and of itself, is relatively “easy” (I use that word loosely). What’s hard is dealing with the range of failures. The announcement shows they’ve done extensive testing, likely much more than most other systems, and have shown that it works and with minimal performance impact.

Kafka provides exactly-once processing semantics because it’s a closed system. There is still a lot of difficulty in ensuring those semantics are maintained across external services, but Confluent attempts to ameliorate this through APIs and tooling. But that’s just it: it’s not exactly-once semantics in a building block that’s the hard thing, it’s building loosely coupled systems that agree on the state of the world. Nevertheless, there is no holy grail here, just some good ole’ fashioned hard work.

Special thanks to Jay Kreps and Sean T. Allen for their feedback on an early draft of this post. Any inaccuracies or opinions are mine alone.

Smart Endpoints, Dumb Pipes

I read an interesting article recently called How do you cut a monolith in half? There are a lot of thoughts in the article that resonate with me and some that I disagree with, prompting this response.

The overall message of the article is don’t use a message broker to break apart a monolith because it’s like a cross between a load balancer and a database, with the disadvantages of both and the advantages of neither. The author argues that message brokers are a popular way to pull apart components over a network because they have low setup cost and provide easy service discovery, but they come at a high operational cost. My response to that is the same advice the author puts forward: it depends.

I think it’s important not to conflate “message broker” and “message queue.” The article uses them interchangeably, but it’s really talking about the latter, which I see as a subset of the former. Queues provide, well, queuing semantics. They try to ensure delivery of a message or, more generally, distribution of work. As the author puts it: “In practice, a message broker is a service that transforms network errors and machine failures into filled disks.” Replace “broker” with “queue” and I agree with this statement. This is really describing systems like RabbitMQ, Amazon SQS, TIBCO EMS, IronMQ, and maybe even Kafka fits into that category.

People are easily seduced by “fat” middleware—systems with more features, more capabilities, more responsibilities—because they think it makes their lives easier, and it might at first. Pushing off more responsibility onto the infrastructure makes the application simpler, but it also makes the infrastructure more complex, more fragile, and more slow. Take exactly-once message delivery, for example. Lots of people want it, but the pursuit of it introduces a host of complexity, overhead (in terms of development, operations, and performance), and risk. The end result is something that, in addition to these things, requires all downstream systems to not introduce duplicates and be mindful about their side effects. That is, everything in the processing pipeline must be exactly-once or nothing is. So typically what you end up with is an approximation of exactly-once delivery. You make big investments to lower the likelihood of duplicates, but you still have to deal with the problem. This might make sense if the cost of having duplicates is high, but that doesn’t seem like the common case. My advice is to always opt for the simple solution. We tend to think of engineering challenges as technical problems when, in reality, they’re often just mindset problems. Usually the technical problems have already been solved if we can just adjust our mindset.

There are a couple things to keep in mind here. The first thing to consider is simply capability lock-in. As you push more and more logic off onto more and more specialized middleware, you make it harder to move off it or change things. The second is what we already hinted at. Even with smart middleware, problems still leak out and you have to handle them at the edge—you’re now being taxed twice. This is essentially the end-to-end argument. Push responsibility to the edges, smart endpoints, dumb pipes, etc. It’s the idea that if you need business-level guarantees, build them into the business layer because the infrastructure doesn’t care about them.

The article suggests for short-lived tasks, use a load balancer because with a queue, you’ll end up building a load balancer along with an ad-hoc RPC system, with extra latency. For long-lived tasks, use a database because with a queue, you’ll be building a lock manager, a database, and a scheduler.

A persistent message queue is not bad in itself, but relying on it for recovery, and by extension, correct behaviour, is fraught with peril.

So why the distinction between message brokers and message queues? The point is not all message brokers need to be large, complicated pieces of infrastructure like most message queues tend to be. This was the reason I gravitated towards NATS while architecting Workiva’s messaging platform and why last month I joined Apcera to work on NATS full time.

When Derek Collison originally wrote NATS it was largely for the reasons stated in the article and for the reasonstalk about frequently. It was out of frustration with the current state of the art. In my opinion, NATS was the first system in the space that really turned the way we did messaging on its head (outside of maybe ZeroMQ). It didn’t provide any strong delivery guarantees, transactions, message persistence, or other responsibilities usually assumed by message brokers (there is a layer that provides some of these things, but it’s not baked into the core technology). Instead, NATS prioritized availability, simplicity, and performance over everything else. A simple technology in a vast sea of complexity (my marketing game is strong).

NATS is no-frills pub/sub. It solves the problem of service discovery and work assignment, assumes no other responsibilities, and gets out of your way. It’s designed to be easy to use, easy to operate, and add minimal latency even at scale so that, unlike many other brokers, it is a good way to integrate your microservices. What makes NATS interesting is what it doesn’t do and what it gains by not doing them. Simplicity is a feature—the ultimate sophistication, according to da Vinci. I call it looking at the negative space.

The article reads:

A protocol is the rules and expectations of participants in a system, and how they are beholden to each other. A protocol defines who takes responsibility for failure.

The problem with message brokers, and queues, is that no-one does.

NATS plays to the strengths of the end-to-end principle. It’s a dumb pipe. Handle failures and retries at the client and NATS will do everything it can to remain available and fast. Don’t rely on fragile guarantees or semantics. Instead, face complexity head-on. The author states what you really want is request/reply, which is one point I disagree on. RPC is a bad abstraction for building distributed systems. Use simple, versatile primitives and embrace asynchrony and messaging.

So yes, be careful about relying on message brokers. How smart should the pipes really be? More to the point, be careful about relying on strong semantics because experience shows few things are guaranteed when working with distributed systems at scale. Err to the side of simple. Make few assumptions of your middleware. Push work out of your infrastructure and to the edges if you care about performance and scalability because nothing is harder to scale (or operate) than slow infrastructure that tries to do too much.

Fast Topic Matching

A common problem in messaging middleware is that of efficiently matching message topics with interested subscribers. For example, assume we have a set of subscribers, numbered 1 to 3:

Subscriber Match Request
1 forex.usd
2 forex.*
3 stock.nasdaq.msft

And we have a stream of messages, numbered 1 to N:

Message Topic
1 forex.gbp
2 stock.nyse.ibm
3 stock.nyse.ge
4 forex.eur
5 forex.usd
N stock.nasdaq.msft

We are then tasked with routing messages whose topics match the respective subscriber requests, where a “*” wildcard matches any word. This is frequently a bottleneck for message-oriented middleware like ZeroMQ, RabbitMQ, ActiveMQ, TIBCO EMS, et al. Because of this, there are a number of well-known solutions to the problem. In this post, I’ll describe some of these solutions, as well as a novel one, and attempt to quantify them through benchmarking. As usual, the code is available on GitHub.

The Naive Solution

The naive solution is pretty simple: use a hashmap that maps topics to subscribers. Subscribing involves adding a new entry to the map (or appending to a list if it already exists). Matching a message to subscribers involves scanning through every entry in the map, checking if the match request matches the message topic, and returning the subscribers for those that do.

Inserts are approximately O(1) and lookups approximately O(n*m) where n is the number of subscriptions and m is the number of words in a topic. This means the performance of this solution is heavily dependent upon how many subscriptions exist in the map and also the access patterns (rate of reads vs. writes). Since most use cases are heavily biased towards searches rather than updates, the naive solution—unsurprisingly—is not a great option.

The microbenchmark below compares the performance of subscribe, unsubscribe, and lookup (matching) operations, first using an empty hashmap (what we call cold) and then with one containing 1,000 randomly generated 5-word topic subscriptions (what we call hot). With the populated subscription map, lookups are about three orders of magnitude slower, which is why we have to use a log scale in the chart below.

subscribe unsubscribe lookup
cold 172ns 51.2ns 787ns
hot 221ns 55ns 815,787ns


Inverted Bitmap

The inverted bitmap technique builds on the observation that lookups are more frequent than updates and assumes that the search space is finite. Consequently, it shifts some of the cost from the read path to the write path. It works by storing a set of bitmaps, one per topic, or criteria, in the search space. Subscriptions are then assigned an increasing number starting at 0. We analyze each subscription to determine the matching criteria and set the corresponding bits in the criteria bitmaps to 1. For example, assume our search space consists of the following set of topics:

  • forex.usd
  • forex.gbp
  • forex.jpy
  • forex.eur
  • stock.nasdaq
  • stock.nyse

We then have the following subscriptions:

  • 0 = forex.* (matches forex.usd, forex.gbp, forex.jpy, and forex.eur)
  • 1 = stock.nyse (matches stock.nyse)
  • 2 = *.* (matches everything)
  • 3 = stock.* (matches stock.nasdaq and stock.nyse)

When we index the subscriptions above, we get the following set of bitmaps:

 Criteria 0 1 2 3
forex.usd 1 0 1 0
forex.gbp 1 0 1 0
forex.jpy 1 0 1 0
forex.eur 1 0 1 0
stock.nasdaq 0 0 1 1
stock.nyse 0 1 1 1

When we match a message, we simply need to lookup the corresponding bitmap and check the set bits. As we see below, subscribe and unsubscribe are quite expensive with respect to the naive solution, but lookups now fall well below half a microsecond, which is pretty good (the fact that the chart below doesn’t use a log scale like the one above should be an indictment of the naive hashmap-based solution).

subscribe unsubscribe lookup
cold 3,795ns 198ns 380ns
hot 3,863ns 198ns 395ns

The inverted bitmap is a better option than the hashmap when we have a read-heavy workload. One limitation is it requires us to know the search space ahead of time or otherwise requires reindexing which, frankly, is prohibitively expensive.

Optimized Inverted Bitmap

The inverted bitmap technique works well enough, but only if the topic space is fairly static. It also falls over pretty quickly when the topic space and number of subscriptions are large, say, millions of topics and thousands of subscribers. The main benefit of topic-based routing is it allows for faster matching algorithms in contrast to content-based routing, which can be exponentially slower. The truth is, to be useful, your topics probably consist of stock.nyse.ibm, stock.nyse.ge, stock.nasdaq.msft, stock.nasdaq.aapl, etc., not stock.nyse and stock.nasdaq. We could end up with an explosion of topics and, even with efficient bitmaps, the memory consumption tends to be too high despite the fact that most of the bitmaps are quite sparse.

Fortunately, we can reduce the amount of memory we consume using a fairly straightforward optimization. Rather than requiring the entire search space a priori, we simply require the max topic size, in terms of words, e.g. stock.nyse.ibm has a size of 3. We can handle topics of the max size or less, e.g. stock.nyse.bac, stock.nasdaq.txn, forex.usd, index, etc. If we see a message with more words than the max, we can safely assume there are no matching subscriptions.

The optimized inverted bitmap works by splitting topics into their constituent parts. Each constituent position has a set of bitmaps, and we use a technique similar to the one described above on each part. We end up with a bitmap for each constituent which we perform a logical AND on to give a resulting bitmap. Each 1 in the resulting bitmap corresponds to a subscription. This means if the max topic size is n, we only AND at most n bitmaps. Furthermore, if we come across any empty bitmaps, we can stop early since we know there are no matching subscribers.

Let’s say our max topic size is 2 and we have the following subscriptions:

  • 0 = forex.*
  • 1 = stock.nyse
  • 2 = index
  • 3 = stock.*

The inverted bitmap for the first constituent looks like the following:

forex.* stock.nyse index stock.*
null 0 0 0 0
forex 1 0 0 0
stock 0 1 0 1
index 0 0 1 0
other 0 0 0 0

And the second constituent bitmap:

forex.* stock.nyse index stock.*
null 0 0 1 0
nyse 0 1 0 0
other 1 0 0 1

The “null” and “other” rows are worth pointing out. “Null” simply means the topic has no corresponding constituent.  For example, “index” has no second constituent, so “null” is marked. “Other” allows us to limit the number of rows needed such that we only need the ones that appear in subscriptions.  For example, if messages are published on forex.eur, forex.usd, and forex.gbp but I merely subscribe to forex.*, there’s no need to index eur, usd, or gbp. Instead, we just mark the “other” row which will match all of them.

Let’s look at an example using the above bitmaps. Imagine we want to route a message published on forex.eur. We split the topic into its constituents: “forex” and “eur.” We get the row corresponding to “forex” from the first constituent bitmap, the one corresponding to “eur” from the second (other), and then AND the rows.

forex.* stock.nyse index stock.*
1 = forex 1 0 0 0
2 = other 1 0 0 1
AND 1 0 0 0

The forex.* subscription matches.

Let’s try one more example: a message published on stock.nyse.

forex.* stock.nyse index stock.*
1 = stock 0 1 0 1
2 = nyse 0 1 0 1
AND 0 1 0 1

In this case, we also need to OR the “other” row for the second constituent. This gives us a match for stock.nyse and stock.*.

Subscribe operations are significantly faster with the space-optimized inverted bitmap compared to the normal inverted bitmap, but lookups are much slower. However, the optimized version consumes roughly 4.5x less memory for every subscription. The increased flexibility and improved scalability makes the optimized version a better choice for all but the very latency-sensitive use cases.

subscribe unsubscribe lookup
cold 1,053ns 330ns 2,724ns
hot 1,076ns 371ns 3,337ns

Trie

The optimized inverted bitmap improves space complexity, but it does so at the cost of lookup efficiency. Is there a way we can reconcile both time and space complexity? While inverted bitmaps allow for efficient lookups, they are quite wasteful for sparse sets, even when using highly compressed bitmaps like Roaring bitmaps.

Tries can often be more space efficient in these circumstances. When we add a subscription, we descend the trie, adding nodes along the way as necessary, until we run out of words in the topic. Finally, we add some metadata containing the subscription information to the last node in the chain. To match a message topic, we perform a similar traversal. If a node doesn’t exist in the chain, we know there are no subscribers. One downside of this method is, in order to support wildcards, we must backtrack on a literal match and check the “*” branch as well.

For the given set of subscriptions, the trie would look something like the following:

  • forex.*
  • stock.nyse
  • index
  • stock.*

You might be tempted to ask: “why do we even need the “*” nodes? When someone subscribes to stock.*, just follow all branches after “stock” and add the subscriber.” This would indeed move the backtracking cost from the read path to the write path, but—like the first inverted bitmap we looked at—it only works if the search space is known ahead of time. It would also largely negate the memory-usage benefits we’re looking for since it would require pre-indexing all topics while requiring a finite search space.

It turns out, this trie technique is how systems like ZeroMQ and RabbitMQ implement their topic matching due to its balance between space and time complexity and overall performance predictability.

subscribe unsubscribe lookup
cold 406ns 221ns 2,145ns
hot 443ns 257ns 2,278ns

We can see that, compared to the optimized inverted bitmap, the trie performs much more predictably with relation to the number of subscriptions held.

Concurrent Subscription Trie

One thing we haven’t paid much attention to so far is concurrency. Indeed, message-oriented middleware is typically highly concurrent since they have to deal with heavy IO (reading messages from the wire, writing messages to the wire, reading messages from disk, writing messages to disk, etc.) and CPU operations (like topic matching and routing). Subscribe, unsubscribe, and lookups are usually all happening in different threads of execution. This is especially important when we want to talk advantage of multi-core processors.

It wasn’t shown, but all of the preceding algorithms used global locks to ensure thread safety between read and write operations, making the data structures safe for concurrent use. However, the microbenchmarks don’t really show the impact of this, which we will see momentarily.

Lock-freedom, which I’ve written about, allows us to increase throughput at the expense of increased tail latency.

Lock-free concurrency means that while a particular thread of execution may be blocked, all CPUs are able to continue processing other work. For example, imagine a program that protects access to some resource using a mutex. If a thread acquires this mutex and is subsequently preempted, no other thread can proceed until this thread is rescheduled by the OS. If the scheduler is adversarial, it may never resume execution of the thread, and the program would be effectively deadlocked. A key point, however, is that the mere lack of a lock does not guarantee a program is lock-free. In this context, “lock” really refers to deadlock, livelock, or the misdeeds of a malevolent scheduler.

The concurrent subscription trie, or CS-trie,  is a new take on the trie-based solution described earlier. It combines the idea of the topic-matching trie with that of a Ctrie, or concurrent trie, which is a non-blocking concurrent hash trie.

The fundamental problem with the trie, as it relates to concurrency, is it requires a global lock, which severely limits throughput. To address this, the CS-trie uses indirection nodes, or I-nodes, which remain present in the trie even as the nodes above and below change. Subscriptions are then added or removed by creating a copy of the respective node, and performing a CAS on its parent I-node. This allows us to add, remove, and lookup subscriptions concurrently and in a lock-free, linearizable manner.

For the given set of subscribers, labeled x, y, and z, the CS-trie would look something like the following:

  • x = foo, bar, bar.baz
  • y = foo, bar.qux
  • z = bar.*

Lookups on the CS-trie perform, on average, better than the standard trie, and the CS-trie scales better with respect to concurrent operations.

subscribe unsubscribe lookup
cold 412ns 245ns 1,615ns
hot 471ns 280ns 1,637ns

Latency Comparison

The chart below shows the topic-matching operation latencies for all of the algorithms side-by-side. First, we look at the performance of a cold start (no subscriptions) and then the performance of a hot start (1,000 subscriptions).

Throughput Comparison

So far, we’ve looked at the latency of individual topic-matching operations. Next, we look at overall throughput of each of the algorithms and their memory footprint.

 algorithm msg/sec
naive  4,053.48
inverted bitmap  1,052,315.02
optimized inverted bitmap  130,705.98
trie  248,762.10
cs-trie  340,910.64

On the surface, the inverted bitmap looks like the clear winner, clocking in at over 1 million matches per second. However, we know the inverted bitmap does not scale and, indeed, this becomes clear when we look at memory consumption, underscored by the fact that the below chart uses a log scale.

Scalability with Respect to Concurrency

Lastly, we’ll look at how each of these algorithms scales with respect to concurrency. We do this by performing concurrent operations and varying the level of concurrency and number of operations. We start with a 50-50 split between reads and writes. We vary the number of goroutines from 2 to 16 (the benchmark was run using a 2.6 GHz Intel Core i7 processor with 8 logical cores). Each goroutine performs 1,000 reads or 1,000 writes. For example, the 2-goroutine benchmark performs 1,000 reads and 1,000 writes, the 4-goroutine benchmark performs 2,000 reads and 2,000 writes, etc. We then measure the total amount of time needed to complete the workload.

We can see that the tries hardly even register on the scale above, so we’ll plot them separately.

The tries are clearly much more efficient than the other solutions, but the CS-trie in particular scales well to the increased workload and concurrency.

Since most workloads are heavily biased towards reads over writes, we’ll run a separate benchmark that uses a 90-10 split reads and writes. This should hopefully provide a more realistic result.

The results look, more or less, like what we would expect, with the reduced writes improving the inverted bitmap performance. The CS-trie still scales quite well in comparison to the global-lock trie.

Conclusion

As we’ve seen, there are several approaches to consider to implement fast topic matching. There are also several aspects to look at: read/write access patterns, time complexity, space complexity, throughput, and latency.

The naive hashmap solution is generally a poor choice due to its prohibitively expensive lookup time. Inverted bitmaps offer a better solution. The standard implementation is reasonable if the search space is finite, small, and known a priori, especially if read latency is critical. The space-optimized version is a better choice for scalability, offering a good balance between read and write performance while keeping a small memory footprint. The trie is an even better choice, providing lower latency than the optimized inverted bitmap and consuming less memory. It’s particularly good if the subscription tree is sparse and topics are not known a priori. Lastly, the concurrent subscription trie is the best option if there is high concurrency and throughput matters. It offers similar performance to the trie but scales better. The only downside is an increase in implementation complexity.

Take It to the Limit: Considerations for Building Reliable Systems

Complex systems usually operate in failure mode. This is because a complex system typically consists of many discrete pieces, each of which can fail in isolation (or in concert). In a microservice architecture where a given function potentially comprises several independent service calls, high availability hinges on the ability to be partially available. This is a core tenet behind resilience engineering. If a function depends on three services, each with a reliability of 90%, 95%, and 99%, respectively, partial availability could be the difference between 99.995% reliability and 84% reliability (assuming failures are independent). Resilience engineering means designing with failure as the normal.

Anticipating failure is the first step to resilience zen, but the second is embracing it. Telling the client “no” and failing on purpose is better than failing in unpredictable or unexpected ways. Backpressure is another critical resilience engineering pattern. Fundamentally, it’s about enforcing limits. This comes in the form of queue lengths, bandwidth throttling, traffic shaping, message rate limits, max payload sizes, etc. Prescribing these restrictions makes the limits explicit when they would otherwise be implicit (eventually your server will exhaust its memory, but since the limit is implicit, it’s unclear exactly when or what the consequences might be). Relying on unbounded queues and other implicit limits is like someone saying they know when to stop drinking because they eventually pass out.

Rate limiting is important not just to prevent bad actors from DoSing your system, but also yourself. Queue limits and message size limits are especially interesting because they seem to confuse and frustrate developers who haven’t fully internalized the motivation behind them. But really, these are just another form of rate limiting or, more generally, backpressure. Let’s look at max message size as a case study.

Imagine we have a system of distributed actors. An actor can send messages to other actors who, in turn, process the messages and may choose to send messages themselves. Now, as any good software engineer knows, the eighth fallacy of distributed computing is “the network is homogenous.” This means not all actors are using the same hardware, software, or network configuration. We have servers with 128GB RAM running Ubuntu, laptops with 16GB RAM running macOS, mobile clients with 2GB RAM running Android, IoT edge devices with 512MB RAM, and everything in between, all running a hodgepodge of software and network interfaces.

When we choose not to put an upper bound on message sizes, we are making an implicit assumption (recall the discussion on implicit/explicit limits from earlier). Put another way, you and everyone you interact with (likely unknowingly) enters an unspoken contract of which neither party can opt out. This is because any actor may send a message of arbitrary size. This means any downstream consumers of this message, either directly or indirectly, must also support arbitrarily large messages.

How can we test something that is arbitrary? We can’t. We have two options: either we make the limit explicit or we keep this implicit, arbitrarily binding contract. The former allows us to define our operating boundaries and gives us something to test. The latter requires us to test at some undefined production-level scale. The second option is literally gambling reliability for convenience. The limit is still there, it’s just hidden. When we don’t make it explicit, we make it easy to DoS ourselves in production. Limits become even more important when dealing with cloud infrastructure due to their multitenant nature. They prevent a bad actor (or yourself) from bringing down services or dominating infrastructure and system resources.

In our heterogeneous actor system, we have messages bound for mobile devices and web browsers, which are often single-threaded or memory-constrained consumers. Without an explicit limit on message size, a client could easily doom itself by requesting too much data or simply receiving data outside of its control—this is why the contract is unspoken but binding.

Let’s look at this from a different kind of engineering perspective. Consider another type of system: the US National Highway System. The US Department of Transportation uses the Federal Bridge Gross Weight Formula as a means to prevent heavy vehicles from damaging roads and bridges. It’s really the same engineering problem, just a different discipline and a different type of infrastructure.

The August 2007 collapse of the Interstate 35W Mississippi River bridge in Minneapolis brought renewed attention to the issue of truck weights and their relation to bridge stress. In November 2008, the National Transportation Safety Board determined there had been several reasons for the bridge’s collapse, including (but not limited to): faulty gusset plates, inadequate inspections, and the extra weight of heavy construction equipment combined with the weight of rush hour traffic.

The DOT relies on weigh stations to ensure trucks comply with federal weight regulations, fining those that exceed restrictions without an overweight permit.

The federal maximum weight is set at 80,000 pounds. Trucks exceeding the federal weight limit can still operate on the country’s highways with an overweight permit, but such permits are only issued before the scheduled trip and expire at the end of the trip. Overweight permits are only issued for loads that cannot be broken down to smaller shipments that fall below the federal weight limit, and if there is no other alternative to moving the cargo by truck.

Weight limits need to be enforced so civil engineers have a defined operating range for the roads, bridges, and other infrastructure they build. Computers are no different. This is the reason many systems enforce these types of limits. For example, Amazon clearly publishes the limits for its Simple Queue Service—the max in-flight messages for standard queues is 120,000 messages and 20,000 messages for FIFO queues. Messages are limited to 256KB in size. Amazon KinesisApache KafkaNATS, and Google App Engine pull queues all limit messages to 1MB in size. These limits allow the system designers to optimize their infrastructure and ameliorate some of the risks of multitenancy—not to mention it makes capacity planning much easier.

Unbounded anything—whether its queues, message sizes, queries, or traffic—is a resilience engineering anti-pattern. Without explicit limits, things fail in unexpected and unpredictable ways. Remember, the limits exist, they’re just hidden. By making them explicit, we restrict the failure domain giving us more predictability, longer mean time between failures, and shorter mean time to recovery at the cost of more upfront work or slightly more complexity.

It’s better to be explicit and handle these limits upfront than to punt on the problem and allow systems to fail in unexpected ways. The latter might seem like less work at first but will lead to more problems long term. By requiring developers to deal with these limitations directly, they will think through their APIs and business logic more thoroughly and design better interactions with respect to stability, scalability, and performance.

Benchmarking Commit Logs

In this article, we look at Apache Kafka and NATS Streaming, two messaging systems based on the idea of a commit log. We’ll compare some of the features of both but spend less time talking about Kafka since by now it’s quite well known. Similar to previous studies, we’ll attempt to quantify their general performance characteristics through careful benchmarking.

The purpose of this benchmark is to test drive the newly released NATS Streaming system, which was made generally available just in the last few months. NATS Streaming doesn’t yet support clustering, so we try to put its performance into context by looking at a similar configuration of Kafka.

Unlike conventional message queues, commit logs are an append-only data structure. This results in several nice properties like total ordering of messages, at-least-once delivery, and message-replay semantics. Jay Kreps’ blog post The Log is a great introduction to the concept and particularly why it’s so useful in the context of distributed systems and stream processing (his book I Heart Logs is an extended version of the blog post and is a quick read).

Kafka, which originated at LinkedIn, is by far the most popular and most mature implementation of the commit log (AWS offers their own flavor of it called Kinesis, and imitation is the sincerest form of flattery). It’s billed as a “distributed streaming platform for building real-time data pipelines and streaming apps.” The much newer NATS Streaming is actually a data-streaming layer built on top of Apcera’s high-performance publish-subscribe system NATS. It’s billed as “real-time streaming for Big Data, IoT, Mobile, and Cloud Native Applications.” Both have some similarities as well as some key differences.

Fundamental to the notion of a log is a way to globally order events. Neither NATS Streaming nor Kafka are actually a single log but many logs, each totally ordered using a sequence number or offset, respectively.

In Kafka, topics are partitioned into multiple logs which are then replicated across a number of servers for fault tolerance, making it a distributed commit log. Each partition has a server that acts as the leader. Cluster membership and leader election is managed by ZooKeeper.

NATS Streaming’s topics are called “channels” which are globally ordered. Unlike Kafka, NATS Streaming does not support replication or partitioning of channels, though my understanding is clustering support is slated for Q1 2017. Its message store is pluggable, so it can provide durability using a file-backed implementation, like Kafka, or simply an in-memory store.

NATS Streaming is closer to a hybrid of traditional message queues and the commit log. Like Kafka, it allows replaying the log from a specific offset, the beginning of time, or the newest offset, but it also exposes an API for reading from the log at a specific physical time offset, e.g. all messages from the last 30 seconds. Kafka, on the other hand, only has a notion of logical offsets (correction: Kafka added support for offset lookup by timestamp in 0.10.1.0) . Generally, relying on physical time is an anti-pattern in distributed systems due to clock drift and the fact that clocks are not always monotonic. For example, imagine a situation where a NATS Streaming server is restarted and the clock is changed. Messages are still ordered by their sequence numbers but their timestamps might not reflect that. Developers would need to be aware of this while implementing their business logic.

With Kafka, it’s strictly on consumers to track their offset into the log (or the high-level consumer which stores offsets in ZooKeeper (correction: Kafka itself can now store offsets which is used by the new Consumer API, meaning clients do not have to manage offsets directly or rely on ZooKeeper)). NATS Streaming allows clients to either track their sequence number or use a durable subscription, which causes the server to track the last acknowledged message for a client. If the client restarts, the server will resume delivery starting at the earliest unacknowledged message. This is closer to what you would expect from a traditional message-oriented middleware like RabbitMQ.

Lastly, NATS Streaming supports publisher and subscriber rate limiting. This works by configuring the maximum number of in-flight (unacknowledged) messages either from the publisher to the server or from the server to the subscriber. Starting in version 0.9, Kafka supports a similar rate limiting feature that allows producer and consumer byte-rate thresholds to be defined for groups of clients with its Quotas protocol.

Kafka was designed to avoid tracking any client state on the server for performance and scalability reasons. Throughput and storage capacity scale linearly with the number of nodes. NATS Streaming provides some additional features over Kafka at the cost of some added state on the server. Since clustering isn’t supported, there isn’t really any scale or HA story yet, so it’s unclear how that will play out. That said, once replication is supported, there’s a lot of work going into verifying its correctness (which is a major advantage Kafka has).

Benchmarks

Since NATS Streaming does not support replication at this time (0.3.1), we’ll compare running a single instance of it with file-backed persistence to running a single instance of Kafka (0.10.1.0). We’ll look at both latency and throughput running on commodity hardware (m4.xlarge EC2 instances) with load generation and consumption each running on separate instances. In all of these benchmarks, the systems under test have not been tuned at all and are essentially in their “off-the-shelf” configurations.

We’ll first look at latency by publishing messages of various sizes, ranging from 256 bytes to 1MB, at a fixed rate of 50 messages/second for 30 seconds. Message contents are randomized to account for compression. We then plot the latency distribution by percentile on a logarithmic scale from the 0th percentile to the 99.9999th percentile. Benchmarks are run several times in an attempt to produce a “normalized” result. The benchmark code used is open source.

First, to establish a baseline and later get a feel for the overhead added by the file system, we’ll benchmark NATS Streaming with in-memory storage, meaning messages are not written to disk.

Unsurprisingly, the 1MB configuration has much higher latencies than the other configurations, but everything falls within single-digit-millisecond latencies.nats_mem

NATS Streaming 0.3.1 (in-memory persistence)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.3750ms 1.0367ms 1.1257ms 1.1257ms 1.1257ms
1KB 0.38064ms 0.8321ms 1.3260ms 1.3260ms 1.3260ms
5KB 0.4408ms 1.7569ms 2.1465ms 2.1465ms 2.1465ms
1MB 6.6337ms 8.8097ms 9.5263ms 9.5263ms 9.5263ms

Next, we look at NATS Streaming with file-backed persistence. This provides the same durability guarantees as Kafka running with a replication factor of 1. By default, Kafka stores logs under /tmp. Many Unix distributions mount /tmp to tmpfs which appears as a mounted file system but is actually stored in volatile memory. To account for this and provide as level a playing field as possible, we configure NATS Streaming to also store its logs in /tmp.

As expected, latencies increase by about an order of magnitude once we start going to disk.

nats_file_fsync

NATS Streaming 0.3.1 (file-backed persistence)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 21.7051ms 25.0369ms 27.0524ms 27.0524ms 27.0524ms
1KB 20.6090ms 23.8858ms 24.7124ms 24.7124ms 24.7124ms
5KB 22.1692ms 35.7394ms 40.5612ms 40.5612ms 40.5612ms
1MB 45.2490ms 130.3972ms 141.1564ms 141.1564ms 141.1564ms

Since we will be looking at Kafka, there is an important thing to consider relating to fsync behavior. As of version 0.8, Kafka does not call fsync directly and instead relies entirely on the background flush performed by the OS. This is clearly indicated by their documentation:

We recommend using the default flush settings which disable application fsync entirely. This means relying on the background flush done by the OS and Kafka’s own background flush. This provides the best of all worlds for most uses: no knobs to tune, great throughput and latency, and full recovery guarantees. We generally feel that the guarantees provided by replication are stronger than sync to local disk, however the paranoid still may prefer having both and application level fsync policies are still supported.

However, NATS Streaming calls fsync every time a batch is written to disk by default. This can be disabled through the use of the –file_sync flag. By setting this flag to false, we put NATS Streaming’s persistence behavior closer in line with Kafka’s (again assuming a replication factor of 1).

As an aside, the comparison between NATS Streaming and Kafka still isn’t completely “fair”. Jay Kreps points out that Kafka relies on replication as the primary means of durability.

Kafka leaves [fsync] off by default because it relies on replication not fsync for durability, which is generally faster. If you don’t have replication I think you probably need fsync and maybe some kind of high integrity file system.

I don’t think we can provide a truly fair comparison until NATS Streaming supports replication, at which point we will revisit this.

To no one’s surprise, setting –file_sync=false has a significant impact on latency, shown in the distribution below.

nats_file_no_fsync

In fact, it’s now in line with the in-memory performance as before for 256B, 1KB, and 5KB messages, shown in the comparison below.

nats_file_mem

For a reason I have yet to figure out, the latency for 1MB messages is roughly an order of magnitude faster when fsync is enabled after the 95th percentile, which seems counterintuitive. If anyone has an explanation, I would love to hear it. I’m sure there’s a good debug story there. The distribution below shows the 1MB configuration for NATS Streaming with and without fsync enabled and just how big the difference is at the 95th percentile and beyond.

nats_file_mem_1mb

NATS Streaming 0.3.1 (file-backed persistence, –file_sync=false)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.4304ms 0.8577ms 1.0706ms 1.0706ms 1.0706ms
1KB 0.4372ms 1.5987ms 1.8651ms 1.8651ms 1.8651ms
5KB 0.4939ms 2.0828ms 2.2540ms 2.2540ms 2.2540ms
1MB 1296.1464ms 1556.1441ms 1596.1457ms 1596.1457ms 1596.1457ms

Kafka with replication factor 1 tends to have higher latencies than NATS Streaming with –file_sync=false. There was one potential caveat here Ivan Kozlovic pointed out to me in that NATS Streaming uses a caching optimization for reads that may put it at an advantage.

Now, there is one side where NATS Streaming *may* be looking better and not fair to Kafka. By default, the file store keeps everything in memory once stored. This means look-ups will be fast. There is only a all-or-nothing mode right now, which means either cache everything or nothing. With caching disabled (–file_cache=false), every lookup will result in disk access (which when you have 1 to many subscribers will be bad). I am working on changing that. But if you do notice that in Kafka, consuming results in a disk read (given the other default behavior described above, they actually may not ;-)., then you could disable NATS Streaming file caching.

Fortunately, we can verify if Kafka is actually going to disk to read messages back from the log during the benchmark using iostat. We see something like this for the majority of the benchmark duration:

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          13.53    0.00   11.28    0.00    0.00   75.19

Device:    tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn
xvda      0.00         0.00         0.00          0          0

Specifically, we’re interested in Blk_read, which indicates the total number of blocks read. It appears that Kafka does indeed make heavy use of the operating system’s page cache as Blk_wrtn and Blk_read rarely show any activity throughout the entire benchmark. As such, it seems fair to leave NATS Streaming’s –file_cache=true, which is the default.

One interesting point is Kafka offloads much of its caching to the page cache and outside of the JVM heap, clearly in an effort to minimize GC pauses. I’m not clear if the cache Ivan refers to in NATS Streaming is off-heap or not (NATS Streaming is written in Go which, like Java, is a garbage-collected language).

Below is the distribution of latencies for 256B, 1KB, and 5KB configurations in Kafka.

kafka

Similar to NATS Streaming, 1MB message latencies tend to be orders of magnitude worse after about the 80th percentile. The distribution below compares the 1MB configuration for NATS Streaming and Kafka.

nats_kafka_1mb

Kafka 0.10.1.0 (replication factor 1)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.9230ms 1.4575ms 1.6596ms 1.6596ms 1.6596ms
1KB 0.5942ms 1.3123ms 17.6556ms 17.6556ms 17.6556ms
5KB 0.7203ms 5.7236ms 18.9334ms 18.9334ms 18.9334ms
1MB 5337.3174ms 5597.3315ms 5617.3199ms 5617.3199ms 5617.3199ms

The percentile distributions below compare NATS Streaming and Kafka for the 256B, 1KB, and 5KB configurations, respectively.

nats_kafka_256b

nats_kafka_1kb

nats_kafka_5kb

Next, we’ll look at overall throughput for the two systems. This is done by publishing 100,000 messages using the same range of sizes as before and measuring the elapsed time. Specifically, we measure throughput at the publisher and the subscriber.

Despite using an asynchronous publisher in both the NATS Streaming and Kafka benchmarks, we do not consider the publisher “complete” until it has received acks for all published messages from the server. In Kafka, we do this by setting request.required.acks to 1, which means the leader replica has received the data, and consuming the received acks. This is important because the default value is 0, which means the producer never waits for an ack from the broker. In NATS Streaming, we provide an ack callback on every publish. We use the same benchmark configuration as the latency tests, separating load generation and consumption on different EC2 instances. Note the log scale in the following charts.

Once again, we’ll start by looking at NATS Streaming using in-memory persistence. The truncated 1MB send and receive throughputs are 93.01 messages/second.

nats_mem_throughput

For comparison, we now look at NATS Streaming with file persistence and –file_sync=false. As before, this provides the closest behavior to Kafka’s default flush behavior. The second chart shows a side-by-side comparison between NATS Streaming with in-memory and file persistence.

nats_file_throughput

nats_compare_throughput

Lastly, we look at Kafka with replication factor 1. Throughput significantly deteriorates when we set request.required.acks = 1 since the producer must wait for all acks from the server. This is important though because, by default, the client does not require an ack from the server. If this were the case, the producer would have no idea how much data actually reached the server once it finished—it could simply be buffered in the client, in flight over the wire, or in the server but not yet on disk. Running the benchmark with request.required.acks = 0 yields much higher throughput on the sender but is basically an exercise in how fast you can write to a channel using the Sarama Go client—slightly misleading.

kafka_throughput

Looking at some comparisons of Kafka and NATS Streaming, we can see that NATS Streaming has higher throughput in all but a few cases.

nats_kafka_throughput

nats_kafka_send_throughput

I want to repeat the disclaimer from before: the purpose of this benchmark is to test drive the newly released NATS Streaming system (which as mentioned earlier, doesn’t yet support clustering), and put its performance into context by looking at a similar configuration of Kafka.

Kafka generally scales very well, so measuring the throughput of a single broker with a single producer and single consumer isn’t particularly meaningful. In reality, we’d be running a cluster with several brokers and partitioning our topics across them.

For as young as it is, NATS Streaming has solid performance (which shouldn’t come as much of a surprise considering the history of NATS itself), and I imagine it will only get better with time as the NATS team continues to optimize. In some ways, NATS Streaming bridges the gap between the commit log as made popular by Kafka and the conventional message queue as made popular by protocols like JMS, AMQP, STOMP, and the like.

The bigger question at this point is how NATS Streaming will tackle scaling and replication (a requirement for true production-readiness in my opinion). Kafka was designed from the ground up for high scalability and availability through the use of external coordination (read ZooKeeper). Naturally, there is a lot of complexity and cost that comes with that. NATS Streaming attempts to keep NATS’ spirit of simplicity, but it’s yet to be seen how it will reconcile that with the complex nature of distributed systems. I’m excited to see where Apcera takes NATS Streaming and generally the NATS ecosystem in the future since the team has a lot of experience in this area.

Benchmarking Message Queue Latency

About a year and a half ago, I published Dissecting Message Queues, which broke down a few different messaging systems and did some performance benchmarking. It was a naive attempt and had a lot of problems, but it was also my first time doing any kind of system benchmarking. It turns out benchmarking systems correctly is actually pretty difficult and many folks get it wrong. I don’t claim to have gotten it right, but over the past year and a half I’ve learned a lot, tried to build some better tools, and improve my methodology.

Tooling and Methodology

The Dissecting Message Queues benchmarks used a framework I wrote which published a specified number of messages effectively as fast as possible, received them, and recorded the end-to-end latency. There are several problems with this. First, load generation and consumption run on the same machine. Second, the system under test runs on the same machine as the benchmark client—both of these confound measurements. Third, running “pedal to the metal” and looking at the resulting latency isn’t a very useful benchmark because it’s not representative of a production environment (as Gil Tene likes to say, this is like driving your car as fast as possible, crashing it into a pole, and looking at the shape of the bumper afterwards—it’s always going to look bad). Lastly, the benchmark recorded average latency, which, for all intents and purposes, is a useless metric to look at.

I wrote Flotilla to automate “scaled-up” benchmarking—running the broker and benchmark clients on separate, distributed VMs. Flotilla also attempted to capture a better view of latency by looking at the latency distribution, though it only went up to the 99th percentile, which can sweep a lot of really bad things under the rug as we’ll see later. However, it still ran tests at full throttle, which isn’t great.

Bench is an attempt to get back to basics. It’s a simple, generic benchmarking library for measuring latency. It provides a straightforward Requester interface which can be implemented for various systems under test. Bench works by attempting to issue a fixed rate of requests per second and measuring the latency of each request issued synchronously. Latencies are captured using HDR Histogram, which observes the complete latency distribution and allows us to look, for example, at “six nines” latency.

Introducing a request schedule allows us to measure latency for different configurations of request rate and message size, but in a “closed-loop” test, it creates another problem called coordinated omission. The problem with a lot of benchmarks is that they end up measuring service time rather than response time, but the latter is likely what you care about because it’s what your users experience.

The best way to describe service time vs. response time is to think of a cash register. The cashier might be able to ring up a customer in under 30 seconds 99% of the time, but 1% of the time it takes three minutes. The time it takes to ring up a customer is the service time, while the response time consists of the service time plus the time the customer waited in line. Thus, the response time is dependent upon the variation in both service time and the rate of arrival. When we measure latency, we really want to measure response time.

Now, let’s think about how most latency benchmarks work. They usually do this:

  1. Note timestamp before request, t0.
  2. Make synchronous request.
  3. Note timestamp after request, t1.
  4. Record latency t1t0.
  5. Repeat as needed for request schedule.

What’s the problem with this? Nothing, as long as our requests fit within the specified request schedule.  For example, if we’re issuing 100 requests per second and each request takes 10 ms to complete, we’re good. However, if one request takes 100 ms to complete, that means we issued only one request during those 100 ms when, according to our schedule, we should have issued 10 requests in that window. Nine other requests should have been issued, but the benchmark effectively coordinated with the system under test by backing off. In reality, those nine requests waited in line—one for 100 ms, one for 90 ms, one for 80 ms, etc. Most benchmarks don’t capture this time spent waiting in line, yet it can have a dramatic effect on the results. The graph below shows the same benchmark with coordinated omission both uncorrected (red) and corrected (blue):
coordinated_omission

HDR Histogram attempts to correct coordinated omission by filling in additional samples when a request falls outside of its expected interval. We can also deal with coordinated omission by simply avoiding it altogether—always issue requests according to the schedule.

Message Queue Benchmarks

I benchmarked several messaging systems using bench—RabbitMQ (3.6.0), Kafka (0.8.2.2 and 0.9.0.0), Redis (2.8.4) pub/sub, and NATS (0.7.3). In this context, a “request” consists of publishing a message to the server and waiting for a response (i.e. a roundtrip). We attempt to issue requests at a fixed rate and correct for coordinated omission, then plot the complete latency distribution all the way up to the 99.9999th percentile. We repeat this for several configurations of request rate and request size. It’s also important to note that each message going to and coming back from the server are of the specified size, i.e. the “response” is the same size as the “request.”

The configurations used are listed below. Each configuration is run for a sustained 30 seconds.

  • 256B requests at 3,000 requests/sec (768 KB/s)
  • 1KB requests at 3,000 requests/sec (3 MB/s)
  • 5KB requests at 2,000 requests/sec (10 MB/s)
  • 1KB requests at 20,000 requests/sec (20.48 MB/s)
  • 1MB requests at 100 requests/sec (100 MB/s)

These message sizes are mostly arbitrary, and there might be a better way to go about this. Though I think it’s worth pointing out that the Ethernet MTU is 1500 bytes, so accounting for headers, the maximum amount of data you’ll get in a single TCP packet will likely be between 1400 and 1500 bytes.

The system under test and benchmarking client are on two different m4.xlarge EC2 instances (2.4 GHz Intel Xeon Haswell, 16GB RAM) with enhanced networking enabled.

Redis and NATS

Redis pub/sub and NATS have similar performance characteristics. Both offer very lightweight, non-transactional messaging with no persistence options (discounting Redis’ RDB and AOF persistence, which don’t apply to pub/sub), and both support some level of topic pattern matching. I’m hesitant to call either a “message queue” in the traditional sense, so I usually just refer to them as message brokers or buses. Because of their ephemeral nature, both are a nice choice for low-latency, lossy messaging.

Redis tail latency peaks around 1.5 ms.

Redis_latency

NATS performance looks comparable to Redis. Latency peaks around 1.2 ms.

NATS_latency

The resemblance becomes more apparent when we overlay the two distributions for the 1KB and 5KB runs. NATS tends to be about 0.1 to 0.4 ms faster.

Redis_NATS_latency

The 1KB, 20,000 requests/sec run uses 25 concurrent connections. With concurrent load, tail latencies jump up, peaking around 90 and 120 ms at the 99.9999th percentile in NATS and Redis, respectively.

Redis_NATS_1KB_20000_latency

Large messages (1MB) don’t hold up nearly as well, exhibiting large tail latencies starting around the 95th and 97th percentiles in NATS and Redis, respectively. 1MB is the default maximum message size in NATS. The latency peaks around 214 ms. Again, keep in mind these are synchronous, roundtrip latencies.

Redis_NATS_1MB_latency

Apcera’s Ivan Kozlovic pointed out that the version of the NATS client I was using didn’t include a recent performance optimization. Before, the protocol parser scanned over each byte in the payload, but the newer version skips to the end (the previous benchmarks were updated to use the newer version). The optimization does have a noticeable effect, illustrated below. There was about a 30% improvement with the 5KB latencies.

NATS_optimization_latency

The difference is even more pronounced in the 1MB case, which has roughly a 90% improvement up to the 90th percentile. The linear scale in the graph below hides this fact, but at the 90th percentile, for example, the pre-optimization latency is 10 ms and the optimized latency is 3.8 ms. Clearly, the large tail is mostly unaffected, however.

NATS_1MB_optimization_latency

In general, this shows that NATS and Redis are better suited to smaller messages (well below 1MB), in which latency tends to be sub-millisecond up to four nines.

RabbitMQ and Kafka

RabbitMQ is a popular AMQP implementation. Unlike NATS, it’s a more traditional message queue in the sense that it supports binding queues and transactional-delivery semantics. Consequently, RabbitMQ is a more “heavyweight” queuing solution and tends to pay an additional premium with latency. In this benchmark, non-durable queues were used. As a result, we should see reduced latencies since we aren’t going to disk.

RabbitMQ_latency

Latency tends to be sub-millisecond up to the 99.7th percentile, but we can see that it doesn’t hold up to NATS beyond that point for the 1KB and 5KB payloads.

RabbitMQ_NATS_latency

Kafka, on the other hand, requires disk persistence, but this doesn’t have a dramatic effect on latency until we look at the 94th percentile and beyond, when compared to RabbitMQ. Writes should be to page cache with flushes to disk happening asynchronously. The graphs below are for 0.8.2.2.

Kafka_latency

RabbitMQ_Kafka_latency

Once again, the 1KB, 20,000 requests/sec run is distributed across 25 concurrent connections. With RabbitMQ, we see the dramatic increase in tail latencies as we did with Redis and NATS. The RabbitMQ latencies in the concurrent case stay in line with the previous latencies up to about the 99th percentile. Interestingly, Kafka, doesn’t appear to be significantly affected. The latencies of 20,000 requests/sec at 1KB per request are not terribly different than the latencies of 3,000 requests/sec at 1KB per request, both peaking around 250 ms.

RabbitMQ_Kafka_1KB_20000_latency

What’s particularly interesting is the behavior of 1MB messages vs. the rest. With RabbitMQ, there’s almost a 14x difference in max latencies between the 5KB and 1MB runs with 1MB being the faster. With Kafka 0.8.2.2, the difference is over 126x in the same direction. We can plot the 1MB latencies for RabbitMQ and Kafka since it’s difficult to discern them with a linear scale.

RabbitMQ_Kafka_1MB_latency

tried to understand what was causing this behavior. I’ve yet to find a reasonable explanation for RabbitMQ. Intuition tells me it’s a result of buffering—either at the OS level or elsewhere—and the large messages cause more frequent flushing. Remember that these benchmarks were with transient publishes. There should be no disk accesses occurring, though my knowledge of Rabbit’s internals are admittedly limited. The fact that this behavior occurs in RabbitMQ and not Redis or NATS seems odd. Nagle’s algorithm is disabled in all of the benchmarks (TCP_NODELAY). After inspecting packets with Wireshark, it doesn’t appear to be a problem with delayed acks.

To show just how staggering the difference is, we can plot Kafka 0.8.2.2 and RabbitMQ 1MB latencies alongside Redis and NATS 5KB latencies. They are all within the same ballpark. Whatever the case may be, both RabbitMQ and Kafka appear to handle large messages extremely well in contrast to Redis and NATS.

RabbitMQ_Kafka_NATS_Redis_latency

This leads me to believe you’ll see better overall throughput, in terms of raw data, with RabbitMQ and Kafka, but more predictable, tighter tail latencies with Redis and NATS. Where SLAs are important, it’s hard to beat NATS. Of course, it’s unfair to compare Kafka with something like NATS or Redis or even RabbitMQ since they are very different (and sometimes complementary), but it’s also worth pointing out that the former is much more operationally complex.

However, benchmarking Kafka 0.9.0.0 (blue and red) shows an astounding difference in tail latencies compared to 0.8.2.2 (orange and green).

Kafka_0_8_0_9_latency

Kafka 0.9’s performance is much more in line with RabbitMQ’s at high percentiles as seen below.

RabbitMQ_Kafka_0_9_latency

Likewise, it’s a much closer comparison to NATS when looking at the 1KB and 5KB runs.

Kafka_NATS_latency

As with 0.8, Kafka 0.9 does an impressive job dealing with 1MB messages in comparison to NATS, especially when looking at the 92nd percentile and beyond. It’s hard to decipher in the graph below, but Kafka 0.9’s 99th, 99.9th, and 99.99th percentile latencies are 0.66, 0.78, and 1.35 ms, respectively.

Kafka_0_9_NATS_1MB

My initial thought was that the difference between Kafka 0.8 and 0.9 was attributed to a change in fsync behavior. To quote the Kafka documentation:

Kafka always immediately writes all data to the filesystem and supports the ability to configure the flush policy that controls when data is forced out of the OS cache and onto disk using the and flush. This flush policy can be controlled to force data to disk after a period of time or after a certain number of messages has been written.

However, there don’t appear to be any changes in the default flushing configuration between 0.8 and 0.9. The default configuration disables application fsync entirely, instead relying on the OS’s background flush. Jay Kreps indicates it’s a result of several “high percentile latency issues” that were fixed in 0.9. After scanning the 0.9 release notes, I was unable to determine specifically what those fixes might be. Either way, the difference is certainly not something to scoff at.

Conclusion

As always, interpret these benchmark results with a critical eye and perform your own tests if you’re evaluating these systems. This was more an exercise in benchmark methodology and tooling than an actual system analysis (and, as always, there’s still a lot of room for improvement). If anything, I think these results show how much we can miss by not looking beyond the 99th percentile. In almost all cases, everything looks pretty good up to that point, but after that things can get really bad. This is important to be conscious of when discussing SLAs.

I think the key takeaway is to consider your expected load in production, benchmark configurations around that, determine your allowable service levels, and iterate or provision more resources until you’re within those limits. The other important takeaway with respect to benchmarking is to look at the complete latency distribution. Otherwise, you’re not getting a clear picture of how your system actually behaves.

From the Ground Up: Reasoning About Distributed Systems in the Real World

The rabbit hole is deep. Down and down it goes. Where it ends, nobody knows. But as we traverse it, patterns appear. They give us hope, they quell the fear.

Distributed systems literature is abundant, but as a practitioner, I often find it difficult to know where to start or how to synthesize this knowledge without a more formal background. This is a non-academic’s attempt to provide a line of thought for rationalizing design decisions. This piece doesn’t necessarily contribute any new ideas but rather tries to provide a holistic framework by studying some influential existing ones. It includes references which provide a good starting point for thinking about distributed systems. Specifically, we look at a few formal results and slightly less formal design principles to provide a basis from which we can argue about system design.

This is your last chance. After this, there is no turning back. I wish I could say there is no red-pill/blue-pill scenario at play here, but the world of distributed systems is complex. In order to make sense of it, we reason from the ground up while simultaneously stumbling down the deep and cavernous rabbit hole.

Guiding Principles

In order to reason about distributed system design, it’s important to lay out some guiding principles or theorems used to establish an argument. Perhaps the most fundamental of which is the Two Generals Problem originally introduced by Akkoyunlu et al. in Some Constraints and Trade-offs in the Design of Network Communications and popularized by Jim Gray in Notes on Data Base Operating Systems in 1975 and 1978, respectively. The Two Generals Problem demonstrates that it’s impossible for two processes to agree on a decision over an unreliable network. It’s closely related to the binary consensus problem (“attack” or “don’t attack”) where the following conditions must hold:

  • Termination: all correct processes decide some value (liveness property).
  • Validity: if all correct processes decide v, then v must have been proposed by some correct process (non-triviality property).
  • Integrity: all correct processes decide at most one value v, and is the “right” value (safety property).
  • Agreement: all correct processes must agree on the same value (safety property).

It becomes quickly apparent that any useful distributed algorithm consists of some intersection of both liveness and safety properties. The problem becomes more complicated when we consider an asynchronous network with crash failures:

  • Asynchronous: messages may be delayed arbitrarily long but will eventually be delivered.
  • Crash failure: processes can halt indefinitely.

Considering this environment actually leads us to what is arguably one of the most important results in distributed systems theory: the FLP impossibility result introduced by Fischer, Lynch, and Patterson in their 1985 paper Impossibility of Distributed Consensus with One Faulty Process. This result shows that the Two Generals Problem is provably impossible. When we do not consider an upper bound on the time a process takes to complete its work and respond in a crash-failure model, it’s impossible to make the distinction between a process that is crashed and one that is taking a long time to respond. FLP shows there is no algorithm which deterministically solves the consensus problem in an asynchronous environment when it’s possible for at least one process to crash. Equivalently, we say it’s impossible to have a perfect failure detector in an asynchronous system with crash failures.

When talking about fault-tolerant systems, it’s also important to consider Byzantine faults, which are essentially arbitrary faults. These include, but are not limited to, attacks which might try to subvert the system. For example, a security attack might try to generate or falsify messages. The Byzantine Generals Problem is a generalized version of the Two Generals Problem which describes this fault model. Byzantine fault tolerance attempts to protect against these threats by detecting or masking a bounded number of Byzantine faults.

Why do we care about consensus? The reason is it’s central to so many important problems in system design. Leader election implements consensus allowing you to dynamically promote a coordinator to avoid single points of failure. Distributed databases implement consensus to ensure data consistency across nodes. Message queues implement consensus to provide transactional or ordered delivery. Distributed init systems implement consensus to coordinate processes. Consensus is fundamentally an important problem in distributed programming.

It has been shown time and time again that networks, whether local-area or wide-area, are often unreliable and largely asynchronous. As a result, these proofs impose real and significant challenges to system design.

The implications of these results are not simply academic: these impossibility results have motivated a proliferation of systems and designs offering a range of alternative guarantees in the event of network failures.

L. Peter Deutsch’s fallacies of distributed computing are a key jumping-off point in the theory of distributed systems. It presents a set of incorrect assumptions which many new to the space frequently make, of which the first is “the network is reliable.”

  1. The network is reliable.
  2. Latency is zero.
  3. Bandwidth is infinite.
  4. The network is secure.
  5. Topology doesn’t change.
  6. There is one administrator.
  7. Transport cost is zero.
  8. The network is homogeneous.

The CAP theorem, while recently the subject of scrutiny and debate over whether it’s overstated or not, is a useful tool for establishing fundamental trade-offs in distributed systems and detecting vendor sleight of hand. Gilbert and Lynch’s Perspectives on the CAP Theorem lays out the intrinsic trade-off between safety and liveness in a fault-prone system, while Fox and Brewer’s Harvest, Yield, and Scalable Tolerant Systems characterizes it in a more pragmatic light. I will continue to say unequivocally that the CAP theorem is important within the field of distributed systems and of significance to system designers and practitioners.

A Renewed Hope

Following from the results detailed earlier would imply many distributed algorithms, including those which implement linearizable operations, serializable transactions, and leader election, are a hopeless endeavor. Is it game over? Fortunately, no. Carefully designed distributed systems can maintain correctness without relying on pure coincidence.

First, it’s important to point out that the FLP result does not indicate consensus is unreachable, just that it’s not always reachable in bounded time. Second, the system model FLP uses is, in some ways, a pathological one. Synchronous systems place a known upper bound on message delivery between processes and on process computation. Asynchronous systems have no fixed upper bounds. In practice, systems tend to exhibit partial synchrony, which is described as one of two models by Dwork and Lynch in Consensus in the Presence of Partial Synchrony. In the first model of partial synchrony, fixed bounds exist but they are not known a priori. In the second model, the bounds are known but are only guaranteed to hold starting at unknown time T. Dwork and Lynch present fault-tolerant consensus protocols for both partial-synchrony models combined with various fault models.

Chandra and Toueg introduce the concept of unreliable failure detectors in Unreliable Failure Detectors for Reliable Distributed Systems. Each process has a local, external failure detector which can make mistakes. The detector monitors a subset of the processes in the system and maintains a list of those it suspects to have crashed. Failures are detected by simply pinging each process periodically and suspecting any process which doesn’t respond to the ping within twice the maximum round-trip time for any previous ping. The detector makes a mistake when it erroneously suspects a correct process, but it may later correct the mistake by removing the process from its list of suspects. The presence of failure detectors, even unreliable ones, makes consensus solvable in a slightly relaxed system model.

While consensus ensures processes agree on a value, atomic broadcast ensures processes deliver the same messages in the same order. This same paper shows that the problems of consensus and atomic broadcast are reducible to each other, meaning they are equivalent. Thus, the FLP result and others apply equally to atomic broadcast, which is used in coordination services like Apache ZooKeeper.

In Introduction to Reliable and Secure Distributed Programming, Cachin, Guerraoui, and Rodrigues suggest most practical systems can be described as partially synchronous:

Generally, distributed systems appear to be synchronous. More precisely, for most systems that we know of, it is relatively easy to define physical time bounds that are respected most of the time. There are, however, periods where the timing assumptions do not hold, i.e., periods during which the system is asynchronous. These are periods where the network is overloaded, for instance, or some process has a shortage of memory that slows it down. Typically, the buffer that a process uses to store incoming and outgoing messages may overflow, and messages may thus get lost, violating the time bound on the delivery. The retransmission of the messages may help ensure the reliability of the communication links but introduce unpredictable delays. In this sense, practical systems are partially synchronous.

We capture partial synchrony by assuming timing assumptions only hold eventually without stating exactly when. Similarly, we call the system eventually synchronous. However, this does not guarantee the system is synchronous forever after a certain time, nor does it require the system to be initially asynchronous then after a period of time become synchronous. Instead it implies the system has periods of asynchrony which are not bounded, but there are periods where the system is synchronous long enough for an algorithm to do something useful or terminate. The key thing to remember with asynchronous systems is that they contain no timing assumptions.

Lastly, On the Minimal Synchronism Needed for Distributed Consensus by Dolev, Dwork, and Stockmeyer describes a consensus protocol as t-resilient if it operates correctly when at most t processes fail. In the paper, several critical system parameters and synchronicity conditions are identified, and it’s shown how varying them affects the t-resiliency of an algorithm. Consensus is shown to be provably possible for some models and impossible for others.

Fault-tolerant consensus is made possible by relying on quorums. The intuition is that as long as a majority of processes agree on every decision, there is at least one process which knows about the complete history in the presence of faults.

Deterministic consensus, and by extension a number of other useful algorithms, is impossible in certain system models, but we can model most real-world systems in a way that circumvents this. Nevertheless, it shows the inherent complexities involved with distributed systems and the rigor needed to solve certain problems.

Theory to Practice

What does all of this mean for us in practice? For starters, it means distributed systems are usually a harder problem than they let on. Unfortunately, this is often the cause of improperly documented trade-offs or, in many cases, data loss and safety violations. It also suggests we need to rethink the way we design systems by shifting the focus from system properties and guarantees to business rules and application invariants.

One of my favorite papers is End-To-End Arguments in System Design by Saltzer, Reed, and Clark. It’s an easy read, but it presents a compelling design principle for determining where to place functionality in a distributed system. The principle idea behind the end-to-end argument is that functions placed at a low level in a system may be redundant or of little value when compared to the cost of providing them at that low level. It follows that, in many situations, it makes more sense to flip guarantees “inside out”—pushing them outwards rather than relying on subsystems, middleware, or low-level layers of the stack to maintain them.

To illustrate this, we consider the problem of “careful file transfer.” A file is stored by a file system on the disk of computer A, which is linked by a communication network to computer B. The goal is to move the file from computer A’s storage to computer B’s storage without damage and in the face of various failures along the way. The application in this case is the file-transfer program which relies on storage and network abstractions. We can enumerate just a few of the potential problems an application designer might be concerned with:

  1. The file, though originally written correctly onto the disk at host A, if read now may contain incorrect data, perhaps because of hardware faults in the disk storage system.
  2. The software of the file system, the file transfer program, or the data communication system might make a mistake in buffering and copying the data of the file, either at host A or host B.
  3. The hardware processor or its local memory might have a transient error while doing the buffering and copying, either at host A or host B.
  4. The communication system might drop or change the bits in a packet, or lose a packet or deliver a packet more than once.
  5. Either of the hosts may crash part way through the transaction after performing an unknown amount (perhaps all) of the transaction.

Many of these problems are Byzantine in nature. When we consider each threat one by one, it becomes abundantly clear that even if we place countermeasures in the low-level subsystems, there will still be checks required in the high-level application. For example, we might place checksums, retries, and sequencing of packets in the communication system to provide reliable data transmission, but this really only eliminates threat four. An end-to-end checksum and retry mechanism at the file-transfer level is needed to guard against the remaining threats.

Building reliability into the low level has a number of costs involved. It takes a non-trivial amount of effort to build it. It’s redundant and, in fact, hinders performance by reducing the frequency of application retries and adding unneeded overhead. It also has no actual effect on correctness because correctness is determined and enforced by the end-to-end checksum and retries. The reliability and correctness of the communication system is of little importance, so going out of its way to ensure resiliency does not reduce any burden on the application. In fact, ensuring correctness by relying on the low level might be altogether impossible since threat number two requires writing correct programs, but not all programs involved may be written by the file-transfer application programmer.

Fundamentally, there are two problems with placing functionality at the lower level. First, the lower level is not aware of the application needs or semantics, which means logic placed there is often insufficient. This leads to duplication of logic as seen in the example earlier. Second, other applications which rely on the lower level pay the cost of the added functionality even when they don’t necessarily need it.

Saltzer, Reed, and Clark propose the end-to-end principle as a sort of “Occam’s razor” for system design, arguing that it helps guide the placement of functionality and organization of layers in a system.

Because the communication subsystem is frequently specified before applications that use the subsystem are known, the designer may be tempted to “help” the users by taking on more function than necessary. Awareness of end-to end arguments can help to reduce such temptations.

However, it’s important to note that the end-to-end principle is not a panacea. Rather, it’s a guideline to help get designers to think about their solutions end to end, acknowledge their application requirements, and consider their failure modes. Ultimately, it provides a rationale for moving function upward in a layered system, closer to the application that uses the function, but there are always exceptions to the rule. Low-level mechanisms might be built as a performance optimization. Regardless, the end-to-end argument contends that lower levels should avoid taking on any more responsibility than necessary. The “lessons” section from Google’s Bigtable paper echoes some of these same sentiments:

Another lesson we learned is that it is important to delay adding new features until it is clear how the new features will be used. For example, we initially planned to support general-purpose transactions in our API. Because we did not have an immediate use for them, however, we did not implement them. Now that we have many real applications running on Bigtable, we have been able to examine their actual needs, and have discovered that most applications require only single-row transactions. Where people have requested distributed transactions, the most important use is for maintaining secondary indices, and we plan to add a specialized mechanism to satisfy this need. The new mechanism will be less general than distributed transactions, but will be more efficient (especially for updates that span hundreds of rows or more) and will also interact better with our scheme for optimistic cross-datacenter replication.

We’ll see the end-to-end argument as a common theme throughout the remainder of this piece.

Whose Guarantee Is It Anyway?

Generally, we rely on robust algorithms, transaction managers, and coordination services to maintain consistency and application correctness. The problem with these is twofold: they are often unreliable and they impose a massive performance bottleneck.

Distributed coordination algorithms are difficult to get right. Even tried-and-true protocols like two-phase commit are susceptible to crash failures and network partitions. Protocols which are more fault tolerant like Paxos and Raft generally don’t scale well beyond small clusters or across wide-area networks. Consensus systems like ZooKeeper own your availability, meaning if you depend on one and it goes down, you’re up a creek. Since quorums are often kept small for performance reasons, this might be less rare than you think.

Coordination systems become a fragile and complex piece of your infrastructure, which seems ironic considering they are usually employed to reduce fragility. On the other hand, message-oriented middleware largely use coordination to provide developers with strong guarantees: exactly-once, ordered, transactional delivery and the like.

From transmission protocols to enterprise message brokers, relying on delivery guarantees is an anti-pattern in distributed system design. Delivery semantics are a tricky business. As such, when it comes to distributed messaging, what you want is often not what you need. It’s important to look at the trade-offs involved, how they impact system design (and UX!), and how we can cope with them to make better decisions.

Subtle and not-so-subtle failure modes make providing strong guarantees exceedingly difficult. In fact, some guarantees, like exactly-once delivery, aren’t even really possible to achieve when we consider things like the Two Generals Problem and the FLP result. When we try to provide semantics like guaranteed, exactly-once, and ordered message delivery, we usually end up with something that’s over-engineered, difficult to deploy and operate, fragile, and slow. What is the upside to all of this? Something that makes your life easier as a developer when things go perfectly well, but the reality is things don’t go perfectly well most of the time. Instead, you end up getting paged at 1 a.m. trying to figure out why RabbitMQ told your monitoring everything is awesome while proceeding to take a dump in your front yard.

If you have something that relies on these types of guarantees in production, know that this will happen to you at least once sooner or later (and probably much more than that). Eventually, a guarantee is going to break down. It might be inconsequential, it might not. Not only is this a precarious way to go about designing things, but if you operate at a large scale, care about throughput, or have sensitive SLAs, it’s probably a nonstarter.

The performance implications of distributed transactions are obvious. Coordination is expensive because processes can’t make progress independently, which in turn limits throughput, availability, and scalability. Peter Bailis gave an excellent talk called Silence is Golden: Coordination-Avoiding Systems Design which explains this in great detail and how coordination can be avoided. In it, he explains how distributed transactions can result in nearly a 400x decrease in throughput in certain situations.

Avoiding coordination enables infinite scale-out while drastically improving throughput and availability, but in some cases coordination is unavoidable. In Coordination Avoidance in Database Systems, Bailis et al. answer a key question: when is coordination necessary for correctness? They present a property, invariant confluence (I-confluence), which is necessary and sufficient for safe, coordination-free, available, and convergent execution. I-confluence essentially works by pushing invariants up into the business layer where we specify correctness in terms of application semantics rather than low-level database operations.

Without knowledge of what “correctness” means to your app (e.g., the invariants used in I-confluence), the best you can do to preserve correctness under a read/write model is serializability.

I-confluence can be determined given a set of transactions and a merge function used to reconcile divergent states. If I-confluence holds, there exists a coordination-free execution strategy that preserves invariants. If it doesn’t hold, no such strategy exists—coordination is required. I-confluence allows us to identify when we can and can’t give up coordination, and by pushing invariants up, we remove a lot of potential bottlenecks from areas which don’t require it.

If we recall, “synchrony” within the context of distributed computing is really just making assumptions about time, so synchronization is basically two or more processes coordinating around time. As we saw, a system which performs no coordination will have optimal performance and availability since everyone can proceed independently. However, a distributed system which performs zero coordination isn’t particularly useful or possible as I-confluence shows. Christopher Meiklejohn’s Strange Loop talk, Distributed, Eventually Consistent Computations, provides an interesting take on coordination with the parable of the car. A car requires friction to drive, but that friction is limited to very small contact points. Any other friction on the car causes problems or inefficiencies. If we think about physical time as friction, we know we can’t eliminate it altogether because it’s essential to the problem, but we want to reduce the use of it in our systems as much as possible. We can typically avoid relying on physical time by instead using logical time, for example, with the use of Lamport clocks or other conflict-resolution techniques. Lamport’s Time, Clocks, and the Ordering of Events in a Distributed System is the classical introduction to this idea.

Often, systems simply forgo coordination altogether for latency-sensitive operations, a perfectly reasonable thing to do provided the trade-off is explicit and well-documented. Sadly, this is frequently not the case. But we can do better. I-confluence provides a useful framework for avoiding coordination, but there’s a seemingly larger lesson to be learned here. What it really advocates is reexamining how we design systems, which seems in some ways to closely parallel our end-to-end argument.

When we think low level, we pay the upfront cost of entry—serializable transactions, linearizable reads and writes, coordination. This seems contradictory to the end-to-end principle. Our application doesn’t really care about atomicity or isolation levels or linearizability. It cares about two users sharing the same ID or two reservations booking the same room or a negative balance in a bank account, but the database doesn’t know that. Sometimes these rules don’t even require any expensive coordination.

If all we do is code our business rules and constraints into the language our infrastructure understands, we end up with a few problems. First, we have to know how to translate our application semantics into these low-level operations while avoiding any impedance mismatch. In the context of messaging, guaranteed delivery doesn’t really mean anything to our application which cares about what’s done with the messages. Second, we preclude ourselves from using a lot of generalized solutions and, in some cases, we end up having to engineer specialized ones ourselves. It’s not clear how well this scales in practice. Third, we pay a performance penalty that could otherwise be avoided (as I-confluence shows). Lastly, we put ourselves at the mercy of our infrastructure and hope it makes good on its promises—it often doesn’t.

Working on a messaging platform team, I’ve had countless conversations which resemble the following exchange:

Developer: “We need fast messaging.”
Me: “Is it okay if messages get dropped occasionally?”
Developer: “What? Of course not! We need it to be reliable.”
Me: “Okay, we’ll add a delivery ack, but what happens if your application crashes before it processes the message?”
Developer: “We’ll ack after processing.”
Me: “What happens if you crash after processing but before acking?”
Developer: “We’ll just retry.”
Me: “So duplicate delivery is okay?”
Developer: “Well, it should really be exactly-once.”
Me: “But you want it to be fast?”
Developer: “Yep. Oh, and it should maintain message ordering.”
Me: “Here’s TCP.”

If, instead, we reevaluate the interactions between our systems, their APIs, their semantics, and move some of that responsibility off of our infrastructure and onto our applications, then maybe we can start to build more robust, resilient, and performant systems. With messaging, does our infrastructure really need to enforce FIFO ordering? Preserving order with distributed messaging in the presence of failure while trying to simultaneously maintain high availability is difficult and expensive. Why rely on it when it can be avoided with commutativity? Likewise, transactional delivery requires coordination which is slow and brittle while still not providing application guarantees. Why rely on it when it can be avoided with idempotence and retries? If you need application-level guarantees, build them into the application level. The infrastructure can’t provide it.

I really like Gregor Hohpe’s “Your Coffee Shop Doesn’t Use Two-Phase Commit” because it shows how simple solutions can be if we just model them off of the real world. It gives me hope we can design better systems, sometimes by just turning things on their head. There’s usually a reason things work the way they do, and it often doesn’t even involve the use of computers or complicated algorithms.

Rather than try to hide complexities by using flaky and heavy abstractions, we should engage directly by recognizing them in our design decisions and thinking end to end. It may be a long and winding path to distributed systems zen, but the best place to start is from the beginning.

I’d like to thank Tom Santero for reviewing an early draft of this writing. Any inaccuracies or opinions expressed are mine alone.

Breaking and Entering: Lose the Lock While Embracing Concurrency

This article originally appeared on Workiva’s engineering blog as a two-part series.

Providing robust message routing was a priority for us at Workiva when building our distributed messaging infrastructure. This encompassed directed messaging, which allows us to route messages to specific endpoints based on service or client identifiers, but also topic fan-out with support for wildcards and pattern matching.

Existing message-oriented middleware, such as RabbitMQ, provide varying levels of support for these but don’t offer the rich features needed to power Wdesk. This includes transport fallback with graceful degradation, tunable qualities of service, support for client-side messaging, and pluggable authentication middleware. As such, we set out to build a new system, not by reinventing the wheel, but by repurposing it.

Eventually, we settled on Apache Kafka as our wheel, or perhaps more accurately, our log. Kafka demonstrates a telling story of speed, scalability, and fault tolerance—each a requisite component of any reliable messaging system—but it’s only half the story. Pub/sub is a critical messaging pattern for us and underpins a wide range of use cases, but Kafka’s topic model isn’t designed for this purpose. One of the key engineering challenges we faced was building a practical routing mechanism by which messages are matched to interested subscribers. On the surface, this problem appears fairly trivial and is far from novel, but it becomes quite interesting as we dig deeper.

Back to Basics

Topic routing works by matching a published message with interested subscribers. A consumer might subscribe to the topic “foo.bar.baz,” in which any message published to this topic would be delivered to them. We also must support * and # wildcards, which match exactly one word and zero or more words, respectively. In this sense, we follow the AMQP spec:

The routing key used for a topic exchange MUST consist of zero or more words delimited by dots. Each word may contain the letters A–Z and a–z and digits 0–9. The routing pattern follows the same rules as the routing key with the addition that * matches a single word, and # matches zero or more words. Thus the routing pattern *.stock.# matches the routing keys usd.stock and eur.stock.db but not stock.nasdaq.

This problem can be modeled using a trie structure. RabbitMQ went with this approach after exploring other options, like caching topics and indexing the patterns or using a deterministic finite automaton. The latter options have greater time and space complexities. The former requires backtracking the tree for wildcard lookups.

The subscription trie looks something like this:

subscription_trie

Even in spite of the backtracking required for wildcards, the trie ends up being a more performant solution due to its logarithmic complexity and tendency to fit CPU cache lines. Most tries have hot paths, particularly closer to the root, so caching becomes indispensable. The trie approach is also vastly easier to implement.

In almost all cases, this subscription trie needs to be thread-safe as clients are concurrently subscribing, unsubscribing, and publishing. We could serialize access to it with a reader-writer lock. For some, this would be the end of the story, but for high-throughput systems, locking is a major bottleneck. We can do better.

Breaking the Lock

We considered lock-free techniques that could be applied. Lock-free concurrency means that while a particular thread of execution may be blocked, all CPUs are able to continue processing other work. For example, imagine a program that protects access to some resource using a mutex. If a thread acquires this mutex and is subsequently preempted, no other thread can proceed until this thread is rescheduled by the OS. If the scheduler is adversarial, it may never resume execution of the thread, and the program would be effectively deadlocked. A key point, however, is that the mere lack of a lock does not guarantee a program is lock-free. In this context, “lock” really refers to deadlock, livelock, or the misdeeds of a malevolent scheduler.

In practice, what lock-free concurrency buys us is increased system throughput at the expense of increased tail latencies. Looking at a transactional system, lock-freedom allows us to process many concurrent transactions, any of which may block, while guaranteeing systemwide progress. Depending on the access patterns, when a transaction does block, there are always other transactions which can be processed—a CPU never idles. For high-throughput databases, this is essential.

Concurrent Timelines and Linearizability

Lock-freedom can be achieved using a number of techniques, but it ultimately reduces to a small handful of fundamental patterns. In order to fully comprehend these patterns, it’s important to grasp the concept of linearizability.

It takes approximately 100 nanoseconds for data to move from the CPU to main memory. This means that the laws of physics govern the unavoidable discrepancy between when you perceive an operation to have occurred and when it actually occurred. There is the time from when an operation is invoked to when some state change physically occurs (call it tinv), and there is the time from when that state change occurs to when we actually observe the operation as completed (call it tcom). Operations are not instantaneous, which means the wall-clock history of operations is uncertain. tinv and tcom vary for every operation. This is more easily visualized using a timeline diagram like the one below:

timeline

This timeline shows several reads and writes happening concurrently on some state. Physical time moves from left to right. This illustrates that even if a write is invoked before another concurrent write in real time, the later write could be applied first. If there are multiple threads performing operations on shared state, the notion of physical time is meaningless.

We use a linearizable consistency model to allow some semblance of a timeline by providing a total order of all state updates. Linearizability requires that each operation appears to occur atomically at some point between its invocation and completion. This point is called the linearization point. When an operation completes, it’s guaranteed to be observable by all threads because, by definition, the operation occurred before its completion time. After this point, reads will only see this value or a later one—never anything before. This gives us a proper sequencing of operations which can be reasoned about. Linearizability is a correctness condition for concurrent objects.

Of course, linearizability comes at a cost. This is why most memory models aren’t linearizable by default. Going back to our subscription trie, we could make operations on it appear atomic by applying a global lock. This kills throughput, but it ensures linearization.

lock trie

In reality, the trie operations do not occur at a specific instant in time as the illustration above depicts. However, mutual exclusion gives it the appearance and, as a result, linearizability holds at the expense of systemwide progress. Acquiring and releasing the lock appear instantaneous in the timeline because they are backed by atomic hardware operations like test-and-set. Linearizability is a composable property, meaning if an object is composed of linearizable objects, it is also linearizable. This allows us to construct abstractions from linearizable hardware instructions to data structures, all the way up to linearizable distributed systems.

Read-Modify-Write and CAS

Locks are expensive, not just due to contention but because they completely preclude parallelism. As we saw, if a thread which acquires a lock is preempted, any other threads waiting for the lock will continue to block.

Read-modify-write operations like compare-and-swap offer a lock-free approach to ensuring linearizable consistency. Such techniques loosen the bottleneck by guaranteeing systemwide throughput even if one or more threads are blocked. The typical pattern is to perform some speculative work then attempt to publish the changes with a CAS. If the CAS fails, then another thread performed a concurrent operation, and the transaction needs to be retried. If it succeeds, the operation was committed and is now visible, preserving linearizability. The CAS loop is a pattern used in many lock-free data structures and proves to be a useful primitive for our subscription trie.

CAS is susceptible to the ABA problem. These operations work by comparing values at a memory address. If the value is the same, it’s assumed that nothing has changed. However, this can be problematic if another thread modifies the shared memory and changes it back before the first thread resumes execution. The ABA problem is represented by the following sequence of events:

  1. Thread T1 reads shared-memory value A
  2. T1 is preempted, and T2 is scheduled
  3. T2 changes A to B then back to A
  4. T2 is preempted, and T1 is scheduled
  5. T1 sees the shared-memory value is A and continues

In this situation, T1 assumes nothing has changed when, in fact, an invariant may have been violated. We’ll see how this problem is addressed later.

At this point, we’ve explored the subscription-matching problem space, demonstrated why it’s an area of high contention, and examined why locks pose a serious problem to throughput. Linearizability provides an important foundation of understanding for lock-freedom, and we’ve looked at the most fundamental pattern for building lock-free data structures, compare-and-swap. Next, we will take a deep dive on applying lock-free techniques in practice by building on this knowledge. We’ll continue our narrative of how we applied these same techniques to our subscription engine and provide some further motivation for them.

Lock-Free Applied

Let’s revisit our subscription trie from earlier. Our naive approach to making it linearizable was to protect it with a lock. This proved easy, but as we observed, severely limited throughput. For a message broker, access to this trie is a critical path, and we usually have multiple threads performing inserts, removals, and lookups on it concurrently. Intuition tells us we can implement these operations without coarse-grained locking by relying on a CAS to perform mutations on the trie.

If we recall, read-modify-write is typically applied by copying a shared variable to a local variable, performing some speculative work on it, and attempting to publish the changes with a CAS. When inserting into the trie, our speculative work is creating an updated copy of a node. We commit the new node by updating the parent’s reference with a CAS. For example, if we want to add a subscriber to a node, we would copy the node, add the new subscriber, and CAS the pointer to it in the parent.

This approach is broken, however. To see why, imagine if a thread inserts a subscription on a node while another thread concurrently inserts a subscription as a child of that node. The second insert could be lost due to the sequencing of the reference updates. The diagram below illustrates this problem. Dotted lines represent a reference updated with a CAS.

trie cas add

The orphaned nodes containing “x” and “z” mean the subscription to “foo.bar” was lost. The trie is in an inconsistent state.

We looked to existing research in the field of non-blocking data structures to help illuminate a path. “Concurrent Tries with Efficient Non-Blocking Snapshots” by Prokopec et al. introduces the Ctrie, a non-blocking, concurrent hash trie based on shared-memory, single-word CAS instructions.

A hash array mapped trie (HAMT) is an implementation of an associative array which, unlike a hashmap, is dynamically allocated. Memory consumption is always proportional to the number of keys in the trie. A HAMT works by hashing keys and using the resulting bits in the hash code to determine which branches to follow down the trie. Each node contains a table with a fixed number of branch slots. Typically, the number of branch slots is 32. On a 64-bit machine, this would mean it takes 256 bytes (32 branches x 8-byte pointers) to store the branch table of a node.

The size of L1-L3 cache lines is 64 bytes on most modern processors. We can’t fit the branch table in a CPU cache line, let alone the entire node. Instead of allocating space for all branches, we use a bitmap to indicate the presence of a branch at a particular slot. This reduces the size of an empty node from roughly 264 bytes to 12 bytes. We can safely fit a node with up to six branches in a single cache line.

The Ctrie is a concurrent, lock-free version of the HAMT which ensures progress and linearizability. It solves the CAS problem described above by introducing indirection nodes, or I-nodes, which remain present in the trie even as nodes above and below change. This invariant ensures correctness on inserts by applying the CAS operation on the I-node instead of the internal node array.

An I-node may point to a Ctrie node, or C-node, which is an internal node containing a bitmap and array of references to branches. A branch is either an I-node or a singleton node (S-node) containing a key-value pair. The S-node is a leaf in the Ctrie. A newly initialized Ctrie starts with a root pointer to an I-node which points to an empty C-node. The diagram below illustrates a sequence of inserts on a Ctrie.

ctrie insert

An insert starts by atomically reading the I-node’s reference. Next, we copy the C-node and add the new key, recursively insert on an I-node, or extend the Ctrie with a new I-node. The new C-node is then published by performing a CAS on the parent I-node. A failed CAS indicates another thread has mutated the I-node. We re-linearize by atomically reading the I-node’s reference again, which gives us the current state of the Ctrie according to its linearizable history. We then retry the operation until the CAS succeeds. In this case, the linearization point is a successful CAS. The following figure shows why the presence of I-nodes ensures consistency.

ctrie insert correctness

In the above diagram, (k4,v4) is inserted into a Ctrie containing (k1,v1), (k2,v2), and (k3,v3). The new key-value pair is added to node C1 by creating a copy, C1, with the new entry. A CAS is then performed on the pointer at I1, indicated by the dotted line. Since C1 continues pointing to I2, any concurrent updates which occur below it will remain present in the trie. C1 is then garbage collected once no more threads are accessing it. Because of this, Ctries are much easier to implement in a garbage-collected language. It turns out that this deferred reclamation also solves the ABA problem described earlier by ensuring memory addresses are recycled only when it’s safe to do so.

The I-node invariant is enough to guarantee correctness for inserts and lookups, but removals require some additional invariants in order to avoid update loss. Insertions extend the Ctrie with additional levels, while removals eliminate the need for some of these levels. This is because we want to keep the Ctrie as compact as possible while still remaining correct. For example, a remove operation could result in a C-node with a single S-node below it. This state is valid, but the Ctrie could be made more compact and lookups on the lone S-node more efficient if it were moved up into the C-node above. This would allow the I-node and C-node to be removed.

The problem with this approach is it will cause insertions to be lost. If we move the S-node up and replace the dangling I-node reference with it, another thread could perform a concurrent insert on that I-node just before the compression occurs. The insert would be lost because the pointer to the I-node would be removed.

This issue is solved by introducing a new type of node called the tomb node (T-node) and an associated invariant. The T-node is used to ensure proper ordering during removals. The invariant is as follows: if an I-node points to a T-node at some time t0, then for all times greater than t0, the I-node points to the same T-node. More concisely, a T-node is the last value assigned to an I-node. This ensures that no insertions occur at an I-node if it is being compressed. We call such an I-node a tombed I-node.

If a removal results in a non-root-level C-node with a single S-node below it, the C-node is replaced with a T-node wrapping the S-node. This guarantees that every I-node except the root points to a C-node with at least one branch. This diagram depicts the result of removing (k2,v2) from a Ctrie:

ctrie removal

Removing (k2,v2) results in a C-node with a single branch, so it’s subsequently replaced with a T-node. The T-node provides a sequencing mechanism by effectively acting as a marker. While it solves the problem of lost updates, it doesn’t give us a compacted trie. If two keys have long matching hash code prefixes, removing one of the keys would result in a long chain of C-nodes followed by a single T-node at the end.

An invariant was introduced which says once an I-node points to a T-node, it will always point to that T-node. This means we can’t change a tombed I-node’s pointer, so instead we replace the I-node with its resurrection. The resurrection of a tombed I-node is the S-node wrapped in its T-node. When a T-node is produced during a removal, we ensure that it’s still reachable, and if it is, resurrect its tombed I-node in the C-node above. If it’s not reachable, another thread has already performed the compression. To ensure lock-freedom, all operations which read a T-node must help compress it instead of waiting for the removing thread to complete. Compression on the Ctrie from the previous diagram is illustrated below.

ctrie compression

The resurrection of the tombed I-node ensures the Ctrie is optimally compressed for arbitrarily long chains while maintaining integrity.

With a 32-bit hash code space, collisions are rare but still nontrivial. To deal with this, we introduce one final node, the list node (L-node). An L-node is essentially a persistent linked list. If there is a collision between the hash codes of two different keys, they are placed in an L-node. This is analogous to a hash table using separate chaining to resolve collisions.

One interesting property of the Ctrie is support for lock-free, linearizable, constant-time snapshots. Most concurrent data structures do not support snapshots, instead opting for locks or requiring a quiescent state. This allows Ctries to have O(1) iterator creation, clear, and size retrieval (amortized).

Constant-time snapshots are implemented by writing the Ctrie as a persistent data structure and assigning a generation count to each I-node. A persistent hash trie is updated by rewriting the path from the root of the trie down to the leaf the key belongs to while leaving the rest of the trie intact. The generation demarcates Ctrie snapshots. To create a new snapshot, we copy the root I-node and assign it a new generation. When an operation detects that an I-node’s generation is older than the root’s generation, it copies the I-node to the new generation and updates the parent. The path from the root to some node is only updated the first time it’s accessed, making the snapshot a O(1) operation.

The final piece needed for snapshots is a special type of CAS operation. There is a race condition between the thread creating a snapshot and the threads which have already read the root I-node with the previous generation. The linearization point for an insert is a successful CAS on an I-node, but we need to ensure that both the I-node has not been modified and its generation matches that of the root. This could be accomplished with a double compare-and-swap, but most architectures do not support such an operation.

The alternative is to use a RDCSS double-compare-single-swap originally described by Harris et al. We implement an operation with similar semantics to RDCSS called GCAS, or generation compare-and-swap. The GCAS allows us to atomically compare both the I-node pointer and its generation to the expected values before committing an update.

After researching the Ctrie, we wrote a Go implementation in order to gain a deeper understanding of the applied techniques. These same ideas would hopefully be adaptable to our problem domain.

Generalizing the Ctrie

The subscription trie shares some similarities to the hash array mapped trie but there are some key differences. First, values are not strictly stored at the leaves but can be on internal nodes as well. Second, the decomposed topic is used to determine how the trie is descended rather than a hash code. Wildcards complicate lookups further by requiring backtracking. Lastly, the number of branches on a node is not a fixed size. Applying the Ctrie techniques to the subscription trie, we end up with something like this:

matchbox

Much of the same logic applies. The main distinctions are the branch traversal based on topic words and rules around wildcards. Each branch is associated with a word and set of subscribers and may or may not point to an I-node. The I-nodes still ensure correctness on inserts. The behavior of T-nodes changes slightly. With the Ctrie, a T-node is created from a C-node with a single branch and then compressed. With the subscription trie, we don’t introduce a T-node until all branches are removed. A branch is pruned if it has no subscribers and points to nowhere or it has no subscribers and points to a tombed I-node. The GCAS and snapshotting remain unchanged.

We implemented this Ctrie derivative in order to build our concurrent pattern-matching engine, matchbox. This library provides an exceptionally simple API which allows a client to subscribe to a topic, unsubscribe from a topic, and lookup a topic’s subscribers. Snapshotting is also leveraged to retrieve the global subscription tree and the topics to which clients are currently subscribed. These are useful to see who currently has subscriptions and for what.

In Practice

Matchbox has been pretty extensively benchmarked, but to see how it behaves, it’s critical to observe its performance under contention. Many messaging systems opt for a mutex which tends to result in a lot of lock contention. It’s important to know what the access patterns look like in practice, but for our purposes, it’s heavily parallel. We don’t want to waste CPU cycles if we can help it.

To see how matchbox compares to lock-based subscription structures, I benchmarked it against gnatsd, a popular high-performance messaging system also written in Go. Gnatsd uses a tree-like structure protected by a mutex to manage subscriptions and offers similar wildcard semantics.

The benchmarks consist of one or more insertion goroutines and one or more lookup goroutines. Each insertion goroutine inserts 1000 subscriptions, and each lookup goroutine looks up 1000 subscriptions. We scale these goroutines up to see how the systems behave under contention.

The first benchmark is a 1:1 concurrent insert-to-lookup workload. A lookup corresponds to a message being published and matched to interested subscribers, while an insert occurs when a client subscribes to a topic. In practice, lookups are much more frequent than inserts, so the second benchmark is a 1:3 concurrent insert-to-lookup workload to help simulate this. The timings correspond to the complete insert and lookup workload. GOMAXPROCS was set to 8, which controls the number of operating system threads that can execute simultaneously. The benchmarks were run on a machine with a 2.6 GHz Intel Core i7 processor.

matchbox_bench_1_1

matchbox_bench_1_3

It’s quite clear that the lock-free approach scales a lot better under contention. This follows our intuition because lock-freedom allows system-wide progress even when a thread is blocked. If one goroutine is blocked on an insert or lookup operation, other operations may proceed. With a mutex, this isn’t possible.

Matchbox performs well, particularly in multithreaded environments, but there are still more optimizations to be made. This includes improvements both in memory consumption and runtime performance. Applying the Ctrie techniques to this type of trie results in a fairly non-compact structure. There may be ways to roll up branches—either eagerly or after removals—and expand them lazily as necessary. Other optimizations might include placing a cache or Bloom filter in front of the trie to avoid descending it. The main difficulty with these will be managing support for wildcards.

Conclusion

To summarize, we’ve seen why subscription matching is often a major concern for message-oriented middleware and why it’s frequently a bottleneck. Concurrency is crucial for high-performance systems, and we’ve looked at how we can achieve concurrency without relying on locks while framing it within the context of linearizability. Compare-and-swap is a fundamental tool used to implement lock-free data structures, but it’s important to be conscious of the pitfalls. We introduce invariants to protect data consistency. The Ctrie is a great example of how to do this and was foundational in our lock-free subscription-matching implementation. Finally, we validated our work by showing that lock-free data structures scale dramatically better with multithreaded workloads under contention.

My thanks to Steven Osborne and Dustin Hiatt for reviewing this article.

What You Want Is What You Don’t: Understanding Trade-Offs in Distributed Messaging

If there’s one unifying theme of this blog, it’s that distributed systems are riddled with trade-offs. Specifically, with distributed messaging, you cannot have exactly-once delivery. However, messaging trade-offs don’t stop at delivery semantics. I want to talk about what I mean by this and explain why many developers often have the wrong mindset when it comes to building distributed applications.

The natural tendency is to build distributed systems as if they aren’t distributed at all—assuming data consistency, reliable messaging, and predictability. It’s much easier to reason about, but it’s also blatantly misleading.

The only thing guaranteed in messaging—and distributed systems in general—is that sooner or later, your guarantees are going to break down. If you assume these guarantees as axiomatic, everything built on them becomes unsound. Depending on the situation, this can range from mildly annoying to utterly catastrophic.

I recently ran across a comment from Apcera CEO Derek Collison on this topic which resonated with me:

On systems that do claim some form of guarantee, it’s best to look at what level that guarantee really runs out. Especially around persistence, exactly once delivery semantics, etc. I spent much of my career designing and building messaging systems that have those guarantees, and in turn developed many systems utilizing some of those features. For me, I found that depending on these guarantees was a bad pattern in distributed system design…

You should know how your system behaves when you reach the breaking point, but what’s less obvious is that providing these types of strong guarantees is usually very expensive. What price are we willing to pay, what level do our guarantees hold to, and what happens when they give out? In this sense, a “guarantee” is really no different from a SLA, yet stronger guarantees allow for stronger assumptions.

This all sounds quite vague, so let’s look at a specific example. With messaging, we’re often concerned with delivery reliability. In a perfect world, message delivery would be guaranteed and exactly once. Of course, I’ve talked at length why this is impossible, so let’s anchor ourselves in reality. We can look to TCP/IP for how this works.

IP is an unreliable delivery system which runs on unreliable network infrastructure. Packets can be delivered in order, out of order, or not at all. There are no acknowledgements, so the sender has no way of knowing if what they sent was received. TCP builds on IP by effectively making the transmission stateful and adding a layer of control. Through added complexity and performance costs, we achieve reliable delivery over an unreliable stack.

The key takeaway here is that we start with something primitive, like moving bits from point A to point B, and layer on abstractions to build stronger guarantees.  These abstractions almost always come at a price, tangible or not, which is why it’s important to push the costs up into the layers above. If not every use case demands reliable delivery, why force the cost onto everyone?

Exactly-once delivery is the Holy Grail of distributed messaging, and guaranteed delivery is the unicorn. The irony is that even if they were attainable, you likely wouldn’t want them. These types of strong guarantees demand expensive infrastructure which perform expensive coordination which require expensive administration. But what does all this expensive stuff really buy you at the end of the day?

A key problem is that there is a huge difference between message delivery and message processing. Sure, TCP can more or less ensure that your packet was either delivered or not, but what good is that actually in practice? How does the sender know that its message was successfully processed or that the receiver did what it needed to do? The only way to truly know is for the receiver to send a business-level acknowledgement. The low-level transport protocol doesn’t know about the application semantics, so the only way to go, really, is up. And if we assume that any guarantees will eventually give out, we have to account for that at the business level. To quote from a related article, “if reliability is important on the business level, do it on the business level.” It’s important not to conflate the transport protocol with the business-transaction protocol.

This is why systems like Akka don’t provide a notion of guaranteed delivery—because what does “guaranteed delivery” actually mean? Does it mean the message was handed to the transport layer? Does it mean the remote machine received the message? Does it mean the message was enqueued in the recipient’s mailbox?  Does it mean the recipient has started processing it? Does it mean the recipient has finished processing it? Each of these things has a very different set of requirements, constraints, and costs. Also, what does it even mean for a message to be “processed”? It depends on the business context. As such, it usually doesn’t make sense for the underlying infrastructure to make these decisions because the decisions usually impact the layers above significantly.

By providing only basic guarantees those use cases which do not need stricter guarantees do not pay the cost of their implementation; it is always possible to add stricter guarantees on top of basic ones, but it is not possible to retro-actively remove guarantees in order to gain more performance.

Distributed computation is inherently asynchronous and the network is inherently unreliable, so it’s better to embrace this asynchrony than to build on leaky abstractions. Rather than hide these inconveniences, make them explicit and force users to design around them. What you end up with is a more robust, more reliable, and often more performant system. This trade-off is highlighted in the paper “Exactly-once semantics in a replicated messaging system” by Huang et al. while studying the problem of exactly-once delivery:

Thus, server-centric algorithms cannot achieve exactly-once semantics. Instead, we will strive to achieve a weaker notion of correctness.

By relaxing our requirements, we end up with a solution that has less performance overhead and less complexity. Why bother pursuing the impossible? You’re paying a huge premium for something which is probably less reliable than you think while performing poorly. In many cases, it’s better to let the pendulum swing the other direction.

The network is not reliable, which means message delivery is never truly guaranteed—it can only be best-effort. The Two Generals’ Problem shows that it’s provenly impossible for two remote processes to safely agree on a decision. Similarly, the FLP impossibility result shows that, in an asynchronous environment, reliable failure detection is impossible. That is, there’s no way to tell if a process has crashed or is simply taking a long time to respond. Therefore, if it’s possible for a process to crash, it’s impossible for a set of processes to come to an agreement.

If message delivery is not guaranteed and consensus is impossible, is message ordering really that important? Some use cases might actually demand it, but I suspect, more often than not, it’s an artificial constraint. The fact that the network is unreliable, processes are faulty, and distributed communication is asynchronous makes reliable, in-order delivery surprisingly expensive. But doesn’t TCP solve this problem? At the transport level, yes, but that only gets you so far as I’ve been trying to demonstrate.

So you use TCP and process messages with a single thread. Most of the time, it just works. But what happens under heavy load? What happens when message delivery fails? What happens when you need to scale? If you are queuing messages or you have a dead-letter queue or you have network partitions or a crash-recovery model, you’re probably going to encounter duplicate, dropped, or out-of-order messages. Even if the infrastructure provides ordered delivery, these problems will likely manifest themselves at the application level.

If you’re distributed, forget about ordering and start thinking about commutativity. Forget about guaranteed delivery and start thinking about idempotence. Stop thinking about the messaging platform and start thinking about the messaging patterns and business semantics. A pattern which is commutative and idempotent will be far less brittle and more efficient than a system which is totally ordered and “guaranteed.” This is why CRDTs are becoming increasingly popular in the distributed space. Never write code which assumes messages will arrive in order when you can’t write code that will assume they arrive at all.

In the end, think carefully about the business case and what your requirements really are. Can you satisfy them without relying on costly and leaky abstractions or deceptive guarantees? If you can’t, what happens when those guarantees go out the window? This is very similar to understanding what happens when a SLA is not met. Are the performance and complexity trade-offs worth it? What about the operations and business overheads? In my experience, it’s better to confront the intricacies of distributed systems head-on than to sweep them under the rug. Sooner or later, they will rear their ugly heads.