The hardware and bandwidth for this mirror is donated by dogado GmbH, the Webhosting and Full Service-Cloud Provider. Check out our Wordpress Tutorial.
If you wish to report a bug, or if you are interested in having us mirror your free-software or open-source project, please feel free to contact us at mirror[@]dogado.de.

mirai - Plumber Integration

Plumber Integration

mirai may be used as an asynchronous / distributed backend for plumber pipelines.

Example usage is provided below for different types of endpoint.

Example GET Endpoint

The plumber router code is run in a daemon process itself so that it does not block the interactive process.

The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and simply returns the ‘msg’ request header together with a timestamp and the process ID of the process it is run on.

library(mirai)

# important to supply SIGINT so the plumber server is interrupted and exits
# cleanly when torn down
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # does not use dispatcher (suitable when all requests require similar compute)
  daemons(4L, dispatcher = FALSE) # handles 4 requests simultaneously

  pr() |>
    pr_get(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L)
            list(status = 200L, body = list(time = format(Sys.time()),
                                            msg = msg,
                                            pid = Sys.getpid()))
          },
          msg = req[["HEADERS"]][["msg"]]
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8985)
})

The API can be queried using an async HTTP client such as nanonext::ncurl_aio().

Here, all 8 requests are submitted at once, but we note that that responses have differing timestamps as only 4 can be processed at any one time (limited by the number of daemons set).

library(nanonext)
res <- lapply(1:8,
              function(i) ncurl_aio("http://127.0.0.1:8985/echo",
                                    headers = c(msg = as.character(i))))
res <- lapply(res, call_aio)
for (r in res) print(r$data)
#> [1] "{\"time\":[\"2024-04-23 17:15:11\"],\"msg\":[\"1\"],\"pid\":[36065]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:11\"],\"msg\":[\"2\"],\"pid\":[36062]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:11\"],\"msg\":[\"3\"],\"pid\":[36068]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:11\"],\"msg\":[\"4\"],\"pid\":[36060]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:12\"],\"msg\":[\"5\"],\"pid\":[36065]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:12\"],\"msg\":[\"6\"],\"pid\":[36062]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:12\"],\"msg\":[\"7\"],\"pid\":[36068]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:12\"],\"msg\":[\"8\"],\"pid\":[36060]}"

daemons(0)
#> [1] 0

Example POST Endpoint

Below is a demonstration of the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.

Note that req$postBody should always be accessed in the router process and passed in as an argument to the ‘mirai’, as this is retrieved using a connection that is not serializable.

library(mirai)

# important to supply SIGINT so the plumber server is interrupted and exits cleanly when torn down
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # uses dispatcher (suitable for requests with differing compute lengths)
  daemons(4L, dispatcher = TRUE) # handles 4 requests simultaneously

  pr() |>
    pr_post(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L) # simulate expensive computation
            list(status = 200L,
                 body = list(time = format(Sys.time()),
                             msg = jsonlite::fromJSON(data)[["msg"]],
                             pid = Sys.getpid()))
          },
          data = req$postBody
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8986)
})

Querying the endpoint produces the same set of outputs as the previous example.

library(nanonext)
res <- lapply(1:8,
              function(i) ncurl_aio("http://127.0.0.1:8986/echo",
                                    method = "POST",
                                    data = sprintf('{"msg":"%d"}', i)))
res <- lapply(res, call_aio)
for (r in res) print(r$data)
#> [1] "{\"time\":[\"2024-04-23 17:15:15\"],\"msg\":[\"1\"],\"pid\":[36338]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:15\"],\"msg\":[\"2\"],\"pid\":[36343]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:15\"],\"msg\":[\"3\"],\"pid\":[36340]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:15\"],\"msg\":[\"4\"],\"pid\":[36346]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:16\"],\"msg\":[\"5\"],\"pid\":[36346]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:16\"],\"msg\":[\"6\"],\"pid\":[36338]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:16\"],\"msg\":[\"7\"],\"pid\":[36343]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:16\"],\"msg\":[\"8\"],\"pid\":[36340]}"

daemons(0)
#> [1] 0

These binaries (installable software) and packages are in development.
They may not be fully stable and should be used with caution. We make no claims about them.
Health stats visible at Monitor.