【问题标题】:PySpark HBase/Phoenix integrationPySpark HBase/Phoenix 集成
【发布时间】:2018-03-06 04:27:47
【问题描述】:

我应该将 Phoenix 数据读入 pyspark。

编辑: 我正在使用 Spark HBase 转换器:

这是一个代码sn-p:

port="2181"
host="zookeperserver"
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
cmdata_conf = {"hbase.zookeeper.property.clientPort":port, "hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": "camel", "hbase.mapreduce.scan.columns": "data:a"}
sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=cmdata_conf)

追溯:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.3.0.0-2557/spark/python/pyspark/context.py", line 547, in newAPIHadoopRDD
    jconf, batchSize)
  File "/usr/hdp/2.3.0.0-2557/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/hdp/2.3.0.0-2557/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.io.IOException: No table was provided.
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:130)

任何帮助将不胜感激。

谢谢! /蒂娜

【问题讨论】:

    标签: apache-spark pyspark phoenix


    【解决方案1】:

    推荐使用 spark phoenix 插件。 凤凰火花插件详情请咨询here

    环境:使用 AWS EMR 5.10、PySpark 测试

    以下是步骤

    1. 在phoenix中创建表https://phoenix.apache.org/language/ 打开凤凰壳

      “/usr/lib/phoenix/bin/sqlline.py”

      如果存在表名,则删除表;

      CREATE TABLE 表名(DOMAIN VARCHAR 主键);

      UPSERT INTO TableName (DOMAIN) VALUES('foo');

    2. 下载 spark phoenix 插件 jar 从https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core/4.11.0-HBase-1.3下载spark phoenix插件jar 你需要 phoenix--HBase--client.jar ,我根据我的 phoenix 和 hbase 版本使用了 phoenix-4.11.0-HBase-1.3-client.jar

    3. 在您的 hadoop 主目录中,设置以下变量:

      phoenix_jars=/home/user/apache-phoenix-4.11.0-HBase-1.3-bin/phoenix-4.11.0-HBase-1.3-client.jar

    4. 启动 PySpark shell 并在 Driver 和 executer 类路径中添加依赖

      pyspark --jars ${phoenix_jars} --conf spark.executor.extraClassPath=${phoenix_jars}

    --创建ZooKeeper URL,替换成你的集群zookeeper quorum,可以从hbase-site.xml查看

    emrMaster = "ZooKeeper URL" 
    
    df = sqlContext.read \
    .format("org.apache.phoenix.spark") \
    .option("table", "TableName") \
    .option("zkUrl", emrMaster) \
    .load() 
    
    df.show()
    df.columns
    df.printSchema()
    df1=df.replace(['foo'], ['foo1'], 'DOMAIN')
    df1.show() 
    
    df1.write \
      .format("org.apache.phoenix.spark") \
      .mode("overwrite") \
      .option("table", "TableName") \
      .option("zkUrl", emrMaster) \
      .save()
    

    【讨论】:

      【解决方案2】:

      【讨论】:

      • 我尝试了第二种方法,但是我得到一个错误:Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. :java.io.IOException:没有提供表。你在 PYSPARK 中做过吗?
      • 您是否为 Spark newAPIHadoopRDD 提供了正确的配置,如下所示: sparkconf = { "hbase.zookeeper.quorum": zookeeperhost, "hbase.mapreduce.inputtable": sampletable, "hbase.mapreduce.scan.columns ": column} hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat", "org.apache.hadoop.hbase.io.ImmutableBytesWritable", "org.apache.hadoop.hbase.client.Result ", keyConverter=keyConv, valueConverter=valueConv, conf=sparkconf)
      • 请尝试上述方法我认为您没有在配置中提供表名。另外keyConv和valueConv的值分别是examples.pythonconverters.ImmutableBytesWritableToStringConverter和examples.pythonconverters.HBaseResultToStringConverter
      • 谢谢萨钦!但是,我做了所有这些步骤。 1. 我用需要的 jar 启动 pyspark,这些是 /hbase-0.90.1.jar, $SPARK_HOME/lib/spark-examples-1.3.1.2.3.0.0-2557hadoop2.7.1.2.3.0.0-2557.jar 2 . 然后(在 IPython Notebook 中): from pyspark import SparkContext port="2181" host="hostname" keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" valueConv = "org.apache.spark.examples.pythonconverters. HBaseResultToStringConverter"
      猜你喜欢
      • 1970-01-01
      • 2018-06-16
      • 2020-01-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多