GO 언어에 대한 반응성 확장
Recivitix 또는 Rx는 관찰 가능한 스트림이있는 프로그래밍을위한 API입니다. 이것은 Go Language의 공식 Reactivex API입니다.
Reactivex는 콜백, 약속 및 연기에 대한 비동기 프로그래밍의 새로운 대안적인 방법입니다. 이벤트가 시스템 내에서 발생하거나 변경되는 이벤트 또는 항목의 스트림 처리에 관한 것입니다. 이벤트 흐름을 관찰 가능이라고합니다.
연산자는 관찰 가능한 방법을 정의하는 기능입니다. 다루는 운영자 목록은 여기에서 제공됩니다.
RXGO 구현은 파이프 라인의 개념을 기반으로합니다. 파이프 라인은 채널로 연결된 일련의 단계로, 각 단계는 동일한 기능을 실행하는 고어 라틴 그룹입니다.

각 상자가 연산자 인 구체적인 예를 보자.
Just 연산자를 사용하여 고정 된 항목 목록을 기반으로 정적 관찰 가능한 정적 관찰 가능성을 만듭니다.Map 연산자를 사용하여 변환 함수 (원을 사각형으로 변환)를 정의합니다.Filter 연산자를 사용하여 각 노란색 사각형을 필터링합니다.이 예에서 최종 품목은 채널로 전송되어 소비자가 사용할 수 있습니다. RXGO를 사용하여 데이터를 소비하거나 생산하는 방법에는 여러 가지가 있습니다. 채널에 결과를 게시하는 것은 그 중 하나 일뿐입니다.
각 연산자는 변환 단계입니다. 기본적으로 모든 것이 순차적입니다. 그러나 동일한 연산자의 여러 인스턴스를 정의하여 최신 CPU 아키텍처를 활용할 수 있습니다. 각 연산자 인스턴스는 공통 채널에 연결된 고루 틴입니다.
RXGO의 철학은 Reactivitix 개념을 구현하고 메인 GO 프리미티브 (채널, 고루틴 등)를 활용하여 두 세계 간의 통합이 가능한 한 매끄러 듭니다.
go get -u github.com/reactivex/rxgo/v2
우리의 첫 번째 관찰 가능성을 만들고 항목을 소비합시다.
observable := rxgo . Just ( "Hello, World!" )()
ch := observable . Observe ()
item := <- ch
fmt . Println ( item . V ) Just 연산자는 정적 항목 목록에서 관찰 가능한 것을 만듭니다. Of(value) 주어진 값에서 항목을 만듭니다. 오류에서 항목을 만들려면 Error(err) 사용해야합니다. 이것은 값을 감싸지 않고 직접 값이나 오류를 수락 한 V1과 차이입니다. 이 변화의 근거는 무엇입니까? Go 2에서 제네릭 기능 (희망적으로)을 위해 RXGO를 준비하는 것입니다.
그건 그렇고, Just 운영자는 카레를 구문 설탕으로 사용합니다. 이런 식으로 첫 번째 매개 변수 목록의 여러 항목과 두 번째 매개 변수 목록의 여러 옵션을 허용합니다. 아래에서 옵션을 지정하는 방법을 볼 수 있습니다.
관찰 가능한 것이 생성되면 Observe() 사용하여 관찰 할 수 있습니다. 기본적으로, 구독이 이루어진 후에 만 항목을 방출한다는 점에서 관찰 가능한 것은 게으른다. Observe() a <-chan rxgo.Item 반환합니다.
우리는이 채널에서 항목을 소비하고 item.V 사용하여 항목의 값을 인쇄했습니다.
항목은 값 또는 오류 위에 래퍼입니다. 다음과 같은 유형을 먼저 확인할 수 있습니다.
item := <- ch
if item . Error () {
return item . E
}
fmt . Println ( item . V ) item.Error() 항목에 오류가 포함되어 있는지 여부를 나타내는 부울을 반환합니다. 그런 다음 값을 얻기 위해 오류 또는 item.V 얻기 위해 item.E 사용합니다.
기본적으로 오류가 생성되면 관찰 가능이 중지됩니다. 그러나 오류를 처리 할 특수 운영자 (예 : OnError , Retry 등)가 있습니다.
콜백을 사용하여 항목을 소비 할 수도 있습니다.
observable . ForEach ( func ( v interface {}) {
fmt . Printf ( "received: %v n " , v )
}, func ( err error ) {
fmt . Printf ( "error: %e n " , err )
}, func () {
fmt . Println ( "observable is closed" )
})이 예에서는 세 가지 기능을 전달했습니다.
NextFunc 가 트리거됩니다.ErrFunc 가 트리거됩니다.CompletedFunc 가 트리거됩니다. ForEach 차단이 아닙니다. 그러나 관찰 가능한 완료되면 닫힐 알림 채널을 반환합니다. 따라서 이전 코드를 차단하기 위해서는 단순히 <- :
<- observable. ForEach ( ... ) 다음 Customer 구조를 소비하는 스트림을 구현하고 싶다고 가정 해 봅시다.
type Customer struct {
ID int
Name , LastName string
Age int
TaxNumber string
} 우리는 Customer 을 주어진 chan rxgo.Item 에 방출하고 관찰 가능한 생산자를 만듭니다.
// Create the input channel
ch := make ( chan rxgo. Item )
// Data producer
go producer ( ch )
// Create an Observable
observable := rxgo . FromChannel ( ch )그런 다음 다음 작업을 수행해야합니다.
풍부한 단계가 IO에 묶여 있으므로 주어진 Goroutines 풀 내에서 병렬화하는 것이 흥미로울 수 있습니다. 그러나 모든 Customer 항목이 ID 기반으로 순차적으로 생성되어야한다고 상상해 봅시다.
observable .
Filter ( func ( item interface {}) bool {
// Filter operation
customer := item .( Customer )
return customer . Age > 18
}).
Map ( func ( _ context. Context , item interface {}) ( interface {}, error ) {
// Enrich operation
customer := item .( Customer )
taxNumber , err := getTaxNumber ( customer )
if err != nil {
return nil , err
}
customer . TaxNumber = taxNumber
return customer , nil
},
// Create multiple instances of the map operator
rxgo . WithPool ( pool ),
// Serialize the items emitted by their Customer.ID
rxgo . Serialize ( func ( item interface {}) int {
customer := item .( Customer )
return customer . ID
}), rxgo . WithBufferedChannel ( 1 )) 결국, 우리는 예를 들어 ForEach() 또는 Observe() 사용하여 항목을 소비합니다. Observe() a <-chan Item 반환합니다.
for customer := range observable . Observe () {
if customer . Error () {
return err
}
fmt . Println ( customer )
}RX 세계에서는 차갑고 뜨거운 관찰 가능성 사이의 구별이 있습니다. 데이터가 관찰 가능한 자체에 의해 생성 될 때, 그것은 냉간 관찰 가능입니다. 데이터가 관찰 가능한 외부에서 생성되면 핫 관찰 가능합니다. 일반적으로 프로듀서를 반복해서 만들고 싶지 않을 때는 뜨거운 관찰 가능한 것을 선호합니다.
RXGO에는 비슷한 개념이 있습니다.
먼저, FromChannel 연산자에서 Hot Observable을 만들고 그 의미를 보자.
ch := make ( chan rxgo. Item )
go func () {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
close ( ch )
}()
observable := rxgo . FromChannel ( ch )
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}이 실행의 결과는 다음과 같습니다.
0
1
2
그것은 첫 번째 관찰자가 이미 모든 항목을 소비했음을 의미합니다. 그리고 다른 사람들에게 남은 것은 없습니다.
이 동작은 연결성 관측 가능성으로 변경 될 수 있지만.
여기서 요점은 그 항목을 생산 한 고루틴입니다.
반면, Defer 연산자를 사용하여 냉간 관찰 가능한 것을 만들어 봅시다.
observable := rxgo . Defer ([]rxgo. Producer { func ( _ context. Context , ch chan <- rxgo. Item ) {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
}})
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}이제 결과는 다음과 같습니다.
0
1
2
0
1
2냉간 관찰 가능한 경우, 스트림은 모든 관찰자에 대해 독립적으로 생성되었습니다.
다시 말하지만, Hot vs Cold Observables는 항목을 소비하는 방법에 관한 것이 아니라 데이터가 생성되는 위치에 관한 것입니다.
Hot Observable의 좋은 예는 거래 거래소의 가격 진드기입니다.
또한 데이터베이스에서 제품을 가져 오도록 관찰 가능한 것을 가르치면 하나씩 산출하면 냉간 관찰 가능성을 만듭니다.
FromEventSource 라는 다른 연산자가 채널에서 관찰 가능한 것을 생성합니다. FromChannel Operator의 차이점은 관찰 가능한 것이 생성 되 자마자 관찰자가 있든 없든 상관없이 항목을 방출하기 시작한다는 것입니다. 따라서 관찰자가없는 관찰 가능한 항목은 손실됩니다 ( FromChannel 연산자로 버퍼링되는 동안).
FromEventSource 운영자의 사용 사례는 예를 들어 원격 측정입니다. 스트림의 시작부터 생성 된 모든 데이터에 관심이 없을 수도 있습니다.
FromEventSource 로 생성 된 관찰 가능한 것을 관찰하기 시작하면 역압 전략을 구성 할 수 있습니다. 기본적으로 차단 중입니다 (관찰 후 방출 된 품목에 대한 보장 된 배송이 있습니다). 우리는이 전략을 이런 식으로 무시할 수 있습니다.
observable := rxgo . FromEventSource ( input , rxgo . WithBackPressureStrategy ( rxgo . Drop )) Drop 전략은 FromEventSource 이후의 파이프 라인이 항목을 소비 할 준비가되지 않은 경우이 항목이 삭제되었음을 의미합니다.
기본적으로 채널 연결 연산자는 부러지지 않습니다. 우리는이 동작을 다음과 같이 무시할 수 있습니다.
observable . Map ( transform , rxgo . WithBufferedChannel ( 42 )) 각 연산자에게는 opts ...Option 매개 변수가 있으며 이러한 옵션을 통과 할 수 있습니다.
기본 관찰 전략은 게으르다. 이는 운영자가 관찰하기 시작하면 관찰 가능한 항목을 처리한다는 것을 의미합니다. 우리는이 행동을 이런 식으로 바꿀 수 있습니다.
observable := rxgo . FromChannel ( ch ). Map ( transform , rxgo . WithObservationStrategy ( rxgo . Eager )) 이 경우, Map 연산자는 관찰자 없이도 항목을 생성 할 때마다 트리거됩니다.
기본적으로 각 연산자는 순차적입니다. 하나의 연산자는 하나의 고루 틴 인스턴스입니다. 다음 옵션을 사용하여 재정의 할 수 있습니다.
observable . Map ( transform , rxgo . WithPool ( 32 )) 이 예에서는 동일한 채널에서 동시에 항목을 소비하는 32 개의 고어 라틴 풀을 만듭니다. 작업이 CPU에 결합 된 경우 논리 CPU 수를 기반으로 풀을 생성하는 WithCPUPool() 옵션을 사용할 수 있습니다.
연결 가능한 관찰 가능한 것은 구독 할 때 항목을 방출하기 시작하지 않고 Connect () 메소드가 호출되는 경우에만 방출을 시작한다는 점을 제외하고는 일반적인 관찰 가능한 것과 비슷합니다. 이런 식으로, 의도 된 모든 가입자가 관찰 가능한 항목을 시작하기 전에 관찰 가능한 것을 구독 할 때까지 기다릴 수 있습니다.
rxgo.WithPublishStrategy 사용하여 연결 가능한 관찰 가능한 것을 만들어 봅시다 :
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())그런 다음 두 관찰자를 만듭니다.
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) + 1 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) * 2 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) DoOnNext 관찰자를 생성함에 따라 observable 가능한 관찰 가능이 관찰 가능하지 않은 경우, 관찰 가능한 소스는 방출 항목을 시작했을 것입니다. 그러나 연결 가능한 관찰 가능한 경우 Connect() 호출해야합니다.
observable . Connect () Connect() 가 호출되면 연결 가능한 관측 가능이 항목을 방출하기 시작합니다.
정기적으로 관찰 가능한 또 다른 중요한 변화가 있습니다. 연결 가능한 관찰 가능한 항목을 게시합니다. 그것은 모든 관찰자가 항목의 사본을 받는다는 것을 의미합니다.
다음은 정기적으로 관찰 가능한 예입니다.
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a regular Observable
observable := rxgo . FromChannel ( ch )
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) First observer: 1
First observer: 2
First observer: 3
이제 연결 가능한 관찰 가능 :
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a Connectable Observable
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})
disposed , cancel := observable . Connect ()
go func () {
// Do something
time . Sleep ( time . Second )
// Then cancel the subscription
cancel ()
}()
// Wait for the subscription to be disposed
<- disposed Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
반복 가능한 것은 Observe(opts ...Option) <-chan Item 사용하여 관찰 할 수있는 객체입니다.
반복 가능한 것은 다음 중 하나 일 수 있습니다.
패키지 문서 : https://pkg.go.dev/github.com/reactivex/rxgo/v2
Assert API를 사용하여 RXGO를 사용하는 동안 단위 테스트를 작성하는 방법.
운영자 옵션
모든 기부금은 매우 환영합니다! 기고 가이드 라인을 먼저 확인하십시오. 신규 이민자들은 진행중인 문제를 살펴보고 help needed 레이블을 확인할 수 있습니다.
또한 RXGO에 대한 게시물을 게시하면 알려주십시오. 외부 리소스 섹션에 포함하게되어 기쁩니다.
이미 RXGO에 기여한 모든 사람들에게 감사합니다!
프로젝트를 지원해 주신 JetBrains에게 큰 감사를드립니다.