Building a Distributed Log from Scratch, Part 4: Trade-Offs and Lessons Learned

In part three of this series we talked about scaling message delivery in a distributed log. In part four, we’ll look at some key trade-offs involved with such systems and discuss a few lessons learned while building NATS Streaming.

Competing Goals

There are a number of competing goals when building a distributed log (these goals also extend to many other types of systems). Recall from part one that our key priorities for this type of system are performance, high availability, and scalability. The preceding parts of this series described at various levels how we can accomplish these three goals, but astute readers likely noticed that some of these things conflict with one another.

It’s easy to make something fast if it’s not fault-tolerant or scalable. If our log runs on a single server, our only constraints are how fast we can send data over the network and how fast the disk I/O is. And this is how a lot of systems, including many databases, tend to work—not only because it performs well, but because it’s simple. We can make these types of systems fault-tolerant by introducing a standby server and allowing clients to failover, but there are a couple issues worth mentioning with this.

With data systems, such as a log, high availability does not just pertain to continuity of service, but also availability of data. If I write data to the system and the system acknowledges that, that data should not be lost in the event of a failure. So with a standby server, we need to ensure data is replicated to avoid data loss (otherwise, in the context of a message log, we must relax our requirement of guaranteed delivery).

NATS Streaming initially shipped as a single-node system, which raised immediate concerns about production-readiness due to a single point of failure. The first step at trying to address some of these concerns was to introduce a fault-tolerance mode whereby a group of servers would run and only one would run as the active server. The active server would obtain an exclusive lock and process requests. Upon detecting a failure, standby servers would attempt to obtain the lock and become the active server.

Aside from the usual issues with distributed locks, this design requires a shared storage layer. With NATS Streaming, this meant either a shared volume, such as Gluster or EFS, or a shared MySQL database. This poses a performance challenge and isn’t particularly “cloud-native” friendly. Another issue is data is not replicated unless done so out-of-band by the storage layer. When we add in data replication, performance is hamstrung even further. But this was a quick and easy solution that offered some solace with respect to a SPOF (disclosure: I was not involved with NATS or NATS Streaming at this time). The longer term solution was to provide first-class clustering and data-replication support, but sometimes it’s more cost effective to provide fast recovery of a single-node system.

Another challenge with the single-node design is scalability. There is only so much capacity that one node can handle. At a certain point, scaling out becomes a requirement, and so we start partitioning. This is a common technique for relational databases where we basically just run multiple databases and divide up the data by some key. NATS Streaming is no different as it offers a partitioning story for dividing up channels between servers. The trouble with partitioning is it complicates things as it typically requires cooperation from the application. To make matters worse, NATS Streaming does not currently offer partitioning at the channel level, which means if a single topic has a lot of load, the solution is to manually partition it into multiple channels at the application level. This is why Kafka chose to partition its topics by default.

So performance is at odds with fault-tolerance and scalability, but another factor is what I call simplicity of mechanism. That is, the simplicity of the design plays an important role in the performance of a system. This plays out at multiple levels. We saw that, at an architectural level, using a simple, single-node design performs best but falls short as a robust solution. In part one, we saw that using a simple file structure for our log allowed us to take advantage of the hardware and operating system in terms of sequential disk access, page caching, and zero-copy reads. In part two, we made the observation that we can treat the log itself as a replicated WAL to solve the problem of data replication in an efficient way. And in part three, we discussed how a simple pull-based model can reduce complexity around flow control and batching.

At the same time, simplicity of “UX” makes performance harder. When I say UX, I mean the ergonomics of the system and how easy it is to use, operate, etc. NATS Streaming initially optimized for UX, which is why it fills an interesting space. Simplicity is a core part of the NATS philosophy, so it caught a small mindshare with developers frustrated or overwhelmed by Kafka. There is appetite for a “Kafka lite,” something which serves a similar purpose to Kafka but without all the bells and whistles and probably not targeted at large enterprises—a classic Innovator’s Dilemma to be sure.

NATS Streaming tracks consumer positions automatically, provides simple APIs, and uses a simple push-based protocol. This also means building a client library is a much less daunting task. The downside is the server needs to do more work. With a single node, as NATS Streaming was initially designed, this isn’t much of a problem. Where it starts to rear its head is when we need to replicate that state across a cluster of nodes. This has important implications with respect to performance and scale. Smart middleware has a natural tendency to become more complex, more fragile, and slower. The end-to-end principle attests to this. Amusingly, NATS Streaming was originally named STAN because it’s the opposite of NATS, a fast and simple messaging system with minimal guarantees.

Simplicity of mechanism tends to simply push complexity around in the system. For example, NATS Streaming provides an ergonomic API to clients by shifting the complexity to the server. Kafka scales and performs exceptionally well by shifting the complexity to other parts of the system, namely the client and ZooKeeper.

