Take It to the Limit: Considerations for Building Reliable Systems

Complex systems usually operate in failure mode. This is because a complex system typically consists of many discrete pieces, each of which can fail in isolation (or in concert). In a microservice architecture where a given function potentially comprises several independent service calls, high availability hinges on the ability to be partially available. This is a core tenet behind resilience engineering. If a function depends on three services, each with a reliability of 90%, 95%, and 99%, respectively, partial availability could be the difference between 99.995% reliability and 84% reliability (assuming failures are independent). Resilience engineering means designing with failure as the normal.

Anticipating failure is the first step to resilience zen, but the second is embracing it. Telling the client “no” and failing on purpose is better than failing in unpredictable or unexpected ways. Backpressure is another critical resilience engineering pattern. Fundamentally, it’s about enforcing limits. This comes in the form of queue lengths, bandwidth throttling, traffic shaping, message rate limits, max payload sizes, etc. Prescribing these restrictions makes the limits explicit when they would otherwise be implicit (eventually your server will exhaust its memory, but since the limit is implicit, it’s unclear exactly when or what the consequences might be). Relying on unbounded queues and other implicit limits is like someone saying they know when to stop drinking because they eventually pass out.

Rate limiting is important not just to prevent bad actors from DoSing your system, but also yourself. Queue limits and message size limits are especially interesting because they seem to confuse and frustrate developers who haven’t fully internalized the motivation behind them. But really, these are just another form of rate limiting or, more generally, backpressure. Let’s look at max message size as a case study.

Imagine we have a system of distributed actors. An actor can send messages to other actors who, in turn, process the messages and may choose to send messages themselves. Now, as any good software engineer knows, the eighth fallacy of distributed computing is “the network is homogenous.” This means not all actors are using the same hardware, software, or network configuration. We have servers with 128GB RAM running Ubuntu, laptops with 16GB RAM running macOS, mobile clients with 2GB RAM running Android, IoT edge devices with 512MB RAM, and everything in between, all running a hodgepodge of software and network interfaces.

When we choose not to put an upper bound on message sizes, we are making an implicit assumption (recall the discussion on implicit/explicit limits from earlier). Put another way, you and everyone you interact with (likely unknowingly) enters an unspoken contract of which neither party can opt out. This is because any actor may send a message of arbitrary size. This means any downstream consumers of this message, either directly or indirectly, must also support arbitrarily large messages.

How can we test something that is arbitrary? We can’t. We have two options: either we make the limit explicit or we keep this implicit, arbitrarily binding contract. The former allows us to define our operating boundaries and gives us something to test. The latter requires us to test at some undefined production-level scale. The second option is literally gambling reliability for convenience. The limit is still there, it’s just hidden. When we don’t make it explicit, we make it easy to DoS ourselves in production. Limits become even more important when dealing with cloud infrastructure due to their multitenant nature. They prevent a bad actor (or yourself) from bringing down services or dominating infrastructure and system resources.

In our heterogeneous actor system, we have messages bound for mobile devices and web browsers, which are often single-threaded or memory-constrained consumers. Without an explicit limit on message size, a client could easily doom itself by requesting too much data or simply receiving data outside of its control—this is why the contract is unspoken but binding.

Let’s look at this from a different kind of engineering perspective. Consider another type of system: the US National Highway System. The US Department of Transportation uses the Federal Bridge Gross Weight Formula as a means to prevent heavy vehicles from damaging roads and bridges. It’s really the same engineering problem, just a different discipline and a different type of infrastructure.

The August 2007 collapse of the Interstate 35W Mississippi River bridge in Minneapolis brought renewed attention to the issue of truck weights and their relation to bridge stress. In November 2008, the National Transportation Safety Board determined there had been several reasons for the bridge’s collapse, including (but not limited to): faulty gusset plates, inadequate inspections, and the extra weight of heavy construction equipment combined with the weight of rush hour traffic.

The DOT relies on weigh stations to ensure trucks comply with federal weight regulations, fining those that exceed restrictions without an overweight permit.

The federal maximum weight is set at 80,000 pounds. Trucks exceeding the federal weight limit can still operate on the country’s highways with an overweight permit, but such permits are only issued before the scheduled trip and expire at the end of the trip. Overweight permits are only issued for loads that cannot be broken down to smaller shipments that fall below the federal weight limit, and if there is no other alternative to moving the cargo by truck.

Weight limits need to be enforced so civil engineers have a defined operating range for the roads, bridges, and other infrastructure they build. Computers are no different. This is the reason many systems enforce these types of limits. For example, Amazon clearly publishes the limits for its Simple Queue Service—the max in-flight messages for standard queues is 120,000 messages and 20,000 messages for FIFO queues. Messages are limited to 256KB in size. Amazon KinesisApache KafkaNATS, and Google App Engine pull queues all limit messages to 1MB in size. These limits allow the system designers to optimize their infrastructure and ameliorate some of the risks of multitenancy—not to mention it makes capacity planning much easier.

Unbounded anything—whether its queues, message sizes, queries, or traffic—is a resilience engineering anti-pattern. Without explicit limits, things fail in unexpected and unpredictable ways. Remember, the limits exist, they’re just hidden. By making them explicit, we restrict the failure domain giving us more predictability, longer mean time between failures, and shorter mean time to recovery at the cost of more upfront work or slightly more complexity.

It’s better to be explicit and handle these limits upfront than to punt on the problem and allow systems to fail in unexpected ways. The latter might seem like less work at first but will lead to more problems long term. By requiring developers to deal with these limitations directly, they will think through their APIs and business logic more thoroughly and design better interactions with respect to stability, scalability, and performance.

Benchmarking Commit Logs

In this article, we look at Apache Kafka and NATS Streaming, two messaging systems based on the idea of a commit log. We’ll compare some of the features of both but spend less time talking about Kafka since by now it’s quite well known. Similar to previous studies, we’ll attempt to quantify their general performance characteristics through careful benchmarking.

The purpose of this benchmark is to test drive the newly released NATS Streaming system, which was made generally available just in the last few months. NATS Streaming doesn’t yet support clustering, so we try to put its performance into context by looking at a similar configuration of Kafka.

Unlike conventional message queues, commit logs are an append-only data structure. This results in several nice properties like total ordering of messages, at-least-once delivery, and message-replay semantics. Jay Kreps’ blog post The Log is a great introduction to the concept and particularly why it’s so useful in the context of distributed systems and stream processing (his book I Heart Logs is an extended version of the blog post and is a quick read).

Kafka, which originated at LinkedIn, is by far the most popular and most mature implementation of the commit log (AWS offers their own flavor of it called Kinesis, and imitation is the sincerest form of flattery). It’s billed as a “distributed streaming platform for building real-time data pipelines and streaming apps.” The much newer NATS Streaming is actually a data-streaming layer built on top of Apcera’s high-performance publish-subscribe system NATS. It’s billed as “real-time streaming for Big Data, IoT, Mobile, and Cloud Native Applications.” Both have some similarities as well as some key differences.

Fundamental to the notion of a log is a way to globally order events. Neither NATS Streaming nor Kafka are actually a single log but many logs, each totally ordered using a sequence number or offset, respectively.

In Kafka, topics are partitioned into multiple logs which are then replicated across a number of servers for fault tolerance, making it a distributed commit log. Each partition has a server that acts as the leader. Cluster membership and leader election is managed by ZooKeeper.

NATS Streaming’s topics are called “channels” which are globally ordered. Unlike Kafka, NATS Streaming does not support replication or partitioning of channels, though my understanding is clustering support is slated for Q1 2017. Its message store is pluggable, so it can provide durability using a file-backed implementation, like Kafka, or simply an in-memory store.

NATS Streaming is closer to a hybrid of traditional message queues and the commit log. Like Kafka, it allows replaying the log from a specific offset, the beginning of time, or the newest offset, but it also exposes an API for reading from the log at a specific physical time offset, e.g. all messages from the last 30 seconds. Kafka, on the other hand, only has a notion of logical offsets (correction: Kafka added support for offset lookup by timestamp in 0.10.1.0) . Generally, relying on physical time is an anti-pattern in distributed systems due to clock drift and the fact that clocks are not always monotonic. For example, imagine a situation where a NATS Streaming server is restarted and the clock is changed. Messages are still ordered by their sequence numbers but their timestamps might not reflect that. Developers would need to be aware of this while implementing their business logic.

