参数和结果

在 TaskFlow 中,所有流程和任务状态都会进入(可能持久化的)存储(有关更多详细信息,请参阅 持久化)。这包括工作流中 原子(例如,任务、重试对象…)在执行时需要的所有信息,以及任务/重试产生的所有信息(通过可序列化的结果)。实现任务/重试或流程的开发人员可以指定任务/重试接受的参数以及它返回的结果,方法有多种。本文档将帮助您了解这些方法以及如何使用这些方法来实现所需的用法模式。

任务/重试参数

任务/重试参数的名称集合,作为任务/重试实例的 requires 和/或 optional 属性可用。当任务或重试对象即将执行时,具有这些名称的值将从存储中检索并传递给任务/重试的 execute 方法。如果 requires 属性中的任何名称在存储中找不到,将抛出异常。如果在 optional 属性中找不到任何名称,则会被忽略。

任务/重试结果

任务/重试结果的名称集合(任务/重试提供的内容),作为任务或重试实例的 provides 属性可用。在任务/重试成功完成之后,其结果(execute 方法返回的内容)将通过这些名称从存储中获得(请参见下面的示例)。

参数规范

指定任务参数 requires 集合的方法有很多种。

参数推断

可以从任务的 execute() 方法(或重试对象的 execute())的参数推断任务/重试参数。

>>> class MyTask(task.Task):
...     def execute(self, spam, eggs, bacon=None):
...         return spam + eggs
...
>>> sorted(MyTask().requires)
['eggs', 'spam']
>>> sorted(MyTask().optional)
['bacon']

从方法签名进行推断是指定参数的“最简单”方法。特殊参数,如 self*args**kwargs,在推断过程中会被忽略(因为这些名称在 python 中具有特殊含义/用法)。

>>> class UniTask(task.Task):
...     def execute(self, *args, **kwargs):
...         pass
...
>>> sorted(UniTask().requires)
[]

重新绑定

原因: 在某些情况下,您想要传递给任务/重试的值存储的名称与相应的参数名称不同。这时,rebind 构造函数参数就派上用场了。使用它,流程作者可以指示引擎通过一个名称从存储中获取值,但以另一个名称将其传递给任务/重试的 execute 方法。实现此目的有两种方法。

第一种方法是传递一个将参数名称映射到已保存值的名称的字典。

例如,如果您有一个任务

class SpawnVMTask(task.Task):

    def execute(self, vm_name, vm_image_id, **kwargs):
        pass  # TODO(imelnikov): use parameters to spawn vm

并且您使用 'name' 键在存储中保存了 'vm_name',您可以像这样使用这样的 'name' 启动虚拟机

SpawnVMTask(rebind={'vm_name': 'name'})

第二种方法是传递一个元组/列表/字典的参数名称。元组/列表/字典的长度不应小于必需参数的数量。

例如,您可以像前面的示例一样实现相同的效果,使用

SpawnVMTask(rebind_args=('name', 'vm_image_id'))

这等效于一个更详细的

SpawnVMTask(rebind=dict(vm_name='name',
                        vm_image_id='vm_image_id'))

在两种情况下,如果您的任务(或重试)使用 **kwargs 构造函数接受任意参数,您可以指定额外的参数。

SpawnVMTask(rebind=('name', 'vm_image_id', 'admin_key_name'))

当此类任务即将执行时,将从存储中获取 namevm_image_idadmin_key_name 值,并将 name 中的值作为 vm_name 传递给 execute() 方法,将 vm_image_id 中的值作为 vm_image_id 传递,并将 admin_key_name 中的值作为 admin_key_name 参数传递到 kwargs 中。

手动指定需求

原因: 通常,手动指定任务的需求很有用,无论是通过任务作者还是通过流程作者(允许流程作者覆盖任务需求)。

为此,在创建任务时使用构造函数来指定手动需求。这些手动需求(如果它们不是函数参数)将出现在 execute() 方法的 kwargs 中。

>>> class Cat(task.Task):
...     def __init__(self, **kwargs):
...         if 'requires' not in kwargs:
...             kwargs['requires'] = ("food", "milk")
...         super(Cat, self).__init__(**kwargs)
...     def execute(self, food, **kwargs):
...         pass
...
>>> cat = Cat()
>>> sorted(cat.requires)
['food', 'milk']

在构造任务实例时,流程作者还可以根据需要添加更多需求。这些手动需求(如果它们不是函数参数)将出现在 execute() 方法的 kwargs 参数中。

>>> class Dog(task.Task):
...     def execute(self, food, **kwargs):
...         pass
>>> dog = Dog(requires=("water", "grass"))
>>> sorted(dog.requires)
['food', 'grass', 'water']

如果流程作者希望关闭参数推断并手动覆盖需求,可以使用此功能,但请谨慎使用,因为您必须小心避免无效的参数映射。

>>> class Bird(task.Task):
...     def execute(self, food, **kwargs):
...         pass
>>> bird = Bird(requires=("food", "water", "grass"), auto_extract=False)
>>> sorted(bird.requires)
['food', 'grass', 'water']

结果规范

在 python 中,函数结果没有命名,因此我们无法推断任务/重试返回的内容。这很重要,因为完整的结果(任务 execute() 或重试 execute() 方法返回的内容)保存在(可能持久化的)存储中,通常(但不总是)希望使这些结果可供其他人使用。为此,任务/重试通过其 provides 构造函数参数或其默认 provides 属性指定这些值的名称。

示例

返回单个值

如果任务只返回一个值,provides 应该是一个字符串 - 该值的名称。

