You Cannot Have Exactly-Once Delivery

I’m often surprised that people continually have fundamental misconceptions about how distributed systems behave. I myself shared many of these misconceptions, so I try not to demean or dismiss but rather educate and enlighten, hopefully while sounding less preachy than that just did. I continue to learn only by following in the footsteps of others. In retrospect, it shouldn’t be surprising that folks buy into these fallacies as I once did, but it can be frustrating when trying to communicate certain design decisions and constraints.

Within the context of a distributed system, you cannot have exactly-once message delivery. Web browser and server? Distributed. Server and database? Distributed. Server and message queue? Distributed. You cannot have exactly-once delivery semantics in any of these situations.

As I’ve described in the past, distributed systems are all about trade-offs. This is one of them. There are essentially three types of delivery semantics: at-most-once, at-least-once, and exactly-once. Of the three, the first two are feasible and widely used. If you want to be super anal, you might say at-least-once delivery is also impossible because, technically speaking, network partitions are not strictly time-bound. If the connection from you to the server is interrupted indefinitely, you can’t deliver anything. Practically speaking, you have bigger fish to fry at that point—like calling your ISP—so we consider at-least-once delivery, for all intents and purposes, possible. With this model of thinking, network partitions are finitely bounded in time, however arbitrary this may be.

So where does the trade-off come into play, and why is exactly-once delivery impossible? The answer lies in the Two Generals thought experiment or the more generalized Byzantine Generals Problem, which I’ve looked at extensively. We must also consider the FLP result, which basically says, given the possibility of a faulty process, it’s impossible for a system of processes to agree on a decision.

In the letter I mail you, I ask you to call me once you receive it. You never do. Either you really didn’t care for my letter or it got lost in the mail. That’s the cost of doing business. I can send the one letter and hope you get it, or I can send 10 letters and assume you’ll get at least one of them. The trade-off here is quite clear (postage is expensive!), but sending 10 letters doesn’t really provide any additional guarantees. In a distributed system, we try to guarantee the delivery of a message by waiting for an acknowledgement that it was received, but all sorts of things can go wrong. Did the message get dropped? Did the ack get dropped? Did the receiver crash? Are they just slow? Is the network slow? Am slow? FLP and the Two Generals Problem are not design complexities, they are impossibility results.

People often bend the meaning of “delivery” in order to make their system fit the semantics of exactly-once, or in other cases, the term is overloaded to mean something entirely different. State-machine replication is a good example of this. Atomic broadcast protocols ensure messages are delivered reliably and in order. The truth is, we can’t deliver messages reliably and in order in the face of network partitions and crashes without a high degree of coordination. This coordination, of course, comes at a cost (latency and availability), while still relying on at-least-once semantics. Zab, the atomic broadcast protocol which lays the foundation for ZooKeeper, enforces idempotent operations.

State changes are idempotent and applying the same state change multiple times does not lead to inconsistencies as long as the application order is consistent with the delivery order. Consequently, guaranteeing at-least once semantics is sufficient and simplifies the implementation.

“Simplifies the implementation” is the authors’ attempt at subtlety. State-machine replication is just that, replicating state. If our messages have side effects, all of this goes out the window.

We’re left with a few options, all equally tenuous. When a message is delivered, it’s acknowledged immediately before processing. The sender receives the ack and calls it a day. However, if the receiver crashes before or during its processing, that data is lost forever. Customer transaction? Sorry, looks like you’re not getting your order. This is the worldview of at-most-once delivery. To be honest, implementing at-most-once semantics is more complicated than this depending on the situation. If there are multiple workers processing tasks or the work queues are replicated, the broker must be strongly consistent (or CP in CAP theorem parlance) so as to ensure a task is not delivered to any other workers once it’s been acked. Apache Kafka uses ZooKeeper to handle this coordination.

On the other hand, we can acknowledge messages after they are processed. If the process crashes after handling a message but before acking (or the ack isn’t delivered), the sender will redeliver. Hello, at-least-once delivery. Furthermore, if you want to deliver messages in order to more than one site, you need an atomic broadcast which is a huge burden on throughput. Fast or consistent. Welcome to the world of distributed systems.

Every major message queue in existence which provides any guarantees will market itself as at-least-once delivery. If it claims exactly-once, it’s because they are lying to your face in hopes that you will buy it or they themselves do not understand distributed systems. Either way, it’s not a good indicator.

RabbitMQ attempts to provide guarantees along these lines:

When using confirms, producers recovering from a channel or connection failure should retransmit any messages for which an acknowledgement has not been received from the broker. There is a possibility of message duplication here, because the broker might have sent a confirmation that never reached the producer (due to network failures, etc). Therefore consumer applications will need to perform deduplication or handle incoming messages in an idempotent manner.

