Building a Distributed Log from Scratch, Part 4: Trade-Offs and Lessons Learned

In part three of this series we talked about scaling message delivery in a distributed log. In part four, we’ll look at some key trade-offs involved with such systems and discuss a few lessons learned while building NATS Streaming.

Competing Goals

There are a number of competing goals when building a distributed log (these goals also extend to many other types of systems). Recall from part one that our key priorities for this type of system are performance, high availability, and scalability. The preceding parts of this series described at various levels how we can accomplish these three goals, but astute readers likely noticed that some of these things conflict with one another.

It’s easy to make something fast if it’s not fault-tolerant or scalable. If our log runs on a single server, our only constraints are how fast we can send data over the network and how fast the disk I/O is. And this is how a lot of systems, including many databases, tend to work, not only because it performs well but because it’s simple. We can make these types of systems fault-tolerant by introducing a standby server and allowing clients to fail over, but there are a couple of issues worth mentioning with this.

With data systems, such as a log, high availability does not just pertain to continuity of service, but also availability of data. If I write data to the system and the system acknowledges that, that data should not be lost in the event of a failure. So with a standby server, we need to ensure data is replicated to avoid data loss (otherwise, in the context of a message log, we must relax our requirement of guaranteed delivery).

NATS Streaming initially shipped as a single-node system, which raised immediate concerns about production-readiness due to a single point of failure. The first step at trying to address some of these concerns was to introduce a fault-tolerance mode whereby a group of servers would run and only one would run as the active server. The active server would obtain an exclusive lock and process requests. Upon detecting a failure, standby servers would attempt to obtain the lock and become the active server.
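To make the active/standby handoff concrete, here is a minimal sketch that models the exclusive lock as a lease the active server must keep renewing. The lease mechanism, the `LeaseLock` class, and the server names are assumptions for illustration; this is not a description of how NATS Streaming implements its lock.

```python
class LeaseLock:
    """Exclusive lock that expires unless its holder keeps renewing it (hypothetical)."""

    def __init__(self, ttl):
        self.ttl = ttl            # seconds a grant stays valid without renewal
        self.holder = None
        self.expires_at = 0.0

    def try_acquire(self, server, now):
        """Grant or renew the lock if it is free, expired, or already held by `server`."""
        if self.holder is None or self.holder == server or now >= self.expires_at:
            self.holder = server
            self.expires_at = now + self.ttl
            return True
        return False


lock = LeaseLock(ttl=5.0)

# Time is passed in explicitly so the example is deterministic.
a_active = lock.try_acquire("server-a", now=0.0)     # server-a becomes active
b_blocked = lock.try_acquire("server-b", now=1.0)    # standby cannot take a live lease
a_renewed = lock.try_acquire("server-a", now=4.0)    # active server renews, lease now ends at t=9
# server-a crashes and stops renewing.
b_takeover = lock.try_acquire("server-b", now=9.5)   # standby is promoted
```

The sketch also hints at one of the usual issues with distributed locks: if server-a is merely paused rather than dead, it may still believe it is active after server-b has taken over.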

Aside from the usual issues with distributed locks, this design requires a shared storage layer. With NATS Streaming, this meant either a shared volume, such as Gluster or EFS, or a shared MySQL database. This poses a performance challenge and isn’t particularly “cloud-native” friendly. Another issue is that data is not replicated unless the storage layer does so out-of-band. When we add in data replication, performance is hamstrung even further. But this was a quick and easy solution that offered some solace with respect to a SPOF (disclosure: I was not involved with NATS or NATS Streaming at this time). The longer-term solution was to provide first-class clustering and data-replication support, but sometimes it’s more cost-effective to provide fast recovery of a single-node system.

Another challenge with the single-node design is scalability. There is only so much capacity that one node can handle. At a certain point, scaling out becomes a requirement, and so we start partitioning. This is a common technique for relational databases where we basically just run multiple databases and divide up the data by some key. NATS Streaming is no different as it offers a partitioning story for dividing up channels between servers. The trouble with partitioning is that it complicates things, as it typically requires cooperation from the application. To make matters worse, NATS Streaming does not currently offer partitioning at the channel level, which means if a single channel has a lot of load, the solution is to manually partition it into multiple channels at the application level. This is why Kafka chose to partition its topics by default.

So performance is at odds with fault-tolerance and scalability, but another factor is what I call simplicity of mechanism. That is, the simplicity of the design plays an important role in the performance of a system. This plays out at multiple levels. We saw that, at an architectural level, using a simple, single-node design performs best but falls short as a robust solution. In part one, we saw that using a simple file structure for our log allowed us to take advantage of the hardware and operating system in terms of sequential disk access, page caching, and zero-copy reads. In part two, we made the observation that we can treat the log itself as a replicated WAL to solve the problem of data replication in an efficient way. And in part three, we discussed how a simple pull-based model can reduce complexity around flow control and batching.

At the same time, simplicity of “UX” makes performance harder. When I say UX, I mean the ergonomics of the system and how easy it is to use, operate, etc. NATS Streaming initially optimized for UX, which is why it fills an interesting space. Simplicity is a core part of the NATS philosophy, so it caught a small mindshare with developers frustrated or overwhelmed by Kafka. There is appetite for a “Kafka lite,” something which serves a similar purpose to Kafka but without all the bells and whistles and probably not targeted at large enterprises—a classic Innovator’s Dilemma to be sure.

NATS Streaming tracks consumer positions automatically, provides simple APIs, and uses a simple push-based protocol. This also means building a client library is a much less daunting task. The downside is the server needs to do more work. With a single node, as NATS Streaming was initially designed, this isn’t much of a problem. Where it starts to rear its head is when we need to replicate that state across a cluster of nodes. This has important implications with respect to performance and scale. Smart middleware has a natural tendency to become more complex, more fragile, and slower. The end-to-end principle attests to this. Amusingly, NATS Streaming was originally named STAN because it’s the opposite of NATS, a fast and simple messaging system with minimal guarantees.

Simplicity of mechanism tends to simply push complexity around in the system. For example, NATS Streaming provides an ergonomic API to clients by shifting the complexity to the server. Kafka scales and performs exceptionally well by shifting the complexity to other parts of the system, namely the client and ZooKeeper.

Scalability and fault-tolerance are equally at odds with simplicity for reasons mostly described above. The important point here is that these cannot be an afterthought. As I learned while implementing clustering in NATS Streaming, you can’t cleanly and effectively bolt fault-tolerance onto an existing complex system. One of the laws of Systemantics comes to mind here: “A complex system designed from scratch never works and cannot be patched up to make it work. You have to start over, beginning with a working simple system.” Scalability and fault-tolerance need to be designed from day one.

Lastly, availability is inherently at odds with consistency. This is simply the CAP theorem. Guaranteeing strong consistency requires a quorum when replicating data, which hinders availability and performance. The key here is to minimize what you need to replicate or to relax your requirements.

Lessons Learned

The section above already contains several lessons learned in the process of working on NATS Streaming and implementing clustering, but I’ll capture a few important ones here.

First, distributed systems are complex enough. Simple is usually better—and faster. Again, we go back to the laws of systems here: “A complex system that works is invariably found to have evolved from a simple system that works.”

Second, lean on existing work. A critical part of delivering clustering rapidly was sticking with Raft and an existing Go implementation for leader election and data replication. Considerable time had been spent before I joined on designing a proprietary solution, which still had edge cases not fully thought through. Not only is Raft off the shelf, it’s provably correct (implementation bugs notwithstanding). And following from the first lesson learned, start with a solution that works before worrying about optimization. It’s far easier to make a correct solution fast than it is to make a fast solution correct. Don’t roll your own coordination protocol if you don’t need to (and chances are you don’t need to).

There are probably edge cases for which you haven’t written tests. There are many failure modes, and you can only write so many tests. Formal methods and property-based testing can help a lot here. Similarly, chaos and fault-injection testing such as Kyle Kingsbury’s Jepsen help too.

Lastly, be honest with your users. Don’t try to be everything to everyone. Instead, be explicit about design decisions, trade-offs, guarantees, defaults, etc. If there’s one takeaway from Kyle’s Jepsen series it’s that many vendors are dishonest in their documentation and marketing. MongoDB became infamous for having unsafe defaults and implementation issues early on, most likely because they make benchmarks look much more impressive.

In part five of this series, we’ll conclude by outlining the design for a new log-based system that draws from ideas in the previous entries in the series.

Building a Distributed Log from Scratch, Part 3: Scaling Message Delivery

In part two of this series we discussed data replication within the context of a distributed log and how it relates to high availability. Next, we’ll look at what it takes to scale the log such that it can handle non-trivial workloads.

Data Scalability

A key part of scaling any kind of data-intensive system is the ability to partition the data. Partitioning is how we can scale a system linearly, that is to say we can handle more load by adding more nodes. We make the system horizontally scalable.

Kafka was designed this way from the beginning. Topics are partitioned and ordering is only guaranteed within a partition. For example, in an e-commerce application, we might have two topics, purchases and inventory, each with two partitions. These partitions allow us to distribute reads and writes across a set of brokers. In Kafka, the log is actually the partition.

The challenge with this is how we partition the data. We might distribute data using round robin, in effect randomly distributing it. The problem with this is we lose out on ordering, which is an important characteristic of the log. For example, imagine we have add and remove inventory operations. With random partitioning, we might end up with a remove followed by an add getting processed if they’re placed in different partitions. However, if they’re placed in the same partition, we know they will be ordered correctly from the perspective of the publisher.

We could also distribute by hashing a key and sending all writes with the same key to the same partition, or use some custom partitioning strategy along these lines. Continuing with our example, we might partition purchases by account name and inventory by SKU. This way, all purchase operations by the same account are ordered, as are all inventory operations pertaining to the same SKU. A (naive) custom strategy might instead partition topics by ranges based on the account and SKU.
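As a minimal sketch of key-based partitioning, the snippet below routes inventory operations by SKU. The function names, the choice of CRC32 as the hash, and the sample events are all assumptions for illustration; real systems pick their own hash function and partition counts.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition using a stable hash (CRC32 is an arbitrary choice here)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(events, num_partitions):
    """Append each (key, operation) event to its partition, preserving arrival order."""
    partitions = [[] for _ in range(num_partitions)]
    for key, operation in events:
        partitions[partition_for(key, num_partitions)].append((key, operation))
    return partitions


events = [
    ("sku-42", "add"),
    ("sku-7", "add"),
    ("sku-42", "remove"),
    ("sku-7", "remove"),
]
partitions = route(events, num_partitions=2)

# All operations for one SKU land in one partition, in publish order.
sku_42_partition = partitions[partition_for("sku-42", 2)]
sku_42_ops = [op for key, op in sku_42_partition if key == "sku-42"]
```

Because the hash is deterministic, the add for a given SKU always precedes its remove within the partition, which is the ordering guarantee round robin gives up.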

The important point here is that how you partition your data is largely dependent on your application and its usage patterns, but partitioning is a critical part of scalability. It allows you to scale your workload processing by dividing up responsibilities, which in turn, allows you to throw more resources at the problem in a tractable way.

One of NATS Streaming’s shortcomings, in my opinion, is that it doesn’t currently offer a good story around partitioning. Channels are totally ordered, essentially making them the equivalent of a Kafka partition. The workaround is to partition among multiple channels at the application level. To some, this is a benefit because it’s conceptually simpler than Kafka, but Kafka was designed as such because scalability was a key design goal from day one.

Consumer Scalability

One challenge with the log is the problem of high fan-out. Specifically, how do we scale to a large number of consumers? In Kafka and NATS Streaming, reads (and writes) are only served by the leader. Similarly, Amazon Kinesis supports up to only five reads per second per shard (a shard is Kinesis’ equivalent of a partition). Thus, if we have five consumers reading from the same shard, we’ve already hit our fan-out limit. The thought is to partition your workload to increase parallelism and/or daisy chain streams to increase fan-out. But if we are trying to do very high fan-out, e.g. to thousands of IoT devices, neither of these is an ideal solution. Not all use cases may lend themselves to partitioning (though one can argue this is just a sign of poor architecting), and chaining up streams (or in Kafka nomenclature, topics) tends to be kludgey.

However, we can make the following observation: with an immutable log, there are no stale or phantom reads. Unlike a database, we can loosen our requirements a bit. Whereas a database is typically mutable, with a log, we’re only appending things. From a consumer’s perspective, a replica is either up-to-date with the leader or in the process of catching up, but in either case, if we read all of the records, we should end up in the same state. Immutability, at least in theory, should make it “easy” to scale to a large number of consumers because we don’t have to read from the leader to get correct results (ignoring log compaction and other “mutable” operations), so long as we’re okay with strong eventual consistency with respect to tailing the log.

In NATS Streaming, with Raft, we could simply allow followers to serve reads and scale reads by increasing the size of the cluster, but this would impact performance because the quorum size would also increase. Instead, we can use “non-voters” to act as read replicas and balance consumers among them. These read replicas do not participate in quorum or leader election; they simply receive committed log entries. In effect, this is the daisy chaining of streams mentioned earlier but done implicitly by the system. This is an otherwise common pattern for increasing consumer fan-out in Kinesis but is usually done in an ad hoc, Rube Goldberg-esque fashion. Note that, in the case of NATS Streaming, this isn’t quite as simple as it sounds due to the delivery mechanism used, which we’ll describe next.
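The cost of growing the voting membership is simple arithmetic. In Raft, an entry commits once a majority of voters have it, so the sketch below compares scaling reads with voting followers against scaling with non-voters. The function name and the cluster sizes are illustrative assumptions.

```python
def quorum_size(voters: int) -> int:
    """Smallest majority of a Raft voting membership."""
    return voters // 2 + 1


# Scale reads by adding four voting followers to a three-node cluster...
quorum_with_voting_followers = quorum_size(3 + 4)

# ...or keep three voters and attach four non-voting read replicas.
quorum_with_non_voters = quorum_size(3)
```

With voting followers, each write must reach four nodes (leader included) before it commits; with non-voters, the commit path still needs only two.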

Push vs. Pull

In Kafka, consumers pull data from brokers. In NATS Streaming, brokers push data to consumers. Kafka’s documentation describes this design decision in detail. The key factor largely comes down to flow control. With push, flow control needs to be explicit to deal with diverse consumers. Different consumers will consume at different rates, so the broker needs to be aware of this so as not to overwhelm a consumer.

There are obvious advantages and disadvantages to both approaches. With push, it can be a tricky balance to ensure full utilization of the consumer. We might use a backoff protocol like additive increase/multiplicative decrease, widely known for its use in TCP congestion control, to optimize utilization. NATS Streaming, like many other messaging systems, implements flow control by using acks. Upon receiving a message, consumers ack back to the server, and the server tracks the in-flight messages for each consumer. If that number goes above a certain threshold, the server will stop delivery until more acks are received. There is a similar flow-control mechanism between the publisher and the server. The trade-off here is the server needs to do some bookkeeping, which we’ll get to in a bit. With a pull-based system, flow control is implicit. Consumers simply go at their own pace, and the server doesn’t need to track anything. There is much less complexity with this.
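The ack-based flow control described above can be sketched as a per-consumer in-flight limit. This is a simplified model with hypothetical names (`Subscription`, `max_in_flight`), not NATS Streaming’s actual implementation; a list stands in for the network send.

```python
from collections import deque


class Subscription:
    """Server-side bookkeeping for one push consumer (hypothetical sketch)."""

    def __init__(self, max_in_flight):
        self.max_in_flight = max_in_flight
        self.pending = deque()     # sequence numbers waiting to be pushed
        self.in_flight = set()     # pushed but not yet acked
        self.delivered = []        # stand-in for messages sent over the network

    def enqueue(self, seq):
        self.pending.append(seq)
        self._deliver()

    def ack(self, seq):
        self.in_flight.discard(seq)
        self._deliver()

    def _deliver(self):
        # Push only while the consumer has room under its in-flight limit.
        while self.pending and len(self.in_flight) < self.max_in_flight:
            seq = self.pending.popleft()
            self.in_flight.add(seq)
            self.delivered.append(seq)


sub = Subscription(max_in_flight=2)
for seq in range(1, 6):
    sub.enqueue(seq)
stalled = list(sub.delivered)    # delivery stops once two messages are unacked

sub.ack(1)
resumed = list(sub.delivered)    # one ack frees one slot, so one more message goes out
```

Every field on `Subscription` is state the server has to maintain per consumer, which is the bookkeeping cost a pull-based design avoids.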

Pull-based systems lend themselves to aggressive batching. With push, we must decide whether to send a message immediately or wait to accumulate more messages before sending. This is a decision pertaining to latency versus throughput. Push is often viewed as an optimization for latency, but if we’re tuning for low latency, we send messages one at a time only for them to end up being buffered on the consumer anyway. With pull, the consumer fetches all available messages after its current position in the log, which basically removes the guesswork around tuning batching and latency.
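To illustrate how pull makes batching fall out naturally, here is a minimal in-memory sketch. The `Log` and `PullConsumer` classes are hypothetical; a real broker would serve fetches over the network and cap the batch size.

```python
class Log:
    """Append-only list of records addressed by offset."""

    def __init__(self):
        self.records = []

    def append(self, record):
        self.records.append(record)
        return len(self.records) - 1      # offset of the new record

    def fetch(self, offset):
        """Return every record at or after `offset`."""
        return self.records[offset:]


class PullConsumer:
    def __init__(self, log):
        self.log = log
        self.position = 0                 # next offset to read, owned by the consumer

    def poll(self):
        batch = self.log.fetch(self.position)
        self.position += len(batch)
        return batch


log = Log()
consumer = PullConsumer(log)

for record in ("a", "b", "c"):
    log.append(record)
first = consumer.poll()     # everything written so far arrives as one batch

idle = consumer.poll()      # nothing new, so the batch is empty

log.append("d")
log.append("e")
second = consumer.poll()
```

The batch size is simply however much accumulated since the last poll: a fast consumer sees small, low-latency batches, and a slow one sees large, high-throughput batches, with no tuning on the server.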

There are API implications with this decision too, particularly from an ergonomics and complexity perspective. Kafka clients tend to be “thick” and have a lot of complexity. That is, they do a lot because the broker is designed to be simple. That’s my guess as to why there are so few native client libraries up to par with the Java client. NATS Streaming clients, on the other hand, are relatively “thin” because the server does more. We end up just pushing the complexity around based on our design decisions, but one can argue that the smart client and dumb server is a more scalable approach. We’ll go into detail on that in the next installment of this series.

Circling back on consumer scalability, the fact that NATS Streaming uses a push-based model means we can’t simply set up read replicas and balance consumers among them. Instead, we would need to partition consumers among the replicas so that each server is responsible for pushing data to a subset of consumers. The increased complexity over pull becomes immediately apparent here.

Bookkeeping

There are two ways to track position in the log: have the server track it for consumers or have consumers track it themselves. Again, there are trade-offs with this, namely between API simplicity, server complexity, performance, and scalability. NATS Streaming tracks subscription positions for consumers. This means consumers can come and go as they like and pick back up where they left off easily. Before NATS Streaming supported clustering, this made a lot of sense because the bookkeeping was all in one server. But with clustering, this data must be replicated just like the message data, which poses a performance challenge.

The alternative is to punt the problem to the consumer. But also keep in mind that consumers might not have access to fast stable storage, such as with an IoT device or ephemeral container. Is there a way we can split the difference?

We can store the offsets themselves directly in the log. As of 0.9, this is what Kafka does. Before that, clients had to manage offsets themselves or store them in ZooKeeper. This forced a dependency on ZooKeeper for clients but also posed a major bottleneck since ZooKeeper is relatively low throughput. But by storing offsets in the log, they are treated just like any other write to a Kafka topic, which scales quite well (offsets are stored in an internal Kafka topic called __consumer_offsets partitioned by consumer group; there is also a special read cache for speeding up the read path).

Clients periodically checkpoint their offset to the log. We then use log compaction to retain only the latest offsets. Log compaction works by rewriting the log to retain only the latest message for a given key. On recovery, clients fetch the latest offset from the log. The important part here is we need to structure our keys such that compaction retains the latest offset for each unique consumer. For example, we might structure it as consumer-topic-partition, with the message value being the offset.

Uncompacted, the log contains every checkpoint each consumer has ever written, so the same key appears many times. Once compacted, only the most recent checkpoint for each key remains.

Note that compaction violates some of our previous assumptions around the immutability of the log, but that’s for another discussion.
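The checkpoint-and-compact scheme can be sketched in a few lines. The consumer names, topics, and offsets below are made up for illustration, and `compact` is a toy in-memory version of what a real log does by rewriting segments on disk.

```python
def offset_key(consumer, topic, partition):
    """Build a consumer-topic-partition key so each consumer position compacts separately."""
    return f"{consumer}-{topic}-{partition}"


def compact(log):
    """Keep only the latest record for each key, preserving the survivors' order."""
    latest_index = {}
    for index, (key, _value) in enumerate(log):
        latest_index[key] = index
    return [record for index, record in enumerate(log)
            if latest_index[record[0]] == index]


def recover(log, key):
    """Return the most recently checkpointed offset for `key`, or None if absent."""
    offset = None
    for record_key, value in log:
        if record_key == key:
            offset = value
    return offset


# Uncompacted: every checkpoint is appended, so keys repeat.
offsets_log = [
    (offset_key("bob", "purchases", 0), 3),
    (offset_key("alice", "purchases", 0), 1),
    (offset_key("bob", "purchases", 0), 7),
    (offset_key("alice", "inventory", 1), 2),
    (offset_key("alice", "purchases", 0), 5),
]

compacted = compact(offsets_log)
bob_offset = recover(compacted, offset_key("bob", "purchases", 0))
```

Recovery returns the same answer before and after compaction; compaction only bounds how much log a recovering client has to read.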

There are a number of advantages to this approach. We get fault-tolerance and durability due to the fact that our log is already fault-tolerant and durable as designed earlier. We get consistent reads again due to our replication scheme. Unlike ZooKeeper, we get high write throughput. And we reuse existing structures, so there’s less server complexity. We’re just reusing the log; there aren’t really any major new codepaths.

Interestingly, the bookkeeping needed for flow control in push-based systems (such as acks in NATS Streaming) serves much the same purpose as offset tracking in pull-based systems, since it needs to track position. The difference comes when we allow out-of-order processing. If we don’t allow it, then acks are simply a high-water mark that indicates the client is “this far” caught up. The problem with push is we also have to deal with redeliveries, whereas with pull they are implicitly handled by the client. If we do allow out-of-order processing, then we need to track individual, in-flight messages, which is what per-message acks allow us to do. In this case, the system starts to look less like a log and more like a message queue. This makes push even more complicated.
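The difference between the two kinds of ack tracking shows up in the state the server has to keep. The two classes below are a hypothetical sketch of each approach, not the data structures NATS Streaming uses.

```python
class HighWaterMark:
    """In-order processing: one integer records how far the consumer has gotten."""

    def __init__(self):
        self.acked_through = 0    # every sequence <= this value is acknowledged

    def ack(self, seq):
        self.acked_through = max(self.acked_through, seq)

    def to_redeliver(self, last_sent):
        return list(range(self.acked_through + 1, last_sent + 1))


class PerMessageAcks:
    """Out-of-order processing: each in-flight message is tracked on its own."""

    def __init__(self):
        self.in_flight = set()

    def sent(self, seq):
        self.in_flight.add(seq)

    def ack(self, seq):
        self.in_flight.discard(seq)

    def to_redeliver(self):
        return sorted(self.in_flight)


# In both cases the server has pushed messages 1 through 5.
in_order = HighWaterMark()
in_order.ack(3)                                    # "caught up through 3"
in_order_pending = in_order.to_redeliver(last_sent=5)

out_of_order = PerMessageAcks()
for seq in range(1, 6):
    out_of_order.sent(seq)
for seq in (1, 2, 4):                              # message 3 is still being processed
    out_of_order.ack(seq)
out_of_order_pending = out_of_order.to_redeliver()
```

The high-water mark is effectively an offset, which is why it could live in the log itself. The per-message set has gaps, and that is what pushes the system toward message-queue semantics.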

The nice thing about reusing the log to track offsets is it greatly reduces the amount of code and complexity needed. Since NATS Streaming allows out-of-order processing, it uses a separate acking subsystem which otherwise has the same requirements as an offset-tracking subsystem.

In part four of this series, we will discuss some of the key trade-offs involved with implementing a distributed log and some lessons learned while building NATS Streaming.

Thrift on Steroids: A Tale of Scale and Abstraction

Apache Thrift is an RPC framework developed at Facebook for building “scalable cross-language services.” It consists of an interface definition language (IDL), communication protocol, API libraries, and a code generator that allows you to build and evolve services independently and in a polyglot fashion across a wide range of languages. This is nothing new and has been around for over a decade now.

There are a number of notable users of Thrift aside from Facebook, including Twitter (mainly by way of Finagle), Foursquare, Pinterest, Uber (via TChannel), and Evernote, among others, and for good reason: Thrift is mature and battle-tested.

The white paper explains the motivation behind Thrift in greater detail, though I think the following paragraph taken from the introduction does a pretty good job of summarizing it:

As Facebook’s traffic and network structure have scaled, the resource demands of many operations on the site (i.e. search, ad selection and delivery, event logging) have presented technical requirements drastically outside the scope of the LAMP framework. In our implementation of these services, various programming languages have been selected to optimize for the right combination of performance, ease and speed of development, availability of existing libraries, etc. By and large, Facebook’s engineering culture has tended towards choosing the best tools and implementations available over standardizing on any one programming language and begrudgingly accepting its inherent limitations.

Basically, as Facebook scaled, they moved more and more away from PHP and the LAMP stack and became increasingly polyglot. I think this same evolution is seen at most startups as they grow into themselves. We saw a similar transition in my time at Workiva, moving from our monolithic Python application on Google App Engine to a polyglot service-oriented architecture in AWS. It was an exciting but awkward time as we went through our adolescence as an engineering culture and teams started to find their identities. Teams learned what it meant to build backward-compatible APIs and loosely coupled services, how to deprecate APIs, how to build resilient and highly available systems, how to properly instrument services and diagnose issues, how to run and manage the underlying infrastructure, and, most importantly, how to collaborate with each other. There were lots of stumbles and mistakes along the way, lots of postmortems, lots of stress, but with that comes the learning and growing. The payoff is big but the process is painful. I don’t think it ever isn’t.

With one or two services written in the same language and relatively few developers, it was easy to just stick with “REST” (in quotes because it’s always a bastardized version of what REST ought to be), sling some JSON around, and call it a day. As the number of tech stacks and integration points increases, it becomes apparent that some standards are important. And once things are highly polyglot with lots of developers and lots of services running with lots of versions, strict service contracts become essential.

Uber has a blog post on building microservices that explains this and why they settled on Thrift to solve this problem.

Since the number of service calls grows rapidly, it is necessary to maintain a well-defined interface for every call. We knew we wanted to use an IDL for managing this interface, and we ultimately decided on Thrift. Thrift forces service owners to publish strict interface definitions, which streamlines the process of integrating with services. Calls that do not abide by the interface are rejected at the Thrift level instead of leaking into a service and failing deeper within the code. This strategy of publicly declaring your interface emphasizes the importance of backwards compatibility, since multiple versions of a service’s Thrift interface could be in use at any given time. The service author must not make breaking changes, and instead must only make non-breaking additions to the interface definition until all consumers are ready for deprecation.

Early on, I was tasked with building a unified messaging solution that would help with our integration challenges. The advantages of a unified solution should be obvious: reusability (before this, everyone was solving the problem in their own way), focus (allow developers to focus on their problem space, not the glue), acceleration (if the tools are already available, there’s less work to do), and shared pain points (it’s a lot easier to prioritize your work when everyone is complaining about the same thing). Also, a longer term benefit is developing the knowledge of this shared solution into an organizational competency which has a sort of “economy of scale” to it. Our job was not just to ship a messaging platform but evangelize it and help other teams to be successful with it. We did this through countless blog posts, training sessions, workshops, talks, and even a podcast.

Before we set out on building a common messaging solution, there were a few key principles we used to guide ourselves. We wanted to provide a core set of tools, libraries, and infrastructure for service integration. We wanted a solution that was rigid yet flexible. We would provide only a minimal set of messaging patterns to act as generic building blocks with strict, strongly typed APIs, and promote design best practices and a service-oriented mindset. This meant supporting service evolution and API iteration through versioning and backward compatibility, allowing for resiliency patterns like timeouts, retries, circuit breakers, etc., and generally advocating asynchronous, loosely coupled communication. Lastly, we had to keep in mind that, at the end of the day, developers are just trying to ship stuff, so we had to balance these concerns out with ergonomics and developer experience so they could build, integrate, and ship quickly.

As much as I think RPC is a bad abstraction, it’s what developers want. If you don’t provide them with an RPC solution, they will build their own, so we had to provide first-class support for it. We evaluated solutions in the RPC space. We looked extensively at gRPC, which is the new RPC hotness from Google, but it had a few key drawbacks, namely its “newness” (it was still in early beta at the time and has since been almost entirely rewritten), its coupling to HTTP/2 as a transport (which at the time had fairly limited support), and its lack of support for JavaScript (let alone Dart, which is what most of our client applications were being written in). Avro was another we looked at.

Ultimately, we settled on Thrift due to its maturity and wide use in production, its performance, its architecture (it separates out the transports, protocols, and RPC layer with the first two being pluggable), its rich feature set, and its wide range of language support (checking off all the languages we standardized on as a company including Go, Java, Python, JavaScript, and Dart). Thrift is not without its problems though—more on this in a bit.

In addition to RPC, we wanted to promote a more asynchronous, message-passing style of communication with pub/sub. This would allow for greater flexibility in messaging patterns like fan-out and fan-in, interest-based messaging, and reduced coupling and fragility of services. This enables things like the worker pattern where we can distribute work to a pool of workers and scale that pool independently, whereas RPC tends to promote more stateful types of services. In my experience, developers tend to bias towards stateful services since this is how we’ve built things for a long time, but as we’ve entered the cloud-native era, things are running in containers which are autoscaled, more ephemeral, and more distributed. We have to grapple with the complexity imposed by distributed systems. This is why asynchronous messaging is important and why we wanted to support it from the outset.

We selected NATS as a messaging backplane because of its simplicity, performance, scalability, and adoption of the cloud-native mentality. When it comes to service integration, you need an always-on dial tone and NATS provides just that. Because of Thrift’s pluggable transport layer, we could build a NATS RPC transport while also providing HTTP and TCP transports.

Unfortunately, Thrift doesn’t provide any kind of support for pub/sub, and we wanted the same guarantees for it that we had with RPC, like type safety and versioning with code-generated APIs and service contracts. Aside from this, Thrift has a number of other, more glaring problems:

  • Head-of-line blocking: a single, slow request will block any subsequent requests for a client.
  • Out-of-order responses: an out-of-order response puts a Thrift transport in a bad state, requiring it to be torn down and reestablished, e.g. if a slow request times out at the client, the client issues a subsequent request, and a response comes back for the first request, the client blows up.
  • Concurrency: a Thrift client cannot be shared between multiple threads of execution, requiring each thread to have its own client issuing requests sequentially. This, combined with head-of-line blocking, is a major performance killer. This problem is compounded when each transport has its own resources, such as a socket.
  • RPC timeouts: Thrift does not provide good facilities for per-request timeouts, instead opting for a global transport read timeout.
  • Request headers: Thrift does not provide support for request metadata, making it difficult to implement things like authentication/authorization and distributed tracing. Instead, you are required to bake these things into your IDL or in a wrapped transport. The problem with this is it puts the onus on service providers rather than allowing an API gateway or middleware to perform these functions in a centralized way.
  • Middleware: Thrift does not have any support for client or server middleware. This means clients must be wrapped to implement interceptor logic and middleware code must be duplicated within handler functions. This makes it impossible to implement AOP-style logic in a clean, DRY way.

Twitter’s Finagle addresses many of these issues but is solely for the JVM, so we decided to address Thrift’s shortcomings in a cross-platform way without completely reinventing the wheel. That is, we took Thrift and extended it. What we ended up with was Frugal, a superset of Thrift recently open sourced that aims to solve the problems described above while also providing support for asynchronous pub/sub APIs—a sort of Thrift on steroids as I’ve come to call it. Its key features include:

  • Request multiplexing: client requests are fully multiplexed, allowing them to be issued concurrently while simultaneously avoiding the head-of-line blocking and out-of-order response problems. This also lays some groundwork for asynchronous messaging patterns.
  • Thread-safety: clients can be safely shared between multiple threads in which requests can be made in parallel.
  • Pub/sub: IDL and code-generation extensions for defining pub/sub APIs in a type-safe way.
  • Request context: a first-class request context object is added to every operation which allows defining request/response headers and per-request timeouts. By making the context part of the Frugal protocol, headers can be introspected or even injected by external middleware. This context could be used to send OAuth2 tokens and user-context information, avoiding the need to include it everywhere in your IDL and handler logic. Correlation IDs for distributed tracing purposes are also built into the request context.
  • Middleware: client- and server-side middleware is supported for RPC and pub/sub APIs. This allows you to implement interceptor logic around handler functions, e.g. for authentication, logging, or retry policies. One can easily integrate OpenTracing as a middleware, for example.
  • Cross-language: support for Go, Java, Dart, and Python (2.7 and 3.5).

Frugal adds a second kind of transport, for pub/sub, alongside Thrift’s RPC transport. With this, we provide a NATS transport for both pub/sub and RPC (internally, Workiva also has an at-least-once delivery pub/sub transport built around Amazon SQS for mission-critical data). In addition to this, we built an SDK which developers use to connect to the messaging infrastructure (such as NATS) with minimal ceremony. The messaging SDK played a vital role not just in making it easy for developers to adopt and integrate, but also in providing us a shim where we could introduce sweeping changes across the organization in one place, such as adding instrumentation, tracing, and authentication. This enabled us to roll critical integration components out to every service by making a change in one place.

To support pub/sub, we extended the Thrift IDL with an additional top-level construct called a scope, which is effectively a pub/sub namespace (basically what a service is to RPC). We wrote the IDL using a parsing expression grammar which allows us to generate a parser. We then implemented a code generator for the various language targets. The Frugal compiler is written in Go and is, at least in my opinion, much more maintainable than Thrift’s C++ codebase. However, the language libraries make use of the existing Thrift APIs, such as protocols, transports, etc. This means we didn’t need to implement any of the low-level mechanics like serialization.

I’ve since left Workiva (and am now actually working on NATS), but as far as I know, Frugal helps power nearly every production service at the company. It was an interesting experience from which I learned a lot. I was happy to see some of that work open sourced so others could use it and learn from it.

Of course, if I were starting over today, things would probably look different. gRPC is much more mature, and the notion of a “service mesh” has taken the container world by storm with things like Istio, Linkerd, and Envoy. What we built was Workiva’s service mesh; we just didn’t have a name for it, so we called it a “Messaging SDK.” The corollary to this is you don’t need to adopt bleeding-edge tech to be successful. The concepts are what’s important, and if enough people work on the same types of problems in parallel, they will likely converge on very similar solutions given enough time.

I think there’s a delicate balance between providing solutions that are “easy” from a developer point of view but may provide longer term drawbacks when it comes to building complex systems the “right” way. I see RPC as an example of this. It’s an “easy” abstraction but it hides a lot of complexity. Service meshes might even be in this category, but they have obvious upsides when it comes to building software in a way that is scalable. Peter Alvaro’s Strange Loop talk “I See What You Mean” does a great job of articulating this dilemma, which I’ve also written about myself. In the end, we decided to optimize for shipping, but we took a principled approach: provide the tools developers need (or want) but help educate them to utilize those tools in a way that allows them to ship products that are reliable and maintainable. Throwing tools or code over the wall is not enough.

If State Is Hell, SOA Is Satan

More and more companies are describing their success stories regarding the switch to a service-oriented architecture. As with any technological upswing, there’s a clear and palpable hype factor involved (Big Data™ or The Cloud™ anyone?), but obviously it’s not just puff.

While microservices and SOA have seen a staggering rate of adoption in recent years, the mindset of developers often seems to be stuck in the past. I think this is, at least in part, because we seek a mental model we can reason about. It’s why we build abstractions in the first place. In a sense, I would argue there’s a comparison to be made between the explosion of OOP in the early 90’s and today’s SOA trend. After all, SOA is as much about people scale as it is about workload scale, so it makes sense from an organizational perspective.

The Perils of Good Abstractions

While systems are becoming more and more distributed, abstractions are attempting to make them less and less complex. Mesosphere is a perfect example of this, attempting to provide the “datacenter operating system.” Apache Mesos allows you to “program against your datacenter like it’s a single pool of resources.” It’s an appealing proposition to say the least. PaaS like Google App Engine and Heroku offer similar abstractions—write your code without thinking about scale. The problem is you absolutely have to think about scale or you’re bound to run into problems down the road. And while these abstractions are nice, they can be dangerous just the same. Welcome to the perils of good abstractions.

I like to talk about App Engine because I have firsthand experience with it. It’s an easy sell for startups. It handles spinning up instances when you need them, turning them down when you don’t. It’s your app server, database, caching, job scheduler, task queue all in one, and it does it at scale. There’s vendor lock-in, sure, yet it means no ops, no sysadmins, no overhead. Push to deploy. But it’s a leaky abstraction. It has to be. App Engine scales because it’s distributed, but it allows—no, encourages—you to write your system as a monolith. The datastore, memcache, and task queue accesses are masked as RPCs. This is great for our developer mental model, but it will bite you if you’re not careful. App Engine imposes certain limitations to encourage good design; for instance, front-end requests and datastore calls are limited to 60 seconds (it used to be much less), but the leakiness goes beyond that.

RPC is consistently at odds with distributed systems. I would go so far as to say it’s an anti-pattern in many cases. RPC encourages writing synchronous code, but distributed systems are inherently asynchronous. The network is not reliable. The network is not fast. The network is not your friend. Developers who either don’t understand this or don’t realize what’s happening when they make an RPC will write code as if they were calling a function. It will sure as hell look like just calling a function. When we think synchronously, we end up with systems that are slow, fault intolerant, and generally not scalable. To be quite honest, however, this is perfectly acceptable for 90% of startups as they are getting off the ground because they don’t have workloads at meaningful scale.

There’s certainly some irony here. One of the selling points of App Engine is its ability to scale to large amounts of traffic, yet the vast majority of startups would be perfectly suited to scaling up rather than out, perhaps with some failover in place for good measure. Stack Overflow is the poster child of scale-up architecture. In truth, your architecture should be a function of your access patterns, not the other way around (and App Engine is very much tailored to a specific set of access patterns). Nonetheless, it shows that vertical scaling can work. I would bet a lot of startups could sufficiently run on a large, adequately specced machine or maybe a small handful of them.

The cruel irony is that once you hit a certain scale with App Engine, both in terms of your development organization and user base, you’ve reached a point where you have to migrate off it. And if your data model isn’t properly thought out, you will without a doubt hit scale problems. It’s to the point where you need someone with deep knowledge of how App Engine works in order to build quality systems on it. Good luck hiring a team of engineers who understand it. GAE is great at accelerating you to 100 mph, but you better have some nice airbags for the brick wall it launches you into. In fairness, this is a problem every org hits—Conway’s law is very much a reality and every startup has growing pains. To be clear, this isn’t a jab at GAE, which is actually very effective at accelerating a product using little capital and can sustain long-term success given the right use case. Instead, I use it to illustrate a point.

Peering Through the Abstraction

Eventually SOA makes sense, but our abstractions can cause problems if we don’t understand what’s going on behind the curtain (hence the leakiness). Partial failure is all but guaranteed, and latency, partitioning, and other network pressure happen all the time.

Ken Arnold is famed for once saying “state is hell” in reference to designing distributed systems. In the past, I’ve written how scaling shared data is hard, but with SOA it’s practically a requirement. Ken is right, though: state is hell, and SOA is fundamentally competing with consistency. The FLP Impossibility result and the CAP theorem can prove it formally, but really this should be intuitively obvious if we accept the laws of physics.

On the other hand, if you store information that I can’t reconstruct, then a whole host of questions suddenly surface. One question is, “Are you now a single point of failure?” I have to talk to you now. I can’t talk to anyone else. So what happens if you go down?

To deal with that, you could be replicated. But now you have to worry about replication strategies. What if I talk to one replicant and modify some data, then I talk to another? Is that modification guaranteed to have already arrived there? What is the replication strategy? What kind of consistency do you need—tight or loose? What happens if the network gets partitioned and the replicants can’t talk to each other? Can anybody proceed?

Essentially, the more stateful your system is, the harder it’s going to be to scale it because distributing that state introduces a rich tapestry of problems. In practice, we often can’t eliminate state wholesale, but basically everything that can be stateless should be stateless.

Making servers disposable allows you a great deal of flexibility. Former Netflix Cloud Architect Adrian Cockcroft articulates this idea well:

You want to think of servers like cattle, not pets. If you have a machine in production that performs a specialized function, and you know it by name, and everyone gets sad when it goes down, it’s a pet. Instead you should think of your servers like a herd of cows. What you care about is how many gallons of milk you get. If one day you notice you’re getting less milk than usual, you find out which cows aren’t producing well and replace them.

This is effectively how App Engine achieves its scalability. With lightweight, stateless, and disposable instances, it can spin them up and down on the fly without worrying about being in an invalid state.

App Engine also relies on eventual consistency as the default model for datastore interactions. This makes queries fast and highly available, while snapshot isolation can be achieved using entity-group transactions if necessary. The latter, of course, can result in a lot of contention and latency. Yet, people seem to have a hard time grappling with the reality of eventual consistency in distributed systems. State is hell, but calling SOA “satan” is clearly hyperbole. It is a tough problem nevertheless.

A State of Mind

In the situations where we need state, we have to reconcile with the realities of distributed systems. This means understanding the limitations and accepting the complexities, not papering over them. It doesn’t mean throwing away abstractions. Fortunately, distributed computing is the focus of a lot of great research, so there are primitives with which we can build: immutability, causal ordering, eventual consistency, CRDTs, and other ideas.

As long as we recognize the trade-offs, we can design around them. The crux is knowing they exist in the first place. We can’t have ACID semantics while remaining highly available, but we can use Highly Available Transactions to provide strong-enough guarantees. At the same time, not all operations require coordination or concurrency control. The sooner we view eventual consistency as a solution and not a consequence, the sooner we can let go of this existential crisis. Other interesting research includes BOOM, which seeks to provide a high-level, declarative approach to distributed programming.

State might be hell, but it’s a hell we have to live with. I don’t advocate an all-out microservice architecture for a company just getting its start. The complications far outweigh any benefits to be gained, but it becomes a necessity at a certain point. The key is having an exit strategy. PaaS providers make this difficult due to vendor lock-in and architectural constraints. Weigh their advantages carefully.

Once you do transition to a SOA, make as many of those services, or the pieces backing them, as stateless as possible. For those which aren’t stateless, know that the problem typically isn’t novel. These problems have been solved or are continuing to be solved in new and interesting ways. Academic research is naturally at the bleeding edge with industry often lagging behind. OOP concepts date back to as early as the 60’s but didn’t gain widespread adoption until several decades later. Distributed computing is no different. SOA is just a state of mind.

Stream Processing and Probabilistic Methods: Data at Scale

Stream processing and related abstractions have become all the rage following the rise of systems like Apache Kafka, Samza, and the Lambda architecture. Applying the idea of immutable, append-only event sourcing means we’re storing more data than ever before. However, as the cost of storage continues to decline, it’s becoming more feasible to store more data for longer periods of time. With immutability, how the data lives isn’t interesting anymore. It’s all about how it moves.

The shifting landscape of data architecture parallels the way we’re designing systems today. Specifically, the evolution of monolithic to service-oriented architecture necessitates a change in the way we do data integration. The traditional normalization approach doesn’t cut it. Our systems are composed of databases, caches, search indexes, data warehouses, and a multitude of other components. Moreover, there’s an increasing demand for online, real-time processing of this data that’s comparable to the growing popularity of large-scale, offline processing along the lines of Hadoop. This presents an interesting set of new challenges, namely, how do we drink from the firehose without getting drenched?

The answer most likely lies in frameworks like Samza, Storm, and Spark Streaming. Similarly, tools like StatsD solve the problem of collecting real-time analytics. However, this discussion is meant to explore some of the primitives used in stream processing. The ideas extend far beyond event sourcing, generalizing to any type of data stream, unbounded or not.

Batch vs. Streaming

With offline or batch processing, we often have some heuristics which provide insight into our data set, and we can typically afford multiple passes of the data. Without a strict time constraint, data structures are less important. We can store the entire data set on disk (or perhaps across a distributed file system) and process it in batches.

With streaming data, however, there’s a need for near real-time processing—collecting analytics, monitoring and alerting, updating search indexes and caches, etc. With web crawlers, we process a stream of URLs and documents and produce indexed content. With websites, we process a stream of page views and update various counters and gauges. With email, we process a stream of text and produce a filtered, spam-free inbox. These cases involve massive, often limitless data sets. Processing that data online can’t be done with the conventional ETL or MapReduce-style methods, and unless you’re doing windowed processing, it’s entirely impractical to store that data in memory.

Framing the Problem

As a concrete example of stream processing, imagine we want to count the number of distinct document views across a large corpus, say, Wikipedia. A naive solution would be to use a hash table which maps a document to a count. Wikipedia has roughly 35 million pages. Let’s assume each document is identified by a 16-byte GUID, and the counters are stored as 8-byte integers. This means we need in the ballpark of a gigabyte of memory. Given today’s hardware, this might not sound completely unreasonable, but now let’s say we want to track views per unique IP address. We see that this approach quickly becomes intractable.

To illustrate further, consider how to count the cardinality of IP addresses which access our website. Instead of a hash table, we can use a bitmap or sparse bit array to count addresses. There are over four billion possible distinct IPv4 addresses. Sure, we could allocate half a gigabyte of RAM, but if you’re developing performance-critical systems, large heap sizes are going to kill you, and this overhead doesn’t help.1
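The two estimates above are easy to check with a little arithmetic. The snippet below is only a back-of-the-envelope sketch: it counts raw key and counter bytes and ignores the per-entry overhead a real hash table would add, so the true footprint would be larger.

```python
# Back-of-the-envelope memory estimates for the two naive approaches.
PAGES = 35_000_000        # approximate page count quoted in the text
KEY_BYTES = 16            # 16-byte GUID per document
COUNTER_BYTES = 8         # 8-byte integer counter

# Raw payload only; real hash tables add per-entry overhead on top of this.
hash_table_bytes = PAGES * (KEY_BYTES + COUNTER_BYTES)

IPV4_ADDRESSES = 2 ** 32              # every possible IPv4 address
bitmap_bytes = IPV4_ADDRESSES // 8    # one bit per address


def to_mib(num_bytes):
    """Convert a byte count to mebibytes."""
    return num_bytes / (1024 ** 2)


print(f"hash table payload: {to_mib(hash_table_bytes):.0f} MiB")
print(f"IPv4 bitmap:        {to_mib(bitmap_bytes):.0f} MiB")
```

The hash table payload comes to 840 million bytes, and the bitmap to exactly 512 MiB, which is where the "half a gigabyte" figure comes from.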

A Probabilistic Approach

Instead, we turn to probabilistic ways of solving these problems. Probabilistic algorithms and data structures seem to be oft-overlooked, or maybe purposely ignored, by system designers despite the theory behind them having been around for a long time in many cases. The goal should be to move these from the world of academia to industry because they are immensely useful and widely neglected.

Probabilistic solutions trade accuracy for space and performance. While a loss in precision may be a cause for concern to some, the fact is with large data streams, we have to trade something off to get real-time insight. Much like the CAP theorem, we must choose between consistency (accuracy) and availability (online). With CAP, we can typically adjust the level at which that trade-off occurs. This space/performance-accuracy exchange behaves very much the same way.

The literature can get pretty dense, so let’s look at what some of these approaches are and the problems they solve in a way that’s (hopefully) understandable.

Bloom Filters

The Bloom filter is probably the most well-known and, conceptually, simplest probabilistic data structure. It also serves as a good foundation because there are a lot of twists you can put on it. The theory provides a kernel from which many other probabilistic solutions are derived, as we will see.

Bloom filters answer a simple question: is this element a member of a set? A normal set requires linear space because it stores every element. A Bloom filter doesn’t store the actual elements; it merely stores their “membership.” It uses sub-linear space, which opens the possibility of false positives, meaning there’s a non-zero probability it reports an item is in the set when it’s actually not. This has a wide range of applications. For example, a Bloom filter can be placed in front of a database. When a query for a piece of data comes in and the filter doesn’t contain it, we completely bypass the database.

The Bloom filter consists of a bit array of length m and k hash functions. Both of these parameters are configurable, but we can optimize them based on a desired rate of false positives. Each bit in the array is initially unset. When an element is “added” to the filter, it’s hashed by each of the functions, h1…hk, and modded by m, resulting in k indices into the bit array. Each bit at the respective index is set.

To query the membership of an element, we hash it and check if each bit is set. If any of them are zero, we know the item isn’t in the set. What happens when two different elements hash to the same index? This is where false positives are introduced. If the bits aren’t set, we know the element wasn’t added, but if they are, it’s possible that some other element or group of elements hashed to the same indices. Bloom filters have false positives, but false negatives are impossible—a member will never be reported incorrectly as a non-member. Unfortunately, this also means items can’t be removed from the filter.
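The add and test operations just described fit in a few lines. This is a minimal sketch, not a tuned implementation: the class name and the trick of salting SHA-256 with the hash index to get k hash functions are assumptions made for illustration.

```python
import hashlib


class BloomFilter:
    """Classic Bloom filter: an m-bit array probed by k hash functions."""

    def __init__(self, m, k):
        self.m = m
        self.k = k
        self.bits = bytearray((m + 7) // 8)  # all bits start unset

    def _indices(self, item):
        # Illustrative choice: simulate k hash functions by salting
        # SHA-256 with the function number, then reduce modulo m.
        for i in range(self.k):
            digest = hashlib.sha256(i.to_bytes(4, "big") + item).digest()
            yield int.from_bytes(digest[:8], "big") % self.m

    def add(self, item):
        for idx in self._indices(item):
            self.bits[idx // 8] |= 1 << (idx % 8)

    def test(self, item):
        # A single unset bit proves the item was never added.
        return all(self.bits[idx // 8] & (1 << (idx % 8))
                   for idx in self._indices(item))


bf = BloomFilter(m=1024, k=3)
bf.add(b"alpha")
print(bf.test(b"alpha"))  # True: added items are always reported as members
```

Note there is no remove method: clearing a bit could also erase evidence of some other element that hashed to the same index.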

It’s clear that the likelihood of false positives increases with each element added to the filter. We can target a specific probability of false positives by selecting an optimal value for m and k for up to n insertions.2 While this implementation works well in practice, it has a drawback in that some elements are potentially more sensitive to false positives than others. We can solve this problem by partitioning the m bits among the k hash functions such that each one produces an index over its respective partition. As a result, each element is always described by exactly k bits. This prevents any one element from being especially sensitive to false positives. Since calculating k hashes for every element is computationally expensive, we can actually perform a single hash and derive k hashes from it for a significant speedup.3
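As a rough sketch of both ideas, the helpers below use the standard sizing formulas m = -n ln(p) / (ln 2)^2 and k = (m / n) ln 2, and derive k indices from one digest with the common double-hashing scheme g_i(x) = h1(x) + i * h2(x). The function names are hypothetical, and splitting a single SHA-256 digest into h1 and h2 is an assumption of this example rather than something the text prescribes.

```python
import hashlib
import math


def optimal_m(n, fp_rate):
    """Bits needed to hold n items at the target false-positive rate."""
    return math.ceil(-n * math.log(fp_rate) / (math.log(2) ** 2))


def optimal_k(m, n):
    """Number of hash functions that minimizes false positives."""
    return max(1, round((m / n) * math.log(2)))


def derived_indices(item, k, m):
    """Derive k indices from one hash: g_i(x) = h1(x) + i * h2(x) mod m."""
    digest = hashlib.sha256(item).digest()
    h1 = int.from_bytes(digest[:8], "big")
    h2 = int.from_bytes(digest[8:16], "big")
    return [(h1 + i * h2) % m for i in range(k)]


m = optimal_m(1_000_000, 0.01)
k = optimal_k(m, 1_000_000)
print(m, k)
print(derived_indices(b"example", k, m))
```

For a million items at a 1% false-positive rate this works out to roughly 9.6 million bits (about 1.2 MB) and 7 hash functions.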

A Bloom filter eventually reaches a point where all bits are set, which means every query will indicate membership, effectively making the probability of false positives one. The problem with this is it requires a priori knowledge of the data set in order to select optimal parameters and avoid “overfilling.” Consequently, Bloom filters are ideal for offline processing and not so great for dealing with streams. Next, we’ll look at some variations of the Bloom filter which attempt to deal with this issue.

Scalable Bloom Filter

As we saw earlier, traditional Bloom filters are a great way to deal with set-membership problems in a space-efficient way, but they require knowing the size of the data set ahead of time in order to be effective. The Scalable Bloom Filter (SBF) was introduced by Almeida et al. as a way to cope with the capacity dilemma.

The Scalable Bloom Filter dynamically adapts to the size of the data set while enforcing a tight upper bound on the rate of false positives. Like the classic Bloom filter, false negatives are impossible. The SBF is essentially an array of Bloom filters with geometrically decreasing false-positive rates. New elements are added to the last filter. When this filter becomes “full”—more specifically, when it reaches a target fill ratio—a new filter is added with a tightened error probability. A tightening ratio, r, controls the growth of new filters.

Testing membership of an element consists of checking each of the filters. The geometrically decreasing false-positive rate of each filter is an interesting property. Since the fill ratio determines when a new filter is added, it turns out this progression can be modeled as a geometric series, which allows us to provide a tight upper bound on false positives using an optimal fill ratio of 0.5 (once the filter is 50% full, a new one is added). The compounded probability over the whole series converges to a target value, even accounting for an infinite series.
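The convergence can be worked out in a couple of lines. As a sketch, assume the first filter is given error probability P_0 and each later filter tightens it by the ratio r, with 0 < r < 1, and that l filters exist so far (P_0, l, and the indexing are notation chosen here for illustration). A query is a false positive if any one filter reports one, so:

```latex
P \;=\; 1 - \prod_{i=0}^{l-1}\left(1 - P_0\, r^{i}\right)
  \;\le\; \sum_{i=0}^{l-1} P_0\, r^{i}
  \;\le\; \sum_{i=0}^{\infty} P_0\, r^{i}
  \;=\; \frac{P_0}{1-r}
```

For example, with r = 0.5, starting the first filter at half the target rate keeps the compounded rate under the target no matter how many filters are added.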

We can limit the error rate, but obviously the trade-off is that we end up allocating memory proportional to the size of the data set since we’re continuously adding filters. We also pay some computational cost on adds. Amortized, the cost of filter insertions is negligible, but for every add we must compute the current fill ratio of the filter to determine if a new filter needs to be added. Fortunately, we can optimize this by computing an estimated fill ratio. If we keep a running count of the items added to the filter, n, the approximate current fill ratio, p, can be obtained from the Taylor series expansion with p \approx 1-e^{-n/m} where m is the number of bits in the filter. Calculating this estimate turns out to be quite a bit faster than computing the actual fill ratio—fewer memory reads.
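Putting the pieces together, a Scalable Bloom Filter might be sketched as below. Several things here are assumptions of the example rather than details from the text: the class names, the default r of 0.8, giving every new filter the same capacity hint (growing the capacity of later filters is also possible), and treating m in the fill-ratio estimate as the size of a single partition, since each add sets one bit per partition.

```python
import hashlib
import math


class PartitionedBloom:
    """Bloom filter whose bits are split into k slices, one per hash."""

    def __init__(self, capacity, fp_rate):
        self.k = max(1, math.ceil(math.log2(1 / fp_rate)))
        # Sized so each slice is about half full after `capacity` adds.
        self.slice_bits = max(1, math.ceil(capacity / math.log(2)))
        self.slices = [bytearray((self.slice_bits + 7) // 8)
                       for _ in range(self.k)]
        self.count = 0  # running count of adds, used for the estimate

    def _indices(self, item):
        d = hashlib.sha256(item).digest()
        h1 = int.from_bytes(d[:8], "big")
        h2 = int.from_bytes(d[8:16], "big")
        return [(h1 + i * h2) % self.slice_bits for i in range(self.k)]

    def add(self, item):
        for s, idx in zip(self.slices, self._indices(item)):
            s[idx // 8] |= 1 << (idx % 8)
        self.count += 1

    def test(self, item):
        return all(s[idx // 8] & (1 << (idx % 8))
                   for s, idx in zip(self.slices, self._indices(item)))

    def estimated_fill_ratio(self):
        # p ~= 1 - e^(-n/m), with m taken as the bits in one slice.
        return 1 - math.exp(-self.count / self.slice_bits)


class ScalableBloom:
    def __init__(self, capacity_hint, fp_rate, r=0.8, max_fill=0.5):
        self.capacity_hint = capacity_hint
        self.fp_rate = fp_rate
        self.r = r
        self.max_fill = max_fill
        self.filters = []
        self._grow()

    def _grow(self):
        # Filter i gets error fp_rate * (1 - r) * r^i, so the sum over
        # all filters stays at or below fp_rate.
        p = self.fp_rate * (1 - self.r) * self.r ** len(self.filters)
        self.filters.append(PartitionedBloom(self.capacity_hint, p))

    def add(self, item):
        if self.filters[-1].estimated_fill_ratio() >= self.max_fill:
            self._grow()
        self.filters[-1].add(item)

    def test(self, item):
        return any(f.test(item) for f in self.filters)


sbf = ScalableBloom(capacity_hint=100, fp_rate=0.01)
for i in range(1000):
    sbf.add(str(i).encode())
print(len(sbf.filters), sbf.test(b"42"))
```

Adds only ever touch the newest filter, while tests may have to scan all of them, which is why test cost grows as the structure does.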

To provide some context around performance, adds in my Go implementation of a classic Bloom filter take about 166 ns on my MacBook Pro, and membership tests take 118 ns. With the SBF, adds take 422 ns and tests 113 ns. This isn’t a completely fair comparison since, as the SBF grows over time, tests require scanning through each filter, but certainly the add numbers are intuitive.

Scalable Bloom Filters are useful for cases where the size of the data set isn’t known a priori and memory constraints aren’t of particular concern. They’re an effective alternative to a regular Bloom filter, which in such cases generally has to be allocated orders of magnitude larger than the data set to allow enough headroom.

Stable Bloom Filter

Classic Bloom filters aren’t great for streaming data, and Scalable Bloom Filters, while a better option, still present a memory problem. Another derivative, the Stable Bloom Filter (SBF), was proposed by Deng and Rafiei as a technique for detecting duplicates in unbounded data streams with limited space. Most solutions work by dividing the stream into fixed-size windows and solving the problem within that discrete space. This allows the use of traditional methods like the Bloom filter.

The SBF attempts to approximate duplicates without the use of windows. Clearly, we can’t store the entire history of an infinite stream in limited memory. Instead, the thinking is more recent data has more value than stale data in many scenarios. As an example, web crawlers may not care about redundantly fetching a web page that was crawled a long time ago compared to fetching a page that was crawled more recently. With this impetus, the SBF works by representing more recent data while discarding old information.

The Stable Bloom Filter tweaks the classic Bloom filter by replacing the bit array with an array of m cells, each allocated d bits. The cells are simply counters, initially set to zero. The maximum value, Max, of a cell is 2^d-1. When an element is added, we first have to ensure space for it. This is done by selecting P random cells and decrementing their counters by one, clamping to zero. Next, we hash the element to k cells and set their counters to Max. Testing membership consists of probing the k cells. If any of them are zero, it’s not a member. The Bloom filter is actually a special case of an SBF where d is one and P is zero.
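The procedure above translates fairly directly into code. The following is a minimal sketch under a few assumptions: the class and parameter names are made up for the example (the constructor's p is the P from the text), cells are stored as plain Python integers rather than packed d-bit fields, and the k cell indices are derived from a single SHA-256 digest.

```python
import hashlib
import random


class StableBloomFilter:
    def __init__(self, m, k, d, p, seed=None):
        self.m = m                    # number of cells
        self.k = k                    # cells set per element
        self.p = p                    # cells decremented per add (P)
        self.max = (1 << d) - 1       # Max = 2^d - 1
        self.cells = [0] * m          # counters, initially zero
        self.rng = random.Random(seed)

    def _indices(self, item):
        digest = hashlib.sha256(item).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big")
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, item):
        # Make room first: decrement P random cells, clamping at zero.
        for _ in range(self.p):
            j = self.rng.randrange(self.m)
            if self.cells[j] > 0:
                self.cells[j] -= 1
        # Then mark the element's k cells as freshly seen.
        for idx in self._indices(item):
            self.cells[idx] = self.max

    def test(self, item):
        # Any zero cell means "not seen, or seen long enough ago
        # that it has been evicted".
        return all(self.cells[idx] > 0 for idx in self._indices(item))


sbf = StableBloomFilter(m=1000, k=3, d=3, p=10, seed=1)
sbf.add(b"page-1")
print(sbf.test(b"page-1"))  # True: decrements happen before the cells are set
```

With d set to one and p set to zero nothing is ever decremented, and the structure behaves like the classic Bloom filter.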

Like traditional Bloom filters, an SBF has a non-zero probability of false positives, which is controlled by several parameters. Unlike the Bloom filter, an SBF has a tight upper bound on the rate of false positives while introducing a non-zero rate of false negatives. The false-positive rate of a classic Bloom filter eventually reaches one, after which all queries result in a false positive. The stable-point property of an SBF means the false-positive rate asymptotically approaches a configurable fixed constant.

Stable Bloom Filters lend themselves to situations where the size of the data set isn’t known ahead of time and memory is bounded. For example, an SBF can be used to deduplicate events from an unbounded event stream with a specified upper bound on false positives and minimal false negatives. In particular, if the stream is not uniformly distributed, meaning duplicates are likely to be grouped closer together, the rate of false positives becomes immaterial.

Counting Bloom Filter

Conventional Bloom filters don’t allow items to be removed. The Stable Bloom Filter, on the other hand, works by evicting old data, but its API doesn’t expose a way to remove specific elements. The Counting Bloom Filter (CBF) is yet another twist on this structure. Introduced by Fan et al. in Summary Cache: A Scalable Wide-Area Web Cache Sharing Protocol, the CBF provides a way of deleting data from a filter.

It’s actually a very straightforward approach. The filter comprises an array of n-bit buckets similar to the Stable Bloom Filter. When an item is added, the corresponding counters are incremented, and when it’s removed, the counters are decremented.

Obviously, a CBF takes n-times more space than a regular Bloom filter, but it also has a scalability limit. Unless items are removed frequently enough, a counting filter’s false-positive probability will approach one. We need to dimension it accordingly, and this normally requires inferring from our data set. Likewise, since the CBF allows removals, it exposes an opportunity for false negatives. The multi-bit counters diminish the rate, but it’s important to be cognizant of this property so the CBF can be applied appropriately.
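A counting filter along these lines could look like the sketch below. The names are illustrative, and two choices are assumptions of the example: counters saturate at their maximum instead of wrapping, and remove is a no-op for items that don't currently test as members.

```python
import hashlib


class CountingBloomFilter:
    def __init__(self, m, k, bits_per_counter=4):
        self.m = m
        self.k = k
        self.max = (1 << bits_per_counter) - 1  # largest counter value
        self.counters = [0] * m

    def _indices(self, item):
        digest = hashlib.sha256(item).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big")
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, item):
        for idx in self._indices(item):
            if self.counters[idx] < self.max:  # saturate, don't overflow
                self.counters[idx] += 1

    def test(self, item):
        return all(self.counters[idx] > 0 for idx in self._indices(item))

    def remove(self, item):
        # Skip items that don't look present, so we never decrement
        # counters that belong solely to other elements.
        if not self.test(item):
            return False
        for idx in self._indices(item):
            self.counters[idx] -= 1
        return True


cbf = CountingBloomFilter(m=1024, k=3)
cbf.add(b"alpha")
cbf.remove(b"alpha")
print(cbf.test(b"alpha"))  # False: the item's counters are back to zero
```

Removing an item that was only a false positive, or decrementing a counter that had saturated, is how false negatives can creep in.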

Inverse Bloom Filter

One of the distinguishing features of Bloom filters is the guarantee of no false negatives. This can be a valuable invariant, but what if we want the opposite of that? Is there an efficient data structure that can offer us no false positives with the possibility of false negatives? The answer is deceptively simple.

Jeff Hodges describes a rather trivial—yet strikingly pragmatic—approach which he calls the “opposite of a Bloom filter.” Since that name is a bit of a mouthful, I’m referring to this as the Inverse Bloom Filter (IBF).

The Inverse Bloom Filter may report a false negative but can never report a false positive. That is, it may indicate that an item has not been seen when it actually has, but it will never report an item as seen which it hasn’t come across. The IBF behaves in a similar manner to a fixed-size hash map of m buckets which doesn’t handle conflicts, but it provides lock-free concurrency using an underlying CAS.

The test-and-add operation hashes an element to an index in the filter. We take the value at that index while atomically storing the new value, then the old value is compared to the new one. If they differ, the new value wasn’t a member. If the values are equivalent, it was in the set.
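The test-and-add operation can be sketched as follows. Note the caveat: this version uses a plain, non-atomic swap on a Python list, so it only illustrates the logic. A concurrent implementation would need a real atomic exchange, and the class name is an assumption for the example.

```python
import hashlib


class InverseBloomFilter:
    def __init__(self, m):
        self.slots = [None] * m  # fixed-size table, collisions overwrite

    def test_and_add(self, item):
        """Return True if item was seen; record it either way."""
        digest = hashlib.sha256(item).digest()
        idx = int.from_bytes(digest[:8], "big") % len(self.slots)
        # Swap the new value in and take the old one out (not atomic here).
        old, self.slots[idx] = self.slots[idx], item
        return old == item


ibf = InverseBloomFilter(m=1024)
print(ibf.test_and_add(b"event-1"))  # False: first sighting
print(ibf.test_and_add(b"event-1"))  # True: seen before
```

Because the slot holds the full value, a match can only be the same item, so false positives can't occur. A false negative happens when a colliding item overwrites the slot between two sightings.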

The Inverse Bloom Filter is a nice option for dealing with unbounded streams or large data sets due to its limited memory usage. If duplicates are close together, the rate of false negatives becomes vanishingly small with an adequately sized filter.

HyperLogLog

Let’s revisit the problem of counting distinct IP addresses visiting a website. One solution might be to allocate a sparse bit array proportional to the number of IPv4 addresses. Alternatively, we could combine a Bloom filter with a counter. When an address comes in and it’s not in the filter, increment the count. While these both work, they’re not likely to scale very well. Rather, we can apply a probabilistic data structure known as the HyperLogLog (HLL). First presented by Flajolet et al. in 2007, HyperLogLog is an algorithm which approximately counts the number of distinct elements, or cardinality, of a multiset (a set which allows multiple occurrences of its elements).

Imagine our stream as a series of coin tosses. Someone tells you the most heads they flipped in a row was three. You can guess that they didn’t flip the coin very many times. However, if they flipped 20 heads in a row, it probably took them quite a while. This seems like a really unconvincing way of estimating the number of tosses, but from this kernel of thought grows a fruitful idea.

HLL replaces heads and tails with zeros and ones. Instead of counting the longest run of heads, it counts the longest run of leading zeros in a binary hash. Using a good hash function means that the values are, more or less, statistically independent and uniformly distributed. If the maximum run of leading zeros is n, the number of distinct elements in the set is approximately 2^n. But wait, what’s the math behind this? If we think about it, half of all binary numbers start with 1. Each additional bit divides the probability of a run further in half; that is, 25% start with 01, 12.5% start with 001, 6.25% start with 0001, ad infinitum. Thus, the probability of a run of exactly n leading zeros is 2^{-(n+1)}, so we’d expect to hash on the order of 2^n distinct values before seeing a run that long.
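To make the leading-zeros idea concrete, here is a rough Go sketch of the single-experiment estimator. The `RunEstimator` type and the choice of FNV-1a are assumptions for illustration. On its own this estimator is very noisy and can only ever answer with a power of two.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/bits"
)

// hash64 returns a 64-bit FNV-1a hash of s (hash choice is an assumption).
func hash64(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}

// RunEstimator remembers the longest run of leading zeros seen so far.
type RunEstimator struct {
	maxZeros int
}

// Add hashes item and updates the longest observed run.
func (e *RunEstimator) Add(item string) {
	z := bits.LeadingZeros64(hash64(item))
	if z > 63 {
		z = 63 // keep 2^z representable in a uint64
	}
	if z > e.maxZeros {
		e.maxZeros = z
	}
}

// Estimate returns 2^n, where n is the longest run observed.
func (e *RunEstimator) Estimate() uint64 {
	return 1 << uint(e.maxZeros)
}

func main() {
	e := &RunEstimator{}
	for i := 0; i < 1000; i++ {
		e.Add(fmt.Sprintf("10.0.%d.%d", i/256, i%256))
	}
	fmt.Println("longest run:", e.maxZeros, "estimate:", e.Estimate())
}
```

Because only the maximum is kept, adding the same address a thousand times leaves the estimate unchanged, which is exactly the property we want for counting distinct elements.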

Like flipping a coin, there’s potential for an enormous amount of variance with this technique. For instance, it’s entirely possible, though unlikely, that you flip 20 heads in a row on your first try. One experiment doesn’t provide enough data, so instead you flip 10 coins and treat each as its own experiment. HyperLogLog uses a similar strategy to reduce variance by splitting the stream up across a set of buckets, or registers, and applying the counting algorithm on the values in each one. It tracks the longest run of zeros in every register, and the total cardinality is computed by taking the harmonic mean across all registers. The harmonic mean is used to discount outliers since the distribution of values tends to skew towards the right. We can increase accuracy by adding more registers, which of course comes at the expense of performance and memory.

HyperLogLog is a remarkable algorithm. It almost feels like magic, and it’s exceptionally useful for working with data streams. HLL has a fraction of the memory footprint of other solutions. In fact, it’s usually several orders of magnitude smaller while remaining surprisingly accurate.

Count-Min Sketch

HyperLogLog is a great option to efficiently count the number of distinct elements in a stream using a minimal amount of space, but it only gives us cardinality. What about counting the frequencies of specific elements? Cormode and Muthukrishnan developed the Count-Min sketch (CM sketch) to approximate the occurrences of different event types from a stream of events.

In contrast to a hash table, the Count-Min sketch uses sub-linear space to count frequencies. It consists of a matrix with w columns and d rows. These parameters determine the trade-off between space/time constraints and accuracy. Each row has an associated hash function. When an element arrives, it’s hashed once for each row, and the counter at the resulting index in each row is incremented by one. In this regard, the CM sketch shares some similarities with the Bloom filter.

[Figure: Count-Min sketch]

The frequency of an element is estimated by taking the minimum of all the element’s respective counter values. The thought process here is that collisions between elements are possible, and a collision inflates the counters of every item that shares the cell. Because counters are only ever incremented, each one is an upper bound on the true frequency, so taking the minimum gives the closest approximation.
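The update and query steps fit in a few lines of Go. This is a sketch under a few assumptions: the type and method names are made up for the example, and the per-row hash functions are simulated by feeding the row number into FNV before the element, which is a shortcut and not a properly independent hash family.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// CountMinSketch is a d x w matrix of counters (hypothetical type).
type CountMinSketch struct {
	w, d   uint32
	counts [][]uint64
}

// NewCountMinSketch allocates a sketch with w columns and d rows.
func NewCountMinSketch(w, d uint32) *CountMinSketch {
	counts := make([][]uint64, d)
	for i := range counts {
		counts[i] = make([]uint64, w)
	}
	return &CountMinSketch{w: w, d: d, counts: counts}
}

// index returns the column for item in the given row. Prefixing the row
// number stands in for a distinct hash function per row (an assumption).
func (s *CountMinSketch) index(row uint32, item string) uint32 {
	h := fnv.New32a()
	h.Write([]byte{byte(row)})
	h.Write([]byte(item))
	return h.Sum32() % s.w
}

// Add increments one counter in every row.
func (s *CountMinSketch) Add(item string) {
	for row := uint32(0); row < s.d; row++ {
		s.counts[row][s.index(row, item)]++
	}
}

// Count returns the minimum of the item's counters across all rows.
func (s *CountMinSketch) Count(item string) uint64 {
	lowest := s.counts[0][s.index(0, item)]
	for row := uint32(1); row < s.d; row++ {
		if c := s.counts[row][s.index(row, item)]; c < lowest {
			lowest = c
		}
	}
	return lowest
}

func main() {
	cms := NewCountMinSketch(1024, 4)
	for _, event := range []string{"login", "click", "login", "login"} {
		cms.Add(event)
	}
	fmt.Println("login:", cms.Count("login"), "click:", cms.Count("click"))
}
```

The estimate can be too high but never too low. Shrinking w to 1 forces every element into the same cell, at which point every query simply returns the total number of events.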

The Count-Min sketch is an excellent tool for counting problems for the same reasons as HyperLogLog. It has a lot of similarities to Bloom filters, but where Bloom filters effectively represent sets, the CM sketch considers multisets.

MinHash

The last probabilistic technique we’ll briefly look at is MinHash. This algorithm, invented by Andrei Broder, is used to quickly estimate the similarity between two sets. This has a variety of uses, such as detecting duplicate bodies of text and clustering or comparing documents.

MinHash works, in part, by using the Jaccard coefficient, a statistic which represents the size of the intersection of two sets divided by the size of the union:

J(A,B) = {{|A \cap B|}\over{|A \cup B|}}

This provides a measure of how similar the two sets are, but computing the intersection and union is expensive. Instead, MinHash essentially works by comparing randomly selected subsets through element hashing. It is a form of locality-sensitive hashing (LSH), which means that similar items have a greater tendency to map to the same hash values. This differs from most hashing schemes, which attempt to avoid collisions. Instead, LSH is designed to maximize collisions of similar elements.
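One common formulation, and only one of several variations, relies on the fact that under a random hash function the probability that two sets share the same minimum hash value equals their Jaccard coefficient. Repeating this with many hash functions and counting the matches gives an estimate. The Go sketch below assumes 128 hash functions, simulated by seeding a single mixed FNV hash; the names `Signature` and `Similarity` are made up for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math"
)

const numHashes = 128 // assumed signature length

// seededHash simulates a family of hash functions by combining an FNV-1a
// hash with a per-function seed and a mixing step (an assumption).
func seededHash(seed uint64, s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	x := h.Sum64() ^ (seed * 0x9e3779b97f4a7c15)
	x ^= x >> 30
	x *= 0xbf58476d1ce4e5b9
	x ^= x >> 27
	x *= 0x94d049bb133111eb
	x ^= x >> 31
	return x
}

// Signature keeps, for each hash function, the minimum hash over the set.
func Signature(set []string) [numHashes]uint64 {
	var sig [numHashes]uint64
	for i := range sig {
		sig[i] = math.MaxUint64
		for _, el := range set {
			if h := seededHash(uint64(i), el); h < sig[i] {
				sig[i] = h
			}
		}
	}
	return sig
}

// Similarity estimates the Jaccard coefficient as the fraction of
// positions where the two signatures agree.
func Similarity(a, b [numHashes]uint64) float64 {
	matches := 0
	for i := range a {
		if a[i] == b[i] {
			matches++
		}
	}
	return float64(matches) / numHashes
}

func main() {
	docA := []string{"the", "quick", "brown", "fox"}
	docB := []string{"the", "quick", "red", "fox"}
	// The exact Jaccard coefficient here is 3/5; the estimate should land
	// in that neighborhood but will vary with the hash functions used.
	fmt.Printf("estimated similarity: %.2f\n",
		Similarity(Signature(docA), Signature(docB)))
}
```

The signatures are fixed-size no matter how large the documents are, so they can be computed once and compared cheaply afterwards.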

MinHash is one of the more difficult algorithms to fully grasp, and I can’t even begin to provide a proper explanation. If you’re interested in how it works, read the literature but know that there are a number of variations of it. The important thing is to know it exists and what problems it can be applied to.

In Practice

We’ve gone over several fundamental primitives for processing large data sets and streams while solving a few different types of problems. Bloom filters can be used to reduce disk reads and other expensive operations. They can also be used to detect duplicate events in a stream or prune a large decision tree. HyperLogLog excels at counting the number of distinct items in a stream while using minimal space, and the Count-Min sketch tracks the frequency of particular elements. Lastly, the MinHash method provides an efficient mechanism for comparing the similarity between documents. This has a number of applications, namely around identifying duplicates and characterizing content.

While there are no doubt countless implementations for most of these data structures and algorithms, I’ve been putting together a Go library called Boom Filters, geared towards providing efficient techniques for stream processing. It includes implementations for all of the probabilistic data structures outlined above. As streaming data and real-time consumption grow more and more prominent, it will become important that we have the tools and understanding to deal with them. If nothing else, it’s valuable to be aware of probabilistic methods because they often provide accurate results at a fraction of the cost. These things aren’t just academic research; they form the basis for building online, high-performance systems at scale.

  1. Granted, if you’re sensitive to garbage-collection pauses, you may be better off using something like C or Rust, but that’s not always the most practical option.
  2. The math behind determining optimal m and k is left as an exercise for the reader.
  3. Less Hashing, Same Performance: Building a Better Bloom Filter discusses the use of two hash functions to simulate additional hashes. We can use a 64-bit hash, such as FNV, and use the upper and lower 32 bits as two different hashes.