【Posted】: 2021-10-12 10:15:35
【Question】:
I'm trying to read data from Elasticsearch (version 7.13.4) with PySpark, but I get this error:
org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cannot initialize SSL - parseAlgParameters failed: ObjectIdentifier() -- data isn't an object ID (tag = 48)
My code:
import findspark
findspark.init()  # locate the Spark installation before importing pyspark

from pyspark.sql import SparkSession

# Assumption: the elasticsearch-hadoop connector JAR is already on the Spark classpath;
# the app name is arbitrary.
spark = SparkSession.builder.appName("es-read").getOrCreate()
query = """{
  "query": {
    "match_all": {}
  }
}"""

df = (
    spark.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", "elasticsearch-svc.namespace")
    .option("es.port", "9200")
    .option("es.read.metadata", "false")
    .option("es.mapping.date.rich", "false")
    .option("es.query", query)
    .option("es.net.http.auth.user", "elastic-username")
    .option("es.net.http.auth.pass", "elastic-password")
    .option("es.net.ssl.keystore.location", "file:////my-storage/ssl_certificates/elastic-certificates.p12")
    .option("es.net.ssl.keystore.pass", "mypassword")
    .option("es.net.ssl.keystore.type", "PKCS12")
    .option("es.net.ssl.truststore.location", "file:////my-storage/ssl_certificates/elastic-certificates.p12")
    .option("es.net.ssl.truststore.pass", "mypassword")
    .option("es.net.ssl", "true")
    .option("es.net.ssl.cert.allow.self.signed", "true")
    .load("my_index")
)
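A sketch of the same read with the connector settings gathered into one dict, which keeps the keystore/truststore pair in a single place. `build_es_read_options` and `read_index` are hypothetical helpers (not part of elasticsearch-hadoop); the option keys are exactly the ones used above, and PySpark's `DataFrameReader.options(**opts)` stands in for the chain of `.option(...)` calls.

```python
import json


def build_es_read_options(nodes, port, user, password,
                          store_location, store_pass, query):
    """Hypothetical helper: collect the elasticsearch-hadoop settings from the question.

    Every value is a string, matching what the chained .option(...) calls pass.
    """
    return {
        "es.nodes": nodes,
        "es.port": str(port),
        "es.read.metadata": "false",
        "es.mapping.date.rich": "false",
        # Serialize the query dict instead of hand-writing the JSON string.
        "es.query": json.dumps(query),
        "es.net.http.auth.user": user,
        "es.net.http.auth.pass": password,
        "es.net.ssl": "true",
        # As in the question, one PKCS12 file serves as both keystore and truststore.
        "es.net.ssl.keystore.location": store_location,
        "es.net.ssl.keystore.pass": store_pass,
        "es.net.ssl.keystore.type": "PKCS12",
        "es.net.ssl.truststore.location": store_location,
        "es.net.ssl.truststore.pass": store_pass,
        "es.net.ssl.cert.allow.self.signed": "true",
    }


def read_index(spark, index, options):
    """Load `index` through the elasticsearch-hadoop Spark SQL data source."""
    return (
        spark.read.format("org.elasticsearch.spark.sql")
        .options(**options)
        .load(index)
    )
```

With the question's values this would be `read_index(spark, "my_index", build_es_read_options("elasticsearch-svc.namespace", 9200, "elastic-username", "elastic-password", "file:////my-storage/ssl_certificates/elastic-certificates.p12", "mypassword", {"query": {"match_all": {}}}))`. It only changes how the options are assembled, not which keystore file the connector loads.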
Full error:
Py4JJavaError: An error occurred while calling o673.load.
: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:340)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.cfg$lzycompute(DefaultSource.scala:225)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.cfg(DefaultSource.scala:223)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:229)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:229)
    at org.elasticsearch.spark.sql.ElasticsearchRelation$$anonfun$schema$1.apply(DefaultSource.scala:233)
    at org.elasticsearch.spark.sql.ElasticsearchRelation$$anonfun$schema$1.apply(DefaultSource.scala:233)
    at scala.Option.getOrElse(Option.scala:121)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.schema(DefaultSource.scala:233)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:403)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cannot initialize SSL - parseAlgParameters failed: ObjectIdentifier() -- data isn't an object ID (tag = 48)
    at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSSLContext(SSLSocketFactory.java:175)
    at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.getSSLContext(SSLSocketFactory.java:160)
    at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSocket(SSLSocketFactory.java:129)
    at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
    at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
    at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
    at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
    at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
    at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.doExecute(CommonsHttpTransport.java:685)
    at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:664)
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:116)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:432)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:428)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:388)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:392)
    at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:168)
    at org.elasticsearch.hadoop.rest.RestClient.mainInfo(RestClient.java:745)
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:330)
    ... 23 more
Caused by: java.io.IOException: parseAlgParameters failed: ObjectIdentifier() -- data isn't an object ID (tag = 48)
    at sun.security.pkcs12.PKCS12KeyStore.parseAlgParameters(PKCS12KeyStore.java:816)
    at sun.security.pkcs12.PKCS12KeyStore.engineLoad(PKCS12KeyStore.java:2018)
    at java.security.KeyStore.load(KeyStore.java:1445)
    at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.loadKeyStore(SSLSocketFactory.java:200)
    at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.loadKeyManagers(SSLSocketFactory.java:215)
    at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSSLContext(SSLSocketFactory.java:173)
    ... 40 more
Caused by: java.io.IOException: ObjectIdentifier() -- data isn't an object ID (tag = 48)
    at sun.security.util.ObjectIdentifier.<init>(ObjectIdentifier.java:257)
    at sun.security.util.DerInputStream.getOID(DerInputStream.java:314)
    at com.sun.crypto.provider.PBES2Parameters.engineInit(PBES2Parameters.java:267)
    at java.security.AlgorithmParameters.init(AlgorithmParameters.java:293)
    at sun.security.pkcs12.PKCS12KeyStore.parseAlgParameters(PKCS12KeyStore.java:812)
    ... 45 more
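I suspected the keystore type might be the problem, so I tried converting the .p12 file to a .jks file with keytool, but that failed too, with this error: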
keytool error: java.io.IOException: parseAlgParameters failed: ObjectIdentifier() -- data isn't an object ID (tag = 48)
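The post doesn't show the exact conversion command. A typical `keytool -importkeystore` invocation for copying a PKCS12 store into a JKS store is sketched below; the helper names, file names and passwords are placeholders, not taken from the question. keytool reports the same `parseAlgParameters failed` message that appears in the `sun.security.pkcs12.PKCS12KeyStore` frames of the Spark trace, so in both cases the failure occurs while the JDK reads the .p12 file. `shutil.which` records which `keytool` executable on `PATH` was actually used.

```python
import shutil
import subprocess


def p12_to_jks_command(src, dest, src_pass, dest_pass):
    """Hypothetical helper: keytool command copying every entry of a PKCS12 store into a JKS store."""
    # Passwords on the command line are visible to other local users; fine for a local test.
    return [
        "keytool", "-importkeystore",
        "-srckeystore", src, "-srcstoretype", "PKCS12", "-srcstorepass", src_pass,
        "-destkeystore", dest, "-deststoretype", "JKS", "-deststorepass", dest_pass,
        "-noprompt",
    ]


def run_conversion(cmd):
    """Run cmd with the keytool found on PATH; return (keytool path, exit code, combined output)."""
    keytool = shutil.which("keytool")
    if keytool is None:
        raise FileNotFoundError("keytool not found on PATH")
    result = subprocess.run([keytool] + cmd[1:], capture_output=True, text=True)
    # keytool may print its error on stdout or stderr, so return both.
    return keytool, result.returncode, result.stdout + result.stderr
```

For example, `run_conversion(p12_to_jks_command("elastic-certificates.p12", "elastic-certificates.jks", "mypassword", "mypassword"))` returns the path of the keytool that ran along with its exit code and output, which helps tell which Java installation is reading the file.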
【Discussion】:
Tags: java apache-spark elasticsearch ssl pyspark