输入和输出¶
在 TaskFlow 中,有多种方法可以为您的任务和流程提供输入并从中获取信息。本文档描述了其中一种方法,它涉及任务参数和结果。 还有 通知,它允许您在任务或流程更改状态时收到通知。 您还可以选择直接使用 持久化 层。
流程输入和输出¶
任务通过任务参数接受输入,并通过任务结果提供输出(有关更多详细信息,请参阅 参数和结果)。 这是在流程的一个任务之间传递数据的标准且推荐方法。 当然,并非每个任务参数都需要提供给流程的另一个任务,也并非每个任务结果都应被每个任务消耗。
如果某个值被流程的一个或多个任务需要,但未由任何任务提供,则该值被认为是流程输入,必须在运行流程之前将其放入存储中。 可以通过该流程的 requires 属性检索流程所需的名称集合。 这些名称可用于确定哪些名称适用于提前放入存储,哪些名称不适用。
流程的所有任务提供的值都被认为是流程输出;可以通过流程的 provides 属性获取这些值的名称集合。
例如
>>> class MyTask(task.Task):
... def execute(self, **kwargs):
... return 1, 2
...
>>> flow = linear_flow.Flow('test').add(
... MyTask(requires='a', provides=('b', 'c')),
... MyTask(requires='b', provides='d')
... )
>>> flow.requires
frozenset(['a'])
>>> sorted(flow.provides)
['b', 'c', 'd']
如您所见,此流程不需要 b,因为它由第一个任务提供。
引擎和存储¶
存储层是引擎持久化流程和任务细节的方式(有关更深入的细节,请参阅 持久化)。
输入¶
如上所述,如果某个值被流程的一个或多个任务需要,但未由任何任务提供,则该值被认为是流程输入,必须在运行流程之前将其放入存储中。 否则,引擎将在运行之前引发 MissingDependencies。
>>> class CatTalk(task.Task):
... def execute(self, meow):
... print meow
... return "cat"
...
>>> class DogTalk(task.Task):
... def execute(self, woof):
... print woof
... return "dog"
...
>>> flo = linear_flow.Flow("cat-dog")
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
<taskflow.patterns.linear_flow.Flow object at 0x...>
>>> engines.run(flo)
Traceback (most recent call last):
...
taskflow.exceptions.MissingDependencies: 'linear_flow.Flow: cat-dog(len=2)' requires ['meow', 'woof'] but no other entity produces said requirements
MissingDependencies: 'execute' method on '__main__.DogTalk==1.0' requires ['woof'] but no other entity produces said requirements
MissingDependencies: 'execute' method on '__main__.CatTalk==1.0' requires ['meow'] but no other entity produces said requirements
提供流程输入的推荐方法是使用引擎助手(run() 或 load())的 store 参数。
>>> class CatTalk(task.Task):
... def execute(self, meow):
... print meow
... return "cat"
...
>>> class DogTalk(task.Task):
... def execute(self, woof):
... print woof
... return "dog"
...
>>> flo = linear_flow.Flow("cat-dog")
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
<taskflow.patterns.linear_flow.Flow object at 0x...>
>>> result = engines.run(flo, store={'meow': 'meow', 'woof': 'woof'})
meow
woof
>>> pprint(result)
{'dog': 'dog', 'meow': 'meow', 'woof': 'woof'}
您也可以直接与引擎存储层交互以添加其他值,请注意,如果使用此方法,则无法使用助手方法 run()。 相反,您必须直接激活引擎的 run 方法 run()。
>>> flo = linear_flow.Flow("cat-dog")
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
<taskflow.patterns.linear_flow.Flow object at 0x...>
>>> eng = engines.load(flo, store={'meow': 'meow'})
>>> eng.storage.inject({"woof": "bark"})
>>> eng.run()
meow
bark
Outputs¶
如上例所示,run 方法以 dict 的形式返回所有流程输出。 相同的数据可以通过引擎存储对象的 fetch_all() 方法获取。 您还可以使用引擎存储对象的 fetch() 方法获取单个结果。
例如
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> eng.run()
meow
woof
>>> pprint(eng.storage.fetch_all())
{'dog': 'dog', 'meow': 'meow', 'woof': 'woof'}
>>> print(eng.storage.fetch("dog"))
dog