【问题标题】:Using AWS Lambda to read Kafka(MSK) event source使用 AWS Lambda 读取 Kafka(MSK) 事件源
【发布时间】:2021-02-07 12:57:28
【问题描述】:

我正在尝试使用 AWS lambda 从 kafka 主题 (AWS MSK) 中读取值。

从 lambda 打印时的事件记录如下所示:

{'eventSource': 'aws:kafka', 'eventSourceArn': 'arn:aws:kafka:ap-northeast-1:987654321:cluster/mskcluster/79y80c66-813a-4f-af0e-4ea47ba107e6', 'records ': {'Transactions-0': [{'topic': 'Transactions', 'partition': 0, 'offset': 4798, 'timestamp': 1603565835915, 'timestampType': 'CREATE_TIME', 'value': ' eyJFdmVudFRpbWUiOiAiMjAyMC0xMC0yNCAxODo1NzoxNS45MTUzMjQiLCAiSVAiOiAiMTgwLjI0MS4xNTkuMjE4IiwgIkFjY291bnROdW1iZXIiOiwiMTQ2ODA4ODYiLCAiVXNlck5hbWUiOi67iQW1iZXIgUm9tYXJvIiwgIkFtb3VudCI6ICI1NTYyIiwgIlRyYW5zYWN0aW9uSUQiOiAiTzI4Qlg3TlBJbWZmSXExWCIsICJDb3VuTHJ5IjogIk9tYW4ifQ =='}]}}

如何提取“主题”和“价值”字段?值 1 是 base64 编码的。 我收到以下错误:

NameError: 名称“记录”未定义

我正在尝试以下代码:

import json
import base64

def lambda_handler(event, context):
    print(event)
    message = event['records']
    payload=base64.b64decode(record["message"]["value"])
    print("Decoded payload: " + str(payload))

SampleMSK 事件结构

【问题讨论】:

    标签: python-3.x amazon-web-services apache-kafka aws-lambda


    【解决方案1】:

    在您的代码 sn-p 中,您尝试传递给解码函数的 record 变量不存在。遍历记录的示例是:

    records = event['records']['Transactions-0']
    for record in records:
        payload=base64.b64decode(record["message"]["value"])
        print("Decoded payload: " + str(payload))
    

    每个函数调用都包含每个主题的多条记录。虽然如果你有多个像Transactions-1,你也可以迭代那些......

    【讨论】:

    • 谢谢 - 遍历记录是诀窍!工作代码是这样的:` def lambda_handler(event, context): records = event['records']['Transactions-0'] for record in records: payload=base64.b64decode(record["value"]) print("解码的有效载荷:" + str(payload)) `
    猜你喜欢
    • 2021-04-15
    • 1970-01-01
    • 1970-01-01
    • 2021-07-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-09-29
    相关资源
    最近更新 更多