Pain-Driven Development: Why Greedy Algorithms Are Bad for Engineering Orgs

I recently wrote about the importance of understanding decision impact and why it’s important for building an empathetic engineering culture. I presented the distinction between pain displacement and pain deferral, and this was something I wanted to expand on a bit.

When you distill it down, I think what’s at the heart of a lot of engineering orgs is this idea of “pain-driven development.” When a company grows to a certain size, it develops limbs, and each of these limbs has its own pain receptors. This is when empathy becomes important because it becomes harder and less natural. These limbs of course are teams or, more generally speaking, silos. Teams have a natural tendency to operate in a way that minimizes the amount of pain they feel.

It’s time for some game theory: pain is a zero-sum game. By always following the path of least resistance, we end up displacing pain instead of feeling it. This is literally just instinct. In other words, by making locally optimal choices, we run the risk of losing out on a globally optimal solution. Sometimes this is an explicit business decision, but many times it’s not.

Tech debt is a common example of when pain displacement is a deliberate business decision. It’s pain deferral—there’s pain we need to feel, but we can choose to feel it later and in the meantime provide incremental value to the business. This is usually a team choosing to apply a bandaid and coming back to fix it later. “We have this large batch job that has a five-minute timeout, and we’re sporadically seeing this timeout getting hit. Why don’t we just bump up the timeout to 10 minutes?” This is a bandaid, and a particularly poor one at that because, by Parkinson’s law, as soon as you bump up the timeout to 10 minutes, you’ll start seeing 11-minute jobs, and we’ll be having the same discussion over again. I see the exact same types of discussions happening with resource provisioning: “we’re hitting memory limits—can we just provision our instances with more RAM?” “We’re pegging CPU. Obviously we just need more cores.” Throwing hardware at the problem is the path of least resistance for the developers. They have a deliverable in front of them, they have a lot of pressure to ship, this is how they do it. It’s a greedy algorithm. It minimizes pain.

Where things become really problematic is when the pain displacement involves multiple teams. This is why understanding decision impact is so key. Pain displacement doesn’t just involve engineering teams, it also involves customers and other stakeholders in the organization. This is something I see quite a bit: displacing pain away from customers onto various teams within the org by setting unrealistic expectations up front.

For example, we build a product MVP and run it on a single, high-memory instance, and we don’t actually write data out to disk to keep it fast. We then put this product in front of sales folks, marketing, or even customers and say “hey, look at this cool thing we built.” Then the customers say “wow, this is great! I don’t feel any pain at all using this!” That’s because the pain has been moved elsewhere.

This MVP isn’t fault tolerant because it’s running on a single machine. This MVP isn’t horizontally scalable because we keep all the state in memory on one instance. This MVP isn’t safe because the data isn’t durably stored to disk. The problem is we weren’t testing at scale, so we never felt any pain until it was too late. So we start working backward to address these issues after the fact. We need to run multiple instances so we can have failover. But wait, now we need stateful request routing to maintain our performance expectations. Does our infrastructure support that? We need a mechanism to split and merge units of work that plays nicely with our autoscaling system to give us a better scale story, avoid hot instances, and reduce excess capacity. But wait, how long will that take to build? We need to attach persistent disks so we can durably store data and keep things fast. But wait, does our cluster provisioning allow for that? Does that even meet our compliance requirements?

The only way you reach this point is by making local decisions without thinking about the trade-offs involved or the fact that what you’ve actually done is simply displaced the pain.

If someone doesn’t feel pain, they have a harder time developing a sense of empathy. For instance, the goal of any good operations team is to effectively put itself out of a job by empowering developers to self-service through tooling and automation. One example of this is infrastructure as code, so an ops team adds a process requiring developers to provision their own infrastructure using CloudFormation scripts. For the ops folks, this is a boon—now they no longer have to labor through countless UIs and AWS consoles to provision databases, queues, and the like for each environment. Developers, on the other hand, were never exposed to that pain, so to them, writing CloudFormation scripts is a new hoop to jump through—setting up infrastructure is ops’ job! They might feel pain now, but they don’t necessarily see the immediate payoff.

A coworker of mine recently posed an interesting question: why do product teams often overlook the need for tools required to support their product in production until after they’ve deployed to production? And while the answer he posits is good, and one I very much agree with—solving a problem and solving the problem of solving problems are two very different problems—my answer is this: pain-driven development. In this case, you’re deferring the pain by hooking up debuggers or SSHing into the box and poking about instead of relying on instrumentation which is what we’re limited to in the field. As long as you’re cognizant of this and know that at some point you will have to feel some pain, it can be okay. But if you’re just displacing pain thinking it’s actually disappearing, you’ll be in for a rude awakening. Remember, it’s a zero-sum game.

I’m looking at this through an infrastructure or operations lens, but this applies everywhere and it cuts both ways. Understanding the why behind something rather than just the how is critical to building empathy. It’s being able to look at a problem through someone else’s perspective and applying that to your own work. Changing your perspective is a powerful way to deepen your relationships. Pain-driven development is intoxicating because it allows us to move fast. It’s a greedy algorithm, but it provides a poor global approximation for large engineering organizations. Thinking holistically is important.

Decision Impact

