FIFO, Exactly-Once, and Other Costs

There’s been a lot of discussion about exactly-once semantics lately, sparked by the recent announcement of support for it in Kafka 0.11. I’ve already written at length about strong guarantees in messaging.

My former coworker Kevin Sookocheff recently made a post about ordered and exactly-once message delivery as it relates to Amazon SQS. It does a good job of illustrating what the trade-offs are, and I want to drive home some points.

In the article, Kevin shows how FIFO delivery is really only meaningful when you have one single-threaded publisher and one single-threaded receiver. Amazon’s FIFO queues allow you to control how restrictive this requirement is by applying ordering on a per-group basis. In other words, we can improve throughput if we can partition work into different ordered groups rather than a single totally ordered group. However, FIFO still effectively limits throughput on a group to a single publisher and single subscriber. If there are multiple publishers, they have to coordinate to ensure ordering is preserved with respect to our application’s semantics. On the subscriber side, things are simpler because SQS will only deliver messages in a group one at a time in order amongst subscribers.

Amazon’s FIFO queues also have an exactly-once processing feature which deduplicates messages within a five-minute window. Note, however, that there are some caveats with this, the obvious one being duplicate delivery outside of the five-minute window. A mission-critical system would have to be designed to account for this possibility. My argument here is if you still have to account for it, what’s the point unless the cost of detecting duplicates is prohibitively expensive? But to be fair, five minutes probably reduces the likelihood enough to the point that it’s useful and in those rare cases where it fails, the duplicate is acceptable.

The more interesting caveat is that FIFO queues do not guarantee exactly-once delivery to consumers (which, as we know, is impossible). Rather, they offer exactly-once processing by guaranteeing that once a message has successfully been acknowledged as processed, it won’t be delivered again. It’s up to applications to ack appropriately. When a message is delivered to a consumer, it remains in the queue until it’s acked. The visibility timeout prevents other consumers from processing it. With FIFO queues, this also means head-of-line blocking for other messages in the same group.

Now, let’s assume a subscriber receives a batch of messages from the queue, processes them—perhaps by storing some results to a database—and then sends an acknowledgement back to SQS which removes them from the queue. It’s entirely possible that during that process step a delay happens—a prolonged GC pause, crash, network delay, whatever. When this happens, the visibility timeout expires and the messages are redelivered and, potentially, reprocessed. What has to happen here is essentially cooperation between the queue and processing step. We might do this by using a database transaction to atomically process and acknowledge the messages. An alternative, yet similar, approach might be to use a write-ahead-log-like strategy whereby the consuming system reads messages from SQS and transactionally stores them in a database for future processing. Once the messages have been committed, the consumer deletes the messages from SQS. In either of these approaches, we’re basically shifting the onus of exactly-once processing onto an ACID-compliant relational database.

Note that this is really how Kafka achieves its exactly-once semantics. It requires end-to-end cooperation for exactly-once to work. State changes in your application need to be committed transactionally with your Kafka offsets.

As Kevin points out, FIFO SQS queues offer exactly-once processing only if 1) publishers never publish duplicate messages wider than five minutes apart and 2) consumers never fail to delete messages they have processed from the queue. Solving either of these problems probably requires some kind of coordination between the application and queue, likely in the form of a database transaction. And if you’re using a database either as the message source, sink, or both, what are exactly-once FIFO queues actually buying you? You’re paying a seemingly large cost in throughput for little perceived value. Your messages are already going through some sort of transactional boundary that provides ordering and uniqueness.

Where I see FIFO and exactly-once semantics being useful is when talking to systems which cannot cooperate with the end-to-end transaction. This might be a legacy service or a system with side effects, such as sending an email. Often in the case of these “distributed workflows”, latency is a lower priority and humans can be involved in various steps. Other use cases might be scheduled integrations with legacy batch processes where throughput is known a priori. These can simply be re-run when errors occur.

When people describe a messaging system with FIFO and exactly-once semantics, they’re usually providing a poor description of a relational database with support for ACID transactions. Providing these semantics in a messaging system likely still involves database transactions, it’s just more complicated. It turns out relational databases are really good at ensuring invariants like exactly-once.

