Интеграция с RabbitMQ

В этом уроке объясняется, как можно установить соединение с брокером сообщений RabbitMQ, используя контекст Коррелятора событий.

Настройка папки расширения

Прежде всего, вам нужно указать поле Папка расширений для плагина Корреляторы событий. Все загруженные расширения коррелятора следует поместить в эту папку.

Окно свойств плагина Корреляторы событий можно найти, выбрав Редактировать свойства драйвера/плагина в контекстном меню узла Корреляторы событий в дереве системы.

Загрузка расширения RabbitMQ

Скачайте плагин RabbitMQ, используя следующую ссылку: https://github.com/siddhi-io/siddhi-io-rabbitmq. Поместите плагин в папку, заданную на предыдущем шаге.

Загрузка расширения мапера

Чтобы правильно осуществлять разбор и обработку  входящего потока данных, коррелятору необходим соответствующий плагин мапирования. Список доступных плагинов можно найти по ссылке: https://siddhi-io.github.io/siddhi/extensions/.

В данном руководстве мы будем работать в простыми текстовыми сообщениями, поэтому вам нужно скачать соответствующее расширение по ссылке: https://wso2-extensions.github.io/siddhi-map-text/.

Поместите плагин в папку, заданную на первом шаге. После этого перезапустите сервер.

После перезапуска сервера RabbitMQ сущности (источник и приемник), а также текстовый маппер, будут доступны в скриптах Коррелятора событий.

Настройка коррелятора событий

Теперь мы можем создать и настроить коррелятор событий.

Чтобы открыть окно свойств нового коррелятора, выберите пункт Создать в узле Корреляторы событий в системном дереве:

Поместите следующий скрипт в поле Correlator Script:

@sink (type = 'internalEvent',
event = 'test',
@map(type = 'internalEvent'))
define stream FooStream (text string);
@source(type ='rabbitmq',
uri = 'amqp://guest:guest@localhost:5672',
exchange.name = 'logs',
@map(type='text'))
define stream BarStream (text string);
from BarStream select text insert into FooStream;

Этот скрипт описывает, как сообщения, отправленные в обменник "logs" локального RabbitMQ брокера (BarStream), перехватываются и преобразуются в набор событий AggreGate Server (FooStream). Предполагается, что входящие сообщения поступают в текстовом формате (@map(type='text')). Ожидается, что входные данные будут выглядеть так: text:'Message'. После получения сообщения его данные будут извлечены и помещены в поле text соответствующего события AggreGate Server.

Для того, чтобы этот скрипт корректно выполнялся, вам также придется настроить выходной формат события test контекста Коррелятора событий.

Was this page helpful?