Distributed Systems Are a UX Problem

Distributed systems are not strictly an engineering problem. It’s far too easy to assume a “backend” development concern, but the reality is there are implications at every point in the stack. Often the trade-offs we make lower in the stack in order to buy responsiveness bubble up to the top—so much, in fact, that it rarely doesn’t impact the application in some way. Distributed systems affect the user. We need to shift the focus from system properties and guarantees to business rules and application behavior. We need to understand the limitations and trade-offs at each level in the stack and why they exist. We need to assume failure and plan for recovery. We need to start thinking of distributed systems as a UX problem.

The Truth is Prohibitively Expensive

Stop relying on strong consistency. Coordination and distributed transactions are slow and inhibit availability. The cost of knowing the “truth” is prohibitively expensive for many applications. For that matter, what you think is the truth is likely just a partial or outdated version of it.

Instead, choose availability over consistency by making local decisions with the knowledge at hand and design the UX accordingly. By making this trade-off, we can dramatically improve the user’s experience—most of the time.

Failure Is an Option

There are a lot of problems with simultaneity in distributed computing. As Justin Sheehy describes it, there is no “now” when it comes to distributed systems—that article, by the way, is a must-read for every engineer, regardless of where they work in the stack.

While some things about computers are “virtual,” they still must operate in the physical world and cannot ignore the challenges of that world.

Even though computers operate in the real world, they are disconnected from it. Imagine an inventory system. It may place orders to its artificial heart’s desire, but if the warehouse burns down, there’s no fulfilling them. Even if the system is perfect, its state may be impossible. But the system is typically not perfect because the truth is prohibitively expensive. And not only do warehouses catch fire or forklifts break down, as rare as this may be, but computers fail and networks partition—and that’s far less rare.

The point is, stop trying to build perfect systems because one of two things will happen:

1. You have a false sense of security because you think the system is perfect, and it’s not.

or

2. You will never ship because perfection is out of reach or exorbitantly expensive.

Either case can be catastrophic, depending on the situation. With systems, failure is not only an option, it’s an inevitability, so let’s plan for it as such. We have a lot to gain by embracing failure. Eric Brewer articulated this idea in a recent interview:

So the general answer is you allow things to be inconsistent and then you find ways to compensate for mistakes, versus trying to prevent mistakes altogether. In fact, the financial system is actually not based on consistency, it’s based on auditing and compensation. They didn’t know anything about the CAP theorem, that was just the decision they made in figuring out what they wanted, and that’s actually, I think, the right decision.

We can look to ATMs, and banks in general, as the canonical example for how this works. When you withdraw money, the bank could choose to first coordinate your account, calculating your available balance at that moment in time, before issuing the withdrawal. But what happens when the ATM is temporarily disconnected from the bank? The bank loses out on revenue.

Instead, they make a calculated risk. They choose availability and compensate the risk of overdraft with interest and charges. Likewise, banks use double-entry bookkeeping to provide an audit trail. Every credit has a corresponding debit. Mistakes happen—accounts are debited twice, an account is credited without another being debited—the failure modes are virtually endless. But we audit and compensate, detect and recover. Banks are loosely coupled systems. Accountants don’t use erasers. Why should programmers?

When you find yourself saying “this is important data or people’s money, it has to be correct,” consider how the problem was solved before computers. Building on Quicksand by Dave Campbell and Pat Helland is a great read on this topic:

Whenever the authors struggle with explaining how to implement loosely-coupled solutions, we look to how things were done before computers. In almost every case, we can find inspiration in paper forms, pneumatic tubes, and forms filed in triplicate.

Consider the lost request and its idempotent execution. In the past, a form would have multiple carbon copies with a printed serial number on top of them. When a purchase-order request was submitted, a copy was kept in the file of the submitter and placed in a folder with the expected date of the response. If the form and its work were not completed by the expected date, the submitter would initiate an inquiry and ask to locate the purchase-order form in question. Even if the work was lost, the purchase-order would be resubmitted without modification to ensure a lack of confusion in the processing of the work. You wouldn’t change the number of items being ordered as that may cause confusion. The unique serial number on the top would act as a mechanism to ensure the work was not performed twice.

Computers allow us to greatly improve the user experience, but many of the same fail-safes still exist, just slightly rethought.

The idea of compensation is actually a common theme within distributed systems. The Saga pattern is a great example of this. Large-scale systems often have to coordinate resources across disparate services.  Traditionally, we might solve this problem using distributed transactions like two-phase commit. The problem with this approach is it doesn’t scale very well, it’s slow, and it’s not particularly fault tolerant. With 2PC, we have deadlock problems and even 3PC is still susceptible to network partitions.

Sagas split a long-lived transaction into individual, interleaved sub-transactions. Each sub-transaction in the sequence has a corresponding compensating transaction which reverses its effects. The compensating transactions must be idempotent so they can be safely retried. In the event of a partial execution, the compensating transactions are run and the Saga is effectively rolled back.

The commonly used example for Sagas is booking a trip. We need to ensure flight, car rental, and hotel are all booked or none are booked. If booking the flight fails, we cancel the hotel and car, etc. Sagas trade off atomicity for availability while still allowing us to manage failure, a common occurrence in distributed systems.

Compensation has a lot of applications as a UX principle because it’s really the only way to build loosely coupled, highly available services.

Calculated Recovery

Pat Helland describes computing as nothing more than “memories, guesses, and apologies.” Computers always have partial knowledge. Either there is a disconnect with the real world (warehouse is on fire) or there is a disconnect between systems (System A sold a Foo Widget but, unbeknownst to it, System B just sold the last one in inventory—oops!). Systems don’t make decisions, they make guesses. The guess might be good or it might be bad, but rarely is there certainty. We can wait to collect as much information as possible before making a guess, but it means progress can’t be made until the system is confident enough to do so.

Computers have memory. This means they remember facts they have learned and guesses they have made. Memories help systems make better guesses in the future, and they can share those memories with other systems to help in their guesses. We can store more memories at the cost of more money, and we can survey other systems’ memories at the cost of more latency.

It is a business decision how much money, latency, and energy should be spent on reducing forgetfulness. To make this decision, the costs of the increased probability of remembering should be weighed against the costs of occasionally forgetting stuff.

Generally speaking, the more forgetfulness we can tolerate, the more responsive our systems will be, provided we know how to handle the situations where something is forgotten.

Sooner or later, a system guesses wrong. It sucks. It might mean we lose out on revenue; the business isn’t happy. It might mean the user loses out on what they want; the customer isn’t happy. But we calculate the impact of these wrong guesses, we determine when the trade-offs do and don’t make sense, we compensate, and—when shit hits the fan—we apologize.

Business realities force apologies.  To cope with these difficult realities, we need code and, frequently, we need human beings to apologize. It is essential that businesses have both code and people to manage these apologies.

Distributed systems are as much about failure modes and recovery as they are about being operationally correct. It’s critical that we can recover gracefully when something goes wrong, and often that affects the UX.

We could choose to spend extraordinary amounts of money and countless man-hours laboring over a system which provides the reliability we want. We could construct a data center. We could deploy big, expensive machines. We could install redundant fiber and switches. We could drudge over infallible code. Or we could stop, think for a moment, and realize maybe “sorry” is a more effective alternative. Knowing when to make that distinction can be the difference between a successful business and a failed one. The implications of distributed systems may be wider reaching than you thought.

Product Development is a Trust Fall

A couple weeks ago, Marty Cagan gave an outstanding talk at CraftConf on why products fail despite having great engineering teams. In it, he calls out many of the common mistakes made by teams, and I think there is an underlying theme: trust.

Product development is a trust fall. In order to be successful, a chain of trust must be established from the business all the way down to the engineers. If any point in that chain is compromised, the integrity of the product—and specifically its success—is put in jeopardy.

Engineers will innovate. Trust them. Engineers will discover requirements. Trust them. Engineers will identify risks. Trust them.

This trust must be symmetrical. The business must trust its product managers, who must trust the business. Product managers must trust the engineers, who must trust the product managers. Each level in this hierarchy must act as its own trust anchor. Trust is assumed, not derived. To say the opposite would imply your system of hiring and firing is fundamentally flawed. If you do not trust your teams, your teams will not trust you.

