通知和监听器

概述

引擎提供了一种接收关于任务和流程状态转换的通知的方式(参见 states),这对于监控、日志记录、指标、调试以及许多其他任务都很有用。

要接收这些通知,您应该将一个回调函数注册到一个附加到 Engine 属性 atom_notifiernotifierNotifier 类的实例上。

TaskFlow 还提供了一组预定义的 监听器,并提供了编写自己的监听器的方法,这比使用原始回调函数更方便。

使用回调函数接收通知

流程通知

要接收流程状态变化的通知,请使用引擎的 notifier 属性提供的 Notifier 实例。

一个基本示例是

>>> class CatTalk(task.Task):
...   def execute(self, meow):
...     print(meow)
...     return "cat"
...
>>> class DogTalk(task.Task):
...   def execute(self, woof):
...     print(woof)
...     return 'dog'
...
>>> def flow_transition(state, details):
...     print("Flow '%s' transition to state %s" % (details['flow_name'], state))
...
>>>
>>> flo = linear_flow.Flow("cat-dog").add(
...   CatTalk(), DogTalk(provides="dog"))
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> eng.notifier.register(ANY, flow_transition)
>>> eng.run()
Flow 'cat-dog' transition to state RUNNING
meow
woof
Flow 'cat-dog' transition to state SUCCESS

任务通知

要接收任务状态变化的通知,请使用引擎的 atom_notifier 属性提供的 Notifier 实例。

一个基本示例是

>>> class CatTalk(task.Task):
...   def execute(self, meow):
...     print(meow)
...     return "cat"
...
>>> class DogTalk(task.Task):
...   def execute(self, woof):
...     print(woof)
...     return 'dog'
...
>>> def task_transition(state, details):
...     print("Task '%s' transition to state %s" % (details['task_name'], state))
...
>>>
>>> flo = linear_flow.Flow("cat-dog")
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
<taskflow.patterns.linear_flow.Flow object at 0x...>
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> eng.atom_notifier.register(ANY, task_transition)
>>> eng.run()
Task 'CatTalk' transition to state RUNNING
meow
Task 'CatTalk' transition to state SUCCESS
Task 'DogTalk' transition to state RUNNING
woof
Task 'DogTalk' transition to state SUCCESS

监听器

TaskFlow 提供了一组预定义的监听器——这些辅助类可用于在流程和/或任务转换时执行各种操作。您也可以轻松创建自己的监听器,这在某些用例中可能比使用原始回调函数更方便。

例如,这是如何使用 PrintingListener 的方法

>>> from taskflow.listeners import printing
>>> class CatTalk(task.Task):
...   def execute(self, meow):
...     print(meow)
...     return "cat"
...
>>> class DogTalk(task.Task):
...   def execute(self, woof):
...     print(woof)
...     return 'dog'
...
>>>
>>> flo = linear_flow.Flow("cat-dog").add(
...   CatTalk(), DogTalk(provides="dog"))
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> with printing.PrintingListener(eng):
...   eng.run()
...
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved flow 'cat-dog' (...) into state 'RUNNING' from state 'PENDING'
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'CatTalk' (...) into state 'RUNNING' from state 'PENDING'
meow
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'CatTalk' (...) into state 'SUCCESS' from state 'RUNNING' with result 'cat' (failure=False)
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'DogTalk' (...) into state 'RUNNING' from state 'PENDING'
woof
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'DogTalk' (...) into state 'SUCCESS' from state 'RUNNING' with result 'dog' (failure=False)
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved flow 'cat-dog' (...) into state 'SUCCESS' from state 'RUNNING'

接口

taskflow.listeners.base.FINISH_STATES = ('FAILURE', 'SUCCESS', 'REVERTED', 'REVERT_FAILURE')

这些状态是可用的,其他状态不产生结果。

taskflow.listeners.base.DEFAULT_LISTEN_FOR = ('*',)

默认监听的内容…

class taskflow.listeners.base.Listener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',))[source]

基类: object

监听器的基类。

可以将一个监听器附加到一个引擎上,以在流程和原子状态转换时执行各种操作。它实现了上下文管理器协议,以便在进入上下文时自动注册,在退出上下文时自动注销。

要实现一个监听器,请从这个类派生并重写 _flow_receiver 和/或 _task_receiver 和/或 _retry_receiver 方法(在这个类中,它们不执行任何操作)。

class taskflow.listeners.base.DumpingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',))[source]

Bases: Listener

用于转储监听器的抽象基类。

这提供了一个简单的监听器,可以附加到一个引擎上,该引擎可以派生出来将任务和/或流程状态转换转储到某个目标后端。

要实现自己的转储监听器,请从此类派生并重写 _dump 方法。

实现

打印和日志记录监听器

class taskflow.listeners.logging.LoggingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), log=None, level=10)[source]

Bases: DumpingListener

记录收到的通知的监听器。

它监听任务和流程通知,并将这些通知写入提供的日志记录器,或者其模块(taskflow.listeners.logging)的日志记录器(如果没有提供,并且没有覆盖类属性)。日志级别也可以配置,默认使用 logging.DEBUG

