Everything You Know About Latency Is Wrong

Okay, maybe not everything you know about latency is wrong. But now that I have your attention, we can talk about why the tools and methodologies you use to measure and reason about latency are likely horribly flawed. In fact, they’re not just flawed, they’re probably lying to your face.

When I went to Strange Loop in September, I attended a workshop called “Understanding Latency and Application Responsiveness” by Gil Tene. Gil is the CTO of Azul Systems, which is most renowned for its C4 pauseless garbage collector and associated Zing Java runtime. While the workshop was four and a half hours long, Gil also gave a 40-minute talk called “How NOT to Measure Latency” which was basically an abbreviated, less interactive version of the workshop. If you ever get the opportunity to see Gil speak or attend his workshop, I recommend you do. At the very least, do yourself a favor and watch one of his recorded talks or find his slide decks online.

The remainder of this post is primarily a summarization of that talk. You may not get anything out of it that you wouldn’t get out of the talk, but I think it can be helpful to absorb some of these ideas in written form. Plus, for my own benefit, writing about them helps solidify it in my head.

What is Latency?

Latency is defined as the time it took one operation to happen. This means every operation has its own latency—with one million operations there are one million latencies. As a result, latency cannot be measured as work units / time. What we’re interested in is how latency behaves. To do this meaningfully, we must describe the complete distribution of latencies. Latency almost never follows a normal, Gaussian, or Poisson distribution, so looking at averages, medians, and even standard deviations is useless.

Latency tends to be heavily multi-modal, and part of this is attributed to “hiccups” in response time. Hiccups resemble periodic freezes and can be due to any number of reasons—GC pauses, hypervisor pauses, context switches, interrupts, database reindexing, cache buffer flushes to disk, etc. These hiccups never resemble normal distributions and the shift between modes is often rapid and eclectic.

Screen Shot 2015-10-04 at 4.32.24 PM

How do we meaningfully describe the distribution of latencies? We have to look at percentiles, but it’s even more nuanced than this. A trap that many people fall into is fixating on “the common case.” The problem with this is that there is a lot more to latency behavior than the common case. Not only that, but the “common” case is likely not as common as you think.

This is partly a tooling problem. Many of the tools we use do not do a good job of capturing and representing this data. For example, the majority of latency graphs produced by Grafana, such as the one below, are basically worthless. We like to look at pretty charts, and by plotting what’s convenient we get a nice colorful graph which is quite readable. Only looking at the 95th percentile is what you do when you want to hide all the bad stuff. As Gil describes, it’s a “marketing system.” Whether it’s the CTO, potential customers, or engineers—someone’s getting duped. Furthermore, averaging percentiles is mathematically absurd. To conserve space, we often keep the summaries and throw away the data, but the “average of the 95th percentile” is a meaningless statement. You cannot average percentiles, yet note the labels in most of your Grafana charts. Unfortunately, it only gets worse from here.

graph_logbase10_ms

Gil says, “The number one indicator you should never get rid of is the maximum value. That is not noise, that is the signal. The rest of it is noise.” To this point, someone in the workshop naturally responded with “But what if the max is just something like a VM restarting? That doesn’t describe the behavior of the system. It’s just an unfortunate, unlikely occurrence.” By ignoring the maximum, you’re effectively saying “this doesn’t happen.” If you can identify the cause as noise, you’re okay, but if you’re not capturing that data, you have no idea of what’s actually happening.

How Many Nines?

But how many “nines” do I really need to look at? The 99th percentile, by definition, is the latency below which 99% of the observations may be found. Is the 99th percentile rare? If we have a single search engine node, a single key-value store node, a single database node, or a single CDN node, what is the chance we actually hit the 99th percentile?

Gil describes some real-world data he collected which shows how many of the web pages we go to actually experience the 99th percentile, displayed in table below. The second column counts the number of HTTP requests generated by a single access of the web page. The third column shows the likelihood of one access experiencing the 99th percentile. With the exception of google.com, every page has a probability of 50% or higher of seeing the 99th percentile.

Screen Shot 2015-10-04 at 6.15.24 PM

The point Gil makes is that the 99th percentile is what most of your web pages will see. It’s not “rare.”

What metric is more representative of user experience? We know it’s not the average or the median. 95th percentile? 99.9th percentile? Gil walks through a simple, hypothetical example: a typical user session involves five page loads, averaging 40 resources per page. How many users will not experience something worse than the 95th percentile? 0.003%. By looking at the 95th percentile, you’re looking at a number which is relevant to 0.003% of your users. This means 99.997% of your users are going to see worse than this number, so why are you even looking at it?

On the flip side, 18% of your users are going to experience a response time worse than the 99.9th percentile, meaning 82% of users will experience the 99.9th percentile or better. Going further, more than 95% of users will experience the 99.97th percentile and more than 99% of users will experience the 99.995th percentile.

