RPC 客户端

class oslo_messaging.RPCClient(transport, target, timeout=None, version_cap=None, serializer=None, retry=None, call_monitor_timeout=None, transport_options=None, _manual_load=True)

用于调用远程 RPC 服务器上的方法的类。

RPCClient 类负责通过消息传递传输将方法调用发送到远程 RPC 服务器并接收返回值。

该类应始终使用 get_rpc_client 函数实例化,而不是直接构造该类。

支持两种 RPC 模式:RPC 调用和 RPC 广播。

当 RPC 方法不向调用者返回值时,使用 RPC 广播。当期望方法返回值时,使用 RPC 调用。有关更多信息,请参见 cast() 和 call() 方法。

所有后续调用和广播的默认目标在 RPCClient 构造函数中提供。客户端使用目标来控制如何将 RPC 请求传递到服务器。如果仅设置目标的 topic(和可选的 exchange),则任何侦听该 topic(和 exchange)的服务器都可以服务 RPC。如果多个服务器正在侦听该 topic/exchange,则使用尽力而为的轮询算法选择一个服务器。或者,客户端可以将 Target 的 server 属性设置为特定服务器的名称,以将 RPC 请求发送到特定的服务器。对于 RPC 广播,可以通过将 Target 的 fanout 属性设置为 True,将 RPC 请求广播到侦听 Target 的 topic/exchange 的所有服务器。

虽然默认目标是在构造时设置的,但可以使用 prepare() 方法为单个方法调用覆盖目标属性。

方法调用由请求上下文字典、方法名称和参数字典组成。

本类旨在通过将其包装在另一个类中来使用,该类在子类上提供方法以使用 call() 或 cast() 执行远程调用。

class TestClient(object):

    def __init__(self, transport):
        target = messaging.Target(topic='test', version='2.0')
        self._client = messaging.get_rpc_client(transport, target)

    def test(self, ctxt, arg):
        return self._client.call(ctxt, 'test', arg=arg)

使用 prepare() 方法覆盖默认目标的一些属性的示例

def test(self, ctxt, arg):
    cctxt = self._client.prepare(version='2.5')
    return cctxt.call(ctxt, 'test', arg=arg)

RPCClient 还有许多其他属性 - 例如,timeout 和 version_cap - 这些属性对于某些方法调用来说可能值得覆盖,因此它们也可以传递给 prepare()

def test(self, ctxt, arg):
    cctxt = self._client.prepare(timeout=10)
    return cctxt.call(ctxt, 'test', arg=arg)

但是,可以直接使用该类而无需将其包装在另一个类中。例如

transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic='test', version='2.0')
client = messaging.get_rpc_client(transport, target)
client.call(ctxt, 'test', arg=arg)

但这可能仅在有限的情况下有用,因为包装类通常有助于使代码更加清晰。

如果与消息传递服务的连接在发出 RPC 请求时未处于活动状态,则客户端将阻塞等待连接完成。如果连接失败,客户端将尝试重新建立该连接。默认情况下,这将无限期地继续,直到连接完成。但是,可以使用 retry 参数使 RPC 请求在重试给定次数后以 MessageDeliveryFailure 失败。例如

client = messaging.get_rpc_client(transport, target, retry=None)
client.call(ctxt, 'sync')
try:
    client.prepare(retry=0).cast(ctxt, 'ping')
except messaging.MessageDeliveryFailure:
    LOG.error("Failed to send ping message")
call(ctxt, method, **kwargs)

调用一个方法并等待回复。

call() 方法用于调用返回值的 RPC 方法。由于仅允许单个返回值,因此无法调用到 fanout 目标。

call() 将阻塞调用线程,直到消息传递传输提供返回值、发生超时或发生不可恢复的错误为止。

call() 保证 RPC 请求最多执行一次,这确保了调用永远不会重复。但是,如果调用在返回值到达之前失败或超时,则无法保证是否调用了该方法。

由于 call() 阻塞直到 RPC 方法完成,因此来自同一线程的 call() 保证按顺序处理。

