ส่วนขยายปฏิกิริยาสำหรับภาษา GO
Reactivex หรือ Rx สำหรับระยะสั้นเป็น API สำหรับการเขียนโปรแกรมที่มีสตรีมที่สังเกตได้ นี่คือ API ReactiveX อย่างเป็นทางการสำหรับภาษา GO
Reactivex เป็นวิธีใหม่ทางเลือกของการเขียนโปรแกรมแบบอะซิงโครนัสเพื่อโทรกลับสัญญาและรอการตัดบัญชี มันเกี่ยวกับการประมวลผลสตรีมของเหตุการณ์หรือรายการโดยมีเหตุการณ์เกิดขึ้นหรือการเปลี่ยนแปลงใด ๆ ภายในระบบ กระแสของเหตุการณ์เรียกว่าสิ่งที่สังเกตได้
ผู้ประกอบการเป็นฟังก์ชั่นที่กำหนดวิธีการที่สังเกตได้อย่างไรและเมื่อใดควรปล่อยข้อมูล รายชื่อผู้ให้บริการครอบคลุมมีอยู่ที่นี่
การใช้งาน RXGO ขึ้นอยู่กับแนวคิดของท่อ ไปป์ไลน์เป็นชุดของขั้นตอนที่เชื่อมต่อด้วยช่องซึ่งแต่ละขั้นตอนเป็นกลุ่มของ goroutines ที่ใช้ฟังก์ชั่นเดียวกัน

