Engines

概述

Engines 是真正运行您的 atoms 的组件。

一个 engine 接收流程结构(由 patterns 描述),并使用它来决定何时运行哪个 atom

TaskFlow 提供了不同实现的 engines。有些可能更易于使用(例如,不需要额外的基础设施设置)和理解;而另一些可能需要更复杂的设置,但提供更好的可扩展性。理念和理想是,使用 TaskFlow 的服务部署者或开发者可以选择最适合其设置的 engine,而无需修改该服务代码。

注意

Engines 通常具有不同的功能和配置,但它们都必须实现相同的接口并保留 patterns 的语义(例如,linear_flow.Flow 的一部分按顺序一个接一个地运行,即使所选的 engine 能够并行运行任务)。

它们存在的原因

Engine 作为实际推进流程的核心组件,对于许多程序员来说可能是一个新概念,因此让我们更深入地描述它的运作方式以及它存在的一些原因。希望这能更清楚地说明它们为 TaskFlow 库用户带来的价值。

首先,让我们讨论一下大多数人已经熟悉的事情;声明式命令式 编程模型之间的区别。命令式模型涉及建立完成程序操作的语句(可能使用条件和其他语言特性来做到这一点)。这种程序将 如何 实现目标与定义 是什么 目标结合在一起(并且这些语句在执行时维护着这种状态)。相反,声明式模型将 如何 实现目标与 是什么 目标分开,只声明预期的目标,而不声明 如何 实现。在 TaskFlow 术语中,是什么 是流程的结构以及流程中的任务和其他 atoms,但 如何 没有定义(界限变得模糊,因为任务本身包含命令式代码,但现在将任务视为更像一个 函数,它执行、撤销并可能需要输入和提供输出)。这就是 engines 发挥作用的地方;它们执行通过 atoms、任务、流程以及其中定义的关联关系定义的 是什么,并以明确的方式执行这些操作(并且 engine 负责任何状态操作)。

这种命令式和声明式相结合(更强调声明式模型)使得以下功能成为可能

  • 增强可靠性:将状态更改与应该完成的内容分离,允许以一种自然的方式恢复,允许 engine 跟踪当前状态并知道工作流程处于哪个点以及如何恢复到该状态。

  • 增强可扩展性:当 engine 负责执行所需的工作时,可以通过创建新型执行后端(例如 worker 模型,该模型不本地执行)来改变 如何 执行。如果没有 是什么如何 的分离,就无法提供此功能(因为从这种耦合的本质来看,这种功能本质上很难提供)。

  • 增强一致性:由于 engine 负责执行 atoms 和相关的 workflow,因此它可以是保持执行模型处于一致状态的一个(如果不是唯一一个)主要实体。结合 atoms 应该是不可变的并且具有有限(如果有的话)的内部状态,可以大大提高推理和获得一致性的能力。

    • 借助未来的锁定功能(使用 tooz 来帮助),engines 还可以帮助确保任务访问的资源可靠地获得和修改。这将有助于确保其他进程、线程或其他类型的实体不会执行操作这些相同资源的任务(进一步提高一致性)。

当然,这些功能可能会带来一些缺点

  • 分离 如何是什么 的缺点是,必须开始远离命令式模型,在该模型中函数控制和操作状态(这可能对习惯于命令式模型的程序员来说是一种思维方式的转变)。我们通过创建和鼓励使用 persistence 来减轻这种担忧,以帮助实现状态并通过参数输入和输出机制传输状态。

  • 根据存在的命令式代码量(以及该代码中的状态),可能需要对该代码进行重大的重构和转换或重构为这些新概念。我们通过允许您拥有内部使用常规 Python 代码(并且内部可以以命令式风格编写)的任务以及通过提供 examples 来展示如何使用这些概念来提供帮助。

  • 分离 是什么如何 的另一个缺点是,使用传统技术调试故障可能变得更加困难(尤其是在涉及远程 worker 时)。我们通过使跟踪、监视和内省 engine 内部的操作和状态更改变得容易来提供帮助(有关如何使用这些功能,请参阅 notifications)。

创建

所有 engines 都是实现相同接口的类,当然可以像 Python 中的任何类一样导入和创建实例。但是,创建 engine 的更简单(推荐)方法是使用 engine 辅助函数。所有这些函数都导入到 taskflow.engines 模块命名空间中,因此这些函数的典型用法如下

from taskflow import engines

...
flow = make_flow()
eng = engines.load(flow, engine='serial', backend=my_persistence_conf)
eng.run()
...
taskflow.engines.helpers.load(flow, store=None, flow_detail=None, book=None, backend=None, namespace='taskflow.engines', engine='default', **kwargs)[source]

将流程加载到 engine 中。

此函数创建并准备 engine 以运行提供的流程。在此函数返回后,只需使用 engine 的 run() 方法运行 engine 即可。

要加载哪个 engine 由 engine 参数指定。它可以是命名要使用的 engine 类型的字符串,也可以是 URI,其 scheme 命名 engine 类型,URI 的主机、端口和查询参数包含进一步的选项…

要使用的存储后端由 backend 参数定义。它可以是后端本身,也可以传递给 fetch() 以获取可行后端的字典。

参数:
  • flow – 要加载的流程

  • store – dict – 要放入存储以满足流程要求的的数据

  • flow_detail – 包含流程状态的 FlowDetail(如果未提供,则会在提供的后端中为您创建一个)

  • book – 用于在 flow_detail 为 None 时创建 flow detail 的 LogBook

  • backend – 要使用的存储后端或定义它的配置

  • namespace – stevedore 的驱动程序命名空间(或默认命名空间的空字符串)

  • engine – engine 类型或包含 engine 类型和任何 URI 特定组件的 URI 字符串,这些组件将成为 engine 选项的一部分。

  • kwargs – 作为选项传递的任意关键字参数(与提取的 engine 合并),通常用于不适合任何现有参数的任何 engine 特定选项。

