Скрипты коррелятора событий

Скрипты коррелятора используют язык Streaming SQL.

Так как Streaming SQL - сам по себе обширный язык, в данном разделе объясняются только базовые понятия этого языка, имеющие отношение к AggreGate. Более подробно о синтаксисе, операторах и возмоностях Streaming SQL см. Гайд по Streaming SQL.

Основные понятия языка Streaming SQL:

  • Поток - серия событий, упорядоченных по времени. Потоки делятся на входные и выходные. Коррелятор "слушает" входные потоки и выводит события в выходные потоки.

  • Запрос - выражение, которое может брать события из одного или более потоков, обрабатывать эти событий в потоковом режиме и выводить в выходной поток.

  • Функция - упакованная сложная логика исполнения, производящая операции с данными событий и возвращающая полученные данные. Можно вызвать функции для выполнения операций с данными событий.

  • Фильтр - условное выражение, определенное для потока. В результате, будут обрабатываться только события, соответствующие определенному выражению.

  • Окно - подмножество данных с определенным критерием, взятых из потока,. Окна динамичны. По мере генерирования новых событий в потоке, данные окно самообновляются.

  • Шаблон - выражение, которое определяет логику корреляции событий. С помощью шаблонов можно коррелировать между собой события из одного и более входных потоков и генерировать выходные события из данных коррелируемых событий.

Пример скрипта см. ниже. Скрипт берет значения из событий, генерируемых виртуальным устройством, и выводит их в выходной поток вместе с дополнительным сообщением.

    @Source(type = 'internalEvent', context='users.admin.devices.virtual', event='event1', @map(type='internalEvent'))

    define stream SourceStream (string string, int int);

 

    @Sink(type = 'internalEvent', event='test', @map(type='internalEvent'))

    define stream OutStream (message string, str string, num double);

 

    from SourceStream

    select 'Event 1 detected' as message, string as str, cast(int, "double") as num

    insert into OutStream;

В этом примере, SourceStream  - входной поток, OutStream  - выходной поток. @Source и @Sink - аннотации потока (см.ниже). from ... select ... insert into  - запрос. cast(int, "float") - встроенная функция, которая преобразует значения из Integer в Double.  В приведенном примере нет фильтров, окон и шаблонов. Они описаны в отдельных разделах.

Определение потоков

Осуществляется при помощи команды define stream с дальнейшим указанием определений атрибутов потока:

define stream TestStream (message string, num double);

Указанный в примере выше поток SourceStream имеет два атрибута: message типа string и num типа double.

Имена атрибутов должны совпадать с именами полей событий. Например, если событие в AggreGate называется message, нельзя использовать для него другое имя поля в скрипте коррелятора. В противном случае, коррелятор событий может проигнорировать некоторые события, что может привести к потере данных.

Привязка Потоков к Событиям AggreGate

Определение потока требует использования аннотации. Механизм, который привязывает события AggreGate к потокам движка коррелятора, использует аннотации.

Аннотация @Source указывает на то, что следующее за ней определение потока - это входной поток.

Пример входного потока с аннотацией:  

    @Source(type = 'internalEvent', context='users.admin.devices.virtual', event='event1', @map(type='internalEvent'))

    define stream SourceStream (string string, int int);

Атрибуты аннотации @Source:

  • type определяет тип входного потока. Этот параметр определяет протокол, который будет использоваться для создания данного входного потока. По умолчанию поддерживается только тип internalEvent. Этот тип соответствует событиям, генерируемым внутри AggreGate. Вы можете расширить количество поддерживаемых типов потоков, установив расширения. Например, вы можете использовать MQTT и RabbitMQ типы потоков, установив соответствующие расширения движка коррелятора.

  • context - специальный атрибут для типа потока internalEvent. Определяет контекст, имеющий событие, которое должен "слушать" коррелятор.

  • event - специальный атрибут для типа потока internalEvent. Определяет имя события в определенном контексте. Коррелятор будет "слушать" события с этим именем, генерируемые контекстом.

  • @map() - преобразователь входного формата для движка коррелятора. Преобразователь по умолчанию internalEvent преобразует формат событий, сгенерированных в AggreGate, в формат, который может обработать коррелятор.

Аннотация @Sink указывает на то, что следующее за ней определение потока - это выходной поток.

Пример выходного потока с аннотацией:  

    @Sink(type = 'internalEvent', context='users.admin.devices.virtual' event='event2', @map(type='internalEvent'))

    define stream OutStream (string string, int int);

Атрибуты аннотации @Sink:

  • type определяет тип выходного потока. Аналогичный атрибут, как и для аннотации @Source.

  • context  - специальный атрибут для типа потока internalEvent. Определяет контекст, который будет генерировать выходные события. Если данный атрибут не определен, события будут генерироваться контекстом самого коррелятора.

  • event - специальный атрибут для типа потока internalEvent. Определяет имя события, которое будет сгенерировано в определенном контексте. Если событие генерируется контекстом коррелятора, его формат определяется свойством Формат выходного события. Если другим контекстом, событие с определенным именем уже должно существовать в определенном контексте. В обоих случаях, имена полей событий должны соответствовать именам атрибутов потока. Если требуется подмножество полей из события, можно указать для них только атрибуты потока.

  • @map() преобразователь выходного формата для движка коррелятора. Преобразователь по умолчанию internalEvent преобразует формат событий, сгенерированных движком коррелятора, в формат, используемый AggreGate.

Обработка данных события

Для обработки данных события используйте запросы. Запросы - это команды, которые собирают данные из одного или более входных потоков и выводят данные в выходной поток.

Простой запрос выглядит следующим образом:

    from SourceStream 

    select int as num, string as message

    insert into OutStream;

Запрос отбирает из входного потока поля с именами int и string. Значение поля int помещается в поле num. Значение поля string помещается в поле message. Получившееся событие имеет два поля: num и message. Оно отправляется в выходной поток.

Следующий запрос выполняет функции для вычисления значений полей для исходящего события:

    from SourceStream

    select ifThenElse(regex:matches("(.*)bbb(.*)", string), "match", "no match") as message, cast(int, 'double') as num

    insert into OutStream;

Этот запрос использует три функции для вычисления значений полей. Функция ifThenElse() - встроенная функция языка Streaming SQL. Она обеспечивает условную логику. Функция cast() - встроенная функция языка Streaming SQL. Она преобразует свой параметр в другой тип. В приведенном примере, значение типа Integer из поля int преобразуется в значение типа Double, которое записывается в поле num выходящего потока. Функция regex:matches() приводит поле string входного события в соответствие с регулярным выражением. regex:matches()  - функция расширения. Чтобы использовать функции расширения, необходимо установить соответствующие расширения.

Функции расширений должны вызываться с использованием пространства имен. В приведенном выше примере regex  - это пространство имен функции matches.

Более подробно о встроенных функциях см. Документацию по API streaming SQL. Подробнее о функциях расширений см. документацию по расширениям движка коррелятора.

Фильтры

Фильтр - это условие, определенное для потока. Будут обработаны только события, отвечающие условию, а все остальные - проигнорированы.

Следующий фиьтр отбирает события конкретного пользователя:

    @Source(type = 'internalEvent', context='users.admin.models.actionHandler', event='action', @map(type='internalEvent'))

    define stream UserActions (user string, action string, level int);``

 

    from UserActions[user == 'administrator']

    select action as message

    insert into OutStream;

Можно использовать логические операторы внутри фильтров, чтобы комбинировать условные конструкции. Следующий пример отбирает события конкретного пользователя, превышающие определенный уровень серьезности.

    from UserActions[user == 'administrator' and level > 1]

    select action as message

    insert into OutStream;

Окна

Окно - это подмножество данных с определенным критерием, взятых из потока. По мере того, как все больше данных добавляется через входной поток, обновляются также и данные в окне. Как и в случае с входящим потоком, можно обрабатывать данные внутри окна и отправить их в выходящий поток.

Например, вы можете использовать окно для хранения десяти последних значений температуры с устройства и сверять с ними все остальные входные события. При поступлении новых данных, данные в окне будут обновляться соответственно. Или, например, вы можете использовать окно, чтобы выбрать последние пять минут активности для определенного устройства. Со временем данные в этом окне будут обновляться, поэтому события, содержащиеся в окне, всегда будут актуальными.

В потоке окна определяются при помощи префикса #window, за которым идет точка и тип окна с параметрами. Следующее окно получает последние 10 событий из потока:

    from SourceStream#window.length(10)

Определения окон могут комбинироваться с фильтрами. Следующее окно получает 10 последних событий из потока, принадлежащего определенному пользователю:

    from SourceStream#window.length(10)[user == 'administrator']

Следующий запрос использует окно и показывает максимальную температуру для последних 10 событий. Каждый раз при получении нового события, генерируется выходное событие:

    from SourceStream#window.length(10)

    select "Maximum temperature" as message, cast(max(int), 'double') as num

    insert into OutStream;

Окна могут быть скользящие и переворачивающиеся