I think a critical part of building an empathetic engineering culture is understanding decision impact. This is a blindspot that I see happening a lot: a deliberate effort to understand the effects caused by a decision. How does adopting X affect operations? Does our dev tooling support this? Is this architecture supported by our current infrastructure? What are the compliance or security implications of this? Will this scale in production? A particular decision might save you time, but does it create work or slow others down? Are we just displacing pain somewhere else?

What’s needed is a broad understanding of the net effects. Pain displacement is an indication that we’re not thinking beyond the path of least resistance. The problem is if we lack a certain empathy, we aren’t aware the pain displacement is occurring in the first place. It’s important we widen our vision beyond the deliverable in front of us. We have to think holistically—like a systems person—and think deeply about the interactions between decisions. Part of this is having an organizational awareness.

Tech debt is the one exception to this because it’s pain displacement we feel ourselves—it’s pain deferral. This is usually a decision we can make ourselves, but when we’re dealing with pain displacement involving multiple teams, that’s when problems start happening. And that’s where empathy becomes critical because software engineering is more about collaboration than code and shit has this natural tendency to roll downhill.

The first sentence of The Five Dysfunctions of a Team captures this idea really well: “Not finance. Not strategy. Not technology. It is teamwork that remains the ultimate competitive advantage, both because it is so powerful and so rare.” The powerful part is obvious, but the bit about rarity is interesting when we think about teams holistically. The cause, I think, is deeply rooted in the silos or fiefdoms that naturally form around teams. The difficulty comes as an organization scales. What I see happening frequently are goals that diverge or conflict. The fix is rallying teams around a shared cause—a single, compelling vision. Likewise, it’s thinking holistically and having empathy. Understanding decision impact and pain displacement is one step to developing that empathy. This is what unlocks the rarity part of teamwork.

Fast Topic Matching

A common problem in messaging middleware is that of efficiently matching message topics with interested subscribers. For example, assume we have a set of subscribers, numbered 1 to 3:

Subscriber Match Request
1 forex.usd
2 forex.*
3 stock.nasdaq.msft

And we have a stream of messages, numbered 1 to N:

Message Topic
1 forex.gbp
2 stock.nyse.ibm
3 stock.nyse.ge
4 forex.eur
5 forex.usd
N stock.nasdaq.msft

We are then tasked with routing messages whose topics match the respective subscriber requests, where a “*” wildcard matches any word. This is frequently a bottleneck for message-oriented middleware like ZeroMQ, RabbitMQ, ActiveMQ, TIBCO EMS, et al. Because of this, there are a number of well-known solutions to the problem. In this post, I’ll describe some of these solutions, as well as a novel one, and attempt to quantify them through benchmarking. As usual, the code is available on GitHub.

The Naive Solution

The naive solution is pretty simple: use a hashmap that maps topics to subscribers. Subscribing involves adding a new entry to the map (or appending to a list if it already exists). Matching a message to subscribers involves scanning through every entry in the map, checking if the match request matches the message topic, and returning the subscribers for those that do.

Inserts are approximately O(1) and lookups approximately O(n*m) where n is the number of subscriptions and m is the number of words in a topic. This means the performance of this solution is heavily dependent upon how many subscriptions exist in the map and also the access patterns (rate of reads vs. writes). Since most use cases are heavily biased towards searches rather than updates, the naive solution—unsurprisingly—is not a great option.

The microbenchmark below compares the performance of subscribe, unsubscribe, and lookup (matching) operations, first using an empty hashmap (what we call cold) and then with one containing 1,000 randomly generated 5-word topic subscriptions (what we call hot). With the populated subscription map, lookups are about three orders of magnitude slower, which is why we have to use a log scale in the chart below.

subscribe unsubscribe lookup
cold 172ns 51.2ns 787ns
hot 221ns 55ns 815,787ns


Inverted Bitmap

The inverted bitmap technique builds on the observation that lookups are more frequent than updates and assumes that the search space is finite. Consequently, it shifts some of the cost from the read path to the write path. It works by storing a set of bitmaps, one per topic, or criteria, in the search space. Subscriptions are then assigned an increasing number starting at 0. We analyze each subscription to determine the matching criteria and set the corresponding bits in the criteria bitmaps to 1. For example, assume our search space consists of the following set of topics:

  • forex.usd
  • forex.gbp
  • forex.jpy
  • forex.eur
  • stock.nasdaq
  • stock.nyse

We then have the following subscriptions:

  • 0 = forex.* (matches forex.usd, forex.gbp, forex.jpy, and forex.eur)
  • 1 = stock.nyse (matches stock.nyse)
  • 2 = *.* (matches everything)
  • 3 = stock.* (matches stock.nasdaq and stock.nyse)

When we index the subscriptions above, we get the following set of bitmaps:

 Criteria 0 1 2 3
forex.usd 1 0 1 0
forex.gbp 1 0 1 0
forex.jpy 1 0 1 0
forex.eur 1 0 1 0
stock.nasdaq 0 0 1 1
stock.nyse 0 1 1 1

When we match a message, we simply need to lookup the corresponding bitmap and check the set bits. As we see below, subscribe and unsubscribe are quite expensive with respect to the naive solution, but lookups now fall well below half a microsecond, which is pretty good (the fact that the chart below doesn’t use a log scale like the one above should be an indictment of the naive hashmap-based solution).