I’ve picked on Kafka a bit in the past, especially with the exactly-once announcement, but my issue is not with Kafka itself. Kafka is a fantastic technology. It’s well-architected, battle-tested, and the team behind it is talented and knows the space well. My issue is more with some of the intangible costs associated with it. The same goes for similar systems (like exactly-once FIFO SQS queues). Beyond just the operational complexity (which Confluent is attempting to tackle with its Kafka-as-a-service), you have to get developer understanding. This is harder than it sounds in any modestly-sized organization. That’s not to say that developers are dumb or incapable of understanding, but the fact is your average developer is simply not thinking about all of the edge cases brought on by operating distributed systems at scale. They see “exactly-once FIFO queues” in SQS or “exactly-once delivery” in Kafka and take it at face value. They don’t read beyond the headline. They don’t look for the caveats. That’s why I took issue with how Kafka claimed to do the impossible with exactly-once delivery when it’s really exactly-once processing or, as I’ve come to call it, “atomic processing.” Henry Robinson put it best when talking about the Kafka announcement:

If I were to rewrite the article, I’d structure it thus: “exactly-once looks like atomic broadcast. Atomic broadcast is impossible. Here’s how exactly-once might fail, and here’s why we think you shouldn’t be worried about it.” That’s a harder argument for users to swallow…

Basically “exactly-once” markets better. It’s something developers can latch onto, but it’s also misleading. I know it’s only a matter of time before people start linking me to the Confluent post saying, “see, exactly-once is easy!” But this is just pain deferral. On the contrary, exactly-once semantics require careful construction of your application, assume a closed, transactional world, and do not support the case where I think people want exactly-once the most: side effects.

Interestingly, one of my chief concerns about Kafka’s implementation was what the difficulty of ensuring end-to-end cooperation would be in practice. Side effects into downstream systems with no support for idempotency or transactions could make it difficult. Jay’s counterpoint to this was that the majority of users are using good old-fashioned relational databases, so all you really need to do is commit your offsets and state changes together. It’s not trivial, but it’s not that much harder than avoiding partial updates on failure if you’re updating multiple tables. This brings us back to two of the original points of contention: why not merely use the database for exactly-once in the first place and what about legacy systems?

That’s not to say exactly-once semantics, as offered in systems like SQS and Kafka, are not useful. I think we just need to be more conscientious of the other costs and encourage developers to more deeply understand the solution space—too much sprinkling on of Kafka or exactly-once or FIFO and not enough thinking about the actual business problem. Too much prescribing of solutions and not enough describing of problems.

My thanks to Kevin Sookocheff and Beau Lyddon for reviewing this post.

CAP and the Illusion of Choice

The CAP theorem is widely discussed and often misunderstood within the world of distributed systems. It states that any networked, shared-data system can, at most, guarantee two of three properties: consistency, availability, and partition tolerance. I won’t go into detail on CAP since the literature is abundant, but the notion of “two of three”—while conceptually accessible—is utterly misleading. Brewer has indicated this, echoed by many more, but there still seems to be a lot of confusion when the topic is brought up. The bottom line is you can’t sacrifice partition tolerance, but it seems CAP is a bit more nuanced than that.

On the surface, CAP presents three categories of systems. CA implies one which maintains consistency and availability given a perfectly reliable network. CP provides consistency and partition tolerance at the expense of availability, and AP gives us availability and partition tolerance without linearizability. Clearly, CA suggests that the system guarantees consistency and availability only when there are no network partitions. However, to say that there will never be network partitions is blatantly dishonest. This is where the source of much confusion lies.

Partitions happen. They happen for countless reasons. Switches fail, NICs fail, link layers fail, servers fail, processes fail. Partitions happen even when systems don’t fail due to GC pauses or prolonged I/O latency for example. Let’s accept this as fact and move on. What this means is that a “CA” system is CA only until it’s not. Once that partition happens, all your assumptions and all your guarantees hit the fan in spectacular fashion. Where does this leave us?

At its core, CAP is about trade-offs, but it’s an exclusion principle. It tells us what our systems cannot do given the nature of reality. The distinction here is that not all systems fit nicely into these archetypes. If Jepsen has taught us anything, it’s that the majority of systems don’t fit into any of these categories, even when the designers state otherwise. CAP isn’t as black and white as people paint it.

