【问题标题】:Snakemake ignore failed path and redefine inputs for a common ruleSnakemake 忽略失败的路径并重新定义通用规则的输入
【发布时间】:2021-09-07 17:13:35
【问题描述】:

我目前正在编写一个看起来像这样的管道(最小示例的代码如下,输入文件只是名称在示例中的 SAMPLES 列表中的空白文件)。

我想要的是,如果样本在前两个步骤之一中失败(最小示例设置为使 sample1 在规则 two 上失败),请继续执行所有后续步骤,就像它没有一样在那里(这意味着它只会在sample2sample3 上执行规则gather_and_do_somethingsplit_final)。

我已经在使用 --keep-going 选项继续独立作业,但我无法定义通用规则的输入并使其忽略位于失败路径中的文件。

SAMPLES = ["sample1", "sample2", "sample3"]

rule all:
    input:
        expand("{sample}_final", sample=SAMPLES)

rule one:
    input:
        "{sample}"
    output:
        "{sample}_ruleOne"
    shell:
        "touch {output}"

rule two:
    input:
        rules.one.output
    output:
        "{sample}_ruleTwo"
    run:
        if input[0] != 'sample1_ruleOne':
            with open(output[0], 'w') as fh:
                fh.write(f'written {output[0]}')

rule gather_and_do_something:
    input:
        expand(rules.two.output, sample=SAMPLES)
    output:
        'merged'
    run:
        words = []
        for f in input:
            with open(f, 'r') as fh:
                words.append(next(fh))
        if len(input):
            with open(output[0], 'w') as fh:
                fh.write('\n'.join(words))

rule split_final:
    input:
        rules.gather_and_do_something.output
    output:
        '{sample}_final'
    shell:
        'touch {output}'

我尝试编写一些自定义函数用作输入,但这似乎不起作用...

def get_files(wildcards):
    import os
    return [f for f in expand(rules.two.output, sample=SAMPLES) if f in os.listdir(os.getcwd())]

rule gather_and_do_something:
    input:
        unpack(get_files)
    output:
        'merged'
    run:
        words = []
        for f in input:
            with open(f, 'r') as fh:
                words.append(next(fh))
        if len(input):
            with open(output[0], 'w') as fh:
                fh.write('\n'.join(words))

【问题讨论】:

    标签: python snakemake


    【解决方案1】:

    如果样本在前两个步骤中的一个中失败,请继续执行所有后续步骤,就像它不存在一样(这意味着它只会在 sample2 和 sample3 上执行规则gather_and_do_something 和 split_final)。

    在允许失败的规则中,我会放置一个 try/except 以受控方式捕获失败并为失败的样本编写一个虚拟文件。然后下游规则将识别虚拟文件并表现得好像样本不存在一样,例如通过编写另一个虚拟文件。

    我不知道让这些虚拟文件四处乱窜是否会让您烦恼...在我看来这不是一件坏事,因为它们表明样本以可接受的方式失败。

    这里有一些示例伪代码:

    rule some_can_fail:
        input:
            '{sample}.txt',
        output:
            '{sample}.two',
        run:
            try:
                ...
            except:
                # Write an empty file or a file with some message that indicate it
                # failed
                touch(output)
    
    
    rule gather:
        input:
            expand('{sample}.two', sample= SAMPLES),
        output:
            'merged.txt',
        run:
            for fin in input:
                # Ignore failed samples
                if fin is not empty:
                    # Read fin and append to output file
    
    
    rule split_final:
        input:
            merged= 'merged.txt',
            two= '{sample}.two',
        output:
            '{sample}_final',
        run:
            if input.two is empty:
                # As above, write an empty file or a file with a meanigful message
                touch(output)
            else:
                # Do something and write to output
    

    【讨论】:

      【解决方案2】:

      默认情况下,snakemake 的 DAG 是静态的,因此它会期望 all 规则(及其依赖项)请求的所有输入文件都存在。您可以通过定义检查点和相应的输入函数来绕过此要求,这些函数可以根据检查点规则的输出动态更改 DAG。

      这是一个更新的示例,它将rule two 转换为检查点,并在聚合步骤(gather_and_do_somethingall)期间使用输入函数来跳过失败的样本rule two。更新后的示例将空输出文件定义为失败 (os.stat(fn).st_size > 0),但您可以改用其他测试。

      import shlex
      import os
      
      SAMPLES = ["sample1", "sample2", "sample3"]
      
      def all_input(wldc):
          files = []
          for sample in SAMPLES:
              fn = checkpoints.two.get(sample=sample).output[0]
              if os.stat(fn).st_size > 0:
                  files.append('{sample}_final'.format(sample=sample))
          return files
      
      rule all:
          input:
              all_input
      
      rule one:
          input:
              "{sample}"
          output:
              "{sample}_ruleOne"
          shell:
              "touch {output}"
      
      checkpoint two:
          input:
              rules.one.output
          output:
              "{sample}_ruleTwo"
          run:
              with open(output[0], 'w') as fh:
                  if input[0] != 'sample1_ruleOne':
                      fh.write(f'written {output[0]}')
      
      def gather_input(wldc):
          files = []
          for sample in SAMPLES:
              fn = checkpoints.two.get(sample=sample).output[0]
              if os.stat(fn).st_size > 0:
                  files.append("{sample}_ruleTwo".format(sample=sample))
          return files
      
      rule gather_and_do_something:
          input:
              gather_files = gather_input,
          output:
              'merged'
          params:
              gather_files = lambda wldc, input: ' '.join(map(shlex.quote, input.gather_files)) # Handle samples/files with spaces
          shell:
              """
              cat {params.gather_files} > '{output}'
              """
      
      rule split_final:
          input:
              rules.gather_and_do_something.output
          output:
              '{sample}_final'
          shell:
              'touch {output}'
      

      通过这些更改,rule gather_and_do_something 将仅读取来自传递样本(sample2sample3)的 rule two 输出,而 rule split_final 将仅运行样本 2 和 3 的作业。

      【讨论】:

        猜你喜欢
        • 2023-03-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-01-13
        • 2013-03-03
        • 1970-01-01
        • 2020-03-09
        相关资源
        最近更新 更多