GO言語のリアクティブな拡張機能
Reactivex、または略してRXは、観察可能なストリームを使用したプログラミングのAPIです。これは、GO言語の公式Reactivex APIです。
Reactivexは、コールバック、約束、延期する非同期プログラミングの新しい代替方法です。イベントやアイテムのストリームを処理することであり、イベントはシステム内の発生または変更です。イベントのストリームは、観測可能と呼ばれます。
オペレーターは、観察可能なもの、データをどのように、いつ発射するかを定義する関数です。対象となるオペレーターのリストはこちらから入手できます。
RXGOの実装は、パイプラインの概念に基づいています。パイプラインは、チャネルで接続された一連のステージであり、各ステージは同じ関数を実行しているゴルチンのグループです。

各ボックスがオペレーターである具体的な例を見てみましょう。
Just Operatorを使用して、アイテムの固定リストに基づいて静的な観測可能を作成します。Map演算子を使用して、変換関数(円を正方形に変換)を定義します。Filter演算子を使用して各黄色の正方形をろ過します。この例では、最終的なアイテムはチャンネルで送信され、消費者が利用できます。 RXGOを使用してデータを消費したり、データを生成したりする方法はたくさんあります。結果をチャネルで公開することは、その1つにすぎません。
各オペレーターは変換段階です。デフォルトでは、すべてがシーケンシャルです。しかし、同じ演算子の複数のインスタンスを定義することにより、最新のCPUアーキテクチャを活用できます。各オペレーターインスタンスは、共通チャネルに接続されたゴルウチンです。
RXGOの哲学は、Reactivexの概念を実装し、メインGOプリミティブ(チャネル、ゴルチンなど)を活用して、2つの世界間の統合が可能な限りスムーズになるようにすることです。
go get -u github.com/reactivex/rxgo/v2
最初の観察可能なものを作成して、アイテムを消費しましょう。
observable := rxgo . Just ( "Hello, World!" )()
ch := observable . Observe ()
item := <- ch
fmt . Println ( item . V ) Just Operatorは、アイテムの静的リストから観察可能なものを作成します。 Of(value)特定の値からアイテムを作成します。エラーからアイテムを作成する場合は、 Error(err)を使用する必要があります。これは、値またはエラーをラップすることなく直接受け入れていたV1との違いです。この変化の理論的根拠は何ですか? Go 2で(できれば)generics機能のためにRXGOを準備することです。
ちなみに、 Just OperatorはCurryingを構文砂糖として使用します。これにより、最初のパラメーターリストの複数のアイテムと2番目のパラメーターリストに複数のオプションを受け入れます。以下に、オプションを指定する方法をご覧ください。
観察可能なものが作成されたら、 Observe()を使用して観察できます。デフォルトでは、サブスクリプションが作成された後にのみアイテムを放出するという意味で、観察可能は怠zyです。 Observe() a <-chan rxgo.Itemを返します。
このチャネルからアイテムを消費し、 item.Vを使用してアイテムの値を印刷しました。
アイテムは、値またはエラーの上のラッパーです。このように最初にタイプをチェックしたいかもしれません:
item := <- ch
if item . Error () {
return item . E
}
fmt . Println ( item . V ) item.Error()アイテムにエラーが含まれているかどうかを示すブール値を返します。次に、いずれかのitem.Eを使用してエラーまたはitem.Vを取得して値を取得します。
デフォルトでは、エラーが生成されると、観測可能なものが停止します。ただし、エラーに対処するための特別なオペレーター( OnError 、 Retryなど)があります。
コールバックを使用してアイテムを消費することも可能です。
observable . ForEach ( func ( v interface {}) {
fmt . Printf ( "received: %v n " , v )
}, func ( err error ) {
fmt . Printf ( "error: %e n " , err )
}, func () {
fmt . Println ( "observable is closed" )
})この例では、3つの関数を渡しました。
NextFunc 。ErrFunc 。CompletedFunc FUNCがトリガーされました。 ForEach非ブロッキングです。しかし、観察可能な完了が完了すると閉じられる通知チャネルを返します。したがって、以前のコードブロッキングを行うには、単に<-を使用する必要があります。
<- observable. ForEach ( ... )次のCustomer構造を消費するストリームを実装したいとしましょう。
type Customer struct {
ID int
Name , LastName string
Age int
TaxNumber string
}特定のchan rxgo.ItemにCustomerを放出し、そこから観察可能なプロデューサーを作成します。
// Create the input channel
ch := make ( chan rxgo. Item )
// Data producer
go producer ( ch )
// Create an Observable
observable := rxgo . FromChannel ( ch )次に、次の2つの操作を実行する必要があります。
濃縮ステップはIOバウンドであるため、Goroutinesの特定のプール内でそれを並列化することは興味深いかもしれません。それでも、すべてのCustomerアイテムをそのIDに基づいて順番に作成する必要があると想像しましょう。
observable .
Filter ( func ( item interface {}) bool {
// Filter operation
customer := item .( Customer )
return customer . Age > 18
}).
Map ( func ( _ context. Context , item interface {}) ( interface {}, error ) {
// Enrich operation
customer := item .( Customer )
taxNumber , err := getTaxNumber ( customer )
if err != nil {
return nil , err
}
customer . TaxNumber = taxNumber
return customer , nil
},
// Create multiple instances of the map operator
rxgo . WithPool ( pool ),
// Serialize the items emitted by their Customer.ID
rxgo . Serialize ( func ( item interface {}) int {
customer := item .( Customer )
return customer . ID
}), rxgo . WithBufferedChannel ( 1 ))最終的に、たとえばForEach()またはObserve()を使用してアイテムを消費します。 Observe() <-chan Itemを返します:
for customer := range observable . Observe () {
if customer . Error () {
return err
}
fmt . Println ( customer )
}RXの世界では、寒さと高温の観測可能性には区別があります。データが観察可能な自体によって生成される場合、それはコールドオーバーサバブルです。データが観察可能な外で生成されると、それは観察可能なホットです。通常、プロデューサーを何度も作りたくないときは、熱い観察可能なものを好みます。
RXGOには、同様の概念があります。
まず、 FromChannelオペレーターを使用してHot Observableを作成し、その意味を確認しましょう。
ch := make ( chan rxgo. Item )
go func () {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
close ( ch )
}()
observable := rxgo . FromChannel ( ch )
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}この実行の結果は次のとおりです。
0
1
2
これは、最初のオブザーバーがすでにすべてのアイテムを消費していることを意味します。そして、他の人には何も残っていません。
ただし、この動作は、接続可能な観測可能性で変更できます。
ここでの主なポイントは、ゴルウチンがこれらのアイテムを生産したことです。
一方、 Defer Operatorを使用してCold Observableを作成しましょう。
observable := rxgo . Defer ([]rxgo. Producer { func ( _ context. Context , ch chan <- rxgo. Item ) {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
}})
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}さて、結果は次のとおりです。
0
1
2
0
1
2風邪の観察可能な場合、ストリームはすべてのオブザーバーに対して独立して作成されました。
繰り返しますが、ホット対コールドオブザーバブルは、アイテムの消費方法に関するものではなく、データが生成される場所に関するものです。
ホットオブザーバブルの良い例は、取引交換からの価格帯です。
また、データベースから製品を取得するために観察可能なものを教えてから、それらを1つずつ生成する場合は、コールドオーバーサバブルを作成します。
チャネルから観察可能なものを作成するFromEventSourceという別のオペレーターがあります。 FromChannelオペレーターの違いは、観察可能なものが作成されるとすぐに、オブザーバーがいるかどうかに関係なくアイテムを放出し始めることです。したがって、オブザーバーなしで観察可能なものによって放出されるアイテムは失われます( FromChannelオペレーターでバッファリングされています)。
FromEventSourceオペレーターのユースケースは、たとえばテレメトリです。ストリームの最初から生成されたすべてのデータに興味がない場合があります。これは、データを観察し始めてからデータのみです。
FromEventSourceで作成された観測可能な観察を開始したら、逆圧力戦略を構成できます。デフォルトでは、ブロックしています(観察した後に放出されるアイテムの配信が保証されています)。このようにこの戦略をオーバーライドできます。
observable := rxgo . FromEventSource ( input , rxgo . WithBackPressureStrategy ( rxgo . Drop )) Drop戦略は、 FromEventSource後のパイプラインがアイテムを消費する準備ができていなかった場合、このアイテムが削除されることを意味します。
デフォルトでは、チャネル接続演算子はバッファーされていません。このような動作をオーバーライドできます。
observable . Map ( transform , rxgo . WithBufferedChannel ( 42 ))各オペレーターには、そのようなオプションを渡すことができるopts ...Optionパラメーターがあります。
デフォルトの観察戦略は怠zyです。これは、観察を開始すると、オペレーターが観察可能なものによって放出されるアイテムを処理することを意味します。このようにこの動作を変更できます。
observable := rxgo . FromChannel ( ch ). Map ( transform , rxgo . WithObservationStrategy ( rxgo . Eager ))この場合、オブザーバーがなくても、アイテムが生成されるたびにMap演算子がトリガーされます。
デフォルトでは、各演算子はシーケンシャルです。 1人のオペレーターが1つのゴルウチンインスタンスです。次のオプションを使用してオーバーライドできます。
observable . Map ( transform , rxgo . WithPool ( 32 ))この例では、同じチャネルからアイテムを同時に消費する32のゴルチンのプールを作成します。操作がCPUバウンドの場合、論理CPUの数に基づいてプールを作成するWithCPUPool()オプションを使用できます。
接続可能な観察可能なものは、通常の観測可能なものに似ていますが、サブスクライブ時にアイテムを放射し始めませんが、Connect()メソッドが呼び出された場合のみです。このようにして、観察可能なものがアイテムを発し始める前に、意図したすべてのサブスクライバーが観測可能なものを購読するのを待つことができます。
rxgo.WithPublishStrategyを使用して、接続可能な観察可能なものを作成しましょう。
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())次に、2人のオブザーバーを作成します。
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) + 1 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) * 2 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) observable接続可能な観測可能でない場合、 DoOnNextオブザーバーを作成するように、ソースのソースはアイテムを放射し始めていたでしょう。しかし、接続可能な観察可能な場合、 Connect()を呼び出す必要があります。
observable . Connect () Connect()が呼び出されると、接続可能な観測可能なものがアイテムを発し始めます。
通常の観察可能なものには別の重要な変更があります。接続可能な観察可能なものがアイテムを公開します。これは、すべてのオブザーバーがアイテムのコピーを受け取ることを意味します。
これが通常の観察可能な例です。
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a regular Observable
observable := rxgo . FromChannel ( ch )
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) First observer: 1
First observer: 2
First observer: 3
今、接続可能な観察可能なものを使用してください:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a Connectable Observable
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})
disposed , cancel := observable . Connect ()
go func () {
// Do something
time . Sleep ( time . Second )
// Then cancel the subscription
cancel ()
}()
// Wait for the subscription to be disposed
<- disposed Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
反復可能はObserve(opts ...Option) <-chan Itemを使用して観察できるオブジェクトです。
繰り返し可能なことは次のとおりです。
パッケージのドキュメント:https://pkg.go.dev/github.com/reactivex/rxgo/v2
ASSERT APIを使用して、RXGOの使用中にユニットテストを書き込む方法。
オペレーターオプション
すべての貢献は大歓迎です!最初に貢献ガイドラインを確認してください。新参者は、進行中の問題を見て、 help neededラベルを確認することができます。
また、RXGOに関する投稿を公開する場合は、お知らせください。外部リソースセクションに含めることを嬉しく思います。
すでにRXGOに貢献してくれたすべての人々に感謝します!
プロジェクトをサポートしてくれたJetBrainsに感謝します。