Scalability and fault-tolerance are equally at odds with simplicity for reasons mostly described above. The important point here is that these cannot be an afterthought. As I learned while implementing clustering in NATS Streaming, you can’t cleanly and effectively bolt on fault-tolerance onto an existing complex system. One of the laws of Systemantics comes to mind here: “A complex system designed from scratch never works and cannot be patched up to make it work. You have to start over, beginning with a working simple system.” Scalability and fault-tolerance need to be designed from day one.

Lastly, availability is inherently at odds with consistency. This is simply the CAP theorem. Guaranteeing strong consistency requires a quorum when replicating data, which hinders availability and performance. The key here is minimize what you need to replicate or relax your requirements.

Lessons Learned

The section above already contains several lessons learned in the process of working on NATS Streaming and implementing clustering, but I’ll capture a few important ones here.

First, distributed systems are complex enough. Simple is usually better—and faster. Again, we go back to the laws of systems here: “A complex system that works is invariably found to have evolved from a simple system that works.”

Second, lean on existing work. A critical part to delivering clustering rapidly was sticking with Raft and an existing Go implementation for leader election and data replication. There was considerable time spent designing a proprietary solution before I joined which still had edge cases not fully thought through. Not only is Raft off the shelf, it’s provably correct (implementation bugs notwithstanding). And following from the first lesson learned, start with a solution that works before worrying about optimization. It’s far easier to make a correct solution fast than it is to make a fast solution correct. Don’t roll your own coordination protocol if you don’t need to (and chances are you don’t need to).

There are probably edge cases for which you haven’t written tests. There are many failures modes, and you can only write so many tests. Formal methods and property-based testing can help a lot here. Similarly, chaos and fault-injection testing such as Kyle Kingsbury’s Jepsen help too.

Lastly, be honest with your users. Don’t try to be everything to everyone. Instead, be explicit about design decisions, trade-offs, guarantees, defaults, etc. If there’s one takeaway from Kyle’s Jepsen series it’s that many vendors are dishonest in their documentation and marketing. MongoDB became infamous for having unsafe defaults and implementation issues early on, most likely because they make benchmarks look much more impressive.

In part five of this series, we’ll conclude by outlining the design for a new log-based system that draws from ideas in the previous entries in the series.

Building a Distributed Log from Scratch, Part 3: Scaling Message Delivery

In part two of this series we discussed data replication within the context of a distributed log and how it relates to high availability. Next, we’ll look at what it takes to scale the log such that it can handle non-trivial workloads.

Data Scalability

A key part of scaling any kind of data-intensive system is the ability to partition the data. Partitioning is how we can scale a system linearly, that is to say we can handle more load by adding more nodes. We make the system horizontally scalable.

Kafka was designed this way from the beginning. Topics are partitioned and ordering is only guaranteed within a partition. For example, in an e-commerce application, we might have two topics, purchases and inventory, each with two partitions. These partitions allow us to distribute reads and writes across a set of brokers. In Kafka, the log is actually the partition.

The challenge with this is how we partition the data. We might distribute data using round robin, in effect randomly distributing it. The problem with this is we lose out on ordering, which is an important characteristic of the log. For example, imagine we have add and remove inventory operations. With random partitioning, we might end up with a remove followed by an add getting processed if they’re placed in different partitions. However, if they’re placed in the same partition, we know they will be ordered correctly from the perspective of the publisher.

We could also distribute by hashing a key and sending all writes with the same keys to the same partitions or some custom partitioning strategy along these lines. Continuing with our example, we might partition purchases by account name and inventory by SKU. This way, all purchase operations by the same account are ordered, as are all inventory operations pertaining to the same SKU. The diagram below shows a (naive) custom strategy that partitions topics by ranges based on the account and SKU.

The important point here is that how you partition your data is largely dependent on your application and its usage patterns, but partitioning is a critical part of scalability. It allows you to scale your workload processing by dividing up responsibilities, which in turn, allows you to throw more resources at the problem in a tractable way.

One of NATS Streaming’s shortcomings, in my opinion, is that it doesn’t currently offer a good story around partitioning. Channels are totally ordered, essentially making them the equivalent of a Kafka partition. The workaround is to partition among multiple channels at the application level. To some, this is a benefit because it’s conceptually simpler than Kafka, but Kafka was designed as such because scalability was a key design goal from day one.

Consumer Scalability

