FIFO, Exactly-Once, and Other Costs

There’s been a lot of discussion about exactly-once semantics lately, sparked by the recent announcement of support for it in Kafka 0.11. I’ve already written at length about strong guarantees in messaging.

My former coworker Kevin Sookocheff recently made a post about ordered and exactly-once message delivery as it relates to Amazon SQS. It does a good job of illustrating what the trade-offs are, and I want to drive home some points.

In the article, Kevin shows how FIFO delivery is really only meaningful when you have one single-threaded publisher and one single-threaded receiver. Amazon’s FIFO queues allow you to control how restrictive this requirement is by applying ordering on a per-group basis. In other words, we can improve throughput if we can partition work into different ordered groups rather than a single totally ordered group. However, FIFO still effectively limits throughput on a group to a single publisher and single subscriber. If there are multiple publishers, they have to coordinate to ensure ordering is preserved with respect to our application’s semantics. On the subscriber side, things are simpler because SQS will only deliver messages in a group one at a time in order amongst subscribers.

Amazon’s FIFO queues also have an exactly-once processing feature which deduplicates messages within a five-minute window. Note, however, that there are some caveats with this, the obvious one being duplicate delivery outside of the five-minute window. A mission-critical system would have to be designed to account for this possibility. My argument here is if you still have to account for it, what’s the point unless the cost of detecting duplicates is prohibitively expensive? But to be fair, five minutes probably reduces the likelihood enough to the point that it’s useful and in those rare cases where it fails, the duplicate is acceptable.

The more interesting caveat is that FIFO queues do not guarantee exactly-once delivery to consumers (which, as we know, is impossible). Rather, they offer exactly-once processing by guaranteeing that once a message has successfully been acknowledged as processed, it won’t be delivered again. It’s up to applications to ack appropriately. When a message is delivered to a consumer, it remains in the queue until it’s acked. The visibility timeout prevents other consumers from processing it. With FIFO queues, this also means head-of-line blocking for other messages in the same group.

Now, let’s assume a subscriber receives a batch of messages from the queue, processes them—perhaps by storing some results to a database—and then sends an acknowledgement back to SQS which removes them from the queue. It’s entirely possible that during that process step a delay happens—a prolonged GC pause, crash, network delay, whatever. When this happens, the visibility timeout expires and the messages are redelivered and, potentially, reprocessed. What has to happen here is essentially cooperation between the queue and processing step. We might do this by using a database transaction to atomically process and acknowledge the messages. An alternative, yet similar, approach might be to use a write-ahead-log-like strategy whereby the consuming system reads messages from SQS and transactionally stores them in a database for future processing. Once the messages have been committed, the consumer deletes the messages from SQS. In either of these approaches, we’re basically shifting the onus of exactly-once processing onto an ACID-compliant relational database.

Note that this is really how Kafka achieves its exactly-once semantics. It requires end-to-end cooperation for exactly-once to work. State changes in your application need to be committed transactionally with your Kafka offsets.

As Kevin points out, FIFO SQS queues offer exactly-once processing only if 1) publishers never publish duplicate messages wider than five minutes apart and 2) consumers never fail to delete messages they have processed from the queue. Solving either of these problems probably requires some kind of coordination between the application and queue, likely in the form of a database transaction. And if you’re using a database either as the message source, sink, or both, what are exactly-once FIFO queues actually buying you? You’re paying a seemingly large cost in throughput for little perceived value. Your messages are already going through some sort of transactional boundary that provides ordering and uniqueness.

Where I see FIFO and exactly-once semantics being useful is when talking to systems which cannot cooperate with the end-to-end transaction. This might be a legacy service or a system with side effects, such as sending an email. Often in the case of these “distributed workflows”, latency is a lower priority and humans can be involved in various steps. Other use cases might be scheduled integrations with legacy batch processes where throughput is known a priori. These can simply be re-run when errors occur.

When people describe a messaging system with FIFO and exactly-once semantics, they’re usually providing a poor description of a relational database with support for ACID transactions. Providing these semantics in a messaging system likely still involves database transactions, it’s just more complicated. It turns out relational databases are really good at ensuring invariants like exactly-once.

I’ve picked on Kafka a bit in the past, especially with the exactly-once announcement, but my issue is not with Kafka itself. Kafka is a fantastic technology. It’s well-architected, battle-tested, and the team behind it is talented and knows the space well. My issue is more with some of the intangible costs associated with it. The same goes for similar systems (like exactly-once FIFO SQS queues). Beyond just the operational complexity (which Confluent is attempting to tackle with its Kafka-as-a-service), you have to get developer understanding. This is harder than it sounds in any modestly-sized organization. That’s not to say that developers are dumb or incapable of understanding, but the fact is your average developer is simply not thinking about all of the edge cases brought on by operating distributed systems at scale. They see “exactly-once FIFO queues” in SQS or “exactly-once delivery” in Kafka and take it at face value. They don’t read beyond the headline. They don’t look for the caveats. That’s why I took issue with how Kafka claimed to do the impossible with exactly-once delivery when it’s really exactly-once processing or, as I’ve come to call it, “atomic processing.” Henry Robinson put it best when talking about the Kafka announcement:

If I were to rewrite the article, I’d structure it thus: “exactly-once looks like atomic broadcast. Atomic broadcast is impossible. Here’s how exactly-once might fail, and here’s why we think you shouldn’t be worried about it.” That’s a harder argument for users to swallow…

Basically “exactly-once” markets better. It’s something developers can latch onto, but it’s also misleading. I know it’s only a matter of time before people start linking me to the Confluent post saying, “see, exactly-once is easy!” But this is just pain deferral. On the contrary, exactly-once semantics require careful construction of your application, assume a closed, transactional world, and do not support the case where I think people want exactly-once the most: side effects.

Interestingly, one of my chief concerns about Kafka’s implementation was what the difficulty of ensuring end-to-end cooperation would be in practice. Side effects into downstream systems with no support for idempotency or transactions could make it difficult. Jay’s counterpoint to this was that the majority of users are using good old-fashioned relational databases, so all you really need to do is commit your offsets and state changes together. It’s not trivial, but it’s not that much harder than avoiding partial updates on failure if you’re updating multiple tables. This brings us back to two of the original points of contention: why not merely use the database for exactly-once in the first place and what about legacy systems?

That’s not to say exactly-once semantics, as offered in systems like SQS and Kafka, are not useful. I think we just need to be more conscientious of the other costs and encourage developers to more deeply understand the solution space—too much sprinkling on of Kafka or exactly-once or FIFO and not enough thinking about the actual business problem. Too much prescribing of solutions and not enough describing of problems.

My thanks to Kevin Sookocheff and Beau Lyddon for reviewing this post.

You Cannot Have Exactly-Once Delivery Redux

A couple years ago I wrote You Cannot Have Exactly-Once Delivery. It stirred up quite a bit of discussion and was even referenced in a book, which I found rather surprising considering I’m not exactly an academic. Recently, the topic of exactly-once delivery has again become a popular point of discussion, particularly with the release of Kafka 0.11, which introduces support for idempotent producers, transactional writes across multiple partitions, and—wait for it—exactly-once semantics.

Naturally, when this hit Hacker News, I received a lot of messages from people asking me, “what gives?” There’s literally a TechCrunch headline titled, Confluent achieves holy grail of “exactly once” delivery on Kafka messaging service (Jay assures me, they don’t write the headlines). The myth has been disproved!

First, let me say what Confluent has accomplished with Kafka is an impressive achievement and one worth celebrating. They made a monumental effort to implement these semantics, and it paid off. The intention of this post is not to minimize any of that work but to try to clarify a few key points and hopefully cut down on some of the misinformation and noise.

“Exactly-once delivery” is a poor term. The word “delivery” is overloaded. Frankly, I think it’s a marketing word. The better term is “exactly-once processing.” Some call the distinction pedantic, but I think it’s important and there is some nuance. Kafka did not solve the Two Generals Problem. Exactly-once delivery, at the transport level, is impossible. It doesn’t exist in any meaningful way and isn’t all that interesting to talk about. “We have a word for infinite packet delay—outage,” as Jay puts it. That’s why TCP exists, but TCP doesn’t care about your application semantics. And in the end, that’s what’s interesting—application semantics. My problem with “exactly-once delivery” is it assumes too much, which causes a lot of folks to make bad assumptions. “Delivery” is a transport semantic. “Processing” is an application semantic.

All is not lost, however. We can still get correct results by having our application cooperate with the processing pipeline. This is essentially what Kafka does, exactly-once processing, and Confluent makes note of that in the blog post towards the end. What does this mean?

To achieve exactly-once processing semantics, we must have a closed system with end-to-end support for modeling input, output, and processor state as a single, atomic operation. Kafka supports this by providing a new transaction API and idempotent producers. Any state changes in your application need to be made atomically in conjunction with Kafka. You must commit your state changes and offsets together. It requires architecting your application in a specific way. State changes in external systems must be part of the Kafka transaction. Confluent’s goal is to make this as easy as possible by providing the platform around Kafka with its streams and connector APIs. The point here is it’s not just a switch you flip and, magically, messages are delivered exactly once. It requires careful construction, application logic coordination, isolating state change and non-determinism, and maintaining a closed system around Kafka. Applications that use the consumer API still have to do this themselves. As Neha puts it in the post, it’s not “magical pixie dust.” This is the most important part of the post and, if it were up to me, would be at the very top.

