wlandau/crew

Integration with ExtendedTask

Closed this issue · 3 comments

@jcheng5, it was great to talk with you today. I went ahead and made quick improvements to crew based on the end of the call:

  • Clarified the intent of controller$promise(mode = "one"):
    To create a promise specific to the task above, call `as.promise()` on the returned task object.
    ```r
    promise <- as.promise(task)
    ```
    To create a promise that resolves when *any* task in the controller completes, use the `promise()` method of the controller. The following promise prints the output value asynchronously if the task succeeds.
    ```r
    promise <- controller$promise(mode = "one") %...>%
    mutate(result = as.character(result)) %...>%
    print()
    ```
  • Added explicit error handling in the example app:

    crew/vignettes/shiny.Rmd

    Lines 215 to 228 in c572f22

    observe({
    req(reactive_poll())
    invalidateLater(millis = 100)
    output <- controller$pop()
    if (anyNA(output$error)) { # Task succeeded.
    reactive_result(output$result[[1]])
    reactive_status(status_message(n = length(controller$tasks)))
    reactive_poll(controller$nonempty())
    } else if (!is.null(output)) { # Task threw an error.
    reactive_status(paste("Task error:", output$error))
    reactive_poll(FALSE)
    }
    })
    }

The app is much more complicated than I think it needs to be, and ExtendedTask may help a lot. Thanks for the pointer to https://shiny.posit.co/py/docs/nonblocking.html#true-non-blocking-behavior-with-extendedtask. I will have a look.

Wow, extended tasks work so well and are super easy to use! It only took a quick minute on a Friday afternoon to dramatically simplify the example app. See below for a rendition that uses an extended task. I will migrate this to crew's documentation when the next production version of Shiny is released.

library(crew)
library(shiny)
library(ggplot2)
library(aRtsy)

run_task <- function() {
  Sys.sleep(5)
  canvas_phyllotaxis(
    colors = colorPalette(name = "random", n = 3),
    iterations = 1000,
    angle = runif(n = 1, min = - 2 * pi, max = 2 * pi),
    size = 1,
    p = 1
  )
}

status_message <- function(n) {
  if (n > 0) {
    paste(format(Sys.time()), "tasks in progress:", n)
  } else {
    "All tasks completed."
  }
}

ui <- fluidPage(
  actionButton("task", "Submit a task (5 seconds)"),
  textOutput("status"),
  plotOutput("result")
)

server <- function(input, output, session) {
  # reactive values and outputs
  reactive_status <- reactiveVal("No task submitted yet.")
  output$status <- renderText(reactive_status())
  output$result <- renderPlot(task$result()$result[[1L]], width = 500)
  
  # crew controller
  controller <- crew_controller_local(workers = 4, seconds_idle = 10)
  controller$start()
  onStop(function() controller$terminate())
  
  # extended task to get completed results from the controller
  task <- ExtendedTask$new(function() controller$promise(mode = "one"))
  
  # button to submit a crew task
  observeEvent(input$task, {
    controller$push(
      command = run_task(),
      data = list(run_task = run_task),
      packages = "aRtsy"
    )
    task$invoke()
    reactive_status(status_message(n = length(controller$tasks)))
  })
  
  # Refresh the status every second and when a task starts or completes.
  observe({
    invalidateLater(millis = 1000)
    reactive_status(status_message(n = length(controller$tasks)))
  })
}

shinyApp(ui = ui, server = server)

On second thought, updating the docs now would help prepare for conferences. I will update the example app and publish a new release of crew today.

I have actually noticed that reactive expressions with extended tasks intermittently stop invalidating under heavy load: shikokuchuo/mirai#118. From crew's perspective, I think it makes sense to emphasize the polling version of the coin flip app and label the promise-based functionality as experimental.