【问题标题】:Issue: Scala code in Spark shell to retrieve data from Hbase问题:Spark shell 中用于从 Hbase 检索数据的 Scala 代码
【发布时间】:2016-07-26 16:11:56
【问题描述】:

我们正在尝试在 Spark shell 中执行一个简单的 Scala 代码来从 Hbase 检索数据。 Hadoop 环境启用了 Kerberos,我们确保执行 kinit。

调用 Spark Shell 的步骤:

MASTER=yarn-client

DRIVER_CLASSPATH="/opt/cloudera/parcels/CDH/lib/hbase/lib/*"
DRIVER_LIBRARY_PATH="/opt/cloudera/parcels/CDH/lib/hadoop/lib/native"

spark-shell --driver-class-path "$DRIVER_CLASSPATH" --driver-library-path "$DRIVER_LIBRARY_PATH" --driver-memory 10G --executor-memory 15G --executor-cores 8 --num-executors 3 --master $MASTER

代码:

import org.apache.hadoop.fs._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util._
import org.apache.spark._

val hc = HBaseConfiguration.create
hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml"))

hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/core-site.xml"))

hc.set(TableInputFormat.INPUT_TABLE, "poc-customers")
val rdd = sc.newAPIHadoopRDD(hc, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

rdd.count

下面是错误

org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:308)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:149)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:57)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
        at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:293)
        at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:268)
        at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:140)
        at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:135)
        at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:888)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.restart(TableRecordReaderImpl.java:90)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.initialize(TableRecordReaderImpl.java:167)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReader.initialize(TableRecordReader.java:134)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase$1.initialize(TableInputFormatBase.java:200)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not set up IO Streams to <management-node-server-hostname>/10.118.114.40:60020
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:773)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1184)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
        at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)
        at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:299)
        ... 23 more
Caused by: java.lang.RuntimeException: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$1.run(RpcClientImpl.java:673)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.handleSaslConnectionFailure(RpcClientImpl.java:631)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:739)
        ... 33 more
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
        at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
        ... 33 more
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
        at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
        at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
        at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
        at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
        ... 42 more

请注意:

  1. 我们能够从同一个会话中调用 Hbase shell 并从同一个表中扫描记录
  2. 我们能够从同一个 Spark Shell 会话中对 HDFS 文件执行字数统计
  3. 我们可以在本地模式下执行上述代码
  4. 我们能够从同一个 spark-shell 会话中执行其他操作,例如 – 一个。 val admin = new HBaseAdmin(hc) 湾。 print(admin.isTableAvailable(“poc-customers”))

寻求帮助以解决此问题。

【问题讨论】:

  • 异常来自何处:Spark 驱动程序(在您使用kinit 创建 TGT 的同一台机器上)或 Spark 执行程序(可能在其他机器上)??

标签: hadoop apache-spark hbase kerberos


【解决方案1】:

当 Spark“驱动程序”请求 YARN 在集群中的某处生成其“执行程序”时,它会使用其 本地 Kerberos TGT(您使用 kinit 创建的 TGT)进行身份验证。然后 YARN 发出一个 global 委托令牌,由所有执行程序共享以访问 HDFS 和 YARN。

唉,HBase 不支持该委托令牌。每个执行器必须重新验证到 ZK,然后到实际的 HBase RegionServer,使用 本地 TGT。

