概述

这是一个调度任务到 worker 的引擎——独立的进程,专门用于执行某些原子操作,可能运行在其他机器上,通过 amqp(或其他支持的 kombu 传输)连接。

注意

这个引擎正在积极开发中,可用且 有效,但缺少一些功能(请查看 蓝图页面,了解已知问题和计划),这些功能将使其更适合生产环境。

术语

客户端

使用此库定义流程并使用引擎运行这些流程的代码、程序、服务(或用户)。

传输 + 协议

用于在客户端和 worker 之间传递信息的机制(以及基于该机制的 协议)。例如,amqp 作为传输,以及 json 编码的消息格式作为协议。

执行器 (Executor)

worker-based 引擎的一部分,用于发布任务请求,以便远程 worker 接受并处理这些请求。

Worker

Worker 在远程主机上启动,每个 worker 都有一个可以执行的任务列表(按需)。Worker 接受并处理由执行器发布的任务请求。可以在单独的线程(或进程…)中同时处理多个请求。例如,可以将 执行器 传递给 worker,并配置为在所需数量的线程(无论是否是 greenlet)中运行。

代理

执行器通过代理与 worker 交互。代理维护底层的传输并发布消息(并在接收到消息时调用回调)。

需求

  • 透明性:它应该作为现有 (本地) 引擎的临时替代品,只需最少的重构(如果有的话)。例如,如果一切设置和配置正确,应该能够在它上面运行相同的流程,而无需更改客户端代码。

  • 传输无关性:传输方式应该被抽象化,以便我们可以使用 oslo.messaginggearmandamqpzookeepermarconiwebsockets 或任何其他允许在客户端和 worker 之间传递信息的方式。

  • 简单性:它应该易于编写和部署。

  • 非均匀性:它应该支持非均匀的 worker,允许不同的 worker 根据其发布的 capabilities 执行不同的原子操作集合。

设计

有两个通信端,即 执行器(以及相关的引擎派生类)和 worker,它们使用代理组件进行通信。该代理设计用于接受/发布命名 exchange 中的消息。

高级架构

../_images/worker-engine.svg

执行器和 worker 通信

让我们考虑一下执行器和 worker 之间的通信方式。首先,引擎解析所有原子操作的依赖关系,并调度当前可以执行的原子操作。这使用了与其他引擎类型相同的调度和依赖关系解析逻辑。然后,worker-based 引擎执行器以以下方式执行可以立即执行的原子操作(那些依赖于其他任务输出的原子操作将在该输出准备好时执行):

  1. 执行器使用代理对象启动任务执行/撤销。

  2. Proxy 将任务请求(格式如下所述)发布到命名 exchange,使用用于将请求传递到特定 worker topic 的路由键。然后,执行器等待 worker 接受并确认任务请求。如果执行器在给定的超时时间内没有从 worker 收到任务确认,则该任务将被视为超时,并引发超时异常。

  3. worker 接收到请求消息,并启动一个新线程来处理它。

    1. worker 分派请求(获取实际执行任务的所需端点)。

    2. 如果分派成功,则 worker 向执行器发送确认响应,否则 worker 发送失败响应,以及序列化的 failure 对象,其中包含失败的内容(以及原因)。

    3. worker 执行任务,并在完成后将结果发送回原始执行器(每次触发任务进度事件时,它都会向执行器发送进度通知,由引擎处理,并分派给监听器等)。

  4. 执行器从 worker 收到任务请求确认,任务请求状态从 PENDING 变为 RUNNING 状态。一旦任务请求处于 RUNNING 状态,就无法超时(考虑到任务执行过程可能需要不可预测的时间)。

  5. 执行器从 worker 收到任务执行结果,并将其传递回执行器和 worker-based 引擎以完成任务处理(这会针对后续任务重复)。

注意

Failure 对象不能直接 json 序列化(它们包含对不可序列化的 traceback 的引用),因此在发送之前将它们转换为字典,并在执行器和 worker 两侧接收后从字典转换回来(这种转换是有损的,因为 traceback 由于其内容包含内部解释器引用和细节而无法完全保留)。

