This repository has been archived on 2025-01-02. You can view files and clone it, but cannot push or open issues or pull requests.
torsion/User/test/gui/entity.py

164 lines
4.3 KiB
Python

from ctypes import *
from pkg.common import bytes_to_ctypes, print_hex_data_space
# Device class codes. Presumably the values carried in a `sensor_class`
# byte of a query -- TODO confirm against the protocol definition.
SENSOR_PRESSURE = 0x01 # pressure sensor
SENSOR_FLOW = 0x02 # flow sensor
SENSOR_TEMPERATURE = 0x03 # temperature sensor
SENSOR_LASER = 0x04 # laser sensor
SENSOR_MINOR_LOOP = 0x05 # minor loop
SENSOR_PROPORTIONAL_VALVE = 0x06 # proportional valve
SENSOR_STEP_MOTOR = 0x07 # stepper motor
class config_address_t(Structure):
    """Data field holding a 2-byte address.

    Used as the ``config_address`` member of the request data union.
    """

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("address", c_ubyte * 2),
    ]

    def set_address(self, address="0001"):
        """Store *address* in the ``address`` field.

        Args:
            address: Hex string decoded with ``bytearray.fromhex`` and then
                converted by ``bytes_to_ctypes``. The field is two bytes
                wide, so four hex digits are expected.
        """
        self.address = bytes_to_ctypes(bytearray.fromhex(address))
class execute_process_t(Structure):
    """Data field with a process index and a plan index, one byte each."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("process_index", c_ubyte),
        ("plan_index", c_ubyte),
    ]
class query_data_t(Structure):
    """Data field of a query: a count byte followed by two sensor entries."""

    class query_data_sensor_t(Structure):
        """One entry: a sensor class byte plus eight 1-bit flags.

        The flags ``sensor_1`` .. ``sensor_8`` presumably select individual
        sensors of the given class -- TODO confirm against the protocol.
        """

        _pack_ = 1  # 1-byte alignment
        _fields_ = [
            ("sensor_class", c_ubyte),
            # Eight single-bit fields declared on c_uint8.
            ("sensor_1", c_uint8, 1),
            ("sensor_2", c_uint8, 1),
            ("sensor_3", c_uint8, 1),
            ("sensor_4", c_uint8, 1),
            ("sensor_5", c_uint8, 1),
            ("sensor_6", c_uint8, 1),
            ("sensor_7", c_uint8, 1),
            ("sensor_8", c_uint8, 1),
        ]

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("count", c_ubyte),
        ("data", query_data_sensor_t * 2),
    ]

    def set_data(self, data):
        """Assign *data* to the ``data`` field.

        Args:
            data: Value accepted by ctypes for a two-element array of
                ``query_data_sensor_t``.
        """
        self.data = data
class calibration_sensor_t(BigEndianStructure):
    """Calibration-sensor data field: a state byte plus a sensor selection."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("state", c_ubyte),
        ("sensor_data", query_data_t),
    ]

    def set_calibration_data(self, data=0.0):
        """Record *data* on the instance as ``calibration_data``.

        NOTE(review): ``calibration_data`` is not declared in ``_fields_``,
        so this creates an ordinary Python attribute and leaves the
        structure's bytes unchanged. Confirm whether a field is missing.

        Args:
            data: Value to remember; defaults to ``0.0``.
        """
        self.calibration_data = data
class stepper_motor_t(Structure):
    """Stepper-motor data field: a direction byte followed by an angle."""

    _pack_ = 1  # 1-byte alignment, so the float directly follows the byte
    _fields_ = [
        ("dir", c_uint8),
        ("angle", c_float),
    ]
