【发布时间】:2018-08-22 13:25:48
【问题描述】:
我将 Spark Structured Streaming (2.3.0) 与 Kafka (1.0.0) 一起使用。
val event_stream: DataStreamReader = spark
.readStream
.format(_source)
.option("kafka.bootstrap.servers", _brokers)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
我正在使用 100 G 数据为一个 Kafka 主题测试管道。在 Kafka 代理(3 个引导节点,每个节点有 2G 堆/4G RAM)上,我经常(几乎每秒)看到这个 WARN 消息:
WARN Attempting to send response via channel for which there is no open connection, connection id 10.230.0.81:9092-10.230.0.116:39110-399 (kafka.network.Processor)
我还看到代理上的堆消耗稳步增加到 GC 中的时间百分比接近 100(没有太多内存释放)导致 OOM 和节点崩溃的点。 使用以下选项:
-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
还尝试根据KAFKA-5470替换-XX:+DisableExplicitGC with -XX:+ExplicitGCInvokesConcurrent
我的消息生成速度约为 7k 消息/秒(1 KB 消息)。
我知道我们并没有真正将 Kafka 推向极限。然而令人惊讶的是,OOM 事件和随后的节点崩溃发生了。
我会感谢那些经历过这些问题并对此领域有深刻见解的人的笔记/cmets/输入。
编辑:
尝试使用 confluent 推荐 params,但我仍然观察到上述问题:
-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
我想知道 Spark Kafka(结构化流)集成是否存在泄漏。
【问题讨论】: