Extensiones reactivas para el idioma GO
Reactivex, o RX para abreviar, es una API para la programación con transmisiones observables. Esta es la API oficial de Reactivex para el idioma GO.
Reactivex es una nueva forma alternativa de programación asincrónica a devoluciones de llamada, promesas y diferidas. Se trata de procesar flujos de eventos o elementos, y los eventos son cualquier hecho o cambio dentro del sistema. Un flujo de eventos se llama observable.
Un operador es una función que define un observable, cómo y cuándo debe emitir datos. La lista de operadores cubiertos está disponible aquí.
La implementación de RXGO se basa en el concepto de tuberías. Una tubería es una serie de etapas conectadas por canales, donde cada etapa es un grupo de goroutinas que ejecutan la misma función.

Veamos un ejemplo concreto con cada caja como operador:
Just .Map .Filter .En este ejemplo, los elementos finales se envían en un canal, disponible para un consumidor. Hay muchas formas de consumir o producir datos utilizando RXGO. Publicar los resultados en un canal es solo uno de ellos.
Cada operador es una etapa de transformación. Por defecto, todo es secuencial. Sin embargo, podemos aprovechar las arquitecturas modernas de la CPU definiendo múltiples instancias del mismo operador. Cada instancia del operador es una goroutina conectada a un canal común.
La filosofía de RXGO es implementar los conceptos reactivos y aprovechar las principales primitivas de GO (canales, goroutinas, etc.) para que la integración entre los dos mundos sea lo más suave posible.
go get -u github.com/reactivex/rxgo/v2
Creemos nuestro primer observable y consumamos un elemento:
observable := rxgo . Just ( "Hello, World!" )()
ch := observable . Observe ()
item := <- ch
fmt . Println ( item . V ) El operador Just crea un observable de una lista estática de elementos. Of(value) crea un elemento de un valor dado. Si queremos crear un elemento a partir de un error, tenemos que usar Error(err) . Esta es una diferencia con el V1 que aceptaba un valor o un error directamente sin tener que envolverlo. ¿Cuál es la justificación para este cambio? Es preparar RXGO para la función de genérica (con suerte) en Go 2.
Por cierto, el operador Just usa el curry como azúcar sintáctico. De esta manera, acepta múltiples elementos en la primera lista de parámetros y múltiples opciones en la segunda lista de parámetros. Veremos a continuación cómo especificar opciones.
Una vez que se crea lo observable, podemos observarlo usando Observe() . Por defecto, un observable es perezoso en el sentido de que emite elementos solo una vez que se realiza una suscripción. Observe() devuelve a <-chan rxgo.Item .
Consumimos un elemento de este canal e imprimimos su valor del elemento usando item.V
Un elemento es un envoltorio encima de un valor o un error. Es posible que deseemos verificar el tipo primero como este:
item := <- ch
if item . Error () {
return item . E
}
fmt . Println ( item . V ) item.Error() Devuelve un booleano que indica si un elemento contiene un error. Luego, usamos cualquier item.E para obtener el error o item.V para obtener el valor.
Por defecto, un observable se detiene una vez que se produce un error. Sin embargo, hay operadores especiales para tratar errores (por ejemplo, OnError , Retry , etc.)
También es posible consumir elementos usando devoluciones de llamada:
observable . ForEach ( func ( v interface {}) {
fmt . Printf ( "received: %v n " , v )
}, func ( err error ) {
fmt . Printf ( "error: %e n " , err )
}, func () {
fmt . Println ( "observable is closed" )
})En este ejemplo, pasamos tres funciones:
NextFunc se activa cuando se emite un elemento de valor.ErrFunc se activó cuando se emite un elemento de error.CompletedFunc activado una vez que se complete el observable. ForEach no es bloqueo. Sin embargo, devuelve un canal de notificación que se cerrará una vez que se complete el observable. Por lo tanto, para hacer el bloqueo de código anterior, simplemente necesitamos usar <- :
<- observable. ForEach ( ... ) Digamos que queremos implementar una transmisión que consuma la siguiente estructura Customer :
type Customer struct {
ID int
Name , LastName string
Age int
TaxNumber string
} Creamos un productor que emitirá a Customer a un chan rxgo.Item dado y creamos un observable a partir de él:
// Create the input channel
ch := make ( chan rxgo. Item )
// Data producer
go producer ( ch )
// Create an Observable
observable := rxgo . FromChannel ( ch )Luego, necesitamos realizar las dos operaciones siguientes:
Como el paso enriquecedor está atado a IO, puede ser interesante paralelizarlo dentro de un grupo de goroutinas. Sin embargo, imaginemos que todos los artículos Customer deben producirse secuencialmente en función de su ID .
observable .
Filter ( func ( item interface {}) bool {
// Filter operation
customer := item .( Customer )
return customer . Age > 18
}).
Map ( func ( _ context. Context , item interface {}) ( interface {}, error ) {
// Enrich operation
customer := item .( Customer )
taxNumber , err := getTaxNumber ( customer )
if err != nil {
return nil , err
}
customer . TaxNumber = taxNumber
return customer , nil
},
// Create multiple instances of the map operator
rxgo . WithPool ( pool ),
// Serialize the items emitted by their Customer.ID
rxgo . Serialize ( func ( item interface {}) int {
customer := item .( Customer )
return customer . ID
}), rxgo . WithBufferedChannel ( 1 )) Al final, consumimos los elementos usando ForEach() u Observe() por ejemplo. Observe() Devuelve un <-chan Item :
for customer := range observable . Observe () {
if customer . Error () {
return err
}
fmt . Println ( customer )
}En el mundo RX, hay una distinción entre observables fríos y calientes. Cuando los datos son producidos por lo observable en sí, es un frío observable. Cuando los datos se producen fuera de lo observable, es una observable en caliente. Por lo general, cuando no queremos crear un productor una y otra vez, favorecemos un observable en caliente.
En RXGO, hay un concepto similar.
Primero, creemos una observable en caliente utilizando el operador FromChannel y veamos las implicaciones:
ch := make ( chan rxgo. Item )
go func () {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
close ( ch )
}()
observable := rxgo . FromChannel ( ch )
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}El resultado de esta ejecución es:
0
1
2
Significa que el primer observador ya consumió todos los artículos. Y nada dejó para los demás.
Aunque este comportamiento se puede alterar con observables conectables.
El punto principal aquí es que la Goroutine produjo esos artículos.
Por otro lado, creemos un frío observable usando el operador Defer :
observable := rxgo . Defer ([]rxgo. Producer { func ( _ context. Context , ch chan <- rxgo. Item ) {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
}})
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}Ahora, el resultado es:
0
1
2
0
1
2En el caso de un frío observable, la corriente se creó independientemente para cada observador.
Una vez más, los observables Hot vs Cold no se tratan de cómo consumir elementos, es sobre dónde se producen los datos.
Un buen ejemplo para el observable en caliente son las garrapatas de precio de un intercambio de negociación.
Y si enseña a un observable buscar productos de una base de datos, entonces los cede uno por uno, creará el frío observable.
Hay otro operador llamado FromEventSource que crea un observable de un canal. La diferencia entre el operador FromChannel es que tan pronto como se crea lo observable, comienza a emitir elementos, independientemente de si hay un observador o no. Por lo tanto, los ítems emitidos por un observable sin observadores se pierden (mientras se amortiguan con el operador FromChannel ).
Un caso de uso con el operador FromEventSource es, por ejemplo, la telemetría. Es posible que no estemos interesados en todos los datos producidos desde el comienzo de una secuencia, solo los datos desde que comenzamos a observarlos.
Una vez que comenzamos a observar un observable creado con FromEventSource , podemos configurar la estrategia de contrapresión. Por defecto, está bloqueando (hay una entrega garantizada para los artículos emitidos después de observarlo). Podemos anular esta estrategia de esta manera:
observable := rxgo . FromEventSource ( input , rxgo . WithBackPressureStrategy ( rxgo . Drop )) La estrategia Drop significa que si la tubería después de FromEventSource no estaba lista para consumir un artículo, este artículo se elimina.
Por defecto, un canal que conecta a los operadores no se torce. Podemos anular este comportamiento como este:
observable . Map ( transform , rxgo . WithBufferedChannel ( 42 )) Cada operador tiene un parámetro opts ...Option que permite pasar tales opciones.
La estrategia de observación predeterminada es perezosa. Significa que un operador procesa los elementos emitidos por un observable una vez que comenzamos a observarlo. Podemos cambiar este comportamiento de esta manera:
observable := rxgo . FromChannel ( ch ). Map ( transform , rxgo . WithObservationStrategy ( rxgo . Eager )) En este caso, el operador Map se activa cada vez que se produce un elemento, incluso sin ningún observador.
Por defecto, cada operador es secuencial. Un operador es una instancia de Goroutine. Podemos anularlo usando la siguiente opción:
observable . Map ( transform , rxgo . WithPool ( 32 )) En este ejemplo, creamos un conjunto de 32 goroutinas que consumen elementos simultáneamente del mismo canal. Si la operación está unida a CPU, podemos usar la opción WithCPUPool() que crea un grupo basado en la cantidad de CPU lógicas.
Un observable conectable se asemeja a un observable ordinario, excepto que no comienza a emitir elementos cuando se suscribe, sino solo cuando se llama a su método Connect (). De esta manera, puede esperar a que todos los suscriptores previstos se suscriban a lo observable antes de que el observable comience a emitir elementos.
Creemos un observable conectable usando rxgo.WithPublishStrategy :
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())Luego, creamos dos observadores:
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) + 1 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) * 2 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) Si observable no fuera un observable conectable, ya que DoOnNext crea un observador, la fuente observable habría comenzado a emitir elementos. Sin embargo, en el caso de un observable conectable, tenemos que llamar Connect() ::
observable . Connect () Una vez que se llama Connect() , el observable conectable comienza a emitir elementos.
Hay otro cambio importante con un observable regular. Un observable conectable publica sus elementos. Significa que todos los observadores reciben una copia de los elementos.
Aquí hay un ejemplo con un observable regular:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a regular Observable
observable := rxgo . FromChannel ( ch )
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) First observer: 1
First observer: 2
First observer: 3
Ahora, con un observable conectable:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a Connectable Observable
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})
disposed , cancel := observable . Connect ()
go func () {
// Do something
time . Sleep ( time . Second )
// Then cancel the subscription
cancel ()
}()
// Wait for the subscription to be disposed
<- disposed Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
Un iterable es un objeto que se puede observar usando Observe(opts ...Option) <-chan Item .
Un iterable puede ser:
Documentación del paquete: https://pkg.go.dev/github.com/reactivex/rxgo/v2
Cómo usar la API de afirmación para escribir pruebas unitarias mientras usa RXGO.
Opciones de operador
¡Todas las contribuciones son muy bienvenidas! Asegúrese de ver primero las pautas contribuyentes. Los recién llegados pueden echar un vistazo a los problemas continuos y verificar la etiqueta help needed .
Además, si publica una publicación sobre RXGO, háganoslo saber. Nos alegraría incluirlo en la sección de recursos externos.
¡Gracias a todas las personas que ya contribuyeron a RXGO!
Muchas gracias a JetBrains por apoyar el proyecto.