Take It to the Limit: Considerations for Building Reliable Systems

Complex systems usually operate in failure mode. This is because a complex system typically consists of many discrete pieces, each of which can fail in isolation (or in concert). In a microservice architecture where a given function potentially comprises several independent service calls, high availability hinges on the ability to be partially available. This is a core tenet behind resilience engineering. If a function depends on three services, each with a reliability of 90%, 95%, and 99%, respectively, partial availability could be the difference between 99.995% reliability and 84% reliability (assuming failures are independent). Resilience engineering means designing with failure as the normal.

Anticipating failure is the first step to resilience zen, but the second is embracing it. Telling the client “no” and failing on purpose is better than failing in unpredictable or unexpected ways. Backpressure is another critical resilience engineering pattern. Fundamentally, it’s about enforcing limits. This comes in the form of queue lengths, bandwidth throttling, traffic shaping, message rate limits, max payload sizes, etc. Prescribing these restrictions makes the limits explicit when they would otherwise be implicit (eventually your server will exhaust its memory, but since the limit is implicit, it’s unclear exactly when or what the consequences might be). Relying on unbounded queues and other implicit limits is like someone saying they know when to stop drinking because they eventually pass out.

Rate limiting is important not just to prevent bad actors from DoSing your system, but also yourself. Queue limits and message size limits are especially interesting because they seem to confuse and frustrate developers who haven’t fully internalized the motivation behind them. But really, these are just another form of rate limiting or, more generally, backpressure. Let’s look at max message size as a case study.

Imagine we have a system of distributed actors. An actor can send messages to other actors who, in turn, process the messages and may choose to send messages themselves. Now, as any good software engineer knows, the eighth fallacy of distributed computing is “the network is homogenous.” This means not all actors are using the same hardware, software, or network configuration. We have servers with 128GB RAM running Ubuntu, laptops with 16GB RAM running macOS, mobile clients with 2GB RAM running Android, IoT edge devices with 512MB RAM, and everything in between, all running a hodgepodge of software and network interfaces.

When we choose not to put an upper bound on message sizes, we are making an implicit assumption (recall the discussion on implicit/explicit limits from earlier). Put another way, you and everyone you interact with (likely unknowingly) enters an unspoken contract of which neither party can opt out. This is because any actor may send a message of arbitrary size. This means any downstream consumers of this message, either directly or indirectly, must also support arbitrarily large messages.

How can we test something that is arbitrary? We can’t. We have two options: either we make the limit explicit or we keep this implicit, arbitrarily binding contract. The former allows us to define our operating boundaries and gives us something to test. The latter requires us to test at some undefined production-level scale. The second option is literally gambling reliability for convenience. The limit is still there, it’s just hidden. When we don’t make it explicit, we make it easy to DoS ourselves in production. Limits become even more important when dealing with cloud infrastructure due to their multitenant nature. They prevent a bad actor (or yourself) from bringing down services or dominating infrastructure and system resources.

In our heterogeneous actor system, we have messages bound for mobile devices and web browsers, which are often single-threaded or memory-constrained consumers. Without an explicit limit on message size, a client could easily doom itself by requesting too much data or simply receiving data outside of its control—this is why the contract is unspoken but binding.

Let’s look at this from a different kind of engineering perspective. Consider another type of system: the US National Highway System. The US Department of Transportation uses the Federal Bridge Gross Weight Formula as a means to prevent heavy vehicles from damaging roads and bridges. It’s really the same engineering problem, just a different discipline and a different type of infrastructure.

The August 2007 collapse of the Interstate 35W Mississippi River bridge in Minneapolis brought renewed attention to the issue of truck weights and their relation to bridge stress. In November 2008, the National Transportation Safety Board determined there had been several reasons for the bridge’s collapse, including (but not limited to): faulty gusset plates, inadequate inspections, and the extra weight of heavy construction equipment combined with the weight of rush hour traffic.

The DOT relies on weigh stations to ensure trucks comply with federal weight regulations, fining those that exceed restrictions without an overweight permit.

The federal maximum weight is set at 80,000 pounds. Trucks exceeding the federal weight limit can still operate on the country’s highways with an overweight permit, but such permits are only issued before the scheduled trip and expire at the end of the trip. Overweight permits are only issued for loads that cannot be broken down to smaller shipments that fall below the federal weight limit, and if there is no other alternative to moving the cargo by truck.

Weight limits need to be enforced so civil engineers have a defined operating range for the roads, bridges, and other infrastructure they build. Computers are no different. This is the reason many systems enforce these types of limits. For example, Amazon clearly publishes the limits for its Simple Queue Service—the max in-flight messages for standard queues is 120,000 messages and 20,000 messages for FIFO queues. Messages are limited to 256KB in size. Amazon KinesisApache KafkaNATS, and Google App Engine pull queues all limit messages to 1MB in size. These limits allow the system designers to optimize their infrastructure and ameliorate some of the risks of multitenancy—not to mention it makes capacity planning much easier.

Unbounded anything—whether its queues, message sizes, queries, or traffic—is a resilience engineering anti-pattern. Without explicit limits, things fail in unexpected and unpredictable ways. Remember, the limits exist, they’re just hidden. By making them explicit, we restrict the failure domain giving us more predictability, longer mean time between failures, and shorter mean time to recovery at the cost of more upfront work or slightly more complexity.

It’s better to be explicit and handle these limits upfront than to punt on the problem and allow systems to fail in unexpected ways. The latter might seem like less work at first but will lead to more problems long term. By requiring developers to deal with these limitations directly, they will think through their APIs and business logic more thoroughly and design better interactions with respect to stability, scalability, and performance.

Iris Decentralized Cloud Messaging

A couple weeks ago, I published a rather extensive analysis of numerous message queues, both brokered and brokerless. Brokerless messaging is really just another name for peer-to-peer communication. As we saw, the difference in message latency and throughput between peer-to-peer systems and brokered ones is several orders of magnitude. ZeroMQ and nanomsg are able to reliably transmit millions of messages per second at the expense of guaranteed delivery.

Peer-to-peer messaging is decentralized, scalable, and fast, but it brings with it an inherent complexity. There is a dichotomy between how brokerless messaging is conceptualized and how distributed systems are actually built. Distributed systems are composed of services like applications, databases, caches, etc. Services are composed of instances or nodes—individually addressable hosts, either physical or virtual. The key observation is that, conceptually, the unit of interaction lies at the service level, not the instance level. We don’t care about which database server we interact with, we just want to talk to database server (or perhaps multiple). We’re concerned with logical groups of nodes.