The way we achieve exactly-once delivery in practice is by faking it. Either the messages themselves should be idempotent, meaning they can be applied more than once without adverse effects, or we remove the need for idempotency through deduplication. Ideally, our messages don’t require strict ordering and are commutative instead. There are design implications and trade-offs involved with whichever route you take, but this is the reality in which we must live.

Rethinking operations as idempotent actions might be easier said than done, but it mostly requires a change in the way we think about state. This is best described by revisiting the replicated state machine. Rather than distributing operations to apply at various nodes, what if we just distribute the state changes themselves? Rather than mutating state, let’s just report facts at various points in time. This is effectively how Zab works.

Imagine we want to tell a friend to come pick us up. We send him a series of text messages with turn-by-turn directions, but one of the messages is delivered twice! Our friend isn’t too happy when he finds himself in the bad part of town. Instead, let’s just tell him where we are and let him figure it out. If the message gets delivered more than once, it won’t matter. The implications are wider reaching than this, since we’re still concerned with the ordering of messages, which is why solutions like commutative and convergent replicated data types are becoming more popular. That said, we can typically solve this problem through extrinsic means like sequencing, vector clocks, or other partial-ordering mechanisms. It’s usually causal ordering that we’re after anyway. People who say otherwise don’t quite realize that there is no now in a distributed system.

To reiterate, there is no such thing as exactly-once delivery. We must choose between the lesser of two evils, which is at-least-once delivery in most cases. This can be used to simulate exactly-once semantics by ensuring idempotency or otherwise eliminating side effects from operations. Once again, it’s important to understand the trade-offs involved when designing distributed systems. There is asynchrony abound, which means you cannot expect synchronous, guaranteed behavior. Design for failure and resiliency against this asynchronous nature.

If State Is Hell, SOA Is Satan

More and more companies are describing their success stories regarding the switch to a service-oriented architecture. As with any technological upswing, there’s a clear and palpable hype factor involved (Big Data™ or The Cloud™ anyone?), but obviously it’s not just puff.

While microservices and SOA have seen a staggering rate of adoption in recent years, the mindset of developers often seems to be stuck in the past. I think this is, at least in part, because we seek a mental model we can reason about. It’s why we build abstractions in the first place. In a sense, I would argue there’s a comparison to be made between the explosion of OOP in the early 90’s and today’s SOA trend. After all, SOA is as much about people scale as it is about workload scale, so it makes sense from an organizational perspective.

The Perils of Good Abstractions

While systems are becoming more and more distributed, abstractions are attempting to make them less and less complex. Mesosphere is a perfect example of this, attempting to provide the “datacenter operating system.” Apache Mesos allows you to “program against your datacenter like it’s a single pool of resources.” It’s an appealing proposition to say the least. PaaS like Google App Engine and Heroku offer similar abstractions—write your code without thinking about scale. The problem is you absolutely have to think about scale or you’re bound to run into problems down the road. And while these abstractions are nice, they can be dangerous just the same. Welcome to the perils of good abstractions.

I like to talk about App Engine because I have firsthand experience with it. It’s an easy sell for startups. It handles spinning up instances when you need them, turning them down when you don’t. It’s your app server, database, caching, job scheduler, task queue all in one, and it does it at scale. There’s vendor lock-in, sure, yet it means no ops, no sysadmins, no overhead. Push to deploy. But it’s a leaky abstraction. It has to be. App Engine scales because it’s distributed, but it allows—no, encourages—you to write your system as a monolith. The datastore, memcache, and task queue accesses are masked as RPCs. This is great for our developer mental model, but it will bite you if you’re not careful. App Engine imposes certain limitations to encourage good design; for instance, front-end requests and datastore calls are limited to 60 seconds (it used to be much less), but the leakiness goes beyond that.

RPC is consistently at odds with distributed systems. I would go so far as to say it’s an anti-pattern in many cases. RPC encourages writing synchronous code, but distributed systems are inherently asynchronous. The network is not reliable. The network is not fast. The network is not your friend. Developers who either don’t understand this or don’t realize what’s happening when they make an RPC will write code as if they were calling a function. It will sure as hell look like just calling a function. When we think synchronously, we end up with systems that are slow, fault intolerant, and generally not scalable. To be quite honest, however, this is perfectly acceptable for 90% of startups as they are getting off the ground because they don’t have workloads at meaningful scale.

There’s certainly some irony here. One of the selling points of App Engine is its ability to scale to large amounts of traffic, yet the vast majority of startups would be perfectly suited to scaling up rather than out, perhaps with some failover in place for good measure. Stack Overflow is the poster child of scale-up architecture. In truth, your architecture should be a function of your access patterns, not the other way around (and App Engine is very much tailored to a specific set of access patterns). Nonetheless, it shows that vertical scaling can work. I would bet a lot of startups could sufficiently run on a large, adequately specced machine or maybe a small handful of them.

