-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathms40plus.py
More file actions
113 lines (99 loc) · 2.8 KB
/
ms40plus.py
File metadata and controls
113 lines (99 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import enum
import functools
import operator
def checksum(data):
    """Return the bitwise XOR of every item in *data*.

    Args:
        data: Iterable of integers, for example ``bytes`` or ``bytearray``.

    Returns:
        The XOR of all items, or ``0`` when *data* is empty.
    """
    # Seed the fold with 0 (the XOR identity): non-empty results are
    # unchanged, and an empty input yields 0 instead of making ``reduce``
    # raise ``TypeError``.
    return functools.reduce(operator.xor, data, 0)
@enum.unique
class Window(enum.Enum):
    """Named numeric identifiers for the device's addressable "windows".

    Each member maps a readable name to an integer code. Members are listed
    in ascending numeric order and every code is distinct.

    NOTE(review): presumably these are the window numbers from the pump
    controller's serial-protocol manual; confirm meanings and units there.
    """

    StartStop = 0
    RemoteSerial = 8

    SetPointValueHz = 102
    SetPointHysteresis = 105
    BaudRate = 108
    RotationalFrequencySettingHz = 120
    RotationalFrequencySettingRpm = 127

    BusCurrent = 200
    ThreePhaseVoltage = 201
    Power = 202
    DrivingFrequency = 203
    Status = 205
    ErrorCode = 206
    OutputFrequency = 207
    ControllerTemperature = 216
    PowerSupplyTemperature = 222
    OilLevel = 225
    RotationalFrequency = 233

    CycleTime = 300
    CycleNumber = 301
    PumpLife = 302
    TimeWithOilUnderLevel = 305
    TimeWithDirtyFilter = 306
    TimeControllerStandby = 307
    ControllerModelNumber = 319
    PumpModelNumber = 320
    # Spelling (lower-case "n") is part of the public name; left untouched.
    PumpSpecialModelnumber = 321
    PumpSerialNumber = 322
    ControllerSerialNumber = 323
    MaintenanceTimer = 358
    # NOTE(review): 362 and 382 carry near-identical names
    # (TemperatureAverage vs AverageTemperature); verify which is which.
    LastHourTemperatureAverage = 362
    LastHourAveragePower = 364
    LastHourAverageFrequency = 365
    LastHourAverageTemperature = 382
    AverageTemperatureDuringPumpLife = 384

    ProgramListingCodeAndRevision = 406
    ParameterListingCodeAndRevision = 407

    Rs485SerialAddressSettings = 503
    SerialType = 504
@enum.unique
class Command(enum.Enum):
    """Byte codes that select between a read and a write operation."""

    Read = 0x30   # the ASCII character "0"
    Write = 0x31  # the ASCII character "1"
@enum.unique
class Response(enum.Enum):
    """Byte codes identifying the kind of reply.

    NOTE(review): presumably the byte the device places in its answer frame;
    confirm the exact position against the protocol manual.
    """

    Read = 0x30           # same value as Command.Read (ASCII "0")
    Ack = 0x06            # ASCII ACK control character
    Nack = 0x15           # ASCII NAK control character
    UnknownWindow = 0x32  # ASCII "2"
    DataTypeError = 0x33  # ASCII "3"
    OutOfRange = 0x34     # ASCII "4"
    WinDisabled = 0x35    # ASCII "5"
@enum.unique
class Status(enum.Enum):
    """Integer status codes.

    The value 4 has no member, so ``Status(4)`` raises ``ValueError``.

    NOTE(review): presumably the values reported by ``Window.Status`` (205);
    confirm against the device manual.
    """

    Stop = 0
    WaitInterlock = 1
    Start = 2
    Autotuning = 3
    Normal = 5
    Fail = 6
class Framing:
    """Incremental encoder/decoder for STX/ETX-delimited frames.

    A frame on the wire is laid out as::

        STX (0x02) | payload | ETX (0x03) | checksum

    where the checksum is the XOR of the payload bytes and the ETX byte,
    written as two upper-case ASCII hex digits.

    Attributes:
        buffer: ``bytearray`` of received bytes not yet consumed by
            ``receive``.
    """

    _STX = 0x02
    _ETX = 0x03

    def __init__(self):
        self.buffer = bytearray()

    def receive(self, data):
        """Feed received bytes and return the next complete payload, if any.

        Bytes that follow a returned frame stay buffered, so a second frame
        that arrived in the same chunk is returned by the next call (which
        may pass empty ``data``). A frame whose checksum does not verify is
        skipped and scanning resumes at the next STX.

        Args:
            data: Newly received bytes; may be empty.

        Returns:
            A ``bytearray`` with the bytes between STX and ETX of the first
            frame whose checksum verifies, or ``None`` when no complete,
            valid frame is buffered.
        """
        buf = self.buffer
        buf.extend(data)

        # Every pass either returns or removes at least one byte, so the
        # loop always terminates.
        while True:
            stx_index = buf.find(self._STX)
            if stx_index < 0:
                buf.clear()
                return None  # no STX found

            # Drop noise ahead of the frame start; buf[0] is now STX.
            del buf[:stx_index]

            # Look for ETX only *after* STX so a stray ETX in leading noise
            # can never be taken for the end of this frame.
            etx_index = buf.find(self._ETX, 1)
            if etx_index < 0:
                return None  # incomplete
            frame_end = etx_index + 3  # ETX plus two checksum characters
            if len(buf) < frame_end:
                return None  # incomplete

            try:
                provided_checksum = int(buf[etx_index + 1:frame_end], 16)
            except ValueError:
                print('invalid_checksum')
                provided_checksum = None  # never equals a computed value

            if provided_checksum == checksum(buf[1:etx_index + 1]):
                payload = buf[1:etx_index]
                # Consume only this frame; keep whatever follows it.
                del buf[:frame_end]
                return payload

            # Bad frame: discard its STX and resynchronise on the next one
            # rather than leaving the decoder stuck on the same bytes.
            del buf[:1]

    def send(self, payload):
        """Wrap *payload* in a frame ready for transmission.

        Args:
            payload: Iterable of byte values (0-255) placed between STX and
                ETX.

        Returns:
            A ``bytearray`` holding STX, the payload, ETX and the checksum
            as two upper-case ASCII hex digits.
        """
        command = bytearray([self._STX, *payload, self._ETX])
        # The checksum covers everything after STX, i.e. payload plus ETX.
        command.extend(b"%02X" % checksum(command[1:]))
        return command