[Posted]: 2020-02-21 17:02:59
[Problem description]:
I get the following error when I try to connect to Elasticsearch from Spark with basic authentication in order to create a new index.
The error returned by Elasticsearch does not include enough detail for further debugging:
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [HEAD] on [devl_test_index] failed; server[https://<elasticServerHost>:9200] returned [403|Forbidden:]
at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:477)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:447)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:539)
at org.elasticsearch.hadoop.rest.RestClient.indexExists(RestClient.java:534)
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:545)
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:364)
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:660)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:636)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:65)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
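For reference, the failing call is a plain HEAD request on the index (an existence check) made with the configured credentials. A minimal standalone check of the same request, outside Spark, could look like the sketch below; the host, index name and credentials are the placeholders from above, and any custom SSL truststore setup is omitted.

import java.net.{HttpURLConnection, URL}
import java.util.Base64

object EsHeadCheck {
  def main(args: Array[String]): Unit = {
    // Placeholder endpoint and credentials -- replace with real values.
    val url  = new URL("https://<elasticServerHost>:9200/devl_test_index")
    val user = "USERNAME"
    val pass = "PASSWRD"

    val conn  = url.openConnection().asInstanceOf[HttpURLConnection]
    val token = Base64.getEncoder.encodeToString(s"$user:$pass".getBytes("UTF-8"))
    conn.setRequestMethod("HEAD")
    conn.setRequestProperty("Authorization", s"Basic $token")

    // 200: index visible to this user; 401: bad credentials;
    // 403: authenticated but not authorized; 404: index does not exist.
    println(s"HTTP ${conn.getResponseCode}")
    conn.disconnect()
  }
}

A 403 from this check as well would point at the Elasticsearch role/privileges granted to USERNAME rather than at the Spark configuration.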
Code used for the connection:
sbt dependency: "org.elasticsearch" % "elasticsearch-hadoop" % "7.5.0"

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

val spark = SparkSession.builder()
  .appName("SparkJDBC")
  .enableHiveSupport()
  // es.* connector settings, prefixed with "spark." so Spark accepts them as conf keys
  .config("spark.es.port", "9200")
  .config("spark.es.nodes", "<elasticServerHost>")
  .config("spark.es.nodes.wan.only", "true")
  .config("spark.es.net.ssl", "true")
  .config("spark.es.net.http.auth.user", "USERNAME")
  .config("spark.es.net.http.auth.pass", "PASSWRD")
  .master("local[*]")
  .getOrCreate()

val df = spark.sql("select * from employee")
df.saveToEs("devl_test_index")
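For reference, the same es.* settings can also be passed per write through saveToEs's configuration map instead of on the SparkSession; a minimal sketch with the same placeholder host and credentials:

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

val spark = SparkSession.builder()
  .appName("SparkJDBC")
  .enableHiveSupport()
  .master("local[*]")
  .getOrCreate()

val df = spark.sql("select * from employee")

// Connector options supplied directly to the write call.
df.saveToEs(
  "devl_test_index",
  Map(
    "es.nodes"              -> "<elasticServerHost>",
    "es.port"               -> "9200",
    "es.nodes.wan.only"     -> "true",
    "es.net.ssl"            -> "true",
    "es.net.http.auth.user" -> "USERNAME",
    "es.net.http.auth.pass" -> "PASSWRD"
  )
)

Either way the connector issues the same HEAD/PUT requests, so a 403 usually means the authenticated user lacks the index privileges (e.g. create_index/write on devl_test_index) rather than that the credentials were not sent.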
[Question discussion]:
Tags: scala apache-spark elasticsearch