Mistral 工作流语言 (v2)

介绍

本文档完整描述了 Mistral 工作流服务版本 2 的 Mistral 工作流语言。自 2014 年 5 月发布版本 1 以来,Mistral 团队完全重做了该语言,旨在使其更易于理解,同时更一致和灵活。

与 Mistral 工作流语言 v1 不同,v2 假定 Mistral 处理的所有实体(如工作流和操作)在通过 API(以及 Python Client API 和 CLI)引用和访问时完全独立。工作簿(Workbook)是可以在工作流和操作中组合实体的实体,仍然存在于该语言中,但仅用于命名空间和方便起见。有关更多详细信息,请参阅 工作簿部分

注意:Mistral 工作流语言和版本 1 的 API 自 2015 年 4 月起已不再支持,版本 2 现在是与 Mistral 服务交互的唯一方式。

Mistral 工作流语言由以下主要对象(实体)类型组成,将在下面详细描述

先决条件

Mistral 工作流语言支持 YAQLJinja2 表达式语言,以引用工作流上下文变量,从而实现工作流任务之间的数据传递。它也被称为数据流机制。YAQL 是一种简单但功能强大的查询语言,允许从 JSON 结构化数据中提取所需的信息。虽然 Jinja2 主要是一种模板技术,但 Mistral 也将其用于评估表达式,因此用户可以在 YAQL 和 Jinja2 之间进行选择。在单个工作流定义中组合这两种表达式语言也是可能的。唯一的限制是在同一行内无法使用两种类型的表达式。只要 YAQL 和 Jinja2 表达式位于工作流定义文本的不同行,则它是有效的。允许在 Mistral 工作流语言的以下部分中使用 YAQL/Jinja2

Mistral 工作流语言完全基于 YAML,并且了解 YAML 对于更好地理解本规范中的材料非常有帮助。它还利用支持的查询语言来定义工作流和操作定义中的表达式。

工作流

工作流是 Mistral 工作流语言的基本构建块,也是项目存在的原因。工作流代表一个可以用多种方式描述的过程,并且可以做一些对最终用户来说有趣的工作。每个工作流都由任务(至少一个)组成,描述了在工作流执行期间应采取的精确步骤。

在 YAQL 中,应使用 ‘<% $.x %>’,在 Jinja 表达式中使用 ‘{{ _.x }}’ 来访问工作流执行数据上下文中的 x 变量。

YAML 示例

---
version: '2.0'

create_vm:
  description: Simple workflow example

  input:
    - vm_name
    - image_ref
    - flavor_ref
  output:
    vm_id: "{{ _.vm_id }}"
    vm_status: <% $.vm_status %>

  tasks:
    create_server:
      action: nova.servers_create name=<% $.vm_name %> image=<% $.image_ref %> flavor=<% $.flavor_ref %>
      publish:
        vm_id: <% task().result.id %>
      on-success:
        - wait_for_instance

    wait_for_instance:
      action: nova.servers_find id={{ _.vm_id }} status='ACTIVE'
      retry:
        delay: 5
        count: 15
      publish:
        vm_status: "{{ task().result.status }}"

此示例工作流只是将命令发送到 OpenStack Compute 服务 Nova,以开始创建虚拟机并使用特殊的“retry”策略等待其创建完成。

工作流类型

Mistral 工作流语言 v2 引入了不同的工作流类型,并且每种工作流类型的结构根据其语义而异。基本上,工作流类型封装了工作流处理逻辑,一组定义所有此类工作流应如何工作的元规则。目前,Mistral 提供了两种工作流类型

有关详细信息,请参阅相应的部分。

常见工作流属性

  • type - 工作流类型。可以是 ‘direct’ 或 ‘reverse’。可选。默认为 ‘direct’。

  • description - 包含工作流描述的任意文本。可选

  • input - 列表,定义所需的输入参数名称,以及可选地以“my_param: 123”的形式定义它们的默认值。可选

  • output - 任意数据结构,包含 YAQL/Jinja2 表达式,定义工作流输出。可以嵌套。可选

  • output-on-error - 任意数据结构,包含 YAQL/Jinja2 表达式,定义工作流在进入错误状态时要返回的输出。可以嵌套。可选

  • task-defaults - 在工作流级别定义的某些任务属性的默认设置。可选。为特定任务定义的相应属性始终优先。可以在 task-defaults 中定义的特定任务属性如下

    • on-error - 任务完成出错后将运行的任务列表。对于 直接工作流 而言。可选

    • on-success - 任务成功完成后将运行的任务列表。对于 直接工作流 而言。可选

    • on-complete - 任务完成(无论成功与否)后将运行的任务列表。对于 直接工作流 而言。可选

    • requires - 任务依赖的任务列表。对于 反向工作流 而言。可选

    • pause-before - 配置 pause-before 策略。可选

    • wait-before - 配置 wait-before 策略。可选

    • wait-after - 配置 wait-after 策略。可选

    • fail-on - 配置 fail-on 策略。可选

    • timeout - 配置 timeout 策略。可选

    • retry - 配置 retry 策略。可选

    • concurrency - 配置 concurrency 策略。可选

    • safe-rerun - 配置 safe-rerun 策略。可选

  • tasks - 包含工作流任务的字典。有关更多详细信息,请参阅下文。必需

任务

任务是工作流组成的部分。它定义了工作流中的一个特定计算步骤。当工作流引擎处理用 YAML 编写的工作流文本中描述的实体时,它会安排任务执行。安排任务意味着它现在有资格执行,稍后会运行。确切的运行时间取决于系统负载和配置。每个任务可以选择性地获取输入数据并产生输出。在 Mistral 工作流语言 v2 中,任务可以与操作或工作流关联。在下面的示例中,有两种不同类型的任务

action_based_task:
  action: std.http url='openstack.org'

workflow_based_task:
  workflow: backup_vm_workflow vm_id=<% $.vm_id %>

操作将在下面单独介绍,但展望未来,值得一提的是 Mistral 提供了许多开箱即用的操作(包括大多数核心 OpenStack 服务的操作),并且将新操作插入 Mistral 也很容易。

常见任务属性

