> 文章列表 > 【Python】基于ML307A的位置读取系统(通过UART串口实现AT指令和flask来实现自动化读取并推流)

【Python】基于ML307A的位置读取系统(通过UART串口实现AT指令和flask来实现自动化读取并推流)

【Python】基于ML307A的位置读取系统(通过UART串口实现AT指令和flask来实现自动化读取并推流)

【Python】基于ML307A的位置读取系统(通过UART串口实现AT指令和flask来实现自动化读取并推流)

Python下的串口serial库

串行口的属性:
name:设备名字
portstr:已废弃,用name代替
port:读或者写端口
baudrate:波特率
bytesize:字节大小
parity:校验位
stopbits:停止位
timeout:读超时设置
writeTimeout:写超时
xonxoff:软件流控
rtscts:硬件流控
dsrdtr:硬件流控
interCharTimeout:字符间隔超时

属性的使用方法:
ser=serial.Serial(“/dev/ttyAMA0”,9600,timeout=0.5)
ser.open()

print ser.name
print ser.port
print ser.baudrate#波特率
print ser.bytesize#字节大小
print ser.parity#校验位N-无校验,E-偶校验,O-奇校验
print ser.stopbits#停止位
print ser.timeout#读超时设置
print ser.writeTimeout#写超时
print ser.xonxoff#软件流控
print ser.rtscts#硬件流控
print ser.dsrdtr#硬件流控
print ser.interCharTimeout#字符间隔超时

ser.close()

串口接收要用到多线程库(类似单片机中的中断):

def thread_com_receive():global ML307A_RXwhile True:try:rx_buf = ''rx_buf = COMM.read()  # 转化为整型数字if rx_buf != b'':time.sleep(0.01)rx_buf = rx_buf + COMM.read_all()print("串口收到消息:", rx_buf)ML307A_RX=str(rx_buf).split("\\\\r\\\\n")[1]print(ML307A_RX)time.sleep(0.01)except:passpass

初始化串口:

# 打开串口
def serial_open(n=0):global COMMserial_port = set_com_port(n)COMM = serial.Serial(serial_port, 115200, timeout=0.01)if COMM.isOpen():print(serial_port, "open success")return 0else:print("open failed")return 255
def init_com():global ML307A_RXget_com_list()len = port_list.__len__()device = port_list[0].deviceprint(len, device)serial_open()thread1 = threading.Thread(target=thread_com_receive)thread1.start()COMM.write("AT\\r\\n".encode("UTF-8"))time.sleep(0.1)if ML307A_RX=="OK":com_jugg()

AT的命令格式

AT指令格式:AT指令都以”AT”开头,以0x0D 0x0A(即\\r\\n,换行回车符)结束,模块运行后,串口默认的设置为:8位数据位、1位停止位、无奇偶校验位、硬件流控制(CTS/RTS).
注意为了发送AT命令,最后还要加上0x0D 0x0A(即\\r\\n,换行回车符)这是串口终端要求.
有一些命令后面可以加额外信息来.如电话号码

每个AT命令执行后,通常DCE都给状态值,用于判断命令执行的结果.

AT返回状态包括三种情况 OK,ERROR,和命令相关的错误原因字符串.返回状态前后都有一个字符.
如 OK 表示AT命令执行成功.
ERROR 表示AT命令执行失败
NO DIAL TONE 只出现在ATD命令返回状态中,表示没有拨号音,这类返回状态要查命令手册

还有一些命令本身是要向DCE查询数据,数据返回时,一般是+打头命令.返回格式
+命令:命令结果
如:AT+CMGR=8 (获取第8条信息)
返回 +CMGR: “REC UNREAD”,“+8613508485560”,“01/07/16,15:37:28+32”,Once more

AT指令串口通信代码如下:

# -*- coding: utf-8 -*-
import serial
import serial.tools.list_ports
import time
import threadingcom_rx_buf = ''				# 接收缓冲区
com_tx_buf = ''				# 发送缓冲区
COMM = serial.Serial()		# 定义串口对象
port_list: list				# 可用串口列表
port_select: list			# 选择好的串口ML307A_RX=''# 无串口返回0,
# 返回可用的串口列表
def get_com_list():global port_list# a = serial.tools.list_ports.comports()# print(a)# port_list = list(serial.tools.list_ports.comports())port_list = serial.tools.list_ports.comports()return port_listdef set_com_port(n=0):global port_listglobal port_selectport_select = port_list[n]return port_select.device# 打开串口
def serial_open(n=0):global COMMserial_port = set_com_port(n)COMM = serial.Serial(serial_port, 115200, timeout=0.01)if COMM.isOpen():print(serial_port, "open success")return 0else:print("open failed")return 255# 关闭串口
def serial_close():global COMMCOMM.close()print(COMM.name + "closed.")def set_com_rx_buf(buf=''):global com_rx_bufcom_rx_buf = bufdef set_com_tx_buf(buf=''):global com_tx_bufcom_tx_buf = bufdef get_com_rx_buf():global com_rx_bufreturn com_rx_bufdef get_com_tx_buf():global com_tx_bufreturn com_tx_bufdef thread_com_receive():global ML307A_RXwhile True:try:rx_buf = ''rx_buf = COMM.read()  # 转化为整型数字if rx_buf != b'':time.sleep(0.01)rx_buf = rx_buf + COMM.read_all()print("串口收到消息:", rx_buf)ML307A_RX=str(rx_buf).split("\\\\r\\\\n")[1]print(ML307A_RX)time.sleep(0.01)except:passpass# def serial_encode(addr=0, command=0, param1=0, param0=0):
#     buf = [addr, command, param1, param0, 0, 0, 0, 0]
#     print(buf)
#     return bufdef serial_send_command(addr=0, command=0, param1=0, param0=0, data3=0, data2=0, data1=0, data0=0):buf = [addr, command, param1, param0, data3, data2, data1, data0]COMM.write(buf)passdef serial_init():buf = "AT+CG\\r\\n"COMM.write(buf)time.sleep(0.05)buf = COMM.read_all()if buf != "OK\\r\\n":return 254  # 进入调试模式失败buf = "AT+CAN_MODE=0\\r\\n"COMM.write(buf)time.sleep(0.05)buf = COMM.read_all()if buf != "OK\\r\\n":return 253          # 进入正常模式失败,模块处于1状态,即环回模式中buf = "AT+CAN_BAUD=500000\\r\\n"COMM.write(buf)time.sleep(0.05)buf = COMM.read_all()if buf != "OK\\r\\n":return 253          # 波特率设置失败buf = "AT+FRAMEFORMAT=1,0,\\r\\n"COMM.write(buf)time.sleep(0.05)buf = COMM.read_all()if buf != "OK\\r\\n":return 253          # 波特率设置失败buf = "AT+ET\\r\\n"       # 进入透传模式COMM.write(buf)time.sleep(0.05)buf = COMM.read_all()if buf != "OK\\r\\n":return 255  # 不是CAN模块def com_jugg():global ML307A_RXwhile True:COMM.write("""AT+MUESTATS="radio"\\r\\n""".encode("UTF-8"))time.sleep(5)print(ML307A_RX)def init_com():global ML307A_RXget_com_list()len = port_list.__len__()device = port_list[0].deviceprint(len, device)serial_open()thread1 = threading.Thread(target=thread_com_receive)thread1.start()COMM.write("AT\\r\\n".encode("UTF-8"))time.sleep(0.1)if ML307A_RX=="OK":com_jugg()if __name__ == '__main__':init_com()

ML307A的部分AT指令说明

【Python】基于ML307A的位置读取系统(通过UART串口实现AT指令和flask来实现自动化读取并推流)
发送AT+MUESTATS=“radio”\\r\\n即可查询位置信息

