Jobs

概述

Jobs 和 jobboards 是 TaskFlow 提供的一种新颖概念,用于允许工作流在有能力的拥有者(这些拥有者通常会使用 engines 来完成工作流)之间自动转移所有权。它们提供了必要的语义,能够以原子方式将作业从生产者可靠且容错地转移到消费者。它们借鉴了物理世界中发布和获取工作的方式(通常报纸或在线网站上的招聘信息起着类似的作用)。

简而言之: 它类似于队列,但消费者在声明时会锁定队列中的项目,并且仅在完成工作后才将其从队列中移除。如果消费者失败,锁将自动释放,并且该项目将返回队列以供进一步消费。

注意

有关更多信息,请访问 范式转变 页面以获取更多详细信息。

定义

招聘

一个 job 包含一个唯一的标识符、名称以及对 logbook 的引用,该引用包含有关已完成或应该/将要完成的工作的详细信息,以完成为此作业创建的工作。

Jobboards

一个 jobboard 负责管理作业的发布、所有权和交付。它充当可以发布、声明和搜索作业的位置;通常通过迭代或通知进行。Jobboards 可以由不同的有能力的实现支持(每个实现可能具有不同的配置),但所有 jobboards 都实现相同的接口和语义,以便后端使用尽可能透明。这允许使用 TaskFlow 的服务的部署者或开发人员选择最适合其设置(及其预期用途)的 jobboard 实现。

高级架构

../_images/jobboard.png

注意: 此图显示了高级图(并且此文档的其他部分也引用了它)的 zookeeper 实现(其他实现通常具有不同的架构)。

特性

  • 高可用性

    • 通过将部分完成的工作或尚未开始的工作转移到可以恢复先前部分完成的工作或开始初始工作的实体来保证工作流向前推进,从而确保整个工作流的推进(推进意味着通过工作流 patternsatoms 转换并通过其相关的 states 转换)。

  • 原子转移和单一所有权

    • 确保同一时间只有一个工作流由单个所有者管理(包括当工作流转移到恢复其他失败所有者工作的拥有者时)。这避免了争用,并确保工作流由一个且只有一个实体管理。

    • 注意: 这并不意味着所有者需要自己运行工作流,而是说该所有者可以使用 engine 以分布式方式运行工作,以确保工作流的推进。

  • 工作流构建和执行的分离

    • 可以使用 logbooks 创建作业,logbooks 包含一个关于实体(例如 API 服务器)要完成的工作的规范。然后,该作业可以由监视该 jobboard 的实体(不一定是 API 服务器本身)完成。这在工作形成和工作完成之间创建了断开连接,这对于横向扩展很有用。

  • 异步完成

    • 例如,当 API 服务器将作业发布到 jobboard 以供完成时,该 API 服务器可以向调用 API 服务的用户返回一个跟踪标识符。用户可以使用此跟踪标识符来轮询状态(类似于 FedEx 或 UPS 创建的运输跟踪标识符)。

用法

所有 jobboards 都是实现相同接口的类,当然也可以像在 Python 中导入和创建任何其他类一样导入和创建它们。但是,创建 jobboards 的更简单(推荐)方法是使用 fetch() 函数,该函数使用 entrypoints(内部使用 stevedore)来获取和配置后端。

使用此函数,典型的 jobboard 创建(以及发布作业的示例)可能如下所示

from taskflow.persistence import backends as persistence_backends
from taskflow.jobs import backends as job_backends

...
persistence = persistence_backends.fetch({
    "connection': "mysql",
    "user": ...,
    "password": ...,
})
book = make_and_save_logbook(persistence)
board = job_backends.fetch('my-board', {
    "board": "zookeeper",
}, persistence=persistence)
job = board.post("my-first-job", book)
...

类似地,通过创建 jobboard 并使用迭代功能来查找和声明作业(并最终消费它们)来消费作业。典型的 jobboard 用法(用于消费和工作完成)可能如下所示

import time

from taskflow import exceptions as exc
from taskflow.persistence import backends as persistence_backends
from taskflow.jobs import backends as job_backends

...
my_name = 'worker-1'
coffee_break_time = 60
persistence = persistence_backends.fetch({
    "connection': "mysql",
    "user": ...,
    "password": ...,
})
board = job_backends.fetch('my-board', {
    "board": "zookeeper",
}, persistence=persistence)
while True:
    my_job = None
    for job in board.iterjobs(only_unclaimed=True):
        try:
            board.claim(job, my_name)
        except exc.UnclaimableJob:
            pass
        else:
            my_job = job
            break
    if my_job is not None:
        try:
            perform_job(my_job)
        except Exception:
            LOG.exception("I failed performing job: %s", my_job)
            board.abandon(my_job, my_name)
        else:
            # I finished it, now cleanup.
            board.consume(my_job)
            persistence.get_connection().destroy_logbook(my_job.book.uuid)
    time.sleep(coffee_break_time)
...

有几种方法可以在 flow 中提供参数。第一种选择是在 logbook 中的 flowdetail 对象中添加一个 store

您也可以在将作业发布到 job board 时在 job 本身中提供一个 store。如果找到这两个 store 值,它们将被组合,job store 将覆盖 logbook store

from oslo_utils import uuidutils

from taskflow import engines
from taskflow.persistence import backends as persistence_backends
from taskflow.persistence import models
from taskflow.jobs import backends as job_backends


...
persistence = persistence_backends.fetch({
    "connection': "mysql",
    "user": ...,
    "password": ...,
})
board = job_backends.fetch('my-board', {
    "board": "zookeeper",
}, persistence=persistence)

book = models.LogBook('my-book', uuidutils.generate_uuid())

flow_detail = models.FlowDetail('my-job', uuidutils.generate_uuid())
book.add(flow_detail)

connection = persistence.get_connection()
connection.save_logbook(book)

flow_detail.meta['store'] = {'a': 1, 'c': 3}

job_details = {
    "flow_uuid": flow_detail.uuid,
    "store": {'a': 2, 'b': 1}
}

engines.save_factory_details(flow_detail, flow_factory,
                             factory_args=[],
                             factory_kwargs={},
                             backend=persistence)

jobboard = get_jobboard(zk_client)
jobboard.connect()
job = jobboard.post('my-job', book=book, details=job_details)

