【发布时间】:2018-10-26 17:43:26
【问题描述】:
我正在开发一个使用 Spark 处理日志的应用程序,我想使用 Kafka 作为从日志文件流式传输数据的一种方式。基本上我有一个日志文件(在本地文件系统上),它会不断更新新日志,Kafka Connect 似乎是从文件中获取数据以及新附加行的完美解决方案。
我正在使用以下命令以默认配置启动服务器:
动物园管理员服务器:
zookeeper-server-start.sh config/zookeeper.properties
zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
Kafka 服务器:
kafka-server-start.sh config/server.properties
server.properties
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
[...]
然后我创建了主题“连接测试”:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic connect-test
最后我运行了 Kafka 连接器:
connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
连接文件源.properties
name=my-file-connector
connector.class=FileStreamSource
tasks.max=1
file=/data/users/zamara/suivi_prod/app/data/logs.txt
topic=connect-test
起初我通过运行一个简单的控制台消费者来测试连接器:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
一切正常,消费者从文件中接收日志,当我添加日志时,消费者不断更新新日志。
(然后我按照本指南尝试将 Spark 作为“消费者”:https://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers,它仍然很好)
之后,我从日志文件中删除了一些日志并更改了主题(我删除了“connect-test”主题,创建了另一个主题并使用新主题编辑了 connect-file-source.properties)。
但现在连接器不再以相同的方式工作了。使用控制台使用者时,我只获取文件中已经存在的日志,并且我添加的每一行都被忽略。可能在不更改连接器名称的情况下更改主题(和/或修改日志文件中的数据)破坏了 Kafka 中的某些内容...
这就是 Kafka Connect 对我的主题“new-topic”和连接器“new-file-connector”所做的:
[2018-05-16 15:06:42,454] INFO Created connector new-file-connector (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-05-16 15:06:42,487] INFO Cluster ID: qjm74WJOSomos3pakXb2hA (org.apache.kafka.clients.Metadata:265)
[2018-05-16 15:06:42,522] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: new-topic-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:06:52,453] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:06:52,458] INFO WorkerSourceTask{id=new-file-connector-0} Finished commitOffsets successfully in 5 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:427)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:02,459] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-16 15:07:12,459] INFO WorkerSourceTask{id=new-file-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-16 15:07:12,460] INFO WorkerSourceTask{id=new-file-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
(即使在文件中添加新行后,它也会不断刷新 0 条未完成的消息)
所以我尝试重新开始:我删除了 /tmp/kafka-logs 目录、/tmp/connect.offset 文件,并使用了全新的主题名称、连接器名称和日志文件,以防万一。但是,连接器仍然会忽略新日志...我什至尝试删除我的 kafka,从存档中重新提取它并再次运行整个过程(以防 Kafka 发生更改),但没有成功。
我不知道问题出在哪里,任何帮助将不胜感激!
【问题讨论】:
标签: apache-kafka apache-kafka-connect