Stream Processing and Probabilistic Methods: Data at Scale

Stream processing and related abstractions have become all the rage following the rise of systems like Apache Kafka, Samza, and the Lambda architecture. Applying the idea of immutable, append-only event sourcing means we’re storing more data than ever before. However, as the cost of storage continues to decline, it’s becoming more feasible to store more data for longer periods of time. With immutability, how the data lives isn’t interesting anymore. It’s all about how it moves.

The shifting landscape of data architecture parallels the way we’re designing systems today. Specifically, the evolution from monolithic to service-oriented architecture necessitates a change in the way we do data integration. The traditional normalization approach doesn’t cut it. Our systems are composed of databases, caches, search indexes, data warehouses, and a multitude of other components. Moreover, there’s an increasing demand for online, real-time processing of this data that rivals the growing popularity of large-scale, offline processing along the lines of Hadoop. This presents an interesting set of new challenges, namely, how do we drink from the firehose without getting drenched?

The answer most likely lies in frameworks like Samza, Storm, and Spark Streaming. Similarly, tools like StatsD solve the problem of collecting real-time analytics. However, this discussion is meant to explore some of the primitives used in stream processing. The ideas extend far beyond event sourcing, generalizing to any type of data stream, unbounded or not.

Batch vs. Streaming

With offline or batch processing, we often have some heuristics which provide insight into our data set, and we can typically afford multiple passes of the data. Without a strict time constraint, data structures are less important. We can store the entire data set on disk (or perhaps across a distributed file system) and process it in batches.

With streaming data, however, there’s a need for near real-time processing—collecting analytics, monitoring and alerting, updating search indexes and caches, etc. With web crawlers, we process a stream of URLs and documents and produce indexed content. With websites, we process a stream of page views and update various counters and gauges. With email, we process a stream of text and produce a filtered, spam-free inbox. These cases involve massive, often limitless data sets. Processing that data online can’t be done with the conventional ETL or MapReduce-style methods, and unless you’re doing windowed processing, it’s entirely impractical to store that data in memory.

Framing the Problem

As a concrete example of stream processing, imagine we want to count the number of distinct document views across a large corpus, say, Wikipedia. A naive solution would be to use a hash table which maps a document to a count. Wikipedia has roughly 35 million pages. Let’s assume each document is identified by a 16-byte GUID and the counters are stored as 8-byte integers, so each entry takes 24 bytes. With 35 million entries, that’s roughly 840 MB before any hash-table overhead, or in the ballpark of a gigabyte of memory. Given today’s hardware, this might not sound completely unreasonable, but now let’s say we want to track views per unique IP address. We see that this approach quickly becomes intractable.

To illustrate further, consider how to count the cardinality of IP addresses which access our website. Instead of a hash table, we can use a bitmap or sparse bit array to count addresses. There are over four billion possible distinct IPv4 addresses. Sure, we could allocate half a gigabyte of RAM, but if you’re developing performance-critical systems, large heap sizes are going to kill you, and this overhead doesn’t help. ((Granted, if you’re sensitive to garbage-collection pauses, you may be better off using something like C or Rust, but that’s not always the most practical option.))

A Probabilistic Approach

Instead, we turn to probabilistic ways of solving these problems. Probabilistic algorithms and data structures seem to be oft-overlooked, or maybe purposely ignored, by system designers despite the theory behind them having been around for a long time in many cases. The goal should be to move these from the world of academia to industry because they are immensely useful and widely neglected.

Probabilistic solutions trade accuracy for space and performance. While a loss in precision may be a cause for concern to some, the fact is that with large data streams, we have to trade something off to get real-time insight. Much like the CAP theorem, we must choose between consistency (accuracy) and availability (online). With CAP, we can typically adjust the level at which that trade-off occurs. This space/performance-accuracy exchange behaves very much the same way.

The literature can get pretty dense, so let’s look at what some of these approaches are and the problems they solve in a way that’s (hopefully) understandable.

Bloom Filters

The Bloom filter is probably the most well-known and, conceptually, simplest probabilistic data structure. It also serves as a good foundation because there are a lot of twists you can put on it. The theory provides a kernel from which many other probabilistic solutions are derived, as we will see.

Bloom filters answer a simple question: is this element a member of a set? A normal set requires linear space because it stores every element. A Bloom filter doesn’t store the actual elements; it merely stores their “membership.” It uses sub-linear space, which opens the possibility of false positives: there’s a non-zero probability it reports an item as being in the set when it’s actually not. This has a wide range of applications. For example, a Bloom filter can be placed in front of a database. When a query for a piece of data comes in and the filter doesn’t contain it, we completely bypass the database.

The Bloom filter consists of a bit array of length m and k hash functions. Both of these parameters are configurable, but we can optimize them based on a desired rate of false positives. Each bit in the array is initially unset. When an element is “added” to the filter, it’s hashed by each of the functions, h1…hk, and modded by m, resulting in k indices into the bit array. Each bit at the respective index is set.

To query the membership of an element, we hash it and check if each bit is set. If any of them are zero, we know the item isn’t in the set. What happens when two different elements hash to the same index? This is where false positives are introduced. If the bits aren’t set, we know the element wasn’t added, but if they are, it’s possible that some other element or group of elements hashed to the same indices. Bloom filters have false positives, but false negatives are impossible—a member will never be reported incorrectly as a non-member. Unfortunately, this also means items can’t be removed from the filter.
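To make these two operations concrete, here’s a minimal sketch in Go. The type and helper names are my own rather than those of any particular library, and it naively rehashes the element once per hash function, which the optimization described below avoids.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// BloomFilter is a minimal sketch: m bits and k hash functions.
type BloomFilter struct {
	bits []bool // a real implementation would pack these into a []uint64
	m, k uint64
}

func NewBloomFilter(m, k uint64) *BloomFilter {
	return &BloomFilter{bits: make([]bool, m), m: m, k: k}
}

// indexes naively computes k indices by rehashing the element with a
// different seed byte each time (see the faster derivation below).
func (f *BloomFilter) indexes(data []byte) []uint64 {
	idxs := make([]uint64, f.k)
	for i := uint64(0); i < f.k; i++ {
		h := fnv.New64a()
		h.Write([]byte{byte(i)}) // seed the hash per function
		h.Write(data)
		idxs[i] = h.Sum64() % f.m
	}
	return idxs
}

// Add sets the bit at each of the k indices.
func (f *BloomFilter) Add(data []byte) {
	for _, i := range f.indexes(data) {
		f.bits[i] = true
	}
}

// Test reports possible membership; any unset bit means definitely absent.
func (f *BloomFilter) Test(data []byte) bool {
	for _, i := range f.indexes(data) {
		if !f.bits[i] {
			return false
		}
	}
	return true
}

func main() {
	f := NewBloomFilter(1024, 4)
	f.Add([]byte("alice"))
	fmt.Println(f.Test([]byte("alice")), f.Test([]byte("bob"))) // true false (probably)
}
```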

It’s clear that the likelihood of false positives increases with each element added to the filter. We can target a specific probability of false positives by selecting an optimal value for m and k for up to n insertions. ((The math behind determining optimal m and k is left as an exercise for the reader.)) While this implementation works well in practice, it has a drawback in that some elements are potentially more sensitive to false positives than others. We can solve this problem by partitioning the m bits among the k hash functions such that each one produces an index over its respective partition. As a result, each element is always described by exactly k bits. This prevents any one element from being especially sensitive to false positives. Since calculating k hashes for every element is computationally expensive, we can actually perform a single hash and derive k hashes from it for a significant speedup. ((Less Hashing, Same Performance: Building a Better Bloom Filter discusses the use of two hash functions to simulate additional hashes. We can use a 64-bit hash, such as FNV, and use the upper and lower 32-bits as two different hashes.))
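As a sketch of that trick (following the two-hash approach from the footnote; the function name and parameters here are illustrative), we hash once with 64-bit FNV-1a, split the result into its upper and lower 32 bits, and combine them to derive the k indices.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// kIndexes derives k bit-array indices from a single 64-bit FNV-1a hash by
// treating its upper and lower 32 bits as two hashes, h1 and h2, and
// combining them as g_i = h1 + i*h2 (mod m).
func kIndexes(data []byte, k, m uint64) []uint64 {
	h := fnv.New64a()
	h.Write(data)
	sum := h.Sum64()
	h1, h2 := sum>>32, sum&0xffffffff
	idxs := make([]uint64, k)
	for i := uint64(0); i < k; i++ {
		idxs[i] = (h1 + i*h2) % m
	}
	return idxs
}

func main() {
	fmt.Println(kIndexes([]byte("alice"), 4, 1024))
}
```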

A Bloom filter eventually reaches a point where all bits are set, which means every query will indicate membership, effectively making the probability of false positives one. The problem with this is it requires a priori knowledge of the data set in order to select optimal parameters and avoid “overfilling.” Consequently, Bloom filters are ideal for offline processing and not so great for dealing with streams. Next, we’ll look at some variations of the Bloom filter which attempt to deal with this issue.

Scalable Bloom Filter

As we saw earlier, traditional Bloom filters are a great way to deal with set-membership problems in a space-efficient way, but they require knowing the size of the data set ahead of time in order to be effective. The Scalable Bloom Filter (SBF) was introduced by Almeida et al. as a way to cope with the capacity dilemma.

The Scalable Bloom Filter dynamically adapts to the size of the data set while enforcing a tight upper bound on the rate of false positives. Like the classic Bloom filter, false negatives are impossible. The SBF is essentially an array of Bloom filters with geometrically decreasing false-positive rates. New elements are added to the last filter. When this filter becomes “full”—more specifically, when it reaches a target fill ratio—a new filter is added with a tightened error probability. A tightening ratio, r, controls the growth of new filters.

Testing membership of an element consists of checking each of the filters. The geometrically decreasing false-positive rate of each filter is an interesting property. Because each successive filter’s error bound is tightened by the ratio r, the compounded probability over the whole series forms a convergent geometric series, which gives a tight upper bound on false positives even for an infinite number of filters. The fill ratio determines when a new filter is added, and the optimal fill ratio works out to 0.5 (once the filter is 50% full, a new one is added).

We can limit the error rate, but obviously the trade-off is that we end up allocating memory proportional to the size of the data set since we’re continuously adding filters. We also pay some computational cost on adds. Amortized, the cost of filter insertions is negligible, but for every add we must compute the current fill ratio of the filter to determine if a new filter needs to be added. Fortunately, we can optimize this by computing an estimated fill ratio. If we keep a running count of the items added to the filter, n, the approximate current fill ratio, p, can be obtained from the Taylor series expansion with p \approx 1-e^{-n/m} where m is the number of bits in the filter. Calculating this estimate turns out to be quite a bit faster than computing the actual fill ratio—fewer memory reads.
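Here’s a sketch of that check, assuming we track n alongside the filter (the names and parameters are illustrative, not from a specific implementation).

```go
package main

import (
	"fmt"
	"math"
)

// estimatedFillRatio approximates a filter's fill ratio as 1 - e^(-n/m),
// where n is the number of items added and m is the number of bits,
// avoiding a scan of the underlying bit array.
func estimatedFillRatio(n, m uint64) float64 {
	return 1 - math.Exp(-float64(n)/float64(m))
}

func main() {
	const m, targetFill = 1024, 0.5
	for n := uint64(0); ; n++ {
		if estimatedFillRatio(n, m) >= targetFill {
			// At this point an SBF would append a new filter whose error
			// bound is tightened by the ratio r, and add to that instead.
			fmt.Printf("add a new filter after ~%d insertions\n", n)
			break
		}
	}
}
```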

To provide some context around performance, adds in my Go implementation of a classic Bloom filter take about 166 ns on my MacBook Pro, and membership tests take 118 ns. With the SBF, adds take 422 ns and tests 113 ns. This isn’t a completely fair comparison since, as the SBF grows over time, tests require scanning through each filter, but certainly the add numbers are intuitive.

Scalable Bloom Filters are useful for cases where the size of the data set isn’t known a priori and memory constraints aren’t of particular concern. They’re an effective alternative to a regular Bloom filter, which would otherwise have to be allocated orders of magnitude larger than the data set just to leave enough headroom.

Stable Bloom Filter

Classic Bloom filters aren’t great for streaming data, and Scalable Bloom Filters, while a better option, still present a memory problem. Another derivative, the Stable Bloom Filter (SBF), was proposed by Deng and Rafiei as a technique for detecting duplicates in unbounded data streams with limited space. Most solutions work by dividing the stream into fixed-size windows and solving the problem within that discrete space. This allows the use of traditional methods like the Bloom filter.

The SBF attempts to approximate duplicates without the use of windows. Clearly, we can’t store the entire history of an infinite stream in limited memory. Instead, the thinking is more recent data has more value than stale data in many scenarios. As an example, web crawlers may not care about redundantly fetching a web page that was crawled a long time ago compared to fetching a page that was crawled more recently. With this impetus, the SBF works by representing more recent data while discarding old information.

The Stable Bloom Filter tweaks the classic Bloom filter by replacing the bit array with an array of m cells, each allocated d bits. The cells are simply counters, initially set to zero. The maximum value, Max, of a cell is 2^d-1. When an element is added, we first have to ensure space for it. This is done by selecting P random cells and decrementing their counters by one, clamping to zero. Next, we hash the element to k cells and set their counters to Max. Testing membership consists of probing the k cells. If any of them are zero, it’s not a member. The Bloom filter is actually a special case of an SBF where d is one and P is zero.
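Here’s a minimal sketch of those operations in Go, using the parameters as described above; it’s illustrative rather than tied to any particular implementation.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
)

// stableFilter is a sketch of a Stable Bloom Filter: m counting cells of
// d bits each, k hash functions, and P random decrements per add.
type stableFilter struct {
	cells []uint8
	max   uint8 // 2^d - 1
	k     uint64
	p     int
}

func newStableFilter(m uint64, d uint, k uint64, p int) *stableFilter {
	return &stableFilter{cells: make([]uint8, m), max: uint8(1<<d - 1), k: k, p: p}
}

// indexes derives k cell indices from one 64-bit FNV-1a hash (the same
// double-hashing trick shown earlier).
func (f *stableFilter) indexes(data []byte) []uint64 {
	h := fnv.New64a()
	h.Write(data)
	sum := h.Sum64()
	h1, h2 := sum>>32, sum&0xffffffff
	idxs := make([]uint64, f.k)
	for i := uint64(0); i < f.k; i++ {
		idxs[i] = (h1 + i*h2) % uint64(len(f.cells))
	}
	return idxs
}

// Add first decrements P random cells (clamped at zero) to make room,
// then sets the element's k cells to Max.
func (f *stableFilter) Add(data []byte) {
	for i := 0; i < f.p; i++ {
		j := rand.Intn(len(f.cells))
		if f.cells[j] > 0 {
			f.cells[j]--
		}
	}
	for _, i := range f.indexes(data) {
		f.cells[i] = f.max
	}
}

// Test reports membership; any zero cell means "probably not seen recently."
func (f *stableFilter) Test(data []byte) bool {
	for _, i := range f.indexes(data) {
		if f.cells[i] == 0 {
			return false
		}
	}
	return true
}

func main() {
	f := newStableFilter(1<<16, 3, 3, 10)
	f.Add([]byte("event-123"))
	fmt.Println(f.Test([]byte("event-123")), f.Test([]byte("event-456")))
}
```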

Like traditional Bloom filters, an SBF has a non-zero probability of false positives, which is controlled by several parameters. Unlike the Bloom filter, an SBF has a tight upper bound on the rate of false positives while introducing a non-zero rate of false negatives. The false-positive rate of a classic Bloom filter eventually reaches one, after which all queries result in a false positive. The stable-point property of an SBF means the false-positive rate asymptotically approaches a configurable fixed constant.

Stable Bloom Filters lend themselves to situations where the size of the data set isn’t known ahead of time and memory is bounded. For example, an SBF can be used to deduplicate events from an unbounded event stream with a specified upper bound on false positives and minimal false negatives. In particular, if the stream is not uniformly distributed, meaning duplicates are likely to be grouped closer together, the rate of false negatives becomes immaterial.

Counting Bloom Filter

Conventional Bloom filters don’t allow items to be removed. The Stable Bloom Filter, on the other hand, works by evicting old data, but its API doesn’t expose a way to remove specific elements. The Counting Bloom Filter (CBF) is yet another twist on this structure. Introduced by Fan et al. in Summary Cache: A Scalable Wide-Area Web Cache Sharing Protocol, the CBF provides a way of deleting data from a filter.

It’s actually a very straightforward approach. The filter comprises an array of n-bit buckets similar to the Stable Bloom Filter. When an item is added, the corresponding counters are incremented, and when it’s removed, the counters are decremented.
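A minimal sketch of the idea, reusing the double-hashing trick from earlier (the names are illustrative), might look like this:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// countingFilter is a sketch of a Counting Bloom Filter: each position is a
// small counter rather than a single bit.
type countingFilter struct {
	counts []uint8
	k      uint64
}

func (f *countingFilter) indexes(data []byte) []uint64 {
	h := fnv.New64a()
	h.Write(data)
	sum := h.Sum64()
	h1, h2 := sum>>32, sum&0xffffffff
	idxs := make([]uint64, f.k)
	for i := uint64(0); i < f.k; i++ {
		idxs[i] = (h1 + i*h2) % uint64(len(f.counts))
	}
	return idxs
}

// Add increments the element's counters; Remove decrements them.
func (f *countingFilter) Add(data []byte) {
	for _, i := range f.indexes(data) {
		f.counts[i]++ // a real implementation would guard against overflow
	}
}

func (f *countingFilter) Remove(data []byte) {
	for _, i := range f.indexes(data) {
		if f.counts[i] > 0 {
			f.counts[i]--
		}
	}
}

// Test reports membership; any zero counter means definitely absent.
func (f *countingFilter) Test(data []byte) bool {
	for _, i := range f.indexes(data) {
		if f.counts[i] == 0 {
			return false
		}
	}
	return true
}

func main() {
	f := &countingFilter{counts: make([]uint8, 1024), k: 4}
	f.Add([]byte("key"))
	fmt.Println(f.Test([]byte("key"))) // true
	f.Remove([]byte("key"))
	fmt.Println(f.Test([]byte("key"))) // false
}
```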

Obviously, a CBF takes n-times more space than a regular Bloom filter, but it also has a scalability limit. Unless items are removed frequently enough, a counting filter’s false-positive probability will approach one. We need to dimension it accordingly, and this normally requires inferring from our data set. Likewise, since the CBF allows removals, it exposes an opportunity for false negatives. The multi-bit counters diminish the rate, but it’s important to be cognizant of this property so the CBF can be applied appropriately.

Inverse Bloom Filter

One of the distinguishing features of Bloom filters is the guarantee of no false negatives. This can be a valuable invariant, but what if we want the opposite of that? Is there an efficient data structure that can offer us no false positives with the possibility of false negatives? The answer is deceptively simple.

Jeff Hodges describes a rather trivial—yet strikingly pragmatic—approach which he calls the “opposite of a Bloom filter.” Since that name is a bit of a mouthful, I’m referring to this as the Inverse Bloom Filter (IBF).

The Inverse Bloom Filter may report a false negative but can never report a false positive. That is, it may indicate that an item has not been seen when it actually has, but it will never report an item as seen which it hasn’t come across. The IBF behaves in a similar manner to a fixed-size hash map of m buckets which doesn’t handle collisions, but it provides lock-free concurrency using atomic compare-and-swap (CAS) operations.

The test-and-add operation hashes an element to an index in the filter, atomically swaps the new value into that slot, and returns the previous value. If the old and new values differ, the element wasn’t a member; if they’re equal, it was already in the set.
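Here’s a rough sketch of that operation using Go’s atomic.Pointer (available since Go 1.19). Existing implementations differ in the details, so treat this as illustrative only.

```go
package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
	"sync/atomic"
)

// inverseFilter is a sketch of the "opposite of a Bloom filter": a fixed
// array of slots that ignores collisions, updated with atomic swaps.
type inverseFilter struct {
	slots []atomic.Pointer[[]byte]
}

func newInverseFilter(m int) *inverseFilter {
	return &inverseFilter{slots: make([]atomic.Pointer[[]byte], m)}
}

// TestAndAdd atomically swaps the element into its slot and reports whether
// the previous occupant was the same element. It can return a false negative
// (if another element evicted this one) but never a false positive.
// A production version would copy the bytes before storing them.
func (f *inverseFilter) TestAndAdd(data []byte) bool {
	h := fnv.New64a()
	h.Write(data)
	i := h.Sum64() % uint64(len(f.slots))
	old := f.slots[i].Swap(&data)
	return old != nil && bytes.Equal(*old, data)
}

func main() {
	f := newInverseFilter(1 << 16)
	fmt.Println(f.TestAndAdd([]byte("dup"))) // false: first sighting
	fmt.Println(f.TestAndAdd([]byte("dup"))) // true: seen before
}
```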

The Inverse Bloom Filter is a nice option for dealing with unbounded streams or large data sets due to its limited memory usage. If duplicates are close together, the rate of false negatives becomes vanishingly small with an adequately sized filter.

HyperLogLog

Let’s revisit the problem of counting distinct IP addresses visiting a website. One solution might be to allocate a sparse bit array proportional to the number of IPv4 addresses. Alternatively, we could combine a Bloom filter with a counter. When an address comes in and it’s not in the filter, increment the count. While these both work, they’re not likely to scale very well. Rather, we can apply a probabilistic data structure known as the HyperLogLog (HLL). First presented by Flajolet et al. in 2007, HyperLogLog is an algorithm which approximately counts the number of distinct elements, or cardinality, of a multiset (a set which allows multiple occurrences of its elements).

Imagine our stream as a series of coin tosses. Someone tells you the most heads they flipped in a row was three. You can guess that they didn’t flip the coin very many times. However, if they flipped 20 heads in a row, it probably took them quite a while. This seems like a really unconvincing way of estimating the number of tosses, but from this kernel of thought grows a fruitful idea.

HLL replaces heads and tails with zeros and ones. Instead of counting the longest run of heads, it counts the longest run of leading zeros in a binary hash. Using a good hash function means that the values are, more or less, statistically independent and uniformly distributed. If the maximum run of leading zeros is n, the number of distinct elements in the set is approximately 2^n. But wait, what’s the math behind this? If we think about it, half of all binary numbers start with 1. Each additional bit divides the probability of a run further in half; that is, 25% start with 01, 12.5% start with 001, 6.25% start with 0001, ad infinitum. Thus, the probability of a run of length n is 2^{-(n+1)}.

Like flipping a coin, there’s potential for an enormous amount of variance with this technique. For instance, it’s entirely possible—though unlikely—that you flip 20 heads in a row on your first try. One experiment doesn’t provide enough data, so instead you flip 10 coins. HyperLogLog uses a similar strategy to reduce variance by splitting the stream up across a set of buckets, or registers, and applying the counting algorithm on the values in each one. It tracks the longest run of zeros in every register, and the total cardinality is computed by taking the harmonic mean across all registers. The harmonic mean is used to discount outliers since the distribution of values tends to skew towards the right. We can increase accuracy by adding more registers, which of course comes at the expense of performance and memory.
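A bare-bones sketch of the register update and estimate might look like the following. The constants and names are illustrative, and production implementations apply bias and small-range corrections that are omitted here.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math"
	"math/bits"
)

// hll is a sketch of HyperLogLog with 2^p registers.
type hll struct {
	p         uint
	registers []uint8
}

func newHLL(p uint) *hll { return &hll{p: p, registers: make([]uint8, 1<<p)} }

// Add hashes the element, uses the first p bits to pick a register, and
// records the longest run of leading zeros seen in the remaining bits.
func (h *hll) Add(data []byte) {
	f := fnv.New64a()
	f.Write(data)
	x := f.Sum64()
	idx := x >> (64 - h.p)                           // register index
	rank := uint8(bits.LeadingZeros64(x<<h.p|1)) + 1 // run length in the rest
	if rank > h.registers[idx] {
		h.registers[idx] = rank
	}
}

// Count estimates cardinality using the harmonic mean of 2^register values.
func (h *hll) Count() float64 {
	m := float64(len(h.registers))
	sum := 0.0
	for _, r := range h.registers {
		sum += math.Pow(2, -float64(r))
	}
	return 0.7213 / (1 + 1.079/m) * m * m / sum // alpha_m * m^2 / sum
}

func main() {
	h := newHLL(14)
	for i := 0; i < 100000; i++ {
		h.Add([]byte(fmt.Sprintf("ip-%d", i)))
	}
	fmt.Printf("estimated cardinality: %.0f\n", h.Count())
}
```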

HyperLogLog is a remarkable algorithm. It almost feels like magic, and it’s exceptionally useful for working with data streams. HLL has a fraction of the memory footprint of other solutions, usually several orders of magnitude smaller, while remaining surprisingly accurate.

Count-Min Sketch

HyperLogLog is a great option to efficiently count the number of distinct elements in a stream using a minimal amount of space, but it only gives us cardinality. What about counting the frequencies of specific elements? Cormode and Muthukrishnan developed the Count-Min sketch (CM sketch) to approximate the occurrences of different event types from a stream of events.

In contrast to a hash table, the Count-Min sketch uses sub-linear space to count frequencies. It consists of a matrix with w columns and d rows. These parameters determine the trade-off between space/time constraints and accuracy. Each row has an associated hash function. When an element arrives, it’s hashed once per row, and the corresponding index in each row is incremented by one. In this regard, the CM sketch shares some similarities with the Bloom filter.

The frequency of an element is estimated by taking the minimum of the element’s respective counter values across the rows. The reasoning is that hash collisions can inflate a counter shared by multiple elements, so every counter is an overestimate; taking the minimum yields the closest approximation.
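Here’s a minimal sketch of the update and query paths (the structure and names are illustrative):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// cmSketch is a minimal Count-Min sketch: d rows of w counters, each row
// with its own seeded hash function.
type cmSketch struct {
	d, w   uint64
	counts [][]uint64
}

func newCMSketch(d, w uint64) *cmSketch {
	rows := make([][]uint64, d)
	for i := range rows {
		rows[i] = make([]uint64, w)
	}
	return &cmSketch{d: d, w: w, counts: rows}
}

func (s *cmSketch) index(row uint64, data []byte) uint64 {
	h := fnv.New64a()
	h.Write([]byte{byte(row)}) // seed per row
	h.Write(data)
	return h.Sum64() % s.w
}

// Add increments one counter per row for the element.
func (s *cmSketch) Add(data []byte) {
	for r := uint64(0); r < s.d; r++ {
		s.counts[r][s.index(r, data)]++
	}
}

// Count returns the minimum of the element's counters, which is always an
// overestimate or exact, never an underestimate.
func (s *cmSketch) Count(data []byte) uint64 {
	min := uint64(0)
	for r := uint64(0); r < s.d; r++ {
		c := s.counts[r][s.index(r, data)]
		if r == 0 || c < min {
			min = c
		}
	}
	return min
}

func main() {
	s := newCMSketch(4, 1<<16)
	for i := 0; i < 42; i++ {
		s.Add([]byte("page-view"))
	}
	fmt.Println(s.Count([]byte("page-view"))) // 42 (or slightly more)
}
```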

The Count-Min sketch is an excellent tool for counting problems for the same reasons as HyperLogLog. It has a lot of similarities to Bloom filters, but where Bloom filters effectively represent sets, the CM sketch considers multisets.

MinHash

The last probabilistic technique we’ll briefly look at is MinHash. This algorithm, invented by Andrei Broder, is used to quickly estimate the similarity between two sets. This has a variety of uses, such as detecting duplicate bodies of text and clustering or comparing documents.

MinHash works, in part, by using the Jaccard coefficient, a statistic which represents the size of the intersection of two sets divided by the size of the union:

J(A,B) = \frac{|A \cap B|}{|A \cup B|}

This provides a measure of how similar the two sets are, but computing the intersection and union is expensive. Instead, MinHash essentially works by comparing randomly selected subsets through element hashing. It resembles locality-sensitive hashing (LSH), which means that similar items have a greater tendency to map to the same hash values. This varies from most hashing schemes, which attempt to avoid collisions. Instead, LSH is designed to maximize collisions of similar elements.
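To give a flavor of the basic technique, here’s a simplified sketch that computes a MinHash signature with k seeded hash functions and estimates the Jaccard coefficient as the fraction of positions where two signatures agree; the names and seeding scheme are mine, and real variants differ.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// signature computes a MinHash signature: for each of k seeded hash
// functions, keep the minimum hash value over the set's elements.
func signature(set []string, k int) []uint64 {
	sig := make([]uint64, k)
	for i := range sig {
		sig[i] = ^uint64(0) // max uint64
		for _, elem := range set {
			h := fnv.New64a()
			h.Write([]byte{byte(i)}) // seed per hash function
			h.Write([]byte(elem))
			if v := h.Sum64(); v < sig[i] {
				sig[i] = v
			}
		}
	}
	return sig
}

// similarity estimates the Jaccard coefficient as the fraction of signature
// positions where the two sets' minimum hashes agree.
func similarity(a, b []uint64) float64 {
	matches := 0
	for i := range a {
		if a[i] == b[i] {
			matches++
		}
	}
	return float64(matches) / float64(len(a))
}

func main() {
	doc1 := []string{"the", "quick", "brown", "fox", "jumps"}
	doc2 := []string{"the", "quick", "brown", "dog", "sleeps"}
	k := 128
	fmt.Printf("estimated Jaccard: %.2f\n", similarity(signature(doc1, k), signature(doc2, k)))
}
```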

MinHash is one of the more difficult algorithms to fully grasp, and I can’t even begin to provide a proper explanation. If you’re interested in how it works, read the literature but know that there are a number of variations of it. The important thing is to know it exists and what problems it can be applied to.

In Practice

We’ve gone over several fundamental primitives for processing large data sets and streams while solving a few different types of problems. Bloom filters can be used to reduce disk reads and other expensive operations. They can also be purposed to detect duplicate events in a stream or prune a large decision tree. HyperLogLog excels at counting the number of distinct items in a stream while using minimal space, and the Count-Min sketch tracks the frequency of particular elements. Lastly, the MinHash method provides an efficient mechanism for comparing the similarity between documents. This has a number of applications, namely around identifying duplicates and characterizing content.

While there are no doubt countless implementations for most of these data structures and algorithms, I’ve been putting together a Go library geared towards providing efficient techniques for stream processing called Boom Filters. It includes implementations for all of the probabilistic data structures outlined above. As streaming data and real-time consumption grow more and more prominent, it will become important that we have the tools and understanding to deal with them. If nothing else, it’s valuable to be aware of probabilistic methods because they often provide accurate results at a fraction of the cost. These things aren’t just academic research, they form the basis for building online, high-performance systems at scale.

On Hireability and Recruiting

Developers deal with recruiter emails on a daily basis. It’s frustrating because it’s almost always a shotgun approach, but occasionally you get something that strays from the path and, delightfully, it stands out. It’s refreshing to have someone who has actually taken the time to determine your skill set, technology background, and how you spend your free time with respect to software. You feel like they at least have an inkling of who you are. On the other hand, it’s irritating to be bombarded by contract-to-hire offers which aren’t even remotely tailored to you. What’s worse is when it’s ten different emails from ten different people working for the same recruiting agency.

There are basically two types of software jobs: the ones which just need warm bodies—commodity developers who mostly need to code the spec handed down from the Powers That Be—and the ones which need individual contributors—people who are hired to solve hard problems. ((These would be what Google refers to as smart creatives.)) I seem to paint the first in a negative light, which isn’t actually my intention. I know plenty of developers who are perfectly content with these types of jobs. There’s nothing wrong with that, but sometimes the wires seemingly get crossed.

The problem, as I see it, is that the communications meant for one type of person are getting received by people who don’t identify with them. It’s a numbers game, so it boils down to lazy recruiting. Throw enough paper airplanes and eventually one will land in the lap of someone who cares—the rest will poke everyone else’s eyes out. This isn’t a humblebrag about how I’m such an Awesome Developer™ or a suggestion that we should paint everyone with a broad brush; it’s merely what I’ve observed. People are welcome to disagree and do so openly.

Many companies I would love to work at have pinged me after coming across my GitHub profile or reading my blog. Recruiters for jobs I would never work at have emailed me after seeing my LinkedIn. The split isn’t completely black and white—there are places I’d work that have contacted me through LinkedIn, and vice versa—but there’s definitely a strong correlation. ((Granted, the sample size of recruiters emailing me through LinkedIn is probably an order-of-magnitude larger.)) The difference is that the former almost always receive an enthusiastic, well-thought-out response, while the latter go straight to the trash. Doing otherwise would simply be too demoralizing. I’ve talked to other developers about this, and I know I’m not the only one. Bad recruiting is just causing people to be more jaded and cynical. I don’t know about other industries, but in software it seems to be particularly magnified.

The question, then, is how to minimize the “bad” communications while maximizing the “good.” You can’t avoid bad recruiters. I suspect there are lists of names with email addresses and phone numbers which get circulated around staffing agencies. ((I wouldn’t be terribly surprised if some universities sold contact information for new graduates either.)) You can remove all contact information from your online persona, but then you make it a lot harder for the “good communications” to get through.

The best way to make yourself hireable isn’t to have a good resume and interviewing skills, it’s to do cool stuff in the open and talk about it. It can be scary because sometimes you crash and burn, but that’s just as valuable. Some find it terrifying to expose their work to the world. Half the reason I blog and share it publicly is to open my work and myself up to criticism. It stings sometimes, but you get over it. I wouldn’t be where I am today without people kicking me in the ass to do better. Other times, the community can actually learn and gain insight from your viewpoint and mindset. People are too modest.

Good developers aren’t doing this to get hired either, they’re doing it because they think it’s interesting in its own right. That’s the other half of why I blog (as well as to keep a journal for myself). The self-marketing is a byproduct. My side projects and blog posts have led to far more interesting conversations and opportunities than my resume or LinkedIn profile.

People should be more open to the idea of sharing their knowledge and work with the community. The benefits go both ways.

CS Literature of the Day

I read a lot of research papers and other nerdy computer science things in my spare time. I’m also a huge fan of Papers We Love, which is an awesome repository of academic CS papers and a community of people who read, share, and present them.

For the purposes of posterity and information-sharing, I thought it would be a good idea to share some of the nerdy things I read or watch like various papers, blog posts, and talks—all related to computer science. That’s why I’m tweeting a new piece of CS literature every day with the hashtag #CSLOTD and maintaining a GitHub repo containing that content called CS Literature of the Day.

To start, CSLOTD will just be literature which I come across myself and find interesting, but it’s completely open to contributors to share thought-provoking material. The repo won’t host any content but rather provide links to it.

I’m doing this because I’m enthusiastic about CS. I reserve the right to stop doing it at any time or miss days for whatever reason. I also tweet things that I think are interesting. If you don’t find them interesting, that’s your problem. As such, I reserve the right to reject submissions if I don’t find them interesting enough to include. I have pretty low standards though. :)

Fast, Scalable Networking in Go with Mangos

In the past, I’ve looked at nanomsg and why it’s a formidable alternative to the well-regarded ZeroMQ. Like ZeroMQ, nanomsg is a native library which markets itself as a way to build fast and scalable networking layers. I won’t go into detail on how nanomsg accomplishes this since my analysis of it already covers that fairly extensively, but instead I want to talk about a Go implementation of the protocol called Mangos. ((Full disclosure: I am a contributor on the Mangos project, but only because I was a user first!)) If you’re not familiar with nanomsg or Scalability Protocols, I recommend reading my overview of those first.

nanomsg is a shared library written in C. This, combined with its zero-copy API, makes it an extremely low-latency transport layer. While there are a lot of client bindings which allow you to use nanomsg from other languages, dealing with shared libraries can often be a pain—not to mention it complicates deployment.

More and more companies are starting to use Go for backend development because of its speed and concurrency primitives. It’s really good at building server components that scale. Go obviously provides the APIs needed for socket networking, but building a scalable distributed system that’s reliable using these primitives can be somewhat onerous. Solutions like nanomsg’s Scalability Protocols and ZeroMQ attempt to make this much easier by providing useful communication patterns and by taking care of other messaging concerns like queueing.

Naturally, there are Go bindings for nanomsg and ZeroMQ, but like I said, dealing with shared libraries can be fraught with peril. In Go (and often other languages), we tend to avoid loading native libraries if we can. It’s much easier to reason about, debug, and deploy a single binary than multiple. Fortunately, there’s a really nice implementation of nanomsg’s Scalability Protocols in pure Go called Mangos by Garrett D’Amore of illumos fame.

Mangos offers an idiomatic Go implementation and interface which affords us the same messaging patterns that nanomsg provides while maintaining compatibility. Pub/Sub, Pair, Req/Rep, Pipeline, Bus, and Survey are all there. It also supports the same pluggable transport model, allowing additional transports to be added (and extended ((Mangos supports TLS with the TCP transport as an experimental extension.))) on top of the base TCP, IPC, and inproc ones. ((A nanomsg WebSocket transport is currently in the works.)) Mangos has been tested for interoperability with nanomsg using the nanocat command-line interface.

One of the advantages of using a language like C is that it’s not garbage collected. However, if you’re using Go with nanomsg, you’re already paying the cost of GC. Mangos makes use of object pools in order to reduce pressure on the garbage collector. We can’t turn Go’s GC off, but we can make an effort to minimize pauses. This is critical for high-throughput systems, and Mangos tends to perform quite comparably to nanomsg.

Mangos (and nanomsg) has a very familiar, socket-like API. To show what this looks like, the code below illustrates a simple example of how the Pub/Sub protocol is used to build a fan-out messaging system.
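Here’s a minimal sketch of what that looks like, assuming the github.com/gdamore/mangos import paths of the time (the project has since moved to the nanomsg organization), with error handling pared down for brevity:

```go
package main

import (
	"fmt"
	"time"

	"github.com/gdamore/mangos"
	"github.com/gdamore/mangos/protocol/pub"
	"github.com/gdamore/mangos/protocol/sub"
	"github.com/gdamore/mangos/transport/tcp"
)

const addr = "tcp://127.0.0.1:40899"

func publisher() {
	sock, err := pub.NewSocket()
	if err != nil {
		panic(err)
	}
	sock.AddTransport(tcp.NewTransport())
	if err := sock.Listen(addr); err != nil {
		panic(err)
	}
	for i := 0; ; i++ {
		// Every connected subscriber receives the message.
		sock.Send([]byte(fmt.Sprintf("event %d", i)))
		time.Sleep(time.Second)
	}
}

func subscriber(name string) {
	sock, err := sub.NewSocket()
	if err != nil {
		panic(err)
	}
	sock.AddTransport(tcp.NewTransport())
	if err := sock.Dial(addr); err != nil {
		panic(err)
	}
	// Subscribe to everything; a prefix can be used for topic filtering.
	sock.SetOption(mangos.OptionSubscribe, []byte(""))
	for {
		msg, err := sock.Recv()
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s received: %s\n", name, msg)
	}
}

func main() {
	go publisher()
	go subscriber("sub1")
	go subscriber("sub2")
	time.Sleep(5 * time.Second)
}
```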

My message queue test framework, Flotilla, uses the Req/Rep protocol to allow clients to send requests to distributed daemon processes, which handle them and respond. While this is a very simple use case where you could just as easily get away with raw TCP sockets, there are more advanced cases where Scalability Protocols make sense. We also get the added advantage of transport abstraction, so we’re not strictly tied to TCP sockets.

I’ve been building a distributed messaging system using Mangos as a means of federated communication. Pub/Sub enables a fan-out, interest-based broadcast and Bus facilitates many-to-many messaging. Both of these are exceptionally useful for connecting disparate systems. Mangos also supports an experimental new protocol called Star. This pattern is like Bus, but when a message is received by an immediate peer, it’s propagated to all other members of the topology.

My favorite Scalability Protocol is Survey. As I discussed in my nanomsg overview, there are a lot of really interesting applications of this. Survey allows a process to query the state of multiple peers in one shot. It’s similar to Pub/Sub in that the surveyor publishes a single message which is received by all the respondents (although there are no topic subscriptions). The respondents then send a message back, and the surveyor collects these responses. We can also enforce a deadline on the respondent replies, which makes Survey particularly useful for service discovery.
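Here’s a rough sketch under the same API assumptions as the Pub/Sub example above: the surveyor broadcasts a question, and each connected respondent answers before the deadline expires.

```go
package main

import (
	"fmt"
	"time"

	"github.com/gdamore/mangos"
	"github.com/gdamore/mangos/protocol/respondent"
	"github.com/gdamore/mangos/protocol/surveyor"
	"github.com/gdamore/mangos/transport/tcp"
)

const surveyAddr = "tcp://127.0.0.1:40900"

// respond answers every survey it receives with this peer's name.
func respond(name string) {
	sock, err := respondent.NewSocket()
	if err != nil {
		panic(err)
	}
	sock.AddTransport(tcp.NewTransport())
	sock.Dial(surveyAddr)
	for {
		if _, err := sock.Recv(); err == nil {
			sock.Send([]byte(name))
		}
	}
}

func main() {
	sock, err := surveyor.NewSocket()
	if err != nil {
		panic(err)
	}
	sock.AddTransport(tcp.NewTransport())
	sock.Listen(surveyAddr)
	// Responses arriving after this deadline are discarded.
	sock.SetOption(mangos.OptionSurveyTime, 500*time.Millisecond)

	go respond("peer-1")
	go respond("peer-2")
	time.Sleep(time.Second) // give the respondents time to connect

	sock.Send([]byte("who's out there?")) // broadcast the survey
	for {
		resp, err := sock.Recv()
		if err != nil {
			break // the survey deadline has expired
		}
		fmt.Printf("response from %s\n", resp)
	}
}
```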

With my messaging system, I’ve used Survey to implement a heartbeat protocol. When a broker spins up, it begins broadcasting a heartbeat using a Survey socket. New brokers can connect to existing ones, and they reply to the heartbeat which allows brokers to “discover” each other. If a heartbeat isn’t received before the deadline, the peer is removed. Mangos also handles reconnects, so if a broker goes offline and comes back up, peers will automatically reconnect.

To summarize, if you’re building distributed systems in Go, consider taking a look at Mangos. You can certainly roll your own messaging layer with raw sockets, but you’re going to end up writing a lot of logic for a robust system. Mangos, and nanomsg in general, gives you the right abstraction to quickly build systems that scale and are fast.

Benchmark Responsibly

When I posted my Dissecting Message Queues article last summer, it understandably caused some controversy. I received praise and scathing comments alike, emails asking why I didn’t benchmark X, and pull requests to bump the numbers of Y. To be honest, that analysis was more of a brain dump from my own test driving of various message queues than any sort of authoritative or scientific study—it was far from the latter, to say the least. The qualitative discussion was pretty innocuous, but the benchmarks and supporting code were the target of a lot of (valid) criticism. In retrospect, it was probably irresponsible to publish them, but I was young and naive back then; now I’m just mostly naive.

Comparing Apples to Other Assorted Fruit

One such criticism was that the benchmarks were divided into two very broad categories: brokerless and brokered. While the brokerless group compared two very similar libraries, ZeroMQ and nanomsg, the second group included a number of distinct message brokers like RabbitMQ, Kafka, NATS, and Redis, to name a few.

The problem is not all brokers are created equal. They often have different goals and different prescribed use cases. As such, they impose different guarantees, different trade-offs, and different constraints. By grouping these benchmarks together, I implied they were fundamentally equivalent, when in fact, most were fundamentally different. For example, NATS serves a very different purpose than Kafka, and Redis, which offers pub/sub messaging, typically isn’t thought of as a message broker at all.

Measure Right or Don’t Measure at All

Another criticism was the way in which the benchmarks were performed. The test setup wasn’t remotely realistic: the producer, consumer, and the message queue itself all ran on the same machine. Even worse, they used just a single publisher and subscriber. Not only does this fail to test what a realistic configuration looks like, it doesn’t even give you a good idea of a trivial one.

To be meaningful, we need to test with more than one producer and consumer, ideally distributed across many machines. We want to see how the system scales to larger workloads. Certainly, the producers and consumers cannot be collocated when we’re measuring discrete throughputs on either end, nor should the broker. This helps to reduce confounding variables between the system under test and the load generation.

It’s Not Rocket Science, It’s Computer Science

The third major criticism lay with the measurements themselves. Measuring throughput is fairly straightforward: we look at the number of messages sent per unit of time at both the sender and the receiver. If we think of a pipe carrying water, we might look at a discrete cross section and the rate at which water passes through it.

Latency, as a concept, is equally simple. With the pipe, it’s the time it takes for a drop of water to travel from one end to the other. While throughput is dependent on the pipe’s diameter, latency is dependent upon its length. What this means is that we can’t derive one from the other. In order to properly measure latency, we need to consider the latency of each message sent through the system.

However, we can’t ignore the relationship between throughput and latency and what the compromise between them means. Generally, we want to make things as fast as possible. Consider a single-cycle CPU. Its latency per instruction will be extremely low but contrasted with a pipelined processor, its throughput is abysmal—one instruction per clock cycle. The implication is that if we trade per-operation latency for throughput, we actually get a decrease in latency for aggregate instructions. Unfortunately, the benchmarks eschewed this relationship by requiring separate latency and throughput tests which used different code paths.

The interaction between latency and throughput is easy to get confused about, but it often has interesting ramifications, whether you’re looking at message queues, CPUs, or databases. In a general sense, we’d say “optimize for latency” because lower latency means higher throughput, but the reality is it’s almost always easier (and more cost-effective) to increase throughput than it is to decrease latency, especially on commodity hardware.

Capturing this data, in and of itself, isn’t terribly difficult, but what’s more susceptible to error is how it’s represented. This was the main fault of the benchmarks (in addition to the things described earlier). The most egregious thing they did was report latency as an average. This is like the cardinal sin of benchmarking. The number is practically useless, particularly without any context like a standard deviation.

We know that latency isn’t going to be uniform, but it’s probably not going to follow a normal distribution either. While network latency may be prone to fitting a nice bell curve, system latency almost certainly won’t. They often exhibit things like GC pauses and other “hiccups,” and averages tend to hide these.

Measuring performance isn’t all that easy, but if you do it, at least do it in a way that disambiguates the results. Look at quantiles, not averages. If you do present a mean, include the standard deviation and max in addition to the 90th or 99th percentile. Plotting latency by percentile distribution is an excellent way to see what your performance behavior actually looks like. Gil Tene has a great talk on measuring latency which I highly recommend.

Working Towards a Better Solution

With all this in mind, we can work towards building a better way to test and measure messaging systems. The discussion above really just gives us three key takeaways:

  1. Don’t compare apples to oranges.
  2. Don’t instrument tests in a way that’s not at all representative of real life.
  3. Don’t present results in a statistically insignificant way.

My first attempt at taking these ideas to heart is a tool I call Flotilla. It’s meant to provide a way to test messaging systems in more realistic configurations, at scale, while offering more useful data. Flotilla allows you to easily spin up producers and consumers on arbitrarily many machines, start a message broker, and run a benchmark against it, all in an automated fashion. It then collects data like producer/consumer throughput and the complete latency distribution and reports back to the user.

Flotilla uses a Go port of HdrHistogram to capture latency data, of which I’m a raving fan. HdrHistogram uses a bucketed approach to record values across a configured high-dynamic range at a particular resolution. Recording is in the single-nanosecond range and the memory footprint is constant. It also has support for correcting coordinated omission, which is a common problem in benchmarking. Seriously, if you’re doing anything performance sensitive, give HdrHistogram a look.
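As a quick illustration, recording latencies and reporting quantiles with the Go port might look like the sketch below; it uses the codahale/hdrhistogram import path from that era (the project now lives under the HdrHistogram organization), and the workload is a stand-in.

```go
package main

import (
	"fmt"
	"time"

	"github.com/codahale/hdrhistogram"
)

// doRequest is a stand-in for the operation being measured.
func doRequest() { time.Sleep(100 * time.Microsecond) }

func main() {
	// Track values from 1µs to 10s at 3 significant figures.
	hist := hdrhistogram.New(1, (10 * time.Second).Microseconds(), 3)

	for i := 0; i < 10000; i++ {
		start := time.Now()
		doRequest()
		hist.RecordValue(time.Since(start).Microseconds())
	}

	// Report the latency distribution, not just an average.
	for _, q := range []float64{50, 90, 99, 99.9} {
		fmt.Printf("p%v: %dµs\n", q, hist.ValueAtQuantile(q))
	}
	fmt.Printf("mean: %.1fµs, max: %dµs\n", hist.Mean(), hist.Max())
}
```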

Still, Flotilla is not perfect and there’s certainly work to do, but I think it’s a substantial improvement over the previous MQ benchmarking utility. Longer term, it would be great to integrate it with something like Comcast to test workloads under different network conditions. Testing in a vacuum is nice and all, but we know in the real world, the network isn’t perfectly reliable.

So, Where Are the Benchmarks?

Omitted—for now, anyway. My goal really isn’t to rank a hodgepodge of different message queues because there’s really not much value in doing that. There are different use cases for different systems. I might, at some point, look at individual systems in greater detail, but comparing things like message throughput and latency just devolves into a hotly contested pissing contest. My hope is to garner more feedback and improvements to Flotilla before using it to definitively measure anything.

Benchmark responsibly.