协议

taskflow.engines.worker_based.protocol.make_an_event(new_state)[source]

将新的/目标状态转换为事件名称。

taskflow.engines.worker_based.protocol.build_a_machine(freeze=True)[source]

构建状态机,允许请求通过。

taskflow.engines.worker_based.protocol.failure_to_dict(failure)[source]

尝试将 failure 对象转换为可 json 序列化的字典。

class taskflow.engines.worker_based.protocol.Message[source]

基类: object

所有消息类型的基类。

abstract to_dict()[source]

返回可 json 序列化的消息表示。

class taskflow.engines.worker_based.protocol.Notify(**data)[source]

基类: Message

表示 notify 消息类型。

TYPE = 'NOTIFY'

表示此消息类型的字符串常量。

RESPONSE_SCHEMA = {'additionalProperties': False, 'properties': {'tasks': {'items': {'type': 'string'}, 'type': 'array'}, 'topic': {'type': 'string'}}, 'required': ['topic', 'tasks'], 'type': 'object'}

预期的 notify 响应 消息模式(以 json 模式格式)。

SENDER_SCHEMA = {'additionalProperties': False, 'type': 'object'}

预期的 发送者 请求消息模式(以 json 模式格式)。

to_dict()[source]

返回可 json 序列化的消息表示。

class taskflow.engines.worker_based.protocol.Request(task, uuid, action, arguments, timeout=60, result=<object object>, failures=None)[source]

基类: Message

表示带有执行结果的请求。

每个请求都在 WAITING 状态下创建,并存储创建时间以及通过构造函数参数给定的超时时间。

请求在 WAITING/PENDING 状态下超过给定的超时时间时被认为已过期(在任何其他状态下均不被认为已过期)。

+------------+------------+---------+----------+---------+
|   Start    |   Event    |   End   | On Enter | On Exit |
+------------+------------+---------+----------+---------+
| FAILURE[$] |     .      |    .    |    .     |    .    |
|  PENDING   | on_failure | FAILURE |    .     |    .    |
|  PENDING   | on_running | RUNNING |    .     |    .    |
|  RUNNING   | on_failure | FAILURE |    .     |    .    |
|  RUNNING   | on_success | SUCCESS |    .     |    .    |
| SUCCESS[$] |     .      |    .    |    .     |    .    |
| WAITING[^] | on_failure | FAILURE |    .     |    .    |
| WAITING[^] | on_pending | PENDING |    .     |    .    |
+------------+------------+---------+----------+---------+
TYPE = 'REQUEST'

表示此消息类型的字符串常量。

SCHEMA = {'properties': {'action': {'enum': ['execute', 'revert'], 'type': 'string'}, 'arguments': {'type': 'object'}, 'failures': {'type': 'object'}, 'result': {}, 'task_cls': {'type': 'string'}, 'task_name': {'type': 'string'}, 'task_version': {'oneOf': [{'type': 'string'}, {'type': 'array'}]}}, 'required': ['task_cls', 'task_name', 'task_version', 'action'], 'type': 'object'}

预期的消息模式(以 json 模式格式)。

property current_state

当前请求的状态。

set_result(result)[source]

设置响应 futures 的结果。

property expired

检查请求是否已过期。

创建新请求时,其状态设置为 WAITING,存储创建时间,并通过构造函数参数给出超时时间。

如果请求在 WAITING/PENDING 状态下超过给定的超时时间,则该请求被认为已过期(不考虑任务执行过程可能需要不可预测的时间)。

to_dict()[source]

返回可JSON序列化的请求。

对于因某些异常而失败的请求,这将所有 failure.Failure 对象转换为字典(然后由接收方重新构建)。

transition_and_log_error(new_state, logger=None)[source]

转换并记录错误(如果转换引发异常)。

