Result and Error Handling

library(jobqueue)
q <- Queue$new()

Reacting to Results

my_job <- q$run({ 42 })

# ===  A 'jobqueue'-style callback  ============
my_job$on('done', ~message('Result = ', .$result))
#> Result = 42

# ===  A 'promises'-style callback  ============
my_job %...>% message('Result = ', .)
#> Result = 42

See also vignette('hooks').

Output vs Result

When expr is finished evaluating, the result is assigned to <Job>$output.

job <- q$run({ 42 })

job$output
#> [1] 42

By default, <Job>$result will return the exact same value as <Job>$output.

job$result
#> [1] 42

However, while <Job>$output is a fixed value, <Job>$result is highly configurable, most notably through the reformat and signal parameters.

Reformatting Output

Use reformat = function (job) {...} to define what should be returned by <Job>$result.

Important

You may access job$output in your reformat function.
DO NOT access job$result, which would start an infinite recursion.

reformat can be set in several places:

Transformation

http_wrap <- function (job) {
  result <- job$output
  paste0('{"status":200,"body":{"result":', result, '}}')
}

job <- q$run({ 42 }, reformat = http_wrap)
cat(job$result)
#> {"status":200,"body":{"result":42}}

Job Details

with_id <- function (job) {
  list(result = job$output, id = job$id)
}

job <- q$run({ 42 }, reformat = with_id, id = 'abc')
dput(job$result)
#> list(result = 42, id = "abc")
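
Because a reformat function only reads fields from the job it receives, it can be tried out without a running Queue. The sketch below is an illustration under assumptions, not part of jobqueue: fake_job is a hypothetical plain list standing in for a real Job, and it carries only the output and id fields used in the examples above.

```r
# Reformat functions copied from the examples above.
http_wrap <- function (job) {
  result <- job$output
  paste0('{"status":200,"body":{"result":', result, '}}')
}

with_id <- function (job) {
  list(result = job$output, id = job$id)
}

# Hypothetical stand-in for a Job: a plain list, NOT a real jobqueue object.
fake_job <- list(output = 42, id = 'abc')

wrapped  <- http_wrap(fake_job)
detailed <- with_id(fake_job)

cat(wrapped)
#> {"status":200,"body":{"result":42}}
```

A real Job may behave differently from a list (for example, job$output can block), so this only checks the formatting logic.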

Non-blocking

Accessing <Job>$output blocks until the Job is done. If you want <Job>$result to return immediately instead, have reformat return a placeholder value while the Job is still running.

reformat <- function (job) {
  # Only read job$output once the Job is done; before that it would block.
  if (job$is_done) { job$output                 }
  else             { 'result not available yet' }
}

job <- q$run({ Sys.sleep(5); 42 }, reformat = reformat)
job$result
#> [1] "result not available yet"

job$state
#> [1] "running"

Signaling Errors

By default, errors and interrupts will be caught by jobqueue and returned as a condition object from <Job>$result.

expr <- quote(stop('error XYZ'))

job <- q$run(expr)
x   <- job$result

inherits(x, 'condition')
#> [1] TRUE

x
#> Error in `q$run(expr)`:
#> ! Error evaluating <Job>$expr on background process
#> Caused by error in `stop("error XYZ")`:
#> ! error XYZ
#> ---
#> Subprocess backtrace:
#> 1. base::eval(expr = expr, envir = vars, enclos = .GlobalEnv)
#> 2. base::stop("error XYZ")
#> 3. | base::.handleSimpleError(function (e) …
#> 4. global h(simpleError(msg, call))
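
Since the condition comes back as an ordinary value, calling code has to check for it before using the result. The following is a minimal sketch in base R only: tryCatch() produces the condition here, standing in for <Job>$result, and unwrap_or() is a hypothetical helper, not a jobqueue function.

```r
# Hypothetical helper: return `x` unless it is a condition object,
# in which case report it and return `default` instead.
unwrap_or <- function (x, default = NULL) {
  if (inherits(x, 'condition')) {
    message('Job did not finish cleanly: ', conditionMessage(x))
    return(default)
  }
  x
}

# Stand-in for a failed job's result: a caught error condition.
failed <- tryCatch(stop('error XYZ'), error = function (e) e)

ok_value     <- unwrap_or(42)                     # passes the value through
failed_value <- unwrap_or(failed, default = NA)   # reports, then returns NA
```

With a real Job, the equivalent call would presumably be unwrap_or(job$result).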

If you want those errors to continue propagating, set signal = TRUE.

job <- q$run(expr, signal = TRUE)
x   <- job$result
#> Error in "q$run(expr, signal = TRUE)" : 
#>    ! Error evaluating <Job>$expr on background process
#>  Caused by error in `stop("error XYZ")`:
#>  ! error XYZ

These signals can be caught as usual by try() and tryCatch().

print_cnd <- function (cnd) cli::cli_text('Caught {.type {cnd}}.')

job <- q$run({ Sys.sleep(5) }, signal = TRUE)$stop()
res <- tryCatch(job$result, error = print_cnd, interrupt = print_cnd)
#> Caught an <interrupt> object.

res
#> NULL

job <- q$run({ stop('error XYZ') }, signal = TRUE)
res <- tryCatch(job$result, error = print_cnd, interrupt = print_cnd)
#> Caught an <error> object.

res
#> NULL

You can also signal some conditions while catching others. Perhaps stopping a job is part of the “normal” workflow and shouldn’t be lumped in with other errors.

job <- q$run({ Sys.sleep(5) }, signal = 'error')$stop()
res <- tryCatch(job$result, error = print_cnd)

res
#> <interrupt: job stopped by user>

job <- q$run({ stop('error XYZ') }, signal = 'error')
res <- tryCatch(job$result, error = print_cnd)
#> Caught an <error> object.

res
#> NULL
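
The two outcomes above can be told apart in one place. The sketch below uses base R only and rests on stated assumptions: fetch() is a hypothetical stand-in for reading <Job>$result under signal = 'error' (errors are thrown, interrupts are returned as values), and classify() is an illustrative helper, not part of jobqueue.

```r
# Hypothetical stand-in for `job$result` with signal = 'error'.
fetch <- function (outcome) {
  switch(
    outcome,
    ok      = 42,
    stopped = structure(
      list(message = 'job stopped by user', call = NULL),
      class = c('interrupt', 'condition')),
    failed  = stop('error XYZ'))
}

# Map each outcome to a label: thrown errors are caught by tryCatch(),
# returned interrupts are detected with inherits().
classify <- function (outcome) {
  res <- tryCatch(fetch(outcome), error = function (e) e)
  if      (inherits(res, 'error'))     { 'failed'  }
  else if (inherits(res, 'interrupt')) { 'stopped' }
  else                                 { 'ok'      }
}

labels <- vapply(c('ok', 'stopped', 'failed'), classify, character(1))
```

Here a stopped job is treated as a normal outcome, while genuine errors still go through the error handler.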

Promise Handling

You can pass a Job directly to any function that expects a ‘promise’ object, such as promises::then or promises::%...>%. These functions are re-exported by jobqueue, so calling library(promises) is optional.

q$run({ 42 }) %...>% message
#> 42

By default, errors and interrupts are caught by jobqueue and passed to the promise's fulfillment handler as ordinary values.

onFulfilled <- function (value) cli::cli_text('Fulfilled with {.type {value}}.')
onRejected  <- function (err)   cli::cli_text('Rejected with {.type {err}}.')

job <- q$run({ Sys.sleep(5) })$stop()
job %>% then(onFulfilled, onRejected)
#> Fulfilled with an <interrupt> object.

job <- q$run({ stop('error XYZ') })
job %>% then(onFulfilled, onRejected)
#> Fulfilled with an <error> object.

Setting signal = TRUE will cause errors and interrupts to be sent to the promise’s rejection handler instead.

job <- q$run({ Sys.sleep(5) }, signal = TRUE)$stop()
job %>% then(onFulfilled, onRejected)
#> Rejected with an <interrupt> object.

job <- q$run({ stop('error XYZ') }, signal = TRUE)
job %>% then(onFulfilled, onRejected)
#> Rejected with an <error> object.
