More Environments Will Not Make Things Easier

Microservices are hard. They require extreme discipline. They require a lot more upfront thinking. They introduce integration challenges and complexity that you otherwise wouldn’t have with a monolith, but service-oriented design is an important part of scaling an organization. Hundreds of engineers all working on the same codebase will only lead to angst and the inability to be nimble.

This requires a pretty significant change in the way we think about things. We’re creatures of habit, so if we’re not careful, we’ll just keep on applying the same practices we used before we did services. And that will end in frustration.

How can we possibly build working software that comprises dozens of services owned by dozens of teams? Instinct tells us full-scale integration. That’s how we did things before, right? We ran integration tests. We ran all of the services we depended on and developed our service against them. But it turns out the dozen or so services we depend on also have their own dependencies! This problem is not linear.

Okay, so we can’t run everything on our laptop. Instead, let’s just have a development environment that is a facsimile of production with everything deployed. This way, teams can develop their products against real, deployed services. The trade-off is teams need to provide a high level of stability for these “development” services since other teams are relying on them for their own development. If nothing works, development is hamstrung. Personally, I think this is a pretty reasonable trade-off because if we’re disciplined enough, it shouldn’t be hard to provide stable APIs. In fact, if we’re disciplined, it should be a requirement. This is why upfront thinking is critical. Designing your APIs is the most important thing you do. Service-oriented architecture necessitates API-driven development. Literally nothing else matters but the APIs. It reminds me of the famous Jeff Bezos mandate:

  1. All teams will henceforth expose their data and functionality through service interfaces.

  2. Teams must communicate with each other through these interfaces.

  3. There will be no other form of interprocess communication allowed: no direct linking, no direct reads of another team’s data store, no shared-memory model, no back-doors whatsoever. The only communication allowed is via service interface calls over the network.

  4. It doesn’t matter what technology they use. HTTP, Corba, Pubsub, custom protocols – doesn’t matter. Bezos doesn’t care.

  5. All service interfaces, without exception, must be designed from the ground up to be externalizable. That is to say, the team must plan and design to be able to expose the interface to developers in the outside world. No exceptions.

  6. Anyone who doesn’t do this will be fired.

  7. Thank you; have a nice day!

If we’re not disciplined, maintaining stability in a development environment becomes too difficult. So naturally, the solution becomes doubling down—we just need more environments. If every team just gets its own full-scale environment to develop against, no more stability problems. We get to develop our distributed monolith happily in our own little world. That sound you hear is every CFO collectively losing their shit, but whatever, they’re nerds and we’ve gotta get this feature to production!

Besides the obvious cost implications of this approach, perhaps the more insidious problem is that it causes teams to develop in a vacuum. In and of itself, this is not an issue, but for the undisciplined team that is not practicing rigorous API-driven development, it creates moving goalposts. A team will spend months developing its product against static dependencies only to find a massive integration headache come production time. It’s pain deferral, plain and simple. That pain isn’t being avoided or managed; the team is simply putting off dealing with instability and integration until a point where it’s even more difficult. It is the opposite of the “fail-fast” mindset. It’s failing slowly and drawn out.

“We need to run everything with this particular configuration to test this, and if anyone so much as sneezes my service becomes unstable.” Good luck with that. I’ve got a dirty little secret: if you’re not disciplined, no amount of environments will make things easier. If you can’t keep your service running in an integration environment, production isn’t going to be any easier.

Similarly, massive end-to-end integration tests spanning numerous services are an anti-pattern. Another dirty little secret: integrated tests are a scam. With a big enough system, you cannot reasonably expect to write meaningful large-scale tests in any tractable way.

What are we to do then? With respect to development, get it out of your head that you can run a facsimile of production to build features against. If you need local development, the only sane and cost-effective option is to stub. Stub everything. If you have a consistent RPC layer—discipline—this shouldn’t be too difficult. You might even be able to generate portions of stubs.
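As a concrete illustration, here’s a minimal sketch in Go of what stubbing a dependency might look like. The UserService interface and InMemoryUserStub are hypothetical names, but the pattern applies to any consistent RPC layer: once every dependency is consumed through an interface (or a generated client), swapping in a stub for local development is cheap.

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    // UserService is the RPC surface our service depends on (hypothetical).
    type UserService interface {
        GetUser(ctx context.Context, id string) (*User, error)
    }

    type User struct {
        ID   string
        Name string
    }

    // InMemoryUserStub satisfies UserService with no network calls, which is
    // all local development needs.
    type InMemoryUserStub struct {
        Users map[string]*User
    }

    func (s *InMemoryUserStub) GetUser(ctx context.Context, id string) (*User, error) {
        u, ok := s.Users[id]
        if !ok {
            return nil, errors.New("user not found")
        }
        return u, nil
    }

    func main() {
        var users UserService = &InMemoryUserStub{
            Users: map[string]*User{"42": {ID: "42", Name: "Ada"}},
        }
        u, err := users.GetUser(context.Background(), "42")
        fmt.Println(u, err)
    }

In a gRPC or Thrift shop, much of this boilerplate can be generated from the service definitions, which is what the remark about generating portions of stubs is getting at.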

We used Google App Engine heavily at Workiva, which is a PaaS encompassing numerous services—app server, datastore, task queues, memcache, blobstore, cron, mail—all managed by Google. We were doing serverless before serverless was even a thing. App Engine provides an SDK for developing applications locally on your machine. Numerous times I overheard someone who thought the SDK was just running a facsimile of App Engine on their laptop. In reality, it was running a bunch of stubs!

If you need a full-scale deployed environment, keep in mind that stability is the cost of entry. Otherwise, you’re just delaying problems. In either case, you need stable APIs.

With respect to integration testing, the only tractable solution that doesn’t lull you into a false sense of security is consumer-driven contract testing. We run our tests against a stub, but these tests are also included in a consumer-driven contract. An API provider runs consumer-driven contract tests against its service to ensure it’s not breaking any downstream services.
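Here’s a rough sketch of the idea in Go, using a hand-rolled contract format rather than a real tool like Pact; the Interaction type, userContract, and VerifyProviderContract helper are all hypothetical. The consumer develops against a stub generated from the contract, and the provider runs the same contract against its real handler in CI.

    package contract

    import (
        "net/http"
        "net/http/httptest"
        "testing"
    )

    // Interaction is one expected request/response pair in the contract.
    type Interaction struct {
        Method, Path string
        Status       int
        Body         string
    }

    // userContract is the consumer's declaration of what it relies on.
    var userContract = []Interaction{
        {Method: "GET", Path: "/users/42", Status: 200, Body: `{"id":"42","name":"Ada"}`},
    }

    // StubFromContract builds the stub server the consumer develops and
    // tests against.
    func StubFromContract(interactions []Interaction) *httptest.Server {
        return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            for _, i := range interactions {
                if r.Method == i.Method && r.URL.Path == i.Path {
                    w.WriteHeader(i.Status)
                    w.Write([]byte(i.Body))
                    return
                }
            }
            w.WriteHeader(http.StatusNotFound)
        }))
    }

    // VerifyProviderContract is run by the provider team to make sure its
    // handler still satisfies every consumer interaction.
    func VerifyProviderContract(t *testing.T, provider http.Handler) {
        for _, i := range userContract {
            req := httptest.NewRequest(i.Method, i.Path, nil)
            rec := httptest.NewRecorder()
            provider.ServeHTTP(rec, req)
            if rec.Code != i.Status {
                t.Errorf("%s %s: got status %d, want %d", i.Method, i.Path, rec.Code, i.Status)
            }
        }
    }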

All of this aside, the broader issue is ensuring a highly disciplined engineering organization. Without this, the rest becomes much more difficult as pain-driven development takes hold. Discipline is a key part of doing service-oriented design and preventing things from getting out of control as a company scales. Moving to microservices means using the right tools and processes, not just applying the old ones in a new context.

There and Back Again: Why PaaS Is Passé (And Why It’s Not)

In 10 years nobody will be talking about Kubernetes. Not because people stopped using it or because it fell out of favor, but because it became a utility. Containers, Kubernetes, service meshes—they’ll all be there, the same way VMs, hypervisors, and switches will be. Compute is a commodity, and I don’t care how my workload runs so long as it meets my business’s SLOs and other requirements. Within AWS alone, there are now innumerable ways to run a compute workload.

This was the promise of Platform as a Service (PaaS): provide a pre-built runtime where you simply plug in your application and the rest—compute, networking, storage—is handled for you. Heroku (2007), Google App Engine (2008), OpenShift (2011), and Cloud Foundry (2011) all come to mind. But PaaS has, in many ways, become a sort of taboo in recent years. As a consultant working with companies either in the cloud or looking to move to the cloud, I’ve found PaaS to almost be a trigger word; the wince from clients upon its utterance is almost palpable. It’s hard to pin down exactly why this is the case, but I think there are a number of reasons which range from entirely legit to outright FUD.

There is often a funny cognitive dissonance with these companies that recoil at the mention of PaaS. After unequivocally rejecting the idea for reasons like vendor lock-in and runtime restrictions (again, some of these are legitimate concerns), they will describe, in piecemeal fashion, their own half-baked idea of a PaaS. “Well, we’ll use Kubernetes to handle compute, ELK stack for logging, Prometheus for metrics, OpenTracing for distributed tracing, Redis for caching…”, and so the list goes on. Not to mention there tends to be a bias toward build over buy. And we need to somehow provide all of these things as a self-service platform to developers.

While there are ongoing efforts to democratize the cloud and provide reference architectures of sorts, the fact is there are no standards, and the proliferation of tools and technologies continues to expand at a rapid pace. On the other hand, as certain tools like Kubernetes emerge, the patterns and practices around them naturally lag behind. The serverless movement bears this out further. Serverless is the microservice equivalent of PaaS but with a lot less tooling and operational maturity. This is an exciting time, but the cloud has become—without a doubt—an unnavigable wasteland. Even with all the things at your disposal today, it’s still a ton of work to build and operate what is essentially your own PaaS.

But technology is cyclical and the cloud is no different. This evolution, in some sense, parallels what happened with the NoSQL movement. Eric Brewer discusses this in his RICON 2012 talk. When you cut through the hype, NoSQL was about giving developers more control at the expense of less pre-packaged functionality, but it was not intended to be the end game or an alternative to SQL. It’s about two different, equally valid world views: top-down and bottom-up. The top-down view is looking at a model and its semantics and then figuring out what you need to do to implement it. With a relational database, this is using SQL to declaratively construct our model. The bottom-up view is about the layering of primitive components into something more complex. For example, modern databases like CockroachDB present a SQL abstraction on top of a transactional layer on top of a replication layer on top of a simple key-value-store layer. NoSQL gives us a reusable storage component with a lot of flexibility and, over time, as we add more and more pieces on top, we get something that looks more like a database. We start with low-level layers, but the end goal is still the same: nice, user-friendly semantics. I would argue the same thing is happening with PaaS.

What the major cloud providers are doing is unbundling the PaaS. We have our compute, our cluster scheduler, our databases and caches, our message queues, and other components. What’s missing is the glue—the standards and tools that tie these things together into a coherent, manageable unit—a PaaS. Everything old is new again. What we will see is the rebundling of these components gradually happen over time as those standards and tools emerge. Tools like AWS Fargate and Google App Engine Flexible Environment are a step in that direction (Google really screwed up by calling it App Engine Flex because of all the PaaS baggage associated with the App Engine name). The container is just the interface. However, that’s only the start.

PaaS and serverless are great because they truly accelerate application development and reduce operations overhead. However, the trade-off is that we become constrained. For example, with App Engine, we were initially constrained to certain Google Cloud APIs, such as Cloud Datastore and Task Queues, and specific language runtimes. Over time this has improved, notably with Cloud SQL, and today we can use custom runtimes. Similarly, PaaS gives us service autoscaling, high availability, and critical security patches for free, but we lose a degree of control over compute characteristics and workload-processing patterns.

In a sense, what a PaaS offers is an opinionated framework for running applications. Opinionated is good if you want to be productive, but it’s limiting once you have a mature product. What we want are the benefits of PaaS with a bit more flexibility. A PaaS provides us a top-down template from which we can start, but we want to be able to tweak that to our needs. Kubernetes is a key part of that template, but it’s ultimately just a means to an end.

This is why I think no one will be talking about Kubernetes in 10 years. Hopefully by then it’s just not that interesting. If it still is, we’re not done yet.

Plant Trees Before You Need the Shade

Like humans, companies go through phases. There’s the early seed and development phase. Founders are so preoccupied with a problem they go crazy. They consider solutions and the feasibility of a business. There’s the startup phase, when a business is actually born, and it stumbles towards product/market fit. There’s the growth and scaling phase, as we try to close more and more deals while, at the same time, hiring the right people. If we’re lucky, we reach the later stages. There’s the expansion phase, as we attempt to land and expand or attack new verticals or geographies. This is when things get really interesting—and hard. Who are the right people to hire? What are the right products to build? The formula that got us here almost certainly won’t get us there. Lastly, there’s maturity, which is when the business has really hit its stride. Maybe there’s an exit, and very likely there’s new leadership involved.

Consistent in all of this are two things: culture and capabilities. Culture is the invisible hand inside your organization. It’s your company’s autopilot. Specifically, culture is the unique combination of processes and values within an organization. These processes and values are what enable us to replicate our success. They allow people to make decisions which are in alignment with the goals of the company without having to constantly coordinate with one another.

This also means your culture is derived from your capabilities, what your organization can and cannot do. Clayton Christensen groups these factors into three buckets: resources, processes, and values. Resources are the (mostly) tangible things a company has—people, capital, brands, intellectual property, relationships with customers, manufacturers, distributors, and so forth. Processes are what we do with resources to accomplish the organization’s goals, such as developing products, developing employees, hiring, firing, doing market research, and allocating resources. They take in resources and produce value. Processes help us protect and scale our values by providing a means of documenting and codifying them. These are predominantly intangible things. Finally, values define how a company makes decisions. What goes to the top of the list, and what gets ignored. What gets investment, and what doesn’t. These are our priorities that guide us.

There are a few problems with how leadership tends to view capabilities. There is typically an overemphasis on resources. This happens because in the startup phase, success is largely governed by resources. Notably, people. This is especially true of software startups. I quote this section from The Innovator’s Dilemma frequently:

In the start-up stages of an organization, much of what gets done is attributable to resources—people, in particular. The addition or departure of a few key people can profoundly influence its success. Over time, however, the locus of the organization’s capabilities shifts toward its processes and values. As people address recurrent tasks, processes become defined. And as the business model takes shape and it becomes clear which types of business need to be accorded highest priority, values coalesce. In fact, one reason that many soaring young companies flame out after an IPO based on a single hot product is that their initial success is grounded in resources—often the founding engineers—and they fail to develop processes that can create a sequence of hot products.

In the beginning stages, people drive success. Early engineers and founders (the two are not mutually exclusive) have an itch to scratch that they are so passionate about, they evangelize this crazy grand vision and people get excited. The company is small, focused, passionate, and everyone is working closely together to solve a customer problem they are obsessed with. And, sometimes, it pays off.

Next, leadership starts asking itself, “OK, we shipped this crazy successful product, now how do we grow?” This is where the wheels start to come off. There are three problems that occur.

First, the focus shifts away from solving a problem you’re obsessed with to finding the next big product. These companies fail to find a new product because they are searching for one without passion. They are seeking top-line revenue growth, not pursuing a vision. They are looking at markets through the lens of “here’s where we can make money” and not “here’s where we can solve problems,” and in doing so, they lose sight of the customer. At this stage, they basically have forgotten what got them here.

Second, their processes get in their own way. This seems contradictory given that processes, by their very nature, are meant to facilitate repeatability. If we have good processes in place, we should be able to apply them time and time again, even with different people, and end up with consistent results. But things break down when we apply the same processes to different problems. In essence, they try to find success using what brought them success to begin with, but with a warped perspective. You can look at every startup and they will all have wildly different stories about how they found success, but the one thing they will all have in common is that relentless itch. When we attack a new market, we need to do a reset on the processes and values, not a recycle. Christensen suggests that if your company is so deeply entrenched in its processes that this isn’t viable, as is often the case with large enterprises, you should spin the effort out into a new venture. Processes are as dynamic as the company itself. They are not a one-size-fits-all deal. Christensen calls this the migration of capabilities.

The factors that define an organization’s capabilities and disabilities evolve over time—they start in resources; then move to visible, articulated processes and values; and migrate finally to culture. As long as the organization continues to face the same sorts of problems that its processes and values were designed to address, managing the organization can be straightforward. But because those factors also define what an organization cannot do, they constitute disabilities when the problems facing the company change fundamentally.

Third, they don’t fully appreciate the nature of dynamic priorities. Software startups in particular often mistakenly attribute their initial success to technology when, in reality, it’s because of people and timing. Aside from the passion, you need people early on who can ship and ship often. These might not be the most technically capable engineers, but they get things done when it matters. Later, you need people who can still ship but while cleaning things up. Lastly, you need maintainers—people who can refine without breaking anything. That’s not to say you have these archetypes exclusively at each stage—you want a balance of people—but it’s similar to how companies commonly need a different type of CTO at different phases or an Interim CTO.

While resources are an essential part of early success, it’s processes and values that will sustain you. However, we tend to overfit on resources because we become biased from that success—investing heavily in technology and innovation and grounding the company’s success in a few key individuals. We also overfit because resources are more visible and measurable. Being deliberate about establishing processes and values—which are derived from vision—helps to overcome this bias, but it needs to happen early and be continually reinforced. We also need to ensure our processes adapt to new problems. The larger and more complex an organization becomes, the harder this gets.

Moreover, we need to be conscientious about which processes matter to us the most. Often the most important capabilities aren’t reflected by the most visible processes—product development or customer service, for example—but in the less visible, background processes that support decisions about where to invest resources. These might include determining how market research is done, how financial and sales projections are drawn from this analysis, how products are conceived, how planning and budgets are negotiated, and so on. These processes are where many companies get their inability to cope with change.

And this is where the breakdown happens: a company has highly capable people—people who have helped shape its success as a startup—but arms them with the wrong processes and values. The result is often a boom followed by a bunch of fizzles as they try to catch lightning in a bottle once more. A compelling vision plants a seed. Strong processes and clear values help that seed to grow. But the shade produced by that tree—our capabilities—is stationary, so when we approach a new challenge, we need to recognize when to start tilling.

Building a Distributed Log from Scratch, Part 5: Sketching a New System

In part four of this series we looked at some key trade-offs involved with a distributed log implementation and discussed a few lessons learned while building NATS Streaming. In this fifth and final installment, we’ll conclude by outlining the design for a new log-based system that draws from the previous entries in the series.

The Context

For context, NATS and NATS Streaming are two different things. NATS Streaming is a log-based streaming system built on top of NATS, and NATS is a lightweight pub/sub messaging system. NATS was originally built (and then open sourced) as the control plane for Cloud Foundry. NATS Streaming was built in response to the community’s ask for higher-level guarantees—durability, at-least-once delivery, and so forth—beyond what NATS provided. It was built as a separate layer on top of NATS. I tend to describe NATS as a dial tone—ubiquitous and always on—perfect for “online” communications. NATS Streaming is the voicemail—leave a message after the beep and someone will get to it later. There are, of course, more nuances than this, but that’s the gist.

The key point here is that NATS and NATS Streaming are distinct systems with distinct protocols, distinct APIs, and distinct client libraries. In fact, NATS Streaming was designed to essentially act as a client to NATS. As such, clients don’t talk to NATS Streaming directly, rather all communication goes through NATS. However, the NATS Streaming binary can be configured to either embed NATS or point to a standalone deployment. The architecture is shown below in a diagram borrowed from the NATS website.

Architecturally, this makes a lot of sense. It supports the end-to-end principle in that we layer on additional functionality rather than bake it into the underlying infrastructure. After all, we can always build stronger guarantees on top, but we can’t always remove them from below. This particular architecture, however, introduces a few challenges (disclosure: while I’m still a fan, I’m no longer involved with the NATS project, and the NATS team is aware of these problems and no doubt working to address many of them).

First, there is no “cross-talk” between NATS and NATS Streaming, meaning messages published to NATS are not visible in NATS Streaming and vice versa. Again, they are two completely separate systems that just share the same infrastructure. This means we’re not really layering on message durability to NATS, we’re just exposing a new system which provides these semantics.

Second, because NATS Streaming runs as a “sidecar” to NATS and all of its communication runs through NATS, there is an inherent bottleneck at the NATS connection. This may only be a theoretical limit, but it precludes certain optimizations like using sendfile to do zero-copy reads of the log. It also means we rely on timeouts even in cases where the server could send a response immediately, such as when there is no leader elected for the cluster.

Third, NATS Streaming currently lacks a compelling story around linear scaling other than running multiple clusters and partitioning channels among them at the application level. With respect to scaling a single channel, the only option at the moment is to manually partition it into multiple channels. My hope is that as clustering matures, this will too.

Fourth, without extending its protocol, NATS Streaming’s authorization is intrinsically limited to the authorization provided by NATS since all communication goes through it. In and of itself, this isn’t a problem. NATS supports multi-user authentication and subject-level permissions, but since NATS Streaming uses an opaque protocol atop NATS, it’s difficult to set up proper ACLs at the streaming level. Of course, many layered protocols support authentication, e.g. HTTP atop TCP. For example, the NATS Streaming protocol could carry authentication tokens or session keys, but it currently does not do this.

Fifth, NATS Streaming does not support wildcard semantics, which—at least in my opinion—is a large selling point of NATS and, as a result, something users have come to expect. Specifically, NATS supports two wildcards in subject subscriptions: asterisk (*) which matches any token in the subject (e.g. foo.* matches foo.bar, foo.baz, etc.) and full wildcard (>) which matches one or more tokens at the tail of the subject (e.g. foo.> matches foo.bar, foo.bar.baz, etc.). Note that this limitation in NATS Streaming is not directly related to the overall architecture but more in how we design the log.
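For illustration, here’s what those wildcard subscriptions look like with the NATS Go client (a minimal example; error handling elided):

    package main

    import (
        "fmt"
        "time"

        "github.com/nats-io/nats.go"
    )

    func main() {
        nc, err := nats.Connect(nats.DefaultURL)
        if err != nil {
            panic(err)
        }
        defer nc.Close()

        // Matches foo.bar and foo.baz, but not foo.bar.baz.
        nc.Subscribe("foo.*", func(m *nats.Msg) {
            fmt.Printf("foo.* received %s: %s\n", m.Subject, string(m.Data))
        })

        // Matches foo.bar, foo.bar.baz, and so on.
        nc.Subscribe("foo.>", func(m *nats.Msg) {
            fmt.Printf("foo.> received %s: %s\n", m.Subject, string(m.Data))
        })

        nc.Publish("foo.bar", []byte("a"))
        nc.Publish("foo.bar.baz", []byte("b"))
        nc.Flush()
        time.Sleep(100 * time.Millisecond)
    }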

More generally, clustering and data replication was more of an afterthought in NATS Streaming. As we discussed in part four, it’s hard to add this after the fact. Combined with the APIs NATS Streaming provides (which do flow control and track consumer state), this creates a lot of complexity in the server.

A New System

I wasn’t involved much with NATS Streaming beyond the clustering implementation. However, from that work—and through my own use of NATS and from discussions I’ve had with the community—I’ve thought about how I would build something like it if I were to start over. It would look a bit different from NATS Streaming and Kafka, yet also share some similarities. I’ve dubbed this theoretical system Jetstream (update: this is now Liftbridge), though I’ve yet to actually build anything beyond small prototypes. It’s a side project of mine I hope to get to at some point.

Core NATS has a strong community with solid mindshare, but NATS Streaming doesn’t fully leverage this since it’s a new silo. Jetstream aims to address the above problems starting from a simple proposition: many people are already using NATS today and simply want streaming semantics for what they already have. However, we must also acknowledge that other users are happy with NATS as it currently is and have no need for additional features that might compromise simplicity or performance. This was a deciding factor in choosing not to build NATS Streaming’s functionality directly into NATS.

Like NATS Streaming, Jetstream is a separate component which acts as a NATS client. Unlike NATS Streaming, it augments NATS as opposed to implementing a wholly new protocol. More succinctly, Jetstream is a durable stream augmentation for NATS. Next, we’ll talk about how it accomplishes this by sketching out a design.

Cross-Talk

In NATS Streaming, the log is modeled as a channel. Clients create channels implicitly by publishing or subscribing to a topic (called a subject in NATS). A channel might be foo but internally this is translated to a NATS pub/sub subject such as _STAN.pub.foo. Therefore, while NATS Streaming is technically a client of NATS, it uses NATS only to dispatch communication between the client and the server. The log is implemented on top of plain pub/sub messaging.

Jetstream is merely a consumer of NATS. In it, the log is modeled as a stream. Clients create streams explicitly, which are subscriptions to NATS subjects that are sequenced, replicated, and durably stored. Thus, there is no “cross-talk” or internal subjects needed because Jetstream messages are NATS messages. Clients just publish their messages to NATS as usual and, behind the scenes, Jetstream will handle storing them in a log. In some sense, it’s just an audit log of messages flowing through NATS.

With this, we get wildcards for free since streams are bound to NATS subjects. There are some trade-offs to this, however, which we will discuss in a bit.
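To make the model concrete, here’s a sketch of what this might look like from a client’s perspective. The jetstream package, its Connect and CreateStream calls, and the StreamConfig fields are all hypothetical; only the plain NATS publish is a real API. The point is that publishers are untouched, and a stream simply records what flows through a subject.

    package main

    import (
        "context"

        "github.com/nats-io/nats.go"

        jetstream "example.com/jetstream" // hypothetical client library
    )

    func main() {
        // Explicitly create a stream bound to the NATS subject foo.*;
        // wildcards come for free since streams are subscriptions to subjects.
        client, _ := jetstream.Connect("localhost:9292") // hypothetical API
        _ = client.CreateStream(context.Background(), jetstream.StreamConfig{ // hypothetical API
            Subject: "foo.*",
            Name:    "foo-stream",
        })

        // Publishers are unchanged: they publish plain NATS messages as usual,
        // and Jetstream stores them behind the scenes.
        nc, _ := nats.Connect(nats.DefaultURL)
        defer nc.Close()
        nc.Publish("foo.bar", []byte("hello"))
        nc.Flush()
    }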

Performance

Jetstream does not track subscription positions. It is up to consumers to track their own position in a stream or, optionally, store that position in Jetstream (more on this later). This means we treat a stream as a simple log, allowing us to do fast, sequential disk I/O and minimize replication and protocol chatter as well as code complexity.

Consumers connect directly to Jetstream using a pull-based socket API. The log is stored in the manner described in part one. This enables us to do zero-copy reads from a stream and other important optimizations which NATS Streaming is precluded from doing. It also simplifies things around flow control and batching as we discussed in part three.

Scalability

Jetstream is designed to be clustered and horizontally scalable from the start. We make the observation that NATS is already efficient at routing messages, particularly with high consumer fan-out, and provides clustering of the interest graph. Streams provide the unit of storage and scalability in Jetstream.

A stream is a named log attached to a NATS subject. Akin to a partition in Kafka, each stream has a replicationFactor, which controls the number of nodes in the Jetstream cluster that participate in replicating the stream, and each stream has a leader. The leader is responsible for receiving messages from NATS, sequencing them, and performing replication (NATS provides per-publisher message ordering).

Like Kafka’s controller, there is a single metadata leader for a Jetstream cluster which is responsible for processing requests to create or delete streams. If a request is sent to a follower, it’s automatically forwarded to the leader. When a stream is created, the metadata leader selects replicationFactor nodes to participate in the stream (initially, this selection is random but could be made more intelligent, e.g. selecting based on current load) and replicates the stream to all nodes in the cluster. Once this replication completes, the stream has been created and its leader begins processing messages. This means NATS messages are not stored unless there is a stream matching their subject (this is the trade-off to support wildcards, but it also means we don’t waste resources storing messages we might not care about). This can be mitigated by having publishers ensure a stream exists before publishing, e.g. at startup.
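An illustrative sketch of the kind of per-stream metadata the metadata leader might replicate via Raft when a stream is created (the field names are mine, not a real implementation):

    // StreamMetadata is what the metadata leader could track and replicate
    // for each stream (illustrative only).
    type StreamMetadata struct {
        Name              string   // unique within the subject
        Subject           string   // NATS subject the stream is attached to
        ReplicationFactor int      // number of nodes replicating the stream
        Leader            string   // node that sequences and replicates messages
        Replicas          []string // nodes selected to participate in the stream
        ISR               []string // in-sync replica set (see the replication protocol below)
    }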

There can exist multiple streams attached to the same NATS subject or even subjects that are semantically equivalent, e.g. foo.bar and foo.*. Each of these streams will receive a copy of the message as NATS handles this fan-out. However, the stream name is unique within a given subject. For example, creating two streams for the subject foo.bar named foo and bar, respectively, will create two streams which will independently sequence all of the messages on the NATS subject foo.bar, but attempting to create two streams for the same subject both named foo will result in creating just a single stream (creation is idempotent).

With this in mind, we can scale linearly with respect to consumers—covered in part three—by adding more nodes to the Jetstream cluster and creating more streams which will be distributed among the cluster. This has the advantage that we don’t need to worry about partitioning so long as NATS is able to withstand the load (there is also an assumption that we can ensure reasonable balance of stream leaders across the cluster). We’ve basically split out message routing from storage and consumption, which allows us to scale independently.

Additionally, streams can join a named consumer group. This, in effect, partitions a NATS subject among the streams in the group, again covered in part three, allowing us to create competing consumers for load-balancing purposes. This works by using NATS queue subscriptions, so the downside is partitioning is effectively random. The upside is consumer groups don’t affect normal streams.
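Below is a minimal, self-contained illustration of the underlying NATS queue-group behavior consumer groups build on: subscribers in the same queue group split the messages on a subject rather than each receiving a copy.

    package main

    import (
        "fmt"
        "time"

        "github.com/nats-io/nats.go"
    )

    func main() {
        nc, _ := nats.Connect(nats.DefaultURL)
        defer nc.Close()

        // Both subscriptions join the "workers" queue group, so each message
        // on foo.bar is delivered to only one of them, effectively at random.
        nc.QueueSubscribe("foo.bar", "workers", func(m *nats.Msg) {
            fmt.Println("worker 1 got", string(m.Data))
        })
        nc.QueueSubscribe("foo.bar", "workers", func(m *nats.Msg) {
            fmt.Println("worker 2 got", string(m.Data))
        })

        for i := 0; i < 4; i++ {
            nc.Publish("foo.bar", []byte(fmt.Sprintf("message %d", i)))
        }
        nc.Flush()
        time.Sleep(100 * time.Millisecond)
    }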

Compaction and Offset Tracking

Streams support multiple log-compaction rules: time-based, message-based, and size-based. As in Kafka, we also support a fourth kind: key compaction. This is how offset storage will work, which was described in part three, but it also enables some other interesting use cases like KTables in Kafka Streams.

As discussed above, messages in Jetstream are simply NATS messages. There is no special protocol needed for Jetstream to process messages. However, publishers can choose to optionally “enhance” their messages by providing additional metadata and serializing their messages into envelopes. The envelope includes a special cookie Jetstream uses to detect if a message is an envelope or a simple NATS message (if the cookie is present by coincidence and envelope deserialization fails, we fall back to treating it as a normal message).
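Here’s a sketch of how that detection might work, assuming a hypothetical cookie value and a JSON envelope encoding (the actual wire format is left unspecified):

    package envelope

    import (
        "bytes"
        "encoding/json"
    )

    var cookie = []byte("JSTM") // hypothetical magic prefix

    // Envelope carries optional metadata alongside the payload.
    type Envelope struct {
        Key  []byte `json:"key,omitempty"`
        Data []byte `json:"data"`
    }

    // Decode returns the envelope if the message looks like one; otherwise it
    // falls back to treating the raw bytes as a plain NATS message.
    func Decode(msg []byte) Envelope {
        if bytes.HasPrefix(msg, cookie) {
            var e Envelope
            if err := json.Unmarshal(msg[len(cookie):], &e); err == nil {
                return e
            }
            // Cookie was present by coincidence but deserialization failed:
            // treat it as a normal message.
        }
        return Envelope{Data: msg}
    }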

One of the metadata fields on the envelope is an optional message key. A stream can be configured to compact by key. In this case, it retains only the last message for each key (if no key is present, the message is always retained).
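A simplified, in-memory sketch of key compaction (the real implementation would compact log segments on disk): keep every message without a key, and for keyed messages keep only the one with the highest offset.

    package compaction

    // Entry is a simplified in-memory representation of a log entry.
    type Entry struct {
        Offset uint64
        Key    []byte
        Data   []byte
    }

    // CompactByKey retains only the last message for each key; messages
    // without a key are always retained. The result stays in offset order.
    func CompactByKey(log []Entry) []Entry {
        latest := make(map[string]uint64, len(log)) // key -> offset of last message
        for _, e := range log {
            if len(e.Key) > 0 {
                latest[string(e.Key)] = e.Offset
            }
        }
        compacted := make([]Entry, 0, len(log))
        for _, e := range log {
            if len(e.Key) == 0 || latest[string(e.Key)] == e.Offset {
                compacted = append(compacted, e)
            }
        }
        return compacted
    }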

Consumers can optionally store their offsets in Jetstream (this can also be transparently managed by a client library similar to Kafka’s high-level consumer). This works by storing offsets in a stream keyed by consumer. A consumer (or consumer library) publishes its latest offset, which it can later retrieve from the stream, and key compaction means Jetstream will only retain the latest offset for each consumer. For improved performance, the client library should only periodically checkpoint this offset.
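A sketch of how a client library might checkpoint offsets periodically rather than per message; the publishOffset callback, which would write a keyed message to the offsets stream, is hypothetical.

    package consumer

    import "time"

    // checkpointLoop records the consumer's progress and periodically writes
    // the latest offset to the key-compacted offsets stream via publishOffset
    // (a hypothetical callback).
    func checkpointLoop(consumerID string, offsets <-chan uint64, publishOffset func(key string, offset uint64) error) {
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()
        var latest uint64
        for {
            select {
            case o, ok := <-offsets:
                if !ok {
                    return
                }
                latest = o // most recent position the consumer has processed
            case <-ticker.C:
                // Key compaction keeps only the last checkpoint per consumer.
                _ = publishOffset(consumerID, latest)
            }
        }
    }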

Authorization

Because Jetstream is a separate server which is merely a consumer of NATS, it can provide ACLs or other authorization mechanisms on streams. A simple configuration might be to restrict NATS access to Jetstream and configure Jetstream to only allow access to certain subjects. There is more work involved because there is a separate access-control system, but this gives greater flexibility by separating out the systems.

At-Least-Once Delivery

To ensure at-least-once delivery of messages, Jetstream relies on replication and publisher acks. When a message is received on a stream, it’s assigned an offset by the leader and then replicated. Upon a successful replication, the stream publishes an ack to NATS on the reply subject of the message, if present (the reply subject is a part of the NATS message protocol).

There are two implications with this. First, if the publisher doesn’t care about ensuring its message is stored, it need not set a reply subject. Second, because there are potentially multiple (or no) streams attached to a subject (and creation/deletion of streams is dynamic), it’s not possible for the publisher to know how many acks to expect. This is a trade-off we make for enabling subject fan-out and wildcards while remaining scalable and fast. We make the assertion that if guaranteed delivery is important, the publisher should be responsible for determining the destination streams a priori. This allows attaching streams to a subject for use cases that do not require strong guarantees without the publisher having to be aware. Note that this might be an area for future improvement to increase usability, such as storing streams in a registry. However, this is akin to similar systems like Kafka, where you must first create a topic and then publish to it.
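From the publisher’s side, waiting for an ack is just the standard NATS request/reply pattern: Request sets a reply subject and blocks until a response arrives or the timeout fires. A minimal example with the Go client follows (the shape of the ack payload is not specified here):

    package main

    import (
        "fmt"
        "time"

        "github.com/nats-io/nats.go"
    )

    func main() {
        nc, err := nats.Connect(nats.DefaultURL)
        if err != nil {
            panic(err)
        }
        defer nc.Close()

        // If no stream is attached to foo.bar (or the publisher doesn't care),
        // a plain Publish with no reply subject is all that's needed.
        ack, err := nc.Request("foo.bar", []byte("important message"), 2*time.Second)
        if err != nil {
            // No ack within the timeout: the publisher can retry or surface an error.
            fmt.Println("no ack:", err)
            return
        }
        fmt.Println("acked:", string(ack.Data))
    }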

One caveat to this is if there are existing application-level uses of the reply subject on NATS messages. That is, if other systems are already publishing replies, then Jetstream will overload this. The alternative would be to require the envelope, which would include a canonical reply subject for acks, for at-least-once delivery. Otherwise we would need a change to the NATS protocol itself.

Replication Protocol

For metadata replication and leadership election, we rely on Raft. However, for replication of streams, rather than using Raft or other quorum-based techniques, we use a technique similar to Kafka as described in part two.

For each stream, we maintain an in-sync replica set (ISR), which is all of the replicas currently up to date (at stream creation time, this is all of the replicas). During replication, the leader writes messages to a WAL, and we only wait on replicas in the ISR before committing. If a replica falls behind or fails, it’s removed from the ISR. If the leader fails, any replica in the ISR can take its place. If a failed replica catches back up, it rejoins the ISR. The general stream replication process is as follows:

  1. Client creates a stream with a replicationFactor of n.
  2. Metadata leader selects n replicas to participate and one leader at random (this comprises the initial ISR).
  3. Metadata leader replicates the stream via Raft to the entire cluster.
  4. The nodes participating in the stream initialize it, and the leader subscribes to the NATS subject.
  5. The leader initializes the high-water mark (HW) to 0. This is the offset of the last committed message in the stream.
  6. The leader begins sequencing messages from NATS and writes them to the log uncommitted.
  7. Replicas consume from the leader’s log to replicate messages to their own log. We piggyback the leader’s HW on these responses, and replicas periodically checkpoint the HW to stable storage.
  8. Replicas acknowledge they’ve replicated the message.
  9. Once the leader has heard from the ISR, the message is committed and the HW is updated (see the sketch below).
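Here’s a rough, simplified sketch of the leader-side commit path from steps 6 through 9, with in-memory bookkeeping standing in for the WAL and the replication RPCs; offsets start at 1 and the HW at 0, matching the description above.

    package replication

    import "sync"

    // Leader tracks uncommitted offsets and advances the high-water mark as
    // replicas in the ISR acknowledge them (illustrative only).
    type Leader struct {
        mu         sync.Mutex
        nextOffset int64
        hw         int64                     // high-water mark: last committed offset
        isr        map[string]bool           // current in-sync replica set
        acks       map[int64]map[string]bool // offset -> replicas that have acked
    }

    func NewLeader(isr []string) *Leader {
        l := &Leader{
            nextOffset: 1,
            isr:        make(map[string]bool, len(isr)),
            acks:       make(map[int64]map[string]bool),
        }
        for _, r := range isr {
            l.isr[r] = true
        }
        return l
    }

    // Append sequences a message received from NATS. The message itself would
    // be written uncommitted to the WAL at this point.
    func (l *Leader) Append(msg []byte) int64 {
        l.mu.Lock()
        defer l.mu.Unlock()
        offset := l.nextOffset
        l.nextOffset++
        l.acks[offset] = make(map[string]bool)
        return offset
    }

    // Ack records a replica's acknowledgement and advances the HW once every
    // replica in the ISR has the message and all messages before it.
    func (l *Leader) Ack(replica string, offset int64) {
        l.mu.Lock()
        defer l.mu.Unlock()
        pending, ok := l.acks[offset]
        if !ok || !l.isr[replica] {
            return // unknown offset, already committed, or replica not in the ISR
        }
        pending[replica] = true
        for next := l.hw + 1; ; next++ {
            p, exists := l.acks[next]
            if !exists {
                return
            }
            for r := range l.isr {
                if !p[r] {
                    return // still waiting on part of the ISR
                }
            }
            l.hw = next // committed: consumers may now read up to this offset
            delete(l.acks, next)
        }
    }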

Note that clients only see committed messages in the log. There are a variety of failures that can occur in the replication process. A few of them are described below along with how they are mitigated.

If a follower suspects that the leader has failed, it will notify the metadata leader. If the metadata leader receives a notification from the majority of the ISR within a bounded period, it will select a new leader for the stream, apply this update to the Raft group, and notify the replica set. These notifications need to go through Raft as well in the event of a metadata leader failover occurring at the same time as a stream leader failure. Committed messages are always preserved during a leadership change, but uncommitted messages could be lost.

If the stream leader detects that a replica has failed or fallen too far behind, it removes the replica from the ISR by notifying the metadata leader. The metadata leader replicates this fact via Raft. The stream leader continues to commit messages with fewer replicas in the ISR, entering an under-replicated state.

When a failed replica is restarted, it recovers the latest HW from stable storage and truncates its log up to the HW. This removes any potentially uncommitted messages in the log. The replica then begins fetching messages from the leader starting at the HW. Once the replica has caught up, it’s added back into the ISR and the system resumes its fully replicated state.

If the metadata leader fails, Raft will handle electing a new leader. The metadata Raft group stores the leader and ISR for every stream, so failover of the metadata leader is not a problem.

There are a few other corner cases and nuances to handle, but this covers replication in broad strokes. We also haven’t discussed how to implement failure detection (Kafka uses ZooKeeper for this), but we won’t prescribe that here.

Wrapping Up

This concludes our series on building a distributed log that is fast, highly available, and scalable. In part one, we introduced the log abstraction and talked about the storage mechanics behind it. In part two, we covered high availability and data replication. In part three, we discussed scaling message delivery. In part four, we looked at some trade-offs and lessons learned. Lastly, in part five, we outlined the design for a new log-based system that draws from the previous entries in the series.

The goal of this series was to learn a bit about the internals of a log abstraction, to learn how it can achieve the three priorities described earlier, and to learn some applied distributed systems theory. Hopefully you found it useful or, at the very least, interesting.

If you or your company are looking for help with system architecture, performance, or scalability, contact Real Kinetic.

Building a Distributed Log from Scratch, Part 4: Trade-Offs and Lessons Learned

In part three of this series we talked about scaling message delivery in a distributed log. In part four, we’ll look at some key trade-offs involved with such systems and discuss a few lessons learned while building NATS Streaming.

Competing Goals

There are a number of competing goals when building a distributed log (these goals also extend to many other types of systems). Recall from part one that our key priorities for this type of system are performance, high availability, and scalability. The preceding parts of this series described at various levels how we can accomplish these three goals, but astute readers likely noticed that some of these things conflict with one another.

It’s easy to make something fast if it’s not fault-tolerant or scalable. If our log runs on a single server, our only constraints are how fast we can send data over the network and how fast the disk I/O is. And this is how a lot of systems, including many databases, tend to work—not only because it performs well, but because it’s simple. We can make these types of systems fault-tolerant by introducing a standby server and allowing clients to failover, but there are a couple issues worth mentioning with this.

With data systems, such as a log, high availability does not just pertain to continuity of service, but also availability of data. If I write data to the system and the system acknowledges that, that data should not be lost in the event of a failure. So with a standby server, we need to ensure data is replicated to avoid data loss (otherwise, in the context of a message log, we must relax our requirement of guaranteed delivery).

NATS Streaming initially shipped as a single-node system, which raised immediate concerns about production-readiness due to a single point of failure. The first step at trying to address some of these concerns was to introduce a fault-tolerance mode whereby a group of servers would run with only one acting as the active server. The active server would obtain an exclusive lock and process requests. Upon detecting a failure, standby servers would attempt to obtain the lock and become the active server.

Aside from the usual issues with distributed locks, this design requires a shared storage layer. With NATS Streaming, this meant either a shared volume, such as Gluster or EFS, or a shared MySQL database. This poses a performance challenge and isn’t particularly “cloud-native” friendly. Another issue is data is not replicated unless done so out-of-band by the storage layer. When we add in data replication, performance is hamstrung even further. But this was a quick and easy solution that offered some solace with respect to a SPOF (disclosure: I was not involved with NATS or NATS Streaming at this time). The longer term solution was to provide first-class clustering and data-replication support, but sometimes it’s more cost effective to provide fast recovery of a single-node system.

Another challenge with the single-node design is scalability. There is only so much capacity that one node can handle. At a certain point, scaling out becomes a requirement, and so we start partitioning. This is a common technique for relational databases where we basically just run multiple databases and divide up the data by some key. NATS Streaming is no different as it offers a partitioning story for dividing up channels between servers. The trouble with partitioning is it complicates things as it typically requires cooperation from the application. To make matters worse, NATS Streaming does not currently offer partitioning at the channel level, which means if a single topic has a lot of load, the solution is to manually partition it into multiple channels at the application level. This is why Kafka chose to partition its topics by default.

So performance is at odds with fault-tolerance and scalability, but another factor is what I call simplicity of mechanism. That is, the simplicity of the design plays an important role in the performance of a system. This plays out at multiple levels. We saw that, at an architectural level, using a simple, single-node design performs best but falls short as a robust solution. In part one, we saw that using a simple file structure for our log allowed us to take advantage of the hardware and operating system in terms of sequential disk access, page caching, and zero-copy reads. In part two, we made the observation that we can treat the log itself as a replicated WAL to solve the problem of data replication in an efficient way. And in part three, we discussed how a simple pull-based model can reduce complexity around flow control and batching.

At the same time, simplicity of “UX” makes performance harder. When I say UX, I mean the ergonomics of the system and how easy it is to use, operate, etc. NATS Streaming initially optimized for UX, which is why it fills an interesting space. Simplicity is a core part of the NATS philosophy, so it caught a small mindshare with developers frustrated or overwhelmed by Kafka. There is appetite for a “Kafka lite,” something which serves a similar purpose to Kafka but without all the bells and whistles and probably not targeted at large enterprises—a classic Innovator’s Dilemma to be sure.

NATS Streaming tracks consumer positions automatically, provides simple APIs, and uses a simple push-based protocol. This also means building a client library is a much less daunting task. The downside is the server needs to do more work. With a single node, as NATS Streaming was initially designed, this isn’t much of a problem. Where it starts to rear its head is when we need to replicate that state across a cluster of nodes. This has important implications with respect to performance and scale. Smart middleware has a natural tendency to become more complex, more fragile, and slower. The end-to-end principle attests to this. Amusingly, NATS Streaming was originally named STAN because it’s the opposite of NATS, a fast and simple messaging system with minimal guarantees.

Simplicity of mechanism tends to simply push complexity around in the system. For example, NATS Streaming provides an ergonomic API to clients by shifting the complexity to the server. Kafka scales and performs exceptionally well by shifting the complexity to other parts of the system, namely the client and ZooKeeper.

Scalability and fault-tolerance are equally at odds with simplicity for reasons mostly described above. The important point here is that these cannot be an afterthought. As I learned while implementing clustering in NATS Streaming, you can’t cleanly and effectively bolt fault-tolerance onto an existing complex system. One of the laws of Systemantics comes to mind here: “A complex system designed from scratch never works and cannot be patched up to make it work. You have to start over, beginning with a working simple system.” Scalability and fault-tolerance need to be designed from day one.

Lastly, availability is inherently at odds with consistency. This is simply the CAP theorem. Guaranteeing strong consistency requires a quorum when replicating data, which hinders availability and performance. The key here is to minimize what you need to replicate or to relax your requirements.

Lessons Learned

The section above already contains several lessons learned in the process of working on NATS Streaming and implementing clustering, but I’ll capture a few important ones here.

First, distributed systems are complex enough. Simple is usually better—and faster. Again, we go back to the laws of systems here: “A complex system that works is invariably found to have evolved from a simple system that works.”

Second, lean on existing work. A critical part to delivering clustering rapidly was sticking with Raft and an existing Go implementation for leader election and data replication. There was considerable time spent designing a proprietary solution before I joined which still had edge cases not fully thought through. Not only is Raft off the shelf, it’s provably correct (implementation bugs notwithstanding). And following from the first lesson learned, start with a solution that works before worrying about optimization. It’s far easier to make a correct solution fast than it is to make a fast solution correct. Don’t roll your own coordination protocol if you don’t need to (and chances are you don’t need to).

Third, there are probably edge cases for which you haven’t written tests. There are many failure modes, and you can only write so many tests. Formal methods and property-based testing can help a lot here. Similarly, chaos and fault-injection testing such as Kyle Kingsbury’s Jepsen helps too.

Lastly, be honest with your users. Don’t try to be everything to everyone. Instead, be explicit about design decisions, trade-offs, guarantees, defaults, etc. If there’s one takeaway from Kyle’s Jepsen series it’s that many vendors are dishonest in their documentation and marketing. MongoDB became infamous for having unsafe defaults and implementation issues early on, most likely because those defaults made benchmarks look much more impressive.

In part five of this series, we’ll conclude by outlining the design for a new log-based system that draws from ideas in the previous entries in the series.