
Commit 2831571

gatorsmile authored and cloud-fan committed
[SPARK-22791][SQL][SS] Redact Output of Explain
## What changes were proposed in this pull request?

When calling explain on a query, the output can contain sensitive information. We should give admins/users a way to redact such information.

Before this PR, the plan of a Structured Streaming (SS) query looked like this:

```
== Physical Plan ==
*HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#12L])
+- StateStoreSave [value#6], state info [ checkpoint = file:/private/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-91c6fac0-609f-4bc8-ad57-52c189f06797/state, runId = 05a4b3af-f02c-40f8-9ff9-a3e18bae496f, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#18L])
      +- StateStoreRestore [value#6], state info [ checkpoint = file:/private/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-91c6fac0-609f-4bc8-ad57-52c189f06797/state, runId = 05a4b3af-f02c-40f8-9ff9-a3e18bae496f, opId = 0, ver = 0, numPartitions = 5]
         +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#18L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#18L])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *MapElements <function1>, obj#5: java.lang.String
                        +- *DeserializeToObject value#30.toString, obj#4: java.lang.String
                           +- LocalTableScan [value#30]
```

After this PR, we get the following output if users set `spark.redaction.string.regex` to `file:/[\\w_]+`:

```
== Physical Plan ==
*HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#12L])
+- StateStoreSave [value#6], state info [ checkpoint = *********(redacted)/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-e7da9b7d-3ec0-474d-8b8c-927f7d12ed72/state, runId = 8a9c3761-93d5-4896-ab82-14c06240dcea, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#32L])
      +- StateStoreRestore [value#6], state info [ checkpoint = *********(redacted)/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-e7da9b7d-3ec0-474d-8b8c-927f7d12ed72/state, runId = 8a9c3761-93d5-4896-ab82-14c06240dcea, opId = 0, ver = 0, numPartitions = 5]
         +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#32L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#32L])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *MapElements <function1>, obj#5: java.lang.String
                        +- *DeserializeToObject value#27.toString, obj#4: java.lang.String
                           +- LocalTableScan [value#27]
```

## How was this patch tested?

Added a test case.

Author: gatorsmile <[email protected]>

Closes #19985 from gatorsmile/redactPlan.
1 parent 571aa27 commit 2831571
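Trying the redaction end to end is straightforward. The following is a minimal, hypothetical sketch; the session setup, output path, and regex are illustrative, not part of this commit:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo: any substring matching the regex below is replaced with
// "*********(redacted)" in explain output.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("redaction-demo")
  .config("spark.redaction.string.regex", "file:/[\\w_]+")
  .getOrCreate()

val df = spark.range(0, 10).toDF("a")
df.write.mode("overwrite").parquet("/tmp/redaction_demo") // illustrative path
spark.read.parquet("/tmp/redaction_demo").explain()       // file location appears redacted
```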

File tree

6 files changed: 105 additions, 11 deletions

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 20 additions & 6 deletions
@@ -2650,15 +2650,29 @@ private[spark] object Utils extends Logging {
     redact(redactionPattern, kvs)
   }
 
+  /**
+   * Redact the sensitive values in the given map. If a map key matches the redaction pattern then
+   * its value is replaced with a dummy text.
+   */
+  def redact(regex: Option[Regex], kvs: Seq[(String, String)]): Seq[(String, String)] = {
+    regex match {
+      case None => kvs
+      case Some(r) => redact(r, kvs)
+    }
+  }
+
   /**
    * Redact the sensitive information in the given string.
    */
-  def redact(conf: SparkConf, text: String): String = {
-    if (text == null || text.isEmpty || conf == null || !conf.contains(STRING_REDACTION_PATTERN)) {
-      text
-    } else {
-      val regex = conf.get(STRING_REDACTION_PATTERN).get
-      regex.replaceAllIn(text, REDACTION_REPLACEMENT_TEXT)
+  def redact(regex: Option[Regex], text: String): String = {
+    regex match {
+      case None => text
+      case Some(r) =>
+        if (text == null || text.isEmpty) {
+          text
+        } else {
+          r.replaceAllIn(text, REDACTION_REPLACEMENT_TEXT)
+        }
     }
   }
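To make the new `Option[Regex]` contract concrete, here is a small self-contained sketch of the string overload's behavior. `Replacement` is a stand-in for Spark's private `REDACTION_REPLACEMENT_TEXT` constant and is an assumption of this sketch:

```scala
import scala.util.matching.Regex

object RedactSketch {
  // Stand-in for Spark's private REDACTION_REPLACEMENT_TEXT constant (assumption).
  private val Replacement = "*********(redacted)"

  // Mirrors the shape of Utils.redact(Option[Regex], String):
  // None means "no redaction configured", so the text passes through untouched.
  def redact(regex: Option[Regex], text: String): String = regex match {
    case None => text
    case Some(r) =>
      if (text == null || text.isEmpty) text
      else r.replaceAllIn(text, Replacement)
  }

  def main(args: Array[String]): Unit = {
    val pattern = Some("file:/[\\w_]+".r)
    println(redact(pattern, "checkpoint = file:/private/var/state")) // prefix masked
    println(redact(None, "checkpoint = file:/private/var/state"))    // unchanged
  }
}
```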

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable
+import scala.util.matching.Regex
 
 import org.apache.hadoop.fs.Path
 
@@ -1035,6 +1036,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val SQL_STRING_REDACTION_PATTERN =
+    ConfigBuilder("spark.sql.redaction.string.regex")
+      .doc("Regex to decide which parts of strings produced by Spark contain sensitive " +
+        "information. When this regex matches a string part, that string part is replaced by a " +
+        "dummy value. This is currently used to redact the output of SQL explain commands. " +
+        "When this conf is not set, the value from `spark.redaction.string.regex` is used.")
+      .fallbackConf(org.apache.spark.internal.config.STRING_REDACTION_PATTERN)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -1173,6 +1182,8 @@ class SQLConf extends Serializable with Logging {
 
   def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS)
 
+  def stringRedationPattern: Option[Regex] = SQL_STRING_REDACTION_PATTERN.readFrom(reader)
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
   * identifiers are equal.
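A hedged sketch of the fallback behavior this conf enables, assuming an active SparkSession named `spark` and data written at an illustrative path:

```scala
// If the SQL-level conf is set, it wins for explain redaction in this session.
spark.conf.set("spark.sql.redaction.string.regex", "(?i)FileScan")
spark.read.parquet("/tmp/redaction_demo").explain() // "FileScan" is masked

// If it is unset, redaction falls back to the core spark.redaction.string.regex value.
spark.conf.unset("spark.sql.redaction.string.regex")
spark.read.parquet("/tmp/redaction_demo").explain() // core regex (if any) applies
```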

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 1 addition & 1 deletion
@@ -69,7 +69,7 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
    * Shorthand for calling redactString() without specifying redacting rules
    */
   private def redact(text: String): String = {
-    Utils.redact(SparkSession.getActiveSession.map(_.sparkContext.conf).orNull, text)
+    Utils.redact(sqlContext.sessionState.conf.stringRedationPattern, text)
   }
 }
 
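Because the redaction pattern is now read from the session-local SQLConf instead of the shared SparkContext conf, two sessions can redact differently. A hypothetical sketch, assuming an existing SparkSession `spark` and the same illustrative path:

```scala
val s1 = spark.newSession()
val s2 = spark.newSession()
s1.conf.set("spark.sql.redaction.string.regex", "(?i)FileScan")

s1.read.parquet("/tmp/redaction_demo").explain() // "FileScan" masked in s1 only
s2.read.parquet("/tmp/redaction_demo").explain() // s2 output is unaffected
```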

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 10 additions & 3 deletions
@@ -194,13 +194,13 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     }
   }
 
-  def simpleString: String = {
+  def simpleString: String = withRedaction {
     s"""== Physical Plan ==
        |${stringOrError(executedPlan.treeString(verbose = false))}
     """.stripMargin.trim
   }
 
-  override def toString: String = {
+  override def toString: String = withRedaction {
     def output = Utils.truncatedString(
       analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
     val analyzedPlan = Seq(
@@ -219,7 +219,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     """.stripMargin.trim
   }
 
-  def stringWithStats: String = {
+  def stringWithStats: String = withRedaction {
     // trigger to compute stats for logical plans
     optimizedPlan.stats
 
@@ -231,6 +231,13 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     """.stripMargin.trim
   }
 
+  /**
+   * Redact the sensitive information in the given string.
+   */
+  private def withRedaction(message: String): String = {
+    Utils.redact(sparkSession.sessionState.conf.stringRedationPattern, message)
+  }
+
   /** A special namespace for commands that can be used to debug query execution. */
   // scalastyle:off
   object debug {
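All three string renderings now funnel through the same `withRedaction` choke point. A hedged sketch of observing that from user code, assuming a session with a redaction regex configured and the illustrative path from earlier:

```scala
// Each of these passes through withRedaction before being returned.
val qe = spark.read.parquet("/tmp/redaction_demo").queryExecution
println(qe.simpleString)    // physical plan only, redacted
println(qe.toString)        // parsed/analyzed/optimized/physical plans, redacted
println(qe.stringWithStats) // plan with statistics, redacted
```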

sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala

Lines changed: 31 additions & 0 deletions
@@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
 /**
@@ -52,4 +53,34 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
       assert(df.queryExecution.simpleString.contains(replacement))
     }
   }
+
+  private def isIncluded(queryExecution: QueryExecution, msg: String): Boolean = {
+    queryExecution.toString.contains(msg) ||
+      queryExecution.simpleString.contains(msg) ||
+      queryExecution.stringWithStats.contains(msg)
+  }
+
+  test("explain is redacted using SQLConf") {
+    withTempDir { dir =>
+      val basePath = dir.getCanonicalPath
+      spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+      val df = spark.read.parquet(basePath)
+      val replacement = "*********"
+
+      // Respect SparkConf and replace file:/
+      assert(isIncluded(df.queryExecution, replacement))
+
+      assert(isIncluded(df.queryExecution, "FileScan"))
+      assert(!isIncluded(df.queryExecution, "file:/"))
+
+      withSQLConf(SQLConf.SQL_STRING_REDACTION_PATTERN.key -> "(?i)FileScan") {
+        // Respect SQLConf and replace FileScan
+        assert(isIncluded(df.queryExecution, replacement))
+
+        assert(!isIncluded(df.queryExecution, "FileScan"))
+        assert(isIncluded(df.queryExecution, "file:/"))
+      }
+    }
+  }
 }

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 32 additions & 1 deletion
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
@@ -418,6 +418,37 @@ class StreamSuite extends StreamTest {
     assert(OutputMode.Update === InternalOutputModes.Update)
   }
 
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set("spark.redaction.string.regex", "file:/[\\w_]+")
+
+  test("explain - redaction") {
+    val replacement = "*********"
+
+    val inputData = MemoryStream[String]
+    val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*"))
+    // Test StreamingQuery.display
+    val q = df.writeStream.queryName("memory_explain").outputMode("complete").format("memory")
+      .start()
+      .asInstanceOf[StreamingQueryWrapper]
+      .streamingQuery
+    try {
+      inputData.addData("abc")
+      q.processAllAvailable()
+
+      val explainWithoutExtended = q.explainInternal(false)
+      assert(explainWithoutExtended.contains(replacement))
+      assert(explainWithoutExtended.contains("StateStoreRestore"))
+      assert(!explainWithoutExtended.contains("file:/"))
+
+      val explainWithExtended = q.explainInternal(true)
+      assert(explainWithExtended.contains(replacement))
+      assert(explainWithExtended.contains("StateStoreRestore"))
+      assert(!explainWithExtended.contains("file:/"))
+    } finally {
+      q.stop()
+    }
+  }
+
   test("explain") {
     val inputData = MemoryStream[String]
     val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*"))