Writing Workflows
Workflow Structure
```yaml
description: "Process daily data"
schedule: "0 2 * * *"   # optional: cron schedule
maxActiveRuns: 1        # optional: concurrency limit
params:                 # runtime parameters
  - DATE: "`date +%Y-%m-%d`"
env:                    # environment variables
  - DATA_DIR: /tmp/data
steps:                  # workflow steps
  - echo "Processing date ${DATE}"
```

Base Configuration
Use a base configuration to share common settings across all DAGs:
```yaml
# ~/.config/dagu/base.yaml
env:
  - LOG_LEVEL: info
  - AWS_REGION: us-east-1
smtp:
  host: smtp.company.com
  port: "587"
  username: ${SMTP_USER}
  password: ${SMTP_PASS}
errorMail:
  from: alerts@company.com
  to: oncall@company.com
  attachLogs: true
histRetentionDays: 30   # Dagu deletes workflow history and logs older than this
maxActiveRuns: 5
```

DAGs inherit these settings automatically:
```yaml
# my-workflow.yaml
# Inherits all base settings.
# Specific values can be overridden:
env:
  - LOG_LEVEL: debug   # override
  - CUSTOM_VAR: value  # addition
steps:
  - echo "Processing"
```

Configuration precedence: system defaults → base config → DAG config
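To make the precedence rule concrete, here is an illustrative sketch (not actual Dagu output) of how the environment for `my-workflow.yaml` would resolve, given the base configuration above:

```yaml
# Effective environment after merging base.yaml and my-workflow.yaml:
env:
  - LOG_LEVEL: debug       # DAG value wins over the base's "info"
  - AWS_REGION: us-east-1  # inherited unchanged from base.yaml
  - CUSTOM_VAR: value      # added by the DAG
```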
Guide Sections
- Basics - Steps, commands, dependencies
- Container - Run workflows in Docker containers
- Control Flow - Parallel execution, conditions, loops
- Data & Variables - Parameters, outputs, data passing
- Error Handling - Retries, failures, notifications
- Lifecycle Handlers - Cleanup, notifications, post-run tasks
- Patterns - Composition, optimization, best practices
Complete Example
```yaml
schedule: "0 2 * * *"
params:
  - DATE: "`date +%Y-%m-%d`"
env:
  - DATA_DIR: /tmp/data/${DATE}
steps:
  - command: aws s3 cp s3://bucket/${DATE}.csv ${DATA_DIR}/
    retryPolicy:
      limit: 3
      intervalSec: 60
  - command: python validate.py ${DATA_DIR}/${DATE}.csv
    continueOn:
      failure: false
  - parallel: [users, orders, products]
    command: python process.py --type=$ITEM --date=${DATE}
    output: RESULT_$ITEM
  - python report.py --date=${DATE}
handlerOn:
  failure:
    command: echo "Notify failure for ${DATE}"
```

Common Patterns
Sequential Pipeline
```yaml
steps:
  - echo "Extract data"
  - echo "Transform data"
  - echo "Load data"
```

Parallel Processing
```yaml
steps:
  - parallel: [file1, file2, file3]
    run: process-file
    params: "FILE=${ITEM}"
---
# Child workflow that processes each file.
# It can live in the same file, separated by `---`, or in a separate file.
name: process-file
steps:
  - echo "Processing file ${FILE}"
```
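Passing Data Between Steps

The complete example above captures step output with `output: RESULT_$ITEM`. As a minimal sketch of that pattern (the variable names here are illustrative; verify the exact reference syntax against the Dagu documentation), a captured output can be referenced by a later step:

```yaml
steps:
  - command: date +%Y-%m-%d
    output: TODAY           # capture this step's stdout into TODAY
  - command: echo "Run date is ${TODAY}"   # consume it downstream
```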