【发布时间】:2016-03-18 05:27:25
【问题描述】:
我想从 spark.elasticsearch 中建立索引。它抛出以下异常...
org.apache.spark.SparkException:作业因阶段失败而中止:阶段 1.0 中的任务 0 失败 1 次,最近一次失败:阶段 1.0 中丢失任务 0.0(TID 1,本地主机):java.lang.StringIndexOutOfBoundsException:字符串索引超出范围:-1 在 java.lang.String.substring(String.java:1967) 在 org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:110) 在 org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:58) 在 org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:372) 在 org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40) 在 org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 在 org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 在 org.apache.spark.scheduler.Task.run(Task.scala:88) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745)
驱动程序堆栈跟踪: 在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270) 在 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 在 scala.Option.foreach(Option.scala:236) 在 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) 在 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1822) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1835) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1912) 在 org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:67) 在 org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:52) 在 org.elasticsearch.spark.rdd.api.java.JavaEsSpark$.saveToEs(JavaEsSpark.scala:54) 在 org.elasticsearch.spark.rdd.api.java.JavaEsSpark.saveToEs(JavaEsSpark.scala) 在 com.tgt.search.metrics.es.bulk.Sparkimporter.main(Sparkimporter.java:88) 引起:java.lang.StringIndexOutOfBoundsException:字符串索引超出范围:-1 在 java.lang.String.substring(String.java:1967) 在 org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:110) 在 org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:58) 在 org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:372) 在 org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40) 在 org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 在 org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 在 org.apache.spark.scheduler.Task.run(Task.scala:88) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745)
这是我的代码...
SparkConf conf = new SparkConf().setMaster("local")
.setAppName("Indexer").set("spark.driver.maxResultSize", "2g");
conf.set("es.index.auto.create", "true");
conf.set("es.nodes", "localhost");
conf.set("es.port", "9200");
conf.set("es.write.operation", "index");
JavaSparkContext sc = new JavaSparkContext(conf);
Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");
JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(doc1, doc2));
JavaEsSpark.saveToEs(javaRDD, "spark/docs");
我尝试在本地编写文件,它工作正常......这可能是配置问题。
这些是我的 pom.xml 中的依赖项
<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>2.1.0</version>
</dependency>
<!-- <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId>
<version>2.6.4</version> </dependency> -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark_2.10</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
【问题讨论】:
标签: java elasticsearch apache-spark