实用工具¶
警告
应尽量减少对内部实用函数和模块的外部使用,因为它们可能会在未经通知(且不经过典型的弃用周期)的情况下被更改、重构或移动至其他位置。请注意
Async¶
Eventlet¶
Iterators¶
- taskflow.utils.iter_utils.fill(it, desired_len, filler=None)[source]¶
迭代提供的迭代器,直到达到所需的长度。
如果源迭代器没有足够的值,则会产生填充值,直到达到所需的长度。
- taskflow.utils.iter_utils.generate_delays(delay, max_delay, multiplier=2)[source]¶
生成器/迭代器,提供延迟值。
它生成的值在每次迭代后按给定的倍数递增(使用最大延迟作为上限)。永远不会生成负值……并且它将永远迭代(即它将永远不会停止生成值)。
- taskflow.utils.iter_utils.find_first_match(it, matcher, not_found_value=None)[source]¶
搜索迭代器中第一个匹配器回调返回 true 的值。
Kazoo¶
- exception taskflow.utils.kazoo_utils.KazooTransactionException(message, failures)[source]¶
基类:
KazooException当检查到的提交失败时引发的异常。
- taskflow.utils.kazoo_utils.checked_commit(txn)[source]¶
提交 kazoo 事务并验证结果。
注意(harlowja): 在 https://github.com/python-zk/kazoo/pull/224 修复或合并类似的 pull request 之前,我们必须解决事务静默失败的问题。
- taskflow.utils.kazoo_utils.check_compatible(client, min_version=None, max_version=None)[source]¶
检查 kazoo 客户端是否由 zookeeper 服务器版本支持。
此检查将验证客户端连接的 zookeeper 服务器版本是否满足给定的最小版本(包括)和最大版本(包括)范围。如果服务器不在提供的版本范围内,则会引发一个指示此的异常。
- taskflow.utils.kazoo_utils.make_client(conf)[source]¶
-
- 参数:
conf (dict) – 用于配置创建的客户端的配置字典
将提取的键是
read_only: 布尔值,指定是否允许连接到只读服务器,默认为Falserandomize_hosts: 布尔值,指定是否随机化提供的 host 列表,默认为Falsecommand_retry: kazoo retry 对象(或用于创建它的选项字典),用于重试执行的命令connection_retry: kazoo retry 对象(或用于创建它的选项字典),用于重试发生的连接故障hosts: 一个字符串、列表、集合(或具有 host 键的字典),将指定 kazoo 客户端应连接的 host,如果未提供,则默认使用localhost:2181timeout: 一个浮点值,指定 kazoo 客户端将使用的默认超时时间handler: 一个 kazoo 处理程序对象,可用于为客户端提供替代的异步策略(默认基于 线程,但可以根据需要提供 gevent 或 eventlet)keyfile: 用于身份验证的 SSL 密钥文件keyfile_password: SSL 密钥文件密码certfile: 用于身份验证的 SSL 证书文件ca: 用于身份验证的 SSL CA 文件use_ssl: 参数,用于控制是否使用 SSLverify_certs: 使用 SSL 时,用于绕过证书验证
Kombu¶
- class taskflow.utils.kombu_utils.DelayedPretty(message)[source]¶
基类:
object包装一个消息,并延迟美化它,直到请求为止。
TODO(harlowja): 在 https://github.com/celery/kombu/pull/454/ 合并并发布包含它的版本后删除此内容(因为该 pull request 等效于或优于此)。
Miscellaneous¶
- class taskflow.utils.misc.StrEnum(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
基础:
str,Enum一种既是字符串又能与字符串进行比较的枚举类型。
- class taskflow.utils.misc.StringIO(initial_value='', newline='\n')[source]¶
基础:
StringIO具有一些小附加功能的字符串缓冲区。
- taskflow.utils.misc.get_hostname(unknown_hostname='<unknown>')[source]¶
获取机器的主机名;如果无法获取,则返回一个无效的主机名。
- taskflow.utils.misc.match_type(obj, matchers)[source]¶
使用给定的匹配器列表/可迭代对象匹配给定的对象。
注意(harlowja):提供的列表/可迭代对象的每个元素必须是 (有效类型,结果) 的元组。
如果发生类型匹配,则返回结果(提供的元组的第二个元素),否则如果未找到匹配项,则返回 None。
- taskflow.utils.misc.countdown_iter(start_at, decr=1)[source]¶
生成器,每次生成后递减,直到 <= 零。
注意(harlowja):我们可能可以在使用
itertools.count时删除此功能,该功能接受一个步长(在仍然受支持的 py2.6 上,该步长参数不存在,因此无法使用)。
- taskflow.utils.misc.merge_uri(uri, conf)[source]¶
将解析后的 URI 合并到给定的配置字典中。
将 URI 的用户名、密码、主机名、端口和查询参数合并到给定的配置字典中(如果配置键已经存在,则不覆盖现有配置键),并返回合并后的配置。
注意(harlowja):不合并路径、方案或片段。
- taskflow.utils.misc.find_subclasses(locations, base_cls, exclude_hidden=True)[source]¶
在给定的位置查找子类类型。
这将检查给定位置的类型,这些类型是提供的基本类类型的子类,并返回找到的子类(如果无法完成此内省,则会引发异常)。
如果提供字符串作为其中一个位置,则会导入并检查它是否是基本类的子类。如果提供模块,则将检查其所有成员是否存在基本类的子类属性。如果提供类型本身,则将检查它是否是基本类的子类。
- taskflow.utils.misc.clamp(value, minimum, maximum, on_clamped=None)[source]¶
将值限制在 >= 最小值和 <= 最大值之间。
- taskflow.utils.misc.binary_encode(text, encoding='utf-8', errors='strict')[source]¶
使用给定的编码将文本字符串编码为二进制字符串。
如果数据已经是二进制字符串,则不执行任何操作(在未知类型上引发错误)。
- taskflow.utils.misc.binary_decode(data, encoding='utf-8', errors='strict')[source]¶
使用给定的编码将二进制字符串解码为文本字符串。
如果数据已经是文本字符串,则不执行任何操作(在未知类型上引发错误)。
- taskflow.utils.misc.decode_msgpack(raw_data, root_types=(<class 'dict'>, ))[source]¶
解析原始数据以获取解码对象。
解码来自给定原始数据二进制字符串的 msgback 编码的“blob”,并检查该解码对象的根类型是否在允许的类型集合中(默认情况下,字典应该是根类型)。
- taskflow.utils.misc.decode_json(raw_data, root_types=(<class 'dict'>, ))[source]¶
解析原始数据以获取解码对象。
解码来自给定原始数据二进制字符串的 JSON 编码的“blob”,并检查该解码对象的根类型是否在允许的类型集合中(默认情况下,字典应该是根类型)。
- class taskflow.utils.misc.cachedproperty(fget=None, require_lock=True)[source]¶
基类:
object一个线程安全描述符属性,仅评估一次。
可以将此缓存描述符放置在实例方法上,以将这些方法转换为属性,这些属性将在实例中缓存(避免重复的属性检查逻辑来执行等效操作)。
注意(harlowja):默认情况下,将保存的属性位于装饰的方法名称前加上下划线。例如,如果我们将此描述符附加到实例方法“get_thing(self)”,则缓存的属性将存储在第一次调用“get_thing”时在 self 对象中的“_get_thing”下。
- taskflow.utils.misc.get_version_string(obj)[source]¶
获取对象的版本字符串。
返回对象‘version’属性的字符串表示,如果对象没有该属性或其版本为None,则返回None。
- taskflow.utils.misc.sequence_minus(seq1, seq2)[source]¶
计算两个序列的差集。
结果包含第一个序列中不在第二个序列中的元素,保持原始顺序。即使序列元素不可哈希也能工作。
- taskflow.utils.misc.capture_failure()[source]¶
捕获发生的异常并返回一个失败对象。
这将保存当前异常信息并返回一个供调用者使用的失败对象(如果未处理活动异常,则会引发运行时错误)。
这是因为在某些情况下,异常上下文可能会被清除,导致尝试保存None。当eventlet切换绿线程或在运行异常处理程序时,代码引发并捕获异常时,可能会发生这种情况。在两种情况下,异常上下文都将被清除。
为了解决这个问题,我们保存异常状态,yield一个失败,然后运行其他代码。
例如
>>> from taskflow.utils import misc >>> >>> def cleanup(): ... pass ... >>> >>> def save_failure(f): ... print("Saving %s" % f) ... >>> >>> try: ... raise IOError("Broken") ... except Exception: ... with misc.capture_failure() as fail: ... print("Activating cleanup") ... cleanup() ... save_failure(fail) ... Activating cleanup Saving Failure: IOError: Broken
持久化¶
- taskflow.utils.persistence_utils.temporary_log_book(backend=None)[source]¶
在给定的后端中创建一个用于临时使用的临时日志簿。
主要适用于测试和其他需要短时间临时日志簿的用例。
- taskflow.utils.persistence_utils.temporary_flow_detail(backend=None, meta=None)[source]¶
在给定的后端中创建一个临时流程详情和日志簿。
主要适用于测试和其他需要短时间临时流程详情和临时日志簿的用例。
- taskflow.utils.persistence_utils.create_flow_detail(flow, book=None, backend=None, meta=None)[source]¶
为流程创建一个流程详情,并将其添加到日志簿中并保存。
这将使用流程名称为给定的流程创建一个流程详情,将其添加到提供的日志簿中,然后使用给定的后端保存日志簿,然后返回创建的流程详情。
如果未提供book,将自动创建一个临时book(不会返回对日志簿的引用,因此这几乎总是应该提供或仅在不需要日志簿的情况下使用,例如在测试中)。如果未提供后端,则不会进行保存,即使流程详情已添加到给定的(或临时生成的)日志簿中,创建的流程详情也不会被持久化。
Redis¶
- class taskflow.utils.redis_utils.RedisClient(*args, **kwargs)[source]¶
基类:
Redis可以关闭的redis客户端(并在关闭后禁止使用)。
TODO(harlowja): 如果 https://github.com/andymccurdy/redis-py/issues/613 最终得到解决或合并或其他,那么我们可能可以删除这个。
- transaction(func: Callable[[Pipeline], None], *watches, **kwargs) None¶
方便的方法,用于将可调用对象func作为事务执行,同时监视watches中指定的所有键。‘func’可调用对象应期望一个参数,即Pipeline对象。
- pubsub(**kwargs)¶
返回发布/订阅对象。使用此对象,您可以订阅通道并侦听发布到这些通道的消息。
- class taskflow.utils.redis_utils.UnknownExpire(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
基类:
IntEnum来自
get_expiry()的非过期(非ttl)结果。参见: https://redis.ac.cn/commands/ttl 或 https://redis.ac.cn/commands/pttl
- DOES_NOT_EXPIRE = -1¶
该命令返回
-1如果键存在但没有关联的过期时间。
- KEY_NOT_FOUND = -2¶
该命令返回
-2如果键不存在。
- taskflow.utils.redis_utils.get_expiry(client, key, prior_version=None)[source]¶
获取键的过期时间(使用最佳确定 ttl 方法)。
模式¶
线程¶
- taskflow.utils.threading_utils.daemon_thread(target, *args, **kwargs)[source]¶
创建一个守护线程,该线程在启动时调用给定的目标。