The cruel irony is that once you hit a certain scale with App Engine, both in terms of your development organization and user base, you’ve reached a point where you have to migrate off it. And if your data model isn’t properly thought out, you will without a doubt hit scale problems. It’s to the point where you need someone with deep knowledge of how App Engine works in order to build quality systems on it. Good luck hiring a team of engineers who understand it. GAE is great at accelerating you to 100 mph, but you better have some nice airbags for the brick wall it launches you into. In fairness, this is a problem every org hits—Conway’s law is very much a reality and every startup has growing pains. To be clear, this isn’t a jab at GAE, which is actually very effective at accelerating a product using little capital and can sustain long-term success given the right use case. Instead, I use it to illustrate a point.

Peering Through the Abstraction

Eventually SOA makes sense, but our abstractions can cause problems if we don’t understand what’s going on behind the curtain (hence the leakiness). Partial failure is all but guaranteed, and latency, partitioning, and other network pressure happens all the time.

Ken Arnold is famed with once saying “state is hell” in reference to designing distributed systems. In the past, I’ve written how scaling shared data is hard, but with SOA it’s practically a requirement. Ken is right though—state is hell, and SOA is fundamentally competing with consistency. The FLP Impossibility result and the CAP theorem can prove it formally, but really this should be intuitively obvious if we accept the laws of physics.

On the other hand, if you store information that I can’t reconstruct, then a whole host of questions suddenly surface. One question is, “Are you now a single point of failure?” I have to talk to you now. I can’t talk to anyone else. So what happens if you go down?

To deal with that, you could be replicated. But now you have to worry about replication strategies. What if I talk to one replicant and modify some data, then I talk to another? Is that modification guaranteed to have already arrived there? What is the replication strategy? What kind of consistency do you need—tight or loose? What happens if the network gets partitioned and the replicants can’t talk to each other? Can anybody proceed?

Essentially, the more stateful your system is, the harder it’s going to be to scale it because distributing that state introduces a rich tapestry of problems. In practice, we often can’t eliminate state wholesale, but basically everything that can be stateless should be stateless.

Making servers disposable allows you a great deal of flexibility. Former Netflix Cloud Architect Adrian Cockcroft articulates this idea well:

You want to think of servers like cattle, not pets. If you have a machine in production that performs a specialized function, and you know it by name, and everyone gets sad when it goes down, it’s a pet. Instead you should think of your servers like a herd of cows. What you care about is how many gallons of milk you get. If one day you notice you’re getting less milk than usual, you find out which cows aren’t producing well and replace them.

This is effectively how App Engine achieves its scalability. With lightweight, stateless, and disposable instances, it can spin them up and down on the fly without worrying about being in an invalid state.

App Engine also relies on eventual consistency as the default model for datastore interactions. This makes queries fast and highly available, while snapshot isolation can be achieved using entity-group transactions if necessary. The latter, of course, can result in a lot of contention and latency. Yet, people seem to have a hard time grappling with the reality of eventual consistency in distributed systems. State is hell, but calling SOA “satan” is clearly a hyperbole. It is a tough problem nevertheless.

A State of Mind

In the situations where we need state, we have to reconcile with the realities of distributed systems. This means understanding the limitations and accepting the complexities, not papering over them. It doesn’t mean throwing away abstractions. Fortunately, distributed computing is the focus of a lot of great research, so there are primitives with which we can build: immutability, causal ordering, eventual consistency, CRDTs, and other ideas.

As long as we recognize the trade-offs, we can design around them. The crux is knowing they exist in the first place. We can’t have ACID semantics while remaining highly available, but we can use Highly Available Transactions to provide strong-enough guarantees. At the same time, not all operations require coordination or concurrency control. The sooner we view eventual consistency as a solution and not a consequence, the sooner we can let go of this existential crisis. Other interesting research includes BOOM, which seeks to provide a high-level, declarative approach to distributed programming.

State might be hell, but it’s a hell we have to live. I don’t advocate an all-out microservice architecture for a company just getting its start. The complications far outweigh any benefits to be gained, but it becomes a necessity at a certain point. The key is having an exit strategy. PaaS providers make this difficult due to vendor lock-in and architectural constraints. Weigh their advantages carefully.

Once you do transition to a SOA, make as many of those services, or the pieces backing them, as stateless as possible. For those which aren’t stateless, know that the problem typically isn’t novel. These problems have been solved or are continuing to be solved in new and interesting ways. Academic research is naturally at the bleeding edge with industry often lagging behind. OOP concepts date back to as early as the 60’s but didn’t gain widespread adoption until several decades later. Distributed computing is no different. SOA is just a state of mind.

