Argo Workflowsで制御されているパイプラインをrunnでテストする

Argo Workflows で組んだバッチパイプラインのテスト、どうしてますか。

自分はrunnで再現してます。 runnはCLI ツールとしても使えるけど、Go ライブラリとしてテストに組み込めるので、runn.Load() でランブックを読み込んで go test の中からPod内で動かしているコマンドを実行できます。

Argo Workflowsのテストの何が難しいか

たとえばArgo のパイプラインは CronWorkflow → WorkflowTemplate → Pod という多段構成になります。

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
spec:
  schedule: "*/30 * * * *"
  workflowSpec:
    templates:
      - name: dispatch-and-worker
        steps:
          - - name: dispatch
              templateRef:
                name: dispatch-jobs
                template: dispatch-jobs
          - - name: process
              templateRef:
                name: job-worker
                template: job-worker
              arguments:
                parameters:
                  - name: job-id
                    value: "{{item.job_id}}"
              withParam: "{{steps.dispatch.outputs.parameters.jobs}}"

dispatcher が JSON を書き出して outputs.parameters で次ステップに渡します。worker は withParam で fan-out されるといった感じです。

これをローカルで再現しようとすると

  • {{steps.dispatch.outputs.parameters.jobs}} は Argoランタイムが解釈するので手元で動かない
  • Pod 間の依存関係の制御もArgo任せ

ただone shotなタスクを正しい順序・パラメータで実行できれば、Argo なしでパイプライン自体の動作は再現できます。

runn でどう再現するか

ArgoのWorkflowTemplateとrunnのランブックを対応させ、中間ファイルなどを読むためのグルーコードを準備します。

Argo 側:

container:
  command: ["/app"]
  args: ["dispatcher", "--job-type={{inputs.parameters.job-type}}"]
outputs:
  parameters:
    - name: jobs
      valueFrom:
        path: /tmp/jobs.json

runn 側:

desc: Dispatch jobs
steps:
  dispatch:
    exec:
      command: |
        ${E2E_BINARY} dispatcher \
          --job-type=some_job_type \
          --output-file=${E2E_TMPDIR}/dispatch_output.json
    test: current.exit_code == 0

fan-out される worker 側:

desc: Process a single job
vars:
  job_id: ""
steps:
  process:
    exec:
      command: |
        ${E2E_BINARY} worker \
          --job-id={{ vars.job_id }}
    test: current.exit_code == 0

{{ vars.job_id }} が runn のテンプレート変数です。Go 側から runn.Var("job_id", id) で注入します。

Go 側のコード

func TestPipelineFull(t *testing.T) {
    // Phase 1: dispatcher → JSON 出力
    runRunbook(t, "runbooks/_include/dispatch.yml")

    // outputs.parameters に相当する中間 JSON を読み込み・検証
    jobIDs := loadDispatchJobIDs(t, tmpDir, "dispatch_output.json")
    verifyDispatchOutput(t, tmpDir, "dispatch_output.json", wantJobIDs)

    // Argo の withParam に相当: fan-out
    for _, id := range jobIDs {
        runRunbook(t, "runbooks/_include/worker.yml",
            runn.Var("job_id", id))
    }

    // 各ジョブの最終ステータスを検証
    for _, id := range jobIDs {
        verifyJobStatus(t, testEnv, id, "success")
    }
}

func runRunbook(t *testing.T, path string, opts ...runn.Option) {
    t.Helper()
    o, err := runn.Load(path, append([]runn.Option{runn.T(t)}, opts...)...)
    if err != nil {
        t.Fatalf("load runbook %s: %v", path, err)
    }
    if err := o.RunN(context.Background()); err != nil {
        t.Fatalf("run runbook %s: %v", path, err)
    }
}

実行方法

普通に go test で実行できます。

go test -v -timeout 5m -count=1 ./e2e/...

まとめ

Go 側のグルーコードは一定書く必要がありますが、個々のステップの実行定義は runnのYAMLランブックで宣言的に管理できるので、Argo の WorkflowTemplate と対応関係が分かりやすく保てるのが良いです。