Conductors

Conductor

概述

Conductors 提供了一种机制,将各种概念统一到一个易于使用的(尽可能即插即用)构造中。

它们负责以下工作

  • jobboards 交互(检查和认领 jobs)。

  • 从认领的作业中创建 engines(使用 factories 来重建要执行的包含的任务和流)。

  • 使用提供的 persistence 层和引擎配置来分派引擎。

  • 根据分派和执行结果,完成或放弃认领的 job

  • 重复执行.

注意

它们受到 铁路乘务员音乐指挥 的启发,并具有类似的职责。

注意事项

在使用 conductor 时,应考虑一些使用注意事项,以确保其使用安全可靠。我们最终希望这些问题不再存在,但目前值得提及。

无限循环

内容: 在一个 conductor 上失败(由于某种内部错误)的作业将被该 conductor 放弃,然后另一个 conductor 可能会遇到相同的错误并放弃它(并重复)。这将创建一个作业放弃循环,只要该作业以可认领的状态存在,该循环就会持续下去。

示例

Conductor cycling

缓解方法

  1. 强制删除在给定数量的 conductor 尝试后持续失败的作业。这可以通过手动或自动脚本(或其他相关监控)或通过 jobboard 的 trash() 方法来完成。

  2. 解决内部错误的原因(存储后端故障,其他...)。

接口

class taskflow.conductors.base.Conductor(name, jobboard, persistence=None, engine=None, engine_options=None)[source]

基类: object

所有 conductor 实现的基类。

Conductors 作为实体,从 jobboard 中提取作业,将工作分配给某个引擎(使用期望的配置),然后等待该工作完成。如果工作失败,它们会放弃已认领的工作(或者如果它们运行的进程崩溃或终止,则会自动发生此放弃),然后稍后由另一个 conductor 完成先前失败 conductor 的工作。

ENTITY_KIND = 'conductor'

创建新实体对象时使用的实体类型。

conductor

代表此 conductor 的实体对象。

property notifier

conductor 操作(或其他状态更改)的通知器。

注意(harlowja): 不同的 conductor 实现可能在不同时间发出不同的事件 + 事件详细信息,因此请参阅您的 conductor 文档以了解确切可以订阅和不能订阅的内容。

connect()[source]

确保 jobboard 已连接(如果已连接则为空操作)。

close()[source]

关闭包含的 jobboard,禁止进一步使用。

abstract run(max_dispatches=None)[source]

持续认领、运行和消耗作业(重复执行)。

参数:

max_dispatches - 将分派的作业数量的上限,如果为 None 或负数,则表示分派的作业数量没有限制;如果为正数,则此 run 方法将在分派了该数量的作业后返回(而不是永远运行或直到停止)。

abstract stop()[source]

请求 conductor 停止分派。

此方法可用于请求 conductor 停止其消耗和分派循环。

该方法会立即返回,而不管 conductor 是否已停止。

abstract wait(timeout=None)[source]

等待 conductor 优雅退出。

此方法等待 conductor 优雅退出。可以提供一个可选的超时时间,这将导致方法在指定的超时时间内返回。如果达到超时时间,则返回值将是 False,否则为 True

参数:

timeout - wait() 方法应该阻塞的最大秒数。

taskflow.conductors.backends.fetch(kind, name, jobboard, namespace='taskflow.conductors', **kwargs)[source]

使用给定的选项获取 conductor backend。

此 fetch 方法将在入口点命名空间中查找“kind”入口点,然后尝试使用提供的名称、jobboard 和任何 board 特定的 kwargs 来实例化该入口点。

class taskflow.conductors.backends.impl_executor.ExecutorConductor(name, jobboard, persistence=None, engine=None, engine_options=None, wait_timeout=None, log=None, max_simultaneous_jobs=-1)[source]

Bases: Conductor

将作业从阻塞的 run() 方法分派到某个执行器。

此 conductor 迭代提供的 jobboard 中的作业(如果没有作业则等待给定的超时时间),尝试认领它们,使用执行器处理这些作业(可能阻止进一步的作业被认领和消耗),然后在完成后消耗这些工作单元。此过程将重复进行,直到 conductor 被停止或发生其他严重错误。

