【问题标题】:UpdateStateByKey on JSON Data from Kafka in Spark在 Spark 中来自 Kafka 的 JSON 数据上的 UpdateStateByKey
【发布时间】:2017-04-23 20:07:01
【问题描述】:

我正在从 spark.Data 中的 kafka 读取 JSON 数据。数据有开始和结束标签来显示事务的开始和结束。 JSON 数据是

`

{    "accountId": 1,   "name": "start"},
{    "accountId": 1,    "name": "A green door",     "prize":107},
{    "accountId": 2,    "name": "start"},
{    "accountId": 2,    "name": "A green door",   "prize":22 },
{    "accountId": 1,    "name": "end"},
{    "accountId": 2,    "name": "ABC",   "prize":221 },
{    "accountId": 2,    "name": "DV",   "prize":223 },
{    "accountId": 2,    "name": "end"}

` 我想使用 UpdateStateByKey 聚合对应 accountId 的奖品。

谁能告诉我该怎么做?

谢谢。

【问题讨论】:

    标签: json apache-spark apache-kafka spark-streaming


    【解决方案1】:
    data = [
        {"accountId": 1, "name": "start"},
        {"accountId": 1, "name": "A green door", "prize": 107},
        {"accountId": 2, "name": "start"},
        {"accountId": 2, "name": "A green door", "prize": 22},
        {"accountId": 1, "name": "end"},
        {"accountId": 2, "name": "ABC", "prize": 221},
        {"accountId": 2, "name": "DV", "prize": 223},
        {"accountId": 2, "name": "end"}
    ]
    
    rddQueue = map(lambda x: sc.parallelize(["%s" % x]), data)
    qs = ssc.queueStream(rddQueue)
    # qs = KafkaUtils.createDirectStream(ssc, ['test'], kafkaParams={}).map(lambda x: x[1])
    
    def makeData(x):
        kv = eval(x)
        k = kv.pop('accountId', 'NaN')
        return str(k), kv
    
    qs_all = qs.map(makeData).updateStateByKey(lambda x, y: (y or []) + x)
    
    qs_all.pprint()
    

    输出:

    ('2', [{'name': 'start'}, {'prize': 22, 'name': 'A green door'}, {'prize': 221, 'name': 'ABC'}, {'prize': 223, 'name': 'DV'}, {'name': 'end'}])
    ('1', [{'name': 'start'}, {'prize': 107, 'name': 'A green door'}, {'name': 'end'}])
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-05-27
      • 1970-01-01
      • 2016-11-03
      • 1970-01-01
      • 2017-03-28
      • 2015-09-13
      • 2015-11-07
      相关资源
      最近更新 更多