Introducing Liftbridge: Lightweight, Fault-Tolerant Message Streams

Last week I open sourced Liftbridge, my latest project and contribution to the Cloud Native Computing Foundation ecosystem. Liftbridge is a system for lightweight, fault-tolerant (LIFT) message streams built on NATS and gRPC. Fundamentally, it extends NATS with a Kafka-like publish-subscribe log API that is highly available and horizontally scalable.

I’ve been working on Liftbridge for the past couple of months, but it’s something I’ve been thinking about for over a year. I sketched out the design last year and wrote about it in January. It was largely inspired by my work on NATS Streaming, to which I’m still the second top contributor. My primary involvement with NATS Streaming was building out the early data replication and clustering solution for high availability, which has continued to evolve since I left the project. In many ways, Liftbridge is about applying a lot of what I learned while working on NATS Streaming, as well as my observations from being closely involved with the NATS community for some time. It’s also the product of scratching an itch: these are the kinds of problems I enjoy working on, and I needed something to code.

At its core, Liftbridge is a server that implements a durable, replicated message log for the NATS messaging system. Clients create a named stream which is attached to a NATS subject. The stream then records messages on that subject to a replicated write-ahead log. Multiple consumers can read back from the same stream, and multiple streams can be attached to the same subject.

The goal is to bridge the gap between sophisticated log-based messaging systems like Apache Kafka and Apache Pulsar and simpler, cloud-native systems. This meant not relying on external coordination services like ZooKeeper, not using the JVM, keeping the API as simple and small as possible, and keeping client libraries thin. The system is written in Go, making it a single static binary with a small footprint (~16MB). It relies on the Raft consensus algorithm to do coordination. It has a very minimal API (just three endpoints at the moment). And the API uses gRPC, so client libraries can be generated for most popular programming languages (there is a Go client which provides some additional wrapper logic, but it’s pretty thin). The goal is to keep Liftbridge very lightweight—in terms of runtime, operations, and complexity.
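
To give a feel for the API, here is a rough sketch of what creating and consuming a stream might look like from Go. The calls shown here (Connect, CreateStream, Subscribe) and their signatures are illustrative assumptions rather than the actual client API, which may differ.

```go
package main

import (
	"context"
	"fmt"

	lift "github.com/liftbridge-io/go-liftbridge" // hypothetical usage; real API may differ
)

func main() {
	// Connect to a Liftbridge server, which itself points at a NATS deployment.
	client, err := lift.Connect([]string{"localhost:9292"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Create a stream attached to the NATS subject "foo". Messages published
	// to "foo" are recorded to the stream's replicated log.
	err = client.CreateStream(context.Background(), lift.StreamInfo{
		Subject:           "foo",
		Name:              "foo-stream",
		ReplicationFactor: 3,
	})
	if err != nil {
		panic(err)
	}

	// Read the stream back from the beginning; multiple consumers can do this
	// independently since Liftbridge does not track consumer state.
	err = client.Subscribe(context.Background(), "foo", "foo-stream",
		func(msg lift.Message, err error) {
			if err != nil {
				panic(err)
			}
			fmt.Println(msg.Offset, string(msg.Value))
		})
	if err != nil {
		panic(err)
	}

	select {} // block while messages are delivered asynchronously
}
```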

However, the bigger goal of Liftbridge is to extend NATS with a durable, at-least-once delivery mechanism that upholds the NATS tenets of simplicity, performance, and scalability. Unlike NATS Streaming, it uses the core NATS protocol with optional extensions. This means it can be added to an existing NATS deployment to provide message durability with no code changes.

NATS Streaming provides a similar log-based messaging solution. However, it is an entirely separate protocol built on top of NATS. NATS is an implementation detail—the transport—for NATS Streaming. This means the two systems have separate messaging namespaces—messages published to NATS are not accessible from NATS Streaming and vice versa. Of course, it’s a bit more nuanced than this because, in reality, NATS Streaming uses NATS subjects underneath; technically messages can be accessed, but they are serialized protobufs. These nuances often confuse first-time users, as it’s not always clear that NATS and NATS Streaming are completely separate systems. NATS Streaming also does not support wildcard subscriptions, which sometimes surprises users since it’s a major feature of NATS.

As a result, Liftbridge was built to augment NATS with durability rather than providing a completely separate system. To be clear, it’s still a separate server, but it merely acts as a write-ahead log for NATS subjects. NATS Streaming provides a broader set of features such as durable subscriptions, queue groups, pluggable storage backends, and multiple fault-tolerance modes. Liftbridge aims to have a relatively small API surface area.

The key features that differentiate Liftbridge are the shared message namespace, wildcards, log compaction, and horizontal scalability. NATS Streaming replicates channels to the entire cluster through a single Raft group, so adding servers does not help with scalability and actually creates a head-of-line bottleneck since everything is replicated through a single consensus group (n.b. NATS Streaming does have a partitioning mechanism, but it cannot be used in conjunction with clustering). Liftbridge allows replicating to a subset of the cluster, and each stream is replicated independently in parallel. This allows the cluster to scale horizontally and partition workloads more easily within a single, multi-tenant cluster.

Some of the key features of Liftbridge include:

  • Log-based API for NATS
  • Replicated for fault-tolerance
  • Horizontally scalable
  • Wildcard subscription support
  • At-least-once delivery support
  • Message key-value support
  • Log compaction by key (WIP)
  • Single static binary (~16MB)
  • Designed to be high-throughput (more on this to come)
  • Supremely simple

Initially, Liftbridge is designed to point to an existing NATS deployment. In the future, there will be support for a “standalone” mode where it can run with an embedded NATS server, allowing for a single deployable process. And in support of the “cloud-native” model, there is work to be done to make Liftbridge play nice with Kubernetes and generally productionize the system, such as implementing an Operator and providing better instrumentation—perhaps with Prometheus support.

Over the coming weeks and months, I will be going into more detail on Liftbridge, including its internals—such as its replication protocol—and providing benchmarks for the system. Of course, there’s also a lot of work yet to be done, so I’ll continue working on it. There are many interesting problems that still need to be solved, so consider this my appeal to contributors. :)

Building a Distributed Log from Scratch, Part 5: Sketching a New System

In part four of this series we looked at some key trade-offs involved with a distributed log implementation and discussed a few lessons learned while building NATS Streaming. In this fifth and final installment, we’ll conclude by outlining the design for a new log-based system that draws from the previous entries in the series.

The Context

For context, NATS and NATS Streaming are two different things. NATS Streaming is a log-based streaming system built on top of NATS, and NATS is a lightweight pub/sub messaging system. NATS was originally built (and then open sourced) as the control plane for Cloud Foundry. NATS Streaming was built in response to the community’s ask for higher-level guarantees—durability, at-least-once delivery, and so forth—beyond what NATS provided. It was built as a separate layer on top of NATS. I tend to describe NATS as a dial tone—ubiquitous and always on—perfect for “online” communications. NATS Streaming is the voicemail—leave a message after the beep and someone will get to it later. There are, of course, more nuances than this, but that’s the gist.

The key point here is that NATS and NATS Streaming are distinct systems with distinct protocols, distinct APIs, and distinct client libraries. In fact, NATS Streaming was designed to essentially act as a client to NATS. As such, clients don’t talk to NATS Streaming directly, rather all communication goes through NATS. However, the NATS Streaming binary can be configured to either embed NATS or point to a standalone deployment. The architecture is shown below in a diagram borrowed from the NATS website.

Architecturally, this makes a lot of sense. It supports the end-to-end principle in that we layer on additional functionality rather than bake it into the underlying infrastructure. After all, we can always build stronger guarantees on top, but we can’t always remove them from below. This particular architecture, however, introduces a few challenges (disclosure: while I’m still a fan, I’m no longer involved with the NATS project and the NATS team is aware of these problems and no doubt working to address many of them).

First, there is no “cross-talk” between NATS and NATS Streaming, meaning messages published to NATS are not visible in NATS Streaming and vice versa. Again, they are two completely separate systems that just share the same infrastructure. This means we’re not really layering on message durability to NATS, we’re just exposing a new system which provides these semantics.

Second, because NATS Streaming runs as a “sidecar” to NATS and all of its communication runs through NATS, there is an inherent bottleneck at the NATS connection. This may only be a theoretical limit, but it precludes certain optimizations like using sendfile to do zero-copy reads of the log. It also means we rely on timeouts even in cases where the server could send a response immediately, such as when there is no leader elected for the cluster.

Third, NATS Streaming currently lacks a compelling story around linear scaling other than running multiple clusters and partitioning channels among them at the application level. Similarly, the only way to scale a single channel today is to split it into multiple channels, again at the application level. My hope is that as clustering matures, this will too.

Fourth, without extending its protocol, NATS Streaming’s authorization is intrinsically limited to the authorization provided by NATS since all communication goes through it. In and of itself, this isn’t a problem. NATS supports multi-user authentication and subject-level permissions, but since NATS Streaming uses an opaque protocol atop NATS, it’s difficult to set up proper ACLs at the streaming level. Of course, many layered protocols support authentication, e.g. HTTP atop TCP. For example, the NATS Streaming protocol could carry authentication tokens or session keys, but it currently does not do this.

Fifth, NATS Streaming does not support wildcard semantics, which—at least in my opinion—is a large selling point of NATS and, as a result, something users have come to expect. Specifically, NATS supports two wildcards in subject subscriptions: asterisk (*) which matches any token in the subject (e.g. foo.* matches foo.bar, foo.baz, etc.) and full wildcard (>) which matches one or more tokens at the tail of the subject (e.g. foo.> matches foo.bar, foo.bar.baz, etc.). Note that this limitation in NATS Streaming is not directly related to the overall architecture but more in how we design the log.
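
To make the wildcard semantics concrete, here is a small example using the Go NATS client (the subject names are just illustrative):

```go
package main

import (
	"fmt"

	nats "github.com/nats-io/go-nats"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	// "*" matches exactly one token: this receives foo.bar and foo.baz,
	// but not foo.bar.baz.
	nc.Subscribe("foo.*", func(m *nats.Msg) {
		fmt.Printf("foo.* received %q on %s\n", m.Data, m.Subject)
	})

	// ">" matches one or more trailing tokens: this receives foo.bar,
	// foo.bar.baz, foo.bar.baz.qux, and so on.
	nc.Subscribe("foo.>", func(m *nats.Msg) {
		fmt.Printf("foo.> received %q on %s\n", m.Data, m.Subject)
	})

	nc.Publish("foo.bar", []byte("hello"))
	nc.Publish("foo.bar.baz", []byte("world"))
	nc.Flush()
}
```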

More generally, clustering and data replication were more of an afterthought in NATS Streaming. As we discussed in part four, it’s hard to add this after the fact. Combined with the APIs NATS Streaming provides (which do flow control and track consumer state), this creates a lot of complexity in the server.

A New System

I wasn’t involved much with NATS Streaming beyond the clustering implementation. However, from that work—and through my own use of NATS and from discussions I’ve had with the community—I’ve thought about how I would build something like it if I were to start over. It would look a bit different from NATS Streaming and Kafka, yet also share some similarities. I’ve dubbed this theoretical system Jetstream (update: this is now Liftbridge), though I’ve yet to actually build anything beyond small prototypes. It’s a side project of mine I hope to get to at some point.

Core NATS has a strong community with solid mindshare, but NATS Streaming doesn’t fully leverage this since it’s a new silo. Jetstream aims to address the above problems starting from a simple proposition: many people are already using NATS today and simply want streaming semantics for what they already have. However, we must also acknowledge that other users are happy with NATS as it currently is and have no need for additional features that might compromise simplicity or performance. This was a deciding factor in choosing not to build NATS Streaming’s functionality directly into NATS.

Like NATS Streaming, Jetstream is a separate component which acts as a NATS client. Unlike NATS Streaming, it augments NATS as opposed to implementing a wholly new protocol. More succinctly, Jetstream is a durable stream augmentation for NATS. Next, we’ll talk about how it accomplishes this by sketching out a design.

Cross-Talk

In NATS Streaming, the log is modeled as a channel. Clients create channels implicitly by publishing or subscribing to a topic (called a subject in NATS). A channel might be foo but internally this is translated to a NATS pub/sub subject such as _STAN.pub.foo. Therefore, while NATS Streaming is technically a client of NATS, it uses NATS only to dispatch communication between the client and server. The log is implemented on top of plain pub/sub messaging.

Jetstream is merely a consumer of NATS. In it, the log is modeled as a stream. Clients create streams explicitly, which are subscriptions to NATS subjects that are sequenced, replicated, and durably stored. Thus, there is no “cross-talk” or internal subjects needed because Jetstream messages are NATS messages. Clients just publish their messages to NATS as usual and, behind the scenes, Jetstream will handle storing them in a log. In some sense, it’s just an audit log of messages flowing through NATS.

With this, we get wildcards for free since streams are bound to NATS subjects. There are some trade-offs to this, however, which we will discuss in a bit.

Performance

Jetstream does not track subscription positions. It is up to consumers to track their own position in a stream or, optionally, to store their position in Jetstream itself (more on this later). This means we treat a stream as a simple log, allowing us to do fast, sequential disk I/O and minimize replication and protocol chatter as well as code complexity.

Consumers connect directly to Jetstream using a pull-based socket API. The log is stored in the manner described in part one. This enables us to do zero-copy reads from a stream and other important optimizations which NATS Streaming is precluded from doing. It also simplifies things around flow control and batching as we discussed in part three.

Scalability

Jetstream is designed to be clustered and horizontally scalable from the start. We make the observation that NATS is already efficient at routing messages, particularly with high consumer fan-out, and provides clustering of the interest graph. Streams provide the unit of storage and scalability in Jetstream.

A stream is a named log attached to a NATS subject. Akin to a partition in Kafka, each stream has a replicationFactor, which controls the number of nodes in the Jetstream cluster that participate in replicating the stream, and each stream has a leader. The leader is responsible for receiving messages from NATS, sequencing them, and performing replication (NATS provides per-publisher message ordering).
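
As a rough sketch (these are not Jetstream’s actual types), the metadata the cluster tracks for each stream might look something like the following:

```go
package jetstream

// StreamDescriptor is an illustrative sketch of per-stream metadata, not an
// actual type from the system described here.
type StreamDescriptor struct {
	Subject           string   // NATS subject the stream is attached to
	Name              string   // stream name, unique within a subject
	ReplicationFactor int      // number of nodes that replicate the stream
	Leader            string   // node that sequences messages and drives replication
	Replicas          []string // nodes participating in the stream
	ISR               []string // replicas currently in sync with the leader
}
```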

Like Kafka’s controller, there is a single metadata leader for a Jetstream cluster which is responsible for processing requests to create or delete streams. If a request is sent to a follower, it’s automatically forwarded to the leader. When a stream is created, the metadata leader selects replicationFactor nodes to participate in the stream (initially, this selection is random but could be made more intelligent, e.g. selecting based on current load) and replicates the stream to all nodes in the cluster. Once this replication completes, the stream has been created and its leader begins processing messages. This means NATS messages are not stored unless there is a stream matching their subject (this is the trade-off to support wildcards, but it also means we don’t waste resources storing messages we might not care about). This can be mitigated by having publishers ensure a stream exists before publishing, e.g. at startup.

There can exist multiple streams attached to the same NATS subject or even subjects that are semantically equivalent, e.g. foo.bar and foo.*. Each of these streams will receive a copy of the message as NATS handles this fan-out. However, the stream name is unique within a given subject. For example, creating two streams for the subject foo.bar named foo and bar, respectively, will create two streams which will independently sequence all of the messages on the NATS subject foo.bar, but attempting to create two streams for the same subject both named foo will result in creating just a single stream (creation is idempotent).

With this in mind, we can scale linearly with respect to consumers—covered in part three—by adding more nodes to the Jetstream cluster and creating more streams which will be distributed among the cluster. This has the advantage that we don’t need to worry about partitioning so long as NATS is able to withstand the load (there is also an assumption that we can ensure reasonable balance of stream leaders across the cluster). We’ve basically split out message routing from storage and consumption, which allows us to scale independently.

Additionally, streams can join a named consumer group. This, in effect, partitions a NATS subject among the streams in the group, again covered in part three, allowing us to create competing consumers for load-balancing purposes. This works by using NATS queue subscriptions, so the downside is partitioning is effectively random. The upside is consumer groups don’t affect normal streams.

Compaction and Offset Tracking

Streams support multiple log-compaction rules: time-based, message-based, and size-based. As in Kafka, we also support a fourth kind: key compaction. This is how offset storage will work, which was described in part three, but it also enables some other interesting use cases like KTables in Kafka Streams.

As discussed above, messages in Jetstream are simply NATS messages. There is no special protocol needed for Jetstream to process messages. However, publishers can choose to optionally “enhance” their messages by providing additional metadata and serializing their messages into envelopes. The envelope includes a special cookie Jetstream uses to detect if a message is an envelope or a simple NATS message (if the cookie is present by coincidence and envelope deserialization fails, we fall back to treating it as a normal message).

One of the metadata fields on the envelope is an optional message key. A stream can be configured to compact by key. In this case, it retains only the last message for each key (if no key is present, the message is always retained).
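
As a minimal sketch of how envelope handling might work (the cookie value and the envelope encoding here are made up; a real implementation would likely use protobuf):

```go
package envelope

import (
	"bytes"
	"encoding/json"
)

// envelopeCookie is a hypothetical magic prefix identifying enveloped messages.
var envelopeCookie = []byte("LIFT")

// Envelope is a hypothetical "enhanced" message carrying optional metadata.
type Envelope struct {
	Key   []byte `json:"key"`
	Value []byte `json:"value"`
}

// unmarshal interprets a raw NATS payload. If the cookie is absent, or it is
// present by coincidence and deserialization fails, the payload is treated as
// a plain NATS message with no key.
func unmarshal(data []byte) (key, value []byte) {
	if !bytes.HasPrefix(data, envelopeCookie) {
		return nil, data // plain NATS message
	}
	var env Envelope
	if err := json.Unmarshal(data[len(envelopeCookie):], &env); err != nil {
		return nil, data // fall back to treating it as a normal message
	}
	return env.Key, env.Value
}
```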

Consumers can optionally store their offsets in Jetstream (this can also be transparently managed by a client library similar to Kafka’s high-level consumer). This works by storing offsets in a stream keyed by consumer. A consumer (or consumer library) publishes its latest offset. This allows it to later retrieve its offset from the stream, and key compaction means Jetstream will only retain the latest offset for each consumer. For improved performance, the client library should only periodically checkpoint this offset.
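
A rough sketch of that client-side checkpointing loop, where publish stands in for however the library writes a keyed, enveloped message to the offsets stream (the helper and its parameters are hypothetical):

```go
package offsets

import (
	"log"
	"time"
)

// checkpoint periodically publishes a consumer's latest processed offset so it
// can be recovered after a restart. Keying by consumer name means key
// compaction retains only the newest offset for each consumer.
func checkpoint(consumer string, latest func() int64,
	publish func(key string, offset int64) error,
	interval time.Duration, stop <-chan struct{}) {

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := publish(consumer, latest()); err != nil {
				log.Printf("offset checkpoint failed: %v", err)
			}
		case <-stop:
			return
		}
	}
}
```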

Authorization

Because Jetstream is a separate server which is merely a consumer of NATS, it can provide ACLs or other authorization mechanisms on streams. A simple configuration might be to restrict NATS access to Jetstream and configure Jetstream to only allow access to certain subjects. There is more work involved because there is a separate access-control system, but this gives greater flexibility by separating out the systems.

At-Least-Once Delivery

To ensure at-least-once delivery of messages, Jetstream relies on replication and publisher acks. When a message is received on a stream, it’s assigned an offset by the leader and then replicated. Upon a successful replication, the stream publishes an ack to NATS on the reply subject of the message, if present (the reply subject is a part of the NATS message protocol).

There are two implications with this. First, if the publisher doesn’t care about ensuring its message is stored, it need not set a reply subject. Second, because there are potentially multiple (or no) streams attached to a subject (and creation/deletion of streams is dynamic), it’s not possible for the publisher to know how many acks to expect. This is a trade-off we make for enabling subject fan-out and wildcards while remaining scalable and fast. We make the assertion that if guaranteed delivery is important, the publisher should be responsible for determining the destination streams a priori. This allows attaching streams to a subject for use cases that do not require strong guarantees without the publisher having to be aware. Note that this might be an area for future improvement to increase usability, such as storing streams in a registry. However, this is akin to other similar systems, like Kafka, where you must first create a topic and then you publish to that topic.
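
With the Go NATS client, a publisher that wants the ack can simply use a request, which sets a reply subject and waits for a response (the subject and timeout here are illustrative). Note that this waits for a single ack; as discussed above, with multiple streams attached to a subject the publisher needs to know how many acks to expect.

```go
package main

import (
	"log"
	"time"

	nats "github.com/nats-io/go-nats"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Request publishes with a reply subject and waits for a response. Here
	// the response would be the stream's ack once the message is replicated.
	ack, err := nc.Request("foo.bar", []byte("order created"), 5*time.Second)
	if err != nil {
		// No ack within the timeout: the message may not have been stored.
		log.Fatalf("no ack received: %v", err)
	}
	log.Printf("stream acked: %s", ack.Data)

	// A publisher that doesn't care about durability publishes with no reply
	// subject and moves on.
	nc.Publish("foo.bar", []byte("fire and forget"))
	nc.Flush()
}
```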

One caveat to this is if there are existing application-level uses of the reply subject on NATS messages. That is, if other systems are already publishing replies, then Jetstream will overload this. The alternative would be to require the envelope, which would include a canonical reply subject for acks, for at-least-once delivery. Otherwise we would need a change to the NATS protocol itself.

Replication Protocol

For metadata replication and leadership election, we rely on Raft. However, for replication of streams, rather than using Raft or other quorum-based techniques, we use a technique similar to Kafka as described in part two.

For each stream, we maintain an in-sync replica set (ISR), which is all of the replicas currently up to date (at stream creation time, this is all of the replicas). During replication, the leader writes messages to a WAL, and we only wait on replicas in the ISR before committing. If a replica falls behind or fails, it’s removed from the ISR. If the leader fails, any replica in the ISR can take its place. If a failed replica catches back up, it rejoins the ISR. The general stream replication process is as follows:

  1. Client creates a stream with a replicationFactor of n.
  2. Metadata leader selects n replicas to participate and one leader at random (this comprises the initial ISR).
  3. Metadata leader replicates the stream via Raft to the entire cluster.
  4. The nodes participating in the stream initialize it, and the leader subscribes to the NATS subject.
  5. The leader initializes the high-water mark (HW) to 0. This is the offset of the last committed message in the stream.
  6. The leader begins sequencing messages from NATS and writes them to the log uncommitted.
  7. Replicas consume from the leader’s log to replicate messages to their own log. We piggyback the leader’s HW on these responses, and replicas periodically checkpoint the HW to stable storage.
  8. Replicas acknowledge they’ve replicated the message.
  9. Once the leader has heard from the ISR, the message is committed and the HW is updated.
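
To make the commit step concrete, here is a simplified, hypothetical sketch of how a leader might advance the HW once every replica in the ISR has acknowledged an offset (these types are not from a real implementation):

```go
package replication

import "sync"

// leaderState is an illustrative sketch of the bookkeeping a stream leader
// might keep for replication.
type leaderState struct {
	mu  sync.Mutex
	isr map[string]int64 // replica ID -> highest offset acknowledged
	hw  int64            // high-water mark: offset of the last committed message
}

// ack records that a replica has replicated up to the given offset. A message
// is committed once every replica in the ISR has it, so the HW advances to the
// minimum acknowledged offset across the ISR.
func (l *leaderState) ack(replica string, offset int64) int64 {
	l.mu.Lock()
	defer l.mu.Unlock()
	if offset > l.isr[replica] {
		l.isr[replica] = offset
	}
	min := offset
	for _, o := range l.isr {
		if o < min {
			min = o
		}
	}
	if min > l.hw {
		l.hw = min
	}
	return l.hw
}
```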

Note that clients only see committed messages in the log. There are a variety of failures that can occur in the replication process. A few of them are described below along with how they are mitigated.

If a follower suspects that the leader has failed, it will notify the metadata leader. If the metadata leader receives a notification from the majority of the ISR within a bounded period, it will select a new leader for the stream, apply this update to the Raft group, and notify the replica set. These notifications need to go through Raft as well in the event of a metadata leader failover occurring at the same time as a stream leader failure. Committed messages are always preserved during a leadership change, but uncommitted messages could be lost.

If the stream leader detects that a replica has failed or fallen too far behind, it removes the replica from the ISR by notifying the metadata leader. The metadata leader replicates this fact via Raft. The stream leader continues to commit messages with fewer replicas in the ISR, entering an under-replicated state.

When a failed replica is restarted, it recovers the latest HW from stable storage and truncates its log up to the HW. This removes any potentially uncommitted messages in the log. The replica then begins fetching messages from the leader starting at the HW. Once the replica has caught up, it’s added back into the ISR and the system resumes its fully replicated state.

If the metadata leader fails, Raft will handle electing a new leader. The metadata Raft group stores the leader and ISR for every stream, so failover of the metadata leader is not a problem.

There are a few other corner cases and nuances to handle, but this covers replication in broad strokes. We also haven’t discussed how to implement failure detection (Kafka uses ZooKeeper for this), but we won’t prescribe that here.

Wrapping Up

This concludes our series on building a distributed log that is fast, highly available, and scalable. In part one, we introduced the log abstraction and talked about the storage mechanics behind it. In part two, we covered high availability and data replication. In part three, we discussed scaling message delivery. In part four, we looked at some trade-offs and lessons learned. Lastly, in part five, we outlined the design for a new log-based system that draws from the previous entries in the series.

The goal of this series was to learn a bit about the internals of a log abstraction, to learn how it can achieve the three priorities described earlier, and to learn some applied distributed systems theory. Hopefully you found it useful or, at the very least, interesting.

If you or your company are looking for help with system architecture, performance, or scalability, contact Real Kinetic.

Building a Distributed Log from Scratch, Part 4: Trade-Offs and Lessons Learned

In part three of this series we talked about scaling message delivery in a distributed log. In part four, we’ll look at some key trade-offs involved with such systems and discuss a few lessons learned while building NATS Streaming.

Competing Goals

There are a number of competing goals when building a distributed log (these goals also extend to many other types of systems). Recall from part one that our key priorities for this type of system are performance, high availability, and scalability. The preceding parts of this series described at various levels how we can accomplish these three goals, but astute readers likely noticed that some of these things conflict with one another.

It’s easy to make something fast if it’s not fault-tolerant or scalable. If our log runs on a single server, our only constraints are how fast we can send data over the network and how fast the disk I/O is. And this is how a lot of systems, including many databases, tend to work—not only because it performs well, but because it’s simple. We can make these types of systems fault-tolerant by introducing a standby server and allowing clients to failover, but there are a couple issues worth mentioning with this.

With data systems, such as a log, high availability does not just pertain to continuity of service, but also availability of data. If I write data to the system and the system acknowledges that, that data should not be lost in the event of a failure. So with a standby server, we need to ensure data is replicated to avoid data loss (otherwise, in the context of a message log, we must relax our requirement of guaranteed delivery).

NATS Streaming initially shipped as a single-node system, which raised immediate concerns about production-readiness due to a single point of failure. The first step toward addressing some of these concerns was to introduce a fault-tolerance mode whereby a group of servers would run with only one acting as the active server. The active server would obtain an exclusive lock and process requests. Upon detecting a failure, standby servers would attempt to obtain the lock and become the active server.

Aside from the usual issues with distributed locks, this design requires a shared storage layer. With NATS Streaming, this meant either a shared volume, such as Gluster or EFS, or a shared MySQL database. This poses a performance challenge and isn’t particularly “cloud-native” friendly. Another issue is data is not replicated unless done so out-of-band by the storage layer. When we add in data replication, performance is hamstrung even further. But this was a quick and easy solution that offered some solace with respect to a SPOF (disclosure: I was not involved with NATS or NATS Streaming at this time). The longer term solution was to provide first-class clustering and data-replication support, but sometimes it’s more cost effective to provide fast recovery of a single-node system.

Another challenge with the single-node design is scalability. There is only so much capacity that one node can handle. At a certain point, scaling out becomes a requirement, and so we start partitioning. This is a common technique for relational databases where we basically just run multiple databases and divide up the data by some key. NATS Streaming is no different as it offers a partitioning story for dividing up channels between servers. The trouble with partitioning is it complicates things as it typically requires cooperation from the application. To make matters worse, NATS Streaming does not currently offer partitioning at the channel level, which means if a single channel has a lot of load, the solution is to manually partition it into multiple channels at the application level. This is why Kafka chose to partition its topics by default.

So performance is at odds with fault-tolerance and scalability, but another factor is what I call simplicity of mechanism. That is, the simplicity of the design plays an important role in the performance of a system. This plays out at multiple levels. We saw that, at an architectural level, using a simple, single-node design performs best but falls short as a robust solution. In part one, we saw that using a simple file structure for our log allowed us to take advantage of the hardware and operating system in terms of sequential disk access, page caching, and zero-copy reads. In part two, we made the observation that we can treat the log itself as a replicated WAL to solve the problem of data replication in an efficient way. And in part three, we discussed how a simple pull-based model can reduce complexity around flow control and batching.

At the same time, simplicity of “UX” makes performance harder. When I say UX, I mean the ergonomics of the system and how easy it is to use, operate, etc. NATS Streaming initially optimized for UX, which is why it fills an interesting space. Simplicity is a core part of the NATS philosophy, so it caught a small mindshare with developers frustrated or overwhelmed by Kafka. There is appetite for a “Kafka lite,” something which serves a similar purpose to Kafka but without all the bells and whistles and probably not targeted at large enterprises—a classic Innovator’s Dilemma to be sure.

NATS Streaming tracks consumer positions automatically, provides simple APIs, and uses a simple push-based protocol. This also means building a client library is a much less daunting task. The downside is the server needs to do more work. With a single node, as NATS Streaming was initially designed, this isn’t much of a problem. Where it starts to rear its head is when we need to replicate that state across a cluster of nodes. This has important implications with respect to performance and scale. Smart middleware has a natural tendency to become more complex, more fragile, and slower. The end-to-end principle attests to this. Amusingly, NATS Streaming was originally named STAN because it’s the opposite of NATS, a fast and simple messaging system with minimal guarantees.

Simplicity of mechanism tends to simply push complexity around in the system. For example, NATS Streaming provides an ergonomic API to clients by shifting the complexity to the server. Kafka scales and performs exceptionally well by shifting the complexity to other parts of the system, namely the client and ZooKeeper.

Scalability and fault-tolerance are equally at odds with simplicity for reasons mostly described above. The important point here is that these cannot be an afterthought. As I learned while implementing clustering in NATS Streaming, you can’t cleanly and effectively bolt fault-tolerance onto an existing complex system. One of the laws of Systemantics comes to mind here: “A complex system designed from scratch never works and cannot be patched up to make it work. You have to start over, beginning with a working simple system.” Scalability and fault-tolerance need to be designed from day one.

Lastly, availability is inherently at odds with consistency. This is simply the CAP theorem. Guaranteeing strong consistency requires a quorum when replicating data, which hinders availability and performance. The key here is to minimize what you need to replicate or to relax your requirements.

Lessons Learned

The section above already contains several lessons learned in the process of working on NATS Streaming and implementing clustering, but I’ll capture a few important ones here.

First, distributed systems are complex enough. Simple is usually better—and faster. Again, we go back to the laws of systems here: “A complex system that works is invariably found to have evolved from a simple system that works.”

Second, lean on existing work. A critical part to delivering clustering rapidly was sticking with Raft and an existing Go implementation for leader election and data replication. There was considerable time spent designing a proprietary solution before I joined which still had edge cases not fully thought through. Not only is Raft off the shelf, it’s provably correct (implementation bugs notwithstanding). And following from the first lesson learned, start with a solution that works before worrying about optimization. It’s far easier to make a correct solution fast than it is to make a fast solution correct. Don’t roll your own coordination protocol if you don’t need to (and chances are you don’t need to).

Third, there are probably edge cases for which you haven’t written tests. There are many failure modes, and you can only write so many tests. Formal methods and property-based testing can help a lot here. Similarly, chaos and fault-injection testing, such as Kyle Kingsbury’s Jepsen, helps too.

Lastly, be honest with your users. Don’t try to be everything to everyone. Instead, be explicit about design decisions, trade-offs, guarantees, defaults, etc. If there’s one takeaway from Kyle’s Jepsen series, it’s that many vendors are dishonest in their documentation and marketing. MongoDB became infamous for having unsafe defaults and implementation issues early on, most likely because they made benchmarks look much more impressive.

In part five of this series, we’ll conclude by outlining the design for a new log-based system that draws from ideas in the previous entries in the series.

Building a Distributed Log from Scratch, Part 3: Scaling Message Delivery

In part two of this series we discussed data replication within the context of a distributed log and how it relates to high availability. Next, we’ll look at what it takes to scale the log such that it can handle non-trivial workloads.

Data Scalability

A key part of scaling any kind of data-intensive system is the ability to partition the data. Partitioning is how we can scale a system linearly, that is to say we can handle more load by adding more nodes. We make the system horizontally scalable.

Kafka was designed this way from the beginning. Topics are partitioned and ordering is only guaranteed within a partition. For example, in an e-commerce application, we might have two topics, purchases and inventory, each with two partitions. These partitions allow us to distribute reads and writes across a set of brokers. In Kafka, the log is actually the partition.

The challenge with this is how we partition the data. We might distribute data using round robin, in effect randomly distributing it. The problem with this is we lose out on ordering, which is an important characteristic of the log. For example, imagine we have add and remove inventory operations. With random partitioning, we might end up with a remove followed by an add getting processed if they’re placed in different partitions. However, if they’re placed in the same partition, we know they will be ordered correctly from the perspective of the publisher.

We could also distribute by hashing a key and sending all writes with the same key to the same partition, or use some custom partitioning strategy along these lines. Continuing with our example, we might partition purchases by account name and inventory by SKU. This way, all purchase operations by the same account are ordered, as are all inventory operations pertaining to the same SKU. The diagram below shows a (naive) custom strategy that partitions topics by ranges based on the account and SKU.
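
As a concrete illustration, key-based partitioning typically boils down to hashing the key and taking it modulo the number of partitions (Kafka’s default partitioner does essentially this, albeit with a different hash function):

```go
package partition

import "hash/fnv"

// partitionForKey maps a key (e.g. an account name or SKU) to one of n
// partitions so that all writes with the same key land in the same partition
// and are therefore ordered relative to each other.
func partitionForKey(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()) % numPartitions
}
```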

The important point here is that how you partition your data is largely dependent on your application and its usage patterns, but partitioning is a critical part of scalability. It allows you to scale your workload processing by dividing up responsibilities, which in turn, allows you to throw more resources at the problem in a tractable way.

One of NATS Streaming’s shortcomings, in my opinion, is that it doesn’t currently offer a good story around partitioning. Channels are totally ordered, essentially making them the equivalent of a Kafka partition. The workaround is to partition among multiple channels at the application level. To some, this is a benefit because it’s conceptually simpler than Kafka, but Kafka was designed as such because scalability was a key design goal from day one.

Consumer Scalability

One challenge with the log is the problem of high fan-out. Specifically, how do we scale to a large number of consumers? In Kafka and NATS Streaming, reads (and writes) are only served by the leader. Similarly, Amazon Kinesis supports up to only five reads per second per shard (a shard is Kinesis’ equivalent of a partition). Thus, if we have five consumers reading from the same shard, we’ve already hit our fan-out limit. The thought is to partition your workload to increase parallelism and/or daisy-chain streams to increase fan-out. But if we are trying to do very high fan-out, e.g. to thousands of IoT devices, neither of these is an ideal solution. Not all use cases may lend themselves to partitioning (though one can argue this is just a sign of poor architecting), and chaining up streams (or in Kafka nomenclature, topics) tends to be kludgey.

However, we can make the following observation: with an immutable log, there are no stale or phantom reads. Unlike a database, we can loosen our requirements a bit. Whereas a database is typically mutable, with a log, we’re only appending things. From a consumer’s perspective, a replica is either up-to-date with the leader or in the process of catching up, but in either case, if we read all of the records, we should end up in the same state. Immutability, at least in theory, should make it “easy” to scale to a large number of consumers because we don’t have to read from the leader to get correct results (ignoring log compaction and other “mutable” operations), so long as we’re okay with strong eventual consistency with respect to tailing the log.

In NATS Streaming, with Raft, we could simply allow followers to serve reads and scale reads by increasing the size of the cluster, but this would impact performance because the quorum size would also increase. Instead, we can use “non-voters” to act as read replicas and balance consumers among them. These read replicas do not participate in quorum or leader election; they simply receive committed log entries. In effect, this is the daisy chaining of streams mentioned earlier but done implicitly by the system. This is an otherwise common pattern for increasing consumer fan-out in Kinesis but is usually done in an ad hoc, Rube Goldberg-esque fashion. Note that, in the case of NATS Streaming, this isn’t quite as simple as it sounds due to the delivery mechanism used, which we’ll describe next.

Push vs. Pull

In Kafka, consumers pull data from brokers. In NATS Streaming, brokers push data to consumers. Kafka’s documentation describes this design decision in detail. The key factor largely comes down to flow control. With push, flow control needs to be explicit to deal with diverse consumers. Different consumers will consume at different rates, so the broker needs to be aware of this so as not to overwhelm a consumer.

There are obvious advantages and disadvantages to both approaches. With push, it can be a tricky balance to ensure full utilization of the consumer. We might use a backoff protocol like additive increase/multiplicative decrease, widely known for its use in TCP congestion control, to optimize utilization. NATS Streaming, like many other messaging systems, implements flow control by using acks. Upon receiving a message, consumers ack back to the server, and the server tracks the in-flight messages for each consumer. If that number goes above a certain threshold, the server will stop delivery until more acks are received. There is a similar flow-control mechanism between the publisher and the server. The trade-off here is the server needs to do some bookkeeping, which we’ll get to in a bit. With a pull-based system, flow control is implicit. Consumers simply go at their own pace, and the server doesn’t need to track anything. There is much less complexity with this.
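
A simplified sketch of that push-side bookkeeping, with made-up types (this is not NATS Streaming’s implementation): the server tracks in-flight messages per consumer and pauses delivery once a threshold is reached.

```go
package flowcontrol

import "sync"

// consumerWindow tracks unacknowledged messages for a single consumer in a
// push-based server. Delivery pauses once maxInFlight messages are pending.
type consumerWindow struct {
	mu          sync.Mutex
	inFlight    int
	maxInFlight int
}

// trySend reports whether the server may deliver another message to the
// consumer, reserving a slot if so.
func (w *consumerWindow) trySend() bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.inFlight >= w.maxInFlight {
		return false
	}
	w.inFlight++
	return true
}

// ack is called when the consumer acknowledges a message, freeing a slot so
// delivery can resume.
func (w *consumerWindow) ack() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.inFlight > 0 {
		w.inFlight--
	}
}
```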

Pull-based systems lend themselves to aggressive batching. With push, we must decide whether to send a message immediately or wait to accumulate more messages before sending. This is a decision pertaining to latency versus throughput. Push is often viewed as an optimization for latency, but if we’re tuning for low latency, we send messages one at a time only for them to end up being buffered on the consumer anyway. With pull, the consumer fetches all available messages after its current position in the log, which basically removes the guesswork around tuning batching and latency.

There are API implications with this decision too, particularly from an ergonomics and complexity perspective. Kafka clients tend to be “thick” and have a lot of complexity. That is, they do a lot because the broker is designed to be simple. That’s my guess as to why there are so few native client libraries up to par with the Java client. NATS Streaming clients, on the other hand, are relatively “thin” because the server does more. We end up just pushing the complexity around based on our design decisions, but one can argue that the smart client and dumb server is a more scalable approach. We’ll go into detail on that in the next installment of this series.

Circling back on consumer scalability, the fact that NATS Streaming uses a push-based model means we can’t simply set up read replicas and balance consumers among them. Instead, we would need to partition consumers among the replicas so that each server is responsible for pushing data to a subset of consumers. The increased complexity over pull becomes immediately apparent here.

Bookkeeping

There are two ways to track position in the log: have the server track it for consumers or have consumers track it themselves. Again, there are trade-offs with this, namely between API simplicity, server complexity, performance, and scalability. NATS Streaming tracks subscription positions for consumers. This means consumers can come and go as they like and pick back up where they left off easily. Before NATS Streaming supported clustering, this made a lot of sense because the bookkeeping was all in one server. But with clustering, this data must be replicated just like the message data, which poses a performance challenge.

The alternative is to punt the problem to the consumer. But also keep in mind that consumers might not have access to fast stable storage, such as with an IoT device or ephemeral container. Is there a way we can split the difference?

We can store the offsets themselves directly in the log. As of 0.9, this is what Kafka does. Before that, clients had to manage offsets themselves or store them in ZooKeeper. This forced a dependency on ZooKeeper for clients but also posed a major bottleneck since ZooKeeper is relatively low throughput. But by storing offsets in the log, they are treated just like any other write to a Kafka topic, which scales quite well (offsets are stored in an internal Kafka topic called __consumer_offsets partitioned by consumer group; there is also a special read cache for speeding up the read path).

Clients periodically checkpoint their offset to the log. We then use log compaction to retain only the latest offsets. Log compaction works by rewriting the log to retain only the latest message for a given key. On recovery, clients fetch the latest offset from the log. The important part here is we need to structure our keys such that compaction retains the latest offset for each unique consumer. For example, we might structure it as consumer-topic-partition. We end up with something resembling the following, where the message value is the offset:

The above log is uncompacted. Once compacted, it becomes the following:

Note that compaction violates some of our previous assumptions around the immutability of the log, but that’s for another discussion.
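
A minimal sketch of the compaction rule over an in-memory representation of the log; a real implementation rewrites log segments on disk, but the retention logic is the same:

```go
package compaction

// entry is a simplified log entry with a key and a value (e.g. an offset).
type entry struct {
	Key   string
	Value []byte
}

// compact rewrites the log, retaining only the latest entry for each key and
// preserving the relative order of the survivors. (Entries without a key would
// always be retained; that case is omitted for brevity.)
func compact(log []entry) []entry {
	latest := make(map[string]int, len(log)) // key -> index of last occurrence
	for i, e := range log {
		latest[e.Key] = i
	}
	compacted := make([]entry, 0, len(latest))
	for i, e := range log {
		if latest[e.Key] == i {
			compacted = append(compacted, e)
		}
	}
	return compacted
}
```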

There are a number of advantages to this approach. We get fault-tolerance and durability due to the fact that our log is already fault-tolerant and durable as designed earlier. We get consistent reads again due to our replication scheme. Unlike ZooKeeper, we get high write throughput. And we reuse existing structures, so there’s less server complexity. We’re just reusing the log; there aren’t really any major new code paths.

Interestingly, the bookkeeping needed for flow control in push-based systems—such as acks in NATS Streaming—serves much the same purpose as offset tracking in pull-based systems, since it needs to track position. The difference comes when we allow out-of-order processing. If we don’t allow it, then acks are simply a high-water mark that indicates the client is “this far” caught up. The problem with push is we also have to deal with redeliveries, whereas with pull they are implicitly handled by the client. If we do allow out-of-order processing, then we need to track individual, in-flight messages, which is what per-message acks allow us to do. In this case, the system starts to look less like a log and more like a message queue. This makes push even more complicated.

The nice thing about reusing the log to track offsets is it greatly reduces the amount of code and complexity needed. Since NATS Streaming allows out-of-order processing, it uses a separate acking subsystem which otherwise has the same requirements as an offset-tracking subsystem.

In part four of this series, we will discuss some of the key trade-offs involved with implementing a distributed log and some lessons learned while building NATS Streaming.

Thrift on Steroids: A Tale of Scale and Abstraction

Apache Thrift is an RPC framework developed at Facebook for building “scalable cross-language services.” It consists of an interface definition language (IDL), communication protocol, API libraries, and a code generator that allows you to build and evolve services independently and in a polyglot fashion across a wide range of languages. This is nothing new and has been around for over a decade now.

There are a number of notable users of Thrift aside from Facebook, including Twitter (mainly by way of Finagle), Foursquare, Pinterest, Uber (via TChannel), and Evernote, among others—and for good reason: Thrift is mature and battle-tested.

The white paper explains the motivation behind Thrift in greater detail, though I think the following paragraph taken from the introduction does a pretty good job of summarizing it:

As Facebook’s traffic and network structure have scaled, the resource demands of many operations on the site (i.e. search, ad selection and delivery, event logging) have presented technical requirements drastically outside the scope of the LAMP framework. In our implementation of these services, various programming languages have been selected to optimize for the right combination of performance, ease and speed of development, availability of existing libraries, etc. By and large, Facebook’s engineering culture has tended towards choosing the best tools and implementations available over standardizing on any one programming language and begrudgingly accepting its inherent limitations.

Basically, as Facebook scaled, they moved more and more away from PHP and the LAMP stack and became increasingly polyglot. I think this same evolution is seen at most startups as they grow into themselves. We saw a similar transition in my time at Workiva, moving from our monolithic Python application on Google App Engine to a polyglot service-oriented architecture in AWS. It was an exciting but awkward time as we went through our adolescence as an engineering culture and teams started to find their identities. Teams learned what it meant to build backward-compatible APIs and loosely coupled services, how to deprecate APIs, how to build resilient and highly available systems, how to properly instrument services and diagnose issues, how to run and manage the underlying infrastructure, and—most importantly—how to collaborate with each other. There were lots of stumbles and mistakes along the way, lots of postmortems, lots of stress, but with that comes the learning and growing. The payoff is big but the process is painful. I don’t think it ever isn’t.

With one or two services written in the same language and relatively few developers, it was easy to just stick with “REST” (in quotes because it’s always a bastardized version of what REST ought to be), sling some JSON around, and call it a day. As the number of tech stacks and integration points increase, it becomes apparent that some standards are important. And once things are highly polyglot with lots of developers and lots of services running with lots of versions, strict service contracts become essential.

Uber has a blog post on building microservices that explains this and why they settled on Thrift to solve this problem.

Since the number of service calls grows rapidly, it is necessary to maintain a well-defined interface for every call. We knew we wanted to use an IDL for managing this interface, and we ultimately decided on Thrift. Thrift forces service owners to publish strict interface definitions, which streamlines the process of integrating with services. Calls that do not abide by the interface are rejected at the Thrift level instead of leaking into a service and failing deeper within the code. This strategy of publicly declaring your interface emphasizes the importance of backwards compatibility, since multiple versions of a service’s Thrift interface could be in use at any given time. The service author must not make breaking changes, and instead must only make non-breaking additions to the interface definition until all consumers are ready for deprecation.

Early on, I was tasked with building a unified messaging solution that would help with our integration challenges. The advantages of a unified solution should be obvious: reusability (before this, everyone was solving the problem in their own way), focus (allow developers to focus on their problem space, not the glue), acceleration (if the tools are already available, there’s less work to do), and shared pain points (it’s a lot easier to prioritize your work when everyone is complaining about the same thing). Also, a longer term benefit is developing the knowledge of this shared solution into an organizational competency which has a sort of “economy of scale” to it. Our job was not just to ship a messaging platform but evangelize it and help other teams to be successful with it. We did this through countless blog posts, training sessions, workshops, talks, and even a podcast.

Before we set out on building a common messaging solution, there were a few key principles we used to guide ourselves. We wanted to provide a core set of tools, libraries, and infrastructure for service integration. We wanted a solution that was rigid yet flexible: provide only a minimal set of messaging patterns to act as generic building blocks with strict, strongly typed APIs, and promote design best practices and a service-oriented mindset. This meant supporting service evolution and API iteration through versioning and backward compatibility, allowing for resiliency patterns like timeouts, retries, circuit breakers, etc., and generally advocating asynchronous, loosely coupled communication. Lastly, we had to keep in mind that, at the end of the day, developers are just trying to ship stuff, so we had to balance these concerns out with ergonomics and developer experience so they could build, integrate, and ship quickly.

As much as I think RPC is a bad abstraction, it’s what developers want. If you don’t provide them with an RPC solution, they will build their own, so we had to provide first-class support for it. We evaluated solutions in the RPC space. We looked at gRPC extensively, which is the new RPC hotness from Google, but it had a few key drawbacks: its “newness” (it was still in early beta at the time and has since been almost entirely rewritten), its coupling to HTTP/2 as a transport (which at the time had fairly limited support), and its lack of support for JavaScript (let alone Dart, which is what most of our client applications were being written in). Avro was another option we looked at.

Ultimately, we settled on Thrift due to its maturity and wide use in production, its performance, its architecture (it separates out the transports, protocols, and RPC layer with the first two being pluggable), its rich feature set, and its wide range of language support (checking off all the languages we standardized on as a company including Go, Java, Python, JavaScript, and Dart). Thrift is not without its problems though—more on this in a bit.

In addition to RPC, we wanted to promote a more asynchronous, message-passing style of communication with pub/sub. This would allow for greater flexibility in messaging patterns like fan-out and fan-in, interest-based messaging, and reduced coupling and fragility of services. This enables things like the worker pattern where we can distribute work to a pool of workers and scale that pool independently, whereas RPC tends to promote more stateful types of services. In my experience, developers tend to bias towards stateful services since this is how we’ve built things for a long time, but as we’ve entered the cloud-native era, things are running in containers which are autoscaled, more ephemeral, and more distributed. We have to grapple with the complexity imposed by distributed systems. This is why asynchronous messaging is important and why we wanted to support it from the onset.

We selected NATS as a messaging backplane because of its simplicity, performance, scalability, and adoption of the cloud-native mentality. When it comes to service integration, you need an always-on dial tone and NATS provides just that. Because of Thrift’s pluggable transport layer, we could build a NATS RPC transport while also providing HTTP and TCP transports.

Unfortunately, Thrift doesn’t provide any kind of support for pub/sub, and we wanted the same guarantees for it that we had with RPC, like type safety and versioning with code-generated APIs and service contracts. Aside from this, Thrift has a number of other, more glaring problems:

  • Head-of-line blocking: a single, slow request will block any subsequent requests for a client.
  • Out-of-order responses: an out-of-order response puts a Thrift transport in a bad state, requiring it to be torn down and reestablished, e.g. if a slow request times out at the client, the client issues a subsequent request, and a response comes back for the first request, the client blows up.
  • Concurrency: a Thrift client cannot be shared between multiple threads of execution, requiring each thread to have its own client issuing requests sequentially. This, combined with head-of-line blocking, is a major performance killer. This problem is compounded when each transport has its own resources, such as a socket.
  • RPC timeouts: Thrift does not provide good facilities for per-request timeouts, instead opting for a global transport read timeout.
  • Request headers: Thrift does not provide support for request metadata, making it difficult to implement things like authentication/authorization and distributed tracing. Instead, you are required to bake these things into your IDL or in a wrapped transport. The problem with this is it puts the onus on service providers rather than allowing an API gateway or middleware to perform these functions in a centralized way.
  • Middleware: Thrift does not have any support for client or server middleware. This means clients must be wrapped to implement interceptor logic and middleware code must be duplicated within handler functions. This makes it impossible to implement AOP-style logic in a clean, DRY way.

Twitter’s Finagle addresses many of these issues but is solely for the JVM, so we decided to address Thrift’s shortcomings in a cross-platform way without completely reinventing the wheel. That is, we took Thrift and extended it. What we ended up with was Frugal, a superset of Thrift recently open sourced that aims to solve the problems described above while also providing support for asynchronous pub/sub APIs—a sort of Thrift on steroids as I’ve come to call it. Its key features include:

  • Request multiplexing: client requests are fully multiplexed, allowing them to be issued concurrently while simultaneously avoiding the head-of-line blocking and out-of-order response problems. This also lays some groundwork for asynchronous messaging patterns.
  • Thread-safety: clients can be safely shared between multiple threads in which requests can be made in parallel.
  • Pub/sub: IDL and code-generation extensions for defining pub/sub APIs in a type-safe way.
  • Request context: a first-class request context object is added to every operation which allows defining request/response headers and per-request timeouts. By making the context part of the Frugal protocol, headers can be introspected or even injected by external middleware. This context could be used to send OAuth2 tokens and user-context information, avoiding the need to include it everywhere in your IDL and handler logic. Correlation IDs for distributed tracing purposes are also built into the request context.
  • Middleware: client- and server-side middleware is supported for RPC and pub/sub APIs. This allows you to implement interceptor logic around handler functions, e.g. for authentication, logging, or retry policies. One can easily integrate OpenTracing as a middleware, for example.
  • Cross-language: support for Go, Java, Dart, and Python (2.7 and 3.5).

Frugal adds a second kind of transport alongside Thrift’s RPC transport for pub/sub. With this, we provide a NATS transport for both pub/sub and RPC (internally, Workiva also has an at-least-once delivery pub/sub transport built around Amazon SQS for mission-critical data). In addition to this, we built an SDK which developers use to connect to the messaging infrastructure (such as NATS) with minimal ceremony. The messaging SDK played a vital role not just in making it easy for developers to adopt and integrate, but in providing us with a shim where we could introduce sweeping changes across the organization in one place, such as adding instrumentation, tracing, and authentication. This enabled us to roll critical integration components out to every service by making a change in one place.

To support pub/sub, we extended the Thrift IDL with an additional top-level construct called a scope, which is effectively a pub/sub namespace (basically what a service is to RPC). We wrote the IDL using a parsing expression grammar which allows us to generate a parser. We then implemented a code generator for the various language targets. The Frugal compiler is written in Go and is, at least in my opinion, much more maintainable than Thrift’s C++ codebase. However, the language libraries make use of the existing Thrift APIs, such as protocols, transports, etc. This means we didn’t need to implement any of the low-level mechanics like serialization.

I’ve since left Workiva (and am now actually working on NATS), but as far as I know, Frugal helps power nearly every production service at the company. It was an interesting experience from which I learned a lot. I was happy to see some of that work open sourced so others could use it and learn from it.

Of course, if I were starting over today, things would probably look different. gRPC is much more mature and the notion of a “service mesh” has taken the container world by storm with things like Istio, Linkerd, and Envoy. What we built was Workiva’s service mesh; we just didn’t have a name for it, so we called it a “Messaging SDK.” The corollary to this is you don’t need to adopt bleeding-edge tech to be successful. The concepts are what’s important, and if enough people are working on the same types of problems in parallel, they will likely converge on solutions that look very similar to each other given enough time and enough people working on them.

I think there’s a delicate balance between providing solutions that are “easy” from a developer point of view but may provide longer term drawbacks when it comes to building complex systems the “right” way. I see RPC as an example of this. It’s an “easy” abstraction but it hides a lot of complexity. Service meshes might even be in this category, but they have obvious upsides when it comes to building software in a way that is scalable. Peter Alvaro’s Strange Loop talk “I See What You Mean” does a great job of articulating this dilemma, which I’ve also written about myself. In the end, we decided to optimize for shipping, but we took a principled approach: provide the tools developers need (or want) but help educate them to utilize those tools in a way that allows them to ship products that are reliable and maintainable. Throwing tools or code over the wall is not enough.