【Question title】: Kinesis Firehose Lambda transformation
【Posted】: 2018-10-25 10:13:07
【Question】:

I have the following Lambda function as part of a Kinesis Firehose record transformation. It converts msgpack records from the Kinesis input stream to JSON.

Lambda runtime: Python 3.6

from __future__ import print_function

import base64
import msgpack
import json
print('Loading function')


def lambda_handler(event, context):
  output = []

  for record in event['records']:
    payload = msgpack.unpackb(base64.b64decode(record['data']), raw=False)

    # Do custom processing on the payload here
    output_record = {
        'recordId': record['recordId'],
        'result': 'Ok',
        'data': json.dumps(payload, ensure_ascii=False).encode('utf8')
    }
    output.append(output_record)

  print('Successfully processed {} records.'.format(len(event['records'])))
  return {'records': output}

However, the Lambda throws the following error:

An error occurred during JSON serialization of response: b'
{
   "id": "d23fd47f-3a62-4383-bcb3-abdb913ea572",
   "timestamp": 1526358140730,
   "message": "Hello World"
}
' is not JSON serializable
Traceback (most recent call last):
File "/var/lang/lib/python3.6/json/__init__.py", line 238, in dumps
**kw).encode(obj)
File "/var/lang/lib/python3.6/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/var/lang/lib/python3.6/json/encoder.py", line 257, in iterencode
 return _iterencode(o, 0)
File "/var/runtime/awslambda/bootstrap.py", line 110, in decimal_serializer
raise TypeError(repr(o) + " is not JSON serializable")

Am I doing something wrong?

【Comments】:

    Tags: python python-3.x aws-lambda


    【Solution 1】:

    I was able to solve this.

    Here is the code that worked for me.

    from __future__ import print_function
    
    import base64
    import msgpack
    import json
    
    print('Loading function')
    
    
    def lambda_handler(event, context):
      output = []
    
      for record in event['records']:
        payload = msgpack.unpackb(base64.b64decode(record['data']), raw=False)
    
        # Do custom processing on the payload here
        output_record = {
           'recordId': record['recordId'],
           'result': 'Ok',
           'data': base64.b64encode(json.dumps(payload).encode('utf-8') + b'\n').decode('utf-8')
        }
        output.append(output_record)
    
      print('Successfully processed {} records.'.format(len(event['records'])))
      return {'records': output}
    
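The original error comes from putting `bytes` in the `data` field: Lambda serializes the handler's return value as JSON, and `bytes` has no JSON representation. Base64-encoding the bytes and decoding the result to `str` avoids this. Below is a minimal sketch of the difference using only the standard library. The sample payload is copied from the error message in the question, and `encode_for_firehose` is a hypothetical helper name, not part of any AWS API.

```python
import base64
import json

payload = {"id": "d23fd47f-3a62-4383-bcb3-abdb913ea572",
           "timestamp": 1526358140730,
           "message": "Hello World"}


def encode_for_firehose(obj):
    """Serialize obj to JSON, append a newline, and return base64 text (str)."""
    raw = json.dumps(obj).encode('utf-8') + b'\n'
    return base64.b64encode(raw).decode('utf-8')


# A record whose 'data' is bytes cannot be serialized as part of the response.
bad_record = {'recordId': '1', 'result': 'Ok',
              'data': json.dumps(payload).encode('utf-8')}
try:
    json.dumps(bad_record)
    bytes_failed = False
except TypeError:
    bytes_failed = True

# Base64 text is a plain str, so the whole response serializes cleanly.
good_record = {'recordId': '1', 'result': 'Ok',
               'data': encode_for_firehose(payload)}
serialized = json.dumps(good_record)

# Decoding the base64 text recovers the original payload.
round_trip = json.loads(base64.b64decode(good_record['data']))
```

The trailing newline is optional; it only keeps the JSON objects on separate lines once Firehose concatenates them at the destination.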

    【Comments】:

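    • When I return records in a similar way, I get the error "One or more record Ids were not returned. Ensure that the Lambda function returns all received record Ids", but when I print my output list it shows all 200 of my records. Everything else seems fine, since my other logic runs inside a try-except. For some reason, though, this error is written for each of the 200 entries.
    • The last two lines need to be dedented, because they must be outside the for loop.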
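The "record Ids were not returned" error in the first comment indicates that the response is missing at least one `recordId` from the incoming batch. One way to guard against that, sketched below under some assumptions: every record gets exactly one output entry, failures are marked `ProcessingFailed` instead of being skipped, and the `return` sits outside the loop. `transform` is a hypothetical placeholder for the custom processing, and JSON input is assumed here instead of msgpack so the sketch runs with the standard library alone.

```python
import base64
import json


def transform(payload):
    # Hypothetical placeholder for custom processing on the decoded payload.
    return payload


def lambda_handler(event, context):
    output = []
    for record in event['records']:
        try:
            payload = json.loads(base64.b64decode(record['data']))
            data = json.dumps(transform(payload)).encode('utf-8') + b'\n'
            output.append({
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(data).decode('utf-8'),
            })
        except Exception:
            # Still return the recordId; pass the original data back unchanged.
            output.append({
                'recordId': record['recordId'],
                'result': 'ProcessingFailed',
                'data': record['data'],
            })
    # Outside the loop, so the whole batch is returned in one response.
    return {'records': output}
```

With this shape, a record that cannot be decoded no longer removes its `recordId` from the response, so one bad record does not make the whole batch look incomplete.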
    【Solution 2】:

    I managed to fix it this way:

    import base64
    import gzip
    import json


    def cloudwatch_handler(event, context):
      output = []
      for record in event['records']:
        compressed_payload = base64.b64decode(record['data'])
        uncompressed_payload = gzip.decompress(compressed_payload)
        print('uncompressed_payload',uncompressed_payload)
        payload = json.loads(uncompressed_payload)
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
      print('Successfully processed {} records.'.format(len(event['records'])))
      return {'records': output}
    

    This solution does not require any modules from outside the Python standard library, such as msgpack.

    【Comments】:
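To try a handler like this without deploying it, one option is to build a fake Firehose event locally. The sketch below assumes the incoming records are base64-encoded, gzip-compressed JSON, as in the handler above. `make_event` and the sample messages are hypothetical, and the handler is re-declared in compact form so the block is self-contained.

```python
import base64
import gzip
import json


def cloudwatch_handler(event, context):
    output = []
    for record in event['records']:
        # base64 -> gzip -> JSON, mirroring the handler above.
        payload = json.loads(gzip.decompress(base64.b64decode(record['data'])))
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8'),
        })
    return {'records': output}


def make_event(messages):
    """Build a fake transformation event from a list of dicts (hypothetical helper)."""
    return {'records': [
        {'recordId': str(i),
         'data': base64.b64encode(gzip.compress(json.dumps(m).encode('utf-8'))).decode('utf-8')}
        for i, m in enumerate(messages)
    ]}


event = make_event([{'message': 'Hello World'}, {'message': 'second'}])
response = cloudwatch_handler(event, None)

# Decode the transformed records to check what would be delivered.
decoded = [json.loads(base64.b64decode(r['data'])) for r in response['records']]
```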

      【Solution 3】:

      I have a Go version of a Lambda function that does the transformation work.

      https://github.com/hixichen/golang_lamda_decode_protobuf_firehose

      【Comments】:
