类型

注意

尽管这些类型旨在供公众消费,并且鼓励/轻松使用,但应注意,这些类型在未来可能会被移出到新的库中。如果您在使用此库其他部分的情况下使用这些类型,则强烈建议您积极倡导将这些类型制作为独立的库(因为以这种方式使用这些类型并非预期和/或期望的用法)。

实体

class taskflow.types.entity.Entity(kind, name, metadata)[source]

基类: object

实体对象,用于标识某个资源/项目/其他。

变量:
  • kind不可变的类型/种类,用于标识此实体(通常是特定于库/应用程序的)

  • Entity.name不可变的名称,可用于唯一标识此实体与其他众多实体。

  • metadata不可变的元数据字典,与此实体相关联(通常包含进一步描述此实体的键/值)。

失败

class taskflow.types.failure.Failure(exc_info=None, **kwargs)[source]

基类: object

表示失败的不可变对象。

Failure 对象封装了异常信息,以便以后可以重新引发、检查、检测、记录、打印、序列化、反序列化……

其中一个依赖它们的示例是 WBE 引擎。当远程工作进程抛出异常时,基于 WBE 的引擎会收到该异常,并希望将其重新引发给 WBE 引擎的用户/调用者以进行适当处理(这与非远程引擎的行为相匹配)。为了实现这一点,可以通过 WBE 通道发送一个 failure 对象(或其 to_dict() 形式),然后基于 WBE 的引擎会将其反序列化并使用该对象的 reraise() 方法来引发一个包含与原始异常相似/等效信息的异常,允许用户(或 WBE 引擎本身)按照他们想要的进行处理工作进程失败/异常。

对于好奇的读者,以下是原始异常本身可能不会被重新引发,而是会重新引发一个包装的 failure 异常对象的一些原因。这些解释适用于 failure 对象被序列化和反序列化时(当它保留在创建异常的 python 进程内部时,原始异常可以正确地重新引发而没有问题)。

  • Traceback 对象不可序列化/不可重构,因为它们包含指向异常被引发位置的堆栈帧的引用。当 failure 对象被序列化并通过通道发送并重新创建时,无法恢复原始 traceback 和起源堆栈帧。

  • 原始异常类型不能保证能被找到,工作进程可以运行在反序列化 failure 时不可访问/不可用的代码。即使能够安全地使用 pickle,在这种情况下也无法找到起源异常或相关代码。

  • 原始异常类型不能保证能以正确的方式构造。在 failure 对象创建时,异常已经被创建,而 failure 对象不能假定它拥有捕获异常的原始类型(或者有能力)重新构造(如果原始异常是通过某个自定义异常构造函数的复杂过程创建的,这一点尤其困难)。

  • 原始异常类型不能保证能以安全的方式构造。动态导入外部异常类型可能存在问题,除非是以安全的方式完成;由于 failure 对象可以捕获任何异常,动态导入那些异常类型的命名空间和模块在接收端将是不安全的(这会产生类似 python 的 `pickle` 模块的问题,它允许导入外部模块,导致这些模块的代码被运行,这可能导致接收方不希望产生的问题和副作用)。

TODO(harlowja): 使用 17911 的部分以及 https://pypi.ac.cn/project/traceback2/ 的反向移植来(希望)简化此对象的方法和内容……

BASE_EXCEPTIONS = ('BaseException', 'Exception')

所有其他 python 异常的根异常。

参见:https://docs.pythonlang.cn/2/library/exceptions.html

SCHEMA = {'$ref': '#/definitions/cause', 'definitions': {'cause': {'additionalProperties': True, 'properties': {'causes': {'items': {'$ref': '#/definitions/cause'}, 'type': 'array'}, 'exc_args': {'minItems': 0, 'type': 'array'}, 'exc_type_names': {'items': {'type': 'string'}, 'minItems': 1, 'type': 'array'}, 'exception_str': {'type': 'string'}, 'traceback_str': {'type': 'string'}, 'version': {'minimum': 0, 'type': 'integer'}}, 'required': ['exception_str', 'traceback_str', 'exc_type_names'], 'type': 'object'}}}

预期的失败模式(以 json schema 格式)。

classmethod from_exception(exception)[source]

从异常实例创建 failure 对象。

classmethod validate(data)[source]

验证输入数据是否符合预期的 failure dict 格式。

