Conductors¶
概述¶
Conductors 提供了一种机制,将各种概念统一到一个易于使用的(尽可能即插即用)构造中。
它们负责以下工作
注意事项¶
在使用 conductor 时,应考虑一些使用注意事项,以确保其使用安全可靠。我们最终希望这些问题不再存在,但目前值得提及。
无限循环¶
内容: 在一个 conductor 上失败(由于某种内部错误)的作业将被该 conductor 放弃,然后另一个 conductor 可能会遇到相同的错误并放弃它(并重复)。这将创建一个作业放弃循环,只要该作业以可认领的状态存在,该循环就会持续下去。
示例
缓解方法
强制删除在给定数量的 conductor 尝试后持续失败的作业。这可以通过手动或自动脚本(或其他相关监控)或通过 jobboard 的
trash()方法来完成。解决内部错误的原因(存储后端故障,其他...)。
接口¶
- class taskflow.conductors.base.Conductor(name, jobboard, persistence=None, engine=None, engine_options=None)[source]¶
基类:
object所有 conductor 实现的基类。
Conductors 作为实体,从 jobboard 中提取作业,将工作分配给某个引擎(使用期望的配置),然后等待该工作完成。如果工作失败,它们会放弃已认领的工作(或者如果它们运行的进程崩溃或终止,则会自动发生此放弃),然后稍后由另一个 conductor 完成先前失败 conductor 的工作。
- ENTITY_KIND = 'conductor'¶
创建新实体对象时使用的实体类型。
- conductor¶
代表此 conductor 的实体对象。
- property notifier¶
conductor 操作(或其他状态更改)的通知器。
注意(harlowja): 不同的 conductor 实现可能在不同时间发出不同的事件 + 事件详细信息,因此请参阅您的 conductor 文档以了解确切可以订阅和不能订阅的内容。
- abstract run(max_dispatches=None)[source]¶
持续认领、运行和消耗作业(重复执行)。
- 参数:
max_dispatches - 将分派的作业数量的上限,如果为 None 或负数,则表示分派的作业数量没有限制;如果为正数,则此 run 方法将在分派了该数量的作业后返回(而不是永远运行或直到停止)。
- taskflow.conductors.backends.fetch(kind, name, jobboard, namespace='taskflow.conductors', **kwargs)[source]¶
使用给定的选项获取 conductor backend。
此 fetch 方法将在入口点命名空间中查找“kind”入口点,然后尝试使用提供的名称、jobboard 和任何 board 特定的 kwargs 来实例化该入口点。
- class taskflow.conductors.backends.impl_executor.ExecutorConductor(name, jobboard, persistence=None, engine=None, engine_options=None, wait_timeout=None, log=None, max_simultaneous_jobs=-1)[source]¶
Bases:
Conductor将作业从阻塞的
run()方法分派到某个执行器。此 conductor 迭代提供的 jobboard 中的作业(如果没有作业则等待给定的超时时间),尝试认领它们,使用执行器处理这些作业(可能阻止进一步的作业被认领和消耗),然后在完成后消耗这些工作单元。此过程将重复进行,直到 conductor 被停止或发生其他严重错误。
注意(harlowja): 即使引擎因 atom 故障而失败,也会发生消耗。仅当发生执行失败或存储失败时才会跳过此操作,这些失败(通常)可以通过在不同的 conductor 上重新运行来纠正(存储故障和执行故障可能是瞬时问题,可以通过后续执行来解决)。如果一个作业在完成后无法被消耗或放弃,conductor 将依赖 jobboard 的功能来自动放弃这些作业。
- LOG = None¶
用于监听事件的日志记录器(如果没有,则使用模块级别的日志记录器)。
- REFRESH_PERIODICITY = 30¶
每 30 秒 jobboard 将重新同步一次(如果出于某种原因未收到 watch 或 watch 集合),使用“ensure_fresh”选项来确保这一点(仅适用于支持的 jobboard backend)。
- WAIT_TIMEOUT = 0.5¶
未找到作业时用于空闲/等待的默认超时时间。
- MAX_SIMULTANEOUS_JOBS = -1¶
默认同时进行的作业的最大数量。
负数或零表示没有限制(请注意,如果使用的执行器基于队列构建,大多数执行器都是如此,那么这将意味着队列将包含大量未完成的已提交作业)。如果 https://bugs.python.org/issue22737 被实现和发布,这种情况可能会有所改善。
- NO_CONSUME_EXCEPTIONS = (<class 'taskflow.exceptions.ExecutionFailure'>, <class 'taskflow.exceptions.StorageFailure'>)¶
不会导致消耗的异常。
- EVENTS_EMITTED = ('compilation_start', 'compilation_end', 'preparation_start', 'preparation_end', 'validation_start', 'validation_end', 'running_start', 'running_end', 'job_consumed', 'job_abandoned')¶
将为以上每个事件发出事件。事件会发送给注册到 conductor 的监听器。
- property dispatching¶
分派循环是否仍在分派。
实现¶
阻塞¶
- class taskflow.conductors.backends.impl_blocking.BlockingConductor(name, jobboard, persistence=None, engine=None, engine_options=None, wait_timeout=None, log=None, max_simultaneous_jobs=1)[source]¶
Bases:
ExecutorConductor阻塞 conductor,以阻塞方式处理作业。
- MAX_SIMULTANEOUS_JOBS = 1¶
默认同时进行的作业的最大数量。
非阻塞¶
- class taskflow.conductors.backends.impl_nonblocking.NonBlockingConductor(name, jobboard, persistence=None, engine=None, engine_options=None, wait_timeout=None, log=None, max_simultaneous_jobs=9, executor_factory=None)[source]¶
Bases:
ExecutorConductor非阻塞 conductor,使用线程执行器处理作业。
- 注意(harlowja): 可以通过关键字参数
executor_factory提供自定义执行器工厂,如果提供,它将在run()时间使用一个位置参数(此 conductor)调用,并且必须返回一个兼容的 executor,可用于提交作业。如果提供None,则默认选择一个线程池支持的执行器(其工作线程数将等于此 conductor 的同时作业数)。 注意(harlowja): 可以通过关键字参数
executor_factory提供自定义执行器工厂,如果提供,它将在run()时间使用一个位置参数(此 conductor)调用,并且必须返回一个兼容的 executor,可用于提交作业。如果提供None,则默认选择一个线程池支持的执行器(其工作线程数将等于此 conductor 的同时作业数)。
- MAX_SIMULTANEOUS_JOBS = 9¶
默认同时进行的作业的最大数量。
- 注意(harlowja): 可以通过关键字参数
层级¶