The median is the number that 99.9999999999% of response times will be worse than. This is why median latency is irrelevant. People often describe “typical” response time using a median, but the median just describes what everything will be worse than. It’s also the most commonly used metric.

If it’s so critical that we look at a lot of nines (and it is), why do most monitoring systems stop at the 95th or 99th percentile? The answer is simply because “it’s hard!” The data collected by most monitoring systems is usually summarized in small, five or ten second windows. This, combined with the fact that we can’t average percentiles or derive five nines from a bunch of small samples of percentiles means there’s no way to know what the 99.999th percentile for the minute or hour was. We end up throwing away a lot of good data and losing fidelity.

A Coordinated Conspiracy

Benchmarking is hard. Almost all latency benchmarks are broken because almost all benchmarking tools are broken. The number one cause of problems in benchmarks is something called “coordinated omission,” which Gil refers to as “a conspiracy we’re all a part of” because it’s everywhere. Almost all load generators have this problem.

We can look at a common load-testing example to see how this problem manifests. With this type of test, a client generally issues requests at a certain rate, measures the response time for each request, and puts them in buckets from which we can study percentiles later.

The problem is what if the thing being measured took longer than the time it would have taken before sending the next thing? What if you’re sending something every second, but this particular thing took 1.5 seconds? You wait before you send the next one, but by doing this, you avoided measuring something when the system was problematic. You’ve coordinated with it by backing off and not measuring when things were bad. To remain accurate, this method of measuring only works if all responses fit within an expected interval.

Coordinated omission also occurs in monitoring code. The way we typically measure something is by recording the time before, running the thing, then recording the time after and looking at the delta. We put the deltas in stats buckets and calculate percentiles from that. The code below is taken from a Cassandra benchmark.

Screen Shot 2015-10-04 at 7.29.09 PM

However, if the system experiences one of the “hiccups” described earlier, you will only have one bad operation and 10,000 other operations waiting in line. When those 10,000 other things go through, they will look really good when in reality the experience was really bad. Long operations only get measured once, and delays outside the timing window don’t get measured at all.

In both of these examples, we’re omitting data that looks bad on a very selective basis, but just how much of an impact can this have on benchmark results? It turns out the impact is huge.

Screen Shot 2015-10-04 at 7.27.43 PM

Imagine a “perfect” system which processes 100 requests/second at exactly 1 ms per request. Now consider what happens when we freeze the system (for example, using CTRL+Z) after 100 seconds of perfect operation for 100 seconds and repeat. We can intuitively characterize this system:

  • The average over the first 100 seconds is 1 ms.
  • The average over the next 100 seconds is 50 seconds.
  • The average over the 200 seconds is 25 seconds.
  • The 50th percentile is 1 ms.
  • The 75th percentile is 50 seconds.
  • The 99.99th percentile is 100 seconds.

Screen Shot 2015-10-04 at 7.49.10 PM

Now we try measuring the system using a load generator. Before freezing, we run 100 seconds at 100 requests/second for a total of 10,000 requests at 1 ms each. After the stall, we get one result of 100 seconds. This is the entirety of our data, and when we do the math, we get these results:

  • The average over the 200 seconds is 10.9 ms (should be 25 seconds).
  • The 50th percentile is 1 ms.
  • The 75th percentile is 1 ms (should be 50 seconds).
  • The 99.99th percentile is 1 ms (should be 100 seconds).

Screen Shot 2015-10-04 at 7.57.23 PM

Basically, your load generator and monitoring code tell you the system is ready for production, when in fact it’s lying to you! A simple “CTRL+Z” test can catch coordinated omission, but people rarely do it. It’s critical to calibrate your system this way. If you find it giving you these kind of results, throw away all the numbers—they’re worthless.

You have to measure at random or “fair” rates. If you measure 10,000 things in the first 100 seconds, you have to measure 10,000 things in the second 100 seconds during the stall. If you do this, you’ll get the correct numbers, but they won’t be as pretty. Coordinated omission is the simple act of erasing, ignoring, or missing all the “bad” stuff, but the data is good.

Surely this data can still be useful though, even if it doesn’t accurately represent the system? For example, we can still use it to identify performance regressions or validate improvements, right? Sadly, this couldn’t be further from the truth. To see why, imagine we improve our system. Instead of pausing for 100 seconds after 100 seconds of perfect operation, it handles all requests at 5 ms each after 100 seconds. Doing the math, we get the following:

  • The 50th percentile is 1 ms
  • The 75th percentile is 2.5 ms (stall showed 1 ms)
  • The 99.99th percentile is 5 ms (stall showed 1 ms)