While traditional socket-queuing systems like ZeroMQ solve the problem of scaling, they bring about a certain coupling between components. System designers are forced to build applications which communicate with nodes, not services. We can introduce load balancers like HAProxy, but we’re still addressing specific locations while creating potential single points of failure. With lightweight VMs and the pervasiveness of elastic clouds, IP addresses are becoming less and less static—they come and go. The canonical way of dealing with this problem is to use distributed coordination and service discovery via ZooKeeper, et al., but this introduces more configuration, more moving parts, and more headaches.

The reality is that distributed systems are not built with the instance as the smallest unit of composition in mind, they’re built with services in mind. As discussed earlier, a service is simply a logical grouping of nodes. This abstraction is what we attempt to mimic with things like etcd, ZooKeeper and HAProxy. These assemblies are proven, but there are alternative solutions that offer zero configuration, minimal network management, and overall less complexity. One such solution that I want to explore is a distributed messaging framework called Iris.

Decentralized Messaging with Iris

Iris is posited as a decentralized approach to backend messaging middleware. It looks to address several of the fundamental issues with traditional brokerless systems, like tight coupling and security.

In order to avoid the problem of addressing instances, Iris considers clusters to be the smallest logical blocks of which systems are composed. A cluster is a collection of zero or more nodes which are responsible for a certain service sub-task. Clusters are then assembled into services such that they can communicate with each other without any regard as to which instance is servicing their requests or where it’s located. Lastly, services are composed into federations, which allow them to communicate across different clouds.

instances_vs_clusters

This form of composition allows Iris to use semantic or logical addressing instead of the standard physical addressing. Nodes specify the name of the cluster they wish to participate in, while Iris handles the intricacies of routing and balancing. For example, you might have three database servers which belong to a single cluster called “databases.” The cluster is reached by its name and requests are distributed across the three nodes. Iris also takes care of service discovery, detecting new clusters as they are created on the same cloud.

physical_vs_semantic

With libraries like ZeroMQ, security tends to be an afterthought. Iris has been built from the ground-up with security in mind, and it provides a security model that is simple and fast.

Iris uses a relaxed security model that provides perfect secrecy whilst at the same time requiring effectively zero configuration. This is achieved through the observation that if a node of a service is compromised, the whole system is considered undermined. Hence, the unit of security is a service – opposed to individual instances – where any successfully authenticated node is trusted by all. This enables full data protection whilst maintaining the loosely coupled nature of the system.

In practice, what this means is that each cluster uses a single private key. This encryption scheme not only makes deployment trivial, it minimizes the effect security has on speed.

authentication_and_encryption

Like ZeroMQ and nanomsg, Iris offers a few different messaging patterns. It provides the standard request-reply and publish-subscribe schemes, but it’s important to remember that the smallest addressable unit is the cluster, not the node. As such, requests are targeted at a cluster and subsequently relayed on to a member in a load-balanced fashion. Publish-subscribe, on the other hand, is not targeted at a single cluster. It allows members of any cluster to subscribe and publish to a topic.

Iris also implements two patterns called “broadcast” and “tunnel.” While request-reply forwards a message to one member of a cluster, broadcast forwards it to all members. The caveat is that there is no way to listen for responses to a broadcast.

Tunnel is designed to address the problem of stateful or streaming transactions where a communication between two endpoints may consist of multiple data exchanges which need to occur as an atomic operation. It provides the guarantee of in-order and throttled message delivery by establishing a channel between a client and a node.

schemes

Performance Characteristics

According to its author, Iris is still in a “feature phase” and hasn’t been optimized for speed. Since it’s written in Go, I’ve compared its pub/sub benchmark performance with other Go messaging libraries, NATS and NSQ. As before, these benchmarks shouldn’t be taken as gospel, the code is available here, and pull requests are welcome.

We can see that Iris is comparable to NSQ on the sending side and about 4x on the receiving side, at least out of the box.

Conclusion

Brokerless systems like ZeroMQ and nanomsg offer considerably higher throughput and less latency than classical message-oriented middleware but require greater orchestration of network topologies. They offer high scalability but can lead to tighter coupling between components. Traditional brokered message queues, like those of the AMQP variety, tend to be slower while providing more guarantees and reduced coupling. However, they are also more prone to scale problems like availability and partitioning.

In terms of its qualities, Iris appears to be a reasonable compromise between the decentralized nature of the brokerless systems and the minimal-configuration and management of the brokered ones. Its intrinsic value lies in its ability to hide the complexities of the underlying infrastructure behind distributed systems. Rather, Iris lends itself to building large-scale systems the way we conceptualize and reason about them—by using services as the building blocks, not instances.

A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)

Earlier this month, I explored ZeroMQ and how it proves to be a promising solution for building fast, high-throughput, and scalable distributed systems. Despite lending itself quite well to these types of problems, ZeroMQ is not without its flaws. Its creators have attempted to rectify many of these shortcomings through spiritual successors Crossroads I/O and nanomsg.

The now-defunct Crossroads I/O is a proper fork of ZeroMQ with the true intention being to build a viable commercial ecosystem around it. Nanomsg, however, is a reimagining of ZeroMQ—a complete rewrite in C1. It builds upon ZeroMQ’s rock-solid performance characteristics while providing several vital improvements, both internal and external. It also attempts to address many of the strange behaviors that ZeroMQ can often exhibit. Today, I’ll take a look at what differentiates nanomsg from its predecessor and implement a use case for it in the form of service discovery.

Nanomsg vs. ZeroMQ

A common gripe people have with ZeroMQ is that it doesn’t provide an API for new transport protocols, which essentially limits you to TCP, PGM, IPC, and ITC. Nanomsg addresses this problem by providing a pluggable interface for transports and messaging protocols. This means support for new transports (e.g. WebSockets) and new messaging patterns beyond the standard set of PUB/SUB, REQ/REP, etc.

Nanomsg is also fully POSIX-compliant, giving it a cleaner API and better compatibility. No longer are sockets represented as void pointers and tied to a context—simply initialize a new socket and begin using it in one step. With ZeroMQ, the context internally acts as a storage mechanism for global state and, to the user, as a pool of I/O threads. This concept has been completely removed from nanomsg.