Exactly-once processing is an end-to-end guarantee and the application has to be designed to not violate the property as well. If you are using the consumer API, this means ensuring that you commit changes to your application state concordant with your offsets as described here.

Side effects into downstream systems with no support for idempotency or distributed transactions make this really difficult in practice I suspect. The argument is that most people are using relational databases that support transactions, but I think there’s still a reasonably large, non-obvious assumption here. Making your event processing atomic might not be easy in all cases. Moreover, every part in your system needs to participate to ensure end-to-end, exactly-once semantics.

Several other messaging systems like TIBCO EMS and Azure Service Bus have provided similar transactional processing guarantees. Kafka, as I understand it, attempts to make it easier and with less performance overhead. That’s a great accomplishment.

What’s really worth drawing attention to is the effort made by Confluent to deliver a correct solution. Achieving exactly-once processing, in and of itself, is relatively “easy” (I use that word loosely). What’s hard is dealing with the range of failures. The announcement shows they’ve done extensive testing, likely much more than most other systems, and have shown that it works and with minimal performance impact.

Kafka provides exactly-once processing semantics because it’s a closed system. There is still a lot of difficulty in ensuring those semantics are maintained across external services, but Confluent attempts to ameliorate this through APIs and tooling. But that’s just it: it’s not exactly-once semantics in a building block that’s the hard thing, it’s building loosely coupled systems that agree on the state of the world. Nevertheless, there is no holy grail here, just some good ole’ fashioned hard work.

Special thanks to Jay Kreps and Sean T. Allen for their feedback on an early draft of this post. Any inaccuracies or opinions are mine alone.

Smart Endpoints, Dumb Pipes

I read an interesting article recently called How do you cut a monolith in half? There are a lot of thoughts in the article that resonate with me and some that I disagree with, prompting this response.

The overall message of the article is don’t use a message broker to break apart a monolith because it’s like a cross between a load balancer and a database, with the disadvantages of both and the advantages of neither. The author argues that message brokers are a popular way to pull apart components over a network because they have low setup cost and provide easy service discovery, but they come at a high operational cost. My response to that is the same advice the author puts forward: it depends.

I think it’s important not to conflate “message broker” and “message queue.” The article uses them interchangeably, but it’s really talking about the latter, which I see as a subset of the former. Queues provide, well, queuing semantics. They try to ensure delivery of a message or, more generally, distribution of work. As the author puts it: “In practice, a message broker is a service that transforms network errors and machine failures into filled disks.” Replace “broker” with “queue” and I agree with this statement. This is really describing systems like RabbitMQ, Amazon SQS, TIBCO EMS, IronMQ, and maybe even Kafka fits into that category.

People are easily seduced by “fat” middleware—systems with more features, more capabilities, more responsibilities—because they think it makes their lives easier, and it might at first. Pushing off more responsibility onto the infrastructure makes the application simpler, but it also makes the infrastructure more complex, more fragile, and more slow. Take exactly-once message delivery, for example. Lots of people want it, but the pursuit of it introduces a host of complexity, overhead (in terms of development, operations, and performance), and risk. The end result is something that, in addition to these things, requires all downstream systems to not introduce duplicates and be mindful about their side effects. That is, everything in the processing pipeline must be exactly-once or nothing is. So typically what you end up with is an approximation of exactly-once delivery. You make big investments to lower the likelihood of duplicates, but you still have to deal with the problem. This might make sense if the cost of having duplicates is high, but that doesn’t seem like the common case. My advice is to always opt for the simple solution. We tend to think of engineering challenges as technical problems when, in reality, they’re often just mindset problems. Usually the technical problems have already been solved if we can just adjust our mindset.

There are a couple things to keep in mind here. The first thing to consider is simply capability lock-in. As you push more and more logic off onto more and more specialized middleware, you make it harder to move off it or change things. The second is what we already hinted at. Even with smart middleware, problems still leak out and you have to handle them at the edge—you’re now being taxed twice. This is essentially the end-to-end argument. Push responsibility to the edges, smart endpoints, dumb pipes, etc. It’s the idea that if you need business-level guarantees, build them into the business layer because the infrastructure doesn’t care about them.

