AMQP (Advanced Message Queuing Protocol)

Материал из Национальной библиотеки им. Н. Э. Баумана
Последнее изменение этой страницы: 21:03, 29 ноября 2017.

AMQP (Advanced Message Queuing Protocol) — это стандартный протокол кадрирования (под кадрированием подразумевается предоставление структуры для потоков двоичных данных, которые передаются в любом направлении в рамках сетевого подключения) и передачи, который обеспечивает асинхронную безопасную и надежную передачу сообщений между двумя сторонами. Это основной протокол, применяемый для обмена сообщениями служебной шины Azure и концентраторов событий Azure. Обе службы также поддерживают протокол HTTPS. Защищаемый протокол SBMP, который также поддерживается, постепенно вытесняется в пользу протокола AMQP. Основная идея состоит в том, что отдельные подсистемы (или независимые приложения) могут обмениваться произвольным образом сообщениями через AMQP-брокер, который осуществляет маршрутизацию, возможно гарантирует доставку, распределение потоков данных, подписку на нужные типы сообщений. Разработчиком данного протокола является «JPMorgan Chase» (Джей Пи Морган Чейз).

Основные цели

Цель заключается в том, чтобы все разработчики, использующие любой имеющийся стек клиента AMQP на любой платформе, могли взаимодействовать со служебной шиной Azure через протокол AMQP

Распространенные стеки общего назначения AMQP, например Apache Proton или AMQP.NET Lite, уже реализуют все основные протоколы AMQP. Для этих базовых жестов иногда в качестве оболочки используется API более высокого уровня. В Apache Proton предусмотрены два API: императивный API Messenger и реактивный API Reactor.

Далее предполагается, что управление подключениями, сеансами и связями AMQP, а также обработка передачи кадров и управление потоком осуществляются с помощью соответствующего стека (например, Apache Proton-C) и для этого не требуется особое внимание со стороны разработчиков приложений. Кроме того, абстрактно предполагается, что доступны несколько примитивных возможностей API, например возможность подключения и создания абстрактных объектов отправителя (sender) и получателя (receiver), которые затем приобретают форму операций

send() 

и

receive() 

Расширенные возможности служебной шины Azure, например просмотр сообщений или управление сеансами, описываются в контексте AMQP, а также как многоуровневая псевдореализация на базе этой предполагаемой абстракции API [Источник 1].


Устройство AMQP

На самом нижнем уровне определяется формат кодирования данных в бинарный вид для передачи по TCP-соединению, выше лежит формат передачи RPC-запросов между сервером и клиентом. Cемантика работы с сообщениями, создания очередей и т.п. описывается в XML-спецификации, которая задает RPC-интерфейс сервера и клиента (примеры таких XML-файлов для версий 0-8 и 0-10). Данный XML является последней и конечной спецификацией протокола. Версии протокола 0-8 и 0-10 отличаются настолько сильно, что поддерживать их одновременно вряд ли возможно в одной программе. Иногда такие spec-файлы для разных брокеров AMQP, формально поддерживающих одну и ту же версию протокола, отличаются настолько, что не являются взаимозаменяемыми.

AMQP основан на трех понятиях:

  • Сообщение (message) — единица передаваемых данных, основная его часть (содержание) никак не интерпретируется сервером, к сообщению могут быть прицеплены структурированные заголовки.
  • Точка обмена (exchange) — в нее отправляются сообщения. Точка обмена распределяет сообщения в одну или несколько очередей. При этом в точке обмена сообщения не хранятся. Точки обмена бывают трех типов: fanout — сообщение передается во все прицепленные к ней очереди; direct — сообщение передается в очередь с именем, совпадающим с ключом маршрутизации (routing key) (ключ маршрутизации указывается при отправке сообщения); topic — нечто среднее между fanout и exchange, сообщение передается в очереди, для которых совпадает маска на ключ маршрутизации, например, app.notification.sms.* — в очередь будут доставлены все сообщения, отправленные с ключами, начинающимися на app.notification.sms.
  • Очередь (queue) — здесь хранятся сообщения до тех пор, пока не будет забраны клиентом. Клиент всегда забирает сообщения из одной или нескольких очередей.

Для эффективного испльзования необходимо правильно выбрать AMQP-брокер. При выборе необходимо рассматривать как собственно характеристики сервера: скорость работы, надежность, легкость установки и поддержки, но также внимательно смотреть на версию AMQP-протокола, которая поддерживается брокером, — она должна совпадать с версией клиентской AMQP-библиотеки.

Использование AMQP