Engineers will prioritize. Trust them. Engineers will deliver. Trust them. Engineers will fail. Let them.

Product development is a trust fall. When you let go, your team will catch you. And when they don’t, they’ll pick you up, dust you off, and say, “we’ll make an adjustment.” Fail fast but recover faster. The more times you fall and hit the ground, the more adjustments we make. The quicker we repeat this process, the less time you spend on the ground. Shame on the teams that spend days, weeks, months planning their fall, ensuring everything is in place, only to find the ground has moved.

It’s inexcusable to say you fail fast when it’s really a slow, prolonged death in product, in technology, or in execution, yet it’s surprisingly common. In order to innovate, you have to fail first. In order to build an effective team, you have to fail first. In order to produce a successful product, you have to fail first. The number one fatal mistake teams make is not recognizing when they’ve failed or being too proud to admit it. This is what Agile is actually about. It’s not about roadmaps or requirements gathering or user stories or stand-ups. It’s about failing and adjusting, failing and adjusting, failing and adjusting. Agile is micro failure on a macro level. As Cagan remarks, the biggest visible distinguisher of a great team is no roadmap.

A roadmap is essentially dooming your team to get out a small number of things that will almost certainly—most of them—not work.

Developers need to be part of the ideation process from day one. The customer is usually wrong. They often don’t know what they want or what’s possible. Developers are invested in the technology and understand its capabilities and limitations. Trust them.

If you’re just using your developers to code, you’re only getting about half their value.

I’ve said it before but without a focused vision, a product will fail. Without embracing new ideas and technology, a company will become irrelevant. Developers must be closely involved with both aspects in order to be successful. Innovate, fail, adjust, deliver. Repeat.

Product development is a trust fall. The key is letting go.

Writing Good Code

There’s no shortage of people preaching the importance of good code. Indeed, many make a career of it. The resources available are equally endless, but lately I’ve been wondering how to extract the essence of building high-quality systems into a shorter, more concise narrative. This is actually something I’ve thought about for a while, but I’m just now starting to formulate some ideas into a blog post. The ideas aren’t fully developed, but my hope is to flesh them out further in the future. You can talk about design patterns, abstraction, encapsulation, and cohesion until you’re blue in the face, but what is the essence of good code?

Like any other engineering discipline, quality control is a huge part of building software. This isn’t just ensuring that it “works”—it’s ensuring it works under the complete range of operating conditions, ensuring it’s usable, ensuring it’s maintainable, ensuring it performs well, and ensuring a number of other characteristics. Verifying it “works” is just a small part of a much larger picture. Anybody can write code that works, but there’s more to it than that. Software is more malleable than most other things. Not only does it require longevity, it requires giving in to that malleability. If it doesn’t, you end up with something that’s brittle and broken. Because of this, it’s vital we test for correctness and measure for quality.

SCRAP for Quality

Quality is a very subjective thing. How can one possibly measure it? Code complexity and static analysis tooling come to mind, and these are deservedly valued, but it really just scratches the surface. How do we narrow an immensely broad topic like “quality” into a set of tangible, quantifiable goals? This is really the crux of the problem, but we can start by identifying a sort of checklist or guidelines for writing software. This breaks that larger problem into smaller, more digestible pieces. The checklist I’ve come up with is called SCRAP, an acronym defined below. It’s unlikely to be comprehensive, but I think it covers most, if not all, of the key areas.

Scalability Plan for growth
Complexity Plan for humans
Resiliency Plan for failure
API Plan for integration
Performance Plan for execution

Each of these items is itself a blog post, so this is only a brief explanation. There is definitely overlap between some of these facets, and there are also multiple dimensions to some.

Scalability is a plan for growth—in code, in organization, in architecture, and in workload. Without it, you reach a point where your system falls over, whether it’s because of a growing userbase, a growing codebase, or any number of other reasons. It’s also worth pointing out that without the ‘S’, all you have is CRAP. This also helps illustrate some of the overlap between these areas of focus as it leads into Complexity, which is a plan for humans. Scalability is about technology scale and demand scale, but it’s also about people scale. As your team grows or as your company grows, how do you manage that growth at the code level?

Planning for people doesn’t just mean managing growth, it also means managing complexity. If code is overly complex, it’s difficult to maintain, it’s difficult to extend, and it’s difficult to fix. If systems are overly complex, they’re difficult to deploy, difficult to manage, and difficult to monitor. Plan for humans, not machines.

Resiliency is a strategy for fault tolerance. It’s a plan for failure. What happens when you crash? What happens when a service you depend on crashes? What happens when the database is unavailable? What happens when the network is unreliable? Systems of all kind need to be designed with the expectation of failure. If you’re not thinking about failure at the code level, you’re not thinking about it enough.

One thing you should be noticing is that “people” is a cross-cutting concern. After all, it’s people who design the systems, and it’s people who write the code. While API is a plan for integration, it’s people who integrate the pieces. This is about making your API a first-class citizen. It doesn’t matter if it’s an internal API, a library API, or a RESTful API. It doesn’t matter if it’s for first parties or third parties. As a programmer, your API is your user interface. It needs to be clean. It needs to be sensible. It needs to be well-documented. If those integration points aren’t properly thought out, the integration will be more difficult than it needs to be.

The last item on the checklist is Performance. I originally defined this as a plan for speed, but I realized there’s a lot more to performance than doing things fast. It’s about doing things well, which is why I call Performance a plan for execution. Again, this has some overlap with Resiliency and Scalability, but it’s also about measurement. It’s about benchmarking and profiling. It’s about testing at scale and under failure because testing in a vacuum doesn’t mean much. It’s about optimization.

This brings about the oft-asked question: how do I know when and where to optimize? While premature optimization might be the root of all evil, it’s not a universal law. Optimize along the critical path and outward from there only as necessary. The further you get from that critical path, the more wasted effort it’s going to end up being. It depreciates quickly, so don’t lose sight of your optimization ROI. This will enable you to ship quickly and ship quality code. But once you ship, you’re not done measuring! It’s more important than ever that you continue to measure in production. Use performance and usage-pattern data to drive intelligent decisions and intelligent iteration. The payoff is that this doesn’t just apply to code decisions, it applies to all decisions. This is where the real value of measuring comes through. Decisions that aren’t backed by data aren’t decisions, they’re impulses. Don’t be impulsive, be empirical.

Going Forward

There is work to be done with respect to quantifying the items on this checklist. However, I strongly suspect even just thinking about them, formally or informally, will improve the overall quality of your code by an equally-unmeasurable order of magnitude. If your code doesn’t pass this checklist, it’s tech debt. Sometimes that’s okay, but remember that tech debt has compounding interest. If you don’t pay it off, you will eventually go bankrupt.

It’s not about being a 10x developer. It’s about being a 1x developer who writes 10x code. By that I mean the quality of your code is far more important than its quantity. Quality will outlast and outperform quantity. These guidelines tend to have a ripple effect. Legacy code often breeds legacy-like code. Instilling these rules in your developer culture helps to make engineers cognizant of when they should break the mold, introduce new patterns, or improve existing ones. Bad code begets bad code, and bad code is the atrophy of good developers.

If State Is Hell, SOA Is Satan

More and more companies are describing their success stories regarding the switch to a service-oriented architecture. As with any technological upswing, there’s a clear and palpable hype factor involved (Big Data™ or The Cloud™ anyone?), but obviously it’s not just puff.

While microservices and SOA have seen a staggering rate of adoption in recent years, the mindset of developers often seems to be stuck in the past. I think this is, at least in part, because we seek a mental model we can reason about. It’s why we build abstractions in the first place. In a sense, I would argue there’s a comparison to be made between the explosion of OOP in the early 90’s and today’s SOA trend. After all, SOA is as much about people scale as it is about workload scale, so it makes sense from an organizational perspective.

The Perils of Good Abstractions

