Examples
While developing TaskFlow the team has worked hard to make sure the various concepts are explained by relevant examples. Here are a few selected examples to get you started (ordered by perceived complexity).
To explore more of these examples, please check out the examples directory in the TaskFlow source tree.
Note
If the examples provided are not satisfactory (or not up to your standards), contributions that help improve them are welcome and very much appreciated. The higher quality and clearer the examples are, the better and more useful they are for everyone.
Hello world
Note
Full source located at hello_world.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.patterns import linear_flow as lf
13from taskflow.patterns import unordered_flow as uf
14from taskflow import task
15
16
17# INTRO: This is the defacto hello world equivalent for taskflow; it shows how
18# an overly simplistic workflow can be created that runs using different
19# engines using different styles of execution (all can be used to run in
20# parallel if a workflow is provided that is parallelizable).
21
22class PrinterTask(task.Task):
23 def __init__(self, name, show_name=True, inject=None):
24 super().__init__(name, inject=inject)
25 self._show_name = show_name
26
27 def execute(self, output):
28 if self._show_name:
29 print("{}: {}".format(self.name, output))
30 else:
31 print(output)
32
33
34# This will be the work that we want done, which for this example is just to
35# print 'hello world' (like a song) using different tasks and different
36# execution models.
37song = lf.Flow("beats")
38
39# Unordered flows when ran can be ran in parallel; and a chorus is everyone
40# singing at once of course!
41hi_chorus = uf.Flow('hello')
42world_chorus = uf.Flow('world')
43for (name, hello, world) in [('bob', 'hello', 'world'),
44 ('joe', 'hellooo', 'worllllld'),
45 ('sue', "helloooooo!", 'wooorllld!')]:
46 hi_chorus.add(PrinterTask("%s@hello" % name,
47 # This will show up to the execute() method of
48 # the task as the argument named 'output' (which
49 # will allow us to print the character we want).
50 inject={'output': hello}))
51 world_chorus.add(PrinterTask("%s@world" % name,
52 inject={'output': world}))
53
54# The composition starts with the conductor and then runs in sequence with
55# the chorus running in parallel, but no matter what the 'hello' chorus must
56# always run before the 'world' chorus (otherwise the world will fall apart).
57song.add(PrinterTask("conductor@begin",
58 show_name=False, inject={'output': "*ding*"}),
59 hi_chorus,
60 world_chorus,
61 PrinterTask("conductor@end",
62 show_name=False, inject={'output': "*dong*"}))
63
64# Run in parallel using eventlet green threads...
65try:
66 import eventlet as _eventlet # noqa
67except ImportError:
68 # No eventlet currently active, skip running with it...
69 pass
70else:
71 print("-- Running in parallel using eventlet --")
72 e = engines.load(song, executor='greenthreaded', engine='parallel',
73 max_workers=1)
74 e.run()
75
76
77# Run in parallel using real threads...
78print("-- Running in parallel using threads --")
79e = engines.load(song, executor='threaded', engine='parallel',
80 max_workers=1)
81e.run()
82
83
84# Run in parallel using external processes...
85print("-- Running in parallel using processes --")
86e = engines.load(song, executor='processes', engine='parallel',
87 max_workers=1)
88e.run()
89
90
91# Run serially (aka, if the workflow could have been ran in parallel, it will
92# not be when ran in this mode)...
93print("-- Running serially --")
94e = engines.load(song, engine='serial')
95e.run()
96print("-- Statistics gathered --")
97print(e.statistics)
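A small aside, not part of the example source: engines.load() also accepts an executor instance instead of the name strings used above; the parallel table-multiplier example further down uses this form via the futurist library. A minimal sketch, assuming the 'song' flow built above and that futurist is installed:

import futurist

from taskflow import engines

# Hand a real executor object to the engine rather than a name string such
# as 'threaded'; the engine then schedules its tasks onto that executor.
executor = futurist.ThreadPoolExecutor(max_workers=2)
try:
    engine = engines.load(song, engine='parallel', executor=executor)
    engine.run()
finally:
    # The caller owns the executor, so it must be shut down explicitly.
    executor.shutdown()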
Passing values from and to tasks
Note
Full source located at simple_linear_pass.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)
5
6self_dir = os.path.abspath(os.path.dirname(__file__))
7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13from taskflow import engines
14from taskflow.patterns import linear_flow
15from taskflow import task
16
17# INTRO: This example shows how a task (in a linear/serial workflow) can
18# produce an output that can be then consumed/used by a downstream task.
19
20
21class TaskA(task.Task):
22 default_provides = 'a'
23
24 def execute(self):
25 print("Executing '%s'" % (self.name))
26 return 'a'
27
28
29class TaskB(task.Task):
30 def execute(self, a):
31 print("Executing '%s'" % (self.name))
32 print("Got input '%s'" % (a))
33
34
35print("Constructing...")
36wf = linear_flow.Flow("pass-from-to")
37wf.add(TaskA('a'), TaskB('b'))
38
39print("Loading...")
40e = engines.load(wf)
41
42print("Compiling...")
43e.compile()
44
45print("Preparing...")
46e.prepare()
47
48print("Running...")
49e.run()
50
51print("Done...")
Using listeners
Note
Full source located at echo_listener.
import logging
import os
import sys

logging.basicConfig(level=logging.DEBUG)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.listeners import logging as logging_listener
13from taskflow.patterns import linear_flow as lf
14from taskflow import task
15
16# INTRO: This example walks through a miniature workflow which will do a
17# simple echo operation; during this execution a listener is associated with
18# the engine to receive all notifications about what the flow has performed,
19# this example dumps that output to the stdout for viewing (at debug level
20# to show all the information which is possible).
21
22
23class Echo(task.Task):
24 def execute(self):
25 print(self.name)
26
27
28# Generate the work to be done (but don't do it yet).
29wf = lf.Flow('abc')
30wf.add(Echo('a'))
31wf.add(Echo('b'))
32wf.add(Echo('c'))
33
34# This will associate the listener with the engine (the listener
35# will automatically register for notifications with the engine and deregister
36# when the context is exited).
37e = engines.load(wf)
38with logging_listener.DynamicLoggingListener(e):
39 e.run()
Using listeners (to watch a phone call)
Note
Full source located at simple_linear_listening.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14from taskflow.types import notifier
15
16ANY = notifier.Notifier.ANY
17
18# INTRO: In this example we create two tasks (this time as functions instead
19# of task subclasses as in the simple_linear.py example), each of which ~calls~
20# a given ~phone~ number (provided as a function input) in a linear fashion
21# (one after the other).
22#
23# For a workflow which is serial this shows an extremely simple way
24# of structuring your tasks (the code that does the work) into a linear
25# sequence (the flow) and then passing the work off to an engine, with some
26# initial data to be ran in a reliable manner.
27#
28# This example shows a basic usage of the taskflow structures without involving
29# the complexity of persistence. Using the structures that taskflow provides
30# via tasks and flows makes it possible for you to easily at a later time
31# hook in a persistence layer (and then gain the functionality that offers)
32# when you decide the complexity of adding that layer in is 'worth it' for your
33# applications usage pattern (which some applications may not need).
34#
35# It **also** adds on to the simple_linear.py example by adding a set of
36# callback functions which the engine will call when a flow state transition
37# or task state transition occurs. These types of functions are useful for
38# updating task or flow progress, or for debugging, sending notifications to
39# external systems, or for other yet unknown future usage that you may create!
40
41
42def call_jim(context):
43 print("Calling jim.")
44 print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
45
46
47def call_joe(context):
48 print("Calling joe.")
49 print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
50
51
52def flow_watch(state, details):
53 print('Flow => %s' % state)
54
55
56def task_watch(state, details):
57 print('Task {} => {}'.format(details.get('task_name'), state))
58
59
60# Wrap your functions into a task type that knows how to treat your functions
61# as tasks. There was previous work done to just allow a function to be
62# directly passed, but in python 3.0 there is no easy way to capture an
63# instance method, so this wrapping approach was decided upon instead which
64# can attach to instance methods (if that's desired).
65flow = lf.Flow("Call-them")
66flow.add(task.FunctorTask(execute=call_jim))
67flow.add(task.FunctorTask(execute=call_joe))
68
69# Now load (but do not run) the flow using the provided initial data.
70engine = taskflow.engines.load(flow, store={
71 'context': {
72 "joe_number": 444,
73 "jim_number": 555,
74 }
75})
76
77# This is where we attach our callback functions to the 2 different
78# notification objects that an engine exposes. The usage of a ANY (kleene star)
79# here means that we want to be notified on all state changes, if you want to
80# restrict to a specific state change, just register that instead.
81engine.notifier.register(ANY, flow_watch)
82engine.atom_notifier.register(ANY, task_watch)
83
84# And now run!
85engine.run()
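As the comment above mentions, the ANY registration can be narrowed to specific transitions. A minimal sketch of that, assuming the state constants live in taskflow.states:

from taskflow import states

# Only fire flow_watch when the flow reaches SUCCESS, and task_watch when a
# task transitions to FAILURE, instead of on every transition (ANY).
engine.notifier.register(states.SUCCESS, flow_watch)
engine.atom_notifier.register(states.FAILURE, task_watch)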
Dumping an in-memory backend
Note
Full source located at dump_memory_backend.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)
5
6self_dir = os.path.abspath(os.path.dirname(__file__))
7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13from taskflow import engines
14from taskflow.patterns import linear_flow as lf
15from taskflow import task
16
17# INTRO: in this example we create a dummy flow with a dummy task, and run
18# it using a in-memory backend and pre/post run we dump out the contents
19# of the in-memory backends tree structure (which can be quite useful to
20# look at for debugging or other analysis).
21
22
23class PrintTask(task.Task):
24 def execute(self):
25 print("Running '%s'" % self.name)
26
27# Make a little flow and run it...
28f = lf.Flow('root')
29for alpha in ['a', 'b', 'c']:
30 f.add(PrintTask(alpha))
31
32e = engines.load(f)
33e.compile()
34e.prepare()
35
36# After prepare the storage layer + backend can now be accessed safely...
37backend = e.storage.backend
38
39print("----------")
40print("Before run")
41print("----------")
42print(backend.memory.pformat())
43print("----------")
44
45e.run()
46
47print("---------")
48print("After run")
49print("---------")
50for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
51 value = backend.memory[path]
52 if value:
53 print("{} -> {}".format(path, value))
54 else:
55 print("%s" % (path))
Making phone calls
Note
Full source located at simple_linear.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15# INTRO: In this example we create two tasks, each of which ~calls~ a given
16# ~phone~ number (provided as a function input) in a linear fashion (one after
17# the other). For a workflow which is serial this shows a extremely simple way
18# of structuring your tasks (the code that does the work) into a linear
19# sequence (the flow) and then passing the work off to an engine, with some
20# initial data to be ran in a reliable manner.
21#
22# NOTE(harlowja): This example shows a basic usage of the taskflow structures
23# without involving the complexity of persistence. Using the structures that
24# taskflow provides via tasks and flows makes it possible for you to easily at
25# a later time hook in a persistence layer (and then gain the functionality
26# that offers) when you decide the complexity of adding that layer in
27# is 'worth it' for your application's usage pattern (which certain
28# applications may not need).
29
30
31class CallJim(task.Task):
32 def execute(self, jim_number, *args, **kwargs):
33 print("Calling jim %s." % jim_number)
34
35
36class CallJoe(task.Task):
37 def execute(self, joe_number, *args, **kwargs):
38 print("Calling joe %s." % joe_number)
39
40
41# Create your flow and associated tasks (the work to be done).
42flow = lf.Flow('simple-linear').add(
43 CallJim(),
44 CallJoe()
45)
46
47# Now run that flow using the provided initial data (store below).
48taskflow.engines.run(flow, store=dict(joe_number=444,
49 jim_number=555))
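A hedged follow-on sketch, not in the example source: if the stored values had different names than the execute() arguments, the 'rebind' argument (used in several later examples) could map them. The 'jim_phone'/'joe_phone' key names below are made up for illustration:

# Reuse the CallJim/CallJoe tasks from above, but feed their 'jim_number'
# and 'joe_number' arguments from hypothetical 'jim_phone'/'joe_phone' keys.
flow = lf.Flow('simple-linear-rebind').add(
    CallJim(rebind={'jim_number': 'jim_phone'}),
    CallJoe(rebind={'joe_number': 'joe_phone'})
)
taskflow.engines.run(flow, store=dict(joe_phone=444, jim_phone=555))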
Making phone calls (automatically reverting)
Note
Full source located at reverting_linear.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15# INTRO: In this example we create three tasks, each of which ~calls~ a given
16# number (provided as a function input), one of those tasks *fails* calling a
17# given number (the suzzie calling); this causes the workflow to enter the
18# reverting process, which activates the revert methods of the previous two
19# phone ~calls~.
20#
21# This simulated calling makes it appear like all three calls occur or all
22# three don't occur (transaction-like capabilities). No persistence layer is
23# used here so reverting and executing will *not* be tolerant of process
24# failure.
25
26
27class CallJim(task.Task):
28 def execute(self, jim_number, *args, **kwargs):
29 print("Calling jim %s." % jim_number)
30
31 def revert(self, jim_number, *args, **kwargs):
32 print("Calling %s and apologizing." % jim_number)
33
34
35class CallJoe(task.Task):
36 def execute(self, joe_number, *args, **kwargs):
37 print("Calling joe %s." % joe_number)
38
39 def revert(self, joe_number, *args, **kwargs):
40 print("Calling %s and apologizing." % joe_number)
41
42
43class CallSuzzie(task.Task):
44 def execute(self, suzzie_number, *args, **kwargs):
45 raise OSError("Suzzie not home right now.")
46
47
48# Create your flow and associated tasks (the work to be done).
49flow = lf.Flow('simple-linear').add(
50 CallJim(),
51 CallJoe(),
52 CallSuzzie()
53)
54
55try:
56 # Now run that flow using the provided initial data (store below).
57 taskflow.engines.run(flow, store=dict(joe_number=444,
58 jim_number=555,
59 suzzie_number=666))
60except Exception as e:
61 # NOTE(harlowja): This exception will be the exception that came out of the
62 # 'CallSuzzie' task instead of a different exception, this is useful since
63 # typically surrounding code wants to handle the original exception and not
64 # a wrapped or altered one.
65 #
66 # *WARNING* If this flow was multi-threaded and multiple active tasks threw
67 # exceptions then the above exception would be wrapped into a combined
68 # exception (the object has methods to iterate over the contained
69 # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
70 # how to deal with multiple tasks failing while running.
71 #
72 # You will also note that this is not a problem in this case since no
73 # parallelism is involved; this is ensured by the usage of a linear flow
74 # and the default engine type which is 'serial' vs being 'parallel'.
75 print("Flow failed: %s" % e)
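Following the *WARNING* comment above, here is a hedged sketch of handling the combined failure that a parallel engine could raise when several tasks fail at once; it assumes (per that comment) that a WrappedFailure can be iterated to reach the contained failures.

from taskflow import exceptions as tf_exc

try:
    taskflow.engines.run(flow, engine='parallel',
                         store=dict(joe_number=444,
                                    jim_number=555,
                                    suzzie_number=666))
except tf_exc.WrappedFailure as wrapped_failure:
    # Multiple tasks failed; inspect each contained failure separately.
    for failure in wrapped_failure:
        print("Contained failure: %s" % failure)
except Exception as e:
    print("Flow failed: %s" % e)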
Building a car
Note
Full source located at build_a_car.
import logging
import os
import sys


logging.basicConfig(level=logging.ERROR)
6
7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10sys.path.insert(0, top_dir)
11
12
13import taskflow.engines
14from taskflow.patterns import graph_flow as gf
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17from taskflow.types import notifier
18
19ANY = notifier.Notifier.ANY
20
21import example_utils as eu # noqa
22
23
24# INTRO: This example shows how a graph flow and linear flow can be used
25# together to execute dependent & non-dependent tasks by going through the
26# steps required to build a simplistic car (an assembly line if you will). It
27# also shows how raw functions can be wrapped into a task object instead of
28# being forced to use the more *heavy* task base class. This is useful in
29# scenarios where pre-existing code has functions that you easily want to
30# plug-in to taskflow, without requiring a large amount of code changes.
31
32
33def build_frame():
34 return 'steel'
35
36
37def build_engine():
38 return 'honda'
39
40
41def build_doors():
42 return '2'
43
44
45def build_wheels():
46 return '4'
47
48
49# These just return true to indicate success, they would in the real world
50# do more than just that.
51
52def install_engine(frame, engine):
53 return True
54
55
56def install_doors(frame, windows_installed, doors):
57 return True
58
59
60def install_windows(frame, doors):
61 return True
62
63
64def install_wheels(frame, engine, engine_installed, wheels):
65 return True
66
67
68def trash(**kwargs):
69 eu.print_wrapped("Throwing away pieces of car!")
70
71
72def startup(**kwargs):
73 # If you want to see the rollback function being activated try uncommenting
74 # the following line.
75 #
76 # raise ValueError("Car not verified")
77 return True
78
79
80def verify(spec, **kwargs):
81 # If the car is not what we ordered throw away the car (trigger reversion).
82 for key, value in kwargs.items():
83 if spec[key] != value:
84 raise Exception("Car doesn't match spec!")
85 return True
86
87
88# These two functions connect into the state transition notification emission
89# points that the engine outputs, they can be used to log state transitions
90# that are occurring, or they can be used to suspend the engine (or perform
91# other useful activities).
92def flow_watch(state, details):
93 print('Flow => %s' % state)
94
95
96def task_watch(state, details):
97 print('Task {} => {}'.format(details.get('task_name'), state))
98
99
100flow = lf.Flow("make-auto").add(
101 task.FunctorTask(startup, revert=trash, provides='ran'),
102 # A graph flow allows automatic dependency based ordering, the ordering
103 # is determined by analyzing the symbols required and provided and ordering
104 # execution based on a functioning order (if one exists).
105 gf.Flow("install-parts").add(
106 task.FunctorTask(build_frame, provides='frame'),
107 task.FunctorTask(build_engine, provides='engine'),
108 task.FunctorTask(build_doors, provides='doors'),
109 task.FunctorTask(build_wheels, provides='wheels'),
110 # These *_installed outputs allow for other tasks to depend on certain
111 # actions being performed (aka the components were installed), another
112 # way to do this is to link() the tasks manually instead of creating
113 # an 'artificial' data dependency that accomplishes the same goal the
114 # manual linking would result in.
115 task.FunctorTask(install_engine, provides='engine_installed'),
116 task.FunctorTask(install_doors, provides='doors_installed'),
117 task.FunctorTask(install_windows, provides='windows_installed'),
118 task.FunctorTask(install_wheels, provides='wheels_installed')),
119 task.FunctorTask(verify, requires=['frame',
120 'engine',
121 'doors',
122 'wheels',
123 'engine_installed',
124 'doors_installed',
125 'windows_installed',
126 'wheels_installed']))
127
128# This dictionary will be provided to the tasks as a specification for what
129# the tasks should produce, in this example this specification will influence
130# what those tasks do and what output they create. Different tasks depend on
131# different information from this specification, all of which will be provided
132# automatically by the engine to those tasks.
133spec = {
134 "frame": 'steel',
135 "engine": 'honda',
136 "doors": '2',
137 "wheels": '4',
138 # These are used to compare the result product, a car without the pieces
139 # installed is not a car after all.
140 "engine_installed": True,
141 "doors_installed": True,
142 "windows_installed": True,
143 "wheels_installed": True,
144}
145
146
147engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
148
149# This registers all (ANY) state transitions to trigger a call to the
150# flow_watch function for flow state transitions, and registers the
151# same all (ANY) state transitions for task state transitions.
152engine.notifier.register(ANY, flow_watch)
153engine.atom_notifier.register(ANY, task_watch)
154
155eu.print_wrapped("Building a car")
156engine.run()
157
158# Alter the specification and ensure that the reverting logic gets triggered
159# since the car that will be built by the build_doors function will only
160# have '2' doors (not 5), this will cause the verification
161# task to mark the car that is produced as not matching the desired spec.
162spec['doors'] = 5
163
164engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
165engine.notifier.register(ANY, flow_watch)
166engine.atom_notifier.register(ANY, task_watch)
167
168eu.print_wrapped("Building a wrong car that doesn't match specification")
169try:
170 engine.run()
171except Exception as e:
172 eu.print_wrapped("Flow failed: %s" % e)
Iterating over the alphabet (using processes)
Note
Full source located at alphabet_soup.
import fractions
import functools
import logging
import os
import string
import sys
import time
7
8logging.basicConfig(level=logging.ERROR)
9
10self_dir = os.path.abspath(os.path.dirname(__file__))
11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12 os.pardir,
13 os.pardir))
14sys.path.insert(0, top_dir)
15sys.path.insert(0, self_dir)
16
17from taskflow import engines
18from taskflow import exceptions
19from taskflow.patterns import linear_flow
20from taskflow import task
21
22
23# In this example we show how a simple linear set of tasks can be executed
24# using local processes (and not threads or remote workers) with minimal (if
25# any) modification to those tasks to make them safe to run in this mode.
26#
27# This is useful since it allows further scaling up your workflows when thread
28# execution starts to become a bottleneck (which it can start to be due to the
29# GIL in python). It also offers a intermediary scalable runner that can be
30# used when the scale and/or setup of remote workers is not desirable.
31
32
33def progress_printer(task, event_type, details):
34 # This callback, attached to each task will be called in the local
35 # process (not the child processes)...
36 progress = details.pop('progress')
37 progress = int(progress * 100.0)
38 print("Task '%s' reached %d%% completion" % (task.name, progress))
39
40
41class AlphabetTask(task.Task):
42 # Second delay between each progress part.
43 _DELAY = 0.1
44
45 # This task will run in X main stages (each with a different progress
46 # report that will be delivered back to the running process...). The
47 # initial 0% and 100% are triggered automatically by the engine when
48 # a task is started and finished (so that's why those are not emitted
49 # here).
50 _PROGRESS_PARTS = [fractions.Fraction("%s/5" % x) for x in range(1, 5)]
51
52 def execute(self):
53 for p in self._PROGRESS_PARTS:
54 self.update_progress(p)
55 time.sleep(self._DELAY)
56
57
58print("Constructing...")
59soup = linear_flow.Flow("alphabet-soup")
60for letter in string.ascii_lowercase:
61 abc = AlphabetTask(letter)
62 abc.notifier.register(task.EVENT_UPDATE_PROGRESS,
63 functools.partial(progress_printer, abc))
64 soup.add(abc)
65try:
66 print("Loading...")
67 e = engines.load(soup, engine='parallel', executor='processes')
68 print("Compiling...")
69 e.compile()
70 print("Preparing...")
71 e.prepare()
72 print("Running...")
73 e.run()
74 print("Done: %s" % e.statistics)
75except exceptions.NotImplementedError as e:
76 print(e)
Watching execution timing
Note
Full source located at timing_listener.
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)
7
8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11sys.path.insert(0, top_dir)
12
13from taskflow import engines
14from taskflow.listeners import timing
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17
18# INTRO: in this example we will attach a listener to an engine
19# and have variable run time tasks run and show how the listener will print
20# out how long those tasks took (when they started and when they finished).
21#
22# This shows how timing metrics can be gathered (or attached onto an engine)
23# after a workflow has been constructed, making it easy to gather metrics
24# dynamically for situations where this kind of information is applicable (or
25# even adding this information on at a later point in the future when your
26# application starts to slow down).
27
28
29class VariableTask(task.Task):
30 def __init__(self, name):
31 super().__init__(name)
32 self._sleepy_time = random.random()
33
34 def execute(self):
35 time.sleep(self._sleepy_time)
36
37
38f = lf.Flow('root')
39f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
40e = engines.load(f)
41with timing.PrintingDurationListener(e):
42 e.run()
Distance calculator
Note
Full source located at distance_calculator.
import collections
import math
import os
import sys
4
5top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
6 os.pardir,
7 os.pardir))
8sys.path.insert(0, top_dir)
9
10from taskflow import engines
11from taskflow.patterns import linear_flow
12from taskflow import task
13
14# INTRO: This shows how to use a tasks/atoms ability to take requirements from
15# its execute functions default parameters and shows how to provide those
16# via different methods when needed, to influence those parameters to in
17# this case calculate the distance between two points in 2D space.
18
19# A 2D point.
20Point = collections.namedtuple("Point", "x,y")
21
22
23def is_near(val, expected, tolerance=0.001):
24 # Floats don't really provide equality...
25 if val > (expected + tolerance):
26 return False
27 if val < (expected - tolerance):
28 return False
29 return True
30
31
32class DistanceTask(task.Task):
33 # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space
34
35 default_provides = 'distance'
36
37 def execute(self, a=Point(0, 0), b=Point(0, 0)):
38 return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2))
39
40
41if __name__ == '__main__':
42 # For these we rely on the execute() methods points by default being
43 # at the origin (and we override it with store values when we want) at
44 # execution time (which then influences what is calculated).
45 any_distance = linear_flow.Flow("origin").add(DistanceTask())
46 results = engines.run(any_distance)
47 print(results)
48 print("{} is near-enough to {}: {}".format(
49 results['distance'], 0.0, is_near(results['distance'], 0.0)))
50
51 results = engines.run(any_distance, store={'a': Point(1, 1)})
52 print(results)
53 print("{} is near-enough to {}: {}".format(
54 results['distance'], 1.4142, is_near(results['distance'], 1.4142)))
55
56 results = engines.run(any_distance, store={'a': Point(10, 10)})
57 print(results)
58 print("{} is near-enough to {}: {}".format(
59 results['distance'], 14.14199, is_near(results['distance'], 14.14199)))
60
61 results = engines.run(any_distance,
62 store={'a': Point(5, 5), 'b': Point(10, 10)})
63 print(results)
64 print("{} is near-enough to {}: {}".format(
65 results['distance'], 7.07106, is_near(results['distance'], 7.07106)))
66
67 # For this we use the ability to override at task creation time the
68 # optional arguments so that we don't need to continue to send them
69 # in via the 'store' argument like in the above (and we fix the new
70 # starting point 'a' at (10, 10) instead of (0, 0)...
71
72 ten_distance = linear_flow.Flow("ten")
73 ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))
74 results = engines.run(ten_distance, store={'b': Point(10, 10)})
75 print(results)
76 print("{} is near-enough to {}: {}".format(
77 results['distance'], 0.0, is_near(results['distance'], 0.0)))
78
79 results = engines.run(ten_distance)
80 print(results)
81 print("{} is near-enough to {}: {}".format(
82 results['distance'], 14.14199, is_near(results['distance'], 14.14199)))
Table multiplier (in parallel)
Note
Full source located at parallel_table_multiply.
import csv
import logging
import os
import random
import sys
5
6logging.basicConfig(level=logging.ERROR)
7
8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11sys.path.insert(0, top_dir)
12
13import futurist
14
15from taskflow import engines
16from taskflow.patterns import unordered_flow as uf
17from taskflow import task
18
19# INTRO: This example walks through a miniature workflow which does a parallel
20# table modification where each row in the table gets adjusted by a thread, or
21# green thread (if eventlet is available) in parallel and then the result
22# is reformed into a new table and some verifications are performed on it
23# to ensure everything went as expected.
24
25
26MULTIPLER = 10
27
28
29class RowMultiplier(task.Task):
30 """Performs a modification of an input row, creating a output row."""
31
32 def __init__(self, name, index, row, multiplier):
33 super().__init__(name=name)
34 self.index = index
35 self.multiplier = multiplier
36 self.row = row
37
38 def execute(self):
39 return [r * self.multiplier for r in self.row]
40
41
42def make_flow(table):
43 # This creation will allow for parallel computation (since the flow here
44 # is specifically unordered; and when things are unordered they have
45 # no dependencies and when things have no dependencies they can just be
46 # ran at the same time, limited in concurrency by the executor or max
47 # workers of that executor...)
48 f = uf.Flow("root")
49 for i, row in enumerate(table):
50 f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLER))
51 # NOTE(harlowja): at this point nothing has ran, the above is just
52 # defining what should be done (but not actually doing it) and associating
53 # an ordering dependencies that should be enforced (the flow pattern used
54 # forces this), the engine in the later main() function will actually
55 # perform this work...
56 return f
57
58
59def main():
60 if len(sys.argv) == 2:
61 tbl = []
62 with open(sys.argv[1], 'rb') as fh:
63 reader = csv.reader(fh)
64 for row in reader:
65 tbl.append([float(r) if r else 0.0 for r in row])
66 else:
67 # Make some random table out of thin air...
68 tbl = []
69 cols = random.randint(1, 100)
70 rows = random.randint(1, 100)
71 for _i in range(0, rows):
72 row = []
73 for _j in range(0, cols):
74 row.append(random.random())
75 tbl.append(row)
76
77 # Generate the work to be done.
78 f = make_flow(tbl)
79
80 # Now run it (using the specified executor)...
81 try:
82 executor = futurist.GreenThreadPoolExecutor(max_workers=5)
83 except RuntimeError:
84 # No eventlet currently active, use real threads instead.
85 executor = futurist.ThreadPoolExecutor(max_workers=5)
86 try:
87 e = engines.load(f, engine='parallel', executor=executor)
88 for st in e.run_iter():
89 print(st)
90 finally:
91 executor.shutdown()
92
93 # Find the old rows and put them into place...
94 #
95 # TODO(harlowja): probably easier just to sort instead of search...
96 computed_tbl = []
97 for i in range(0, len(tbl)):
98 for t in f:
99 if t.index == i:
100 computed_tbl.append(e.storage.get(t.name))
101
102 # Do some basic validation (which causes the return code of this process
103 # to be different if things were not as expected...)
104 if len(computed_tbl) != len(tbl):
105 return 1
106 else:
107 return 0
108
109
110if __name__ == "__main__":
111 sys.exit(main())
Linear equation solver (explicit dependencies)
Note
Full source located at calculate_linear.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15
16# INTRO: In this example a linear flow is used to group four tasks to calculate
17# a value. A single added task is used twice, showing how this can be done
18# and the twice added task takes in different bound values. In the first case
19# it uses default parameters ('x' and 'y') and in the second case arguments
20# are bound with ('z', 'd') keys from the engines internal storage mechanism.
21#
22# A multiplier task uses a binding that another task also provides, but this
23# example explicitly shows that 'z' parameter is bound with 'a' key
24# This shows that if a task depends on a key named the same as a key provided
25# from another task the name can be remapped to take the desired key from a
26# different origin.
27
28
29# This task provides some values from as a result of execution, this can be
30# useful when you want to provide values from a static set to other tasks that
31# depend on those values existing before those tasks can run.
32#
33# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
34# that just provides those values on engine running by prepopulating the
35# storage backend before your tasks are ran (which accomplishes a similar goal
36# in a more uniform manner).
37class Provider(task.Task):
38
39 def __init__(self, name, *args, **kwargs):
40 super().__init__(name=name, **kwargs)
41 self._provide = args
42
43 def execute(self):
44 return self._provide
45
46
47# This task adds two input variables and returns the result.
48#
49# Note that since this task does not have a revert() function (since addition
50# is a stateless operation) there are no side-effects that this function needs
51# to undo if some later operation fails.
52class Adder(task.Task):
53 def execute(self, x, y):
54 return x + y
55
56
57# This task multiplies an input variable by a multiplier and returns the
58# result.
59#
60# Note that since this task does not have a revert() function (since
61# multiplication is a stateless operation) and there are no side-effects that
62# this function needs to undo if some later operation fails.
63class Multiplier(task.Task):
64 def __init__(self, name, multiplier, provides=None, rebind=None):
65 super().__init__(name=name, provides=provides,
66 rebind=rebind)
67 self._multiplier = multiplier
68
69 def execute(self, z):
70 return z * self._multiplier
71
72
73# Note here that the ordering is established so that the correct sequences
74# of operations occurs where the adding and multiplying is done according
75# to the expected and typical mathematical model. A graph flow could also be
76# used here to automatically infer & ensure the correct ordering.
77flow = lf.Flow('root').add(
78 # Provide the initial values for other tasks to depend on.
79 #
80 # x = 2, y = 3, d = 5
81 Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
82 # z = x+y = 5
83 Adder("add-1", provides='z'),
84 # a = z+d = 10
85 Adder("add-2", provides='a', rebind=['z', 'd']),
86 # Calculate 'r = a*3 = 30'
87 #
88 # Note here that the 'z' argument of the execute() function will not be
89 # bound to the 'z' variable provided from the above 'provider' object but
90 # instead the 'z' argument will be taken from the 'a' variable provided
91 # by the second add-2 listed above.
92 Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
93)
94
95# The result here will be all results (from all tasks) which is stored in an
96# in-memory storage location that backs this engine since it is not configured
97# with persistence storage.
98results = taskflow.engines.run(flow)
99print(results)
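As a hedged illustration of the store-based alternative that the NOTE above points to, the Provider task can be dropped and the initial values supplied through the engine's 'store' argument (the mechanism other examples in this document already use):

# Same calculation, but the initial x/y/d values are prepopulated into the
# engine's storage via 'store' instead of being produced by a Provider task.
flow_without_provider = lf.Flow('root-store').add(
    Adder("add-1", provides='z'),
    Adder("add-2", provides='a', rebind=['z', 'd']),
    Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
)
print(taskflow.engines.run(flow_without_provider,
                           store={'x': 2, 'y': 3, 'd': 5}))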
Linear equation solver (inferred dependencies)
Source: graph_flow.py
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import graph_flow as gf
13from taskflow.patterns import linear_flow as lf
14from taskflow import task
15
16
17# In this example there are complex *inferred* dependencies between tasks that
18# are used to perform a simple set of linear equations.
19#
20# As you will see below the tasks just define what they require as input
21# and produce as output (named values). Then the user doesn't care about
22# ordering the tasks (in this case the tasks calculate pieces of the overall
23# equation).
24#
25# As you will notice a graph flow resolves dependencies automatically using the
26# tasks symbol requirements and provided symbol values and no ordering
27# dependency has to be manually created.
28#
29# Also notice that flows of any types can be nested into a graph flow; showing
30# that subflow dependencies (and associated ordering) will be inferred too.
31
32
33class Adder(task.Task):
34
35 def execute(self, x, y):
36 return x + y
37
38
39flow = gf.Flow('root').add(
40 lf.Flow('nested_linear').add(
41 # x2 = y3+y4 = 12
42 Adder("add2", provides='x2', rebind=['y3', 'y4']),
43 # x1 = y1+y2 = 4
44 Adder("add1", provides='x1', rebind=['y1', 'y2'])
45 ),
46 # x5 = x1+x3 = 20
47 Adder("add5", provides='x5', rebind=['x1', 'x3']),
48 # x3 = x1+x2 = 16
49 Adder("add3", provides='x3', rebind=['x1', 'x2']),
50 # x4 = x2+y5 = 21
51 Adder("add4", provides='x4', rebind=['x2', 'y5']),
52 # x6 = x5+x4 = 41
53 Adder("add6", provides='x6', rebind=['x5', 'x4']),
54 # x7 = x6+x6 = 82
55 Adder("add7", provides='x7', rebind=['x6', 'x6']))
56
57# Provide the initial variable inputs using a storage dictionary.
58store = {
59 "y1": 1,
60 "y2": 3,
61 "y3": 5,
62 "y4": 7,
63 "y5": 9,
64}
65
66# This is the expected values that should be created.
67unexpected = 0
68expected = [
69 ('x1', 4),
70 ('x2', 12),
71 ('x3', 16),
72 ('x4', 21),
73 ('x5', 20),
74 ('x6', 41),
75 ('x7', 82),
76]
77
78result = taskflow.engines.run(
79 flow, engine='serial', store=store)
80
81print("Single threaded engine result %s" % result)
82for (name, value) in expected:
83 actual = result.get(name)
84 if actual != value:
85 sys.stderr.write("{} != {}\n".format(actual, value))
86 unexpected += 1
87
88result = taskflow.engines.run(
89 flow, engine='parallel', store=store)
90
91print("Multi threaded engine result %s" % result)
92for (name, value) in expected:
93 actual = result.get(name)
94 if actual != value:
95 sys.stderr.write("{} != {}\n".format(actual, value))
96 unexpected += 1
97
98if unexpected:
99 sys.exit(1)
Linear equation solver (in parallel)
Note
Full source located at calculate_in_parallel.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow.patterns import unordered_flow as uf
14from taskflow import task
15
16# INTRO: These examples show how a linear flow and an unordered flow can be
17# used together to execute calculations in parallel and then use the
18# result for the next task/s. The adder task is used for all calculations
19# and argument bindings are used to set correct parameters for each task.
20
21
22# This task provides some values from as a result of execution, this can be
23# useful when you want to provide values from a static set to other tasks that
24# depend on those values existing before those tasks can run.
25#
26# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
27# that provides those values on engine running by prepopulating the storage
28# backend before your tasks are ran (which accomplishes a similar goal in a
29# more uniform manner).
30class Provider(task.Task):
31 def __init__(self, name, *args, **kwargs):
32 super().__init__(name=name, **kwargs)
33 self._provide = args
34
35 def execute(self):
36 return self._provide
37
38
39# This task adds two input variables and returns the result of that addition.
40#
41# Note that since this task does not have a revert() function (since addition
42# is a stateless operation) there are no side-effects that this function needs
43# to undo if some later operation fails.
44class Adder(task.Task):
45 def execute(self, x, y):
46 return x + y
47
48
49flow = lf.Flow('root').add(
50 # Provide the initial values for other tasks to depend on.
51 #
52    # x1 = 2, y1 = 3, x2 = 5, y2 = 8
53 Provider("provide-adder", 2, 3, 5, 8,
54 provides=('x1', 'y1', 'x2', 'y2')),
55 # Note here that we define the flow that contains the 2 adders to be an
56 # unordered flow since the order in which these execute does not matter,
57 # another way to solve this would be to use a graph_flow pattern, which
58 # also can run in parallel (since they have no ordering dependencies).
59 uf.Flow('adders').add(
60 # Calculate 'z1 = x1+y1 = 5'
61 #
62 # Rebind here means that the execute() function x argument will be
63 # satisfied from a previous output named 'x1', and the y argument
64 # of execute() will be populated from the previous output named 'y1'
65 #
66 # The output (result of adding) will be mapped into a variable named
67 # 'z1' which can then be refereed to and depended on by other tasks.
68 Adder(name="add", provides='z1', rebind=['x1', 'y1']),
69 # z2 = x2+y2 = 13
70 Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
71 ),
72 # r = z1+z2 = 18
73 Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))
74
75
76# The result here will be all results (from all tasks) which is stored in an
77# in-memory storage location that backs this engine since it is not configured
78# with persistence storage.
79result = taskflow.engines.run(flow, engine='parallel')
80print(result)
Creating volumes (in parallel)
Note
Full source located at create_parallel_volume.
import contextlib
import logging
import os
import random
import sys
import time
6
7logging.basicConfig(level=logging.ERROR)
8
9top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10 os.pardir,
11 os.pardir))
12sys.path.insert(0, top_dir)
13
14from oslo_utils import reflection
15
16from taskflow import engines
17from taskflow.listeners import printing
18from taskflow.patterns import unordered_flow as uf
19from taskflow import task
20
21# INTRO: These examples show how unordered_flow can be used to create a large
22# number of fake volumes in parallel (or serially, depending on a constant that
23# can be easily changed).
24
25
26@contextlib.contextmanager
27def show_time(name):
28 start = time.time()
29 yield
30 end = time.time()
31 print(" -- {} took {:0.3f} seconds".format(name, end - start))
32
33
34# This affects how many volumes to create and how much time to *simulate*
35# passing for that volume to be created.
36MAX_CREATE_TIME = 3
37VOLUME_COUNT = 5
38
39# This will be used to determine if all the volumes are created in parallel
40# or whether the volumes are created serially (in an undefined order since
41# a unordered flow is used). Note that there is a disconnection between the
42# ordering and the concept of parallelism (since unordered items can still be
43# ran in a serial ordering). A typical use-case for offering both is to allow
44# for debugging using a serial approach, while when running at a larger scale
45# one would likely want to use the parallel approach.
46#
47# If you switch this flag from serial to parallel you can see the overall
48# time difference that this causes.
49SERIAL = False
50if SERIAL:
51 engine = 'serial'
52else:
53 engine = 'parallel'
54
55
56class VolumeCreator(task.Task):
57 def __init__(self, volume_id):
58 # Note here that the volume name is composed of the name of the class
59 # along with the volume id that is being created, since a name of a
60 # task uniquely identifies that task in storage it is important that
61 # the name be relevant and identifiable if the task is recreated for
62 # subsequent resumption (if applicable).
63 #
64 # UUIDs are *not* used as they can not be tied back to a previous tasks
65 # state on resumption (since they are unique and will vary for each
66 # task that is created). A name based off the volume id that is to be
67 # created is more easily tied back to the original task so that the
68 # volume create can be resumed/revert, and is much easier to use for
69 # audit and tracking purposes.
70 base_name = reflection.get_callable_name(self)
71 super().__init__(name="{}-{}".format(base_name, volume_id))
72 self._volume_id = volume_id
73
74 def execute(self):
75 print("Making volume %s" % (self._volume_id))
76 time.sleep(random.random() * MAX_CREATE_TIME)
77 print("Finished making volume %s" % (self._volume_id))
78
79
80# Assume there is no ordering dependency between volumes.
81flow = uf.Flow("volume-maker")
82for i in range(0, VOLUME_COUNT):
83 flow.add(VolumeCreator(volume_id="vol-%s" % (i)))
84
85
86# Show how much time the overall engine loading and running takes.
87with show_time(name=flow.name.title()):
88 eng = engines.load(flow, engine=engine)
89 # This context manager automatically adds (and automatically removes) a
90 # helpful set of state transition notification printing helper utilities
91 # that show you exactly what transitions the engine is going through
92 # while running the various volume create tasks.
93 with printing.PrintingListener(eng):
94 eng.run()
Summation mapper(s) and reducer (in parallel)
Note
Full source located at simple_map_reduce.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)
5
6self_dir = os.path.abspath(os.path.dirname(__file__))
7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13# INTRO: These examples show a simplistic map/reduce implementation where
14# a set of mapper(s) will sum a series of input numbers (in parallel) and
15# return their individual summed result. A reducer will then use those
16# produced values and perform a final summation and this result will then be
17# printed (and verified to ensure the calculation was as expected).
18
19from taskflow import engines
20from taskflow.patterns import linear_flow
21from taskflow.patterns import unordered_flow
22from taskflow import task
23
24
25class SumMapper(task.Task):
26 def execute(self, inputs):
27 # Sums some set of provided inputs.
28 return sum(inputs)
29
30
31class TotalReducer(task.Task):
32 def execute(self, *args, **kwargs):
33 # Reduces all mapped summed outputs into a single value.
34 total = 0
35 for (k, v) in kwargs.items():
36 # If any other kwargs was passed in, we don't want to use those
37 # in the calculation of the total...
38 if k.startswith('reduction_'):
39 total += v
40 return total
41
42
43def chunk_iter(chunk_size, upperbound):
44 """Yields back chunk size pieces from zero to upperbound - 1."""
45 chunk = []
46 for i in range(0, upperbound):
47 chunk.append(i)
48 if len(chunk) == chunk_size:
49 yield chunk
50 chunk = []
51
52
53# Upper bound of numbers to sum for example purposes...
54UPPER_BOUND = 10000
55
56# How many mappers we want to have.
57SPLIT = 10
58
59# How big of a chunk we want to give each mapper.
60CHUNK_SIZE = UPPER_BOUND // SPLIT
61
62# This will be the workflow we will compose and run.
63w = linear_flow.Flow("root")
64
65# The mappers will run in parallel.
66store = {}
67provided = []
68mappers = unordered_flow.Flow('map')
69for i, chunk in enumerate(chunk_iter(CHUNK_SIZE, UPPER_BOUND)):
70 mapper_name = 'mapper_%s' % i
71 # Give that mapper some information to compute.
72 store[mapper_name] = chunk
73 # The reducer uses all of the outputs of the mappers, so it needs
74 # to be recorded that it needs access to them (under a specific name).
75 provided.append("reduction_%s" % i)
76 mappers.add(SumMapper(name=mapper_name,
77 rebind={'inputs': mapper_name},
78 provides=provided[-1]))
79w.add(mappers)
80
81# The reducer will run last (after all the mappers).
82w.add(TotalReducer('reducer', requires=provided))
83
84# Now go!
85e = engines.load(w, engine='parallel', store=store, max_workers=4)
86print("Running a parallel engine with options: %s" % e.options)
87e.run()
88
89# Now get the result the reducer created.
90total = e.storage.get('reducer')
91print("Calculated result = %s" % total)
92
93# Calculate it manually to verify that it worked...
94calc_total = sum(range(0, UPPER_BOUND))
95if calc_total != total:
96 sys.exit(1)
Storing & emitting a bill
Note
Full source located at fake_billing.
import json
import logging
import os
import sys
import time
5
6logging.basicConfig(level=logging.ERROR)
7
8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11sys.path.insert(0, top_dir)
12
13from oslo_utils import uuidutils
14
15from taskflow import engines
16from taskflow.listeners import printing
17from taskflow.patterns import graph_flow as gf
18from taskflow.patterns import linear_flow as lf
19from taskflow import task
20from taskflow.utils import misc
21
22# INTRO: This example walks through a miniature workflow which simulates
23# the reception of an API request, creation of a database entry, driver
24# activation (which invokes a 'fake' webservice) and final completion.
25#
26# This example also shows how a function/object (in this case the url sending)
27# that occurs during driver activation can update the progress of a task
28# without being aware of the internals of how to do this by associating a
29# callback that the url sending can update as the sending progresses from 0.0%
30# complete to 100% complete.
31
32
33class DB:
34 def query(self, sql):
35 print("Querying with: %s" % (sql))
36
37
38class UrlCaller:
39 def __init__(self):
40 self._send_time = 0.5
41 self._chunks = 25
42
43 def send(self, url, data, status_cb=None):
44 sleep_time = float(self._send_time) / self._chunks
45 for i in range(0, len(data)):
46 time.sleep(sleep_time)
47 # As we send the data, each chunk we 'fake' send will progress
48 # the sending progress that much further to 100%.
49 if status_cb:
50 status_cb(float(i) / len(data))
51
52
53# Since engines save the output of tasks to a optional persistent storage
54# backend resources have to be dealt with in a slightly different manner since
55# resources are transient and can *not* be persisted (or serialized). For tasks
56# that require access to a set of resources it is a common pattern to provide
57# a object (in this case this object) on construction of those tasks via the
58# task constructor.
59class ResourceFetcher:
60 def __init__(self):
61 self._db_handle = None
62 self._url_handle = None
63
64 @property
65 def db_handle(self):
66 if self._db_handle is None:
67 self._db_handle = DB()
68 return self._db_handle
69
70 @property
71 def url_handle(self):
72 if self._url_handle is None:
73 self._url_handle = UrlCaller()
74 return self._url_handle
75
76
77class ExtractInputRequest(task.Task):
78 def __init__(self, resources):
79 super().__init__(provides="parsed_request")
80 self._resources = resources
81
82 def execute(self, request):
83 return {
84 'user': request.user,
85 'user_id': misc.as_int(request.id),
86 'request_id': uuidutils.generate_uuid(),
87 }
88
89
90class MakeDBEntry(task.Task):
91 def __init__(self, resources):
92 super().__init__()
93 self._resources = resources
94
95 def execute(self, parsed_request):
96 db_handle = self._resources.db_handle
97 db_handle.query("INSERT %s INTO mydb" % (parsed_request))
98
99 def revert(self, result, parsed_request):
100 db_handle = self._resources.db_handle
101 db_handle.query("DELETE %s FROM mydb IF EXISTS" % (parsed_request))
102
103
104class ActivateDriver(task.Task):
105 def __init__(self, resources):
106 super().__init__(provides='sent_to')
107 self._resources = resources
108 self._url = "http://blahblah.com"
109
110 def execute(self, parsed_request):
111 print("Sending billing data to %s" % (self._url))
112 url_sender = self._resources.url_handle
113 # Note that here we attach our update_progress function (which is a
114 # function that the engine also 'binds' to) to the progress function
115 # that the url sending helper class uses. This allows the task progress
116 # to be tied to the url sending progress, which is very useful for
117 # downstream systems to be aware of what a task is doing at any time.
118 url_sender.send(self._url, json.dumps(parsed_request),
119 status_cb=self.update_progress)
120 return self._url
121
122 def update_progress(self, progress, **kwargs):
123 # Override the parent method to also print out the status.
124 super().update_progress(progress, **kwargs)
125 print("{} is {:0.2f}% done".format(self.name, progress * 100))
126
127
128class DeclareSuccess(task.Task):
129 def execute(self, sent_to):
130 print("Done!")
131 print("All data processed and sent to %s" % (sent_to))
132
133
134class DummyUser:
135 def __init__(self, user, id_):
136 self.user = user
137 self.id = id_
138
139
140# Resources (db handles and similar) of course can *not* be persisted so we
141# need to make sure that we pass this resource fetcher to the tasks constructor
142# so that the tasks have access to any needed resources (the resources are
143# lazily loaded so that they are only created when they are used).
144resources = ResourceFetcher()
145flow = lf.Flow("initialize-me")
146
147# 1. First we extract the api request into a usable format.
148# 2. Then we go ahead and make a database entry for our request.
149flow.add(ExtractInputRequest(resources), MakeDBEntry(resources))
150
151# 3. Then we activate our payment method and finally declare success.
152sub_flow = gf.Flow("after-initialize")
153sub_flow.add(ActivateDriver(resources), DeclareSuccess())
154flow.add(sub_flow)
155
156# Initially populate the storage with the following request object,
157# prepopulating this allows the tasks that dependent on the 'request' variable
158# to start processing (in this case this is the ExtractInputRequest task).
159store = {
160 'request': DummyUser(user="bob", id_="1.35"),
161}
162eng = engines.load(flow, engine='serial', store=store)
163
164# This context manager automatically adds (and automatically removes) a
165# helpful set of state transition notification printing helper utilities
166# that show you exactly what transitions the engine is going through
167# while running the various billing related tasks.
168with printing.PrintingListener(eng):
169 eng.run()
Suspending a workflow & resuming
Note
Full source located at resume_from_backend.
import contextlib
import logging
import os
import sys
4
5logging.basicConfig(level=logging.ERROR)
6
7self_dir = os.path.abspath(os.path.dirname(__file__))
8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11sys.path.insert(0, top_dir)
12sys.path.insert(0, self_dir)
13
14from oslo_utils import uuidutils
15
16import taskflow.engines
17from taskflow.patterns import linear_flow as lf
18from taskflow.persistence import models
19from taskflow import task
20
21import example_utils as eu # noqa
22
23# INTRO: In this example linear_flow is used to group three tasks, one which
24# will suspend the future work the engine may do. This suspend engine is then
25# discarded and the workflow is reloaded from the persisted data and then the
26# workflow is resumed from where it was suspended. This allows you to see how
27# to start an engine, have a task stop the engine from doing future work (if
28# a multi-threaded engine is being used, then the currently active work is not
29# preempted) and then resume the work later.
30#
31# Usage:
32#
33# With a filesystem directory as backend
34#
35# python taskflow/examples/resume_from_backend.py
36#
37# With ZooKeeper as backend
38#
39# python taskflow/examples/resume_from_backend.py \
40# zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/
41
42
43# UTILITY FUNCTIONS #########################################
44
45
46def print_task_states(flowdetail, msg):
47 eu.print_wrapped(msg)
48 print("Flow '{}' state: {}".format(flowdetail.name, flowdetail.state))
49 # Sort by these so that our test validation doesn't get confused by the
50 # order in which the items in the flow detail can be in.
51 items = sorted((td.name, td.version, td.state, td.results)
52 for td in flowdetail)
53 for item in items:
54 print(" %s==%s: %s, result=%s" % item)
55
56
57def find_flow_detail(backend, lb_id, fd_id):
58 conn = backend.get_connection()
59 lb = conn.get_logbook(lb_id)
60 return lb.find(fd_id)
61
62
63# CREATE FLOW ###############################################
64
65
66class InterruptTask(task.Task):
67 def execute(self):
68 # DO NOT TRY THIS AT HOME
69 engine.suspend()
70
71
72class TestTask(task.Task):
73 def execute(self):
74 print('executing %s' % self)
75 return 'ok'
76
77
78def flow_factory():
79 return lf.Flow('resume from backend example').add(
80 TestTask(name='first'),
81 InterruptTask(name='boom'),
82 TestTask(name='second'))
83
84
85# INITIALIZE PERSISTENCE ####################################
86
87with eu.get_backend() as backend:
88
89 # Create a place where the persistence information will be stored.
90 book = models.LogBook("example")
91 flow_detail = models.FlowDetail("resume from backend example",
92 uuid=uuidutils.generate_uuid())
93 book.add(flow_detail)
94 with contextlib.closing(backend.get_connection()) as conn:
95 conn.save_logbook(book)
96
97 # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################
98
99 flow = flow_factory()
100 engine = taskflow.engines.load(flow, flow_detail=flow_detail,
101 book=book, backend=backend)
102
103 print_task_states(flow_detail, "At the beginning, there is no state")
104 eu.print_wrapped("Running")
105 engine.run()
106 print_task_states(flow_detail, "After running")
107
108 # RE-CREATE, RESUME, RUN ####################################
109
110 eu.print_wrapped("Resuming and running again")
111
112 # NOTE(harlowja): reload the flow detail from the backend; this will allow
113 # us to resume the flow from its suspended state, but first we need to
114 # search for the right flow detail in the correct logbook where things are
115 # stored.
116 #
117 # We could avoid re-loading the engine and just call engine.run() again, but
118 # this example shows how another process may resume a given flow and
119 # start it again for situations where this is useful to do (say the process
120 # running the above flow crashes).
121 flow2 = flow_factory()
122 flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
123 engine2 = taskflow.engines.load(flow2,
124 flow_detail=flow_detail_2,
125 backend=backend, book=book)
126 engine2.run()
127 print_task_states(flow_detail_2, "At the end")
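The InterruptTask above reaches for a module-level engine to suspend it from inside a task (hence the "do not try this at home" comment). A minimal standalone sketch of the same mechanism, assuming engine.suspend() may also be called from another thread while run() is in progress, looks roughly like this; whatever is currently executing finishes first and the remaining work is skipped:

import threading
import time

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Sleepy(task.Task):
    def execute(self):
        print("working in %s" % self.name)
        time.sleep(0.5)


flow = lf.Flow('suspendable').add(Sleepy('a'), Sleepy('b'), Sleepy('c'))
engine = engines.load(flow, engine='serial')
engine.compile()
engine.prepare()

# Request suspension shortly after the run starts; the engine stops
# scheduling new atoms once the currently running one completes.
threading.Timer(0.25, engine.suspend).start()
engine.run()
print("Run returned (the flow may now be suspended rather than finished)")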
Creating a virtual machine (resumable)¶
Note
Full source (located at resume_vm_boot)
1import contextlib, hashlib
2import logging
3import os
4import random
5import sys
6import time
7
8logging.basicConfig(level=logging.ERROR)
9
10self_dir = os.path.abspath(os.path.dirname(__file__))
11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12 os.pardir,
13 os.pardir))
14sys.path.insert(0, top_dir)
15sys.path.insert(0, self_dir)
16
17import futurist
18from oslo_utils import uuidutils
19
20from taskflow import engines
21from taskflow import exceptions as exc
22from taskflow.patterns import graph_flow as gf
23from taskflow.patterns import linear_flow as lf
24from taskflow.persistence import models
25from taskflow import task
26
27import example_utils as eu # noqa
28
29# INTRO: These examples show how a hierarchy of flows can be used to create a
30# vm in a reliable & resumable manner using taskflow + a miniature version of
31# what nova does while booting a vm.
32
33
34@contextlib.contextmanager
35def slow_down(how_long=0.5):
36 try:
37 yield how_long
38 finally:
39 if len(sys.argv) > 1:
40 # Only bother to do this if user input was provided.
41 print("** Ctrl-c me please!!! **")
42 time.sleep(how_long)
43
44
45class PrintText(task.Task):
46 """Just inserts some text print outs in a workflow."""
47 def __init__(self, print_what, no_slow=False):
48 content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
49 super().__init__(name="Print: %s" % (content_hash))
50 self._text = print_what
51 self._no_slow = no_slow
52
53 def execute(self):
54 if self._no_slow:
55 eu.print_wrapped(self._text)
56 else:
57 with slow_down():
58 eu.print_wrapped(self._text)
59
60
61class DefineVMSpec(task.Task):
62 """Defines a vm specification to be."""
63 def __init__(self, name):
64 super().__init__(provides='vm_spec', name=name)
65
66 def execute(self):
67 return {
68 'type': 'kvm',
69 'disks': 2,
70 'vcpu': 1,
71 'ips': 1,
72 'volumes': 3,
73 }
74
75
76class LocateImages(task.Task):
77 """Locates where the vm images are."""
78 def __init__(self, name):
79 super().__init__(provides='image_locations', name=name)
80
81 def execute(self, vm_spec):
82 image_locations = {}
83 for i in range(0, vm_spec['disks']):
84 url = "http://www.yahoo.com/images/%s" % (i)
85 image_locations[url] = "/tmp/%s.img" % (i)
86 return image_locations
87
88
89class DownloadImages(task.Task):
90 """Downloads all the vm images."""
91 def __init__(self, name):
92 super().__init__(provides='download_paths',
93 name=name)
94
95 def execute(self, image_locations):
96 for src, loc in image_locations.items():
97 with slow_down(1):
98 print("Downloading from {} => {}".format(src, loc))
99 return sorted(image_locations.values())
100
101
102class CreateNetworkTpl(task.Task):
103 """Generates the network settings file to be placed in the images."""
104 SYSCONFIG_CONTENTS = """DEVICE=eth%s
105BOOTPROTO=static
106IPADDR=%s
107ONBOOT=yes"""
108
109 def __init__(self, name):
110 super().__init__(provides='network_settings',
111 name=name)
112
113 def execute(self, ips):
114 settings = []
115 for i, ip in enumerate(ips):
116 settings.append(self.SYSCONFIG_CONTENTS % (i, ip))
117 return settings
118
119
120class AllocateIP(task.Task):
121 """Allocates the ips for the given vm."""
122 def __init__(self, name):
123 super().__init__(provides='ips', name=name)
124
125 def execute(self, vm_spec):
126 ips = []
127 for _i in range(0, vm_spec.get('ips', 0)):
128 ips.append("192.168.0.%s" % (random.randint(1, 254)))
129 return ips
130
131
132class WriteNetworkSettings(task.Task):
133 """Writes all the network settings into the downloaded images."""
134 def execute(self, download_paths, network_settings):
135 for j, path in enumerate(download_paths):
136 with slow_down(1):
137 print("Mounting {} to /tmp/{}".format(path, j))
138 for i, setting in enumerate(network_settings):
139 filename = ("/tmp/etc/sysconfig/network-scripts/"
140 "ifcfg-eth%s" % (i))
141 with slow_down(1):
142 print("Writing to %s" % (filename))
143 print(setting)
144
145
146class BootVM(task.Task):
147 """Fires off the vm boot operation."""
148 def execute(self, vm_spec):
149 print("Starting vm!")
150 with slow_down(1):
151 print("Created: %s" % (vm_spec))
152
153
154class AllocateVolumes(task.Task):
155 """Allocates the volumes for the vm."""
156 def execute(self, vm_spec):
157 volumes = []
158 for i in range(0, vm_spec['volumes']):
159 with slow_down(1):
160 volumes.append("/dev/vda%s" % (i + 1))
161 print("Allocated volume %s" % volumes[-1])
162 return volumes
163
164
165class FormatVolumes(task.Task):
166 """Formats the volumes for the vm."""
167 def execute(self, volumes):
168 for v in volumes:
169 print("Formatting volume %s" % v)
170 with slow_down(1):
171 pass
172 print("Formatted volume %s" % v)
173
174
175def create_flow():
176 # Setup the set of things to do (mini-nova).
177 flow = lf.Flow("root").add(
178 PrintText("Starting vm creation.", no_slow=True),
179 lf.Flow('vm-maker').add(
180 # First create a specification for the final vm to-be.
181 DefineVMSpec("define_spec"),
182 # This does all the image stuff.
183 gf.Flow("img-maker").add(
184 LocateImages("locate_images"),
185 DownloadImages("download_images"),
186 ),
187 # This does all the network stuff.
188 gf.Flow("net-maker").add(
189 AllocateIP("get_my_ips"),
190 CreateNetworkTpl("fetch_net_settings"),
191 WriteNetworkSettings("write_net_settings"),
192 ),
193 # This does all the volume stuff.
194 gf.Flow("volume-maker").add(
195 AllocateVolumes("allocate_my_volumes", provides='volumes'),
196 FormatVolumes("volume_formatter"),
197 ),
198 # Finally boot it all.
199 BootVM("boot-it"),
200 ),
201 # Ya it worked!
202 PrintText("Finished vm create.", no_slow=True),
203 PrintText("Instance is running!", no_slow=True))
204 return flow
205
206eu.print_wrapped("Initializing")
207
208# Setup the persistence & resumption layer.
209with eu.get_backend() as backend:
210
211 # Try to find a previously passed in tracking id...
212 try:
213 book_id, flow_id = sys.argv[2].split("+", 1)
214 if not uuidutils.is_uuid_like(book_id):
215 book_id = None
216 if not uuidutils.is_uuid_like(flow_id):
217 flow_id = None
218 except (IndexError, ValueError):
219 book_id = None
220 flow_id = None
221
222 # Set up how we want our engine to run, serial, parallel...
223 try:
224 executor = futurist.GreenThreadPoolExecutor(max_workers=5)
225 except RuntimeError:
226 # No eventlet installed, just let the default be used instead.
227 executor = None
228
229 # Create/fetch a logbook that will track the workflow's work.
230 book = None
231 flow_detail = None
232 if all([book_id, flow_id]):
233 # Try to find in a prior logbook and flow detail...
234 with contextlib.closing(backend.get_connection()) as conn:
235 try:
236 book = conn.get_logbook(book_id)
237 flow_detail = book.find(flow_id)
238 except exc.NotFound:
239 pass
240 if book is None and flow_detail is None:
241 book = models.LogBook("vm-boot")
242 with contextlib.closing(backend.get_connection()) as conn:
243 conn.save_logbook(book)
244 engine = engines.load_from_factory(create_flow,
245 backend=backend, book=book,
246 engine='parallel',
247 executor=executor)
248 print("!! Your tracking id is: '{}+{}'".format(
249 book.uuid, engine.storage.flow_uuid))
250 print("!! Please submit this on later runs for tracking purposes")
251 else:
252 # Attempt to load from a previously partially completed flow.
253 engine = engines.load_from_detail(flow_detail, backend=backend,
254 engine='parallel', executor=executor)
255
256 # Make me my vm please!
257 eu.print_wrapped('Running')
258 engine.run()
259
260# How to use.
261#
262# 1. $ python me.py "sqlite:////tmp/nova.db"
263# 2. ctrl-c before this finishes
264# 3. Find the tracking id (search for 'Your tracking id is')
265# 4. $ python me.py "sqlite:////tmp/nova.db" "$tracking_id"
266# 5. Watch it pick up where it left off.
267# 6. Profit!
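The tracking id printed above is simply the logbook uuid and the flow detail uuid joined with a '+'. A small illustrative helper (not part of the example; the function name is ours) that parses and validates it could look like this:

from oslo_utils import uuidutils


def parse_tracking_id(raw):
    # Split 'book_uuid+flow_uuid' and validate both halves; anything that
    # does not look like a uuid is treated as missing.
    try:
        book_id, flow_id = raw.split("+", 1)
    except (AttributeError, ValueError):
        return None, None
    if not uuidutils.is_uuid_like(book_id):
        book_id = None
    if not uuidutils.is_uuid_like(flow_id):
        flow_id = None
    return book_id, flow_id


print(parse_tracking_id("not-a-valid+tracking-id"))  # -> (None, None)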
Creating a volume (resumable)¶
Note
Full source (located at resume_volume_create)
1import contextlib, hashlib
2import logging
3import os
4import random
5import sys
6import time
7
8logging.basicConfig(level=logging.ERROR)
9
10self_dir = os.path.abspath(os.path.dirname(__file__))
11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12 os.pardir,
13 os.pardir))
14sys.path.insert(0, top_dir)
15sys.path.insert(0, self_dir)
16
17from oslo_utils import uuidutils
18
19from taskflow import engines
20from taskflow.patterns import graph_flow as gf
21from taskflow.patterns import linear_flow as lf
22from taskflow.persistence import models
23from taskflow import task
24
25import example_utils # noqa
26
27# INTRO: These examples show how a hierarchy of flows can be used to create a
28# pseudo-volume in a reliable & resumable manner using taskflow + a miniature
29# version of what cinder does while creating a volume (very miniature).
30
31
32@contextlib.contextmanager
33def slow_down(how_long=0.5):
34 try:
35 yield how_long
36 finally:
37 print("** Ctrl-c me please!!! **")
38 time.sleep(how_long)
39
40
41def find_flow_detail(backend, book_id, flow_id):
42 # NOTE(harlowja): this is used to attempt to find a given logbook with
43 # a given id and a given flow detail inside that logbook; we need this
44 # reference so that we can resume the correct flow (as a logbook tracks
45 # flows and a flow detail tracks an individual flow).
46 #
47 # Without a reference to the logbook and the flow detail in that logbook
48 # we will not know exactly what we should resume, and that would mean we
49 # can't resume what we don't know about.
50 with contextlib.closing(backend.get_connection()) as conn:
51 lb = conn.get_logbook(book_id)
52 return lb.find(flow_id)
53
54
55class PrintText(task.Task):
56 def __init__(self, print_what, no_slow=False):
57 content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
58 super().__init__(name="Print: %s" % (content_hash))
59 self._text = print_what
60 self._no_slow = no_slow
61
62 def execute(self):
63 if self._no_slow:
64 print("-" * (len(self._text)))
65 print(self._text)
66 print("-" * (len(self._text)))
67 else:
68 with slow_down():
69 print("-" * (len(self._text)))
70 print(self._text)
71 print("-" * (len(self._text)))
72
73
74class CreateSpecForVolumes(task.Task):
75 def execute(self):
76 volumes = []
77 for i in range(0, random.randint(1, 10)):
78 volumes.append({
79 'type': 'disk',
80 'location': "/dev/vda%s" % (i + 1),
81 })
82 return volumes
83
84
85class PrepareVolumes(task.Task):
86 def execute(self, volume_specs):
87 for v in volume_specs:
88 with slow_down():
89 print("Dusting off your hard drive %s" % (v))
90 with slow_down():
91 print("Taking a well deserved break.")
92 print("Your drive %s has been certified." % (v))
93
94
95# Setup the set of things to do (mini-cinder).
96flow = lf.Flow("root").add(
97 PrintText("Starting volume create", no_slow=True),
98 gf.Flow('maker').add(
99 CreateSpecForVolumes("volume_specs", provides='volume_specs'),
100 PrintText("I need a nap, it took me a while to build those specs."),
101 PrepareVolumes(),
102 ),
103 PrintText("Finished volume create", no_slow=True))
104
105# Setup the persistence & resumption layer.
106with example_utils.get_backend() as backend:
107 try:
108 book_id, flow_id = sys.argv[2].split("+", 1)
109 except (IndexError, ValueError):
110 book_id = None
111 flow_id = None
112
113 if not all([book_id, flow_id]):
114 # If no 'tracking id' (think a fedex or ups tracking id) is provided
115 # then we create one by creating a logbook (where flow details are
116 # stored) and creating a flow detail (where flow and task state is
117 # stored). The combination of these two objects' unique ids (uuids) allows
118 # the users of taskflow to reassociate the workflows that were
119 # potentially running (and which may have partially completed) back
120 # with taskflow so that those workflows can be resumed (or reverted)
121 # after a process/thread/engine has failed in some way.
122 book = models.LogBook('resume-volume-create')
123 flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
124 book.add(flow_detail)
125 with contextlib.closing(backend.get_connection()) as conn:
126 conn.save_logbook(book)
127 print("!! Your tracking id is: '{}+{}'".format(book.uuid,
128 flow_detail.uuid))
129 print("!! Please submit this on later runs for tracking purposes")
130 else:
131 flow_detail = find_flow_detail(backend, book_id, flow_id)
132
133 # Load and run.
134 engine = engines.load(flow,
135 flow_detail=flow_detail,
136 backend=backend, engine='serial')
137 engine.run()
138
139# How to use.
140#
141# 1. $ python me.py "sqlite:////tmp/cinder.db"
142# 2. ctrl-c before this finishes
143# 3. Find the tracking id (search for 'Your tracking id is')
144# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
145# 5. Profit!
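Note how the 'maker' graph flow above never wires CreateSpecForVolumes to PrepareVolumes explicitly: the first provides 'volume_specs' and the second requires an execute() argument of the same name, so graph_flow orders them automatically. A minimal sketch of that inference on its own (illustrative names) is:

from taskflow import engines
from taskflow.patterns import graph_flow as gf
from taskflow import task


class Provider(task.Task):
    default_provides = 'thing'

    def execute(self):
        return 'a-thing'


class Consumer(task.Task):
    def execute(self, thing):
        print("consumed %s" % thing)


# Added in the "wrong" order on purpose; the graph flow still runs the
# provider before the consumer based on the shared 'thing' symbol.
flow = gf.Flow('inferred').add(Consumer('use-it'), Provider('make-it'))
engines.load(flow, engine='serial').run()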
Running engines via iteration¶
Note
Full source (located at run_by_iter)
1import logging, os
2import sys
3
4logging.basicConfig(level=logging.ERROR)
5
6self_dir = os.path.abspath(os.path.dirname(__file__))
7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13
14from taskflow import engines
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17
18
19# INTRO: This example shows how to run a set of engines at the same time,
20# each in a different flow, using a single thread of control to iterate over
21# each engine (which causes that engine to advance to its next state during
22# each iteration).
23
24
25class EchoTask(task.Task):
26 def execute(self, value):
27 print(value)
28 return chr(ord(value) + 1)
29
30
31def make_alphabet_flow(i):
32 f = lf.Flow("alphabet_%s" % (i))
33 start_value = 'A'
34 end_value = 'Z'
35 curr_value = start_value
36 while ord(curr_value) <= ord(end_value):
37 next_value = chr(ord(curr_value) + 1)
38 if curr_value != end_value:
39 f.add(EchoTask(name="echoer_%s" % curr_value,
40 rebind={'value': curr_value},
41 provides=next_value))
42 else:
43 f.add(EchoTask(name="echoer_%s" % curr_value,
44 rebind={'value': curr_value}))
45 curr_value = next_value
46 return f
47
48
49# Adjust this number to change how many engines/flows run at once.
50flow_count = 1
51flows = []
52for i in range(0, flow_count):
53 f = make_alphabet_flow(i + 1)
54 flows.append(f)
55engine_iters = []
56for f in flows:
57 e = engines.load(f)
58 e.compile()
59 e.storage.inject({'A': 'A'})
60 e.prepare()
61 engine_iters.append(e.run_iter())
62while engine_iters:
63 for it in list(engine_iters):
64 try:
65 print(next(it))
66 except StopIteration:
67 engine_iters.remove(it)
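A stripped-down sketch with a single engine (illustrative task names) may make the pattern clearer: run_iter() returns a generator that yields the engine's intermediate states, so the caller decides when (and whether) to take the next step:

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Echo(task.Task):
    def execute(self):
        print("ran %s" % self.name)


flow = lf.Flow('steps').add(Echo('one'), Echo('two'), Echo('three'))
engine = engines.load(flow, engine='serial')
engine.compile()
engine.prepare()
for state in engine.run_iter():
    # Each iteration advances the engine and yields what it is doing.
    print("engine yielded: %s" % (state,))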
Controlling retries using a retry controller¶
Note
Full source (located at retry_flow)
1import logging, os
2import sys
3
4logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import retry
14from taskflow import task
15
16# INTRO: In this example we create a retry controller that receives a phone
17# directory and tries different phone numbers. The next task tries to call Jim
18# using the given number. If it is not Jim's number, the task raises an
19# exception and the retry controller takes the next number from the phone
20# directory and retries the call.
21#
22# This example shows a basic usage of retry controllers in a flow.
23# Retry controllers allow a failed subflow to be reverted and retried with
24# new parameters.
25
26
27class CallJim(task.Task):
28 def execute(self, jim_number):
29 print("Calling jim %s." % jim_number)
30 if jim_number != 555:
31 raise Exception("Wrong number!")
32 else:
33 print("Hello Jim!")
34
35 def revert(self, jim_number, **kwargs):
36 print("Wrong number, apologizing.")
37
38
39# Create your flow and associated tasks (the work to be done).
40flow = lf.Flow('retrying-linear',
41 retry=retry.ParameterizedForEach(
42 rebind=['phone_directory'],
43 provides='jim_number')).add(CallJim())
44
45# Now run that flow using the provided initial data (store below).
46taskflow.engines.run(flow, store={'phone_directory': [333, 444, 555, 666]})
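A related sketch (not from the example above) using the Times retry controller instead, which we assume accepts an attempts count: rather than iterating over a phone directory, it simply re-runs the failing subflow a bounded number of times:

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import retry
from taskflow import task


class Flaky(task.Task):
    _calls = []  # shared across attempts of this single task instance

    def execute(self):
        self._calls.append(1)
        if len(self._calls) < 3:
            raise RuntimeError("not yet...")
        print("succeeded on attempt %s" % len(self._calls))


flow = lf.Flow('retrying-times',
               retry=retry.Times(attempts=3)).add(Flaky())
taskflow.engines.run(flow)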
Distributed execution (simple)¶
Note
Full source (located at wbe_simple_linear)
1import json, logging
2import os
3import sys
4import tempfile
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.engines.worker_based import worker
13from taskflow.patterns import linear_flow as lf
14from taskflow.tests import utils
15from taskflow.utils import threading_utils
16
17import example_utils # noqa
18
19# INTRO: This example walks through a miniature workflow which shows how to
20# start up a number of workers (these workers will process task execution and
21# reversion requests using any provided input data) and then create a flow of
22# tasks that those workers are *capable* of running (the engine cannot run
23# tasks that the workers are unable to execute; that would end in failure),
24# after which the engine executes that workflow seamlessly, using the workers
25# to perform the actual execution.
26#
27# NOTE(harlowja): this example simulates the expected larger number of workers
28# by using a set of threads (which in this example simulate the remote workers
29# that would typically be running on other external machines).
30
31# A filesystem can also be used as the queue transport (useful as simple
32# transport type that does not involve setting up a larger mq system). If this
33# is false then the memory transport is used instead, both work in standalone
34# setups.
35USE_FILESYSTEM = False
36BASE_SHARED_CONF = {
37 'exchange': 'taskflow',
38}
39
40# Until https://github.com/celery/kombu/issues/398 is resolved it is not
41# recommended to run many worker threads in this example due to the types
42# of errors mentioned in that issue.
43MEMORY_WORKERS = 2
44FILE_WORKERS = 1
45WORKER_CONF = {
46 # These are the tasks the worker can execute, they *must* be importable,
47 # typically this list is used to restrict what workers may execute to
48 # a smaller set of *allowed* tasks that are known to be safe (one would
49 # not want to allow all python code to be executed).
50 'tasks': [
51 'taskflow.tests.utils:TaskOneArgOneReturn',
52 'taskflow.tests.utils:TaskMultiArgOneReturn'
53 ],
54}
55
56
57def run(engine_options):
58 flow = lf.Flow('simple-linear').add(
59 utils.TaskOneArgOneReturn(provides='result1'),
60 utils.TaskMultiArgOneReturn(provides='result2')
61 )
62 eng = engines.load(flow,
63 store=dict(x=111, y=222, z=333),
64 engine='worker-based', **engine_options)
65 eng.run()
66 return eng.storage.fetch_all()
67
68
69if __name__ == "__main__":
70 logging.basicConfig(level=logging.ERROR)
71
72 # Setup our transport configuration and merge it into the worker and
73 # engine configuration so that both of those use it correctly.
74 shared_conf = dict(BASE_SHARED_CONF)
75
76 tmp_path = None
77 if USE_FILESYSTEM:
78 worker_count = FILE_WORKERS
79 tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
80 shared_conf.update({
81 'transport': 'filesystem',
82 'transport_options': {
83 'data_folder_in': tmp_path,
84 'data_folder_out': tmp_path,
85 'polling_interval': 0.1,
86 },
87 })
88 else:
89 worker_count = MEMORY_WORKERS
90 shared_conf.update({
91 'transport': 'memory',
92 'transport_options': {
93 'polling_interval': 0.1,
94 },
95 })
96 worker_conf = dict(WORKER_CONF)
97 worker_conf.update(shared_conf)
98 engine_options = dict(shared_conf)
99 workers = []
100 worker_topics = []
101
102 try:
103 # Create a set of workers to simulate actual remote workers.
104 print('Running %s workers.' % (worker_count))
105 for i in range(0, worker_count):
106 worker_conf['topic'] = 'worker-%s' % (i + 1)
107 worker_topics.append(worker_conf['topic'])
108 w = worker.Worker(**worker_conf)
109 runner = threading_utils.daemon_thread(w.run)
110 runner.start()
111 w.wait()
112 workers.append((runner, w.stop))
113
114 # Now use those workers to do something.
115 print('Executing some work.')
116 engine_options['topics'] = worker_topics
117 result = run(engine_options)
118 print('Execution finished.')
119 # This is done so that the test examples can work correctly
120 # even when the keys change order (which will happen in various
121 # python versions).
122 print("Result = %s" % json.dumps(result, sort_keys=True))
123 finally:
124 # And cleanup.
125 print('Stopping workers.')
126 while workers:
127 r, stopper = workers.pop()
128 stopper()
129 r.join()
130 if tmp_path:
131 example_utils.rm_path(tmp_path)
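The 'tasks' whitelist above uses the '<importable module>:<class name>' convention; a worker can only execute tasks that resolve through that list. A small sketch of wiring up your own task (the 'my_tasks' module name here is hypothetical):

from taskflow import task


class SayHello(task.Task):
    def execute(self):
        print("hello from a worker")


# If SayHello lived in a module importable as 'my_tasks', a worker would be
# allowed to run it by listing it like so (any task not listed is rejected):
WORKER_CONF = {
    'tasks': [
        'my_tasks:SayHello',
    ],
}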
Distributed notification (simple)¶
Note
Full source (located at wbe_event_sender)
1import logging, os
2import string
3import sys
4import time
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.engines.worker_based import worker
13from taskflow.patterns import linear_flow as lf
14from taskflow import task
15from taskflow.types import notifier
16from taskflow.utils import threading_utils
17
18ANY = notifier.Notifier.ANY
19
20# INTRO: These examples show how to use a remote worker's event notification
21# attribute to proxy back task event notifications to the controlling process.
22#
23# In this case a simple set of events is triggered by a worker running a
24# task (simulated to be remote by using a kombu memory transport and threads).
25# Those events that the 'remote worker' produces will then be proxied back to
26# the task that the engine is running 'remotely', and then they will be emitted
27# back to the original callbacks that exist in the originating engine
28# process/thread. This creates a one-way *notification* channel that can be
29# used transparently in-process or out-of-process (using remote workers), and
30# that allows tasks to signal to their controlling process some sort of
31# action that has occurred that the task may need to tell others about (for
32# example to trigger some type of response when the task reaches 50% done...).
33
34
35def event_receiver(event_type, details):
36 """This is the callback that (in this example) doesn't do much..."""
37 print("Recieved event '%s'" % event_type)
38 print("Details = %s" % details)
39
40
41class EventReporter(task.Task):
42 """This is the task that will be running 'remotely' (not really remote)."""
43
44 EVENTS = tuple(string.ascii_uppercase)
45 EVENT_DELAY = 0.1
46
47 def execute(self):
48 for i, e in enumerate(self.EVENTS):
49 details = {
50 'leftover': self.EVENTS[i:],
51 }
52 self.notifier.notify(e, details)
53 time.sleep(self.EVENT_DELAY)
54
55
56BASE_SHARED_CONF = {
57 'exchange': 'taskflow',
58 'transport': 'memory',
59 'transport_options': {
60 'polling_interval': 0.1,
61 },
62}
63
64# Until https://github.com/celery/kombu/issues/398 is resolved it is not
65# recommended to run many worker threads in this example due to the types
66# of errors mentioned in that issue.
67MEMORY_WORKERS = 1
68WORKER_CONF = {
69 'tasks': [
70 # Used to locate which tasks we can run (we don't want to allow
71 # arbitrary code/tasks to be run by any worker since that would
72 # open up a variety of vulnerabilities).
73 '%s:EventReporter' % (__name__),
74 ],
75}
76
77
78def run(engine_options):
79 reporter = EventReporter()
80 reporter.notifier.register(ANY, event_receiver)
81 flow = lf.Flow('event-reporter').add(reporter)
82 eng = engines.load(flow, engine='worker-based', **engine_options)
83 eng.run()
84
85
86if __name__ == "__main__":
87 logging.basicConfig(level=logging.ERROR)
88
89 # Setup our transport configuration and merge it into the worker and
90 # engine configuration so that both of those objects use it correctly.
91 worker_conf = dict(WORKER_CONF)
92 worker_conf.update(BASE_SHARED_CONF)
93 engine_options = dict(BASE_SHARED_CONF)
94 workers = []
95
96 # These topics will be used to request worker information on; those
97 # workers will respond with their capabilities, which the executing engine
98 # will use to match pending tasks to a capable worker. This will cause
99 # the task to be sent for execution, and the engine will wait until it
100 # is finished (a response is received); then the engine will either
101 # continue with other tasks, do some retry/failure resolution logic or
102 # stop (and potentially re-raise the remote worker's failure)...
103 worker_topics = []
104
105 try:
106 # Create a set of worker threads to simulate actual remote workers...
107 print('Running %s workers.' % (MEMORY_WORKERS))
108 for i in range(0, MEMORY_WORKERS):
109 # Give each one its own unique topic name so that they can
110 # correctly communicate with the engine (they will all share the
111 # same exchange).
112 worker_conf['topic'] = 'worker-%s' % (i + 1)
113 worker_topics.append(worker_conf['topic'])
114 w = worker.Worker(**worker_conf)
115 runner = threading_utils.daemon_thread(w.run)
116 runner.start()
117 w.wait()
118 workers.append((runner, w.stop))
119
120 # Now use those workers to do something.
121 print('Executing some work.')
122 engine_options['topics'] = worker_topics
123 result = run(engine_options)
124 print('Execution finished.')
125 finally:
126 # And cleanup.
127 print('Stopping workers.')
128 while workers:
129 r, stopper = workers.pop()
130 stopper()
131 r.join()
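The same task notifier also works without any workers at all, and callbacks can be registered for a specific event type instead of ANY; a minimal in-process sketch (illustrative names, run with the serial engine) is:

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Pinger(task.Task):
    def execute(self):
        # Emit a custom event; only callbacks registered for 'ping'
        # (or for ANY) will receive it.
        self.notifier.notify('ping', {'source': self.name})


def on_ping(event_type, details):
    print("Got '%s' with details %s" % (event_type, details))


pinger = Pinger()
pinger.notifier.register('ping', on_ping)
engines.load(lf.Flow('ping-flow').add(pinger), engine='serial').run()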
Distributed mandelbrot (complex)¶
Note
Full source (located at wbe_mandelbrot)
Output¶
Code¶
1import logging, math
2import os
3import sys
4
5top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
6 os.pardir,
7 os.pardir))
8sys.path.insert(0, top_dir)
9
10from taskflow import engines
11from taskflow.engines.worker_based import worker
12from taskflow.patterns import unordered_flow as uf
13from taskflow import task
14from taskflow.utils import threading_utils
15
16# INTRO: This example walks through a workflow that will in parallel compute
17# a mandelbrot result set (using X 'remote' workers) and then combine their
18# results together to form a final mandelbrot fractal image. It shows a usage
19# of taskflow to perform a well-known embarrassingly parallel problem that has
20# the added benefit of also being an elegant visualization.
21#
22# NOTE(harlowja): this example simulates the expected larger number of workers
23# by using a set of threads (which in this example simulate the remote workers
24# that would typically be running on other external machines).
25#
26# NOTE(harlowja): to have it produce an image run (after installing pillow):
27#
28# $ python taskflow/examples/wbe_mandelbrot.py output.png
29
30BASE_SHARED_CONF = {
31 'exchange': 'taskflow',
32}
33WORKERS = 2
34WORKER_CONF = {
35 # These are the tasks the worker can execute, they *must* be importable,
36 # typically this list is used to restrict what workers may execute to
37 # a smaller set of *allowed* tasks that are known to be safe (one would
38 # not want to allow all python code to be executed).
39 'tasks': [
40 '%s:MandelCalculator' % (__name__),
41 ],
42}
43ENGINE_CONF = {
44 'engine': 'worker-based',
45}
46
47# Mandelbrot & image settings...
48IMAGE_SIZE = (512, 512)
49CHUNK_COUNT = 8
50MAX_ITERATIONS = 25
51
52
53class MandelCalculator(task.Task):
54 def execute(self, image_config, mandelbrot_config, chunk):
55 """Returns the number of iterations before the computation "escapes".
56
57 Given the real and imaginary parts of a complex number, determine if it
58 is a candidate for membership in the mandelbrot set given a fixed
59 number of iterations.
60 """
61
62 # Parts borrowed from (credit to mark harris and benoît mandelbrot).
63 #
64 # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
65 def mandelbrot(x, y, max_iters):
66 c = complex(x, y)
67 z = 0.0j
68 for i in range(max_iters):
69 z = z * z + c
70 if (z.real * z.real + z.imag * z.imag) >= 4:
71 return i
72 return max_iters
73
74 min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
75 height, width = image_config['size']
76 pixel_size_x = (max_x - min_x) / width
77 pixel_size_y = (max_y - min_y) / height
78 block = []
79 for y in range(chunk[0], chunk[1]):
80 row = []
81 imag = min_y + y * pixel_size_y
82 for x in range(0, width):
83 real = min_x + x * pixel_size_x
84 row.append(mandelbrot(real, imag, max_iters))
85 block.append(row)
86 return block
87
88
89def calculate(engine_conf):
90 # Subdivide the work into X pieces, then request each worker to calculate
91 # one of those chunks and then later we will write these chunks out to
92 # an image bitmap file.
93
94 # An unordered flow is used here since the mandelbrot calculation is an
95 # example of an embarrassingly parallel computation that we can scatter
96 # across as many workers as possible.
97 flow = uf.Flow("mandelbrot")
98
99 # These symbols will be automatically given to tasks as input to their
100 # execute method, in this case these are constants used in the mandelbrot
101 # calculation.
102 store = {
103 'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
104 'image_config': {
105 'size': IMAGE_SIZE,
106 }
107 }
108
109 # We need the task names to be in the right order so that we can extract
110 # the final results in the right order (we don't care about the order when
111 # executing).
112 task_names = []
113
114 # Compose our workflow.
115 height, _width = IMAGE_SIZE
116 chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
117 for i in range(0, CHUNK_COUNT):
118 chunk_name = 'chunk_%s' % i
119 task_name = "calculation_%s" % i
120 # Break the calculation up into chunk size pieces.
121 rows = [i * chunk_size, i * chunk_size + chunk_size]
122 flow.add(
123 MandelCalculator(task_name,
124 # This ensures the storage symbol with name
125 # 'chunk_name' is sent into the task's local
126 # symbol 'chunk'. This is how we give each
127 # calculator its own correct sequence of rows
128 # to work on.
129 rebind={'chunk': chunk_name}))
130 store[chunk_name] = rows
131 task_names.append(task_name)
132
133 # Now execute it.
134 eng = engines.load(flow, store=store, engine_conf=engine_conf)
135 eng.run()
136
137 # Gather all the results and order them for further processing.
138 gather = []
139 for name in task_names:
140 gather.extend(eng.storage.get(name))
141 points = []
142 for y, row in enumerate(gather):
143 for x, color in enumerate(row):
144 points.append(((x, y), color))
145 return points
146
147
148def write_image(results, output_filename=None):
149 print("Gathered %s results that represents a mandelbrot"
150 " image (using %s chunks that are computed jointly"
151 " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
152 if not output_filename:
153 return
154
155 # Pillow (the PIL fork) saves us from writing our own image writer...
156 try:
157 from PIL import Image
158 except ImportError as e:
159 # To currently get this (may change in the future),
160 # $ pip install Pillow
161 raise RuntimeError("Pillow is required to write image files: %s" % e)
162
163 # Limit to 255, find the max and normalize to that...
164 color_max = 0
165 for _point, color in results:
166 color_max = max(color, color_max)
167
168 # Use gray scale since we don't really have other colors.
169 img = Image.new('L', IMAGE_SIZE, "black")
170 pixels = img.load()
171 for (x, y), color in results:
172 if color_max == 0:
173 color = 0
174 else:
175 color = int((float(color) / color_max) * 255.0)
176 pixels[x, y] = color
177 img.save(output_filename)
178
179
180def create_fractal():
181 logging.basicConfig(level=logging.ERROR)
182
183 # Setup our transport configuration and merge it into the worker and
184 # engine configuration so that both of those use it correctly.
185 shared_conf = dict(BASE_SHARED_CONF)
186 shared_conf.update({
187 'transport': 'memory',
188 'transport_options': {
189 'polling_interval': 0.1,
190 },
191 })
192
193 if len(sys.argv) >= 2:
194 output_filename = sys.argv[1]
195 else:
196 output_filename = None
197
198 worker_conf = dict(WORKER_CONF)
199 worker_conf.update(shared_conf)
200 engine_conf = dict(ENGINE_CONF)
201 engine_conf.update(shared_conf)
202 workers = []
203 worker_topics = []
204
205 print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
206 try:
207 # Create a set of workers to simulate actual remote workers.
208 print('Running %s workers.' % (WORKERS))
209 for i in range(0, WORKERS):
210 worker_conf['topic'] = 'calculator_%s' % (i + 1)
211 worker_topics.append(worker_conf['topic'])
212 w = worker.Worker(**worker_conf)
213 runner = threading_utils.daemon_thread(w.run)
214 runner.start()
215 w.wait()
216 workers.append((runner, w.stop))
217
218 # Now use those workers to do something.
219 engine_conf['topics'] = worker_topics
220 results = calculate(engine_conf)
221 print('Execution finished.')
222 finally:
223 # And cleanup.
224 print('Stopping workers.')
225 while workers:
226 r, stopper = workers.pop()
227 stopper()
228 r.join()
229 print("Writing image...")
230 write_image(results, output_filename=output_filename)
231
232
233if __name__ == "__main__":
234 create_fractal()
Jobboard producer/consumer (simple)¶
Note
Full source (located at jobboard_produce_consume_colors)
1import collections, contextlib
2import logging
3import os
4import random
5import sys
6import threading
7import time
8
9logging.basicConfig(level=logging.ERROR)
10
11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12 os.pardir,
13 os.pardir))
14sys.path.insert(0, top_dir)
15
16from zake import fake_client
17
18from taskflow import exceptions as excp
19from taskflow.jobs import backends
20from taskflow.utils import threading_utils
21
22# In this example we show how a jobboard can be used to post work for other
23# entities to work on. This example creates a set of jobs using one producer
24# thread (typically this would be split across many machines) and then has
25# other worker threads, each with its own jobboard, select work using a given
26# filter [red/blue] and then perform that work (consuming or abandoning
27# the job after it has completed or failed).
28
29# Things to note:
30# - No persistence layer (or logbook) is used; just the job details are used
31# to determine if a job should be selected by a worker or not.
32# - This example runs in a single process (this is expected to be atypical
33# but this example shows that it can be done if needed, for testing...)
34# - The iterjobs(), claim(), consume()/abandon() worker workflow.
35# - The post() producer workflow.
36
37SHARED_CONF = {
38 'path': "/taskflow/jobs",
39 'board': 'zookeeper',
40}
41
42# How many workers and producers of work will be created (as threads).
43PRODUCERS = 3
44WORKERS = 5
45
46# How many units of work each producer will create.
47PRODUCER_UNITS = 10
48
49# How many units of work are expected to be produced (used so workers can
50# know when to stop running and shut down; typically this would not be a
51# known value, but we have to limit this example's execution time to be less than
52# infinity).
53EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS
54
55# Delay between producing/consuming more work.
56WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)
57
58# To ensure threads don't trample other threads output.
59STDOUT_LOCK = threading.Lock()
60
61
62def dispatch_work(job):
63 # This is where the job's contained work *would* be done
64 time.sleep(1.0)
65
66
67def safe_print(name, message, prefix=""):
68 with STDOUT_LOCK:
69 if prefix:
70 print("{} {}: {}".format(prefix, name, message))
71 else:
72 print("{}: {}".format(name, message))
73
74
75def worker(ident, client, consumed):
76 # Create a personal board (using the same client so that it works in
77 # the same process) and start looking for jobs on the board that we want
78 # to perform.
79 name = "W-%s" % (ident)
80 safe_print(name, "started")
81 claimed_jobs = 0
82 consumed_jobs = 0
83 abandoned_jobs = 0
84 with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
85 while len(consumed) != EXPECTED_UNITS:
86 favorite_color = random.choice(['blue', 'red'])
87 for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
88 # See if we should even bother with it...
89 if job.details.get('color') != favorite_color:
90 continue
91 safe_print(name, "'%s' [attempting claim]" % (job))
92 try:
93 board.claim(job, name)
94 claimed_jobs += 1
95 safe_print(name, "'%s' [claimed]" % (job))
96 except (excp.NotFound, excp.UnclaimableJob):
97 safe_print(name, "'%s' [claim unsuccessful]" % (job))
98 else:
99 try:
100 dispatch_work(job)
101 board.consume(job, name)
102 safe_print(name, "'%s' [consumed]" % (job))
103 consumed_jobs += 1
104 consumed.append(job)
105 except Exception:
106 board.abandon(job, name)
107 abandoned_jobs += 1
108 safe_print(name, "'%s' [abandoned]" % (job))
109 time.sleep(WORKER_DELAY)
110 safe_print(name,
111 "finished (claimed %s jobs, consumed %s jobs,"
112 " abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
113 abandoned_jobs), prefix=">>>")
114
115
116def producer(ident, client):
117 # Create a personal board (using the same client so that it works in
118 # the same process) and start posting jobs on the board that we want
119 # some entity to perform.
120 name = "P-%s" % (ident)
121 safe_print(name, "started")
122 with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
123 for i in range(0, PRODUCER_UNITS):
124 job_name = "{}-{}".format(name, i)
125 details = {
126 'color': random.choice(['red', 'blue']),
127 }
128 job = board.post(job_name, book=None, details=details)
129 safe_print(name, "'%s' [posted]" % (job))
130 time.sleep(PRODUCER_DELAY)
131 safe_print(name, "finished", prefix=">>>")
132
133
134def main():
135 # TODO(harlowja): Hack to make eventlet work right, remove when the
136 # following is fixed: https://github.com/eventlet/eventlet/issues/230
137 from taskflow.utils import eventlet_utils as _eu # noqa
138 try:
139 import eventlet as _eventlet # noqa
140 except ImportError:
141 pass
142
143 with contextlib.closing(fake_client.FakeClient()) as c:
144 created = []
145 for i in range(0, PRODUCERS):
146 p = threading_utils.daemon_thread(producer, i + 1, c)
147 created.append(p)
148 p.start()
149 consumed = collections.deque()
150 for i in range(0, WORKERS):
151 w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
152 created.append(w)
153 w.start()
154 while created:
155 t = created.pop()
156 t.join()
157 # At the end there should be nothing leftover; let's verify that.
158 board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
159 board.connect()
160 with contextlib.closing(board):
161 if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
162 return 1
163 return 0
164
165
166if __name__ == "__main__":
167 sys.exit(main())
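To run the same producer/worker code against a real ZooKeeper cluster rather than the zake fake client, the client argument is dropped and a 'hosts' entry is added to the board configuration (the endpoint below is an assumption; a running ZooKeeper is required). A minimal sketch:

from taskflow.jobs import backends

REAL_CONF = {
    'board': 'zookeeper',
    'path': '/taskflow/jobs',
    'hosts': 'localhost:2181',  # assumed ZooKeeper endpoint
}

# Without client=..., the board creates and manages its own kazoo client.
with backends.backend('W-real', REAL_CONF.copy()) as board:
    for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
        print("saw job: %s" % job)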
Conductor simulating a CI pipeline¶
Note
Full source (located at tox_conductor)
1import contextlib, itertools
2import logging
3import os
4import shutil
5import socket
6import sys
7import tempfile
8import threading
9import time
10
11logging.basicConfig(level=logging.ERROR)
12
13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
14 os.pardir,
15 os.pardir))
16sys.path.insert(0, top_dir)
17
18from oslo_utils import timeutils
19from oslo_utils import uuidutils
20from zake import fake_client
21
22from taskflow.conductors import backends as conductors
23from taskflow import engines
24from taskflow.jobs import backends as boards
25from taskflow.patterns import linear_flow
26from taskflow.persistence import backends as persistence
27from taskflow.persistence import models
28from taskflow import task
29from taskflow.utils import threading_utils
30
31# INTRO: This example shows how a worker/producer can post desired work (jobs)
32# to a jobboard and a conductor can consume that work (jobs) from that jobboard
33# and execute those jobs in a reliable & async manner (for example, if the
34# conductor were to crash then the job will be released back onto the jobboard
35# and another conductor can attempt to finish it, from wherever that job last
36# left off).
37#
38# In this example an in-memory jobboard (and in-memory storage) is created and
39# used that simulates how this would be done at a larger scale (it is an
40# example after all).
41
42# Restrict how long this example runs for...
43RUN_TIME = 5
44REVIEW_CREATION_DELAY = 0.5
45SCAN_DELAY = 0.1
46NAME = "{}_{}".format(socket.getfqdn(), os.getpid())
47
48# This won't really use zookeeper, but instead a local stand-in provided by
49# the zake library that mimics an actual zookeeper cluster using threads and
50# an in-memory data structure.
51JOBBOARD_CONF = {
52 'board': 'zookeeper://?path=/taskflow/tox/jobs',
53}
54
55
56class RunReview(task.Task):
57 # A dummy task that clones the review and runs tox...
58
59 def _clone_review(self, review, temp_dir):
60 print("Cloning review '{}' into {}".format(review['id'], temp_dir))
61
62 def _run_tox(self, temp_dir):
63 print("Running tox in %s" % temp_dir)
64
65 def execute(self, review, temp_dir):
66 self._clone_review(review, temp_dir)
67 self._run_tox(temp_dir)
68
69
70class MakeTempDir(task.Task):
71 # A task that creates and destroys a temporary dir (on failure).
72 #
73 # It provides the location of the temporary dir for other tasks to use
74 # as they see fit.
75
76 default_provides = 'temp_dir'
77
78 def execute(self):
79 return tempfile.mkdtemp()
80
81 def revert(self, *args, **kwargs):
82 temp_dir = kwargs.get(task.REVERT_RESULT)
83 if temp_dir:
84 shutil.rmtree(temp_dir)
85
86
87class CleanResources(task.Task):
88 # A task that cleans up any workflow resources.
89
90 def execute(self, temp_dir):
91 print("Removing %s" % temp_dir)
92 shutil.rmtree(temp_dir)
93
94
95def review_iter():
96 """Makes reviews (never-ending iterator/generator)."""
97 review_id_gen = itertools.count(0)
98 while True:
99 review_id = next(review_id_gen)
100 review = {
101 'id': review_id,
102 }
103 yield review
104
105
106# The reason this is at the module namespace level is important, since it must
107# be accessible from a conductor dispatching an engine. If it were a lambda
108# function, for example, it would not be reimportable and the conductor would
109# be unable to reference it when creating the workflow to run.
110def create_review_workflow():
111 """Factory method used to create a review workflow to run."""
112 f = linear_flow.Flow("tester")
113 f.add(
114 MakeTempDir(name="maker"),
115 RunReview(name="runner"),
116 CleanResources(name="cleaner")
117 )
118 return f
119
120
121def generate_reviewer(client, saver, name=NAME):
122 """Creates a review producer thread with the given name prefix."""
123 real_name = "%s_reviewer" % name
124 no_more = threading.Event()
125 jb = boards.fetch(real_name, JOBBOARD_CONF,
126 client=client, persistence=saver)
127
128 def make_save_book(saver, review_id):
129 # Record what we want to happen (sometime in the future).
130 book = models.LogBook("book_%s" % review_id)
131 detail = models.FlowDetail("flow_%s" % review_id,
132 uuidutils.generate_uuid())
133 book.add(detail)
134 # Associate the factory method we want to be called (in the future)
135 # with the book, so that the conductor will be able to call into
136 # that factory to retrieve the workflow objects that represent the
137 # work.
138 #
139 # These args and kwargs *can* be used to save any specific parameters
140 # into the factory when it is being called to create the workflow
141 # objects (typically used to tell a factory how to create a unique
142 # workflow that represents this review).
143 factory_args = ()
144 factory_kwargs = {}
145 engines.save_factory_details(detail, create_review_workflow,
146 factory_args, factory_kwargs)
147 with contextlib.closing(saver.get_connection()) as conn:
148 conn.save_logbook(book)
149 return book
150
151 def run():
152 """Periodically publishes 'fake' reviews to analyze."""
153 jb.connect()
154 review_generator = review_iter()
155 with contextlib.closing(jb):
156 while not no_more.is_set():
157 review = next(review_generator)
158 details = {
159 'store': {
160 'review': review,
161 },
162 }
163 job_name = "{}_{}".format(real_name, review['id'])
164 print("Posting review '%s'" % review['id'])
165 jb.post(job_name,
166 book=make_save_book(saver, review['id']),
167 details=details)
168 time.sleep(REVIEW_CREATION_DELAY)
169
170 # Return the unstarted thread, and a callback that can be used
171 # to shut down that thread (to avoid running forever).
172 return (threading_utils.daemon_thread(target=run), no_more.set)
173
174
175def generate_conductor(client, saver, name=NAME):
176 """Creates a conductor thread with the given name prefix."""
177 real_name = "%s_conductor" % name
178 jb = boards.fetch(name, JOBBOARD_CONF,
179 client=client, persistence=saver)
180 conductor = conductors.fetch("blocking", real_name, jb,
181 engine='parallel', wait_timeout=SCAN_DELAY)
182
183 def run():
184 jb.connect()
185 with contextlib.closing(jb):
186 conductor.run()
187
188 # Return the unstarted thread, and a callback that can be used
189 # to shut down that thread (to avoid running forever).
190 return (threading_utils.daemon_thread(target=run), conductor.stop)
191
192
193def main():
194 # Need to share the same backend, so that data can be shared...
195 persistence_conf = {
196 'connection': 'memory',
197 }
198 saver = persistence.fetch(persistence_conf)
199 with contextlib.closing(saver.get_connection()) as conn:
200 # This ensures that the needed backend setup/data directories/schema
201 # upgrades and so on... exist before they are attempted to be used...
202 conn.upgrade()
203 fc1 = fake_client.FakeClient()
204 # Done like this to share the same client storage location so the correct
205 # zookeeper features work across clients...
206 fc2 = fake_client.FakeClient(storage=fc1.storage)
207 entities = [
208 generate_reviewer(fc1, saver),
209 generate_conductor(fc2, saver),
210 ]
211 for t, stopper in entities:
212 t.start()
213 try:
214 watch = timeutils.StopWatch(duration=RUN_TIME)
215 watch.start()
216 while not watch.expired():
217 time.sleep(0.1)
218 finally:
219 for t, stopper in reversed(entities):
220 stopper()
221 t.join()
222
223
224if __name__ == '__main__':
225 main()
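MakeTempDir above relies on the revert mechanism to clean up after itself: when a later task fails, revert() is called and the value the task previously returned is available under the task.REVERT_RESULT key. A minimal standalone sketch of that pattern (illustrative task names):

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task


class MakeThing(task.Task):
    default_provides = 'thing'

    def execute(self):
        print("making thing")
        return 'thing-123'

    def revert(self, *args, **kwargs):
        # The earlier execute() result is handed back on revert.
        made = kwargs.get(task.REVERT_RESULT)
        if made:
            print("cleaning up %s" % made)


class Boom(task.Task):
    def execute(self, thing):
        raise RuntimeError("failed while using %s" % thing)


flow = linear_flow.Flow('cleanup-demo').add(MakeThing(), Boom())
try:
    engines.load(flow, engine='serial').run()
except RuntimeError:
    pass  # the original failure is re-raised after the flow reverts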
Conductor running 99 bottles of beer song requests¶
Note
Full source (located at 99_bottles)
1import contextlib, functools
2import logging
3import os
4import sys
5import time
6import traceback
7
8from kazoo import client
9
10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11 os.pardir,
12 os.pardir))
13sys.path.insert(0, top_dir)
14
15from taskflow.conductors import backends as conductor_backends
16from taskflow import engines
17from taskflow.jobs import backends as job_backends
18from taskflow import logging as taskflow_logging
19from taskflow.patterns import linear_flow as lf
20from taskflow.persistence import backends as persistence_backends
21from taskflow.persistence import models
22from taskflow import task
23
24from oslo_utils import timeutils
25from oslo_utils import uuidutils
26
27# Instructions!
28#
29# 1. Install zookeeper (or change host listed below)
30# 2. Download this example, place in file '99_bottles.py'
31# 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
32# 4. Run `python 99_bottles.py c` a few times (in different shells)
33# 5. On demand kill previously listed processes created in (4) and watch
34# the work resume on another process (and repeat)
35# 6. Keep enough workers alive to eventually finish the song (if desired).
36
37ME = os.getpid()
38ZK_HOST = "localhost:2181"
39JB_CONF = {
40 'hosts': ZK_HOST,
41 'board': 'zookeeper',
42 'path': '/taskflow/99-bottles-demo',
43}
44PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
45TAKE_DOWN_DELAY = 1.0
46PASS_AROUND_DELAY = 3.0
47HOW_MANY_BOTTLES = 99
48
49
50class TakeABottleDown(task.Task):
51 def execute(self, bottles_left):
52 sys.stdout.write('Take one down, ')
53 sys.stdout.flush()
54 time.sleep(TAKE_DOWN_DELAY)
55 return bottles_left - 1
56
57
58class PassItAround(task.Task):
59 def execute(self):
60 sys.stdout.write('pass it around, ')
61 sys.stdout.flush()
62 time.sleep(PASS_AROUND_DELAY)
63
64
65class Conclusion(task.Task):
66 def execute(self, bottles_left):
67 sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left)
68 sys.stdout.flush()
69
70
71def make_bottles(count):
72 # This is the function that will be called to generate the workflow
73 # and will also be called to regenerate it on resumption so that work
74 # can continue from where it last left off...
75
76 s = lf.Flow("bottle-song")
77
78 take_bottle = TakeABottleDown("take-bottle-%s" % count,
79 inject={'bottles_left': count},
80 provides='bottles_left')
81 pass_it = PassItAround("pass-%s-around" % count)
82 next_bottles = Conclusion("next-bottles-%s" % (count - 1))
83 s.add(take_bottle, pass_it, next_bottles)
84
85 for bottle in reversed(list(range(1, count))):
86 take_bottle = TakeABottleDown("take-bottle-%s" % bottle,
87 provides='bottles_left')
88 pass_it = PassItAround("pass-%s-around" % bottle)
89 next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
90 s.add(take_bottle, pass_it, next_bottles)
91
92 return s
93
94
95def run_conductor(only_run_once=False):
96 # This continuously runs consumers until it is stopped via ctrl-c or another
97 # kill signal...
98 event_watches = {}
99
100 # This will be triggered by the conductor doing various activities
101 # with engines; it is quite nice to be able to see the various timing
102 # segments (which is useful for debugging, or watching, or figuring out
103 # where to optimize).
104 def on_conductor_event(cond, event, details):
105 print("Event '%s' has been received..." % event)
106 print("Details = %s" % details)
107 if event.endswith("_start"):
108 w = timeutils.StopWatch()
109 w.start()
110 base_event = event[0:-len("_start")]
111 event_watches[base_event] = w
112 if event.endswith("_end"):
113 base_event = event[0:-len("_end")]
114 try:
115 w = event_watches.pop(base_event)
116 w.stop()
117 print("It took %0.3f seconds for event '%s' to finish"
118 % (w.elapsed(), base_event))
119 except KeyError:
120 pass
121 if event == 'running_end' and only_run_once:
122 cond.stop()
123
124 print("Starting conductor with pid: %s" % ME)
125 my_name = "conductor-%s" % ME
126 persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
127 with contextlib.closing(persist_backend):
128 with contextlib.closing(persist_backend.get_connection()) as conn:
129 conn.upgrade()
130 job_backend = job_backends.fetch(my_name, JB_CONF,
131 persistence=persist_backend)
132 job_backend.connect()
133 with contextlib.closing(job_backend):
134 cond = conductor_backends.fetch('blocking', my_name, job_backend,
135 persistence=persist_backend)
136 on_conductor_event = functools.partial(on_conductor_event, cond)
137 cond.notifier.register(cond.notifier.ANY, on_conductor_event)
138 # Run forever, and kill -9 or ctrl-c me...
139 try:
140 cond.run()
141 finally:
142 cond.stop()
143 cond.wait()
144
145
146def run_poster():
147 # This just posts a single job and then ends...
148 print("Starting poster with pid: %s" % ME)
149 my_name = "poster-%s" % ME
150 persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
151 with contextlib.closing(persist_backend):
152 with contextlib.closing(persist_backend.get_connection()) as conn:
153 conn.upgrade()
154 job_backend = job_backends.fetch(my_name, JB_CONF,
155 persistence=persist_backend)
156 job_backend.connect()
157 with contextlib.closing(job_backend):
158 # Create information in the persistence backend about the
159 # unit of work we want to complete and the factory that
160 # can be called to create the tasks needed to complete
161 # that unit of work.
162 lb = models.LogBook("post-from-%s" % my_name)
163 fd = models.FlowDetail("song-from-%s" % my_name,
164 uuidutils.generate_uuid())
165 lb.add(fd)
166 with contextlib.closing(persist_backend.get_connection()) as conn:
167 conn.save_logbook(lb)
168 engines.save_factory_details(fd, make_bottles,
169 [HOW_MANY_BOTTLES], {},
170 backend=persist_backend)
171 # Post, and be done with it!
172 jb = job_backend.post("song-from-%s" % my_name, book=lb)
173 print("Posted: %s" % jb)
174 print("Goodbye...")
175
176
177def main_local():
178 # Run locally; typically this is activated during unit testing when all
179 # the examples are verified to still function correctly...
180 global TAKE_DOWN_DELAY
181 global PASS_AROUND_DELAY
182 global JB_CONF
183 # Make everything go much faster (so that this finishes quickly).
184 PASS_AROUND_DELAY = 0.01
185 TAKE_DOWN_DELAY = 0.01
186 JB_CONF['path'] = JB_CONF['path'] + "-" + uuidutils.generate_uuid()
187 run_poster()
188 run_conductor(only_run_once=True)
189
190
191def check_for_zookeeper(timeout=1):
192 sys.stderr.write("Testing for the existence of a zookeeper server...\n")
193 sys.stderr.write("Please wait....\n")
194 with contextlib.closing(client.KazooClient()) as test_client:
195 try:
196 test_client.start(timeout=timeout)
197 except test_client.handler.timeout_exception:
198 sys.stderr.write("Zookeeper is needed for running this example!\n")
199 traceback.print_exc()
200 return False
201 else:
202 test_client.stop()
203 return True
204
205
206def main():
207 if not check_for_zookeeper():
208 return
209 if len(sys.argv) == 1:
210 main_local()
211 elif sys.argv[1] in ('p', 'c'):
212 if sys.argv[-1] == "v":
213 logging.basicConfig(level=taskflow_logging.TRACE)
214 else:
215 logging.basicConfig(level=logging.ERROR)
216 if sys.argv[1] == 'p':
217 run_poster()
218 else:
219 run_conductor()
220 else:
221 sys.stderr.write("%s p|c (v?)\n" % os.path.basename(sys.argv[0]))
222
223
224if __name__ == '__main__':
225 main()