Главная SQL, Без рубрики, Новое Обзор технологии Microsoft StreamInsight
  • Обзор технологии Microsoft StreamInsight

    SQL-Server-StreamInsight-CTP2-Available-for-Download-2 Технология StreamInsight – это новая технология для анализа данных от Microsoft. Microsoft StreamInsight предназначен для обработки потоков событий и базируется на технологии Complex Event Processing.

    Complex Event Processing (CEP) – это непрерывная инкрементальная обработка потоков событий из множества источников на основании декларированных запросов и шаблонов с близкой к нулю задержкой.

    Рассмотрим ситуацию, когда нам нужно обрабатывать некоторый поток событий, причем событий очень много и они поступают очень часто. Необходимо обрабатывать события с минимальной задержкой, желательно в реальном времени.

    В парадигме СУБД данные сначала должны быть загружены на сервер, а это уже может потребовать значительных затрат ресурсов и времени. Затем, нужно сделать запрос к базе данных, чтобы получить некоторый ответ, на основании которого можно принять то или иное решение.

    Понятно, что для описанного примера использование подхода СУБД, возможно, является не самой лучшей идеей.

    Теперь рассмотрим работу CEP системы. В CEP системе запросы являются «статичными», то есть автоматически выполняются системой при поступлении новых событий и отправляют обработанные данные в выходной поток. В общем случае, данные даже не обязательно сохранять куда-либо.

    Данные из входного потока обрабатываются инкрементально, то есть CEP движку не нужно обращаться к старым данных, если в этом нет необходимости.

    В таблице приводится сравнение СУБД и CEP по различным параметрам. Видно, что CEP позволяет обрабатывать большое количество данных с малой задержкой, в то время как СУБД не позволяет этого сделать.

    СУБД

    CEP

    Парадигма запросов Запросы выполняются по требованию Запросы обрабатываются непрерывно
    Задержка Секунды, часы, дни Миллисекунды или меньше
    Пропускная способность Сотни записей/сек Десятки тысяч записей/сек

    Связь StreamInsight и Microsoft SQL Server 2008 R2

    StreamInsight поставляется вместе с SQL Server 2008 R2 и является частью платформы обработки данных MS SQL, однако StreamInsight никак не относится к Database Engine и не зависит от SQL Server (обратное тоже верно).

    Так что связь SQL Server и StreamInsight является достаточно условной.

    Более того, StreamInsight можно скачать отдельно от MS SQL Server.

    С другой стороны, при установке StreamInsight отдельно от MS SQL Server, нужно ввести ключ активации MS SQL Server 2008 R2 или выбрать ознакомительный 180-дневный режим. Полностью бесплатную версию я найти не смог.

    Возможности StreamInsight зависят от редакции MS SQL Server 2008 R2, с которой ассоциирован вводимый ключ, и представлены в таблице:

    Возможности StreamInsight Редакция Microsoft SQL Server 2008 R2
    Standard

    Тысячи событий/сек

    Задержка: секунды

    Standard

    Enterprise

    Web

    Premium

    Десятки тысяц событий/сек

    Задержка: менее секунды

    Datacenter

    Developer

    Evaluation

    Установка StreamInsight

    Дистрибутив StreamInsight можно скачать по ссылке

    http://www.microsoft.com/sqlserver/2008/en/us/R2-complex-event.aspx, на момент написания статьи доступны версии для 32-битных и 64-битных операционных систем.

    Системные требования

    Аппаратное обеспечение:

    • Рекомендуется: 2.2 GHz CPU, 1024 MB RAM
    • Минимум: 1.6 GHz CPU, 384 MB RAM

    Программное обеспечение:

    Поддерживаются следующие операционные системы:

    • Windows XP Service Pack 2 и более новые (x86 и x64)
    • Windows Server 2003 Service Pack 2 и более новые (x86 и x64)
    • Windows Server 2003 R2 и более новые (x86 и x64)
    • Windows Vista (x86 и x64)
    • Windows Server 2008 и более новые (x86 и x64)
    • Windows 7 (x86 и x64)

    Дистрибутив занимаем около 10 Mb, установка происходит при помощи простого мастера. Наиболее важный вопрос при установке – это имя инстанса StreamInsight – это имя нужно обязательно запомнить, поскольку оно используется при программировании.

    После установки необходимо удалить старую версию Microsoft SQL Server Compact (Панель управления->Установка и удаление программ) и установить новую (идущую в комплекте со StreamInsight). На момент написания статьи, это Microsoft SQL Server Compact 3.5 SP1.

    В комплект установки входят следующие компоненты (после установки их можно найти в папке c:\Program Files\Microsoft StreamInsight 1.0\):

    Файлы Путь
    Microsoft.ComplexEventProcessing DLLs*

    StreamInsightDumper

    C:\Program Files\Microsoft StreamInsight 1.0\Bin
    Документация StreamInsight C:\Program Files\Microsoft StreamInsight 1.0\Documentation\<LanguageFolder>
    Конфигурация и исполняемые файлы Microsoft StreamInsight Server C:\Program Files\Microsoft StreamInsight 1.0\Host

    C:\Program Files\Microsoft StreamInsight 1.0\Host\<InstanceName>

    Лицензия Microsoft StreamInsight C:\Program Files\Microsoft StreamInsight 1.0\license\<LanguageFolder>
    Пакет установки SQL Server Compact Edition C:\Program Files\Microsoft StreamInsight 1.0\Redist
    Отладчик потоков событий Microsoft StreamInsight

    Для Windows XP и Windows Server 2003 урезанные версии

    C:\Program Files\Microsoft StreamInsight 1.

    * – Сборки регистрируются в GAC во время установки

    Схема работы StreamInsight

    Рассмотрим более подробно схему работы StreamInsight:

    Данные могут поступать с различных источников. Например, это могут быть различные сенсоры, веб-сервисы, хранилища данных и другие источники. Информация из таких источников, скорее всего, является разнородной (например, имеет разный формат), для преобразования этой информации в события, понятные системе, используются входные адаптеры (Input Adapters). Полученные события поступают на движок StreamInsight, где обрабатываются заранее описанными запросами. Причем, данные могут обрабатываться несколькими запросами по очереди, может также использоваться информация из внешних источников данных. Всю работу по максимально быстрой обработке событий берет на себя движок StreamInsight. Обработанные данные нужно каким-то образом передать получателям событий. Для преобразования данных в удобную для получателей форму используются выходные адаптеры (Output Adapters).

    Основные понятия

    К основным структурым элементам StreamInsight относятся:

    • Потоки
    • События
    • Адаптеры
    • Запросы

    Рассмотрим каждый элемент:

    Потоки

    Поток – это набор событий, который, в общем случае, может быть бесконечным.

    События в потоке необязательно упорядочены по времени, однако каждое событие обязательно имеет временную отметку.

    В StreamInsight потоки реализованы в виде очередей, которые предоставляют методы для постановки и изъятия событий из потока. Причем, StreamInsight берет на себя работу по правильной обработке событий с учетом их временных отметок, а не на основании порядка поступления событий из потока, что, безусловно, очень важно.

    События

    Пожалуй, основной элемент StreamInsight – это событие.

    Любое событие в StreamInsight состоит из двух частей:

    • Заголовок
      • Тип события (Event Kind)
      • Временные отметки (Timestamps)
    • Нагрузка (Payload)
      • Данные

    Событие может иметь одну или более (начало события, конец события) временных отметок. Время задается типом DateTime в фомате UTC. Стоит отметить, что временные отметки задаются программно, то есть разработчик отвечает за корректное заполнение временных отметок, а не движок StreamInsight.

    Тип события может иметь значения INSERT (новые данные) и CTI (Current Time Increment). Событие CTI используется для корректной обработки данных в тех случаях, когда события поступают в неправильной хронологической последовательности. Событие CTI по сути сообщает о том, что все события, появившиеся до временной отметки, соответствующей событию CTI, уже были отправлены и теперь могут быть обработаны.

    Рассмотрим пример:

    Пусть у нас есть несколько событий произвольно расположенных на временной оси. Нам нужно вычислить среднее значение из данных, содержащихся в событиях.

    Проблема в том, что мы не знаем временных меток событий, которые еще не поступили на обработку, таким образом, мы не можем вычислить и среднего значения по интервалу времени.

    Добавим события CTI. Событие CTI говорит о том, что можно начать обрабатывать события лежащие левее его по оси времени. Таким образом мы вычисляем среднее значение 1.

    Идем дальше: пусть появилось еще некоторое количество событий и событие CTI 2. Теперь все события, лежащие левее CTI 2 могут быть обработаны.

    Что произойдет если после появления события CTI появилось новое событие лежащее левее него по оси времени (обозначено красным)? Ведь получается, что среднее значение 1 уже не является актуальным.

    Реакция системы в такой ситуации описывается специальной политикой, которая задается разработчиком.

    Существует два вида таких политик:

    • Drop – отбрасывать все подобные события
    • Adjust – изменять временные характеристики события таким образом, чтобы оно оказалось правее последнего события CTI на временной оси. Правда такая политика применима не ко всем событиям, а лишь к тем которые обладают моделью отличной от точечной.

    Что такое модель события?

    Еще одной характеристикой события является его модель (Event Model), существует три модели событий:

    • Interval – известен период времени, в течении которого происходило событие

      Пример: температура в комнате с 14-00 до 16-00 была равна 21 градусу.

    • Point – точечная модель, событие произошло мгновенно.

      Пример: атмосферное давление в 14-30 было равно 760 мм.

    • Edge – известно время начала события, но не известно его время окончания

      Пример: дождь начался в 18-00 (пока не закончился)

    Рассмотрим примеры:

    Interval

    Тип события Начало Конец Нагрузка (Id)
    INSERT 2009-12-27

    02:04:00.213

    2009-12-27

    02:04:04.329

    EU-23423-12
    INSERT 2009-12-27

    02:04:04.329

    2009-12-27

    02:04:08.234

    EU-23423-15
    INSERT 2009-12-27

    02:04:04.234

    2009-12-27

    02:04:04.523

    EU-23423-18

    Point

    Тип события Начало Конец Нагрузка (Id)
    INSERT 2009-12-27

    02:04:00.213

    2009-12-27

    02:04:00.213 + c

    EU-23423-12
    INSERT 2009-12-27

    02:04:04.329

    2009-12-27

    02:04:04.329 + c

    EU-23423-15
    INSERT 2009-12-27

    02:04:04.234

    2009-12-27

    02:04:04.234 + c

    EU-23423-18

    Где с – это наименьшая измеримая единица времени.

    События типа Edge обычно состоят из двух событий (начало и конец события):

    Тип события Тип границы Начало Конец Нагрузка (Id)
    INSERT Start 2009-12-27

    02:04:00.213

    EU-23423-12
    INSERT End 2009-12-27

    02:04:00.213

    2009-12-27

    02:04:04.329

    EU-23423-12
    INSERT Start 2009-12-27

    02:04:04.234

    EU-23423-18
    INSERT End 2009-12-27

    02:04:04.234

    2009-12-27

    02:04:08.238

    EU-23423-18

    ∞ – на самом деле DateTime.MaxValue

    Нагрузка

    Нагрузка – это те данные, которые сопровождают событие. Например, текущее значение температуры, полученное с термометра.

    Нагрузка описывается стандартной структурой на C#, однако существует ряд ограничений, отметим основные (полный список ограничений может быть найден в Microsoft StreamInsight Help -> Developer’s Guide -> Creating Event Types):

    • Размер события не должен превышать 16Kb с учетом всей служебной информации.
    • Событие должно иметь не менее одного поля нагрузки
    • Допустимы только скалярные и элементарные типы (byte,int,byte[],string,datetime)
    • Нагрузка должна быть упакова в класс или структуру C# (даже если в нагрузке содержится только одно поле, оно должно быть размещено в структуре или классе)
    • Нельзя использовать пользовательские атрибуты для полей

    Следующий класс описывает нагрузку с двумя полями типа int.

    public class SimplePayload

    {

    public int V1 { get; set; }

    public int V2 { get; set; }

    }

    Адаптеры

    Как было замечено ранее, адаптеры служат для преобразования потоков данных. Адаптеры делятся на входные и выходные. Входные адаптеры преобразуют данные полученные с источников, в формат понятный StreamInsight, а выходные адаптеры преоразуют данные в формат понятный конечным получателям.

    Основные задачи, которые нужно решить при разработке адаптеров:

    Определить тип адаптера.

    Входной или выходной.

    Определить тип событий.

    Адаптер может быть типизированным либо нетипизированным. Структура событий для типизированных адаптеров известна заранее (описана в виде класса), а для нетипизиванных неизвестна (например, данные получаются с использованием DataReader из Microsoft SQL Server).

    Определить модель событий.

    Point, Interval или Edge. Рекомендуется создавать отдельный адаптер для каждой модели событий.

    Выбрать подходящий базовый класс для адаптера.

    Существуется несколько базовых классов адаптеров для каждого типа адаптера и модели события.

    Тип адаптера Базовый класс входного адаптера Базовый класс выходного адаптера
    Типизированный, Point TypedPointInputAdapter TypedPointOutputAdapter
    Нетипизированный, Point PointInputAdapter PointOutputAdapter
    Типизированный, Interval TypedIntervalInputAdapter TypedIntervalOutputAdapter
    Нетипизированный, Interval IntervalInputAdapter IntervalOutputAdapter
    Типизированный, Edge TypedEdgeInputAdapter TypedEdgeOutputAdapter
    Нетипизированный, Edge EdgeInputAdapter EdgeOutputAdapter

    Создать фабрики для входных и выходных адаптеров

    Тип адаптера Базовый класс фабрики входного адаптера Базовый класс фабрики выходного адаптера
    Типизированный ITypedInputAdapterFactory ITypedOutputAdapterFactory
    Нетипизированный IInputAdapterFactory IOutputAdapterFactory

    Далее перечислены основные обязанности фабрики:

    • Разделяет ресурсы между сходными адаптерами, различающимися только моделью событий.
    • Предоставляет интерфейс Create() и Dispose(). С их помощью адаптеры работают с событиями.
    • Автоматически создает CTI события с учетом пользовательских настроек.

    Разработчик адаптера должен имплементировать методы Start() и Resume() базового класса, в которых производится преобразование событий.

    Базовые классы предоставляют методы Enqueue() и Dequeue(), которые должны использоваться во входном и выходном адаптерах соответственно для обработки событий. Например, входной адаптер преобразует событие в формат понятный StreamInsight и помещает его в очередь, используя метод Enqueue().

    Очень подробно разработка адаптеров описана в разделе Creating Input and Output Adapters документации StreamInsight.

    Запросы

    Пожалуй, наиболее интересным с точки зрения разработчика элементом StreamInsight являются запросы.

    Запросы StreamInsight представляют собой не что иное, как LINQ запросы к потокам данных.

    Простейший запрос мог бы выглядеть следующим образом:


    var queryOutput = from e in input


    select e;

    Такой запрос просто возвращает все полученные события.

    StreamInsight запросы предоставляют следующие возможности:

    • Проекции (Project)
    • Фильтры (Filter)
    • Корелляция потоков (Join)
    • Объединения (Union)
    • Агрегация (Aggregation)
    • Оконные операции

    Рассмотрим каждую из возможностей:

    Проекции

    Проекции используется в случаях, когда нужно:

    • Получить только часть данных из входного потока.
    • Произвести вычисления каких-либо значений.
    • Создать новые поля, основываясь на имеющихся во входном потоке полях.

    Следующий запрос использует пользовательскую функцию для получения нового поля:


    var queryOutput = from e in input


    select
    new { e.Lane, e.TagId,

    VehicleType = TollPointEvent.VehicleTypeName(e.VehicleTypeId) };

    Фильтры

    Фильтры используются в тех случаях, когда нужно получить только те события, которые удовлетворяют определенным требованиям.

    Следующий запрос выбирает только те события, для которых выполняется условие из блока where:


    var queryOutput = from e in input


    where e.VehicleTypeId == 2


    select
    new { e.Lane, e.TagId, VehicleType = TollPointEvent.VehicleTypeName(e.VehicleTypeId) };

    Корелляция потоков

    Используется операция Join, хорошо знакомая разработчикам, использующим SQL. Разница в том, что объединятся не все события, а только те, которые произошли в один и тот же промежуток времени.

    Следующий запрос выполняет cross join двух потоков событий.


    var queryOutput = from nbv in northboundVehicles


    from sbv in lane0Vehicles


    select
    new

    {

    ExitNorth = nbv.ExitGate,

    NorthVehicle = nbv.TagId,

    TollPointId = sbv.TollPointId

    };

    Объединение

    Объединение потоков просто объединяет все события их двух потоков в один. В отличие от корелляции, при объединении потоков информация о временных характеристиках событий не играет роли.

    Следующий запрос объединяет события их двух разных потоков.


    var northboundVehicles = from e in input


    where e.DirectionId == 0


    select e;


    var lane0Vehicles = from e in input


    where e.Lane == 0


    select e;


    var queryOutput = northboundVehicles.Union(lane0Vehicles);

    Безусловно, можно объединять более двух потоков, последовательно вызывая Union()

    Агрегация

    Агрегация позволяет вычислять некоторую функцию по набору событий. К таким фукнциям относятся: Avg, Sum, Count и т.д. Также можно использовать пользовательские функции. Функции агрегации можно применять только к наборам событий, в данном случае – только к окнам.

    Окна позволяют объединять события (для последующей обработки) на интервалах, которые могут задаваться временными или количественными характеристиками.

    Для окон поддерживаются следующие виды операций:

    • Агрегация.
    • TopK – ранжирование.
    • Пользовательские операторы.

    В StreamInsight существует четыре типа окон:

    • Временные окна: Hopping Window и Tumbling Window
    • Окна моментальных снимков: Snapshot Window
    • Окна количества: CountByStartTime Window

    Hopping Window (прыгающее окно)

    Для прыгающего окна определяются два параметра: H – размер «прыжка» и размер окна S. Новое окно создается через каждые H моментов времени (окно «прыгает»), а размер этого окна равен S.

    Если H равно S, такое окно называется Tumbling Window (окно вращения).

    Следующий запрос вычисляет сумму по всем событиям, произошедшим за последний час, каждые 10 минут.

    var hoppingAgg = from w in inputStream.HoppingWindow(TimeSpan.FromHours(1),

    TimeSpan.FromMinutes(10),

    HoppingWindowOutputPolicy.ClipToWindowEnd)

    select new { sum = w.Sum(e => e.i) };

    Обратите внимание, что при создании окна используется параметр, задающий политику HoppingWindowOutputPolicy.ClipToWindowEnd. Интересным является тот факт, что в текущей версии StreamInsight существует всего один вариант задания данной политики (правда он называется по-разному для каждого типа окна).

    Snapshot Window (Окно моментального снимка)

    Параметры окон моментального снимка определяются только событиями в потоке. Появление нового события или окончание старого приводит к окончанию текущего окна и созданию нового (если какие-либо события еще не закончены). Рисунок хорошо иллюстрирует сказанное:

    Следующий запрос создает окно мементального снимка:

    var snapshotAgg = from w in inputStream.SnapshotWindow(WindowInputPolicy.ClipToWindow,

    SnapshotWindowOutputPolicy.Clip)

    select new { sum = w.Sum(e => e.i) };

    CountByStartTime Window (Окно количества)

    Размер окон количества зависит от количества событий с разными временными отметками (точнее, с разным временем начала события). Если все события имели разное начальное время, то окно будет содержать ровно N (параметр окна количества) событий.

    Окно количество можно задать следующим запросом:

    var agg = from w in inputStream.CountByStartTimeWindow(10, CountWindowOutputPolicy.PointAlignToWindowEnd)

    select new { sum = w.Sum(e => e.i) };

    Для более подробного ознакомления с окнами событий, я рекомендую обратиться к соответствующему разделу документации StreamInsight (Developer’s Guide->Writing Query Templates in LINQ->Using Event Windows).

    Этапы разработки

    Тепед тем как переходить к рассмотрению примера еще раз пройдемся по шагам, которые нужно выполнить разработчику StreamInsight приложений.

    1. Определить параметры событий и создать класс нагрузки
    2. Создать входные адаптеры и фабрики входных адаптеров для каждого типа источников данных
    3. Создать шаблоны StreamInsight запросов с использованием LINQ
    4. Создать выходные адаптеры и фабрики выходных адаптеров для каждого типа получателей событий
    5. Связать все компоненты системы вместе

    Также нужно отметить, что движок StreamInsight может работать в нескольких режимах:

    • Как отдельный сервис
    • Как встроенная часть приложения

    Первый вариант, пожалуй, является более предпочтительным для реальных задач, поскольку он является более масштабируемым. С другой стороны, второй вариант является более простым, именно его мы и рассмотрим в рамках примера.

    Пример

    Теперь пора перейти к примеру. Я буду использовать пример, идущий в комплекте с SQL Server 2008 R2 Update for Developers Training Kit, который называется HighwayMonitor.

    Задача:

    Имеется шоссе с 8 рядами в каждую сторону. Имеется 6 точек наблюдения, с которых поступают сигналы. Измеряется скорость едущих машин.

    Существует пять типов машин:

    • Скорая (Ambulance)
    • Автобус (Bus)
    • Грузовик (Truck)
    • Такси (Taxi)
    • Легковой автомобить (Car)

    Мы не будем ставить каких-то конкретных задач по анализу данных, на этом примере мы просто убедимся в том, что StreamInsight может справляться с задачей обработки данных, написание более разумных запросов остается читателям в качестве самостоятельной работы.

    Пока будем просто выводить события в DataGridView.

    Открываем проект в Visual Studio 2010.

    В окне Solution Explorer посмотрим на структуру проекта:

    Позже мы рассмотрим классы, содержащиеся в проекте, пока нужно отметить лишь то, что в проект добавлены ссылки на библиотеки StreamInsight, которые могут быть найдены в папке, в которую был установлен StreamInsight.

    Определим формат событий в файле TollPointTypes.cs


    public
    class
    TollPointEvent : IRandomInit

    {


    public
    static
    int TollPoints = 6;


    public
    static
    int Lanes = 8;


    public
    static
    Random rand;


    public
    Guid EventID;


    public
    Int32 TollPointId;


    public
    Int32 DirectionId;


    public
    Int32 Lane;


    public
    Int32 VehicleTypeId;


    public
    string TagId;


    public
    DateTime EnterGate;


    public
    Int32 MillisecondsToPassSpeedCheckPoint;


    public
    DateTime ExitGate;


    public TollPointEvent()

    {

    }


    public
    void Init()

    {


    if (null == rand)

    {

    rand = new
    Random();

    }


    this.EventID = Guid.NewGuid();


    this.TollPointId = rand.Next(TollPoints);


    this.DirectionId = PickDirection(rand);


    this.Lane = rand.Next(Lanes);


    this.VehicleTypeId = PickVehicleTypeId(rand);


    this.TagId = rand.Next(Int32.MaxValue).ToString();


    this.ExitGate = DateTime.Now;


    Int32 vehicleSpeed = rand.Next(30, 120); // generate car speeds between 30 and 120 kph


    //Int32 vehicleSpeed = 80;


    Int32 vehicleLength = PickLength(rand, this.VehicleTypeId);


    this.EnterGate = CalcEnter(this.ExitGate, vehicleSpeed, vehicleLength);


    this.MillisecondsToPassSpeedCheckPoint = CalcPassSpeedCheckPoint(this.EnterGate, vehicleSpeed);


    Trace.WriteLine(this.EventID.ToString());

    }

    }

    Также в классе содержатся дополнительные методы, но для нас они не слишком важны.

    В файле TollPointInput.cs содержится описание входного адаптера. Поскольку у нас нет реальных датчиков, адаптер самостоятельно создает события.


    public
    class
    TollPointInput<TollPointEvent> : TypedPointInputAdapter<TollPointEvent>

    {


    private
    TollPointInputConfig _config;


    public TollPointInput(TollPointInputConfig config)

    {

    _config = config;

    }


    public
    override
    void Resume()

    {

    ProduceEvents();

    }


    public
    override
    void Start()

    {

    ProduceEvents();

    }


    private
    void ProduceEvents()

    {


    PointEvent<TollPointEvent> currEvent = default(PointEvent<TollPointEvent>);


    EnqueueOperationResult result = EnqueueOperationResult.Full;


    Random rand = new
    Random();


    while (!HighwayMonitor.Program.MainWindow.NeedToStop)

    {


    if (AdapterState.Stopping == AdapterState)

    {

    Stopped();


    return;

    }

    currEvent = CreateInsertEvent();


    if (null == currEvent)

    {


    continue;

    }

    currEvent.StartTime = DateTime.Now;


    currEvent.Payload = (TollPointEvent)Activator.CreateInstance(typeof(TollPointEvent));

    (currEvent.Payload as
    IRandomInit).Init();

    result = Enqueue(ref currEvent);


    if (EnqueueOperationResult.Full == result)

    {

    ReleaseEvent(ref currEvent);

    Ready();


    return;

    }


    Thread.Sleep(rand.Next(1,500));

    }


    this.Stopped();

    }

    }

    Мы знаем параметры событий (TollPointEvent), а наши события являются мгновенными, так что в качестве базового класса выбран TypedPointInputAdapter.

    Реализованы методы Resume() и Start(), которые просто вызывают метод ProduceEvents(), на котором стоит остановиться подробнее.

    currEvent = CreateInsertEvent();

    Создает новое событие с типом INSERT, далее поля события (включая время возникновения) заполняются. После того как все поля заполнены, вызывается метод Enqueue(), который ставит новое событие в очередь обработки.

    В коде также выполняется достаточно много проверок, которые обязательно нужно производить, например, может оказаться, что очередь событий переполнена, в таком случае новое событие не может быть добавлено в очередь.

    В файле TollPointInputFactory.cs описана фабрика входных адаптеров, метод Create которой просто возвращает экземпляр только что рассмотренного адаптера.


    public
    class
    TollPointInputFactory : ITypedInputAdapterFactory<TollPointInputConfig>, ITypedDeclareAdvanceTimeProperties<TollPointInputConfig>

    {


    public
    InputAdapterBase Create<TollPointEvent>(TollPointInputConfig configInfo, EventShape eventShape)

    {


    return
    new
    TollPointInput<TollPointEvent>(configInfo);

    }


    public
    void Dispose()

    {

    }


    public
    AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(TollPointInputConfig configInfo, EventShape eventShape)

    {


    var atgs = new
    AdvanceTimeGenerationSettings(configInfo.CtiFrequency, TimeSpan.FromSeconds(0), true);


    var ats = new
    AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);


    return ats;

    }

    }

    Метод DeclareAdvanceTimeProperties устанавливает политику работы с событиями CTI.

    Первая строка метода создает настройки времени таким образом, чтобы событие CTI создавалось после каждого configInfo.CtiFrequency-го события. В основной программе для этого параметра будет задано значение 1, это значит, что событие CTI будет создаваться после каждого события.

    Во второй строчке указывается политика обработки событий, которые поступили позже события CTI, но их временная отметка более раняя. В данном случае устанавливается значение Drop (на самом деле мы не могли установить другое значение, поскольку мы работаем с мгновенными (Point) событиями).

    Файл GridOutputAdapter.cs содержит описание выходного адаптера


    public
    class
    GridOutputAdapter : PointOutputAdapter

    {


    private
    EventWaitHandle _adapterStopSignal;


    private
    CepEventType _bindtimeEventType;


    private
    int _eventsDequeued = 0;


    public GridOutputAdapter(string StopSignalName, CepEventType EventType)

    {

    _bindtimeEventType = EventType;

    _adapterStopSignal = EventWaitHandle.OpenExisting(StopSignalName);

    }


    public
    override
    void Start()

    {


    List<string> columnHeaders = new
    List<string>();

    columnHeaders.Add(“Command”);

    columnHeaders.Add(“Timestamp”);


    for (int fieldCounter = 0; fieldCounter < _bindtimeEventType.FieldsByOrdinal.Count; fieldCounter++)

    {


    CepEventTypeField eventFieldType = _bindtimeEventType.FieldsByOrdinal[fieldCounter];

    columnHeaders.Add(eventFieldType.Name);

    }

    HighwayMonitor.Program.MainWindow.AddEventToDisplayListBox(columnHeaders.ToArray(), _bindtimeEventType.FieldsByOrdinal.Count + 2);

    ConsumeEvents();

    }


    public
    override
    void Resume()

    {

    ConsumeEvents();

    }


    protected
    override
    void Dispose(bool disposing)

    {


    base.Dispose(disposing);

    }


    private
    void ConsumeEvents()

    {


    PointEvent currEvent = default(PointEvent);


    DequeueOperationResult result;


    try

    {


    while (true)

    {


    if (AdapterState.Stopping == AdapterState)

    {

    result = Dequeue(out currEvent);

    PrepareToStop(currEvent, result);

    Stopped();

    _adapterStopSignal.Set();


    return;

    }


    result = Dequeue(out currEvent);


    if (DequeueOperationResult.Empty == result)

    {

    PrepareToResume();

    Ready();


    return;

    }


    else

    {

    _eventsDequeued++;


    HighwayMonitor.Program.MainWindow.AddEventToDisplayListBox(CreateStringArrayFromEvent(currEvent), _bindtimeEventType.FieldsByOrdinal.Count + 2);

    ReleaseEvent(ref currEvent);

    }

    }

    }


    catch (AdapterException e)

    {


    Console.WriteLine(“ConsumeEvents – “ + e.Message + e.StackTrace);

    }

    }


    private
    void PrepareToStop(PointEvent currEvent, DequeueOperationResult result)

    {


    if (DequeueOperationResult.Success == result)

    {

    ReleaseEvent(ref currEvent);

    }

    }


    private
    void PrepareToResume()

    {

    }


    private
    string[] CreateStringArrayFromEvent(PointEvent currEvent)

    {


    if (EventKind.Cti == currEvent.EventKind)

    {


    return
    new
    string[] {null, “CTI”, currEvent.StartTime.ToString() };

    }


    else

    {


    List<string> eventDetails = new
    List<string>();

    eventDetails.Add(null); // leave the first column for an image

    eventDetails.Add(“INSERT”);

    eventDetails.Add(currEvent.StartTime.ToString());


    for (int fieldCounter = 0; fieldCounter < _bindtimeEventType.FieldsByOrdinal.Count; fieldCounter++)

    {


    CepEventTypeField eventFieldType = _bindtimeEventType.FieldsByOrdinal[fieldCounter];


    object value = Convert.ChangeType(currEvent.GetField(fieldCounter), eventFieldType.Type.ClrType, CultureInfo.CurrentCulture);

    eventDetails.Add((value != null) ? value.ToString() : “NULL”);

    }


    return eventDetails.ToArray();

    }

    }

    }

    Наиболее интересным для нас методом тут является метод ConsumeEvents(). При помощи метода Dequeue() из очереди извлекается событие (если оно там есть), из которого создается набор строк в методе CreateStringArrayFromEvent() . Полученные строки добавляются в грид на форме.

    Обратите внимание на то, что в выходном адаптере мы уже не может работать с событием как с типизированными данными. Дело в том, что в ходе StreamInsight запроса мы могли создать (и, скорее всего, создали) проекцию данных, и этот новый тип данных мы уже не знаем. Так что мы просто проходим в цикле по всем имеющимся полям.

    Фабрика выходных адаптеров описана в файле GridOutputAdapterFactory.cs:


    public
    class
    GridOutputAdapterFactory : IOutputAdapterFactory<string>

    {


    public
    OutputAdapterBase Create(string StopSignalName,EventShape Shape, CepEventType EventType)

    {


    return
    new
    GridOutputAdapter(StopSignalName, EventType);

    }


    public
    void Dispose()

    {

    }

    }

    Тут совсем нет ничего интересного, мы просто возвращаем новый экземпляр выходного адаптера.

    Основная логика приложения находится в методе обработчика нажатия кнопки Start формы, рассмотрим только этот метод.


    private
    void StartButton_Click(object sender, EventArgs buttonArgs)

    {


    if (StartButton.Text == “&Start”)

    {

    NeedToStop = false;


    try

    {

    _tracer.WriteLine(“Starting Toll Tracker”);

    _server = Microsoft.ComplexEventProcessing.Server.Create(“si”);

    _application = _server.CreateApplication(“HighwayMonitor”);


    var input = CepStream<TollPointEvent>.Create(“TollPointInputStream”,


    typeof(TollPointInputFactory),

    _inputConfig,


    EventShape.Point);


    //———————————————————————-


    //- Modify this query – must have a resultant query called queryOutput


    // 2.1.01 – original query


    var queryOutput = from e in input


    select e;


    //———————————————————————

    _results = queryOutput.ToQuery(_application, “TollData”, “TollData Query”, typeof(GridOutputAdapterFactory),

    _stopSignalName,


    EventShape.Point,


    StreamEventOrder.FullyOrdered);

    _adapterStopSignal.Reset();

    _results.Start();

    StartButton.Text = “Sto&p”;

    DisplayDataGridView.Rows.Clear();

    DisplayDataGridView.Columns.Clear();

    }


    catch (Exception e)

    {

    _tracer.WriteLine(e.ToString());


    if (_results != null) _results.Stop();


    MessageBox.Show(“Unable to start query. Error returned was:\n\r” + e.ToString());

    }

    }


    else

    {

    NeedToStop = true;

    _adapterStopSignal.WaitOne();

    _results.Stop();

    _results = null;


    if (_server != null)

    {

    _server.Dispose();

    _server = null;

    }

    StartButton.Text = “&Start”;

    }

    }

    Первое, на что стоит обратить внимание – это создание движка StreamInsight.

    _server = Microsoft.ComplexEventProcessing.Server.Create(“si”);

    Параметр метода Create() – это название экземпляра StreamInsight сервера, которые мы задавали при инсталляции (в моем случае – “si”).

    Далее создается входной поток с использованием фабрики входных адаптеров.


    var input = CepStream<TollPointEvent>.Create(“TollPointInputStream”,


    typeof(TollPointInputFactory),

    _inputConfig,


    EventShape.Point);

    queryOutput – это запрос для обработки входного потока, в данном случае просто выбираются все события.


    var queryOutput = from e in input


    select e;

    Выходной поток инициализируется при помощи фабрики выходных адаптеров и запроса.

    _results = queryOutput.ToQuery(_application, “TollData”, “TollData Query”, typeof(GridOutputAdapterFactory),

    _stopSignalName,


    EventShape.Point,


    StreamEventOrder.FullyOrdered);

    Обратите внимание на вызов метода ToQuery(), он преобразует LINQ запрос в запрос StreamInsight.

    Вызов _results.Start(); запускает StreamInsight запрос на выполнение.

    Если запустить приложение и нажать на кнопку Start, то должна получиться следующая картина:

    Рекомендую ознакомиться с файлами Demo 2.1 QueryStreamInsightUsingLINQ.txt и Demo 2.2 Using Advanced Query Options.txt, в них содержатся примеры более интересных запросов для данного приложения.

    Заключение

    Технология StreamInsight, как минимум, заслуживает внимания, поскольку она предоставляет новый способ анализа данных. Причем, скорость обратки событий и парадигма в целом не могут не радовать.

    С другой стороны, технология новая и некоторые моменты вызывают сомнения, на мой взгляд, наиболее очевидные недостатки технологии на сегодняшний день следующие:

    • Непонятная ценовая политика. Во время подготовки статьи я не смог найти бесплатной версии StreamInsight без ограничения по времени или возможности купить StreamInsight отдельно от SQL Server 2008 R2. (Хотя, возможно, я плохо искал)
    • Не понятно, каким образом аналитик должен писать LINQ запросы. Как пока известно, планируется разработка специального языка StreamInsight запросов, понятного аналитику, а не только разработчику.

    Несмотря на указанные недостатки, я уверен, что уже сейчас найдется достаточно разработчиков, которых заинтересует данная технология, ведь задачи, которые может успешно решать StreamInsight, возникают достаточно часто.

    Материалы для дальнейшего изучения

    В рамках данной статьи были рассмотрены лишь основные аспекты работы со StreamInsight, для более глубокого понимания технологии, я рекомендую обратиться к следующим источникам:

    Techdays.ru – материалы на русском

    (http://www.techdays.ru/Search.aspx?Tag=SQL%2bServer%2b2008%2bR2)

    SQL Server 2008 R2 Update for Developers Training Kit – крайне полезные обучающие материалы
    (http://www.microsoft.com/downloads/details.aspx?familyid=FFFAAD6A-0153-4D41-B289-A3ED1D637C0D&displaylang=en)

    Документация по StreamInsight (поставляется вместе с StreamInsight)

    Исходный код

    Иван Андреев