> 文章列表 > 学习笔记-mqtt

学习笔记-mqtt

学习笔记-mqtt

mqtt简介

  • 规范原则

    • 精简
    • 发布/订阅模式
    • 动态创建主题
    • 传输量低,传输率高
    • 考虑低带宽、高延迟、不稳定的网络
    • 支持连续的会话保持合控制(心跳)
    • 客户端计算能力可以低
    • 提供服务质量管理
    • 不强求传输数据格式类型
  • 服务质量

    • QoS0
      • 至多一次
    • QoS1
      • 至少一次
    • QoS2
      • 只有一次
  • 客户端

    • 发布
    • 订阅
  • 服务端

  • 发布订阅

    • /
      • 分隔符,分割层级
      • 主题过滤器,通配一个层级
      • 主题过滤器,通配多个层级

mqtt协议

  • mqtt协议

    • 固定报头fixed header
    • 可变报文头variable header
    • 报文体payload
  • 协议

    • 首字节(byte1)
      • bit[4-7]
        • mqtt报文类型
          • connect-0001
          • connack-0010
          • publish-0011
          • puback-0100
          • pubrec-0101
      • bit[0-3]
        • 标志位(只有少部分类型使用标志位)
        • bit[3]-DUP
          • 1重复
        • bit[2]bit[1]-qos
          • 00-至多一次
          • 01-至少一次
          • 10-只有一次
          • 11-非法消息
        • bit[0]
          • 1保留消息
    • 剩余字节(byte2-)

EMQX

  • mqtt服务器
    • curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
    • sudo apt-get install emqx
    • sudo emqx start
    • Dashboard
      • http://localhost:18083/
      • admin/public

认证

  • 内置数据源

    • usrename
    • cilent id
  • 外部数据库

    • mysql
    • ldap
    • redis
    • mongodb
    • postgreSql
  • 其他

    • http
    • jwt
  • 匿名认证

    • low_anonymous = true
      • 默认开启
  • 认证链

    • 同时开启多个认证,根据认证插件启动顺序进行链式认证

username认证

  • 管理-插件-emqx_auth_username

  • emqx/etc/plugins/emqx_auth_username.conf

    • auth.user.?.username
    • auth.user.?password
  • api

    • get api/v4/auth_username
      • 查询所有用户
    • get api/v4/auth_username/${username}
      • 查询具体用户
    • post api/v4/auth_username
      • 新增
      • {“username”:“user”,“password”:“123456”}
    • put api/v4/auth_username/${username}
      • 更新密码
      • {“password”:“123456”}
    • delete api/v4/auth_username/${username}
      • 删除用户
  • api/v4/auth_username请求头

GET http://hostname:port/api/v4/auth_username HTTP/1.1
Content-Type: application/json
Authorizarion: Basic {{username}}:{{password}}

clientId认证

  • 管理-插件-emqx_auth_clientId

  • emqx/etc/plugins/emqx_auth_clientid.conf

    • auth.client.?.clientid
    • auth.client.?.password
  • api

    • get api/v4/auth_clientid
      • 查询所有
    • get api/v4/auth_clientid/${clientid}
      • 查询指定
    • post api/v4/auth_clientid
      • 新增
      • {“clientid”:“xxx”,“password”:“123456”}
    • put api/v4/auth_clientid/${clientid}
      • 更新密码
      • {“password”:“123456”}
    • delete api/v4/auth_clientid/${clientid}
      • 删除
  • api/v4/auth_clientid请求头

POST http://hostname:port/api/v4/auth_clientid HTTP/1.1
Content-Type: application/json
Authorizarion: Basic {{username}}:{{password}}

http认证

  • 管理-插件-emqx_auth_http
  • emqx/etc/plugins/emqx_auth_http.conf
    • auth.http.auth_req
      • 请求认证地址(需要自己实现)
    • auth.http.auth_req.method
    • auth.http.auth_req.params
    • 重试设置
      • auth.http.request.retry_times
      • auth.http.request.retry_interval
      • auth.http.request.retry_backoff
基于springboot的认证服务
  • AuthController
@RestController
@RequestMapping("/mqtt")
public class AuthController{@RequestMapping("/auth")public ResponseEntity auth(String clientid,String username,String password){// 查询数据库// 首先通过username查询数据String pass = xxxx.xx(username);if(StringUtils.isEmpty(pass)){return new ResponseEntity(HttpStatus.UNAUTHORIZED);}if(!pass.equals(password)){return new ResponseEntity(HttpStatus.UNAUTHORIZED);}return new ResponseEntity(HttpStatus.OK);}
}

sdk客户端

Paho

  • 依赖
<depenency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version>
</depenency>
  • application.yml
mqtt:broker-url: tcp://xxx:xxclient-id: xxusername: xxpassword: xx
  • MqttProperties

@Configuration
@ConfigurationProperties(prefix="mqtt")
@Data
public class MqttProperties{private String brokerUrl;private String clientId;private String username;private String password;
}
  • EmqColent
@Component
public class EmqClient{private IMqttClient mqttClient;@AutoWiredprivate MqttProperties mqttProperties;@AutoWiredprivate MqttCallback mqttCallback;@PostConstructpublic void init(){MqttClientPersistence mempersitence = new MemoryPersistence();try{mqttClient = new MqttClient(mqttProperties.getBorkerUrl(),mqttProperties.getClientId,mempersitence);    }catch(MqttException ex){}}// 连接brokerpublic void connect(String username,String password){MqttConnectOptions options = new MqttConnectOptions();options.setAutomaticReconnect(true);options.setUserName(username);options.setPassword(password);options.setCleanSession(true);mqttClient.setCallback(mqttCallback);try{mqttClient.connect(options);}catch(MqttException ex){}}// 断开连接@PreDestroypublic void disconnect(){try{mqttClient.disconnect();}catch(MqttException ex){}}// 重连public void reConnect(){try{mqttClient.reconnect();}catch(MqttException ex){}}// 发布public void publish(String topic,String msg,QosEnum qos,boolean retain){MqttMessage mqttMessage = new MqttMessage();mqttMessage.setPayload(msg.getBytes());mqttMessage.setQos(qos.value());mqttMessage.setRetained(retain);try{mqttClient.publish(topic,mqttMessage);}catch(MqttException ex){}}// 订阅public void subscribe(String topicFilter,QosEnum qos){try{mqttClient.subscribe(topicFilter,qos.value());}catch(MqttException ex){}}// 取消订阅public void unSubscribe(String topicFilter){try{mqttClient.unsubscribe(topicFilter);}catch(MqttException ex){}}
}
  • QosEnum
public enum QosEnum{Qos0(0),Qos1(1),Qos2(2);private final int value;QosEnum(int value){this.value = value;}public int value(){return this.value;}}
  • MessageCallback
public class MessageCallback implements MqttCallback{@Override// 丢失连接后触发回调public void connectionLost(Throwable cause){}@Override// 应用收到消息后触发回调public void messageArrived(String topic,MqttMessage message)throws Exception{}@Override// 消息发布完成触发回调public void deliveryComplete(IMqttDeliveryToken token){}
}
  • EmqApplication
@SpringbootApplication
public class EmqApplication{public static void main(String[] args){SpringApplication.run(EmqApplication.class,agrs);}@Autowiredprivate EmqClient emqClient;@Autowiredprivate MqttProperties properties;@PostConstructpublic void init(){emqClient.connect(properties.getUsername,properties.getPassword);emqClient.subscribe("xxx/#",QosEnum.Qos2);// 开启线程,每五秒发布一次new Thread(()->{while(ture){emqClient.public("xxx/123","public msg :" + LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME),QosEnum.Qos2,false);TimeUnit.SECONDS.sleep(5);}}).start();}}

mqttjs

  • https://unpkg.com/mqtt/dist/mqtt.min.js
$(function(){// 定义连接const options = {clean:true,//不保留会话connectTimeout:4000,//超时时间clientId:'xxx',username:'xx',password:'xxx'}const connectUrl = 'ws://xxx:8083/mqttconst client = mqtt.connect(connectUrl,options);client.on('reconnect',()=>{// 重连})client.on('close',()=>{// 关闭})client.on('disconnect',(packet)=>{// 接收断开连接的数据包})client.on('offline',()=>{// 客户端下线})client.on('error',(error)=>{// 客户端错误})client.on('packetsend',(packet)=>{// 客户端任何消息发出})client.on('packetreceive',(packet)=>{// 客户端任何消息接收})client.on('connect',(connack)=>{// 客户端成功连接// 订阅client.subscribe('xxx/#',{qos:2})// 每隔2秒发布一次setInterval(publish,2000)})function publish(){// 发布数据const message = 'h5 message '+ Math.random()+new Date()client.publish('xxx/123',message,{qos:2})}client.on('message',(topic,message,packet)=>{// 收到发布的消息事件})})

日志输出

  • emqx.conf

    • log.to
      • off
      • file
      • console
      • both(默认)
    • log.level
      • warning(默认)
    • log.dir
      • log(默认)
    • log.file
      • emqx.log(默认)
    • log.rotation.size
      • 10MB(默认)
    • log.info.flie
      • 将info以上级别写入
    • log.error.file
      • 将error日志单独输入
    • log.chars_limit
      • 8192
      • 单个日志最大字符
    • 日志格式
      • date time level client_info module_info msg
  • log handler

    • 负责日志处理和输出工作进程
    • emqx_ctl log handlers list
      • 查询安装的log handler
  • 修改日志级别

    • 全局
      • emqx_ctl log set-level debug
    • 主日志
      • emqx_ctl log primary-level debug
    • 单个log handler
      • emqx_ctl log hanlders set-level file debug
        • id=file的修改为debug

日志追踪

  • 将primary log level 设置为debug
    • emqx_ctl log primary-level debug
  • 开启cilentid日志追踪
    • emqx_ctl trace start client emq-demo log/emq-demo.log
      • 将clientid=emq-demo的日志输入到emq-demo.log
  • 开启topic日志追踪
    • emqx_ctl trace start topic ‘xxx/#’ log/topic_xx.log
      • 将topic=xxx的日志输入到topic_xx.log

ACL

  • 发布订阅权限管理

    • 内置ACL
    • 数据库ACL
      • MYSQL
      • PostgreSQL
      • Redis
      • MongoDB
    • HTTP ACL
  • 超级用户不受acl管理

  • ACL 规则

    • ‘allow/deny’ ‘who’ ‘subscribe/publish’ ‘topics’
  • etc/emqx.conf

    • acl_nomatch(如果acl规则没有匹配上)
      • allow(允许)
      • deny(不允许)
    • acl_file(acl文件)
      • etc/acl.conf
    • acl_deny_action(acl为禁止的响应)
      • ignore(忽略)/disconnect(断开)
    • enable_acl_cache
      • on (开启acl缓存)
    • acl_cache_max_size
      = 32(单个客户端最大缓出规则数)
    • acl_cache_ttl
      = 1m(默认1分钟,缓存失效)
  • 清除acl缓存

    • GET /api/v4/clients/{clientid}/acl_cache
      • 查询缓存
      • Authorization: Basic username:password
    • delet /api/v4/clients/{clientid}/acl_cache
      • 删除缓存
      • Authorization: Basic username:password
  • ACL鉴权链

    • 插件开启先后顺序

内置ACL

  • etc/acl.conf

http ACL

  • 插件
    • emqx_auth_http
    • etc/plugins/emqx_auth_http.conf
      • 超级用户认证
        • auth.http.super_req
        • auth.http.super_req.method
        • auth.http.super_req.params
      • 其他用户认证
        • auth.http.acl_req
        • auth.http.acl_req.method
        • auth.http.acl_req.params

springboot 代码

@PostMapping("/superuser")
public ResponseEntity superuser(String clientid,String username){// 查询当前用户是否时超级用户if(clientid.contains("admin")|| username.contains("admin")){return new ResponseEntity(HttpStatus.OK);}else{return new ResponseEntity(HttpStatus.UNAUTHORIZED);}
}@PostMapping("/acl")
// access 1 sub订阅 2 pub发布
public ResponseEntity acl(int access,String clientid,String username,String ipaddr,String topic,String mountpoint){// 查询当前用户是否有topic的操作权限if(username.equals("xxx")&& topic.equals("xxx/#")&access == 1){return new ResponseEntity(HttpStatus.OK);}if(clientid.equals("xxx")&& topic.equals("xxx/#")&access == 2){return new ResponseEntity(HttpStatus.OK);}return new ResponseEntity(HttpStatus.UNAUTHORIZED);
}

webhook

  • 插件
    • emqx_web_hook
      • 将钩子事件通知到某个web服务
      • /etc/plugins/emqx_web_hook.conf
        • web.hook.api.url
        • web.hook.encode_payload
      • 触发规则
        • web.hook.rule.. =

客户端断开连接事件

// web.hook.rule.client.connected.1={"action":"on_client_connected"}
// web.hook.rule.client.disconnected.1={"action":"on_client_disconnected"}@RestController
@RequestMapping("/mqtt")
public class WebHookController{private Map<String,boolean> clientStatus = new HashMap<>;@PostMapping("/webhook")public void hook(@RequestBody Map<String,Object> params){// 获取事件名称String action = (String)params.get("action");String clientId = (String)params.get("clientid");if(action.equals("client_connected")){// 客户端接入clientStatus.put(clientId,true);}if(action.equals("client_disconnected")){// 客户端下线clientStatus.put(clientId,false);}}@GetMapping("/status")public Map getStatus(){return clientStatus;}
}

集群

  • node

    • 每一个emxq
    • 唯一的节点名称
    • etc/emxq.cnf
      • node.name
      • node.cookie
    • 订阅表
      • 只存在与订阅者所在节点
      • 主题-订阅者映射
    • 路由表
      • 主题-节点映射
    • 主题树
      • 根据主题树匹配路由,再根据路由查询路由表
  • cluster集群

    • etc/emqx.cnf
      • cluster.name
      • cluster.proto_dist
      • cluster.discovery
        • 发现策列
  • 集群发现策列

    • manual-手动
      • emqx_ctl cluster join xxx
      • emqx_ctl cluster leave
        • 主动退出
      • emqx_ctl cluster force-leave xxx
        • 强制退出
    • static-静态
      • cluster.static.seeds
      • 设置集群节点名称列表
    • mcast
    • dns
    • etcd
    • k8s

监控api

  • etc/plugins/emqx_management.conf
    • 默认用户
      • management.default_application.id
      • management.default_application.secret
    • 管理-应用-新增用户
    • 工具-HTTP接口-接口列表

保留消息

  • 每个topic只有一个保留消息
  • etc/emqx.conf
    • mqtt.retain_available=true
      • 开启
  • etc/plugins/emqx_retainer.conf
    • retainer.storage_type
    • retainer.max_retained_messages
    • retainer.max_payload_size
    • retainer.expiry_interval

共享订阅

  • 多个订阅者负载均衡
  • 带群组共享订阅
    • $share/
  • 不带群组共享订阅
    • 默认都在一个组
    • $queue/topic
  • 负载均衡策列
    • etc/emqx.conf
      • broker.shared_subscription_strategy
  • 是否需要ack
    • 保证通讯质量,一个客户端掉线后发给其他同组客户端
    • broker.shared_dispatch_ack_enabled

延迟发布

  • $delayed/{DelayInterval}/topic
  • 管理-插件-emqx_delayed_publish

代理订阅

  • 客户端自动订阅

  • etc/emqx.conf

    • module.subscription
      • 默认关闭
    • 配置规则
      • module.subscription..topic=
      • module.subscription/.qos=
  • webhook&httpapi实现动态代理订阅

    • 客户端上线自动订阅,下线取消订阅
// web.hook.rule.client.connected.1={"action":"on_client_connected"}
// web.hook.rule.client.disconnected.1={"action":"on_client_disconnected"}@RestController
@RequestMapping("/mqtt")
public class WebHookController{private Map<String,boolean> clientStatus = new HashMap<>;@PostMapping("/webhook")public void hook(@RequestBody Map<String,Object> params){// 获取事件名称String action = (String)params.get("action");String clientId = (String)params.get("clientid");if(action.equals("client_connected")){// 客户端接入clientStatus.put(clientId,true);autoSub(clientId,"/xx/#",QosEnum.Qos2,true);}if(action.equals("client_disconnected")){// 客户端下线clientStatus.put(clientId,false);autoSub(clientId,"/xx/#",QosEnum.Qos2,false);}}@GetMapping("/status")public Map getStatus(){return clientStatus;}// 自动订阅和取消public void autoSub(String clientId,String topicFilter,QosEnum qos,boolean sub){RestTemplate restTemplate = new RestTemplateBuilder().basicAuthentican("admin","public").defaultHeader(HttpHeaders.CONTENT_TYPE,MediaType.APPLICATION_JSON_VALUE).build();Map<String,Object> params = new HashMap<>();params.put("clientid",clientId);params.put("topic",topicFilter);params.put("qos",qos.value());HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);HttpEntity entity = new HttpEntity(params,headers);if(sub){// 订阅new Thread(()->{ResponseEntit<String> responseEntity = restTemplate.postForEntity("xx/api/v4/mqtt/subscribe",entity,String.class);}).start();return;}// 取消ResponseEntit<String> responseEntity = restTemplate.postForEntity("xx/api/v4/mqtt/unsubscribe",entity,String.class);}
}

主题重写

  • etc/emqx.conf
    • module.rewrite
    • 重写规则
      • module.rewrite.rule. = 主题过滤器 正则表达式 目标表达式
      • 倒叙匹配 匹配到第一个就结束

黑名单

  • 少量客户端时使用

  • etc/emqx.conf

    • zone.external.enable_flapping_detect = on
      • 需要开启
    • flapping_detect_policy = 离线次数, 检查时间, 封禁时间
      • 短时间多次上下线,客户端就会封禁
  • api

    • get /api/v4/banned
      • 获取黑名单
    • post /api/v4/banned
      • {who,as}
      • 添加客户端

速率限制

  • etc/emqx.conf
    • listener.tcp.external.max_conn_rate
    • zone.external_publish_limit
    • listener.tcp.external.rate_limit

飞行窗口&消息队列

  • 飞行窗口

    • 存储已发布未确认消息
  • 消息队列

    • 飞行窗口满后存入消息队列
  • max_infight

  • max_mqueue_len

  • mqueue_store_qos0

消息重传

  • etc/emqx.conf
    • retry_interval
      • 消息重传等待间隔

规则引擎

  • emqx消息与事件等响应规则

    • 消息发布
    • 事件触发
      • $events/client.connected
  • 应用场景

    • 动作监听
    • 数据筛选
    • 消息路由
    • 消息编解码
  • 规则引擎组成

    • 规则:{sql语句,动作列表:[{动作,动作参数,绑定资源:{资源配置}}]}
  • sql语句

    • select <字段名> from <主题> [where <条件>]
    • foreach <字段名> [do <条件>] [incase <条件>] from <主题> [where <条件>]

系统调优

# 所有进程最大文件数
sysctl -w fs.file-max=2097152
# 单个进程可分割最大文件数
sysctl -w fs.nr_open=2097152
# 允许当前会话/进程打开文件句柄数
ulimit -n 1048576 # backlog
sysctl -w net.core.somaxconn=32768
sysctl -w net.ipv4.tcp_max_syn_backlog=16384
sysctl -w net.core.netdev_max_backlog=16384