ExeTera Questions
NettaShemesh103 opened this issue · 3 comments
Hello,
My name is Netta Shemesh, I am 23 years old, and I am from Israel. I recently read your article “Accessible data curation and analytics for international-scale citizen science datasets”, and I am doing a university project on your paper at the Israel Institute of Technology.
I have several questions about ExeTera, the Python-based software package you wrote.
First, I would like to know whether you are still working on the software, and whether you have succeeded in applying Dask arrays in your code.
Secondly, I have not understood precisely why the Dask DataFrame could not import the data, or what exceptions were raised during the artificial joins that caused the Dask program to fail.
In addition, I was wondering why applying Dask arrays to your software would make all operations on ExeTera fields streaming by default.
Thank you in advance,
Netta Shemesh
Posting responses here for continuity with the issue:
Hi Netta,
Thank you for your interest in ExeTera. My apologies for my slightly delayed response; it has been a busy week and now I am catching up on emails.
Firstly, yes, we are still working on ExeTera.
ExeTera continues to be used within our school, and we are also looking for other applications that require the kind of scale it provides.
We haven't yet had the opportunity to rewrite the backend using dask, but we are in the process of scheduling the future work, including dask integration. I cannot give you a timescale for that right now, however.
I don't recall the precise errors that were raised during the dask benchmarking with the artificial dataset, but we tried a number of approaches to make it work and were unable to do so. I'll rerun the experiment for dask and then give you instructions on how to run the artificial join code in the evaluation repo so you can experiment for yourself (it will need a machine with a lot of RAM).
Using dask arrays means that we can make use of the dask compute engine, which builds directed graphs of the operations to be performed and parallelises / distributes them as appropriate. We need to implement a number of key operations ourselves as dask does not natively support the operations that we want to perform outside of dask dataframe, but its API permits the specification of custom graph fragments through which those can be implemented.
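To make the streaming point concrete, here is a minimal sketch of the idea rather than ExeTera's actual implementation; the file name, field path, and chunk size are all hypothetical. Wrapping an HDF5-backed field in a dask array makes every subsequent operation lazy: each step extends a task graph, and the data only streams through, chunk by chunk, when a result is requested.

import dask.array as da
import h5py

with h5py.File('dataset.hdf5', 'r') as f:
    # Wrap the on-disk field in a dask array; no data is read yet.
    field = da.from_array(f['/patients/age'], chunks=(1_000_000,))

    # Each operation extends the task graph instead of executing eagerly.
    normalised = (field - field.mean()) / field.std()

    # Operations dask does not provide natively can be supplied as custom
    # graph fragments, for example via map_blocks.
    clipped = da.map_blocks(lambda block: block.clip(0, 120), normalised)

    # compute() walks the graph, streaming the chunks through each step.
    result = clipped.compute()

Because every field operation goes through the same chunked graph machinery, streaming stops being an opt-in code path and becomes the default execution model, which is what we mean when we say dask arrays would make operations on ExeTera fields streaming by default.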
Are you able to give me more details about your project and its aims? I might be able to assist your enquiries more effectively if you can give me an overview of what you want to do.
Yours,
Ben
Here is a stacktrace from a failing dask merge scenario:
Traceback (most recent call last):
  File "dask_join_scenario.py", line 32, in <module>
    go(int(sys.argv[1]), int(sys.argv[2]))
  File "dask_join_scenario.py", line 20, in go
    m_df.to_hdf('d_m_df_{}_{}.hdf'.format(l_length, r_length), key='/data')
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/dataframe/core.py", line 1554, in to_hdf
    return to_hdf(self, path_or_buf, key, mode, append, **kwargs)
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/dataframe/io/hdf.py", line 251, in to_hdf
    compute_as_if_collection(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/base.py", line 317, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 552, in get_sync
    return get_async(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 495, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 537, in submit
    fut.set_result(fn(*args, **kwargs))
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 233, in batch_execute_tasks
    return [execute_task(*a) for a in it]
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 233, in <listcomp>
    return [execute_task(*a) for a in it]
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 224, in execute_task
    result = pack_exception(e, dumps)
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 219, in execute_task
    result = _execute_task(task, data)
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/dataframe/io/hdf.py", line 27, in _pd_to_hdf
    pd_to_hdf(*args, **kwargs)
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/core/generic.py", line 2763, in to_hdf
    pytables.to_hdf(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 314, in to_hdf
    f(store)
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 282, in <lambda>
    f = lambda store: store.append(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 1262, in append
    self._write_to_group(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 1772, in _write_to_group
    s.write(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 4299, in write
    table = self._create_axes(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 3915, in _create_axes
    blocks, blk_items = self._get_blocks_and_items(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 4076, in _get_blocks_and_items
    raise ValueError(
ValueError: cannot match existing table structure for [l1,rpk,fk,r0,r1] on appending data
If you look at the last line, you can see the ValueError: the table structure appears to be inconsistent across the subtables that dask generates while performing the merge. We noted that this occurred for some join sizes but not others. We were not able to find a workaround at the time, and we felt that it represented the kind of technical hurdle that a data analyst should not have to solve. Given the performance disparity between the dask dataframe merge and our merge, we also considered that further time spent trying to work around the issue would not be productive.
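For context on what that last line means: pandas refuses an append when the partition being written does not match the column/dtype block structure of the table already on disk. Below is a self-contained sketch of that failure mode, independent of dask, with made-up file and column names. A merge that introduces NaNs into some partitions but not others silently promotes integer columns to float in just those partitions, which is one way this mismatch can arise.

import pandas as pd

# First partition: 'fk' is an integer column, 'r0' a float column.
df1 = pd.DataFrame({'fk': [1, 2, 3], 'r0': [0.1, 0.2, 0.3]})
df1.to_hdf('demo.hdf', key='/data', format='table', mode='w')

# Second partition: 'fk' has become float, so pandas groups the columns
# into different internal blocks than the table on disk expects.
df2 = pd.DataFrame({'fk': [4.0, 5.0], 'r0': [0.4, 0.5]})

# Raises ValueError: cannot match existing table structure ... on appending data
df2.to_hdf('demo.hdf', key='/data', format='table', append=True)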
To summarise, our view is that the dask dataframe implementation was the "low hanging fruit" option: it gave us dataframe-like functionality cheaply, but it proved sufficiently problematic that the problem is worth tackling from a completely different design direction.
You can clone the ExeTeraEval repository and do the following if you want to replicate the dask evaluation.
You'll need to run two commands:
# arguments: left row count, right row count, partition row count
python create_dask_join_scenario.py 20000000 200000000 2000000
# arguments: left row count, right row count
python dask_join_scenario.py 20000000 200000000
If you are using numbers like the above, I suggest you use a machine with a large amount of RAM (>64GB).
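For orientation, the failing step has roughly the following shape, inferred from the stacktrace above; the real dask_join_scenario.py in the ExeTeraEval repo may differ in its details, and the file name patterns and key column names ('pk', 'fk') here are guesses.

import sys
import dask.dataframe as dd

def go(l_length, r_length):
    # Load the left and right tables written by create_dask_join_scenario.py.
    l_df = dd.read_hdf('d_l_df_{}.hdf'.format(l_length), key='/data')
    r_df = dd.read_hdf('d_r_df_{}.hdf'.format(r_length), key='/data')

    # Merge the right table onto the left via its foreign key.
    m_df = l_df.merge(r_df, left_on='pk', right_on='fk')

    # to_hdf appends one partition at a time; this is the call that
    # raised the ValueError in the trace above.
    m_df.to_hdf('d_m_df_{}_{}.hdf'.format(l_length, r_length), key='/data')

if __name__ == '__main__':
    go(int(sys.argv[1]), int(sys.argv[2]))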