There’s a really nice series on CAP written recently by Nicolas Liochon. It does an excellent job of explaining the terminology (far better than I could), which is often overloaded and misused, and it makes some interesting points. Nicolas suggests that CA should really be thought of as a specification for an operating range, while CP and AP are descriptions of behavior. I would tend to agree, but my concern is that this eschews the trade-off that must be made.

We know that we cannot avoid network partition. What if we specify our application like this: “this application does not handle network partition. If it happens, the application will be partly unavailable, the data may be corrupted, and you may have to fix the data manually.” In other words, we’re really asking to be CA here, but if a partition occurs we may be CP, or, if we are unlucky, both not available and not consistent.

As an operating range, CA basically means when a partition occurs, the system throws up its hands and says, “welp, see ya later!” If we specify that the system does not work well under network partitions, we’re saying partitions are outside its operating range. What good is a specification for a spaceship designed to fly the upper atmosphere of planet Terah when we’re down here on Earth? We live in a world where partitions are the norm, so surely we need to include them in our operating range. CA does specify an operating range, but it’s not one you can put in an SLA and hand to a customer. Colloquially, it’s just a mode of “undefined behavior”—the system is consistent and available—until it’s not.

CAP isn’t a perfect metaphor, but in my mind, it does a decent job of highlighting the fundamental trade-offs involved in building distributed systems. Either we have linearizable writes or we don’t. If we do, we can’t guarantee availability. It’s true that CAP seems to imply a binary choice between consistency and availability in the face of partitions. In fact, it’s not a binary choice. You have AP, CP, or None of the Above. The problem with None of the Above is that it’s difficult to reason about and even more difficult to define. Ultimately, it ends up being more an illusion of choice since we cannot sacrifice partition tolerance.

Not Invented Here

Engineers love engineering things. The reason is self-evident (and maybe self-fulfilling—why else would you be an engineer?). We like to think we’re pretty good at solving problems. Unfortunately, this mindset can, on occasion, yield undesirable consequences which might not be immediately apparent but all the while damaging.

Developers are all in tune with the idea of “don’t reinvent the wheel,” but it seems to be eschewed sometimes, deliberately or otherwise. People don’t generally write their own merge sort, so why would they write their own consensus protocol? Anecdotally speaking, they do.

Not-Invented-Here Syndrome is a very real thing. In many cases, consciously or not, it’s a cultural problem. In others, it’s an engineering one. Camille Fournier’s blog post on ZooKeeper helps to illustrate this point and provide some context. In it, she describes why some distributed systems choose to rely on external services, such as ZooKeeper, for distributed coordination, while others build in their own coordination logic.

We draw a parallel between distributed systems and traditional RDBMSs, which typically implement their own file system and other low-level facilities. Why? Because it’s their competitive advantage. SQL databases sell because they offer finely tuned performance, and in order to do that, they need to control these things that the OS otherwise provides. Distributed databases like Riak sell because they own the coordination logic, which helps promote their competitive advantage. This follows what Joel Spolsky says about NIH Syndrome in that “if it’s a core business function—do it yourself, no matter what.”

If you’re developing a computer game where the plot is your competitive advantage, it’s OK to use a third party 3D library. But if cool 3D effects are going to be your distinguishing feature, you had better roll your own.

This makes a lot of sense. My sorting algorithm is unlikely to provide me with a competitive edge, but something else might, even if it’s not particularly novel.

So in some situations, homegrown is justifiable, but that’s not always the case. Redis’ competitive advantage is its predictably low latencies and data structures. Does it make sense for it to implement its own clustering and leader election protocols? Maybe, but this is where NIH can bite you. If what you’re doing is important and there’s precedent, lean on existing research and solutions. Most would argue write safety is important, and there is certainly precedent for leader election. Why not leverage that work? Things like Raft, Paxos, and Zab provide solutions which are proven using formal methods and are peer reviewed. That doesn’t mean new solutions can’t be developed, but they generally require model checking and further scrutiny to ensure correctness. Otherwise, you’ll inevitably run into problems. Implementing our own solutions can provide valuable insight, but leave them at home if they’re not rigorously approached. Rolling your own and calling it “good enough” is dishonest to your users if it’s not properly communicated.

