Scruggs, Matt
2017-08-14 21:31:15 UTC
Howdy,
I'm running SimilarityAnalysis.cooccurrencesIDSs on a fairly small dataset (about 870k [user, item] rows in the primary action IDS… no cross co-occurrence IDS) and I noticed it scales strangely. This is with Mahout 0.13.0, although the same behavior happens in 0.12.x as well (I haven't tested earlier versions).
TL;DR: regardless of the Spark parallelism (CPU count) I throw at this routine, every Spark task within the final (busy) stage seems to take the same amount of time, which leads me to guess that every shuffle partition contains roughly the same amount of data (perhaps each one is the full dataset matrix in shape/size, albeit with different values). I'm reaching out to see whether this is a known algorithmic complexity issue in this routine, or whether my config is to blame (or both).
Regarding our hardware, we have identical physical machines in a Mesos cluster with 6 workers and a few masters. Each worker has ~500 GB of SSD, 32 cores and 128 GB RAM. We run lots of Spark jobs and have generally ironed out the kinks in terms of hardware and cluster config, so I don't suspect any hardware-related issues.
Here are some timings for SimilarityAnalysis.cooccurrencesIDSs on this dataset with maxNumInteractions = 500, maxInterestingItemsPerThing = 20, randomSeed = default, parOpts = default (there's lots of other Spark config, this is just what I'm varying to check for effects). In particular, notice how the ratio of (spark.sql.shuffle.partitions / spark.cores.max) affects the runtime:
* 8 executors w/8 cores each: takes about 45 minutes
  (note that spark.sql.shuffle.partitions > spark.cores.max)
    spark.cores.max = 64
    spark.executor.cores = 8
    spark.sql.shuffle.partitions = 200 (default)
* 1 executor w/24 cores: takes about 65 minutes
  (note that spark.sql.shuffle.partitions >>> spark.cores.max)
    spark.cores.max = 24
    spark.executor.cores = 24
    spark.sql.shuffle.partitions = 200 (default)
* 1 executor w/8 cores: takes about 8 minutes
  (note that spark.sql.shuffle.partitions = spark.cores.max)
    spark.cores.max = 8
    spark.executor.cores = 8
    spark.sql.shuffle.partitions = 8
* 1 executor w/24 cores: takes about 8 minutes (same as 8 cores!)
  (note that spark.sql.shuffle.partitions = spark.cores.max)
    spark.cores.max = 24
    spark.executor.cores = 24
    spark.sql.shuffle.partitions = 24
* 32 executors w/2 cores each: takes about 8 minutes (same as 8 cores!)
  (note that spark.sql.shuffle.partitions ≈ spark.cores.max)
    spark.cores.max = 64
    spark.executor.cores = 2
    spark.sql.shuffle.partitions = 88 (results in 64 tasks for the final stage)
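For reference, here's roughly how the fastest configuration above gets wired up on our end. This is a sketch, not our exact job: `readUserItemPairs` is a placeholder for our own loading code, the Mesos master URL is elided, and I'm going from memory on the `mahoutSparkContext` signature:

```scala
// Sketch of the Spark/Mahout wiring for the "1 executor w/8 cores" run.
// readUserItemPairs is a placeholder for our own IDS-building code.
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset.IndexedDataset
import org.apache.mahout.sparkbindings._
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cores.max", "8")
  .set("spark.executor.cores", "8")
  .set("spark.sql.shuffle.partitions", "8") // keep this equal to cores.max

implicit val sdc = mahoutSparkContext(
  masterUrl = "mesos://...", // elided
  appName = "cooccurrence",
  sparkConf = conf)

val primaryIDS: IndexedDataset = readUserItemPairs(sdc) // ~870k [user, item] rows

val models = SimilarityAnalysis.cooccurrencesIDSs(
  Array(primaryIDS),                 // no cross co-occurrence IDSs
  maxInterestingItemsPerThing = 20,
  maxNumInteractions = 500)
```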
Adjusting the "maxNumInteractions" parameter down to 100 and then 50 yields only a minor improvement (5-10%). I've also played around with removing [user, item] rows from the input dataset for users with only 1 interaction… I read a suggestion to try that in another thread… that yielded maybe a 40-50% speed improvement, but I'd rather not toss out data (unless it truly is totally useless, of course :D ).
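For the single-interaction pruning I mentioned, this is the kind of pre-filter I tried, shown here on plain collections for clarity (the real job does the equivalent groupBy/filter on the input RDD before building the IDS):

```scala
// Drop [user, item] rows for users with only a single interaction.
// Shown on plain Scala collections; the real job runs this on an RDD.
val interactions = Seq(
  ("u1", "itemA"), ("u1", "itemB"), // u1 has 2 interactions -> kept
  ("u2", "itemC"),                  // u2 has 1 interaction  -> dropped
  ("u3", "itemA"), ("u3", "itemC")) // u3 has 2 interactions -> kept

// Count interactions per user, then keep only multi-interaction users.
val countsByUser = interactions.groupBy(_._1).map { case (u, xs) => u -> xs.size }
val pruned = interactions.filter { case (user, _) => countsByUser(user) > 1 }
```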
When I look at the thread dumps in the Spark UI (Executors -> Thread Dump), it seems all the executors spend >95% of the run in the code pasted below. GC throughput is very good, so we're not bogged down there... it's just super busy running the code below. I'm intrigued by the comments on the SequentialAccessSparseVector methods I see being called (getQuick and setQuick), which state they take O(log n) time (https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/math/src/main/java/org/apache/mahout/math/SequentialAccessSparseVector.java).
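If I'm reading the source right, that O(log n) comes from OrderedIntDoubleMapping.find doing a binary search over the vector's sorted index array on every single element access, so a dense assign over the accumulated matrix pays that log factor per element. A toy sketch of the lookup (my own simplification, not the actual Mahout code):

```scala
// Simplified version of what OrderedIntDoubleMapping.get does:
// binary-search the sorted indices array; return the paired value,
// or 0.0 when the index isn't present (unset sparse element).
def sparseGet(indices: Array[Int], values: Array[Double], i: Int): Double = {
  var lo = 0
  var hi = indices.length - 1
  while (lo <= hi) {
    val mid = (lo + hi) >>> 1
    if (indices(mid) == i) return values(mid)
    else if (indices(mid) < i) lo = mid + 1
    else hi = mid - 1
  }
  0.0 // default for an element not stored in the sparse vector
}

val idx  = Array(1, 4, 9)
val vals = Array(2.0, 3.0, 5.0)
sparseGet(idx, vals, 4) // present  -> 3.0
sparseGet(idx, vals, 5) // absent   -> 0.0
```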
Thanks all for your time and feedback!
Matt Scruggs
org.apache.mahout.math.OrderedIntDoubleMapping.find(OrderedIntDoubleMapping.java:105)
org.apache.mahout.math.OrderedIntDoubleMapping.get(OrderedIntDoubleMapping.java:110)
org.apache.mahout.math.SequentialAccessSparseVector.getQuick(SequentialAccessSparseVector.java:157)
org.apache.mahout.math.SparseRowMatrix.getQuick(SparseRowMatrix.java:90)
org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:86)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
…or this code…
org.apache.mahout.math.SparseRowMatrix.setQuick(SparseRowMatrix.java:105)
org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:86)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)