This data tells us we hurt the four nines and made the system 5x worse! This would tell us to revert the change and go back to the way it was before, which is clearly the wrong decision. With bad data, better can look worse. This shows that you cannot have any intuition based on any of these numbers. The data is garbage.

With many load generators, the situation is actually much worse than this. These systems work by generating a constant load. If our test is generating 100 requests/second, we run 10,000 requests in the first 100 seconds. When we stall, we process just one request. After the stall, the load generator sees that it’s 9,999 requests behind and issues those requests to catch back up. Not only did it get rid of the bad requests, it replaced them with good requests. Now the data is twice as wrong as just dropping the bad requests.

What coordinated omission is really showing you is service time, not response time. If we imagine a cashier ringing up customers, the service time is the time it takes the cashier to do the work. The response time is the time a customer waits before they reach the register. If the rate of arrival is higher than the service rate, the response time will continue to grow. Because hiccups and other phenomena happen, response times often bounce around. However, coordinated omission lies to you about response time by actually telling you the service time and hiding the fact that things stalled or waited in line.

Measuring Latency

Latency doesn’t live in a vacuum. Measuring response time is important, but you need to look at it in the context of load. But how do we properly measure this? When you’re nearly idle, things are nearly perfect, so obviously that’s not very useful. When you’re pedal to the metal, things fall apart. This is somewhat useful because it tells us how “fast” we can go before we start getting angry phone calls.

However, studying the behavior of latency at saturation is like looking at the shape of your car’s bumper after wrapping it around a pole. The only thing that matters when you hit the pole is that you hit the pole. There’s no point in trying to engineer a better bumper, but we can engineer for the speed at which we lose control. Everything is going to suck at saturation, so it’s not super useful to look at beyond determining your operating range.

What’s more important is testing the speeds in between idle and hitting the pole. Define your SLAs and plot those requirements, then run different scenarios using different loads and different configurations. This tells us if we’re meeting our SLAs but also how many machines we need to provision to do so. If you don’t do this, you don’t know how many machines you need.

How do we capture this data? In an ideal world, we could store information for every request, but this usually isn’t practical. HdrHistogram is a tool which allows you to capture latency and retain high resolution. It also includes facilities for correcting coordinated omission and plotting latency distributions. The original version of HdrHistogram was written in Java, but there are versions for many other languages.

Screen Shot 2015-10-05 at 12.00.04 AM

To Summarize

To understand latency, you have to consider the entire distribution. Do this by plotting the latency distribution curve. Simply looking at the 95th or even 99th percentile is not sufficient. Tail latency matters. Worse yet, the median is not representative of the “common” case, the average even less so. There is no single metric which defines the behavior of latency. Be conscious of your monitoring and benchmarking tools and the data they report. You can’t average percentiles.

Remember that latency is not service time. If you plot your data with coordinated omission, there’s often a quick, high rise in the curve. Run a “CTRL+Z” test to see if you have this problem. A non-omitted test has a much smoother curve. Very few tools actually correct for coordinated omission.

Latency needs to be measured in the context of load, but constantly running your car into a pole in every test is not useful. This isn’t how you’re running in production, and if it is, you probably need to provision more machines. Use it to establish your limits and test the sustainable throughputs in between to determine if you’re meeting your SLAs. There are a lot of flawed tools out there, but HdrHistogram is one of the few that isn’t. It’s useful for benchmarking and, since histograms are additive and HdrHistogram uses log buckets, it can also be useful for capturing high-volume data in production.

Stream Processing and Probabilistic Methods: Data at Scale

Stream processing and related abstractions have become all the rage following the rise of systems like Apache Kafka, Samza, and the Lambda architecture. Applying the idea of immutable, append-only event sourcing means we’re storing more data than ever before. However, as the cost of storage continues to decline, it’s becoming more feasible to store more data for longer periods of time. With immutability, how the data lives isn’t interesting anymore. It’s all about how it moves.

The shifting landscape of data architecture parallels the way we’re designing systems today. Specifically, the evolution of monolithic to service-oriented architecture necessitates a change in the way we do data integration. The traditional normalization approach doesn’t cut it. Our systems are composed of databases, caches, search indexes, data warehouses, and a multitude of other components. Moreover, there’s an increasing demand for online, real-time processing of this data that’s tantamount to the growing popularity of large-scale, offline processing along the lines of Hadoop. This presents an interesting set of new challenges, namely, how do we drink from the firehose without getting drenched?

The answer most likely lies in frameworks like Samza, Storm, and Spark Streaming. Similarly, tools like StatsD solve the problem of collecting real-time analytics. However, this discussion is meant to explore some of the primitives used in stream processing. The ideas extend far beyond event sourcing, generalizing to any type of data stream, unbounded or not.

Batch vs. Streaming

