dask/distributed

H5py and Dask Distributed

emilyjcosta5 opened this issue · 12 comments

When I use Client to map a function over a Dask array made from an HDF5 dataset, the following error appears:

TypeError: can't pickle _thread._local objects

Here is a simplified version of what I am trying to do:

import h5py
import numpy as np
import dask.array as da
from dask.distributed import Client

h5_f = h5py.File(h5_path, mode='r+')  # h5_path: path to an existing HDF5 file

client = Client()

#write a small 2-D dataset to the file and wrap it in a Dask array
arr = np.arange(100).reshape((10, 10))
dset = h5_f.create_dataset("MyDataset", data=arr)
y = da.from_array(dset, chunks='auto')

#some function
def inc(x):
    return x + 1

#client maps function, inc(), to dataset, y
#where error appears
L = client.map(inc, y)

#results
results = client.gather(L)

After some testing, I believe the issue lies with the HDF5 dataset inside the lazy Dask array, which is perhaps not picklable when passed to map().

I am trying to integrate Dask into the pyUSID Python package, which is built on h5py and used for spectroscopy and imaging computation. Therefore, I need to use Dask with HDF5.

I am using Python version 3.7.3 on a MacBook Air with a 1.8 GHz Intel Core i7 (4-core) processor and 4 GB of RAM.

Here is the traceback:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
/anaconda3/lib/python3.7/site-packages/distributed/worker.py in dumps_function(func)
   2728     try:
-> 2729         result = cache[func]
   2730     except KeyError:

KeyError: <bound method SignalFilter._unit_computation of <dask_signal_filter.SignalFilter object at 0xa1579d128>>

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
/anaconda3/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x)
     37     try:
---> 38         result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     39         if len(result) < 1000:

TypeError: can't pickle _thread._local objects

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-10-b67304d97a62> in <module>
     10 #L = client.map(sig_filt._unit_computation, sig_filt.data)
     11 #L
---> 12 h5_filt_grp = sig_filt.compute(override=True)
     13 #sig_filt.data

~/Downloads/daskUSID/signal_filter/dask_process.py in compute(self, override, *args, **kwargs)
    414             print('Scheduler info: {}'.format(client.scheduler_info()))
    415 
--> 416         L = client.map(self._unit_computation, self.data, *args, **kwargs)
    417         if self.verbose:
    418             progress(L)

/anaconda3/lib/python3.7/site-packages/distributed/client.py in map(self, func, *iterables, **kwargs)
   1437                                          user_priority=user_priority,
   1438                                          fifo_timeout=fifo_timeout,
-> 1439                                          actors=actor)
   1440         logger.debug("map(%s, ...)", funcname(func))
   1441 

/anaconda3/lib/python3.7/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2259 
   2260             self._send_to_scheduler({'op': 'update-graph',
-> 2261                                      'tasks': valmap(dumps_task, dsk3),
   2262                                      'dependencies': dependencies,
   2263                                      'keys': list(flatkeys),

/anaconda3/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/anaconda3/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/anaconda3/lib/python3.7/site-packages/distributed/worker.py in dumps_task(task)
   2765             return d
   2766         elif not any(map(_maybe_complex, task[1:])):
-> 2767             return {'function': dumps_function(task[0]),
   2768                     'args': warn_dumps(task[1:])}
   2769     return to_serialize(task)

/anaconda3/lib/python3.7/site-packages/distributed/worker.py in dumps_function(func)
   2729         result = cache[func]
   2730     except KeyError:
-> 2731         result = pickle.dumps(func)
   2732         if len(result) < 100000:
   2733             cache[func] = result

/anaconda3/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x)
     49     except Exception:
     50         try:
---> 51             return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     52         except Exception as e:
     53             logger.info("Failed to serialize %s. Exception: %s", x, e)

/anaconda3/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
    950     try:
    951         cp = CloudPickler(file, protocol=protocol)
--> 952         cp.dump(obj)
    953         return file.getvalue()
    954     finally:

/anaconda3/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    265         self.inject_addons()
    266         try:
--> 267             return Pickler.dump(self, obj)
    268         except RuntimeError as e:
    269             if 'recursion' in e.args[0]:

/anaconda3/lib/python3.7/pickle.py in dump(self, obj)
    435         if self.proto >= 4:
    436             self.framer.start_framing()
--> 437         self.save(obj)
    438         self.write(STOP)
    439         self.framer.end_framing()

/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/anaconda3/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_instancemethod(self, obj)
    716         else:
    717             if PY3:  # pragma: no branch
--> 718                 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
    719             else:
    720                 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),

/anaconda3/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    636         else:
    637             save(func)
--> 638             save(args)
    639             write(REDUCE)
    640 

/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/anaconda3/lib/python3.7/pickle.py in save_tuple(self, obj)
    769         if n <= 3 and self.proto >= 2:
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle.  Same as in the big comment below.
    773             if id(obj) in memo:

/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/anaconda3/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/anaconda3/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

/anaconda3/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/anaconda3/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/anaconda3/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

/anaconda3/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

/anaconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    522             reduce = getattr(obj, "__reduce_ex__", None)
    523             if reduce is not None:
--> 524                 rv = reduce(self.proto)
    525             else:
    526                 reduce = getattr(obj, "__reduce__", None)

TypeError: can't pickle _thread._local objects

You appear to be mixing the APIs of dask.array and futures. The array y is a recipe of Dask tasks to be performed, not concrete values, so you should not try to ship it to workers like this.

What you wanted was:

(y + 1).compute()
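The point that y is a description of work rather than data can be illustrated with a toy graph. Dask graphs are plain dicts that map keys either to values or to task tuples whose first element is a callable; the `get` function below is a hypothetical, minimal recursive evaluator written only for illustration, not Dask's scheduler.

```python
# Toy illustration (not Dask's scheduler): each key maps either to a concrete
# value or to a task tuple (callable, *arguments); arguments that are keys of
# the graph refer to other entries.

def inc_all(values):
    """Add 1 to every element of a list."""
    return [v + 1 for v in values]

graph = {
    "x": [1, 2, 3],        # concrete data
    "y": (inc_all, "x"),   # a task that depends on "x"
    "total": (sum, "y"),   # a task that depends on "y"
}

def get(graph, key):
    """Evaluate `key` recursively; nothing is computed before this call."""
    value = graph[key]
    if isinstance(value, tuple) and value and callable(value[0]):
        func, *args = value
        # Replace arguments that name other graph entries with their results.
        resolved = [get(graph, a) if isinstance(a, str) and a in graph else a
                    for a in args]
        return func(*resolved)
    return value
```

Here graph["y"] is only the recipe; get(graph, "y") is what produces [2, 3, 4]. The Dask array y plays the same role, and .compute() is the step that turns it into values.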

However, h5py file objects are tricky at the best of times, so you are well advised to separate your reading and writing operations:

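import h5py
import numpy as np
import dask.array as da
arr = np.arange(100).reshape((10, 10))
with h5py.File('here.hdf', mode='w') as f:  # write once, then close
    f.create_dataset("MyDataset", data=arr)
f = h5py.File('here.hdf', mode='r')  # reopen read-only for Dask
y = da.from_array(f['MyDataset'], chunks='auto')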

Thank you for the quick response. I do want to use Client(); this was just a simplified version of how I am trying to use Dask. I am integrating it into some functions of an existing Python package. The package user defines _unit_computation() for each unit of an n-dimensional dataset, and the goal is to map that unit computation over every unit in the dataset and run it in parallel automatically, for ease of use. I prefer client.map() over writing a for-loop and then calling compute(), which did not work well with my program; Client has been the most useful option so far. Should I not turn the HDF5 dataset into a Dask array? If I shouldn't, what data structure should I use instead?

Right, so there are two issues here:

  1. h5py dataset objects aren't serializable, so your tasks will have to open and close the file each time. I think we actually special-case h5py objects in dask.distributed if you follow the standard channels (see the example below).
  2. As @martindurant suggests, you're mixing collections and futures in an odd way. Given what you say above, I'm guessing that you're looking for x.map_blocks(inc) rather than client.map(inc, y), which doesn't do what you think it does.
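Point 1 (open and close the file inside each task) can be sketched as follows. As an assumption, a plain text file stands in for the HDF5 file so the sketch runs with the standard library only; with h5py, the task would call h5py.File(path, 'r') and slice the dataset instead. The helper names are hypothetical.

```python
import itertools
import os
import pickle
import tempfile

# Stand-in for the HDF5 file: a plain text file (an assumption, so that the
# sketch needs no third-party packages). The pattern is the same with h5py.

def make_example_file(n=10):
    """Write the integers 0..n-1, one per line, to a temporary file."""
    fd, path = tempfile.mkstemp(suffix=".txt")
    with os.fdopen(fd, "w") as f:
        f.write("\n".join(str(i) for i in range(n)))
    return path

def process_block(path, start, stop):
    """Open the file inside the task, read rows [start, stop), add 1, close.

    Only (path, start, stop) need to travel to a worker, and those pickle fine.
    """
    with open(path) as f:
        rows = [int(line) for line in itertools.islice(f, start, stop)]
    return [r + 1 for r in rows]

def is_picklable(obj):
    """Return True if `obj` survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False
```

An open file handle fails is_picklable, while the (path, start, stop) tuple passes, which is why the task should receive the tuple and open the file itself.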

I hope that this example helps:

import h5py
import numpy as np
import dask.array as da
from dask.distributed import Client

client = Client()

# Write the data once and close the file again
with h5py.File('foo.h5', 'w') as f:
    dset = f.create_dataset('x', shape=(10, 10), dtype=float, chunks=(5, 5))
    dset[:] = 1

# Reopen read-only and wrap the dataset, reusing its HDF5 chunking
f = h5py.File('foo.h5', 'r')
dset = f['x']
x = da.from_array(dset, chunks=dset.chunks)

def inc(x):
    return x + 1

# Apply inc to every block; nothing runs until compute() is called
y = x.map_blocks(inc)

>>> y.compute()
array([[2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.]])

You may also want to look at the Xarray project, which uses Dask arrays extensively and has a lot of experience dealing with HDF5 files.

@emilyjcosta5 does the answer above help?

@mrocklin Unfortunately, I am still running into trouble with pickling. The data already exists and I need to write the results back to the file, so I open the main dataset like this:

import h5py

h5_f = h5py.File(h5_path, mode='r+')
h5_grp = h5_f['Measurement_000/Channel_000']
h5_main = h5_grp['Raw_Data']

Then I create the Dask array:

self.data = da.from_array(self.h5_main, chunks='auto')

Then I process the data:

client = Client(processes=False)
results = self.data.map_blocks(self._unit_computation, dtype=self.data.dtype, *args, **kwargs)
data = results.compute()
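Since the results need to go back into the file, one option is to compute block by block and write each result to the same slice it came from, which is what dask.array.store (and da.to_hdf5) do for you. The NumPy-only sketch below just illustrates that idea; block_slices and apply_blockwise are hypothetical helpers, and the assumption that an h5py dataset opened with mode='r+' could serve as the target is untested here.

```python
import itertools

import numpy as np

def block_slices(shape, chunks):
    """Return an iterator of slice tuples covering `shape` in blocks of `chunks`."""
    ranges = [
        [slice(start, min(start + step, dim)) for start in range(0, dim, step)]
        for dim, step in zip(shape, chunks)
    ]
    return itertools.product(*ranges)

def apply_blockwise(source, target, func, chunks):
    """Read each block of `source`, apply `func`, write it into `target`.

    Both arguments only need NumPy-style slicing, so they could also be
    h5py datasets (an assumption; not exercised here).
    """
    for index in block_slices(source.shape, chunks):
        target[index] = func(source[index])
    return target
```

This only matches applying func to the whole array when func is block-local, like inc. An FFT along axis 1 is not: if a chunk splits axis 1, each piece gets its own FFT. Chunks that span the full axis (e.g. chunks=(4, shape[1])) avoid that, and the same consideration applies to map_blocks with chunks='auto'.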

_unit_computation() is a signal filter applied to chunks of data. Simplified, it looks like this:

    def _unit_computation(self, chunk, *args, **kwargs):
        """
        Processing per chunk of the dataset
        """
        # FFT of every row (axis 1) of the chunk, zero frequency shifted to the centre
        chunk = np.fft.fftshift(np.fft.fft(chunk, axis=1), axes=1)
        return chunk

I don't believe _unit_computation() is the issue: when I make self.data a random Dask array, it works.
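Even if the body of _unit_computation is fine, passing the bound method self._unit_computation means pickle must also serialize self: the traceback shows cloudpickle's save_instancemethod saving (obj.__func__, obj.__self__), so any unpicklable attribute on the instance breaks the task. A common refactor is to move the per-chunk work into a module-level function that takes only the chunk (and plain parameters). unit_computation and SignalFilterSketch below are hypothetical names for illustration.

```python
import numpy as np

def unit_computation(chunk):
    """FFT along axis 1 with the zero frequency shifted to the centre.

    A module-level function does not carry any instance state with it,
    unlike a bound method, which drags its `self` along when pickled.
    """
    return np.fft.fftshift(np.fft.fft(chunk, axis=1), axes=1)

class SignalFilterSketch:
    """Hypothetical stand-in for the SignalFilter class in this thread."""

    def __init__(self, data):
        # Everything stored on self travels with self._unit_computation.
        self.data = data

    def _unit_computation(self, chunk):
        # Same result, but sending this bound method also sends `self`.
        return unit_computation(chunk)
```

With this split, the call would become something like self.data.map_blocks(unit_computation, dtype=np.complex128). Note also that the FFT returns complex values, so dtype=self.data.dtype is likely misleading metadata for real-valued input.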

Try things in read-only mode.

I did, and I get the same error:

TypeError Traceback (most recent call last)
/anaconda3/lib/python3.6/site-packages/distributed/worker.py in dumps_function(func)
3039 try:
-> 3040 result = cache[func]
3041 except KeyError:

/anaconda3/lib/python3.6/site-packages/zict/lru.py in __getitem__(self, key)
47 def __getitem__(self, key):
---> 48 result = self.d[key]
49 self.i += 1

TypeError: unhashable type: 'SubgraphCallable'

During handling of the above exception, another exception occurred:

TypeError Traceback (most recent call last)
/anaconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
39 try:
---> 40 result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
41 if len(result) < 1000:

TypeError: can't pickle _thread._local objects

During handling of the above exception, another exception occurred:

TypeError Traceback (most recent call last)
in
----> 1 h5_filt_grp = sig_filt.compute(override=True)

~/Downloads/daskUSID/daskUSID/signal_filter/dask_process.py in compute(self, override, *args, **kwargs)
415 data = self.data
416 results = data.map_blocks(self._unit_computation, dtype=data.dtype, *args, **kwargs)
--> 417 data = results.compute()
418 self.data = data
419 #self._write_results_chunk()

/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158

/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.dask_keys() for x in collections]
397 postcomputes = [x.dask_postcompute() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400

/anaconda3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2555 retries=retries,
2556 user_priority=priority,
-> 2557 actors=actors,
2558 )
2559 packed = pack_data(keys, futures)

/anaconda3/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
2482 {
2483 "op": "update-graph",
-> 2484 "tasks": valmap(dumps_task, dsk3),
2485 "dependencies": dependencies,
2486 "keys": list(flatkeys),

/anaconda3/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/anaconda3/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/anaconda3/lib/python3.6/site-packages/distributed/worker.py in dumps_task(task)
3075 return d
3076 elif not any(map(_maybe_complex, task[1:])):
-> 3077 return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
3078 return to_serialize(task)
3079

/anaconda3/lib/python3.6/site-packages/distributed/worker.py in dumps_function(func)
3044 cache[func] = result
3045 except TypeError:
-> 3046 result = pickle.dumps(func)
3047 return result
3048

/anaconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
51 except Exception:
52 try:
---> 53 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
54 except Exception as e:
55 logger.info("Failed to serialize %s. Exception: %s", x, e)

/anaconda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
1095 try:
1096 cp = CloudPickler(file, protocol=protocol)
-> 1097 cp.dump(obj)
1098 return file.getvalue()
1099 finally:

/anaconda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
355 self.inject_addons()
356 try:
--> 357 return Pickler.dump(self, obj)
358 except RuntimeError as e:
359 if 'recursion' in e.args[0]:

/anaconda3/lib/python3.6/pickle.py in dump(self, obj)
407 if self.proto >= 4:
408 self.framer.start_framing()
--> 409 self.save(obj)
410 self.write(STOP)
411 self.framer.end_framing()

/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):

/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
608 else:
609 save(func)
--> 610 save(args)
611 write(REDUCE)
612

/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478

/anaconda3/lib/python3.6/pickle.py in save_tuple(self, obj)
749 write(MARK)
750 for element in obj:
--> 751 save(element)
752
753 if id(obj) in memo:

/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478

/anaconda3/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict

/anaconda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
850 k, v = tmp[0]
851 save(k)
--> 852 save(v)
853 write(SETITEM)
854 # else tmp is empty, and we're done

/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478

/anaconda3/lib/python3.6/pickle.py in save_tuple(self, obj)
734 if n <= 3 and self.proto >= 2:
735 for element in obj:
--> 736 save(element)
737 # Subtle. Same as in the big comment below.
738 if id(obj) in memo:

/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478

/anaconda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_instancemethod(self, obj)
861 else:
862 if PY3: # pragma: no branch
--> 863 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
864 else:
865 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),

/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
608 else:
609 save(func)
--> 610 save(args)
611 write(REDUCE)
612

/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478

/anaconda3/lib/python3.6/pickle.py in save_tuple(self, obj)
734 if n <= 3 and self.proto >= 2:
735 for element in obj:
--> 736 save(element)
737 # Subtle. Same as in the big comment below.
738 if id(obj) in memo:

/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):

/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636

/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478

/anaconda3/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict

/anaconda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:

/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):

/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636

/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478

/anaconda3/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict

/anaconda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:

/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
494 reduce = getattr(obj, "__reduce_ex__", None)
495 if reduce is not None:
--> 496 rv = reduce(self.proto)
497 else:
498 reduce = getattr(obj, "__reduce__", None)

TypeError: can't pickle _thread._local objects

[Screenshot, 2019-07-01 at 3:46 PM]
This seems to work. However, when I call the _unit_computation() method of the SignalFilter class, I get the pickling error.
[Screenshot, 2019-07-01 at 3:48 PM]

I'm sorry, I don't know what sig_filt is. You have something that isn't serializable. I'm not sure what else I can do to help. Good luck!

Okay, thank you.

I still recommend the following approach: #2787 (comment)

Interesting. I don't know, then. Perhaps work from my example (verify that it works first) and then change it step by step towards yours until something breaks? Maybe that will help narrow things down.
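One way to narrow things down when a bound method won't serialize is to test each attribute of its `__self__` separately. The helper below is a hypothetical sketch using the standard pickle module; Dask falls back to cloudpickle, which handles more (e.g. lambdas), so this may flag a few extra attributes, but an object such as a threading.local fails under both.

```python
import pickle
import threading

def find_unpicklable_attributes(obj):
    """Return the names of attributes of `obj` whose values fail to pickle."""
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
        except Exception:  # TypeError, PicklingError, AttributeError, ...
            bad.append(name)
    return bad

if __name__ == "__main__":
    class Example:
        def __init__(self):
            self.values = [1, 2, 3]
            self.state = threading.local()  # same type as in the traceback

        def unit(self, chunk):
            return chunk

    # A bound method pickles its __self__, so inspect that object.
    print(find_unpicklable_attributes(Example().unit.__self__))  # ['state']
```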

In general, lots of things can make a function unserializable. Depending on an open file handle, as in your example, is certainly one likely cause. I think that making h5py objects serializable is outside the scope of Dask. There are a variety of workarounds, such as the example I showed, opening and closing the file in every task, using a format that is friendlier to serialization, like Zarr, or using a project like Xarray. I'm going to go ahead and close this now. Again, good luck!