With Kafka, it’s strictly on consumers to track their offset into the log (or the high-level consumer which stores offsets in ZooKeeper (correction: Kafka itself can now store offsets which is used by the new Consumer API, meaning clients do not have to manage offsets directly or rely on ZooKeeper)). NATS Streaming allows clients to either track their sequence number or use a durable subscription, which causes the server to track the last acknowledged message for a client. If the client restarts, the server will resume delivery starting at the earliest unacknowledged message. This is closer to what you would expect from a traditional message-oriented middleware like RabbitMQ.

Lastly, NATS Streaming supports publisher and subscriber rate limiting. This works by configuring the maximum number of in-flight (unacknowledged) messages either from the publisher to the server or from the server to the subscriber. Starting in version 0.9, Kafka supports a similar rate limiting feature that allows producer and consumer byte-rate thresholds to be defined for groups of clients with its Quotas protocol.

Kafka was designed to avoid tracking any client state on the server for performance and scalability reasons. Throughput and storage capacity scale linearly with the number of nodes. NATS Streaming provides some additional features over Kafka at the cost of some added state on the server. Since clustering isn’t supported, there isn’t really any scale or HA story yet, so it’s unclear how that will play out. That said, once replication is supported, there’s a lot of work going into verifying its correctness (which is a major advantage Kafka has).

Benchmarks

Since NATS Streaming does not support replication at this time (0.3.1), we’ll compare running a single instance of it with file-backed persistence to running a single instance of Kafka (0.10.1.0). We’ll look at both latency and throughput running on commodity hardware (m4.xlarge EC2 instances) with load generation and consumption each running on separate instances. In all of these benchmarks, the systems under test have not been tuned at all and are essentially in their “off-the-shelf” configurations.

We’ll first look at latency by publishing messages of various sizes, ranging from 256 bytes to 1MB, at a fixed rate of 50 messages/second for 30 seconds. Message contents are randomized to account for compression. We then plot the latency distribution by percentile on a logarithmic scale from the 0th percentile to the 99.9999th percentile. Benchmarks are run several times in an attempt to produce a “normalized” result. The benchmark code used is open source.

First, to establish a baseline and later get a feel for the overhead added by the file system, we’ll benchmark NATS Streaming with in-memory storage, meaning messages are not written to disk.

Unsurprisingly, the 1MB configuration has much higher latencies than the other configurations, but everything falls within single-digit-millisecond latencies.nats_mem

NATS Streaming 0.3.1 (in-memory persistence)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.3750ms 1.0367ms 1.1257ms 1.1257ms 1.1257ms
1KB 0.38064ms 0.8321ms 1.3260ms 1.3260ms 1.3260ms
5KB 0.4408ms 1.7569ms 2.1465ms 2.1465ms 2.1465ms
1MB 6.6337ms 8.8097ms 9.5263ms 9.5263ms 9.5263ms

Next, we look at NATS Streaming with file-backed persistence. This provides the same durability guarantees as Kafka running with a replication factor of 1. By default, Kafka stores logs under /tmp. Many Unix distributions mount /tmp to tmpfs which appears as a mounted file system but is actually stored in volatile memory. To account for this and provide as level a playing field as possible, we configure NATS Streaming to also store its logs in /tmp.

As expected, latencies increase by about an order of magnitude once we start going to disk.

nats_file_fsync

NATS Streaming 0.3.1 (file-backed persistence)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 21.7051ms 25.0369ms 27.0524ms 27.0524ms 27.0524ms
1KB 20.6090ms 23.8858ms 24.7124ms 24.7124ms 24.7124ms
5KB 22.1692ms 35.7394ms 40.5612ms 40.5612ms 40.5612ms
1MB 45.2490ms 130.3972ms 141.1564ms 141.1564ms 141.1564ms

Since we will be looking at Kafka, there is an important thing to consider relating to fsync behavior. As of version 0.8, Kafka does not call fsync directly and instead relies entirely on the background flush performed by the OS. This is clearly indicated by their documentation:

We recommend using the default flush settings which disable application fsync entirely. This means relying on the background flush done by the OS and Kafka’s own background flush. This provides the best of all worlds for most uses: no knobs to tune, great throughput and latency, and full recovery guarantees. We generally feel that the guarantees provided by replication are stronger than sync to local disk, however the paranoid still may prefer having both and application level fsync policies are still supported.

However, NATS Streaming calls fsync every time a batch is written to disk by default. This can be disabled through the use of the –file_sync flag. By setting this flag to false, we put NATS Streaming’s persistence behavior closer in line with Kafka’s (again assuming a replication factor of 1).

As an aside, the comparison between NATS Streaming and Kafka still isn’t completely “fair”. Jay Kreps points out that Kafka relies on replication as the primary means of durability.

Kafka leaves [fsync] off by default because it relies on replication not fsync for durability, which is generally faster. If you don’t have replication I think you probably need fsync and maybe some kind of high integrity file system.

I don’t think we can provide a truly fair comparison until NATS Streaming supports replication, at which point we will revisit this.

To no one’s surprise, setting –file_sync=false has a significant impact on latency, shown in the distribution below.

nats_file_no_fsync

In fact, it’s now in line with the in-memory performance as before for 256B, 1KB, and 5KB messages, shown in the comparison below.

nats_file_mem

For a reason I have yet to figure out, the latency for 1MB messages is roughly an order of magnitude faster when fsync is enabled after the 95th percentile, which seems counterintuitive. If anyone has an explanation, I would love to hear it. I’m sure there’s a good debug story there. The distribution below shows the 1MB configuration for NATS Streaming with and without fsync enabled and just how big the difference is at the 95th percentile and beyond.

nats_file_mem_1mb

NATS Streaming 0.3.1 (file-backed persistence, –file_sync=false)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.4304ms 0.8577ms 1.0706ms 1.0706ms 1.0706ms
1KB 0.4372ms 1.5987ms 1.8651ms 1.8651ms 1.8651ms
5KB 0.4939ms 2.0828ms 2.2540ms 2.2540ms 2.2540ms
1MB 1296.1464ms 1556.1441ms 1596.1457ms 1596.1457ms 1596.1457ms

Kafka with replication factor 1 tends to have higher latencies than NATS Streaming with –file_sync=false. There was one potential caveat here Ivan Kozlovic pointed out to me in that NATS Streaming uses a caching optimization for reads that may put it at an advantage.

Now, there is one side where NATS Streaming *may* be looking better and not fair to Kafka. By default, the file store keeps everything in memory once stored. This means look-ups will be fast. There is only a all-or-nothing mode right now, which means either cache everything or nothing. With caching disabled (–file_cache=false), every lookup will result in disk access (which when you have 1 to many subscribers will be bad). I am working on changing that. But if you do notice that in Kafka, consuming results in a disk read (given the other default behavior described above, they actually may not ;-)., then you could disable NATS Streaming file caching.

Fortunately, we can verify if Kafka is actually going to disk to read messages back from the log during the benchmark using iostat. We see something like this for the majority of the benchmark duration:

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          13.53    0.00   11.28    0.00    0.00   75.19

Device:    tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn
xvda      0.00         0.00         0.00          0          0

Specifically, we’re interested in Blk_read, which indicates the total number of blocks read. It appears that Kafka does indeed make heavy use of the operating system’s page cache as Blk_wrtn and Blk_read rarely show any activity throughout the entire benchmark. As such, it seems fair to leave NATS Streaming’s –file_cache=true, which is the default.

One interesting point is Kafka offloads much of its caching to the page cache and outside of the JVM heap, clearly in an effort to minimize GC pauses. I’m not clear if the cache Ivan refers to in NATS Streaming is off-heap or not (NATS Streaming is written in Go which, like Java, is a garbage-collected language).

Below is the distribution of latencies for 256B, 1KB, and 5KB configurations in Kafka.

kafka

Similar to NATS Streaming, 1MB message latencies tend to be orders of magnitude worse after about the 80th percentile. The distribution below compares the 1MB configuration for NATS Streaming and Kafka.

nats_kafka_1mb

Kafka 0.10.1.0 (replication factor 1)

 Size 99% 99.9% 99.99% 99.999% 99.9999% 
256B 0.9230ms 1.4575ms 1.6596ms 1.6596ms 1.6596ms
1KB 0.5942ms 1.3123ms 17.6556ms 17.6556ms 17.6556ms
5KB 0.7203ms 5.7236ms 18.9334ms 18.9334ms 18.9334ms
1MB 5337.3174ms 5597.3315ms 5617.3199ms 5617.3199ms 5617.3199ms

The percentile distributions below compare NATS Streaming and Kafka for the 256B, 1KB, and 5KB configurations, respectively.

nats_kafka_256b

nats_kafka_1kb

nats_kafka_5kb

Next, we’ll look at overall throughput for the two systems. This is done by publishing 100,000 messages using the same range of sizes as before and measuring the elapsed time. Specifically, we measure throughput at the publisher and the subscriber.