Elasticsearch is another interesting case to look at. You might say Elasticsearch’s competitive advantage is its full-text search engine, but it’s not. Like Solr, it’s built on Lucene. Elasticsearch was designed from the ground-up to be distributed. This is what gives it a leg up over Solr and other similar search servers where horizontal scaling and fault tolerance were essentially tacked on. In a way, this resembles what happened with Redis, where failover and clustering were introduced as an afterthought. However, unlike Redis, which chose to implement its own failover coordination and cluster-membership protocol, Solr opted to use ZooKeeper as an external coordinator.

We see that Elasticsearch’s core advantage is its distributed nature. Following that notion, it makes sense for it to own that coordination, which is why its designers chose to implement their own internal cluster membership, ZenDisco. But it turns out writing cluster-membership protocols is really fucking hard, and unless you’ve written proofs for it, you probably shouldn’t do it at all. The analogy here would be writing your own encryption algorithm—there’s tons of institutional knowledge which has laid the groundwork for solutions which are well-researched and well-understood. That knowledge should be embraced in situations like this.

I don’t mean to pick on Redis and Elasticsearch. They’re both excellent systems, but they serve as good examples for this discussion. The problem is that users of these systems tend to overlook the issues exposed by this mentality. Frankly, few people would know problems exist unless they are clearly documented by vendors (and not sales people) and even then, how many people actually read the docs cover-to-cover? It’s essential we know a system’s shortcomings and edge cases so we can recognize which situations to apply it and, more important, which we should not.

You don’t have to rely on an existing third-party library or service. Believe it or not, this isn’t a sales pitch for ZooKeeper. If it’s a core business function, it probably makes sense to build it yourself as Joel describes. What doesn’t make sense, however, is to build out whatever that is without being cognizant of conventional wisdom. I’m amazed at how often people are willing to throw away institutional knowledge, either because they don’t seek it out or they think they can do better (without formal verification). If I have seen further, it is by standing on the shoulders of giants.

Scaling Shared Data in Distributed Systems

Sharing mutable data at large scale is an exceedingly difficult problem. In their seminal paper CRDTs: Consistency without concurrency control, Shapiro et al. describe why the CAP theorem demands a give and take between scalability and consistency. In general, CAP requires us to choose between CP and AP. The former requires serializing every write, which doesn’t scale beyond a small cluster. The latter ensures scalability by giving up consistency.

Sharing Data in Centralized Systems

We tend to prefer weaker consistency models because they mean lower latency and higher availability. To highlight this point, consider the fact that the memory models for most programming languages are not serializable by default. More concisely, programs with shared memory are not inherently thread-safe. This is a conscious design decision because enforcing memory serializability incurs a significant latency penalty. Instead, programming languages require explicit memory barriers which can be used around the critical sections which need this property.

For example, the Java memory model uses within-thread as-if-serial semantics. This means the execution of a thread in isolation, regardless of runtime optimizations, is guaranteed to be the same as it would have been had all statements been run in program order. The implication of as-if-serial semantics is that it gives up consistency—different threads can and will have different views of the data. Java requires the use of memory barriers, either through explicit locking or the volatile keyword, in order to establish a happens-before relationship between statements in different threads.

This can be thought of as scaling shared data! We have multiple threads (systems) accessing the same data. While not distributed, many of the same ideas apply. Consistency, by definition, requires linearizability. In multi-threaded programs, we achieve this with mutexes. In distributed systems, we use transactions and distributed locking. Intuitively, both involve performance trade-offs.

Sharing Data in Distributed Systems

Consider a shared, global counter used to measure ad impressions on a website accessed by millions of users around the world.

shared_data

Traditionally, we might implement this using transactions—get the value, increment it by one, then save it back atomically. The problem is transactions will not scale. In order to guarantee consistency, they must be serialized. This results in high latency and low availability if a lot of writes are occurring. We lose some of the key advantages of distributed systems: parallel computation and availability.

So CAP makes it difficult to scale mutable, shared data. How do we do it then? There are several different strategies, each with their own pros and cons.

Immutable Data

Scaling shared read-only data is easy using replication techniques. This means the simplest solution for sharing data in a distributed system is to use immutable data. If we don’t have to worry about writes, then scaling is trivial. Unfortunately, this isn’t always possible, but if your use case allows for it, it’s the best option.

Last-Write Wins

