Fast Topic Matching

A common problem in messaging middleware is that of efficiently matching message topics with interested subscribers. For example, assume we have a set of subscribers, numbered 1 to 3:

Subscriber Match Request
1 forex.usd
2 forex.*
3 stock.nasdaq.msft

And we have a stream of messages, numbered 1 to N:

Message Topic
1 forex.gbp
2 stock.nyse.ibm
3 stock.nyse.ge
4 forex.eur
5 forex.usd
N stock.nasdaq.msft

We are then tasked with routing messages whose topics match the respective subscriber requests, where a “*” wildcard matches any word. This is frequently a bottleneck for message-oriented middleware like ZeroMQ, RabbitMQ, ActiveMQ, TIBCO EMS, et al. Because of this, there are a number of well-known solutions to the problem. In this post, I’ll describe some of these solutions, as well as a novel one, and attempt to quantify them through benchmarking. As usual, the code is available on GitHub.

The Naive Solution

The naive solution is pretty simple: use a hashmap that maps topics to subscribers. Subscribing involves adding a new entry to the map (or appending to a list if it already exists). Matching a message to subscribers involves scanning through every entry in the map, checking if the match request matches the message topic, and returning the subscribers for those that do.

Inserts are approximately O(1) and lookups approximately O(n*m) where n is the number of subscriptions and m is the number of words in a topic. This means the performance of this solution is heavily dependent upon how many subscriptions exist in the map and also the access patterns (rate of reads vs. writes). Since most use cases are heavily biased towards searches rather than updates, the naive solution—unsurprisingly—is not a great option.

The microbenchmark below compares the performance of subscribe, unsubscribe, and lookup (matching) operations, first using an empty hashmap (what we call cold) and then with one containing 1,000 randomly generated 5-word topic subscriptions (what we call hot). With the populated subscription map, lookups are about three orders of magnitude slower, which is why we have to use a log scale in the chart below.

subscribe unsubscribe lookup
cold 172ns 51.2ns 787ns
hot 221ns 55ns 815,787ns


Inverted Bitmap

The inverted bitmap technique builds on the observation that lookups are more frequent than updates and assumes that the search space is finite. Consequently, it shifts some of the cost from the read path to the write path. It works by storing a set of bitmaps, one per topic, or criteria, in the search space. Subscriptions are then assigned an increasing number starting at 0. We analyze each subscription to determine the matching criteria and set the corresponding bits in the criteria bitmaps to 1. For example, assume our search space consists of the following set of topics:

  • forex.usd
  • forex.gbp
  • forex.jpy
  • forex.eur
  • stock.nasdaq
  • stock.nyse

We then have the following subscriptions:

  • 0 = forex.* (matches forex.usd, forex.gbp, forex.jpy, and forex.eur)
  • 1 = stock.nyse (matches stock.nyse)
  • 2 = *.* (matches everything)
  • 3 = stock.* (matches stock.nasdaq and stock.nyse)

When we index the subscriptions above, we get the following set of bitmaps:

 Criteria 0 1 2 3
forex.usd 1 0 1 0
forex.gbp 1 0 1 0
forex.jpy 1 0 1 0
forex.eur 1 0 1 0
stock.nasdaq 0 0 1 1
stock.nyse 0 1 1 1

When we match a message, we simply need to lookup the corresponding bitmap and check the set bits. As we see below, subscribe and unsubscribe are quite expensive with respect to the naive solution, but lookups now fall well below half a microsecond, which is pretty good (the fact that the chart below doesn’t use a log scale like the one above should be an indictment of the naive hashmap-based solution).

subscribe unsubscribe lookup
cold 3,795ns 198ns 380ns
hot 3,863ns 198ns 395ns

The inverted bitmap is a better option than the hashmap when we have a read-heavy workload. One limitation is it requires us to know the search space ahead of time or otherwise requires reindexing which, frankly, is prohibitively expensive.

Optimized Inverted Bitmap

The inverted bitmap technique works well enough, but only if the topic space is fairly static. It also falls over pretty quickly when the topic space and number of subscriptions are large, say, millions of topics and thousands of subscribers. The main benefit of topic-based routing is it allows for faster matching algorithms in contrast to content-based routing, which can be exponentially slower. The truth is, to be useful, your topics probably consist of stock.nyse.ibm, stock.nyse.ge, stock.nasdaq.msft, stock.nasdaq.aapl, etc., not stock.nyse and stock.nasdaq. We could end up with an explosion of topics and, even with efficient bitmaps, the memory consumption tends to be too high despite the fact that most of the bitmaps are quite sparse.

Fortunately, we can reduce the amount of memory we consume using a fairly straightforward optimization. Rather than requiring the entire search space a priori, we simply require the max topic size, in terms of words, e.g. stock.nyse.ibm has a size of 3. We can handle topics of the max size or less, e.g. stock.nyse.bac, stock.nasdaq.txn, forex.usd, index, etc. If we see a message with more words than the max, we can safely assume there are no matching subscriptions.

The optimized inverted bitmap works by splitting topics into their constituent parts. Each constituent position has a set of bitmaps, and we use a technique similar to the one described above on each part. We end up with a bitmap for each constituent which we perform a logical AND on to give a resulting bitmap. Each 1 in the resulting bitmap corresponds to a subscription. This means if the max topic size is n, we only AND at most n bitmaps. Furthermore, if we come across any empty bitmaps, we can stop early since we know there are no matching subscribers.

Let’s say our max topic size is 2 and we have the following subscriptions:

  • 0 = forex.*
  • 1 = stock.nyse
  • 2 = index
  • 3 = stock.*

The inverted bitmap for the first constituent looks like the following:

forex.* stock.nyse index stock.*
null 0 0 0 0
forex 1 0 0 0
stock 0 1 0 1
index 0 0 1 0
other 0 0 0 0

And the second constituent bitmap:

forex.* stock.nyse index stock.*
null 0 0 1 0
nyse 0 1 0 0
other 1 0 0 1

The “null” and “other” rows are worth pointing out. “Null” simply means the topic has no corresponding constituent.  For example, “index” has no second constituent, so “null” is marked. “Other” allows us to limit the number of rows needed such that we only need the ones that appear in subscriptions.  For example, if messages are published on forex.eur, forex.usd, and forex.gbp but I merely subscribe to forex.*, there’s no need to index eur, usd, or gbp. Instead, we just mark the “other” row which will match all of them.

Let’s look at an example using the above bitmaps. Imagine we want to route a message published on forex.eur. We split the topic into its constituents: “forex” and “eur.” We get the row corresponding to “forex” from the first constituent bitmap, the one corresponding to “eur” from the second (other), and then AND the rows.

forex.* stock.nyse index stock.*
1 = forex 1 0 0 0
2 = other 1 0 0 1
AND 1 0 0 0

The forex.* subscription matches.

Let’s try one more example: a message published on stock.nyse.

forex.* stock.nyse index stock.*
1 = stock 0 1 0 1
2 = nyse 0 1 0 1
AND 0 1 0 1

In this case, we also need to OR the “other” row for the second constituent. This gives us a match for stock.nyse and stock.*.

Subscribe operations are significantly faster with the space-optimized inverted bitmap compared to the normal inverted bitmap, but lookups are much slower. However, the optimized version consumes roughly 4.5x less memory for every subscription. The increased flexibility and improved scalability makes the optimized version a better choice for all but the very latency-sensitive use cases.

subscribe unsubscribe lookup
cold 1,053ns 330ns 2,724ns
hot 1,076ns 371ns 3,337ns

Trie

The optimized inverted bitmap improves space complexity, but it does so at the cost of lookup efficiency. Is there a way we can reconcile both time and space complexity? While inverted bitmaps allow for efficient lookups, they are quite wasteful for sparse sets, even when using highly compressed bitmaps like Roaring bitmaps.

Tries can often be more space efficient in these circumstances. When we add a subscription, we descend the trie, adding nodes along the way as necessary, until we run out of words in the topic. Finally, we add some metadata containing the subscription information to the last node in the chain. To match a message topic, we perform a similar traversal. If a node doesn’t exist in the chain, we know there are no subscribers. One downside of this method is, in order to support wildcards, we must backtrack on a literal match and check the “*” branch as well.

For the given set of subscriptions, the trie would look something like the following:

  • forex.*
  • stock.nyse
  • index
  • stock.*

You might be tempted to ask: “why do we even need the “*” nodes? When someone subscribes to stock.*, just follow all branches after “stock” and add the subscriber.” This would indeed move the backtracking cost from the read path to the write path, but—like the first inverted bitmap we looked at—it only works if the search space is known ahead of time. It would also largely negate the memory-usage benefits we’re looking for since it would require pre-indexing all topics while requiring a finite search space.

It turns out, this trie technique is how systems like ZeroMQ and RabbitMQ implement their topic matching due to its balance between space and time complexity and overall performance predictability.

subscribe unsubscribe lookup
cold 406ns 221ns 2,145ns
hot 443ns 257ns 2,278ns

We can see that, compared to the optimized inverted bitmap, the trie performs much more predictably with relation to the number of subscriptions held.

Concurrent Subscription Trie

One thing we haven’t paid much attention to so far is concurrency. Indeed, message-oriented middleware is typically highly concurrent since they have to deal with heavy IO (reading messages from the wire, writing messages to the wire, reading messages from disk, writing messages to disk, etc.) and CPU operations (like topic matching and routing). Subscribe, unsubscribe, and lookups are usually all happening in different threads of execution. This is especially important when we want to talk advantage of multi-core processors.

It wasn’t shown, but all of the preceding algorithms used global locks to ensure thread safety between read and write operations, making the data structures safe for concurrent use. However, the microbenchmarks don’t really show the impact of this, which we will see momentarily.

Lock-freedom, which I’ve written about, allows us to increase throughput at the expense of increased tail latency.

Lock-free concurrency means that while a particular thread of execution may be blocked, all CPUs are able to continue processing other work. For example, imagine a program that protects access to some resource using a mutex. If a thread acquires this mutex and is subsequently preempted, no other thread can proceed until this thread is rescheduled by the OS. If the scheduler is adversarial, it may never resume execution of the thread, and the program would be effectively deadlocked. A key point, however, is that the mere lack of a lock does not guarantee a program is lock-free. In this context, “lock” really refers to deadlock, livelock, or the misdeeds of a malevolent scheduler.

The concurrent subscription trie, or CS-trie,  is a new take on the trie-based solution described earlier. It combines the idea of the topic-matching trie with that of a Ctrie, or concurrent trie, which is a non-blocking concurrent hash trie.

The fundamental problem with the trie, as it relates to concurrency, is it requires a global lock, which severely limits throughput. To address this, the CS-trie uses indirection nodes, or I-nodes, which remain present in the trie even as the nodes above and below change. Subscriptions are then added or removed by creating a copy of the respective node, and performing a CAS on its parent I-node. This allows us to add, remove, and lookup subscriptions concurrently and in a lock-free, linearizable manner.

For the given set of subscribers, labeled x, y, and z, the CS-trie would look something like the following:

  • x = foo, bar, bar.baz
  • y = foo, bar.qux
  • z = bar.*

Lookups on the CS-trie perform, on average, better than the standard trie, and the CS-trie scales better with respect to concurrent operations.

subscribe unsubscribe lookup
cold 412ns 245ns 1,615ns
hot 471ns 280ns 1,637ns

