【问题标题】:How to log all kafka topics / events如何记录所有 kafka 主题/事件
【发布时间】:2021-09-03 00:28:49
【问题描述】:

我正在研究如何记录所有 Kafka 主题/事件。 是否有任何环境变量可以运行我的 Kafka 容器? 只是出于开发目的,我使用了

docker run -p 9092:9092 -e ADVERTISED_HOST=127.0.0.1 johnnypark/kafka-zookeeper

当我运行我的 python 脚本时,我能够使用和发布事件

from kafka import KafkaConsumer

consumer = KafkaConsumer('other-topic')

for msg in consumer:
    print (msg)
from kafka import KafkaProducer

producer = KafkaProducer()

producer.send('other-topic', b'some_message_bytes')

producer.flush()

我敢打赌有一种方法可以记录所有事件,但我找不到它。

PS。我只想记录所有事件以用于开发目的。我不想添加处理程序来观看所有主题,但如果有一种方法可以通过 KafkaConsumer 类来消费所有主题,那么了解它也会很高兴。

【问题讨论】:

标签: python apache-kafka


【解决方案1】:

您可以使用正则表达式模式来定义要使用的主题,请查看另一个问题中的以下示例

How to subscribe to a list of multiple kafka wildcard patterns using kafka-python?

def subscribe(self, topics=(), pattern=None, listener=None):
    """Subscribe to a list of topics, or a topic regex pattern
    Partitions will be dynamically assigned via a group coordinator.
    Topic subscriptions are not incremental: this list will replace the
    current assignment (if there is one).

对于您的用例,您可以使用“.*”模式

对于新主题,有一个可配置的参数说明创建新主题后需要多长时间来订阅它,请检查此答案

Kafka pattern subscription. Rebalancing is not being triggered on new topic

【讨论】:

  • 感谢您的回答,不幸的是,它适用于“客户*”之类的模式,但会引发“*”错误re.error: nothing to repeat at position 0
  • 你能试试“.*”之类的吗
  • 仅适用于已使用的事件。但是对于新的,它没有:(
  • 我认为我们将无法通过 KafkaConsumer 类解决它,因为pattern (str): Pattern to match available topics. You must provide either topics or pattern, but not both.
  • 所以这意味着它将占用所有可用的主题,我认为以后不会关心创建。
【解决方案2】:

您可以使用 dpkp/kafka-python 库从多个主题中消费。正如他们在KafkaConsumer 文档中提到的那样,您可以通过提供所需的多个主题列表来简单地初始化消费者。

classkafka.KafkaConsumer(*topics, **configs)

参数:
*topics (str) - 要订阅的可选主题列表。如果未设置,请在使用记录之前调用 subscribe() 或 assign()

在初始化消费者时设置主题是可选的。稍后您可以致电consumer.subscribe() 并为主题列表提供主题列表或模式。

def subscribe(self, topics=(), pattern=None, listener=None):

参数:

topics(列表):订阅的主题列表。

pattern (str):匹配可用主题的模式。您必须提供主题或模式,但不能同时提供。

如果不知道kafka中的topic列表,可以拨打consumer.topics()获取所有topic。

定义主题(自己):

获取用户有权查看的所有主题。 这将始终向集群发出远程调用以获取最新信息。

    Returns:
        set: topics

来源

【讨论】:

  • 嗯,不错。但我正在寻找更动态的东西,因为一旦我定义了主题列表,我将无法订阅新的。
  • 只是一个想法。我没有这方面的经验。您必须为此重新启动消费者。编写在后台运行的单独线程并获取主题列表并与以前的比较。如果有新主题,则使用新列表重新启动消费者。
猜你喜欢
  • 2019-07-04
  • 1970-01-01
  • 2023-03-19
  • 2016-07-12
  • 1970-01-01
  • 2019-11-04
  • 2015-07-03
  • 2016-04-23
  • 2021-02-23
相关资源
最近更新 更多