While systems are becoming more and more distributed, abstractions are attempting to make them less and less complex. Mesosphere is a perfect example of this, attempting to provide the “datacenter operating system.” Apache Mesos allows you to “program against your datacenter like it’s a single pool of resources.” It’s an appealing proposition to say the least. PaaS like Google App Engine and Heroku offer similar abstractions—write your code without thinking about scale. The problem is you absolutely have to think about scale or you’re bound to run into problems down the road. And while these abstractions are nice, they can be dangerous just the same. Welcome to the perils of good abstractions.

I like to talk about App Engine because I have firsthand experience with it. It’s an easy sell for startups. It handles spinning up instances when you need them, turning them down when you don’t. It’s your app server, database, caching, job scheduler, task queue all in one, and it does it at scale. There’s vendor lock-in, sure, yet it means no ops, no sysadmins, no overhead. Push to deploy. But it’s a leaky abstraction. It has to be. App Engine scales because it’s distributed, but it allows—no, encourages—you to write your system as a monolith. The datastore, memcache, and task queue accesses are masked as RPCs. This is great for our developer mental model, but it will bite you if you’re not careful. App Engine imposes certain limitations to encourage good design; for instance, front-end requests and datastore calls are limited to 60 seconds (it used to be much less), but the leakiness goes beyond that.

RPC is consistently at odds with distributed systems. I would go so far as to say it’s an anti-pattern in many cases. RPC encourages writing synchronous code, but distributed systems are inherently asynchronous. The network is not reliable. The network is not fast. The network is not your friend. Developers who either don’t understand this or don’t realize what’s happening when they make an RPC will write code as if they were calling a function. It will sure as hell look like just calling a function. When we think synchronously, we end up with systems that are slow, fault intolerant, and generally not scalable. To be quite honest, however, this is perfectly acceptable for 90% of startups as they are getting off the ground because they don’t have workloads at meaningful scale.

There’s certainly some irony here. One of the selling points of App Engine is its ability to scale to large amounts of traffic, yet the vast majority of startups would be perfectly suited to scaling up rather than out, perhaps with some failover in place for good measure. Stack Overflow is the poster child of scale-up architecture. In truth, your architecture should be a function of your access patterns, not the other way around (and App Engine is very much tailored to a specific set of access patterns). Nonetheless, it shows that vertical scaling can work. I would bet a lot of startups could sufficiently run on a large, adequately specced machine or maybe a small handful of them.

The cruel irony is that once you hit a certain scale with App Engine, both in terms of your development organization and user base, you’ve reached a point where you have to migrate off it. And if your data model isn’t properly thought out, you will without a doubt hit scale problems. It’s to the point where you need someone with deep knowledge of how App Engine works in order to build quality systems on it. Good luck hiring a team of engineers who understand it. GAE is great at accelerating you to 100 mph, but you better have some nice airbags for the brick wall it launches you into. In fairness, this is a problem every org hits—Conway’s law is very much a reality and every startup has growing pains. To be clear, this isn’t a jab at GAE, which is actually very effective at accelerating a product using little capital and can sustain long-term success given the right use case. Instead, I use it to illustrate a point.

Peering Through the Abstraction

Eventually SOA makes sense, but our abstractions can cause problems if we don’t understand what’s going on behind the curtain (hence the leakiness). Partial failure is all but guaranteed, and latency, partitioning, and other network pressure happens all the time.

Ken Arnold is famed with once saying “state is hell” in reference to designing distributed systems. In the past, I’ve written how scaling shared data is hard, but with SOA it’s practically a requirement. Ken is right though—state is hell, and SOA is fundamentally competing with consistency. The FLP Impossibility result and the CAP theorem can prove it formally, but really this should be intuitively obvious if we accept the laws of physics.

On the other hand, if you store information that I can’t reconstruct, then a whole host of questions suddenly surface. One question is, “Are you now a single point of failure?” I have to talk to you now. I can’t talk to anyone else. So what happens if you go down?

To deal with that, you could be replicated. But now you have to worry about replication strategies. What if I talk to one replicant and modify some data, then I talk to another? Is that modification guaranteed to have already arrived there? What is the replication strategy? What kind of consistency do you need—tight or loose? What happens if the network gets partitioned and the replicants can’t talk to each other? Can anybody proceed?

Essentially, the more stateful your system is, the harder it’s going to be to scale it because distributing that state introduces a rich tapestry of problems. In practice, we often can’t eliminate state wholesale, but basically everything that can be stateless should be stateless.

Making servers disposable allows you a great deal of flexibility. Former Netflix Cloud Architect Adrian Cockcroft articulates this idea well:

You want to think of servers like cattle, not pets. If you have a machine in production that performs a specialized function, and you know it by name, and everyone gets sad when it goes down, it’s a pet. Instead you should think of your servers like a herd of cows. What you care about is how many gallons of milk you get. If one day you notice you’re getting less milk than usual, you find out which cows aren’t producing well and replace them.

This is effectively how App Engine achieves its scalability. With lightweight, stateless, and disposable instances, it can spin them up and down on the fly without worrying about being in an invalid state.

App Engine also relies on eventual consistency as the default model for datastore interactions. This makes queries fast and highly available, while snapshot isolation can be achieved using entity-group transactions if necessary. The latter, of course, can result in a lot of contention and latency. Yet, people seem to have a hard time grappling with the reality of eventual consistency in distributed systems. State is hell, but calling SOA “satan” is clearly a hyperbole. It is a tough problem nevertheless.

A State of Mind

In the situations where we need state, we have to reconcile with the realities of distributed systems. This means understanding the limitations and accepting the complexities, not papering over them. It doesn’t mean throwing away abstractions. Fortunately, distributed computing is the focus of a lot of great research, so there are primitives with which we can build: immutability, causal ordering, eventual consistency, CRDTs, and other ideas.

As long as we recognize the trade-offs, we can design around them. The crux is knowing they exist in the first place. We can’t have ACID semantics while remaining highly available, but we can use Highly Available Transactions to provide strong-enough guarantees. At the same time, not all operations require coordination or concurrency control. The sooner we view eventual consistency as a solution and not a consequence, the sooner we can let go of this existential crisis. Other interesting research includes BOOM, which seeks to provide a high-level, declarative approach to distributed programming.

State might be hell, but it’s a hell we have to live. I don’t advocate an all-out microservice architecture for a company just getting its start. The complications far outweigh any benefits to be gained, but it becomes a necessity at a certain point. The key is having an exit strategy. PaaS providers make this difficult due to vendor lock-in and architectural constraints. Weigh their advantages carefully.

Once you do transition to a SOA, make as many of those services, or the pieces backing them, as stateless as possible. For those which aren’t stateless, know that the problem typically isn’t novel. These problems have been solved or are continuing to be solved in new and interesting ways. Academic research is naturally at the bleeding edge with industry often lagging behind. OOP concepts date back to as early as the 60’s but didn’t gain widespread adoption until several decades later. Distributed computing is no different. SOA is just a state of mind.

Not Invented Here

Engineers love engineering things. The reason is self-evident (and maybe self-fulfilling—why else would you be an engineer?). We like to think we’re pretty good at solving problems. Unfortunately, this mindset can, on occasion, yield undesirable consequences which might not be immediately apparent but all the while damaging.

Developers are all in tune with the idea of “don’t reinvent the wheel,” but it seems to be eschewed sometimes, deliberately or otherwise. People don’t generally write their own merge sort, so why would they write their own consensus protocol? Anecdotally speaking, they do.

Not-Invented-Here Syndrome is a very real thing. In many cases, consciously or not, it’s a cultural problem. In others, it’s an engineering one. Camille Fournier’s blog post on ZooKeeper helps to illustrate this point and provide some context. In it, she describes why some distributed systems choose to rely on external services, such as ZooKeeper, for distributed coordination, while others build in their own coordination logic.

We draw a parallel between distributed systems and traditional RDBMSs, which typically implement their own file system and other low-level facilities. Why? Because it’s their competitive advantage. SQL databases sell because they offer finely tuned performance, and in order to do that, they need to control these things that the OS otherwise provides. Distributed databases like Riak sell because they own the coordination logic, which helps promote their competitive advantage. This follows what Joel Spolsky says about NIH Syndrome in that “if it’s a core business function—do it yourself, no matter what.”

