Reaktive Erweiterungen für die Go -Sprache
Reactivex oder kurz RX ist eine API für die Programmierung mit beobachtbaren Strömen. Dies ist die offizielle Reactivex -API für die Go -Sprache.
Reactivex ist eine neue, alternative Art der asynchronen Programmierung für Rückrufe, Versprechen und aufgeschobene. Es geht darum, Strom von Ereignissen oder Elementen zu verarbeiten, wobei Ereignisse Ereignisse oder Änderungen innerhalb des Systems sind. Ein Strom von Ereignissen wird als beobachtbar bezeichnet.
Ein Operator ist eine Funktion, die ein beobachtbares, wie und wann er Daten ausgeben sollte. Die Liste der abgedeckten Betreiber ist hier verfügbar.
Die RXGO -Implementierung basiert auf dem Konzept der Pipelines. Eine Pipeline ist eine Reihe von Stufen, die durch Kanäle verbunden sind, wobei jede Stufe eine Gruppe von Goroutinen ist, die dieselbe Funktion ausführen.

Lassen Sie uns ein konkretes Beispiel sehen, wobei jede Box ein Bediener ist:
Just Bedieners basiert.Map .Filter .In diesem Beispiel werden die endgültigen Elemente in einem Kanal gesendet, der einem Verbraucher zur Verfügung steht. Es gibt viele Möglichkeiten, Daten mit RXGO zu konsumieren oder zu produzieren. Die Veröffentlichung der Ergebnisse in einem Kanal ist nur einer von ihnen.
Jeder Bediener ist eine Transformationsphase. Standardmäßig ist alles sequentiell. Wir können jedoch moderne CPU -Architekturen nutzen, indem wir mehrere Instanzen desselben Bedieners definieren. Jede Operatorinstanz ist eine mit einem gemeinsame Kanal verbundene Goroutine.
Die Philosophie von RXGO besteht darin, die Reaktivkonzepte zu implementieren und die Haupt -GO -Primitiven (Kanäle, Goroutinen usw.) zu nutzen, damit die Integration zwischen den beiden Welten so glatt wie möglich ist.
go get -u github.com/reactivex/rxgo/v2
Erstellen wir unseren ersten Beobachtbaren und konsumieren Sie einen Artikel:
observable := rxgo . Just ( "Hello, World!" )()
ch := observable . Observe ()
item := <- ch
fmt . Println ( item . V ) Der Just Operator erstellt aus einer statischen Liste von Elementen eine beobachtbare. Of(value) erstellt ein Element aus einem bestimmten Wert. Wenn wir ein Element aus einem Fehler erstellen möchten, müssen wir Error(err) verwenden. Dies ist ein Unterschied zum V1, der einen Wert oder einen Fehler direkt akzeptierte, ohne ihn einwickeln zu müssen. Was ist der Grund für diese Veränderung? Es soll RxGO für die Generika -Funktion (hoffentlich) in Go 2 vorbereiten.
Übrigens verwendet der Just Bediener Currying als syntaktischer Zucker. Auf diese Weise akzeptiert es mehrere Elemente in der ersten Parameterliste und mehrere Optionen in der zweiten Parameterliste. Wir werden unten sehen, wie Sie Optionen angeben.
Sobald das Beobachtbare erzeugt wird, können wir es mit Observe() beobachten. Standardmäßig ist ein Beobachtbar in dem Sinne faul, dass es Elemente nur dann ausgibt, sobald ein Abonnement erstellt wurde. Observe() gibt einen <-chan rxgo.Item zurück.
Wir haben einen Artikel aus diesem Kanal konsumiert und seinen Wert des Elements mit item.V gedruckt.
Ein Element ist ein Wrapper über einem Wert oder einem Fehler. Möglicherweise möchten wir den Typ zuerst wie folgt überprüfen:
item := <- ch
if item . Error () {
return item . E
}
fmt . Println ( item . V ) item.Error() gibt einen Booleschen zurück, der angibt, ob ein Element einen Fehler enthält. Dann verwenden wir entweder item.E um den Fehler oder item.V zu erhalten. V, um den Wert zu erhalten.
Standardmäßig wird ein Beobachtbar gestoppt, sobald ein Fehler erzeugt wurde. Es gibt jedoch spezielle Betreiber, die mit Fehlern umgehen können (z. B. OnError , Retry usw.)
Es ist auch möglich, Elemente mit Callbacks zu konsumieren:
observable . ForEach ( func ( v interface {}) {
fmt . Printf ( "received: %v n " , v )
}, func ( err error ) {
fmt . Printf ( "error: %e n " , err )
}, func () {
fmt . Println ( "observable is closed" )
})In diesem Beispiel haben wir drei Funktionen bestanden:
NextFunc , der ausgelöst wird, wenn ein Wertelement emittiert wird.ErrFunc , der ausgelöst wird, wenn ein Fehlerelement emittiert wird.CompletedFunc , der nach Abschluss des Beobachtbaren ausgelöst wird. ForEach ist nicht blockiert. Es gibt jedoch einen Benachrichtigungskanal zurück, der nach Abschluss des Beobachtbaren geschlossen wird. Um den vorherigen Code zu blockieren, müssen wir einfach <- :
<- observable. ForEach ( ... ) Angenommen, wir möchten einen Stream implementieren, der die folgende Customer verbraucht:
type Customer struct {
ID int
Name , LastName string
Age int
TaxNumber string
} Wir erstellen einen Produzenten, der Customer an ein bestimmtes chan rxgo.Item ausgibt.
// Create the input channel
ch := make ( chan rxgo. Item )
// Data producer
go producer ( ch )
// Create an Observable
observable := rxgo . FromChannel ( ch )Dann müssen wir die beiden folgenden Operationen ausführen:
Da der bereichernde Schritt IO-gebunden ist, könnte es interessant sein, ihn in einem bestimmten Pool von Goroutinen zu parallelisieren. Stellen Sie uns jedoch vor, dass alle Customer nacheinander basierend auf seiner ID hergestellt werden müssen.
observable .
Filter ( func ( item interface {}) bool {
// Filter operation
customer := item .( Customer )
return customer . Age > 18
}).
Map ( func ( _ context. Context , item interface {}) ( interface {}, error ) {
// Enrich operation
customer := item .( Customer )
taxNumber , err := getTaxNumber ( customer )
if err != nil {
return nil , err
}
customer . TaxNumber = taxNumber
return customer , nil
},
// Create multiple instances of the map operator
rxgo . WithPool ( pool ),
// Serialize the items emitted by their Customer.ID
rxgo . Serialize ( func ( item interface {}) int {
customer := item .( Customer )
return customer . ID
}), rxgo . WithBufferedChannel ( 1 )) Am Ende konsumieren wir die Elemente zum Beispiel mit ForEach() oder Observe() . Observe() gibt einen <-chan Item zurück:
for customer := range observable . Observe () {
if customer . Error () {
return err
}
fmt . Println ( customer )
}In der RX -Welt gibt es eine Unterscheidung zwischen kaltem und heißem Observablen. Wenn die Daten vom Beobachtbaren selbst erzeugt werden, ist sie kaltbeobachtbar. Wenn die Daten außerhalb des Beobachtbaren erzeugt werden, ist sie heiß beobachtbar. Wenn wir nicht immer wieder einen Produzenten erstellen wollen, bevorzugen wir einen heißen Beobachtbaren.
In RxGO gibt es ein ähnliches Konzept.
Erstellen wir zunächst ein heißes Observable mit FromChannel -Operator und sehen Sie die Auswirkungen an:
ch := make ( chan rxgo. Item )
go func () {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
close ( ch )
}()
observable := rxgo . FromChannel ( ch )
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}Das Ergebnis dieser Ausführung ist:
0
1
2
Dies bedeutet, dass der erste Beobachter bereits alle Gegenstände konsumiert hat. Und nichts übrig für andere.
Obwohl dieses Verhalten mit verbindbaren Observablen verändert werden kann.
Der Hauptpunkt hier ist die erzeugte Goroutine, die diese Gegenstände produziert.
Lassen Sie uns andererseits einen kalten Beobachtungsbetrieb unter Verwendung von Defer Operator erstellen:
observable := rxgo . Defer ([]rxgo. Producer { func ( _ context. Context , ch chan <- rxgo. Item ) {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
}})
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}Jetzt ist das Ergebnis:
0
1
2
0
1
2Bei einem kalten Beobachtbaren wurde der Strom für jeden Beobachter unabhängig voneinander erstellt.
Auch hier geht es bei Hot vs Cold Observables nicht darum, wie Sie Elemente konsumieren, sondern dass Daten erstellt werden.
Gutes Beispiel für Hot Observable sind Preis -Zecken von einer Handelsbörse.
Und wenn Sie ein Beobachtungsbetrag für das Abrufen von Produkten aus einer Datenbank unterrichten, dann erstellen Sie das Kaltbeobachtbar .
Es gibt einen anderen Operator namens FromEventSource , der einen beobachtbaren aus einem Kanal erzeugt. Der Unterschied zwischen FromChannel -Operator besteht darin, dass sobald das Beobachtbare erstellt wird, es anfängt, Elemente zu emittieren, unabhängig davon, ob es einen Beobachter gibt oder nicht. Daher gehen die von einem beobachtbaren ohne Beobachter ausgestrahlten Elemente verloren (während sie mit FromChannel -Operator gepuffert werden).
Ein Anwendungsfall mit FromEventSource -Operator ist beispielsweise Telemetrie. Möglicherweise interessieren wir uns nicht für alle Daten, die von Beginn eines Streams erstellt wurden - nur die Daten, seit wir angefangen haben, sie zu beobachten.
Sobald wir anfangen, ein mit FromEventSource erstellter beobachtbarer Beobachtung zu beobachten, können wir die Backdruckstrategie konfigurieren. Standardmäßig blockiert es (es gibt eine garantierte Lieferung für die Artikel, die nach dem Beobachtung emittiert werden). Wir können diese Strategie auf diese Weise überschreiben:
observable := rxgo . FromEventSource ( input , rxgo . WithBackPressureStrategy ( rxgo . Drop )) Die Drop -Strategie bedeutet, dass, wenn die Pipeline nach FromEventSource nicht bereit war, einen Artikel zu konsumieren, dieser Artikel fallen gelassen wird.
Standardmäßig ist ein Kanalverbindungsoperatoren nicht gepuffert. Wir können dieses Verhalten wie dieses überschreiben:
observable . Map ( transform , rxgo . WithBufferedChannel ( 42 )) Jeder Bediener verfügt über einen Optionsparameter opts ...Option der solche Optionen übergeben kann.
Die Standardbeobachtungsstrategie ist faul. Dies bedeutet, dass ein Bediener die von einem beobachtbaren Elemente verarbeitet, sobald wir sie beobachten. Wir können dieses Verhalten auf diese Weise ändern:
observable := rxgo . FromChannel ( ch ). Map ( transform , rxgo . WithObservationStrategy ( rxgo . Eager )) In diesem Fall wird der Map immer dann ausgelöst, wenn ein Element produziert wird, auch ohne Beobachter.
Standardmäßig ist jeder Bediener sequentiell. Ein Operator ist eine Goroutine -Instanz. Wir können es mit der folgenden Option überschreiben:
observable . Map ( transform , rxgo . WithPool ( 32 )) In diesem Beispiel erstellen wir einen Pool von 32 Goroutinen, die Elemente gleichzeitig aus demselben Kanal konsumieren. Wenn der Vorgang CPU-gebunden ist, können wir die Option WithCPUPool() verwenden, mit der ein Pool basierend auf der Anzahl der logischen CPUs erstellt wird.
Ein verbindbares Observable ähnelt einem gewöhnlichen Beobachtbaren, außer dass es nicht beginnt, Elemente zu emittieren, wenn es abonniert wird, sondern nur dann, wenn seine Connect () -Methode aufgerufen wird. Auf diese Weise können Sie darauf warten, dass alle beabsichtigten Abonnenten das Beobachtbare abonnieren, bevor das beobachtbare Elemente beginnt.
Erstellen wir ein verbindbares Observable mit rxgo.WithPublishStrategy :
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())Dann erstellen wir zwei Beobachter:
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) + 1 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) * 2 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) Wenn observable nicht ein verbindbares Observable wäre, wie DoOnNext einen Beobachter erzeugt, hätte die beobachtbare Quelle begonnen, Elemente zu emittieren. Im Falle eines verbindbaren Beobachtbaren müssen wir jedoch Connect() anrufen:
observable . Connect () Sobald Connect() aufgerufen wird, beginnt das verbindbare Observable, Elemente zu emittieren.
Es gibt eine weitere wichtige Änderung bei einem regulären Beobachtbaren. Ein verbindbares Observable veröffentlicht seine Elemente. Dies bedeutet, dass alle Beobachter eine Kopie der Artikel erhalten.
Hier ist ein Beispiel mit einem regulären Beobachtbaren:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a regular Observable
observable := rxgo . FromChannel ( ch )
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) First observer: 1
First observer: 2
First observer: 3
Nun mit einem verbindbaren Beobachtbaren:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a Connectable Observable
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})
disposed , cancel := observable . Connect ()
go func () {
// Do something
time . Sleep ( time . Second )
// Then cancel the subscription
cancel ()
}()
// Wait for the subscription to be disposed
<- disposed Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
Ein iterierbares ist ein Objekt, das mit Observe(opts ...Option) <-chan Item beobachtet werden kann.
Ein iterierbares kann entweder sein:
Paketdokumentation: https://pkg.go.dev/github.com/reactivex/rxgo/v2
So verwenden Sie die Assert -API, um Unit -Tests während der Verwendung von RxGO zu schreiben.
Bedieneroptionen
Alle Beiträge sind sehr willkommen! Stellen Sie sicher, dass Sie zuerst die beitragenden Richtlinien überprüfen. Neuankömmlinge können sich laufende Probleme ansehen und nach dem help needed Hilfetikett suchen.
Wenn Sie einen Beitrag über RxGO veröffentlichen, teilen Sie uns bitte dies mit. Wir würden uns freuen, es in den Abschnitt External Resources aufzunehmen.
Vielen Dank an alle Leute, die bereits zu RXGO beigetragen haben!
Ein großes Dankeschön an JetBrains für die Unterstützung des Projekts.