[Data] Dataset write_csv AttributeError: 'Worker' object has no attribute 'core_worker'
China-JasonW opened this issue · 5 comments
What happened + What you expected to happen
When I use Ray Dataset's write_csv with version 2.4.0, it throws the error "AttributeError: 'Worker' object has no attribute 'core_worker'". As shown in the picture, it claims that Ray is not initialized, even though it has been initialized. When I rolled the version back to 2.3.1, the program executed successfully.
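For what it's worth, the client connection can be checked directly; a minimal sketch, reusing the cluster address from the reproduction script below:

import ray

# Connect through Ray Client (address taken from the reproduction script below).
ray.init(address="ray://192.168.101.21:10001")

# Prints True once the client is connected, even though write_csv
# later fails with the core_worker AttributeError.
print(ray.is_initialized())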
Versions / Dependencies
I use Ray Client mode: the driver code runs on my Mac, and the workers run on Linux (CentOS 7.5). The Ray version is 2.4.0. I run the program in debug mode from the IDEA IDE and submit the task to the remote Ray cluster.
Reproduction script
import ray

class AnalysisService(IMLService):
    def __init__(self, context: dict):
        self.context = context

    def init(self):
        ray_add = "ray://192.168.101.21:10001"
        ray.init(address=ray_add)

    def start(self):
        # Part of the code is omitted; info_result is a Ray Dataset
        info_result = ray.get(transform.remote(self.context, fuc_infer, ds_test))
        logger.info('The task has finished! {}', type(info_result))
        # The exception is thrown here
        info_result.write_csv("/tmp/ddata")
        logger.info("The result has {} records.", info_result.count())
Issue Severity
High: It blocks me from completing my task.
Given test.py:
import ray.data
ds1 = ray.data.range(100).groupby(lambda x: x % 3).count().show()
When using ray==2.4.0 on the client and the rayproject/ray:2.4.0-py39-cpu image on the cluster, then:
❯ python -m test
2023-05-20 05:19:49,091 INFO worker.py:1314 -- Using address ray://127.0.0.1:10001 set in the environment variable RAY_ADDRESS
2023-05-20 05:19:54,129 INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> AllToAllOperator[Aggregate]
2023-05-20 05:19:54,130 INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
Running: 0.0/6.0 CPU, 0.0/0.0 GPU, 0.0 MiB/536.34 MiB object_store_memory: 13/13 [00:06<00:00, 6.90s/it]
Traceback (most recent call last):
File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/tmp/tekumara/code/ray-demo/rayexample/data/groupby.py", line 3, in <module>
ds1 = ray.data.range(100).groupby(lambda x: x % 3).count().show()
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/dataset.py", line 2295, in show
for row in self.take(limit):
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/dataset.py", line 2251, in take
for row in self.iter_rows():
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/dataset_iterator.py", line 232, in iter_rows
for batch in self.iter_batches(**iter_batch_args):
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/dataset_iterator.py", line 185, in iter_batches
yield from iter_batches(
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 179, in iter_batches
next_batch = next(async_batch_iter)
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 268, in make_async_gen
raise next_item
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 249, in execute_computation
for item in fn(thread_safe_generator):
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 170, in _async_iter_batches
yield from extract_data_from_batch(batch_iter)
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 193, in extract_data_from_batch
for batch in batch_iter:
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 307, in restore_original_order
for batch in batch_iter:
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 221, in threadpool_computations
yield from formatted_batch_iter
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 167, in format_batches
for batch in block_iter:
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 126, in blocks_to_batches
for block in block_iter:
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 64, in resolve_block_refs
current_hit, current_miss, current_unknown = _calculate_ref_hits([block_ref])
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/data/_internal/block_batching/util.py", line 41, in _calculate_ref_hits
locs = ray.experimental.get_object_locations(refs)
File "/tmp/tekumara/code/ray-demo/.venv/lib/python3.9/site-packages/ray/experimental/locations.py", line 38, in get_object_locations
return ray._private.worker.global_worker.core_worker.get_object_locations(
AttributeError: 'Worker' object has no attribute 'core_worker'
Interestingly though, if I start a local ray 2.4.0 instance it works:
❯ python -m test
2023-05-20 05:23:14,011 INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8266
2023-05-20 05:23:16,011 INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> AllToAllOperator[Aggregate]
2023-05-20 05:23:16,012 INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
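For reference, the traceback bottoms out in ray.experimental.get_object_locations, which reaches for ray._private.worker.global_worker.core_worker; under Ray Client the driver-side Worker has no core_worker attribute. A defensive guard along these lines (a sketch of the idea only, not the actual fix shipped in Ray, and relying on Ray's private global_worker) would avoid the crash by reporting no location information when core_worker is absent:

import ray

def get_object_locations_safe(refs):
    # Sketch: under Ray Client the driver-side Worker has no core_worker,
    # so locality information is unavailable; return an empty location
    # entry per ref instead of raising AttributeError.
    worker = ray._private.worker.global_worker
    if not hasattr(worker, "core_worker"):
        return {ref: {} for ref in refs}
    return ray.experimental.get_object_locations(refs)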
Unfortunately, these cross-platform client setups are not very robust. I would use Linux for both the client and the server, or not use Ray Client at all and instead run directly on the cluster.
I'm using Linux for both the client and the server, on the same machine.
Ah, sorry, you said in the original message that the driver is on Mac and the worker is on Linux:
I use Ray Client mode: the driver code runs on my Mac, and the workers run on Linux (CentOS 7.5).
Recently I tested it on a Linux cluster, with both the driver and the worker on Linux, and it produces the same error message.
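As suggested above, one way to sidestep the Ray Client code path entirely is to run the driver on the cluster itself, for example by submitting the script through the Ray Jobs API; a sketch, assuming the head node's dashboard is reachable at 192.168.101.21:8265:

from ray.job_submission import JobSubmissionClient

# Submit test.py as a Ray Job so the driver runs on the cluster rather
# than through Ray Client; the dashboard address below is assumed.
client = JobSubmissionClient("http://192.168.101.21:8265")
job_id = client.submit_job(
    entrypoint="python test.py",
    runtime_env={"working_dir": "./"},
)
print(job_id)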