Despite using an asynchronous publisher in both the NATS Streaming and Kafka benchmarks, we do not consider the publisher “complete” until it has received acks for all published messages from the server. In Kafka, we do this by setting request.required.acks to 1, which means the leader replica has received the data, and consuming the received acks. This is important because the default value is 0, which means the producer never waits for an ack from the broker. In NATS Streaming, we provide an ack callback on every publish. We use the same benchmark configuration as the latency tests, separating load generation and consumption on different EC2 instances. Note the log scale in the following charts.

Once again, we’ll start by looking at NATS Streaming using in-memory persistence. The truncated 1MB send and receive throughputs are 93.01 messages/second.

nats_mem_throughput

For comparison, we now look at NATS Streaming with file persistence and –file_sync=false. As before, this provides the closest behavior to Kafka’s default flush behavior. The second chart shows a side-by-side comparison between NATS Streaming with in-memory and file persistence.

nats_file_throughput

nats_compare_throughput

Lastly, we look at Kafka with replication factor 1. Throughput significantly deteriorates when we set request.required.acks = 1 since the producer must wait for all acks from the server. This is important though because, by default, the client does not require an ack from the server. If this were the case, the producer would have no idea how much data actually reached the server once it finished—it could simply be buffered in the client, in flight over the wire, or in the server but not yet on disk. Running the benchmark with request.required.acks = 0 yields much higher throughput on the sender but is basically an exercise in how fast you can write to a channel using the Sarama Go client—slightly misleading.

kafka_throughput

Looking at some comparisons of Kafka and NATS Streaming, we can see that NATS Streaming has higher throughput in all but a few cases.

nats_kafka_throughput

nats_kafka_send_throughput

I want to repeat the disclaimer from before: the purpose of this benchmark is to test drive the newly released NATS Streaming system (which as mentioned earlier, doesn’t yet support clustering), and put its performance into context by looking at a similar configuration of Kafka.

Kafka generally scales very well, so measuring the throughput of a single broker with a single producer and single consumer isn’t particularly meaningful. In reality, we’d be running a cluster with several brokers and partitioning our topics across them.

For as young as it is, NATS Streaming has solid performance (which shouldn’t come as much of a surprise considering the history of NATS itself), and I imagine it will only get better with time as the NATS team continues to optimize. In some ways, NATS Streaming bridges the gap between the commit log as made popular by Kafka and the conventional message queue as made popular by protocols like JMS, AMQP, STOMP, and the like.

The bigger question at this point is how NATS Streaming will tackle scaling and replication (a requirement for true production-readiness in my opinion). Kafka was designed from the ground up for high scalability and availability through the use of external coordination (read ZooKeeper). Naturally, there is a lot of complexity and cost that comes with that. NATS Streaming attempts to keep NATS’ spirit of simplicity, but it’s yet to be seen how it will reconcile that with the complex nature of distributed systems. I’m excited to see where Apcera takes NATS Streaming and generally the NATS ecosystem in the future since the team has a lot of experience in this area.

So You Wanna Go Fast?

I originally proposed this as a GopherCon talk on writing “high-performance Go”, which is why it may seem rambling, incoherent, and—at times—not at all related to Go. The talk was rejected (probably because of the rambling and incoherence), but I still think it’s a subject worth exploring. The good news is, since it was rejected, I can take this where I want. The remainder of this piece is mostly the outline of that talk with some parts filled in, some meandering stories which may or may not pertain to the topic, and some lessons learned along the way. I think it might make a good talk one day, but this will have to do for now.

We work on some interesting things at Workiva—graph traversal, distributed and in-memory calculation engines, low-latency messaging systems, databases optimized for two-dimensional data computation. It turns out, when you want to build a complicated financial-reporting suite with the simplicity and speed of Microsoft Office, and put it entirely in the cloud, you can’t really just plumb some crap together and call it good. It also turns out that when you try to do this, performance becomes kind of important, not because of the complexity of the data—after all, it’s mostly just numbers and formulas—but because of the scale of it. Now, distribute that data in the cloud, consider the security and compliance implications associated with it, add in some collaboration and control mechanisms, and you’ve got yourself some pretty monumental engineering problems.

As I hinted at, performance starts to be really important, whether it’s performing a formula evaluation, publishing a data-change event, or opening up a workbook containing a million rows of data (accountants are weird). A lot of the backend systems powering all of this are, for better or worse, written in Go. Go is, of course, a garbage-collected language, and it compares closely to Java (though the latter has over 20 years invested in it, while the former has about seven).

At this point, you might be asking, “why not C?” It’s honestly a good question to ask, but the reality is there is always history. The first solution was written in Python on Google App Engine (something about MVPs, setting your customers’ expectations low, and giving yourself room to improve?). This was before Go was even a thing, though Java and C were definitely things, but this was a startup. And it was Python. And it was on App Engine. I don’t know exactly what led to those combination of things—I wasn’t there—but, truthfully, App Engine probably played a large role in the company’s early success. Python and App Engine were fast. Not like “this code is fucking fast” fast—what we call performance—more like “we need to get this shit working so we have jobs tomorrow” fast—what we call delivery. I don’t envy that kind of fast, but when you’re a startup trying to disrupt, speed to market matters a hell of a lot more than the speed of your software.

I’ve talked about App Engine at length before. Ultimately, you hit the ceiling of what you can do with it, and you have to migrate off (if you’re a business that is trying to grow, anyway). We hit that migration point at a really weird, uncomfortable time. This was right when Docker was starting to become a thing, and microservices were this thing that everybody was talking about but nobody was doing. Google had been successfully using containers for years, and Netflix was all about microservices. Everybody wanted to be like them, but no one really knew how—but it was the future (unikernels are the new future, by the way).

The problem is—coming from a PaaS like App Engine that does your own laundry—you don’t have the tools, skills, or experience needed to hit the ground running, so you kind of drunkenly stumble your way there. You don’t even have a DevOps team because you didn’t need one! Nobody knew how to use Docker, which is why at the first Dockercon, five people got on stage and presented five solutions to the same problem. It was the blind leading the blind. I love this article by Jesper L. Andersen, How to build stable systems, which contains a treasure trove of practical engineering tips. The very last paragraph of the article reads:

Docker is not mature (Feb 2016). Avoid it in production for now until it matures. Currently Docker is a time sink not fulfilling its promises. This will change over time, so know when to adopt it.

Trying to build microservices using Docker while everyone is stumbling over themselves was, and continues to be, a painful process, exacerbated by the heavy weight suddenly lifted by leaving App Engine. It’s not great if you want to go fast. App Engine made scaling easy by restricting you in what you could do, but once that burden was removed, it was off to the races. What people might not have realized, however, was that App Engine also made distributed systems easy by restricting you in what you could do. Some seem to think the limitations enforced by App Engine are there to make their lives harder or make Google richer (trust me, they’d bill you more if they could), so why would we have similar limitations in our own infrastructure? App Engine makes these limitations, of course, so that it can actually scale. Don’t take that for granted.

App Engine was stateless, so the natural tendency once you’re off it was to make everything stateful. And we did. What I don’t think we realized was that we were, in effect, trading one type of fast for the other—performance for delivery. We can build software that’s fast and runs on your desktop PC like in the 90’s, but now you want to put that in the cloud and make it scale? It takes a big infrastructure investment. It also takes a big time investment. Neither of which are good if you want to go fast, especially when you’re using enough microservices, Docker, and Go to rattle the Hacker News fart chamber. You kind of get caught in this endless rut of innovation that you almost lose your balance. Leaving the statelessness of App Engine for more stateful pastures was sort of like an infant learning to walk. You look down and it dawns on you—you have legs! So you run with it, because that’s amazing, and you stumble spectacularly a few times along the way. Finally, you realize maybe running full speed isn’t the best idea for someone who just learned to walk.

We were also making this transition while Go had started reaching critical mass. Every other headline in the tech aggregators was “why we switched to Go and you should too.” And we did. I swear this post has a point.

Tips for Writing High-Performance Go

By now, I’ve forgotten what I was writing about, but I promised this post was about Go. It is, and it’s largely about performance fast, not delivery fast—the two are often at odds with each other. Everything up until this point was mostly just useless context and ranting. But it also shows you that we are solving some hard problems and why we are where we are. There is always history.

I work with a lot of smart people. Many of us have a near obsession with performance, but the point I was attempting to make earlier is we’re trying to push the boundaries of what you can expect from cloud software. App Engine had some rigid boundaries, so we made a change. Since adopting Go, we’ve learned a lot about how to make things fast and how to make Go work in the world of systems programming.