With offline or batch processing, we often have some heuristics which provide insight into our data set, and we can typically afford multiple passes of the data. Without a strict time constraint, data structures are less important. We can store the entire data set on disk (or perhaps across a distributed file system) and process it in batches.

With streaming data, however, there’s a need for near real-time processing—collecting analytics, monitoring and alerting, updating search indexes and caches, etc. With web crawlers, we process a stream of URLs and documents and produce indexed content. With websites, we process a stream of page views and update various counters and gauges. With email, we process a stream of text and produce a filtered, spam-free inbox. These cases involve massive, often limitless data sets. Processing that data online can’t be done with the conventional ETL or MapReduce-style methods, and unless you’re doing windowed processing, it’s entirely impractical to store that data in memory.

Framing the Problem

As a concrete example of stream processing, imagine we want to count the number of distinct document views across a large corpus, say, Wikipedia. A naive solution would be to use a hash table which maps a document to a count. Wikipedia has roughly 35 million pages. Let’s assume each document is identified by a 16-byte GUID, and the counters are stored as 8-byte integers. This means we need in the ball park of a gigabyte of memory. Given today’s hardware, this might not sound completely unreasonable, but now let’s say we want to track views per unique IP address. We see that this approach quickly becomes intractable.

To illustrate further, consider how to count the cardinality of IP addresses which access our website. Instead of a hash table, we can use a bitmap or sparse bit array to count addresses. There are over four billion possible distinct IPv4 addresses. Sure, we could allocate half a gigabyte of RAM, but if you’re developing performance-critical systems, large heap sizes are going to kill you, and this overhead doesn’t help. ((Granted, if you’re sensitive to garbage-collection pauses, you may be better off using something like C or Rust, but that’s not always the most practical option.))

A Probabilistic Approach

Instead, we turn to probabilistic ways of solving these problems. Probabilistic algorithms and data structures seem to be oft-overlooked, or maybe purposely ignored, by system designers despite the theory behind them having been around for a long time in many cases. The goal should be to move these from the world of academia to industry because they are immensely useful and widely neglected.

Probabilistic solutions trade space and performance for accuracy. While a loss in precision may be a cause for concern to some, the fact is with large data streams, we have to trade something off to get real-time insight. Much like the CAP theorem, we must choose between consistency (accuracy) and availability (online). With CAP, we can typically adjust the level in which that trade-off occurs. This space/performance-accuracy exchange behaves very much the same way.

The literature can get pretty dense, so let’s look at what some of these approaches are and the problems they solve in a way that’s (hopefully) understandable.

Bloom Filters

The Bloom filter is probably the most well-known and, conceptually, simplest probabilistic data structure. It also serves as a good foundation because there are a lot of twists you can put on it. The theory provides a kernel from which many other probabilistic solutions are derived, as we will see.

Bloom filters answer a simple question: is this element a member of a set? A normal set requires linear space because it stores every element. A Bloom filter doesn’t store the actual elements, it merely stores the “membership” of them. It uses sub-linear space opening the possibility for false positives, meaning there’s a non-zero probability it reports an item is in the set when it’s actually not. This has a wide range of applications. For example, a Bloom filter can be placed in front of a database. When a query for a piece of data comes in and the filter doesn’t contain it, we completely bypass the database.

The Bloom filter consists of a bit array of length and hash functions. Both of these parameters are configurable, but we can optimize them based on a desired rate of false positives. Each bit in the array is initially unset. When an element is “added” to the filter, it’s hashed by each of the functions, h1…hk, and modded by m, resulting in indices into the bit array. Each bit at the respective index is set.

To query the membership of an element, we hash it and check if each bit is set. If any of them are zero, we know the item isn’t in the set. What happens when two different elements hash to the same index? This is where false positives are introduced. If the bits aren’t set, we know the element wasn’t added, but if they are, it’s possible that some other element or group of elements hashed to the same indices. Bloom filters have false positives, but false negatives are impossible—a member will never be reported incorrectly as a non-member. Unfortunately, this also means items can’t be removed from the filter.

It’s clear that the likelihood of false positives increases with each element added to the filter. We can target a specific probability of false positives by selecting an optimal value for m and k for up to n insertions. ((The math behind determining optimal m and k is left as an exercise for the reader.)) While this implementation works well in practice, it has a drawback in that some elements are potentially more sensitive to false positives than others. We can solve this problem by partitioning the m bits among the k hash functions such that each one produces an index over its respective partition. As a result, each element is always described by exactly bits. This prevents any one element from being especially sensitive to false positives. Since calculating k hashes for every element is computationally expensive, we can actually perform a single hash and derive k hashes from it for a significant speedup. ((Less Hashing, Same Performance: Building a Better Bloom Filter discusses the use of two hash functions to simulate additional hashes. We can use a 64-bit hash, such as FNV, and use the upper and lower 32-bits as two different hashes.))

