【问题标题】:How to continuously send data to kafka? [duplicate]如何连续向kafka发送数据? [复制]
【发布时间】:2016-06-24 06:10:40
【问题描述】:

我正在尝试不断向 kafka 代理/消费者发送数据(使用 tshark 嗅探数据包)。

这是我遵循的步骤:

1. 启动 zookeeper:

kafka/bin/zookeeper-server-start.sh ../kafka//config/zookeeper.properties

2. 启动kafka 服务器

kafka/bin/kafka-server-start.sh ../kafka/config/server.properties

3.开始kafka消费者

kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic \
                                           'my-topic' --from-beginning

4. 编写了以下 python 脚本将嗅探到的数据发送给消费者:

from kafka import KafkaProducer
import subprocess
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('my-topic', subprocess.check_output(['tshark','-i','wlan0']))

但它保留在 procuder 终端上并输出:

Capturing on 'wlan0'
605
^C

没有任何东西被转移给消费者。

我知道我可以使用pyshark 在 python 上实现 tshark:

import pyshark
capture = pyshark.LiveCapture(interface='eth0')
capture.sniff(timeout=5)
capture1=capture[0]
print capture1

但我不知道如何不断地将捕获的数据包从生产者发送到消费者。有什么建议吗?

谢谢!

【问题讨论】:

  • 这也是我的问题,您对之前的回答/问题不满意,您需要问一个完全不同的问题吗?或者为什么这个问题足够不同?
  • 上一个问题是一个更通用的问题,涉及生产者已经可用的脚本,但这里我试图在 python 中实现。此外,在这个问题中,我更具体地说明了我尝试过的工具和技术。

标签: python python-2.7 apache-kafka tshark pyshark


【解决方案1】:

检查以下链接。

http://zdatainc.com/2014/07/real-time-streaming-apache-storm-apache-kafka/

实现 Kafka Producer 在这里,定义了用于测试我们集群的 Kafka 生产者代码的主要部分。 在主类中,我们设置了数据管道和线程:

LOGGER.debug("Setting up streams");
PipedInputStream send = new PipedInputStream(BUFFER_LEN);
PipedOutputStream input = new PipedOutputStream(send);

LOGGER.debug("Setting up connections");
LOGGER.debug("Setting up file reader");
BufferedFileReader reader = new BufferedFileReader(filename, input);
LOGGER.debug("Setting up kafka producer");
KafkaProducer kafkaProducer = new KafkaProducer(topic, send);

LOGGER.debug("Spinning up threads");
Thread source = new Thread(reader);
Thread kafka = new Thread(kafkaProducer);

source.start();
kafka.start();

LOGGER.debug("Joining");
kafka.join();
The BufferedFileReader in its own thread reads off the data from disk:
rd = new BufferedReader(new FileReader(this.fileToRead));
wd = new BufferedWriter(new OutputStreamWriter(this.outputStream, ENC));
int b = -1;
while ((b = rd.read()) != -1)
{
    wd.write(b);
}
Finally, the KafkaProducer sends asynchronous messages to the Kafka Cluster:
rd = new BufferedReader(new InputStreamReader(this.inputStream, ENC));
String line = null;
producer = new Producer<Integer, String>(conf);
while ((line = rd.readLine()) != null)
{
    producer.send(new KeyedMessage<Integer, String>(this.topic, line));
}
Doing these operations on separate threads gives us the benefit of having disk reads not block the Kafka producer or vice-versa, enabling maximum performance tunable by the size of the buffer.
Implementing the Storm Topology
Topology Definition
Moving onward to Storm, here we define the topology and how each bolt will be talking to each other:
TopologyBuilder topology = new TopologyBuilder();

topology.setSpout("kafka_spout", new KafkaSpout(kafkaConf), 4);

topology.setBolt("twitter_filter", new TwitterFilterBolt(), 4)
        .shuffleGrouping("kafka_spout");

topology.setBolt("text_filter", new TextFilterBolt(), 4)
        .shuffleGrouping("twitter_filter");

topology.setBolt("stemming", new StemmingBolt(), 4)
        .shuffleGrouping("text_filter");

topology.setBolt("positive", new PositiveSentimentBolt(), 4)
        .shuffleGrouping("stemming");
topology.setBolt("negative", new NegativeSentimentBolt(), 4)
        .shuffleGrouping("stemming");

topology.setBolt("join", new JoinSentimentsBolt(), 4)
        .fieldsGrouping("positive", new Fields("tweet_id"))
        .fieldsGrouping("negative", new Fields("tweet_id"));

topology.setBolt("score", new SentimentScoringBolt(), 4)
        .shuffleGrouping("join");

topology.setBolt("hdfs", new HDFSBolt(), 4)
        .shuffleGrouping("score");
topology.setBolt("nodejs", new NodeNotifierBolt(), 4)
        .shuffleGrouping("score");

值得注意的是,数据会被打乱到每个螺栓,直到加入时除外,因为将相同的推文提供给加入螺栓的同一实例非常重要。

【讨论】:

    猜你喜欢
    • 2018-01-31
    • 1970-01-01
    • 2020-09-18
    • 1970-01-01
    • 2023-03-28
    • 2021-01-09
    • 1970-01-01
    • 2017-12-24
    • 1970-01-01
    相关资源
    最近更新 更多