Skip to content

Commit f2079d8

Browse files
heyihong authored and cloud-fan committed
[SPARK-52243][CONNECT] Add NERF support for schema-related InvalidPlanInput errors
### What changes were proposed in this pull request?

This PR adds NERF (New Error Framework) support for schema-related InvalidPlanInput errors in Spark Connect. The changes include:

1. Added a new error condition in error-conditions.json for schema validation:
   - INVALID_SCHEMA_TYPE_NON_STRUCT
2. Refactored error handling in InvalidInputErrors.scala to use the new NERF framework:
   - Added helper function invalidPlanInput for consistent error message generation
   - Updated schema validation error methods to use NERF error conditions
   - Made quoteByDefault method accessible to other packages
3. Added a test suite InvalidInputErrorsSuite.scala to verify error handling

### Why are the changes needed?

These changes are needed to:

1. Standardize error reporting across Spark Connect using the NERF framework
2. Improve error messages with better parameterization and consistency
3. Ensure proper SQL state codes are associated with schema-related errors
4. Provide clearer error messages for users when schema validation fails

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`build/sbt "connect/testOnly *InvalidInputErrorsSuite"`

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor 0.50.5 (Universal)

Closes #50997 from heyihong/SPARK-52243.

Authored-by: Yihong He <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent f423885 commit f2079d8

File tree

6 files changed

+148
-32
lines changed

6 files changed

+148
-32
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3432,6 +3432,12 @@
34323432
],
34333433
"sqlState" : "42602"
34343434
},
3435+
"INVALID_SCHEMA_TYPE_NON_STRUCT" : {
3436+
"message" : [
3437+
"Invalid schema type. Expect a struct type, but got <dataType>."
3438+
],
3439+
"sqlState" : "42K09"
3440+
},
34353441
"INVALID_SET_SYNTAX" : {
34363442
"message" : [
34373443
"Expected format is 'SET', 'SET key', or 'SET key=value'. If you want to include special characters in key, or include semicolon in value, please use backquotes, e.g., SET `key`=`value`."

python/pyspark/sql/tests/connect/test_parity_udtf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def eval(self, a: int):
6161
yield a + 1,
6262

6363
with self.assertRaisesRegex(
64-
InvalidPlanInput, "Invalid Python user-defined table function return type."
64+
InvalidPlanInput, "Invalid schema type. Expect a struct type, but got"
6565
):
6666
TestUDTF(lit(1)).collect()
6767

sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrorsBase.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ private[sql] trait DataTypeErrorsBase {
8585
else value.toString
8686
}
8787

88-
protected def quoteByDefault(elem: String): String = {
88+
protected[sql] def quoteByDefault(elem: String): String = {
8989
"\"" + elem + "\""
9090
}
9191

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,22 @@ package org.apache.spark.sql.connect.planner
1919

2020
import scala.collection.mutable
2121

22+
import org.apache.spark.SparkThrowableHelper
2223
import org.apache.spark.connect.proto
2324
import org.apache.spark.sql.connect.common.{InvalidCommandInput, InvalidPlanInput}
25+
import org.apache.spark.sql.errors.DataTypeErrors.{quoteByDefault, toSQLType}
2426
import org.apache.spark.sql.types.DataType
2527

2628
object InvalidInputErrors {
2729

30+
// invalidPlanInput is a helper function to facilitate the migration of InvalidInputErrors
31+
// to support NERF.
32+
private def invalidPlanInput(
33+
errorCondition: String,
34+
messageParameters: Map[String, String] = Map.empty): InvalidPlanInput = {
35+
InvalidPlanInput(SparkThrowableHelper.getMessage(errorCondition, messageParameters))
36+
}
37+
2838
def unknownRelationNotSupported(rel: proto.Relation): InvalidPlanInput =
2939
InvalidPlanInput(s"${rel.getUnknown} not supported.")
3040

@@ -72,11 +82,6 @@ object InvalidInputErrors {
7282
def rowNotSupportedForUdf(errorType: String): InvalidPlanInput =
7383
InvalidPlanInput(s"Row is not a supported $errorType type for this UDF.")
7484

75-
def invalidUserDefinedOutputSchemaType(actualType: String): InvalidPlanInput =
76-
InvalidPlanInput(
77-
s"Invalid user-defined output schema type for TransformWithStateInPandas. " +
78-
s"Expect a struct type, but got $actualType.")
79-
8085
def notFoundCachedLocalRelation(hash: String, sessionUUID: String): InvalidPlanInput =
8186
InvalidPlanInput(
8287
s"Not found any cached local relation with the hash: " +
@@ -91,8 +96,10 @@ object InvalidInputErrors {
9196
def schemaRequiredForLocalRelation(): InvalidPlanInput =
9297
InvalidPlanInput("Schema for LocalRelation is required when the input data is not provided.")
9398

94-
def invalidSchema(schema: DataType): InvalidPlanInput =
95-
InvalidPlanInput(s"Invalid schema $schema")
99+
def invalidSchemaStringNonStructType(schema: String, dataType: DataType): InvalidPlanInput =
100+
invalidPlanInput(
101+
"INVALID_SCHEMA.NON_STRUCT_TYPE",
102+
Map("inputSchema" -> quoteByDefault(schema), "dataType" -> toSQLType(dataType)))
96103

97104
def invalidJdbcParams(): InvalidPlanInput =
98105
InvalidPlanInput("Invalid jdbc params, please specify jdbc url and table.")
@@ -106,8 +113,8 @@ object InvalidInputErrors {
106113
def doesNotSupport(what: String): InvalidPlanInput =
107114
InvalidPlanInput(s"Does not support $what")
108115

109-
def invalidSchemaDataType(dataType: DataType): InvalidPlanInput =
110-
InvalidPlanInput(s"Invalid schema dataType $dataType")
116+
def invalidSchemaTypeNonStruct(dataType: DataType): InvalidPlanInput =
117+
invalidPlanInput("INVALID_SCHEMA_TYPE_NON_STRUCT", Map("dataType" -> toSQLType(dataType)))
111118

112119
def expressionIdNotSupported(exprId: Int): InvalidPlanInput =
113120
InvalidPlanInput(s"Expression with ID: $exprId is not supported")
@@ -189,9 +196,6 @@ object InvalidInputErrors {
189196
def usingColumnsOrJoinConditionSetInJoin(): InvalidPlanInput =
190197
InvalidPlanInput("Using columns or join conditions cannot be set at the same time in Join")
191198

192-
def invalidStateSchemaDataType(dataType: DataType): InvalidPlanInput =
193-
InvalidPlanInput(s"Invalid state schema dataType $dataType for flatMapGroupsWithState")
194-
195199
def sqlCommandExpectsSqlOrWithRelations(other: proto.Relation.RelTypeCase): InvalidPlanInput =
196200
InvalidPlanInput(s"SQL command expects either a SQL or a WithRelations, but got $other")
197201

@@ -213,17 +217,6 @@ object InvalidInputErrors {
213217
def invalidBucketCount(numBuckets: Int): InvalidCommandInput =
214218
InvalidCommandInput("INVALID_BUCKET_COUNT", Map("numBuckets" -> numBuckets.toString))
215219

216-
def invalidPythonUdtfReturnType(actualType: String): InvalidPlanInput =
217-
InvalidPlanInput(
218-
s"Invalid Python user-defined table function return type. " +
219-
s"Expect a struct type, but got $actualType.")
220-
221-
def invalidUserDefinedOutputSchemaTypeForTransformWithState(
222-
actualType: String): InvalidPlanInput =
223-
InvalidPlanInput(
224-
s"Invalid user-defined output schema type for TransformWithStateInPandas. " +
225-
s"Expect a struct type, but got $actualType.")
226-
227220
def unsupportedUserDefinedFunctionImplementation(clazz: Class[_]): InvalidPlanInput =
228221
InvalidPlanInput(s"Unsupported UserDefinedFunction implementation: ${clazz}")
229222
}

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ class SparkConnectPlanner(
773773
val stateSchema = DataTypeProtoConverter.toCatalystType(rel.getStateSchema) match {
774774
case s: StructType => s
775775
case other =>
776-
throw InvalidInputErrors.invalidStateSchemaDataType(other)
776+
throw InvalidInputErrors.invalidSchemaTypeNonStruct(other)
777777
}
778778
val stateEncoder = TypedScalaUdf.encoderFor(
779779
// the state agnostic encoder is the second element in the input encoders.
@@ -1105,8 +1105,7 @@ class SparkConnectPlanner(
11051105
transformDataType(twsInfo.getOutputSchema) match {
11061106
case s: StructType => s
11071107
case dt =>
1108-
throw InvalidInputErrors.invalidUserDefinedOutputSchemaTypeForTransformWithState(
1109-
dt.typeName)
1108+
throw InvalidInputErrors.invalidSchemaTypeNonStruct(dt)
11101109
}
11111110
}
11121111

@@ -1502,7 +1501,7 @@ class SparkConnectPlanner(
15021501
StructType.fromDDL,
15031502
fallbackParser = DataType.fromJson) match {
15041503
case s: StructType => s
1505-
case other => throw InvalidInputErrors.invalidSchema(other)
1504+
case other => throw InvalidInputErrors.invalidSchemaStringNonStructType(schema, other)
15061505
}
15071506
}
15081507

@@ -1580,7 +1579,7 @@ class SparkConnectPlanner(
15801579
if (rel.hasSchema) {
15811580
DataTypeProtoConverter.toCatalystType(rel.getSchema) match {
15821581
case s: StructType => reader.schema(s)
1583-
case other => throw InvalidInputErrors.invalidSchemaDataType(other)
1582+
case other => throw InvalidInputErrors.invalidSchemaTypeNonStruct(other)
15841583
}
15851584
}
15861585
localMap.foreach { case (key, value) => reader.option(key, value) }
@@ -2967,8 +2966,7 @@ class SparkConnectPlanner(
29672966
val returnType = if (udtf.hasReturnType) {
29682967
transformDataType(udtf.getReturnType) match {
29692968
case s: StructType => Some(s)
2970-
case dt =>
2971-
throw InvalidInputErrors.invalidPythonUdtfReturnType(dt.typeName)
2969+
case dt => throw InvalidInputErrors.invalidSchemaTypeNonStruct(dt)
29722970
}
29732971
} else {
29742972
None
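sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/InvalidInputErrorsSuite.scala

Lines changed: 119 additions & 0 deletions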
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connect.planner
19+
20+
import org.apache.spark.SparkThrowableHelper
21+
import org.apache.spark.connect.proto
22+
import org.apache.spark.sql.catalyst.expressions.AttributeReference
23+
import org.apache.spark.sql.catalyst.plans.PlanTest
24+
import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput}
25+
import org.apache.spark.sql.connect.planner.SparkConnectPlanTest
26+
import org.apache.spark.sql.types._
27+
28+
class InvalidInputErrorsSuite extends PlanTest with SparkConnectPlanTest {
29+
30+
lazy val testLocalRelation =
31+
createLocalRelationProto(
32+
Seq(AttributeReference("id", IntegerType)(), AttributeReference("name", StringType)()),
33+
Seq.empty)
34+
35+
val testCases = Seq(
36+
TestCase(
37+
name = "Invalid schema data type non struct for Parse",
38+
expectedErrorCondition = "INVALID_SCHEMA_TYPE_NON_STRUCT",
39+
expectedParameters = Map("dataType" -> "\"ARRAY<INT>\""),
40+
invalidInput = {
41+
val parse = proto.Parse
42+
.newBuilder()
43+
.setSchema(DataTypeProtoConverter.toConnectProtoType(ArrayType(IntegerType)))
44+
.setFormat(proto.Parse.ParseFormat.PARSE_FORMAT_CSV)
45+
.build()
46+
47+
proto.Relation.newBuilder().setParse(parse).build()
48+
}),
49+
TestCase(
50+
name = "Invalid schema type non struct for TransformWithState",
51+
expectedErrorCondition = "INVALID_SCHEMA_TYPE_NON_STRUCT",
52+
expectedParameters = Map("dataType" -> "\"ARRAY<INT>\""),
53+
invalidInput = {
54+
val pythonUdf = proto.CommonInlineUserDefinedFunction
55+
.newBuilder()
56+
.setPythonUdf(
57+
proto.PythonUDF
58+
.newBuilder()
59+
.setEvalType(211)
60+
.setOutputType(DataTypeProtoConverter.toConnectProtoType(ArrayType(IntegerType)))
61+
.build())
62+
.build()
63+
64+
val groupMap = proto.GroupMap
65+
.newBuilder()
66+
.setInput(testLocalRelation)
67+
.setFunc(pythonUdf)
68+
.setTransformWithStateInfo(
69+
proto.TransformWithStateInfo
70+
.newBuilder()
71+
.setOutputSchema(DataTypeProtoConverter.toConnectProtoType(ArrayType(IntegerType)))
72+
.build())
73+
.build()
74+
75+
proto.Relation.newBuilder().setGroupMap(groupMap).build()
76+
}),
77+
TestCase(
78+
name = "Invalid schema string non struct type",
79+
expectedErrorCondition = "INVALID_SCHEMA.NON_STRUCT_TYPE",
80+
expectedParameters = Map(
81+
"inputSchema" -> """"{"type":"array","elementType":"integer","containsNull":false}"""",
82+
"dataType" -> "\"ARRAY<INT>\""),
83+
invalidInput = {
84+
val invalidSchema = """{"type":"array","elementType":"integer","containsNull":false}"""
85+
86+
val dataSource = proto.Read.DataSource
87+
.newBuilder()
88+
.setFormat("csv")
89+
.setSchema(invalidSchema)
90+
.build()
91+
92+
val read = proto.Read
93+
.newBuilder()
94+
.setDataSource(dataSource)
95+
.build()
96+
97+
proto.Relation.newBuilder().setRead(read).build()
98+
}))
99+
100+
// Run all test cases
101+
testCases.foreach { testCase =>
102+
test(s"${testCase.name}") {
103+
val exception = intercept[InvalidPlanInput] {
104+
transform(testCase.invalidInput)
105+
}
106+
val expectedMessage = SparkThrowableHelper.getMessage(
107+
testCase.expectedErrorCondition,
108+
testCase.expectedParameters)
109+
assert(exception.getMessage == expectedMessage)
110+
}
111+
}
112+
113+
// Helper case class to define test cases
114+
case class TestCase(
115+
name: String,
116+
expectedErrorCondition: String,
117+
expectedParameters: Map[String, String],
118+
invalidInput: proto.Relation)
119+
}

0 commit comments

Comments
 (0)