Liftbridge 1.0

Liftbridge has evolved a lot since making the first commit in October 2017, but the vision has remained the same: provide a message-streaming solution with a focus on simplicity and usability. This is demonstrated through many of the design and implementation decisions. A few examples include the use of NATS as the messaging backbone, avoiding heavy dependencies on runtimes like the JVM and external coordination systems like ZooKeeper, compiling down to a small, single static binary, opting for a gRPC-based API, and relying on plain YAML configuration. Liftbridge is written in Go, and the code is structured with the hopes that it’s relatively easy for someone to hop in and contribute to the project.

The goal of Liftbridge is to bridge the gap between sophisticated but complex log-based messaging systems like Apache Kafka and Apache Pulsar and simpler, cloud-native solutions. If you’re not familiar with the project, the introduction post sheds some light. It’s been nearly two years since I open-sourced Liftbridge, and I’m pleased to announce the project has now reached a 1.0 release. In practical terms, what this means is that the API has reached a point of stability suitable for production use and will provide a backward-compatibility commitment going forward. Liftbridge will continue to follow a semantic versioning scheme.

A lot of great features have landed since the project was first conceived in 2016 and started in 2017—replication, log compaction and retention rules, stream partitioning, activity events, and stream pausing to name a few. An official Java client has been implemented and is quickly evolving. Python will follow shortly after. There’s also a lot of exciting stuff on the roadmap ahead including auto-pausing of sparsely used partitions, durable and fault-tolerant consumer groups, a better stream re-partitioning story, and broader client support.

If you’re already using Liftbridge today or are thinking about using it, I’d love to hear from you. Be sure to follow Liftbridge on Twitter and join the community Slack channel to stay up-to-date on the latest developments.

Introducing Liftbridge: Lightweight, Fault-Tolerant Message Streams

Last week I open sourced Liftbridge, my latest project and contribution to the Cloud Native Computing Foundation ecosystem. Liftbridge is a system for lightweight, fault-tolerant (LIFT) message streams built on NATS and gRPC. Fundamentally, it extends NATS with a Kafka-like publish-subscribe log API that is highly available and horizontally scalable.

I’ve been working on Liftbridge for the past couple of months, but it’s something I’ve been thinking about for over a year. I sketched out the design for it last year and wrote about it in January. It was largely inspired while I was working on NATS Streaming, which I’m currently still the second top contributor to. My primary involvement with NATS Streaming was building out the early data replication and clustering solution for high availability, which has continued to evolve since I left the project. In many ways, Liftbridge is about applying a lot of the things I learned while working on NATS Streaming as well as my observations from being closely involved with the NATS community for some time. It’s also the product of scratching an itch I’ve had since these are the kinds of problems I enjoy working on, and I needed something to code.

At its core, Liftbridge is a server that implements a durable, replicated message log for the NATS messaging system. Clients create a named stream which is attached to a NATS subject. The stream then records messages on that subject to a replicated write-ahead log. Multiple consumers can read back from the same stream, and multiple streams can be attached to the same subject.

The goal is to bridge the gap between sophisticated log-based messaging systems like Apache Kafka and Apache Pulsar and simpler, cloud-native systems. This meant not relying on external coordination services like ZooKeeper, not using the JVM, keeping the API as simple and small as possible, and keeping client libraries thin. The system is written in Go, making it a single static binary with a small footprint (~16MB). It relies on the Raft consensus algorithm to do coordination. It has a very minimal API (just three endpoints at the moment). And the API uses gRPC, so client libraries can be generated for most popular programming languages (there is a Go client which provides some additional wrapper logic, but it’s pretty thin). The goal is to keep Liftbridge very lightweight—in terms of runtime, operations, and complexity.

However, the bigger goal of Liftbridge is to extend NATS with a durable, at-least-once delivery mechanism that upholds the NATS tenets of simplicity, performance, and scalability. Unlike NATS Streaming, it uses the core NATS protocol with optional extensions. This means it can be added to an existing NATS deployment to provide message durability with no code changes.

NATS Streaming provides a similar log-based messaging solution. However, it is an entirely separate protocol built on top of NATS. NATS is an implementation detail—the transport—for NATS Streaming. This means the two systems have separate messaging namespaces—messages published to NATS are not accessible from NATS Streaming and vice versa. Of course, it’s a bit more nuanced than this because, in reality, NATS Streaming is using NATS subjects underneath; technically messages can be accessed, but they are serialized protobufs. These nuances often get confounded by firsttime users as it’s not always clear that NATS and NATS Streaming are completely separate systems. NATS Streaming also does not support wildcard subscriptions, which sometimes surprises users since it’s a major feature of NATS.

