
Commit ddc9905

yifeih authored and bulldozer-bot[bot] committed
SPARK-25299: Add rest of shuffle writer benchmarks (apache-spark-on-k8s#507)
1 parent 5cf18c4 commit ddc9905

File tree

5 files changed: +228 lines, -53 lines
core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala

Lines changed: 93 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.sort

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.util.Utils

/**
 * Benchmark to measure performance of BypassMergeSortShuffleWriter.
 * {{{
 *   To run this benchmark:
 *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/<this class>-results.txt".
 * }}}
 */
object BypassMergeSortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {

  private val shuffleHandle: BypassMergeSortShuffleHandle[String, String] =
    new BypassMergeSortShuffleHandle[String, String](
      shuffleId = 0,
      numMaps = 1,
      dependency)

  private val MIN_NUM_ITERS = 10
  private val DATA_SIZE_SMALL = 1000
  private val DATA_SIZE_LARGE =
    PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 4 / DEFAULT_DATA_STRING_SIZE

  def getWriter(transferTo: Boolean): BypassMergeSortShuffleWriter[String, String] = {
    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.file.transferTo", String.valueOf(transferTo))
    conf.set("spark.shuffle.file.buffer", "32k")

    val shuffleWriter = new BypassMergeSortShuffleWriter[String, String](
      blockManager,
      blockResolver,
      shuffleHandle,
      0,
      conf,
      taskContext.taskMetrics().shuffleWriteMetrics
    )

    shuffleWriter
  }

  def writeBenchmarkWithLargeDataset(): Unit = {
    val size = DATA_SIZE_LARGE
    val benchmark = new Benchmark(
      "BypassMergeSortShuffleWrite with spill",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)

    addBenchmarkCase(benchmark, "without transferTo", size, () => getWriter(false))
    addBenchmarkCase(benchmark, "with transferTo", size, () => getWriter(true))
    benchmark.run()
  }

  def writeBenchmarkWithSmallDataset(): Unit = {
    val size = DATA_SIZE_SMALL
    val benchmark = new Benchmark("BypassMergeSortShuffleWrite without spill",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)
    addBenchmarkCase(benchmark, "small dataset without disk spill", size, () => getWriter(false))
    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("BypassMergeSortShuffleWriter write") {
      writeBenchmarkWithSmallDataset()
      writeBenchmarkWithLargeDataset()
    }
  }
}
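
The new benchmark objects in this commit are driven by Spark's org.apache.spark.benchmark.Benchmark harness. For orientation, here is a minimal sketch of how that harness is used; it is not part of this commit, the object name and workload are invented, and only addTimerCase, startTiming, stopTiming, and run() are calls actually exercised by the files in this diff.

package org.apache.spark.shuffle.sort

import org.apache.spark.benchmark.Benchmark

// Hypothetical standalone sketch of the Benchmark/Timer API used above.
// addTimerCase hands each case a timer so per-iteration setup and teardown
// can be excluded from the measured time.
object BenchmarkApiSketch {
  def main(args: Array[String]): Unit = {
    val benchmark = new Benchmark("example", 1000, minNumIters = 10)
    benchmark.addTimerCase("sum an int array") { timer =>
      val data = Array.tabulate(1000)(identity)  // untimed per-iteration setup
      timer.startTiming()
      val total = data.sum                       // only this region is timed
      timer.stopTiming()
    }
    benchmark.run()
  }
}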

core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleWriterBenchmarkBase.scala

Lines changed: 20 additions & 4 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle.sort
 
-import java.io.{BufferedInputStream, Closeable, File, FileInputStream, FileOutputStream}
+import java.io.File
 import java.util.UUID
 
 import org.apache.commons.io.FileUtils
@@ -35,7 +35,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.serializer.{KryoSerializer, Serializer, SerializerManager}
-import org.apache.spark.shuffle.IndexShuffleBlockResolver
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter}
 import org.apache.spark.storage.{BlockManager, DiskBlockManager, TempShuffleBlockId}
 import org.apache.spark.util.Utils
 
@@ -121,10 +121,26 @@ abstract class ShuffleWriterBenchmarkBase extends BenchmarkBase {
       blockManager)
   }
 
-  def addBenchmarkCase(benchmark: Benchmark, name: String)(func: Benchmark.Timer => Unit): Unit = {
+  def addBenchmarkCase(
+      benchmark: Benchmark,
+      name: String,
+      size: Int,
+      writerSupplier: () => ShuffleWriter[String, String],
+      numSpillFiles: Option[Int] = Option.empty): Unit = {
     benchmark.addTimerCase(name) { timer =>
       setup()
-      func(timer)
+      val writer = writerSupplier()
+      val dataIterator = createDataIterator(size)
+      try {
+        timer.startTiming()
+        writer.write(dataIterator)
+        timer.stopTiming()
+        if (numSpillFiles.isDefined) {
+          assert(tempFilesCreated.length == numSpillFiles.get)
+        }
+      } finally {
+        writer.stop(true)
+      }
       teardown()
     }
   }
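
The reworked addBenchmarkCase relies on base-class members that sit outside this hunk: setup(), teardown(), tempFilesCreated, and createDataIterator. A rough sketch of the shape those members might take, assuming the iterator yields fixed-width random string pairs; the bodies and the DEFAULT_DATA_STRING_SIZE value are assumptions rather than the committed code.

package org.apache.spark.shuffle.sort

import java.io.File

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Assumed shapes of the base-class members referenced by addBenchmarkCase;
// the real ShuffleWriterBenchmarkBase in this PR may differ.
trait ShuffleWriterBenchmarkHarnessSketch {
  // Width of each generated key/value string (assumed value).
  protected val DEFAULT_DATA_STRING_SIZE: Int = 5

  // Temp shuffle/spill files observed during one benchmark case; the
  // numSpillFiles assertion in addBenchmarkCase checks this buffer's length.
  protected val tempFilesCreated: ArrayBuffer[File] = new ArrayBuffer[File]

  // Produces `size` random records of the fixed string width.
  protected def createDataIterator(size: Int): Iterator[Product2[String, String]] = {
    val random = new Random(42)
    Iterator.fill(size) {
      val str = random.alphanumeric.take(DEFAULT_DATA_STRING_SIZE).mkString
      (str, str)
    }
  }

  // Clear per-case state before the timed region runs.
  protected def setup(): Unit = tempFilesCreated.clear()

  // Remove any files the writer left behind after the timed region.
  protected def teardown(): Unit = tempFilesCreated.foreach(_.delete())
}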

core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala

Lines changed: 20 additions & 49 deletions
@@ -92,74 +92,45 @@ object SortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {
       size,
       minNumIters = MIN_NUM_ITERS,
       output = output)
-    addBenchmarkCase(benchmark, "small dataset without spills") { timer =>
-      val shuffleWriter = getWriter(Option.empty, Option.empty)
-      val dataIterator = createDataIterator(size)
-      try {
-        timer.startTiming()
-        shuffleWriter.write(dataIterator)
-        timer.stopTiming()
-        assert(tempFilesCreated.isEmpty)
-      } finally {
-        shuffleWriter.stop(true)
-      }
-    }
+    addBenchmarkCase(benchmark,
+      "small dataset without spills",
+      size,
+      () => getWriter(Option.empty, Option.empty),
+      Some(0))
     benchmark.run()
   }
 
   def writeBenchmarkWithSpill(): Unit = {
     val size = DATA_SIZE_LARGE
-
     val benchmark = new Benchmark("SortShuffleWriter with spills",
       size,
       minNumIters = MIN_NUM_ITERS,
      output = output,
       outputPerIteration = true)
-    addBenchmarkCase(benchmark, "no map side combine") { timer =>
-      val shuffleWriter = getWriter(Option.empty, Option.empty)
-      val dataIterator = createDataIterator(size)
-      try {
-        timer.startTiming()
-        shuffleWriter.write(dataIterator)
-        timer.stopTiming()
-        assert(tempFilesCreated.length == 7)
-      } finally {
-        shuffleWriter.stop(true)
-      }
-    }
+    addBenchmarkCase(benchmark,
+      "no map side combine",
+      size,
+      () => getWriter(Option.empty, Option.empty),
+      Some(7))
 
     def createCombiner(i: String): String = i
     def mergeValue(i: String, j: String): String = if (Ordering.String.compare(i, j) > 0) i else j
     def mergeCombiners(i: String, j: String): String =
       if (Ordering.String.compare(i, j) > 0) i else j
     val aggregator =
       new Aggregator[String, String, String](createCombiner, mergeValue, mergeCombiners)
-    addBenchmarkCase(benchmark, "with map side aggregation") { timer =>
-      val shuffleWriter = getWriter(Some(aggregator), Option.empty)
-      val dataIterator = createDataIterator(size)
-      try {
-        timer.startTiming()
-        shuffleWriter.write(dataIterator)
-        timer.stopTiming()
-        assert(tempFilesCreated.length == 7)
-      } finally {
-        shuffleWriter.stop(true)
-      }
-    }
+    addBenchmarkCase(benchmark,
+      "with map side aggregation",
+      size,
+      () => getWriter(Some(aggregator), Option.empty),
+      Some(7))
 
     val sorter = Ordering.String
-    addBenchmarkCase(benchmark, "with map side sort") { timer =>
-      val shuffleWriter = getWriter(Option.empty, Some(sorter))
-      val dataIterator = createDataIterator(size)
-      try {
-        timer.startTiming()
-        shuffleWriter.write(dataIterator)
-        timer.stopTiming()
-        assert(tempFilesCreated.length == 7)
-      } finally {
-        shuffleWriter.stop(true)
-      }
-    }
+    addBenchmarkCase(benchmark,
+      "with map side sort",
+      size,
+      () => getWriter(Option.empty, Some(sorter)),
+      Some(7))
     benchmark.run()
   }
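
One detail worth calling out in the "with map side aggregation" case: the combiner functions defined in this file keep the lexicographically larger of two strings for a key. A tiny illustrative check, not part of the commit:

// mergeValue as defined above: keep the lexicographically larger string.
def mergeValue(i: String, j: String): String =
  if (Ordering.String.compare(i, j) > 0) i else j

assert(mergeValue("apple", "banana") == "banana")  // "banana" sorts after "apple"
assert(mergeValue("pear", "banana") == "pear")     // "pear" sorts after "banana"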

core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala

Lines changed: 91 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.shuffle.sort

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.util.Utils

/**
 * Benchmark to measure performance of UnsafeShuffleWriter.
 * {{{
 *   To run this benchmark:
 *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/<this class>-results.txt".
 * }}}
 */
object UnsafeShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {

  private val shuffleHandle: SerializedShuffleHandle[String, String] =
    new SerializedShuffleHandle[String, String](0, 0, this.dependency)

  private val MIN_NUM_ITERS = 10
  private val DATA_SIZE_SMALL = 1000
  private val DATA_SIZE_LARGE =
    PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 2 / DEFAULT_DATA_STRING_SIZE

  def getWriter(transferTo: Boolean): UnsafeShuffleWriter[String, String] = {
    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.file.transferTo", String.valueOf(transferTo))

    new UnsafeShuffleWriter[String, String](
      blockManager,
      blockResolver,
      taskMemoryManager,
      shuffleHandle,
      0,
      taskContext,
      conf,
      taskContext.taskMetrics().shuffleWriteMetrics
    )
  }

  def writeBenchmarkWithSmallDataset(): Unit = {
    val size = DATA_SIZE_SMALL
    val benchmark = new Benchmark("UnsafeShuffleWriter without spills",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)
    addBenchmarkCase(benchmark,
      "small dataset without spills",
      size,
      () => getWriter(false),
      Some(1)) // The single temp file is for the temp index file
    benchmark.run()
  }

  def writeBenchmarkWithSpill(): Unit = {
    val size = DATA_SIZE_LARGE
    val benchmark = new Benchmark("UnsafeShuffleWriter with spills",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output,
      outputPerIteration = true)
    addBenchmarkCase(benchmark, "without transferTo", size, () => getWriter(false), Some(7))
    addBenchmarkCase(benchmark, "with transferTo", size, () => getWriter(true), Some(7))
    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("UnsafeShuffleWriter write") {
      writeBenchmarkWithSmallDataset()
      writeBenchmarkWithSpill()
    }
  }
}
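
For a sense of scale, DATA_SIZE_LARGE is derived from PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES (1 << 27 bytes, i.e. 128 MB, in Spark) divided by the per-record string width. A back-of-the-envelope sketch, assuming a hypothetical DEFAULT_DATA_STRING_SIZE of 5; the real constant lives in ShuffleWriterBenchmarkBase and is not shown in this diff:

// Rough sizing arithmetic for the "with spills" cases (assumed constants).
val maximumPageSizeBytes = 1 << 27    // PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, 128 MB
val defaultDataStringSize = 5         // hypothetical DEFAULT_DATA_STRING_SIZE
val dataSizeLarge = maximumPageSizeBytes / 2 / defaultDataStringSize
// With these assumptions, roughly 13.4 million records per iteration,
// enough to push the writer past a single in-memory page and force spills.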

dev/run-spark-25299-benchmarks.sh

Lines changed: 4 additions & 0 deletions
@@ -50,12 +50,16 @@ done
 
 echo "Running SPARK-25299 benchmarks"
 
+SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriterBenchmark"
 SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.SortShuffleWriterBenchmark"
+SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.UnsafeShuffleWriterBenchmark"
 
 SPARK_DIR=`pwd`
 
 mkdir -p /tmp/artifacts
+cp $SPARK_DIR/sql/core/benchmarks/BypassMergeSortShuffleWriterBenchmark-results.txt /tmp/artifacts/
 cp $SPARK_DIR/sql/core/benchmarks/SortShuffleWriterBenchmark-results.txt /tmp/artifacts/
+cp $SPARK_DIR/sql/core/benchmarks/UnsafeShuffleWriterBenchmark-results.txt /tmp/artifacts/
 
 if [ "$UPLOAD" = false ]; then
   exit 0