If you’re developing a computer game where the plot is your competitive advantage, it’s OK to use a third party 3D library. But if cool 3D effects are going to be your distinguishing feature, you had better roll your own.

This makes a lot of sense. My sorting algorithm is unlikely to provide me with a competitive edge, but something else might, even if it’s not particularly novel.

So in some situations, homegrown is justifiable, but that’s not always the case. Redis’ competitive advantage is its predictably low latencies and data structures. Does it make sense for it to implement its own clustering and leader election protocols? Maybe, but this is where NIH can bite you. If what you’re doing is important and there’s precedent, lean on existing research and solutions. Most would argue write safety is important, and there is certainly precedent for leader election. Why not leverage that work? Things like Raft, Paxos, and Zab provide solutions which are proven using formal methods and are peer reviewed. That doesn’t mean new solutions can’t be developed, but they generally require model checking and further scrutiny to ensure correctness. Otherwise, you’ll inevitably run into problems. Implementing our own solutions can provide valuable insight, but leave them at home if they’re not rigorously approached. Rolling your own and calling it “good enough” is dishonest to your users if it’s not properly communicated.

Elasticsearch is another interesting case to look at. You might say Elasticsearch’s competitive advantage is its full-text search engine, but it’s not. Like Solr, it’s built on Lucene. Elasticsearch was designed from the ground-up to be distributed. This is what gives it a leg up over Solr and other similar search servers where horizontal scaling and fault tolerance were essentially tacked on. In a way, this resembles what happened with Redis, where failover and clustering were introduced as an afterthought. However, unlike Redis, which chose to implement its own failover coordination and cluster-membership protocol, Solr opted to use ZooKeeper as an external coordinator.

We see that Elasticsearch’s core advantage is its distributed nature. Following that notion, it makes sense for it to own that coordination, which is why its designers chose to implement their own internal cluster membership, ZenDisco. But it turns out writing cluster-membership protocols is really fucking hard, and unless you’ve written proofs for it, you probably shouldn’t do it at all. The analogy here would be writing your own encryption algorithm—there’s tons of institutional knowledge which has laid the groundwork for solutions which are well-researched and well-understood. That knowledge should be embraced in situations like this.

I don’t mean to pick on Redis and Elasticsearch. They’re both excellent systems, but they serve as good examples for this discussion. The problem is that users of these systems tend to overlook the issues exposed by this mentality. Frankly, few people would know problems exist unless they are clearly documented by vendors (and not sales people) and even then, how many people actually read the docs cover-to-cover? It’s essential we know a system’s shortcomings and edge cases so we can recognize which situations to apply it and, more important, which we should not.

You don’t have to rely on an existing third-party library or service. Believe it or not, this isn’t a sales pitch for ZooKeeper. If it’s a core business function, it probably makes sense to build it yourself as Joel describes. What doesn’t make sense, however, is to build out whatever that is without being cognizant of conventional wisdom. I’m amazed at how often people are willing to throw away institutional knowledge, either because they don’t seek it out or they think they can do better (without formal verification). If I have seen further, it is by standing on the shoulders of giants.

Sometimes Kill -9 Isn’t Enough

If there’s one thing to know about distributed systems, it’s that they have to be designed with the expectation of failure. It’s also safe to say that most software these days is, in some form, distributed—whether it’s a database, mobile app, or enterprise SaaS. If you have two different processes talking to each other, you have a distributed system, and it doesn’t matter if those processes are local or intergalactically displaced.

Marc Hedlund recently had a great post on Stripe’s game-day exercises where they block off an afternoon, take a blunt instrument to their servers, and see what happens. We’re talking like abruptly killing instances here—kill -9, ec2-terminate-instances, yanking on the damn power cord—that sort of thing. Everyone should be doing this type of stuff. You really don’t know how your system behaves until you see it under failure conditions.

Netflix uses Chaos Monkey to randomly terminate instances, and they do it in production. That takes some balls, but you know you have a pretty solid system when you’re comfortable killing live production servers. At Workiva, we have a middleware we use to inject datastore and other RPC errors into Google App Engine. Building resilient systems is an objective concern, but we still have a ways to go.

We need to be pessimists and design for failure, but injecting failure isn’t enough. Sure, every so often shit hits the proverbial fan, and we need to be tolerant of that. But more often than not, that fan is just a strong headwind.

Simulating failure is a necessary element for building reliable distributed systems, but system behavior isn’t black and white, it’s a continuum. We build our system in a vacuum and (hopefully) test it under failure, but we should also be observing it in this gray area. How does it perform with unreliable network connections? Low bandwidth? High latency? Dropped packets? Out-of-order packets? Duplicate packets? Not only do our systems need to be fault-tolerant, they need to be pressure-tolerant.

Simulating Pressure

There are a lot of options to do these types of “pressure” simulations. On Linux, we can use iptables to accomplish this.

This will drop incoming and outgoing packets with a 10% probability. Alternatively, we can use tc to simulate network latency, limited bandwidth, and packet loss.

The above adds an additional 250ms of latency with 10% packet loss and a bandwidth limit of 1Mbps. Likewise, on OSX and BSD we can use ipfw or pfctl.

Here we inject 500ms of latency while limiting bandwidth to 1Mbps and dropping 10% of packets.

These are just some very simple traffic-shaping examples. Several of these tools allow you to perform even more advanced testing, like adding variation and correlation values. This would allow you to emulate burst packet loss and other situations we often encounter. For instance, with tc, we can add jitter to the network latency.

This adds 50±20ms of latency. Since network latency typically isn’t uniform, we can apply a normal distribution to achieve a more realistic simulation.

Now we get a nice bell curve which is probably more representative of what we see in practice. We can also use tc to re-order, duplicate, and corrupt packets.

I’ve been working on an open-source tool which attempts to wrap these controls up so you don’t have to memorize the options or worry about portability. It’s pretty primitive and doesn’t support much yet, but it provides a thin layer of abstraction.

Conclusion

Injecting failure is crucial to understanding systems and building confidence, but like good test coverage, it’s important to examine suboptimal-but-operating scenarios. This isn’t even 99th-percentile stuff—this is the type of shit your users deal with every single day. If you can’t handle sustained latency and sporadic network partitions, who cares if you tolerate instance failure? The tools are at our disposal, they just need to be leveraged.

Iris Decentralized Cloud Messaging

A couple weeks ago, I published a rather extensive analysis of numerous message queues, both brokered and brokerless. Brokerless messaging is really just another name for peer-to-peer communication. As we saw, the difference in message latency and throughput between peer-to-peer systems and brokered ones is several orders of magnitude. ZeroMQ and nanomsg are able to reliably transmit millions of messages per second at the expense of guaranteed delivery.

Peer-to-peer messaging is decentralized, scalable, and fast, but it brings with it an inherent complexity. There is a dichotomy between how brokerless messaging is conceptualized and how distributed systems are actually built. Distributed systems are composed of services like applications, databases, caches, etc. Services are composed of instances or nodes—individually addressable hosts, either physical or virtual. The key observation is that, conceptually, the unit of interaction lies at the service level, not the instance level. We don’t care about which database server we interact with, we just want to talk to database server (or perhaps multiple). We’re concerned with logical groups of nodes.

While traditional socket-queuing systems like ZeroMQ solve the problem of scaling, they bring about a certain coupling between components. System designers are forced to build applications which communicate with nodes, not services. We can introduce load balancers like HAProxy, but we’re still addressing specific locations while creating potential single points of failure. With lightweight VMs and the pervasiveness of elastic clouds, IP addresses are becoming less and less static—they come and go. The canonical way of dealing with this problem is to use distributed coordination and service discovery via ZooKeeper, et al., but this introduces more configuration, more moving parts, and more headaches.

The reality is that distributed systems are not built with the instance as the smallest unit of composition in mind, they’re built with services in mind. As discussed earlier, a service is simply a logical grouping of nodes. This abstraction is what we attempt to mimic with things like etcd, ZooKeeper and HAProxy. These assemblies are proven, but there are alternative solutions that offer zero configuration, minimal network management, and overall less complexity. One such solution that I want to explore is a distributed messaging framework called Iris.

