Complex arrow columns cause an error during the reducing stage
Opened this issue · 1 comment
Bug report
I implemented a custom reader for HSC PDR2 data, which converts a FITS boolean-array column to pa.list_(pa.bool_(), 274).
The reducing stage then fails with an error from the pyarrow-pandas conversion layer, originating in hipscat_import/catalog/map_reduce.py:287, in reduce_pixel_shards(), where the merged pyarrow table is converted to pandas.
I tried to reproduce the problem with a simple pyarrow table containing a fixed-size list column, but this code works fine:
import pandas as pd
import pyarrow as pa

# Fixed-size list column, analogous to the 274-element flags column
a = pa.array([1, 2, 3])
b = pa.array([[True, False, True]] * 3, type=pa.list_(pa.bool_(), 3))
table = pa.Table.from_arrays([a, b], names=['a', 'b'])
df = table.to_pandas(types_mapper=pd.ArrowDtype)
Note the types_mapper argument here: the conversion works even without it, but then produces object dtype columns. I believe we should use it in hipscat-import as well, though that may be a separate issue. I also tried modifying reduce_pixel_shards() to pass this argument, but it raises the same error.
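Judging by the traceback, what my simple repro above is missing is probably the pandas metadata that the parquet shards carry: to_pandas() tries to reconstruct the dtype from the stored string, and the parametrized fixed_size_list string cannot be parsed back. Here is a hedged sketch of what I believe the actual failure path is (the round trip through pandas metadata is my assumption, not verified against the shard files):

import pandas as pd
import pyarrow as pa

# Build a DataFrame with an ArrowDtype fixed-size-list column,
# as astropy_table_to_df() does for the 'flags' column.
dtype = pd.ArrowDtype(pa.list_(pa.bool_(), 3))
df = pd.DataFrame({'flags': pd.Series([[True, False, True]] * 3, dtype=dtype)})

# from_pandas() embeds pandas metadata, including the dtype string
# 'fixed_size_list<item: bool>[3][pyarrow]', into the schema.
table = pa.Table.from_pandas(df)

# to_pandas() tries to rebuild the dtype from that string; because the
# string carries the [3] size parameter, it seems to raise the same
# NotImplementedError as in the traceback below.
table.to_pandas()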
Details are in the spoilers below:
Reader implementation
import numpy as np
import pandas as pd
import pyarrow as pa
from astropy.table import Table

from hipscat_import.catalog.file_readers import FitsReader


class HSCFitsReader(FitsReader):
    """FITS reader that converts ra and dec from radians to degrees"""

    def __init__(self, *args, ra_column, dec_column, **kwargs):
        super().__init__(*args, **kwargs)
        self.ra_column = ra_column
        self.dec_column = dec_column

    def read(self, input_file, read_columns=None):
        # Mostly copy-pasted from the hipscat-import implementation
        self.regular_file_exists(input_file, **self.kwargs)
        table = Table.read(input_file, memmap=True, **self.kwargs)
        if read_columns:
            table.keep_columns(read_columns)
        elif self.column_names:
            table.keep_columns(self.column_names)
        elif self.skip_column_names:
            table.remove_columns(self.skip_column_names)

        total_rows = len(table)
        read_rows = 0
        while read_rows < total_rows:
            chunk = table[read_rows : read_rows + self.chunksize]
            df_chunk = self.astropy_table_to_df(chunk)
            yield df_chunk
            read_rows += self.chunksize

    def astropy_table_to_df(self, table):
        """Data conversion; mutates the input table"""
        # Flag columns won't convert to pandas due to limitations of
        # astropy's implementation, so we convert them manually and
        # remove them from the table first.
        flags_pyarrow = None
        if 'flags' in table.columns:
            flags = table['flags']
            flags_length = flags.shape[1]
            flags_pyarrow = pa.array(flags.tolist(), type=pa.list_(pa.bool_(), flags_length))
            table.remove_column('flags')
        df = table.to_pandas()
        # Convert coordinates from radians to degrees
        df[self.ra_column] = np.degrees(df[self.ra_column])
        df[self.dec_column] = np.degrees(df[self.dec_column])
        # Assign the manually converted flags column
        if flags_pyarrow is not None:
            df['flags'] = pd.Series(flags_pyarrow, dtype=pd.ArrowDtype(flags_pyarrow.type))
        return df
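For context, this is roughly how the reader is plugged into the pipeline. The paths and column names here are placeholders, not my real configuration, so treat this as a sketch:

from dask.distributed import Client

from hipscat_import.catalog.arguments import ImportArguments
from hipscat_import.pipeline import pipeline_with_client

# Placeholder paths and column names; the real run uses the HSC PDR2 files.
args = ImportArguments(
    output_artifact_name="hsc_pdr2",
    input_file_list=["/path/to/hsc_pdr2.fits"],
    file_reader=HSCFitsReader(ra_column="ra", dec_column="dec"),
    ra_column="ra",
    dec_column="dec",
    output_path="/path/to/catalogs",
)

with Client(n_workers=64) as client:
    pipeline_with_client(args, client)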
Traceback
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/types.pxi:5284, in pyarrow.lib.type_for_alias()
5283 try:
-> 5284 alias = _type_aliases[name]
5285 except KeyError:
KeyError: 'fixed_size_list<item: bool>[274]'
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/dtypes.py:2251, in construct_from_string()
2250 try:
-> 2251 pa_dtype = pa.type_for_alias(base_type)
2252 except ValueError as err:
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/types.pxi:5286, in pyarrow.lib.type_for_alias()
5285 except KeyError:
-> 5286 raise ValueError('No type alias for {0}'.format(name))
5287
ValueError: No type alias for fixed_size_list<item: bool>[274]
The above exception was the direct cause of the following exception:
NotImplementedError Traceback (most recent call last)
Cell In[3], line 29
25 with Client(n_workers=64) as client:
26 # import dask
27 # with dask.config.set(scheduler='single-threaded'), Client(processes=False, threads_per_worker=1, n_workers=1) as client:
28 display(client)
---> 29 pipeline_with_client(args, client)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline.py:60, in pipeline_with_client(args, client)
58 if args.completion_email_address:
59 _send_failure_email(args, exception)
---> 60 raise exception
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline.py:44, in pipeline_with_client(args, client)
41 raise ValueError("args is required and should be subclass of RuntimeArguments")
43 if isinstance(args, ImportArguments):
---> 44 catalog_runner.run(args, client)
45 elif isinstance(args, IndexArguments):
46 index_runner.run(args, client)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/run_import.py:111, in run(args, client)
85 for (
86 destination_pixel,
87 source_pixel_count,
88 destination_pixel_key,
89 ) in args.resume_plan.get_reduce_items():
90 futures.append(
91 client.submit(
92 mr.reduce_pixel_shards,
(...)
108 )
109 )
--> 111 args.resume_plan.wait_for_reducing(futures)
113 # All done - write out the metadata
114 with args.resume_plan.print_progress(total=5, stage_name="Finishing") as step_progress:
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/resume_plan.py:298, in ResumePlan.wait_for_reducing(self, futures)
296 def wait_for_reducing(self, futures):
297 """Wait for reducing futures to complete."""
--> 298 self.wait_for_futures(futures, self.REDUCING_STAGE, fail_fast=True)
299 remaining_reduce_items = self.get_reduce_items()
300 if len(remaining_reduce_items) > 0:
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline_resume_plan.py:143, in PipelineResumePlan.wait_for_futures(self, futures, stage_name, fail_fast)
141 some_error = True
142 if fail_fast:
--> 143 raise future.exception()
145 if some_error:
146 raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/distributed/worker.py:3000, in apply_function_simple()
2995 with (
2996 context_meter.meter("thread-noncpu", func=time) as m,
2997 context_meter.meter("thread-cpu", func=thread_time),
2998 ):
2999 try:
-> 3000 result = function(*args, **kwargs)
3001 except (SystemExit, KeyboardInterrupt):
3002 # Special-case these, just like asyncio does all over the place. They will
3003 # pass through `fail_hard` and `_handle_stimulus_from_task`, and eventually
(...)
3006 # Any other `BaseException` types would ultimately be ignored by asyncio if
3007 # raised here, after messing up the worker state machine along their way.
3008 raise
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/map_reduce.py:331, in reduce_pixel_shards()
326 except Exception as exception: # pylint: disable=broad-exception-caught
327 print_task_failure(
328 f"Failed REDUCING stage for shard: {destination_pixel_order} {destination_pixel_number}",
329 exception,
330 )
--> 331 raise exception
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/map_reduce.py:287, in reduce_pixel_shards()
280 if rows_written != destination_pixel_size:
281 raise ValueError(
282 "Unexpected number of objects at pixel "
283 f"({healpix_pixel})."
284 f" Expected {destination_pixel_size}, wrote {rows_written}"
285 )
--> 287 dataframe = merged_table.to_pandas()
288 if sort_columns:
289 dataframe = dataframe.sort_values(sort_columns.split(","))
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/array.pxi:885, in pyarrow.lib._PandasConvertible.to_pandas()
883 coerce_temporal_nanoseconds=coerce_temporal_nanoseconds
884 )
--> 885 return self._to_pandas(options, categories=categories,
886 ignore_metadata=ignore_metadata,
887 types_mapper=types_mapper)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/table.pxi:5002, in pyarrow.lib.Table._to_pandas()
5000 types_mapper=None):
5001 from pyarrow.pandas_compat import table_to_dataframe
-> 5002 df = table_to_dataframe(
5003 options, self, categories,
5004 ignore_metadata=ignore_metadata,
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas_compat.py:774, in table_to_dataframe()
771 table = _add_any_metadata(table, pandas_metadata)
772 table, index = _reconstruct_index(table, index_descriptors,
773 all_columns, types_mapper)
--> 774 ext_columns_dtypes = _get_extension_dtypes(
775 table, all_columns, types_mapper)
776 else:
777 index = _pandas_api.pd.RangeIndex(table.num_rows)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas_compat.py:853, in _get_extension_dtypes()
848 dtype = col_meta['numpy_type']
850 if dtype not in _pandas_supported_numpy_types:
851 # pandas_dtype is expensive, so avoid doing this for types
852 # that are certainly numpy dtypes
--> 853 pandas_dtype = _pandas_api.pandas_dtype(dtype)
854 if isinstance(pandas_dtype, _pandas_api.extension_dtype):
855 if hasattr(pandas_dtype, "__from_arrow__"):
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas-shim.pxi:148, in pyarrow.lib._PandasAPIShim.pandas_dtype()
146 return self._pd.lib.infer_dtype(obj)
147
--> 148 cpdef pandas_dtype(self, dtype):
149 self._check_import()
150 try:
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas-shim.pxi:151, in pyarrow.lib._PandasAPIShim.pandas_dtype()
149 self._check_import()
150 try:
--> 151 return self._types_api.pandas_dtype(dtype)
152 except AttributeError:
153 return None
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/common.py:1624, in pandas_dtype()
1621 return dtype
1623 # registered extension types
-> 1624 result = registry.find(dtype)
1625 if result is not None:
1626 if isinstance(result, type):
1627 # GH 31356, GH 54592
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/base.py:576, in find()
574 for dtype_type in self.dtypes:
575 try:
--> 576 return dtype_type.construct_from_string(dtype)
577 except TypeError:
578 pass
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/dtypes.py:2262, in construct_from_string()
2258 except (NotImplementedError, ValueError):
2259 # Fall through to raise with nice exception message below
2260 pass
-> 2262 raise NotImplementedError(
2263 "Passing pyarrow type specific parameters "
2264 f"({has_parameters.group()}) in the string is not supported. "
2265 "Please construct an ArrowDtype object with a pyarrow_dtype "
2266 "instance with specific parameters."
2267 ) from err
2268 raise TypeError(f"'{base_type}' is not a valid pyarrow data type.") from err
2269 return cls(pa_dtype)
NotImplementedError: Passing pyarrow type specific parameters ([274]) in the string is not supported. Please construct an ArrowDtype object with a pyarrow_dtype instance with specific parameters.
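A workaround that seems to sidestep the failing dtype-string parse, at least as I read the traceback, is to drop the embedded pandas metadata before converting and rely on types_mapper instead. I have not tested this against the full pipeline (it may affect index reconstruction), so it is only a sketch:

import pandas as pd

# reduce_pixel_shards() currently does `merged_table.to_pandas()`.
# Dropping the schema-level pandas metadata skips the failing
# reconstruction of 'fixed_size_list<item: bool>[274]' from a string,
# and types_mapper builds the ArrowDtype from the pyarrow type instead.
dataframe = merged_table.replace_schema_metadata(None).to_pandas(
    types_mapper=pd.ArrowDtype
)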
Before submitting
Please check the following:
- I have described the situation in which the bug arose, including what code was executed, information about my environment, and any applicable data others will need to reproduce the problem.
- I have included available evidence of the unexpected behavior (including error messages, screenshots, and/or plots) as well as a description of what I expected instead.
- If I have a solution in mind, I have provided an explanation and/or pseudocode and/or task list.
Filed pandas-dev/pandas#59738