Latency Comparison

The chart below shows the topic-matching operation latencies for all of the algorithms side-by-side. First, we look at the performance of a cold start (no subscriptions) and then the performance of a hot start (1,000 subscriptions).

Throughput Comparison

So far, we’ve looked at the latency of individual topic-matching operations. Next, we look at overall throughput of each of the algorithms and their memory footprint.

 algorithm msg/sec
naive  4,053.48
inverted bitmap  1,052,315.02
optimized inverted bitmap  130,705.98
trie  248,762.10
cs-trie  340,910.64

On the surface, the inverted bitmap looks like the clear winner, clocking in at over 1 million matches per second. However, we know the inverted bitmap does not scale and, indeed, this becomes clear when we look at memory consumption, underscored by the fact that the below chart uses a log scale.

Scalability with Respect to Concurrency

Lastly, we’ll look at how each of these algorithms scales with respect to concurrency. We do this by performing concurrent operations and varying the level of concurrency and number of operations. We start with a 50-50 split between reads and writes. We vary the number of goroutines from 2 to 16 (the benchmark was run using a 2.6 GHz Intel Core i7 processor with 8 logical cores). Each goroutine performs 1,000 reads or 1,000 writes. For example, the 2-goroutine benchmark performs 1,000 reads and 1,000 writes, the 4-goroutine benchmark performs 2,000 reads and 2,000 writes, etc. We then measure the total amount of time needed to complete the workload.

We can see that the tries hardly even register on the scale above, so we’ll plot them separately.

The tries are clearly much more efficient than the other solutions, but the CS-trie in particular scales well to the increased workload and concurrency.

Since most workloads are heavily biased towards reads over writes, we’ll run a separate benchmark that uses a 90-10 split reads and writes. This should hopefully provide a more realistic result.

The results look, more or less, like what we would expect, with the reduced writes improving the inverted bitmap performance. The CS-trie still scales quite well in comparison to the global-lock trie.

Conclusion

As we’ve seen, there are several approaches to consider to implement fast topic matching. There are also several aspects to look at: read/write access patterns, time complexity, space complexity, throughput, and latency.

The naive hashmap solution is generally a poor choice due to its prohibitively expensive lookup time. Inverted bitmaps offer a better solution. The standard implementation is reasonable if the search space is finite, small, and known a priori, especially if read latency is critical. The space-optimized version is a better choice for scalability, offering a good balance between read and write performance while keeping a small memory footprint. The trie is an even better choice, providing lower latency than the optimized inverted bitmap and consuming less memory. It’s particularly good if the subscription tree is sparse and topics are not known a priori. Lastly, the concurrent subscription trie is the best option if there is high concurrency and throughput matters. It offers similar performance to the trie but scales better. The only downside is an increase in implementation complexity.

Benchmarking Commit Logs

In this article, we look at Apache Kafka and NATS Streaming, two messaging systems based on the idea of a commit log. We’ll compare some of the features of both but spend less time talking about Kafka since by now it’s quite well known. Similar to previous studies, we’ll attempt to quantify their general performance characteristics through careful benchmarking.

The purpose of this benchmark is to test drive the newly released NATS Streaming system, which was made generally available just in the last few months. NATS Streaming doesn’t yet support clustering, so we try to put its performance into context by looking at a similar configuration of Kafka.

Unlike conventional message queues, commit logs are an append-only data structure. This results in several nice properties like total ordering of messages, at-least-once delivery, and message-replay semantics. Jay Kreps’ blog post The Log is a great introduction to the concept and particularly why it’s so useful in the context of distributed systems and stream processing (his book I Heart Logs is an extended version of the blog post and is a quick read).

Kafka, which originated at LinkedIn, is by far the most popular and most mature implementation of the commit log (AWS offers their own flavor of it called Kinesis, and imitation is the sincerest form of flattery). It’s billed as a “distributed streaming platform for building real-time data pipelines and streaming apps.” The much newer NATS Streaming is actually a data-streaming layer built on top of Apcera’s high-performance publish-subscribe system NATS. It’s billed as “real-time streaming for Big Data, IoT, Mobile, and Cloud Native Applications.” Both have some similarities as well as some key differences.

Fundamental to the notion of a log is a way to globally order events. Neither NATS Streaming nor Kafka are actually a single log but many logs, each totally ordered using a sequence number or offset, respectively.

In Kafka, topics are partitioned into multiple logs which are then replicated across a number of servers for fault tolerance, making it a distributed commit log. Each partition has a server that acts as the leader. Cluster membership and leader election is managed by ZooKeeper.

NATS Streaming’s topics are called “channels” which are globally ordered. Unlike Kafka, NATS Streaming does not support replication or partitioning of channels, though my understanding is clustering support is slated for Q1 2017. Its message store is pluggable, so it can provide durability using a file-backed implementation, like Kafka, or simply an in-memory store.

NATS Streaming is closer to a hybrid of traditional message queues and the commit log. Like Kafka, it allows replaying the log from a specific offset, the beginning of time, or the newest offset, but it also exposes an API for reading from the log at a specific physical time offset, e.g. all messages from the last 30 seconds. Kafka, on the other hand, only has a notion of logical offsets (correction: Kafka added support for offset lookup by timestamp in 0.10.1.0) . Generally, relying on physical time is an anti-pattern in distributed systems due to clock drift and the fact that clocks are not always monotonic. For example, imagine a situation where a NATS Streaming server is restarted and the clock is changed. Messages are still ordered by their sequence numbers but their timestamps might not reflect that. Developers would need to be aware of this while implementing their business logic.

With Kafka, it’s strictly on consumers to track their offset into the log (or the high-level consumer which stores offsets in ZooKeeper (correction: Kafka itself can now store offsets which is used by the new Consumer API, meaning clients do not have to manage offsets directly or rely on ZooKeeper)). NATS Streaming allows clients to either track their sequence number or use a durable subscription, which causes the server to track the last acknowledged message for a client. If the client restarts, the server will resume delivery starting at the earliest unacknowledged message. This is closer to what you would expect from a traditional message-oriented middleware like RabbitMQ.

Lastly, NATS Streaming supports publisher and subscriber rate limiting. This works by configuring the maximum number of in-flight (unacknowledged) messages either from the publisher to the server or from the server to the subscriber. Starting in version 0.9, Kafka supports a similar rate limiting feature that allows producer and consumer byte-rate thresholds to be defined for groups of clients with its Quotas protocol.

Kafka was designed to avoid tracking any client state on the server for performance and scalability reasons. Throughput and storage capacity scale linearly with the number of nodes. NATS Streaming provides some additional features over Kafka at the cost of some added state on the server. Since clustering isn’t supported, there isn’t really any scale or HA story yet, so it’s unclear how that will play out. That said, once replication is supported, there’s a lot of work going into verifying its correctness (which is a major advantage Kafka has).

Benchmarks

Since NATS Streaming does not support replication at this time (0.3.1), we’ll compare running a single instance of it with file-backed persistence to running a single instance of Kafka (0.10.1.0). We’ll look at both latency and throughput running on commodity hardware (m4.xlarge EC2 instances) with load generation and consumption each running on separate instances. In all of these benchmarks, the systems under test have not been tuned at all and are essentially in their “off-the-shelf” configurations.

We’ll first look at latency by publishing messages of various sizes, ranging from 256 bytes to 1MB, at a fixed rate of 50 messages/second for 30 seconds. Message contents are randomized to account for compression. We then plot the latency distribution by percentile on a logarithmic scale from the 0th percentile to the 99.9999th percentile. Benchmarks are run several times in an attempt to produce a “normalized” result. The benchmark code used is open source.

First, to establish a baseline and later get a feel for the overhead added by the file system, we’ll benchmark NATS Streaming with in-memory storage, meaning messages are not written to disk.

Unsurprisingly, the 1MB configuration has much higher latencies than the other configurations, but everything falls within single-digit-millisecond latencies.nats_mem

NATS Streaming 0.3.1 (in-memory persistence)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.3750ms 1.0367ms 1.1257ms 1.1257ms 1.1257ms
1KB 0.38064ms 0.8321ms 1.3260ms 1.3260ms 1.3260ms
5KB 0.4408ms 1.7569ms 2.1465ms 2.1465ms 2.1465ms
1MB 6.6337ms 8.8097ms 9.5263ms 9.5263ms 9.5263ms

Next, we look at NATS Streaming with file-backed persistence. This provides the same durability guarantees as Kafka running with a replication factor of 1. By default, Kafka stores logs under /tmp. Many Unix distributions mount /tmp to tmpfs which appears as a mounted file system but is actually stored in volatile memory. To account for this and provide as level a playing field as possible, we configure NATS Streaming to also store its logs in /tmp.

As expected, latencies increase by about an order of magnitude once we start going to disk.

nats_file_fsync

NATS Streaming 0.3.1 (file-backed persistence)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 21.7051ms 25.0369ms 27.0524ms 27.0524ms 27.0524ms
1KB 20.6090ms 23.8858ms 24.7124ms 24.7124ms 24.7124ms
5KB 22.1692ms 35.7394ms 40.5612ms 40.5612ms 40.5612ms
1MB 45.2490ms 130.3972ms 141.1564ms 141.1564ms 141.1564ms

Since we will be looking at Kafka, there is an important thing to consider relating to fsync behavior. As of version 0.8, Kafka does not call fsync directly and instead relies entirely on the background flush performed by the OS. This is clearly indicated by their documentation:

We recommend using the default flush settings which disable application fsync entirely. This means relying on the background flush done by the OS and Kafka’s own background flush. This provides the best of all worlds for most uses: no knobs to tune, great throughput and latency, and full recovery guarantees. We generally feel that the guarantees provided by replication are stronger than sync to local disk, however the paranoid still may prefer having both and application level fsync policies are still supported.

However, NATS Streaming calls fsync every time a batch is written to disk by default. This can be disabled through the use of the –file_sync flag. By setting this flag to false, we put NATS Streaming’s persistence behavior closer in line with Kafka’s (again assuming a replication factor of 1).

As an aside, the comparison between NATS Streaming and Kafka still isn’t completely “fair”. Jay Kreps points out that Kafka relies on replication as the primary means of durability.

Kafka leaves [fsync] off by default because it relies on replication not fsync for durability, which is generally faster. If you don’t have replication I think you probably need fsync and maybe some kind of high integrity file system.

I don’t think we can provide a truly fair comparison until NATS Streaming supports replication, at which point we will revisit this.

To no one’s surprise, setting –file_sync=false has a significant impact on latency, shown in the distribution below.

nats_file_no_fsync

In fact, it’s now in line with the in-memory performance as before for 256B, 1KB, and 5KB messages, shown in the comparison below.

nats_file_mem

For a reason I have yet to figure out, the latency for 1MB messages is roughly an order of magnitude faster when fsync is enabled after the 95th percentile, which seems counterintuitive. If anyone has an explanation, I would love to hear it. I’m sure there’s a good debug story there. The distribution below shows the 1MB configuration for NATS Streaming with and without fsync enabled and just how big the difference is at the 95th percentile and beyond.

nats_file_mem_1mb

