【问题标题】:Print a kstream content without increasing offset在不增加偏移量的情况下打印 kstream 内容
【发布时间】:2021-08-03 14:27:00
【问题描述】:

我目前正在对 kafka 流执行我的第一步,但我很难理解 kafka 应用程序如何存储其状态。 我想在不更新偏移量的情况下打印 kstream 的内容,感觉这不是我应该做的事情,但我很难理解为什么:

  def rawPlanningStream(
      builder: StreamsBuilder,
      topicName: String
  ): KStream[String, Planning] =
    builder.stream(topicName)(Consumed.`with`(Serdes.String, Planning.serde))

  def printPlanning(
    key: String,
    value: Planning
  ) = {
    val logger = LoggerFactory.getLogger("PlanningEventSyncLogger")
    logger.warn(s"Planning: $key, $value")
  }


def process(
      builder: StreamsBuilder,
      rawTopic: String
    ) = {
    val raw_planning_stream = PlanningEventSync.rawPlanningStream(
      builder,
      rawTopic
    )

    raw_planning_stream.peek((k,v) => printPlanning(k,v))

    //Here I would like to perform an operation on raw_planning_stream
    //but offset is already "wrong" because of the peek done earlier

   }

第一次启动进程时,主题的内容按预期打印,如果我再次启动它,它不再打印任何内容,因为偏移量已更新。

我的问题是是否可以执行像打印这样的“非侵入性”操作来保持原样?

(注意:我设法在我的组上使用 kafka-consumer-groups.sh 中的 --reset-offsets --to-earliest 来手动重置偏移量,但我希望能够以编程方式执行操作不更改我的消费者组的偏移量)

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    如果不能设置enable.auto.commit=false,那么另一种选择是设置application.id="<some random UUID>",这样每次运行应用程序时,它都会创建一个新的消费者组,从auto.offset.reset设置开始

    【讨论】:

    • 感谢您的回答,我确认如果我使用不同的 application.id 重新启动我的应用程序,它会从重置偏移量中读取并打印我的流。我的问题是我想在同一次运行中打印而不修改偏移然后执行一些操作。我还设置了enable.auto.commit=false,但我没有看到任何变化(打印后偏移量仍在更新)
    • 另外,看起来我已经使用了默认配置 enable.auto.commit=false,因为当我尝试放置 ConsumerConfig:ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> false 它说:``` Unexpected user-specified consumer config: enable.auto .commit 找到。用户设置 (false) 将被忽略并使用 Streams 默认设置 (false) ```
    • 如果你想执行一个动作并打印,那么只需在 map() 函数中打印
    • 我想我不明白的是,如果不增加偏移量,就不可能在流上映射/窥视/foreach。所以我最初想做的,首先使用其中一个函数进行打印,然后在同一流上执行另一个 map/peek/foreach 以执行其他操作,这不是正确的方法。
    • AFAIK, peek 是一个传递操作。它可以在其他非终端操作之前或之后进行,所以raw_planning_stream.peek((k,v) => printPlanning(k,v)).map((k,v) => ...) 应该可以工作。但是,如果您要进行任何拓扑更改,则确实需要重置偏移量
    猜你喜欢
    • 1970-01-01
    • 2019-05-25
    • 1970-01-01
    • 1970-01-01
    • 2012-10-29
    • 1970-01-01
    • 2015-08-23
    • 1970-01-01
    • 2015-08-19
    相关资源
    最近更新 更多