学习笔记-mqtt
mqtt简介
-
规范原则
-
服务质量
- QoS0
- 至多一次
- QoS1
- 至少一次
- QoS2
- 只有一次
- QoS0
-
客户端
- 发布
- 订阅
-
服务端
- 消息代理Broker
-
发布订阅
- /
- 分隔符,分割层级
-
- 主题过滤器,通配一个层级
-
- 主题过滤器,通配多个层级
- /
mqtt协议
-
mqtt协议
- 固定报头fixed header
- 可变报文头variable header
- 报文体payload
-
协议
- 首字节(byte1)
- bit[4-7]
- mqtt报文类型
- connect-0001
- connack-0010
- publish-0011
- puback-0100
- pubrec-0101
- …
- mqtt报文类型
- bit[0-3]
- 标志位(只有少部分类型使用标志位)
- bit[3]-DUP
- 1重复
- bit[2]bit[1]-qos
- 00-至多一次
- 01-至少一次
- 10-只有一次
- 11-非法消息
- bit[0]
- 1保留消息
- bit[4-7]
- 剩余字节(byte2-)
- 首字节(byte1)
EMQX
- mqtt服务器
- curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
- sudo apt-get install emqx
- sudo emqx start
- Dashboard
- http://localhost:18083/
- admin/public
认证
-
内置数据源
- usrename
- cilent id
-
外部数据库
- mysql
- ldap
- redis
- mongodb
- postgreSql
-
其他
- http
- jwt
-
匿名认证
- low_anonymous = true
- 默认开启
- low_anonymous = true
-
认证链
- 同时开启多个认证,根据认证插件启动顺序进行链式认证
username认证
-
管理-插件-emqx_auth_username
-
emqx/etc/plugins/emqx_auth_username.conf
- auth.user.?.username
- auth.user.?password
-
api
- get api/v4/auth_username
- 查询所有用户
- get api/v4/auth_username/${username}
- 查询具体用户
- post api/v4/auth_username
- 新增
- {“username”:“user”,“password”:“123456”}
- put api/v4/auth_username/${username}
- 更新密码
- {“password”:“123456”}
- delete api/v4/auth_username/${username}
- 删除用户
- get api/v4/auth_username
-
api/v4/auth_username请求头
GET http://hostname:port/api/v4/auth_username HTTP/1.1
Content-Type: application/json
Authorizarion: Basic {{username}}:{{password}}
clientId认证
-
管理-插件-emqx_auth_clientId
-
emqx/etc/plugins/emqx_auth_clientid.conf
- auth.client.?.clientid
- auth.client.?.password
-
api
- get api/v4/auth_clientid
- 查询所有
- get api/v4/auth_clientid/${clientid}
- 查询指定
- post api/v4/auth_clientid
- 新增
- {“clientid”:“xxx”,“password”:“123456”}
- put api/v4/auth_clientid/${clientid}
- 更新密码
- {“password”:“123456”}
- delete api/v4/auth_clientid/${clientid}
- 删除
- get api/v4/auth_clientid
-
api/v4/auth_clientid请求头
POST http://hostname:port/api/v4/auth_clientid HTTP/1.1
Content-Type: application/json
Authorizarion: Basic {{username}}:{{password}}
http认证
- 管理-插件-emqx_auth_http
- emqx/etc/plugins/emqx_auth_http.conf
- auth.http.auth_req
- 请求认证地址(需要自己实现)
- auth.http.auth_req.method
- auth.http.auth_req.params
- 重试设置
- auth.http.request.retry_times
- auth.http.request.retry_interval
- auth.http.request.retry_backoff
- auth.http.auth_req
基于springboot的认证服务
- AuthController
@RestController
@RequestMapping("/mqtt")
public class AuthController{@RequestMapping("/auth")public ResponseEntity auth(String clientid,String username,String password){// 查询数据库// 首先通过username查询数据String pass = xxxx.xx(username);if(StringUtils.isEmpty(pass)){return new ResponseEntity(HttpStatus.UNAUTHORIZED);}if(!pass.equals(password)){return new ResponseEntity(HttpStatus.UNAUTHORIZED);}return new ResponseEntity(HttpStatus.OK);}
}
sdk客户端
Paho
- 依赖
<depenency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version>
</depenency>
- application.yml
mqtt:broker-url: tcp://xxx:xxclient-id: xxusername: xxpassword: xx
- MqttProperties
@Configuration
@ConfigurationProperties(prefix="mqtt")
@Data
public class MqttProperties{private String brokerUrl;private String clientId;private String username;private String password;
}
- EmqColent
@Component
public class EmqClient{private IMqttClient mqttClient;@AutoWiredprivate MqttProperties mqttProperties;@AutoWiredprivate MqttCallback mqttCallback;@PostConstructpublic void init(){MqttClientPersistence mempersitence = new MemoryPersistence();try{mqttClient = new MqttClient(mqttProperties.getBorkerUrl(),mqttProperties.getClientId,mempersitence); }catch(MqttException ex){}}// 连接brokerpublic void connect(String username,String password){MqttConnectOptions options = new MqttConnectOptions();options.setAutomaticReconnect(true);options.setUserName(username);options.setPassword(password);options.setCleanSession(true);mqttClient.setCallback(mqttCallback);try{mqttClient.connect(options);}catch(MqttException ex){}}// 断开连接@PreDestroypublic void disconnect(){try{mqttClient.disconnect();}catch(MqttException ex){}}// 重连public void reConnect(){try{mqttClient.reconnect();}catch(MqttException ex){}}// 发布public void publish(String topic,String msg,QosEnum qos,boolean retain){MqttMessage mqttMessage = new MqttMessage();mqttMessage.setPayload(msg.getBytes());mqttMessage.setQos(qos.value());mqttMessage.setRetained(retain);try{mqttClient.publish(topic,mqttMessage);}catch(MqttException ex){}}// 订阅public void subscribe(String topicFilter,QosEnum qos){try{mqttClient.subscribe(topicFilter,qos.value());}catch(MqttException ex){}}// 取消订阅public void unSubscribe(String topicFilter){try{mqttClient.unsubscribe(topicFilter);}catch(MqttException ex){}}
}
- QosEnum
public enum QosEnum{Qos0(0),Qos1(1),Qos2(2);private final int value;QosEnum(int value){this.value = value;}public int value(){return this.value;}}
- MessageCallback
public class MessageCallback implements MqttCallback{@Override// 丢失连接后触发回调public void connectionLost(Throwable cause){}@Override// 应用收到消息后触发回调public void messageArrived(String topic,MqttMessage message)throws Exception{}@Override// 消息发布完成触发回调public void deliveryComplete(IMqttDeliveryToken token){}
}
- EmqApplication
@SpringbootApplication
public class EmqApplication{public static void main(String[] args){SpringApplication.run(EmqApplication.class,agrs);}@Autowiredprivate EmqClient emqClient;@Autowiredprivate MqttProperties properties;@PostConstructpublic void init(){emqClient.connect(properties.getUsername,properties.getPassword);emqClient.subscribe("xxx/#",QosEnum.Qos2);// 开启线程,每五秒发布一次new Thread(()->{while(ture){emqClient.public("xxx/123","public msg :" + LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME),QosEnum.Qos2,false);TimeUnit.SECONDS.sleep(5);}}).start();}}
mqttjs
- https://unpkg.com/mqtt/dist/mqtt.min.js
$(function(){// 定义连接const options = {clean:true,//不保留会话connectTimeout:4000,//超时时间clientId:'xxx',username:'xx',password:'xxx'}const connectUrl = 'ws://xxx:8083/mqttconst client = mqtt.connect(connectUrl,options);client.on('reconnect',()=>{// 重连})client.on('close',()=>{// 关闭})client.on('disconnect',(packet)=>{// 接收断开连接的数据包})client.on('offline',()=>{// 客户端下线})client.on('error',(error)=>{// 客户端错误})client.on('packetsend',(packet)=>{// 客户端任何消息发出})client.on('packetreceive',(packet)=>{// 客户端任何消息接收})client.on('connect',(connack)=>{// 客户端成功连接// 订阅client.subscribe('xxx/#',{qos:2})// 每隔2秒发布一次setInterval(publish,2000)})function publish(){// 发布数据const message = 'h5 message '+ Math.random()+new Date()client.publish('xxx/123',message,{qos:2})}client.on('message',(topic,message,packet)=>{// 收到发布的消息事件})})
日志输出
-
emqx.conf
- log.to
- off
- file
- console
- both(默认)
- log.level
- warning(默认)
- log.dir
- log(默认)
- log.file
- emqx.log(默认)
- log.rotation.size
- 10MB(默认)
- log.info.flie
- 将info以上级别写入
- log.error.file
- 将error日志单独输入
- log.chars_limit
- 8192
- 单个日志最大字符
- 日志格式
- date time level client_info module_info msg
- log.to
-
log handler
- 负责日志处理和输出工作进程
- emqx_ctl log handlers list
- 查询安装的log handler
-
修改日志级别
- 全局
- emqx_ctl log set-level debug
- 主日志
- emqx_ctl log primary-level debug
- 单个log handler
- emqx_ctl log hanlders set-level file debug
- id=file的修改为debug
- emqx_ctl log hanlders set-level file debug
- 全局
日志追踪
- 将primary log level 设置为debug
- emqx_ctl log primary-level debug
- 开启cilentid日志追踪
- emqx_ctl trace start client emq-demo log/emq-demo.log
- 将clientid=emq-demo的日志输入到emq-demo.log
- emqx_ctl trace start client emq-demo log/emq-demo.log
- 开启topic日志追踪
- emqx_ctl trace start topic ‘xxx/#’ log/topic_xx.log
- 将topic=xxx的日志输入到topic_xx.log
- emqx_ctl trace start topic ‘xxx/#’ log/topic_xx.log
ACL
-
发布订阅权限管理
- 内置ACL
- 数据库ACL
- MYSQL
- PostgreSQL
- Redis
- MongoDB
- HTTP ACL
-
超级用户不受acl管理
-
ACL 规则
- ‘allow/deny’ ‘who’ ‘subscribe/publish’ ‘topics’
-
etc/emqx.conf
- acl_nomatch(如果acl规则没有匹配上)
- allow(允许)
- deny(不允许)
- acl_file(acl文件)
- etc/acl.conf
- acl_deny_action(acl为禁止的响应)
- ignore(忽略)/disconnect(断开)
- enable_acl_cache
- on (开启acl缓存)
- acl_cache_max_size
= 32(单个客户端最大缓出规则数) - acl_cache_ttl
= 1m(默认1分钟,缓存失效)
- acl_nomatch(如果acl规则没有匹配上)
-
清除acl缓存
- GET /api/v4/clients/{clientid}/acl_cache
- 查询缓存
- Authorization: Basic username:password
- delet /api/v4/clients/{clientid}/acl_cache
- 删除缓存
- Authorization: Basic username:password
- GET /api/v4/clients/{clientid}/acl_cache
-
ACL鉴权链
- 插件开启先后顺序
内置ACL
- etc/acl.conf
http ACL
- 插件
- emqx_auth_http
- etc/plugins/emqx_auth_http.conf
- 超级用户认证
- auth.http.super_req
- auth.http.super_req.method
- auth.http.super_req.params
- 其他用户认证
- auth.http.acl_req
- auth.http.acl_req.method
- auth.http.acl_req.params
- 超级用户认证
springboot 代码
@PostMapping("/superuser")
public ResponseEntity superuser(String clientid,String username){// 查询当前用户是否时超级用户if(clientid.contains("admin")|| username.contains("admin")){return new ResponseEntity(HttpStatus.OK);}else{return new ResponseEntity(HttpStatus.UNAUTHORIZED);}
}@PostMapping("/acl")
// access 1 sub订阅 2 pub发布
public ResponseEntity acl(int access,String clientid,String username,String ipaddr,String topic,String mountpoint){// 查询当前用户是否有topic的操作权限if(username.equals("xxx")&& topic.equals("xxx/#")&access == 1){return new ResponseEntity(HttpStatus.OK);}if(clientid.equals("xxx")&& topic.equals("xxx/#")&access == 2){return new ResponseEntity(HttpStatus.OK);}return new ResponseEntity(HttpStatus.UNAUTHORIZED);
}
webhook
- 插件
- emqx_web_hook
- 将钩子事件通知到某个web服务
- /etc/plugins/emqx_web_hook.conf
- web.hook.api.url
- web.hook.encode_payload
- 触发规则
- web.hook.rule.. =
- emqx_web_hook
客户端断开连接事件
// web.hook.rule.client.connected.1={"action":"on_client_connected"}
// web.hook.rule.client.disconnected.1={"action":"on_client_disconnected"}@RestController
@RequestMapping("/mqtt")
public class WebHookController{private Map<String,boolean> clientStatus = new HashMap<>;@PostMapping("/webhook")public void hook(@RequestBody Map<String,Object> params){// 获取事件名称String action = (String)params.get("action");String clientId = (String)params.get("clientid");if(action.equals("client_connected")){// 客户端接入clientStatus.put(clientId,true);}if(action.equals("client_disconnected")){// 客户端下线clientStatus.put(clientId,false);}}@GetMapping("/status")public Map getStatus(){return clientStatus;}
}
集群
-
node
- 每一个emxq
- 唯一的节点名称
- etc/emxq.cnf
- node.name
- node.cookie
- 订阅表
- 只存在与订阅者所在节点
- 主题-订阅者映射
- 路由表
- 主题-节点映射
- 主题树
- 根据主题树匹配路由,再根据路由查询路由表
-
cluster集群
- etc/emqx.cnf
- cluster.name
- cluster.proto_dist
- cluster.discovery
- 发现策列
- etc/emqx.cnf
-
集群发现策列
- manual-手动
- emqx_ctl cluster join xxx
- emqx_ctl cluster leave
- 主动退出
- emqx_ctl cluster force-leave xxx
- 强制退出
- static-静态
- cluster.static.seeds
- 设置集群节点名称列表
- mcast
- dns
- etcd
- k8s
- manual-手动
监控api
- etc/plugins/emqx_management.conf
- 默认用户
- management.default_application.id
- management.default_application.secret
- 管理-应用-新增用户
- 工具-HTTP接口-接口列表
- 默认用户
保留消息
- 每个topic只有一个保留消息
- etc/emqx.conf
- mqtt.retain_available=true
- 开启
- mqtt.retain_available=true
- etc/plugins/emqx_retainer.conf
- retainer.storage_type
- retainer.max_retained_messages
- retainer.max_payload_size
- retainer.expiry_interval
共享订阅
- 多个订阅者负载均衡
- 带群组共享订阅
- $share/
- 不带群组共享订阅
- 默认都在一个组
- $queue/topic
- 负载均衡策列
- etc/emqx.conf
- broker.shared_subscription_strategy
- etc/emqx.conf
- 是否需要ack
- 保证通讯质量,一个客户端掉线后发给其他同组客户端
- broker.shared_dispatch_ack_enabled
延迟发布
- $delayed/{DelayInterval}/topic
- 管理-插件-emqx_delayed_publish
代理订阅
-
客户端自动订阅
-
etc/emqx.conf
- module.subscription
- 默认关闭
- 配置规则
- module.subscription..topic=
- module.subscription/.qos=
- module.subscription
-
webhook&httpapi实现动态代理订阅
- 客户端上线自动订阅,下线取消订阅
// web.hook.rule.client.connected.1={"action":"on_client_connected"}
// web.hook.rule.client.disconnected.1={"action":"on_client_disconnected"}@RestController
@RequestMapping("/mqtt")
public class WebHookController{private Map<String,boolean> clientStatus = new HashMap<>;@PostMapping("/webhook")public void hook(@RequestBody Map<String,Object> params){// 获取事件名称String action = (String)params.get("action");String clientId = (String)params.get("clientid");if(action.equals("client_connected")){// 客户端接入clientStatus.put(clientId,true);autoSub(clientId,"/xx/#",QosEnum.Qos2,true);}if(action.equals("client_disconnected")){// 客户端下线clientStatus.put(clientId,false);autoSub(clientId,"/xx/#",QosEnum.Qos2,false);}}@GetMapping("/status")public Map getStatus(){return clientStatus;}// 自动订阅和取消public void autoSub(String clientId,String topicFilter,QosEnum qos,boolean sub){RestTemplate restTemplate = new RestTemplateBuilder().basicAuthentican("admin","public").defaultHeader(HttpHeaders.CONTENT_TYPE,MediaType.APPLICATION_JSON_VALUE).build();Map<String,Object> params = new HashMap<>();params.put("clientid",clientId);params.put("topic",topicFilter);params.put("qos",qos.value());HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);HttpEntity entity = new HttpEntity(params,headers);if(sub){// 订阅new Thread(()->{ResponseEntit<String> responseEntity = restTemplate.postForEntity("xx/api/v4/mqtt/subscribe",entity,String.class);}).start();return;}// 取消ResponseEntit<String> responseEntity = restTemplate.postForEntity("xx/api/v4/mqtt/unsubscribe",entity,String.class);}
}
主题重写
- etc/emqx.conf
- module.rewrite
- 重写规则
- module.rewrite.rule. = 主题过滤器 正则表达式 目标表达式
- 倒叙匹配 匹配到第一个就结束
黑名单
-
少量客户端时使用
-
etc/emqx.conf
- zone.external.enable_flapping_detect = on
- 需要开启
- flapping_detect_policy = 离线次数, 检查时间, 封禁时间
- 短时间多次上下线,客户端就会封禁
- zone.external.enable_flapping_detect = on
-
api
- get /api/v4/banned
- 获取黑名单
- post /api/v4/banned
- {who,as}
- 添加客户端
- get /api/v4/banned
速率限制
- etc/emqx.conf
- listener.tcp.external.max_conn_rate
- zone.external_publish_limit
- listener.tcp.external.rate_limit
飞行窗口&消息队列
-
飞行窗口
- 存储已发布未确认消息
-
消息队列
- 飞行窗口满后存入消息队列
-
max_infight
-
max_mqueue_len
-
mqueue_store_qos0
消息重传
- etc/emqx.conf
- retry_interval
- 消息重传等待间隔
- retry_interval
规则引擎
-
emqx消息与事件等响应规则
- 消息发布
- 事件触发
- $events/client.connected
-
应用场景
- 动作监听
- 数据筛选
- 消息路由
- 消息编解码
-
规则引擎组成
- 规则:{sql语句,动作列表:[{动作,动作参数,绑定资源:{资源配置}}]}
-
sql语句
- select <字段名> from <主题> [where <条件>]
- foreach <字段名> [do <条件>] [incase <条件>] from <主题> [where <条件>]
系统调优
# 所有进程最大文件数
sysctl -w fs.file-max=2097152
# 单个进程可分割最大文件数
sysctl -w fs.nr_open=2097152
# 允许当前会话/进程打开文件句柄数
ulimit -n 1048576 # backlog
sysctl -w net.core.somaxconn=32768
sysctl -w net.ipv4.tcp_max_syn_backlog=16384
sysctl -w net.core.netdev_max_backlog=16384