matches(other)[source]

检查另一个对象是否等同于此对象。

返回值:

检查另一个对象是否等同于此对象

返回类型:

布尔值

property exception

异常值,如果不存在异常值则为 None。

异常值在序列化过程中可能会丢失。

property exception_str

异常的字符串表示形式。

property exception_args

传递给异常构造函数的参数元组。

property exc_info

异常信息元组或 None。

参见:https://docs.pythonlang.cn/2/library/sys.html#sys.exc_info 关于

此元组的内容(如果没有,则无法检查任何内容)。

property traceback_str

异常 traceback 字符串。

static reraise_if_any(failures)[source]

如果参数不为空,则重新引发异常。

如果参数是空列表/元组/迭代器,此方法返回 None。如果参数被转换为包含单个 `Failure` 对象的列表,则重新引发该 failure。否则,将引发一个带有 failure 列表作为原因的 WrappedFailure 异常。

reraise()[source]

重新引发捕获的异常。

check(*exc_classes)[source]

检查 exc_classes 中的任何一个是否导致了失败。

此方法的参数可以是异常类型或类型名称(字符串)。如果捕获的异常是给定类型异常的实例,则返回相应的参数。否则,返回 None。

property causes

此 failure 的所有内部 failure原因的元组。

注意(harlowja):包括当前 failure(仅返回此 failure 的已连接原因,如果有)。此属性在 python 3.x 或更高版本上才真正有用,因为旧版本没有关联的原因(在 2.x 版本的 python 上,元组总是为空)。

参考 PEP 3134PEP 409PEP 415 来了解用于查找 failure 原因的内容。

pformat(traceback=False)[source]

将 failure 对象漂亮格式化为字符串。

classmethod from_dict(data)[source]

将此字典转换为对象。

to_dict(include_args=True)[source]

将此对象转换为字典。

参数:

include_args – 布尔值,指示是否在输出中包含异常参数。

copy()[source]

复制此对象。

class taskflow.types.graph.Graph(incoming_graph_data=None, name='')[source]

Bases: Graph

具有有用实用功能的图子类。

freeze()[source]

冻结图,使其不再发生突变。

export_to_dot()[source]

将图导出为 dot 格式(需要 pydot 库)。

pformat()[source]

漂亮地格式化您的图为字符串。

add_edge(u, v, attr_dict=None, **attr)[source]

在 u 和 v 之间添加边。

add_node(n, attr_dict=None, **attr)[source]

添加单个节点 n 并更新节点属性。

fresh_copy()[source]

返回一个具有相同数据结构的新的图副本。

新的副本没有任何节点、边或图属性。它与当前图具有相同的数据结构。此方法通常用于创建图的空版本。

class taskflow.types.graph.DiGraph(incoming_graph_data=None, name='')[source]

Bases: DiGraph

具有有用实用功能的有向图子类。

freeze()[source]

冻结图,使其不再发生突变。

get_edge_data(u, v, default=None)[source]

返回 (u, v) 之间边属性字典的副本

注意(harlowja):这与 networkx 的 get_edge_data() 不同,因为后一个函数不返回副本(而是返回对实际边数据的引用)。

topological_sort()[source]

以拓扑排序的顺序返回图中的节点列表。

pformat()[source]

漂亮地格式化您的图为字符串。

这个漂亮格式化的字符串表示包含关于您的图的许多有用细节,包括:名称、类型、冻结状态、节点数、节点、边数、边、图密度和图循环(如果有)。

export_to_dot()[source]

将图导出为 dot 格式(需要 pydot 库)。

is_directed_acyclic()[source]

返回此图是否为 DAG。

no_successors_iter()[source]

返回所有没有后继节点的迭代器。

no_predecessors_iter()[source]

返回所有没有前驱节点的迭代器。

bfs_predecessors_iter(n)[source]

通过广度优先遍历给定节点的所有前驱节点。

这将遍历节点的直接前驱,然后是这些前驱节点的前驱,依此类推,直到找不到更多前驱节点为止。

注意(harlowja):前驱节点循环(如果存在)不会被迭代超过一次(这可以防止无限迭代)。

add_edge(u, v, attr_dict=None, **attr)[source]

在 u 和 v 之间添加边。

add_node(n, attr_dict=None, **attr)[source]