One challenge with the log is the problem of high fan-out. Specifically, how do we scale to a large number of consumers? In Kafka and NATS Streaming, reads (and writes) are only served by the leader. Similarly, Amazon Kinesis supports up to only five reads per second per shard (a shard is Kinesis’ equivalent of a partition). Thus, if we have five consumers reading from the same shard, we’ve already hit our fan-out limit. The thought is to partition your workload to increase parallelism and/or daisy chain streams to increase fan-out. But if we are trying to do very high fan-out, e.g. to thousands of IoT devices, neither of these are ideal solutions. Not all use cases may lend themselves to partitioning (though one can argue this is just a sign of poor architecting), and chaining up streams (or in Kafka nomenclature, topics) tends to be kludgey.

However, we can make the following observation: with an immutable log, there are no stale or phantom reads. Unlike a database, we can loosen our requirements a bit. Whereas a database is typically mutable, with a log, we’re only appending things. From a consumer’s perspective, a replica is either up-to-date with the leader or in the process of catching up, but in either case, if we read all of the records, we should end up in the same state. Immutability, at least in theory, should make it “easy” to scale to a large number of consumers because we don’t have to read from the leader to get correct results (ignoring log compaction and other “mutable” operations), so long as we’re okay with strong eventual consistency with respect to tailing the log.

In NATS Streaming, with Raft, we could simply allow followers to serve reads and scale reads by increasing the size of the cluster, but this would impact performance because the quorum size would also increase. Instead, we can use “non-voters” to act as read replicas and balance consumers among them. These read replicas do not participate in quorum or leader election, they simply receive committed log entries. In effect, this is the daisy chaining of streams mentioned earlier but done implicitly by the system. This is an otherwise common pattern for increasing consumer fan-out in Kinesis but is usually done in an ad hoc, Rube Goldberg-esque fashion. Note that, in the case of NATS Streaming, this isn’t quite as simple as it sounds due to the delivery mechanism used, which we’ll describe next.

Push vs. Pull

In Kafka, consumers pull data from brokers. In NATS Streaming, brokers push data to consumers. Kafka’s documentation describes this design decision in detail. The key factor largely comes down to flow control. With push, flow control needs to be explicit to deal with diverse consumers. Different consumers will consume at different rates, so the broker needs to be aware of this so as not to overwhelm a consumer.

There are obvious advantages and disadvantages to both approaches. With push, it can be a tricky balance to ensure full utilization of the consumer. We might use a backoff protocol like additive increase/multiplicative decrease, widely known for its use in TCP congestion control, to optimize utilization. NATS Streaming, like many other messaging systems, implements flow control by using acks. Upon receiving a message, consumers ack back to the server, and the server tracks the in-flight messages for each consumer. If that number goes above a certain threshold, the server will stop delivery until more acks are received. There is a similar flow-control mechanism between the publisher and the server. The trade-off here is the server needs to do some bookkeeping, which we’ll get to in a bit. With a pull-based system, flow control is implicit. Consumers simply go at their own pace, and the server doesn’t need to track anything. There is much less complexity with this.

Pull-based systems lend themselves to aggressive batching. With push, we must decide whether to send a message immediately or wait to accumulate more messages before sending. This is a decision pertaining to latency versus throughput. Push is often viewed as an optimization for latency, but if we’re tuning for low latency, we send messages one at a time only for them to end up being buffered on the consumer anyway. With pull, the consumer fetches all available messages after its current position in the log, which basically removes the guesswork around tuning batching and latency.

There are API implications with this decision too, particularly from an ergonomics and complexity perspective. Kafka clients tend to be “thick” and have a lot of complexity. That is, they do a lot because the broker is designed to be simple. That’s my guess as to why there are so few native client libraries up to par with the Java client. NATS Streaming clients, on the other hand, are relatively “thin” because the server does more. We end up just pushing the complexity around based on our design decisions, but one can argue that the smart client and dumb server is a more scalable approach. We’ll go into detail on that in the next installment of this series.

Circling back on consumer scalability, the fact that NATS Streaming uses a push-based model means we can’t simply setup read replicas and balance consumers among them. Instead, we would need to partition consumers among the replicas so that each server is responsible for pushing data to a subset of consumers. The increased complexity over pull becomes immediately apparent here.

Bookkeeping

There are two ways to track position in the log: have the server track it for consumers or have consumers track it themselves. Again, there are trade-offs with this, namely between API simplicity, server complexity, performance, and scalability. NATS Streaming tracks subscription positions for consumers. This means consumers can come and go as they like and pick back up where they left off easily. Before NATS Streaming supported clustering, this made a lot of sense because the bookkeeping was all in one server. But with clustering, this data must be replicated just like the message data, which poses a performance challenge.

The alternative is to punt the problem to the consumer. But also keep in mind that consumers might not have access to fast stable storage, such as with an IoT device or ephemeral container. Is there a way we can split the difference?

