Parallel Processing with SafeMapper
Source: vignettes/parallel-processing.Rmd

Overview
SafeMapper provides fault-tolerant parallel processing through the
s_future_* family of functions. These are drop-in
replacements for furrr functions with automatic
checkpointing.
library(SafeMapper)
#> SafeMapper: Fault-tolerant functional programming

Why Parallel + Fault Tolerance?
┌─────────────────────────────────────────────────────────────────────────────┐
│ The Challenge of Parallel Processing │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Traditional Parallel Processing: │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ Worker 1: ████████████████████ │ │
│ │ Worker 2: █████████████████████████ │ │
│ │ Worker 3: ██████████████████████████████ │ │
│ │ Worker 4: ████████████████████████████ ❌ CRASH! │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ │
│ Result: ALL workers' progress lost, must restart everything │
│ │
│ SafeMapper Parallel Processing: │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ Worker 1: ████████████████████ 💾 │ │
│ │ Worker 2: █████████████████████████ 💾 │ │
│ │ Worker 3: ██████████████████████████████ 💾 │ │
│ │ Worker 4: ████████████████████████████ ❌ CRASH! │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ │
│ Result: Resume from last checkpoint, only redo partial work │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
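The resume-from-checkpoint idea in the diagram can be sketched in a few lines of base R. This is an illustration of the concept only, not SafeMapper's internals; `checkpointed_map`, the batch logic, and the RDS checkpoint file are all assumptions made for the sketch:

```r
# Conceptual sketch of batch-level checkpointing (NOT SafeMapper's implementation)
checkpointed_map <- function(xs, f, checkpoint = tempfile(fileext = ".rds"),
                             batch_size = 10) {
  # Load any progress left behind by a previous (crashed) run
  done <- if (file.exists(checkpoint)) readRDS(checkpoint) else list()
  batches <- split(xs, ceiling(seq_along(xs) / batch_size))
  for (name in names(batches)) {
    if (name %in% names(done)) next          # skip batches finished before a crash
    done[[name]] <- lapply(batches[[name]], f)
    saveRDS(done, checkpoint)                # persist progress after each batch
  }
  file.remove(checkpoint)                    # all batches done: drop the checkpoint
  unlist(done, recursive = FALSE)
}

res <- checkpointed_map(1:25, function(x) x^2, batch_size = 10)
length(res)  # 25 results, computed in three batches of 10, 10, and 5
```

If the process dies mid-run, calling `checkpointed_map` again with the same checkpoint path redoes only the unfinished batches.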
Prerequisites
SafeMapper’s parallel functions require the furrr and
future packages:
install.packages(c("furrr", "future"))

Setting Up Parallel Processing

Step 1: Load the Packages

library(SafeMapper)
library(future)

Step 2: Configure Workers
# Use multiple R sessions (works on all platforms)
plan(multisession, workers = 4)
# Or use forked processes (faster, but Unix/Mac only)
# plan(multicore, workers = 4)

Step 3: Use s_future_* Functions
# Instead of furrr::future_map()
result <- s_future_map(1:1000, expensive_function)

Available Parallel Functions
┌─────────────────────────────────────────────────────────────────────────────┐
│ s_future_* Function Family │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Single-Input │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ s_future_map ──► Parallel map, returns list │ │
│ │ s_future_map_chr ──► Returns character vector │ │
│ │ s_future_map_dbl ──► Returns numeric vector │ │
│ │ s_future_map_int ──► Returns integer vector │ │
│ │ s_future_map_lgl ──► Returns logical vector │ │
│ │ s_future_map_dfr ──► Returns row-bound data frame │ │
│ │ s_future_map_dfc ──► Returns column-bound data frame │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ Dual-Input │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ s_future_map2 ──► Parallel map with two inputs │ │
│ │ s_future_map2_chr ──► Returns character vector │ │
│ │ s_future_map2_dbl ──► Returns numeric vector │ │
│ │ s_future_map2_int ──► Returns integer vector │ │
│ │ s_future_map2_lgl ──► Returns logical vector │ │
│ │ s_future_map2_dfr ──► Returns row-bound data frame │ │
│ │ s_future_map2_dfc ──► Returns column-bound data frame │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ Multi-Input & Side Effects │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ s_future_pmap ──► Parallel map with multiple inputs │ │
│ │ s_future_imap ──► Parallel indexed map │ │
│ │ s_future_walk ──► Parallel side effects │ │
│ │ s_future_walk2 ──► Parallel dual-input side effects │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Basic Usage Examples
s_future_map2
plan(multisession, workers = 2)
x <- 1:50
y <- 51:100
# Process pairs in parallel
results <- s_future_map2(x, y, function(a, b) {
Sys.sleep(0.1)
a * b
})
plan(sequential)

s_future_pmap
plan(multisession, workers = 2)
params <- list(
a = 1:30,
b = 31:60,
c = 61:90
)
# Process multiple inputs in parallel
results <- s_future_pmap(params, function(a, b, c) {
Sys.sleep(0.1)
a + b + c
})
plan(sequential)

Execution Flow
┌─────────────────────────────────────────────────────────────────────────────┐
│ Parallel Processing Flow │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ s_future_map(data, func) │ │
│ └────────────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 1. Check for existing checkpoint │ │
│ │ ├── Found: Resume from checkpoint │ │
│ │ └── Not found: Start fresh │ │
│ └────────────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 2. Split data into batches │ │
│ │ Data: [1, 2, 3, ..., 1000] │ │
│ │ Batch 1: [1-100], Batch 2: [101-200], ... │ │
│ └────────────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 3. Process each batch with furrr::future_map │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
│ │ │ Batch items distributed to workers │ │ │
│ │ │ │ │ │
│ │ │ Worker 1: [1-25] ────► Results [1-25] │ │ │
│ │ │ Worker 2: [26-50] ────► Results [26-50] │ │ │
│ │ │ Worker 3: [51-75] ────► Results [51-75] │ │ │
│ │ │ Worker 4: [76-100] ────► Results [76-100] │ │ │
│ │ └─────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └────────────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 4. Save checkpoint after each batch │ │
│ │ 💾 Checkpoint: "Batch 1 complete, 100/1000 items" │ │
│ └────────────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 5. Repeat until all batches complete │ │
│ │ Then: Delete checkpoint, return full results │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
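The batching arithmetic in the flow above is easy to check. With the diagram's numbers (1000 items, batches of 100, 4 workers), the run produces ten checkpoints, and each worker handles 25 items per batch:

```r
n <- 1000
batch_size <- 100
workers <- 4

n_batches  <- ceiling(n / batch_size)  # checkpoints written over the run: 10
per_worker <- batch_size / workers     # items each worker gets per batch: 25
```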
Configuration Options
Batch Size for Parallel
For parallel processing, batch size affects both checkpoint frequency and parallel efficiency:
# Larger batches = more efficient parallel execution
# But less frequent checkpoints
s_configure(batch_size = 200)
# Smaller batches = more frequent checkpoints
# But more checkpointing and scheduling overhead
s_configure(batch_size = 50)

furrr Options
Pass furrr options through the .options parameter:
# Custom furrr options
opts <- furrr::furrr_options(
seed = 123, # Reproducible random numbers
globals = TRUE, # Export global variables
packages = "dplyr" # Load packages in workers
)
result <- s_future_map(
1:100,
my_function,
.options = opts
)

When to Use Parallel Processing
┌─────────────────────────────────────────────────────────────────────────────┐
│ Decision Tree: Sequential vs Parallel │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Is each operation CPU-intensive (> 100ms)? │
│ │ │
│ ├── YES ──► Is total data size > 100 items? │
│ │ │ │
│ │ ├── YES ──► ✅ Use s_future_map (parallel) │
│ │ │ │
│ │ └── NO ───► ⚠️ Overhead may outweigh benefit │
│ │ Use s_map (sequential) │
│ │ │
│ └── NO ───► Is the operation I/O bound (network, disk)? │
│ │ │
│ ├── YES ──► ⚠️ Parallel may help, but consider: │
│ │ - Rate limits │
│ │ - Connection pools │
│ │ - Resource contention │
│ │ │
│ └── NO ───► ❌ Use s_map (sequential) │
│ Parallel overhead not worth it │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
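The "overhead may outweigh benefit" branches can be made concrete with a back-of-envelope model: sequential time is roughly `n * t`, while parallel time adds a fixed startup cost plus a small per-item dispatch cost. The cost figures below are illustrative assumptions, not measured SafeMapper overheads:

```r
# Rough model: is parallel worth it? (startup and per-item costs are assumptions)
worth_parallel <- function(n_items, secs_per_item, workers = 4,
                           startup_secs = 2, per_item_overhead = 0.001) {
  sequential <- n_items * secs_per_item
  parallel   <- startup_secs + n_items * per_item_overhead +
                n_items * secs_per_item / workers
  parallel < sequential
}

worth_parallel(1000, 0.5)     # CPU-heavy items: TRUE, parallel wins
worth_parallel(1000, 0.0001)  # trivial items: FALSE, overhead dominates
```

Measure your own function's per-item time before committing to a backend; the crossover point moves with startup cost and worker count.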
Good Use Cases for Parallel
# 1. Heavy computations
s_future_map(large_datasets, function(data) {
# Complex statistical model fitting
fit_complex_model(data)
})
# 2. Image/file processing
s_future_map(image_files, function(file) {
# CPU-intensive image transformation
process_image(file)
})
# 3. Simulations
s_future_map(1:1000, function(i) {
# Monte Carlo simulation
run_simulation(seed = i)
})

Poor Use Cases for Parallel
# 1. Simple operations (overhead > benefit)
# DON'T:
s_future_map(1:1000, ~ .x + 1) # Too simple
# DO:
s_map(1:1000, ~ .x + 1)
# 2. Rate-limited API calls
# DON'T:
s_future_map(urls, fetch_api) # May hit rate limits
# DO:
s_map(urls, fetch_api) # Sequential respects rate limits

Handling Progress
# Enable progress bar
result <- s_future_map(
1:100,
slow_function,
.progress = TRUE
)

Error Handling in Parallel
When errors occur in parallel execution, SafeMapper handles them gracefully:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Error Handling in Parallel │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Batch Processing with Error: │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ Batch 1: [1-100] ✅ Success ──► 💾 Save checkpoint │ │
│ │ Batch 2: [101-200] ✅ Success ──► 💾 Save checkpoint │ │
│ │ Batch 3: [201-300] ❌ Error in worker │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Retry entire batch (up to N attempts) │ │
│ │ │ │ │
│ │ ├── Success ──► Continue │ │
│ │ └── Fail ────► Error (200 items saved) │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ │
│ On re-run: Resume from item 201 │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
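The retry step in the diagram follows a standard pattern: attempt the batch, catch the error, and try again up to a limit. A generic base-R sketch of that pattern (an illustration, not SafeMapper's internal retry code; `with_retries` and `flaky` are invented for the example):

```r
# Generic retry wrapper: run expr_fn, retrying on error up to `attempts` times
with_retries <- function(expr_fn, attempts = 3) {
  for (i in seq_len(attempts)) {
    result <- tryCatch(expr_fn(), error = function(e) e)
    if (!inherits(result, "error")) return(result)
    message("Attempt ", i, " failed: ", conditionMessage(result))
  }
  stop("All ", attempts, " attempts failed")
}

# A function that fails twice, then succeeds (simulates a transient worker error)
flaky <- local({
  calls <- 0
  function() { calls <<- calls + 1; if (calls < 3) stop("transient"); "ok" }
})

out <- with_retries(flaky)  # succeeds on the third attempt
```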
Best Practices
┌─────────────────────────────────────────────────────────────────────────────┐
│ Parallel Processing Best Practices │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. Choose the Right Number of Workers │
│ ├── CPU-bound: workers = parallel::detectCores() - 1 │
│ └── Memory-intensive: fewer workers to avoid OOM │
│ │
│ 2. Mind Memory Usage │
│ ├── Each worker gets a copy of global data │
│ ├── Use .options$globals to minimize data transfer │
│ └── Consider chunking very large datasets │
│ │
│ 3. Set Appropriate Batch Sizes │
│ ├── Too small: High checkpoint I/O overhead │
│ ├── Too large: More work lost on failure │
│ └── Rule of thumb: 1-5 minutes of work per batch │
│ │
│ 4. Handle Random Seeds │
│ ├── Use .options$seed for reproducibility │
│ └── Each worker gets independent but reproducible stream │
│ │
│ 5. Clean Up Resources │
│ ├── Call plan(sequential) when done │
│ └── Or let R clean up on exit │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
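Practices 1 and 2 above combine naturally: cap the worker count by both available cores and available memory, and take the smaller of the two. A rule-of-thumb sketch (the per-worker and total memory figures are assumptions; measure your own workload):

```r
# Choose workers from CPU and memory limits, taking the tighter constraint
n_cores <- parallel::detectCores()
cpu_workers <- max(1, n_cores - 1, na.rm = TRUE)  # leave one core for the OS

mem_per_worker_gb <- 4    # assumed peak memory per worker
total_mem_gb      <- 16   # assumed machine RAM
mem_workers <- max(1, floor(total_mem_gb / mem_per_worker_gb) - 1)

workers <- min(cpu_workers, mem_workers)
```

Then pass the result to the backend: `plan(multisession, workers = workers)`.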
Complete Example
library(SafeMapper)
library(future)
# Configure parallel backend
plan(multisession, workers = 4)
# Configure SafeMapper
s_configure(
batch_size = 100, # Checkpoint every 100 items
retry_attempts = 3 # Retry failed batches 3 times
)
# Define your processing function
process_item <- function(x) {
Sys.sleep(0.5) # Simulate work
result <- x^2 + rnorm(1)
return(result)
}
# Run with fault tolerance
results <- s_future_map(
1:500,
process_item,
.progress = TRUE,
.options = furrr::furrr_options(seed = 42)
)
# Clean up
plan(sequential)
# If interrupted, just re-run the same code!
# It will resume from the last checkpoint.

Next Steps
- 🛡️ Error Handling - Robust error handling strategies
- 📋 Session Management - Manage checkpoints
- 🎯 Real-World Examples - See parallel processing in action