Sometimes Kill -9 Isn’t Enough

If there’s one thing to know about distributed systems, it’s that they have to be designed with the expectation of failure. It’s also safe to say that most software these days is, in some form, distributed—whether it’s a database, mobile app, or enterprise SaaS. If you have two different processes talking to each other, you have a distributed system, and it doesn’t matter if those processes are local or intergalactically displaced.

Marc Hedlund recently had a great post on Stripe’s game-day exercises where they block off an afternoon, take a blunt instrument to their servers, and see what happens. We’re talking like abruptly killing instances here—kill -9, ec2-terminate-instances, yanking on the damn power cord—that sort of thing. Everyone should be doing this type of stuff. You really don’t know how your system behaves until you see it under failure conditions.

Netflix uses Chaos Monkey to randomly terminate instances, and they do it in production. That takes some balls, but you know you have a pretty solid system when you’re comfortable killing live production servers. At Workiva, we have middleware we use to inject datastore and other RPC errors into Google App Engine. Building resilient systems is a first-class concern, but we still have a ways to go.

We need to be pessimists and design for failure, but injecting failure isn’t enough. Sure, every so often shit hits the proverbial fan, and we need to be tolerant of that. But more often than not, that fan is just a strong headwind.

Simulating failure is a necessary element for building reliable distributed systems, but system behavior isn’t black and white, it’s a continuum. We build our system in a vacuum and (hopefully) test it under failure, but we should also be observing it in this gray area. How does it perform with unreliable network connections? Low bandwidth? High latency? Dropped packets? Out-of-order packets? Duplicate packets? Not only do our systems need to be fault-tolerant, they need to be pressure-tolerant.

Simulating Pressure

There are a lot of options for simulating these types of “pressure” conditions. On Linux, we can use iptables to accomplish this.
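Something along these lines, using the statistic match module, does the trick (the exact chains and rule placement will depend on your setup):

    iptables -A INPUT -m statistic --mode random --probability 0.1 -j DROP
    iptables -A OUTPUT -m statistic --mode random --probability 0.1 -j DROP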

This will drop incoming and outgoing packets with a 10% probability. Alternatively, we can use tc to simulate network latency, limited bandwidth, and packet loss.
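A sketch using tc’s netem discipline might look like the following, assuming eth0 and a kernel recent enough for netem to handle the rate limiting itself (older kernels need a separate tbf qdisc for that):

    tc qdisc add dev eth0 root netem delay 250ms loss 10% rate 1mbit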

The above adds 250ms of latency with 10% packet loss and a bandwidth cap of 1Mbps. Likewise, on OS X and BSD we can use ipfw or pfctl.
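With ipfw’s dummynet pipes, it might look something like this (newer OS X releases have replaced ipfw with pfctl and dnctl, but the idea is the same):

    ipfw add pipe 1 ip from any to any
    ipfw pipe 1 config delay 500ms bw 1Mbit/s plr 0.1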

Here we inject 500ms of latency while limiting bandwidth to 1Mbps and dropping 10% of packets.

These are just some very simple traffic-shaping examples. Several of these tools allow you to perform even more advanced testing, like adding variation and correlation values. This would allow you to emulate burst packet loss and other situations we often encounter. For instance, with tc, we can add jitter to the network latency.
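With netem, the jitter is simply a second value after the delay, something along these lines:

    tc qdisc add dev eth0 root netem delay 50ms 20ms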

This adds 50±20ms of latency. Since network latency typically isn’t uniform, we can apply a normal distribution to achieve a more realistic simulation.
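Assuming the qdisc from above is already in place, we can change it to use one of netem’s built-in distribution tables:

    tc qdisc change dev eth0 root netem delay 50ms 20ms distribution normal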

Now we get a nice bell curve which is probably more representative of what we see in practice. We can also use tc to re-order, duplicate, and corrupt packets.
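For instance, something like the following (the percentages here are purely illustrative):

    tc qdisc change dev eth0 root netem delay 50ms reorder 25% 50% duplicate 1% corrupt 0.1%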

I’ve been working on an open-source tool which attempts to wrap these controls up so you don’t have to memorize the options or worry about portability. It’s pretty primitive and doesn’t support much yet, but it provides a thin layer of abstraction.

Conclusion

Injecting failure is crucial to understanding systems and building confidence, but like good test coverage, it’s important to examine suboptimal-but-operating scenarios. This isn’t even 99th-percentile stuff—this is the type of shit your users deal with every single day. If you can’t handle sustained latency and sporadic network partitions, who cares if you tolerate instance failure? The tools are at our disposal; they just need to be leveraged.

From Mainframe to Microservice: An Introduction to Distributed Systems

I gave a talk at Iowa Code Camp this weekend on distributed systems. It was primarily an introduction to them, so it explored some core concepts at a high level. We looked at why distributed systems are difficult to build (right), the CAP theorem, consensus, scaling shared data, and CRDTs.

There was some interest in making the slides available online. I’m not sure how useful they are without narration, but here they are anyway for posterity.

Scaling Shared Data in Distributed Systems

Sharing mutable data at large scale is an exceedingly difficult problem. In their seminal paper CRDTs: Consistency without concurrency control, Shapiro et al. describe why the CAP theorem demands a give and take between scalability and consistency. In general, CAP requires us to choose between CP and AP. The former requires serializing every write, which doesn’t scale beyond a small cluster. The latter ensures scalability by giving up consistency.

Sharing Data in Centralized Systems

We tend to prefer weaker consistency models because they mean lower latency and higher availability. To highlight this point, consider the fact that the memory models for most programming languages are not serializable by default. Put another way, programs with shared memory are not inherently thread-safe. This is a conscious design decision because enforcing memory serializability incurs a significant latency penalty. Instead, programming languages require explicit memory barriers which can be used around the critical sections that need this property.

For example, the Java memory model uses within-thread as-if-serial semantics. This means the execution of a thread in isolation, regardless of runtime optimizations, is guaranteed to be the same as it would have been had all statements been run in program order. The implication of as-if-serial semantics is that it gives up consistency—different threads can and will have different views of the data. Java requires the use of memory barriers, either through explicit locking or the volatile keyword, in order to establish a happens-before relationship between statements in different threads.

This can be thought of as scaling shared data! We have multiple threads (systems) accessing the same data. While not distributed, many of the same ideas apply. Consistency, by definition, requires linearizability. In multi-threaded programs, we achieve this with mutexes. In distributed systems, we use transactions and distributed locking. Intuitively, both involve performance trade-offs.
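To make the multi-threaded half of the analogy concrete, here is a minimal Go sketch: the mutex linearizes the increments, and removing it leaves the final count undefined.

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var (
            mu      sync.Mutex
            counter int
            wg      sync.WaitGroup
        )

        // 100 "clients" each increment the shared counter 1,000 times.
        for i := 0; i < 100; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for j := 0; j < 1000; j++ {
                    mu.Lock() // serialize access; without this the result is a data race
                    counter++
                    mu.Unlock()
                }
            }()
        }

        wg.Wait()
        fmt.Println(counter) // always 100000 with the mutex in place
    }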

Sharing Data in Distributed Systems

Consider a shared, global counter used to measure ad impressions on a website accessed by millions of users around the world.

[Figure: a shared, global counter]

Traditionally, we might implement this using transactions—get the value, increment it by one, then save it back atomically. The problem is transactions will not scale. In order to guarantee consistency, they must be serialized. This results in high latency and low availability if a lot of writes are occurring. We lose some of the key advantages of distributed systems: parallel computation and availability.

So CAP makes it difficult to scale mutable, shared data. How do we do it then? There are several different strategies, each with their own pros and cons.

Immutable Data

Scaling shared read-only data is easy using replication techniques. This means the simplest solution for sharing data in a distributed system is to use immutable data. If we don’t have to worry about writes, then scaling is trivial. Unfortunately, this isn’t always possible, but if your use case allows for it, it’s the best option.

Last-Write Wins

From a set of conflicting writes, LWW selects the one with the most recent timestamp. Clock drift happens, so LWW will inevitably lead to data loss with enough concurrent writes. It’s important to accept this reality, but that loss is an acceptable trade-off for some use cases. LWW trades consistency for availability.
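As a sketch of the idea (not any particular database’s implementation), resolving a conflict under LWW boils down to a timestamp comparison:

    package lww

    import "time"

    // Write is a hypothetical versioned value carrying the wall-clock
    // timestamp of the node that produced it.
    type Write struct {
        Value     []byte
        Timestamp time.Time
    }

    // Resolve picks the write with the latest timestamp from a non-empty
    // set of conflicting writes. The losers are silently discarded, which
    // is exactly where the data loss described above comes from.
    func Resolve(writes []Write) Write {
        winner := writes[0]
        for _, w := range writes[1:] {
            if w.Timestamp.After(winner.Timestamp) {
                winner = w
            }
        }
        return winner
    }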

Application-Level Conflict Resolution

Oftentimes, the best way to ensure safety is by resolving write conflicts at the application level. When there are conflicting writes on a piece of data, applications can apply business rules to determine the canonical update. An example of this is Riak’s application-side conflict resolution strategy.

Causal Ordering

Rather than relying on LWW, which has a high probability of data loss, we can use the causal relationships between writes in order to determine which one to apply. Unlike timestamps, which attempt to provide a total order, causal ordering establishes a partial order. We can approximate a causal ordering by using techniques like Lamport timestamps or vector clocks. By storing a causal history with each write and reading that history before each write, we can make informed decisions on the correctness of updates. The trade-off here is the added overhead of storing this additional metadata and the extra round trip.
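A bare-bones vector clock sketch shows where the partial order comes from: one write causally precedes another only if its clock is less than or equal to the other’s in every component and strictly less in at least one; otherwise the two writes are concurrent and must be resolved some other way.

    package causal

    // VectorClock maps a node ID to the number of events observed from it.
    type VectorClock map[string]uint64

    // Tick records a local event on the given node.
    func (vc VectorClock) Tick(node string) { vc[node]++ }

    // Merge folds in a remote clock by taking the element-wise maximum.
    func (vc VectorClock) Merge(other VectorClock) {
        for node, n := range other {
            if n > vc[node] {
                vc[node] = n
            }
        }
    }

    // HappensBefore reports whether vc causally precedes other.
    func (vc VectorClock) HappensBefore(other VectorClock) bool {
        strictlyLess := false
        for node, n := range vc {
            if n > other[node] {
                return false
            }
            if n < other[node] {
                strictlyLess = true
            }
        }
        // other may have seen nodes that vc knows nothing about.
        for node, n := range other {
            if _, seen := vc[node]; !seen && n > 0 {
                strictlyLess = true
            }
        }
        return strictlyLess
    }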

Distributed Data Types

CRDTs, or convergent/commutative replicated data types, are the new, up-and-coming solution for scaling shared data, but they aren’t at all new. In fact, the theory behind CRDTs has been in use for hundreds of years. CRDTs are grounded in mathematics. Operations or updates on a CRDT always converge. Because the operations must be commutative, associative, and idempotent, they can be applied in any order and the outcome will always be the same. This means we don’t care about causal ordering—it doesn’t matter.

CRDTs are generally modeled after common data structures like sets, maps, lists, and counters, just in a distributed sense. What they provide us are highly available, eventually consistent data structures in which we don’t have to worry about write coordination.
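As a sketch, a grow-only counter (G-counter), one of the simplest CRDTs, could back the ad-impression example from earlier: each replica increments only its own slot, and merging is an element-wise max, which is commutative, associative, and idempotent.

    package crdt

    // GCounter is a state-based, grow-only counter. Each replica only ever
    // increments its own entry, so merges never lose increments.
    type GCounter struct {
        replica string
        counts  map[string]uint64
    }

    func NewGCounter(replica string) *GCounter {
        return &GCounter{replica: replica, counts: make(map[string]uint64)}
    }

    // Incr records an increment on the local replica.
    func (c *GCounter) Incr() { c.counts[c.replica]++ }

    // Value is the sum of every replica's count.
    func (c *GCounter) Value() uint64 {
        var total uint64
        for _, n := range c.counts {
            total += n
        }
        return total
    }

    // Merge folds in another replica's state with an element-wise max. The
    // operation is commutative, associative, and idempotent, so replicas
    // converge no matter the order in which they exchange state.
    func (c *GCounter) Merge(other *GCounter) {
        for replica, n := range other.counts {
            if n > c.counts[replica] {
                c.counts[replica] = n
            }
        }
    }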

Aside from the operation requirements, the other drawback of CRDTs is that they require knowledge of all clients. Each client has a replica of the CRDT, so the global state is determined by merging them. And although CRDTs can be applied to a wide variety of use cases, they typically require some interpretation and specialization of common data structures. These interpretations tend to be more limited in capability.

In Summary

Scaling mutable data is hard. On the other hand, scaling immutable data is easy, so if you can get away with it, do it. There are a number of ways to approach the problem, but as with anything, it all comes down to your use case. The solutions are all about trade-offs—namely the trade-off between consistency and availability. Use weakly consistent models when you can because they afford you high availability and low latency, and rely on stronger models only when absolutely necessary. Do what makes sense for your system.

Understanding Consensus

A classical problem presented within the field of distributed systems is the Two Generals Problem. In it, we observe two allied armies positioned on either side of a valley. Within the valley is a fortified city. Each army has a general, with one acting as commander. Both armies must attack at the same time or face defeat by the city’s defenders. In order to come to an agreement on when to attack, messengers must be sent through the valley, risking capture by the city’s patrols. Consider the diagram below illustrating this problem.

[Figure: the two allied armies and the fortified city]

In the above scenario, Army A has sent a messenger to Army B with a message saying “Attack at 0700.” Army B receives this message and dispatches a messenger carrying an acknowledgement of the attack plans; however, our ill-fated messenger has been intercepted by the city’s defenders.

How do our armies come to an agreement on when to attack? Perhaps Army A sends 100 messengers and attacks regardless. Unfortunately, if all of the messengers are captured, this would result in a swift defeat because A would attack without B. What if, instead, A sends 100 messengers, waits for acknowledgements of those messages, and only attacks if it reaches a certain level of confidence, say receiving 75 or more confirmations? Yet again, this could very well end in defeat, this time with B attacking without A.

We also need to bear in mind that sending messages has a certain amount of overhead. We can’t, in good conscience, send a million messengers to their potential demise. Or maybe we can, but it’s more than the number of soldiers in our army.

In fact, we can’t reliably make a decision. It’s provably impossible. In the face of Byzantine failures, the problem becomes even more complicated with the possibility of traitors or forged messages.

Now replace two generals with N generals. Coming to a perfectly reliable agreement between two generals was already impossible; between N, it becomes dramatically more complicated. This problem is more commonly referred to as distributed consensus, and it’s the focus of an army of researchers.

The problem of consensus is blissfully simple, but the solution is far from trivial. Consensus is the basis of distributed coordination services, locking protocols, and databases. A monolithic system (think a MySQL server) can enforce ACID constraints with consistent reads but exhibits generally poor availability and fault tolerance.

The original Google App Engine datastore relied on a master/slave architecture where a single data center held the primary copy of data which was replicated to backup sites asynchronously. This offered applications strong consistency and low latency with the implied trade-off of availability. The health of an application was directly tied to the health of a data center. Beyond transient losses, it also meant periods of planned unavailability and read-only access while Google performed data center maintenance. App Engine has since transitioned to a high-replication datastore which relies on distributed consensus to replicate data across sites. This allows the datastore to continue operating in the presence of failures and at greater availability. In agreement with CAP, this naturally means higher latency on writes.

There are a number of solutions to distributed consensus, but most of them are variations on the same core ideas. We will look at some of these solutions, including multi-phase commit and state-replication approaches.

Two-Phase Commit

Two-phase commit (2PC) is the simplest multi-phase commit protocol. In two-phase commit, all transactions go through a coordinator which is responsible for ensuring a transaction occurs across one or more remote sites (cohorts).

[Figure: two-phase commit]

When the coordinator receives a request, it asks each of its cohorts to vote yes or no. During this phase, each cohort performs the transaction up to the point of committing it. The coordinator then waits for all votes. If the vote is unanimously “yes,” it sends a message to its cohorts to commit the transaction. If one or more cohorts vote “no,” a message is sent to roll back. The cohorts then acknowledge whether the transaction was committed or rolled back, and the process is complete.
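A rough sketch of the coordinator’s side of the protocol (the Cohort interface is hypothetical and error handling is simplified):

    package twopc

    import "errors"

    // ErrAborted is returned when any cohort votes "no."
    var ErrAborted = errors.New("transaction aborted")

    // Cohort is a hypothetical remote participant in the transaction.
    type Cohort interface {
        Vote(txID string) (yes bool, err error)
        Commit(txID string) error
        Rollback(txID string) error
    }

    // Coordinate runs one round of two-phase commit. It blocks until every
    // cohort has voted and then until every cohort acknowledges the outcome,
    // which is exactly where the protocol's liveness problems live.
    func Coordinate(txID string, cohorts []Cohort) error {
        // Phase one: collect votes. Each cohort performs the transaction up
        // to the point of committing before it answers.
        commit := true
        for _, c := range cohorts {
            yes, err := c.Vote(txID)
            if err != nil || !yes {
                commit = false
                break
            }
        }

        // Phase two: broadcast the decision to every cohort.
        for _, c := range cohorts {
            if commit {
                if err := c.Commit(txID); err != nil {
                    return err
                }
            } else if err := c.Rollback(txID); err != nil {
                return err
            }
        }

        if !commit {
            return ErrAborted
        }
        return nil
    }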

Two-phase commit is a blocking protocol. The coordinator blocks waiting for votes from its cohorts, and cohorts block waiting for a commit/rollback message from the coordinator. Unfortunately, this means 2PC can, in some circumstances, result in a deadlock, e.g. the coordinator dies while cohorts wait or a cohort dies while the coordinator waits. Another problematic scenario is when a coordinator and cohort simultaneously fail. Even if another coordinator takes its place, it won’t be able to determine whether to commit or roll back.

Three-Phase Commit

Three-phase commit (3PC) is designed to solve the problems identified in two-phase by implementing a non-blocking protocol with an added “prepare” phase. Like 2PC, it relies on a coordinator which relays messages to its cohorts.

[Figure: three-phase commit]

Unlike 2PC, cohorts do not execute a transaction during the voting phase. Rather, they simply indicate whether they are prepared to perform the transaction. If cohorts time out during this phase or one or more vote “no,” the transaction is aborted. If the vote is unanimously “yes,” the coordinator moves on to the “prepare” phase, sending a message to its cohorts to acknowledge the transaction will be committed. Again, if an ack times out, the transaction is aborted. Once all cohorts have acknowledged the commit, we are guaranteed to be in a state where all cohorts have agreed to commit. At this point, if the commit message from the coordinator is not received in the third phase, the cohort will go ahead and commit anyway. This solves the deadlocking problems described earlier. However, 3PC is still susceptible to network partitions. If a partition occurs, the coordinator will time out and progress will not be made.

State Replication

Protocols like Raft, Paxos, and Zab are popular and widely used solutions to the problem of distributed consensus. These implement state replication or primary-backup using leaders, quorums, and replicas of operation logs or incremental delta states.

[Figure: consensus quorum]

These protocols work by electing a leader (coordinator). Like multi-phase commit, all changes must go through that leader, who then broadcasts the changes to the group. Changes occur by appending a log entry, and each node has its own log replica. Where multi-phase commit falls down in the face of network partitions, these protocols are able to continue working by relying on a quorum (majority). The leader commits the change once the quorum has acknowledged it.
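The quorum arithmetic itself is simple; a sketch of the leader’s commit check might look like this:

    package quorum

    // Majority is the smallest number of nodes that constitutes a quorum.
    func Majority(clusterSize int) int { return clusterSize/2 + 1 }

    // CanCommit reports whether a log entry acknowledged by acks nodes
    // (including the leader itself) is safe to commit. A minority of slow
    // or partitioned nodes never blocks progress.
    func CanCommit(acks, clusterSize int) bool {
        return acks >= Majority(clusterSize)
    }

With five nodes, for example, the leader commits once three have the entry, so two can be lost to a partition without halting writes.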

The use of quorums provides partition tolerance by fencing minority partitions while the majority continues to operate. This is the pessimistic approach to solving split-brain, so it comes with an inherent availability trade-off. This problem is mitigated by the fact that each node hosts a replicated state machine which can be rebuilt or reconciled once the partition is healed.

Google relies on Paxos for its high-replication datastore in App Engine as well as its Chubby lock service. The distributed key-value store etcd uses Raft to manage highly available replicated logs. Zab, which differentiates itself from the former by implementing a primary-backup protocol, was designed for the ZooKeeper coordination service. In general, there are several different implementations of these protocols, such as the Go implementation of Raft.

Distributed consensus is a difficult thing to get right, but it’s important to frame it within the context of CAP. We can ensure stronger consistency at the cost of higher latency and lower availability. On the other hand, we can achieve higher availability with decreased latency while giving up strong consistency. The trade-offs really depend on what your needs are.

Iris Decentralized Cloud Messaging

A couple weeks ago, I published a rather extensive analysis of numerous message queues, both brokered and brokerless. Brokerless messaging is really just another name for peer-to-peer communication. As we saw, the difference in message latency and throughput between peer-to-peer systems and brokered ones is several orders of magnitude. ZeroMQ and nanomsg are able to reliably transmit millions of messages per second at the expense of guaranteed delivery.

Peer-to-peer messaging is decentralized, scalable, and fast, but it brings with it an inherent complexity. There is a dichotomy between how brokerless messaging is conceptualized and how distributed systems are actually built. Distributed systems are composed of services like applications, databases, caches, etc. Services are composed of instances or nodes—individually addressable hosts, either physical or virtual. The key observation is that, conceptually, the unit of interaction lies at the service level, not the instance level. We don’t care about which database server we interact with, we just want to talk to a database server (or perhaps several). We’re concerned with logical groups of nodes.

While traditional socket-queuing systems like ZeroMQ solve the problem of scaling, they bring about a certain coupling between components. System designers are forced to build applications which communicate with nodes, not services. We can introduce load balancers like HAProxy, but we’re still addressing specific locations while creating potential single points of failure. With lightweight VMs and the pervasiveness of elastic clouds, IP addresses are becoming less and less static—they come and go. The canonical way of dealing with this problem is to use distributed coordination and service discovery via ZooKeeper, et al., but this introduces more configuration, more moving parts, and more headaches.

The reality is that distributed systems are not built with the instance as the smallest unit of composition in mind, they’re built with services in mind. As discussed earlier, a service is simply a logical grouping of nodes. This abstraction is what we attempt to mimic with things like etcd, ZooKeeper and HAProxy. These assemblies are proven, but there are alternative solutions that offer zero configuration, minimal network management, and overall less complexity. One such solution that I want to explore is a distributed messaging framework called Iris.

Decentralized Messaging with Iris

Iris is posited as a decentralized approach to backend messaging middleware. It looks to address several of the fundamental issues with traditional brokerless systems, like tight coupling and security.

In order to avoid the problem of addressing instances, Iris considers clusters to be the smallest logical blocks of which systems are composed. A cluster is a collection of zero or more nodes which are responsible for a certain service sub-task. Clusters are then assembled into services such that they can communicate with each other without any regard as to which instance is servicing their requests or where it’s located. Lastly, services are composed into federations, which allow them to communicate across different clouds.

This form of composition allows Iris to use semantic or logical addressing instead of the standard physical addressing. Nodes specify the name of the cluster they wish to participate in, while Iris handles the intricacies of routing and balancing. For example, you might have three database servers which belong to a single cluster called “databases.” The cluster is reached by its name and requests are distributed across the three nodes. Iris also takes care of service discovery, detecting new clusters as they are created on the same cloud.

With libraries like ZeroMQ, security tends to be an afterthought. Iris has been built from the ground-up with security in mind, and it provides a security model that is simple and fast.

Iris uses a relaxed security model that provides perfect secrecy whilst at the same time requiring effectively zero configuration. This is achieved through the observation that if a node of a service is compromised, the whole system is considered undermined. Hence, the unit of security is a service – opposed to individual instances – where any successfully authenticated node is trusted by all. This enables full data protection whilst maintaining the loosely coupled nature of the system.

In practice, what this means is that each cluster uses a single private key. This encryption scheme not only makes deployment trivial, it minimizes the effect security has on speed.

Like ZeroMQ and nanomsg, Iris offers a few different messaging patterns. It provides the standard request-reply and publish-subscribe schemes, but it’s important to remember that the smallest addressable unit is the cluster, not the node. As such, requests are targeted at a cluster and subsequently relayed on to a member in a load-balanced fashion. Publish-subscribe, on the other hand, is not targeted at a single cluster. It allows members of any cluster to subscribe and publish to a topic.

Iris also implements two patterns called “broadcast” and “tunnel.” While request-reply forwards a message to one member of a cluster, broadcast forwards it to all members. The caveat is that there is no way to listen for responses to a broadcast.

Tunnel is designed to address the problem of stateful or streaming transactions where a communication between two endpoints may consist of multiple data exchanges which need to occur as an atomic operation. It provides the guarantee of in-order and throttled message delivery by establishing a channel between a client and a node.

Performance Characteristics

According to its author, Iris is still in a “feature phase” and hasn’t been optimized for speed. Since it’s written in Go, I’ve compared its pub/sub benchmark performance with other Go messaging libraries, NATS and NSQ. As before, these benchmarks shouldn’t be taken as gospel, the code is available here, and pull requests are welcome.

We can see that Iris is comparable to NSQ on the sending side and about 4x on the receiving side, at least out of the box.

Conclusion

Brokerless systems like ZeroMQ and nanomsg offer considerably higher throughput and less latency than classical message-oriented middleware but require greater orchestration of network topologies. They offer high scalability but can lead to tighter coupling between components. Traditional brokered message queues, like those of the AMQP variety, tend to be slower while providing more guarantees and reduced coupling. However, they are also more prone to scale problems like availability and partitioning.

In terms of its qualities, Iris appears to be a reasonable compromise between the decentralized nature of the brokerless systems and the minimal configuration and management of the brokered ones. Its intrinsic value lies in its ability to hide the complexities of the underlying infrastructure behind distributed systems. Rather than exposing that infrastructure, Iris lends itself to building large-scale systems the way we conceptualize and reason about them—by using services as the building blocks, not instances.