【问题标题】:HDFS file watcherHDFS 文件观察器
【发布时间】:2015-07-09 17:02:36
【问题描述】:

我可以在 HDFS 上安装 file watcher 吗?

场景: 文件不断登陆HDFS。一旦文件数量达到阈值(可以是文件数量或文件大小),我想启动Spark Job。

是否可以在 HDFS 上实现文件观察器来实现这一点。如果是,那么任何人都可以建议这样做的方法吗?有哪些不同的选择? Zookeeper 或 Oozie 能做到吗?

任何帮助将不胜感激。谢谢。

【问题讨论】:

  • Spark Streaming 也有类似的功能:在FileInputDStream
  • 我能想到的简单的事情是你可以像这样使用 unix 命令:hadoop fs -ls | wc -l
  • @YijieShen 可以详细说明一下吗?
  • @user3484461 是的。它会列出 hdfs 目录中的所有文件及其详细信息。我正在尝试按照您的建议实施它。
  • 是的,您可以使用 Inotification 执行此操作。您只需要通过 inotifyier 获取 HDFS 事务的详细信息,以便更好地理解阅读此link

标签: hadoop apache-spark hdfs file-watcher


【解决方案1】:

Hadoop 2.6 引入了DFSInotifyEventInputStream,您可以使用它。您可以从HdfsAdmin 获取它的一个实例,然后只需调用.take().poll() 即可获取所有事件。事件类型包括删除、追加和创建,它们应该涵盖您要查找的内容。

这是一个基本示例。确保以 hdfs 用户身份运行它,因为管理界面需要 HDFS root。

public static void main( String[] args ) throws IOException, InterruptedException, MissingEventsException
{
    HdfsAdmin admin = new HdfsAdmin( URI.create( args[0] ), new Configuration() );
    DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream();
    while( true ) {
        EventBatch events = eventStream.take();
        for( Event event : events.getEvents() ) {
            System.out.println( "event type = " + event.getEventType() );
            switch( event.getEventType() ) {
                case CREATE:
                    CreateEvent createEvent = (CreateEvent) event;
                    System.out.println( "  path = " + createEvent.getPath() );
                    break;
                default:
                    break;
            }
        }
    }
}

这是一篇更详细的博文:

http://johnjianfang.blogspot.com/2015/03/hdfs-6634-inotify-in-hdfs.html?m=1

【讨论】:

    【解决方案2】:

    Oozie 协调器可以做到这一点。可以根据数据可用性触发 Oozie 协调器操作。编写一个数据触发的协调器。协调器动作是基于完成标志触发的。 done-flag 只是一个空文件。因此,当达到您的阈值时,将一个空文件写入目录。

    【讨论】:

      【解决方案3】:

      老线程...如果有人想在Scala这样做

      import java.net.URI
      
      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.hdfs.client.HdfsAdmin
      import org.apache.hadoop.hdfs.inotify.Event.{AppendEvent, CreateEvent, RenameEvent}
      
      
      object HDFSTest extends App {
        val admin = new HdfsAdmin( URI.create( "hdfs://namenode:port" ), new Configuration() )
        val eventStream = admin.getInotifyEventStream()
      
        while( true ) {
          val events =  eventStream.poll(2l, java.util.concurrent.TimeUnit.SECONDS)
          events.getEvents.toList.foreach { event ⇒
            println(s"event type = ${event.getEventType}")
            event match {
              case create: CreateEvent ⇒
                println("CREATE: " + create.getPath)
      
              case rename: RenameEvent ⇒
                println("RENAME: " + rename.getSrcPath + " => " + rename.getDstPath)
      
              case append: AppendEvent ⇒
                println("APPEND: " + append.getPath)
      
              case other ⇒
                println("other: " + other)
            }
          }
        }
      }
      

      如果想使用模拟用户... set env var: HADOOP_USER_NAME=user-name

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2022-01-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-04-11
        相关资源
        最近更新 更多