NATS Streaming 0.3.1 (file-backed persistence, –file_sync=false)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.4304ms 0.8577ms 1.0706ms 1.0706ms 1.0706ms
1KB 0.4372ms 1.5987ms 1.8651ms 1.8651ms 1.8651ms
5KB 0.4939ms 2.0828ms 2.2540ms 2.2540ms 2.2540ms
1MB 1296.1464ms 1556.1441ms 1596.1457ms 1596.1457ms 1596.1457ms

Kafka with replication factor 1 tends to have higher latencies than NATS Streaming with –file_sync=false. There was one potential caveat here Ivan Kozlovic pointed out to me in that NATS Streaming uses a caching optimization for reads that may put it at an advantage.

Now, there is one side where NATS Streaming *may* be looking better and not fair to Kafka. By default, the file store keeps everything in memory once stored. This means look-ups will be fast. There is only a all-or-nothing mode right now, which means either cache everything or nothing. With caching disabled (–file_cache=false), every lookup will result in disk access (which when you have 1 to many subscribers will be bad). I am working on changing that. But if you do notice that in Kafka, consuming results in a disk read (given the other default behavior described above, they actually may not ;-)., then you could disable NATS Streaming file caching.

Fortunately, we can verify if Kafka is actually going to disk to read messages back from the log during the benchmark using iostat. We see something like this for the majority of the benchmark duration:

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          13.53    0.00   11.28    0.00    0.00   75.19

Device:    tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn
xvda      0.00         0.00         0.00          0          0

Specifically, we’re interested in Blk_read, which indicates the total number of blocks read. It appears that Kafka does indeed make heavy use of the operating system’s page cache as Blk_wrtn and Blk_read rarely show any activity throughout the entire benchmark. As such, it seems fair to leave NATS Streaming’s –file_cache=true, which is the default.

One interesting point is Kafka offloads much of its caching to the page cache and outside of the JVM heap, clearly in an effort to minimize GC pauses. I’m not clear if the cache Ivan refers to in NATS Streaming is off-heap or not (NATS Streaming is written in Go which, like Java, is a garbage-collected language).

Below is the distribution of latencies for 256B, 1KB, and 5KB configurations in Kafka.

kafka

Similar to NATS Streaming, 1MB message latencies tend to be orders of magnitude worse after about the 80th percentile. The distribution below compares the 1MB configuration for NATS Streaming and Kafka.

nats_kafka_1mb

Kafka 0.10.1.0 (replication factor 1)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.9230ms 1.4575ms 1.6596ms 1.6596ms 1.6596ms
1KB 0.5942ms 1.3123ms 17.6556ms 17.6556ms 17.6556ms
5KB 0.7203ms 5.7236ms 18.9334ms 18.9334ms 18.9334ms
1MB 5337.3174ms 5597.3315ms 5617.3199ms 5617.3199ms 5617.3199ms

The percentile distributions below compare NATS Streaming and Kafka for the 256B, 1KB, and 5KB configurations, respectively.

nats_kafka_256b

nats_kafka_1kb

nats_kafka_5kb

Next, we’ll look at overall throughput for the two systems. This is done by publishing 100,000 messages using the same range of sizes as before and measuring the elapsed time. Specifically, we measure throughput at the publisher and the subscriber.

Despite using an asynchronous publisher in both the NATS Streaming and Kafka benchmarks, we do not consider the publisher “complete” until it has received acks for all published messages from the server. In Kafka, we do this by setting request.required.acks to 1, which means the leader replica has received the data, and consuming the received acks. This is important because the default value is 0, which means the producer never waits for an ack from the broker. In NATS Streaming, we provide an ack callback on every publish. We use the same benchmark configuration as the latency tests, separating load generation and consumption on different EC2 instances. Note the log scale in the following charts.

Once again, we’ll start by looking at NATS Streaming using in-memory persistence. The truncated 1MB send and receive throughputs are 93.01 messages/second.

nats_mem_throughput

For comparison, we now look at NATS Streaming with file persistence and –file_sync=false. As before, this provides the closest behavior to Kafka’s default flush behavior. The second chart shows a side-by-side comparison between NATS Streaming with in-memory and file persistence.

nats_file_throughput

nats_compare_throughput

Lastly, we look at Kafka with replication factor 1. Throughput significantly deteriorates when we set request.required.acks = 1 since the producer must wait for all acks from the server. This is important though because, by default, the client does not require an ack from the server. If this were the case, the producer would have no idea how much data actually reached the server once it finished—it could simply be buffered in the client, in flight over the wire, or in the server but not yet on disk. Running the benchmark with request.required.acks = 0 yields much higher throughput on the sender but is basically an exercise in how fast you can write to a channel using the Sarama Go client—slightly misleading.

kafka_throughput

Looking at some comparisons of Kafka and NATS Streaming, we can see that NATS Streaming has higher throughput in all but a few cases.

nats_kafka_throughput

nats_kafka_send_throughput

I want to repeat the disclaimer from before: the purpose of this benchmark is to test drive the newly released NATS Streaming system (which as mentioned earlier, doesn’t yet support clustering), and put its performance into context by looking at a similar configuration of Kafka.

Kafka generally scales very well, so measuring the throughput of a single broker with a single producer and single consumer isn’t particularly meaningful. In reality, we’d be running a cluster with several brokers and partitioning our topics across them.

For as young as it is, NATS Streaming has solid performance (which shouldn’t come as much of a surprise considering the history of NATS itself), and I imagine it will only get better with time as the NATS team continues to optimize. In some ways, NATS Streaming bridges the gap between the commit log as made popular by Kafka and the conventional message queue as made popular by protocols like JMS, AMQP, STOMP, and the like.

The bigger question at this point is how NATS Streaming will tackle scaling and replication (a requirement for true production-readiness in my opinion). Kafka was designed from the ground up for high scalability and availability through the use of external coordination (read ZooKeeper). Naturally, there is a lot of complexity and cost that comes with that. NATS Streaming attempts to keep NATS’ spirit of simplicity, but it’s yet to be seen how it will reconcile that with the complex nature of distributed systems. I’m excited to see where Apcera takes NATS Streaming and generally the NATS ecosystem in the future since the team has a lot of experience in this area.

So You Wanna Go Fast?

I originally proposed this as a GopherCon talk on writing “high-performance Go”, which is why it may seem rambling, incoherent, and—at times—not at all related to Go. The talk was rejected (probably because of the rambling and incoherence), but I still think it’s a subject worth exploring. The good news is, since it was rejected, I can take this where I want. The remainder of this piece is mostly the outline of that talk with some parts filled in, some meandering stories which may or may not pertain to the topic, and some lessons learned along the way. I think it might make a good talk one day, but this will have to do for now.

We work on some interesting things at Workiva—graph traversal, distributed and in-memory calculation engines, low-latency messaging systems, databases optimized for two-dimensional data computation. It turns out, when you want to build a complicated financial-reporting suite with the simplicity and speed of Microsoft Office, and put it entirely in the cloud, you can’t really just plumb some crap together and call it good. It also turns out that when you try to do this, performance becomes kind of important, not because of the complexity of the data—after all, it’s mostly just numbers and formulas—but because of the scale of it. Now, distribute that data in the cloud, consider the security and compliance implications associated with it, add in some collaboration and control mechanisms, and you’ve got yourself some pretty monumental engineering problems.

As I hinted at, performance starts to be really important, whether it’s performing a formula evaluation, publishing a data-change event, or opening up a workbook containing a million rows of data (accountants are weird). A lot of the backend systems powering all of this are, for better or worse, written in Go. Go is, of course, a garbage-collected language, and it compares closely to Java (though the latter has over 20 years invested in it, while the former has about seven).

At this point, you might be asking, “why not C?” It’s honestly a good question to ask, but the reality is there is always history. The first solution was written in Python on Google App Engine (something about MVPs, setting your customers’ expectations low, and giving yourself room to improve?). This was before Go was even a thing, though Java and C were definitely things, but this was a startup. And it was Python. And it was on App Engine. I don’t know exactly what led to those combination of things—I wasn’t there—but, truthfully, App Engine probably played a large role in the company’s early success. Python and App Engine were fast. Not like “this code is fucking fast” fast—what we call performance—more like “we need to get this shit working so we have jobs tomorrow” fast—what we call delivery. I don’t envy that kind of fast, but when you’re a startup trying to disrupt, speed to market matters a hell of a lot more than the speed of your software.

I’ve talked about App Engine at length before. Ultimately, you hit the ceiling of what you can do with it, and you have to migrate off (if you’re a business that is trying to grow, anyway). We hit that migration point at a really weird, uncomfortable time. This was right when Docker was starting to become a thing, and microservices were this thing that everybody was talking about but nobody was doing. Google had been successfully using containers for years, and Netflix was all about microservices. Everybody wanted to be like them, but no one really knew how—but it was the future (unikernels are the new future, by the way).

The problem is—coming from a PaaS like App Engine that does your own laundry—you don’t have the tools, skills, or experience needed to hit the ground running, so you kind of drunkenly stumble your way there. You don’t even have a DevOps team because you didn’t need one! Nobody knew how to use Docker, which is why at the first Dockercon, five people got on stage and presented five solutions to the same problem. It was the blind leading the blind. I love this article by Jesper L. Andersen, How to build stable systems, which contains a treasure trove of practical engineering tips. The very last paragraph of the article reads:

Docker is not mature (Feb 2016). Avoid it in production for now until it matures. Currently Docker is a time sink not fulfilling its promises. This will change over time, so know when to adopt it.

Trying to build microservices using Docker while everyone is stumbling over themselves was, and continues to be, a painful process, exacerbated by the heavy weight suddenly lifted by leaving App Engine. It’s not great if you want to go fast. App Engine made scaling easy by restricting you in what you could do, but once that burden was removed, it was off to the races. What people might not have realized, however, was that App Engine also made distributed systems easy by restricting you in what you could do. Some seem to think the limitations enforced by App Engine are there to make their lives harder or make Google richer (trust me, they’d bill you more if they could), so why would we have similar limitations in our own infrastructure? App Engine makes these limitations, of course, so that it can actually scale. Don’t take that for granted.

App Engine was stateless, so the natural tendency once you’re off it was to make everything stateful. And we did. What I don’t think we realized was that we were, in effect, trading one type of fast for the other—performance for delivery. We can build software that’s fast and runs on your desktop PC like in the 90’s, but now you want to put that in the cloud and make it scale? It takes a big infrastructure investment. It also takes a big time investment. Neither of which are good if you want to go fast, especially when you’re using enough microservices, Docker, and Go to rattle the Hacker News fart chamber. You kind of get caught in this endless rut of innovation that you almost lose your balance. Leaving the statelessness of App Engine for more stateful pastures was sort of like an infant learning to walk. You look down and it dawns on you—you have legs! So you run with it, because that’s amazing, and you stumble spectacularly a few times along the way. Finally, you realize maybe running full speed isn’t the best idea for someone who just learned to walk.

We were also making this transition while Go had started reaching critical mass. Every other headline in the tech aggregators was “why we switched to Go and you should too.” And we did. I swear this post has a point.

Tips for Writing High-Performance Go

By now, I’ve forgotten what I was writing about, but I promised this post was about Go. It is, and it’s largely about performance fast, not delivery fast—the two are often at odds with each other. Everything up until this point was mostly just useless context and ranting. But it also shows you that we are solving some hard problems and why we are where we are. There is always history.

