RxMQTTnet
4.3.56
MQTTNET項目的擴展,將訂閱轉換為可觀察到的,並從可觀察的流發布。
將MQTTnet.MqttFactory與MQTTnet.Extensions.External.RxMQTT.Client.MqttFactoryExtensions一起使用。
var client = new MqttFactory ( ) . CreateRxMqttClient ( ) ; 使用託管客戶選項
await client . StartAsync ( options ) . ConfigureAwait ( false ) ;通過連接到RX客戶端並使用擴展IObservable<MqttApplicationMessageReceivedEventArgs>處理消息:
var subscription = rxMqttClinet
. Connect ( "RxClientTest/#" )
. SelectPayload ( )
. Subscribe ( Console . WriteLine ) ;通過處置訂閱來結束訂閱。
subscription . Dispose ( ) ;創建一個可觀察到的MqttApplicationMessage s序列,並通過RX客戶端發布它們。
Observable . Interval ( TimeSpan . FromMilliseconds ( 1000 ) )
. Select ( i => new MqttApplicationMessageBuilder ( )
. WithTopic ( "RxClientTest" )
. WithPayload ( "Time: " + DateTime . Now . ToLongTimeString ( ) )
. WithQualityOfServiceLevel ( MqttQualityOfServiceLevel . ExactlyOnce )
. WithRetainFlag ( )
. Build ( ) )
. PublishOn ( mqttClient )
. Subscribe ( ) ; 使用MQTT客戶端發布方法。