【Question Title】: Missing records from the Kafka consumer
【Posted】: 2015-08-11 23:57:10
【Question】:

There is a problem between Kafka and Spark Streaming. I have a service in production with fairly low traffic (about 12,000 - 15,000 records per second). At first, consumption looks normal, but after 10 - 15 minutes the consumption rate suddenly drops to roughly one tenth of what it was. Could this be a network traffic problem?

Kafka (broker) configuration:
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=12
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
log.cleanup.interval.mins=1

Spark Streaming (consumer) configuration:

....
val kafkaParams = Map(
    "zookeeper.connect" -> zkQuorum,
    "group.id" -> group,
    "zookeeper.connection.timeout.ms" -> "1000000",
    "zookeeper.sync.time.ms" -> "200",
    "fetch.message.max.bytes" -> "2097152000",
    "queued.max.message.chunks" -> "1000",
    "auto.commit.enable" -> "true",
    "auto.commit.interval.ms" -> "1000")

try {
    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics.map((_, partition)).toMap,
      StorageLevel.MEMORY_ONLY).map {
      case (key, value) => convertTo(key, value)
    }.filter {
      _ != null
    }.foreachRDD(line => saveToHBase(line, INPUT_TABLE))
    //}.foreachRDD(line => logger.info("handling testing....."+ line))
  } catch {
    case e: Exception => logger.error("consumerEx: ", e) // log the exception itself; e.printStackTrace returns Unit
  }
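
Before tuning anything, it may help to confirm whether the batches themselves are falling behind. Below is a minimal diagnostic sketch, assuming the same StreamingContext ssc as above (the class name BatchDelayLogger is made up for illustration), that logs per-batch scheduling and processing delay via Spark's StreamingListener API:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs how long each batch waited to be scheduled and how long it took to process,
// so a drop in consumption can be correlated with growing delays
// (e.g. a slow saveToHBase or long GC pauses).
class BatchDelayLogger extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"batch=${info.batchTime} " +
      s"schedulingDelayMs=${info.schedulingDelay.getOrElse(-1L)} " +
      s"processingDelayMs=${info.processingDelay.getOrElse(-1L)}")
  }
}

// Register before ssc.start():
// ssc.addStreamingListener(new BatchDelayLogger)

If the scheduling delay keeps growing after the 10 - 15 minute mark, the processing side is stalling rather than the network dropping records.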

【Discussion】:

    Tags: apache-kafka spark-streaming


    【Solution 1】:

    It could be GC pause time. Check this: http://ingest.tips/2015/01/21/handling-large-messages-kafka/
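
    One way to check the GC-pause hypothesis directly is to sample the JVM's garbage-collector MXBeans in the driver (or executors) and compare the deltas against the moment throughput drops. A minimal sketch, with the helper name logGcStats chosen here just for illustration:

    import java.lang.management.ManagementFactory
    import scala.collection.JavaConverters._

    // Prints cumulative collection count and time for every GC in this JVM.
    // Call it periodically (e.g. from a scheduled thread) and watch for large
    // jumps in timeMs around the point where consumption slows down.
    def logGcStats(): Unit = {
      ManagementFactory.getGarbageCollectorMXBeans.asScala.foreach { gc =>
        println(s"gc=${gc.getName} collections=${gc.getCollectionCount} timeMs=${gc.getCollectionTime}")
      }
    }

    Note also that fetch.message.max.bytes is set to roughly 2 GB in the consumer config above, which is the kind of large-message territory the linked article discusses.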

    【Discussion】: