from ctypes import *
from ctypes import cdll

import entity
import pkg.common as common


# Display text for the status codes handled by get_status().
_STATUS_TEXT = {
    0: '未执行',
    1: '执行中',
    2: '准备完毕',
    3: '执行失败',
}


def _require(ret, message):
    """Raise ``AssertionError(message)`` when *ret* is falsy.

    Used instead of a bare ``assert`` so that a failure reported by the
    native library is still detected when Python runs with ``-O`` (which
    strips ``assert`` statements).  The exception type and message are the
    same ones the ``assert`` statements produced.
    """
    if not ret:
        raise AssertionError(message)


def assic_to_str(assic):
    """Convert an iterable of character codes into a string.

    Every element is passed through ``chr`` and the results are joined.
    """
    return ''.join(chr(code) for code in assic)


def get_version(data):
    """Format a packed version number as ``'<data >> 4>.<data & 0x0f>'``."""
    return str(data >> 4) + '.' + str(data & 0x0f)


def get_status(data):
    """Return the display text for a status code.

    Codes 0-3 have dedicated texts; any other value yields '未知状态'.
    """
    return _STATUS_TEXT.get(data, '未知状态')


class Response(object):
    """Container for the message text produced by ``Command.response``."""

    msg = None


class slave_req_query_state_t(Structure):
    """Byte-packed layout used to decode the payload of a 0x03 reply."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("code", c_ubyte * 11),
        ("version", c_ubyte),
        ("status", c_ubyte),
        ("process_index", c_ubyte),
        ("plan_index", c_ubyte),
        ("action_index", c_ubyte),
        ("two_way_valve", c_ubyte),
        ("three_way_valve", c_ubyte),
    ]


class Command(object):
    """Wrapper around the protocol shared library loaded from a given path.

    The ``commandN`` methods fill ``handle.request`` and submit it through
    the library, ``process`` hands received bytes to the library, and
    ``response`` turns a raw reply into a ``Response`` message.
    """

    dll = None
    handle = None
    command_print = True  # when true, response() emits text for 0x01 / 0x0d replies
    cmd = 0  # command code taken from the reply last passed to response()
    value = 0  # float decoded from the last 0x01 / 0x0d reply

    # Replies answered with a fixed text.
    _FIXED_TEXT = {
        0x00: '复位成功',
        0x02: '调节IP输入电流成功',
    }
    # Replies whose payload is rendered through common.get_str_hex().
    _HEX_PREFIX = {
        0x04: '查询流程 ',
        0x08: '读取数据 ',
        0x0a: '查询地址 ',
        0x0e: '设置比例阀电流 ',
    }
    # Replies whose payload carries a float value.
    _FLOAT_LABEL = {
        0x01: '查询IP输入电流',
        0x0d: '查询比例阀输入电流',
    }

    def __init__(self, path):
        """Load the library at *path* and initialise the protocol handle.

        Raises:
            AssertionError: if ``uart_init`` or ``agreement_init`` returns a
                falsy result.
        """
        self.dll = cdll.LoadLibrary(path)
        self.handle = entity.agreement_init()
        self.handle.set_master()
        self.dll.pbuf_initz()
        ret = self.dll.uart_init(self.dll.agreement_master_rsp)
        _require(ret, "uart_init failed")
        ret = self.dll.agreement_init(byref(self.handle))
        _require(ret, "agreement_init failed")

    def _send_request(self, code, failure_message, fill=None):
        """Submit command *code* through the library and return ``handle.response``.

        *fill*, when given, is called with ``handle.request.data`` right
        after the command code has been set so the caller can populate the
        payload.  Everything up to and including the library call runs
        inside ``common.HiddenPrints()``.

        Raises:
            AssertionError: with *failure_message* if the library call
                returns a falsy result.
        """
        with common.HiddenPrints():
            self.handle.request.set_command(code)
            if fill is not None:
                fill(self.handle.request.data)
            ret = self.dll.agreement_master_req(byref(self.handle.request))
        _require(ret, failure_message)
        return self.handle.response

    def process(self, data):
        """Clear ``handle.response``, pass *data* to ``uart_recv_data`` and return the response.

        Raises:
            AssertionError: if ``uart_recv_data`` returns a falsy result.
        """
        self.handle.response.clear()
        with common.HiddenPrints():
            ret = self.dll.uart_recv_data(
                common.bytes_to_ctypes(data), len(data))
        _require(ret, "uart_recv_data failed")
        return self.handle.response

    def command0(self):
        """Send command 0x00 (reset)."""
        return self._send_request(0x00, "reset failed")

    def command1(self):
        """Send command 0x01 (query IP input current)."""
        return self._send_request(0x01, "read_current failed")

    def command2(self, current):
        """Send command 0x02 (adjust IP input current) with *current*."""
        def fill(data):
            data.adjust_ip_input_current.set(current)

        return self._send_request(0x02, "set_current failed", fill)

    def command3(self):
        """Send command 0x03 (query state)."""
        return self._send_request(0x03, "read_status failed")

    def command4(self):
        """Send command 0x04 (query process)."""
        return self._send_request(0x04, "read_flow failed")

    def command5(self):
        """Send command 0x05 (configure process).

        Unlike the other commands this one is submitted through the
        library's ``mock_command_req_config_process`` entry point.

        Raises:
            AssertionError: if that call returns a falsy result.
        """
        with common.HiddenPrints():
            self.handle.request.set_command(0x05)
            ret = self.dll.mock_command_req_config_process()
        _require(ret, "config_flow failed")
        return self.handle.response

    def command6(self):
        """Send command 0x06 (execute process) for process 0, plan 0."""
        def fill(data):
            data.execute_process.process_index = 0x00
            data.execute_process.plan_index = 0x00

        return self._send_request(0x06, "execute_flow failed", fill)

    def command7(self):
        """Send command 0x07 (stop process)."""
        return self._send_request(0x07, "stop_flow failed")

    def command8(self):
        """Send command 0x08 (query data).

        Requests two sensor groups: pressure sensors 1-6 and stepper motor
        sensor 1.
        """
        def fill(data):
            data.query_data.count = 2
            pressure = entity.query_data_t.query_data_sensor_t()
            pressure.sensor_class = entity.SENSOR_PRESSURE
            pressure.sensor_1 = 1
            pressure.sensor_2 = 1
            pressure.sensor_3 = 1
            pressure.sensor_4 = 1
            pressure.sensor_5 = 1
            pressure.sensor_6 = 1
            motor = entity.query_data_t.query_data_sensor_t()
            motor.sensor_class = entity.SENSOR_STEP_MOTOR
            motor.sensor_1 = 1
            data.query_data.set_data((pressure, motor))

        return self._send_request(0x08, "read_data failed", fill)

    def command9(self, address="1"):
        """Send command 0x09 (configure address).

        *address* is a string; it is left-padded with zeros to a width of
        four characters before being written to the request.
        """
        address = address.zfill(4)

        def fill(data):
            data.config_address.set_address(address)

        return self._send_request(0x09, "config_address failed", fill)

    def command10(self):
        """Send command 0x0a (query address)."""
        return self._send_request(0x0a, "read_address failed")

    def command11(self):
        """Send command 0x0b (calibrate sensors).

        Calibrates with state 0 for pressure sensors 1-3 and flow
        sensors 1-2.
        """
        def fill(data):
            data.calibration_sensor.state = 0  # 0: zero position, 1: full scale
            sensor = entity.query_data_t()
            sensor.count = 2
            pressure = entity.query_data_t.query_data_sensor_t()
            pressure.sensor_class = entity.SENSOR_PRESSURE
            pressure.sensor_1 = 1
            pressure.sensor_2 = 1
            pressure.sensor_3 = 1
            flow = entity.query_data_t.query_data_sensor_t()
            flow.sensor_class = entity.SENSOR_FLOW
            flow.sensor_1 = 1
            flow.sensor_2 = 1
            sensor.set_data((pressure, flow))
            data.calibration_sensor.sensor_data = sensor

        return self._send_request(0x0b, "calibration_sensor failed", fill)

    def command12(self, valve_type=0x10, valve_index=1, status=0):
        """Send command 0x0c (set valve state).

        Writes *valve_type* to ``set_valve.unit``, *status* to
        ``set_valve.status`` and *valve_index* to ``set_valve.index``.
        """
        def fill(data):
            data.set_valve.unit = valve_type
            data.set_valve.status = status
            data.set_valve.index = valve_index

        return self._send_request(0x0C, "set_valve failed", fill)

    def command13(self):
        """Send command 0x0d (its reply is decoded as a proportional-valve current)."""
        return self._send_request(0x0D, "read_valve failed")

    def command14(self, value=0):
        """Send command 0x0e, writing *value* to ``set_valve_ratio``."""
        def fill(data):
            data.set_valve_ratio = value
            # NOTE(review): debug print kept from the original; it runs inside
            # common.HiddenPrints(), so it is presumably suppressed - confirm.
            print(data.set_valve_ratio)

        return self._send_request(0x0E, "send_valve failed", fill)

    def command15(self, dir, angle):
        """Send command 0x0f, writing *dir* and *angle* to ``stepper_motor``."""
        def fill(data):
            data.stepper_motor.dir = dir
            data.stepper_motor.angle = angle

        # The original reported "read_valve failed" here, a message copied
        # from command13 that pointed at the wrong command.
        return self._send_request(0x0F, "stepper_motor failed", fill)

    @staticmethod
    def _decode_float(payload):
        """Decode the float carried by a 0x01 / 0x0d reply payload.

        Drops the first four payload bytes, reverses the remainder and
        copies ``sizeof(c_float)`` bytes from the start of the reversed
        buffer into a ``c_float``.
        """
        # NOTE(review): the copy length is not checked against the payload
        # length; presumably common.bytes_to_ctypes() returns a buffer of at
        # least four bytes here - confirm.
        number = c_float()
        reversed_tail = payload[4:][::-1]
        memmove(addressof(number), common.bytes_to_ctypes(reversed_tail),
                sizeof(c_float))
        return number.value

    @staticmethod
    def _describe_state(payload):
        """Decode a 0x03 reply payload and return its description text."""
        # NOTE(review): sizeof(slave_req_query_state_t) bytes are copied
        # regardless of len(payload) - confirm the payload is always long
        # enough.
        state = slave_req_query_state_t()
        memmove(addressof(state), common.bytes_to_ctypes(payload),
                sizeof(slave_req_query_state_t))
        return (
            '查询状态'
            + ' 标识' + assic_to_str(state.code)
            + ' 版本号:' + get_version(state.version)
            + ' 状态:' + get_status(state.status)
            + ' 流程:' + str(state.process_index)
            + ' 方案:' + str(state.plan_index)
            + ' 动作:' + str(state.action_index)
            + ' 两通阀状态:' + str(state.two_way_valve)
            + ' 三通阀状态:' + str(state.three_way_valve)
        )

    def response(self, data):
        """Translate a raw reply into a ``Response``.

        The command code is ``data[0] - 0x80`` and is stored in
        ``self.cmd``; the remaining bytes are the payload.  For 0x01 and
        0x0d replies the decoded float is stored in ``self.value`` and the
        message text is only produced when ``self.command_print`` is true.
        Codes without dedicated handling yield '发送成功'.
        """
        rs = Response()
        cmd = data[0] - 0x80
        self.cmd = cmd
        payload = data[1:]

        if cmd in self._FLOAT_LABEL:
            number = self._decode_float(payload)
            if self.command_print:
                text = self._FLOAT_LABEL[cmd] + ' ' + str(int(number))
            else:
                text = ''
            # Assigned after the text is built, so a failing int() conversion
            # leaves the previous value untouched.
            self.value = number
        elif cmd == 0x03:
            # NOTE(review): the state description is built and then discarded;
            # the original blanked the message right after composing it.
            # Confirm whether the text should be returned instead.
            self._describe_state(payload)
            text = ''
        elif cmd in self._HEX_PREFIX:
            text = self._HEX_PREFIX[cmd] + common.get_str_hex(payload)
        else:
            text = self._FIXED_TEXT.get(cmd, '发送成功')

        rs.msg = text
        return rs