【问题标题】:Redis Error On a Storm topology(Storm 拓扑上的 Redis 错误)
【发布时间】:2016-04-16 06:38:59
【问题描述】:

我正在使用 redis 从我的 Storm 拓扑中提取数据,我一直在阅读它,并找到了一些示例。

现在,我正在尝试让它运行起来,稍后我将实现一个从 redis 读取数据的前端,并在一个 html 文件中用 d3.js 把数据绘制成图表。我已经针对本地文件实现了 D3 部分。目前,我还没有编写用于表示数据的数学函数,只是创建元组:从文本文件中读取名称并附加“:) :)”符号。这部分也运行良好,但我在 Storm 0.10.0 上尝试使用 storm-redis 时遇到了问题。

据我目前了解(如果我错了,请纠正我),redis 是一个数据库,类似 mongoDB,属于非 SQL(NoSQL)数据库,并使用字段来检索键。我已经写了一个实现,但它无法工作:有一行在编译时报错,我已在代码中用注释标出。这是我的代码:

package Storm.practice.Storm.Prova;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.Map;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import redis.clients.jedis.JedisCommands;

/**
 * This is a basic example of a Storm topology.
 */
public class ProvaTopology {

  public static class ProvaBolt extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "  :-)"));
      _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("morts"));
    }


  }
  public class ProvaSpout extends BaseRichSpout {
      SpoutOutputCollector _collector;
      //Random _rand;
      private String fileName;
      //private SpoutOutputCollector _collector;
      private BufferedReader reader;
      private AtomicLong linesRead;

      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        try {
            fileName= (String)"/home/prova.tsv";
            reader = new BufferedReader(new FileReader(fileName));
            // read and ignore the header if one exists
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
       // _rand = new Random();
      }

      public void nextTuple() {
        Utils.sleep(100);


      try {
            String line = reader.readLine();
            if (line != null) {
              long id = linesRead.incrementAndGet();
              _collector.emit(new Values(line), id);
            } else {
              System.out.println("Finished reading file, " + linesRead.get() + " lines read");
              Thread.sleep(10000);
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
      }
        /*String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
            "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        int _rand;*/
        //String sentence = sentences[_rand.nextInt(sentences.length)];
        //_collector.emit(new Values(sentence));


      public void ack(Object id) {
      }

      public void fail(Object id) {
      }

      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("morts"));
      }

    }


  class MortsStoreMapper implements RedisStoreMapper {
        private RedisDataTypeDescription description;
        private final String hashKey = "Morts";

        public void MortsStoreStoreMapper() {
            description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
        }


        public RedisDataTypeDescription getDataTypeDescription() {
            return description;
        }


        public String getKeyFromTuple(ITuple tuple) {
            return tuple.getStringByField("morts");
        }


        public String getValueFromTuple(ITuple tuple) {
            return tuple.getStringByField("somriures");
        }
    }

    public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
            .setHost("127.0.0.1").setPort(666).build();
    RedisStoreMapper storeMapper = new MortsStoreMapper();**//ERROR HERE** Non enclosing instance of type ProvaTopology is accessible. Must qualify the allocation with an enclosing instance of type ProvaTopology.
    RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

    builder.setSpout("morts", new TestWordSpout(), 10);//emisor
    builder.setBolt("happy", new ProvaBolt(), 3).shuffleGrouping("morts");// de on llig?
    builder.setBolt("meal", new ProvaBolt(), 2).shuffleGrouping("happy");// de on llig?

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                                   //WithProgressBar
    }
    else {

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}

尽管有这个错误,但一旦解决,我不确定我的拓扑是否会保存任何内容,一旦保存,我对如何使用前端检索它有点迷茫。任何帮助将不胜感激。

提前致谢

这是我在网上找到的一个 Jedis 配置实现,来自 storm-redis 教程,也是我正在使用的版本,以防有用:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.redis.common.config;

import redis.clients.jedis.Protocol;

import java.io.Serializable;

/**
 * Immutable connection settings for a JedisPool: host, port, timeout, database index and
 * optional password.
 */
public class JedisPoolConfig implements Serializable {
    public static final String DEFAULT_HOST = "127.0.0.1";

    // Assigned once in the constructor; the class exposes getters only.
    private final String host;
    private final int port;
    private final int timeout;
    private final int database;
    private final String password;

