【问题标题】:Flink stucks at checkpoint creationFlink 卡在检查点创建
【发布时间】:2020-05-20 18:03:50
【问题描述】:

我有一个在创建检查点时卡住的 flink 作业。它几乎没有状态(除了一些 kafka 偏移量)。

作业本身具有以下基本设置:

KafkaSource -> iterate -> HDFSSink

iterate 函数再次执行 HTTP 调用并转发成功,丢弃 4xx 并重试 5xx。 从我的指标中可以看出,所有这些都发生了,我得到了一些 5xx(返回到迭代源)一些 4xx(忽略)和很多 2xx(转发到 HDFS)。

如果我查看线程转储,我可以看到某个任务被阻止:

"Async calls on IterationSource-8 (1/1)" #123 daemon prio=5 os_prio=0 tid=0x00007f174000f800 nid=0x237 waiting for monitor entry [0x00007f17b32f5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:747)
    - waiting to lock <0x00000000ace0f128> (a java.lang.Object)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:683)
    at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1155)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

这个正在等待一个对象监视器,它被持有:

"IterationSource-8 (1/1)" #63 prio=5 os_prio=0 tid=0x00007f17c00bf000 nid=0x1e0 in Object.wait() [0x00007f17b17d2000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:256)
    - locked <0x00000000acd030b0> (a java.util.ArrayDeque)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:184)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.tasks.StreamIterationHead.performDefaultAction(StreamIterationHead.java:77)
    - locked <0x00000000ace0f128> (a java.lang.Object)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

仔细查看源代码,我可以看到第二个线程(持有锁)似乎处于某种无限循环中:

LocalBufferPool.java:

while (availableMemorySegments.isEmpty()) {
}

亲爱的 Flink 大师们有什么线索可以查看哪个指标吗?我正在使用 Flink 1.9.0。

提前感谢您的任何提示!

【问题讨论】:

  • HTTP 调用是异步的还是同步的?

标签: java apache-flink flink-streaming


【解决方案1】:

当我在 Flink Sink 中使用 HTTP 调用时,我遇到了类似的检查点。经过大量的跟踪和错误后,我发现,如果每秒下沉速率低于输入速率,检查点就会被触发。

为此,我为源(输入)指定并行度为 1,为 HTTP 调用指定并行度为 8。

这将在等待 HTTP 响应时不会阻塞线程,以便检查点发生。我也是 Flink 的新手,希望一些专家解释为什么在 flink 中使用 HTTP 调用时检查点会变慢。

【讨论】:

    猜你喜欢
    • 2020-09-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-01-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-01-27
    相关资源
    最近更新 更多