from ctypes import *
from pkg.common import bytes_to_ctypes, print_hex_data_space

# Sensor class identifiers.
SENSOR_PRESSURE = 0x01  # pressure sensor
SENSOR_FLOW = 0x02  # flow sensor
SENSOR_TEMPERATURE = 0x03  # temperature sensor
SENSOR_LASER = 0x04  # laser sensor
SENSOR_MINOR_LOOP = 0x05  # minor loop
SENSOR_PROPORTIONAL_VALVE = 0x06  # proportional valve
SENSOR_STEP_MOTOR = 0x07  # stepper motor
SENSOR_PT100_TEMPERATURE = 0x08  # PT100 temperature sensor


class config_address_t(Structure):
    """Data field holding a two-byte address."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [("address", c_ubyte * 2)]

    def set_address(self, address="0001"):
        """Store ``address``, given as a hex string, in the ``address`` field.

        The hex string is decoded with ``bytearray.fromhex`` and converted by
        ``bytes_to_ctypes`` before assignment.
        """
        raw = bytearray.fromhex(address)
        self.address = bytes_to_ctypes(raw)


class execute_process_t(Structure):
    """Data field carrying a process index and a plan index (one byte each)."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("process_index", c_ubyte),
        ("plan_index", c_ubyte),
    ]


class query_data_t(Structure):
    """Data field with a one-byte count followed by two sensor selectors."""

    class query_data_sensor_t(Structure):
        """One sensor class byte followed by eight 1-bit flags.

        The flags are named ``sensor_1`` .. ``sensor_8`` and are declared as
        1-bit ``c_uint8`` bit fields, in that order.
        """

        _pack_ = 1  # 1-byte alignment
        _fields_ = [("sensor_class", c_ubyte)] + [
            ("sensor_%d" % bit, c_uint8, 1) for bit in range(1, 9)
        ]

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("count", c_ubyte),
        ("data", query_data_sensor_t * 2),
    ]

    def set_data(self, data):
        """Assign ``data`` to the ``data`` array field."""
        self.data = data


class calibration_sensor_t(BigEndianStructure):
    """Calibration sensor data field: a state byte plus a ``query_data_t``."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("state", c_ubyte),
        ("sensor_data", query_data_t),
    ]

    def set_calibration_data(self, data=0.0):
        """Store ``data`` on the instance as ``calibration_data``.

        NOTE(review): ``calibration_data`` is not listed in ``_fields_``, so
        this only sets a plain Python attribute and does not change the packed
        bytes of the structure. Confirm whether a field is missing.
        """
        self.calibration_data = data


class stepper_motor_t(Structure):
    """Set-motor data field: a direction byte and a float angle.

    NOTE(review): this uses native byte order, while the other float-carrying
    structures visible in this file derive from ``BigEndianStructure`` --
    confirm that this is intended.
    """

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("dir", c_uint8),
        ("angle", c_float),
    ]


class adjust_ip_pwm_duty_t(BigEndianStructure):
    """Data field holding a single big-endian float, ``percent``.

    NOTE(review): the original comment described this as the "adjust input
    current" data field; the class name suggests PWM duty -- confirm.
    """

    _pack_ = 1  # 1-byte alignment
    _fields_ = [("percent", c_float)]


class ip_mode_t(BigEndianStructure):
    """Set-I/P-mode data field: mode, data length and one data byte."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("mode", c_uint8),
        ("data_length", c_uint8),
        ("data", c_uint8),
    ]


class set_valve_t(Structure):
    """Set-valve data field: unit, status and index (one byte each)."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("unit", c_ubyte),
        ("status", c_ubyte),
        ("index", c_ubyte),
    ]


class query_valve_ratio_t(Structure):
    """Query-proportional-valve data field: a single byte, ``no``."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [("no", c_ubyte)]
class set_valve_ratio_t(Structure):
    """Set-proportional-valve data field.

    NOTE(review): native byte order with a ``c_float`` member, while
    ``adjust_ip_input_current_t`` below is big-endian -- confirm intended.
    """

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("value_no", c_ubyte),
        ("value", c_float),
        ("pid_sensor_class", c_ubyte),
        ("pid_sensor_no", c_ubyte),
    ]