Stream Processing and Probabilistic Methods: Data at Scale

Stream processing and related abstractions have become all the rage following the rise of systems like Apache Kafka, Samza, and the Lambda architecture. Applying the idea of immutable, append-only event sourcing means we’re storing more data than ever before. However, as the cost of storage continues to decline, it’s becoming more feasible to store more data for longer periods of time. With immutability, how the data lives isn’t interesting anymore. It’s all about how it moves.

The shifting landscape of data architecture parallels the way we’re designing systems today. Specifically, the evolution of monolithic to service-oriented architecture necessitates a change in the way we do data integration. The traditional normalization approach doesn’t cut it. Our systems are composed of databases, caches, search indexes, data warehouses, and a multitude of other components. Moreover, there’s an increasing demand for online, real-time processing of this data that’s tantamount to the growing popularity of large-scale, offline processing along the lines of Hadoop. This presents an interesting set of new challenges, namely, how do we drink from the firehose without getting drenched?

The answer most likely lies in frameworks like Samza, Storm, and Spark Streaming. Similarly, tools like StatsD solve the problem of collecting real-time analytics. However, this discussion is meant to explore some of the primitives used in stream processing. The ideas extend far beyond event sourcing, generalizing to any type of data stream, unbounded or not.

Batch vs. Streaming

With offline or batch processing, we often have some heuristics which provide insight into our data set, and we can typically afford multiple passes of the data. Without a strict time constraint, data structures are less important. We can store the entire data set on disk (or perhaps across a distributed file system) and process it in batches.

With streaming data, however, there’s a need for near real-time processing—collecting analytics, monitoring and alerting, updating search indexes and caches, etc. With web crawlers, we process a stream of URLs and documents and produce indexed content. With websites, we process a stream of page views and update various counters and gauges. With email, we process a stream of text and produce a filtered, spam-free inbox. These cases involve massive, often limitless data sets. Processing that data online can’t be done with the conventional ETL or MapReduce-style methods, and unless you’re doing windowed processing, it’s entirely impractical to store that data in memory.

Framing the Problem

As a concrete example of stream processing, imagine we want to count the number of distinct document views across a large corpus, say, Wikipedia. A naive solution would be to use a hash table which maps a document to a count. Wikipedia has roughly 35 million pages. Let’s assume each document is identified by a 16-byte GUID, and the counters are stored as 8-byte integers. This means we need in the ball park of a gigabyte of memory. Given today’s hardware, this might not sound completely unreasonable, but now let’s say we want to track views per unique IP address. We see that this approach quickly becomes intractable.

To illustrate further, consider how to count the cardinality of IP addresses which access our website. Instead of a hash table, we can use a bitmap or sparse bit array to count addresses. There are over four billion possible distinct IPv4 addresses. Sure, we could allocate half a gigabyte of RAM, but if you’re developing performance-critical systems, large heap sizes are going to kill you, and this overhead doesn’t help. ((Granted, if you’re sensitive to garbage-collection pauses, you may be better off using something like C or Rust, but that’s not always the most practical option.))

A Probabilistic Approach

Instead, we turn to probabilistic ways of solving these problems. Probabilistic algorithms and data structures seem to be oft-overlooked, or maybe purposely ignored, by system designers despite the theory behind them having been around for a long time in many cases. The goal should be to move these from the world of academia to industry because they are immensely useful and widely neglected.

Probabilistic solutions trade space and performance for accuracy. While a loss in precision may be a cause for concern to some, the fact is with large data streams, we have to trade something off to get real-time insight. Much like the CAP theorem, we must choose between consistency (accuracy) and availability (online). With CAP, we can typically adjust the level in which that trade-off occurs. This space/performance-accuracy exchange behaves very much the same way.

The literature can get pretty dense, so let’s look at what some of these approaches are and the problems they solve in a way that’s (hopefully) understandable.

Bloom Filters

The Bloom filter is probably the most well-known and, conceptually, simplest probabilistic data structure. It also serves as a good foundation because there are a lot of twists you can put on it. The theory provides a kernel from which many other probabilistic solutions are derived, as we will see.

Bloom filters answer a simple question: is this element a member of a set? A normal set requires linear space because it stores every element. A Bloom filter doesn’t store the actual elements, it merely stores the “membership” of them. It uses sub-linear space opening the possibility for false positives, meaning there’s a non-zero probability it reports an item is in the set when it’s actually not. This has a wide range of applications. For example, a Bloom filter can be placed in front of a database. When a query for a piece of data comes in and the filter doesn’t contain it, we completely bypass the database.