A Bloom filter eventually reaches a point where all bits are set, which means every query will indicate membership, effectively making the probability of false positives one. The problem with this is it requires a priori knowledge of the data set in order to select optimal parameters and avoid “overfilling.” Consequently, Bloom filters are ideal for offline processing and not so great for dealing with streams. Next, we’ll look at some variations of the Bloom filter which attempt to deal with this issue.

Scalable Bloom Filter

As we saw earlier, traditional Bloom filters are a great way to deal with set-membership problems in a space-efficient way, but they require knowing the size of the data set ahead of time in order to be effective. The Scalable Bloom Filter (SBF) was introduced by Almeida et al. as a way to cope with the capacity dilemma.

The Scalable Bloom Filter dynamically adapts to the size of the data set while enforcing a tight upper bound on the rate of false positives. Like the classic Bloom filter, false negatives are impossible. The SBF is essentially an array of Bloom filters with geometrically decreasing false-positive rates. New elements are added to the last filter. When this filter becomes “full”—more specifically, when it reaches a target fill ratio—a new filter is added with a tightened error probability. A tightening ratio, r, controls the growth of new filters.

Testing membership of an element consists of checking each of the filters. The geometrically decreasing false-positive rate of each filter is an interesting property. Since the fill ratio determines when a new filter is added, it turns out this progression can be modeled as a Taylor series which allows us to provide a tight upper bound on false positives using an optimal fill ratio of 0.5 (once the filter is 50% full, a new one is added). The compounded probability over the whole series converges to a target value, even accounting for an infinite series.

We can limit the error rate, but obviously the trade-off is that we end up allocating memory proportional to the size of the data set since we’re continuously adding filters. We also pay some computational cost on adds. Amortized, the cost of filter insertions is negligible, but for every add we must compute the current fill ratio of the filter to determine if a new filter needs to be added. Fortunately, we can optimize this by computing an estimated fill ratio. If we keep a running count of the items added to the filter, n, the approximate current fill ratio, p, can be obtained from the Taylor series expansion with p \approx 1-e^{-n/m} where m is the number of bits in the filter. Calculating this estimate turns out to be quite a bit faster than computing the actual fill ratio—fewer memory reads.

To provide some context around performance, adds in my Go implementation of a classic Bloom filter take about 166 ns on my MacBook Pro, and membership tests take 118 ns. With the SBF, adds take 422 ns and tests 113 ns. This isn’t a completely fair comparison since, as the SBF grows over time, tests require scanning through each filter, but certainly the add numbers are intuitive.

Scalable Bloom Filters are useful for cases where the size of the data set isn’t known a priori and memory constraints aren’t of particular concern. It’s an effective variation of a regular Bloom filter which generally requires space allocation orders-of-magnitude larger than the data set to allow enough headroom.

Stable Bloom Filter

Classic Bloom filters aren’t great for streaming data, and Scalable Bloom Filters, while a better option, still present a memory problem. Another derivative, the Stable Bloom Filter (SBF), was proposed by Deng and Rafiei as a technique for detecting duplicates in unbounded data streams with limited space. Most solutions work by dividing the stream into fixed-size windows and solving the problem within that discrete space. This allows the use of traditional methods like the Bloom filter.

The SBF attempts to approximate duplicates without the use of windows. Clearly, we can’t store the entire history of an infinite stream in limited memory. Instead, the thinking is more recent data has more value than stale data in many scenarios. As an example, web crawlers may not care about redundantly fetching a web page that was crawled a long time ago compared to fetching a page that was crawled more recently. With this impetus, the SBF works by representing more recent data while discarding old information.

The Stable Bloom Filter tweaks the classic Bloom filter by replacing the bit array with an array of m cells, each allocated d bits. The cells are simply counters, initially set to zero. The maximum value, Max, of a cell is 2^d-1. When an element is added, we first have to ensure space for it. This is done by selecting P random cells and decrementing their counters by one, clamping to zero. Next, we hash the element to k cells and set their counters to Max. Testing membership consists of probing the k cells. If any of them are zero, it’s not a member. The Bloom filter is actually a special case of an SBF where d is one and P is zero.

Like traditional Bloom filters, an SBF has a non-zero probability of false positives, which is controlled by several parameters. Unlike the Bloom filter, an SBF has a tight upper bound on the rate of false positives while introducing a non-zero rate of false negatives. The false-positive rate of a classic Bloom filter eventually reaches one, after which all queries result in a false positive. The stable-point property of an SBF means the false-positive rate asymptotically approaches a configurable fixed constant.

