Achelous WDL引擎支持跨框架计算
WDL运行时(runtime)
WDL标准定义了基本的语法结构,允许不同的执行引擎实现自己的运行时特性。Achelous在严格遵守语言开放标准的前提下,通过runtime特性实现了很多高级特性,极大简化了普通用户使用高级计算工具(MPI、Tensorflow、PyTorch等)分布式处理数据的操作。
从单机到分布式
比较复杂的科学计算或者符号计算通常采用MPI、Tensorflow或PyTorch等计算框架。其中MPI程序需要在多个物理机器启动,程序之间复杂交互,大量计算获得结果。 Tensorfolow或者PyTorch既可以单机运行,也支持分布式运行。分布式运行Tensorflow需要在不同物理机器启动多个容器,分配正确的角色(PS或者Worker)和网络端口,建立集群进行计算。非计算机背景用户很难在没有专家协助下运行一个分布式的MPI程序或者Tensorflow程序。Achelous的WDL引擎通过引入partisaner
运行时,帮助用户完全屏蔽了资源分配、集群的部署和程序的启动细节,用户可以像运行一个单机普通程序一样,定义一个task
运行分布式的Tensorflow或者MPI。Achelous称这种方式为所见即所得。
下面通过一个实例演示Achelous的WDL如何在一个简单的workflow
中混合使用Spark、MPI和Tensorflow(都以分布式方式运行)。
一个混合Spark、HPC和Tensorflow的示例workflow
为了便于用户理解,这个例子是一个简单的演示,本身没有复杂的逻辑和应用背景。我们在例子中定义了三个task
,分别运行Spark、MPI和Tensorflow,然后用组合这些task
成为一个workflow
。用户可以通过这个链接下载完整流程。
步骤一: 定义Spark task
迁移数据
下面的task
定义使用极道提供的datamover工具镜像从存储向HDFS并行拷贝数据:
task PrepareData {
File src_path
File dst_path
String sparkURI
command {
datamover --copy-method copyLocal --source-path ${src_path} --target-path ${dst_path}
}
output {
String done = "yes"
}
runtime {
docker: "datamover"
cpu: "1"
memory: "2G"
usextaosparkscheduler: "true"
SparkExecutorURI: "${sparkURI}"
ldapauth: "true"
}
}
在相关页面中已经介绍了如何运行Spark程序,此处不再详述。
步骤二: 定义一个MPI task
下面的task
启动一个MPI程序,并通过参数指定这个程序分布式运行时启动多少个进程。
task MPITask {
File hostPath
Int procs
command {
/home/mpi/mpich-install/run/calculator-PI steps 50000
}
runtime {
docker: "mpich-abc"
cpu: "1"
memory: "1G"
volumes: {
"/mpiwork" : hostPath
}
runner: "partisaner"
runneroptions: {
"cluster": "mpich",
"workdir": "/mpiwork",
"procs": procs
}
usextaoscheduler: "true"
ldapauth: "true"
}
}
在这个task
的定义中,有几个关键元素:
command
: 描述MPI程序的命令行及其参数,与用户通过mpiexec启动程序时指定的命令行和参数完全一致。runtime
:其中关键字docker
指明封装了MPI程序及其依赖库的Docker镜像。runner
关键字指明“partisaner”,告诉Achelous的WDL引擎通过工具partisaner
启动这个程序,runneroptions
中指定了MPI程序启动的进程数和MPI框架类型(mpich)。usextaoscheduler
指定使用Achelous内置的调度引擎分配资源。
用户在后续call
这个MPI的task
的时候,Achelous的WDL引擎会自动生成一系列task
去根据runtime
选项分配资源,启动一组容器,运行command
中指定的命令。所有这些由Achelous自动完成,无需用户干预。
步骤三: 定义一个Tensorflow task
与定义MPI的task
类似,定义如下的TensorflowTask,在runtime
中通过“partisaner”的“runner”和runneroptions
定义Tensorflow集群的描述。
task TensorflowTrainTask {
File hostPath
File debugLogDir
command {
export TRAIN_STEPS=50000
export TRAIN_DATA_DIR=/tfwork/mnist
sh /tfwork/bin/run.sh
}
runtime {
docker: "tensorflow-abc"
cpu: "1"
memory: "1G"
volumes: {
"/tfwork" : hostPath
}
runner: "partisaner"
runneroptions: {
"cluster": "tensorflow",
"logdir": "/tfwork/logs",
"workdir": "/tfwork",
"loginterval": 60,
"abortonfail": true,
"roles": [{
"name": "ps",
"procs": 1,
"cpu": 2,
"memory": 1024,
"server": true
},
{
"name": "worker",
"procs": 2,
"cpu": 1,
"memory": 1024
}
]
}
usextaoscheduler: "true"
ldapauth: "true"
}
}
与MPI集群相比,Tensorflow的拓扑结构更复杂,需要分为“ps”和“worker”两种角色,分别制定资源(GPU、Memory或者GPU)。一旦定义好runtime
属性,partisaner运行时将帮助用户建立Tensorflow集群,启动command
中的命令行。
步骤四: 在workflow
中混合调用上述task
定义好task
,用户可以像通常一样构造自己的workflow
。例子如下:
workflow compute {
Array[File] datas
File mnistPath
File logDir
File tfHostPath
File mpiHostPath
Int mpiProcs
String sparkURI
call CleanEnv as CleanWorkdir {
input: cleanDir=logDir
}
scatter(file in datas) {
call PrepareData {
input: src_path=file,dst_path=mnistPath,sparkURI=sparkURI
}
}
call MPITask after CleanWorkdir {
input: hostPath = mpiHostPath, procs = mpiProcs
}
call TensorflowTrainTask after MPITask {
input: debugLogDir = logDir, hostPath = tfHostPath
}
}
完整例子,可以通过下载页面进行下载。
上述workflow
运行时,特定task
运行时,Achelous会动态构建Spark集群、MPI集群和Tensorflow集群。task
运行结束时,对应的集群销毁,资源归还到资源池。整个过程完全动态,无需预先部署MPI、Spark或者Tensorflow集群,也不用写任何一行部署代码。