这覆盖了转换函数,并执行了几乎相同的功能,但如果转换无效,它不会引发异常,而是将警告记录到提供的记录器中,并返回 False 以指示转换未执行(请注意,这与转换函数不同,在转换函数中,False 表示忽略)。

transition(new_state)[source]

将请求转换为新状态。

如果执行了转换,则返回 True。如果转换被忽略,则返回 False。如果转换无效(并且不会执行),则引发 InvalidState 异常。

static from_dict(data, task_uuid=None)[source]

解析已验证数据为工作单元。

所有 Failure 对象,这些对象在远程侧已被转换为字典,现在将转换回 py:class:~taskflow.types.failure.Failure 对象。

class taskflow.engines.worker_based.protocol.Response(state, **data)[source]

基类: Message

表示响应消息类型。

TYPE = 'RESPONSE'

表示此消息类型的字符串常量。

SCHEMA = {'additionalProperties': False, 'definitions': {'completion': {'additionalProperties': False, 'properties': {'result': {}, 'required': ['result'], 'type': 'object'}, 'empty': {'additionalProperties': False, 'type': 'object'}, 'event': {'additionalProperties': False, 'properties': {'details': {'type': 'object'}, 'event_type': {'type': 'string'}}, 'required': ['event_type', 'details'], 'type': 'object'}}, 'properties': {'data': {'anyOf': [{'$ref': '#/definitions/event'}, {'$ref': '#/definitions/completion'}, {'$ref': '#/definitions/empty'}]}, 'state': {'enum': ['WAITING', 'PENDING', 'RUNNING', 'SUCCESS', 'FAILURE', 'EVENT'], 'type': 'string'}}, 'required': ['state', 'data'], 'type': 'object'}

预期的消息模式(以 json 模式格式)。

to_dict()[source]

返回可 json 序列化的消息表示。

示例

请求(执行)

  • task_name - 要执行的完整任务名称

  • task_cls - 要执行的完整任务类名称

  • action - 要执行的任务操作(例如,执行、撤销)

  • arguments - 调用任务操作时要使用的参数

  • result - 任务执行结果(结果或 Failure[仅传递给撤销]

此外,以下参数添加到请求消息中

  • reply_to - 执行器命名的交换机,工作器将把响应发送到该交换机

  • correlation_id - 执行器请求 ID(由于可以同时处理多个请求)

示例

{
    "action": "execute",
    "arguments": {
        "x": 111
    },
    "task_cls": "taskflow.tests.utils.TaskOneArgOneReturn",
    "task_name": "taskflow.tests.utils.TaskOneArgOneReturn",
    "task_version": [
        1,
        0
    ]
}

请求(撤销)

撤销时:

{
    "action": "revert",
    "arguments": {},
    "failures": {
        "taskflow.tests.utils.TaskWithFailure": {
            "exc_type_names": [
                "RuntimeError",
                "StandardError",
                "Exception"
            ],
            "exception_str": "Woot!",
            "traceback_str": "  File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n    result = task.execute(**arguments)\n  File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n    raise RuntimeError('Woot!')\n",
            "version": 1
        }
    },
    "result": [
        "failure",
        {
            "exc_type_names": [
                "RuntimeError",
                "StandardError",
                "Exception"
            ],
            "exception_str": "Woot!",
            "traceback_str": "  File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n    result = task.execute(**arguments)\n  File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n    raise RuntimeError('Woot!')\n",
            "version": 1
        }
    ],
    "task_cls": "taskflow.tests.utils.TaskWithFailure",
    "task_name": "taskflow.tests.utils.TaskWithFailure",
    "task_version": [
        1,
        0
    ]
}

工作器响应

运行时:

{
    "data": {},
    "state": "RUNNING"
}

进展时:

{
    "details": {
        "progress": 0.5
    },
    "event_type": "update_progress",
    "state": "EVENT"
}

成功时:

{
    "data": {
        "result": 666
    },
    "state": "SUCCESS"
}

失败时:

{
    "data": {
        "result": {
            "exc_type_names": [
                "RuntimeError",
                "StandardError",
                "Exception"
            ],
            "exception_str": "Woot!",
            "traceback_str": "  File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n    result = task.execute(**arguments)\n  File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n    raise RuntimeError('Woot!')\n",
            "version": 1
        }
    },
    "state": "FAILURE"
}

请求状态转换

WBE request state transitions

WAITING - 请求放置在队列上(或其他 kombu 消息总线/传输)但尚未被消费。

PENDING - 工作器接受请求,并正在使用其执行器(线程、进程或其他)等待运行。

FAILURE - 工作器在运行请求后失败(由于任务异常),或者没有工作器移动/开始执行(通过将请求置于 RUNNING 状态)在指定的时间跨度内(默认情况下为 60 秒,除非被覆盖)。

RUNNING - 工作器执行器(使用线程、进程…)已开始运行请求的任务(一旦状态转换为任何状态,请求超时将不再适用;因为此时无法确定任务需要多长时间才能完成,因为无法确定任务是需要很长时间才能完成还是已经失败)。

SUCCESS - 工作器完成运行任务而没有异常。

注意

WAITINGPENDING 阶段,引擎会跟踪请求的存活时间,如果达到超时,请求将自动转换为 FAILURE,并且来自工作器的任何进一步转换将被禁止(例如,如果工作器在将来接受请求并将任务设置为 PENDING,则此转换将被记录并忽略)。可以通过将引擎 transition_timeout 选项设置为更高的/更低的值或将其设置为 None(完全删除超时)来调整此超时。将来这将通过实现与 故障转移信息/弹性 相关的蓝图来得到改进。

用法

Workers

要使用基于工作器的引擎,必须首先在远程机器上建立一组工作器。必须向这些工作器提供一个任务对象列表、任务名称、模块名称(或可以检查有效任务的入口点)(这样做是为了防止任意代码执行)。

有关完整参数和对象用法,请访问 Worker

示例

from taskflow.engines.worker_based import worker as w

config = {
    'url': 'amqp://guest:guest@localhost:5672//',
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
}
worker = w.Worker(**config)
worker.run()

引擎

要使用基于工作器的引擎,必须构造一个流程(其中包含在远程机器上可见的任务),并选择特定的基于工作器的引擎入口点。还必须提供某些配置选项,以便正确配置和初始化传输后端。否则,使用方式应该大部分是透明的(并且几乎与使用任何其他引擎类型相同)。

有关完整参数和对象用法,请参阅 WorkerBasedActionEngine

使用 amqp 传输的示例

flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine='worker-based',
                            url='amqp://guest:guest@localhost:5672//',
                            exchange='test-exchange',
                            topics=['topic1', 'topic2'])
eng.run()

使用文件系统传输的示例

flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine='worker-based',
                            exchange='test-exchange',
                            topics=['topic1', 'topic2'],
                            transport='filesystem',
                            transport_options={
                                'data_folder_in': '/tmp/in',
                                'data_folder_out': '/tmp/out',
                            })