We can store the offsets themselves directly in the log. As of 0.9, this is what Kafka does. Before that, clients had to manage offsets themselves or store them in ZooKeeper. This forced a dependency on ZooKeeper for clients but also posed a major bottleneck since ZooKeeper is relatively low throughput. But by storing offsets in the log, they are treated just like any other write to a Kafka topic, which scales quite well (offsets are stored in an internal Kafka topic called __consumer_offsets partitioned by consumer group; there is also a special read cache for speeding up the read path).

Clients periodically checkpoint their offset to the log. We then use log compaction to retain only the latest offsets. Log compaction works by rewriting the log to retain only the latest message for a given key. On recovery, clients fetch the latest offset from the log. The important part here is we need to structure our keys such that compaction retains the latest offset for each unique consumer. For example, we might structure it as consumer-topic-partition. We end up with something resembling the following, where the message value is the offset:

The above log is uncompacted. Once compacted, it becomes the following:

Note that compaction violates some of our previous assumptions around the immutability of the log, but that’s for another discussion.

There are a number of advantages to this approach. We get fault-tolerance and durability due to the fact that our log is already fault-tolerant and durable as designed earlier. We get consistent reads again due to our replication scheme. Unlike ZooKeeper, we get high write throughput. And we reuse existing structures, so there’s less server complexity. We’re just reusing the log, there aren’t really any major new codepaths.

Interestingly, the bookkeeping needed for flow control in push-based systems—such as acks in NATS Streaming—serves much the same purpose as offset tracking in pull-based systems, since it needs to track position. The difference comes when we allow out-of-order processing. If we don’t allow it, then acks are simply a high-water mark that indicate the client is “this far” caught up. The problem with push is we also have to deal with redeliveries, whereas with pull they are implicitly handled by the client.  If we do allow out-of-order processing, then we need to track individual, in-flight messages, which is what per-message acks allow us to do. In this case, the system starts to look less like a log and more like a message queue. This makes push even more complicated.

The nice thing about reusing the log to track offsets is it greatly reduces the amount of code and complexity needed. Since NATS Streaming allows out-of-order processing, it uses a separate acking subsystem which otherwise has the same requirements as an offset-tracking subsystem.

In part four of this series, we will discuss some of the key trade-offs involved with implementing a distributed log and some lessons learned while building NATS Streaming.

Building a Distributed Log from Scratch, Part 2: Data Replication

In part one of this series we introduced the idea of a message log, touched on why it’s useful, and discussed the storage mechanics behind it. In part two, we discuss data replication.

We have our log. We know how to write data to it and read it back as well as how data is persisted. The caveat to this is, although we have a durable log, it’s a single point of failure (SPOF). If the machine where the log data is stored dies, we’re SOL. Recall that one of our three priorities with this system is high availability, so the question is how do we achieve high availability and fault tolerance?

With high availability, we’re specifically talking about ensuring continuity of reads and writes. A server failing shouldn’t preclude either of these, or at least unavailability should be kept to an absolute minimum and without the need for operator intervention. Ensuring this continuity should be fairly obvious: we eliminate the SPOF. To do that, we replicate the data. Replication can also be a means for increasing scalability, but for now we’re only looking at this through the lens of high availability.

There are a number of ways we can go about replicating the log data. Broadly speaking, we can group the techniques into two different categories: gossip/multicast protocols and consensus protocols. The former includes things like epidemic broadcast trees, bimodal multicast, SWIM, HyParView, and NeEM. These tend to be eventually consistent and/or stochastic. The latter, which I’ve described in more detail here, includes 2PC/3PC, Paxos, Raft, Zab, and chain replication. These tend to favor strong consistency over availability.

So there are a lot of ways we can replicate data, but some of these solutions are better suited than others to this particular problem. Since ordering is an important property of a log, consistency becomes important for a replicated log. If we read from one replica and then read from another, it’s important those views of the log don’t conflict with each other. This more or less rules out the stochastic and eventually consistent options, leaving us with consensus-based replication.

There are essentially two components to consensus-based replication schemes: 1) designate a leader who is responsible for sequencing writes and 2) replicate the writes to the rest of the cluster.

Designating a leader can be as simple as a configuration setting, but the purpose of replication is fault tolerance. If our configured leader crashes, we’re no longer able to accept writes. This means we need the leader to be dynamic. It turns out leader election is a well-understood problem, so we’ll get to this in a bit.

Once a leader is established, it needs to replicate the data to followers. In general, this can be done by either waiting for all replicas or waiting for only a quorum (majority) of replicas. There are pros and cons to both approaches.

Pros Cons
All Replicas Tolerates f failures with f+1 replicas Latency pegged to slowest replica
Quorum Hides delay from a slow replica Tolerates f failures with 2f+1 replicas

