--- title: "Parallelization" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Parallelization} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ```{r, include = FALSE} knitr::opts_chunk$set( collapse = TRUE, comment = "#>" ) library(SimEngine) ``` ## What is parallelization? Parallelization is the process of speeding up a computer program by dividing it into independent tasks and running those tasks simultaneously across multiple computer processors. Most modern laptops will have two or more processors (or "cores"), and many statisticians have access to so-called cluster computing systems (CCS), which can have hundreds of processing nodes, each of which can have multiple cores. Roughly speaking, a program that can be run in ten minutes when executed on a single core will take just one minute if it can be broken into ten separate tasks that all run at the same time. Therefore, parallelization can result in massive gains in computing speed and should be done whenever possible. Not all code can be parallelized; the separate tasks cannot exchange information or depend on each other in any way. However, you can still write programs that are *partially* parallel, such as when you separately compute ten estimates in parallel and then take the mean of the ten estimates. The terminology associated with parallel computing can be confusing - what is the difference between a node, a core, and a processor? What is the difference between a job, a task, and a thread? We use the following definitions: - A **node** is a single computer. Each node has access to physical resources, such as processing cores and memory. Your laptop is a node. A CCS is a collection of multiple nodes. - A **core** (or a *processor*) is an electronic component within a computer that executes code. Many modern laptops will have more than one core, and each node on a CCS will usually have multiple cores. - A **task** (or a *thread*) is a portion of code that runs on a single core. - A **cluster computing system (CCS)** is a type of "supercomputer", usually created and managed by IT specialists, specifically designed to handle large numbers of parallel tasks coming from multiple users. - A **job** is a collection of tasks that are part of the same simulation. - A **job array** is a special type of job that contains a number of near-identical tasks - A **job scheduler (JS)** is the software that runs on a CCS and manages the process of running jobs and job arrays. Slurm and Oracle Grid Engine are examples of job schedulers. ## Parallelization in **SimEngine** User-friendly parallelization is a hallmark of **SimEngine**. There are two modes of parallelizing code using **SimEngine**, which we refer to as *local parallelization* and *cluster parallelization*. Local parallelization refers to splitting the computational work of a simulation between multiple cores of a single computer (e.g., a multicore laptop). Cluster parallelization refers to running a simulation on a CCS using job arrays. **SimEngine** is designed to automate as much of the parallelization process as possible. We give an overview of each parallelization mode below. ## Local parallelization Local parallelization is the easiest way to parallelize code, as the entire process is handled by the package and executed on the user's computer. This mode is activated using `set_config()`, as follows. ```{r} sim <- new_sim() sim %<>% set_config(parallel = TRUE) ``` **SimEngine** handles the mechanics related to parallelization internally using the base R package `parallel`. If a single simulation replicate runs in a very short amount of time (e.g., less than one second), using local parallelization can actually result in an *increase* in total runtime. This is because there is a certain amount of computational overhead involved in the parallelization mechanisms inside **SimEngine**. A speed comparison can be performed by running the code twice, once with `set_config(parallel = TRUE)` and once with `set_config(parallel = FALSE)`, each followed by `sim %>% vars("total_runtime")`, to see the difference in total runtime. The exact overhead involved with local parallelization will differ between machines. If the user's computer has `n` cores available, **SimEngine** will use `n-1` cores by default. The `n_cores` argument of `set_config()` can be used to manually specify the number of cores to use, as follows. ```{r} sim %<>% set_config(n_cores = 2) ``` ## Cluster parallelization Parallelizing code using a CCS is more complicated, but **SimEngine** is built to streamline this process as much as possible. A CCS is a supercomputer that consists of a number of nodes, each of which may have multiple cores. In a typical workflow, a user starts by logging into the CCS (via SSH) and transferring files to the CCS filesystem (using Linux commands or an FTP client). The user then runs programs by submitting "jobs" to the CCS using a special program called a job scheduler. The job scheduler manages the process of running the jobs in parallel across multiple nodes and/or multiple cores. Although there are multiple ways to run code in parallel on a CCS, **SimEngine** makes use of job arrays. The main cluster parallelization function in **SimEngine** is `run_on_cluster()`. Throughout this example, we use Slurm as an example job scheduler, but an analogous workflow will apply to other job scheduling software. To illustrate the cluster parallelization workflow, consider the following simulation: ```r sim <- new_sim() create_data <- function(n) { return(rpois(n=n, lambda=20)) } est_lambda <- function(dat, type) { if (type=="M") { return(mean(dat)) } if (type=="V") { return(var(dat)) } } sim %<>% set_levels(estimator = c("M","V"), n = c(10,100,1000)) sim %<>% set_script(function() { dat <- create_data(L$n) lambda_hat <- est_lambda(dat=dat, type=L$estimator) return(list("lambda_hat"=lambda_hat)) }) sim %<>% set_config(num_sim=100) sim %<>% run() sim %>% summarize() ``` To run this code on a CCS, we simply wrap it in the `run_on_cluster()` function. To use this function, we must break the code into three blocks, called `first`, `main`, and `last`. The code in the `first` block will run only once, and will set up the simulation object. When this is finished, **SimEngine** will save the simulation object in the filesystem of the CCS. The code in the `main` block will then run once for each simulation replicate, and will have access to the simulation object created in the `first` block. In most cases, the code in the `main` block will simply include a single call to `run()`. Finally, the code in the `last` block will run after all simulation replicates have finished running, and after **SimEngine** has automatically compiled the results into the simulation object. Use of the `run_on_cluster()` function is illustrated below: ```r run_on_cluster( first = { sim <- new_sim() create_data <- function(n) { return(rpois(n=n, lambda=20)) } est_lambda <- function(dat, type) { if (type=="M") { return(mean(dat)) } if (type=="V") { return(var(dat)) } } sim %<>% set_levels(estimator = c("M","V"), n = c(10,100,1000)) sim %<>% set_script(function() { dat <- create_data(L$n) lambda_hat <- est_lambda(dat=dat, type=L$estimator) return(list("lambda_hat"=lambda_hat)) }) sim %<>% set_config(num_sim=100, n_cores=20) }, main = { sim %<>% run() }, last = { sim %>% summarize() }, cluster_config = list(js="slurm") ) ``` Note that none of the actual simulation code changed (with the exception of specifying `n_cores=20` in the `set_config()` call); we simply divided the code into chunks and and placed these chunks into the appropriate block (`first`, `main`, or `last`) within `run_on_cluster()`. Additionally, we specified which job scheduler to use in the `cluster_config` argument list. The command `js_support()` can be run in R to see a list of supported job scheduler software; the value in the `js_code` column is the value that should be specified in the `cluster_config` argument. Unsupported job schedulers can still be used for cluster parallelization, as detailed below. Next, we must give the job scheduler instructions on how to run the above code. In the following, we assume that the R code above is stored in a file called `my_simulation.R`. We also need to create a simple shell script called `run_sim.sh` with the following two lines, which will run `my_simulation.R` (we demonstrate this using BASH scripting language, but any shell scripting language may be used). ```bash #!/bin/bash Rscript my_simulation.R ``` If created on a local machine, the two simulation files (`my_simulation.R` and `run_sim.sh`) must be transferred to the filesystem of the CCS. Finally, we use the job scheduler to submit three jobs. The first will run the `first` code, the second will run the `main` code, and the third will run the `last` code. With Slurm, we run the following three shell commands: ```bash sbatch --export=sim_run='first' run_sim.sh #> Submitted batch job 101 sbatch --export=sim_run='main' --array=1-20 --depend=afterok:101 run_sim.sh #> Submitted batch job 102 sbatch --export=sim_run='last' --depend=afterok:102 run_sim.sh #> Submitted batch job 103 ``` In the first line, we submit the `run_sim.sh` script using the `sim_run='first'` environment variable, which tells **SimEngine** to only run the code in the `first` block. After running this, Slurm returns the message `Submitted batch job 101`. The number `101` is called the "job ID" and uniquely identifies the job on the CCS. In the second line, we submit the `run_sim.sh` script using the `sim_run='main'` environment variable and tell Slurm to run a job array with "task IDs" 1-20. Each task corresponds to one core, and so in this case 20 cores will be used. This number should equal the `n_cores` number specified via `set_config()`. **SimEngine** handles the work of dividing the simulation replicates between the cores; the only restriction is that the number of cores cannot exceed the total number of simulation replicates. Also note that we included the option `--depend=afterok:101`, which instructs the job scheduler to wait until the first job finishes before starting the job array. (In practice, the number 101 must be replaced with whatever job ID Slurm assigned to the first job.) Once this command is submitted, the code in the `main` block will be run for each replicate. A temporary folder called `sim_results` will be created and filled with temporary objects containing data on the results and/or errors for each replicate. In the third line, we submit the `run_sim.sh` script using the `sim_run='last'` environment variable. Again, we use `--depend=afterok:102` to ensure this code does not run until all tasks in the job array have finished. When this job runs, **SimEngine** will compile the results from the `main` block, run the code in the `last` block, save the simulation object to the filesystem, and delete the temporary `sim_results` folder and its contents. If desired, the user can leave the `last` block empty, but this third `sbatch` command should be run anyways to compile the results and save the simulation object for further analysis. ## Additional cluster parallelization functionality ### Running locally The `run_on_cluster()` function is programmed such that it can also be run locally. In this case, the code within the `first`, `main`, and `last` blocks will be executed in the calling environment of the `run_on_cluster()` function (typically the global environment); this can be useful for testing simulations locally before sending them to a CCS. ### Using unsupported job schedulers There may be job schedulers that **SimEngine** does not natively support. If this is the case, **SimEngine** can still be used for cluster parallelization; this requires identifying the environment variable that the job scheduler uses to uniquely identify tasks within a job array. For example, Slurm uses the variable `"SLURM_ARRAY_TASK_ID"` and Grid Engine uses the variable `"SGE_TASK_ID"`. Once this variable is identified, it can be specified in the `cluster_config` block, as follows: ```r run_on_cluster( first = {...}, main = {...}, last = {...}, cluster_config = list(tid_var="SLURM_ARRAY_TASK_ID") ) ``` ### Updating a simulation on a CCS To update a simulation on a CCS, the `update_sim_on_cluster()` function can be used. The workflow is similar to that of `run_on_cluster()`, with several key differences. Instead of creating a new simulation object in the `first` block using `new_sim()`, the existing simulation object (which would have been saved to the filesystem when `run_on_cluster()` was called originally) is loaded using `readRDS()`. Then, the functions `set_levels()` and/or `set_config()` are called to specify the desired updates. In the `main` block, `update_sim()` is called (instead of `run()`). In the `last` block, code can remain the same or change as needed. These differences are illustrated in the code below. ```r update_sim_on_cluster( first = { sim <- readRDS("sim.rds") sim %<>% set_levels(n=c(100,500,1000)) }, main = { sim %<>% update_sim() }, last = { sim %>% summarize() }, cluster_config = list(js="slurm") ) ``` Submission of this code via a job scheduler proceeds in the same manner as described earlier for `run_on_cluster()`.