【问题标题】:Spark can not get elements from 'MapType'Spark 无法从“MapType”获取元素
【发布时间】:2017-07-07 02:28:48
【问题描述】:

我正在编写具有以下缓冲区架构的 udaf:

bufferSchema: StructType = StructType(
    StructField("grades", MapType(StructType(StructField("subject", StringType) :: StructField("subject_type", StringType) :: Nil),
      ArrayType(StructType(StructField("date", LongType) :: StructField("grade", IntegerType) :: Nil)))) :: Nil)

看起来 spark 在内部将键类型解释为 GenericRowWithSchema 而不是简单的 (String,String)。 所以每当我尝试从地图中拉出时:

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

var buffer_scoresMap = buffer.getAs[Map[(String,String), Array[..]](0)

buffer_scoresMap.get(("k1","k2")) 返回 None 即使这个键肯定在地图中,我什至在调试中看到它。 我尝试将密钥更改为GenericRowWithSchema,然后返回(String,String),然后从地图中获取,但没有运气。

有什么想法吗?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    确实,当元组是深度嵌套列的一部分时,它们会被转换为结构,而不是转换回元组。换句话说,buffer_scoresMap 实际上具有Map[Row, Array[..]] 类型,因此您可以创建一个Row 来从中获取项目:

    var buffer_scoresMap = buffer.getAs[Map[Row, Array[..]](0)
    buffer_scoresMap.get(Row("k1","k2")) // should not be None if key exists
    

    这是一个证明这一点的简短示例:

    // create a simple DF with similar schema: 
    case class Record(grades: Map[(String, String), Array[Int]])
    val df = sc.parallelize(Seq(Record(Map(("a", "b") -> Array(1, 2))))).toDF("grades")
    
    // this indeed fails:
    df.rdd.map(r => r.getAs[Map[(String, String), Array[Int]]](0).get(("a", "b"))).first() // None
    
    // but this works:
    df.rdd.map(r => r.getAs[Map[Row, Array[Int]]](0).get(Row("a", "b"))).first() // Some(WrappedArray(1, 2))
    

    【讨论】:

      猜你喜欢
      • 2021-08-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-06
      相关资源
      最近更新 更多