【问题标题】:Is there a way to access/modify the contents of a Nextflow channel?有没有办法访问/修改 Nextflow 频道的内容?
【发布时间】:2021-11-18 05:38:16
【问题描述】:

我的工作流输出一个主目录,我从使用 DSL2 的进程中发出该主目录。我将此输出提供给 python 脚本,该脚本可以轻松地循环遍历子目录及其各自的文件,提取信息并将其编译成 .tsv

python 脚本获取的两个重要信息是子目录的名称以及子目录中哪个文件实际上很重要。

我想将我的进程输出(“根目录”)+ 子目录(来自文件)+ 重要文件名(来自文件)并使其成为新的生成器路径以提供给另一个进程。

我只是使用了一个糟糕的方法吗?有没有更好的方法来访问生成器?在我看到的文档中订阅,但我没有运气使用这个功能。提前谢谢你。

示例 .tsv 文件(第 1 列和第 3 列是我要附加到生成器的内容)

GCF_000005845.2 Escherichia coli str. K-12 substr. MG1655, complete genome      GCF_000005845.2_ASM584v2_genomic.fna
GCF_000008865.2 Escherichia coli O157:H7 str. Sakai DNA, complete genome        GCF_000008865.2_ASM886v2_genomic.fna

工作目录结构

├── c6
│   └── 6598d4838f61d0421f03216990465c
│       ├── ecoli
│       │   ├── README.md
│       │   └── ncbi_dataset
│       │       ├── data
│       │       │   ├── GCF_000005845.2
│       │       │   │   ├── GCF_000005845.2_ASM584v2_genomic.fna
│       │       │   │   ├── genomic.gff
│       │       │   │   ├── protein.faa
│       │       │   │   └── sequence_report.jsonl
│       │       │   ├── GCF_000008865.2
│       │       │   │   ├── GCF_000008865.2_ASM886v2_genomic.fna
│       │       │   │   ├── genomic.gff
│       │       │   │   ├── protein.faa
│       │       │   │   └── sequence_report.jsonl
│       │       │   ├── assembly_data_report.jsonl
│       │       │   └── dataset_catalog.json
│       │       └── fetch.txt

这是我的 nextflow 脚本(非常欢迎建设性的批评):

#!/usr/bin/env Nextflow

nextflow.enable.dsl=2

workflow {

  //ref_genome_ch = Channel.fromPath("$params.ref_genome")
  println([params.taxon, params.zipName, params.unzippedDir])
  DOWNLOAD_ZIP(params.taxon, params.zipName)
  UNZIP(DOWNLOAD_ZIP.out.zipFile)
  REHYDRATE(UNZIP.out.unzippedDir)
  COLLECT_NAMES(REHYDRATE.out.dataDir)


  // I want to get the dir name and file name out of
  // relations.txt
  //thing = Channel.from(  )
  //thing.view()
  //organism_genomes = REHYDRATE.out.dataDir.subscribe { println("$it/")}

}

process DOWNLOAD_ZIP {
  errorStrategy 'ignore'

  input:
  val taxonName
  val zipName

  output:
  path "${zipName}" , emit: zipFile

  script:
  def reference = params.reference
  """
  datasets download genome \\
     taxon '${taxonName}' \\
     --dehydrated \\
     --filename ${zipName} \\
     ${reference} \\
     --exclude-genomic-cds
  """

}


process UNZIP {
  input:
  path zipFile

  output:
  path "${zipFile.baseName}" , emit: unzippedDir

  script:
  """
  unzip $zipFile -d ${zipFile.baseName}
  """

}


process REHYDRATE {
  input:
  path unzippedDir

  output:
  path "$unzippedDir/ncbi_dataset/data" , emit: dataDir

  script:
  """
  datasets rehydrate \\
     --directory $unzippedDir
  """
}



process COLLECT_NAMES {
  publishDir params.results

  input:
  path dataDir

  output:
  path "relations.txt" , emit: org_names

  script:
  """
  python "$baseDir/bin/collect_org_names.py" $dataDir
  """

}



编辑:用户@Steve 推荐频道运营商。我还没有完全理解 groovy {thing -> stuff} 语法,但我尝试过这样做:

thing = REHYDRATE.out.dataDir.map{"$it/*"}
thing.view()

我明白了

