【问题标题】:How to I index the transformed log records into AWS Elasticsearch?如何将转换后的日志记录索引到 AWS Elasticsearch?
【发布时间】:2018-04-04 07:33:14
【问题描述】:

TLDR

由于“编码问题”,lambda 函数无法将 firehose 日志索引到 AWS 托管的 ES 中。

实际错误响应

当我对来自 firehose record 的单个 logEvent 进行 base64 编码并将收集的记录发送到 AWS 托管的 ES 时,我没有收到任何错误。

有关详细信息,请参阅下一节。

base 64 编码的压缩负载正在发送到 ES,因为生成的 json 转换太大,ES 无法索引 - see this ES link

我从 AWS 托管 ES 收到以下错误:

{
    "deliveryStreamARN": "arn:aws:firehose:us-west-2:*:deliverystream/*",
    "destination": "arn:aws:es:us-west-2:*:domain/*",
    "deliveryStreamVersionId": 1,
    "message": "The data could not be decoded as UTF-8",
    "errorCode": "InvalidEncodingException",
    "processor": "arn:aws:lambda:us-west-2:*:function:*"
  }

如果输出记录未压缩,the body size is too long(小至 14MB)。如果没有压缩和简单的 base64 编码负载,我在 Lambda 日志中收到以下错误:

{
  "type": "mapper_parsing_exception",
  "reason": "failed to parse",
  "caused_by": {
    "type": "not_x_content_exception",
    "reason": "Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"
  }
}

说明

我有按大小/间隔缓冲的 Cloudwatch 日志,这些日志被输入 Kinesis Firehose。 firehose 将日志传输到 lambda 函数中,该函数将日志转换为 json 记录,然后将其发送到 AWS 托管的 Elasticsearch 集群。

lambda 函数获取以下 JSON 结构:

{
    "invocationId": "cf1306b5-2d3c-4886-b7be-b5bcf0a66ef3",
    "deliveryStreamArn": "arn:aws:firehose:...",
    "region": "us-west-2",
    "records": [{
        "recordId": "49577998431243709525183749876652374166077260049460232194000000",
        "approximateArrivalTimestamp": 1508197563377,
        "data": "some_compressed_data_in_base_64_encoding"
    }]
}

然后 lambda 函数提取 .records[].data 并将数据解码为 base64 并解压缩数据,从而生成以下 JSON:

{
  "messageType": "DATA_MESSAGE",
  "owner": "aws_account_number",
  "logGroup": "some_cloudwatch_log_group_name",
  "logStream": "i-0221b6ec01af47bfb",
  "subscriptionFilters": [
    "cloudwatch_log_subscription_filter_name"
  ],
  "logEvents": [
    {
      "id": "33633929427703365813575134502195362621356131219229245440",
      "timestamp": 1508197557000,
      "message": "Oct 16 23:45:57 some_log_entry_1"
    },
    {
      "id": "33633929427703365813575134502195362621356131219229245441",
      "timestamp": 1508197557000,
      "message": "Oct 16 23:45:57 some_log_entry_2"
    },
    {
      "id": "33633929427703365813575134502195362621356131219229245442",
      "timestamp": 1508197557000,
      "message": "Oct 16 23:45:57 some_log_entry_3"
    }
  ]
}

.logEvents[] 中的单个项目被转换为 json 结构,其中键是在 Kibana 中搜索日志时所需的列 - 如下所示:

{
    'journalctl_host': 'ip-172-11-11-111',
    'process': 'haproxy',
    'pid': 15507,
    'client_ip': '172.11.11.111',
    'client_port': 3924,
    'frontend_name': 'http-web',
    'backend_name': 'server',
    'server_name': 'server-3',
    'time_duration': 10,
    'status_code': 200,
    'bytes_read': 79,
    '@timestamp': '1900-10-16T23:46:01.0Z',
    'tags': ['haproxy'],
    'message': 'HEAD / HTTP/1.1'
}

转换后的 json 被收集到一个数组中,该数组得到 zlib 压缩和 base64 编码字符串,然后将其转换为新的 json 有效负载作为最终的 lambda 结果:

{
"records": [
    {
        "recordId": "49577998431243709525183749876652374166077260049460232194000000",
        "result": "Ok",
        "data": "base64_encoded_zlib_compressed_array_of_transformed_logs"
    }
]}

Cloudwatch 配置

13 个日志条目 (~4kb) 可以转换为大约 635kb。

我还降低了 awslogs 的阈值,希望发送到 Lambda 函数的日志的大小会变小:

buffer_duration = 10
batch_count = 10
batch_size = 500

不幸的是,当爆发时 - 峰值可能超过 2800 行,而大小超过 1MB。

当 lambda 函数生成的有效负载“太大”(大约 13mb 的转换日志)时,lambda cloudwatch 日志中会记录一个错误 - “body size is too long”。似乎没有任何迹象表明此错误来自何处,或者 lambda fn 的响应负载是否存在大小限制。

【问题讨论】:

    标签: amazon-web-services lambda elasticsearch-5 amazon-kinesis-firehose


    【解决方案1】:

    因此,AWS 支持人员告诉我,无法缓解以下限制来解决此流程:

    1. lambda 负载大小
    2. 进入 lambda 的压缩消防软管有效载荷与 lambda 输出成正比。

    相反,我已将架构修改为以下内容:

    1. Cloudwatch 日志通过 Firehose 备份到 S3 中。
    2. S3 事件由 lambda 函数处理。
    3. 如果 lambda 转换并能够成功地将日志批量索引到 ES 中,则 lambda 函数会返回成功代码。
    4. 如果 lambda 函数失败,则会为死信队列 (AWS SQS) 配置 cloudwatch 警报。可以在here 找到示例 cloudformation sn-p。
    5. 如果有 SQS 消息,可以使用这些消息手动调用 lambda 函数,或者设置 AWS 批处理作业以使用 lambda 函数处理 SQS 消息。但是,应该小心,lambda 函数不会再次故障转移到 DLQ。检查 lambda cloudwatch 日志以检查该消息未处理并发送到 DLQ 的原因。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-07-12
      • 2021-08-26
      • 2020-09-26
      • 1970-01-01
      • 1970-01-01
      • 2017-09-07
      • 2021-08-20
      • 1970-01-01
      相关资源
      最近更新 更多