【问题标题】:Cloudwatch Logs -> Kinesis Firehose -> S3 - not proper JSON?Cloudwatch 日志 -> Kinesis Firehose -> S3 - 不是正确的 JSON?
【发布时间】:2020-06-09 13:07:29
【问题描述】:

我正在尝试使用 Cloudwatch 订阅过滤器构建集中式日志记录解决方案,以将日志写入 Kinesis Firehose -> S3 -> AWS Glue -> Athena。我在数据格式方面遇到了很多问题。

最初,我使用 AWS::KinesisFirehose 的 S3DestinationConfiguration 写入 S3,然后尝试使用 AWS::Glue::Crawler 抓取数据或在 Cloudformation 模板中手动创建表。我发现 Crawler 在确定 S3 上的数据格式时遇到了很多麻烦(发现 ION 而不是 JSON - Athena 无法查询 ION)。我现在正在尝试ExtendedS3DestinationConfiguration,它允许显式配置输入和输出格式以强制其为镶木地板。

很遗憾,使用此设置 Kinesis Firehose 会返回错误日志,指出输入不是有效的 JSON。这让我想知道 Cloudwatch 订阅过滤器是否没有写入正确的 JSON - 但是此对象上没有配置选项来控制数据格式。

这不是一个特别不寻常的问题陈述,因此必须有人进行适当的配置。以下是我失败配置的一些 sn-ps:

ExtendedS3DestinationConfiguration:
        BucketARN: !Sub arn:aws:s3:::${S3Bucket}
        Prefix: !Sub ${S3LogsPath}year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
        ErrorOutputPrefix: !Sub ${FailedWritePath}
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 128
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub ${AppId}-logstream-${Environment}
          LogStreamName: logs
        CompressionFormat: UNCOMPRESSED
        RoleARN: !GetAtt FirehoseRole.Arn
        DataFormatConversionConfiguration:
          Enabled: true
          InputFormatConfiguration:
            Deserializer:
              OpenXJsonSerDe: {}
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {}
          SchemaConfiguration:
            CatalogId: !Ref AWS::AccountId
            DatabaseName: !Ref CentralizedLoggingDatabase
            Region: !Ref AWS::Region
            RoleARN: !GetAtt FirehoseRole.Arn
            TableName: !Ref LogsGlueTable
            VersionId: LATEST

以前的配置:

S3DestinationConfiguration:
        BucketARN: !Sub arn:aws:s3:::${S3Bucket}
        Prefix: !Sub ${S3LogsPath}year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
        ErrorOutputPrefix: !Sub ${FailedWritePath}
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 128
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub ${AppId}-logstream-${Environment}
          LogStreamName: logs
        CompressionFormat: GZIP
        RoleARN: !GetAtt FirehoseRole.Arn

还有爬虫:

Type: AWS::Glue::Crawler
    Properties:
      Name: !Sub ${DNSEndPoint}_logging_s3_crawler_${Environment}
      DatabaseName: !Ref CentralizedLoggingDatabase
      Description: AWS Glue crawler to crawl logs on S3
      Role: !GetAtt CentralizedLoggingGlueRole.Arn
#      Schedule: ## run on demand
#        ScheduleExpression: cron(40 * * * ? *)
      Targets:
        S3Targets:
          - Path: !Sub s3://${S3Bucket}/${S3LogsPath}
      SchemaChangePolicy:
        UpdateBehavior: UPDATE_IN_DATABASE
        DeleteBehavior: LOG
      TablePrefix: !Sub ${AppId}_${Environment}_

错误,使用ExtendedS3DestinationConfiguration:

"attemptsMade":1,"arrivalTimestamp":1582650068665,"lastErrorCode":"DataFormatConversion.ParseError","lastErrorMessage":"遇到格式错误的 JSON。非法字符((CTRL-CHAR,代码 31)):在 [Source: com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream@2ce955fc; line: 1, column: 2] 处的标记之间只允许使用常规空格(\r、\n、\t)

这里似乎有一些配置问题,但我找不到它。

【问题讨论】:

  • 可能值得让日志进入 lambda 中的预处理步骤,您可以在其中验证格式,然后再将其发送到 firehose
  • 嗨,你有没有设法解决这个问题?我从您的初始 FH 配置开始。显然,json 被发送 base64 编码,显然 Athena 可以默认读取它。尽管如此,让 Athena 表输出清晰的数据仍然是一项艰巨的工作!

标签: amazon-web-services aws-glue amazon-kinesis-firehose


【解决方案1】:

所以我刚刚在类似的情况下经历过这种情况,但现在可以正常工作了。

Firehose 将日志写入 S3 压缩的 Base64,并作为 JSON 记录数组。 Athena 读取数据需要解压,每行 1 条 JSON 记录。

所以从蓝图创建一个 lambda 函数:kinesis-firehose-cloudwatch-logs-processor 在 Firehose 中启用转换,并指定上述 lambda 函数。 这将解压缩,并将 json 放入 S3 每行 1 条记录。

创建 Athena 表:

CREATE EXTERNAL TABLE mydb.mytable(
  eventversion string COMMENT 'from deserializer', 
  useridentity struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>> COMMENT 'from deserializer', 
  eventtime string COMMENT 'from deserializer', 
  eventsource string COMMENT 'from deserializer', 
  eventname string COMMENT 'from deserializer', 
  awsregion string COMMENT 'from deserializer', 
  sourceipaddress string COMMENT 'from deserializer', 
  useragent string COMMENT 'from deserializer', 
  errorcode string COMMENT 'from deserializer', 
  errormessage string COMMENT 'from deserializer', 
  requestparameters string COMMENT 'from deserializer', 
  responseelements string COMMENT 'from deserializer', 
  additionaleventdata string COMMENT 'from deserializer', 
  requestid string COMMENT 'from deserializer', 
  eventid string COMMENT 'from deserializer', 
  resources array<struct<arn:string,accountid:string,type:string>> COMMENT 'from deserializer', 
  eventtype string COMMENT 'from deserializer', 
  apiversion string COMMENT 'from deserializer', 
  readonly string COMMENT 'from deserializer', 
  recipientaccountid string COMMENT 'from deserializer', 
  serviceeventdetails string COMMENT 'from deserializer', 
  sharedeventid string COMMENT 'from deserializer', 
  vpcendpointid string COMMENT 'from deserializer', 
  managementevent boolean COMMENT 'from deserializer', 
  eventcategory string COMMENT 'from deserializer')
PARTITIONED BY ( 
  datehour string)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'paths'='awsRegion,eventCategory,eventID,eventName,eventSource,eventTime,eventType,eventVersion,managementEvent,readOnly,recipientAccountId,requestID,requestParameters,responseElements,sourceIPAddress,userAgent,userIdentity') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://mybucket/prefix'
TBLPROPERTIES (
  'projection.datehour.format'='yyyy/MM/dd/HH', 
  'projection.datehour.interval'='1', 
  'projection.datehour.interval.unit'='HOURS', 
  'projection.datehour.range'='2021/01/01/00,NOW', 
  'projection.datehour.type'='date', 
  'projection.enabled'='true', 
  'storage.location.template'='s3://mybucket/myprefix/${datehour}'
)

【讨论】:

    猜你喜欢
    • 2019-05-07
    • 2017-05-27
    • 1970-01-01
    • 2021-11-05
    • 1970-01-01
    • 2019-08-15
    • 2020-08-12
    • 2016-02-03
    • 2017-10-16
    相关资源
    最近更新 更多