I work with a lot of smart people. Many of us have a near obsession with performance, but the point I was attempting to make earlier is we’re trying to push the boundaries of what you can expect from cloud software. App Engine had some rigid boundaries, so we made a change. Since adopting Go, we’ve learned a lot about how to make things fast and how to make Go work in the world of systems programming.

Go’s simplicity and concurrency model make it an appealing choice for backend systems, but the larger question is how does it fare for latency-sensitive applications? Is it worth sacrificing the simplicity of the language to make it faster? Let’s walk through a few areas of performance optimization in Go—namely language features, memory management, and concurrency—and try to make that determination. All of the code for the benchmarks presented here are available on GitHub.

Channels

Channels in Go get a lot of attention because they are a convenient concurrency primitive, but it’s important to be aware of their performance implications. Usually the performance is “good enough” for most cases, but in certain latency-critical situations, they can pose a bottleneck. Channels are not magic. Under the hood, they are just doing locking. This works great in a single-threaded application where there is no lock contention, but in a multithreaded environment, performance significantly degrades. We can mimic a channel’s semantics quite easily using a lock-free ring buffer.

The first benchmark looks at the performance of a single-item-buffered channel and ring buffer with a single producer and single consumer. First, we look at the performance in the single-threaded case (GOMAXPROCS=1).

BenchmarkChannel 3000000 512 ns/op
BenchmarkRingBuffer 20000000 80.9 ns/op

As you can see, the ring buffer is roughly six times faster (if you’re unfamiliar with Go’s benchmarking tool, the first number next to the benchmark name indicates the number of times the benchmark was run before giving a stable result). Next, we look at the same benchmark with GOMAXPROCS=8.

BenchmarkChannel-8 3000000 542 ns/op
BenchmarkRingBuffer-8 10000000 182 ns/op

The ring buffer is almost three times faster.

Channels are often used to distribute work across a pool of workers. In this benchmark, we look at performance with high read contention on a buffered channel and ring buffer. The GOMAXPROCS=1 test shows how channels are decidedly better for single-threaded systems.

BenchmarkChannelReadContention 10000000 148 ns/op
BenchmarkRingBufferReadContention 10000 390195 ns/op

However, the ring buffer is faster in the multithreaded case:

BenchmarkChannelReadContention-8 1000000 3105 ns/op
BenchmarkRingBufferReadContention-8 3000000 411 ns/op

Lastly, we look at performance with contention on both the reader and writer. Again, the ring buffer’s performance is much worse in the single-threaded case but better in the multithreaded case.

BenchmarkChannelContention 10000 160892 ns/op
BenchmarkRingBufferContention 2 806834344 ns/op
BenchmarkChannelContention-8 5000 314428 ns/op
BenchmarkRingBufferContention-8 10000 182557 ns/op

The lock-free ring buffer achieves thread safety using only CAS operations. We can see that deciding to use it over the channel depends largely on the number of OS threads available to the program. For most systems, GOMAXPROCS > 1, so the lock-free ring buffer tends to be a better option when performance matters. Channels are a rather poor choice for performant access to shared state in a multithreaded system.

Defer

Defer is a useful language feature in Go for readability and avoiding bugs related to releasing resources. For example, when we open a file to read, we need to be careful to close it when we’re done. Without defer, we need to ensure the file is closed at each exit point of the function.

This is really error-prone since it’s easy to miss a return point. Defer solves this problem by effectively adding the cleanup code to the stack and invoking it when the enclosing function returns.

At first glance, one would think defer statements could be completely optimized away by the compiler. If I defer something at the beginning of a function, simply insert the closure at each point the function returns. However, it’s more complicated than this. For example, we can defer a call within a conditional statement or a loop. The first case might require the compiler to track the condition leading to the defer. The compiler would also need to be able to determine if a statement can panic since this is another exit point for a function. Statically proving this seems to be, at least on the surface, an undecidable problem.

The point is defer is not a zero-cost abstraction. We can benchmark it to show the performance overhead. In this benchmark, we compare locking a mutex and unlocking it with a defer in a loop to locking a mutex and unlocking it without defer.

BenchmarkMutexDeferUnlock-8 20000000 96.6 ns/op
BenchmarkMutexUnlock-8 100000000 19.5 ns/op

Using defer is almost five times slower in this test. To be fair, we’re looking at a difference of 77 nanoseconds, but in a tight loop on a critical path, this adds up. One trend you’ll notice with these optimizations is it’s usually up to the developer to make a trade-off between performance and readability. Optimization rarely comes free.

Reflection and JSON

Reflection is generally slow and should be avoided for latency-sensitive applications. JSON is a common data-interchange format, but Go’s encoding/json package relies on reflection to marshal and unmarshal structs. With ffjson, we can use code generation to avoid reflection and benchmark the difference.

BenchmarkJSONReflectionMarshal-8 200000 7063 ns/op
BenchmarkJSONMarshal-8 500000 3981 ns/op

BenchmarkJSONReflectionUnmarshal-8 200000 9362 ns/op
BenchmarkJSONUnmarshal-8 300000 5839 ns/op

Code-generated JSON is about 38% faster than the standard library’s reflection-based implementation. Of course, if we’re concerned about performance, we should really avoid JSON altogether. MessagePack is a better option with serialization code that can also be generated. In this benchmark, we use the msgp library and compare its performance to JSON.

BenchmarkMsgpackMarshal-8 3000000 555 ns/op
BenchmarkJSONReflectionMarshal-8 200000 7063 ns/op
BenchmarkJSONMarshal-8 500000 3981 ns/op

BenchmarkMsgpackUnmarshal-8 20000000 94.6 ns/op
BenchmarkJSONReflectionUnmarshal-8 200000 9362 ns/op
BenchmarkJSONUnmarshal-8 300000 5839 ns/op

The difference here is dramatic. Even when compared to the generated JSON serialization code, MessagePack is significantly faster.

If we’re really trying to micro-optimize, we should also be careful to avoid using interfaces, which have some overhead not just with marshaling but also method invocations. As with other kinds of dynamic dispatch, there is a cost of indirection when performing a lookup for the method call at runtime. The compiler is unable to inline these calls.

BenchmarkJSONReflectionUnmarshal-8 200000 9362 ns/op
BenchmarkJSONReflectionUnmarshalIface-8 200000 10099 ns/op

We can also look at the overhead of the invocation lookup, I2T, which converts an interface to its backing concrete type. This benchmark calls the same method on the same struct. The difference is the second one holds a reference to an interface which the struct implements.

BenchmarkStructMethodCall-8 2000000000 0.44 ns/op
BenchmarkIfaceMethodCall-8 1000000000 2.97 ns/op

Sorting is a more practical example that shows the performance difference. In this benchmark, we compare sorting a slice of 1,000,000 structs and 1,000,000 interfaces backed by the same struct. Sorting the structs is nearly 92% faster than sorting the interfaces.

BenchmarkSortStruct-8 10 105276994 ns/op
BenchmarkSortIface-8 5 286123558 ns/op

To summarize, avoid JSON if possible. If you need it, generate the marshaling and unmarshaling code. In general, it’s best to avoid code that relies on reflection and interfaces and instead write code that uses concrete types. Unfortunately, this often leads to a lot of duplicated code, so it’s best to abstract this with code generation. Once again, the trade-off manifests.

Memory Management

Go doesn’t actually expose heap or stack allocation directly to the user. In fact, the words “heap” and “stack” do not appear anywhere in the language specification. This means anything pertaining to the stack and heap are technically implementation-dependent. In practice, of course, Go does have a stack per goroutine and a heap. The compiler does escape analysis to determine if an object can live on the stack or needs to be allocated in the heap.

Unsurprisingly, avoiding heap allocations can be a major area of optimization. By allocating on the stack, we avoid expensive malloc calls, as the benchmark below shows.

BenchmarkAllocateHeap-8 20000000 62.3 ns/op 96 B/op 1 allocs/op
BenchmarkAllocateStack-8 100000000 11.6 ns/op 0 B/op 0 allocs/op

Naturally, passing by reference is faster than passing by value since the former requires copying only a pointer while the latter requires copying values. The difference is negligible with the struct used in these benchmarks, though it largely depends on what has to be copied. Keep in mind there are also likely some compiler optimizations being performed in this synthetic benchmark.

BenchmarkPassByReference-8 1000000000 2.35 ns/op
BenchmarkPassByValue-8 200000000 6.36 ns/op

However, the larger issue with heap allocation is garbage collection. If we’re creating lots of short-lived objects, we’ll cause the GC to thrash. Object pooling becomes quite important in these scenarios. In this benchmark, we compare allocating structs in 10 concurrent goroutines on the heap vs. using a sync.Pool for the same purpose. Pooling yields a 5x improvement.

BenchmarkConcurrentStructAllocate-8 5000000 337 ns/op
BenchmarkConcurrentStructPool-8 20000000 65.5 ns/op

It’s important to point out that Go’s sync.Pool is drained during garbage collection. The purpose of sync.Pool is to reuse memory between garbage collections. One can maintain their own free list of objects to hold onto memory across garbage collection cycles, though this arguably subverts the purpose of a garbage collector. Go’s pprof tool is extremely useful for profiling memory usage. Use it before blindly making memory optimizations.

False Sharing

When performance really matters, you have to start thinking at the hardware level. Formula One driver Jackie Stewart is famous for once saying, “You don’t have to be an engineer to be be a racing driver, but you do have to have mechanical sympathy.” Having a deep understanding of the inner workings of a car makes you a better driver. Likewise, having an understanding of how a computer actually works makes you a better programmer. For example, how is memory laid out? How do CPU caches work? How do hard disks work?

Memory bandwidth continues to be a limited resource in modern CPU architectures, so caching becomes extremely important to prevent performance bottlenecks. Modern multiprocessor CPUs cache data in small lines, typically 64 bytes in size, to avoid expensive trips to main memory. A write to a piece of memory will cause the CPU cache to evict that line to maintain cache coherency. A subsequent read on that address requires a refresh of the cache line. This is a phenomenon known as false sharing, and it’s especially problematic when multiple processors are accessing independent data in the same cache line.

Imagine a struct in Go and how it’s laid out in memory. Let’s use the ring buffer from earlier as an example. Here’s what that struct might normally look like:

The queue and dequeue fields are used to determine producer and consumer positions, respectively. These fields, which are both eight bytes in size, are concurrently accessed and modified by multiple threads to add and remove items from the queue. Since these two fields are positioned contiguously in memory and occupy only 16 bytes of memory, it’s likely they will stored in a single CPU cache line. Therefore, writing to one will result in evicting the other, meaning a subsequent read will stall. In more concrete terms, adding or removing things from the ring buffer will cause subsequent operations to be slower and will result in lots of thrashing of the CPU cache.

We can modify the struct by adding padding between fields. Each padding is the width of a single CPU cache line to guarantee the fields end up in different lines. What we end up with is the following:

How big a difference does padding out CPU cache lines actually make? As with anything, it depends. It depends on the amount of multiprocessing. It depends on the amount of contention. It depends on memory layout. There are many factors to consider, but we should always use data to back our decisions. We can benchmark operations on the ring buffer with and without padding to see what the difference is in practice.

First, we benchmark a single producer and single consumer, each running in a goroutine. With this test, the improvement between padded and unpadded is fairly small, about 15%.

BenchmarkRingBufferSPSC-8 10000000 156 ns/op
BenchmarkRingBufferPaddedSPSC-8 10000000 132 ns/op

