Extensions réactives pour la langue go
Reactivex, ou RX pour faire court, est une API pour la programmation avec des flux observables. Il s'agit de l'API officielle Reactivex pour la langue Go.
ReactiveX est une nouvelle manière alternative de programmation asynchrone aux rappels, promesses et différés. Il s'agit de traiter des flux d'événements ou d'éléments, les événements étant des événements ou des changements dans le système. Un flux d'événements est appelé observable.
Un opérateur est une fonction qui définit un observable, comment et quand il devrait émettre des données. La liste des opérateurs couvertes est disponible ici.
L'implémentation RXGO est basée sur le concept de pipelines. Un pipeline est une série d'étapes connectées par les canaux, où chaque étape est un groupe de Goroutines exécutant la même fonction.

Voyons un exemple en béton avec chaque boîte étant un opérateur:
Just .Map .Filter .Dans cet exemple, les éléments finaux sont envoyés dans un canal, disponible pour un consommateur. Il existe de nombreuses façons de consommer ou de produire des données à l'aide de RXGO. Publier les résultats dans un canal n'est que l'un d'entre eux.
Chaque opérateur est une étape de transformation. Par défaut, tout est séquentiel. Pourtant, nous pouvons tirer parti des architectures CPU modernes en définissant plusieurs instances du même opérateur. Chaque instance d'opérateur étant une goroutine connectée à un canal commun.
La philosophie de RXGO est de mettre en œuvre les concepts réactives et de tirer parti des principales primitives GO (canaux, goroutines, etc.) afin que l'intégration entre les deux mondes soit aussi fluide que possible.
go get -u github.com/reactivex/rxgo/v2
Créons notre premier observable et consumons un élément:
observable := rxgo . Just ( "Hello, World!" )()
ch := observable . Observe ()
item := <- ch
fmt . Println ( item . V ) L'opérateur Just crée un observable à partir d'une liste statique d'éléments. Of(value) crée un élément à partir d'une valeur donnée. Si nous voulons créer un élément à partir d'une erreur, nous devons utiliser Error(err) . C'est une différence avec le V1 qui acceptait une valeur ou une erreur directement sans avoir à l'envelopper. Quelle est la justification de ce changement? Il s'agit de préparer RXGO pour la fonctionnalité générique à venir (espérons-le) dans Go 2.
Soit dit en passant, l'opérateur Just utilise le curry comme sucre syntaxique. De cette façon, il accepte plusieurs éléments dans la première liste de paramètres et plusieurs options dans la deuxième liste de paramètres. Nous verrons ci-dessous comment spécifier les options.
Une fois l'observable créé, nous pouvons l'observer en utilisant Observe() . Par défaut, un observable est paresseux dans le sens où il n'émet des éléments qu'une fois qu'un abonnement est fait. Observe() renvoie un <-chan rxgo.Item .
Nous avons consommé un élément de ce canal et imprimé sa valeur de l'élément à l'aide de item.V .
Un élément est un wrapper au-dessus d'une valeur ou d'une erreur. Nous voulons peut-être d'abord vérifier le type comme ceci:
item := <- ch
if item . Error () {
return item . E
}
fmt . Println ( item . V ) item.Error() Renvoie un booléen indiquant si un élément contient une erreur. Ensuite, nous utilisons l'un ou l'autre item.E pour obtenir l'erreur ou item.V pour obtenir la valeur.
Par défaut, un observable est arrêté une fois qu'une erreur est produite. Cependant, il existe des opérateurs spéciaux pour faire face aux erreurs (par exemple, OnError , Retry , etc.)
Il est également possible de consommer des éléments à l'aide de rappels:
observable . ForEach ( func ( v interface {}) {
fmt . Printf ( "received: %v n " , v )
}, func ( err error ) {
fmt . Printf ( "error: %e n " , err )
}, func () {
fmt . Println ( "observable is closed" )
})Dans cet exemple, nous avons passé trois fonctions:
NextFunc a déclenché lorsqu'un élément de valeur est émis.ErrFunc a déclenché lorsqu'un élément d'erreur est émis.CompletedFunc a déclenché une fois l'observable terminé. ForEach n'est pas bloquant. Pourtant, il renvoie un canal de notification qui sera fermé une fois l'observable terminé. Par conséquent, pour faire le blocage du code précédent, nous devons simplement utiliser <- ::
<- observable. ForEach ( ... ) Disons que nous voulons implémenter un flux qui consomme la structure Customer suivante:
type Customer struct {
ID int
Name , LastName string
Age int
TaxNumber string
} Nous créons un producteur qui émettra Customer à un chan rxgo.Item donné et en créera un observable:
// Create the input channel
ch := make ( chan rxgo. Item )
// Data producer
go producer ( ch )
// Create an Observable
observable := rxgo . FromChannel ( ch )Ensuite, nous devons effectuer les deux opérations suivantes:
Étant donné que l'étape enrichissante est liée à IO, il pourrait être intéressant de la paralléliser dans un pool donné de Goroutines. Pourtant, imaginons que tous les éléments Customer doivent être produits séquentiellement en fonction de son ID .
observable .
Filter ( func ( item interface {}) bool {
// Filter operation
customer := item .( Customer )
return customer . Age > 18
}).
Map ( func ( _ context. Context , item interface {}) ( interface {}, error ) {
// Enrich operation
customer := item .( Customer )
taxNumber , err := getTaxNumber ( customer )
if err != nil {
return nil , err
}
customer . TaxNumber = taxNumber
return customer , nil
},
// Create multiple instances of the map operator
rxgo . WithPool ( pool ),
// Serialize the items emitted by their Customer.ID
rxgo . Serialize ( func ( item interface {}) int {
customer := item .( Customer )
return customer . ID
}), rxgo . WithBufferedChannel ( 1 )) En fin de compte, nous consommons les éléments en utilisant ForEach() ou Observe() par exemple. Observe() Renvoie un <-chan Item :
for customer := range observable . Observe () {
if customer . Error () {
return err
}
fmt . Println ( customer )
}Dans le monde RX, il y a une distinction entre les observables froids et chauds. Lorsque les données sont produites par l'observable lui-même, il est observable à froid. Lorsque les données sont produites en dehors de l'observable, il est observable à chaud. Habituellement, lorsque nous ne voulons pas créer un producteur encore et encore, nous favorisons un observable chaud.
Dans RXGO, il y a un concept similaire.
Tout d'abord, créons un observable à chaud à partir de l'opérateur FromChannel et voyons les implications:
ch := make ( chan rxgo. Item )
go func () {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
close ( ch )
}()
observable := rxgo . FromChannel ( ch )
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}Le résultat de cette exécution est:
0
1
2
Cela signifie que le premier observateur a déjà consommé tous les articles. Et il ne reste plus rien pour les autres.
Bien que ce comportement puisse être modifié avec des observables connectés.
Le point principal ici est que le goroutine a produit ces éléments.
D'un autre côté, créons un froid observable à l'aide de l'opérateur Defer :
observable := rxgo . Defer ([]rxgo. Producer { func ( _ context. Context , ch chan <- rxgo. Item ) {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
}})
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}Maintenant, le résultat est:
0
1
2
0
1
2Dans le cas d'un froid observable, le ruisseau a été créé indépendamment pour chaque observateur.
Encore une fois, les observables chauds vs froids ne concernent pas la façon dont vous consomment des éléments, c'est où les données sont produites.
Un bon exemple pour l'observable chaud est les tiques de prix d'une bourse commerciale.
Et si vous enseignez à un observable pour récupérer les produits à partir d'une base de données, alors donnez-leur un par un, vous créerez le froid observable.
Il y a un autre opérateur appelé FromEventSource qui crée un observable à partir d'un canal. La différence entre l'opérateur FromChannel est que dès que l'observable est créé, il commence à émettre des éléments, qu'il y ait un observateur ou non. Par conséquent, les éléments émis par un (s) observables sans observateurs sont perdus (alors qu'ils sont tamponnés avec l'opérateur FromChannel ).
Un cas d'utilisation avec l'opérateur FromEventSource est, par exemple, la télémétrie. Nous ne sommes peut-être pas intéressés par toutes les données produites à partir du tout début d'un flux - seulement les données depuis que nous avons commencé à l'observer.
Une fois que nous commençons à observer un observable créé avec FromEventSource , nous pouvons configurer la stratégie de contre-pression. Par défaut, il bloque (il y a une livraison garantie pour les articles émis après l'avoir observé). Nous pouvons remplacer cette stratégie de cette façon:
observable := rxgo . FromEventSource ( input , rxgo . WithBackPressureStrategy ( rxgo . Drop )) La stratégie Drop signifie que si le pipeline après FromEventSource n'était pas prêt à consommer un élément, cet élément est supprimé.
Par défaut, un canal de connexion des opérateurs n'est pas tamponné. Nous pouvons remplacer ce comportement comme ceci:
observable . Map ( transform , rxgo . WithBufferedChannel ( 42 )) Chaque opérateur a un paramètre opts ...Option permettant de passer de telles options.
La stratégie d'observation par défaut est paresseuse. Cela signifie qu'un opérateur traite les éléments émis par un observable une fois que nous commençons à l'observer. Nous pouvons changer ce comportement de cette façon:
observable := rxgo . FromChannel ( ch ). Map ( transform , rxgo . WithObservationStrategy ( rxgo . Eager )) Dans ce cas, l'opérateur Map est déclenché chaque fois qu'un élément est produit, même sans aucun observateur.
Par défaut, chaque opérateur est séquentiel. Un opérateur étant une instance de Goroutine. Nous pouvons le remplacer en utilisant l'option suivante:
observable . Map ( transform , rxgo . WithPool ( 32 )) Dans cet exemple, nous créons un pool de 32 Goroutines qui consomment des éléments simultanément à partir du même canal. Si l'opération est liée au processeur, nous pouvons utiliser l'option WithCPUPool() qui crée un pool basé sur le nombre de processeurs logiques.
Un observable connectable ressemble à un observable ordinaire, sauf qu'il ne commence pas à émettre des éléments lorsqu'il est abonné, mais uniquement lorsque sa méthode connect () est appelée. De cette façon, vous pouvez attendre que tous les abonnés prévus souscrivent à l'observable avant que l'observable commence à émettre des éléments.
Créons un observable connectable à l'aide de rxgo.WithPublishStrategy :
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())Ensuite, nous créons deux observateurs:
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) + 1 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) * 2 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) Si observable n'était pas un observable connectable, car DoOnNext crée un observateur, la source observable aurait commencé à émettre des éléments. Pourtant, dans le cas d'un observable connecté, nous devons appeler Connect() :
observable . Connect () Une fois que Connect() est appelé, l'observable connecté commence à émettre des éléments.
Il y a un autre changement important avec un observable régulier. Un observable connectable publie ses éléments. Cela signifie que tous les observateurs reçoivent une copie des articles.
Voici un exemple avec un observable régulier:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a regular Observable
observable := rxgo . FromChannel ( ch )
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) First observer: 1
First observer: 2
First observer: 3
Maintenant, avec un observable connecté:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a Connectable Observable
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})
disposed , cancel := observable . Connect ()
go func () {
// Do something
time . Sleep ( time . Second )
// Then cancel the subscription
cancel ()
}()
// Wait for the subscription to be disposed
<- disposed Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
Un itérable est un objet qui peut être observé à l'aide de Observe(opts ...Option) <-chan Item .
Un itérable peut être soit:
Documentation du package: https://pkg.go.dev/github.com/reactivex/rxgo/v2
Comment utiliser l'API Assert pour écrire des tests unitaires tout en utilisant RXGO.
Options d'opérateur
Toutes les contributions sont les bienvenues! Assurez-vous d'abord de consulter les directives contributives. Les nouveaux arrivants peuvent jeter un œil aux problèmes en cours et vérifier l'étiquette help needed .
De plus, si vous publiez un article sur RXGO, veuillez nous le faire savoir. Nous serions heureux de l'inclure dans la section des ressources externes.
Merci à toutes les personnes qui ont déjà contribué à RXGO!
Un grand merci à JetBrains pour avoir soutenu le projet.