/mnt/c/Users/mkozubov/Desktop/nextflow_tutorial/tRNA_stuff/work/d0/long_hash/ecoli/ncbi_dataset/data/*

printed... 但是当我将它输入到只有一个脚本的进程中时: println(input) 我收到一条错误消息,指出执行的命令为空,命令输出为(空)并且目标“*”为不是目录。

我的问题是,为什么 .map 运算符没有像在频道中输入“PATH/*”那样扩展 *?




Edit2:我觉得我几乎拥有了一些东西。我更改了 COLLECT_NAMES 脚本的输出以包含文件的路径。我现在想解析这个文件并将内容读入一个频道。为此我做到了

organism_genome_files = Channel.from()
  COLLECT_NAMES.out.org_names.map {
    new File(it.toString()).eachLine { line ->
      organism_genome_files << line.split('\t')[3] }
  }

如果我将organism_genome_files &lt;&lt; line.split('\t')[3] 替换为println line.split('\t')[3],我可以看到我想要的内容,但我似乎无法找到提取此信息的方法。

我也尝试了将organism_genome_files 作为列表,但似乎没有任何效果,我似乎无法从频道中提取信息并有效地对其进行变异。

.splitCSV() 方法似乎很有用,但我仍然不明白如何让一个通道作为另一个通道的输入 :(

【问题讨论】:

    标签: channel nextflow


    【解决方案1】:

    有没有办法访问/修改 Nextflow 频道的内容?

    您可以为此使用一个或多个transforming operators。例如,要获取 'relations.txt' 的目录名和文件名,可以使用:

    COLLECT_NAMES.out.org_names.map { tuple( it.parent, it.name ) }.view()
    

    另见:Check file attributes



    我的问题是为什么 .map 运算符没有将 * 扩展为输入 将“PATH/*”放到频道中会怎样?

    它只被告知返回一个字符串(实际上是一个 GString)。 Groovy 不会像你的 shell 那样自动扩展它。我认为您想要的是列出该目录内容的某种方式。为此,您可以使用listFiles() 方法:

    REHYDRATE.out.dataDir.map { tuple( it.listFiles() ) }.view()
    

    另请参阅:List directory content



    我更改了 COLLECT_NAMES 脚本的输出以包含文件的路径。我现在想解析这个文件并将内容读入一个频道。

    如果没有关于这些文件是什么、它们有多大、它们将如何使用以及返回类型需要是什么的更多详细信息,我真的只是在这里猜测。因此,我整理了一些可能有助于您入门的潜在解决方案:

    1. 这被实现为闭包并返回列表通道:
    def getOrganismGenomeFiles = { reader ->
        def values = []
        reader.splitEachLine('\t') { fields ->
            values.add( fields[3] )
        }
        return values
    }
    
    ch.map( getOrganismGenomeFiles ).view()
    
    1. 这会破坏行,但也会返回列表通道:
    ch.map { it.readLines().collect { it.split('\t')[3] } }.view()
    
    1. 这会读取文件内容,使用 splitCsv 运算符将它们拆分为记录并返回值通道:
    ch.map { it.text }.splitCsv(sep: '\t').map { it[3] }.view()
    
    • 注意:为了便于阅读,我缩短了输入通道名称。请将上述示例中的ch 替换为COLLECT_NAMES.out.org_names


    我的(也许不是那么有建设性的)批评实际上是关于工作流设计,而不是风格、布局等。我的偏好是并且将永远是避免使用一些 web get 命令,如 curl、wget,或者在这种情况下是 NCBI 数据集,在 Nextflow 过程中。当然,您可以以这种方式进行工作,但是当您以后决定与他人共享您的工作流程时,您最终会遇到问题。即使每个人都同意在下载文件上浪费额外资源是好的(他们不会这样做,但这些成本在计划中可能可以忽略不计......)你也不一定能保证你的进程登陆的机器或节点会甚至能够解析指定的 URL。有一些方法可以解决这些问题,但我的建议是让 Nextflow 本地化所需的文件。问题是如何。这当然取决于你实际上想要做什么......

    这些文件可从 NCBI FTP Site 获得,它们的 URL 可以添加到您的 configuration,可能类似于:

    params {
      genomes {
        'GCF_000005845.2_ASM584v2' {
          genomic_fna = 'ftp://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Escherichia_coli/reference/GCF_000005845.2_ASM584v2/GCF_000005845.2_ASM584v2_genomic.fna.gz'
          genomic_gff = 'ftp://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Escherichia_coli/reference/GCF_000005845.2_ASM584v2/GCF_000005845.2_ASM584v2_genomic.gff.gz'
          protein_faa = 'ftp://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Escherichia_coli/reference/GCF_000005845.2_ASM584v2/GCF_000005845.2_ASM584v2_protein.faa.gz'
        }
        'GCF_000008865.2_ASM886v2' {
          genomic_fna = 'ftp://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Escherichia_coli/reference/GCF_000008865.2_ASM886v2/GCF_000008865.2_ASM886v2_genomic.fna.gz'
          genomic_gff = 'ftp://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Escherichia_coli/reference/GCF_000008865.2_ASM886v2/GCF_000008865.2_ASM886v2_genomic.gff.gz'
          protein_faa = 'ftp://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Escherichia_coli/reference/GCF_000008865.2_ASM886v2/GCF_000008865.2_ASM886v2_protein.faa.gz'
        }
      }
    }
    

    然后要访问给定基因组的文件,请使用以下内容:

    genome = 'GCF_000005845.2_ASM584v2'
    
    genomic_fna = genomes[genome].genomic_fna
    genomic_gff = genomes[genome].genomic_gff
    protein_faa = genomes[genome].protein_faa
    

    【讨论】:

    • 感谢@Steve 的回答,运营商会很有用,但我还是有问题(我会编辑我的问题)。
    • 不用担心@MatthewKozubov,请参阅上面我编辑的答案。另一种方法是只更改您的 REHYDRATE 输出声明。如果您只想要子目录,可能类似于:path ""$unzippedDir/ncbi_dataset/data/*", type: 'dir' ?
    • 感觉就像我在尝试蛮力一些可以更容易完成的事情,我尝试了其他事情并将其添加为编辑。
    • @MatthewKozubov 我在上面添加了几个示例。也许他们是有帮助的。很难说需要什么。如果这些文件很小(例如,小于 1 MB),那么对它们进行 slurping 可能是可行的方法......
    • 谢谢,这些例子非常有用!我需要更熟悉 Java/Groovy 来定义我自己的函数(我喜欢那个例子)。还有啜饮是什么意思?当我在谷歌上啜饮时,谷歌并不是最有帮助的。
    猜你喜欢
    • 2019-02-17
    • 1970-01-01
    • 2020-12-28
    • 1970-01-01
    • 2021-06-16
    • 1970-01-01
    • 2020-02-09
    • 2017-01-24
    • 2023-03-16
    相关资源
    最近更新 更多