In Spark mode, a map transformation incurs a swap on the underlying distributed array.
zuxfoucault opened this issue · 1 comment
zuxfoucault commented
While executing the line examples = series.filter(lambda x: x.std() > 6).normalize().sample(100).toarray()
in the Thunder project's basic tutorial, the following error was raised:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-7-d01291816368> in <module>()
----> 1 examples = series.filter(lambda x: x.std() > 6).normalize().sample(100).toarray()
/raid/opt/big/anaconda/python2/lib/python2.7/site-packages/thunder/series/series.pyc in normalize(self, method, window, perc, offset)
1079 return (y - b) / (b + offset)
1080
-> 1081 return self.map(get)
1082
1083 def toimages(self, size='150'):
/raid/opt/big/anaconda/python2/lib/python2.7/site-packages/thunder/series/series.pyc in map(self, func, index, value_shape, dtype, with_keys)
191 if isinstance(value_shape, int):
192 values_shape = (value_shape, )
--> 193 new = super(Series, self).map(func, value_shape=value_shape, dtype=dtype, with_keys=with_keys)
194
195 if index is not None:
/raid/opt/big/anaconda/python2/lib/python2.7/site-packages/thunder/base.pyc in map(self, func, value_shape, dtype, with_keys)
466 if self.mode == 'spark':
467 expand = lambda x: array(func(x), ndmin=1)
--> 468 mapped = self.values.map(expand, axis, value_shape, dtype, with_keys)
469 return self._constructor(mapped, mode=self.mode).__finalize__(self, noprop=('index',))
470
/raid/opt/big/anaconda/python2/lib/python2.7/site-packages/bolt/spark/array.pyc in map(self, func, axis, value_shape, dtype, with_keys)
153 """
154 axis = tupleize(axis)
--> 155 swapped = self._align(axis)
156
157 if with_keys:
/raid/opt/big/anaconda/python2/lib/python2.7/site-packages/bolt/spark/array.pyc in _align(self, axis)
111
112 if tokeys or tovalues:
--> 113 return self.swap(tovalues, tokeys)
114 else:
115 return self
/raid/opt/big/anaconda/python2/lib/python2.7/site-packages/bolt/spark/array.pyc in swap(self, kaxes, vaxes, size)
748
749 if len(kaxes) == self.keys.ndim and len(vaxes) == 0:
--> 750 raise ValueError('Cannot perform a swap that would '
751 'end up with all data on a single key')
752
ValueError: Cannot perform a swap that would end up with all data on a single key
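For context on what the final frame is checking: in Bolt, a map over an axis requires that axis to live in the RDD keys, so _align may call swap to move axes between keys and values; the ValueError fires when such a swap would move every key axis into the values without promoting any value axis, collapsing all records onto a single key. A minimal, Spark-free sketch of just that bookkeeping (the function name and tuple-of-axes signature here are a simplification; the real BoltArraySpark.swap also reshuffles the underlying RDD and takes a size argument):

```python
def swap(key_axes, value_axes, kaxes, vaxes):
    """Move `kaxes` (currently key axes) into the values and `vaxes`
    (currently value axes) into the keys.

    Simplified sketch of the guard in bolt.spark.array.BoltArraySpark.swap:
    axes are modeled as plain lists of integers, with no actual data.
    """
    if len(kaxes) == len(key_axes) and len(vaxes) == 0:
        # Every key axis would become a value axis with nothing replacing
        # it, so every record would land on one (empty) key.
        raise ValueError('Cannot perform a swap that would '
                         'end up with all data on a single key')
    new_keys = [a for a in key_axes if a not in kaxes] + list(vaxes)
    new_values = list(kaxes) + [a for a in value_axes if a not in vaxes]
    return new_keys, new_values
```

In the traceback above, the guard trips because the Series has a single key axis and _align requested a swap of that one key axis with no value axes in exchange.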
jwittenbach commented
@zuxfoucault thanks for posting this! I did some digging, and it turns out the bug was upstream in the Bolt package. I made the fix there: bolt-project/bolt#108.