【问题标题】:Error while writing to HBase Table using PySpark使用 PySpark 写入 HBase 表时出错
【发布时间】:2017-10-25 04:46:32
【问题描述】:

我正在尝试使用 pySpark 写入 hbase 表。到目前为止,我可以从 hbase 读取数据。但写入 hbase 表时出现异常。

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *

properties = {
  "instanceId" : "hbase",
  "zookeepers" : "10-x-x-x.local:2181,10-x-x-x.local:2181,10-x-x-x.local:2181",
  "hbase.columns.mapping" : "KEY_FIELD STRING :key, A STRING c:a, B STRING c:b",
  "hbase.use.hbase.context" : False,
  "hbase.config.resources" : "file:///etc/hbase/conf/hbase-site.xml",
  "hbase.table"  : "t"
}
spark = SparkSession\
        .builder\
        .appName("hbaseWrite")\
        .getOrCreate()

sc = spark.sparkContext

#I am able to read the data successfully.
#df = spark.read.format("org.apache.hadoop.hbase.spark")\
#    .options( **properties)\
#    .load()

data = [("3","DATA 3 A", "DATA 3 B")]
columns = ['KEY_FIELD','A','B']
cSchema = StructType([StructField(columnName, StringType()) for columnName in columns])
df = spark.createDataFrame(data, schema=cSchema)
df.write\
      .options( **properties)\
      .mode('overwrite').format("org.apache.hadoop.hbase.spark").save()

执行命令格式如下:

spark2-submit --master local[*] write_to_hbase.py

Spark 版本:2.2.0.cloudera1(我无法更改我的 spark 版本) HBase 版本:1.2.0-cdh5.12.0(但我可以更改我的 HBase 版本)

注意:我已将 hbase jar 添加到 spark2 jar 文件夹中,并将以下依赖 jar 添加到 spark2 jar 文件夹中。

  1. spark-core_2.11-1.6.1.jar
  2. htrace-core-3.1.0-incubating.jar
  3. scala-library-2.9.1.jar

错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o70.save.
: java.lang.RuntimeException: org.apache.hadoop.hbase.spark.DefaultSource does not allow create table as select.
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:476)

我尝试了多个建议,但没有任何效果。这可能是一个重复的问题,但我没有其他选择可以找到答案。

【问题讨论】:

标签: apache-spark pyspark apache-spark-sql spark-dataframe pyspark-sql


【解决方案1】:

如果您使用的是Cloudera distribution,那么运气不好,没有官方方法可以使用PYSAPRK 写信给HBASE。这已由Cloudera support Team 确认。

但是,如果您使用的是 Hortonworks 并且您有 spark 2.0,那么下面的链接应该可以帮助您入门。

Pyspark to Hbase write

【讨论】:

  • 能否提供链接 Cloudera 支持团队确认?
  • @AdaPongaya 对不起,我不能这样做,因为它是内部文档。它张贴在我的团队在工作中提出的支持案例的案例说明中
【解决方案2】:

通过编译 git repo https://github.com/hortonworks-spark/shc 并将 shc jar 放入 spark jar 文件夹中解决了这个问题。并遵循@Aniket Kulkarni 建议的link

最终的代码看起来像这样,

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *

properties = {
  "instanceId" : "hbase",
  "zookeepers" : "10-x-x-x.local:2181,10-x-x-x.local:2181,10-x-x-x.local:2181",
  "hbase.columns.mapping" : "KEY_FIELD STRING :key, A STRING c:a, B STRING c:b",
  "hbase.use.hbase.context" : False,
  "hbase.config.resources" : "file:///etc/hbase/conf/hbase-site.xml",
  "hbase.table"  : "test_table"
}
spark = SparkSession.builder\
        .appName("hbaseWrite")\
        .getOrCreate()

sc = spark.sparkContext
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"test_table"}
    "rowkey":"key",
    "columns":{
        "KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
        "A":{"cf":"c", "col":"a", "type":"string"},
        "B":{"cf":"c", "col":"b", "type":"string"}
    }
}""".split())


data = [("3","DATA 3 A", "DATA 3 B")]
columns = ['KEY_FIELD','A','B']
cSchema = StructType([StructField(columnName, StringType()) for columnName in columns])
df = spark.createDataFrame(data, schema=cSchema)
df.write\
      .options(catalog=catalog)\
      .options( **properties)\
      .mode('overwrite').format("org.apache.spark.sql.execution.datasources.hbase").save()

【讨论】:

    猜你喜欢
    • 2020-07-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多