Waiting on all replicas means we can make progress as long as at least one replica is available. With quorum, tolerating the same amount of failures requires more replicas because we need a majority to make progress. The trade-off is that the quorum hides any delays from a slow replica. Kafka is an example of a system which uses all replicas (with some conditions on this which we will see later), and NATS Streaming is one that uses a quorum. Let’s take a look at both in more detail.

Replication in Kafka

In Kafka, a leader is selected (we’ll touch on this in a moment). This leader maintains an in-sync replica set (ISR) consisting of all the replicas which are fully caught up with the leader. This is every replica, by definition, at the beginning. All reads and writes go through the leader. The leader writes messages to a write-ahead log (WAL). Messages written to the WAL are considered uncommitted or “dirty” initially. The leader only commits a message once all replicas in the ISR have written it to their own WAL. The leader also maintains a high-water mark (HW) which is the last committed message in the WAL. This gets piggybacked on the replica fetch responses from which replicas periodically checkpoint to disk for recovery purposes. The piggybacked HW then allows replicas to know when to commit.

Only committed messages are exposed to consumers. However, producers can configure how they want to receive acknowledgements on writes. It can wait until the message is committed on the leader (and thus replicated to the ISR), wait for the message to only be written (but not committed) to the leader’s WAL, or not wait at all. This all depends on what trade-offs the producer wants to make between latency and durability.

The graphic below shows how this replication process works for a cluster of three brokers: b1, b2, and b3. Followers are effectively special consumers of the leader’s log.

Now let’s look at a few failure modes and how Kafka handles them.

Leader Fails

Kafka relies on Apache ZooKeeper for certain cluster coordination tasks, such as leader election, though this is not actually how the log leader is elected. A Kafka cluster has a single controller broker whose election is handled by ZooKeeper. This controller is responsible for performing administrative tasks on the cluster. One of these tasks is selecting a new log leader (actually partition leader, but this will be described later in the series) from the ISR when the current leader dies. ZooKeeper is also used to detect these broker failures and signal them to the controller.

Thus, when the leader crashes, the cluster controller is notified by ZooKeeper and it selects a new leader from the ISR and announces this to the followers. This gives us automatic failover of the leader. All committed messages up to the HW are preserved and uncommitted messages may be lost during the failover. In this case, b1 fails and b2 steps up as leader.

Follower Fails

The leader tracks information on how “caught up” each replica is. Before Kafka 0.9, this included both how many messages a replica was behind, replica.lag.max.messages, and the amount of time since the replica last fetched messages from the leader, replica.lag.time.max.ms. Since 0.9, replica.lag.max.messages was removed and replica.lag.time.max.ms now refers to both the time since the last fetch request and the amount of time since the replica last caught up.

Thus, when a follower fails (or stops fetching messages for whatever reason), the leader will detect this based on replica.lag.time.max.ms. After that time expires, the leader will consider the replica out of sync and remove it from the ISR. In this scenario, the cluster enters an “under-replicated” state since the ISR has shrunk. Specifically, b2 fails and is removed from the ISR.

Follower Temporarily Partitioned

The case of a follower being temporarily partitioned, e.g. due to a transient network failure, is handled in a similar fashion to the follower itself failing. These two failure modes can really be combined since the latter is just the former with an arbitrarily long partition, i.e. it’s the difference between crash-stop and crash-recovery models.

In this case, b3 is partitioned from the leader. As before, replica.lag.time.max.ms acts as our failure detector and causes b3 to be removed from the ISR. We enter an under-replicated state and the remaining two brokers continue committing messages 4 and 5. Accordingly, the HW is updated to 5 on these brokers.

When the partition heals, b3 continues reading from the leader and catching up. Once it is fully caught up with the leader, it’s added back into the ISR and the cluster resumes its fully replicated state.

We can generalize this to the crash-recovery model. For example, instead of a network partition, the follower could crash and be restarted later. When the failed replica is restarted, it recovers the HW from disk and truncates its log up to the HW. This preserves the invariant that messages after the HW are not guaranteed to be committed. At this point, it can begin catching up from the leader and will end up with a log consistent with the leader’s once fully caught up.

Replication in NATS Streaming

NATS Streaming relies on the Raft consensus algorithm for leader election and data replication. This sometimes comes as a surprise to some as Raft is largely seen as a protocol for replicated state machines. We’ll try to understand why Raft was chosen for this particular problem in the following sections. We won’t dive deep into Raft itself beyond what is needed for the purposes of this discussion.

While a log is a state machine, it’s a very simple one: a series of appends. Raft is frequently used as the replication mechanism for key-value stores which have a clearer notion of “state machine.” For example, with a key-value store, we have set and delete operations. If we set foo = bar and then later set foo = baz, the state gets rolled up. That is, we don’t necessarily care about the provenance of the key, only its current state.

