Writing Workflows
Workflow Structure
yaml
name: my-workflow        # Optional: defaults to the file name
description: "Process daily data"
schedule: "0 2 * * *"    # Optional: cron schedule
maxActiveRuns: 1         # Optional: concurrency limit
params:                  # Runtime parameters
  - DATE: "`date +%Y-%m-%d`"
env:                     # Environment variables
  - DATA_DIR: /tmp/data
steps:                   # Workflow steps
  - name: process
    command: ./process.sh ${DATE}
Base Configuration
Use a base configuration to share common settings across all DAGs:
yaml
# ~/.config/dagu/base.yaml
env:
  - LOG_LEVEL: info
  - AWS_REGION: us-east-1
smtp:
  host: smtp.company.com
  port: "587"
  username: ${SMTP_USER}
  password: ${SMTP_PASS}
errorMail:
  from: alerts@company.com
  to: oncall@company.com
  attachLogs: true
histRetentionDays: 30    # Dagu deletes run history and logs older than this many days
maxActiveRuns: 5
DAGs automatically inherit these settings:
yaml
# my-workflow.yaml
name: data-pipeline
# Inherits all base settings.
# Specific values can be overridden:
env:
  - LOG_LEVEL: debug   # override
  - CUSTOM_VAR: value  # addition
steps:
  - name: process
    command: ./process.sh
Configuration precedence: system defaults → base configuration → DAG configuration.
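For example, with the base configuration above, a value declared in the DAG file takes precedence, while anything the DAG omits falls back to the base. A minimal sketch (the file name and step below are illustrative, not part of the examples above):
yaml
# reporting.yaml (hypothetical DAG, shown only to illustrate precedence)
name: reporting
maxActiveRuns: 1    # DAG config wins: the effective limit is 1, not the base value of 5
# histRetentionDays is not set here, so the base value (30) still applies
steps:
  - name: report
    command: ./report.sh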
Complete Example
yaml
name: data-processor
schedule: "0 2 * * *"
params:
  - DATE: "`date +%Y-%m-%d`"
env:
  - DATA_DIR: /tmp/data/${DATE}
steps:
  - name: download
    command: aws s3 cp s3://bucket/${DATE}.csv ${DATA_DIR}/
    retryPolicy:
      limit: 3
      intervalSec: 60
  - name: validate
    command: python validate.py ${DATA_DIR}/${DATE}.csv
    continueOn:
      failure: false
  - name: process-types
    parallel: [users, orders, products]
    command: python process.py --type=$ITEM --date=${DATE}
    output: RESULT_${ITEM}
  - name: report
    command: python report.py --date=${DATE}
handlerOn:
  failure:
    command: ./notify_failure.sh "${DATE}"
Common Patterns
Sequential Pipeline
yaml
steps:
  - name: extract
    command: ./extract.sh
  - name: transform
    command: ./transform.sh
  - name: load
    command: ./load.sh
Conditional Execution
yaml
steps:
  - name: test
    command: npm test
  - name: deploy
    command: ./deploy.sh
    preconditions:
      - condition: "${BRANCH}"
        expected: "main"
Parallel Processing
yaml
steps:
  - name: prepare
    command: ./prepare.sh
  - name: process-files
    parallel: [file1, file2, file3]
    run: process-file
    params: "FILE=${ITEM}"
---
# Child workflow that processes each file.
# It can be defined in the same file, separated by `---`, or in a separate file.
name: process-file
steps:
  - name: process
    command: ./process.sh --file ${FILE}
The example above runs process-file in parallel for each file.