The article suggests for short-lived tasks, use a load balancer because with a queue, you’ll end up building a load balancer along with an ad-hoc RPC system, with extra latency. For long-lived tasks, use a database because with a queue, you’ll be building a lock manager, a database, and a scheduler.

A persistent message queue is not bad in itself, but relying on it for recovery, and by extension, correct behaviour, is fraught with peril.

So why the distinction between message brokers and message queues? The point is not all message brokers need to be large, complicated pieces of infrastructure like most message queues tend to be. This was the reason I gravitated towards NATS while architecting Workiva’s messaging platform and why last month I joined Apcera to work on NATS full time.

When Derek Collison originally wrote NATS it was largely for the reasons stated in the article and for the reasonstalk about frequently. It was out of frustration with the current state of the art. In my opinion, NATS was the first system in the space that really turned the way we did messaging on its head (outside of maybe ZeroMQ). It didn’t provide any strong delivery guarantees, transactions, message persistence, or other responsibilities usually assumed by message brokers (there is a layer that provides some of these things, but it’s not baked into the core technology). Instead, NATS prioritized availability, simplicity, and performance over everything else. A simple technology in a vast sea of complexity (my marketing game is strong).

NATS is no-frills pub/sub. It solves the problem of service discovery and work assignment, assumes no other responsibilities, and gets out of your way. It’s designed to be easy to use, easy to operate, and add minimal latency even at scale so that, unlike many other brokers, it is a good way to integrate your microservices. What makes NATS interesting is what it doesn’t do and what it gains by not doing them. Simplicity is a feature—the ultimate sophistication, according to da Vinci. I call it looking at the negative space.

The article reads:

A protocol is the rules and expectations of participants in a system, and how they are beholden to each other. A protocol defines who takes responsibility for failure.

The problem with message brokers, and queues, is that no-one does.

NATS plays to the strengths of the end-to-end principle. It’s a dumb pipe. Handle failures and retries at the client and NATS will do everything it can to remain available and fast. Don’t rely on fragile guarantees or semantics. Instead, face complexity head-on. The author states what you really want is request/reply, which is one point I disagree on. RPC is a bad abstraction for building distributed systems. Use simple, versatile primitives and embrace asynchrony and messaging.

So yes, be careful about relying on message brokers. How smart should the pipes really be? More to the point, be careful about relying on strong semantics because experience shows few things are guaranteed when working with distributed systems at scale. Err to the side of simple. Make few assumptions of your middleware. Push work out of your infrastructure and to the edges if you care about performance and scalability because nothing is harder to scale (or operate) than slow infrastructure that tries to do too much.

Take It to the Limit: Considerations for Building Reliable Systems

Complex systems usually operate in failure mode. This is because a complex system typically consists of many discrete pieces, each of which can fail in isolation (or in concert). In a microservice architecture where a given function potentially comprises several independent service calls, high availability hinges on the ability to be partially available. This is a core tenet behind resilience engineering. If a function depends on three services, each with a reliability of 90%, 95%, and 99%, respectively, partial availability could be the difference between 99.995% reliability and 84% reliability (assuming failures are independent). Resilience engineering means designing with failure as the normal.

Anticipating failure is the first step to resilience zen, but the second is embracing it. Telling the client “no” and failing on purpose is better than failing in unpredictable or unexpected ways. Backpressure is another critical resilience engineering pattern. Fundamentally, it’s about enforcing limits. This comes in the form of queue lengths, bandwidth throttling, traffic shaping, message rate limits, max payload sizes, etc. Prescribing these restrictions makes the limits explicit when they would otherwise be implicit (eventually your server will exhaust its memory, but since the limit is implicit, it’s unclear exactly when or what the consequences might be). Relying on unbounded queues and other implicit limits is like someone saying they know when to stop drinking because they eventually pass out.

Rate limiting is important not just to prevent bad actors from DoSing your system, but also yourself. Queue limits and message size limits are especially interesting because they seem to confuse and frustrate developers who haven’t fully internalized the motivation behind them. But really, these are just another form of rate limiting or, more generally, backpressure. Let’s look at max message size as a case study.

Imagine we have a system of distributed actors. An actor can send messages to other actors who, in turn, process the messages and may choose to send messages themselves. Now, as any good software engineer knows, the eighth fallacy of distributed computing is “the network is homogenous.” This means not all actors are using the same hardware, software, or network configuration. We have servers with 128GB RAM running Ubuntu, laptops with 16GB RAM running macOS, mobile clients with 2GB RAM running Android, IoT edge devices with 512MB RAM, and everything in between, all running a hodgepodge of software and network interfaces.

