Notification Listener¶
通知监听器用于处理由使用 messaging 驱动程序的通知器发送的通知消息。
通知监听器订阅目标中的主题 - 以及可选的交换器。通知器客户端将通知消息发送到目标的 topic/exchange,监听器接收这些消息。
通知监听器公开多个端点,每个端点包含一组方法。每个方法名称对应于通知的优先级。收到通知时,它会被分派到名称与通知优先级相同的方法 - 例如,info 通知会被分派到 info() 方法,等等。
可选地,通知端点可以定义一个 NotificationFilter。不匹配过滤器规则的通知消息不会传递到端点的这些方法。
端点方法的参数是:客户端提供的请求上下文、通知消息的 publisher_id、event_type、payload 和 metadata。metadata 参数是一个映射,包含唯一的 message_id 和时间戳。
端点方法可以显式返回 oslo_messaging.NotificationResult.HANDLED 以确认消息,或者返回 oslo_messaging.NotificationResult.REQUEUE 以重新排队消息。请注意,并非所有传输驱动程序都支持重新排队。为了使用此功能,应用程序应该断言该功能可用,方法是将 allow_requeue=True 传递给 get_notification_listener()。如果驱动程序不支持重新排队,则此时会引发 NotImplementedError。
只有当所有端点都返回 oslo_messaging.NotificationResult.HANDLED 或 None 时,消息才会被确认。
注意: 如果多个监听器订阅同一个目标,则通知只会收到其中一个监听器。接收监听器将使用尽力而为的轮询算法从该组中选择。
通过为监听器指定池名称,可以稍微改变这种传递模式。具有相同池名称的监听器在订阅相同 topic/exchange 的监听器组中表现为子组。每个监听器子组将收到一份通知的副本,由子组中的一个成员消费。因此,将传递通知的多个副本 - 一个传递给没有池名称的监听器组(如果存在),以及一个传递给共享相同池名称的每个监听器子组。
重要提示: 这意味着 Notifier 始终会将通知发布到非池化的 Listener 以及池化的 Listener。因此,任何使用监听器池的应用程序必须至少有一个监听器从非池化队列中消费(即,一个或多个未设置pool 参数的监听器)。
请注意,并非所有传输驱动程序都实现了对监听器池的支持。那些不支持池的驱动程序,如果在 get_notification_listener() 中指定了池名称,将引发 NotImplementedError。
每个通知监听器都与一个执行器相关联,该执行器控制如何接收和分派传入的通知消息。请参阅执行器文档以获取有关其他类型执行器的说明。
注意: 如果使用“eventlet”执行器,则需要对 threading 和 time 库进行 monkeypatch。
通知监听器具有 start()、stop() 和 wait() 消息,用于开始处理请求、停止处理请求以及在监听器停止后等待所有正在处理中的请求完成。
要创建一个通知监听器,您需要提供一个传输、目标列表和一个端点列表。
可以通过简单地调用 get_notification_transport() 方法来获取传输
transport = messaging.get_notification_transport(conf)
它将根据用户的消息配置加载适当的传输驱动程序。有关更多详细信息,请参阅 get_notification_transport()。
具有多个端点的通知监听器的简单示例可能是
from oslo_config import cfg
import oslo_messaging
class NotificationEndpoint(object):
filter_rule = oslo_messaging.NotificationFilter(
publisher_id='^compute.*')
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
do_something(payload)
class ErrorEndpoint(object):
filter_rule = oslo_messaging.NotificationFilter(
event_type='^instance\..*\.start$',
context={'ctxt_key': 'regexp'})
def error(self, ctxt, publisher_id, event_type, payload, metadata):
do_something(payload)
transport = oslo_messaging.get_notification_transport(cfg.CONF)
targets = [
oslo_messaging.Target(topic='notifications'),
oslo_messaging.Target(topic='notifications_bis')
]
endpoints = [
NotificationEndpoint(),
ErrorEndpoint(),
]
pool = "listener-workers"
server = oslo_messaging.get_notification_listener(transport, targets,
endpoints, pool=pool)
server.start()
server.wait()
通过提供序列化器对象,监听器可以从原始类型中反序列化请求上下文和参数。
- oslo_messaging.get_notification_listener(transport, targets, endpoints, executor=None, serializer=None, allow_requeue=False, pool=None)¶
构造一个通知监听器
executor 参数控制如何接收和分派传入的消息。
如果使用 eventlet 执行器,则需要对 threading 和 time 库进行 monkeypatch。
- 参数:
transport (Transport) – 消息传输
targets (目标列表Target) – 要监听的交换器和主题
endpoints (列表) – 端点对象列表
executor (字符串) – 消息执行器名称 - 可用值是 ‘eventlet’ 和 ‘threading’
serializer (Serializer) – 可选的实体序列化器
allow_requeue (布尔值) – 是否需要 NotificationResult.REQUEUE 支持
pool (字符串) – 池名称
- 引发:
NotImplementedError
- oslo_messaging.get_batch_notification_listener(transport, targets, endpoints, executor=None, serializer=None, allow_requeue=False, pool=None, batch_size=None, batch_timeout=None)¶
构造一个批量通知监听器
executor 参数控制如何接收和分派传入的消息。
如果使用 eventlet 执行器,则需要对 threading 和 time 库进行 monkeypatch。
- 参数:
transport (Transport) – 消息传输
targets (目标列表Target) – 要监听的交换器和主题
endpoints (列表) – 端点对象列表
executor (字符串) – 消息执行器名称 - 可用值是 ‘eventlet’ 和 ‘threading’
serializer (Serializer) – 可选的实体序列化器
allow_requeue (布尔值) – 是否需要 NotificationResult.REQUEUE 支持
pool (字符串) – 池名称
batch_size (整数) – 在调用端点回调之前等待的消息数量
batch_timeout (整数) – 在调用端点回调之前等待的秒数
- 引发:
NotImplementedError