A few weeks ago I criticized a proposal by Antirez for a hypothetical linearizable system built on top of Redis WAIT and a strong coordinator. I showed that the coordinator he suggested was physically impossible to build, and that anybody who tried to actually implement that design would run into serious problems. I demonstrated those problems (and additional implementation-specific issues) in an experiment on Redis’ unstable branch.

Antirez’ principal objections, as I understand them, are:

  1. Some readers mistakenly assumed that the system I discussed was a proposal for Redis Cluster.
  2. I showed that the proposal was physically impossible, but didn’t address its safety if it were possible.
  3. The impossible parts of the proposed system could be implemented in a real asynchronous network by layering in additional constraints on the leader election process.

I did not assert that this was a design for Redis Cluster, and the term “Redis Cluster” appeared nowhere in the post. To be absolutely clear: at no point in these posts have I discussed Redis Cluster. Antirez acknowledges that Cluster makes essentially no safety guarantees, so I haven’t felt the need to write about it.

I did, however, provide ample reference to multiple points in the mailing list thread where Antirez made strong claims about the consistency of hypothetical systems built with Redis WAIT and strong failover coordinators, and cited the gist in question as the canonical example thereof. I also thought it was clear that the system Antirez proposed was physically impossible, and that in addition to those flaws I analyzed weaker, practically achievable designs. However, comments on the post, on Twitter, and on Hacker News suggest a clarification is in order.

If Aphyr was interested in a real discussion, I could just agree about the obvious, that if you can totally control the orchestration of the system then synchronous replication is obviously a building block that can be part of a strong consistent system. Apparently he as just interested in spreading FUD. Congratulations.

Allow me to phrase this unambiguously: not only is this system impossible to build, but even if it were possible, it would not be linearizable.

There are obvious flaws in Antirez’s proposal, but I’m not convinced that simply explaining those flaws will do enough good. This is unlikely to be the last of Antirez’–or anyone else’s–consistency schemes, and I can’t possibly QA all of them! Instead, I’d like to raise the level of discussion around linearizability by showing how to find problems in concurrent algorithms–even if you don’t know where those problems lie.

So here, have a repo.

Knossos

Named after the ruins where Linear B was discovered, Knossos identifies whether or not a history of events from a concurrent system is linearizable. We’re going to be working through knossos.core and knossos.redis in this post. I’ll elide some code for clarity, but it’s all there in the repo.

In Knossos, we analyze histories. Histories are a sequence of operations, each of which is a map:

[{:process :c2, :type :invoke, :f :write, :value 850919}
 {:process :c1, :type :ok, :f :write, :value 850914}
 {:process :c1, :type :invoke, :f :read, :value 850919}
 {:process :c1, :type :ok, :f :read, :value 850919}]

:process is the logical thread performing the operation. :invoke marks the start of an operation, and :ok marks its completion. :f is the kind of operation being invoked, and :value is an arbitrary argument for that operation, e.g. the value of a read or write. The interpretation of :f and :value depends on the datatype we’re modeling: for a set, we might support :add, :remove, and :contains.

To verify the history, we need a model which verifies that a sequence of operations applied in a particular order is valid. For instance, if we’re describing a register (e.g. a variable in a programming language–a mutable reference that points to a single value), we would like to enforce that every read sees the most recently written value. If we write 1, then write 2, then read, the read should see 2.

We represent a model with the function knossos.core/step, which takes a model’s state and an operation, and returns a new model. If applying the operation to that state would be invalid, step throws.

(defprotocol Model
  (step [model op]))

(defrecord Register [value]
  Model
  (step [r op]
    (condp = (:f op)
      :write (Register. (:value op))
      :read  (if (or (nil? (:value op))     ; We don't know what the read was
                     (= value (:value op))) ; Read was a specific value
               r
               (throw (RuntimeException.
                        (str "read " (pr-str (:value op))
                             " from register " value)))))))

A Register implements the Model protocol’s step function, handling two operations: :write, which returns a modified copy of the register with the new value, and :read, which returns the register itself–if the read corresponds to the current value.

In a real experiment (as opposed to a mathematical model), we may not know what the read’s value will be until it returns. We allow any read with an unknown (nil?) value to succeed; when the read comes back we can re-evaluate the model with the value in mind.

This definition of step lets us reduce a sequence of operations over the model to produce a final state:

user=> (reduce step
               (Register. 0)
               [{:process :c1, :type :ok, :f :write, :value 4}
                {:process :c2, :type :ok, :f :read, :value 4}])
#knossos.core.Register{:value 4}
user=> (reduce step
               (Register. 0)
               [{:process :c1, :type :ok, :f :write, :value 4}
                {:process :c2, :type :ok, :f :read, :value 7}])

RuntimeException read 7 from register 4  knossos.core.Register (core.clj:43)
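
The same machinery extends to richer datatypes. As a sketch, a set model supporting the :add, :remove, and :contains operations mentioned earlier might look like the following; the {:element ..., :result ...} encoding for :contains is my own choice here, not something Knossos prescribes.

(defrecord SetModel [elements]
  Model
  (step [s op]
    (condp = (:f op)
      :add      (SetModel. (conj elements (:value op)))
      :remove   (SetModel. (disj elements (:value op)))
      :contains (let [{:keys [element result]} (:value op)]
                  (if (or (nil? result)                          ; Result not yet known
                          (= result (contains? elements element)))
                    s
                    (throw (RuntimeException.
                             (str "contains? " (pr-str element)
                                  " returned "  (pr-str result)
                                  " against set " (pr-str elements)))))))))

We’d start it from (SetModel. #{}).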

Now our problem consists of taking a history with pairs of (invoke, ok) operations, and finding an equivalent history of single operations which is consistent with the model. This equivalent single-threaded history is called a linearization; a system is linearizable if at least one such history exists. The actual definition is a bit more complicated, accounting for unmatched invoke/ok pairs, but this is a workable lay definition.

Finding linearizations

The space of possible histories is really big. If we invoke 10 operations and none of them return OK, any subset of those operations might have taken place. So first we have to take the power set for any incomplete operations: that’s 2^n. Then for each of those subsets we have to compute every possible interleaving of operations. If every operation’s invocation and completion overlap, we construct a full permutation. That’s m!.

“Did you just tell me to go fuck myself?”

“I believe I did, Bob.”
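
To put rough numbers on that blowup, here is a throwaway counter (my own helper, not part of Knossos): for n wholly-concurrent, incomplete invocations, we sum, over every subset size k, the (n choose k) * k! = n!/(n-k)! orderings of that subset.

(defn candidate-count
  "How many candidate sequential histories exist for n wholly-concurrent,
  incomplete invocations: every subset of them, in every order."
  [n]
  (let [fact #(reduce *' 1 (range 1 (inc %)))]
    (reduce +' (map (fn [k] (/ (fact n) (fact (- n k))))
                    (range (inc n))))))

user=> (map candidate-count [3 4 10])
(16 65 9864101)

Those 16 and 9864101 figures will turn up again below.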

My initial approach was to construct a radix tree of all possible histories (or, equivalently, a transition graph), and try to exploit degeneracy to prune the state space. Much of the literature on linearizability generates the full set of sequential histories and tests each one separately. Microsoft Research’s PARAGLIDER, in the absence of known linearization points, relies on this brute-force approach using the SPIN model checker.

A straightforward way to automatically check whether a concurrent history has a corresponding linearization is to simply try all possible permutations of the concurrent history until we either find a linearization and stop, or fail to find one and report that the concurrent history is not linearizable. We refer to this approach as Automatic Linearization… Despite its inherent complexity costs, we do use this method for checking concurrent histories of small length (e.g. less than 20). In practice, the space used for concurrent algorithms is typically small because incorrect algorithms often exhibit an incorrect concurrent history which is almost sequential.

In my experiments, enumerating all interleavings and testing each one started to break down around 12 to 16-operation histories.

Burckhardt, Dern, Musuvathi, and Tan wrote Line-Up, which verifies working C# algorithms by enumerating all thread interleavings through the CHESS model checker. This limits Line-Up to verifying only algorithms with a small state space–though this tradeoff allows them to do some very cool things around blocking and data races.

Two papers I know of attempt to reduce the search space itself. Golab, Li, and Shah developed a wicked-smart online checker using Gibbons and Korach’s algorithm and dynamic programming, but GK applies only to registers; I’d like to be able to test sets, queues, and other, more complex datatypes. Yang Liu, et al use both state space symmetry and commutative operations to drastically prune the search space for their linearizability analysis, using the PAT model checker.

I haven’t built symmetry reduction yet, but I do have a different trick: pruning the search space incrementally, as we move through the history, by using the model itself. This is a lot more complex than simply enumerating all possible interleavings–but if we can reject a branch early in the history, it saves huge amounts of work later in the search. The goal is to keep the number of possible worlds bounded by the concurrency of the history, not the length of the history.

So let’s do something paradoxical. Let’s make the problem even harder by multiplying the state space by N. Given a history of four invocations

[a b c d]

Let’s consider the N histories

[]
[a]
[a b]
[a b c]
[a b c d]

[] is trivially linearizable; nothing happens. [a] has two possible states: in one timeline, a completes. In another, a does not complete–remember, calls can fail. Assuming [a] passes the model, both are valid linearizations.

For the history [a b] we have five options. Neither a nor b can occur, one or the other could occur, or both could occur, in either order.

[]
[a]
[b]
[a b]
[b a]

Let’s say the register is initially nil, a is “write 5”, and b is “read 5”. [b] can’t take place on its own because we can’t read nil and get 5, and [b a] is invalid for the same reason. So we test five possibilities and find three linearizations. Not too bad, but we’re starting to see a hint of that n! explosion. By the third juncture we’ll have 16 sequential histories to test:

user=> (mapcat permutations (subsets ['a 'b 'c]))
([]
 (a) (b) (c)
 (a b) (b a) (a c) (c a) (b c) (c b)
 (a b c) (a c b) (b a c) (b c a) (c a b) (c b a))
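
As a quick sanity check, the Register model really does reject the [b a] ordering from a moment ago; with an initially nil register, reading 5 before writing it throws:

(reduce step
        (Register. nil)
        [{:process :c1, :type :ok, :f :read,  :value 5}    ; b: read 5
         {:process :c2, :type :ok, :f :write, :value 5}])  ; a: write 5
; throws RuntimeException: there's no way to read 5 from an empty register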

And by the time we have to work with 10 operations concurrently, we’ll be facing 9864101 possible histories to test; it’ll take several minutes to test that many. But here’s the key: only some of those histories are even reachable, and we already have a clue as to which.

The 3-operation histories will include some histories which we already tested. [b a c], for instance, begins with [b a]; so if we already tested [b a] and found it impossible, we don’t even have to test [b a c] at all. The same goes for [b c a]–and every history, of any length, which begins with b.

So instead of testing all six 3-operation histories, we only have to test four. If the model rejects some of those, we can use those prefixes to reject longer histories, and so on. This dramatically cuts the state space, allowing us to test much longer histories in reasonable time.

Knossos uses Clojure’s shared-state immutable data structures to implement this search efficiently. We reduce over the history in order, maintaining a set of possible worlds. Every invocation bifurcates the set of worlds into those in which the operation happens immediately, and those in which it is deferred for later. Every completion prunes the set of worlds to only those in which the given operation completed. We can then ask Knossos to produce a set of worlds–linearized histories and the resulting states of their models–which are consistent with a given concurrent history.
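
Here’s a deliberately naive sketch of that bookkeeping, just to make the shape concrete. This is not the actual knossos.core implementation: it ignores info operations and unmatched invocations, and it glosses over folding a completion’s observed value back into its invocation. But it shows the bifurcate-on-invoke, prune-on-complete structure.

(defn linearize-some
  "All worlds reachable from `world` by linearizing zero or more of its
  pending operations, discarding any order the model rejects. Assumes, like
  the Register above, that invalid operations throw."
  [world]
  (cons world
        (mapcat (fn [op]
                  (let [model' (try (step (:model world) op)
                                    (catch RuntimeException _ nil))]
                    (when model'
                      (linearize-some {:model   model'
                                       :pending (disj (:pending world) op)}))))
                (:pending world))))

(defn step-worlds
  "Advance a set of worlds by one history element."
  [worlds op]
  (case (:type op)
    ; An invocation might linearize now or later; just note it as pending.
    :invoke (map #(update-in % [:pending] conj op) worlds)
    ; A completion prunes the worlds to those in which the corresponding
    ; invocation can be linearized by this point in the history.
    :ok     (->> worlds
                 (mapcat linearize-some)
                 (remove (fn [w]
                           (some #(= (:process %) (:process op))
                                 (:pending w))))
                 distinct)
    ; Anything else (e.g. :info) leaves the worlds alone in this sketch.
    worlds))

(defn naive-linearizable?
  "Does any world survive the whole history?"
  [model history]
  (seq (reduce step-worlds
               [{:model model, :pending #{}}]
               history)))

The real checker is considerably more careful: it has to match completions back to their invocations, cope with processes that never return, and avoid re-exploring equivalent worlds. But the skeleton is the same.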

Testing a model

Now let’s apply that linearizability checker to a particular system. We could measure a system experimentally, like I’ve done with Jepsen, or we could generate histories based on a formal model of a system. As an example, let’s test the model suggested by Antirez, describing a linearizable system built on top of Redis, WAIT, and a magical coordinator. As I described earlier, this model is physically impossible; it can not be built because the coordinator would need to violate the laws of physics. But let’s pretend we live on Planet Terah, and see whether the system is actually sound.

Antirez writes:

There are five nodes, using master-slave replication. When we start A is the master.

The nodes are capable of synchronous replication, when a client writes, it gets as relpy the number or replicas that received the write. A client can consider a write accepted only when “3” or more is returned, otherwise the result is not determined (false negatives are possbile).

Every node has a serial number, called the replication offset. It is always incremented as the replication stream is processed by a replica. Replicas are capable of telling an external entity, called “the controller”, what is the replication offset processed so far.

At some point, the controller, dictates that the current master is down, and decides that a failover is needed, so the master is switched to another one, in this way:

  1. The controller completely partition away the current master.
  2. The controller selects, out of a majority of replicas that are still available, the one with the higher replication offset.
  3. The controller tells all the reachable slaves what is the new master: the slaves start to get new data from the new master.
  4. The controller finally reconfigure all the clients to write to the new master.

So everything starts again. We assume that a re-appearing master, or other slaves that are again available after partitions heal, are capable of understand what the new master is. However both the old master and the slaves can’t accept writes. Slaves are read-only, while the re-apprearing master will not be able to return the majority on writes, so the client will not be able to consider the writes accepted.

In this model, it is possible to reach linearizability? I believe, yes, because we removed all the hard part, for which the strong protocols like Raft use epochs.

If you’ve spotted some of the problems in this approach, good work! But let’s say there were no obvious problems, and we weren’t sure how to find some. To do this, we’ll need a description of the system which is unambiguous and complete. Something a computer can understand.

First off, let’s describe a node:

(defn node
  "A node consists of a register, a primary it replicates from, whether it is
  isolated from all other nodes, a local replication offset, and a map of node
  names to known replication offsets."
  [name]
  {:name     name
   :register nil
   :primary  nil
   :isolated false
   :offset   0
   :offsets  {}})

Seems straightforward enough. This is a really simple model of a Redis server–one which only has a single register to read and write. We could extend it with more complex types, like lists and sets, but we’re trying to keep things simple. Notice how things like “Each node has a serial number, called the replication offset” have been translated into a field in a structure. We’ve also encoded things which were implicit in the proposal, like the fact that the WAIT command relies on the node knowing the replication offsets of its peers.

Remember, in proofs we try to deal as much as possible with immutable, pure systems; Clojure, Erlang, ML, and Haskell all lend themselves naturally to this approach. If you’re writing your checker in something like Ruby or Java, try to write immutable code anyway. It may be a bit unnatural, but it’ll really simplify things later.
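
For instance, every update to one of these node maps hands back a new map and leaves the original untouched:

(def n (node :n1))
(get-in (assoc-in n [:offsets :n2] 3) [:offsets :n2]) ; => 3
(:offsets n)                                          ; => {}, n itself is unchanged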

(defn client
  "A client is a singlethreaded process which can, at any time, have at most
  one request in-flight to the cluster. It has a primary that it uses for reads
  and writes, and an in-flight request.
  
  Clients can be waiting for a response, in which case :waiting will be the
  replication offset from the primary they're awaiting. :writing is the value
  they're writing, if conducting a write."
  [name]
  {:name    name
   :node    nil
   :writing nil
   :waiting nil})

We’ll also need a coordinator. This one’s simple:

(defn coordinator
  "A controller is an FSM which manages the election process for nodes. It
  comprises a state (the phase of the election cycle it's in), and the current
  primary."
  [primary]
  {:state       :normal
   :primary     primary})

Next we’ll put all these pieces together into a full system. Phrases like “When we start A is the master,” are translated into code which picks the first node as the primary, and code which ensures that primary state is reflected by the other nodes and the coordinator.

(defn system
  "A system is comprised of a collection of nodes, a collection of clients, and
  a coordinator; plus a *history*, which is the set of operations we're
  verifying is linearizable."
  []
  (let [node-names [:n1 :n2 :n3]
        nodes      (->> node-names
                        (map node)
                        ; Fill in offset maps
                        (map (fn [node]
                               (->> node-names
                                    (remove #{(:name node)})
                                    (reduce #(assoc %1 %2 0) {})
                                    (assoc node :offsets)))))
        ; Initial primary/secondary state
        [primary & secondaries] nodes
        nodes (cons primary
                    (map #(assoc % :primary (:name primary))
                         secondaries))

        ; Construct a map of node names to nodes
        nodes (->> nodes
                   (map (juxt :name identity))
                   (into {}))

        ; Construct clients
        clients (->> [:c1 :c2]
                     (map client)
                     (map #(assoc % :node (:name primary)))
                     (map (juxt :name identity))
                     (into {}))]
    {:coordinator (coordinator (:name primary))
     :clients     clients
     :nodes       nodes
     :history     []}))

Note that we’ve introduced, for any given state of the system, the history of operations which brought us to this point. This is the same history that we’ll be evaluating using our linearizability checker.

This formally describes the state of the model. Now we need to enumerate the state transitions which bring the system from one state to another.

State transitions

First, we need a model of Redis reads and writes. Writes have two phases: an invocation and a response to the client–implemented with WAIT.

(def write-state (atom 0))

(defn client-write
  "A client can send a write operation to a node."
  [system]
  (->> system
       clients
       (filter free-client?)
       (filter (partial valid-client? system))
       (map (fn [client]
              (let [; Pick a value to write
                    value     (swap! write-state inc)
                    ; Find the node name for this client
                    node      (:node client)
                    ; And the new offset.
                    offset    (inc (get-in system [:nodes node :offset]))]
                (-> system
                    (assoc-in [:nodes node :register]            value)
                    (assoc-in [:nodes node :offset]              offset)
                    (assoc-in [:clients (:name client) :waiting] offset)
                    (assoc-in [:clients (:name client) :writing] value)
                    (log (invoke-op (:name client) :write value))))))))

client-write is a function which takes a system and returns a sequence of possible systems, each of which corresponds to one client initiating a write to its primary. We encode multiple constraints here:

  1. Clients can only initiate a write when they are not waiting for another response–i.e. clients are singlethreaded.
  2. Clients must be connected to a node which is not isolated and thinks that it is a primary. Note that this assumes a false linearization point: in the real world, these checks are not guaranteed to be instantaneous. We are being overly generous to simplify the model.

For each of these clients, we generate a unique number to write using (swap! write-state inc), set the primary’s register to that value, increment the primary’s offset, and update the client–it is now waiting for that particular replication offset to be acknowledged by a majority of nodes. We also keep track of the value we wrote, just so we can fill it into the history later.

Finally, we update the history of the system, adding an invocation of a write, from this client, for the particular value.

When a client’s primary determines that a majority of nodes have acknowledged the offset that the client is waiting for, we can complete the write operation.

(defn client-write-complete
  "A reachable primary node can inform a client that its desired replication
  offset has been reached."
  [system]
  (->> system
       clients
       (remove free-client?)
       (filter (partial valid-client? system))
       (keep (fn [client]
               (let [offset (-> system
                                :nodes
                                (get (:node client))
                                majority-acked-offset)]
                 (when (<= (:waiting client) offset)
                   (-> system
                       (assoc-in [:clients (:name client) :waiting] nil)
                       (assoc-in [:clients (:name client) :writing] nil)
                       (log (ok-op (:name client)
                                   :write
                                   (:writing client))))))))))

Again, note the constraints: we can’t always complete writes. Only when the client is waiting, and the client is connected to a valid non-isolated primary, and the replication offset is acked by a majority of nodes, can these transitions take place.

keep is a Clojure function analogous to map and filter combined: only non-nil results appear in the output sequence. We use keep here to compactly express that only clients which have satisfied the majority offset acknowledgement constraint are eligible for completion.
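
For example:

user=> (keep (fn [x] (when (odd? x) (* x x))) [1 2 3 4 5])
(1 9 25)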

Reads are similar to writes, but we make another generous allowance: reads are assumed to be a linearization point of the model, and therefore take place instantaneously. We add both invocation and completion operations to the log in one step.

(defn client-read
  "A client can read a value from its node, if primary and reachable. Reads are
  instantaneous."
  [system]
  (->> system
       clients
       (filter free-client?)
       (filter (partial valid-client? system))
       (map (fn [client]
              (let [node    (:node client)
                    value   (get-in system [:nodes node :register])]
                (-> system
                    (log (invoke-op (:name client) :read nil))
                    (log (ok-op (:name client) :read value))))))))

Replication

Redis replication is asynchronous: in one phase a replica copies data from the primary, and in another, it informs the primary of its replication offset. We assume that each phase takes place instantaneously. Is replication actually a linearization point in Redis? I don’t know–but we’ll be generous again.

(defn replicate-from-primary
  "A node can copy the state of its current primary, if the primary is
  reachable."
  [system]
  (->> system
       nodes
       (remove :isolated)
       (keep (fn [node]
               (when-let [primary (get-node system (:primary node))]
                 (when-not (:isolated primary)
                   (-> system
                       (assoc-in [:nodes (:name node) :register]
                                 (:register primary))
                       (assoc-in [:nodes (:name node) :offset]
                                 (:offset primary))
                       (log (op (:name node)
                                :info
                                :replicate-from-primary
                                (:primary node))))))))))

Pretty straightforward: each node can, if it has a primary and neither is isolated, copy its register state and offset. We’ll be generous and assume the primary’s total oplog is applied instantly and atomically.

The acknowledgement process is basically the reverse: we update the offset cache in the primary, so long as nodes are connected.

(defn ack-offset-to-primary
  "A node can inform its current primary of its offset, if the primary is
  reachable."
  [system]
  (->> system
       nodes
       (remove :isolated)
       (keep (fn [node]
               (when-let [primary (get-node system (:primary node))]
                 (when-not (:isolated primary)
                   (-> system
                       (assoc-in [:nodes
                                  (:primary node)
                                  :offsets
                                  (:name node)]
                                 (:offset node))
                       (log (op (:name node)
                                :info
                                :ack-offset-to-primary
                                (:primary node))))))))))

Failover

The failover process comprises four functions, one for each of the four steps in the algorithm. We keep them in order by requiring that each transition can only take place once the coordinator has completed the previous step.

1) The controller completely partition away the current master.

(defn failover-1-isolate
  "If the coordinator is in normal mode, initiates failover by isolating the
  current primary."
  [system]
  (let [coord (:coordinator system)]
    (when (= :normal (:state coord))
      (-> system
          (assoc-in [:coordinator :state]               :isolated)
          (assoc-in [:coordinator :primary]             nil)
          (assoc-in [:nodes (:primary coord) :isolated] true)
          (log (op :coord :info :failover-1-isolate (:primary coord)))))))

Notice how we formalized the English statement by encoding properties of the network throughout the model: each state transition checks the partitioned state of the nodes involved. This is an oversimplification of the real system, because this part of the algorithm is impossible in an asynchronous network: it modifies the current primary’s state directly instead of sending it a message. We’re being generous by assuming the network propagates messages instantly; a more thorough model would explicitly model the loss and delay of messages.
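
For instance, a more explicit model might carry a set of in-flight messages and make delivery its own nondeterministic transition. This is purely a hypothetical sketch: neither the :network key nor any of these functions exist in the knossos.redis model.

(defn handle-message
  "Stub. A real model would dispatch on (:body msg) and apply its effect."
  [system msg]
  system)

(defn send-message
  "Queue a message for later delivery, without delivering it."
  [system from to body]
  (update-in system [:network] (fnil conj #{}) {:from from, :to to, :body body}))

(defn deliver-message
  "All systems reachable by delivering exactly one in-flight message. Message
  loss comes for free: a lost message is simply one we never choose to deliver."
  [system]
  (for [msg (:network system)]
    (-> system
        (update-in [:network] disj msg)
        (handle-message msg))))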

2) The controller selects, out of a majority of replicas that are still available, the one with the higher replication offset.

Again, translation is straightforward; in the model we can freely violate the laws of physics.

(defn failover-2-select
  "If the coordinator has isolated the old primary, selects a new primary by
  choosing the reachable node with the highest offset."
  [system]
  (let [coord (:coordinator system)]
    (when (= :isolated (:state coord))
      (let [candidates (->> system nodes (remove :isolated))]
        ; Gotta reach a majority
        (when (<= (inc (Math/floor (/ (count (nodes system)) 2)))
                  (count candidates))
          (let [primary (:name (apply max-key :offset candidates))]
            (-> system
                (assoc-in [:coordinator :state] :selected)
                (assoc-in [:coordinator :primary] primary)
                (log (op :coord :info :failover-2-select primary)))))))))

3) The controller tells all the reachable slaves what is the new master: the slaves start to get new data from the new master.

You know the drill. We create a false point of linearization and assume this broadcast is atomic.

(defn failover-3-inform-nodes
  "If the coordinator has selected a new primary, broadcasts that primary to
  all reachable nodes."
  [system]
  (let [coord   (:coordinator system)
        primary (:primary coord)]
    (when (= :selected (:state coord))
      (-> system
          (assoc-in [:coordinator :state] :informed-nodes)
          (assoc :nodes (->> system
                             :nodes
                             (map (fn [ [name node] ]
                                    [name
                                     (cond
                                       ; If the node is isolated, state is
                                       ; unchanged.
                                       (:isolated node)
                                       node

                                       ; If this is the new primary node, make
                                       ; it a primary.
                                       (= primary name)
                                       (assoc node :primary nil)

                                       ; Otherwise, set the primary.
                                       :else
                                       (assoc node :primary primary))]))
                             (into {})))
          (log (op :coord :info :failover-3-inform-nodes primary))))))

4) The controller finally reconfigure all the clients to write to the new master.

Here too!

(defn failover-4-inform-clients
  "If the coordinator has informed all nodes of the new primary, update all
  client primaries."
  [system]
  (let [coord   (:coordinator system)
        primary (:primary coord)]
    (when (= :informed-nodes (:state coord))
      (-> system
          (assoc-in [:coordinator :state] :normal)
          (assoc :clients (->> system
                               :clients
                               (map (fn [ [name client] ]
                                      [name
                                       (assoc client :node primary)]))
                               (into {})))
          (log (op :coord :info :failover-4-inform-clients primary))))))

At each step there is exactly one failover transition that can happen–since the coordinator is magically sequential and never fails.

(defn failover
  "All four failover stages combined."
  [system]
  (when-let [system' (or (failover-1-isolate        system)
                         (failover-2-select         system)
                         (failover-3-inform-nodes   system)
                         (failover-4-inform-clients system))]
    (list system')))

Putting it all together

We assume that a re-appearing master, or other slaves that are again available after partitions heal, are capable of understand what the new master is.

I struggled with this, and I actually don’t know how to interpret this part of the proposal. Erring on the side of safety, let’s omit any resurrection of isolated nodes. Once a node fails, it stays dead forever. If you let them come back, things get much more dangerous.
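
For the record, modeling resurrection would only take one more transition: something like this hypothetical sketch (not part of the model we test below; the :rejoin op name is my own) would let an isolated node come back with its possibly divergent state intact, which only makes bad histories easier to produce.

(defn node-rejoin
  "All systems reachable by un-isolating one isolated node. The node keeps
  its (possibly divergent) register and offset."
  [system]
  (->> system
       nodes
       (filter :isolated)
       (map (fn [node]
              (-> system
                  (assoc-in [:nodes (:name node) :isolated] false)
                  (log (op (:name node) :info :rejoin nil)))))))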

Only one last part remains: we need to express, in a single function, every allowable state transition.

(defn step
  "All systems reachable in a single step from a given system."
  [system]
  (concat (client-write           system)
          (client-write-complete  system)
          (client-read            system)
          (replicate-from-primary system)
          (ack-offset-to-primary  system)
          (failover               system)))

OK. So now we can evolve any particular state of the system in various directions. Let’s take a look at a basic system:

user=> (use 'knossos.redis)
nil
user=> (-> (system) pprint)
{:coordinator {:state :normal, :primary :n1},
 :clients
 {:c1 {:name :c1, :node :n1, :writing nil, :waiting nil},
  :c2 {:name :c2, :node :n1, :writing nil, :waiting nil}},
 :nodes
 {:n1
  {:name :n1,
   :register nil,
   :primary nil,
   :isolated false,
   :offset 0,
   :offsets {:n3 0, :n2 0}},
  :n2
  {:name :n2,
   :register nil,
   :primary :n1,
   :isolated false,
   :offset 0,
   :offsets {:n3 0, :n1 0}},
  :n3
  {:name :n3,
   :register nil,
   :primary :n1,
   :isolated false,
   :offset 0,
   :offsets {:n2 0, :n1 0}}},
 :history []}

What happens if we do a write?

user=> (-> (system) client-write rand-nth pprint)
{:coordinator {:state :normal, :primary :n1},
 :clients
 {:c1 {:name :c1, :node :n1, :writing nil, :waiting nil},
  :c2 {:name :c2, :node :n1, :writing 10, :waiting 1}},
 :nodes
 {:n1
  {:name :n1,
   :register 10,
   :primary nil,
   :isolated false,
   :offset 1,
   :offsets {:n3 0, :n2 0}},
  :n2
  {:name :n2,
   :register nil,
   :primary :n1,
   :isolated false,
   :offset 0,
   :offsets {:n3 0, :n1 0}},
  :n3
  {:name :n3,
   :register nil,
   :primary :n1,
   :isolated false,
   :offset 0,
   :offsets {:n2 0, :n1 0}}},
 :history [{:process :c2, :type :invoke, :f :write, :value 10}]}

Notice that client-write returns two systems: one in which :c1 writes, and one in which :c2 writes. We pick a random possibility using rand-nth. In this case, :c2 wrote the number 10 to :n1, and is waiting for replication offset 1 to be acknowledged. :n1, but not :n2 or :n3, has received the write. Note the history of this system reflects the invocation, but not the completion, of this write.
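
We can check that directly:

user=> (count (client-write (system)))
2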

Let’s try to complete the write:

user=> (-> (system) client-write rand-nth client-write-complete pprint)
()

There are no possible worlds where the write can complete at this point. Why? Because no replica has yet acknowledged the primary’s new replication offset, so there’s no majority. This is the whole point of Redis WAIT: we can’t consider a write complete until it’s been acknowledged.

user=> (-> (system)
           client-write rand-nth
           replicate-from-primary first
           ack-offset-to-primary first
           client-write-complete
           pprint)
({:coordinator {:state :normal, :primary :n1},
  :clients
  {:c1 {:name :c1, :node :n1, :writing nil, :waiting nil},
   :c2 {:name :c2, :node :n1, :writing nil, :waiting nil}},
  :nodes
  {:n1
   {:name :n1,
    :register 15,
    :primary nil,
    :isolated false,
    :offset 1,
    :offsets {:n3 0, :n2 1}},
   :n2
   {:name :n2,
    :register 15,
    :primary :n1,
    :isolated false,
    :offset 1,
    :offsets {:n3 0, :n1 0}},
   :n3
   {:name :n3,
    :register nil,
    :primary :n1,
    :isolated false,
    :offset 0,
    :offsets {:n2 0, :n1 0}}},
  :history
  [{:process :c1, :type :invoke, :f :write, :value 15}
   {:process :n2, :type :info, :f :replicate-from-primary, :value :n1}
   {:process :n2, :type :info, :f :ack-offset-to-primary, :value :n1}
   {:process :c1, :type :ok, :f :write, :value 15}]})

A successful write! The value 15 has been replicated to both :n1 and :n2, and with the offset map on :n1 updated, the WAIT request for the client can complete. The history reflects the invocation and completion of :c1’s write request.

Having written down the state of the system, and encoded all possible state transitions in the step function, we can find random trajectories through the system by interleaving calls to step and rand-nth. Because we don’t allow the resurrection of nodes, this system can simply halt, unable to make progress. In that case, we simply return the terminal state.

(defn trajectory
  "Returns a system from a randomized trajectory, `depth` steps away from the
  given system."
  [system depth]
  (if (zero? depth)
    system
    (let [possibilities (step system)]
      (if (empty? possibilities)
        ; Dead end
        system
        ; Descend
        (recur (rand-nth possibilities)
               (dec depth))))))

Because our trajectory evolution is randomized, the histories it generates will often contain extraneous garbage–repeated sequences of identical reads, for instance, or replicating the same state over and over again. We could go back and re-explore the state space, omitting certain transitions in search of a simpler trajectory–but for now, we’ll take the random trajectories.
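
If we wanted that, a minimal version could look like this sketch (not part of Knossos): re-run the same random walk, but only over a chosen subset of the transition functions, and see whether a nonlinearizable history still turns up.

(defn restricted-trajectory
  "Like trajectory, but only explores the given transition functions, e.g.
  [client-write client-write-complete client-read failover]."
  [system depth transitions]
  (if (zero? depth)
    system
    (let [possibilities (mapcat #(% system) transitions)]
      (if (empty? possibilities)
        ; Dead end
        system
        ; Descend
        (recur (rand-nth possibilities) (dec depth) transitions)))))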

Model checking

We’ve built a simple model of a single-threaded linearizable register, a concurrent model of a hypothetical Redis system, and a verifier which tests that a history is linearizable with respect to a singlethreaded model. Now let’s combine these three elements.

First, a way to show the system that we wound up in, and the history that led us there. We’ll use linearizable-prefix to find the longest prefix of the history that is still linearizable–that’ll help show where, exactly, we ran out of options.

(defn print-system
  [system history]
  (let [linearizable (linearizable-prefix (->Register nil) history)]
    (locking *out*
      (println "\n\n### No linearizable history for system ###\n")
      (pprint (dissoc system :history))
      (println "\nHistory:\n")
      (pprint linearizable)
      (println "\nUnable to linearize past this point!\n")
      (pprint (drop (count linearizable) history)))))

Then we’ll generate a bunch of trajectories of, say, 15 steps apiece, and show any which have nonlinearizable histories.

(deftest redis-test
  (dothreads [i 4] ; hi haters
   (dotimes [i 10000]
    (let [system (trajectory (system) 15)]
     ; Is this system linearizable?
     (let [history (complete (:history system))
           linears (linearizations (->Register nil) history)]
      (when (empty? linears)
        (print-system system history))
      (is (not (empty? linears)))))))

And we’re ready to go. Is the model Antirez proposed linearizable?

$ lein test knossos.redis-test

### No linearizable history for system ###

{:coordinator {:state :normal, :primary :n2},
 :clients
 {:c1 {:name :c1, :node :n2, :writing nil, :waiting nil},
  :c2 {:name :c2, :node :n2, :writing 9, :waiting 2}},
 :nodes
 {:n1
  {:name :n1,
   :register 9,
   :primary nil,
   :isolated true,
   :offset 2,
   :offsets {:n3 0, :n2 1}},
  :n2
  {:name :n2,
   :register 5,
   :primary nil,
   :isolated false,
   :offset 1,
   :offsets {:n3 0, :n1 0}},
  :n3
  {:name :n3,
   :register nil,
   :primary :n2,
   :isolated false,
   :offset 0,
   :offsets {:n2 0, :n1 0}}}}

History:

[{:process :c2, :type :invoke, :f :write, :value 5}
 {:process :n2, :type :info, :f :replicate-from-primary, :value :n1}
 {:process :n2, :type :info, :f :ack-offset-to-primary, :value :n1}
 {:process :c2, :type :ok, :f :write, :value 5}
 {:process :n2, :type :info, :f :replicate-from-primary, :value :n1}
 {:process :c2, :type :invoke, :f :write, :value 9}
 {:process :n3, :type :info, :f :ack-offset-to-primary, :value :n1}
 {:process :c1, :type :invoke, :f :read, :value 9}
 {:process :c1, :type :ok, :f :read, :value 9}
 {:process :coord, :type :info, :f :failover-1-isolate, :value :n1}
 {:process :coord, :type :info, :f :failover-2-select, :value :n2}
 {:process :coord,
  :type :info,
  :f :failover-3-inform-nodes,
  :value :n2}
 {:process :coord,
  :type :info,
  :f :failover-4-inform-clients,
  :value :n2}
 {:process :n3, :type :info, :f :ack-offset-to-primary, :value :n2}
 {:process :n3, :type :info, :f :ack-offset-to-primary, :value :n2}
 {:process :c1, :type :invoke, :f :read, :value 5}]

Unable to linearize past this point!

({:process :c1, :type :ok, :f :read, :value 5})

lein test :only knossos.redis-test/redis-test

FAIL in (redis-test) (redis_test.clj:44)
expected: (not (empty? linears))
  actual: (not (not true))

Ran 1 tests containing 38340 assertions.
6 failures, 0 errors.

No, it isn’t.

What happened here?

Knossos generated a state of the system which it believes is possible, under the rules of the Redis model we constructed, but not linearizable with respect to the register model. Up until the final read, Knossos could still construct a world where things made sense, but that last read was inconsistent with every possible interpretation. Why?

Well, in the final state, n1 is isolated with value 9 at offset 2, n2 is a primary with value 5 at offset 1, and n3 thinks the value is nil. n3’s offset is 0; it never participated in this history, so we can ignore it.

  1. First, c2 writes 5 to n1. n2 replicates and acknowledges the write of 5, and c2’s write completes.
  2. n2 initiates a (noop) replication from n1.
  3. c2 initiates a write of 9 to n1. c1 concurrently initiates a read from n1, which will see 9.
  4. n2 completes its replication; state is unchanged.
  5. Then, a failover occurs. The coordinator performs all four steps atomically, so no concurrency questions there. n2 is selected as the new primary and n1 is isolated.
  6. n3 acknowledges its offset of 0 to n2 twice; both are noops, since n2 already thinks n3’s offset is 0.
  7. Finally, c1 invokes a read from n2 and sees 5. This is the read which proves the system is inconsistent. Up until this point the history has been linearizable–we could have assumed, for instance, that the write of 9 failed and the register has always been 5, but that assumption was invalidated by the successful read of 9 by c1 earlier. We also could have assumed that the final read of 5 failed–but when it succeeded, Knossos ran out of options.

This case demonstrates that reads are a critical aspect of linearizability. Redis WAIT is not transactional. It allows clients to read unreplicated state from the primary node, which is just as invalid as reading stale data from secondaries.

I hope this illustrates beyond any shred of doubt: not only is Antirez’s proposal physically impossible, but even wildly optimistic formal interpretations of his proposal are trivially non-linearizable.

Yeah, this is Fear, Uncertainty, and Doubt. You should be uncertain about algorithms without proofs. You should doubt a distributed system without a formal model. You should be fearful that said system will not live up to its claims.

Now you’ve got another tool to validate that uncertainty for yourself.

Math is hard; let’s go model checking

Proving linearizability is hard. Much harder than proving a system is not linearizable, when you get down to it. All I had to do here was find a single counterexample; but proving linearizability requires showing that every history is valid. Traditionally one does this by identifying all the linearization points of an algorithm–the points where things take place atomically–which is a subtle and complex process, especially where the linearization point depends on runtime behavior or lies outside the code itself.

Moreover, I am not that great at proofs–and I don’t want to exclude readers who don’t have the benefit of formal training. I want to equip ordinary programmers with the motivation and tools to reason about their systems–and for that, model checking is a terrific compromise.

There are many tools available for formal modeling of concurrency. Leslie Lamport’s TLA+ is the canonical tool for concurrency proofs, but its learning curve is steep to say the least and I have a lot of trouble trying to compose its models. Bell Labs’ Spin is more accessible for programmers, encoding its models in a language called Promela. Spin has excellent tooling–it can even extract models from C code with assistance. There’s also Erigone, a reimplementation of Spin, and the aforementioned Line-Up for C#.

Knossos is a dead-simple verification system I hacked out in a week; it takes advantage of Clojure’s data-structure literals, immutable shared-state data structures, and concise syntax to make designing models and checking their linearizability easier. Knossos probably has some bugs, so be sure to check the failure cases by hand!

No matter which model checker you use, all of these systems let you formalize your algorithms by writing them down in a concise, unambiguous form–either a modeling language or a full programming language–and then verify that those models conform to certain invariants by exploring the state space. Paired with a proof assistant, some of these tools can go further and prove that the invariants always hold, rather than merely searching for cases where they fail.

We verified a toy system in this blog post, but all the key elements are there. State, transition functions, invariants, and a model to verify against. We use hierarchical data structures and functions to break up the model into smaller, more manageable pieces. We generated counterexamples from probabilistic trajectories through the model.

Real models look just like this. Take a look at the model and proof sketch of the RAFT consensus algorithm, and see if you can spot the definitions of state, transitions, and invariants. Note that this isn’t a full proof–more like a sketch–and it relies on some propositions like type safety which are not mechanically verified, but this paper illustrates both formal and English proof techniques nicely.

This is the kind of argument you need to make, as a database engineer, before asserting a given system is linearizable. Formal verification will catch both obvious and subtle bugs before you, or your readers, try to implement them.

Ross B. on

Very interesting post. I’m still reading but may I please request clarification on something. You wrote:

“Microsoft Research’s PARAGLIDER, in the absence of known linearization points, relies on this brute-force approach using the SPIN model checker. …[SNIP]… In my experiments, this approach started to break down around 12 to 16-operation histories.”

When you say “in my experiments” do you mean that you actually tried SPIN, or that you experimented with a “brute-force” approach?

SPIN applies a number of rather clever mathematical transforms to reduce the search space; it therefore can hardly be described as a “brute force” approach. Perhaps you mean “provably exhaustive”? So again: did you try SPIN, or just a naive brute force search?

Thanks.

Aphyr on

I mean that I tried several approaches, including one similar to the PARAGLIDER paper. I abandoned the brute-force exploration of the state space because I hit similar limits as the PARAGLIDER paper. From page 8:

A straightforward way to automatically check whether a concurrent history has a corresponding linearization is to simply try all possible permutations of the concurrent history until we either find a linearization and stop, or fail to find one and report that the concurrent history is not linearizable. We refer to this approach as Automatic Linearization. While this approach is conceptually simple, its worst-case time and space complexity is exponential in the length of the concurrent history.

I call that the “brute force” approach–though SPIN does do all sorts of optimizations internally.

I don’t know the details of their use of SPIN, but I can tell you that where PARAGLIDER tops out at 20 operations for automatic extraction of linearization points, Knossos can check a 6-thread, 600-operation register history in a few seconds, and that’s without any symmetry reduction or commutative history profiling. I could be misinterpreting the paper though!

Aphyr
Ross B. on

@Aphyr Thanks for the clarification.

By the way, I completely agree with this: “This is the kind of argument you need to make, as a database engineer, before asserting a given system is linearizable” – it’s true for any type of asynchronous concurrent system really (lock-free algorithms, distributed message passing, etc). There are just too many places for bugs to hide to rely on intuition and common-case modelling. Anyone who doubts the need for some kind of formal method or automated verification should read the first few chapters of “Design and Validation of Computer Protocols” by Gerard J. Holzmann (creator of SPIN) – faulty protocols have a long and colorful history.

antirez

Hello Aphyr,

13 days ago I wrote this in the Redis mailing list, about this model:

About non linearizability, perhaps it does not apply to the case where a strong coordinator exists, but in the general case one issue is that when we read, we can’t just read because a stale master could reply with stale data, breaking linearizability. There is a trick to force the read to be acknowledged that could work:

MULTI INCR somecounter GET data EXEC WAIT

I’m a distributed systems newbie, but I believe it was never a breaking news that in asynchronously replicated systems where the master applies the write as soon as it receives it, you can read non acknowledged data, so indeed this was an obvious issue. The above problem also applies in well-designed strong consistent systems and is well documented in the literature, in general you can’t just read from what you believe is the current master/leader, conceptually reads should follow the same path as writes, even if sometimes there are optimizations that can be used to make reads cheaper.

But regardless of that, this model you already wrote two blog posts about, was never the interesting part of the discussion, it was just an argument about “WAIT is just a low-level primitive, the behavior of the system is the sum of all the parts”.

The original thread was about Redis Cluster, that is a system that does not feature strong consistency, but only heuristics to try to put a bound on how much replicas can diverge, and heuristics about how to select the history that likely has more writes when a failover is performed.

Alex Baranosky on

Hi,

Just throwing this out there as an interesting different way to implement one of your examples, rather than using the keep/when combo… I could easily have messed it up, because I haven’t executed it, but here goes anyway:

(defn client-write-complete
  "A reachable primary node can inform a client that its desired replication
  offset has been reached."
  [system]
  (->> system
       clients
       (remove free-client?)
       (filter (partial valid-client? system))
       (map (juxt identity
                  (fn [client]
                    (-> system :nodes (get (:node client)) majority-acked-offset))))
       (filter (fn [[client offset]]
                 (<= (:waiting client) offset)))
       (map (fn [[client _]]
              (-> system
                  (assoc-in [:clients (:name client) :waiting] nil)
                  (assoc-in [:clients (:name client) :writing] nil)
                  (log (ok-op (:name client) :write (:writing client))))))))

Best, Alex

Baishampayan Ghose

Bravo, Kyle. Awesome work.

anonymous on

Bonus points for juxt. ;-)

Richard Achmatowicz on

Kyle

I’m looking for the examples described in this post (Knossos, Redis and linearizability) concerning Redis but can’t find them in master on the repo for knossos. Are they still available? Are there a complete set of instructions for reproducing the tests outlined in this post?

Richard

Aphyr on

Ah, sorry Richard, that code’s several years old at this point, and both Knossos and Redis have moved on. You can trawl through the git history though.

Aphyr