Stable Bloom Filters lend themselves to situations where the size of the data set isn’t known ahead of time and memory is bounded. For example, an SBF can be used to deduplicate events from an unbounded event stream with a specified upper bound on false positives and minimal false negatives. In particular, if the stream is not uniformly distributed, meaning duplicates are likely to be grouped closer together, the rate of false positives becomes immaterial.

Counting Bloom Filter

Conventional Bloom filters don’t allow items to be removed. The Stable Bloom Filter, on the other hand, works by evicting old data, but its API doesn’t expose a way to remove specific elements. The Counting Bloom Filter (CBF) is yet another twist on this structure. Introduced by Fan et al. in Summary Cache: A Scalable Wide-Area Web Cache Sharing Protocol, the CBF provides a way of deleting data from a filter.

It’s actually a very straightforward approach. The filter comprises an array of n-bit buckets similar to the Stable Bloom Filter. When an item is added, the corresponding counters are incremented, and when it’s removed, the counters are decremented.

Obviously, a CBF takes n-times more space than a regular Bloom filter, but it also has a scalability limit. Unless items are removed frequently enough, a counting filter’s false-positive probability will approach one. We need to dimension it accordingly, and this normally requires inferring from our data set. Likewise, since the CBF allows removals, it exposes an opportunity for false negatives. The multi-bit counters diminish the rate, but it’s important to be cognizant of this property so the CBF can be applied appropriately.

Inverse Bloom Filter

One of the distinguishing features of Bloom filters is the guarantee of no false negatives. This can be a valuable invariant, but what if we want the opposite of that? Is there an efficient data structure that can offer us no false positives with the possibility of false negatives? The answer is deceptively simple.

Jeff Hodges describes a rather trivial—yet strikingly pragmatic—approach which he calls the “opposite of a Bloom filter.” Since that name is a bit of a mouthful, I’m referring to this as the Inverse Bloom Filter (IBF).

The Inverse Bloom Filter may report a false negative but can never report a false positive. That is, it may indicate that an item has not been seen when it actually has, but it will never report an item as seen which it hasn’t come across. The IBF behaves in a similar manner to a fixed-size hash map of m buckets which doesn’t handle conflicts, but it provides lock-free concurrency using an underlying CAS.

The test-and-add operation hashes an element to an index in the filter. We take the value at that index while atomically storing the new value, then the old value is compared to the new one. If they differ, the new value wasn’t a member. If the values are equivalent, it was in the set.

The Inverse Bloom Filter is a nice option for dealing with unbounded streams or large data sets due to its limited memory usage. If duplicates are close together, the rate of false negatives becomes vanishingly small with an adequately sized filter.

HyperLogLog

Let’s revisit the problem of counting distinct IP addresses visiting a website. One solution might be to allocate a sparse bit array proportional to the number of IPv4 addresses. Alternatively, we could combine a Bloom filter with a counter. When an address comes in and it’s not in the filter, increment the count. While these both work, they’re not likely to scale very well. Rather, we can apply a probabilistic data structure known as the HyperLogLog (HLL). First presented by Flajolet et al. in 2007, HyperLogLog is an algorithm which approximately counts the number of distinct elements, or cardinality, of a multiset (a set which allows multiple occurrences of its elements).

Imagine our stream as a series of coin tosses. Someone tells you the most heads they flipped in a row was three. You can guess that they didn’t flip the coin very many times. However, if they flipped 20 heads in a row, it probably took them quite a while. This seems like a really unconvincing way of estimating the number of tosses, but from this kernel of thought grows a fruitful idea.

HLL replaces heads and tails with zeros and ones. Instead of counting the longest run of heads, it counts the longest run of leading zeros in a binary hash. Using a good hash function means that the values are, more or less, statistically independent and uniformly distributed. If the maximum run of leading zeros is n, the number of distinct elements in the set is approximately 2^n. But wait, what’s the math behind this? If we think about it, half of all binary numbers start with 1. Each additional bit divides the probability of a run further in half; that is, 25% start with 01, 12.5% start with 001, 6.25% start with 0001, ad infinitum. Thus, the probability of a run of length n is 2^{-(n+1)}.

Like flipping a coin, there’s potential for an enormous amount of variance with this technique. For instance, it’s entirely possible—though unlikely—that you flip 20 heads in a row on your first try. One experiment doesn’t provide enough data, so instead you flip 10 coins. HyperLogLog uses a similar strategy to reduce variance by splitting the stream up across a set of buckets, or registers, and applying the counting algorithm on the values in each one. It tracks the longest run of zeros in every register, and the total cardinality is computed by taking the harmonic mean across all registers. The harmonic mean is used to discount outliers since the distribution of values tends to skew towards the right. We can increase accuracy by adding more registers, which of course comes at the expense of performance and memory.

