Overview
SafeMapper achieves fault tolerance through three core mechanisms:
- Fingerprinting - Uniquely identifies each computational task
- Checkpointing - Periodically saves intermediate results
- Auto-Recovery - Seamlessly resumes interrupted tasks
This article provides an in-depth explanation of how these mechanisms work.
library(SafeMapper)
#> SafeMapper: Fault-tolerant functional programmingArchitecture Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ SafeMapper System Architecture │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ User Code Layer │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ s_map() s_map2() s_pmap() s_future_map() s_walk() ... │ │
│ └─────────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ Core Engine Layer ▼ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ .safe_execute() │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │Fingerprint │ │ Checkpoint │ │ Batch │ │ Error │ │ │
│ │ │ Generator │ │ Manager │ │ Processor │ │ Retry │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │ │
│ └─────────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ Storage Layer ▼ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ R User Cache Directory (~/.cache/R/SafeMapper/) │ │
│ │ └── checkpoints/ │ │
│ │ ├── session_abc123.rds │ │
│ │ └── session_def456.rds │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
1. Fingerprinting Mechanism
What is a Fingerprint?
A fingerprint is a unique identifier that identifies a specific computational task. SafeMapper automatically generates fingerprints by analyzing input data characteristics.
Fingerprint Generation Flow
┌─────────────────────────────────────────────────────────────────┐
│ Fingerprint Generation │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Input Data │
│ [1, 2, 3, ..., 1000] │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Extract Features │ │
│ │ ┌─────────────────────────────────────┐ │ │
│ │ │ mode: "map" │ │ │
│ │ │ length: 1000 │ │ │
│ │ │ class: "numeric" │ │ │
│ │ │ first: 1 │ │ │
│ │ │ last: 1000 │ │ │
│ │ └─────────────────────────────────────┘ │ │
│ └───────────────────┬─────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ xxhash64 Hash Calculation │ │
│ └───────────────────┬─────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Fingerprint: "map_7a3b9c2d1e8f4a5b" │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Feature Selection Rationale
# These two calls will generate the same fingerprint
data <- 1:100
result1 <- s_map(data, ~ .x^2)
#> [1%] Processing items 1-100 of 100
#> Completed 100 items
# If re-run immediately, same fingerprint will be detected
# This call generates a different fingerprint (different data)
data2 <- 1:200
result2 <- s_map(data2, ~ .x^2)
#> [0%] Processing items 1-100 of 200
#> [50%] Processing items 101-200 of 200
#> Completed 200 itemsWhy not hash the entire dataset?
- Complete hashing of large datasets is very time-consuming
- Using features (length, first/last elements, type) enables fast, stable fingerprinting
- In practice, this approach is sufficient to distinguish different tasks
Custom Session IDs
For more precise control, you can manually specify a session ID:
# Use custom session ID
result <- s_map(1:20, ~ .x^2, .session_id = "my_custom_session")
#> [5%] Processing items 1-20 of 20
#> Completed 20 items
# This ensures that even tasks with similar data features won't conflict2. Checkpointing Mechanism
Checkpoint Data Structure
┌─────────────────────────────────────────────────────────────────┐
│ Checkpoint File Structure │
├─────────────────────────────────────────────────────────────────┤
│ │
│ checkpoint_file.rds │
│ │ │
│ ├── results: list() │
│ │ └── [1], [2], [3], ..., [completed items] │
│ │ │
│ └── metadata: list() │
│ ├── session_id: "map_7a3b9c2d..." │
│ ├── total_items: 1000 │
│ ├── completed_items: 500 │
│ ├── mode: "map" │
│ ├── created: "2026-01-23 10:30:00" │
│ └── last_updated: "2026-01-23 10:35:00" │
│ │
└─────────────────────────────────────────────────────────────────┘
Checkpoint Save Timing
┌─────────────────────────────────────────────────────────────────┐
│ Batch Processing & Checkpoints │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Data: [1, 2, 3, ..., 1000] Batch Size: 100 │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Batch 1: [1-100] │ │
│ │ │ │ │
│ │ ├── Complete ──────────► 💾 Save checkpoint (100) │ │
│ │ ▼ │ │
│ │ Batch 2: [101-200] │ │
│ │ │ │ │
│ │ ├── Complete ──────────► 💾 Save checkpoint (200) │ │
│ │ ▼ │ │
│ │ Batch 3: [201-300] │ │
│ │ │ │ │
│ │ └── ❌ INTERRUPTED! │ │
│ │ │ │
│ │ 💾 Checkpoint saved: 200 items completed │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Configuring Batch Size
# Default batch size is 100
# For fast operations, increase batch size to reduce I/O
s_configure(batch_size = 200)
# For slow operations (like API calls), decrease batch size for more frequent saves
s_configure(batch_size = 10)
# Reset to defaults
s_configure(batch_size = 100)3. Auto-Recovery Mechanism
Recovery Flow
┌─────────────────────────────────────────────────────────────────┐
│ Auto-Recovery Flow │
├─────────────────────────────────────────────────────────────────┤
│ │
│ s_map(data, func) is called │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Generate Fingerprint│ │
│ │ "map_7a3b9c2d..." │ │
│ └──────────┬──────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Checkpoint Exists? │── Yes ─►│ Validate Checkpoint │ │
│ └──────────┬──────────┘ │ - Data length match?│ │
│ │ │ - File not corrupt? │ │
│ │ No └──────────┬──────────┘ │
│ │ │ │
│ ▼ │ Valid │
│ ┌─────────────────────┐ │ │
│ │ Start Fresh │ ▼ │
│ │ start_idx = 1 │ ┌─────────────────────┐ │
│ └──────────┬──────────┘ │ Resume Progress │ │
│ │ │ "Resuming from 200" │ │
│ │ │ start_idx = 201 │ │
│ │ └──────────┬──────────┘ │
│ │ │ │
│ └───────────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Continue Batch │ │
│ │ Processing │ │
│ └──────────┬──────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Delete Checkpoint │ │
│ │ Return Results │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Recovery Demo
# Simulate a task that might be interrupted
simulate_task <- function(x) {
Sys.sleep(0.01)
x^2
}
# First run
result <- s_map(1:30, simulate_task, .session_id = "recovery_demo")
#> [3%] Processing items 1-30 of 30
#> Completed 30 items
# If task is interrupted, simply re-run the same code:
# result <- s_map(1:30, simulate_task, .session_id = "recovery_demo")
# Output: "Resuming from item XX/30"4. Error Retry Mechanism
Retry Flow
┌─────────────────────────────────────────────────────────────────┐
│ Error Retry Mechanism │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Processing Batch [101-200] │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Attempt 1 │ │
│ │ ❌ Error: Timeout │ │
│ └──────────┬──────────┘ │
│ │ │
│ ▼ Wait 1 second │
│ ┌─────────────────────┐ │
│ │ Attempt 2 │ │
│ │ ❌ Error: Server Busy│ │
│ └──────────┬──────────┘ │
│ │ │
│ ▼ Wait 1 second │
│ ┌─────────────────────┐ │
│ │ Attempt 3 │ │
│ │ ✅ Success! │ │
│ └──────────┬──────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Save Checkpoint │ │
│ │ Continue Next Batch │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Configure Retry Attempts
# For unstable network environments, increase retry attempts
s_configure(retry_attempts = 5)
# For stable local computations, reduce retry attempts
s_configure(retry_attempts = 1)
# Reset to default
s_configure(retry_attempts = 3)5. Storage Location
SafeMapper uses R’s standard user cache directory to store checkpoints:
# Checkpoint storage location (varies by system)
# Linux: ~/.cache/R/SafeMapper/checkpoints/
# macOS: ~/Library/Caches/org.R-project.R/R/SafeMapper/checkpoints/
# Windows: %LOCALAPPDATA%/R/cache/R/SafeMapper/checkpoints/Complete Execution Flow
┌─────────────────────────────────────────────────────────────────────────────┐
│ SafeMapper Complete Execution Flow │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ User Invocation │ │
│ │ s_map(data, func, .session_id = NULL) │ │
│ └───────────────────────────────┬───────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ 1. Generate Fingerprint │ │
│ │ session_id = .make_fingerprint(data, "map") │ │
│ └───────────────────────────────┬───────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ 2. Try Recovery │ │
│ │ restored = .try_restore(session_id, length(data)) │ │
│ │ ├── Has checkpoint ──► results = restored$results │ │
│ │ │ start_idx = completed_items + 1 │ │
│ │ └── No checkpoint ───► results = vector("list", n) │ │
│ │ start_idx = 1 │ │
│ └───────────────────────────────┬───────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ 3. Batch Processing Loop │ │
│ │ for batch in batches(start_idx:n): │ │
│ │ │ │ │
│ │ ├── Show progress: "[XX%] Processing items X-Y of N" │ │
│ │ │ │ │
│ │ ├── Execute batch (with retry) │ │
│ │ │ batch_results = .execute_batch_with_retry(...) │ │
│ │ │ │ │
│ │ ├── Store results │ │
│ │ │ results[batch_indices] = batch_results │ │
│ │ │ │ │
│ │ └── Save checkpoint │ │
│ │ .save_checkpoint(session_id, results, ...) │ │
│ └───────────────────────────────┬───────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ 4. Completion │ │
│ │ .cleanup_checkpoint(session_id) # Delete checkpoint │ │
│ │ message("Completed N items") │ │
│ │ return(.format_output(results, output_type)) │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Design Principles
SafeMapper follows these design principles:
┌─────────────────────────────────────────────────────────────────┐
│ Design Principles │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Zero Configuration First │
│ ├── Works out of the box, no setup required │
│ └── All configuration is optional │
│ │
│ 2. Non-Invasive │
│ ├── API fully compatible with purrr/furrr │
│ ├── Just change function name, no code restructuring │
│ └── Can switch back to native purrr anytime │
│ │
│ 3. Transparent Operation │
│ ├── Checkpoints managed automatically │
│ ├── Users don't need to worry about details │
│ └── Automatic cleanup after success │
│ │
│ 4. Safe and Reliable │
│ ├── Save per batch, minimize data loss │
│ ├── Automatic error retry │
│ └── Corrupted checkpoints safely ignored │
│ │
└─────────────────────────────────────────────────────────────────┘
Next Steps
- 🔧 Map Functions - Learn all mapping functions in detail
- ⚡ Parallel Processing - Speed up with future
- 🛡️ Error Handling - Build more robust code