diff options
author | Eugeniy E. Mikhailov <evgmik@gmail.com> | 2024-07-14 01:20:48 -0400 |
---|---|---|
committer | Eugeniy E. Mikhailov <evgmik@gmail.com> | 2024-07-14 01:20:48 -0400 |
commit | b3810155f1f45718b0c474a6750f71c8f430d125 (patch) | |
tree | 662771ace518b3296d728cd0759469c6650d4a8f /qolab/hardware | |
parent | 7c070588a84aeb3ff7e665ee35eff6f3b8052235 (diff) | |
download | qolab-b3810155f1f45718b0c474a6750f71c8f430d125.tar.gz qolab-b3810155f1f45718b0c474a6750f71c8f430d125.zip |
black formatter
Diffstat (limited to 'qolab/hardware')
-rw-r--r-- | qolab/hardware/scpi.py | 128 |
1 files changed, 74 insertions, 54 deletions
diff --git a/qolab/hardware/scpi.py b/qolab/hardware/scpi.py index 92c949e..91e7cc1 100644 --- a/qolab/hardware/scpi.py +++ b/qolab/hardware/scpi.py @@ -6,10 +6,14 @@ import re import logging import time -logging.basicConfig(format='%(asctime)s %(levelname)8s %(name)s: %(message)s', datefmt='%m/%d/%Y %H:%M:%S') -logger = logging.getLogger('qolab.hardware.scpi') +logging.basicConfig( + format="%(asctime)s %(levelname)8s %(name)s: %(message)s", + datefmt="%m/%d/%Y %H:%M:%S", +) +logger = logging.getLogger("qolab.hardware.scpi") logger.setLevel(logging.INFO) + def response2numStr(strIn, firstSeparator=None, unit=None): """Parses non standard SCPI reply. Often an instrument reply is in the form 'TDIV 2.00E-08S' (for example Siglent Scope) @@ -31,11 +35,11 @@ def response2numStr(strIn, firstSeparator=None, unit=None): """ prefix = None rstr = strIn - if firstSeparator is not None and firstSeparator != '': + if firstSeparator is not None and firstSeparator != "": spltStr = re.split(firstSeparator, strIn) prefix = spltStr[0] rstr = spltStr[1] - if unit is not None and unit != '': + if unit is not None and unit != "": spltStr = re.split(unit, rstr) if len(spltStr) == 1: unit = None @@ -44,6 +48,7 @@ def response2numStr(strIn, firstSeparator=None, unit=None): numberString = rstr return (prefix, numberString, unit) + class SCPI_PROPERTY(property): """Overrides 'property' class and makes it suitable for SCPI set and query notation. Adds ability to log into TSDB. @@ -53,7 +58,7 @@ class SCPI_PROPERTY(property): ---------- scpi_prfx : str or None (default) SCPI command prefix to get/set property, for example 'FreqInt' - is internally transformed to + is internally transformed to query 'FreqInt?' and setter 'FreqInt {val}'. It could be set as the explicit query and set format string list: ['AUXV? 1', 'AUXV 1,{}'] where {} is place holder for set value @@ -66,7 +71,7 @@ class SCPI_PROPERTY(property): no_setter : True of False (default) does property has a setter, some properties has no setter, e.g. measurement of external voltages for an ADC tsdb_logging : True (default) or False - do we log get/set commands result/argument to TSDB + do we log get/set commands result/argument to TSDB Examples -------- @@ -74,16 +79,24 @@ class SCPI_PROPERTY(property): """ - def __init__(self, scpi_prfx=None, ptype=str, doc=None, no_getter=False, no_setter=False, tsdb_logging=True): + def __init__( + self, + scpi_prfx=None, + ptype=str, + doc=None, + no_getter=False, + no_setter=False, + tsdb_logging=True, + ): self.no_getter = no_getter self.no_setter = no_setter self.tsdb_logging = tsdb_logging if no_getter: - fget=None + fget = None else: fget = self.get_scpi if no_setter: - fset=None + fset = None else: fset = self.set_scpi super().__init__(fget=fget, fset=fset) @@ -91,76 +104,81 @@ class SCPI_PROPERTY(property): self.ptype = ptype self.__doc__ = doc if isinstance(scpi_prfx, str): - self.scpi_prfx_get = ''.join([self.scpi_prfx, '?']) - self.scpi_prfx_set = ''.join([self.scpi_prfx, ' {}']) + self.scpi_prfx_get = "".join([self.scpi_prfx, "?"]) + self.scpi_prfx_set = "".join([self.scpi_prfx, " {}"]) elif isinstance(scpi_prfx, list): if len(scpi_prfx) != 2: - raise ValueError(f'{scpi_prfx=}, should be list with exactly two elements') + raise ValueError( + f"{scpi_prfx=}, should be list with exactly two elements" + ) self.scpi_prfx_get = self.scpi_prfx[0] self.scpi_prfx_set = self.scpi_prfx[1] else: - raise ValueError(f'{scpi_prfx=}, it should be either str or list type') + raise ValueError(f"{scpi_prfx=}, it should be either str or list type") if not isinstance(self.scpi_prfx_get, str): - raise ValueError(f'{self.scpi_prfx_get=}, it should be str type') + raise ValueError(f"{self.scpi_prfx_get=}, it should be str type") if not isinstance(self.scpi_prfx_set, str): - raise ValueError(f'{self.scpi_prfx_set=}, it should be str type') + raise ValueError(f"{self.scpi_prfx_set=}, it should be str type") def __set_name__(self, owner, name): self.public_name = name - self.private_name = '_' + name + self.private_name = "_" + name def log_to_tsdb(self, owner, action=None, val=None): if owner.tsdb_ingester is None or not self.tsdb_logging: return - if owner.config['DeviceNickname'] is not None: - measurement=owner.config['DeviceNickname'] + if owner.config["DeviceNickname"] is not None: + measurement = owner.config["DeviceNickname"] else: - measurement=owner.config['Device type'] + measurement = owner.config["Device type"] ts = time.time() - ts_ms = int(ts*1000) + ts_ms = int(ts * 1000) var_name = self.public_name - tags = {'action': action } + tags = {"action": action} fields = {var_name: val} try: - msg=f"{ts_ms=}, {measurement=}, {tags=}, {fields=}" + msg = f"{ts_ms=}, {measurement=}, {tags=}, {fields=}" logger.debug(msg) - owner.tsdb_ingester.append(ts_ms, measurement=measurement, tags=tags, **fields) + owner.tsdb_ingester.append( + ts_ms, measurement=measurement, tags=tags, **fields + ) except ValueError as err: - logger.error(f'{err=}: cannot log to TSDB {var_name} = {val}') - + logger.error(f"{err=}: cannot log to TSDB {var_name} = {val}") def get_scpi(self, owner): - val = self.ptype( owner.query(f'{self.scpi_prfx_get}') ) - self.log_to_tsdb(owner, action='get', val=val) + val = self.ptype(owner.query(f"{self.scpi_prfx_get}")) + self.log_to_tsdb(owner, action="get", val=val) return val def set_scpi(self, owner, val): cstr = self.scpi_prfx_set.format(val) owner.write(cstr) - self.log_to_tsdb(owner, action='set', val=val) + self.log_to_tsdb(owner, action="set", val=val) def __repr__(self): - sargs= [] - sargs.append( f'scpi_prfx={self.scpi_prfx}') - sargs.append( f'ptype={self.ptype}') - sargs.append( f'doc={self.__doc__}') - sargs.append( f'no_getter={self.no_getter}') - sargs.append( f'no_setter={self.no_setter}') - sargs.append( f'tsdb_logging={self.tsdb_logging}') - sargs =', '.join(sargs) - s = ''.join( [ f'{self.__class__.__name__}(' , sargs, ')' ] ) + sargs = [] + sargs.append(f"scpi_prfx={self.scpi_prfx}") + sargs.append(f"ptype={self.ptype}") + sargs.append(f"doc={self.__doc__}") + sargs.append(f"no_getter={self.no_getter}") + sargs.append(f"no_setter={self.no_setter}") + sargs.append(f"tsdb_logging={self.tsdb_logging}") + sargs = ", ".join(sargs) + s = "".join([f"{self.__class__.__name__}(", sargs, ")"]) return s + class SCPIinstr: - """ Basic class which support SCPI commands. + """Basic class which support SCPI commands. Do not instantiate directly, use rm = pyvisa.ResourceManager() SCPIinstr(rm.open_resource('TCPIP::192.168.0.2::INSTR')) """ + def __init__(self, resource): self.resource = resource - + # convenience pyvisa functions self.write = self.resource.write self.read = self.resource.read @@ -178,7 +196,7 @@ class SCPIinstr: def set_event_status_enable(self): self.write("*ESE") - + def query_event_status_enable(self): return self.query("*ESE?") @@ -193,7 +211,7 @@ class SCPIinstr: def reset(self): self.write("*RST") - + def set_service_request_enable(self): self.write("*SRE") @@ -202,34 +220,36 @@ class SCPIinstr: def query_status_byte(self): return self.query("*STB?") - + def self_test_result(self): return self.query("*TST?") - + def wait(self): self.write("*WAI") -if __name__ == '__main__': +if __name__ == "__main__": from qolab.hardware.basic import BasicInstrument + class DummyInstrument(BasicInstrument): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # in order to check SCPI_PROPERTY we need to implement write and query def write(self, str): - print(f'write: {str=}') + print(f"write: {str=}") def query(self, str): - print(f'query: {str=}') - return '123' + print(f"query: {str=}") + return "123" - x = SCPI_PROPERTY(scpi_prfx='SETX', ptype=str, doc='property X', tsdb_logging=False) - y = SCPI_PROPERTY(scpi_prfx='SETY', ptype=int, no_setter=True, doc='property Y') - z = SCPI_PROPERTY(scpi_prfx='SETY', ptype=int, no_getter=True, doc='property Z') + x = SCPI_PROPERTY( + scpi_prfx="SETX", ptype=str, doc="property X", tsdb_logging=False + ) + y = SCPI_PROPERTY(scpi_prfx="SETY", ptype=int, no_setter=True, doc="property Y") + z = SCPI_PROPERTY(scpi_prfx="SETY", ptype=int, no_getter=True, doc="property Z") - c1= DummyInstrument() - c1.deviceProperties.update({'x', 'y'}) + c1 = DummyInstrument() + c1.deviceProperties.update({"x", "y"}) c1.getConfig() - c2= DummyInstrument() - + c2 = DummyInstrument() |