GO語言的反應性擴展
ReactiveX(簡稱Reactivex)是用於使用可觀察流的編程的API。這是GO語言的官方Ractivex API。
Reactivex是一種新的,替代的方式,用於回調,承諾和延期。這是關於處理事件或項目的流,事件是系統內的任何發生或更改。事件流稱為可觀察的。
操作員是定義可觀察到的函數,如何以及何時發出數據。涵蓋的操作員列表可在此處找到。
RXGO實現基於管道的概念。管道是通過通道連接的一系列階段,每個階段都是運行相同功能的一組goroutines。

讓我們看看一個具體的示例,每個框是操作員:
Just Operator創建一個基於固定項目列表的靜態觀察。Map運算符定義轉換函數(轉換為正方形)。Filter操作員過濾每個黃色正方形。在此示例中,最終項目以頻道發送,可用於消費者。有很多方法可以使用RXGO消費或生產數據。在頻道中發布結果只是其中之一。
每個操作員都是轉換階段。默認情況下,一切都是順序的。但是,我們可以通過定義同一操作員的多個實例來利用現代CPU架構。每個操作員實例是連接到公共通道的goroutine。
RXGO的理念是實施Ractivex概念並利用主要的GO原始詞(頻道,goroutines等),以便兩個世界之間的集成盡可能順利。
go get -u github.com/reactivex/rxgo/v2
讓我們創建我們的第一個可觀察並消耗項目:
observable := rxgo . Just ( "Hello, World!" )()
ch := observable . Observe ()
item := <- ch
fmt . Println ( item . V ) Just操作員從靜態項目列表中創建一個可觀察到的。 Of(value)從給定值創建項目。如果我們要從錯誤創建項目,則必須使用Error(err) 。這是與直接接受值或錯誤無需包裝的V1的差異。這種變化的理由是什麼?它是為了在第2期中(希望)準備RXGO的RXGO。
順便說一句, Just Operator使用咖哩作為句法糖。這樣,它在第一個參數列表中接受多個項目,第二個參數列表中的多個選項。我們將在下面查看如何指定選項。
一旦創建了可觀察到的,我們就可以使用Observe()觀察它。默認情況下,一個可觀察到的懶惰是因為它僅在訂閱後才發出項目。 Observe()返回<-chan rxgo.Item 。
我們從該通道中消費了一個項目,並使用item.V打印了其物品的價值。
項目是值或錯誤之上的包裝器。我們可能想先檢查一下類型:
item := <- ch
if item . Error () {
return item . E
}
fmt . Println ( item . V ) item.Error()返回布爾值,指示項目是否包含錯誤。然後,我們使用任何item.E獲取錯誤或item.V以獲取值。
默認情況下,一旦產生錯誤,就會停止可觀察到的。但是,有特別操作員可以處理錯誤(例如, OnError , Retry等)
也可以使用回調食用項目:
observable . ForEach ( func ( v interface {}) {
fmt . Printf ( "received: %v n " , v )
}, func ( err error ) {
fmt . Printf ( "error: %e n " , err )
}, func () {
fmt . Println ( "observable is closed" )
})在此示例中,我們傳遞了三個功能:
NextFunc 。ErrFunc 。CompletedFunc 。 ForEach是無障礙的。但是,它返回一個通知渠道,一旦可觀察到完成,該通知渠道將關閉。因此,要使先前的代碼阻止,我們只需要使用<- ::
<- observable. ForEach ( ... )假設我們要實施一個消耗以下Customer結構的流:
type Customer struct {
ID int
Name , LastName string
Age int
TaxNumber string
}我們創建了一個生產商,將Customer s散發到給定的chan rxgo.Item上,並從中創建一個可觀察的:
// Create the input channel
ch := make ( chan rxgo. Item )
// Data producer
go producer ( ch )
// Create an Observable
observable := rxgo . FromChannel ( ch )然後,我們需要執行以下兩個操作:
由於豐富的步驟是io的結合,因此將其在給定的goroutines池中並行化可能很有趣。但是,讓我們想像所有Customer物品都需要根據其ID依次生產。
observable .
Filter ( func ( item interface {}) bool {
// Filter operation
customer := item .( Customer )
return customer . Age > 18
}).
Map ( func ( _ context. Context , item interface {}) ( interface {}, error ) {
// Enrich operation
customer := item .( Customer )
taxNumber , err := getTaxNumber ( customer )
if err != nil {
return nil , err
}
customer . TaxNumber = taxNumber
return customer , nil
},
// Create multiple instances of the map operator
rxgo . WithPool ( pool ),
// Serialize the items emitted by their Customer.ID
rxgo . Serialize ( func ( item interface {}) int {
customer := item .( Customer )
return customer . ID
}), rxgo . WithBufferedChannel ( 1 ))最後,我們使用ForEach()或Observe()消費項目。 Observe()返回<-chan Item :
for customer := range observable . Observe () {
if customer . Error () {
return err
}
fmt . Println ( customer )
}在RX世界中,冷和熱可觀察物之間存在區別。當數據由可觀察到的本身產生時,它是可觀察的。當可觀察到的數據在可觀察的外部產生時,它是可觀察的。通常,當我們不想一遍又一遍地創建生產者時,我們會贊成一個可觀察到的熱門人物。
在RXGO中,也有類似的概念。
首先,讓我們使用FromChannel Operator創建一個可觀察的熱觀察,並查看含義:
ch := make ( chan rxgo. Item )
go func () {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
close ( ch )
}()
observable := rxgo . FromChannel ( ch )
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}此執行的結果是:
0
1
2
這意味著第一個觀察者已經消耗了所有項目。沒有其他人留給別人。
儘管可以使用可連接的可觀察物來改變此行為。
這裡的要點是Goroutine生產了這些項目。
另一方面,讓我們使用Defer操作員創建一個可冷觀察的人:
observable := rxgo . Defer ([]rxgo. Producer { func ( _ context. Context , ch chan <- rxgo. Item ) {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
}})
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}現在,結果是:
0
1
2
0
1
2在可觀察的冷,為每個觀察者獨立創建了該流。
同樣,熱與冷可觀察物質無關您消耗商品,而是關於數據的生成在何處。
熱門觀察的好例子是交易交易所的價格滴答。
而且,如果您教一個可觀察的可以從數據庫中獲取產品,然後將它們一個一個一個屈服,您將創建可觀察的冷。
還有另一個名為FromEventSource的操作員,它可以從頻道創建可觀察到的。 FromChannel運算符之間的區別在於,一旦創建了可觀察到的,它就會開始散發項目,無論是否有觀察者。因此,丟失了沒有觀察者的觀察員發出的項目(而在FromChannel操作員則緩衝時)。
FromEventSource操作員的用例是遙測。我們可能對從流的一開始產生的所有數據不感興趣,這是我們開始觀察的數據。
一旦我們開始觀察一個使用FromEventSource創建的可觀察到的,我們就可以配置背壓策略。默認情況下,它正在阻止(我們觀察到它後發出的物品保證了交付)。我們可以這樣覆蓋這種策略:
observable := rxgo . FromEventSource ( input , rxgo . WithBackPressureStrategy ( rxgo . Drop )) Drop策略意味著,如果FromEventSource之後的管道還沒有準備好食用物品,則該項目將被刪除。
默認情況下,連接運算符的通道連接不緩和。我們可以這樣覆蓋這種行為:
observable . Map ( transform , rxgo . WithBufferedChannel ( 42 ))每個操作員都有一個opts ...Option參數允許通過此類選項。
默認的觀察策略很懶惰。這意味著操作員一旦我們開始觀察,操作員就可以通過可觀察到的物品進行處理。我們可以這樣改變這種行為:
observable := rxgo . FromChannel ( ch ). Map ( transform , rxgo . WithObservationStrategy ( rxgo . Eager ))在這種情況下,即使沒有任何觀察者,每當生成項目時,都會觸發Map運算符。
默認情況下,每個操作員都是順序的。一個操作員是一個goroutine實例。我們可以使用以下選項對其進行覆蓋:
observable . Map ( transform , rxgo . WithPool ( 32 ))在此示例中,我們創建了一個32個Goroutines的池,該池從同一通道同時消費項目。如果操作是cpu結合的,我們可以使用WithCPUPool()選項,該選項基於邏輯CPU的數量創建池。
可連接的可觀察到的類似於普通觀察的可觀察,除了它在訂閱時不會開始發射項目,而只有在調用其連接()方法時。這樣,您可以等待所有預期的訂戶在可觀察到的發射項目之前訂閱可觀察到的可觀察到的。
讓我們使用rxgo.WithPublishStrategy創建一個可連接的可觀察的連接:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())然後,我們創建了兩個觀察者:
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) + 1 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) * 2 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})如果observable是可觀察的,那麼DoOnNext創建了一個觀察者,那麼可觀察到的源將開始發出項目。但是,在可連接可觀察的情況下,我們必須致電Connect() :
observable . Connect ()一旦調用了Connect() ,可連接的可觀察到開始發出項目。
常規可觀察到還有另一個重要的變化。可連接的可觀察到其項目。這意味著所有觀察者都會收到項目的副本。
這是一個常規觀察的示例:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a regular Observable
observable := rxgo . FromChannel ( ch )
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) First observer: 1
First observer: 2
First observer: 3
現在,具有可觀察的可連接:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a Connectable Observable
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})
disposed , cancel := observable . Connect ()
go func () {
// Do something
time . Sleep ( time . Second )
// Then cancel the subscription
cancel ()
}()
// Wait for the subscription to be disposed
<- disposed Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
一個可以使用Observe(opts ...Option) <-chan Item觀察的對象。
一個可以是:
軟件包文檔:https://pkg.go.dev/github.com/reactivex/rxgo/v2
在使用RXGO時,如何使用斷言API來編寫單元測試。
操作員選項
所有貢獻都非常歡迎!請確保首先查看貢獻指南。新移民可以查看正在進行的問題,並檢查help needed標籤。
另外,如果您發表有關RXGO的帖子,請告訴我們。我們很高興將其包括在外部資源部分。
感謝所有已經為RXGO做出貢獻的人!
非常感謝Jetbrains支持該項目。