
An Introduction to and Comparison of Argo Workflows (Part 2)


[toc]

Continuing from the previous article, this post walks through more examples of using Argo Workflows.

Output Parameters

Output parameters provide a general mechanism for using the result of a step as a parameter rather than as an artifact. This lets you use the result of any type of step, not just a script, for conditional tests, loops, and arguments. Output parameters work similarly to script results, except that the value of an output parameter is set to the contents of a generated file rather than the contents of stdout.

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: output-parameter-
spec:  
  entrypoint: output-parameter
  templates:
  - name: output-parameter
    steps:
    - - name: generate-parameter
        template: whalesay
    - - name: consume-parameter
        template: print-message
        arguments:
          parameters:
          # Pass the hello-param output from the generate-parameter step as the message input to print-message
          - name: message
            value: "{{steps.generate-parameter.outputs.parameters.hello-param}}"

  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["echo -n hello world > /tmp/hello_world.txt"]  # generate the content of hello_world.txt
    outputs:
      parameters:
      - name: hello-param        # name of output parameter
        valueFrom:
          path: /tmp/hello_world.txt    # set the value of hello-param to the contents of hello_world.txt

  - name: print-message
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]

DAG templates use the tasks prefix to refer to another task, for example {{tasks.generate-parameter.outputs.parameters.hello-param}}.
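As a sketch (not from the original post), the same parameter passing written as a DAG template would look like this, with the tasks prefix replacing steps:

```yaml
  - name: output-parameter-dag
    dag:
      tasks:
      - name: generate-parameter
        template: whalesay
      - name: consume-parameter
        dependencies: [generate-parameter]   # run after generate-parameter
        template: print-message
        arguments:
          parameters:
          - name: message
            value: "{{tasks.generate-parameter.outputs.parameters.hello-param}}"
```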

Loops vs Workflow Spec

When writing workflows, it is often very useful to be able to iterate over a set of inputs, as shown in this example:

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: loops-
spec:  
  entrypoint: loop-example
  templates:
  - name: loop-example
    steps:
    - - name: print-message
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems:              # invoke whalesay once for each item in parallel
        - hello world           # item 1
        - goodbye world         # item 2

  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]
The equivalent definition in the Serverless Workflow specification:
id: loops-  
name: Loop over data example  
version: '1.0'  
functions:  
- name: whalesay
  resource: docker/whalesay:latest
  type: container
  metadata:
    command: cowsay
states:  
- name: injectdata
  type: INJECT
  start:
    kind: DEFAULT
  data:
    greetings:
    - hello world
    - goodbye world
  transition:
    nextState: printgreetings
- name: printgreetings
  type: FOREACH
  inputCollection: "$.greetings"
  inputParameter: "$.greeting"
  states:
  - name: foreach-print
    type: OPERATION
    start:
      kind: DEFAULT
    actions:
    - name: print-message
      functionRef:
        refName: whalesay
        parameters:
          message: "$.greeting"
    end:
      kind: DEFAULT
  end:
    kind: DEFAULT

We can also iterate over sets of items:

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: loops-maps-
spec:  
  entrypoint: loop-map-example
  templates:
  - name: loop-map-example
    steps:
    - - name: test-linux
        template: cat-os-release
        arguments:
          parameters:
          - name: image
            value: "{{item.image}}"
          - name: tag
            value: "{{item.tag}}"
        withItems:
        - { image: 'debian', tag: '9.1' }       #item set 1
        - { image: 'debian', tag: '8.9' }       #item set 2
        - { image: 'alpine', tag: '3.6' }       #item set 3
        - { image: 'ubuntu', tag: '17.10' }     #item set 4

  - name: cat-os-release
    inputs:
      parameters:
      - name: image
      - name: tag
    container:
      image: "{{inputs.parameters.image}}:{{inputs.parameters.tag}}"
      command: [cat]
      args: [/etc/os-release]

A list of items can also be passed in as a parameter:

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: loops-param-arg-
spec:  
  entrypoint: loop-param-arg-example
  arguments:
    parameters:
    - name: os-list                                     # a list of items
      value: |
        [
          { "image": "debian", "tag": "9.1" },
          { "image": "debian", "tag": "8.9" },
          { "image": "alpine", "tag": "3.6" },
          { "image": "ubuntu", "tag": "17.10" }
        ]

  templates:
  - name: loop-param-arg-example
    inputs:
      parameters:
      - name: os-list
    steps:
    - - name: test-linux
        template: cat-os-release
        arguments:
          parameters:
          - name: image
            value: "{{item.image}}"
          - name: tag
            value: "{{item.tag}}"
        withParam: "{{inputs.parameters.os-list}}"      # parameter specifies the list to iterate over

  # This template is the same as in the previous example
  - name: cat-os-release
    inputs:
      parameters:
      - name: image
      - name: tag
    container:
      image: "{{inputs.parameters.image}}:{{inputs.parameters.tag}}"
      command: [cat]
      args: [/etc/os-release]
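Conceptually, the controller parses the os-list JSON parameter and substitutes {{item.image}} and {{item.tag}} for each element. A sketch of that expansion in Python:

```python
import json

# The os-list parameter is plain JSON; each element supplies the values
# substituted into "{{inputs.parameters.image}}:{{inputs.parameters.tag}}".
os_list = '''
[
  { "image": "debian", "tag": "9.1" },
  { "image": "debian", "tag": "8.9" },
  { "image": "alpine", "tag": "3.6" },
  { "image": "ubuntu", "tag": "17.10" }
]
'''
images = ["{image}:{tag}".format(**item) for item in json.loads(os_list)]
print(images)  # ['debian:9.1', 'debian:8.9', 'alpine:3.6', 'ubuntu:17.10']
```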

The list of items to iterate over can even be generated dynamically!

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: loops-param-result-
spec:  
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    # Iterate over the list of numbers generated by the generate step above
    - - name: sleep
        template: sleep-n-sec
        arguments:
          parameters:
          - name: seconds
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"

  # Generate a list of numbers in JSON format
  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

  - name: sleep-n-sec
    inputs:
      parameters:
      - name: seconds
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo sleeping for {{inputs.parameters.seconds}} seconds; sleep {{inputs.parameters.seconds}}; echo done"]
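A quick local check of what the gen-number-list script writes to stdout, i.e. the JSON array that withParam fans out into parallel steps:

```python
import io
import json

# Same logic as the gen-number-list script template: serialize a list of
# numbers as JSON; the script's stdout becomes {{steps.generate.outputs.result}}.
buf = io.StringIO()
json.dump([i for i in range(20, 31)], buf)
print(buf.getvalue())  # [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
```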

Conditionals vs Workflow Spec

Conditional execution is also supported, as shown in this example:

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: coinflip-
spec:  
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                 # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails
        template: tails                 # call tails template if "tails"
        when: "{{steps.flip-coin.outputs.result}} == tails"

  # Return heads or tails based on a random number
  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)

  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]

  - name: tails
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was tails\""]
The equivalent definition in the Serverless Workflow specification:
id: coinflip-  
name: Conditionals Example  
version: '1.0'  
functions:  
- name: flip-coin-function
  resource: python:alpine3.6
  type: script
  metadata:
    command: python
    source: |
      import random
      result = "heads" if random.randint(0,1) == 0 else "tails"
      print(result)
- name: echo
  resource: alpine:3.6
  type: container
  metadata:
    command: sh, -c
states:  
- name: flip-coin
  type: OPERATION
  start:
    kind: DEFAULT
  actions:
  - functionRef:
      refName: flip-coin-function
    actionDataFilter:
      dataResultsPath: "$.flip.result"
  transition:
    nextState: show-flip-results
- name: show-flip-results
  type: SWITCH
  conditions:
  - path: "$.flip.result"
    value: heads
    operator: Equals
    transition:
      nextState: show-results-heads
  - path: "$.flip.result"
    value: tails
    operator: Equals
    transition:
      nextState: show-results-tails
- name: show-results-heads
  type: OPERATION
  actions:
  - functionRef:
      refName: echo
    actionDataFilter:
      dataResultsPath: it was heads
  end:
    kind: DEFAULT
- name: show-results-tails
  type: OPERATION
  actions:
  - functionRef:
      refName: echo
    actionDataFilter:
      dataResultsPath: it was tails
  end:
    kind: DEFAULT
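The flip-coin script template simply prints one of two strings to stdout; that printed value becomes {{steps.flip-coin.outputs.result}}, which the when clauses compare against. The same logic, runnable locally:

```python
import random

# Mirror of the flip-coin script template: the printed value is what the
# `when` expressions test for equality with "heads" or "tails".
result = "heads" if random.randint(0, 1) == 0 else "tails"
print(result)
```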

Retrying Failed or Errored Steps vs Workflow Spec

You can specify a retryStrategy that dictates how failed or errored steps are retried:

# This example demonstrates the use of retry back offs
apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: retry-backoff-
spec:  
  entrypoint: retry-backoff
  templates:
  - name: retry-backoff
    retryStrategy:
      limit: 10
      retryPolicy: "Always"
      backoff:
        duration: "1"      # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
        factor: 2
        maxDuration: "1m"  # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
    container:
      image: python:alpine3.6
      command: ["python", "-c"]
      # fail with a 66% probability
      args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
The equivalent definition in the Serverless Workflow specification:
id: retry-backoff-  
name: Retry Example  
version: '1.0'  
functions:  
- name: fail-function
  resource: python:alpine3.6
  type: container
  metadata:
    command: python
states:  
- name: retry-backoff
  type: OPERATION
  start:
    kind: DEFAULT
  actions:
  - functionRef:
      refName: fail-function
      parameters:
        args:
        - import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)
  retry:
  - expression:
      language: spel
      body: "$.exit_code == 1"
    maxAttempts: 10
    multiplier: PT2M
    interval: PT1M
  end:
    kind: DEFAULT

Providing an empty retryStrategy (i.e. retryStrategy: {}) will cause a container to retry until completion.
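A rough sketch of the delays produced by the backoff settings in the example above (duration: "1", factor: 2, maxDuration: "1m"). This is a simplified model for intuition, not the controller's exact implementation:

```python
# Simplified model of exponential retry backoff: the delay is multiplied by
# `factor` after each attempt, and retrying stops once the accumulated
# backoff would exceed maxDuration.
duration_s, factor, max_duration_s = 1, 2, 60

delays, elapsed = [], 0
delay = duration_s
while elapsed + delay <= max_duration_s:
    delays.append(delay)
    elapsed += delay
    delay *= factor
print(delays)  # [1, 2, 4, 8, 16]
```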

Recursion vs Workflow Spec

Templates can recursively invoke each other! In this variation of the coin-flip template above, we keep flipping coins until heads comes up.

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: coinflip-recursive-
spec:  
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                 # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails                     # keep flipping coins if "tails"
        template: coinflip
        when: "{{steps.flip-coin.outputs.result}} == tails"

  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)

  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]
The equivalent definition in the Serverless Workflow specification:
id: coinflip-recursive-  
name: Recursion Example  
version: '1.0'  
functions:  
- name: heads-function
  resource: alpine:3.6
  type: container
  metadata:
    command: echo "it was heads"
- name: flip-coin-function
  resource: python:alpine3.6
  type: script
  metadata:
    command: python
    source: |
      import random
      result = "heads" if random.randint(0,1) == 0 else "tails"
      print(result)
states:  
- name: flip-coin-state
  type: OPERATION
  start:
    kind: DEFAULT
  actions:
  - functionRef:
      refName: flip-coin-function
    actionDataFilter:
      dataResultsPath: "$.steps.flip-coin.outputs.result"
  transition:
    nextState: flip-coin-check
- name: flip-coin-check
  type: SWITCH
  conditions:
  - path: "$.steps.flip-coin.outputs.result"
    value: tails
    operator: Equals
    transition:
      nextState: flip-coin-state
  - path: "$.steps.flip-coin.outputs.result"
    value: heads
    operator: Equals
    transition:
      nextState: heads-state
- name: heads-state
  type: OPERATION
  actions:
  - functionRef:
      refName: heads-function
      parameters:
        args: echo "it was heads"
  end:
    kind: DEFAULT

Below is the output of two runs. The first run:

argo get coinflip-recursive-vhph5

STEP                         PODNAME                              MESSAGE  
 ✔ coinflip-recursive-vhph5
 ├---✔ flip-coin             coinflip-recursive-vhph5-2123890397
 └-·-✔ heads                 coinflip-recursive-vhph5-128690560
   └-○ tails

The second run:

argo get coinflip-recursive-tzcb5
STEP                          PODNAME                              MESSAGE  
 ✔ coinflip-recursive-tzcb5
 ├---✔ flip-coin              coinflip-recursive-tzcb5-322836820
 └-·-○ heads
   └-✔ tails
     ├---✔ flip-coin          coinflip-recursive-tzcb5-1863890320
     └-·-○ heads
       └-✔ tails
         ├---✔ flip-coin      coinflip-recursive-tzcb5-1768147140
         └-·-○ heads
           └-✔ tails
             ├---✔ flip-coin  coinflip-recursive-tzcb5-4080411136
             └-·-✔ heads      coinflip-recursive-tzcb5-4080323273
               └-○ tails

In the first run, the coin immediately came up heads and the workflow stopped. In the second run, the coin came up tails three times before finally coming up heads, at which point the workflow stopped.

Note: a check mark (✔) marks a step that completed, while a circle (○) marks a branch that was not executed.

Exit Handlers vs Workflow Spec

An exit handler is a template that always executes at the end of a workflow, regardless of success or failure.

Some common use cases of exit handlers are illustrated in the example below:

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: exit-handlers-
spec:  
  entrypoint: intentional-fail
  onExit: exit-handler                  # invoke exit-hander template at end of the workflow
  templates:
  # primary workflow template
  - name: intentional-fail
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo intentional failure; exit 1"]

  # Exit handler templates
  # After the completion of the entrypoint template, the status of the
  # workflow is made available in the global variable {{workflow.status}}.
  # {{workflow.status}} will be one of: Succeeded, Failed, Error
  - name: exit-handler
    steps:
    - - name: notify
        template: send-email
      - name: celebrate
        template: celebrate
        when: "{{workflow.status}} == Succeeded"
      - name: cry
        template: cry
        when: "{{workflow.status}} != Succeeded"
  - name: send-email
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo send e-mail: {{workflow.name}} {{workflow.status}}"]
  - name: celebrate
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo hooray!"]
  - name: cry
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo boohoo!"]

Note: with the Serverless Workflow specification, Argo's onExit feature can be handled in several ways. One is the onError construct, which catches errors and transitions to the error-handling part of the workflow. Another is to emit an event containing the workflow status at the end of execution; that event can then trigger the execution of other workflows that handle each status. This example uses the onError definition.

The equivalent definition in the Serverless Workflow specification:
id: exit-handlers-  
name: Exit/Error Handling Example  
version: '1.0'  
functions:  
- name: intentional-fail-function
  resource: alpine:latest
  type: container
  metadata:
    command: "[sh, -c]"
- name: send-email-function
  resource: alpine:latest
  type: script
  metadata:
    command: "[sh, -c]"
- name: celebrate-cry-function
  resource: alpine:latest
  type: script
  metadata:
    command: "[sh, -c]"
states:  
- name: intentional-fail-state
  type: OPERATION
  start:
    kind: DEFAULT
  actions:
  - functionRef:
      refName: intentional-fail-function
      parameters:
        args: echo intentional failure; exit 1
  onError:
  - expression:
      language: spel
      body: "$.error != null"
    errorDataFilter:
      dataOutputPath: "$.exit-code"
  transition:
    nextState: send-email-state
- name: send-email-state
  type: OPERATION
  actions:
  - functionRef:
      refName: send-email-function
      parameters:
        args: 'echo send e-mail: $.workflow.name $.exit-code'
  transition:
    nextState: emo-state
- name: emo-state
  type: SWITCH
  conditions:
  - path: "$.exit-code"
    value: '1'
    operator: Equals
    transition:
      nextState: celebrate-state
  - path: "$.exit-code"
    value: '1'
    operator: NotEquals
    transition:
      nextState: cry-state
- name: celebrate-state
  type: OPERATION
  actions:
  - functionRef:
      refName: celebrate-cry-function
      parameters:
        args: echo hooray!
  end:
    kind: DEFAULT
- name: cry-state
  type: OPERATION
  actions:
  - functionRef:
      refName: celebrate-cry-function
      parameters:
        args: echo boohoo!
  end:
    kind: DEFAULT

Timeouts

To limit the elapsed time of a workflow, you can set the activeDeadlineSeconds field:

# To enforce a timeout for a container template, specify a value for activeDeadlineSeconds.
apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: timeouts-
spec:  
  entrypoint: sleep
  templates:
  - name: sleep
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo sleeping for 1m; sleep 60; echo done"]
    activeDeadlineSeconds: 10           # terminate container template after 10 seconds
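activeDeadlineSeconds can also be set on the workflow spec itself to bound the entire workflow rather than a single template. A sketch based on the same example:

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: timeouts-workflow-
spec:
  activeDeadlineSeconds: 10    # terminate the entire workflow after 10 seconds
  entrypoint: sleep
  templates:
  - name: sleep
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo sleeping for 1m; sleep 60; echo done"]
```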

Volumes

The following example dynamically creates a volume and then uses that volume in the workflow.

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: volumes-pvc-
spec:  
  entrypoint: volumes-pvc-example
  volumeClaimTemplates:                 # define volume, same syntax as k8s Pod spec
  - metadata:
      name: workdir                     # name of volume claim
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 1Gi                  # Gi => 1024 * 1024 * 1024

  templates:
  - name: volumes-pvc-example
    steps:
    - - name: generate
        template: whalesay
    - - name: print
        template: print-message

  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
      # Mount workdir volume at /mnt/vol before invoking docker/whalesay
      volumeMounts:                     # same syntax as k8s Pod spec
      - name: workdir
        mountPath: /mnt/vol

  - name: print-message
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
      # Mount workdir volume at /mnt/vol before invoking docker/whalesay
      volumeMounts:                     # same syntax as k8s Pod spec
      - name: workdir
        mountPath: /mnt/vol

Volumes are a very useful way to move large amounts of data from one step of a workflow to another. Depending on the system, some volumes may be accessible concurrently from multiple steps.

In some cases, you want to access an already existing volume rather than creating and destroying one dynamically.

# Define Kubernetes PVC
kind: PersistentVolumeClaim  
apiVersion: v1  
metadata:  
  name: my-existing-volume
spec:  
  accessModes: [ "ReadWriteOnce" ]
  resources:
    requests:
      storage: 1Gi

---
apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: volumes-existing-
spec:  
  entrypoint: volumes-existing-example
  volumes:
  # Pass my-existing-volume as an argument to the volumes-existing-example template
  # Same syntax as k8s Pod spec
  - name: workdir
    persistentVolumeClaim:
      claimName: my-existing-volume

  templates:
  - name: volumes-existing-example
    steps:
    - - name: generate
        template: whalesay
    - - name: print
        template: print-message

  - name: whalesay
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol

  - name: print-message
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol

It is also possible to declare existing volumes at the template level rather than the workflow level. Generating volumes with a resource step can be useful within a workflow.

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: template-level-volume-
spec:  
  entrypoint: generate-and-use-volume
  templates:
  - name: generate-and-use-volume
    steps:
    - - name: generate-volume
        template: generate-volume
        arguments:
          parameters:
            - name: pvc-size
              # In a real-world example, this could be generated by a previous workflow step.
              value: '1Gi'
    - - name: generate
        template: whalesay
        arguments:
          parameters:
            - name: pvc-name
              value: '{{ steps.generate-volume.outputs.parameters.pvc-name }}'
    - - name: print
        template: print-message
        arguments:
          parameters:
            - name: pvc-name
              value: '{{ steps.generate-volume.outputs.parameters.pvc-name }}'

  - name: generate-volume
    inputs:
      parameters:
        - name: pvc-size
    resource:
      action: create
      setOwnerReference: true
      manifest: |
        apiVersion: v1
        kind: PersistentVolumeClaim
        metadata:
          generateName: pvc-example-
        spec:
          accessModes: ['ReadWriteOnce', 'ReadOnlyMany']
          resources:
            requests:
              storage: '{{inputs.parameters.pvc-size}}'
    outputs:
      parameters:
        - name: pvc-name
          valueFrom:
            jsonPath: '{.metadata.name}'

  - name: whalesay
    inputs:
      parameters:
        - name: pvc-name
    volumes:
      - name: workdir
        persistentVolumeClaim:
          claimName: '{{inputs.parameters.pvc-name}}'
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol

  - name: print-message
    inputs:
      parameters:
        - name: pvc-name
    volumes:
      - name: workdir
        persistentVolumeClaim:
          claimName: '{{inputs.parameters.pvc-name}}'
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol

Suspending

A workflow can be suspended with:

argo suspend WORKFLOW  

or you can suspend only a single step of a workflow:

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: suspend-template-
spec:  
  entrypoint: suspend
  templates:
  - name: suspend
    steps:
    - - name: build
        template: whalesay
    - - name: approve
        template: approve
    - - name: delay
        template: delay
    - - name: release
        template: whalesay

  - name: approve
    suspend: {}

  - name: delay
    suspend:
      duration: 20    # Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"

  - name: whalesay
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["hello world"]

Once suspended, a workflow will not schedule any new steps until it is resumed. It can be resumed manually with:

argo resume WORKFLOW  

or automatically after a set duration, as the delay template in the example above shows.

Daemon Containers

Argo workflows can start containers that run in the background (known as daemon containers) while the workflow itself continues execution. Note that daemons are automatically destroyed when the workflow exits the template scope in which the daemon was invoked. Daemon containers are useful for starting services to be tested or used in tests (e.g. fixtures). We have also found them very useful when running large simulations, to spin up a database as a daemon for collecting and organizing results. The big advantage of daemons compared with sidecars is that their existence can persist across multiple steps or even the entire workflow.

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: daemon-step-
spec:  
  entrypoint: daemon-example
  templates:
  - name: daemon-example
    steps:
    - - name: influx
        template: influxdb              # start an influxdb as a daemon (see the influxdb template spec below)

    - - name: init-database             # initialize influxdb
        template: influxdb-client
        arguments:
          parameters:
          - name: cmd
            value: curl -XPOST 'http://{{steps.influx.ip}}:8086/query' --data-urlencode "q=CREATE DATABASE mydb"

    - - name: producer-1                # add entries to influxdb
        template: influxdb-client
        arguments:
          parameters:
          - name: cmd
            value: for i in $(seq 1 20); do curl -XPOST 'http://{{steps.influx.ip}}:8086/write?db=mydb' -d "cpu,host=server01,region=uswest load=$i" ; sleep .5 ; done
      - name: producer-2                # add entries to influxdb
        template: influxdb-client
        arguments:
          parameters:
          - name: cmd
            value: for i in $(seq 1 20); do curl -XPOST 'http://{{steps.influx.ip}}:8086/write?db=mydb' -d "cpu,host=server02,region=uswest load=$((RANDOM % 100))" ; sleep .5 ; done
      - name: producer-3                # add entries to influxdb
        template: influxdb-client
        arguments:
          parameters:
          - name: cmd
            value: curl -XPOST 'http://{{steps.influx.ip}}:8086/write?db=mydb' -d 'cpu,host=server03,region=useast load=15.4'

    - - name: consumer                  # consume entries from influxdb
        template: influxdb-client
        arguments:
          parameters:
          - name: cmd
            value: curl --silent -G http://{{steps.influx.ip}}:8086/query?pretty=true --data-urlencode "db=mydb" --data-urlencode "q=SELECT * FROM cpu"

  - name: influxdb
    daemon: true                        # start influxdb as a daemon
    retryStrategy:
      limit: 10                         # retry container if it fails
    container:
      image: influxdb:1.2
      readinessProbe:                   # wait for readinessProbe to succeed
        httpGet:
          path: /ping
          port: 8086

  - name: influxdb-client
    inputs:
      parameters:
      - name: cmd
    container:
      image: appropriate/curl:latest
      command: ["/bin/sh", "-c"]
      args: ["{{inputs.parameters.cmd}}"]
      resources:
        requests:
          memory: 32Mi
          cpu: 100m

DAG templates use the tasks prefix to refer to another task, for example {{tasks.influx.ip}}.
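As a sketch (not from the original post), the first two steps of the daemon example rewritten as a DAG template, showing the tasks prefix for the daemon's IP:

```yaml
  - name: daemon-dag-example
    dag:
      tasks:
      - name: influx
        template: influxdb               # started as a daemon
      - name: init-database
        dependencies: [influx]           # waits for influxdb's readinessProbe
        template: influxdb-client
        arguments:
          parameters:
          - name: cmd
            value: curl -XPOST 'http://{{tasks.influx.ip}}:8086/query' --data-urlencode "q=CREATE DATABASE mydb"
```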

Sidecars

A sidecar is a second container that executes concurrently with the main container in the same pod, and is useful when creating multi-container pods.

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: sidecar-nginx-
spec:  
  entrypoint: sidecar-nginx-example
  templates:
  - name: sidecar-nginx-example
    container:
      image: appropriate/curl
      command: [sh, -c]
      # Try to read from nginx web server until it comes up
      args: ["until `curl -G 'http://127.0.0.1/' >& /tmp/out`; do echo sleep && sleep 1; done && cat /tmp/out"]
    # Create a simple nginx web server
    sidecars:
    - name: nginx
      image: nginx:1.13

In the above example, we create a sidecar container that runs nginx as a simple web server. The order in which containers come up is random, so in this example the main container polls the nginx container until it is ready to service requests. This is a good design pattern for multi-container systems: always wait for any services you need before running your main code.

Hardwired Artifacts

With Argo, you can use any container image you like to generate any kind of artifact. In practice, however, we find certain types of artifacts so common that built-in support is provided for git, HTTP, GCS, and S3 artifacts.

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: hardwired-artifact-
spec:  
  entrypoint: hardwired-artifact
  templates:
  - name: hardwired-artifact
    inputs:
      artifacts:
      # Check out the master branch of the argo repo and place it at /src
      # revision can be anything that git checkout accepts: branch, commit, tag, etc.
      - name: argo-source
        path: /src
        git:
          repo: https://github.com/argoproj/argo.git
          revision: "master"
      # Download kubectl 1.8.0 and place it at /bin/kubectl
      - name: kubectl
        path: /bin/kubectl
        mode: 0755
        http:
          url: https://storage.googleapis.com/kubernetes-release/release/v1.8.0/bin/linux/amd64/kubectl
      # Copy an s3 compatible artifact repository bucket (such as AWS, GCS and Minio) and place it at /s3
      - name: objects
        path: /s3
        s3:
          endpoint: storage.googleapis.com
          bucket: my-bucket-name
          key: path/in/bucket
          accessKeySecret:
            name: my-s3-credentials
            key: accessKey
          secretKeySecret:
            name: my-s3-credentials
            key: secretKey
    container:
      image: debian
      command: [sh, -c]
      args: ["ls -l /src /bin/kubectl /s3"]

Kubernetes Resources/Kubernetes资源

在许多情况下,您希望管理 Argo 工作流中的 Kubernetes 资源。 资源模板允许您创建、删除或更新任何类型的 Kubernetes 资源。

# The resource template type accepts any k8s manifest
# (including CRDs) and can perform any kubectl action against it (e.g. create,
# apply, delete, patch).
apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: k8s-jobs-
spec:  
  entrypoint: pi-tmpl
  templates:
  - name: pi-tmpl
    resource:                   # indicates that this is a resource template
      action: create            # can be any kubectl action (e.g. create, delete, apply, patch)
      # The successCondition and failureCondition are optional expressions.
      # If failureCondition is true, the step is considered failed.
      # If successCondition is true, the step is considered successful.
      # They use kubernetes label selection syntax and can be applied against any field
      # of the resource (not just labels). Multiple AND conditions can be represented by comma
      # delimited expressions.
      # For more details: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
      successCondition: status.succeeded > 0
      failureCondition: status.failed > 3
      manifest: |               #put your kubernetes spec here
        apiVersion: batch/v1
        kind: Job
        metadata:
          generateName: pi-job-
        spec:
          template:
            metadata:
              name: pi
            spec:
              containers:
              - name: pi
                image: perl
                command: ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]
              restartPolicy: Never
          backoffLimit: 4

以这种方式创建的资源独立于工作流而存在。如果您希望在删除工作流时一并删除这些资源，可以利用 Kubernetes 的垃圾收集机制，把工作流自身设置为资源的 owner reference。
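下面是一个示意片段，展示如何在资源的 manifest 中通过 ownerReferences 让资源随工作流一起被垃圾回收（写法参考 Argo 官方示例，ConfigMap 及其内容仅为演示，{{workflow.name}} 和 {{workflow.uid}} 是 Argo 提供的工作流变量）：

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  generateName: owned-eg-
  # 将工作流自身声明为该资源的 owner，
  # 工作流被删除时，该 ConfigMap 也会被垃圾回收
  ownerReferences:
  - apiVersion: argoproj.io/v1alpha1
    blockOwnerDeletion: true
    kind: Workflow
    name: "{{workflow.name}}"
    uid: "{{workflow.uid}}"
data:
  some: value
```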

注意：当使用 patch 操作时，资源模板还接受另一个属性 mergeStrategy，其值可以是 strategic、merge 或 json 之一。如果没有提供此属性，默认为 strategic。请记住，自定义资源不能使用 strategic 补丁，因此必须选择其他策略。例如，假设已定义了名为 CronTab 的 CustomResourceDefinition，以及下面的 CronTab 实例：

apiVersion: "stable.example.com/v1"  
kind: CronTab  
spec:  
  cronSpec: "* * * * */5"
  image: my-awesome-cron-image

这个 CronTab 可以通过下面的 Argo Workflow 进行修补：

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: k8s-patch-
spec:  
  entrypoint: cront-tmpl
  templates:
  - name: cront-tmpl
    resource:
      action: patch
      mergeStrategy: merge                 # Must be one of [strategic merge json]
      manifest: |
        apiVersion: "stable.example.com/v1"
        kind: CronTab
        spec:
          cronSpec: "* * * * */10"
          image: my-awesome-cron-image
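
按照 merge（即 JSON Merge Patch）的语义，补丁中给出的字段会覆盖资源中的同名字段，未给出的字段保持不变。因此上面的补丁应用到前述 CronTab 实例后，结果大致如下：

```yaml
apiVersion: "stable.example.com/v1"
kind: CronTab
spec:
  cronSpec: "* * * * */10"     # 被补丁从 */5 覆盖为 */10
  image: my-awesome-cron-image  # 补丁中的值与原值相同，保持不变
```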

Docker-in-Docker Using Sidecars/使用边车的 Docker-in-Docker

Sidecar 的一个应用是实现 Docker-in-Docker（DinD）。当您希望在容器内部运行 Docker 命令时，DinD 非常有用。例如，您可能希望在构建容器中构建并推送容器镜像。在下面的示例中，我们使用 docker:dind 镜像在边车中运行 Docker 守护进程，并让主容器可以访问该守护进程。

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: sidecar-dind-
spec:  
  entrypoint: dind-sidecar-example
  templates:
  - name: dind-sidecar-example
    container:
      image: docker:17.10
      command: [sh, -c]
      args: ["until docker ps; do sleep 3; done; docker run --rm debian:latest cat /etc/os-release"]
      env:
      - name: DOCKER_HOST               # the docker daemon can be accessed on the standard port on localhost
        value: 127.0.0.1
    sidecars:
    - name: dind
      image: docker:17.10-dind          # Docker already provides an image for running a Docker daemon
      securityContext:
        privileged: true                # the Docker daemon can only run in a privileged container
      # mirrorVolumeMounts will mount the same volumes specified in the main container
      # to the sidecar (including artifacts), at the same mountPaths. This enables
      # dind daemon to (partially) see the same filesystem as the main container in
      # order to use features such as docker volume binding.
      mirrorVolumeMounts: true

Custom Template Variable Reference/自定义模板变量引用

在这个示例中，我们可以看到如何在 Argo 工作流模板中混用其他模板语言（例如 Jinja）的变量引用。Argo 只会校验并解析以其允许的前缀 { "item", "steps", "inputs", "outputs", "workflow", "tasks" } 开头的变量，其余变量（如下例中的 {{user.username}}）会原样保留，交由其他模板引擎处理。

apiVersion: argoproj.io/v1alpha1  
kind: Workflow  
metadata:  
  generateName: custom-template-variable-
spec:  
  entrypoint: hello-hello-hello

  templates:
    - name: hello-hello-hello
      steps:
        - - name: hello1
            template: whalesay
            arguments:
              parameters: [{name: message, value: "hello1"}]
        - - name: hello2a
            template: whalesay
            arguments:
              parameters: [{name: message, value: "hello2a"}]
          - name: hello2b
            template: whalesay
            arguments:
              parameters: [{name: message, value: "hello2b"}]

    - name: whalesay
      inputs:
        parameters:
          - name: message
      container:
        image: docker/whalesay
        command: [cowsay]
        args: ["{{user.username}}"]

Continuous Integration Example/持续集成示例

持续集成是工作流的一个流行应用场景。目前 Argo 尚未提供自动触发 CI 作业的事件触发器，但我们计划在不久的将来支持。在此之前，您可以轻松地编写一个 cron 作业来检查新的提交并启动所需的工作流，或者使用现有的 Jenkins 服务器来触发工作流。
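上面提到的 cron 触发思路，可以用一个 Kubernetes CronJob 定期提交工作流来实现。下面是一个示意草图：其中的镜像名、工作流文件路径均为假设，且省略了"检查是否有新提交"的逻辑，实际使用时需按环境调整：

```yaml
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: ci-trigger
spec:
  schedule: "*/5 * * * *"              # 每 5 分钟运行一次
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: submit
            # 假设：一个包含 argo CLI 的镜像
            image: argoproj/argocli:latest
            command: [sh, -c]
            # 假设的工作流文件路径；实际还应先检查是否有新提交再提交
            args: ["argo submit /workflows/ci.yaml"]
          restartPolicy: OnFailure
```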

一个很好的持续集成工作流规范示例位于 https://github.com/argoproj/argo/tree/master/examples/influxdb-CI.yaml 。由于它只用到了前面已经讨论过的概念，而且篇幅较长，这里不再展开细节。