Decentralized Messaging with Iris

Iris is posited as a decentralized approach to backend messaging middleware. It looks to address several of the fundamental issues with traditional brokerless systems, like tight coupling and security.

In order to avoid the problem of addressing instances, Iris considers clusters to be the smallest logical blocks of which systems are composed. A cluster is a collection of zero or more nodes which are responsible for a certain service sub-task. Clusters are then assembled into services such that they can communicate with each other without any regard as to which instance is servicing their requests or where it’s located. Lastly, services are composed into federations, which allow them to communicate across different clouds.

instances_vs_clusters

This form of composition allows Iris to use semantic or logical addressing instead of the standard physical addressing. Nodes specify the name of the cluster they wish to participate in, while Iris handles the intricacies of routing and balancing. For example, you might have three database servers which belong to a single cluster called “databases.” The cluster is reached by its name and requests are distributed across the three nodes. Iris also takes care of service discovery, detecting new clusters as they are created on the same cloud.

physical_vs_semantic

With libraries like ZeroMQ, security tends to be an afterthought. Iris has been built from the ground-up with security in mind, and it provides a security model that is simple and fast.

Iris uses a relaxed security model that provides perfect secrecy whilst at the same time requiring effectively zero configuration. This is achieved through the observation that if a node of a service is compromised, the whole system is considered undermined. Hence, the unit of security is a service – opposed to individual instances – where any successfully authenticated node is trusted by all. This enables full data protection whilst maintaining the loosely coupled nature of the system.

In practice, what this means is that each cluster uses a single private key. This encryption scheme not only makes deployment trivial, it minimizes the effect security has on speed.

authentication_and_encryption

Like ZeroMQ and nanomsg, Iris offers a few different messaging patterns. It provides the standard request-reply and publish-subscribe schemes, but it’s important to remember that the smallest addressable unit is the cluster, not the node. As such, requests are targeted at a cluster and subsequently relayed on to a member in a load-balanced fashion. Publish-subscribe, on the other hand, is not targeted at a single cluster. It allows members of any cluster to subscribe and publish to a topic.

Iris also implements two patterns called “broadcast” and “tunnel.” While request-reply forwards a message to one member of a cluster, broadcast forwards it to all members. The caveat is that there is no way to listen for responses to a broadcast.

Tunnel is designed to address the problem of stateful or streaming transactions where a communication between two endpoints may consist of multiple data exchanges which need to occur as an atomic operation. It provides the guarantee of in-order and throttled message delivery by establishing a channel between a client and a node.

schemes

Performance Characteristics

According to its author, Iris is still in a “feature phase” and hasn’t been optimized for speed. Since it’s written in Go, I’ve compared its pub/sub benchmark performance with other Go messaging libraries, NATS and NSQ. As before, these benchmarks shouldn’t be taken as gospel, the code is available here, and pull requests are welcome.

We can see that Iris is comparable to NSQ on the sending side and about 4x on the receiving side, at least out of the box.

Conclusion

Brokerless systems like ZeroMQ and nanomsg offer considerably higher throughput and less latency than classical message-oriented middleware but require greater orchestration of network topologies. They offer high scalability but can lead to tighter coupling between components. Traditional brokered message queues, like those of the AMQP variety, tend to be slower while providing more guarantees and reduced coupling. However, they are also more prone to scale problems like availability and partitioning.

In terms of its qualities, Iris appears to be a reasonable compromise between the decentralized nature of the brokerless systems and the minimal-configuration and management of the brokered ones. Its intrinsic value lies in its ability to hide the complexities of the underlying infrastructure behind distributed systems. Rather, Iris lends itself to building large-scale systems the way we conceptualize and reason about them—by using services as the building blocks, not instances.

Dissecting Message Queues

Continuing my series on message queues, I spent this weekend dissecting various libraries for performing distributed messaging. In this analysis, I look at a few different aspects, including API characteristics, ease of deployment and maintenance, and performance qualities. The message queues have been categorized into two groups: brokerless and brokered. Brokerless message queues are peer-to-peer such that there is no middleman involved in the transmission of messages, while brokered queues have some sort of server in between endpoints.

The systems I’ll be analyzing are:

Brokerless
nanomsg
ZeroMQ

Brokered
ActiveMQ
NATS
Kafka
Kestrel
NSQ
RabbitMQ
Redis
ruby-nats

To start, let’s look at the performance metrics since this is arguably what people care the most about. I’ve measured two key metrics: throughput and latency. All tests were run on a MacBook Pro 2.6 GHz i7, 16GB RAM. These tests are evaluating a publish-subscribe topology with a single producer and single consumer. This provides a good baseline. It would be interesting to benchmark a scaled-up topology but requires more instrumentation.

The code used for benchmarking, written in Go, is available on GitHub. The results below shouldn’t be taken as gospel as there are likely optimizations that can be made to squeeze out performance gains. Pull requests are welcome.

Throughput Benchmarks

Throughput is the number of messages per second the system is able to process, but what’s important to note here is that there is no single “throughput” that a queue might have. We’re sending messages between two different endpoints, so what we observe is a “sender” throughput and a “receiver” throughput—that is, the number of messages that can be sent per second and the number of messages that can be received per second.

This test was performed by sending 1,000,000 1KB messages and measuring the time to send and receive on each side. Many performance tests tend to use smaller messages in the range of 100 to 500 bytes. I chose 1KB because it’s more representative of what you might see in a production environment, although this varies case by case. For message-oriented middleware systems, only one broker was used. In most cases, a clustered environment would yield much better results.Unsurprisingly, there’s higher throughput on the sending side. What’s interesting, however, is the disparity in the sender-to-receiver ratios. ZeroMQ is capable of sending over 5,000,000 messages per second but is only able to receive about 600,000/second. In contrast, nanomsg sends shy of 3,000,000/second but can receive almost 2,000,000.

Now let’s take a look at the brokered message queues. Intuitively, we observe that brokered message queues have dramatically less throughput than their brokerless counterparts by a couple orders of magnitude for the most part. Half the brokered queues have a throughput below 25,000 messages/second. The numbers for Redis might be a bit misleading though. Despite providing pub/sub functionality, it’s not really designed to operate as a robust messaging queue. In a similar fashion to ZeroMQ, Redis disconnects slow clients, and it’s important to point out that it was not able to reliably handle this volume of messaging. As such, we consider it an outlier. Kafka and ruby-nats have similar performance characteristics to Redis but were able to reliably handle the message volume without intermittent failures. The Go implementation of NATS, gnatsd, has exceptional throughput for a brokered message queue.

Outliers aside, we see that the brokered queues have fairly uniform throughputs. Unlike the brokerless libraries, there is little-to-no disparity in the sender-to-receiver ratios, which themselves are all very close to one.

Latency Benchmarks

The second key performance metric is message latency. This measures how long it takes for a message to be transmitted between endpoints. Intuition might tell us that this is simply the inverse of throughput, i.e. if throughput is messages/second, latency is seconds/message. However, by looking closely at this image borrowed from a ZeroMQ white paper, we can see that this isn’t quite the case. latency The reality is that the latency per message sent over the wire is not uniform. It can vary wildly for each one. In truth, the relationship between latency and throughput is a bit more involved. Unlike throughput, however, latency is not measured at the sender or the receiver but rather as a whole. But since each message has its own latency, we will look at the averages of all of them. Going further, we will see how the average message latency fluctuates in relation to the number of messages sent. Again, intuition tells us that more messages means more queueing, which means higher latency.

As we did before, we’ll start by looking at the brokerless systems.
In general, our hypothesis proves correct in that, as more messages are sent through the system, the latency of each message increases. What’s interesting is the tapering at the 500,000-point in which latency appears to increase at a slower rate as we approach 1,000,000 messages. Another interesting observation is the initial spike in latency between 1,000 and 5,000 messages, which is more pronounced with nanomsg. It’s difficult to pinpoint causation, but these changes might be indicative of how message batching and other network-stack traversal optimizations are implemented in each library. More data points may provide better visibility.

We see some similar patterns with brokered queues and also some interesting new ones.