In addition to POSIX compliance, nanomsg is hoping to be interoperable at the API and protocol levels, which would allow it to be a drop-in replacement for, or otherwise interoperate with, ZeroMQ and other libraries which implement ZMTP/1.0 and ZMTP/2.0. It has yet to reach full parity, however.

ZeroMQ has a fundamental flaw in its architecture. Its sockets are not thread-safe. In and of itself, this is not problematic and, in fact, is beneficial in some cases. By isolating each object in its own thread, the need for semaphores and mutexes is removed. Threads don’t touch each other and, instead, concurrency is achieved with message passing. This pattern works well for objects managed by worker threads but breaks down when objects are managed in user threads. If the thread is executing another task, the object is blocked. Nanomsg does away with the one-to-one relationship between objects and threads. Rather than relying on message passing, interactions are modeled as sets of state machines. Consequently, nanomsg sockets are thread-safe.

Nanomsg has a number of other internal optimizations aimed at improving memory and CPU efficiency. ZeroMQ uses a simple trie structure to store and match PUB/SUB subscriptions, which performs nicely for sub-10,000 subscriptions but quickly becomes unreasonable for anything beyond that number. Nanomsg uses a space-optimized trie called a radix tree to store subscriptions. Unlike its predecessor, the library also offers a true zero-copy API which greatly improves performance by allowing memory to be copied from machine to machine while completely bypassing the CPU.

ZeroMQ implements load balancing using a round-robin algorithm. While it provides equal distribution of work, it has its limitations. Suppose you have two datacenters, one in New York and one in London, and each site hosts instances of “foo” services. Ideally, a request made for foo from New York shouldn’t get routed to the London datacenter and vice versa. With ZeroMQ’s round-robin balancing, this is entirely possible unfortunately. One of the new user-facing features that nanomsg offers is priority routing for outbound traffic. We avoid this latency problem by assigning priority one to foo services hosted in New York for applications also hosted there. Priority two is then assigned to foo services hosted in London, giving us a failover in the event that foos in New York are unavailable.

Additionally, nanomsg offers a command-line tool for interfacing with the system called nanocat. This tool lets you send and receive data via nanomsg sockets, which is useful for debugging and health checks.

Scalability Protocols

Perhaps most interesting is nanomsg’s philosophical departure from ZeroMQ. Instead of acting as a generic networking library, nanomsg intends to provide the “Lego bricks” for building scalable and performant distributed systems by implementing what it refers to as “scalability protocols.” These scalability protocols are communication patterns which are an abstraction on top of the network stack’s transport layer. The protocols are fully separated from each other such that each can embody a well-defined distributed algorithm. The intention, as stated by nanomsg’s author Martin Sustrik, is to have the protocol specifications standardized through the IETF.

Nanomsg currently defines six different scalability protocols: PAIR, REQREP, PIPELINE, BUS, PUBSUB, and SURVEY.

PAIR (Bidirectional Communication)

PAIR implements simple one-to-one, bidirectional communication between two endpoints. Two nodes can send messages back and forth to each other.

pair

REQREP (Client Requests, Server Replies)

The REQREP protocol defines a pattern for building stateless services to process user requests. A client sends a request, the server receives the request, does some processing, and returns a response.

reqrep

PIPELINE (One-Way Dataflow)

PIPELINE provides unidirectional dataflow which is useful for creating load-balanced processing pipelines. A producer node submits work that is distributed among consumer nodes.

pipeline

BUS (Many-to-Many Communication)

BUS allows messages sent from each peer to be delivered to every other peer in the group.

bus

PUBSUB (Topic Broadcasting)

PUBSUB allows publishers to multicast messages to zero or more subscribers. Subscribers, which can connect to multiple publishers, can subscribe to specific topics, allowing them to receive only messages that are relevant to them.

pubsub

SURVEY (Ask Group a Question)

The last scalability protocol, and the one in which I will further examine by implementing a use case with, is SURVEY. The SURVEY pattern is similar to PUBSUB in that a message from one node is broadcasted to the entire group, but where it differs is that each node in the group responds to the message. This opens up a wide variety of applications because it allows you to quickly and easily query the state of a large number of systems in one go. The survey respondents must respond within a time window configured by the surveyor.

survey

Implementing Service Discovery

As I pointed out, the SURVEY protocol has a lot of interesting applications. For example:

  • What data do you have for this record?
  • What price will you offer for this item?
  • Who can handle this request?

To continue exploring it, I will implement a basic service-discovery pattern. Service discovery is a pretty simple question that’s well-suited for SURVEY: what services are out there? Our solution will work by periodically submitting the question. As services spin up, they will connect with our service discovery system so they can identify themselves. We can tweak parameters like how often we survey the group to ensure we have an accurate list of services and how long services have to respond.

This is great because 1) the discovery system doesn’t need to be aware of what services there are—it just blindly submits the survey—and 2) when a service spins up, it will be discovered and if it dies, it will be “undiscovered.”

Here is the ServiceDiscovery class:

The discover method submits the survey and then collects the responses. Notice we construct a SURVEYOR socket and set the SURVEYOR_DEADLINE option on it. This deadline is the number of milliseconds from when a survey is submitted to when a response must be received—adjust it accordingly based on your network topology. Once the survey deadline has been reached, a NanoMsgAPIError is raised and we break the loop. The resolve method will take the name of a service and randomly select an available provider from our discovered services.

We can then wrap ServiceDiscovery with a daemon that will periodically run discover.

The discovery parameters are configured through environment variables which I inject into a Docker container.

Services must connect to the discovery system when they start up. When they receive a survey, they should respond by identifying what service they provide and where the service is located. One such service might look like the following:

Once again, we configure parameters through environment variables set on a container. Note that we connect to the discovery system with a RESPONDENT socket which then responds to service queries with the service name and address. The service itself uses a REP socket that simply responds to any requests with “The answer is 42,” but it could take any number of forms such as HTTP, raw socket, etc.

The full code for this example, including Dockerfiles, can be found on GitHub.

Nanomsg or ZeroMQ?

