实用工具

警告

应尽量减少对内部实用函数和模块的外部使用,因为它们可能会在未经通知(且不经过典型的弃用周期)的情况下被更改、重构或移动至其他位置。请注意

Async

taskflow.utils.async_utils.make_completed_future(result)[source]

创建一个带有给定结果的已完成的 future。

Banner

taskflow.utils.banner.make_banner(what, chapters)[source]

创建一个 taskflow banner 字符串。

例如

>>> from taskflow.utils import banner
>>> chapters = {
    'Connection details': {
        'Topic': 'hello',
    },
    'Powered by': {
        'Executor': 'parallel',
    },
}
>>> print(banner.make_banner('Worker', chapters))

这将输出

___    __
 |    |_
 |ask |low v1.26.1
*Worker*
Connection details:
  Topic => hello
Powered by:
  Executor => parallel

Eventlet

taskflow.utils.eventlet_utils.check_for_eventlet(exc=None)[source]

检查 eventlet 是否可用,如果不可用则引发运行时错误。

参数:

exc (异常) – 代替引发运行时错误而要引发的异常

Iterators

taskflow.utils.iter_utils.fill(it, desired_len, filler=None)[source]

迭代提供的迭代器,直到达到所需的长度。

如果源迭代器没有足够的值,则会产生填充值,直到达到所需的长度。

taskflow.utils.iter_utils.count(it)[source]

返回迭代器中的值数量(耗尽迭代器)。

taskflow.utils.iter_utils.generate_delays(delay, max_delay, multiplier=2)[source]

生成器/迭代器,提供延迟值。

它生成的值在每次迭代后按给定的倍数递增(使用最大延迟作为上限)。永远不会生成负值……并且它将永远迭代(即它将永远不会停止生成值)。

taskflow.utils.iter_utils.unique_seen(its, seen_selector=None)[source]

从迭代器中产生唯一的值(并保留顺序)。

taskflow.utils.iter_utils.find_first_match(it, matcher, not_found_value=None)[source]

搜索迭代器中第一个匹配器回调返回 true 的值。

taskflow.utils.iter_utils.while_is_not(it, stop_value)[source]

从迭代器中产生给定的值,直到传递停止值。

它使用 is 运算符来确定等价性(而不是 == 运算符)。

taskflow.utils.iter_utils.iter_forever(limit)[source]

从迭代器中产生值,直到达到限制。

如果限制为负数,则我们永远迭代。

Kazoo

taskflow.utils.kazoo_utils.prettify_failures(failures, limit=-1)[source]

美化检查到的提交失败(忽略敏感数据……)。

exception taskflow.utils.kazoo_utils.KazooTransactionException(message, failures)[source]

基类: KazooException

当检查到的提交失败时引发的异常。

taskflow.utils.kazoo_utils.checked_commit(txn)[source]

提交 kazoo 事务并验证结果。

注意(harlowja): 在 https://github.com/python-zk/kazoo/pull/224 修复或合并类似的 pull request 之前,我们必须解决事务静默失败的问题。

taskflow.utils.kazoo_utils.finalize_client(client)[source]

停止并关闭客户端,即使它没有启动。

taskflow.utils.kazoo_utils.check_compatible(client, min_version=None, max_version=None)[source]

检查 kazoo 客户端是否由 zookeeper 服务器版本支持。

此检查将验证客户端连接的 zookeeper 服务器版本是否满足给定的最小版本(包括)和最大版本(包括)范围。如果服务器不在提供的版本范围内,则会引发一个指示此的异常。

taskflow.utils.kazoo_utils.make_client(conf)[source]

根据配置字典创建 kazoo 客户端

参数:

conf (dict) – 用于配置创建的客户端的配置字典