However, when we have multiple producers and multiple consumers, say 100 each, the difference becomes slightly more pronounced. In this case, the padded version is about 36% faster.

BenchmarkRingBufferMPMC-8 100000 27763 ns/op
BenchmarkRingBufferPaddedMPMC-8 100000 17860 ns/op

False sharing is a very real problem. Depending on the amount of concurrency and memory contention, it can be worth introducing padding to help alleviate its effects. These numbers might seem negligible, but they start to add up, particularly in situations where every clock cycle counts.

Lock-Freedom

Lock-free data structures are important for fully utilizing multiple cores. Considering Go is targeted at highly concurrent use cases, it doesn’t offer much in the way of lock-freedom. The encouragement seems to be largely directed towards channels and, to a lesser extent, mutexes.

That said, the standard library does offer the usual low-level memory primitives with the atomic package. Compare-and-swap, atomic pointer access—it’s all there. However, use of the atomic package is heavily discouraged:

We generally don’t want sync/atomic to be used at all…Experience has shown us again and again that very very few people are capable of writing correct code that uses atomic operations…If we had thought of internal packages when we added the sync/atomic package, perhaps we would have used that. Now we can’t remove the package because of the Go 1 guarantee.

How hard can lock-free really be though? Just rub some CAS on it and call it a day, right? After a sufficient amount of hubris, I’ve come to learn that it’s definitely a double-edged sword. Lock-free code can get complicated in a hurry. The atomic and unsafe packages are not easy to use, at least not at first. The latter gets its name for a reason. Tread lightly—this is dangerous territory. Even more so, writing lock-free algorithms can be tricky and error-prone. Simple lock-free data structures, like the ring buffer, are pretty manageable, but anything more than that starts to get hairy.

The Ctrie, which I wrote about in detail, was my foray into the world of lock-free data structures beyond your standard fare of queues and lists. Though the theory is reasonably understandable, the implementation is thoroughly complex. In fact, the complexity largely stems from the lack of a native double compare-and-swap, which is needed to atomically compare indirection nodes (to detect mutations on the tree) and node generations (to detect snapshots taken of the tree). Since no hardware provides such an operation, it has to be simulated using standard primitives (and it can).

The first Ctrie implementation was actually horribly broken, and not even because I was using Go’s synchronization primitives incorrectly. Instead, I had made an incorrect assumption about the language. Each node in a Ctrie has a generation associated with it. When a snapshot is taken of the tree, its root node is copied to a new generation. As nodes in the tree are accessed, they are lazily copied to the new generation (à la persistent data structures), allowing for constant-time snapshotting. To avoid integer overflow, we use objects allocated on the heap to demarcate generations. In Go, this is done using an empty struct. In Java, two newly constructed Objects are not equivalent when compared since their memory addresses will be different. I made a blind assumption that the same was true in Go, when in fact, it’s not. Literally the last paragraph of the Go language specification reads:

A struct or array type has size zero if it contains no fields (or elements, respectively) that have a size greater than zero. Two distinct zero-size variables may have the same address in memory.

Oops. The result was that two different generations were considered equivalent, so the double compare-and-swap always succeeded. This allowed snapshots to potentially put the tree in an inconsistent state. That was a fun bug to track down. Debugging highly concurrent, lock-free code is hell. If you don’t get it right the first time, you’ll end up sinking a ton of time into fixing it, but only after some really subtle bugs crop up. And it’s unlikely you get it right the first time. You win this time, Ian Lance Taylor.

But wait! Obviously there’s some payoff with complicated lock-free algorithms or why else would one subject themselves to this? With the Ctrie, lookup performance is comparable to a synchronized map or a concurrent skip list. Inserts are more expensive due to the increased indirection. The real benefit of the Ctrie is its scalability in terms of memory consumption, which, unlike most hash tables, is always a function of the number of keys currently in the tree. The other advantage is its ability to perform constant-time, linearizable snapshots. We can compare performing a “snapshot” on a synchronized map concurrently in 100 different goroutines with the same test using a Ctrie:

BenchmarkConcurrentSnapshotMap-8 1000 9941784 ns/op
BenchmarkConcurrentSnapshotCtrie-8 20000 90412 ns/op

Depending on access patterns, lock-free data structures can offer better performance in multithreaded systems. For example, the NATS message bus uses a synchronized map-based structure to perform subscription matching. When compared with a Ctrie-inspired, lock-free structure, throughput scales a lot better. The blue line is the lock-based data structure, while the red line is the lock-free implementation.

matchbox_bench_1_1

Avoiding locks can be a boon depending on the situation. The advantage was apparent when comparing the ring buffer to the channel. Nonetheless, it’s important to weigh any benefit against the added complexity of the code. In fact, sometimes lock-freedom doesn’t provide any tangible benefit at all!

A Note on Optimization

As we’ve seen throughout this post, performance optimization almost always comes with a cost. Identifying and understanding optimizations themselves is just the first step. What’s more important is understanding when and where to apply them. The famous quote by C. A. R. Hoare, popularized by Donald Knuth, has become a longtime adage of programmers:

The real problem is that programmers have spent far too much time worrying about efficiency in the wrong places and at the wrong times; premature optimization is the root of all evil (or at least most of it) in programming.

Though the point of this quote is not to eliminate optimization altogether, it’s to learn how to strike a balance between speeds—speed of an algorithm, speed of delivery, speed of maintenance, speed of a system. It’s a highly subjective topic, and there is no single rule of thumb. Is premature optimization the root of all evil? Should I just make it work, then make it fast? Does it need to be fast at all? These are not binary decisions. For example, sometimes making it work then making it fast is impossible if there is a fundamental problem in the design.

However, I will say focus on optimizing along the critical path and outward from that only as necessary. The further you get from that critical path, the more likely your return on investment is to diminish and the more time you end up wasting. It’s important to identify what adequate performance is. Do not spend time going beyond that point. This is an area where data-driven decisions are key—be empirical, not impulsive. More important, be practical. There’s no use shaving nanoseconds off of an operation if it just doesn’t matter. There is more to going fast than fast code.

Wrapping Up

If you’ve made it this far, congratulations, there might be something wrong with you. We’ve learned that there are really two kinds of fast in software—delivery and performance.  Customers want the first, developers want the second, and CTOs want both. The first is by far the most important, at least when you’re trying to go to market. The second is something you need to plan for and iterate on. Both are usually at odds with each other.

Perhaps more interestingly, we looked at a few ways we can eke out that extra bit of performance in Go and make it viable for low-latency systems. The language is designed to be simple, but that simplicity can sometimes come at a price. Like the trade-off between the two fasts, there is a similar trade-off between code lifecycle and code performance. Speed comes at the cost of simplicity, at the cost of development time, and at the cost of continued maintenance. Choose wisely.

Breaking and Entering: Lose the Lock While Embracing Concurrency

This article originally appeared on Workiva’s engineering blog as a two-part series.

Providing robust message routing was a priority for us at Workiva when building our distributed messaging infrastructure. This encompassed directed messaging, which allows us to route messages to specific endpoints based on service or client identifiers, but also topic fan-out with support for wildcards and pattern matching.

Existing message-oriented middleware, such as RabbitMQ, provide varying levels of support for these but don’t offer the rich features needed to power Wdesk. This includes transport fallback with graceful degradation, tunable qualities of service, support for client-side messaging, and pluggable authentication middleware. As such, we set out to build a new system, not by reinventing the wheel, but by repurposing it.

Eventually, we settled on Apache Kafka as our wheel, or perhaps more accurately, our log. Kafka demonstrates a telling story of speed, scalability, and fault tolerance—each a requisite component of any reliable messaging system—but it’s only half the story. Pub/sub is a critical messaging pattern for us and underpins a wide range of use cases, but Kafka’s topic model isn’t designed for this purpose. One of the key engineering challenges we faced was building a practical routing mechanism by which messages are matched to interested subscribers. On the surface, this problem appears fairly trivial and is far from novel, but it becomes quite interesting as we dig deeper.

Back to Basics

Topic routing works by matching a published message with interested subscribers. A consumer might subscribe to the topic “foo.bar.baz,” in which any message published to this topic would be delivered to them. We also must support * and # wildcards, which match exactly one word and zero or more words, respectively. In this sense, we follow the AMQP spec:

The routing key used for a topic exchange MUST consist of zero or more words delimited by dots. Each word may contain the letters A–Z and a–z and digits 0–9. The routing pattern follows the same rules as the routing key with the addition that * matches a single word, and # matches zero or more words. Thus the routing pattern *.stock.# matches the routing keys usd.stock and eur.stock.db but not stock.nasdaq.

This problem can be modeled using a trie structure. RabbitMQ went with this approach after exploring other options, like caching topics and indexing the patterns or using a deterministic finite automaton. The latter options have greater time and space complexities. The former requires backtracking the tree for wildcard lookups.

The subscription trie looks something like this:

subscription_trie

Even in spite of the backtracking required for wildcards, the trie ends up being a more performant solution due to its logarithmic complexity and tendency to fit CPU cache lines. Most tries have hot paths, particularly closer to the root, so caching becomes indispensable. The trie approach is also vastly easier to implement.

In almost all cases, this subscription trie needs to be thread-safe as clients are concurrently subscribing, unsubscribing, and publishing. We could serialize access to it with a reader-writer lock. For some, this would be the end of the story, but for high-throughput systems, locking is a major bottleneck. We can do better.

Breaking the Lock

We considered lock-free techniques that could be applied. Lock-free concurrency means that while a particular thread of execution may be blocked, all CPUs are able to continue processing other work. For example, imagine a program that protects access to some resource using a mutex. If a thread acquires this mutex and is subsequently preempted, no other thread can proceed until this thread is rescheduled by the OS. If the scheduler is adversarial, it may never resume execution of the thread, and the program would be effectively deadlocked. A key point, however, is that the mere lack of a lock does not guarantee a program is lock-free. In this context, “lock” really refers to deadlock, livelock, or the misdeeds of a malevolent scheduler.

In practice, what lock-free concurrency buys us is increased system throughput at the expense of increased tail latencies. Looking at a transactional system, lock-freedom allows us to process many concurrent transactions, any of which may block, while guaranteeing systemwide progress. Depending on the access patterns, when a transaction does block, there are always other transactions which can be processed—a CPU never idles. For high-throughput databases, this is essential.

Concurrent Timelines and Linearizability

Lock-freedom can be achieved using a number of techniques, but it ultimately reduces to a small handful of fundamental patterns. In order to fully comprehend these patterns, it’s important to grasp the concept of linearizability.

It takes approximately 100 nanoseconds for data to move from the CPU to main memory. This means that the laws of physics govern the unavoidable discrepancy between when you perceive an operation to have occurred and when it actually occurred. There is the time from when an operation is invoked to when some state change physically occurs (call it tinv), and there is the time from when that state change occurs to when we actually observe the operation as completed (call it tcom). Operations are not instantaneous, which means the wall-clock history of operations is uncertain. tinv and tcom vary for every operation. This is more easily visualized using a timeline diagram like the one below:

timeline

This timeline shows several reads and writes happening concurrently on some state. Physical time moves from left to right. This illustrates that even if a write is invoked before another concurrent write in real time, the later write could be applied first. If there are multiple threads performing operations on shared state, the notion of physical time is meaningless.

