【问题标题】:Array[Byte] Spark RDD to String Spark RDDArray[Byte] Spark RDD 到字符串 Spark RDD
【发布时间】:2016-01-20 10:06:35
【问题描述】:

我正在使用 Cloudera 的 SparkOnHBase 模块从 HBase 获取数据。

我通过这种方式得到一个 RDD:

var getRdd = hbaseContext.hbaseRDD("kbdp:detalle_feedback", scan)

基于此,我得到的是一个类型的对象

RDD[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])]

对应于行键和值列表。它们都由一个字节数组表示。

如果我将 getRDD 保存到文件中,我看到的是:

([B@f7e2590,[([B@22d418e2,[B@12adaf4b,[B@48cf6e81), ([B@2a5ffc7f,[B@3ba0b95,[B@2b4e651c), ([B@27d0277a,[B@52cfcf01,[B@491f7520), ([B@3042ad61,[B@6984d407,[B@f7c4db0), ([B@29d065c1,[B@30c87759,[B@39138d14), ([B@32933952,[B@5f98506e,[B@8c896ca), ([B@2923ac47,[B@65037e6a,[B@486094f5), ([B@3cd385f2,[B@62fef210,[B@4fc62b36), ([B@5b3f0f24,[B@8fb3349,[B@23e4023a), ([B@4e4e403e,[B@735bce9b,[B@10595d48), ([B@5afb2a5a,[B@1f99a960,[B@213eedd5), ([B@2a704c00,[B@328da9c4,[B@72849cc9), ([B@60518adb,[B@9736144,[B@75f6bc34)])

对于每条记录(rowKey 和列)

但我需要的是获取所有和每个键和值的字符串表示。或者至少是价值观。为了将其保存到文件并查看类似

key1,(value1,value2...)

或类似的东西

key1,value1,value2...

我对 spark 和 scala 完全陌生,而且很难获得一些东西。

你能帮我解决这个问题吗?

【问题讨论】:

    标签: scala apache-spark hbase


    【解决方案1】:

    首先让我们创建一些示例数据:

    scala> val d = List( ("ab" -> List(("qw", "er", "ty")) ), ("cd" -> List(("ac", "bn", "afad")) ) )
    d: List[(String, List[(String, String, String)])] = List((ab,List((qw,er,ty))), (cd,List((ac,bn,afad))))
    

    数据是这样的:

    scala> d foreach println
    (ab,List((qw,er,ty)))
    (cd,List((ac,bn,afad)))
    

    将其转换为Array[Byte] 格式

    scala> val arrData = d.map { case (k,v) => k.getBytes() -> v.map { case (a,b,c) => (a.getBytes(), b.getBytes(), c.getBytes()) } }
    
    arrData: List[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])] = List((Array(97, 98),List((Array(113, 119),Array(101, 114),Array(116, 121)))), (Array(99, 100),List((Array(97, 99),Array(98, 110),Array(97, 102, 97, 100)))))
    

    根据这些数据创建一个 RDD

    scala> val rdd1 = sc.parallelize(arrData)
    rdd1: org.apache.spark.rdd.RDD[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])] = ParallelCollectionRDD[0] at parallelize at <console>:25
    

    创建从Array[Byte]String 的转换函数:

    scala> def b2s(a: Array[Byte]): String = new String(a)
    b2s: (a: Array[Byte])String
    

    执行我们的最终转换:

    scala> val rdd2 = rdd1.map { case (k,v) => b2s(k) -> v.map{ case (a,b,c) => (b2s(a), b2s(b), b2s(c)) } }
    rdd2: org.apache.spark.rdd.RDD[(String, List[(String, String, String)])] = MapPartitionsRDD[1] at map at <console>:29
    
    scala> rdd2.collect()
    res2: Array[(String, List[(String, String, String)])] = Array((ab,List((qw,er,ty))), (cd,List((ac,bn,afad))))
    

    【讨论】:

    【解决方案2】:

    我不了解 HBase,但如果那些 Array[Byte]s 是 Unicode 字符串,那么这样的东西应该可以工作:

    rdd: RDD[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])] = *whatever*
    rdd.map(k, l => 
      (new String(k),
      l.map(a => 
        a.map(elem =>
          new String(elem)
        )
      ))
    )
    

    很抱歉造型不好等等,我什至不确定它是否会起作用。

    【讨论】:

    • 非常感谢 mehmetminanc。它并不完全以这种方式工作,但它给了我一个面对问题的好主意。
    • @tuxdna 解释得非常简洁,但我不明白一个是如何工作的,另一个是如何工作的。两者在语义上似乎相同。
    • 很可能是 mehmetminanc。正是因为我没有经验,所以我在其他方面理解得更好。
    • 我对最佳答案不感兴趣,tuxdna 有更好的答案。我只是说它们是一样的。
    猜你喜欢
    • 2020-02-28
    • 1970-01-01
    • 1970-01-01
    • 2016-06-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多