Реактивные библиотеки RX
За последние несколько лет термин «реактивное программирование» стал популярен во всех языках программирования. Даже был опубликован Reactive Manifesto , который, впрочем, дает очень общую формулировку реактивных систем. Да, приложения должны быстро отвечать (Responsive), не падать в случае ошибок (Resilient), быть гибкими в плане увеличения/уменьшения мощностей (Elastic) и базироваться на асинхронных событиях (Message Driven).
Декларация, конечно, верная, а что конкретно делать программисту? Давайте поговорим о реактивных (Rx) библиотеках, существующих во многих языках программирования.
Лет семь назад Эрик Майер (Erik Meijer) из Майкрософта предложил модель программирования Reactive Extensions (Rx) и имплементировал ее, как набор библиотек (Rx.NET) для соединения и обработки асинхронных потоков данных. Потоки эти основаны на событиях, типа кто-то твитанул, а ты получил нотификацию. Оно, конечно, можно и зайти на страничку Твиттера и каждые 10 секунд ее перегружать, в надежде, что кто-то из тех, кого вы фоловите выдал новый твит, но тогда это не реактивно, а просто polling. Нагрузка на сервер значительно вырастет, если каждый пользователь будет его дергать раз в 10 секунд. Гораздо экономичнее просто подписаться на поток твитов и получать их только тогда, когда они есть.
Или возьмем онлайновый аукцион, когда другие покупатели перебивают вашу цену на фотокамеру. Это явный поток, на который можно и нужно подписаться, вместо того, чтобы постоянно проверять, не побили ли ваше предложение.
Еще пример — поток цен на акции во время работы биржи. Или поток сигналов от сенсора, например, акселерометра в телефоне. Даже процесс, когда пользователь тащит мышку по экрану, можно рассматривать, как поток координат мышиного указателя.
Пять лет назад Microsoft выложил Rx.NET в опенсорс. Народу понравилось, и библиотеку портировали на многие языки программирования: RxCpp, RxJS, RxPHP, Rx.rb, Rx.py, RxJava, RxSwift, RxScala, RxKotlin.
Давайте познакомимся с основными Rx понятиями. Сразу скажу, что эта статья не учебник по реактивному программированию, а просто описание основных игроков любой Rx библиотеки. Я работаю и с RxJS, и с RxJava, но в этой статье буду использовать JavaScript в примерах кода. Для начала, посмотрим на нереактивный код:
let a1 = 2;? let b1 = 4;?? let c1 = a1 + b1; // c1 = 6?? a1 = 55; // c1 = 6, but should be 59 b1 = 20; // c1 = 6, but should be 75
После выполнения этого кода, c1 остается равным шести. Оно, конечно, можно добавить строчки после изменения a1 и b1 и пересчитывать c1, но правильнее было бы, чтобы c1 мгновенно пересчитывалась при изменении a1 или b1, как в Excel spreadsheet. Иными словами, мы хотим перейти к push-модели, когда новые и асинхронно изменяющиеся значения are pushed к потребителю. Мы хотим уйти от pull-модели, когда потребитель периодически спрашивает поставщика: «У тебя есть что-то новенькое для меня?... А сейчас?... А может сейчас есть? ».
Observable, Observer, Subscriber
Давайте посмотрим на основных игроков реактивных библиотек: Observable, Observer и Subscriber.
Observable — это объект или функция, которая выдает последовательности данных во времени (a.k.a. The Producer).
Observer — это объект или функция, которая знает, как обрабатывать последовательности данных (a.k.a. The Consumer).
Subscriber — это объект или функция, которая связывает Observable и Observer.
Я думаю многие из работающих программистов, увидев это диаграмму, скажут, что мы это и так знаем — обычный messaging и pub-sub. Это и так, и не так:
- Rx библиотеки заточены на асинхронную обработку без блокировки обработки данных.
- Rx предлагает простой API с выделенными каналами для передачи данных, ошибок и сигнала об окончании потока данных.
- В Rx библиотеках есть больше сотни операторов, которыми можно обрабатывать потоки идущие к подписчику. Операторы можно собирать в цепочки, т.е операторы composable.
- В некоторых реализациях RX, например, RxJava2, хорошо поддерживается backpressure, т.е. ситуация, когда продюсер выдает данные быстрее, чем подписчик может обработать.
- Для Rx messaging не нужно поднимать специальные серверы. Все включено в код вашего приложения.
- В языках, которые поддерживают multi-threading, работа со threads упрощается, как и переключения с одних threads на другие. Разработчики на Андроиде — это оценят, ибо там вывод на экран всегда должен выполняться main thread, а вычисления — другими.
Как же все-таки Observable передает данные в Observer? Observer может имплементировать три метода (названия могут слегка отличаться в зависимости от языка):
- next() - вот тебе новое значение из потока;
- error() - вот тебе ошибка, произошедшая в потоке;
- complete() - поток завершен.
В следующем примере функция getData() превращает массив с пивом в Observable и возвращает его. Кому? Подписчику, когда он появится. А подписчик — getData().subscribe(...?) - передает Observer, как аргумент функции subscribe(). Соответственно, Observer состоит из трех функций:
- что делать, когда придет следующий элемент потока;
- что делать, если придет ошибка;
- что делать, если придет сигнал об окончании потока.
// Defining the function with observable function getData(){ var beers = [ {name: "Stella", country: "Belgium", price: 9.50}, {name: "Sam Adams", country: "USA", price: 8.50}, {name: "Bud Light", country: "USA", price: 6.50}, {name: "Brooklyn Lager", country: "USA", price: 8.00}, {name: "Sapporo", country: "Japan", price: 7.50} ]; // The observer will be provided at the time of subscription return Rx.Observable.create( observer => { beers.forEach( beer => observer.next(beer)); observer.complete(); } ); } // Calling the function that subscribe to the observable // The function subscribe() receives the Observer, represented by three functions getData() .subscribe( beer => console.log("Subscriber got " + beer), // handling the arrived data error => console.err(error), // an error arrived () => console.log("The stream is over") // the signal that the stream completed arrived );
Наш Observer состоит из трех fat arrow functions, которые появились в спецификации языка ECMAScript 6. Функции next() и complete() выполнятся только тогда, когда мы вызовем subscribe(). Посмотреть этот пример в действии можно здесь: bit.ly/2jm69aM (откройте консоль броузера и нажмите Run).
Операторы
Операторы — это функции, которыми можно преобразовывать данные между моментом, когда Observable их отправил, и моментом, когда подписчик их получил. Т.е. преобразовываем данные во время движения. В Rx библиотеках операторов много. Больше сотни.
Каждый оператор — это функция, которая принимает Observable на вход, трансформирует полученное значение и выдает новый Observable на выходе. Так как вход и выход любого оператора одного типа (Observable), операторы можно связывать. Вот, например, как можно отфильтровать пиво, которое дешевле, чем 8 у.е. и преобразовать пивные объекты в строки.
Изучение Rx операторов требует времени, и если у читателей ДОУ будет интерес, то я продолжу писать об Rx. Документация Rx библиотек часто включает marble diagrams, которые могут помочь в понимании, что делает конкретный оператор. Вот, например, как иллюстрируется оператор filter:
Входной поток (Observable), представленный разными геометрическими фигурами, фильтруется, чтобы на выходе получить другой поток (Observable), в котором будут только круги.
А как же все-таки сделать c1=a1+b1 реактивным?
Сначала нужно превратить a1 и b1 в потоки, например, так:
const a1 = Rx.Observable.from([2, 55]);
Но этот поток выстрелит 2 и 55 мгновенно, а мы хотим добавить временную составляющую. Для имитации задержки можно использовать отдельный поток, который просто выстреливает числа через определенные интервалы времени, а чтобы связать его с нашим потоком, который выдает 2 и 55, мы используем оператор zip:
const a1 = Rx.Observable.from([2, 55]) .zip(Rx.Observable.interval(1200), x => x);
Когда появится подписчик, наш поток выдаст 2, а через 1.2 секунды — 55. Похожим способом сделаем поток для b1, только с задержкой в полторы секунды. И снова, используя композицию потоков и оператор combineLatest, мы скажем: «Скомбинируй потоки a1 и b1, суммируя их последние значения». Весь код будет выглядеть вот так:
const a1 = Rx.Observable.from([2, 55]) .zip(Rx.Observable.interval(1200), x => x); const b1 = Rx.Observable.from([4, 20]) .zip(Rx.Observable.interval(1500), x => x); a1.combineLatest(b1, (x, y) => x + y) .subscribe(val => console.log("c1=" + val));
Чтобы увидеть этот код в действии, посетите Plunker по адресу bit.ly/2nphn0k , откройте консоль броузера и нажмите кнопку Run. Вы увидите, как c1 будет пересчитываться, как только потоки a1 и b1 будут выдавать новые значения.
В мае я буду проводить трехдневный воркшоп в Киеве по разработке веб приложений с Angular 4, который использует RxJS во многих местах. Зарегистрироваться на воркшоп можно здесь: bit.ly/2n6CoKy . Для читателей ДОУ существует скидка 10%. При регистрации введите промо код dou.
Если вы еще не работали с реактивными библиотеками, советую посмотреть на Rx библиотеку для вашего языка программирования и начать использовать ее в реальных проектах. Rx библиотеки не требуют изменения стиля программирования всего проекта. Используйте их там, где можно сделать так, что асинхронные данные проходят через последовательность алгоритмов (операторов).
Опубліковано: 03/04/17 @ 10:00
Розділ Різне
Рекомендуємо:
Упали позиции в Яндексе? Как определить причину и вернуть сайт в ТОП
Дайджест: как стать веб-разработчиком, рынок фриланса, рэп-роботы
Как посмотреть ссылки на страницу сайта бесплатно
Дайджест: як стати веб-розробником, ринок фріланса, реп-роботи
Підбірка фан-відео від айтішників за 2016 рік