【发布时间】:2019-07-29 11:09:19
【问题描述】:
我有一个玩具 Flink 作业,它读取 3 个 kafka 主题,然后合并所有这 3 个流。就是这样,没有额外的工作。
如果在我的 Flink 作业中使用并行度 1,一切似乎都很好,只要我更改并行度 > 1,它就会失败:
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:693)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
为什么它适用于并行度 1 而不是并行度 > 1?
是否与kafka服务器端设置有关?或者它与我的 java 代码中的消费者设置有关(我的代码中还没有特殊配置)?
我知道这里提供的信息可能还不够,但我无法触及 kafka 集群。我只是希望一些大师之前可能会遇到同样的错误,并且可以与我分享一些建议。
我使用的是 kafka 0.10,flink 1.5。
非常感谢。
【问题讨论】:
-
如何设置并行度?
env.setParallelism?