Discussion:
Error with spark-mahout when running spark-submit in cluster mode
Jaume Galí
2018-08-01 13:54:11 UTC
Hi everybody, I'm trying to build a basic recommender with Spark and Mahout in Scala. I used the following Mahout fork to compile Mahout against Scala 2.11 and Spark 2.1.2: mahout_fork <https://github.com/actionml/mahout/tree/sparse-speedup-13.0>
I run my code with spark-submit. It works fine when I pass --master local, but when I try to run it on the cluster with --master spark://vagrant-ubuntu-trusty-64:7077 it always fails with the same error.

Command (runs fine):

/opt/spark/bin/spark-submit \
--class 'com.reco.GenerateIndicator' \
--name recomender \
--master local \
target/scala-2.11/recomender-0.0.1.jar
Command (fails with the error below):

/opt/spark/bin/spark-submit \
--class 'com.reco.GenerateIndicator' \
--name recomender \
--master spark://vagrant-ubuntu-trusty-64:7077 \
target/scala-2.11/recomender-0.0.1.jar
Dependencies in build.sbt:

name := "recomender"
version := "0.0.1"
scalaVersion := "2.11.11"
val mahoutVersion = "0.13.0"
val sparkVersion = "2.1.2"

libraryDependencies ++= {
  Seq(
    "org.apache.spark" %% "spark-core"  % sparkVersion % "provided",
    "org.apache.spark" %% "spark-sql"   % sparkVersion % "provided",
    "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
    /* Mahout */
    "org.apache.mahout" %% "mahout-spark" % mahoutVersion
      exclude("org.apache.spark", "spark-core_2.11")
      exclude("org.apache.spark", "spark-sql_2.11"),
    "org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
    "org.apache.mahout" %  "mahout-math" % mahoutVersion,
    "org.apache.mahout" %  "mahout-hdfs" % mahoutVersion
      exclude("com.thoughtworks.xstream", "xstream")
      exclude("org.apache.hadoop", "hadoop-client")
  )
}

resolvers += "Local Repository" at "file://"+baseDirectory.value / "repo"
resolvers += Resolver.mavenLocal
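
Note that only the Spark artifacts are marked "provided" here, so the Mahout classes must reach the executors some other way when running against a real cluster. One option is a fat jar; below is a minimal sketch assuming the sbt-assembly plugin (the version number is an assumption) in project/plugins.sbt:

// project/plugins.sbt (assumed plugin version)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

// build.sbt additions: pick one copy of files that several dependency jars ship
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}

Running sbt assembly would then produce target/scala-2.11/recomender-assembly-0.0.1.jar, which could be passed to spark-submit in place of the plain jar.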



Main class:

package com.reco

import org.apache.mahout.sparkbindings.SparkDistributedContext
import org.apache.mahout.sparkbindings._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object GenerateIndicator {

  def main(args: Array[String]) {
    try {

      // Create spark-conf
      val sparkConf = new SparkConf().setAppName("recomender")

      implicit val mahoutCtx: SparkDistributedContext = mahoutSparkContext(
        masterUrl = sparkConf.get("spark.master"),
        appName = "recomender",
        sparkConf = sparkConf,
        // addMahoutJars = true,
        addMahoutJars = false
      )

      implicit val sdc: SparkDistributedContext = sc2sdc(mahoutCtx)

      val sparkSession = SparkSession
        .builder()
        .appName("recomender")
        .config(sparkConf)
        .getOrCreate()

      val lines = returnData()

      val linesRdd = sdc.sc.parallelize(lines)

      println("...Collecting...")

      linesRdd.collect().foreach { item => // ERROR HERE! on collect()
        println(item)
      }

      // Destroy Spark Session
      sparkSession.stop()
      sparkSession.close()

    } catch {
      case e: Exception =>
        println(e)
        throw new Exception(e)
    }
  }

  def returnData(): Array[String] = {
    val lines = Array(
      "17,Action",
      "17,Comedy",
      "17,Crime",
      "17,Horror",
      "17,Thriller",
      "12,Crime",
      "12,Thriller",
      "16,Comedy",
      "16,Romance",
      "20,Drama",
      "20,Romance",
      "7,Drama",
      "7,Sci-Fi",
      // ... more lines in array ...
      "1680,Drama",
      "1680,Romance",
      "1681,Comedy"
    )
    lines
  }
}
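
To narrow it down, a small check could go right after the Mahout context is created; this is just a sketch using standard Spark accessors, to compare what a local run and a cluster run report:

// Sketch: print the resolved master URL and the jars the driver will ship
// to executors (with addMahoutJars = false and no setJars, this is empty).
println(s"master = ${sdc.sc.getConf.get("spark.master")}")
println(s"jars   = ${sdc.sc.jars.mkString(", ")}")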
Error:

18/08/01 14:18:53 INFO DAGScheduler: ResultStage 0 (collect at GenerateIndicator.scala:38) failed in 3.551 s due to Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
18/08/01 14:18:53 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 7) on 10.0.2.15, executor 0: java.lang.IllegalStateException (unread block data) [duplicate 7]
18/08/01 14:18:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/08/01 14:18:53 INFO DAGScheduler: Job 0 failed: collect at GenerateIndicator.scala:38, took 5.265593 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Thanks a lot for your time.
Cheers.
Eric Link
2018-08-08 13:47:54 UTC
unsubscribe
--
Eric Link
214.641.5465
Amani Kongara
2018-08-08 13:57:47 UTC
Unsubscribe
--
Amani Kongara
Dmitriy Lyubimov
2018-08-08 15:10:44 UTC
My best guess is that this is a serialization problem at the cluster/master. It typically happens when class or Java versions differ between the driver and the worker(s). Why that ended up being the case in your particular setup is hard for me to tell. Bottom line, I do not believe this is a Mahout-specific problem. Sorry if I did not say anything you had not already known.
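
If it is the classpath, one untested thing to try is shipping the jars to the executors explicitly instead of disabling that; the sketch below only reuses names from your own code, and the jar path is the one from your submit command:

// Sketch, not a verified fix: ship the application jar with the conf, and
// let Mahout add its own jars (assumes MAHOUT_HOME is set for the driver).
val sparkConf = new SparkConf()
  .setAppName("recomender")
  .setJars(Seq("target/scala-2.11/recomender-0.0.1.jar"))

implicit val mahoutCtx: SparkDistributedContext = mahoutSparkContext(
  masterUrl = sparkConf.get("spark.master"),
  appName = "recomender",
  sparkConf = sparkConf,
  addMahoutJars = true
)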

-d