subscribe unsubscribe lookup
cold 3,795ns 198ns 380ns
hot 3,863ns 198ns 395ns

The inverted bitmap is a better option than the hashmap when we have a read-heavy workload. One limitation is it requires us to know the search space ahead of time or otherwise requires reindexing which, frankly, is prohibitively expensive.

Optimized Inverted Bitmap

The inverted bitmap technique works well enough, but only if the topic space is fairly static. It also falls over pretty quickly when the topic space and number of subscriptions are large, say, millions of topics and thousands of subscribers. The main benefit of topic-based routing is it allows for faster matching algorithms in contrast to content-based routing, which can be exponentially slower. The truth is, to be useful, your topics probably consist of stock.nyse.ibm, stock.nyse.ge, stock.nasdaq.msft, stock.nasdaq.aapl, etc., not stock.nyse and stock.nasdaq. We could end up with an explosion of topics and, even with efficient bitmaps, the memory consumption tends to be too high despite the fact that most of the bitmaps are quite sparse.

Fortunately, we can reduce the amount of memory we consume using a fairly straightforward optimization. Rather than requiring the entire search space a priori, we simply require the max topic size, in terms of words, e.g. stock.nyse.ibm has a size of 3. We can handle topics of the max size or less, e.g. stock.nyse.bac, stock.nasdaq.txn, forex.usd, index, etc. If we see a message with more words than the max, we can safely assume there are no matching subscriptions.

The optimized inverted bitmap works by splitting topics into their constituent parts. Each constituent position has a set of bitmaps, and we use a technique similar to the one described above on each part. We end up with a bitmap for each constituent which we perform a logical AND on to give a resulting bitmap. Each 1 in the resulting bitmap corresponds to a subscription. This means if the max topic size is n, we only AND at most n bitmaps. Furthermore, if we come across any empty bitmaps, we can stop early since we know there are no matching subscribers.

Let’s say our max topic size is 2 and we have the following subscriptions:

  • 0 = forex.*
  • 1 = stock.nyse
  • 2 = index
  • 3 = stock.*

The inverted bitmap for the first constituent looks like the following:

forex.* stock.nyse index stock.*
null 0 0 0 0
forex 1 0 0 0
stock 0 1 0 1
index 0 0 1 0
other 0 0 0 0

And the second constituent bitmap:

forex.* stock.nyse index stock.*
null 0 0 1 0
nyse 0 1 0 0
other 1 0 0 1

The “null” and “other” rows are worth pointing out. “Null” simply means the topic has no corresponding constituent.  For example, “index” has no second constituent, so “null” is marked. “Other” allows us to limit the number of rows needed such that we only need the ones that appear in subscriptions.  For example, if messages are published on forex.eur, forex.usd, and forex.gbp but I merely subscribe to forex.*, there’s no need to index eur, usd, or gbp. Instead, we just mark the “other” row which will match all of them.

Let’s look at an example using the above bitmaps. Imagine we want to route a message published on forex.eur. We split the topic into its constituents: “forex” and “eur.” We get the row corresponding to “forex” from the first constituent bitmap, the one corresponding to “eur” from the second (other), and then AND the rows.

forex.* stock.nyse index stock.*
1 = forex 1 0 0 0
2 = other 1 0 0 1
AND 1 0 0 0

The forex.* subscription matches.

Let’s try one more example: a message published on stock.nyse.

forex.* stock.nyse index stock.*
1 = stock 0 1 0 1
2 = nyse 0 1 0 1
AND 0 1 0 1

In this case, we also need to OR the “other” row for the second constituent. This gives us a match for stock.nyse and stock.*.

Subscribe operations are significantly faster with the space-optimized inverted bitmap compared to the normal inverted bitmap, but lookups are much slower. However, the optimized version consumes roughly 4.5x less memory for every subscription. The increased flexibility and improved scalability makes the optimized version a better choice for all but the very latency-sensitive use cases.

subscribe unsubscribe lookup
cold 1,053ns 330ns 2,724ns
hot 1,076ns 371ns 3,337ns

Trie

The optimized inverted bitmap improves space complexity, but it does so at the cost of lookup efficiency. Is there a way we can reconcile both time and space complexity? While inverted bitmaps allow for efficient lookups, they are quite wasteful for sparse sets, even when using highly compressed bitmaps like Roaring bitmaps.

Tries can often be more space efficient in these circumstances. When we add a subscription, we descend the trie, adding nodes along the way as necessary, until we run out of words in the topic. Finally, we add some metadata containing the subscription information to the last node in the chain. To match a message topic, we perform a similar traversal. If a node doesn’t exist in the chain, we know there are no subscribers. One downside of this method is, in order to support wildcards, we must backtrack on a literal match and check the “*” branch as well.

For the given set of subscriptions, the trie would look something like the following:

  • forex.*
  • stock.nyse
  • index
  • stock.*

You might be tempted to ask: “why do we even need the “*” nodes? When someone subscribes to stock.*, just follow all branches after “stock” and add the subscriber.” This would indeed move the backtracking cost from the read path to the write path, but—like the first inverted bitmap we looked at—it only works if the search space is known ahead of time. It would also largely negate the memory-usage benefits we’re looking for since it would require pre-indexing all topics while requiring a finite search space.