มาดูตัวอย่างคอนกรีตโดยแต่ละกล่องเป็นผู้ให้บริการ:
JustMapFilterในตัวอย่างนี้รายการสุดท้ายจะถูกส่งในช่องทางให้กับผู้บริโภค มีหลายวิธีในการบริโภคหรือผลิตข้อมูลโดยใช้ RXGO การเผยแพร่ผลลัพธ์ในช่องเป็นเพียงหนึ่งในนั้น
ผู้ประกอบการแต่ละคนเป็นขั้นตอนการเปลี่ยนแปลง โดยค่าเริ่มต้นทุกอย่างเป็นลำดับ แต่เราสามารถใช้ประโยชน์จากสถาปัตยกรรม CPU ที่ทันสมัยโดยการกำหนดอินสแตนซ์ของผู้ให้บริการเดียวกันหลายกรณี แต่ละอินสแตนซ์ของผู้ประกอบการเป็น goroutine ที่เชื่อมต่อกับช่องทางทั่วไป
ปรัชญาของ RXGO คือการใช้แนวคิด ReactiveX และใช้ประโยชน์จากการเป็นหลัก (ช่องทาง, goroutines ฯลฯ ) เพื่อให้การบูรณาการระหว่างโลกทั้งสองนั้นราบรื่นที่สุดเท่าที่จะทำได้
go get -u github.com/reactivex/rxgo/v2
มาสร้างรายการที่สังเกตได้ครั้งแรกและบริโภครายการ:
observable := rxgo . Just ( "Hello, World!" )()
ch := observable . Observe ()
item := <- ch
fmt . Println ( item . V ) ผู้ประกอบการ Just สร้างรายการที่สังเกตได้จากรายการคงที่ Of(value) สร้างรายการจากค่าที่กำหนด หากเราต้องการสร้างรายการจากข้อผิดพลาดเราต้องใช้ Error(err) นี่คือความแตกต่างกับ V1 ที่ยอมรับค่าหรือข้อผิดพลาดโดยตรงโดยไม่ต้องห่อ เหตุผลสำหรับการเปลี่ยนแปลงนี้คืออะไร? มันคือการเตรียม RXGO สำหรับคุณสมบัติทั่วไปที่กำลังจะมาถึง (หวังว่า) ใน GO 2
โดยวิธีการที่ผู้ประกอบการ Just ใช้แกงกะหรี่เป็นน้ำตาลวากยสัมพันธ์ ด้วยวิธีนี้จะยอมรับหลายรายการในรายการพารามิเตอร์แรกและหลายตัวเลือกในรายการพารามิเตอร์ที่สอง เราจะเห็นวิธีระบุตัวเลือกด้านล่าง
เมื่อสร้างที่สังเกตได้เราสามารถสังเกตได้โดยใช้ Observe() โดยค่าเริ่มต้นสิ่งที่สังเกตได้นั้นขี้เกียจในแง่ที่ว่ามันปล่อยรายการเพียงครั้งเดียวเมื่อมีการสมัครสมาชิก Observe() ส่งคืน <-chan rxgo.Item
เราบริโภครายการจากช่องนี้และพิมพ์ค่าของรายการโดยใช้ item.V
รายการคือเสื้อคลุมที่อยู่ด้านบนของค่าหรือข้อผิดพลาด เราอาจต้องการตรวจสอบประเภทก่อนเช่นนี้:
item := <- ch
if item . Error () {
return item . E
}
fmt . Println ( item . V ) item.Error() ส่งคืนบูลีนระบุว่ารายการมีข้อผิดพลาดหรือไม่ จากนั้นเราใช้รายการใด item.E หนึ่งเพื่อรับข้อผิดพลาดหรือ item.V เพื่อรับค่า
โดยค่าเริ่มต้นการสังเกตจะหยุดลงเมื่อเกิดข้อผิดพลาด อย่างไรก็ตามมีผู้ให้บริการพิเศษที่จะจัดการกับข้อผิดพลาด (เช่น OnError , Retry ฯลฯ )
นอกจากนี้ยังเป็นไปได้ที่จะบริโภครายการโดยใช้การโทรกลับ:
observable . ForEach ( func ( v interface {}) {
fmt . Printf ( "received: %v n " , v )
}, func ( err error ) {
fmt . Printf ( "error: %e n " , err )
}, func () {
fmt . Println ( "observable is closed" )
})ในตัวอย่างนี้เราผ่านสามฟังก์ชั่น:
NextFunc ที่ถูกเรียกใช้เมื่อมีการปล่อยไอเท็มค่าErrFunc ทริกเกอร์เมื่อปล่อยรายการข้อผิดพลาดCompletedFunc เรียกใช้เมื่อสังเกตได้เสร็จแล้ว ForEach ไม่ปิดกั้น แต่มันจะส่งคืนช่องทางการแจ้งเตือนที่จะปิดเมื่อสามารถสังเกตได้เสร็จสิ้น ดังนั้นเพื่อให้การบล็อกรหัสก่อนหน้านี้เราเพียงแค่ต้องใช้ <- :
<- observable. ForEach ( ... ) สมมติว่าเราต้องการใช้สตรีมที่ใช้โครงสร้าง Customer ต่อไปนี้:
type Customer struct {
ID int
Name , LastName string
Age int
TaxNumber string
} เราสร้างโปรดิวเซอร์ที่จะปล่อย Customer ให้กับ chan rxgo.Item ที่กำหนดและสร้างสิ่งที่สังเกตได้จากมัน:
// Create the input channel
ch := make ( chan rxgo. Item )
// Data producer
go producer ( ch )
// Create an Observable
observable := rxgo . FromChannel ( ch )จากนั้นเราจำเป็นต้องดำเนินการทั้งสองต่อไปนี้:
เนื่องจากขั้นตอนที่เพิ่มคุณค่าคือการผูกมัด IO มันอาจเป็นเรื่องที่น่าสนใจที่จะขนานกันภายในสระว่ายน้ำของ Goroutines แต่ลองจินตนาการว่ารายการ Customer ทั้งหมดจำเป็นต้องมีการผลิตตามลำดับตาม ID
observable .
Filter ( func ( item interface {}) bool {
// Filter operation
customer := item .( Customer )
return customer . Age > 18
}).
Map ( func ( _ context. Context , item interface {}) ( interface {}, error ) {
// Enrich operation
customer := item .( Customer )
taxNumber , err := getTaxNumber ( customer )
if err != nil {
return nil , err
}
customer . TaxNumber = taxNumber
return customer , nil
},
// Create multiple instances of the map operator
rxgo . WithPool ( pool ),
// Serialize the items emitted by their Customer.ID
rxgo . Serialize ( func ( item interface {}) int {
customer := item .( Customer )
return customer . ID
}), rxgo . WithBufferedChannel ( 1 )) ในท้ายที่สุดเราบริโภครายการที่ใช้ ForEach() หรือ Observe() ตัวอย่างเช่น Observe() ส่งคืน <-chan Item :
for customer := range observable . Observe () {
if customer . Error () {
return err
}
fmt . Println ( customer )
}ในโลก RX มีความแตกต่างระหว่างความเย็นและความร้อนที่สังเกตได้ เมื่อข้อมูลถูกผลิตโดยตัวเองที่สังเกตได้มันเป็นความเย็นที่สังเกตได้ เมื่อข้อมูลถูกสร้างขึ้นนอกสิ่งที่สังเกตได้มันเป็นสิ่งที่สังเกตได้ร้อน โดยปกติเมื่อเราไม่ต้องการสร้างโปรดิวเซอร์ซ้ำแล้วซ้ำอีกเราก็ชอบที่จะสังเกตได้
ใน RXGO มีแนวคิดที่คล้ายกัน
ก่อนอื่นมาสร้าง ความร้อน ที่สังเกตได้โดยใช้ตัวดำเนินการ FromChannel และดูความหมาย:
ch := make ( chan rxgo. Item )
go func () {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
close ( ch )
}()
observable := rxgo . FromChannel ( ch )
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}ผลของการดำเนินการนี้คือ:
0
1
2
มันหมายถึงผู้สังเกตการณ์คนแรกบริโภครายการทั้งหมดแล้ว และไม่มีอะไรเหลือสำหรับคนอื่น
แม้ว่าพฤติกรรมนี้สามารถเปลี่ยนแปลงได้ด้วยการสังเกตที่เชื่อมต่อได้
ประเด็นหลักที่นี่คือ Goroutine ที่ผลิตรายการเหล่านั้น
ในทางกลับกันเรามาสร้าง ความเย็น ที่สังเกตได้โดยใช้ตัวดำเนินการ Defer :
observable := rxgo . Defer ([]rxgo. Producer { func ( _ context. Context , ch chan <- rxgo. Item ) {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
}})
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}ตอนนี้ผลลัพธ์คือ:
0
1
2
0
1
2ในกรณีของความเย็นที่สังเกตได้สตรีมถูกสร้างขึ้นอย่างอิสระสำหรับผู้สังเกตการณ์ทุกคน
อีกครั้ง Hot Vs Cold Observables ไม่ได้เกี่ยวกับวิธีการบริโภคสิ่งของ แต่เป็นเรื่องเกี่ยวกับการผลิตข้อมูล
ตัวอย่างที่ดีสำหรับการสังเกตที่น่าสนใจคือราคาเห็บจากการแลกเปลี่ยนการซื้อขาย
และถ้าคุณสอนสิ่งที่สังเกตได้ในการดึงผลิตภัณฑ์จากฐานข้อมูลให้ให้พวกเขาทีละคนคุณจะสร้าง ความเย็น ที่สังเกตได้
มีผู้ให้บริการรายอื่นที่เรียกว่า FromEventSource ที่สร้างสิ่งที่สังเกตได้จากช่อง ความแตกต่างระหว่างผู้ประกอบการ FromChannel คือทันทีที่การสังเกตสามารถสร้างได้มันจะเริ่มปล่อยรายการโดยไม่คำนึงถึงว่ามีผู้สังเกตการณ์หรือไม่ ดังนั้นรายการที่ปล่อยออกมาโดยผู้สังเกตการณ์โดยไม่มีผู้สังเกตการณ์จะหายไป (ในขณะที่พวกเขาถูกบัฟเฟอร์ด้วยตัวดำเนินการ FromChannel )
กรณีการใช้งานที่มีตัวดำเนินการ FromEventSource เป็นตัวอย่างเช่น telemetry เราอาจไม่สนใจข้อมูลทั้งหมดที่ผลิตตั้งแต่ต้นกระแส - เพียงข้อมูลตั้งแต่เราเริ่มสังเกต
เมื่อเราเริ่มสังเกตสิ่งที่สังเกตได้ซึ่งสร้างขึ้นด้วย FromEventSource เราสามารถกำหนดค่ากลยุทธ์การแบ็คกดดันได้ โดยค่าเริ่มต้นจะมีการปิดกั้น (มีการจัดส่งที่รับประกันสำหรับรายการที่ปล่อยออกมาหลังจากที่เราสังเกต) เราสามารถแทนที่กลยุทธ์นี้ด้วยวิธีนี้:
observable := rxgo . FromEventSource ( input , rxgo . WithBackPressureStrategy ( rxgo . Drop )) กลยุทธ์ Drop หมายความว่าหากท่อส่งหลังจาก FromEventSource ยังไม่พร้อมที่จะบริโภครายการรายการนี้จะลดลง
โดยค่าเริ่มต้นตัวดำเนินการเชื่อมต่อช่องจะไม่ได้รับการบัฟเฟอร์ เราสามารถแทนที่พฤติกรรมนี้เช่นนี้:
observable . Map ( transform , rxgo . WithBufferedChannel ( 42 )) ผู้ประกอบการแต่ละคนมีการ opts ...Option ที่อนุญาตให้ผ่านตัวเลือกดังกล่าว
กลยุทธ์การสังเกตเริ่มต้นนั้นขี้เกียจ มันหมายถึงผู้ประกอบการประมวลผลรายการที่ปล่อยออกมาโดยที่สังเกตได้เมื่อเราเริ่มสังเกต เราสามารถเปลี่ยนพฤติกรรมนี้ด้วยวิธีนี้:
observable := rxgo . FromChannel ( ch ). Map ( transform , rxgo . WithObservationStrategy ( rxgo . Eager )) ในกรณีนี้ผู้ให้บริการ Map จะถูกเรียกใช้เมื่อใดก็ตามที่มีการผลิตรายการแม้ว่าจะไม่มีผู้สังเกตการณ์ก็ตาม
โดยค่าเริ่มต้นผู้ประกอบการแต่ละคนจะเป็นลำดับ ผู้ประกอบการหนึ่งคนเป็นหนึ่งอินสแตนซ์ goroutine เราสามารถแทนที่ด้วยตัวเลือกต่อไปนี้:
observable . Map ( transform , rxgo . WithPool ( 32 )) ในตัวอย่างนี้เราสร้างพูล 32 goroutines ที่ใช้รายการพร้อมกันจากช่องทางเดียวกัน หากการดำเนินการเป็น CPU-bound เราสามารถใช้ตัวเลือก WithCPUPool() ที่สร้างพูลตามจำนวน CPU แบบลอจิคัล
การเชื่อมต่อที่สามารถเชื่อมต่อได้คล้ายกับที่สังเกตได้ทั่วไปยกเว้นว่ามันไม่ได้เริ่มปล่อยไอเท็มเมื่อสมัครเป็นสมาชิก แต่เฉพาะเมื่อมีการเรียกวิธีการเชื่อมต่อ () ด้วยวิธีนี้คุณสามารถรอให้สมาชิกที่ตั้งใจไว้ทั้งหมดสมัครสมาชิกที่สังเกตได้ก่อนที่ผู้สังเกตจะเริ่มต้นเปล่งรายการ
มาสร้างสิ่งที่สามารถเชื่อมต่อได้โดยใช้ rxgo.WithPublishStrategy :
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())จากนั้นเราสร้างผู้สังเกตการณ์สองคน:
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) + 1 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) * 2 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) หาก observable ไม่สามารถสังเกตได้ที่สามารถสังเกตได้เช่นเดียวกับ DoOnNext สร้างผู้สังเกตการณ์แหล่งที่สังเกตได้จะเริ่มปล่อยไอเท็ม แต่ในกรณีของการสังเกตที่สามารถเชื่อมต่อได้เราต้องโทรหา Connect() :
observable . Connect () เมื่อเรียกว่า Connect() แล้วการเชื่อมต่อที่สามารถเชื่อมต่อได้จะเริ่มปล่อยไอเท็ม
มีการเปลี่ยนแปลงที่สำคัญอีกอย่างหนึ่งที่สังเกตได้เป็นประจำ การเชื่อมต่อที่เชื่อมต่อได้เผยแพร่รายการ หมายความว่าผู้สังเกตการณ์ทั้งหมดได้รับสำเนาของรายการ
นี่คือตัวอย่างที่มีสิ่งที่สังเกตได้เป็นประจำ:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a regular Observable
observable := rxgo . FromChannel ( ch )
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) First observer: 1
First observer: 2
First observer: 3
ตอนนี้ด้วยการเชื่อมต่อที่สามารถสังเกตได้:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a Connectable Observable
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})
disposed , cancel := observable . Connect ()
go func () {
// Do something
time . Sleep ( time . Second )
// Then cancel the subscription
cancel ()
}()
// Wait for the subscription to be disposed
<- disposed Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
การทำซ้ำเป็นวัตถุที่สามารถสังเกตได้โดยใช้ Observe(opts ...Option) <-chan Item
การทำซ้ำสามารถเป็นได้เช่นกัน:
เอกสารประกอบแพ็คเกจ: https://pkg.go.dev/github.com/reactivex/rxgo/v2
วิธีใช้ ASSERT API เพื่อเขียนการทดสอบหน่วยในขณะที่ใช้ RXGO
ตัวเลือกผู้ประกอบการ
การบริจาคทั้งหมดยินดีเป็นอย่างยิ่ง! ให้แน่ใจว่าคุณตรวจสอบแนวทางที่มีส่วนร่วมก่อน ผู้มาใหม่สามารถดูปัญหาอย่างต่อเนื่องและตรวจสอบ help needed
นอกจากนี้หากคุณเผยแพร่โพสต์เกี่ยวกับ RXGO โปรดแจ้งให้เราทราบ เรายินดีที่จะรวมไว้ในส่วนทรัพยากรภายนอก
ขอบคุณทุกคนที่มีส่วนร่วมกับ RXGO แล้ว!
ขอบคุณมากสำหรับ Jetbrains ที่สนับสนุนโครงการ