所有 Mistral 任务,无论工作流类型如何,都具有以下通用属性

  • name - 任务名称不得等于 noopfailsucceedpause。最大长度为 255 个字符。对于具有 join 控制流的任务,此限制为 208 个字符。

  • description - 包含任务描述的任意文本。可选

  • action - 与任务关联的操作的名称。可以是静态值或表达式(例如,“{{ _.action_name }}”)。 workflow 互斥。如果未提供操作或工作流,则将使用不执行任何操作的 ‘std.noop’ 操作。

  • workflow - 与任务关联的工作流的名称。可以是静态值或表达式(例如,“{{ _.subworkflow_name }}”)。 action 互斥

  • input - 任务的操作或工作流的实际输入参数值。可选。值可以是 JSON 兼容的类型,例如数字、字符串等、字典或列表。它也可以是 YAQL/Jinja2 表达式,以从任务上下文或上述类型中检索值,这些类型包含内联表达式(例如,字符串“<% $.movie_name %> is a cool movie!”)。可以评估为 JSON 对象的表达式。

  • publish - 要发布到工作流上下文的变量的字典。任何 JSON 兼容的数据结构,可选地包含选择要发布的内容的表达式。已发布的变量将通过使用表达式可供下游任务访问。注意! Mistral 将变量保存到存储(上下文)中,该存储仅与分支关联。例如,在 A1 的声明中,表达式“$.my_var”始终评估为 1,对于 B1 它始终评估为 2。这不取决于 A 和 B 的运行顺序。这是因为我们有两个分支(A -> A1 和 B -> B1),对于这些分支,变量“my_var”有其自己不同的版本。可选

version: '2.0'
wf:
  tasks:
    A:
      action: std.noop
      publish:
        my_var: 1
      on-success: A1
    A1:
      action: my_action param1=<% $.my_var %>
    B:
      action: std.noop
      publish:
        my_var: 2
      on-success: B1
    B1:
      action: my_action param1=<% $.my_var %>
  • publish-on-error - 与 publish 相同,但在任务执行失败时进行评估。可选

  • with-items - 如果配置了,它允许在提供的项目列表中多次运行与任务关联的操作或工作流。有关详细信息,请参阅 使用 ‘with-items’ 处理集合可选

  • keep-result - 布尔值,允许在任务完成后不存储操作结果(例如,如果它们很大且以后不需要)。可选。默认值为 ‘true’。

  • target - 字符串参数。它定义了应将任务操作发送到的执行器。目标是指执行器的名称。执行器的名称可以在 Mistral 配置文件中的“host”属性中定义。如果多个执行器具有相同的名称,则任务操作仅发送到其中一个执行器。可选

  • pause-before - 配置 pause-before 策略。可选

  • wait-before - 配置 wait-before 策略。可选

  • wait-after - 配置 wait-after 策略。可选

  • fail-on - 配置 fail-on 策略。可选

  • timeout - 配置 timeout 策略。可选

  • retry - 配置 retry 策略。可选

  • concurrency - 配置 concurrency 策略。可选

  • safe-rerun - 布尔值,允许在执行器在操作执行期间崩溃时重新运行任务。如果设置为 ‘true’,则任务可能会运行两次。可选。默认设置为 ‘false’。

工作流

如果任务具有 ‘workflow’ 属性,它将同步启动具有给定名称的子工作流。

静态子工作流名称示例

my_task:
  workflow: name_of_my_workflow

动态子工作流名称示例

---
version: '2.0'

framework:
  input:
    - magic_workflow_name: show_weather

  tasks:
    weather_data:
      action: std.echo
      input:
        output:
          location: wherever
          temperature: "22C"
      publish:
        weather_data: <% task().result %>
      on-success:
        - do_magic

    do_magic:
      # Reference workflow by parameter.
      workflow: <% $.magic_workflow_name %>
      # Expand dictionary to input parameters.
      input: <% $.weather_data %>

show_weather:
  input:
    - location
    - temperature

  tasks:
    write_data:
      action: std.echo
      input:
        output: "<% $.location %>: <% $.temperature %>"

在此示例中,我们定义了 YAML 片段中的两个工作流,并且工作流 ‘framework’ 可以在 ‘framework’ 通过输入参数 ‘magic_workflow_name’ 接收到相应的子工作流名称的情况下调用工作流 ‘show_weather’。在这种情况下,它默认设置,因此用户无需显式传递任何内容。

注意:动态子工作流选择的典型用途是当工作流的一部分可以自定义时。例如,收集一些天气数据,然后在它上面执行一些自定义工作流。

策略

任何 Mistral 任务,无论其工作流类型如何,都可以选择性地配置策略。

YAML 示例

my_task:
  action: my_action
  pause-before: true
  wait-before: 2
  wait-after: 4
  fail-on: <% $.some_value < 4 %>
  timeout: 30
  retry:
    count: 10
    delay: 20
    break-on: <% $.my_var = true %>
    continue-on: <% $.my_var = false %>

pause-before

定义 Mistral 引擎在开始任务之前是否应将工作流置于暂停状态。

wait-before

定义 Mistral 引擎在开始任务之前应等待的秒数延迟。

wait-after

定义 Mistral 引擎在任务完成后应等待的秒数延迟,然后再启动在 on-successon-erroron-complete 中定义的下一个任务。

fail-on

定义任务将失败的条件,即使操作已成功完成。

timeout

定义一个时间段(以秒为单位),在此之后,如果任务未完成,引擎将自动使任务失败。

concurrency

定义任务中同时运行的动作的最大数量。仅适用于 具有 with-items 的任务。如果未设置任务的 concurrency 属性,则任务的操作(或嵌套工作流)将立即安排执行。

retry

定义一种模式,用于在发生错误时重复任务。

  • count - 定义任务可以重复的最大次数。

  • delay - 定义后续任务迭代之间的秒数延迟。

  • break-on - 定义一个表达式,如果评估为 ‘true’,将中断迭代循环。如果触发,则任务被视为错误。

  • continue-on - 定义一个表达式,如果评估为 ‘true’,将继续迭代循环。如果触发,则任务被视为成功。如果评估为 ‘false’,则策略将中断迭代。

重试策略也可以配置为单行

