【问题标题】:How to Set spoutconfig from default setting?如何从默认设置设置 spoutconfig?
【发布时间】:2016-07-09 15:47:05
【问题描述】:

我正在尝试使用图形 api 获取 fb 页面数据。每个帖子的大小超过 1MB,其中 kafka 默认 fetch.message 为 1MB。通过在 kafa consumer.properties 和 server.properties 文件中添加以下行,我已将 kafka 属性从 1MB 更改为 3MB。

fetch.message.max.bytes=3048576 (consumer.properties)
file message.max.bytes=3048576 (server.properties)
replica.fetch.max.bytes=3048576 (server.properties )

现在在 Kafka 中添加上述行后,3MB 消息数据将进入 kafka 数据日志。但是 STORM 无法处理该数据,它只能读取默认大小,即 1MB 数据。我应该向风暴拓扑添加哪些参数以从 kafka 主题中读取 3MB 数据。我需要在风暴中增加 buffer.size 吗?不太清楚。

这是我的拓扑代码。

 String argument = args[0];
    Config conf = new Config();
    conf.put(JDBC_CONF, map);
    conf.setDebug(true);
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    //set the number of workers
    conf.setNumWorkers(3);

    TopologyBuilder builder = new TopologyBuilder();       
   //Setup Kafka spout
    BrokerHosts hosts = new ZkHosts("localhost:2181");
    String topic = "year1234"; 
    String zkRoot = "";
    String consumerGroupId = "group1";
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);

        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());           KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    builder.setSpout("KafkaSpout", kafkaSpout,1);        builder.setBolt("user_details", new Parserspout(),1).shuffleGrouping("KafkaSpout");        builder.setBolt("bolts_user", new bolts_user(cp),1).shuffleGrouping("user_details");

提前致谢

【问题讨论】:

    标签: apache-kafka apache-storm kafka-consumer-api bigdata


    【解决方案1】:

    SpoutConfig 类扩展了具有以下所有设置的 KafkaConfig:

    public int fetchSizeBytes = 1024 * 1024;
    public int socketTimeoutMs = 10000;
    public int fetchMaxWait = 10000;
    public int bufferSizeBytes = 1024 * 1024;
    public MultiScheme scheme = new RawMultiScheme();
    public boolean ignoreZkOffsets = false;
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    public long maxOffsetBehind = Long.MAX_VALUE;
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
    public int metricsTimeBucketSizeInSecs = 60;
    

    注意它们是公开的,因此您可以更改它们

    spoutConfig.fetchSizeBytes = 3048576;
    spoutConfig.bufferSizeBytes = 3048576;
    

    请看这里:http://grepcode.com/file/repo1.maven.org/maven2/org.apache.storm/storm-kafka/0.9.2-incubating/storm/kafka/KafkaConfig.java#KafkaConfig

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-07-29
      • 2015-02-13
      • 1970-01-01
      • 2020-04-15
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多