Examples

While developing TaskFlow the team has worked hard to make sure the various concepts are explained by relevant examples. Here are a few selected examples to get you started (ordered by perceived complexity):

To explore more examples, check out the examples directory in the TaskFlow source tree.

Note

If the provided examples are not satisfying enough (or do not meet your standards), contributions that help improve them are welcome and greatly appreciated. The higher quality and clearer the examples are, the better and more useful they are for everyone.

Hello world

Note

Full source located at hello_world

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task


# INTRO: This is the de facto hello world equivalent for taskflow; it shows how
# an overly simplistic workflow can be created that runs using different
# engines and different styles of execution (all of which can run in
# parallel if the provided workflow is parallelizable).

class PrinterTask(task.Task):
    def __init__(self, name, show_name=True, inject=None):
        super().__init__(name, inject=inject)
        self._show_name = show_name

    def execute(self, output):
        if self._show_name:
            print("{}: {}".format(self.name, output))
        else:
            print(output)


# This will be the work that we want done, which for this example is just to
# print 'hello world' (like a song) using different tasks and different
# execution models.
song = lf.Flow("beats")

# Unordered flows, when run, can be run in parallel; and a chorus is everyone
# singing at once of course!
hi_chorus = uf.Flow('hello')
world_chorus = uf.Flow('world')
for (name, hello, world) in [('bob', 'hello', 'world'),
                             ('joe', 'hellooo', 'worllllld'),
                             ('sue', "helloooooo!", 'wooorllld!')]:
    hi_chorus.add(PrinterTask("%s@hello" % name,
                              # This will show up in the execute() method of
                              # the task as the argument named 'output' (which
                              # will allow us to print the words we want).
                              inject={'output': hello}))
    world_chorus.add(PrinterTask("%s@world" % name,
                                 inject={'output': world}))

# The composition starts with the conductor and then runs in sequence with
# the chorus running in parallel, but no matter what the 'hello' chorus must
# always run before the 'world' chorus (otherwise the world will fall apart).
song.add(PrinterTask("conductor@begin",
                     show_name=False, inject={'output': "*ding*"}),
         hi_chorus,
         world_chorus,
         PrinterTask("conductor@end",
                     show_name=False, inject={'output': "*dong*"}))

# Run in parallel using eventlet green threads...
try:
    import eventlet as _eventlet  # noqa
except ImportError:
    # No eventlet currently active, skip running with it...
    pass
else:
    print("-- Running in parallel using eventlet --")
    e = engines.load(song, executor='greenthreaded', engine='parallel',
                     max_workers=1)
    e.run()


# Run in parallel using real threads...
print("-- Running in parallel using threads --")
e = engines.load(song, executor='threaded', engine='parallel',
                 max_workers=1)
e.run()


# Run in parallel using external processes...
print("-- Running in parallel using processes --")
e = engines.load(song, executor='processes', engine='parallel',
                 max_workers=1)
e.run()


# Run serially (aka, if the workflow could have been run in parallel, it will
# not be when run in this mode)...
print("-- Running serially --")
e = engines.load(song, engine='serial')
e.run()
print("-- Statistics gathered --")
print(e.statistics)
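
Beyond the string executor aliases used above ('greenthreaded', 'threaded', 'processes'), an executor instance that you manage yourself can also be handed to engines.load (later examples on this page do exactly that). A minimal sketch, assuming the futurist library (a taskflow dependency) is available:

import futurist

# Reuse the 'song' flow defined above, but supply our own thread pool.
executor = futurist.ThreadPoolExecutor(max_workers=2)
try:
    e = engines.load(song, engine='parallel', executor=executor)
    e.run()
finally:
    executor.shutdown()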

Passing values from and to tasks

Note

Full source located at simple_linear_pass

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task

# INTRO: This example shows how a task (in a linear/serial workflow) can
# produce an output that can then be consumed/used by a downstream task.


class TaskA(task.Task):
    default_provides = 'a'

    def execute(self):
        print("Executing '%s'" % (self.name))
        return 'a'


class TaskB(task.Task):
    def execute(self, a):
        print("Executing '%s'" % (self.name))
        print("Got input '%s'" % (a))


print("Constructing...")
wf = linear_flow.Flow("pass-from-to")
wf.add(TaskA('a'), TaskB('b'))

print("Loading...")
e = engines.load(wf)

print("Compiling...")
e.compile()

print("Preparing...")
e.prepare()

print("Running...")
e.run()

print("Done...")

Using listeners

Note

Full source located at echo_listener

import logging
import os
import sys

logging.basicConfig(level=logging.DEBUG)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import logging as logging_listener
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: This example walks through a miniature workflow which will do a
# simple echo operation; during this execution a listener is associated with
# the engine to receive all notifications about what the flow has performed.
# This example dumps that output to stdout for viewing (at debug level
# to show all the information which is possible).


class Echo(task.Task):
    def execute(self):
        print(self.name)


# Generate the work to be done (but don't do it yet).
wf = lf.Flow('abc')
wf.add(Echo('a'))
wf.add(Echo('b'))
wf.add(Echo('c'))

# This will associate the listener with the engine (the listener
# will automatically register for notifications with the engine and deregister
# when the context is exited).
e = engines.load(wf)
with logging_listener.DynamicLoggingListener(e):
    e.run()

Using listeners (to watch a phone call)

Note

Full source located at simple_linear_listening

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY

# INTRO: In this example we create two tasks (this time as functions instead
# of task subclasses as in the simple_linear.py example), each of which ~calls~
# a given ~phone~ number (provided as a function input) in a linear fashion
# (one after the other).
#
# For a workflow which is serial this shows an extremely simple way
# of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data to be run in a reliable manner.
#
# This example shows a basic usage of the taskflow structures without involving
# the complexity of persistence. Using the structures that taskflow provides
# via tasks and flows makes it possible for you to easily at a later time
# hook in a persistence layer (and then gain the functionality that offers)
# when you decide the complexity of adding that layer in is 'worth it' for your
# application's usage pattern (which some applications may not need).
#
# It **also** adds on to the simple_linear.py example by adding a set of
# callback functions which the engine will call when a flow state transition
# or task state transition occurs. These types of functions are useful for
# updating task or flow progress, for debugging, for sending notifications to
# external systems, or for other yet unknown future usage that you may create!


def call_jim(context):
    print("Calling jim.")
    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))


def call_joe(context):
    print("Calling joe.")
    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))


def flow_watch(state, details):
    print('Flow => %s' % state)


def task_watch(state, details):
    print('Task {} => {}'.format(details.get('task_name'), state))


# Wrap your functions into a task type that knows how to treat your functions
# as tasks. There was previous work done to just allow a function to be
# directly passed, but in python 3.0 there is no easy way to capture an
# instance method, so this wrapping approach was decided upon instead which
# can attach to instance methods (if that's desired).
flow = lf.Flow("Call-them")
flow.add(task.FunctorTask(execute=call_jim))
flow.add(task.FunctorTask(execute=call_joe))

# Now load (but do not run) the flow using the provided initial data.
engine = taskflow.engines.load(flow, store={
    'context': {
        "joe_number": 444,
        "jim_number": 555,
    }
})

# This is where we attach our callback functions to the 2 different
# notification objects that an engine exposes. The usage of ANY (kleene star)
# here means that we want to be notified on all state changes; if you want to
# restrict to a specific state change, just register that instead.
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

# And now run!
engine.run()
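
As the registration comment above notes, you do not have to register for ANY; a single state can be registered instead. A short sketch, assuming the state name constants in taskflow.states:

from taskflow import states

# Only be notified when the flow reaches the SUCCESS state.
engine.notifier.register(states.SUCCESS, flow_watch)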

Dumping an in-memory backend

Note

Full source located at dump_memory_backend

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: in this example we create a dummy flow with a dummy task, and run
# it using an in-memory backend; pre/post run we dump out the contents
# of the in-memory backend's tree structure (which can be quite useful to
# look at for debugging or other analysis).


class PrintTask(task.Task):
    def execute(self):
        print("Running '%s'" % self.name)

# Make a little flow and run it...
f = lf.Flow('root')
for alpha in ['a', 'b', 'c']:
    f.add(PrintTask(alpha))

e = engines.load(f)
e.compile()
e.prepare()

# After prepare the storage layer + backend can now be accessed safely...
backend = e.storage.backend

print("----------")
print("Before run")
print("----------")
print(backend.memory.pformat())
print("----------")

e.run()

print("---------")
print("After run")
print("---------")
for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
    value = backend.memory[path]
    if value:
        print("{} -> {}".format(path, value))
    else:
        print("%s" % (path))

Making phone calls

Note

Full source located at simple_linear

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create two tasks, each of which ~calls~ a given
# ~phone~ number (provided as a function input) in a linear fashion (one after
# the other). For a workflow which is serial this shows an extremely simple way
# of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data to be run in a reliable manner.
#
# NOTE(harlowja): This example shows a basic usage of the taskflow structures
# without involving the complexity of persistence. Using the structures that
# taskflow provides via tasks and flows makes it possible for you to easily at
# a later time hook in a persistence layer (and then gain the functionality
# that offers) when you decide the complexity of adding that layer in
# is 'worth it' for your application's usage pattern (which certain
# applications may not need).


class CallJim(task.Task):
    def execute(self, jim_number, *args, **kwargs):
        print("Calling jim %s." % jim_number)


class CallJoe(task.Task):
    def execute(self, joe_number, *args, **kwargs):
        print("Calling joe %s." % joe_number)


# Create your flow and associated tasks (the work to be done).
flow = lf.Flow('simple-linear').add(
    CallJim(),
    CallJoe()
)

# Now run that flow using the provided initial data (store below).
taskflow.engines.run(flow, store=dict(joe_number=444,
                                      jim_number=555))

Making phone calls (automatically reverting)

Note

Full source located at reverting_linear

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create three tasks, each of which ~calls~ a given
# number (provided as a function input); one of those tasks *fails* calling a
# given number (the suzzie calling). This causes the workflow to enter the
# reverting process, which activates the revert methods of the previous two
# phone ~calls~.
#
# This simulated calling makes it appear like all three calls occur or all
# three don't occur (transaction-like capabilities). No persistence layer is
# used here so reverting and executing will *not* be tolerant of process
# failure.


class CallJim(task.Task):
    def execute(self, jim_number, *args, **kwargs):
        print("Calling jim %s." % jim_number)

    def revert(self, jim_number, *args, **kwargs):
        print("Calling %s and apologizing." % jim_number)


class CallJoe(task.Task):
    def execute(self, joe_number, *args, **kwargs):
        print("Calling joe %s." % joe_number)

    def revert(self, joe_number, *args, **kwargs):
        print("Calling %s and apologizing." % joe_number)


class CallSuzzie(task.Task):
    def execute(self, suzzie_number, *args, **kwargs):
        raise OSError("Suzzie not home right now.")


# Create your flow and associated tasks (the work to be done).
flow = lf.Flow('simple-linear').add(
    CallJim(),
    CallJoe(),
    CallSuzzie()
)

try:
    # Now run that flow using the provided initial data (store below).
    taskflow.engines.run(flow, store=dict(joe_number=444,
                                          jim_number=555,
                                          suzzie_number=666))
except Exception as e:
    # NOTE(harlowja): This exception will be the exception that came out of the
    # 'CallSuzzie' task instead of a different exception; this is useful since
    # typically surrounding code wants to handle the original exception and not
    # a wrapped or altered one.
    #
    # *WARNING* If this flow was multi-threaded and multiple active tasks threw
    # exceptions then the above exception would be wrapped into a combined
    # exception (the object has methods to iterate over the contained
    # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
    # how to deal with multiple tasks failing while running.
    #
    # You will also note that this is not a problem in this case since no
    # parallelism is involved; this is ensured by the usage of a linear flow
    # and the default engine type which is 'serial' vs being 'parallel'.
    print("Flow failed: %s" % e)

Building a car

Note

Full source located at build_a_car

import logging
import os
import sys


logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)


import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY

import example_utils as eu  # noqa


# INTRO: This example shows how a graph flow and linear flow can be used
# together to execute dependent & non-dependent tasks by going through the
# steps required to build a simplistic car (an assembly line if you will). It
# also shows how raw functions can be wrapped into a task object instead of
# being forced to use the more *heavy* task base class. This is useful in
# scenarios where pre-existing code has functions that you easily want to
# plug-in to taskflow, without requiring a large amount of code changes.


def build_frame():
    return 'steel'


def build_engine():
    return 'honda'


def build_doors():
    return '2'


def build_wheels():
    return '4'


# These just return true to indicate success; in the real world they would
# do more than just that.

def install_engine(frame, engine):
    return True


def install_doors(frame, windows_installed, doors):
    return True


def install_windows(frame, doors):
    return True


def install_wheels(frame, engine, engine_installed, wheels):
    return True


def trash(**kwargs):
    eu.print_wrapped("Throwing away pieces of car!")


def startup(**kwargs):
    # If you want to see the rollback function being activated try uncommenting
    # the following line.
    #
    # raise ValueError("Car not verified")
    return True


def verify(spec, **kwargs):
    # If the car is not what we ordered throw away the car (trigger reversion).
    for key, value in kwargs.items():
        if spec[key] != value:
            raise Exception("Car doesn't match spec!")
    return True


# These two functions connect into the state transition notification emission
# points that the engine outputs; they can be used to log state transitions
# that are occurring, or they can be used to suspend the engine (or perform
# other useful activities).
def flow_watch(state, details):
    print('Flow => %s' % state)


def task_watch(state, details):
    print('Task {} => {}'.format(details.get('task_name'), state))


flow = lf.Flow("make-auto").add(
    task.FunctorTask(startup, revert=trash, provides='ran'),
    # A graph flow allows automatic dependency based ordering; the ordering
    # is determined by analyzing the symbols required and provided and ordering
    # execution based on a functioning order (if one exists).
    gf.Flow("install-parts").add(
        task.FunctorTask(build_frame, provides='frame'),
        task.FunctorTask(build_engine, provides='engine'),
        task.FunctorTask(build_doors, provides='doors'),
        task.FunctorTask(build_wheels, provides='wheels'),
        # These *_installed outputs allow for other tasks to depend on certain
        # actions being performed (aka the components were installed); another
        # way to do this is to link() the tasks manually instead of creating
        # an 'artificial' data dependency that accomplishes the same goal the
        # manual linking would result in.
        task.FunctorTask(install_engine, provides='engine_installed'),
        task.FunctorTask(install_doors, provides='doors_installed'),
        task.FunctorTask(install_windows, provides='windows_installed'),
        task.FunctorTask(install_wheels, provides='wheels_installed')),
    task.FunctorTask(verify, requires=['frame',
                                       'engine',
                                       'doors',
                                       'wheels',
                                       'engine_installed',
                                       'doors_installed',
                                       'windows_installed',
                                       'wheels_installed']))

# This dictionary will be provided to the tasks as a specification for what
# the tasks should produce; in this example this specification will influence
# what those tasks do and what output they create. Different tasks depend on
# different information from this specification, all of which will be provided
# automatically by the engine to those tasks.
spec = {
    "frame": 'steel',
    "engine": 'honda',
    "doors": '2',
    "wheels": '4',
    # These are used to compare the result product; a car without the pieces
    # installed is not a car after all.
    "engine_installed": True,
    "doors_installed": True,
    "windows_installed": True,
    "wheels_installed": True,
}


engine = taskflow.engines.load(flow, store={'spec': spec.copy()})

# This registers all (ANY) state transitions to trigger a call to the
# flow_watch function for flow state transitions, and registers the
# same all (ANY) state transitions for task state transitions.
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a car")
engine.run()

# Alter the specification and ensure that the reverting logic gets triggered,
# since the car that gets built (the build_doors function produces a car with
# 2 doors, not 5); this will cause the verification task to mark the car that
# is produced as not matching the desired spec.
spec['doors'] = 5

engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a wrong car that doesn't match specification")
try:
    engine.run()
except Exception as e:
    eu.print_wrapped("Flow failed: %s" % e)
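
The comment about link() above refers to a graph flow's ability to add an explicit ordering edge between two tasks instead of an artificial data dependency. A purely illustrative sketch (the data dependency in this pair already implies the same ordering):

install = gf.Flow("install-parts-linked")
build_engine_task = task.FunctorTask(build_engine, provides='engine')
install_engine_task = task.FunctorTask(install_engine,
                                       provides='engine_installed')
install.add(build_engine_task, install_engine_task)
# Explicitly force install_engine to run after build_engine.
install.link(build_engine_task, install_engine_task)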

Iterating over the alphabet (using processes)

Note

Full source located at alphabet_soup

import fractions
import functools
import logging
import os
import string
import sys
import time

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow import exceptions
from taskflow.patterns import linear_flow
from taskflow import task


# In this example we show how a simple linear set of tasks can be executed
# using local processes (and not threads or remote workers) with minimal (if
# any) modification to those tasks to make them safe to run in this mode.
#
# This is useful since it allows further scaling up your workflows when thread
# execution starts to become a bottleneck (which it can start to be due to the
# GIL in python). It also offers an intermediary scalable runner that can be
# used when the scale and/or setup of remote workers is not desirable.


def progress_printer(task, event_type, details):
    # This callback, attached to each task, will be called in the local
    # process (not the child processes)...
    progress = details.pop('progress')
    progress = int(progress * 100.0)
    print("Task '%s' reached %d%% completion" % (task.name, progress))


class AlphabetTask(task.Task):
    # Seconds to delay between each progress part.
    _DELAY = 0.1

    # This task will run in X main stages (each with a different progress
    # report that will be delivered back to the running process...). The
    # initial 0% and 100% are triggered automatically by the engine when
    # a task is started and finished (so that's why those are not emitted
    # here).
    _PROGRESS_PARTS = [fractions.Fraction("%s/5" % x) for x in range(1, 5)]

    def execute(self):
        for p in self._PROGRESS_PARTS:
            self.update_progress(p)
            time.sleep(self._DELAY)


print("Constructing...")
soup = linear_flow.Flow("alphabet-soup")
for letter in string.ascii_lowercase:
    abc = AlphabetTask(letter)
    abc.notifier.register(task.EVENT_UPDATE_PROGRESS,
                          functools.partial(progress_printer, abc))
    soup.add(abc)
try:
    print("Loading...")
    e = engines.load(soup, engine='parallel', executor='processes')
    print("Compiling...")
    e.compile()
    print("Preparing...")
    e.prepare()
    print("Running...")
    e.run()
    print("Done: %s" % e.statistics)
except exceptions.NotImplementedError as e:
    print(e)

Watching execution timing

Note

Full source located at timing_listener

import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import timing
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: in this example we will attach a listener to an engine
# and have variable run time tasks run and show how the listener will print
# out how long those tasks took (when they started and when they finished).
#
# This shows how timing metrics can be gathered (or attached onto an engine)
# after a workflow has been constructed, making it easy to gather metrics
# dynamically for situations where this kind of information is applicable (or
# even adding this information on at a later point in the future when your
# application starts to slow down).


class VariableTask(task.Task):
    def __init__(self, name):
        super().__init__(name)
        self._sleepy_time = random.random()

    def execute(self):
        time.sleep(self._sleepy_time)


f = lf.Flow('root')
f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
e = engines.load(f)
with timing.PrintingDurationListener(e):
    e.run()

Distance calculator

Note

Full source located at distance_calculator

import collections
import math
import os
import sys

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task

# INTRO: This shows how to use a task's/atom's ability to take requirements
# from its execute function's default parameters and shows how to provide
# those via different methods when needed, to influence those parameters and,
# in this case, calculate the distance between two points in 2D space.

# A 2D point.
Point = collections.namedtuple("Point", "x,y")


def is_near(val, expected, tolerance=0.001):
    # Floats don't really provide equality...
    if val > (expected + tolerance):
        return False
    if val < (expected - tolerance):
        return False
    return True


class DistanceTask(task.Task):
    # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space

    default_provides = 'distance'

    def execute(self, a=Point(0, 0), b=Point(0, 0)):
        return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2))


if __name__ == '__main__':
    # For these we rely on the execute() method's points by default being
    # at the origin (and we override it with store values when we want) at
    # execution time (which then influences what is calculated).
    any_distance = linear_flow.Flow("origin").add(DistanceTask())
    results = engines.run(any_distance)
    print(results)
    print("{} is near-enough to {}: {}".format(
        results['distance'], 0.0, is_near(results['distance'], 0.0)))

    results = engines.run(any_distance, store={'a': Point(1, 1)})
    print(results)
    print("{} is near-enough to {}: {}".format(
        results['distance'], 1.4142, is_near(results['distance'], 1.4142)))

    results = engines.run(any_distance, store={'a': Point(10, 10)})
    print(results)
    print("{} is near-enough to {}: {}".format(
        results['distance'], 14.14199, is_near(results['distance'], 14.14199)))

    results = engines.run(any_distance,
                          store={'a': Point(5, 5), 'b': Point(10, 10)})
    print(results)
    print("{} is near-enough to {}: {}".format(
        results['distance'], 7.07106, is_near(results['distance'], 7.07106)))

    # For this we use the ability to override, at task creation time, the
    # optional arguments so that we don't need to continue to send them
    # in via the 'store' argument like in the above (and we fix the new
    # starting point 'a' at (10, 10) instead of (0, 0))...

    ten_distance = linear_flow.Flow("ten")
    ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))
    results = engines.run(ten_distance, store={'b': Point(10, 10)})
    print(results)
    print("{} is near-enough to {}: {}".format(
        results['distance'], 0.0, is_near(results['distance'], 0.0)))

    results = engines.run(ten_distance)
    print(results)
    print("{} is near-enough to {}: {}".format(
        results['distance'], 14.14199, is_near(results['distance'], 14.14199)))
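
As an aside, the is_near() helper above hand-rolls a tolerance check; the standard library offers an equivalent comparison (Python 3.5+) that could be used instead:

import math

# Equivalent tolerance-based comparison using the standard library.
print(math.isclose(1.41421, 1.4142, abs_tol=0.001))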

Table multiplier (in parallel)

Note

Full source located at parallel_table_multiply

import csv
import logging
import os
import random
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import futurist

from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: This example walks through a miniature workflow which does a parallel
# table modification where each row in the table gets adjusted by a thread, or
# green thread (if eventlet is available), in parallel and then the result
# is reformed into a new table and some verifications are performed on it
# to ensure everything went as expected.


MULTIPLER = 10


class RowMultiplier(task.Task):
    """Performs a modification of an input row, creating an output row."""

    def __init__(self, name, index, row, multiplier):
        super().__init__(name=name)
        self.index = index
        self.multiplier = multiplier
        self.row = row

    def execute(self):
        return [r * self.multiplier for r in self.row]


def make_flow(table):
    # This creation will allow for parallel computation (since the flow here
    # is specifically unordered; and when things are unordered they have
    # no dependencies and when things have no dependencies they can just be
    # run at the same time, limited in concurrency by the executor or max
    # workers of that executor...)
    f = uf.Flow("root")
    for i, row in enumerate(table):
        f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLER))
    # NOTE(harlowja): at this point nothing has run; the above is just
    # defining what should be done (but not actually doing it) and associating
    # the ordering dependencies that should be enforced (the flow pattern used
    # forces this). The engine in the later main() function will actually
    # perform this work...
    return f


def main():
    if len(sys.argv) == 2:
        tbl = []
        with open(sys.argv[1], 'r', newline='') as fh:
            reader = csv.reader(fh)
            for row in reader:
                tbl.append([float(r) if r else 0.0 for r in row])
    else:
        # Make some random table out of thin air...
        tbl = []
        cols = random.randint(1, 100)
        rows = random.randint(1, 100)
        for _i in range(0, rows):
            row = []
            for _j in range(0, cols):
                row.append(random.random())
            tbl.append(row)

    # Generate the work to be done.
    f = make_flow(tbl)

    # Now run it (using the specified executor)...
    try:
        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
    except RuntimeError:
        # No eventlet currently active, use real threads instead.
        executor = futurist.ThreadPoolExecutor(max_workers=5)
    try:
        e = engines.load(f, engine='parallel', executor=executor)
        for st in e.run_iter():
            print(st)
    finally:
        executor.shutdown()

    # Find the old rows and put them into place...
    #
    # TODO(harlowja): probably easier just to sort instead of search...
    computed_tbl = []
    for i in range(0, len(tbl)):
        for t in f:
            if t.index == i:
                computed_tbl.append(e.storage.get(t.name))

    # Do some basic validation (which causes the return code of this process
    # to be different if things were not as expected...)
    if len(computed_tbl) != len(tbl):
        return 1
    else:
        return 0


if __name__ == "__main__":
    sys.exit(main())
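
The TODO in main() hints that the reassembly loop could simply sort the tasks by their stored row index instead of searching; a minimal sketch of that alternative (a drop-in replacement for the nested loop above):

# Sort the flow's tasks by their row index, then fetch each stored result.
computed_tbl = [e.storage.get(t.name)
                for t in sorted(f, key=lambda t: t.index)]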

Linear equation solver (explicit dependencies)

Note

Full source located at calculate_linear

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


# INTRO: In this example a linear flow is used to group four tasks to calculate
# a value. A single task class is added twice, showing how this can be done
# and how the twice-added task takes in different bound values. In the first
# case it uses default parameters ('x' and 'y') and in the second case the
# arguments are bound to the ('z', 'd') keys from the engine's internal
# storage mechanism.
#
# A multiplier task uses a binding that another task also provides, but this
# example explicitly shows that the 'z' parameter is bound to the 'a' key.
# This shows that if a task depends on a key named the same as a key provided
# by another task, the name can be remapped to take the desired key from a
# different origin.


# This task provides some values as a result of execution; this can be
# useful when you want to provide values from a static set to other tasks that
# depend on those values existing before those tasks can run.
#
# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
# that just provides those values on engine running by prepopulating the
# storage backend before your tasks are run (which accomplishes a similar goal
# in a more uniform manner).
class Provider(task.Task):

    def __init__(self, name, *args, **kwargs):
        super().__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        return self._provide


# This task adds two input variables and returns the result.
#
# Note that since this task does not have a revert() function (since addition
# is a stateless operation) there are no side-effects that this function needs
# to undo if some later operation fails.
class Adder(task.Task):
    def execute(self, x, y):
        return x + y


# This task multiplies an input variable by a multiplier and returns the
# result.
#
# Note that since this task does not have a revert() function (since
# multiplication is a stateless operation) there are no side-effects that
# this function needs to undo if some later operation fails.
class Multiplier(task.Task):
    def __init__(self, name, multiplier, provides=None, rebind=None):
        super().__init__(name=name, provides=provides,
                         rebind=rebind)
        self._multiplier = multiplier

    def execute(self, z):
        return z * self._multiplier


# Note here that the ordering is established so that the correct sequence
# of operations occurs where the adding and multiplying is done according
# to the expected and typical mathematical model. A graph flow could also be
# used here to automatically infer & ensure the correct ordering.
flow = lf.Flow('root').add(
    # Provide the initial values for other tasks to depend on.
    #
    # x = 2, y = 3, d = 5
    Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
    # z = x+y = 5
    Adder("add-1", provides='z'),
    # a = z+d = 10
    Adder("add-2", provides='a', rebind=['z', 'd']),
    # Calculate 'r = a*3 = 30'
    #
    # Note here that the 'z' argument of the execute() function will not be
    # bound to the 'z' variable provided from the above 'provider' object but
    # instead the 'z' argument will be taken from the 'a' variable provided
    # by the second adder ('add-2') listed above.
    Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
)

# The result here will be all results (from all tasks) which is stored in an
# in-memory storage location that backs this engine since it is not configured
# with persistence storage.
results = taskflow.engines.run(flow)
print(results)
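
The NOTE above deprecates the Provider pattern in favor of prepopulating the engine's storage; with that approach the same calculation (minus the Provider task) could be run roughly like this:

flow2 = lf.Flow('root-no-provider').add(
    Adder("add-1", provides='z'),
    Adder("add-2", provides='a', rebind=['z', 'd']),
    Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
)
# The initial values come straight from the store instead of a task.
results = taskflow.engines.run(flow2, store={'x': 2, 'y': 3, 'd': 5})
print(results)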

Linear equation solver (inferred dependencies)

Source: graph_flow.py

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task


# In this example there are complex *inferred* dependencies between tasks that
# are used to perform a simple set of linear equations.
#
# As you will see below the tasks just define what they require as input
# and produce as output (named values). Then the user doesn't care about
# ordering the tasks (in this case the tasks calculate pieces of the overall
# equation).
#
# As you will notice a graph flow resolves dependencies automatically using the
# tasks' symbol requirements and provided symbol values, and no ordering
# dependency has to be manually created.
#
# Also notice that flows of any type can be nested into a graph flow; showing
# that subflow dependencies (and associated ordering) will be inferred too.


class Adder(task.Task):

    def execute(self, x, y):
        return x + y


flow = gf.Flow('root').add(
    lf.Flow('nested_linear').add(
        # x2 = y3+y4 = 12
        Adder("add2", provides='x2', rebind=['y3', 'y4']),
        # x1 = y1+y2 = 4
        Adder("add1", provides='x1', rebind=['y1', 'y2'])
    ),
    # x5 = x1+x3 = 20
    Adder("add5", provides='x5', rebind=['x1', 'x3']),
    # x3 = x1+x2 = 16
    Adder("add3", provides='x3', rebind=['x1', 'x2']),
    # x4 = x2+y5 = 21
    Adder("add4", provides='x4', rebind=['x2', 'y5']),
    # x6 = x5+x4 = 41
    Adder("add6", provides='x6', rebind=['x5', 'x4']),
    # x7 = x6+x6 = 82
    Adder("add7", provides='x7', rebind=['x6', 'x6']))

# Provide the initial variable inputs using a storage dictionary.
store = {
    "y1": 1,
    "y2": 3,
    "y3": 5,
    "y4": 7,
    "y5": 9,
}

# These are the expected values that should be created.
unexpected = 0
expected = [
    ('x1', 4),
    ('x2', 12),
    ('x3', 16),
    ('x4', 21),
    ('x5', 20),
    ('x6', 41),
    ('x7', 82),
]

result = taskflow.engines.run(
    flow, engine='serial', store=store)

print("Single threaded engine result %s" % result)
for (name, value) in expected:
    actual = result.get(name)
    if actual != value:
        sys.stderr.write("{} != {}\n".format(actual, value))
        unexpected += 1

result = taskflow.engines.run(
    flow, engine='parallel', store=store)

print("Multi threaded engine result %s" % result)
for (name, value) in expected:
    actual = result.get(name)
    if actual != value:
        sys.stderr.write("{} != {}\n".format(actual, value))
        unexpected += 1

if unexpected:
    sys.exit(1)

Linear equation solver (in parallel)

Note

Full source located at calculate_in_parallel

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: These examples show how a linear flow and an unordered flow can be
# used together to execute calculations in parallel and then use the
# result for the next task/s. The adder task is used for all calculations
# and argument bindings are used to set correct parameters for each task.


# This task provides some values as a result of execution; this can be
# useful when you want to provide values from a static set to other tasks that
# depend on those values existing before those tasks can run.
#
# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
# that provides those values on engine running by prepopulating the storage
# backend before your tasks are run (which accomplishes a similar goal in a
# more uniform manner).
class Provider(task.Task):
    def __init__(self, name, *args, **kwargs):
        super().__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        return self._provide


# This task adds two input variables and returns the result of that addition.
#
# Note that since this task does not have a revert() function (since addition
# is a stateless operation) there are no side-effects that this function needs
# to undo if some later operation fails.
class Adder(task.Task):
    def execute(self, x, y):
        return x + y


flow = lf.Flow('root').add(
    # Provide the initial values for other tasks to depend on.
    #
    # x1 = 2, y1 = 3, x2 = 5, y2 = 8
    Provider("provide-adder", 2, 3, 5, 8,
             provides=('x1', 'y1', 'x2', 'y2')),
    # Note here that we define the flow that contains the 2 adders to be an
    # unordered flow since the order in which these execute does not matter;
    # another way to solve this would be to use a graph_flow pattern, which
    # also can run in parallel (since they have no ordering dependencies).
    uf.Flow('adders').add(
        # Calculate 'z1 = x1+y1 = 5'
        #
        # Rebind here means that the execute() function x argument will be
        # satisfied from a previous output named 'x1', and the y argument
        # of execute() will be populated from the previous output named 'y1'
        #
        # The output (result of adding) will be mapped into a variable named
        # 'z1' which can then be referred to and depended on by other tasks.
        Adder(name="add", provides='z1', rebind=['x1', 'y1']),
        # z2 = x2+y2 = 13
        Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
    ),
    # r = z1+z2 = 18
    Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))


# The result here will be all results (from all tasks) which is stored in an
# in-memory storage location that backs this engine since it is not configured
# with persistence storage.
result = taskflow.engines.run(flow, engine='parallel')
print(result)

Creating volumes (in parallel)

Note

Full source located at create_parallel_volume

import contextlib
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from oslo_utils import reflection

from taskflow import engines
from taskflow.listeners import printing
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: These examples show how unordered_flow can be used to create a large
# number of fake volumes in parallel (or serially, depending on a constant that
# can be easily changed).


@contextlib.contextmanager
def show_time(name):
    start = time.time()
    yield
    end = time.time()
    print(" -- {} took {:0.3f} seconds".format(name, end - start))


# This affects how many volumes to create and how much time to *simulate*
# passing for that volume to be created.
MAX_CREATE_TIME = 3
VOLUME_COUNT = 5

# This will be used to determine if all the volumes are created in parallel
# or whether the volumes are created serially (in an undefined order, since
# an unordered flow is used). Note that there is a disconnection between the
# ordering and the concept of parallelism (since unordered items can still be
# run in a serial ordering). A typical use-case for offering both is to allow
# for debugging using a serial approach, while when running at a larger scale
# one would likely want to use the parallel approach.
#
# If you switch this flag from serial to parallel you can see the overall
# time difference that this causes.
SERIAL = False
if SERIAL:
    engine = 'serial'
else:
    engine = 'parallel'


class VolumeCreator(task.Task):
    def __init__(self, volume_id):
        # Note here that the volume name is composed of the name of the class
        # along with the volume id that is being created; since the name of a
        # task uniquely identifies that task in storage it is important that
        # the name be relevant and identifiable if the task is recreated for
        # subsequent resumption (if applicable).
        #
        # UUIDs are *not* used as they can not be tied back to a previous
        # task's state on resumption (since they are unique and will vary for
        # each task that is created). A name based off the volume id that is to
        # be created is more easily tied back to the original task so that the
        # volume create can be resumed/reverted, and is much easier to use for
        # audit and tracking purposes.
        base_name = reflection.get_callable_name(self)
        super().__init__(name="{}-{}".format(base_name, volume_id))
        self._volume_id = volume_id

    def execute(self):
        print("Making volume %s" % (self._volume_id))
        time.sleep(random.random() * MAX_CREATE_TIME)
        print("Finished making volume %s" % (self._volume_id))


# Assume there is no ordering dependency between volumes.
flow = uf.Flow("volume-maker")
for i in range(0, VOLUME_COUNT):
    flow.add(VolumeCreator(volume_id="vol-%s" % (i)))


# Show how much time the overall engine loading and running takes.
with show_time(name=flow.name.title()):
    eng = engines.load(flow, engine=engine)
    # This context manager automatically adds (and automatically removes) a
    # helpful set of state transition notification printing helper utilities
    # that show you exactly what transitions the engine is going through
    # while running the various volume create tasks.
    with printing.PrintingListener(eng):
        eng.run()

Summation mapper(s) and reducer (in parallel)

Note

Full source located at simple_map_reduce

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

# INTRO: These examples show a simplistic map/reduce implementation where
# a set of mapper(s) will sum a series of input numbers (in parallel) and
# return their individual summed result. A reducer will then use those
# produced values and perform a final summation and this result will then be
# printed (and verified to ensure the calculation was as expected).

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow.patterns import unordered_flow
from taskflow import task


class SumMapper(task.Task):
    def execute(self, inputs):
        # Sums some set of provided inputs.
        return sum(inputs)


class TotalReducer(task.Task):
    def execute(self, *args, **kwargs):
        # Reduces all mapped summed outputs into a single value.
        total = 0
        for (k, v) in kwargs.items():
            # If any other kwargs were passed in, we don't want to use those
            # in the calculation of the total...
            if k.startswith('reduction_'):
                total += v
        return total


def chunk_iter(chunk_size, upperbound):
    """Yields back chunk size pieces from zero to upperbound - 1."""
    chunk = []
    for i in range(0, upperbound):
        chunk.append(i)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []


# Upper bound of numbers to sum for example purposes...
UPPER_BOUND = 10000

# How many mappers we want to have.
SPLIT = 10

# How big of a chunk we want to give each mapper.
CHUNK_SIZE = UPPER_BOUND // SPLIT

# This will be the workflow we will compose and run.
w = linear_flow.Flow("root")

# The mappers will run in parallel.
store = {}
provided = []
mappers = unordered_flow.Flow('map')
for i, chunk in enumerate(chunk_iter(CHUNK_SIZE, UPPER_BOUND)):
    mapper_name = 'mapper_%s' % i
    # Give that mapper some information to compute.
    store[mapper_name] = chunk
    # The reducer uses all of the outputs of the mappers, so it needs
    # to be recorded that it needs access to them (under a specific name).
    provided.append("reduction_%s" % i)
    mappers.add(SumMapper(name=mapper_name,
                          rebind={'inputs': mapper_name},
                          provides=provided[-1]))
w.add(mappers)

# The reducer will run last (after all the mappers).
w.add(TotalReducer('reducer', requires=provided))

# Now go!
e = engines.load(w, engine='parallel', store=store, max_workers=4)
print("Running a parallel engine with options: %s" % e.options)
e.run()

# Now get the result the reducer created.
total = e.storage.get('reducer')
print("Calculated result = %s" % total)

# Calculate it manually to verify that it worked...
calc_total = sum(range(0, UPPER_BOUND))
if calc_total != total:
    sys.exit(1)

Sharing a thread pool executor (in parallel)

Note

Full source located at share_engine_thread

import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import futurist

from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.utils import threading_utils as tu

# INTRO: in this example we create 2 dummy flow(s), each with 2 dummy task(s),
# and run them using a shared thread pool executor to show how a single
# executor can be used with more than one engine (sharing the execution thread
# pool between them); this allows for saving resources and reusing threads in
# situations where this is beneficial.


class DelayedTask(task.Task):
    def __init__(self, name):
        super().__init__(name=name)
        self._wait_for = random.random()

    def execute(self):
        print("Running '{}' in thread '{}'".format(self.name, tu.get_ident()))
        time.sleep(self._wait_for)


f1 = uf.Flow("f1")
f1.add(DelayedTask("f1-1"))
f1.add(DelayedTask("f1-2"))

f2 = uf.Flow("f2")
f2.add(DelayedTask("f2-1"))
f2.add(DelayedTask("f2-2"))

# Run them all using the same futures (thread-pool based) executor...
with futurist.ThreadPoolExecutor() as ex:
    e1 = engines.load(f1, engine='parallel', executor=ex)
    e2 = engines.load(f2, engine='parallel', executor=ex)
    iters = [e1.run_iter(), e2.run_iter()]
    # Iterate over a copy (so we can remove from the source list).
    cloned_iters = list(iters)
    while iters:
        # Run a single 'step' of each iterator, forcing each engine to perform
        # some work, then yield, and repeat until each iterator is consumed
        # and there is no more engine work to be done.
        for it in cloned_iters:
            try:
                next(it)
            except StopIteration:
                try:
                    iters.remove(it)
                except ValueError:
                    pass

Storing & emitting a bill

Note

Full source located at fake_billing

  1import logging
  2import os
  3import sys
  4import time
  5
  6logging.basicConfig(level=logging.ERROR)
  7
  8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                       os.pardir,
 10                                       os.pardir))
 11sys.path.insert(0, top_dir)
 12
 13from oslo_utils import uuidutils
 14
 15from taskflow import engines
 16from taskflow.listeners import printing
 17from taskflow.patterns import graph_flow as gf
 18from taskflow.patterns import linear_flow as lf
 19from taskflow import task
 20from taskflow.utils import misc
 21
 22# INTRO: This example walks through a miniature workflow which simulates
 23# the reception of an API request, creation of a database entry, driver
 24# activation (which invokes a 'fake' webservice) and final completion.
 25#
 26# This example also shows how a function/object (in this class the url sending)
 27# that occurs during driver activation can update the progress of a task
 28# without being aware of the internals of how to do this by associating a
 29# callback that the url sending can update as the sending progresses from 0.0%
 30# complete to 100% complete.
 31
 32
 33class DB:
 34    def query(self, sql):
 35        print("Querying with: %s" % (sql))
 36
 37
 38class UrlCaller:
 39    def __init__(self):
 40        self._send_time = 0.5
 41        self._chunks = 25
 42
 43    def send(self, url, data, status_cb=None):
 44        sleep_time = float(self._send_time) / self._chunks
 45        for i in range(0, len(data)):
 46            time.sleep(sleep_time)
 47            # As we send the data, each chunk we 'fake' send will progress
 48            # the sending progress that much further to 100%.
 49            if status_cb:
 50                status_cb(float(i) / len(data))
 51
 52
 53# Since engines save the output of tasks to an optional persistent storage
 54# backend, resources have to be dealt with in a slightly different manner since
 55# resources are transient and can *not* be persisted (or serialized). For tasks
 56# that require access to a set of resources it is a common pattern to provide
 57# an object (in this case this object) on construction of those tasks via the
 58# task constructor.
 59class ResourceFetcher:
 60    def __init__(self):
 61        self._db_handle = None
 62        self._url_handle = None
 63
 64    @property
 65    def db_handle(self):
 66        if self._db_handle is None:
 67            self._db_handle = DB()
 68        return self._db_handle
 69
 70    @property
 71    def url_handle(self):
 72        if self._url_handle is None:
 73            self._url_handle = UrlCaller()
 74        return self._url_handle
 75
 76
 77class ExtractInputRequest(task.Task):
 78    def __init__(self, resources):
 79        super().__init__(provides="parsed_request")
 80        self._resources = resources
 81
 82    def execute(self, request):
 83        return {
 84            'user': request.user,
 85            'user_id': misc.as_int(request.id),
 86            'request_id': uuidutils.generate_uuid(),
 87        }
 88
 89
 90class MakeDBEntry(task.Task):
 91    def __init__(self, resources):
 92        super().__init__()
 93        self._resources = resources
 94
 95    def execute(self, parsed_request):
 96        db_handle = self._resources.db_handle
 97        db_handle.query("INSERT %s INTO mydb" % (parsed_request))
 98
 99    def revert(self, result, parsed_request):
100        db_handle = self._resources.db_handle
101        db_handle.query("DELETE %s FROM mydb IF EXISTS" % (parsed_request))
102
103
104class ActivateDriver(task.Task):
105    def __init__(self, resources):
106        super().__init__(provides='sent_to')
107        self._resources = resources
108        self._url = "http://blahblah.com"
109
110    def execute(self, parsed_request):
111        print("Sending billing data to %s" % (self._url))
112        url_sender = self._resources.url_handle
113        # Note that here we attach our update_progress function (which is a
114        # function that the engine also 'binds' to) to the progress function
115        # that the url sending helper class uses. This allows the task progress
116        # to be tied to the url sending progress, which is very useful for
117        # downstream systems to be aware of what a task is doing at any time.
118        url_sender.send(self._url, json.dumps(parsed_request),
119                        status_cb=self.update_progress)
120        return self._url
121
122    def update_progress(self, progress, **kwargs):
123        # Override the parent method to also print out the status.
124        super().update_progress(progress, **kwargs)
125        print("{} is {:0.2f}% done".format(self.name, progress * 100))
126
127
128class DeclareSuccess(task.Task):
129    def execute(self, sent_to):
130        print("Done!")
131        print("All data processed and sent to %s" % (sent_to))
132
133
134class DummyUser:
135    def __init__(self, user, id_):
136        self.user = user
137        self.id = id_
138
139
140# Resources (db handles and similar) of course can *not* be persisted so we
141# need to make sure that we pass this resource fetcher to each task's constructor
142# so that the tasks have access to any needed resources (the resources are
143# lazily loaded so that they are only created when they are used).
144resources = ResourceFetcher()
145flow = lf.Flow("initialize-me")
146
147# 1. First we extract the api request into a usable format.
148# 2. Then we go ahead and make a database entry for our request.
149flow.add(ExtractInputRequest(resources), MakeDBEntry(resources))
150
151# 3. Then we activate our payment method and finally declare success.
152sub_flow = gf.Flow("after-initialize")
153sub_flow.add(ActivateDriver(resources), DeclareSuccess())
154flow.add(sub_flow)
155
156# Initially populate the storage with the following request object,
157# prepopulating this allows the tasks that depend on the 'request' variable
158# to start processing (in this case this is the ExtractInputRequest task).
159store = {
160    'request': DummyUser(user="bob", id_="1.35"),
161}
162eng = engines.load(flow, engine='serial', store=store)
163
164# This context manager automatically adds (and automatically removes) a
165# helpful set of state transition notification printing helper utilities
166# that show you exactly what transitions the engine is going through
167# while running the various billing related tasks.
168with printing.PrintingListener(eng):
169    eng.run()
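
The "resource fetcher" pattern used above is worth highlighting on its own: because engines may persist task results, live handles (database connections, HTTP clients and so on) are instead passed into each task's constructor and created lazily on first use. A minimal sketch of just that pattern (the class and task names below are illustrative, not part of the example):

from taskflow import task


class LazyResources:
    """Holds non-serializable handles, creating them only on first use."""

    def __init__(self):
        self._conn = None

    @property
    def conn(self):
        if self._conn is None:
            self._conn = object()  # stand-in for a real connection/handle
        return self._conn


class UsesResources(task.Task):
    def __init__(self, resources, **kwargs):
        super().__init__(**kwargs)
        # Only task *results* are persisted by the engine; this handle
        # container simply lives on the task instance for the process lifetime.
        self._resources = resources

    def execute(self):
        print("Using handle: %r" % self._resources.conn)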

Suspending a workflow and resuming

Note

Full source located at resume_from_backend

  1import contextlib
  2import logging
  3import os
  4import sys
  5logging.basicConfig(level=logging.ERROR)
  6
  7self_dir = os.path.abspath(os.path.dirname(__file__))
  8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                       os.pardir,
 10                                       os.pardir))
 11sys.path.insert(0, top_dir)
 12sys.path.insert(0, self_dir)
 13
 14from oslo_utils import uuidutils
 15
 16import taskflow.engines
 17from taskflow.patterns import linear_flow as lf
 18from taskflow.persistence import models
 19from taskflow import task
 20
 21import example_utils as eu  # noqa
 22
 23# INTRO: In this example linear_flow is used to group three tasks, one which
 24# will suspend the future work the engine may do. This suspended engine is then
 25# discarded and the workflow is reloaded from the persisted data and then the
 26# workflow is resumed from where it was suspended. This allows you to see how
 27# to start an engine, have a task stop the engine from doing future work (if
 28# a multi-threaded engine is being used, then the currently active work is not
 29# preempted) and then resume the work later.
 30#
 31# Usage:
 32#
 33#   With a filesystem directory as backend
 34#
 35#     python taskflow/examples/resume_from_backend.py
 36#
 37#   With ZooKeeper as backend
 38#
 39#     python taskflow/examples/resume_from_backend.py \
 40#       zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/
 41
 42
 43# UTILITY FUNCTIONS #########################################
 44
 45
 46def print_task_states(flowdetail, msg):
 47    eu.print_wrapped(msg)
 48    print("Flow '{}' state: {}".format(flowdetail.name, flowdetail.state))
 49    # Sort by these so that our test validation doesn't get confused by the
 50    # order in which the items in the flow detail appear.
 51    items = sorted((td.name, td.version, td.state, td.results)
 52                   for td in flowdetail)
 53    for item in items:
 54        print(" %s==%s: %s, result=%s" % item)
 55
 56
 57def find_flow_detail(backend, lb_id, fd_id):
 58    conn = backend.get_connection()
 59    lb = conn.get_logbook(lb_id)
 60    return lb.find(fd_id)
 61
 62
 63# CREATE FLOW ###############################################
 64
 65
 66class InterruptTask(task.Task):
 67    def execute(self):
 68        # DO NOT TRY THIS AT HOME
 69        engine.suspend()
 70
 71
 72class TestTask(task.Task):
 73    def execute(self):
 74        print('executing %s' % self)
 75        return 'ok'
 76
 77
 78def flow_factory():
 79    return lf.Flow('resume from backend example').add(
 80        TestTask(name='first'),
 81        InterruptTask(name='boom'),
 82        TestTask(name='second'))
 83
 84
 85# INITIALIZE PERSISTENCE ####################################
 86
 87with eu.get_backend() as backend:
 88
 89    # Create a place where the persistence information will be stored.
 90    book = models.LogBook("example")
 91    flow_detail = models.FlowDetail("resume from backend example",
 92                                    uuid=uuidutils.generate_uuid())
 93    book.add(flow_detail)
 94    with contextlib.closing(backend.get_connection()) as conn:
 95        conn.save_logbook(book)
 96
 97    # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################
 98
 99    flow = flow_factory()
100    engine = taskflow.engines.load(flow, flow_detail=flow_detail,
101                                   book=book, backend=backend)
102
103    print_task_states(flow_detail, "At the beginning, there is no state")
104    eu.print_wrapped("Running")
105    engine.run()
106    print_task_states(flow_detail, "After running")
107
108    # RE-CREATE, RESUME, RUN ####################################
109
110    eu.print_wrapped("Resuming and running again")
111
112    # NOTE(harlowja): reload the flow detail from backend, this will allow us
113    # to resume the flow from its suspended state, but first we need to search
114    # for the right flow details in the correct logbook where things are
115    # stored.
116    #
117    # We could avoid re-loading the engine and just do engine.run() again, but
118    # this example shows how another process may unsuspend a given flow and
119    # start it again for situations where this is useful to do (say the process
120    # running the above flow crashes).
121    flow2 = flow_factory()
122    flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
123    engine2 = taskflow.engines.load(flow2,
124                                    flow_detail=flow_detail_2,
125                                    backend=backend, book=book)
126    engine2.run()
127    print_task_states(flow_detail_2, "At the end")
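
The example above leans on example_utils.get_backend() to build its persistence backend. As a rough sketch only (the sqlite URI is an assumption; the real helper also accepts a ZooKeeper URI from the command line), such a helper typically amounts to:

import contextlib

from taskflow.persistence import backends


def get_backend(uri="sqlite:////tmp/taskflow.db"):
    # Fetch a backend for the given connection URI and make sure its
    # schema exists before handing it back to the caller.
    backend = backends.fetch({'connection': uri})
    with contextlib.closing(backend.get_connection()) as conn:
        conn.upgrade()
    return backend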

Creating a virtual machine (resumable)

Note

Full source located at resume_vm_boot

  1import contextlib
  2import hashlib
  3import logging
  4import os
  5import random
  6import sys
  7import time
  8logging.basicConfig(level=logging.ERROR)
  9
 10self_dir = os.path.abspath(os.path.dirname(__file__))
 11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 12                                       os.pardir,
 13                                       os.pardir))
 14sys.path.insert(0, top_dir)
 15sys.path.insert(0, self_dir)
 16
 17import futurist
 18from oslo_utils import uuidutils
 19
 20from taskflow import engines
 21from taskflow import exceptions as exc
 22from taskflow.patterns import graph_flow as gf
 23from taskflow.patterns import linear_flow as lf
 24from taskflow.persistence import models
 25from taskflow import task
 26
 27import example_utils as eu  # noqa
 28
 29# INTRO: These examples show how a hierarchy of flows can be used to create a
 30# vm in a reliable & resumable manner using taskflow + a miniature version of
 31# what nova does while booting a vm.
 32
 33
 34@contextlib.contextmanager
 35def slow_down(how_long=0.5):
 36    try:
 37        yield how_long
 38    finally:
 39        if len(sys.argv) > 1:
 40            # Only bother to do this if user input was provided.
 41            print("** Ctrl-c me please!!! **")
 42            time.sleep(how_long)
 43
 44
 45class PrintText(task.Task):
 46    """Just inserts some text print outs in a workflow."""
 47    def __init__(self, print_what, no_slow=False):
 48        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
 49        super().__init__(name="Print: %s" % (content_hash))
 50        self._text = print_what
 51        self._no_slow = no_slow
 52
 53    def execute(self):
 54        if self._no_slow:
 55            eu.print_wrapped(self._text)
 56        else:
 57            with slow_down():
 58                eu.print_wrapped(self._text)
 59
 60
 61class DefineVMSpec(task.Task):
 62    """Defines a vm specification to be."""
 63    def __init__(self, name):
 64        super().__init__(provides='vm_spec', name=name)
 65
 66    def execute(self):
 67        return {
 68            'type': 'kvm',
 69            'disks': 2,
 70            'vcpu': 1,
 71            'ips': 1,
 72            'volumes': 3,
 73        }
 74
 75
 76class LocateImages(task.Task):
 77    """Locates where the vm images are."""
 78    def __init__(self, name):
 79        super().__init__(provides='image_locations', name=name)
 80
 81    def execute(self, vm_spec):
 82        image_locations = {}
 83        for i in range(0, vm_spec['disks']):
 84            url = "http://www.yahoo.com/images/%s" % (i)
 85            image_locations[url] = "/tmp/%s.img" % (i)
 86        return image_locations
 87
 88
 89class DownloadImages(task.Task):
 90    """Downloads all the vm images."""
 91    def __init__(self, name):
 92        super().__init__(provides='download_paths',
 93                         name=name)
 94
 95    def execute(self, image_locations):
 96        for src, loc in image_locations.items():
 97            with slow_down(1):
 98                print("Downloading from {} => {}".format(src, loc))
 99        return sorted(image_locations.values())
100
101
102class CreateNetworkTpl(task.Task):
103    """Generates the network settings file to be placed in the images."""
104    SYSCONFIG_CONTENTS = """DEVICE=eth%s
105BOOTPROTO=static
106IPADDR=%s
107ONBOOT=yes"""
108
109    def __init__(self, name):
110        super().__init__(provides='network_settings',
111                         name=name)
112
113    def execute(self, ips):
114        settings = []
115        for i, ip in enumerate(ips):
116            settings.append(self.SYSCONFIG_CONTENTS % (i, ip))
117        return settings
118
119
120class AllocateIP(task.Task):
121    """Allocates the ips for the given vm."""
122    def __init__(self, name):
123        super().__init__(provides='ips', name=name)
124
125    def execute(self, vm_spec):
126        ips = []
127        for _i in range(0, vm_spec.get('ips', 0)):
128            ips.append("192.168.0.%s" % (random.randint(1, 254)))
129        return ips
130
131
132class WriteNetworkSettings(task.Task):
133    """Writes all the network settings into the downloaded images."""
134    def execute(self, download_paths, network_settings):
135        for j, path in enumerate(download_paths):
136            with slow_down(1):
137                print("Mounting {} to /tmp/{}".format(path, j))
138            for i, setting in enumerate(network_settings):
139                filename = ("/tmp/etc/sysconfig/network-scripts/"
140                            "ifcfg-eth%s" % (i))
141                with slow_down(1):
142                    print("Writing to %s" % (filename))
143                    print(setting)
144
145
146class BootVM(task.Task):
147    """Fires off the vm boot operation."""
148    def execute(self, vm_spec):
149        print("Starting vm!")
150        with slow_down(1):
151            print("Created: %s" % (vm_spec))
152
153
154class AllocateVolumes(task.Task):
155    """Allocates the volumes for the vm."""
156    def execute(self, vm_spec):
157        volumes = []
158        for i in range(0, vm_spec['volumes']):
159            with slow_down(1):
160                volumes.append("/dev/vda%s" % (i + 1))
161                print("Allocated volume %s" % volumes[-1])
162        return volumes
163
164
165class FormatVolumes(task.Task):
166    """Formats the volumes for the vm."""
167    def execute(self, volumes):
168        for v in volumes:
169            print("Formatting volume %s" % v)
170            with slow_down(1):
171                pass
172            print("Formatted volume %s" % v)
173
174
175def create_flow():
176    # Setup the set of things to do (mini-nova).
177    flow = lf.Flow("root").add(
178        PrintText("Starting vm creation.", no_slow=True),
179        lf.Flow('vm-maker').add(
180            # First create a specification for the final vm to-be.
181            DefineVMSpec("define_spec"),
182            # This does all the image stuff.
183            gf.Flow("img-maker").add(
184                LocateImages("locate_images"),
185                DownloadImages("download_images"),
186            ),
187            # This does all the network stuff.
188            gf.Flow("net-maker").add(
189                AllocateIP("get_my_ips"),
190                CreateNetworkTpl("fetch_net_settings"),
191                WriteNetworkSettings("write_net_settings"),
192            ),
193            # This does all the volume stuff.
194            gf.Flow("volume-maker").add(
195                AllocateVolumes("allocate_my_volumes", provides='volumes'),
196                FormatVolumes("volume_formatter"),
197            ),
198            # Finally boot it all.
199            BootVM("boot-it"),
200        ),
201        # Ya it worked!
202        PrintText("Finished vm create.", no_slow=True),
203        PrintText("Instance is running!", no_slow=True))
204    return flow
205
206eu.print_wrapped("Initializing")
207
208# Setup the persistence & resumption layer.
209with eu.get_backend() as backend:
210
211    # Try to find a previously passed in tracking id...
212    try:
213        book_id, flow_id = sys.argv[2].split("+", 1)
214        if not uuidutils.is_uuid_like(book_id):
215            book_id = None
216        if not uuidutils.is_uuid_like(flow_id):
217            flow_id = None
218    except (IndexError, ValueError):
219        book_id = None
220        flow_id = None
221
222    # Set up how we want our engine to run, serial, parallel...
223    try:
224        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
225    except RuntimeError:
226        # No eventlet installed, just let the default be used instead.
227        executor = None
228
229    # Create/fetch a logbook that will track the workflow's work.
230    book = None
231    flow_detail = None
232    if all([book_id, flow_id]):
233        # Try to find in a prior logbook and flow detail...
234        with contextlib.closing(backend.get_connection()) as conn:
235            try:
236                book = conn.get_logbook(book_id)
237                flow_detail = book.find(flow_id)
238            except exc.NotFound:
239                pass
240    if book is None and flow_detail is None:
241        book = models.LogBook("vm-boot")
242        with contextlib.closing(backend.get_connection()) as conn:
243            conn.save_logbook(book)
244        engine = engines.load_from_factory(create_flow,
245                                           backend=backend, book=book,
246                                           engine='parallel',
247                                           executor=executor)
248        print("!! Your tracking id is: '{}+{}'".format(
249            book.uuid, engine.storage.flow_uuid))
250        print("!! Please submit this on later runs for tracking purposes")
251    else:
252        # Attempt to load from a previously partially completed flow.
253        engine = engines.load_from_detail(flow_detail, backend=backend,
254                                          engine='parallel', executor=executor)
255
256    # Make me my vm please!
257    eu.print_wrapped('Running')
258    engine.run()
259
260# How to use.
261#
262# 1. $ python me.py "sqlite:////tmp/nova.db"
263# 2. ctrl-c before this finishes
264# 3. Find the tracking id (search for 'Your tracking id is')
265# 4. $ python me.py "sqlite:////tmp/nova.db" "$tracking_id"
266# 5. Watch it pick up where it left off.
267# 6. Profit!

Creating a volume (resumable)

Note

Full source located at resume_volume_create

  1import contextlib
  2import hashlib
  3import logging
  4import os
  5import random
  6import sys
  7import time
  8logging.basicConfig(level=logging.ERROR)
  9
 10self_dir = os.path.abspath(os.path.dirname(__file__))
 11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 12                                       os.pardir,
 13                                       os.pardir))
 14sys.path.insert(0, top_dir)
 15sys.path.insert(0, self_dir)
 16
 17from oslo_utils import uuidutils
 18
 19from taskflow import engines
 20from taskflow.patterns import graph_flow as gf
 21from taskflow.patterns import linear_flow as lf
 22from taskflow.persistence import models
 23from taskflow import task
 24
 25import example_utils  # noqa
 26
 27# INTRO: These examples show how a hierarchy of flows can be used to create a
 28# pseudo-volume in a reliable & resumable manner using taskflow + a miniature
 29# version of what cinder does while creating a volume (very miniature).
 30
 31
 32@contextlib.contextmanager
 33def slow_down(how_long=0.5):
 34    try:
 35        yield how_long
 36    finally:
 37        print("** Ctrl-c me please!!! **")
 38        time.sleep(how_long)
 39
 40
 41def find_flow_detail(backend, book_id, flow_id):
 42    # NOTE(harlowja): this is used to attempt to find a given logbook with
 43    # a given id and a given flow detail inside that logbook; we need this
 44    # reference so that we can resume the correct flow (as a logbook tracks
 45    # flows and a flow detail tracks an individual flow).
 46    #
 47    # Without a reference to the logbook and the flow details in that logbook
 48    # we will not know exactly what we should resume and that would mean we
 49    # can't resume what we don't know.
 50    with contextlib.closing(backend.get_connection()) as conn:
 51        lb = conn.get_logbook(book_id)
 52        return lb.find(flow_id)
 53
 54
 55class PrintText(task.Task):
 56    def __init__(self, print_what, no_slow=False):
 57        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
 58        super().__init__(name="Print: %s" % (content_hash))
 59        self._text = print_what
 60        self._no_slow = no_slow
 61
 62    def execute(self):
 63        if self._no_slow:
 64            print("-" * (len(self._text)))
 65            print(self._text)
 66            print("-" * (len(self._text)))
 67        else:
 68            with slow_down():
 69                print("-" * (len(self._text)))
 70                print(self._text)
 71                print("-" * (len(self._text)))
 72
 73
 74class CreateSpecForVolumes(task.Task):
 75    def execute(self):
 76        volumes = []
 77        for i in range(0, random.randint(1, 10)):
 78            volumes.append({
 79                'type': 'disk',
 80                'location': "/dev/vda%s" % (i + 1),
 81            })
 82        return volumes
 83
 84
 85class PrepareVolumes(task.Task):
 86    def execute(self, volume_specs):
 87        for v in volume_specs:
 88            with slow_down():
 89                print("Dusting off your hard drive %s" % (v))
 90            with slow_down():
 91                print("Taking a well deserved break.")
 92            print("Your drive %s has been certified." % (v))
 93
 94
 95# Setup the set of things to do (mini-cinder).
 96flow = lf.Flow("root").add(
 97    PrintText("Starting volume create", no_slow=True),
 98    gf.Flow('maker').add(
 99        CreateSpecForVolumes("volume_specs", provides='volume_specs'),
100        PrintText("I need a nap, it took me a while to build those specs."),
101        PrepareVolumes(),
102    ),
103    PrintText("Finished volume create", no_slow=True))
104
105# Setup the persistence & resumption layer.
106with example_utils.get_backend() as backend:
107    try:
108        book_id, flow_id = sys.argv[2].split("+", 1)
109    except (IndexError, ValueError):
110        book_id = None
111        flow_id = None
112
113    if not all([book_id, flow_id]):
114        # If no 'tracking id' (think a fedex or ups tracking id) is provided
115        # then we create one by creating a logbook (where flow details are
116        # stored) and creating a flow detail (where flow and task state is
117        # stored). The combination of these 2 objects' unique ids (uuids) allows
118        # the users of taskflow to reassociate the workflows that were
119        # potentially running (and which may have partially completed) back
120        # with taskflow so that those workflows can be resumed (or reverted)
121        # after a process/thread/engine has failed in some way.
122        book = models.LogBook('resume-volume-create')
123        flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
124        book.add(flow_detail)
125        with contextlib.closing(backend.get_connection()) as conn:
126            conn.save_logbook(book)
127        print("!! Your tracking id is: '{}+{}'".format(book.uuid,
128                                                       flow_detail.uuid))
129        print("!! Please submit this on later runs for tracking purposes")
130    else:
131        flow_detail = find_flow_detail(backend, book_id, flow_id)
132
133    # Load and run.
134    engine = engines.load(flow,
135                          flow_detail=flow_detail,
136                          backend=backend, engine='serial')
137    engine.run()
138
139# How to use.
140#
141# 1. $ python me.py "sqlite:////tmp/cinder.db"
142# 2. ctrl-c before this finishes
143# 3. Find the tracking id (search for 'Your tracking id is')
144# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
145# 5. Profit!

Running engines via iteration

Note

Full source located at run_by_iter

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6self_dir = os.path.abspath(os.path.dirname(__file__))
 7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 8                                       os.pardir,
 9                                       os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13
14from taskflow import engines
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17
18
19# INTRO: This example shows how to run a set of flows at the same time, each
20# flow in its own engine, using a single thread of control to iterate over
21# each engine (which causes that engine to advance to its next state during
22# each iteration).
23
24
25class EchoTask(task.Task):
26    def execute(self, value):
27        print(value)
28        return chr(ord(value) + 1)
29
30
31def make_alphabet_flow(i):
32    f = lf.Flow("alphabet_%s" % (i))
33    start_value = 'A'
34    end_value = 'Z'
35    curr_value = start_value
36    while ord(curr_value) <= ord(end_value):
37        next_value = chr(ord(curr_value) + 1)
38        if curr_value != end_value:
39            f.add(EchoTask(name="echoer_%s" % curr_value,
40                           rebind={'value': curr_value},
41                           provides=next_value))
42        else:
43            f.add(EchoTask(name="echoer_%s" % curr_value,
44                           rebind={'value': curr_value}))
45        curr_value = next_value
46    return f
47
48
49# Adjust this number to change how many engines/flows run at once.
50flow_count = 1
51flows = []
52for i in range(0, flow_count):
53    f = make_alphabet_flow(i + 1)
54    flows.append(make_alphabet_flow(i + 1))
55engine_iters = []
56for f in flows:
57    e = engines.load(f)
58    e.compile()
59    e.storage.inject({'A': 'A'})
60    e.prepare()
61    engine_iters.append(e.run_iter())
62while engine_iters:
63    for it in list(engine_iters):
64        try:
65            print(next(it))
66        except StopIteration:
67            engine_iters.remove(it)
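
What makes this interleaving possible is that run_iter() is a generator that yields the state transitions an engine moves through instead of blocking until completion. A tiny sketch of inspecting those yielded values on a single engine (the flow and task names below are made up for illustration):

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Noop(task.Task):
    def execute(self):
        pass


engine = engines.load(lf.Flow('just-one-task').add(Noop()))
for state in engine.run_iter():
    # Each yielded value is the state the engine reached on that step.
    print("Engine transitioned to: %s" % state)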

Controlling retries using a retry controller

Note

Full source located at retry_flow

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 7                                       os.pardir,
 8                                       os.pardir))
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import retry
14from taskflow import task
15
16# INTRO: In this example we create a retry controller that receives a phone
17# directory and tries different phone numbers. The next task tries to call Jim
18# using the given number. If it is not a Jim's number, the task raises an
19# exception and retry controller takes the next number from the phone
20# directory and retries the call.
21#
22# This example shows a basic usage of retry controllers in a flow.
23# Retry controllers allow reverting and retrying a failed subflow with new
24# parameters.
25
26
27class CallJim(task.Task):
28    def execute(self, jim_number):
29        print("Calling jim %s." % jim_number)
30        if jim_number != 555:
31            raise Exception("Wrong number!")
32        else:
33            print("Hello Jim!")
34
35    def revert(self, jim_number, **kwargs):
36        print("Wrong number, apologizing.")
37
38
39# Create your flow and associated tasks (the work to be done).
40flow = lf.Flow('retrying-linear',
41               retry=retry.ParameterizedForEach(
42                   rebind=['phone_directory'],
43                   provides='jim_number')).add(CallJim())
44
45# Now run that flow using the provided initial data (store below).
46taskflow.engines.run(flow, store={'phone_directory': [333, 444, 555, 666]})
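
ParameterizedForEach is only one of the available retry controllers. As a brief sketch, retry.Times simply re-runs its subflow up to a fixed number of attempts without supplying new values (the flaky task below is invented for illustration; the final failure is re-raised by the engine once the attempts are exhausted):

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import retry
from taskflow import task


class FlakyTask(task.Task):
    def execute(self):
        raise IOError("pretend the remote side is temporarily unavailable")


flow = lf.Flow('retrying-three-times',
               retry=retry.Times(3)).add(FlakyTask())
try:
    taskflow.engines.run(flow)
except IOError:
    print("Still failing after the allowed number of attempts.")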

Distributed execution (simple)

Note

Full source located at wbe_simple_linear

  1import json
  2import logging
  3import os
  4import sys
  5import tempfile
  6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  7                                       os.pardir,
  8                                       os.pardir))
  9sys.path.insert(0, top_dir)
 10
 11from taskflow import engines
 12from taskflow.engines.worker_based import worker
 13from taskflow.patterns import linear_flow as lf
 14from taskflow.tests import utils
 15from taskflow.utils import threading_utils
 16
 17import example_utils  # noqa
 18
 19# INTRO: This example walks through a miniature workflow which shows how to
 20# start up a number of workers (these workers will process task execution and
 21# reversion requests using any provided input data) and then use an engine
 22# that creates a set of *capable* tasks and flows (the engine can not create
 23# tasks that the workers are not able to run; that would end in failure) that
 24# those workers will run, and then executes that workflow seamlessly using the
 25# workers to perform the actual execution.
 26#
 27# NOTE(harlowja): this example simulates the expected larger number of workers
 28# by using a set of threads (which in this example simulate the remote workers
 29# that would typically be running on other external machines).
 30
 31# A filesystem can also be used as the queue transport (useful as a simple
 32# transport type that does not involve setting up a larger mq system). If this
 33# is false then the memory transport is used instead, both work in standalone
 34# setups.
 35USE_FILESYSTEM = False
 36BASE_SHARED_CONF = {
 37    'exchange': 'taskflow',
 38}
 39
 40# Until https://github.com/celery/kombu/issues/398 is resolved it is not
 41# recommended to run many worker threads in this example due to the types
 42# of errors mentioned in that issue.
 43MEMORY_WORKERS = 2
 44FILE_WORKERS = 1
 45WORKER_CONF = {
 46    # These are the tasks the worker can execute, they *must* be importable,
 47    # typically this list is used to restrict what workers may execute to
 48    # a smaller set of *allowed* tasks that are known to be safe (one would
 49    # not want to allow all python code to be executed).
 50    'tasks': [
 51        'taskflow.tests.utils:TaskOneArgOneReturn',
 52        'taskflow.tests.utils:TaskMultiArgOneReturn'
 53    ],
 54}
 55
 56
 57def run(engine_options):
 58    flow = lf.Flow('simple-linear').add(
 59        utils.TaskOneArgOneReturn(provides='result1'),
 60        utils.TaskMultiArgOneReturn(provides='result2')
 61    )
 62    eng = engines.load(flow,
 63                       store=dict(x=111, y=222, z=333),
 64                       engine='worker-based', **engine_options)
 65    eng.run()
 66    return eng.storage.fetch_all()
 67
 68
 69if __name__ == "__main__":
 70    logging.basicConfig(level=logging.ERROR)
 71
 72    # Setup our transport configuration and merge it into the worker and
 73    # engine configuration so that both of those use it correctly.
 74    shared_conf = dict(BASE_SHARED_CONF)
 75
 76    tmp_path = None
 77    if USE_FILESYSTEM:
 78        worker_count = FILE_WORKERS
 79        tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
 80        shared_conf.update({
 81            'transport': 'filesystem',
 82            'transport_options': {
 83                'data_folder_in': tmp_path,
 84                'data_folder_out': tmp_path,
 85                'polling_interval': 0.1,
 86            },
 87        })
 88    else:
 89        worker_count = MEMORY_WORKERS
 90        shared_conf.update({
 91            'transport': 'memory',
 92            'transport_options': {
 93                'polling_interval': 0.1,
 94            },
 95        })
 96    worker_conf = dict(WORKER_CONF)
 97    worker_conf.update(shared_conf)
 98    engine_options = dict(shared_conf)
 99    workers = []
100    worker_topics = []
101
102    try:
103        # Create a set of workers to simulate actual remote workers.
104        print('Running %s workers.' % (worker_count))
105        for i in range(0, worker_count):
106            worker_conf['topic'] = 'worker-%s' % (i + 1)
107            worker_topics.append(worker_conf['topic'])
108            w = worker.Worker(**worker_conf)
109            runner = threading_utils.daemon_thread(w.run)
110            runner.start()
111            w.wait()
112            workers.append((runner, w.stop))
113
114        # Now use those workers to do something.
115        print('Executing some work.')
116        engine_options['topics'] = worker_topics
117        result = run(engine_options)
118        print('Execution finished.')
119        # This is done so that the test examples can work correctly
120        # even when the keys change order (which will happen in various
121        # python versions).
122        print("Result = %s" % json.dumps(result, sort_keys=True))
123    finally:
124        # And cleanup.
125        print('Stopping workers.')
126        while workers:
127            r, stopper = workers.pop()
128            stopper()
129            r.join()
130        if tmp_path:
131            example_utils.rm_path(tmp_path)
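
In a real deployment the workers above would not be threads inside the same process; each would be its own process on its own machine, pointed at a shared message broker. A rough sketch of such a standalone worker script (the broker transport and hostname below are assumptions, not values from the example):

from taskflow.engines.worker_based import worker

if __name__ == "__main__":
    w = worker.Worker(
        exchange='taskflow',
        topic='worker-1',
        # Only these importable tasks may be executed by this worker.
        tasks=['taskflow.tests.utils:TaskOneArgOneReturn',
               'taskflow.tests.utils:TaskMultiArgOneReturn'],
        transport='pyamqp',                           # assumed broker transport
        transport_options={'hostname': 'localhost'},  # assumed broker host
    )
    w.run()  # blocks, serving task execution/reversion requests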

Distributed notification (simple)

Note

Full source located at wbe_event_sender

  1import logging
  2import os
  3import string
  4import sys
  5import time
  6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  7                                       os.pardir,
  8                                       os.pardir))
  9sys.path.insert(0, top_dir)
 10
 11from taskflow import engines
 12from taskflow.engines.worker_based import worker
 13from taskflow.patterns import linear_flow as lf
 14from taskflow import task
 15from taskflow.types import notifier
 16from taskflow.utils import threading_utils
 17
 18ANY = notifier.Notifier.ANY
 19
 20# INTRO: These examples show how to use a remote worker's event notification
 21# attribute to proxy back task event notifications to the controlling process.
 22#
 23# In this case a simple set of events is triggered by a worker running a
 24# task (simulated to be remote by using a kombu memory transport and threads).
 25# Those events that the 'remote worker' produces will then be proxied back to
 26# the task that the engine is running 'remotely', and then they will be emitted
 27# back to the original callbacks that exist in the originating engine
 28# process/thread. This creates a one-way *notification* channel that can
 29# transparently be used in-process, outside-of-process using remote workers and
 30# so-on that allows tasks to signal to their controlling process some sort of
 31# action that has occurred that the task may need to tell others about (for
 32# example to trigger some type of response when the task reaches 50% done...).
 33
 34
 35def event_receiver(event_type, details):
 36    """This is the callback that (in this example) doesn't do much..."""
 37    print("Recieved event '%s'" % event_type)
 38    print("Details = %s" % details)
 39
 40
 41class EventReporter(task.Task):
 42    """This is the task that will be running 'remotely' (not really remote)."""
 43
 44    EVENTS = tuple(string.ascii_uppercase)
 45    EVENT_DELAY = 0.1
 46
 47    def execute(self):
 48        for i, e in enumerate(self.EVENTS):
 49            details = {
 50                'leftover': self.EVENTS[i:],
 51            }
 52            self.notifier.notify(e, details)
 53            time.sleep(self.EVENT_DELAY)
 54
 55
 56BASE_SHARED_CONF = {
 57    'exchange': 'taskflow',
 58    'transport': 'memory',
 59    'transport_options': {
 60        'polling_interval': 0.1,
 61    },
 62}
 63
 64# Until https://github.com/celery/kombu/issues/398 is resolved it is not
 65# recommended to run many worker threads in this example due to the types
 66# of errors mentioned in that issue.
 67MEMORY_WORKERS = 1
 68WORKER_CONF = {
 69    'tasks': [
 70        # Used to locate which tasks we can run (we don't want to allow
 71        # arbitrary code/tasks to be run by any worker since that would
 72        # open up a variety of vulnerabilities).
 73        '%s:EventReporter' % (__name__),
 74    ],
 75}
 76
 77
 78def run(engine_options):
 79    reporter = EventReporter()
 80    reporter.notifier.register(ANY, event_receiver)
 81    flow = lf.Flow('event-reporter').add(reporter)
 82    eng = engines.load(flow, engine='worker-based', **engine_options)
 83    eng.run()
 84
 85
 86if __name__ == "__main__":
 87    logging.basicConfig(level=logging.ERROR)
 88
 89    # Setup our transport configuration and merge it into the worker and
 90    # engine configuration so that both of those objects use it correctly.
 91    worker_conf = dict(WORKER_CONF)
 92    worker_conf.update(BASE_SHARED_CONF)
 93    engine_options = dict(BASE_SHARED_CONF)
 94    workers = []
 95
 96    # These topics will be used to request worker information on; those
 97    # workers will respond with their capabilities which the executing engine
 98# will use to match pending tasks to a capable worker; this will cause
 99    # the task to be sent for execution, and the engine will wait until it
100    # is finished (a response is received) and then the engine will either
101    # continue with other tasks, do some retry/failure resolution logic or
102    # stop (and potentially re-raise the remote workers failure)...
103    worker_topics = []
104
105    try:
106        # Create a set of worker threads to simulate actual remote workers...
107        print('Running %s workers.' % (MEMORY_WORKERS))
108        for i in range(0, MEMORY_WORKERS):
109            # Give each one its own unique topic name so that they can
110            # correctly communicate with the engine (they will all share the
111            # same exchange).
112            worker_conf['topic'] = 'worker-%s' % (i + 1)
113            worker_topics.append(worker_conf['topic'])
114            w = worker.Worker(**worker_conf)
115            runner = threading_utils.daemon_thread(w.run)
116            runner.start()
117            w.wait()
118            workers.append((runner, w.stop))
119
120        # Now use those workers to do something.
121        print('Executing some work.')
122        engine_options['topics'] = worker_topics
123        result = run(engine_options)
124        print('Execution finished.')
125    finally:
126        # And cleanup.
127        print('Stopping workers.')
128        while workers:
129            r, stopper = workers.pop()
130            stopper()
131            r.join()

Distributed mandelbrot (complex)

Note

Full source located at wbe_mandelbrot

Output

Generated mandelbrot fractal

Code

  1import logging
  2import math
  3import os
  4import sys
  5top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  6                                       os.pardir,
  7                                       os.pardir))
  8sys.path.insert(0, top_dir)
  9
 10from taskflow import engines
 11from taskflow.engines.worker_based import worker
 12from taskflow.patterns import unordered_flow as uf
 13from taskflow import task
 14from taskflow.utils import threading_utils
 15
 16# INTRO: This example walks through a workflow that will in parallel compute
 17# a mandelbrot result set (using X 'remote' workers) and then combine their
 18# results together to form a final mandelbrot fractal image. It shows a usage
 19# of taskflow to perform a well-known embarrassingly parallel problem that has
 20# the added benefit of also being an elegant visualization.
 21#
 22# NOTE(harlowja): this example simulates the expected larger number of workers
 23# by using a set of threads (which in this example simulate the remote workers
 24# that would typically be running on other external machines).
 25#
 26# NOTE(harlowja): to have it produce an image run (after installing pillow):
 27#
 28# $ python taskflow/examples/wbe_mandelbrot.py output.png
 29
 30BASE_SHARED_CONF = {
 31    'exchange': 'taskflow',
 32}
 33WORKERS = 2
 34WORKER_CONF = {
 35    # These are the tasks the worker can execute, they *must* be importable,
 36    # typically this list is used to restrict what workers may execute to
 37    # a smaller set of *allowed* tasks that are known to be safe (one would
 38    # not want to allow all python code to be executed).
 39    'tasks': [
 40        '%s:MandelCalculator' % (__name__),
 41    ],
 42}
 43ENGINE_CONF = {
 44    'engine': 'worker-based',
 45}
 46
 47# Mandelbrot & image settings...
 48IMAGE_SIZE = (512, 512)
 49CHUNK_COUNT = 8
 50MAX_ITERATIONS = 25
 51
 52
 53class MandelCalculator(task.Task):
 54    def execute(self, image_config, mandelbrot_config, chunk):
 55        """Returns the number of iterations before the computation "escapes".
 56
 57        Given the real and imaginary parts of a complex number, determine if it
 58        is a candidate for membership in the mandelbrot set given a fixed
 59        number of iterations.
 60        """
 61
 62        # Parts borrowed from (credit to mark harris and benoît mandelbrot).
 63        #
 64        # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
 65        def mandelbrot(x, y, max_iters):
 66            c = complex(x, y)
 67            z = 0.0j
 68            for i in range(max_iters):
 69                z = z * z + c
 70                if (z.real * z.real + z.imag * z.imag) >= 4:
 71                    return i
 72            return max_iters
 73
 74        min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
 75        height, width = image_config['size']
 76        pixel_size_x = (max_x - min_x) / width
 77        pixel_size_y = (max_y - min_y) / height
 78        block = []
 79        for y in range(chunk[0], chunk[1]):
 80            row = []
 81            imag = min_y + y * pixel_size_y
 82            for x in range(0, width):
 83                real = min_x + x * pixel_size_x
 84                row.append(mandelbrot(real, imag, max_iters))
 85            block.append(row)
 86        return block
 87
 88
 89def calculate(engine_conf):
 90    # Subdivide the work into X pieces, then request each worker to calculate
 91    # one of those chunks and then later we will write these chunks out to
 92    # an image bitmap file.
 93
 94    # An unordered flow is used here since the mandelbrot calculation is an
 95    # example of an embarrassingly parallel computation that we can scatter
 96    # across as many workers as possible.
 97    flow = uf.Flow("mandelbrot")
 98
 99    # These symbols will be automatically given to tasks as input to their
100    # execute method, in this case these are constants used in the mandelbrot
101    # calculation.
102    store = {
103        'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
104        'image_config': {
105            'size': IMAGE_SIZE,
106        }
107    }
108
109    # We need the task names to be in the right order so that we can extract
110    # the final results in the right order (we don't care about the order when
111    # executing).
112    task_names = []
113
114    # Compose our workflow.
115    height, _width = IMAGE_SIZE
116    chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
117    for i in range(0, CHUNK_COUNT):
118        chunk_name = 'chunk_%s' % i
119        task_name = "calculation_%s" % i
120        # Break the calculation up into chunk size pieces.
121        rows = [i * chunk_size, i * chunk_size + chunk_size]
122        flow.add(
123            MandelCalculator(task_name,
124                             # This ensures the storage symbol with name
125                             # 'chunk_name' is sent into the tasks local
126                             # 'chunk_name' is sent into the task's local
127                             # calculator its own correct sequence of rows
128                             # to work on.
129                             rebind={'chunk': chunk_name}))
130        store[chunk_name] = rows
131        task_names.append(task_name)
132
133    # Now execute it.
134    eng = engines.load(flow, store=store, engine_conf=engine_conf)
135    eng.run()
136
137    # Gather all the results and order them for further processing.
138    gather = []
139    for name in task_names:
140        gather.extend(eng.storage.get(name))
141    points = []
142    for y, row in enumerate(gather):
143        for x, color in enumerate(row):
144            points.append(((x, y), color))
145    return points
146
147
148def write_image(results, output_filename=None):
149    print("Gathered %s results that represent a mandelbrot"
150          " image (using %s chunks that are computed jointly"
151          " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
152    if not output_filename:
153        return
154
155    # Pillow (the PIL fork) saves us from writing our own image writer...
156    try:
157        from PIL import Image
158    except ImportError as e:
159        # To currently get this (may change in the future),
160        # $ pip install Pillow
161        raise RuntimeError("Pillow is required to write image files: %s" % e)
162
163    # Limit to 255, find the max and normalize to that...
164    color_max = 0
165    for _point, color in results:
166        color_max = max(color, color_max)
167
168    # Use gray scale since we don't really have other colors.
169    img = Image.new('L', IMAGE_SIZE, "black")
170    pixels = img.load()
171    for (x, y), color in results:
172        if color_max == 0:
173            color = 0
174        else:
175            color = int((float(color) / color_max) * 255.0)
176        pixels[x, y] = color
177    img.save(output_filename)
178
179
180def create_fractal():
181    logging.basicConfig(level=logging.ERROR)
182
183    # Setup our transport configuration and merge it into the worker and
184    # engine configuration so that both of those use it correctly.
185    shared_conf = dict(BASE_SHARED_CONF)
186    shared_conf.update({
187        'transport': 'memory',
188        'transport_options': {
189            'polling_interval': 0.1,
190        },
191    })
192
193    if len(sys.argv) >= 2:
194        output_filename = sys.argv[1]
195    else:
196        output_filename = None
197
198    worker_conf = dict(WORKER_CONF)
199    worker_conf.update(shared_conf)
200    engine_conf = dict(ENGINE_CONF)
201    engine_conf.update(shared_conf)
202    workers = []
203    worker_topics = []
204
205    print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
206    try:
207        # Create a set of workers to simulate actual remote workers.
208        print('Running %s workers.' % (WORKERS))
209        for i in range(0, WORKERS):
210            worker_conf['topic'] = 'calculator_%s' % (i + 1)
211            worker_topics.append(worker_conf['topic'])
212            w = worker.Worker(**worker_conf)
213            runner = threading_utils.daemon_thread(w.run)
214            runner.start()
215            w.wait()
216            workers.append((runner, w.stop))
217
218        # Now use those workers to do something.
219        engine_conf['topics'] = worker_topics
220        results = calculate(engine_conf)
221        print('Execution finished.')
222    finally:
223        # And cleanup.
224        print('Stopping workers.')
225        while workers:
226            r, stopper = workers.pop()
227            stopper()
228            r.join()
229    print("Writing image...")
230    write_image(results, output_filename=output_filename)
231
232
233if __name__ == "__main__":
234    create_fractal()

Jobboard producer/consumer (simple)

Note

Full source located at jobboard_produce_consume_colors

  1import collections
  2import contextlib
  3import logging
  4import os
  5import random
  6import sys
  7import threading
  8import time
  9logging.basicConfig(level=logging.ERROR)
 10
 11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 12                                       os.pardir,
 13                                       os.pardir))
 14sys.path.insert(0, top_dir)
 15
 16from zake import fake_client
 17
 18from taskflow import exceptions as excp
 19from taskflow.jobs import backends
 20from taskflow.utils import threading_utils
 21
 22# In this example we show how a jobboard can be used to post work for other
 23# entities to work on. This example creates a set of jobs using one producer
 24# thread (typically this would be split across many machines) and then has
 25# other worker threads, each with its own jobboard, select work using a given
 26# filter [red/blue] and then perform that work (consuming or abandoning the
 27# job after it has been completed or failed).
 28
 29# Things to note:
 30# - No persistence layer is used (or logbook), just the job details are used
 31#   to determine if a job should be selected by a worker or not.
 32# - This example runs in a single process (this is expected to be atypical
 33#   but this example shows that it can be done if needed, for testing...)
 34# - The iterjobs(), claim(), consume()/abandon() worker workflow.
 35# - The post() producer workflow.
 36
 37SHARED_CONF = {
 38    'path': "/taskflow/jobs",
 39    'board': 'zookeeper',
 40}
 41
 42# How many workers and producers of work will be created (as threads).
 43PRODUCERS = 3
 44WORKERS = 5
 45
 46# How many units of work each producer will create.
 47PRODUCER_UNITS = 10
 48
 49# How many units of work are expected to be produced (used so workers can
 51#   know when to stop running and shutdown; typically this would not be a
 52#   known value but we have to limit this example's execution time to be less than
 52# infinity).
 53EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS
 54
 55# Delay between producing/consuming more work.
 56WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)
 57
 58# To ensure threads don't trample other threads output.
 59STDOUT_LOCK = threading.Lock()
 60
 61
 62def dispatch_work(job):
 63    # This is where the job's contained work *would* be done
 64    time.sleep(1.0)
 65
 66
 67def safe_print(name, message, prefix=""):
 68    with STDOUT_LOCK:
 69        if prefix:
 70            print("{} {}: {}".format(prefix, name, message))
 71        else:
 72            print("{}: {}".format(name, message))
 73
 74
 75def worker(ident, client, consumed):
 76    # Create a personal board (using the same client so that it works in
 77    # the same process) and start looking for jobs on the board that we want
 78    # to perform.
 79    name = "W-%s" % (ident)
 80    safe_print(name, "started")
 81    claimed_jobs = 0
 82    consumed_jobs = 0
 83    abandoned_jobs = 0
 84    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
 85        while len(consumed) != EXPECTED_UNITS:
 86            favorite_color = random.choice(['blue', 'red'])
 87            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
 88                # See if we should even bother with it...
 89                if job.details.get('color') != favorite_color:
 90                    continue
 91                safe_print(name, "'%s' [attempting claim]" % (job))
 92                try:
 93                    board.claim(job, name)
 94                    claimed_jobs += 1
 95                    safe_print(name, "'%s' [claimed]" % (job))
 96                except (excp.NotFound, excp.UnclaimableJob):
 97                    safe_print(name, "'%s' [claim unsuccessful]" % (job))
 98                else:
 99                    try:
100                        dispatch_work(job)
101                        board.consume(job, name)
102                        safe_print(name, "'%s' [consumed]" % (job))
103                        consumed_jobs += 1
104                        consumed.append(job)
105                    except Exception:
106                        board.abandon(job, name)
107                        abandoned_jobs += 1
108                        safe_print(name, "'%s' [abandoned]" % (job))
109            time.sleep(WORKER_DELAY)
110    safe_print(name,
111               "finished (claimed %s jobs, consumed %s jobs,"
112               " abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
113                                        abandoned_jobs), prefix=">>>")
114
115
116def producer(ident, client):
117    # Create a personal board (using the same client so that it works in
118    # the same process) and start posting jobs on the board that we want
119    # some entity to perform.
120    name = "P-%s" % (ident)
121    safe_print(name, "started")
122    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
123        for i in range(0, PRODUCER_UNITS):
124            job_name = "{}-{}".format(name, i)
125            details = {
126                'color': random.choice(['red', 'blue']),
127            }
128            job = board.post(job_name, book=None, details=details)
129            safe_print(name, "'%s' [posted]" % (job))
130            time.sleep(PRODUCER_DELAY)
131    safe_print(name, "finished", prefix=">>>")
132
133
134def main():
135    # TODO(harlowja): Hack to make eventlet work right, remove when the
136    # following is fixed: https://github.com/eventlet/eventlet/issues/230
137    from taskflow.utils import eventlet_utils as _eu  # noqa
138    try:
139        import eventlet as _eventlet  # noqa
140    except ImportError:
141        pass
142
143    with contextlib.closing(fake_client.FakeClient()) as c:
144        created = []
145        for i in range(0, PRODUCERS):
146            p = threading_utils.daemon_thread(producer, i + 1, c)
147            created.append(p)
148            p.start()
149        consumed = collections.deque()
150        for i in range(0, WORKERS):
151            w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
152            created.append(w)
153            w.start()
154        while created:
155            t = created.pop()
156            t.join()
157        # At the end there should be nothing leftover, let's verify that.
158        board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
159        board.connect()
160        with contextlib.closing(board):
161            if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
162                return 1
163            return 0
164
165
166if __name__ == "__main__":
167    sys.exit(main())
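
The example keeps everything in one process by handing a zake FakeClient to backends.backend(). Against a real ZooKeeper cluster the board can instead build its own client from configuration; a minimal sketch (the ZooKeeper address below is an assumption):

import contextlib

from taskflow.jobs import backends

conf = {
    'board': 'zookeeper',
    'path': '/taskflow/jobs',
    'hosts': ['localhost:2181'],  # assumed ZooKeeper endpoint
}

board = backends.fetch('my-board', conf)
board.connect()
with contextlib.closing(board):
    for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
        print("Saw unclaimed job: %s" % job)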

Conductor simulating a CI pipeline

Note

Full source located at tox_conductor

  1import contextlib
  2import itertools
  3import logging
  4import os
  5import shutil
  6import socket
  7import sys
  8import tempfile
  9import threading
 10import time
 11logging.basicConfig(level=logging.ERROR)
 12
 13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 14                                       os.pardir,
 15                                       os.pardir))
 16sys.path.insert(0, top_dir)
 17
 18from oslo_utils import timeutils
 19from oslo_utils import uuidutils
 20from zake import fake_client
 21
 22from taskflow.conductors import backends as conductors
 23from taskflow import engines
 24from taskflow.jobs import backends as boards
 25from taskflow.patterns import linear_flow
 26from taskflow.persistence import backends as persistence
 27from taskflow.persistence import models
 28from taskflow import task
 29from taskflow.utils import threading_utils
 30
 31# INTRO: This example shows how a worker/producer can post desired work (jobs)
 32# to a jobboard and a conductor can consume that work (jobs) from that jobboard
 33# and execute those jobs in a reliable & async manner (for example, if the
 34# conductor were to crash then the job will be released back onto the jobboard
 35# and another conductor can attempt to finish it, from wherever that job last
 36# left off).
 37#
 38# In this example an in-memory jobboard (and in-memory storage) is created and
 39# used that simulates how this would be done at a larger scale (it is an
 40# example after all).
 41
 42# Restrict how long this example runs for...
 43RUN_TIME = 5
 44REVIEW_CREATION_DELAY = 0.5
 45SCAN_DELAY = 0.1
 46NAME = "{}_{}".format(socket.getfqdn(), os.getpid())
 47
 48# This won't really use zookeeper but will use a local version of it using
 49# the zake library that mimics an actual zookeeper cluster using threads and
 50# an in-memory data structure.
 51JOBBOARD_CONF = {
 52    'board': 'zookeeper://?path=/taskflow/tox/jobs',
 53}
 54
 55
 56class RunReview(task.Task):
 57    # A dummy task that clones the review and runs tox...
 58
 59    def _clone_review(self, review, temp_dir):
 60        print("Cloning review '{}' into {}".format(review['id'], temp_dir))
 61
 62    def _run_tox(self, temp_dir):
 63        print("Running tox in %s" % temp_dir)
 64
 65    def execute(self, review, temp_dir):
 66        self._clone_review(review, temp_dir)
 67        self._run_tox(temp_dir)
 68
 69
 70class MakeTempDir(task.Task):
 71    # A task that creates and destroys a temporary dir (on failure).
 72    #
 73    # It provides the location of the temporary dir for other tasks to use
 74    # as they see fit.
 75
 76    default_provides = 'temp_dir'
 77
 78    def execute(self):
 79        return tempfile.mkdtemp()
 80
 81    def revert(self, *args, **kwargs):
 82        temp_dir = kwargs.get(task.REVERT_RESULT)
 83        if temp_dir:
 84            shutil.rmtree(temp_dir)
 85
 86
 87class CleanResources(task.Task):
 88    # A task that cleans up any workflow resources.
 89
 90    def execute(self, temp_dir):
 91        print("Removing %s" % temp_dir)
 92        shutil.rmtree(temp_dir)
 93
 94
 95def review_iter():
 96    """Makes reviews (never-ending iterator/generator)."""
 97    review_id_gen = itertools.count(0)
 98    while True:
 99        review_id = next(review_id_gen)
100        review = {
101            'id': review_id,
102        }
103        yield review
104
105
106# It is important that this factory lives at the module namespace level, since
107# it must be importable from a conductor dispatching an engine; if it were a
108# lambda function, for example, it would not be re-importable and the conductor
109# would be unable to reference it when creating the workflow to run.
110def create_review_workflow():
111    """Factory method used to create a review workflow to run."""
112    f = linear_flow.Flow("tester")
113    f.add(
114        MakeTempDir(name="maker"),
115        RunReview(name="runner"),
116        CleanResources(name="cleaner")
117    )
118    return f
119
120
121def generate_reviewer(client, saver, name=NAME):
122    """Creates a review producer thread with the given name prefix."""
123    real_name = "%s_reviewer" % name
124    no_more = threading.Event()
125    jb = boards.fetch(real_name, JOBBOARD_CONF,
126                      client=client, persistence=saver)
127
128    def make_save_book(saver, review_id):
129        # Record what we want to happen (sometime in the future).
130        book = models.LogBook("book_%s" % review_id)
131        detail = models.FlowDetail("flow_%s" % review_id,
132                                   uuidutils.generate_uuid())
133        book.add(detail)
134        # Associate the factory method we want to be called (in the future)
135        # with the book, so that the conductor will be able to call into
136        # that factory to retrieve the workflow objects that represent the
137        # work.
138        #
139        # These args and kwargs *can* be used to save any specific parameters
140        # into the factory when it is being called to create the workflow
141        # objects (typically used to tell a factory how to create a unique
142        # workflow that represents this review).
143        factory_args = ()
144        factory_kwargs = {}
145        engines.save_factory_details(detail, create_review_workflow,
146                                     factory_args, factory_kwargs)
147        with contextlib.closing(saver.get_connection()) as conn:
148            conn.save_logbook(book)
149            return book
150
151    def run():
152        """Periodically publishes 'fake' reviews to analyze."""
153        jb.connect()
154        review_generator = review_iter()
155        with contextlib.closing(jb):
156            while not no_more.is_set():
157                review = next(review_generator)
158                details = {
159                    'store': {
160                        'review': review,
161                    },
162                }
163                job_name = "{}_{}".format(real_name, review['id'])
164                print("Posting review '%s'" % review['id'])
165                jb.post(job_name,
166                        book=make_save_book(saver, review['id']),
167                        details=details)
168                time.sleep(REVIEW_CREATION_DELAY)
169
170    # Return the unstarted thread, and a callback that can be used to
171    # shut down that thread (to avoid running forever).
172    return (threading_utils.daemon_thread(target=run), no_more.set)
173
174
175def generate_conductor(client, saver, name=NAME):
176    """Creates a conductor thread with the given name prefix."""
177    real_name = "%s_conductor" % name
178    jb = boards.fetch(name, JOBBOARD_CONF,
179                      client=client, persistence=saver)
180    conductor = conductors.fetch("blocking", real_name, jb,
181                                 engine='parallel', wait_timeout=SCAN_DELAY)
182
183    def run():
184        jb.connect()
185        with contextlib.closing(jb):
186            conductor.run()
187
188    # Return the unstarted thread, and a callback that can be used to
189    # shut down that thread (to avoid running forever).
190    return (threading_utils.daemon_thread(target=run), conductor.stop)
191
192
193def main():
194    # Need to share the same backend, so that data can be shared...
195    persistence_conf = {
196        'connection': 'memory',
197    }
198    saver = persistence.fetch(persistence_conf)
199    with contextlib.closing(saver.get_connection()) as conn:
200        # This ensures that the needed backend setup/data directories/schema
201        # upgrades and so on... exist before they are attempted to be used...
202        conn.upgrade()
203    fc1 = fake_client.FakeClient()
204    # Done like this to share the same client storage location so the correct
205    # zookeeper features work across clients...
206    fc2 = fake_client.FakeClient(storage=fc1.storage)
207    entities = [
208        generate_reviewer(fc1, saver),
209        generate_conductor(fc2, saver),
210    ]
211    for t, stopper in entities:
212        t.start()
213    try:
214        watch = timeutils.StopWatch(duration=RUN_TIME)
215        watch.start()
216        while not watch.expired():
217            time.sleep(0.1)
218    finally:
219        for t, stopper in reversed(entities):
220            stopper()
221            t.join()
222
223
224if __name__ == '__main__':
225    main()
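
The posting side of the example above boils down to one recurring pattern: build a logbook containing a flow detail, attach a reference to a module-level flow factory with engines.save_factory_details, persist the book, and then post a job carrying that book onto the jobboard. A minimal, condensed sketch of just that step follows; the names post_one_job, board, saver and flow_factory are illustrative placeholders, not part of the example itself.

import contextlib

from oslo_utils import uuidutils

from taskflow import engines
from taskflow.persistence import models


def post_one_job(board, saver, flow_factory):
    # Describe the unit of work we want done: a logbook with one flow detail.
    book = models.LogBook("example-book")
    detail = models.FlowDetail("example-flow", uuidutils.generate_uuid())
    book.add(detail)
    # Remember which (re-importable) factory rebuilds the flow, so that a
    # conductor can recreate and run it later, possibly in another process.
    engines.save_factory_details(detail, flow_factory, (), {})
    # Persist the book (and its flow detail) so the conductor can find it.
    with contextlib.closing(saver.get_connection()) as conn:
        conn.save_logbook(book)
    # Post the job; any connected conductor may now claim and run it.
    return board.post("example-job", book=book)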

Conductor running 99 bottles of beer song requests

Note

Full source located at 99_bottles

  1import contextlib
  2import functools
  3import logging
  4import os
  5import sys
  6import time
  7import traceback
  8from kazoo import client
  9
 10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 11                                       os.pardir,
 12                                       os.pardir))
 13sys.path.insert(0, top_dir)
 14
 15from taskflow.conductors import backends as conductor_backends
 16from taskflow import engines
 17from taskflow.jobs import backends as job_backends
 18from taskflow import logging as taskflow_logging
 19from taskflow.patterns import linear_flow as lf
 20from taskflow.persistence import backends as persistence_backends
 21from taskflow.persistence import models
 22from taskflow import task
 23
 24from oslo_utils import timeutils
 25from oslo_utils import uuidutils
 26
 27# Instructions!
 28#
 29# 1. Install zookeeper (or change host listed below)
 30# 2. Download this example, place in file '99_bottles.py'
 31# 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
 32# 4. Run `python 99_bottles.py c` a few times (in different shells)
 33# 5. At any point, kill the previously listed processes created in (4) and
 34#    watch the work resume on another process (and repeat)
 35# 6. Keep enough workers alive to eventually finish the song (if desired).
 36
 37ME = os.getpid()
 38ZK_HOST = "localhost:2181"
 39JB_CONF = {
 40    'hosts': ZK_HOST,
 41    'board': 'zookeeper',
 42    'path': '/taskflow/99-bottles-demo',
 43}
 44PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
 45TAKE_DOWN_DELAY = 1.0
 46PASS_AROUND_DELAY = 3.0
 47HOW_MANY_BOTTLES = 99
 48
 49
 50class TakeABottleDown(task.Task):
 51    def execute(self, bottles_left):
 52        sys.stdout.write('Take one down, ')
 53        sys.stdout.flush()
 54        time.sleep(TAKE_DOWN_DELAY)
 55        return bottles_left - 1
 56
 57
 58class PassItAround(task.Task):
 59    def execute(self):
 60        sys.stdout.write('pass it around, ')
 61        sys.stdout.flush()
 62        time.sleep(PASS_AROUND_DELAY)
 63
 64
 65class Conclusion(task.Task):
 66    def execute(self, bottles_left):
 67        sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left)
 68        sys.stdout.flush()
 69
 70
 71def make_bottles(count):
 72    # This is the function that will be called to generate the workflow
 73    # and will also be called to regenerate it on resumption so that work
 74    # can continue from where it last left off...
 75
 76    s = lf.Flow("bottle-song")
 77
 78    take_bottle = TakeABottleDown("take-bottle-%s" % count,
 79                                  inject={'bottles_left': count},
 80                                  provides='bottles_left')
 81    pass_it = PassItAround("pass-%s-around" % count)
 82    next_bottles = Conclusion("next-bottles-%s" % (count - 1))
 83    s.add(take_bottle, pass_it, next_bottles)
 84
 85    for bottle in reversed(list(range(1, count))):
 86        take_bottle = TakeABottleDown("take-bottle-%s" % bottle,
 87                                      provides='bottles_left')
 88        pass_it = PassItAround("pass-%s-around" % bottle)
 89        next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
 90        s.add(take_bottle, pass_it, next_bottles)
 91
 92    return s
 93
 94
 95def run_conductor(only_run_once=False):
 96    # This continuously runs consumers until it is stopped via ctrl-c or
 97    # another kill signal...
 98    event_watches = {}
 99
100    # This will be triggered by the conductor doing various activities
101    # with engines, and it is quite nice to be able to see the various timing
102    # segments (which is useful for debugging, or watching, or figuring out
103    # where to optimize).
104    def on_conductor_event(cond, event, details):
105        print("Event '%s' has been received..." % event)
106        print("Details = %s" % details)
107        if event.endswith("_start"):
108            w = timeutils.StopWatch()
109            w.start()
110            base_event = event[0:-len("_start")]
111            event_watches[base_event] = w
112        if event.endswith("_end"):
113            base_event = event[0:-len("_end")]
114            try:
115                w = event_watches.pop(base_event)
116                w.stop()
117                print("It took %0.3f seconds for event '%s' to finish"
118                      % (w.elapsed(), base_event))
119            except KeyError:
120                pass
121        if event == 'running_end' and only_run_once:
122            cond.stop()
123
124    print("Starting conductor with pid: %s" % ME)
125    my_name = "conductor-%s" % ME
126    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
127    with contextlib.closing(persist_backend):
128        with contextlib.closing(persist_backend.get_connection()) as conn:
129            conn.upgrade()
130        job_backend = job_backends.fetch(my_name, JB_CONF,
131                                         persistence=persist_backend)
132        job_backend.connect()
133        with contextlib.closing(job_backend):
134            cond = conductor_backends.fetch('blocking', my_name, job_backend,
135                                            persistence=persist_backend)
136            on_conductor_event = functools.partial(on_conductor_event, cond)
137            cond.notifier.register(cond.notifier.ANY, on_conductor_event)
138            # Run forever, and kill -9 or ctrl-c me...
139            try:
140                cond.run()
141            finally:
142                cond.stop()
143                cond.wait()
144
145
146def run_poster():
147    # This just posts a single job and then ends...
148    print("Starting poster with pid: %s" % ME)
149    my_name = "poster-%s" % ME
150    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
151    with contextlib.closing(persist_backend):
152        with contextlib.closing(persist_backend.get_connection()) as conn:
153            conn.upgrade()
154        job_backend = job_backends.fetch(my_name, JB_CONF,
155                                         persistence=persist_backend)
156        job_backend.connect()
157        with contextlib.closing(job_backend):
158            # Create information in the persistence backend about the
159            # unit of work we want to complete and the factory that
160            # can be called to create the tasks needed to complete that
161            # unit of work.
162            lb = models.LogBook("post-from-%s" % my_name)
163            fd = models.FlowDetail("song-from-%s" % my_name,
164                                   uuidutils.generate_uuid())
165            lb.add(fd)
166            with contextlib.closing(persist_backend.get_connection()) as conn:
167                conn.save_logbook(lb)
168            engines.save_factory_details(fd, make_bottles,
169                                         [HOW_MANY_BOTTLES], {},
170                                         backend=persist_backend)
171            # Post, and be done with it!
172            jb = job_backend.post("song-from-%s" % my_name, book=lb)
173            print("Posted: %s" % jb)
174            print("Goodbye...")
175
176
177def main_local():
178    # Run locally; typically this is activated during unit testing, when all
179    # the examples are checked to make sure they still function correctly...
180    global TAKE_DOWN_DELAY
181    global PASS_AROUND_DELAY
182    global JB_CONF
183    # Make everything go much faster (so that this finishes quickly).
184    PASS_AROUND_DELAY = 0.01
185    TAKE_DOWN_DELAY = 0.01
186    JB_CONF['path'] = JB_CONF['path'] + "-" + uuidutils.generate_uuid()
187    run_poster()
188    run_conductor(only_run_once=True)
189
190
191def check_for_zookeeper(timeout=1):
192    sys.stderr.write("Testing for the existence of a zookeeper server...\n")
193    sys.stderr.write("Please wait....\n")
194    with contextlib.closing(client.KazooClient()) as test_client:
195        try:
196            test_client.start(timeout=timeout)
197        except test_client.handler.timeout_exception:
198            sys.stderr.write("Zookeeper is needed for running this example!\n")
199            traceback.print_exc()
200            return False
201        else:
202            test_client.stop()
203            return True
204
205
206def main():
207    if not check_for_zookeeper():
208        return
209    if len(sys.argv) == 1:
210        main_local()
211    elif sys.argv[1] in ('p', 'c'):
212        if sys.argv[-1] == "v":
213            logging.basicConfig(level=taskflow_logging.TRACE)
214        else:
215            logging.basicConfig(level=logging.ERROR)
216        if sys.argv[1] == 'p':
217            run_poster()
218        else:
219            run_conductor()
220    else:
221        sys.stderr.write("%s p|c (v?)\n" % os.path.basename(sys.argv[0]))
222
223
224if __name__ == '__main__':
225    main()
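
The consuming side is the same skeleton in both conductor examples: fetch a persistence backend and a jobboard wired to it, then hand the jobboard to a blocking conductor that claims jobs, rebuilds each flow from its saved factory, and runs it. Below is a condensed sketch of that skeleton under the JB_CONF and PERSISTENCE_URI settings from the example above; the function name run_one_conductor is illustrative, not part of the example.

import contextlib

from taskflow.conductors import backends as conductor_backends
from taskflow.jobs import backends as job_backends
from taskflow.persistence import backends as persistence_backends


def run_one_conductor(name, jobboard_conf, persistence_uri):
    persist_backend = persistence_backends.fetch(persistence_uri)
    with contextlib.closing(persist_backend):
        # Make sure the backend schema/data directories exist before use.
        with contextlib.closing(persist_backend.get_connection()) as conn:
            conn.upgrade()
        job_backend = job_backends.fetch(name, jobboard_conf,
                                         persistence=persist_backend)
        job_backend.connect()
        with contextlib.closing(job_backend):
            cond = conductor_backends.fetch('blocking', name, job_backend,
                                            persistence=persist_backend)
            try:
                # Blocks here, claiming and running jobs until stopped
                # (ctrl-c, a kill signal, or cond.stop() from another thread).
                cond.run()
            finally:
                cond.stop()
                cond.wait()

Calling this with the JB_CONF and PERSISTENCE_URI values above would behave roughly like running `python 99_bottles.py c`, minus the event-timing output that the full example registers on the conductor's notifier.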