astronomy-commons/hats-import

Complex arrow columns cause an error during the reducing stage

Opened this issue · 1 comments

Bug report

I implemented a custom reader for HSC PDR2 data, which converts a FITS bool-array to pa.list_(pa.bool_(), 274).

On the reducing stage this ends with an error from pyarrow-pandas conversion layer, originated from hipscat_import/catalog/map_reduce.py:287, in reduce_pixel_shards(), where we convert merged pyarrow table to pandas.

I tried to reproduce this problem with a simple pyarrow table including fixed-list column, but this code does work:

import pandas as pd
import pyarrow as pa

a = pa.array([1, 2, 3])
b = pa.array([[True, False, True]] * 3, type=pa.list_(pa.bool_(), 3))
table = pa.Table.from_arrays([a, b], names=['a', 'b'])
df = table.to_pandas(types_mapper=pd.ArrowDtype)

Note this types_mapper argument here, it works even without this, but produces object dtype, I believe that we also should use it, but it may be another issue. I also tried to modify reduce_pixel_shards() to use this argument, but it produces the same error.

Details are under the spoilers:

Reader implementation
import numpy as np
import pandas as pd
import pyarrow as pa
from astropy.table import Table
from hipscat_import.catalog.file_readers import FitsReader


class HSCFitsReader(FitsReader):
    """FITS reader that converts ra and dec from radians to degrees"""
    def __init__(self, *args, ra_column, dec_column, **kwargs):
        super().__init__(*args, **kwargs)
        self.ra_column = ra_column
        self.dec_column = dec_column

    def read(self, input_file, read_columns=None):
        # Mostly copy-pasted from the hipscat-import implementation
        self.regular_file_exists(input_file, **self.kwargs)
        table = Table.read(input_file, memmap=True, **self.kwargs)
        if read_columns:
            table.keep_columns(read_columns)
        elif self.column_names:
            table.keep_columns(self.column_names)
        elif self.skip_column_names:
            table.remove_columns(self.skip_column_names)

        total_rows = len(table)
        read_rows = 0

        while read_rows < total_rows:
            chunk = table[read_rows : read_rows + self.chunksize]
            df_chunk = self.astropy_table_to_df(chunk)
            yield df_chunk

            read_rows += self.chunksize

    def astropy_table_to_df(self, table):
        """Data convertion, spoils input table"""
        # Flags wouldn't convert to pandas due to astropy's implementation limitations
        # So we convert them manually and delete from the table
        if 'flags' in table.columns:
            flags = table['flags']
            flags_length = flags.shape[1]
            flags_pyarrow = pa.array(flags.tolist(), type=pa.list_(pa.bool_(), flags_length))

            table.remove_column('flags')

        df = table.to_pandas()

        # Convert coords from radians to degrees
        df[self.ra_column] = np.degrees(df[self.ra_column])
        df[self.dec_column] = np.degrees(df[self.dec_column])

        # Assign flags
        if 'flags' in table.columns:
            df['flags'] = pd.Series(flags_pyarrow, dtype=pd.ArrowDtype(flags_pyarrow.type))
            
        return df
Traceback
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/types.pxi:5284, in pyarrow.lib.type_for_alias()
   5283 try:
-> 5284     alias = _type_aliases[name]
   5285 except KeyError:

KeyError: 'fixed_size_list<item: bool>[274]'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/dtypes.py:2251, in construct_from_string()
   2250 try:
-> 2251     pa_dtype = pa.type_for_alias(base_type)
   2252 except ValueError as err:

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/types.pxi:5286, in pyarrow.lib.type_for_alias()
   5285 except KeyError:
-> 5286     raise ValueError('No type alias for {0}'.format(name))
   5287 

ValueError: No type alias for fixed_size_list<item: bool>[274]

The above exception was the direct cause of the following exception:

NotImplementedError                       Traceback (most recent call last)
Cell In[3], line 29
     25 with Client(n_workers=64) as client:
     26 # import dask
     27 # with dask.config.set(scheduler='single-threaded'), Client(processes=False, threads_per_worker=1, n_workers=1) as client:
     28     display(client)
---> 29     pipeline_with_client(args, client)

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline.py:60, in pipeline_with_client(args, client)
     58 if args.completion_email_address:
     59     _send_failure_email(args, exception)
---> 60 raise exception

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline.py:44, in pipeline_with_client(args, client)
     41     raise ValueError("args is required and should be subclass of RuntimeArguments")
     43 if isinstance(args, ImportArguments):
---> 44     catalog_runner.run(args, client)
     45 elif isinstance(args, IndexArguments):
     46     index_runner.run(args, client)

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/run_import.py:111, in run(args, client)
     85         for (
     86             destination_pixel,
     87             source_pixel_count,
     88             destination_pixel_key,
     89         ) in args.resume_plan.get_reduce_items():
     90             futures.append(
     91                 client.submit(
     92                     mr.reduce_pixel_shards,
   (...)
    108                 )
    109             )
--> 111         args.resume_plan.wait_for_reducing(futures)
    113 # All done - write out the metadata
    114 with args.resume_plan.print_progress(total=5, stage_name="Finishing") as step_progress:

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/resume_plan.py:298, in ResumePlan.wait_for_reducing(self, futures)
    296 def wait_for_reducing(self, futures):
    297     """Wait for reducing futures to complete."""
--> 298     self.wait_for_futures(futures, self.REDUCING_STAGE, fail_fast=True)
    299     remaining_reduce_items = self.get_reduce_items()
    300     if len(remaining_reduce_items) > 0:

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/pipeline_resume_plan.py:143, in PipelineResumePlan.wait_for_futures(self, futures, stage_name, fail_fast)
    141         some_error = True
    142         if fail_fast:
