CAP and the Illusion of Choice

The CAP theorem is widely discussed and often misunderstood within the world of distributed systems. It states that any networked, shared-data system can, at most, guarantee two of three properties: consistency, availability, and partition tolerance. I won’t go into detail on CAP since the literature is abundant, but the notion of “two of three”—while conceptually accessible—is utterly misleading. Brewer has indicated this, echoed by many more, but there still seems to be a lot of confusion when the topic is brought up. The bottom line is you can’t sacrifice partition tolerance, but it seems CAP is a bit more nuanced than that.

On the surface, CAP presents three categories of systems. CA implies one which maintains consistency and availability given a perfectly reliable network. CP provides consistency and partition tolerance at the expense of availability, and AP gives us availability and partition tolerance without linearizability. Clearly, CA suggests that the system guarantees consistency and availability only when there are no network partitions. However, to say that there will never be network partitions is blatantly dishonest. This is where the source of much confusion lies.

Partitions happen. They happen for countless reasons. Switches fail, NICs fail, link layers fail, servers fail, processes fail. Partitions happen even when nothing fails outright, due to GC pauses or prolonged I/O latency, for example. Let’s accept this as fact and move on. What this means is that a “CA” system is CA only until it’s not. Once that partition happens, all your assumptions and all your guarantees hit the fan in spectacular fashion. Where does this leave us?

At its core, CAP is about trade-offs, but it’s an exclusion principle. It tells us what our systems cannot do given the nature of reality. The distinction here is that not all systems fit nicely into these archetypes. If Jepsen has taught us anything, it’s that the majority of systems don’t fit into any of these categories, even when the designers state otherwise. CAP isn’t as black and white as people paint it.

There’s a really nice series on CAP written recently by Nicolas Liochon. It does an excellent job of explaining the terminology (far better than I could), which is often overloaded and misused, and it makes some interesting points. Nicolas suggests that CA should really be thought of as a specification for an operating range, while CP and AP are descriptions of behavior. I would tend to agree, but my concern is that this eschews the trade-off that must be made.

We know that we cannot avoid network partition. What if we specify our application like this: “this application does not handle network partition. If it happens, the application will be partly unavailable, the data may be corrupted, and you may have to fix the data manually.” In other words, we’re really asking to be CA here, but if a partition occurs we may be CP, or, if we are unlucky, both not available and not consistent.

As an operating range, CA basically means when a partition occurs, the system throws up its hands and says, “welp, see ya later!” If we specify that the system does not work well under network partitions, we’re saying partitions are outside its operating range. What good is a specification for a spaceship designed to fly the upper atmosphere of planet Terah when we’re down here on Earth? We live in a world where partitions are the norm, so surely we need to include them in our operating range. CA does specify an operating range, but it’s not one you can put in an SLA and hand to a customer. Colloquially, it’s just a mode of “undefined behavior”—the system is consistent and available—until it’s not.

CAP isn’t a perfect metaphor, but in my mind, it does a decent job of highlighting the fundamental trade-offs involved in building distributed systems. Either we have linearizable writes or we don’t. If we do, we can’t guarantee availability. It’s true that CAP seems to imply a binary choice between consistency and availability in the face of partitions. In fact, it’s not a binary choice. You have AP, CP, or None of the Above. The problem with None of the Above is that it’s difficult to reason about and even more difficult to define. Ultimately, it ends up being more an illusion of choice since we cannot sacrifice partition tolerance.
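To make the CP/AP distinction concrete, here is a minimal sketch of a single replica deciding what to do with a write while it is partitioned from most of its peers. The `Replica` class, its `mode` flag, and the simple majority-quorum rule are illustrative assumptions, not a description of any real system.

```python
class Unavailable(Exception):
    """Raised when a CP replica refuses a request it cannot serve safely."""


class Replica:
    def __init__(self, mode, cluster_size):
        self.mode = mode                  # "CP" or "AP" (hypothetical flag)
        self.cluster_size = cluster_size
        self.reachable = cluster_size     # nodes we can talk to, including ourselves
        self.data = {}

    def has_quorum(self):
        return self.reachable > self.cluster_size // 2

    def write(self, key, value):
        if self.mode == "CP" and not self.has_quorum():
            # Consistency over availability: refuse rather than risk divergence.
            raise Unavailable("no quorum during partition")
        # AP replica (or a CP replica with quorum): accept the write locally.
        self.data[key] = value
        return "ok"


def demo():
    cp, ap = Replica("CP", 5), Replica("AP", 5)
    # Simulate a partition that leaves each replica able to reach only 2 of 5 nodes.
    cp.reachable = ap.reachable = 2
    try:
        cp_result = cp.write("x", 1)
    except Unavailable:
        cp_result = "unavailable"
    return cp_result, ap.write("x", 1)
```

During the simulated partition the CP replica gives up availability and the AP replica gives up the guarantee that its write agrees with the other side. A system that does neither deliberately lands in "None of the Above."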

Writing Good Code

There’s no shortage of people preaching the importance of good code. Indeed, many make a career of it. The resources available are equally endless, but lately I’ve been wondering how to extract the essence of building high-quality systems into a shorter, more concise narrative. This is actually something I’ve thought about for a while, but I’m just now starting to formulate some ideas into a blog post. The ideas aren’t fully developed, but my hope is to flesh them out further in the future. You can talk about design patterns, abstraction, encapsulation, and cohesion until you’re blue in the face, but what is the essence of good code?

Like any other engineering discipline, quality control is a huge part of building software. This isn’t just ensuring that it “works”—it’s ensuring it works under the complete range of operating conditions, ensuring it’s usable, ensuring it’s maintainable, ensuring it performs well, and ensuring a number of other characteristics. Verifying it “works” is just a small part of a much larger picture. Anybody can write code that works, but there’s more to it than that. Software is more malleable than most other things. Not only does it require longevity, it requires giving in to that malleability. If it doesn’t, you end up with something that’s brittle and broken. Because of this, it’s vital we test for correctness and measure for quality.

SCRAP for Quality

Quality is a very subjective thing. How can one possibly measure it? Code complexity and static analysis tooling come to mind, and these are deservedly valued, but they really just scratch the surface. How do we narrow an immensely broad topic like “quality” into a set of tangible, quantifiable goals? This is really the crux of the problem, but we can start by identifying a sort of checklist or guidelines for writing software. This breaks that larger problem into smaller, more digestible pieces. The checklist I’ve come up with is called SCRAP, an acronym defined below. It’s unlikely to be comprehensive, but I think it covers most, if not all, of the key areas.

Scalability: Plan for growth
Complexity: Plan for humans
Resiliency: Plan for failure
API: Plan for integration
Performance: Plan for execution

Each of these items is itself a blog post, so this is only a brief explanation. There is definitely overlap between some of these facets, and there are also multiple dimensions to some.

Scalability is a plan for growth—in code, in organization, in architecture, and in workload. Without it, you reach a point where your system falls over, whether it’s because of a growing userbase, a growing codebase, or any number of other reasons. It’s also worth pointing out that without the ‘S’, all you have is CRAP. This also helps illustrate some of the overlap between these areas of focus as it leads into Complexity, which is a plan for humans. Scalability is about technology scale and demand scale, but it’s also about people scale. As your team grows or as your company grows, how do you manage that growth at the code level?

Planning for people doesn’t just mean managing growth, it also means managing complexity. If code is overly complex, it’s difficult to maintain, it’s difficult to extend, and it’s difficult to fix. If systems are overly complex, they’re difficult to deploy, difficult to manage, and difficult to monitor. Plan for humans, not machines.

Resiliency is a strategy for fault tolerance. It’s a plan for failure. What happens when you crash? What happens when a service you depend on crashes? What happens when the database is unavailable? What happens when the network is unreliable? Systems of all kinds need to be designed with the expectation of failure. If you’re not thinking about failure at the code level, you’re not thinking about it enough.
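As one small illustration of thinking about failure at the code level, the sketch below wraps a call to an unreliable dependency with bounded retries, exponential backoff, and a fallback value. The names `call_with_retries` and `FlakyService` are hypothetical, and the policy (three attempts, doubling delay) is an arbitrary assumption rather than a recommendation.

```python
import time


def call_with_retries(fn, attempts=3, base_delay=0.01, fallback=None, sleep=time.sleep):
    """Call fn(); on ConnectionError, back off exponentially and retry.

    Returns `fallback` if every attempt fails."""
    for attempt in range(attempts):
        try:
            return fn()
        except ConnectionError:
            if attempt < attempts - 1:
                sleep(base_delay * 2 ** attempt)
    return fallback


class FlakyService:
    """Hypothetical dependency that fails a fixed number of times before succeeding."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def fetch(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("dependency unavailable")
        return "fresh"
```

For example, `call_with_retries(FlakyService(2).fetch)` succeeds on the third attempt, while a dependency that stays down yields the fallback instead of an unhandled exception. Retrying is only safe when the call being retried can tolerate being executed more than once.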

One thing you should be noticing is that “people” is a cross-cutting concern. After all, it’s people who design the systems, and it’s people who write the code. While API is a plan for integration, it’s people who integrate the pieces. This is about making your API a first-class citizen. It doesn’t matter if it’s an internal API, a library API, or a RESTful API. It doesn’t matter if it’s for first parties or third parties. As a programmer, your API is your user interface. It needs to be clean. It needs to be sensible. It needs to be well-documented. If those integration points aren’t properly thought out, the integration will be more difficult than it needs to be.

The last item on the checklist is Performance. I originally defined this as a plan for speed, but I realized there’s a lot more to performance than doing things fast. It’s about doing things well, which is why I call Performance a plan for execution. Again, this has some overlap with Resiliency and Scalability, but it’s also about measurement. It’s about benchmarking and profiling. It’s about testing at scale and under failure because testing in a vacuum doesn’t mean much. It’s about optimization.

This brings about the oft-asked question: how do I know when and where to optimize? While premature optimization might be the root of all evil, it’s not a universal law. Optimize along the critical path and outward from there only as necessary. The further you get from that critical path, the more wasted effort it’s going to end up being. It depreciates quickly, so don’t lose sight of your optimization ROI. This will enable you to ship quickly and ship quality code. But once you ship, you’re not done measuring! It’s more important than ever that you continue to measure in production. Use performance and usage-pattern data to drive intelligent decisions and intelligent iteration. The payoff is that this doesn’t just apply to code decisions, it applies to all decisions. This is where the real value of measuring comes through. Decisions that aren’t backed by data aren’t decisions, they’re impulses. Don’t be impulsive, be empirical.

Going Forward

There is work to be done with respect to quantifying the items on this checklist. However, I strongly suspect even just thinking about them, formally or informally, will improve the overall quality of your code by an equally-unmeasurable order of magnitude. If your code doesn’t pass this checklist, it’s tech debt. Sometimes that’s okay, but remember that tech debt has compounding interest. If you don’t pay it off, you will eventually go bankrupt.

It’s not about being a 10x developer. It’s about being a 1x developer who writes 10x code. By that I mean the quality of your code is far more important than its quantity. Quality will outlast and outperform quantity. These guidelines tend to have a ripple effect. Legacy code often breeds legacy-like code. Instilling these rules in your developer culture helps to make engineers cognizant of when they should break the mold, introduce new patterns, or improve existing ones. Bad code begets bad code, and bad code is the atrophy of good developers.

You Cannot Have Exactly-Once Delivery

I’m often surprised that people continually have fundamental misconceptions about how distributed systems behave. I myself shared many of these misconceptions, so I try not to demean or dismiss but rather educate and enlighten, hopefully while sounding less preachy than that just did. I continue to learn only by following in the footsteps of others. In retrospect, it shouldn’t be surprising that folks buy into these fallacies as I once did, but it can be frustrating when trying to communicate certain design decisions and constraints.

Within the context of a distributed system, you cannot have exactly-once message delivery. Web browser and server? Distributed. Server and database? Distributed. Server and message queue? Distributed. You cannot have exactly-once delivery semantics in any of these situations.

As I’ve described in the past, distributed systems are all about trade-offs. This is one of them. There are essentially three types of delivery semantics: at-most-once, at-least-once, and exactly-once. Of the three, the first two are feasible and widely used. If you want to be super anal, you might say at-least-once delivery is also impossible because, technically speaking, network partitions are not strictly time-bound. If the connection from you to the server is interrupted indefinitely, you can’t deliver anything. Practically speaking, you have bigger fish to fry at that point—like calling your ISP—so we consider at-least-once delivery, for all intents and purposes, possible. With this model of thinking, network partitions are finitely bounded in time, however arbitrary this may be.

So where does the trade-off come into play, and why is exactly-once delivery impossible? The answer lies in the Two Generals thought experiment or the more generalized Byzantine Generals Problem, which I’ve looked at extensively. We must also consider the FLP result, which basically says that in an asynchronous system, given the possibility of even a single faulty process, no deterministic protocol can guarantee that the processes will agree on a decision.

In the letter I mail you, I ask you to call me once you receive it. You never do. Either you really didn’t care for my letter or it got lost in the mail. That’s the cost of doing business. I can send the one letter and hope you get it, or I can send 10 letters and assume you’ll get at least one of them. The trade-off here is quite clear (postage is expensive!), but sending 10 letters doesn’t really provide any additional guarantees. In a distributed system, we try to guarantee the delivery of a message by waiting for an acknowledgement that it was received, but all sorts of things can go wrong. Did the message get dropped? Did the ack get dropped? Did the receiver crash? Are they just slow? Is the network slow? Am I slow? FLP and the Two Generals Problem are not design complexities, they are impossibility results.

People often bend the meaning of “delivery” in order to make their system fit the semantics of exactly-once, or in other cases, the term is overloaded to mean something entirely different. State-machine replication is a good example of this. Atomic broadcast protocols ensure messages are delivered reliably and in order. The truth is, we can’t deliver messages reliably and in order in the face of network partitions and crashes without a high degree of coordination. This coordination, of course, comes at a cost (latency and availability), while still relying on at-least-once semantics. Zab, the atomic broadcast protocol which lays the foundation for ZooKeeper, enforces idempotent operations.

State changes are idempotent and applying the same state change multiple times does not lead to inconsistencies as long as the application order is consistent with the delivery order. Consequently, guaranteeing at-least once semantics is sufficient and simplifies the implementation.

“Simplifies the implementation” is the authors’ attempt at subtlety. State-machine replication is just that, replicating state. If our messages have side effects, all of this goes out the window.

We’re left with a few options, all equally tenuous. When a message is delivered, it’s acknowledged immediately before processing. The sender receives the ack and calls it a day. However, if the receiver crashes before or during its processing, that data is lost forever. Customer transaction? Sorry, looks like you’re not getting your order. This is the worldview of at-most-once delivery. To be honest, implementing at-most-once semantics is more complicated than this depending on the situation. If there are multiple workers processing tasks or the work queues are replicated, the broker must be strongly consistent (or CP in CAP theorem parlance) so as to ensure a task is not delivered to any other workers once it’s been acked. Apache Kafka uses ZooKeeper to handle this coordination.

On the other hand, we can acknowledge messages after they are processed. If the process crashes after handling a message but before acking (or the ack isn’t delivered), the sender will redeliver. Hello, at-least-once delivery. Furthermore, if you want to deliver messages in order to more than one site, you need an atomic broadcast, which is a huge burden on throughput. Fast or consistent. Welcome to the world of distributed systems.
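The difference between acking before and after processing can be seen in a toy simulation. This is only a sketch: `deliver` is a hypothetical function that models one sender and one receiver in a single loop, and the receiver is assumed to crash exactly once, while handling the message named by `crash_on`, and then restart.

```python
class Crash(Exception):
    """Simulated receiver crash."""


def deliver(messages, ack_first, crash_on):
    """Return the list of messages whose side effect actually ran.

    The sender keeps a message in `pending` until it is acked and
    redelivers whatever is still pending after the receiver restarts."""
    processed = []
    crashed = False
    pending = list(messages)
    while pending:
        msg = pending[0]
        try:
            if ack_first:
                pending.pop(0)            # ack before processing: sender forgets msg
                if msg == crash_on and not crashed:
                    crashed = True
                    raise Crash()         # crash before the work is done
                processed.append(msg)
            else:
                processed.append(msg)     # do the work first
                if msg == crash_on and not crashed:
                    crashed = True
                    raise Crash()         # crash before the ack is sent
                pending.pop(0)            # ack after processing
        except Crash:
            continue                      # receiver restarts; sender retries pending
    return processed
```

With messages `["a", "b", "c"]` and a crash on `"b"`, acking first processes `["a", "c"]` (the message is lost), while acking afterwards processes `["a", "b", "b", "c"]` (the message is handled twice). Neither run yields exactly `["a", "b", "c"]`.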

Every major message queue in existence which provides any guarantees will market itself as at-least-once delivery. If one claims exactly-once, it’s because its makers are either lying to your face in hopes that you will buy it or do not themselves understand distributed systems. Either way, it’s not a good indicator.

RabbitMQ attempts to provide guarantees along these lines:

When using confirms, producers recovering from a channel or connection failure should retransmit any messages for which an acknowledgement has not been received from the broker. There is a possibility of message duplication here, because the broker might have sent a confirmation that never reached the producer (due to network failures, etc). Therefore consumer applications will need to perform deduplication or handle incoming messages in an idempotent manner.

The way we achieve exactly-once delivery in practice is by faking it. Either the messages themselves should be idempotent, meaning they can be applied more than once without adverse effects, or we remove the need for idempotency through deduplication. Ideally, our messages don’t require strict ordering and are commutative instead. There are design implications and trade-offs involved with whichever route you take, but this is the reality in which we must live.
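Both approaches can be sketched in a few lines. The `Account` class and its method names below are hypothetical, and the sketch assumes every message carries a unique ID assigned by the sender.

```python
class Account:
    def __init__(self):
        self.balance = 0
        self.seen = set()   # IDs of messages that have already been applied

    def apply_deposit(self, message_id, amount):
        """A non-idempotent operation made safe by deduplicating on message_id."""
        if message_id in self.seen:
            return False    # duplicate delivery: ignore it
        self.seen.add(message_id)
        self.balance += amount
        return True

    def apply_set_balance(self, amount):
        """Idempotent by construction: applying it twice leaves the same state."""
        self.balance = amount
```

Redelivering `apply_deposit("m1", 50)` leaves the balance at 50 rather than 100, and `apply_set_balance(70)` can be replayed any number of times. The sketch glosses over real costs: the `seen` set grows without bound unless it is windowed or expired, and recording the ID must be atomic with applying the effect, or a crash between the two reintroduces the original problem.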

Rethinking operations as idempotent actions might be easier said than done, but it mostly requires a change in the way we think about state. This is best described by revisiting the replicated state machine. Rather than distributing operations to apply at various nodes, what if we just distribute the state changes themselves? Rather than mutating state, let’s just report facts at various points in time. This is effectively how Zab works.

Imagine we want to tell a friend to come pick us up. We send him a series of text messages with turn-by-turn directions, but one of the messages is delivered twice! Our friend isn’t too happy when he finds himself in the bad part of town. Instead, let’s just tell him where we are and let him figure it out. If the message gets delivered more than once, it won’t matter. The implications are wider reaching than this, since we’re still concerned with the ordering of messages, which is why solutions like commutative and convergent replicated data types are becoming more popular. That said, we can typically solve this problem through extrinsic means like sequencing, vector clocks, or other partial-ordering mechanisms. It’s usually causal ordering that we’re after anyway. People who say otherwise don’t quite realize that there is no now in a distributed system.
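The text-message analogy translates directly into code. In this sketch, `follow_directions` applies relative operations, so a duplicate changes the outcome, while `LocationTracker` accepts absolute reports tagged with a sender-assigned sequence number, so duplicates and stale reports are harmless. Both names and the sequence-number scheme are illustrative assumptions.

```python
def follow_directions(turns):
    """Apply relative turns and return the final heading: 0=N, 1=E, 2=S, 3=W."""
    heading = 0
    for turn in turns:
        heading = (heading + (1 if turn == "right" else -1)) % 4
    return heading


class LocationTracker:
    """Keeps the most recent reported location, ordered by sequence number."""

    def __init__(self):
        self.seq = -1
        self.location = None

    def report(self, seq, location):
        if seq > self.seq:      # ignore duplicates and stale, reordered reports
            self.seq, self.location = seq, location
```

Delivering `"right"` twice in `["right", "left", "right"]` leaves the driver facing south instead of east, whereas repeating or reordering calls to `report` leaves the tracker at the location with the highest sequence number it has seen.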

To reiterate, there is no such thing as exactly-once delivery. We must choose between the lesser of two evils, which is at-least-once delivery in most cases. This can be used to simulate exactly-once semantics by ensuring idempotency or otherwise eliminating side effects from operations. Once again, it’s important to understand the trade-offs involved when designing distributed systems. Asynchrony abounds, which means you cannot expect synchronous, guaranteed behavior. Design for failure and resiliency against this asynchronous nature.

If State Is Hell, SOA Is Satan

More and more companies are describing their success stories regarding the switch to a service-oriented architecture. As with any technological upswing, there’s a clear and palpable hype factor involved (Big Data™ or The Cloud™ anyone?), but obviously it’s not just puff.

While microservices and SOA have seen a staggering rate of adoption in recent years, the mindset of developers often seems to be stuck in the past. I think this is, at least in part, because we seek a mental model we can reason about. It’s why we build abstractions in the first place. In a sense, I would argue there’s a comparison to be made between the explosion of OOP in the early 90’s and today’s SOA trend. After all, SOA is as much about people scale as it is about workload scale, so it makes sense from an organizational perspective.

The Perils of Good Abstractions

While systems are becoming more and more distributed, abstractions are attempting to make them less and less complex. Mesosphere is a perfect example of this, attempting to provide the “datacenter operating system.” Apache Mesos allows you to “program against your datacenter like it’s a single pool of resources.” It’s an appealing proposition to say the least. PaaS like Google App Engine and Heroku offer similar abstractions—write your code without thinking about scale. The problem is you absolutely have to think about scale or you’re bound to run into problems down the road. And while these abstractions are nice, they can be dangerous just the same. Welcome to the perils of good abstractions.

I like to talk about App Engine because I have firsthand experience with it. It’s an easy sell for startups. It handles spinning up instances when you need them, turning them down when you don’t. It’s your app server, database, caching, job scheduler, task queue all in one, and it does it at scale. There’s vendor lock-in, sure, yet it means no ops, no sysadmins, no overhead. Push to deploy. But it’s a leaky abstraction. It has to be. App Engine scales because it’s distributed, but it allows—no, encourages—you to write your system as a monolith. The datastore, memcache, and task queue accesses are masked as RPCs. This is great for our developer mental model, but it will bite you if you’re not careful. App Engine imposes certain limitations to encourage good design; for instance, front-end requests and datastore calls are limited to 60 seconds (it used to be much less), but the leakiness goes beyond that.

RPC is consistently at odds with distributed systems. I would go so far as to say it’s an anti-pattern in many cases. RPC encourages writing synchronous code, but distributed systems are inherently asynchronous. The network is not reliable. The network is not fast. The network is not your friend. Developers who either don’t understand this or don’t realize what’s happening when they make an RPC will write code as if they were calling a function. It will sure as hell look like just calling a function. When we think synchronously, we end up with systems that are slow, fault intolerant, and generally not scalable. To be quite honest, however, this is perfectly acceptable for 90% of startups as they are getting off the ground because they don’t have workloads at meaningful scale.

There’s certainly some irony here. One of the selling points of App Engine is its ability to scale to large amounts of traffic, yet the vast majority of startups would be perfectly suited to scaling up rather than out, perhaps with some failover in place for good measure. Stack Overflow is the poster child of scale-up architecture. In truth, your architecture should be a function of your access patterns, not the other way around (and App Engine is very much tailored to a specific set of access patterns). Nonetheless, it shows that vertical scaling can work. I would bet a lot of startups could sufficiently run on a large, adequately specced machine or maybe a small handful of them.

The cruel irony is that once you hit a certain scale with App Engine, both in terms of your development organization and user base, you’ve reached a point where you have to migrate off it. And if your data model isn’t properly thought out, you will without a doubt hit scale problems. It’s to the point where you need someone with deep knowledge of how App Engine works in order to build quality systems on it. Good luck hiring a team of engineers who understand it. GAE is great at accelerating you to 100 mph, but you better have some nice airbags for the brick wall it launches you into. In fairness, this is a problem every org hits—Conway’s law is very much a reality and every startup has growing pains. To be clear, this isn’t a jab at GAE, which is actually very effective at accelerating a product using little capital and can sustain long-term success given the right use case. Instead, I use it to illustrate a point.

Peering Through the Abstraction

Eventually SOA makes sense, but our abstractions can cause problems if we don’t understand what’s going on behind the curtain (hence the leakiness). Partial failure is all but guaranteed, and latency, partitioning, and other network pressure happen all the time.

Ken Arnold is famed for once saying “state is hell” in reference to designing distributed systems. In the past, I’ve written how scaling shared data is hard, but with SOA it’s practically a requirement. Ken is right, though: state is hell, and SOA is fundamentally competing with consistency. The FLP Impossibility result and the CAP theorem can prove it formally, but really this should be intuitively obvious if we accept the laws of physics.

On the other hand, if you store information that I can’t reconstruct, then a whole host of questions suddenly surface. One question is, “Are you now a single point of failure?” I have to talk to you now. I can’t talk to anyone else. So what happens if you go down?

To deal with that, you could be replicated. But now you have to worry about replication strategies. What if I talk to one replicant and modify some data, then I talk to another? Is that modification guaranteed to have already arrived there? What is the replication strategy? What kind of consistency do you need—tight or loose? What happens if the network gets partitioned and the replicants can’t talk to each other? Can anybody proceed?

Essentially, the more stateful your system is, the harder it’s going to be to scale it because distributing that state introduces a rich tapestry of problems. In practice, we often can’t eliminate state wholesale, but basically everything that can be stateless should be stateless.

Making servers disposable allows you a great deal of flexibility. Former Netflix Cloud Architect Adrian Cockcroft articulates this idea well:

You want to think of servers like cattle, not pets. If you have a machine in production that performs a specialized function, and you know it by name, and everyone gets sad when it goes down, it’s a pet. Instead you should think of your servers like a herd of cows. What you care about is how many gallons of milk you get. If one day you notice you’re getting less milk than usual, you find out which cows aren’t producing well and replace them.

This is effectively how App Engine achieves its scalability. With lightweight, stateless, and disposable instances, it can spin them up and down on the fly without worrying about being in an invalid state.

App Engine also relies on eventual consistency as the default model for datastore interactions. This makes queries fast and highly available, while snapshot isolation can be achieved using entity-group transactions if necessary. The latter, of course, can result in a lot of contention and latency. Yet, people seem to have a hard time grappling with the reality of eventual consistency in distributed systems. State is hell, but calling SOA “satan” is clearly hyperbole. It is a tough problem nevertheless.

A State of Mind

In the situations where we need state, we have to reconcile with the realities of distributed systems. This means understanding the limitations and accepting the complexities, not papering over them. It doesn’t mean throwing away abstractions. Fortunately, distributed computing is the focus of a lot of great research, so there are primitives with which we can build: immutability, causal ordering, eventual consistency, CRDTs, and other ideas.
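As a taste of what these primitives look like, here is a minimal sketch of a grow-only counter (G-Counter), one of the simplest state-based CRDTs. Each node increments only its own slot, and replicas merge by taking the element-wise maximum, so merges are commutative, associative, and idempotent. The class and method names are illustrative choices.

```python
class GCounter:
    """Grow-only counter: one slot per node, merged with element-wise max."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {}

    def increment(self, n=1):
        # A node only ever writes to its own slot.
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + n

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        # Taking the max per slot makes repeated or reordered merges harmless.
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)
```

Two replicas that increment independently and then exchange state in any order, any number of times, converge to the same value without coordination. The price is that this counter can only grow. Supporting decrements or richer types takes more elaborate constructions.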

As long as we recognize the trade-offs, we can design around them. The crux is knowing they exist in the first place. We can’t have ACID semantics while remaining highly available, but we can use Highly Available Transactions to provide strong-enough guarantees. At the same time, not all operations require coordination or concurrency control. The sooner we view eventual consistency as a solution and not a consequence, the sooner we can let go of this existential crisis. Other interesting research includes BOOM, which seeks to provide a high-level, declarative approach to distributed programming.

State might be hell, but it’s a hell we have to live with. I don’t advocate an all-out microservice architecture for a company just getting its start. The complications far outweigh any benefits to be gained, but it becomes a necessity at a certain point. The key is having an exit strategy. PaaS providers make this difficult due to vendor lock-in and architectural constraints. Weigh their advantages carefully.

Once you do transition to a SOA, make as many of those services, or the pieces backing them, as stateless as possible. For those which aren’t stateless, know that the problem typically isn’t novel. These problems have been solved or are continuing to be solved in new and interesting ways. Academic research is naturally at the bleeding edge with industry often lagging behind. OOP concepts date back to as early as the 60’s but didn’t gain widespread adoption until several decades later. Distributed computing is no different. SOA is just a state of mind.

Stream Processing and Probabilistic Methods: Data at Scale

Stream processing and related abstractions have become all the rage following the rise of systems like Apache Kafka, Samza, and the Lambda architecture. Applying the idea of immutable, append-only event sourcing means we’re storing more data than ever before. However, as the cost of storage continues to decline, it’s becoming more feasible to store more data for longer periods of time. With immutability, how the data lives isn’t interesting anymore. It’s all about how it moves.

The shifting landscape of data architecture parallels the way we’re designing systems today. Specifically, the evolution of monolithic to service-oriented architecture necessitates a change in the way we do data integration. The traditional normalization approach doesn’t cut it. Our systems are composed of databases, caches, search indexes, data warehouses, and a multitude of other components. Moreover, there’s an increasing demand for online, real-time processing of this data that rivals the growing popularity of large-scale, offline processing along the lines of Hadoop. This presents an interesting set of new challenges, namely, how do we drink from the firehose without getting drenched?

The answer most likely lies in frameworks like Samza, Storm, and Spark Streaming. Similarly, tools like StatsD solve the problem of collecting real-time analytics. However, this discussion is meant to explore some of the primitives used in stream processing. The ideas extend far beyond event sourcing, generalizing to any type of data stream, unbounded or not.

Batch vs. Streaming

With offline or batch processing, we often have some heuristics which provide insight into our data set, and we can typically afford multiple passes of the data. Without a strict time constraint, data structures are less important. We can store the entire data set on disk (or perhaps across a distributed file system) and process it in batches.

With streaming data, however, there’s a need for near real-time processing—collecting analytics, monitoring and alerting, updating search indexes and caches, etc. With web crawlers, we process a stream of URLs and documents and produce indexed content. With websites, we process a stream of page views and update various counters and gauges. With email, we process a stream of text and produce a filtered, spam-free inbox. These cases involve massive, often limitless data sets. Processing that data online can’t be done with the conventional ETL or MapReduce-style methods, and unless you’re doing windowed processing, it’s entirely impractical to store that data in memory.

Framing the Problem

As a concrete example of stream processing, imagine we want to count the number of distinct document views across a large corpus, say, Wikipedia. A naive solution would be to use a hash table which maps a document to a count. Wikipedia has roughly 35 million pages. Let’s assume each document is identified by a 16-byte GUID, and the counters are stored as 8-byte integers. This means we need in the ballpark of a gigabyte of memory. Given today’s hardware, this might not sound completely unreasonable, but now let’s say we want to track views per unique IP address. We see that this approach quickly becomes intractable.

To illustrate further, consider how to count the cardinality of IP addresses which access our website. Instead of a hash table, we can use a bitmap or sparse bit array to count addresses. There are over four billion possible distinct IPv4 addresses. Sure, we could allocate half a gigabyte of RAM, but if you’re developing performance-critical systems, large heap sizes are going to kill you, and this overhead doesn’t help.1
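
As a rough check on those two figures, here is the arithmetic written out. It assumes a gigabyte of 10^9 bytes and ignores hash-table overhead, both of which are simplifications on my part:

```latex
35 \times 10^{6} \times (16 + 8)\ \text{bytes} = 8.4 \times 10^{8}\ \text{bytes} \approx 0.84\ \text{GB}
\qquad
2^{32}\ \text{bits} = 2^{29}\ \text{bytes} = 512\ \text{MiB}
```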

A Probabilistic Approach

Instead, we turn to probabilistic ways of solving these problems. Probabilistic algorithms and data structures seem to be oft-overlooked, or maybe purposely ignored, by system designers despite the theory behind them having been around for a long time in many cases. The goal should be to move these from the world of academia to industry because they are immensely useful and widely neglected.

Probabilistic solutions trade accuracy for space and performance. While a loss in precision may be a cause for concern to some, the fact is with large data streams, we have to trade something off to get real-time insight. Much like the CAP theorem, we must choose between consistency (accuracy) and availability (online). With CAP, we can typically adjust the level at which that trade-off occurs. This space/performance-accuracy exchange behaves very much the same way.

The literature can get pretty dense, so let’s look at what some of these approaches are and the problems they solve in a way that’s (hopefully) understandable.

Bloom Filters

The Bloom filter is probably the most well-known and, conceptually, simplest probabilistic data structure. It also serves as a good foundation because there are a lot of twists you can put on it. The theory provides a kernel from which many other probabilistic solutions are derived, as we will see.

Bloom filters answer a simple question: is this element a member of a set? A normal set requires linear space because it stores every element. A Bloom filter doesn’t store the actual elements; it merely stores the “membership” of them. It uses far less space than the elements themselves would, which opens the possibility for false positives, meaning there’s a non-zero probability it reports an item is in the set when it’s actually not. This has a wide range of applications. For example, a Bloom filter can be placed in front of a database. When a query for a piece of data comes in and the filter doesn’t contain it, we completely bypass the database.


The Bloom filter consists of a bit array of length m and k hash functions. Both of these parameters are configurable, but we can optimize them based on a desired rate of false positives. Each bit in the array is initially unset. When an element is “added” to the filter, it’s hashed by each of the functions, h1…hk, and modded by m, resulting in k indices into the bit array. Each bit at the respective index is set.


To query the membership of an element, we hash it and check if each bit is set. If any of them are zero, we know the item isn’t in the set. What happens when two different elements hash to the same index? This is where false positives are introduced. If the bits aren’t set, we know the element wasn’t added, but if they are, it’s possible that some other element or group of elements hashed to the same indices. Bloom filters have false positives, but false negatives are impossible—a member will never be reported incorrectly as a non-member. Unfortunately, this also means items can’t be removed from the filter.
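
To make the add and test operations concrete, here is a minimal sketch in Go. The type and method names are made up for illustration, and deriving the k indices from the two halves of a single 64-bit FNV-1a hash is just one reasonable choice, not the only way to get k hash functions.

```go
package main

import "hash/fnv"

// Bloom is a classic Bloom filter: m bits and k hash functions.
type Bloom struct {
	bits []uint64 // bit array packed into 64-bit words
	m    uint64   // number of bits
	k    uint64   // number of hash functions
}

// NewBloom allocates a filter with m unset bits and k hash functions.
func NewBloom(m, k uint64) *Bloom {
	return &Bloom{bits: make([]uint64, (m+63)/64), m: m, k: k}
}

// indexes derives k bit positions from one 64-bit FNV-1a hash by treating
// its upper and lower halves as two hashes: (h1 + i*h2) mod m.
// This derivation is an assumption of the sketch.
func (b *Bloom) indexes(data []byte) []uint64 {
	h := fnv.New64a()
	h.Write(data)
	sum := h.Sum64()
	h1, h2 := sum>>32, sum&0xffffffff
	idx := make([]uint64, b.k)
	for i := uint64(0); i < b.k; i++ {
		idx[i] = (h1 + i*h2) % b.m
	}
	return idx
}

// Add sets the bit at each of the element's k indices.
func (b *Bloom) Add(data []byte) {
	for _, i := range b.indexes(data) {
		b.bits[i/64] |= 1 << (i % 64)
	}
}

// Test reports false if any of the k bits is unset (definitely not a
// member) and true otherwise (a member, or a false positive).
func (b *Bloom) Test(data []byte) bool {
	for _, i := range b.indexes(data) {
		if b.bits[i/64]&(1<<(i%64)) == 0 {
			return false
		}
	}
	return true
}
```

Note that there is no Remove method: clearing a bit could also erase the membership of every other element that happens to share it.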

It’s clear that the likelihood of false positives increases with each element added to the filter. We can target a specific probability of false positives by selecting an optimal value for m and k for up to n insertions.2 While this implementation works well in practice, it has a drawback in that some elements are potentially more sensitive to false positives than others. We can solve this problem by partitioning the m bits among the k hash functions such that each one produces an index over its respective partition. As a result, each element is always described by exactly k bits. This prevents any one element from being especially sensitive to false positives. Since calculating k hashes for every element is computationally expensive, we can actually perform a single hash and derive k hashes from it for a significant speedup.3
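
For anyone who would rather not work through the sizing by hand, the commonly cited results are m = -n ln(p) / (ln 2)^2 bits and k = log2(1/p) hash functions, for n expected insertions and a target false-positive rate p. A small sketch in Go, with function names that are my own:

```go
package main

import "math"

// OptimalM returns the number of bits for n insertions at a target
// false-positive rate p: m = -n*ln(p) / (ln 2)^2, rounded up.
func OptimalM(n uint, p float64) uint {
	return uint(math.Ceil(-float64(n) * math.Log(p) / (math.Ln2 * math.Ln2)))
}

// OptimalK returns the number of hash functions for a target
// false-positive rate p: k = log2(1/p), rounded up.
func OptimalK(p float64) uint {
	return uint(math.Ceil(math.Log2(1 / p)))
}
```

For example, a thousand insertions at a 1% false-positive rate works out to 9,586 bits and 7 hash functions, or roughly 9.6 bits per element.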

A Bloom filter eventually reaches a point where all bits are set, which means every query will indicate membership, effectively making the probability of false positives one. The problem with this is it requires a priori knowledge of the data set in order to select optimal parameters and avoid “overfilling.” Consequently, Bloom filters are ideal for offline processing and not so great for dealing with streams. Next, we’ll look at some variations of the Bloom filter which attempt to deal with this issue.

Scalable Bloom Filter

As we saw earlier, traditional Bloom filters are a great way to deal with set-membership problems in a space-efficient way, but they require knowing the size of the data set ahead of time in order to be effective. The Scalable Bloom Filter (SBF) was introduced by Almeida et al. as a way to cope with the capacity dilemma.

The Scalable Bloom Filter dynamically adapts to the size of the data set while enforcing a tight upper bound on the rate of false positives. Like the classic Bloom filter, false negatives are impossible. The SBF is essentially an array of Bloom filters with geometrically decreasing false-positive rates. New elements are added to the last filter. When this filter becomes “full”—more specifically, when it reaches a target fill ratio—a new filter is added with a tightened error probability. A tightening ratio, r, controls the growth of new filters.


Testing membership of an element consists of checking each of the filters. The geometrically decreasing false-positive rate of each filter is an interesting property. Since the fill ratio determines when a new filter is added, it turns out this progression can be modeled as a geometric series, which allows us to provide a tight upper bound on false positives using an optimal fill ratio of 0.5 (once the filter is 50% full, a new one is added). The compounded probability over the whole series converges to a target value, even accounting for an infinite series.
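
The convergence can be written out as follows. The notation is my own shorthand: P_0 is the false-positive rate of the first filter, r (with 0 < r < 1) is the tightening ratio, and l is the number of filters created so far, so that filter i has rate P_0 r^i.

```latex
P \;=\; 1 - \prod_{i=0}^{l-1} \left(1 - P_0 r^{i}\right)
\;\le\; \sum_{i=0}^{l-1} P_0 r^{i}
\;\le\; \sum_{i=0}^{\infty} P_0 r^{i}
\;=\; \frac{P_0}{1 - r}
```

So to hit an overall target rate P no matter how many filters get added, it is enough to start with P_0 = P(1 - r).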

We can limit the error rate, but obviously the trade-off is that we end up allocating memory proportional to the size of the data set since we’re continuously adding filters. We also pay some computational cost on adds. Amortized, the cost of filter insertions is negligible, but for every add we must compute the current fill ratio of the filter to determine if a new filter needs to be added. Fortunately, we can optimize this by computing an estimated fill ratio. If we keep a running count of the items added to the filter, n, the approximate current fill ratio, p, can be obtained from the Taylor series expansion with p \approx 1-e^{-n/m}, where m is the number of bits each hash function indexes into (its partition of the filter). Calculating this estimate turns out to be quite a bit faster than computing the actual fill ratio because it requires fewer memory reads.
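
To show why the estimate is cheaper, here is a small sketch of both calculations in Go. The function names are hypothetical, and I'm taking m to mean the number of bits a single hash function indexes into, which is my reading of the formula.

```go
package main

import (
	"math"
	"math/bits"
)

// ExactFillRatio scans every word backing an m-bit array and counts the
// set bits, so its cost grows with the size of the filter.
func ExactFillRatio(words []uint64, m uint64) float64 {
	set := 0
	for _, w := range words {
		set += bits.OnesCount64(w)
	}
	return float64(set) / float64(m)
}

// EstimatedFillRatio approximates the fill ratio as 1 - e^(-n/m) from a
// running count of insertions n, without touching the bit array at all.
func EstimatedFillRatio(n, m uint64) float64 {
	return 1 - math.Exp(-float64(n)/float64(m))
}
```

Under this estimate, the 0.5 fill ratio is reached after about m ln 2 insertions, or roughly 0.69m.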

To provide some context around performance, adds in my Go implementation of a classic Bloom filter take about 166 ns on my MacBook Pro, and membership tests take 118 ns. With the SBF, adds take 422 ns and tests 113 ns. This isn’t a completely fair comparison since, as the SBF grows over time, tests require scanning through each filter, but certainly the add numbers are intuitive.

Scalable Bloom Filters are useful for cases where the size of the data set isn’t known a priori and memory constraints aren’t of particular concern. They’re an effective alternative to a regular Bloom filter, which generally has to be allocated orders of magnitude larger than the data set to allow enough headroom.

Stable Bloom Filter

Classic Bloom filters aren’t great for streaming data, and Scalable Bloom Filters, while a better option, still present a memory problem. Another derivative, the Stable Bloom Filter (SBF), was proposed by Deng and Rafiei as a technique for detecting duplicates in unbounded data streams with limited space. Most solutions work by dividing the stream into fixed-size windows and solving the problem within that discrete space. This allows the use of traditional methods like the Bloom filter.

The SBF attempts to approximate duplicates without the use of windows. Clearly, we can’t store the entire history of an infinite stream in limited memory. Instead, the thinking is more recent data has more value than stale data in many scenarios. As an example, web crawlers may not care about redundantly fetching a web page that was crawled a long time ago compared to fetching a page that was crawled more recently. With this impetus, the SBF works by representing more recent data while discarding old information.

The Stable Bloom Filter tweaks the classic Bloom filter by replacing the bit array with an array of m cells, each allocated d bits. The cells are simply counters, initially set to zero. The maximum value, Max, of a cell is 2^d-1. When an element is added, we first have to ensure space for it. This is done by selecting P random cells and decrementing their counters by one, clamping to zero. Next, we hash the element to k cells and set their counters to Max. Testing membership consists of probing the k cells. If any of them are zero, it’s not a member. The Bloom filter is actually a special case of an SBF where d is one and P is zero.
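
The add and test steps can be sketched in Go roughly as follows. This is an illustration under a few assumptions of mine rather than a faithful reproduction of the paper: each cell is stored in a full byte (so d can be at most 8), the P cells are picked independently at random, and the k indices come from the two halves of one FNV-1a hash. All names are hypothetical.

```go
package main

import (
	"hash/fnv"
	"math/rand"
)

// StableBloom is an array of m cells, each a d-bit counter (d <= 8 here).
type StableBloom struct {
	cells []uint8
	max   uint8 // Max = 2^d - 1
	k     uint64
	p     int // number of cells decremented on every add
	rng   *rand.Rand
}

// NewStableBloom creates a filter with m zeroed cells of d bits each.
func NewStableBloom(m int, d uint, k uint64, p int, seed int64) *StableBloom {
	return &StableBloom{
		cells: make([]uint8, m),
		max:   uint8((uint(1) << d) - 1),
		k:     k,
		p:     p,
		rng:   rand.New(rand.NewSource(seed)),
	}
}

// indexes derives k cell positions from a single 64-bit hash.
func (s *StableBloom) indexes(data []byte) []uint64 {
	h := fnv.New64a()
	h.Write(data)
	sum := h.Sum64()
	h1, h2 := sum>>32, sum&0xffffffff
	idx := make([]uint64, s.k)
	for i := uint64(0); i < s.k; i++ {
		idx[i] = (h1 + i*h2) % uint64(len(s.cells))
	}
	return idx
}

// Add first makes room by decrementing P random cells (clamping at zero),
// then sets the element's k cells to Max.
func (s *StableBloom) Add(data []byte) {
	for i := 0; i < s.p; i++ {
		j := s.rng.Intn(len(s.cells))
		if s.cells[j] > 0 {
			s.cells[j]--
		}
	}
	for _, i := range s.indexes(data) {
		s.cells[i] = s.max
	}
}

// Test reports false if any of the element's k cells has decayed to zero.
func (s *StableBloom) Test(data []byte) bool {
	for _, i := range s.indexes(data) {
		if s.cells[i] == 0 {
			return false
		}
	}
	return true
}
```

Because the decrements happen before the element's own cells are set, an element always tests positive immediately after it is added. It only "fades" as later adds decay its cells.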


Like traditional Bloom filters, an SBF has a non-zero probability of false positives, which is controlled by several parameters. Unlike the Bloom filter, an SBF has a tight upper bound on the rate of false positives while introducing a non-zero rate of false negatives. The false-positive rate of a classic Bloom filter eventually reaches one, after which all queries result in a false positive. The stable-point property of an SBF means the false-positive rate asymptotically approaches a configurable fixed constant.

Stable Bloom Filters lend themselves to situations where the size of the data set isn’t known ahead of time and memory is bounded. For example, an SBF can be used to deduplicate events from an unbounded event stream with a specified upper bound on false positives and minimal false negatives. In particular, if the stream is not uniformly distributed, meaning duplicates are likely to be grouped closer together, the rate of false negatives becomes immaterial.

Counting Bloom Filter

Conventional Bloom filters don’t allow items to be removed. The Stable Bloom Filter, on the other hand, works by evicting old data, but its API doesn’t expose a way to remove specific elements. The Counting Bloom Filter (CBF) is yet another twist on this structure. Introduced by Fan et al. in Summary Cache: A Scalable Wide-Area Web Cache Sharing Protocol, the CBF provides a way of deleting data from a filter.

It’s actually a very straightforward approach. The filter comprises an array of n-bit buckets similar to the Stable Bloom Filter. When an item is added, the corresponding counters are incremented, and when it’s removed, the counters are decremented.
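
A minimal sketch of that idea in Go, assuming 8-bit buckets that saturate at 255 and the same single-hash index derivation as before. The names are hypothetical and not taken from any particular library.

```go
package main

import "hash/fnv"

// CountingBloom replaces each bit of a Bloom filter with a small counter.
type CountingBloom struct {
	counts []uint8 // m buckets, 8 bits each in this sketch
	k      uint64
}

// NewCountingBloom creates a filter with m zeroed buckets and k hashes.
func NewCountingBloom(m, k uint64) *CountingBloom {
	return &CountingBloom{counts: make([]uint8, m), k: k}
}

// indexes derives k bucket positions from a single 64-bit hash.
func (c *CountingBloom) indexes(data []byte) []uint64 {
	h := fnv.New64a()
	h.Write(data)
	sum := h.Sum64()
	h1, h2 := sum>>32, sum&0xffffffff
	idx := make([]uint64, c.k)
	for i := uint64(0); i < c.k; i++ {
		idx[i] = (h1 + i*h2) % uint64(len(c.counts))
	}
	return idx
}

// Add increments the element's buckets, saturating instead of wrapping.
func (c *CountingBloom) Add(data []byte) {
	for _, i := range c.indexes(data) {
		if c.counts[i] < 255 {
			c.counts[i]++
		}
	}
}

// Remove decrements the element's buckets, clamping at zero. Removing an
// element that was never added can corrupt other elements' counts.
func (c *CountingBloom) Remove(data []byte) {
	for _, i := range c.indexes(data) {
		if c.counts[i] > 0 {
			c.counts[i]--
		}
	}
}

// Test reports false if any of the element's buckets is zero.
func (c *CountingBloom) Test(data []byte) bool {
	for _, i := range c.indexes(data) {
		if c.counts[i] == 0 {
			return false
		}
	}
	return true
}
```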

Obviously, a CBF takes n-times more space than a regular Bloom filter, but it also has a scalability limit. Unless items are removed frequently enough, a counting filter’s false-positive probability will approach one. We need to dimension it accordingly, and this normally requires inferring from our data set. Likewise, since the CBF allows removals, it exposes an opportunity for false negatives. The multi-bit counters diminish the rate, but it’s important to be cognizant of this property so the CBF can be applied appropriately.

Inverse Bloom Filter

One of the distinguishing features of Bloom filters is the guarantee of no false negatives. This can be a valuable invariant, but what if we want the opposite of that? Is there an efficient data structure that can offer us no false positives with the possibility of false negatives? The answer is deceptively simple.

Jeff Hodges describes a rather trivial—yet strikingly pragmatic—approach which he calls the “opposite of a Bloom filter.” Since that name is a bit of a mouthful, I’m referring to this as the Inverse Bloom Filter (IBF).

The Inverse Bloom Filter may report a false negative but can never report a false positive. That is, it may indicate that an item has not been seen when it actually has, but it will never report an item as seen which it hasn’t come across. The IBF behaves in a similar manner to a fixed-size hash map of m buckets which doesn’t handle conflicts, but it provides lock-free concurrency using an underlying CAS.

The test-and-add operation hashes an element to an index in the filter. We take the value at that index while atomically storing the new value, then the old value is compared to the new one. If they differ, the new value wasn’t a member. If the values are equivalent, it was in the set.
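
Here is a rough sketch of the test-and-add operation in Go. It is written from the description above rather than from Hodges’ code, so the names and details are my own. It uses an atomic swap on each slot, via the `atomic.Pointer` type from `sync/atomic` (Go 1.19 or later).

```go
package main

import (
	"hash/fnv"
	"sync/atomic"
)

// InverseBloom is a fixed-size array of slots. Each slot remembers only
// the last key that hashed to it; collisions simply overwrite.
type InverseBloom struct {
	slots []atomic.Pointer[string]
}

// NewInverseBloom creates a filter with m empty slots.
func NewInverseBloom(m int) *InverseBloom {
	return &InverseBloom{slots: make([]atomic.Pointer[string], m)}
}

// TestAndAdd records key and reports whether it was already present.
// The old occupant of the slot is fetched and replaced in one atomic
// step, so concurrent callers need no locks.
func (f *InverseBloom) TestAndAdd(key string) bool {
	h := fnv.New32a()
	h.Write([]byte(key))
	i := h.Sum32() % uint32(len(f.slots))
	old := f.slots[i].Swap(&key)
	// Comparing the full keys is what rules out false positives.
	return old != nil && *old == key
}
```

A false negative happens when a different key lands in the same slot between two sightings of the same key, which is why the number of slots relative to the distance between duplicates matters.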


The Inverse Bloom Filter is a nice option for dealing with unbounded streams or large data sets due to its limited memory usage. If duplicates are close together, the rate of false negatives becomes vanishingly small with an adequately sized filter.

HyperLogLog

Let’s revisit the problem of counting distinct IP addresses visiting a website. One solution might be to allocate a sparse bit array proportional to the number of IPv4 addresses. Alternatively, we could combine a Bloom filter with a counter. When an address comes in and it’s not in the filter, increment the count. While these both work, they’re not likely to scale very well. Rather, we can apply a probabilistic data structure known as the HyperLogLog (HLL). First presented by Flajolet et al. in 2007, HyperLogLog is an algorithm which approximately counts the number of distinct elements, or cardinality, of a multiset (a set which allows multiple occurrences of its elements).

Imagine our stream as a series of coin tosses. Someone tells you the most heads they flipped in a row was three. You can guess that they didn’t flip the coin very many times. However, if they flipped 20 heads in a row, it probably took them quite a while. This seems like a really unconvincing way of estimating the number of tosses, but from this kernel of thought grows a fruitful idea.

HLL replaces heads and tails with zeros and ones. Instead of counting the longest run of heads, it counts the longest run of leading zeros in a binary hash. Using a good hash function means that the values are, more or less, statistically independent and uniformly distributed. If the maximum run of leading zeros is n, the number of distinct elements in the set is approximately 2^n. But wait, what’s the math behind this? If we think about it, half of all binary numbers start with 1. Each additional bit divides the probability of a run further in half; that is, 25% start with 01, 12.5% start with 001, 6.25% start with 0001, ad infinitum. Thus, the probability of a run of length n is 2^{-(n+1)}.

Like flipping a coin, there’s potential for an enormous amount of variance with this technique. For instance, it’s entirely possible—though unlikely—that you flip 20 heads in a row on your first try. One experiment doesn’t provide enough data, so instead you flip 10 coins. HyperLogLog uses a similar strategy to reduce variance by splitting the stream up across a set of buckets, or registers, and applying the counting algorithm on the values in each one. It tracks the longest run of zeros in every register, and the total cardinality is computed by taking the harmonic mean across all registers. The harmonic mean is used to discount outliers since the distribution of values tends to skew towards the right. We can increase accuracy by adding more registers, which of course comes at the expense of performance and memory.
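
To tie the pieces together, here is a simplified HyperLogLog sketch in Go. It follows the general shape of the algorithm (the first p bits of a 64-bit hash pick a register, the rest feed the leading-zero count, and a harmonic mean combines the registers), but it is an illustration rather than a production implementation. The names are hypothetical, the extra bit mixing on top of FNV-1a is my own addition to spread out the high bits, and the only correction applied is the small-range one. The constant used for alpha assumes at least 128 registers (p of 7 or more).

```go
package main

import (
	"hash/fnv"
	"math"
	"math/bits"
)

// HLL is a simplified HyperLogLog with 2^p registers.
type HLL struct {
	p   uint8
	reg []uint8
}

// NewHLL creates a sketch with 2^p zeroed registers (use p >= 7).
func NewHLL(p uint8) *HLL {
	return &HLL{p: p, reg: make([]uint8, 1<<p)}
}

// hash64 hashes with FNV-1a, then applies extra mixing so the high bits
// are well distributed (an assumption of this sketch).
func hash64(data []byte) uint64 {
	h := fnv.New64a()
	h.Write(data)
	x := h.Sum64()
	x ^= x >> 30
	x *= 0xbf58476d1ce4e5b9
	x ^= x >> 27
	x *= 0x94d049bb133111eb
	x ^= x >> 31
	return x
}

// Add routes the element to a register using the first p hash bits and
// records the longest run of leading zeros (plus one) in the remainder.
func (h *HLL) Add(data []byte) {
	x := hash64(data)
	idx := x >> (64 - h.p)
	rest := x << h.p
	rank := uint8(bits.LeadingZeros64(rest)) + 1
	if limit := 64 - h.p + 1; rank > limit {
		rank = limit
	}
	if rank > h.reg[idx] {
		h.reg[idx] = rank
	}
}

// Estimate combines the registers with a harmonic mean. While the sketch
// is sparsely filled it falls back to counting empty registers.
func (h *HLL) Estimate() float64 {
	m := float64(len(h.reg))
	sum, zeros := 0.0, 0
	for _, r := range h.reg {
		sum += math.Ldexp(1, -int(r)) // 2^-r
		if r == 0 {
			zeros++
		}
	}
	alpha := 0.7213 / (1 + 1.079/m)
	est := alpha * m * m / sum
	if est <= 2.5*m && zeros > 0 {
		est = m * math.Log(m/float64(zeros))
	}
	return est
}
```

With p = 14, the registers occupy 16 KB here (one byte each), and the standard error of the estimate is about 1.04/sqrt(m), or roughly 0.8%. Adding the same element twice never changes a register, which is what makes the structure count distinct elements rather than total events.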

HyperLogLog is a remarkable algorithm. It almost feels like magic, and it’s exceptionally useful for working with data streams. HLL has a fraction of the memory footprint of other solutions. In fact, it’s usually several orders of magnitude smaller while remaining surprisingly accurate.

Count-Min Sketch

HyperLogLog is a great option to efficiently count the number of distinct elements in a stream using a minimal amount of space, but it only gives us cardinality. What about counting the frequencies of specific elements? Cormode and Muthukrishnan developed the Count-Min sketch (CM sketch) to approximate the occurrences of different event types from a stream of events.

In contrast to a hash table, the Count-Min sketch uses sub-linear space to count frequencies. It consists of a matrix with w columns and d rows. These parameters determine the trade-off between space/time constraints and accuracy. Each row has an associated hash function. When an element arrives, it’s hashed for each row, and the counter at the corresponding index in each row is incremented by one. In this regard, the CM sketch shares some similarities with the Bloom filter.


The frequency of an element is estimated by taking the minimum of all the element’s respective counter values. The thought process here is that collisions between elements can only inflate a counter, never reduce it, so every counter is an overestimate of the true count. Taking the minimum picks the counter least affected by collisions and results in a closer approximation.
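
A small sketch of the structure in Go, with hypothetical names. The per-row hash functions are simulated from the two halves of one FNV-1a hash, which is an assumption of the sketch rather than a requirement of the algorithm.

```go
package main

import "hash/fnv"

// CountMin is a d-by-w matrix of counters with one hash function per row.
type CountMin struct {
	w    uint64
	rows [][]uint64
}

// NewCountMin creates a sketch with w columns and d rows (d >= 1).
func NewCountMin(w, d uint64) *CountMin {
	rows := make([][]uint64, d)
	for i := range rows {
		rows[i] = make([]uint64, w)
	}
	return &CountMin{w: w, rows: rows}
}

// hashes splits one 64-bit hash into two values; row r then uses
// (h1 + r*h2) mod w as its column index.
func (c *CountMin) hashes(data []byte) (uint64, uint64) {
	h := fnv.New64a()
	h.Write(data)
	sum := h.Sum64()
	return sum >> 32, sum & 0xffffffff
}

// Add increments one counter in every row.
func (c *CountMin) Add(data []byte) {
	h1, h2 := c.hashes(data)
	for r := range c.rows {
		c.rows[r][(h1+uint64(r)*h2)%c.w]++
	}
}

// Count returns the minimum of the element's counters across all rows.
// It can overestimate because of collisions but never underestimates.
func (c *CountMin) Count(data []byte) uint64 {
	h1, h2 := c.hashes(data)
	est := ^uint64(0)
	for r := range c.rows {
		if v := c.rows[r][(h1+uint64(r)*h2)%c.w]; v < est {
			est = v
		}
	}
	return est
}
```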

The Count-Min sketch is an excellent tool for counting problems for the same reasons as HyperLogLog. It has a lot of similarities to Bloom filters, but where Bloom filters effectively represent sets, the CM sketch considers multisets.

MinHash

The last probabilistic technique we’ll briefly look at is MinHash. This algorithm, invented by Andrei Broder, is used to quickly estimate the similarity between two sets. This has a variety of uses, such as detecting duplicate bodies of text and clustering or comparing documents.

MinHash works, in part, by using the Jaccard coefficient, a statistic which represents the size of the intersection of two sets divided by the size of the union:

J(A,B) = {{|A \cap B|}\over{|A \cup B|}}

This provides a measure of how similar the two sets are, but computing the intersection and union is expensive. Instead, MinHash essentially works by comparing randomly selected subsets through element hashing. It resembles locality-sensitive hashing (LSH), which means that similar items have a greater tendency to map to the same hash values. This varies from most hashing schemes, which attempt to avoid collisions. Instead, LSH is designed to maximize collisions of similar elements.

MinHash is one of the more difficult algorithms to fully grasp, and I can’t even begin to provide a proper explanation. If you’re interested in how it works, read the literature but know that there are a number of variations of it. The important thing is to know it exists and what problems it can be applied to.
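
That said, one of the simpler variants is easy enough to sketch. The idea is to hash every element of a set with k different hash functions and keep only the minimum value per function. For any one function, the probability that two sets share the same minimum equals their Jaccard coefficient, so the fraction of matching minimums estimates it. The Go code below is a toy illustration under my own assumptions (seeded FNV-1a plus extra mixing standing in for k independent hash functions, and hypothetical function names), not a reference implementation.

```go
package main

import "hash/fnv"

// mix scrambles a 64-bit value so seeded hashes behave more independently.
func mix(x uint64) uint64 {
	x ^= x >> 30
	x *= 0xbf58476d1ce4e5b9
	x ^= x >> 27
	x *= 0x94d049bb133111eb
	x ^= x >> 31
	return x
}

// Signature returns k minimum hash values for the set, one per seeded
// hash function. The order of elements in the set does not matter.
func Signature(set []string, k int) []uint64 {
	sig := make([]uint64, k)
	for i := range sig {
		sig[i] = ^uint64(0)
		for _, s := range set {
			h := fnv.New64a()
			h.Write([]byte{byte(i), byte(i >> 8)}) // seed: hash function index
			h.Write([]byte(s))
			if v := mix(h.Sum64()); v < sig[i] {
				sig[i] = v
			}
		}
	}
	return sig
}

// Similarity estimates the Jaccard coefficient as the fraction of
// positions where two equal-length signatures agree.
func Similarity(a, b []uint64) float64 {
	match := 0
	for i := range a {
		if a[i] == b[i] {
			match++
		}
	}
	return float64(match) / float64(len(a))
}
```

The signatures are fixed-size regardless of how large the sets are, so comparing two documents costs k comparisons instead of a full intersection and union. More hash functions give a tighter estimate at the cost of more hashing.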

In Practice

We’ve gone over several fundamental primitives for processing large data sets and streams while solving a few different types of problems. Bloom filters can be used to reduce disk reads and other expensive operations. They can also be purposed to detect duplicate events in a stream or prune a large decision tree. HyperLogLog excels at counting the number of distinct items in a stream while using minimal space, and the Count-Min sketch tracks the frequency of particular elements. Lastly, the MinHash method provides an efficient mechanism for comparing the similarity between documents. This has a number of applications, namely around identifying duplicates and characterizing content.

While there are no doubt countless implementations for most of these data structures and algorithms, I’ve been putting together a Go library geared towards providing efficient techniques for stream processing called Boom Filters. It includes implementations for all of the probabilistic data structures outlined above. As streaming data and real-time consumption grow more and more prominent, it will become important that we have the tools and understanding to deal with them. If nothing else, it’s valuable to be aware of probabilistic methods because they often provide accurate results at a fraction of the cost. These things aren’t just academic research, they form the basis for building online, high-performance systems at scale.

  1. Granted, if you’re sensitive to garbage-collection pauses, you may be better off using something like C or Rust, but that’s not always the most practical option.
  2. The math behind determining optimal m and k is left as an exercise for the reader.
  3. Less Hashing, Same Performance: Building a Better Bloom Filter discusses the use of two hash functions to simulate additional hashes. We can use a 64-bit hash, such as FNV, and use the upper and lower 32-bits as two different hashes.

On Hireability and Recruiting

Developers deal with recruiter emails on a daily basis. It’s frustrating because it’s almost always a shotgun approach, but occasionally you get something that strays from the path and, delightfully, it stands out. It’s refreshing to have someone who has actually taken the time to determine your skill set, technology background, and how you spend your free time with respect to software. You feel like they at least have an inkling of who you are. On the other hand, it’s irritating to be bombarded by contract-to-hire offers which aren’t even remotely tailored to you. What’s worse is when it’s ten different emails from ten different people working for the same recruiting agency.

There are basically two types of software jobs: the ones which just need warm bodies (commodity developers who mostly need to code the spec handed down from the Powers That Be) and the ones which need individual contributors (people who are hired to solve hard problems).1 I seem to paint the first in a negative light, which isn’t actually my intention. I know plenty of developers who are perfectly content with these types of jobs. There’s nothing wrong with that, but sometimes the wires seemingly get crossed.

The problem, as I see it, is that the communications meant for one type of person are getting received by people who don’t identify. It’s a numbers game, so it boils down to lazy recruiting. Throw enough paper airplanes and eventually one will land in the lap of someone who cares—the rest will poke everyone else’s eyes out. This isn’t a humblebrag about how I’m such an Awesome Developer™ or a suggestion that we should paint everyone with a broad brush, it’s merely what I’ve observed. People are welcome to disagree and do so openly.

Many companies I would love to work at have pinged me after coming across my GitHub profile or reading my blog. Recruiters for jobs I would never work at have emailed me after seeing my LinkedIn. The split isn’t completely black and white—there are places I’d work who’ve contacted me through LinkedIn, and vice versa—but there’s definitely a strong correlation.2 The difference is that the former almost always receive an enthusiastic, well-thought-out response, while the latter goes straight to the trash. Doing otherwise would simply be too demoralizing. I’ve talked to other developers about this, and I know I’m not the only one. Bad recruiting is just causing people to be more jaded and cynical. I don’t know about other industries, but in software it seems to be particularly magnified.

The upshot is how to minimize the “bad communications” while maximizing the “good.” You can’t avoid bad recruiters. I suspect there are lists of names with email addresses and phone numbers which get circulated around staffing agencies.3 You can remove all contact information from your online persona, but then you make it a lot harder for the “good communications” to get through.

The best way to make yourself hireable isn’t to have a good resume and interviewing skills, it’s to do cool stuff in the open and talk about it. It can be scary because sometimes you crash and burn, but that’s just as valuable. Some find it terrifying to expose their work to the world. Half the reason I blog and share it publicly is to open my work and myself up to criticism. It stings sometimes, but you get over it. I wouldn’t be where I am today without people kicking me in the ass to do better. Other times, the community can actually learn and gain insight from your viewpoint and mindset. People are too modest.

Good developers aren’t doing this to get hired either, they’re doing it because they think it’s interesting in its own right. That’s the other half of why I blog (as well as to keep a journal for myself). The self-marketing is a byproduct. My side projects and blog posts have led to far more interesting conversations and opportunities than my resume or LinkedIn profile.

People should be more open to the idea of sharing their knowledge and work with the community. The benefits go both ways.

  1. These would be what Google refers to as smart creatives.
  2. Granted, the sample size of recruiters emailing me through LinkedIn is probably an order-of-magnitude larger.
  3. I wouldn’t be terribly surprised if some universities sold contact information for new graduates either.

CS Literature of the Day

I read a lot of research papers and other nerdy computer science things in my spare time. I’m also a huge fan of Papers We Love, which is an awesome repository of academic CS papers and a community of people who read, share, and present them.

For the purposes of posterity and information-sharing, I thought it would be a good idea to share some of the nerdy things I read or watch like various papers, blog posts, and talks—all related to computer science. That’s why I’m tweeting a new piece of CS literature every day with the hashtag #CSLOTD and maintaining a GitHub repo containing that content called CS Literature of the Day.

To start, CSLOTD will just be literature which I come across myself and find interesting, but it’s completely open to contributors to share thought-provoking material. The repo won’t host any content but rather provide links to it.

I’m doing this because I’m enthusiastic about CS. I reserve the right to stop doing it at any time or miss days for whatever reason. I also tweet things that I think are interesting. If you don’t find them interesting, that’s your problem. As such, I reserve the right to reject submissions if I don’t find them interesting enough to include. I have pretty low standards though. :)

Fast, Scalable Networking in Go with Mangos

In the past, I’ve looked at nanomsg and why it’s a formidable alternative to the well-regarded ZeroMQ. Like ZeroMQ, nanomsg is a native library which markets itself as a way to build fast and scalable networking layers. I won’t go into detail on how nanomsg accomplishes this since my analysis of it already covers that fairly extensively, but instead I want to talk about a Go implementation of the protocol called Mangos.1 If you’re not familiar with nanomsg or Scalability Protocols, I recommend reading my overview of those first.

nanomsg is a shared library written in C. This, combined with its zero-copy API, makes it an extremely low-latency transport layer. While there are a lot of client bindings which allow you to use nanomsg from other languages, dealing with shared libraries can often be a pain—not to mention it complicates deployment.

More and more companies are starting to use Go for backend development because of its speed and concurrency primitives. It’s really good at building server components that scale. Go obviously provides the APIs needed for socket networking, but building a scalable distributed system that’s reliable using these primitives can be somewhat onerous. Solutions like nanomsg’s Scalability Protocols and ZeroMQ attempt to make this much easier by providing useful communication patterns and by taking care of other messaging concerns like queueing.

Naturally, there are Go bindings for nanomsg and ZeroMQ, but like I said, dealing with shared libraries can be fraught with peril. In Go (and often other languages), we tend to avoid loading native libraries if we can. It’s much easier to reason about, debug, and deploy a single binary than multiple. Fortunately, there’s a really nice implementation of nanomsg’s Scalability Protocols in pure Go called Mangos by Garrett D’Amore of illumos fame.

Mangos offers an idiomatic Go implementation and interface which affords us the same messaging patterns that nanomsg provides while maintaining compatibility. Pub/Sub, Pair, Req/Rep, Pipeline, Bus, and Survey are all there. It also supports the same pluggable transport model, allowing additional transports to be added (and extended2) on top of the base TCP, IPC, and inproc ones.3 Mangos has been tested for interoperability with nanomsg using the nanocat command-line interface.

One of the advantages of using a language like C is that it’s not garbage collected. However, if you’re using Go with nanomsg, you’re already paying the cost of GC. Mangos makes use of object pools in order to reduce pressure on the garbage collector. We can’t turn Go’s GC off, but we can make an effort to minimize pauses. This is critical for high-throughput systems, and Mangos tends to perform quite comparably to nanomsg.

Mangos (and nanomsg) has a very familiar, socket-like API. To show what this looks like, the code below illustrates a simple example of how the Pub/Sub protocol is used to build a fan-out messaging system.

My message queue test framework, Flotilla, uses the Req/Rep protocol to allow clients to send requests to distributed daemon processes, which handle them and respond. While this is a very simple use case where you could just as easily get away with raw TCP sockets, there are more advanced cases where Scalability Protocols make sense. We also get the added advantage of transport abstraction, so we’re not strictly tied to TCP sockets.

I’ve been building a distributed messaging system using Mangos as a means of federated communication. Pub/Sub enables a fan-out, interest-based broadcast and Bus facilitates many-to-many messaging. Both of these are exceptionally useful for connecting disparate systems. Mangos also supports an experimental new protocol called Star. This pattern is like Bus, but when a message is received by an immediate peer, it’s propagated to all other members of the topology.

My favorite Scalability Protocol is Survey. As I discussed in my nanomsg overview, there are a lot of really interesting applications of this. Survey allows a process to query the state of multiple peers in one shot. It’s similar to Pub/Sub in that the surveyor publishes a single message which is received by all the respondents (although there’s no topic subscriptions). The respondents then send a message back, and the surveyor collects these responses. We can also enforce a deadline on the respondent replies, which makes Survey particularly useful for service discovery.

With my messaging system, I’ve used Survey to implement a heartbeat protocol. When a broker spins up, it begins broadcasting a heartbeat using a Survey socket. New brokers can connect to existing ones, and they reply to the heartbeat which allows brokers to “discover” each other. If a heartbeat isn’t received before the deadline, the peer is removed. Mangos also handles reconnects, so if a broker goes offline and comes back up, peers will automatically reconnect.

To summarize, if you’re building distributed systems in Go, consider taking a look at Mangos. You can certainly roll your own messaging layer with raw sockets, but you’re going to end up writing a lot of logic for a robust system. Mangos, and nanomsg in general, gives you the right abstraction to quickly build systems that scale and are fast.

  1. Full disclosure: I am a contributor on the Mangos project, but only because I was a user first!
  2. Mangos supports TLS with the TCP transport as an experimental extension.
  3. A nanomsg WebSocket transport is currently in the works.

Benchmark Responsibly

When I posted my Dissecting Message Queues article last summer, it understandably caused some controversy. I received both praise and scathing comments, emails asking why I didn’t benchmark X and pull requests to bump the numbers of Y. To be honest, that analysis was more of a brain dump from my own test driving of various message queues than any sort of authoritative or scientific study; it was far from the latter, to say the least. The qualitative discussion was pretty innocuous, but the benchmarks and supporting code were the target of a lot of (valid) criticism. In retrospect, it was probably irresponsible to publish them, but I was young and naive back then; now I’m just mostly naive.

Comparing Apples to Other Assorted Fruit

One such criticism was that the benchmarks were divided into two very broad categories: brokerless and brokered. While the brokerless group compared two very similar libraries, ZeroMQ and nanomsg, the second group included a number of distinct message brokers like RabbitMQ, Kafka, NATS, and Redis, to name a few.

The problem is not all brokers are created equal. They often have different goals and different prescribed use cases. As such, they impose different guarantees, different trade-offs, and different constraints. By grouping these benchmarks together, I implied they were fundamentally equivalent, when in fact, most were fundamentally different. For example, NATS serves a very different purpose than Kafka, and Redis, which offers pub/sub messaging, typically isn’t thought of as a message broker at all.

Measure Right or Don’t Measure at All

Another criticism was the way in which the benchmarks were performed. The tests were immaterial. The producer, consumer, and the message queue itself all ran on the same machine. Even worse, they used just a single publisher and subscriber. Not only does it not test what a remotely realistic configuration looks like, but it doesn’t even give you a good idea of a trivial one.

To be meaningful, we need to test with more than one producer and consumer, ideally distributed across many machines. We want to see how the system scales to larger workloads. Certainly, the producers and consumers cannot be collocated when we’re measuring discrete throughputs on either end, nor should the broker. This helps to reduce confounding variables between the system under test and the load generation.

It’s Not Rocket Science, It’s Computer Science

The third major criticism lay with the measurements themselves. Measuring throughput is fairly straightforward: we look at the number of messages sent per unit of time at both the sender and the receiver. If we think of a pipe carrying water, we might look at a discrete cross section and the rate at which water passes through it.

Latency, as a concept, is equally simple. With the pipe, it’s the time it takes for a drop of water to travel from one end to the other. While throughput is dependent on the pipe’s diameter, latency is dependent upon its length. What this means is that we can’t derive one from the other. In order to properly measure latency, we need to consider the latency of each message sent through the system.

However, we can’t ignore the relationship between throughput and latency and what the compromise between them means. Generally, we want to make things as fast as possible. Consider a single-cycle CPU. Its latency per instruction will be extremely low, but contrasted with a pipelined processor, its throughput is abysmal: one instruction per clock cycle, where that cycle has to be long enough for the slowest instruction to complete. The implication is that if we trade per-operation latency for throughput, we actually get a decrease in latency for aggregate instructions. Unfortunately, the benchmarks eschewed this relationship by requiring separate latency and throughput tests which used different code paths.
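A quick back-of-the-envelope calculation shows the trade. The numbers below are invented for illustration (a 5,000 ps single-cycle design versus a five-stage pipeline with a 1,200 ps cycle), and the pipeline is assumed to be ideal, with no stalls or hazards.

```go
package main

import "fmt"

// All times are in picoseconds so the arithmetic stays in integers.

// singleCycleTotal is the time to run n instructions when every instruction
// takes exactly one (long) clock cycle.
func singleCycleTotal(n, cyclePs int64) int64 {
	return n * cyclePs
}

// pipelinedTotal is the time to run n instructions on an ideal pipeline:
// the first instruction needs `stages` cycles to drain through, and each
// later instruction completes one cycle after the previous one.
func pipelinedTotal(n, stages, cyclePs int64) int64 {
	if n == 0 {
		return 0
	}
	return (stages + n - 1) * cyclePs
}

func main() {
	const (
		singleCycle = 5000 // hypothetical single-cycle clock period
		stageCycle  = 1200 // hypothetical pipelined clock period
		stages      = 5
	)
	fmt.Println("latency of one instruction, single-cycle:", singleCycleTotal(1, singleCycle))
	fmt.Println("latency of one instruction, pipelined:   ", pipelinedTotal(1, stages, stageCycle))
	fmt.Println("time for 1000 instructions, single-cycle:", singleCycleTotal(1000, singleCycle))
	fmt.Println("time for 1000 instructions, pipelined:   ", pipelinedTotal(1000, stages, stageCycle))
}
```

With these made-up figures, a single instruction gets slower on the pipeline (6,000 ps instead of 5,000 ps), yet a thousand of them finish in roughly a quarter of the time.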

The interaction between latency and throughput is easy to get confused, but it often has interesting ramifications, whether you’re looking at message queues, CPUs, or databases. In a general sense, we’d say “optimize for latency” because lower latency means higher throughput, but the reality is it’s almost always easier (and more cost-effective) to increase throughput than it is to decrease latency, especially on commodity hardware.

Capturing this data, in and of itself, isn’t terribly difficult, but what’s more susceptible to error is how it’s represented. This was the main fault of the benchmarks (in addition to the things described earlier). The most egregious thing they did was report latency as an average. This is like the cardinal sin of benchmarking. The number is practically useless, particularly without any context like a standard deviation.

We know that latency isn’t going to be uniform, but it’s probably not going to follow a normal distribution either. While network latency may be prone to fitting a nice bell curve, system latency almost certainly won’t. Systems often exhibit things like GC pauses and other “hiccups,” and averages tend to hide these.


Measuring performance isn’t all that easy, but if you do it, at least do it in a way that disambiguates the results. Look at quantiles, not averages. If you do present a mean, include the standard deviation and max in addition to the 90th or 99th percentile. Plotting latency by percentile distribution is an excellent way to see what your performance behavior actually looks like. Gil Tene has a great talk on measuring latency which I highly recommend.
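To see how badly a mean can mislead, consider a made-up sample of 100 requests: 98 take 1 ms and two hit a 500 ms hiccup. The sketch below summarizes it using the nearest-rank method for percentiles. The `summary` type and function names are my own, and the data is fabricated purely to illustrate the point.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// summary holds the statistics worth reporting together.
type summary struct {
	mean, stddev  float64
	p50, p99, max float64
}

// percentile returns the nearest-rank percentile (0 < p <= 100) of a
// non-empty slice that is already sorted in ascending order.
func percentile(sorted []float64, p float64) float64 {
	rank := int(math.Ceil(p * float64(len(sorted)) / 100))
	if rank < 1 {
		rank = 1
	}
	return sorted[rank-1]
}

// summarize computes the mean, population standard deviation, median,
// 99th percentile, and max of a non-empty set of latencies.
func summarize(latencies []float64) summary {
	sorted := append([]float64(nil), latencies...) // copy; leave the input alone
	sort.Float64s(sorted)

	var sum float64
	for _, v := range sorted {
		sum += v
	}
	mean := sum / float64(len(sorted))

	var squares float64
	for _, v := range sorted {
		squares += (v - mean) * (v - mean)
	}

	return summary{
		mean:   mean,
		stddev: math.Sqrt(squares / float64(len(sorted))),
		p50:    percentile(sorted, 50),
		p99:    percentile(sorted, 99),
		max:    sorted[len(sorted)-1],
	}
}

func main() {
	// Fabricated sample: 98 requests at 1 ms plus two 500 ms hiccups.
	var latencies []float64
	for i := 0; i < 98; i++ {
		latencies = append(latencies, 1)
	}
	latencies = append(latencies, 500, 500)

	s := summarize(latencies)
	fmt.Printf("mean=%.2fms stddev=%.2fms p50=%.0fms p99=%.0fms max=%.0fms\n",
		s.mean, s.stddev, s.p50, s.p99, s.max)
}
```

The mean works out to 10.98 ms, a latency that no request actually experienced. The median says the typical request took 1 ms, and the 99th percentile and max expose the 500 ms hiccups the mean smeared away.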

Working Towards a Better Solution

With all this in mind, we can work towards building a better way to test and measure messaging systems. The discussion above really just gives us three key takeaways:

  1. Don’t compare apples to oranges.
  2. Don’t instrument tests in a way that’s not at all representative of real life.
  3. Don’t present results in a statistically insignificant way.

My first attempt at taking these ideas to heart is a tool I call Flotilla. It’s meant to provide a way to test messaging systems in more realistic configurations, at scale, while offering more useful data. Flotilla allows you to easily spin up producers and consumers on arbitrarily many machines, start a message broker, and run a benchmark against it, all in an automated fashion. It then collects data like producer/consumer throughput and the complete latency distribution and reports back to the user.

Flotilla uses a Go port of HdrHistogram, of which I’m a raving fan, to capture latency data. HdrHistogram uses a bucketed approach to record values across a configured high dynamic range at a particular resolution. Recording a value costs on the order of nanoseconds, and the memory footprint is constant. It also has support for correcting coordinated omission, which is a common problem in benchmarking. Seriously, if you’re doing anything performance sensitive, give HdrHistogram a look.
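Coordinated omission is easier to grasp with an example. If a load generator intends to send a request every 10 ms but blocks for 45 ms waiting on one slow response, the requests it should have sent in the meantime are never measured. One common correction is to back-fill those missing samples. The sketch below is a simplified illustration of that idea using a plain slice; it is not HdrHistogram's code, and `recordCorrected` is a hypothetical name.

```go
package main

import "fmt"

// recordCorrected appends the measured latency and, if it exceeds the
// expected interval between requests, back-fills the samples that a
// stalled load generator failed to issue in the meantime. Each back-filled
// sample is one interval shorter than the last, down to the interval itself.
func recordCorrected(samples []int64, latency, expectedInterval int64) []int64 {
	samples = append(samples, latency)
	if expectedInterval <= 0 {
		return samples // no correction requested
	}
	for missing := latency - expectedInterval; missing >= expectedInterval; missing -= expectedInterval {
		samples = append(samples, missing)
	}
	return samples
}

func main() {
	var samples []int64
	// Values in milliseconds; one request was expected every 10 ms.
	samples = recordCorrected(samples, 8, 10)  // fast response, recorded as-is
	samples = recordCorrected(samples, 45, 10) // stall, back-filled with 35, 25, 15
	fmt.Println(samples)
}
```

Without the correction, the stall counts as a single bad sample out of two. With it, the recorded distribution reflects that several would-be requests were also stuck behind that stall.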

Still, Flotilla is not perfect and there’s certainly work to do, but I think it’s a substantial improvement over the previous MQ benchmarking utility. Longer term, it would be great to integrate it with something like Comcast to test workloads under different network conditions. Testing in a vacuum is nice and all, but we know in the real world, the network isn’t perfectly reliable.

So, Where Are the Benchmarks?

Omitted—for now, anyway. My goal really isn’t to rank a hodgepodge of different message queues because there’s really not much value in doing that. There are different use cases for different systems. I might, at some point, look at individual systems in greater detail, but comparing things like message throughput and latency just devolves into a hotly contested pissing contest. My hope is to garner more feedback and improvements to Flotilla before using it to definitively measure anything.

Benchmark responsibly.

Not Invented Here

Engineers love engineering things. The reason is self-evident (and maybe self-fulfilling—why else would you be an engineer?). We like to think we’re pretty good at solving problems. Unfortunately, this mindset can, on occasion, yield undesirable consequences which might not be immediately apparent but all the while damaging.

Developers are all in tune with the idea of “don’t reinvent the wheel,” but it seems to be eschewed sometimes, deliberately or otherwise. People don’t generally write their own merge sort, so why would they write their own consensus protocol? Anecdotally speaking, they do.

Not-Invented-Here Syndrome is a very real thing. In many cases, consciously or not, it’s a cultural problem. In others, it’s an engineering one. Camille Fournier’s blog post on ZooKeeper helps to illustrate this point and provide some context. In it, she describes why some distributed systems choose to rely on external services, such as ZooKeeper, for distributed coordination, while others build in their own coordination logic.

We draw a parallel between distributed systems and traditional RDBMSs, which typically implement their own file system and other low-level facilities. Why? Because it’s their competitive advantage. SQL databases sell because they offer finely tuned performance, and in order to do that, they need to control these things that the OS otherwise provides. Distributed databases like Riak sell because they own the coordination logic, which helps promote their competitive advantage. This follows what Joel Spolsky says about NIH Syndrome in that “if it’s a core business function—do it yourself, no matter what.”

“If you’re developing a computer game where the plot is your competitive advantage, it’s OK to use a third party 3D library. But if cool 3D effects are going to be your distinguishing feature, you had better roll your own.”

This makes a lot of sense. My sorting algorithm is unlikely to provide me with a competitive edge, but something else might, even if it’s not particularly novel.

So in some situations, homegrown is justifiable, but that’s not always the case. Redis’ competitive advantage is its predictably low latencies and data structures. Does it make sense for it to implement its own clustering and leader election protocols? Maybe, but this is where NIH can bite you. If what you’re doing is important and there’s precedent, lean on existing research and solutions. Most would argue write safety is important, and there is certainly precedent for leader election. Why not leverage that work? Things like Raft, Paxos, and Zab provide solutions which are proven using formal methods and are peer reviewed. That doesn’t mean new solutions can’t be developed, but they generally require model checking and further scrutiny to ensure correctness. Otherwise, you’ll inevitably run into problems. Implementing our own solutions can provide valuable insight, but leave them at home if they’re not rigorously approached. Rolling your own and calling it “good enough” is dishonest to your users if it’s not properly communicated.

Elasticsearch is another interesting case to look at. You might say Elasticsearch’s competitive advantage is its full-text search engine, but it’s not. Like Solr, it’s built on Lucene. Elasticsearch was designed from the ground up to be distributed. This is what gives it a leg up over Solr and other similar search servers where horizontal scaling and fault tolerance were essentially tacked on. In a way, this resembles what happened with Redis, where failover and clustering were introduced as an afterthought. However, unlike Redis, which chose to implement its own failover coordination and cluster-membership protocol, Solr opted to use ZooKeeper as an external coordinator.

We see that Elasticsearch’s core advantage is its distributed nature. Following that notion, it makes sense for it to own that coordination, which is why its designers chose to implement their own internal cluster membership, ZenDisco. But it turns out writing cluster-membership protocols is really fucking hard, and unless you’ve written proofs for it, you probably shouldn’t do it at all. The analogy here would be writing your own encryption algorithm—there’s tons of institutional knowledge which has laid the groundwork for solutions which are well-researched and well-understood. That knowledge should be embraced in situations like this.

I don’t mean to pick on Redis and Elasticsearch. They’re both excellent systems, but they serve as good examples for this discussion. The problem is that users of these systems tend to overlook the issues exposed by this mentality. Frankly, few people would know problems exist unless they are clearly documented by vendors (and not sales people) and even then, how many people actually read the docs cover-to-cover? It’s essential we know a system’s shortcomings and edge cases so we can recognize in which situations to apply it and, more importantly, in which we should not.

You don’t have to rely on an existing third-party library or service. Believe it or not, this isn’t a sales pitch for ZooKeeper. If it’s a core business function, it probably makes sense to build it yourself as Joel describes. What doesn’t make sense, however, is to build out whatever that is without being cognizant of conventional wisdom. I’m amazed at how often people are willing to throw away institutional knowledge, either because they don’t seek it out or they think they can do better (without formal verification). If I have seen further, it is by standing on the shoulders of giants.