task1:
  action: my_action
  retry: count=10 delay=5 break-on=<% $.foo = 'bar' %>

任何策略的所有参数值都可以定义为 YAQL/Jinja2 表达式。

注意: 在同一个重试块中使用 break-on 和 continue-on 很少见。break-on 应该用于期望操作在尝试一段时间后处于 ERROR 状态,但最终可能会进入 SUCCESS 状态,从而停止循环。但是,如果 break-on‘true’,则重试将停止,任务将处于 ERROR 状态。continue-on 应该用于操作通常会返回 SUCCESS,但操作具有可用于指示是否继续循环的其他结果。

注意:在触发超时策略后,重试任务策略不起作用。在直接工作流或任务重试的情况下,应使用 on-error 来重新执行任务。

输入语法

在描述工作流任务时,可以以两种方式指定其输入参数

完整语法

my_task:
  action: std.http
  input:
    url: http://mywebsite.org
    method: GET

简化语法

my_task:
  action: std.http url="http://mywebsite.org" method="GET"

具有动态输入参数映射的语法

---
version: '2.0'

example_workflow:
  input:
    - http_request_parameters:
        url: http://mywebsite.org
        method: GET

  tasks:
    setup_task:
      action: std.http
      input: <% $.http_request_parameters %>

相同的规则适用于与工作流关联的任务。

完整语法

my_task:
  workflow: some_nested_workflow
  input:
    param1: val1
    param2: val2

简化语法

my_task:
  workflow: some_nested_workflow param1='val1' param2='val2'

具有动态输入参数映射的语法

---
version: '2.0'

example_workflow:
  input:
    - nested_params: {"param1": "val1", "param2": "val2"}

  tasks:
    setup_task:
      workflow: some_nested_workflow
      input: <% $.nested_params %>

注意:也可以合并这两种方法,并使用关键字 input 指定一部分参数,使用简化的键值对语法。在这种情况下,所有参数将有效地合并。如果同一参数以两种方式指定,则 input 下的参数优先。

直接工作流

直接工作流由组合成图的任务组成,每个后续任务在另一个任务之后启动,具体取决于生成的结果。因此,直接工作流具有转换的概念。如果没有任何可以用来跳转到下一个任务的转换,则直接工作流被认为已完成。

../_images/direct_workflow.png

图 1. Mistral 直接工作流。

YAML 示例

---
version: '2.0'

create_vm_and_send_email:
  type: direct

  input:
    - vm_name
    - image_id
    - flavor_id

  output:
    result: <% $.vm_id %>

  tasks:
    create_vm:
      action: nova.servers_create name=<% $.vm_name %> image=<% $.image_id %> flavor=<% $.flavor_id %>
      publish:
        vm_id: <% task(create_vm).result.id %>
      on-error:
        - send_error_email
      on-success:
        - send_success_email

    send_error_email:
      action: send_email to_addrs=['admin@mysite.org'] body='Failed to create a VM'
      on-complete:
        - fail

    send_success_email:
      action: send_email to_addrs=['admin@mysite.org'] body='Vm is successfully created and its id <% $.vm_id %>'

直接工作流任务属性

Mistral 支持以下任务转换

  • on-success - 任务成功完成后将运行的任务列表。可选

  • on-error - 任务完成出错后将运行的任务列表。可选

  • on-complete - 任务完成(无论成功与否)后将运行的任务列表。可选

可以以两种方式定义任务转换

首先只是一个任务列表。您可以在上面找到工作流示例。第二种方法是

*transition*:
  publish:
    global:
        some_global_variable: some_value
    branch:
        some_branch_variable: some_value
    next:
      - *next_task*

transitions下定义的发布可以可选地定义范围,以便能够发布到不同的范围:‘branch’和‘global’。在‘branch’下指定变量将使Mistral发布到分支工作流上下文,就像‘publish’和‘publish-on-error’一样。在‘global’下指定变量将使Mistral发布到全局工作流上下文。您可以在YAQL中使用“$.”,在Jinja中使用“_.”来访问全局变量,但分支变量可以在当前分支中覆盖它们。为了防止这种情况,您可以使用YAQL/Jinja函数“global()”来显式访问工作流全局上下文中的变量。

如果‘publish’在‘on-complete’中定义,也在‘on-success’和/或‘on-error’中定义,那么发布的结果将是‘on-complete’发布的内容与‘on-success’或‘on-error’发布的内容的合并,具体取决于任务状态。如果‘on-complete’发布了‘on-success’或‘on-error’也发布的变量,那么后者优先。换句话说,在这种情况下,‘on-complete’被认为是一个默认值,可以被更具体的‘on-XXX’子句覆盖。

transitions下定义的关键字‘next’可选地包含一个任务列表,这些任务将在当前任务完成后运行。

编写和读取全局变量的示例

---
version: '2.0'

wf:
  tasks:
    A:
      action: std.noop
      on-success:
        publish:
          branch:
            my_var: "branch value"
          global:
            my_var: "global value"
        next: A1

    A1:
      # $.my_var will always evaluate to "branch value" because A1 belongs
      # to the same branch as A and runs after A. When using "$" to access
      # context variables branch values have higher priority.
      # In order to access global context reliably we need to use YAQL/Jinja
      # function 'global'. So global(my_var) will always evaluate to
      # 'global value'.
      action: my_action1 param1=<% $.my_var %> param2=<% global(my_var) %>

    B:
      # $.my_var will evaluate to "global value" if task A completes
      # before task B and "null", if not. It's because A and B are
      # parallel and 'publish' in A doesn't apply to B, only
      # 'publish-global' does. In this example global(my_var) has the same
      # meaning as $.my_var because there's no ambiguity from what context
      # we should take variable 'my_var'.
      action: my_action2 param1=<% $.my_var %> param2=<% global(my_var) %>

注意!重要的是要注意,这是一种修改数据的不受保护的方式,因为当从并行分支写入相同变量的不同值时,可能会发生竞争条件。换句话说,如果我们有分支A和B,并且这些分支中存在首先读取全局变量X,然后递增它并写入新值的任务,Mistral不会保证在完成任务A和B之后的结果值将是X + 2。在某些情况下,它可能是X + 1,因为可能会发生以下情况:任务A读取X,任务B读取X,任务B递增X,任务B写入X + 1,任务A递增X(旧值,不是B递增的),任务A写入X + 1。