class adjust_ip_input_current_t(BigEndianStructure):
    """Adjust-input-current data field: a single big-endian float, ``value``."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [("value", c_float)]

    def set(self, data=0.0):
        """Assign ``data`` to the ``value`` field."""
        self.value = data


class command_req_data_u(Union):
    """Union of every request data field; all members share the same bytes."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("adjust_ip_input_current", adjust_ip_input_current_t),
        ("config_address", config_address_t),
        ("execute_process", execute_process_t),
        ("query_data", query_data_t),
        ("calibration_sensor", calibration_sensor_t),
        ("set_valve", set_valve_t),
        ("query_valve_ratio", query_valve_ratio_t),
        ("set_valve_ratio", set_valve_ratio_t),
        ("stepper_motor", stepper_motor_t),
        ("adjust_ip_pwm_duty", adjust_ip_pwm_duty_t),
        ("ip_mode", ip_mode_t),
    ]


class command_req_t(Structure):
    """Request frame: source address, destination address, command, data.

    ``data`` is declared anonymous, so the members of ``command_req_data_u``
    are reachable directly on the request instance.
    """

    _pack_ = 1  # 1-byte alignment
    _anonymous_ = ("data",)
    _fields_ = [
        ("src", c_ubyte * 2),
        ("dst", c_ubyte * 2),
        ("command", c_ubyte),
        ("data", command_req_data_u),
    ]

    def __init__(self, *args, **kwargs) -> None:
        """Create a request with default addresses.

        ``src`` defaults to ``ff ff`` and ``dst`` to ``00 01``. Any positional
        or keyword field initialisers are forwarded to
        ``Structure.__init__`` afterwards, so explicitly passed values
        override the defaults. Calling with no arguments behaves as before.
        """
        self.src = (0xFF, 0xFF)
        self.dst = (0x00, 0x01)
        super().__init__(*args, **kwargs)

    def set_src(self, address="ffff"):
        """Set ``src`` from a hex string (decoded via ``bytearray.fromhex``)."""
        self.src = bytes_to_ctypes(bytearray.fromhex(address))

    def set_dst(self, address="0001"):
        """Set ``dst`` from a hex string (decoded via ``bytearray.fromhex``)."""
        self.dst = bytes_to_ctypes(bytearray.fromhex(address))

    def set_command(self, command):
        """Set the one-byte ``command`` field."""
        self.command = c_uint8(command)


# C callback prototype: takes a pointer to unsigned bytes and an unsigned
# short (response_cb uses it as the number of bytes to read); the return type
# is declared as c_void_p.
response_call_func = CFUNCTYPE(c_void_p, POINTER(c_ubyte), c_ushort)


def agreement_init():
    """Build and return a handle whose ``response_call`` field is wired up.

    A fresh ``handle_t`` structure type is defined on every call, one instance
    is created, and its ``response_call`` field is set to a C callback that
    invokes the instance's ``response_cb`` method.

    Returns:
        The new ``handle_t`` instance.
    """

    class handle_t(Structure):
        """Handle with a role byte (``slave``) and a response callback."""

        _pack_ = 1  # 1-byte alignment
        _fields_ = [
            ("slave", c_ubyte),
            ("response_call", response_call_func),
        ]

        # Class-level defaults. response_cb rebinds ``response`` on the
        # instance rather than mutating this list.
        response = []
        request = command_req_t()

        def set_master(self):
            """Mark the handle as master (``slave`` = 0)."""
            self.slave = 0

        def set_slave(self):
            """Mark the handle as slave (``slave`` = 1)."""
            self.slave = 1

        def response_cb(self, data, length):
            """Store the first ``length`` bytes of ``data`` and print them."""
            payload = data[:length]
            self.response = payload
            print_hex_data_space(payload, False)

    handle = handle_t()
    callback = response_call_func(handle.response_cb)
    handle.response_call = callback
    # ctypes requires callback objects to stay referenced while C code may
    # call them; keep an explicit reference tied to the handle's lifetime.
    handle._response_callback = callback
    return handle