"""EPM test tool: a PyQt5 front end that talks to a device over a serial port.

The window layout comes from ``epm.Ui_Form``; frames are built and parsed by
``command.Command``.  Values returned by the periodic query are plotted with
pyqtgraph.
"""

import os
import sys
import datetime
import threading
import time

import serial
import serial.tools.list_ports
from PyQt5.QtCore import *
from PyQt5.QtWidgets import QApplication, QMainWindow, QMessageBox
import pyqtgraph as pg
import numpy as np
import matplotlib.pyplot as plt
from pyqtgraph import examples

import epm
from command import Command
import pkg.common as common

SERIAL_PORT = 'COM10'  # Port that is opened automatically when it is present.
BAUDRATE = 115200  # Serial baud rate.

# Each entry is [valve number, label, code]; the labels show that code 0x10 is
# used for two-way valves and 0x11 for three-way valves.
array_valve = [
    [1, "两通阀", 0x10],
    [2, "两通阀", 0x10],
    [3, "两通阀", 0x10],
    [4, "两通阀", 0x10],
    [5, "两通阀", 0x10],
    [1, "三通阀", 0x11],
    [2, "三通阀", 0x11],
    [3, "三通阀", 0x11],
    [4, "三通阀", 0x11],
]
# Each entry is [direction index, label] (labels: reverse / forward).
array_step_motor = [[0, "反转"], [1, "正转"]]


def com_get():
    """Return the serial ports currently reported by pyserial as a list."""
    return list(serial.tools.list_ports.comports())