It turns out, this trie technique is how systems like ZeroMQ and RabbitMQ implement their topic matching due to its balance between space and time complexity and overall performance predictability.

subscribe unsubscribe lookup
cold 406ns 221ns 2,145ns
hot 443ns 257ns 2,278ns

We can see that, compared to the optimized inverted bitmap, the trie performs much more predictably with relation to the number of subscriptions held.

Concurrent Subscription Trie

One thing we haven’t paid much attention to so far is concurrency. Indeed, message-oriented middleware is typically highly concurrent since they have to deal with heavy IO (reading messages from the wire, writing messages to the wire, reading messages from disk, writing messages to disk, etc.) and CPU operations (like topic matching and routing). Subscribe, unsubscribe, and lookups are usually all happening in different threads of execution. This is especially important when we want to talk advantage of multi-core processors.

It wasn’t shown, but all of the preceding algorithms used global locks to ensure thread safety between read and write operations, making the data structures safe for concurrent use. However, the microbenchmarks don’t really show the impact of this, which we will see momentarily.

Lock-freedom, which I’ve written about, allows us to increase throughput at the expense of increased tail latency.

Lock-free concurrency means that while a particular thread of execution may be blocked, all CPUs are able to continue processing other work. For example, imagine a program that protects access to some resource using a mutex. If a thread acquires this mutex and is subsequently preempted, no other thread can proceed until this thread is rescheduled by the OS. If the scheduler is adversarial, it may never resume execution of the thread, and the program would be effectively deadlocked. A key point, however, is that the mere lack of a lock does not guarantee a program is lock-free. In this context, “lock” really refers to deadlock, livelock, or the misdeeds of a malevolent scheduler.

The concurrent subscription trie, or CS-trie,  is a new take on the trie-based solution described earlier. It combines the idea of the topic-matching trie with that of a Ctrie, or concurrent trie, which is a non-blocking concurrent hash trie.

The fundamental problem with the trie, as it relates to concurrency, is it requires a global lock, which severely limits throughput. To address this, the CS-trie uses indirection nodes, or I-nodes, which remain present in the trie even as the nodes above and below change. Subscriptions are then added or removed by creating a copy of the respective node, and performing a CAS on its parent I-node. This allows us to add, remove, and lookup subscriptions concurrently and in a lock-free, linearizable manner.

For the given set of subscribers, labeled x, y, and z, the CS-trie would look something like the following:

  • x = foo, bar, bar.baz
  • y = foo, bar.qux
  • z = bar.*

Lookups on the CS-trie perform, on average, better than the standard trie, and the CS-trie scales better with respect to concurrent operations.

subscribe unsubscribe lookup
cold 412ns 245ns 1,615ns
hot 471ns 280ns 1,637ns

Latency Comparison

The chart below shows the topic-matching operation latencies for all of the algorithms side-by-side. First, we look at the performance of a cold start (no subscriptions) and then the performance of a hot start (1,000 subscriptions).

Throughput Comparison

So far, we’ve looked at the latency of individual topic-matching operations. Next, we look at overall throughput of each of the algorithms and their memory footprint.

 algorithm msg/sec
naive  4,053.48
inverted bitmap  1,052,315.02
optimized inverted bitmap  130,705.98
trie  248,762.10
cs-trie  340,910.64

On the surface, the inverted bitmap looks like the clear winner, clocking in at over 1 million matches per second. However, we know the inverted bitmap does not scale and, indeed, this becomes clear when we look at memory consumption, underscored by the fact that the below chart uses a log scale.

Scalability with Respect to Concurrency

Lastly, we’ll look at how each of these algorithms scales with respect to concurrency. We do this by performing concurrent operations and varying the level of concurrency and number of operations. We start with a 50-50 split between reads and writes. We vary the number of goroutines from 2 to 16 (the benchmark was run using a 2.6 GHz Intel Core i7 processor with 8 logical cores). Each goroutine performs 1,000 reads or 1,000 writes. For example, the 2-goroutine benchmark performs 1,000 reads and 1,000 writes, the 4-goroutine benchmark performs 2,000 reads and 2,000 writes, etc. We then measure the total amount of time needed to complete the workload.

We can see that the tries hardly even register on the scale above, so we’ll plot them separately.

The tries are clearly much more efficient than the other solutions, but the CS-trie in particular scales well to the increased workload and concurrency.

Since most workloads are heavily biased towards reads over writes, we’ll run a separate benchmark that uses a 90-10 split reads and writes. This should hopefully provide a more realistic result.

The results look, more or less, like what we would expect, with the reduced writes improving the inverted bitmap performance. The CS-trie still scales quite well in comparison to the global-lock trie.

Conclusion

As we’ve seen, there are several approaches to consider to implement fast topic matching. There are also several aspects to look at: read/write access patterns, time complexity, space complexity, throughput, and latency.

