Skip to content

编写工作流

工作流结构

yaml
name: my-workflow          # 可选:默认为文件名
description: "Process daily data"
schedule: "0 2 * * *"      # 可选:cron 调度
maxActiveRuns: 1           # 可选:并发限制

params:                    # 运行时参数
  - DATE: "`date +%Y-%m-%d`"

env:                       # 环境变量
  - DATA_DIR: /tmp/data

steps:                     # 工作流步骤
  - name: process
    command: ./process.sh ${DATE}

基础配置

使用基础配置在所有 DAG 之间共享通用设置:

yaml
# ~/.config/dagu/base.yaml
env:
  - LOG_LEVEL: info
  - AWS_REGION: us-east-1

smtp:
  host: smtp.company.com
  port: "587"
  username: ${SMTP_USER}
  password: ${SMTP_PASS}

errorMail:
  from: alerts@company.com
  to: oncall@company.com
  attachLogs: true

histRetentionDays: 30 # Dagu 删除超过此天数的历史记录和日志
maxActiveRuns: 5

DAG 自动继承这些设置:

yaml
# my-workflow.yaml
name: data-pipeline

# 继承所有基础设置
# 可以覆盖特定值:
env:
  - LOG_LEVEL: debug  # 覆盖
  - CUSTOM_VAR: value # 添加

steps:
  - name: process
    command: ./process.sh

配置优先级:系统默认值 → 基础配置 → DAG 配置

指南章节

  1. 基础 - 步骤、命令、依赖关系
  2. 控制流 - 并行执行、条件、循环
  3. 数据和变量 - 参数、输出、数据传递
  4. 错误处理 - 重试、失败、通知
  5. 高级模式 - 组合、优化、最佳实践

完整示例

yaml
name: data-processor
schedule: "0 2 * * *"

params:
  - DATE: "`date +%Y-%m-%d`"

env:
  - DATA_DIR: /tmp/data/${DATE}

steps:
  - name: download
    command: aws s3 cp s3://bucket/${DATE}.csv ${DATA_DIR}/
    retryPolicy:
      limit: 3
      intervalSec: 60

  - name: validate
    command: python validate.py ${DATA_DIR}/${DATE}.csv
    continueOn:
      failure: false

  - name: process-types
    parallel: [users, orders, products]
    command: python process.py --type=$ITEM --date=${DATE}
    output: RESULT_${ITEM}

  - name: report
    command: python report.py --date=${DATE}

handlerOn:
  failure:
    command: ./notify_failure.sh "${DATE}"

常见模式

顺序流水线

yaml
steps:
  - name: extract
    command: ./extract.sh
    
  - name: transform
    command: ./transform.sh
    
  - name: load
    command: ./load.sh

条件执行

yaml
steps:
  - name: test
    command: npm test
    
  - name: deploy
    command: ./deploy.sh
    preconditions:
      - condition: "${BRANCH}"
        expected: "main"

并行处理

yaml
steps:
  - name: prepare
    command: ./prepare.sh
    
  - name: process-files
    parallel: [file1, file2, file3]
    run: process-file
    params: "FILE=${ITEM}"

---
# 用于处理每个文件的子工作流
# 这可以在同一个文件中用 `---` 分隔,或在单独的文件中
name: process-file
steps:
  - name: process
    command: ./process.sh --file ${FILE}

上面的示例为每个文件并行运行 process-file

下一步

本站为 Dagu 中文文档,由 Dagu.dev 基于 Dagu 原版英文文档翻译维护。