class taskflow.listeners.logging.DynamicLoggingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), log=None, failure_level=30, level=10, hide_inputs_outputs_of=(), fail_formatter=None, mask_inputs_keys=(), mask_outputs_keys=())[source]

Bases: Listener

记录收到的通知的监听器。

它监听任务和流程通知,并将这些通知写入提供的日志记录器,或者其模块(taskflow.listeners.logging)的日志记录器(如果没有提供,并且没有覆盖类属性)。日志级别可以稍微配置,并且logging.DEBUGlogging.WARNING(除非通过构造函数参数覆盖)将根据执行状态和产生的结果自动选择。

以下流程状态会导致使用 logging.WARNING(或提供的级别)

  • states.FAILURE

  • states.REVERTED

以下任务状态会导致使用 logging.WARNING(或提供的级别)

  • states.FAILURE

  • states.RETRYING

  • states.REVERTING

  • states.REVERT_FAILURE

当任务产生 Failure 对象作为其结果时(通常发生在任务引发异常时),这将始终将日志记录器切换为使用 logging.WARNING(如果失败对象包含 exc_info 元组,它也将被记录以提供有意义的回溯)。

class taskflow.listeners.printing.PrintingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), stderr=False)[source]

Bases: DumpingListener

将任务和流程通知消息写入 stdout 或 stderr。

计时监听器

class taskflow.listeners.timing.DurationListener(engine)[source]

Bases: Listener

捕获任务持续时间的监听器。

它记录任务执行(或失败)所花费的时间到存储中。它以浮点数形式将持续时间(秒)保存到任务元数据中,键为 'duration'

class taskflow.listeners.timing.PrintingDurationListener(engine, printer=None)[source]

Bases: DurationListener

打印持续时间并记录它的监听器。

class taskflow.listeners.timing.EventTimeListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',))[source]

Bases: Listener

捕获任务、流程和重试事件时间戳的监听器。

它记录事件接收的时间(使用 Unix 时间)到存储中。它将时间戳以 {event}-timestamp 的格式保存在原子或流程详细元数据下,其中 event 是收到的状态/事件名称。

这些信息可以稍后提取/检查以得出持续时间…

认领监听器

class taskflow.listeners.claims.CheckingClaimListener(engine, job, board, owner, on_job_loss=None)[source]

Bases: Listener

与 [engine, job, jobboard] 交互的监听器;确保认领有效。

此监听器(或其派生类)可以在任务已被认领后(以便引擎可以处理该任务的工作)与引擎的通知系统相关联。此监听器(在关联后)将在引擎通知任务或流程状态更改检查该任务是否仍然被认领。如果状态更改发生时任务未被认领,将激活一个关联的处理程序(或默认处理程序)来确定如何响应希望是异常情况。

注意(harlowja): 这可能会给 jobboard 后端(zookeeper 或其他)带来比预期更多的流量,因为每个任务和流程的状态更改数量非零(并且在每次状态更改时进行检查将导致相当多的对该管理系统的调用以检查任务的认领状态);这以后可以优化为减少检查次数(或仅在较少的状态下检查)。

注意(harlowja): 如果提供了自定义的 on_job_loss 回调,它必须接受三个位置参数,第一个是当前运行的引擎,第二个是“任务/流程”状态,第三个是引擎发送给监听器的用于检查的详细信息。

捕获监听器

class taskflow.listeners.capturing.CaptureListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), capture_flow=True, capture_task=True, capture_retry=True, skip_tasks=None, skip_retries=None, skip_flows=None, values=None)[source]

Bases: Listener

一个捕获转换并本地保存它们的监听器。

注意(harlowja): 此监听器主要用于测试(其中测试适当/预期的转换、产生的正确结果…在引擎运行后发生),但它也可能有其他用途。

变量:

values – 捕获的转换+详细信息(_format_capture() 方法的结果)将被存储到此列表中(可以通过同名构造函数关键字参数提供一个以前的列表以供追加);默认情况下,它存储格式为 (kind, state, details) 的元组。

FLOW = 'flow'

表示“流程”捕获的种类。

TASK = 'task'

表示“任务”捕获的种类。

RETRY = 'retry'

表示“重试”捕获的种类。

格式化器

class taskflow.formatters.FailureFormatter(engine, hide_inputs_outputs_of=(), mask_inputs_keys=(), mask_outputs_keys=())[source]

基类: object

格式化一个失败,并将其与相关的原子和引擎连接。

format(fail, atom_matcher)[source]

返回关于失败的 (exc_info, details) 元组。

The exc_info tuple应该是一个标准的三个元素(exctype, value, traceback)元组,它将用于进一步的日志记录。对于 details,通常会返回一个非空字符串;它应该包含任何关于失败的字符串信息(其中 exc_info 可能没有/包含任何特定细节)。

层级

Inheritance diagram of taskflow.listeners.base.DumpingListener, taskflow.listeners.base.Listener, taskflow.listeners.capturing.CaptureListener, taskflow.listeners.claims.CheckingClaimListener, taskflow.listeners.logging.DynamicLoggingListener, taskflow.listeners.logging.LoggingListener, taskflow.listeners.printing.PrintingListener, taskflow.listeners.timing.PrintingDurationListener, taskflow.listeners.timing.EventTimeListener, taskflow.listeners.timing.DurationListener