The Bloom filter consists of a bit array of length and hash functions. Both of these parameters are configurable, but we can optimize them based on a desired rate of false positives. Each bit in the array is initially unset. When an element is “added” to the filter, it’s hashed by each of the functions, h1…hk, and modded by m, resulting in indices into the bit array. Each bit at the respective index is set.

To query the membership of an element, we hash it and check if each bit is set. If any of them are zero, we know the item isn’t in the set. What happens when two different elements hash to the same index? This is where false positives are introduced. If the bits aren’t set, we know the element wasn’t added, but if they are, it’s possible that some other element or group of elements hashed to the same indices. Bloom filters have false positives, but false negatives are impossible—a member will never be reported incorrectly as a non-member. Unfortunately, this also means items can’t be removed from the filter.

It’s clear that the likelihood of false positives increases with each element added to the filter. We can target a specific probability of false positives by selecting an optimal value for m and k for up to n insertions. ((The math behind determining optimal m and k is left as an exercise for the reader.)) While this implementation works well in practice, it has a drawback in that some elements are potentially more sensitive to false positives than others. We can solve this problem by partitioning the m bits among the k hash functions such that each one produces an index over its respective partition. As a result, each element is always described by exactly bits. This prevents any one element from being especially sensitive to false positives. Since calculating k hashes for every element is computationally expensive, we can actually perform a single hash and derive k hashes from it for a significant speedup. ((Less Hashing, Same Performance: Building a Better Bloom Filter discusses the use of two hash functions to simulate additional hashes. We can use a 64-bit hash, such as FNV, and use the upper and lower 32-bits as two different hashes.))

A Bloom filter eventually reaches a point where all bits are set, which means every query will indicate membership, effectively making the probability of false positives one. The problem with this is it requires a priori knowledge of the data set in order to select optimal parameters and avoid “overfilling.” Consequently, Bloom filters are ideal for offline processing and not so great for dealing with streams. Next, we’ll look at some variations of the Bloom filter which attempt to deal with this issue.

Scalable Bloom Filter

As we saw earlier, traditional Bloom filters are a great way to deal with set-membership problems in a space-efficient way, but they require knowing the size of the data set ahead of time in order to be effective. The Scalable Bloom Filter (SBF) was introduced by Almeida et al. as a way to cope with the capacity dilemma.

The Scalable Bloom Filter dynamically adapts to the size of the data set while enforcing a tight upper bound on the rate of false positives. Like the classic Bloom filter, false negatives are impossible. The SBF is essentially an array of Bloom filters with geometrically decreasing false-positive rates. New elements are added to the last filter. When this filter becomes “full”—more specifically, when it reaches a target fill ratio—a new filter is added with a tightened error probability. A tightening ratio, r, controls the growth of new filters.

Testing membership of an element consists of checking each of the filters. The geometrically decreasing false-positive rate of each filter is an interesting property. Since the fill ratio determines when a new filter is added, it turns out this progression can be modeled as a Taylor series which allows us to provide a tight upper bound on false positives using an optimal fill ratio of 0.5 (once the filter is 50% full, a new one is added). The compounded probability over the whole series converges to a target value, even accounting for an infinite series.

We can limit the error rate, but obviously the trade-off is that we end up allocating memory proportional to the size of the data set since we’re continuously adding filters. We also pay some computational cost on adds. Amortized, the cost of filter insertions is negligible, but for every add we must compute the current fill ratio of the filter to determine if a new filter needs to be added. Fortunately, we can optimize this by computing an estimated fill ratio. If we keep a running count of the items added to the filter, n, the approximate current fill ratio, p, can be obtained from the Taylor series expansion with p \approx 1-e^{-n/m} where m is the number of bits in the filter. Calculating this estimate turns out to be quite a bit faster than computing the actual fill ratio—fewer memory reads.

To provide some context around performance, adds in my Go implementation of a classic Bloom filter take about 166 ns on my MacBook Pro, and membership tests take 118 ns. With the SBF, adds take 422 ns and tests 113 ns. This isn’t a completely fair comparison since, as the SBF grows over time, tests require scanning through each filter, but certainly the add numbers are intuitive.

Scalable Bloom Filters are useful for cases where the size of the data set isn’t known a priori and memory constraints aren’t of particular concern. It’s an effective variation of a regular Bloom filter which generally requires space allocation orders-of-magnitude larger than the data set to allow enough headroom.

Stable Bloom Filter

Classic Bloom filters aren’t great for streaming data, and Scalable Bloom Filters, while a better option, still present a memory problem. Another derivative, the Stable Bloom Filter (SBF), was proposed by Deng and Rafiei as a technique for detecting duplicates in unbounded data streams with limited space. Most solutions work by dividing the stream into fixed-size windows and solving the problem within that discrete space. This allows the use of traditional methods like the Bloom filter.

