【问题标题】:Kafka Connect S3 sink - how to use the timestamp from the message itself [timestamp extractor]Kafka Connect S3 sink - 如何使用消息本身的时间戳[时间戳提取器]
【发布时间】:2019-08-18 22:15:51
【问题描述】:

我一直在努力解决使用 kafka connect 和 S3 sink 的问题。

首先是结构:

{
   Partition: number
   Offset: number
   Key: string
   Message: json string
   Timestamp: timestamp
}

通常在发布到 Kafka 时,时间戳应该由生产者设置。不幸的是,似乎有些情况没有发生。这意味着时间戳有时可能是null

为了提取此时间戳,连接器设置为以下值: "timestamp.extractor":"Record".

现在可以确定Message 字段本身也始终包含时间戳。

Message:

{
   timestamp: "2019-04-02T06:27:02.667Z"
   metadata: {
     creationTimestamp: "1554186422667"
   }
}

但是现在的问题是,我想将该字段用于timestamp.extractor

我以为这样就足够了,但这似乎不起作用:

"timestamp.extractor":"RecordField",
"timestamp.field":"message.timestamp",

这也会产生一个 NullPointer。

关于如何使用来自 kafka 消息负载本身的时间戳的任何想法,而不是为 kafka v0.10+ 设置的默认时间戳字段

编辑: 完整配置:

{ "name": "<name>",
  "config": {
    "connector.class":"io.confluent.connect.s3.S3SinkConnector",
    "tasks.max":"4",
    "topics":"<topic>",
    "flush.size":"100",
    "s3.bucket.name":"<bucket name>",
    "s3.region": "<region>",
    "s3.part.size":"<partition size>",
    "rotate.schedule.interval.ms":"86400000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "storage.class":"io.confluent.connect.s3.storage.S3Storage",
    "format.class":"io.confluent.connect.s3.format.json.JsonFormat",
    "locale":"ENGLISH",
    "timezone":"UTC",
    "schema.generator.class":"io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "timestamp.extractor":"RecordField",
    "timestamp.field":"message.timestamp",
    "max.poll.interval.ms": "600000",
    "request.timeout.ms": "610000",
    "heartbeat.interval.ms": "6000",
    "session.timeout.ms": "20000",
    "s3.acl.canned":"bucket-owner-full-control"
  }
}

编辑 2: Kafka 消息负载结构:

{
  "reference": "",
  "clientId": "",
  "gid": "",
  "timestamp": "2019-03-19T15:27:55.526Z",
}

编辑 3:

{
"transforms": "convert_op_creationDateTime",
"transforms.convert_op_creationDateTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_op_creationDateTime.target.type": "Timestamp",
"transforms.convert_op_creationDateTime.field": "timestamp",
"transforms.convert_op_creationDateTime.format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
}

所以我尝试对对象进行转换,但似乎我又被困在这件事上了。该模式似乎无效。环顾互联网,这似乎是一个有效的 SimpleDatePattern。它似乎在抱怨'T'。也更新了消息架构。

【问题讨论】:

  • 澄清一下:您使用 Kafka Connect 作为接收器?并使用 Single Message Transform 将 Kafka 消息的时间戳提取到您要写入接收器的消息的字段中?你能分享你完整的 Kafka Connect 配置吗?
  • 更新了消息!
  • 好的,现在这更有意义了 :) 你能分享你的消息架构吗?
  • 你是说这个属性吗? Message: json string?
  • 你能显示完整的堆栈跟踪吗?

标签: amazon-s3 apache-kafka apache-kafka-connect confluent-platform


【解决方案1】:

根据您共享的架构,您应该设置:

    "timestamp.extractor":"RecordField",
    "timestamp.field":"timestamp",

即时间戳字段名称没有 message 前缀。

【讨论】:

  • 虽然似乎没有完全工作。仍然得到一个空指针。尽管消息实际上有可用的时间戳(根据 kafka 工具)
  • 可能与它是UTC有关吗?是否需要以某种方式进行改造?
【解决方案2】:

如果数据是字符串,则 Connect 将尝试解析为 毫秒 - source code here

无论如何,message.timestamp 假定数据看起来像 { "message" : { "timestamp": ... } },所以只有 timestamp 是正确的。无论如何,使用嵌套字段是不可能的,因此您可能需要澄清您拥有的 Connect 版本。

我不完全确定在使用 JSON 转换器时如何让instanceof Date 评估为真,即使你设置了schema.enable = true,那么在代码中你也可以看到只有模式的条件数字和字符串的类型,但仍假定它是毫秒。

您可以尝试使用TimestampConverter 转换来转换您的日期字符串。

【讨论】:

  • 看来我在这里碰壁了。我已经用更多信息更新了这个问题。我确实注意到我也可以访问正常的时间戳,这对于“字符串”格式时间来说是完美的。使用 ExtractField 或与此相关的东西会有所帮助吗?
  • 不确定为什么需要提取任何内容。我自己没有使用 TimestampConverter,因为我使用的数据几乎总是以毫秒为单位的 unix 纪元时间。不过,它的单元测试在这里,如果你想看看github.com/apache/kafka/blob/trunk/connect/transforms/src/test/…
猜你喜欢
  • 2019-04-18
  • 2020-11-18
  • 1970-01-01
  • 2023-03-22
  • 2021-07-14
  • 1970-01-01
  • 1970-01-01
  • 2017-03-27
  • 2019-08-10
相关资源
最近更新 更多