消息配置¶
本节概述了混合消息传递部署的概念,并描述了使用 OpenStack-Ansible (OSA) 进行有效部署的必要步骤,其中 RPC 和 Notify 通信是分离的,并与不同的消息服务器后端集成。
oslo.messaging 库¶
oslo.messaging 库是 OpenStack Oslo 项目的一部分,该项目提供服务间消息传递功能。该库支持两种通信模式(RPC 和 Notify),并提供了一种抽象,隐藏了消息总线操作的细节,从而使 OpenStack 服务无需关心这些细节。
通知¶
Notify 通信是 notifier 到 listener 的异步交换。传输的消息通常对应于信息更新或事件发生,这些信息由 OpenStack 服务发布。由于 Notify 通信是时间解耦的,因此 listener 无需在发送通知时存在。notifier 和 listener 之间的这种解耦要求为通知部署的消息后端提供消息持久性,例如代理队列或日志存储。值得注意的是,消息传输是单向的,从 notifier 到 listener,没有消息流返回到 notifier。
RPC¶
RPC 旨在作为客户端和服务器之间的同步交换,并且是时间相关的。传输的信息通常对应于服务命令调用的请求-响应模式。如果服务器在调用命令时不存在,则调用应失败。时间耦合要求部署的消息后端支持从调用者到服务器的请求的双向传输,以及服务器发回给调用者的相关响应。可以通过代理队列或直接消息后端服务器来满足此要求。
消息传输¶
oslo.messaging 库支持消息 传输插件 功能,以便可以分离 RPC 和 Notify 通信,并部署不同的消息后端服务器。
oslo.messaging 驱动程序为所选协议和后端服务器提供传输集成。下表总结了支持的 oslo.messaging 驱动程序及其支持的通信服务。
+----------------+-----------+-----------+-----+--------+-----------+
| Oslo.Messaging | Transport | Backend | RPC | Notify | Messaging |
| Driver | Protocol | Server | | | Type |
+================+===========+===========+=====+========+===========+
| rabbit | AMQP V0.9 | rabbitmq | yes | yes | queue |
+----------------+-----------+-----------+-----+--------+-----------+
| kafka | kafka | kafka | | yes | queue |
| (experimental) | binary | | | | (stream) |
+----------------+-----------+-----------+-----+--------+-----------+
RabbitMQ 服务器的标准部署¶
单个 RabbitMQ 服务器后端(例如服务器或集群)是 OpenStack-Ansible (OSA) 的默认部署。此代理消息后端通过其与 oslo.messaging rabbit 驱动程序的集成,为 RPC 和 Notification 通信提供队列服务。 oslo-messaging.yml 文件提供了默认配置,用于将 oslo.messaging RPC 和 Notify 服务关联到 RabbitMQ 服务器后端。
# Quorum Queues
oslomsg_rabbit_quorum_queues: "{{ rabbitmq_queue_replication }}"
# NOTE(noonedeadpunk): Disabled due to missing oslo.concurrency lock_path defenition
# for services
oslomsg_rabbit_queue_manager: False
# RPC
oslomsg_rpc_transport: 'rabbit'
oslomsg_rpc_port: "{{ rabbitmq_port }}"
oslomsg_rpc_servers: "{{ rabbitmq_servers }}"
oslomsg_rpc_use_ssl: "{{ rabbitmq_use_ssl }}"
oslomsg_rpc_host_group: "{{ rabbitmq_host_group }}"
oslomsg_rpc_policies: "{{ rabbitmq_policies }}"
# Notify
oslomsg_notify_transport: "{{ (groups[rabbitmq_host_group] | length > 0) | ternary('rabbit', 'none') }}"
oslomsg_notify_port: "{{ rabbitmq_port }}"
oslomsg_notify_servers: "{{ rabbitmq_servers }}"
oslomsg_notify_use_ssl: "{{ rabbitmq_use_ssl }}"
oslomsg_notify_host_group: "{{ rabbitmq_host_group }}"
oslomsg_notify_policies: "{{ rabbitmq_policies }}"
管理 RabbitMQ 流策略¶
在部署支持 quorum 和 stream 队列的 RabbitMQ 时,消息的保留行为会发生变化。Stream 队列在磁盘上维护一个仅追加日志,其中包含所有接收到的消息,直到保留策略指示应丢弃它们。默认情况下,此策略设置为每个 stream 的 x-max-age 为 1800 秒。但是,如 RabbitMQ 文档 中所述,只有在 stream 积累了足够的消息以填充一个 segment(默认大小为 500MB)后,此策略才会生效。
如果您希望减少磁盘使用量,可以通过 OpenStack-Ansible 应用额外的策略,如下所示
rabbitmq_policies:
- name: CQv2
pattern: '.*'
priority: 0
tags:
queue-version: 2
state: >-
{{
((oslomsg_rabbit_quorum_queues | default(True) or not rabbitmq_queue_replication) and rabbitmq_install_method | default('') != 'distro'
) | ternary('present', 'absent')
}}
# The following is an example of an additional policy which applies to fanout/stream queues only
# By default, each stream uses RabbitMQ's 500MB segment size, and no messages will be discarded
# until that size is reached. This may result in undesirable disk usage.
# If using this policy, it must be applied BEFORE any stream queues are created.
# See also https://bugs.launchpad.net/oslo.messaging/+bug/2089845 and https://rabbitmq.cn/docs/streams#retention
# - name: CQv2F
# pattern: '^.*_fanout'
# priority: 1
# tags:
# queue-version: 2
# stream-max-segment-size-bytes: 1000000
# state: >-
# {{
# ((oslomsg_rabbit_quorum_queues | default(True) or not rabbitmq_queue_replication) and rabbitmq_install_method | default('') != 'distro'
# ) | ternary('present', 'absent')
# }}
但是请注意,只有在创建任何 stream 队列之前,此策略才会生效。如果这些队列已经存在,则需要手动删除并由相关的 OpenStack 服务重新创建。
此问题正在 oslo.messaging bug 中跟踪。