
Tools for Inter-Process Communication

Ian Fellows

2023-01-25

Introduction

ipc provides methods for inter-process communication useful when developing Shiny applications that use asynchronous processing.

Quick Start

Perhaps the most important use of this package is to communicate with a parent process from a child process executed using the future package. This is very easy to do. Simply create and start a Queue in the parent process. The child process can then send messages and evaluate R code on the main process.

library(ipc)
library(future)
plan(multisession)
q <- queue()

value <- ""

f <- future({
  # set value in the main process
  q$producer$fireEval({
    value <- "Hello world"
  })
})

Sys.sleep(.5)
# Evaluate signals
cons <- q$consumer$consume()
print(value)
## [1] "Hello world"
# Remove temporary files
q$destroy()

A Queue object has a producer field that is used to send signals onto the Queue, and a consumer field that is used to read from the Queue and process any signals written to it.

Messages can be consumed by calling the consumer’s consume method and will be handled by default using the environment that consume is called in. The consumer’s start method will execute consume at regular intervals provided the R process is idle.

By default, a Consumer object knows how to handle two signals. The "eval" handler will execute the signal’s data as an expression in the evaluation environment (i.e. where consume is called). The "doCall" handler will call the function defined by the first element of the data with parameters equal to the second element. Functions to handle various signals can be added to the consumer using the addHandler method.
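
As a minimal sketch of adding a handler, the code below registers a function for a made-up signal named "log" and then fires that signal twice. It assumes that handlers take the form function(signal, obj, env) and that the producer's fire method accepts a signal name and a payload; the signal name and the received variable are hypothetical, chosen only for illustration. The producer and consumer are used in the same process here just to keep the example short.

```r
library(ipc)

q <- queue()

# Collects the payload of every "log" signal (hypothetical signal name)
received <- character(0)
q$consumer$addHandler(function(signal, obj, env) {
  # Assumed handler form: signal name, payload, evaluation environment
  received <<- c(received, obj)
}, "log")

# Send two "log" signals onto the queue
q$producer$fire("log", "starting")
q$producer$fire("log", "finished")

# Process the pending signals; each one is passed to the handler above
q$consumer$consume()
print(received)

# Remove temporary files
q$destroy()
```

A handler registered this way runs alongside the default "eval" and "doCall" handlers, so the same queue can carry both kinds of traffic.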

Producer objects have a built-in method to signal for evaluation. fireEval(expr, env) will signal for expr to be evaluated, substituting in any values from env. For example, the following code will set the variable val to 2 in the environment in which it is consumed.

variable <- 2
q$producer$fireEval(val <- j, env=list(j=variable))
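
The substitution step can be pictured with base R's substitute function. This is only a sketch of the idea, not the package's internal code: the expression is captured unevaluated with j replaced by its value, and evaluating it later performs the assignment.

```r
variable <- 2

# Capture the expression with j replaced by the value of variable
expr <- substitute(val <- j, list(j = variable))
print(expr)
## val <- 2

# Evaluating the captured expression performs the assignment
eval(expr)
print(val)
## [1] 2
```

Because the value is baked into the expression before it is sent, the consuming process does not need access to variable itself.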

Signaling a child process

Signals can also be sent from the main process to a child. Here we will cause the child process to throw an error.

library(future)
library(promises)
plan(multisession)

q <- queue()

fut <- future({
  for(i in 1:1000){
    Sys.sleep(.1)
    q$consumer$consume()
 }
})

q$producer$fireEval(stop("Stop that child"))
cat(try(value(fut)))
## Error in eval(obj, envir = env) : Stop that child
## Error in eval(obj, envir = env) : Stop that child
q$destroy()

If errors occur during consumption, all messages are still processed, and then the first error encountered is thrown. Alternatively, errors can be switched to warnings using q$consumer$consume(throwErrors=FALSE).
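
A minimal sketch of the warning behavior, assuming the producer and consumer are used from a single process for brevity. The first signal fails and the second assigns a value; the status variable is hypothetical and only serves to show that the later message was still processed.

```r
library(ipc)

q <- queue()
status <- "not set"

# The first signal raises an error, the second performs an assignment
q$producer$fireEval(stop("first problem"))
q$producer$fireEval(status <- "second message still ran")

# Errors are reported as warnings instead of being thrown
res <- q$consumer$consume(throwErrors = FALSE)
print(status)

# Remove temporary files
q$destroy()
```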

Continuous consumption

A consumer’s start method can be used to execute consume at regular intervals (provided the R process is idle).

library(future)
library(promises)
plan(multisession)

q <- queue()

fut <- future({
  for(i in 1:100){
    Sys.sleep(.1)
    q$producer$fireEval(print(index), list(index=i))
 }
})

q$consumer$start()

# ... Later, stop consumption and clean up
# q$destroy()

Sources

By default, ipc communication is backed by text files (using the TextFileSource class, which wraps the txtq package). The files’ location is, again by default, generated by the tempfile function. These global defaults can be overridden using the tempFileGenerator and defaultSource functions.
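
As a rough sketch of overriding the file location, the code below points tempFileGenerator at a different directory. It assumes that tempFileGenerator, called with no arguments, returns the current generator function, and that called with a function it installs that function as the new generator. The directory name is hypothetical; it is placed under tempdir() only so that the example can run anywhere.

```r
library(ipc)

# Hypothetical directory standing in for a location of your choosing
queue_dir <- file.path(tempdir(), "ipc-queues")
dir.create(queue_dir, showWarnings = FALSE)

# Remember the current generator so it can be restored afterwards
old_generator <- tempFileGenerator()

# Generate queue files inside queue_dir instead of the default location
tempFileGenerator(function(...) tempfile(..., tmpdir = queue_dir))

q <- queue()  # backed by a file created under queue_dir
q$destroy()

# Restore the previous default
tempFileGenerator(old_generator)
```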

For communication between processes on the same machine, the defaults will generally suffice. If processes are running on multiple machines, two strategies for sources may be used. First, if all machines have access to a single file system, override the tempFileGenerator to generate files in the shared file system. Alternatively, ipc provides the RedisSource class to back queues using a Redis database.

q <- queue(RedisSource$new())

Use in Shiny

A major use case for this package is to support Shiny applications. You can view three example applications using the shinyExample function. Taking advantage of inter-process communication allows for more dynamic applications that are more responsive to the user.

In Shiny apps, it is recommended that queues be created with the shinyQueue function. This will ensure that the queue is properly destroyed on session end.

Changing a reactive value from a future

Reactive values cannot be changed directly from within a future. Queues make it easy to signal the main process to assign a reactive value from within the body of a future.

The application below creates a future every time countdown is clicked, which assigns a value to the reactive value every second, counting down from 10 to 0. If you click the button multiple times, each future will compete to set the value, and the numbers will jump around.

library(shiny)
library(ipc)
library(future)
plan(multisession)

ui <- fluidPage(

  titlePanel("Countdown"),

  sidebarLayout(
    sidebarPanel(
      actionButton('run', 'count down')
    ),

    mainPanel(
      tableOutput("result")
    )
  )
)

server <- function(input, output) {

  queue <- shinyQueue()
  queue$consumer$start(100) # Execute signals every 100 milliseconds

  # A reactive value to hold output
  result_val <- reactiveVal()
  
  # Handle button click
  observeEvent(input$run,{
    future({
      for(i in 10:0){
        Sys.sleep(1)
        result <- data.frame(count=i)
        # change value
        queue$producer$fireAssignReactive("result_val",result)
      }
    })

    #Return something other than the future so we don't block the UI
    NULL
  })

  # set output to reactive value
  output$result <- renderTable({
    req(result_val())
  })
}

# Run the application
shinyApp(ui = ui, server = server)

Adding a progress bar to an async operation

AsyncProgress is a drop-in replacement for Shiny’s Progress class that allows you to update progress bars from within a future. The example below is a minimal demonstration. Note how you can click Run multiple times and get multiple progress bars.

library(shiny)
library(ipc)
library(future)
library(promises)
plan(multisession)

ui <- fluidPage(

  titlePanel("Countdown"),

  sidebarLayout(
    sidebarPanel(
      actionButton('run', 'Run')
    ),

    mainPanel(
      tableOutput("result")
    )
  )
)

server <- function(input, output) {
  
  # A reactive value to hold output
  result_val <- reactiveVal()
  
  # Handle button click
  observeEvent(input$run,{
    result_val(NULL)
    
    # Create a progress bar
    progress <- AsyncProgress$new(message="Complex analysis")
    future({
      for(i in 1:10){
        Sys.sleep(1)
        progress$inc(1/10) # Increment progress bar
      }
      progress$close() # Close the progress bar
      data.frame(result="Insightful result")
    }) %...>% result_val  # Assign result of future to result_val

    # Return something other than the future so we don't block the UI
    NULL
  })

  # Set output to reactive value
  output$result <- renderTable({
    req(result_val())
  })
}

# Run the application
shinyApp(ui = ui, server = server)

Killing a long running process

When creating a UI around a process that takes a significant amount of time, it is critical to provide the user with a mechanism to cancel the operation. The recommended mechanism is to send a kill message. AsyncInterruptor is an easy-to-use wrapper around a Queue object for this common use case.

library(future)
library(promises)
plan(multisession)

# A long running function. Having a callback (progressMonitor) is a way to
# allow for interrupts to be checked for without adding a dependency
# to the analysis function.
accessableAnalysisFunction <- function(progressMonitor=function(i) cat(".")){
  for(i in 1:1000){
    Sys.sleep(.1)
    progressMonitor(i)
  }
  data.frame(result="Insightful analysis")
}

inter <- AsyncInterruptor$new()
fut <- future({
  accessableAnalysisFunction(progressMonitor = function(i) inter$execInterrupts())
})
inter$interrupt("Stop that future")
cat(try(value(fut)))
## Error : Stop that future
## Error : Stop that future
inter$destroy()

There are times when the Shiny developer does not have access to the long running code in such a way that execInterrupts can be called as the computation progresses. In these cases, the only way to terminate a running future is to kill it at the OS level.

The function stopMulticoreFuture kills a future, provided it is executed in a multicore plan. On macOS and Linux machines, plan(multicore) results in a multicore execution plan. On Windows it is not possible to use a multicore execution plan.

The behavior of the execution plan after a kill signal has been sent is technically undefined, but currently there are no large unintended consequences of killing child processes. This may change in the future, however, which is why AsyncInterruptor is strongly preferred if at all possible.

library(shiny)
library(ipc)
library(future)
library(promises)
plan(multicore)    # This will only work with multicore, which is unavailable on Windows

inaccessableAnalysisFunction <- function(){
  Sys.sleep(10)
  data.frame(result="Insightful analysis")
}

# Define UI with Run and Cancel buttons and a table for the result
ui <- fluidPage(

  # Application title
  titlePanel("Cancelable Async Task"),

  # Sidebar with the Run and Cancel buttons
  sidebarLayout(
    sidebarPanel(
      actionButton('run', 'Run'),
      actionButton('cancel', 'Cancel')
    ),

    # Show the result table
    mainPanel(
      tableOutput("result")
    )
  )
)

server <- function(input, output) {

  fut <- NULL

  result_val <- reactiveVal()
  running <- reactiveVal(FALSE)
  observeEvent(input$run,{

    #Don't do anything if in the middle of a run
    if(running())
      return(NULL)
    running(TRUE)

    print("Starting Run")
    result_val(NULL)
    fut <<- future({
      result <- inaccessableAnalysisFunction()
    })
    prom <- fut %...>% result_val
    prom <- catch(prom,
                 function(e){
                   result_val(NULL)
                   print(e$message)
                   showNotification("Task Stopped")
                 })
    prom <- finally(prom, function(){
      print("Done")
      running(FALSE) #declare done with run
    })

    #Return something other than the future so we don't block the UI
    NULL
  })


  # Kill future
  observeEvent(input$cancel,{
    #
    # Use this method of stopping only if you don't have access to the
    # internals of the long running process. If you are able, it is
    # recommended to use AsyncInterruptor instead.
    #
    stopMulticoreFuture(fut)
  })


  output$result <- renderTable({
    req(result_val())
  })
}

# Run the application
shinyApp(ui = ui, server = server)

Use in HPC Environments

Shiny apps are not the only place where it is desirable to monitor computationally complex tasks. Packages like ergm and rstan use parallel computing when performing MCMC simulations. ipc can be used to report back intermediate results and progress to the user. Here is a function that does parallel (fake) MCMC, and provides both a progress bar and running trace plots for the chains.

library(parallel)


mcmcTask <- function(){
  on.exit(q$destroy())
  nchains <- min(4, detectCores() - 1)
  q <- queue()
  prog <- 0
  pb <- txtProgressBar(min = 0, max = nchains * 10, style=3)
  res <- list()
  chains <- list()
  for(i in 1:nchains){
    # Run each chain in parallel
    res[[i]] <- mcparallel({
      chain <- c()
      for(j in 1:10){
        chain <- c(chain, rnorm(100))
        
        # Send the current chain to the main process
        q$producer$fireEval(chains[[i]] <- chain, list(i=i,chain=chain))
        # Update progress
        q$producer$fireEval(prog <- prog + 1)
        
        Sys.sleep(runif(1)*5)
      }
      chain
    })
  }
  
  # Monitor progress
  while(prog < nchains * 10){
    q$consumer$consume()
    setTxtProgressBar(pb, prog)
    par(mfrow=c(2,ceiling(nchains / 2)))
    for(j in seq_along(chains)) 
      if(length(chains[[j]]) > 1) 
        plot(chains[[j]], main=paste("Trace Plot for MCMC chain", j))
    Sys.sleep(2)
  }
  close(pb)
  mccollect(res)
}

chains <- mcmcTask()
