Environment details
- OS type and version: Ubuntu 20.04 (WSL)
- Python version: 3.11.8
- pip version: 24.2
- google-cloud-bigquery version: 3.25.0
Steps to reproduce
- Create a table that has a single field with mode=REPEATED
- Attempt to append to the table using `client.load_table_from_file` with a Parquet file written from memory to a `BytesIO` buffer
- If no schema is provided to `bigquery.LoadJobConfig`, the operation fails
- If the table schema is provided to `bigquery.LoadJobConfig`, the operation does not raise, but instead incorrectly inserts empty arrays into the table (both variants are sketched just below)
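To make the two variants concrete, the only difference between them is whether `schema` is passed to the job config. This is a minimal sketch; `bigquery` and `table` refer to the import and table object in the full example below.

# Variant 1: no schema -> job.result() raises BadRequest (INTEGER vs RECORD mismatch)
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# Variant 2: table schema supplied -> the load "succeeds", but the rows contain empty arrays
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema=table.schema,
)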
Issue details
I am unable to use `client.load_table_from_file` with a Parquet file to append to an existing table with a REPEATED field.
This issue is somewhat similar to #1981, except related to REPEATED fields rather than REQUIRED fields.
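For reference, `LoadJobConfig` also exposes `parquet_options`, and `ParquetOptions` has an `enable_list_inference` flag that controls how LIST-typed Parquet columns are mapped. Whether that flag changes the behaviour described here is untested in this report; a minimal sketch of setting it (using the same `bigquery` import as the example below) would be:

# Sketch only: pass Parquet list inference through the load job config.
# It is not verified here whether this changes the REPEATED-field behaviour above.
parquet_options = bigquery.ParquetOptions()
parquet_options.enable_list_inference = True

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    parquet_options=parquet_options,
)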
Code example
Apologies in advance that the example is a bit long. It demonstrates Parquet files written to `BytesIO` buffers by both Polars and PyArrow failing to load into a BigQuery table with a `mode=REPEATED` field.
from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import bigquery
import polars as pl

PROJECT = "<project>"


def create_and_return_table(table_name: str, client: bigquery.Client) -> bigquery.Table:
    schema = [bigquery.SchemaField("foo", "INTEGER", mode="REPEATED")]
    table = bigquery.Table(f"{PROJECT}.testing.{table_name}", schema=schema)
    client.delete_table(table, not_found_ok=True)
    return client.create_table(table)


def polars_way(table: bigquery.Table, client: bigquery.Client):
    df = pl.DataFrame({"foo": [[1, 2], [3, 4]]})
    with BytesIO() as stream:
        df.write_parquet(stream)
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )
        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )
        job.result()


def pyarrow_way(table: bigquery.Table, client: bigquery.Client):
    pyarrow_schema = pa.schema([pa.field("foo", pa.large_list(pa.int64()))])
    pyarrow_table = pa.Table.from_pydict(
        {"foo": [[1, 2], [3, 4]]}, schema=pyarrow_schema
    )
    with BytesIO() as stream:
        writer = pq.ParquetWriter(stream, pyarrow_schema)
        writer.write(pyarrow_table)
        writer.close()
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )
        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )
        job.result()


def main():
    client = bigquery.Client()

    table = create_and_return_table("test_pl", client)
    polars_way(table, client)

    table = create_and_return_table("test_pa", client)
    pyarrow_way(table, client)

    # Both "ways" raise the below error:
    # google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table
    # project:dataset.table. Field foo has changed type from INTEGER to RECORD; reason:
    # invalid, message: Provided Schema does not match Table project:dataset.table. Field
    # foo has changed type from INTEGER to RECORD
    # Unless the table schema is provided, in which case the operation succeeds, but the
    # data is inserted as empty arrays


if __name__ == "__main__":
    main()
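To observe the empty-array behaviour when `schema=table.schema` is uncommented, a quick check (hypothetical, not part of the script above) is to read the table back after the load:

# Hypothetical check, run after one of the loads above with schema=table.schema supplied.
rows = client.query(f"SELECT foo FROM `{PROJECT}.testing.test_pl`").result()
print([dict(row) for row in rows])
# Expected: [{'foo': [1, 2]}, {'foo': [3, 4]}]
# Observed: rows containing empty arrays, e.g. [{'foo': []}, {'foo': []}]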
Stack trace
Both the `polars_way` and the `pyarrow_way` raise the same error. Here they both are.
# polars_way
Traceback (most recent call last):
  File "/home/henry/development/polars_bq/combined.py", line 93, in <module>
    main()
  File "/home/henry/development/polars_bq/combined.py", line 77, in main
    polars_way(table, client)
  File "/home/henry/development/polars_bq/combined.py", line 42, in polars_way
    job.result()
  File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/cloud/bigquery/job/base.py", line 966, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/api_core/future/polling.py", line 261, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table <project>:testing.test_pl. Field foo has changed type from INTEGER to RECORD; reason: invalid, message: Provided Schema does not match Table <project>:testing.test_pl. Field foo has changed type from INTEGER to RECORD

# pyarrow_way
Traceback (most recent call last):
  File "/home/henry/development/polars_bq/combined.py", line 93, in <module>
    main()
  File "/home/henry/development/polars_bq/combined.py", line 79, in main
    pyarrow_way(table, client)
  File "/home/henry/development/polars_bq/combined.py", line 71, in pyarrow_way
    job.result()
  File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/cloud/bigquery/job/base.py", line 966, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/henry/development/polars_bq/.venv/lib/python3.11/site-packages/google/api_core/future/polling.py", line 261, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table <project>:testing.test_pa. Field foo has changed type from INTEGER to RECORD; reason: invalid, message: Provided Schema does not match Table <project>:testing.test_pa. Field foo has changed type from INTEGER to RECORD