Building a Distributed Log from Scratch, Part 2: Data Replication

In part one of this series we introduced the idea of a message log, touched on why it’s useful, and discussed the storage mechanics behind it. In part two, we discuss data replication.

We have our log. We know how to write data to it and read it back as well as how data is persisted. The caveat to this is, although we have a durable log, it’s a single point of failure (SPOF). If the machine where the log data is stored dies, we’re SOL. Recall that one of our three priorities with this system is high availability, so the question is how do we achieve high availability and fault tolerance?

With high availability, we’re specifically talking about ensuring continuity of reads and writes. A server failing shouldn’t preclude either of these, or at least unavailability should be kept to an absolute minimum and without the need for operator intervention. Ensuring this continuity should be fairly obvious: we eliminate the SPOF. To do that, we replicate the data. Replication can also be a means for increasing scalability, but for now we’re only looking at this through the lens of high availability.

There are a number of ways we can go about replicating the log data. Broadly speaking, we can group the techniques into two different categories: gossip/multicast protocols and consensus protocols. The former includes things like epidemic broadcast trees, bimodal multicast, SWIM, HyParView, and NeEM. These tend to be eventually consistent and/or stochastic. The latter, which I’ve described in more detail here, includes 2PC/3PC, Paxos, Raft, Zab, and chain replication. These tend to favor strong consistency over availability.

So there are a lot of ways we can replicate data, but some of these solutions are better suited than others to this particular problem. Since ordering is an important property of a log, consistency becomes important for a replicated log. If we read from one replica and then read from another, it’s important those views of the log don’t conflict with each other. This more or less rules out the stochastic and eventually consistent options, leaving us with consensus-based replication.

There are essentially two components to consensus-based replication schemes: 1) designate a leader who is responsible for sequencing writes and 2) replicate the writes to the rest of the cluster.

Designating a leader can be as simple as a configuration setting, but the purpose of replication is fault tolerance. If our configured leader crashes, we’re no longer able to accept writes. This means we need the leader to be dynamic. It turns out leader election is a well-understood problem, so we’ll get to this in a bit.

Once a leader is established, it needs to replicate the data to followers. In general, this can be done by either waiting for all replicas or waiting for only a quorum (majority) of replicas. There are pros and cons to both approaches.

                Pros                                      Cons
All Replicas    Tolerates f failures with f+1 replicas    Latency pegged to slowest replica
Quorum          Hides delay from a slow replica           Tolerates f failures with 2f+1 replicas

Waiting on all replicas means we can make progress as long as at least one replica is available. With quorum, tolerating the same amount of failures requires more replicas because we need a majority to make progress. The trade-off is that the quorum hides any delays from a slow replica. Kafka is an example of a system which uses all replicas (with some conditions on this which we will see later), and NATS Streaming is one that uses a quorum. Let’s take a look at both in more detail.
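To make the trade-off concrete, here is a minimal Go sketch (not taken from either system) that decides whether a write can be considered committed under each strategy, given how many replicas have acknowledged it:

```go
package main

import "fmt"

// ackPolicy determines when a write is considered committed.
type ackPolicy int

const (
	waitForAll    ackPolicy = iota // commit once every replica has the write
	waitForQuorum                  // commit once a majority of replicas has the write
)

// committed reports whether a write acknowledged by acks out of
// totalReplicas replicas can be considered committed under the policy.
func committed(policy ackPolicy, acks, totalReplicas int) bool {
	switch policy {
	case waitForAll:
		return acks == totalReplicas
	case waitForQuorum:
		return acks >= totalReplicas/2+1
	default:
		return false
	}
}

func main() {
	// With 3 replicas, a quorum commit needs 2 acks, while an all-replicas
	// commit needs 3 and is therefore gated on the slowest replica.
	fmt.Println(committed(waitForQuorum, 2, 3)) // true
	fmt.Println(committed(waitForAll, 2, 3))    // false
}
```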

Replication in Kafka

In Kafka, a leader is selected (we’ll touch on this in a moment). This leader maintains an in-sync replica set (ISR) consisting of all the replicas which are fully caught up with the leader. By definition, this is every replica at the beginning. All reads and writes go through the leader. The leader writes messages to a write-ahead log (WAL). Messages written to the WAL are initially considered uncommitted or “dirty.” The leader only commits a message once all replicas in the ISR have written it to their own WAL. The leader also maintains a high-water mark (HW), which is the last committed message in the WAL. The HW is piggybacked on the replica fetch responses, which is how replicas learn when to commit, and replicas periodically checkpoint it to disk for recovery purposes.
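To make the bookkeeping concrete, here is a minimal Go sketch of a leader tracking replica acknowledgements and advancing the HW. It is an illustration only, with hypothetical names, not Kafka’s actual implementation:

```go
package main

import "fmt"

// leaderState is a toy model of the leader-side bookkeeping described
// above: the leader's WAL, the ISR, and the high-water mark (HW).
type leaderState struct {
	log       []string       // the leader's WAL; offsets are slice indices
	isr       []string       // in-sync replica set, leader included
	ackedUpTo map[string]int // highest offset each ISR member has written
	highWater int            // offset of the last committed message (-1 = none)
}

// appendMsg writes a message to the leader's WAL. It stays uncommitted
// ("dirty") until every ISR member has written it.
func (l *leaderState) appendMsg(msg string) int {
	l.log = append(l.log, msg)
	return len(l.log) - 1
}

// recordAck notes that a replica has written up to the given offset and
// advances the HW to the lowest offset acknowledged by the whole ISR.
func (l *leaderState) recordAck(replica string, offset int) {
	l.ackedUpTo[replica] = offset
	min := offset
	for _, r := range l.isr {
		if l.ackedUpTo[r] < min {
			min = l.ackedUpTo[r]
		}
	}
	l.highWater = min // everything up to and including min is committed
}

func main() {
	l := &leaderState{
		isr:       []string{"b1", "b2", "b3"},
		ackedUpTo: map[string]int{"b1": -1, "b2": -1, "b3": -1},
		highWater: -1,
	}
	off := l.appendMsg("m0")
	l.recordAck("b1", off) // the leader's own write
	l.recordAck("b2", off)
	fmt.Println(l.highWater) // -1: b3 hasn't written m0 yet, so it isn't committed
	l.recordAck("b3", off)
	fmt.Println(l.highWater) // 0: m0 is committed and visible to consumers
}
```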

Only committed messages are exposed to consumers. However, producers can configure how they want to receive acknowledgements on writes. It can wait until the message is committed on the leader (and thus replicated to the ISR), wait for the message to only be written (but not committed) to the leader’s WAL, or not wait at all. This all depends on what trade-offs the producer wants to make between latency and durability.
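As an illustration, configuring these acknowledgement levels from a Go producer might look something like the sketch below. It uses the sarama client (historically github.com/Shopify/sarama; the import path and API details may differ across versions), and the broker address and topic are placeholders:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// acks=all: wait until the message is committed (replicated to the ISR).
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Alternatives, trading durability for latency:
	//   sarama.WaitForLocal: ack once written to the leader's log only
	//   sarama.NoResponse:   fire and forget, no ack at all
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{Topic: "events", Value: sarama.StringEncoder("hello")}
	if _, _, err := producer.SendMessage(msg); err != nil {
		log.Fatal(err)
	}
}
```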

The graphic below shows how this replication process works for a cluster of three brokers: b1, b2, and b3. Followers are effectively special consumers of the leader’s log.

Now let’s look at a few failure modes and how Kafka handles them.

Leader Fails

Kafka relies on Apache ZooKeeper for certain cluster coordination tasks, such as leader election, though this is not actually how the log leader is elected. A Kafka cluster has a single controller broker whose election is handled by ZooKeeper. This controller is responsible for performing administrative tasks on the cluster. One of these tasks is selecting a new log leader (actually partition leader, but this will be described later in the series) from the ISR when the current leader dies. ZooKeeper is also used to detect these broker failures and signal them to the controller.

Thus, when the leader crashes, the cluster controller is notified by ZooKeeper and it selects a new leader from the ISR and announces this to the followers. This gives us automatic failover of the leader. All committed messages up to the HW are preserved and uncommitted messages may be lost during the failover. In this case, b1 fails and b2 steps up as leader.

Follower Fails

The leader tracks information on how “caught up” each replica is. Before Kafka 0.9, this included both how many messages a replica was behind, replica.lag.max.messages, and the amount of time since the replica last fetched messages from the leader, replica.lag.time.max.ms. Since 0.9, replica.lag.max.messages was removed and replica.lag.time.max.ms now refers to both the time since the last fetch request and the amount of time since the replica last caught up.

Thus, when a follower fails (or stops fetching messages for whatever reason), the leader will detect this based on replica.lag.time.max.ms. After that time expires, the leader will consider the replica out of sync and remove it from the ISR. In this scenario, the cluster enters an “under-replicated” state since the ISR has shrunk. Specifically, b2 fails and is removed from the ISR.
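A rough sketch of that failure-detection check, in Go with hypothetical types, might look like the following. It mirrors the post-0.9 semantics described above rather than Kafka’s actual code:

```go
package main

import (
	"fmt"
	"time"
)

// replicaStatus tracks, per follower, when it last issued a fetch and when
// it was last fully caught up to the leader's log end offset.
type replicaStatus struct {
	lastFetch    time.Time
	lastCaughtUp time.Time
}

// outOfSync reflects the post-0.9 semantics: a follower is considered out
// of sync if it hasn't fetched *or* hasn't been caught up within maxLag
// (replica.lag.time.max.ms).
func outOfSync(s replicaStatus, now time.Time, maxLag time.Duration) bool {
	return now.Sub(s.lastFetch) > maxLag || now.Sub(s.lastCaughtUp) > maxLag
}

func main() {
	now := time.Now()
	maxLag := 10 * time.Second

	// b2 stopped fetching 30 seconds ago; the leader removes it from the ISR.
	b2 := replicaStatus{lastFetch: now.Add(-30 * time.Second), lastCaughtUp: now.Add(-30 * time.Second)}
	fmt.Println(outOfSync(b2, now, maxLag)) // true: shrink the ISR

	// b3 is fetching constantly but lags behind the log end offset; it is
	// also removed once it has been behind for longer than maxLag.
	b3 := replicaStatus{lastFetch: now, lastCaughtUp: now.Add(-20 * time.Second)}
	fmt.Println(outOfSync(b3, now, maxLag)) // true
}
```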

Follower Temporarily Partitioned

The case of a follower being temporarily partitioned, e.g. due to a transient network failure, is handled in a similar fashion to the follower itself failing. These two failure modes can really be combined since the latter is just the former with an arbitrarily long partition, i.e. it’s the difference between crash-stop and crash-recovery models.

In this case, b3 is partitioned from the leader. As before, replica.lag.time.max.ms acts as our failure detector and causes b3 to be removed from the ISR. We enter an under-replicated state and the remaining two brokers continue committing messages 4 and 5. Accordingly, the HW is updated to 5 on these brokers.

When the partition heals, b3 continues reading from the leader and catching up. Once it is fully caught up with the leader, it’s added back into the ISR and the cluster resumes its fully replicated state.

We can generalize this to the crash-recovery model. For example, instead of a network partition, the follower could crash and be restarted later. When the failed replica is restarted, it recovers the HW from disk and truncates its log up to the HW. This preserves the invariant that messages after the HW are not guaranteed to be committed. At this point, it can begin catching up from the leader and will end up with a log consistent with the leader’s once fully caught up.
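As a rough sketch of those recovery steps (again illustrative, not Kafka’s implementation): the restarted follower truncates to its checkpointed HW and then fetches the remainder from the leader.

```go
package main

import "fmt"

// recoverFollower models the recovery path described above: the restarted
// replica loads its checkpointed HW, truncates any entries beyond it (they
// were never guaranteed to be committed), and then re-fetches from the leader.
func recoverFollower(localLog []string, checkpointedHW int, leaderLog []string) []string {
	// 1. Truncate the local log to the checkpointed high-water mark.
	if checkpointedHW+1 < len(localLog) {
		localLog = localLog[:checkpointedHW+1]
	}
	// 2. Catch up by fetching everything after the HW from the leader.
	localLog = append(localLog, leaderLog[len(localLog):]...)
	return localLog
}

func main() {
	// The follower crashed with uncommitted entries m3' and m4' past its HW of 2.
	local := []string{"m0", "m1", "m2", "m3'", "m4'"}
	leader := []string{"m0", "m1", "m2", "m3", "m4", "m5"}

	recovered := recoverFollower(local, 2, leader)
	fmt.Println(recovered) // [m0 m1 m2 m3 m4 m5], consistent with the leader
}
```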

Replication in NATS Streaming

NATS Streaming relies on the Raft consensus algorithm for leader election and data replication. This comes as a surprise to some since Raft is largely seen as a protocol for replicated state machines. We’ll try to understand why Raft was chosen for this particular problem in the following sections. We won’t dive deep into Raft itself beyond what is needed for the purposes of this discussion.

While a log is a state machine, it’s a very simple one: a series of appends. Raft is frequently used as the replication mechanism for key-value stores which have a clearer notion of “state machine.” For example, with a key-value store, we have set and delete operations. If we set foo = bar and then later set foo = baz, the state gets rolled up. That is, we don’t necessarily care about the provenance of the key, only its current state.
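For example, a key-value “state machine” replicated through a log might look like the following toy sketch, where replaying the ordered operations rolls state up into the latest value:

```go
package main

import "fmt"

// command is a single state-machine operation replicated through the log.
type command struct {
	op, key, value string // op is "set" or "delete"
}

// apply replays the ordered log of commands against a key-value state
// machine. The log keeps every operation; the state only keeps the latest.
func apply(state map[string]string, log []command) {
	for _, c := range log {
		switch c.op {
		case "set":
			state[c.key] = c.value
		case "delete":
			delete(state, c.key)
		}
	}
}

func main() {
	log := []command{
		{"set", "foo", "bar"},
		{"set", "foo", "baz"}, // rolls up: provenance lives only in the log
	}
	state := map[string]string{}
	apply(state, log)
	fmt.Println(state) // map[foo:baz]
}
```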

However, NATS Streaming differs from Kafka in a number of key ways. One of these differences is that NATS Streaming attempts to provide a sort of unified API for streaming and queueing semantics not too dissimilar from Apache Pulsar. This means, while it has a notion of a log, it also has subscriptions on that log. Unlike Kafka, NATS Streaming tracks these subscriptions and metadata associated with them, such as where a client is in the log. These have definite “state machines” affiliated with them, like creating and deleting subscriptions, positions in the log, clients joining or leaving queue groups, and message-redelivery information.

Currently, NATS Streaming uses multiple Raft groups for replication. There is a single metadata Raft group used for replicating client state and there is a separate Raft group per topic which replicates messages and subscriptions.

Raft solves both the problems of leader election and data replication in a single protocol. The Secret Lives of Data provides an excellent interactive illustration of how this works. As you step through that illustration, you’ll notice that the algorithm is actually quite similar to the Kafka replication protocol we walked through earlier. This is because although Raft is used to implement replicated state machines, it actually is a replicated WAL, which is exactly what Kafka is. One benefit of using Raft is we no longer have the need for ZooKeeper or some other coordination service.

Raft handles electing a leader. Heartbeats are used to maintain leadership. Writes flow through the leader to the followers. The leader appends writes to its WAL and they are subsequently piggybacked onto the heartbeats which get sent to the followers using AppendEntries messages. At this point, the followers append the write to their own WALs, assuming they don’t detect a gap, and send a response back to the leader. The leader commits the write once it receives a successful response from a quorum of followers.

Similar to Kafka, each replica in Raft maintains a high-water mark of sorts called the commit index, which is the index of the highest log entry known to be committed. This is piggybacked on the AppendEntries messages which the followers use to know when to commit entries in their WALs. If a follower detects that it missed an entry (i.e. there was a gap in the log), it rejects the AppendEntries and informs the leader to rewind the replication. The Raft paper details how it ensures correctness, even in the face of many failure modes such as the ones described earlier.
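The follower-side handling might be sketched roughly as follows. This is a simplification for illustration (real Raft also checks terms and handles conflicting entries), not the code NATS Streaming actually uses:

```go
package main

import "fmt"

type entry struct {
	index uint64
	data  string
}

// follower is a (simplified) follower handling AppendEntries: reject if
// there is a gap so the leader rewinds replication, otherwise append the
// entries and advance the commit index no further than what is in the WAL.
type follower struct {
	wal         []entry
	commitIndex uint64
}

func (f *follower) appendEntries(prevIndex uint64, entries []entry, leaderCommit uint64) bool {
	// Gap check: we must already have the entry the leader thinks precedes
	// this batch. (Real Raft also checks the term of that entry.)
	if prevIndex > uint64(len(f.wal)) {
		return false // reject; the leader will rewind and retry
	}
	// Append anything we don't already have.
	for _, e := range entries {
		if e.index > uint64(len(f.wal)) {
			f.wal = append(f.wal, e)
		}
	}
	// Advance our commit index using the piggybacked leader commit index.
	if leaderCommit > f.commitIndex {
		last := uint64(len(f.wal))
		if leaderCommit < last {
			f.commitIndex = leaderCommit
		} else {
			f.commitIndex = last
		}
	}
	return true
}

func main() {
	f := &follower{}
	ok := f.appendEntries(0, []entry{{1, "m1"}, {2, "m2"}}, 1)
	fmt.Println(ok, f.commitIndex) // true 1

	// A gap: the leader claims entry 5 precedes this batch, but we only have 2.
	ok = f.appendEntries(5, []entry{{6, "m6"}}, 6)
	fmt.Println(ok) // false: the leader rewinds replication for this follower
}
```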

Conceptually, there are two logs: the Raft log and the NATS Streaming message log. The Raft log handles replicating messages and, once committed, they are appended to the NATS Streaming log. If it seems like there’s some redundancy here, that’s because there is, which we’ll get to soon. However, keep in mind we’re not just replicating the message log, but also the state machines associated with the log and any clients.

There are a few challenges with this replication technique, two of which we will talk about. The first is scaling Raft. With a single topic, there is one Raft group, which means one node is elected leader and it heartbeats messages to followers.

As the number of topics increases, so do the number of Raft groups, each with their own leaders and heartbeats. Unless we constrain the Raft group participants or the number of topics, this creates an explosion of network traffic between nodes.

There are a couple ways we can go about addressing this. One option is to run a fixed number of Raft groups and use a consistent hash to map a topic to a group. This can work well if we know roughly the number of topics beforehand since we can size the number of Raft groups accordingly. If you expect only 10 topics, running 10 Raft groups is probably reasonable. But if you expect 10,000 topics, you probably don’t want 10,000 Raft groups. If hashing is consistent, it would be feasible to dynamically add or remove Raft groups at runtime, but it would still require repartitioning a portion of topics which can be complicated.
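A minimal sketch of the topic-to-group mapping is shown below. For brevity it uses a plain hash-mod rather than a full consistent-hash ring, which is what you’d want in practice so that adding or removing groups only remaps a fraction of topics:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// groupForTopic maps a topic to one of a fixed number of Raft groups
// by hashing the topic name.
func groupForTopic(topic string, numGroups uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(topic))
	return h.Sum32() % numGroups
}

func main() {
	const numGroups = 10
	for _, topic := range []string{"orders", "payments", "clicks"} {
		fmt.Printf("topic %q -> raft group %d\n", topic, groupForTopic(topic, numGroups))
	}
}
```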

Another option is to run an entire node’s worth of topics as a single group using a layer on top of Raft. This is what CockroachDB does to scale Raft in proportion to the number of key ranges with a layer it calls MultiRaft. This requires some cooperation from the Raft implementation, so it’s a bit more involved than the partitioning technique but eschews the repartitioning problem and redundant heartbeating.

The second challenge with using Raft for this problem is the issue of “dual writes.” As mentioned before, there are really two logs: the Raft log and the NATS Streaming message log, which we’ll call the “store.” When a message is published, the leader writes it to its Raft log and it goes through the Raft replication process.

Once the message is committed in Raft, it’s written to the NATS Streaming log and the message is now visible to consumers.

Note, however, that not only messages are written to the Raft log. We also have subscriptions and cluster topology changes, for instance. These other items are not written to the NATS Streaming log but handled in other ways on commit. That said, messages tend to occur in much greater volume than these other entries.

Messages end up getting stored redundantly, once in the Raft log and once in the NATS Streaming log. We can address this problem if we think about our logs a bit differently. If you recall from part one, our log storage consists of two parts: the log segment and the log index. The segment stores the actual log data, and the index stores a mapping from log offset to position in the segment.

Along these lines, we can think of the Raft log index as a “physical offset” and the NATS Streaming log index as a “logical offset.” Instead of maintaining two logs, we treat the Raft log as our message write-ahead log and treat the NATS Streaming log as an index into that WAL. Particularly, messages are written to the Raft log as usual. Once committed, we write an index entry for the message offset that points back into the log. As before, we use the index to do lookups into the log and can then read sequentially from the log itself.
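Sketched in Go (illustrative types only, not the actual NATS Streaming storage code), the idea looks something like this:

```go
package main

import "fmt"

// indexedLog treats the committed Raft log as the message WAL and keeps
// only a thin index for the streaming log: each committed message gets an
// index entry mapping its logical (stream) offset to its Raft log index.
type indexedLog struct {
	raftLog []string          // committed Raft entries (messages and other ops)
	index   map[uint64]uint64 // logical message offset -> Raft log index
	nextOff uint64
}

// commitMessage is called when a *message* entry commits in Raft: instead
// of copying the payload into a second log, record where it already lives.
func (l *indexedLog) commitMessage(raftIndex uint64) uint64 {
	off := l.nextOff
	l.index[off] = raftIndex
	l.nextOff++
	return off
}

// read looks a message up by logical offset via the index.
func (l *indexedLog) read(offset uint64) string {
	return l.raftLog[l.index[offset]]
}

func main() {
	l := &indexedLog{index: map[uint64]uint64{}}

	// Raft commits a mix of entries; only messages get index entries.
	l.raftLog = append(l.raftLog, "sub: create foo") // Raft index 0, no message offset
	l.raftLog = append(l.raftLog, "msg: hello")      // Raft index 1
	l.commitMessage(1)
	l.raftLog = append(l.raftLog, "msg: world") // Raft index 2
	l.commitMessage(2)

	fmt.Println(l.read(0), l.read(1)) // msg: hello msg: world
}
```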

Remaining Questions

We’ve answered the questions of how to ensure continuity of reads and writes, how to replicate data, and how to ensure replicas are consistent. The remaining two questions pertaining to replication are how do we keep things fast and how do we ensure data is durable?

There are several things we can do with respect to performance. The first is we can configure publisher acks depending on our application’s requirements. Specifically, we have three options. The first option is for the broker to ack on commit. This is slow but safe as it guarantees the data is replicated. The second is for the broker to ack on appending to its local log. This is fast but unsafe since it doesn’t wait on any replica roundtrips, which, by that very fact, means the data is not replicated. If the leader crashes, the message could be lost. Lastly, the publisher can choose not to wait for an ack at all. This is the fastest but least safe option for obvious reasons. Tuning this all depends on what requirements and trade-offs make sense for your application.

The second thing we do is don’t explicitly fsync writes on the broker and instead rely on replication for durability. Both Kafka and NATS Streaming (when clustered) do this. With fsync enabled (in Kafka, this is configured with flush.messages and/or flush.ms and in NATS Streaming, with file_sync), every message that gets published results in a sync to disk. This ends up being very expensive. The thought here is if we are replicating to enough nodes, the replication itself is sufficient for HA of data since the likelihood of more than a quorum of nodes failing is low, especially if we are using rack-aware clustering. Note that data is still periodically flushed in the background by the kernel.

Batching aggressively is also a key part of ensuring good performance. Kafka supports end-to-end batching from the producer all the way to the consumer. NATS Streaming does not currently support batching at the API level, but it uses aggressive batching when replicating and persisting messages. In my experience, this makes about an order-of-magnitude improvement in throughput.

Finally, as already discussed earlier in the series, keeping disk access sequential and maximizing zero-copy reads makes a big difference as well.

There are a few things worth noting with respect to durability. Quorum is what guarantees durability of data. This comes “for free” with Raft due to the nature of that protocol. In Kafka, we need to do a bit of configuring to ensure this. Namely, we need to configure min.insync.replicas on the broker and acks on the producer. The former controls the minimum number of replicas that must acknowledge a write for it to be considered successful when a producer sets acks to “all.” The latter controls the number of acknowledgments the producer requires the leader to have received before considering a request complete. For example, with a topic that has a replication factor of three, min.insync.replicas needs to be set to two and acks set to “all.” This will, in effect, require a quorum of two replicas to process writes.
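The arithmetic behind that claim, as a trivial sketch (the function name here is ours, not a Kafka API):

```go
package main

import "fmt"

// With acks=all, a write succeeds only if at least min.insync.replicas
// members of the ISR have it. The number of broker failures a topic can
// absorb while still accepting writes is therefore RF - min.insync.replicas.
func writeFaultTolerance(replicationFactor, minInsyncReplicas int) int {
	return replicationFactor - minInsyncReplicas
}

func main() {
	// replication.factor=3, min.insync.replicas=2, acks=all:
	// writes require a quorum of two and tolerate one broker failure.
	fmt.Println(writeFaultTolerance(3, 2)) // 1
}
```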

Another caveat with Kafka is unclean leader elections. That is, if all replicas become unavailable, there are two options: choose the first replica to come back to life (not necessarily in the ISR) and elect this replica as leader (which could result in data loss) or wait for a replica in the ISR to come back to life and elect it as leader (which could result in prolonged unavailability). Initially, Kafka favored availability by default by choosing the first strategy. If you preferred consistency, you needed to set unclean.leader.election.enable to false. However, as of 0.11, unclean.leader.election.enable defaults to false, favoring consistency over availability.

Fundamentally, durability and consistency are at odds with availability. If there is no quorum, then no reads or writes can be accepted and the cluster is unavailable. This is the crux of the CAP theorem.

In part three of this series, we will discuss scaling message delivery in the distributed log.

From the Ground Up: Reasoning About Distributed Systems in the Real World

The rabbit hole is deep. Down and down it goes. Where it ends, nobody knows. But as we traverse it, patterns appear. They give us hope, they quell the fear.

Distributed systems literature is abundant, but as a practitioner, I often find it difficult to know where to start or how to synthesize this knowledge without a more formal background. This is a non-academic’s attempt to provide a line of thought for rationalizing design decisions. This piece doesn’t necessarily contribute any new ideas but rather tries to provide a holistic framework by studying some influential existing ones. It includes references which provide a good starting point for thinking about distributed systems. Specifically, we look at a few formal results and slightly less formal design principles to provide a basis from which we can argue about system design.

This is your last chance. After this, there is no turning back. I wish I could say there is no red-pill/blue-pill scenario at play here, but the world of distributed systems is complex. In order to make sense of it, we reason from the ground up while simultaneously stumbling down the deep and cavernous rabbit hole.

Guiding Principles

In order to reason about distributed system design, it’s important to lay out some guiding principles or theorems used to establish an argument. Perhaps the most fundamental of these is the Two Generals Problem, originally introduced by Akkoyunlu et al. in Some Constraints and Trade-offs in the Design of Network Communications and popularized by Jim Gray in Notes on Data Base Operating Systems in 1975 and 1978, respectively. The Two Generals Problem demonstrates that it’s impossible for two processes to agree on a decision over an unreliable network. It’s closely related to the binary consensus problem (“attack” or “don’t attack”) where the following conditions must hold:

  • Termination: all correct processes decide some value (liveness property).
  • Validity: if all correct processes decide v, then v must have been proposed by some correct process (non-triviality property).
  • Integrity: all correct processes decide at most one value v, and it must be the “right” value (safety property).
  • Agreement: all correct processes must agree on the same value (safety property).

It becomes quickly apparent that any useful distributed algorithm consists of some intersection of both liveness and safety properties. The problem becomes more complicated when we consider an asynchronous network with crash failures:

  • Asynchronous: messages may be delayed arbitrarily long but will eventually be delivered.
  • Crash failure: processes can halt indefinitely.

Considering this environment actually leads us to what is arguably one of the most important results in distributed systems theory: the FLP impossibility result introduced by Fischer, Lynch, and Paterson in their 1985 paper Impossibility of Distributed Consensus with One Faulty Process. This result shows that the agreement sought in the Two Generals Problem remains out of reach even with reliable message delivery once we allow asynchrony and crash failures. When we do not consider an upper bound on the time a process takes to complete its work and respond in a crash-failure model, it’s impossible to make the distinction between a process that is crashed and one that is taking a long time to respond. FLP shows there is no algorithm which deterministically solves the consensus problem in an asynchronous environment when it’s possible for at least one process to crash. Equivalently, we say it’s impossible to have a perfect failure detector in an asynchronous system with crash failures.

When talking about fault-tolerant systems, it’s also important to consider Byzantine faults, which are essentially arbitrary faults. These include, but are not limited to, attacks which might try to subvert the system. For example, a security attack might try to generate or falsify messages. The Byzantine Generals Problem is a generalized version of the Two Generals Problem which describes this fault model. Byzantine fault tolerance attempts to protect against these threats by detecting or masking a bounded number of Byzantine faults.

Why do we care about consensus? The reason is it’s central to so many important problems in system design. Leader election implements consensus allowing you to dynamically promote a coordinator to avoid single points of failure. Distributed databases implement consensus to ensure data consistency across nodes. Message queues implement consensus to provide transactional or ordered delivery. Distributed init systems implement consensus to coordinate processes. Consensus is fundamentally an important problem in distributed programming.

It has been shown time and time again that networks, whether local-area or wide-area, are often unreliable and largely asynchronous. As a result, these proofs impose real and significant challenges to system design.

The implications of these results are not simply academic: these impossibility results have motivated a proliferation of systems and designs offering a range of alternative guarantees in the event of network failures.

L. Peter Deutsch’s fallacies of distributed computing are a key jumping-off point in the theory of distributed systems. They are a set of incorrect assumptions which many new to the space frequently make, the first of which is “the network is reliable.”

  1. The network is reliable.
  2. Latency is zero.
  3. Bandwidth is infinite.
  4. The network is secure.
  5. Topology doesn’t change.
  6. There is one administrator.
  7. Transport cost is zero.
  8. The network is homogeneous.

The CAP theorem, while recently the subject of scrutiny and debate over whether it’s overstated or not, is a useful tool for establishing fundamental trade-offs in distributed systems and detecting vendor sleight of hand. Gilbert and Lynch’s Perspectives on the CAP Theorem lays out the intrinsic trade-off between safety and liveness in a fault-prone system, while Fox and Brewer’s Harvest, Yield, and Scalable Tolerant Systems characterizes it in a more pragmatic light. I will continue to say unequivocally that the CAP theorem is important within the field of distributed systems and of significance to system designers and practitioners.

A Renewed Hope

Taken at face value, the results detailed earlier would imply that many distributed algorithms, including those which implement linearizable operations, serializable transactions, and leader election, are a hopeless endeavor. Is it game over? Fortunately, no. Carefully designed distributed systems can maintain correctness without relying on pure coincidence.

First, it’s important to point out that the FLP result does not indicate consensus is unreachable, just that it’s not always reachable in bounded time. Second, the system model FLP uses is, in some ways, a pathological one. Synchronous systems place a known upper bound on message delivery between processes and on process computation. Asynchronous systems have no fixed upper bounds. In practice, systems tend to exhibit partial synchrony, which is described as one of two models by Dwork, Lynch, and Stockmeyer in Consensus in the Presence of Partial Synchrony. In the first model of partial synchrony, fixed bounds exist but they are not known a priori. In the second model, the bounds are known but are only guaranteed to hold starting at some unknown time T. The authors present fault-tolerant consensus protocols for both partial-synchrony models combined with various fault models.

Chandra and Toueg introduce the concept of unreliable failure detectors in Unreliable Failure Detectors for Reliable Distributed Systems. Each process has a local, external failure detector which can make mistakes. The detector monitors a subset of the processes in the system and maintains a list of those it suspects to have crashed. Failures are detected by simply pinging each process periodically and suspecting any process which doesn’t respond to the ping within twice the maximum round-trip time for any previous ping. The detector makes a mistake when it erroneously suspects a correct process, but it may later correct the mistake by removing the process from its list of suspects. The presence of failure detectors, even unreliable ones, makes consensus solvable in a slightly relaxed system model.
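A toy version of such a detector, sketched in Go, is shown below. It follows the ping-and-timeout scheme described above; the names and structure are illustrative only:

```go
package main

import (
	"fmt"
	"time"
)

// detector is a toy ping-based failure detector: it suspects any process
// that hasn't answered a ping within twice the maximum round-trip time
// observed so far, and removes the suspicion if a response later arrives
// (i.e., it is allowed to make mistakes).
type detector struct {
	maxRTT    time.Duration
	lastReply map[string]time.Time
	suspects  map[string]bool
}

func (d *detector) onReply(p string, rtt time.Duration, now time.Time) {
	if rtt > d.maxRTT {
		d.maxRTT = rtt
	}
	d.lastReply[p] = now
	delete(d.suspects, p) // correct an earlier mistake, if any
}

func (d *detector) tick(now time.Time) {
	for p, last := range d.lastReply {
		if now.Sub(last) > 2*d.maxRTT {
			d.suspects[p] = true
		}
	}
}

func main() {
	now := time.Now()
	d := &detector{lastReply: map[string]time.Time{}, suspects: map[string]bool{}}

	d.onReply("p1", 50*time.Millisecond, now)
	d.onReply("p2", 80*time.Millisecond, now)

	// p1 keeps responding; p2 goes quiet.
	d.onReply("p1", 60*time.Millisecond, now.Add(150*time.Millisecond))

	// After 2 * maxRTT (160ms) without a reply, p2 is suspected...
	d.tick(now.Add(200 * time.Millisecond))
	fmt.Println(d.suspects["p1"], d.suspects["p2"]) // false true

	// ...until a late reply arrives and the mistake is corrected.
	d.onReply("p2", 90*time.Millisecond, now.Add(300*time.Millisecond))
	fmt.Println(d.suspects["p2"]) // false
}
```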

While consensus ensures processes agree on a value, atomic broadcast ensures processes deliver the same messages in the same order. This same paper shows that the problems of consensus and atomic broadcast are reducible to each other, meaning they are equivalent. Thus, the FLP result and others apply equally to atomic broadcast, which is used in coordination services like Apache ZooKeeper.

In Introduction to Reliable and Secure Distributed Programming, Cachin, Guerraoui, and Rodrigues suggest most practical systems can be described as partially synchronous:

Generally, distributed systems appear to be synchronous. More precisely, for most systems that we know of, it is relatively easy to define physical time bounds that are respected most of the time. There are, however, periods where the timing assumptions do not hold, i.e., periods during which the system is asynchronous. These are periods where the network is overloaded, for instance, or some process has a shortage of memory that slows it down. Typically, the buffer that a process uses to store incoming and outgoing messages may overflow, and messages may thus get lost, violating the time bound on the delivery. The retransmission of the messages may help ensure the reliability of the communication links but introduce unpredictable delays. In this sense, practical systems are partially synchronous.

We capture partial synchrony by assuming timing assumptions only hold eventually without stating exactly when. Similarly, we call the system eventually synchronous. However, this does not guarantee the system is synchronous forever after a certain time, nor does it require the system to be initially asynchronous then after a period of time become synchronous. Instead it implies the system has periods of asynchrony which are not bounded, but there are periods where the system is synchronous long enough for an algorithm to do something useful or terminate. The key thing to remember with asynchronous systems is that they contain no timing assumptions.

Lastly, On the Minimal Synchronism Needed for Distributed Consensus by Dolev, Dwork, and Stockmeyer describes a consensus protocol as t-resilient if it operates correctly when at most t processes fail. In the paper, several critical system parameters and synchronicity conditions are identified, and it’s shown how varying them affects the t-resiliency of an algorithm. Consensus is shown to be provably possible for some models and impossible for others.

Fault-tolerant consensus is made possible by relying on quorums. The intuition is that as long as a majority of processes agree on every decision, there is at least one process which knows about the complete history in the presence of faults.

Deterministic consensus, and by extension a number of other useful algorithms, is impossible in certain system models, but we can model most real-world systems in a way that circumvents this. Nevertheless, it shows the inherent complexities involved with distributed systems and the rigor needed to solve certain problems.

Theory to Practice

What does all of this mean for us in practice? For starters, it means distributed systems are usually a harder problem than they let on. Unfortunately, this is often the cause of improperly documented trade-offs or, in many cases, data loss and safety violations. It also suggests we need to rethink the way we design systems by shifting the focus from system properties and guarantees to business rules and application invariants.

One of my favorite papers is End-To-End Arguments in System Design by Saltzer, Reed, and Clark. It’s an easy read, but it presents a compelling design principle for determining where to place functionality in a distributed system. The principal idea behind the end-to-end argument is that functions placed at a low level in a system may be redundant or of little value when compared to the cost of providing them at that low level. It follows that, in many situations, it makes more sense to flip guarantees “inside out,” pushing them outward rather than relying on subsystems, middleware, or low-level layers of the stack to maintain them.

To illustrate this, we consider the problem of “careful file transfer.” A file is stored by a file system on the disk of computer A, which is linked by a communication network to computer B. The goal is to move the file from computer A’s storage to computer B’s storage without damage and in the face of various failures along the way. The application in this case is the file-transfer program which relies on storage and network abstractions. We can enumerate just a few of the potential problems an application designer might be concerned with:

  1. The file, though originally written correctly onto the disk at host A, if read now may contain incorrect data, perhaps because of hardware faults in the disk storage system.
  2. The software of the file system, the file transfer program, or the data communication system might make a mistake in buffering and copying the data of the file, either at host A or host B.
  3. The hardware processor or its local memory might have a transient error while doing the buffering and copying, either at host A or host B.
  4. The communication system might drop or change the bits in a packet, or lose a packet or deliver a packet more than once.
  5. Either of the hosts may crash part way through the transaction after performing an unknown amount (perhaps all) of the transaction.

Many of these problems are Byzantine in nature. When we consider each threat one by one, it becomes abundantly clear that even if we place countermeasures in the low-level subsystems, there will still be checks required in the high-level application. For example, we might place checksums, retries, and sequencing of packets in the communication system to provide reliable data transmission, but this really only eliminates threat four. An end-to-end checksum and retry mechanism at the file-transfer level is needed to guard against the remaining threats.
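A minimal sketch of that end-to-end check, in Go with a simulated transport, might look like this. The point is that the checksum-and-retry loop lives at the file-transfer level and is agnostic to which lower layer introduced the corruption:

```go
package main

import (
	"crypto/sha256"
	"errors"
	"fmt"
)

// transfer simulates moving a file from host A to host B over an unreliable
// path. Whatever the lower layers do, the end-to-end check is the same:
// hash the file at the source, hash what actually landed at the destination,
// and retry the whole transfer until the two digests match.
func transfer(src []byte, send func([]byte) []byte, maxRetries int) ([]byte, error) {
	want := sha256.Sum256(src)
	for i := 0; i < maxRetries; i++ {
		received := send(src)
		if sha256.Sum256(received) == want {
			return received, nil // end-to-end check passed
		}
		// Mismatch: could be disk corruption, a buggy copy, or a bad link.
		// The application-level retry covers all of them.
	}
	return nil, errors.New("file transfer failed end-to-end check")
}

func main() {
	file := []byte("important data")

	flaky := true
	send := func(b []byte) []byte {
		if flaky { // first attempt silently corrupts a byte somewhere below us
			flaky = false
			c := append([]byte(nil), b...)
			c[0] ^= 0xff
			return c
		}
		return b
	}

	out, err := transfer(file, send, 3)
	fmt.Println(string(out), err) // important data <nil>
}
```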

Building reliability into the low level has a number of costs involved. It takes a non-trivial amount of effort to build it. It’s redundant: at best it reduces the frequency of application-level retries, a marginal performance gain, while adding overhead that every use of the subsystem pays. It also has no actual effect on correctness because correctness is determined and enforced by the end-to-end checksum and retries. The reliability and correctness of the communication system is of little importance, so going out of its way to ensure resiliency does not reduce any burden on the application. In fact, ensuring correctness by relying on the low level might be altogether impossible since threat number two requires writing correct programs, but not all programs involved may be written by the file-transfer application programmer.

Fundamentally, there are two problems with placing functionality at the lower level. First, the lower level is not aware of the application needs or semantics, which means logic placed there is often insufficient. This leads to duplication of logic as seen in the example earlier. Second, other applications which rely on the lower level pay the cost of the added functionality even when they don’t necessarily need it.

Saltzer, Reed, and Clark propose the end-to-end principle as a sort of “Occam’s razor” for system design, arguing that it helps guide the placement of functionality and organization of layers in a system.

Because the communication subsystem is frequently specified before applications that use the subsystem are known, the designer may be tempted to “help” the users by taking on more function than necessary. Awareness of end-to-end arguments can help to reduce such temptations.

However, it’s important to note that the end-to-end principle is not a panacea. Rather, it’s a guideline to help get designers to think about their solutions end to end, acknowledge their application requirements, and consider their failure modes. Ultimately, it provides a rationale for moving function upward in a layered system, closer to the application that uses the function, but there are always exceptions to the rule. Low-level mechanisms might be built as a performance optimization. Regardless, the end-to-end argument contends that lower levels should avoid taking on any more responsibility than necessary. The “lessons” section from Google’s Bigtable paper echoes some of these same sentiments:

Another lesson we learned is that it is important to delay adding new features until it is clear how the new features will be used. For example, we initially planned to support general-purpose transactions in our API. Because we did not have an immediate use for them, however, we did not implement them. Now that we have many real applications running on Bigtable, we have been able to examine their actual needs, and have discovered that most applications require only single-row transactions. Where people have requested distributed transactions, the most important use is for maintaining secondary indices, and we plan to add a specialized mechanism to satisfy this need. The new mechanism will be less general than distributed transactions, but will be more efficient (especially for updates that span hundreds of rows or more) and will also interact better with our scheme for optimistic cross-datacenter replication.

We’ll see the end-to-end argument as a common theme throughout the remainder of this piece.

Whose Guarantee Is It Anyway?

Generally, we rely on robust algorithms, transaction managers, and coordination services to maintain consistency and application correctness. The problem with these is twofold: they are often unreliable and they impose a massive performance bottleneck.

Distributed coordination algorithms are difficult to get right. Even tried-and-true protocols like two-phase commit are susceptible to crash failures and network partitions. Protocols which are more fault tolerant like Paxos and Raft generally don’t scale well beyond small clusters or across wide-area networks. Consensus systems like ZooKeeper own your availability, meaning if you depend on one and it goes down, you’re up a creek. Since quorums are often kept small for performance reasons, this might be less rare than you think.

Coordination systems become a fragile and complex piece of your infrastructure, which seems ironic considering they are usually employed to reduce fragility. On the other hand, message-oriented middleware largely uses coordination to provide developers with strong guarantees: exactly-once, ordered, transactional delivery and the like.

From transmission protocols to enterprise message brokers, relying on delivery guarantees is an anti-pattern in distributed system design. Delivery semantics are a tricky business. As such, when it comes to distributed messaging, what you want is often not what you need. It’s important to look at the trade-offs involved, how they impact system design (and UX!), and how we can cope with them to make better decisions.

Subtle and not-so-subtle failure modes make providing strong guarantees exceedingly difficult. In fact, some guarantees, like exactly-once delivery, aren’t even really possible to achieve when we consider things like the Two Generals Problem and the FLP result. When we try to provide semantics like guaranteed, exactly-once, and ordered message delivery, we usually end up with something that’s over-engineered, difficult to deploy and operate, fragile, and slow. What is the upside to all of this? Something that makes your life easier as a developer when things go perfectly well, but the reality is things don’t go perfectly well most of the time. Instead, you end up getting paged at 1 a.m. trying to figure out why RabbitMQ told your monitoring everything is awesome while proceeding to take a dump in your front yard.

If you have something that relies on these types of guarantees in production, know that this will happen to you at least once sooner or later (and probably much more than that). Eventually, a guarantee is going to break down. It might be inconsequential, it might not. Not only is this a precarious way to go about designing things, but if you operate at a large scale, care about throughput, or have sensitive SLAs, it’s probably a nonstarter.

The performance implications of distributed transactions are obvious. Coordination is expensive because processes can’t make progress independently, which in turn limits throughput, availability, and scalability. Peter Bailis gave an excellent talk called Silence is Golden: Coordination-Avoiding Systems Design which explains this in great detail and how coordination can be avoided. In it, he explains how distributed transactions can result in nearly a 400x decrease in throughput in certain situations.

Avoiding coordination enables infinite scale-out while drastically improving throughput and availability, but in some cases coordination is unavoidable. In Coordination Avoidance in Database Systems, Bailis et al. answer a key question: when is coordination necessary for correctness? They present a property, invariant confluence (I-confluence), which is necessary and sufficient for safe, coordination-free, available, and convergent execution. I-confluence essentially works by pushing invariants up into the business layer where we specify correctness in terms of application semantics rather than low-level database operations.

Without knowledge of what “correctness” means to your app (e.g., the invariants used in I-confluence), the best you can do to preserve correctness under a read/write model is serializability.

I-confluence can be determined given a set of transactions and a merge function used to reconcile divergent states. If I-confluence holds, there exists a coordination-free execution strategy that preserves invariants. If it doesn’t hold, no such strategy exists—coordination is required. I-confluence allows us to identify when we can and can’t give up coordination, and by pushing invariants up, we remove a lot of potential bottlenecks from areas which don’t require it.
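As a toy example of a coordination-free, invariant-preserving design, consider a grow-only counter: each replica increments only its own slot, merge takes the element-wise max, and the invariant “the total never decreases” survives any merge. The sketch below is purely illustrative; by contrast, an invariant like a non-negative balance with withdrawals is not I-confluent and does require coordination.

```go
package main

import "fmt"

// gcounter is a grow-only counter: each replica only increments its own
// slot, and merge takes the element-wise max. Divergent replicas can accept
// increments with no coordination and still converge on the same total.
type gcounter map[string]int

func (c gcounter) increment(replica string) { c[replica]++ }

func merge(a, b gcounter) gcounter {
	out := gcounter{}
	for r, n := range a {
		out[r] = n
	}
	for r, n := range b {
		if n > out[r] {
			out[r] = n
		}
	}
	return out
}

func (c gcounter) total() int {
	sum := 0
	for _, n := range c {
		sum += n
	}
	return sum
}

func main() {
	a, b := gcounter{}, gcounter{}
	a.increment("r1")
	a.increment("r1")
	b.increment("r2")

	m := merge(a, b)
	fmt.Println(m.total()) // 3: divergent states reconcile without coordination
}
```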

If we recall, “synchrony” within the context of distributed computing is really just making assumptions about time, so synchronization is basically two or more processes coordinating around time. As we saw, a system which performs no coordination will have optimal performance and availability since everyone can proceed independently. However, a distributed system which performs zero coordination isn’t particularly useful or possible as I-confluence shows. Christopher Meiklejohn’s Strange Loop talk, Distributed, Eventually Consistent Computations, provides an interesting take on coordination with the parable of the car. A car requires friction to drive, but that friction is limited to very small contact points. Any other friction on the car causes problems or inefficiencies. If we think about physical time as friction, we know we can’t eliminate it altogether because it’s essential to the problem, but we want to reduce the use of it in our systems as much as possible. We can typically avoid relying on physical time by instead using logical time, for example, with the use of Lamport clocks or other conflict-resolution techniques. Lamport’s Time, Clocks, and the Ordering of Events in a Distributed System is the classical introduction to this idea.
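A minimal Lamport clock looks something like the following sketch; the names are ours, not from any particular library.

```go
package main

import "fmt"

// lamportClock: increment on each local event, and on receipt of a message
// take the max of the local clock and the message timestamp, then increment.
// This yields a logical ordering of events without any physical time.
type lamportClock struct{ time uint64 }

func (c *lamportClock) localEvent() uint64 {
	c.time++
	return c.time
}

func (c *lamportClock) send() uint64 {
	c.time++
	return c.time // timestamp attached to the outgoing message
}

func (c *lamportClock) receive(msgTime uint64) uint64 {
	if msgTime > c.time {
		c.time = msgTime
	}
	c.time++
	return c.time
}

func main() {
	var a, b lamportClock

	a.localEvent()             // a: 1
	ts := a.send()             // a: 2, message carries timestamp 2
	b.localEvent()             // b: 1
	fmt.Println(b.receive(ts)) // b: max(1, 2) + 1 = 3
}
```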

Often, systems simply forgo coordination altogether for latency-sensitive operations, a perfectly reasonable thing to do provided the trade-off is explicit and well-documented. Sadly, this is frequently not the case. But we can do better. I-confluence provides a useful framework for avoiding coordination, but there’s a seemingly larger lesson to be learned here. What it really advocates is reexamining how we design systems, which seems in some ways to closely parallel our end-to-end argument.

When we think low level, we pay the upfront cost of entry—serializable transactions, linearizable reads and writes, coordination. This seems contradictory to the end-to-end principle. Our application doesn’t really care about atomicity or isolation levels or linearizability. It cares about two users sharing the same ID or two reservations booking the same room or a negative balance in a bank account, but the database doesn’t know that. Sometimes these rules don’t even require any expensive coordination.

If all we do is code our business rules and constraints into the language our infrastructure understands, we end up with a few problems. First, we have to know how to translate our application semantics into these low-level operations while avoiding any impedance mismatch. In the context of messaging, guaranteed delivery doesn’t really mean anything to our application which cares about what’s done with the messages. Second, we preclude ourselves from using a lot of generalized solutions and, in some cases, we end up having to engineer specialized ones ourselves. It’s not clear how well this scales in practice. Third, we pay a performance penalty that could otherwise be avoided (as I-confluence shows). Lastly, we put ourselves at the mercy of our infrastructure and hope it makes good on its promises—it often doesn’t.

Working on a messaging platform team, I’ve had countless conversations which resemble the following exchange:

Developer: “We need fast messaging.”
Me: “Is it okay if messages get dropped occasionally?”
Developer: “What? Of course not! We need it to be reliable.”
Me: “Okay, we’ll add a delivery ack, but what happens if your application crashes before it processes the message?”
Developer: “We’ll ack after processing.”
Me: “What happens if you crash after processing but before acking?”
Developer: “We’ll just retry.”
Me: “So duplicate delivery is okay?”
Developer: “Well, it should really be exactly-once.”
Me: “But you want it to be fast?”
Developer: “Yep. Oh, and it should maintain message ordering.”
Me: “Here’s TCP.”

If, instead, we reevaluate the interactions between our systems, their APIs, their semantics, and move some of that responsibility off of our infrastructure and onto our applications, then maybe we can start to build more robust, resilient, and performant systems. With messaging, does our infrastructure really need to enforce FIFO ordering? Preserving order with distributed messaging in the presence of failure while trying to simultaneously maintain high availability is difficult and expensive. Why rely on it when it can be avoided with commutativity? Likewise, transactional delivery requires coordination which is slow and brittle while still not providing application guarantees. Why rely on it when it can be avoided with idempotence and retries? If you need application-level guarantees, build them into the application level. The infrastructure can’t provide it.
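For instance, here is a hedged sketch of what “idempotence and retries” might look like at the application level: processing is keyed by a message ID so that at-least-once delivery, with redeliveries after lost acks, still applies each message’s effect once. A real implementation would persist the deduplication state.

```go
package main

import "fmt"

type message struct {
	ID   string
	Body string
}

// handler makes at-least-once delivery safe at the application level:
// processing is keyed by message ID, so redeliveries (from retries after
// a missed ack) are detected and dropped instead of applied twice.
type handler struct {
	processed map[string]bool
}

func (h *handler) handle(m message) {
	if h.processed[m.ID] {
		return // duplicate delivery: already applied, safely ignore
	}
	fmt.Println("applying", m.ID, m.Body) // the actual side effect
	h.processed[m.ID] = true
	// The ack is sent after processing; if we crash before acking, the
	// broker redelivers and the check above makes the retry a no-op.
}

func main() {
	h := &handler{processed: map[string]bool{}}
	m := message{ID: "42", Body: "charge card"}

	h.handle(m)
	h.handle(m) // applied once despite redelivery after a lost ack
}
```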

I really like Gregor Hohpe’s “Your Coffee Shop Doesn’t Use Two-Phase Commit” because it shows how simple solutions can be if we just model them off of the real world. It gives me hope we can design better systems, sometimes by just turning things on their head. There’s usually a reason things work the way they do, and it often doesn’t even involve the use of computers or complicated algorithms.

Rather than try to hide complexities by using flaky and heavy abstractions, we should engage directly by recognizing them in our design decisions and thinking end to end. It may be a long and winding path to distributed systems zen, but the best place to start is from the beginning.

I’d like to thank Tom Santero for reviewing an early draft of this writing. Any inaccuracies or opinions expressed are mine alone.

You Own Your Availability

There’s been a lot of discussion around “availability” lately. It’s often trumpeted with phrases like “you own your availability,” meaning there is no buck-passing when it comes to service uptime. The AWS outage earlier this week served as a stark reminder that, while owning your availability is a commendable ambition, for many it’s still largely owned by Amazon and the like.

In order to “own” your availability, it’s important to first understand what “availability” really means. Within the context of distributed-systems theory, availability is usually discussed in relation to the CAP theorem. Formally, CAP defines availability as a liveness property: “every request received by a non-failing node in the system must result in a response.” This is a weak definition for two reasons. First, the proviso “every request received by a non-failing node” means that a system in which all nodes have failed is trivially available.  Second, Gilbert and Lynch stipulate no upper bound on latency, only that operations eventually return a response. This means an operation could take weeks to complete and availability would not be violated.

Martin Kleppmann points out these issues in his recent paper “A Critique of the CAP Theorem.” I don’t think there is necessarily a problem with the formalizations made by CAP, just a matter of engineering practicality. Kleppmann’s critique recalls a pertinent quote from Leslie Lamport on the topic of liveness:

Liveness properties are inherently problematic. The question of whether a real system satisfies a liveness property is meaningless; it can be answered only by observing the system for an infinite length of time, and real systems don’t run forever. Liveness is always an approximation to the property we really care about. We want a program to terminate within 100 years, but proving that it does would require the addition of distracting timing assumptions. So, we prove the weaker condition that the program eventually terminates. This doesn’t prove that the program will terminate within our lifetimes, but it does demonstrate the absence of infinite loops.

Despite the pop culture surrounding it, CAP is not meant to neatly classify systems. It’s meant to serve as a jumping-off point from which we can reason from the ground up about distributed systems and the inherent limitations associated with them. It’s a reality check.

Practically speaking, availability is typically described in terms of “uptime” or the proportion of time which requests are successfully served. Brewer refers to this as “yield,” which is the probability of completing a request. This is the metric that is normally measured in “nines,” such as “five-nines availability.”

In the presence of faults there is typically a tradeoff between providing no answer (reducing yield) and providing an imperfect answer (maintaining yield, but reducing harvest).

However, this definition is only marginally more useful than CAP’s since it still doesn’t provide an upper bound on computation.

CAP is better used as a starting point for system design and understanding trade-offs than as a tool for reasoning about availability because it doesn’t really account for real availability. “Harvest” and “yield” show that availability is really a probabilistic property and that the trade with consistency is usually a gradient. But availability is much more nuanced than CAP’s “are we serving requests?” and harvest/yield’s “how many requests?” In practice, availability equates to SLAs. How many requests are we serving? At what rate? At what latency? At what percentiles? These things can’t really be formalized into a theorem like CAP because they are empirically observed, not properties of an algorithm.

Availability is specified by an SLA but observed by outside users. Unlike consistency, which is a property of the system and maintained by algorithm invariants, availability is determined by the client. For example, one user’s requests are served but another user’s are not. To the first user, the system is completely available.

To truly own your availability, you have to own every piece of infrastructure from the client to you, in addition to the infrastructure your system uses. Therefore, you can’t own your availability any more than you can own Comcast’s fiber or Verizon’s 4G network. This is obviously impractical, if not impossible, but it might also be taking “own your availability” a bit too literally.

What “you own your availability” actually means is “you own your decisions.” Plain and simple. You own the decision to use AWS. You own the decision to use DynamoDB. You own the decision to not use multiple vendors. Owning your availability means making informed decisions about technology and vendors. “What is the risk/reward for using this database?” “Does using a PaaS/IaaS incur vendor lock-in? What happens when that service goes down?” It also means making informed decisions about the business. “What is the cost of our providers not meeting their SLAs? Is it cost-effective to have redundant providers?”

An SLA is not an insurance policy or a hedge against the business impact of an outage, it’s merely a refund policy. Use them to set expectations and make intelligent decisions, but don’t bank the business on them. Availability is not a timeshare. It’s not at will. You can’t just pawn it off, just like you can’t redirect your tech support to Amazon or Google.

It’s impossible to own your availability because there are too many things left to probability, too many unknowns, and too many variables outside of our control. Own as much as you can predict, as much as you can control, and as much as you can afford. The rest comes down to making informed decisions, hoping for the best, and planning for the worst.

Service-Disoriented Architecture

“You can have a second computer once you’ve shown you know how to use the first one.” -Paul Barham

The first rule of distributed systems is don’t distribute your system until you have an observable reason to. Teams break this rule on the regular. People have been talking about service-oriented architecture for a long time, but only recently have microservices been receiving the hype.

The problem, as Martin Fowler observes, is that teams are becoming too eager to adopt a microservice architecture without first understanding the inherent overheads. A contributing factor, I think, is you only hear the success stories from companies who did it right, like Netflix. However, what folks often fail to realize is that these companies—in almost all cases—didn’t start out that way. There was a long and winding path which led them to where they are today. The inverse of this, which some refer to as microservice envy, is causing teams to rush into microservice hell. I call this service-disoriented architecture (or sometimes disservice-oriented architecture when the architecture is DOA).

The term “monolith” has a very negative connotation—unscalable, unmaintainable, unresilient. These things are not intrinsically tied to each other, however, and there’s no reason a single system can’t be modular, maintainable, and fault tolerant at reasonable scale. It’s just less sexy. Refactoring modular code is much easier than refactoring architecture, and refactoring across service boundaries is equally difficult. Fowler describes this as monolith-first, and I think it’s the right approach (with some exceptions, of course).

Don’t even consider microservices unless you have a system that’s too complex to manage as a monolith. The majority of software systems should be built as a single monolithic application. Do pay attention to good modularity within that monolith, but don’t try to separate it into separate services.

Service-oriented architecture is about organizational complexity and system complexity. If you have both, you have a case to distribute. If you have one of the two, you might have a case (although if you have organizational complexity without system complexity, you’ve probably scaled your organization improperly). If you have neither, you do not have a case to distribute. State, specifically distributed state, is hell, and some pundits argue SOA is satan—perhaps a necessary evil.

There are a lot of motivations for microservices: anti-fragility, fault tolerance, independent deployment and scaling, architectural abstraction, and technology isolation. When services are loosely coupled, the system as a whole tends to be less fragile. When instances are disposable and stateless, services tend to be more fault tolerant because we can spin them up and down, balance traffic, and failover. When responsibility is divided across domain boundaries, services can be independently developed, deployed, and scaled while allowing the right tools to be used for each.

We also need to acknowledge the disadvantages. Adopting a microservice architecture does not automatically buy you anti-fragility. Distributed systems are incredibly precarious. We have to be aware of things like asynchrony, network partitions, node failures, and the trade-off between availability and data consistency. We have to think about resiliency but also the business and UX implications. We have to consider the boundaries of distributed systems like CAP and exactly-once delivery.

When distributing, the emphasis should be on resilience engineering and adopting loosely coupled, stateless components—not microservices for microservices’ sake. We need to view eventual consistency as a tool, not a side effect. The problem I see is that teams often end up with what is essentially a complex, distributed monolith. Now you have two problems. If you’re building a microservice which doesn’t make sense outside the context of another system or isn’t useful on its own, stop and re-evaluate. If you’re designing something to be fast and correct, realize that distributing it will frequently take away both.

Like anti-fragility, microservices do not automatically buy you better maintainability or even scalability. Adopting them requires the proper infrastructure and organization to be in place. Without these, you are bound to fail. In theory, they are intended to increase development velocity, but in many cases the microservice premium ends up slowing it down while creating organizational dependencies and bottlenecks.

There are some key things which must be in place in order for a microservice architecture to be successful: a proper continuous-delivery pipeline, competent DevOps and Ops teams, and prudent service boundaries, to name a few. Good monitoring is essential. It’s also important we have a thorough testing and integration story. This isn’t even considering the fundamental development complexities associated with SOA mentioned earlier.

The better strategy is a bottom-up approach. Start with a monolith or small set of coarse-grained services and work your way up. Make sure you have the data model right. Break out new, finer-grained services as you need to and as you become more confident in your ability to maintain and deploy discrete services. It’s largely about organizational momentum. A young company jumping straight to a microservice architecture is like a golf cart getting on the freeway.

Microservices offer a number of advantages, but for many companies they are a bit of a Holy Grail. Developers are always looking for a silver bullet, but there is always a cost. What we need to do is minimize this cost, and with microservices, this typically means easing our way into them rather than diving into the deep end. Team autonomy and rapid iteration are noble goals, but if we’re not careful, we can end up impeding both. Microservices require organizational and system maturity. Otherwise, they end up being a premature architectural optimization with a lot of baggage. They end up creating a service-disoriented architecture.

Distributed Systems Are a UX Problem

Distributed systems are not strictly an engineering problem. It’s far too easy to write them off as a “backend” development concern, but the reality is there are implications at every point in the stack. The trade-offs we make lower in the stack in order to buy responsiveness bubble up to the top—so much so, in fact, that they almost always affect the application in some way. Distributed systems affect the user. We need to shift the focus from system properties and guarantees to business rules and application behavior. We need to understand the limitations and trade-offs at each level in the stack and why they exist. We need to assume failure and plan for recovery. We need to start thinking of distributed systems as a UX problem.

The Truth is Prohibitively Expensive

Stop relying on strong consistency. Coordination and distributed transactions are slow and inhibit availability. The cost of knowing the “truth” is prohibitively expensive for many applications. For that matter, what you think is the truth is likely just a partial or outdated version of it.

Instead, choose availability over consistency by making local decisions with the knowledge at hand and design the UX accordingly. By making this trade-off, we can dramatically improve the user’s experience—most of the time.

Failure Is an Option

There are a lot of problems with simultaneity in distributed computing. As Justin Sheehy describes it, there is no “now” when it comes to distributed systems—that article, by the way, is a must-read for every engineer, regardless of where they work in the stack.

While some things about computers are “virtual,” they still must operate in the physical world and cannot ignore the challenges of that world.

Even though computers operate in the real world, they are disconnected from it. Imagine an inventory system. It may place orders to its artificial heart’s desire, but if the warehouse burns down, there’s no fulfilling them. Even if the system is perfect, its state may be impossible. But the system is typically not perfect because the truth is prohibitively expensive. And not only do warehouses catch fire or forklifts break down, as rare as this may be, but computers fail and networks partition—and that’s far less rare.

The point is, stop trying to build perfect systems because one of two things will happen:

1. You have a false sense of security because you think the system is perfect, and it’s not.

or

2. You will never ship because perfection is out of reach or exorbitantly expensive.

Either case can be catastrophic, depending on the situation. With systems, failure is not only an option, it’s an inevitability, so let’s plan for it as such. We have a lot to gain by embracing failure. Eric Brewer articulated this idea in a recent interview:

So the general answer is you allow things to be inconsistent and then you find ways to compensate for mistakes, versus trying to prevent mistakes altogether. In fact, the financial system is actually not based on consistency, it’s based on auditing and compensation. They didn’t know anything about the CAP theorem, that was just the decision they made in figuring out what they wanted, and that’s actually, I think, the right decision.

We can look to ATMs, and banks in general, as the canonical example of how this works. When you withdraw money, the bank could choose to first coordinate on your account, calculating your available balance at that moment in time, before approving the withdrawal. But what happens when the ATM is temporarily disconnected from the bank? If the ATM refuses to dispense cash until it can verify the balance, the customer walks away and the bank loses out on revenue.

Instead, they make a calculated risk. They choose availability and compensate for the risk of overdraft with interest and charges. Likewise, banks use double-entry bookkeeping to provide an audit trail. Every credit has a corresponding debit. Mistakes happen—accounts are debited twice, an account is credited without another being debited—the failure modes are virtually endless. But we audit and compensate, detect and recover. Banks are loosely coupled systems. Accountants don’t use erasers. Why should programmers?
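The same idea maps naturally onto an append-only log. Here’s a small illustrative sketch in Go—the Ledger and Entry types are made up for this example—showing how a mistaken debit isn’t erased; a compensating credit is appended instead, and the audit trail stays intact:

```go
package main

import "fmt"

// Entry is an immutable ledger record. Corrections are made by appending
// compensating entries, never by editing or deleting history.
type Entry struct {
	Account string
	Amount  int // positive for a credit, negative for a debit
	Memo    string
}

// Ledger is an append-only list of entries, i.e. a log.
type Ledger struct {
	entries []Entry
}

func (l *Ledger) Append(e Entry) { l.entries = append(l.entries, e) }

// Balance replays the log to derive the current state of an account.
func (l *Ledger) Balance(account string) int {
	total := 0
	for _, e := range l.entries {
		if e.Account == account {
			total += e.Amount
		}
	}
	return total
}

func main() {
	var ledger Ledger
	ledger.Append(Entry{Account: "alice", Amount: -50, Memo: "withdrawal"})
	ledger.Append(Entry{Account: "alice", Amount: -50, Memo: "withdrawal (accidental duplicate)"})
	// The audit catches the double debit; we compensate with a credit
	// rather than erasing the erroneous entry.
	ledger.Append(Entry{Account: "alice", Amount: 50, Memo: "compensation for duplicate withdrawal"})
	fmt.Println("alice's balance:", ledger.Balance("alice")) // -50
}
```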

When you find yourself saying “this is important data or people’s money, it has to be correct,” consider how the problem was solved before computers. Building on Quicksand by Dave Campbell and Pat Helland is a great read on this topic:

Whenever the authors struggle with explaining how to implement loosely-coupled solutions, we look to how things were done before computers. In almost every case, we can find inspiration in paper forms, pneumatic tubes, and forms filed in triplicate.

Consider the lost request and its idempotent execution. In the past, a form would have multiple carbon copies with a printed serial number on top of them. When a purchase-order request was submitted, a copy was kept in the file of the submitter and placed in a folder with the expected date of the response. If the form and its work were not completed by the expected date, the submitter would initiate an inquiry and ask to locate the purchase-order form in question. Even if the work was lost, the purchase-order would be resubmitted without modification to ensure a lack of confusion in the processing of the work. You wouldn’t change the number of items being ordered as that may cause confusion. The unique serial number on the top would act as a mechanism to ensure the work was not performed twice.

Computers allow us to greatly improve the user experience, but many of the same fail-safes still exist, just slightly rethought.
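To make the analogy concrete, here’s a minimal sketch in Go of the same idea applied to software: a client-assigned serial number (an idempotency key) lets the processor detect a resubmitted purchase order and skip the duplicate work. The PurchaseOrder and OrderProcessor types are hypothetical, and the in-memory map stands in for whatever durable store a real system would use.

```go
package main

import (
	"fmt"
	"sync"
)

// PurchaseOrder carries a client-assigned serial number, playing the role
// of the printed serial number on the carbon-copy form.
type PurchaseOrder struct {
	SerialNumber string
	Item         string
	Quantity     int
}

// OrderProcessor remembers which serial numbers it has already handled so
// that a resubmitted order is acknowledged rather than executed twice.
type OrderProcessor struct {
	mu        sync.Mutex
	processed map[string]bool // a real system would persist this
}

func NewOrderProcessor() *OrderProcessor {
	return &OrderProcessor{processed: make(map[string]bool)}
}

// Submit executes the order at most once per serial number. Resubmitting
// the same form without modification is safe: the duplicate is detected.
func (p *OrderProcessor) Submit(po PurchaseOrder) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.processed[po.SerialNumber] {
		fmt.Printf("order %s already processed, ignoring duplicate\n", po.SerialNumber)
		return
	}
	p.processed[po.SerialNumber] = true
	fmt.Printf("fulfilling order %s: %d x %s\n", po.SerialNumber, po.Quantity, po.Item)
}

func main() {
	proc := NewOrderProcessor()
	po := PurchaseOrder{SerialNumber: "PO-1234", Item: "widget", Quantity: 10}
	proc.Submit(po) // original submission
	proc.Submit(po) // resubmitted after a suspected loss
}
```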

The idea of compensation is actually a common theme within distributed systems. The Saga pattern is a great example of this. Large-scale systems often have to coordinate resources across disparate services. Traditionally, we might solve this problem using distributed transactions like two-phase commit. The problem with this approach is that it doesn’t scale very well, it’s slow, and it’s not particularly fault tolerant. With 2PC, a failed coordinator can leave participants blocked holding locks, and even 3PC is still susceptible to network partitions.

Sagas split a long-lived transaction into individual, interleaved sub-transactions. Each sub-transaction in the sequence has a corresponding compensating transaction which reverses its effects. The compensating transactions must be idempotent so they can be safely retried. In the event of a partial execution, the compensating transactions are run and the Saga is effectively rolled back.

The commonly used example for Sagas is booking a trip. We need to ensure flight, car rental, and hotel are all booked or none are booked. If booking the flight fails, we cancel the hotel and car, etc. Sagas trade off atomicity for availability while still allowing us to manage failure, a common occurrence in distributed systems.
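As a rough illustration of the pattern, here’s a sketch in Go of what a trip-booking Saga might look like: each step pairs an action with an idempotent compensating transaction, and a failure part-way through runs the completed steps’ compensations in reverse order. The booking and cancellation functions are placeholders, not a real API.

```go
package main

import (
	"errors"
	"fmt"
)

// step pairs a sub-transaction with the compensating transaction that
// reverses it. Compensations should be idempotent so they can be retried.
type step struct {
	name       string
	action     func() error
	compensate func() error
}

// runSaga executes the steps in order. If any step fails, the compensations
// for the steps that already succeeded run in reverse order, effectively
// rolling the Saga back.
func runSaga(steps []step) error {
	var done []step
	for _, s := range steps {
		if err := s.action(); err != nil {
			fmt.Printf("step %s failed: %v, compensating\n", s.name, err)
			for i := len(done) - 1; i >= 0; i-- {
				if cerr := done[i].compensate(); cerr != nil {
					// In practice the compensation would be retried, which is
					// why compensating transactions must be idempotent.
					fmt.Printf("compensation for %s failed: %v\n", done[i].name, cerr)
				}
			}
			return err
		}
		done = append(done, s)
	}
	return nil
}

func main() {
	// Hypothetical booking steps; the hotel booking is made to fail so the
	// flight reservation gets compensated (cancelled).
	trip := []step{
		{
			name:       "flight",
			action:     func() error { fmt.Println("booking flight"); return nil },
			compensate: func() error { fmt.Println("cancelling flight"); return nil },
		},
		{
			name:       "hotel",
			action:     func() error { return errors.New("no rooms available") },
			compensate: func() error { fmt.Println("cancelling hotel"); return nil },
		},
		{
			name:       "car",
			action:     func() error { fmt.Println("booking car"); return nil },
			compensate: func() error { fmt.Println("cancelling car"); return nil },
		},
	}

	if err := runSaga(trip); err != nil {
		fmt.Println("trip booking rolled back:", err)
	}
}
```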

Compensation has a lot of applications as a UX principle because it’s really the only way to build loosely coupled, highly available services.

Calculated Recovery

Pat Helland describes computing as nothing more than “memories, guesses, and apologies.” Computers always have partial knowledge. Either there is a disconnect with the real world (warehouse is on fire) or there is a disconnect between systems (System A sold a Foo Widget but, unbeknownst to it, System B just sold the last one in inventory—oops!). Systems don’t make decisions, they make guesses. The guess might be good or it might be bad, but rarely is there certainty. We can wait to collect as much information as possible before making a guess, but it means progress can’t be made until the system is confident enough to do so.

Computers have memory. This means they remember facts they have learned and guesses they have made. Memories help systems make better guesses in the future, and they can share those memories with other systems to help in their guesses. We can store more memories at the cost of more money, and we can survey other systems’ memories at the cost of more latency.

It is a business decision how much money, latency, and energy should be spent on reducing forgetfulness. To make this decision, the costs of the increased probability of remembering should be weighed against the costs of occasionally forgetting stuff.

Generally speaking, the more forgetfulness we can tolerate, the more responsive our systems will be, provided we know how to handle the situations where something is forgotten.

Sooner or later, a system guesses wrong. It sucks. It might mean we lose out on revenue; the business isn’t happy. It might mean the user loses out on what they want; the customer isn’t happy. But we calculate the impact of these wrong guesses, we determine when the trade-offs do and don’t make sense, we compensate, and—when shit hits the fan—we apologize.

Business realities force apologies. To cope with these difficult realities, we need code and, frequently, we need human beings to apologize. It is essential that businesses have both code and people to manage these apologies.

Distributed systems are as much about failure modes and recovery as they are about being operationally correct. It’s critical that we can recover gracefully when something goes wrong, and often that affects the UX.

We could choose to spend extraordinary amounts of money and countless man-hours laboring over a system which provides the reliability we want. We could construct a data center. We could deploy big, expensive machines. We could install redundant fiber and switches. We could drudge over infallible code. Or we could stop, think for a moment, and realize maybe “sorry” is a more effective alternative. Knowing when to make that distinction can be the difference between a successful business and a failed one. The implications of distributed systems may be wider reaching than you thought.