Julia bindings for Apache Arrow DataFusion, a fast, embeddable, modular query engine for SQL analytics.
- High Performance: Leverages the speed and efficiency of DataFusion's Rust implementation
- SQL Support: Full SQL query capabilities with DataFusion's comprehensive dialect
- Iceberg Support: Full Apache Iceberg table format support with partitioning and schema evolution (powered by iceberg-rust)
- Memory Safe: Automatic resource management with Julia's garbage collector
- Easy Integration: Simple API for registering data sources and executing queries
- Cross Platform: Works on macOS, Linux, and Windows
- Julia 1.8+: This package requires Julia 1.8 or later
- Rust 1.70+: Required to build the C API
- Git: For cloning the repositories
DataFusion.jl requires the companion datafusion-c-api C library. Follow these steps to set up both components:
# Clone both repositories
git clone https://github.com/vustef/datafusion-c-api.git
git clone https://github.com/vustef/DataFusion.jl.git
cd datafusion-c-api
cargo build --release
This will create the necessary dynamic library (libdatafusion_c_api.dylib on macOS, libdatafusion_c_api.so on Linux, or datafusion_c_api.dll on Windows) in target/release/.
cd ../DataFusion.jl
julia --project=. -e "using Pkg; Pkg.instantiate()"
julia --project=. examples/basic_usage.jl
Your setup should look like this:
your-workspace/
├── datafusion-c-api/ # C API repository
│ ├── src/
│ ├── target/release/ # Built library location
│ └── ...
└── DataFusion.jl/ # Julia package repository
├── src/
├── examples/
└── ...
The Julia package automatically looks for the C library at the relative path ../datafusion-c-api/target/release/.
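If the library fails to load, you can sanity-check the path yourself. The snippet below is an illustrative check only (the package's actual lookup logic may differ); it builds the platform-specific filename and attempts to load it with Julia's standard Libdl:

using Libdl

# Illustrative only: mirrors the relative path the package expects
libname = Sys.isapple()   ? "libdatafusion_c_api.dylib" :
          Sys.iswindows() ? "datafusion_c_api.dll" :
                            "libdatafusion_c_api.so"
libpath = joinpath(pwd(), "..", "datafusion-c-api", "target", "release", libname)
isfile(libpath) || error("C library not found at $libpath; run cargo build --release first")
Libdl.dlopen(libpath)  # throws if the file exists but cannot be loaded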
using DataFusion
# Create a DataFusion context
ctx = DataFusionContext()
# Register a CSV file as a table
register_csv!(ctx, "employees", "employees.csv")
# Execute SQL queries
result = sql(ctx, "SELECT * FROM employees WHERE age > 30")
# Print the results
print_result(result)
# Get result metadata
println("Batches: ", batch_count(result))
println("Rows in first batch: ", batch_num_rows(result, 0))
println("Columns in first batch: ", batch_num_columns(result, 0))
DataFusion.jl supports Apache Iceberg tables with full schema and partitioning capabilities:
using DataFusion
# Create an Iceberg catalog
catalog = iceberg_catalog_sql("sqlite://", "my_catalog")
# Build a schema
schema = iceberg_schema()
add_long_field!(schema, UInt32(1), "id", true)
add_long_field!(schema, UInt32(2), "customer_id", true)
add_date_field!(schema, UInt32(3), "order_date", true)
add_int_field!(schema, UInt32(4), "amount", true)
# Create partition specification
partition_spec = iceberg_partition_spec()
add_day_field!(partition_spec, UInt32(3), UInt32(1000), "day")
# Create and register Iceberg table
table = iceberg_table("orders", "/path/to/orders", schema, partition_spec, catalog, "my_catalog")
ctx = DataFusionContext()
register_iceberg_table!(ctx, "orders", table)
# Use the table with SQL
result = sql(ctx, "INSERT INTO orders VALUES (1, 100, '2024-01-01', 250)")
result = sql(ctx, "SELECT * FROM orders WHERE order_date >= '2024-01-01'")
print_result(result)
Represents a DataFusion execution context that manages query execution and data sources.
ctx = DataFusionContext()
Represents the result of a SQL query execution, containing one or more record batches.
Register a CSV file as a table in the DataFusion context.
Arguments:
- ctx: The DataFusion context
- table_name: Name to assign to the table
- file_path: Path to the CSV file
Example:
register_csv!(ctx, "sales_data", "/path/to/sales.csv")
Create a new SQL-based Iceberg catalog.
Arguments:
- database_url: Database connection URL (e.g., "sqlite://")
- name: Catalog name
Example:
catalog = iceberg_catalog_sql("sqlite://", "my_catalog")
Create a new Iceberg schema builder.
Example:
schema = iceberg_schema()
add_long_field!(schema, UInt32(1), "id", true)
add_date_field!(schema, UInt32(2), "created_at", true)
Add a long (64-bit integer) field to the schema.
Add an int (32-bit integer) field to the schema.
Add a date field to the schema.
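All three field builders share the argument pattern shown in the examples above: the schema, a unique field ID, the field name, and a trailing Bool (true in every example in this README, presumably marking the field as required):

schema = iceberg_schema()
add_long_field!(schema, UInt32(1), "id", true)          # 64-bit integer
add_int_field!(schema, UInt32(2), "quantity", true)     # 32-bit integer
add_date_field!(schema, UInt32(3), "shipped_on", true)  # date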
Create a new Iceberg partition specification.
Add day-based partitioning to the partition specification.
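Example (matching the Quick Start, where field ID 3 is order_date and 1000 is the ID assigned to the new partition field):

partition_spec = iceberg_partition_spec()
add_day_field!(partition_spec, UInt32(3), UInt32(1000), "day")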
iceberg_table(name::String, location::String, schema::IcebergSchema, partition_spec::IcebergPartitionSpec, catalog::IcebergCatalog, catalog_name::String) -> IcebergTable
Create a new Iceberg table.
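Example:

table = iceberg_table("orders", "/path/to/orders", schema, partition_spec, catalog, "my_catalog")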
Register an Iceberg table with the DataFusion context.
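Example:

ctx = DataFusionContext()
register_iceberg_table!(ctx, "orders", table)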
Execute a SQL query and return the results.
Arguments:
- ctx: The DataFusion context
- query: SQL query string
Returns: A DataFusionResult containing the query results
Example:
result = sql(ctx, "SELECT product, SUM(amount) FROM sales_data GROUP BY product")
Print the result as a formatted table to stdout.
Arguments:
- result: The DataFusion query result
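Example:

print_result(result)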
Get the number of record batches in the result.
Get the number of rows in a specific batch.
Get the number of columns in a specific batch.
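Together these let you walk every batch in a result; note that batch indices are zero-based, as in the Quick Start:

result = sql(ctx, "SELECT * FROM employees")
for i in 0:(batch_count(result) - 1)
    println("Batch ", i, ": ", batch_num_rows(result, i), " rows, ",
            batch_num_columns(result, i), " columns")
end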
using DataFusion
# Create context and register data
ctx = DataFusionContext()
register_csv!(ctx, "employees", "employees.csv")
# Basic SELECT
result = sql(ctx, "SELECT name, age FROM employees")
print_result(result)
# Filtering
result = sql(ctx, "SELECT * FROM employees WHERE department = 'Engineering'")
print_result(result)
# Aggregation
result = sql(ctx, "SELECT department, COUNT(*) as count, AVG(salary) as avg_salary
FROM employees
GROUP BY department")
print_result(result)
# Sorting and limiting
result = sql(ctx, "SELECT name, salary FROM employees ORDER BY salary DESC LIMIT 5")
print_result(result)
using DataFusion
# Create an Iceberg catalog and schema
catalog = iceberg_catalog_sql("sqlite://", "analytics")
schema = iceberg_schema()
add_long_field!(schema, UInt32(1), "id", true)
add_long_field!(schema, UInt32(2), "customer_id", true)
add_long_field!(schema, UInt32(3), "product_id", true)
add_date_field!(schema, UInt32(4), "order_date", true)
add_int_field!(schema, UInt32(5), "amount", true)
# Create day-partitioned table
partition_spec = iceberg_partition_spec()
add_day_field!(partition_spec, UInt32(4), UInt32(1000), "day")
table = iceberg_table("orders", "/data/orders", schema, partition_spec, catalog, "analytics")
# Register with DataFusion
ctx = DataFusionContext()
register_iceberg_table!(ctx, "orders", table)
# Insert data
sql(ctx, """
INSERT INTO orders (id, customer_id, product_id, order_date, amount) VALUES
(1, 101, 1, '2024-01-01', 250),
(2, 102, 2, '2024-01-01', 150),
(3, 103, 1, '2024-01-02', 300)
""")
# Query with time-based filtering (takes advantage of partitioning)
result = sql(ctx, """
SELECT product_id, SUM(amount) as total_sales
FROM orders
WHERE order_date >= '2024-01-01' AND order_date < '2024-01-02'
GROUP BY product_id
""")
print_result(result)
The package includes several example scripts:
# Run the basic usage example
cd DataFusion.jl
julia --project=. examples/basic_usage.jl
# Run the Iceberg example
julia --project=. examples/iceberg_usage.jl
Run the test suite to verify everything is working:
cd DataFusion.jl
julia --project=. test/runtests.jl
DataFusion.jl provides informative error messages for common issues:
try
ctx = DataFusionContext()
register_csv!(ctx, "nonexistent", "missing_file.csv")
catch e
println("Error: ", e)
# Handle the error appropriately
end
- Use Release Build: Always build the C API in release mode for best performance
- Batch Operations: DataFusion is optimized for batch processing; avoid row-by-row operations
- Predicate Pushdown: Use WHERE clauses to filter data as early as possible in the query
- Column Selection: Only select the columns you need to reduce memory usage (see the example below)
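For example, combining a selective WHERE clause with an explicit column list keeps both scan and memory costs down (using the employees table from the Quick Start):

# Good: filters early and reads only the needed columns
result = sql(ctx, "SELECT name, salary FROM employees WHERE department = 'Engineering'")

# Wasteful: materializes every column of every row
result = sql(ctx, "SELECT * FROM employees")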
This error occurs when the Julia package cannot locate the compiled Rust library. Ensure that:
- You've built the datafusion-c-api project with cargo build --release
- The library file exists in ../datafusion-c-api/target/release/
- Both repositories are in the same parent directory
If you encounter build errors:
- Ensure you have Rust 1.70+ installed: rustc --version
- Update your Rust installation: rustup update
- Clean and rebuild: cargo clean && cargo build --release
If you encounter memory-related errors:
- Ensure you're not holding references to freed contexts or results
- Julia's garbage collector should handle memory cleanup automatically
- Check that the C library was built without memory issues
If queries are running slowly:
- Verify you're using the release build of the C library
- Check that your CSV files are properly formatted
- Consider using more specific queries with appropriate filtering
DataFusion.jl consists of two main components:
- datafusion-c-api: A Rust crate that provides C-compatible bindings to Apache Arrow DataFusion
- DataFusion.jl: This Julia package that provides high-level Julia bindings to the C API
The architecture provides:
- Safety: Memory is managed by Rust's ownership model on the native side and Julia's garbage collector on the Julia side
- Performance: Direct calls to DataFusion's optimized Rust implementation
- Compatibility: Standard C ABI ensures broad compatibility across platforms
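As a rough sketch of what this layering looks like, a Julia binding over a C ABI typically wraps ccall and attaches a finalizer so the garbage collector can release native resources. The symbol names below (df_context_new, df_context_free) are hypothetical stand-ins, not the actual datafusion-c-api interface:

# Hypothetical sketch of a C-ABI wrapper; not the package's real implementation
const LIB = "libdatafusion_c_api"

mutable struct Context
    ptr::Ptr{Cvoid}
end

function Context()
    ptr = ccall((:df_context_new, LIB), Ptr{Cvoid}, ())  # hypothetical symbol
    ptr == C_NULL && error("failed to create DataFusion context")
    ctx = Context(ptr)
    # Julia's GC frees the native handle when ctx becomes unreachable
    finalizer(c -> ccall((:df_context_free, LIB), Cvoid, (Ptr{Cvoid},), c.ptr), ctx)
    return ctx
end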
- Apache Arrow DataFusion - The underlying query engine
- iceberg-rust - Rust implementation of Apache Iceberg with DataFusion integration (used for Iceberg support)
- datafusion-c-api - C bindings for DataFusion (required dependency)
- Arrow.jl - Julia bindings for Apache Arrow
- DataFrames.jl - DataFrames implementation in Julia
Contributions are welcome! Please ensure that:
- The Rust C API builds successfully
- All Julia examples run without errors
- New features include appropriate documentation and examples
- Tests pass: julia --project=. test/runtests.jl
When contributing, you may need to make changes to both repositories:
- C API changes: Submit PRs to datafusion-c-api
- Julia binding changes: Submit PRs to this repository
This project is licensed under the same terms as Apache Arrow DataFusion - Apache License 2.0.