""" Created by Eugeniy E. Mikhailov 2021/11/29 """ from qolab.hardware.scope import ScopeSCPI from qolab.hardware.scpi import response2numStr from qolab.data.trace import Trace, TraceXY import numpy as np class SDS1104X(ScopeSCPI): """ Siglent SDS1104x scope """ vertDivOnScreen = 10 horizDivOnScreen = 14 def __init__(self, resource): super().__init__(resource) self.config['Device model'] = 'SDS1104X' self.resource.read_termination='\n' self.numberOfChannels = 4 self.maxRequiredPoints = 1000; # desired number of points per channel, can return twice more def mean(self, chNum): # get mean on a specific channel calculated by scope # PAVA stands for PArameter VAlue qstr = f'C{chNum}:PAVA? MEAN' rstr = self.query(qstr); # reply is in the form 'C1:PAVA MEAN,3.00E-02V' prefix, numberString, unit = response2numStr(rstr, firstSeparator=',', unit='V') return(float(numberString)) def getAvailableNumberOfPoints(self, chNum): if chNum != 1 and chNum != 3: # for whatever reason 'SAMPLE_NUM' fails for channel 2 and 4 chNum = 1 qstr = f'SAMPLE_NUM? C{chNum}' rstr = self.query(qstr) # reply is in the form 'SANU 7.00E+01pts' prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='pts') return(int(float(numberString))) def getSampleRate(self): rstr = self.query('SAMPLE_RATE?'); # expected reply is like 'SARA 1.00E+09Sa/s' prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='Sa/s') return(int(float(numberString))) def calcSparsingAndNumPoints(self, availableNpnts=None, maxRequiredPoints=None): if availableNpnts is None: # using channel 1 to get availableNpnts availableNpnts = self.getAvailableNumberOfPoints(1) if maxRequiredPoints is None: maxRequiredPoints = self.maxRequiredPoints if availableNpnts <= maxRequiredPoints*2: Npnts = availableNpnts sparsing = 1 else: sparsing = int(np.floor(availableNpnts/maxRequiredPoints)) Npnts = int(np.floor(availableNpnts/sparsing)) return(sparsing, Npnts, availableNpnts, maxRequiredPoints) def getRawWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None): (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints) if sparsing == 1 and Npnts == availableNpnts: # we are getting all of it cstr = f'WAVEFORM_SETUP NP,0,FP,0,SP,{sparsing}' # technically when we know Npnts and sparsing # we can use command from the follow up 'else' clause else: cstr = f'WAVEFORM_SETUP SP,{sparsing},NP,{Npnts},FP,0' # Note: it is not enough to provide sparsing (SP), # number of points (NP) needed to be calculated properly too! # From the manual # WAVEFORM_SETUP SP,,NP,,FP, # SP Sparse point. It defines the interval between data points. # For example: # SP = 0 sends all data points. # SP = 1 sends all data points. # SP = 4 sends every 4th data point # NP — Number of points. It indicates how many points should be transmitted. # For example: # NP = 0 sends all data points. # NP = 50 sends a maximum of 50 data points. # FP — First point. It specifies the address of the first data point to be sent. # For example: # FP = 0 corresponds to the first data point. # FP = 1 corresponds to the second data point self.write(cstr) qstr = f'C{chNum}:WAVEFORM? DAT2' wfRaw=self.query_binary_values(qstr, datatype='b', header_fmt='ieee', container=np.array, chunk_size=(Npnts+100)) # expected full reply: 'C1:WF DAT2,#9000000140.........' trRaw = Trace(f'Ch{chNum}') trRaw.values = wfRaw.reshape(wfRaw.size,1) trRaw.unit = 'count' return(trRaw, availableNpnts, Npnts) def getChanVoltsPerDiv(self, chNum): qstr = f'C{chNum}:VDIV?' rstr = self.query(qstr) # expected reply to query: 'C1:VDIV 1.04E+00V' prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='V') return(float(numberString)) def setChanVoltsPerDiv(self, chNum, vPerDiv): cstr = f'C{chNum}:VDIV {vPerDiv}' self.write(cstr) # if out of range, the VAB bit (bit 2) in the STB register to be set def getChanOffset(self, chNum): qstr = f'C{chNum}:OFST?' rstr = self.query(qstr) # expected reply to query: 'C1:OFST -1.27E+00V' prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='V') return(float(numberString)) def getTimePerDiv(self): qstr = f'TDIV?' rstr = self.query(qstr) # expected reply to query: 'TDIV 2.00E-08S' prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='S') return(float(numberString)) def setTimePerDiv(self, timePerDiv): cstr = f'TDIV {timePerDiv}' self.write(cstr) # if out of range, the VAB bit (bit 2) in the STB register to be set def getTrigDelay(self): qstr = f'TRIG_DELAY?' rstr = self.query(qstr) # expected reply to query: 'TRDL -0.00E+00S' prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='S') return(float(numberString)) def getWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None): trRaw, availableNpnts, Npnts = self.getRawWaveform(chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints) VoltageOffset = self.getChanOffset(chNum) VoltsPerDiv = self.getChanVoltsPerDiv(chNum) tr = trRaw tr.values = trRaw.values * VoltsPerDiv * self.vertDivOnScreen/250 -VoltageOffset tr.config['unit'] = 'Volt' tr.config['tags']['VoltageOffset'] = VoltageOffset tr.config['tags']['VoltsPerDiv'] = VoltsPerDiv return(tr, availableNpnts) def getTimeTrace(self, availableNpnts=None, maxRequiredPoints=None): (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints) sampleRate = self.getSampleRate() timePerDiv = self.getTimePerDiv() trigDelay = self.getTrigDelay() if Npnts is None and sparsing is None: # using channel 1 as reference Npnts = self.getAvailableNumberOfPoints(1) tval = np.arange(Npnts) / sampleRate * sparsing; tval = tval - timePerDiv * self.horizDivOnScreen/2 - trigDelay t = Trace('time') t.values = tval.reshape(tval.size,1) t.config['unit'] = 'S' t.config['tags']['TimePerDiv'] = timePerDiv t.config['tags']['TrigDelay'] = trigDelay t.config['tags']['SampleRate'] = sampleRate t.config['tags']['AvailableNPnts'] = availableNpnts t.config['tags']['Npnts'] = availableNpnts t.config['tags']['Sparsing'] = sparsing return(t) def getTrace(self, chNum, availableNpnts=None, maxRequiredPoints=None): wfVoltage, availableNpnts = self.getWaveform( chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints) t = self.getTimeTrace(availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints) tr = TraceXY( f'Ch{chNum}' ) tr.x = t tr.y = wfVoltage return( tr ) if __name__ == '__main__': import pyvisa print("testing") rm = pyvisa.ResourceManager() print(rm.list_resources()) instr=rm.open_resource('TCPIP::192.168.0.61::INSTR') scope = SDS1104X(instr) print(f'ID: {scope.idn}') print(f'Ch1 mean: {scope.mean(1)}') print(f'Ch1 available points: {scope.getAvailableNumberOfPoints(1)}') print(f'Sample Rate: {scope.getSampleRate()}') print(f'Time per Div: {scope.getTimePerDiv()}') print(f'Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}') print(f'Ch1 Voltage Offset: {scope.getChanOffset(1)}') print('------ Header start -------------') print(str.join('\n', scope.getHeader())) print('------ Header ends -------------') ch1 = scope.getTrace(1) traces = scope.getAllTraces()