The naive hashmap solution is generally a poor choice due to its prohibitively expensive lookup time. Inverted bitmaps offer a better solution. The standard implementation is reasonable if the search space is finite, small, and known a priori, especially if read latency is critical. The space-optimized version is a better choice for scalability, offering a good balance between read and write performance while keeping a small memory footprint. The trie is an even better choice, providing lower latency than the optimized inverted bitmap and consuming less memory. It’s particularly good if the subscription tree is sparse and topics are not known a priori. Lastly, the concurrent subscription trie is the best option if there is high concurrency and throughput matters. It offers similar performance to the trie but scales better. The only downside is an increase in implementation complexity.

Take It to the Limit: Considerations for Building Reliable Systems

Complex systems usually operate in failure mode. This is because a complex system typically consists of many discrete pieces, each of which can fail in isolation (or in concert). In a microservice architecture where a given function potentially comprises several independent service calls, high availability hinges on the ability to be partially available. This is a core tenet behind resilience engineering. If a function depends on three services, each with a reliability of 90%, 95%, and 99%, respectively, partial availability could be the difference between 99.995% reliability and 84% reliability (assuming failures are independent). Resilience engineering means designing with failure as the normal.

Anticipating failure is the first step to resilience zen, but the second is embracing it. Telling the client “no” and failing on purpose is better than failing in unpredictable or unexpected ways. Backpressure is another critical resilience engineering pattern. Fundamentally, it’s about enforcing limits. This comes in the form of queue lengths, bandwidth throttling, traffic shaping, message rate limits, max payload sizes, etc. Prescribing these restrictions makes the limits explicit when they would otherwise be implicit (eventually your server will exhaust its memory, but since the limit is implicit, it’s unclear exactly when or what the consequences might be). Relying on unbounded queues and other implicit limits is like someone saying they know when to stop drinking because they eventually pass out.

Rate limiting is important not just to prevent bad actors from DoSing your system, but also yourself. Queue limits and message size limits are especially interesting because they seem to confuse and frustrate developers who haven’t fully internalized the motivation behind them. But really, these are just another form of rate limiting or, more generally, backpressure. Let’s look at max message size as a case study.

Imagine we have a system of distributed actors. An actor can send messages to other actors who, in turn, process the messages and may choose to send messages themselves. Now, as any good software engineer knows, the eighth fallacy of distributed computing is “the network is homogenous.” This means not all actors are using the same hardware, software, or network configuration. We have servers with 128GB RAM running Ubuntu, laptops with 16GB RAM running macOS, mobile clients with 2GB RAM running Android, IoT edge devices with 512MB RAM, and everything in between, all running a hodgepodge of software and network interfaces.

When we choose not to put an upper bound on message sizes, we are making an implicit assumption (recall the discussion on implicit/explicit limits from earlier). Put another way, you and everyone you interact with (likely unknowingly) enters an unspoken contract of which neither party can opt out. This is because any actor may send a message of arbitrary size. This means any downstream consumers of this message, either directly or indirectly, must also support arbitrarily large messages.

How can we test something that is arbitrary? We can’t. We have two options: either we make the limit explicit or we keep this implicit, arbitrarily binding contract. The former allows us to define our operating boundaries and gives us something to test. The latter requires us to test at some undefined production-level scale. The second option is literally gambling reliability for convenience. The limit is still there, it’s just hidden. When we don’t make it explicit, we make it easy to DoS ourselves in production. Limits become even more important when dealing with cloud infrastructure due to their multitenant nature. They prevent a bad actor (or yourself) from bringing down services or dominating infrastructure and system resources.

In our heterogeneous actor system, we have messages bound for mobile devices and web browsers, which are often single-threaded or memory-constrained consumers. Without an explicit limit on message size, a client could easily doom itself by requesting too much data or simply receiving data outside of its control—this is why the contract is unspoken but binding.

Let’s look at this from a different kind of engineering perspective. Consider another type of system: the US National Highway System. The US Department of Transportation uses the Federal Bridge Gross Weight Formula as a means to prevent heavy vehicles from damaging roads and bridges. It’s really the same engineering problem, just a different discipline and a different type of infrastructure.

The August 2007 collapse of the Interstate 35W Mississippi River bridge in Minneapolis brought renewed attention to the issue of truck weights and their relation to bridge stress. In November 2008, the National Transportation Safety Board determined there had been several reasons for the bridge’s collapse, including (but not limited to): faulty gusset plates, inadequate inspections, and the extra weight of heavy construction equipment combined with the weight of rush hour traffic.

The DOT relies on weigh stations to ensure trucks comply with federal weight regulations, fining those that exceed restrictions without an overweight permit.

The federal maximum weight is set at 80,000 pounds. Trucks exceeding the federal weight limit can still operate on the country’s highways with an overweight permit, but such permits are only issued before the scheduled trip and expire at the end of the trip. Overweight permits are only issued for loads that cannot be broken down to smaller shipments that fall below the federal weight limit, and if there is no other alternative to moving the cargo by truck.

Weight limits need to be enforced so civil engineers have a defined operating range for the roads, bridges, and other infrastructure they build. Computers are no different. This is the reason many systems enforce these types of limits. For example, Amazon clearly publishes the limits for its Simple Queue Service—the max in-flight messages for standard queues is 120,000 messages and 20,000 messages for FIFO queues. Messages are limited to 256KB in size. Amazon KinesisApache KafkaNATS, and Google App Engine pull queues all limit messages to 1MB in size. These limits allow the system designers to optimize their infrastructure and ameliorate some of the risks of multitenancy—not to mention it makes capacity planning much easier.