However, NATS Streaming differs from Kafka in a number of key ways. One of these differences is that NATS Streaming attempts to provide a sort of unified API for streaming and queueing semantics not too dissimilar from Apache Pulsar. This means, while it has a notion of a log, it also has subscriptions on that log. Unlike Kafka, NATS Streaming tracks these subscriptions and metadata associated with them, such as where a client is in the log. These have definite “state machines” affiliated with them, like creating and deleting subscriptions, positions in the log, clients joining or leaving queue groups, and message-redelivery information.

Currently, NATS Streaming uses multiple Raft groups for replication. There is a single metadata Raft group used for replicating client state and there is a separate Raft group per topic which replicates messages and subscriptions.

Raft solves both the problems of leader election and data replication in a single protocol. The Secret Lives of Data provides an excellent interactive illustration of how this works. As you step through that illustration, you’ll notice that the algorithm is actually quite similar to the Kafka replication protocol we walked through earlier. This is because although Raft is used to implement replicated state machines, it actually is a replicated WAL, which is exactly what Kafka is. One benefit of using Raft is we no longer have the need for ZooKeeper or some other coordination service.

Raft handles electing a leader. Heartbeats are used to maintain leadership. Writes flow through the leader to the followers. The leader appends writes to its WAL and they are subsequently piggybacked onto the heartbeats which get sent to the followers using AppendEntries messages. At this point, the followers append the write to their own WALs, assuming they don’t detect a gap, and send a response back to the leader. The leader commits the write once it receives a successful response from a quorum of followers.

Similar to Kafka, each replica in Raft maintains a high-water mark of sorts called the commit index, which is the index of the highest log entry known to be committed. This is piggybacked on the AppendEntries messages which the followers use to know when to commit entries in their WALs. If a follower detects that it missed an entry (i.e. there was a gap in the log), it rejects the AppendEntries and informs the leader to rewind the replication. The Raft paper details how it ensures correctness, even in the face of many failure modes such as the ones described earlier.

Conceptually, there are two logs: the Raft log and the NATS Streaming message log. The Raft log handles replicating messages and, once committed, they are appended to the NATS Streaming log. If it seems like there’s some redundancy here, that’s because there is, which we’ll get to soon. However, keep in mind we’re not just replicating the message log, but also the state machines associated with the log and any clients.

There are a few challenges with this replication technique, two of which we will talk about. The first is scaling Raft. With a single topic, there is one Raft group, which means one node is elected leader and it heartbeats messages to followers.

As the number of topics increases, so do the number of Raft groups, each with their own leaders and heartbeats. Unless we constrain the Raft group participants or the number of topics, this creates an explosion of network traffic between nodes.

There are a couple ways we can go about addressing this. One option is to run a fixed number of Raft groups and use a consistent hash to map a topic to a group. This can work well if we know roughly the number of topics beforehand since we can size the number of Raft groups accordingly. If you expect only 10 topics, running 10 Raft groups is probably reasonable. But if you expect 10,000 topics, you probably don’t want 10,000 Raft groups. If hashing is consistent, it would be feasible to dynamically add or remove Raft groups at runtime, but it would still require repartitioning a portion of topics which can be complicated.

Another option is to run an entire node’s worth of topics as a single group using a layer on top of Raft. This is what CockroachDB does to scale Raft in proportion to the number of key ranges using a layer on top of Raft they call MultiRaft. This requires some cooperation from the Raft implementation, so it’s a bit more involved than the partitioning technique but eschews the repartitioning problem and redundant heartbeating.

The second challenge with using Raft for this problem is the issue of “dual writes.” As mentioned before, there are really two logs: the Raft log and the NATS Streaming message log, which we’ll call the “store.” When a message is published, the leader writes it to its Raft log and it goes through the Raft replication process.

Once the message is committed in Raft, it’s written to the NATS Streaming log and the message is now visible to consumers.

Note, however, that not only messages are written to the Raft log. We also have subscriptions and cluster topology changes, for instance. These other items are not written to the NATS Streaming log but handled in other ways on commit. That said, messages tend to occur in much greater volume than these other entries.

Messages end up getting stored redundantly, once in the Raft log and once in the NATS Streaming log. We can address this problem if we think about our logs a bit differently. If you recall from part one, our log storage consists of two parts: the log segment and the log index. The segment stores the actual log data, and the index stores a mapping from log offset to position in the segment.

Along these lines, we can think of the Raft log index as a “physical offset” and the NATS Streaming log index as a “logical offset.” Instead of maintaining two logs, we treat the Raft log as our message write-ahead log and treat the NATS Streaming log as an index into that WAL. Particularly, messages are written to the Raft log as usual. Once committed, we write an index entry for the message offset that points back into the log. As before, we use the index to do lookups into the log and can then read sequentially from the log itself.

