This repository has been archived on 2025-01-02. You can view files and clone it, but cannot push or open issues or pull requests.
torsion/User/test/gui/main.py

390 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import datetime
import threading
import time
import epm
import serial
import serial.tools.list_ports
from PyQt5.QtCore import *
from PyQt5.QtWidgets import QApplication, QMainWindow, QMessageBox
from command import Command
import pyqtgraph as pg
import numpy as np
import matplotlib.pyplot as plt
import pkg.common as common
from pyqtgraph import examples
# examples.run()
SERIAL_PORT = 'COM10' # 串口号
# 串口速率定义
BAUDRATE = 115200
array_valve = [[1, "两通阀", 0x10], [2, "两通阀", 0x10],
[3, "两通阀", 0x10], [4, "两通阀", 0x10], [5, "两通阀", 0x10], [1, "三通阀", 0x11], [2, "三通阀", 0x11], [3, "三通阀", 0x11], [4, "三通阀", 0x11]]
array_step_motor = [[0,"反转"],[1,"正转"]]
def com_get():
port_list = list(serial.tools.list_ports.comports())
return port_list
class MainProcess(QMainWindow):
# 定义变量
dll = None
command = None
com_list = []
serial = None
serial_read_thread = None
com_port = None
response_si = pyqtSignal(list)
# 给定一个X轴和Y轴的参数列表用作后面承载数据
max_point = 100
obsX = None
obsY = None
pid_count = 0
pid_set = 500
pid_graph = None
pid_si = pyqtSignal()
def set_graph_ui(self):
pg.setConfigOptions(antialias=False) # pyqtgraph全局变量设置函数antialias=True开启曲线抗锯齿
win = pg.GraphicsLayoutWidget() # 创建pg layout可实现数据界面布局自动管理
# pg绘图窗口可以作为一个widget添加到GUI中的graph_layout当然也可以添加到Qt其他所有的容器中
ui.graph_layout.addWidget(win)
p1 = win.addPlot(title="PID") # 添加第一个绘图窗口
p1.setLabel('left', text='', color='#ffffff') # y轴设置函数
p1.showGrid(x=True, y=True) # 栅格设置函数
p1.setLogMode(x=False, y=False) # False代表线性坐标轴True代表对数坐标轴
p1.setLabel('bottom', text='', units='') # x轴设置函数
# p1.addLegend() # 可选择是否添加legend
return p1
def plot_sin_cos(self):
if self.pid_set==0:
self.pid_graph.setYRange(0, 100)
else:
self.pid_graph.setYRange(0, self.pid_set*2)
if self.pid_count > self.max_point:
self.pid_graph.setXRange(self.pid_count-self.max_point, self.pid_count)
else:
self.pid_graph.setXRange(0, self.max_point)
default_pen = pg.mkPen(width=2, color='g')
self.pid_graph.plot(self.obsX.array, self.obsY.array, pen=default_pen, name='', clear=True)
# 在Y轴上画一条水平线
self.pid_graph.addLine(y=self.pid_set, pen=pg.mkPen('y', width=1, style=Qt.DotLine))
def plot_init(self):
self.pid_count = 0
self.pid_set = 0
self.obsX.clear()
self.obsY.clear()
pass
def pyqtgraph_click(self):
examples.run()
pass
def init(self, path):
self.command = Command(path)
ui.bt_pyqtgraph.clicked.connect(self.pyqtgraph_click)
ui.bt_reset.clicked.connect(self.bt_reset_clicked)
ui.bt_com_open.clicked.connect(self.bt_com_open_clicked)
ui.bt_config.clicked.connect(self.bt_send_config)
ui.bt_command3.clicked.connect(self.bt_command3_clicked)
ui.bt_command4.clicked.connect(self.bt_command4_clicked)
ui.bt_command5.clicked.connect(self.bt_command5_clicked)
ui.bt_command6.clicked.connect(self.bt_command6_clicked)
ui.bt_command8.clicked.connect(self.bt_command8_clicked)
ui.bt_command9.clicked.connect(self.bt_command9_clicked)
ui.bt_command10.clicked.connect(self.bt_command10_clicked)
ui.bt_command11.clicked.connect(self.bt_command11_clicked)
ui.bt_set_valve_open.clicked.connect(self.bt_set_valve_open_clicked)
ui.bt_set_valve_close.clicked.connect(self.bt_set_valve_close_clicked)
ui.bt_send_valve_ratio_value.clicked.connect(
self.bt_send_valve_ratio_value_clicked)
ui.bt_read_valve_ratio_value.clicked.connect(
self.bt_read_valve_ratio_value_clicked)
ui.bt_command15.clicked.connect(self.bt_command15_clicked)
ui.bt_content_clear.clicked.connect(ui.txt_content.clear)
ui.radio_ip.clicked.connect(self.plot_init)
ui.radio_valve.clicked.connect(self.plot_init)
self.obsX = common.LimitedArray(self.max_point)
self.obsY = common.LimitedArray(self.max_point)
plt.ion() # 开启一个画图的窗口
# 遍历array_valve并赋值到cb_valve
for i in range(len(array_valve)):
ui.cb_valve.addItem(
array_valve[i][1] + " " + str(array_valve[i][0]))
# 遍历 array_step_motor 并赋值到 cb_step_motor
for i in range(len(array_step_motor)):
ui.cb_step_motor.addItem(
array_step_motor[i][1] + " " + str(array_step_motor[i][0]))
# 参数配置
ui.bt_current_config_write.clicked.connect(
self.bt_current_config_write)
ui.bt_current_config_read.clicked.connect(self.bt_current_config_read)
# 串口号读取
self.cb_com_select_port_clicked()
# 定时器
self.timer = QTimer(self) # 初始化一个定时器
self.command_timer = QTimer(self) # 初始化一个定时器
self.timer.timeout.connect(self.operate) # 计时结束调用operate()方法
self.command_timer.timeout.connect(self.command1) # 计时结束调用operate()方法
self.timer.start(3000) # 设置计时间隔并启动,3秒
self.command_timer.start(200) # 设置计时间隔并启动,100毫秒
# 绑定信号
self.response_si.connect(self.response_done)
self.pid_graph = self.set_graph_ui()
self.pid_si.connect(self.plot_sin_cos)
def operate(self):
# 串口打开判断
if self.serial is not None and self.serial.isOpen():
self.send(self.command.command3())
pass
# 查询IP输入电流或者比例阀
def command1(self):
if self.serial is not None and self.serial.isOpen():
if ui.radio_ip.isChecked():
self.send(self.command.command1())
else:
self.send(self.command.command13())
self.command.command_print = False
self.pid_count += 1
pass
def send(self, data):
if self.serial is None:
# 弹出提示框
QMessageBox(QMessageBox.Warning, '警告', '串口未打开').exec_()
return
self.serial.write(data)
def cb_com_select_port_clicked(self):
ui.cb_com_select_port.clear()
self.com_list = com_get()
if len(self.com_list) >= 0:
for i in range(len(self.com_list)):
s = self.com_list[i][1]
ui.cb_com_select_port.addItem(s)
# 自动打开串口
if self.com_list[i][0] == SERIAL_PORT:
ui.cb_com_select_port.setCurrentIndex(i)
self.bt_com_open_clicked()
def response_done(self, data):
res = self.command.response(data)
if res.msg != '':
ui.txt_content.append(res.msg)
if self.command.cmd == 0x01 or self.command.cmd==0x0d:
if self.pid_count <100:
self.pid_set = self.command.value
self.obsX.append(self.pid_count)
self.obsY.append(self.command.value)
self.pid_si.emit()
def serial_read_thread_func(self):
# 串口读取线程
time.sleep(0.5)
try:
while True:
if self.serial is not None and self.serial.isOpen():
data = self.serial.read(self.serial.inWaiting())
if len(data) > 0:
# common.print_hex_data_space(data)
res = self.command.process(data)
if res is not None and len(res) > 0:
self.response_si.emit(res)
pass
except serial.SerialException as e:
print('exit failed:', e)
finally:
self.serial = None
print('exit')
def bt_com_open_clicked(self): # 点击串口
ui.bt_com_open.setDisabled(True)
if ui.bt_com_open.text() == '打开':
ui.bt_com_open.setText('关闭')
ui.cb_com_select_port.setDisabled(True)
# 打开串口
port_info = ui.cb_com_select_port.currentText()
for i in range(len(self.com_list)):
s = self.com_list[i][1]
if s == port_info:
self.com_port = self.com_list[i][0]
break
self.serial = serial.Serial(self.com_port, BAUDRATE, timeout=0.5)
if self.serial.isOpen():
print('open success')
else:
print('open failed')
self.serial_read_thread = threading.Thread( # 串口读取线程
target=self.serial_read_thread_func)
self.serial_read_thread.setDaemon(True)
self.serial_read_thread.start()
else:
ui.bt_com_open.setText('打开')
ui.cb_com_select_port.setDisabled(False)
# 关闭串口
try:
if self.serial is not None and self.serial.isOpen():
self.serial.flush()
self.serial.close()
print('close success')
except serial.SerialException as e:
print('close failed:', e)
finally:
self.serial = None
ui.bt_com_open.setDisabled(False)
def bt_reset_clicked(self): # 点击复位
self.send(self.command.command0())
def bt_send_config(self):
self.bt_current_config_write()
pass
def bt_current_config_write(self):
# 调节IP输入电流
s = "调节IP输入电流(μA):" + str(ui.sb_current.value())
ui.txt_content.append(s)
self.send(self.command.command2(ui.sb_current.value()))
self.pid_set = ui.sb_current.value()
pass
def bt_current_config_read(self):
# 查询IP输入电流
self.send(self.command.command1())
self.command.command1_print = True
pass
def bt_command3_clicked(self):
self.send(self.command.command3())
pass
def bt_command4_clicked(self):
self.send(self.command.command4())
pass
def bt_command5_clicked(self):
self.send(self.command.command5())
pass
def bt_set_valve_open_clicked(self):
# 获取cb_valve的序号
index = ui.cb_valve.currentIndex()
data = array_valve[index]
status = 1
if data[2] == 0x11:
status = 1
self.send(self.command.command12(data[2], data[0], status))
pass
def bt_command6_clicked(self):
self.send(self.command.command6())
pass
def bt_command8_clicked(self):
self.send(self.command.command8())
pass
def bt_command9_clicked(self):
self.send(self.command.command9(str(ui.sb_device_address.value())))
pass
def bt_command10_clicked(self):
self.send(self.command.command10())
pass
def bt_command11_clicked(self):
self.send(self.command.command11())
pass
def bt_set_valve_close_clicked(self):
index = ui.cb_valve.currentIndex()
data = array_valve[index]
status = False
if data[2] == 0x11:
status = 2
self.send(self.command.command12(data[2], data[0], status))
pass
def bt_send_valve_ratio_value_clicked(self):
value = ui.sb_valve_ratio_value.value()
self.send(self.command.command14(value))
pass
def bt_read_valve_ratio_value_clicked(self):
self.send(self.command.command13())
pass
def bt_command15_clicked(self):
dir = ui.cb_step_motor.currentIndex()
angle = ui.sb_step_motor.value()
print(dir,angle)
self.send(self.command.command15(dir,angle))
pass
if __name__ == '__main__':
app = QApplication(sys.argv)
MainWindow = QMainWindow()
width = 800
height = 600
screen = QApplication.desktop().screenGeometry()
x = (screen.width() - width) / 2
y = (screen.height() - height) / 2
MainWindow.setGeometry(x, y, width, height) # 设置窗口大小为 800x600
MainWindow.setWindowTitle('EPM 测试工具')
# 禁止最大化
MainWindow.setFixedSize(MainWindow.width(), MainWindow.height())
ui = epm.Ui_Form()
ui.setupUi(MainWindow)
process = MainProcess(MainWindow)
process.init("./epm.dll")
MainWindow.show()
sys.exit(app.exec_())