Unbounded anything—whether its queues, message sizes, queries, or traffic—is a resilience engineering anti-pattern. Without explicit limits, things fail in unexpected and unpredictable ways. Remember, the limits exist, they’re just hidden. By making them explicit, we restrict the failure domain giving us more predictability, longer mean time between failures, and shorter mean time to recovery at the cost of more upfront work or slightly more complexity.

It’s better to be explicit and handle these limits upfront than to punt on the problem and allow systems to fail in unexpected ways. The latter might seem like less work at first but will lead to more problems long term. By requiring developers to deal with these limitations directly, they will think through their APIs and business logic more thoroughly and design better interactions with respect to stability, scalability, and performance.

Benchmarking Commit Logs

In this article, we look at Apache Kafka and NATS Streaming, two messaging systems based on the idea of a commit log. We’ll compare some of the features of both but spend less time talking about Kafka since by now it’s quite well known. Similar to previous studies, we’ll attempt to quantify their general performance characteristics through careful benchmarking.

The purpose of this benchmark is to test drive the newly released NATS Streaming system, which was made generally available just in the last few months. NATS Streaming doesn’t yet support clustering, so we try to put its performance into context by looking at a similar configuration of Kafka.

Unlike conventional message queues, commit logs are an append-only data structure. This results in several nice properties like total ordering of messages, at-least-once delivery, and message-replay semantics. Jay Kreps’ blog post The Log is a great introduction to the concept and particularly why it’s so useful in the context of distributed systems and stream processing (his book I Heart Logs is an extended version of the blog post and is a quick read).

Kafka, which originated at LinkedIn, is by far the most popular and most mature implementation of the commit log (AWS offers their own flavor of it called Kinesis, and imitation is the sincerest form of flattery). It’s billed as a “distributed streaming platform for building real-time data pipelines and streaming apps.” The much newer NATS Streaming is actually a data-streaming layer built on top of Apcera’s high-performance publish-subscribe system NATS. It’s billed as “real-time streaming for Big Data, IoT, Mobile, and Cloud Native Applications.” Both have some similarities as well as some key differences.

Fundamental to the notion of a log is a way to globally order events. Neither NATS Streaming nor Kafka are actually a single log but many logs, each totally ordered using a sequence number or offset, respectively.

In Kafka, topics are partitioned into multiple logs which are then replicated across a number of servers for fault tolerance, making it a distributed commit log. Each partition has a server that acts as the leader. Cluster membership and leader election is managed by ZooKeeper.

NATS Streaming’s topics are called “channels” which are globally ordered. Unlike Kafka, NATS Streaming does not support replication or partitioning of channels, though my understanding is clustering support is slated for Q1 2017. Its message store is pluggable, so it can provide durability using a file-backed implementation, like Kafka, or simply an in-memory store.

NATS Streaming is closer to a hybrid of traditional message queues and the commit log. Like Kafka, it allows replaying the log from a specific offset, the beginning of time, or the newest offset, but it also exposes an API for reading from the log at a specific physical time offset, e.g. all messages from the last 30 seconds. Kafka, on the other hand, only has a notion of logical offsets (correction: Kafka added support for offset lookup by timestamp in 0.10.1.0) . Generally, relying on physical time is an anti-pattern in distributed systems due to clock drift and the fact that clocks are not always monotonic. For example, imagine a situation where a NATS Streaming server is restarted and the clock is changed. Messages are still ordered by their sequence numbers but their timestamps might not reflect that. Developers would need to be aware of this while implementing their business logic.

With Kafka, it’s strictly on consumers to track their offset into the log (or the high-level consumer which stores offsets in ZooKeeper (correction: Kafka itself can now store offsets which is used by the new Consumer API, meaning clients do not have to manage offsets directly or rely on ZooKeeper)). NATS Streaming allows clients to either track their sequence number or use a durable subscription, which causes the server to track the last acknowledged message for a client. If the client restarts, the server will resume delivery starting at the earliest unacknowledged message. This is closer to what you would expect from a traditional message-oriented middleware like RabbitMQ.

Lastly, NATS Streaming supports publisher and subscriber rate limiting. This works by configuring the maximum number of in-flight (unacknowledged) messages either from the publisher to the server or from the server to the subscriber. Starting in version 0.9, Kafka supports a similar rate limiting feature that allows producer and consumer byte-rate thresholds to be defined for groups of clients with its Quotas protocol.

Kafka was designed to avoid tracking any client state on the server for performance and scalability reasons. Throughput and storage capacity scale linearly with the number of nodes. NATS Streaming provides some additional features over Kafka at the cost of some added state on the server. Since clustering isn’t supported, there isn’t really any scale or HA story yet, so it’s unclear how that will play out. That said, once replication is supported, there’s a lot of work going into verifying its correctness (which is a major advantage Kafka has).

Benchmarks

