R - parallel computing in 5 minutes (with foreach and doParallel)

Parallel computing is easy to use in R thanks to packages like doParallel. However, before we decide to parallelize our code, still we should remember that there is a trade-off between simplicity and performance. So if your script runs a few seconds, probably it's not worth to bother yourself. Yet if your analysis are computationally heavy, you can often save hours or even days. In such case, it's reasonable to sacrifice code readability and clear error messages to save time.

Let's start from beginning. First, install doParallel package and load it.

install.packages('doParallel')
library(doParallel)

Then, run a simple and sequential foreach loop which calculate sum of hyperbolic tangent function results. It's not especially useful, but it will be a good example.

system.time(foreach(i=1:10000) %do% sum(tanh(1:i)))

# Output:
#    user  system elapsed 
#   2.949   0.016   2.968 

Now, let's change %do% to %dopar% and check what will happen.

system.time(foreach(i=1:10000) %dopar% sum(tanh(1:i)))

# Output:
#    user  system elapsed 
#   2.947   0.036   2.988 
# Warning message:
# executing %dopar% sequentially: no parallel backend registered 

If you run it for the first time, you should see the warning message which indicates that the loop ran sequentially. To execute your code in parallel, beside using dopar, you have to register parallel backend - don't worry, it's easy.

registerDoParallel()

Run command above and try one more time parallized version of the loop. You should see that now the execution time is lower. By default, registering backend without any parameters creates 3 workers on Windows and approximately half of the number of cores on Unix systems.

Parallel backend

Let's explore it futher. The first step will be checking how many workers we have available. Call following function to get the answer.

getDoParWorkers()

# Output:
# [1] 4

Now, let's try to switch back to sequential execution and check once more the number of workers.

registerDoSEQ()
getDoParWorkers()

# Output:
# [1] 1

Next, we will register parallel backend once more, but this time we will set number of workers explicitly.

registerDoParallel(cores=2)
getDoParWorkers()

# Output:
# [1] 2

And then, try to use more than we have physical cores.

registerDoParallel(cores=300)
getDoParWorkers()

# Output:
# [1] 300

system.time(foreach(i=1:10000) %dopar% sum(tanh(1:i)))
# Output:
#   user  system elapsed 
#  2.325   6.216   5.725 

As you can see, it is possible to use many more workers than number of cores, but it also increase overhead of dispatching tasks. So in our example, total execution time is much longer than in sequential case.

doParallel also allows us to create computational cluster manually, but we have to remember to unregister it, by calling stopCluster funtion, when we have finished our work.

cl <- makeCluster(2)
registerDoParallel(cl)
system.time(foreach(i=1:10000) %dopar% sum(tanh(1:i)))
stopCluster(cl)

Functions and packages

Few weeks ago, I had a problem with foreach loop because it was not able to see local functions or imported from external packages. The solution, which worked for me, was using additional loop's parameters (.export and .packages) and pass the function and package names explicitly.

results <- foreach(i=1:n, .export=c('function1', 'function2'), .packages='package1') %dopar% {
  # do something cool
}

Merging results

This part is not directly related with parallel computing, but it will be useful to know how to merge output from foreach loop in different ways.
By default, it returns you a list with outputs from all iterations.

results = foreach(i=1:10) %dopar% {
  data.frame(feature=rnorm(10))
}
class(results)

# Output:
# [1] "list"

You can overwrite it by setting .combine parameter. Let's make a data frame which store in each column results from one iteration.

results = foreach(i=1:10, .combine=data.frame) %dopar% {
  data.frame(feature=rnorm(10))
}
class(results)

# Output:
# [1] "data.frame"

Now, we can try to write our own custom function for merging results. In every iteration, we will generate a data frame with two columns, i.e. timestamp and measurement/feature returned by sensor. After going through all iterations, we would like to have single data frame merged by timestamps. First, we will create the merging function and then we will use it.

merge.by.time <- function(a, b) {
  merge(a, b, by='timestamp', suffixes=c('', ncol(a)))
}

results = foreach(i=1:5, .combine=merge.by.time) %dopar% {
  data.frame(timestamp=sample(1:10), feature=rnorm(10))
}

print(results)

# Output:
#    timestamp    feature   feature2   feature3    feature4    feature5
# 1          1 -0.1655825 -0.9301484 -0.6144039 -0.04826379 -1.14872702
# 2          2 -1.0214898  0.2888518  0.8086409 -0.51811068 -0.10786999
# 3          3 -0.9717168  0.8614015  1.6521166 -0.26958848  0.86063073
# 4          4 -1.8141072 -0.2487387  0.3302528  1.44081105 -0.70726657
# 5          5  0.6748330 -0.6913867 -1.4586897 -0.40082273 -0.09189869
# 6          6 -0.3780126  0.1760140  1.0881200 -1.60095458  0.78786617
# 7          7 -1.0968293  1.4114997  0.1611462  0.29579596 -1.30987061
# 8          8 -0.9285541 -2.3757749 -0.3329841 -0.64194054 -1.17378147
# 9          9 -0.9434158  0.9353895  0.9668269 -2.41119024 -0.39902072
# 10        10  0.4024025 -1.5194827 -0.5828012 -0.49676977 -0.98481906