    /**
     * Constructor.
     *
     * <p>Use {@link Builder} to leave some fields at their default values.
     *
     * @param host hostname or IP
     * @param port port
     * @param timeout socket / connection timeout
     * @param password password, if any
     * @param database database index
     */
    public JedisPoolConfig(String host, int port, int timeout, String password, int database) {
        this.host = host;
        this.port = port;
        this.timeout = timeout;
        this.database = database;
        this.password = password;
    }

    /**
     * Returns host.
     * @return hostname or IP
     */
    public String getHost() {
        return host;
    }

    /**
     * Returns port.
     * @return port
     */
    public int getPort() {
        return port;
    }

    /**
     * Returns timeout.
     * @return socket / connection timeout
     */
    public int getTimeout() {
        return timeout;
    }

    /**
     * Returns database index.
     * @return database index
     */
    public int getDatabase() {
        return database;
    }

    /**
     * Returns password.
     * @return password, or {@code null} if none was set
     */
    public String getPassword() {
        return password;
    }

    /**
     * Builder for initializing JedisPoolConfig. Host, port, timeout and database start at
     * {@link #DEFAULT_HOST} and the corresponding {@code Protocol} defaults; the password
     * starts unset.
     */
    public static class Builder {
        private String host = DEFAULT_HOST;
        private int port = Protocol.DEFAULT_PORT;
        private int timeout = Protocol.DEFAULT_TIMEOUT;
        private int database = Protocol.DEFAULT_DATABASE;
        private String password;

        /**
         * Sets host.
         * @param host host
         * @return Builder itself
         */
        public Builder setHost(String host) {
            this.host = host;
            return this;
        }

        /**
         * Sets port.
         * @param port port
         * @return Builder itself
         */
        public Builder setPort(int port) {
            this.port = port;
            return this;
        }

        /**
         * Sets timeout.
         * @param timeout timeout
         * @return Builder itself
         */
        public Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        /**
         * Sets database index.
         * @param database database index
         * @return Builder itself
         */
        public Builder setDatabase(int database) {
            this.database = database;
            return this;
        }

        /**
         * Sets password.
         * @param password password, if any
         * @return Builder itself
         */
        public Builder setPassword(String password) {
            this.password = password;
            return this;
        }

        /**
         * Builds JedisPoolConfig from the values collected so far.
         * @return JedisPoolConfig
         */
        public JedisPoolConfig build() {
            return new JedisPoolConfig(host, port, timeout, password, database);
        }
    }
}

【问题讨论】:

    标签: redis frontend apache-storm jedis


    【解决方案1】:

    我终于解决了!我把它简化了:创建了一个非常简单的、向 redis 发布内容的 bolt,并监控了 redis 数据库,它确实在工作。我的可用 bolt 如下:

     public class RedisBolt implements IRichBolt {
    
            protected String channel = "Somriures";
            //    protected String configChannel;
            protected OutputCollector collector;
            //    protected Tuple currentTuple;
            //    protected Logger log;
            protected JedisPool pool;
            //    protected ConfigListenerThread configListenerThread;
    
            public RedisBolt(){}
            public RedisBolt(String channel) {
    
            //  log = Logger.getLogger(getClass().getName());
            //  setupNonSerializableAttributes();
            }
    
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
            this.collector = collector;
            pool=new JedisPool("127.0.0.1");
              GenericObjectPoolConfig config = new GenericObjectPoolConfig();
            config.setMaxTotal(100);
            config.setTestOnBorrow(true);
            }
    
    
    
            public void execute(Tuple tuple) {
            String current = tuple.getString(0);
            if(current != null) {
                //      for(Object obj: result) {
                System.out.println("Publiquem " + current);
                publish(current);
                System.out.println("emitim " + current);
                collector.emit(tuple, new Values(current));
                //      }
                collector.ack(tuple);
            }
            }
    
            public void cleanup() {
            if(pool != null) {
                pool.destroy();
            }
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields(channel));
            }
    
            public void publish(String msg) {
            Jedis jedis = pool.getResource();
            jedis.publish(channel, msg);
    
            pool.returnResource(jedis);
            }
    
            protected void setupNonSerializableAttributes() {
    
            }
    
            public Map getComponentConfiguration() {
            return null;
            }
        }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-02-06
      • 2019-08-04
      • 2019-05-24
      • 1970-01-01
      • 2016-11-18
      • 2014-04-27
      • 1970-01-01
      相关资源
      最近更新 更多