Breaking and Entering: Lose the Lock While Embracing Concurrency

This article originally appeared on Workiva’s engineering blog as a two-part series.

Providing robust message routing was a priority for us at Workiva when building our distributed messaging infrastructure. This encompassed not only directed messaging, which allows us to route messages to specific endpoints based on service or client identifiers, but also topic fan-out with support for wildcards and pattern matching.

Existing message-oriented middleware, such as RabbitMQ, provides varying levels of support for these features but doesn’t offer the rich feature set needed to power Wdesk. This includes transport fallback with graceful degradation, tunable qualities of service, support for client-side messaging, and pluggable authentication middleware. As such, we set out to build a new system, not by reinventing the wheel, but by repurposing it.

Eventually, we settled on Apache Kafka as our wheel, or perhaps more accurately, our log. Kafka tells a compelling story of speed, scalability, and fault tolerance—each a requisite component of any reliable messaging system—but it’s only half the story. Pub/sub is a critical messaging pattern for us and underpins a wide range of use cases, but Kafka’s topic model isn’t designed for this purpose. One of the key engineering challenges we faced was building a practical routing mechanism by which messages are matched to interested subscribers. On the surface, this problem appears fairly trivial and is far from novel, but it becomes quite interesting as we dig deeper.

Back to Basics

Topic routing works by matching a published message with interested subscribers. A consumer might subscribe to the topic “foo.bar.baz,” in which case any message published to this topic would be delivered to it. We must also support * and # wildcards, which match exactly one word and zero or more words, respectively. In this sense, we follow the AMQP spec:

The routing key used for a topic exchange MUST consist of zero or more words delimited by dots. Each word may contain the letters A–Z and a–z and digits 0–9. The routing pattern follows the same rules as the routing key with the addition that * matches a single word, and # matches zero or more words. Thus the routing pattern *.stock.# matches the routing keys usd.stock and eur.stock.db but not stock.nasdaq.
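
To make these semantics concrete, here’s a naive Go matcher for the rules above (a reference sketch of the wildcard semantics only, not the trie-based implementation discussed next):

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether a routing key matches a routing pattern:
// "*" matches exactly one word and "#" matches zero or more words.
func matches(pattern, key string) bool {
	return match(strings.Split(pattern, "."), strings.Split(key, "."))
}

func match(pattern, key []string) bool {
	switch {
	case len(pattern) == 0:
		return len(key) == 0 // both exhausted: a match
	case pattern[0] == "#":
		// "#" consumes zero words, or one word and then recurses.
		if match(pattern[1:], key) {
			return true
		}
		return len(key) > 0 && match(pattern, key[1:])
	case len(key) == 0:
		return false // pattern has words left but the key doesn't
	case pattern[0] == "*":
		return match(pattern[1:], key[1:]) // exactly one word
	default:
		return pattern[0] == key[0] && match(pattern[1:], key[1:])
	}
}

func main() {
	fmt.Println(matches("*.stock.#", "usd.stock"))    // true
	fmt.Println(matches("*.stock.#", "eur.stock.db")) // true
	fmt.Println(matches("*.stock.#", "stock.nasdaq")) // false
}
```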

This problem can be modeled using a trie structure. RabbitMQ went with this approach after exploring other options, such as caching topics and indexing the patterns or using a deterministic finite automaton. The alternatives carry greater time and space complexity, while the trie’s main cost is backtracking the tree for wildcard lookups.

The subscription trie looks something like this:

[Figure: the subscription trie]

Even with the backtracking required for wildcards, the trie ends up being the more performant solution due to its logarithmic complexity and tendency to fit in CPU cache lines. Most tries have hot paths, particularly near the root, so caching becomes indispensable. The trie approach is also vastly easier to implement.

In almost all cases, this subscription trie needs to be thread-safe as clients are concurrently subscribing, unsubscribing, and publishing. We could serialize access to it with a reader-writer lock. For some, this would be the end of the story, but for high-throughput systems, locking is a major bottleneck. We can do better.
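
For reference, the lock-based baseline we’re about to improve on might look like this (a minimal sketch; node and Subscriber are hypothetical types, not matchbox’s actual API):

```go
import (
	"strings"
	"sync"
)

// lockedTrie serializes every operation through a reader-writer lock.
// node and Subscriber are hypothetical types.
type lockedTrie struct {
	mu   sync.RWMutex
	root *node
}

func (t *lockedTrie) Subscribe(topic string, sub Subscriber) {
	t.mu.Lock() // writers exclude all readers and writers
	defer t.mu.Unlock()
	t.root.insert(strings.Split(topic, "."), sub)
}

func (t *lockedTrie) Lookup(topic string) []Subscriber {
	t.mu.RLock() // readers run concurrently but block writers
	defer t.mu.RUnlock()
	return t.root.lookup(strings.Split(topic, "."))
}
```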

Breaking the Lock

We considered lock-free techniques that could be applied. Lock-free concurrency means that while a particular thread of execution may be blocked, all CPUs are able to continue processing other work. For example, imagine a program that protects access to some resource using a mutex. If a thread acquires this mutex and is subsequently preempted, no other thread can proceed until this thread is rescheduled by the OS. If the scheduler is adversarial, it may never resume execution of the thread, and the program would be effectively deadlocked. A key point, however, is that the mere lack of a lock does not guarantee a program is lock-free. In this context, “lock” really refers to deadlock, livelock, or the misdeeds of a malevolent scheduler.

In practice, what lock-free concurrency buys us is increased system throughput at the expense of increased tail latencies. Looking at a transactional system, lock-freedom allows us to process many concurrent transactions, any of which may block, while guaranteeing systemwide progress. Depending on the access patterns, when a transaction does block, there are always other transactions which can be processed—a CPU never idles. For high-throughput databases, this is essential.

Concurrent Timelines and Linearizability

Lock-freedom can be achieved using a number of techniques, but it ultimately reduces to a small handful of fundamental patterns. In order to fully comprehend these patterns, it’s important to grasp the concept of linearizability.

It takes approximately 100 nanoseconds for data to move from the CPU to main memory. This means that the laws of physics govern the unavoidable discrepancy between when you perceive an operation to have occurred and when it actually occurred. There is the time from when an operation is invoked to when some state change physically occurs (call it t_inv), and there is the time from when that state change occurs to when we actually observe the operation as completed (call it t_com). Operations are not instantaneous, which means the wall-clock history of operations is uncertain. t_inv and t_com vary for every operation. This is more easily visualized using a timeline diagram like the one below:

[Figure: timeline of concurrent reads and writes]

This timeline shows several reads and writes happening concurrently on some state. Physical time moves from left to right. This illustrates that even if a write is invoked before another concurrent write in real time, the later write could be applied first. If there are multiple threads performing operations on shared state, the notion of physical time is meaningless.

We use a linearizable consistency model to allow some semblance of a timeline by providing a total order of all state updates. Linearizability requires that each operation appears to occur atomically at some point between its invocation and completion. This point is called the linearization point. When an operation completes, it’s guaranteed to be observable by all threads because, by definition, the operation occurred before its completion time. After this point, reads will only see this value or a later one—never anything before. This gives us a proper sequencing of operations which can be reasoned about. Linearizability is a correctness condition for concurrent objects.

Of course, linearizability comes at a cost. This is why most memory models aren’t linearizable by default. Going back to our subscription trie, we could make operations on it appear atomic by applying a global lock. This kills throughput, but it ensures linearization.

[Figure: timeline of lock-protected trie operations]

In reality, the trie operations do not occur at a specific instant in time as the illustration above depicts. However, mutual exclusion gives it the appearance and, as a result, linearizability holds at the expense of systemwide progress. Acquiring and releasing the lock appear instantaneous in the timeline because they are backed by atomic hardware operations like test-and-set. Linearizability is a composable property, meaning if an object is composed of linearizable objects, it is also linearizable. This allows us to construct abstractions from linearizable hardware instructions to data structures, all the way up to linearizable distributed systems.

Read-Modify-Write and CAS

Locks are expensive, not just due to contention but because they completely preclude parallelism. As we saw, if a thread which acquires a lock is preempted, any other threads waiting for the lock will continue to block.

Read-modify-write operations like compare-and-swap offer a lock-free approach to ensuring linearizable consistency. Such techniques loosen the bottleneck by guaranteeing systemwide throughput even if one or more threads are blocked. The typical pattern is to perform some speculative work then attempt to publish the changes with a CAS. If the CAS fails, then another thread performed a concurrent operation, and the transaction needs to be retried. If it succeeds, the operation was committed and is now visible, preserving linearizability. The CAS loop is a pattern used in many lock-free data structures and proves to be a useful primitive for our subscription trie.
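
Sketched on a shared counter with Go’s sync/atomic, the pattern looks like this:

```go
import "sync/atomic"

// increment retries until its CAS succeeds: the load is the "read," the
// addition is the speculative "modify," and the CAS is the "write."
func increment(addr *uint64) uint64 {
	for {
		old := atomic.LoadUint64(addr)
		next := old + 1 // speculative work on a local copy
		if atomic.CompareAndSwapUint64(addr, old, next) {
			return next // the successful CAS is the linearization point
		}
		// CAS failed: another thread committed first; retry with fresh state.
	}
}
```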

CAS is susceptible to the ABA problem. These operations work by comparing values at a memory address. If the value is the same, it’s assumed that nothing has changed. However, this can be problematic if another thread modifies the shared memory and changes it back before the first thread resumes execution. The ABA problem is represented by the following sequence of events:

  1. Thread T1 reads shared-memory value A
  2. T1 is preempted, and T2 is scheduled
  3. T2 changes A to B then back to A
  4. T2 is preempted, and T1 is scheduled
  5. T1 sees the shared-memory value is A and continues

In this situation, T1 assumes nothing has changed when, in fact, an invariant may have been violated. We’ll see how this problem is addressed later.

At this point, we’ve explored the subscription-matching problem space, demonstrated why it’s an area of high contention, and examined why locks pose a serious problem to throughput. Linearizability provides an important foundation of understanding for lock-freedom, and we’ve looked at the most fundamental pattern for building lock-free data structures, compare-and-swap. Next, we will take a deep dive on applying lock-free techniques in practice by building on this knowledge. We’ll continue our narrative of how we applied these same techniques to our subscription engine and provide some further motivation for them.

Lock-Free Applied

Let’s revisit our subscription trie from earlier. Our naive approach to making it linearizable was to protect it with a lock. This proved easy, but as we observed, severely limited throughput. For a message broker, access to this trie is a critical path, and we usually have multiple threads performing inserts, removals, and lookups on it concurrently. Intuition tells us we can implement these operations without coarse-grained locking by relying on a CAS to perform mutations on the trie.

If we recall, read-modify-write is typically applied by copying a shared variable to a local variable, performing some speculative work on it, and attempting to publish the changes with a CAS. When inserting into the trie, our speculative work is creating an updated copy of a node. We commit the new node by updating the parent’s reference with a CAS. For example, if we want to add a subscriber to a node, we would copy the node, add the new subscriber, and CAS the pointer to it in the parent.

This approach is broken, however. To see why, imagine if a thread inserts a subscription on a node while another thread concurrently inserts a subscription as a child of that node. The second insert could be lost due to the sequencing of the reference updates. The diagram below illustrates this problem. Dotted lines represent a reference updated with a CAS.

[Figure: concurrent CAS inserts on the trie losing an update]

The orphaned nodes containing “x” and “z” mean the subscription to “foo.bar” was lost. The trie is in an inconsistent state.

We looked to existing research in the field of non-blocking data structures to help illuminate a path. “Concurrent Tries with Efficient Non-Blocking Snapshots” by Prokopec et al. introduces the Ctrie, a non-blocking, concurrent hash trie based on shared-memory, single-word CAS instructions.

A hash array mapped trie (HAMT) is an implementation of an associative array which, unlike a hashmap, is dynamically allocated. Memory consumption is always proportional to the number of keys in the trie. A HAMT works by hashing keys and using the resulting bits in the hash code to determine which branches to follow down the trie. Each node contains a table with a fixed number of branch slots. Typically, the number of branch slots is 32. On a 64-bit machine, this would mean it takes 256 bytes (32 branches x 8-byte pointers) to store the branch table of a node.

The size of L1-L3 cache lines is 64 bytes on most modern processors. We can’t fit the branch table in a CPU cache line, let alone the entire node. Instead of allocating space for all branches, we use a bitmap to indicate the presence of a branch at a particular slot. This reduces the size of an empty node from roughly 264 bytes to 12 bytes. We can safely fit a node with up to six branches in a single cache line.
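
Here’s a sketch of the bitmap technique (branch is a stand-in for a child reference; the Ctrie node layouts follow below):

```go
import "math/bits"

// branch is a stand-in for a child reference (an I-node or a leaf).
type branch interface{}

// cNode allocates branch slots only for occupied positions. The bitmap
// records which of the 32 logical slots are present; a branch's index in
// the array is the number of set bits below its slot (a popcount).
type cNode struct {
	bmp      uint32
	branches []branch // len(branches) == popcount(bmp)
}

func (c *cNode) lookup(slot uint32) (branch, bool) {
	flag := uint32(1) << slot
	if c.bmp&flag == 0 {
		return nil, false // no branch at this slot
	}
	pos := bits.OnesCount32(c.bmp & (flag - 1)) // occupied slots below ours
	return c.branches[pos], true
}
```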

The Ctrie is a concurrent, lock-free version of the HAMT which ensures progress and linearizability. It solves the CAS problem described above by introducing indirection nodes, or I-nodes, which remain present in the trie even as nodes above and below change. This invariant ensures correctness on inserts by applying the CAS operation on the I-node instead of the internal node array.

An I-node may point to a Ctrie node, or C-node, which is an internal node containing a bitmap and array of references to branches. A branch is either an I-node or a singleton node (S-node) containing a key-value pair. The S-node is a leaf in the Ctrie. A newly initialized Ctrie starts with a root pointer to an I-node which points to an empty C-node. The diagram below illustrates a sequence of inserts on a Ctrie.

[Figure: a sequence of inserts on a Ctrie]
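
In Go, the node types just described might be shaped roughly like this (an illustrative sketch that builds on the C-node above, loosely following Prokopec et al.):

```go
import "unsafe"

// iNode is pure indirection: it holds an atomically swappable pointer to
// the current C-node, and all CAS operations target this field.
type iNode struct {
	main unsafe.Pointer // always points to a *cNode
}

// sNode is a leaf holding a key-value pair.
type sNode struct {
	key   string
	value interface{}
}

// A new ctrie begins with a root I-node pointing to an empty C-node.
type ctrie struct {
	root *iNode
}
```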

An insert starts by atomically reading the I-node’s reference. Next, we copy the C-node and add the new key, recursively insert on an I-node, or extend the Ctrie with a new I-node. The new C-node is then published by performing a CAS on the parent I-node. A failed CAS indicates another thread has mutated the I-node. We re-linearize by atomically reading the I-node’s reference again, which gives us the current state of the Ctrie according to its linearizable history. We then retry the operation until the CAS succeeds. In this case, the linearization point is a successful CAS. The following figure shows why the presence of I-nodes ensures consistency.

[Figure: why I-nodes ensure insert correctness]

In the above diagram, (k4,v4) is inserted into a Ctrie containing (k1,v1), (k2,v2), and (k3,v3). The new key-value pair is added to node C1 by creating an updated copy, C1′, with the new entry. A CAS is then performed on the pointer at I1, indicated by the dotted line. Since C1′ continues pointing to I2, any concurrent updates which occur below it will remain present in the trie. The old C1 is then garbage collected once no more threads are accessing it. Because of this, Ctries are much easier to implement in a garbage-collected language. It turns out that this deferred reclamation also solves the ABA problem described earlier by ensuring memory addresses are recycled only when it’s safe to do so.
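
Collapsed to a single level (the real algorithm recurses through I-nodes or extends the trie as described above), the insert retry loop might look like the following sketch, where inserted is a hypothetical helper that returns a copy of the C-node with the new entry:

```go
import (
	"sync/atomic"
	"unsafe"
)

func (t *ctrie) insert(key string, value interface{}) {
	i := t.root
	for {
		old := atomic.LoadPointer(&i.main) // re-linearize: read the current C-node
		cn := (*cNode)(old)
		updated := cn.inserted(key, value) // speculative copy with the new entry
		if atomic.CompareAndSwapPointer(&i.main, old, unsafe.Pointer(updated)) {
			return // committed: the successful CAS is the linearization point
		}
		// CAS failed: another thread mutated this I-node. Loop and retry.
	}
}
```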

The I-node invariant is enough to guarantee correctness for inserts and lookups, but removals require some additional invariants in order to avoid update loss. Insertions extend the Ctrie with additional levels, while removals eliminate the need for some of these levels. This is because we want to keep the Ctrie as compact as possible while still remaining correct. For example, a remove operation could result in a C-node with a single S-node below it. This state is valid, but the Ctrie could be made more compact and lookups on the lone S-node more efficient if it were moved up into the C-node above. This would allow the I-node and C-node to be removed.

The problem with this approach is that it can cause insertions to be lost. If we move the S-node up and replace the dangling I-node reference with it, another thread could perform a concurrent insert on that I-node just before the compression occurs. The insert would be lost because the pointer to the I-node would be removed.

This issue is solved by introducing a new type of node called the tomb node (T-node) and an associated invariant. The T-node is used to ensure proper ordering during removals. The invariant is as follows: if an I-node points to a T-node at some time t0, then for all times greater than t0, the I-node points to the same T-node. More concisely, a T-node is the last value assigned to an I-node. This ensures that no insertions occur at an I-node if it is being compressed. We call such an I-node a tombed I-node.

If a removal results in a non-root-level C-node with a single S-node below it, the C-node is replaced with a T-node wrapping the S-node. This guarantees that every I-node except the root points to a C-node with at least one branch. This diagram depicts the result of removing (k2,v2) from a Ctrie:

[Figure: removing (k2,v2) from a Ctrie]

Removing (k2,v2) results in a C-node with a single branch, so it’s subsequently replaced with a T-node. The T-node provides a sequencing mechanism by effectively acting as a marker. While it solves the problem of lost updates, it doesn’t give us a compacted trie. If two keys have long matching hash code prefixes, removing one of the keys would result in a long chain of C-nodes followed by a single T-node at the end.

Recall the invariant: once an I-node points to a T-node, it always points to that T-node. This means we can’t change a tombed I-node’s pointer, so instead we replace the I-node with its resurrection. The resurrection of a tombed I-node is the S-node wrapped in its T-node. When a T-node is produced during a removal, we ensure that it’s still reachable, and if it is, resurrect its tombed I-node in the C-node above. If it’s not reachable, another thread has already performed the compression. To ensure lock-freedom, all operations which read a T-node must help compress it instead of waiting for the removing thread to complete. Compression on the Ctrie from the previous diagram is illustrated below.

[Figure: Ctrie compression]

The resurrection of the tombed I-node ensures the Ctrie is optimally compressed for arbitrarily long chains while maintaining integrity.

With a 32-bit hash code space, collisions are rare but still possible. To deal with them, we introduce one final node type, the list node (L-node). An L-node is essentially a persistent linked list. If the hash codes of two different keys collide, the keys are placed in an L-node. This is analogous to a hash table using separate chaining to resolve collisions.

One interesting property of the Ctrie is support for lock-free, linearizable, constant-time snapshots. Most concurrent data structures do not support snapshots, instead opting for locks or requiring a quiescent state. This allows Ctries to have O(1) iterator creation, clear, and size retrieval (amortized).

Constant-time snapshots are implemented by making the Ctrie a persistent data structure and assigning a generation count to each I-node. A persistent hash trie is updated by rewriting the path from the root of the trie down to the leaf the key belongs to while leaving the rest of the trie intact. The generation demarcates Ctrie snapshots. To create a new snapshot, we copy the root I-node and assign it a new generation. When an operation detects that an I-node’s generation is older than the root’s generation, it copies the I-node to the new generation and updates the parent. The path from the root to some node is only updated the first time it’s accessed, making the snapshot an O(1) operation.

The final piece needed for snapshots is a special type of CAS operation. There is a race condition between the thread creating a snapshot and the threads which have already read the root I-node with the previous generation. The linearization point for an insert is a successful CAS on an I-node, but we need to ensure that both the I-node has not been modified and its generation matches that of the root. This could be accomplished with a double compare-and-swap, but most architectures do not support such an operation.

The alternative is to use an RDCSS double-compare-single-swap originally described by Harris et al. We implement an operation with similar semantics to RDCSS called GCAS, or generation compare-and-swap. The GCAS allows us to atomically compare both the I-node pointer and its generation to the expected values before committing an update.

After researching the Ctrie, we wrote a Go implementation in order to gain a deeper understanding of the applied techniques. These same ideas would hopefully be adaptable to our problem domain.

Generalizing the Ctrie

The subscription trie shares some similarities with the hash array mapped trie, but there are key differences. First, values are not stored strictly at the leaves but can live on internal nodes as well. Second, the decomposed topic, rather than a hash code, determines how the trie is descended. Wildcards complicate lookups further by requiring backtracking. Lastly, the number of branches on a node is not fixed. Applying the Ctrie techniques to the subscription trie, we end up with something like this:

[Figure: the matchbox subscription trie]

Much of the same logic applies. The main distinctions are the branch traversal based on topic words and the rules around wildcards. Each branch is associated with a word and a set of subscribers and may or may not point to an I-node. The I-nodes still ensure correctness on inserts. The behavior of T-nodes changes slightly. With the Ctrie, a T-node is created from a C-node with a single branch and then compressed. With the subscription trie, we don’t introduce a T-node until all branches are removed. A branch is pruned when it has no subscribers and either points nowhere or points to a tombed I-node. The GCAS and snapshotting remain unchanged.

We implemented this Ctrie derivative in order to build our concurrent pattern-matching engine, matchbox. This library provides an exceptionally simple API which allows a client to subscribe to a topic, unsubscribe from a topic, and lookup a topic’s subscribers. Snapshotting is also leveraged to retrieve the global subscription tree and the topics to which clients are currently subscribed. These are useful to see who currently has subscriptions and for what.
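
A usage sketch of that API (all names here are hypothetical, inferred from the description above rather than matchbox’s documented signatures):

```go
// Hypothetical names throughout, following the description above.
mb := matchbox.New() // create an empty subscription trie

mb.Subscribe("foo.bar.baz", clientA) // register interest in a topic
mb.Subscribe("foo.*.baz", clientB)   // wildcards behave as described earlier

// A publish on "foo.bar.baz" matches both subscriptions.
for _, sub := range mb.Lookup("foo.bar.baz") {
	deliver(sub, msg) // hypothetical delivery routine
}

mb.Unsubscribe("foo.bar.baz", clientA) // remove a subscription
```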

In Practice

Matchbox has been benchmarked fairly extensively, but to see how it behaves, it’s critical to observe its performance under contention. Many messaging systems opt for a mutex, which tends to result in a lot of lock contention. Real access patterns matter, but for our purposes the workload is heavily parallel, and we don’t want to waste CPU cycles if we can help it.

To see how matchbox compares to lock-based subscription structures, I benchmarked it against gnatsd, a popular high-performance messaging system also written in Go. Gnatsd uses a tree-like structure protected by a mutex to manage subscriptions and offers similar wildcard semantics.

The benchmarks consist of one or more insertion goroutines and one or more lookup goroutines. Each insertion goroutine inserts 1000 subscriptions, and each lookup goroutine looks up 1000 subscriptions. We scale these goroutines up to see how the systems behave under contention.

The first benchmark is a 1:1 concurrent insert-to-lookup workload. A lookup corresponds to a message being published and matched to interested subscribers, while an insert occurs when a client subscribes to a topic. In practice, lookups are much more frequent than inserts, so the second benchmark is a 1:3 concurrent insert-to-lookup workload to help simulate this. The timings correspond to the complete insert and lookup workload. GOMAXPROCS was set to 8, which controls the number of operating system threads that can execute simultaneously. The benchmarks were run on a machine with a 2.6 GHz Intel Core i7 processor.

[Figure: benchmark results for the 1:1 insert-to-lookup workload]

[Figure: benchmark results for the 1:3 insert-to-lookup workload]

It’s quite clear that the lock-free approach scales a lot better under contention. This follows our intuition because lock-freedom allows system-wide progress even when a thread is blocked. If one goroutine is blocked on an insert or lookup operation, other operations may proceed. With a mutex, this isn’t possible.

Matchbox performs well, particularly in multithreaded environments, but there are still more optimizations to be made. This includes improvements both in memory consumption and runtime performance. Applying the Ctrie techniques to this type of trie results in a fairly non-compact structure. There may be ways to roll up branches—either eagerly or after removals—and expand them lazily as necessary. Other optimizations might include placing a cache or Bloom filter in front of the trie to avoid descending it. The main difficulty with these will be managing support for wildcards.

Conclusion

To summarize, we’ve seen why subscription matching is often a major concern for message-oriented middleware and why it’s frequently a bottleneck. Concurrency is crucial for high-performance systems, and we’ve looked at how we can achieve concurrency without relying on locks while framing it within the context of linearizability. Compare-and-swap is a fundamental tool used to implement lock-free data structures, but it’s important to be conscious of the pitfalls. We introduce invariants to protect data consistency. The Ctrie is a great example of how to do this and was foundational in our lock-free subscription-matching implementation. Finally, we validated our work by showing that lock-free data structures scale dramatically better with multithreaded workloads under contention.

My thanks to Steven Osborne and Dustin Hiatt for reviewing this article.

What You Want Is What You Don’t: Understanding Trade-Offs in Distributed Messaging

If there’s one unifying theme of this blog, it’s that distributed systems are riddled with trade-offs. Specifically, with distributed messaging, you cannot have exactly-once delivery. However, messaging trade-offs don’t stop at delivery semantics. I want to talk about what I mean by this and explain why many developers often have the wrong mindset when it comes to building distributed applications.

The natural tendency is to build distributed systems as if they aren’t distributed at all—assuming data consistency, reliable messaging, and predictability. It’s much easier to reason about, but it’s also blatantly misleading.

The only thing guaranteed in messaging—and distributed systems in general—is that sooner or later, your guarantees are going to break down. If you assume these guarantees as axiomatic, everything built on them becomes unsound. Depending on the situation, this can range from mildly annoying to utterly catastrophic.

I recently ran across a comment from Apcera CEO Derek Collison on this topic which resonated with me:

On systems that do claim some form of guarantee, it’s best to look at what level that guarantee really runs out. Especially around persistence, exactly once delivery semantics, etc. I spent much of my career designing and building messaging systems that have those guarantees, and in turn developed many systems utilizing some of those features. For me, I found that depending on these guarantees was a bad pattern in distributed system design…

You should know how your system behaves when you reach the breaking point, but what’s less obvious is that providing these types of strong guarantees is usually very expensive. What price are we willing to pay, what level do our guarantees hold to, and what happens when they give out? In this sense, a “guarantee” is really no different from an SLA, yet stronger guarantees allow for stronger assumptions.

This all sounds quite vague, so let’s look at a specific example. With messaging, we’re often concerned with delivery reliability. In a perfect world, message delivery would be guaranteed and exactly once. Of course, I’ve talked at length why this is impossible, so let’s anchor ourselves in reality. We can look to TCP/IP for how this works.

IP is an unreliable delivery system which runs on unreliable network infrastructure. Packets can be delivered in order, out of order, or not at all. There are no acknowledgements, so the sender has no way of knowing if what they sent was received. TCP builds on IP by effectively making the transmission stateful and adding a layer of control. Through added complexity and performance costs, we achieve reliable delivery over an unreliable stack.

The key takeaway here is that we start with something primitive, like moving bits from point A to point B, and layer on abstractions to build stronger guarantees. These abstractions almost always come at a price, tangible or not, which is why it’s important to push the costs up into the layers above. If not every use case demands reliable delivery, why force the cost onto everyone?

Exactly-once delivery is the Holy Grail of distributed messaging, and guaranteed delivery is the unicorn. The irony is that even if they were attainable, you likely wouldn’t want them. These types of strong guarantees demand expensive infrastructure performing expensive coordination that requires expensive administration. But what does all this expensive stuff really buy you at the end of the day?

A key problem is that there is a huge difference between message delivery and message processing. Sure, TCP can more or less ensure that your packet was either delivered or not, but what good is that actually in practice? How does the sender know that its message was successfully processed or that the receiver did what it needed to do? The only way to truly know is for the receiver to send a business-level acknowledgement. The low-level transport protocol doesn’t know about the application semantics, so the only way to go, really, is up. And if we assume that any guarantees will eventually give out, we have to account for that at the business level. To quote from a related article, “if reliability is important on the business level, do it on the business level.” It’s important not to conflate the transport protocol with the business-transaction protocol.

This is why systems like Akka don’t provide a notion of guaranteed delivery—because what does “guaranteed delivery” actually mean? Does it mean the message was handed to the transport layer? Does it mean the remote machine received the message? Does it mean the message was enqueued in the recipient’s mailbox? Does it mean the recipient has started processing it? Does it mean the recipient has finished processing it? Each of these things has a very different set of requirements, constraints, and costs. Also, what does it even mean for a message to be “processed”? It depends on the business context. As such, it usually doesn’t make sense for the underlying infrastructure to make these decisions because the decisions usually impact the layers above significantly.

By providing only basic guarantees those use cases which do not need stricter guarantees do not pay the cost of their implementation; it is always possible to add stricter guarantees on top of basic ones, but it is not possible to retro-actively remove guarantees in order to gain more performance.

Distributed computation is inherently asynchronous and the network is inherently unreliable, so it’s better to embrace this asynchrony than to build on leaky abstractions. Rather than hide these inconveniences, make them explicit and force users to design around them. What you end up with is a more robust, more reliable, and often more performant system. This trade-off is highlighted in the paper “Exactly-once semantics in a replicated messaging system” by Huang et al. while studying the problem of exactly-once delivery:

Thus, server-centric algorithms cannot achieve exactly-once semantics. Instead, we will strive to achieve a weaker notion of correctness.

By relaxing our requirements, we end up with a solution that has less performance overhead and less complexity. Why bother pursuing the impossible? You’re paying a huge premium for something which is probably less reliable than you think while performing poorly. In many cases, it’s better to let the pendulum swing the other direction.

The network is not reliable, which means message delivery is never truly guaranteed—it can only be best-effort. The Two Generals’ Problem shows that it’s provably impossible for two remote processes to safely agree on a decision. Similarly, the FLP impossibility result shows that, in an asynchronous environment, reliable failure detection is impossible. That is, there’s no way to tell if a process has crashed or is simply taking a long time to respond. Therefore, if it’s possible for a process to crash, it’s impossible for a set of processes to come to an agreement.

If message delivery is not guaranteed and consensus is impossible, is message ordering really that important? Some use cases might actually demand it, but I suspect, more often than not, it’s an artificial constraint. The fact that the network is unreliable, processes are faulty, and distributed communication is asynchronous makes reliable, in-order delivery surprisingly expensive. But doesn’t TCP solve this problem? At the transport level, yes, but that only gets you so far as I’ve been trying to demonstrate.

So you use TCP and process messages with a single thread. Most of the time, it just works. But what happens under heavy load? What happens when message delivery fails? What happens when you need to scale? If you are queuing messages or you have a dead-letter queue or you have network partitions or a crash-recovery model, you’re probably going to encounter duplicate, dropped, or out-of-order messages. Even if the infrastructure provides ordered delivery, these problems will likely manifest themselves at the application level.

If you’re distributed, forget about ordering and start thinking about commutativity. Forget about guaranteed delivery and start thinking about idempotence. Stop thinking about the messaging platform and start thinking about the messaging patterns and business semantics. A pattern which is commutative and idempotent will be far less brittle and more efficient than a system which is totally ordered and “guaranteed.” This is why CRDTs are becoming increasingly popular in the distributed space. Never write code which assumes messages will arrive in order when you can’t even assume they will arrive at all.

In the end, think carefully about the business case and what your requirements really are. Can you satisfy them without relying on costly and leaky abstractions or deceptive guarantees? If you can’t, what happens when those guarantees go out the window? This is very similar to understanding what happens when an SLA is not met. Are the performance and complexity trade-offs worth it? What about the operations and business overheads? In my experience, it’s better to confront the intricacies of distributed systems head-on than to sweep them under the rug. Sooner or later, they will rear their ugly heads.

Designed to Fail

When it comes to reliability engineering, people often talk about things like fault injection, monitoring, and operations runbooks. These are all critical pieces for building systems which can withstand failure, but what’s less talked about is the need to design systems which deliberately fail.

Reliability design has a natural progression which closely follows that of architectural design. With monolithic systems, we care more about preventing failure from occurring. With service-oriented architectures, controlling failure becomes less manageable, so instead we learn to anticipate it. With highly distributed microservice architectures where failure is all but guaranteed, we embrace it.

What does it mean to embrace failure? Anticipating failure is understanding the behavior when things go wrong, building systems to be resilient to it, and having a game plan for when it happens, either manual or automated. Embracing failure means making a conscious decision to purposely fail, and it’s essential for building highly available large-scale systems.

A microservice architecture typically means a complex web of service dependencies. One of SOA’s goals is to isolate failure and allow for graceful degradation. The key to being highly available is learning to be partially available. Frequently, one of the requirements for partial availability is telling the client “no.” Outright rejecting service requests is often better than allowing them to back up because, when dealing with distributed services, the latter usually results in cascading failure across dependent systems.

While designing our distributed messaging service at Workiva, we made explicit decisions to drop messages on the floor if we detect the system is becoming overloaded. As queues become backed up, incoming messages are discarded, a statsd counter is incremented, and a backpressure notification is sent to the client. Upon receiving this notification, the client can respond accordingly by failing fast, exponentially backing off, or using some other flow-control strategy. By bounding resource utilization, we maintain predictable performance and predictable (and measurable) lossiness, and we impede cascading failure.

Other techniques include building kill switches into service calls and routers. If an overloaded service is not essential to core business, we fail fast on calls to it to prevent availability or latency problems upstream. For example, a spam-detection service is not essential to an email system, so if it’s unavailable or overwhelmed, we can simply bypass it. Netflix’s Hystrix has a set of really nice patterns for handling this.

If we’re not careful, we can often be our own worst enemy. Many times, it’s our own internal services which cause the biggest DoS attacks on ourselves. By isolating and controlling failure, we can keep it from becoming widespread and unpredictable. By building in backpressure mechanisms and other types of intentional “failure” modes, we can ensure better availability and reliability for our systems through graceful degradation. Sometimes it’s better to fight fire with fire and failure with failure.

You Cannot Have Exactly-Once Delivery

I’m often surprised that people continually have fundamental misconceptions about how distributed systems behave. I myself shared many of these misconceptions, so I try not to demean or dismiss but rather educate and enlighten, hopefully while sounding less preachy than that just did. I continue to learn only by following in the footsteps of others. In retrospect, it shouldn’t be surprising that folks buy into these fallacies as I once did, but it can be frustrating when trying to communicate certain design decisions and constraints.

Within the context of a distributed system, you cannot have exactly-once message delivery. Web browser and server? Distributed. Server and database? Distributed. Server and message queue? Distributed. You cannot have exactly-once delivery semantics in any of these situations.

As I’ve described in the past, distributed systems are all about trade-offs. This is one of them. There are essentially three types of delivery semantics: at-most-once, at-least-once, and exactly-once. Of the three, the first two are feasible and widely used. If you want to be super anal, you might say at-least-once delivery is also impossible because, technically speaking, network partitions are not strictly time-bound. If the connection from you to the server is interrupted indefinitely, you can’t deliver anything. Practically speaking, you have bigger fish to fry at that point—like calling your ISP—so we consider at-least-once delivery, for all intents and purposes, possible. With this model of thinking, network partitions are finitely bounded in time, however arbitrary this may be.

So where does the trade-off come into play, and why is exactly-once delivery impossible? The answer lies in the Two Generals thought experiment or the more generalized Byzantine Generals Problem, which I’ve looked at extensively. We must also consider the FLP result, which basically says, given the possibility of a faulty process, it’s impossible for a system of processes to agree on a decision.

In the letter I mail you, I ask you to call me once you receive it. You never do. Either you really didn’t care for my letter or it got lost in the mail. That’s the cost of doing business. I can send the one letter and hope you get it, or I can send 10 letters and assume you’ll get at least one of them. The trade-off here is quite clear (postage is expensive!), but sending 10 letters doesn’t really provide any additional guarantees. In a distributed system, we try to guarantee the delivery of a message by waiting for an acknowledgement that it was received, but all sorts of things can go wrong. Did the message get dropped? Did the ack get dropped? Did the receiver crash? Are they just slow? Is the network slow? Am I slow? FLP and the Two Generals Problem are not design complexities; they are impossibility results.

People often bend the meaning of “delivery” in order to make their system fit the semantics of exactly-once, or in other cases, the term is overloaded to mean something entirely different. State-machine replication is a good example of this. Atomic broadcast protocols ensure messages are delivered reliably and in order. The truth is, we can’t deliver messages reliably and in order in the face of network partitions and crashes without a high degree of coordination. This coordination, of course, comes at a cost (latency and availability), while still relying on at-least-once semantics. Zab, the atomic broadcast protocol which lays the foundation for ZooKeeper, enforces idempotent operations.

State changes are idempotent and applying the same state change multiple times does not lead to inconsistencies as long as the application order is consistent with the delivery order. Consequently, guaranteeing at-least once semantics is sufficient and simplifies the implementation.

“Simplifies the implementation” is the authors’ attempt at subtlety. State-machine replication is just that, replicating state. If our messages have side effects, all of this goes out the window.

We’re left with a few options, all equally tenuous. When a message is delivered, it’s acknowledged immediately before processing. The sender receives the ack and calls it a day. However, if the receiver crashes before or during its processing, that data is lost forever. Customer transaction? Sorry, looks like you’re not getting your order. This is the worldview of at-most-once delivery. To be honest, implementing at-most-once semantics is more complicated than this depending on the situation. If there are multiple workers processing tasks or the work queues are replicated, the broker must be strongly consistent (or CP in CAP theorem parlance) so as to ensure a task is not delivered to any other workers once it’s been acked. Apache Kafka uses ZooKeeper to handle this coordination.

On the other hand, we can acknowledge messages after they are processed. If the process crashes after handling a message but before acking (or the ack isn’t delivered), the sender will redeliver. Hello, at-least-once delivery. Furthermore, if you want to deliver messages in order to more than one site, you need an atomic broadcast which is a huge burden on throughput. Fast or consistent. Welcome to the world of distributed systems.

Every major message queue in existence which provides any guarantees will market itself as at-least-once delivery. If one claims exactly-once, its makers are either lying to your face in hopes that you will buy it, or they themselves do not understand distributed systems. Either way, it’s not a good indicator.

RabbitMQ attempts to provide guarantees along these lines:

When using confirms, producers recovering from a channel or connection failure should retransmit any messages for which an acknowledgement has not been received from the broker. There is a possibility of message duplication here, because the broker might have sent a confirmation that never reached the producer (due to network failures, etc). Therefore consumer applications will need to perform deduplication or handle incoming messages in an idempotent manner.

The way we achieve exactly-once delivery in practice is by faking it. Either the messages themselves should be idempotent, meaning they can be applied more than once without adverse effects, or we remove the need for idempotency through deduplication. Ideally, our messages don’t require strict ordering and are commutative instead. There are design implications and trade-offs involved with whichever route you take, but this is the reality in which we must live.
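
As a minimal sketch of the deduplication side, assuming every message carries a unique ID:

```go
import "sync"

// dedupHandler drops redeliveries by remembering message IDs it has
// already processed. A production version needs bounded, durable storage
// for the seen set and should record the ID atomically with the side
// effect; otherwise a crash mid-process degrades to at-most-once.
type dedupHandler struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func (h *dedupHandler) Handle(id string, process func()) {
	h.mu.Lock()
	if _, dup := h.seen[id]; dup {
		h.mu.Unlock()
		return // duplicate delivery: safe to drop
	}
	h.seen[id] = struct{}{}
	h.mu.Unlock()
	process() // side effects run at most once per message ID
}
```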

Rethinking operations as idempotent actions might be easier said than done, but it mostly requires a change in the way we think about state. This is best described by revisiting the replicated state machine. Rather than distributing operations to apply at various nodes, what if we just distribute the state changes themselves? Rather than mutating state, let’s just report facts at various points in time. This is effectively how Zab works.

Imagine we want to tell a friend to come pick us up. We send him a series of text messages with turn-by-turn directions, but one of the messages is delivered twice! Our friend isn’t too happy when he finds himself in the bad part of town. Instead, let’s just tell him where we are and let him figure it out. If the message gets delivered more than once, it won’t matter. The implications are wider reaching than this, since we’re still concerned with the ordering of messages, which is why solutions like commutative and convergent replicated data types are becoming more popular. That said, we can typically solve this problem through extrinsic means like sequencing, vector clocks, or other partial-ordering mechanisms. It’s usually causal ordering that we’re after anyway. People who say otherwise don’t quite realize that there is no now in a distributed system.

To reiterate, there is no such thing as exactly-once delivery. We must choose between the lesser of two evils, which is at-least-once delivery in most cases. This can be used to simulate exactly-once semantics by ensuring idempotency or otherwise eliminating side effects from operations. Once again, it’s important to understand the trade-offs involved when designing distributed systems. Asynchrony abounds, which means you cannot expect synchronous, guaranteed behavior. Design for failure and resiliency against this asynchronous nature.

Fast, Scalable Networking in Go with Mangos

In the past, I’ve looked at nanomsg and why it’s a formidable alternative to the well-regarded ZeroMQ. Like ZeroMQ, nanomsg is a native library which markets itself as a way to build fast and scalable networking layers. I won’t go into detail on how nanomsg accomplishes this since my analysis of it already covers that fairly extensively, but instead I want to talk about a Go implementation of the protocol called Mangos. (Full disclosure: I am a contributor on the Mangos project, but only because I was a user first!) If you’re not familiar with nanomsg or Scalability Protocols, I recommend reading my overview of those first.

nanomsg is a shared library written in C. This, combined with its zero-copy API, makes it an extremely low-latency transport layer. While there are a lot of client bindings which allow you to use nanomsg from other languages, dealing with shared libraries can often be a pain—not to mention it complicates deployment.

More and more companies are starting to use Go for backend development because of its speed and concurrency primitives. It’s really good at building server components that scale. Go obviously provides the APIs needed for socket networking, but building a scalable distributed system that’s reliable using these primitives can be somewhat onerous. Solutions like nanomsg’s Scalability Protocols and ZeroMQ attempt to make this much easier by providing useful communication patterns and by taking care of other messaging concerns like queueing.

Naturally, there are Go bindings for nanomsg and ZeroMQ, but like I said, dealing with shared libraries can be fraught with peril. In Go (and often other languages), we tend to avoid loading native libraries if we can. It’s much easier to reason about, debug, and deploy a single binary than multiple. Fortunately, there’s a really nice implementation of nanomsg’s Scalability Protocols in pure Go called Mangos by Garrett D’Amore of illumos fame.

Mangos offers an idiomatic Go implementation and interface which affords us the same messaging patterns that nanomsg provides while maintaining compatibility. Pub/Sub, Pair, Req/Rep, Pipeline, Bus, and Survey are all there. It also supports the same pluggable transport model, allowing additional transports to be added and extended (Mangos supports TLS with the TCP transport as an experimental extension) on top of the base TCP, IPC, and inproc ones (a nanomsg WebSocket transport is currently in the works). Mangos has been tested for interoperability with nanomsg using the nanocat command-line interface.

One of the advantages of using a language like C is that it’s not garbage collected. However, if you’re using Go with nanomsg, you’re already paying the cost of GC. Mangos makes use of object pools in order to reduce pressure on the garbage collector. We can’t turn Go’s GC off, but we can make an effort to minimize pauses. This is critical for high-throughput systems, and Mangos tends to perform quite comparably to nanomsg.

Mangos (and nanomsg) has a very familiar, socket-like API. To show what this looks like, the code below illustrates a simple example of how the Pub/Sub protocol is used to build a fan-out messaging system.
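
A condensed sketch of such an example, based on the Mangos v1 API (the URL and the sleep are illustrative, and error handling is abbreviated):

```go
package main

import (
	"fmt"
	"time"

	"github.com/gdamore/mangos"
	"github.com/gdamore/mangos/protocol/pub"
	"github.com/gdamore/mangos/protocol/sub"
	"github.com/gdamore/mangos/transport/tcp"
)

func main() {
	url := "tcp://127.0.0.1:40899"

	// Publisher: every connected subscriber receives each message.
	pubSock, err := pub.NewSocket()
	if err != nil {
		panic(err)
	}
	pubSock.AddTransport(tcp.NewTransport())
	if err := pubSock.Listen(url); err != nil {
		panic(err)
	}

	// Subscriber: the empty topic prefix subscribes to everything.
	subSock, err := sub.NewSocket()
	if err != nil {
		panic(err)
	}
	subSock.AddTransport(tcp.NewTransport())
	if err := subSock.Dial(url); err != nil {
		panic(err)
	}
	subSock.SetOption(mangos.OptionSubscribe, []byte(""))

	time.Sleep(100 * time.Millisecond) // crude: wait for the dial to complete
	pubSock.Send([]byte("hello, fan-out"))

	msg, _ := subSock.Recv()
	fmt.Println(string(msg)) // "hello, fan-out"
}
```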

My message queue test framework, Flotilla, uses the Req/Rep protocol to allow clients to send requests to distributed daemon processes, which handle them and respond. While this is a very simple use case where you could just as easily get away with raw TCP sockets, there are more advanced cases where Scalability Protocols make sense. We also get the added advantage of transport abstraction, so we’re not strictly tied to TCP sockets.

I’ve been building a distributed messaging system using Mangos as a means of federated communication. Pub/Sub enables a fan-out, interest-based broadcast and Bus facilitates many-to-many messaging. Both of these are exceptionally useful for connecting disparate systems. Mangos also supports an experimental new protocol called Star. This pattern is like Bus, but when a message is received by an immediate peer, it’s propagated to all other members of the topology.

My favorite Scalability Protocol is Survey. As I discussed in my nanomsg overview, there are a lot of really interesting applications of it. Survey allows a process to query the state of multiple peers in one shot. It’s similar to Pub/Sub in that the surveyor publishes a single message which is received by all the respondents (although there are no topic subscriptions). The respondents then send a message back, and the surveyor collects these responses. We can also enforce a deadline on the respondent replies, which makes Survey particularly useful for service discovery.

With my messaging system, I’ve used Survey to implement a heartbeat protocol. When a broker spins up, it begins broadcasting a heartbeat using a Survey socket. New brokers can connect to existing ones, and they reply to the heartbeat which allows brokers to “discover” each other. If a heartbeat isn’t received before the deadline, the peer is removed. Mangos also handles reconnects, so if a broker goes offline and comes back up, peers will automatically reconnect.
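
A rough sketch of the surveyor side of such a heartbeat (Mangos v1-style API, imports as in the previous sketch plus protocol/surveyor; prune is a hypothetical helper, and the exact error returned when the survey deadline passes varies by version, so we break on any error):

```go
// Broadcast a heartbeat survey once per second and collect replies until
// the survey deadline passes; peers that never respond are considered down.
sock, _ := surveyor.NewSocket()
sock.AddTransport(tcp.NewTransport())
sock.Listen("tcp://127.0.0.1:40900")
sock.SetOption(mangos.OptionSurveyTime, 500*time.Millisecond)

for {
	sock.Send([]byte("heartbeat"))
	alive := map[string]bool{}
	for {
		resp, err := sock.Recv()
		if err != nil {
			break // survey deadline passed; stop collecting responses
		}
		alive[string(resp)] = true // each reply identifies a live peer
	}
	prune(alive) // hypothetical: remove peers missing from this round
	time.Sleep(time.Second)
}
```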

To summarize, if you’re building distributed systems in Go, consider taking a look at Mangos. You can certainly roll your own messaging layer with raw sockets, but you’re going to end up writing a lot of logic for a robust system. Mangos, and nanomsg in general, gives you the right abstraction to quickly build systems that scale and are fast.