HyperLogLog is a remarkable algorithm. It almost feels like magic, and it’s exceptionally useful for working with data streams. HLL has a fraction of the memory footprint of other solutions. In fact, it’s usually several orders of magnitude while remaining surprisingly accurate.

Count-Min Sketch

HyperLogLog is a great option to efficiently count the number of distinct elements in a stream using a minimal amount of space, but it only gives us cardinality. What about counting the frequencies of specific elements? Cormode and Muthukrishnan developed the Count-Min sketch (CM sketch) to approximate the occurrences of different event types from a stream of events.

In contrast to a hash table, the Count-Min sketch uses sub-linear space to count frequencies. It consists of a matrix with w columns and d rows. These parameters determine the trade-off between space/time constraints and accuracy. Each row has an associated hash function. When an element arrives, it’s hashed for each row. The corresponding index in the rows are incremented by one. In this regard, the CM sketch shares some similarities with the Bloom filter.

count_min_sketch

The frequency of an element is estimated by taking the minimum of all the element’s respective counter values. The thought process here is that there is possibility for collisions between elements, which affects the counters for multiple items.  Taking the minimum count results in a closer approximation.

The Count-Min sketch is an excellent tool for counting problems for the same reasons as HyperLogLog. It has a lot of similarities to Bloom filters, but where Bloom filters effectively represent sets, the CM sketch considers multisets.

MinHash

The last probabilistic technique we’ll briefly look at is MinHash. This algorithm, invented by Andrei Broder, is used to quickly estimate the similarity between two sets. This has a variety of uses, such as detecting duplicate bodies of text and clustering or comparing documents.

MinHash works, in part, by using the Jaccard coefficient, a statistic which represents the size of the intersection of two sets divided by the size of the union:

J(A,B) = {{|A \cap B|}\over{|A \cup B|}}

This provides a measure of how similar the two sets are, but computing the intersection and union is expensive. Instead, MinHash essentially works by comparing randomly selected subsets through element hashing. It resembles locality-sensitive hashing (LSH), which means that similar items have a greater tendency to map to the same hash values. This varies from most hashing schemes, which attempt to avoid collisions. Instead, LSH is designed to maximize collisions of similar elements.

MinHash is one of the more difficult algorithms to fully grasp, and I can’t even begin to provide a proper explanation. If you’re interested in how it works, read the literature but know that there are a number of variations of it. The important thing is to know it exists and what problems it can be applied to.

In Practice

We’ve gone over several fundamental primitives for processing large data sets and streams while solving a few different types of problems. Bloom filters can be used to reduce disk reads and other expensive operations. They can also be purposed to detect duplicate events in a stream or prune a large decision tree. HyperLogLog excels at counting the number of distinct items in a stream while using minimal space, and the Count-Min sketch tracks the frequency of particular elements. Lastly, the MinHash method provides an efficient mechanism for comparing the similarity between documents. This has a number of applications, namely around identifying duplicates and characterizing content.

While there are no doubt countless implementations for most of these data structures and algorithms, I’ve been putting together a Go library geared towards providing efficient techniques for stream processing called Boom Filters. It includes implementations for all of the probabilistic data structures outlined above. As streaming data and real-time consumption grow more and more prominent, it will become important that we have the tools and understanding to deal with them. If nothing else, it’s valuable to be aware of probabilistic methods because they often provide accurate results at a fraction of the cost. These things aren’t just academic research, they form the basis for building online, high-performance systems at scale.

Benchmark Responsibly

When I posted my Dissecting Message Queues article last summer, it understandably caused some controversy.  I received both praise and scathing comments, emails asking why I didn’t benchmark X and pull requests to bump the numbers of Y. To be honest, that analysis was more of a brain dump from my own test driving of various message queues than any sort of authoritative or scientific study—it was far from the latter, to say the least. The qualitative discussion was pretty innocuous, but the benchmarks and supporting code were the target of a lot of (valid) criticism. In retrospect, it was probably irresponsible to publish them, but I was young and naive back then; now I’m just mostly naive.

Comparing Apples to Other Assorted Fruit

One such criticism was that the benchmarks were divided into two very broad categories: brokerless and brokered. While the brokerless group compared two very similar libraries, ZeroMQ and nanomsg, the second group included a number of distinct message brokers like RabbitMQ, Kafka, NATS, and Redis, to name a few.

The problem is not all brokers are created equal. They often have different goals and different prescribed use cases. As such, they impose different guarantees, different trade-offs, and different constraints. By grouping these benchmarks together, I implied they were fundamentally equivalent, when in fact, most were fundamentally different. For example, NATS serves a very different purpose than Kafka, and Redis, which offers pub/sub messaging, typically isn’t thought of as a message broker at all.

Measure Right or Don’t Measure at All

