Thursday, November 28, 2013

Asynchronous HTTP with spray

Last time, I wrote about the high-level ideas behind the Scala libraries akka and spray, namely the concept of reactive programming in order to meet the demands of modern applications. I finally got the chance to play around a bit with spray in order to get a feel for how the theory of event-driven, asynchronous programming manifests itself in code. I started with a very simple web application that takes a stock ticker symbol and uses the Yahoo Query Language (YQL) to look up basic quote information for the company (using this endpoint). Here's some code for a trait that uses spray-http and spray-client to execute a request against the Yahoo API:

You'll notice that I included all of the necessary imports in the example (I normally leave them out). A lot of things look nicer once imported (e.g. GET vs HttpMethods.GET), and spray uses implicits quite extensively to expose a nice domain-specific language (DSL), so knowing what the imports are is important for figuring out why the code compiles and works properly. Now let's break down the code. The request method is the simplest part: all it does is take in the YQL query and construct an HTTP request with the appropriate URL and parameters. Spray has some convenient methods for making this easy, but there's nothing too special involved.

The executeQuery method is where all the magic happens. To understand it, we'll need to revisit two concepts: futures and actors. Futures are the Scala way of expressing a value that is computed asynchronously. In this case, executeQuery returns a future because, per the reactive programming principles, we do not want to block the thread while waiting for the request to be sent and the response to be received over the network. This means that when the method returns, it is most likely that the response has not come back, so whoever consumes the result of the future will need to wait for it (e.g. using Await). Actors are an abstraction for handling asynchronous computations using a messaging model; you send messages to an actor, which does some computation based on the contents of the message and potentially returns a result. In this case, we have an implicit ActorSystem that is used by spray-client to run an actor which receives the HttpRequest and returns an HttpResponse. The sendReceive method conveniently encapsulates all of this functionality.
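To make the future half of this concrete, here is a minimal sketch that uses only the Scala standard library rather than spray. The name fetchQuote is hypothetical: it stands in for a network call and simply sleeps before producing a string, so it shows the shape of the idea and not the actual client code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureSketch {
  // Hypothetical stand-in for a remote call: the body runs on another thread,
  // so the caller gets a Future back immediately instead of blocking.
  def fetchQuote(symbol: String): Future[String] = Future {
    Thread.sleep(100) // simulated network latency
    s"quote for $symbol"
  }

  def main(args: Array[String]): Unit = {
    // Transformations can be attached before the value exists.
    val pending: Future[String] = fetchQuote("YHOO").map(_.toUpperCase)

    // Blocking with Await is acceptable in a test or a main method,
    // but it is what the asynchronous style tries to avoid elsewhere.
    val result = Await.result(pending, 5.seconds)
    println(result)
  }
}
```

The call to map is the important part: the consumer describes what to do with the response without holding a thread while waiting for it.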

Here is our YQL client in action:

Since this is a test, we define a test ActorSystem to use and then use Await to get the response from the future, but when integrated with spray-can (the HTTP server) it becomes much nicer. In contrast to the Java servlet model where we would have many threads all probably blocked on the network, spray encourages everything to be asynchronous and utilizes a small pool of threads. To that end, the "controllers" you define in spray-can allow futures to be returned and automatically map them to HTTP responses, so any request that requires asynchronous processing will naturally result in a future that is handled entirely by the underlying abstraction. Next time, we'll see how to layer other pieces such as JSON and error handling on top of futures in spray.

Sunday, November 17, 2013

Reactive Programming

There's an interesting idea being pushed forward by the Scala community about the way systems should be built, and it's called reactive programming. The reactive manifesto gives a nice summary of their views on the future of distributed applications and how the way we programmed ten years ago is inadequate in this future. Because of the changing landscape of latency expectations, data scale, and hardware improvements, going forward, systems must be event-driven, scalable, resilient, and responsive in order to meet user needs. There is a significant amount of ongoing Scala development in order to build tools that allow programmers to achieve these properties easily. This is the goal of the akka and spray projects.

The reactive programming model being adopted is based on actors. Given that I am a huge fan of the actor model for how it deals with the problems of concurrency, it is great to see an entirely new ecosystem developing around actors. The Scala community has really invested in actors as the new way in which communication between systems (or even within them) will be done in the future. From some slides for a recent talk on akka and spray, actors are part of a different programming paradigm that is "well-suited for building reactive systems" because they inherently shun the root of all concurrency evil, shared mutable state. Specifically, using actors versus the old model of managing threads in servlets gets around the high overhead of such management as well as the difficulties of synchronizing state. In some ways, we can view this as the next step in lightweight request handling: it was once (and still sometimes is) the case that each HTTP request would spawn a process, but that was greatly improved upon by having many threads within a single process handle requests. With actors, a single thread can handle many requests at the same time, allowing the resources to be much more efficiently utilized (this is the premise behind Node.js as well).

Spray is what you would expect the HTTP stack to look like on top of akka. While I haven't played with it myself, the code examples show that requests are mapped directly to actor messages, so everything is under the unified model of message handling. Judging from this, they have done a pretty good job from a performance standpoint as well, indicating just how lightweight request handling is in spray. Akka and spray together form a solid foundation on which you can build an application that conforms to the principles behind reactive programming. It is exciting to see technology evolving to meet the needs of users, programmers, and applications, and I'm sure that in a few years we'll see how far this paradigm can take us.

Sunday, November 10, 2013

For Comprehensions

In Scala, the concept of a "for loop" doesn't really exist. Instead, we get "for comprehensions," which cover for loops, foreach loops, and more as we will see. Suppose you have the following line of Scala:

for (i <- 0 until 10) yield i

It looks roughly like a for loop with some syntactic sugar. But as I mentioned, Scala doesn't have for loops, and this code actually gets translated by the compiler into

(0 until 10).map(i => i)

The until method is part of the RichInt class, and an implicit conversion from Int to RichInt is available by default in Scala. (0 until 10) produces a Range, and the result of the entire expression is an IndexedSeq[Int]. But what's so interesting about the fact that Scala translates for loops into map calls? It turns out that this translation is generic, i.e. not specific to a certain set of classes like collections. So we can write our own class that exposes a map method and leverage the syntactic sugar of for comprehensions. Here's an example based on a Sequence trait that represents an infinite sequence of integers:

A Sequence only knows how to do two things: get the element at some index and map to a new sequence using a function. We have two simple types of sequences, namely the ArithmeticSequence (whose name should be self-descriptive), and the MappedSequence that takes another sequence and applies a function to each element. The implementations of the two should be straightforward to understand. Now comes the interesting part: because we have implemented the map method, we can use the for comprehension syntax on our Sequence classes.
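A minimal sketch of these types and their use in for comprehensions, reconstructed from the description here. The constructor parameters (start, step), the value names, and the SequenceDemo object are my own assumptions.

```scala
// An infinite sequence of integers that supports indexing and mapping.
trait Sequence {
  def apply(index: Int): Int
  // Defining map is what allows for comprehensions over a Sequence.
  def map(f: Int => Int): Sequence = new MappedSequence(this, f)
}

// start, start + step, start + 2 * step, ...
class ArithmeticSequence(start: Int, step: Int) extends Sequence {
  def apply(index: Int): Int = start + index * step
}

// Lazily applies f to each element of another sequence.
class MappedSequence(underlying: Sequence, f: Int => Int) extends Sequence {
  def apply(index: Int): Int = f(underlying(index))
}

object SequenceDemo {
  val arithmetic: Sequence = new ArithmeticSequence(2, 3) // 2, 5, 8, 11, 14, ...

  // Translated by the compiler into arithmetic.map(x => x * x).
  val squares: Sequence = for (x <- arithmetic) yield x * x

  // Keeps even elements and replaces odd ones with 0.
  val evensOnly: Sequence = for (x <- squares) yield if (x % 2 == 0) x else 0

  def main(args: Array[String]): Unit =
    println((arithmetic(4), squares(4), evensOnly(4), evensOnly(3))) // (14,196,196,0)
}
```

Nothing is computed until an index is requested, which is why the sequences can be infinite.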

We start out with a simple ArithmeticSequence, but then we create a new sequence (using a for comprehension) in which every element is squared. And then we create a third sequence which replaces all of the odd elements of the second sequence with 0. All three of these are infinite sequences that we can access random elements of, as demonstrated in the final line.

What I've shown here is a primitive example of how for comprehensions can be leveraged by custom classes in Scala. This functionality was brought to my attention in the first week of the reactive programming class on Coursera, which is taught by some of the most knowledgeable Scala folks (including the designer of the language!). It turns out that the map method lets you do simple things like what I did, but if you implement flatMap and filter, you can do arbitrarily complex for comprehensions with multiple variables and conditionals just like you would with collections. This is a good example of an aspect I like about Scala's philosophy, which is to generalize concepts as much as possible. While many languages treat for loops as their own special construct, Scala simply makes them syntactic sugar for a much richer class of functionality.
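To illustrate the richer translation, here is a small sketch on standard collections with two generators and a guard. The object and value names are made up for the example. Note that the compiler expresses the guard with withFilter, a lazy variant of filter.

```scala
object ForDesugaring {
  // Two generators plus a condition.
  val sugared = for {
    x <- 1 to 3
    y <- 1 to 3 if x < y
  } yield (x, y)

  // Roughly what the compiler produces: flatMap for the outer generator,
  // withFilter for the guard, and map for the final yield.
  val desugared =
    (1 to 3).flatMap(x => (1 to 3).withFilter(y => x < y).map(y => (x, y)))

  def main(args: Array[String]): Unit =
    println(sugared == desugared) // true
}
```

A custom class that provides these three methods with compatible signatures can be used in the same way.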

Sunday, November 3, 2013

Gumball Technique

Using memcached on top of a traditional relational database (RDBMS) has become the most popular way of building large-scale web services, as it is both simple and fast in the majority of cases. Loads are typically read-heavy because for each piece of content produced, many people will consume it. Moreover, the most popular pieces of content will account for the majority of traffic. As such, an in-memory caching layer that can efficiently serve the mass of read requests to the same data has naturally become standard. Cache consistency, however, is known to be a hard problem; in this case it involves synchronizing the state of two data stores that do not provide any transactional guarantees between them. Consider the following code for a client that talks to both memcached and an RDBMS.
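A minimal sketch of such a client, using in-memory maps in place of real memcached and database connections. The Store trait, InMemoryStore, and CachingClient are hypothetical names introduced for this illustration.

```scala
import scala.collection.mutable

// Hypothetical common interface for both the cache and the database.
trait Store {
  def get(key: String): Option[String]
  def put(key: String, value: String): Unit
  def remove(key: String): Unit
}

// In-memory stand-in for memcached or an RDBMS table.
class InMemoryStore extends Store {
  private val data = mutable.Map.empty[String, String]
  def get(key: String): Option[String] = data.get(key)
  def put(key: String, value: String): Unit = data.update(key, value)
  def remove(key: String): Unit = { data.remove(key); () }
}

class CachingClient(cache: Store, database: Store) {
  // Check the cache first; on a miss, read the database and cache the result.
  def get(key: String): Option[String] =
    cache.get(key).orElse {
      val value = database.get(key)
      value.foreach(v => cache.put(key, v))
      value
    }

  // Write to the database, then invalidate the cached entry.
  def put(key: String, value: String): Unit = {
    database.put(key, value)
    cache.remove(key)
  }
}
```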

Assuming that the cache and the RDBMS expose the same interface as the client itself, this code for handling puts and gets is straightforward. On a get, check if the cache has the value; if not, go to the database and cache the value retrieved if it exists. On a put, issue a put on the database and invalidate the entry in the cache. In barely 10 lines of real code, we already see a race condition that can result in stale cache data. For example, suppose the database currently has the pair ("k", "v1"), the cache does not have "k", and two clients A and B are simultaneously getting the key "k" and putting the pair ("k", "v2"), respectively. Then the following sequence is possible:
  1. A sees that the cache does not contain the key "k".
  2. A gets the value "v1" from the RDBMS.
  3. B puts the pair ("k", "v2") into the RDBMS.
  4. B removes the key "k" from the cache (was not there in the first place).
  5. A caches the pair ("k", "v1") and returns.
So client A receives the value "v1", which is fine, but it also leaves that value in the cache after B has already updated the database, meaning that future gets for key "k" will return stale data. It turns out that using memcached on top of an RDBMS is not quite as trivial as the code snippet above suggests; to fix this race condition we can leverage the gumball technique.
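The interleaving can be replayed deterministically in a single thread. This is only a sketch: two mutable maps stand in for the database and the cache, and the object and value names are invented for the illustration.

```scala
import scala.collection.mutable

object StaleCacheDemo {
  val database = mutable.Map("k" -> "v1")
  val cache = mutable.Map.empty[String, String]

  // 1. A sees that the cache does not contain "k".
  val missedByA = cache.get("k").isEmpty
  // 2. A reads "v1" from the database.
  val readByA = database("k")
  // 3. B writes ("k", "v2") to the database.
  database("k") = "v2"
  // 4. B invalidates "k" in the cache (a no-op, nothing is cached yet).
  cache.remove("k")
  // 5. A caches the value it read in step 2.
  cache("k") = readByA

  def main(args: Array[String]): Unit =
    println(s"database=${database("k")} cache=${cache("k")}") // database=v2 cache=v1
}
```

After step 5 the two stores disagree, and nothing in the naive protocol will ever repair the cached entry.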

The idea behind the gumball technique is clever: the cache server itself sees all of these operations (the get that misses, the remove, and then the put from the original getter), so it can deduce which ones will result in stale data. The only change to the protocol itself is to return a miss time $T_m$ (server time) on the first get and require the client to pass $T_m$ with the corresponding put, which we store along with the value in the cache. The rest of the work is done by the cache server itself and consists of two parts: managing "gumball" entries and rejecting certain put operations. Gumball entries are timestamped removal markers; every time a client issues a remove on a key, instead of just deleting the key, the server stores a gumball carrying the timestamp of the removal (again, server time). The time-to-live (TTL) of the entry, which we will refer to as $\Delta$, has a special way of being determined, but we can assume it is just some value that exceeds the typical length of an RDBMS query.

The tricky part of the algorithm comes when a put operation is issued (with a corresponding $T_m$ value). Let $T_C$ be the current time on the cache server. If $T_C - T_m > \Delta$, we ignore the put operation because a gumball entry created after the miss could already have expired, so the server can no longer tell whether the put is safe. This is why $\Delta$ should be larger than the typical get/put cycle. If there is a gumball entry whose timestamp exceeds $T_m$, then it is likely that the race condition occurred and we should ignore the put operation. Lastly, if there is an actual value with miss time greater than $T_m$, we also have to ignore the put operation, as the other put may have overwritten a gumball entry that would have invalidated the current put. If none of these conditions holds, then we insert the key/value pair into the cache as normal (along with $T_m$).
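The three rejection rules can be sketched as a small in-memory cache server. This follows only the description given here and is not the paper's implementation: the class and method names are hypothetical, the clock is injected so that the behavior can be checked deterministically, and gumball expiry after $\Delta$ is left out because the first rule already rejects any put old enough to have been affected by an expired gumball.

```scala
import scala.collection.mutable

sealed trait Entry
final case class Cached(value: String, missTime: Long) extends Entry
final case class Gumball(removedAt: Long) extends Entry

// delta is the gumball TTL; clock returns the current server time.
class GumballCache(delta: Long, clock: () => Long) {
  private val entries = mutable.Map.empty[String, Entry]

  // A hit returns Right(value); a miss returns Left(missTime), i.e. T_m.
  def get(key: String): Either[Long, String] = entries.get(key) match {
    case Some(Cached(value, _)) => Right(value)
    case _                      => Left(clock())
  }

  // Instead of deleting the key, leave a timestamped removal marker.
  def remove(key: String): Unit = entries.update(key, Gumball(clock()))

  // Returns true if the put was accepted and false if it was ignored.
  def put(key: String, value: String, missTime: Long): Boolean = {
    // Rule 1: T_C - T_m > delta, so a relevant gumball may have expired.
    val tooOld = clock() - missTime > delta
    val superseded = entries.get(key) match {
      case Some(Gumball(removedAt))    => removedAt > missTime     // Rule 2
      case Some(Cached(_, otherMiss)) => otherMiss > missTime     // Rule 3
      case None                        => false
    }
    val accepted = !tooOld && !superseded
    if (accepted) entries.update(key, Cached(value, missTime))
    accepted
  }
}

object GumballDemo {
  def main(args: Array[String]): Unit = {
    var now = 0L
    val cache = new GumballCache(10L, () => now)
    now = 1; val missA = cache.get("k") // A misses at T_m = 1
    now = 2; cache.remove("k")          // B updated the database and invalidates
    now = 3                             // A tries to cache the stale value
    println(cache.put("k", "v1", missA.left.getOrElse(now))) // false
  }
}
```

In the replayed race, A's put carries a miss time of 1 while the gumball left by B has timestamp 2, so the stale value is rejected.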

These steps guarantee that the cache will never have a stale entry that persists following an update or delete, which brings back the consistency provided by an RDBMS in the first place. The paper goes into a bit more detail about how to choose $\Delta$ properly via a sliding window algorithm with some buffer, but the exact value is not important for correctness. It also discusses the performance impact, which seems to be negligible under normal loads, but does depend on how long the RDBMS queries take relative to the frequency of gets. The gumball technique is a nice example of leveraging minimal centralization and carefully observing access patterns to produce a simple way of ensuring distributed consistency. It would be great if memcached implementations actually started doing this, but since it requires an API change, it will probably not become widespread anytime soon. And since many websites don't actually care about this level of consistency, it is hard to say whether this will ever become standard.