Description
Vector UDF Incompatibility in Databricks Environment
Background
When parsing the binary stream, Dotnet.spark.worker expects a single large Arrow batch of data.
However, on Databricks Runtime 14.3 (and, as I understand it, on any Linux environment), the behavior differs: the driver splits the dataset into multiple batches of 10k rows each.
This mode uses a different binary format (it carries additional arguments).
Because the binary representations differ, the worker misinterprets the incoming byte stream, reads incorrect data, and hangs with no response.
Reproduction
Create a Spark pipeline that:
- Ingests a moderate-sized dataset.
- Performs a GroupBy (ensure groups contain >10k rows).
- Applies a vector UDF via GroupBy().Apply(...), operating on the grouped RecordBatch (see the sketch after this list).
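A minimal C# repro sketch under these conditions follows; the dataset shape, column names, and identity UDF are illustrative assumptions, not taken from the original pipeline:

```csharp
using System;
using Apache.Arrow;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

class Repro
{
    static void Main()
    {
        SparkSession spark = SparkSession.Builder().GetOrCreate();

        // Moderate-sized dataset; each group must exceed the
        // spark.sql.execution.arrow.maxRecordsPerBatch threshold (10k rows).
        DataFrame df = spark.Range(0, 1_000_000)
            .WithColumn("group", Col("id") % 10)
            .WithColumn("value", Col("id").Cast("double"));

        StructType returnSchema = new StructType(new[]
        {
            new StructField("group", new LongType()),
            new StructField("value", new DoubleType())
        });

        // Vector (Arrow grouped-map) UDF: the worker expects the whole group
        // as one RecordBatch, but DBR 14.3 streams it in 10k-row batches,
        // so the worker blocks while reading the stream.
        DataFrame result = df.GroupBy("group")
            .Apply(returnSchema, batch => batch);

        result.Show();  // never completes on Databricks Runtime 14.3
    }
}
```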
Actual results
The UDF execution never completes. The .NET Spark worker attempts to read more bytes than are available, blocking indefinitely while waiting for data that is never sent.
Expected result
The UDF should execute and return successfully.
TBD: the UDF should receive either a single consolidated RecordBatch or an IEnumerable of batches.
Investigation Summary
The spark.sql.execution.arrow.maxRecordsPerBatch setting, introduced in Spark 2.3.0 (default: 10,000), controls this batching. Overall, batching is beneficial for .NET Spark, as it can circumvent the 2 GB Arrow buffer limit per group when using GroupBy().Apply(...).
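For reference, a minimal sketch of reading and overriding this setting through the session's runtime config; whether raising it above the largest group size changes the behavior on DBR 14.3 was not part of this investigation:

```csharp
using Microsoft.Spark.Sql;

SparkSession spark = SparkSession.Builder().GetOrCreate();

// Default since Spark 2.3.0 is 10000 rows per Arrow batch.
string current = spark.Conf().Get("spark.sql.execution.arrow.maxRecordsPerBatch");

// Override for the current session only (value shown is illustrative).
spark.Conf().Set("spark.sql.execution.arrow.maxRecordsPerBatch", "100000");
```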
I implemented a PoC to validate performance for a specific use case. It successfully resolves the issue with the following changes:
- The vector UDF now accepts an IEnumerable of record batches (see the sketch after this list).
- Several new parameters are queried but ignored. In the Python implementation, these are used to maintain batch ordering. In my testing, batches always arrive sequentially, so ordering logic was omitted.
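A rough sketch of the UDF shape under the PoC; the names here are illustrative, and the actual signatures are in the linked commit (SqlCommandExecutor::ExecuteArrowGroupedMapCommand):

```csharp
using System.Collections.Generic;
using Apache.Arrow;

static class GroupedMapPocSketch
{
    // Sketch only: the group arrives as a sequence of ~10k-row Arrow batches
    // rather than one consolidated RecordBatch. Each batch is handled
    // independently; batches arrive in order, so no reordering is performed.
    public static IEnumerable<RecordBatch> ProcessGroup(IEnumerable<RecordBatch> batches)
    {
        foreach (RecordBatch batch in batches)
        {
            yield return batch; // identity transform for illustration
        }
    }
}
```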
References:
- PoC Commit, see SqlCommandExecutor::ExecuteArrowGroupedMapCommand
Notes:
- CoGroupedMap UDFs are unaffected as they do not leverage batching (as of Spark 3.5).
- Databricks Runtime 15.3 and 16.3 do not work with dotnet.spark at all, so no testing was performed on them. Details are in the linked PR.
- UseArrow can be disabled to fall back to the older format, which should work with the current dotnet.spark implementation.