【问题标题】:What is the recommended way to append to files on HDFS?附加到 HDFS 上的文件的推荐方法是什么?
【发布时间】:2016-09-23 20:40:33
【问题描述】:

我无法找到一种安全的方式来附加到HDFS 中的文件。

我正在使用一个小的 3-node Hadoop cluster (CDH v.5.3.9 to be specific)。我们的进程是一个数据管道,它是multi-threaded (8 threads),它有一个阶段,它将分隔文本行附加到HDFS 上的专用目录中的文件中。我正在使用锁来同步线程对附加数据的缓冲写入器的访问。

我的第一个问题是一般地决定方法。

方法 A 是打开文件,附加到它,然后为附加的每一行关闭它。这似乎很慢,并且似乎会产生太多的小块,或者至少我在各种帖子中看到了一些这样的情绪。

方法 B 是缓存写入器,但会定期刷新它们以确保写入器列表不会无限增长(目前,每个流水线处理的输入文件都有一个写入器)。这似乎是一种更有效的方法,但我想在一段时间内打开流但受到控制可能是一个问题,尤其是对于输出文件阅读器 (?)

除此之外,我真正的问题是两个。我正在使用FileSystem Java Hadoop API 进行附加,并且间歇性地得到这两个异常:

org.apache.hadoop.ipc.RemoteException: failed to create file /output/acme_20160524_1.txt for DFSClient_NONMAPREDUCE_271210261_1 for client XXX.XX.XXX.XX because current leaseholder is trying to recreate file.

org.apache.hadoop.ipc.RemoteException: BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540 does not exist or is not under Constructionblk_1073760252_545 40{blockUCState=UNDER_RECOVERY, primaryNodeIndex=1, replicas=[ReplicaUnderConstruction[[DISK]DS-ccdf4e55-234b-4e17-955f-daaed1afdd92:NORMAL|RBW], ReplicaUnderConst ruction[[DISK]DS-1f66db61-759f-4c5d-bb3b-f78c260e338f:NORMAL|RBW]]}

有人对其中任何一个有任何想法吗?

对于第一个问题,我尝试过检测this post 中讨论的逻辑,但似乎没有帮助。

我也对dfs.support.append 属性的作用感兴趣,如果适用的话。

我获取文件系统的代码:

userGroupInfo = UserGroupInformation.createRemoteUser("hdfs"); Configuration conf = new Configuration();
conf.set(key1, val1);
...
conf.set(keyN, valN);
fileSystem = userGroupInfo.doAs(new PrivilegedExceptionAction<FileSystem>() { 
  public FileSystem run() throws Exception { 
   return FileSystem.get(conf);
  }
});

我获取 OutputStream 的代码:

org.apache.hadoop.fs.path.Path file = ...
public OutputStream getOutputStream(boolean append) throws IOException {   
  OutputStream os = null;
  synchronized (file) { 
    if (isFile()) {
      os = (append) ? fs.append(file) : fs.create(file, true);
    } else if (append) {
      // Create the file first, to avoid "failed to append to non-existent file" exception
      FSDataOutputStream dos = fs.create(file);
      dos.close();
      // or, this can be: fs.createNewFile(file);
      os = fs.append(file);
    }
    // Creating a new file
    else { 
      os = fs.create(file);
    }
  }
  return os;
} 

【问题讨论】:

  • 看来issues.apache.org/jira/browse/HDFS-7203 可能在这里起作用。我有原型代码,它维护一个附加到单个文件的单个线程。但我仍然看到“当前的承租人正在尝试重新创建文件”异常。

标签: java hadoop append hdfs


【解决方案1】:

我在 CDH 5.3 / HDFS 2.5.0 中添加了文件。到目前为止,我的结论如下:

  • 不能有一个专用线程执行每个文件的追加操作,或者多个线程写入多个文件,无论我们是通过 HDFS API 文件系统的同一个实例还是不同的实例写入数据。
  • 无法刷新(即关闭并重新打开)编写器;他们必须保持开放。
  • 最后一项会导致偶尔出现的相对罕见的 ClosedChannelException,这似乎是可恢复的(通过重试追加)。
  • 我们使用带有阻塞队列的单线程执行器服务(用于附加到所有文件);每个文件一个写入器,写入器保持打开状态(直到它们关闭时处理结束)。
  • 当我们升级到高于 5.3 的 CDH 时,我们会重新审视这一点,看看哪种线程策略是有意义的:一个且只有一个线程,每个文件一个线程,多个线程写入多个文件。此外,我们还想看看是否可以/需要定期关闭和重新打开 writer。
  • 此外,我还看到了以下错误,并且能够通过将 'dfs.client.block.write.replace-datanode-on-failure.policy' 设置为客户端。
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[XXX.XX.XXX.XX:50010, XXX.XX.XXX.XX:50010], original=[XXX.XX.XXX.XX:50010, XXX.XX.XXX.XX:50010]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:969) ~[hadoop-hdfs-2.5.0.jar:?]
       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1035) ~[hadoop-hdfs-2.5.0.jar:?]
       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1184) ~[hadoop-hdfs-2.5.0.jar:?]
       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:532) ~[hadoop-hdfs-2.5.0.jar:?] 

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-12-15
    • 2015-03-28
    • 2015-07-18
    • 2014-10-05
    • 2017-04-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多