googleapis/python-bigquery

feature_request(BREAKING_CHANGE): automatically set `parquet_options.enable_list_inference = True` when the source format is parquet


Environment details

  • OS type and version: Ubuntu 20.04 (WSL)
  • Python version: 3.11.8
  • pip version: 24.2
  • google-cloud-bigquery version: 3.25.0

Steps to reproduce

  1. Create a table that has a single field with mode=REPEATED
  2. Attempt to append to the table using client.load_table_from_file with a parquet file written from memory to a BytesIO buffer
    - If no schema is provided to bigquery.LoadJobConfig, the operation fails
    - If the table schema is provided to bigquery.LoadJobConfig, the operation does not raise, but instead incorrectly inserts empty arrays into the table

Issue details

I am unable to use client.load_table_from_file with a parquet file to append to an existing table with a REPEATED field.

This issue is somewhat similar to #1981, except related to REPEATED fields rather than REQUIRED fields.

Code example

Apologies in advance that the example is a bit long.

It demonstrates that Parquet files written to BytesIO buffers from both Polars and PyArrow cannot be loaded into a BigQuery table with a mode=REPEATED field.

Code example
from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import bigquery

import polars as pl

PROJECT = "<project>"


def create_and_return_table(table_name: str, client: bigquery.Client) -> bigquery.Table:
    schema = [bigquery.SchemaField("foo", "INTEGER", mode="REPEATED")]
    table = bigquery.Table(f"{PROJECT}.testing.{table_name}", schema=schema)

    client.delete_table(table, not_found_ok=True)
    return client.create_table(table)


def polars_way(table: bigquery.Table, client: bigquery.Client):
    df = pl.DataFrame({"foo": [[1, 2], [3, 4]]})

    with BytesIO() as stream:
        df.write_parquet(stream)

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )

        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )

    job.result()


def pyarrow_way(table: bigquery.Table, client: bigquery.Client):
    pyarrow_schema = pa.schema([pa.field("foo", pa.large_list(pa.int64()))])
    pyarrow_table = pa.Table.from_pydict(
        {"foo": [[1, 2], [3, 4]]}, schema=pyarrow_schema
    )

    with BytesIO() as stream:
        writer = pq.ParquetWriter(stream, pyarrow_schema)
        writer.write(pyarrow_table)
        writer.close()

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )
        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )

    job.result()


def main():
    client = bigquery.Client()
    table = create_and_return_table("test_pl", client)
    polars_way(table, client)
    table = create_and_return_table("test_pa", client)
    pyarrow_way(table, client)

    # Both "ways" raise the below error

    # google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table
    # project:dataset.table. Field foo has changed type from INTEGER to RECORD; reason:
    # invalid, message: Provided Schema does not match Table project:dataset.table. Field
    # foo has changed type from INTEGER to RECORD

    # Unless the table schema is provided, in which case the operation succeeds, but the
    # data is inserted as empty arrays


if __name__ == "__main__":
    main()

Stack trace

Both polars_way and pyarrow_way raise the error. Here they both are.

# polars_way
Traceback (most recent call last):
  File "/home/henry/development/polars_bq/combined.py", line 93, in <module>
    main()
  File "/home/henry/development/polars_bq/combined.py", line 77, in main
    polars_way(table, client)
  File "/home/henry/development/polars_bq/combined.py", line 42, in polars_way
    job.result()
  File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/cloud/bigquery/job/base.py", line 966, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/api_core/future/polling.py", line 261, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table <project>:testing.test_pl. Field foo has changed type from INTEGER to RECORD; reason: invalid, message: Provided Schema does not match Table <project>:testing.test_pl. Field foo has changed type from INTEGER to RECORD

# pyarrow_way
Traceback (most recent call last):
  File "/home/henry/development/polars_bq/combined.py", line 93, in <module>
    main()
  File "/home/henry/development/polars_bq/combined.py", line 79, in main
    pyarrow_way(table, client)
  File "/home/henry/development/polars_bq/combined.py", line 71, in pyarrow_way
    job.result()
  File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/cloud/bigquery/job/base.py", line 966, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/api_core/future/polling.py", line 261, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table <project>:testing.test_pa. Field foo has changed type from INTEGER to RECORD; reason: invalid, message: Provided Schema does not match Table <project>:testing.test_pa. Field foo has changed type from INTEGER to RECORD

Since you are using load_table_from_file directly, there are a couple of things to watch out for.

My teammate, Micah, commented on a similar Arrow + BigQuery issue with what might be going on.

For (1), he mentions a BigQuery feature, "enable_list_inference". We set this automatically in the "load pandas DataFrame into BigQuery" code path:

parquet_options.enable_list_inference = True

We also enable compliant nested types in that code path:

{"use_compliant_nested_type": True}

For parquet files from polars, it'd be good to double check that the parquet schemas produced are compatible with BigQuery via "compliant nested types".

In https://gist.github.com/tswast/4e2fb2cca1c1fecf8fb697e94102358f, I have confirmed that parquet files created with the polars write engine use the "compliant" schema, which is equivalent to a list of repeated groups with a single field named "element". (See the Parquet spec.)
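
If you want to double check a particular file yourself, printing the physical Parquet schema of the buffer is one quick way to do it (a small sketch; pq.ParquetFile accepts a seekable in-memory buffer):

from io import BytesIO

import polars as pl
import pyarrow.parquet as pq

df = pl.DataFrame({"foo": [[1, 2], [3, 4]]})

buffer = BytesIO()
df.write_parquet(buffer)
buffer.seek(0)

# A "compliant" list field prints as a group containing a repeated
# "list" group with a single "element" field.
print(pq.ParquetFile(buffer).schema)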

After modifying your code samples to add

job_config = bigquery.LoadJobConfig(...)
parquet_options = bigquery.ParquetOptions()
parquet_options.enable_list_inference = True
job_config.parquet_options = parquet_options

both ways (pyarrow and polars) succeed.

Code
from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import bigquery

import polars as pl

PROJECT = "swast-scratch"


def create_and_return_table(table_name: str, client: bigquery.Client) -> bigquery.Table:
    schema = [bigquery.SchemaField("foo", "INTEGER", mode="REPEATED")]
    table = bigquery.Table(f"{PROJECT}.testing.{table_name}", schema=schema)

    client.delete_table(table, not_found_ok=True)
    return client.create_table(table)


def polars_way(table: bigquery.Table, client: bigquery.Client):
    df = pl.DataFrame({"foo": [[1, 2], [3, 4]]})

    with BytesIO() as stream:
        df.write_parquet(stream)

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )
        parquet_options = bigquery.ParquetOptions()
        parquet_options.enable_list_inference = True
        job_config.parquet_options = parquet_options

        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )

    job.result()


def pyarrow_way(table: bigquery.Table, client: bigquery.Client):
    pyarrow_schema = pa.schema([pa.field("foo", pa.large_list(pa.int64()))])
    pyarrow_table = pa.Table.from_pydict(
        {"foo": [[1, 2], [3, 4]]}, schema=pyarrow_schema
    )

    with BytesIO() as stream:
        writer = pq.ParquetWriter(stream, pyarrow_schema)
        writer.write(pyarrow_table)
        writer.close()

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )
        parquet_options = bigquery.ParquetOptions()
        parquet_options.enable_list_inference = True
        job_config.parquet_options = parquet_options

        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )

    job.result()


def main():
    client = bigquery.Client()
    table = create_and_return_table("test_pl", client)
    polars_way(table, client)
    print(client.list_rows(table).to_arrow())
    table = create_and_return_table("test_pa", client)
    pyarrow_way(table, client)
    print(client.list_rows(table).to_arrow())


if __name__ == "__main__":
    main()

Output:

# Polars way
pyarrow.Table
foo: list<item: int64> not null
  child 0, item: int64
----
foo: [[[1,2],[3,4]]]

# PyArrow way
pyarrow.Table
foo: list<item: int64> not null
  child 0, item: int64
----
foo: [[[1,2],[3,4]]]

All that said, I think this issue can be converted to a feature request to change the default to parquet_options.enable_list_inference = True when the source format is Parquet.

This would be a breaking change, so it should only happen when we do a 4.0.0 release.
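
Until such a default change ships, a small wrapper along these lines is one possible stopgap (the helper name is hypothetical, not part of the library):

from typing import IO, Optional

from google.cloud import bigquery


def load_parquet_with_list_inference(
    client: bigquery.Client,
    stream: IO[bytes],
    destination: bigquery.Table,
    job_config: Optional[bigquery.LoadJobConfig] = None,
) -> bigquery.LoadJob:
    # Hypothetical helper: mimics the proposed default by enabling
    # list inference whenever the source format is Parquet.
    job_config = job_config or bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET
    if job_config.parquet_options is None:
        parquet_options = bigquery.ParquetOptions()
        parquet_options.enable_list_inference = True
        job_config.parquet_options = parquet_options
    return client.load_table_from_file(
        stream,
        destination=destination,
        rewind=True,
        job_config=job_config,
    )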

Hi @tswast,

Thank you very much for your work, comments, and PR to the Polars docs!
I also support this being changed to a breaking feature request.