When we choose not to put an upper bound on message sizes, we are making an implicit assumption (recall the discussion on implicit/explicit limits from earlier). Put another way, you and everyone you interact with (likely unknowingly) enters an unspoken contract of which neither party can opt out. This is because any actor may send a message of arbitrary size. This means any downstream consumers of this message, either directly or indirectly, must also support arbitrarily large messages.

How can we test something that is arbitrary? We can’t. We have two options: either we make the limit explicit or we keep this implicit, arbitrarily binding contract. The former allows us to define our operating boundaries and gives us something to test. The latter requires us to test at some undefined production-level scale. The second option is literally gambling reliability for convenience. The limit is still there, it’s just hidden. When we don’t make it explicit, we make it easy to DoS ourselves in production. Limits become even more important when dealing with cloud infrastructure due to their multitenant nature. They prevent a bad actor (or yourself) from bringing down services or dominating infrastructure and system resources.

In our heterogeneous actor system, we have messages bound for mobile devices and web browsers, which are often single-threaded or memory-constrained consumers. Without an explicit limit on message size, a client could easily doom itself by requesting too much data or simply receiving data outside of its control—this is why the contract is unspoken but binding.

Let’s look at this from a different kind of engineering perspective. Consider another type of system: the US National Highway System. The US Department of Transportation uses the Federal Bridge Gross Weight Formula as a means to prevent heavy vehicles from damaging roads and bridges. It’s really the same engineering problem, just a different discipline and a different type of infrastructure.

The August 2007 collapse of the Interstate 35W Mississippi River bridge in Minneapolis brought renewed attention to the issue of truck weights and their relation to bridge stress. In November 2008, the National Transportation Safety Board determined there had been several reasons for the bridge’s collapse, including (but not limited to): faulty gusset plates, inadequate inspections, and the extra weight of heavy construction equipment combined with the weight of rush hour traffic.

The DOT relies on weigh stations to ensure trucks comply with federal weight regulations, fining those that exceed restrictions without an overweight permit.

The federal maximum weight is set at 80,000 pounds. Trucks exceeding the federal weight limit can still operate on the country’s highways with an overweight permit, but such permits are only issued before the scheduled trip and expire at the end of the trip. Overweight permits are only issued for loads that cannot be broken down to smaller shipments that fall below the federal weight limit, and if there is no other alternative to moving the cargo by truck.

Weight limits need to be enforced so civil engineers have a defined operating range for the roads, bridges, and other infrastructure they build. Computers are no different. This is the reason many systems enforce these types of limits. For example, Amazon clearly publishes the limits for its Simple Queue Service—the max in-flight messages for standard queues is 120,000 messages and 20,000 messages for FIFO queues. Messages are limited to 256KB in size. Amazon KinesisApache KafkaNATS, and Google App Engine pull queues all limit messages to 1MB in size. These limits allow the system designers to optimize their infrastructure and ameliorate some of the risks of multitenancy—not to mention it makes capacity planning much easier.

Unbounded anything—whether its queues, message sizes, queries, or traffic—is a resilience engineering anti-pattern. Without explicit limits, things fail in unexpected and unpredictable ways. Remember, the limits exist, they’re just hidden. By making them explicit, we restrict the failure domain giving us more predictability, longer mean time between failures, and shorter mean time to recovery at the cost of more upfront work or slightly more complexity.

It’s better to be explicit and handle these limits upfront than to punt on the problem and allow systems to fail in unexpected ways. The latter might seem like less work at first but will lead to more problems long term. By requiring developers to deal with these limitations directly, they will think through their APIs and business logic more thoroughly and design better interactions with respect to stability, scalability, and performance.

Benchmarking Commit Logs

In this article, we look at Apache Kafka and NATS Streaming, two messaging systems based on the idea of a commit log. We’ll compare some of the features of both but spend less time talking about Kafka since by now it’s quite well known. Similar to previous studies, we’ll attempt to quantify their general performance characteristics through careful benchmarking.

The purpose of this benchmark is to test drive the newly released NATS Streaming system, which was made generally available just in the last few months. NATS Streaming doesn’t yet support clustering, so we try to put its performance into context by looking at a similar configuration of Kafka.

Unlike conventional message queues, commit logs are an append-only data structure. This results in several nice properties like total ordering of messages, at-least-once delivery, and message-replay semantics. Jay Kreps’ blog post The Log is a great introduction to the concept and particularly why it’s so useful in the context of distributed systems and stream processing (his book I Heart Logs is an extended version of the blog post and is a quick read).

