One really cool feature of Clojure that I came across is its built-in support for memoization. Memoization is a simple technique where you save the results of function calls so that next time the function is called with the same arguments you return the saved result. It is often useful when applying dynamic programming to problems because you will repeatedly solve the same sub-problems and memoizing the results makes algorithms efficient. Clojure has a built-in memoize function which takes in another function and memoizes calls to it (presumably storing the results in a map). Whereas traditionally you need to create a map yourself and manually check at the beginning of your function whether the result has already been computed, Clojure simplifies it so that you hardly need to change the implementation at all. Here's an example:
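(The exact listing may differ slightly from the original post; this sketch uses the names described below and reproduces the behavior shown in the timings at the end.)

;; Plain recursive Fibonacci -- exponential time.
(defn fib [n]
  (if (< n 2)
    1
    (+ (fib (- n 1)) (fib (- n 2)))))

;; Misapplied memoization: only calls that go through fib-memo-wrong are
;; cached; the recursive calls still hit the unmemoized fib.
(def fib-memo-wrong (memoize fib))

;; Correct version: the recursion itself goes through the memoized var,
;; so every sub-result is cached.
(def fib-memo
  (memoize (fn [n]
             (if (< n 2)
               1
               (+ (fib-memo (- n 1)) (fib-memo (- n 2)))))))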
The first function, fib, is the standard recursive implementation of Fibonacci. The second, fib-memo-wrong, is a misapplication of memoize, though it would be great if it actually worked. The problem is that fib-memo-wrong can only memoize the calls that actually go through itself, which means in particular that none of the recursive calls are memoized, defeating the purpose of implementing Fibonacci efficiently. The last function, fib-memo, is the correct implementation; it is quite similar to fib except that the function itself is wrapped in memoize, so the recursive calls go through the memoized version and every intermediate result is cached. The key point is that we never had to specify a map or explicitly tell Clojure what arguments/results to memoize, which is very convenient.
Having implemented memoization in C++ countless times during programming contests, I was pretty excited when I saw this in Clojure. It wasn't quite as easy as I had hoped (fib-memo-wrong would be ideal), but being able to provide this functionality in a clean way within a language is still nice. It is not easy to write this as a utility in other languages because it has to be completely generic over the argument list, so building it into the language might be the only way to go. Bonus points to Clojure for doing so.
To end, here is proof that the implementations work as I described:
user=> (time (fib 40))
"Elapsed time: 6511.060066 msecs"
165580141
user=> (time (fib-memo-wrong 40))
"Elapsed time: 6387.369829 msecs"
165580141
user=> (time (fib-memo-wrong 40))
"Elapsed time: 0.034988 msecs"
165580141
user=> (time (fib-memo 40))
"Elapsed time: 0.464636 msecs"
165580141
Saturday, August 24, 2013
STM Implementation
As described last time, software transactional memory (STM) is a concurrent programming model that hides a lot of the complexities of synchronization away from programmers by exposing a concept similar to that of database transactions. By providing strong guarantees about the behavior of memory accesses in transactions, STM allows programmers to frame operations as single-threaded computations and not worry about locking. This necessarily pushes the synchronization details into the system which implements STM, however, and it's not without a cost. I'll briefly cover two important aspects of implementing an STM which can have serious implications for its performance: data versioning and conflict detection.
Why is data versioning necessary? The answer is straightforward: transactions need to know when the memory addresses they are reading and writing are modified in order to ensure isolation. There are generally two ways to manage the versioning within a transaction, given the requirement that you must be able to atomically commit or rollback all of the transaction's operations. The first is eager versioning, which consists of updating the memory in-place (and the corresponding version) and maintaining an undo log in case of rollback. The second is lazy versioning, which consists of buffering all writes done by the transaction and applying them all at the time of commit. There is a key performance trade-off between the two: eager versioning performs better when most transactions successfully commit while lazy versioning wins when many transactions need to rollback due to conflicts.
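To make the two strategies concrete, here is a toy sketch in Clojure (purely illustrative, and not how any real STM, including Clojure's, is implemented), where each shared cell is just an atom and a transaction is a plain map; the names lazy-write, lazy-commit!, eager-write!, and eager-rollback! are placeholders:

;; Lazy versioning: writes accumulate in a per-transaction buffer and only
;; touch the shared cells at commit; rollback simply discards the buffer.
(defn lazy-write [tx cell value]
  (assoc-in tx [:write-buffer cell] value))

(defn lazy-commit! [tx]
  (doseq [[cell value] (:write-buffer tx)]
    (reset! cell value)))

;; Eager versioning: writes hit the shared cells immediately and the old
;; values go into an undo log; commit simply discards the log.
(defn eager-write! [tx cell value]
  (let [old @cell]
    (reset! cell value)
    (update tx :undo-log conj [cell old])))

(defn eager-rollback! [tx]
  ;; the log is built with conj onto a list, so the most recent write comes
  ;; first and walking it in order restores the oldest value last
  (doseq [[cell old] (:undo-log tx)]
    (reset! cell old)))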
Which leads to the second point: conflict detection. In order to determine whether a transaction needs to rollback, it is necessary to keep track of the read and write sets (memory addresses) of each transaction. Then there are two choices as to when to detect conflicts, namely at the time of the read/write (pessimistic) or when the transaction is being committed (optimistic). We see the same performance pattern as with versioning: optimistic is superior when few conflicts are expected, while pessimistic performs better when conflicts are common. It's also possible to mix the two, e.g. use optimistic reads and pessimistic writes, which makes sense because reads are generally less likely to produce conflicts. The choice of conflict detection mechanism greatly influences how often locks need to be acquired and the overall performance, as pessimistic detection results in additional work done on every memory access.
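Continuing the toy sketch above (again purely illustrative), optimistic detection amounts to recording the version of everything the transaction reads and validating those versions at commit time; here each cell holds a map of the form {:value v :version n}, and tx-read and validate are made-up names:

(defn tx-read
  "Reads a cell inside the transaction, returning [value updated-tx]."
  [tx cell]
  (let [{:keys [value version]} @cell]
    [value (assoc-in tx [:read-set cell] version)]))

(defn validate
  "True if none of the cells in the read set have changed since they were read."
  [tx]
  (every? (fn [[cell seen]] (= seen (:version @cell)))
          (:read-set tx)))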
Implementing STM efficiently is not an easy task, especially when you take into consideration the fairness and forward progress guarantees that are ideal in such systems. Moreover, memory accesses will almost certainly have a significant amount of overhead associated with them, and it is this fact that leads languages like Clojure to make the distinction between normal variables and STM variables very clear. Programmers should restrict the use of STM to only the cases in which it is absolutely necessary and use immutable variables otherwise. While this may make STM more difficult to use and hinder its acceptance, the benefits may outweigh the costs in the long run. It turns out that the Akka library for Scala also has an STM implementation, which could mean that the concept is catching on, but it still has a long way to go before becoming mainstream. The transactional memory model is very powerful from the programmer's perspective, but understanding the implementation details and performance penalties is important if you ever want to use it extensively.
Sunday, August 18, 2013
Software Transactional Memory in Clojure
Having briefly explored Erlang and the actor model as the basis for programming with concurrency, I spent some time learning about Clojure, a dialect of Lisp that can run in the JVM and is designed for concurrency. Clojure is also functional in nature and thus emphasizes immutability as the solution to the pitfalls of multithreaded programming. But while Erlang uses actors to coordinate state, Clojure has a minimal interface for interacting with shared, mutable state in a more traditional manner. Instead of controlling access to the state via synchronization primitives like locks, however, it employs software transactional memory (STM) to ensure that different threads can access the same state safely. For reference, this article is an excellent resource on STM in Clojure and additionally touches on the drawbacks of traditional lock-based synchronization as well as the actor model.
STM can be thought of as very similar to database transactions in terms of the guarantees it provides to clients. The standard ACID properties are respected by software transactional memory. They are
- atomicity: either all changes are committed or all changes are rolled back;
- consistency: the state at the beginning and end of a transaction, as visible to the transaction, will be consistent;
- isolation: modifications within a transaction are not visible to other transactions until they have been committed; and
- durability: once a transaction has been committed, its modifications will not be lost.
These properties greatly simplify the manipulation of shared state, as operations done within a transaction can essentially assume that none of the state will be modified at the same time. Here is an example of Clojure code appending numbers to a list with and without transactions.
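(The listing below is a sketch consistent with the description that follows; the helper names add-unsafe and add-safe are placeholders, and the exact original may have differed in details.)

(def numbers-unsafe ())
(def numbers-safe (ref ()))

(defn add-unsafe [n]
  ;; unsynchronized read-modify-write of the var's root binding
  (def numbers-unsafe (conj numbers-unsafe n)))

(defn add-safe [n]
  (dosync (alter numbers-safe conj n)))

;; pmap is lazy, so force it with doall
(doall (pmap add-unsafe (range 100000)))
(doall (pmap add-safe (range 100000)))

(println "unsafe:" (count numbers-unsafe))
(println "safe:" (count @numbers-safe))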
To briefly explain the code, the first two lines define two global variables (using the def function), numbers-unsafe and numbers-safe. The former is simply a pointer to an immutable list, while the latter is a Clojure Ref, which is a reference that can be protected by STM. In parallel (pmap = parallel map), the integers from 0 to 99,999 are appended (using conj) to each of the two lists and we print the resulting list's length. The output will look something like this:
unsafe: 88067
safe: 100000
The important piece of code is the expression (dosync (alter numbers-safe conj n)). The dosync keyword indicates the start of a transaction, and alter applies an update function to a Ref within a transaction. Since all appends are done in transactions, they are isolated from each other by Clojure's STM, and thus the result is correct. What is nice about programming like this is that you do not need to be especially conscious of how different threads may be interacting with shared, mutable state. As long as you always remember to access such state through transactions (which should be clear by the fact that it is declared as a Ref), you are protected against unsynchronized accesses.
It is pretty cool that software transactional memory has been implemented in the core of a language, as it is conceptually appealing. Because it seems so easy to use, however, the price of synchronization must be paid somewhere other than by the programmer, and it shows up as runtime overhead and performance costs. Next time I'll go into some more detail about how STM might be implemented and the hidden costs of using it.
Sunday, August 11, 2013
Phi Accrual Failure Detector
Failure detectors are an important piece in designing robust distributed systems. Components must be expected to fail, and the rest of the system should either continue functioning properly (ideal) or at the very least degrade gracefully instead of crashing or becoming corrupted. Because of the unreliable nature of communication over networks, however, detecting that a node has failed is a nontrivial task. The phi accrual failure detector is a popular choice for solving this problem, as it provides a good balance of flexibility and adaptability to different network conditions. It is used successfully in several real-world distributed systems, such as Apache Cassandra (see here) and Akka clusters (see here), and also has a Node.js implementation.
There is a formal theory about the completeness and accuracy guarantees of failure detectors which I will quickly outline before explaining phi accrual (to learn more, please see here). A process is said to "suspect" another process if it believes the other process to have failed. A failure detector is strongly complete if "every faulty process is eventually permanently suspected by every non-faulty process," which is to say that once a process fails, at some point all the processes still running will know that fact. Similarly, a failure detector is strongly accurate if "no non-faulty process is suspected after some time." In summary, non-faulty processes should eventually have the "correct" assessment of every other process, i.e. whether it has failed or not, which is a pretty sensible guarantee for a failure detector in a distributed system. Naturally, there are two important metrics for judging how effective a failure detector that satisfies these properties is: the time it takes to detect a failure and the rate at which it mistakenly suspects non-faulty processes. These will guide the discussion of why the phi accrual failure detector makes sense.
Firstly, we should understand the concept of accrual failure detectors. In the formal theory, a process only makes binary decisions about other processes: each of them is either suspected of failure or not. But in practice, we can better capture the uncertainty of these judgments by having a continuous value ($\phi(t)$ in this case, a function of time) and a threshold $\Phi$, where we suspect the process if $\phi(t) \ge \Phi$. Because $\Phi$ is a parameter, accrual failure detectors provide flexibility on top of the basic model. Depending on the requirements of the system, different values can be chosen to optimize for quicker detection or fewer false positives. An example provided in the paper demonstrates how this can be useful: consider a job scheduling system with a master and a set of workers, where the master monitors the status of the workers and assigns jobs to them. Instead of having a binary determination of whether a worker is alive or not, consider having two thresholds $\Phi_1 < \Phi_2$. If $\Phi_1$ is exceeded, stop sending new jobs to the worker, and only when $\Phi_2$ is exceeded assume the worker has failed and reassign any pending jobs. Thus the system minimizes both the chance that it reassigns jobs unnecessarily and the chance of assigning jobs to a failed worker.
Phi accrual is one implementation of an accrual failure detector. The way it works is quite simple and relies on the processes sending each other heartbeats at a regular interval, e.g. 100 milliseconds. It keeps track of the intervals between heartbeats in a sliding window of time and measures the mean and variance of these samples, building the corresponding normal distribution with cumulative distribution function $P(x)$. Then define $\phi(t) = -\log_{10}(1 - P(t - t_{last}))$ where $t_{last}$ is the last time at which a heartbeat was received. The value of $\phi$ will increase the longer it has been since the last heartbeat. Furthermore, the algorithm is adaptive to network conditions because of the measurements in the sliding window. If the network becomes slow or unreliable, the resulting mean and variance will increase; as such, there will need to be a longer period for which no heartbeat is received before the process is suspected. The phi accrual model additionally allows for convenient intuition as to the choice of the threshold $\Phi$. Assuming that the sliding window is representative of the real distribution of heartbeat inter-arrival times, a choice of $\Phi = 1$ means that there is about a 10% chance of a false positive, and in general the probability of a false positive is $0.1^{\Phi}$.
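As a concrete illustration of the formula, here is a minimal Clojure sketch (not taken from any of the implementations linked above; the function names are made up, and it uses a standard polynomial approximation of the normal CDF):

(defn mean [xs] (/ (reduce + xs) (count xs)))

(defn std-dev [xs]
  (let [m (mean xs)]
    (Math/sqrt (/ (reduce + (map #(let [d (- % m)] (* d d)) xs))
                  (count xs)))))

;; Error function via the Abramowitz & Stegun 7.1.26 approximation.
(defn- erf [x]
  (let [sign (if (neg? x) -1.0 1.0)
        x    (Math/abs (double x))
        t    (/ 1.0 (+ 1.0 (* 0.3275911 x)))
        poly (* t (+ 0.254829592
                     (* t (+ -0.284496736
                             (* t (+ 1.421413741
                                     (* t (+ -1.453152027
                                             (* t 1.061405429)))))))))]
    (* sign (- 1.0 (* poly (Math/exp (- (* x x))))))))

(defn normal-cdf [x mu sigma]
  (* 0.5 (+ 1.0 (erf (/ (- x mu) (* sigma (Math/sqrt 2.0)))))))

(defn phi
  "phi at time t, given the time of the last heartbeat and a window of
   observed inter-arrival times (all in the same units, e.g. milliseconds)."
  [t t-last samples]
  (let [mu    (mean samples)
        sigma (max (std-dev samples) 1e-6)   ; avoid division by zero
        p     (normal-cdf (- t t-last) mu sigma)]
    (- (Math/log10 (- 1.0 p)))))

;; Suspect the process once phi exceeds a threshold, e.g. 1 (~10% chance of
;; a false positive) or 8 (~1e-8).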
One of the hardest parts about building distributed systems is that there is much less "library code" that can be leveraged. Oftentimes, good software engineering is about not reinventing the wheel, and having well-defined, reusable components is essential to scaling a codebase (or multiple). Phi accrual failure detection seems like it could be an important library that distributed systems can plug in without having to worry about the intricacies of the algorithm or the implementation.
Monday, August 5, 2013
Local vs Remote
While reading through the Akka documentation (the actor and remoting library for Scala), I once again came across a reference to the classic distributed systems paper from Sun discussing the difficulty of unifying local and remote programming models. I briefly touched on the content of the paper in a previous post, but it's worth revisiting in the context of concurrent programming and understanding how the actor model works in a distributed system. With Akka actors, like in Erlang, the programming model revolves entirely around message passing and asynchronous processing, even for actors running within the same Java Virtual Machine (JVM). This blurs the line between sending messages to a local actor and to a remote actor, but it is arguably done in the "correct" way. So it is interesting to understand what the difference is between the actor model and other attempts at such unification, e.g. remote procedure calls (RPC), which are generally considered to be less than ideal (although still widely used).
Firstly, we need to recap the four points brought up in the paper that supposedly doom a unified local and remote object-oriented model: latency, memory access, partial failure, and concurrency. Latency and memory access are fairly straightforward; because the object on which a method is called lives in a different address space on a remote machine, the method call is going to be much slower, and memory accesses have to be managed in some meaningful way across multiple machines. Partial failure refers to the fact that there are multiple components that can fail independently of each other (e.g. the network and the remote entity), which can leave the method call in an unknown or perhaps even inconsistent state. Lastly, the issue of concurrency arises because there is no shared synchronization system which can manage the order of method calls on an object, so the programmer must be aware that an object can receive multiple method calls simultaneously. These points can typically be alleviated by a well-designed RPC library and by diligently distinguishing local from remote invocations, but that is essentially the paper's argument: in practice, the two are not truly unified.
So then the question is how the actor model is different and why interacting with local and remote actors can be done in a mostly unified way. The problems of latency and concurrency are non-issues in the actor model because of the asynchronous design and the fact that each actor is processing a single message at a time. And assuming that actors have no shared state and communicate exclusively through the immutable messages passed between them, we can avoid confusing memory access semantics as well. Partial failure is, again, the most complex of the issues, and the actor model itself does not prescribe specific guarantees about message processing, such as idempotence, so it is still left to the programmer to figure out how to keep the system consistent. Regardless, we can see that actors, by design, are better suited for a unified local and remote model since they greatly reduce the space of potential problems to worry about. The corollary, however, is that local operations are more complex due to the way in which communicating between actors and sharing state is restricted, but I am not entirely convinced that this is a bad thing in light of the difficulty of multithreaded programming.
The more I learn about actors, the more optimistic I become about their potential to make concurrent programming less of a disaster. The Akka project is really interesting and gives Scala one more feature that is very much lacking in Java; the mere possibility that synchronized blocks and semaphores can be avoided altogether is enough to motivate me to pursue understanding Akka. Given that the future only holds more concurrent and distributed programming, I feel like programmers should be much better educated about the alternatives to traditional, error-prone multithreading.