mirai = future in Japanese. Async evaluation framework for R built on NNG/nanonext.
Architecture: daemons dial into host, a topology which facilitates dynamic scaling.
This is a cheatsheet. Refer to the mirai reference manual for a detailed introduction.
- mirai() returns immediately; access the result via m[] or m$data
- daemons() sets persistent background processes
- local_url(tcp = TRUE) + tunnel = TRUE when ports are blocked
- cluster_config() with the appropriate scheduler
- .compute parameter
- mirai_map(): parallel map with progress bars, early stopping, flatmap

library(mirai)
# Create a mirai (returns immediately)
m <- mirai(
{
Sys.sleep(1)
rnorm(5, mean)
},
mean = 10
)
# Direct access (non-blocking)
unresolved(m) # Check if resolved (TRUE if still running)
m$data # Returns value (NA if unresolved)
# Access result (blocks until ready)
m[] # Wait and return value
collect_mirai(m) # Wait and return value
call_mirai(m) # Wait and return mirai object
# Via ... (assigned to daemon global env)
m <- mirai(func(x), func = my_func, x = data)
# Via .args (local to evaluation env)
m <- mirai(func(x), .args = list(func = my_func, x = data))
# Pass entire environment
write_async <- function(x, file) {
mirai(write.csv(x, file), .args = environment())
}
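The two passing styles above can be compared side by side. This is a minimal sketch, assuming no daemons are set, so each `mirai()` call launches its own ephemeral process. The helper `scale_by` and the values of `x` and `y` are hypothetical and serve only as illustration.

```r
library(mirai)

# Hypothetical helper, used only for illustration
scale_by <- function(v, k) v * k

# Via ...: objects are assigned to the daemon's global environment
m1 <- mirai(scale_by(x, y), scale_by = scale_by, x = 1:3, y = 2)

# Via .args: objects are local to the evaluation environment
m2 <- mirai(scale_by(x, y), .args = list(scale_by = scale_by, x = 1:3, y = 2))

res1 <- m1[]  # blocks until resolved
res2 <- m2[]
```

For a simple expression like this both styles give the same result; the difference matters mainly when other functions on the daemon need to find the objects in the global environment.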
# Set 4 local daemons (with dispatcher - default)
daemons(4)
# Without dispatcher (round-robin distribution)
daemons(4, dispatcher = FALSE)
# Reset daemons
daemons(0)
# Check connection / statistics
info()
daemons(
n = 4,
dispatcher = TRUE, # Use dispatcher for optimal FIFO scheduling
cleanup = TRUE, # Clean env between tasks
output = FALSE, # TRUE outputs daemon stdout/stderr, FALSE discards it
maxtasks = Inf, # Task limit per daemon
idletime = Inf, # Max idle time (ms) before exit
walltime = Inf # Time limit (ms) before exit
)
daemons(sync = TRUE) # Run in current process
m <- mirai(Sys.getpid())
daemons(0)
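One way to see what synchronous mode does is to compare process IDs. This is a sketch under the assumptions that the installed mirai version supports the `sync` argument and that no daemons are already set in the session; `same_process` is an illustrative name.

```r
library(mirai)

daemons(sync = TRUE)          # evaluate mirai in the current process
m <- mirai(Sys.getpid())      # PID of the process that evaluates the expression
same_process <- identical(m[], Sys.getpid())
daemons(0)                    # reset
```

If the mirai is evaluated in the current process, `same_process` should be `TRUE`, which is what makes this mode convenient for testing and debugging.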
# Listen at host URL with TLS
daemons(
url = host_url(tls = TRUE),
remote = ssh_config(c("ssh://10.75.32.90", "ssh://node2:22"))
)
# Or without automatic launching
daemons(url = host_url(tls = TRUE))
launch_remote(2, remote = ssh_config("ssh://10.75.32.90"))
host_url() # Auto-detect IP, tcp://x.x.x.x:0
host_url(tls = TRUE) # TLS connection
host_url(tls = TRUE, port = 5555) # Specific port
local_url() # IPC (Unix sockets/named pipes)
local_url(tcp = TRUE) # tcp://127.0.0.1:0
local_url(tcp = TRUE, port = 5555) # tcp://127.0.0.1:5555
ssh_config(
remotes = c("ssh://node1:22", "ssh://node2:22"),
tunnel = FALSE, # Direct connection
timeout = 10, # Connection timeout (seconds)
command = "ssh", # SSH executable
rscript = "Rscript" # R executable on remote
)
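Requirements for SSH Direct: SSH key-based authentication configured beforehand, and the host port open to inbound connections from the remote machines.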
# Host uses localhost URL
daemons(
n = 4,
url = local_url(tcp = TRUE), # tcp://127.0.0.1:0
remote = ssh_config("ssh://10.75.32.90", tunnel = TRUE)
)
# Or with specific port
daemons(
n = 2,
url = local_url(tcp = TRUE, port = 5555), # tcp://127.0.0.1:5555
remote = ssh_config("ssh://remote-server", tunnel = TRUE)
)
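How Tunnelling Works: the host listens on 127.0.0.1 only, and the tunnel is created once the initial SSH connection to the remote machine is made. The daemon dials the same port on its own localhost, so no host port needs to be open to inbound connections.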
daemons(
n = 4,
url = host_url(),
remote = cluster_config(
command = "sbatch", # Scheduler command: "sbatch", "qsub", "bsub", etc.
options = "#SBATCH --job-name=mirai
#SBATCH --mem=16G
#SBATCH --cpus-per-task=1
#SBATCH --output=mirai_%j.out
#SBATCH --error=mirai_%j.err
module load R/4.5.0",
rscript = file.path(R.home("bin"), "Rscript")
)
)
| Scheduler | Command | Job Name | Memory | CPUs |
|---|---|---|---|---|
| Slurm | sbatch | #SBATCH --job-name=NAME | --mem=16G | --cpus-per-task=1 |
| SGE | qsub | #$ -N NAME | -l mem_free=16G | -pe smp 1 |
| Torque/PBS | qsub | #PBS -N NAME | -l mem=16gb | -l nodes=1:ppn=1 |
| LSF | bsub | #BSUB -J NAME | -M 16000 | -n 1 |
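The same pattern as the Slurm example can be adapted to the other schedulers in the table. The sketch below builds an SGE configuration, assuming the `#$` directive prefix and a parallel environment named `smp`; both the job name `mirai` and the environment name are site-specific assumptions.

```r
library(mirai)

# Build the scheduler directives line by line, then join them with newlines
sge_options <- paste(
  "#$ -N mirai",          # job name
  "#$ -l mem_free=16G",   # memory request
  "#$ -pe smp 1",         # CPUs (parallel environment name varies by site)
  sep = "\n"
)

# Creating the configuration does not submit anything to the scheduler
cfg <- cluster_config(command = "qsub", options = sge_options)
```

The resulting `cfg` can then be supplied as `remote = cfg` to `daemons()` or `launch_remote()`.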
# Set daemons to listen
daemons(url = host_url(tls = TRUE))
# Get launch commands (doesn't execute)
cmds <- launch_remote(
n = 2,
remote = remote_config() # Empty config returns commands
)
# Copy/paste commands to run on remote machines
# E.g. Rscript -e "mirai::daemon('tcp://10.75.32.70:5555')"
print(cmds)
# Create CPU profile
daemons(4, .compute = "cpu")
# Create GPU profile
daemons(2, .compute = "gpu")
# Direct tasks to specific profile
m_cpu <- mirai(heavy_compute(), .compute = "cpu")
m_gpu <- mirai(gpu_task(), .compute = "gpu")
# Reset specific profile
daemons(0, .compute = "cpu")
# Temporarily use profile
with_daemons("gpu", {
model <- mirai(train_model())
})
# Set profile for scope
local_daemons("cpu")
m <- mirai(task()) # Uses "cpu" profile
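To check that profiles really are separate pools of daemons, the process IDs returned from each can be compared. This is a minimal sketch with one daemon per profile; the profile names follow the ones used above and `pid_cpu` / `pid_gpu` are illustrative.

```r
library(mirai)

# One daemon per profile keeps the example small
daemons(1, .compute = "cpu")
daemons(1, .compute = "gpu")

# Each task runs on a daemon belonging to the named profile
pid_cpu <- mirai(Sys.getpid(), .compute = "cpu")[]
pid_gpu <- mirai(Sys.getpid(), .compute = "gpu")[]

# Reset each profile independently
daemons(0, .compute = "cpu")
daemons(0, .compute = "gpu")
```

The two PIDs differ because each profile has its own daemon process.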
with(daemons(4), {
m1 <- mirai(task1())
m2 <- mirai(task2())
c(m1[], m2[])
})
# Daemons auto-reset on exit
daemons(url = host_url())
launch_local(2) # 2 local daemons
launch_remote(4, ssh_config("ssh://remote")) # 4 remote
daemons(url = host_url()) # Start listening
launch_local(2) # Add 2 daemons
# Later...
# Add 2 more (automatically exit after idle for 60 secs)
launch_local(2, idletime = 60000)
daemons(4)
# Simple map
results <- mirai_map(1:10, sqrt)[]
# With additional arguments
results <- mirai_map(
1:10,
rnorm,
.args = list(mean = 5, sd = 2)
)[]
# With helper functions
results <- mirai_map(
1:100,
function(x) transform(x, helper),
helper = my_helper_func
)[]
# Flatten to vector
results <- mirai_map(1:10, rnorm, .args = list(n = 1))[.flat]
# Progress bar
results <- mirai_map(1:100, slow_func)[.progress]
# Early stopping on error
results <- mirai_map(data_list, process)[.stop]
# Combine options
results <- mirai_map(1:100, task)[.stop, .progress]
# Map over dataframe rows
df <- data.frame(x = 1:10, y = 11:20)
mirai_map(df, function(x, y) x + y)[.flat]
# Map over matrix rows
mat <- matrix(1:6, nrow = 3, dimnames = list(c("a","b","c"), c("x","y")))
mirai_map(mat, function(x, y) x * y)[]
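The mapping calls above use placeholder functions, so here is a small self-contained sketch that can be run as is. It assumes two local daemons are sufficient; `squares` and `row_sums` are illustrative names.

```r
library(mirai)

daemons(2)

# Map over a vector and flatten the results
squares <- mirai_map(1:3, function(x) x^2)[.flat]   # values 1, 4, 9

# Map over dataframe rows: column names become argument names
df <- data.frame(x = 1:3, y = 4:6)
row_sums <- mirai_map(df, function(x, y) x + y)[.flat]   # values 5, 7, 9

daemons(0)
```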
m <- mirai(stop("error"))
m[]
# Test error types
is_mirai_error(m$data) # Execution error
is_mirai_interrupt(m$data) # User interrupt
is_error_value(m$data) # Any error (catch-all)
# Access error details
m$data$stack.trace # Full stack trace
m$data$condition.class # Original error classes
m$data$message # Error message
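Because errors come back as values rather than being thrown, a common pattern is to branch on the test functions after collecting. A minimal sketch, assuming no daemons are set (an ephemeral process is used); the `outcome` labels are illustrative.

```r
library(mirai)

m <- mirai(stop("boom"))
res <- m[]   # the error is returned as a value, not raised in this session

outcome <- if (is_mirai_error(res)) {
  "failed in evaluation"
} else if (is_error_value(res)) {
  "failed for another reason (e.g. timeout or interrupt)"
} else {
  "ok"
}
```

Testing `is_mirai_error()` first separates execution errors from the broader catch-all `is_error_value()`.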
status() # Detailed status
info() # Concise statistics
daemons_set() # Check if daemons exist
require_daemons() # Error if not set
# Per-mirai timeout (requires dispatcher for auto-cancellation)
m <- mirai(Sys.sleep(10), .timeout = 1000) # 1 second
m[] # Returns errorValue 5 (timed out)
# Cancel mirai (requires dispatcher)
m <- mirai(Sys.sleep(100))
stop_mirai(m) # Attempts cancellation
m$data # errorValue 20 (canceled)
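A timed-out mirai can be told apart from an execution error using the same test functions. This sketch assumes no daemons are set, so the timeout only stops the wait on the host side and the ephemeral process is not cancelled; `timed_out` is an illustrative name.

```r
library(mirai)

# The task takes about 2 seconds, but we only allow 200 ms
m <- mirai({
  Sys.sleep(2)
  "done"
}, .timeout = 200)

res <- m[]

# A timeout is an error value, but not an execution error
timed_out <- is_error_value(res) && !is_mirai_error(res)
```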
# Load package on all daemons
everywhere(library(data.table))
# Assign a global variable on all daemons
everywhere(config <<- list(threads = 4))
# Export variables to all daemons
everywhere({}, db_conn = my_conn, api_key = key)
# Statistically-sound but non-reproducible (default)
daemons(4, seed = NULL)
# Reproducible RNG (seed per mirai call)
daemons(4, seed = 123)
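Reproducibility with a fixed seed can be checked by resetting daemons and repeating the same call. This is a sketch, assuming the installed mirai version supports an integer `seed`; `draw_once` is a hypothetical helper written for this illustration.

```r
library(mirai)

# Hypothetical helper: set seeded daemons, draw once, then reset
draw_once <- function() {
  daemons(1, seed = 123)
  on.exit(daemons(0))
  mirai(rnorm(3))[]
}

a <- draw_once()
b <- draw_once()
reproducible <- identical(a, b)
```

With the default `seed = NULL`, the same comparison would generally be expected to return `FALSE`.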
# For torch tensors, Arrow tables, Polars objects
daemons(
4,
serial = serial_config(
"torch_tensor",
sfunc = torch::torch_serialize,
ufunc = torch::torch_load
)
)
# Global registration
register_serial("torch_tensor", torch::torch_serialize, torch::torch_load)
daemons(4) # Auto-applies registered configs
# Auto TLS (zero-config certificates)
daemons(url = host_url(tls = TRUE))
# Custom certificate
daemons(
url = host_url(tls = TRUE),
tls = "/path/to/cert.pem",
pass = function() askpass::askpass()
)
| Feature | With Dispatcher (default) | Direct (dispatcher=FALSE) |
|---|---|---|
| Scheduling | Optimal FIFO | Round-robin |
| Timeouts | ✓ | No auto-cancellation |
| Cancellation | ✓ | ✗ |
| Custom serialization | ✓ | ✗ |
| Overhead | Slightly higher | Minimal |
| Use case | Variable task times | Similar task times |
┌─ Need async in R?
│
├─ Single task → mirai()
│ └─ No daemons set? → ephemeral (auto-creates process)
│
├─ Map operation → mirai_map()
│ └─ Requires daemons() to be set first
│
└─ Multiple tasks → Set up daemons
│
├─ Local only
│ └─ daemons(n)
│
├─ Remote with open ports
│ └─ daemons(url = host_url(), remote = ssh_config(..., tunnel = FALSE))
│
├─ Remote with firewall/blocked ports
│ └─ daemons(url = local_url(tcp = TRUE), remote = ssh_config(..., tunnel = TRUE))
│
└─ HPC cluster (Slurm/SGE/PBS/LSF)
└─ daemons(url = host_url(), remote = cluster_config(...))
# Expression Evaluation
mirai(pkg::func(x), x = data)
# Namespace functions OR library() inside expression
mirai(func(x), func = my_func, x = data)
# Pass dependencies explicitly via ... or .args
# Dispatcher Required For
stop_mirai(m) # Cancellation
mirai(task(), .timeout = 1000) # Timeout cancellation
daemons(4, serial = serial_config(...)) # Custom serialization
# SSH Tunnelling
daemons(url = local_url(tcp = TRUE), remote = ssh_config(..., tunnel = TRUE))
# Must use 127.0.0.1 (not external IP) + tunnel = TRUE
# TLS
host_url(tls = TRUE) # Auto TLS (zero-config, just works)
# Custom certs: provide cert path + optional passphrase function
# Remote Prerequisites
# - SSH key-based auth configured beforehand
# - SSH direct: host port open to inbound connections
# - HPC: correct module load commands and scheduler directives