Go’s simplicity and concurrency model make it an appealing choice for backend systems, but the larger question is how does it fare for latency-sensitive applications? Is it worth sacrificing the simplicity of the language to make it faster? Let’s walk through a few areas of performance optimization in Go—namely language features, memory management, and concurrency—and try to make that determination. All of the code for the benchmarks presented here are available on GitHub.

Channels

Channels in Go get a lot of attention because they are a convenient concurrency primitive, but it’s important to be aware of their performance implications. Usually the performance is “good enough” for most cases, but in certain latency-critical situations, they can pose a bottleneck. Channels are not magic. Under the hood, they are just doing locking. This works great in a single-threaded application where there is no lock contention, but in a multithreaded environment, performance significantly degrades. We can mimic a channel’s semantics quite easily using a lock-free ring buffer.

The first benchmark looks at the performance of a single-item-buffered channel and ring buffer with a single producer and single consumer. First, we look at the performance in the single-threaded case (GOMAXPROCS=1).

BenchmarkChannel 3000000 512 ns/op
BenchmarkRingBuffer 20000000 80.9 ns/op

As you can see, the ring buffer is roughly six times faster (if you’re unfamiliar with Go’s benchmarking tool, the first number next to the benchmark name indicates the number of times the benchmark was run before giving a stable result). Next, we look at the same benchmark with GOMAXPROCS=8.

BenchmarkChannel-8 3000000 542 ns/op
BenchmarkRingBuffer-8 10000000 182 ns/op

The ring buffer is almost three times faster.

Channels are often used to distribute work across a pool of workers. In this benchmark, we look at performance with high read contention on a buffered channel and ring buffer. The GOMAXPROCS=1 test shows how channels are decidedly better for single-threaded systems.

BenchmarkChannelReadContention 10000000 148 ns/op
BenchmarkRingBufferReadContention 10000 390195 ns/op

However, the ring buffer is faster in the multithreaded case:

BenchmarkChannelReadContention-8 1000000 3105 ns/op
BenchmarkRingBufferReadContention-8 3000000 411 ns/op

Lastly, we look at performance with contention on both the reader and writer. Again, the ring buffer’s performance is much worse in the single-threaded case but better in the multithreaded case.

BenchmarkChannelContention 10000 160892 ns/op
BenchmarkRingBufferContention 2 806834344 ns/op
BenchmarkChannelContention-8 5000 314428 ns/op
BenchmarkRingBufferContention-8 10000 182557 ns/op

The lock-free ring buffer achieves thread safety using only CAS operations. We can see that deciding to use it over the channel depends largely on the number of OS threads available to the program. For most systems, GOMAXPROCS > 1, so the lock-free ring buffer tends to be a better option when performance matters. Channels are a rather poor choice for performant access to shared state in a multithreaded system.

Defer

Defer is a useful language feature in Go for readability and avoiding bugs related to releasing resources. For example, when we open a file to read, we need to be careful to close it when we’re done. Without defer, we need to ensure the file is closed at each exit point of the function.

This is really error-prone since it’s easy to miss a return point. Defer solves this problem by effectively adding the cleanup code to the stack and invoking it when the enclosing function returns.

At first glance, one would think defer statements could be completely optimized away by the compiler. If I defer something at the beginning of a function, simply insert the closure at each point the function returns. However, it’s more complicated than this. For example, we can defer a call within a conditional statement or a loop. The first case might require the compiler to track the condition leading to the defer. The compiler would also need to be able to determine if a statement can panic since this is another exit point for a function. Statically proving this seems to be, at least on the surface, an undecidable problem.

The point is defer is not a zero-cost abstraction. We can benchmark it to show the performance overhead. In this benchmark, we compare locking a mutex and unlocking it with a defer in a loop to locking a mutex and unlocking it without defer.

BenchmarkMutexDeferUnlock-8 20000000 96.6 ns/op
BenchmarkMutexUnlock-8 100000000 19.5 ns/op

Using defer is almost five times slower in this test. To be fair, we’re looking at a difference of 77 nanoseconds, but in a tight loop on a critical path, this adds up. One trend you’ll notice with these optimizations is it’s usually up to the developer to make a trade-off between performance and readability. Optimization rarely comes free.

Reflection and JSON

Reflection is generally slow and should be avoided for latency-sensitive applications. JSON is a common data-interchange format, but Go’s encoding/json package relies on reflection to marshal and unmarshal structs. With ffjson, we can use code generation to avoid reflection and benchmark the difference.

BenchmarkJSONReflectionMarshal-8 200000 7063 ns/op
BenchmarkJSONMarshal-8 500000 3981 ns/op

BenchmarkJSONReflectionUnmarshal-8 200000 9362 ns/op
BenchmarkJSONUnmarshal-8 300000 5839 ns/op

Code-generated JSON is about 38% faster than the standard library’s reflection-based implementation. Of course, if we’re concerned about performance, we should really avoid JSON altogether. MessagePack is a better option with serialization code that can also be generated. In this benchmark, we use the msgp library and compare its performance to JSON.

BenchmarkMsgpackMarshal-8 3000000 555 ns/op
BenchmarkJSONReflectionMarshal-8 200000 7063 ns/op
BenchmarkJSONMarshal-8 500000 3981 ns/op

BenchmarkMsgpackUnmarshal-8 20000000 94.6 ns/op
BenchmarkJSONReflectionUnmarshal-8 200000 9362 ns/op
BenchmarkJSONUnmarshal-8 300000 5839 ns/op

The difference here is dramatic. Even when compared to the generated JSON serialization code, MessagePack is significantly faster.

If we’re really trying to micro-optimize, we should also be careful to avoid using interfaces, which have some overhead not just with marshaling but also method invocations. As with other kinds of dynamic dispatch, there is a cost of indirection when performing a lookup for the method call at runtime. The compiler is unable to inline these calls.

BenchmarkJSONReflectionUnmarshal-8 200000 9362 ns/op
BenchmarkJSONReflectionUnmarshalIface-8 200000 10099 ns/op

We can also look at the overhead of the invocation lookup, I2T, which converts an interface to its backing concrete type. This benchmark calls the same method on the same struct. The difference is the second one holds a reference to an interface which the struct implements.

BenchmarkStructMethodCall-8 2000000000 0.44 ns/op
BenchmarkIfaceMethodCall-8 1000000000 2.97 ns/op

Sorting is a more practical example that shows the performance difference. In this benchmark, we compare sorting a slice of 1,000,000 structs and 1,000,000 interfaces backed by the same struct. Sorting the structs is nearly 92% faster than sorting the interfaces.

BenchmarkSortStruct-8 10 105276994 ns/op
BenchmarkSortIface-8 5 286123558 ns/op

To summarize, avoid JSON if possible. If you need it, generate the marshaling and unmarshaling code. In general, it’s best to avoid code that relies on reflection and interfaces and instead write code that uses concrete types. Unfortunately, this often leads to a lot of duplicated code, so it’s best to abstract this with code generation. Once again, the trade-off manifests.

Memory Management

Go doesn’t actually expose heap or stack allocation directly to the user. In fact, the words “heap” and “stack” do not appear anywhere in the language specification. This means anything pertaining to the stack and heap are technically implementation-dependent. In practice, of course, Go does have a stack per goroutine and a heap. The compiler does escape analysis to determine if an object can live on the stack or needs to be allocated in the heap.

Unsurprisingly, avoiding heap allocations can be a major area of optimization. By allocating on the stack, we avoid expensive malloc calls, as the benchmark below shows.

BenchmarkAllocateHeap-8 20000000 62.3 ns/op 96 B/op 1 allocs/op
BenchmarkAllocateStack-8 100000000 11.6 ns/op 0 B/op 0 allocs/op

Naturally, passing by reference is faster than passing by value since the former requires copying only a pointer while the latter requires copying values. The difference is negligible with the struct used in these benchmarks, though it largely depends on what has to be copied. Keep in mind there are also likely some compiler optimizations being performed in this synthetic benchmark.

BenchmarkPassByReference-8 1000000000 2.35 ns/op
BenchmarkPassByValue-8 200000000 6.36 ns/op

However, the larger issue with heap allocation is garbage collection. If we’re creating lots of short-lived objects, we’ll cause the GC to thrash. Object pooling becomes quite important in these scenarios. In this benchmark, we compare allocating structs in 10 concurrent goroutines on the heap vs. using a sync.Pool for the same purpose. Pooling yields a 5x improvement.

BenchmarkConcurrentStructAllocate-8 5000000 337 ns/op
BenchmarkConcurrentStructPool-8 20000000 65.5 ns/op