Kafka, which originated at LinkedIn, is by far the most popular and most mature implementation of the commit log (AWS offers their own flavor of it called Kinesis, and imitation is the sincerest form of flattery). It’s billed as a “distributed streaming platform for building real-time data pipelines and streaming apps.” The much newer NATS Streaming is actually a data-streaming layer built on top of Apcera’s high-performance publish-subscribe system NATS. It’s billed as “real-time streaming for Big Data, IoT, Mobile, and Cloud Native Applications.” Both have some similarities as well as some key differences.

Fundamental to the notion of a log is a way to globally order events. Neither NATS Streaming nor Kafka are actually a single log but many logs, each totally ordered using a sequence number or offset, respectively.

In Kafka, topics are partitioned into multiple logs which are then replicated across a number of servers for fault tolerance, making it a distributed commit log. Each partition has a server that acts as the leader. Cluster membership and leader election is managed by ZooKeeper.

NATS Streaming’s topics are called “channels” which are globally ordered. Unlike Kafka, NATS Streaming does not support replication or partitioning of channels, though my understanding is clustering support is slated for Q1 2017. Its message store is pluggable, so it can provide durability using a file-backed implementation, like Kafka, or simply an in-memory store.

NATS Streaming is closer to a hybrid of traditional message queues and the commit log. Like Kafka, it allows replaying the log from a specific offset, the beginning of time, or the newest offset, but it also exposes an API for reading from the log at a specific physical time offset, e.g. all messages from the last 30 seconds. Kafka, on the other hand, only has a notion of logical offsets (correction: Kafka added support for offset lookup by timestamp in 0.10.1.0) . Generally, relying on physical time is an anti-pattern in distributed systems due to clock drift and the fact that clocks are not always monotonic. For example, imagine a situation where a NATS Streaming server is restarted and the clock is changed. Messages are still ordered by their sequence numbers but their timestamps might not reflect that. Developers would need to be aware of this while implementing their business logic.

With Kafka, it’s strictly on consumers to track their offset into the log (or the high-level consumer which stores offsets in ZooKeeper (correction: Kafka itself can now store offsets which is used by the new Consumer API, meaning clients do not have to manage offsets directly or rely on ZooKeeper)). NATS Streaming allows clients to either track their sequence number or use a durable subscription, which causes the server to track the last acknowledged message for a client. If the client restarts, the server will resume delivery starting at the earliest unacknowledged message. This is closer to what you would expect from a traditional message-oriented middleware like RabbitMQ.

Lastly, NATS Streaming supports publisher and subscriber rate limiting. This works by configuring the maximum number of in-flight (unacknowledged) messages either from the publisher to the server or from the server to the subscriber. Starting in version 0.9, Kafka supports a similar rate limiting feature that allows producer and consumer byte-rate thresholds to be defined for groups of clients with its Quotas protocol.

Kafka was designed to avoid tracking any client state on the server for performance and scalability reasons. Throughput and storage capacity scale linearly with the number of nodes. NATS Streaming provides some additional features over Kafka at the cost of some added state on the server. Since clustering isn’t supported, there isn’t really any scale or HA story yet, so it’s unclear how that will play out. That said, once replication is supported, there’s a lot of work going into verifying its correctness (which is a major advantage Kafka has).

Benchmarks

Since NATS Streaming does not support replication at this time (0.3.1), we’ll compare running a single instance of it with file-backed persistence to running a single instance of Kafka (0.10.1.0). We’ll look at both latency and throughput running on commodity hardware (m4.xlarge EC2 instances) with load generation and consumption each running on separate instances. In all of these benchmarks, the systems under test have not been tuned at all and are essentially in their “off-the-shelf” configurations.

We’ll first look at latency by publishing messages of various sizes, ranging from 256 bytes to 1MB, at a fixed rate of 50 messages/second for 30 seconds. Message contents are randomized to account for compression. We then plot the latency distribution by percentile on a logarithmic scale from the 0th percentile to the 99.9999th percentile. Benchmarks are run several times in an attempt to produce a “normalized” result. The benchmark code used is open source.

First, to establish a baseline and later get a feel for the overhead added by the file system, we’ll benchmark NATS Streaming with in-memory storage, meaning messages are not written to disk.

Unsurprisingly, the 1MB configuration has much higher latencies than the other configurations, but everything falls within single-digit-millisecond latencies.nats_mem

NATS Streaming 0.3.1 (in-memory persistence)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.3750ms 1.0367ms 1.1257ms 1.1257ms 1.1257ms
1KB 0.38064ms 0.8321ms 1.3260ms 1.3260ms 1.3260ms
5KB 0.4408ms 1.7569ms 2.1465ms 2.1465ms 2.1465ms
1MB 6.6337ms 8.8097ms 9.5263ms 9.5263ms 9.5263ms