class MainProcess(QMainWindow):
    """Glue between the generated UI (module-level ``ui``) and the serial link."""

    # Shared defaults; the instance overwrites them in init() and at runtime.
    dll = None
    command = None
    com_list = []
    serial = None
    serial_read_thread = None
    com_port = None
    # Emitted from the reader thread with the parsed frame so that the UI is
    # only touched from the Qt thread.
    response_si = pyqtSignal(list)
    # X/Y buffers that carry the plotted data, capped at max_point samples.
    max_point = 100
    obsX = None
    obsY = None
    pid_count = 0
    pid_set = 500
    pid_graph = None
    pid_si = pyqtSignal()

    def set_graph_ui(self):
        """Create the pyqtgraph plot inside ``ui.graph_layout`` and return it."""
        # Global pyqtgraph option; antialias=True would enable curve smoothing.
        pg.setConfigOptions(antialias=False)
        # A GraphicsLayoutWidget manages the plot layout and is an ordinary
        # widget, so it can be added to any Qt container.
        win = pg.GraphicsLayoutWidget()
        ui.graph_layout.addWidget(win)
        p1 = win.addPlot(title="PID")
        p1.setLabel('left', text='', color='#ffffff')  # y axis
        p1.showGrid(x=True, y=True)
        p1.setLogMode(x=False, y=False)  # False = linear axis, True = log axis
        p1.setLabel('bottom', text='', units='次')  # x axis
        # p1.addLegend() can be called here if a legend is wanted.
        return p1

    def plot_sin_cos(self):
        """Redraw the curve from obsX/obsY and the horizontal set-point line."""
        if self.pid_set == 0:
            self.pid_graph.setYRange(0, 100)
        else:
            self.pid_graph.setYRange(0, self.pid_set * 2)
        # Scroll the x window once more than max_point samples were counted.
        if self.pid_count > self.max_point:
            self.pid_graph.setXRange(
                self.pid_count - self.max_point, self.pid_count)
        else:
            self.pid_graph.setXRange(0, self.max_point)
        default_pen = pg.mkPen(width=2, color='g')
        self.pid_graph.plot(self.obsX.array, self.obsY.array,
                            pen=default_pen, name='', clear=True)
        # Horizontal dotted line at the set point.
        self.pid_graph.addLine(
            y=self.pid_set, pen=pg.mkPen('y', width=1, style=Qt.DotLine))

    def plot_init(self):
        """Reset the sample counter, the set point and both data buffers."""
        self.pid_count = 0
        self.pid_set = 0
        self.obsX.clear()
        self.obsY.clear()

    def pyqtgraph_click(self):
        """Launch the pyqtgraph example browser."""
        examples.run()

    def init(self, path):
        """Wire the UI to this object and start the periodic timers.

        Args:
            path: Passed unchanged to ``Command``.
        """
        self.command = Command(path)

        ui.bt_pyqtgraph.clicked.connect(self.pyqtgraph_click)
        ui.bt_reset.clicked.connect(self.bt_reset_clicked)
        ui.bt_com_open.clicked.connect(self.bt_com_open_clicked)
        ui.bt_config.clicked.connect(self.bt_send_config)
        ui.bt_command3.clicked.connect(self.bt_command3_clicked)
        ui.bt_command4.clicked.connect(self.bt_command4_clicked)
        ui.bt_command5.clicked.connect(self.bt_command5_clicked)
        ui.bt_command6.clicked.connect(self.bt_command6_clicked)
        ui.bt_command8.clicked.connect(self.bt_command8_clicked)
        ui.bt_command9.clicked.connect(self.bt_command9_clicked)
        ui.bt_command10.clicked.connect(self.bt_command10_clicked)
        ui.bt_command11.clicked.connect(self.bt_command11_clicked)
        ui.bt_set_valve_open.clicked.connect(self.bt_set_valve_open_clicked)
        ui.bt_set_valve_close.clicked.connect(self.bt_set_valve_close_clicked)
        ui.bt_send_valve_ratio_value.clicked.connect(
            self.bt_send_valve_ratio_value_clicked)
        ui.bt_read_valve_ratio_value.clicked.connect(
            self.bt_read_valve_ratio_value_clicked)
        ui.bt_command15.clicked.connect(self.bt_command15_clicked)
        ui.bt_content_clear.clicked.connect(ui.txt_content.clear)
        ui.radio_ip.clicked.connect(self.plot_init)
        ui.radio_valve.clicked.connect(self.plot_init)

        self.obsX = common.LimitedArray(self.max_point)
        self.obsY = common.LimitedArray(self.max_point)
        plt.ion()  # Turn on matplotlib interactive mode.

        # Fill the valve and step-motor combo boxes as "<label> <number>".
        for number, label, _code in array_valve:
            ui.cb_valve.addItem(label + " " + str(number))
        for number, label in array_step_motor:
            ui.cb_step_motor.addItem(label + " " + str(number))

        # Parameter configuration buttons.
        ui.bt_current_config_write.clicked.connect(
            self.bt_current_config_write)
        ui.bt_current_config_read.clicked.connect(self.bt_current_config_read)

        # Enumerate the serial ports (auto-opens SERIAL_PORT when found).
        self.cb_com_select_port_clicked()

        # Periodic work: operate() every 3 s, command1() every 200 ms.
        self.timer = QTimer(self)
        self.command_timer = QTimer(self)
        self.timer.timeout.connect(self.operate)
        self.command_timer.timeout.connect(self.command1)
        self.timer.start(3000)
        self.command_timer.start(200)

        # Signal wiring.
        self.response_si.connect(self.response_done)
        self.pid_graph = self.set_graph_ui()
        self.pid_si.connect(self.plot_sin_cos)

    def operate(self):
        """Timer slot: send command 3 while the port is open."""
        if self.serial is not None and self.serial.isOpen():
            self.send(self.command.command3())

    def command1(self):
        """Timer slot: query the IP input current or the proportional valve."""
        if self.serial is not None and self.serial.isOpen():
            if ui.radio_ip.isChecked():
                self.send(self.command.command1())
            else:
                self.send(self.command.command13())
            # NOTE(review): bt_current_config_read sets ``command1_print``
            # while this sets ``command_print`` -- confirm which attribute
            # Command actually reads.
            # NOTE(review): nesting of the next two lines under the port check
            # was inferred when re-indenting the file -- confirm.
            self.command.command_print = False
            self.pid_count += 1

    def send(self, data):
        """Write ``data`` to the serial port, warning when it is not open.

        Args:
            data: Frame to write, as produced by ``self.command``.
        """
        # Work on a local reference: the reader thread may clear self.serial.
        port = self.serial
        if port is None or not port.isOpen():
            QMessageBox(QMessageBox.Warning, '警告', '串口未打开').exec_()
            return
        try:
            port.write(data)
        except serial.SerialException as e:
            # Only log: this runs from 200 ms timers, so a modal dialog per
            # failure would stack up.
            print('send failed:', e)

    def cb_com_select_port_clicked(self):
        """Refresh the port combo box and auto-open SERIAL_PORT if present."""
        ui.cb_com_select_port.clear()
        self.com_list = com_get()
        for index, port in enumerate(self.com_list):
            # Index 0 is compared against the port name, index 1 is displayed.
            ui.cb_com_select_port.addItem(port[1])
            if port[0] == SERIAL_PORT:
                ui.cb_com_select_port.setCurrentIndex(index)
                self.bt_com_open_clicked()

    def response_done(self, data):
        """Slot for ``response_si``: log the reply and extend the plot data."""
        res = self.command.response(data)
        if res.msg != '':
            ui.txt_content.append(res.msg)
        if self.command.cmd == 0x01 or self.command.cmd == 0x0d:
            # NOTE(review): only the pid_set assignment is placed under the
            # ``< 100`` check; inferred when re-indenting the file -- confirm.
            if self.pid_count < 100:
                self.pid_set = self.command.value
            self.obsX.append(self.pid_count)
            self.obsY.append(self.command.value)
            self.pid_si.emit()

    def serial_read_thread_func(self):
        """Reader-thread body: poll the port and emit every parsed frame.

        The loop ends as soon as the port this thread was started for is
        closed or replaced, so closing and reopening does not leave stale
        reader threads behind.
        """
        # Bind to the port that was current when the thread was started.
        port = self.serial
        time.sleep(0.5)
        try:
            while port is not None and self.serial is port and port.isOpen():
                waiting = port.inWaiting()
                if not waiting:
                    # Nothing buffered: yield instead of spinning on read(0).
                    time.sleep(0.01)
                    continue
                data = port.read(waiting)
                if len(data) > 0:
                    # Debug aid: common.print_hex_data_space(data)
                    res = self.command.process(data)
                    if res is not None and len(res) > 0:
                        self.response_si.emit(res)
        except (serial.SerialException, OSError) as e:
            print('exit failed:', e)
        finally:
            # Do not clobber a port that was opened after this one.
            if self.serial is port:
                self.serial = None
            print('exit')

    def bt_com_open_clicked(self):
        """Toggle the serial port according to the button caption."""
        ui.bt_com_open.setDisabled(True)
        try:
            if ui.bt_com_open.text() == '打开':
                self._open_selected_port()
            else:
                self._close_port()
        finally:
            # Always re-enable the button, even when opening failed.
            ui.bt_com_open.setDisabled(False)

    def _open_selected_port(self):
        """Open the port chosen in the combo box and start the reader thread."""
        port_info = ui.cb_com_select_port.currentText()
        for port in self.com_list:
            if port[1] == port_info:
                self.com_port = port[0]
                break
        try:
            opened = serial.Serial(self.com_port, BAUDRATE, timeout=0.5)
        except (serial.SerialException, ValueError) as e:
            print('open failed:', e)
            opened = None
        if opened is None or not opened.isOpen():
            if opened is not None:
                print('open failed')
            # Leave the UI in its "closed" state so the user can retry.
            self.serial = None
            QMessageBox(QMessageBox.Warning, '警告', '串口打开失败').exec_()
            return
        print('open success')
        self.serial = opened
        ui.bt_com_open.setText('关闭')
        ui.cb_com_select_port.setDisabled(True)
        self.serial_read_thread = threading.Thread(
            target=self.serial_read_thread_func, daemon=True)
        self.serial_read_thread.start()

    def _close_port(self):
        """Close the current port and restore the "closed" UI state."""
        ui.bt_com_open.setText('打开')
        ui.cb_com_select_port.setDisabled(False)
        # Detach first so the timers and the reader thread stop using it.
        port, self.serial = self.serial, None
        try:
            if port is not None and port.isOpen():
                port.flush()
                port.close()
                print('close success')
        except serial.SerialException as e:
            print('close failed:', e)

    def bt_reset_clicked(self):
        """Reset button: send command 0."""
        self.send(self.command.command0())

    def bt_send_config(self):
        """Config button: same action as writing the current setting."""
        self.bt_current_config_write()

    def bt_current_config_write(self):
        """Send the IP input current from ``ui.sb_current`` and log it."""
        current = ui.sb_current.value()
        ui.txt_content.append("调节IP输入电流(μA):" + str(current))
        self.send(self.command.command2(current))
        self.pid_set = ui.sb_current.value()

    def bt_current_config_read(self):
        """Query the IP input current (command 1)."""
        self.send(self.command.command1())
        # NOTE(review): command1() above clears ``command_print``; this sets
        # ``command1_print`` -- confirm the intended attribute name.
        self.command.command1_print = True

    def bt_command3_clicked(self):
        """Send command 3."""
        self.send(self.command.command3())

    def bt_command4_clicked(self):
        """Send command 4."""
        self.send(self.command.command4())

    def bt_command5_clicked(self):
        """Send command 5."""
        self.send(self.command.command5())

    def bt_set_valve_open_clicked(self):
        """Send command 12 with status 1 for the valve selected in cb_valve."""
        number, _label, code = array_valve[ui.cb_valve.currentIndex()]
        status = 1  # The same status is sent for both valve codes.
        self.send(self.command.command12(code, number, status))

    def bt_command6_clicked(self):
        """Send command 6."""
        self.send(self.command.command6())

    def bt_command8_clicked(self):
        """Send command 8."""
        self.send(self.command.command8())

    def bt_command9_clicked(self):
        """Send command 9 with the value of ``ui.sb_device_address`` as text."""
        self.send(self.command.command9(str(ui.sb_device_address.value())))

    def bt_command10_clicked(self):
        """Send command 10."""
        self.send(self.command.command10())

    def bt_command11_clicked(self):
        """Send command 11."""
        self.send(self.command.command11())

    def bt_set_valve_close_clicked(self):
        """Send command 12 with the "close" status for the selected valve."""
        number, _label, code = array_valve[ui.cb_valve.currentIndex()]
        # NOTE(review): code 0x10 gets the bool False here while the open
        # path sends the int 1 -- confirm command12 accepts a bool.
        status = False
        if code == 0x11:
            status = 2
        self.send(self.command.command12(code, number, status))

    def bt_send_valve_ratio_value_clicked(self):
        """Send command 14 with the value of ``ui.sb_valve_ratio_value``."""
        value = ui.sb_valve_ratio_value.value()
        self.send(self.command.command14(value))

    def bt_read_valve_ratio_value_clicked(self):
        """Send command 13 (the query used for the proportional valve)."""
        self.send(self.command.command13())

    def bt_command15_clicked(self):
        """Send command 15 with the selected step-motor direction and angle."""
        direction = ui.cb_step_motor.currentIndex()
        angle = ui.sb_step_motor.value()
        print(direction, angle)
        self.send(self.command.command15(direction, angle))


if __name__ == '__main__':
    app = QApplication(sys.argv)
    MainWindow = QMainWindow()
    width = 800
    height = 600
    screen = QApplication.desktop().screenGeometry()
    # Integer division: setGeometry() takes ints and recent Python versions
    # no longer convert floats implicitly.
    x = (screen.width() - width) // 2
    y = (screen.height() - height) // 2
    MainWindow.setGeometry(x, y, width, height)  # Centered 800x600 window.
    MainWindow.setWindowTitle('EPM 测试工具')
    # Fixed size, which also prevents maximizing.
    MainWindow.setFixedSize(MainWindow.width(), MainWindow.height())
    # ``ui`` must stay a module-level name: MainProcess reads it as a global.
    ui = epm.Ui_Form()
    ui.setupUi(MainWindow)
    process = MainProcess(MainWindow)
    process.init("./epm.dll")
    MainWindow.show()
    sys.exit(app.exec_())