>>> class TheAnswerReturningTask(task.Task):
...    def execute(self):
...        return 42
...
>>> sorted(TheAnswerReturningTask(provides='the_answer').provides)
['the_answer']

返回一个元组

对于返回多个值的任务,一种选择(如 python 中通常那样)是通过 tuple 返回这些值。

class BitsAndPiecesTask(task.Task):
    def execute(self):
        return 'BITs', 'PIECEs'

然后,您可以将值传递给 provides 参数作为元组或列表,以赋予每个值单独的名称

BitsAndPiecesTask(provides=('bits', 'pieces'))

执行此类任务后,您(以及引擎,这对于其他任务很有用)将能够通过名称从存储中获取这些元素

>>> storage.fetch('bits')
'BITs'
>>> storage.fetch('pieces')
'PIECEs'

Provides 参数可以短于任务实际返回的元组 - 然后额外的数值将被忽略(但是,正如预期的那样,所有这些值都将被保存并传递给任务 revert() 或重试 revert() 方法)。

注意

Provides 参数元组也可以长于任务实际返回的元组 - 当发生这种情况时,额外的参数将未定义:会在日志中打印警告,如果尝试使用此类参数,则会引发 NotFound 异常。

返回一个字典

另一种选择是以字典(也称为 dict)的形式返回多个值。

class BitsAndPiecesTask(task.Task):

    def execute(self):
        return {
            'bits': 'BITs',
            'pieces': 'PIECEs'
        }

TaskFlow 期望如果 provides 参数是一个 set,则返回一个字典

BitsAndPiecesTask(provides=set(['bits', 'pieces']))

执行此类任务后,您(以及引擎,这对于其他任务很有用)将能够通过名称从存储中获取元素

>>> storage.fetch('bits')
'BITs'
>>> storage.fetch('pieces')
'PIECEs'

注意

如果任务返回的字典中的某些项目不存在于 provides 参数中 - 那么额外的数值将被忽略(但是,当然,保存并传递给 revert() 方法)。如果 provides 参数包含任务实际返回的字典中不存在的一些项目 - 那么额外的参数将未定义:会在日志中打印警告,如果尝试使用此类参数,则会引发 NotFound 异常。

默认 provides

如上所述,默认基类不提供任何内容,这意味着结果对流程中的其他任务/重试不可用。

作者可以使用 default_provides 类/实例变量覆盖此设置并指定 provides 的默认值

class BitsAndPiecesTask(task.Task):
    default_provides = ('bits', 'pieces')
    def execute(self):
        return 'BITs', 'PIECEs'

当然,流程作者可以覆盖此设置以更改名称(如果需要)

BitsAndPiecesTask(provides=('b', 'p'))

或更改结构 - 例如,此实例将使其他任务可以通过名称 'bnp' 访问元组

BitsAndPiecesTask(provides='bnp')

或者流程作者可能希望恢复默认行为并隐藏任务的结果,以免其他任务在流程中发生命名冲突

BitsAndPiecesTask(provides=())

重试参数

要撤销任务,引擎 会调用任务的 revert() 方法。此方法应接受与任务的 execute() 方法相同的参数,以及一个名为 result 的特殊关键字参数。

对于 result 值,有两种情况

  • 如果由于任务失败(从其 execute() 方法引发了异常)而撤销任务,则 result 值是 Failure 对象的实例,该对象保存异常信息。

  • 如果由于其他任务失败而撤销任务,并且此任务已成功完成,则 result 值是从存储中获取的结果:即 execute() 方法返回的内容。

所有其他参数以与 execute() 方法相同的方式从存储中获取。

要确定任务是否失败,您可以检查 result 是否是 Failure 的实例

from taskflow.types import failure

class RevertingTask(task.Task):

    def execute(self, spam, eggs):
        return do_something(spam, eggs)

    def revert(self, result, spam, eggs):
        if isinstance(result, failure.Failure):
            print("This task failed, exception: %s"
                  % result.exception_str)
        else:
            print("do_something returned %r" % result)

如果此任务失败(即 do_something 引发了异常),它将在撤销时打印 "This task failed, exception:" 和异常消息。如果此任务已成功完成,它将打印 "do_something returned"do_something 结果的表示形式。

重试参数

一个 Retry 控制器处理参数的方式与一个 Task 相同。但它有一个额外的参数 'history',它本身是一个 History 对象,其中包含所有引擎尝试失败的内容(即结果)。历史对象可以看作一个元组,包含先前重试运行的结果以及一个表/字典,其中每个键是失败的原子名称,每个值是一个 Failure 对象。

考虑以下实现

class MyRetry(retry.Retry):

    default_provides = 'value'

    def on_failure(self, history, *args, **kwargs):
        print(list(history))
        return RETRY

    def execute(self, history, *args, **kwargs):
        print(list(history))
        return 5

    def revert(self, history, *args, **kwargs):
        print(list(history))

假设上述重试返回了一个值 '5',然后某个任务 'A' 发生了某个异常。在这种情况下,on_failure 方法将收到以下历史记录(打印为列表)

[('5', {'A': failure.Failure()})]

此时(由于实现返回了 RETRY),execute() 方法将被再次调用,它将收到相同的历史记录,然后可以返回一个值,供后续任务用来改变其行为。

如果相反,execute() 方法本身引发了异常,则将调用实现的 revert() 方法,并且一个 Failure 对象将出现在历史对象中,而不是通常的结果。

注意

Retry 被撤销后,对象历史将被清除。