The SBF attempts to approximate duplicates without the use of windows. Clearly, we can’t store the entire history of an infinite stream in limited memory. Instead, the thinking is more recent data has more value than stale data in many scenarios. As an example, web crawlers may not care about redundantly fetching a web page that was crawled a long time ago compared to fetching a page that was crawled more recently. With this impetus, the SBF works by representing more recent data while discarding old information.

The Stable Bloom Filter tweaks the classic Bloom filter by replacing the bit array with an array of m cells, each allocated d bits. The cells are simply counters, initially set to zero. The maximum value, Max, of a cell is 2^d-1. When an element is added, we first have to ensure space for it. This is done by selecting P random cells and decrementing their counters by one, clamping to zero. Next, we hash the element to k cells and set their counters to Max. Testing membership consists of probing the k cells. If any of them are zero, it’s not a member. The Bloom filter is actually a special case of an SBF where d is one and P is zero.

Like traditional Bloom filters, an SBF has a non-zero probability of false positives, which is controlled by several parameters. Unlike the Bloom filter, an SBF has a tight upper bound on the rate of false positives while introducing a non-zero rate of false negatives. The false-positive rate of a classic Bloom filter eventually reaches one, after which all queries result in a false positive. The stable-point property of an SBF means the false-positive rate asymptotically approaches a configurable fixed constant.

Stable Bloom Filters lend themselves to situations where the size of the data set isn’t known ahead of time and memory is bounded. For example, an SBF can be used to deduplicate events from an unbounded event stream with a specified upper bound on false positives and minimal false negatives. In particular, if the stream is not uniformly distributed, meaning duplicates are likely to be grouped closer together, the rate of false positives becomes immaterial.

Counting Bloom Filter

Conventional Bloom filters don’t allow items to be removed. The Stable Bloom Filter, on the other hand, works by evicting old data, but its API doesn’t expose a way to remove specific elements. The Counting Bloom Filter (CBF) is yet another twist on this structure. Introduced by Fan et al. in Summary Cache: A Scalable Wide-Area Web Cache Sharing Protocol, the CBF provides a way of deleting data from a filter.

It’s actually a very straightforward approach. The filter comprises an array of n-bit buckets similar to the Stable Bloom Filter. When an item is added, the corresponding counters are incremented, and when it’s removed, the counters are decremented.

Obviously, a CBF takes n-times more space than a regular Bloom filter, but it also has a scalability limit. Unless items are removed frequently enough, a counting filter’s false-positive probability will approach one. We need to dimension it accordingly, and this normally requires inferring from our data set. Likewise, since the CBF allows removals, it exposes an opportunity for false negatives. The multi-bit counters diminish the rate, but it’s important to be cognizant of this property so the CBF can be applied appropriately.

Inverse Bloom Filter

One of the distinguishing features of Bloom filters is the guarantee of no false negatives. This can be a valuable invariant, but what if we want the opposite of that? Is there an efficient data structure that can offer us no false positives with the possibility of false negatives? The answer is deceptively simple.

Jeff Hodges describes a rather trivial—yet strikingly pragmatic—approach which he calls the “opposite of a Bloom filter.” Since that name is a bit of a mouthful, I’m referring to this as the Inverse Bloom Filter (IBF).

The Inverse Bloom Filter may report a false negative but can never report a false positive. That is, it may indicate that an item has not been seen when it actually has, but it will never report an item as seen which it hasn’t come across. The IBF behaves in a similar manner to a fixed-size hash map of m buckets which doesn’t handle conflicts, but it provides lock-free concurrency using an underlying CAS.

The test-and-add operation hashes an element to an index in the filter. We take the value at that index while atomically storing the new value, then the old value is compared to the new one. If they differ, the new value wasn’t a member. If the values are equivalent, it was in the set.

The Inverse Bloom Filter is a nice option for dealing with unbounded streams or large data sets due to its limited memory usage. If duplicates are close together, the rate of false negatives becomes vanishingly small with an adequately sized filter.

HyperLogLog

Let’s revisit the problem of counting distinct IP addresses visiting a website. One solution might be to allocate a sparse bit array proportional to the number of IPv4 addresses. Alternatively, we could combine a Bloom filter with a counter. When an address comes in and it’s not in the filter, increment the count. While these both work, they’re not likely to scale very well. Rather, we can apply a probabilistic data structure known as the HyperLogLog (HLL). First presented by Flajolet et al. in 2007, HyperLogLog is an algorithm which approximately counts the number of distinct elements, or cardinality, of a multiset (a set which allows multiple occurrences of its elements).

