【问题标题】:Streaming Kafka Messages to MySQL Database将 Kafka 消息流式传输到 MySQL 数据库
【发布时间】:2018-03-10 18:55:09
【问题描述】:

我想将 Kafka 消息写入 MySQL 数据库。 this 链接中有一个示例。在该示例中,Apache Flume 用于消费消息并将其写入 MySQL。我使用相同的代码,当我运行 flume-ng agentevent 时,总是变为 null

我的flume.conf.properties 文件是:

agent.sources=kafkaSrc
agent.channels=channel1
agent.sinks=jdbcSink

agent.channels.channel1.type=org.apache.flume.channel.kafka.KafkaChannel
agent.channels.channel1.brokerList=localhost:9092
agent.channels.channel1.topic=kafkachannel
agent.channels.channel1.zookeeperConnect=localhost:2181
agent.channels.channel1.capacity=10000
agent.channels.channel1.transactionCapacity=1000
agent.channels.channel1.parseAsFlumeEvent=false


agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSrc.channels = channel1
agent.sources.kafkaSrc.zookeeperConnect = localhost:2181
agent.sources.kafkaSrc.topic = kafka-mysql

agent.sinks.jdbcSink.type = com.stratio.ingestion.sink.jdbc.JDBCSink
agent.sinks.jdbcSink.connectionString = jdbc:mysql://127.0.0.1:3306/test?useSSL=false
agent.sinks.jdbcSink.username=root
agent.sinks.jdbcSink.password=pass
agent.sinks.jdbcSink.batchSize = 10
agent.sinks.jdbcSink.channel =channel1
agent.sinks.jdbcSink.sqlDialect=MYSQL
agent.sinks.jdbcSink.driver=com.mysql.jdbc.Driver
agent.sinks.jdbcSink.sql=INSERT INTO kafkamsg(msg) VALUES(${body:varchar})

我哪里错了?

谢谢。

【问题讨论】:

    标签: apache-kafka flume-ng


    【解决方案1】:

    在我的参考示例中,flume 监听 kafka 的 kafka-mysql 主题。但此代码适用于 kafkachannel 主题。所以我们需要向kafkachannel主题产生消息,我不知道为什么。

    【讨论】:

      猜你喜欢
      • 2023-03-26
      • 2020-11-06
      • 2018-06-25
      • 1970-01-01
      • 1970-01-01
      • 2015-01-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多