Redis behaves in a similar manner as before, with an initial latency spike and then a quick tapering off. It differs in that the tapering becomes essentially constant right after 5,000 messages. NSQ doesn’t exhibit the same spike in latency and behaves, more or less, linearly. Kestrel fits our hypothesis.

Notice that ruby-nats and NATS hardly even register on the chart. They exhibited surprisingly low latencies and unexpected relationships with the number of messages.Remarkably, the message latencies for ruby-nats and NATS appear to be constant. This is counterintuitive to our hypothesis.

You may have noticed that Kafka, ActiveMQ, and RabbitMQ were absent from the above charts. This was because their latencies tended to be orders-of-magnitude higher than the other brokered message queues, so ActiveMQ and RabbitMQ were grouped into their own AMQP category. I’ve also included Kafka since it’s in the same ballpark.

Here we see that RabbitMQ’s latency is constant, while ActiveMQ and Kafka are linear. What’s unclear is the apparent disconnect between their throughput and mean latencies.

Qualitative Analysis

Now that we’ve seen some empirical data on how these different libraries perform, I’ll take a look at how they work from a pragmatic point of view. Message throughput and speed is important, but it isn’t very practical if the library is difficult to use, deploy, or maintain.

ZeroMQ and Nanomsg

Technically speaking, nanomsg isn’t a message queue but rather a socket-style library for performing distributed messaging through a variety of convenient patterns. As a result, there’s nothing to deploy aside from embedding the library itself within your application. This makes deployment a non-issue.

Nanomsg is written by one of the ZeroMQ authors, and as I discussed before, works in a very similar way to that library. From a development standpoint, nanomsg provides an overall cleaner API. Unlike ZeroMQ, there is no notion of a context in which sockets are bound to. Furthermore, nanomsg provides pluggable transport and messaging protocols, which make it more open to extension. Its additional built-in scalability protocols also make it quite appealing.

Like ZeroMQ, it guarantees that messages will be delivered atomically intact and ordered but does not guarantee the delivery of them. Partial messages will not be delivered, and it’s possible that some messages won’t be delivered at all. The library’s author, Martin Sustrik, makes this abundantly clear:

Guaranteed delivery is a myth. Nothing is 100% guaranteed. That’s the nature of the world we live in. What we should do instead is to build an internet-like system that is resilient in face of failures and routes around damage.

The philosophy is to use a combination of topologies to build resilient systems that add in these guarantees in a best-effort sort of way.

On the other hand, nanomsg is still in beta and may not be considered production-ready. Consequently, there aren’t a lot of resources available and not much of a development community around it.

ZeroMQ is a battle-tested messaging library that’s been around since 2007. Some may perceive it as a predecessor to nanomsg, but what nano lacks is where ZeroMQ thrives—a flourishing developer community and a deluge of resources and supporting material. For many, it’s the de facto tool for building fast, asynchronous distributed messaging systems that scale.

Like nanomsg, ZeroMQ is not a message-oriented middleware and simply operates as a socket abstraction. In terms of usability, it’s very much the same as nanomsg, although its API is marginally more involved.

ActiveMQ and RabbitMQ

ActiveMQ and RabbitMQ are implementations of AMQP. They act as brokers which ensure messages are delivered. ActiveMQ and RabbitMQ support both persistent and non-persistent delivery. By default, messages are written to disk such that they survive a broker restart. They also support synchronous and asynchronous sending of messages with the former having substantial impact on latency. To guarantee delivery, these brokers use message acknowledgements which also incurs a massive latency penalty.

As far as availability and fault tolerance goes, these brokers support clustering through shared storage or shared nothing. Queues can be replicated across clustered nodes so there is no single point of failure or message loss.

AMQP is a non-trivial protocol which its creators claim to be over-engineered. These additional guarantees are made at the expense of major complexity and performance trade-offs. Fundamentally, clients are more difficult to implement and use.

Since they’re message brokers, ActiveMQ and RabbitMQ are additional moving parts that need to be managed in your distributed system, which brings deployment and maintenance costs. The same is true for the remaining message queues being discussed.

NATS and Ruby-NATS

NATS (gnatsd) is a pure Go implementation of the ruby-nats messaging system. NATS is distributed messaging rethought to be less enterprisey and more lightweight (this is in direct contrast to systems like ActiveMQ, RabbitMQ, and others). Apcera’s Derek Collison, the library’s author and former TIBCO architect, describes NATS as “more like a nervous system” than an enterprise message queue. It doesn’t do persistence or message transactions, but it’s fast and easy to use. Clustering is supported so it can be built on top of with high availability and failover in mind, and clients can be sharded. Unfortunately, TLS and SSL are not yet supported in NATS (they are in the ruby-nats) but on the roadmap.

As we observed earlier, NATS performs far better than the original Ruby implementation. Clients can be used interchangeably with NATS and ruby-nats.

Kafka

Originally developed by LinkedIn, Kafka implements publish-subscribe messaging through a distributed commit log. It’s designed to operate as a cluster that can be consumed by large amounts of clients. Horizontal scaling is done effortlessly using ZooKeeper so that additional consumers and brokers can be introduced seamlessly. It also transparently takes care of cluster rebalancing.

Kafka uses a persistent commit log to store messages on the broker. Unlike other durable queues which usually remove persisted messages on consumption, Kafka retains them for a configured period of time. This means that messages can be “replayed” in the event that a consumer fails.

ZooKeeper makes managing Kafka clusters relatively easy, but it does introduce yet another element that needs to be maintained. That said, Kafka exposes a great API and Shopify has an excellent Go client called Sarama that makes interfacing with Kafka very accessible.

Kestrel

Kestrel is a distributed message queue open sourced by Twitter. It’s intended to be fast and lightweight. Because of this, it has no concept of clustering or failover. While Kafka is built from the ground up to be clustered through ZooKeeper, the onus of message partitioning is put upon the clients of Kestrel. There is no cross-communication between nodes. It makes this trade-off in the name of simplicity. It features durable queues, item expiration, transactional reads, and fanout queues while operating over Thrift or memcache protocols.

Kestrel is designed to be small, but this means that more work must be done by the developer to build out a robust messaging system on top of it. Kafka seems to be a more “all-in-one” solution.

NSQ

NSQ is a messaging platform built by Bitly. I use the word platform because there’s a lot of tooling built around NSQ to make it useful for real-time distributed messaging. The daemon that receives, queues, and delivers messages to clients is called nsqd. The daemon can run standalone, but NSQ is designed to run in as a distributed, decentralized topology. To achieve this, it leverages another daemon called nsqlookupd. Nsqlookupd acts as a service-discovery mechanism for nsqd instances. NSQ also provides nsqadmin, which is a web UI that displays real-time cluster statistics and acts as a way to perform various administrative tasks like clearing queues and managing topics.

By default, messages in NSQ are not durable. It’s primarily designed to be an in-memory message queue, but queue sizes can be configured such that after a certain point, messages will be written to disk. Despite this, there is no built-in replication. NSQ uses acknowledgements to guarantee message delivery, but the order of delivery is not guaranteed. Messages can also be delivered more than once, so it’s the developer’s responsibility to introduce idempotence.

Similar to Kafka, additional nodes can be added to an NSQ cluster seamlessly. It also exposes both an HTTP and TCP API, which means you don’t actually need a client library to push messages into the system. Despite all the moving parts, it’s actually quite easy to deploy. Its API is also easy to use and there are a number of client libraries available.

Redis

Last up is Redis. While Redis is great for lightweight messaging and transient storage, I can’t advocate its use as the backbone of a distributed messaging system. Its pub/sub is fast but its capabilities are limited. It would require a lot of work to build a robust system. There are solutions better suited to the problem, such as those described above, and there are also some scaling concerns with it.

These matters aside, Redis is easy to use, it’s easy to deploy and manage, and it has a relatively small footprint. Depending on the use case, it can be a great choice for real-time messaging as I’ve explored before.

Conclusion