Based on all the improvements that nanomsg makes on top of ZeroMQ, you might be wondering why you would use the latter at all. Nanomsg is still relatively young. Although it has numerous language bindings, it hasn’t reached the maturity of ZeroMQ which has a thriving development community. ZeroMQ has extensive documentation and other resources to help developers make use of the library, while nanomsg has very little. Doing a quick Google search will give you an idea of the difference (about 500,000 results for ZeroMQ to nanomsg’s 13,500).

That said, nanomsg’s improvements and, in particular, its scalability protocols make it very appealing. A lot of the strange behaviors that ZeroMQ exposes have been resolved completely or at least mitigated. It’s actively being developed and is quickly gaining more and more traction. Technically, nanomsg has been in beta since March, but it’s starting to look production-ready if it’s not there already.

  1. The author explains why he should have originally written ZeroMQ in C instead of C++. []

Distributed Messaging with ZeroMQ

“A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable.” -Leslie Lamport

With the increased prevalence and accessibility of cloud computing, distributed systems architecture has largely supplanted more monolithic constructs. The implication of using a service-oriented architecture, of course, is that you now have to deal with a myriad of difficulties that previously never existed, such as fault tolerance, availability, and horizontal scaling. Another interesting layer of complexity is providing consistency across nodes, which itself is a problem surrounded with endless research. Algorithms like Paxos and Raft attempt to provide solutions for managing replicated data, while other solutions offer eventual consistency.

Building scalable, distributed systems is not a trivial feat, but it pales in comparison to building real-time systems of a similar nature. Distributed architecture is a well-understood problem and the fact is, most applications have a high tolerance for latency. Few systems have a demonstrable need for real-time communication, but the few that do present an interesting challenge for developers. In this article, I explore the use of ZeroMQ to approach the problem of distributed, real-time messaging in a scalable manner while also considering the notion of eventual consistency.

The Intelligent Transport Layer

ZeroMQ is a high-performance asynchronous messaging library written in C++. It’s not a dedicated message broker but rather an embeddable concurrency framework with support for direct and fan-out endpoint connections over a variety of transports. ZeroMQ implements a number of different communication patterns like request-reply, pub-sub, and push-pull through TCP, PGM (multicast), in-process, and inter-process channels. The glaring lack of UDP support is, more or less, by design because ZeroMQ was conceived to provide guaranteed-ish delivery of atomic messages. The library makes no actual guarantee of delivery, but it does make a best effort. What ZeroMQ does guarantee, however, is that you will never receive a partial message, and messages will be received in order. This is important because UDP’s performance gains really only manifest themselves in lossy or congested environments.

The comprehensive list of messaging patterns and transports alone make ZeroMQ an appealing choice for building distributed applications, but it particularly excels due to its reliability, scalability and high throughput. ZeroMQ and related technologies are popular within high-frequency trading, where packet loss of financial data is often unacceptable1. In 2011, CERN actually performed a study comparing CORBA, Ice, Thrift, ZeroMQ, and several other protocols for use in its particle accelerators and ranked ZeroMQ the highest.

cern

ZeroMQ uses some tricks that allow it to actually outperform TCP sockets in terms of throughput such as intelligent message batching, minimizing network-stack traversals, and disabling Nagle’s algorithm. By default (and when possible), messages are queued on the subscriber, which attempts to avoid the problem of slow subscribers. However, when this isn’t sufficient, ZeroMQ employs a pattern called the “Suicidal Snail.” When a subscriber is running slow and is unable to keep up with incoming messages, ZeroMQ convinces the subscriber to kill itself. “Slow” is determined by a configurable high-water mark. The idea here is that it’s better to fail fast and allow the issue to be resolved quickly than to potentially allow stale data to flow downstream. Again, think about the high-frequency trading use case.

A Distributed, Scalable, and Fast Messaging Architecture

ZeroMQ makes a convincing case for use as a transport layer. Let’s explore a little deeper to see how it could be used to build a messaging framework for use in a real-time system. ZeroMQ is fairly intuitive to use and offers a plethora of bindings for various languages, so we’ll focus more on the architecture and messaging paradigms than the actual code.

About a year ago, while I first started investigating ZeroMQ, I built a framework to perform real-time messaging and document syncing called Zinc. A “document,” in this sense, is any well-structured and mutable piece of data—think text document, spreadsheet, canvas, etc. While purely academic, the goal was to provide developers with a framework for building rich, collaborative experiences in a distributed manner.

The framework actually had two implementations, one backed by the native ZeroMQ, and one backed by the pure Java implementation, JeroMQ2. It was really designed to allow any transport layer to be used though.

Zinc is structured around just a few core concepts: Endpoints, ChannelListeners, MessageHandlers, and Messages. An Endpoint represents a single node in an application cluster and provides functionality for sending and receiving messages to and from other Endpoints. It has outbound and inbound channels for transmitting messages to peers and receiving them, respectively.

endpoint

ChannelListeners essentially act as daemons listening for incoming messages when the inbound channel is open on an Endpoint. When a message is received, it’s passed to a thread pool to be processed by a MessageHandler. Therefore, Messages are processed asynchronously in the order they are received, and as mentioned earlier, ZeroMQ guarantees in-order message delivery. As an aside, this is before I began learning Go, which would make for an ideal replacement for Java here as it’s quite well-suited to the problem :)

Messages are simply the data being exchanged between Endpoints, from which we can build upon with Documents and DocumentFragments. A Document is the structured data defined by an application, while DocumentFragment represents a partial Document, or delta, which can be as fine- or coarse- grained as needed.

Zinc is built around the publish-subscribe and push-pull messaging patterns. One Endpoint will act as the host of a cluster, while the others act as clients. With this architecture, the host acts as a publisher and the clients as subscribers. Thus, when a host fires off a Message, it’s delivered to every subscribing client in a multicast-like fashion. Conversely, clients also act as “push” Endpoints with the host being a “pull” Endpoint. Clients can then push Messages into the host’s Message queue from which the host is pulling from in a first-in-first-out manner.

This architecture allows Messages to be propagated across the entire cluster—a client makes a change which is sent to the host, who propagates this delta to all clients. This means that the client who initiated the change will receive an “echo” delta, but it will be discarded by checking the Message origin, a UUID which uniquely identifies an Endpoint. Clients are then responsible for preserving data consistency if necessary, perhaps through operational transformation or by maintaining a single source of truth from which clients can reconcile.

cluster

