存储管道的写入阶段

介绍

管道是一组处理请求所需阶段。当新的请求到达 Zaqar 时,消息首先通过传输层管道,然后通过一个存储层管道,具体取决于每个特定请求的操作类型。例如,如果 Zaqar 收到一个与队列相关的操作请求,则存储层管道将是 queue pipeline。Zaqar 始终将实际的存储控制器作为最终的存储层管道阶段。

通过设置 zaqar.conf 文件中 [storage] 部分的选项,您可以向这些存储层管道添加额外的阶段

  • 声明管道

  • 消息管道,内置阶段可供使用

    • zaqar.notification.notifier - 在每个进入队列的消息上向队列订阅者发送通知,即启用通知功能。

  • 队列管道

  • 订阅管道

存储层管道的选项默认情况下为空,因为额外的阶段会影响 Zaqar 的性能。根据阶段的不同,选项值的列出顺序可能重要也可能不重要。

您可以将自己的外部阶段添加到存储层管道。

在编写阶段之前需要了解的事项

管道中的阶段必须实现它们需要挂钩的存储控制器方法。您可以在 zaqar/storage/base.py 中的抽象类中找到所有可用于挂钩的方法。例如,如果您正在寻找可用于队列存储层管道挂钩的所有方法,请参阅 zaqar/storage/base.py 中的 Queue 类。如您所见,Zaqar 的内置阶段 zaqar.notification.notifier 实现了 zaqar.storage.base.Message 抽象类的 post 方法。

如果阶段返回非 None 值,则可以立即停止管道;否则,处理将继续到下一个阶段,最后到达实际的存储控制器。

警告

在大多数情况下,存储管道返回的非 None 值无关紧要,但有时传输层会使用返回值,您必须小心。例如,在创建队列请求期间,如果存储驱动程序返回 True,则传输层会向客户端响应 201 http 响应代码,如果返回 False,则响应 204 http 响应代码。请参阅:zaqar.transport.wsgi.v2_0.queues.ItemResource#on_put

Zaqar 通过 Python 入口点机制查找带有源代码的阶段。包含 Zaqar 阶段的所有 Python 包都必须在安装期间通过 setup.pysetup.cfgzaqar.storage.stages 入口点组下注册其阶段。如果阶段已注册,并且用户在 zaqar.conf 的存储层管道选项之一中指定了阶段的入口点名称,则该阶段将被加载到特定的存储层管道。Zaqar 将阶段作为插件导入。请参阅 zaqar.storage.pipeline#_get_storage_pipeline

有关插件的更多信息,请参阅:Stevedore - 创建插件Stevedore - 加载插件

外部阶段示例(在 Zaqar 包外部编写)

这是一个包含可以处理 Zaqar 中与队列相关的请求的阶段的小型包。该阶段没有任何实际用途,但作为示例很好。

包的文件树结构

.
├── setup.py
└── ubershystages
    ├── __init__.py
    └── queues
        ├── __init__.py
        └── lovely.py

2 directories, 4 files

setup.py:

from setuptools import setup, find_packages

setup(
    name='ubershystages',
    version='1.0',

    description='Demonstration package for Zaqar with plugin pipeline stage',

    author='Ubershy',
    author_email='ubershy@gmail.com',

    url='',

    classifiers=['Development Status :: 3 - Alpha',
                 'License :: OSI Approved :: Apache Software License',
                 'Programming Language :: Python',
                 'Programming Language :: Python :: 2',
                 'Programming Language :: Python :: 2.7',
                 'Programming Language :: Python :: 3',
                 'Programming Language :: Python :: 3.5',
                 'Intended Audience :: Developers',
                 'Environment :: Console',
                 ],

    platforms=['Any'],

    scripts=[],

    packages=find_packages(),
    include_package_data=True,

    entry_points={
        'zaqar.storage.stages': [
            'ubershy.lovelyplugin = ubershystages.queues.lovely:LovelyStage',
        ],
    },

    zip_safe=False,
)

lovely.py:

class LovelyStage(object):
    """This stage:
    1. Prints 'Lovely stage is processing request...' on each queue creation or
       deletion request.
    2. Prints 'Oh, what a lovely day!' on each creation request of a queue
       named 'lovely'.
    3. Prevents deletion of a queue named 'lovely' and prints 'Secretly keeping
       lovely queue' on such attempt.
    """

    def __init__(self, *args, **kwargs):
        print("Lovely stage is loaded!")

    def create(self, name, metadata=None, project=None):
        """Stage's method which processes queue creation request.

        :param name: The queue name
        :param project: Project id
        """

        self.printprocessing()
        if name == 'lovely':
            print("Oh, what a lovely day!")

    def delete(self, name, project=None):
        """Stage's method which processes queue deletion request.

        :param name: The queue name
        :param project: Project id
        :returns: Something non-None, if the queue has a name 'lovely'. It will
        stop further processing through the other stages of the pipeline, and
        the request will not reach the storage controller driver, preventing
        queue deletion from the database.
        """

        self.printprocessing()
        if name == 'lovely':
            print('Secretly keeping lovely queue')
            something = "shhh... it's a bad practice"
            return something

    def printprocessing(self):
        print('Lovely stage is processing request...')

要将包安装到系统的包根目录中,请运行

# pip install -e .

zaqar.conf 中,将 ubershy.lovelyplugin 添加到 queue_pipeline 选项

[storage]
queue_pipeline = ubershy.lovelyplugin

启动 Zaqar

$ zaqar-server

如果阶段已成功加载到 Zaqar,您将在终端输出行中看到 Lovely stage is loaded! 行。然后,您可以尝试使用队列“lovely”执行队列创建和队列删除操作,并查看 Zaqar 的数据库中会发生什么。

注意

您可以在一个包中包含多个阶段,只需确保所有阶段都将注册为入口点即可。例如,在 setup.py 中,您可以注册额外的 ubershy.nastyplugin 阶段

entry_points={
    'zaqar.storage.stages': [
        'ubershy.lovelyplugin = ubershystages.queues.lovely:LovelyStage',
        'ubershy.nastyplugin = ubershystages.messages.nasty:NastyStage',
    ],
},