MQTT.JS是MQTT协议的客户端库,用Node.js和浏览器编写的JavaScript编写。
MQTT.JS是一个开放的开源项目,请参阅贡献部分,以了解这意味着什么。
v5.0.0 (07/2023)
new创建MqttClient实例时。v4.0.0 (发布04/2020)删除了对所有终止节点版本的支持,现在支持节点V12和V14。它还为调试日志记录以及一些功能添加而增加了改进。
当破裂的变化时,默认情况下,MQTT.JS客户端中内置了错误处理程序,因此,如果发出任何错误并且用户未在客户端上创建事件处理程序以解决错误,则客户端不会因未手动错误而破坏。此外,典型的TLS错误(例如ECONNREFUSED已将ECONNRESET添加到TLS错误列表中,该列表将从MQTT.JS客户端发出,因此可以作为连接错误处理。
v3.0.0增加了对MQTT 5的支持,对节点V10.X的支持以及许多修复以提高可靠性。
注意: MQTT V5支持是实验性的,因为它尚未由经纪人实施。
v2.0.0删除了对节点v0.8,v0.10和v0.12的支持,并且发送数据包的速度快3倍。它还删除了v1.0.0中的所有弃用功能,主要是mqtt.createConnection和mqtt.Server 。从v2.0.0来看,重新连接时恢复订阅,如果clean: true 。 v1.xx现在在LTS中,只要有v0.8,v0.10和v0.12用户,它将继续受到支持。
当破裂的更改时,删除了旧客户端中的encoding选项,现在所有内容都是UTF-8,除了连接消息中的password和发布消息中的payload ,这是Buffer 。
另一个打破更改是MQTT.JS现在默认为MQTT v3.1.1,因此要支持旧经纪人,请阅读客户端选项文档。
V1.0.0改进了项目的整体体系结构,该架构现在分为三个组件:MQTT.JS保持客户端,MQTT-Connection包括用于服务器端使用的Barrebone连接代码,MQTT-Packet包括协议Parser和Generator。新客户将性能提高了30%的因素,嵌入了Websocket支持(现在已弃用了MOWS),并且对QoS 1和2的支持更好。先前的API仍得到支持,但不推荐使用,因此在此读书中没有记录下来。
npm install mqtt --save为了简单起见,让我们将订户和发布者放在同一文件中:
const mqtt = require ( "mqtt" ) ;
const client = mqtt . connect ( "mqtt://test.mosquitto.org" ) ;
client . on ( "connect" , ( ) => {
client . subscribe ( "presence" , ( err ) => {
if ( ! err ) {
client . publish ( "presence" , "Hello mqtt" ) ;
}
} ) ;
} ) ;
client . on ( "message" , ( topic , message ) => {
// message is Buffer
console . log ( message . toString ( ) ) ;
client . end ( ) ;
} ) ;输出:
Hello mqttMQTT.JS可用于React Native应用程序。要使用它,请参见React本地示例
如果您想运行自己的MQTT经纪人,则可以使用蚊子或艾德斯-CLI并启动它。
您也可以使用测试实例:test.mosquitto.org。
如果您不想安装单独的经纪人,则可以尝试使用AEDES。
const mqtt = require ( "mqtt" ) // require mqtt
const client = mqtt . connect ( "mqtt://test.mosquitto.org" ) // create a client import mqtt from "mqtt" ; // import namespace "mqtt"
let client = mqtt . connect ( "mqtt://test.mosquitto.org" ) ; // create a client import { connect } from "mqtt" ; // import connect from mqtt
let client = connect ( "mqtt://test.mosquitto.org" ) ; // create a client mqtt.js捆绑了一个命令与经纪人互动。为了使其在路径上可用,您应该在全球安装mqtt.js:
npm install mqtt -g然后,在一个终端上
mqtt sub -t ' hello ' -h ' test.mosquitto.org ' -v在另一个
mqtt pub -t ' hello ' -h ' test.mosquitto.org ' -m ' from MQTT.js '有关命令mqtt help <command> 。
mqtt.js使用调试软件包进行调试目的。要启用调试日志,请在运行时添加以下环境变量:
# ( example using PowerShell, the VS Code default )
$env:DEBUG='mqttjs*' 任何WebSocket连接的重要部分是连接下降并且客户端需要重新连接时该怎么办。 MQTT具有内置的重新连接支持,可以配置为以适合应用程序的方式行事。
transformWsUrl签名URL(仅WebSocket)当MQTT连接下降并且需要重新连接时,通常要求与连接相关的任何身份验证都与基础验证机制保持最新。例如,某些应用程序可能会通过初始连接上的连接选项传递一个auth令牌,而其他云服务可能需要在每个连接时签名URL。
到重新连接发生在应用程序生命周期中时,原始验证数据可能已经过期了。
为了解决这个问题,我们可以使用称为transformWsUrl的钩子在重新连接时操纵连接URL或客户端选项。
示例(在每个重新连接上更新客户端和用户名):
const transformWsUrl = ( url , options , client ) => {
client . options . username = `token= ${ this . get_current_auth_token ( ) } ` ;
client . options . clientId = ` ${ this . get_updated_clientId ( ) } ` ;
return ` ${ this . get_signed_cloud_url ( url ) } ` ;
}
const connection = await mqtt . connectAsync ( < wss url > , {
... ,
transformWsUrl : transformUrl ,
} );现在,每次打开新的Websocket连接(希望不太频繁)时,我们将获得新的签名URL或新的验证令牌数据。
注意:当前,此钩子不支持承诺,这意味着要使用最新的auth代币,您必须具有一些运行的外部机制来处理应用程序级的身份验证刷新,以便Websocket Connection可以简单地抓住最新的有效令牌或已签名的URL。
createWebsocket自定义Websocket(仅Websocket)当您需要添加自定义的Websocket子协议或标题以通过具有自定义身份验证的代理打开连接,此回调允许您创建自己的Websocket实例,该实例将在MQTT客户端中使用。
const createWebsocket = ( url , websocketSubProtocols , options ) => {
const subProtocols = [
websocketSubProtocols [ 0 ] ,
'myCustomSubprotocolOrOAuthToken' ,
]
return new WebSocket ( url , subProtocols )
}
const client = await mqtt . connectAsync ( < wss url > , {
... ,
createWebsocket : createWebsocket ,
} );reconnectPeriod选项重新连接为了确保MQTT客户端自动尝试在连接丢弃时重新连接,必须将client option option reconnectPeriod设置为大于0的值。值为0的值将禁用重新连接,然后在下降时终止该值。
默认值为1000 ms,这意味着它将在失去连接后尝试重新连接1秒。
请注意,这只能在连接超时或成功连接后重新连接。它将(默认情况下)启用重试的连接,这些连接被服务器积极拒绝Connack错误。
为了启用自动重新连接Connack错误,请设置reconnectOnConnackError: true 。
如果客户端设置了选项autoUseTopicAlias:true ,则MQTT.JS会自动使用现有主题别名。
示例场景:
1. PUBLISH topic: ' t1 ' , ta:1 (register)
2. PUBLISH topic: ' t1 ' - > topic: ' ' , ta:1 (auto use existing map entry)
3. PUBLISH topic: ' t2 ' , ta:1 (register overwrite)
4. PUBLISH topic: ' t2 ' - > topic: ' ' , ta:1 (auto use existing map entry based on the receent map)
5. PUBLISH topic: ' t1 ' (t1 is no longer mapped to ta:1)用户不需要管理哪个主题映射到哪个主题别名。如果用户想注册主题别名,请使用主题别名发布主题。如果用户想使用主题别名,请在没有主题别名的情况下发布主题。如果有映射的主题别名,则将其添加为属性,然后将主题更新为空字符串。
如果客户端设置了选项autoAssignTopicAlias:true ,则MQTT.JS会自动使用现有主题别名。如果没有主题别名,则自动分配一个新的空缺主题别名。如果主题别名充分使用,则LRU(最近使用的)主题 - 阿里亚利亚输入将被覆盖。
示例场景:
The broker returns CONNACK (TopicAliasMaximum:3)
1. PUBLISH topic: ' t1 ' - > ' t1 ' , ta:1 (auto assign t1:1 and register)
2. PUBLISH topic: ' t1 ' - > ' ' , ta:1 (auto use existing map entry)
3. PUBLISH topic: ' t2 ' - > ' t2 ' , ta:2 (auto assign t1:2 and register. 2 was vacant)
4. PUBLISH topic: ' t3 ' - > ' t3 ' , ta:3 (auto assign t1:3 and register. 3 was vacant)
5. PUBLISH topic: ' t4 ' - > ' t4 ' , ta:1 (LRU entry is overwritten)此外,用户也可以使用发布主题手动注册主题 - alias对:'some',ta:x。它可以与自动主题别名分配效果很好。
mqtt.connect()mqtt.connectAsync()mqtt.Client()mqtt.Client#connect()mqtt.Client#publish()mqtt.Client#publishAsync()mqtt.Client#subscribe()mqtt.Client#subscribeAsync()mqtt.Client#unsubscribe()mqtt.Client#unsubscribeAsync()mqtt.Client#end()mqtt.Client#endAsync()mqtt.Client#removeOutgoingMessage()mqtt.Client#reconnect()mqtt.Client#handleMessage()mqtt.Client#connectedmqtt.Client#reconnectingmqtt.Client#getLastMessageId()mqtt.Store()mqtt.Store#put()mqtt.Store#del()mqtt.Store#createStream()mqtt.Store#close()连接到给定URL指定的经纪人和选项,并返回客户端。
URL可以按以下协议:“ MQTT','MQTTS','TCP','tls','ws','wss','wss','wxs','alis'。如果要连接到UNIX套接字,只需将+unix后缀附加到协议(例如: mqtt+unix )。这将自动设置unixSocket属性。
URL也可以是URL.parse()返回的对象,在这种情况下,两个对象已合并,即,您可以将单个对象传递给具有URL和Connect选项的单个对象。
您还可以用内容指定servers选项: [{ host: 'localhost', port: 1883 }, ... ] ,在这种情况下,每个连接都迭代数组。
对于所有与MQTT相关的选项,请参见客户端构造函数。
connect功能周围的异步包装器。
当客户端触发'connect'或'end'事件时,返回将解析为mqtt.Client实例的Promise ,或者如果触发了'error' ,则会拒绝错误。
请注意, manualConnect选项将导致此功能返回的承诺永远不会解析或拒绝,因为基础客户从未触发任何事件。
Client类通过任意传输方法(TCP,TLS,WebSocket,ECC)将客户连接与MQTT代理相关联。 Client是一个拥有自己的活动的eventemitter
Client自动处理以下内容:
论点是:
streamBuilder是返回支持connect事件的Stream类的子类的函数。通常是net.Socket 。options是客户端连接选项(请参阅:连接数据包)。默认值: wsOptions :是Websocket连接选项。默认值为{} 。这是针对Websocket的。有关可能的选项,请查看:https://github.com/websockets/ws/blob/master/master/doc/ws.md。
keepalive : 60秒,设置为0以禁用
reschedulePings :发送数据包后重新安排ping消息(默认为true )
clientId : 'mqttjs_' + Math.random().toString(16).substr(2, 8)
protocolId : 'MQTT'
protocolVersion : 4
clean : true ,设置为false以接收QoS 1和2邮件,而离线
reconnectPeriod : 1000毫秒,两个重新连接之间的间隔。通过设置为0 ,禁用自动重新连接。
reconnectOnConnackError : false ,是否还要重新连接是否有错误收到Connack。
connectTimeout : 30 * 1000毫秒,时间等待,然后再收到Connack
username :您的经纪人要求的用户名,如果有的话
password :经纪人要求的密码,如果有的话
incomingStore :传入数据包的商店
outgoingStore :外卖包的商店
queueQoSZero :如果连接断开,排队发出的QoS零消息(默认为true )
customHandleAcks :MQTT 5自定义处理Puback和PubRec数据包的功能。它的回调:
customHandleAcks: function ( topic , message , packet , done ) { /*some logic with calling done(error, reasonCode)*/ } autoUseTopicAlias :使用功能启用自动主题别名
autoAssignTopicAlias :启用自动主题别名分配功能
properties :属性MQTT 5.0。支持以下属性的object :
sessionExpiryInterval :以秒number表示会话的到期间隔,receiveMaximum :接收最大值number ,maximumPacketSize :表示客户端愿意接受number最大数据包大小,topicAliasMaximum :表示主题别名最大值表示客户端将接受为服务器number发送的主题别名,requestResponseInformation :客户端使用此值来请求服务器在Connack boolean中返回响应信息,requestProblemInformation :客户端使用此值来指示在失败的情况下,字符串或用户属性是否发送的boolean ,userProperties :允许用户属性多次出现以表示多个名称,值对object ,authenticationMethod :用于扩展身份验证string身份验证方法的名称,authenticationData :包含身份验证数据binary数据authPacket :auth分组object的设置
will :当客户端断开连接时,经纪人将自动发送的消息。格式是:
topic :发表的主题payload :发布的消息qos :QoSretain :保留旗properties :MQTT 5.0的意志属性:willDelayInterval :表示将以秒数number Will延迟间隔,payloadFormatIndicator :WILL消息是UTF-8编码字符数据或不是boolean ,messageExpiryInterval :值是WILL消息的寿命秒内,当服务器发布Will Message number ,contentType :描述意志消息string的内容,responseTopic :字符串用作响应消息string的主题名称,correlationData :请求消息的发件人使用了相关数据,以识别响应消息是在收到binary何时响应消息的情况下,userProperties :允许用户属性多次出现以表示多个名称,值对object transformWsUrl :仅适用于WS/WSS协议的可选(url, options, client) => url函数。可用于实现签名URL,重新连接后可能已过期。
createWebsocket :可选的url, websocketSubProtocols, options) => Websocket函数仅适用于WS/WSS协议。可用于实现自定义的Websock子协议或实现。
resubscribe :如果连接断开并重新连接,则再次自动订阅订阅的主题(默认为true )
messageIdProvider :自定义MessageID提供商。当设置new UniqueMessageIdProvider()时,则提供非冲突消息ID。
log :自定义日志功能。默认使用调试软件包。
manualConnect :防止要调用connect构造函数。在这种情况下,在称为mqtt.connect之后,您应该手动调用client.connect 。
timerVariant :默认为auto ,它试图确定哪个计时器最适合您的环境,如果您有检测问题,则可以将其设置为worker或native 。如果不适合您,则可以通过设置和清除属性传递计时器对象:
timerVariant: {
set : ( func , timer ) => setInterval ( func , timer ) ,
clear : ( id ) => clearInterval ( id )
} forceNativeWebSocket :如果您有检测问题(即ws does not work in the browser ),则将其设置为True,以强制使用本机Weberocket。重要的是要注意,如果为创建的第一个客户端设置为true,则所有客户端将使用本机WebSocket。相反,如果未设置或设置为false,所有这些都将使用检测结果。
unixSocket :如果要连接到Unix插座,请将其设置为true
如果需要MQTTS(TLS上的MQTT),则options对象将通过tls.connect()传递。如果使用自签名的证书,请设置rejectUnauthorized: false 。但是,要谨慎,因为这会使您在中间攻击中暴露于潜在的人中,并且不建议进行生产。
对于那些支持单个端口上多个TLS协议的人,例如WSS上的MQTT和MQTT,请使用ALPNProtocols选项。这使您可以定义应用程序层协议协议(ALPN)协议。您可以根据设置将ALPNProtocols设置为字符串阵列,缓冲区或UINT8ARRAY。
如果您要连接到仅支持MQTT 3.1(不符合3.1.1)的经纪人,则应通过以下其他选项:
{
protocolId : 'MQIsdp' ,
protocolVersion : 3
}这在RabbitMQ 3.2.4和蚊子<1.3上得到了证实。蚊子1.3和1.4版本没有这些。
'connect' function (connack) {}
在成功的(RE)连接(即Connack RC = 0)上排放。
connack收到了Connack包。当clean连接选项为false且服务器具有以前的clientId连接选项会话时,则connack.sessionPresent标志为true 。在这种情况下,您可以依靠存储的会话,并且不愿意为客户端发送订阅命令。 'reconnect' function () {}
重新连接开始时发出。
'close' function () {}
断开后发出。
'disconnect' function (packet) {}
从经纪人收到断开数据包后发出。 MQTT 5.0功能。
'offline' function () {}
客户离线时发出。
'error' function (error) {}
当客户端无法连接时发射(即Connack RC!= 0)或发生解析错误时。
以下TLS错误将作为一个error事件发出:
ECONNREFUSEDECONNRESETEADDRINUSEENOTFOUND'end' function () {}
当调用mqtt.Client#end()时发出。如果将回调传递给mqtt.Client#end() ,则一旦回调返回,就会发出此事件。
'message' function (topic, message, packet) {}
客户收到发布数据包时发出
topic主题message有效载荷packet ,如MQTT Packet所定义'packetsend' function (packet) {}
客户发送任何数据包时发出。这包括.publishiped()数据包以及MQTT用于管理订阅和连接的数据包
packet ,如MQTT Packet所定义'packetreceive' function (packet) {}
客户收到任何数据包时发出。这包括来自订阅主题的数据包以及MQTT用于管理订阅和连接的数据包
packet ,如MQTT Packet所定义默认情况下,client在调用构造函数时连接。为了防止这种情况,您可以将manualConnect选项设置为true ,并手动调用client.connect() 。
将消息发布到主题
topic是要发布到String主题message是要发布, Buffer或String消息options是要发布的选项,包括:qos QoS级别, Number ,默认值0retain标志, Boolean ,默认为falsedup标记为重复标志, Boolean ,默认错误falseproperties :MQTT 5.0属性objectpayloadFormatIndicator :有效载荷是UTF-8编码字符数据或不是boolean ,messageExpiryInterval :以秒number应用程序消息的寿命,topicAlias :用于识别主题而不是使用主题名称number值,responseTopic :字符串用作响应消息string的主题名称,correlationData :请求消息的发件人使用以识别响应消息是在接收到binary何时的请求,userProperties :允许用户属性多次出现以表示多个名称,值对object ,subscriptionIdentifier :代表订阅number的标识符,contentType :描述应用程序消息的内容string的字符串cbStorePut function () ,如果QoS为1或2 ,则将消息输入outgoingStore时发射。callback - function (err, packet) ,QoS处理完成时或在QoS 0时在下一个tick上触发。如果客户端断开连接,则会发生错误。异步publish 。返回Promise<Packet | undefined> 。
messageId属性的任何内容。订阅主题或主题
topic是要订阅的String主题或要订阅的Array主题。它也可以是一个对象,它作为对象键作为主题名称,并作为QoS值,例如{'test1': {qos: 0}, 'test2': {qos: 1}} 。支持MQTT topic通配符字符(单级+ #对于多级别)options是要订阅的选项,包括:qos QoS订阅级别,默认为0nl没有本地MQTT 5.0标志(如果值为真,则不得将应用程序消息转发到与clientID等于发布连接的客户端的连接)rap保留为已发布的MQTT 5.0标志(如果为true,则使用此订阅转发的应用程序消息保持其发布的保留标志。如果是错误的,则使用此订阅转发的应用程序消息将rearain标志设置为0。)rh保留处理MQTT 5.0(此选项指定在建立订阅时是否发送保留消息。)properties : objectsubscriptionIdentifier :代表订阅number的标识符,userProperties :允许用户属性多次出现以表示多个名称,值对objectcallback - function (err, granted)在Suback上触发的回调:err订阅错误或客户断开连接时发生的错误granted是{topic, qos}的数组:topic是对主题的订阅qos是理所当然的QoS级别异步subscribe 。返回Promise<ISubscriptionGrant[]> 。
退订主题或主题
topic是一个String主题或一系列主题,要取消订阅options :取消订阅的选项。properties : objectuserProperties :允许用户属性多次出现以表示多个名称,值对objectcallback - function (err) ,在Unsubak上发射。如果客户端断开连接,则会发生错误。异步unsubscribe 。返回Promise<void> 。
关闭客户,接受以下选项:
force :将其传递给真实将立即关闭客户,而无需等待飞行中的消息。此参数是可选的。options :断开连接的选项。reasonCode :断开原因代码numberproperties : objectsessionExpiryInterval :以秒number表示会话的到期间隔,reasonString :表示断开string的原因,userProperties :允许用户属性多次出现以表示多个名称,值对object ,serverReference :客户端可以使用的字符串来识别另一台服务器以使用stringcallback :客户关闭时将被调用。此参数是可选的。异步end 。返回Promise<void> 。
从外向店中删除消息。如果删除该消息,将会通过错误(“删除消息”)调用回调。
调用此函数后,释放消息ID并将重复使用。
mId :外向店里的消息的消息。使用与Connect()相同的选项再次连接
带有背压支持的消息,一次。随意覆盖,但始终致电callback ,否则客户将悬挂。
布尔值:如果客户端连接,则设置为true 。否则为false 。
号码:获取最后一条消息ID。这仅用于发送消息。
布尔值:如果客户端试图重新连接到服务器,则设置为true 。否则为false 。
消息存储的内存实现。
options是商店选项:clean :调用关闭时的true ,干净的机上消息(默认为true ) mqtt.Store的其他实现:
将数据包添加到商店中,一个数据包是具有messageId属性的任何内容。存储数据包时调用回调。
在商店中使用所有数据包创建流。
从商店中删除数据包,一个数据包是具有messageId属性的任何内容。删除数据包后,调用回调。
关闭商店。
重要的
浏览器中支持的唯一协议是Websocket上的MQTT,因此您必须使用ws://或wss://协议。
当WS模块在NodeJS中使用时,Websocket用于浏览器中。这对用户完全透明,除了以下内容:
浏览器不支持wsOption 。
出于安全原因,浏览器不允许捕获许多WebSocket错误,例如:
访问此信息可以允许恶意网页获取有关您的网络的信息,因此他们需要浏览器以无法区分的方式报告所有连接时间错误。
因此,聆听client.on('error')可能不会捕获您在nodejs env中会遇到的所有错误。
MQTT.JS使用Esbuild捆绑。经过测试,与WebPack,Vite和React等所有捆绑包一起工作。
您可以在dist文件夹中找到所有MQTT捆绑版本:
mqtt.js iife格式,未更新mqtt.min.js iife格式,缩小mqtt.esm.js ESM格式缩小从MQTT.JS> 5.2.0开始,您可以在代码中导入MQTT:
import mqtt from 'mqtt'这将由您的捆绑器自动处理。
否则,您可以选择使用特定的捆绑包:
import * as mqtt from 'mqtt/dist/mqtt'
import * as mqtt from 'mqtt/dist/mqtt.min'
import mqtt from 'mqtt/dist/mqtt.esm'MQTT.JS捆绑包可通过http://unpkg.com获得,特别是在https://unpkg.com/mqtt/dist/mqtt.min.js上获得。有关版本范围的完整文档,请参见http://unpkg.com。
这是QoS的工作方式:
关于数据消耗,显然是QoS 2> QoS 1> QoS 0,如果您关注的话。
从V5开始,此项目以打字稿编写,类型定义包含在软件包中。
例子:
import { connect } from "mqtt"
const client = connect ( 'mqtt://test.mosquitto.org' ) 支持微信迷你计划。使用wxs协议。请参阅微信文档。
import 'abortcontroller-polyfill/dist/abortcontroller-polyfill-only' // import before mqtt.
import 'esbuild-plugin-polyfill-node/polyfills/navigator'
const mqtt = require ( "mqtt" ) ;
const client = mqtt . connect ( "wxs://test.mosquitto.org" , {
timerVariant : 'native' // more info ref issue: #1797
} ) ;支持Ali Mini计划。使用alis协议。请参阅“支付宝”文档。
const mqtt = require ( "mqtt" ) ;
const client = mqtt . connect ( "alis://test.mosquitto.org" ) ; MQTT.JS是一个开放的开源项目。这意味着:
做出巨大贡献的个人可以使该项目获得合适的贡献。该项目更像是一个开放的Wiki,而不是标准的护卫开源项目。
有关更多详细信息,请参见贡献。
仅由于以下贡献者的出色工作,MQTT.JS才有可能:
| 姓名 | github | 叽叽喳喳 |
|---|---|---|
| 亚当·陆克文(Adam Rudd) | github/adamvr | Twitter/@adam_vr |
| Matteo Collina | Github/Mcollina | Twitter/@matteocollina |
| Maxime Agor | Github/4rzael | Twitter/@4rzael |
| Siarhei Buntsevich | Github/Scarry1992 | |
| 丹尼尔·兰多(Daniel Lando) | Github/Robertslando |
如果您想支持MQTT.JS,请考虑赞助作者和主动维护者:
麻省理工学院