--> 143             raise future.exception()
    145 if some_error:
    146     raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.")

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/distributed/worker.py:3000, in apply_function_simple()
   2995 with (
   2996     context_meter.meter("thread-noncpu", func=time) as m,
   2997     context_meter.meter("thread-cpu", func=thread_time),
   2998 ):
   2999     try:
-> 3000         result = function(*args, **kwargs)
   3001     except (SystemExit, KeyboardInterrupt):
   3002         # Special-case these, just like asyncio does all over the place. They will
   3003         # pass through `fail_hard` and `_handle_stimulus_from_task`, and eventually
   (...)
   3006         # Any other `BaseException` types would ultimately be ignored by asyncio if
   3007         # raised here, after messing up the worker state machine along their way.
   3008         raise

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/map_reduce.py:331, in reduce_pixel_shards()
    326 except Exception as exception:  # pylint: disable=broad-exception-caught
    327     print_task_failure(
    328         f"Failed REDUCING stage for shard: {destination_pixel_order} {destination_pixel_number}",
    329         exception,
    330     )
--> 331     raise exception

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/hipscat_import/catalog/map_reduce.py:287, in reduce_pixel_shards()
    280 if rows_written != destination_pixel_size:
    281     raise ValueError(
    282         "Unexpected number of objects at pixel "
    283         f"({healpix_pixel})."
    284         f" Expected {destination_pixel_size}, wrote {rows_written}"
    285     )
--> 287 dataframe = merged_table.to_pandas()
    288 if sort_columns:
    289     dataframe = dataframe.sort_values(sort_columns.split(","))

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/array.pxi:885, in pyarrow.lib._PandasConvertible.to_pandas()
    883     coerce_temporal_nanoseconds=coerce_temporal_nanoseconds
    884 )
--> 885 return self._to_pandas(options, categories=categories,
    886                        ignore_metadata=ignore_metadata,
    887                        types_mapper=types_mapper)

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/table.pxi:5002, in pyarrow.lib.Table._to_pandas()
   5000            types_mapper=None):
   5001 from pyarrow.pandas_compat import table_to_dataframe
-> 5002 df = table_to_dataframe(
   5003     options, self, categories,
   5004     ignore_metadata=ignore_metadata,

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas_compat.py:774, in table_to_dataframe()
    771     table = _add_any_metadata(table, pandas_metadata)
    772     table, index = _reconstruct_index(table, index_descriptors,
    773                                       all_columns, types_mapper)
--> 774     ext_columns_dtypes = _get_extension_dtypes(
    775         table, all_columns, types_mapper)
    776 else:
    777     index = _pandas_api.pd.RangeIndex(table.num_rows)

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas_compat.py:853, in _get_extension_dtypes()
    848 dtype = col_meta['numpy_type']
    850 if dtype not in _pandas_supported_numpy_types:
    851     # pandas_dtype is expensive, so avoid doing this for types
    852     # that are certainly numpy dtypes
--> 853     pandas_dtype = _pandas_api.pandas_dtype(dtype)
    854     if isinstance(pandas_dtype, _pandas_api.extension_dtype):
    855         if hasattr(pandas_dtype, "__from_arrow__"):

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas-shim.pxi:148, in pyarrow.lib._PandasAPIShim.pandas_dtype()
    146         return self._pd.lib.infer_dtype(obj)
    147 
--> 148 cpdef pandas_dtype(self, dtype):
    149     self._check_import()
    150     try:

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pyarrow/pandas-shim.pxi:151, in pyarrow.lib._PandasAPIShim.pandas_dtype()
    149 self._check_import()
    150 try:
--> 151     return self._types_api.pandas_dtype(dtype)
    152 except AttributeError:
    153     return None

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/common.py:1624, in pandas_dtype()
   1621     return dtype
   1623 # registered extension types
-> 1624 result = registry.find(dtype)
   1625 if result is not None:
   1626     if isinstance(result, type):
   1627         # GH 31356, GH 54592

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/base.py:576, in find()
    574 for dtype_type in self.dtypes:
    575     try:
--> 576         return dtype_type.construct_from_string(dtype)
    577     except TypeError:
    578         pass

File /ocean/projects/phy210048p/shared/hipscat/raw/hsc/pdr3/cenv/lib/python3.11/site-packages/pandas/core/dtypes/dtypes.py:2262, in construct_from_string()
   2258         except (NotImplementedError, ValueError):
   2259             # Fall through to raise with nice exception message below
   2260             pass
-> 2262         raise NotImplementedError(
   2263             "Passing pyarrow type specific parameters "
   2264             f"({has_parameters.group()}) in the string is not supported. "
   2265             "Please construct an ArrowDtype object with a pyarrow_dtype "
   2266             "instance with specific parameters."
   2267         ) from err
   2268     raise TypeError(f"'{base_type}' is not a valid pyarrow data type.") from err
   2269 return cls(pa_dtype)

NotImplementedError: Passing pyarrow type specific parameters ([274]) in the string is not supported. Please construct an ArrowDtype object with a pyarrow_dtype instance with specific parameters.

Before submitting
Please check the following:

  • I have described the situation in which the bug arose, including what code was executed, information about my environment, and any applicable data others will need to reproduce the problem.
  • I have included available evidence of the unexpected behavior (including error messages, screenshots, and/or plots) as well as a descriprion of what I expected instead.
  • If I have a solution in mind, I have provided an explanation and/or pseudocode and/or task list.