将提取的键是

  • read_only: 布尔值,指定是否允许连接到只读服务器,默认为 False

  • randomize_hosts: 布尔值,指定是否随机化提供的 host 列表,默认为 False

  • command_retry: kazoo retry 对象(或用于创建它的选项字典),用于重试执行的命令

  • connection_retry: kazoo retry 对象(或用于创建它的选项字典),用于重试发生的连接故障

  • hosts: 一个字符串、列表、集合(或具有 host 键的字典),将指定 kazoo 客户端应连接的 host,如果未提供,则默认使用 localhost:2181

  • timeout: 一个浮点值,指定 kazoo 客户端将使用的默认超时时间

  • handler: 一个 kazoo 处理程序对象,可用于为客户端提供替代的异步策略(默认基于 线程,但可以根据需要提供 geventeventlet

  • keyfile : 用于身份验证的 SSL 密钥文件

  • keyfile_password: SSL 密钥文件密码

  • certfile: 用于身份验证的 SSL 证书文件

  • ca: 用于身份验证的 SSL CA 文件

  • use_ssl: 参数,用于控制是否使用 SSL

  • verify_certs: 使用 SSL 时,用于绕过

    证书验证

Kombu

class taskflow.utils.kombu_utils.DelayedPretty(message)[source]

基类: object

包装一个消息,并延迟美化它,直到请求为止。

TODO(harlowja): 在 https://github.com/celery/kombu/pull/454/ 合并并发布包含它的版本后删除此内容(因为该 pull request 等效于或优于此)。

Miscellaneous

class taskflow.utils.misc.StrEnum(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

基础: str, Enum

一种既是字符串又能与字符串进行比较的枚举类型。

class taskflow.utils.misc.StringIO(initial_value='', newline='\n')[source]

基础: StringIO

具有一些小附加功能的字符串缓冲区。

class taskflow.utils.misc.BytesIO(initial_bytes=b'')[source]

继承自 BytesIO

具有一些小附加功能的字节缓冲区。

taskflow.utils.misc.get_hostname(unknown_hostname='<unknown>')[source]

获取机器的主机名;如果无法获取,则返回一个无效的主机名。

taskflow.utils.misc.match_type(obj, matchers)[source]

使用给定的匹配器列表/可迭代对象匹配给定的对象。

注意(harlowja):提供的列表/可迭代对象的每个元素必须是 (有效类型,结果) 的元组。

如果发生类型匹配,则返回结果(提供的元组的第二个元素),否则如果未找到匹配项,则返回 None。

taskflow.utils.misc.countdown_iter(start_at, decr=1)[source]

生成器,每次生成后递减,直到 <= 零。

注意(harlowja):我们可能可以在使用 itertools.count 时删除此功能,该功能接受一个步长(在仍然受支持的 py2.6 上,该步长参数不存在,因此无法使用)。

taskflow.utils.misc.extract_driver_and_conf(conf, conf_key)[source]

获取驱动程序名称及其配置的常用函数。

taskflow.utils.misc.reverse_enumerate(items)[source]

类似于 reversed(enumerate(items)),但减少了复制/克隆…

taskflow.utils.misc.merge_uri(uri, conf)[source]

将解析后的 URI 合并到给定的配置字典中。

将 URI 的用户名、密码、主机名、端口和查询参数合并到给定的配置字典中(如果配置键已经存在,则不覆盖现有配置键),并返回合并后的配置。

注意(harlowja):不合并路径、方案或片段。

taskflow.utils.misc.find_subclasses(locations, base_cls, exclude_hidden=True)[source]

在给定的位置查找子类类型。

这将检查给定位置的类型,这些类型是提供的基本类类型的子类,并返回找到的子类(如果无法完成此内省,则会引发异常)。

如果提供字符串作为其中一个位置,则会导入并检查它是否是基本类的子类。如果提供模块,则将检查其所有成员是否存在基本类的子类属性。如果提供类型本身,则将检查它是否是基本类的子类。

taskflow.utils.misc.pick_first_not_none(*values)[source]

返回第一个非 None 的值(如果全部为/是 None,则返回 None)。

taskflow.utils.misc.parse_uri(uri)[source]

将 URI 解析为其组成部分。

taskflow.utils.misc.disallow_when_frozen(excp_cls)[source]

冻结检查/引发方法装饰器。

taskflow.utils.misc.clamp(value, minimum, maximum, on_clamped=None)[source]

将值限制在 >= 最小值和 <= 最大值之间。

taskflow.utils.misc.fix_newlines(text, replacement='\n')[source]

修复可能以错误的换行符结尾的文本,并替换为正确的换行符。

taskflow.utils.misc.binary_encode(text, encoding='utf-8', errors='strict')[source]

使用给定的编码将文本字符串编码为二进制字符串。

如果数据已经是二进制字符串,则不执行任何操作(在未知类型上引发错误)。

taskflow.utils.misc.binary_decode(data, encoding='utf-8', errors='strict')[source]

使用给定的编码将二进制字符串解码为文本字符串。

如果数据已经是文本字符串,则不执行任何操作(在未知类型上引发错误)。

taskflow.utils.misc.decode_msgpack(raw_data, root_types=(<class 'dict'>, ))[source]

解析原始数据以获取解码对象。

解码来自给定原始数据二进制字符串的 msgback 编码的“blob”,并检查该解码对象的根类型是否在允许的类型集合中(默认情况下,字典应该是根类型)。

taskflow.utils.misc.decode_json(raw_data, root_types=(<class 'dict'>, ))[source]

解析原始数据以获取解码对象。

解码来自给定原始数据二进制字符串的 JSON 编码的“blob”,并检查该解码对象的根类型是否在允许的类型集合中(默认情况下,字典应该是根类型)。

class taskflow.utils.misc.cachedproperty(fget=None, require_lock=True)[source]

基类: object

一个线程安全描述符属性,仅评估一次。

可以将此缓存描述符放置在实例方法上,以将这些方法转换为属性,这些属性将在实例中缓存(避免重复的属性检查逻辑来执行等效操作)。

注意(harlowja):默认情况下,将保存的属性位于装饰的方法名称前加上下划线。例如,如果我们将此描述符附加到实例方法“get_thing(self)”,则缓存的属性将存储在第一次调用“get_thing”时在 self 对象中的“_get_thing”下。

taskflow.utils.misc.millis_to_datetime(milliseconds)[source]

将自纪元以来的毫秒数转换为 datetime 对象。

taskflow.utils.misc.get_version_string(obj)[source]

获取对象的版本字符串。

返回对象‘version’属性的字符串表示,如果对象没有该属性或其版本为None,则返回None。

taskflow.utils.misc.sequence_minus(seq1, seq2)[source]

计算两个序列的差集。

结果包含第一个序列中不在第二个序列中的元素,保持原始顺序。即使序列元素不可哈希也能工作。

taskflow.utils.misc.as_int(obj, quiet=False)[source]

将任意值转换为整数。

taskflow.utils.misc.capture_failure()[source]

捕获发生的异常并返回一个失败对象。

这将保存当前异常信息并返回一个供调用者使用的失败对象(如果未处理活动异常,则会引发运行时错误)。

这是因为在某些情况下,异常上下文可能会被清除,导致尝试保存None。当eventlet切换绿线程或在运行异常处理程序时,代码引发并捕获异常时,可能会发生这种情况。在两种情况下,异常上下文都将被清除。

为了解决这个问题,我们保存异常状态,yield一个失败,然后运行其他代码。

例如

>>> from taskflow.utils import misc
>>>
>>> def cleanup():
...     pass
...
>>>
>>> def save_failure(f):
...     print("Saving %s" % f)
...
>>>
>>> try:
...     raise IOError("Broken")
... except Exception:
...     with misc.capture_failure() as fail:
...         print("Activating cleanup")
...         cleanup()
...         save_failure(fail)
...
Activating cleanup
Saving Failure: IOError: Broken
taskflow.utils.misc.is_iterable(obj)[source]

测试一个对象是否可迭代。

此函数将测试指定的对象以确定它是否可迭代。字符串类型(strunicode)将被忽略,并返回False。

参数:

obj – 要测试的可迭代对象

返回值:

如果对象是可迭代的且不是字符串,则返回True

taskflow.utils.misc.safe_copy_dict(obj)[source]

复制现有的字典或默认为空字典…

如果给定的对象为假值,则返回一个空字典,否则将创建给定对象的字典(如果提供的是字典对象,则将创建该对象的浅拷贝)。

持久化

taskflow.utils.persistence_utils.temporary_log_book(backend=None)[source]

在给定的后端中创建一个用于临时使用的临时日志簿。

主要适用于测试和其他需要短时间临时日志簿的用例。

taskflow.utils.persistence_utils.temporary_flow_detail(backend=None, meta=None)[source]

在给定的后端中创建一个临时流程详情和日志簿。

主要适用于测试和其他需要短时间临时流程详情和临时日志簿的用例。

taskflow.utils.persistence_utils.create_flow_detail(flow, book=None, backend=None, meta=None)[source]

为流程创建一个流程详情,并将其添加到日志簿中并保存。

这将使用流程名称为给定的流程创建一个流程详情,将其添加到提供的日志簿中,然后使用给定的后端保存日志簿,然后返回创建的流程详情。

如果未提供book,将自动创建一个临时book(不会返回对日志簿的引用,因此这几乎总是应该提供或仅在不需要日志簿的情况下使用,例如在测试中)。如果未提供后端,则不会进行保存,即使流程详情已添加到给定的(或临时生成的)日志簿中,创建的流程详情也不会被持久化。

Redis

class taskflow.utils.redis_utils.RedisClient(*args, **kwargs)[source]

基类: Redis

可以关闭的redis客户端(并在关闭后禁止使用)。

TODO(harlowja): 如果 https://github.com/andymccurdy/redis-py/issues/613 最终得到解决或合并或其他,那么我们可能可以删除这个。

transaction(func: Callable[[Pipeline], None], *watches, **kwargs) None

方便的方法,用于将可调用对象func作为事务执行,同时监视watches中指定的所有键。‘func’可调用对象应期望一个参数,即Pipeline对象。

pubsub(**kwargs)

返回发布/订阅对象。使用此对象,您可以订阅通道并侦听发布到这些通道的消息。

class taskflow.utils.redis_utils.UnknownExpire(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

基类: IntEnum

来自 get_expiry() 的非过期(非ttl)结果。

参见: https://redis.ac.cn/commands/ttlhttps://redis.ac.cn/commands/pttl

DOES_NOT_EXPIRE = -1

该命令返回 -1 如果键存在但没有关联的过期时间。

KEY_NOT_FOUND = -2

该命令返回 -2 如果键不存在。

taskflow.utils.redis_utils.get_expiry(client, key, prior_version=None)[source]

获取键的过期时间(使用最佳确定 ttl 方法)。

taskflow.utils.redis_utils.apply_expiry(client, key, expiry, prior_version=None)[source]

应用键的过期时间(使用最佳确定过期方法)。

taskflow.utils.redis_utils.is_server_new_enough(client, min_version, default=False, prior_version=None)[source]

检查客户端是否连接到足够新的redis服务器。

模式

taskflow.utils.schema_utils.schema_validate(data, schema)[source]

使用提供的 json schema 验证给定的数据。

线程

taskflow.utils.threading_utils.is_alive(thread)[source]

辅助函数,用于确定线程是否存活(安全处理 None)。

taskflow.utils.threading_utils.get_ident()[source]

返回当前线程的“线程标识符”。

taskflow.utils.threading_utils.get_optimal_thread_count(default=2)[source]

尝试猜测当前系统的最佳线程数。

taskflow.utils.threading_utils.daemon_thread(target, *args, **kwargs)[source]

创建一个守护线程,该线程在启动时调用给定的目标。

taskflow.utils.threading_utils.no_op(*args, **kwargs)[source]

一个什么也不做的函数。

class taskflow.utils.threading_utils.ThreadBundle[source]

基类: object

一组可以一起启动/停止的线程。

bind(thread_factory, before_start=None, after_start=None, before_join=None, after_join=None)[source]

将线程(待创建)添加到此 bundle 中(并提供回调)。

注意(harlowja):提供回调不应尝试调用

修改方法(stop()start()bind() …)到此对象,因为此对象的锁并非设计为(也不是)可重入的…

start()[source]

创建并启动所有关联的线程(尚未运行的线程)。

stop()[source]

停止并 Join 所有关联的线程(已启动的线程)。