It’s important to point out that Go’s sync.Pool is drained during garbage collection. The purpose of sync.Pool is to reuse memory between garbage collections. One can maintain their own free list of objects to hold onto memory across garbage collection cycles, though this arguably subverts the purpose of a garbage collector. Go’s pprof tool is extremely useful for profiling memory usage. Use it before blindly making memory optimizations.

False Sharing

When performance really matters, you have to start thinking at the hardware level. Formula One driver Jackie Stewart is famous for once saying, “You don’t have to be an engineer to be be a racing driver, but you do have to have mechanical sympathy.” Having a deep understanding of the inner workings of a car makes you a better driver. Likewise, having an understanding of how a computer actually works makes you a better programmer. For example, how is memory laid out? How do CPU caches work? How do hard disks work?

Memory bandwidth continues to be a limited resource in modern CPU architectures, so caching becomes extremely important to prevent performance bottlenecks. Modern multiprocessor CPUs cache data in small lines, typically 64 bytes in size, to avoid expensive trips to main memory. A write to a piece of memory will cause the CPU cache to evict that line to maintain cache coherency. A subsequent read on that address requires a refresh of the cache line. This is a phenomenon known as false sharing, and it’s especially problematic when multiple processors are accessing independent data in the same cache line.

Imagine a struct in Go and how it’s laid out in memory. Let’s use the ring buffer from earlier as an example. Here’s what that struct might normally look like:

The queue and dequeue fields are used to determine producer and consumer positions, respectively. These fields, which are both eight bytes in size, are concurrently accessed and modified by multiple threads to add and remove items from the queue. Since these two fields are positioned contiguously in memory and occupy only 16 bytes of memory, it’s likely they will stored in a single CPU cache line. Therefore, writing to one will result in evicting the other, meaning a subsequent read will stall. In more concrete terms, adding or removing things from the ring buffer will cause subsequent operations to be slower and will result in lots of thrashing of the CPU cache.

We can modify the struct by adding padding between fields. Each padding is the width of a single CPU cache line to guarantee the fields end up in different lines. What we end up with is the following:

How big a difference does padding out CPU cache lines actually make? As with anything, it depends. It depends on the amount of multiprocessing. It depends on the amount of contention. It depends on memory layout. There are many factors to consider, but we should always use data to back our decisions. We can benchmark operations on the ring buffer with and without padding to see what the difference is in practice.

First, we benchmark a single producer and single consumer, each running in a goroutine. With this test, the improvement between padded and unpadded is fairly small, about 15%.

BenchmarkRingBufferSPSC-8 10000000 156 ns/op
BenchmarkRingBufferPaddedSPSC-8 10000000 132 ns/op

However, when we have multiple producers and multiple consumers, say 100 each, the difference becomes slightly more pronounced. In this case, the padded version is about 36% faster.

BenchmarkRingBufferMPMC-8 100000 27763 ns/op
BenchmarkRingBufferPaddedMPMC-8 100000 17860 ns/op

False sharing is a very real problem. Depending on the amount of concurrency and memory contention, it can be worth introducing padding to help alleviate its effects. These numbers might seem negligible, but they start to add up, particularly in situations where every clock cycle counts.

Lock-Freedom

Lock-free data structures are important for fully utilizing multiple cores. Considering Go is targeted at highly concurrent use cases, it doesn’t offer much in the way of lock-freedom. The encouragement seems to be largely directed towards channels and, to a lesser extent, mutexes.

That said, the standard library does offer the usual low-level memory primitives with the atomic package. Compare-and-swap, atomic pointer access—it’s all there. However, use of the atomic package is heavily discouraged:

We generally don’t want sync/atomic to be used at all…Experience has shown us again and again that very very few people are capable of writing correct code that uses atomic operations…If we had thought of internal packages when we added the sync/atomic package, perhaps we would have used that. Now we can’t remove the package because of the Go 1 guarantee.

How hard can lock-free really be though? Just rub some CAS on it and call it a day, right? After a sufficient amount of hubris, I’ve come to learn that it’s definitely a double-edged sword. Lock-free code can get complicated in a hurry. The atomic and unsafe packages are not easy to use, at least not at first. The latter gets its name for a reason. Tread lightly—this is dangerous territory. Even more so, writing lock-free algorithms can be tricky and error-prone. Simple lock-free data structures, like the ring buffer, are pretty manageable, but anything more than that starts to get hairy.

The Ctrie, which I wrote about in detail, was my foray into the world of lock-free data structures beyond your standard fare of queues and lists. Though the theory is reasonably understandable, the implementation is thoroughly complex. In fact, the complexity largely stems from the lack of a native double compare-and-swap, which is needed to atomically compare indirection nodes (to detect mutations on the tree) and node generations (to detect snapshots taken of the tree). Since no hardware provides such an operation, it has to be simulated using standard primitives (and it can).

The first Ctrie implementation was actually horribly broken, and not even because I was using Go’s synchronization primitives incorrectly. Instead, I had made an incorrect assumption about the language. Each node in a Ctrie has a generation associated with it. When a snapshot is taken of the tree, its root node is copied to a new generation. As nodes in the tree are accessed, they are lazily copied to the new generation (à la persistent data structures), allowing for constant-time snapshotting. To avoid integer overflow, we use objects allocated on the heap to demarcate generations. In Go, this is done using an empty struct. In Java, two newly constructed Objects are not equivalent when compared since their memory addresses will be different. I made a blind assumption that the same was true in Go, when in fact, it’s not. Literally the last paragraph of the Go language specification reads:

A struct or array type has size zero if it contains no fields (or elements, respectively) that have a size greater than zero. Two distinct zero-size variables may have the same address in memory.

Oops. The result was that two different generations were considered equivalent, so the double compare-and-swap always succeeded. This allowed snapshots to potentially put the tree in an inconsistent state. That was a fun bug to track down. Debugging highly concurrent, lock-free code is hell. If you don’t get it right the first time, you’ll end up sinking a ton of time into fixing it, but only after some really subtle bugs crop up. And it’s unlikely you get it right the first time. You win this time, Ian Lance Taylor.

But wait! Obviously there’s some payoff with complicated lock-free algorithms or why else would one subject themselves to this? With the Ctrie, lookup performance is comparable to a synchronized map or a concurrent skip list. Inserts are more expensive due to the increased indirection. The real benefit of the Ctrie is its scalability in terms of memory consumption, which, unlike most hash tables, is always a function of the number of keys currently in the tree. The other advantage is its ability to perform constant-time, linearizable snapshots. We can compare performing a “snapshot” on a synchronized map concurrently in 100 different goroutines with the same test using a Ctrie:

BenchmarkConcurrentSnapshotMap-8 1000 9941784 ns/op
BenchmarkConcurrentSnapshotCtrie-8 20000 90412 ns/op

Depending on access patterns, lock-free data structures can offer better performance in multithreaded systems. For example, the NATS message bus uses a synchronized map-based structure to perform subscription matching. When compared with a Ctrie-inspired, lock-free structure, throughput scales a lot better. The blue line is the lock-based data structure, while the red line is the lock-free implementation.

matchbox_bench_1_1

Avoiding locks can be a boon depending on the situation. The advantage was apparent when comparing the ring buffer to the channel. Nonetheless, it’s important to weigh any benefit against the added complexity of the code. In fact, sometimes lock-freedom doesn’t provide any tangible benefit at all!

A Note on Optimization

As we’ve seen throughout this post, performance optimization almost always comes with a cost. Identifying and understanding optimizations themselves is just the first step. What’s more important is understanding when and where to apply them. The famous quote by C. A. R. Hoare, popularized by Donald Knuth, has become a longtime adage of programmers:

The real problem is that programmers have spent far too much time worrying about efficiency in the wrong places and at the wrong times; premature optimization is the root of all evil (or at least most of it) in programming.

Though the point of this quote is not to eliminate optimization altogether, it’s to learn how to strike a balance between speeds—speed of an algorithm, speed of delivery, speed of maintenance, speed of a system. It’s a highly subjective topic, and there is no single rule of thumb. Is premature optimization the root of all evil? Should I just make it work, then make it fast? Does it need to be fast at all? These are not binary decisions. For example, sometimes making it work then making it fast is impossible if there is a fundamental problem in the design.

However, I will say focus on optimizing along the critical path and outward from that only as necessary. The further you get from that critical path, the more likely your return on investment is to diminish and the more time you end up wasting. It’s important to identify what adequate performance is. Do not spend time going beyond that point. This is an area where data-driven decisions are key—be empirical, not impulsive. More important, be practical. There’s no use shaving nanoseconds off of an operation if it just doesn’t matter. There is more to going fast than fast code.

