【问题标题】:Multiple outputs to single list input - merging BAM files in Nextflow多个输出到单个列表输入 - 在 Nextflow 中合并 BAM 文件
【发布时间】:2021-06-03 03:37:42
【问题描述】:

我正在尝试将通过一次执行多个对齐生成的 x 个 bam 文件(在批量 y 个 fastq 文件上)合并到一个单独的 bam 文件中下一个流程。

到目前为止,在执行对齐和排序/索引生成的 bam 文件时,我有以下内容:

//Run minimap2 on concatenated fastqs
process miniMap2Bam {
        publishDir "$params.bamDir"
        errorStrategy 'retry'
        cache 'deep'
        maxRetries 3
        maxForks 10
        memory { 16.GB * task.attempt }

        input:
        val dirString from dirStr
        val runString from stringRun
        each file(batchFastq) from fastqBatch.flatMap()

        output:
        val runString into stringRun1
        file("${batchFastq}.bam") into bamFiles
        val dirString into dirStrSam

        script:
        """
        minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
        samtools index ${batchFastq}.bam
        """
}

其中${batchFastq}.bam 是一个bam 文件,其中包含一批y 个fastq 文件。

此管道完成得很好,但是,当尝试在另一个进程 (samToolsMerge) 中对这些 bam 文件执行 samtools merge 时,该进程在每次运行对齐时运行(在本例中为 4),而不是一次收集的所有 bam 文件:

//Run samtools merge
process samToolsMerge {
        echo true
        publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
        cache 'deep'
        errorStrategy 'retry'
        maxRetries 3
        maxForks 10
        memory { 14.GB * task.attempt }

        input:
        val runString from stringRun1
        file bamFile from bamFiles.collect()
        val dirString from dirStrSam

        output:
        file("**")

        script:
        """
        samtools merge ${runString}.bam ${bamFile} 
        """
}

输出为:

executor >  lsf (9)
[49/182ec0] process > catFastqs (1)     [100%] 1 of 1 ✔
[-        ] process > nanoPlotSummary   -
[0e/609a7a] process > miniMap2Bam (1)   [100%] 4 of 4 ✔
[42/72469d] process > samToolsMerge (2) [100%] 4 of 4 ✔




Completed at: 04-Mar-2021 14:54:21
Duration    : 5m 41s
CPU hours   : 0.2
Succeeded   : 9

如何仅从 miniMap2Bam 获取生成的 bam 文件并通过 samToolsMerge 运行它们一次,而不是多次运行该进程?

提前致谢!

编辑: 感谢下面 cmets 中的 Pallie,问题是将之前进程中的 runString 和 dirString 值输入 miniMap2Bam 和 samToolsMerge,导致每次传递值时该进程都会重复。

解决方案就像从 miniMap2Bam 中删除 val 一样简单(如下):

//Run minimap2 on concatenated fastqs
process miniMap2Bam {
        errorStrategy 'retry'
        cache 'deep'
        maxRetries 3
        maxForks 10
        memory { 16.GB * task.attempt }

        input:
        each file(batchFastq) from fastqBatch.flatMap()

        output:
        file("${batchFastq}.bam") into bamFiles

        script:
        """
        minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
        samtools index ${batchFastq}.bam
        """
}

【问题讨论】:

  • 什么是 runstring 和 dirstring,你想用它们来完成什么?感觉就像您获得的行为来自将 val 输出到价值通道中。我认为您正在尝试强制执行某个流程,但没有使用 nextflow 中可用的正确工具。
  • 感谢您的回复。我相信您是对的,因为这些值可能是导致这种行为发生的原因。 runstring 是存放 fastq 文件的目录的父目录,dirstring 是存放 fastq 文件的父目录。这些字符串是在最初读取成批的 fastq 文件时创建的,以便成功地将文件发布到正确的目录 - 我会尝试想出一个解决方法。
  • 非常感谢@Pallie!现在,只需通过重新路由流程周围的值来解决此问题,因此它不再用于合并流程 - 我将编辑问题以提供解决方案,尽管它非常简单。我想奖励你,但我不确定我是否可以为 cmets 提供答案投票/选择?

标签: input merge samtools nextflow bam


【解决方案1】:

最简单的修复方法可能是停止通过通道传递静态目录字符串和运行字符串:

// Instead of a hardcoded path use a parameter you passed via CLI like you did with bamDir
dirString = file("/path/to/fastqs/")
runString = file("/path/to/fastqs/").getParent()
fastqBatch = Channel.from("/path/to/fastqs/")

//Run minimap2 on concatenated fastqs
process miniMap2Bam {
        publishDir "$params.bamDir"
        errorStrategy 'retry'
        cache 'deep'
        maxRetries 3
        maxForks 10
        memory { 16.GB * task.attempt }

        input:
        each file(batchFastq) from fastqBatch.flatMap()

        output:
        file("${batchFastq}.bam") into bamFiles

        script:
        """
        minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
        samtools index ${batchFastq}.bam
        """
}

//Run samtools merge
process samToolsMerge {
        echo true
        publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
        cache 'deep'
        errorStrategy 'retry'
        maxRetries 3
        maxForks 10
        memory { 14.GB * task.attempt }

        input:
        file bamFile from bamFiles.collect()

        output:
        file("**")

        script:
        """
        samtools merge ${runString}.bam ${bamFile} 
        """

【讨论】:

  • 嗨 agan Pallie,我将选择这个作为我的问题的答案,因为这个问题现在已经解决,它准确地描述了我需要在我的管道中实现的内容。谢谢!
猜你喜欢
  • 2022-11-02
  • 1970-01-01
  • 2021-08-07
  • 1970-01-01
  • 1970-01-01
  • 2020-09-22
  • 1970-01-01
  • 2019-01-20
  • 1970-01-01
相关资源
最近更新 更多