Since NATS Streaming does not support replication at this time (0.3.1), we’ll compare running a single instance of it with file-backed persistence to running a single instance of Kafka (0.10.1.0). We’ll look at both latency and throughput running on commodity hardware (m4.xlarge EC2 instances) with load generation and consumption each running on separate instances. In all of these benchmarks, the systems under test have not been tuned at all and are essentially in their “off-the-shelf” configurations.

We’ll first look at latency by publishing messages of various sizes, ranging from 256 bytes to 1MB, at a fixed rate of 50 messages/second for 30 seconds. Message contents are randomized to account for compression. We then plot the latency distribution by percentile on a logarithmic scale from the 0th percentile to the 99.9999th percentile. Benchmarks are run several times in an attempt to produce a “normalized” result. The benchmark code used is open source.

First, to establish a baseline and later get a feel for the overhead added by the file system, we’ll benchmark NATS Streaming with in-memory storage, meaning messages are not written to disk.

Unsurprisingly, the 1MB configuration has much higher latencies than the other configurations, but everything falls within single-digit-millisecond latencies.nats_mem

NATS Streaming 0.3.1 (in-memory persistence)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.3750ms 1.0367ms 1.1257ms 1.1257ms 1.1257ms
1KB 0.38064ms 0.8321ms 1.3260ms 1.3260ms 1.3260ms
5KB 0.4408ms 1.7569ms 2.1465ms 2.1465ms 2.1465ms
1MB 6.6337ms 8.8097ms 9.5263ms 9.5263ms 9.5263ms

Next, we look at NATS Streaming with file-backed persistence. This provides the same durability guarantees as Kafka running with a replication factor of 1. By default, Kafka stores logs under /tmp. Many Unix distributions mount /tmp to tmpfs which appears as a mounted file system but is actually stored in volatile memory. To account for this and provide as level a playing field as possible, we configure NATS Streaming to also store its logs in /tmp.

As expected, latencies increase by about an order of magnitude once we start going to disk.

nats_file_fsync

NATS Streaming 0.3.1 (file-backed persistence)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 21.7051ms 25.0369ms 27.0524ms 27.0524ms 27.0524ms
1KB 20.6090ms 23.8858ms 24.7124ms 24.7124ms 24.7124ms
5KB 22.1692ms 35.7394ms 40.5612ms 40.5612ms 40.5612ms
1MB 45.2490ms 130.3972ms 141.1564ms 141.1564ms 141.1564ms

Since we will be looking at Kafka, there is an important thing to consider relating to fsync behavior. As of version 0.8, Kafka does not call fsync directly and instead relies entirely on the background flush performed by the OS. This is clearly indicated by their documentation:

We recommend using the default flush settings which disable application fsync entirely. This means relying on the background flush done by the OS and Kafka’s own background flush. This provides the best of all worlds for most uses: no knobs to tune, great throughput and latency, and full recovery guarantees. We generally feel that the guarantees provided by replication are stronger than sync to local disk, however the paranoid still may prefer having both and application level fsync policies are still supported.

However, NATS Streaming calls fsync every time a batch is written to disk by default. This can be disabled through the use of the –file_sync flag. By setting this flag to false, we put NATS Streaming’s persistence behavior closer in line with Kafka’s (again assuming a replication factor of 1).

As an aside, the comparison between NATS Streaming and Kafka still isn’t completely “fair”. Jay Kreps points out that Kafka relies on replication as the primary means of durability.

Kafka leaves [fsync] off by default because it relies on replication not fsync for durability, which is generally faster. If you don’t have replication I think you probably need fsync and maybe some kind of high integrity file system.

I don’t think we can provide a truly fair comparison until NATS Streaming supports replication, at which point we will revisit this.

To no one’s surprise, setting –file_sync=false has a significant impact on latency, shown in the distribution below.

nats_file_no_fsync

In fact, it’s now in line with the in-memory performance as before for 256B, 1KB, and 5KB messages, shown in the comparison below.

nats_file_mem

For a reason I have yet to figure out, the latency for 1MB messages is roughly an order of magnitude faster when fsync is enabled after the 95th percentile, which seems counterintuitive. If anyone has an explanation, I would love to hear it. I’m sure there’s a good debug story there. The distribution below shows the 1MB configuration for NATS Streaming with and without fsync enabled and just how big the difference is at the 95th percentile and beyond.

nats_file_mem_1mb

NATS Streaming 0.3.1 (file-backed persistence, –file_sync=false)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.4304ms 0.8577ms 1.0706ms 1.0706ms 1.0706ms
1KB 0.4372ms 1.5987ms 1.8651ms 1.8651ms 1.8651ms
5KB 0.4939ms 2.0828ms 2.2540ms 2.2540ms 2.2540ms
1MB 1296.1464ms 1556.1441ms 1596.1457ms 1596.1457ms 1596.1457ms

Kafka with replication factor 1 tends to have higher latencies than NATS Streaming with –file_sync=false. There was one potential caveat here Ivan Kozlovic pointed out to me in that NATS Streaming uses a caching optimization for reads that may put it at an advantage.

Now, there is one side where NATS Streaming *may* be looking better and not fair to Kafka. By default, the file store keeps everything in memory once stored. This means look-ups will be fast. There is only a all-or-nothing mode right now, which means either cache everything or nothing. With caching disabled (–file_cache=false), every lookup will result in disk access (which when you have 1 to many subscribers will be bad). I am working on changing that. But if you do notice that in Kafka, consuming results in a disk read (given the other default behavior described above, they actually may not ;-)., then you could disable NATS Streaming file caching.

