Skip to content

Commit 8f44edf

Browse files
committed
fix.
1 parent 40de176 commit 8f44edf

File tree

2 files changed

+40
-4
lines changed

2 files changed

+40
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,13 +194,13 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
194194
}
195195
}
196196

197-
def simpleString: String = {
197+
/**
 * Renders the executed physical plan (non-verbose tree string) under a
 * "== Physical Plan ==" header, passing the result through `withRedaction`.
 */
def simpleString: String = withRedaction {
  // stringOrError wraps the tree-string rendering, exactly as in the inline form.
  val physicalPlan = stringOrError(executedPlan.treeString(verbose = false))
  val rendered =
    s"""== Physical Plan ==
       |$physicalPlan
     """.stripMargin
  rendered.trim
}
202202

203-
override def toString: String = {
203+
override def toString: String = withRedaction {
204204
def output = Utils.truncatedString(
205205
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
206206
val analyzedPlan = Seq(
@@ -219,7 +219,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
219219
""".stripMargin.trim
220220
}
221221

222-
def stringWithStats: String = {
222+
def stringWithStats: String = withRedaction {
223223
// trigger to compute stats for logical plans
224224
optimizedPlan.stats
225225

@@ -231,6 +231,13 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
231231
""".stripMargin.trim
232232
}
233233

234+
/**
 * Redact the sensitive information in the given string.
 *
 * The redaction settings are taken from the SparkConf of the `sparkSession` that owns
 * this QueryExecution, not from the thread's active session. The active session is
 * optional and thread-local, so relying on it could hand a null conf to `Utils.redact`
 * or pick up a session other than the one this plan belongs to.
 *
 * @param message the text to redact; passed by name and evaluated when it is handed to
 *                `Utils.redact`
 * @return the value produced by `Utils.redact` for `message`
 */
private def withRedaction(message: => String): String = {
  Utils.redact(sparkSession.sparkContext.conf, message)
}
240+
234241
/** A special namespace for commands that can be used to debug query execution. */
235242
// scalastyle:off
236243
object debug {

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.UncheckedExecutionException
2828
import org.apache.commons.io.FileUtils
2929
import org.apache.hadoop.conf.Configuration
3030

31-
import org.apache.spark.SparkContext
31+
import org.apache.spark.{SparkConf, SparkContext}
3232
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
3333
import org.apache.spark.sql._
3434
import org.apache.spark.sql.catalyst.plans.logical.Range
@@ -418,6 +418,35 @@ class StreamSuite extends StreamTest {
418418
assert(OutputMode.Update === InternalOutputModes.Update)
419419
}
420420

421+
/**
 * Extends the inherited SparkConf with a string-redaction regex so that tests in this
 * suite can check that text matching `file:/[\w_]+` is redacted.
 */
override protected def sparkConf: SparkConf = {
  val redactionPattern = "file:/[\\w_]+"
  super.sparkConf.set("spark.redaction.string.regex", redactionPattern)
}
423+
424+
test("explain - redaction") {
  // Marker expected in the output wherever the configured redaction regex matched.
  val replacement = "*********"
  // Prefix targeted by the "spark.redaction.string.regex" value set in sparkConf; it
  // must no longer be present once redaction has been applied.
  val sensitivePrefix = "file:/"

  val inputData = MemoryStream[String]
  val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*"))
  // Exercise explainInternal on the underlying streaming query of a memory sink.
  val q = df.writeStream.queryName("memory_explain").outputMode("complete").format("memory")
    .start()
    .asInstanceOf[StreamingQueryWrapper]
    .streamingQuery
  try {
    inputData.addData("abc")
    q.processAllAvailable()

    val explainWithoutExtended = q.explainInternal(false)
    assert(explainWithoutExtended.contains(replacement))
    assert(explainWithoutExtended.contains("StateStoreRestore"))
    // Seeing the marker is not enough: the matched text itself must be gone.
    assert(!explainWithoutExtended.contains(sensitivePrefix))

    val explainWithExtended = q.explainInternal(true)
    assert(explainWithExtended.contains(replacement))
    assert(explainWithExtended.contains("StateStoreRestore"))
    assert(!explainWithExtended.contains(sensitivePrefix))
  } finally {
    // Always stop the query so it does not outlive the test.
    q.stop()
  }
}
449+
421450
test("explain") {
422451
val inputData = MemoryStream[String]
423452
val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*"))

0 commit comments

Comments
 (0)