通知和监听器¶
概述¶
引擎提供了一种接收关于任务和流程状态转换的通知的方式(参见 states),这对于监控、日志记录、指标、调试以及许多其他任务都很有用。
要接收这些通知,您应该将一个回调函数注册到一个附加到 Engine 属性 atom_notifier 和 notifier 的 Notifier 类的实例上。
TaskFlow 还提供了一组预定义的 监听器,并提供了编写自己的监听器的方法,这比使用原始回调函数更方便。
使用回调函数接收通知¶
流程通知¶
要接收流程状态变化的通知,请使用引擎的 notifier 属性提供的 Notifier 实例。
一个基本示例是
>>> class CatTalk(task.Task):
... def execute(self, meow):
... print(meow)
... return "cat"
...
>>> class DogTalk(task.Task):
... def execute(self, woof):
... print(woof)
... return 'dog'
...
>>> def flow_transition(state, details):
... print("Flow '%s' transition to state %s" % (details['flow_name'], state))
...
>>>
>>> flo = linear_flow.Flow("cat-dog").add(
... CatTalk(), DogTalk(provides="dog"))
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> eng.notifier.register(ANY, flow_transition)
>>> eng.run()
Flow 'cat-dog' transition to state RUNNING
meow
woof
Flow 'cat-dog' transition to state SUCCESS
任务通知¶
要接收任务状态变化的通知,请使用引擎的 atom_notifier 属性提供的 Notifier 实例。
一个基本示例是
>>> class CatTalk(task.Task):
... def execute(self, meow):
... print(meow)
... return "cat"
...
>>> class DogTalk(task.Task):
... def execute(self, woof):
... print(woof)
... return 'dog'
...
>>> def task_transition(state, details):
... print("Task '%s' transition to state %s" % (details['task_name'], state))
...
>>>
>>> flo = linear_flow.Flow("cat-dog")
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
<taskflow.patterns.linear_flow.Flow object at 0x...>
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> eng.atom_notifier.register(ANY, task_transition)
>>> eng.run()
Task 'CatTalk' transition to state RUNNING
meow
Task 'CatTalk' transition to state SUCCESS
Task 'DogTalk' transition to state RUNNING
woof
Task 'DogTalk' transition to state SUCCESS
监听器¶
TaskFlow 提供了一组预定义的监听器——这些辅助类可用于在流程和/或任务转换时执行各种操作。您也可以轻松创建自己的监听器,这在某些用例中可能比使用原始回调函数更方便。
例如,这是如何使用 PrintingListener 的方法
>>> from taskflow.listeners import printing
>>> class CatTalk(task.Task):
... def execute(self, meow):
... print(meow)
... return "cat"
...
>>> class DogTalk(task.Task):
... def execute(self, woof):
... print(woof)
... return 'dog'
...
>>>
>>> flo = linear_flow.Flow("cat-dog").add(
... CatTalk(), DogTalk(provides="dog"))
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> with printing.PrintingListener(eng):
... eng.run()
...
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved flow 'cat-dog' (...) into state 'RUNNING' from state 'PENDING'
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'CatTalk' (...) into state 'RUNNING' from state 'PENDING'
meow
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'CatTalk' (...) into state 'SUCCESS' from state 'RUNNING' with result 'cat' (failure=False)
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'DogTalk' (...) into state 'RUNNING' from state 'PENDING'
woof
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'DogTalk' (...) into state 'SUCCESS' from state 'RUNNING' with result 'dog' (failure=False)
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved flow 'cat-dog' (...) into state 'SUCCESS' from state 'RUNNING'
接口¶
- taskflow.listeners.base.FINISH_STATES = ('FAILURE', 'SUCCESS', 'REVERTED', 'REVERT_FAILURE')¶
这些状态是可用的,其他状态不产生结果。
- taskflow.listeners.base.DEFAULT_LISTEN_FOR = ('*',)¶
默认监听的内容…
- class taskflow.listeners.base.Listener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',))[source]¶
基类:
object监听器的基类。
可以将一个监听器附加到一个引擎上,以在流程和原子状态转换时执行各种操作。它实现了上下文管理器协议,以便在进入上下文时自动注册,在退出上下文时自动注销。
要实现一个监听器,请从这个类派生并重写
_flow_receiver和/或_task_receiver和/或_retry_receiver方法(在这个类中,它们不执行任何操作)。
实现¶
打印和日志记录监听器¶
- class taskflow.listeners.logging.LoggingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), log=None, level=10)[source]¶
Bases:
DumpingListener记录收到的通知的监听器。
它监听任务和流程通知,并将这些通知写入提供的日志记录器,或者其模块(
taskflow.listeners.logging)的日志记录器(如果没有提供,并且没有覆盖类属性)。日志级别也可以配置,默认使用logging.DEBUG。
- class taskflow.listeners.logging.DynamicLoggingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), log=None, failure_level=30, level=10, hide_inputs_outputs_of=(), fail_formatter=None, mask_inputs_keys=(), mask_outputs_keys=())[source]¶
Bases:
Listener记录收到的通知的监听器。
它监听任务和流程通知,并将这些通知写入提供的日志记录器,或者其模块(
taskflow.listeners.logging)的日志记录器(如果没有提供,并且没有覆盖类属性)。日志级别可以稍微配置,并且logging.DEBUG或logging.WARNING(除非通过构造函数参数覆盖)将根据执行状态和产生的结果自动选择。以下流程状态会导致使用
logging.WARNING(或提供的级别)states.FAILUREstates.REVERTED
以下任务状态会导致使用
logging.WARNING(或提供的级别)states.FAILUREstates.RETRYINGstates.REVERTINGstates.REVERT_FAILURE
当任务产生
Failure对象作为其结果时(通常发生在任务引发异常时),这将始终将日志记录器切换为使用logging.WARNING(如果失败对象包含exc_info元组,它也将被记录以提供有意义的回溯)。
- class taskflow.listeners.printing.PrintingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), stderr=False)[source]¶
Bases:
DumpingListener将任务和流程通知消息写入 stdout 或 stderr。
计时监听器¶
- class taskflow.listeners.timing.DurationListener(engine)[source]¶
Bases:
Listener捕获任务持续时间的监听器。
它记录任务执行(或失败)所花费的时间到存储中。它以浮点数形式将持续时间(秒)保存到任务元数据中,键为
'duration'。
- class taskflow.listeners.timing.PrintingDurationListener(engine, printer=None)[source]¶
Bases:
DurationListener打印持续时间并记录它的监听器。
认领监听器¶
- class taskflow.listeners.claims.CheckingClaimListener(engine, job, board, owner, on_job_loss=None)[source]¶
Bases:
Listener与 [engine, job, jobboard] 交互的监听器;确保认领有效。
此监听器(或其派生类)可以在任务已被认领后(以便引擎可以处理该任务的工作)与引擎的通知系统相关联。此监听器(在关联后)将在引擎通知任务或流程状态更改时检查该任务是否仍然被认领。如果状态更改发生时任务未被认领,将激活一个关联的处理程序(或默认处理程序)来确定如何响应希望是异常情况。
注意(harlowja): 这可能会给 jobboard 后端(zookeeper 或其他)带来比预期更多的流量,因为每个任务和流程的状态更改数量非零(并且在每次状态更改时进行检查将导致相当多的对该管理系统的调用以检查任务的认领状态);这以后可以优化为减少检查次数(或仅在较少的状态下检查)。
注意(harlowja): 如果提供了自定义的
on_job_loss回调,它必须接受三个位置参数,第一个是当前运行的引擎,第二个是“任务/流程”状态,第三个是引擎发送给监听器的用于检查的详细信息。
捕获监听器¶
- class taskflow.listeners.capturing.CaptureListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), capture_flow=True, capture_task=True, capture_retry=True, skip_tasks=None, skip_retries=None, skip_flows=None, values=None)[source]¶
Bases:
Listener一个捕获转换并本地保存它们的监听器。
注意(harlowja): 此监听器主要用于测试(其中测试适当/预期的转换、产生的正确结果…在引擎运行后发生),但它也可能有其他用途。
- 变量:
values – 捕获的转换+详细信息(
_format_capture()方法的结果)将被存储到此列表中(可以通过同名构造函数关键字参数提供一个以前的列表以供追加);默认情况下,它存储格式为(kind, state, details)的元组。
- FLOW = 'flow'¶
表示“流程”捕获的种类。
- TASK = 'task'¶
表示“任务”捕获的种类。
- RETRY = 'retry'¶
表示“重试”捕获的种类。
格式化器¶
层级¶