注意:以上所有子句都不能包含评估为YAQL/Jinja表达式的任务名称。它们必须是静态值。但是,任务转换可以基于表达式有条件。有关更多详细信息,请参阅带有表达式的转换

理解on-successon-erroron-complete在处理操作错误方面的语义非常重要。

如果任务操作返回错误,on-successon-complete不会阻止整个工作流执行失败。只有on-error才会。最接近的类比是常规编程语言中的try-catch-finally块。on-error类似于catch,它充当预期错误的异常处理程序。而on-complete就像finally,无论如何都会运行,但它不会阻止异常冒泡到上一层。因此,on-complete应该只被理解为允许定义一些清理操作的语言结构。

话虽如此,重要的是要知道Mistral处理这些子句的顺序。

taskA:
 action: my_action
 on-success:
   - taskB
   - taskC
 on-complete:
   - taskD
   - taskE

在此示例中,如果任务操作(‘my_action’)成功完成,Mistral将首先处理‘on-success’子句并安排任务‘taskB’和‘taskC’,然后处理‘on-complete’子句并安排‘taskC’和‘taskE’。在大多数情况下,此处理顺序并不重要,但有些情况很重要,尤其是在‘on-success’和‘on-complete’列表都包含引擎命令时,这些命令将在本文档后面解释。

如果‘on-success’和‘on-error’都在任务定义中定义,它们永远不会冲突,因为它们是互斥的,这意味着只有其中一个可以根据任务操作是失败还是成功来处理。

带有表达式的转换

任务转换可以由先前任务的成功/错误/完整性以及可以访问上游任务生成的所有数据和工作流输入的其他保护表达式确定。因此,在上面的示例中,任务‘create_vm’也可以在转换为任务‘send_success_email’时具有一个YAQL表达式,如下所示

create_vm:
 ...
 on-success:
   - send_success_email: <% $.vm_id != null %>

这将告诉Mistral仅当任务‘create_vm’发布的变量‘vm_id’不为空时才运行‘send_success_email’任务。表达式也可以应用于‘on-error’和‘on-complete’。

引擎命令

Mistral在直接工作流中可以调用许多引擎命令。这些命令用于更改工作流状态。

  • succeed - 将结束当前工作流并将其状态设置为SUCCESS。

  • pause - 将结束当前工作流并将其状态设置为PAUSED。

  • fail - 将结束当前工作流并将其状态设置为ERROR。

每个引擎命令都接受一个msg输入。这是可选的,但如果提供,它将存储在工作流执行的状态信息中。

使用succeedfail结束的工作流以后可能无法恢复,但使用pause结束的工作流可以恢复。

YAML示例

---
version: '2.0'

send_error_mail:
  tasks:
    create_server:
      action: nova.servers_create name=<% $.vm_name %>
      publish:
        vm_id: <% task().result.id %>
      on-complete:
        - fail: <% not $.vm_id %>

在此示例中,我们有一个带有单个任务的工作流,该任务在Nova中创建一个服务器。该任务发布虚拟机的ID,但如果该值为空,则将导致工作流失败。

on-complete:
  - taskA
  - fail
  - taskB

当引擎命令与任务名称一起在单个列表中使用时,它们将逐个处理,直到工作流达到终端状态。在上面的示例中,on-complete有三个步骤需要完成 - 这些步骤按顺序执行,直到工作流达到终端状态。因此,在这种情况下,taskA首先被安排,然后fail引擎命令将工作流状态设置为ERROR,并且taskB永远不会被安排。即使使用succeedtaskB也不会被安排。

pause命令暂停工作流。这意味着当其状态通过更新Rest API调用设置为RUNNING时,工作流可以继续。

YAML 示例

on-complete:
  - taskA
  - pause
  - taskB

在这种情况下,当Mistral处理‘on-complete’子句时,它将安排taskA,然后将工作流状态设置为PAUSED,并停止安排新任务。但是,如果稍后通过API手动恢复工作流,Mistral将安排taskB,因为在‘on-complete’列表中,它紧随pause命令之后。

鉴于Mistral处理‘on-success’(或‘on-error’)和‘on-complete’子句的顺序,了解如果这两个子句都包含引擎命令会发生什么非常重要。

taskA:
 action: my_action
 on-error:
   - taskB
   - fail
   - taskC
 on-complete:
   - taskD
   - pause
   - taskE

如上所述,‘on-complete’始终在‘on-success’(或‘on-error’)之后处理,因为它在大多数通用编程语言中的‘finally’中扮演着类似的角色。让我们考虑一个示例,当‘taskA’运行时,即其操作‘my_action’运行时,可能会发生两种情况。

  • 如果‘my_action’失败,Mistral将安排‘taskB’,因为它列在‘on-error’子句中,该子句在‘on-complete’之前处理,然后将工作流状态设置为ERROR。这将阻止安排其他新任务,因此‘taskC’、‘taskD’和‘taskE’将永远不会被安排。换句话说,整个‘on-complete’子句将永远不会被处理,因为‘on-error’中的‘fail’命令将工作流状态设置为ERROR。

  • 如果‘my_action’成功,则将忽略‘on-error’子句,由于未定义‘on-success’,Mistral将处理‘on-complete’子句。在执行此操作时,它将首先安排‘taskD’,然后由于‘pause’命令而暂停工作流。如果稍后通过API手动恢复工作流,将安排‘taskE’。

这说明了,在设计工作流时,了解Mistral如何处理‘on-success’、‘on-error’和‘on-complete’以及引擎命令非常重要。

引擎命令和任务

直接工作流中的on-*子句可以引用任务和引擎命令,如前所示。可以将引擎命令用作任务名称。例如,可以创建一个名为noopfail的任务。这些任务将覆盖引擎命令,也就是说,这些任务中定义的动作将被执行,而不是引擎命令。这是简洁地扩展Mistral引擎的默认行为或提供无副作用的工作流示例的一种方法。

任务名称解析的顺序如下

  1. 搜索具有给定名称的任务

  2. 选择具有给定名称的引擎命令