From a set of conflicting writes, LWW selects the one with the most recent timestamp. Clock drift happens, so LWW will inevitably lead to data loss with enough concurrent writes. It’s critical to accept this reality, but it’s often acceptable for some use cases. LWW trades consistency for availability.

Application-Level Conflict Resolution

Often times, the best way to ensure safety is by resolving write conflicts at the application level. When there are conflicting writes on a piece of data, applications can apply business rules to determine the canonical update. An example of this is Riak’s application-side conflict resolution strategy.

Causal Ordering

Rather than relying on LWW, which has a high probability of data loss, we can use the causal relationships between writes in order to determine which one to apply. Unlike timestamps, which attempt to provide a total order, causal ordering establishes a partial order. We can approximate a causal ordering by using techniques like Lamport timestamps or vector clocks. By storing a causal history with each write and reading that history before each write, we can make informed decisions on the correctness of updates. The trade-off here is the added overhead of storing this additional metadata and the extra round trip.

Distributed Data Types

CRDTs, or convergent/commutative replicated data types, are the new, up-and-coming solution for scaling shared data, but they aren’t at all new. In fact, the theory behind CRDTs has been in use for hundreds of years. CRDTs are grounded in mathematics. Operations or updates on a CRDT always converge. Because the operations must be commutative, associative, and idempotent, they can be applied in any order and the outcome will always be the same. This means we don’t care about causal ordering—it doesn’t matter.

CRDTs are generally modeled after common data structures like sets, maps, lists, and counters, just in a distributed sense. What they provide us are highly available, eventually consistent data structures in which we don’t have to worry about write coordination.

Aside from the operation requirements, the other drawback of CRDTs is that they require knowledge of all clients. Each client has a replica of the CRDT, so the global state is determined by merging them. And although CRDTs can be applied to a wide variety of use cases, they typically require some interpretation and specialization of common data structures. These interpretations tend to be more limited in capability.

In Summary

Scaling mutable data is hard. On the other hand, scaling immutable data is easy, so if you can get away with it, do it. There are a number of ways to approach the problem, but as with anything, it all comes down to your use case. The solutions are all about trade-offs—namely the trade-off between consistency and availability. Use weakly consistent models when you can because they afford you high availability and low latency, and rely on stronger models only when absolutely necessary. Do what makes sense for your system.

Understanding Consensus

A classical problem presented within the field of distributed systems is the Byzantine Generals Problem. In it, we observe two allied armies positioned on either side of a valley. Within the valley is a fortified city. Each army has a general with one acting as commander. Both armies must attack at the same time or face defeat by the city’s defenders. In order to come to an agreement on when to attack, messengers must be sent through the valley, risking capture by the city’s patrols. Consider the diagram below illustrating this problem.

byzantine_generals

In the above scenario, Army A has sent a messenger to Army B with a message saying “Attack at 0700.” Army B receives this message and dispatches a messenger carrying an acknowledgement of the attack plans; however, our ill-fated messenger has been intercepted by the city’s defenders.

How do our armies come to an agreement on when to attack? Perhaps Army A sends 100 messengers and attacks regardless. Unfortunately, if all of the messengers are captured, this would result in a swift defeat because A would attack without B. What if, instead, A sends 100 messengers, waits for acknowledgements of those messages, and only attacks if it reaches a certain level of confidence, say receiving 75 or more confirmations? Yet again, this could very well end in defeat, this time with B attacking without A.

We also need to bear in mind that sending messages has a certain amount of overhead. We can’t, in good conscience, send a million messengers to their potential demise. Or maybe we can, but it’s more than the number of soldiers in our army.

In fact, we can’t reliably make a decision. It’s provenly impossible. In the face of a Byzantine failure, it becomes even more complicated by the possibility of traitors or forged messages.

Now replace two generals with N generals. Coming to a perfectly reliable agreement between two generals was already impossible but becomes dramatically more complicated. It’s a problem more commonly referred to as distributed consensus, and it’s the focus of an army of researchers.