Next, we look at NATS Streaming with file-backed persistence. This provides the same durability guarantees as Kafka running with a replication factor of 1. By default, Kafka stores logs under /tmp. Many Unix distributions mount /tmp to tmpfs which appears as a mounted file system but is actually stored in volatile memory. To account for this and provide as level a playing field as possible, we configure NATS Streaming to also store its logs in /tmp.

As expected, latencies increase by about an order of magnitude once we start going to disk.

nats_file_fsync

NATS Streaming 0.3.1 (file-backed persistence)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 21.7051ms 25.0369ms 27.0524ms 27.0524ms 27.0524ms
1KB 20.6090ms 23.8858ms 24.7124ms 24.7124ms 24.7124ms
5KB 22.1692ms 35.7394ms 40.5612ms 40.5612ms 40.5612ms
1MB 45.2490ms 130.3972ms 141.1564ms 141.1564ms 141.1564ms

Since we will be looking at Kafka, there is an important thing to consider relating to fsync behavior. As of version 0.8, Kafka does not call fsync directly and instead relies entirely on the background flush performed by the OS. This is clearly indicated by their documentation:

We recommend using the default flush settings which disable application fsync entirely. This means relying on the background flush done by the OS and Kafka’s own background flush. This provides the best of all worlds for most uses: no knobs to tune, great throughput and latency, and full recovery guarantees. We generally feel that the guarantees provided by replication are stronger than sync to local disk, however the paranoid still may prefer having both and application level fsync policies are still supported.

However, NATS Streaming calls fsync every time a batch is written to disk by default. This can be disabled through the use of the –file_sync flag. By setting this flag to false, we put NATS Streaming’s persistence behavior closer in line with Kafka’s (again assuming a replication factor of 1).

As an aside, the comparison between NATS Streaming and Kafka still isn’t completely “fair”. Jay Kreps points out that Kafka relies on replication as the primary means of durability.

Kafka leaves [fsync] off by default because it relies on replication not fsync for durability, which is generally faster. If you don’t have replication I think you probably need fsync and maybe some kind of high integrity file system.

I don’t think we can provide a truly fair comparison until NATS Streaming supports replication, at which point we will revisit this.

To no one’s surprise, setting –file_sync=false has a significant impact on latency, shown in the distribution below.

nats_file_no_fsync

In fact, it’s now in line with the in-memory performance as before for 256B, 1KB, and 5KB messages, shown in the comparison below.

nats_file_mem

For a reason I have yet to figure out, the latency for 1MB messages is roughly an order of magnitude faster when fsync is enabled after the 95th percentile, which seems counterintuitive. If anyone has an explanation, I would love to hear it. I’m sure there’s a good debug story there. The distribution below shows the 1MB configuration for NATS Streaming with and without fsync enabled and just how big the difference is at the 95th percentile and beyond.

nats_file_mem_1mb

NATS Streaming 0.3.1 (file-backed persistence, –file_sync=false)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.4304ms 0.8577ms 1.0706ms 1.0706ms 1.0706ms
1KB 0.4372ms 1.5987ms 1.8651ms 1.8651ms 1.8651ms
5KB 0.4939ms 2.0828ms 2.2540ms 2.2540ms 2.2540ms
1MB 1296.1464ms 1556.1441ms 1596.1457ms 1596.1457ms 1596.1457ms

Kafka with replication factor 1 tends to have higher latencies than NATS Streaming with –file_sync=false. There was one potential caveat here Ivan Kozlovic pointed out to me in that NATS Streaming uses a caching optimization for reads that may put it at an advantage.

Now, there is one side where NATS Streaming *may* be looking better and not fair to Kafka. By default, the file store keeps everything in memory once stored. This means look-ups will be fast. There is only a all-or-nothing mode right now, which means either cache everything or nothing. With caching disabled (–file_cache=false), every lookup will result in disk access (which when you have 1 to many subscribers will be bad). I am working on changing that. But if you do notice that in Kafka, consuming results in a disk read (given the other default behavior described above, they actually may not ;-)., then you could disable NATS Streaming file caching.

Fortunately, we can verify if Kafka is actually going to disk to read messages back from the log during the benchmark using iostat. We see something like this for the majority of the benchmark duration:

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          13.53    0.00   11.28    0.00    0.00   75.19

Device:    tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn
xvda      0.00         0.00         0.00          0          0