Wrapping Up

If you’ve made it this far, congratulations, there might be something wrong with you. We’ve learned that there are really two kinds of fast in software—delivery and performance.  Customers want the first, developers want the second, and CTOs want both. The first is by far the most important, at least when you’re trying to go to market. The second is something you need to plan for and iterate on. Both are usually at odds with each other.

Perhaps more interestingly, we looked at a few ways we can eke out that extra bit of performance in Go and make it viable for low-latency systems. The language is designed to be simple, but that simplicity can sometimes come at a price. Like the trade-off between the two fasts, there is a similar trade-off between code lifecycle and code performance. Speed comes at the cost of simplicity, at the cost of development time, and at the cost of continued maintenance. Choose wisely.

From the Ground Up: Reasoning About Distributed Systems in the Real World

The rabbit hole is deep. Down and down it goes. Where it ends, nobody knows. But as we traverse it, patterns appear. They give us hope, they quell the fear.

Distributed systems literature is abundant, but as a practitioner, I often find it difficult to know where to start or how to synthesize this knowledge without a more formal background. This is a non-academic’s attempt to provide a line of thought for rationalizing design decisions. This piece doesn’t necessarily contribute any new ideas but rather tries to provide a holistic framework by studying some influential existing ones. It includes references which provide a good starting point for thinking about distributed systems. Specifically, we look at a few formal results and slightly less formal design principles to provide a basis from which we can argue about system design.

This is your last chance. After this, there is no turning back. I wish I could say there is no red-pill/blue-pill scenario at play here, but the world of distributed systems is complex. In order to make sense of it, we reason from the ground up while simultaneously stumbling down the deep and cavernous rabbit hole.

Guiding Principles

In order to reason about distributed system design, it’s important to lay out some guiding principles or theorems used to establish an argument. Perhaps the most fundamental of which is the Two Generals Problem originally introduced by Akkoyunlu et al. in Some Constraints and Trade-offs in the Design of Network Communications and popularized by Jim Gray in Notes on Data Base Operating Systems in 1975 and 1978, respectively. The Two Generals Problem demonstrates that it’s impossible for two processes to agree on a decision over an unreliable network. It’s closely related to the binary consensus problem (“attack” or “don’t attack”) where the following conditions must hold:

  • Termination: all correct processes decide some value (liveness property).
  • Validity: if all correct processes decide v, then v must have been proposed by some correct process (non-triviality property).
  • Integrity: all correct processes decide at most one value v, and is the “right” value (safety property).
  • Agreement: all correct processes must agree on the same value (safety property).

It becomes quickly apparent that any useful distributed algorithm consists of some intersection of both liveness and safety properties. The problem becomes more complicated when we consider an asynchronous network with crash failures:

  • Asynchronous: messages may be delayed arbitrarily long but will eventually be delivered.
  • Crash failure: processes can halt indefinitely.

Considering this environment actually leads us to what is arguably one of the most important results in distributed systems theory: the FLP impossibility result introduced by Fischer, Lynch, and Patterson in their 1985 paper Impossibility of Distributed Consensus with One Faulty Process. This result shows that the Two Generals Problem is provably impossible. When we do not consider an upper bound on the time a process takes to complete its work and respond in a crash-failure model, it’s impossible to make the distinction between a process that is crashed and one that is taking a long time to respond. FLP shows there is no algorithm which deterministically solves the consensus problem in an asynchronous environment when it’s possible for at least one process to crash. Equivalently, we say it’s impossible to have a perfect failure detector in an asynchronous system with crash failures.

When talking about fault-tolerant systems, it’s also important to consider Byzantine faults, which are essentially arbitrary faults. These include, but are not limited to, attacks which might try to subvert the system. For example, a security attack might try to generate or falsify messages. The Byzantine Generals Problem is a generalized version of the Two Generals Problem which describes this fault model. Byzantine fault tolerance attempts to protect against these threats by detecting or masking a bounded number of Byzantine faults.

Why do we care about consensus? The reason is it’s central to so many important problems in system design. Leader election implements consensus allowing you to dynamically promote a coordinator to avoid single points of failure. Distributed databases implement consensus to ensure data consistency across nodes. Message queues implement consensus to provide transactional or ordered delivery. Distributed init systems implement consensus to coordinate processes. Consensus is fundamentally an important problem in distributed programming.

It has been shown time and time again that networks, whether local-area or wide-area, are often unreliable and largely asynchronous. As a result, these proofs impose real and significant challenges to system design.

The implications of these results are not simply academic: these impossibility results have motivated a proliferation of systems and designs offering a range of alternative guarantees in the event of network failures.

L. Peter Deutsch’s fallacies of distributed computing are a key jumping-off point in the theory of distributed systems. It presents a set of incorrect assumptions which many new to the space frequently make, of which the first is “the network is reliable.”

  1. The network is reliable.
  2. Latency is zero.
  3. Bandwidth is infinite.
  4. The network is secure.
  5. Topology doesn’t change.
  6. There is one administrator.
  7. Transport cost is zero.
  8. The network is homogeneous.

The CAP theorem, while recently the subject of scrutiny and debate over whether it’s overstated or not, is a useful tool for establishing fundamental trade-offs in distributed systems and detecting vendor sleight of hand. Gilbert and Lynch’s Perspectives on the CAP Theorem lays out the intrinsic trade-off between safety and liveness in a fault-prone system, while Fox and Brewer’s Harvest, Yield, and Scalable Tolerant Systems characterizes it in a more pragmatic light. I will continue to say unequivocally that the CAP theorem is important within the field of distributed systems and of significance to system designers and practitioners.

A Renewed Hope

Following from the results detailed earlier would imply many distributed algorithms, including those which implement linearizable operations, serializable transactions, and leader election, are a hopeless endeavor. Is it game over? Fortunately, no. Carefully designed distributed systems can maintain correctness without relying on pure coincidence.

First, it’s important to point out that the FLP result does not indicate consensus is unreachable, just that it’s not always reachable in bounded time. Second, the system model FLP uses is, in some ways, a pathological one. Synchronous systems place a known upper bound on message delivery between processes and on process computation. Asynchronous systems have no fixed upper bounds. In practice, systems tend to exhibit partial synchrony, which is described as one of two models by Dwork and Lynch in Consensus in the Presence of Partial Synchrony. In the first model of partial synchrony, fixed bounds exist but they are not known a priori. In the second model, the bounds are known but are only guaranteed to hold starting at unknown time T. Dwork and Lynch present fault-tolerant consensus protocols for both partial-synchrony models combined with various fault models.

Chandra and Toueg introduce the concept of unreliable failure detectors in Unreliable Failure Detectors for Reliable Distributed Systems. Each process has a local, external failure detector which can make mistakes. The detector monitors a subset of the processes in the system and maintains a list of those it suspects to have crashed. Failures are detected by simply pinging each process periodically and suspecting any process which doesn’t respond to the ping within twice the maximum round-trip time for any previous ping. The detector makes a mistake when it erroneously suspects a correct process, but it may later correct the mistake by removing the process from its list of suspects. The presence of failure detectors, even unreliable ones, makes consensus solvable in a slightly relaxed system model.

While consensus ensures processes agree on a value, atomic broadcast ensures processes deliver the same messages in the same order. This same paper shows that the problems of consensus and atomic broadcast are reducible to each other, meaning they are equivalent. Thus, the FLP result and others apply equally to atomic broadcast, which is used in coordination services like Apache ZooKeeper.

In Introduction to Reliable and Secure Distributed Programming, Cachin, Guerraoui, and Rodrigues suggest most practical systems can be described as partially synchronous:

Generally, distributed systems appear to be synchronous. More precisely, for most systems that we know of, it is relatively easy to define physical time bounds that are respected most of the time. There are, however, periods where the timing assumptions do not hold, i.e., periods during which the system is asynchronous. These are periods where the network is overloaded, for instance, or some process has a shortage of memory that slows it down. Typically, the buffer that a process uses to store incoming and outgoing messages may overflow, and messages may thus get lost, violating the time bound on the delivery. The retransmission of the messages may help ensure the reliability of the communication links but introduce unpredictable delays. In this sense, practical systems are partially synchronous.

We capture partial synchrony by assuming timing assumptions only hold eventually without stating exactly when. Similarly, we call the system eventually synchronous. However, this does not guarantee the system is synchronous forever after a certain time, nor does it require the system to be initially asynchronous then after a period of time become synchronous. Instead it implies the system has periods of asynchrony which are not bounded, but there are periods where the system is synchronous long enough for an algorithm to do something useful or terminate. The key thing to remember with asynchronous systems is that they contain no timing assumptions.