We use a linearizable consistency model to allow some semblance of a timeline by providing a total order of all state updates. Linearizability requires that each operation appears to occur atomically at some point between its invocation and completion. This point is called the linearization point. When an operation completes, it’s guaranteed to be observable by all threads because, by definition, the operation occurred before its completion time. After this point, reads will only see this value or a later one—never anything before. This gives us a proper sequencing of operations which can be reasoned about. Linearizability is a correctness condition for concurrent objects.

Of course, linearizability comes at a cost. This is why most memory models aren’t linearizable by default. Going back to our subscription trie, we could make operations on it appear atomic by applying a global lock. This kills throughput, but it ensures linearization.

lock trie

In reality, the trie operations do not occur at a specific instant in time as the illustration above depicts. However, mutual exclusion gives it the appearance and, as a result, linearizability holds at the expense of systemwide progress. Acquiring and releasing the lock appear instantaneous in the timeline because they are backed by atomic hardware operations like test-and-set. Linearizability is a composable property, meaning if an object is composed of linearizable objects, it is also linearizable. This allows us to construct abstractions from linearizable hardware instructions to data structures, all the way up to linearizable distributed systems.

Read-Modify-Write and CAS

Locks are expensive, not just due to contention but because they completely preclude parallelism. As we saw, if a thread which acquires a lock is preempted, any other threads waiting for the lock will continue to block.

Read-modify-write operations like compare-and-swap offer a lock-free approach to ensuring linearizable consistency. Such techniques loosen the bottleneck by guaranteeing systemwide throughput even if one or more threads are blocked. The typical pattern is to perform some speculative work then attempt to publish the changes with a CAS. If the CAS fails, then another thread performed a concurrent operation, and the transaction needs to be retried. If it succeeds, the operation was committed and is now visible, preserving linearizability. The CAS loop is a pattern used in many lock-free data structures and proves to be a useful primitive for our subscription trie.

CAS is susceptible to the ABA problem. These operations work by comparing values at a memory address. If the value is the same, it’s assumed that nothing has changed. However, this can be problematic if another thread modifies the shared memory and changes it back before the first thread resumes execution. The ABA problem is represented by the following sequence of events:

  1. Thread T1 reads shared-memory value A
  2. T1 is preempted, and T2 is scheduled
  3. T2 changes A to B then back to A
  4. T2 is preempted, and T1 is scheduled
  5. T1 sees the shared-memory value is A and continues

In this situation, T1 assumes nothing has changed when, in fact, an invariant may have been violated. We’ll see how this problem is addressed later.

At this point, we’ve explored the subscription-matching problem space, demonstrated why it’s an area of high contention, and examined why locks pose a serious problem to throughput. Linearizability provides an important foundation of understanding for lock-freedom, and we’ve looked at the most fundamental pattern for building lock-free data structures, compare-and-swap. Next, we will take a deep dive on applying lock-free techniques in practice by building on this knowledge. We’ll continue our narrative of how we applied these same techniques to our subscription engine and provide some further motivation for them.

Lock-Free Applied

Let’s revisit our subscription trie from earlier. Our naive approach to making it linearizable was to protect it with a lock. This proved easy, but as we observed, severely limited throughput. For a message broker, access to this trie is a critical path, and we usually have multiple threads performing inserts, removals, and lookups on it concurrently. Intuition tells us we can implement these operations without coarse-grained locking by relying on a CAS to perform mutations on the trie.

If we recall, read-modify-write is typically applied by copying a shared variable to a local variable, performing some speculative work on it, and attempting to publish the changes with a CAS. When inserting into the trie, our speculative work is creating an updated copy of a node. We commit the new node by updating the parent’s reference with a CAS. For example, if we want to add a subscriber to a node, we would copy the node, add the new subscriber, and CAS the pointer to it in the parent.

This approach is broken, however. To see why, imagine if a thread inserts a subscription on a node while another thread concurrently inserts a subscription as a child of that node. The second insert could be lost due to the sequencing of the reference updates. The diagram below illustrates this problem. Dotted lines represent a reference updated with a CAS.

trie cas add

The orphaned nodes containing “x” and “z” mean the subscription to “foo.bar” was lost. The trie is in an inconsistent state.

We looked to existing research in the field of non-blocking data structures to help illuminate a path. “Concurrent Tries with Efficient Non-Blocking Snapshots” by Prokopec et al. introduces the Ctrie, a non-blocking, concurrent hash trie based on shared-memory, single-word CAS instructions.

A hash array mapped trie (HAMT) is an implementation of an associative array which, unlike a hashmap, is dynamically allocated. Memory consumption is always proportional to the number of keys in the trie. A HAMT works by hashing keys and using the resulting bits in the hash code to determine which branches to follow down the trie. Each node contains a table with a fixed number of branch slots. Typically, the number of branch slots is 32. On a 64-bit machine, this would mean it takes 256 bytes (32 branches x 8-byte pointers) to store the branch table of a node.

The size of L1-L3 cache lines is 64 bytes on most modern processors. We can’t fit the branch table in a CPU cache line, let alone the entire node. Instead of allocating space for all branches, we use a bitmap to indicate the presence of a branch at a particular slot. This reduces the size of an empty node from roughly 264 bytes to 12 bytes. We can safely fit a node with up to six branches in a single cache line.

The Ctrie is a concurrent, lock-free version of the HAMT which ensures progress and linearizability. It solves the CAS problem described above by introducing indirection nodes, or I-nodes, which remain present in the trie even as nodes above and below change. This invariant ensures correctness on inserts by applying the CAS operation on the I-node instead of the internal node array.

An I-node may point to a Ctrie node, or C-node, which is an internal node containing a bitmap and array of references to branches. A branch is either an I-node or a singleton node (S-node) containing a key-value pair. The S-node is a leaf in the Ctrie. A newly initialized Ctrie starts with a root pointer to an I-node which points to an empty C-node. The diagram below illustrates a sequence of inserts on a Ctrie.

ctrie insert

An insert starts by atomically reading the I-node’s reference. Next, we copy the C-node and add the new key, recursively insert on an I-node, or extend the Ctrie with a new I-node. The new C-node is then published by performing a CAS on the parent I-node. A failed CAS indicates another thread has mutated the I-node. We re-linearize by atomically reading the I-node’s reference again, which gives us the current state of the Ctrie according to its linearizable history. We then retry the operation until the CAS succeeds. In this case, the linearization point is a successful CAS. The following figure shows why the presence of I-nodes ensures consistency.

ctrie insert correctness

In the above diagram, (k4,v4) is inserted into a Ctrie containing (k1,v1), (k2,v2), and (k3,v3). The new key-value pair is added to node C1 by creating a copy, C1, with the new entry. A CAS is then performed on the pointer at I1, indicated by the dotted line. Since C1 continues pointing to I2, any concurrent updates which occur below it will remain present in the trie. C1 is then garbage collected once no more threads are accessing it. Because of this, Ctries are much easier to implement in a garbage-collected language. It turns out that this deferred reclamation also solves the ABA problem described earlier by ensuring memory addresses are recycled only when it’s safe to do so.

The I-node invariant is enough to guarantee correctness for inserts and lookups, but removals require some additional invariants in order to avoid update loss. Insertions extend the Ctrie with additional levels, while removals eliminate the need for some of these levels. This is because we want to keep the Ctrie as compact as possible while still remaining correct. For example, a remove operation could result in a C-node with a single S-node below it. This state is valid, but the Ctrie could be made more compact and lookups on the lone S-node more efficient if it were moved up into the C-node above. This would allow the I-node and C-node to be removed.

The problem with this approach is it will cause insertions to be lost. If we move the S-node up and replace the dangling I-node reference with it, another thread could perform a concurrent insert on that I-node just before the compression occurs. The insert would be lost because the pointer to the I-node would be removed.

This issue is solved by introducing a new type of node called the tomb node (T-node) and an associated invariant. The T-node is used to ensure proper ordering during removals. The invariant is as follows: if an I-node points to a T-node at some time t0, then for all times greater than t0, the I-node points to the same T-node. More concisely, a T-node is the last value assigned to an I-node. This ensures that no insertions occur at an I-node if it is being compressed. We call such an I-node a tombed I-node.

If a removal results in a non-root-level C-node with a single S-node below it, the C-node is replaced with a T-node wrapping the S-node. This guarantees that every I-node except the root points to a C-node with at least one branch. This diagram depicts the result of removing (k2,v2) from a Ctrie:

ctrie removal

Removing (k2,v2) results in a C-node with a single branch, so it’s subsequently replaced with a T-node. The T-node provides a sequencing mechanism by effectively acting as a marker. While it solves the problem of lost updates, it doesn’t give us a compacted trie. If two keys have long matching hash code prefixes, removing one of the keys would result in a long chain of C-nodes followed by a single T-node at the end.

An invariant was introduced which says once an I-node points to a T-node, it will always point to that T-node. This means we can’t change a tombed I-node’s pointer, so instead we replace the I-node with its resurrection. The resurrection of a tombed I-node is the S-node wrapped in its T-node. When a T-node is produced during a removal, we ensure that it’s still reachable, and if it is, resurrect its tombed I-node in the C-node above. If it’s not reachable, another thread has already performed the compression. To ensure lock-freedom, all operations which read a T-node must help compress it instead of waiting for the removing thread to complete. Compression on the Ctrie from the previous diagram is illustrated below.

ctrie compression

The resurrection of the tombed I-node ensures the Ctrie is optimally compressed for arbitrarily long chains while maintaining integrity.

With a 32-bit hash code space, collisions are rare but still nontrivial. To deal with this, we introduce one final node, the list node (L-node). An L-node is essentially a persistent linked list. If there is a collision between the hash codes of two different keys, they are placed in an L-node. This is analogous to a hash table using separate chaining to resolve collisions.

One interesting property of the Ctrie is support for lock-free, linearizable, constant-time snapshots. Most concurrent data structures do not support snapshots, instead opting for locks or requiring a quiescent state. This allows Ctries to have O(1) iterator creation, clear, and size retrieval (amortized).

Constant-time snapshots are implemented by writing the Ctrie as a persistent data structure and assigning a generation count to each I-node. A persistent hash trie is updated by rewriting the path from the root of the trie down to the leaf the key belongs to while leaving the rest of the trie intact. The generation demarcates Ctrie snapshots. To create a new snapshot, we copy the root I-node and assign it a new generation. When an operation detects that an I-node’s generation is older than the root’s generation, it copies the I-node to the new generation and updates the parent. The path from the root to some node is only updated the first time it’s accessed, making the snapshot a O(1) operation.

The final piece needed for snapshots is a special type of CAS operation. There is a race condition between the thread creating a snapshot and the threads which have already read the root I-node with the previous generation. The linearization point for an insert is a successful CAS on an I-node, but we need to ensure that both the I-node has not been modified and its generation matches that of the root. This could be accomplished with a double compare-and-swap, but most architectures do not support such an operation.

The alternative is to use a RDCSS double-compare-single-swap originally described by Harris et al. We implement an operation with similar semantics to RDCSS called GCAS, or generation compare-and-swap. The GCAS allows us to atomically compare both the I-node pointer and its generation to the expected values before committing an update.

After researching the Ctrie, we wrote a Go implementation in order to gain a deeper understanding of the applied techniques. These same ideas would hopefully be adaptable to our problem domain.

Generalizing the Ctrie

