H5py and Dask Distributed
emilyjcosta5 opened this issue · 12 comments
When I use Client to map a function over a Dask array made from an HDF5 dataset, the following error appears:
TypeError: can't pickle _thread._local objects
Here is a simplified version of what I am trying to do:
import h5py
import numpy as np
import dask.array as da
from dask.distributed import Client

h5_f = h5py.File(h5_path, mode='r+')
client = Client()

# random 2d h5py dataset into a Dask array
arr = np.arange(100).reshape((10, 10))
dset = h5_f.create_dataset("MyDataset", data=arr)
y = da.from_array(dset, chunks='auto')

# some function
def inc(x):
    return x + 1

# client maps the function, inc(), over the dataset, y
# this is where the error appears
L = client.map(inc, y)

# results
results = client.gather(L)
After some testing, I believe the issue lies with the HDF5-backed lazy Dask array, which perhaps is not pickle-able when used in the map() function.
I am trying to integrate Dask into the pyUSID Python package, which is built on h5py, for spectroscopy and imaging computation, so I need to use Dask with HDF5.
I am using Python 3.7.3 on a MacBook Air with a 1.8 GHz Intel Core i7 (4-core) processor and 4 GB of RAM.
Here is the traceback:
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
/anaconda3/lib/python3.7/site-packages/distributed/worker.py in dumps_function(func)
2728 try:
-> 2729 result = cache[func]
2730 except KeyError:
KeyError: <bound method SignalFilter._unit_computation of <dask_signal_filter.SignalFilter object at 0xa1579d128>>
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
/anaconda3/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x)
37 try:
---> 38 result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
39 if len(result) < 1000:
TypeError: can't pickle _thread._local objects
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
<ipython-input-10-b67304d97a62> in <module>
10 #L = client.map(sig_filt._unit_computation, sig_filt.data)
11 #L
---> 12 h5_filt_grp = sig_filt.compute(override=True)
13 #sig_filt.data
~/Downloads/daskUSID/signal_filter/dask_process.py in compute(self, override, *args, **kwargs)
414 print('Scheduler info: {}'.format(client.scheduler_info()))
415
--> 416 L = client.map(self._unit_computation, self.data, *args, **kwargs)
417 if self.verbose:
418 progress(L)
/anaconda3/lib/python3.7/site-packages/distributed/client.py in map(self, func, *iterables, **kwargs)
1437 user_priority=user_priority,
1438 fifo_timeout=fifo_timeout,
-> 1439 actors=actor)
1440 logger.debug("map(%s, ...)", funcname(func))
1441
/anaconda3/lib/python3.7/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
2259
2260 self._send_to_scheduler({'op': 'update-graph',
-> 2261 'tasks': valmap(dumps_task, dsk3),
2262 'dependencies': dependencies,
2263 'keys': list(flatkeys),
/anaconda3/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/anaconda3/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/anaconda3/lib/python3.7/site-packages/distributed/worker.py in dumps_task(task)
2765 return d
2766 elif not any(map(_maybe_complex, task[1:])):
-> 2767 return {'function': dumps_function(task[0]),
2768 'args': warn_dumps(task[1:])}
2769 return to_serialize(task)
/anaconda3/lib/python3.7/site-packages/distributed/worker.py in dumps_function(func)
2729 result = cache[func]
2730 except KeyError:
-> 2731 result = pickle.dumps(func)
2732 if len(result) < 100000:
2733 cache[func] = result
/anaconda3/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x)
49 except Exception:
50 try:
---> 51 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
52 except Exception as e:
53 logger.info("Failed to serialize %s. Exception: %s", x, e)
/anaconda3/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
950 try:
951 cp = CloudPickler(file, protocol=protocol)
--> 952 cp.dump(obj)
953 return file.getvalue()
954 finally:
/anaconda3/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
265 self.inject_addons()
266 try:
--> 267 return Pickler.dump(self, obj)
268 except RuntimeError as e:
269 if 'recursion' in e.args[0]:
/anaconda3/lib/python3.7/pickle.py in dump(self, obj)
435 if self.proto >= 4:
436 self.framer.start_framing()
--> 437 self.save(obj)
438 self.write(STOP)
439 self.framer.end_framing()
/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
/anaconda3/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_instancemethod(self, obj)
716 else:
717 if PY3: # pragma: no branch
--> 718 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
719 else:
720 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
/anaconda3/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
636 else:
637 save(func)
--> 638 save(args)
639 write(REDUCE)
640
/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
/anaconda3/lib/python3.7/pickle.py in save_tuple(self, obj)
769 if n <= 3 and self.proto >= 2:
770 for element in obj:
--> 771 save(element)
772 # Subtle. Same as in the big comment below.
773 if id(obj) in memo:
/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
547
548 # Save the reduce() output and finally memoize the object
--> 549 self.save_reduce(obj=obj, *rv)
550
551 def persistent_id(self, obj):
/anaconda3/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
660
661 if state is not None:
--> 662 save(state)
663 write(BUILD)
664
/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
/anaconda3/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict
/anaconda3/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:
/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
547
548 # Save the reduce() output and finally memoize the object
--> 549 self.save_reduce(obj=obj, *rv)
550
551 def persistent_id(self, obj):
/anaconda3/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
660
661 if state is not None:
--> 662 save(state)
663 write(BUILD)
664
/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
/anaconda3/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict
/anaconda3/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:
/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
522 reduce = getattr(obj, "__reduce_ex__", None)
523 if reduce is not None:
--> 524 rv = reduce(self.proto)
525 else:
526 reduce = getattr(obj, "__reduce__", None)
TypeError: can't pickle _thread._local objects
You appear to be mixing the APIs of dask.array and futures. The array y is a prescription of dask tasks to be performed, not concrete values, so you do not want to ship it to workers like this.
What you wanted was:
(y + 1).compute()
However, h5py file objects are tricky at the best of times, so you are well advised to split your reading and writing operations:
with h5py.File('here.hdf', mode='w') as f:
    f.create_dataset("MyDataset", data=arr)

f = h5py.File('here.hdf', mode='r')
y = da.from_array(f['MyDataset'], chunks='auto')
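Putting those pieces together, a minimal end-to-end sketch might look like this (assuming a local file 'here.hdf' and a default local Client(); this just combines the snippets above, not a drop-in for your package):

import h5py
import numpy as np
import dask.array as da
from dask.distributed import Client

client = Client()

# write phase: create the dataset, then close the file
arr = np.arange(100).reshape((10, 10))
with h5py.File('here.hdf', mode='w') as f:
    f.create_dataset("MyDataset", data=arr)

# read phase: reopen read-only and wrap the dataset in a dask array
f = h5py.File('here.hdf', mode='r')
y = da.from_array(f['MyDataset'], chunks='auto')

# operate on the dask array itself rather than shipping it through client.map
result = (y + 1).compute()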
Thank you for the quick response. I do want to use Client(); the snippet above was just a simplified way of expressing how I am trying to use Dask. I am integrating it into some functions of an existing Python package. The user of the package defines _unit_computation() for each unit in an n-dimensional dataset, and the goal is to map that unit computation over every unit in the dataset and automate the parallel execution for usability. I would prefer client.map() over writing a for-loop and calling compute(); the loop approach did not work well with my program, and using the client has been the most useful. Should I not turn the HDF5 dataset into a Dask array? If not, what data structure should I use?
Right, so there are two issues here:
- h5py dataset objects aren't serializable, so your tasks will have to open and close the files each time. I think we actually special case h5py objects in dask.distributed if you follow standard channels (see example below)
- As @martindurant suggests, you're mixing collections and futures in an odd way. Given what you say above, I'm guessing that you're looking for
y.map_blocks(inc)
rather than client.map(inc, y)
(which doesn't do what you think it does).
I hope that this example helps:
from dask.distributed import Client
client = Client()
import h5py
import numpy as np
with h5py.File('foo.h5', 'w') as f:
    dset = f.create_dataset('x', shape=(10, 10), dtype=float, chunks=(5, 5))
    dset[:] = 1
f = h5py.File('foo.h5', 'r')
dset = f['x']
import dask.array as da
x = da.from_array(dset, chunks=dset.chunks)
def inc(x):
    return x + 1
y = x.map_blocks(inc)
>>> y.compute()
array([[2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
[2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
[2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
[2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
[2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
[2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
[2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
[2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
[2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
[2., 2., 2., 2., 2., 2., 2., 2., 2., 2.]])
You may also want to look at the Xarray project, which uses Dask array extensively, and has lots of experience dealing with HDF5 files.
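For reference, a tiny sketch of what that can look like (hypothetical file and variable names, assuming a netCDF-compatible file and an installed netCDF backend such as netCDF4):

import xarray as xr

# chunks=... gives dask-backed arrays; file and variable names are hypothetical
ds = xr.open_dataset('foo.nc', chunks={'x': 5, 'y': 5})
result = (ds['signal'] + 1).compute()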
@emilyjcosta5 does the answer above help?
@mrocklin Unfortunately, I am still running into trouble with pickling. The data already exists, and I need to be able to write the results back to the file, so getting the main dataset looks something like this:
h5_f = h5py.File(h5_path, mode='r+')
h5_grp = h5_f['Measurement_000/Channel_000']
h5_main = h5_grp['Raw_Data']
then I proceed to create the Dask array:
self.data = da.from_array(self.h5_main, chunks='auto')
then I process the data:
client = Client(processes=False)
results = self.data.map_blocks(self._unit_computation, dtype=self.data.dtype, *args, **kwargs)
data = results.compute()
_unit_computation() is a signal filter applied to each chunk of the data. Simplified, it looks like this:
def _unit_computation(self, chunk, *args, **kwargs):
    """
    Processing per chunk of the dataset
    """
    # get FFT of the entire data chunk
    chunk = np.fft.fftshift(np.fft.fft(chunk, axis=1), axes=1)
    return chunk
I don't believe _unit_computation() itself is the issue; when I make self.data a random Dask array, everything works.
Try things in read-only mode.
I did and I have the same error.
TypeError Traceback (most recent call last)
/anaconda3/lib/python3.6/site-packages/distributed/worker.py in dumps_function(func)
3039 try:
-> 3040 result = cache[func]
3041 except KeyError:
/anaconda3/lib/python3.6/site-packages/zict/lru.py in __getitem__(self, key)
47 def __getitem__(self, key):
---> 48 result = self.d[key]
49 self.i += 1
TypeError: unhashable type: 'SubgraphCallable'
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
/anaconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
39 try:
---> 40 result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
41 if len(result) < 1000:
TypeError: can't pickle _thread._local objects
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
<ipython-input> in <module>
----> 1 h5_filt_grp = sig_filt.compute(override=True)
~/Downloads/daskUSID/daskUSID/signal_filter/dask_process.py in compute(self, override, *args, **kwargs)
415 data = self.data
416 results = data.map_blocks(self._unit_computation, dtype=data.dtype, *args, **kwargs)
--> 417 data = results.compute()
418 self.data = data
419 #self._write_results_chunk()
/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
/anaconda3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2555 retries=retries,
2556 user_priority=priority,
-> 2557 actors=actors,
2558 )
2559 packed = pack_data(keys, futures)
/anaconda3/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
2482 {
2483 "op": "update-graph",
-> 2484 "tasks": valmap(dumps_task, dsk3),
2485 "dependencies": dependencies,
2486 "keys": list(flatkeys),
/anaconda3/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/anaconda3/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/anaconda3/lib/python3.6/site-packages/distributed/worker.py in dumps_task(task)
3075 return d
3076 elif not any(map(_maybe_complex, task[1:])):
-> 3077 return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
3078 return to_serialize(task)
3079
/anaconda3/lib/python3.6/site-packages/distributed/worker.py in dumps_function(func)
3044 cache[func] = result
3045 except TypeError:
-> 3046 result = pickle.dumps(func)
3047 return result
3048
/anaconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
51 except Exception:
52 try:
---> 53 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
54 except Exception as e:
55 logger.info("Failed to serialize %s. Exception: %s", x, e)
/anaconda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
1095 try:
1096 cp = CloudPickler(file, protocol=protocol)
-> 1097 cp.dump(obj)
1098 return file.getvalue()
1099 finally:
/anaconda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
355 self.inject_addons()
356 try:
--> 357 return Pickler.dump(self, obj)
358 except RuntimeError as e:
359 if 'recursion' in e.args[0]:
/anaconda3/lib/python3.6/pickle.py in dump(self, obj)
407 if self.proto >= 4:
408 self.framer.start_framing()
--> 409 self.save(obj)
410 self.write(STOP)
411 self.framer.end_framing()
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
608 else:
609 save(func)
--> 610 save(args)
611 write(REDUCE)
612
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/pickle.py in save_tuple(self, obj)
749 write(MARK)
750 for element in obj:
--> 751 save(element)
752
753 if id(obj) in memo:
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
/anaconda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
850 k, v = tmp[0]
851 save(k)
--> 852 save(v)
853 write(SETITEM)
854 # else tmp is empty, and we're done
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/pickle.py in save_tuple(self, obj)
734 if n <= 3 and self.proto >= 2:
735 for element in obj:
--> 736 save(element)
737 # Subtle. Same as in the big comment below.
738 if id(obj) in memo:
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_instancemethod(self, obj)
861 else:
862 if PY3: # pragma: no branch
--> 863 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
864 else:
865 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
608 else:
609 save(func)
--> 610 save(args)
611 write(REDUCE)
612
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/pickle.py in save_tuple(self, obj)
734 if n <= 3 and self.proto >= 2:
735 for element in obj:
--> 736 save(element)
737 # Subtle. Same as in the big comment below.
738 if id(obj) in memo:
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
/anaconda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
/anaconda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
494 reduce = getattr(obj, "__reduce_ex__", None)
495 if reduce is not None:
--> 496 rv = reduce(self.proto)
497 else:
498 reduce = getattr(obj, "__reduce__", None)
TypeError: can't pickle _thread._local objects
I'm sorry, I don't know what sig_filt is. You have something that isn't serializable. I'm not sure what else I can do to help. Good luck!
Okay, thank you.
I still recommend the following approach: #2787 (comment)
Interesting. I don't know then. Perhaps work from my example (verify that it works first) and then change things towards yours until something breaks? Maybe that helps to narrow things down?
In general, lots of things can make a function not serializable. Certainly depending on an open file handle, as you have in your example, would be one reasonable cause. I think that making h5py objects serializable is outside the scope of Dask. There are a variety of workarounds: the approach I showed above, opening and closing the file inside each task, using a format that serializes more cleanly, like Zarr, or using a project like Xarray. I'm going to go ahead and close this now. Again, good luck!
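For instance, a minimal sketch of the "open the file inside each task" workaround (the path, dataset name, and chunk size here are hypothetical, and the FFT step just mirrors the snippet above). The key point is that the task function is a plain function that only receives the file path and slice bounds, so nothing un-picklable, such as an open h5py file held on self, ends up in the graph:

import dask
import h5py
import numpy as np

H5_PATH = 'data.h5'                                # hypothetical path
DSET = 'Measurement_000/Channel_000/Raw_Data'      # dataset name from the snippets above

@dask.delayed
def filter_rows(start, stop):
    # open and close the file inside the task, so no h5py handle is ever pickled
    with h5py.File(H5_PATH, mode='r') as f:
        chunk = f[DSET][start:stop]
    return np.fft.fftshift(np.fft.fft(chunk, axis=1), axes=1)

with h5py.File(H5_PATH, mode='r') as f:
    n_rows = f[DSET].shape[0]

step = 1024                                        # rows per task; tune to your data
tasks = [filter_rows(i, min(i + step, n_rows)) for i in range(0, n_rows, step)]
results = dask.compute(*tasks)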