Imagine our stream as a series of coin tosses. Someone tells you the most heads they flipped in a row was three. You can guess that they didn’t flip the coin very many times. However, if they flipped 20 heads in a row, it probably took them quite a while. This seems like a really unconvincing way of estimating the number of tosses, but from this kernel of thought grows a fruitful idea.

HLL replaces heads and tails with zeros and ones. Instead of counting the longest run of heads, it counts the longest run of leading zeros in a binary hash. Using a good hash function means that the values are, more or less, statistically independent and uniformly distributed. If the maximum run of leading zeros is n, the number of distinct elements in the set is approximately 2^n. But wait, what’s the math behind this? If we think about it, half of all binary numbers start with 1. Each additional bit divides the probability of a run further in half; that is, 25% start with 01, 12.5% start with 001, 6.25% start with 0001, ad infinitum. Thus, the probability of a run of length n is 2^{-(n+1)}.

Like flipping a coin, there’s potential for an enormous amount of variance with this technique. For instance, it’s entirely possible—though unlikely—that you flip 20 heads in a row on your first try. One experiment doesn’t provide enough data, so instead you flip 10 coins. HyperLogLog uses a similar strategy to reduce variance by splitting the stream up across a set of buckets, or registers, and applying the counting algorithm on the values in each one. It tracks the longest run of zeros in every register, and the total cardinality is computed by taking the harmonic mean across all registers. The harmonic mean is used to discount outliers since the distribution of values tends to skew towards the right. We can increase accuracy by adding more registers, which of course comes at the expense of performance and memory.

HyperLogLog is a remarkable algorithm. It almost feels like magic, and it’s exceptionally useful for working with data streams. HLL has a fraction of the memory footprint of other solutions. In fact, it’s usually several orders of magnitude while remaining surprisingly accurate.

Count-Min Sketch

HyperLogLog is a great option to efficiently count the number of distinct elements in a stream using a minimal amount of space, but it only gives us cardinality. What about counting the frequencies of specific elements? Cormode and Muthukrishnan developed the Count-Min sketch (CM sketch) to approximate the occurrences of different event types from a stream of events.

In contrast to a hash table, the Count-Min sketch uses sub-linear space to count frequencies. It consists of a matrix with w columns and d rows. These parameters determine the trade-off between space/time constraints and accuracy. Each row has an associated hash function. When an element arrives, it’s hashed for each row. The corresponding index in the rows are incremented by one. In this regard, the CM sketch shares some similarities with the Bloom filter.

count_min_sketch

The frequency of an element is estimated by taking the minimum of all the element’s respective counter values. The thought process here is that there is possibility for collisions between elements, which affects the counters for multiple items.  Taking the minimum count results in a closer approximation.

The Count-Min sketch is an excellent tool for counting problems for the same reasons as HyperLogLog. It has a lot of similarities to Bloom filters, but where Bloom filters effectively represent sets, the CM sketch considers multisets.

MinHash

The last probabilistic technique we’ll briefly look at is MinHash. This algorithm, invented by Andrei Broder, is used to quickly estimate the similarity between two sets. This has a variety of uses, such as detecting duplicate bodies of text and clustering or comparing documents.

MinHash works, in part, by using the Jaccard coefficient, a statistic which represents the size of the intersection of two sets divided by the size of the union:

J(A,B) = {{|A \cap B|}\over{|A \cup B|}}

This provides a measure of how similar the two sets are, but computing the intersection and union is expensive. Instead, MinHash essentially works by comparing randomly selected subsets through element hashing. It resembles locality-sensitive hashing (LSH), which means that similar items have a greater tendency to map to the same hash values. This varies from most hashing schemes, which attempt to avoid collisions. Instead, LSH is designed to maximize collisions of similar elements.

MinHash is one of the more difficult algorithms to fully grasp, and I can’t even begin to provide a proper explanation. If you’re interested in how it works, read the literature but know that there are a number of variations of it. The important thing is to know it exists and what problems it can be applied to.

In Practice

We’ve gone over several fundamental primitives for processing large data sets and streams while solving a few different types of problems. Bloom filters can be used to reduce disk reads and other expensive operations. They can also be purposed to detect duplicate events in a stream or prune a large decision tree. HyperLogLog excels at counting the number of distinct items in a stream while using minimal space, and the Count-Min sketch tracks the frequency of particular elements. Lastly, the MinHash method provides an efficient mechanism for comparing the similarity between documents. This has a number of applications, namely around identifying duplicates and characterizing content.