The purpose of this analysis is not to present some sort of “winner” but instead showcase a few different options for distributed messaging. There is no “one-size-fits-all” option because it depends entirely on your needs. Some use cases require fast, fire-and-forget messages, others require delivery guarantees. In fact, many systems will call for a combination of these. My hope is that this dissection will offer some insight into which solutions work best for a given problem so that you can make an intelligent decision.

A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)

Earlier this month, I explored ZeroMQ and how it proves to be a promising solution for building fast, high-throughput, and scalable distributed systems. Despite lending itself quite well to these types of problems, ZeroMQ is not without its flaws. Its creators have attempted to rectify many of these shortcomings through spiritual successors Crossroads I/O and nanomsg.

The now-defunct Crossroads I/O is a proper fork of ZeroMQ with the true intention being to build a viable commercial ecosystem around it. Nanomsg, however, is a reimagining of ZeroMQ—a complete rewrite in C1. It builds upon ZeroMQ’s rock-solid performance characteristics while providing several vital improvements, both internal and external. It also attempts to address many of the strange behaviors that ZeroMQ can often exhibit. Today, I’ll take a look at what differentiates nanomsg from its predecessor and implement a use case for it in the form of service discovery.

Nanomsg vs. ZeroMQ

A common gripe people have with ZeroMQ is that it doesn’t provide an API for new transport protocols, which essentially limits you to TCP, PGM, IPC, and ITC. Nanomsg addresses this problem by providing a pluggable interface for transports and messaging protocols. This means support for new transports (e.g. WebSockets) and new messaging patterns beyond the standard set of PUB/SUB, REQ/REP, etc.

Nanomsg is also fully POSIX-compliant, giving it a cleaner API and better compatibility. No longer are sockets represented as void pointers and tied to a context—simply initialize a new socket and begin using it in one step. With ZeroMQ, the context internally acts as a storage mechanism for global state and, to the user, as a pool of I/O threads. This concept has been completely removed from nanomsg.

In addition to POSIX compliance, nanomsg is hoping to be interoperable at the API and protocol levels, which would allow it to be a drop-in replacement for, or otherwise interoperate with, ZeroMQ and other libraries which implement ZMTP/1.0 and ZMTP/2.0. It has yet to reach full parity, however.

ZeroMQ has a fundamental flaw in its architecture. Its sockets are not thread-safe. In and of itself, this is not problematic and, in fact, is beneficial in some cases. By isolating each object in its own thread, the need for semaphores and mutexes is removed. Threads don’t touch each other and, instead, concurrency is achieved with message passing. This pattern works well for objects managed by worker threads but breaks down when objects are managed in user threads. If the thread is executing another task, the object is blocked. Nanomsg does away with the one-to-one relationship between objects and threads. Rather than relying on message passing, interactions are modeled as sets of state machines. Consequently, nanomsg sockets are thread-safe.

Nanomsg has a number of other internal optimizations aimed at improving memory and CPU efficiency. ZeroMQ uses a simple trie structure to store and match PUB/SUB subscriptions, which performs nicely for sub-10,000 subscriptions but quickly becomes unreasonable for anything beyond that number. Nanomsg uses a space-optimized trie called a radix tree to store subscriptions. Unlike its predecessor, the library also offers a true zero-copy API which greatly improves performance by allowing memory to be copied from machine to machine while completely bypassing the CPU.

ZeroMQ implements load balancing using a round-robin algorithm. While it provides equal distribution of work, it has its limitations. Suppose you have two datacenters, one in New York and one in London, and each site hosts instances of “foo” services. Ideally, a request made for foo from New York shouldn’t get routed to the London datacenter and vice versa. With ZeroMQ’s round-robin balancing, this is entirely possible unfortunately. One of the new user-facing features that nanomsg offers is priority routing for outbound traffic. We avoid this latency problem by assigning priority one to foo services hosted in New York for applications also hosted there. Priority two is then assigned to foo services hosted in London, giving us a failover in the event that foos in New York are unavailable.

Additionally, nanomsg offers a command-line tool for interfacing with the system called nanocat. This tool lets you send and receive data via nanomsg sockets, which is useful for debugging and health checks.

Scalability Protocols

Perhaps most interesting is nanomsg’s philosophical departure from ZeroMQ. Instead of acting as a generic networking library, nanomsg intends to provide the “Lego bricks” for building scalable and performant distributed systems by implementing what it refers to as “scalability protocols.” These scalability protocols are communication patterns which are an abstraction on top of the network stack’s transport layer. The protocols are fully separated from each other such that each can embody a well-defined distributed algorithm. The intention, as stated by nanomsg’s author Martin Sustrik, is to have the protocol specifications standardized through the IETF.

Nanomsg currently defines six different scalability protocols: PAIR, REQREP, PIPELINE, BUS, PUBSUB, and SURVEY.

PAIR (Bidirectional Communication)

PAIR implements simple one-to-one, bidirectional communication between two endpoints. Two nodes can send messages back and forth to each other.

REQREP (Client Requests, Server Replies)

The REQREP protocol defines a pattern for building stateless services to process user requests. A client sends a request, the server receives the request, does some processing, and returns a response.

PIPELINE (One-Way Dataflow)

PIPELINE provides unidirectional dataflow which is useful for creating load-balanced processing pipelines. A producer node submits work that is distributed among consumer nodes.

BUS (Many-to-Many Communication)

BUS allows messages sent from each peer to be delivered to every other peer in the group.

PUBSUB (Topic Broadcasting)

PUBSUB allows publishers to multicast messages to zero or more subscribers. Subscribers, which can connect to multiple publishers, can subscribe to specific topics, allowing them to receive only messages that are relevant to them.

SURVEY (Ask Group a Question)

The last scalability protocol, and the one in which I will further examine by implementing a use case with, is SURVEY. The SURVEY pattern is similar to PUBSUB in that a message from one node is broadcasted to the entire group, but where it differs is that each node in the group responds to the message. This opens up a wide variety of applications because it allows you to quickly and easily query the state of a large number of systems in one go. The survey respondents must respond within a time window configured by the surveyor.

Implementing Service Discovery

As I pointed out, the SURVEY protocol has a lot of interesting applications. For example:

  • What data do you have for this record?
  • What price will you offer for this item?
  • Who can handle this request?

To continue exploring it, I will implement a basic service-discovery pattern. Service discovery is a pretty simple question that’s well-suited for SURVEY: what services are out there? Our solution will work by periodically submitting the question. As services spin up, they will connect with our service discovery system so they can identify themselves. We can tweak parameters like how often we survey the group to ensure we have an accurate list of services and how long services have to respond.

This is great because 1) the discovery system doesn’t need to be aware of what services there are—it just blindly submits the survey—and 2) when a service spins up, it will be discovered and if it dies, it will be “undiscovered.”

Here is the ServiceDiscovery class:

The discover method submits the survey and then collects the responses. Notice we construct a SURVEYOR socket and set the SURVEYOR_DEADLINE option on it. This deadline is the number of milliseconds from when a survey is submitted to when a response must be received—adjust it accordingly based on your network topology. Once the survey deadline has been reached, a NanoMsgAPIError is raised and we break the loop. The resolve method will take the name of a service and randomly select an available provider from our discovered services.

We can then wrap ServiceDiscovery with a daemon that will periodically run discover.

The discovery parameters are configured through environment variables which I inject into a Docker container.

Services must connect to the discovery system when they start up. When they receive a survey, they should respond by identifying what service they provide and where the service is located. One such service might look like the following:

Once again, we configure parameters through environment variables set on a container. Note that we connect to the discovery system with a RESPONDENT socket which then responds to service queries with the service name and address. The service itself uses a REP socket that simply responds to any requests with “The answer is 42,” but it could take any number of forms such as HTTP, raw socket, etc.

The full code for this example, including Dockerfiles, can be found on GitHub.

Nanomsg or ZeroMQ?

Based on all the improvements that nanomsg makes on top of ZeroMQ, you might be wondering why you would use the latter at all. Nanomsg is still relatively young. Although it has numerous language bindings, it hasn’t reached the maturity of ZeroMQ which has a thriving development community. ZeroMQ has extensive documentation and other resources to help developers make use of the library, while nanomsg has very little. Doing a quick Google search will give you an idea of the difference (about 500,000 results for ZeroMQ to nanomsg’s 13,500).