添加单个节点 n 并更新节点属性。

fresh_copy()[source]

返回一个具有相同数据结构的新的图副本。

新的副本没有任何节点、边或图属性。它与当前图具有相同的数据结构。此方法通常用于创建图的空版本。

class taskflow.types.graph.OrderedDiGraph(incoming_graph_data=None, name='')[source]

Bases: DiGraph

具有有用实用功能的有向图子类。

此派生类保留了节点、边、插入和迭代顺序(以便迭代顺序与插入顺序匹配)。

node_dict_factory

alias of OrderedDict

adjlist_outer_dict_factory

alias of OrderedDict

adjlist_inner_dict_factory

alias of OrderedDict

edge_attr_dict_factory

alias of OrderedDict

fresh_copy()[source]

返回一个具有相同数据结构的新的图副本。

新的副本没有任何节点、边或图属性。它与当前图具有相同的数据结构。此方法通常用于创建图的空版本。

class taskflow.types.graph.OrderedGraph(incoming_graph_data=None, name='')[source]

Bases: Graph

具有有用实用功能的图子类。

此派生类保留了节点、边、插入和迭代顺序(以便迭代顺序与插入顺序匹配)。

node_dict_factory

alias of OrderedDict

adjlist_outer_dict_factory

alias of OrderedDict

adjlist_inner_dict_factory

alias of OrderedDict

edge_attr_dict_factory

alias of OrderedDict

fresh_copy()[source]

返回一个具有相同数据结构的新的图副本。

新的副本没有任何节点、边或图属性。它与当前图具有相同的数据结构。此方法通常用于创建图的空版本。

taskflow.types.graph.merge_graphs(graph, *graphs, **kwargs)[source]

将一组图合并到一个新图中。

如果没有提供其他图,则返回第一个图而不进行修改;否则,返回合并后的图。

通知者

class taskflow.types.notifier.Listener(callback, args=None, kwargs=None, details_filter=None)[source]

基类: object

表示通知侦听器/目标的不可变辅助类。

property callback

回调(不能为空),将在事件+详情时调用。

property details_filter

回调(可能为空),在调用时用于丢弃事件+详情。

property kwargs

以后调用时使用的关键字参数字典。

property args

以后调用时使用的位置参数元组。

__call__(event_type, details)[source]

使用给定事件+详情激活目标回调。

注意(harlowja):如果存在细节过滤器回调,并且该回调在与提供的 `details` 调用时返回一个虚值,那么目标回调将不会被调用。

is_equivalent(callback, details_filter=None)[source]

检查回调是否相同

参数:
  • callback – 用于比较的回调

  • details_filter – 用于比较的回调

返回值:

如果不是相同回调则为 False,否则为 True

返回类型:

布尔值

class taskflow.types.notifier.Notifier[source]

基类: object

一个通知(发布/订阅类似)辅助类。

它旨在用于订阅事件发生的通知,并允许实体发布这些通知给任何关联的订阅者,而无需任一实体关心通知如何发生。

不是线程安全的,当一个通知者在同一时间被多个线程修改时。例如,当多个线程同时调用 `register()` 或 `reset()` 时,可能会导致问题。当只有 `notify()` 调用或其他只读操作(如调用 `is_registered()`)同时发生时,它是线程安全的。

RESERVED_KEYS = ('details',)

回调参数中不能使用的键

ANY = '*'

用于接收所有通知的 Kleene 星号常量

is_registered(event_type, callback, details_filter=None)[source]

检查回调是否已注册。

返回值:

检查回调是否已注册

返回类型:

布尔值

reset()[source]

忘记所有先前注册的回调。

notify(event_type, details)[source]

通知事件发生。

所有注册以接收给定事件类型通知的回调都将被调用。如果提供的事件类型无法用于发出通知(通过 `can_be_registered()` 方法进行检查),则将静默丢弃(通知失败不允许导致或引发异常)。

参数:
  • event_type – 发生的事件类型

  • details (dictionary) – 传递给具有相同名称的回调关键字参数的其他事件详情字典

register(event_type, callback, args=None, kwargs=None, details_filter=None)[source]

注册一个回调,以便在给定类型的事件发生时调用。