eng.run()

其他支持的关键字参数

  • executor:提供 WorkerTaskExecutor 接口的类;它将用于执行、撤销和等待远程任务。

限制

  • 流程中的原子必须仅从 持久性 中定义的方式接收和接受参数。换句话说,在构建工作流时创建的任务与在远程工作器上执行的任务将不相同(并且未通过 输入和输出 机制传递的任何内部状态都无法传输)。这意味着资源对象(数据库句柄、文件描述符、套接字…)不能直接发送到远程工作器(而是必须定义如何获取/创建这些对象)。

  • 基于工作器的引擎将来能够运行轻量级任务,以避免传输开销,对于非常简单的任务(当前即使对于轻量级任务也会远程运行,这可能性能不佳)。

  • 故障检测,当前当工作器确认任务时,引擎将无限期地等待任务结果(任务可能需要不确定的时间才能完成)。将来需要一种限制远程工作器执行时间(并跟踪其存活时间)的方法,如果达到超时,则可能在辅助工作器上生成任务(即第一个工作器已死或停止响应)。

实现

class taskflow.engines.worker_based.engine.WorkerBasedActionEngine(flow, flow_detail, backend, options)[source]

基类:ActionEngine

基于工作器的操作引擎。

特定的后端选项(从提供的引擎选项中提取)

