ray-project/ray

[Data] Dataset write_csv AttributeError: 'Worker' object has no attribute 'core_worker'

China-JasonW opened this issue · 5 comments

What happened + What you expected to happen

When I use Ray Dataset's write_csv with Ray 2.4.0, it throws the error "AttributeError: 'Worker' object has no attribute 'core_worker'". As shown in the picture, the error says Ray is not initialized, but it actually has been initialized. When I rolled back to Ray 2.3.1, the program executed successfully.

Versions / Dependencies

I use Ray Client mode: the driver code runs on my Mac, and the workers run on Linux (CentOS 7.5). The Ray version is 2.4.0. I run the program in debug mode in IDEA and submit remote tasks to the remote Ray cluster.

Reproduction script

import ray

# IMLService, transform, fuc_infer, ds_test, and logger are defined elsewhere (omitted).


class AnalysisService(IMLService):

    def __init__(self, context: dict):
        self.context = context

    def init(self):
        # Connect to the remote cluster through Ray Client.
        ray_add = "ray://192.168.101.21:10001"
        ray.init(address=ray_add)

    def start(self):
        # Part of the code is omitted; info_result is a Ray Dataset.
        info_result = ray.get(transform.remote(self.context, fuc_infer, ds_test))
        logger.info('The task has finished! {}', type(info_result))

        # The exception is thrown here.
        info_result.write_csv("/tmp/ddata")
        logger.info("The result has {} records.", info_result.count())

Issue Severity

High: It blocks me from completing my task.

Given test.py:

import ray.data

ds1 = ray.data.range(100).groupby(lambda x: x % 3).count().show()

When using ray==2.4.0 on the client, and the rayproject/ray:2.4.0-py39-cpu image on the cluster, then:

❯ python -m test
2023-05-20 05:19:49,091 INFO worker.py:1314 -- Using address ray://127.0.0.1:10001 set in the environment variable RAY_ADDRESS
2023-05-20 05:19:54,129 INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> AllToAllOperator[Aggregate]
2023-05-20 05:19:54,130 INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
Running: 0.0/6.0 CPU, 0.0/0.0 GPU, 0.0 MiB/536.34 MiB object_store_memory 0:   8%|████▏     | 1/13 [00:06<01:22,  6.90s/it]
Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/tmp/tekumara/code/ray-demo/rayexample/data/groupby.py", line 3, in <module>
    ds1 = ray.data.range(100).groupby(lambda x: x % 3).count().show()
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/dataset.py", line 2295, in show
    for row in self.take(limit):
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/dataset.py", line 2251, in take
    for row in self.iter_rows():
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/dataset_iterator.py", line 232, in iter_rows
    for batch in self.iter_batches(**iter_batch_args):
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/dataset_iterator.py", line 185, in iter_batches
    yield from iter_batches(
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 179, in iter_batches
    next_batch = next(async_batch_iter)
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 268, in make_async_gen
    raise next_item
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 249, in execute_computation
    for item in fn(thread_safe_generator):
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 170, in _async_iter_batches
    yield from extract_data_from_batch(batch_iter)
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 193, in extract_data_from_batch
    for batch in batch_iter:
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 307, in restore_original_order
    for batch in batch_iter:
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 221, in threadpool_computations
    yield from formatted_batch_iter
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 167, in format_batches
    for batch in block_iter:
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 126, in blocks_to_batches
    for block in block_iter:
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 64, in resolve_block_refs
    current_hit, current_miss, current_unknown = _calculate_ref_hits([block_ref])
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 41, in _calculate_ref_hits
    locs = ray.experimental.get_object_locations(refs)
  File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/experimental/locations.py", line 38, in get_object_locations
    return ray._private.worker.global_worker.core_worker.get_object_locations(
AttributeError: 'Worker' object has no attribute 'core_worker'
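The last frames show Ray Data's _calculate_ref_hits calling ray.experimental.get_object_locations, which reads global_worker.core_worker. Our reading is that a driver connected through Ray Client never connects its local Worker to a Ray node, so that attribute is never set. The self-contained model below reproduces the same AttributeError and shows one possible guard. CoreWorker, Worker, get_object_locations, and calculate_ref_hits here are hypothetical, simplified stand-ins, not Ray's real classes, and the "treat everything as unknown" fallback is an assumption, not the fix Ray actually shipped.

```python
from typing import Dict, List, Optional, Tuple


class CoreWorker:
    """Hypothetical stand-in for the core worker, which can look up object locations."""

    def __init__(self, locations: Dict[str, List[str]]):
        self._locations = locations

    def get_object_locations(self, refs: List[str]) -> Dict[str, Optional[List[str]]]:
        # Simplified: map each ref to the node IDs holding it, or None if unknown.
        return {ref: self._locations.get(ref) for ref in refs}


class Worker:
    """Hypothetical stand-in for the driver-side Worker.

    In this model, `core_worker` is only set when the worker is connected to a
    local Ray node; a Ray Client driver is modeled as never being connected.
    """

    def __init__(self, core_worker: Optional[CoreWorker] = None):
        if core_worker is not None:
            self.core_worker = core_worker


def get_object_locations(worker: Worker, refs: List[str]) -> Dict[str, Optional[List[str]]]:
    # Like the last traceback frame: no guard, so this raises AttributeError in client mode.
    return worker.core_worker.get_object_locations(refs)


def calculate_ref_hits(worker: Worker, refs: List[str], node_id: str) -> Tuple[int, int, int]:
    """Count (hits, misses, unknown) for refs relative to node_id."""
    if not hasattr(worker, "core_worker"):
        # Guarded path (assumption): report every location as unknown instead of crashing.
        return 0, 0, len(refs)
    hits = misses = unknown = 0
    for loc in get_object_locations(worker, refs).values():
        if loc is None:
            unknown += 1
        elif node_id in loc:
            hits += 1
        else:
            misses += 1
    return hits, misses, unknown


local = Worker(CoreWorker({"a": ["node1"], "b": ["node2"]}))
client = Worker()
print(calculate_ref_hits(local, ["a", "b", "c"], "node1"))  # (1, 1, 1)
print(calculate_ref_hits(client, ["a", "b"], "node1"))  # (0, 0, 2)
try:
    get_object_locations(client, ["a"])
except AttributeError as exc:
    print(exc)  # 'Worker' object has no attribute 'core_worker'
```

The guard only makes sense if these hit/miss counts feed statistics rather than correctness, which is what the name _calculate_ref_hits suggests but is not confirmed by this thread.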

Interestingly, though, if I start a local Ray 2.4.0 instance, it works:

❯ python -m test
2023-05-20 05:23:14,011 INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8266 
2023-05-20 05:23:16,011 INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> AllToAllOperator[Aggregate]
2023-05-20 05:23:16,012 INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)

Unfortunately, these cross-platform client setups are not very robust. I would use Linux for both the client and the server, or just not use Ray Client and instead run directly on the cluster.
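Both failing setups in this thread connect through a ray:// address (passed to ray.init in the original report, and read from RAY_ADDRESS in the log above), while the working run started a local instance. A script that wants to follow the advice to avoid Ray Client could detect that mode up front and warn or stop before running Dataset operations. This is a minimal sketch; uses_ray_client is a hypothetical helper that only inspects the address string and does not query Ray.

```python
import os
from typing import Optional


def uses_ray_client(address: Optional[str] = None) -> bool:
    """Return True if `address` (or RAY_ADDRESS when omitted) selects Ray Client.

    Ray Client addresses use the ray:// scheme, e.g. ray://192.168.101.21:10001.
    """
    if address is None:
        address = os.environ.get("RAY_ADDRESS", "")
    return address.strip().lower().startswith("ray://")


print(uses_ray_client("ray://192.168.101.21:10001"))  # True
print(uses_ray_client("127.0.0.1:6379"))  # False
```

Running the script on the cluster itself (for example via the Ray Jobs CLI, ray job submit) avoids the client code path entirely, which a check like this cannot do on its own.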

I'm using Linux for both the client and the server, on the same machine.

Ah, sorry, you said in the original message that the driver is on Mac and the worker is on Linux.


I recently tested it on the Linux cluster, with both the driver and the worker on Linux, and got the same error message.