The subscription trie shares some similarities to the hash array mapped trie but there are some key differences. First, values are not strictly stored at the leaves but can be on internal nodes as well. Second, the decomposed topic is used to determine how the trie is descended rather than a hash code. Wildcards complicate lookups further by requiring backtracking. Lastly, the number of branches on a node is not a fixed size. Applying the Ctrie techniques to the subscription trie, we end up with something like this:

matchbox

Much of the same logic applies. The main distinctions are the branch traversal based on topic words and rules around wildcards. Each branch is associated with a word and set of subscribers and may or may not point to an I-node. The I-nodes still ensure correctness on inserts. The behavior of T-nodes changes slightly. With the Ctrie, a T-node is created from a C-node with a single branch and then compressed. With the subscription trie, we don’t introduce a T-node until all branches are removed. A branch is pruned if it has no subscribers and points to nowhere or it has no subscribers and points to a tombed I-node. The GCAS and snapshotting remain unchanged.

We implemented this Ctrie derivative in order to build our concurrent pattern-matching engine, matchbox. This library provides an exceptionally simple API which allows a client to subscribe to a topic, unsubscribe from a topic, and lookup a topic’s subscribers. Snapshotting is also leveraged to retrieve the global subscription tree and the topics to which clients are currently subscribed. These are useful to see who currently has subscriptions and for what.

In Practice

Matchbox has been pretty extensively benchmarked, but to see how it behaves, it’s critical to observe its performance under contention. Many messaging systems opt for a mutex which tends to result in a lot of lock contention. It’s important to know what the access patterns look like in practice, but for our purposes, it’s heavily parallel. We don’t want to waste CPU cycles if we can help it.

To see how matchbox compares to lock-based subscription structures, I benchmarked it against gnatsd, a popular high-performance messaging system also written in Go. Gnatsd uses a tree-like structure protected by a mutex to manage subscriptions and offers similar wildcard semantics.

The benchmarks consist of one or more insertion goroutines and one or more lookup goroutines. Each insertion goroutine inserts 1000 subscriptions, and each lookup goroutine looks up 1000 subscriptions. We scale these goroutines up to see how the systems behave under contention.

The first benchmark is a 1:1 concurrent insert-to-lookup workload. A lookup corresponds to a message being published and matched to interested subscribers, while an insert occurs when a client subscribes to a topic. In practice, lookups are much more frequent than inserts, so the second benchmark is a 1:3 concurrent insert-to-lookup workload to help simulate this. The timings correspond to the complete insert and lookup workload. GOMAXPROCS was set to 8, which controls the number of operating system threads that can execute simultaneously. The benchmarks were run on a machine with a 2.6 GHz Intel Core i7 processor.

matchbox_bench_1_1

matchbox_bench_1_3

It’s quite clear that the lock-free approach scales a lot better under contention. This follows our intuition because lock-freedom allows system-wide progress even when a thread is blocked. If one goroutine is blocked on an insert or lookup operation, other operations may proceed. With a mutex, this isn’t possible.

Matchbox performs well, particularly in multithreaded environments, but there are still more optimizations to be made. This includes improvements both in memory consumption and runtime performance. Applying the Ctrie techniques to this type of trie results in a fairly non-compact structure. There may be ways to roll up branches—either eagerly or after removals—and expand them lazily as necessary. Other optimizations might include placing a cache or Bloom filter in front of the trie to avoid descending it. The main difficulty with these will be managing support for wildcards.

Conclusion

To summarize, we’ve seen why subscription matching is often a major concern for message-oriented middleware and why it’s frequently a bottleneck. Concurrency is crucial for high-performance systems, and we’ve looked at how we can achieve concurrency without relying on locks while framing it within the context of linearizability. Compare-and-swap is a fundamental tool used to implement lock-free data structures, but it’s important to be conscious of the pitfalls. We introduce invariants to protect data consistency. The Ctrie is a great example of how to do this and was foundational in our lock-free subscription-matching implementation. Finally, we validated our work by showing that lock-free data structures scale dramatically better with multithreaded workloads under contention.

My thanks to Steven Osborne and Dustin Hiatt for reviewing this article.

Everything You Know About Latency Is Wrong

Okay, maybe not everything you know about latency is wrong. But now that I have your attention, we can talk about why the tools and methodologies you use to measure and reason about latency are likely horribly flawed. In fact, they’re not just flawed, they’re probably lying to your face.

When I went to Strange Loop in September, I attended a workshop called “Understanding Latency and Application Responsiveness” by Gil Tene. Gil is the CTO of Azul Systems, which is most renowned for its C4 pauseless garbage collector and associated Zing Java runtime. While the workshop was four and a half hours long, Gil also gave a 40-minute talk called “How NOT to Measure Latency” which was basically an abbreviated, less interactive version of the workshop. If you ever get the opportunity to see Gil speak or attend his workshop, I recommend you do. At the very least, do yourself a favor and watch one of his recorded talks or find his slide decks online.

The remainder of this post is primarily a summarization of that talk. You may not get anything out of it that you wouldn’t get out of the talk, but I think it can be helpful to absorb some of these ideas in written form. Plus, for my own benefit, writing about them helps solidify it in my head.

What is Latency?

Latency is defined as the time it took one operation to happen. This means every operation has its own latency—with one million operations there are one million latencies. As a result, latency cannot be measured as work units / time. What we’re interested in is how latency behaves. To do this meaningfully, we must describe the complete distribution of latencies. Latency almost never follows a normal, Gaussian, or Poisson distribution, so looking at averages, medians, and even standard deviations is useless.

Latency tends to be heavily multi-modal, and part of this is attributed to “hiccups” in response time. Hiccups resemble periodic freezes and can be due to any number of reasons—GC pauses, hypervisor pauses, context switches, interrupts, database reindexing, cache buffer flushes to disk, etc. These hiccups never resemble normal distributions and the shift between modes is often rapid and eclectic.

Screen Shot 2015-10-04 at 4.32.24 PM

How do we meaningfully describe the distribution of latencies? We have to look at percentiles, but it’s even more nuanced than this. A trap that many people fall into is fixating on “the common case.” The problem with this is that there is a lot more to latency behavior than the common case. Not only that, but the “common” case is likely not as common as you think.

This is partly a tooling problem. Many of the tools we use do not do a good job of capturing and representing this data. For example, the majority of latency graphs produced by Grafana, such as the one below, are basically worthless. We like to look at pretty charts, and by plotting what’s convenient we get a nice colorful graph which is quite readable. Only looking at the 95th percentile is what you do when you want to hide all the bad stuff. As Gil describes, it’s a “marketing system.” Whether it’s the CTO, potential customers, or engineers—someone’s getting duped. Furthermore, averaging percentiles is mathematically absurd. To conserve space, we often keep the summaries and throw away the data, but the “average of the 95th percentile” is a meaningless statement. You cannot average percentiles, yet note the labels in most of your Grafana charts. Unfortunately, it only gets worse from here.

graph_logbase10_ms

Gil says, “The number one indicator you should never get rid of is the maximum value. That is not noise, that is the signal. The rest of it is noise.” To this point, someone in the workshop naturally responded with “But what if the max is just something like a VM restarting? That doesn’t describe the behavior of the system. It’s just an unfortunate, unlikely occurrence.” By ignoring the maximum, you’re effectively saying “this doesn’t happen.” If you can identify the cause as noise, you’re okay, but if you’re not capturing that data, you have no idea of what’s actually happening.

How Many Nines?

But how many “nines” do I really need to look at? The 99th percentile, by definition, is the latency below which 99% of the observations may be found. Is the 99th percentile rare? If we have a single search engine node, a single key-value store node, a single database node, or a single CDN node, what is the chance we actually hit the 99th percentile?

Gil describes some real-world data he collected which shows how many of the web pages we go to actually experience the 99th percentile, displayed in table below. The second column counts the number of HTTP requests generated by a single access of the web page. The third column shows the likelihood of one access experiencing the 99th percentile. With the exception of google.com, every page has a probability of 50% or higher of seeing the 99th percentile.

Screen Shot 2015-10-04 at 6.15.24 PM

The point Gil makes is that the 99th percentile is what most of your web pages will see. It’s not “rare.”

What metric is more representative of user experience? We know it’s not the average or the median. 95th percentile? 99.9th percentile? Gil walks through a simple, hypothetical example: a typical user session involves five page loads, averaging 40 resources per page. How many users will not experience something worse than the 95th percentile? 0.003%. By looking at the 95th percentile, you’re looking at a number which is relevant to 0.003% of your users. This means 99.997% of your users are going to see worse than this number, so why are you even looking at it?

On the flip side, 18% of your users are going to experience a response time worse than the 99.9th percentile, meaning 82% of users will experience the 99.9th percentile or better. Going further, more than 95% of users will experience the 99.97th percentile and more than 99% of users will experience the 99.995th percentile.

The median is the number that 99.9999999999% of response times will be worse than. This is why median latency is irrelevant. People often describe “typical” response time using a median, but the median just describes what everything will be worse than. It’s also the most commonly used metric.

If it’s so critical that we look at a lot of nines (and it is), why do most monitoring systems stop at the 95th or 99th percentile? The answer is simply because “it’s hard!” The data collected by most monitoring systems is usually summarized in small, five or ten second windows. This, combined with the fact that we can’t average percentiles or derive five nines from a bunch of small samples of percentiles means there’s no way to know what the 99.999th percentile for the minute or hour was. We end up throwing away a lot of good data and losing fidelity.

A Coordinated Conspiracy

Benchmarking is hard. Almost all latency benchmarks are broken because almost all benchmarking tools are broken. The number one cause of problems in benchmarks is something called “coordinated omission,” which Gil refers to as “a conspiracy we’re all a part of” because it’s everywhere. Almost all load generators have this problem.

We can look at a common load-testing example to see how this problem manifests. With this type of test, a client generally issues requests at a certain rate, measures the response time for each request, and puts them in buckets from which we can study percentiles later.

The problem is what if the thing being measured took longer than the time it would have taken before sending the next thing? What if you’re sending something every second, but this particular thing took 1.5 seconds? You wait before you send the next one, but by doing this, you avoided measuring something when the system was problematic. You’ve coordinated with it by backing off and not measuring when things were bad. To remain accurate, this method of measuring only works if all responses fit within an expected interval.

Coordinated omission also occurs in monitoring code. The way we typically measure something is by recording the time before, running the thing, then recording the time after and looking at the delta. We put the deltas in stats buckets and calculate percentiles from that. The code below is taken from a Cassandra benchmark.

Screen Shot 2015-10-04 at 7.29.09 PM

However, if the system experiences one of the “hiccups” described earlier, you will only have one bad operation and 10,000 other operations waiting in line. When those 10,000 other things go through, they will look really good when in reality the experience was really bad. Long operations only get measured once, and delays outside the timing window don’t get measured at all.

In both of these examples, we’re omitting data that looks bad on a very selective basis, but just how much of an impact can this have on benchmark results? It turns out the impact is huge.

Screen Shot 2015-10-04 at 7.27.43 PM

Imagine a “perfect” system which processes 100 requests/second at exactly 1 ms per request. Now consider what happens when we freeze the system (for example, using CTRL+Z) after 100 seconds of perfect operation for 100 seconds and repeat. We can intuitively characterize this system:

  • The average over the first 100 seconds is 1 ms.
  • The average over the next 100 seconds is 50 seconds.
  • The average over the 200 seconds is 25 seconds.
  • The 50th percentile is 1 ms.
  • The 75th percentile is 50 seconds.
  • The 99.99th percentile is 100 seconds.

