Протокол AMQP
AMQP (Advanced Message Queueing Protocol) — открытый протокол для передачи сообщений между компонентами системы с низкой задержкой, на высокой скорости, а так же с высоким уровнем надежности и безопасности. Обмен сообщениями осуществляется через AMQP-брокер.
Популярные брокеры, поддерживающие AMQP: RabbitMQ (Erlang), Qpid (C++, Java), ActiveMQ (Java), StormMQ (Java), JORAM (Java).
Ключевые понятия AMQP:
- Message — сообщение.
- Queue — очередь. Принимает и сохраняет сообщение, пока оно не будет затребовано клиентом.
- Exchange — точка обмена, сортировочный пункт. Принимает и распределяет сообщения в одну или несколько очередей. Поведение точки обмена зависит от типа:
- fanout — входящее сообщение передается во все связанные с точкой очереди.
- direct — входящее сообщение передается в очередь, имя которой указано в параметре routing key. Routing key (ключ маршрутизации) — указывается при отправке сообщения.
- topic — входящее сообщение передается в очередь или несколько очередей по маске, указанной в routing key.
В настоящее время AMQP использует в качестве транспортного протокола TCP. В будущем предполагается добавить поддержку работы с SCTP. Для AMQP официально выделен порт 5672 (TCP, UDP, SCTP) и порт 5671 для AMQPS (AMQP с шифрованием TLS/SSL).
Что такое RabbitMQ
RabbitMQ — это брокер сообщений. Поддерживает протоколы AMQP, HTTP, STOMP, MQTT и веб-сокеты, WebSockets, Web-Stomp.
Изначально RabbitMQ создавался для работы с AMQP, который считается «основным» протоколом RabbitMQ.
Надежность — одно из важнейших достоинств RabbitMQ. RabbitMQ можно настроить так, чтобы сообщения всегда доставлялись.
Доступ к портам RabbitMQ
Для взаимодействия с RabbitMQ необходимо знать связанные с ним номера портов, по которым будут ожидаться запросы. Однако, в некоторых случаях брандмауэры и другие инструменты безопасности могут блокировать привязку RabbitMQ к тому или иному порту. В этом случае RabbitMQ не запустится. Следует помнить об этом.
- 1883, 8883: клиенты MQTT и MQTTS (с шифрованием TLS/SSL), если включен плагин MQTT
- 4369: порт epmd — аналога DNS для Erlang-кластера. Epmd используется узлами RabbitMQ и инструментами CLI (CLI — интерфейс командной строки).
- 5672, 5671: используется AMQP и AMQPS (с шифрованием TLS/SSL) клиентами. 5672 — основной порт RabbitMQ. Доступа к нему достаточно для полноценной работы любых сервисов с очередью сообщений.
- 15672: для версии RabbitMQ 4.x клиенты HTTP API и RabbitMQadmin (если включен плагин управления). Иными словами, веб-интерфейс к RabbitMQ.
- 15674: клиенты STOMP-over-WebSockets, если включен плагин Web STOMP.
- 15675: клиенты MQTT-over-WebSockets, если включен плагин Web MQTT.
- 25672: используется для межсетевых интерфейсов и коммуникационных средств CLI. Выделяется из динамического диапазона, номер порта вычисляется как порт AMQP + 20000.
- 61613, 61614: клиенты STOMP и STOMPS (с шифрованием TLS/SSL), если включен плагин STOMP.
Терминология RabbitMQ
Основные понятия RabbitMQ:
- Producer — клиент, который создает сообщения.
- Consumer — клиент, который получает сообщения.
- Queue — неограниченная по размеру очередь, которая хранит сообщения.
- Exchange — компонент, который позволяет переправлять отправляемые в него сообщения на различные очереди.
Есть значительно более эффективные системы обмена сообщениями, чем RabbitMQ, например Apache Kafka, ZeroMQ, механизм PUB/SUB в Redis, NATS.
Запуск RabbitMQ с помощью Docker
RabbitMQ совместим не со всеми версиями Linux Debian, поэтому я использовала Docker. Для целей обучения и тестирования — этого вполне достаточно.
Запускаем RabbitMQ:
1 |
docker run --hostname localhost -p 8080:5672 -p 15672:15672 rabbitmq:3-management |
Я сопоставила порт 8080 на своей машине и порт RabbitMQ — 5672. Это означает, что мои скрипты будут обращаться по адресу localhost:8080 для того, чтоб получить или отправить сообщение в очередь. Вы можете назначить другие номера портов.
Вторая связка — 15672:15672 — для сопоставления порта RabbitMQ, используемого для обращения к веб-интерфейсу и порта 15672 на локальной машине. Для доступа к веб-интерфейсу в строке браузера потребуется указать адрес http://localhost:15672.
Если при запуске контейнера порты не указать — обращение к RabbitMQ окажется невозможным.
По умолчанию для доступа к RabbitMQ используются логин/пароль: guest/guest .

