# Repository listing metadata: 390 lines, 13 KiB, Python
import os
|
||
import sys
|
||
import datetime
|
||
import threading
|
||
import time
|
||
import epm
|
||
import serial
|
||
import serial.tools.list_ports
|
||
|
||
|
||
from PyQt5.QtCore import *
|
||
from PyQt5.QtWidgets import QApplication, QMainWindow, QMessageBox
|
||
|
||
|
||
from command import Command
|
||
|
||
import pyqtgraph as pg
|
||
import numpy as np
|
||
|
||
|
||
import matplotlib.pyplot as plt
|
||
|
||
import pkg.common as common
|
||
from pyqtgraph import examples
|
||
# examples.run()
|
||
|
||
# Serial port that is opened automatically when it is present.
SERIAL_PORT = 'COM10'

# Serial baud rate.
BAUDRATE = 115200

# Valve table rows: [valve number, display label, type code].
# Five entries with code 0x10 followed by four entries with code 0x11.
array_valve = (
    [[number, "两通阀", 0x10] for number in range(1, 6)]
    + [[number, "三通阀", 0x11] for number in range(1, 5)]
)

# Stepper-motor direction rows: [direction value, display label].
array_step_motor = [
    [0, "反转"],
    [1, "正转"],
]
|
||
|
||
def com_get():
    """Return the serial ports reported by pySerial as a list."""
    return list(serial.tools.list_ports.comports())
