【问题标题】:Streaming HDFS data to Storm (aka HDFS spout)将 HDFS 数据流式传输到 Storm(又名 HDFS spout)
【发布时间】:2015-05-14 08:25:18
【问题描述】:

我想知道是否有任何 spout 实现将数据从 HDFS 流式传输到 Storm(类似于 HDFS 的 Spark Streaming)。我知道有螺栓实现可以将数据写入 HDFS(https://github.com/ptgoetz/storm-hdfshttp://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.1.3/bk_user-guide/content/ch_storm-using-hdfs-connector.html),但反过来我找不到。 我感谢任何建议和提示。

【问题讨论】:

    标签: hadoop hdfs apache-storm


    【解决方案1】:

    一个选项是使用 Hadoop HDFS java API。假设您使用的是 maven,您将在 pom.xml 中包含 hadoop-common:

    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>2.6.0.2.2.0.0-2041</version>
    </dependency>
    

    然后,在您的 spout 实现中,您将使用 HDFS FileSystem 对象。例如,下面是一些将文件中的每一行作为字符串发出的伪代码:

    @Override
    public void nextTuple() {
       try {
          Path pt=new Path("hdfs://servername:8020/user/hdfs/file.txt");
          FileSystem fs = FileSystem.get(new Configuration());
          BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
          String line = br.readLine();
          while (line != null){
             System.out.println(line);
             line=br.readLine();
             // emit the line which was read from the HDFS file
             // _collector is a private member variable of type SpoutOutputCollector set in the open method;
             _collector.emit(new Values(line));
          }
       } catch (Exception e) {
          _collector.reportError(e);
          LOG.error("HDFS spout error {}", e);
       }
    }
    

    【讨论】:

    • 谢谢你,基特!这确实是针对单个文件逐个流式处理元组的解决方案。用于批量元组的喷口(又名 Storm Trident)怎么样?
    • @florins 我自己没有尝试过 Trident,但看起来你会实现 IBatchSpout 然后你的代码会进入 emitBatch 而不是 nextTuple。
    猜你喜欢
    • 1970-01-01
    • 2013-06-19
    • 2020-05-06
    • 1970-01-01
    • 2016-04-27
    • 2018-10-08
    • 1970-01-01
    • 1970-01-01
    • 2014-12-25
    相关资源
    最近更新 更多