Fortunately, we can verify if Kafka is actually going to disk to read messages back from the log during the benchmark using iostat. We see something like this for the majority of the benchmark duration:

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          13.53    0.00   11.28    0.00    0.00   75.19

Device:    tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn
xvda      0.00         0.00         0.00          0          0

Specifically, we’re interested in Blk_read, which indicates the total number of blocks read. It appears that Kafka does indeed make heavy use of the operating system’s page cache as Blk_wrtn and Blk_read rarely show any activity throughout the entire benchmark. As such, it seems fair to leave NATS Streaming’s –file_cache=true, which is the default.

One interesting point is Kafka offloads much of its caching to the page cache and outside of the JVM heap, clearly in an effort to minimize GC pauses. I’m not clear if the cache Ivan refers to in NATS Streaming is off-heap or not (NATS Streaming is written in Go which, like Java, is a garbage-collected language).

Below is the distribution of latencies for 256B, 1KB, and 5KB configurations in Kafka.

kafka

Similar to NATS Streaming, 1MB message latencies tend to be orders of magnitude worse after about the 80th percentile. The distribution below compares the 1MB configuration for NATS Streaming and Kafka.

nats_kafka_1mb

Kafka 0.10.1.0 (replication factor 1)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.9230ms 1.4575ms 1.6596ms 1.6596ms 1.6596ms
1KB 0.5942ms 1.3123ms 17.6556ms 17.6556ms 17.6556ms
5KB 0.7203ms 5.7236ms 18.9334ms 18.9334ms 18.9334ms
1MB 5337.3174ms 5597.3315ms 5617.3199ms 5617.3199ms 5617.3199ms

The percentile distributions below compare NATS Streaming and Kafka for the 256B, 1KB, and 5KB configurations, respectively.

nats_kafka_256b

nats_kafka_1kb

nats_kafka_5kb

Next, we’ll look at overall throughput for the two systems. This is done by publishing 100,000 messages using the same range of sizes as before and measuring the elapsed time. Specifically, we measure throughput at the publisher and the subscriber.

Despite using an asynchronous publisher in both the NATS Streaming and Kafka benchmarks, we do not consider the publisher “complete” until it has received acks for all published messages from the server. In Kafka, we do this by setting request.required.acks to 1, which means the leader replica has received the data, and consuming the received acks. This is important because the default value is 0, which means the producer never waits for an ack from the broker. In NATS Streaming, we provide an ack callback on every publish. We use the same benchmark configuration as the latency tests, separating load generation and consumption on different EC2 instances. Note the log scale in the following charts.

Once again, we’ll start by looking at NATS Streaming using in-memory persistence. The truncated 1MB send and receive throughputs are 93.01 messages/second.

nats_mem_throughput

For comparison, we now look at NATS Streaming with file persistence and –file_sync=false. As before, this provides the closest behavior to Kafka’s default flush behavior. The second chart shows a side-by-side comparison between NATS Streaming with in-memory and file persistence.

nats_file_throughput

nats_compare_throughput

Lastly, we look at Kafka with replication factor 1. Throughput significantly deteriorates when we set request.required.acks = 1 since the producer must wait for all acks from the server. This is important though because, by default, the client does not require an ack from the server. If this were the case, the producer would have no idea how much data actually reached the server once it finished—it could simply be buffered in the client, in flight over the wire, or in the server but not yet on disk. Running the benchmark with request.required.acks = 0 yields much higher throughput on the sender but is basically an exercise in how fast you can write to a channel using the Sarama Go client—slightly misleading.

kafka_throughput

Looking at some comparisons of Kafka and NATS Streaming, we can see that NATS Streaming has higher throughput in all but a few cases.

nats_kafka_throughput

nats_kafka_send_throughput

I want to repeat the disclaimer from before: the purpose of this benchmark is to test drive the newly released NATS Streaming system (which as mentioned earlier, doesn’t yet support clustering), and put its performance into context by looking at a similar configuration of Kafka.

Kafka generally scales very well, so measuring the throughput of a single broker with a single producer and single consumer isn’t particularly meaningful. In reality, we’d be running a cluster with several brokers and partitioning our topics across them.

For as young as it is, NATS Streaming has solid performance (which shouldn’t come as much of a surprise considering the history of NATS itself), and I imagine it will only get better with time as the NATS team continues to optimize. In some ways, NATS Streaming bridges the gap between the commit log as made popular by Kafka and the conventional message queue as made popular by protocols like JMS, AMQP, STOMP, and the like.

The bigger question at this point is how NATS Streaming will tackle scaling and replication (a requirement for true production-readiness in my opinion). Kafka was designed from the ground up for high scalability and availability through the use of external coordination (read ZooKeeper). Naturally, there is a lot of complexity and cost that comes with that. NATS Streaming attempts to keep NATS’ spirit of simplicity, but it’s yet to be seen how it will reconcile that with the complex nature of distributed systems. I’m excited to see where Apcera takes NATS Streaming and generally the NATS ecosystem in the future since the team has a lot of experience in this area.