Another criticism was the way in which the benchmarks were performed. The tests were immaterial. The producer, consumer, and the message queue itself all ran on the same machine. Even worse, they used just a single publisher and subscriber. Not only does it not test what a remotely realistic configuration looks like, but it doesn’t even give you a good idea of a trivial one.

To be meaningful, we need to test with more than one producer and consumer, ideally distributed across many machines. We want to see how the system scales to larger workloads. Certainly, the producers and consumers cannot be collocated when we’re measuring discrete throughputs on either end, nor should the broker. This helps to reduce confounding variables between the system under test and the load generation.

It’s Not Rocket Science, It’s Computer Science

The third major criticism lay with the measurements themselves. Measuring throughput is fairly straightforward: we look at the number of messages sent per unit of time at both the sender and the receiver. If we think of a pipe carrying water, we might look at a discrete cross section and the rate at which water passes through it.

Latency, as a concept, is equally simple. With the pipe, it’s the time it takes for a drop of water to travel from one end to the other. While throughput is dependent on the pipe’s diameter, latency is dependent upon its length. What this means is that we can’t derive one from the other. In order to properly measure latency, we need to consider the latency of each message sent through the system.

However, we can’t ignore the relationship between throughput and latency and what the compromise between them means. Generally, we want to make things as fast as possible. Consider a single-cycle CPU. Its latency per instruction will be extremely low but contrasted with a pipelined processor, its throughput is abysmal—one instruction per clock cycle. The implication is that if we trade per-operation latency for throughput, we actually get a decrease in latency for aggregate instructions. Unfortunately, the benchmarks eschewed this relationship by requiring separate latency and throughput tests which used different code paths.

The interaction between latency and throughput is easy to get confused, but it often has interesting ramifications, whether you’re looking at message queues, CPUs, or databases. In a general sense, we’d say “optimize for latency” because lower latency means higher throughput, but the reality is it’s almost always easier (and more cost-effective) to increase throughput than it is to decrease latency, especially on commodity hardware.

Capturing this data, in and of itself, isn’t terribly difficult, but what’s more susceptible to error is how it’s represented. This was the main fault of the benchmarks (in addition to the things described earlier). The most egregious thing they did was report latency as an average. This is like the cardinal sin of benchmarking. The number is practically useless, particularly without any context like a standard deviation.

We know that latency isn’t going to be uniform, but it’s probably not going to follow a normal distribution either. While network latency may be prone to fitting a nice bell curve, system latency almost certainly won’t. They often exhibit things like GC pauses and other “hiccups,” and averages tend to hide these.

latency

Measuring performance isn’t all that easy, but if you do it, at least do it in a way that disambiguates the results. Look at quantiles, not averages. If you do present a mean, include the standard deviation and max in addition to the 90th or 99th percentile. Plotting latency by percentile distribution is an excellent way to see what your performance behavior actually looks like. Gil Tene has a great talk on measuring latency which I highly recommend.

Working Towards a Better Solution

With all this in mind, we can work towards building a better way to test and measure messaging systems. The discussion above really just gives us three key takeaways:

  1. Don’t compare apples to oranges.
  2. Don’t instrument tests in a way that’s not at all representative of real life.
  3. Don’t present results in a statistically insignificant way.

My first attempt at taking these ideas to heart is a tool I call Flotilla. It’s meant to provide a way to test messaging systems in more realistic configurations, at scale, while offering more useful data. Flotilla allows you to easily spin up producers and consumers on arbitrarily many machines, start a message broker, and run a benchmark against it, all in an automated fashion. It then collects data like producer/consumer throughput and the complete latency distribution and reports back to the user.

Flotilla uses a Go port of HdrHistogram to capture latency data, of which I’m a raving fan. HdrHistogram uses a bucketed approach to record values across a configured high-dynamic range at a particular resolution. Recording is in the single-nanosecond range and the memory footprint is constant. It also has support for correcting coordinated omission, which is a common problem in benchmarking. Seriously, if you’re doing anything performance sensitive, give HdrHistogram a look.

Still, Flotilla is not perfect and there’s certainly work to do, but I think it’s a substantial improvement over the previous MQ benchmarking utility. Longer term, it would be great to integrate it with something like Comcast to test workloads under different network conditions. Testing in a vacuum is nice and all, but we know in the real word, the network isn’t perfectly reliable.

So, Where Are the Benchmarks?

Omitted—for now, anyway. My goal really isn’t to rank a hodgepodge of different message queues because there’s really not much value in doing that. There are different use cases for different systems. I might, at some point, look at individual systems in greater detail, but comparing things like message throughput and latency just devolves into a hotly contested pissing contest. My hope is to garner more feedback and improvements to Flotilla before using it to definitively measure anything.

Benchmark responsibly.