Скользящее окно обновляется при каждом новом событии, отвечающем критериям окна. Переворачивающееся окно обновляется, когда вся длина окна полностью заполнена соответствующими событиями. Например, окно вмещает три последних события. Генерируется новое событие. Скользящее окно обновится. В нем будет два старых события и одно новое. Переворачивающееся окно не будет обновляться, пока не поступят еще два новых события, а после обновления в нем окажется три новых события.

Чтобы определить скользящее окно, используйте типы length и time. Тип length будет выбирать количество последних событий. Тип time будет отбирать все события за указанные период времени.

    -- select last 10 events from SourceStream

    from SourceStream#window.length(10)

 

    -- select all events that came in the last 10 minutes from SourceStream

    from SourceStream#window.time(10 min)

Чтобы определить переворачивающееся окно, используйте типы lengthBatch и timeBatch. Тип lengthBatch будет обновлять каждое установленное число событий. Тип timeBatch будет обновлять окно через каждый заданный интервал времени.

    -- select every 10 events from SourceStream

    from SourceStream#window.lengthBatch(10)

 

    -- select all events that come every 10 minutes from SourceStream

    from SourceStream#window.timeBatch(10 min)

Например, если длина окна lengthBatch равна 10, оно будет генерировать выходящее событие для каждых 10 входных событий. Окно timeBatch с 10-минутным временным периодом будет генерировать выходящее событие каждые 10 минут.

Следующий пример будет генерировать событие каждые 10 минут. Это событие будет содержать максимальное значение поля int из всех событий, полученных за последние 10 минут. Если событий не будет получено, событие не будет сгенерировано.

    from SourceStream#window.timeBatch(10 min)

    select 'Maximum temperature' as message, cast(max(int), 'double') as num

    insert into OutStream;

Шаблоны

С помощью шаблонов вы можете коррелировать события друг с другом и применять сложную логику на основе событий.

Синтаксис шаблонов очень простой и легко настраиваемый. Следующий пример - лишь небольшая часть того, что модно сделать при помощи шаблонов. Для большей информации см.Гайд по Streaming SQL.

Базовый шаблон определяет последовательность событий, которые должны идти друг за другом, отделяемые оператором ->. Такой шаблон показан в следующем примере. Если срабатывает аварийная сигнализация, и затем в течение 10 минут следует повышение температуры выше 80 градусов, активируются средства пожаротушения.

    from InStreamAlarms[alarm == 'emergency'] -> 

        InStreamTemperature[temp > 80]

        within 10 min

    select

        "activateExtinguishers" as action

    insert into OutSystemControl;

Другой пример. Есть два входных потока. Один из них - от датчик, который посылает информацию об уровне топлива в баке. Другой подает сигналы, когда уходит и возвращается оператор. Выходящий поток контролирует  насос.

    -- 0 to 100

    @Source(type = 'internalEvent', context='users.admin.devices.fuelSensor', event='fuelLevel', @map(type='internalEvent'))

    define stream SourceSensor (level int);

 

    -- "isAway", "isPresent"

    @Source(type = 'internalEvent', context='users.admin.models.operator', event='action', @map(type='internalEvent'))

    define stream SourceOperator (status string, name string);

 

 

    -- 1 - enable, 0 - disable

    @Sink(type = 'internalEvent', context='users.admin.models.pumpController', event='control', @map(type='internalEvent'))

    define stream OutPumpControl (action int, message string, operator string, value int);

 

Если оператор ушел и не вернулся в течение 10 минут, а в это время уровень топлива достиг 5%, включить насос. Этот шаблон использует ссылки на события (e1 и e2), чтобы обращаться к данным из связанных событий.

    from every( e1=SourceOperator[status == 'isAway'] ) -> 

        not SourceOperator[status == 'isPresent' and name == e1.name] for 10 min and e2=SourceSensor[level <= 5]

    select

        1 as action, 

        "Automatic activation (operator not present)" as message,

        e1.name as operator,

        e2.level as value

    insert into OutPumpControl;

 

Если в какой-то момент уровень топлива поднимается выше 95% и не падает в течение следующих 2 минут, остановить насос. Необязательно искать этот шаблон для каждого события, когда уровень топлива превышает 95% (нет необходимости в ключевом слове every). Первое событие с повышением уровня более, чем до 95%, запустит данный шаблон, и все другие подобные события будут обрабатываться как его часть.

    from e1=SourceSensor[level > 95] -> 

        not SourceSensor[level < e1.level] for 2 min

    select

        0 as action,

        "Automatic pump shutdown" as message,

        "auto" as operator,

        level as value

    insert into OutPumpControl;