Specifically, we’re interested in Blk_read, which indicates the total number of blocks read. It appears that Kafka does indeed make heavy use of the operating system’s page cache as Blk_wrtn and Blk_read rarely show any activity throughout the entire benchmark. As such, it seems fair to leave NATS Streaming’s –file_cache=true, which is the default.

One interesting point is Kafka offloads much of its caching to the page cache and outside of the JVM heap, clearly in an effort to minimize GC pauses. I’m not clear if the cache Ivan refers to in NATS Streaming is off-heap or not (NATS Streaming is written in Go which, like Java, is a garbage-collected language).

Below is the distribution of latencies for 256B, 1KB, and 5KB configurations in Kafka.

kafka

Similar to NATS Streaming, 1MB message latencies tend to be orders of magnitude worse after about the 80th percentile. The distribution below compares the 1MB configuration for NATS Streaming and Kafka.

nats_kafka_1mb

Kafka 0.10.1.0 (replication factor 1)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.9230ms 1.4575ms 1.6596ms 1.6596ms 1.6596ms
1KB 0.5942ms 1.3123ms 17.6556ms 17.6556ms 17.6556ms
5KB 0.7203ms 5.7236ms 18.9334ms 18.9334ms 18.9334ms
1MB 5337.3174ms 5597.3315ms 5617.3199ms 5617.3199ms 5617.3199ms

The percentile distributions below compare NATS Streaming and Kafka for the 256B, 1KB, and 5KB configurations, respectively.

nats_kafka_256b

nats_kafka_1kb

nats_kafka_5kb

Next, we’ll look at overall throughput for the two systems. This is done by publishing 100,000 messages using the same range of sizes as before and measuring the elapsed time. Specifically, we measure throughput at the publisher and the subscriber.

Despite using an asynchronous publisher in both the NATS Streaming and Kafka benchmarks, we do not consider the publisher “complete” until it has received acks for all published messages from the server. In Kafka, we do this by setting request.required.acks to 1, which means the leader replica has received the data, and consuming the received acks. This is important because the default value is 0, which means the producer never waits for an ack from the broker. In NATS Streaming, we provide an ack callback on every publish. We use the same benchmark configuration as the latency tests, separating load generation and consumption on different EC2 instances. Note the log scale in the following charts.

Once again, we’ll start by looking at NATS Streaming using in-memory persistence. The truncated 1MB send and receive throughputs are 93.01 messages/second.

nats_mem_throughput

For comparison, we now look at NATS Streaming with file persistence and –file_sync=false. As before, this provides the closest behavior to Kafka’s default flush behavior. The second chart shows a side-by-side comparison between NATS Streaming with in-memory and file persistence.

nats_file_throughput

nats_compare_throughput

Lastly, we look at Kafka with replication factor 1. Throughput significantly deteriorates when we set request.required.acks = 1 since the producer must wait for all acks from the server. This is important though because, by default, the client does not require an ack from the server. If this were the case, the producer would have no idea how much data actually reached the server once it finished—it could simply be buffered in the client, in flight over the wire, or in the server but not yet on disk. Running the benchmark with request.required.acks = 0 yields much higher throughput on the sender but is basically an exercise in how fast you can write to a channel using the Sarama Go client—slightly misleading.

kafka_throughput

Looking at some comparisons of Kafka and NATS Streaming, we can see that NATS Streaming has higher throughput in all but a few cases.

nats_kafka_throughput

nats_kafka_send_throughput

I want to repeat the disclaimer from before: the purpose of this benchmark is to test drive the newly released NATS Streaming system (which as mentioned earlier, doesn’t yet support clustering), and put its performance into context by looking at a similar configuration of Kafka.

Kafka generally scales very well, so measuring the throughput of a single broker with a single producer and single consumer isn’t particularly meaningful. In reality, we’d be running a cluster with several brokers and partitioning our topics across them.

For as young as it is, NATS Streaming has solid performance (which shouldn’t come as much of a surprise considering the history of NATS itself), and I imagine it will only get better with time as the NATS team continues to optimize. In some ways, NATS Streaming bridges the gap between the commit log as made popular by Kafka and the conventional message queue as made popular by protocols like JMS, AMQP, STOMP, and the like.

The bigger question at this point is how NATS Streaming will tackle scaling and replication (a requirement for true production-readiness in my opinion). Kafka was designed from the ground up for high scalability and availability through the use of external coordination (read ZooKeeper). Naturally, there is a lot of complexity and cost that comes with that. NATS Streaming attempts to keep NATS’ spirit of simplicity, but it’s yet to be seen how it will reconcile that with the complex nature of distributed systems. I’m excited to see where Apcera takes NATS Streaming and generally the NATS ecosystem in the future since the team has a lot of experience in this area.