Amazon S3 - Logistic Regression with SparkR using S3 data
Hi, I'm trying to replicate some more SparkR examples, but when I tried to use data in S3 I got errors. Here is the code I'm running in RStudio on an EMR cluster:
    rm(list = ls())
    library(SparkR)

    # Initialize the Spark context
    sc <- sparkR.init(master = "yarn-client",
                      appName = "logistic",
                      sparkEnvir = list(spark.executor.memory = "5g"))

    d <- 8            # number of features
    iterations <- 10  # number of gradient steps

    readPartition <- function(part) {
      part <- as.vector(part, mode = "character")
      part <- strsplit(part, "\t", fixed = TRUE)
      # byrow = TRUE keeps each input line as one matrix row
      list(matrix(as.numeric(unlist(part)), ncol = 9, byrow = TRUE))
    }

    # Read the data points and convert each partition to a matrix
    points <- cache(lapplyPartition(textFile(sc, 's3://mybucket/data.txt'),
                                    readPartition))

    # Initialize w to a random value
    w <- runif(n = d, min = -1, max = 1)
    cat("Initial w: ", w, "\n")

    # Compute the logistic regression gradient for a matrix of data points
    gradient <- function(partition) {
      partition <- partition[[1]]
      y <- partition[, 1]   # point labels (first column of the input file)
      x <- partition[, -1]  # point coordinates
      # For each point (x, y), compute the gradient function
      dot <- x %*% w
      logit <- 1 / (1 + exp(-y * dot))
      grad <- t(x) %*% ((logit - 1) * y)
      list(grad)
    }

    for (i in 1:iterations) {
      cat("On iteration ", i, "\n")
      w <- w - reduce(lapplyPartition(points, gradient), "+")
    }
    cat("Final w: ", w, "\n")
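(As a local sanity check, readPartition parses a couple of made-up tab-separated lines fine, so the parsing itself doesn't seem to be the problem; the sample values below are purely illustrative:)

    # Two fake rows: a label followed by 8 feature values, tab-separated
    sampleLines <- c("1\t0.5\t-0.2\t0.1\t0.9\t-0.7\t0.3\t0.0\t1.2",
                     "-1\t-0.4\t0.8\t-0.1\t0.2\t0.6\t-0.9\t0.5\t-0.3")
    m <- readPartition(sampleLines)[[1]]
    stopifnot(dim(m) == c(2, 9))  # one row per line, label in column 1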
I get an error when reading the data into points:
    > points <- cache(lapplyPartition(textFile(sc, 's3://mybucket/data.txt'), readPartition))
    collect on 65 failed java.lang.reflect.InvocationTargetException
    java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: Error in configuring object
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
        at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:186)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at edu.berkeley.cs.amplab.sparkr.BaseRRDD.getPartitions(RRDD.scala:31)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
        at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:312)
        at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
        ... 25 more
    Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
        ... 47 more
    Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
        at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
        ... 52 more
    Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1811)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)
        ... 54 more
    Error: returnStatus == 0 is not TRUE
    22 Apr 2015 18:29:28 [rsession-rstudio] ERROR r error 4 (R code execution error) [errormsg=Error: returnStatus == 0 is not TRUE]; OCCURRED AT: core::Error r::exec::<unnamed>::evaluateExpressionsUnsafe(SEXPREC*, SEXPREC*, SEXPREC**, r::sexp::Protect*) /root/rstudio/src/cpp/r/RExec.cpp:145; LOGGED FROM: core::json::Value session::modules::environment::varToJson(SEXPREC*, const r::sexp::Variable&) /root/rstudio/src/cpp/session/modules/environment/EnvironmentUtils.cpp:134

Reading from a different bucket, and passing an explicit number of partitions, both fail with an identical stack trace, again ending in the same ClassNotFoundException for com.hadoop.compression.lzo.LzoCodec:

    > points <- cache(lapplyPartition(textFile(sc, 's3://datascience.hadoop.spark.r/data/modeldata.txt'), readPartition))
    > file <- textFile(sc, 's3://datascience.hadoop.spark.r/data/modeldata.txt', 9)
Just wondering if anyone can help?
The error states:
    Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
So you need to add the Hadoop LZO jar to the Spark classpath. The codec is listed in the cluster's Hadoop configuration, so TextInputFormat tries to load it on every read, even when your input file isn't LZO-compressed. On AWS EMR the jar is at /home/hadoop/share/hadoop-*lzo*.jar.
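As a minimal sketch, one way to expose that jar is at context creation; SPARK_CLASSPATH and the sparkJars argument are assumptions about your SparkR/Spark version, so adjust to however your build normally picks up extra jars:

    # Hedged sketch: locate the EMR-provided LZO jar and expose it to SparkR.
    lzoJar <- Sys.glob("/home/hadoop/share/hadoop-*lzo*.jar")[1]

    # Must be set before sparkR.init() so the backend JVM is launched with it
    Sys.setenv(SPARK_CLASSPATH = lzoJar)

    sc <- sparkR.init(master = "yarn-client",
                      appName = "logistic",
                      sparkEnvir = list(spark.executor.memory = "5g"),
                      sparkJars = lzoJar)   # also ship the jar to the executors

After re-initializing the context this way, the textFile() read should no longer trip over the missing codec.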