概述¶
这是一个调度任务到 worker 的引擎——独立的进程,专门用于执行某些原子操作,可能运行在其他机器上,通过 amqp(或其他支持的 kombu 传输)连接。
注意
这个引擎正在积极开发中,可用且 有效,但缺少一些功能(请查看 蓝图页面,了解已知问题和计划),这些功能将使其更适合生产环境。
术语¶
- 客户端
使用此库定义流程并使用引擎运行这些流程的代码、程序、服务(或用户)。
- 传输 + 协议
用于在客户端和 worker 之间传递信息的机制(以及基于该机制的 协议)。例如,amqp 作为传输,以及 json 编码的消息格式作为协议。
- 执行器 (Executor)
worker-based 引擎的一部分,用于发布任务请求,以便远程 worker 接受并处理这些请求。
- Worker
Worker 在远程主机上启动,每个 worker 都有一个可以执行的任务列表(按需)。Worker 接受并处理由执行器发布的任务请求。可以在单独的线程(或进程…)中同时处理多个请求。例如,可以将 执行器 传递给 worker,并配置为在所需数量的线程(无论是否是 greenlet)中运行。
- 代理
执行器通过代理与 worker 交互。代理维护底层的传输并发布消息(并在接收到消息时调用回调)。
需求¶
透明性:它应该作为现有 (本地) 引擎的临时替代品,只需最少的重构(如果有的话)。例如,如果一切设置和配置正确,应该能够在它上面运行相同的流程,而无需更改客户端代码。
传输无关性:传输方式应该被抽象化,以便我们可以使用 oslo.messaging、gearmand、amqp、zookeeper、marconi、websockets 或任何其他允许在客户端和 worker 之间传递信息的方式。
简单性:它应该易于编写和部署。
非均匀性:它应该支持非均匀的 worker,允许不同的 worker 根据其发布的 capabilities 执行不同的原子操作集合。
设计¶
有两个通信端,即 执行器(以及相关的引擎派生类)和 worker,它们使用代理组件进行通信。该代理设计用于接受/发布命名 exchange 中的消息。
高级架构¶
执行器和 worker 通信¶
让我们考虑一下执行器和 worker 之间的通信方式。首先,引擎解析所有原子操作的依赖关系,并调度当前可以执行的原子操作。这使用了与其他引擎类型相同的调度和依赖关系解析逻辑。然后,worker-based 引擎执行器以以下方式执行可以立即执行的原子操作(那些依赖于其他任务输出的原子操作将在该输出准备好时执行):
执行器使用代理对象启动任务执行/撤销。
Proxy将任务请求(格式如下所述)发布到命名 exchange,使用用于将请求传递到特定 worker topic 的路由键。然后,执行器等待 worker 接受并确认任务请求。如果执行器在给定的超时时间内没有从 worker 收到任务确认,则该任务将被视为超时,并引发超时异常。worker 接收到请求消息,并启动一个新线程来处理它。
worker 分派请求(获取实际执行任务的所需端点)。
如果分派成功,则 worker 向执行器发送确认响应,否则 worker 发送失败响应,以及序列化的
failure对象,其中包含失败的内容(以及原因)。worker 执行任务,并在完成后将结果发送回原始执行器(每次触发任务进度事件时,它都会向执行器发送进度通知,由引擎处理,并分派给监听器等)。
执行器从 worker 收到任务请求确认,任务请求状态从
PENDING变为RUNNING状态。一旦任务请求处于RUNNING状态,就无法超时(考虑到任务执行过程可能需要不可预测的时间)。执行器从 worker 收到任务执行结果,并将其传递回执行器和 worker-based 引擎以完成任务处理(这会针对后续任务重复)。
注意
Failure 对象不能直接 json 序列化(它们包含对不可序列化的 traceback 的引用),因此在发送之前将它们转换为字典,并在执行器和 worker 两侧接收后从字典转换回来(这种转换是有损的,因为 traceback 由于其内容包含内部解释器引用和细节而无法完全保留)。
协议¶
- taskflow.engines.worker_based.protocol.failure_to_dict(failure)[source]¶
尝试将 failure 对象转换为可 json 序列化的字典。
- class taskflow.engines.worker_based.protocol.Notify(**data)[source]¶
基类:
Message表示 notify 消息类型。
- TYPE = 'NOTIFY'¶
表示此消息类型的字符串常量。
- RESPONSE_SCHEMA = {'additionalProperties': False, 'properties': {'tasks': {'items': {'type': 'string'}, 'type': 'array'}, 'topic': {'type': 'string'}}, 'required': ['topic', 'tasks'], 'type': 'object'}¶
预期的 notify 响应 消息模式(以 json 模式格式)。
- SENDER_SCHEMA = {'additionalProperties': False, 'type': 'object'}¶
预期的 发送者 请求消息模式(以 json 模式格式)。
- class taskflow.engines.worker_based.protocol.Request(task, uuid, action, arguments, timeout=60, result=<object object>, failures=None)[source]¶
基类:
Message表示带有执行结果的请求。
每个请求都在 WAITING 状态下创建,并存储创建时间以及通过构造函数参数给定的超时时间。
请求在 WAITING/PENDING 状态下超过给定的超时时间时被认为已过期(在任何其他状态下均不被认为已过期)。
+------------+------------+---------+----------+---------+ | Start | Event | End | On Enter | On Exit | +------------+------------+---------+----------+---------+ | FAILURE[$] | . | . | . | . | | PENDING | on_failure | FAILURE | . | . | | PENDING | on_running | RUNNING | . | . | | RUNNING | on_failure | FAILURE | . | . | | RUNNING | on_success | SUCCESS | . | . | | SUCCESS[$] | . | . | . | . | | WAITING[^] | on_failure | FAILURE | . | . | | WAITING[^] | on_pending | PENDING | . | . | +------------+------------+---------+----------+---------+
- TYPE = 'REQUEST'¶
表示此消息类型的字符串常量。
- SCHEMA = {'properties': {'action': {'enum': ['execute', 'revert'], 'type': 'string'}, 'arguments': {'type': 'object'}, 'failures': {'type': 'object'}, 'result': {}, 'task_cls': {'type': 'string'}, 'task_name': {'type': 'string'}, 'task_version': {'oneOf': [{'type': 'string'}, {'type': 'array'}]}}, 'required': ['task_cls', 'task_name', 'task_version', 'action'], 'type': 'object'}¶
预期的消息模式(以 json 模式格式)。
- property current_state¶
当前请求的状态。
- property expired¶
检查请求是否已过期。
创建新请求时,其状态设置为 WAITING,存储创建时间,并通过构造函数参数给出超时时间。
如果请求在 WAITING/PENDING 状态下超过给定的超时时间,则该请求被认为已过期(不考虑任务执行过程可能需要不可预测的时间)。
- transition_and_log_error(new_state, logger=None)[source]¶
转换并记录错误(如果转换引发异常)。
这覆盖了转换函数,并执行了几乎相同的功能,但如果转换无效,它不会引发异常,而是将警告记录到提供的记录器中,并返回 False 以指示转换未执行(请注意,这与转换函数不同,在转换函数中,False 表示忽略)。
- class taskflow.engines.worker_based.protocol.Response(state, **data)[source]¶
基类:
Message表示响应消息类型。
- TYPE = 'RESPONSE'¶
表示此消息类型的字符串常量。
- SCHEMA = {'additionalProperties': False, 'definitions': {'completion': {'additionalProperties': False, 'properties': {'result': {}, 'required': ['result'], 'type': 'object'}, 'empty': {'additionalProperties': False, 'type': 'object'}, 'event': {'additionalProperties': False, 'properties': {'details': {'type': 'object'}, 'event_type': {'type': 'string'}}, 'required': ['event_type', 'details'], 'type': 'object'}}, 'properties': {'data': {'anyOf': [{'$ref': '#/definitions/event'}, {'$ref': '#/definitions/completion'}, {'$ref': '#/definitions/empty'}]}, 'state': {'enum': ['WAITING', 'PENDING', 'RUNNING', 'SUCCESS', 'FAILURE', 'EVENT'], 'type': 'string'}}, 'required': ['state', 'data'], 'type': 'object'}¶
预期的消息模式(以 json 模式格式)。
示例¶
请求(执行)¶
task_name - 要执行的完整任务名称
task_cls - 要执行的完整任务类名称
action - 要执行的任务操作(例如,执行、撤销)
arguments - 调用任务操作时要使用的参数
result - 任务执行结果(结果或
Failure)[仅传递给撤销]
此外,以下参数添加到请求消息中
reply_to - 执行器命名的交换机,工作器将把响应发送到该交换机
correlation_id - 执行器请求 ID(由于可以同时处理多个请求)
示例
{
"action": "execute",
"arguments": {
"x": 111
},
"task_cls": "taskflow.tests.utils.TaskOneArgOneReturn",
"task_name": "taskflow.tests.utils.TaskOneArgOneReturn",
"task_version": [
1,
0
]
}
请求(撤销)¶
当撤销时:
{
"action": "revert",
"arguments": {},
"failures": {
"taskflow.tests.utils.TaskWithFailure": {
"exc_type_names": [
"RuntimeError",
"StandardError",
"Exception"
],
"exception_str": "Woot!",
"traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n",
"version": 1
}
},
"result": [
"failure",
{
"exc_type_names": [
"RuntimeError",
"StandardError",
"Exception"
],
"exception_str": "Woot!",
"traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n",
"version": 1
}
],
"task_cls": "taskflow.tests.utils.TaskWithFailure",
"task_name": "taskflow.tests.utils.TaskWithFailure",
"task_version": [
1,
0
]
}
工作器响应¶
当运行时:
{
"data": {},
"state": "RUNNING"
}
当进展时:
{
"details": {
"progress": 0.5
},
"event_type": "update_progress",
"state": "EVENT"
}
当成功时:
{
"data": {
"result": 666
},
"state": "SUCCESS"
}
当失败时:
{
"data": {
"result": {
"exc_type_names": [
"RuntimeError",
"StandardError",
"Exception"
],
"exception_str": "Woot!",
"traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n",
"version": 1
}
},
"state": "FAILURE"
}
请求状态转换¶
WAITING - 请求放置在队列上(或其他 kombu 消息总线/传输)但尚未被消费。
PENDING - 工作器接受请求,并正在使用其执行器(线程、进程或其他)等待运行。
FAILURE - 工作器在运行请求后失败(由于任务异常),或者没有工作器移动/开始执行(通过将请求置于 RUNNING 状态)在指定的时间跨度内(默认情况下为 60 秒,除非被覆盖)。
RUNNING - 工作器执行器(使用线程、进程…)已开始运行请求的任务(一旦状态转换为任何状态,请求超时将不再适用;因为此时无法确定任务需要多长时间才能完成,因为无法确定任务是需要很长时间才能完成还是已经失败)。
SUCCESS - 工作器完成运行任务而没有异常。
用法¶
Workers¶
要使用基于工作器的引擎,必须首先在远程机器上建立一组工作器。必须向这些工作器提供一个任务对象列表、任务名称、模块名称(或可以检查有效任务的入口点)(这样做是为了防止任意代码执行)。
有关完整参数和对象用法,请访问 Worker。
示例
from taskflow.engines.worker_based import worker as w
config = {
'url': 'amqp://guest:guest@localhost:5672//',
'exchange': 'test-exchange',
'topic': 'test-tasks',
'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
}
worker = w.Worker(**config)
worker.run()
引擎¶
要使用基于工作器的引擎,必须构造一个流程(其中包含在远程机器上可见的任务),并选择特定的基于工作器的引擎入口点。还必须提供某些配置选项,以便正确配置和初始化传输后端。否则,使用方式应该大部分是透明的(并且几乎与使用任何其他引擎类型相同)。
有关完整参数和对象用法,请参阅 WorkerBasedActionEngine。
使用 amqp 传输的示例
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine='worker-based',
url='amqp://guest:guest@localhost:5672//',
exchange='test-exchange',
topics=['topic1', 'topic2'])
eng.run()
使用文件系统传输的示例
flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine='worker-based',
exchange='test-exchange',
topics=['topic1', 'topic2'],
transport='filesystem',
transport_options={
'data_folder_in': '/tmp/in',
'data_folder_out': '/tmp/out',
})
eng.run()
其他支持的关键字参数
executor:提供WorkerTaskExecutor接口的类;它将用于执行、撤销和等待远程任务。
限制¶
流程中的原子必须仅从 持久性 中定义的方式接收和接受参数。换句话说,在构建工作流时创建的任务与在远程工作器上执行的任务将不相同(并且未通过 输入和输出 机制传递的任何内部状态都无法传输)。这意味着资源对象(数据库句柄、文件描述符、套接字…)不能直接发送到远程工作器(而是必须定义如何获取/创建这些对象)。
基于工作器的引擎将来能够运行轻量级任务,以避免传输开销,对于非常简单的任务(当前即使对于轻量级任务也会远程运行,这可能性能不佳)。
故障检测,当前当工作器确认任务时,引擎将无限期地等待任务结果(任务可能需要不确定的时间才能完成)。将来需要一种限制远程工作器执行时间(并跟踪其存活时间)的方法,如果达到超时,则可能在辅助工作器上生成任务(即第一个工作器已死或停止响应)。
实现¶
- class taskflow.engines.worker_based.engine.WorkerBasedActionEngine(flow, flow_detail, backend, options)[source]¶
基类:
ActionEngine基于工作器的操作引擎。
特定的后端选项(从提供的引擎选项中提取)
- 参数:
exchange – 代理交换机名称,执行器/工作器通信在此交换机中执行
url – 代理连接 URL(请参阅 kombu 文档中的格式)
topics – 工作器主题列表,用于进行通信(这也会通过侦听工作器发出的通知来学习)。
transport – 要使用的传输(例如,amqp、memory 等)
transition_timeout – 数值(或 None 表示无限)以等待提交的远程请求从 (PENDING, WAITING) 请求状态转换。到期后,请求所做的任务的结果将成为
RequestTimeout异常,而不是通常返回的值(或引发的异常)。transport_options – 传输特定的选项(请参阅:http://kombu.readthedocs.org/ 以了解这些选项的含义和预期)
retry_options – 重试特定的选项(请参阅:
DEFAULT_RETRY_OPTIONS)worker_expiry – 数值(或负数/零/None 表示无限),定义在没有响应先前的通知/ping 请求的工作器继续发送消息的秒数(默认值为 60 秒)。
组件¶
警告
对内部引擎函数、组件和模块的外部使用应保持在最低限度,因为它们可能会被更改、重构或移动到其他位置而无需通知(并且没有典型的弃用周期)。
- class taskflow.engines.worker_based.dispatcher.Handler(process_message, validator=None)[source]¶
基类:
object接收消息时将调用的组件。
- property process_message¶
主要回调函数,用于处理接收到的消息。
只有在格式验证通过(如果适用,使用
validator回调函数)并且消息已确认后,才会调用此函数。
- property validator¶
可选回调函数,在处理之前激活。
如果存在此回调函数,则预计它会验证消息,如果消息无效,则引发
InvalidFormat。
- class taskflow.engines.worker_based.dispatcher.TypeDispatcher(type_handlers=None, requeue_filters=None)[source]¶
基类:
object接收消息并将其分派到特定类型的处理程序。
- property type_handlers¶
消息类型 -> 处理该消息的回调函数的字典。
通过查找消息属性“type”并将该类型映射到此字典中的回调函数来激活回调函数;如果找到一个,则预计它是一个接受两个位置参数的回调函数;第一个是消息数据,第二个是消息对象。如果未找到回调函数,则消息将被拒绝,底层消息传输将决定这意味着什么/暗示什么……
- property requeue_filters¶
用于请求重新排队消息的过滤器(回调函数)列表。
在确认消息之前激活这些回调函数,并且它可以指示调度程序重新排队消息而不是处理它。调用回调函数时,将提供两个位置参数;第一个是消息数据,第二个是消息对象。使用这些提供的参数,如果消息应该重新排队,则过滤器应返回一个真值对象,如果消息不应该重新排队,则返回一个假值对象。
- class taskflow.engines.worker_based.endpoint.Endpoint(task_cls)[source]¶
基类:
object表示具有 execute/revert 方法的单个任务。
- class taskflow.engines.worker_based.executor.WorkerTaskExecutor(uuid, exchange, topics, transition_timeout=60, url=None, transport=None, transport_options=None, retry_options=None, worker_expiry=60)[source]¶
基类:
TaskExecutor在远程工作器上执行任务。
- class taskflow.engines.worker_based.proxy.Proxy(topic, exchange, type_handlers=None, on_wait=None, url=None, transport=None, transport_options=None, retry_options=None)[source]¶
基类:
object代理处理来自/到命名交换器的消息。
仅供内部使用(不供公共使用)。
- DEFAULT_RETRY_OPTIONS = {'interval_max': 1, 'interval_start': 1, 'interval_step': 1, 'max_retries': 3}¶
用于在瞬态故障下重新连接的默认设置。
请参阅:http://kombu.readthedocs.org/(以及连接
ensure_options)了解这些值的含义/含义……
- property dispatcher¶
内部用于分派匹配消息的分派器。
- property connection_details¶
连接详细信息(只读)。
- property is_running¶
返回代理是否正在运行。
- class taskflow.engines.worker_based.worker.Worker(exchange, topic, tasks, executor=None, threads_count=None, url=None, transport=None, transport_options=None, retry_options=None)[source]¶
基类:
object可启动在远程主机上以处理任务请求的 Worker。
- 参数:
url – broker url
exchange – broker exchange 名称
topic – worker 声明的主题名称
tasks – worker 可以执行的任务列表,列表中的项目可以是以下类型之一:1,命名要搜索任务的 Python 模块名称或任务类名称的字符串;2,要在其中搜索任务的 Python 模块;3,将用于创建任务的任务类对象。
executor – 可用于在单独的线程中处理请求的自定义 executor 对象(如果未提供,将创建一个)
threads_count – 要传递给默认 executor 的线程数(仅当未传递 executor 时使用)
transport – 要使用的传输(例如,amqp、memory 等)
transport_options – 传输特定的选项(请参阅:http://kombu.readthedocs.org/ 以了解这些选项的含义和预期)
retry_options – 重试特定的选项(请参阅:
DEFAULT_RETRY_OPTIONS)
- banner¶
在运行前可以显示的有用的横幅。
- class taskflow.engines.worker_based.types.TopicWorker(topic, tasks, identity=<object object>)[source]¶
基类:
object一个(只读)worker 及其相关信息 + 有用的方法。
- class taskflow.engines.worker_based.types.ProxyWorkerFinder(uuid, proxy, topics, beat_periodicity=5, worker_expiry=60)[source]¶
基类:
object请求并接收有关 worker topic + 任务详细信息的响应。
- property total_workers¶
当前已知的 worker 数量。
- wait_for_workers(workers=1, timeout=None)[source]¶
等待至少 geq 个工作器通知它们已准备好进行工作。
注意(harlowja):如果提供了超时时间,此函数将等待直到超时时间到期,如果工作器的数量在超时到期之前未达到所需的工作器数量,则将返回仍需要多少个工作器,否则将返回零。
- property messages_processed¶
已处理的通知响应消息数量。