
Sunday, October 20, 2013

Kafka

Apache Kafka is a distributed messaging system developed by LinkedIn. It functions similarly to a message queue, but is more specialized for the simultaneous use-cases of real-time and batch log processing. In particular, Kafka emphasizes throughput, scalability, and consumer flexibility over supporting some of the more traditional message queue features, e.g. the Java Message Service (JMS) standard. This paper describes the architecture of Kafka and how it is used at LinkedIn. We'll take a look at a handful of design and implementation decisions in Kafka that enable it to be very efficient and flexible.

At a high level, Kafka follows a messaging model similar to that of other message queues. Producers send messages to "topics," which are essentially namespaces for the messages to live in, and consumers subscribe to the topics that they wish to receive messages from. The first set of implementation details that enables Kafka to be efficient comes from within a single server (referred to as a "broker"). Each broker stores some set of partitions for a topic, so that the messages can be distributed among multiple brokers (we'll see how the partitions play into the consumer side later). Each partition can be thought of as a sequence of messages appended to each other and broken up into files of reasonable size (the paper uses 1GB as the example). As in many other systems, inserts in Kafka are implemented by appending the message to the end of a file, which is fast even on commodity hardware because the writes are sequential. The authors, however, choose not to do any in-memory caching of the messages from disk. The reasons are twofold: firstly, the standard OS page cache is quite effective for the access patterns being used (i.e. write-through and read-ahead policies), and secondly, they leverage the Linux sendfile API to send data directly from the file to the socket, avoiding the overhead of having the data live in user space. An interesting effect of this is that the Kafka process itself does not manage a lot of memory, so it is less prone to garbage collection problems and restarts since most of the memory is handled by the OS. These optimizations enable Kafka to have excellent performance on a single broker.
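
To make the storage layout concrete, here is a minimal, purely illustrative sketch (not Kafka's actual code) of an append-only partition that length-prefixes each message and rolls over to a new segment file once a size threshold is hit; all of the names here are made up:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"os"
)

// segmentSize is deliberately tiny here; the paper's example is 1GB.
const segmentSize = 1 << 20

// partition is a conceptual sketch of an append-only log partition:
// messages are length-prefixed and appended to the current segment file,
// and a new segment is started once the current one grows too large.
type partition struct {
	dir     string
	segment *os.File
	written int64
	nextID  int
}

// roll closes the current segment (if any) and starts a new one.
func (p *partition) roll() error {
	if p.segment != nil {
		p.segment.Close()
	}
	f, err := os.Create(fmt.Sprintf("%s/segment-%06d.log", p.dir, p.nextID))
	if err != nil {
		return err
	}
	p.segment, p.written, p.nextID = f, 0, p.nextID+1
	return nil
}

// appendMessage writes a 4-byte length prefix followed by the payload.
// The OS page cache and sequential writes do the heavy lifting; no
// user-space cache is needed.
func (p *partition) appendMessage(msg []byte) error {
	if p.segment == nil || p.written+int64(len(msg))+4 > segmentSize {
		if err := p.roll(); err != nil {
			return err
		}
	}
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(msg)))
	if _, err := p.segment.Write(hdr[:]); err != nil {
		return err
	}
	n, err := p.segment.Write(msg)
	p.written += int64(n) + 4
	return err
}

func main() {
	p := &partition{dir: os.TempDir()}
	for i := 0; i < 3; i++ {
		if err := p.appendMessage([]byte(fmt.Sprintf("message %d", i))); err != nil {
			panic(err)
		}
	}
}
```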

Kafka does interesting things on the consumer side as well. In traditional message queues, the server is typically in charge of keeping track of which messages have been sent to which consumers based on the acknowledgments it has received. Kafka inverts this model and lets consumers request messages from whatever position they wish to. This introduces a few complications. Brokers can no longer clean up messages that they know have been consumed, because consumers can always request older ones. Kafka solves this with a retention model based on a fixed period of time, e.g. 7 days; because of how the partitions are implemented within a single broker, performance does not degrade as partitions grow (unlike in many message queue implementations). This also enables the different patterns of consumption that Kafka was built to handle. Real-time services will consume messages as soon as they appear, while data warehousing and analytics services can, for example, batch consume once per day. The other effect of brokers not controlling message delivery is that consumers who wish to distribute the messages of a topic among themselves (so that each message is consumed once) must coordinate in some way. Kafka achieves this by using the existing highly-available coordination service ZooKeeper. Both consumers and brokers must talk to ZooKeeper in order to coordinate which consumers are responsible for which partitions of a topic and what offset each partition is currently at. So the logic for tracking consumption progress is external to the implementation of a single partition, which keeps the partition implementation very simple and efficient, as described above.
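
To illustrate consumer-managed progress (again just a sketch, not Kafka's wire protocol), a consumer can simply remember a byte offset into the log, ask for data starting wherever it likes, and advance the offset itself; this reads the length-prefixed format from the sketch above:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"
)

// consumer tracks its own position in a segment file; the broker never
// needs to know which messages have been consumed.
type consumer struct {
	offset int64 // next byte to read; could be persisted to ZooKeeper
}

// poll reads all length-prefixed messages available after the consumer's
// current offset and advances the offset past them.
func (c *consumer) poll(segment *os.File) ([][]byte, error) {
	if _, err := segment.Seek(c.offset, io.SeekStart); err != nil {
		return nil, err
	}
	var msgs [][]byte
	for {
		var hdr [4]byte
		if _, err := io.ReadFull(segment, hdr[:]); err != nil {
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				return msgs, nil // caught up; try again later
			}
			return nil, err
		}
		msg := make([]byte, binary.BigEndian.Uint32(hdr[:]))
		if _, err := io.ReadFull(segment, msg); err != nil {
			return nil, err
		}
		msgs = append(msgs, msg)
		c.offset += int64(len(hdr) + len(msg))
	}
}

func main() {
	f, err := os.Open("segment-000000.log") // hypothetical segment file
	if err != nil {
		fmt.Println("no segment to read:", err)
		return
	}
	defer f.Close()

	c := &consumer{offset: 0} // start from the beginning, or anywhere else
	msgs, err := c.poll(f)
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Println(string(m))
	}
}
```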

Messaging systems often form the core of a distributed service, so reliability, scalability, and throughput are of the utmost importance. Having had some less-than-ideal experiences with existing message queues, I find it refreshing to see a new approach in Kafka. While it does not satisfy the most stringent durability requirements, Kafka makes up for it with excellent performance and the ability to distribute effectively. It is also an example of how we often no longer need to build our own locking/consensus services because ZooKeeper provides a flexible API and strong guarantees on both consistency and availability. I anticipate that future messaging systems will make many of the same decisions that Kafka made in order to satisfy growing data volumes and the fundamentally distributed nature of systems.

Sunday, October 13, 2013

Google Go (Part 3, Types)

In the previous two posts, I introduced Google's Go programming language and the model of concurrent programming that it adopts. We've seen that Google has made a number of interesting choices in the design of the language, and their approach to types is yet another. There are two key parts that define how types work in Go: "static duck typing" and the absence of inheritance. The former is not too unusual, as most dynamic languages have duck typing and Scala, for example, has structural typing. Completely throwing away the concept of inheritance, however, is a bold statement about how object-oriented programming should be done. Let's take a look at each of these in a little more detail.

Duck typing is prevalent nowadays, with dynamic languages like JavaScript and Python being so popular. To some people, however, it might be comforting to have the guarantee that when your code runs it won't suddenly crash due to a method not existing when you expected it to. That's the guarantee of a statically typed language, but the concept is often associated with more "burdensome" programming languages such as C++ and Java. Go is statically typed, but it manages this without an explicit type hierarchy; as long as an object presents the methods in an interface, it can be considered an implementation of the interface (hence duck typing). For example:


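Something along these lines (a minimal sketch; the method names Apply and Invert are just placeholders):

```go
package main

import "fmt"

// UnaryFunction is anything that can apply a function to an int.
type UnaryFunction interface {
	Apply(x int) int
}

// UnaryInvertible is anything that can apply the inverse of its function.
type UnaryInvertible interface {
	Invert(x int) int
}

// PlusOne and MinusOne implement both interfaces without declaring so.
type PlusOne struct{}

func (p PlusOne) Apply(x int) int  { return x + 1 }
func (p PlusOne) Invert(x int) int { return x - 1 }

type MinusOne struct{}

func (m MinusOne) Apply(x int) int  { return x - 1 }
func (m MinusOne) Invert(x int) int { return x + 1 }

// callFunction and callInverse accept anything satisfying the interfaces.
func callFunction(f UnaryFunction, x int) int  { return f.Apply(x) }
func callInverse(f UnaryInvertible, x int) int { return f.Invert(x) }

func main() {
	fmt.Println(callFunction(PlusOne{}, 5)) // 6
	fmt.Println(callInverse(MinusOne{}, 5)) // 6
}
```
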
Here we have two interfaces, UnaryFunction and UnaryInvertible, which each have a single method. Then we have two classes/structs, PlusOne and MinusOne, which implement both interfaces without ever explicitly saying so. Note that we are able to pass around instances of PlusOne and MinusOne as either UnaryFunction or UnaryInvertible objects because the Go compiler is able to determine that they satisfy the signatures of the interfaces. So while it feels like we are doing dynamic typing without any guarantee that methods exist, the compiler is simply inferring the type relationships that we intended. This is cool because it lets programmers design interfaces around the true functionality without having to worry about how to deal with existing code or polluting the codebase with "implements" statements (and it is still type safe!). For details as to how Go implements method dispatching efficiently, refer to this blog post.

Now on to the bigger question: why did the designers of Go decide to get rid of inheritance altogether? It is well-known that programmers often use inheritance when they should instead be using composition. In fact, the idea of using composition over inheritance is important enough to have a Wikipedia article of its own. I found an excellent summary of when to choose composition versus inheritance in this Stack Overflow post, which essentially says that inheritance should only be used when you want the complete public interface of the superclass. The Liskov substitution principle is a formal specification of this rule of thumb and makes the requirements for inheritance very strict: "in a computer program, if S is a subtype of T, then objects of type T may be replaced with objects of type S (i.e., objects of type S may be substituted for objects of type T) without altering any of the desirable properties of that program (correctness, task performed, etc.)." In theory, this is quite sensible: inheritance is supposed to specify an "is-a" relationship, so an instance of the subclass should be able to be treated as an instance of the superclass in all cases.

The designers of Go believe that forcing programmers to use composition instead of inheritance in all cases results in code that is less brittle and adapts better to new requirements. Specifically, "the [type] hierarchy must be designed early, often as the first step of designing the program, and early decisions can be difficult to change once the program is written. As a consequence, the model encourages early overdesign as the programmer tries to predict every possible use the software might require, adding layers of type and abstraction just in case. This is upside down. The way pieces of a system interact should adapt as it grows, not be fixed at the dawn of time." I understand the problems they are addressing, having dealt with type hierarchies that grow and grow until they become impossible to understand. The idea of decoupling independent pieces of functionality that come together to produce a class fits the mantra of modularity being the key to building scalable programs, and perhaps Go taking such an extreme stance is a message telling us that we haven't been programming the right way.
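
As a small illustration of what the composition-only style looks like in practice, here is a hedged sketch of Go's struct embedding, which lets a type reuse another type's behavior without declaring any is-a relationship (the types here are invented):

```go
package main

import "fmt"

// Logger provides one small piece of behavior.
type Logger struct {
	prefix string
}

func (l Logger) Log(msg string) {
	fmt.Println(l.prefix + msg)
}

// Server composes a Logger by embedding it. Server is not a "subclass"
// of Logger, but Logger's methods are promoted and callable on Server.
type Server struct {
	Logger
	addr string
}

func main() {
	s := Server{Logger: Logger{prefix: "[server] "}, addr: ":8080"}
	s.Log("listening on " + s.addr) // promoted method via composition
}
```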

This wraps up my series of posts on Go. I think Google has done a great job of identifying a number of important areas of programming that could use some innovation. Go puts forth some options for how we can improve things, but they are certainly not the only ones available. I am very keen to learn whether people have reaped the expected benefits of Go's design decisions and which ones have proven most effective. In the future, I am sure we will see new and existing languages try to incorporate some of the ideas behind Go that seem strange now but may turn out to be essential.

Thursday, October 10, 2013

Google Go (Part 2, Concurrency)

Last time, I wrote about some of the interesting and unique design decisions regarding dependencies and imports in Google's Go programming language. This time, we're back to my favorite topic: concurrency. Because essentially all machines have multiple cores/processors now, every new language has to address the problem of concurrent programming in some way and Go is no exception. Recall that the guiding principle behind the design of Go is its focus on optimizing the process of building large software systems, which includes things like developer productivity and tool support. The result of this is that Go adopts a communicating sequential processes (CSP) approach to concurrency, which "has the property that it is easy to add to a procedural programming model without profound changes to that model." This is a little disappointing when you consider the options out there, but given Google's preference for familiarity and the fact that they want to be building real systems quickly using Go, the choice is not too surprising. That said, it is still worthwhile to explore how it integrates into the language and what its strengths and weaknesses are.

Concurrency in Go is built around two concepts: goroutines and channels (note that the code examples below are roughly taken from these links). A goroutine is a fairly simple concept: it lets you spawn a function that runs concurrently. For example, the statement go list.Sort() spawns a goroutine that sorts the list. The nice thing about this is that Go abstracts away the details of creating and managing threads while still efficiently utilizing the CPU. It is able to identify when a goroutine is blocking on I/O, for example, and ensure that the OS thread executing that goroutine is freed up to execute a different one. A world in which functions run concurrently and independently is great, but the interesting stuff comes when they need to communicate, which leads us to channels. It turns out that channels are not very complex either; a channel is more or less just a blocking queue that multiple goroutines can take from and put into. It allows them to synchronize at certain points and send messages to each other. For example, if you wanted to wait for a goroutine to finish (e.g. the equivalent of joining a thread in Java), you would do something like this:


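Roughly like this (the sort is just stand-in work for the goroutine):

```go
package main

import "sort"

func main() {
	c := make(chan int) // unbuffered channel used purely for signaling
	list := []int{3, 1, 2}

	// Start the sort in a goroutine; signal on the channel when done.
	go func() {
		sort.Ints(list)
		c <- 1 // send an arbitrary value to signal completion
	}()

	// ... do something else concurrently ...

	<-c // wait for the goroutine to finish; the value is discarded
}
```
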
In the above code snippet, the c <- 1 line sends the value 1 (arbitrary) to the channel, while the <-c line blocks on receiving a value from the channel (which is ignored). By default channels have no buffer (imagine a blocking queue of size 0) so sends and receives wait for each other, but they can be given a capacity as we will see next.

Let's look at a slightly more involved example with a channel being used as a semaphore. In the code below we simulate a system that is processing requests from a queue; we would like to process requests in parallel, but limit the maximum number of simultaneous requests being processed (i.e. a typical semaphore use case).


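A rough sketch of that setup, following the convention described below where receiving takes a permit and sending returns one (MaxOutstanding, Request, and process are placeholders):

```go
package main

// MaxOutstanding limits how many requests are processed at once.
const MaxOutstanding = 10

// Request and process stand in for real request handling.
type Request struct{ /* ... */ }

func process(r *Request) { /* potentially slow work */ }

// sem is used as a counting semaphore: receiving takes a permit,
// sending returns one.
var sem = make(chan int, MaxOutstanding)

func fillPermits() {
	for i := 0; i < MaxOutstanding; i++ {
		sem <- 1
	}
}

// Serve dispatches a goroutine per request, never allowing more than
// MaxOutstanding of them to run at the same time.
func Serve(queue chan *Request) {
	fillPermits()
	for req := range queue {
		<-sem // acquire a permit
		go func() {
			process(req) // the closure captures req; see the bug discussed below
			sem <- 1     // release the permit
		}()
	}
}

func main() {
	queue := make(chan *Request)
	go Serve(queue)
	for i := 0; i < 100; i++ {
		queue <- &Request{}
	}
	// A real program would wait for outstanding work before exiting.
}
```
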
The number of elements in the channel indicates how many permits the semaphore has available, so getting a permit is receiving from the channel and releasing a permit is sending to the channel. We spawn a goroutine for each request in the queue (which is itself a channel) that processes the request and releases a permit.

The more interesting thing about this example, however, is that it contains an insidious bug. You might expect the closure that the goroutine executes to capture the current request that is being dispatched, but unfortunately it doesn't. The closure captures the loop variable req itself rather than its value at the time of dispatch, and that single variable is reused across iterations, which means you get undefined behavior over which requests are actually processed. Each goroutine will process whatever request the req variable points to when it executes instead of when it was created. It is unfortunate that such a common pattern has a pitfall like this, and the pain is evidenced by numerous mentions of the problem (e.g. here and here) that suggest rather ugly solutions like declaring another variable in the loop body or passing the value explicitly as a parameter to the goroutine.
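
Continuing the Serve sketch above, the two workarounds look roughly like this:

```go
// ServeShadowed avoids the bug by re-declaring req inside the loop body,
// so each closure captures its own copy of the variable.
func ServeShadowed(queue chan *Request) {
	fillPermits()
	for req := range queue {
		<-sem
		req := req // fresh variable per iteration
		go func() {
			process(req)
			sem <- 1
		}()
	}
}

// ServeByValue avoids the bug by passing the request explicitly as an
// argument, so the goroutine receives the value at dispatch time.
func ServeByValue(queue chan *Request) {
	fillPermits()
	for req := range queue {
		<-sem
		go func(r *Request) {
			process(r)
			sem <- 1
		}(req)
	}
}
```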

As far as simple and familiar models of concurrent programming go, CSP based on goroutines and channels is not bad. The real danger lies in the fact that Go exposes shared memory through pointers and references, much like C++, which makes it possible to write unsafe code even while using channels. The article explains it as, "Go enables simple, safe concurrent programming but does not forbid bad programming." This feels accurate to me and may be a practical choice for the language, but given our understanding of how easy it is to "program badly," one might wish that Go took a more progressive approach in addressing the problems around concurrency.

Sunday, October 6, 2013

Google Go (Part 1)

I recently started learning more about Google's Go programming language, which has been on the radar for several years now and seems to be gaining more traction. I found this article to be a very insightful piece about the key design decisions and goals behind the language. At the philosophical level, Go is less about developing an "ideal" way to program, like Clojure or Scala, and more about being the most practical language for large-scale systems engineering. It does not attempt to revolutionize the programming model but rather optimizes for the software engineering cycle and how systems are built in the real world. To quote the article, "The goals of the Go project were to eliminate the slowness and clumsiness of software development at Google, and thereby to make the process more productive and scalable. The language was designed by and for people who write—and read and debug and maintain—large software systems." Coming from a company that builds more large software systems than just about anyone else in the world, that shouldn't be too surprising, and the problems that Go tries to address are well worth understanding.

One of the focal points in the design of Go is dependencies, which might seem like a strange thing to get worked up over but turns out to have a big effect on compilation time, especially if you're coming from the C++ world (as much of Google is). I won't go into too much detail, but this section in the article details the problems that Google experienced while building C++ binaries, which caused header files to be read by the compiler tens of thousands of times. Most modern programming languages have already found ways to be more efficient when importing dependencies, but it's interesting to note that Go was observed to be around fifty times better than C++ in this area while having a similar import model. The more unusual design choice they made regarding imports was to require the package name when referencing anything from an external package. Here's a quick JSON parsing example:

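A minimal sketch of what that looks like (the Message struct and the input string are made up for illustration):

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Message is a hypothetical struct matching the JSON we expect to parse.
type Message struct {
	Name string
	Body string
}

func main() {
	input := `{"Name": "Alice", "Body": "Hello"}`

	// Every reference to an imported package is qualified by its name:
	// json.NewDecoder, strings.NewReader, fmt.Println.
	dec := json.NewDecoder(strings.NewReader(input))

	var m Message
	if err := dec.Decode(&m); err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(m.Name, "says:", m.Body)
}
```
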
Three packages are imported at the top: json, fmt, and strings. Note that we do not import a specific class or function, but rather an entire package. Any references to classes or functions from that package require you to qualify them with the package name, e.g. json.NewDecoder or strings.NewReader. I am a big fan of this "limitation" because it avoids name collisions and makes it clear which dependencies are being used where. This is particularly important in Go because the compiler is very strict about which dependencies are imported; an unused dependency is actually a compile error in the language (another thing I am a big fan of). I believe that Go's somewhat unorthodox choices on how to handle dependencies make the code more understandable by both humans and computers.

To wrap up the discussion of imports, let's look at another unusual choice regarding naming, specifically capitalization. While most languages have no requirements and rely on conventions for capitalization (Erlang being a notable exception), Go uses capitalization as a first-class differentiator for scoping. That is, uppercase names (e.g. NewDecoder and NewReader in the above example) are visible to clients of the package while lowercase ones are not. While this takes some getting used to, the overall benefit to readability seems clear; instead of scattering public and private keywords as in Java or re-declaring public signatures in header files as in C++, programmers simply need to look at the case of a class or function to determine its scope. To quote the article again, "The name is, after all, what clients of the package use; putting the visibility in the name rather than its type means that it's always clear when looking at an identifier whether it is part of the public API. After using Go for a while, it feels burdensome when going back to other languages that require looking up the declaration to discover this information."
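
A quick, made-up illustration of visibility by capitalization:

```go
// Package counter is an invented example of visibility by capitalization.
package counter

// Counter is exported: its name starts with an uppercase letter, so
// clients of the package can use it.
type Counter struct {
	value int // value is unexported; only code in this package sees it
}

// Increment is part of the public API.
func (c *Counter) Increment() {
	c.value += c.step()
}

// step is unexported: a lowercase helper invisible outside the package.
func (c *Counter) step() int {
	return 1
}
```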

Within just dependencies and naming, Go already shows many signs of deviating from the norm in order to achieve its singular goal: programmer productivity when building large-scale systems. In the future, I'll explore additional unique aspects of Go, including some of the more fundamental language semantics like inheritance (or, rather, the lack thereof), concurrency, and garbage collection.