当事件类型发生时(或在任何事件发生时,如果 `event_type` 等于 `ANY`),将使用提供的 `args` 和 `kwargs` 调用回调。它还将获得一个额外的关键字参数 `details`,该参数将保存传递给 `notify()` 方法的事件详情(如果提供了细节过滤器回调,则只有当细节过滤器回调返回一个真值时,目标回调才会被触发)。

参数:
  • event_type – 事件类型输入

  • callback – 要注册的函数回调。

  • args (list) – 非关键字参数

  • kwargs (dictionary) – 键值对参数

deregister(event_type, callback, details_filter=None)[source]

移除绑定到事件 `event_type` 的单个侦听器。

参数:

event_type – 注销绑定到 event_type 的侦听器

deregister_event(event_type)[source]

移除绑定到事件 `event_type` 的一组侦听器。

参数:

event_type – 注销绑定到 event_type 的侦听器

listeners_iter()[source]

返回一个关于事件 => 绑定侦听器映射的迭代器。

注意(harlowja):在生成的 (event, listeners) 元组中,每个侦听器都是 `Listener` 类型的实例,它本身包装了一个提供的回调(以及其细节过滤器回调,如果有的话)。

can_be_registered(event_type)[source]

检查事件是否可以被注册/订阅。

can_trigger_notification(event_type)[source]

检查事件是否可以触发通知。

参数:

event_type – 需要验证的事件

返回值:

事件是否可以触发通知

返回类型:

布尔值

class taskflow.types.notifier.RestrictedNotifier(watchable_events, allow_any=True)[source]

Bases: Notifier

一个限制事件注册/触发的通知类。

注意(harlowja):此类与 `Notifier` 不同,它会限制并禁止注册未在构造通知者时声明的事件类型的回调。

events_iter()[source]

返回可注册/订阅的事件的迭代器。

注意(harlowja):不包含 `ANY` 事件类型,因为该元类型不是特定事件,而是捕获一切,并不意味着与特定事件类型相同的含义。

can_be_registered(event_type)[source]

检查事件是否可以被注册/订阅。

参数:

event_type – 需要验证的事件

返回值:

事件是否可以被注册/订阅

返回类型:

布尔值

taskflow.types.notifier.register_deregister(notifier, event_type, callback=None, args=None, kwargs=None, details_filter=None)[source]

在退出时注册回调,然后注销回调的上下文管理器。

注意(harlowja):如果回调是 None,则此操作不注册任何内容,这

与 `register` 方法的行为不同,后者接受 None,因为它不可调用……

集合

class taskflow.types.sets.OrderedSet(iterable=None)[source]

Bases: Set, Hashable

一个只读的、可哈希的集合,它保留插入/初始顺序。

它应该在所有使用 `frozenset` 的地方都能正常工作。

参见:https://mail.python.org/pipermail/python-ideas/2009-May/004567.html,其中讨论了一个可能最终将来某天)将此代码(或类似代码)包含在 mainline python 代码库中的想法线程(尽管该线程的最终结果在该方面有些令人沮丧)。

copy()[source]

返回集合的浅拷贝。

intersection(*sets)[source]

返回两个或多个集合的交集,作为一个新的集合。

(即,所有集合共有的元素。)

issuperset(other)[source]

报告此集合是否包含另一个集合。

issubset(other)[source]

报告另一个集合是否包含此集合。

difference(*sets)[source]

返回两个或多个集合的差集,作为一个新的集合。

(即,此集合中存在但其他集合中不存在的所有元素。)

union(*sets)[source]

返回集合的并集,作为一个新的集合。

(即,任意一个集合中存在的元素。)

计时

class taskflow.types.timing.Timeout(value, event_factory=<class 'threading.Event'>)[source]

基类: object

表示超时的对象。

此对象能够在实际超时之前被中断。

property value

内部使用的超时的不可变值。

interrupt()[source]

强制设置超时(释放所有等待者)。

is_stopped()[source]

返回超时是否已被中断。

wait()[source]

阻塞当前线程(直到超时)并等待直到被中断。

reset()[source]

重置,以便可以再次发生中断(和等待)。

taskflow.types.timing.convert_to_timeout(value=None, default_value=None, event_factory=<class 'threading.Event'>)[source]

将给定值转换为超时实例(并返回它)。

如果提供的值已经是超时实例,则不执行任何操作。

exception taskflow.types.tree.FrozenNode[source]

基础: Exception

当一个冻结的节点被修改时引发的异常。

