原子、任务和重试¶
原子¶
一个 atom 是 TaskFlow 中的最小单元,作为其他类的基础(它的命名灵感来自这种类型与 原子 在物理世界中的相似之处)。原子有一个名称和版本。原子期望命名所需的输入值(需求)和命名输出(提供的值)。
注意
有关原子输入和输出的更多详细信息,请访问 参数和结果。
- class taskflow.atom.Atom(name=None, provides=None, requires=None, auto_extract=True, rebind=None, inject=None, ignore_list=None, revert_rebind=None, revert_requires=None)[source]¶
基类:
object一个导致流程进展(以某种方式)的工作单元。
原子是一个命名的对象,它使用输入数据执行一些操作,从而推进流程的整体进展。它通常也会产生一些自己的命名输出作为此过程的结果。
- 参数:
name – 此原子的有意义的名称,应该是可区分且易于理解的,用于通知、调试、存储和任何其他类似目的。
provides – 此原子将提供(或可能提供)给其他原子的一组、字符串或项目列表,用于关联和关联此原子产生的任何内容(如果它确实产生任何内容)。
inject – 一个不可变的 input_name => value 字典,它指定应自动注入到原子作用域中的任何初始输入,在原子执行开始之前(这允许提供原子局部值,这些值不需要由其他原子/依赖项提供)。
rebind – 一个键/值对字典,用于定义输入到此原子
execute方法的参数名称转换。revert_rebind – 与
rebind相同,但适用于revert方法。如果未传递,则将使用rebind代替。requires – 此原子
execute方法所需的输入集合或列表。revert_requires – 此原子
revert方法所需的输入集合或列表。如果未传递,则将使用requires。
- 变量:
version – 一个不可变的版本,它将版本信息与此原子关联。在恢复旧版本的原子时,它可能很有用。应应用标准的主版本、次版本概念。
save_as – 一个不可变的输出
resource名称OrderedDict,此原子产生,其他原子可能依赖于此原子提供。格式是输出索引(或当从 execute 方法返回字典时为键)到存储的参数名称。rebind – 一个不可变的输入
resourceOrderedDict,可用于更改提供给此原子的输入。它通常用于将先前原子的输出映射到此原子期望的名称(以这种方式,这就像将另一个原子的命名空间重新映射到此原子的命名空间)。revert_rebind – 与
rebind相同,但适用于 revert 方法。如果revert方法的签名与execute不同,或者收到的revert_rebind值不同,则此值应与rebind不同。inject – 请参阅参数
inject。Atom.name – 请参阅参数
name。optional – 一个
OrderedSet,表示此原子执行execute所需的输入是可选的。revert_optional –
optional的revert版本。provides – 一个
OrderedSet,表示此原子产生的输出。
- priority = 0¶
一个数字优先级,此类的实例将在运行时具有该优先级,用于在有多个并行候选执行和/或撤销时。在这种情况下,候选列表将根据此优先级属性进行稳定排序,这将导致优先级较高的原子先执行(或撤销)(较高定义为大于优先级较低原子的数字)。默认情况下,所有原子都具有相同的优先级(零)。
例如,当以下内容组合到图形中时(其中图中每个节点都是某个任务)
a -> b b -> c b -> e b -> f
当
b完成时,然后将有三个候选可以运行(c, e, f),它们可以以任何顺序运行。此优先级所做的是在提交它们进行处理之前,根据它们的优先级对这三个进行排序(因此,与其说是随机运行顺序,现在它们将按排序顺序运行)。这同样适用于撤销(即,排序顺序将用于确定提交顺序)。
- requires¶
一个
OrderedSet,表示此原子执行其功能所需的输入。
- pre_execute()[source]¶
在执行原子之前要运行的代码。
初始化系统状态的常见模式是在所有原子继承的基类中定义一些代码。在该类中,您可以定义一个
pre_execute方法,它始终会在原子运行之前调用。
- abstract execute(*args, **kwargs)[source]¶
激活给定的原子,它将执行一些操作并返回。
此方法可用于使用通过
*args和**kwargs传递的输入需求集来执行操作。此操作可能会产生一些命名的输出/结果作为执行的结果,以便以后撤销(或供其他原子依赖)。注意(harlowja):返回的结果(如果有)应是持久的,以便可以将其传回此原子(尤其是在撤销发生在不同的 python 进程或远程机器上时),并且可以将该结果传输到其他原子(这些原子可能是本地的或远程的)。
- 参数:
args – 原子执行所需的定位参数。
kwargs – 原子执行所需的任何关键字参数。
- post_execute()[source]¶
在执行原子后要运行的代码。
清理系统全局状态的常见模式是在所有原子继承的基类中定义一些代码。在该类中,您可以定义一个
post_execute方法,它始终会在原子执行后调用,无论它们是否成功。如果需要清理全局共享数据库会话,则此模式很有用。
- pre_revert()[source]¶
在撤销原子之前要运行的代码。
这与
pre_execute()相同,但适用于撤销阶段。
- revert(*args, **kwargs)[source]¶
撤销此原子。
此方法应使用
execute()方法的结果和触发流程中原子包含的撤销的失败信息来撤消原子先前执行的任何副作用。- 参数:
args – 原子执行所需的定位参数。
kwargs – 原子执行所需的任何关键字参数;特殊键
'result'将包含execute()结果(如果有),并且**kwargs键'flow_failures'将包含任何失败信息。
- post_revert()[source]¶
在撤销原子后要运行的代码。
这与
post_execute()相同,但适用于撤销阶段。
任务¶
一个 task(从原子派生)是一个工作单元,可以与其执行和回滚序列相关联(它们几乎类似于函数)。您的任务对象应全部从 Task 派生,该类定义了任务在属性和方法方面必须提供的内容。
例如
当前提供的任务子类类型包括:
Task:适用于继承和创建自己的子类。FunctorTask:适用于将现有函数包装到任务对象中。
注意
FunctorTask 任务类型当前不能与 基于工作者的引擎 一起使用,因为无法保证在工作节点上正确找到任意函数(尤其是 lambda 或匿名函数)。
重试¶
一个 retry(从原子派生)是一个特殊的单位工作,用于处理错误、控制流程执行,并且可以(例如)使用其他参数重试其他原子(如果需要)。当关联的原子失败时,这些重试单元将被咨询以确定应采取的解决方案策略。目标是,通过这种咨询,重试原子将建议一种解决失败的策略(也许通过重试、撤销单个原子或撤销与重试关联的 作用域 中的所有内容)。
当前 retry 基类的派生类必须提供一个 on_failure() 方法来确定如何处理失败。可以从 on_failure() 方法返回的当前枚举如下:
- class taskflow.retry.Decision(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
基础:
StrEnum决策结果/策略枚举。
- REVERT = 'REVERT'¶
仅撤销周围/关联的子流程。
此策略首先咨询父原子,然后再撤销关联的子流程,以确定父重试对象是否提供了不同的调解策略。这允许安全地嵌套具有不同重试策略的流程。
如果父流程没有重试策略,默认行为是仅撤销关联子流程中的原子。通常这不是期望的行为,但为了保持向后兼容性而保留为默认值。引擎选项
defer_reverts将允许您更改此行为。如果将其设置为 True,REVERT 将始终推迟到父流程,这意味着如果父流程没有重试策略,它也将被撤销。
- REVERT_ALL = 'REVERT_ALL'¶
撤销整个流程,无论父策略如何。
此策略将撤销迄今为止已执行的每个原子,无论父流程是否与其关联单独的重试策略。
- RETRY = 'RETRY'¶
再次重试周围/关联的子流程。
为了帮助进行调解过程,retry 基类还强制执行 execute() 和 revert() 方法(尽管允许子类将这些方法定义为无操作),这些方法可用于重试原子与运行时执行模型交互(例如,跟踪它被调用的次数,这对于 ForEach 重试子类很有用)。
为了避免重新创建常见的重试模式,提供了以下提供的重试子类
AlwaysRevert:始终撤销子流程。AlwaysRevertAll:始终撤销整个流程。Times:给定次数重试子流程。ForEach:允许为每次发生故障时向子流程原子提供不同的值(使其有可能通过更改子流程原子输入来解决故障)。ParameterizedForEach:与ForEach相同,但从存储中提取值,而不是从ForEach构造函数。
注意
它们类似于异常处理程序,但由于它们能够动态地选择调解策略,因此使其更具能力,这允许这些原子影响后续执行和任何关联原子所需的输入。
影响范围¶
每个重试原子都与一个流程相关联,并且它可以影响该流程中包含的原子(或嵌套流程)如何重试或撤销(使用前面提到的模式和决策枚举)。
例如
在此图中,如果任务 A、B 或 C 失败,将咨询重试控制器 (1),并且重试控制器 (2) 决定将其重试决策委托给重试控制器 (1)。如果重试控制器 (2) 不决定将其重试决策委托给重试控制器 (1),则重试控制器 (1) 将不会注意到任何决策。如果任务 1、2 或 3 失败,则只会咨询重试控制器 (1) 以确定应用于解决相关故障的策略/模式。
用法示例¶
>>> class EchoTask(task.Task):
... def execute(self, *args, **kwargs):
... print(self.name)
... print(args)
... print(kwargs)
...
>>> flow = linear_flow.Flow('f1').add(
... EchoTask('t1'),
... linear_flow.Flow('f2', retry=retry.ForEach(values=['a', 'b', 'c'], name='r1', provides='value')).add(
... EchoTask('t2'),
... EchoTask('t3', requires='value')),
... EchoTask('t4'))
在此示例中,流程 f2 具有重试控制器 r1,它是默认重试控制器 ForEach 的实例,它接受一个值集合并在每次发生故障时遍历此集合。在每次运行中,ForEach 重试都会从集合中返回下一个值,并在集合中没有更多值时停止重试子流程。例如,如果任务 t2 或 t3 失败,则流程 f2 将被撤销,并且重试 r1 将使用给定集合 ['a', 'b', 'c'] 中的下一个值对其进行重试。但是,如果任务 t1 或任务 t4 失败,r1 将不会重试流程,因为任务 t1 和 t4 在流程 f1 中,并且不依赖于重试 r1(因此它们不会在发生故障时咨询 r1)。
>>> class SendMessage(task.Task):
... def execute(self, message):
... print("Sending message: %s" % message)
...
>>> flow = linear_flow.Flow('send_message', retry=retry.Times(5)).add(
... SendMessage('sender'))
在此示例中,send_message 流程将在失败时尝试执行 SendMessage 五次。如果它在第六次失败(如果发生),则会要求任务 REVERT(在此示例中,任务撤销不会导致任何事情发生,但在其他用例中可能会发生)。
>>> class ConnectToServer(task.Task):
... def execute(self, ip):
... print("Connecting to %s" % ip)
...
>>> server_ips = ['192.168.1.1', '192.168.1.2', '192.168.1.3' ]
>>> flow = linear_flow.Flow('send_message',
... retry=retry.ParameterizedForEach(rebind={'values': 'server_ips'},
... provides='ip')).add(
... ConnectToServer(requires=['ip']))
在此示例中,该流程尝试使用可能的 IP 地址列表(也可以使用元组)连接服务器。每次重试都会从列表中返回一个 IP。如果发生故障,它将返回下一个 IP,直到到达最后一个 IP,然后该流程将被撤销。
接口¶
- class taskflow.task.Task(name=None, provides=None, requires=None, auto_extract=True, rebind=None, inject=None, ignore_list=None, revert_rebind=None, revert_requires=None)[source]¶
基础:
Atom定义潜在工作片段的抽象。
预计此潜在工作片段能够包含定义完成该工作可以执行的功能,以及定义撤消/撤销相同工作片段可以执行的功能的方式。
- property notifier¶
内部通知调度器/注册表。
一个通知对象,它将调度与任务内部发出的内部通知相关的事件到侦听器(例如,用于进度状态更新,告诉其他人任务已完成 50%)。
- class taskflow.task.FunctorTask(execute, name=None, provides=None, requires=None, auto_extract=True, rebind=None, revert=None, version=None, inject=None)[source]¶
基础:
Task适配器,用于从可调用对象创建任务。
获取任何可调用对并从中创建一个任务。
注意(harlowja):如果未提供名称,则
execute可调用对象的函数/方法名称将用作名称(不使用revert可调用对象的名称)。
- class taskflow.task.ReduceFunctorTask(functor, requires, name=None, provides=None, auto_extract=True, rebind=None, inject=None)[source]¶
基础:
Task用于通过应用函数来减少列表的通用任务。
此任务模拟 Python 内置
reduce函数的行为。该任务接收一个 functor(lambda 或其他)和一个列表。列表使用任务的requires参数指定。执行时,此任务使用 functor 和列表作为参数调用reduce。从对reduce的调用返回的结果在执行后返回。
- class taskflow.task.MapFunctorTask(functor, requires, name=None, provides=None, auto_extract=True, rebind=None, inject=None)[source]¶
基础:
Task用于将函数映射到列表的通用任务。
此任务模拟 Python 内置
map函数的行为。该任务接收一个 functor(lambda 或其他)和一个列表。列表使用任务的requires参数指定。执行时,此任务使用 functor 和列表作为参数调用map。从对map的调用返回的列表在执行后返回。可以使用
provides参数将返回列表的每个值绑定到单个名称,遵循 taskflow 标准行为。返回列表中保留顺序。
- class taskflow.retry.Retry(name=None, provides=None, requires=None, auto_extract=True, rebind=None)[source]¶
基础:
Atom一个可以决定如何解决执行失败的类。
此抽象基类用于继承并提供不同的策略,这些策略将在执行失败时激活。由于重试对象是一个原子,它还可以提供
execute()和revert()方法,以更改连接原子的输入(取决于要使用的策略,这可能非常有用)。注意(harlowja):
execute()和revert()和on_failure()将自动接收一个history参数,其中包含有关过去决策和结果的信息(如果可用)。- abstract execute(history, *args, **kwargs)[source]¶
执行给定的重试。
此执行激活给定的重试,该重试通常会生成开始或重新启动连接组件所需的数据,使用先前提供的值和先前运行的失败历史记录。
history可以分析历史数据以更改此重试控制器将使用的解析策略。例如,重试可以多次提供相同的值(每次运行后),最新的值或其他变体。旧值将自动保存到重试原子的历史记录中,这是一个元组列表 (result, failures) 被持久化,其中 failures 是一个按任务名称索引的失败字典,result 是在此失败解析尝试期间由此重试返回的执行结果。
- 参数:
args – 重试执行所需的定位参数。
kwargs – 重试执行所需的任何关键字参数。
- class taskflow.retry.History(contents, failure=None)[source]¶
基类:
object简化与重试历史内容交互的助手。
- property failure¶
返回重试自身失败或不存在则返回 None。
- class taskflow.retry.AlwaysRevert(name=None, provides=None, requires=None, auto_extract=True, rebind=None)[source]¶
基类:
Retry始终回滚子流程的重试。
- on_failure(*args, **kwargs)[source]¶
做出关于未来的决定。
此方法通常会使用有关先前失败的信息(如果此历史失败信息不可用或未被持久化,则提供的历史记录将为空)。
返回一个重试常量(以下之一):
RETRY:当控制流必须撤销并再次重新启动时(例如使用新参数)。REVERT:当此控制流必须完全撤销,并且父流程(如果有)应决定进一步的流程执行时。REVERT_ALL:当此控制流和父流程(如果有)必须撤销并标记为FAILURE时。
- execute(*args, **kwargs)[source]¶
执行给定的重试。
此执行激活给定的重试,该重试通常会生成开始或重新启动连接组件所需的数据,使用先前提供的值和先前运行的失败历史记录。
history可以分析历史数据以更改此重试控制器将使用的解析策略。例如,重试可以多次提供相同的值(每次运行后),最新的值或其他变体。旧值将自动保存到重试原子的历史记录中,这是一个元组列表 (result, failures) 被持久化,其中 failures 是一个按任务名称索引的失败字典,result 是在此失败解析尝试期间由此重试返回的执行结果。
- 参数:
args – 重试执行所需的定位参数。
kwargs – 重试执行所需的任何关键字参数。
- class taskflow.retry.AlwaysRevertAll(name=None, provides=None, requires=None, auto_extract=True, rebind=None)[source]¶
基类:
Retry始终回滚整个流程的重试。
- on_failure(**kwargs)[source]¶
做出关于未来的决定。
此方法通常会使用有关先前失败的信息(如果此历史失败信息不可用或未被持久化,则提供的历史记录将为空)。
返回一个重试常量(以下之一):
RETRY:当控制流必须撤销并再次重新启动时(例如使用新参数)。REVERT:当此控制流必须完全撤销,并且父流程(如果有)应决定进一步的流程执行时。REVERT_ALL:当此控制流和父流程(如果有)必须撤销并标记为FAILURE时。
- execute(**kwargs)[source]¶
执行给定的重试。
此执行激活给定的重试,该重试通常会生成开始或重新启动连接组件所需的数据,使用先前提供的值和先前运行的失败历史记录。
history可以分析历史数据以更改此重试控制器将使用的解析策略。例如,重试可以多次提供相同的值(每次运行后),最新的值或其他变体。旧值将自动保存到重试原子的历史记录中,这是一个元组列表 (result, failures) 被持久化,其中 failures 是一个按任务名称索引的失败字典,result 是在此失败解析尝试期间由此重试返回的执行结果。
- 参数:
args – 重试执行所需的定位参数。
kwargs – 重试执行所需的任何关键字参数。
- class taskflow.retry.Times(attempts=1, name=None, provides=None, requires=None, auto_extract=True, rebind=None, revert_all=False)[source]¶
基类:
Retry给定次数重试子流程。返回尝试次数。
- 参数:
attempts (int) – 重试相关子流程之前的尝试次数
revert_all (bool) – 如果提供,当尝试次数达到上限时,将回滚整个流程(如果为 false,则仅回滚相关子流程)。
其他参数的解释与
Atom构造函数中定义的相同。- on_failure(history, *args, **kwargs)[source]¶
做出关于未来的决定。
此方法通常会使用有关先前失败的信息(如果此历史失败信息不可用或未被持久化,则提供的历史记录将为空)。
返回一个重试常量(以下之一):
RETRY:当控制流必须撤销并再次重新启动时(例如使用新参数)。REVERT:当此控制流必须完全撤销,并且父流程(如果有)应决定进一步的流程执行时。REVERT_ALL:当此控制流和父流程(如果有)必须撤销并标记为FAILURE时。
- execute(history, *args, **kwargs)[source]¶
执行给定的重试。
此执行激活给定的重试,该重试通常会生成开始或重新启动连接组件所需的数据,使用先前提供的值和先前运行的失败历史记录。
history可以分析历史数据以更改此重试控制器将使用的解析策略。例如,重试可以多次提供相同的值(每次运行后),最新的值或其他变体。旧值将自动保存到重试原子的历史记录中,这是一个元组列表 (result, failures) 被持久化,其中 failures 是一个按任务名称索引的失败字典,result 是在此失败解析尝试期间由此重试返回的执行结果。
- 参数:
args – 重试执行所需的定位参数。
kwargs – 重试执行所需的任何关键字参数。
- class taskflow.retry.ForEach(values, name=None, provides=None, requires=None, auto_extract=True, rebind=None, revert_all=False)[source]¶
基类:
ForEachBase应用提供的静态策略集合。
在构造时接受决策策略集合,并在每次尝试时返回该集合的下一个元素。
- 参数:
values (list) – 要迭代并作为此函数
execute()方法的结果提供给流程中其他原子的值集合,其他依赖原子可以消耗这些值(例如,以更改它们自己的行为)。revert_all (bool) – 如果提供,当尝试次数达到上限时,将回滚整个流程(如果为 false,则仅回滚相关子流程)。
其他参数的解释与
Atom构造函数中定义的相同。- on_failure(history, *args, **kwargs)[source]¶
做出关于未来的决定。
此方法通常会使用有关先前失败的信息(如果此历史失败信息不可用或未被持久化,则提供的历史记录将为空)。
返回一个重试常量(以下之一):
RETRY:当控制流必须撤销并再次重新启动时(例如使用新参数)。REVERT:当此控制流必须完全撤销,并且父流程(如果有)应决定进一步的流程执行时。REVERT_ALL:当此控制流和父流程(如果有)必须撤销并标记为FAILURE时。
- execute(history, *args, **kwargs)[source]¶
执行给定的重试。
此执行激活给定的重试,该重试通常会生成开始或重新启动连接组件所需的数据,使用先前提供的值和先前运行的失败历史记录。
history可以分析历史数据以更改此重试控制器将使用的解析策略。例如,重试可以多次提供相同的值(每次运行后),最新的值或其他变体。旧值将自动保存到重试原子的历史记录中,这是一个元组列表 (result, failures) 被持久化,其中 failures 是一个按任务名称索引的失败字典,result 是在此失败解析尝试期间由此重试返回的执行结果。
- 参数:
args – 重试执行所需的定位参数。
kwargs – 重试执行所需的任何关键字参数。
- class taskflow.retry.ParameterizedForEach(name=None, provides=None, requires=None, auto_extract=True, rebind=None, revert_all=False)[source]¶
基类:
ForEachBase应用动态提供的策略集合。
从前置节点(或存储)接受策略集合作为参数,并在每次尝试时返回该集合的下一个元素。
- 参数:
revert_all (bool) – 如果提供,当尝试次数达到上限时,将回滚整个流程(如果为 false,则仅回滚相关子流程)。
其他参数的解释与
Atom构造函数中定义的相同。- on_failure(values, history, *args, **kwargs)[source]¶
做出关于未来的决定。
此方法通常会使用有关先前失败的信息(如果此历史失败信息不可用或未被持久化,则提供的历史记录将为空)。
返回一个重试常量(以下之一):
RETRY:当控制流必须撤销并再次重新启动时(例如使用新参数)。REVERT:当此控制流必须完全撤销,并且父流程(如果有)应决定进一步的流程执行时。REVERT_ALL:当此控制流和父流程(如果有)必须撤销并标记为FAILURE时。
- execute(values, history, *args, **kwargs)[source]¶
执行给定的重试。
此执行激活给定的重试,该重试通常会生成开始或重新启动连接组件所需的数据,使用先前提供的值和先前运行的失败历史记录。
history可以分析历史数据以更改此重试控制器将使用的解析策略。例如,重试可以多次提供相同的值(每次运行后),最新的值或其他变体。旧值将自动保存到重试原子的历史记录中,这是一个元组列表 (result, failures) 被持久化,其中 failures 是一个按任务名称索引的失败字典,result 是在此失败解析尝试期间由此重试返回的执行结果。
- 参数:
args – 重试执行所需的定位参数。
kwargs – 重试执行所需的任何关键字参数。
层级¶