One of the advantages of this architecture is that it scales reasonably well due to its composability. Specifically, we can construct our cluster as a tree of clients with arbitrary breadth and depth. Obviously, the more we scale horizontally or vertically, the more latency we introduce between edge nodes. Coupled with eventual consistency, this can cause problems for some applications but might be acceptable to others.

scalability

The downside is this inherently introduces a single point of failure characterized by the client-server model. One solution might be to promote another node when the host fails and balance the tree.

Once again, this framework was mostly academic and acted as a way for me to test-drive ZeroMQ, although there are some other interesting applications of it. Since the framework supports multicast message delivery via push-pull or publish-subscribe mechanisms, one such use case is autonomous load balancing.

Paired with something like ZooKeeper, etcd, or some other service-discovery protocol, clients would be capable of discovering hosts, who act as load balancers. Once a client has discovered a host, it can request to become a part of that host’s cluster. If the host accepts the request, the client can begin to send messages to the host (and, as a result, to the rest of the cluster) and, likewise, receive messages from the host (and the rest of the cluster). This enables clients and hosts to submit work to the cluster such that it’s processed in an evenly distributed way, and workers can determine whether to pass work on further down the tree or process it themselves. Clients can choose to participate in load-balancing clusters at their own will and when they become available, making them mostly autonomous. Clients could then be quickly spun-up and spun-down using, for example, Docker containers.

ZeroMQ is great for achieving reliable, fast, and scalable distributed messaging, but it’s equally useful for performing parallel computation on a single machine or several locally networked ones by facilitating in- and inter- process communication using the same patterns. It also scales in the sense that it can effortlessly leverage multiple cores on each machine. ZeroMQ is not a replacement for a message broker, but it can work in unison with traditional message-oriented middleware. Combined with Protocol Buffers and other serialization methods, ZeroMQ makes it easy to build extremely high-throughput messaging frameworks.

  1. ZeroMQ’s founder, iMatix, was responsible for moving JPMorgan Chase and the Dow Jones Industrial Average trading platforms to OpenAMQ []
  2. In systems where near real-time is sufficient, JeroMQ is adequate and benefits by not requiring any native linking. []

Modularizing Infinitum: A Postmortem

In addition to getting the code migrated from Google Code to GitHub, one of my projects over the holidays was to modularize the Infinitum Android framework I’ve been working on for the past year.

Infinitum began as a SQLite ORM and quickly grew to include a REST ORM implementation,  REST client, logging wrapper, DI framework, AOP module, and, of course, all of the framework tools needed to support these various functionalities. It evolved as I added more and more features in a semi-haphazard way. In my defense, the code was organized. It was logical. It made sense. There was no method, but there also was no madness. Everything was in an appropriately named package. Everything was coded to an interface. There was no duplicated code. However, modularity — in terms of minimizing framework dependencies — wasn’t really in mind at the time, and the code was all in a single project.

The Wild, Wild West

The issue wasn’t how the code was organized, it was how the code was integrated. The project was cowboy coding at its finest. I was the only stakeholder, the only tester, the only developer — judge, jury, and executioner. I was building it for my own personal use after all. Consequently, there was no planning involved, unit testing was somewhere between minimal and non-existent, and what got done was at my complete discretion. Ultimately, what was completed any given day, more or less, came down to what I felt like working on.

What started as an ORM framework became a REST framework, which became a logging framework, which became an IOC framework, which became an AOP framework. All of these features, built from the ground up, were tied together through a context, which provided framework configuration data. More important, the Infinitum context stored the bean factory used for storing and retrieving bean definitions used by both the framework and the client. The different modules themselves were not tightly coupled, but they were connected to the context like feathers on a bird.

infinitum-arch

The framework began to grow large. It was only about 300KB of actual code (JARed without ProGuard compression), but it had a number of library dependencies, namely Dexmaker, Simple XML, and GSON, which is over 1MB combined in size. Since it’s an Android framework, I wanted to keep the footprint as small as possible. Additionally, it’s likely that someone wouldn’t be using all of the features in the framework. Maybe they just need the SQLite ORM, or just the REST client, or just dependency injection. The way the framework was structured, they had to take it all or none.

A Painter Looking for a Brush

I began to investigate ways to modularize it. As I illustrated, the central problem lay in the fact that the Infinitum context had knowledge of all of the different modules and was responsible for calling and configuring their APIs. If the ORM is an optional dependency, the context should not need to have knowledge of it. How can the modules be decoupled from the context?

Obviously, there is a core dependency, Infinitum Core, which consists of the framework essentials. These are things used throughout the framework in all of the modules — logging, DI1, exceptions, and miscellaneous utilities. The goal was to pull off ORM, REST, and AOP modules.

My initial approach was to try and use the decorator pattern to “decorate” the Infinitum context with additional functionality. The OrmContextDecorator would implement the ORM-specific methods, the AopContextDecorator would implement the AOP-specific methods, and so on. The problem with this was that it would still require the module-specific methods to be declared in the Infinitum context interface. Not only would they need to be stubbed out in the context implementation, a lot of module interfaces would need to be shuffled and placed in Infinitum Core  in order to satisfy the compiler. The problem remained; the context still had knowledge of all the modules.

I had another idea in mind. Maybe I could turn the Infinitum context from a single point of configuration to a hierarchical structure where each module has its own context as a “child” of the root context. The OrmContext interface could extend the InfinitumContext interface, providing ORM-specific functionality while still inheriting the core context methods. The implementation would then contain a reference to the parent context, so if it was unable to perform a certain piece of functionality, it could delegate to the parent. This could work. The Infinitum context has no notion of module X, Y, or Z, and, in effect, the control has been inverted. You could call it the Hollywood Principle — “Don’t call us, we’ll call you.”

infinitum-context-hierarchy

There’s still one remaining question: how do we identify the “child” contexts and subsequently initialize them? The solution is to maintain a module registry. This registry will keep track of the optional framework dependencies and is responsible for initializing them if they are available. We use a marker class from each module, a class we know exists if the dependency is included in the classpath, to check its availability.

Lastly, we use reflection to instantiate an instance of the module context. I used an enum to maintain a registry of Infinitum modules. I then extended the enum to add an initialize method which loads a context instance.

The modules get picked up during a post-processing step in the ContextFactory. It’s this step that also adds them as child contexts to the parent.

New modules can be added to the registry without any changes elsewhere. As long as the context has been implemented, they will be picked up and processed automatically.

