Ekstensi reaktif untuk bahasa go
Reactivex, atau RX singkatnya, adalah API untuk pemrograman dengan aliran yang dapat diamati. Ini adalah API ReaktifEx resmi untuk bahasa Go.
Reactivex adalah cara baru, alternatif pemrograman asinkron untuk panggilan balik, janji, dan ditangguhkan. Ini tentang memproses aliran peristiwa atau item, dengan peristiwa menjadi kejadian atau perubahan dalam sistem. Aliran peristiwa disebut yang dapat diamati.
Operator adalah fungsi yang mendefinisikan yang dapat diamati, bagaimana dan kapan harus memancarkan data. Daftar operator yang dibahas tersedia di sini.
Implementasi RXGO didasarkan pada konsep pipa. Pipa adalah serangkaian tahapan yang dihubungkan oleh saluran, di mana setiap tahap adalah sekelompok goroutine yang menjalankan fungsi yang sama.

Mari kita lihat contoh konkret dengan setiap kotak menjadi operator:
Just .Map .Filter .Dalam contoh ini, item akhir dikirim dalam saluran, tersedia untuk konsumen. Ada banyak cara untuk mengonsumsi atau menghasilkan data menggunakan RXGO. Menerbitkan hasil dalam saluran hanyalah salah satunya.
Setiap operator adalah tahap transformasi. Secara default, semuanya berurutan. Namun, kita dapat memanfaatkan arsitektur CPU modern dengan mendefinisikan beberapa contoh operator yang sama. Setiap contoh operator menjadi goroutine yang terhubung ke saluran umum.
Filosofi RXGO adalah untuk mengimplementasikan konsep reaktif dan memanfaatkan primitif GO utama (saluran, goroutine, dll.) Sehingga integrasi antara kedua dunia semulus mungkin.
go get -u github.com/reactivex/rxgo/v2
Mari kita buat pertama yang dapat diamati dan konsumsi item:
observable := rxgo . Just ( "Hello, World!" )()
ch := observable . Observe ()
item := <- ch
fmt . Println ( item . V ) Operator Just membuat yang dapat diamati dari daftar item statis. Of(value) membuat item dari nilai yang diberikan. Jika kami ingin membuat item dari kesalahan, kami harus menggunakan Error(err) . Ini adalah perbedaan dengan V1 yang menerima nilai atau kesalahan secara langsung tanpa harus membungkusnya. Apa alasan untuk perubahan ini? Ini untuk menyiapkan RXGO untuk fitur generik yang datang (semoga) di Go 2.
Ngomong -ngomong, operator Just menggunakan kari sebagai gula sintaksis. Dengan cara ini, ia menerima beberapa item dalam daftar parameter pertama dan beberapa opsi dalam daftar parameter kedua. Kita akan lihat di bawah ini cara menentukan opsi.
Setelah yang dapat diamati dibuat, kita dapat mengamatinya menggunakan Observe() . Secara default, yang dapat diamati malas dalam arti bahwa ia memancarkan item hanya setelah langganan dibuat. Observe() mengembalikan <-chan rxgo.Item .
Kami mengonsumsi item dari saluran ini dan mencetak nilainya dari item menggunakan item.V
Item adalah pembungkus di atas nilai atau kesalahan. Kami mungkin ingin memeriksa jenisnya terlebih dahulu seperti ini:
item := <- ch
if item . Error () {
return item . E
}
fmt . Println ( item . V ) item.Error() Mengembalikan boolean yang menunjukkan apakah item berisi kesalahan. Kemudian, kami menggunakan salah satu item.E untuk mendapatkan kesalahan atau item.V untuk mendapatkan nilainya.
Secara default, yang dapat diamati dihentikan setelah kesalahan dihasilkan. Namun, ada operator khusus untuk menangani kesalahan (misalnya, OnError , Retry , dll.)
Dimungkinkan juga untuk mengkonsumsi item menggunakan panggilan balik:
observable . ForEach ( func ( v interface {}) {
fmt . Printf ( "received: %v n " , v )
}, func ( err error ) {
fmt . Printf ( "error: %e n " , err )
}, func () {
fmt . Println ( "observable is closed" )
})Dalam contoh ini, kami melewati tiga fungsi:
NextFunc dipicu saat item nilai dipancarkan.ErrFunc dipicu saat item kesalahan dipancarkan.CompletedFunc dipicu setelah yang dapat diamati selesai. ForEach tidak blokir. Namun, ia mengembalikan saluran pemberitahuan yang akan ditutup setelah yang dapat diamati selesai. Oleh karena itu, untuk membuat pemblokiran kode sebelumnya, kita hanya perlu menggunakan <- :
<- observable. ForEach ( ... ) Katakanlah kami ingin menerapkan aliran yang mengkonsumsi struktur Customer berikut:
type Customer struct {
ID int
Name , LastName string
Age int
TaxNumber string
} Kami membuat produser yang akan memancarkan Customer ke chan rxgo.Item yang diberikan dan membuat yang dapat diamati dari itu:
// Create the input channel
ch := make ( chan rxgo. Item )
// Data producer
go producer ( ch )
// Create an Observable
observable := rxgo . FromChannel ( ch )Kemudian, kita perlu melakukan dua operasi berikut:
Karena langkah memperkaya IO-terikat, mungkin menarik untuk memparalelkannya di dalam kumpulan goroutine tertentu. Namun, mari kita bayangkan bahwa semua item Customer perlu diproduksi secara berurutan berdasarkan ID -nya.
observable .
Filter ( func ( item interface {}) bool {
// Filter operation
customer := item .( Customer )
return customer . Age > 18
}).
Map ( func ( _ context. Context , item interface {}) ( interface {}, error ) {
// Enrich operation
customer := item .( Customer )
taxNumber , err := getTaxNumber ( customer )
if err != nil {
return nil , err
}
customer . TaxNumber = taxNumber
return customer , nil
},
// Create multiple instances of the map operator
rxgo . WithPool ( pool ),
// Serialize the items emitted by their Customer.ID
rxgo . Serialize ( func ( item interface {}) int {
customer := item .( Customer )
return customer . ID
}), rxgo . WithBufferedChannel ( 1 )) Pada akhirnya, kami mengkonsumsi item menggunakan ForEach() atau Observe() misalnya. Observe() mengembalikan <-chan Item :
for customer := range observable . Observe () {
if customer . Error () {
return err
}
fmt . Println ( customer )
}Di dunia RX, ada perbedaan antara dingin dan panas yang dapat diamati. Ketika data diproduksi oleh yang dapat diamati sendiri, itu adalah dingin yang dapat diamati. Ketika data diproduksi di luar yang dapat diamati, itu adalah panas yang dapat diamati. Biasanya, ketika kami tidak ingin membuat produser berulang kali, kami menyukai yang panas yang dapat diamati.
Di RXGO, ada konsep yang serupa.
Pertama, mari kita buat operator FromChannel yang dapat diamati dan lihat implikasinya:
ch := make ( chan rxgo. Item )
go func () {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
close ( ch )
}()
observable := rxgo . FromChannel ( ch )
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}Hasil eksekusi ini adalah:
0
1
2
Ini berarti pengamat pertama sudah mengonsumsi semua item. Dan tidak ada yang tersisa untuk orang lain.
Meskipun perilaku ini dapat diubah dengan dapat diamati.
Poin utama di sini adalah Goroutine menghasilkan barang -barang itu.
Di sisi lain, mari kita buat dingin yang dapat diamati menggunakan operator Defer :
observable := rxgo . Defer ([]rxgo. Producer { func ( _ context. Context , ch chan <- rxgo. Item ) {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
}})
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}Sekarang, hasilnya adalah:
0
1
2
0
1
2Dalam kasus dingin yang dapat diamati, aliran dibuat secara independen untuk setiap pengamat.
Sekali lagi, Observable panas vs dingin bukan tentang bagaimana Anda mengonsumsi item, ini tentang di mana data diproduksi.
Contoh yang baik untuk yang dapat diamati adalah kutu harga dari pertukaran perdagangan.
Dan jika Anda mengajar yang dapat diamati untuk mengambil produk dari database, maka hasilkan satu per satu, Anda akan membuat dingin yang dapat diamati.
Ada operator lain yang disebut FromEventSource yang menciptakan yang dapat diamati dari saluran. Perbedaan antara Operator FromChannel adalah bahwa segera setelah yang dapat diamati dibuat, ia mulai memancarkan item terlepas dari apakah ada pengamat atau tidak. Oleh karena itu, item yang dipancarkan oleh yang dapat diamati tanpa pengamat hilang (sementara mereka buffered dengan operator FromChannel ).
Kasus penggunaan dengan operator FromEventSource , misalnya, telemetri. Kami mungkin tidak tertarik pada semua data yang dihasilkan dari awal aliran - hanya data karena kami mulai mengamatinya.
Begitu kita mulai mengamati yang dapat diamati dengan FromEventSource , kita dapat mengonfigurasi strategi tekanan balik. Secara default, itu memblokir (ada pengiriman yang dijamin untuk barang -barang yang dipancarkan setelah kami mengamatinya). Kita dapat mengesampingkan strategi ini dengan cara ini:
observable := rxgo . FromEventSource ( input , rxgo . WithBackPressureStrategy ( rxgo . Drop )) Strategi Drop berarti bahwa jika saluran pipa setelah FromEventSource tidak siap untuk mengkonsumsi item, item ini dijatuhkan.
Secara default, operator yang menghubungkan saluran tidak buffered. Kita dapat mengesampingkan perilaku ini seperti ini:
observable . Map ( transform , rxgo . WithBufferedChannel ( 42 )) Setiap operator memiliki parameter opts ...Option yang memungkinkan untuk lulus opsi tersebut.
Strategi pengamatan default malas. Ini berarti operator memproses item yang dipancarkan oleh yang dapat diamati begitu kita mulai mengamatinya. Kita dapat mengubah perilaku ini dengan cara ini:
observable := rxgo . FromChannel ( ch ). Map ( transform , rxgo . WithObservationStrategy ( rxgo . Eager )) Dalam hal ini, operator Map dipicu setiap kali item diproduksi, bahkan tanpa pengamat.
Secara default, setiap operator berurutan. Satu operator menjadi satu instance goroutine. Kami dapat mengesampingkannya menggunakan opsi berikut:
observable . Map ( transform , rxgo . WithPool ( 32 )) Dalam contoh ini, kami membuat kumpulan 32 goroutine yang mengonsumsi item secara bersamaan dari saluran yang sama. Jika operasi terikat CPU, kita dapat menggunakan opsi WithCPUPool() yang membuat kumpulan berdasarkan jumlah CPU logis.
Yang dapat diamati dapat diamati menyerupai yang biasa dapat diamati, kecuali bahwa ia tidak mulai memancarkan item ketika ia berlangganan, tetapi hanya ketika metode Connect () dipanggil. Dengan cara ini, Anda dapat menunggu semua pelanggan yang dimaksudkan untuk berlangganan yang dapat diamati sebelum yang dapat diamati mulai memancarkan item.
Mari kita buat yang dapat dihubungkan dengan menggunakan rxgo.WithPublishStrategy :
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())Kemudian, kami membuat dua pengamat:
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) + 1 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) * 2 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) Jika observable bukan dapat diamati yang dapat dihubungkan, karena DoOnNext menciptakan pengamat, sumber yang dapat diamati akan mulai memancarkan item. Namun, dalam kasus yang dapat dihubungkan dapat diamati, kita harus memanggil Connect() :
observable . Connect () Setelah Connect() dipanggil, yang dapat dihubungkan dapat diamati untuk memancarkan item.
Ada perubahan penting lainnya dengan yang dapat diamati secara teratur. Yang dapat diamati dapat diamati menerbitkan item -itemnya. Ini berarti semua pengamat menerima salinan item.
Berikut adalah contoh dengan yang dapat diamati secara teratur:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a regular Observable
observable := rxgo . FromChannel ( ch )
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) First observer: 1
First observer: 2
First observer: 3
Sekarang, dengan yang dapat diamati dapat diamati:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a Connectable Observable
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})
disposed , cancel := observable . Connect ()
go func () {
// Do something
time . Sleep ( time . Second )
// Then cancel the subscription
cancel ()
}()
// Wait for the subscription to be disposed
<- disposed Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
Iterable adalah objek yang dapat diamati menggunakan Observe(opts ...Option) <-chan Item .
Iterable bisa berupa:
Dokumentasi Paket: https://pkg.go.dev/github.com/reactivex/rxgo/v2
Cara menggunakan API Assert untuk menulis tes unit saat menggunakan RXGO.
Opsi Operator
Semua kontribusi sangat disambut! Pastikan Anda memeriksa pedoman yang berkontribusi terlebih dahulu. Pendatang baru dapat melihat masalah yang sedang berlangsung dan memeriksa label help needed .
Juga, jika Anda mempublikasikan posting tentang RXGO, beri tahu kami. Kami akan dengan senang hati memasukkannya ke bagian Sumber Daya Eksternal.
Terima kasih kepada semua orang yang sudah berkontribusi pada RXGO!
Terima kasih banyak kepada JetBrains yang telah mendukung proyek ini.