Lastly, On the Minimal Synchronism Needed for Distributed Consensus by Dolev, Dwork, and Stockmeyer describes a consensus protocol as t-resilient if it operates correctly when at most t processes fail. In the paper, several critical system parameters and synchronicity conditions are identified, and it’s shown how varying them affects the t-resiliency of an algorithm. Consensus is shown to be provably possible for some models and impossible for others.

Fault-tolerant consensus is made possible by relying on quorums. The intuition is that as long as a majority of processes agree on every decision, there is at least one process which knows about the complete history in the presence of faults.

Deterministic consensus, and by extension a number of other useful algorithms, is impossible in certain system models, but we can model most real-world systems in a way that circumvents this. Nevertheless, it shows the inherent complexities involved with distributed systems and the rigor needed to solve certain problems.

Theory to Practice

What does all of this mean for us in practice? For starters, it means distributed systems are usually a harder problem than they let on. Unfortunately, this is often the cause of improperly documented trade-offs or, in many cases, data loss and safety violations. It also suggests we need to rethink the way we design systems by shifting the focus from system properties and guarantees to business rules and application invariants.

One of my favorite papers is End-To-End Arguments in System Design by Saltzer, Reed, and Clark. It’s an easy read, but it presents a compelling design principle for determining where to place functionality in a distributed system. The principle idea behind the end-to-end argument is that functions placed at a low level in a system may be redundant or of little value when compared to the cost of providing them at that low level. It follows that, in many situations, it makes more sense to flip guarantees “inside out”—pushing them outwards rather than relying on subsystems, middleware, or low-level layers of the stack to maintain them.

To illustrate this, we consider the problem of “careful file transfer.” A file is stored by a file system on the disk of computer A, which is linked by a communication network to computer B. The goal is to move the file from computer A’s storage to computer B’s storage without damage and in the face of various failures along the way. The application in this case is the file-transfer program which relies on storage and network abstractions. We can enumerate just a few of the potential problems an application designer might be concerned with:

  1. The file, though originally written correctly onto the disk at host A, if read now may contain incorrect data, perhaps because of hardware faults in the disk storage system.
  2. The software of the file system, the file transfer program, or the data communication system might make a mistake in buffering and copying the data of the file, either at host A or host B.
  3. The hardware processor or its local memory might have a transient error while doing the buffering and copying, either at host A or host B.
  4. The communication system might drop or change the bits in a packet, or lose a packet or deliver a packet more than once.
  5. Either of the hosts may crash part way through the transaction after performing an unknown amount (perhaps all) of the transaction.

Many of these problems are Byzantine in nature. When we consider each threat one by one, it becomes abundantly clear that even if we place countermeasures in the low-level subsystems, there will still be checks required in the high-level application. For example, we might place checksums, retries, and sequencing of packets in the communication system to provide reliable data transmission, but this really only eliminates threat four. An end-to-end checksum and retry mechanism at the file-transfer level is needed to guard against the remaining threats.

Building reliability into the low level has a number of costs involved. It takes a non-trivial amount of effort to build it. It’s redundant and, in fact, hinders performance by reducing the frequency of application retries and adding unneeded overhead. It also has no actual effect on correctness because correctness is determined and enforced by the end-to-end checksum and retries. The reliability and correctness of the communication system is of little importance, so going out of its way to ensure resiliency does not reduce any burden on the application. In fact, ensuring correctness by relying on the low level might be altogether impossible since threat number two requires writing correct programs, but not all programs involved may be written by the file-transfer application programmer.

Fundamentally, there are two problems with placing functionality at the lower level. First, the lower level is not aware of the application needs or semantics, which means logic placed there is often insufficient. This leads to duplication of logic as seen in the example earlier. Second, other applications which rely on the lower level pay the cost of the added functionality even when they don’t necessarily need it.

Saltzer, Reed, and Clark propose the end-to-end principle as a sort of “Occam’s razor” for system design, arguing that it helps guide the placement of functionality and organization of layers in a system.

Because the communication subsystem is frequently specified before applications that use the subsystem are known, the designer may be tempted to “help” the users by taking on more function than necessary. Awareness of end-to end arguments can help to reduce such temptations.

However, it’s important to note that the end-to-end principle is not a panacea. Rather, it’s a guideline to help get designers to think about their solutions end to end, acknowledge their application requirements, and consider their failure modes. Ultimately, it provides a rationale for moving function upward in a layered system, closer to the application that uses the function, but there are always exceptions to the rule. Low-level mechanisms might be built as a performance optimization. Regardless, the end-to-end argument contends that lower levels should avoid taking on any more responsibility than necessary. The “lessons” section from Google’s Bigtable paper echoes some of these same sentiments:

Another lesson we learned is that it is important to delay adding new features until it is clear how the new features will be used. For example, we initially planned to support general-purpose transactions in our API. Because we did not have an immediate use for them, however, we did not implement them. Now that we have many real applications running on Bigtable, we have been able to examine their actual needs, and have discovered that most applications require only single-row transactions. Where people have requested distributed transactions, the most important use is for maintaining secondary indices, and we plan to add a specialized mechanism to satisfy this need. The new mechanism will be less general than distributed transactions, but will be more efficient (especially for updates that span hundreds of rows or more) and will also interact better with our scheme for optimistic cross-datacenter replication.

We’ll see the end-to-end argument as a common theme throughout the remainder of this piece.

Whose Guarantee Is It Anyway?

Generally, we rely on robust algorithms, transaction managers, and coordination services to maintain consistency and application correctness. The problem with these is twofold: they are often unreliable and they impose a massive performance bottleneck.

Distributed coordination algorithms are difficult to get right. Even tried-and-true protocols like two-phase commit are susceptible to crash failures and network partitions. Protocols which are more fault tolerant like Paxos and Raft generally don’t scale well beyond small clusters or across wide-area networks. Consensus systems like ZooKeeper own your availability, meaning if you depend on one and it goes down, you’re up a creek. Since quorums are often kept small for performance reasons, this might be less rare than you think.

Coordination systems become a fragile and complex piece of your infrastructure, which seems ironic considering they are usually employed to reduce fragility. On the other hand, message-oriented middleware largely use coordination to provide developers with strong guarantees: exactly-once, ordered, transactional delivery and the like.

From transmission protocols to enterprise message brokers, relying on delivery guarantees is an anti-pattern in distributed system design. Delivery semantics are a tricky business. As such, when it comes to distributed messaging, what you want is often not what you need. It’s important to look at the trade-offs involved, how they impact system design (and UX!), and how we can cope with them to make better decisions.

Subtle and not-so-subtle failure modes make providing strong guarantees exceedingly difficult. In fact, some guarantees, like exactly-once delivery, aren’t even really possible to achieve when we consider things like the Two Generals Problem and the FLP result. When we try to provide semantics like guaranteed, exactly-once, and ordered message delivery, we usually end up with something that’s over-engineered, difficult to deploy and operate, fragile, and slow. What is the upside to all of this? Something that makes your life easier as a developer when things go perfectly well, but the reality is things don’t go perfectly well most of the time. Instead, you end up getting paged at 1 a.m. trying to figure out why RabbitMQ told your monitoring everything is awesome while proceeding to take a dump in your front yard.

If you have something that relies on these types of guarantees in production, know that this will happen to you at least once sooner or later (and probably much more than that). Eventually, a guarantee is going to break down. It might be inconsequential, it might not. Not only is this a precarious way to go about designing things, but if you operate at a large scale, care about throughput, or have sensitive SLAs, it’s probably a nonstarter.

The performance implications of distributed transactions are obvious. Coordination is expensive because processes can’t make progress independently, which in turn limits throughput, availability, and scalability. Peter Bailis gave an excellent talk called Silence is Golden: Coordination-Avoiding Systems Design which explains this in great detail and how coordination can be avoided. In it, he explains how distributed transactions can result in nearly a 400x decrease in throughput in certain situations.

Avoiding coordination enables infinite scale-out while drastically improving throughput and availability, but in some cases coordination is unavoidable. In Coordination Avoidance in Database Systems, Bailis et al. answer a key question: when is coordination necessary for correctness? They present a property, invariant confluence (I-confluence), which is necessary and sufficient for safe, coordination-free, available, and convergent execution. I-confluence essentially works by pushing invariants up into the business layer where we specify correctness in terms of application semantics rather than low-level database operations.

Without knowledge of what “correctness” means to your app (e.g., the invariants used in I-confluence), the best you can do to preserve correctness under a read/write model is serializability.