That said, nanomsg’s improvements and, in particular, its scalability protocols make it very appealing. A lot of the strange behaviors that ZeroMQ exposes have been resolved completely or at least mitigated. It’s actively being developed and is quickly gaining more and more traction. Technically, nanomsg has been in beta since March, but it’s starting to look production-ready if it’s not there already.

  1. The author explains why he should have originally written ZeroMQ in C instead of C++. []

Distributed Messaging with ZeroMQ

“A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable.” -Leslie Lamport

With the increased prevalence and accessibility of cloud computing, distributed systems architecture has largely supplanted more monolithic constructs. The implication of using a service-oriented architecture, of course, is that you now have to deal with a myriad of difficulties that previously never existed, such as fault tolerance, availability, and horizontal scaling. Another interesting layer of complexity is providing consistency across nodes, which itself is a problem surrounded with endless research. Algorithms like Paxos and Raft attempt to provide solutions for managing replicated data, while other solutions offer eventual consistency.

Building scalable, distributed systems is not a trivial feat, but it pales in comparison to building real-time systems of a similar nature. Distributed architecture is a well-understood problem and the fact is, most applications have a high tolerance for latency. Few systems have a demonstrable need for real-time communication, but the few that do present an interesting challenge for developers. In this article, I explore the use of ZeroMQ to approach the problem of distributed, real-time messaging in a scalable manner while also considering the notion of eventual consistency.

The Intelligent Transport Layer

ZeroMQ is a high-performance asynchronous messaging library written in C++. It’s not a dedicated message broker but rather an embeddable concurrency framework with support for direct and fan-out endpoint connections over a variety of transports. ZeroMQ implements a number of different communication patterns like request-reply, pub-sub, and push-pull through TCP, PGM (multicast), in-process, and inter-process channels. The glaring lack of UDP support is, more or less, by design because ZeroMQ was conceived to provide guaranteed-ish delivery of atomic messages. The library makes no actual guarantee of delivery, but it does make a best effort. What ZeroMQ does guarantee, however, is that you will never receive a partial message, and messages will be received in order. This is important because UDP’s performance gains really only manifest themselves in lossy or congested environments.

The comprehensive list of messaging patterns and transports alone make ZeroMQ an appealing choice for building distributed applications, but it particularly excels due to its reliability, scalability and high throughput. ZeroMQ and related technologies are popular within high-frequency trading, where packet loss of financial data is often unacceptable1. In 2011, CERN actually performed a study comparing CORBA, Ice, Thrift, ZeroMQ, and several other protocols for use in its particle accelerators and ranked ZeroMQ the highest.

cern

ZeroMQ uses some tricks that allow it to actually outperform TCP sockets in terms of throughput such as intelligent message batching, minimizing network-stack traversals, and disabling Nagle’s algorithm. By default (and when possible), messages are queued on the subscriber, which attempts to avoid the problem of slow subscribers. However, when this isn’t sufficient, ZeroMQ employs a pattern called the “Suicidal Snail.” When a subscriber is running slow and is unable to keep up with incoming messages, ZeroMQ convinces the subscriber to kill itself. “Slow” is determined by a configurable high-water mark. The idea here is that it’s better to fail fast and allow the issue to be resolved quickly than to potentially allow stale data to flow downstream. Again, think about the high-frequency trading use case.

A Distributed, Scalable, and Fast Messaging Architecture

ZeroMQ makes a convincing case for use as a transport layer. Let’s explore a little deeper to see how it could be used to build a messaging framework for use in a real-time system. ZeroMQ is fairly intuitive to use and offers a plethora of bindings for various languages, so we’ll focus more on the architecture and messaging paradigms than the actual code.

About a year ago, while I first started investigating ZeroMQ, I built a framework to perform real-time messaging and document syncing called Zinc. A “document,” in this sense, is any well-structured and mutable piece of data—think text document, spreadsheet, canvas, etc. While purely academic, the goal was to provide developers with a framework for building rich, collaborative experiences in a distributed manner.

The framework actually had two implementations, one backed by the native ZeroMQ, and one backed by the pure Java implementation, JeroMQ2. It was really designed to allow any transport layer to be used though.

Zinc is structured around just a few core concepts: Endpoints, ChannelListeners, MessageHandlers, and Messages. An Endpoint represents a single node in an application cluster and provides functionality for sending and receiving messages to and from other Endpoints. It has outbound and inbound channels for transmitting messages to peers and receiving them, respectively.

endpoint

ChannelListeners essentially act as daemons listening for incoming messages when the inbound channel is open on an Endpoint. When a message is received, it’s passed to a thread pool to be processed by a MessageHandler. Therefore, Messages are processed asynchronously in the order they are received, and as mentioned earlier, ZeroMQ guarantees in-order message delivery. As an aside, this is before I began learning Go, which would make for an ideal replacement for Java here as it’s quite well-suited to the problem :)

Messages are simply the data being exchanged between Endpoints, from which we can build upon with Documents and DocumentFragments. A Document is the structured data defined by an application, while DocumentFragment represents a partial Document, or delta, which can be as fine- or coarse- grained as needed.

Zinc is built around the publish-subscribe and push-pull messaging patterns. One Endpoint will act as the host of a cluster, while the others act as clients. With this architecture, the host acts as a publisher and the clients as subscribers. Thus, when a host fires off a Message, it’s delivered to every subscribing client in a multicast-like fashion. Conversely, clients also act as “push” Endpoints with the host being a “pull” Endpoint. Clients can then push Messages into the host’s Message queue from which the host is pulling from in a first-in-first-out manner.

This architecture allows Messages to be propagated across the entire cluster—a client makes a change which is sent to the host, who propagates this delta to all clients. This means that the client who initiated the change will receive an “echo” delta, but it will be discarded by checking the Message origin, a UUID which uniquely identifies an Endpoint. Clients are then responsible for preserving data consistency if necessary, perhaps through operational transformation or by maintaining a single source of truth from which clients can reconcile.

cluster

One of the advantages of this architecture is that it scales reasonably well due to its composability. Specifically, we can construct our cluster as a tree of clients with arbitrary breadth and depth. Obviously, the more we scale horizontally or vertically, the more latency we introduce between edge nodes. Coupled with eventual consistency, this can cause problems for some applications but might be acceptable to others.

scalability

The downside is this inherently introduces a single point of failure characterized by the client-server model. One solution might be to promote another node when the host fails and balance the tree.

Once again, this framework was mostly academic and acted as a way for me to test-drive ZeroMQ, although there are some other interesting applications of it. Since the framework supports multicast message delivery via push-pull or publish-subscribe mechanisms, one such use case is autonomous load balancing.

Paired with something like ZooKeeper, etcd, or some other service-discovery protocol, clients would be capable of discovering hosts, who act as load balancers. Once a client has discovered a host, it can request to become a part of that host’s cluster. If the host accepts the request, the client can begin to send messages to the host (and, as a result, to the rest of the cluster) and, likewise, receive messages from the host (and the rest of the cluster). This enables clients and hosts to submit work to the cluster such that it’s processed in an evenly distributed way, and workers can determine whether to pass work on further down the tree or process it themselves. Clients can choose to participate in load-balancing clusters at their own will and when they become available, making them mostly autonomous. Clients could then be quickly spun-up and spun-down using, for example, Docker containers.

ZeroMQ is great for achieving reliable, fast, and scalable distributed messaging, but it’s equally useful for performing parallel computation on a single machine or several locally networked ones by facilitating in- and inter- process communication using the same patterns. It also scales in the sense that it can effortlessly leverage multiple cores on each machine. ZeroMQ is not a replacement for a message broker, but it can work in unison with traditional message-oriented middleware. Combined with Protocol Buffers and other serialization methods, ZeroMQ makes it easy to build extremely high-throughput messaging frameworks.

  1. ZeroMQ’s founder, iMatix, was responsible for moving JPMorgan Chase and the Dow Jones Industrial Average trading platforms to OpenAMQ []
  2. In systems where near real-time is sufficient, JeroMQ is adequate and benefits by not requiring any native linking. []