|
||
|
||
|
||
class MainProcess(QMainWindow):
    """Controller of the EPM test tool.

    Connects the widgets of the module-level ``ui`` object to the serial
    commands built by ``Command``, polls the device from two timers, and
    plots the polled value with pyqtgraph.
    """

    # Class-level defaults; the working values are assigned per instance.
    dll = None
    command = None  # Command instance, created in init()
    com_list = []  # result of com_get(); always reassigned, never mutated
    serial = None  # open serial.Serial, or None while the port is closed
    serial_read_thread = None
    com_port = None  # device name of the port selected in the combo box
    response_si = pyqtSignal(list)  # emitted by the reader thread

    # Plot state: X/Y buffers created with a capacity of max_point.
    max_point = 100
    obsX = None
    obsY = None
    pid_count = 0  # number of polls sent; used as the X coordinate
    pid_set = 500  # value drawn as the horizontal reference line
    pid_graph = None
    pid_si = pyqtSignal()  # requests a redraw of the plot

    def set_graph_ui(self):
        """Create the pyqtgraph plot inside ``ui.graph_layout`` and return it."""
        # Global pyqtgraph option; antialias=True would smooth the curves.
        pg.setConfigOptions(antialias=False)
        win = pg.GraphicsLayoutWidget()
        ui.graph_layout.addWidget(win)
        p1 = win.addPlot(title="PID")
        p1.setLabel('left', text='', color='#ffffff')
        p1.showGrid(x=True, y=True)
        p1.setLogMode(x=False, y=False)  # linear axes on both sides
        p1.setLabel('bottom', text='', units='次')
        return p1

    def plot_sin_cos(self):
        """Redraw the sample curve and the reference line (slot of ``pid_si``)."""
        # Y range is twice the reference value, or 0..100 when it is zero.
        y_top = 100 if self.pid_set == 0 else self.pid_set * 2
        self.pid_graph.setYRange(0, y_top)

        # Once more than max_point polls were sent, scroll the X window.
        if self.pid_count > self.max_point:
            self.pid_graph.setXRange(
                self.pid_count - self.max_point, self.pid_count)
        else:
            self.pid_graph.setXRange(0, self.max_point)

        default_pen = pg.mkPen(width=2, color='g')
        self.pid_graph.plot(self.obsX.array, self.obsY.array,
                            pen=default_pen, name='', clear=True)
        # Horizontal dotted line at the reference value.
        self.pid_graph.addLine(
            y=self.pid_set, pen=pg.mkPen('y', width=1, style=Qt.DotLine))

    def plot_init(self):
        """Reset the poll counter, the reference value and both buffers."""
        self.pid_count = 0
        self.pid_set = 0
        self.obsX.clear()
        self.obsY.clear()

    def pyqtgraph_click(self):
        """Launch the pyqtgraph examples browser."""
        examples.run()

    def init(self, path):
        """Build the ``Command`` helper, wire up the UI and start polling.

        Args:
            path: passed unchanged to ``Command``.
        """
        self.command = Command(path)

        # Button wiring.
        ui.bt_pyqtgraph.clicked.connect(self.pyqtgraph_click)
        ui.bt_reset.clicked.connect(self.bt_reset_clicked)
        ui.bt_com_open.clicked.connect(self.bt_com_open_clicked)
        ui.bt_config.clicked.connect(self.bt_send_config)
        ui.bt_command3.clicked.connect(self.bt_command3_clicked)
        ui.bt_command4.clicked.connect(self.bt_command4_clicked)
        ui.bt_command5.clicked.connect(self.bt_command5_clicked)
        ui.bt_command6.clicked.connect(self.bt_command6_clicked)
        ui.bt_command8.clicked.connect(self.bt_command8_clicked)
        ui.bt_command9.clicked.connect(self.bt_command9_clicked)
        ui.bt_command10.clicked.connect(self.bt_command10_clicked)
        ui.bt_command11.clicked.connect(self.bt_command11_clicked)
        ui.bt_set_valve_open.clicked.connect(self.bt_set_valve_open_clicked)
        ui.bt_set_valve_close.clicked.connect(self.bt_set_valve_close_clicked)
        ui.bt_send_valve_ratio_value.clicked.connect(
            self.bt_send_valve_ratio_value_clicked)
        ui.bt_read_valve_ratio_value.clicked.connect(
            self.bt_read_valve_ratio_value_clicked)
        ui.bt_command15.clicked.connect(self.bt_command15_clicked)
        ui.bt_content_clear.clicked.connect(ui.txt_content.clear)

        # Switching the polled quantity restarts the plot.
        ui.radio_ip.clicked.connect(self.plot_init)
        ui.radio_valve.clicked.connect(self.plot_init)

        self.obsX = common.LimitedArray(self.max_point)
        self.obsY = common.LimitedArray(self.max_point)
        plt.ion()  # switch matplotlib to interactive mode

        # Fill the valve and stepper-motor combo boxes from the tables.
        for number, label, _code in array_valve:
            ui.cb_valve.addItem(label + " " + str(number))
        for value, label in array_step_motor:
            ui.cb_step_motor.addItem(label + " " + str(value))

        # Parameter configuration buttons.
        ui.bt_current_config_write.clicked.connect(
            self.bt_current_config_write)
        ui.bt_current_config_read.clicked.connect(self.bt_current_config_read)

        # Connect the signals and build the plot BEFORE the port can be
        # opened, so that nothing emitted by the reader thread is lost or
        # handled while pid_graph is still None.
        self.response_si.connect(self.response_done)
        self.pid_graph = self.set_graph_ui()
        self.pid_si.connect(self.plot_sin_cos)

        # List the serial ports; opens SERIAL_PORT automatically if present.
        self.cb_com_select_port_clicked()

        # Timers: operate() every 3 s, command1() every 200 ms.
        self.timer = QTimer(self)
        self.command_timer = QTimer(self)
        self.timer.timeout.connect(self.operate)
        self.command_timer.timeout.connect(self.command1)
        self.timer.start(3000)
        self.command_timer.start(200)

    def _port_is_open(self):
        """Return True when a serial port object exists and reports open."""
        # Read the attribute once: the reader thread may reset it to None.
        port = self.serial
        return port is not None and port.isOpen()

    def operate(self):
        """Timer slot: send command3 while the port is open."""
        if self._port_is_open():
            self.send(self.command.command3())

    def command1(self):
        """Timer slot: poll command1 (IP radio checked) or command13."""
        if not self._port_is_open():
            return
        if ui.radio_ip.isChecked():
            self.send(self.command.command1())
        else:
            self.send(self.command.command13())

        self.command.command_print = False
        # The counter only advances when a query was actually sent.
        self.pid_count += 1

    def send(self, data):
        """Write ``data`` to the serial port, warning if no port is open."""
        port = self.serial
        if port is None:
            QMessageBox(QMessageBox.Warning, '警告', '串口未打开').exec_()
            return
        try:
            port.write(data)
        except serial.SerialException as e:
            # Keep a failed write from escaping the calling Qt slot.
            print('send failed:', e)

    def cb_com_select_port_clicked(self):
        """Refresh the port combo box and auto-open ``SERIAL_PORT``."""
        ui.cb_com_select_port.clear()
        self.com_list = com_get()
        for index, entry in enumerate(self.com_list):
            # entry[0] is matched against SERIAL_PORT, entry[1] is displayed.
            ui.cb_com_select_port.addItem(entry[1])
            if entry[0] == SERIAL_PORT:
                ui.cb_com_select_port.setCurrentIndex(index)
                self.bt_com_open_clicked()

    def response_done(self, data):
        """Slot of ``response_si``: decode a frame, log it and plot it."""
        res = self.command.response(data)

        if res.msg != '':
            ui.txt_content.append(res.msg)

        if self.command.cmd == 0x01 or self.command.cmd == 0x0d:
            if self.pid_count < 100:
                self.pid_set = self.command.value

            self.obsX.append(self.pid_count)
            self.obsY.append(self.command.value)
            self.pid_si.emit()

    def serial_read_thread_func(self):
        """Reader-thread body: forward decoded frames via ``response_si``.

        Runs until the port it was started for is closed or raises
        ``SerialException``.
        """
        # Bind to the port that was open when the thread started, so that a
        # later close/reopen cannot make this thread read from a new port.
        port = self.serial
        time.sleep(0.5)
        try:
            while port is not None and port.isOpen():
                # Ask for at least one byte so the call waits (up to the
                # port timeout) instead of spinning on empty reads.
                data = port.read(port.inWaiting() or 1)
                if len(data) > 0:
                    res = self.command.process(data)
                    if res is not None and len(res) > 0:
                        self.response_si.emit(res)
        except serial.SerialException as e:
            print('exit failed:', e)
        finally:
            # Do not clear a port opened after this thread started.
            if self.serial is port:
                self.serial = None
            print('exit')

    def bt_com_open_clicked(self):
        """Toggle the serial port according to the button caption."""
        ui.bt_com_open.setDisabled(True)
        try:
            if ui.bt_com_open.text() == '打开':
                self._open_selected_port()
            else:
                self._close_port()
        finally:
            # Always re-enable the button, even when opening failed.
            ui.bt_com_open.setDisabled(False)

    def _open_selected_port(self):
        """Open the port chosen in the combo box and start the reader."""
        port_info = ui.cb_com_select_port.currentText()
        self.com_port = next(
            (entry[0] for entry in self.com_list if entry[1] == port_info),
            None)
        try:
            if self.com_port is None:
                raise serial.SerialException('no serial port selected')
            port = serial.Serial(self.com_port, BAUDRATE, timeout=0.5)
        except serial.SerialException as e:
            print('open failed:', e)
            self.serial = None
            return
        if not port.isOpen():
            print('open failed')
            self.serial = None
            return

        print('open success')
        self.serial = port
        # Flip the UI only after the port really opened.
        ui.bt_com_open.setText('关闭')
        ui.cb_com_select_port.setDisabled(True)
        self.serial_read_thread = threading.Thread(
            target=self.serial_read_thread_func, daemon=True)
        self.serial_read_thread.start()

    def _close_port(self):
        """Close the serial port and restore the 'open' UI state."""
        ui.bt_com_open.setText('打开')
        ui.cb_com_select_port.setDisabled(False)
        port = self.serial
        try:
            if port is not None and port.isOpen():
                port.flush()
                port.close()
                print('close success')
        except serial.SerialException as e:
            print('close failed:', e)
        finally:
            self.serial = None

    def bt_reset_clicked(self):
        """Send command0 (reset button)."""
        self.send(self.command.command0())

    def bt_send_config(self):
        """Config button: same action as ``bt_current_config_write``."""
        self.bt_current_config_write()

    def bt_current_config_write(self):
        """Log and send the spin-box value via command2; store it in pid_set."""
        current = ui.sb_current.value()
        ui.txt_content.append("调节IP输入电流(μA):" + str(current))
        self.send(self.command.command2(current))
        self.pid_set = current

    def bt_current_config_read(self):
        """Send command1 and set ``command1_print`` on the command helper."""
        self.send(self.command.command1())
        # NOTE(review): command1() above clears `command_print`, this sets
        # `command1_print`; confirm against Command which name is intended.
        self.command.command1_print = True

    def bt_command3_clicked(self):
        """Send command3."""
        self.send(self.command.command3())

    def bt_command4_clicked(self):
        """Send command4."""
        self.send(self.command.command4())

    def bt_command5_clicked(self):
        """Send command5."""
        self.send(self.command.command5())

    def bt_set_valve_open_clicked(self):
        """Send command12 with status 1 for the selected valve."""
        number, _label, code = array_valve[ui.cb_valve.currentIndex()]
        # Status is 1 for both valve type codes.
        self.send(self.command.command12(code, number, 1))

    def bt_command6_clicked(self):
        """Send command6."""
        self.send(self.command.command6())

    def bt_command8_clicked(self):
        """Send command8."""
        self.send(self.command.command8())

    def bt_command9_clicked(self):
        """Send command9 with the device-address spin-box value as a string."""
        self.send(self.command.command9(str(ui.sb_device_address.value())))

    def bt_command10_clicked(self):
        """Send command10."""
        self.send(self.command.command10())

    def bt_command11_clicked(self):
        """Send command11."""
        self.send(self.command.command11())

    def bt_set_valve_close_clicked(self):
        """Send command12 with the 'close' status for the selected valve."""
        number, _label, code = array_valve[ui.cb_valve.currentIndex()]
        # Type code 0x11 uses status 2; every other code uses False.
        status = 2 if code == 0x11 else False
        self.send(self.command.command12(code, number, status))

    def bt_send_valve_ratio_value_clicked(self):
        """Send the valve-ratio spin-box value via command14."""
        self.send(self.command.command14(ui.sb_valve_ratio_value.value()))

    def bt_read_valve_ratio_value_clicked(self):
        """Send command13."""
        self.send(self.command.command13())

    def bt_command15_clicked(self):
        """Send command15 with the selected direction index and angle."""
        direction = ui.cb_step_motor.currentIndex()
        angle = ui.sb_step_motor.value()
        print(direction, angle)
        self.send(self.command.command15(direction, angle))
|
||
|
||
|
||
if __name__ == '__main__':
    # Script entry point: build the window, load the generated UI into it,
    # attach the controller and run the Qt event loop.
    app = QApplication(sys.argv)
    MainWindow = QMainWindow()

    width = 800
    height = 600
    screen = QApplication.desktop().screenGeometry()
    # Floor division keeps x/y as ints: setGeometry() takes ints, and a
    # float argument raises TypeError on Python 3.10 and later.
    x = (screen.width() - width) // 2
    y = (screen.height() - height) // 2
    MainWindow.setGeometry(x, y, width, height)  # 800x600, centred
    MainWindow.setWindowTitle('EPM 测试工具')
    # A fixed size prevents maximising.
    MainWindow.setFixedSize(MainWindow.width(), MainWindow.height())

    # `ui` must stay a module-level name: MainProcess reads it as a global.
    ui = epm.Ui_Form()
    ui.setupUi(MainWindow)
    process = MainProcess(MainWindow)
    process.init("./epm.dll")

    MainWindow.show()
    sys.exit(app.exec_())
|
||
|
||
|