Remaining Questions

We’ve answered the questions of how to ensure continuity of reads and writes, how to replicate data, and how to ensure replicas are consistent. The remaining two questions pertaining to replication are how do we keep things fast and how do we ensure data is durable?

There are several things we can do with respect to performance. The first is we can configure publisher acks depending on our application’s requirements. Specifically, we have three options. The first is the broker acks on commit. This is slow but safe as it guarantees the data is replicated. The second is the broker acks on appending to its local log. This is fast but unsafe since it doesn’t wait on any replica roundtrips but, by that very fact, means that the data is not replicated. If the leader crashes, the message could be lost. Lastly, the publisher can just not wait for an ack at all. This is the fastest but least safe option for obvious reasons. Tuning this all depends on what requirements and trade-offs make sense for your application.

The second thing we do is don’t explicitly fsync writes on the broker and instead rely on replication for durability. Both Kafka and NATS Streaming (when clustered) do this. With fsync enabled (in Kafka, this is configured with flush.messages and/or flush.ms and in NATS Streaming, with file_sync), every message that gets published results in a sync to disk. This ends up being very expensive. The thought here is if we are replicating to enough nodes, the replication itself is sufficient for HA of data since the likelihood of more than a quorum of nodes failing is low, especially if we are using rack-aware clustering. Note that data is still periodically flushed in the background by the kernel.

Batching aggressively is also a key part of ensuring good performance. Kafka supports end-to-end batching from the producer all the way to the consumer. NATS Streaming does not currently support batching at the API level, but it uses aggressive batching when replicating and persisting messages. In my experience, this makes about an order-of-magnitude improvement in throughput.

Finally, as already discussed earlier in the series, keeping disk access sequential and maximizing zero-copy reads makes a big difference as well.

There are a few things worth noting with respect to durability. Quorum is what guarantees durability of data. This comes “for free” with Raft due to the nature of that protocol. In Kafka, we need to do a bit of configuring to ensure this. Namely, we need to configure min.insync.replicas on the broker and acks on the producer. The former controls the minimum number of replicas that must acknowledge a write for it to be considered successful when a producer sets acks to “all.” The latter controls the number of acknowledgments the producer requires the leader to have received before considering a request complete. For example, with a topic that has a replication factor of three, min.insync.replicas needs to be set to two and acks set to “all.” This will, in effect, require a quorum of two replicas to process writes.

Another caveat with Kafka is unclean leader elections. That is, if all replicas become unavailable, there are two options: choose the first replica to come back to life (not necessarily in the ISR) and elect this replica as leader (which could result in data loss) or wait for a replica in the ISR to come back to life and elect it as leader (which could result in prolonged unavailability). By default, Kafka favors availability by choosing the second strategy. If you prefer consistency, you must set unclean.leader.election.enable to false.

Fundamentally, durability and consistency are at odds with availability. If there is no quorum, then no reads or writes can be accepted and the cluster is unavailable. This is the crux of the CAP theorem.

In part three of this series, we will discuss scaling message delivery in the distributed log.

Understanding Consensus

A classical problem presented within the field of distributed systems is the Byzantine Generals Problem. In it, we observe two allied armies positioned on either side of a valley. Within the valley is a fortified city. Each army has a general with one acting as commander. Both armies must attack at the same time or face defeat by the city’s defenders. In order to come to an agreement on when to attack, messengers must be sent through the valley, risking capture by the city’s patrols. Consider the diagram below illustrating this problem.

byzantine_generals

In the above scenario, Army A has sent a messenger to Army B with a message saying “Attack at 0700.” Army B receives this message and dispatches a messenger carrying an acknowledgement of the attack plans; however, our ill-fated messenger has been intercepted by the city’s defenders.

How do our armies come to an agreement on when to attack? Perhaps Army A sends 100 messengers and attacks regardless. Unfortunately, if all of the messengers are captured, this would result in a swift defeat because A would attack without B. What if, instead, A sends 100 messengers, waits for acknowledgements of those messages, and only attacks if it reaches a certain level of confidence, say receiving 75 or more confirmations? Yet again, this could very well end in defeat, this time with B attacking without A.

We also need to bear in mind that sending messages has a certain amount of overhead. We can’t, in good conscience, send a million messengers to their potential demise. Or maybe we can, but it’s more than the number of soldiers in our army.

In fact, we can’t reliably make a decision. It’s provenly impossible. In the face of a Byzantine failure, it becomes even more complicated by the possibility of traitors or forged messages.