The problem of consensus is blissfully simple, but the solution is far from trivial. Consensus is the basis of distributed coordination services, locking protocols, and databases. A monolithic system (think a MySQL server) can enforce ACID constraints with consistent reads but exhibits generally poor availability and fault tolerance. The original Google App Engine datastore relied on a master/slave architecture where a single data center held the primary copy of data which was replicated to backup sites asynchronously. This offered applications strong consistency and low latency with the implied trade-off of availability. The health of an application was directly tied to the health of a data center. Beyond transient losses, it also meant periods of planned unavailability and read-only access while Google performed data center maintenance. App Engine has since transitioned to a high-replication datastore which relies on distributed consensus to replicate data across sites. This allows the datastore to continue operating in the presence of failures and at greater availability. In agreement with CAP, this naturally means higher latency on writes.

There are a number of solutions to distributed consensus, but most of them tend to be pretty characteristic of each other. We will look at some of these solutions, including multi-phase commit and state-replication approaches.

Two-Phase Commit

Two-phase commit (2PC) is the simplest multi-phase commit protocol. In two-phase commit, all transactions go through a coordinator who is responsible for ensuring a transaction occurs across one or more remote sites (cohorts).

2pc

When the coordinator receives a request, it asks each of its cohorts to vote yes or no. During this phase, each cohort performs the transaction up to the point of committing it. The coordinator then waits for all votes. If the vote is unanimously “yes,” it sends a message to its cohorts to commit the transaction. If one or more vote is “no,” a message is sent to rollback. The cohorts then acknowledge whether the transaction was committed or rolled back and the process is complete.

Two-phase commit is a blocking protocol. The coordinator blocks waiting for votes from its cohorts, and cohorts block waiting for a commit/rollback message from the coordinator. Unfortunately, this means 2PC can, in some circumstances, result in a deadlock, e.g. the coordinator dies while cohorts wait or a cohort dies while the coordinator waits. Another problematic scenario is when a coordinator and cohort simultaneously fail. Even if another coordinator takes its place, it won’t be able to determine whether to commit or rollback.

Three-Phase Commit

Three-phase commit (3PC) is designed to solve the problems identified in two-phase by implementing a non-blocking protocol with an added “prepare” phase. Like 2PC, it relies on a coordinator which relays messages to its cohorts.

3pc

Unlike 2PC, cohorts do not execute a transaction during the voting phase. Rather, they simply indicate if they are prepared to perform the transaction. If cohorts timeout during this phase or there is one or more “no” vote, the transaction is aborted. If the vote is unanimously “yes,” the coordinator moves on to the “prepare” phase, sending a message to its cohorts to acknowledge the transaction will be committed. Again, if an ack times out, the transaction is aborted. Once all cohorts have acknowledged the commit, we are guaranteed to be in a state where all cohorts have agreed to commit. At this point, if the commit message from the coordinator is not received in the third phase, the cohort will go ahead and commit anyway. This solves the deadlocking problems described earlier. However, 3PC is still susceptible to network partitions. If a partition occurs, the coordinator will timeout and progress will not be made.

State Replication

Protocols like Raft, Paxos, and Zab are popular and widely used solutions to the problem of distributed consensus. These implement state replication or primary-backup using leaders, quorums, and replicas of operation logs or incremental delta states.

consensus quorum

These protocols work by electing a leader (coordinator). Like multi-phase commit, all changes must go through that leader, who then broadcasts the changes to the group. Changes occur by appending a log entry, and each node has its own log replica. Where multi-phase commit falls down in the face of network partitions, these protocols are able to continue working by relying on a quorum (majority). The leader commits the change once the quorum has acknowledged it.

The use of quorums provide partition tolerance by fencing minority partitions while the majority continues to operate. This is the pessimistic approach to solving split-brain, so it comes with an inherent availability trade-off. This problem is mitigated by the fact that each node hosts a replicated state machine which can be rebuilt or reconciled once the partition is healed.

Google relies on Paxos for its high-replication datastore in App Engine as well as its Chubby lock service. The distributed key-value store etcd uses Raft to manage highly available replicated logs. Zab, which differentiates itself from the former by implementing a primary-backup protocol, was designed for the ZooKeeper coordination service. In general, there are several different implementations of these protocols, such as the Go implementation of Raft.

Distributed consensus is a difficult thing to get right, but it’s important to frame it within the context of CAP. We can ensure stronger consistency at the cost of higher latency and lower availability. On the other hand, we can achieve higher availability with decreased latency while giving up strong consistency. The trade-offs really depend on what your needs are.