Реактивные библиотеки RX

За последние несколько лет термин «реактивное программирование» стал популярен во всех языках программирования. Даже был опубликован Reactive Manifesto , который, впрочем, дает очень общую формулировку реактивных систем. Да, приложения должны быстро отвечать (Responsive), не падать в случае ошибок (Resilient), быть гибкими в плане увеличения/уменьшения мощностей (Elastic) и базироваться на асинхронных событиях (Message Driven).

Декларация, конечно, верная, а что конкретно делать программисту? Давайте поговорим о реактивных (Rx) библиотеках, существующих во многих языках программирования.

Лет семь назад Эрик Майер (Erik Meijer) из Майкрософта предложил модель программирования Reactive Extensions (Rx) и имплементировал ее, как набор библиотек (Rx.NET) для соединения и обработки асинхронных потоков данных. Потоки эти основаны на событиях, типа кто-то твитанул, а ты получил нотификацию. Оно, конечно, можно и зайти на страничку Твиттера и каждые 10 секунд ее перегружать, в надежде, что кто-то из тех, кого вы фоловите выдал новый твит, но тогда это не реактивно, а просто polling. Нагрузка на сервер значительно вырастет, если каждый пользователь будет его дергать раз в 10 секунд. Гораздо экономичнее просто подписаться на поток твитов и получать их только тогда, когда они есть.

Или возьмем онлайновый аукцион, когда другие покупатели перебивают вашу цену на фотокамеру. Это явный поток, на который можно и нужно подписаться, вместо того, чтобы постоянно проверять, не побили ли ваше предложение.

Еще пример — поток цен на акции во время работы биржи. Или поток сигналов от сенсора, например, акселерометра в телефоне. Даже процесс, когда пользователь тащит мышку по экрану, можно рассматривать, как поток координат мышиного указателя.

Пять лет назад Microsoft выложил Rx.NET в опенсорс. Народу понравилось, и библиотеку портировали на многие языки программирования: RxCpp, RxJS, RxPHP, Rx.rb, Rx.py, RxJava, RxSwift, RxScala, RxKotlin.

Давайте познакомимся с основными Rx понятиями. Сразу скажу, что эта статья не учебник по реактивному программированию, а просто описание основных игроков любой Rx библиотеки. Я работаю и с RxJS, и с RxJava, но в этой статье буду использовать JavaScript в примерах кода. Для начала, посмотрим на нереактивный код:

let a1 = 2;?
let b1 = 4;??

let c1 = a1 + b1;  // c1 = 6??

a1 = 55;           // c1 = 6, but should be 59
b1 = 20;           // c1 = 6, but should be 75

После выполнения этого кода, c1 остается равным шести. Оно, конечно, можно добавить строчки после изменения a1 и b1 и пересчитывать c1, но правильнее было бы, чтобы c1 мгновенно пересчитывалась при изменении a1 или b1, как в Excel spreadsheet. Иными словами, мы хотим перейти к push-модели, когда новые и асинхронно изменяющиеся значения are pushed к потребителю. Мы хотим уйти от pull-модели, когда потребитель периодически спрашивает поставщика: «У тебя есть что-то новенькое для меня?... А сейчас?... А может сейчас есть? ».

Observable, Observer, Subscriber

Давайте посмотрим на основных игроков реактивных библиотек: Observable, Observer и Subscriber.

Observable — это объект или функция, которая выдает последовательности данных во времени (a.k.a. The Producer).

Observer — это объект или функция, которая знает, как обрабатывать последовательности данных (a.k.a. The Consumer).

Subscriber — это объект или функция, которая связывает Observable и Observer.

Я думаю многие из работающих программистов, увидев это диаграмму, скажут, что мы это и так знаем — обычный messaging и pub-sub. Это и так, и не так:

  1. Rx библиотеки заточены на асинхронную обработку без блокировки обработки данных.
  2. Rx предлагает простой API с выделенными каналами для передачи данных, ошибок и сигнала об окончании потока данных.
  3. В Rx библиотеках есть больше сотни операторов, которыми можно обрабатывать потоки идущие к подписчику. Операторы можно собирать в цепочки, т.е операторы composable.
  4. В некоторых реализациях RX, например, RxJava2, хорошо поддерживается backpressure, т.е. ситуация, когда продюсер выдает данные быстрее, чем подписчик может обработать.
  5. Для Rx messaging не нужно поднимать специальные серверы. Все включено в код вашего приложения.
  6. В языках, которые поддерживают multi-threading, работа со threads упрощается, как и переключения с одних threads на другие. Разработчики на Андроиде — это оценят, ибо там вывод на экран всегда должен выполняться main thread, а вычисления — другими.

Как же все-таки Observable передает данные в Observer? Observer может имплементировать три метода (названия могут слегка отличаться в зависимости от языка):