I-confluence can be determined given a set of transactions and a merge function used to reconcile divergent states. If I-confluence holds, there exists a coordination-free execution strategy that preserves invariants. If it doesn’t hold, no such strategy exists—coordination is required. I-confluence allows us to identify when we can and can’t give up coordination, and by pushing invariants up, we remove a lot of potential bottlenecks from areas which don’t require it.

If we recall, “synchrony” within the context of distributed computing is really just making assumptions about time, so synchronization is basically two or more processes coordinating around time. As we saw, a system which performs no coordination will have optimal performance and availability since everyone can proceed independently. However, a distributed system which performs zero coordination isn’t particularly useful or possible as I-confluence shows. Christopher Meiklejohn’s Strange Loop talk, Distributed, Eventually Consistent Computations, provides an interesting take on coordination with the parable of the car. A car requires friction to drive, but that friction is limited to very small contact points. Any other friction on the car causes problems or inefficiencies. If we think about physical time as friction, we know we can’t eliminate it altogether because it’s essential to the problem, but we want to reduce the use of it in our systems as much as possible. We can typically avoid relying on physical time by instead using logical time, for example, with the use of Lamport clocks or other conflict-resolution techniques. Lamport’s Time, Clocks, and the Ordering of Events in a Distributed System is the classical introduction to this idea.

Often, systems simply forgo coordination altogether for latency-sensitive operations, a perfectly reasonable thing to do provided the trade-off is explicit and well-documented. Sadly, this is frequently not the case. But we can do better. I-confluence provides a useful framework for avoiding coordination, but there’s a seemingly larger lesson to be learned here. What it really advocates is reexamining how we design systems, which seems in some ways to closely parallel our end-to-end argument.

When we think low level, we pay the upfront cost of entry—serializable transactions, linearizable reads and writes, coordination. This seems contradictory to the end-to-end principle. Our application doesn’t really care about atomicity or isolation levels or linearizability. It cares about two users sharing the same ID or two reservations booking the same room or a negative balance in a bank account, but the database doesn’t know that. Sometimes these rules don’t even require any expensive coordination.

If all we do is code our business rules and constraints into the language our infrastructure understands, we end up with a few problems. First, we have to know how to translate our application semantics into these low-level operations while avoiding any impedance mismatch. In the context of messaging, guaranteed delivery doesn’t really mean anything to our application which cares about what’s done with the messages. Second, we preclude ourselves from using a lot of generalized solutions and, in some cases, we end up having to engineer specialized ones ourselves. It’s not clear how well this scales in practice. Third, we pay a performance penalty that could otherwise be avoided (as I-confluence shows). Lastly, we put ourselves at the mercy of our infrastructure and hope it makes good on its promises—it often doesn’t.

Working on a messaging platform team, I’ve had countless conversations which resemble the following exchange:

Developer: “We need fast messaging.”
Me: “Is it okay if messages get dropped occasionally?”
Developer: “What? Of course not! We need it to be reliable.”
Me: “Okay, we’ll add a delivery ack, but what happens if your application crashes before it processes the message?”
Developer: “We’ll ack after processing.”
Me: “What happens if you crash after processing but before acking?”
Developer: “We’ll just retry.”
Me: “So duplicate delivery is okay?”
Developer: “Well, it should really be exactly-once.”
Me: “But you want it to be fast?”
Developer: “Yep. Oh, and it should maintain message ordering.”
Me: “Here’s TCP.”

If, instead, we reevaluate the interactions between our systems, their APIs, their semantics, and move some of that responsibility off of our infrastructure and onto our applications, then maybe we can start to build more robust, resilient, and performant systems. With messaging, does our infrastructure really need to enforce FIFO ordering? Preserving order with distributed messaging in the presence of failure while trying to simultaneously maintain high availability is difficult and expensive. Why rely on it when it can be avoided with commutativity? Likewise, transactional delivery requires coordination which is slow and brittle while still not providing application guarantees. Why rely on it when it can be avoided with idempotence and retries? If you need application-level guarantees, build them into the application level. The infrastructure can’t provide it.

I really like Gregor Hohpe’s “Your Coffee Shop Doesn’t Use Two-Phase Commit” because it shows how simple solutions can be if we just model them off of the real world. It gives me hope we can design better systems, sometimes by just turning things on their head. There’s usually a reason things work the way they do, and it often doesn’t even involve the use of computers or complicated algorithms.

Rather than try to hide complexities by using flaky and heavy abstractions, we should engage directly by recognizing them in our design decisions and thinking end to end. It may be a long and winding path to distributed systems zen, but the best place to start is from the beginning.

I’d like to thank Tom Santero for reviewing an early draft of this writing. Any inaccuracies or opinions expressed are mine alone.

Infrastructure Engineering in the 21st Century

Infrastructure engineering is an inherently treacherous problem space because it’s core to so many things. Systems today are increasingly distributed and increasingly complex but are built on unreliable components and will continue to be. This includes unreliable networks and faulty hardware. The 21st century engineer understands failure is routine.

Naturally, application developers would rather not have to think about low-level failure modes so they can focus on solving the problem at hand. Infrastructure engineers are then tasked with competing goals: provide enough abstraction to make application development tractable and provide enough reliability to make subsystems useful. The second goal often comes with an additional proviso in that there must be sufficient reliability without sacrificing performance to the point of no longer being useful. Anyone who has worked on enterprise messaging systems can tell you that these goals are often contradictory. The result is a wall of sand intended to keep the developer’s feet dry from the incoming tide. The 21st century engineer understands that in order to play in the sand, we all need to be comfortable getting our feet a little wet from time to time.

With the deluge of technology becoming available today, it’s tempting to introduce it all into your stack. Likewise, engineers are never happy. Left unchecked, we will hyper optimize and iterate into oblivion. It’s a problem I call “innovating to a fault.” Relying on “it’s done when it’s done” is a great way to ship vaporware. Have tangible objectives, make them high-level, and realize things change and evolve over time. Frame the concrete things you’re doing today within the context of those objectives. There’s a difference between Agile micromanagey roadmaps and having a clear vision. Determine when to innovate and when not to. Not Invented Here syndrome can be a deadly disease. Take inventory of what’s being built, make sure it ties back to your objectives, and avoid falling prey to tech pop culture. Optimize for the right problems. The 21st century engineer understands that you are not defined by your tools, you are defined by what you produce at the end of the day.

The prevalence of microservice architecture has made production tooling and instrumentation more important than ever. Teams should take ownership of their systems. If you’re not willing to stand by your work, don’t ship it. However, just because something falls outside of your system’s boundaries doesn’t mean it’s not your problem. If you rely on it, own it. Don’t be afraid to roll up your sleeves and dive into someone else’s code. The 21st century engineer understands that they live and die by the code they have in production, and if they don’t run anything in production, they aren’t really an engineer at all.

The way in which we design systems today is different from the way we designed them in the 20th century and the way we will design them in the future. There is a vast amount of research that has gone into computer science and related fields dating back to the invention of the modern computer. Research from the 50’s, 60’s, all the way up to today shows that system design always is an evolving process. Compiling this body of knowledge together provides an invaluable foundation from which we can build. The 21st century engineer understands that without a deeper understanding of that foundation or with a blind trust, we are only as good as our sand castle.

It’s our responsibility as software engineers, as system designers, as programmers to use this knowledge. Our job is not to build systems or write code, our job is to solve problems, of which code is often a byproduct. No one cares about the code you write, they care about the problems you solve. More specifically, they care about the business problems you solve. The 21st century engineer understands that if we’re not thinking about our solutions end to end, we’re not really doing our job.

Engage to Assuage

Abstraction is important. It’s how humans deal with complexity. You shouldn’t have to understand every little intricate detail behind how your system works. It would take years to do so. But abstraction comes at a cost. You agree to the abstraction’s interface, you place your trust in it, and then you remove it from your mind. That is, until it fails—and abstractions of sufficient complexity will fail. After all, we are building atop unreliable components. Also, a layer of abstraction doesn’t provide any guarantees in higher levels above it, which often results in some false assumptions.

We cannot understand how everything will work, but we should have enough understanding of how it will not work. More plainly, we should understand the cost of the abstractions we use so that we can pay for them with confidence. This doesn’t mean giving up on abstraction but engaging with the complexity that it manages.

I’ve written before about how distributed systems are a UX problem. They’re also a design problem. And a development problem. And an ops problem. And a business problem. The point is they are everyone’s problem because they are complex, and things that are sufficiently complex eventually leak. There is no airtight abstraction in this world. Without understanding limitations and trade-offs, without using the knowledge and research that has come before us, without thinking end to end, we set ourselves up for failure. If we’re going to call ourselves engineers, let’s start acting like it. Nothing is a black box to the 21st century engineer.