Screen Shot 2015-10-04 at 7.49.10 PM

Now we try measuring the system using a load generator. Before freezing, we run 100 seconds at 100 requests/second for a total of 10,000 requests at 1 ms each. After the stall, we get one result of 100 seconds. This is the entirety of our data, and when we do the math, we get these results:

  • The average over the 200 seconds is 10.9 ms (should be 25 seconds).
  • The 50th percentile is 1 ms.
  • The 75th percentile is 1 ms (should be 50 seconds).
  • The 99.99th percentile is 1 ms (should be 100 seconds).

Screen Shot 2015-10-04 at 7.57.23 PM

Basically, your load generator and monitoring code tell you the system is ready for production, when in fact it’s lying to you! A simple “CTRL+Z” test can catch coordinated omission, but people rarely do it. It’s critical to calibrate your system this way. If you find it giving you these kind of results, throw away all the numbers—they’re worthless.

You have to measure at random or “fair” rates. If you measure 10,000 things in the first 100 seconds, you have to measure 10,000 things in the second 100 seconds during the stall. If you do this, you’ll get the correct numbers, but they won’t be as pretty. Coordinated omission is the simple act of erasing, ignoring, or missing all the “bad” stuff, but the data is good.

Surely this data can still be useful though, even if it doesn’t accurately represent the system? For example, we can still use it to identify performance regressions or validate improvements, right? Sadly, this couldn’t be further from the truth. To see why, imagine we improve our system. Instead of pausing for 100 seconds after 100 seconds of perfect operation, it handles all requests at 5 ms each after 100 seconds. Doing the math, we get the following:

  • The 50th percentile is 1 ms
  • The 75th percentile is 2.5 ms (stall showed 1 ms)
  • The 99.99th percentile is 5 ms (stall showed 1 ms)

This data tells us we hurt the four nines and made the system 5x worse! This would tell us to revert the change and go back to the way it was before, which is clearly the wrong decision. With bad data, better can look worse. This shows that you cannot have any intuition based on any of these numbers. The data is garbage.

With many load generators, the situation is actually much worse than this. These systems work by generating a constant load. If our test is generating 100 requests/second, we run 10,000 requests in the first 100 seconds. When we stall, we process just one request. After the stall, the load generator sees that it’s 9,999 requests behind and issues those requests to catch back up. Not only did it get rid of the bad requests, it replaced them with good requests. Now the data is twice as wrong as just dropping the bad requests.

What coordinated omission is really showing you is service time, not response time. If we imagine a cashier ringing up customers, the service time is the time it takes the cashier to do the work. The response time is the time a customer waits before they reach the register. If the rate of arrival is higher than the service rate, the response time will continue to grow. Because hiccups and other phenomena happen, response times often bounce around. However, coordinated omission lies to you about response time by actually telling you the service time and hiding the fact that things stalled or waited in line.

Measuring Latency

Latency doesn’t live in a vacuum. Measuring response time is important, but you need to look at it in the context of load. But how do we properly measure this? When you’re nearly idle, things are nearly perfect, so obviously that’s not very useful. When you’re pedal to the metal, things fall apart. This is somewhat useful because it tells us how “fast” we can go before we start getting angry phone calls.

However, studying the behavior of latency at saturation is like looking at the shape of your car’s bumper after wrapping it around a pole. The only thing that matters when you hit the pole is that you hit the pole. There’s no point in trying to engineer a better bumper, but we can engineer for the speed at which we lose control. Everything is going to suck at saturation, so it’s not super useful to look at beyond determining your operating range.

What’s more important is testing the speeds in between idle and hitting the pole. Define your SLAs and plot those requirements, then run different scenarios using different loads and different configurations. This tells us if we’re meeting our SLAs but also how many machines we need to provision to do so. If you don’t do this, you don’t know how many machines you need.

How do we capture this data? In an ideal world, we could store information for every request, but this usually isn’t practical. HdrHistogram is a tool which allows you to capture latency and retain high resolution. It also includes facilities for correcting coordinated omission and plotting latency distributions. The original version of HdrHistogram was written in Java, but there are versions for many other languages.

Screen Shot 2015-10-05 at 12.00.04 AM

To Summarize

To understand latency, you have to consider the entire distribution. Do this by plotting the latency distribution curve. Simply looking at the 95th or even 99th percentile is not sufficient. Tail latency matters. Worse yet, the median is not representative of the “common” case, the average even less so. There is no single metric which defines the behavior of latency. Be conscious of your monitoring and benchmarking tools and the data they report. You can’t average percentiles.

Remember that latency is not service time. If you plot your data with coordinated omission, there’s often a quick, high rise in the curve. Run a “CTRL+Z” test to see if you have this problem. A non-omitted test has a much smoother curve. Very few tools actually correct for coordinated omission.

Latency needs to be measured in the context of load, but constantly running your car into a pole in every test is not useful. This isn’t how you’re running in production, and if it is, you probably need to provision more machines. Use it to establish your limits and test the sustainable throughputs in between to determine if you’re meeting your SLAs. There are a lot of flawed tools out there, but HdrHistogram is one of the few that isn’t. It’s useful for benchmarking and, since histograms are additive and HdrHistogram uses log buckets, it can also be useful for capturing high-volume data in production.

Benchmark Responsibly

When I posted my Dissecting Message Queues article last summer, it understandably caused some controversy.  I received both praise and scathing comments, emails asking why I didn’t benchmark X and pull requests to bump the numbers of Y. To be honest, that analysis was more of a brain dump from my own test driving of various message queues than any sort of authoritative or scientific study—it was far from the latter, to say the least. The qualitative discussion was pretty innocuous, but the benchmarks and supporting code were the target of a lot of (valid) criticism. In retrospect, it was probably irresponsible to publish them, but I was young and naive back then; now I’m just mostly naive.

Comparing Apples to Other Assorted Fruit

One such criticism was that the benchmarks were divided into two very broad categories: brokerless and brokered. While the brokerless group compared two very similar libraries, ZeroMQ and nanomsg, the second group included a number of distinct message brokers like RabbitMQ, Kafka, NATS, and Redis, to name a few.

The problem is not all brokers are created equal. They often have different goals and different prescribed use cases. As such, they impose different guarantees, different trade-offs, and different constraints. By grouping these benchmarks together, I implied they were fundamentally equivalent, when in fact, most were fundamentally different. For example, NATS serves a very different purpose than Kafka, and Redis, which offers pub/sub messaging, typically isn’t thought of as a message broker at all.

Measure Right or Don’t Measure at All

Another criticism was the way in which the benchmarks were performed. The tests were immaterial. The producer, consumer, and the message queue itself all ran on the same machine. Even worse, they used just a single publisher and subscriber. Not only does it not test what a remotely realistic configuration looks like, but it doesn’t even give you a good idea of a trivial one.

To be meaningful, we need to test with more than one producer and consumer, ideally distributed across many machines. We want to see how the system scales to larger workloads. Certainly, the producers and consumers cannot be collocated when we’re measuring discrete throughputs on either end, nor should the broker. This helps to reduce confounding variables between the system under test and the load generation.

It’s Not Rocket Science, It’s Computer Science

The third major criticism lay with the measurements themselves. Measuring throughput is fairly straightforward: we look at the number of messages sent per unit of time at both the sender and the receiver. If we think of a pipe carrying water, we might look at a discrete cross section and the rate at which water passes through it.

Latency, as a concept, is equally simple. With the pipe, it’s the time it takes for a drop of water to travel from one end to the other. While throughput is dependent on the pipe’s diameter, latency is dependent upon its length. What this means is that we can’t derive one from the other. In order to properly measure latency, we need to consider the latency of each message sent through the system.

However, we can’t ignore the relationship between throughput and latency and what the compromise between them means. Generally, we want to make things as fast as possible. Consider a single-cycle CPU. Its latency per instruction will be extremely low but contrasted with a pipelined processor, its throughput is abysmal—one instruction per clock cycle. The implication is that if we trade per-operation latency for throughput, we actually get a decrease in latency for aggregate instructions. Unfortunately, the benchmarks eschewed this relationship by requiring separate latency and throughput tests which used different code paths.

The interaction between latency and throughput is easy to get confused, but it often has interesting ramifications, whether you’re looking at message queues, CPUs, or databases. In a general sense, we’d say “optimize for latency” because lower latency means higher throughput, but the reality is it’s almost always easier (and more cost-effective) to increase throughput than it is to decrease latency, especially on commodity hardware.

Capturing this data, in and of itself, isn’t terribly difficult, but what’s more susceptible to error is how it’s represented. This was the main fault of the benchmarks (in addition to the things described earlier). The most egregious thing they did was report latency as an average. This is like the cardinal sin of benchmarking. The number is practically useless, particularly without any context like a standard deviation.

We know that latency isn’t going to be uniform, but it’s probably not going to follow a normal distribution either. While network latency may be prone to fitting a nice bell curve, system latency almost certainly won’t. They often exhibit things like GC pauses and other “hiccups,” and averages tend to hide these.

latency

Measuring performance isn’t all that easy, but if you do it, at least do it in a way that disambiguates the results. Look at quantiles, not averages. If you do present a mean, include the standard deviation and max in addition to the 90th or 99th percentile. Plotting latency by percentile distribution is an excellent way to see what your performance behavior actually looks like. Gil Tene has a great talk on measuring latency which I highly recommend.

Working Towards a Better Solution

With all this in mind, we can work towards building a better way to test and measure messaging systems. The discussion above really just gives us three key takeaways:

  1. Don’t compare apples to oranges.
  2. Don’t instrument tests in a way that’s not at all representative of real life.
  3. Don’t present results in a statistically insignificant way.

My first attempt at taking these ideas to heart is a tool I call Flotilla. It’s meant to provide a way to test messaging systems in more realistic configurations, at scale, while offering more useful data. Flotilla allows you to easily spin up producers and consumers on arbitrarily many machines, start a message broker, and run a benchmark against it, all in an automated fashion. It then collects data like producer/consumer throughput and the complete latency distribution and reports back to the user.

Flotilla uses a Go port of HdrHistogram to capture latency data, of which I’m a raving fan. HdrHistogram uses a bucketed approach to record values across a configured high-dynamic range at a particular resolution. Recording is in the single-nanosecond range and the memory footprint is constant. It also has support for correcting coordinated omission, which is a common problem in benchmarking. Seriously, if you’re doing anything performance sensitive, give HdrHistogram a look.

Still, Flotilla is not perfect and there’s certainly work to do, but I think it’s a substantial improvement over the previous MQ benchmarking utility. Longer term, it would be great to integrate it with something like Comcast to test workloads under different network conditions. Testing in a vacuum is nice and all, but we know in the real word, the network isn’t perfectly reliable.

So, Where Are the Benchmarks?

Omitted—for now, anyway. My goal really isn’t to rank a hodgepodge of different message queues because there’s really not much value in doing that. There are different use cases for different systems. I might, at some point, look at individual systems in greater detail, but comparing things like message throughput and latency just devolves into a hotly contested pissing contest. My hope is to garner more feedback and improvements to Flotilla before using it to definitively measure anything.

Benchmark responsibly.