aboutsummaryrefslogtreecommitdiff
path: root/qolab/hardware
diff options
context:
space:
mode:
Diffstat (limited to 'qolab/hardware')
-rw-r--r--qolab/hardware/scope/sds1104x.py258
1 files changed, 151 insertions, 107 deletions
diff --git a/qolab/hardware/scope/sds1104x.py b/qolab/hardware/scope/sds1104x.py
index 9f0b502..3023d08 100644
--- a/qolab/hardware/scope/sds1104x.py
+++ b/qolab/hardware/scope/sds1104x.py
@@ -10,6 +10,7 @@ import numpy as np
import scipy.signal
from pyvisa.constants import InterfaceType
+
class SDS1104X(ScopeSCPI):
"""Siglent SDS1104x scope"""
@@ -17,41 +18,47 @@ class SDS1104X(ScopeSCPI):
# the grabbed trace has more points outside what is visible on the screen
vertDivOnScreen = 10
horizDivOnScreen = 14
+
def __init__(self, resource, *args, **kwds):
super().__init__(resource, *args, **kwds)
- self.config['Device model'] = 'SDS1104X'
- self.resource.read_termination='\n'
+ self.config["Device model"] = "SDS1104X"
+ self.resource.read_termination = "\n"
self.numberOfChannels = 4
- self.maxRequiredPoints = 1000; # desired number of points per channel, can return twice more
+ self.maxRequiredPoints = 1000
+ # desired number of points per channel, can return twice more
def mean(self, chNum):
# get mean on a specific channel calculated by scope
# PAVA stands for PArameter VAlue
- qstr = f'C{chNum}:PAVA? MEAN'
- rstr = self.query(qstr);
+ qstr = f"C{chNum}:PAVA? MEAN"
+ rstr = self.query(qstr)
# reply is in the form 'C1:PAVA MEAN,3.00E-02V'
- prefix, numberString, unit = response2numStr(rstr, firstSeparator=',', unit='V')
- return(float(numberString))
+ prefix, numberString, unit = response2numStr(rstr, firstSeparator=",", unit="V")
+ return float(numberString)
def getAvailableNumberOfPoints(self, chNum):
if chNum != 1 and chNum != 3:
# for whatever reason 'SAMPLE_NUM' fails for channel 2 and 4
chNum = 1
- qstr = f'SAMPLE_NUM? C{chNum}'
+ qstr = f"SAMPLE_NUM? C{chNum}"
rstr = self.query(qstr)
# reply is in the form 'SANU 7.00E+01pts'
- prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='pts')
- return(int(float(numberString)))
-
+ prefix, numberString, unit = response2numStr(
+ rstr, firstSeparator=" ", unit="pts"
+ )
+ return int(float(numberString))
+
@BasicInstrument.tsdb_append
def getSampleRate(self):
- rstr = self.query('SAMPLE_RATE?');
+ rstr = self.query("SAMPLE_RATE?")
# expected reply is like 'SARA 1.00E+09Sa/s'
- prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='Sa/s')
- return(int(float(numberString)))
+ prefix, numberString, unit = response2numStr(
+ rstr, firstSeparator=" ", unit="Sa/s"
+ )
+ return int(float(numberString))
def setSampleRate(self, val):
- print('Cannot set SampleRate directly for SDS1104X')
+ print("Cannot set SampleRate directly for SDS1104X")
# it is not possible to do with this model directly
pass
@@ -62,15 +69,17 @@ class SDS1104X(ScopeSCPI):
if maxRequiredPoints is None:
maxRequiredPoints = self.maxRequiredPoints
- if availableNpnts <= maxRequiredPoints*2:
+ if availableNpnts <= maxRequiredPoints * 2:
Npnts = availableNpnts
sparsing = 1
else:
- sparsing = int(np.floor(availableNpnts/maxRequiredPoints))
- Npnts = int(np.floor(availableNpnts/sparsing))
- return(sparsing, Npnts, availableNpnts, maxRequiredPoints)
+ sparsing = int(np.floor(availableNpnts / maxRequiredPoints))
+ Npnts = int(np.floor(availableNpnts / sparsing))
+ return (sparsing, Npnts, availableNpnts, maxRequiredPoints)
- def getRawWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True):
+ def getRawWaveform(
+ self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True
+ ):
"""
If decimate=True is used, we get all available points and then low-pass filter them.
The result is less noisy. But transfer time from the instrument is longer.
@@ -78,26 +87,31 @@ class SDS1104X(ScopeSCPI):
but we might see aliasing, if there is a high frequency noise and sparsing > 1
"""
- (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints)
+ (
+ sparsing,
+ Npnts,
+ availableNpnts,
+ maxRequiredPoints,
+ ) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints)
if decimate:
- Npnts = availableNpnts # get all of them and decimate later
+ Npnts = availableNpnts # get all of them and decimate later
if (sparsing == 1 and Npnts == availableNpnts) or decimate:
# We are getting all points of the trace
# Apparently sparsing has no effect with this command
# and effectively uses SP=1 for any sparsing
# but I want to make sure and force it
- cstr = f'WAVEFORM_SETUP NP,0,FP,0,SP,1'
+ cstr = f"WAVEFORM_SETUP NP,0,FP,0,SP,1"
# technically when we know Npnts and sparsing
# we can use command from the follow up 'else' clause
else:
# we just ask every point with 'sparsing' interval
# fast to grab but we could do better with more advance decimate
# method, which allow better precision for the price of longer acquisition time
- cstr = f'WAVEFORM_SETUP SP,{sparsing},NP,{Npnts},FP,0'
+ cstr = f"WAVEFORM_SETUP SP,{sparsing},NP,{Npnts},FP,0"
# Note: it is not enough to provide sparsing (SP),
# number of points (NP) needed to be calculated properly too!
# From the manual
- # WAVEFORM_SETUP SP,<sparsing>,NP,<number>,FP,<point>
+ # WAVEFORM_SETUP SP,<sparsing>,NP,<number>,FP,<point>
# SP Sparse point. It defines the interval between data points.
# For example:
# SP = 0 sends all data points.
@@ -113,64 +127,73 @@ class SDS1104X(ScopeSCPI):
# FP = 1 corresponds to the second data point
self.write(cstr)
- trRaw = Trace(f'Ch{chNum}')
+ trRaw = Trace(f"Ch{chNum}")
- qstr = f'C{chNum}:WAVEFORM? DAT2'
+ qstr = f"C{chNum}:WAVEFORM? DAT2"
# expected full reply: 'C1:WF DAT2,#9000000140.........'
try:
- wfRaw=self.query_binary_values(qstr, datatype='b', header_fmt='ieee', container=np.array, chunk_size=(Npnts+100))
+ wfRaw = self.query_binary_values(
+ qstr,
+ datatype="b",
+ header_fmt="ieee",
+ container=np.array,
+ chunk_size=(Npnts + 100),
+ )
if self.resource.interface_type == InterfaceType.usb:
# Somehow on windows (at least with USB interface)
# there is a lingering empty string which we need to flush out
- r=self.read()
- if r != '':
- print(f'WARNING: We expected an empty string but got {r=}')
- trRaw.values = wfRaw.reshape(wfRaw.size,1)
+ r = self.read()
+ if r != "":
+ print(f"WARNING: We expected an empty string but got {r=}")
+ trRaw.values = wfRaw.reshape(wfRaw.size, 1)
if decimate and sparsing != 1:
- numtaps = 3; # not sure it is the best case
- trRaw.values = scipy.signal.decimate(trRaw.values, sparsing, numtaps, axis=0)
+ numtaps = 3
+ # not sure it is the best case
+ trRaw.values = scipy.signal.decimate(
+ trRaw.values, sparsing, numtaps, axis=0
+ )
except ValueError as err:
# most likely we get crazy number of points
# self.read() # flushing the bogus output of previous command
- print(f'Error {err=}: getting waveform failed for {qstr=}')
- wfRaw=np.array([])
- trRaw.config['unit'] = 'Count'
- trRaw.config['tags']['Decimate'] = decimate
- return(trRaw, availableNpnts, Npnts)
+ print(f"Error {err=}: getting waveform failed for {qstr=}")
+ wfRaw = np.array([])
+ trRaw.config["unit"] = "Count"
+ trRaw.config["tags"]["Decimate"] = decimate
+ return (trRaw, availableNpnts, Npnts)
def getChanVoltsPerDiv(self, chNum):
- qstr = f'C{chNum}:VDIV?'
+ qstr = f"C{chNum}:VDIV?"
rstr = self.query(qstr)
# expected reply to query: 'C1:VDIV 1.04E+00V'
- prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='V')
- return(float(numberString))
+ prefix, numberString, unit = response2numStr(rstr, firstSeparator=" ", unit="V")
+ return float(numberString)
def setChanVoltsPerDiv(self, chNum, vPerDiv):
- cstr = f'C{chNum}:VDIV {vPerDiv}'
+ cstr = f"C{chNum}:VDIV {vPerDiv}"
self.write(cstr)
# if out of range, the VAB bit (bit 2) in the STB register to be set
def getChanVoltageOffset(self, chNum):
- qstr = f'C{chNum}:OFST?'
+ qstr = f"C{chNum}:OFST?"
rstr = self.query(qstr)
# expected reply to query: 'C1:OFST -1.27E+00V'
- prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='V')
- return(float(numberString))
+ prefix, numberString, unit = response2numStr(rstr, firstSeparator=" ", unit="V")
+ return float(numberString)
def setChanVoltageOffset(self, chNum, val):
- cstr = f'C{chNum}:OFST {val}'
+ cstr = f"C{chNum}:OFST {val}"
self.write(cstr)
def getLED(self):
- """ Returns binary mask of available LEDs """
- qstr = 'LED?'
+ """Returns binary mask of available LEDs"""
+ qstr = "LED?"
rstr = self.query(qstr)
- prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='')
- return(int(numberString,16)) # convert from hex string to integer
+ prefix, numberString, unit = response2numStr(rstr, firstSeparator=" ", unit="")
+ return int(numberString, 16) # convert from hex string to integer
def toggleRun(self):
# SY_FP is undocumented, reverse engineered from the web interface
- self.write('SY_FP 12,1')
+ self.write("SY_FP 12,1")
@BasicInstrument.tsdb_append
def getRun(self):
@@ -190,7 +213,7 @@ class SDS1104X(ScopeSCPI):
def toggleRoll(self):
# SY_FP is undocumented, reverse engineered from the web interface
- self.write('SY_FP 49,1')
+ self.write("SY_FP 49,1")
@BasicInstrument.tsdb_append
def setRoll(self, val):
@@ -200,106 +223,127 @@ class SDS1104X(ScopeSCPI):
@BasicInstrument.tsdb_append
def getTimePerDiv(self):
- qstr = 'TDIV?'
+ qstr = "TDIV?"
rstr = self.query(qstr)
# expected reply to query: 'TDIV 2.00E-08S'
- prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='S')
- return(float(numberString))
+ prefix, numberString, unit = response2numStr(rstr, firstSeparator=" ", unit="S")
+ return float(numberString)
@BasicInstrument.tsdb_append
def setTimePerDiv(self, timePerDiv):
- cstr = f'TDIV {timePerDiv}'
+ cstr = f"TDIV {timePerDiv}"
self.write(cstr)
# if out of range, the VAB bit (bit 2) in the STB register to be set
@BasicInstrument.tsdb_append
def getTrigDelay(self):
- qstr = 'TRIG_DELAY?'
+ qstr = "TRIG_DELAY?"
rstr = self.query(qstr)
# expected reply to query: 'TRDL -0.00E+00S'
- prefix, numberString, unit = response2numStr(rstr, firstSeparator=' ', unit='S')
- return(float(numberString))
-
+ prefix, numberString, unit = response2numStr(rstr, firstSeparator=" ", unit="S")
+ return float(numberString)
- def getWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True):
+ def getWaveform(
+ self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True
+ ):
"""
For decimate use see getRawWaveform. In short decimate=True is slower but more precise.
"""
- trRaw, availableNpnts, Npnts = self.getRawWaveform(chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints, decimate=decimate)
+ trRaw, availableNpnts, Npnts = self.getRawWaveform(
+ chNum,
+ availableNpnts=availableNpnts,
+ maxRequiredPoints=maxRequiredPoints,
+ decimate=decimate,
+ )
VoltageOffset = self.getChanVoltageOffset(chNum)
VoltsPerDiv = self.getChanVoltsPerDiv(chNum)
tr = trRaw
- tr.values = trRaw.values * VoltsPerDiv * self.vertDivOnScreen/250 -VoltageOffset
- tr.config['unit'] = 'Volt'
- tr.config['tags']['VoltageOffset'] = VoltageOffset
- tr.config['tags']['VoltsPerDiv'] = VoltsPerDiv
- return(tr, availableNpnts)
+ tr.values = (
+ trRaw.values * VoltsPerDiv * self.vertDivOnScreen / 250 - VoltageOffset
+ )
+ tr.config["unit"] = "Volt"
+ tr.config["tags"]["VoltageOffset"] = VoltageOffset
+ tr.config["tags"]["VoltsPerDiv"] = VoltsPerDiv
+ return (tr, availableNpnts)
def getTimeTrace(self, availableNpnts=None, maxRequiredPoints=None):
- (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints)
+ (
+ sparsing,
+ Npnts,
+ availableNpnts,
+ maxRequiredPoints,
+ ) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints)
sampleRate = self.getSampleRate()
timePerDiv = self.getTimePerDiv()
- trigDelay = self.getTrigDelay()
+ trigDelay = self.getTrigDelay()
if Npnts is None and sparsing is None:
# using channel 1 as reference
Npnts = self.getAvailableNumberOfPoints(1)
- tval = np.arange(Npnts) / sampleRate * sparsing;
- tval = tval - timePerDiv * self.horizDivOnScreen/2 - trigDelay
- t = Trace('time')
- t.values = tval.reshape(tval.size,1)
- t.config['unit'] = 'S'
- t.config['tags']['TimePerDiv'] = timePerDiv
- t.config['tags']['TrigDelay'] = trigDelay
- t.config['tags']['SampleRate'] = sampleRate
- t.config['tags']['AvailableNPnts'] = availableNpnts
- t.config['tags']['Npnts'] = availableNpnts
- t.config['tags']['Sparsing'] = sparsing
- return(t)
+ tval = np.arange(Npnts) / sampleRate * sparsing
+ tval = tval - timePerDiv * self.horizDivOnScreen / 2 - trigDelay
+ t = Trace("time")
+ t.values = tval.reshape(tval.size, 1)
+ t.config["unit"] = "S"
+ t.config["tags"]["TimePerDiv"] = timePerDiv
+ t.config["tags"]["TrigDelay"] = trigDelay
+ t.config["tags"]["SampleRate"] = sampleRate
+ t.config["tags"]["AvailableNPnts"] = availableNpnts
+ t.config["tags"]["Npnts"] = availableNpnts
+ t.config["tags"]["Sparsing"] = sparsing
+ return t
def getTriggerMode(self):
# we expect NORM, AUTO, SINGLE, STOP
- res = self.query('TRIG_MODE?')
+ res = self.query("TRIG_MODE?")
# res is in the form 'TRMD AUTO'
return res[5:]
def setTriggerMode(self, val):
# we expect NORM, AUTO, SINGLE, STOP
- self.write(f'TRIG_MODE {val}')
-
- def getTrace(self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True):
+ self.write(f"TRIG_MODE {val}")
+
+ def getTrace(
+ self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True
+ ):
old_trg_mode = self.getTriggerMode()
- self.setTriggerMode('STOP'); # to get synchronous channels
- wfVoltage, availableNpnts = self.getWaveform( chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints, decimate=decimate)
- t = self.getTimeTrace(availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints)
- tr = TraceXY( f'Ch{chNum}' )
+ self.setTriggerMode("STOP")
+ # to get synchronous channels
+ wfVoltage, availableNpnts = self.getWaveform(
+ chNum,
+ availableNpnts=availableNpnts,
+ maxRequiredPoints=maxRequiredPoints,
+ decimate=decimate,
+ )
+ t = self.getTimeTrace(
+ availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints
+ )
+ tr = TraceXY(f"Ch{chNum}")
tr.x = t
tr.y = wfVoltage
# restore scope to the before acquisition mode
if old_trg_mode != "STOP":
# short speed up here with this check
self.setTriggerMode(old_trg_mode)
- return( tr )
+ return tr
-if __name__ == '__main__':
+if __name__ == "__main__":
import pyvisa
+
print("testing")
rm = pyvisa.ResourceManager()
- print(rm.list_resources())
- instr=rm.open_resource('TCPIP::192.168.0.62::INSTR')
+ print(rm.list_resources())
+ instr = rm.open_resource("TCPIP::192.168.0.62::INSTR")
scope = SDS1104X(instr)
- print(f'ID: {scope.idn}')
+ print(f"ID: {scope.idn}")
# print(f'Ch1 mean: {scope.mean(1)}')
- print(f'Ch1 available points: {scope.getAvailableNumberOfPoints(1)}')
- print(f'Sample Rate: {scope.getSampleRate()}')
- print(f'Time per Div: {scope.getTimePerDiv()}')
- print(f'Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}')
- print(f'Ch1 Voltage Offset: {scope.getChanVoltageOffset(1)}')
- print('------ Header start -------------')
- print(str.join('\n', scope.getHeader()))
- print('------ Header ends -------------')
- ch1 = scope.getTrace(1)
+ print(f"Ch1 available points: {scope.getAvailableNumberOfPoints(1)}")
+ print(f"Sample Rate: {scope.getSampleRate()}")
+ print(f"Time per Div: {scope.getTimePerDiv()}")
+ print(f"Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}")
+ print(f"Ch1 Voltage Offset: {scope.getChanVoltageOffset(1)}")
+ print("------ Header start -------------")
+ print(str.join("\n", scope.getHeader()))
+ print("------ Header ends -------------")
+ ch1 = scope.getTrace(1)
traces = scope.getAllTraces()
-
-
-