执行第一个匹配的选项。

分叉

在某些情况下,我们需要能够在某个任务完成后运行多个任务。

create_vm:
  ...
  on-success:
    - register_vm_in_load_balancer
    - register_vm_in_dns

在这种情况下,Mistral将同时运行这两个“register_xxx”任务,这将导致多个独立的流程路线并行处理。

合并

合并流程控制允许同步多个并行工作流分支并聚合它们的数据。

完全合并 (join: all)

register_vm_in_load_balancer:
  ...
  on-success:
    - wait_for_all_registrations

register_vm_in_dns:
  ...
  on-success:
    - wait_for_all_registrations

try_to_do_something_without_registration:
  ...
  on-error:
   - wait_for_all_registrations

wait_for_all_registrations:
  join: all
  action: send_email

当任务具有分配了值“all”的属性“join”时,该任务仅在所有上游任务(导致该任务的任务)完成并且相应条件触发后才会运行。如果任务A在任务B的任何“on-success”、“on-error”和“on-complete”子句中提到任务B,则任务A被认为是任务B的上游任务,无论保护表达式如何。

部分合并 (join: 2)

register_vm_in_load_balancer:
  ...
  on-success:
    - wait_for_two_registrations

register_vm_in_dns:
  ...
  on-success:
    - wait_for_two_registrations

register_vm_in_zabbix:
  ...
  on-success:
    - wait_for_two_registrations

wait_for_two_registrations:
  join: 2
  action: send_email

当任务具有分配了数字值的属性“join”时,该任务将在至少完成这么多上游任务并且相应条件触发时运行。在上面的示例中,任务“wait_for_two_registrations”将在两个“register_vm_xxx”任务中的任何两个完成时运行。

区分器 (join: one)

区分器是“join”属性具有值1的特殊情况的Partial Join。这意味着Mistral将等待任何已完成的任务。在这种情况下,可以使用特殊字符串值“one”代替1,这是为了与“all”保持对称。但是,用户是否使用“1”或“one”由他们决定。

反向工作流

在反向工作流中,工作流任务图中的所有关系都是依赖关系。为了运行这种类型的工作流,我们需要指定需要完成的任务,可以将其约定地称为“目标任务”。当Mistral引擎启动工作流时,它将递归地识别需要首先完成的所有依赖项。

../_images/reverse_workflow.png

图2解释了反向工作流的工作方式。在示例中,任务T1被选为目标任务。因此,当工作流启动时,Mistral将仅运行任务T7T8T5T6T2T1,顺序如下(从没有依赖项的任务开始)。任务T3T4将不会成为此工作流的一部分,因为从T1T3T4的定向图中没有路径。

YAML示例

---
version: '2.0'

create_vm_and_send_email:
  type: reverse

  input:
    - vm_name
    - image_id
    - flavor_id

  output:
    result: <% $.vm_id %>

  tasks:
    create_vm:
      action: nova.servers_create name=<% $.vm_name %> image=<% $.image_id %> flavor=<% $.flavor_id %>
      publish:
        vm_id: <% task(create_vm).result.id %>

    search_for_ip:
      action: nova.floating_ips_findall instance_id=null
      publish:
        vm_ip: <% task(search_for_ip).result[0].ip %>

    associate_ip:
      action: nova.servers_add_floating_ip server=<% $.vm_id %> address=<% $.vm_ip %>
      requires: [search_for_ip]

    send_email:
      action: send_email to='admin@mysite.org' body='Vm is created and id <% $.vm_id %> and ip address <% $.vm_ip %>'
      requires: [create_vm, associate_ip]

反向工作流任务属性

  • requires - 在此任务之前应执行的任务列表。可选

处理集合

YAML示例

---
version: '2.0'

create_vms:
  description: Creating multiple virtual servers using "with-items".

  input:
    - vm_names
    - image_ref
    - flavor_ref

  output:
    vm_ids: <% $.vm_ids %>

  tasks:
    create_servers:
      with-items: vm_name in <% $.vm_names %>
      action: nova.servers_create name=<% $.vm_name %> image=<% $.image_ref %> flavor=<% $.flavor_ref %>
      publish:
        vm_ids: <% task(create_servers).result.id %>
      on-success:
        - wait_for_servers

    wait_for_servers:
      with-items: vm_id in <% $.vm_ids %>
      action: nova.servers_find id=<% $.vm_id %> status='ACTIVE'
      retry:
        delay: 5
        count: <% $.vm_names.len() * 10 %>

此示例中的工作流“create_vms”会根据我们在“vm_names”输入参数中提供的数量创建多个虚拟机。例如,如果指定vm_names=[“vm1”、“vm2”],它将基于相同的镜像和flavor创建具有这些名称的服务器。这是因为使用了“with-items”关键字,该关键字使与任务关联的动作或工作流多次运行。 “with-items”任务属性的值包含以下形式的表达式:‘my_var’ in <% YAQL_expression %>。对于Jinja2表达式:‘my_var’ in {{ Jinja2_expression }}。

最常见的形式是

with-items:
  - var1 in <% YAQL_expression_1 %> # or: var1 in <% Jinja2_expression_1 %>
  - var2 in <% YAQL_expression_2 %> # or: var2 in <% Jinja2_expression_2 %>
  ...
  - varN in <% YAQL_expression_N %> # or: varN in <% Jinja2_expression_N %>

其中以YAQL_expression_1、YAQL_expression_2、YAQL_expression_N表示的集合必须大小相等。当任务启动时,Mistral将并行迭代所有集合,即迭代次数等于任何集合的长度。

请注意,如果使用“with-items”,可访问的工作流上下文中的任务结果<% task(task_name).result %>将是一个列表,其中包含相应动作/工作流调用的结果。如果至少一个动作/工作流调用失败,则整个任务将进入ERROR状态。也可以为具有“with-items”属性的任务应用重试策略。在这种情况下,重试策略将根据“with-items”配置重新启动所有动作/工作流调用。其他策略也可以以与常规非“with-items”任务相同的方式使用。

行动

动作定义了任务启动时需要执行的内容。动作类似于常规编程语言(如Python)中的常规函数。它具有名称和参数。Mistral区分‘系统动作’和‘Ad-hoc动作’。

