【问题标题】:How to introduce reset logic when aggregating/joining streaming dataframe with static dataframe for spark streaming如何在将流数据帧与静态数据帧聚合/加入以进行火花流式传输时引入重置逻辑
【发布时间】:2025-12-23 07:45:06
【问题描述】:

spark 结构化流的一个很好的特性是它可以将静态数据帧与流数据帧连接起来。举一个例子如下。 users 是从数据库读取的静态数据框。 transactionStream 来自流。通过加入函数,我们可以得到每个国家随着批次的新到货而累计的消费。

val spendingByCountry = (transactionStream
    .join(users, users("id") === transactionStream("userid"))
    .groupBy($"country")
    .agg(sum($"cost")) as "spending")

spendingByContry.writeStream
    .outputMode("complete")
    .format("console")
    .start()

成本总和与新批次的汇总如下所示。

-------------------------------
Batch: 0
------------------------------- 
Country Spending
EN      90.0
FR      50.0

-------------------------------
Batch: 1
------------------------------- 
Country Spending
EN      190.0
FR      150.0

如果我想像上面的例子一样引入通知和重置逻辑,正确的做法应该是什么?要求是,如果支出大于某个阈值,则将国家和支出的记录存储到一个表中,并将支出重置为0以再次累加。

【问题讨论】:

    标签: apache-spark spark-streaming


    【解决方案1】:

    您可以实现此目的的一种方法是通过任意状态处理。 groupBy 可以通过自定义函数mapGroupsWithState 进行增强,您可以在其中维护所需的所有业务逻辑。以下是 Spark 文档中的示例:

     // A mapping function that maintains an integer state for string keys and returns a string.  // Additionally, it sets a timeout to remove the state if it has not received data for an hour.  def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {
    
       if (state.hasTimedOut) {                // If called when timing out, remove the state
         state.remove()
    
       } else if (state.exists) {              // If state exists, use it for processing
         val existingState = state.get         // Get the existing state
         val shouldRemove = ...                // Decide whether to remove the state
         if (shouldRemove) {
           state.remove()                      // Remove the state
    
         } else {
           val newState = ...
           state.update(newState)              // Set the new state
           state.setTimeoutDuration("1 hour")  // Set the timeout
         }
    
       } else {
         val initialState = ...
         state.update(initialState)            // Set the initial state
         state.setTimeoutDuration("1 hour")    // Set the timeout    }    ...    // return something  }
    
    
    
    
      dataset
    .groupByKey(...)    
    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)
    

    【讨论】: