`map_elements` completely breaks after an error
Hello,
I found a rather interesting bug: map_elements stops working completely once it has encountered an error at any earlier point in the session. This example code works as expected:
library(polars)
#> Warning: package 'polars' was built under R version 4.3.3
pl$DataFrame(mtcars[, 1:4])$select(
pl$all()$map_elements( \(x) x + 1 )
)$head()
#> shape: (5, 4)
#> ┌──────┬─────┬───────┬───────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp │
#> │ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╡
#> │ 22.0 ┆ 7.0 ┆ 161.0 ┆ 111.0 │
#> │ 22.0 ┆ 7.0 ┆ 161.0 ┆ 111.0 │
#> │ 23.8 ┆ 5.0 ┆ 109.0 ┆ 94.0 │
#> │ 22.4 ┆ 7.0 ┆ 259.0 ┆ 111.0 │
#> │ 19.7 ┆ 9.0 ┆ 361.0 ┆ 176.0 │
#> └──────┴─────┴───────┴───────┘
However, if an error was raised inside map_elements in an earlier evaluation, the identical code stops working.
library(polars)
#> Warning: package 'polars' was built under R version 4.3.3
pl$DataFrame(mtcars[, 1:4])$select(
pl$all()$map_elements( \(x) stop() ) # Random error that might happen
)
#> Error: Execution halted with the following contexts
#> 0: In R: in $select()
#> 0: During function call [base::tryCatch(base::withCallingHandlers({
#> NULL
#> base::saveRDS(base::do.call(base::do.call, base::c(base::readRDS("C:\\Users\\lauerm\\AppData\\Local\\Temp\\RtmpILSP8N\\callr-fun-3e036195c72"),
#> base::list(envir = .GlobalEnv, quote = TRUE)), envir = .GlobalEnv,
#> quote = TRUE), file = "C:\\Users\\lauerm\\AppData\\Local\\Temp\\RtmpILSP8N\\callr-res-3e03be9166f",
#> compress = FALSE)
#> base::flush(base::stdout())
#> base::flush(base::stderr())
#> NULL
#> base::invisible()
#> }, error = function(e) {
#> {
#> callr_data <- base::as.environment("tools:callr")$`__callr_data__`
#> err <- callr_data$err
#> if (FALSE) {
#> base::assign(".Traceback", base::.traceback(4), envir = callr_data)
#> utils::dump.frames("__callr_dump__")
#> base::assign(".Last.dump", .GlobalEnv$`__callr_dump__`,
#> envir = callr_data)
#> base::rm("__callr_dump__", envir = .GlobalEnv)
#> }
#> e <- err$process_call(e)
#> e2 <- err$new_error("error in callr subprocess")
#> class <- base::class
#> class(e2) <- base::c("callr_remote_error", class(e2))
#> e2 <- err$add_trace_back(e2)
#> cut <- base::which(e2$trace$scope == "global")[1]
#> if (!base::is.na(cut)) {
#> e2$trace <- e2$trace[-(1:cut), ]
#> }
#> base::saveRDS(base::list("error", e2, e), file = base::paste0("C:\\Users\\lauerm\\AppData\\Local\\Temp\\RtmpILSP8N\\callr-res-3e03be9166f",
#> ".error"))
#> }
#> }, interrupt = function(e) {
#> {
#> callr_data <- base::as.environment("tools:callr")$`__callr_data__`
#> err <- callr_data$err
#> if (FALSE) {
#> base::assign(".Traceback", base::.traceback(4), envir = callr_data)
#> utils::dump.frames("__callr_dump__")
#> base::assign(".Last.dump", .GlobalEnv$`__callr_dump__`,
#> envir = callr_data)
#> base::rm("__callr_dump__", envir = .GlobalEnv)
#> }
#> e <- err$process_call(e)
#> e2 <- err$new_error("error in callr subprocess")
#> class <- base::class
#> class(e2) <- base::c("callr_remote_error", class(e2))
#> e2 <- err$add_trace_back(e2)
#> cut <- base::which(e2$trace$scope == "global")[1]
#> if (!base::is.na(cut)) {
#> e2$trace <- e2$trace[-(1:cut), ]
#> }
#> base::saveRDS(base::list("error", e2, e), file = base::paste0("C:\\Users\\lauerm\\AppData\\Local\\Temp\\RtmpILSP8N\\callr-res-3e03be9166f",
#> ".error"))
#> }
#> }, callr_message = function(e) {
#> base::try(base::signalCondition(e))
#> }), error = function(e) {
#> NULL
#> if (FALSE) {
#> base::try(base::stop(e))
#> }
#> else {
#> base::invisible()
#> }
#> }, interrupt = function(e) {
#> NULL
#> if (FALSE) {
#> e
#> }
#> else {
#> base::invisible()
#> }
#> })]
#> 1: user function raised an error: EvalError(lang!(function (s) { s$map_elements(f, return_type, strict_return_type, allow_fail_eval)}, ExternalPtr.set_class(["RPolarsSeries"]))
pl$DataFrame(mtcars[, 1:4])$select(
pl$all()$map_elements( \(x) x + 1 )
)$head()
#> Error in .pr$DataFrame$select(self, unpack_list(..., .context = "in $select()")): user function panicked: select
The first error is expected. However, the same code that ran fine before now fails as well. The only fix is to restart the R session. I am using the latest polars version.
packageVersion("polars")
#> [1] '0.19.1'
Any guess why that might be? Thanks!
Hi, thanks for the report. I can reproduce it, but $map_elements() is tricky, so I don't know how to fix it yet.
Based on what I observed while implementing map_batches in the next branch, this may be caused by the following:
r-polars/src/rust/src/utils/extendr_concurrent.rs
Lines 219 to 224 in 0122e29
The thread will not terminate successfully and will not be able to reconnect thereafter.
It may be possible to fix this by having it terminate when an error occurs, as in the next branch:
r-polars/src/rust/src/r_threads.rs
Lines 159 to 163 in d5994dc
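To illustrate the idea of terminating on error, here is a minimal std-only Rust sketch. It is not the code at those lines; `user_fn`, `serve` and `run_query` are hypothetical stand-ins, and a negative input plays the role of `stop()`. The serving loop returns on the first failing user function and drops its end of the channel, so no thread is left waiting on a half-alive link, and the next query starts from a fresh channel.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for the R user function: fails on negative input,
// the way `\(x) stop()` fails in the report above.
fn user_fn(x: f64) -> Result<f64, String> {
    if x < 0.0 {
        Err(format!("user function raised an error on {x}"))
    } else {
        Ok(x + 1.0)
    }
}

// Runs on the single "R thread" and evaluates the jobs sent by worker threads.
// On the first error it returns at once, so `rx` is dropped and the link is
// torn down instead of lingering half-alive.
fn serve(rx: mpsc::Receiver<(usize, f64)>, n: usize) -> Result<Vec<f64>, String> {
    let mut out = vec![0.0; n];
    for (i, x) in rx {
        out[i] = user_fn(x)?;
    }
    Ok(out)
}

// One query: workers send (index, value) jobs; the current thread serves them.
fn run_query(inputs: &[f64]) -> Result<Vec<f64>, String> {
    let (tx, rx) = mpsc::channel();
    let workers: Vec<_> = inputs
        .iter()
        .copied()
        .enumerate()
        .map(|(i, x)| {
            let tx = tx.clone();
            // A failed send only means the serving side has already terminated.
            thread::spawn(move || {
                let _ = tx.send((i, x));
            })
        })
        .collect();
    drop(tx); // after this, only the workers hold senders
    let result = serve(rx, inputs.len());
    for w in workers {
        w.join().expect("worker thread panicked");
    }
    result
}
```

With this shape, `run_query(&[1.0, -1.0])` returns an error, and a following `run_query(&[21.0])` still returns `Ok(vec![22.0])`.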
After a long time away from polars (small kids and a new job), I was about to give a short introduction to r-polars next week when I noticed this bug too. This is obviously very annoying :) I don't think it was always like this, but I cannot prove it.
I can try to take a look at it within the next month.
I see my use of initcell is not quite what the Rust crate author intended. If one ThreadCom (the link between rust-polars threads and the single R session) crashed, it is replaced with kill_global + update_global. My wild guess is that the threads in the polars thread pool still link to the crashed global ThreadCom, because init_cell does not support safely mutating global state. It probably worked in the past, but since this is undefined behavior it could fail at any point.
A cell which can be unsafely initialized or interiorly mutated, but safely accessed.
This is mostly intended for use in statics. The cell is safe to access, but must be initialized before any access. There is no synchronization to ensure initialization is observed, so you should initialize at the beginning of the main function or using something like the ctor crate.
The purpose of this global state is to let new threads spawned by rust-polars, which know nothing about R, look up and clone the currently active, working ThreadCom.
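A minimal sketch of such a global register in std-only Rust, assuming a ThreadCom can be modelled as a cloneable mpsc `Sender`. The names `update_global` and `kill_global` echo the ones mentioned above, but their real signatures are not shown here, so these versions (and `clone_global`, `demo`) are hypothetical.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::Mutex;
use std::thread;

// Hypothetical stand-in for ThreadCom: a handle for sending requests to
// the single R thread. Here it is just an mpsc sender of strings.
type ThreadCom = Sender<String>;

// Global register: threads spawned by polars know nothing about R, so they
// look here to obtain the currently active link.
static GLOBAL_COM: Mutex<Option<ThreadCom>> = Mutex::new(None);

fn update_global(com: ThreadCom) {
    *GLOBAL_COM.lock().unwrap() = Some(com);
}

fn kill_global() {
    *GLOBAL_COM.lock().unwrap() = None;
}

// Called from an "oblivious" worker thread: clone the active link, if any.
fn clone_global() -> Option<ThreadCom> {
    GLOBAL_COM.lock().unwrap().clone()
}

fn demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    update_global(tx);
    let handles: Vec<_> = (0..3)
        .map(|i| {
            thread::spawn(move || {
                let com = clone_global().expect("no active ThreadCom");
                com.send(format!("request from worker {i}")).unwrap();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    kill_global(); // drop the last registered sender so `rx` can finish
    let mut msgs: Vec<String> = rx.iter().collect();
    msgs.sort();
    msgs
}
```

Worker threads never receive the link as an argument; they only call `clone_global()`, which is why a stale entry left in the register would affect every later query.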
chatty_gippity says try once_cell::sync::Lazy instead:
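use once_cell::sync::Lazy;
use std::marker::PhantomData;
use std::sync::RwLock;
// Placeholder for ThreadCom<S, R>, which is really defined elsewhere in the crate
pub struct ThreadCom<S, R> {
    _marker: PhantomData<(S, R)>, // stands in for the real fields
}
// A static cannot be generic, so concrete types must be chosen for S and R.
// `Request` and `Response` are hypothetical placeholders for them.
pub type Request = String;
pub type Response = String;
// Define the global state: empty until a ThreadCom is registered
static GLOBAL_STATE: Lazy<RwLock<Option<ThreadCom<Request, Response>>>> =
    Lazy::new(|| RwLock::new(None));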
maybe something completely different ¯\_(ツ)_/¯
This is obviously very annoying :) I don't think it was always like this, but I cannot prove it.
It must have been this way for a long time, because it reproduces even in v0.9.0, the oldest version that can easily be installed today. (There, apply has to be used instead of map_elements.)
This bug was not caused by init_cell vs. once_cell; swapping to once_cell changed nothing. However, that change might be worth adopting in a separate PR for tidiness' sake.
It turns out to be a plain bug in how user errors were handled and how polars state was reset.
If a user map_ function raises an R error, the R interpreter returns directly and does not gracefully shut down the polars query, which includes closing ("killing") the ThreadCom object (the object that lets multiple polars threads share the single R interpreter).
Because there is no graceful shutdown, this ThreadCom survives in the global register (once_cell/init_cell), but it is defunct in the next polars query, hence the bug. If I force the global register to be reset at every polars query, the bug goes away (solution 1).
However, I vaguely remember that this is a problem when a polars query is called within the user function of another polars query. In that case the inner polars query should not reset the global ThreadCom, as that would sever communication for other map_ functions that may still be running.
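The failure mode and solution 1 can be reproduced in a small std-only Rust model. Everything here is hypothetical: the register holds an mpsc `Sender`, the query owns the matching `Receiver`, and an early return via `?` stands in for R bailing out of the user function. The early return skips the cleanup, so the register keeps a sender whose receiver is gone, and the next query that recycles it fails. Passing `reset_first = true` always installs a fresh link. The model ignores nested queries, which is precisely the case where, as noted above, an unconditional reset would cut off the outer query.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::Mutex;

// Hypothetical global register holding the link to the R side.
static REGISTER: Mutex<Option<Sender<f64>>> = Mutex::new(None);

// Stand-in for an R user function that may call `stop()`.
fn user_fn(x: f64) -> Result<f64, String> {
    if x < 0.0 {
        Err("user function raised an error".to_string())
    } else {
        Ok(x + 1.0)
    }
}

// One query. `reset_first = true` models "solution 1": always install a
// fresh link instead of recycling whatever is in the register.
fn run_query(inputs: &[f64], reset_first: bool) -> Result<Vec<f64>, String> {
    let mut own_rx = None;
    {
        let mut slot = REGISTER.lock().unwrap();
        if reset_first || slot.is_none() {
            let (tx, rx) = mpsc::channel();
            *slot = Some(tx);
            own_rx = Some(rx);
        }
    }
    let com = REGISTER.lock().unwrap().clone().expect("register is set");
    let mut out = Vec::new();
    for &x in inputs {
        // Fails if the receiving side of a recycled link is already gone.
        com.send(x)
            .map_err(|_| "user function panicked: defunct ThreadCom".to_string())?;
        let rx = own_rx.as_ref().ok_or("no receiving side for this link")?;
        let v = rx.recv().map_err(|e| e.to_string())?;
        // An error here returns early and skips the cleanup below, leaving
        // a sender whose receiver is about to be dropped in the register.
        out.push(user_fn(v)?);
    }
    *REGISTER.lock().unwrap() = None; // graceful shutdown ("kill_global")
    Ok(out)
}
```

Calling `run_query(&[-1.0], false)` once makes the following `run_query(&[1.0], false)` fail, while `run_query(&[1.0], true)` succeeds again and leaves the register clean.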
My candidate (solution 2) is to implicitly wrap every R user function in a tryCatch to ensure polars shuts down gracefully. This might cost roughly 1-5 ms per R user function call.
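Solution 2 is about R's `tryCatch`, but the underlying idea (turn an abrupt exit from the user function into an ordinary value so the caller can still shut down cleanly) can be sketched in Rust with `std::panic::catch_unwind`. This is only an analogy: a Rust panic stands in for an R error, `LINK_OPEN` is a hypothetical flag for "a ThreadCom is registered", and `catch_unwind` only helps when panics unwind (not under `panic = "abort"`).

```rust
use std::panic;
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical flag standing in for "a ThreadCom is registered".
static LINK_OPEN: AtomicBool = AtomicBool::new(false);

// Stand-in for an R user function that may abort abruptly (like `stop()`);
// the abrupt exit is modelled here as a Rust panic.
fn user_fn(x: f64) -> f64 {
    if x < 0.0 {
        panic!("user function raised an error");
    }
    x + 1.0
}

// Solution-2-style wrapper: convert the abrupt exit into an ordinary value
// (as tryCatch would in R) so control always returns to the caller.
fn call_wrapped(x: f64) -> Result<f64, String> {
    panic::catch_unwind(move || user_fn(x))
        .map_err(|_| "user function raised an error".to_string())
}

fn run_query(inputs: &[f64]) -> Result<Vec<f64>, String> {
    LINK_OPEN.store(true, Ordering::SeqCst); // open the link for this query
    // `collect` stops at the first Err, but control still comes back here.
    let result: Result<Vec<f64>, String> = inputs.iter().map(|&x| call_wrapped(x)).collect();
    LINK_OPEN.store(false, Ordering::SeqCst); // graceful shutdown on both paths
    result
}
```

The cost of the wrapper is paid on every call, which mirrors the per-call overhead concern above.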
Solution 3a: whenever a new polars query recycles a ThreadCom from the global register, it could check once that the ThreadCom works by running a simple function. That might take about 1 ms, and only once. If the check fails, the ThreadCom is reset.
Solution 3b: a 'Rust-only' verification of the ThreadCom would be even faster, but then I might have to rewrite some function signatures to allow a non-R request via ThreadCom. Maybe not worth the hassle.
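A std-only sketch of the recycle-and-verify idea behind 3a and 3b, assuming a ThreadCom can be modelled as an mpsc `Sender` to a serving thread. Before reusing the registered link, `acquire` does one ping round trip with a timeout and installs a fresh link if the ping fails. In r-polars the check would be a trivial R function (3a) or a Rust-only request (3b); `spawn_server`, `is_alive`, `acquire` and the 100 ms timeout are all hypothetical.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

// Hypothetical ThreadCom stand-in: each request carries a reply channel.
type Reply = Sender<&'static str>;
type ThreadCom = Sender<Reply>;

static REGISTER: Mutex<Option<ThreadCom>> = Mutex::new(None);

// Start a fresh "R side" that answers every ping with "pong".
fn spawn_server() -> ThreadCom {
    let (tx, rx) = mpsc::channel::<Reply>();
    thread::spawn(move || {
        for reply in rx {
            let _ = reply.send("pong");
        }
    });
    tx
}

// Cheap check: one round trip with a timeout.
fn is_alive(com: &ThreadCom) -> bool {
    let (reply_tx, reply_rx) = mpsc::channel();
    com.send(reply_tx).is_ok()
        && reply_rx.recv_timeout(Duration::from_millis(100)) == Ok("pong")
}

// Recycle the registered ThreadCom only if it passes the check; otherwise reset.
fn acquire() -> ThreadCom {
    let mut slot = REGISTER.lock().unwrap();
    if let Some(com) = slot.as_ref() {
        if is_alive(com) {
            return com.clone();
        }
    }
    let com = spawn_server();
    *slot = Some(com.clone());
    com
}
```

A sender whose receiving side has been dropped fails the ping immediately, because `send` returns an error, so a defunct link costs a single failed send before it is replaced.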
I will look into 3b -> 3a -> 2 or so
A cell which can be unsafely initialized or interiorly mutated, but safe
Yikes, I made a big mistake: I had found the documentation of the wrong crate, one with a very similar use case and naming.
These are the right docs, and our current usage does not seem to be discouraged:
https://docs.rs/state/0.6.0/state/struct.InitCell.html
I should probably revert from once_cell::sync::Lazy back to InitCell. The two behave very similarly and are drop-in replacements for each other.
#1295