# -*- coding: utf-8 -*-
"""
Created on Mon Apr 10 18:21:52 2023@author: ZHOU
"""import time
from flask import Flask, render_template, request
import threading
import socket
import serial
import serial.tools.list_portslocal_post = 1212
local_ip = Nonefor i in range(12):try:s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)s.connect(("8.8.8.8",80))local_ip = str(s.getsockname()[0])s.close()print("Network Enable")network_flag = 1breakexcept:        print("Network Error...")network_flag = 0time.sleep(5)app = Flask(__name__)com_rx_buf = ''				# 接收缓冲区
com_tx_buf = ''				# 发送缓冲区
COMM = serial.Serial()		# 定义串口对象
port_list: list				# 可用串口列表
port_select: list			# 选择好的串口ML307A_RX=''location_info = '请输入用户名和密码并成功登陆后,刷新网页查看定位信息'
location_info_flag=0
# 无串口返回0,
# 返回可用的串口列表
def get_com_list():global port_list# a = serial.tools.list_ports.comports()# print(a)# port_list = list(serial.tools.list_ports.comports())port_list = serial.tools.list_ports.comports()return port_listdef set_com_port(n=0):global port_listglobal port_selectport_select = port_list[n]return port_select.device# 打开串口
def serial_open(n=0):global COMMserial_port = set_com_port(n)COMM = serial.Serial(serial_port, 115200, timeout=0.01)if COMM.isOpen():print(serial_port, "open success")return 0else:print("open failed")return 255# 关闭串口
def serial_close():global COMMCOMM.close()print(COMM.name + "closed.")def set_com_rx_buf(buf=''):global com_rx_bufcom_rx_buf = bufdef set_com_tx_buf(buf=''):global com_tx_bufcom_tx_buf = bufdef get_com_rx_buf():global com_rx_bufreturn com_rx_bufdef get_com_tx_buf():global com_tx_bufreturn com_tx_bufdef thread_com_receive():global ML307A_RXwhile True:try:rx_buf = ''rx_buf = COMM.read()  # 转化为整型数字if rx_buf != b'':time.sleep(0.01)rx_buf = rx_buf + COMM.read_all()print("串口收到消息:", rx_buf)ML307A_RX=str(rx_buf).split("\\\\r\\\\n")[1]time.sleep(0.01)except:passpass# def serial_encode(addr=0, command=0, param1=0, param0=0):
#     buf = [addr, command, param1, param0, 0, 0, 0, 0]
#     print(buf)
#     return bufdef serial_send_command(addr=0, command=0, param1=0, param0=0, data3=0, data2=0, data1=0, data0=0):buf = [addr, command, param1, param0, data3, data2, data1, data0]COMM.write(buf)passdef serial_init():buf = "AT+CG\\r\\n"COMM.write(buf)time.sleep(0.05)buf = COMM.read_all()if buf != "OK\\r\\n":return 254  # 进入调试模式失败buf = "AT+CAN_MODE=0\\r\\n"COMM.write(buf)time.sleep(0.05)buf = COMM.read_all()if buf != "OK\\r\\n":return 253          # 进入正常模式失败,模块处于1状态,即环回模式中buf = "AT+CAN_BAUD=500000\\r\\n"COMM.write(buf)time.sleep(0.05)buf = COMM.read_all()if buf != "OK\\r\\n":return 253          # 波特率设置失败buf = "AT+FRAMEFORMAT=1,0,\\r\\n"COMM.write(buf)time.sleep(0.05)buf = COMM.read_all()if buf != "OK\\r\\n":return 253          # 波特率设置失败buf = "AT+ET\\r\\n"       # 进入透传模式COMM.write(buf)time.sleep(0.05)buf = COMM.read_all()if buf != "OK\\r\\n":return 255  # 不是CAN模块def com_jugg():global ML307A_RXglobal location_infoglobal location_info_flagwhile True:COMM.write("""AT+MUESTATS="radio"\\r\\n""".encode("UTF-8"))time.sleep(0.5)if location_info_flag == 1:location_info = ML307A_RX.split("""+MUESTATS: "radio",""")[1]print(location_info)time.sleep(2.5)print(1)def init_com():global ML307A_RXget_com_list()len = port_list.__len__()device = port_list[0].deviceprint(len, device)serial_open(0)thread1 = threading.Thread(target=thread_com_receive)thread1.setDaemon(True)thread1.start()COMM.write("AT\\r\\n".encode("UTF-8"))time.sleep(0.1)print(321)if ML307A_RX=="OK":print(111)com_jugg()#@app.route('/login/')   # 登录
#def login():
#    return render_template('login.html')@app.route('/', methods=['GET', 'POST'])
def index():global location_info_flagglobal location_infocommand_str=''if request.method == 'POST':c0 = str(request.form.get('send'))c1 = str(request.form.get('user'))c2 = str(request.form.get('pass'))for i in [c0,c1,c2]:if i != "None":command_str = ibreakif command_str == "登陆":if str(request.form.get('user'))=="admin" and str(request.form.get('pass'))=="123":location_info_flag=1print("登陆成功")else:print("登陆失败")now_today = time.time()time_hour = time.localtime(now_today).tm_hourtime_min = time.localtime(now_today).tm_mintime_sec = time.localtime(now_today).tm_secif time_hour < 10:time_hour = "0"+str(time_hour)if time_min < 10:time_min = "0"+str(time_min)local_time_str = str(time.localtime(now_today).tm_year)+"-"+str(time.localtime(now_today).tm_mon)+"-"+str(time.localtime(now_today).tm_mday)+" "+str(time_hour)+":"+str(time_min)+":"+str(time_sec)data = {'当前时间:': [local_time_str],'位置信息:': [location_info],}return render_template('index.html',data_dict=data)def app_run():app.run(host=local_ip, port=local_post)if __name__ == "__main__":thread0 = threading.Thread(target=app_run)thread0.setDaemon(True)thread0.start()print(123)init_com()
![在这里插入图片描述](https://img-blog.csdnimg.cn/297662e9c9424b1292f623d605d7ee7f.jpeg)