在一个完美的世界里,你只需要在“spark-default.conf”中插入两个属性,即spark.yarn.principalspark.yarn.keytab(创建一个keytab来存储你的密码是你用“ktutil " 实用程序)

唉,该功能是为需要更新其 HDFS 委托令牌(通常每 7 天)的长时间运行的流式作业而构建的,而不是为 HBase 初始身份验证而构建的。现在,Spark 1.6 的发行说明显示了许多与 YARN 和 Kerberos 相关的错误修复,也许该功能现在也适用于 HBase。但我不会打赌。

那么解决方法是什么?

  1. 在驱动程序运行的 Java 代码中,声明 keytab 文件必须通过 addFile() 发送到每个执行程序
  2. 在 Executor 运行的 Java 代码中,显式创建一个 Hadoop UserGroupInformation,在连接到 HBase 之前,从 keytab 显式获取自己的 Kerberos TGT

请注意,当使用这种方式时,UGI 会保持其 TGT 私有 - 它不会显示在缓存中,因此同一台机器上的其他进程无法重用它(另一方面,来自另一个进程的 kinit不会篡改)。

【讨论】:

    【解决方案2】:

    你的问题的根本原因是

    GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)

    Cloudera Troubleshooting Guide 建议解决此问题

    说明: 用户必须拥有有效的 Kerberos 票证才能与安全的 Hadoop 集群进行交互。如果凭据缓存中没有有效的 Kerberos 票证,则运行任何 Hadoop 命令(例如 hadoop fs -ls)都会失败。如果您没有有效票证,您将收到如下错误:

    11/01/04 12:08:12 WARN ipc.Client: 连接到服务器时遇到异常: javax.security.sasl.SaslException: GSS 启动失败 [由 GSSException 引起:未提供有效凭据(机制级别:未能找到任何 Kerberos tgt)] 与 FS 的连接不良。命令中止。异常:调用 nn-host/10.0.0.2:8020 本地异常失败:java.io.IOException: javax.security.sasl.SaslException:GSS 启动失败 [由 GSSException 引起:未提供有效凭据(机制级别:找不到任何 Kerberos tgt)] 解决方案: 您可以通过运行 klist 命令检查当前在您的凭证缓存中的 Kerberos 票证。您可以通过运行 kinit 命令并指定包含凭据的 keytab 文件或输入您的主体的密码来获取票证。

    您可以尝试建议的解决方案。

    【讨论】:

      【解决方案3】:

      我正在从事与 OP 相同的项目。我们没有直接使用 Samson Scharfrichter 的答案,但它让我们相信这种解决方案是可能的。以下是对我们有用的方法:

      我们现在使用来自 SparkOnHBase (https://github.com/cloudera-labs/SparkOnHBase) 的 RDD,但我们已经合并了 https://github.com/cloudera-labs/SparkOnHBase/pull/7 建议的更改。由于这个拉取请求是开放的,它的变化也可以通过子类化来实现:

      import com.cloudera.spark.hbase.{HBaseContext, HBaseScanRDD}
      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.hbase.HBaseConfiguration
      import org.apache.hadoop.hbase.client.Scan
      import org.apache.hadoop.security.UserGroupInformation
      import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
      import org.apache.spark.{SerializableWritable, SparkContext}
      import org.apache.spark.broadcast.Broadcast
      import org.apache.spark.deploy.SparkHadoopUtil
      import org.apache.spark.rdd.RDD
      
      class MyHBaseScanRDD (sc: SparkContext,
          @transient tableName: String,
          @transient scan: Scan,
          configBroadcast: Broadcast[SerializableWritable[Configuration]]) extends HBaseScanRDD(sc, tableName, scan, configBroadcast) {
        val jobCredentialBroadcast = sc.broadcast(new SerializableWritable(jobTransient.getCredentials))
      
        override def addCreds {
          val creds = SparkHadoopUtil.get.getCurrentUserCredentials
          @transient val ugi = UserGroupInformation.getCurrentUser
          ugi.addCredentials(creds)
          ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
          ugi.addCredentials(jobCredentialBroadcast.value.value)
        }
      }
      
      class MyHBaseContext (sc: SparkContext,
          @transient config: Configuration,
          val tmpHdfsConfigFile: String = null) extends HBaseContext(sc, config, tmpHdfsConfigFile) {
        def myHBaseScanRDD(tableName: String, scan: Scan): RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = {
          new MyHBaseScanRDD(sc, tableName, scan, broadcastedConf)
        }
      }
      
      val hc = HBaseConfiguration.create
      val scan = new Scan
      val hbaseContext = new MyHBaseContext(sc, hc)
      val rdd = hbaseContext.myHBaseScanRDD("tableName", scan)
      rdd.count
      

      看起来这些更改已合并到 HBase 的 HBase-Spark 模块中,该模块是 SparkOnHBase 的继承者。版本控制问题使我们无法使用较新的 HBase 库,但我建议遇到此问题的任何人先尝试一下。

      【讨论】:

      • 这种方法可以用于像火花流这样的长时间运行的工作吗?
      猜你喜欢
      • 1970-01-01
      • 2014-11-25
      • 2023-03-21
      • 1970-01-01
      • 2018-01-30
      • 2020-02-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多