Earlier versions of Jepsen found glaring inconsistencies, but missed subtle ones. In particular, Jepsen was not well equipped to distinguish linearizable systems from sequentially or causally consistent ones. When people asked me to analyze systems which claimed to be linearizable, Jepsen could rule out obvious classes of behavior, like dropping writes, but couldn’t tell us much more than that. Since users and vendors are starting to rely on Jepsen as a basic check on correctness, it’s important that Jepsen be able to identify true linearization errors.

[Image: etcd-jepsen-set-test.jpg]

To understand why Jepsen was not a complete test of linearizability, we have to understand the structure of its original tests. Jepsen originally assumed that every system could be modeled as a set of integers. Each client would gradually add a sequence of integers–disjoint from the other clients’ sets–to the database’s set, then perform a final read. If any acknowledged elements were missing from that read, we know the system dropped data.

The original Jepsen tests were designed for AP systems, like Riak, without a linear order; using a set is appropriate because its contents are fundamentally unordered, and because addition to the set is associative and idempotent. To test a linearizable system, we implement set addition by performing a compare-and-set: reading the current set and replacing it with that set plus the number being written. If a given CAS was successful, then that element should appear in the final read.
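
For concreteness, here’s a minimal sketch of that read-then-CAS step, using a Clojure atom as a stand-in for the database. The real Jepsen clients talk to an actual server, but the shape of the operation is the same.

(defn add-element!
  "Adds x to a set held in an atom, using compare-and-set! as a stand-in for
  the database's CAS primitive. Returns true iff the CAS succeeded."
  [db x]
  (let [current @db]
    (compare-and-set! db current (conj current x))))

(def db (atom #{}))
(add-element! db 1)  ; => true
@db                  ; => #{1}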

This does verify sequential consistency, and to varying degrees linearizability, but has limited power. The database may choose, for instance, to delay the visibility of changes, so long as they become visible before the final read. We can’t test operations other than a CAS. We can’t, for instance, test deletions. It’s also not clear how to verify systems like mutexes, queues, or semaphores.

Furthermore, if a test does fail, it’s not clear why. A missing number from the final set might be caused by a problem with that particular CAS–or a CAS executed hours later which happened to destroy the effects of a preceding write. Ideally, we’d like to know exactly why the system failed to linearize. With this in mind, I set out to design a linearizability checker suitable for analyzing both formal models and real software with no internal visibility.

Knossos

In the introduction to Knossos, I couched Knossos as a model checker, motivated by a particular algorithm discussed on the Redis mailing list. This was slightly disingenuous: in fact, I designed Knossos as a model checker for any type of history, including those recorded from real databases. This means that Jepsen can generate a series of random operations, execute them against a database, and verify that the resulting history is valid with respect to some model.

Given a sequence of operations that a database might go through–say, two processes attempting to acquire a mutex:

{:process 1, :type :invoke, :f :acquire, :value nil}
{:process 2, :type :invoke, :f :acquire, :value nil}
{:process 1, :type :ok,     :f :acquire, :value nil}
{:process 2, :type :fail,   :f :acquire, :value "lock failed; already held"}

… and a singlethreaded model of the system, like

(defrecord Mutex [locked?]
  Model
  (step [mutex op]
    (condp = (:f op)
      ; Acquiring a held lock is inconsistent; otherwise the mutex locks.
      :acquire (if locked?
                 (inconsistent "already held")
                 (Mutex. true))
      ; Releasing an unheld lock is inconsistent; otherwise it unlocks.
      :release (if locked?
                 (Mutex. false)
                 (inconsistent "not held")))))

… Knossos can identify if the given concurrent history linearizes–that is, whether there exists some equivalent history in which every operation appears to take place atomically, in a well-defined order, between the invocation and completion times.

[Image: jepsen-model.jpg]

Linearizability, like sequential and serializable consistency, requires that every operation take place in some specific order; that there appears to be only one “true” state for the system at any given time. Therefore we can model any linearizable system as a single state, plus a function, called step, which applies an operation to that state and returns a new state.

In Clojure, we represent this model with a simple protocol, called Model, which defines a single function, (step current-model-state operation), returning the new state. In our mutex example, there are four possibilities, depending on whether the operation is :acquire or :release, and whether the state locked? is true. If we try to lock an unlocked mutex, we return a new Mutex with the state true. If we try to lock a mutex which is already locked, we return a special kind of state: an inconsistent state.
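
Roughly, the protocol and the inconsistent-state helper look something like the following sketch; the actual Knossos definitions may differ in detail.

(defprotocol Model
  (step [model op] "Applies op to the model, returning the resulting state."))

; An inconsistent state records why the operation was illegal. Once a model
; becomes inconsistent, it stays inconsistent.
(defrecord Inconsistent [msg]
  Model
  (step [this op] this))

(defn inconsistent
  "Wraps an error message in an Inconsistent state."
  [msg]
  (Inconsistent. msg))

(defn inconsistent?
  "Is this model state inconsistent?"
  [model]
  (instance? Inconsistent model))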

Inconsistent states allow us to verify that a singlethreaded history is valid. We simply (reduce step initial-state operations); if the result is inconsistent, we know that sequence of operations was prohibited by the model. The model formally expresses our definition of the allowable causal histories.
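
As a sketch, using the Mutex model and the inconsistent? helper from above:

(defn valid-sequential?
  "Is this singlethreaded sequence of operations allowed by the model?"
  [initial-state ops]
  (not (inconsistent? (reduce step initial-state ops))))

(valid-sequential? (Mutex. false)
                   [{:f :acquire} {:f :release} {:f :acquire}])
; => true
(valid-sequential? (Mutex. false)
                   [{:f :acquire} {:f :acquire}])
; => false: the model rejects acquiring a lock that's already held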

The plot thickens

[Image: jepsen-histories.jpg]

But we don’t have a singlethreaded history to test. We have a multithreaded history, with any number of operations in play concurrently. Each client is invoking, waiting for, and then discovering the result of its operations. Our history contains pairs of :invoke and :ok messages when an operation succeeds, :invoke and :fail when the operation is known not to have taken place, or :invoke and :info when we simply don’t know what happened.

If an operation times out, or the server returns an indeterminate response, we may never find out whether the operation really took place. In the history to the right, process 5 has hung and will never recover. Its operation could take place at any time, even years into the future. In general, a hung process is concurrent with every other subsequent operation.
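
In the history itself, such an operation shows up as an :invoke with no matching :ok or :fail; the best the client can record is an :info entry. The exact fields here are illustrative:

{:process 5, :type :invoke, :f :acquire, :value nil}
; ... the client times out; we never learn whether the acquire took effect
{:process 5, :type :info,   :f :acquire, :value :timed-out}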

[Image: jepsen-invalid-history.jpg]
[Image: jepsen-valid-history.jpg]

Given a model, we know how to test if a particular sequence of operations is valid. But in a concurrent history, the ordering is ambiguous; each operation could take place at any time between its invocation and completion. One possible interleaving might be read 1, write 1, read 2, write 2, which is obviously incorrect. On the other hand, we could evaluate write 1, read 1, write 2, read 2 instead–which is a valid history for a register. This history is linearizable–but in order to prove that fact, we have to find a particular valid order.

Imagine something like a game of hopscotch: one must land on each cell in turn, always moving from left to right, finding a path in which the model’s constraints hold. Where there are many cells at the same time, finding a path becomes especially difficult. We must consider every possible permutation of those concurrent cells, which is O(n!). That’s the kind of hopscotch that, even when played by computer, makes one re-evaluate one’s life choices.
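
To put numbers on that factorial blowup: with k operations concurrently in flight, there are k! candidate orders at that point in the history alone.

(defn factorial [k] (reduce *' 1 (range 1 (inc k))))

(map factorial [3 5 10 15])
; => (6 120 3628800 1307674368000)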

So what do we do, presented with a huge space of possibilities?

Exploit degeneracy

I’m a degenerate sort of person, so my first inclination is to look for symmetries in the state space. The key observation to make is that whether a given operation is valid or not depends solely on the current state of the model, not its history.

step(state, op) -> state'

[Image: jepsen-degeneracy.jpg]

It doesn’t matter how we got to the state; if you give me two registers containing the value 2, and ask me to apply the same operation to both, we only need to check one of the registers, because the results will be equivalent!

Unlike a formal model checker or proof assistant, Knossos doesn’t know the structure of the system it’s analyzing; it can’t perform symmetry reduction based on the definition of step. What we can do, however, is look for cases where we come back to the same state with the same future series of operations, and when that occurs, immediately drop all but one of those cases. This turns out to be equivalent to a certain class of symmetry reduction. In particular, we can collapse interchangeable orders: concurrent reads, writes that lead to the same value, and so on. We keep a cache of visited worlds and avoid exploring any that have been seen before.

Laziness

[Image: monads.jpg]
[Image: jepsen-laziness.jpg]

Remember, we’re looking for any linearization, not all of them. If we can find a shortcut by not evaluating some highly-branching history, by not taking some expensive path, we can skip huge parts of the search. Like a lightning bolt feeling its way down the path of least resistance, we evaluate only those paths which seem easiest–coming back to the hard ones later. If the history is truly not linearizable, we’re forced to return to those expensive branches and check them, but if the history is valid, we can finish as soon as a single path is found.

Lazy evaluation is all about making control flow explicit instead of implicit. We use a data structure to describe where to explore next, instead of following the normal program flow. In Knossos, we represent the exploration of a particular order of operations as a world, which sits at some index along the multithreaded history. Each world carries with it a fixed history–the specific order of operations that occurred in that possible universe. The fixed history leads to a current model state. Finally, each world has a set of pending operations: operations that have been invoked, but have not yet taken effect.

For example, a world might have a fixed history of lock, unlock, lock, leading to a model state where locked is true, and a second lock attempt might be pending but not yet applied. An unlock operation could arrive and allow the pending lock to take place.
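
Concretely, the world from that example might look something like the following map. The field names here are illustrative, not the actual Knossos record.

{:index         4                      ; how far into the multithreaded history we've read
 :fixed-history [{:process 0, :f :acquire}
                 {:process 1, :f :release}
                 {:process 0, :f :acquire}]
 :model         (Mutex. true)          ; lock, unlock, lock leaves the mutex held
 :pending       #{{:process 2, :f :acquire}}}  ; invoked, but not yet applied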

By representing the entire state of the computation as a data structure, we can write a single function that takes a world and explores it, returning a set of potential future worlds. We can explore those worlds in parallel.
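
Here’s a sketch of that exploration step under the map representation above; names like fold-op-into-world are illustrative, not the Knossos internals.

(defn fold-op-into-world
  "Applies one pending operation to a world. Returns the consequent world,
  or nil if the model rejects the operation in this position."
  [world op]
  (let [model' (step (:model world) op)]
    (when-not (inconsistent? model')
      (-> world
          (update :fixed-history conj op)
          (update :pending disj op)
          (assoc  :model model')))))

(defn successor-worlds
  "All worlds reachable from this one by applying any single pending op."
  [world]
  (set (keep #(fold-op-into-world world %) (:pending world))))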

Parallelization

[Image: pmap.jpg]
[Image: jepsen-parallelize.jpg]

Because our states are immutable representations of the computation, and the function we use to explore any given state is pure and deterministic, we can trivially parallelize the exploration process. Early versions of Knossos reduced over each operation in the history, applying that operation to every outstanding world by whacking it with a parallel map.
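
A sketch of that early strategy’s shape, reusing successor-worlds from above. For brevity it handles only invocations, glossing over the completions that prune worlds in which an acknowledged operation never took effect.

(defn all-successors
  "Every world reachable from this one by applying zero or more of its
  pending operations."
  [world]
  (loop [frontier #{world}, seen #{world}]
    (let [fresh (remove seen (mapcat successor-worlds frontier))]
      (if (empty? fresh)
        seen
        (recur (set fresh) (into seen fresh))))))

(defn advance-all-worlds
  "Folds one newly invoked operation into every outstanding world, expanding
  each world in parallel with pmap."
  [worlds op]
  (->> worlds
       (pmap #(all-successors (update % :pending conj op)))
       (reduce into #{})))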

This parallelization strategy has a serious drawback, though: by exploring the state space one index at a time, we effectively perform a breadth-first search. We want to take shortcuts through the state space, running many searches at once. We don’t want a pure depth-first search either; instead, we want to explore those worlds with the lowest branching factor, because they’re the cheapest to explore.

So instead of exploring the history one operation at a time, we spawn lots of threads and have each consume from a priority queue of worlds, ranked by how awful those worlds are to explore. As each explorer thread discovers new consequent worlds, it inserts them back into the pool. If any thread finds a world that encompasses every operation in the history, we’ve demonstrated the history is linearizable.
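
The worker loop, as a sketch. Ranking worlds by their pending count via world-cost is an illustrative choice, not necessarily how Knossos scores them, and the sketch leaves out feeding newly invoked operations into each world’s pending set, as well as termination when no linearization exists.

(import '[java.util.concurrent PriorityBlockingQueue TimeUnit])

(defn world-cost
  "Rank worlds by how awful they are to explore; here, by how many pending
  operations (and hence how many orderings) they carry."
  [world]
  (count (:pending world)))

(defn explorer
  "Pulls the cheapest world off the queue, expands it, and pushes the
  consequent worlds back, until some thread finds a complete linearization."
  [^PriorityBlockingQueue queue history-length found]
  (while (nil? @found)
    (when-let [world (.poll queue 10 TimeUnit/MILLISECONDS)]
      (if (= history-length (count (:fixed-history world)))
        (reset! found world)                      ; a full linearization!
        (doseq [w (successor-worlds world)]
          (.put queue w))))))

; The shared queue orders worlds by cost; found is the completion flag.
(def queue (PriorityBlockingQueue. 64 (comparator #(< (world-cost %1) (world-cost %2)))))
(def found (atom nil))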

We pay some cost in synchronization: queues aren’t cheap, and java.util.concurrent.PriorityBlockingQueue has some particularly nasty contention costs for both enqueues and dequeues. Luckily, the queue will usually contain plenty of elements, so we can stripe it into several subqueues, each with thread affinity. Affinity for each queue reduces lock contention, which dramatically reduces the time threads spend waiting to enqueue or dequeue worlds. When a thread exhausts its local queue, it steals worlds from its neighbors.

This approach costs us some degree of memory locality: transferring records through the queue tends to push them out of the CPU cache. We can tune how far each explorer thread will take a particular world to reduce the locality cost: if work is too chunky, threads can starve awaiting worlds to explore–but if work is too fine-grained, synchronization and cache misses dominate.

Memoization

Making control flow explicit (some might even say monadic) allows us to memoize computation as well. At RICON East, in 2013, Margo Seltzer gave a phenomenal talk on automatically parallelizing singlethreaded x86 programs. She pointed out that x86 can be thought of as a very large, very complicated function that transforms a bit-vector of all the registers and all of memory into some subsequent state–depending on the instruction pointer, contents of registers, etc. It’s a very large value, but if you compress it and make even some modest predictions, you can cache the results of computations that haven’t even happened yet, allowing the program to jump forward when it encounters a known state.

[Image: jepsen-memoization.jpg]

This works because parallel programs usually don’t change the entire memory space; they often read and write only a small portion of memory. for(i = 0; i < 100; i++) { arr[i]++ }, for instance, independently increments each number in arr. In that sense, the memory space is degenerate outside each particular element. That degeneracy allows speculative execution to have a chance of predicting an equivalent future state of the program: we can increment each number concurrently.

In Knossos we have a similarly degenerate state space; all fixed histories may be collapsed so long as the model and pending operations are identical. We also have a speculative and lazy execution strategy: operations are simultaneously explored at various points in the multiverse. Hence we can apply a similar memoization strategy: by caching visited worlds, we can avoid exploring equivalent paths twice.

In fact we don’t even need to store the results of the exploration, simply that we have reached that world. Think of exploring a maze with several friends, all looking for a path through. When anyone reaches a dead end, they can save time for everyone by coloring in the path they took. When someone comes to a branch in the maze, they only take the paths that nobody has colored in. We simply abort any exploration of a world equivalent to one already visited. This optimization is nondeterministic but synchronization-free, allowing memoization checks to be extremely cheap. Even though cache hit rates are typically low, each hit prunes an exponential number of descendant worlds, dramatically reducing runtimes.
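
A sketch of that cache check: two worlds are equivalent when they sit at the same index with the same model state and the same pending operations, so that’s all we need to key on. Here a ConcurrentHashMap-backed set stands in for whatever Knossos actually uses.

(import '[java.util.concurrent ConcurrentHashMap])

(def seen-worlds (ConcurrentHashMap/newKeySet))

(defn world-key
  "Everything that determines a world's future: its position in the history,
  its model state, and its pending operations. The fixed history is omitted."
  [world]
  (select-keys world [:index :model :pending]))

(defn novel-world?
  "True iff no thread has visited an equivalent world. Records the world's
  key as a side effect; Set.add returns false when the key was already there."
  [world]
  (.add ^java.util.Set seen-worlds (world-key world)))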

Immutability and performance

[Image: haskell.jpg]

When we explore a world, we’ll typically encounter many branching paths. Given two concurrent writes a and b, we need to explore [], [a], [b], [a b], and [b a], and in turn, each of those worlds will fork into hundreds, then thousands, then millions of consequent worlds. We have to make a lot of copies.

At this point in the essay, Haskell enthusiasts are nodding their heads sagely and muttering things about Coyoneda diffeomorphisms and trendofunctors. Haskell offers excellent support for immutable data structures and parallel execution of pure functions, which would make it an ideal choice for building this kind of checker.

[Image: zahn.jpg]

But I am, sadly, not a Haskell wizard. When you get right down to it, I’m more of a Clojure Sith Lord. And as it turns out, this is a type of problem that Clojure is also well-suited for. We express the consistency model as a pure function over immutable models, and use Clojure’s immutable maps, vectors, and sets to store the state of each world, its histories, its pending operations, and so on. Forking the world into distinct paths doesn’t require copying the entire state; rather, Clojure uses a reference to the original data structure, and stores a delta on top. We can fork millions of worlds cheaply.
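
For instance (reusing the Mutex model from earlier), forking a world twice produces two new values, but the untouched parts are shared with the parent rather than copied:

(def parent {:index         3
             :fixed-history [:a :b :c]
             :model         (Mutex. true)
             :pending       #{:d :e}})

(def fork-1 (update parent :pending disj :d))
(def fork-2 (update parent :pending disj :e))

; The forks differ only in :pending; the history vector and the model are
; the very same objects as the parent's, not copies.
(identical? (:fixed-history parent) (:fixed-history fork-1))
; => true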

Because worlds are immutable, we can share them freely between threads. Because the functions that explore a world, returning subsequent possible worlds, are pure, we can explore worlds on any thread, at any time, and take advantage of memoization. But in order to execute that search process in parallel, we need that priority queue of worlds-at-the-edge: a fundamentally mutable data structure. The memoizing cache is also mutable: it must be, to share state between threads. We also need some book-keeping state: how far has the algorithm explored; have we reached the end; how large is the cache.

So as a layer atop the immutable core, we make limited use of mutable structures: a striped java.util.concurrent.PriorityBlockingQueue for keeping track of which worlds are up next, a concurrent hashmap to memoize results, Clojure’s atoms for bookkeeping, and some java.util.concurrent.atomic references for primitive CAS. Because this code is wildly nondeterministic, it’s the most difficult portion of Knossos to reason about and debug–yet that nondeterminism is a critical degree of freedom for parallel execution. By broadening the space of allowable execution orders, we reduce the need for inter-core synchronization.

[Image: deathstar.jpg]

Reducing synchronization is especially important because while I was working on Knossos, Comcast offered me a research grant specifically for Jepsen. As one does when offered unlimited resources by a galactic empire, I thought big.

I used Comcast’s grant to build a 24-core (48 HT) Xeon with 128GB of ECC RAM, effectively demolishing the parallelism and heap barriers that limited earlier verification efforts. Extensive profiling with YourKit (another great supporter of open-source projects) helped reduce the lock and CAS contention which had limited scalability to ~4 cores; a few weeks’ work removed almost all thread stalls and improved performance by two orders of magnitude.

The result is that Knossos can check 5-process, 150–200-element histories in a matter of minutes, not days–and it can do it on 48 cores.

[Image: cpus.png]

There are several optimizations I haven’t made yet; for instance, detecting crashed processes and optimistically inserting a world in which that crashed process’ operation never takes place. However, Knossos at this stage is more than capable of detecting linearization errors in real-world histories.

Proud of this technological terror I’d constructed, I consulted the small Moff Tarkin that lives in my head on what database to test next. “You would prefer another target? An open-source target? Then name the distributed system!”

[Image: alderaan.jpg]

“RabbitMQ. They’re on RabbitMQ.”