AMQP вызывает контейнеры взаимодействующих программ. В них содержатся узлы, которые являются взаимодействующими объектами внутри этих контейнеров. Таким узлом может выступать очередь. Протокол AMQP предусматривает мультиплексирование, что позволяет использовать одно подключение для многих путей передачи данных между узлами.

Таким образом, сетевое подключение привязывается к контейнеру. Его инициирует контейнер в роли клиента. При этом устанавливается исходящее подключение TCP через сокет к контейнеру в роли получателя, который ожидает передачи данных и принимает входящие подключения TCP. Подтверждение подключения включает в себя согласование версии протокола, объявление или согласование использования безопасности транспортного уровня (TLS/SSL) и подтверждение проверки подлинности или авторизации на уровне подключения, основанном на SASL.

Для служебной шины Azure обязательно использование TLS. Он поддерживает подключения через TCP-порт 5671, когда перед вводом подтверждения протокола AMQP на подключение TCP сначала накладывается TLS, а также подключения через TCP-порт 5672, когда сервер незамедлительно предлагает обязательные обновления подключения к TLS с использованием модели, предписанной AMQP. Привязка AMQP WebSockets создает туннель через TCP-порт 443, что эквивалентно подключениям по протоколу AMQP 5671.

После настройки подключения и TLS в служебной шине применяется один из двух вариантов механизма SASL. SASL PLAIN, как правило, используется для передачи имени пользователя и пароля на сервер. В служебной шине нет учетных записей, но есть связанные с ключом именованные правила безопасности общего доступа, которые присваивают права. Имя правила используется в качестве имени пользователя, а ключ (текст в кодировке Base64) — в качестве пароля. Права, связанные с выбранным правилом, определяют операции, разрешенные для подключения. SASL ANONYMOUS используется для обхода авторизации SASL, когда клиент хочет использовать модель безопасности на основе утверждений (CBS), которая описана далее в этой статье. Используя этот вариант, подключение клиента можно установить анонимно на короткий период времени, в течение которого он может взаимодействовать только с конечной точкой CBS. При этом необходимо выполнить подтверждение CBS.

После установки транспортного подключения каждый контейнер объявляет максимальный размер кадра, который он готов обработать, и время ожидания простоя, по истечении которого он в одностороннем порядке отключится, если подключение будет неактивно.

В сеансах предусмотрена модель управления потоком на основе окна. После создания сеанса каждая сторона объявляет количество кадров, которое она готова принять в окне получения. Пока стороны обмениваются кадрами, передаваемые кадры заполняют окно. После заполнения окна передача прекращается и возобновляется только после сброса или развертывания окна с помощью перформатива потока (перформатив — это термин AMQP, обозначающий жесты на уровне протокола, передаваемые между двумя сторонами).

Подключения, каналы и сеансы являются временными. Если базовое подключение прерывается, подключения, туннель TLS, контекст авторизации SASL и сеансы необходимо установить заново.[Источник 2].

Операции запрос-ответ

Создание ссылки на узел управления для отправки запросов.

requestLink = session.attach(     
role: SENDER,   
   target: { address: "<entity address>/$management" },   
   source: { address: ""<my request link unique address>" }   
)  

Создание ссылки для получения ответов от узла управления.

responseLink = session.attach(    
role: RECEIVER,   
   source: { address: "<entity address>/$management" }   
   target: { address: "<my response link unique address>" }   
)  

Передача сообщения запроса.

requestLink.sendTransfer(  
       Message(  
               properties: {  
                       message-id: <request id>,  
                       reply-to: "<my response link unique address>"  
               },  
               application-properties: {  
                       "operation" -> "<operation>",  
               },  
       )  

Получение ответного сообщения по ссылке ответа.

responseMessage = responseLink.receiveTransfer()

Ответное сообщение имеет следующий формат:

Message(  
properties: {     
       correlation-id: <request id>  
   },  
   application-properties: {  
           "statusCode" -> <status code>,  
           "statusDescription" -> <status description>,  
          },         
)  

[Источник 3].

Реализация

Источники

  1. AMQP (Advanced Message Queuing Protocol) [2017-2017]. Дата обновления 20.12.2017. URL https://docs.microsoft.com/ru-ru/azure/service-bus-messaging/service-bus-amqp-protocol-guide/(дата обращения 12.12.2017)
  2. AMQP (Advanced Message Queuing Protocol) [2017-2017]. Дата обновления 20.12.2017. URL https://m.habrahabr.ru/post/62502//(дата обращения 12.12.2017)
  3. AMQP (Advanced Message Queuing Protocol) [2017-2017]. Дата обновления 20.12.2017. URL https://docs.microsoft.com/ru-ru/azure/service-bus-messaging/service-bus-amqp-request-response/(дата обращения 12.12.2017)