As a result, Liftbridge was built to augment NATS with durability rather than providing a completely separate system. To be clear, it’s still a separate server, but it merely acts as a write-ahead log for NATS subjects. NATS Streaming provides a broader set of features such as durable subscriptions, queue groups, pluggable storage backends, and multiple fault-tolerance modes. Liftbridge aims to have a relatively small API surface area.

The key features that differentiate Liftbridge are the shared message namespace, wildcards, log compaction, and horizontal scalability. NATS Streaming replicates channels to the entire cluster through a single Raft group, so adding servers does not help with scalability and actually creates a head-of-line bottleneck since everything is replicated through a single consensus group (n.b. NATS Streaming does have a partitioning mechanism, but it cannot be used in conjunction with clustering). Liftbridge allows replicating to a subset of the cluster, and each stream is replicated independently in parallel. This allows the cluster to scale horizontally and partition workloads more easily within a single, multi-tenant cluster.

Some of the key features of Liftbridge include:

  • Log-based API for NATS
  • Replicated for fault-tolerance
  • Horizontally scalable
  • Wildcard subscription support
  • At-least-once delivery support
  • Message key-value support
  • Log compaction by key (WIP)
  • Single static binary (~16MB)
  • Designed to be high-throughput (more on this to come)
  • Supremely simple

Initially, Liftbridge is designed to point to an existing NATS deployment. In the future, there will be support for a “standalone” mode where it can run with an embedded NATS server, allowing for a single deployable process. And in support of the “cloud-native” model, there is work to be done to make Liftbridge play nice with Kubernetes and generally productionalize the system, such as implementing an Operator and providing better instrumentation—perhaps with Prometheus support.

Over the coming weeks and months, I will be going into more detail on Liftbridge, including the internals of it—such as its replication protocol—and providing benchmarks for the system. Of course, there’s also a lot of work yet to be done on it, so I’ll be continuing to work on that. There are many interesting problems that still need solved, so consider this my appeal to contributors. :)

Building a Distributed Log from Scratch, Part 5: Sketching a New System

In part four of this series we looked at some key trade-offs involved with a distributed log implementation and discussed a few lessons learned while building NATS Streaming. In this fifth and final installment, we’ll conclude by outlining the design for a new log-based system that draws from the previous entries in the series.

The Context

For context, NATS and NATS Streaming are two different things. NATS Streaming is a log-based streaming system built on top of NATS, and NATS is a lightweight pub/sub messaging system. NATS was originally built (and then open sourced) as the control plane for Cloud Foundry. NATS Streaming was built in response to the community’s ask for higher-level guarantees—durability, at-least-once delivery, and so forth—beyond what NATS provided. It was built as a separate layer on top of NATS. I tend to describe NATS as a dial tone—ubiquitous and always on—perfect for “online” communications. NATS Streaming is the voicemail—leave a message after the beep and someone will get to it later. There are, of course, more nuances than this, but that’s the gist.

The key point here is that NATS and NATS Streaming are distinct systems with distinct protocols, distinct APIs, and distinct client libraries. In fact, NATS Streaming was designed to essentially act as a client to NATS. As such, clients don’t talk to NATS Streaming directly, rather all communication goes through NATS. However, the NATS Streaming binary can be configured to either embed NATS or point to a standalone deployment. The architecture is shown below in a diagram borrowed from the NATS website.

Architecturally, this makes a lot of sense. It supports the end-to-end principle in that we layer on additional functionality rather than bake it in to the underlying infrastructure. After all, we can always build stronger guarantees on top, but we can’t always remove them from below. This particular architecture, however, introduces a few challenges (disclosure: while I’m still a fan, I’m no longer involved with the NATS project and the NATS team is aware of these problems and no doubt working to address many of them).

First, there is no “cross-talk” between NATS and NATS Streaming, meaning messages published to NATS are not visible in NATS Streaming and vice versa. Again, they are two completely separate systems that just share the same infrastructure. This means we’re not really layering on message durability to NATS, we’re just exposing a new system which provides these semantics.

