I’ve been doing a lot of performance tuning in Riemann recently, especially in the clients–but I’d like to share a particularly spectacular improvement from yesterday.

The Riemann protocol

Riemann’s TCP protocol is really simple. Send a Msg to the server, receive a response Msg. Messages might include some new events for the server, or a query; and a response might include a boolean acknowledgement or a list of events matching the query. The protocol is ordered: messages on a connection are processed in order and responses sent in order. Each Msg is serialized using Protocol Buffers and framed with a four-byte length header: to read a message, you read the header, then read that many bytes and parse them as a Msg.

time --->
send: [length1][msg1]  [length2][msg2]
recv:                     [length1][msg1]  [length2][msg2]
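
Stripped of Netty, the framing is just a few lines of Java. Here’s a minimal sketch, assuming a protoc-generated Proto.Msg class like the one the Riemann clients use; the Framing class and method names are just for this example:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Minimal framing sketch. Proto.Msg stands in for the protoc-generated
// message class; Framing is only a name for this example.
public final class Framing {
  // Write: a four-byte big-endian length header, then the serialized Msg.
  static void writeMsg(DataOutputStream out, Proto.Msg msg) throws IOException {
    byte[] body = msg.toByteArray();
    out.writeInt(body.length);
    out.write(body);
    out.flush();
  }

  // Read: the length header first, then exactly that many bytes, parsed as a Msg.
  static Proto.Msg readMsg(DataInputStream in) throws IOException {
    int length = in.readInt();
    byte[] body = new byte[length];
    in.readFully(body);
    return Proto.Msg.parseFrom(body);
  }
}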

The optimization I discussed last time–pipelining requests–allows a client to send multiple messages before receiving their acknowledgements. There are many queues in between a client saying “send a message” and that message actually being parsed in Riemann: Java IO buffers, the kernel TCP stack, the network card, various pieces of networking hardware, the wires themselves… all act like queues. This means throughput is often limited by latency, so by writing messages asynchronously we can achieve higher throughput with only minor latency costs.
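
Because the protocol is ordered, a pipelined client only needs a FIFO of pending promises: the nth response always answers the nth request. Here’s a rough sketch of that idea (not the actual Riemann client; Proto.Msg and the Framing helper are the same assumptions as above):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of a pipelined client: writes don't wait for acknowledgements.
// The ordered protocol means responses come back in request order.
final class PipelinedClient {
  private final Queue<CompletableFuture<Proto.Msg>> inFlight =
      new ConcurrentLinkedQueue<>();

  // Writer side: enqueue a promise, send the message, return immediately.
  // Synchronized so promise order matches write order.
  synchronized CompletableFuture<Proto.Msg> send(DataOutputStream out, Proto.Msg msg)
      throws IOException {
    CompletableFuture<Proto.Msg> promise = new CompletableFuture<>();
    inFlight.add(promise);
    Framing.writeMsg(out, msg);
    return promise;
  }

  // Reader side: complete promises in FIFO order as responses arrive.
  void readLoop(DataInputStream in) throws IOException {
    while (true) {
      Proto.Msg response = Framing.readMsg(in);
      inFlight.remove().complete(response);
    }
  }
}

A real client would probably also cap the number of in-flight messages, since those queues are exactly where the extra latency accumulates.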

The other optimization I’ve been working on is batching. For various reasons, this kind of protocol performs better when messages are larger. If you can pack 100 events into a message, the server can buffer and parse it in one go, resulting in much higher throughputs at the cost of significantly higher latencies–especially if your event needs to sit in a buffer for a while, waiting for other events to show up so they can be sent in a Msg.
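
Batching is just a buffer plus a flush rule: send when the batch is full, or when a deadline expires so no event waits too long. A sketch, where Proto.Event, the addAllEvents builder call, and the particular batch size and flush interval are illustrative assumptions rather than Riemann’s defaults:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch of a batching client: pack events into one Msg and send it when the
// batch fills up, or when a deadline passes so no event waits forever.
final class Batcher {
  private final int maxBatch;
  private final Consumer<Proto.Msg> sink;  // e.g. the pipelined client above
  private final List<Proto.Event> buffer = new ArrayList<>();
  private final ScheduledExecutorService timer =
      Executors.newSingleThreadScheduledExecutor();

  Batcher(int maxBatch, long flushMillis, Consumer<Proto.Msg> sink) {
    this.maxBatch = maxBatch;
    this.sink = sink;
    // Periodic flush bounds the extra latency a buffered event can pick up.
    timer.scheduleAtFixedRate(this::flush, flushMillis, flushMillis,
                              TimeUnit.MILLISECONDS);
  }

  synchronized void add(Proto.Event e) {
    buffer.add(e);
    if (buffer.size() >= maxBatch) {
      flush();
    }
  }

  synchronized void flush() {
    if (buffer.isEmpty()) return;
    sink.accept(Proto.Msg.newBuilder().addAllEvents(buffer).build());
    buffer.clear();
  }
}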

Netty’s threadpools

For any given connection, Netty (as used in Riemann) has two threadpools handling incoming bytes: the IO worker pool, and a handler pool which actually handles Riemann events. The IO worker pool is busy shuttling bytes back and forth from the TCP connection buffers through the pipeline–but if an IO worker spends too much time on a single channel, it won’t be able to handle other channels and latencies will rise. At some point in the pipeline an ExecutionHandler takes over, handing work off to the handler pool for long-running tasks like processing a Msg.

Earlier versions of Riemann put the ExecutionHandler very close to the end of the pipeline, because all the early operations in the pipeline are really fast. The common advice goes, “Wrap long-running tasks in an execution handler, so they don’t block”. OK, makes sense.

(channel-pipeline-factory
           int32-frame-decoder (int32-frame-decoder) ; Read off 32-bit length headers
  ^:shared int32-frame-encoder (int32-frame-encoder) ; Add length header on the way out
  ^:shared protobuf-decoder    (protobuf-decoder)    ; Decode bytes to a Msg
  ^:shared protobuf-encoder    (protobuf-encoder)    ; Encode a Msg to bytes
  ^:shared msg-decoder         (msg-decoder)         ; Convert Msg to a record
  ^:shared msg-encoder         (msg-encoder)         ; Convert a record to a Msg
  ^:shared executor            (execution-handler)   ; Switch to handler threadpool
  ^:shared handler             (gen-tcp-handler      ; Actually process the Msg
                                 core
                                 channel-group
                                 tcp-handler))

Now… a motivated or prescient reader might ask, “How, exactly, does the execution handler get data from an IO thread over to a handler thread?”

It puts it on a queue. Like every good queue it’s bounded–but not by number of items, since some items could be way bigger than others. It’s bounded by memory.

(defn execution-handler
  "Creates a new netty execution handler."
  []
  (ExecutionHandler.
    (OrderedMemoryAwareThreadPoolExecutor.
      16       ; Core pool size
      1048576  ; 1MB per channel queued
      10485760 ; 10MB total queued
      )))

How does the Executor know how much memory is in a given item? It uses a DefaultObjectSizeEstimator, which knows all about Bytes and Channels and Buffers… but absolutely nothing about the decoded Protobuf objects which it’s being asked to enqueue. So the estimator goes and digs into the item’s fields using reflection:

int answer = 8; // Basic overhead.
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
    Field[] fields = c.getDeclaredFields();
    for (Field f : fields) {
        if ((f.getModifiers() & Modifier.STATIC) != 0) {
            // Ignore static fields.
            continue;
        }
        // Recurse into every instance field's type.
        answer += estimateSize(f.getType(), visitedClasses);
    }
}

Of course, I didn’t know this at the time. Netty is pretty big, and despite extensive documentation it’s not necessarily clear that an OrderedMemoryAwareThreadPoolExecutor is going to try and guess how much memory is in a given object, recursively.

So I’m staring at Yourkit, completely ignorant of everything I’ve just explained, and wondering why the devil DefaultObjectSizeEstimator is taking 38% of Riemann’s CPU time. It takes me ~15 hours of digging through Javadoc and source and blogs and StackOverflow to realize that all I have to do is…

  1. Build my own ObjectSizeEstimator (see the sketch after this list), or
  2. Enqueue things I already know the size of.
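
For the record, option one would look roughly like this against Netty 3’s execution-handler API; the flat 16KB cost is a made-up guess, not a measured number:

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.jboss.netty.util.ObjectSizeEstimator;

// Option 1, roughly: give the executor an estimator that never reflects over
// decoded protobufs. Charging a flat (made-up) cost per message is crude, but
// it keeps the memory bound and skips the expensive guessing.
public final class FlatSizeEstimator implements ObjectSizeEstimator {
  private static final int ASSUMED_MSG_BYTES = 16384; // a guess, not a measurement

  @Override
  public int estimateSize(Object o) {
    return ASSUMED_MSG_BYTES;
  }

  public static ExecutionHandler executionHandler() {
    return new ExecutionHandler(
        new OrderedMemoryAwareThreadPoolExecutor(
            16,                      // core pool size
            1048576,                 // 1MB per channel queued
            10485760,                // 10MB total queued
            30, TimeUnit.SECONDS,    // thread keep-alive
            new FlatSizeEstimator(), // replaces DefaultObjectSizeEstimator
            Executors.defaultThreadFactory()));
  }
}

Option two turned out to be even simpler: move the execution handler earlier in the pipeline, before the protobuf decoder, so it enqueues length-framed buffers whose sizes are already known.
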
(channel-pipeline-factory
         int32-frame-decoder (int32-frame-decoder)
^:shared int32-frame-encoder (int32-frame-encoder)
^:shared executor            (execution-handler) ; <--+
^:shared protobuf-decoder    (protobuf-decoder)  ;    |
^:shared protobuf-encoder    (protobuf-encoder)  ;    |
^:shared msg-decoder         (msg-decoder)       ;    |
^:shared msg-encoder         (msg-encoder)       ; ___|
^:shared handler             (gen-tcp-handler 
                               core
                               channel-group
                               tcp-handler))

Just move one line. Now I enqueue buffers with known sizes, instead of complex Protobuf objects. DefaultObjectSizeEstimator runs in constant time. Throughput doubles. Minimum latency drops by a factor of two.

[Figure: drop-tcp-event-batch throughput]

[Figure: drop-tcp-event-batch latency]

Throughput here is measured in messages, each containing 100 events, so master is processing 200,000–215,000 events/sec. Latency is for synchronous calls to client.sendEvents(anEvent). The dropoff at the tail end of the time series is the pipelining client draining its message queue. Client and server are running on the same quad-core Q8300, pushing about 20 megabytes/sec of traffic over loopback. Here’s what the riemann-bench session looks like, if you’re curious.

Why didn’t you figure this out sooner?

I wrote most of this code, and what code I didn’t write, I reviewed and tested. Why did it take me so long to figure out what was going on?

When I started working on this problem, the code looked nothing like the pipeline I showed you earlier.

The Netty pipeline evolved piecemeal, by trial-and-error, and went through several refactorings. The UDP server, TCP server, and Graphite server share much of the same code, but do very different things. I made several changes to improve performance. In making these changes I tried to minimize API disruption–to keep function interfaces the same–which gradually pulled the pipeline apart into several interacting pieces. And since Netty’s API is well-written, flexible Java, there are literally hundreds of names to keep track of; keeping function and variable names distinct became a challenge.

By the time I started digging into the problem, I was hard-pressed to figure out what a channel pipeline factory was, let alone how it was constructed.

In order to solve the bug I had to understand the code, which meant inventing a new language to talk about pipelines. Once I’d expressed the pipeline clearly, it was obvious how the pieces interacted. Experimenting with new pipelines took a half hour, and I was able to almost double throughput with a single-line change.

Comments

Lee Wei

Hi,

I would just like to know what tool(s) you used to provide the graph visualisations in your article. If it’s something like gnuplot, then what settings were there?

Thanks

duckie

Seems like a GNUPlot export in a vector format, then rendered with Inkscape for instance.

Aphyr

This is a graph from Riemann-bench, which uses Schadenfreude, a Clojure time-series benchmarking tool I wrote. Schadenfreude uses Incanter, which in turn uses JFreeChart.

Andrew

Fascinating stuff. Taken from another angle, one could argue that once you had a working domain model, no matter how informal, it enabled you to make the insight detailed above - right?

Aphyr

Exactly, Andrew. Solving problems requires developing a notation and a model to understand and express them. I spend a lot of time drawing diagrams, but wherever possible I try to develop executable models of the problem; such that the explanation of the problem is the code. Cuts down on the number of internal mappings you have to keep in your head.

