diff options
author | Eugeniy E. Mikhailov <evgmik@gmail.com> | 2021-12-02 13:10:51 -0500 |
---|---|---|
committer | Eugeniy E. Mikhailov <evgmik@gmail.com> | 2021-12-02 13:10:51 -0500 |
commit | 4035e1dfd9cc16fc4eb730cb51adf6a5963269bf (patch) | |
tree | bee7c779950a1cb0defa431644d5a53063594bdf /hardware | |
parent | e30d8525409aa4d6554ff64485decc1be25d31c1 (diff) | |
download | qolab-4035e1dfd9cc16fc4eb730cb51adf6a5963269bf.tar.gz qolab-4035e1dfd9cc16fc4eb730cb51adf6a5963269bf.zip |
create a root package qolab
Diffstat (limited to 'hardware')
-rw-r--r-- | hardware/scope/__init__.py | 31 | ||||
-rw-r--r-- | hardware/scope/sds1104x.py | 184 | ||||
-rw-r--r-- | hardware/scpi.py | 63 |
3 files changed, 0 insertions, 278 deletions
diff --git a/hardware/scope/__init__.py b/hardware/scope/__init__.py deleted file mode 100644 index 23462a7..0000000 --- a/hardware/scope/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -Provide basic class to operate scope -Created by Eugeniy E. Mikhailov 2021/11/29 -""" -from hardware.scpi import SCPIinstr - -class Scope: - - # Minimal set of methods to be implemented by a scope. - # Should work with minimal arguments list - # but might be faster if parameters provided: less IO requests - - def __init__(self): - self.numberOfChannels = 0 - - def getTrace(self, chNum, availableNpnts=None, maxRequiredPoints=None): - warnings.warn( 'this function is not implemented' ) - -class ScopeSCPI(SCPIinstr, Scope): - """ - Do not instantiate directly, use - rm = pyvisa.ResourceManager() - ScopeSCPI(rm.open_resource('TCPIP::192.168.0.2::INSTR')) - """ - pass - def __init__(self, resource): - SCPIinstr.__init__(self, resource) - Scope.__init__(self) - -from .sds1104x import SDS1104X - diff --git a/hardware/scope/sds1104x.py b/hardware/scope/sds1104x.py deleted file mode 100644 index 4a12b46..0000000 --- a/hardware/scope/sds1104x.py +++ /dev/null @@ -1,184 +0,0 @@ -""" -Provide basic class to operate scope -Created by Eugeniy E. Mikhailov 2021/11/29 -""" - -from hardware.scope import ScopeSCPI -from data.trace import Trace -import re -import numpy as np - -class SDS1104X(ScopeSCPI): - """ Siglent SDS1104x scope """ - vertDivOnScreen = 10 - horizDivOnScreen = 14 - def __init__(self, resource): - super().__init__(resource) - self.resource.read_termination='\n' - self.numberOfChannels = 4 - self.maxRequiredPoints = 1000; # desired number of points per channel, can return twice more - - def response2numStr(self, strIn, firstSeparator=None, unit=None): - # A typical reply of Siglent is in the form 'TDIV 2.00E-08S' - # i.e. "<prefix><firstSeparator><numberString><unit> - # prefix='TDIV', firstSeparator=' ', numberString='2.00E-08', unit='S' - # this function parses the reply - spltStr = re.split(firstSeparator, strIn) - prefix = spltStr[0] - rstr = spltStr[1] - spltStr = re.split(unit, rstr) - numberString = spltStr[0] - unit = spltStr[1] - return (prefix, numberString, unit) - - def mean(self, chNum): - # get mean on a specific channel calculated by scope - # PAVA stands for PArameter VAlue - qstr = f'C{chNum}:PAVA? MEAN' - rstr = self.query(qstr); - # reply is in the form 'C1:PAVA MEAN,3.00E-02V' - prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=',', unit='V') - return(float(numberString)) - - def getAvailableNumberOfPoints(self, chNum): - if chNum != 1 and chNum != 3: - # for whatever reason 'SAMPLE_NUM' fails for channel 2 and 4 - chNum = 1 - qstr = f'SAMPLE_NUM? C{chNum}' - rstr = self.query(qstr) - # reply is in the form 'SANU 7.00E+01pts' - prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='pts') - return(int(float(numberString))) - - def getSampleRate(self): - rstr = self.query('SAMPLE_RATE?'); - # expected reply is like 'SARA 1.00E+09Sa/s' - prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='Sa/s') - return(int(float(numberString))) - - def calcSparsingAndNumPoints(self, availableNpnts=None, maxRequiredPoints=None): - if availableNpnts is None: - # using channel 1 to get availableNpnts - availableNpnts = self.getAvailableNumberOfPoints(1) - if maxRequiredPoints is None: - maxRequiredPoints = self.maxRequiredPoints - - if availableNpnts <= maxRequiredPoints*2: - Npnts = availableNpnts - sparsing = 1 - else: - sparsing = int(np.floor(availableNpnts/maxRequiredPoints)) - Npnts = int(np.floor(availableNpnts/sparsing)) - return(sparsing, Npnts, availableNpnts, maxRequiredPoints) - - def getRawWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None): - (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints) - if sparsing == 1 and Npnts == availableNpnts: - # we are getting all of it - cstr = f'WAVEFORM_SETUP NP,0,FP,0,SP,{sparsing}' - # technically when we know Npnts and sparsing - # we can use command from the follow up 'else' clause - else: - cstr = f'WAVEFORM_SETUP SP,{sparsing},NP,{Npnts},FP,0' - # Note: it is not enough to provide sparsing (SP), - # number of points (NP) needed to be calculated properly too! - # From the manual - # WAVEFORM_SETUP SP,<sparsing>,NP,<number>,FP,<point> - # SP Sparse point. It defines the interval between data points. - # For example: - # SP = 0 sends all data points. - # SP = 1 sends all data points. - # SP = 4 sends every 4th data point - # NP — Number of points. It indicates how many points should be transmitted. - # For example: - # NP = 0 sends all data points. - # NP = 50 sends a maximum of 50 data points. - # FP — First point. It specifies the address of the first data point to be sent. - # For example: - # FP = 0 corresponds to the first data point. - # FP = 1 corresponds to the second data point - self.write(cstr) - - qstr = f'C{chNum}:WAVEFORM? DAT2' - wfRaw=self.query_binary_values(qstr, datatype='b', header_fmt='ieee', container=np.array, chunk_size=(Npnts+100)) - # expected full reply: 'C1:WF DAT2,#9000000140.........' - return(wfRaw, availableNpnts, Npnts) - - def getChanVoltsPerDiv(self, chNum): - qstr = f'C{chNum}:VDIV?' - rstr = self.query(qstr) - # expected reply to query: 'C1:VDIV 1.04E+00V' - prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='V') - return(float(numberString)) - - def getChanOffset(self, chNum): - qstr = f'C{chNum}:OFST?' - rstr = self.query(qstr) - # expected reply to query: 'C1:OFST -1.27E+00V' - prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='V') - return(float(numberString)) - - def getTimePerDiv(self): - qstr = f'TDIV?' - rstr = self.query(qstr) - # expected reply to query: 'TDIV 2.00E-08S' - prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='S') - return(float(numberString)) - - def getTrigDelay(self): - qstr = f'TRIG_DELAY?' - rstr = self.query(qstr) - # expected reply to query: 'TRDL -0.00E+00S' - prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='S') - return(float(numberString)) - - - def getWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None): - wfRaw, availableNpnts, Npnts = self.getRawWaveform(chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints) - VoltageOffset = self.getChanOffset(chNum) - VoltsPerDiv = self.getChanVoltsPerDiv(chNum) - return( wfRaw * VoltsPerDiv * self.vertDivOnScreen/250 -VoltageOffset, availableNpnts ) - - def getTimeTrace(self, availableNpnts=None, maxRequiredPoints=None): - (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints) - sampleRate = self.getSampleRate() - timePerDiv = self.getTimePerDiv() - trigDelay = self.getTrigDelay() - if Npnts is None and sparsing is None: - # using channel 1 as reference - Npnts = self.getAvailableNumberOfPoints(1) - t = np.arange(Npnts) / sampleRate * sparsing; - t = t - timePerDiv * self.horizDivOnScreen/2 - trigDelay - return(t) - - def getTrace(self, chNum, availableNpnts=None, maxRequiredPoints=None): - wfVoltage, availableNpnts = self.getWaveform( chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints) - t = self.getTimeTrace(availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints) - tr = Trace( f'Ch{chNum}' ) - tr.xlabel = 'Time' - tr.xunit = 'S' - tr.ylabel = 'Voltage' - tr.yunit = 'V' - tr.x = t - tr.y = wfVoltage - return( tr ) - - -if __name__ == '__main__': - import pyvisa - print("testing") - rm = pyvisa.ResourceManager() - print(rm.list_resources()) - instr=rm.open_resource('TCPIP::192.168.0.61::INSTR') - scope = SDS1104X(instr) - print(f'ID: {scope.idn}') - print(f'Ch1 mean: {scope.mean(1)}') - print(f'Ch1 available points: {scope.getAvailableNumberOfPoints(1)}') - print(f'Sample Rate: {scope.getSampleRate()}') - print(f'Time per Div: {scope.getTimePerDiv()}') - print(f'Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}') - print(f'Ch1 Voltage Offset: {scope.getChanOffset(1)}') - ch1 = scope.getTrace(1) - - - diff --git a/hardware/scpi.py b/hardware/scpi.py deleted file mode 100644 index 279ef24..0000000 --- a/hardware/scpi.py +++ /dev/null @@ -1,63 +0,0 @@ -""" -provide basic class to operate SCPI capable instruments -""" - -class SCPIinstr: - """ Basic class which support SCPI commands """ - """ - Do not instantiate directly, use - rm = pyvisa.ResourceManager() - SCPIinstr(rm.open_resource('TCPIP::192.168.0.2::INSTR')) - """ - def __init__(self, resource): - self.resource = resource - - # convenience pyvisa functions - self.write = self.resource.write - self.read = self.resource.read - self.query = self.resource.query - self.read_bytes = self.resource.read_bytes - self.read_binary_values = self.resource.read_binary_values - self.query_binary_values = self.resource.query_binary_values - - @property - def idn(self): - return self.query("*IDN?") - - def clear_status(self): - self.write("*CLS") - - def set_event_status_enable(self): - self.write("*ESE") - - def query_event_status_enable(self): - self.query("*ESE?") - - def query_event_status_register(self): - self.query("*ESR?") - - def set_wait_until_finished(self): - self.query("*OPC") - - def wait_until_finished(self): - self.query("*OPC?") - - def reset(self): - self.write("*RST") - - def set_service_request_enable(self): - self.write("*SRE") - - def query_service_request_enable(self): - self.query("*SRE?") - - def query_status_byte(self): - self.query("*STB?") - - def self_test_result(self): - self.query("*TST?") - - def wait(self): - self.write("*WAI") - - |