注意(harlowja): 即使引擎因 atom 故障而失败,也会发生消耗。仅当发生执行失败或存储失败时才会跳过此操作,这些失败(通常)可以通过在不同的 conductor 上重新运行来纠正(存储故障和执行故障可能是瞬时问题,可以通过后续执行来解决)。如果一个作业在完成后无法被消耗或放弃,conductor 将依赖 jobboard 的功能来自动放弃这些作业。

LOG = None

用于监听事件的日志记录器(如果没有,则使用模块级别的日志记录器)。

REFRESH_PERIODICITY = 30

每 30 秒 jobboard 将重新同步一次(如果出于某种原因未收到 watch 或 watch 集合),使用“ensure_fresh”选项来确保这一点(仅适用于支持的 jobboard backend)。

WAIT_TIMEOUT = 0.5

未找到作业时用于空闲/等待的默认超时时间。

MAX_SIMULTANEOUS_JOBS = -1

默认同时进行的作业的最大数量。

负数或零表示没有限制(请注意,如果使用的执行器基于队列构建,大多数执行器都是如此,那么这将意味着队列将包含大量未完成的已提交作业)。如果 https://bugs.python.org/issue22737 被实现和发布,这种情况可能会有所改善。

NO_CONSUME_EXCEPTIONS = (<class 'taskflow.exceptions.ExecutionFailure'>, <class 'taskflow.exceptions.StorageFailure'>)

不会导致消耗的异常。

EVENTS_EMITTED = ('compilation_start', 'compilation_end', 'preparation_start', 'preparation_end', 'validation_start', 'validation_end', 'running_start', 'running_end', 'job_consumed', 'job_abandoned')

将为以上每个事件发出事件。事件会发送给注册到 conductor 的监听器。

stop()[source]

请求 conductor 停止分派。

此方法可用于请求 conductor 停止其消耗和分派循环。

该方法会立即返回,而不管 conductor 是否已停止。

property dispatching

分派循环是否仍在分派。

run(max_dispatches=None)[source]

持续认领、运行和消耗作业(重复执行)。

参数:

max_dispatches - 将分派的作业数量的上限,如果为 None 或负数,则表示分派的作业数量没有限制;如果为正数,则此 run 方法将在分派了该数量的作业后返回(而不是永远运行或直到停止)。

wait(timeout=None)[source]

等待 conductor 优雅退出。

此方法等待 conductor 优雅退出。可以提供一个可选的超时时间,这将导致方法在指定的超时时间内返回。如果达到超时时间,则返回值将是 False,否则为 True

参数:

timeout - wait() 方法应该阻塞的最大秒数。

实现

阻塞

class taskflow.conductors.backends.impl_blocking.BlockingConductor(name, jobboard, persistence=None, engine=None, engine_options=None, wait_timeout=None, log=None, max_simultaneous_jobs=1)[source]

Bases: ExecutorConductor

阻塞 conductor,以阻塞方式处理作业。

MAX_SIMULTANEOUS_JOBS = 1

默认同时进行的作业的最大数量。

非阻塞

class taskflow.conductors.backends.impl_nonblocking.NonBlockingConductor(name, jobboard, persistence=None, engine=None, engine_options=None, wait_timeout=None, log=None, max_simultaneous_jobs=9, executor_factory=None)[source]

Bases: ExecutorConductor

非阻塞 conductor,使用线程执行器处理作业。

注意(harlowja): 可以通过关键字参数 executor_factory 提供自定义执行器工厂,如果提供,它将在 run() 时间使用一个位置参数(此 conductor)调用,并且必须返回一个兼容的 executor,可用于提交作业。如果提供 None,则默认选择一个线程池支持的执行器(其工作线程数将等于此 conductor 的同时作业数)。

注意(harlowja): 可以通过关键字参数 executor_factory 提供自定义执行器工厂,如果提供,它将在 run() 时间使用一个位置参数(此 conductor)调用,并且必须返回一个兼容的 executor,可用于提交作业。如果提供 None,则默认选择一个线程池支持的执行器(其工作线程数将等于此 conductor 的同时作业数)。

MAX_SIMULTANEOUS_JOBS = 9

默认同时进行的作业的最大数量。

层级

Inheritance diagram of taskflow.conductors.base, taskflow.conductors.backends.impl_blocking, taskflow.conductors.backends.impl_nonblocking, taskflow.conductors.backends.impl_executor