
Overview

This guide presents best practices for using SafeMapper in production environments. Follow these guidelines to build robust, maintainable, and efficient data processing pipelines.

library(SafeMapper)
#> SafeMapper: Fault-tolerant functional programming

Architecture Patterns

Pattern 1: The Robust Pipeline

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Production Pipeline Architecture                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                         Configuration Layer                          │   │
│   │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                  │   │
│   │  │   Config    │  │   Logging   │  │  Monitoring │                  │   │
│   │  │   Setup     │  │   Setup     │  │   Setup     │                  │   │
│   │  └─────────────┘  └─────────────┘  └─────────────┘                  │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                         Input Validation                             │   │
│   │  ├── Check data types                                               │   │
│   │  ├── Validate required fields                                       │   │
│   │  └── Filter invalid records                                         │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                         SafeMapper Processing                        │   │
│   │  ├── s_map/s_future_map with checkpointing                         │   │
│   │  ├── Error wrappers (s_safely/s_possibly)                          │   │
│   │  └── Progress tracking                                              │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                         Post-Processing                              │   │
│   │  ├── Result validation                                              │   │
│   │  ├── Error aggregation                                              │   │
│   │  ├── Cleanup                                                        │   │
│   │  └── Reporting                                                      │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Implementation

# Production pipeline template
run_production_pipeline <- function(data, process_fn, config = list()) {
  # ===== Configuration Layer =====
  default_config <- list(
    batch_size = 100,
    retry_attempts = 3,
    session_id = paste0("pipeline_", format(Sys.time(), "%Y%m%d_%H%M%S")),
    fail_threshold = 0.1  # Max 10% failure rate
  )
  config <- modifyList(default_config, config)
  
  s_configure(
    batch_size = config$batch_size,
    retry_attempts = config$retry_attempts
  )
  
  # ===== Input Validation =====
  if (length(data) == 0) {
    stop("No data provided")
  }
  
  message(sprintf("[%s] Starting pipeline: %d items", Sys.time(), length(data)))
  
  # ===== SafeMapper Processing =====
  safe_fn <- s_safely(process_fn)
  
  results <- s_map(
    data,
    safe_fn,
    .session_id = config$session_id
  )
  
  # ===== Post-Processing =====
  successes <- sum(sapply(results, function(x) is.null(x$error)))
  failures <- sum(sapply(results, function(x) !is.null(x$error)))
  success_rate <- successes / length(results)
  
  message(sprintf("[%s] Complete: %d/%d (%.1f%% success)", 
                  Sys.time(), successes, length(results), success_rate * 100))
  
  # Check failure threshold
  if ((1 - success_rate) > config$fail_threshold) {
    warning(sprintf("Failure rate (%.1f%%) exceeds threshold (%.1f%%)",
                    (1 - success_rate) * 100, config$fail_threshold * 100))
  }
  
  # Return structured result
  list(
    results = results,
    summary = list(
      total = length(results),
      successes = successes,
      failures = failures,
      success_rate = success_rate
    ),
    config = config
  )
}

# Usage example
sample_data <- 1:50
output <- run_production_pipeline(
  sample_data,
  function(x) x^2,
  config = list(batch_size = 10, session_id = "demo_pipeline")
)
#> [2026-01-23 04:10:11.645923] Starting pipeline: 50 items
#> [2%] Processing items 1-10 of 50
#> [22%] Processing items 11-20 of 50
#> [42%] Processing items 21-30 of 50
#> [62%] Processing items 31-40 of 50
#> [82%] Processing items 41-50 of 50
#> Completed 50 items
#> [2026-01-23 04:10:11.670645] Complete: 50/50 (100.0% success)
print(output$summary)
#> $total
#> [1] 50
#> 
#> $successes
#> [1] 50
#> 
#> $failures
#> [1] 0
#> 
#> $success_rate
#> [1] 1

Configuration Best Practices

Environment-Based Configuration

# config.R - Environment-specific settings

get_config <- function(env = Sys.getenv("R_ENV", "development")) {
  configs <- list(
    development = list(
      batch_size = 10,
      retry_attempts = 1,
      auto_recover = FALSE,
      session_prefix = "dev"
    ),
    staging = list(
      batch_size = 50,
      retry_attempts = 3,
      auto_recover = TRUE,
      session_prefix = "staging"
    ),
    production = list(
      batch_size = 100,
      retry_attempts = 5,
      auto_recover = TRUE,
      session_prefix = "prod"
    )
  )
  
  config <- configs[[env]]
  if (is.null(config)) {
    warning("Unknown environment, using development settings")
    config <- configs$development
  }
  
  config
}

# Usage
config <- get_config("production")
s_configure(
  batch_size = config$batch_size,
  retry_attempts = config$retry_attempts,
  auto_recover = config$auto_recover
)

Configuration Decision Tree

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Choosing the Right Configuration                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   What type of operation?                                                    │
│   │                                                                          │
│   ├── Fast (< 10ms/item)                                                    │
│   │   └── batch_size: 500-1000                                              │
│   │       retry_attempts: 1                                                 │
│   │                                                                          │
│   ├── Medium (10-100ms/item)                                                │
│   │   └── batch_size: 100 (default)                                         │
│   │       retry_attempts: 3                                                 │
│   │                                                                          │
│   ├── Slow (100ms-1s/item)                                                  │
│   │   └── batch_size: 20-50                                                 │
│   │       retry_attempts: 3-5                                               │
│   │                                                                          │
│   └── Very Slow (> 1s/item)                                                 │
│       └── batch_size: 5-20                                                  │
│           retry_attempts: 2-3                                               │
│                                                                              │
│   Network involved?                                                          │
│   │                                                                          │
│   ├── YES ──► Increase retry_attempts (3-5)                                 │
│   │           Consider exponential backoff in function                      │
│   │                                                                          │
│   └── NO ───► Lower retry_attempts (1-2)                                    │
│               Errors usually persistent                                      │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
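
As a concrete illustration of the network branch above, the sketch below configures SafeMapper for a slow, network-bound workload and adds exponential backoff inside the mapped function. Only s_configure() and its batch_size/retry_attempts arguments come from SafeMapper; fetch_record() is a hypothetical stand-in for your own API call.

# Sketch: tuning for a slow, network-bound operation
s_configure(
  batch_size = 20,      # slow operations: smaller batches, more frequent checkpoints
  retry_attempts = 5    # network errors are often transient
)

# Exponential backoff around a (hypothetical) remote call, using base R only
fetch_with_backoff <- function(id, max_tries = 4, base_wait = 0.5) {
  for (i in seq_len(max_tries)) {
    result <- tryCatch(fetch_record(id), error = function(e) e)
    if (!inherits(result, "error")) return(result)
    Sys.sleep(base_wait * 2^(i - 1))  # wait 0.5s, 1s, 2s, ...
  }
  stop("fetch_record() failed after ", max_tries, " attempts for id ", id)
}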

Error Handling Strategies

The Three-Layer Defense

# Layer 1: Function-level error handling
process_with_validation <- function(item) {
  # Input validation
  if (is.null(item) || length(item) == 0) {
    return(list(status = "skipped", reason = "empty input"))
  }
  
  # Try the operation
  tryCatch({
    result <- item^2  # Your actual operation
    list(status = "success", value = result)
  }, error = function(e) {
    list(status = "error", reason = e$message)
  })
}

# Layer 2: s_possibly for unexpected errors
safe_process <- s_possibly(process_with_validation, 
                           otherwise = list(status = "failed", reason = "unknown"))

# Layer 3: SafeMapper checkpointing
results <- s_map(
  list(1, NULL, 3, "invalid", 5),
  safe_process,
  .session_id = "three_layer_demo"
)
#> [20%] Processing items 1-5 of 5
#> Completed 5 items

# Review results
statuses <- sapply(results, function(x) x$status)
table(statuses)
#> statuses
#>   error skipped success 
#>       1       1       3

Error Recovery Workflow

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Error Recovery Workflow                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Step 1: Detect Failure                                                     │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  Pipeline fails or returns high error rate                          │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   Step 2: Analyze Errors                                                     │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  # Extract error patterns                                           │   │
│   │  errors <- results[has_errors]                                      │   │
│   │  error_types <- table(error_messages)                               │   │
│   │  # Identify: Transient? Systematic? Data issue?                     │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   Step 3: Take Action                                                        │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  Transient errors ──► Re-run (checkpoint handles this)              │   │
│   │  Data errors ────────► Fix data, re-run with new session_id         │   │
│   │  Code errors ────────► Fix code, re-run with new session_id         │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
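
A minimal sketch of Step 2, assuming the results come from an s_safely-wrapped function (so each element carries result and error components, as in the pipeline template above) and that the error component is the captured condition:

# Sketch: analyzing errors from s_safely-wrapped results
has_errors <- sapply(results, function(x) !is.null(x$error))

# Which inputs failed, and why?
failed_inputs  <- data[has_errors]
error_messages <- sapply(results[has_errors], function(x) conditionMessage(x$error))

# Group by message to distinguish systematic failures from one-off errors
table(error_messages)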

Session Management

Naming Convention

# Recommended session ID formats

# Time-based (for scheduled jobs)
session_daily <- paste0("daily_report_", format(Sys.Date(), "%Y%m%d"))

# Version-based (for algorithm changes)
session_versioned <- "model_training_v2.1"

# Task-based (for specific operations)
session_task <- "customer_data_migration_batch1"

# Combined (recommended for production)
session_production <- paste0(
  "prod_",                                    # Environment
  "data_sync_",                               # Task name
  format(Sys.time(), "%Y%m%d_%H%M%S"),       # Timestamp
  "_v1"                                       # Version
)

print(session_production)
#> [1] "prod_data_sync_20260123_041011_v1"

Cleanup Strategy

# cleanup_jobs.R - Run periodically

perform_cleanup <- function() {
  message("Starting checkpoint cleanup...")
  
  # Remove old sessions (> 7 days)
  removed <- s_clean_sessions(older_than_days = 7)
  message(sprintf("Removed %d old session files", removed))
  
  # Note: In a real scenario, you might want to:
  # - Archive before deletion
  # - Send notifications for failed sessions
  # - Log cleanup activities
  
  invisible(removed)
}

# Run cleanup
perform_cleanup()
#> Starting checkpoint cleanup...
#> No session files found
#> Removed 0 old session files

Performance Optimization

Batch Size Tuning

# Performance comparison helper
benchmark_batch_sizes <- function(data, func, sizes = c(10, 50, 100, 200)) {
  results <- list()
  
  for (size in sizes) {
    s_configure(batch_size = size)
    
    start_time <- Sys.time()
    s_map(data, func, .session_id = paste0("bench_", size))
    end_time <- Sys.time()
    
    results[[as.character(size)]] <- as.numeric(difftime(end_time, start_time, units = "secs"))
    
    # Clean up benchmark sessions
    s_clean_sessions(session_ids = paste0("bench_", size))
  }
  
  data.frame(
    batch_size = sizes,
    time_seconds = unlist(results)
  )
}

# Example (with small data for demo)
# In production, use larger datasets
test_data <- 1:100
results <- benchmark_batch_sizes(
  test_data, 
  function(x) { Sys.sleep(0.001); x^2 },
  sizes = c(10, 25, 50)
)
#> [1%] Processing items 1-10 of 100
#> [11%] Processing items 11-20 of 100
#> [21%] Processing items 21-30 of 100
#> [31%] Processing items 31-40 of 100
#> [41%] Processing items 41-50 of 100
#> [51%] Processing items 51-60 of 100
#> [61%] Processing items 61-70 of 100
#> [71%] Processing items 71-80 of 100
#> [81%] Processing items 81-90 of 100
#> [91%] Processing items 91-100 of 100
#> Completed 100 items
#> No files to remove
#> [1%] Processing items 1-25 of 100
#> [26%] Processing items 26-50 of 100
#> [51%] Processing items 51-75 of 100
#> [76%] Processing items 76-100 of 100
#> Completed 100 items
#> No files to remove
#> [1%] Processing items 1-50 of 100
#> [51%] Processing items 51-100 of 100
#> Completed 100 items
#> No files to remove
print(results)
#>    batch_size time_seconds
#> 10         10    0.1346638
#> 25         25    0.1312821
#> 50         50    0.1296210

Memory Management

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Memory Management Guidelines                              │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   1. Process Large Data in Chunks                                            │
│      ├── Don't load entire dataset at once                                  │
│      ├── Use generators or iterators where possible                         │
│      └── Clear intermediate results: rm(temp_data); gc()                    │
│                                                                              │
│   2. Mind Checkpoint Size                                                    │
│      ├── Each checkpoint stores all completed results                       │
│      ├── Large result objects = large checkpoint files                      │
│      └── Consider storing references instead of full objects               │
│                                                                              │
│   3. Parallel Processing Memory                                              │
│      ├── Each worker copies global data                                     │
│      ├── Reduce workers if memory-constrained                               │
│      └── Use .options$globals to minimize copies                            │
│                                                                              │
│   4. Monitor Memory Usage                                                    │
│      ├── Check with: pryr::mem_used() or lobstr::obj_size()                │
│      ├── Profile with: profmem package                                      │
│      └── Set limits: options(future.globals.maxSize = 1e9)                 │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
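
One way to act on point 2 above is to write each large result to disk inside the mapped function and return only the file path, so checkpoints stay small. A sketch, with the matrix standing in for whatever large object your function produces:

# Sketch: keep checkpoints small by storing file paths instead of large objects
out_dir <- file.path(tempdir(), "large_results")
dir.create(out_dir, showWarnings = FALSE)

process_and_store <- function(item) {
  big_result <- matrix(rnorm(1e4), nrow = 100)  # stand-in for a large object
  path <- file.path(out_dir, paste0("result_", item, ".rds"))
  saveRDS(big_result, path)
  path                                          # only the path enters the checkpoint
}

paths <- s_map(1:10, process_and_store, .session_id = "large_object_demo")

# Load individual results later, only when needed
first_result <- readRDS(paths[[1]])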

Monitoring and Logging

Progress Tracking

# Custom progress wrapper
process_with_logging <- function(items, func, session_id) {
  total <- length(items)
  start_time <- Sys.time()
  
  message(sprintf("[START] %s - Processing %d items", session_id, total))
  
  results <- s_map(items, function(item) {
    # Per-item hook: add custom logging or metric collection here if needed
    func(item)
  }, .session_id = session_id)
  
  end_time <- Sys.time()
  duration <- as.numeric(end_time - start_time, units = "secs")
  
  message(sprintf("[END] %s - Completed in %.2f seconds (%.2f items/sec)", 
                  session_id, duration, total / duration))
  
  results
}

# Usage
results <- process_with_logging(
  1:50,
  function(x) { Sys.sleep(0.01); x^2 },
  "logged_task"
)
#> [START] logged_task - Processing 50 items
#> [2%] Processing items 1-50 of 50
#> Completed 50 items
#> [END] logged_task - Completed in 0.61 seconds (81.59 items/sec)

Production Checklist

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Production Readiness Checklist                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Before Deployment:                                                         │
│   □ Configuration                                                            │
│     □ batch_size tuned for workload                                         │
│     □ retry_attempts appropriate for error types                            │
│     □ Environment-specific configs set up                                   │
│                                                                              │
│   □ Error Handling                                                           │
│     □ s_safely/s_possibly wrapper in place                                  │
│     □ Input validation implemented                                          │
│     □ Error aggregation and reporting                                       │
│                                                                              │
│   □ Session Management                                                       │
│     □ Meaningful session_id naming convention                               │
│     □ Cleanup job scheduled                                                 │
│     □ Checkpoint directory has sufficient space                             │
│                                                                              │
│   □ Monitoring                                                               │
│     □ Logging implemented                                                   │
│     □ Progress tracking visible                                             │
│     □ Failure alerts configured                                             │
│                                                                              │
│   □ Testing                                                                  │
│     □ Tested with representative data                                       │
│     □ Tested recovery from interruption                                     │
│     □ Tested error scenarios                                                │
│     □ Performance benchmarked                                               │
│                                                                              │
│   During Operation:                                                          │
│   □ Monitor checkpoint directory size                                       │
│   □ Review error rates regularly                                            │
│   □ Update session_id when code changes                                     │
│   □ Run cleanup jobs on schedule                                            │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
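
For the "Tested recovery from interruption" item, a simple manual test is to interrupt a run part-way through and then repeat the identical call with the same .session_id; the second run should resume from the checkpoint instead of starting over. A sketch:

# Sketch: manually testing checkpoint recovery
slow_square <- function(x) { Sys.sleep(0.2); x^2 }

# 1. Start this, then interrupt it (Esc / Ctrl+C) part-way through
results <- s_map(1:100, slow_square, .session_id = "recovery_test")

# 2. Repeat the identical call with the same .session_id;
#    completed batches should be skipped and processing resumes
results <- s_map(1:100, slow_square, .session_id = "recovery_test")

# 3. Clean up the test session afterwards
s_clean_sessions(session_ids = "recovery_test")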

Anti-Patterns to Avoid

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Common Mistakes and Solutions                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   ❌ Using same session_id for different tasks                              │
│   ✅ Use unique, descriptive session IDs                                    │
│                                                                              │
│   ❌ Very small batch_size for fast operations                              │
│   ✅ Match batch_size to operation speed                                    │
│                                                                              │
│   ❌ Not handling function errors (letting them propagate)                  │
│   ✅ Use s_safely/s_possibly for graceful error handling                    │
│                                                                              │
│   ❌ Never cleaning up old checkpoints                                      │
│   ✅ Schedule regular cleanup with s_clean_sessions()                       │
│                                                                              │
│   ❌ Ignoring checkpoint after code changes                                 │
│   ✅ Update session_id when algorithm changes                               │
│                                                                              │
│   ❌ Using parallel for simple/fast operations                              │
│   ✅ Reserve s_future_map for CPU-intensive tasks                           │
│                                                                              │
│   ❌ Storing very large objects in results                                  │
│   ✅ Store references or summaries, write large objects to disk             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Summary

# Quick reference for production use

# 1. Configure appropriately
s_configure(
  batch_size = 100,     # Tune based on operation speed
  retry_attempts = 3    # Higher for network operations
)

# 2. Use descriptive session IDs
session_id <- paste0("prod_task_", format(Sys.time(), "%Y%m%d"))

# 3. Wrap functions for safety (using s_safely for error capture)
my_function <- function(x) x^2
safe_fn <- s_safely(my_function)

# 4. Process with fault tolerance
data <- 1:20
results <- s_map(data, safe_fn, .session_id = session_id)
#> [5%] Processing items 1-20 of 20
#> Completed 20 items

# 5. Handle results properly (s_safely returns list with result/error)
successes <- sum(sapply(results, function(x) is.null(x$error)))
cat("Success rate:", successes / length(results) * 100, "%\n")
#> Success rate: 100 %

# 6. Clean up periodically
s_clean_sessions(older_than_days = 7)
#> No session files found

Additional Resources