Argo Workflows で組んだバッチパイプラインのテスト、どうしてますか。
自分はrunnで再現してます。
runnはCLI ツールとしても使えるけど、Go ライブラリとしてテストに組み込めるので、runn.Load() でランブックを読み込んで go test の中からPod内で動かしているコマンドを実行できます。
Argo Workflowsのテストの何が難しいか
たとえばArgo のパイプラインは CronWorkflow → WorkflowTemplate → Pod という多段構成になります。
apiVersion: argoproj.io/v1alpha1 kind: CronWorkflow spec: schedule: "*/30 * * * *" workflowSpec: templates: - name: dispatch-and-worker steps: - - name: dispatch templateRef: name: dispatch-jobs template: dispatch-jobs - - name: process templateRef: name: job-worker template: job-worker arguments: parameters: - name: job-id value: "{{item.job_id}}" withParam: "{{steps.dispatch.outputs.parameters.jobs}}"
dispatcher が JSON を書き出して outputs.parameters で次ステップに渡します。worker は withParam で fan-out されるといった感じです。
これをローカルで再現しようとすると
{{steps.dispatch.outputs.parameters.jobs}}は Argoランタイムが解釈するので手元で動かない- Pod 間の依存関係の制御もArgo任せ
ただone shotなタスクを正しい順序・パラメータで実行できれば、Argo なしでパイプライン自体の動作は再現できます。
runn でどう再現するか
ArgoのWorkflowTemplateとrunnのランブックを対応させ、中間ファイルなどを読むためのグルーコードを準備します。
Argo 側:
container: command: ["/app"] args: ["dispatcher", "--job-type={{inputs.parameters.job-type}}"] outputs: parameters: - name: jobs valueFrom: path: /tmp/jobs.json
runn 側:
desc: Dispatch jobs steps: dispatch: exec: command: | ${E2E_BINARY} dispatcher \ --job-type=some_job_type \ --output-file=${E2E_TMPDIR}/dispatch_output.json test: current.exit_code == 0
fan-out される worker 側:
desc: Process a single job vars: job_id: "" steps: process: exec: command: | ${E2E_BINARY} worker \ --job-id={{ vars.job_id }} test: current.exit_code == 0
{{ vars.job_id }} が runn のテンプレート変数です。Go 側から runn.Var("job_id", id) で注入します。
Go 側のコード
func TestPipelineFull(t *testing.T) { // Phase 1: dispatcher → JSON 出力 runRunbook(t, "runbooks/_include/dispatch.yml") // outputs.parameters に相当する中間 JSON を読み込み・検証 jobIDs := loadDispatchJobIDs(t, tmpDir, "dispatch_output.json") verifyDispatchOutput(t, tmpDir, "dispatch_output.json", wantJobIDs) // Argo の withParam に相当: fan-out for _, id := range jobIDs { runRunbook(t, "runbooks/_include/worker.yml", runn.Var("job_id", id)) } // 各ジョブの最終ステータスを検証 for _, id := range jobIDs { verifyJobStatus(t, testEnv, id, "success") } } func runRunbook(t *testing.T, path string, opts ...runn.Option) { t.Helper() o, err := runn.Load(path, append([]runn.Option{runn.T(t)}, opts...)...) if err != nil { t.Fatalf("load runbook %s: %v", path, err) } if err := o.RunN(context.Background()); err != nil { t.Fatalf("run runbook %s: %v", path, err) } }
実行方法
普通に go test で実行できます。
go test -v -timeout 5m -count=1 ./e2e/...
まとめ
Go 側のグルーコードは一定書く必要がありますが、個々のステップの実行定義は runnのYAMLランブックで宣言的に管理できるので、Argo の WorkflowTemplate と対応関係が分かりやすく保てるのが良いです。