Second, because NATS Streaming runs as a “sidecar” to NATS and all of its communication runs through NATS, there is an inherent bottleneck at the NATS connection. This may only be a theoretical limit, but it precludes certain optimizations like using sendfile to do zero-copy reads of the log. It also means we rely on timeouts even in cases where the server could send a response immediately, such as when there is no leader elected for the cluster.

Third, NATS Streaming currently lacks a compelling story around linear scaling other than running multiple clusters and partitioning channels among them at the application level. With respect to scaling a single channel, the only alternative at the moment is to partition it into multiple channels at the application level. My hope is that as clustering matures, this will too.

Fourth, without extending its protocol, NATS Streaming’s authorization is intrinsically limited to the authorization provided by NATS since all communication goes through it. In and of itself, this isn’t a problem. NATS supports multi-user authentication and subject-level permissions, but since NATS Streaming uses an opaque protocol atop NATS, it’s difficult to setup proper ACLs at the streaming level. Of course, many layered protocols support authentication, e.g. HTTP atop TCP. For example, the NATS Streaming protocol could carry authentication tokens or session keys, but it currently does not do this.

Fifth, NATS Streaming does not support wildcard semantics, which—at least in my opinion—is a large selling point of NATS and, as a result, something users have come to expect. Specifically, NATS supports two wildcards in subject subscriptions: asterisk (*) which matches any token in the subject (e.g. foo.* matches foo.bar, foo.baz, etc.) and full wildcard (>) which matches one or more tokens at the tail of the subject (e.g. foo.> matches foo.bar, foo.bar.baz, etc.). Note that this limitation in NATS Streaming is not directly related to the overall architecture but more in how we design the log.

More generally, clustering and data replication was more of an afterthought in NATS Streaming. As we discussed in part four, it’s hard to add this after the fact. Combined with the APIs NATS Streaming provides (which do flow control and track consumer state), this creates a lot of complexity in the server.

A New System

I wasn’t involved much with NATS Streaming beyond the clustering implementation. However, from that work—and through my own use of NATS and from discussions I’ve had with the community—I’ve thought about how I would build something like it if I were to start over. It would look a bit different from NATS Streaming and Kafka, yet also share some similarities. I’ve dubbed this theoretical system Jetstream (update: this is now Liftbridge), though I’ve yet to actually build anything beyond small prototypes. It’s a side project of mine I hope to get to at some point.

Core NATS has a strong community with solid mindshare, but NATS Streaming doesn’t fully leverage this since it’s a new silo. Jetstream aims to address the above problems starting from a simple proposition: many people are already using NATS today and simply want streaming semantics for what they already have. However, we must also acknowledge that other users are happy with NATS as it currently is and have no need for additional features that might compromise simplicity or performance. This was a deciding factor in choosing not to build NATS Streaming’s functionality directly into NATS.

Like NATS Streaming, Jetstream is a separate component which acts as a NATS client. Unlike NATS Streaming, it augments NATS as opposed to implementing a wholly new protocol. More succinctly, Jetstream is a durable stream augmentation for NATS. Next, we’ll talk about how it accomplishes this by sketching out a design.

Cross-Talk

In NATS Streaming, the log is modeled as a channel. Clients create channels implicitly by publishing or subscribing to a topic (called a subject in NATS). A channel might be foo but internally this is translated to a NATS pub/sub subject such as _STAN.pub.foo. Therefore, while NATS Streaming is technically a client of NATS, it’s done so just to dispatch communication between the client and server. The log is implemented on top of plain pub/sub messaging.

Jetstream is merely a consumer of NATS. In it, the log is modeled as a stream. Clients create streams explicitly, which are subscriptions to NATS subjects that are sequenced, replicated, and durably stored. Thus, there is no “cross-talk” or internal subjects needed because Jetstream messages are NATS messages. Clients just publish their messages to NATS as usual and, behind the scenes, Jetstream will handle storing them in a log. In some sense, it’s just an audit log of messages flowing through NATS.

With this, we get wildcards for free since streams are bound to NATS subjects. There are some trade-offs to this, however, which we will discuss in a bit.

Performance

Jetstream does not track subscription positions. It is up to consumers to track their position in a stream or, optionally, store their position in a stream (more on this later). This means we treat a stream as a simple log, allowing us to do fast, sequential disk I/O and minimize replication and protocol chatter as well as code complexity.

Consumers connect directly to Jetstream using a pull-based socket API. The log is stored in the manner described in part one. This enables us to do zero-copy reads from a stream and other important optimizations which NATS Streaming is precluded from doing. It also simplifies things around flow control and batching as we discussed in part three.