# the flow global parameters are now the combined store values
# {'a': 2, 'b': 1', 'c': 3}
...

类型

Zookeeper

Board 类型: 'zookeeper'

使用 zookeeper 通过使用 zookeeper 目录、短暂节点、非短暂节点和监视来提供 jobboard 功能和语义。

其他关键字参数

  • client: 提供 kazoo.client.KazooClient-like 接口的类;它将用于 zookeeper 交互,在 jobboard 实例之间共享客户端可能会提供更好的可扩展性,并有助于避免创建太多与一组 zookeeper 服务器的开放连接。

  • persistence: 提供 persistence 后端接口的类;它将用于加载作业 logbooks 以在运行时使用或在声明作业进行检查之前使用。

其他配置参数

  • path: 存储作业信息的 zookeeper 根路径(默认值/taskflow/jobs

  • hosts: 要连接的 zookeeper 主机列表(默认值localhost:2181);仅当未提供客户端时才使用。

  • timeout: 执行 zookeeper 操作时使用的超时时间;仅当未提供客户端时才使用。

  • handler: 提供 kazoo.handlers-like 接口的类;它将由 kazoo 内部用于执行异步操作,当您的程序使用 eventlet 并且您希望指示 kazoo 使用与 eventlet 兼容的处理程序时,这很有用。

注意

有关实现细节,请参阅 ZookeeperJobBoard

Redis

Board 类型: 'redis'

使用 redis 通过使用 redis 哈希数据结构和单个作业所有权键(可以配置在给定时间后过期)来提供 jobboard 功能和语义。

注意

有关实现细节,请参阅 RedisJobBoard

Etcd

Board 类型: 'etcd'

使用 etcd 通过使用 Etcd 键值数据结构和单个作业所有权键(可以配置在给定时间后过期)来提供 jobboard 功能。

其他关键字参数

  • persistence: 提供 persistence 后端接口的类;它将用于加载作业 logbooks 以在运行时使用或在声明作业进行检查之前使用。

其他配置参数

  • path: 存储作业信息的 Etcd 路径(默认值jobboard

  • host: 要连接的 Etcd 主机(默认值localhost

  • port: Etcd 服务器的端口(默认值2379

  • api_path: Etcd API 端点的路径

  • protocol: 用于与服务器通信的协议(默认值http,选项为 httphttps

  • ca_certcert_keycert_cert: 传递给 Etcd3gw 的证书信息,用于 https 通信

  • timeout: 执行 Etcd 操作时使用的超时时间

  • ttl: 声明作业时的默认生存时间(默认值None

注意

有关实现细节,请参阅 EtcdJobBoard

注意事项

在使用 jobboard 时,应使用一些使用注意事项,以确保其以安全可靠的方式使用。最终我们希望使这些成为非问题,但现在它们值得一提。

双引擎作业

是什么: 由于 atoms 和 engines 当前不可抢占,因此我们无法强制 engine(或它用于运行的线程/远程工作者……)停止处理 atom(强制代码停止而不经其同意通常是坏行为),如果它已经开始处理 atom。这可能会导致问题,因为 engine 可以注意到它不再拥有声明的时间点是在任何 state 更改发生时(例如,过渡到新的 atom 或记录结果),此时注意到声明丢失后,engine 可以立即停止进行进一步的工作。造成的影响是,当声明丢失时,另一个 engine 可以立即尝试获取先前丢失的声明,并且它可能会开始处理另一个 engine 可能仍在执行的未完成任务(因为该 engine 尚未意识到它丢失了声明)。

简而言之: 不可抢占,在下一个状态更改后才能意识到失去声明的事实,另一个 engine 可能会在此期间获取声明,因此两者都在处理作业。

通过以下方式缓解

  1. 确保您的 atoms 是幂等的,这将导致可能正在执行相同 atom 的 engine 能够继续执行而不会造成任何冲突/问题(幂等性保证这一点)。

  2. 在声明先前声明过的作业之前,强制执行一项策略,该策略发生在作业的工作流程开始之前(可能在 engine 开始作业的工作之前),以确保任何先前的工作都已回滚后再继续前进。例如

    • 回滚最后完成的 atom/atom 集。

    • 回滚发生的最后状态更改。

  3. 通过添加等待期来延迟声明部分完成的工作(以允许先前的 engine 聚合),然后再处理部分完成的作业(将此与上述建议结合使用,大多数双引擎问题都应避免)。

接口

class taskflow.jobs.base.JobPriority(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

基础: Enum

作业优先级的枚举(基于 hadoop 作业优先级建模)。

VERY_HIGH = 'VERY_HIGH'

极其紧急的作业优先级。

HIGH = 'HIGH'

轻微紧急的作业优先级。

NORMAL = 'NORMAL'

默认作业优先级。

LOW = 'LOW'

优先级较低的任务。

VERY_LOW = 'VERY_LOW'

优先级非常低的任务。

classmethod reorder(*values)[source]

重新排序 (优先级, 值) 元组 -> 优先级排序后的值。

class taskflow.jobs.base.Job(board, name, uuid=None, details=None, backend=None, book=None, book_data=None)[source]

基类: object

表示一个命名的、可跟踪的工作单元的抽象类。

一个任务连接一个日志簿、一个所有者、一个优先级、最后修改和创建日期以及任务具有的任何相关状态。由于它连接到一个日志簿,而每个日志簿都与一组可以创建流程的工厂相关联,因此它是可以由实体拥有(通常该实体将读取这些日志簿并运行任何包含的流程)的工作单元的当前顶级容器。

在可预见的未来,将只允许一个实体拥有和操作一个任务中包含的流程。

注意(harlowja):此对象将在失败时传输到另一个实体,以便将包含的流程的所有权转移给第二个实体/所有者,以便恢复、继续、撤销…

abstract property last_modified

任务最后修改的日期时间。

abstract property created_on

任务创建的日期时间。

property board

此任务发布或创建的看板。

abstract property state

访问此任务的当前状态。

abstract property priority

此任务的 JobPriority

wait(timeout=None, delay=0.01, delay_multiplier=2.0, max_delay=60.0, sleep_func=<built-in function sleep>)[source]

等待任务进入完成状态。

如果任务在给定的超时时间内未完成,则返回 false,否则返回 true(如果由于某种原因无法读取任务信息,则也可能引发任务失败异常)。定期状态检查将每 delay 秒发生一次,其中 delay 将在找到未完成的状态后乘以给定的乘数。

请注意,如果未提供超时时间,则相当于阻塞直到任务完成。 另外请注意,如果看板后端可以优化此方法,则其实现可能不会使用延迟(和退避) 。但是,无论应用了什么优化,实现都必须始终尊重给定的超时值。

property book

与此任务关联的日志簿。

如果未与此任务关联日志簿,则此属性为 None。

property book_uuid

与此任务关联的日志簿的 UUID。

如果未与此任务关联日志簿,则此属性为 None。

property book_name

与此任务关联的日志簿的名称。

如果未与此任务关联日志簿,则此属性为 None。

property uuid

此任务的 UUID。

property details

与此任务关联的任何详细信息的字典。

property name

此任务的非唯一标识名称。

class taskflow.jobs.base.JobBoardIterator(board, logger, board_fetch_func=None, board_removal_func=None, only_unclaimed=False, ensure_fresh=False)[source]

基类: object

遍历看板的迭代器,遍历潜在的任务。

它提供以下属性

  • only_unclaimed:一个布尔值,指示是否仅遍历未认领的任务

  • ensure_fresh:一个布尔值,要求每次获取新的任务集时,强制后端刷新(确保看板具有最新的任务列表)

  • board:此迭代器创建的看板

property board

此迭代器创建的看板。

class taskflow.jobs.base.JobBoard(name, conf)[source]

基类: object

可以发布、重新发布、认领和转移任务的地方。

此看板可以有多种实现,具体取决于底层看板实现所需的语义和功能。

注意(harlowja):名称旨在类似于报纸或其他地方使用的公告牌/发布系统,用于征集人们可以面试和申请(然后工作和完成)的任务。

abstract iterjobs(only_unclaimed=False, ensure_fresh=False)[source]

返回当前在此看板上的任务的迭代器。

注意(harlowja):迭代顺序应为发布顺序(从旧到新),优先级较高的任务应在优先级较低的任务之前提供,但由后端实现提供最适合它的顺序。

注意(harlowja):返回的迭代器可能支持其他属性,这些属性可用于进一步自定义迭代方式;请检查后端的迭代器对象以确定支持哪些其他属性。

参数:
  • only_unclaimed – 一个布尔值,指示是否仅遍历未认领的任务。

  • ensure_fresh – 一个布尔值,要求仅遍历最新的任务,其中“最新”的定义由后端特定。后端如果其内部语义/功能不支持此参数,则可以忽略此值。

abstract wait(timeout=None)[source]

等待给定的时间,直到有任务发布。

找到任务后,将返回一个迭代器,该迭代器可用于遍历这些任务。

注意(harlowja):由于看板可以由多个外部实体同时修改,因此返回的迭代器可能为空,因为其他实体可能在创建迭代器后删除了这些任务(请注意在使用时这一点)。

参数:

timeout – 等待任务出现的时间(以秒为单位)(如果为 None,则永久等待)。

abstract property job_count

返回此看板上的任务数量。

注意(harlowja):此计数可能会随着任务的出现或删除而变化,因此不应以需要精确和绝对的方式使用此计数的准确性。

abstract find_owner(job)[source]

如果存在,则获取任务的所有者。

property name

此看板的非唯一标识名称。

abstract consume(job, who)[source]

永久(且原子地)从看板中删除任务。

消耗向看板(以及任何其他检查看板的人)发出信号,表明此任务已由先前认领该任务的实体完成。

只有认领该任务的实体才能消耗该任务。

已消耗的任务不能被另一个实体认领或重新发布(任务发布是不可变的)。任何消耗未认领的任务(或他们没有认领权的任务)的实体都会引发异常。

参数:
  • job – 此看板上的一个任务,可以被消耗(如果不存在,则会引发 NotFound 异常)。

  • who – 执行消耗的实体的名称,必须与用于认领此任务的名称相同。

abstract post(name, book=None, details=None, priority=JobPriority.NORMAL)[source]

原子地将任务创建并发布到看板。

此发布允许其他人尝试认领该任务(然后进行工作)。提供的日志簿、详细信息字典或名称(或这些的组合)的内容必须提供足够的用于引用和构建包含的工作的信息(无论是什么)。

一旦发布了任务,只能通过消费该任务(在任务被认领后)来将其移除。任何实体都可以向任务板发布/提议任务(未来这可能会受到限制)。

返回一个任务对象,代表发布的信息。

abstract claim(job, who)[source]

原子性地尝试认领提供的任务。

如果一个任务被认领,预计认领该任务的实体将在未来的某个时间处理该任务的内容,并完成任务(导致重新发布)或从任务板消费该任务(表示其完成)。如果认领失败,将引发相应的异常来向认领尝试者发出信号。

参数:
  • job – 此任务板上可以认领的任务(如果不存在,将引发 NotFound 异常)。

  • who – 认领实体的名称字符串。

abstract abandon(job, who)[source]

原子性地尝试放弃提供的任务。

此放弃信号告知其他人该任务现在可以被重新认领。通常,如果认领任务的实体未能或无法完成其先前认领的任务,则会发生这种情况。

只有认领了该任务的实体才能放弃任务。任何实体放弃未认领的任务(或他们不拥有的任务)都将导致异常。

参数:
  • job – 此任务板上可以放弃的任务(如果不存在,将引发 NotFound 异常)。

  • who – 执行放弃的实体的名称字符串,必须与认领此任务时使用的名称相同。

abstract trash(job, who)[source]

丢弃提供的任务。

丢弃任务向其他人发出信号,该任务已损坏,不应被重新认领。这为用户提供了一种从外部将任务从板上删除的选项。丢弃的任务详细信息应保留在其他位置以供审查(如果需要)。

只有认领了该任务的实体才能丢弃任务。任何实体丢弃未认领的任务(或他们不拥有的任务)都将导致异常。

参数:
  • job – 此任务板上可以丢弃的任务(如果不存在,将引发 NotFound 异常)。

  • who – 执行丢弃的实体的名称字符串,必须与认领此任务时使用的名称相同。

abstract register_entity(entity)[source]

将实体注册到任务板(的后端),例如:一个指挥者。

参数:

entity (Entity) – 要注册为与任务板(的后端)关联的实体

abstract property connected

返回此任务板是否已连接。

abstract connect()[source]

打开到任何后端系统的连接。

abstract close()[source]

关闭到任何后端系统的连接。

关闭后,除非重新连接,否则任务板将无法使用。

class taskflow.jobs.base.NotifyingJobBoard(name, conf)[source]

Bases: JobBoard

一个可以通知其他人有关板事件的子任务板。

实现者预计至少会通知有关任务发布和删除的信息。

注意(harlowja):发出的通知可能会在单独的专用线程中发出,因此请确保注册的所有回调都是线程安全的(并且阻塞的时间尽可能短)。

taskflow.jobs.backends.fetch(name, conf, namespace='taskflow.jobboards', **kwargs)[source]

获取具有给定配置的作业板后端。

此获取方法将在入口点命名空间中查找入口点名称,然后尝试使用提供的名称、配置和任何板特定的 kwargs 实例化该入口点。

注意(harlowja):为了便于指定配置和选项到板,配置(通常只是一个字典)也可以是一个标识入口点名称和该板特定配置的 URI 字符串。

例如,给定以下配置 URI

zookeeper://<not-used>/?a=b&c=d

这将查找名为“zookeeper”的入口点,并将由 URI 的组件组成的配置对象(在本例中为 {'a': 'b', 'c': 'd'})提供给该板实例的构造函数(包括指定的名称)。

taskflow.jobs.backends.backend(name, conf, namespace='taskflow.jobboards', **kwargs)[source]

获取作业板,连接到它并在完成后关闭它。

这允许获取板实例,连接到它,然后在上下文管理器语句中使用该板,并在上下文管理器退出时关闭该板。

实现

Zookeeper

class taskflow.jobs.backends.impl_zookeeper.ZookeeperJob(board, name, client, path, uuid=None, details=None, book=None, book_data=None, created_on=None, backend=None, priority=JobPriority.NORMAL)[source]

Bases: Job

一个 zookeeper 任务。

property lock_path

存储作业锁/认领和所有者 znode 的路径。

property priority

此任务的 JobPriority

property path

存储作业数据 znode 的路径。

property sequence

当前作业的序列号。

property root

作业在 zookeeper 中的父路径。

property last_modified

任务最后修改的日期时间。

property created_on

任务创建的日期时间。

property state

访问此任务的当前状态。

class taskflow.jobs.backends.impl_zookeeper.ZookeeperJobBoard(name, conf, client=None, persistence=None, emit_notifications=True)[source]

Bases: NotifyingJobBoard

一个由 zookeeper 支持的任务板。

kazoo 库提供支持。

此任务板在 zookeeper 中的一个目录中创建序列化持久 znodes,并使用 zookeeper 监视来通知其他任务板使用 post() 方法发布的作业(这会创建一个包含作业内容/详细信息编码在 json 中的 znode)。这些任务板的用户(可能在不相交的机器集合上)然后可以遍历可用的作业,并决定他们是否想要尝试认领他们遍历的作业中的一个。如果是这样,他们将尝试联系 zookeeper,他们将尝试使用持久 znode 的名称加上“.lock”作为后缀创建一个临时 znode。如果尝试使用 claim() 方法认领作业的实体能够创建具有该名称的临时 znode,则允许(并期望)执行该作业内容描述的任何工作。完成之后,临时 znode 和持久 znode 将在单个事务中删除(如果成功完成)。如果认领实体不成功(或认领该 znode 的实体死亡),则临时 znode 将被释放(手动使用 abandon() 或由 zookeeper 在临时节点和关联会话被认为丢失时自动释放)。

请注意,kazoo 客户端的创建是通过 make_client() 实现的,并且此任务板配置的传输到该函数以创建客户端可能发生在 __init__ 时。这意味着此任务板配置中的某些参数可以提供给 make_client(),以便如果调用者未提供客户端,则将根据 make_client() 的规范创建客户端。

MIN_ZK_VERSION = (3, 4, 0)

事务支持是在 3.4.0 中添加的,因此我们需要至少该版本。

LOCK_POSTFIX = '.lock'

锁条目具有的 Znode 后缀

TRASH_FOLDER = '.trash'

根路径下创建的 Znode 子路径,包含已删除的作业。

ENTITY_FOLDER = '.entities'

根路径下创建的 Znode 子路径,包含已注册的实体。

JOB_PREFIX = 'job'

作业条目具有的 Znode 前缀

DEFAULT_PATH = '/taskflow/jobs'

用于作业的默认 Znode 路径(数据、锁等)。

STATE_HISTORY_LENGTH = 2

要保留的先前状态更改的数量,主要用于历史记录跟踪和调试连接问题。

NO_FETCH_STATES = ('LOST', 'SUSPENDED')

在这些状态下,从获取例程返回空列表的客户端状态,在此期间,基础连接正在恢复或可能恢复(即,它没有完全断开连接)。

property path

所有作业 Znode 将存储的路径。

property trash_path

所有已删除的作业 Znode 将存储的路径。

property entity_path

所有 Conductor 信息 Znode 将存储的路径。

property job_count

返回此看板上的任务数量。

注意(harlowja):此计数可能会随着任务的出现或删除而变化,因此不应以需要精确和绝对的方式使用此计数的准确性。

iterjobs(only_unclaimed=False, ensure_fresh=False)[source]

返回当前在此看板上的任务的迭代器。

注意(harlowja):迭代顺序应为发布顺序(从旧到新),优先级较高的任务应在优先级较低的任务之前提供,但由后端实现提供最适合它的顺序。

注意(harlowja):返回的迭代器可能支持其他属性,这些属性可用于进一步自定义迭代方式;请检查后端的迭代器对象以确定支持哪些其他属性。

参数:
  • only_unclaimed – 一个布尔值,指示是否仅遍历未认领的任务。

  • ensure_fresh – 一个布尔值,要求仅遍历最新的任务,其中“最新”的定义由后端特定。后端如果其内部语义/功能不支持此参数,则可以忽略此值。

post(name, book=None, details=None, priority=JobPriority.NORMAL)[source]

原子地将任务创建并发布到看板。

此发布允许其他人尝试认领该任务(然后进行工作)。提供的日志簿、详细信息字典或名称(或这些的组合)的内容必须提供足够的用于引用和构建包含的工作的信息(无论是什么)。

一旦发布了任务,只能通过消费该任务(在任务被认领后)来将其移除。任何实体都可以向任务板发布/提议任务(未来这可能会受到限制)。

返回一个任务对象,代表发布的信息。

claim(job, who)[source]

原子性地尝试认领提供的任务。

如果一个任务被认领,预计认领该任务的实体将在未来的某个时间处理该任务的内容,并完成任务(导致重新发布)或从任务板消费该任务(表示其完成)。如果认领失败,将引发相应的异常来向认领尝试者发出信号。

参数:
  • job – 此任务板上可以认领的任务(如果不存在,将引发 NotFound 异常)。

  • who – 认领实体的名称字符串。

find_owner(job)[source]

如果存在,则获取任务的所有者。

register_entity(entity)[source]

将实体注册到任务板(的后端),例如:一个指挥者。

参数:

entity (Entity) – 要注册为与任务板(的后端)关联的实体

consume(job, who)[source]

永久(且原子地)从看板中删除任务。

消耗向看板(以及任何其他检查看板的人)发出信号,表明此任务已由先前认领该任务的实体完成。

只有认领该任务的实体才能消耗该任务。

已消耗的任务不能被另一个实体认领或重新发布(任务发布是不可变的)。任何消耗未认领的任务(或他们没有认领权的任务)的实体都会引发异常。

参数:
  • job – 此看板上的一个任务,可以被消耗(如果不存在,则会引发 NotFound 异常)。

  • who – 执行消耗的实体的名称,必须与用于认领此任务的名称相同。

abandon(job, who)[source]

原子性地尝试放弃提供的任务。

此放弃信号告知其他人该任务现在可以被重新认领。通常,如果认领任务的实体未能或无法完成其先前认领的任务,则会发生这种情况。

只有认领了该任务的实体才能放弃任务。任何实体放弃未认领的任务(或他们不拥有的任务)都将导致异常。

参数:
  • job – 此任务板上可以放弃的任务(如果不存在,将引发 NotFound 异常)。

  • who – 执行放弃的实体的名称字符串,必须与认领此任务时使用的名称相同。

trash(job, who)[source]

丢弃提供的任务。

丢弃任务向其他人发出信号,该任务已损坏,不应被重新认领。这为用户提供了一种从外部将任务从板上删除的选项。丢弃的任务详细信息应保留在其他位置以供审查(如果需要)。

只有认领了该任务的实体才能丢弃任务。任何实体丢弃未认领的任务(或他们不拥有的任务)都将导致异常。

参数:
  • job – 此任务板上可以丢弃的任务(如果不存在,将引发 NotFound 异常)。

  • who – 执行丢弃的实体的名称字符串,必须与认领此任务时使用的名称相同。

wait(timeout=None)[source]

等待给定的时间,直到有任务发布。

找到任务后,将返回一个迭代器,该迭代器可用于遍历这些任务。

注意(harlowja):由于看板可以由多个外部实体同时修改,因此返回的迭代器可能为空,因为其他实体可能在创建迭代器后删除了这些任务(请注意在使用时这一点)。

参数:

timeout – 等待任务出现的时间(以秒为单位)(如果为 None,则永久等待)。

property connected

返回此任务板是否已连接。

close()[source]

关闭到任何后端系统的连接。

关闭后,除非重新连接,否则任务板将无法使用。

connect(timeout=10.0)[source]

打开到任何后端系统的连接。

Redis

class taskflow.jobs.backends.impl_redis.RedisJob(board, name, sequence, key, uuid=None, details=None, created_on=None, backend=None, book=None, book_data=None, priority=JobPriority.NORMAL)[source]

Bases: Job

一个 Redis 作业。

property key

作业数据存储在板列表/垃圾哈希中的键。

property priority

此任务的 JobPriority

property last_modified_key

作业上次修改数据存储的键。

property owner_key

作业声明 + 所有者数据存储的键。

property sequence

当前作业的序列号。

expires_in()[source]

声明到期需要多少秒。

返回声明条目到期的时间(秒),或者 DOES_NOT_EXPIREKEY_NOT_FOUND 如果它不失效或如果无法确定失效时间(也许 owner_key 在查询时到期或之前到期?)。

extend_expiry(expiry)[source]

延长此作业的拥有者密钥(即声明)的到期时间。

注意(harlowja):如果此作业的声明之前没有关联到期时间,则调用此方法将创建一个(并且在该时间过后,此作业上的声明将不再存在)。

如果请求到期成功,则返回 True,否则返回 False

property created_on

任务创建的日期时间。

property last_modified

任务最后修改的日期时间。

property state

访问此任务的当前状态。

class taskflow.jobs.backends.impl_redis.RedisJobBoard(name, conf, client=None, persistence=None)[source]

Bases: JobBoard

redis 支持的作业板。

redis-py 库提供支持。

这个任务板通过在 redis 哈希 中列出任务来创建任务条目。这个哈希包含可以由一组符合条件的消费者主动处理(和检查/声明)的任务。通常使用 post() 方法发布任务(这会在 msgpack 中编码的任务内容/细节创建一个哈希条目)。这些任务板的用户(可能在不相交的机器集合上)可以迭代可用的任务,并决定是否尝试声明他们迭代过的任务中的一个。如果是,他们将尝试联系 redis,并尝试在 redis 中创建一个键(使用嵌入的 lua 脚本以原子方式执行此操作)以声明所需的任务。如果尝试使用任务板的实体能够创建该锁/所有者键来 claim() 任务,那么它将被允许(并且预计)执行该任务内容描述的任何工作。一旦声明实体完成,锁/所有者键和 哈希 条目将在单个请求中删除(也使用嵌入的 lua 脚本以原子方式执行此操作)(如果成功完成)。如果声明实体不成功(或者声明任务的实体崩溃),则锁/所有者键可以自动释放(通过**可选**使用声明过期时间)或通过使用 abandon() 手动放弃任务,以便其他人可以消费/处理它。

注意(harlowja): 默认情况下,claim() 没有过期时间(这意味着声明即使在声明实体失败的情况下也是持久的)。要确保发生过期,请将一个数字值作为 expiry 关键字参数传递给 claim() 方法,该方法定义了声明应该保留的秒数。在使用过期时间时,请确保在使用 extend_expiry() 方法定期保持声明的存活状态,同时正在处理它。

CLIENT_CONF_TRANSFERS = (('host', <class 'str'>), ('port', <class 'int'>), ('username', <class 'str'>), ('password', <class 'str'>), ('encoding', <class 'str'>), ('encoding_errors', <class 'str'>), ('socket_timeout', <class 'float'>), ('socket_connect_timeout', <class 'float'>), ('unix_socket_path', <class 'str'>), ('ssl', <function bool_from_string>), ('ssl_keyfile', <class 'str'>), ('ssl_certfile', <class 'str'>), ('ssl_cert_reqs', <class 'str'>), ('ssl_ca_certs', <class 'str'>), ('db', <class 'int'>))

允许从任务板配置代理到 redis 客户端的键(以及值类型转换器)(如果未通过 client 关键字参数显式提供客户端,则用于配置 redis 客户端内部)。

参见: http://redis-py.readthedocs.org/en/2025.2/#redis.Redis

参见: https://github.com/andymccurdy/redis-py/blob/2.10.3/redis/client.py

OWNED_POSTFIX = b'.owned'

用于创建任务所有者键的后缀(与任务键组合)。

LAST_MODIFIED_POSTFIX = b'.last_modified'

用于创建任务最后修改键的后缀(与任务键组合)。

DEFAULT_NAMESPACE = b'taskflow'

当未提供命名空间时,默认的键命名空间。

MIN_REDIS_VERSION = (2, 6)

此后端所需的最低 redis 版本。

需要此版本,因为我们需要包含在 2.6 及更高版本中的内置服务器端 lua 脚本支持。

NAMESPACE_SEP = b':'

用于将键与命名空间组合在一起(以获取将要使用的实际键)的分隔符。

KEY_PIECE_SEP = b'.'

用于将许多键片段组合在一起(以获取将要使用的实际键)的分隔符。

SCRIPT_STATUS_OK = 'ok'

调用成功时预期的 lua 响应状态字段。

SCRIPT_STATUS_ERROR = 'error'

调用不成功时预期的 lua 响应状态字段。

SCRIPT_NOT_EXPECTED_OWNER = 'Not expected owner!'

当所有者与预期不符时,预期的 lua 脚本错误响应。

SCRIPT_UNKNOWN_OWNER = 'Unknown owner!'

当找不到所有者时,预期的 lua 脚本错误响应。

SCRIPT_UNKNOWN_JOB = 'Unknown job!'

当找不到任务时,预期的 lua 脚本错误响应。

SCRIPT_ALREADY_CLAIMED = 'Job already claimed!'

当任务已被声明时,预期的 lua 脚本错误响应。

SCRIPT_TEMPLATES = {'abandon': '\n-- 提取 *所有* 变量 (以便 我们 可以 轻松 知道 它们 是什么)...\nlocal owner_key = KEYS[1]\nlocal listings_key = KEYS[2]\nlocal last_modified_key = KEYS[3]\n\nlocal expected_owner = ARGV[1]\nlocal job_key = ARGV[2]\nlocal last_modified_blob = ARGV[3]\nlocal result = {}\nif redis.call("hexists", listings_key, job_key) == 1 then\n    if redis.call("exists", owner_key) == 1 then\n        local owner = redis.call("get", owner_key)\n        if owner ~= expected_owner then\n            result["status"] = "${error}"\n            result["reason"] = "${not_expected_owner}"\n            result["owner"] = owner\n        else\n            redis.call("del", owner_key)\n            redis.call("set", last_modified_key, last_modified_blob)\n            result["status"] = "${ok}"\n        end\n    else\n        result["status"] = "${error}"\n        result["reason"] = "${unknown_owner}"\n    end\nelse\n    result["status"] = "${error}"\n    result["reason"] = "${unknown_job}"\nend\nreturn cmsgpack.pack(result)\n', 'claim': '\nlocal function apply_ttl(key, ms_expiry)\n    if ms_expiry ~= nil then\n        redis.call("pexpire", key, ms_expiry)\n    end\nend\n\n-- 提取 *所有* 变量 (以便 我们 可以 轻松 知道 它们 是什么)...\nlocal owner_key = KEYS[1]\nlocal listings_key = KEYS[2]\nlocal last_modified_key = KEYS[3]\n\nlocal expected_owner = ARGV[1]\nlocal job_key = ARGV[2]\nlocal last_modified_blob = ARGV[3]\n\n-- 如果 这个 是非数字的 (它 可能是 ),那么它将变成 nil\nlocal ms_expiry = nil\nif ARGV[4] ~= "none" then\n    ms_expiry = tonumber(ARGV[4])\nend\nlocal result = {}\nif redis.call("hexists", listings_key, job_key) == 1 then\n    if redis.call("exists", owner_key) == 1 then\n        local owner = redis.call("get", owner_key)\n        if owner == expected_owner then\n            -- 所有者 相同,保持不变...\n            redis.call("set", last_modified_key, last_modified_blob)\n            apply_ttl(owner_key, ms_expiry)\n        end\n        result["status"] = "${error}"\n        result["reason"] = "${already_claimed}"\n        result["owner"] = owner\n    else\n        redis.call("set", owner_key, expected_owner)\n        redis.call("set", last_modified_key, last_modified_blob)\n        apply_ttl(owner_key, ms_expiry)\n        result["status"] = "${ok}"\n    end\nelse\n    result["status"] = "${error}"\n    result["reason"] = "${unknown_job}"\nend\nreturn cmsgpack.pack(result)\n', 'consume': '\n-- 提取 *所有* 变量 (以便 我们 可以 轻松 知道 它们 是什么)...\nlocal owner_key = KEYS[1]\nlocal listings_key = KEYS[2]\nlocal last_modified_key = KEYS[3]\n\nlocal expected_owner = ARGV[1]\nlocal job_key = ARGV[2]\nlocal result = {}\nif redis.call("hexists", listings_key, job_key) == 1 then\n    if redis.call("exists", owner_key) == 1 then\n        local owner = redis.call("get", owner_key)\n        if owner ~= expected_owner then\n            result["status"] = "${error}"\n            result["reason"] = "${not_expected_owner}"\n            result["owner"] = owner\n        else\n            -- 顺序 很重要 (首先 删除 所有者 ,如果\n            -- 失败了,那么 作业数据 仍然 存在,因此 它可以\n            -- 再次 处理,而不是 相反)... \n            redis.call("del", owner_key, last_modified_key)\n            redis.call("hdel", listings_key, job_key)\n            result["status"] = "${ok}"\n        end\n    else\n        result["status"] = "${error}"\n        result["reason"] = "${unknown_owner}"\n    end\nelse\n    result["status"] = "${error}"\n    result["reason"] = "${unknown_job}"\nend\nreturn cmsgpack.pack(result)\n', 'trash': '\n-- 提取 *所有* 变量 (以便 我们 可以 轻松 知道 它们 是什么)...\nlocal owner_key = KEYS[1]\nlocal listings_key = KEYS[2]\nlocal last_modified_key = KEYS[3]\nlocal trash_listings_key = KEYS[4]\n\nlocal expected_owner = ARGV[1]\nlocal job_key = ARGV[2]\nlocal last_modified_blob = ARGV[3]\nlocal result = {}\nif redis.call("hexists", listings_key, job_key) == 1 then\n    local raw_posting = redis.call("hget", listings_key, job_key)\n    if redis.call("exists", owner_key) == 1 then\n        local owner = redis.call("get", owner_key)\n        if owner ~= expected_owner then\n            result["status"] = "${error}"\n            result["reason"] = "${not_expected_owner}"\n            result["owner"] = owner\n        else\n            -- 这个 顺序 很重要 (尝试 首先 移动 该值\n            -- 并且 只有 这样 成功了,我们 尝试 进行任何删除)... \n            redis.call("hset", trash_listings_key, job_key, raw_posting)\n            redis.call("set", last_modified_key, last_modified_blob)\n            redis.call("del", owner_key)\n            redis.call("hdel", listings_key, job_key)\n            result["status"] = "${ok}"\n        end\n    else\n        result["status"] = "${error}"\n        result["reason"] = "${unknown_owner}"\n    end\nelse\n    result["status"] = "${error}"\n    result["reason"] = "${unknown_job}"\nend\nreturn cmsgpack.pack(result)\n'}

Lua 模板脚本,将被各种方法使用(它们在调用时被转换为真实脚本并加载到 connect() 方法中)。

需要注意的事项

  • lua 脚本是串行运行的,因此当此脚本运行时,没有其他命令会更改后端(redis 也确保没有其他脚本正在运行),因此这些脚本的原子性由 redis 保证。

  • 考虑过事务(甚至大部分已经实现),但最终被拒绝,因为 redis 不支持回滚,并且事务 不能相互依赖(后续操作不能依赖于早期操作的结果)。这两个问题限制了我们正确报告错误(并提供有用的消息)以及在故障/争用情况下保持一致性的能力(由于无法回滚)。使用事务的第三个也是最终的打击是,为了正确使用它们,我们必须监视一个 非常有争议的键(listings 键),这会在负载下导致客户端更频繁地重试(这也会增加网络负载、使用的 CPU 周期、触发的事务失败等)。

  • 由于 EXEC 之前/之后的失败,部分事务执行是可能的(并且缺乏回滚使情况更糟)。

总的来说,在思考之后,似乎拥有小的 lua 脚本并不坏(即使它有些复杂),这归因于上述问题以及公开提到的关于事务的问题。通常使用 lua 脚本来实现这个目的似乎是一种常见的做法,并且它解决了在考虑和实现事务时出现的问题。

一些关于 redis(以及 redis + lua)的链接,可能对查看有用

join(key_piece, *more_key_pieces)[source]

从多个片段创建并返回一个命名空间键。

注意(harlowja):所有文本/unicode 的片段都会被转换为其二进制等效值(如果它们已经是二进制则不进行转换),然后再进行连接(因为 redis 期望二进制键,而不是 unicode/文本键)。

property namespace

所有键都将以此命名空间为前缀(或没有)。

trash_key

一个哈希将存储在此键中,用于存储被丢弃的任务。

sequence_key

一个整数将存储在此键中(用于序列化任务)。

listings_key

一个哈希将存储在此键中,用于存储活动任务。

property job_count

返回此看板上的任务数量。

注意(harlowja):此计数可能会随着任务的出现或删除而变化,因此不应以需要精确和绝对的方式使用此计数的准确性。

property connected

返回此任务板是否已连接。

connect()[source]

打开到任何后端系统的连接。

close()[source]

关闭到任何后端系统的连接。

关闭后,除非重新连接,否则任务板将无法使用。

find_owner(job)[source]

如果存在,则获取任务的所有者。

post(name, book=None, details=None, priority=JobPriority.NORMAL)[source]

原子地将任务创建并发布到看板。

此发布允许其他人尝试认领该任务(然后进行工作)。提供的日志簿、详细信息字典或名称(或这些的组合)的内容必须提供足够的用于引用和构建包含的工作的信息(无论是什么)。

一旦发布了任务,只能通过消费该任务(在任务被认领后)来将其移除。任何实体都可以向任务板发布/提议任务(未来这可能会受到限制)。

返回一个任务对象,代表发布的信息。

wait(timeout=None, initial_delay=0.005, max_delay=1.0, sleep_func=<built-in function sleep>)[source]

等待给定的时间,直到有任务发布。

找到任务后,将返回一个迭代器,该迭代器可用于遍历这些任务。

注意(harlowja):由于看板可以由多个外部实体同时修改,因此返回的迭代器可能为空,因为其他实体可能在创建迭代器后删除了这些任务(请注意在使用时这一点)。

参数:

timeout – 等待任务出现的时间(以秒为单位)(如果为 None,则永久等待)。

iterjobs(only_unclaimed=False, ensure_fresh=False)[source]

返回当前在此看板上的任务的迭代器。

注意(harlowja):迭代顺序应为发布顺序(从旧到新),优先级较高的任务应在优先级较低的任务之前提供,但由后端实现提供最适合它的顺序。

注意(harlowja):返回的迭代器可能支持其他属性,这些属性可用于进一步自定义迭代方式;请检查后端的迭代器对象以确定支持哪些其他属性。

参数:
  • only_unclaimed – 一个布尔值,指示是否仅遍历未认领的任务。

  • ensure_fresh – 一个布尔值,要求仅遍历最新的任务,其中“最新”的定义由后端特定。后端如果其内部语义/功能不支持此参数,则可以忽略此值。

register_entity(entity)[source]

将实体注册到任务板(的后端),例如:一个指挥者。

参数:

entity (Entity) – 要注册为与任务板(的后端)关联的实体

consume(job, who)[source]

永久(且原子地)从看板中删除任务。

消耗向看板(以及任何其他检查看板的人)发出信号,表明此任务已由先前认领该任务的实体完成。

只有认领该任务的实体才能消耗该任务。

已消耗的任务不能被另一个实体认领或重新发布(任务发布是不可变的)。任何消耗未认领的任务(或他们没有认领权的任务)的实体都会引发异常。

参数:
  • job – 此看板上的一个任务,可以被消耗(如果不存在,则会引发 NotFound 异常)。

  • who – 执行消耗的实体的名称,必须与用于认领此任务的名称相同。

claim(job, who, expiry=None)[source]

原子性地尝试认领提供的任务。

如果一个任务被认领,预计认领该任务的实体将在未来的某个时间处理该任务的内容,并完成任务(导致重新发布)或从任务板消费该任务(表示其完成)。如果认领失败,将引发相应的异常来向认领尝试者发出信号。

参数:
  • job – 此任务板上可以认领的任务(如果不存在,将引发 NotFound 异常)。

  • who – 认领实体的名称字符串。

abandon(job, who)[source]

原子性地尝试放弃提供的任务。

此放弃信号告知其他人该任务现在可以被重新认领。通常,如果认领任务的实体未能或无法完成其先前认领的任务,则会发生这种情况。

只有认领了该任务的实体才能放弃任务。任何实体放弃未认领的任务(或他们不拥有的任务)都将导致异常。

参数:
  • job – 此任务板上可以放弃的任务(如果不存在,将引发 NotFound 异常)。

  • who – 执行放弃的实体的名称字符串,必须与认领此任务时使用的名称相同。

trash(job, who)[source]

丢弃提供的任务。

丢弃任务向其他人发出信号,该任务已损坏,不应被重新认领。这为用户提供了一种从外部将任务从板上删除的选项。丢弃的任务详细信息应保留在其他位置以供审查(如果需要)。

只有认领了该任务的实体才能丢弃任务。任何实体丢弃未认领的任务(或他们不拥有的任务)都将导致异常。

参数:
  • job – 此任务板上可以丢弃的任务(如果不存在,将引发 NotFound 异常)。

  • who – 执行丢弃的实体的名称字符串,必须与认领此任务时使用的名称相同。

Etcd

class taskflow.jobs.backends.impl_etcd.EtcdJob(board: EtcdJobBoard, name, client, key, uuid=None, details=None, backend=None, book=None, book_data=None, priority=JobPriority.NORMAL, sequence=None, created_on=None)[source]

Bases: Job

一个 Etcd 任务。

property last_modified

任务最后修改的日期时间。

property created_on

任务创建的日期时间。

property state

访问此任务的当前状态。

property priority

此任务的 JobPriority

expires_in()[source]

声明到期需要多少秒。

extend_expiry(expiry)[source]

延长此作业的拥有者密钥(即声明)的到期时间。

如果请求到期成功,则返回 True,否则返回 False

class taskflow.jobs.backends.impl_etcd.EtcdJobBoard(name, conf, client=None, persistence=None)[source]

Bases: JobBoard

一个由 etcd 支持的任务板。

这个任务板在 etcd 中创建序列化的键/值对。每个键代表一个任务,其关联的值包含以 json 编码的任务参数。任务板的用户可以迭代可用的任务,并决定是否通过调用 claim() 方法尝试声明一个任务。声明任务包括原子地基于任务的键和“.lock”后缀创建一个键。如果该键的原子创建成功,则该任务属于该用户。任何尝试锁定已经锁定的任务都将失败。当任务完成时,用户通过调用 consume() 方法来消费任务,它从 etcd 中删除任务和锁。或者,如果用户想删除任务或将其留给另一个用户,可以丢弃 (trash()) 或放弃 (abandon())。Etcd 不提供解锁任务的方法,当消费者死亡时。Etcd 任务板提供基于全局 ttl 配置设置或 claim() 方法的 expiry 参数的定时过期。当达到此生存时间/过期时,该任务将自动解锁,并且另一个消费者可以声明它。如果预计任务的工作时间超过定义的生存时间,则消费者可以通过调用 EtcdJob.extend_expiry() 函数来刷新计时器。

incr(key)[source]

原子地递增一个整数,如果它不存在则创建它

iterjobs(only_unclaimed=False, ensure_fresh=False)[source]

返回当前在此看板上的任务的迭代器。

wait(timeout=None)[source]

等待给定的时间,直到有任务发布。

property job_count

返回此看板上的任务数量。

find_owner(job: EtcdJob) dict | None[source]

如果存在,则获取任务的所有者。

post(name, book=None, details=None, priority=JobPriority.NORMAL) EtcdJob[source]

原子地将任务创建并发布到看板。

claim(job, who, expiry=None)[source]

原子性地尝试认领提供的任务。

consume(job, who)[source]

永久(且原子地)从看板中删除任务。

abandon(job, who)[source]

原子性地尝试放弃提供的任务。

trash(job, who)[source]

丢弃提供的任务。

register_entity(entity: entity.Entity)[source]

注册一个实体到任务板(的后端),例如:一个调度器

property connected

返回此任务板是否已连接。

connect()[source]

打开到任何后端系统的连接。

close()[source]

关闭到任何后端系统的连接。

层级

Inheritance diagram of taskflow.jobs.base, taskflow.jobs.backends.impl_redis, taskflow.jobs.backends.impl_zookeeper, taskflow.jobs.backends.impl_etcd