Данный метод интеграции с Kafka устарел. Для всех новых интеграций используйте драйвер Kafka.

Если вы решили использовать Корреляторы событий для интеграции с Kafka, обратите внимание, что каждое расширение и зависимость, перечисленные ниже, могут иметь дополнительные подзависимости, которые также должны быть установлены.

Интеграция с Kafka

В данном уроке объясняется, как использовать Корреляторы событий для получения событий из источников Kafka.

Зависимости

Для источников Kafka требуется установка следующих зависимостей.

Расширения должны быть установлены в директории расширений, указанной в свойствах плагина Корреляторы событий:

Библиотека Kafka-clients и зависимости расширения должны быть установлены в папку <папка установки>\lib в подкаталог папки установки AggreGate:

Информация о загрузке и установке расширений Siddhi может быть полезна, если у вас возникли проблемы с правильной установкой необходимых зависимостей.

Об источниках данных Kafka

В Kafka сообщения создаются продюсерами. Например, продюсером может быть устройство, которое отправляет данные о температуре. Когда сообщение от производителя достигает кластера Kafka, оно сохраняется и публикуется в топике. Получатели могут подписаться на эти топики, чтобы получать сохраненные сообщения.

Каждая тема имеет один или несколько партиций. Партиция - это отдельный канал внутри топика, доступ к которому осуществляется по его идентификатору. Партиции нумеруются, начиная с 0. Сообщения назначаются конкретному разделу на основе значения ключевого поля, хранящегося в сообщении. Например, если несколько устройств включают в сообщения свой идентификатор, каждому из них может быть назначен свой раздел.

Движок коррелятора выступает в роли потребителя топиков Kafka. Поток Kafka может подписаться на топик и получить данные из всех его партиций. Как только поток получает события, вы можете использовать его как обычный поток коррелятора. Например, вы можете фильтровать входной поток или обнаруживать закономерности во входящих событиях.

Источники данных Kafka

Как и любой другой поток, потоки Kafka используют декораторы для определения параметров потока. Более подробную информацию о том, как работают потоки коррелятора событий, можно найти в разделе Скрипты коррелятора событий.

Пример потока источников Kafka:

@Source(

    type="kafka", 
    bootstrap.servers="kafka1.example.com:9091, kafka2.example.com:9092", 
    topic.list="example", group.id="correlator", 
    threading.option="single.thread", 
    @map(type='json'))

define stream InKafkaTemperature (device string, temperature int);

Параметры потока Kafka @Source:

Параметр

Опция

Описание

type

Нет

Тип входного потока. Для источников данных Kafka значение всегда kafka.

bootstrap.servers

Нет

Список хостов Kafka, разделенных запятой. Движок коррелятора будет получать события с указанных хостов.

topic.list

Нет

Список топиков, разделенных запятыми. Поток будет получать события от всех указанных топиков.

http://group.id

Нет

Имя группы получателей. Если вы хотите разделить сообщения из одного топика между несколькими входящими потоками, используйте то же самое имя группы. Это позволит избежать получения потоками дублей событий.

threading.option

Нет

Определяет способ обработки этого источника. Возможные значения:

  • single.thread. Источник будет работать в один поток.

  • topic.wise. Каждый топик будет выполняться в отдельном потоке.

  • partition.wise. Использовать отдельный поток для каждого раздела.

seq.enabled

Да

Опция используется, когда важна последовательность получаемых событий. В этом случае, поток должен определить признак, который будет выступать идентификатором. Параметр необязательный и по умолчанию выставлен на false.

is.binary.message

Да

Если события имеют двоичный формат, этот параметр должен быть установлен в true. Текущий параметр необязателен и по умолчанию равен false.

topic.offset.map

Да

Определяет смещения топиков в формате <topic> = <offset>, <topic> = <offset>. Смещение определяет, сколько сообщений будет пропущено с начала топика. Например, если значение смещения равно 100, то первые 100 сообщений будут пропущены при чтении из топика движком коррелятора. Данный параметр является необязательным, и по умолчанию сообщения не пропущены.

optional.configuration

Да

Определяет все остальные параметры настроек получателя. Формат parameter:value, parameter:value. Параметр необязательный.

@map

Нет

Маппер входных данных. Расширение Kafka поддерживает входные данные в форматах текс, xml, json, а также в бинарном формате. Этот формат можно указать в параметре маппера. Например, для получения данных в формате json укажите маппер @map(type='json') для входного потока.

Пример

Следующий скрипт получит данные из семи разделов (0-6) из топика "Температура". Поток подключается к двум серверам из кластера Kafka: kafka1.example.com:9091 и kafka2.example.com:9092. Входящий поток имеет два поля: устройство и температура. Для всех событий, имеющих значение температуры более 100, в выходном потоке генерируется событие.

@Source(

    type="kafka", 
    bootstrap.servers="kafka1.example.com:9091, kafka2.example.com:9092", 
    topic.list="temperature", group.id="correlator", 
    threading.option="single.thread", 
    partition.no.list="0,1,2,3,4,5,6", 
    seq.enabled="false", 
    is.binary.message="false", 
    @map(type='json'))

define stream InKafkaTemperature (device string, temperature int);

@Sink(type = 'internalEvent', event='test', @map(type='internalEvent'))

define stream OutKafkaTemperature (message string, temperature int);

from InKafkaTemperature[temperature > 100]
select 
  device as message,
  temperature as temperature
insert into OutKafkaTemperature;