【发布时间】:2018-11-30 22:27:19
【问题描述】:
我目前正在使用 Kafka Connect S3 Sink Connector 3.3.1 将 Kafka 消息复制到 S3,并且在处理延迟数据时出现 OutOfMemory 错误。
我知道这看起来是一个很长的问题,但我尽力让它清晰易懂。 非常感谢您的帮助。
高级信息
- 连接器对 Kafka 消息进行简单的逐字节复制,并在字节数组的开头添加消息的长度(用于解压缩目的)。
- 这是
CustomByteArrayFormat类的作用(参见下面的配置)
- 这是
- 根据
Record时间戳对数据进行分区和分桶-
CustomTimeBasedPartitioner扩展了io.confluent.connect.storage.partitioner.TimeBasedPartitioner,其唯一目的是覆盖generatePartitionedPath方法,将主题放在路径的末尾。
-
- Kafka Connect 进程的总堆大小为 24GB(仅一个节点)
- 连接器每秒处理 8,000 到 10,000 条消息
- 每条消息的大小接近 1 KB
- Kafka 主题有 32 个分区
OutOfMemory 错误的上下文
- 仅当连接器已关闭数小时且必须赶上数据时才会发生这些错误
- 重新打开连接器时,它开始赶上,但很快就因 OutOfMemory 错误而失败
可能但不完整的解释
- 当这些 OOM 错误发生时,连接器的
timestamp.extractor配置设置为Record - 将此配置切换为
Wallclock(即Kafka Connect进程的时间)不要抛出OOM错误,所有迟到的数据都可以处理,但迟到的数据不再正确分桶- 所有迟到的数据都将在连接器重新打开时的
YYYY/MM/dd/HH/mm/topic-name分桶
- 所有迟到的数据都将在连接器重新打开时的
- 所以我的猜测是,当连接器尝试根据
Record时间戳正确存储数据时,它执行过多的并行读取会导致 OOM 错误-
"partition.duration.ms": "600000"参数在每小时 6 个 10 分钟路径中生成连接器存储桶数据(2018/06/20/12/[00|10|20|30|40|50]表示 2018 年 6 月 20 日下午 12 点) - 因此,对于 24 小时的延迟数据,连接器必须在
24h * 6 = 144不同的 S3 路径中输出数据。 - 每 10 分钟文件夹包含 10,000 条消息/秒 * 600 秒 = 6,000,000 条消息,大小为 6 GB
- 如果确实是并行读取,那么将有 864GB 的数据进入内存
-
- 我认为我必须正确配置一组给定的参数以避免那些 OOM 错误,但我觉得我没有看到全局
-
"flush.size": "100000"暗示如果有更多 dans 100,000 条消息被读取,它们应该被提交到文件中(从而释放内存)- 对于 1KB 的消息,这意味着每 100MB 提交一次
- 但即使有 144 个并行读数,总共仍只能提供 14.4 GB,小于可用的 24GB 堆大小
-
"flush.size"是在提交之前每个分区要读取的记录数吗?或者可能每个连接器的任务?
- 我理解
"rotate.schedule.interval.ms": "600000"配置的方式是,即使flush.size的100,000 条消息尚未到达,数据也将每10 分钟提交一次。
-
我的主要问题是允许我计划内存使用的数学是什么:
- 每秒的数量或记录数
- 记录的大小
- 我读取的主题的 Kafka 分区数
- 连接器任务的数量(如果相关)
- 每小时写入的桶数(这里是 6,因为
"partition.duration.ms": "600000"配置) - 要处理的延迟数据的最大小时数
配置
S3 接收器连接器配置
{
"name": "xxxxxxx",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "us-east-1",
"partition.duration.ms": "600000",
"topics.dir": "xxxxx",
"flush.size": "100000",
"schema.compatibility": "NONE",
"topics": "xxxxxx,xxxxxx",
"tasks.max": "16",
"s3.part.size": "52428800",
"timezone": "UTC",
"locale": "en",
"format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
"partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "xxxxxxxxx",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "xxxxxxx",
"rotate.schedule.interval.ms": "600000",
"path.format": "YYYY/MM/dd/HH/mm",
"timestamp.extractor": "Record"
}
工作器配置
bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX
编辑:
我忘记添加我遇到的错误的示例:
2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
【问题讨论】:
标签: amazon-s3 out-of-memory apache-kafka-connect