【发布时间】:2021-06-11 09:26:15
【问题描述】:
我有一个 python 脚本,它需要一堆 fasta 和 gff 文件,并根据类似的 COG ID 将序列收集到主 COG 目录中的各个目录中。 COG 的数量是动态的,为此我使用了 Snakemake 中的检查点选项。
规则如下所示:
checkpoint get_COG:
input:
rules.AMR_meta.output
output:
check=directory(os.path.join("COG_data"))
threads:
config['COG']['threads']
log:
os.path.join(RESULTS_DIR, "logs/COG_directory_setup.log")
message:
"Running python script to set up directory structure for GeneForest"
run:
import glob
import pandas as pd
import os
import shutil
import logging
from Bio import SeqIO
import argparse
from io import StringIO
import numpy as np
from multiprocessing import Pool
from scripts.utils import ParseGFF, GetAllCOG, CreateCOGDirs, GetSequence, GetCoverage, ProcessCOG, GetCoverageSums
meta_file=pd.read_csv(input[0],sep=',')
# List all COGs, create dirs
cog_set=GetAllCOG(meta_file)
CreateCOGDirs(cog_set)
# Iterate over all COGs to gather the sequences
print('Creating gene catalogue...')
with Pool(threads) as p:
p.map(ProcessCOG, [[cog, meta_file] for cog in list(cog_set)])
此规则的输出会创建以下文件:
COG_data/COGXXXX/COGXXXX_raw.fasta, COG_data/COGXXXX/COGXXXX_coverage.csv
我有后续规则,我想从检查点规则中获取 fasta 输出并创建一些多序列比对和树。它们如下:
rule mafft:
input:
os.path.join("COG_data/{i}/{i}_raw.fasta")
output:
os.path.join("COG_data/{i}/{i}_aln.fasta")
conda:
os.path.join("envs/mafft.yaml")
threads:
config['MAFFT']['threads']
log:
os.path.join(RESULTS_DIR, "logs/{i}.mafft.log")
message:
"Getting multiple sequence alignment for each COG"
shell:
"(date && mafft --thread {threads} {input} > {output} && date) &> {log}"
rule trimal:
input:
os.path.join("COG_data/{i}/{i}_aln.fasta")
output:
os.path.join("COG_data/{i}/{i}_trim.fasta")
conda:
os.path.join("envs/trimal.yaml")
log:
os.path.join(RESULTS_DIR, "logs/{i}.trimal.log")
message:
"Getting trimmed alignment sequence for each COG"
shell:
"(date && trimal -in {input} -out {output} -automated1 && date) &> {log}"
rule iqtree:
input:
os.path.join("COG_data/{i}/{i}_trim.fasta")
output:
os.path.join("COG_data/{i}/{i}_trim.fasta.treefile")
conda:
os.path.join("envs/iqtree.yaml")
log:
os.path.join(RESULTS_DIR, "logs/{i}.iqtree.log")
message:
"Getting trees for each COG"
shell:
"(date && iqtree -s {input} -m MFP && date) &> {log}"
def COG_trees(wildcards):
checkpoint_output= checkpoints.get_COG.get(**wildcards).output.check
return expand(os.path.join("COG_data/{i}/{i}_trim.fasta.treefile"),
i=glob_wildcards(os.path.join(checkpoint_output, "{i}_trim.fasta.treefile")).i)
rule trees:
input:
COG_trees
output:
os.path.join(RESULTS_DIR, "COG_trees.done")
log:
os.path.join(RESULTS_DIR, "logs/geneforest_is_ready.log")
message:
"Creates the COG trees via checkpoints"
shell:
"(date && touch {output} && date) &> {log}"
虽然我得到了原始的 COG_data/COGXXXX/COGXXXX_raw.fasta 文件,但中间规则没有运行。其余的运行直接跳转到规则树并给我COG_trees.done 输出。
有没有办法修复 deg COG_trees 函数以获取运行中间规则?
感谢您的帮助!
【问题讨论】:
标签: aggregate-functions wildcard snakemake