方法参数必须是原始类型或客户端序列化程序支持的类型(如果有)。同样,请求上下文必须是字典,除非客户端的序列化程序支持序列化另一种类型。

远程 RPC 端点方法引发的任何错误的语义都非常微妙。

首先,如果远程异常包含在 messaging.get_rpc_transport() 参数中列出的模块之一中,则该异常将由 call() 重新引发。但是,这种本地重新引发的远程异常与本地引发的相同异常类型可区分,因为重新引发的远程异常被修改,使得其类名以“_Remote”后缀结尾,因此您可以执行

if ex.__class__.__name__.endswith('_Remote'):
    # Some special case for locally re-raised remote exceptions

其次,如果远程异常不来自 allowed_remote_exmods 列表中列出的模块,则会引发 messaging.RemoteError 异常,其中包含远程异常的所有详细信息。

参数:
  • ctxt (dict) – 请求上下文字典

  • method (str) – 方法名称

  • kwargs (dict) – 方法参数字典

引发:

MessagingTimeout, RemoteError, MessageDeliveryFailure

can_send_version(version=<object object>)

检查版本是否与版本上限兼容。

cast(ctxt, method, **kwargs)

调用一个方法,无需阻塞等待返回值。

cast() 方法用于调用不返回值的方法。cast() RPC 请求可以通过将 fanout Target 属性设置为 True,广播到侦听给定 topic 的所有服务器。

cast() 操作是尽力而为的:cast() 将阻塞调用线程,直到消息传递传输接受 RPC 请求,但 cast() 不会验证服务器是否调用了该方法。cast() 保证该方法不会在目标上执行两次(例如,“最多执行一次”执行)。

连续 cast() 之间没有顺序保证,即使是发送到同一目标的 cast() 也是如此。因此,方法可能以与 cast() 顺序不同的顺序执行。

方法参数必须是原始类型或客户端序列化程序支持的类型(如果有)。

同样,请求上下文必须是字典,除非客户端的序列化程序支持序列化另一种类型。

参数:
  • ctxt (dict) – 请求上下文字典

  • method (str) – 方法名称

  • kwargs (dict) – 方法参数字典

引发:

如果消息传递传输无法接受请求,则会发生 MessageDeliveryFailure。

prepare(exchange=<object object>, topic=<object object>, namespace=<object object>, version=<object object>, server=<object object>, fanout=<object object>, timeout=<object object>, version_cap=<object object>, retry=<object object>, call_monitor_timeout=<object object>, transport_options=<object object>)

准备方法调用上下文。

使用此方法为单个方法调用覆盖客户端属性。例如

def test(self, ctxt, arg):
    cctxt = self.prepare(version='2.5')
    return cctxt.call(ctxt, 'test', arg=arg)
参数:
  • exchange (str) – 请参阅 Target.exchange

  • topic (str) – 请参阅 Target.topic

  • namespace (str) – 请参阅 Target.namespace

  • version (str) – 服务器必须支持的要求,请参阅 Target.version

  • server (str) – 发送到特定服务器,请参阅 Target.server

  • fanout (bool) – 发送到 topic 上的所有服务器,请参阅 Target.fanout

  • timeout (intfloat) – call() 的可选默认超时(以秒为单位)

  • version_cap (str) – 如果版本超过此上限,则引发 RPCVersionCapError

  • retry (int) – 可选的连接重试配置:None 或 -1 表示无限期重试。0 表示不尝试重试。N 表示最多尝试 N 次重试。

  • transport_options (dictionary) – 用于配置驱动程序的其他参数,例如在 RabbitMQ 中将参数作为“mandatory”标志发送

  • call_monitor_timeout (int) – 活跃调用心跳的超时时间(以秒为单位)。如果指定,则需要服务器以低于整体超时参数的间隔对长期运行的调用进行心跳。

exception oslo_messaging.RemoteError(exc_type=None, value=None, traceback=None)

表示远程端点方法引发了异常。

包含原始异常的类型字符串表示形式、原始异常的值和跟踪信息。这些作为连接的字符串发送给父级,因此打印异常包含所有相关信息。