Sunday, December 8, 2013


The virtual currency bitcoin has been getting a lot of hype and news coverage recently, as the price per coin has gone from about \$14 at the beginning of the year to a high of over \$1200 last week, followed by a drop to its current value of between \$700 and \$800 (see here). While nobody really knows the future of bitcoin, whose success depends heavily on regulation, the technical aspects of its implementation are quite interesting. For a basic overview, first check out this explanation from the website; the rest of this post assumes at least a cursory knowledge of it. The original bitcoin paper, which contains all of the technical details I will be explaining, can be found here. The core concept is a virtual currency that does not require a centralized authority to manage it, which naturally makes it much more flexible and reduces the overhead of processing transactions. In order for such a system to work, there must be some source of trust, which is provided by a combination of public-key cryptography and a proof-of-work system.

Bitcoin, at its core, is really just a sequence of transactions between wallets, each of which is signed by the sender. Each wallet has a public key and a private key, and senders sign transactions with their private key such that everyone else can validate the transactions using the public key, as is typical with public-key cryptography. The problem that arises is that there is no easy way to verify that the sender did not "double-spend" their bitcoins, i.e. signing two transactions which send the same bitcoins. As it turns out, the only way to prevent such a situation without a centralized authority is to have a public, linear history of all transactions (known as the block chain). Assuming such a history, every new transaction can be verified to not include double-spending, and we have a working system.
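
To make the double-spend check concrete, here is a minimal sketch in Go of validating transactions against an agreed linear history. The `Tx` and `Ledger` types are hypothetical simplifications of mine: signatures are left out entirely, and each transaction spends exactly one earlier transaction, which is not how real bitcoin transactions are structured.

```go
package main

import (
	"errors"
	"fmt"
)

// Tx is a hypothetical, heavily simplified transaction: it spends exactly one
// earlier transaction (Input) and hands the coins to a new owner.
type Tx struct {
	ID    string
	Input string
	To    string
}

var ErrDoubleSpend = errors.New("input already spent")

// Ledger replays transactions in their agreed order and remembers which
// inputs have already been consumed.
type Ledger struct {
	spentBy map[string]string // input ID -> ID of the transaction that spent it
}

func NewLedger() *Ledger {
	return &Ledger{spentBy: make(map[string]string)}
}

// Apply accepts tx unless its input was already spent earlier in the history.
func (l *Ledger) Apply(tx Tx) error {
	if prev, ok := l.spentBy[tx.Input]; ok {
		return fmt.Errorf("%w: %s was spent by %s", ErrDoubleSpend, tx.Input, prev)
	}
	l.spentBy[tx.Input] = tx.ID
	return nil
}

func main() {
	ledger := NewLedger()
	history := []Tx{
		{ID: "tx2", Input: "tx1", To: "bob"},
		{ID: "tx3", Input: "tx1", To: "carol"}, // the same coins sent a second time
	}
	for _, tx := range history {
		if err := ledger.Apply(tx); err != nil {
			fmt.Println("rejected", tx.ID+":", err)
			continue
		}
		fmt.Println("accepted", tx.ID)
	}
}
```

The check only works because every participant replays the same transactions in the same order, which is exactly what the block chain has to provide.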

So the remaining problem is how a distributed, peer-to-peer system can generate a transaction history that everyone agrees on. The solution is a proof-of-work system that is based on cryptographic hashing. At any point in time, bitcoin miners are continuously brute-forcing through hash computations in order to produce the next block in the chain. A block consists of the previous block hash, a set of transactions, and a nonce; it is only valid if its hash meets the current difficulty, which can be thought of as the number of 0's the hash must begin with (more precisely, the hash must fall below a target value). Miners continually increment the nonce until they produce a block whose hash meets the difficulty accepted by the system (i.e. everyone else). The difficulty is chosen such that blocks are produced roughly every 10 minutes, which is why bitcoin transactions typically take that long to verify. Once a block has been committed to the chain, all the miners will generally accept it and move on to the next block, although there are some additional details around handling blocks that are produced simultaneously. The transactions that are part of the chain form the history from which future transactions can be verified. An example of a recently-produced block can be seen here.
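
The mining loop is simple enough to sketch in a few lines of Go. This is a toy version under my own assumptions: the `Block` layout is made up, it uses a single SHA-256 over concatenated strings, and difficulty is counted in leading hex zeros, whereas real bitcoin hashes a binary block header twice and compares the result against a numeric target.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"strings"
)

// Block is a simplified stand-in for a bitcoin block.
type Block struct {
	PrevHash     string
	Transactions []string
	Nonce        uint64
}

// Hash returns the hex-encoded SHA-256 digest of the block contents.
func (b Block) Hash() string {
	h := sha256.New()
	h.Write([]byte(b.PrevHash))
	for _, tx := range b.Transactions {
		h.Write([]byte(tx))
	}
	var nonce [8]byte
	binary.BigEndian.PutUint64(nonce[:], b.Nonce)
	h.Write(nonce[:])
	return hex.EncodeToString(h.Sum(nil))
}

// Mine increments the nonce until the hash starts with `zeros` hex zeros.
func Mine(b Block, zeros int) Block {
	prefix := strings.Repeat("0", zeros)
	for !strings.HasPrefix(b.Hash(), prefix) {
		b.Nonce++
	}
	return b
}

func main() {
	first := Mine(Block{Transactions: []string{"reward -> alice: 25"}}, 4)
	// The next block commits to the hash of the one before it.
	second := Mine(Block{PrevHash: first.Hash(), Transactions: []string{"alice -> bob: 1"}}, 4)
	fmt.Println(first.Nonce, first.Hash())
	fmt.Println(second.Nonce, second.Hash())
}
```

Each extra hex zero multiplies the expected number of attempts by 16, so the difficulty can be tuned to keep block production near the target rate as hashing power changes.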

There are a couple of remaining points that should be addressed when talking about bitcoin. Firstly, what is the incentive for people to mine (and thus produce the transaction history necessary for the system to function)? Bitcoin mining is the only way to produce new bitcoins; each block has a special transaction that gives the producer of the block some bitcoins, currently 25, although this reward decreases over time since the total number of bitcoins in circulation is capped at 21 million. The other incentive is transaction fees that are collected as payment for keeping the system running. Another issue is that the transaction history seems like it will grow infinitely and become unmanageable. This can be alleviated using the fact that transactions which are used as inputs to other transactions can be garbage-collected using Merkle trees (details in the paper). Lastly, it is important to consider the possibility of attackers producing fake blocks that reverse old transactions. The key point is that reversing an old transaction involves recomputing all of the blocks from that transaction onward since each block is a function of the previous one's hash. As such, the attacker would have to produce transaction history as quickly as the rest of the network combined, which seems to be a strong enough guarantee for people to trust bitcoin.
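
The 21 million cap falls out of the reward schedule. The sketch below relies on two protocol constants that are not mentioned above but that I am confident of: the reward started at 50 bitcoins and halves every 210,000 blocks, with amounts tracked in units of $10^{-8}$ bitcoin (satoshis). The function names are my own.

```go
package main

import "fmt"

const (
	satoshisPerCoin = 100000000
	halvingInterval = 210000
	initialReward   = 50 * satoshisPerCoin
)

// RewardAt returns the block reward, in satoshis, at the given block height.
func RewardAt(height int64) int64 {
	halvings := height / halvingInterval
	if halvings >= 64 {
		return 0
	}
	return int64(initialReward) >> uint(halvings)
}

// TotalSupply sums the rewards of every block until the reward rounds to zero.
func TotalSupply() int64 {
	var total int64
	for reward := int64(initialReward); reward > 0; reward >>= 1 {
		total += reward * halvingInterval
	}
	return total
}

func main() {
	fmt.Println("reward at height 270000 (BTC):", RewardAt(270000)/satoshisPerCoin)
	fmt.Printf("total supply (BTC): %.8f\n", float64(TotalSupply())/satoshisPerCoin)
}
```

Because the rewards form a geometric series, the total converges to just under 21 million, which is why transaction fees eventually have to carry the incentive to mine.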

Whether bitcoin is a fad or not, it has a neat theoretical foundation, and I'm impressed by all of the work people are putting into it ( is super cool). There is certainly room for disruption in the currency and payments industry, and it's only a matter of time before money is a purely virtual concept. Bitcoin also shows us that cryptography is a powerful tool, and it's worth thinking about how we can rely on mathematics for trust instead of other people (as depressing as that might sound).

Thursday, November 28, 2013

Asynchronous HTTP with spray

Last time, I wrote about the high-level ideas behind the Scala libraries akka and spray, namely the concept of reactive programming in order to meet the demands of modern applications. I finally got the chance to play around a bit with spray in order to get a feel for how the theory of event-driven, asynchronous programming manifests itself in code. I started with a very simple web application that takes a stock ticker symbol and uses the Yahoo Query Language (YQL) to look up basic quote information for the company (using this endpoint). Here's some code for a trait that uses spray-http and spray-client to execute a request against the Yahoo API:

You'll notice that I included all of the necessary imports in the example (I normally leave them out), and this is because a lot of things look nicer once imported (e.g. GET vs HttpMethods.GET) and spray uses implicits quite extensively to expose a nice domain-specific language (DSL). So knowing what the imports are is pretty important for figuring out why the code compiles and works properly. Now let's break down the code. The request method is the simplest part, and all it does is take in the YQL query and construct an HTTP request with the appropriate URL and parameters. Spray has some convenient methods for making this easy, but there's nothing too special involved.

The executeQuery method is where all the magic happens. To understand it, we'll need to revisit two concepts: futures and actors. Futures are the Scala way of expressing a value that is computed asynchronously. In this case, executeQuery returns a future because, per the reactive programming principles, we do not want to block the thread while waiting for the request to be sent and the response to be received over the network. This means that when the method returns, it is most likely that the response has not come back, so whoever consumes the result of the future will need to wait for it (e.g. using Await). Actors are an abstraction for handling asynchronous computations using a messaging model; you send messages to an actor, which does some computation based on the contents of the message and potentially returns a result. In this case, we have an implicit ActorSystem that is used by spray-client to run an actor which receives the HttpRequest and returns an HttpResponse. The sendReceive method conveniently encapsulates all of this functionality.

Here is our YQL client in action:

Since this is a test, we define a test ActorSystem to use and then use Await to get the response from the future, but when integrated with spray-can (the HTTP server) it becomes much nicer. In contrast to the Java servlet model where we would have many threads all probably blocked on the network, spray encourages everything to be asynchronous and utilizes a small pool of threads. To that end, the "controllers" you define in spray-can allow futures to be returned and automatically map them to HTTP responses, so any request that requires asynchronous processing will naturally result in a future that is handled entirely by the underlying abstraction. Next time, we'll see how to layer other pieces such as JSON and error handling on top of futures in spray.

Sunday, November 17, 2013

Reactive Programming

There's an interesting idea being pushed forward by the Scala community about the way systems should be built, and it's called reactive programming. The reactive manifesto gives a nice summary of their views on the future of distributed applications and how the way we programmed ten years ago is inadequate in this future. Because of the changing landscape of latency expectations, data scale, and hardware improvements, going forward, systems must be event-driven, scalable, resilient, and responsive in order to meet user needs. There is a significant amount of ongoing Scala development in order to build tools that allow programmers to achieve these properties easily. This is the goal of the akka and spray projects.

The reactive programming model being adopted is based on actors. Given that I am a huge fan of the actor model for how it deals with the problems of concurrency, it is great to see an entirely new ecosystem developing around actors. The Scala community has really invested in actors as the new way in which communication between systems (or even within them) will be done in the future. From some slides for a recent talk on akka and spray, actors are part of a different programming paradigm that is "well-suited for building reactive systems" because they inherently shun the root of all concurrency evil, shared mutable state. Specifically, using actors versus the old model of managing threads in servlets gets around the high overhead of such management as well as the difficulties of synchronizing state. In some ways, we can view this as the next step in lightweight request handling: it was once (and still sometimes is) the case that each HTTP request would spawn a process, but that was greatly improved upon by having many threads within a single process handle requests. With actors, a single thread can handle many requests at the same time, allowing the resources to be much more efficiently utilized (this is the premise behind Node.js as well).
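
To illustrate the "no shared mutable state" idea without depending on akka's API, here is a rough sketch in Go, where a goroutine with a channel as its mailbox stands in for an actor. The `CounterActor` type and its methods are hypothetical; akka actors additionally provide supervision, addressing, and location transparency, none of which is modeled here.

```go
package main

import (
	"fmt"
	"sync"
)

// message is what senders put in the mailbox; reply is nil for fire-and-forget.
type message struct {
	delta int
	reply chan int
}

// CounterActor owns a running total that only its own goroutine ever touches.
type CounterActor struct {
	mailbox chan message
}

func NewCounterActor() *CounterActor {
	a := &CounterActor{mailbox: make(chan message, 16)}
	go func() {
		total := 0 // private state: no locks needed
		for m := range a.mailbox {
			total += m.delta
			if m.reply != nil {
				m.reply <- total
			}
		}
	}()
	return a
}

// Add sends a message and returns without waiting for it to be processed.
func (a *CounterActor) Add(delta int) {
	a.mailbox <- message{delta: delta}
}

// Total asks the actor for its current total and waits for the answer.
func (a *CounterActor) Total() int {
	reply := make(chan int)
	a.mailbox <- message{reply: reply}
	return <-reply
}

func main() {
	actor := NewCounterActor()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			actor.Add(1)
		}()
	}
	wg.Wait()
	fmt.Println(actor.Total())
}
```

Many senders can talk to the actor concurrently, but the state is only ever read or written while handling one message at a time, so there is nothing to synchronize.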

Spray is what you would expect the HTTP stack to look like on top of akka. While I haven't played with it myself, the code examples show that requests are mapped directly to actor messages, so everything is under the unified model of message handling. Judging from this, they have done a pretty good job from a performance standpoint as well, indicating just how lightweight request handling is in spray. Akka and spray together form a solid foundation on which you can build an application that conforms to the principles behind reactive programming. It is exciting to see technology evolving to meet the needs of users, programmers, and applications, and I'm sure that in a few years we'll see how far this paradigm can take us.

Sunday, November 10, 2013

For Comprehensions

In Scala, the concept of a "for loop" doesn't really exist. Instead, we get "for comprehensions," which cover for loops, foreach loops, and more as we will see. Suppose you have the following line of Scala:

for (i <- 0 until 10) yield i

It looks roughly like a for loop with some syntactic sugar. But as I mentioned, Scala doesn't have for loops, and this code actually gets translated by the compiler into

(0 until 10).map(i => i)

The until method is part of the RichInt class, and an implicit conversion from Int to RichInt is available by default in Scala. (0 until 10) produces a Range, and the result of the entire expression is an IndexedSeq[Int]. But what's so interesting about the fact that Scala translates for loops into map calls? It turns out that this translation is generic, i.e. not specific to a certain set of classes like collections. So we can write our own class that exposes a map method and leverage the syntactic sugar of for comprehensions. Here's an example based on a Sequence trait that represents an infinite sequence of integers:

A Sequence only knows how to do two things: get the element at some index and map to a new sequence using a function. We have two simple types of sequences, namely the ArithmeticSequence (whose name should be self-descriptive), and the MappedSequence that takes another sequence and applies a function to each element. The implementations of the two should be straightforward to understand. Now comes the interesting part: because we have implemented the map method, we can use the for comprehension syntax on our Sequence classes.

We start out with a simple ArithmeticSequence, but then we create a new sequence (using a for comprehension) in which every element is squared. And then we create a third sequence which replaces all of the odd elements of the second sequence with 0. All three of these are infinite sequences that we can access random elements of, as demonstrated in the final line.

What I've shown here is a primitive example of how for comprehensions can be leveraged by custom classes in Scala. This functionality was brought to my attention in the first week of the reactive programming class on Coursera, which is taught by some of the most knowledgeable Scala folks (including the designer of the language!). It turns out that the map method lets you do simple things like what I did, but if you implement flatMap and filter, you can do arbitrarily complex for comprehensions with multiple variables and conditionals just like you would with collections. This is a good example of an aspect I like about Scala's philosophy, which is to generalize concepts as much as possible. While many languages treat for loops as their own special construct, Scala simply makes them syntactic sugar for a much richer class of functionality.

Sunday, November 3, 2013

Gumball Technique

Using memcached on top of a traditional relational database (RDBMS) has become the most popular way of building large-scale web services, as it is both simple and fast in the majority of cases. Loads are typically read-heavy because for each piece of content produced, many people will consume it. Moreover, the most popular pieces of content will account for the majority of traffic. As such, having an in-memory caching layer that is able to efficiently serve the mass of read requests to the same data will naturally become standard. Cache consistency, however, is known to be a hard problem; in this case it involves synchronizing the state of two data stores that do not provide any transactional guarantees between them. Consider the following code for a client that talks to both memcached and an RDBMS.

Assuming that the cache and the RDBMS expose the same interface as the client itself, this code for handling puts and gets is straightforward. On a get, check if the cache has the value; if not, go to the database and cache the value retrieved if it exists. On a put, issue a put on the database and invalidate the entry in the cache. In barely 10 lines of real code, we already see a race condition that can result in stale cache data. For example, suppose the database currently has the pair ("k", "v1"), the cache does not have "k", and two clients A and B are simultaneously getting the key "k" and putting the pair ("k", "v2"), respectively. Then the following sequence is possible:
  1. A sees that the cache does not contain the key "k".
  2. A gets the value "v1" from the RDBMS.
  3. B puts the pair ("k", "v2") into the RDBMS.
  4. B removes the key "k" from the cache (was not there in the first place).
  5. A caches the pair ("k", "v1") and returns.
So client A receives the value "v1", which is fine, but it also leaves that value in the cache after B has already updated it, meaning that future gets for key "k" will return stale data. It turns out that using memcached on top of an RDBMS is not quite as trivial as the code snippet above; to fix this race condition we can leverage the gumball technique.
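
To make the race concrete, here is a sketch in Go of such a client, together with the interleaving above replayed by hand against two in-memory maps. The `Store` interface, `MapStore`, and `ReplayRace` are names I made up for illustration; a real client would of course talk to memcached and an RDBMS over the network.

```go
package main

import "fmt"

// Store is the interface assumed to be shared by the cache, the RDBMS, and the client.
type Store interface {
	Get(key string) (string, bool)
	Put(key, value string)
	Remove(key string)
}

// MapStore is an in-memory stand-in for either backend (not safe for concurrent use).
type MapStore map[string]string

func (m MapStore) Get(key string) (string, bool) { v, ok := m[key]; return v, ok }
func (m MapStore) Put(key, value string)         { m[key] = value }
func (m MapStore) Remove(key string)             { delete(m, key) }

// Client reads through the cache and invalidates it on writes.
type Client struct {
	Cache, DB Store
}

func (c *Client) Get(key string) (string, bool) {
	if v, ok := c.Cache.Get(key); ok {
		return v, true
	}
	v, ok := c.DB.Get(key)
	if ok {
		c.Cache.Put(key, v)
	}
	return v, ok
}

func (c *Client) Put(key, value string) {
	c.DB.Put(key, value)
	c.Cache.Remove(key)
}

// ReplayRace performs the five steps in order and reports what ends up
// in the cache and in the database.
func ReplayRace() (cached, stored string) {
	db := MapStore{"k": "v1"}
	cache := MapStore{}

	_, hit := cache.Get("k") // 1. A misses in the cache
	stale, _ := db.Get("k")  // 2. A reads "v1" from the RDBMS
	db.Put("k", "v2")        // 3. B writes "v2" to the RDBMS
	cache.Remove("k")        // 4. B invalidates the (absent) cache entry
	if !hit {
		cache.Put("k", stale) // 5. A caches the value it read earlier
	}

	cached, _ = cache.Get("k")
	stored, _ = db.Get("k")
	return cached, stored
}

func main() {
	cached, stored := ReplayRace()
	fmt.Println("cache:", cached, "db:", stored)
}
```

After the replay the cache holds "v1" while the database holds "v2", and nothing in the client will ever correct that until the key is written again or evicted.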

The idea behind the gumball technique is clever: the cache server itself sees all of these operations (the get that misses, the remove, and then the put from the original getter), so it can deduce which ones will result in stale data. The only change to the protocol itself is to return a miss time $T_m$ (server time) on the first get and require the client to pass $T_m$ with the corresponding put, which we store along with the value in the cache. The rest of the work is done by the cache server itself and consists of two parts: managing "gumball" entries and rejecting certain put operations. Gumball entries are timestamped removal markers; every time a client issues a remove on a key, instead of just deleting the key, the server stores a gumball carrying the timestamp of the removal (again, server time). The time-to-live (TTL) of the entry, which we will refer to as $\Delta$, is determined in a special way, but we can assume it is just some value that exceeds the typical length of an RDBMS query.

The tricky part of the algorithm comes when a put operation is issued (with a corresponding $T_m$ value). Let $T_C$ be the current time on the cache server. If $T_C - T_m > \Delta$, we ignore the put operation because a gumball entry could have been created after the miss and already expired, in which case the server can no longer tell whether the value is stale. This is why $\Delta$ should be larger than the typical get/put cycle. If there is a gumball entry whose timestamp exceeds $T_m$, then it is likely that the race condition occurred and we should ignore the put operation. Lastly, if there is an actual value with miss time greater than $T_m$, we also have to ignore the put operation, as the other put may have overwritten a gumball entry that would have invalidated the current put. If none of these conditions holds, then we insert the key/value pair into the cache as normal (along with $T_m$).
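
The three rejection rules translate fairly directly into code. Below is my own single-threaded sketch in Go of the server-side logic, not the paper's implementation: the `GumballCache` type, its method names, and the injected clock are assumptions, $\Delta$ is a fixed constant rather than being adjusted dynamically, and a real server would need locking.

```go
package main

import "fmt"

type entry struct {
	value   string
	gumball bool
	stamp   int64 // gumball: removal time; value: miss time T_m of the put that stored it
	expires int64 // gumballs only: stamp + delta
}

type GumballCache struct {
	delta   int64
	now     func() int64 // server clock, injected so the example is deterministic
	entries map[string]entry
}

func NewGumballCache(delta int64, now func() int64) *GumballCache {
	return &GumballCache{delta: delta, now: now, entries: make(map[string]entry)}
}

// lookup returns the live entry for key, dropping an expired gumball first.
func (c *GumballCache) lookup(key string) (entry, bool) {
	e, ok := c.entries[key]
	if ok && e.gumball && c.now() >= e.expires {
		delete(c.entries, key)
		return entry{}, false
	}
	return e, ok
}

// Get returns the value on a hit, or the miss time T_m on a miss.
func (c *GumballCache) Get(key string) (value string, hit bool, missTime int64) {
	if e, ok := c.lookup(key); ok && !e.gumball {
		return e.value, true, 0
	}
	return "", false, c.now()
}

// Remove leaves a timestamped gumball instead of simply deleting the key.
func (c *GumballCache) Remove(key string) {
	t := c.now()
	c.entries[key] = entry{gumball: true, stamp: t, expires: t + c.delta}
}

// Put stores the value unless one of the three rules flags it as possibly stale.
func (c *GumballCache) Put(key, value string, missTime int64) bool {
	if c.now()-missTime > c.delta {
		return false // too old: a gumball may have come and gone since the miss
	}
	if e, ok := c.lookup(key); ok && e.stamp > missTime {
		return false // a newer gumball or a newer value is already present
	}
	c.entries[key] = entry{value: value, stamp: missTime}
	return true
}

func main() {
	var clock int64
	cache := NewGumballCache(10, func() int64 { return clock })

	clock = 1
	_, _, tm := cache.Get("k") // A misses and receives T_m = 1
	clock = 2
	cache.Remove("k") // B updates the RDBMS and removes "k", leaving a gumball
	clock = 3
	fmt.Println("stale put accepted:", cache.Put("k", "v1", tm))
}
```

In the replayed race, A's put arrives with $T_m = 1$ but finds a gumball stamped 2, so it is dropped and the next get goes back to the database.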

These steps guarantee that the cache will never have a stale entry that persists following an update or delete, which brings back the consistency provided by an RDBMS in the first place. The paper goes into a bit more detail about how to choose $\Delta$ properly via a sliding window algorithm with some buffer, but the exact value is not important for correctness. It also discusses the performance impact, which seems to be negligible under normal loads, but does depend on how long the RDBMS queries take relative to the frequency of gets. The gumball technique is a nice example of leveraging minimal centralization and carefully observing access patterns to produce a simple way of ensuring distributed consistency. It would be great if memcached implementations actually started doing this, but since it requires an API change, it will probably not become widespread anytime soon. And since many websites don't actually care about this level of consistency, it is hard to say whether this will ever become standard.

Sunday, October 20, 2013


Apache Kafka is a distributed messaging system developed by LinkedIn. It functions similarly to a message queue, but is more specialized for the simultaneous use-cases of real-time and batch log processing. In particular, Kafka emphasizes throughput, scalability, and consumer flexibility over supporting some of the more traditional message queue features, e.g. the Java Message Service (JMS) standard. This paper describes the architecture of Kafka and how it is used at LinkedIn. We'll take a look at a handful of design and implementation decisions in Kafka that enable it to be very efficient and flexible.

At a high level, Kafka follows a messaging model similar to that of other message queues. Producers send messages to "topics," which are essentially namespaces for the messages to live in, and consumers subscribe to the topics that they wish to receive messages from. The first set of implementation details that enable Kafka to be efficient come from within a single server (referred to as a "broker"). Each broker stores some set of partitions for a topic, so that the messages can be distributed among multiple brokers (we'll see how the partitions play into the consumer side later). Each partition can be thought of as a set of messages appended to each other and broken up into files of reasonable size (the paper uses 1GB as the example). Like in many other systems, inserts in Kafka are implemented as appending the message to the end of a file, which is efficient using commodity hardware. The authors, however, choose not to do any in-memory caching of the messages from disk. The reasons are twofold: firstly, the standard OS page cache is quite effective for the access patterns being used (i.e. write-through and read-ahead policies), and secondly, they leverage the Linux sendfile API to send data directly from the file to the socket, avoiding the overhead of having the data live in user space. An interesting effect of this is that the Kafka process itself does not manage a lot of memory, so it is less prone to garbage collection problems and restarts since most of the memory is handled by the OS. These optimizations enable Kafka to have excellent performance on a single broker.

Kafka does interesting things on the consumer side as well. In traditional message queues, the server is typically in charge of keeping track of which messages have been sent to which consumers based on the acknowledgments it has received. Kafka inverts this model and lets consumers request messages from whatever position they wish to. There are a few issues that this can run into. Brokers can no longer clean up messages that they know have been consumed because consumers can always request older ones. This is solved by Kafka having a retention model of a fixed period of time, e.g. 7 days; because of how the partitions are implemented within a single broker, performance does not degrade as partitions grow (unlike with many message queue implementations). This also enables the different patterns of consumption that Kafka was built to handle. Real-time services will consume messages as soon as they appear, while data warehousing and analytics services can, for example, batch consume once per day. The other effect of brokers not controlling message delivery is that consumers who wish to distribute the messages of a topic among themselves (so that each message is consumed once) must coordinate in some way. Kafka achieves this by using the existing highly-available service ZooKeeper. Both consumers and brokers must talk to ZooKeeper in order to coordinate which consumers are responsible for which partitions of a topic and what offset each partition is currently at. So the logic for consumption progress is external to the implementation of a single partition, which allows that to be very simple and efficient as described above.
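
The consumer-driven model is easy to see in a toy version. The following Go sketch uses hypothetical `Partition` and `Consumer` types and is not Kafka's actual API: messages live in an in-memory slice, offsets are simple indices, and retention and ZooKeeper coordination are left out.

```go
package main

import "fmt"

// Partition is an append-only sequence of messages. The broker side keeps
// no record of who has read what.
type Partition struct {
	messages []string
}

// Append adds a message to the end of the log and returns its offset.
func (p *Partition) Append(msg string) int {
	p.messages = append(p.messages, msg)
	return len(p.messages) - 1
}

// Read returns up to max messages starting at the requested offset.
func (p *Partition) Read(offset, max int) []string {
	if offset < 0 || offset >= len(p.messages) {
		return nil
	}
	end := offset + max
	if end > len(p.messages) {
		end = len(p.messages)
	}
	return p.messages[offset:end]
}

// Consumer tracks its own position, so it can read at its own pace or rewind.
type Consumer struct {
	partition *Partition
	Offset    int
}

// Poll fetches the next batch and advances the consumer's offset past it.
func (c *Consumer) Poll(max int) []string {
	batch := c.partition.Read(c.Offset, max)
	c.Offset += len(batch)
	return batch
}

func main() {
	p := &Partition{}
	realtime := &Consumer{partition: p}
	nightly := &Consumer{partition: p}

	for _, msg := range []string{"click", "view", "purchase"} {
		p.Append(msg)
		fmt.Println("realtime:", realtime.Poll(10)) // consumes each message as it appears
	}
	fmt.Println("nightly:", nightly.Poll(10)) // consumes everything in one batch

	realtime.Offset = 0 // nothing stops a consumer from re-reading old messages
	fmt.Println("replay:", realtime.Poll(1))
}
```

Since the partition never needs to know about its consumers, supporting both a real-time reader and a once-a-day batch reader costs the broker nothing extra.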

Messaging systems often form the core of a distributed service, so reliability, scalability, and throughput are of the utmost importance. Having had some less-than-ideal experiences with existing message queues, it is refreshing to see a new approach in Kafka. While it does not satisfy the most stringent durability requirements, Kafka makes up for it with excellent performance and the ability to distribute effectively. It is also an example of how we often do not need to build our own locking/consensus services anymore because ZooKeeper provides a flexible API and strong guarantees on both consistency and availability. I anticipate that future messaging systems will make many of the same decisions that Kafka made in order to satisfy growing data volumes and the fundamentally distributed nature of systems.

Sunday, October 13, 2013

Google Go (Part 3, Types)

In the previous two posts, I introduced Google's Go programming language and the model of concurrent programming that it adopts. We've seen that Google has made a number of interesting choices in the design of the language, and their approach to types is yet another. There are two key parts that define how types work in Go: "static duck typing" and the absence of inheritance. The former is not too unusual, as most dynamic languages have duck typing and Scala, for example, has structural typing. Completely throwing away the concept of inheritance, however, is a bold statement about how object-oriented programming should be done. Let's take a look at each of these in a little more detail.

Duck typing is prevalent nowadays, with dynamic languages like JavaScript and Python being so popular. To some people, however, it might be comforting to have the guarantee that when your code runs it won't suddenly crash due to a method not existing when you expected it to. That's the guarantee of a statically typed language, but the concept is often associated with more "burdensome" programming languages such as C++ and Java. Go is a statically typed language, but it provides this guarantee without an explicit type hierarchy; as long as an object presents the methods in an interface, it can be considered an implementation of the interface (hence duck typing). For example:
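
A minimal sketch of what such code could look like follows. The type names UnaryFunction, UnaryInvertible, PlusOne, and MinusOne are the ones discussed in this post, but the method names `Apply` and `Invert` and the helper functions are my assumptions.

```go
package main

import "fmt"

// UnaryFunction is anything that maps an int to an int.
type UnaryFunction interface {
	Apply(x int) int
}

// UnaryInvertible is anything that can produce its own inverse.
type UnaryInvertible interface {
	Invert() UnaryFunction
}

// Neither struct mentions the interfaces anywhere in its definition.
type PlusOne struct{}
type MinusOne struct{}

func (PlusOne) Apply(x int) int        { return x + 1 }
func (PlusOne) Invert() UnaryFunction  { return MinusOne{} }
func (MinusOne) Apply(x int) int       { return x - 1 }
func (MinusOne) Invert() UnaryFunction { return PlusOne{} }

// ApplyTwice accepts any value with an Apply method.
func ApplyTwice(f UnaryFunction, x int) int {
	return f.Apply(f.Apply(x))
}

// Undo accepts any value with an Invert method.
func Undo(f UnaryInvertible, y int) int {
	return f.Invert().Apply(y)
}

func main() {
	p := PlusOne{}
	fmt.Println(ApplyTwice(p, 1)) // p is used as a UnaryFunction
	fmt.Println(Undo(p, 3))       // the same p is used as a UnaryInvertible
}
```

If a method were missing or had the wrong signature, the calls in `main` would fail at compile time rather than at run time.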

Here we have two interfaces, UnaryFunction and UnaryInvertible, which each have a single method. Then we have two classes/structs PlusOne and MinusOne which implement both interfaces (not explicitly specified). Note that we are able to pass around instances of PlusOne and MinusOne as either UnaryFunction or UnaryInvertible objects because the Go compiler is able to determine that they satisfy the signatures of the interfaces. So while it feels like we are doing dynamic typing without any guarantee that methods exist, the compiler is simply inferring the type relationships that we intended. This is cool because it lets programmers design interfaces around the true functionality without having to worry about how to deal with existing code or polluting the codebase with "implements" statements (and is still type safe!). For details as to how Go implements method dispatching efficiently, refer to this blog post.

Now on to the bigger question: why did the designers of Go decide to get rid of inheritance altogether? It is well-known that programmers often use inheritance when they should instead be using composition. In fact, the idea of using composition over inheritance is important enough to have a Wikipedia article of its own. I found an excellent summary of when to choose composition versus inheritance in this Stack Overflow post, which essentially says that inheritance should only be used when you want the complete public interface of the superclass. The Liskov substitution principle is a formal specification of this rule of thumb and makes the requirements for inheritance very strict: "in a computer program, if S is a subtype of T, then objects of type T may be replaced with objects of type S (i.e., objects of type S may be substituted for objects of type T) without altering any of the desirable properties of that program (correctness, task performed, etc.)." In theory, this is quite sensible: inheritance is supposed to specify an "is-a" relationship, so an instance of the subclass should be able to be treated as an instance of the superclass in all cases.

The designers of Go believe that forcing programmers to use composition instead of inheritance in all cases results in code that is less brittle and adapts better to new requirements. Specifically, "the [type] hierarchy must be designed early, often as the first step of designing the program, and early decisions can be difficult to change once the program is written. As a consequence, the model encourages early overdesign as the programmer tries to predict every possible use the software might require, adding layers of type and abstraction just in case. This is upside down. The way pieces of a system interact should adapt as it grows, not be fixed at the dawn of time." I understand the problems that they are addressing having dealt with type hierarchies that grow and grow until they become impossible to understand. The idea of decoupling independent pieces of functionality that come together to produce a class fits the mantra of modularity being the key to building scalable programs, and perhaps Go taking such an extreme stance is a message to us saying that we're not programming in the correct way.
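
In practice, composition in Go is usually expressed through struct embedding, which promotes the methods of the embedded types without creating an "is-a" relationship. Here is a small sketch with made-up `Logger`, `Counter`, and `Service` types:

```go
package main

import "fmt"

// Logger and Counter are two independent pieces of functionality.
type Logger struct {
	prefix string
	Lines  []string
}

func (l *Logger) Log(msg string) {
	l.Lines = append(l.Lines, l.prefix+msg)
}

type Counter struct {
	n int
}

func (c *Counter) Inc() int {
	c.n++
	return c.n
}

// Service is composed of a Logger and a Counter. Their methods are promoted,
// so s.Log and s.Inc work, but a Service is neither a Logger nor a Counter.
type Service struct {
	*Logger
	Counter
}

func NewService(prefix string) *Service {
	return &Service{Logger: &Logger{prefix: prefix}}
}

func (s *Service) Handle(req string) {
	n := s.Inc()
	s.Log(fmt.Sprintf("request %d: %s", n, req))
}

func main() {
	s := NewService("[svc] ")
	s.Handle("GET /")
	s.Handle("GET /about")
	for _, line := range s.Lines {
		fmt.Println(line)
	}
}
```

Swapping out the logger or dropping the counter later only touches `Service`, since no other type depends on a hierarchy that includes them.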

This wraps up my series of posts on Go. I think Google has done a great job of identifying a number of important areas of programming that could use some innovation. Go puts forth some options for how we can improve things, but they are certainly not the only ones available. I am very keen on learning about whether people have reaped the expected benefits of Go's design decisions and which ones are the most effective. In the future, I am sure we will see new and existing languages try to incorporate some of the ideas behind Go that seem strange now but may turn out to be essential.

Thursday, October 10, 2013

Google Go (Part 2, Concurrency)

Last time, I wrote about some of the interesting and unique design decisions regarding dependencies and imports in Google's Go programming language. This time, we're back to my favorite topic: concurrency. Because essentially all machines have multiple cores/processors now, every new language has to address the problem of concurrent programming in some way and Go is no exception. Recall that the guiding principle behind the design of Go is its focus on optimizing the process of building large software systems, which includes things like developer productivity and tool support. The result of this is that Go adopts a communicating sequential processes (CSP) approach to concurrency, which "has the property that it is easy to add to a procedural programming model without profound changes to that model." This is a little disappointing when you consider the options out there, but given Google's preference for familiarity and the fact that they want to be building real systems quickly using Go, the choice is not too surprising. That said, it is still worthwhile to explore how it integrates into the language and what its strengths and weaknesses are.

Concurrency in Go is built around two concepts: goroutines and channels (note that the code examples below are roughly taken from these links). A goroutine is a fairly simple concept: it simply allows you to spawn a function that runs concurrently. For example, the statement go list.Sort() spawns a goroutine that sorts the list. The nice thing about this is that Go abstracts away the details of creating and managing threads while still efficiently utilizing the CPU. It is able to identify when a goroutine is blocking on I/O, for example, and ensure that the OS thread that is executing that goroutine is freed up to execute a different one. A world in which functions run concurrently and independently is great, but the interesting stuff comes when they need to communicate, which leads us to channels. It turns out that they are not very complex either; a channel is more or less just a blocking queue that multiple goroutines can take from and put into. It allows them to synchronize at certain points and send messages to each other. For example, if you wanted to wait for a goroutine to finish (e.g. the equivalent of joining a thread in Java), you would do something like this:
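
A sketch of that pattern, modeled on the example in Effective Go, is below. I use `sort.Ints` on a slice in place of `list.Sort()`, and the `SortAsync` wrapper is my own addition to keep the example self-contained.

```go
package main

import (
	"fmt"
	"sort"
)

// SortAsync sorts the list in a goroutine and returns a channel that
// receives a value once the sort has finished.
func SortAsync(list []int) chan int {
	c := make(chan int) // unbuffered channel
	go func() {
		sort.Ints(list)
		c <- 1 // signal completion; the value itself is arbitrary
	}()
	return c
}

func main() {
	list := []int{5, 2, 4, 1, 3}
	c := SortAsync(list)
	// ... do other work here while the sort runs ...
	<-c // block until the goroutine signals; the received value is discarded
	fmt.Println(list)
}
```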

In the above code snippet, the c <- 1 line sends the value 1 (arbitrary) to the channel, while the <-c line blocks on receiving a value from the channel (which is ignored). By default channels have no buffer (imagine a blocking queue of size 0) so sends and receives wait for each other, but they can be given a capacity as we will see next.

Let's look at a slightly more involved example with a channel being used as a semaphore. In the code below we simulate a system that is processing requests from a queue; we would like to process requests in parallel, but limit the maximum number of simultaneous requests being processed (i.e. a typical semaphore use case).

The number of elements in the channel indicates how many permits the semaphore has available, so getting a permit is receiving from the channel and releasing a permit is sending to the channel. We spawn a goroutine for each request in the queue (which is itself a channel) that processes the request and releases a permit.

The more interesting thing about this example, however, is that it contains an insidious bug. You might expect the closure that the goroutine executes to capture the current request that is being dispatched, but unfortunately it doesn't. In this case, the language closes over the variable req itself rather than its value at the time of dispatch, which means that there is a data race and no guarantee about which requests are actually processed. Each goroutine will process whatever request the req variable holds when it executes instead of when it was created. It is unfortunate that such a common pattern has a pitfall like this, and the pain is evidenced by numerous mentions of the problem (e.g. here and here) that suggest rather ugly solutions like defining another variable in the loop body or passing the value explicitly as a parameter to the goroutine.

As far as simple and familiar models of concurrent programming go, CSP based on goroutines and channels is not bad. The danger really lies in the fact that Go has a memory model similar to that of C++ (i.e. pointers and references) which can lead to bad code with the use of channels. The article explains it as, "Go enables simple, safe concurrent programming but does not forbid bad programming." This feels accurate to me and may be a practical choice for the language, but given our understanding of how easy it is to "program badly," one might wish that Go took a more progressive approach in addressing the problems around concurrency.

Sunday, October 6, 2013

Google Go (Part 1)

I recently started learning more about Google's Go programming language, which has been on the radar for several years now and seems to be gaining more traction. I found this article to be a very insightful piece about the key design decisions and goals behind the language. At the philosophical level, Go is less about developing an "ideal" way to program like Clojure or Scala and more about being the most practical language for large-scale systems engineering. It does not attempt to revolutionize the programming model but rather optimizes for the software engineering cycle and how systems are built in the real world. To quote the article, "The goals of the Go project were to eliminate the slowness and clumsiness of software development at Google, and thereby to make the process more productive and scalable. The language was designed by and for people who write—and read and debug and maintain—large software systems." Coming from the company that builds the most large software systems in the world, that shouldn't be all too surprising, and the problems that Go tries to address are well worth understanding.

One of the focal points in the design of Go is dependencies, which might seem like a strange thing to get worked up over, but turns out to have a big effect on compilation time, especially if you're coming from the C++ world (which much of Google is). I won't go into too much detail, but this section in the article details the problems that Google experienced while building C++ binaries that caused header files to be read by the compiler tens of thousands of times. Most modern programming languages have already found ways to be more efficient when importing dependencies, but it's interesting to note that Go was observed to be around fifty times better than C++ in this area while having a similar import model. The more unique design choice that they made regarding imports was to require the package name when referencing anything from an external package. For example, here's a quick JSON parsing example:
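A small sketch in that spirit, using only the standard library. The helper decodeName and the input document are assumptions for illustration; json.NewDecoder and strings.NewReader are the real standard-library functions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// decodeName (hypothetical helper) parses a JSON object and returns the
// string stored under its "name" key.
func decodeName(input string) (string, error) {
	var v map[string]interface{}
	// Every external identifier is qualified by its package name.
	if err := json.NewDecoder(strings.NewReader(input)).Decode(&v); err != nil {
		return "", err
	}
	name, ok := v["name"].(string)
	if !ok {
		return "", fmt.Errorf("missing string field %q", "name")
	}
	return name, nil
}

func main() {
	name, err := decodeName(`{"name": "gopher", "age": 4}`)
	fmt.Println(name, err)
}
```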

Three packages are imported at the top: json, fmt, and strings. Note that we do not import a specific type or function, but rather an entire package. Any references to types or functions from that package require you to scope them with the package name, e.g. json.NewDecoder or strings.NewReader. I am a big fan of this "limitation" because it avoids name collisions and makes it clear which dependencies are being used where. This is particularly important in Go because the compiler is very strict about which dependencies are imported; an unused dependency is actually a compile error in the language (another thing I am a big fan of). I believe that Go's somewhat unorthodox choices on how to handle dependencies make the code more understandable by both humans and computers.

To wrap up the discussion of imports, let's look at another unusual choice regarding naming, specifically capitalization. While most languages have no requirements and rely on conventions for capitalization (Erlang being a notable exception), Go decides to use capitalization as a first-class differentiator for scoping. That is, uppercase names (e.g. NewDecoder and NewReader in the above example) are visible to clients of the package while lowercase ones are not. While this takes some getting used to, the overall benefit to readability seems clear; instead of scattering public and private like in Java or re-declaring public signatures like in C++, programmers simply need to pay attention to the case of a type or function name to determine its scope. To quote the article again, "The name is, after all, what clients of the package use; putting the visibility in the name rather than its type means that it's always clear when looking at an identifier whether it is part of the public API. After using Go for a while, it feels burdensome when going back to other languages that require looking up the declaration to discover this information."

Within just dependencies and naming, Go already shows many signs of deviating from the norm in order to achieve its singular goal: programmer productivity when building large-scale systems. In the future, I'll explore additional unique aspects of Go, including some of the more fundamental language semantics like inheritance (or, rather, the lack thereof), concurrency, and garbage collection.

Monday, September 23, 2013

Erlang and Riak

Today I learned that Riak, a distributed database based on Amazon's Dynamo, is written in Erlang. This is really cool because I personally have some experience with Apache Cassandra, which is another Dynamo-like distributed database written in Java. Specifically, when I worked with Cassandra, I encountered multiple concurrency bugs that were very difficult to track down and sometimes quite insidious. That's not to fault the writers of Cassandra; it is a complex, multithreaded system that has a lot of moving parts, which is exactly the type of code that led me to write about the difficulty of multithreading. Being written in Erlang, however, Riak should be able to escape some of the reliability issues that arise when dealing with shared, mutable state in Java. I am particularly curious if Riak is more stable than Cassandra, but this information seems hard to come by.

I did stumble across this blog post evaluating the choice of Erlang for Riak five years later. The folks at Basho (company behind Riak) seem to be happy with it, and their rationale for choosing Erlang lines up pretty well with my expectations. Two of the more interesting benefits they describe are that "a 'many-small-heaps' approach to garbage collection would make it easier to build systems not suffering from unpredictable pauses in production" and "the ability to inspect and modify a live system at run-time with almost no planning or cost." The former hits home again with my Cassandra experience, as Java garbage collection on a 16GB heap is not nearly as predictable as one would hope, and the latter is certainly valuable for fast-paced startups and the (in)famous "move fast, break things" philosophy. In the end, I am just happy to see that there are startups out there who have sought alternatives to the concurrent programming hell and built some neat systems as a result of it.

Tuesday, September 17, 2013


One of the core principles of designing good software systems is composability, the idea being that it should be easy to combine and reuse different components. This has all sorts of benefits, but one of the most important is that it becomes possible for programmers to understand a complex codebase as the composition of simple parts and their interactions. Without some degree of composability and modularity, it would not be possible to build reliable systems over the course of years involving hundreds of engineers because the code simply becomes too complex to understand. Fortunately, people have gotten better at doing this, and a key part of making components easy to compose is the concept of idempotence. It is the simple idea that, when possible, functions should be written in such a way that when you call them multiple times in succession with the same arguments the result does not change. For example, putting a value in a set is idempotent while appending to a list is not since in the latter case the list will keep growing with each append.

Here's an example of an interface which is not guaranteed to be idempotent (BadService) and the resulting attempt to deal with two instances of that interface:
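A sketch of what that looks like, written in Go. The exact contract assumed for BadService (Start fails if the service is already running, Stop fails if it is not) and the strictService stand-in are assumptions for illustration, not the original listing.

```go
package main

import (
	"errors"
	"fmt"
)

// BadService is not idempotent (assumed contract): Start fails when the
// service is already running and Stop fails when it is not.
type BadService interface {
	Start() error
	Stop() error
}

// strictService is a hypothetical BadService used only for the demo.
type strictService struct{ running bool }

func (s *strictService) Start() error {
	if s.running {
		return errors.New("already running")
	}
	s.running = true
	return nil
}

func (s *strictService) Stop() error {
	if !s.running {
		return errors.New("not running")
	}
	s.running = false
	return nil
}

// BadServiceManager has to mirror the state of each service so that it
// never calls Start or Stop at the wrong time.
type BadServiceManager struct {
	a, b               BadService
	aRunning, bRunning bool
}

func (m *BadServiceManager) Start() error {
	if !m.aRunning {
		if err := m.a.Start(); err != nil {
			return err
		}
		m.aRunning = true
	}
	if !m.bRunning {
		if err := m.b.Start(); err != nil {
			return err
		}
		m.bRunning = true
	}
	return nil
}

func (m *BadServiceManager) Stop() error {
	if m.aRunning {
		if err := m.a.Stop(); err != nil {
			return err
		}
		m.aRunning = false
	}
	if m.bRunning {
		if err := m.b.Stop(); err != nil {
			return err
		}
		m.bRunning = false
	}
	return nil
}

func main() {
	m := &BadServiceManager{a: &strictService{}, b: &strictService{}}
	// Repeated calls succeed only because the manager tracks state itself.
	fmt.Println(m.Start(), m.Start(), m.Stop(), m.Stop())
}
```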

Here, the BadServiceManager is responsible for starting and stopping two BadServices. As you can see, it requires some very ugly state-management that only gets worse as you have more and more components to bring together. The natural way that this should happen with idempotence guarantees is as follows:
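A sketch of the idempotent version, again in Go. The Service contract (repeating Start or Stop leaves the service in the same state) and the flagService stand-in are assumptions for illustration.

```go
package main

import "fmt"

// Service promises idempotent Start and Stop (assumed contract): calling
// either one repeatedly leaves the service in the same state.
type Service interface {
	Start() error
	Stop() error
}

// flagService is a hypothetical idempotent Service used only for the demo.
type flagService struct{ running bool }

func (s *flagService) Start() error { s.running = true; return nil }
func (s *flagService) Stop() error  { s.running = false; return nil }

// ServiceManager needs no bookkeeping: it simply forwards each call.
type ServiceManager struct{ a, b Service }

func (m *ServiceManager) Start() error {
	if err := m.a.Start(); err != nil {
		return err
	}
	return m.b.Start()
}

func (m *ServiceManager) Stop() error {
	if err := m.a.Stop(); err != nil {
		return err
	}
	return m.b.Stop()
}

func main() {
	a, b := &flagService{}, &flagService{}
	m := &ServiceManager{a: a, b: b}
	m.Start()
	m.Start() // harmless repeat
	fmt.Println(a.running, b.running)
}
```

If Start fails halfway through, the caller can just call it again; there is no partial state for the manager to remember.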

In terms of local systems, reduced complexity is the primary reason that you would like idempotence. If that is not compelling enough, as soon as we start working with distributed systems the benefits become very clear cut. Consider a remote procedure call (RPC) made by machine A to machine B. Machine A receives a network timeout while making the call and is now in a predicament; did machine B execute the call or not? This is a very difficult question to answer generically, but idempotence can help. In the case of a network timeout, machine A simply retries the RPC until it receives a successful response from machine B because idempotence guarantees that any repeated calls will not affect the state. This greatly simplifies the job of the RPC client and makes it very natural for distributed components to communicate with each other robustly.
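The retry argument can be sketched in a few lines of Go. Everything here is a toy model: server stands in for machine B, flakyClient simulates responses that are lost after the call has already been applied, and retry is a hypothetical helper. A real client would also add backoff and a deadline.

```go
package main

import (
	"errors"
	"fmt"
)

var errTimeout = errors.New("network timeout")

// server models machine B. put is idempotent: storing the same key twice
// leaves the set unchanged.
type server struct{ set map[string]bool }

func (s *server) put(key string) { s.set[key] = true }

// flakyClient models machine A's view of the RPC: the server applies the
// call, but the first `failures` responses are lost on the way back.
type flakyClient struct {
	srv      *server
	failures int
}

func (c *flakyClient) put(key string) error {
	c.srv.put(key) // the server did execute the call
	if c.failures > 0 {
		c.failures--
		return errTimeout // ...but machine A only sees a timeout
	}
	return nil
}

// retry invokes call until it succeeds or maxAttempts (>= 1) is reached,
// returning the last error on failure.
func retry(maxAttempts int, call func() error) error {
	var err error
	for i := 0; i < maxAttempts; i++ {
		if err = call(); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	srv := &server{set: map[string]bool{}}
	client := &flakyClient{srv: srv, failures: 2}
	err := retry(5, func() error { return client.put("x") })
	// The call ran three times on the server, yet the state holds one key.
	fmt.Println(err, len(srv.set))
}
```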

Understanding idempotence is an effective way to improve the composability and reliability of code. Unfortunately, it is not always easy to make functions idempotent and the restriction can make interfaces less clean. The benefits, however, manifest themselves very clearly when you do not need to deal with the unpredictability and complexities of calling non-idempotent interfaces, especially in a distributed setting.

Sunday, September 8, 2013

Actors and Error Handling

In addition to the basic model of actors, Erlang has another core language construct that is intended to help programmers build reliable systems. It has built-in support for various forms of error handling in the context of Erlang processes. For example, a single process will often rely on the assumption that various other processes that it depends on are available and will stop functioning properly if any of those processes fail. In many other languages, because the concept of disjoint processes is not fundamental, handling such failures is generally ad-hoc and often ignored by programmers until something unexpected happens. But in the actor model and especially when blurring the line between local and remote processes, it is necessary to treat external process failures as an expected outcome that can ideally be recovered from. As such, Erlang provides three primitives with which to build robust systems of processes: links, traps, and monitors.

Links are a way of specifying a hard dependency between two processes: if one of them dies, the other cannot function and thus should die as well. They can be used to set up groups of processes which correspond to some meaningful entity in the system that requires each of the individual processes to be running properly. When processes are linked and one dies, it sends a specific 'EXIT' message to the other that isn't received or caught by the normal messaging and exception handling constructs (instead killing the receiving process) unless some specific measures are taken. This leads us to the concept of traps, which are a method by which you can explicitly handle an exit message from a linked process. If two processes are linked without either of them trapping the exit messages, they will die together. But if, for example, you have a process which is dedicated to monitoring a group of other processes, this process can decide to receive exit messages and act accordingly when they are received, e.g. restarting the entire group of processes. Links and traps allow programmers to specify entities in a system that continue to operate after failures.

The final piece of Erlang error handling is the ability to monitor other processes. This is similar to links except that monitors are unidirectional and stackable, meaning that you can compose them more easily and that they work better with libraries. Moreover, you do not need to trap the 'DOWN' messages that the monitored process sends; they are received as normal messages since there is no implicit process kill involved (unidirectional means there is no tight coupling between the processes). Monitors are a more flexible way for processes to communicate their errors to each other without linking their fates together. Between these three simple primitives for error handling, it is possible to construct systems that respond well to failure and can self-recover, leading to claims such as Joe Armstrong's (the creator of Erlang) nine 9's of reliability in the Ericsson AXD301.

As with the actor model, Scala's Akka library provides some similar functionality to Erlang's error handling, which is the concept of supervision. In some sense, Akka supervision is a stricter version of links and monitors, where every actor must be supervised by some other actor, its parent, so that the set of all actors forms a hierarchy (the root actor is provided by the library). The supervising actor can employ different strategies for monitoring the lifecycle of each of its child actors, which often involves restarting failed actors. While not quite as flexible as Erlang error handling, the way Akka enforces the actor hierarchy leads programmers to naturally structure their systems as a hierarchical tree of components that can either function independently of failures in other parts of the hierarchy or propagate/handle errors as necessary. Building concurrent and distributed systems requires serious consideration of errors and failures, but it's often the case that there are no good tools for doing so. Erlang's ideas and Akka's adaptation of them are examples of how this problem can be tackled at a first-class level.

Sunday, September 1, 2013

Clojure Agents

Continuing the exploration of Clojure's concurrency utilities leads us to agents, a built-in method of managing independent, asynchronous change to a single value. We can think of this as a parallel to a single actor that controls some private state which can only be accessed by sending messages to the actor. Agents are a little bit different in that they invert the control over how the state is modified: instead of an actor receiving a predefined set of messages which have different effects on the state, agents allow the sender to specify how the state is mutated. The agent itself is only responsible for serializing the mutations and allows the state to be read by anyone. Controlling access to state through agents is preferable to software transactional memory (STM) when asynchronous updates are acceptable, as you can avoid the performance concerns of STM. In this post, I will walk through an example of using agents for tracking metrics for some samples.

The first line shows how to initialize an agent. In our case, it is a map with three values: the sum of all samples, the sum of the squares of all samples, and the count of the number of samples. By updating these three values for each sample, we can compute the resulting mean and variance at any point in time. The two "public" functions here are record-sample and get-metrics, whose purpose should be clear from the names. Because we want recording metrics to be thread-safe, we process all updates through the agent by using the send function in this line: (send global-metrics update-metrics value). Send tells the agent to process this update at some point in the future, and the update-metrics function takes the sample value and updates the three values in the map. Then to read the metrics in get-metrics, we first call (await global-metrics) to ensure that all actions sent from the current thread finish in order to prevent race conditions of not reading a sample we just recorded (since agents process asynchronously). Finally, we read the current value of global-metrics and return the computed mean and variance.

The final two lines are examples of using these functions; we, in parallel, record the samples 0 through 9999 and then get the mean and variance. Running this, we see the expected result: {:mean 9999/2, :variance 33333333/4}. To understand agents a bit more, I included the commented-out line 4 to demonstrate why the await call is necessary. If we sleep on that line, we still obtain the correct result but it takes 10 seconds. If we sleep on that line and exclude line 18 with the await, we will get something arbitrary like {:mean 221/15, :variance 18404/225} from whatever subset of samples happened to be recorded at that time (the result is nondeterministic). Thus it is essential that we allow the actions from the current thread to finish before attempting to read the metrics. Other than that, there is no real synchronization visible in this code, as it is hidden away by the agent abstraction.

Clojure offers a wide array of methods of dealing with shared, mutable state. Agents are one of the core tools that the language provides for isolating who can update a piece of state so that there is no burden on the programmer to manage that. They also play well with the STM; sending actions to agents within a transaction will do nothing until the transaction is committed, so that each action is executed only a single time regardless of whether the transaction is forced to roll back. In the actor model with Erlang, we were forced by the immutability constraint to have a dedicated actor for managing any shared, mutable state, and Clojure agents are a specialization of that concept designed for simplicity of code. Overall, Clojure seems to have taken a very open-minded approach for addressing the concurrency problem and presents a handful of unique solutions mixed together in a nice way.

Monday, August 26, 2013

Clojure Memoization

One really cool feature of Clojure that I came across is its built-in support for memoization. Memoization is a simple technique where you save the results of function calls so that next time the function is called with the same arguments you return the saved result. It is often useful when applying dynamic programming to problems because you will repeatedly solve the same sub-problems and memoizing the results makes algorithms efficient. Clojure has a built-in memoize function which takes in another function and memoizes calls to it (presumably storing the results in a map). Whereas traditionally you need to create a map yourself and manually check at the beginning of your function whether the result has already been computed, Clojure simplifies it so that you hardly need to change the implementation at all. Here's an example:

The first function, fib, is the standard, recursive implementation of Fibonacci. The second, fib-memo-wrong, is a misapplication of memoize, though it would be great if it actually worked. The problem is that fib-memo-wrong can only memoize the calls to fib that actually go through itself, which means in particular that none of the recursive calls are memoized, defeating the purpose of implementing Fibonacci efficiently. The last function is the correct implementation of fib-memo which is quite similar to fib but includes a memoize step inside, which indicates that all results of the function will be memoized since the recursive calls go through the same path. The key point is that we never had to specify a map or explicitly tell Clojure what arguments/results to memoize, which is very convenient.

Having implemented memoization in C++ countless times during programming contests, I was pretty excited when I saw this in Clojure. It wasn't quite as easy as I had hoped (fib-memo-wrong would be ideal), but being able to provide this functionality in a clean way within a language is still nice. It is not easy to write this as a utility in other languages because it has to be completely generic over the argument list, so building it into the language might be the only way to go. Bonus points to Clojure for doing so.
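To make the point about genericity concrete, here is a sketch of a memoize utility in Go (it needs generics, so Go 1.18 or later). It is an assumption-laden illustration rather than a port of Clojure's memoize: it only handles functions of a single comparable argument, it is not safe for concurrent use, and the names memoize, fib, and newFibMemo are made up. It also reproduces the pitfall described above: wrapping the plain recursive function caches only the outermost call.

```go
package main

import "fmt"

// memoize wraps a one-argument function with a map-backed cache.
func memoize[K comparable, V any](f func(K) V) func(K) V {
	cache := make(map[K]V)
	return func(k K) V {
		if v, ok := cache[k]; ok {
			return v
		}
		v := f(k)
		cache[k] = v
		return v
	}
}

// fib is the plain recursive version; its recursive calls never see a cache.
func fib(n int) int {
	if n < 2 {
		return n
	}
	return fib(n-1) + fib(n-2)
}

// newFibMemo builds a version whose recursive calls go through the
// memoized wrapper, so every sub-result is cached.
func newFibMemo() func(int) int {
	var fibMemo func(int) int
	fibMemo = memoize(func(n int) int {
		if n < 2 {
			return n
		}
		return fibMemo(n-1) + fibMemo(n-2)
	})
	return fibMemo
}

func main() {
	fibMemoWrong := memoize(fib) // caches only top-level calls
	fibMemo := newFibMemo()
	fmt.Println(fibMemoWrong(20), fibMemo(80))
}
```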

To end, here is proof that the implementations work as I described:

user=> (time (fib 40))
"Elapsed time: 6511.060066 msecs"
user=> (time (fib-memo-wrong 40))
"Elapsed time: 6387.369829 msecs"
user=> (time (fib-memo-wrong 40))
"Elapsed time: 0.034988 msecs"
user=> (time (fib-memo 40))
"Elapsed time: 0.464636 msecs"

Saturday, August 24, 2013

STM Implementation

As described last time, software transactional memory (STM) is a concurrent programming model that hides a lot of the complexities of synchronization away from programmers by exposing a concept similar to that of database transactions. By providing strong guarantees about the behavior of memory accesses in transactions, STM allows programmers to frame operations as single-threaded computations and not worry about locking. This necessarily pushes the synchronization details into the system which implements STM, however, and it's not without a cost. I'll briefly cover two important aspects of implementing an STM which can have serious implications on its performance: data versioning and conflict detection.

Why is data versioning necessary? The answer is straightforward: transactions need to know when the memory addresses they are reading and writing are modified in order to ensure isolation. There are generally two ways to manage the versioning within a transaction, given the requirement that you must be able to atomically commit or rollback all of the transaction's operations. The first is eager versioning, which consists of updating the memory in-place (and the corresponding version) and maintaining an undo log in case of rollback. The second is lazy versioning, which consists of buffering all writes done by the transaction and applying them all at the time of commit. There is a key performance trade-off between the two: eager versioning performs better when most transactions successfully commit while lazy versioning wins when many transactions need to rollback due to conflicts.
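The two strategies can be contrasted with a single-threaded toy in Go. This sketch deliberately leaves out version numbers, locking, and conflict detection, and every name in it (memory, eagerTx, lazyTx) is an assumption for illustration. It only shows where the work lands: eager versioning makes commit trivial and rollback expensive, while lazy versioning does the opposite.

```go
package main

import "fmt"

type memory map[string]int

// undoEntry remembers what a location held before an eager write.
type undoEntry struct {
	key     string
	old     int
	existed bool
}

// eagerTx writes in place and keeps an undo log in case of rollback.
type eagerTx struct {
	mem  memory
	undo []undoEntry
}

func (t *eagerTx) write(k string, v int) {
	old, ok := t.mem[k]
	t.undo = append(t.undo, undoEntry{k, old, ok})
	t.mem[k] = v
}

func (t *eagerTx) commit() { t.undo = nil } // nothing left to do

func (t *eagerTx) rollback() {
	// Undo in reverse order so the earliest saved value wins.
	for i := len(t.undo) - 1; i >= 0; i-- {
		e := t.undo[i]
		if e.existed {
			t.mem[e.key] = e.old
		} else {
			delete(t.mem, e.key)
		}
	}
	t.undo = nil
}

// lazyTx buffers writes and applies them all at commit time.
type lazyTx struct {
	mem memory
	buf map[string]int
}

func (t *lazyTx) write(k string, v int) { t.buf[k] = v }

func (t *lazyTx) commit() {
	for k, v := range t.buf {
		t.mem[k] = v
	}
	t.buf = map[string]int{}
}

func (t *lazyTx) rollback() { t.buf = map[string]int{} } // just drop the buffer

func main() {
	mem := memory{"x": 1}
	e := &eagerTx{mem: mem}
	e.write("x", 2)
	e.rollback()
	l := &lazyTx{mem: mem, buf: map[string]int{}}
	l.write("x", 3)
	l.commit()
	fmt.Println(mem["x"])
}
```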

Which leads to the second point: conflict detection. In order to determine whether a transaction needs to rollback, it is necessary to keep track of the read and write sets (memory addresses) of each transaction. Then there are two choices as to when to detect conflicts, namely at the time of the read/write (pessimistic) or when the transaction is being committed (optimistic). We see the same performance pattern as with versioning: optimistic is superior when few conflicts are expected, while pessimistic performs better when conflicts are common. It's also possible to mix the two, e.g. use optimistic reads and pessimistic writes, which makes sense because reads are generally less likely to produce conflicts. The choice of conflict detection mechanism greatly influences how often locks need to be acquired and the overall performance, as pessimistic detection results in additional work done on every memory access.

Implementing STM efficiently is not an easy task, especially when you take into consideration the fairness and forward progress guarantees that are ideal in such systems. Moreover, memory accesses will almost certainly have a significant amount of overhead associated with them, and it is this fact that leads languages like Clojure to make the distinction between normal variables and STM variables very clear. Programmers should restrict the use of STM to only the cases in which it is absolutely necessary and use immutable variables otherwise. While this may make STM more difficult to use and hinder its acceptance, the benefits may outweigh the costs in the long run. It turns out that the Akka library for Scala also has an STM implementation, which could mean that the concept is catching on, but it still has a long way to go before becoming mainstream. The transactional memory model is very powerful from the programmer's perspective, but understanding the implementation details and performance penalties is important if you ever want to use it extensively.

Sunday, August 18, 2013

Software Transactional Memory in Clojure

Having briefly explored Erlang and the actor model as the basis for programming with concurrency, I spent some time learning about Clojure, a dialect of Lisp that runs on the JVM and is designed for concurrency. Clojure is also functional in nature and thus emphasizes immutability as the solution to the pitfalls of multithreaded programming. But while Erlang uses actors to coordinate state, Clojure has a minimal interface for interacting with shared, mutable state in a more traditional manner. Instead of controlling access to the state via synchronization primitives like locks, however, it employs software transactional memory (STM) to ensure that different threads can access the same state safely. For reference, this article is an excellent resource on STM in Clojure and additionally touches on the drawbacks of traditional lock-based synchronization as well as the actor model.

STM can be thought of as very similar to database transactions in terms of the guarantees it provides to clients. The standard ACID properties are respected by software transactional memory. They are
  • atomicity: either all changes are committed or all changes are rolled back;
  • consistency: the state at the beginning and end of a transaction, as visible to the transaction, will be consistent;
  • isolation: modifications within a transaction are not visible to other transactions until they have been committed; and
  • durability: once a transaction has been committed, its modifications will not be lost.
These properties greatly simplify the manipulation of shared state, as operations done within a transaction can essentially assume that none of the state will be modified at the same time. Here is an example of Clojure code appending numbers to a list with and without transactions.

To briefly explain the code, the first two lines define two global variables (using the def function), numbers-unsafe and numbers-safe. The former is simply a pointer to an immutable list, while the latter is a Clojure Ref, which is a reference that can be protected by STM. In parallel (pmap = parallel map), the integers from 0 to 99,999 are appended (using conj) to each of the two lists and we print the resulting list's length. The output will look something like this:

unsafe:  88067
safe:  100000

The important line of code is line 8, (dosync (alter numbers-safe conj n)). The dosync macro indicates the start of a transaction, and alter applies an update function to a Ref within a transaction. Since all appends are done in transactions, they are isolated from each other by Clojure's STM, and thus the result is correct. What is nice about programming like this is that you do not need to be especially conscious of how different threads may be interacting with shared, mutable state. As long as you always remember to access such state through transactions (which should be clear by the fact that they are declared as a Ref), you are protected against unsynchronized accesses.

It is pretty cool that software transactional memory has been implemented in the core of a language, as it is conceptually appealing. Because it seems so easy to use, however, the price of synchronization must be paid somewhere other than by the programmer, and it is in the overhead and performance implications. Next time I'll go into some more detail about how STM might be implemented and the hidden costs of using it.

Sunday, August 11, 2013

Phi Accrual Failure Detector

Failure detectors are an important piece in designing robust distributed systems. Components must be expected to fail, and the rest of the system should either continue functioning properly (ideal) or at the very least degrade gracefully instead of crashing or becoming corrupted. Because of the unreliable nature of communication over networks, however, detecting that a node has failed is a nontrivial task. The phi accrual failure detector is a popular choice for solving this problem, as it provides a good balance of flexibility and adaptability to different network conditions. It is used successfully in several real-world distributed systems, such as Apache Cassandra (see here) and Akka clusters (see here), and also has a Node.js implementation.

There is a formal theory about the consistency and accuracy guarantees of failure detectors which I will quickly outline before explaining phi accrual (to learn more, please see here). A process is said to "suspect" another process if it believes the other process to have failed. A failure detector is strongly complete if "every faulty process is eventually permanently suspected by every non-faulty process," which is to say that once a process fails, at some point all the processes still running will know that fact. Similarly, a failure detector is strongly accurate if "no non-faulty process is suspected after some time." In summary, non-faulty processes should eventually have the "correct" assessment for all other processes regarding whether they have failed or not, which is a pretty sensible guarantee for a failure detector in a distributed system. Naturally, there are two important metrics for determining how effective a failure detector which satisfies these properties is, namely the time it takes to detect a failure and the rate at which it makes mistakes in suspecting processes. These will guide the discussion of why the phi accrual failure detector makes sense.

Firstly, we should understand the concept of accrual failure detectors. In the formal theory, a process only makes binary decisions about other processes: each of them is either suspected of failure or not. But in practice, we can better capture the uncertainty of these judgments by having a continuous value ($\phi(t)$ in this case, a function of time) and a threshold $\Phi$, where we suspect the process if $\phi(t) \ge \Phi$. Because $\Phi$ is a parameter, accrual failure detectors provide flexibility on top of the basic model. Depending on the requirements of the system, different values can be chosen to optimize for quicker detection or reduced false positives. An example provided in the paper demonstrates how this can be useful: consider a job scheduling system with a master and a set of workers, where the master monitors the status of the workers and assigns jobs to them. Instead of having a binary determination of whether a worker is alive or not, consider having two thresholds $\Phi_1 < \Phi_2$. If $\Phi_1$ is exceeded, then stop sending new jobs to the worker, and only when $\Phi_2$ is exceeded assume the worker has failed and reassign any pending jobs. Thus the system can minimize both the chance that it reassigns jobs unnecessarily as well as the chance of assigning jobs to a failed worker.
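The two-threshold scheduler decision is small enough to sketch directly in Go. The state names and the classify function are assumptions for illustration, and the comparison uses $\ge$ to match the suspicion rule $\phi(t) \ge \Phi$.

```go
package main

import "fmt"

// workerState is the master's current view of a worker.
type workerState int

const (
	healthy workerState = iota // keep assigning jobs
	paused                     // phi >= phi1: stop sending new jobs
	failed                     // phi >= phi2: reassign pending jobs
)

// classify maps the current suspicion level phi onto a state, given
// thresholds phi1 < phi2.
func classify(phi, phi1, phi2 float64) workerState {
	switch {
	case phi >= phi2:
		return failed
	case phi >= phi1:
		return paused
	default:
		return healthy
	}
}

func main() {
	// Hypothetical thresholds: pause at 1, declare failure at 8.
	for _, phi := range []float64{0.3, 2.5, 9.0} {
		fmt.Println(phi, classify(phi, 1, 8))
	}
}
```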

Phi accrual is one implementation of an accrual failure detector. The way it works is quite simple and relies on the processes sending each other heartbeats at a regular interval, e.g. 100 milliseconds. It keeps track of the intervals between heartbeats in a sliding window of time and measures the mean and variance of these samples, building the corresponding normal distribution with cumulative distribution function $P(x)$. Then define $\phi(t) = -\log_{10}(1 - P(t - t_{last}))$ where $t_{last}$ is the last time at which a heartbeat was received. The value of $\phi$ will increase the longer it has been since the last heartbeat. Furthermore, the algorithm is adaptive to network conditions because of the measurements in the sliding window. If the network becomes slow or unreliable, the resulting mean and variance will increase; as such, there will need to be a longer period for which no heartbeat is received before the process is suspected. The phi accrual model additionally allows for convenient intuition as to the choice of the threshold $\Phi$. Assuming that the sliding window is representative of the real distribution of heartbeat inter-arrival times, a choice of $\Phi = 1$ means that there is about a 10% chance of a false positive, and in general the probability of a false positive is $0.1^{\Phi}$.

One of the hardest parts about building distributed systems is that there is much less "library code" that can be leveraged. Oftentimes, good software engineering is about not reinventing the wheel, and having well-defined, reusable components is essential to scaling a codebase (or multiple). Phi accrual failure detection seems like it could be an important library that distributed systems can plug in without having to worry about the intricacies of the algorithm or the implementation.

Monday, August 5, 2013

Local vs Remote

While reading through the Akka documentation (the actor and remoting library for Scala), I once again came across a reference to the classic distributed systems paper from Sun discussing the difficulty of unifying local and remote programming models. I briefly touched on the content of the paper in a previous post, but it's worth revisiting in the context of concurrent programming and understanding how the actor model works in a distributed system. With Akka actors, like in Erlang, the programming model revolves entirely around message passing and asynchronous processing even for actors running within the same Java Virtual Machine (JVM). This blurs the line between whether you are sending messages to a local actor and a remote actor, but it is arguably done in the "correct" way. So it is interesting to understand what the difference is between the actor model and other attempts at such unification, e.g. remote procedure calls (RPC), which are generally considered to be less than ideal (although still widely used).

Firstly, we need to recap the four points brought up in the paper that supposedly doom a unified local and remote object-oriented model. They are latency, memory access, partial failure, and concurrency. Latency and memory access are fairly straightforward; because the object on which a method is called lives in a different address space on a remote machine, the method call is going to be much slower and memory accesses have to be managed in some meaningful way across multiple machines. Partial failure refers to the fact that there are multiple components that can fail independently of each other (e.g. the network and the remote entity), which can leave the method call in an unknown or perhaps even inconsistent state. Lastly, the issue of concurrency arises because there is no shared synchronization system which can manage the order of method calls on an object, so the programmer must be aware of the fact that an object can receive multiple method calls simultaneously. These points can typically be alleviated by a well-designed RPC library and by being diligent in distinguishing local and remote invocations, but that is essentially the argument being made: in practice they are not truly unified.

So then the question is how the actor model is different and why interacting with local and remote actors can be done in a mostly unified way. The problems of latency and concurrency are non-issues in the actor model because of the asynchronous design and the fact that each actor is processing a single message at a time. And assuming that actors have no shared state and communicate exclusively through the immutable messages passed between them, we can avoid confusing memory access semantics as well. Partial failure is, again, the most complex of the issues, and the actor model itself does not prescribe specific guarantees about message processing, such as idempotence, so it is still left to the programmer to figure out how to keep the system consistent. Regardless, we can see that actors, by design, are better suited for a unified local and remote model since they greatly reduce the space of potential problems to worry about. The corollary, however, is that local operations are more complex due to the way in which communicating between actors and sharing state is restricted, but I am not entirely convinced that this is a bad thing in light of the difficulty of multithreaded programming.

The more I learn about actors, the more optimistic I become about their potential to make concurrent programming less of a disaster. The Akka project is really interesting and gives Scala one more feature that is very much lacking in Java; the mere possibility that synchronized blocks and semaphores can be avoided altogether is enough to motivate me to pursue understanding Akka. Given that the future only holds more concurrent and distributed programming, I feel like programmers should be much better educated about the alternatives to traditional, error-prone multithreading.

Sunday, July 28, 2013

Erlang Executor

Last time, I wrote a crude blocking queue in Erlang to better understand the actor model and the functional paradigm for managing state. This time, I will be building on the blocking queue to implement a simple executor service that emulates the Java concept using actors. To be clear, the goal of the exercise is not to understand how multithreading concepts from Java can be applied in Erlang, which probably goes against the entire point of using the actor model, but rather to develop a sense for how the familiar Java abstractions would manifest themselves in Erlang. That way, when confronted with a problem for which the natural solution would be a blocking queue or an executor service, it is easier to see how the solution can be framed in terms of actors. Without further ado, the code:

Again, the public interface consists of just three simple functions: new(), submit(), and get(). These functions create a new executor with a specified number of threads, submit a task, and get the result of a task, respectively. When the executor is created, we initialize a blocking queue and spawn a number of processes corresponding to how many "threads" the client wants, each of which runs the worker() function. As in Java, worker() repeatedly removes tasks off the queue and runs them.

The interesting part of this is how the concept of futures is implemented. A future is a proxy to the result of a task that is submitted to an executor, and its primary feature is the ability to block until the result is available, which is the purpose of the get() function. In Java, this would typically be implemented using a semaphore, but here the future is another Erlang process that is spawned when the task is submitted. This process runs the wait() function, which has two phases. First, it blocks until the result of the task is received (the "done" message is sent by the worker that executes the task), and once it has the result it will respond to an arbitrary number of "get" messages that are sent by the get() function, sending back the result. So while the result is not available, calls to get() will block, because the future process is itself still waiting for the "done" message and cannot reply yet, giving us the equivalent of futures. And that is pretty much all there is to implementing the executor!
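For comparison, here is a rough Java counterpart of the same three operations. It is a sketch under assumptions, not a translation of the Erlang code: the names `TinyExecutor` and `TinyFuture` are made up, a `CountDownLatch` stands in for the semaphore mentioned above, and interruption is converted to an unchecked exception to keep the example short.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical Java counterpart of the executor described above; names are illustrative.
class TinyExecutor {
    // A future whose get() blocks until done() or failed() has been called.
    static class TinyFuture<T> {
        private final CountDownLatch ready = new CountDownLatch(1);
        private volatile T result;
        private volatile Exception failure;

        void done(T value) { result = value; ready.countDown(); }
        void failed(Exception e) { failure = e; ready.countDown(); }

        T get() {
            try {
                ready.await(); // phase 1: wait for the worker to report completion
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            if (failure != null) {
                throw new IllegalStateException("task failed", failure);
            }
            return result; // phase 2: any number of callers can read the result
        }
    }

    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    // Starts numThreads workers that repeatedly take tasks off the queue and run them.
    TinyExecutor(int numThreads) {
        for (int i = 0; i < numThreads; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        tasks.take().run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop this worker
                }
            });
            worker.setDaemon(true); // idle workers should not keep the JVM alive
            worker.start();
        }
    }

    // Queues the task and returns a future for its result.
    <T> TinyFuture<T> submit(Callable<T> task) {
        TinyFuture<T> future = new TinyFuture<>();
        tasks.add(() -> {
            try {
                future.done(task.call());
            } catch (Exception e) {
                future.failed(e);
            }
        });
        return future;
    }
}
```

The latch plays the role of the first phase of wait(): it opens exactly once, and after that every get() returns immediately, much as the Erlang future answers any number of "get" messages once it holds the result.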

The relationship between shared, mutable state and Erlang actors is becoming quite clear from these examples. The restrictions of Erlang imply that, any time you want to deal with shared, mutable state, you must spawn a process which is responsible for managing that state. Anyone who wants to read or modify that state can only do so through messages sent to that process, naturally eliminating any possibility of race conditions and unsynchronized access to data. These are useful guarantees to have as a developer, so the benefits of the actor model and functional programming are clear. With that said, it has definitely been trickier for me to design these programs, having used imperative programming techniques for so many years. But after being plagued by synchronization issues, I'm open to the possibility that trading off the intuitive nature of imperative programming and threads is worthwhile for the superior concurrency model.

Thursday, July 25, 2013

Erlang Actors

Following my post on how multithreading is hard, I spent some time learning about the alternatives, one of the most promising of which is the actor model. I have read a lot about how Erlang is a good language for building concurrent systems like message queues (RabbitMQ is the prime example), and actors are a core part of Erlang, so I decided to check it out more extensively (I found this tutorial to be useful). Erlang is functional, so you have to think carefully about how to manage "state" if you are coming from a traditional imperative programming background like I am. To guide my learning, I wrote a very primitive blocking queue in Erlang that is bounded and supports only blocking puts and takes. As it turns out, this looks very different from how you might write one in Java using synchronization primitives like locks and condition variables. It's probably easiest to start out with the code in order to guide the discussion:

To start off, we have the new() function which returns a new "instance" of the blocking queue. As it turns out, the function actually just returns an Erlang process ID, which refers to the process that is created by the spawn() call (I like to think of it as a "listener"). This newly-created process is running the listen() function, which we will get to a bit later, but the important thing to understand is that the process is the only place where the "state" of the queue is managed. No state is returned to the caller of new() because everything is immutable in Erlang, so it does not make sense for different actors to actually be carrying around the "state" of the queue. You may already notice that functional programming and the actor model make you think quite differently.

So now we have the setup: whoever creates the queue has a reference to the process ID of the queue's listener, and meanwhile the listener is running as its own process. Next are the put() and take() functions, which follow the same pattern. They each send a message to the listener (the exclamation mark syntax) and then wait for a response (the receive ... end syntax). At a high level, that's all there is to the actor model; processes are all running independently of each other except for the fact that they can send messages to and receive messages from other processes. Using receive gives us the blocking semantics because the listener will not send the reply message until it has successfully put the new value or taken a value off the queue. Thus we have built-in synchronization without having to mess around with low-level synchronization primitives.

Finally, we get to the meat of the program, the listen() function. The first thing it does is decide what it should listen for, and this is based on what is currently in the queue. Depending on whether the queue is empty, full, or neither, we can receive puts, takes, or both, respectively. If the queue is empty, for example, the receive block only recognizes puts, so the process will ignore the take messages until it gets into a state in which it is able to process them (causing all callers of take() to wait in their respective receive blocks). Lastly, you'll notice that both handle_put() and handle_take() end with a recursive call to listen(). This is, again, because Erlang is functional and everything is immutable. We are not able to use something like a while loop around the receives because that would require that the queue is modified in each iteration, so instead we just call listen() again with the new state of the queue.
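For contrast, this is roughly what the conventional Java version alluded to earlier looks like, with a lock and two condition variables. It is a sketch rather than production code: the class name is made up, and `awaitUninterruptibly()` is used only to keep checked exceptions out of the example. The empty and full cases that listen() handles by choosing which messages to receive show up here as the two wait loops.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical lock-based bounded queue, for comparison with the actor version.
class LockedBoundedQueue<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    LockedBoundedQueue(int capacity) {
        this.capacity = capacity;
    }

    // Blocks while the queue is full, like a put that the listener is not yet accepting.
    void put(T value) {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.awaitUninterruptibly();
            }
            items.addLast(value);
            notEmpty.signal(); // wake one waiting taker
        } finally {
            lock.unlock();
        }
    }

    // Blocks while the queue is empty, like a take that the listener is not yet accepting.
    T take() {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.awaitUninterruptibly();
            }
            T value = items.removeFirst();
            notFull.signal(); // wake one waiting putter
            return value;
        } finally {
            lock.unlock();
        }
    }
}
```

Here the shared deque is touched by every calling thread and is only safe because each access happens under the lock; in the Erlang version the queue is only ever touched by the listener process.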

So there it is, a simple blocking queue implemented in a functional style with actors. It required a lot more thinking than I expected to come up with a solution using these unfamiliar tools, but in the end I started realizing their benefits. By taking shared, mutable state out of the picture you can avoid worrying about most of the issues that plague multithreaded programming. Questions that are still open in my mind include whether the actor model is rich enough to capture the complexity of real applications and what the performance implications are of writing code in this way. It has definitely been worthwhile to explore actors through Erlang, though, and I plan on checking out Akka, which brings actors to Scala.

Thursday, July 18, 2013

Garbage Collection and Scope

Garbage collection (GC) is one of those programming language features that you are supposed to not have to think about, but anyone who has tried to optimize the performance of Java applications will know that managing memory is often the most effective way of speeding things up. The less time the garbage collector has to spend running, the more time your program can have to run. I recently came across an aspect of code design that turns out to have a significant impact on memory performance. Consider the following program:

It is a toy program, but it is designed to emulate the following sequence of events (both the handle1 and handle2 methods):
  • you receive a raw stream of bytes from somewhere, e.g. the network;
  • you deserialize the bytes into something meaningful, e.g. a web request; and
  • you do some processing with the deserialized data, e.g. handling a web request by doing some database operations and returning a response.
Here we are getting the bytes by just allocating a 256MB array and then "deserializing" the bytes into integers. And lastly, the "processing" consists of just repeatedly allocating 1MB arrays until we hit an error (in particular, we are expecting an OutOfMemoryError). The idea is that the handleInts method figures out roughly how much free memory the JVM has at the point at which it is called (forcing GC as necessary) in order to evaluate the relative efficiency of the two versions of code.

It turns out that handle2 is about 256MB more efficient than handle1 as measured by the number of allocations done in handleInts, despite the fact that the two methods are functionally identical. The reason is scope, although not quite in the traditional programming language sense. We have two representations of the data in the code, one as an array of bytes and the other as an array of integers, each of which is about 256MB in size. Conceptually, once we have deserialized the bytes as integers, we can discard the array of bytes, i.e. allow it to be garbage collected. And that is exactly what happens in handle2; the scope of the array of bytes is limited to the time during which it is being deserialized. In handle1, however, we are passing the array of bytes as a method parameter to the method which eventually ends up calling handleInts. That means, in particular, that a reference to the array is living on the call stack, which is a GC root, so it cannot be garbage collected while the method is running.
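A minimal sketch of a program matching this description might look as follows. Only the names handle1, handle2 and handleInts come from the description above; the helper methods, the `ByteBuffer`-based deserialization and the command-line switch are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reconstruction of the experiment; helper names are illustrative.
class GcScopeSketch {
    static final int MB = 1024 * 1024;

    // Stands in for reading raw bytes from the network.
    static byte[] readBytes(int size) {
        return new byte[size];
    }

    // Interprets every four bytes as one big-endian int.
    static int[] deserialize(byte[] bytes) {
        IntBuffer buf = ByteBuffer.wrap(bytes).asIntBuffer();
        int[] ints = new int[buf.remaining()];
        buf.get(ints);
        return ints;
    }

    // Version 1: the byte array is a parameter of process(), so the frames of
    // handle1() and process() still reference it while handleInts() runs.
    static int handle1(int size) {
        byte[] bytes = readBytes(size);
        return process(bytes);
    }

    static int process(byte[] bytes) {
        return handleInts(deserialize(bytes));
    }

    // Version 2: the byte array exists only inside readInts(), so nothing on the
    // stack references it by the time handleInts() runs.
    static int handle2(int size) {
        return handleInts(readInts(size));
    }

    static int[] readInts(int size) {
        return deserialize(readBytes(size));
    }

    // Counts how many 1MB arrays can be allocated before the heap is exhausted.
    static int handleInts(int[] ints) {
        List<byte[]> hog = new ArrayList<>();
        try {
            while (true) {
                hog.add(new byte[MB]);
            }
        } catch (OutOfMemoryError e) {
            int count = hog.size();
            hog.clear(); // release the memory before returning
            return count;
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("usage: java -Xmx1g GcScopeSketch 1|2");
            return;
        }
        int count = args[0].equals("1") ? handle1(256 * MB) : handle2(256 * MB);
        System.out.println("allocated " + count + " MB before OutOfMemoryError");
    }
}
```

Each version should be run in a fresh JVM with the same heap size, so that one measurement does not disturb the other. The exact counts will depend on the JVM, the collector and whether the methods have been JIT-compiled, so treat the difference as indicative rather than exact.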

In my mind, this example raises some interesting questions about how code should be designed and the impact of different design patterns on GC performance. Specifically, we know that having very deep call stacks, especially when methods do transformations on the data, can lead to larger retained sets of objects (those that cannot be collected) than intended. In an ideal world, a programmer using a language with built-in memory management should not have to be concerned with this, though, so it would be even better if the compiler or JVM could identify this type of situation and make the correct optimization.