Storm 与 Redis 整合
原创编写: 王宇
2016-11-07
Storm 与 Redis 整合
参考资料:http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-redis.html
Storm为Redis提供几个标准Bolt
RedisLookupBolt 例子
RedisStoreBolt 例子
非简单Bolt
RedisLookupBolt 源代码,用于理解WordCountRedisLookupMapper
Kafka + Storm + Redis 完整例子
依赖包
编译执行
参考资料:http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-redis.html
Storm为Redis提供几个标准Bolt
RedisLookupBolt 例子
RedisStoreBolt 例子
非简单Bolt
RedisLookupBolt 源代码,用于理解WordCountRedisLookupMapper
Kafka + Storm + Redis 完整例子
依赖包
编译执行
参考资料:http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-redis.html
Storm为Redis提供几个标准Bolt
- RedisLookupBolt: 查询
- RedisStoreBolt: 存储
- AbstractRedisBolt: 抽象基类,用于实现更复杂的自定义逻辑
RedisLookupBolt 例子
从Redis中,查询单词的计算数量
classWordCountRedisLookupMapperimplementsRedisLookupMapper{privateRedisDataTypeDescription description;privatefinalString hashKey ="wordCount";publicWordCountRedisLookupMapper(){description =newRedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);}@OverridepublicList<Values> toTuple(ITuple input,Object value){String member = getKeyFromTuple(input);List<Values> values =Lists.newArrayList();values.add(newValues(member, value));return values;}@Overridepublicvoid declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(newFields("wordName","count"));}@OverridepublicRedisDataTypeDescription getDataTypeDescription(){return description;}@OverridepublicString getKeyFromTuple(ITuple tuple){return tuple.getStringByField("word");}@OverridepublicString getValueFromTuple(ITuple tuple){returnnull;}}JedisPoolConfig poolConfig =newJedisPoolConfig.Builder().setHost(host).setPort(port).build();RedisLookupMapper lookupMapper =newWordCountRedisLookupMapper();RedisLookupBolt lookupBolt =newRedisLookupBolt(poolConfig, lookupMapper);
- 对例子理解
-
如上面代码所示,JedisPoolConfig的作用是,提供Redis相关的配置,给RedisLookupBolt
RedisLookupMapper的作用是,将相应的数据结构映射给RedisLookupBolt。
例如- getKeyFromTuple()方法, 告诉RedisLookupBolt,从输入的Tuple中,取什么样子的Key,让它去Redis中查询。
- declareOutputFields()的作用是,定义RedisLookupBolt的输出格式。
RedisStoreBolt 例子
classWordCountStoreMapperimplementsRedisStoreMapper{privateRedisDataTypeDescription description;privatefinalString hashKey ="wordCount";publicWordCountStoreMapper(){description =newRedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);}@OverridepublicRedisDataTypeDescription getDataTypeDescription(){return description;}@OverridepublicString getKeyFromTuple(ITuple tuple){return tuple.getStringByField("word");}@OverridepublicString getValueFromTuple(ITuple tuple){return tuple.getStringByField("count");}}JedisPoolConfig poolConfig =newJedisPoolConfig.Builder().setHost(host).setPort(port).build();RedisStoreMapper storeMapper =newWordCountStoreMapper();RedisStoreBolt storeBolt =newRedisStoreBolt(poolConfig, storeMapper);
非简单Bolt
采用 AbstractRedisBolt 实现更为复杂的逻辑
publicstaticclassLookupWordTotalCountBoltextendsAbstractRedisBolt{privatestaticfinalLogger LOG =LoggerFactory.getLogger(LookupWordTotalCountBolt.class);privatestaticfinalRandom RANDOM =newRandom();publicLookupWordTotalCountBolt(JedisPoolConfig config){super(config);}publicLookupWordTotalCountBolt(JedisClusterConfig config){super(config);}@Overridepublicvoid execute(Tuple input){JedisCommands jedisCommands =null;try{jedisCommands = getInstance();String wordName = input.getStringByField("word");String countStr = jedisCommands.get(wordName);if(countStr !=null){int count =Integer.parseInt(countStr);this.collector.emit(newValues(wordName, count));// print lookup result with low probabilityif(RANDOM.nextInt(1000)>995){LOG.info("Lookup result - word : "+ wordName +" / count : "+ count);}}else{// skipLOG.warn("Word not found in Redis - word : "+ wordName);}}finally{if(jedisCommands !=null){returnInstance(jedisCommands);}this.collector.ack(input);}}@Overridepublicvoid declareOutputFields(OutputFieldsDeclarer declarer){// wordName, countdeclarer.declare(newFields("wordName","count"));}}
RedisLookupBolt 源代码,用于理解WordCountRedisLookupMapper
注意下列代码中的:
- execute() 方法:如何根据数据类型(如 HASH 对应 hget)分派相应的查询操作
package org.apache.storm.redis.bolt;

import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;

import redis.clients.jedis.JedisCommands;

import java.util.List;

/**
 * Basic bolt for querying from Redis and emits response as tuple.
 * <p/>
 * Various data types are supported: STRING, LIST, HASH, SET, SORTED_SET, HYPER_LOG_LOG, GEO
 */
public class RedisLookupBolt extends AbstractRedisBolt {
    private final RedisLookupMapper lookupMapper;
    private final RedisDataTypeDescription.RedisDataType dataType;
    private final String additionalKey;

    /**
     * Constructor for single Redis environment (JedisPool)
     * @param config configuration for initializing JedisPool
     * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses
     */
    public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) {
        super(config);

        this.lookupMapper = lookupMapper;

        RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }

    /**
     * Constructor for Redis Cluster environment (JedisCluster)
     * @param config configuration for initializing JedisCluster
     * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses
     */
    public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) {
        super(config);

        this.lookupMapper = lookupMapper;

        RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void execute(Tuple input) {
        String key = lookupMapper.getKeyFromTuple(input);
        Object lookupValue;

        JedisCommands jedisCommand = null;
        try {
            jedisCommand = getInstance();

            // Dispatch the lookup to the Redis command matching the data type.
            switch (dataType) {
                case STRING:
                    lookupValue = jedisCommand.get(key);
                    break;

                case LIST:
                    lookupValue = jedisCommand.lpop(key);
                    break;

                case HASH:
                    lookupValue = jedisCommand.hget(additionalKey, key);
                    break;

                case SET:
                    lookupValue = jedisCommand.scard(key);
                    break;

                case SORTED_SET:
                    lookupValue = jedisCommand.zscore(additionalKey, key);
                    break;

                case HYPER_LOG_LOG:
                    lookupValue = jedisCommand.pfcount(key);
                    break;

                case GEO:
                    lookupValue = jedisCommand.geopos(additionalKey, key);
                    break;

                default:
                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
            }

            List<Values> values = lookupMapper.toTuple(input, lookupValue);
            for (Values value : values) {
                collector.emit(input, value);
            }

            collector.ack(input);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(input);
        } finally {
            // Connection always goes back to the pool, success or failure.
            returnInstance(jedisCommand);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        lookupMapper.declareOutputFields(declarer);
    }
}
Kafka + Storm + Redis 完整例子
- 场景
参考 《Storm与Kafka整合记录》中,统计单词数量的例子。在这个例子基础之上,将统计到的单词结果保存到Redis中。
- 修改Topology,增加RedisStoreBolt
publicclassKafkaStormSample{publicclassKafkaStormSample{publicstaticvoid main(String[] args)throwsException{Config config =newConfig();config.setDebug(true);config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);String zkConnString ="localhost:2181";String topic ="my-first-topic";BrokerHosts hosts =newZkHosts(zkConnString);SpoutConfig kafkaSpoutConfig =newSpoutConfig(hosts, topic,"/"+ topic, UUID.randomUUID().toString());kafkaSpoutConfig.bufferSizeBytes =1024*1024*4;kafkaSpoutConfig.fetchSizeBytes =1024*1024*4;// kafkaSpoutConfig.forceFromStart = true;kafkaSpoutConfig.scheme =newSchemeAsMultiScheme(newStringScheme());// Creating RedisStoreBoltString host ="localhost";int port =6379;JedisPoolConfig poolConfig =newJedisPoolConfig.Builder().setHost(host).setPort(port).build();RedisStoreMapper storeMapper =newWordCountStoreMapper();RedisStoreBolt storeBolt =newRedisStoreBolt(poolConfig, storeMapper);// Assemble with topologyTopologyBuilder builder =newTopologyBuilder();builder.setSpout("kafka-spout",newKafkaSpout(kafkaSpoutConfig));builder.setBolt("word-spitter",newSplitBolt()).shuffleGrouping("kafka-spout");builder.setBolt("word-counter",newCountBolt()).shuffleGrouping("word-spitter");builder.setBolt("redis-store-bolt", storeBolt).shuffleGrouping("word-counter");LocalCluster cluster =newLocalCluster();cluster.submitTopology("KafkaStormSample", config, builder.createTopology());Thread.sleep(10000);cluster.shutdown();}}
- 修改 CountBolt, emit to RedisStoreBolt
publicclassCountBoltimplementsIRichBolt{Map<String,Integer> counters;privateOutputCollector collector;@Overridepublicvoid prepare(Map stormConf,TopologyContext context,OutputCollector collector){this.counters =newHashMap<String,Integer>();this.collector = collector;}@Overridepublicvoid execute(Tuple input){String str = input.getString(0);if(!counters.containsKey(str)){counters.put(str,1);}else{Integer c = counters.get(str)+1;counters.put(str, c);}// emit to redis-store-boltInteger result = counters.get(str);this.collector.emit(newValues(str,String.valueOf(result)));collector.ack(input);}@Overridepublicvoid cleanup(){for(Map.Entry<String,Integer> entry:counters.entrySet()){System.out.println(entry.getKey()+" : "+ entry.getValue());}}@Overridepublicvoid declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(newFields("word","count"));}@OverridepublicMap<String,Object> getComponentConfiguration(){returnnull;}}
依赖包
jedis-2.9.0.jar
commons-pool2-2.4.2.jar
将以上两个包,增加到CLASSPATH路径中
编译执行
-
启动服务
- 启动 zookeeper
$ cd /opt/zookeeper
$ ./bin/zkServer.sh start
- 启动 redis
$ redis-server
- 启动 kafka
$ cd /opt/kafka
$ ./bin/kafka-server-start.sh config/server.properties
- 启动 storm
$ cd /opt/storm
$ ./bin/storm nimbus
$ ./bin/storm supervisor
-
输入数据
$ cd /opt/kafka
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-first-topic
Hello
My first message
My second message
- 执行例子
$ java KafkaStormSample
- 在Redis-cli 中查询结果
$ redis-cli
127.0.0.1:6379> hvals wordCount