الامتدادات التفاعلية للغة GO
ReactiveX ، أو RX لفترة قصيرة ، هو واجهة برمجة تطبيقات للبرمجة مع تدفقات يمكن ملاحظتها. هذا هو واجهة برمجة تطبيقات Reactivex الرسمية للغة.
ReactiveX هي وسيلة جديدة بديلة للبرمجة غير المتزامنة إلى عمليات الاسترجاعات والوعود والمؤجلة. يتعلق الأمر بمعالجة تدفقات الأحداث أو العناصر ، حيث تكون الأحداث أي أحداث أو تغييرات داخل النظام. يسمى تيار من الأحداث الملاحظة.
المشغل هو وظيفة تحدد الملاحظة ، وكيف ومتى يجب أن تنبعث منها البيانات. قائمة المشغلين المغطاة متوفرة هنا.
يعتمد تطبيق RXGO على مفهوم خطوط الأنابيب. خط الأنابيب هو سلسلة من المراحل المتصلة بواسطة القنوات ، حيث كل مرحلة عبارة عن مجموعة من goroutines تدير نفس الوظيفة.

دعونا نرى مثالًا ملموسًا مع كل مربع كونه مشغل:
Just .Map .Filter .في هذا المثال ، يتم إرسال العناصر النهائية في قناة ، متاحة للمستهلك. هناك العديد من الطرق للاستهلاك أو إنتاج البيانات باستخدام RXGO. نشر النتائج في القناة هو واحد منهم فقط.
كل مشغل هو مرحلة التحول. بشكل افتراضي ، كل شيء متسلسل. ومع ذلك ، يمكننا الاستفادة من بنيات وحدة المعالجة المركزية الحديثة من خلال تحديد حالات متعددة لنفس المشغل. كل مثيل مشغل كونه goroutine متصل بقناة مشتركة.
تتمثل فلسفة RXGO في تنفيذ مفاهيم ReactiveX والاستفادة من بدائل GO الرئيسية (القنوات ، goroutines ، وما إلى ذلك) بحيث يكون التكامل بين العالمين سلسًا قدر الإمكان.
go get -u github.com/reactivex/rxgo/v2
دعنا ننشئ أول عنصر لدينا ونستهلك عنصرًا:
observable := rxgo . Just ( "Hello, World!" )()
ch := observable . Observe ()
item := <- ch
fmt . Println ( item . V ) ينشئ المشغل Just ملاحظات يمكن ملاحظتها من قائمة ثابتة للعناصر. Of(value) ينشئ عنصر من قيمة معينة. إذا أردنا إنشاء عنصر من خطأ ، فيجب علينا استخدام Error(err) . هذا فرق مع V1 الذي كان يقبل قيمة أو خطأ مباشرة دون الاضطرار إلى لفها. ما هو الأساس المنطقي لهذا التغيير؟ إنه إعداد RXGO لميزة الأدوية القادمة (نأمل) في GO 2.
بالمناسبة ، يستخدم المشغل Just الكاري كسكر النحوي. وبهذه الطريقة ، يقبل عناصر متعددة في قائمة المعلمات الأولى وخيارات متعددة في قائمة المعلمة الثانية. سنرى أدناه كيفية تحديد الخيارات.
بمجرد إنشاء الملاحظة ، يمكننا مراقبة ذلك باستخدام Observe() . بشكل افتراضي ، يكون الملاحظة كسولًا بمعنى أنه ينبعث من العناصر بمجرد إجراء اشتراك. Observe() إرجاع <-chan rxgo.Item .
استهلكنا عنصرًا من هذه القناة وطباعة قيمته للعنصر باستخدام item.V
العنصر هو غلاف أعلى قيمة أو خطأ. قد نرغب في التحقق من النوع أولاً مثل هذا:
item := <- ch
if item . Error () {
return item . E
}
fmt . Println ( item . V ) item.Error() إرجاع منطقية تشير إلى ما إذا كان عنصر يحتوي على خطأ. ثم ، نستخدم إما item.E للحصول على الخطأ أو item.V للحصول على القيمة.
بشكل افتراضي ، يتم إيقاف الملاحظة بمجرد إنتاج خطأ. ومع ذلك ، هناك مشغلين خاصين للتعامل مع الأخطاء (على سبيل المثال ، OnError ، Retry ، إلخ)
من الممكن أيضًا استهلاك العناصر باستخدام عمليات الاسترجاعات:
observable . ForEach ( func ( v interface {}) {
fmt . Printf ( "received: %v n " , v )
}, func ( err error ) {
fmt . Printf ( "error: %e n " , err )
}, func () {
fmt . Println ( "observable is closed" )
})في هذا المثال ، مررنا ثلاث وظائف:
NextFunc عند انبعاث عنصر قيمة.ErrFunc عند انبعاث عنصر خطأ.CompletedFunc التي تم تشغيلها بمجرد الانتهاء من الملاحظة. ForEach غير محظور. ومع ذلك ، فإنه يعيد قناة إشعار سيتم إغلاقها بمجرد اكتمال الملاحظة. وبالتالي ، لجعل كود الكود السابق ، نحتاج ببساطة إلى استخدام <- :
<- observable. ForEach ( ... ) لنفترض أننا نريد تنفيذ دفق يستهلك بنية Customer التالية:
type Customer struct {
ID int
Name , LastName string
Age int
TaxNumber string
} نقوم بإنشاء منتج ينبعث من Customer إلى chan rxgo.Item المعطى وننشئه:
// Create the input channel
ch := make ( chan rxgo. Item )
// Data producer
go producer ( ch )
// Create an Observable
observable := rxgo . FromChannel ( ch )ثم ، نحتاج إلى إجراء العمليتين التاليتين:
نظرًا لأن الخطوة المثيرة للإثراء تكون مرتبطة بـ IO ، فقد يكون من المثير للاهتمام موازاةها داخل مجموعة معينة من goroutines. ومع ذلك ، دعونا نتخيل أن جميع عناصر Customer تحتاج إلى إنتاج متتابع بناءً على ID .
observable .
Filter ( func ( item interface {}) bool {
// Filter operation
customer := item .( Customer )
return customer . Age > 18
}).
Map ( func ( _ context. Context , item interface {}) ( interface {}, error ) {
// Enrich operation
customer := item .( Customer )
taxNumber , err := getTaxNumber ( customer )
if err != nil {
return nil , err
}
customer . TaxNumber = taxNumber
return customer , nil
},
// Create multiple instances of the map operator
rxgo . WithPool ( pool ),
// Serialize the items emitted by their Customer.ID
rxgo . Serialize ( func ( item interface {}) int {
customer := item .( Customer )
return customer . ID
}), rxgo . WithBufferedChannel ( 1 )) في النهاية ، نستهلك العناصر التي تستخدم ForEach() أو Observe() على سبيل المثال. Observe() إرجاع <-chan Item :
for customer := range observable . Observe () {
if customer . Error () {
return err
}
fmt . Println ( customer )
}في عالم RX ، هناك تمييز بين الملاحظات الباردة والساخنة. عندما يتم إنتاج البيانات من خلال الملاحظة نفسها ، يكون البرد يمكن ملاحظته. عندما يتم إنتاج البيانات خارج الملاحظة ، يكون من الساخن الملاحظة. عادة ، عندما لا نريد إنشاء منتج مرارًا وتكرارًا ، فإننا نفضل ملاحظتها الساخنة.
في RXGO ، هناك مفهوم مماثل.
أولاً ، دعنا ننشئ مشغلًا ساخنًا يمكن ملاحظته باستخدام المشغل FromChannel ونرى الآثار المترتبة على ذلك:
ch := make ( chan rxgo. Item )
go func () {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
close ( ch )
}()
observable := rxgo . FromChannel ( ch )
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}نتيجة هذا التنفيذ هي:
0
1
2
وهذا يعني أن المراقب الأول استهلك بالفعل جميع العناصر. ولم يتبق شيء للآخرين.
على الرغم من أن هذا السلوك يمكن تغييره مع الملاحظات القابلة للاتصال.
النقطة الرئيسية هنا هي goroutine التي أنتجت تلك العناصر.
من ناحية أخرى ، دعنا ننشئ بردًا يمكن ملاحظته باستخدام المشغل Defer :
observable := rxgo . Defer ([]rxgo. Producer { func ( _ context. Context , ch chan <- rxgo. Item ) {
for i := 0 ; i < 3 ; i ++ {
ch <- rxgo . Of ( i )
}
}})
// First Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}
// Second Observer
for item := range observable . Observe () {
fmt . Println ( item . V )
}الآن ، والنتيجة هي:
0
1
2
0
1
2في حالة البرد يمكن ملاحظته ، تم إنشاء التيار بشكل مستقل لكل مراقب.
مرة أخرى ، لا تتعلق Hot vs Cold Observables بكيفية استهلاك العناصر ، بل يتعلق بمكان إنتاج البيانات.
مثال جيد للملاحظة الساخنة هي قراد الأسعار من بورصة التداول.
وإذا قمت بتدريس جلب المنتجات من قاعدة بيانات ، فستخفها واحدًا تلو الآخر ، فستقوم بإنشاء البرد يمكن ملاحظته.
هناك مشغل آخر يسمى FromEventSource يخلق قناة يمكن ملاحظتها. الفرق بين مشغل FromChannel هو أنه بمجرد إنشاء الملاحظة ، يبدأ في انبعاث العناصر بغض النظر عما إذا كان هناك مراقب أم لا. وبالتالي ، تُفقد العناصر المنبعثة من قبل مراقب (مراقبون) (بينما يتم تخزينها مع مشغل FromChannel ).
حالة الاستخدام مع مشغل FromEventSource ، على سبيل المثال ، القياس عن بعد. قد لا نكون مهتمين بجميع البيانات المنتجة من بداية دفق - فقط البيانات منذ أن بدأنا في مراقبة ذلك.
بمجرد أن نبدأ في مراقبة مراقبة مصنوعة من FromEventSource ، يمكننا تكوين استراتيجية الضغط الخلفي. بشكل افتراضي ، إنه يحظر (هناك تسليم مضمون للعناصر المنبعثة بعد مراعاة ذلك). يمكننا تجاوز هذه الاستراتيجية بهذه الطريقة:
observable := rxgo . FromEventSource ( input , rxgo . WithBackPressureStrategy ( rxgo . Drop )) تعني استراتيجية Drop أنه إذا لم يكن خط الأنابيب بعد FromEventSource مستعدًا لاستهلاك عنصر ما ، فسيتم إسقاط هذا العنصر.
بشكل افتراضي ، فإن قناة توصيل مشغلي توصيلها غير محفورة. يمكننا تجاوز هذا السلوك مثل هذا:
observable . Map ( transform , rxgo . WithBufferedChannel ( 42 )) كل مشغل لديه معلمة opts ...Option تسمح بتمرير مثل هذه الخيارات.
استراتيجية المراقبة الافتراضية كسول. وهذا يعني أن المشغل يعالج العناصر المنبعثة من خلال الملاحظة بمجرد أن نبدأ في ملاحظتها. يمكننا تغيير هذا السلوك بهذه الطريقة:
observable := rxgo . FromChannel ( ch ). Map ( transform , rxgo . WithObservationStrategy ( rxgo . Eager )) في هذه الحالة ، يتم تشغيل مشغل Map كلما تم إنتاج عنصر ، حتى بدون أي مراقب.
بشكل افتراضي ، كل مشغل متسلسل. مشغل واحد هو مثال goroutine واحد. يمكننا تجاوزه باستخدام الخيار التالي:
observable . Map ( transform , rxgo . WithPool ( 32 )) في هذا المثال ، نقوم بإنشاء مجموعة من 32 goroutines تستهلك العناصر بشكل متزامن من نفس القناة. إذا كانت العملية مرتبطة بوحدة المعالجة المركزية ، فيمكننا استخدام خيار WithCPUPool() الذي ينشئ تجمعًا استنادًا إلى عدد وحدات المعالجة المركزية المنطقية.
يشبه الملاحظة القابلة للاتصال قابلة للملاحظة ، باستثناء أنه لا تبدأ في انبعاث العناصر عندما يتم الاشتراك فيها ، ولكن فقط عند استدعاء طريقة Connect (). وبهذه الطريقة ، يمكنك انتظار جميع المشتركين المقصود للاشتراك في المشاركات التي يمكن ملاحظتها قبل أن تبدأ العناصر التي يمكن ملاحظتها.
دعونا ننشئ قابلاً للتوصيل باستخدام rxgo.WithPublishStrategy :
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())ثم ، ننشئ مراقبين:
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) + 1 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
observable . Map ( func ( _ context. Context , i interface {}) ( interface {}, error ) {
return i .( int ) * 2 , nil
}). DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) إذا لم يكن observable غير قابلة للملاحظة ، حيث أن DoOnNext يخلق مراقبًا ، فإن المصدر يمكن ملاحظته قد يبدأ في انبعاث العناصر. ومع ذلك ، في حالة الملاحظة التي يمكن ملاحظتها ، يتعين علينا استدعاء Connect() :
observable . Connect () بمجرد استدعاء Connect() ، يبدأ الملاحظة القابلة للاتصال في انبعاث العناصر.
هناك تغيير مهم آخر مع ملاحظته منتظمة. ينشر الملاحظة القابلة للملاحظة عناصرها. وهذا يعني أن جميع المراقبين يتلقون نسخة من العناصر.
فيما يلي مثال على الملاحظة المنتظمة:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a regular Observable
observable := rxgo . FromChannel ( ch )
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
}) First observer: 1
First observer: 2
First observer: 3
الآن ، مع مراقبة قابلة للتوصيل:
ch := make ( chan rxgo. Item )
go func () {
ch <- rxgo . Of ( 1 )
ch <- rxgo . Of ( 2 )
ch <- rxgo . Of ( 3 )
close ( ch )
}()
// Create a Connectable Observable
observable := rxgo . FromChannel ( ch , rxgo . WithPublishStrategy ())
// Create the first Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "First observer: %d n " , i )
})
// Create the second Observer
observable . DoOnNext ( func ( i interface {}) {
fmt . Printf ( "Second observer: %d n " , i )
})
disposed , cancel := observable . Connect ()
go func () {
// Do something
time . Sleep ( time . Second )
// Then cancel the subscription
cancel ()
}()
// Wait for the subscription to be disposed
<- disposed Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
Iserable هو كائن يمكن ملاحظته باستخدام Observe(opts ...Option) <-chan Item .
يمكن أن يكون الأمر إما:
وثائق الحزمة: https://pkg.go.dev/github.com/reactivex/rxgo/v2
كيفية استخدام واجهة برمجة تطبيقات Assert لكتابة اختبارات الوحدة أثناء استخدام RXGO.
خيارات المشغل
جميع المساهمات مرحب بها للغاية! تأكد من مراجعة الإرشادات المساهمة أولاً. يمكن للوافدين الجدد إلقاء نظرة على المشكلات المستمرة والتحقق من التسمية help needed .
أيضًا ، إذا قمت بنشر منشور عن Rxgo ، فيرجى إخبارنا بذلك. سنكون سعداء بتضمينه في قسم الموارد الخارجية.
شكرا لجميع الأشخاص الذين ساهموا بالفعل في RXGO!
شكر كبير إلى Jetbrains لدعم المشروع.