Now replace two generals with N generals. Coming to a perfectly reliable agreement between two generals was already impossible but becomes dramatically more complicated. It’s a problem more commonly referred to as distributed consensus, and it’s the focus of an army of researchers.

The problem of consensus is blissfully simple, but the solution is far from trivial. Consensus is the basis of distributed coordination services, locking protocols, and databases. A monolithic system (think a MySQL server) can enforce ACID constraints with consistent reads but exhibits generally poor availability and fault tolerance. The original Google App Engine datastore relied on a master/slave architecture where a single data center held the primary copy of data which was replicated to backup sites asynchronously. This offered applications strong consistency and low latency with the implied trade-off of availability. The health of an application was directly tied to the health of a data center. Beyond transient losses, it also meant periods of planned unavailability and read-only access while Google performed data center maintenance. App Engine has since transitioned to a high-replication datastore which relies on distributed consensus to replicate data across sites. This allows the datastore to continue operating in the presence of failures and at greater availability. In agreement with CAP, this naturally means higher latency on writes.

There are a number of solutions to distributed consensus, but most of them tend to be pretty characteristic of each other. We will look at some of these solutions, including multi-phase commit and state-replication approaches.

Two-Phase Commit

Two-phase commit (2PC) is the simplest multi-phase commit protocol. In two-phase commit, all transactions go through a coordinator who is responsible for ensuring a transaction occurs across one or more remote sites (cohorts).

2pc

When the coordinator receives a request, it asks each of its cohorts to vote yes or no. During this phase, each cohort performs the transaction up to the point of committing it. The coordinator then waits for all votes. If the vote is unanimously “yes,” it sends a message to its cohorts to commit the transaction. If one or more vote is “no,” a message is sent to rollback. The cohorts then acknowledge whether the transaction was committed or rolled back and the process is complete.

Two-phase commit is a blocking protocol. The coordinator blocks waiting for votes from its cohorts, and cohorts block waiting for a commit/rollback message from the coordinator. Unfortunately, this means 2PC can, in some circumstances, result in a deadlock, e.g. the coordinator dies while cohorts wait or a cohort dies while the coordinator waits. Another problematic scenario is when a coordinator and cohort simultaneously fail. Even if another coordinator takes its place, it won’t be able to determine whether to commit or rollback.

Three-Phase Commit

Three-phase commit (3PC) is designed to solve the problems identified in two-phase by implementing a non-blocking protocol with an added “prepare” phase. Like 2PC, it relies on a coordinator which relays messages to its cohorts.

3pc

Unlike 2PC, cohorts do not execute a transaction during the voting phase. Rather, they simply indicate if they are prepared to perform the transaction. If cohorts timeout during this phase or there is one or more “no” vote, the transaction is aborted. If the vote is unanimously “yes,” the coordinator moves on to the “prepare” phase, sending a message to its cohorts to acknowledge the transaction will be committed. Again, if an ack times out, the transaction is aborted. Once all cohorts have acknowledged the commit, we are guaranteed to be in a state where all cohorts have agreed to commit. At this point, if the commit message from the coordinator is not received in the third phase, the cohort will go ahead and commit anyway. This solves the deadlocking problems described earlier. However, 3PC is still susceptible to network partitions. If a partition occurs, the coordinator will timeout and progress will not be made.

State Replication

Protocols like Raft, Paxos, and Zab are popular and widely used solutions to the problem of distributed consensus. These implement state replication or primary-backup using leaders, quorums, and replicas of operation logs or incremental delta states.

consensus quorum

These protocols work by electing a leader (coordinator). Like multi-phase commit, all changes must go through that leader, who then broadcasts the changes to the group. Changes occur by appending a log entry, and each node has its own log replica. Where multi-phase commit falls down in the face of network partitions, these protocols are able to continue working by relying on a quorum (majority). The leader commits the change once the quorum has acknowledged it.

The use of quorums provide partition tolerance by fencing minority partitions while the majority continues to operate. This is the pessimistic approach to solving split-brain, so it comes with an inherent availability trade-off. This problem is mitigated by the fact that each node hosts a replicated state machine which can be rebuilt or reconciled once the partition is healed.

Google relies on Paxos for its high-replication datastore in App Engine as well as its Chubby lock service. The distributed key-value store etcd uses Raft to manage highly available replicated logs. Zab, which differentiates itself from the former by implementing a primary-backup protocol, was designed for the ZooKeeper coordination service. In general, there are several different implementations of these protocols, such as the Go implementation of Raft.

Distributed consensus is a difficult thing to get right, but it’s important to frame it within the context of CAP. We can ensure stronger consistency at the cost of higher latency and lower availability. On the other hand, we can achieve higher availability with decreased latency while giving up strong consistency. The trade-offs really depend on what your needs are.