RabbitMQ web-интерфейс
RabbitMQ и python. Модуль pika
В Python есть несколько известных библиотек, которые позволяют работать с RabbitMQ и протоколом AMQP: py-amqplib, txAMQP, pika.
Я выбрала модуль pika.
Pika предоставляет набор объектов для работы с RabbitMQ:
- Connection adapter (соединение с RabbitMQ),
- Connection parameters (параметры соединения),
- Authentication Credentials (аутентификационные данные),
- Channel (связь с RabbitMQ через RPC-методы AMQP),
- Exceptions.
Connection adapter
Pika позволяет использовать несколько адаптеров для установления соединения с RabbitMQ:
- Tornado Connection Adapter.
- Twisted Connection Adapter.
- Select Connection Adapter — асинхронный адаптер соединения, который пытается использовать самый быстрый адаптер цикла событий для данной платформы.
- BlockingConnection — модуль адаптера блокирующего соединения реализует блокирующую семантику поверх основного драйвера AMQP. Пользовательские классы в модуле состоят из классов BlockingConnection и BlockingChannel. Создание BlockingChannel:
1234import pikaconnection = pika.BlockingConnection()channel = connection.channel()
Аутентификация
PlainCredentials и ExternalCredentials — классы учетных данных. Инкапсулируют аутентификационные данные для класса ConnectionParameters.
Класс PlainCredentials возвращает имя пользователя и пароль для подключения в требуемом формате.
Для аутентификации с помощью Pika создается объект PlainCredentials, с именем пользователя и паролем в качестве аргументов. Созданный объект передается в качестве значения аргумента credentials объекту ConnectionParameters. Если при создании объекта ConnectionParameters логин и пароль не указаны, будут использованы значения по умолчанию: логин «guest» с паролем «guest».
Если используется URLParameters, объект учетных данных будет создан автоматически.
Класс ExternalCredentials позволяет при подключении к RabbitMQ использовать внешнюю аутентификацию, как правило, с клиентским SSL-сертификатом.
Параметры подключения
Чтобы обеспечить гибкость в указании информации о коннекте к RabbitMQ, pika реализует два класса для инкапсуляции информации: ConnectionParameters и URLParameters.
- ConnectionParameters — классический объект для указания всех параметров подключения, необходимых для подключения к RabbitMQ. ConnectionParameters предоставляет атрибуты для настройки всех возможных параметров подключения.
1234567import pikacredentials = pika.PlainCredentials('guest', 'guest')parameters = pika.ConnectionParameters('rabbit-server1',5672,'/',credentials) - URLParameters — класс позволяет передавать URL-адрес AMQP при создании объекта и поддерживает хост, порт, виртуальный хост, ssl, имя пользователя и пароль в базовом URL-адресе, а другие параметры передаются через параметры запроса.
123import pikaparameters = pika.URLParameters('amqp://guest:guest@rabbit-server1:5672/%2F')
Connection
Класс Connection реализует базовое поведение, которое расширяют все адаптеры подключения.
Это основной класс, который реализует связь с RabbitMQ. Этот класс должен вызываться не напрямую, а через использование адаптера, такого как SelectConnection или BlockingConnection.
Channel
Класс Channel предоставляет оболочку для взаимодействия с RabbitMQ, реализуя методы для канала AMQP.
Канал является основным методом связи для взаимодействия с RabbitMQ. Рекомендуется создавать канал посредством вызова метода channel() для активного соединения.
Примеры использования pika
Извлечь из очереди сообщение
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
import pika credentials = pika.PlainCredentials('guest', 'guest') connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 8080, '/', credentials)) channel = connection.channel() method_frame, header_frame, body = channel.basic_get('my_queue') if method_frame: print(method_frame, header_frame, body) channel.basic_ack(method_frame.delivery_tag) else: print('No messages') |
Поместить сообщение в очередь
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import pika credentials = pika.PlainCredentials('guest', 'guest') connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 8080, '/', credentials)) channel = connection.channel() # Если очередь с указанным именем не существует, queue_declare() создаст ее channel.queue_declare(queue='my_queue') # Имя очереди должно быть определено в параметре routing_key channel.basic_publish(exchange='', routing_key='my_queue', body="Hello World!") connection.close() |
Полезные ссылки по теме Pika, RabbitMQ и Docker
docker.com: rabbitmq
readthedocs.io: Introduction to Pika
rabbitmq.com: Networking and RabbitMQ