Once this architecture was in place, separating the framework into different projects was simple. Now Infinitum Core can be used by itself if only dependency injection is needed, the ORM can be included if needed for SQLite, AOP included for aspect-oriented programming, and Web for the RESTful web service client and various HTTP utilities.

We Shape Our Buildings, and Afterwards, Our Buildings Shape Us

I think this solution has helped to minimize some of the complexity a bit. As with any modular design, not only is it more extensible, it’s more maintainable. Each module context is responsible for its own configuration, so this certainly helped to reduce complexity in the InfinitumContext implementation as before it was handling the initialization for the ORM, AOP, and REST pieces. It also worked out in that I made the switch to GitHub2 by setting up four discrete repositories, one for each module.

In retrospect, I would have made things a lot easier on myself if I had taken a more modular approach from the beginning. I ended up having to reengineer quite a bit, although once I had a viable solution, it actually wasn’t all that much work. I was fortunate in that I had things fairly well designed (perhaps not at a very high level, but in general) and extremely organized. It’s difficult to anticipate change, but chances are you’ll be kicking yourself if you don’t. I started the framework almost a year ago, and I never imagined it would grow to what it is today.

  1. I was originally hoping to pull out dependency injection as a separate module, but the framework relies heavily on it to wire up components. []
  2. Now that the code’s pushed to GitHub, I begin the laborious task of migrating the documentation over from Google Code. []

The Importance of Being Idle

“Practice not-doing and everything will fall into place.”

It’s good to be lazy. Sometimes, in programming, it can also be hard to be lazy. It’s this paradox that I will explore today — The Art of Being Lazy. Specifically, I’m going to dive into a design pattern known as lazy loading by discussing why it’s used, the different flavors it comes in, and how it can be implemented.

Lazy loading is a pretty simple concept: don’t load something until you really need it. However, the philosophy can be generalized further: don’t do something until you need to do it. It’s this line of thinking that has helped lead to processes like Kanban and lean software development (and also probably got you through high school). Notwithstanding, this tenet goes beyond the organizational level. It’s about optimizing efficiency and minimizing waste. There’s a lot to be said about optimizing efficiency in a computer program, which is why The Art of Being Lazy is an exceedingly relevant principle.

They Don’t Teach You This in School

My first real job as a programmer was working as a contractor for Thomson Reuters.  I started as a .NET developer (having no practical experience with it whatsoever) working on a web application that primarily consisted of C# and ASP.NET. The project was an internal configuration management database, which is basically just a big database containing information pertaining to all of the components of an information system (in this case, Thomson’s West Tech network, the infrastructure behind their legal technology division).

This CMDB was geared towards providing application-impact awareness, which, more or less, meant that operations and maintenance teams could go in and see what applications or platforms would be affected by a server going down (hopefully for scheduled maintenance and not a datacenter outage), which business units were responsible for said applications, and who the contacts were for those groups. It also provided various other pieces of information pertaining to these systems, but what I’m getting at is that we were dealing with a lot of data, and this data was all interconnected. We had a very complex domain model with a lot of different relationships. What applications are running on what app servers? Which database servers do they depend on? What NAS servers have what NAS volumes mounted on them? The list goes on.

Our object graph was immense. You can imagine the scale of infrastructure a company like Thomson Reuters has. The crux of the problem was that we were persisting all of this data as well as the relationships between it, and we wanted to allow users of this software to navigate this vast hierarchy of information. Naturally, we used an ORM to help manage this complexity. Since we were working in .NET, and many of us were Java developers, we went with NHibernate.

We wanted to be able to load, say, an application server, and see all of the entities associated with it. To the uninitiated (which, at the time, would have included myself), this might seem like a daunting task. Loading any given entity would result in loading hundreds, if not thousands, of related entities because it would load those directly related, then those related to the immediate neighbors, continuing on in what seems like a never-ending cascade. Not only would it take forever, but we’d quickly run out of memory! There’s simply no way you can deal with an object graph of that magnitude and reasonably perform any kind of business logic on it. Moreover, it’s certainly not scalable, so obviously this would be a very naive thing to do. The good news is that, unsurprisingly,  it’s something that’s not necessary to do.

It’s Good to be Lazy

The solution, of course, as I’ve already hit you across the face with, is a design pattern known as lazy loading. The idea is to defer initialization of an object until it’s truly needed (i.e. accessed). Going back to my anecdote, when we load, for example, an application server entity, rather than eagerly loading all its associated entities, such as servers, applications, BIG-IPs, etc., we use placeholders. Those related entities are then loaded on-the-fly when they are accessed.

Lazy loading can be implemented in a few different ways, through lazy initialization, ghost objects, value holders, and dynamic proxies — each has its own trade-offs. I’ll talk about all of them, but I’m going to primarily focus on using proxies since it’s probably the most widely-used approach, especially within the ORM arena.

Lazy initialization probably best illustrates the concept of lazy loading. With lazy initialization, the object to be lazily loaded is represented by a special marker value (typically null) which indicates that the object has yet to be loaded. Every call to the object will first check to see if it has been loaded/initialized, and if it hasn’t, it gets loaded/initialized. Thus, the first call to the object will load it, while subsequent calls will not need to. The code below shows how this is done.

Ghost objects are simply entities that have been partially loaded, usually just having the ID populated so that the full object can be loaded later. This is very similar to lazy initialization. The difference is that the related entity is initialized but not populated.

A value holder is an object that takes the place of the lazily loaded object and is responsible for loading it. The value holder has a getValue method which does the lazy loading. The entity is loaded on the first call to getValue.

The above solutions get the job done, but their biggest problem is that they are pretty intrusive. The classes have knowledge that they are lazily loaded and require logic for loading. Luckily, there’s an option which helps to avoid this issue. Using dynamic proxies1, we can write an entity class which has no knowledge of lazy loading and yet still lazily load it if we want to.

This is possible because the proxy extends the entity class or, if applicable, implements the same interface, allowing it to intercept calls to the entity itself. That way, the object need not be loaded, but when it’s accessed, the proxy intercepts the invocation, loads the object if needed, and then delegates the invocation to it. Since proxying classes requires bytecode instrumentation, we need to use a library like Cglib.

First, we implement an InvocationHandler we can use to handle lazy loading.

Now, we can use Cglib’s Enhancer class to create a proxy.

