Sunday, March 16, 2014

Distributed Systems and the CAP Theorem

In the field of distributed systems, the CAP theorem is an important result that often guides the design of such systems. The theorem states that a distributed system cannot simultaneously satisfy consistency, availability, and partition tolerance (see the Wikipedia article for definitions of these three properties). In practice, this typically means that, in the event of a network partition, there is a trade-off between consistency and availability. For example, HBase chooses consistency in that case, while Cassandra chooses availability (with eventual consistency). These days, many services have extremely high availability requirements, which has made systems that sacrifice strong consistency for availability popular; having worked with Cassandra myself, however, I have found that eventual consistency introduces a layer of complexity on both the client and the server side that can be difficult to overcome. I was recently introduced to a blog post on how to beat the CAP theorem (it's lengthy, but well worth the read) written by Nathan Marz, the creator of Storm. He outlines an approach to building distributed systems that is intended to tame the complexity of eventual consistency.

First, let me preface the discussion. The article was met with some criticism, but from what I can tell, most of it stems from people misunderstanding his ideas, compounded by the provocative and ambiguous title. There's no such thing as "beating" the CAP theorem in the sense of violating it through some ingenious design; it's a theorem because it has been proven to always hold. The point being made is that we can address the CAP theorem in a way that doesn't lead us down a road of unmanageable complexity and, consequently, systems that are not robust and reliable.

The basic principle that the design leverages is that "data is inherently immutable." This is because data is always associated with a point in time; you can think of data as a fact that was true at that time. So while the balance of your bank account might change from $10$ to $20$ from time $T$ to time $T+1$, the two pieces of data, namely that your balance was $10$ at time $T$ and $20$ at time $T+1$, are forever true. In my experience, starting with this kind of definition sets you up for success because immutability is good, and it simplifies everything. From here, a distributed system is merely an accumulation of data that exposes methods of querying the data, where a query can be any arbitrary computation over all the data. The flexibility you have in querying the data is determined by what the system has chosen to expose, ranging from limited queries (e.g. a plain key-value store) to very expressive queries (e.g. SQL).
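To make the "data as facts" model concrete, here is a minimal Scala sketch built around a hypothetical BalanceFact record: recording data only ever appends a fact, and "the balance" is a query computed over all facts rather than a mutable cell that gets overwritten.

```scala
// Hypothetical fact type: "the balance was `balance` at `time`".
final case class BalanceFact(time: Long, balance: Long)

object FactLog {
  // Recording new data appends a fact; earlier facts are never modified.
  def record(log: Vector[BalanceFact], fact: BalanceFact): Vector[BalanceFact] =
    log :+ fact

  // A query is an arbitrary function of all the data: here, the balance as of
  // time t is the most recent fact at or before t (None if there is none yet).
  def balanceAt(log: Seq[BalanceFact], t: Long): Option[Long] =
    log.filter(_.time <= t).sortBy(_.time).lastOption.map(_.balance)
}
```

Recording the $20$ balance at time $T+1$ leaves the fact about time $T$ untouched, so both remain queryable forever.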

Now that we've gotten the mental model of distributed systems out of the way, let's take a look at the key piece of the design that lets us "beat" the CAP theorem. Instead of treating all data homogeneously as most systems do, separate the data into two layers: the batch layer (say, everything up until the last hour) and the real-time layer (everything from the last hour). Queries are then sent to both layers and the results are merged to produce the final answer. Since queries are typically too slow to run across the entire batch layer every time, we precompute "views" of the batch layer that allow queries to be answered quickly. For example, in a key-value store, the data is the history of all inserts, updates, and deletes, but we can precompute a view that is the current map of keys to values, which lets us answer any query by key quickly. Do this every hour, flushing the real-time layer as the new view becomes available to query, and we have a system that only transiently depends on the real-time layer.
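Here is a minimal Scala sketch of that idea for the key-value example, assuming a hypothetical event log of puts and deletes (the Event, Put, Delete, and TwoLayerStore names are illustrative). Both layers are produced by the same fold over immutable events, and a query consults the real-time view first, falling back to the batch view.

```scala
// Hypothetical event log for a key-value store; nothing is ever overwritten.
sealed trait Event { def time: Long; def key: String }
final case class Put(time: Long, key: String, value: String) extends Event
final case class Delete(time: Long, key: String) extends Event

object TwoLayerStore {
  type View = Map[String, Option[String]] // None records a delete

  // The same precomputation serves both layers: replay events in time order
  // and keep the latest write per key.
  def view(events: Seq[Event]): View =
    events.sortBy(_.time).foldLeft(Map.empty[String, Option[String]]) {
      case (acc, Put(_, k, v)) => acc.updated(k, Some(v))
      case (acc, Delete(_, k)) => acc.updated(k, None)
    }

  // Query both layers and merge: a recent put or delete in the real-time
  // view overrides whatever the batch view says about the same key.
  def get(batch: View, realtime: View, key: String): Option[String] =
    realtime.get(key) match {
      case Some(latest) => latest
      case None         => batch.get(key).flatten
    }
}
```

Only the real-time view has to absorb writes as they arrive; the batch view can be discarded and recomputed from the log at any time, which is what makes it a dependable fallback.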

So what does this buy us? First of all, the batch layer is very simple. You have all of the data, you compute something from it, and you never have to deal with concurrency or anything of that sort. But we still need the real-time layer, which is going to be complex, so have we really saved anything? Let's think about failure (and if you're working with distributed systems, you should always be thinking about failure), both in terms of systems and humans. Failures will occur, and given the complexity of these real-time systems and the application code interacting with them, it's not unusual to corrupt or lose data in unexpected ways. The batch layer is essentially a reliable fallback mechanism in case of a catastrophe (which, as Marz recounts from one incident, can be as simple as running out of disk space somewhere). By isolating the complex real-time layer from the batch layer that is the ultimate source of truth, you protect yourself against these failures.

We can summarize this entire design exercise by the simple principle that we started out with: data is immutable (and immutability is good). Whether you're programming in a multithreaded environment or building a distributed data processing system, leverage immutability as much as you can. It simplifies things and protects you against your own mistakes. And again, I highly recommend checking out the full post to understand more of his thinking.

Sunday, March 2, 2014

Matrix Sketching

Last time, I wrote about a clever algorithm for approximating the histogram of a stream using bounded memory. The post was motivated by this paper, which extends that algorithm to a problem that seems unrelated at first glance: matrix sketching. The matrix sketching problem is as follows: given the rows of a large matrix $A \in \mathbb{R}^{n \times m}$ as a stream, produce a matrix $B \in \mathbb{R}^{l \times m}$ with $l \ll n$ that is a "good" approximation for $A$ when multiplied with vectors. Specifically, the algorithm in the paper achieves the following result: if $l = 1/\epsilon$, then it produces a matrix $B$ such that, for any unit vector $x$,

$||Ax||^2 \ge ||Bx||^2 \ge ||Ax||^2 - \epsilon ||A||_f^2$

where $||A||_f$ is the Frobenius norm of $A$. Here you can see the parallels with the frequency approximation algorithm; the error is a function of how "big" the stream is, which in this case is the Frobenius norm of the input matrix.

The algorithm works as follows: start with $B$ as an $l \times m$ matrix of all zeros, and for each input row $A_i$ do the following update:
  1. Set $B_l = A_i$ (the last row of $B$).
  2. Compute the singular value decomposition (SVD) of $B$, so we obtain $B = U \Sigma V$ with the standard assumption that the diagonal values of $\Sigma$ are $\sigma_1 \ge \sigma_2 \ge \cdots \ge \sigma_l$.
  3. "Remove" the smallest singular value from $\Sigma$ by letting

    $\bar{\Sigma} = \sqrt{\max(\Sigma^2 - I_l \sigma_l^2, 0)}$

    where $I_l$ is the $l \times l$ identity matrix.
  4. Then set $B = \bar{\Sigma}V$ (note that the last row of $B$ is all zeroes after this step because the last row of $\bar{\Sigma}$ is all zeroes by construction).
At the end, just output the current value of $B$. I won't go into any of the proof details (they can be found in the paper), but it's interesting to try to understand what the algorithm is doing intuitively. The SVD can be thought of (very loosely) as breaking down a matrix into three transformations applied to a multidimensional space: a rotation ($V$), followed by a scaling along the axes ($\Sigma$), and lastly another rotation ($U$). So the singular values are the scaling factors of the matrix in orthogonal directions, and at each step we shrink all of these directions by the same amount, subtracting the smallest squared singular value $\sigma_l^2$ from every squared singular value. As a result, we only lose a small amount of accuracy in any particular direction (i.e. $Bx$ versus $Ax$ for a specific $x$) relative to the Frobenius norm of $A$ as a whole.
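The update can be sketched in Scala without an SVD library. This is a minimal sketch under one stated assumption: instead of computing the SVD of $B$ directly, it eigendecomposes the small $l \times l$ Gram matrix $BB^T$ with the Jacobi method. Its eigenvalues are the squared singular values $\sigma_i^2$, and for each eigenvector $u_i$ we have $u_i^T B = \sigma_i v_i$, which is all that steps 3 and 4 need. The MatrixSketch, symmetricEigen, and sketch names are hypothetical.

```scala
object MatrixSketch {
  type Matrix = Array[Array[Double]]

  def dot(x: Array[Double], y: Array[Double]): Double =
    x.indices.map(i => x(i) * y(i)).sum

  // Cyclic Jacobi eigenvalue method for a small symmetric matrix.
  // Returns (eigenvalues, eigenvectors stored as the columns of a matrix).
  def symmetricEigen(g: Matrix): (Array[Double], Matrix) = {
    val n = g.length
    val a = g.map(_.clone)
    val v = Array.tabulate(n, n)((i, j) => if (i == j) 1.0 else 0.0)
    val scale = g.map(row => dot(row, row)).sum
    def offDiagonal: Double =
      (for (i <- 0 until n; j <- 0 until n if i != j) yield a(i)(j) * a(i)(j)).sum
    var sweeps = 0
    while (sweeps < 50 && offDiagonal > 1e-24 * scale) {
      for (p <- 0 until n; q <- p + 1 until n) {
        if (a(p)(q) != 0.0) {
          // Rotation angle chosen so that entry (p, q) becomes zero.
          val theta = (a(q)(q) - a(p)(p)) / (2 * a(p)(q))
          val t = (if (theta >= 0) 1.0 else -1.0) /
            (math.abs(theta) + math.sqrt(theta * theta + 1))
          val c = 1 / math.sqrt(t * t + 1)
          val s = t * c
          for (k <- 0 until n) { // a <- a J
            val (akp, akq) = (a(k)(p), a(k)(q))
            a(k)(p) = c * akp - s * akq; a(k)(q) = s * akp + c * akq
          }
          for (k <- 0 until n) { // a <- J^T a
            val (apk, aqk) = (a(p)(k), a(q)(k))
            a(p)(k) = c * apk - s * aqk; a(q)(k) = s * apk + c * aqk
          }
          for (k <- 0 until n) { // v <- v J accumulates the eigenvectors
            val (vkp, vkq) = (v(k)(p), v(k)(q))
            v(k)(p) = c * vkp - s * vkq; v(k)(q) = s * vkp + c * vkq
          }
        }
      }
      sweeps += 1
    }
    (Array.tabulate(n)(i => a(i)(i)), v)
  }

  // Streams the rows of A (each of length m) into an l x m sketch B.
  def sketch(rows: Iterator[Array[Double]], l: Int, m: Int): Matrix = {
    var b: Matrix = Array.fill(l, m)(0.0)
    for (row <- rows) {
      b(l - 1) = row.clone // step 1: the last row of B is all zeros here
      val g = Array.tabulate(l, l)((i, j) => dot(b(i), b(j)))
      val (lambda, u) = symmetricEigen(g) // step 2, via B B^T
      val order = lambda.indices.sortBy(i => -lambda(i)) // sigma_1 >= ... >= sigma_l
      val delta = math.max(lambda(order.last), 0.0) // sigma_l^2
      // Steps 3 and 4: row i becomes sqrt(sigma_i^2 - delta) v_i, computed by
      // rescaling u_i^T B; the last row comes out exactly zero.
      b = order.map { i =>
        val scale =
          if (lambda(i) <= delta) 0.0 else math.sqrt((lambda(i) - delta) / lambda(i))
        Array.tabulate(m)(j => scale * (0 until l).map(k => u(k)(i) * b(k)(j)).sum)
      }.toArray
    }
    b
  }
}
```

Working with $BB^T$ keeps the per-row cost dependent on $l$ rather than on the number of stream rows, which matches the point of the algorithm; a production version would use a proper linear algebra library instead of this hand-rolled eigensolver.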

This would be pretty cool even if it were a purely theoretical result, since it ties two problems together in a very elegant way, but it gets better. The paper goes on to evaluate the algorithm experimentally and observes that its accuracy is a substantial improvement over existing matrix sketching techniques. Moreover, since computing the SVD is somewhat expensive, the author describes how the algorithm can be parallelized, as well as a way to reduce the computational cost of the SVD step while only slightly relaxing the guarantees. It's a very nice paper that spans both the theoretical and practical sides of the matrix sketching problem.

Saturday, February 15, 2014

Streaming Frequency Approximation

It's been a while, but I finally motivated myself to write another blog post! Today's topic is approximating the frequency of items in a stream. With all the data that is generated today, streaming algorithms have become very popular as an efficient way to process datasets that are far too large to fit in memory. One common data mining problem is determining the frequency of items in data, i.e. building a histogram. Consider a stream $X = x_1, x_2, \ldots, x_n$ of $n$ items, each of which belongs to one of $m$ classes $c_1, c_2, \ldots, c_m$. If $m$ is small, the problem is simple, as we simply maintain the entire histogram in $O(m)$ memory and output the exact result. The interesting case is when there is an extremely large number of distinct classes, usually unknown in advance. For this case, we will present an algorithm originally from this paper that approximates each value in the histogram within $\epsilon n$ using $O(1/\epsilon)$ memory.

At any point while processing the stream, the in-memory state is a partial histogram with at most $k = 1/\epsilon$ elements (class/count pairs). When processing an element $x_i$, we use the following procedure with three cases:
  1. $x_i$ is already in the histogram: increment its count;
  2. we have fewer than $k$ elements in the histogram: add $x_i$ with count $1$;
  3. the histogram already has $k$ other elements: while the histogram has $k$ elements, decrease the count of each element by $1$ and evict any elements with zero count, and then add $x_i$ with count $1$.
That's the entire algorithm, and it's easy to prove that it achieves the desired approximation as well. The only case in which we "lose" information is case 3, where the count of each of $k$ elements is decreased by $1$. Every item in the stream adds exactly $1$ to the sum of all counts and each decrement round subtracts $k$ from it, yet the sum is still at least $1$ after each item is processed (the item just added is counted), so there are fewer than $n/k = \epsilon n$ decrement rounds in total. Each round lowers any given class's count by at most $1$, so if we use the partial histogram and output the frequency of any class not in the histogram as zero, we are guaranteed that each count is within $\epsilon n$ of the actual count. In particular, this means that the algorithm will be guaranteed to output a positive count for any class that occurs with frequency $\epsilon n$ or more in the stream.
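Here is a minimal Scala sketch of the three cases, assuming a mutable map holds the partial histogram (the FrequencySketch class and its method names are illustrative). Case 3's loop of decrementing every count by $1$ until some count reaches zero is implemented by subtracting the current minimum count once, which has the same effect.

```scala
import scala.collection.mutable

// Bounded-memory frequency counter holding at most k = 1/epsilon
// class/count pairs (class and method names are illustrative).
final class FrequencySketch[A](k: Int) {
  require(k > 0, "k must be positive")
  private val counts = mutable.Map.empty[A, Long]

  def add(x: A): Unit =
    if (counts.contains(x)) counts(x) += 1 // case 1: already tracked
    else if (counts.size < k) counts(x) = 1L // case 2: free slot
    else { // case 3: histogram is full
      // Decrementing every count by 1 until one reaches zero is the same as
      // subtracting the current minimum once and evicting the zeros.
      val dec = counts.values.min
      for ((cls, c) <- counts.toList)
        if (c == dec) counts -= cls else counts(cls) = c - dec
      counts(x) = 1L
    }

  // Estimated count: never above the true count and less than n / k below it.
  def estimate(x: A): Long = counts.getOrElse(x, 0L)
}
```

For example, after feeding 100 items into a FrequencySketch[String](5), every estimate is within 20 of the true count, and any class seen at least 20 times is still reported with a positive count.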

I've always found this problem to be particularly beautiful because of the simplicity of the result, the algorithm, as well as the proof. Moreover, it's a very practical problem with many applications. To make things even more interesting, a recent paper shows how this concept can be generalized to do matrix sketching (i.e. approximating a matrix with a smaller one). I hope to get around to writing about that at some point as well.

Sunday, December 8, 2013


The virtual currency bitcoin has been getting a lot of hype and news recently, as the price per coin has gone from about \$14 at the beginning of the year to a high of over \$1200 last week with a subsequent drop to its current value, which is between \$700 and \$800 (see here). While nobody really knows the future of bitcoin, whose success depends heavily on regulation, the technical aspects of its implementation are quite interesting. For a basic overview, first check out this explanation from the website; the rest of this post assumes at least a cursory familiarity with it. The original bitcoin paper that contains all of the technical details I will be explaining can be found here. The core concept is a virtual currency that does not require a centralized authority to manage it, which naturally makes it much more flexible and reduces the overhead of processing transactions. In order for such a system to work, there must be some source of trust, which is accomplished through a combination of public-key cryptography and a proof-of-work system.

Bitcoin, at its core, is really just a sequence of transactions between wallets, each of which is signed by the sender. Each wallet has a public key and a private key, and senders sign transactions with their private key such that everyone else can validate the transactions using the public key, as is typical with public-key cryptography. The problem that arises is that there is no easy way to verify that the sender did not "double-spend" their bitcoins, i.e. signing two transactions which send the same bitcoins. As it turns out, the only way to prevent such a situation without a centralized authority is to have a public, linear history of all transactions (known as the block chain). Assuming such a history, every new transaction can be verified to not include double-spending, and we have a working system.
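Both halves of that paragraph can be illustrated with the JDK's security APIs. This is a toy model, not Bitcoin's actual format: the Tx encoding, the string ids for spent outputs, the single-sender setup, and the ToyLedger name are all hypothetical, and it uses a 256-bit EC curve from the JDK's default provider rather than the secp256k1 curve Bitcoin uses. Signatures prove who authorized a transaction; replaying one agreed-upon linear history is what catches a double spend.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.security.{KeyPair, KeyPairGenerator, PublicKey, Signature}

// Hypothetical toy transaction: spends earlier outputs (identified by string
// ids) and sends `amount` to `to`. Real Bitcoin transactions are binary.
final case class Tx(inputs: List[String], to: String, amount: Long) {
  def bytes: Array[Byte] = s"${inputs.mkString(",")}|$to|$amount".getBytes(UTF_8)
}
final case class SignedTx(tx: Tx, signature: Array[Byte])

object ToyLedger {
  def newWallet(): KeyPair = {
    val gen = KeyPairGenerator.getInstance("EC")
    gen.initialize(256) // a JDK-provided curve; Bitcoin itself uses secp256k1
    gen.generateKeyPair()
  }

  // The sender signs with the private key...
  def sign(tx: Tx, wallet: KeyPair): SignedTx = {
    val signer = Signature.getInstance("SHA256withECDSA")
    signer.initSign(wallet.getPrivate)
    signer.update(tx.bytes)
    SignedTx(tx, signer.sign())
  }

  // ...and anyone can check the signature with the public key.
  def signatureValid(stx: SignedTx, sender: PublicKey): Boolean = {
    val verifier = Signature.getInstance("SHA256withECDSA")
    verifier.initVerify(sender)
    verifier.update(stx.tx.bytes)
    verifier.verify(stx.signature)
  }

  // A valid signature alone cannot rule out double-spending. Replaying one
  // linear history can: reject any transaction whose inputs were already
  // spent earlier in the history (or that lists the same input twice).
  def acceptHistory(history: Seq[SignedTx], sender: PublicKey): List[SignedTx] = {
    val spent = scala.collection.mutable.Set.empty[String]
    val accepted = List.newBuilder[SignedTx]
    for (stx <- history) {
      val inputs = stx.tx.inputs
      val fresh = inputs.distinct.size == inputs.size && inputs.forall(i => !spent.contains(i))
      if (signatureValid(stx, sender) && fresh) {
        spent ++= inputs
        accepted += stx
      }
    }
    accepted.result()
  }
}
```

Two transactions that both spend "coin1" are each individually well signed; only the ordering imposed by the shared history decides that the second one is the double spend.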

So the remaining problem is how a distributed, peer-to-peer system can generate a transaction history that everyone agrees on. The solution is a proof-of-work system based on cryptographic hashing. At any point in time, bitcoin miners are continuously brute-forcing through hash computations in order to produce the next block in the chain. A block consists of the previous block's hash, a set of transactions, and a nonce; it is only valid if its hash has sufficient difficulty, where difficulty is (roughly) the number of zeros the hash begins with. Miners repeatedly increment the nonce until they produce a block whose hash meets the difficulty accepted by the system (i.e. everyone else). The difficulty is chosen such that blocks are produced around every 10 minutes, which is why bitcoin transactions typically take that long to verify. Once a block has been added to the chain, the miners will generally accept it and move on to the next block (there are some additional details about handling blocks that are produced simultaneously). The transactions that are part of the chain form the history from which future transactions can be verified. An example of a recently-produced block can be seen here.
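The mining loop can be sketched in Scala with the JDK's SHA-256. This follows the simplified description above rather than the real protocol: the Block fields and their string encoding are hypothetical, and validity here means "the hex digest starts with `difficulty` zeros", whereas Bitcoin double-hashes an 80-byte block header with SHA-256 and compares the result against a numeric target.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.security.MessageDigest

// Hypothetical toy block: previous block hash, transactions, and a nonce.
final case class Block(prevHash: String, transactions: List[String], nonce: Long) {
  def hash: String = {
    val input = s"$prevHash|${transactions.mkString(";")}|$nonce".getBytes(UTF_8)
    MessageDigest.getInstance("SHA-256").digest(input).map(b => f"${b & 0xff}%02x").mkString
  }
}

object Miner {
  def isValid(block: Block, difficulty: Int): Boolean =
    block.hash.startsWith("0" * difficulty)

  // Brute force: keep incrementing the nonce until the hash is acceptable.
  def mine(prevHash: String, txs: List[String], difficulty: Int): Block = {
    var nonce = 0L
    while (!isValid(Block(prevHash, txs, nonce), difficulty)) nonce += 1
    Block(prevHash, txs, nonce)
  }

  // A chain is consistent if every block meets the difficulty and points at
  // the hash of the block before it.
  def chainValid(chain: List[Block], difficulty: Int): Boolean =
    chain.forall(isValid(_, difficulty)) &&
      chain.zip(chain.drop(1)).forall { case (prev, next) => next.prevHash == prev.hash }
}
```

With a difficulty of 3 hex zeros the loop needs about $16^3 = 4096$ attempts on average, and each additional zero multiplies the expected work by 16. Editing a transaction in an old block changes that block's hash, which breaks the prevHash link of the block after it, so every later block would have to be re-mined.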

There are a couple of remaining points that should be addressed when talking about bitcoin. Firstly, what is the incentive for people to mine (and thus produce the transaction history necessary for the system to function)? Bitcoin mining is the only way to produce new bitcoins; each block has a special transaction that gives the producer of the block some bitcoins, currently 25. This reward is halved every 210,000 blocks, which caps the total number of bitcoins that will ever be in circulation at 21 million. The other incentive is transaction fees, which are paid to miners for keeping the system running. Another issue is that the transaction history seems like it will grow without bound and become unmanageable. This can be alleviated because transactions whose outputs have already been spent by later transactions can be discarded: a block's hash covers only the root of a Merkle tree of its transactions, so old branches can be pruned without invalidating the block (details in the paper). Lastly, it is important to consider the possibility of attackers producing fake blocks that reverse old transactions. The key point is that reversing an old transaction involves recomputing all of the blocks from that transaction onward, since each block is a function of the previous one's hash. As such, the attacker would have to produce blocks faster than the rest of the network combined in order to catch up with and overtake the honest chain, which seems to be a strong enough guarantee for people to trust bitcoin.
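The reward schedule is simple arithmetic: the block subsidy started at 50 bitcoins and is halved every 210,000 blocks (roughly every four years), which is why it is 25 at the time of writing. Summing the halving series shows where the 21 million cap comes from. This sketch works in integer satoshis (1 bitcoin = 100,000,000 satoshis) and halves with a right shift; the Subsidy object name is illustrative.

```scala
object Subsidy {
  val SatoshisPerCoin = 100000000L
  val HalvingInterval = 210000L

  // Reward for the block at `height`: 50 BTC, halved once per interval.
  def blockReward(height: Long): Long = {
    val halvings = height / HalvingInterval
    // Shifting a Long by 64 or more would wrap around, so cap it explicitly.
    if (halvings >= 64) 0L else (50 * SatoshisPerCoin) >> halvings
  }

  // Total issuance: every halving era contributes 210,000 blocks at its reward,
  // until integer halving rounds the reward down to zero.
  def totalSupply: Long =
    Iterator.iterate(50 * SatoshisPerCoin)(_ >> 1)
      .takeWhile(_ > 0)
      .map(_ * HalvingInterval)
      .sum
}
```

The total comes out to 2,099,999,997,690,000 satoshis, just under 21 million bitcoins, because the truncating halvings slightly undershoot the ideal geometric series $210{,}000 \times 50 \times (1 + \tfrac{1}{2} + \tfrac{1}{4} + \cdots) = 21{,}000{,}000$.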

Whether bitcoin is a fad or not, it has a neat theoretical foundation, and I'm impressed by all of the work people are putting into it. There is certainly room for disruption in the currency and payments industry, and it's only a matter of time before money is a purely virtual concept. Bitcoin also shows us that cryptography is a powerful tool, and it's worth thinking about how we can rely on mathematics for trust instead of other people (as depressing as that might sound).

Thursday, November 28, 2013

Asynchronous HTTP with spray

Last time, I wrote about the high-level ideas behind the Scala libraries akka and spray, namely the concept of reactive programming in order to meet the demands of modern applications. I finally got the chance to play around a bit with spray in order to get a feel for how the theory of event-driven, asynchronous programming manifests itself in code. I started with a very simple web application that takes a stock ticker symbol and uses the Yahoo Query Language (YQL) to look up basic quote information for the company (using this endpoint). Here's some code for a trait that uses spray-http and spray-client to execute a request against the Yahoo API:

You'll notice that I included all of the necessary imports in the example (I normally leave them out), and this is because a lot of things look nicer once imported (e.g. GET vs HttpMethods.GET) and spray uses implicits quite extensively to expose a nice domain-specific language (DSL). So knowing what the imports are is actually pretty important for figuring out why the code actually compiles and works properly. Now let's break down the code. The request method is the simplest part, and all it does is take in the YQL query and construct an HTTP request with the appropriate URL and parameters. Spray has some convenient methods for making this easy, but there's nothing too special involved.

The executeQuery method is where all the magic happens. To understand it, we'll need to revisit two concepts: futures and actors. Futures are the Scala way of expressing a value that is computed asynchronously. In this case, executeQuery returns a future because, per the reactive programming principles, we do not want to block the thread while waiting for the request to be sent and the response to be received over the network. This means that when the method returns, it is most likely that the response has not come back, so whoever consumes the result of the future will need to wait for it (e.g. using Await). Actors are an abstraction for handling asynchronous computations using a messaging model; you send messages to an actor, which does some computation based on the contents of the message and potentially returns a result. In this case, we have an implicit ActorSystem that is used by spray-client to run an actor which receives the HttpRequest and returns an HttpResponse. The sendReceive method conveniently encapsulates all of this functionality.
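The same non-blocking style can be seen with nothing but the standard library. In this sketch, a hypothetical QuoteClient.fetchRaw simulates the network call (the role sendReceive plays in the spray code), and fetchPrice transforms the eventual response with map rather than waiting for it. The "symbol,price" response format is made up for illustration.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object QuoteClient {
  // Hypothetical stand-in for the HTTP call: the future completes later, on
  // another thread, after a simulated network delay.
  def fetchRaw(symbol: String): Future[String] = Future {
    blocking(Thread.sleep(100)) // simulated network latency
    s"$symbol,123.45"           // made-up response body: "symbol,price"
  }

  // map attaches the parsing step without blocking: fetchPrice returns a new
  // Future immediately, and the parsing runs once the response arrives.
  def fetchPrice(symbol: String): Future[BigDecimal] =
    fetchRaw(symbol).map(body => BigDecimal(body.split(",")(1)))
}
```

Only a consumer that truly needs the value right away should block, as a test does with `Await.result(QuoteClient.fetchPrice("GOOG"), 5.seconds)`; everything upstream stays asynchronous.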

Here is our YQL client in action:

Since this is a test, we define a test ActorSystem to use and then use Await to get the response from the future, but when integrated with spray-can (the HTTP server) it becomes much nicer. In contrast to the Java servlet model where we would have many threads all probably blocked on the network, spray encourages everything to be asynchronous and utilizes a small pool of threads. To that end, the "controllers" you define in spray-can allow futures to be returned and automatically map them to HTTP responses, so any request that requires asynchronous processing will naturally result in a future that is handled entirely by the underlying abstraction. Next time, we'll see how to layer other pieces such as JSON and error handling on top of futures in spray.

Sunday, November 17, 2013

Reactive Programming

There's an interesting idea being pushed forward by the Scala community about the way systems should be built, and it's called reactive programming. The reactive manifesto gives a nice summary of their views on the future of distributed applications and how the way we programmed ten years ago is inadequate in this future. Because of the changing landscape of latency expectations, data scale, and hardware improvements, going forward, systems must be event-driven, scalable, resilient, and responsive in order to meet user needs. There is a significant amount of ongoing Scala development in order to build tools that allow programmers to achieve these properties easily. This is the goal of the akka and spray projects.

The reactive programming model being adopted is based on actors. Given that I am a huge fan of the actor model for how it deals with the problems of concurrency, it is great to see an entirely new ecosystem developing around actors. The Scala community has really invested in actors as the new way in which communication between systems (or even within them) will be done in the future. According to the slides from a recent talk on akka and spray, actors are part of a different programming paradigm that is "well-suited for building reactive systems" because they inherently shun the root of all concurrency evil, shared mutable state. Specifically, using actors instead of the old model of managing threads in servlets avoids the high overhead of such management as well as the difficulties of synchronizing state. In some ways, we can view this as the next step in lightweight request handling: it was once (and still sometimes is) the case that each HTTP request would spawn a process, but that was greatly improved upon by having many threads within a single process handle requests. With actors, a single thread can handle many requests at the same time, allowing resources to be utilized much more efficiently (this is the premise behind Node.js as well).
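The core idea can be illustrated without akka at all. The following is a toy sketch, not akka's API: a hypothetical CounterActor keeps its state private and processes its "mailbox" one message at a time on a single thread, so callers on many threads never share mutable state and never take a lock.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future, Promise}

// Toy actor: private state, touched only by messages processed one at a time.
final class CounterActor {
  private val mailbox =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
  private var count = 0 // only ever read or written on the mailbox thread

  // "Send" a message: enqueue work instead of mutating state directly.
  def increment(): Unit = mailbox.execute(() => count += 1)

  // Ask for the state: the reply arrives asynchronously as a Future.
  def get(): Future[Int] = {
    val reply = Promise[Int]()
    mailbox.execute(() => reply.success(count))
    reply.future
  }

  def shutdown(): Unit = mailbox.shutdown()
}
```

Dedicating a thread per actor is only for illustration (and that thread keeps the JVM alive until shutdown() is called); akka instead multiplexes many actors over a small pool of threads, which is what makes the model lightweight.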

Spray is what you would expect the HTTP stack to look like on top of akka. While I haven't played with it myself, the code examples show that requests are mapped directly to actor messages, so everything is under the unified model of message handling. Judging from this, they have done a pretty good job from a performance standpoint as well, indicating just how lightweight request handling is in spray. Akka and spray together form a solid foundation on which you can build an application that conforms to the principles behind reactive programming. It is exciting to see technology evolving to meet the needs of users, programmers, and applications, and I'm sure that in a few years we'll see how far this paradigm can take us.

Sunday, November 10, 2013

For Comprehensions

In Scala, the concept of a "for loop" doesn't really exist. Instead, we get "for comprehensions," which cover for loops, foreach loops, and more as we will see. Suppose you have the following line of Scala:

for (i <- 0 until 10) yield i

It looks roughly like a for loop with some syntactic sugar. But as I mentioned, Scala doesn't have for loops, and this code actually gets translated by the compiler into

(0 until 10).map(i => i)

The until method is part of the RichInt class, and an implicit conversion from Int to RichInt is available by default in Scala. (0 until 10) produces a Range, and the result of the entire expression is an IndexedSeq[Int]. But what's so interesting about the fact that Scala translates for loops into map calls? It turns out that this translation is generic, i.e. not specific to a certain set of classes like collections. So we can write our own class that exposes a map method and leverage the syntactic sugar of for comprehensions. Here's an example based on a Sequence trait that represents an infinite sequence of integers:

A Sequence only knows how to do two things: get the element at some index and map to a new sequence using a function. We have two simple types of sequences, namely the ArithmeticSequence (whose name should be self-descriptive), and the MappedSequence that takes another sequence and applies a function to each element. The implementations of the two should be straightforward to understand. Now comes the interesting part: because we have implemented the map method, we can use the for comprehension syntax on our Sequence classes.

We start out with a simple ArithmeticSequence, but then we create a new sequence (using a for comprehension) in which every element is squared. And then we create a third sequence which replaces all of the odd elements of the second sequence with 0. All three of these are infinite sequences that we can access random elements of, as demonstrated in the final line.
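Based on the description above, the Sequence classes and the three sequences might look roughly like this. Only the names Sequence, ArithmeticSequence, and MappedSequence come from the text; the get method, the constructor parameters, and the SequenceDemo values are assumptions.

```scala
// An infinite sequence of integers that supports random access and map.
trait Sequence {
  def get(index: Int): Int
  // Implementing map is all a for comprehension with one generator needs.
  def map(f: Int => Int): Sequence = new MappedSequence(this, f)
}

// start, start + step, start + 2 * step, ...
class ArithmeticSequence(start: Int, step: Int) extends Sequence {
  def get(index: Int): Int = start + index * step
}

// Applies f lazily to each element of the underlying sequence.
class MappedSequence(underlying: Sequence, f: Int => Int) extends Sequence {
  def get(index: Int): Int = f(underlying.get(index))
}

object SequenceDemo {
  val naturals = new ArithmeticSequence(0, 1)
  val squares = for (x <- naturals) yield x * x                       // naturals.map(x => x * x)
  val evenSquares = for (x <- squares) yield if (x % 2 != 0) 0 else x // odd elements become 0
}
```

Because MappedSequence applies its function only when get is called, each sequence is infinite yet uses constant memory; for instance, `SequenceDemo.evenSquares.get(8)` returns 64.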

What I've shown here is a primitive example of how for comprehensions can be leveraged by custom classes in Scala. This functionality was brought to my attention in the first week of the reactive programming class on Coursera, which is taught by some of the most knowledgeable Scala folks (including the designer of the language!). It turns out that the map method lets you do simple things like what I did, but if you also implement flatMap and withFilter (which the compiler uses for if guards), you can write arbitrarily complex for comprehensions with multiple variables and conditionals, just like you would with collections. This is a good example of an aspect I like about Scala's philosophy, which is to generalize concepts as much as possible. While many languages treat for loops as their own special construct, Scala simply makes them syntactic sugar for a much richer class of functionality.
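For the multi-generator case, here is a small check using standard collections: a comprehension with two generators and a guard, next to the flatMap, withFilter, and map chain the compiler rewrites it into. Any custom class that exposes those three methods gets the same syntax; the Desugar object is just a container for the example.

```scala
object Desugar {
  val xs = List(1, 2, 3)
  val ys = List(1, 2, 3)

  // Two generators and a guard...
  val sugared = for (i <- xs; j <- ys if i < j) yield (i, j)

  // ...are rewritten into nested flatMap/withFilter/map calls.
  val desugared = xs.flatMap(i => ys.withFilter(j => i < j).map(j => (i, j)))
}
```

Both values are `List((1,2), (1,3), (2,3))`.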