class set_valve_t(Structure):
    """Set-valve data field: unit, status and index, one byte each."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("unit", c_ubyte),
        ("status", c_ubyte),
        ("index", c_ubyte),
    ]
class adjust_ip_input_current_t(BigEndianStructure):
    """Adjust-input-current data field: one big-endian 32-bit float."""

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("value", c_float),
    ]

    def set(self, data=0.0):
        """Store *data* in the ``value`` field.

        Args:
            data: Number to store; defaults to ``0.0``.
        """
        self.value = data
class command_req_data_u(Union):
    """Overlay of the command-specific data fields a request can carry.

    Being a union, all members share the same storage. Which member is
    meaningful is presumably determined by the request's command byte --
    TODO confirm against the protocol.
    """

    _pack_ = 1  # 1-byte alignment
    _fields_ = [
        ("adjust_ip_input_current", adjust_ip_input_current_t),
        ("config_address", config_address_t),
        ("execute_process", execute_process_t),
        ("query_data", query_data_t),
        ("calibration_sensor", calibration_sensor_t),
        ("set_valve", set_valve_t),
        ("set_valve_ratio", c_float),
        ("stepper_motor", stepper_motor_t),
    ]
class command_req_t(Structure):
    """Request frame: source address, destination address, command, data.

    The ``data`` union is listed in ``_anonymous_``, so its members are
    reachable directly on the request (for example ``req.set_valve``).
    """

    _pack_ = 1  # 1-byte alignment
    _anonymous_ = ("data",)
    _fields_ = [
        ("src", c_ubyte * 2),
        ("dst", c_ubyte * 2),
        ("command", c_ubyte),
        ("data", command_req_data_u),
    ]

    def __init__(self, *args, **kwargs) -> None:
        """Create a request with ``src`` = ff ff and ``dst`` = 00 01.

        Positional and keyword arguments are forwarded to
        ``Structure.__init__`` so fields can also be given at construction.

        Args:
            *args: Field values in ``_fields_`` order.
            **kwargs: Field values by name.
        """
        # Defaults go in first so explicitly supplied values override them.
        self.src = (0xFF, 0xFF)
        self.dst = (0x00, 0x01)
        super().__init__(*args, **kwargs)

    def set_src(self, address="ffff"):
        """Set the source address from a hex string (four hex digits)."""
        self.src = bytes_to_ctypes(bytearray.fromhex(address))

    def set_dst(self, address="0001"):
        """Set the destination address from a hex string (four hex digits)."""
        self.dst = bytes_to_ctypes(bytearray.fromhex(address))

    def set_command(self, command):
        """Set the one-byte command code.

        Args:
            command: Integer command value; it is wrapped in ``c_uint8``
                before being stored.
        """
        self.command = c_uint8(command)
# Callback prototype: takes a pointer to unsigned bytes and an unsigned
# short (used as the byte count by the callback defined in this file).
# NOTE(review): the result type is declared as c_void_p; if the C side
# expects a plain `void` callback, `None` would be the usual restype -- confirm.
response_call_func = CFUNCTYPE(c_void_p, POINTER(c_ubyte), c_ushort)
def agreement_init():
    """Create a protocol handle and wire up its response callback.

    Returns:
        A ``handle_t`` instance whose ``response_call`` field is a
        ``response_call_func`` wrapping the instance's ``response_cb``.
    """

    class handle_t(Structure):
        """Handle with a role flag and a response callback pointer."""

        _pack_ = 1  # 1-byte alignment
        _fields_ = [
            ("slave", c_ubyte),
            ("response_call", response_call_func),
        ]

        # Plain Python attributes; they are not part of the C layout.
        response = []  # rebound to the latest payload by response_cb
        request = command_req_t()

        def set_master(self):
            """Mark the handle as master (``slave`` = 0)."""
            self.slave = 0

        def set_slave(self):
            """Mark the handle as slave (``slave`` = 1)."""
            self.slave = 1

        def response_cb(self, data, length):
            """Keep the received bytes and print them as hex.

            Args:
                data: Pointer to the received bytes.
                length: Number of bytes to read from *data*.
            """
            # Slice once; the same list is both stored and printed.
            payload = data[:length]
            self.response = payload
            print_hex_data_space(payload, False)

    handle = handle_t()
    handle.response_call = response_call_func(handle.response_cb)
    return handle