Scalability

Jetstream is designed to be clustered and horizontally scalable from the start. We make the observation that NATS is already efficient at routing messages, particularly with high consumer fan-out, and provides clustering of the interest graph. Streams provide the unit of storage and scalability in Jetstream.

A stream is a named log attached to a NATS subject. Akin to a partition in Kafka, each stream has a replicationFactor, which controls the number of nodes in the Jetstream cluster that participate in replicating the stream, and each stream has a leader. The leader is responsible for receiving messages from NATS, sequencing them, and performing replication (NATS provides per-publisher message ordering).

Like Kafka’s controller, there is a single metadata leader for a Jetstream cluster which is responsible for processing requests to create or delete streams. If a request is sent to a follower, it’s automatically forwarded to the leader. When a stream is created, the metadata leader selects replicationFactor nodes to participate in the stream (initially, this selection is random but could be made more intelligent, e.g. selecting based on current load) and replicates the stream to all nodes in the cluster. Once this replication completes, the stream has been created and its leader begins processing messages. This means NATS messages are not stored unless there is a stream matching their subject (this is the trade-off to support wildcards, but it also means we don’t waste resources storing messages we might not care about). This can be mitigated by having publishers ensure a stream exists before publishing, e.g. at startup.

There can exist multiple streams attached to the same NATS subject or even subjects that are semantically equivalent, e.g. foo.bar and foo.*. Each of these streams will receive a copy of the message as NATS handles this fan-out. However, the stream name is unique within a given subject. For example, creating two streams for the subject foo.bar named foo and bar, respectively, will create two streams which will independently sequence all of the messages on the NATS subject foo.bar, but attempting to create two streams for the same subject both named foo will result in creating just a single stream (creation is idempotent).

With this in mind, we can scale linearly with respect to consumers—covered in part three—by adding more nodes to the Jetstream cluster and creating more streams which will be distributed among the cluster. This has the advantage that we don’t need to worry about partitioning so long as NATS is able to withstand the load (there is also an assumption that we can ensure reasonable balance of stream leaders across the cluster). We’ve basically split out message routing from storage and consumption, which allows us to scale independently.

Additionally, streams can join a named consumer group. This, in effect, partitions a NATS subject among the streams in the group, again covered in part three, allowing us to create competing consumers for load-balancing purposes. This works by using NATS queue subscriptions, so the downside is partitioning is effectively random. The upside is consumer groups don’t affect normal streams.

Compaction and Offset Tracking

Streams support multiple log-compaction rules: time-based, message-based, and size-based. As in Kafka, we also support a fourth kind: key compaction. This is how offset storage will work, which was described in part three, but it also enables some other interesting use cases like KTables in Kafka Streams.

As discussed above, messages in Jetstream are simply NATS messages. There is no special protocol needed for Jetstream to process messages. However, publishers can choose to optionally “enhance” their messages by providing additional metadata and serializing their messages into envelopes. The envelope includes a special cookie Jetstream uses to detect if a message is an envelope or a simple NATS message (if the cookie is present by coincidence and envelope deserialization fails, we fall back to treating it as a normal message).

One of the metadata fields on the envelope is an optional message key. A stream can be configured to compact by key. In this case, it retains only the last message for each key (if no key is present, the message is always retained).

Consumers can optionally store their offsets in Jetstream (this can also be transparently managed by a client library similar to Kafka’s high-level consumer). This works by storing offsets in a stream keyed by consumer. A consumer (or consumer library) publishes their latest offset. This allows them to later retrieve their offset from the stream, and key compaction means Jetstream will only retain the latest offset for each consumer. For improved performance, the client library should only periodically checkpoint this offset.

Authorization

Because Jetstream is a separate server which is merely a consumer of NATS, it can provide ACLs or other authorization mechanisms on streams. A simple configuration might be to restrict NATS access to Jetstream and configure Jetstream to only allow access to certain subjects. There is more work involved because there is a separate access-control system, but this gives greater flexibility by separating out the systems.

At-Least Once Delivery

To ensure at-least-once delivery of messages, Jetstream relies on replication and publisher acks. When a message is received on a stream, it’s assigned an offset by the leader and then replicated. Upon a successful replication, the stream publishes an ack to NATS on the reply subject of the message, if present (the reply subject is a part of the NATS message protocol).

