【问题标题】:Storm : Spout for reading data from a portStorm : 从端口读取数据的 Spout
【发布时间】:2014-07-01 10:06:26
【问题描述】:

我需要编写一个storm spout 来从端口读取数据。想知道这在逻辑上是否可行。

考虑到这一点,我设计了一个简单的拓扑结构,设计用于相同的一个喷嘴和一个螺栓。 spout 将收集使用 wget 发送的 HTTP 请求,而 bolt 将显示该请求。

我的spout结构如下:

public class ProxySpout extends BaseRichSpout{
         //The O/P collector
         SpoutOutputCollector sc;
         //The socket
         Socket clientSocket;
         //The server socket
         ServerSocket sc;

         public ProxySpout(int port){
            this.sc=new ServerSocket(port);
            try{
                clientSocket=sc.accept();
            }catch(IOException ex){
                //Handle it
            }
         }

         public void nextTuple(){
            try{
                InputStream ic=clientSocket.getInputStream();
                byte b=new byte[8196];
                int len=ic.read(b);

                sc.emit(new Values(b));
                ic.close();
            }catch(//){
                //Handle it
            }finally{
                clientSocket.close();
            }
         }
}

其余的方法我也实现了。

当我把它变成拓扑并运行它时,当我发送第一个请求时出现错误:

java.lang.RuntimeException:java.io.NotSerializableException:java.net.Socket

只需要知道我实现这个 spout 的方式是否有问题。 spout 甚至可以从端口收集数据吗?还是让 spout 充当代理的实例?

编辑

搞定了。

代码是:

   public class ProxySpout extends BaseRichSpout{
         //The O/P collector
         static SpoutOutputCollector _collector;
         //The socket
         static Socket _clientSocket;
         static ServerSocket _serverSocket;
         static int _port;

         public ProxySpout(int port){
          _port=port;
         }

         public void open(Map conf,TopologyContext context, SpoutOutputCollector collector){
           _collector=collector;
           _serverSocket=new ServerSocket(_port);
         }   

         public void nextTuple(){
            _clientSocket=_serverSocket.accept();
            InputStream incomingIS=_clientSocket.getInputStream();
            byte[] b=new byte[8196];
            int len=b.incomingIS.read(b);
            _collector.emit(new Values(b));
     }
}

按照@Shaw 的建议,尝试在open() 方法中初始化_serverSocket,并在nextTuple() 方法中运行_clientSocket 来监听请求。

不知道这个的性能指标,但它有效..:-)

【问题讨论】:

    标签: java apache-storm


    【解决方案1】:

    在构造函数中只分配变量。尝试在 prepare 方法中实例化 ServerSocket,不要在构造函数中写任何 new ...。并重命名变量,你有两个 sc 变量。

    public class ProxySpout extends BaseRichSpout{
    
        int port;
    
        public ProxySpout(int port){
            this.port=port;
        }
    
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)  { 
            //new ServerSocket
        }
    
        @Override
        public void nextTuple() {
    
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
        }
    }
    

    如果你把它放在prepare方法中,那么它只会在spout已经部署后才会被调用,所以它不需要序列化,并且它只会在spout的生命周期中调用一次,所以效率并不低.

    【讨论】:

    • 那么,有可能吗?让 spout 函数像代理一样?
    • 是的,但是 nextTuple() 每隔一段时间就会被调用一次,你必须管理这个,如果 spout 没有收到任何东西,错误......
    猜你喜欢
    • 2015-07-30
    • 2014-03-02
    • 2016-04-17
    • 2018-08-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多