系统动作

Mistral 提供的系统操作开箱即用,任何人都可以使用。 也可以通过特殊的插件机制为特定的 Mistral 安装添加系统操作。 目前,内置的系统操作是

std.fail

此操作始终失败。 可用于手动使工作流任务失败。

wf:
  tasks:
    manual_fail:
      action: std.fail

可以将 error_data 参数传递给该操作。 此数据将用作操作返回值。

wf:
  tasks:
    manual_fail:
      action: std.fail
      input: error_data={x:1,y:2}

std.http

发送 HTTP 请求。

输入参数

  • url - HTTP 请求的 URL。必需

  • method - HTTP 请求的方法。可选。 默认值为 ‘GET’。

  • params - 要在 HTTP 请求的查询字符串中发送的字典或字节。可选

  • body - 要在 HTTP 请求主体中发送的字典、字节或类似文件的对象。可选

  • headers - 要与 HTTP 请求一起发送的 HTTP 标头字典。可选

  • cookies - 要与 HTTP 请求一起发送的 HTTP Cookie 字典。可选

  • auth - 用于启用基本/摘要/自定义 HTTP 身份验证的身份验证。可选

  • timeout - 浮点数,描述请求的超时时间(秒)。可选

  • allow_redirects - 布尔值。 如果允许 POST/PUT/DELETE 重定向,则设置为 True。可选

  • proxies - 将协议映射到代理 URL 的字典。可选

  • verify - 它可以是布尔值,在这种情况下,它控制我们是否验证服务器的 TLS 证书,也可以是字符串,在这种情况下,它必须是用于 CA 捆绑包的路径。可选。 默认值为 ‘True’。

示例

http_task:
  action: std.http url='google.com'

std.mistral_http

此操作的工作方式与 ‘std.http’ 相同,唯一的例外是:发送请求时,它会插入以下 HTTP 标头

  • Mistral-Workflow-Name - 当前操作执行关联的工作流名称。

  • Mistral-Execution-Id - 此操作关联的工作流执行标识符。

  • Mistral-Task-Id - 此操作执行关联的任务执行标识符。

  • Mistral-Action-Execution-Id - 当前操作执行的标识符。

使用此操作可以以异步方式完成任何工作,该工作通过 HTTP 协议触发。 这意味着 Mistral 可以使用 ‘std.mistral_http’ 发送请求,然后任何时候接收到此请求的系统都可以通过其公共 API 将此操作的结果通知给 Mistral。 Mistral-Action-Execution-Id 标头对于此操作是必需的,因为它用于在 Mistral 中查找相应的操作执行以附加结果。

std.email

通过 SMTP 协议发送电子邮件消息。

  • to_addrs - 以逗号分隔的收件人列表。必需

  • cc_addrs - 以逗号分隔的抄送收件人列表。可选

  • bcc_addrs - 以逗号分隔的密送收件人列表。可选

  • reply_to - 以逗号分隔的电子邮件地址列表。可选

  • subject - 消息的主题。可选

  • body - 包含消息正文的文本。可选

  • html_body - 包含 HTML 格式的消息的文本。可选

  • from_addr - 发件人电子邮件地址。必需

  • smtp_server - SMTP 服务器主机名。必需

  • smtp_password - SMTP 服务器密码。可选

示例

send_email_task:
  action: std.email
  input:
      to_addrs: [admin@mywebsite.org]
      subject: Hello from Mistral :)
      body: |
        Cheers! (:_:)
        -- Thanks, Mistral Team.
      from_addr: mistral@openstack.org
      smtp_server: smtp.google.com
      smtp_password: SECRET

‘std.emal’ 操作的语法非常冗长。 但是,可以使用 Ad-hoc 操作显着简化它。 有关更多信息,请参阅下方

std.ssh

运行安全 Shell 命令。

输入参数

  • cmd - 包含需要执行的 shell 命令的字符串。必需

  • host - 需要在其中执行命令的主机名。必需

  • username - 用于在主机上进行身份验证的用户名。必需

  • password - 用于在主机上进行身份验证的用户密码。可选

  • private_key_filename - 将用于远程主机身份验证的私钥文件名。 所有私钥都应位于执行程序主机上的 <home-user-directory>/.ssh 目录中,或者应提供密钥的绝对路径。 该文件需要可供运行执行程序的帐户访问。可选

注意:支持使用密钥对进行身份验证,密钥应位于 Mistral Executor 服务器机器上。

std.echo

简单的操作,主要用于测试目的,返回预定义的結果。

输入参数

  • output - 需要作为操作结果返回的任何类型的值。必需

  • delay - 浮点值,定义应延迟多少秒返回结果。可选

std.javascript

评估给定的 JavaScript 代码。

注意:std.js 是 std.javascript 的别名,即 std.js 可以代替 std.javascript 使用。

输入参数

  • script - 需要执行的 JavaScript 代码片段的文本。必需

  • context - 此对象将被分配给 javascript 变量 $。 默认值为 None。

要使用 std.javascript,需要安装 py_mini_racer 并将 py_mini_racer 设置为 mistral.conf 中的 js_implementation 参数。

pip install py_mini_racer

其他可用实现

带有 context 的示例

---
version: '2.0'

generate_uuid:
  description: Generates a Universal Unique ID

  input:
    - radix: 16

  output:
    uuid: <% $.generated_uuid %>

  tasks:
    generate_uuid_task:
      action: std.js
      input:
        context: <% $ %>
        script: |
          return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
                  var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r&0x3|0x8);
                  return v.toString($.radix);
          });
      publish:
        generated_uuid: <% task().result %>

获取当前日期和时间的另一个示例

---
version: '2.0'

get_date_workflow:
  description: Get the current date

  output:
    current_date: <% $.current_date %>

  tasks:
    get_date_task:
      action: std.js
      input:
        script: |
          var date = new Date();
          return date; // returns "2015-07-12T10:32:12.460000" or use date.toLocaleDateString() for "Sunday, July 12, 2015"
      publish:
        current_date: <% task().result %>

Ad-hoc 操作

