通过Achelous 平台运行 GATK-Spark任务
二代测序技术相关计算的众多场景中,目前对变异检测的依然是计算资源的消耗大项。对于学术研究方向的用户而言,GATK best practice 作为最常用的分析流程,在变异检出效果方面,有着非常良好的效果和业界口碑;但其运算速度难以让人满意。
虽然目前已经有了许多加速方案(例如Sentieon 的算法优化方案、英伟达公司的 parabricks的硬件加速方案等)。broad institute 在GATK4开发中,增加了Spark模式进行任务运行。对于一般用户而言,也可以在计算资源充足的前提下,尝试采用Spark版本的GATK4 对计算进行加速。
如何运行 GATK4 Spark 任务
GATK4 Spark 模式与基础版本GATK4的不同之处在于需要将数据上传至hdfs文件系统。Achelous系统中,用户可以通过极道科技提供的 datamover
工具,轻松实现数据从分布式文件存储到hdfs文件系统的拷贝。
用户的Spark程序计算的时候,需要依赖资源管理器构建多机集群,通过数据并行性提升性能。启动Spark程序需要指定资源管理器的地址。常用的资源管理器有(YARN、Mesos、K8s)等,极道的SRM也是资源管理器。用户通过在WDL程序的Task的runtime属性中添加属性来实现Spark运行环境的自动初始化:
- 属性
usextaosparkscheduler
设为true
:Achelous在启动Spark任务的时候,会将环境变量${SPARK_MASTER_URI}
的值设置成Spark程序可以接受的资源管理器的URL。用户可以通过引用环境变量${SPARK_MASTER_URI}
来获取资源管理器地址。 SparkExecutorURI
设置为用户自己的Spark的JAR包的文件路径,这个包需要存储到HDFS中。 通过设置上述两个runtime属性,用户的command中的Spark程序可以在Achelous平台中直接运行,无需部署和系统管理员介入,非常简洁和方便。
对应的 WDL 脚本代码如下:
task dfs_to_hdfs {
File inputFile
File outputFile
String sparkURI
command {
datamover --copy-method copyToHDFS --source-path ${inputFile} --target-path ${outputFile}
}
output {
File hdfsFile = outputFile
}
runtime {
docker: "datamover"
cpu: "2"
memory: "10G"
usextaosparkscheduler: "true"
SparkExecutorURI: "${sparkURI}"
ldapauth: true
volumes: {
"/var/spark/storage" : "/var/spark/storage"
}
}
}
task read_pipeline_spark {
File reference
File inputFile
File outputBamFile
File siteFile1
File siteFile2
File siteFile3
String? sparkMaster
String SparkExecutorURI
command <<<
/opt/gatk4/gatk-launch ReadsPipelineSpark \
--input ~{inputFile} \
-O ~{outputBamFile} \
--reference ~{reference} \
--knownSites ~{siteFile1} \
--knownSites ~{siteFile2} \
--knownSites ~{siteFile3} \
-- \
--sparkRunner SPARK \
--sparkMaster ${SPARK_MASTER_URI} \
--num-executors 8 --executor-cores 2 \
--executor-memory 90g \
--driver-memory 30g \
--conf 'spark.executor.extraJavaOptions=-XX:hashCode=0' \
--conf 'spark.driver.extraJavaOptions=-XX:hashCode=0' \
--conf 'spark.cores.max=16' \
--conf 'spark.local.dir=/var/spark/storage'
>>>
output {
File bam = outputBamFile
}
runtime {
docker: "xt-gatk4:4.beta.4"
cpu: "2"
memory: "20G"
SparkExecutorURI: "${SparkExecutorURI}"
usextaosparkscheduler: "true"
ldapauth: true
volumes: {
"/var/spark/storage" : "/var/spark/storage"
}
}
}
Spark 版本的GATK4 提供集成式执行命令 ReadsPipelineSpark
可以直接由未比对测序数据,通过完整的GATK best practice流程,生成vcf 文件。详情可参见其官方说明