shikokuchuo/mirai

Load-testing with TLS

wlandau opened this issue · 28 comments

In wlandau/crew@686f86b, I implemented an interface in crew to leverage the new TLS capabilities in nanonext and mirai. It's really simple on my end, which is great. But I do notice some new problems in load-testing. When I tried the test below (also at https://github.com/wlandau/crew/blob/main/tests/mirai/test-tls-max_tasks.R) using mirai 0.9.0.9013 and nanonext 0.9.0.29, the task loop hung and eventually daemons() timed out. In addition, daemons(0L) did not terminate the dispatcher or servers.

library(crew)
library(mirai)

# Implements throttling to avoid overburdening the {mirai} dispatcher.
throttler <- crew::crew_schedule()

# Efficient and convenient data structure to keep track of {mirai} tasks.
schedule <- crew::crew_schedule()
schedule$start()

# Start the {mirai} client.
n <- 20L
mirai::daemons(
  n = n,
  url = "wss://127.0.0.1:5000",
  dispatcher = TRUE,
  token = TRUE
)

# Mutable structure with {crew} worker info. This is the primary
# data structure of each {crew} launcher.
workers <- new.env(parent = emptyenv()) # For mutability.
workers$workers <- tibble::tibble(
  handle = replicate(n, new.env(), simplify = FALSE), # callr::r_bg() handles
  socket = environment(mirai::daemons)$..$default$urls, # starting URLs
  launches = rep(0L, n), # number of times a worker was launched at this index
  launched = rep(FALSE, n), # FALSE if the worker is definitely done.
  assigned = rep(0L, n), # Cumulative "assigned" stat to check backlog (#79).
  complete = rep(0L, n) # Cumulative "complete" stat to check backlog (#79).
)

# For {mirai} servers with online == 0L and instance == 1L,
# rotate the websocket URL. Also set workers$launched to FALSE,
# which signals that tally() can safely update the cumulative
# "assigned" and "complete" statistics (#79).
rotate <- function(workers) {
  info <- mirai::daemons()$daemons
  done <- which(info[, "online"] < 1L & info[, "instance"] > 0L)
  for (index in done) {
    socket <- mirai::saisei(i = index, force = FALSE)
    if (!is.null(socket)) {
      workers$workers$socket[index] <- socket # Next launch is at this URL.
      workers$workers$launched[index] <- FALSE # Lets tally() update stats.
    }
  }
}

# For workers that are definitely done and not going to dial in until the
# next launch, update the cumulative "assigned" and "complete" which {crew}
# uses to detect backlogged workers (#79). A backlogged worker is a {mirai}
# server with more assigned than complete tasks. Detecting the backlog
# is important because if a worker is disconnected and backlogged,
# then {crew} will need to relaunch it so the backlogged tasks can run.
tally <- function(workers) {
  info <- mirai::daemons()$daemons
  index <- !(workers$workers$launched) # Workers safe to update.
  workers$workers$assigned[index] <- as.integer(info[index, "assigned"])
  workers$workers$complete[index] <- as.integer(info[index, "complete"])
  invisible()
}

# In {crew}, the scale() method of the launcher class
# re-launches all backlogged non-launched workers,
# and then it may launch additional non-launched workers
# in order to meet the demand of the task load.
# The scale() function below is a simplified version which launches
# all non-launched workers.
scale <- function(workers) {
  for (index in which(!workers$workers$launched)) { # non-launched workers
    # I would have used mirai::launch_server() here, but callr::r_bg()
    # allows me to manually terminate the server without calling
    # mirai::daemons(n = 0L). This is important for updating the final
    # assigned and complete tallies later on.
    workers$workers$handle[[index]] <- callr::r_bg(
      func = function(url, tls) {
        mirai::server(
          url = url,
          tls = tls,
          maxtasks = 100L
        )
      },
      args = list(
        url = workers$workers$socket[index],
        tls = environment(mirai::daemons)$..$default$tls$client
      )
    )
    # Increment the launch count.
    workers$workers$launches[index] <- workers$workers$launches[index] + 1L
    # Signal to tally() to wait for this worker to complete
    # instead of updating the cumulative assigned and complete stats.
    workers$workers$launched[index] <- TRUE
  }
}

index <- 0L # current task
n_tasks <- 6000L # all tasks
results <- list()
while (index < n_tasks || schedule$nonempty()) { # while there is work to do
  if (!throttler$throttle()) { # avoid overburdening the {mirai} dispatcher
    rotate(workers) # Rotate the URLs of done workers.
    tally(workers) # Update the cumulative stats for done workers.
    scale(workers) # Re-launch all the done workers.
  }
  # If there are still tasks to launch, launch one.
  if (index < n_tasks) {
    index <- index + 1L
    cat("push", index, "\n")
    task <- mirai(index, index = index)
    # The "schedule" is nothing fancy for the purposes of #88 and #89,
    # it is just a fast data structure for bookkeeping {mirai} objects
    # without the other frills in {crew}.
    schedule$push(task)
  }
  # Try to process the results of finished tasks.
  if (schedule$nonempty()) { # If there are still tasks to process...
    # Call nanonext::.unresolved() and move resolved tasks
    # from the hash table in schedule$pushed to the first-in/first-out
    # linked list in schedule$collected.
    schedule$collect()
    task <- schedule$pop() # Return a task that was resolved and collected.
    # pop() returns NULL if there is no resolved/collected task.
    if (!is.null(task)) {
      data <- task$data
      results[[data]] <- data
      cat("pop", data, "\n")
    }
  }
}

# Terminate the dispatcher.
daemons(n = 0L)

# Manually terminate any remaining workers.
for (handle in workers$workers$handle) {
  if (inherits(handle, "r_process") && handle$is_alive()) {
    handle$kill()
  }
}

# Check the results.
all(sort(as.integer(unlist(results))) == seq_len(n_tasks))

The simple persistent-worker load test below (also at https://github.com/wlandau/crew/blob/main/tests/mirai/test-tls-persistent.R) behaves a bit better. The tasks seem to complete, though they do take much longer to collect (probably because of encryption, right?). But neither daemons(0L) nor quitting the R session terminates the dispatcher or servers.

library(crew)
library(mirai)

# Efficient and convenient data structure to keep track of {mirai} tasks.
# It has a hash table for new tasks and a first-in/first-out linked list
# for resolved tasks. It calls nanonext::.unresolved() to collect resolved
# tasks, but otherwise it does not rely on {mirai}/{nanonext}. I highly doubt
# it is the source of the {crew} bugs in #88 or #89.
schedule <- crew::crew_schedule()
schedule$start()

# Start the {mirai} client and servers with TLS.
daemons(n = 20L, url = "wss://127.0.0.1:0", dispatcher = TRUE, token = TRUE)
codes <- lapply(environment(daemons)$..$default$urls, launch_server)

# Run the tasks.
index <- 0L # current task
n_tasks <- 6000L # all tasks
results <- list()
while (index < n_tasks || schedule$nonempty()) { # while there is work to do
  # If there are still tasks to launch, launch one.
  if (index < n_tasks) {
    index <- index + 1L
    cat("push", index, "\n")
    task <- mirai(index, index = index)
    # The "schedule" is nothing fancy for the purposes of #88 and #89,
    # it is just a fast data structure for bookkeeping {mirai} objects
    # without the other frills in {crew}.
    schedule$push(task)
  }
  # Try to process the results of finished tasks.
  if (schedule$nonempty()) { # If there are still tasks to process...
    # Call nanonext::.unresolved() and move resolved tasks
    # from the hash table in schedule$pushed to the first-in/first-out
    # linked list in schedule$collected.
    schedule$collect()
    task <- schedule$pop() # Return a task that was resolved and collected.
    # pop() returns NULL if there is no resolved/collected task.
    if (!is.null(task)) {
      data <- task$data
      results[[data]] <- data
      cat("pop", data, "\n")
    }
  }
}

# Should be TRUE
all(sort(unlist(results)) == seq_len(n_tasks))

# Clean up the dispatcher.
daemons(n = 0L)

I tried again with mirai 0.9.0.9014 and nanonext 0.9.0.9031, and in the second (simpler) test, I saw the same hanging I originally found in the first test. daemons(0) and q() still do not terminate the dispatcher or servers. Results are consistent on my local Ubuntu desktop and my M2 MacBook.

I haven't had the chance to look into this in much depth yet, but I think the issue here is launching all the servers simultaneously.

It seems that TLS can't handle this asynchronously. I have found that a sleep of 0.1s between launches works. Until I've found a better solution or the root cause, I think we have to treat TLS a bit differently.

We also shouldn't expect the same performance in general.

Can you try making launches synchronous, with an interval of at least 0.1s between them? Then it should hopefully all behave correctly, even if this makes it a bit less performant out of the box.

If this is the issue then I can take a deeper look at how the servers dial in (it is a bit different to the normal case).
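
A minimal sketch of that workaround, reusing the workers structure and scale() logic from the first test script above (the only change is the Sys.sleep() between launches; the 0.1s default is the interval suggested here):

# Hypothetical synchronous variant of scale(): launch the servers one after
# another with a short pause instead of all at once.
scale_synchronous <- function(workers, interval = 0.1) {
  for (index in which(!workers$workers$launched)) {
    workers$workers$handle[[index]] <- callr::r_bg(
      func = function(url, tls) {
        mirai::server(url = url, tls = tls, maxtasks = 100L)
      },
      args = list(
        url = workers$workers$socket[index],
        tls = environment(mirai::daemons)$..$default$tls$client
      )
    )
    workers$workers$launches[index] <- workers$workers$launches[index] + 1L
    workers$workers$launched[index] <- TRUE
    Sys.sleep(interval) # let each TLS handshake settle before the next dial
  }
}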

The 0.1s delay seems to have solved all the problems I mentioned! And now it seems TLS only barely impacts the performance of tasks.

I wonder if a similar issue could explain wlandau/crew#76. Those tests only have 1 worker, but what if the client is calling daemons() at the precise moment a server tries to dial in?

mirai 0.9.0.9015 performs a bit better in this respect. I have replicated the tlsConfig object, one for each listener, so there is no contention on the same object. Unfortunately it only seems to solve the issue for the tls+tcp:// transport.

From testing, the wss:// transport is still affected, but less so - for me a 0.05s interval now seems to be ok. It must be something about how websockets sharing the same port still lead to some kind of contention + deadlock, whereas different ports now work fine.

Just to note that in mirai 0.9.0.9016 I have made launch_server() safe with a 100ms sleep, just for wss, like you have implemented in crew. This means that tests using launch_server() do not need additional delays.

Currently it is safe to do for all transports:

lapply(cpi("urls"), launch_server)

I tried https://github.com/wlandau/crew/blob/d82b7805a4fcc608e4774e6c921131fd5c932eba/tests/mirai/test-tls-max_tasks.R again on my M2 MacBook, and with mirai 0.9.0.9016 and nanonext 0.9.0.9032, I still see hanging tasks, 'errorValue' int 5 | Timed out from daemons(), and no termination on daemons(0L). Not all the time, but still some of the time.

My commit 523f27f fixes this issue.

Synchronous launches are still required for wss://. I don't think this is too much of an issue at the moment, as performance while running is actually quite good as you found.

Alternatively, tls+tcp:// seems to cope with simultaneous launches. This could, if necessary, be made a backup choice when using TLS - it just uses more ports.
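
For illustration only, a sketch of what that fallback might look like for the local tests above (the tls+tcp:// URL and wildcard port are assumptions mirroring the wss:// setup, not a confirmed configuration):

library(mirai)
# Each daemon gets its own TCP port instead of sharing one port via paths.
daemons(n = 20L, url = "tls+tcp://127.0.0.1:0", dispatcher = TRUE)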

Unfortunately even with 523f27f I am still sometimes seeing hanging tasks and daemons() timeouts. daemons(0) seems to work better though in the couple times I have retried https://github.com/wlandau/crew/blob/d82b7805a4fcc608e4774e6c921131fd5c932eba/tests/mirai/test-tls-max_tasks.R.

Also, on reflection, I am not sure a 0.1s delay between launches would make dials synchronous in the general case I have in mind for crew. For local processes it might work, but if workers launch and dial in from, e.g., AWS Batch jobs, the Docker containers might take different amounts of time to spin up, and a couple could try to dial in at the same moment. I am not sure I can control that on my end.

That's strange. I thought I'd fixed it, as I'm not seeing this on my Ubuntu system. Are you able to test on yours?

Can you confirm whether it works fine if you switch from wss to tls+tcp? That may be the price we have to pay to use TLS. In any case it would help narrow down the issue and confirm whether it is limited to wss. Thanks.

Sorry - I just did not run it long enough, and I did eventually get a hang. This would have happened when the listener was rotated, so I have implemented a fix in mirai 0.9.0.9018 that re-generates the tlsConfig object when that happens, which seems to eliminate the hangs.

Confirming whether tls+tcp works would still be useful. I think the current script doesn't work for it as-is, because the rotation logic needs to be amended: a TCP socket address stays the same, and saisei() does not change it.
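
A sketch of how the rotation step might be amended for tls+tcp:// (an illustration, not a confirmed approach): since the address stays the same, skip saisei() and only reset the launched flag so tally() and scale() still work.

# Hypothetical rotate() variant for tls+tcp://: there is no URL to rotate,
# so just flag done workers for tally() and a relaunch at the same address.
rotate_tcp <- function(workers) {
  info <- mirai::daemons()$daemons
  done <- which(info[, "online"] < 1L & info[, "instance"] > 0L)
  for (index in done) {
    workers$workers$launched[index] <- FALSE # Lets tally() update stats.
  }
}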

The 0.1s includes quite a generous buffer, so that's not the real window. I would say that the statistical chances of a real collision, especially if there are other (stochastic) factors in play, are incredibly small.

There is a 100% safe alternative - which would be to launch workers one at a time, and only launch the next one after daemons() shows the previous launch is online.
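
For illustration, a sketch of that alternative using the workers structure from the first test script (the helper name and polling interval are assumptions):

# Hypothetical fully synchronous launcher: start one server, then poll
# daemons() until it shows online before starting the next.
launch_one_at_a_time <- function(workers, poll = 0.1) {
  for (index in seq_len(nrow(workers$workers))) {
    workers$workers$handle[[index]] <- callr::r_bg(
      func = function(url, tls) mirai::server(url = url, tls = tls),
      args = list(
        url = workers$workers$socket[index],
        tls = environment(mirai::daemons)$..$default$tls$client
      )
    )
    repeat {
      info <- mirai::daemons()$daemons
      if (is.matrix(info) && info[index, "online"] > 0L) break
      Sys.sleep(poll) # wait for this worker to dial in before the next launch
    }
    workers$workers$launches[index] <- workers$workers$launches[index] + 1L
    workers$workers$launched[index] <- TRUE
  }
}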

That may be okay for local processes which launch quickly, but e.g. jobs on a busy cluster or a heavy EC2 instance could take several minutes to launch, so I am not sure that would be feasible.

Good point - the chance of a collision should be small for on-demand cloud workers, because delays at launch would probably make launch times more variable. For a cluster, though, I sometimes observe a different kind of situation: I submit 50 jobs to the queue, they all wait because the cluster is busy, and then a huge section of the cluster suddenly frees up and my jobs all begin at once.

I think I could mitigate this somewhat in crew::crew_worker() with a random delay just before the call to mirai::server(). I think that would catch most cases. But for the small number of TLS handshakes that still fail, would it be possible to retry at the level of mirai::server() if the main connection succeeded? And if multiple handshakes fail, exit the server and register instance = 1? In the latter case, crew will detect this and simply re-launch the server.
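
For illustration, the random delay could be as simple as this sketch (the jitter bound and helper name are placeholders, and crew_worker() internals are not shown):

# Hypothetical jittered worker start: sleep a random fraction of a second
# before dialing in so that simultaneous TLS handshakes become unlikely.
jittered_server <- function(url, tls, max_jitter = 0.25) {
  Sys.sleep(stats::runif(1L, min = 0, max = max_jitter))
  mirai::server(url = url, tls = tls)
}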

I like the random delay idea - hopefully that can minimise failures in the first instance. I am investigating the failure cases to see if there is a way of resolving them.

The new random delay in wlandau/crew@e7927c0 appears to help (using mirai 0.9.0.9019). 2 or 3 runs of https://github.com/wlandau/crew/blob/e7927c0e4ea4e2f333eecabcebe5b4c7801e88b2/tests/throughput/test-backlog-tasks_max.R completed fine, and then the next run had a hanging task (and interestingly, a worker that did not appear on htop but was reported as online by daemons()). daemons(0) worked in all runs I tried this time.

Hangs still? Would you mind sharing your nanonext::nng_version()? I am making some progress...

Only occasionally in these aggressive load tests, and only with TLS. nng_version() reports [1] "1.6.0pre" "mbed TLS 3.4.0".

OK - as I mentioned earlier the deadlock appears to be at the NNG level and the R session actually moves forward, allowing daemons() to succeed etc.

My conclusion after testing is that this issue is only present when using v3.x of mbedtls. NNG was actively developed against v2.x, and although it was made compatible with the newer release, evidently there is some mismatch. The 2.28.x branch is the current LTS branch (and the one in most Linux repos), hence I will move to bundle this version with nanonext. There is no difference in security, as it is still maintained, and NNG does not use any of the newer features in 3.x in any case.

Awesome! Happy to retest today or tomorrow, or when I am back from holiday, whenever you are ready.

Turns out it's the mbed configuration - the defaults just don't work! Fortunately the Debian packagers have done an excellent job here! I'll need more time to put it together, but I'm confident this will work. No need to wait for this - enjoy your hols!

Done in 0.9.0.9020. I have removed the mitigations that were added, and I think you can do the same on your side.

Thanks! It's definitely working much better now. I found no problems on my Linux machine with https://github.com/wlandau/crew/blob/main/tests/throughput/test-backlog-tasks_max.R. Still a couple hanging tasks on my MacBook though, along with terminated servers that still show online = 1 in daemons(). (And daemons(0) shuts down the dispatcher and most (sometimes all) of the servers.)

I know I like my aggressive load tests to explore edge cases, but I doubt something like this would come up in real life. I am testing 60000 tasks with 20 servers that rotate every 100 tasks, and those tasks are pretty much instantaneous.

Also, the load test still works if I forgo TLS.

Ok, for once I believe this is platform-specific now! To be clear, this issue was due to the bundled mbedtls library being built with the incorrect configuration. I take it from the above that this is actually fixed on Linux. If there is still an issue on MacOS then I'll try to find a working configuration down the line.

I agree, I think it is platform-dependent. Thanks for working on this. We are so close to a solution!

And yes, I think it is fixed on Linux.

Great! Thanks for confirming. I'll open a new issue to keep track of this for MacOS.