
multidplyr - dplyr meets parallel processing

Note: I assume that you are familiar with dplyr. If not, I suggest first working through the following tutorial: https://cran.rstudio.com/web/packages/dplyr/vignettes/introduction.html.

Intro

dplyr is one of my favourite R packages for data manipulation. It's extremely handy, easy to get started with, and elegant thanks to the pipe notation (%>%), yet still powerful.

Nevertheless, for all these advantages, there is one thing which could be improved: the lack of parallelization. Given dplyr's model of data processing, operations like summarize or do could easily be executed in parallel, but they are not.

Luckily, there is multidplyr, which takes the best of dplyr and adds parallel processing. You can find the project on GitHub: https://github.com/hadley/multidplyr.

According to Hadley Wickham, the author of the package, the speedup achieved through parallelization becomes visible when there are more than 10 million records or when the function performed in do is particularly heavy.
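
If you want to verify this claim on your own machine, a minimal sketch like the one below could be used once the package is installed (see the next section). The data frame here is synthetic and purely illustrative:

# Load packages
library(dplyr)
library(multidplyr)

# Synthetic data: 10 million rows split into 100 groups
big <- data.frame(g = sample(100, 1e7, replace = TRUE), x = rnorm(1e7))

# Sequential timing
system.time(big %>% group_by(g) %>% summarize(m = mean(x)))

# Parallel timing (note that partition itself copies the data
# to the nodes, which adds some overhead)
system.time(big %>% partition(g) %>% summarize(m = mean(x)) %>% collect())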

multidplyr installation

Before you install multidplyr, you need the devtools package. You can get both using the following commands:

install.packages("devtools")
devtools::install_github("hadley/multidplyr")

Basic functions

There are two basic functions introduced by the package. The first is partition, which divides data into groups that will be processed independently. Thus, in many cases, it can be viewed as a replacement for the group_by function. The second is collect, which joins the results produced in parallel into one object. If you are familiar with Spark, you can see the analogy. Below, you can find a simple example of the same code executed sequentially with dplyr and in parallel with multidplyr.

# Load packages
library(dplyr)
library(multidplyr)

# We will use built-in dataset - airquality
# dplyr
airquality %>% group_by(Month) %>% summarize(cnt = n())

# multidplyr
airquality %>% partition(Month) %>% summarize(cnt = n()) %>% collect()

And the output:

# dplyr
# A tibble: 5 × 2
  Month   cnt
  <int> <int>
1     5    31
2     6    30
3     7    31
4     8    31
5     9    30


# multidplyr
# A tibble: 5 × 2
  Month   cnt
  <int> <int>
1     7    31
2     9    30
3     5    31
4     6    30
5     8    31

As you can see, the results are the same except for the order. In the first call, the data are ordered by month, while in the parallel version only the parts that were processed together are ordered: 7, 9 and 5, 6, 8 (note that your results may differ, depending on how the data were split).
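
If a deterministic order matters to you, you can simply sort the collected result yourself; arrange is a standard dplyr verb, so this is a safe pattern:

# Restore the ordering by month after collecting the partial results
airquality %>% partition(Month) %>% summarize(cnt = n()) %>% collect() %>% arrange(Month)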

Cluster management

If we want to change the number of cores used during computations, we can create our own cluster. Then we can decide whether to use it only for a single call (by passing the cluster object as an argument to the partition function) or as the default cluster.

# Creating 4-core cluster
cluster <- create_cluster(4)

# Using cluster only for a single call
airquality %>% partition(Month, cluster = cluster) %>% summarize(cnt = n()) %>% collect()

# Setting default cluster
set_default_cluster(cluster)
airquality %>% partition(Month) %>% summarize(cnt = n()) %>% collect()
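
As a side note, if you prefer not to hard-code the number of cores, detectCores from the base parallel package reports how many are available. Leaving one core free for the main R session is a common convention of mine here, not something multidplyr requires:

# Size the cluster based on the machine, keeping one core free
library(parallel)
cluster <- create_cluster(detectCores() - 1)
set_default_cluster(cluster)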

User's functions and variables

By default, multidplyr cannot use any user-created functions or variables, because the cluster nodes do not see them. When we try to use them anyway, we receive an error.

four <- function(x) {
  return(data.frame(a = 4))
}

one <- 1

# dplyr - using user's function
airquality %>% group_by(Month) %>% do(four(.))

# dplyr - using user's variable
airquality %>% group_by(Month) %>% do(data.frame(b=one))

# multidplyr - using user's function
airquality %>% partition(Month) %>% do(four(.)) %>% collect()

# multidplyr - using user's variable
airquality %>% partition(Month) %>% do(data.frame(b=one)) %>% collect()

When you try to execute the code above, the dplyr statements will finish successfully, in contrast to the multidplyr ones, which return the following errors:

Error in checkForRemoteErrors(lapply(cl, recvResult)) : 
  2 nodes produced errors; first error: could not find function "four"

Error in checkForRemoteErrors(lapply(cl, recvResult)) : 
  2 nodes produced errors; first error: object 'one' not found

To fix the errors, we have to register the function four and the variable one to make them visible to the cluster. To do this, we will use the cluster_assign_value function, which takes three arguments: the cluster, the symbol name, and the value. If we have not created our own cluster, we can also retrieve the default one (see the code below).

# Create new cluster
cluster <- create_cluster(4)
# Or retrieve the default one
cluster <- get_default_cluster()

# Register function and variable
cluster_assign_value(cluster, 'four', four)
cluster_assign_value(cluster, 'one', one)

After registration, we should be able to run the multidplyr code without any problems. We can also check the already registered objects with the cluster_ls function, get one of these items using cluster_get, and unregister them with cluster_rm.

# Check registered items
cluster_ls(cluster)
# Get the item
cluster_get(cluster, 'one')
# Unregister function and variable
cluster_rm(cluster, c('four', 'one'))

Also, if you prefer, you can do the same using the pipe notation instead of passing the cluster as the first argument.

# Check registered items
cluster %>% cluster_ls()
# Get the item
cluster %>% cluster_get('one')
# Unregister function and variable
cluster %>% cluster_rm(c('four', 'one'))

Besides cluster_assign_value, there are three other functions for registering symbols, so you can choose the one that best fits the situation.
The first of them is cluster_copy, which is equivalent to cluster_assign_value with the symbol registered under its original name. So if you are not going to use a different name, this is the preferred approach.

# Both commands have the same effect
cluster_copy(cluster, four)
cluster_assign_value(cluster, "four", four)

The second of the functions is cluster_assign_expr, which allows us to assign the result of an R expression to a symbol:

cluster_assign_expr(cluster, 'random10', rnorm(10))
cluster_get(cluster, 'random10')

# Output:
# [[1]]
#  [1] -1.1242879 -0.5550312  0.6660461  0.3170633 -0.9264522  
#  [6] 0.4113212  -1.4133881  0.5977884 -1.6699814 -0.2328526

# [[2]]
#  [1] -1.1242879 -0.5550312  0.6660461  0.3170633 -0.9264522  
#  [6] 0.4113212  -1.4133881  0.5977884 -1.6699814 -0.2328526

And the last one is cluster_assign_each. It is an interesting function which can assign a different value to each cluster node (core) while all of them share a common symbol. This means that we can run the same piece of code on every node, for example when we want to process several files in parallel (see the sketch after the output below).

cluster_assign_each(cluster, 'filename', list('file1.csv', 'file2.csv'))
cluster_get(cluster, 'filename')

# Output:
# [[1]]
# [1] "file1.csv"

# [[2]]
# [1] "file2.csv"

Finally, I want to add that if we register the same symbol twice, we overwrite the previous value:

cluster_assign_value(cluster, 'one', 1)
cluster_get(cluster, 'one')

# Output:
# [[1]]
# [1] 1

# [[2]]
# [1] 1

cluster_assign_value(cluster, 'one', 2)
cluster_get(cluster, 'one')

# Output:
# [[1]]
# [1] 2

# [[2]]
# [1] 2

If you have any questions, just leave a comment.