RPC 服务器

RPC 服务器暴露多个端点,每个端点包含一组可以通过给定传输由客户端远程调用的方法。

要创建 RPC 服务器,您需要提供传输、目标和端点列表。

可以通过简单地调用 get_rpc_transport() 方法来获取传输

transport = messaging.get_rpc_transport(conf)

该方法将根据用户的消息配置加载适当的传输驱动程序。有关更多详细信息,请参阅 get_rpc_transport()。

创建 RPC 服务器时提供的目标表示主题、服务器名称以及 - 可选地 - 要侦听的交换器。有关这些属性的更多详细信息,请参阅目标。

多个 RPC 服务器可以同时侦听相同的主题(和交换器)。有关在这种情况下如何将 RPC 请求分发到服务器的详细信息,请参阅 RPCClient。

每个端点对象可以具有目标属性,该属性可以设置命名空间和版本字段。默认情况下,我们使用“空命名空间”和版本 1.0。传入的方法调用将被分派到具有请求的方法、匹配的命名空间和兼容版本号的第一个端点。

方法调用的第一个参数始终由客户端提供的请求上下文。其余参数是客户端提供给方法的参数。端点方法可以返回值。如果是这样,RPC 服务器将通过传输将返回值发送回请求客户端。

executor 参数控制如何接收和分派传入的消息。请参阅 Executor 文档以获取有关执行器类型的描述。

注意: 如果使用“eventlet”执行器,则需要对 threading 和 time 库进行 monkeypatch。Eventlet 执行器已被弃用,并且 threading 执行器将是唯一可用的执行器。

RPC 响应操作是尽力而为的:服务器将在消息被消息传输接受后认为包含响应的消息已成功发送。服务器不保证 RPC 客户端处理响应。如果发送失败,将记录错误,并且服务器将继续处理传入的 RPC 请求。

方法调用的参数和从方法返回的值是 Python 原生类型。但是,消息中数据的实际编码可能不是原生形式(例如,消息负载可能是编码为 ASCII 字符串的字典,使用 JSON)。序列化器对象用于将传入的编码消息数据转换为原生类型。序列化器还用于将返回值从原生类型转换为适合消息负载的编码。

RPC 服务器具有 start()、stop() 和 wait() 方法,用于开始处理请求、停止处理请求以及在服务器停止后等待所有正在处理中的请求完成。

具有多个端点的简单 RPC 服务器的示例可能是

# NOTE(changzhi): We are using eventlet executor and
# time.sleep(1), therefore, the server code needs to be
# monkey-patched.

import eventlet
eventlet.monkey_patch()

from oslo_config import cfg
import oslo_messaging
import time

class ServerControlEndpoint(object):

    target = oslo_messaging.Target(namespace='control',
                                   version='2.0')

    def __init__(self, server):
        self.server = server

    def stop(self, ctx):
        if self.server:
            self.server.stop()

class TestEndpoint(object):

    def test(self, ctx, arg):
        return arg

transport = oslo_messaging.get_rpc_transport(cfg.CONF)
target = oslo_messaging.Target(topic='test', server='server1')
endpoints = [
    ServerControlEndpoint(None),
    TestEndpoint(),
]
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
                                       executor='eventlet')
try:
    server.start()
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping server")

server.stop()
server.wait()
oslo_messaging.get_rpc_server(transport, target, endpoints, executor=None, serializer=None, access_policy=None, server_cls=<class 'oslo_messaging.rpc.server.RPCServer'>)

构造一个 RPC 服务器。

参数:
  • transport (Transport) – 消息传输

  • target (Target) – 要侦听的交换器、主题和服务器

  • endpoints (list) – 端点对象列表

  • executor (str) – (已弃用) 消息执行器的名称 - 可用值是“eventlet”和“threading”。Eventlet 执行器也已弃用。

  • serializer (Serializer) – 可选的实体序列化器

  • access_policy (RPCAccessPolicyBase) – 可选的访问策略。默认为 DefaultRPCAccessPolicy

  • server_cls (class) – 要实例化的服务器类

class oslo_messaging.RPCAccessPolicyBase

确定哪些端点方法可以通过 RPC 调用

class oslo_messaging.LegacyRPCAccessPolicy

旧版访问策略允许访问所有可调用端点方法,包括私有方法(以“_”为前缀的方法)

class oslo_messaging.DefaultRPCAccessPolicy

默认访问策略禁止调用私有方法(以“_”为前缀的方法)

注意

LegacyRPCAdapterPolicy 目前需要作为默认策略,因为我们有一些项目依赖于暴露私有方法。

class oslo_messaging.ExplicitRPCAccessPolicy

策略要求装饰的端点方法允许分派

class oslo_messaging.RPCDispatcher(endpoints, serializer, access_policy=None)

理解 RPC 消息的消息分派器。

通过传递知道如何处理消息的可调用分派器来构造 MessageHandlingServer,每次收到消息时都会调用该分派器。

RPCDispatcher 是这样一种分派器,它理解 RPC 消息的格式。该分派器查看消息中的命名空间、版本和方法值,并将其与可用端点列表进行匹配。

端点可以具有描述其暴露方法的命名空间和版本的目标属性。

RPCDispatcher 可以具有 access_policy 属性,该属性确定要分派哪些端点方法。默认 access_policy 分派端点对象上的所有公共方法。

class oslo_messaging.MessageHandlingServer(transport, dispatcher, executor=None)

用于处理消息的服务器。

将传输连接到知道如何使用执行器处理消息的分派器,该执行器知道应用程序希望如何创建新任务。

reset()

重置服务。

当以守护程序模式运行的服务收到 SIGHUP 时调用。

start(override_pool_size=None)

开始处理传入的消息。

此方法导致服务器开始轮询传输以获取传入的消息,并将它们传递给分派器。消息处理将继续,直到调用 stop() 方法为止。

执行器控制服务器如何与应用程序的 I/O 处理策略集成 - 它可能会选择在新进程、线程或协同调度协程中轮询消息,或者只是通过向事件循环注册回调来轮询消息。同样,执行器可以选择在新线程、协程或当前线程中分派消息。

stop()

停止处理传入的消息。

此方法返回后,将不再由服务器处理任何新的传入消息。但是,服务器可能仍在处理一些消息,并且与此服务器关联的基础驱动程序资源仍在被使用。有关更多详细信息,请参阅“wait”。

wait()

等待消息处理完成。

在调用 stop() 后,可能仍然存在一些尚未完全处理的现有消息。wait() 方法会阻塞,直到所有消息处理完成为止。

完成后,与此服务器关联的基础驱动程序资源将被释放(例如,关闭无用的网络连接)。

oslo_messaging.expected_exceptions(*exceptions)

用于 RPC 端点方法,这些方法会引发预期的异常的装饰器。

使用此装饰器标记端点方法允许声明 RPC 服务器不应视为致命的预期异常,并且不会像在实际错误场景中生成的那样记录它们。

请注意,这将导致列出的异常被包装在 ExpectedException 中,该异常由 RPC 服务器内部使用。RPC 客户端将看到原始异常类型。

oslo_messaging.expose(func)

用于 RPC 客户端的 RPC 端点方法的装饰器。

如果分派器的 access_policy 设置为 ExplicitRPCAccessPolicy,则端点方法需要显式暴露。

# foo() cannot be invoked by an RPC client
def foo(self):
    pass

# bar() can be invoked by an RPC client
@rpc.expose
def bar(self):
    pass
exception oslo_messaging.ExpectedException

封装由 RPC 端点引发的预期异常

仅仅实例化此异常会记录当前的异常信息,这些信息将传递回 RPC 客户端,而不会进行异常记录。