This repository has been archived on 2025-01-02. You can view files and clone it, but cannot push or open issues or pull requests.
torsion/User/test/gui/command.py

288 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from ctypes import *
from ctypes import cdll
import entity
import pkg.common as common
# assic数组转字符串
def assic_to_str(assic):
return ''.join([chr(i) for i in assic])
def get_version(data):
return str(data >> 4) + '.' + str(data & 0x0f)
def get_status(data):
if data == 0:
return '未执行'
elif data == 1:
return '执行中'
elif data == 2:
return '准备完毕'
elif data == 3:
return '执行失败'
else:
return '未知状态'
class Response(object):
msg = None
class slave_req_query_state_t(Structure):
_pack_ = 1 # 1字节对齐
_fields_ = [
("code", c_ubyte * 11),
("version", c_ubyte),
("status", c_ubyte),
("process_index", c_ubyte),
("plan_index", c_ubyte),
("action_index", c_ubyte),
("two_way_valve", c_ubyte), ("three_way_valve", c_ubyte),
]
class Command(object):
dll = None
handle = None
command_print = True
cmd = 0
value = 0
def __init__(self, path):
self.dll = cdll.LoadLibrary(path)
self.handle = entity.agreement_init()
self.handle.set_master()
self.dll.pbuf_initz()
# ret = self.dll.uart1_init(self.dll.agreement_master_rsp)
# assert bool(ret), "uart_init failed"
ret = self.dll.agreement_init(byref(self.handle))
assert bool(ret), "agreement_init failed"
def process(self, data):
self.handle.response.clear()
with common.HiddenPrints():
ret = self.dll.uart_recv_data(common.bytes_to_ctypes(
data), len(data))
assert bool(ret), "uart_recv_data failed"
return self.handle.response
# 复位指令
def command0(self):
with common.HiddenPrints():
self.handle.request.set_command(0x00)
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "reset failed"
return self.handle.response
def command1(self): # 查询IP输入电流
with common.HiddenPrints():
self.handle.request.set_command(0x01)
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "read_current failed"
return self.handle.response
def command2(self, current): # 调节IP输入电流
with common.HiddenPrints():
self.handle.request.set_command(0x02)
self.handle.request.data.adjust_ip_input_current.set(current)
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "set_current failed"
return self.handle.response
def command3(self): # 查询状态
with common.HiddenPrints():
self.handle.request.set_command(0x03)
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "read_status failed"
return self.handle.response
def command4(self): # 查询流程
with common.HiddenPrints():
self.handle.request.set_command(0x04)
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "read_flow failed"
return self.handle.response
def command5(self): # 配置流程
with common.HiddenPrints():
self.handle.request.set_command(0x05)
ret = self.dll.mock_command_req_config_process()
assert bool(ret), "config_flow failed"
# common.print_hex_data_space(self.handle.response)
return self.handle.response
def command6(self): # 执行流程
with common.HiddenPrints():
self.handle.request.set_command(0x06)
self.handle.request.data.execute_process.process_index = 0x00
self.handle.request.data.execute_process.plan_index = 0x00
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "execute_flow failed"
return self.handle.response
def command7(self): # 停止流程
with common.HiddenPrints():
self.handle.request.set_command(0x07)
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "stop_flow failed"
return self.handle.response
def command8(self): # 查询数据
with common.HiddenPrints():
self.handle.request.set_command(0x08)
self.handle.request.data.query_data.count = 2
sensor = entity.query_data_t()
sensor.count = 3
d1 = entity.query_data_t.query_data_sensor_t()
d1.sensor_class = entity.SENSOR_PRESSURE
d1.sensor_1 = 1
d1.sensor_2 = 1
d1.sensor_3 = 1
d1.sensor_4 = 1
d1.sensor_5 = 1
d1.sensor_6 = 1
d2 = entity.query_data_t.query_data_sensor_t()
d2.sensor_class = entity.SENSOR_STEP_MOTOR
d2.sensor_1 = 1
self.handle.request.data.query_data.set_data((d1, d2))
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "read_data failed"
return self.handle.response
def command9(self, address="1"): # 配置地址
address = address.zfill(4)
with common.HiddenPrints():
self.handle.request.set_command(0x09)
self.handle.request.data.config_address.set_address(address)
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "config_address failed"
return self.handle.response
def command10(self): # 查询地址
with common.HiddenPrints():
self.handle.request.set_command(0x0a)
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "read_address failed"
return self.handle.response
def command11(self): # 标定传感器
with common.HiddenPrints():
self.handle.request.set_command(0x0b)
self.handle.request.data.calibration_sensor.state = 0 # 0零位 1满值
sensor = entity.query_data_t()
sensor.count = 2
d1 = entity.query_data_t.query_data_sensor_t()
d1.sensor_class = entity.SENSOR_PRESSURE
d1.sensor_1 = 1
d1.sensor_2 = 1
d1.sensor_3 = 1
d2 = entity.query_data_t.query_data_sensor_t()
d2.sensor_class = entity.SENSOR_FLOW
d2.sensor_1 = 1
d2.sensor_2 = 1
sensor.set_data((d1, d2))
self.handle.request.data.calibration_sensor.sensor_data = sensor
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "calibration_sensor failed"
return self.handle.response
def command12(self, valve_type=0x10, valve_index=1, status=0): # 设置阀门状态
with common.HiddenPrints():
self.handle.request.set_command(0x0C)
self.handle.request.data.set_valve.unit = valve_type
self.handle.request.data.set_valve.status = status
self.handle.request.data.set_valve.index = valve_index
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "set_valve failed"
return self.handle.response
def command13(self):
with common.HiddenPrints():
self.handle.request.set_command(0x0D)
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "read_valve failed"
return self.handle.response
def command14(self, value=0):
with common.HiddenPrints():
self.handle.request.set_command(0x0E)
self.handle.request.data.set_valve_ratio = value
print(self.handle.request.data.set_valve_ratio)
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "send_valve failed"
return self.handle.response
def command15(self, dir, angle):
with common.HiddenPrints():
self.handle.request.set_command(0x0F)
self.handle.request.data.stepper_motor.dir = dir
self.handle.request.data.stepper_motor.angle = angle
ret = self.dll.agreement_master_req(byref(self.handle.request))
assert bool(ret), "read_valve failed"
return self.handle.response
# 处理应答
def response(self, data):
rs = Response()
s = ''
f = c_float()
cmd = data[0] - 0x80
self.cmd = cmd
bs = data[1:]
if cmd == 0x00:
s = '复位成功'
elif cmd == 0x01:
bs = bs[4:]
l = bs[::-1]
memmove(addressof(f),
common.bytes_to_ctypes(l), sizeof(c_float))
if self.command_print == True:
s = '查询IP输入电流'
s = s + ' ' + str(int(f.value))
else:
s = ''
self.value = f.value
elif cmd == 0x02:
s = '调节IP输入电流成功'
elif cmd == 0x03:
s = '查询状态'
r = slave_req_query_state_t()
memmove(addressof(r),
common.bytes_to_ctypes(bs), sizeof(slave_req_query_state_t))
s = s + ' 标识'+assic_to_str(r.code) + ' 版本号:' + get_version(r.version) + ' 状态:' + get_status(r.status)+' 流程:'+str(r.process_index) + \
' 方案:'+str(r.plan_index)+' 动作:'+str(r.action_index) + \
' 两通阀状态:'+str(r.two_way_valve)+' 三通阀状态:'+str(r.three_way_valve)
# print(s)
s = ""
elif cmd == 0x04:
s = '查询流程 '+common.get_str_hex(bs)
elif cmd == 0x08:
s = '读取数据 '+common.get_str_hex(bs)
elif cmd == 0x0a:
s = '查询地址 '+common.get_str_hex(bs)
elif cmd == 0x0d:
bs = bs[4:]
l = bs[::-1]
memmove(addressof(f),
common.bytes_to_ctypes(l), sizeof(c_float))
if self.command_print == True:
s = '查询比例阀输入电流'
s = s + ' ' + str(int(f.value))
else:
s = ''
self.value = f.value
elif cmd == 0x0e:
s = '设置比例阀电流 '+common.get_str_hex(bs)
else:
s = '发送成功'
rs.msg = s
return rs