class taskflow.types.tree.Node(item, **kwargs)[source]

基类: object

一个 n 叉节点类,可用于创建树结构。

STARTING_PREFIX = ''

在 `pformat()` 中使用的默认字符串前缀。

EMPTY_SPACE_SEP = ' '

用于在 `pformat()` 中创建空格的默认字符串。

HORIZONTAL_CONN = '__'

用于将节点水平连接到其父节点的默认字符串(在 `pformat()` 中使用)。

VERTICAL_CONN = '|'

用于将节点垂直连接到其父节点的默认字符串(在 `pformat()` 中使用)。

LINE_SEP = '\n'

在 `pformat()` 中使用的默认换行符。

add(child)[source]

将子节点添加到此节点(附加到现有子节点的左侧)。

注意(harlowja):这将同时设置子节点的父节点为本节点。

empty()[source]

返回节点是否为叶子节点。

path_iter(include_self=True)[source]

生成从本节点到根节点的路径。

find_first_match(matcher, only_direct=False, include_self=True)[source]

查找第一个回调函数返回 true 的节点。

这将不仅搜索当前节点,还会搜索任何子节点(按深度优先顺序,从右到左),最后如果没有任何匹配项,则返回 None 而不是节点对象。

参数:
  • matcher – 回调函数,接受一个位置参数(一个节点),如果匹配所需节点则返回 true,否则返回 false。

  • only_direct – 仅查看当前节点及其直接子节点(这意味着不进行深度优先搜索)。

  • include_self – 在搜索时包含当前节点。

返回值:

匹配的节点(或 None

find(item, only_direct=False, include_self=True)[source]

如果此节点中存在 item,则返回其第一个节点。

这将不仅搜索当前节点,还会搜索任何子节点(按深度优先顺序,从右到左),最后如果没有任何匹配项,则返回 None 而不是节点对象。

参数:
  • item – 要查找的项。

  • only_direct – 仅查看当前节点及其直接子节点(这意味着不进行深度优先搜索)。

  • include_self – 在搜索时包含当前节点。

返回值:

匹配提供项的节点(或 None

disassociate()[source]

将此节点从其父节点(如果存在)中移除。

返回值:

从此父节点中移除的此节点的出现次数。

remove(item, only_direct=False, include_self=True)[source]

从这些节点的子节点中移除一项。

这将不仅搜索当前节点,还会搜索任何子节点,最后如果找不到任何项,则会引发 ValueError,而不是像通常那样返回已移除的节点对象。

参数:
  • item – 要查找的项。

  • only_direct – 仅查看当前节点及其直接子节点(这意味着不进行深度优先搜索)。

  • include_self – 在搜索时包含当前节点。

pformat(stringify_node=None, linesep='\n', vertical_conn='|', horizontal_conn='__', empty_space=' ', starting_prefix='')[source]

将此节点及其子节点格式化为漂亮的字符串表示。

示例:

>>> from taskflow.types import tree
>>> yahoo = tree.Node("CEO")
>>> yahoo.add(tree.Node("Infra"))
>>> yahoo[0].add(tree.Node("Boss"))
>>> yahoo[0][0].add(tree.Node("Me"))
>>> yahoo.add(tree.Node("Mobile"))
>>> yahoo.add(tree.Node("Mail"))
>>> print(yahoo.pformat())
CEO
|__Infra
|  |__Boss
|     |__Me
|__Mobile
|__Mail
child_count(only_direct=True)[source]

返回此节点有多少个子节点。

这可以是仅当前节点的直接子节点,也可以包括当前节点的所有子节点(子节点的子节点,以此类推)。

注意(harlowja):此计数不包括当前节点。

reverse_iter()[source]

迭代当前节点的直接子节点(从左到右)。

index(item)[source]

查找给定项的子索引,按添加顺序搜索。

dfs_iter(include_self=False, right_to_left=True)[source]

深度优先遍历(非递归)子节点。

bfs_iter(include_self=False, right_to_left=False)[source]

广度优先遍历(非递归)子节点。

to_digraph()[source]

将此节点及其子节点转换为有序有向图。

返回的图将具有与此节点及其子节点相同的结构(节点元数据将被转换为图节点元数据)。

返回值:

一个有向图

返回类型:

taskflow.types.graph.OrderedDiGraph