Extensões reativas para o idioma Go
ReactiveX, ou RX, para abreviação, é uma API para programação com fluxos observáveis. Esta é a API oficial do ReactiveX para o idioma Go.
O ReactiveX é uma maneira nova e alternativa de programação assíncrona a retornos de chamada, promessas e adiados. Trata -se de processar fluxos de eventos ou itens, com eventos sendo quaisquer ocorrências ou alterações dentro do sistema. Um fluxo de eventos é chamado de observável.
Um operador é uma função que define um observável, como e quando deve emitir dados. A lista de operadores cobertos está disponível aqui.
A implementação do RXGO é baseada no conceito de pipelines. Um pipeline é uma série de estágios conectados por canais, onde cada estágio é um grupo de goroutinas que executam a mesma função.

Vamos ver um exemplo concreto com cada caixa sendo um operador:
Just .Map .Filter .Neste exemplo, os itens finais são enviados em um canal, disponíveis para um consumidor. Existem muitas maneiras de consumir ou produzir dados usando o RXGO. A publicação dos resultados em um canal é apenas um deles.
Cada operador é um estágio de transformação. Por padrão, tudo é seqüencial. No entanto, podemos aproveitar as arquiteturas modernas da CPU, definindo várias instâncias do mesmo operador. Cada instância do operador é uma goroutina conectada a um canal comum.
A filosofia do RXGO é implementar os conceitos Reactivex e alavancar as primitivas GO principal (canais, goroutinas etc.), para que a integração entre os dois mundos seja o mais suave possível.
go get -u github.com/reactivex/rxgo/v2
Vamos criar nosso primeiro observável e consumir um item:
observable := rxgo . Just ( "Hello, World!" )()
ch := observable . Observe ()
item := <- ch
fmt . Println ( item . V ) O operador Just cria um observável a partir de uma lista estática de itens. Of(value) cria um item de um determinado valor. Se queremos criar um item a partir de um erro, precisamos usar Error(err) . Isso é uma diferença com o V1 que estava aceitando um valor ou um erro diretamente sem precisar envolvê -lo. Qual é a lógica para essa mudança? É para preparar o RXGO para o recurso de genéricos que chegam (espero) em Go 2.
A propósito, o operador Just usa currying como açúcar sintático. Dessa forma, ele aceita vários itens na primeira lista de parâmetros e várias opções na segunda lista de parâmetros. Veremos abaixo como especificar opções.
Depois que o observável é criado, podemos observá -lo usando Observe() . Por padrão, um observável é preguiçoso no sentido de que emite itens apenas uma vez que uma assinatura é feita. Observe() retorna a <-chan rxgo.Item .
Consumimos um item deste canal e imprimimos seu valor do item usando item.V
Um item é um invólucro na parte superior de um valor ou erro. Podemos querer verificar o tipo primeiro como este:
item := <- ch
if item . Error () {
return item . E
}
fmt . Println ( item . V ) item.Error() retorna um booleano indicando se um item contém um erro. Em seguida, usamos o item.E para obter o erro ou item.V para obter o valor.
Por padrão, um observável é interrompido quando um erro é produzido. No entanto, existem operadores especiais para lidar com erros (por exemplo, OnError , Retry , etc.)
Também é possível consumir itens usando retornos de chamada:
observable . ForEach ( func ( v interface {}) {
fmt . Printf ( "received: %v n " , v )
}, func ( err error ) {
fmt . Printf ( "error: %e n " , err )
}, func () {
fmt . Println ( "observable is closed" )
})Neste exemplo, passamos três funções:
NextFunc acionado quando um item de valor é emitido.ErrFunc acionado quando um item de erro é emitido.CompletedFunc concluído acionado quando o observável for concluído. ForEach é não bloqueador. No entanto, ele retorna um canal de notificação que será fechado assim que o observável concluir. Portanto, para fazer o bloqueio de código anterior, simplesmente precisamos usar <- :
<- observable. ForEach ( ... ) Digamos que queremos implementar um fluxo que consome a seguinte estrutura Customer :
type Customer struct {
ID int
Name , LastName string
Age int
TaxNumber string
} Criamos um produtor que emitirá Customer para um determinado chan rxgo.Item e crie um observável a partir dele:
// Create the input channel
ch := make ( chan rxgo. Item )
// Data producer
go producer ( ch )
// Create an Observable
observable := rxgo . FromChannel ( ch )Então, precisamos executar as duas operações seguintes:
Como a etapa enriquecedora é ligada a IO, pode ser interessante paralelizá-lo dentro de um determinado pool de goroutines. No entanto, vamos imaginar que todos os itens Customer precisam ser produzidos sequencialmente com base em seu ID .
observable .
Filter ( func ( item interface {}) bool {
// Filter operation
customer := item .( Customer )
return customer . Age > 18
}).
Map ( func ( _ context. Context , item interface {}) ( interface {}, error ) {
// Enrich operation
customer := item .( Customer )
taxNumber , err := getTaxNumber ( customer )
if err != nil {
return nil , err
}
customer . TaxNumber = taxNumber
return customer , nil
},
// Create multiple instances of the map operator
rxgo . WithPool ( pool ),
// Serialize the items emitted by their Customer.ID
rxgo . Serialize ( func ( item interface {}) int {
customer := item .( Customer )
return customer . ID
}), rxgo . WithBufferedChannel ( 1 )) No final, consumimos os itens usando ForEach() ou Observe() por exemplo. Observe() retorna um <-chan Item :
for customer := range observable . Observe () {
if customer . Error () {
return err
}
fmt . Println ( customer )
}No mundo do RX, há uma distinção entre observáveis frios e quentes. Quando os dados são produzidos pelo próprio observável, é um observável frio. Quando os dados são produzidos fora do observável, é um observável quente. Geralmente, quando não queremos criar um produtor repetidamente, favorecemos um observável quente.
No RXGO, existe um conceito semelhante.
Primeiro, vamos criar um observável quente usando o operador FromChannel e ver as implicações:
ch := make ( chan rxgo. Item )
go func () {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
close ( ch )
}()
observable := rxgo . FromChannel ( ch )
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}O resultado desta execução é:
0
1
2
Isso significa que o primeiro observador já consumiu todos os itens. E nada mais para os outros.
Embora esse comportamento possa ser alterado com observáveis conectáveis.
O ponto principal aqui é que a goroutina produziu esses itens.
Por outro lado, vamos criar um observável frio usando o operador Defer :
observable := rxgo . Defer ([]rxgo. Producer { func ( _ context. Context , ch chan <- rxgo. Item ) {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
}})
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}Agora, o resultado é:
0
1
2
0
1
2No caso de um observável frio, o fluxo foi criado independentemente para todos os observadores.
Novamente, os observáveis quentes vs frios não são sobre como você consome itens, é sobre onde os dados são produzidos.
Um bom exemplo para o Hot Obsertable são carrapatos de preço de uma bolsa comercial.
E se você ensinar um observável a buscar produtos a partir de um banco de dados, entregue -os um por um, criará o Observável frio .
Há outro operador chamado FromEventSource que cria um observável a partir de um canal. A diferença entre o operador FromChannel é que, assim que o observável for criado, ele começa a emitir itens, independentemente de haver um observador ou não. Portanto, os itens emitidos por um observável sem observador (s) são perdidos (enquanto são tamponados com o operador FromChannel ).
Um caso de uso com o operador FromEventSource é, por exemplo, telemetria. Podemos não estar interessados em todos os dados produzidos desde o início de um fluxo - apenas os dados desde que começamos a observá -los.
Quando começamos a observar um observável criado com FromEventSource , podemos configurar a estratégia de contrapressão. Por padrão, está bloqueando (há uma entrega garantida para os itens emitidos depois de observá -lo). Podemos substituir essa estratégia desta maneira:
observable := rxgo . FromEventSource ( input , rxgo . WithBackPressureStrategy ( rxgo . Drop )) A estratégia Drop significa que, se o oleoduto após FromEventSource não estivesse pronto para consumir um item, este item será descartado.
Por padrão, um canal que conecta os operadores não é buffer. Podemos substituir esse comportamento como este:
observable . Map ( transform , rxgo . WithBufferedChannel ( 42 )) Cada operador possui um parâmetro opts ...Option permitindo passar essas opções.
A estratégia de observação padrão é preguiçosa. Isso significa que um operador processa os itens emitidos por um observável quando começamos a observá -lo. Podemos mudar esse comportamento desta maneira:
observable := rxgo . FromChannel ( ch ). Map ( transform , rxgo . WithObservationStrategy ( rxgo . Eager )) Nesse caso, o operador Map é acionado sempre que um item é produzido, mesmo sem nenhum observador.
Por padrão, cada operador é seqüencial. Um operador sendo uma instância de goroutina. Podemos substituí -lo usando a seguinte opção:
observable . Map ( transform , rxgo . WithPool ( 32 )) Neste exemplo, criamos um pool de 32 goroutinas que consomem itens simultaneamente do mesmo canal. Se a operação estiver ligada à CPU, podemos usar a opção WithCPUPool() que cria um pool com base no número de CPUs lógicas.
Um observável conectável se assemelha a um observável comum, exceto que ele não começa a emitir itens quando é subscrito, mas apenas quando o método Connect () é chamado. Dessa forma, você pode esperar que todos os assinantes pretendidos se inscrevam no observável antes que o observável comece a emitir itens.
Vamos criar um observável conectável usando rxgo.WithPublishStrategy :
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())Então, criamos dois observadores:
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) + 1 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) * 2 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) Se observable não fosse um observável conectável, pois DoOnNext cria um observador, a fonte observável começaria a emitir itens. No entanto, no caso de um observável conectável, temos que chamar Connect() :
observable . Connect () Depois que Connect() é chamado, o observável conectável começa a emitir itens.
Há outra mudança importante com um observável regular. Um observável conectável publica seus itens. Isso significa que todos os observadores recebem uma cópia dos itens.
Aqui está um exemplo com um observável regular:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a regular Observable
observable := rxgo . FromChannel ( ch )
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) First observer: 1
First observer: 2
First observer: 3
Agora, com um observável conectável:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a Connectable Observable
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})
disposed , cancel := observable . Connect ()
go func () {
// Do something
time . Sleep ( time . Second )
// Then cancel the subscription
cancel ()
}()
// Wait for the subscription to be disposed
<- disposed Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
Um iterável é um objeto que pode ser observado usando Observe(opts ...Option) <-chan Item .
Um iterável pode ser:
Documentação do pacote: https://pkg.go.dev/github.com/reactivex/rxgo/v2
Como usar a API Assert para escrever testes de unidade enquanto estiver usando o RXGO.
Opções do operador
Todas as contribuições são muito bem -vindas! Certifique -se de verificar primeiro as diretrizes que contribuem. Os recém -chegados podem dar uma olhada em questões contínuas e verificar a help needed .
Além disso, se você publicar um post sobre o RXGO, informe -nos. Ficaríamos felizes em incluí -lo na seção de recursos externos.
Obrigado a todas as pessoas que já contribuíram para o RXGO!
Um grande agradecimento à JetBrains por apoiar o projeto.