There are two implications with this. First, if the publisher doesn’t care about ensuring its message is stored, it need not set a reply subject. Second, because there are potentially multiple (or no) streams attached to a subject (and creation/deletion of streams is dynamic), it’s not possible for the publisher to know how many acks to expect. This is a trade-off we make for enabling subject fan-out and wildcards while remaining scalable and fast. We make the assertion that if guaranteed delivery is important, the publisher should be responsible for determining the destination streams a priori. This allows attaching streams to a subject for use cases that do not require strong guarantees without the publisher having to be aware. Note that this might be an area for future improvement to increase usability, such as storing streams in a registry. However, this is akin to other similar systems, like Kafka, where you must first create a topic and then you publish to that topic.

One caveat to this is if there are existing application-level uses of the reply subject on NATS messages. That is, if other systems are already publishing replies, then Jetstream will overload this. The alternative would be to require the envelope, which would include a canonical reply subject for acks, for at-least-once delivery. Otherwise we would need a change to the NATS protocol itself.

Replication Protocol

For metadata replication and leadership election, we rely on Raft. However, for replication of streams, rather than using Raft or other quorum-based techniques, we use a technique similar to Kafka as described in part two.

For each stream, we maintain an in-sync replica set (ISR), which is all of the replicas currently up to date (at stream creation time, this is all of the replicas). During replication, the leader writes messages to a WAL, and we only wait on replicas in the ISR before committing. If a replica falls behind or fails, it’s removed from the ISR. If the leader fails, any replica in the ISR can take its place. If a failed replica catches back up, it rejoins the ISR. The general stream replication process is as follows:

  1. Client creates a stream with a replicationFactor of n.
  2. Metadata leader selects n replicas to participate and one leader at random (this comprises the initial ISR).
  3. Metadata leader replicates the stream via Raft to the entire cluster.
  4. The nodes participating in the stream initialize it, and the leader subscribes to the NATS subject.
  5. The leader initializes the high-water mark (HW) to 0. This is the offset of the last committed message in the stream.
  6. The leader begins sequencing messages from NATS and writes them to the log uncommitted.
  7. Replicas consume from the leader’s log to replicate messages to their own log. We piggyback the leader’s HW on these responses, and replicas periodically checkpoint the HW to stable storage.
  8. Replicas acknowledge they’ve replicated the message.
  9. Once the leader has heard from the ISR, the message is committed and the HW is updated.

Note that clients only see committed messages in the log. There are a variety of failures that can occur in the replication process. A few of them are described below along with how they are mitigated.

If a follower suspects that the leader has failed, it will notify the metadata leader. If the metadata leader receives a notification from the majority of the ISR within a bounded period, it will select a new leader for the stream, apply this update to the Raft group, and notify the replica set. These notifications need to go through Raft as well in the event of a metadata leader failover occurring at the same time as a stream leader failure. Committed messages are always preserved during a leadership change, but uncommitted messages could be lost.

If the stream leader detects that a replica has failed or fallen too far behind, it removes the replica from the ISR by notifying the metadata leader. The metadata leader replicates this fact via Raft. The stream leader continues to commit messages with fewer replicas in the ISR, entering an under-replicated state.

When a failed replica is restarted, it recovers the latest HW from stable storage and truncates its log up to the HW. This removes any potentially uncommitted messages in the log. The replica then begins fetching messages from the leader starting at the HW. Once the replica has caught up, it’s added back into the ISR and the system resumes its fully replicated state.

If the metadata leader fails, Raft will handle electing a new leader. The metadata Raft group stores the leader and ISR for every stream, so failover of the metadata leader is not a problem.

There are a few other corner cases and nuances to handle, but this covers replication in broad strokes. We also haven’t discussed how to implement failure detection (Kafka uses ZooKeeper for this), but we won’t prescribe that here.

Wrapping Up

This concludes our series on building a distributed log that is fast, highly available, and scalable. In part one, we introduced the log abstraction and talked about the storage mechanics behind it. In part two, we covered high availability and data replication. In part three, we we discussed scaling message delivery. In part four, we looked at some trade-offs and lessons learned. Lastly, in part five, we outlined the design for a new log-based system that draws from the previous entries in the series.

The goal of this series was to learn a bit about the internals of a log abstraction, to learn how it can achieve the three priorities described earlier, and to learn some applied distributed systems theory. Hopefully you found it useful or, at the very least, interesting.

If you or your company are looking for help with system architecture, performance, or scalability, contact Real Kinetic.