GO语言的反应性扩展
ReactiveX(简称Reactivex)是用于使用可观察流的编程的API。这是GO语言的官方Ractivex API。
Reactivex是一种新的,替代的方式,用于回调,承诺和延期。这是关于处理事件或项目的流,事件是系统内的任何发生或更改。事件流称为可观察的。
操作员是定义可观察到的函数,如何以及何时发出数据。涵盖的操作员列表可在此处找到。
RXGO实现基于管道的概念。管道是通过通道连接的一系列阶段,每个阶段都是运行相同功能的一组goroutines。

让我们看看一个具体的示例,每个框是操作员:
Just Operator创建一个基于固定项目列表的静态观察。Map运算符定义转换函数(转换为正方形)。Filter操作员过滤每个黄色正方形。在此示例中,最终项目以频道发送,可用于消费者。有很多方法可以使用RXGO消费或生产数据。在频道中发布结果只是其中之一。
每个操作员都是转换阶段。默认情况下,一切都是顺序的。但是,我们可以通过定义同一操作员的多个实例来利用现代CPU架构。每个操作员实例是连接到公共通道的goroutine。
RXGO的理念是实施Ractivex概念并利用主要的GO原始词(频道,goroutines等),以便两个世界之间的集成尽可能顺利。
go get -u github.com/reactivex/rxgo/v2
让我们创建我们的第一个可观察并消耗项目:
observable := rxgo . Just ( "Hello, World!" )()
ch := observable . Observe ()
item := <- ch
fmt . Println ( item . V ) Just操作员从静态项目列表中创建一个可观察到的。 Of(value)从给定值创建项目。如果我们要从错误创建项目,则必须使用Error(err) 。这是与直接接受值或错误无需包装的V1的差异。这种变化的理由是什么?它是为了在第2期中(希望)准备RXGO的RXGO。
顺便说一句, Just Operator使用咖喱作为句法糖。这样,它在第一个参数列表中接受多个项目,第二个参数列表中的多个选项。我们将在下面查看如何指定选项。
一旦创建了可观察到的,我们就可以使用Observe()观察它。默认情况下,一个可观察到的懒惰是因为它仅在订阅后才发出项目。 Observe()返回<-chan rxgo.Item 。
我们从该通道中消费了一个项目,并使用item.V打印了其物品的价值。
项目是值或错误之上的包装器。我们可能想先检查一下类型:
item := <- ch
if item . Error () {
return item . E
}
fmt . Println ( item . V ) item.Error()返回布尔值,指示项目是否包含错误。然后,我们使用任何item.E获取错误或item.V以获取值。
默认情况下,一旦产生错误,就会停止可观察到的。但是,有特别操作员可以处理错误(例如, OnError , Retry等)
也可以使用回调食用项目:
observable . ForEach ( func ( v interface {}) {
fmt . Printf ( "received: %v n " , v )
}, func ( err error ) {
fmt . Printf ( "error: %e n " , err )
}, func () {
fmt . Println ( "observable is closed" )
})在此示例中,我们传递了三个功能:
NextFunc 。ErrFunc 。CompletedFunc 。 ForEach是无障碍的。但是,它返回一个通知渠道,一旦可观察到完成,该通知渠道将关闭。因此,要使先前的代码阻止,我们只需要使用<- ::
<- observable. ForEach ( ... )假设我们要实施一个消耗以下Customer结构的流:
type Customer struct {
ID int
Name , LastName string
Age int
TaxNumber string
}我们创建了一个生产商,将Customer s散发到给定的chan rxgo.Item上,并从中创建一个可观察的:
// Create the input channel
ch := make ( chan rxgo. Item )
// Data producer
go producer ( ch )
// Create an Observable
observable := rxgo . FromChannel ( ch )然后,我们需要执行以下两个操作:
由于丰富的步骤是io的结合,因此将其在给定的goroutines池中并行化可能很有趣。但是,让我们想象所有Customer物品都需要根据其ID依次生产。
observable .
Filter ( func ( item interface {}) bool {
// Filter operation
customer := item .( Customer )
return customer . Age > 18
}).
Map ( func ( _ context. Context , item interface {}) ( interface {}, error ) {
// Enrich operation
customer := item .( Customer )
taxNumber , err := getTaxNumber ( customer )
if err != nil {
return nil , err
}
customer . TaxNumber = taxNumber
return customer , nil
},
// Create multiple instances of the map operator
rxgo . WithPool ( pool ),
// Serialize the items emitted by their Customer.ID
rxgo . Serialize ( func ( item interface {}) int {
customer := item .( Customer )
return customer . ID
}), rxgo . WithBufferedChannel ( 1 ))最后,我们使用ForEach()或Observe()消费项目。 Observe()返回<-chan Item :
for customer := range observable . Observe () {
if customer . Error () {
return err
}
fmt . Println ( customer )
}在RX世界中,冷和热可观察物之间存在区别。当数据由可观察到的本身产生时,它是可观察的。当可观察到的数据在可观察的外部产生时,它是可观察的。通常,当我们不想一遍又一遍地创建生产者时,我们会赞成一个可观察到的热门人物。
在RXGO中,也有类似的概念。
首先,让我们使用FromChannel Operator创建一个可观察的热观察,并查看含义:
ch := make ( chan rxgo. Item )
go func () {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
close ( ch )
}()
observable := rxgo . FromChannel ( ch )
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}此执行的结果是:
0
1
2
这意味着第一个观察者已经消耗了所有项目。没有其他人留给别人。
尽管可以使用可连接的可观察物来改变此行为。
这里的要点是Goroutine生产了这些项目。
另一方面,让我们使用Defer操作员创建一个可冷观察的人:
observable := rxgo . Defer ([]rxgo. Producer { func ( _ context. Context , ch chan <- rxgo. Item ) {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
}})
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}现在,结果是:
0
1
2
0
1
2在可观察的冷,为每个观察者独立创建了该流。
同样,热与冷可观察物质无关您消耗商品,而是关于数据的生成在何处。
热门观察的好例子是交易交易所的价格滴答。
而且,如果您教一个可观察的可以从数据库中获取产品,然后将它们一个一个一个屈服,您将创建可观察的冷。
还有另一个名为FromEventSource的操作员,它可以从频道创建可观察到的。 FromChannel运算符之间的区别在于,一旦创建了可观察到的,它就会开始散发项目,无论是否有观察者。因此,丢失了没有观察者的观察员发出的项目(而在FromChannel操作员则缓冲时)。
FromEventSource操作员的用例是遥测。我们可能对从流的一开始产生的所有数据不感兴趣,这是我们开始观察的数据。
一旦我们开始观察一个使用FromEventSource创建的可观察到的,我们就可以配置背压策略。默认情况下,它正在阻止(我们观察到它后发出的物品保证了交付)。我们可以这样覆盖这种策略:
observable := rxgo . FromEventSource ( input , rxgo . WithBackPressureStrategy ( rxgo . Drop )) Drop策略意味着,如果FromEventSource之后的管道还没有准备好食用物品,则该项目将被删除。
默认情况下,连接运算符的通道连接不缓和。我们可以这样覆盖这种行为:
observable . Map ( transform , rxgo . WithBufferedChannel ( 42 ))每个操作员都有一个opts ...Option参数允许通过此类选项。
默认的观察策略很懒惰。这意味着操作员一旦我们开始观察,操作员就可以通过可观察到的物品进行处理。我们可以这样改变这种行为:
observable := rxgo . FromChannel ( ch ). Map ( transform , rxgo . WithObservationStrategy ( rxgo . Eager ))在这种情况下,即使没有任何观察者,每当生成项目时,都会触发Map运算符。
默认情况下,每个操作员都是顺序的。一个操作员是一个goroutine实例。我们可以使用以下选项对其进行覆盖:
observable . Map ( transform , rxgo . WithPool ( 32 ))在此示例中,我们创建了一个32个Goroutines的池,该池从同一通道同时消费项目。如果操作是cpu结合的,我们可以使用WithCPUPool()选项,该选项基于逻辑CPU的数量创建池。
可连接的可观察到的类似于普通观察的可观察,除了它在订阅时不会开始发射项目,而只有在调用其连接()方法时。这样,您可以等待所有预期的订户在可观察到的发射项目之前订阅可观察到的可观察到的。
让我们使用rxgo.WithPublishStrategy创建一个可连接的可观察的连接:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())然后,我们创建了两个观察者:
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) + 1 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) * 2 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})如果observable是可观察的,那么DoOnNext创建了一个观察者,那么可观察到的源将开始发出项目。但是,在可连接可观察的情况下,我们必须致电Connect() :
observable . Connect ()一旦调用了Connect() ,可连接的可观察到开始发出项目。
常规可观察到还有另一个重要的变化。可连接的可观察到其项目。这意味着所有观察者都会收到项目的副本。
这是一个常规观察的示例:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a regular Observable
observable := rxgo . FromChannel ( ch )
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) First observer: 1
First observer: 2
First observer: 3
现在,具有可观察的可连接:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a Connectable Observable
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})
disposed , cancel := observable . Connect ()
go func () {
// Do something
time . Sleep ( time . Second )
// Then cancel the subscription
cancel ()
}()
// Wait for the subscription to be disposed
<- disposed Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
一个可以使用Observe(opts ...Option) <-chan Item观察的对象。
一个可以是:
软件包文档:https://pkg.go.dev/github.com/reactivex/rxgo/v2
在使用RXGO时,如何使用断言API来编写单元测试。
操作员选项
所有贡献都非常欢迎!请确保首先查看贡献指南。新移民可以查看正在进行的问题,并检查help needed标签。
另外,如果您发表有关RXGO的帖子,请告诉我们。我们很高兴将其包括在外部资源部分。
感谢所有已经为RXGO做出贡献的人!
非常感谢Jetbrains支持该项目。