Ad-hoc 操作是由用户创建的特殊类型的操作。 Ad-hoc 操作始终作为对任何其他现有系统操作的包装器创建,其主要目标是简化使用具有相似模式的相同操作多次操作。

YAML 示例

---
version: '2.0'

error_email:
  input:
    - execution_id
  base: std.email
  base-input:
    to_addrs: ['admin@mywebsite.org']
    subject: 'Something went wrong with your Mistral workflow :('
    body: |
        Please take a look at Mistral Dashboard to find out what's wrong
        with your workflow execution <% $.execution_id %>.
        Everything's going to be alright!
        -- Sincerely, Mistral Team.
    from_addr: 'mistral@openstack.org'
    smtp_server: 'smtp.google.com'
    smtp_password: 'SECRET'

将此操作上传到 Mistral 后,任何工作流都可以按如下方式使用它

my_workflow:
  tasks:
    ...
    send_error_email:
      action: error_email execution_id=<% execution().id %>

属性

  • base - 此操作构建在其之上的基本操作的名称。必需

  • base-input - 提供给基本操作的实际输入参数。 请参见上面的示例。可选

  • input - 声明的操作参数列表,应指定为相应的任务输入。 此属性是可选的,仅用于记录目的。 Mistral 现在不会强制实际输入参数与此列表完全对应。 基于提供的实际参数使用表达式计算基本参数,因此表达式中使用的参数隐式定义实际输入参数。 实际输入参数(表达式上下文)的字典在 YAQL 中引用为 ‘$.’,在 Jinja 中引用为 ‘_.’。 多余的参数将被简单地忽略。

  • output - 任何定义如何基于基本操作的输出来计算此操作输出的数据结构。 它可以选择性地具有表达式,以通过表达式上下文访问基本操作输出的属性。

工作簿

如前所述,工作簿仍然存在于 Mistral 工作流语言版本 2 中,但仅出于便利目的。 使用工作簿,用户可以将任何类型的多个实体(工作流、操作和触发器)组合到一个文档中并上传到 Mistral 服务。 上传工作簿时,Mistral 将解析它并将工作簿的工作流、操作和触发器保存为独立对象,这些对象可以通过各自的 API 端点(/workflows、/actions 和 /triggers/)访问。 完成后,工作簿就退出游戏。 用户可以启动工作流并使用对工作流/操作/触发器的引用,就好像它们没有使用工作簿上传一样。 但是,如果我们要修改这些单个对象,我们可以修改相同的工作簿定义并将其重新上传到 Mistral(或者,当然,我们可以独立地进行修改)。

命名空间

值得注意的是,当使用工作簿时,Mistral 会使用工作簿的名称作为前缀来生成工作流、操作和包含在工作簿中的触发器的最终名称。 为了说明此原理,请查看下图。

../_images/workbook_namespacing.png

因此,在上传工作簿后,其工作流和操作将成为独立的具有略微不同名称的对象。

YAML 示例

---
version: '2.0'

name: my_workbook

description: My set of workflows and ad-hoc actions

workflows:
  local_workflow1:
    type: direct

    tasks:
      task1:
        action: local_action str1='Hi' str2=' Mistral!'
        on-complete:
          - task2

      task2:
        action: global_action
        ...

  local_workflow2:
    type: reverse

    tasks:
      task1:
        workflow: local_workflow1

      task2:
        workflow: global_workflow param1='val1' param2='val2'
        requires: [task1]
        ...
actions:
  local_action:
    input:
      - str1
      - str2
    base: std.echo output="<% $.str1 %><% $.str2 %>"

注意:即使工作簿内部的对象在上传时名称发生变化,Mistral 仍允许使用原始工作簿中声明的本地名称在这些对象之间进行引用。

属性

  • name - 工作簿名称。必需

  • description - 工作簿描述。可选

  • tags - 带有任意逗号分隔值的字符串。可选

  • workflows - 包含工作流定义的字典。可选

  • actions - 包含 Ad-hoc 操作定义的字典。可选

执行数据上下文中预定义的值/函数

使用表达式,可以在 Mistral 工作流语言中使用一些预定义的值。

  • OpenStack 上下文

  • 任务结果

  • 执行信息

  • 环境

OpenStack 上下文

OpenStack 上下文可通过 $.openstack 访问。 它包含 auth_tokenproject_iduser_idservice_cataloguser_nameproject_namerolesis_admin 属性。

表达式中的内置函数

除了当前上下文(即 YAQL 中的 $ 和 Jinja2 中的 _)之外,表达式还可以访问一组预定义的函数。

表达式语言带有自己包含的函数和操作。 Mistral 添加了所有受支持的语言中可用的以下函数。

本节将描述 Mistral 添加的内置函数。

任务函数

签名

tasks(workflow_execution_id=null, recursive=false, state=null, flat=false)

描述

此函数允许用户按工作流执行 ID 和/或状态过滤所有任务。 此外,还可以递归获取任务执行并展平任务执行列表。

参数

  1. workflow_execution_id - 如果提供,则任务函数将返回特定工作流执行的任务执行(当前执行或不同的执行)。 否则,它将返回与其它参数匹配的所有任务执行。可选。

  2. recursive - 此参数是一个布尔值,如果为 true,则将返回嵌套工作流执行中的所有任务执行。 这通常与特定的 workflow_execution_id 结合使用,您仍然希望查看嵌套工作流的任务执行。可选。 默认值为 False。

  3. state - 如果提供,则任务执行将按其当前状态过滤。 如果未提供状态,则将返回与其它参数匹配的所有任务执行。可选。

  4. flat - 如果为 true,则仅列出满足以下至少一个条件的任务执行

    • 动作类型的任务执行

    • 状态与触发它们的 workflow execution 不同的 workflow 类型的任务执行。 例如,如果与特定的 workflow_execution_id 和 ERROR 状态一起使用,它将返回尽管 workflow 成功但出错的任务,这意味着任务本身存在错误,例如 publish 中的无效表达式。

    可选。 默认值为 False。

示例

工作流定义

