【问题标题】:What is the most robust way to append text to a single file from multiple connections将文本从多个连接附加到单个文件的最可靠方法是什么
【发布时间】:2013-02-24 01:32:30
【问题描述】:

我已经看到很多关于writing 文件的questions,但我想知道打开文本文件、附加一些数据然后在您要从以下位置写入时再次关闭它的最可靠方法是什么许多连接(即在并行计算情况下),并且不能保证每个连接何时要写入文件。

例如在下面的玩具示例中,它只使用了我桌面上的内核,它似乎工作正常,但我想知道如果写入时间变长并且写入的进程数,这种方法是否容易失败文件增加(尤其是在可能存在一些延迟的网络共享上)。

当可能有其他从属进程想要同时写入文件时,任何人都可以提出一种可靠、明确的方式来打开、写入然后关闭连接吗?

require(doParallel)
require(doRNG)

ncores <- 7
cl <- makeCluster( ncores , outfile = "" )
registerDoParallel( cl )

res <- foreach( j = 1:100 , .verbose = TRUE , .inorder= FALSE ) %dorng%{
    d <- matrix( rnorm( 1e3 , j ) , nrow = 1 )
    conn <- file( "~/output.txt" , open = "a" )
    write.table( d , conn , append = TRUE , col.names = FALSE )
    close( conn )
}

我正在寻找执行此操作的最佳方法,或者是否有最好的方法。也许 R 和 foreach 会自动处理我所说的写锁问题?

谢谢。

【问题讨论】:

  • 不知道 R,我无法给出明确的答案,但是使用其他语言的一种有效方法是将一个线程专用于 IO,并为该 IO 线程设置一个写入命令队列来处理。该线程可以批量写入,从而减少它所花费的时间。
  • 基本上,这将是生产者-消费者模式的一个实例
  • @didierc 感谢您的建议。我应该明确表示我正在寻找以R 为中心的答案。尤其是在具有多个内核的多个节点尝试访问网络共享上的同一文件的情况下。也许我发布的内容已经足够了。 TBH 我可能应该找到一个场景,它首先坏了,但我在抢先!
  • 您没有做错任何事情:您已正确标记了您的问题。但没有看到很多答案,我想我可能会以某种方式帮助你
  • 如果您使用的是 POSIX 文件系统并且您的追加小于 PIPE_BUF 字节(Linux 上为 4k),那么追加操作是原子的。见Is file append atomic in UNIX?。这是假设 R 不会将输入分成多个块。

标签: r file-io foreach parallel-processing


【解决方案1】:

@didierc 提出的方法的一个变体是从组合函数编写矩阵:

conn <- file("~/output.txt", "w")
wtab <- function(conn, d) {
    write.table(d, conn, col.names=FALSE)
    conn
}

res <- foreach(j = 1:100, .init=conn, .combine='wtab') %dorng% {
    matrix( rnorm( 1e3 , j ) , nrow = 1 )
}

close(conn)

当与 doSNOW 和 doMPI 等并行后端一起使用时,这种技术特别有用,它们可以在将结果发送回主节点时即时调用组合函数。

【讨论】:

  • 我想我还是买你的书吧!
  • 有时远程服务器会重新启动,我想确保不会丢失任何已处理的内容。此解决方案在这种情况下是否有效?还是只有在循环完成后才能得到结果?
  • 我的另一个问题是我需要在循环中写入多个文件。
  • @rrs 是的,如果您使用 doSNOW 或 doMPI,则在返回结果时调用 combine 函数(即时),因此您不会丢失已发送到主人。
  • @rrs 可以通过将所有打开的文件对象放在一个列表中并在组合函数中循环该列表来处理多个输出文件。
【解决方案2】:

foreach 包不提供文件锁定机制,以防止多个工作人员同时写入同一个文件。这样做的结果将取决于您的操作系统和文件系统。当使用分布式文件系统(如 NFS)时,我会特别担心结果。

相反,我会更改您打开输出文件的方式以包含工作人员的进程 ID:

conn <- file( sprintf("~/output_%d.txt" , Sys.getpid()) , open = "a" )

如果需要,您可以在 foreach 循环返回后连接文件。

当然,如果您使用多台机器,您可能有两个具有相同进程 ID 的工作人员,因此您也可以在文件名中包含主机名,例如使用 Sys.info()[['nodename']]

【讨论】:

  • 谢谢史蒂夫。我喜欢你的推理,这似乎是一个明智、强大的解决方案。我将暂时搁置这个问题(1 个月?),希望它可能会获得更多建议,但之后我会选择一个已发布的答案来关闭它。谢谢!
【解决方案3】:

您也许可以尝试类似的方法:

res <- foreach( j = 1:100 , .verbose = TRUE , .inorder= FALSE ) %dorng%{
    matrix( rnorm( 1e3 , j ) , nrow = 1 )
}

conn <- file("~/output.txt", open = "a")
apply(res, 1, function (x, output) {
    write.table( x , conn , append = TRUE , col.names = FALSE )
  }, conn)

close(conn)

来源:foreach row in a dataframe

【讨论】:

  • 如果其中有错误,我相信有人会出现纠正我。
  • 嗨@didierc。代码看起来不错。我应该在我的问题中澄清我希望从循环内输出。我想确保在结果完成时保存结果,以便循环的未来迭代发生崩溃或错误意味着不会丢失先前的结果(有时您的循环比节点多,因此从站排队等待更多工作)。思考是否允许所有循环完成然后从一个进程写入是最健壮的,或者结果是否完整? +1 用于可靠的工作代码。我希望能得到更多的意见。感谢您的关注!
  • 我想知道(例如)从属进程是否有一种的方法来首先检查它们是否可以写入文件,以及它们是否不能等待直到他们可以继续之前,有一些预先指定的超时时间。或者,如果这在 Rforeach 循环中甚至是必要的。我想当不止一台机器试图写入同一个文件时,一定存在某种锁定问题?
  • 我想我最初的想法是对的:你需要一些消息队列机制 - 应该做一个列表,每个任务在完成后立即写入其结果,并处理不同的任务那个清单。 %dorng%%dopar% 似乎都适用于数组,这将排除它们。我快速浏览了提供 mapreduce 功能的 rmr 包,但它对于您想要实现的目标来说似乎太复杂了。
  • 是的,我想这将是所需系统如何工作的概要。
猜你喜欢
  • 1970-01-01
  • 2020-06-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-07-16
  • 2010-10-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多