В следующем примере функция getData() превращает массив с пивом в Observable и возвращает его. Кому? Подписчику, когда он появится. А подписчик — getData().subscribe(...?) - передает Observer, как аргумент функции subscribe(). Соответственно, Observer состоит из трех функций:

// Defining the function with observable
function getData(){

    var beers = [
        {name: "Stella", country: "Belgium", price: 9.50},
        {name: "Sam Adams", country: "USA", price: 8.50},
        {name: "Bud Light", country: "USA", price: 6.50},
        {name: "Brooklyn Lager", country: "USA", price: 8.00},
        {name: "Sapporo", country: "Japan", price: 7.50}
    ];

// The observer will be provided at the time of subscription
    return Rx.Observable.create( observer => {

              beers.forEach( beer => observer.next(beer));
              observer.complete();
           }
    );
}

// Calling the function that subscribe to the observable
// The function subscribe() receives the Observer, represented by three functions
getData()
     .subscribe(
         beer  => console.log("Subscriber got " + beer),   // handling the arrived data
         error => console.err(error),   // an error arrived
            () => console.log("The stream is over")   // the signal that the stream completed arrived
);

Наш Observer состоит из трех fat arrow functions, которые появились в спецификации языка ECMAScript 6. Функции next() и complete() выполнятся только тогда, когда мы вызовем subscribe(). Посмотреть этот пример в действии можно здесь: bit.ly/2jm69aM (откройте консоль броузера и нажмите Run).

Операторы

Операторы — это функции, которыми можно преобразовывать данные между моментом, когда Observable их отправил, и моментом, когда подписчик их получил. Т.е. преобразовываем данные во время движения. В Rx библиотеках операторов много. Больше сотни.

Каждый оператор — это функция, которая принимает Observable на вход, трансформирует полученное значение и выдает новый Observable на выходе. Так как вход и выход любого оператора одного типа (Observable), операторы можно связывать. Вот, например, как можно отфильтровать пиво, которое дешевле, чем 8 у.е. и преобразовать пивные объекты в строки.

Изучение Rx операторов требует времени, и если у читателей ДОУ будет интерес, то я продолжу писать об Rx. Документация Rx библиотек часто включает marble diagrams, которые могут помочь в понимании, что делает конкретный оператор. Вот, например, как иллюстрируется оператор filter:

Входной поток (Observable), представленный разными геометрическими фигурами, фильтруется, чтобы на выходе получить другой поток (Observable), в котором будут только круги.

А как же все-таки сделать c1=a1+b1 реактивным?

Сначала нужно превратить a1 и b1 в потоки, например, так:

const a1 = Rx.Observable.from([2, 55]);

Но этот поток выстрелит 2 и 55 мгновенно, а мы хотим добавить временную составляющую. Для имитации задержки можно использовать отдельный поток, который просто выстреливает числа через определенные интервалы времени, а чтобы связать его с нашим потоком, который выдает 2 и 55, мы используем оператор zip:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);

Когда появится подписчик, наш поток выдаст 2, а через 1.2 секунды — 55. Похожим способом сделаем поток для b1, только с задержкой в полторы секунды. И снова, используя композицию потоков и оператор combineLatest, мы скажем: «Скомбинируй потоки a1 и b1, суммируя их последние значения». Весь код будет выглядеть вот так:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);

const b1 = Rx.Observable.from([4, 20])
  .zip(Rx.Observable.interval(1500), x => x);

a1.combineLatest(b1, (x, y) => x + y)
  .subscribe(val => console.log("c1=" + val));

Чтобы увидеть этот код в действии, посетите Plunker по адресу bit.ly/2nphn0k , откройте консоль броузера и нажмите кнопку Run. Вы увидите, как c1 будет пересчитываться, как только потоки a1 и b1 будут выдавать новые значения.

В мае я буду проводить трехдневный воркшоп в Киеве по разработке веб приложений с Angular 4, который использует RxJS во многих местах. Зарегистрироваться на воркшоп можно здесь: bit.ly/2n6CoKy . Для читателей ДОУ существует скидка 10%. При регистрации введите промо код dou.

Если вы еще не работали с реактивными библиотеками, советую посмотреть на Rx библиотеку для вашего языка программирования и начать использовать ее в реальных проектах. Rx библиотеки не требуют изменения стиля программирования всего проекта. Используйте их там, где можно сделать так, что асинхронные данные проходят через последовательность алгоритмов (операторов).

Опубліковано: 03/04/17 @ 10:00
Розділ Різне

Рекомендуємо:

Упали позиции в Яндексе? Как определить причину и вернуть сайт в ТОП
Дайджест: как стать веб-разработчиком, рынок фриланса, рэп-роботы
Как посмотреть ссылки на страницу сайта бесплатно
Дайджест: як стати веб-розробником, ринок фріланса, реп-роботи
Підбірка фан-відео від айтішників за 2016 рік