RxMQTTnet
4.3.56
ส่วนขยายไปยังโครงการ MQTTNET เพื่อเปลี่ยนการสมัครสมาชิกเป็น Observables และเผยแพร่จากกระแสที่สังเกตได้
ใช้ MQTTnet.MqttFactory กับ MQTTnet.Extensions.External.RxMQTT.Client.MqttFactoryExtensions
var client = new MqttFactory ( ) . CreateRxMqttClient ( ) ; ใช้ตัวเลือกไคลเอนต์ที่มีการจัดการ
await client . StartAsync ( options ) . ConfigureAwait ( false ) ; รับ IObservable<MqttApplicationMessageReceivedEventArgs> โดยเชื่อมต่อกับไคลเอนต์ RX และใช้ส่วนขยายเพื่อประมวลผลข้อความ:
var subscription = rxMqttClinet
. Connect ( "RxClientTest/#" )
. SelectPayload ( )
. Subscribe ( Console . WriteLine ) ;สิ้นสุดการสมัครสมาชิกโดยการสมัครสมาชิก
subscription . Dispose ( ) ; สร้างลำดับที่สังเกตได้ของ MqttApplicationMessage s และเผยแพร่สิ่งเหล่านี้ผ่านไคลเอนต์ RX
Observable . Interval ( TimeSpan . FromMilliseconds ( 1000 ) )
. Select ( i => new MqttApplicationMessageBuilder ( )
. WithTopic ( "RxClientTest" )
. WithPayload ( "Time: " + DateTime . Now . ToLongTimeString ( ) )
. WithQualityOfServiceLevel ( MqttQualityOfServiceLevel . ExactlyOnce )
. WithRetainFlag ( )
. Build ( ) )
. PublishOn ( mqttClient )
. Subscribe ( ) ; ใช้วิธีการเผยแพร่ไคลเอนต์ MQTT