aboutsummaryrefslogtreecommitdiff
path: root/qolab/hardware
diff options
context:
space:
mode:
authorEugeniy E. Mikhailov <evgmik@gmail.com>2024-07-14 01:20:48 -0400
committerEugeniy E. Mikhailov <evgmik@gmail.com>2024-07-14 01:20:48 -0400
commitb3810155f1f45718b0c474a6750f71c8f430d125 (patch)
tree662771ace518b3296d728cd0759469c6650d4a8f /qolab/hardware
parent7c070588a84aeb3ff7e665ee35eff6f3b8052235 (diff)
downloadqolab-b3810155f1f45718b0c474a6750f71c8f430d125.tar.gz
qolab-b3810155f1f45718b0c474a6750f71c8f430d125.zip
black formatter
Diffstat (limited to 'qolab/hardware')
-rw-r--r--qolab/hardware/scpi.py128
1 files changed, 74 insertions, 54 deletions
diff --git a/qolab/hardware/scpi.py b/qolab/hardware/scpi.py
index 92c949e..91e7cc1 100644
--- a/qolab/hardware/scpi.py
+++ b/qolab/hardware/scpi.py
@@ -6,10 +6,14 @@ import re
import logging
import time
-logging.basicConfig(format='%(asctime)s %(levelname)8s %(name)s: %(message)s', datefmt='%m/%d/%Y %H:%M:%S')
-logger = logging.getLogger('qolab.hardware.scpi')
+logging.basicConfig(
+ format="%(asctime)s %(levelname)8s %(name)s: %(message)s",
+ datefmt="%m/%d/%Y %H:%M:%S",
+)
+logger = logging.getLogger("qolab.hardware.scpi")
logger.setLevel(logging.INFO)
+
def response2numStr(strIn, firstSeparator=None, unit=None):
"""Parses non standard SCPI reply.
Often an instrument reply is in the form 'TDIV 2.00E-08S' (for example Siglent Scope)
@@ -31,11 +35,11 @@ def response2numStr(strIn, firstSeparator=None, unit=None):
"""
prefix = None
rstr = strIn
- if firstSeparator is not None and firstSeparator != '':
+ if firstSeparator is not None and firstSeparator != "":
spltStr = re.split(firstSeparator, strIn)
prefix = spltStr[0]
rstr = spltStr[1]
- if unit is not None and unit != '':
+ if unit is not None and unit != "":
spltStr = re.split(unit, rstr)
if len(spltStr) == 1:
unit = None
@@ -44,6 +48,7 @@ def response2numStr(strIn, firstSeparator=None, unit=None):
numberString = rstr
return (prefix, numberString, unit)
+
class SCPI_PROPERTY(property):
"""Overrides 'property' class and makes it suitable for SCPI set and query notation.
Adds ability to log into TSDB.
@@ -53,7 +58,7 @@ class SCPI_PROPERTY(property):
----------
scpi_prfx : str or None (default)
SCPI command prefix to get/set property, for example 'FreqInt'
- is internally transformed to
+ is internally transformed to
query 'FreqInt?' and setter 'FreqInt {val}'.
It could be set as the explicit query and set format string list:
['AUXV? 1', 'AUXV 1,{}'] where {} is place holder for set value
@@ -66,7 +71,7 @@ class SCPI_PROPERTY(property):
no_setter : True of False (default)
does property has a setter, some properties has no setter, e.g. measurement of external voltages for an ADC
tsdb_logging : True (default) or False
- do we log get/set commands result/argument to TSDB
+ do we log get/set commands result/argument to TSDB
Examples
--------
@@ -74,16 +79,24 @@ class SCPI_PROPERTY(property):
"""
- def __init__(self, scpi_prfx=None, ptype=str, doc=None, no_getter=False, no_setter=False, tsdb_logging=True):
+ def __init__(
+ self,
+ scpi_prfx=None,
+ ptype=str,
+ doc=None,
+ no_getter=False,
+ no_setter=False,
+ tsdb_logging=True,
+ ):
self.no_getter = no_getter
self.no_setter = no_setter
self.tsdb_logging = tsdb_logging
if no_getter:
- fget=None
+ fget = None
else:
fget = self.get_scpi
if no_setter:
- fset=None
+ fset = None
else:
fset = self.set_scpi
super().__init__(fget=fget, fset=fset)
@@ -91,76 +104,81 @@ class SCPI_PROPERTY(property):
self.ptype = ptype
self.__doc__ = doc
if isinstance(scpi_prfx, str):
- self.scpi_prfx_get = ''.join([self.scpi_prfx, '?'])
- self.scpi_prfx_set = ''.join([self.scpi_prfx, ' {}'])
+ self.scpi_prfx_get = "".join([self.scpi_prfx, "?"])
+ self.scpi_prfx_set = "".join([self.scpi_prfx, " {}"])
elif isinstance(scpi_prfx, list):
if len(scpi_prfx) != 2:
- raise ValueError(f'{scpi_prfx=}, should be list with exactly two elements')
+ raise ValueError(
+ f"{scpi_prfx=}, should be list with exactly two elements"
+ )
self.scpi_prfx_get = self.scpi_prfx[0]
self.scpi_prfx_set = self.scpi_prfx[1]
else:
- raise ValueError(f'{scpi_prfx=}, it should be either str or list type')
+ raise ValueError(f"{scpi_prfx=}, it should be either str or list type")
if not isinstance(self.scpi_prfx_get, str):
- raise ValueError(f'{self.scpi_prfx_get=}, it should be str type')
+ raise ValueError(f"{self.scpi_prfx_get=}, it should be str type")
if not isinstance(self.scpi_prfx_set, str):
- raise ValueError(f'{self.scpi_prfx_set=}, it should be str type')
+ raise ValueError(f"{self.scpi_prfx_set=}, it should be str type")
def __set_name__(self, owner, name):
self.public_name = name
- self.private_name = '_' + name
+ self.private_name = "_" + name
def log_to_tsdb(self, owner, action=None, val=None):
if owner.tsdb_ingester is None or not self.tsdb_logging:
return
- if owner.config['DeviceNickname'] is not None:
- measurement=owner.config['DeviceNickname']
+ if owner.config["DeviceNickname"] is not None:
+ measurement = owner.config["DeviceNickname"]
else:
- measurement=owner.config['Device type']
+ measurement = owner.config["Device type"]
ts = time.time()
- ts_ms = int(ts*1000)
+ ts_ms = int(ts * 1000)
var_name = self.public_name
- tags = {'action': action }
+ tags = {"action": action}
fields = {var_name: val}
try:
- msg=f"{ts_ms=}, {measurement=}, {tags=}, {fields=}"
+ msg = f"{ts_ms=}, {measurement=}, {tags=}, {fields=}"
logger.debug(msg)
- owner.tsdb_ingester.append(ts_ms, measurement=measurement, tags=tags, **fields)
+ owner.tsdb_ingester.append(
+ ts_ms, measurement=measurement, tags=tags, **fields
+ )
except ValueError as err:
- logger.error(f'{err=}: cannot log to TSDB {var_name} = {val}')
-
+ logger.error(f"{err=}: cannot log to TSDB {var_name} = {val}")
def get_scpi(self, owner):
- val = self.ptype( owner.query(f'{self.scpi_prfx_get}') )
- self.log_to_tsdb(owner, action='get', val=val)
+ val = self.ptype(owner.query(f"{self.scpi_prfx_get}"))
+ self.log_to_tsdb(owner, action="get", val=val)
return val
def set_scpi(self, owner, val):
cstr = self.scpi_prfx_set.format(val)
owner.write(cstr)
- self.log_to_tsdb(owner, action='set', val=val)
+ self.log_to_tsdb(owner, action="set", val=val)
def __repr__(self):
- sargs= []
- sargs.append( f'scpi_prfx={self.scpi_prfx}')
- sargs.append( f'ptype={self.ptype}')
- sargs.append( f'doc={self.__doc__}')
- sargs.append( f'no_getter={self.no_getter}')
- sargs.append( f'no_setter={self.no_setter}')
- sargs.append( f'tsdb_logging={self.tsdb_logging}')
- sargs =', '.join(sargs)
- s = ''.join( [ f'{self.__class__.__name__}(' , sargs, ')' ] )
+ sargs = []
+ sargs.append(f"scpi_prfx={self.scpi_prfx}")
+ sargs.append(f"ptype={self.ptype}")
+ sargs.append(f"doc={self.__doc__}")
+ sargs.append(f"no_getter={self.no_getter}")
+ sargs.append(f"no_setter={self.no_setter}")
+ sargs.append(f"tsdb_logging={self.tsdb_logging}")
+ sargs = ", ".join(sargs)
+ s = "".join([f"{self.__class__.__name__}(", sargs, ")"])
return s
+
class SCPIinstr:
- """ Basic class which support SCPI commands.
+ """Basic class which support SCPI commands.
Do not instantiate directly, use
rm = pyvisa.ResourceManager()
SCPIinstr(rm.open_resource('TCPIP::192.168.0.2::INSTR'))
"""
+
def __init__(self, resource):
self.resource = resource
-
+
# convenience pyvisa functions
self.write = self.resource.write
self.read = self.resource.read
@@ -178,7 +196,7 @@ class SCPIinstr:
def set_event_status_enable(self):
self.write("*ESE")
-
+
def query_event_status_enable(self):
return self.query("*ESE?")
@@ -193,7 +211,7 @@ class SCPIinstr:
def reset(self):
self.write("*RST")
-
+
def set_service_request_enable(self):
self.write("*SRE")
@@ -202,34 +220,36 @@ class SCPIinstr:
def query_status_byte(self):
return self.query("*STB?")
-
+
def self_test_result(self):
return self.query("*TST?")
-
+
def wait(self):
self.write("*WAI")
-if __name__ == '__main__':
+if __name__ == "__main__":
from qolab.hardware.basic import BasicInstrument
+
class DummyInstrument(BasicInstrument):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# in order to check SCPI_PROPERTY we need to implement write and query
def write(self, str):
- print(f'write: {str=}')
+ print(f"write: {str=}")
def query(self, str):
- print(f'query: {str=}')
- return '123'
+ print(f"query: {str=}")
+ return "123"
- x = SCPI_PROPERTY(scpi_prfx='SETX', ptype=str, doc='property X', tsdb_logging=False)
- y = SCPI_PROPERTY(scpi_prfx='SETY', ptype=int, no_setter=True, doc='property Y')
- z = SCPI_PROPERTY(scpi_prfx='SETY', ptype=int, no_getter=True, doc='property Z')
+ x = SCPI_PROPERTY(
+ scpi_prfx="SETX", ptype=str, doc="property X", tsdb_logging=False
+ )
+ y = SCPI_PROPERTY(scpi_prfx="SETY", ptype=int, no_setter=True, doc="property Y")
+ z = SCPI_PROPERTY(scpi_prfx="SETY", ptype=int, no_getter=True, doc="property Z")
- c1= DummyInstrument()
- c1.deviceProperties.update({'x', 'y'})
+ c1 = DummyInstrument()
+ c1.deviceProperties.update({"x", "y"})
c1.getConfig()
- c2= DummyInstrument()
-
+ c2 = DummyInstrument()