ray-project/ray

[Core] Worker crashes unexpectedly due to frequent triggering of OOM

yx367563 opened this issue · 6 comments

What happened + What you expected to happen

I triggered OOM several times while running Ray tasks in batch, because I underestimated how much memory the program needs. Each time, one or two tasks (and sometimes all of them) failed with an unexpected worker crash. This is reproducible in almost every run.
The error message returned is: ray.exceptions.WorkerCrashedError: The worker died unexpectedly while executing this task. Check python-core-worker-*.log files for more information.
The error message in the log is: Worker running the task (e9f8b6eafc16960cac9b9c694b6ec51091d001b1a343d81fcb664a17) died with exit_type: 0 with error_message: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors
Occasionally a different error message appears instead: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details:

I'm not sure whether the root cause of this error is frequent OOM, because OOM errors should have a retry mechanism.
In addition, I think it is difficult to accurately estimate the memory a task requires. Perhaps the scheduler could also take into account the memory actually in use on each Worker Node, rather than deciding only from the memory parameter set by the user.
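
To make the suggestion concrete, here is a minimal sketch of a placement check that looks at observed usage as well as declared reservations. This is not Ray's scheduler or any Ray API: `NodeState`, `fits`, and `pick_node` are hypothetical names, and the node figures are made up for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class NodeState:
    """Hypothetical view of one worker node (all values in bytes)."""
    name: str
    total: int     # physical memory on the node
    reserved: int  # sum of the memory= values declared by running tasks
    used: int      # memory actually in use, as measured on the node


def committed(node: NodeState) -> int:
    # Count the larger of "reserved" and "used" as the node's real load,
    # so tasks that exceed their declaration still count against the node.
    return max(node.reserved, node.used)


def fits(node: NodeState, request: int) -> bool:
    return committed(node) + request <= node.total


def pick_node(nodes: list[NodeState], request: int) -> Optional[NodeState]:
    # Prefer the node with the most real headroom; None if nothing fits.
    candidates = [n for n in nodes if fits(n, request)]
    if not candidates:
        return None
    return max(candidates, key=lambda n: n.total - committed(n))


GIB = 1024 ** 3
nodes = [
    # node-a has declared only 0.2 GiB but is really using 0.95 GiB.
    NodeState("node-a", total=GIB, reserved=int(0.2 * GIB), used=int(0.95 * GIB)),
    NodeState("node-b", total=GIB, reserved=int(0.3 * GIB), used=int(0.3 * GIB)),
]
chosen = pick_node(nodes, request=int(0.1 * GIB))
print(chosen.name if chosen else "no node fits")
```

Judged by reservations alone, node-a looks like the emptier node. Once measured usage is included, it is skipped and the task goes to node-b.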

Versions / Dependencies

Ray 2.9.0
Python 3.10
KubeRay 1.0.0

Reproduction script

I have removed the program-specific details. The general shape of the code is as follows, where total_memory is the memory size of a single Worker Node. The Ray cluster is deployed with KubeRay and has two Worker Nodes.

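import time
from math import ceil

import ray

# Memory of a single Worker Node, in bytes (1 GiB).
total_memory = 1 * 1024 * 1024 * 1024


@ray.remote
def count_record():
    # Allocate roughly 30% of a node's memory (each list slot takes 8 bytes).
    allocate_bytes = total_memory * 0.3
    record_list = [0] * ceil(allocate_bytes / 8)
    time.sleep(60)
    total = 0
    for record in record_list:
        total += record
    return total


ray.init()
tasks_num = 6
results = []
for _ in range(tasks_num):
    # Deliberately under-declare memory: 10% requested vs. about 30% used.
    results.append(count_record.options(memory=total_memory * 0.1).remote())
ray.get(results)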

In this example, changing the memory parameter to total_memory*0.3 is enough for the run to complete normally. A real program is more complex, though, and its memory use is much harder to predict accurately.
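
The arithmetic behind this can be sketched as follows. It assumes the declared memory value is the only limit on how many tasks land on one node (CPU limits may restrict packing further) and ignores per-worker overhead. `max_tasks_per_node` and `worst_case_usage` are hypothetical helper names, not Ray functions.

```python
GIB = 1024 ** 3
total_memory = 1 * GIB   # memory of one Worker Node, as in the script
tasks_num = 6
actual_fraction = 0.3    # each task really allocates about 30% of a node


def max_tasks_per_node(declared_fraction: float) -> int:
    # Number of tasks one node admits if only the declared memory is checked.
    declared_bytes = int(total_memory * declared_fraction)
    return min(total_memory // declared_bytes, tasks_num)


def worst_case_usage(declared_fraction: float) -> float:
    # Actual memory used on one node, as a fraction of the node's memory,
    # when as many tasks as the declaration allows are packed onto it.
    return max_tasks_per_node(declared_fraction) * actual_fraction


for declared in (0.1, 0.3):
    n = max_tasks_per_node(declared)
    usage = worst_case_usage(declared)
    print(f"declared={declared:.1f}: up to {n} tasks per node, "
          f"worst-case usage {usage:.1f}x node memory")
```

With memory=total_memory*0.1, all six tasks can be admitted to a single node and together need about 1.8 times its memory. With memory=total_memory*0.3, at most three are admitted, needing about 0.9 times.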

Issue Severity

Medium: It is a significant difficulty but I can work around it.