feature_request(BREAKING_CHANGE): automatically set `parquet_options.enable_list_inference = True` when the source format is parquet
Environment details
- OS type and version: Ubuntu 20.04 (WSL)
- Python version: 3.11.8
- pip version: 24.2
- google-cloud-bigquery version: 3.25.0
Steps to reproduce
- Create a table that has a single field with mode=REPEATED
- Attempt to append to the table using client.load_table_from_file with a parquet file written from memory to a BytesIO buffer
- If no schema is provided to bigquery.LoadJobConfig, the operation fails
- If the table schema is provided to bigquery.LoadJobConfig, the operation does not raise, but instead incorrectly inserts empty arrays into the table
Issue details
I am unable to use client.load_table_from_file with a parquet file to append to an existing table that has a REPEATED field.
This issue is somewhat similar to #1981, except related to REPEATED fields rather than REQUIRED fields.
Code example
Apologies in advance that the example is a bit long. It demonstrates that Parquet files written to BytesIO buffers by both Polars and PyArrow cannot be loaded into a BigQuery table with a mode=REPEATED field.
from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import bigquery
import polars as pl

PROJECT = "<project>"


def create_and_return_table(table_name: str, client: bigquery.Client) -> bigquery.Table:
    schema = [bigquery.SchemaField("foo", "INTEGER", mode="REPEATED")]
    table = bigquery.Table(f"{PROJECT}.testing.{table_name}", schema=schema)
    client.delete_table(table, not_found_ok=True)
    return client.create_table(table)


def polars_way(table: bigquery.Table, client: bigquery.Client):
    df = pl.DataFrame({"foo": [[1, 2], [3, 4]]})
    with BytesIO() as stream:
        df.write_parquet(stream)
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )
        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )
        job.result()


def pyarrow_way(table: bigquery.Table, client: bigquery.Client):
    pyarrow_schema = pa.schema([pa.field("foo", pa.large_list(pa.int64()))])
    pyarrow_table = pa.Table.from_pydict(
        {"foo": [[1, 2], [3, 4]]}, schema=pyarrow_schema
    )
    with BytesIO() as stream:
        writer = pq.ParquetWriter(stream, pyarrow_schema)
        writer.write(pyarrow_table)
        writer.close()
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )
        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )
        job.result()


def main():
    client = bigquery.Client()

    table = create_and_return_table("test_pl", client)
    polars_way(table, client)

    table = create_and_return_table("test_pa", client)
    pyarrow_way(table, client)

    # Both "ways" raise the below error
    # google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table
    # project:dataset.table. Field foo has changed type from INTEGER to RECORD; reason:
    # invalid, message: Provided Schema does not match Table project:dataset.table. Field
    # foo has changed type from INTEGER to RECORD
    # Unless the table schema is provided, in which case the operation succeeds, but the
    # data is inserted as empty arrays


if __name__ == "__main__":
    main()
Stack trace
Both the polars_way and the pyarrow_way raise the error. Here they both are.
# polars_way
Traceback (most recent call last):
File "/home/henry/development/polars_bq/combined.py", line 93, in <module>
main()
File "/home/henry/development/polars_bq/combined.py", line 77, in main
polars_way(table, client)
File "/home/henry/development/polars_bq/combined.py", line 42, in polars_way
job.result()
File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/cloud/bigquery/job/base.py", line 966, in result
return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/api_core/future/polling.py", line 261, in result
raise self._exception
google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table <project>:testing.test_pl. Field foo has changed type from INTEGER to RECORD; reason: invalid, message: Provided Schema does not match Table <project>:testing.test_pl. Field foo has changed type from INTEGER to RECORD
# pyarrow_way
Traceback (most recent call last):
File "/home/henry/development/polars_bq/combined.py", line 93, in <module>
main()
File "/home/henry/development/polars_bq/combined.py", line 79, in main
pyarrow_way(table, client)
File "/home/henry/development/polars_bq/combined.py", line 71, in pyarrow_way
job.result()
File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/cloud/bigquery/job/base.py", line 966, in result
return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/api_core/future/polling.py", line 261, in result
raise self._exception
google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table <project>:testing.test_pa. Field foo has changed type from INTEGER to RECORD; reason: invalid, message: Provided Schema does not match Table <project>:testing.test_pa. Field foo has changed type from INTEGER to RECORD
Since you are using load_table_from_file directly, there are a couple of things to watch out for:
My teammate, Micah, commented on a similar Arrow + BigQuery issue with what might be going on.
For (1), he mentions a BigQuery feature, "enable_list_inference". We do this automatically in the "load pandas DataFrame into BigQuery" code path:
python-bigquery/google/cloud/bigquery/client.py, line 2730 (at ea69fe3)
We also enable compliant nested types in that code path:
python-bigquery/google/cloud/bigquery/client.py, line 2815 (at ea69fe3)
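For parquet files written directly with pyarrow (outside that DataFrame code path), the corresponding knob on the writer side is use_compliant_nested_type; here is a minimal sketch, assuming a pyarrow version where pq.write_table exposes that option (it is the default in newer releases):

from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq

# Sketch only: serialize a list column using the "compliant" nested layout.
schema = pa.schema([pa.field("foo", pa.large_list(pa.int64()))])
table = pa.Table.from_pydict({"foo": [[1, 2], [3, 4]]}, schema=schema)

with BytesIO() as stream:
    pq.write_table(table, stream, use_compliant_nested_type=True)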
For parquet files from polars, it'd be good to double check that the parquet schemas produced are compatible with BigQuery via "compliant nested types".
In https://gist.github.com/tswast/4e2fb2cca1c1fecf8fb697e94102358f, I have confirmed that parquet files created with the polars write engine use the "compliant" schema, which is equivalent to a list of repeated groups with a single field named "element". (See the Parquet spec.)
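If you want to verify this yourself, you can print the parquet-level schema of a written file; a small sketch, using a hypothetical path example.parquet:

import pyarrow.parquet as pq

# Print the parquet (not Arrow) schema of the file; the path is hypothetical.
# A "compliant" list column appears as a group annotated as LIST containing a
# repeated group whose single field is named "element".
print(pq.ParquetFile("example.parquet").schema)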
Modifying your code samples to add
job_config = bigquery.LoadJobConfig(...)
parquet_options = bigquery.ParquetOptions()
parquet_options.enable_list_inference = True
job_config.parquet_options = parquet_options
makes both ways (pyarrow and polars) succeed.
Code
from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import bigquery
import polars as pl

PROJECT = "swast-scratch"


def create_and_return_table(table_name: str, client: bigquery.Client) -> bigquery.Table:
    schema = [bigquery.SchemaField("foo", "INTEGER", mode="REPEATED")]
    table = bigquery.Table(f"{PROJECT}.testing.{table_name}", schema=schema)
    client.delete_table(table, not_found_ok=True)
    return client.create_table(table)


def polars_way(table: bigquery.Table, client: bigquery.Client):
    df = pl.DataFrame({"foo": [[1, 2], [3, 4]]})
    with BytesIO() as stream:
        df.write_parquet(stream)
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )
        parquet_options = bigquery.ParquetOptions()
        parquet_options.enable_list_inference = True
        job_config.parquet_options = parquet_options
        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )
        job.result()


def pyarrow_way(table: bigquery.Table, client: bigquery.Client):
    pyarrow_schema = pa.schema([pa.field("foo", pa.large_list(pa.int64()))])
    pyarrow_table = pa.Table.from_pydict(
        {"foo": [[1, 2], [3, 4]]}, schema=pyarrow_schema
    )
    with BytesIO() as stream:
        writer = pq.ParquetWriter(stream, pyarrow_schema)
        writer.write(pyarrow_table)
        writer.close()
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )
        parquet_options = bigquery.ParquetOptions()
        parquet_options.enable_list_inference = True
        job_config.parquet_options = parquet_options
        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )
        job.result()


def main():
    client = bigquery.Client()

    table = create_and_return_table("test_pl", client)
    polars_way(table, client)
    print(client.list_rows(table).to_arrow())

    table = create_and_return_table("test_pa", client)
    pyarrow_way(table, client)
    print(client.list_rows(table).to_arrow())


if __name__ == "__main__":
    main()
Output:
# Polars way
pyarrow.Table
foo: list<item: int64> not null
child 0, item: int64
----
foo: [[[1,2],[3,4]]]
# PyArrow way
pyarrow.Table
foo: list<item: int64> not null
child 0, item: int64
----
foo: [[[1,2],[3,4]]]
All that said, I think this issue can be converted to a feature request to change the default to parquet_options.enable_list_inference = True when the source format is Parquet.
This would be a breaking change, so it should only happen when we do a 4.0.0 release.
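As a rough sketch of what that default could look like (the helper name here is hypothetical, not the actual client implementation), the idea is to fill in parquet_options only when the caller has not configured them:

from google.cloud import bigquery

def _default_list_inference(job_config: bigquery.LoadJobConfig) -> bigquery.LoadJobConfig:
    # Hypothetical helper: when the source format is PARQUET and the caller has
    # not set parquet options themselves, default enable_list_inference to True.
    if (
        job_config.source_format == bigquery.SourceFormat.PARQUET
        and job_config.parquet_options is None
    ):
        parquet_options = bigquery.ParquetOptions()
        parquet_options.enable_list_inference = True
        job_config.parquet_options = parquet_options
    return job_config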
Hi @tswast,
Thank you very much for your work, comments, and PR to the Polars docs!
I also support converting this to a breaking-change feature request.