aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--datatrace.py16
-rw-r--r--scope/__init__.py192
-rw-r--r--scope/sds1104x.py184
3 files changed, 201 insertions, 191 deletions
diff --git a/datatrace.py b/datatrace.py
new file mode 100644
index 0000000..7398695
--- /dev/null
+++ b/datatrace.py
@@ -0,0 +1,16 @@
+
+class DataTrace:
+ def __init__(self, descrStr):
+ self.descr = descrStr
+ self.x = None
+ self.xlabel = None
+ self.xunit = None
+ self.y = None
+ self.ylabel = None
+ self.yunit = None
+
+ def plot(self):
+ import matplotlib.pyplot as plt
+ plt.plot(self.x, self.y, label=self.descr)
+ plt.legend()
+
diff --git a/scope/__init__.py b/scope/__init__.py
index ffc0720..e6b6573 100644
--- a/scope/__init__.py
+++ b/scope/__init__.py
@@ -2,11 +2,7 @@
Provide basic class to operate scope
Created by Eugeniy E. Mikhailov 2021/11/29
"""
-
-import pyvisa
import scpi
-import re
-import numpy as np
class Scope(scpi.SCPIinstr):
"""
@@ -22,191 +18,5 @@ class Scope(scpi.SCPIinstr):
def getTrace(self, chNum, availableNpnts=None, maxRequiredPoints=None):
warnings.warn( 'this function is not implemented' )
-class Trace:
- def __init__(self, descrStr):
- self.descr = descrStr
- self.x = None
- self.xlabel = None
- self.xunit = None
- self.y = None
- self.ylabel = None
- self.yunit = None
-
- def plot(self):
- import matplotlib.pyplot as plt
- plt.plot(self.x, self.y, label=self.descr)
- plt.legend()
-
-class SDS1104x(Scope):
- """ Siglent SDS1104x scope """
- vertDivOnScreen = 10
- horizDivOnScreen = 14
- numberOfChannels = 4
- def __init__(self, resource):
- super().__init__(resource)
- self.resource.read_termination='\n'
- self.maxRequiredPoints = 1000; # desired number of points per channel, can return twice more
-
- def response2numStr(self, strIn, firstSeparator=None, unit=None):
- # A typical reply of Siglent is in the form 'TDIV 2.00E-08S'
- # i.e. "<prefix><firstSeparator><numberString><unit>
- # prefix='TDIV', firstSeparator=' ', numberString='2.00E-08', unit='S'
- # this function parses the reply
- spltStr = re.split(firstSeparator, strIn)
- prefix = spltStr[0]
- rstr = spltStr[1]
- spltStr = re.split(unit, rstr)
- numberString = spltStr[0]
- unit = spltStr[1]
- return (prefix, numberString, unit)
-
- def mean(self, chNum):
- # get mean on a specific channel calculated by scope
- # PAVA stands for PArameter VAlue
- qstr = f'C{chNum}:PAVA? MEAN'
- rstr = self.query(qstr);
- # reply is in the form 'C1:PAVA MEAN,3.00E-02V'
- prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=',', unit='V')
- return(float(numberString))
-
- def getAvailableNumberOfPoints(self, chNum):
- if chNum != 1 and chNum != 3:
- # for whatever reason 'SAMPLE_NUM' fails for channel 2 and 4
- chNum = 1
- qstr = f'SAMPLE_NUM? C{chNum}'
- rstr = self.query(qstr)
- # reply is in the form 'SANU 7.00E+01pts'
- prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='pts')
- return(int(float(numberString)))
-
- def getSampleRate(self):
- rstr = self.query('SAMPLE_RATE?');
- # expected reply is like 'SARA 1.00E+09Sa/s'
- prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='Sa/s')
- return(int(float(numberString)))
-
- def calcSparsingAndNumPoints(self, availableNpnts=None, maxRequiredPoints=None):
- if availableNpnts == None:
- # using channel 1 to get availableNpnts
- availableNpnts = self.getAvailableNumberOfPoints(1)
- if maxRequiredPoints == None:
- maxRequiredPoints = self.maxRequiredPoints
-
- if availableNpnts <= maxRequiredPoints*2:
- Npnts = availableNpnts
- sparsing = 1
- else:
- sparsing = int(np.floor(availableNpnts/maxRequiredPoints))
- Npnts = int(np.floor(availableNpnts/sparsing))
- return(sparsing, Npnts, availableNpnts, maxRequiredPoints)
-
- def getRawWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None):
- (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints)
- if sparsing == 1 and Npnts == availableNpnts:
- # we are getting all of it
- cstr = f'WAVEFORM_SETUP NP,0,FP,0,SP,{sparsing}'
- # technically when we know Npnts and sparsing
- # we can use command from the follow up 'else' clause
- else:
- cstr = f'WAVEFORM_SETUP SP,{sparsing},NP,{Npnts},FP,0'
- # Note: it is not enough to provide sparsing (SP),
- # number of points (NP) needed to be calculated properly too!
- # From the manual
- # WAVEFORM_SETUP SP,<sparsing>,NP,<number>,FP,<point>
- # SP Sparse point. It defines the interval between data points.
- # For example:
- # SP = 0 sends all data points.
- # SP = 1 sends all data points.
- # SP = 4 sends every 4th data point
- # NP — Number of points. It indicates how many points should be transmitted.
- # For example:
- # NP = 0 sends all data points.
- # NP = 50 sends a maximum of 50 data points.
- # FP — First point. It specifies the address of the first data point to be sent.
- # For example:
- # FP = 0 corresponds to the first data point.
- # FP = 1 corresponds to the second data point
- self.write(cstr)
-
- qstr = f'C{chNum}:WAVEFORM? DAT2'
- wfRaw=self.query_binary_values(qstr, datatype='b', header_fmt='ieee', container=np.array, chunk_size=(Npnts+100))
- # expected full reply: 'C1:WF DAT2,#9000000140.........'
- return(wfRaw, availableNpnts, Npnts)
-
- def getChanVoltsPerDiv(self, chNum):
- qstr = f'C{chNum}:VDIV?'
- rstr = self.query(qstr)
- # expected reply to query: 'C1:VDIV 1.04E+00V'
- prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='V')
- return(float(numberString))
-
- def getChanOffset(self, chNum):
- qstr = f'C{chNum}:OFST?'
- rstr = self.query(qstr)
- # expected reply to query: 'C1:OFST -1.27E+00V'
- prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='V')
- return(float(numberString))
-
- def getTimePerDiv(self):
- qstr = f'TDIV?'
- rstr = self.query(qstr)
- # expected reply to query: 'TDIV 2.00E-08S'
- prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='S')
- return(float(numberString))
-
- def getTrigDelay(self):
- qstr = f'TRIG_DELAY?'
- rstr = self.query(qstr)
- # expected reply to query: 'TRDL -0.00E+00S'
- prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='S')
- return(float(numberString))
-
-
- def getWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None):
- wfRaw, availableNpnts, Npnts = self.getRawWaveform(chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints)
- VoltageOffset = self.getChanOffset(chNum)
- VoltsPerDiv = self.getChanVoltsPerDiv(chNum)
- return( wfRaw * VoltsPerDiv * self.vertDivOnScreen/250 -VoltageOffset, availableNpnts )
-
- def getTimeTrace(self, availableNpnts=None, maxRequiredPoints=None):
- (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints)
- sampleRate = self.getSampleRate()
- timePerDiv = self.getTimePerDiv()
- trigDelay = self.getTrigDelay()
- if Npnts == None and sparsing == None:
- # using channel 1 as reference
- Npnts = self.getAvailableNumberOfPoints(1)
- t = np.arange(Npnts) / sampleRate * sparsing;
- t = t - timePerDiv * self.horizDivOnScreen/2 - trigDelay
- return(t)
-
- def getTrace(self, chNum, availableNpnts=None, maxRequiredPoints=None):
- wfVoltage, availableNpnts = self.getWaveform( chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints)
- t = self.getTimeTrace(availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints)
- tr = Trace( f'Ch{chNum}' )
- tr.xlabel = 'Time'
- tr.xunit = 'S'
- tr.ylabel = 'Voltage'
- tr.yunit = 'V'
- tr.x = t
- tr.y = wfVoltage
- return( tr )
-
-
-if __name__ == '__main__':
- print("testing")
- rm = pyvisa.ResourceManager()
- print(rm.list_resources())
- instr=rm.open_resource('TCPIP::192.168.0.61::INSTR')
- scope = SDS1104x(instr)
- print(f'ID: {scope.idn}')
- print(f'Ch1 mean: {scope.mean(1)}')
- print(f'Ch1 available points: {scope.getAvailableNumberOfPoints(1)}')
- print(f'Sample Rate: {scope.getSampleRate()}')
- print(f'Time per Div: {scope.getTimePerDiv()}')
- print(f'Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}')
- print(f'Ch1 Voltage Offset: {scope.getChanOffset(1)}')
- ch1 = scope.getTrace(1)
-
-
+from .sds1104x import SDS1104X
diff --git a/scope/sds1104x.py b/scope/sds1104x.py
new file mode 100644
index 0000000..90c014b
--- /dev/null
+++ b/scope/sds1104x.py
@@ -0,0 +1,184 @@
+"""
+Provide basic class to operate scope
+Created by Eugeniy E. Mikhailov 2021/11/29
+"""
+
+from scope import Scope
+from datatrace import DataTrace
+import re
+import numpy as np
+
+class SDS1104X(Scope):
+ """ Siglent SDS1104x scope """
+ vertDivOnScreen = 10
+ horizDivOnScreen = 14
+ numberOfChannels = 4
+ def __init__(self, resource):
+ super().__init__(resource)
+ self.resource.read_termination='\n'
+ self.maxRequiredPoints = 1000; # desired number of points per channel, can return twice more
+
+ def response2numStr(self, strIn, firstSeparator=None, unit=None):
+ # A typical reply of Siglent is in the form 'TDIV 2.00E-08S'
+ # i.e. "<prefix><firstSeparator><numberString><unit>
+ # prefix='TDIV', firstSeparator=' ', numberString='2.00E-08', unit='S'
+ # this function parses the reply
+ spltStr = re.split(firstSeparator, strIn)
+ prefix = spltStr[0]
+ rstr = spltStr[1]
+ spltStr = re.split(unit, rstr)
+ numberString = spltStr[0]
+ unit = spltStr[1]
+ return (prefix, numberString, unit)
+
+ def mean(self, chNum):
+ # get mean on a specific channel calculated by scope
+ # PAVA stands for PArameter VAlue
+ qstr = f'C{chNum}:PAVA? MEAN'
+ rstr = self.query(qstr);
+ # reply is in the form 'C1:PAVA MEAN,3.00E-02V'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=',', unit='V')
+ return(float(numberString))
+
+ def getAvailableNumberOfPoints(self, chNum):
+ if chNum != 1 and chNum != 3:
+ # for whatever reason 'SAMPLE_NUM' fails for channel 2 and 4
+ chNum = 1
+ qstr = f'SAMPLE_NUM? C{chNum}'
+ rstr = self.query(qstr)
+ # reply is in the form 'SANU 7.00E+01pts'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='pts')
+ return(int(float(numberString)))
+
+ def getSampleRate(self):
+ rstr = self.query('SAMPLE_RATE?');
+ # expected reply is like 'SARA 1.00E+09Sa/s'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='Sa/s')
+ return(int(float(numberString)))
+
+ def calcSparsingAndNumPoints(self, availableNpnts=None, maxRequiredPoints=None):
+ if availableNpnts == None:
+ # using channel 1 to get availableNpnts
+ availableNpnts = self.getAvailableNumberOfPoints(1)
+ if maxRequiredPoints == None:
+ maxRequiredPoints = self.maxRequiredPoints
+
+ if availableNpnts <= maxRequiredPoints*2:
+ Npnts = availableNpnts
+ sparsing = 1
+ else:
+ sparsing = int(np.floor(availableNpnts/maxRequiredPoints))
+ Npnts = int(np.floor(availableNpnts/sparsing))
+ return(sparsing, Npnts, availableNpnts, maxRequiredPoints)
+
+ def getRawWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None):
+ (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints)
+ if sparsing == 1 and Npnts == availableNpnts:
+ # we are getting all of it
+ cstr = f'WAVEFORM_SETUP NP,0,FP,0,SP,{sparsing}'
+ # technically when we know Npnts and sparsing
+ # we can use command from the follow up 'else' clause
+ else:
+ cstr = f'WAVEFORM_SETUP SP,{sparsing},NP,{Npnts},FP,0'
+ # Note: it is not enough to provide sparsing (SP),
+ # number of points (NP) needed to be calculated properly too!
+ # From the manual
+ # WAVEFORM_SETUP SP,<sparsing>,NP,<number>,FP,<point>
+ # SP Sparse point. It defines the interval between data points.
+ # For example:
+ # SP = 0 sends all data points.
+ # SP = 1 sends all data points.
+ # SP = 4 sends every 4th data point
+ # NP — Number of points. It indicates how many points should be transmitted.
+ # For example:
+ # NP = 0 sends all data points.
+ # NP = 50 sends a maximum of 50 data points.
+ # FP — First point. It specifies the address of the first data point to be sent.
+ # For example:
+ # FP = 0 corresponds to the first data point.
+ # FP = 1 corresponds to the second data point
+ self.write(cstr)
+
+ qstr = f'C{chNum}:WAVEFORM? DAT2'
+ wfRaw=self.query_binary_values(qstr, datatype='b', header_fmt='ieee', container=np.array, chunk_size=(Npnts+100))
+ # expected full reply: 'C1:WF DAT2,#9000000140.........'
+ return(wfRaw, availableNpnts, Npnts)
+
+ def getChanVoltsPerDiv(self, chNum):
+ qstr = f'C{chNum}:VDIV?'
+ rstr = self.query(qstr)
+ # expected reply to query: 'C1:VDIV 1.04E+00V'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='V')
+ return(float(numberString))
+
+ def getChanOffset(self, chNum):
+ qstr = f'C{chNum}:OFST?'
+ rstr = self.query(qstr)
+ # expected reply to query: 'C1:OFST -1.27E+00V'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='V')
+ return(float(numberString))
+
+ def getTimePerDiv(self):
+ qstr = f'TDIV?'
+ rstr = self.query(qstr)
+ # expected reply to query: 'TDIV 2.00E-08S'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='S')
+ return(float(numberString))
+
+ def getTrigDelay(self):
+ qstr = f'TRIG_DELAY?'
+ rstr = self.query(qstr)
+ # expected reply to query: 'TRDL -0.00E+00S'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='S')
+ return(float(numberString))
+
+
+ def getWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None):
+ wfRaw, availableNpnts, Npnts = self.getRawWaveform(chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints)
+ VoltageOffset = self.getChanOffset(chNum)
+ VoltsPerDiv = self.getChanVoltsPerDiv(chNum)
+ return( wfRaw * VoltsPerDiv * self.vertDivOnScreen/250 -VoltageOffset, availableNpnts )
+
+ def getTimeTrace(self, availableNpnts=None, maxRequiredPoints=None):
+ (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints)
+ sampleRate = self.getSampleRate()
+ timePerDiv = self.getTimePerDiv()
+ trigDelay = self.getTrigDelay()
+ if Npnts == None and sparsing == None:
+ # using channel 1 as reference
+ Npnts = self.getAvailableNumberOfPoints(1)
+ t = np.arange(Npnts) / sampleRate * sparsing;
+ t = t - timePerDiv * self.horizDivOnScreen/2 - trigDelay
+ return(t)
+
+ def getTrace(self, chNum, availableNpnts=None, maxRequiredPoints=None):
+ wfVoltage, availableNpnts = self.getWaveform( chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints)
+ t = self.getTimeTrace(availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints)
+ tr = DataTrace( f'Ch{chNum}' )
+ tr.xlabel = 'Time'
+ tr.xunit = 'S'
+ tr.ylabel = 'Voltage'
+ tr.yunit = 'V'
+ tr.x = t
+ tr.y = wfVoltage
+ return( tr )
+
+
+if __name__ == '__main__':
+ import pyvisa
+ print("testing")
+ rm = pyvisa.ResourceManager()
+ print(rm.list_resources())
+ instr=rm.open_resource('TCPIP::192.168.0.61::INSTR')
+ scope = SDS1104X(instr)
+ print(f'ID: {scope.idn}')
+ print(f'Ch1 mean: {scope.mean(1)}')
+ print(f'Ch1 available points: {scope.getAvailableNumberOfPoints(1)}')
+ print(f'Sample Rate: {scope.getSampleRate()}')
+ print(f'Time per Div: {scope.getTimePerDiv()}')
+ print(f'Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}')
+ print(f'Ch1 Voltage Offset: {scope.getChanOffset(1)}')
+ ch1 = scope.getTrace(1)
+
+
+