Now, the first call to any method on foo will invoke loadObject, which in turn will load the object into memory. Cglib actually provides an interface for doing lazy loading called LazyLoader, so we don’t even need to implement an InvocationHandler.

ORM frameworks like Hibernate use proxies to implement lazy loading, which is one of the features we took advantage of while developing the CMDB application. One of the nifty things that Hibernate supports is paged lazy loading, which allows entities in a collection to be loaded and unloaded while it’s being iterated over. This is extremely useful for one-to-many and, in particular, one-to-very-many relationships.

Lazy loading was also one of the features I included in Infinitum’s ORM, implemented using dynamic proxies as well.2 At a later date, I may examine how lazy loading is implemented within the context of an ORM and how Infinitum uses it. It’s a very useful design pattern and provides some pretty significant performance optimizations. It just goes to show that sometimes being lazy pays off.

  1. For more background on proxies themselves, check out one of my previous posts. []
  2. Java bytecode libraries like Cglib are not compatible on the Android platform. Android uses its own bytecode variant. []

Proxies: Why They’re Useful and How They’re Implemented

I wanted to write about lazy loading, but doing so requires some background on proxies. Proxies are such an interesting and useful concept that I decided it would be worthwhile to write a separate post discussing them. I’ve talked about them in the past, for instance on StackOverflow, so this will be a bit of a rehash, but I will go into a little more depth here.

What is a proxy? Fundamentally, it’s a broker, or mediator, between an object and that object’s user, which I will refer to as its client. Specifically, a proxy intercepts calls to the object, performs some logic, and then (typically) passes the call on to the object itself. I say typically because the proxy could simply intercept without ever calling the object.

proxy

A proxy works by implementing an object’s non-final methods. This means that proxying an interface is pretty simple because an interface is merely a list of method signatures that need to be implemented. This facilitates the interception of method invocations quite nicely. Proxying a concrete class is a bit more involved, and I’ll explain why shortly.

Proxies are useful, very useful. That’s because they allow for the modification of an object’s behavior and do so in a way that’s completely invisible to the user. Few know about them, but many use them, usually without even being aware of it. Hibernate uses them for lazy loading, Spring uses them for aspect-oriented programming, and Mockito uses them for creating mocks. Those are just three (huge) use cases of many.

JDK Dynamic Proxies

Java provides a Proxy class which implements a list of interfaces at runtime. The behavior of a proxy is specified through an implementation of InvocationHandler, an interface which has a single method called invoke. The signature for the invoke method looks like the following:

The proxy argument is the proxy instance the method was invoked on. The method argument is the Method instance corresponding to the interface method invoked on the object.  The last argument, args, is an array of objects which consists of the arguments passed in to the method invocation, if any.

Each proxy has an InvocationHandler associated with it, and it’s this handler which is responsible for delegating method calls made on the proxy to the object being proxied. This level of indirection means that methods are not invoked on an object itself but rather on its proxy. The example below illustrates how an InvocationHandler would be implemented such that “Hello World” is printed to the console before every method invocation.

This is pretty easy to understand. The invoke method will intercept any method call by printing “Hello World” before delegating the invocation to the proxied object. It’s not very useful, but it does lend some insight into why proxies are useful for AOP.

An interesting observation is that invoke provides a reference to the proxy itself, meaning if you were to instead call the method on it, you would receive a StackOverflowError because it would lead to an infinite recursion.

Note that the InvocationHandler alone is of no use. In order to actually create a proxy, we need to use the Proxy class and provide the InvocationHandler. Proxy provides a static method for creating new instances called newProxyInstance. This method takes three arguments, a class loader, an array of interfaces to be implemented by the proxy, and the proxy behavior in the form of an InvocationHandler. An example of creating a proxy for a List is shown below.

The client invoking methods on the List can’t tell the difference between a proxy and its underlying object representation, nor should it care.

Proxying Classes

While proxying an interface dynamically is relatively straightforward, the same cannot be said for proxying a class. Java’s Proxy class is merely a runtime implementation of an interface or set of interfaces, but a class does not have to implement an interface at all. As a result, proxying classes requires bytecode manipulation. Fortunately, there are libraries available which help to facilitate this through a high-level API. For example, Cglib (short for code-generation library) provides a way to extend Java classes at runtime and Javassist (short for Java Programming Assistant) allows for both class modification and creation at runtime. It’s worth pointing out that Spring, Hibernate, Mockito, and various other frameworks make heavy use of these libraries.

Cglib and Javassist provide support for proxying classes because they can dynamically generate bytecode (i.e. class files), allowing us to extend classes at runtime in a way that Java’s Proxy can implement an interface at runtime.

At the core of Cglib is the Enhancer class, which is used to generate dynamic subclasses. It works in a similar fashion to the JDK’s Proxy class, but rather than using a JDK InvocationHandler, it uses a Callback for providing proxy behavior. There are various Callback extensions, such as InvocationHandler (which is a replacement for the JDK version), LazyLoader, NoOp, and Dispatcher.

This code is essentially the same as the earlier example in that every method invocation on the proxied object will first print “Hello World” before being delegated to the actual object. The difference is that MyClass does not implement an interface, so we didn’t need to specify an array of interfaces for the proxy.

Proxies are a very powerful programming construct which enables us to implement things like lazy loading and AOP. In general, they allow us to alter the behavior of objects transparently. In the future, I’ll dive into the specific use cases of lazy loading and AOP.

Solving the Referential Integrity Problem

“A man with a watch knows what time it is. A man with two watches is never sure.”

I’ve been developing my open source Android framework, Infinitum, for the better part of 10 months now. It has brought about some really interesting problems that I’ve had to tackle, which is one of the many reasons I enjoy working on it so much.

Chicken or the Egg

Although it’s much more now, Infinitum began as an object-relational mapper which was loosely modeled after Hibernate. One of the first major issues I faced while developing the ORM component was loading object graphs. To illustrate what I mean by this, suppose we’re developing some software for a department store. The domain model for this software might look something like this:

As you can see, an Employee works in one Department, and, conversely, a Department has one or more Employees working in it, forming a many-to-one relationship and resulting in the class below.

Pretty straightforward, right? Now, let’s say we want to retrieve the Employee with, say, the ID 4028 from the database. Thinking about it at a high level and ignoring any notion of lazy loading, this appears to be rather simple.

1. Perform a query on the Employee table.

