JY901串⼝数据接收与处理(Python)
最近在⽤JY901做⼀些实验,关于JY901⽹上有很多资料了,也有上位机软件,可以⽅便的查看输出数据。我想做的是对输出的⾓速度进⾏积分,对⽐积分后的结果与输出的⾓度,如果数据都⽐较准确地话,那么他们应该相差不⼤。
这篇⽂章⾥,要完成的事情就是通过串⼝接收他输出的⾓速度和⾓度,然后对⾓速度进⾏积分,并实时显⽰数据结果。下⾯我⾸先对各个部分进⾏分块解释,完整的代码放在最后。
1. 串⼝通信
python实现串⼝通信可以⽤ pySerial 库。我们⾸先选择串⼝对应的设备端⼝:
# 获取串⼝设备对应的端⼝
def get_serial_port():
ports =ls.list_portsports())
for port in ports:
print(ports.index(port), port)
selected =-1
while selected <0or selected >=len(ports):
print("please input serial to use [start from 0]:")
selected =int(input())
print("selected: ", selected)
port = ports[selected]
print("use ", port)
port =list(port)
return port[0]
然后打开串⼝并持续接收数据,在收到的数据中提取IMU数据:
def serial_readers(self, port =None, size =22):
# port  : 串⼝端⼝
# size  : 默认⼀次读取数据长度(bytes)
if not port:
port = get_serial_port()
total =bytearray()
ser =None
while True:
try:
if not ser:
ser = serial.Serial(port,115200, timeout=60)
if not ser.is_open:
ser.open()
tmp = ad(size)
total, ext_omega, ext_angle = act_raw_data(total)#提取IMU数据
except Exception as e:
print("exception ", e)
if ser:
ser.close()
time.sleep(5)
d_omega, d_angle
2. IMU数据提取
⾸先我们来看JY901串⼝数据的帧结构:
加速度:0x55 0x51 AxL AxH AyL AyH AzL AzH TL TH SUM
⾓速度:0x55 0x52 wxL wxH wyL wyH wzL wzH TL TH SUM
⾓度:  0x55 0x53 RollL RollH PitchL PitchH YawL YawH TL TH SUM
这三种数据帧长度都是 11 Bytes,最后⼀位为校验位,我们从串⼝数据中提取IMU数据也是根据这个帧结构,先检测帧头,然后读取整个帧(代码⾥偷了懒,并没有核对校验位)
def extract_raw_data(self, data):
# 根据报⽂格式读取⾓速度数据
# 0x55 0x52 + data + CRC(8bits)
length =9
ext_omega =None
ext_angle =None
try:
st = data.index(b'\x55')# 寻报⽂引导字
if st >=0:
reserved = data[st+1]# 数据类型
if reserved ==82:# 0x52,提取⾓速度消息
if st+length+1<len(data):
unt_omega==0:
self.time_now = self.time_last = time.time()
else:
self.time_last = self.time_now
self.time_now = time.time()
ext_omega = data[st:st+length+2]
temp =int.from_bytes(ext_omega[6:8], byteorder='little', signed=True)
data = data[st+length+2:]
elif reserved ==83:# 0x53,提取⾓度消息
if st+length+1<len(data):
ext_angle = data[st:st+length+2]
temp =int.from_bytes(ext_angle[6:8], byteorder='little', signed=True)
self.yaw = temp/32768*180
unt_yaw==0:#记录初始相位
self.yaw_init = self.yaw
data = data[st+length+2:]
elif reserved ==81:# 0x51, 丢弃加速度消息
if st+length+1<len(data):
ext_data =None
data = data[st+length+2:]
else:
data = data[st+2:]
except:
return data, ext_omega, ext_angle
unt_yaw%100==unt_yaw>0:
self.save_data()
print(d_omega),' extract omega  ',ext_a_inte)
print(d_angle),' extract angle  ',ext_angle,self.yaw - self.yaw_init)
print('*'*30)
return data, ext_omega, ext_angle
3. 绘图显⽰
我把历史接收数据都存在⼀个 list ⾥⾯了,每过 0.1s 就重新绘图,看起来就像是在实时更新⼀样。
def plot(self):
plt.ion()
plt.figure()
while True:
try:
if d_angle)>0and d_inte)>0:
plt.clf()
plt.plot(list(range(d_angle))),d_angle,'r')
plt.plot(list(range(d_inte))),d_inte,'b')
plt.xlim(max(unt_yaw-3000),unt_yaw+200)
plt.legend(labels=['angle','integration'])
plt.pause(0.1)
plt.ioff()
except Exception as e:
print("exception ", e)
4. 运⾏效果
当时忘记保存截图了,这⾥就不放了 emmmm
5. 完整代码
为了便于共享数据,定义了⼀个类 JY901,将上⾯的各个函数放在这个类⾥⾯,输出的数据也都记录在 JY901 的成员变量⾥。
import serial
ls.list_ports
import time
import struct
import matplotlib.pyplot as plt
import numpy as np
import threading
class JY901():
def__init__(self):
self.time_last =0
self.time_now =0
self.yaw_init =0
self.yaw =0
def extract_raw_data(self, data):
# 根据报⽂格式读取⾓速度数据
# 0x55 0x52 + data + CRC(8bits)
length =9
ext_omega =None
ext_angle =None
try:
st = data.index(b'\x55')# 寻报⽂引导字
if st >=0:
reserved = data[st+1]# 数据类型
if reserved ==82:# 0x52,提取⾓速度消息
if st+length+1<len(data):
unt_omega==0:
self.time_now = self.time_last = time.time()
else:
self.time_last = self.time_now
self.time_now = time.time()
ext_omega = data[st:st+length+2]
temp =int.from_bytes(ext_omega[6:8], byteorder='little', signed=True)
data = data[st+length+2:]
elif reserved ==83:# 0x53,提取⾓度消息
if st+length+1<len(data):
ext_angle = data[st:st+length+2]
temp =int.from_bytes(ext_angle[6:8], byteorder='little', signed=True)
self.yaw = temp/32768*180
unt_yaw==0:#记录初始相位
self.yaw_init = self.yaw
python怎么读取串口数据
data = data[st+length+2:]
elif reserved ==81:# 0x51, 丢弃加速度消息
if st+length+1<len(data):
ext_data =None
data = data[st+length+2:]
else:
data = data[st+2:]
except:
return data, ext_omega, ext_angle
unt_yaw%100==unt_yaw>0:
self.save_data()
print(d_omega),' extract omega  ',ext_a_inte)
print(d_angle),' extract angle  ',ext_angle,self.yaw - self.yaw_init)
print('*'*30)
return data, ext_omega, ext_angle
def serial_readers(self, port =None, size =22):
# port  : 串⼝端⼝
# size  : 默认⼀次读取数据长度(bytes)
if not port:
port = get_serial_port()
total =bytearray()
ser =None
while True:
try:
if not ser:
ser = serial.Serial(port,115200, timeout=60)
if not ser.is_open:
ser.open()
tmp = ad(size)
total, ext_omega, ext_angle = act_raw_data(total)
except Exception as e:
print("exception ", e)
if ser:
ser.close()
time.sleep(5)
d_omega, d_angle
def save_data(self):
np.savez('record.npz',record_d_omega, record_d_angle)
def plot(self):
plt.ion()
plt.figure()
while True:
try:
if d_angle)>0and d_inte)>0:
plt.clf()
plt.plot(list(range(d_angle))),d_angle,'r')                    plt.plot(list(range(d_inte))),d_inte,'b')
plt.xlim(max(unt_yaw-3000),unt_yaw+200)
plt.legend(labels=['angle','integration'])
plt.pause(0.1)
plt.ioff()
except Exception as e:
print("exception ", e)
def get_serial_port():
ports =ls.list_portsports())
for port in ports:
print(ports.index(port), port)
selected =-1
while selected <0or selected >=len(ports):
print("please input serial to use [start from 0]:")
selected =int(input())
print("selected: ", selected)
port = ports[selected]
print("use ", port)
port =list(port)
return port[0]
if __name__ =='__main__':
imu = JY901()
thread_rcv = threading.Thread(target=imu.serial_readers)
thread_plt = threading.Thread(target=imu.plot)
thread_plt.start()
thread_rcv.start()

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。