RPC 服务器¶
RPC 服务器暴露多个端点,每个端点包含一组可以通过给定传输由客户端远程调用的方法。
要创建 RPC 服务器,您需要提供传输、目标和端点列表。
可以通过简单地调用 get_rpc_transport() 方法来获取传输
transport = messaging.get_rpc_transport(conf)
该方法将根据用户的消息配置加载适当的传输驱动程序。有关更多详细信息,请参阅 get_rpc_transport()。
创建 RPC 服务器时提供的目标表示主题、服务器名称以及 - 可选地 - 要侦听的交换器。有关这些属性的更多详细信息,请参阅目标。
多个 RPC 服务器可以同时侦听相同的主题(和交换器)。有关在这种情况下如何将 RPC 请求分发到服务器的详细信息,请参阅 RPCClient。
每个端点对象可以具有目标属性,该属性可以设置命名空间和版本字段。默认情况下,我们使用“空命名空间”和版本 1.0。传入的方法调用将被分派到具有请求的方法、匹配的命名空间和兼容版本号的第一个端点。
方法调用的第一个参数始终由客户端提供的请求上下文。其余参数是客户端提供给方法的参数。端点方法可以返回值。如果是这样,RPC 服务器将通过传输将返回值发送回请求客户端。
executor 参数控制如何接收和分派传入的消息。请参阅 Executor 文档以获取有关执行器类型的描述。
注意: 如果使用“eventlet”执行器,则需要对 threading 和 time 库进行 monkeypatch。Eventlet 执行器已被弃用,并且 threading 执行器将是唯一可用的执行器。
RPC 响应操作是尽力而为的:服务器将在消息被消息传输接受后认为包含响应的消息已成功发送。服务器不保证 RPC 客户端处理响应。如果发送失败,将记录错误,并且服务器将继续处理传入的 RPC 请求。
方法调用的参数和从方法返回的值是 Python 原生类型。但是,消息中数据的实际编码可能不是原生形式(例如,消息负载可能是编码为 ASCII 字符串的字典,使用 JSON)。序列化器对象用于将传入的编码消息数据转换为原生类型。序列化器还用于将返回值从原生类型转换为适合消息负载的编码。
RPC 服务器具有 start()、stop() 和 wait() 方法,用于开始处理请求、停止处理请求以及在服务器停止后等待所有正在处理中的请求完成。
具有多个端点的简单 RPC 服务器的示例可能是
# NOTE(changzhi): We are using eventlet executor and
# time.sleep(1), therefore, the server code needs to be
# monkey-patched.
import eventlet
eventlet.monkey_patch()
from oslo_config import cfg
import oslo_messaging
import time
class ServerControlEndpoint(object):
target = oslo_messaging.Target(namespace='control',
version='2.0')
def __init__(self, server):
self.server = server
def stop(self, ctx):
if self.server:
self.server.stop()
class TestEndpoint(object):
def test(self, ctx, arg):
return arg
transport = oslo_messaging.get_rpc_transport(cfg.CONF)
target = oslo_messaging.Target(topic='test', server='server1')
endpoints = [
ServerControlEndpoint(None),
TestEndpoint(),
]
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
executor='eventlet')
try:
server.start()
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping server")
server.stop()
server.wait()
- oslo_messaging.get_rpc_server(transport, target, endpoints, executor=None, serializer=None, access_policy=None, server_cls=<class 'oslo_messaging.rpc.server.RPCServer'>)¶
构造一个 RPC 服务器。
- 参数:
transport (Transport) – 消息传输
target (Target) – 要侦听的交换器、主题和服务器
endpoints (list) – 端点对象列表
executor (str) – (已弃用) 消息执行器的名称 - 可用值是“eventlet”和“threading”。Eventlet 执行器也已弃用。
serializer (Serializer) – 可选的实体序列化器
access_policy (RPCAccessPolicyBase) – 可选的访问策略。默认为 DefaultRPCAccessPolicy
server_cls (class) – 要实例化的服务器类
- class oslo_messaging.RPCAccessPolicyBase¶
确定哪些端点方法可以通过 RPC 调用
- class oslo_messaging.LegacyRPCAccessPolicy¶
旧版访问策略允许访问所有可调用端点方法,包括私有方法(以“_”为前缀的方法)
- class oslo_messaging.DefaultRPCAccessPolicy¶
默认访问策略禁止调用私有方法(以“_”为前缀的方法)
注意
LegacyRPCAdapterPolicy 目前需要作为默认策略,因为我们有一些项目依赖于暴露私有方法。
- class oslo_messaging.ExplicitRPCAccessPolicy¶
策略要求装饰的端点方法允许分派
- class oslo_messaging.RPCDispatcher(endpoints, serializer, access_policy=None)¶
理解 RPC 消息的消息分派器。
通过传递知道如何处理消息的可调用分派器来构造 MessageHandlingServer,每次收到消息时都会调用该分派器。
RPCDispatcher 是这样一种分派器,它理解 RPC 消息的格式。该分派器查看消息中的命名空间、版本和方法值,并将其与可用端点列表进行匹配。
端点可以具有描述其暴露方法的命名空间和版本的目标属性。
RPCDispatcher 可以具有 access_policy 属性,该属性确定要分派哪些端点方法。默认 access_policy 分派端点对象上的所有公共方法。
- class oslo_messaging.MessageHandlingServer(transport, dispatcher, executor=None)¶
用于处理消息的服务器。
将传输连接到知道如何使用执行器处理消息的分派器,该执行器知道应用程序希望如何创建新任务。
- reset()¶
重置服务。
当以守护程序模式运行的服务收到 SIGHUP 时调用。
- start(override_pool_size=None)¶
开始处理传入的消息。
此方法导致服务器开始轮询传输以获取传入的消息,并将它们传递给分派器。消息处理将继续,直到调用 stop() 方法为止。
执行器控制服务器如何与应用程序的 I/O 处理策略集成 - 它可能会选择在新进程、线程或协同调度协程中轮询消息,或者只是通过向事件循环注册回调来轮询消息。同样,执行器可以选择在新线程、协程或当前线程中分派消息。
- stop()¶
停止处理传入的消息。
此方法返回后,将不再由服务器处理任何新的传入消息。但是,服务器可能仍在处理一些消息,并且与此服务器关联的基础驱动程序资源仍在被使用。有关更多详细信息,请参阅“wait”。
- wait()¶
等待消息处理完成。
在调用 stop() 后,可能仍然存在一些尚未完全处理的现有消息。wait() 方法会阻塞,直到所有消息处理完成为止。
完成后,与此服务器关联的基础驱动程序资源将被释放(例如,关闭无用的网络连接)。
- oslo_messaging.expected_exceptions(*exceptions)¶
用于 RPC 端点方法,这些方法会引发预期的异常的装饰器。
使用此装饰器标记端点方法允许声明 RPC 服务器不应视为致命的预期异常,并且不会像在实际错误场景中生成的那样记录它们。
请注意,这将导致列出的异常被包装在 ExpectedException 中,该异常由 RPC 服务器内部使用。RPC 客户端将看到原始异常类型。
- oslo_messaging.expose(func)¶
用于 RPC 客户端的 RPC 端点方法的装饰器。
如果分派器的 access_policy 设置为 ExplicitRPCAccessPolicy,则端点方法需要显式暴露。
# foo() cannot be invoked by an RPC client def foo(self): pass # bar() can be invoked by an RPC client @rpc.expose def bar(self): pass
- exception oslo_messaging.ExpectedException¶
封装由 RPC 端点引发的预期异常
仅仅实例化此异常会记录当前的异常信息,这些信息将传递回 RPC 客户端,而不会进行异常记录。