2. Instantiate a new Employee object.
3. Populate the Employee object’s fields from the query result.

But there’s some handwaving going on in those three steps, specifically the last one. One of the Employee fields is an entity, namely department. Okay, this shouldn’t be a problem. We just need to perform a second query to retrieve the Department associated with the Employee (the result of the first query is going to include the Department foreign key — let’s assume its 14).

Then we just create the Department object, populate it and assign it to the respective field in the Employee.

Once again, there’s a problem. To understand why, it’s helpful to see what the Department class actually looks like.

Do you see what the issue is? In order to construct our Employee, we need to construct his Department. In order to construct his Department, we need to construct the Employee. Our object graph has a cycle that’s throwing us for a (infinite) loop.

Breaking the Cycle

Fortunately, there’s a pretty easy solution for this chicken-or-the-egg problem. We’ll make use of a HashMap to keep tabs on our object graph as we incrementally build it. This will make more sense in just a bit.

We’re going to use a HashMap keyed off of an integer hash where the map values will be the entities in the object graph.

The integer hash will be a unique value computed for each entity we need to load to fulfill the object graph. The idea is that we will store the partially populated entity in the HashMap to have its remaining fields populated later. Loading an entity will take the following steps:

  1. Perform query on the entity table.
  2. Instantiate a new entity object.
  3. Populate the entity object fields which do not belong to a relationship from the query result.
  4. Compute the hash for the partial entity object.
  5. Check if the HashMap contains the computed hash.
  6. If the HashMap contains the hash, return the associated entity object (this breaks any potential cycle).
  7. Otherwise, store the entity object in the HashMap using the hash as its key.
  8. Load related entities by recursively calling this sequence.

Going back to our Employee problem, retrieving an Employee from the database will take these steps:

  1. Perform query on the Employee table.
  2. Instantiate a new Employee object.
  3. Populate the Employee object fields which do not belong to a relationship from the query result.
  4. Compute the hash for the partial Employee object.
  5. Check if the HashMap contains the computed hash (it won’t).
  6. Store the Employee object in the HashMap using the hash as its key.
  7. Perform query on the Department table.
  8. Instantiate a new Department object.
  9. Populate the Department object fields which do not belong to a relationship from the query results.
  10. Compute the hash for the partial Department object.
  11. Check if the HashMap contains the computed hash (again, it won’t).
  12. Store the Department object in the HashMap using the hash as its key.
  13. The cycle will terminate and the two objects in the HashMap, the Employee and the Department, will be fully populated and referencing each other.

Considering the HashMap is not specific to any entity type (i.e. it will hold Employees, Departments, and any other domain types we come up with), how do we compute a unique hash for objects of various types? Moreover, we’re computing hashes for incomplete objects, so what gives?

Obviously, we can’t make use of hashCode() since not every field is guaranteed to be populated. Fortunately, we can take advantage of the fact that every entity must have a primary key, but, unless we’re using a policy where every primary key is unique across every table, this won’t get us very far. We will include the entity type as a factor in our hash code. Here’s the code Infinitum currently uses to compute this hash:

This hash allows us to uniquely identify entities even if they have not been fully populated. Our cycle problem is solved!

Maintaining Referential Integrity

The term “referential integrity” is typically used to refer to a property of relational databases. However, when I say referential integrity, I’m referring to the notion of object references in an object graph. This referential integrity is something ORMs must keep track of or otherwise you run into some big problems.

To illustrate this, say our department store only has one department and two employees who work in said department (this might defeat the purpose of a department store, but just roll with it). Now, let’s say we retrieve one Employee, Bill, from the database. Once again ignoring lazy loading, this should implicitly load an object graph consisting of the Employee, the Department, and the Employees assigned to that Department. Next, let’s subsequently retrieve the second Employee, Frank, from the database. Again, this will load the object graph.

Bill and Frank both work in the same Department, but if referential integrity is not enforced, objects can become out of sync.

The underlying problem is that there are two different copies of the Department object, but we must abide by the Highlander Principle in that “there can be only one.” Bill and Frank should reference the same instance so that, regardless of how the Department is dereferenced, it stays synced between every object in the graph.

In plain terms, when we’re retrieving objects from the database, we must be cautious not to load the same one twice. Otherwise, we’ll have two objects corresponding to a single database row and things will get out of sync.

Enter Identity Map

This presents an interesting problem. Knowing what we learned earlier with regard to the chicken-or-the-egg problem, can we apply a similar solution? The answer is yes! In fact, the solution we discussed earlier was actually masquerading as a fairly common design pattern known as the Identity Map, originally cataloged by Martin Fowler in his book Patterns of Enterprise Application Architecture.

The idea behind the Identity Map pattern is that, every time we read a record from the database, we first check the Identity Map to see if the record has already been retrieved. This allows us to simply return a new reference to the in-memory record rather than creating a new object, maintaining referential integrity.

A secondary benefit to the Identity Map is that, since it acts as a cache, it reduces the number of database calls needed to retrieve objects, which yields a performance enhancement.

An Identity Map is normally tied to some sort of transactional context such as a session. This works exceedingly well for Infinitum because its ORM is built around the notion of a Session object, which  can be configured as a scoped unit of work. The Infinitum Session contains a cache which functions as an Identity Map, solving both the cycle and the referential integrity issues.

It’s worth pointing out, however, that while an Identity Map maintains referential integrity within the context of a session, it doesn’t do anything to prevent incongruities between different sessions. This is a complex problem that usually requires a locking strategy, which is beyond the scope of this blog post.

Under the Hood

It may be helpful to see how Infinitum uses an Identity Map to solve the cycle problem. The method createFromCursor takes a database result cursor and transforms it into an instance of the given type. It makes use of a recursive method that goes through the process I outlined earlier. The call to loadRelationships will result in this recursion.

Entities are stored in the Session cache as they are retrieved, allowing us to enforce referential integrity while also preventing any infinite loops that might occur while building up the object graph.

So that’s it! We’ve learned to make use of the Identity Map pattern to solve some pretty interesting problems. We looked at how we can design an ORM to load object graphs that contain cycles as well as maintain this critical notion of referential integrity. We also saw how the Identity Map helps to give us some performance gain through caching. Infinitum’s ORM module makes use of this pattern in its session caching and many other frameworks use it as well. In a future blog entry, I will talk about lazy loading and how it can be used to avoid loading large object graphs.