diff options
Diffstat (limited to 'qolab/hardware')
-rw-r--r-- | qolab/hardware/scope/sds1104x.py | 258 |
1 files changed, 151 insertions, 107 deletions
diff --git a/qolab/hardware/scope/sds1104x.py b/qolab/hardware/scope/sds1104x.py index 9f0b502..3023d08 100644 --- a/qolab/hardware/scope/sds1104x.py +++ b/qolab/hardware/scope/sds1104x.py @@ -10,6 +10,7 @@ import numpy as np import scipy.signal from pyvisa.constants import InterfaceType + class SDS1104X(ScopeSCPI): """Siglent SDS1104x scope""" @@ -17,41 +18,47 @@ class SDS1104X(ScopeSCPI): # the grabbed trace has more points outside what is visible on the screen vertDivOnScreen = 10 horizDivOnScreen = 14 + def __init__(self, resource, *args, **kwds): super().__init__(resource, *args, **kwds) - self.config['Device model'] = 'SDS1104X' - self.resource.read_termination='\n' + self.config["Device model"] = "SDS1104X" + self.resource.read_termination = "\n" self.numberOfChannels = 4 - self.maxRequiredPoints = 1000; # desired number of points per channel, can return twice more + self.maxRequiredPoints = 1000 + # desired number of points per channel, can return twice more def mean(self, chNum): # get mean on a specific channel calculated by scope # PAVA stands for PArameter VAlue - qstr = f'C{chNum}:PAVA? MEAN' - rstr = self.query(qstr); + qstr = f"C{chNum}:PAVA? MEAN" + rstr = self.query(qstr) # reply is in the form 'C1:PAVA MEAN,3.00E-02V' - prefix, numberString, unit = response2numStr(rstr, firstSeparator=',', unit='V') - return(float(numberString)) + prefix, numberString, unit = response2numStr(rstr, firstSeparator=",", unit="V") + return float(numberString) def getAvailableNumberOfPoints(self, chNum): if chNum != 1 and chNum != 3: # for whatever reason 'SAMPLE_NUM' fails for channel 2 and 4 chNum = 1 - qstr = f'SAMPLE_NUM? C{chNum}' + qstr = f"SAMPLE_NUM? C{chNum}" rstr = self.query(qstr) # reply is in the form 'SANU 7.00E+01pts' - prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='pts') - return(int(float(numberString))) - + prefix, numberString, unit = response2numStr( + rstr, firstSeparator=" ", unit="pts" + ) + return int(float(numberString)) + @BasicInstrument.tsdb_append def getSampleRate(self): - rstr = self.query('SAMPLE_RATE?'); + rstr = self.query("SAMPLE_RATE?") # expected reply is like 'SARA 1.00E+09Sa/s' - prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='Sa/s') - return(int(float(numberString))) + prefix, numberString, unit = response2numStr( + rstr, firstSeparator=" ", unit="Sa/s" + ) + return int(float(numberString)) def setSampleRate(self, val): - print('Cannot set SampleRate directly for SDS1104X') + print("Cannot set SampleRate directly for SDS1104X") # it is not possible to do with this model directly pass @@ -62,15 +69,17 @@ class SDS1104X(ScopeSCPI): if maxRequiredPoints is None: maxRequiredPoints = self.maxRequiredPoints - if availableNpnts <= maxRequiredPoints*2: + if availableNpnts <= maxRequiredPoints * 2: Npnts = availableNpnts sparsing = 1 else: - sparsing = int(np.floor(availableNpnts/maxRequiredPoints)) - Npnts = int(np.floor(availableNpnts/sparsing)) - return(sparsing, Npnts, availableNpnts, maxRequiredPoints) + sparsing = int(np.floor(availableNpnts / maxRequiredPoints)) + Npnts = int(np.floor(availableNpnts / sparsing)) + return (sparsing, Npnts, availableNpnts, maxRequiredPoints) - def getRawWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True): + def getRawWaveform( + self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True + ): """ If decimate=True is used, we get all available points and then low-pass filter them. The result is less noisy. But transfer time from the instrument is longer. @@ -78,26 +87,31 @@ class SDS1104X(ScopeSCPI): but we might see aliasing, if there is a high frequency noise and sparsing > 1 """ - (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints) + ( + sparsing, + Npnts, + availableNpnts, + maxRequiredPoints, + ) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints) if decimate: - Npnts = availableNpnts # get all of them and decimate later + Npnts = availableNpnts # get all of them and decimate later if (sparsing == 1 and Npnts == availableNpnts) or decimate: # We are getting all points of the trace # Apparently sparsing has no effect with this command # and effectively uses SP=1 for any sparsing # but I want to make sure and force it - cstr = f'WAVEFORM_SETUP NP,0,FP,0,SP,1' + cstr = f"WAVEFORM_SETUP NP,0,FP,0,SP,1" # technically when we know Npnts and sparsing # we can use command from the follow up 'else' clause else: # we just ask every point with 'sparsing' interval # fast to grab but we could do better with more advance decimate # method, which allow better precision for the price of longer acquisition time - cstr = f'WAVEFORM_SETUP SP,{sparsing},NP,{Npnts},FP,0' + cstr = f"WAVEFORM_SETUP SP,{sparsing},NP,{Npnts},FP,0" # Note: it is not enough to provide sparsing (SP), # number of points (NP) needed to be calculated properly too! # From the manual - # WAVEFORM_SETUP SP,<sparsing>,NP,<number>,FP,<point> + # WAVEFORM_SETUP SP,<sparsing>,NP,<number>,FP,<point> # SP Sparse point. It defines the interval between data points. # For example: # SP = 0 sends all data points. @@ -113,64 +127,73 @@ class SDS1104X(ScopeSCPI): # FP = 1 corresponds to the second data point self.write(cstr) - trRaw = Trace(f'Ch{chNum}') + trRaw = Trace(f"Ch{chNum}") - qstr = f'C{chNum}:WAVEFORM? DAT2' + qstr = f"C{chNum}:WAVEFORM? DAT2" # expected full reply: 'C1:WF DAT2,#9000000140.........' try: - wfRaw=self.query_binary_values(qstr, datatype='b', header_fmt='ieee', container=np.array, chunk_size=(Npnts+100)) + wfRaw = self.query_binary_values( + qstr, + datatype="b", + header_fmt="ieee", + container=np.array, + chunk_size=(Npnts + 100), + ) if self.resource.interface_type == InterfaceType.usb: # Somehow on windows (at least with USB interface) # there is a lingering empty string which we need to flush out - r=self.read() - if r != '': - print(f'WARNING: We expected an empty string but got {r=}') - trRaw.values = wfRaw.reshape(wfRaw.size,1) + r = self.read() + if r != "": + print(f"WARNING: We expected an empty string but got {r=}") + trRaw.values = wfRaw.reshape(wfRaw.size, 1) if decimate and sparsing != 1: - numtaps = 3; # not sure it is the best case - trRaw.values = scipy.signal.decimate(trRaw.values, sparsing, numtaps, axis=0) + numtaps = 3 + # not sure it is the best case + trRaw.values = scipy.signal.decimate( + trRaw.values, sparsing, numtaps, axis=0 + ) except ValueError as err: # most likely we get crazy number of points # self.read() # flushing the bogus output of previous command - print(f'Error {err=}: getting waveform failed for {qstr=}') - wfRaw=np.array([]) - trRaw.config['unit'] = 'Count' - trRaw.config['tags']['Decimate'] = decimate - return(trRaw, availableNpnts, Npnts) + print(f"Error {err=}: getting waveform failed for {qstr=}") + wfRaw = np.array([]) + trRaw.config["unit"] = "Count" + trRaw.config["tags"]["Decimate"] = decimate + return (trRaw, availableNpnts, Npnts) def getChanVoltsPerDiv(self, chNum): - qstr = f'C{chNum}:VDIV?' + qstr = f"C{chNum}:VDIV?" rstr = self.query(qstr) # expected reply to query: 'C1:VDIV 1.04E+00V' - prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='V') - return(float(numberString)) + prefix, numberString, unit = response2numStr(rstr, firstSeparator=" ", unit="V") + return float(numberString) def setChanVoltsPerDiv(self, chNum, vPerDiv): - cstr = f'C{chNum}:VDIV {vPerDiv}' + cstr = f"C{chNum}:VDIV {vPerDiv}" self.write(cstr) # if out of range, the VAB bit (bit 2) in the STB register to be set def getChanVoltageOffset(self, chNum): - qstr = f'C{chNum}:OFST?' + qstr = f"C{chNum}:OFST?" rstr = self.query(qstr) # expected reply to query: 'C1:OFST -1.27E+00V' - prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='V') - return(float(numberString)) + prefix, numberString, unit = response2numStr(rstr, firstSeparator=" ", unit="V") + return float(numberString) def setChanVoltageOffset(self, chNum, val): - cstr = f'C{chNum}:OFST {val}' + cstr = f"C{chNum}:OFST {val}" self.write(cstr) def getLED(self): - """ Returns binary mask of available LEDs """ - qstr = 'LED?' + """Returns binary mask of available LEDs""" + qstr = "LED?" rstr = self.query(qstr) - prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='') - return(int(numberString,16)) # convert from hex string to integer + prefix, numberString, unit = response2numStr(rstr, firstSeparator=" ", unit="") + return int(numberString, 16) # convert from hex string to integer def toggleRun(self): # SY_FP is undocumented, reverse engineered from the web interface - self.write('SY_FP 12,1') + self.write("SY_FP 12,1") @BasicInstrument.tsdb_append def getRun(self): @@ -190,7 +213,7 @@ class SDS1104X(ScopeSCPI): def toggleRoll(self): # SY_FP is undocumented, reverse engineered from the web interface - self.write('SY_FP 49,1') + self.write("SY_FP 49,1") @BasicInstrument.tsdb_append def setRoll(self, val): @@ -200,106 +223,127 @@ class SDS1104X(ScopeSCPI): @BasicInstrument.tsdb_append def getTimePerDiv(self): - qstr = 'TDIV?' + qstr = "TDIV?" rstr = self.query(qstr) # expected reply to query: 'TDIV 2.00E-08S' - prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='S') - return(float(numberString)) + prefix, numberString, unit = response2numStr(rstr, firstSeparator=" ", unit="S") + return float(numberString) @BasicInstrument.tsdb_append def setTimePerDiv(self, timePerDiv): - cstr = f'TDIV {timePerDiv}' + cstr = f"TDIV {timePerDiv}" self.write(cstr) # if out of range, the VAB bit (bit 2) in the STB register to be set @BasicInstrument.tsdb_append def getTrigDelay(self): - qstr = 'TRIG_DELAY?' + qstr = "TRIG_DELAY?" rstr = self.query(qstr) # expected reply to query: 'TRDL -0.00E+00S' - prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='S') - return(float(numberString)) - + prefix, numberString, unit = response2numStr(rstr, firstSeparator=" ", unit="S") + return float(numberString) - def getWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True): + def getWaveform( + self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True + ): """ For decimate use see getRawWaveform. In short decimate=True is slower but more precise. """ - trRaw, availableNpnts, Npnts = self.getRawWaveform(chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints, decimate=decimate) + trRaw, availableNpnts, Npnts = self.getRawWaveform( + chNum, + availableNpnts=availableNpnts, + maxRequiredPoints=maxRequiredPoints, + decimate=decimate, + ) VoltageOffset = self.getChanVoltageOffset(chNum) VoltsPerDiv = self.getChanVoltsPerDiv(chNum) tr = trRaw - tr.values = trRaw.values * VoltsPerDiv * self.vertDivOnScreen/250 -VoltageOffset - tr.config['unit'] = 'Volt' - tr.config['tags']['VoltageOffset'] = VoltageOffset - tr.config['tags']['VoltsPerDiv'] = VoltsPerDiv - return(tr, availableNpnts) + tr.values = ( + trRaw.values * VoltsPerDiv * self.vertDivOnScreen / 250 - VoltageOffset + ) + tr.config["unit"] = "Volt" + tr.config["tags"]["VoltageOffset"] = VoltageOffset + tr.config["tags"]["VoltsPerDiv"] = VoltsPerDiv + return (tr, availableNpnts) def getTimeTrace(self, availableNpnts=None, maxRequiredPoints=None): - (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints) + ( + sparsing, + Npnts, + availableNpnts, + maxRequiredPoints, + ) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints) sampleRate = self.getSampleRate() timePerDiv = self.getTimePerDiv() - trigDelay = self.getTrigDelay() + trigDelay = self.getTrigDelay() if Npnts is None and sparsing is None: # using channel 1 as reference Npnts = self.getAvailableNumberOfPoints(1) - tval = np.arange(Npnts) / sampleRate * sparsing; - tval = tval - timePerDiv * self.horizDivOnScreen/2 - trigDelay - t = Trace('time') - t.values = tval.reshape(tval.size,1) - t.config['unit'] = 'S' - t.config['tags']['TimePerDiv'] = timePerDiv - t.config['tags']['TrigDelay'] = trigDelay - t.config['tags']['SampleRate'] = sampleRate - t.config['tags']['AvailableNPnts'] = availableNpnts - t.config['tags']['Npnts'] = availableNpnts - t.config['tags']['Sparsing'] = sparsing - return(t) + tval = np.arange(Npnts) / sampleRate * sparsing + tval = tval - timePerDiv * self.horizDivOnScreen / 2 - trigDelay + t = Trace("time") + t.values = tval.reshape(tval.size, 1) + t.config["unit"] = "S" + t.config["tags"]["TimePerDiv"] = timePerDiv + t.config["tags"]["TrigDelay"] = trigDelay + t.config["tags"]["SampleRate"] = sampleRate + t.config["tags"]["AvailableNPnts"] = availableNpnts + t.config["tags"]["Npnts"] = availableNpnts + t.config["tags"]["Sparsing"] = sparsing + return t def getTriggerMode(self): # we expect NORM, AUTO, SINGLE, STOP - res = self.query('TRIG_MODE?') + res = self.query("TRIG_MODE?") # res is in the form 'TRMD AUTO' return res[5:] def setTriggerMode(self, val): # we expect NORM, AUTO, SINGLE, STOP - self.write(f'TRIG_MODE {val}') - - def getTrace(self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True): + self.write(f"TRIG_MODE {val}") + + def getTrace( + self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True + ): old_trg_mode = self.getTriggerMode() - self.setTriggerMode('STOP'); # to get synchronous channels - wfVoltage, availableNpnts = self.getWaveform( chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints, decimate=decimate) - t = self.getTimeTrace(availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints) - tr = TraceXY( f'Ch{chNum}' ) + self.setTriggerMode("STOP") + # to get synchronous channels + wfVoltage, availableNpnts = self.getWaveform( + chNum, + availableNpnts=availableNpnts, + maxRequiredPoints=maxRequiredPoints, + decimate=decimate, + ) + t = self.getTimeTrace( + availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints + ) + tr = TraceXY(f"Ch{chNum}") tr.x = t tr.y = wfVoltage # restore scope to the before acquisition mode if old_trg_mode != "STOP": # short speed up here with this check self.setTriggerMode(old_trg_mode) - return( tr ) + return tr -if __name__ == '__main__': +if __name__ == "__main__": import pyvisa + print("testing") rm = pyvisa.ResourceManager() - print(rm.list_resources()) - instr=rm.open_resource('TCPIP::192.168.0.62::INSTR') + print(rm.list_resources()) + instr = rm.open_resource("TCPIP::192.168.0.62::INSTR") scope = SDS1104X(instr) - print(f'ID: {scope.idn}') + print(f"ID: {scope.idn}") # print(f'Ch1 mean: {scope.mean(1)}') - print(f'Ch1 available points: {scope.getAvailableNumberOfPoints(1)}') - print(f'Sample Rate: {scope.getSampleRate()}') - print(f'Time per Div: {scope.getTimePerDiv()}') - print(f'Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}') - print(f'Ch1 Voltage Offset: {scope.getChanVoltageOffset(1)}') - print('------ Header start -------------') - print(str.join('\n', scope.getHeader())) - print('------ Header ends -------------') - ch1 = scope.getTrace(1) + print(f"Ch1 available points: {scope.getAvailableNumberOfPoints(1)}") + print(f"Sample Rate: {scope.getSampleRate()}") + print(f"Time per Div: {scope.getTimePerDiv()}") + print(f"Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}") + print(f"Ch1 Voltage Offset: {scope.getChanVoltageOffset(1)}") + print("------ Header start -------------") + print(str.join("\n", scope.getHeader())) + print("------ Header ends -------------") + ch1 = scope.getTrace(1) traces = scope.getAllTraces() - - - |