While there are no doubt countless implementations for most of these data structures and algorithms, I’ve been putting together a Go library geared towards providing efficient techniques for stream processing called Boom Filters. It includes implementations for all of the probabilistic data structures outlined above. As streaming data and real-time consumption grow more and more prominent, it will become important that we have the tools and understanding to deal with them. If nothing else, it’s valuable to be aware of probabilistic methods because they often provide accurate results at a fraction of the cost. These things aren’t just academic research, they form the basis for building online, high-performance systems at scale.

On Hireability and Recruiting

Developers deal with recruiter emails on a daily basis. It’s frustrating because it’s almost always a shotgun approach, but occasionally you get something that strays from the path and, delightfully, it stands out. It’s refreshing to have someone who has actually taken the time to determine your skill set, technology background, and how you spend your free time with respect to software. You feel like they at least have an inkling of who you are. On the other hand, it’s irritating to be bombarded by contract-to-hire offers which aren’t even remotely tailored to you. What’s worse is when it’s ten different emails from ten different people working for the same recruiting agency.

There are basically two types of software jobs: the ones which just need warm bodies—commodity developers who mostly need to code the spec handed down from the Powers That Be—and the ones which need individual contributors—people which are hired to solve hard problems. ((These would be what Google refers to as smart creatives.)) I seem to paint the first in a negative light, which isn’t actually my intention. I know plenty of developers who are perfectly content with these types of jobs. There’s nothing wrong with that, but sometimes the wires seemingly get crossed.

The problem, as I see it, is that the communications meant for one type of person are getting received by people who don’t identify. It’s a numbers game, so it boils down to lazy recruiting. Throw enough paper airplanes and eventually one will land in the lap of someone who cares—the rest will poke everyone else’s eyes out. This isn’t a humblebrag about how I’m such an Awesome Developer™ or a suggestion that we should paint everyone with a broad brush, it’s merely what I’ve observed. People are welcome to disagree and do so openly.

Many companies I would love to work at have pinged me after coming across my GitHub profile or reading my blog. Recruiters for jobs I would never work at have emailed me after seeing my LinkedIn. The split isn’t completely black and white—there are places I’d work who’ve contacted me through LinkedIn, and vice versa—but there’s definitely a strong correlation. ((Granted, the sample size of recruiters emailing me through LinkedIn is probably an order-of-magnitude larger.)) The difference is that the former almost always receive an enthusiastic, well-thought-out response, while the latter goes straight to the trash. Doing otherwise would simply be too demoralizing. I’ve talked to other developers about this, and I know I’m not the only one. Bad recruiting is just causing people to be more jaded and cynical. I don’t know about other industries, but in software it seems to be particularly magnified.

The upshot is how to minimize the “bad communications” while maximizing the “good.” You can’t avoid bad recruiters. I suspect there are lists of names with email addresses and phone numbers which get circulated around staffing agencies. ((I wouldn’t be terribly surprised if some universities sold contact information for new graduates either.)) You can remove all contact information from your online persona, but then you make it a lot harder for the “good communications” to get through.

The best way to make yourself hireable isn’t to have a good resume and interviewing skills, it’s to do cool stuff in the open and talk about it. It can be scary because sometimes you crash and burn, but that’s just as valuable. Some find it terrifying to expose their work to the world. Half the reason I blog and share it publicly is to open my work and myself up to criticism. It stings sometimes, but you get over it. I wouldn’t be where I am today without people kicking me in the ass to do better. Other times, the community can actually learn and gain insight from your viewpoint and mindset. People are too modest.

Good developers aren’t doing this to get hired either, they’re doing it because they think it’s interesting in its own right. That’s the other half of why I blog (as well as to keep a journal for myself). The self-marketing is a byproduct. My side projects and blog posts have led to far more interesting conversations and opportunities than my resume or LinkedIn profile.

People should be more open to the idea of sharing their knowledge and work with the community. The benefits go both ways.

CS Literature of the Day

I read a lot of research papers and other nerdy computer science things in my spare time. I’m also a huge fan of Paper We Love, which is an awesome repository of academic CS papers and a community of people who read, share, and present them.

For the purposes of posterity and information-sharing, I thought it would be a good idea to share some of the nerdy things I read or watch like various papers, blog posts, and talks—all related to computer science. That’s why I’m tweeting a new piece of CS literature every day with the hashtag #CSLOTD and maintaining a GitHub repo containing that content called CS Literature of the Day.

To start, CSLOTD will just be literature which I come across myself and find interesting, but it’s completely open to contributors to share thought-provoking material. The repo won’t host any content but rather provide links to it.

I’m doing this because I’m enthusiastic about CS. I reserve the right to stop doing it at any time or miss days for whatever reason. I also tweet things that I think are interesting. If you don’t find them interesting, that’s your problem. As such, I reserve the right to reject submissions if I don’t find them interesting enough to include. I have pretty low standards though. :)