1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
"""
This unit talks to i-Series controllers sold by Omega and Newport.
They can often be connected via the iServer interface.
Specifically, it targets i853 units with a LAN connection, but it should be OK with many others.
Querying this controller is slo-o-o-w: at least 0.2 seconds, and it can take longer.
"""
from qolab.hardware.basic import BasicInstrument
from cachetools import cached, TTLCache
import socket
class I800(BasicInstrument):
    """Newport i800 series controller, should work with similar Omega controllers.

    Every query opens a fresh TCP connection to the unit, sends one command
    and parses the echoed reply. Readings are cached with a time-to-live so
    that repeated calls do not hammer the (slow) controller.
    """

    TTL_MEASURED = 30  # Time To Live for device measured things, i.e. Temperature
    TTL_SEATABLES = 600  # Time To Live for user seatables, i.e. SetPoints, Gains, etc

    def __init__(self, *args, host='192.168.1.200', port=1000, **kwds):
        """
        host - default hostname or IP of the controller unit (192.168.1.200) is factory default
        port - socket port (1000 by default) which accept HTTP POST requests

        All other positional and keyword arguments are passed through to
        BasicInstrument unchanged.
        """
        BasicInstrument.__init__(self, *args, **kwds)
        self.host = host
        self.port = port
        # Address prefix and start marker that are prepended to every command.
        self.modbus_address = '01'
        self.cmd_start_marker = '*'
        self.config['Device type'] = 'TemperatureController'
        self.config['Device model'] = 'i800'
        self.config['FnamePrefix'] = 'temperature'
        self.deviceProperties = {'Temperature', 'SetPoint1', 'SetPoint2'}

    def query(self, cmnd, trials=10):
        """Send `cmnd` to the controller and return the payload of its reply.

        cmnd   - command string without the address prefix, e.g. 'X01'
        trials - number of additional attempts made when the reply does not
                 start with the expected echo of the command

        Returns the part of the reply that follows the echoed command, or
        None when no attempt produced a properly echoed reply.
        Socket errors (connection refused, etc.) propagate to the caller.
        """
        modbus_cmnd = f'{self.modbus_address}{cmnd}'
        qstr = f'POST / HTTP/1.1\r\n\r\n{self.cmd_start_marker}{modbus_cmnd}\r\n'
        request = qstr.encode('ascii')
        echo_len = len(modbus_cmnd)

        # One initial attempt plus `trials` retries.
        for _ in range(max(trials, 0) + 1):
            # The context manager closes the socket even if connect/send/recv raise.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect((self.host, self.port))
                s.sendall(request)  # send() may transmit only part of the request
                raw = s.recv(100)

            # Undecodable bytes become replacement characters so that a garbled
            # reply fails the echo check below and is retried instead of raising.
            rlist = raw.decode('ascii', errors='replace').split()
            # occasionally there is more than one reply, split() also removes \r;
            # we will use the last one. An empty reply is treated as a bad echo.
            reply = rlist[-1] if rlist else ''

            if reply.startswith(modbus_cmnd):
                return reply[echo_len:]

            # check the proper echo response
            print(f'Warning: expected {modbus_cmnd} but got {reply[:echo_len]}')
        return None

    # NOTE(review): the TTLCache below is created once at class definition, so
    # it is shared by all instances (maxsize=1 means instances evict each
    # other), and a NaN produced by a failed query is cached like any other
    # value. The same applies to the set point getters.
    @BasicInstrument.tsdb_append
    @cached(cache=TTLCache(maxsize=1, ttl=TTL_MEASURED))
    def getTemperature(self):
        """Return the measured temperature as a float, or NaN if the query failed."""
        command = 'X01'  # give decimal representation (X) of the temperature (01 address)
        reply = self.query(command)
        if reply is None:
            return float('nan')
        return float(reply)

    def setPoinStr2value(self, spStr):
        """Decode a hexadecimal set point string into a float.

        The decoded integer is interpreted as:
          bit 23      - sign (set means negative)
          bits 22..20 - decimal point position code, 1 to 4
          bits 19..0  - magnitude
        Code 1 means no decimal digits, and each following code adds one
        more digit (divisors 1, 10, 100, 1000).

        Returns NaN (after printing an error) for any other position code.
        Raises ValueError if `spStr` is not a valid hexadecimal string.
        """
        raw = int(spStr, 16)
        sign = -1 if raw & (1 << 23) else 1

        # The position is a 3-bit number, not a set of independent flags, so it
        # has to be extracted as a field: testing single bits confuses code 3
        # (0b011) with code 1 (0b001).
        dp_code = (raw >> 20) & 0b111
        if not 1 <= dp_code <= 4:
            print(f'Error: unknown decimal point position in decoded {spStr}')
            return float('nan')
        scale = 10 ** (dp_code - 1)

        val = raw & 0xFFFFF
        return float(sign * val / scale)

    def _read_setpoint(self, command):
        """Query `command` and decode the reply as a set point; NaN on failure."""
        reply = self.query(command)
        if reply is None:
            return float('nan')
        return self.setPoinStr2value(reply)

    @BasicInstrument.tsdb_append
    @cached(cache=TTLCache(maxsize=1, ttl=TTL_SEATABLES))
    def getSetPoint1(self):
        """Return set point 1 as a float, or NaN if the query failed."""
        return self._read_setpoint('R01')

    @BasicInstrument.tsdb_append
    @cached(cache=TTLCache(maxsize=1, ttl=TTL_SEATABLES))
    def getSetPoint2(self):
        """Return set point 2 as a float, or NaN if the query failed."""
        return self._read_setpoint('R02')
if __name__ == '__main__':
    # Manual smoke check when the module is run as a script: build a
    # controller object with the default host/port and print its config.
    controller = I800()
    print(controller.getConfig())
|