【问题标题】:Kafka Connect FileStreamSource ignores appended linesKafka Connect FileStreamSource 忽略附加行
【发布时间】:2018-10-26 17:43:26
【问题描述】:

我正在开发一个使用 Spark 处理日志的应用程序,我想使用 Kafka 作为从日志文件流式传输数据的一种方式。基本上我有一个日志文件(在本地文件系统上),它会不断更新新日志,Kafka Connect 似乎是从文件中获取数据以及新附加行的完美解决方案。

我正在使用以下命令以默认配置启动服务器:

动物园管理员服务器:

zookeeper-server-start.sh config/zookeeper.properties

zookeeper.properties

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0

Kafka 服务器:

kafka-server-start.sh config/server.properties

server.properties

broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
[...]

然后我创建了主题“连接测试”:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic connect-test

最后我运行了 Kafka 连接器:

connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

connect-standalone.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

连接文件源.properties

name=my-file-connector
connector.class=FileStreamSource
tasks.max=1
file=/data/users/zamara/suivi_prod/app/data/logs.txt
topic=connect-test

起初我通过运行一个简单的控制台消费者来测试连接器:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

一切正常,消费者从文件中接收日志,当我添加日志时,消费者不断更新新日志。

(然后我按照本指南尝试将 Spark 作为“消费者”:https://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers,它仍然很好)

之后,我从日志文件中删除了一些日志并更改了主题(我删除了“connect-test”主题,创建了另一个主题并使用新主题编辑了 connect-file-source.properties)。

但现在连接器不再以相同的方式工作了。使用控制台使用者时,我只获取文件中已经存在的日志,并且我添加的每一行都被忽略。可能在不更改连接器名称的情况下更改主题(和/或修改日志文件中的数据)破坏了 Kafka 中的某些内容...

这就是 Kafka Connect 对我的主题“new-topic”和连接器“new-file-connector”所做的:

[2018-05-16 15:06:42,454] INFO Created connector new-file-connector (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-05-16 15:06:42,487] INFO Cluster ID: qjm74WJOSomos3pakXb2hA (org.apache.kafka.clients.Metadata:265)
[2018-05-16 15:06:42,522] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: new-topic-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:06:52,458] INFO WorkerSourceTask{id=new-file-connector-0} Finished commitOffsets successfully in 5 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:427)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:07:12,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:12,460] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)

(即使在文件中添加新行后,它也会不断刷新 0 条未完成的消息)

所以我尝试重新开始:我删除了 /tmp/kafka-logs 目录、/tmp/connect.offset 文件,并使用了全新的主题名称、连接器名称和日志文件,以防万一。但是,连接器仍然会忽略新日志...我什至尝试删除我的 kafka,从存档中重新提取它并再次运行整个过程(以防 Kafka 发生更改),但没有成功。

我不知道问题出在哪里,任何帮助将不胜感激!

【问题讨论】:

    标签: apache-kafka apache-kafka-connect


    【解决方案1】:

    Kafka Connect 不会“监视”或“跟踪”文件。我不相信它在确实这样做的任何地方都有记录。


    我会说它对于读取活动日志的用处甚至不如使用Spark Streaming to watch a folder。 Spark 将“识别”新创建的 文件。 Kafka Connect FileStreamSource 必须指向一个预先存在的不可变文件。

    要让 Spark 使用活动日志,您需要 something that does "log rotation" - 也就是说,当文件达到最大大小或某个时间段(例如一天)结束等条件时,此过程将将活动日志移动到 Spark 正在监视的目录,然后它会处理启动一个新日志文件以供您的应用程序继续写入。


    如果您希望主动监视文件并将其摄取到 Kafka,则可以使用 Filebeat、Fluentd 或 Apache Flume。

    【讨论】:

      【解决方案2】:

      docs

      FileStream 连接器示例旨在为那些第一次以用户或开发人员身份开始使用 Kafka Connect 的人展示如何运行一个简单的连接器。不建议用于生产。

      我会使用 Filebeat 之类的东西(及其 Kafka 输出)来代替将日志摄取到 Kafka。或者 kafka-connect-spooldir 如果您的日志不是直接附加到的,而是放置在文件夹中以供摄取的独立文件。

      【讨论】:

        猜你喜欢
        • 2019-09-15
        • 2018-07-08
        • 2017-06-23
        • 1970-01-01
        • 1970-01-01
        • 2012-10-31
        • 1970-01-01
        • 2019-04-30
        • 2021-06-24
        相关资源
        最近更新 更多