Inconsistent schema coercion in ListingTableConfig #16270

Closed
@kosiew

Description

Describe the bug

When a ListingTable is registered with a file_schema that includes optional fields (e.g., nested structs or additional columns), the projected schema and the output rows vary with the order of the input files: changing the file order changes which columns appear in the output.
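
Condensed to the essential calls (create_listing_table is the helper defined in the reproducer below; paths is the Vec<String> of parquet files):

// Same files, same explicit schema (schema4); only the path order differs.
let t1 = create_listing_table(&ctx, paths.clone().into_iter(), &schema4).await?;
let t2 = create_listing_table(&ctx, paths.into_iter().rev(), &schema4).await?;
// Expected: identical schemas. Observed: t1 exposes 2 columns, t2 exposes 4.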

To Reproduce

Rust code to reproduce the issue:

use datafusion::prelude::*;
use datafusion::{
    arrow::{
        array::Array, array::Float64Array, array::StringArray, array::StructArray,
        array::TimestampMillisecondArray, datatypes::DataType, datatypes::Field,
        datatypes::Schema, datatypes::TimeUnit, record_batch::RecordBatch,
    },
    dataframe::DataFrameWriteOptions,
    datasource::{
        file_format::parquet::ParquetFormat,
        listing::ListingOptions,
        listing::ListingTable,
        listing::ListingTableConfig,
        listing::ListingTableUrl,
        // schema_adapter::SchemaAdapterFactory,
    },
};
use std::{error::Error, fs, sync::Arc};

/// Helper function to create a RecordBatch from a Schema and log the process
async fn create_and_write_parquet_file(
    ctx: &SessionContext,
    schema: &Arc<Schema>,
    schema_name: &str,
    file_path: &str,
) -> Result<(), Box<dyn Error>> {
    let batch = create_batch(schema, schema_name)?;

    let _ = fs::remove_file(file_path);

    let df = ctx.read_batch(batch)?;

    df.write_parquet(
        file_path,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    )
    .await?;

    Ok(())
}

/// Helper function to create a ListingTableConfig for given paths and schema
async fn create_listing_table_config(
    ctx: &SessionContext,
    paths: impl Iterator<Item = String>,
    schema: &Arc<Schema>,
) -> Result<ListingTableConfig, Box<dyn Error>> {
    let config = ListingTableConfig::new_with_multi_paths(
        paths
            .map(|p| ListingTableUrl::parse(&p))
            .collect::<Result<Vec<_>, _>>()?,
    )
    .with_schema(Arc::clone(schema));

    // `infer` fills in the remaining ListingOptions; the schema supplied via
    // `with_schema` above is expected to remain the table's file_schema.
    let config = config.infer(&ctx.state()).await?;

    let config = ListingTableConfig {
        options: Some(ListingOptions {
            file_sort_order: vec![vec![col("timestamp_utc").sort(true, true)]],
            ..config.options.unwrap_or_else(|| {
                ListingOptions::new(Arc::new(ParquetFormat::default()))
            })
        }),
        ..config
    };

    Ok(config)
}

/// Helper function to create a ListingTable from paths and schema
async fn create_listing_table(
    ctx: &SessionContext,
    paths: impl Iterator<Item = String>,
    schema: &Arc<Schema>,
) -> Result<Arc<ListingTable>, Box<dyn Error>> {
    // Create the config
    let config = create_listing_table_config(ctx, paths, schema).await?;

    // Create the listing table
    let listing_table = ListingTable::try_new(config)?;

    Ok(Arc::new(listing_table))
}

/// Helper function to register a table and execute a query
async fn execute_and_display_query(
    ctx: &SessionContext,
    table_name: &str,
    listing_table: Arc<ListingTable>,
) -> Result<(), Box<dyn Error>> {
    println!("==> {}", table_name);
    ctx.register_table(table_name, listing_table)?;

    // Derive query automatically based on table name
    let query = format!("SELECT * FROM {} ORDER BY body", table_name);
    let df = ctx.sql(&query).await?;

    let _results = df.clone().collect().await?;
    // Print the results
    df.show().await?;

    Ok(())
}

fn create_batch(
    schema: &Arc<Schema>,
    schema_name: &str,
) -> Result<RecordBatch, Box<dyn Error>> {
    // Create arrays for each field in the schema
    let columns = schema
        .fields()
        .iter()
        .map(|field| create_array_for_field(field, 1, schema_name))
        .collect::<Result<Vec<_>, _>>()?;

    // Create record batch with the generated arrays
    RecordBatch::try_new(schema.clone(), columns).map_err(|e| e.into())
}

/// Creates an appropriate array for a given field with the specified length
fn create_array_for_field(
    field: &Field,
    length: usize,
    schema_name: &str,
) -> Result<Arc<dyn Array>, Box<dyn Error>> {
    match field.data_type() {
        DataType::Utf8 => {
            // For the body field, use schema_name; for other fields use default
            if field.name() == "body" {
                Ok(Arc::new(StringArray::from(vec![Some(schema_name); length])))
            } else {
                let default_value = format!("{}_{}", field.name(), 1);
                Ok(Arc::new(StringArray::from(vec![
                    Some(default_value);
                    length
                ])))
            }
        }
        DataType::Float64 => {
            // Default float value
            Ok(Arc::new(Float64Array::from(vec![Some(1.0); length])))
        }
        DataType::Timestamp(TimeUnit::Millisecond, tz) => {
            // Default timestamp: 1640995200000 ms = 2022-01-01T00:00:00Z
            let array =
                TimestampMillisecondArray::from(vec![Some(1640995200000); length]);
            // Create the array with the same timezone as specified in the field
            Ok(Arc::new(array.with_data_type(DataType::Timestamp(
                TimeUnit::Millisecond,
                tz.clone(),
            ))))
        }
        DataType::Struct(fields) => {
            // Create arrays for each field in the struct
            let struct_arrays = fields
                .iter()
                .map(|f| {
                    let array = create_array_for_field(f, length, schema_name)?;
                    Ok((f.clone(), array))
                })
                .collect::<Result<Vec<_>, Box<dyn Error>>>()?;

            Ok(Arc::new(StructArray::from(struct_arrays)))
        }
        _ => Err(format!("Unsupported data type: {}", field.data_type()).into()),
    }
}

fn create_schema1() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        Field::new("body", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
            true,
        ),
    ]))
}

/// Creates a schema with basic HTTP request fields plus a query_params struct field
fn create_schema2() -> Arc<Schema> {
    // Get the base schema from create_schema1
    let schema1 = create_schema1();

    // Create a new vector of fields from schema1
    let mut fields = schema1
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect::<Vec<Field>>();

    // Add the query_params field
    fields.push(Field::new(
        "query_params",
        DataType::Struct(vec![Field::new("customer_id", DataType::Utf8, true)].into()),
        true,
    ));

    // Create a new schema with the extended fields
    Arc::new(Schema::new(fields))
}

/// Creates a schema with HTTP request fields, query_params struct field, and an error field
fn create_schema3() -> Arc<Schema> {
    // Get the base schema from create_schema2
    let schema2 = create_schema2();

    // Convert to a vector of fields
    let mut fields = schema2
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect::<Vec<Field>>();

    // Add the error field
    fields.push(Field::new("error", DataType::Utf8, true));

    // Create a new schema with the extended fields
    Arc::new(Schema::new(fields))
}

/// Creates a schema with HTTP request fields, expanded query_params struct with additional fields, and an error field
fn create_schema4() -> Arc<Schema> {
    // Get the base schema from create_schema1 (we can't use schema3 directly since we need to modify query_params)
    let schema1 = create_schema1();

    // Convert to a vector of fields
    let mut fields = schema1
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect::<Vec<Field>>();

    // Add the expanded query_params field with additional fields
    fields.push(Field::new(
        "query_params",
        DataType::Struct(
            vec![
                Field::new("customer_id", DataType::Utf8, true),
                Field::new("document_type", DataType::Utf8, true),
                Field::new("fetch_from_source", DataType::Utf8, true),
                Field::new("source_system", DataType::Utf8, true),
            ]
            .into(),
        ),
        true,
    ));

    // Add the error field
    fields.push(Field::new("error", DataType::Utf8, true));

    // Create a new schema with the extended fields
    Arc::new(Schema::new(fields))
}

async fn test_datafusion_schema_evolution() -> Result<(), Box<dyn Error>> {
    let ctx = SessionContext::new();

    // Create schemas
    let schema1 = create_schema1();
    let schema2 = create_schema2();
    let schema3 = create_schema3();
    let schema4 = create_schema4();

    // Define file paths in an array for easier management
    let test_files = ["jobs1.parquet", "jobs2.parquet", "jobs3.parquet"];
    let [path1, path2, path3] = test_files; // Destructure for individual access

    // Create and write parquet files for each schema
    create_and_write_parquet_file(&ctx, &schema1, "schema1", path1).await?;
    create_and_write_parquet_file(&ctx, &schema2, "schema2", path2).await?;
    create_and_write_parquet_file(&ctx, &schema3, "schema3", path3).await?;

    let paths = vec![path1.to_string(), path2.to_string(), path3.to_string()].into_iter();
    let paths2 = vec![path1.to_string(), path2.to_string(), path3.to_string()]
        .into_iter()
        .rev();

    // Use the helper function to create the listing tables with different paths
    let table_paths = create_listing_table(&ctx, paths, &schema4).await?;
    let table_paths2 = create_listing_table(&ctx, paths2, &schema4).await?;

    // Execute query on the first table with table name "paths"
    execute_and_display_query(
        &ctx,
        "paths", // First table with original path order
        table_paths,
    )
    .await?;

    // Execute query on the second table with table name "paths2"
    execute_and_display_query(
        &ctx,
        "paths_reversed", // Second table with reversed path order
        table_paths2,
    )
    .await?;

    Ok(())
}
fn main() -> Result<(), Box<dyn Error>> {
    // Create a Tokio runtime for running our async function
    let rt = tokio::runtime::Runtime::new()?;

    // Run the function in the runtime
    rt.block_on(async { test_datafusion_schema_evolution().await })?;

    println!("Example completed successfully!");
    Ok(())
}

Output from above:

==> paths
+---------+----------------------+
| body    | timestamp_utc        |
+---------+----------------------+
| schema1 | 2022-01-01T00:00:00Z |
| schema2 | 2022-01-01T00:00:00Z |
| schema3 | 2022-01-01T00:00:00Z |
+---------+----------------------+
==> paths_reversed
+---------+----------------------+------------------------------+---------+
| body    | timestamp_utc        | query_params                 | error   |
+---------+----------------------+------------------------------+---------+
| schema1 | 2022-01-01T00:00:00Z |                              |         |
| schema2 | 2022-01-01T00:00:00Z | {customer_id: customer_id_1} |         |
| schema3 | 2022-01-01T00:00:00Z | {customer_id: customer_id_1} | error_1 |
+---------+----------------------+------------------------------+---------+
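
The divergence can also be checked programmatically rather than by eyeballing the printed tables. A minimal sketch, assuming it is appended to test_datafusion_schema_evolution after both tables have been registered; with the bug present the assertion fails (2 vs. 4 columns):

// Compare the resolved schemas of the two registered tables directly.
// Both were built from the same files and the same explicit schema,
// so the planner should resolve them to the same set of columns.
let df_a = ctx.sql("SELECT * FROM paths").await?;
let df_b = ctx.sql("SELECT * FROM paths_reversed").await?;
assert_eq!(
    df_a.schema().fields().len(),
    df_b.schema().fields().len(),
    "projected schema should not depend on file order",
);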

Expected behavior

The output schema and rows should be consistent regardless of file order, since file_schema is explicitly provided and coercion is expected to align every input file to it.
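
Concretely, both orderings would presumably print the same four-column table that paths_reversed already produces (whether query_params should also be widened to all four subfields declared in schema4 is a separate question):

+---------+----------------------+------------------------------+---------+
| body    | timestamp_utc        | query_params                 | error   |
+---------+----------------------+------------------------------+---------+
| schema1 | 2022-01-01T00:00:00Z |                              |         |
| schema2 | 2022-01-01T00:00:00Z | {customer_id: customer_id_1} |         |
| schema3 | 2022-01-01T00:00:00Z | {customer_id: customer_id_1} | error_1 |
+---------+----------------------+------------------------------+---------+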

Additional context

Discovered while working on #15295.

Labels

bug