Реактивные расширения для языка GO
Reactivex, или Rx для короткого, является API для программирования с наблюдаемыми потоками. Это официальный API ReactiveX для языка GO.
Reactivex - это новый, альтернативный способ асинхронного программирования для обратных вызовов, обещаний и отложенных. Речь идет о обработке потоков событий или элементов, причем события являются любыми случаями или изменениями в системе. Поток событий называется наблюдаемым.
Оператор - это функция, которая определяет наблюдаемое, как и когда он должен испускать данные. Список охватываемых операторов доступен здесь.
Реализация RXGO основана на концепции трубопроводов. Трубопровод - это серия этапов, соединенных каналами, где каждый этап представляет собой группу goroutines, выполняющих одну и ту же функцию.

Давайте посмотрим на конкретный пример, когда каждая коробка является оператором:
Just Operator.Map .Filter .В этом примере окончательные элементы отправляются в канал, доступный для потребителя. Есть много способов потребления или производства данных с использованием RXGO. Публикация результатов в канале является только одним из них.
Каждый оператор - это этап преобразования. По умолчанию все последовательно. Тем не менее, мы можем использовать современные архитектуры процессора, определяя несколько экземпляров одного и того же оператора. Каждый экземпляр оператора является goroutine, подключенным к общему каналу.
Философия RXGO заключается в реализации концепций ReactiveX и использовании основных примитивов GO (каналы, goroutines и т. Д.), Чтобы интеграция между двумя мирами была максимально плавной.
go get -u github.com/reactivex/rxgo/v2
Давайте создадим наш первый наблюдаемый и потребляем предмет:
observable := rxgo . Just ( "Hello, World!" )()
ch := observable . Observe ()
item := <- ch
fmt . Println ( item . V ) Just Operator создает наблюдаемый из статического списка элементов. Of(value) создает элемент из данного значения. Если мы хотим создать элемент из ошибки, мы должны использовать Error(err) . Это разница с V1, которая принимала значение или ошибку непосредственно без необходимости обернуть его. Какое обоснование этого изменения? Это подготовка Rxgo для функции Generics, которая появится (надеюсь) в Go 2.
Кстати, Just оператор использует каррики в качестве синтаксического сахара. Таким образом, он принимает несколько элементов в первом списке параметров и несколько параметров во втором списке параметров. Ниже мы посмотрим, как указать параметры.
Как только наблюдаемая создается, мы можем наблюдать за ним, используя Observe() . По умолчанию наблюдаемая ленивая в том смысле, что он излучает предметы только после того, как будет сделана подписка. Observe() возвращает <-chan rxgo.Item .
Мы потребляли элемент из этого канала и напечатали его значение элемента, используя item.V
Элемент - это обертка поверх значения или ошибки. Мы можем захотеть проверить, как это сначала так:
item := <- ch
if item . Error () {
return item . E
}
fmt . Println ( item . V ) item.Error() возвращает логическое, указывающее, содержит ли элемент ошибку. Затем мы используем либо item.E , чтобы получить ошибку или item.V , чтобы получить значение.
По умолчанию наблюдаемая останавливается после получения ошибки. Тем не менее, есть специальные операторы, чтобы справиться с ошибками (например, OnError , Retry и т. Д.)
Также можно употреблять элементы, используя обратные вызовы:
observable . ForEach ( func ( v interface {}) {
fmt . Printf ( "received: %v n " , v )
}, func ( err error ) {
fmt . Printf ( "error: %e n " , err )
}, func () {
fmt . Println ( "observable is closed" )
})В этом примере мы прошли три функции:
NextFunc запускается при излучении элемента значения.ErrFunc запускаемый при излучении элемента ошибки.CompletedFunc функция, запускаемое после завершения наблюдаемой. ForEach не блокирует. Тем не менее, он возвращает канал уведомления, который будет закрыт после завершения наблюдаемого. Следовательно, чтобы сделать предыдущий блокировку кода, нам просто нужно использовать <- :
<- observable. ForEach ( ... ) Допустим, мы хотим внедрить поток, который потребляет следующую структуру Customer :
type Customer struct {
ID int
Name , LastName string
Age int
TaxNumber string
} Мы создаем продюсера, который излучит Customer в данное chan rxgo.Item и создаст из него наблюдаемый:
// Create the input channel
ch := make ( chan rxgo. Item )
// Data producer
go producer ( ch )
// Create an Observable
observable := rxgo . FromChannel ( ch )Затем нам нужно выполнить две следующие операции:
Поскольку обогащающий шаг связан с io, может быть интересно параллелизировать его в данном пуле Goroutines. Тем не менее, давайте представим, что все Customer должны быть созданы последовательно на основе его ID .
observable .
Filter ( func ( item interface {}) bool {
// Filter operation
customer := item .( Customer )
return customer . Age > 18
}).
Map ( func ( _ context. Context , item interface {}) ( interface {}, error ) {
// Enrich operation
customer := item .( Customer )
taxNumber , err := getTaxNumber ( customer )
if err != nil {
return nil , err
}
customer . TaxNumber = taxNumber
return customer , nil
},
// Create multiple instances of the map operator
rxgo . WithPool ( pool ),
// Serialize the items emitted by their Customer.ID
rxgo . Serialize ( func ( item interface {}) int {
customer := item .( Customer )
return customer . ID
}), rxgo . WithBufferedChannel ( 1 )) В конце концов, мы потребляем элементы, используя, например, ForEach() или Observe() . Observe() возвращает <-chan Item :
for customer := range observable . Observe () {
if customer . Error () {
return err
}
fmt . Println ( customer )
}В мире RX существует различие между холодными и горячими наблюдаемыми. Когда данные создаются самой наблюдаемой, это холодно наблюдаемое. Когда данные производятся вне наблюдаемого, это горячее наблюдение. Обычно, когда мы не хотим создавать производителя снова и снова, мы предпочитаем горячий наблюдаемый.
В RXGO есть аналогичная концепция.
Во -первых, давайте создадим горячий наблюдаемый, используя оператор FromChannel и увидим последствия:
ch := make ( chan rxgo. Item )
go func () {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
close ( ch )
}()
observable := rxgo . FromChannel ( ch )
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}Результатом этого исполнения является:
0
1
2
Это означает, что первый наблюдатель уже потреблял все предметы. И ничего не осталось для других.
Хотя это поведение может быть изменено с помощью подключаемых наблюдаемых.
Основным моментом здесь является то, что Goroutine произвел эти предметы.
С другой стороны, давайте создадим холодный наблюдаемый, используя оператор Defer :
observable := rxgo . Defer ([]rxgo. Producer { func ( _ context. Context , ch chan <- rxgo. Item ) {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
}})
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}Теперь результат:
0
1
2
0
1
2В случае холодного наблюдаемого поток был создан независимо для каждого наблюдателя.
Опять же, горячие против холодных наблюдаемых не о том, как вы потребляете элементы, а о том, где производятся данные.
Хорошим примером для горячих наблюдений являются тика по ценам от торговой биржи.
И если вы научите наблюдаемых продуктов из базы данных, то вы получите их один за другим, вы создадите холодный наблюдаемый.
Есть еще один оператор под названием FromEventSource , который создает наблюдаемый из канала. Разница между оператором FromChannel заключается в том, что, как только создается наблюдаемая, она начинает излучать элементы независимо от того, есть ли наблюдатель или нет. Следовательно, элементы, излучаемые наблюдаемым без наблюдателя (ы), теряются (в то время как они забуфрированы с оператором FromChannel ).
Вместе с использованием с оператором FromEventSource , например, телеметрия. Возможно, нас не интересует все данные, полученные с самого начала потока - только данные, так как мы начали их наблюдать.
Как только мы начнем наблюдать за наблюдаемой, созданной с помощью FromEventSource , мы можем настроить стратегию обратного давления. По умолчанию это блокирует (существует гарантированная доставка для предметов, излученных после того, как мы его наблюдаем). Мы можем переопределить эту стратегию таким образом:
observable := rxgo . FromEventSource ( input , rxgo . WithBackPressureStrategy ( rxgo . Drop )) Стратегия Drop означает, что если трубопровод после FromEventSource не был готов потреблять предмет, этот предмет отброшен.
По умолчанию операторы, подключающие канал, не забегают. Мы можем переопределить это поведение так:
observable . Map ( transform , rxgo . WithBufferedChannel ( 42 )) У каждого оператора есть параметр opts ...Option Параметр, позволяющий передать такие параметры.
Стратегия наблюдения по умолчанию ленива. Это означает, что оператор обрабатывает элементы, излучаемые наблюдаемыми, как только мы начнем наблюдать за ним. Мы можем изменить это поведение таким образом:
observable := rxgo . FromChannel ( ch ). Map ( transform , rxgo . WithObservationStrategy ( rxgo . Eager )) В этом случае оператор Map запускается всякий раз, когда производится элемент, даже без какого -либо наблюдателя.
По умолчанию каждый оператор является последовательным. Один оператор - один экземпляр Goroutine. Мы можем переопределить его, используя следующий вариант:
observable . Map ( transform , rxgo . WithPool ( 32 )) В этом примере мы создаем пул из 32 Goroutines, которые потребляют элементы одновременно из одного и того же канала. Если операция связана с процессором, мы можем использовать опцию WithCPUPool() , которая создает пул на основе количества логических процессоров.
Подключаемая наблюдаемая напоминает обычную наблюдаемую, за исключением того, что он не начинает излучать элементы, когда он подписан, но только когда его метод Connect (). Таким образом, вы можете подождать, пока все предполагаемые подписчики будут подписаться на наблюдаемая до того, как наблюдаемая начнет излучать предметы.
Давайте создадим подключаемое наблюдение, используя rxgo.WithPublishStrategy :
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())Затем мы создаем двух наблюдателей:
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) + 1 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) * 2 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) Если бы observable не было подключенным наблюдаемым, так как DoOnNext создает наблюдателя, наблюдаемый источник начал бы испускать предметы. Тем не менее, в случае подключенного наблюдаемого, мы должны позвонить Connect() :
observable . Connect () После того, как Connect() вызывается, подключаемая наблюдаемая начинает излучать элементы.
Есть еще одно важное изменение с регулярным наблюдаемым. Связанный наблюдаемый публикует свои элементы. Это означает, что все наблюдатели получают копию элементов.
Вот пример с обычным наблюдаемым:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a regular Observable
observable := rxgo . FromChannel ( ch )
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) First observer: 1
First observer: 2
First observer: 3
Теперь с подключенным наблюдаемым:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a Connectable Observable
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})
disposed , cancel := observable . Connect ()
go func () {
// Do something
time . Sleep ( time . Second )
// Then cancel the subscription
cancel ()
}()
// Wait for the subscription to be disposed
<- disposed Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
Итерабильный-это объект, который можно наблюдать с использованием Observe(opts ...Option) <-chan Item .
Итерабильный может быть либо:
Документация пакета: https://pkg.go.dev/github.com/reactivex/rxgo/v2
Как использовать API ASSERT для написания модульных тестов при использовании RXGO.
Параметры оператора
Все вклады очень приветствуются! Убедитесь, что вы сначала проверяете руководящие принципы. Новички могут взглянуть на текущие проблемы и проверить help needed этикетку.
Кроме того, если вы опубликуете пост о RXGO, сообщите нам об этом. Мы будем рады включить его в раздел внешних ресурсов.
Спасибо всем людям, которые уже внесли свой вклад в RXGO!
Большое спасибо Jetbrains за поддержку проекта.