---
version: "v2.0"
wf:
  tasks:
    task:
      action: std.noop
      publish:
        all_tasks_in_this_wf_yaql: <% tasks(execution().id) %>
        all_tasks_in_this_wf_jinja: "{{ tasks(execution().id) }}"

        all_tasks_in_error_yaql: <% tasks(null, false, ERROR) %>
        all_tasks_in_error_jinja: "{{ tasks(None, false, 'ERROR') }}"
        all_tasks_in_error_yaql_with_kw: <% tasks(state => ERROR) %>
        all_tasks_in_error_jinja_with_kw: "{{ tasks(state='ERROR') }}"

        all_tasks_yaql_option1: <% tasks() %>
        all_tasks_yaql_option2: <% tasks(null, false, null, false) %>
        all_tasks_jinja_option1: "{{ tasks() }}"
        all_tasks_jinja_option2: "{{ tasks(None, false, None, false) }}"

任务发布结果(部分,以保持文档简短)

{
    "all_tasks_in_error_yaql": [
        {
            "id": "3d363d4b-8c19-48fa-a9a0-8721dc5469f2",
            "name": "fail_task",
            "type": "ACTION",
            "workflow_execution_id": "c0a4d2ff-0127-4826-8370-0570ef8cad80",
            "state": "ERROR",
            "state_info": "Failed to run action [action_ex_id=bcb04b28-6d50-458e-9b7e-a45a5ff1ca01, action_cls='<class 'mistral.actions.std_actions.FailAction'>', attributes='{}', params='{}']\n Fail action expected exception.",
            "result": "Failed to run action [action_ex_id=bcb04b28-6d50-458e-9b7e-a45a5ff1ca01, action_cls='<class 'mistral.actions.std_actions.FailAction'>', attributes='{}', params='{}']\n Fail action expected exception.",
            "published": {},
            "spec": {
                "action": "std.fail",
                "version": "2.0",
                "type": "direct",
                "name": "fail_task"
            }
        }
    ],
    "all_tasks_in_this_wf_jinja": [
        {
            "id": "83a34bfe-268c-46f5-9e5c-c16900540084",
            "name": "task",
            "type": "ACTION",
            "workflow_execution_id": "899a3318-b5c0-4860-82b4-a5bd147a4643",
            "state": "SUCCESS",
            "state_info": null,
            "result": null,
            "published": {},
            "spec": {
                "action": "std.noop",
                "version": "2.0",
                "type": "direct",
                "name": "task",
                "publish": {
                    "all_tasks_in_error_yaql": "<% tasks(null, false, ERROR) %>",
                    "all_tasks_in_error_jinja": "{{ tasks(None, false, 'ERROR') }}",
                    "all_tasks_yaql_option2": "<% tasks(null, false, false, false) %>",
                    "all_tasks_yaql_option1": "<% tasks() %>",
                    "all_tasks_jinja_option1": "{{ tasks() }}",
                    "all_tasks_in_error_jinja_with_kw": "{{ tasks(state='ERROR') }}",
                    "all_tasks_jinja_option2": "{{ tasks(None, false, None, false) }}",
                    "all_tasks_in_this_wf_jinja": "{{ tasks(execution().id) }}",
                    "all_tasks_in_this_wf_yaql": "<% tasks(execution().id) %>"
                }
            }
        }
    ],
    "_comment": "other fields were dropped to keep docs short"
}

任务结果

任务结果可通过 task(<task_name>).result 访问。 它包含任务结果,并直接取决于操作输出结构。 请注意,task(<task_name>) 函数本身返回的不仅仅是任务结果。 它返回任务执行的以下字段

  • id - 任务执行 UUID。

  • name - 任务执行名称。

  • spec - 任务执行 spec 字典(从 Mistral 工作流语言加载)。

  • state - 任务执行状态。

  • state_info - 任务执行状态信息。

  • result - 任务执行结果。 对于非 ‘with-items’ 任务,它只是任务操作/子工作流执行的结果。 对于 ‘with-items’ 任务,它将是相应操作/子工作流执行结果的列表。

  • published - 任务执行发布的变量。

执行信息

执行信息可通过 execution() 访问。 它包含有关执行本身的信息,例如 idwf_specinputstart_paramsroot_execution_id

执行函数

签名

executions(id=null, root_execution_id=null, state=null, from_time=null, to_time=null)

描述

此函数允许用户按执行 ID、root_execution_id、状态和/或创建时间过滤所有执行。

参数

  1. id - 如果提供,将返回具有该 ID 的执行列表。 否则,它将返回与其它参数匹配的所有执行。可选。

  2. root_execution_id - 与上面的 id 类似,如果提供,将返回具有该 root_execution_id 的执行列表。 否则,它将返回与其它参数匹配的所有执行。可选。 默认值为 False。

  3. state - 如果提供,则执行将按其当前状态过滤。 如果未提供状态,则将返回与其它参数匹配的所有执行。可选。

  4. from_time - 如果提供,则执行将按其 created_at 时间过滤,该时间大于或等于 from_time 参数。 如果未提供 from_time,则将返回与其它参数匹配的所有执行。 from_time 参数可以采用 YYYY-MM-DD hh:mm:ss 格式提供。可选。

  5. to_time - 如果提供,则执行将按其 created_at 时间过滤,该时间小于 to_time 参数(小于但不小于 from_time 参数)。 如果未提供 to_time,则将返回与其它参数匹配的所有执行。 to_time 参数可以采用 YYYY-MM-DD hh:mm:ss 格式提供。可选。

示例

工作流定义

---
version: "v2.0"
wf:
  tasks:
    task:
      action: std.noop
      publish:
        all_executions_yaql: <% executions() %>
        all_child_executions_of_this_execution: "{{ executions(root_execution_id=execution().id) }}"

        all_executions_in_error_yaql: <% executions(null, null, ERROR) %>
        all_executions_in_error_jinja: "{{ executions(None, None, 'ERROR') }}"
        all_executions_in_error_yaql_with_kw: <% executions(state => ERROR) %>
        all_executions_in_error_jinja_with_kw: "{{ executions(state='ERROR') }}"

        all_executions_filtered_date_jinja: "{{ executions(to_time="2016-12-01 15:01:00") }}"

环境

环境信息可通过 env() 访问。 它在用户提交工作流执行时传递。 它包含用户指定的变量。

全局

全局变量可通过 global(variable_name) 访问。 如果变量不存在,则返回 None。