参数:
  • exchange – 代理交换机名称,执行器/工作器通信在此交换机中执行

  • url – 代理连接 URL(请参阅 kombu 文档中的格式)

  • topics – 工作器主题列表,用于进行通信(这也会通过侦听工作器发出的通知来学习)。

  • transport – 要使用的传输(例如,amqp、memory 等)

  • transition_timeout – 数值(或 None 表示无限)以等待提交的远程请求从 (PENDING, WAITING) 请求状态转换。到期后,请求所做的任务的结果将成为 RequestTimeout 异常,而不是通常返回的值(或引发的异常)。

  • transport_options – 传输特定的选项(请参阅:http://kombu.readthedocs.org/ 以了解这些选项的含义和预期)

  • retry_options – 重试特定的选项(请参阅:DEFAULT_RETRY_OPTIONS

  • worker_expiry – 数值(或负数/零/None 表示无限),定义在没有响应先前的通知/ping 请求的工作器继续发送消息的秒数(默认值为 60 秒)。

组件

警告

对内部引擎函数、组件和模块的外部使用应保持在最低限度,因为它们可能会被更改、重构或移动到其他位置而无需通知(并且没有典型的弃用周期)。

class taskflow.engines.worker_based.dispatcher.Handler(process_message, validator=None)[source]

基类: object

接收消息时将调用的组件。

property process_message

主要回调函数,用于处理接收到的消息。

只有在格式验证通过(如果适用,使用 validator 回调函数)并且消息已确认后,才会调用此函数。

property validator

可选回调函数,在处理之前激活。

如果存在此回调函数,则预计它会验证消息,如果消息无效,则引发 InvalidFormat

class taskflow.engines.worker_based.dispatcher.TypeDispatcher(type_handlers=None, requeue_filters=None)[source]

基类: object

接收消息并将其分派到特定类型的处理程序。

property type_handlers

消息类型 -> 处理该消息的回调函数的字典。

通过查找消息属性“type”并将该类型映射到此字典中的回调函数来激活回调函数;如果找到一个,则预计它是一个接受两个位置参数的回调函数;第一个是消息数据,第二个是消息对象。如果未找到回调函数,则消息将被拒绝,底层消息传输将决定这意味着什么/暗示什么……

property requeue_filters

用于请求重新排队消息的过滤器(回调函数)列表。

在确认消息之前激活这些回调函数,并且它可以指示调度程序重新排队消息而不是处理它。调用回调函数时,将提供两个位置参数;第一个是消息数据,第二个是消息对象。使用这些提供的参数,如果消息应该重新排队,则过滤器应返回一个真值对象,如果消息不应该重新排队,则返回一个假值对象。

on_message(data, message)[source]

此方法在接收到传入消息时调用。

class taskflow.engines.worker_based.endpoint.Endpoint(task_cls)[source]

基类: object

表示具有 execute/revert 方法的单个任务。

class taskflow.engines.worker_based.executor.WorkerTaskExecutor(uuid, exchange, topics, transition_timeout=60, url=None, transport=None, transport_options=None, retry_options=None, worker_expiry=60)[source]

基类: TaskExecutor

在远程工作器上执行任务。

execute_task(task, task_uuid, arguments, progress_callback=None)[source]

调度任务执行。

revert_task(task, task_uuid, arguments, result, failures, progress_callback=None)[source]

调度任务撤销。

wait_for_workers(workers=1, timeout=None)[source]

等待至少 geq 个工作器通知它们已准备好进行工作。

注意(harlowja):如果提供了超时时间,此函数将等待直到超时时间到期,如果工作器的数量在超时到期之前未达到所需的工作器数量,则将返回仍需要多少个工作器,否则将返回零。

start()[source]

启动消息处理线程。

stop()[source]

停止消息处理线程。

class taskflow.engines.worker_based.proxy.Proxy(topic, exchange, type_handlers=None, on_wait=None, url=None, transport=None, transport_options=None, retry_options=None)[source]

基类: object

代理处理来自/到命名交换器的消息。

仅供内部使用(不供公共使用)。

DEFAULT_RETRY_OPTIONS = {'interval_max': 1, 'interval_start': 1, 'interval_step': 1, 'max_retries': 3}

用于在瞬态故障下重新连接的默认设置。

请参阅:http://kombu.readthedocs.org/(以及连接 ensure_options)了解这些值的含义/含义……

property dispatcher

内部用于分派匹配消息的分派器。

property connection_details

连接详细信息(只读)。

property is_running

返回代理是否正在运行。

publish(msg, routing_key, reply_to=None, correlation_id=None)[source]

使用给定的路由键将消息发布到命名的交换器。

start()[source]

启动代理。

wait()[source]

等待代理启动。

stop()[source]

停止代理。

class taskflow.engines.worker_based.worker.Worker(exchange, topic, tasks, executor=None, threads_count=None, url=None, transport=None, transport_options=None, retry_options=None)[source]

基类: object

可启动在远程主机上以处理任务请求的 Worker。

参数:
  • url – broker url

  • exchange – broker exchange 名称

  • topic – worker 声明的主题名称

  • tasks – worker 可以执行的任务列表,列表中的项目可以是以下类型之一:1,命名要搜索任务的 Python 模块名称或任务类名称的字符串;2,要在其中搜索任务的 Python 模块;3,将用于创建任务的任务类对象。

  • executor – 可用于在单独的线程中处理请求的自定义 executor 对象(如果未提供,将创建一个)

  • threads_count – 要传递给默认 executor 的线程数(仅当未传递 executor 时使用)

  • transport – 要使用的传输(例如,amqp、memory 等)

  • transport_options – 传输特定的选项(请参阅:http://kombu.readthedocs.org/ 以了解这些选项的含义和预期)

  • retry_options – 重试特定的选项(请参阅:DEFAULT_RETRY_OPTIONS

banner

在运行前可以显示的有用的横幅。

run(display_banner=True, banner_writer=None)[source]

运行 worker。

wait()[source]

等待 worker 启动。

stop()[source]

停止 worker。

class taskflow.engines.worker_based.types.TopicWorker(topic, tasks, identity=<object object>)[source]

基类: object

一个(只读)worker 及其相关信息 + 有用的方法。

class taskflow.engines.worker_based.types.ProxyWorkerFinder(uuid, proxy, topics, beat_periodicity=5, worker_expiry=60)[source]

基类: object

请求并接收有关 worker topic + 任务详细信息的响应。

property total_workers

当前已知的 worker 数量。

wait_for_workers(workers=1, timeout=None)[source]

等待至少 geq 个工作器通知它们已准备好进行工作。

注意(harlowja):如果提供了超时时间,此函数将等待直到超时时间到期,如果工作器的数量在超时到期之前未达到所需的工作器数量,则将返回仍需要多少个工作器,否则将返回零。

property messages_processed

已处理的通知响应消息数量。

maybe_publish()[source]

定期调用,以向每个主题发布通知消息。

这些消息(尤其是响应)是此查找器了解 worker 以及它们可以执行哪些任务的方式(以便我们可以将 worker 匹配到要运行的任务)。

process_response(data, message)[source]

处理从远程发送的通知消息。

clean()[source]

清除任何已死/过期/未响应的 worker。

返回已删除的 worker 数量。

reset()[source]

重置查找器的内部状态。

get_worker_for_task(task)[source]

获取可以执行给定任务的 worker。