返回值:

engine

taskflow.engines.helpers.run(flow, store=None, flow_detail=None, book=None, backend=None, namespace='taskflow.engines', engine='default', **kwargs)[source]

运行流程。

此函数使用 load() 函数将流程加载到 engine 中并运行 engine。

参数的解释与 load() 相同。

返回值:

所有命名结果的字典(请参阅 fetch_all()

taskflow.engines.helpers.save_factory_details(flow_detail, flow_factory, factory_args, factory_kwargs, backend=None)[source]

将给定的工厂的可重新导入属性保存到流程详细信息中。

此函数将工厂名称、参数和关键字参数保存到给定的流程详细信息对象中,如果提供了后端,它还会确保后端在更新后保存流程详细信息。

参数:
  • flow_detail – 包含流程状态的 FlowDetail

  • flow_factory – 函数或字符串:创建流程的函数

  • factory_args – 工厂位置参数的列表或元组

  • factory_kwargs – 工厂关键字参数的字典

  • backend – 要使用的存储后端或配置

taskflow.engines.helpers.load_from_factory(flow_factory, factory_args=None, factory_kwargs=None, store=None, book=None, backend=None, namespace='taskflow.engines', engine='default', **kwargs)[source]

从工厂函数加载流程到 engine 中。

获取流程工厂函数(或其名称)并使用它创建流程。然后,流程使用 load() 函数加载到 engine 中,并且工厂函数的完全限定名称保存到流程元数据中,以便以后可以恢复它。

参数:
  • flow_factory – 函数或字符串:创建流程的函数

  • factory_args – 工厂位置参数的列表或元组

  • factory_kwargs – 工厂关键字参数的字典

进一步的参数的解释与 load() 相同。

返回值:

engine

taskflow.engines.helpers.flow_from_detail(flow_detail)[source]

重新加载之前保存的流程。

从流程详情元数据中获取流程工厂的名称和任何参数和关键字参数,然后调用该工厂以重新创建流程。

参数:

flow_detail – 包含流程状态的 FlowDetail

taskflow.engines.helpers.load_from_detail(flow_detail, store=None, backend=None, namespace='taskflow.engines', engine='default', **kwargs)[source]

重新加载之前保存的引擎。

这使用 flow_from_detail() 函数重新加载流程,然后调用 load() 函数从该流程创建引擎。

参数:

flow_detail – 包含流程状态的 FlowDetail

进一步的参数的解释与 load() 相同。

返回值:

engine

用法

要选择要使用的引擎并将参数传递给引擎,应使用任何引擎辅助函数接受的 engine 参数,并使用 kwargs 参数来使用任何引擎特定的选项。

类型

串行

引擎类型'serial'

在单个线程上运行所有任务——调用 run() 的同一线程。

注意

默认情况下使用此引擎。

提示

如果使用 eventlet,则此引擎不会阻止其他线程运行,因为 eventlet 会自动创建一个隐式协程系统(使用 greenthreads 和 monkey patching)。有关更多详细信息,请参阅 eventletgreenlet

并行

引擎类型'parallel'

并行引擎将任务调度到不同的线程/进程,以便同时运行不相关的任务。有关可以使用并行引擎构建的受支持参数,请参阅 ParallelActionEngine 的文档。

提示

在引擎实例之间共享执行器可以提高可扩展性,通过减少线程/进程的创建和销毁以及重用现有池(这通常是一个好习惯)。

警告

使用 进程池执行器运行任务是**实验性的**。这主要是由于 futures backportmultiprocessing 模块在旧版本的 python 中不如最新版本(包含重要的修复程序,例如 4892672192051628422393 等)最新版本(这些版本本身也有各种正在进行/最近的错误)。

Workers

引擎类型'worker-based''workers'

注意

由于此引擎比其他引擎复杂(且不同),因此我们认为有必要专门编写一个文档 章节 来介绍它。

它们如何运行

为了了解引擎在运行时所经历的一般过程,让我们将其分解并描述其中一种引擎类型在执行时的操作(我们将查看 ActionEngine 引擎类型)。

创建

首先,用户为给定的流程创建引擎,提供流程详情(结果将保存到提供的 持久性 后端)。这通常通过上述方法完成。此时,引擎现在将拥有对流程和后端以及其他内部变量的引用,并已设置好。

编译

在此阶段(参见 compile()),流程将转换为内部图表示形式,使用编译器(模式的默认实现是 PatternCompiler)。此类将流程对象和包含的原子编译/转换为 networkx 有向图(和树结构),其中包含流程中定义的原子以及任何嵌套流程和原子,以及由应用程序的不同流程模式创建的约束。此图(和树)将在引擎执行期间进行分析和遍历。在此阶段,还会创建一些辅助对象并保存到内部引擎变量中(这些对象有助于原子执行、图分析和执行其他内部引擎活动)。在本阶段结束时,将创建一个 Runtime 对象,该对象包含对所有必需的运行时组件的引用,并调用其 compile() 方法以编译常用执行辅助对象的缓存。

准备工作

此阶段(参见 prepare())首先设置编译图中的所有原子的存储,确保为图中的每个节点创建相应的 AtomDetail(或子类)。

验证

此阶段(参见 validate())执行编译(现在已准备好存储)引擎的任何最终验证。它比较了启动执行所需的条件以及当前提供的内容或将来将产生的内容。如果任何原子要求不满足(没有已知的当前提供者或未来的生产者),则不允许继续执行。

执行

先前创建的图(和辅助对象)现在用于指导进一步的执行(参见 run())。流程被置于 RUNNING 状态,并使用 MachineBuilder 状态机对象和运行器对象(使用 automaton 库)构建。然后,该机器和相关的运行器开始接管并开始执行以下阶段(有关更直观的图表/表示,请参见 引擎状态图)。

注意

引擎将尊重流程施加的约束。例如,如果引擎正在执行 Flow,则它受到依赖图的约束,在这种情况下是线性的,因此,如果希望实现并发,使用并行引擎可能不会带来任何好处。

恢复

第一阶段是分析图中的任务的 状态,确定哪些任务失败,哪些任务先前正在运行,并确定该任务的意图现在应该是什么(通常意图可以是它应该 REVERT,或者它应该 EXECUTE,或者它应该被 IGNORED)。通过查看任务详情对象中该任务的状态以及分析图的边缘(例如重试原子,它可以影响任务的意图)来确定此意图。一旦确定这些意图并与每个任务关联(意图也存储在 AtomDetail 对象中),调度 阶段开始。

调度

此阶段使用 Scheduler 实现选择有资格运行的原子(默认实现会查看它们的意图,检查前置原子是否已运行等等,使用 Selector 辅助对象),并将其提交到先前提供的兼容 执行器 以进行异步执行。此 Scheduler 将返回一个 future 对象,用于调度每个原子;所有这些都收集到一个未完成的 future 列表中。这将结束初始轮次的调度,此时引擎进入 等待 阶段。

等待

在此阶段,引擎等待先前提交的任何 future 对象完成。一旦 future 对象完成(或失败),将检查该原子的结果并使用 Completer 实现进行最终确定。它通常会将结果持久化到提供的持久性后端(保存到相应的 AtomDetailFlowDetail 对象通过 Storage 辅助工具),并反映原子的新状态。此时,通常会发生两种情况,一种是如果原子失败,另一种是如果它成功。如果原子失败,则可以将其设置为新的意图,例如 RETRYREVERT(此失败原子的前置原子也可能更改其意图)。一旦完成此意图调整,就会发生新一轮的 调度,此过程会重复,直到引擎成功或失败(如果运行引擎的进程死亡,则上述阶段将重新启动,并且将发生恢复)。

注意

如果引擎在执行上述阶段时被暂停,这将阻止任何进一步的调度阶段发生,并且所有当前正在执行的工作都将允许完成(参见 暂停)。

完成

此时,使用 MachineBuilder 类构建的机器(和运行器)现在已成功完成、失败或执行被暂停。这三个中的任何一个都会导致流程进入新的状态(通常是 FAILURESUSPENDEDSUCCESSREVERTED)。通知 将关于此最终状态更改(其他状态更改也会发送通知)以及发生的任何失败发送出去。如果没有发生任何失败,则引擎将完成,如果需要,可以使用 持久性 清理为此执行保存的任何详细信息。

特殊情况

暂停

每个引擎都实现了一个 suspend() 方法,该方法可用于外部地(或将来内部地)请求引擎停止 调度 新任务。默认情况下,此操作会将流程状态从 RUNNING 转换为 SUSPENDING 状态(稍后将转换为 SUSPENDED 状态)。由于引擎可能正在远程执行原子(或本地执行原子),并且当前没有抢占,因此发生的情况是引擎的 MachineBuilder 状态机将检测到转换为 SUSPENDING 状态,并且状态机将避免调度新任务(但会允许活动任务继续)。在当前任务完成后,引擎将从 SUSPENDING 转换为 SUSPENDED,并从其 run() 方法返回。

注意

run() 返回时,此时可能(但不一定,具体取决于在调用 suspend() 时处于活动状态的内容)流程中存在未完成的工作,这些工作尚未完成(但可以在稍后的时间点恢复)。

作用域

在创建流程时,了解引擎将内部使用的查找策略(通常也称为 作用域 解析)也很重要。例如,如果任务 A 提供结果 ‘a’,任务 BA 之后提供不同的结果 ‘a’,任务 CAB 之后需要 ‘a’ 才能运行,那么将选择哪一个?

默认策略

当引擎正在执行时,它会内部与 Storage 类交互,而该类又与 ScopeWalker 实例和 Storage 类交互,该类使用以下查找顺序来查找(或失败)原子的需求查找/请求

  1. 瞬态注入的原子特定参数。

  2. 非瞬态注入的原子特定参数。

  3. 瞬态注入的参数(流程特定)。

  4. 非瞬态注入的参数(流程特定)。

  5. 首先访问的提供程序,该提供程序生成命名结果;请注意,如果在同一作用域中找到多个提供程序,则选择第一个(作用域遍历器的生成顺序定义了第一个的含义)生成该结果并且可以提取而不引发错误的提供程序作为请求需求的提供程序。

  6. 如果此时未解析,则会以 NotFound 失败(此异常的 cause 属性可能包含有关查找失败原因的更多详细信息)。

注意

为了在调试时检查此信息,建议启用 BLATHER 日志级别(级别 5)。在此级别下,存储和作用域代码/层将记录正在搜索的内容以及正在找到的内容。

接口

class taskflow.engines.base.Engine(flow, flow_detail, backend, options)[source]

基类: object

所有引擎实现的基础。

变量:
  • Engine.notifier – 一个通知对象,它将分发与引擎包含的流程相关的事件。

  • atom_notifier – 一个通知对象,它将分发与引擎包含的原子相关的事件。

property notifier

流程通知器。

property atom_notifier

原子通知器。

property options

传递给此引擎构造的选项。

abstract property storage

此引擎的存储单元。

abstract property statistics

此引擎收集的运行时统计信息字典。

当引擎从未运行过时,此字典将为空。当它正在运行或之前运行过时,它应该具有(但不一定具有)有用的和/或信息性的键和值,在运行时或完成后。

警告

此字典中的键应该保持相对稳定(不更改),但它们的存在可能在主要版本之间发生变化,因为会收集或删除新的统计信息,因此在访问键之前,请确保它们实际存在并处理它们不存在的情况。

abstract compile()[source]

将包含的流程编译为内部表示形式。

此内部表示形式是引擎将实际用于运行的内容。如果无法完成此编译,则预计会抛出带有指示无法实现编译原因的消息的异常。

abstract reset()[source]

重置回 PENDING 状态。

如果流程之前(来自先前的引擎 run())最终进入 FAILURESUCCESSREVERTED 状态(或由于某种原因它最终进入了中间状态),那么使其能够再次运行可能是可取的。调用此方法可以实现这一点(而不会导致状态转换失败,这通常会在直接调用 run() 而不进行重置的情况下发生)。

abstract prepare()[source]

执行任何运行前,但编译后的操作。

注意(harlowja):在准备期间,当前假定底层存储将被初始化,原子将被重置,并且引擎将进入 PENDING 状态。

abstract validate()[source]

执行任何运行前,准备后验证操作。

注意(harlowja):在验证期间,将验证所有最终依赖项并确保它们。默认情况下,这将检查所有原子是否具有可满足的需求(由其他提供程序满足)。

abstract run()[source]

运行引擎中的流程直到完成(或尽力而为)。

abstract suspend()[source]

尝试暂停引擎。

如果引擎当前正在运行原子,则它将尝试防止启动未来的工作(当前活动的原子当前无法抢占),并将引擎移动到可以稍后从恢复的暂停状态。

实现

class taskflow.engines.action_engine.engine.ActionEngine(flow, flow_detail, backend, options)[source]

基类: Engine

通用的基于动作的引擎。

此引擎将流程(和任何子流程)编译为编译单元,其中包含要执行的完整运行时定义,然后使用此编译单元与执行器、运行时、机器构建器和存储类结合来尝试运行流程(和任何子流程和包含的原子)直到完成。

注意(harlowja):在此过程中,任务或多个任务在执行图中失败是允许的并且有效的(甚至可以同时发生),这将导致恢复或重试过程开始。请参阅 states 模块中的有效状态,以了解正在运行的任务和流程可以进入的其他状态。

引擎选项

名称/键

描述

类型

默认值

defer_reverts

此选项允许您安全地将具有重试的流程嵌套在没有重试的流程中,并且它仍然表现为用户期望的方式(例如,如果重试耗尽,它将恢复外部流程,除非外部流程具有单独的重试行为)。

bool

False

never_resolve

当为 true 时,引擎将跳过恢复并中止,而不是恢复和/或重试,而不是恢复和尝试解决原子失败。

bool

False

inject_transient

当为 true 时,局部于每个原子作用域的值将被注入到存储中的瞬态位置(通常是局部字典),当为 false 时,这些值将被持久保存到原子详细信息中(并以非瞬态方式保存)。

bool

True

NO_RERAISING_STATES = frozenset({'SUCCESS', 'SUSPENDED'})

如果引擎停止在这些状态,则不会导致任何潜在的故障被重新引发的状态。不在此列表中的状态将导致任何捕获到的故障(如果有)被重新引发。

IGNORABLE_STATES = frozenset({'ANALYZING', 'GAME_OVER', 'RESUMING', 'SCHEDULING', 'UNDEFINED', 'WAITING'})

此引擎内部机器在运行时产生的有用的信息状态,不适合让引擎记录,但适合通过 run_iter() 提供给最终用户。

MAX_MACHINE_STATES_RETAINED = 10

run_iter() 期间,将记录最后 X 个状态机转换(通常仅在失败时有用)。

suspend()[source]

尝试暂停引擎。

如果引擎当前正在运行原子,则它将尝试防止启动未来的工作(当前活动的原子当前无法抢占),并将引擎移动到可以稍后从恢复的暂停状态。

property statistics

此引擎收集的运行时统计信息字典。

当引擎从未运行过时,此字典将为空。当它正在运行或之前运行过时,它应该具有(但不一定具有)有用的和/或信息性的键和值,在运行时或完成后。

警告

此字典中的键应该保持相对稳定(不更改),但它们的存在可能在主要版本之间发生变化,因为会收集或删除新的统计信息,因此在访问键之前,请确保它们实际存在并处理它们不存在的情况。

属性 compilation

编译结果。

注意(harlowja):只有在编译完成后才能访问(在编译成功完成之前访问此属性将返回 None)。

storage

此引擎的存储单元。

注意(harlowja):在 compile() 完成后,此存储单元的原子参数查找策略将发生变化(因为只有在编译后才能知道实际结构)。在 compile() 完成之前,原子参数查找策略查找将仅限于注入的参数(这不会反映实际的运行时查找策略,后者通常会不同,但并非总是如此)。

run(timeout=None)[source]

运行引擎(或者尝试运行)。

参数:

timeout – 等待任何原子完成的超时时间(此超时将在等待未完成的原子时使用)。

run_iter(timeout=None)[source]

使用迭代运行引擎(或者尝试运行)。

参数:

timeout – 等待任何原子完成的超时时间(此超时将在等待未完成的原子时使用,在等待状态产生后)。

与以阻塞方式运行到完成不同,这将返回一个生成器,该生成器将返回引擎正在经历的各种状态(可用于使用每个引擎的生成器同时运行多个引擎)。返回的迭代器还响应来自 PEP 0342send() 方法,如果发送了 truthy 值,则会尝试暂停自身(暂停可能会延迟,直到所有活动原子完成)。

注意(harlowja):使用 run_iter 方法不会在执行期间保留引擎锁,因此用户应确保只有一个实体正在使用返回的引擎迭代器(每个引擎一个)。

validate()[source]

执行任何运行前,准备后验证操作。

注意(harlowja):在验证期间,将验证所有最终依赖项并确保它们。默认情况下,这将检查所有原子是否具有可满足的需求(由其他提供程序满足)。

prepare()[source]

执行任何运行前,但编译后的操作。

注意(harlowja):在准备期间,当前假定底层存储将被初始化,原子将被重置,并且引擎将进入 PENDING 状态。

reset()[source]

重置回 PENDING 状态。

如果流程先前由于先前的引擎 run() 结束于 FAILURESUCCESSREVERTED 状态(或者由于某种原因结束于中间状态),那么使其能够再次运行可能是有用的。调用此方法可以实现这一点(而不会导致状态转换失败,如果直接调用 run() 而不进行重置,通常会发生这种情况)。

compile()[source]

将包含的流程编译为内部表示形式。

此内部表示形式是引擎将实际用于运行的内容。如果无法完成此编译,则预计会抛出带有指示无法实现编译原因的消息的异常。

taskflow.engines.action_engine.engine.SerialActionEngine(flow, flow_detail, backend, options)[source]

基类: ActionEngine

以串行方式运行任务的引擎。

taskflow.engines.action_engine.engine.ParallelActionEngine(flow, flow_detail, backend, options)[source]

基类: ActionEngine

以并行方式运行任务的引擎。

其他引擎选项

  • executor:一个实现 PEP 3148 兼容执行器接口的对象;它将用于调度任务。适用以下类型(传递其他未知类型将引发类型错误)。

提供的类型

使用的执行器

concurrent.futures.thread.ThreadPoolExecutor

ParallelThreadTaskExecutor

concurrent.futures.process.ProcessPoolExecutor

ParallelProcessTaskExecutor

concurrent.futures._base.Executor

ParallelThreadTaskExecutor

  • executor:一个字符串,将用于选择 PEP 3148 兼容的执行器;它将用于调度任务。适用以下字符串(传递其他未知字符串将引发值错误)。

字符串(不区分大小写)

使用的执行器

process

ParallelProcessTaskExecutor

processes

ParallelProcessTaskExecutor

thread

ParallelThreadTaskExecutor

threaded

ParallelThreadTaskExecutor

threads

ParallelThreadTaskExecutor

greenthread

ParallelThreadTaskExecutor

(绿色版本)

greedthreaded

ParallelThreadTaskExecutor

(绿色版本)

greenthreads

ParallelThreadTaskExecutor

(绿色版本)

  • max_workers:一个整数,它将影响用于将任务分派到并行工作者的数量(此数字受工作流可以支持的最大并行化限制)。

  • wait_timeout:一个浮点数(以秒为单位),它将影响并行进程任务执行器(因此仅适用于上述执行器是进程变体时)。此数字会影响进程任务执行器等待来自子进程的消息的时间(通常表示它们已完成或失败)。较低的数字将具有较高的粒度,但目前涉及更多的轮询,而较高的数字将涉及较少的轮询,但引擎注意到任务完成的时间较慢。

组件

警告

内部引擎函数、组件和模块的外部使用应保持在最低限度,因为它们可能会被更改、重构或移动到其他位置而无需通知(并且没有典型的弃用周期)。

taskflow.engines.action_engine.builder.MachineMemory[source]

基类: object

状态机内存。

cancel_futures()[source]

尝试取消任何未完成的 future。

taskflow.engines.action_engine.builder.MachineBuilder(runtime, waiter)[source]

基类: object

状态机构建器,为引擎组件提供支持。

注意(harlowja):此构建的状态机(状态和触发转换的事件)由下表表示

+--------------+------------------+------------+----------+---------+
|    Start     |      Event       |    End     | On Enter | On Exit |
+--------------+------------------+------------+----------+---------+
|  ANALYZING   |    completed     | GAME_OVER  |    .     |    .    |
|  ANALYZING   |  schedule_next   | SCHEDULING |    .     |    .    |
|  ANALYZING   |  wait_finished   |  WAITING   |    .     |    .    |
|  FAILURE[$]  |        .         |     .      |    .     |    .    |
|  GAME_OVER   |      failed      |  FAILURE   |    .     |    .    |
|  GAME_OVER   |     reverted     |  REVERTED  |    .     |    .    |
|  GAME_OVER   |     success      |  SUCCESS   |    .     |    .    |
|  GAME_OVER   |    suspended     | SUSPENDED  |    .     |    .    |
|   RESUMING   |  schedule_next   | SCHEDULING |    .     |    .    |
| REVERTED[$]  |        .         |     .      |    .     |    .    |
|  SCHEDULING  |  wait_finished   |  WAITING   |    .     |    .    |
|  SUCCESS[$]  |        .         |     .      |    .     |    .    |
| SUSPENDED[$] |        .         |     .      |    .     |    .    |
| UNDEFINED[^] |      start       |  RESUMING  |    .     |    .    |
|   WAITING    | examine_finished | ANALYZING  |    .     |    .    |
+--------------+------------------+------------+----------+---------+

在这些产生的状态(减去 GAME_OVERUNDEFINED)中的任何一个之间,如果引擎已被暂停或引擎由于无法解决的任务失败或调度失败而失败,则机器将停止执行新任务(当前正在运行的任务将被允许完成),并且此机器的运行循环将被中断。

注意(harlowja):如果运行时的调度器组件能够并行调度任务,则可以实现并行运行和/或撤销。

build(statistics, timeout=None, gather_statistics=True)[source]

构建一个状态机(在运行时使用)。

taskflow.engines.action_engine.compiler.Terminator(flow)[source]

基类: object

流程终结器类。

属性 flow

此终结器表示/标记结束的流程。

属性 name

此终结器具有的有用名称(从流程名称派生)。

taskflow.engines.action_engine.compiler.Compilation(execution_graph, hierarchy)[source]

基类: object

编译器 compile() 的结果是这个不可变对象。

TASK = 'task'

任务节点将具有一个 kind 元数据键,其值为此值。

RETRY = 'retry'

重试节点将具有一个 kind 元数据键,其值为此值。

FLOW = 'flow'

流程入口节点将具有此值的 kind 元数据键。

FLOW_END = 'flow_end'

流程出口节点将具有此值的 kind 元数据键(仅适用于编译执行图,当前未在树层次结构中使用)。

property execution_graph

原子(作为图结构)的执行顺序。

property hierarchy

模式的层次结构(作为树结构)。

class taskflow.engines.action_engine.compiler.TaskCompiler[source]

基类: object

非递归的任务编译器。

class taskflow.engines.action_engine.compiler.FlowCompiler(deep_compiler_func)[source]

基类: object

流程的递归编译器。

compile(flow, parent=None)[source]

将流程分解为图和作用域树层次结构。

class taskflow.engines.action_engine.compiler.PatternCompiler(root, freeze=True)[source]

基类: object

将流程模式(或任务)编译为编译单元。

让我们深入了解这如何工作的一些基本概念

这里的编译器通过其 __init__ 方法提供一个“root”对象,该对象可以是任务或流程(受支持的模式之一),最终目标是生成一个 Compilation 对象作为结果,其中包含所需的组件。如果这不可能,将引发 CompilationFailure。在请求编译的类型为 未知 类型的情况下,将引发 TypeError,并且当遇到重复对象(即 已经 编译的对象)时,将引发 ValueError

当“root”是一个包含自身和其他嵌套流程(依此类推)的流程时,这种复杂性就会显现出来。为了编译此对象及其包含的对象到一个图中,保留模式强制的约束,我们必须经过一个递归算法,为每个嵌套级别创建子图,然后在递归过程中向上返回(现在有了包含的模式或原子到相应子图的分解映射),我们必须正确地将分解的子图(以及其中的原子)连接到一个新的图中,然后确保模式强制的约束得以保留。最后,我们返回给调用者(他们将执行相同操作,直到根节点,此时将创建一个包含模式/嵌套模式中所有包含原子的强制顺序的单个图)。

Compilation 对象中还维护着项的嵌套层次结构(这也在上述递归过程中通过一个更简单的算法构建)。这通常用于确定在查找可以提供给该原子执行的值时,给定原子的先前原子。请注意,虽然您可以认为可以使用图本身来做到这一点,在某些方面确实可以(用于有限的使用),但层次结构保留了嵌套结构(这对于作用域分析/查找很有用),以便能够返回一个迭代器,该迭代器返回每个级别可见的作用域(扁平化后图没有此信息)。

让我们举一个例子

给定模式 f(a(b, c), d),其中 f 是一个 Flow,包含项 a(b, c),其中 a 是一个 Flow,由任务 (b, c) 和任务 d 组成。

将执行的算法(镜像上述描述的逻辑)将经历以下步骤(树层次结构构建省略,因为它更明显)

Compiling f
  - Decomposing flow f with no parent (must be the root)
  - Compiling a
      - Decomposing flow a with parent f
      - Compiling b
          - Decomposing task b with parent a
          - Decomposed b into:
            Name: b
            Nodes: 1
              - b
            Edges: 0
      - Compiling c
          - Decomposing task c with parent a
          - Decomposed c into:
            Name: c
            Nodes: 1
              - c
            Edges: 0
      - Relinking decomposed b -> decomposed c
      - Decomposed a into:
        Name: a
        Nodes: 2
          - b
          - c
        Edges: 1
          b -> c ({'invariant': True})
  - Compiling d
      - Decomposing task d with parent f
      - Decomposed d into:
        Name: d
        Nodes: 1
          - d
        Edges: 0
  - Relinking decomposed a -> decomposed d
  - Decomposed f into:
    Name: f
    Nodes: 3
      - c
      - b
      - d
    Edges: 2
      c -> d ({'invariant': True})
      b -> c ({'invariant': True})
compile()[source]

将包含的项编译为编译后的等效项。

class taskflow.engines.action_engine.completer.Strategy(runtime)[source]

基类: object

故障解决策略基类。

abstract apply()[source]

应用一些算法来解决检测到的故障。

class taskflow.engines.action_engine.completer.RevertAndRetry(runtime, retry)[source]

基类:Strategy

设置与要稍后重试的子流程关联的。

apply()[source]

应用一些算法来解决检测到的故障。

class taskflow.engines.action_engine.completer.RevertAll(runtime)[source]

基类:Strategy

所有节点/原子设置为 REVERT 意图。

apply()[source]

应用一些算法来解决检测到的故障。

class taskflow.engines.action_engine.completer.Revert(runtime, atom)[source]

基类:Strategy

将原子和关联节点设置为 REVERT 意图。

apply()[source]

应用一些算法来解决检测到的故障。

class taskflow.engines.action_engine.completer.Completer(runtime)[source]

基类: object

使用操作完成原子以完成它们。

resume()[source]

恢复包含图中的原子。

这是为了允许分析任何先前完成或失败的原子,处理其结果,并根据需要调整任何受影响的原子。

这应该返回一组原子,这些原子应该是先前未完成的原子(由于以前未完成的 RUNNING 或 REVERTING 尝试)。

complete_failure(node, outcome, failure)[source]

执行节点失败后的完成。

返回是否应将结果保存到故障累加器中,或者不应这样做。

complete(node, outcome, result)[source]

执行节点结果后的完成。

class taskflow.engines.action_engine.deciders.Decider[source]

基类: object

决策器的基类。

提供由子类实现接口。

决策器检查流程中的下一个原子是否应执行。

abstract tally(runtime)[source]

统计边缘决策器,以确定此决策器是否应允许运行。

返回值是投票“否”(不允许运行)的边缘决策器列表。

abstract affect(runtime, nay_voters)[source]

影响相关的原子,因为至少有一个“否”边缘决策器。

这将通过将状态和意图设置为 IGNORE 来改变相关的原子 + 一些后继原子,以便在未来的运行时活动中忽略它们。

check_and_affect(runtime)[source]

按正确的顺序处理 tally() + affect()

注意(harlowja):如果没有“否”边缘决策器,则假定此决策器应允许运行。

返回一个布尔值,指示此决策器是否允许运行(或不运行)。

class taskflow.engines.action_engine.deciders.IgnoreDecider(atom, edge_deciders)[source]

基础: Decider

检查提供的任何边缘决策器,并确定是否可以运行。

tally(runtime)[source]

统计边缘决策器,以确定此决策器是否应允许运行。

返回值是投票“否”(不允许运行)的边缘决策器列表。

affect(runtime, nay_voters)[source]

影响相关的原子,因为至少有一个“否”边缘决策器。

这将通过将状态和意图设置为 IGNORE 来改变相关的原子 + 一些后继原子,以便在未来的运行时活动中忽略它们。

class taskflow.engines.action_engine.deciders.NoOpDecider[source]

基础: Decider

无操作决策器,表示始终可以运行,并且没有影响。

tally(runtime)[source]

始终可以运行。

affect(runtime, nay_voters)[source]

不执行任何操作。

class taskflow.engines.action_engine.executor.SerialRetryExecutor[source]

基类: object

执行和撤销重试。

start()[source]

准备执行重试。

stop()[source]

完成重试执行器。

execute_retry(retry, arguments)[source]

调度重试执行。

revert_retry(retry, arguments)[source]

调度重试撤销。

class taskflow.engines.action_engine.executor.TaskExecutor[source]

基类: object

执行和撤销任务。

此类接收任务及其参数并执行或撤销它。它封装了有关应如何执行或撤销任务的知识:现在,在单独的线程上,在另一台机器上等。

abstract execute_task(task, task_uuid, arguments, progress_callback=None)[source]

调度任务执行。

abstract revert_task(task, task_uuid, arguments, result, failures, progress_callback=None)[source]

调度任务撤销。

start()[source]

准备执行任务。

stop()[source]

完成任务执行器。

class taskflow.engines.action_engine.executor.SerialTaskExecutor[source]

基础: TaskExecutor

按顺序执行任务。

start()[source]

准备执行任务。

stop()[source]

完成任务执行器。

execute_task(task, task_uuid, arguments, progress_callback=None)[source]

调度任务执行。

revert_task(task, task_uuid, arguments, result, failures, progress_callback=None)[source]

调度任务撤销。

class taskflow.engines.action_engine.executor.ParallelTaskExecutor(executor=None, max_workers=None)[source]

基础: TaskExecutor

并行执行任务。

将任务提交给执行器,该执行器应提供类似于 concurrent.Futures.Executor 的接口。

constructor_options = [('max_workers', <function ParallelTaskExecutor.<lambda>>)]

此执行器支持的可选构造函数关键字参数。这些通常通过引擎选项(由引擎用户)传递,并在发送到此类 __init__ 方法之前转换为正确类型。

execute_task(task, task_uuid, arguments, progress_callback=None)[source]

调度任务执行。

revert_task(task, task_uuid, arguments, result, failures, progress_callback=None)[source]

调度任务撤销。

start()[source]

准备执行任务。

stop()[source]

完成任务执行器。

class taskflow.engines.action_engine.executor.ParallelThreadTaskExecutor(executor=None, max_workers=None)[source]

基础: ParallelTaskExecutor

使用线程池执行器并行执行任务。

class taskflow.engines.action_engine.executor.ParallelGreenThreadTaskExecutor(executor=None, max_workers=None)[source]

基础类:ParallelThreadTaskExecutor

使用绿线程池执行器并行执行任务。

DEFAULT_WORKERS = 1000

当传递 None 时,默认的工作器数量;由于绿线程不能很好地映射到原生线程或处理器,因此这更多的是一种猜测/某种程度上是任意的,但它确实与 eventlet greenpool 的默认大小相匹配(因此至少与 eventlet 所做的事情保持一致)。

class taskflow.engines.action_engine.runtime.Runtime(compilation, storage, atom_notifier, task_executor, retry_executor, options=None)[source]

基类: object

运行时对象、属性等的集合,用于执行期间。

此对象包含各种实用方法和属性,它们代表了动作引擎完成运行所需的运行时组件和功能的集合。

compile()[source]

编译并缓存常用的执行辅助对象。

构建一个缓存,其中包含与包含的原子(按名称)关联的常用项目,并且对于快速查找很有用(例如,每个原子的更改状态处理程序函数、每个原子的作用域遍历器对象、特定于任务或重试的调度器等)。

check_atom_transition(atom, current_state, target_state)[source]

检查原子是否可以转换为提供的目标状态。

fetch_edge_deciders(atom)[source]

获取给定原子的边决策器。

fetch_scheduler(atom)[source]

获取给定原子的缓存特定调度器。

fetch_action(atom)[source]

获取给定原子的缓存操作处理程序。

fetch_scopes_for(atom_name)[source]

获取给定原子的可见作用域的遍历器。

iterate_retries(state=None)[source]

迭代与提供的状态匹配的重试原子。

如果未提供状态,则将返回所有重试原子。

iterate_nodes(allowed_kinds)[source]

返回执行图中指定类型的全部节点。

is_success()[source]

检查执行图中的所有原子是否处于“良好”状态。

find_retry(node)[source]

返回与给定节点关联的重试原子(或无)。

reset_atoms(atoms, state='PENDING', intention='EXECUTE')[source]

将所有提供的原子重置为给定的状态和意图。

reset_all(state='PENDING', intention='EXECUTE')[source]

将所有原子重置为给定的状态和意图。

reset_subgraph(atom, state='PENDING', intention='EXECUTE')[source]

将原子的子图重置为给定的状态和意图。

子图包含所有给定原子的后继原子。

retry_subflow(retry)[source]

准备重试及其子图以供执行。

这将设置重试的意图为 EXECUTE,并将所有子图(其后继原子)重置为 PENDING 状态,并具有 EXECUTE 意图。

class taskflow.engines.action_engine.scheduler.RetryScheduler(runtime)[source]

基类: object

调度重试原子。

schedule(retry)[source]

未来完成调度给定的重试原子。

根据原子存储的意图,这可能会为重试原子安排恢复或执行。

class taskflow.engines.action_engine.scheduler.TaskScheduler(runtime)[source]

基类: object

调度任务原子。

schedule(task)[source]

未来完成调度给定的任务原子。

根据原子存储的意图,这可能会为任务原子安排恢复或执行。

class taskflow.engines.action_engine.scheduler.Scheduler(runtime)[source]

基类: object

使用运行时 fetch_scheduler 例程安全地调度原子。

schedule(atoms)[source]

未来完成调度提供的原子。

此方法应为提供的每个原子安排一个 future,并返回一组这些 future 以供等待(或用于其他类似目的)。它还应返回任何表示在此调度过程中可能发生的调度失败的失败对象。

class taskflow.engines.action_engine.selector.Selector(runtime)[source]

基类: object

使用编译并协助执行过程的选择器。

其主要目的是使用编译的基础结构(图、节点和边关系…)以及存储中存储的原子状态/状态来获取下一个要执行或恢复的原子,并使用此信息为系统的其余部分提供其他有用的功能。

iter_next_atoms(atom=None)[source]

迭代下一个要运行的原子(源自原子或所有原子)。

class taskflow.engines.action_engine.scopes.ScopeWalker(compilation, atom, names_only=False)[source]

基类: object

使用引擎编译遍历原子的作用域。

注意(harlowja):仅供内部使用。

这将遍历给定原子可访问的可见作用域,这些作用域可以被某些外部实体以某种有意义的方式使用,例如查找依赖值…

__iter__()[source]

遍历可见的作用域。

其工作方式如下

我们首先获取给定原子(让我们称之为 Y)的所有前驱原子,方法是使用 Compilation 执行图(并进行反向广度优先扩展以收集其前驱原子),这很有用,因为我们知道它们始终会存在(并执行)在此原子之前,但它并没有告诉我们每个前驱原子创建的相应作用域级别(流、嵌套流…),因此我们需要找到此信息。

为了获取这些信息,我们查阅原子 YCompilation 层次结构/树中的位置。我们以相反的顺序查找父原子 X,它是 Y 的父原子,并从父原子中 Y 存在的索引向后遍历,到 X 中的所有兄弟节点(以及这些兄弟节点的子节点),在这次向后搜索中遇到的所有节点(如果一个兄弟节点本身就是一个流程,那么它的原子将被递归展开并包含在内)。然后,我们将这个集合假定为处于相同的作用域。这被称为一个潜在的单一作用域,为了形成一个实际的作用域,我们需要从潜在作用域中移除那些不是 Y 的前驱节点的项目,从而形成实际的作用域,然后我们将其返回。

然后,为了获取额外的作用域,我们继续向上遍历树,找到 X 的父原子(我们称之为 Z),并执行相同的操作,从父原子 ZX 位于的索引开始,以相反的顺序遍历子节点。这形成了另一个潜在作用域,在将潜在集合减少到仅包含先前收集的前驱节点后,我们将其作为实际作用域返回。然后,我们重复此过程,直到不再有父节点(即我们已到达树的顶部),或者前驱节点用尽。

class taskflow.engines.action_engine.traversal.Direction(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

基础: Enum

遍历方向枚举。

FORWARD = 1

遍历后继节点。

BACKWARD = 2

遍历前驱节点。

taskflow.engines.action_engine.traversal.breadth_first_iterate(execution_graph, starting_node, direction, through_flows=True, through_retries=True, through_tasks=True)[source]

迭代执行图中的连接节点(从起始节点)。

以广度优先的方式进行。

跳过具有 noop 属性的节点(不返回它们)。

taskflow.engines.action_engine.traversal.depth_first_iterate(execution_graph, starting_node, direction, through_flows=True, through_retries=True, through_tasks=True)[source]

迭代执行图中的连接节点(从起始节点)。

以深度优先的方式进行。

跳过具有 noop 属性的节点(不返回它们)。

taskflow.engines.action_engine.traversal.depth_first_reverse_iterate(node, start_from_idx= -1)[source]

迭代连接的(反向)节点(从起始节点)。

跳过具有 noop 属性的节点(不返回它们)。

层级

Inheritance diagram of taskflow.engines.action_engine.engine.ActionEngine, taskflow.engines.base.Engine, taskflow.engines.worker_based.engine.WorkerBasedActionEngine