aboutsummaryrefslogtreecommitdiff
path: root/qolab
diff options
context:
space:
mode:
Diffstat (limited to 'qolab')
-rw-r--r--qolab/__init__.py0
-rw-r--r--qolab/data/__init__.py6
-rw-r--r--qolab/data/trace.py16
-rw-r--r--qolab/hardware/scope/__init__.py31
-rw-r--r--qolab/hardware/scope/sds1104x.py184
-rw-r--r--qolab/hardware/scpi.py63
-rw-r--r--qolab/test/test_scope_sds1104x.py19
7 files changed, 319 insertions, 0 deletions
diff --git a/qolab/__init__.py b/qolab/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/qolab/__init__.py
diff --git a/qolab/data/__init__.py b/qolab/data/__init__.py
new file mode 100644
index 0000000..528815a
--- /dev/null
+++ b/qolab/data/__init__.py
@@ -0,0 +1,6 @@
+
+from .trace import Trace
+
+__all__ = [
+ "Trace",
+]
diff --git a/qolab/data/trace.py b/qolab/data/trace.py
new file mode 100644
index 0000000..9b14264
--- /dev/null
+++ b/qolab/data/trace.py
@@ -0,0 +1,16 @@
+
+class Trace:
+ def __init__(self, descrStr):
+ self.descr = descrStr
+ self.x = None
+ self.xlabel = None
+ self.xunit = None
+ self.y = None
+ self.ylabel = None
+ self.yunit = None
+
+ def plot(self):
+ import matplotlib.pyplot as plt
+ plt.plot(self.x, self.y, label=self.descr)
+ plt.legend()
+
diff --git a/qolab/hardware/scope/__init__.py b/qolab/hardware/scope/__init__.py
new file mode 100644
index 0000000..a323b8f
--- /dev/null
+++ b/qolab/hardware/scope/__init__.py
@@ -0,0 +1,31 @@
+"""
+Provide basic class to operate scope
+Created by Eugeniy E. Mikhailov 2021/11/29
+"""
+from qolab.hardware.scpi import SCPIinstr
+
+class Scope:
+
+ # Minimal set of methods to be implemented by a scope.
+ # Should work with minimal arguments list
+ # but might be faster if parameters provided: less IO requests
+
+ def __init__(self):
+ self.numberOfChannels = 0
+
+ def getTrace(self, chNum, availableNpnts=None, maxRequiredPoints=None):
+ warnings.warn( 'this function is not implemented' )
+
+class ScopeSCPI(SCPIinstr, Scope):
+ """
+ Do not instantiate directly, use
+ rm = pyvisa.ResourceManager()
+ ScopeSCPI(rm.open_resource('TCPIP::192.168.0.2::INSTR'))
+ """
+ pass
+ def __init__(self, resource):
+ SCPIinstr.__init__(self, resource)
+ Scope.__init__(self)
+
+from .sds1104x import SDS1104X
+
diff --git a/qolab/hardware/scope/sds1104x.py b/qolab/hardware/scope/sds1104x.py
new file mode 100644
index 0000000..292ea7c
--- /dev/null
+++ b/qolab/hardware/scope/sds1104x.py
@@ -0,0 +1,184 @@
+"""
+Provide basic class to operate scope
+Created by Eugeniy E. Mikhailov 2021/11/29
+"""
+
+from qolab.hardware.scope import ScopeSCPI
+from qolab.data.trace import Trace
+import re
+import numpy as np
+
+class SDS1104X(ScopeSCPI):
+ """ Siglent SDS1104x scope """
+ vertDivOnScreen = 10
+ horizDivOnScreen = 14
+ def __init__(self, resource):
+ super().__init__(resource)
+ self.resource.read_termination='\n'
+ self.numberOfChannels = 4
+ self.maxRequiredPoints = 1000; # desired number of points per channel, can return twice more
+
+ def response2numStr(self, strIn, firstSeparator=None, unit=None):
+ # A typical reply of Siglent is in the form 'TDIV 2.00E-08S'
+ # i.e. "<prefix><firstSeparator><numberString><unit>
+ # prefix='TDIV', firstSeparator=' ', numberString='2.00E-08', unit='S'
+ # this function parses the reply
+ spltStr = re.split(firstSeparator, strIn)
+ prefix = spltStr[0]
+ rstr = spltStr[1]
+ spltStr = re.split(unit, rstr)
+ numberString = spltStr[0]
+ unit = spltStr[1]
+ return (prefix, numberString, unit)
+
+ def mean(self, chNum):
+ # get mean on a specific channel calculated by scope
+ # PAVA stands for PArameter VAlue
+ qstr = f'C{chNum}:PAVA? MEAN'
+ rstr = self.query(qstr);
+ # reply is in the form 'C1:PAVA MEAN,3.00E-02V'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=',', unit='V')
+ return(float(numberString))
+
+ def getAvailableNumberOfPoints(self, chNum):
+ if chNum != 1 and chNum != 3:
+ # for whatever reason 'SAMPLE_NUM' fails for channel 2 and 4
+ chNum = 1
+ qstr = f'SAMPLE_NUM? C{chNum}'
+ rstr = self.query(qstr)
+ # reply is in the form 'SANU 7.00E+01pts'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='pts')
+ return(int(float(numberString)))
+
+ def getSampleRate(self):
+ rstr = self.query('SAMPLE_RATE?');
+ # expected reply is like 'SARA 1.00E+09Sa/s'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='Sa/s')
+ return(int(float(numberString)))
+
+ def calcSparsingAndNumPoints(self, availableNpnts=None, maxRequiredPoints=None):
+ if availableNpnts is None:
+ # using channel 1 to get availableNpnts
+ availableNpnts = self.getAvailableNumberOfPoints(1)
+ if maxRequiredPoints is None:
+ maxRequiredPoints = self.maxRequiredPoints
+
+ if availableNpnts <= maxRequiredPoints*2:
+ Npnts = availableNpnts
+ sparsing = 1
+ else:
+ sparsing = int(np.floor(availableNpnts/maxRequiredPoints))
+ Npnts = int(np.floor(availableNpnts/sparsing))
+ return(sparsing, Npnts, availableNpnts, maxRequiredPoints)
+
+ def getRawWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None):
+ (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints)
+ if sparsing == 1 and Npnts == availableNpnts:
+ # we are getting all of it
+ cstr = f'WAVEFORM_SETUP NP,0,FP,0,SP,{sparsing}'
+ # technically when we know Npnts and sparsing
+ # we can use command from the follow up 'else' clause
+ else:
+ cstr = f'WAVEFORM_SETUP SP,{sparsing},NP,{Npnts},FP,0'
+ # Note: it is not enough to provide sparsing (SP),
+ # number of points (NP) needed to be calculated properly too!
+ # From the manual
+ # WAVEFORM_SETUP SP,<sparsing>,NP,<number>,FP,<point>
+ # SP Sparse point. It defines the interval between data points.
+ # For example:
+ # SP = 0 sends all data points.
+ # SP = 1 sends all data points.
+ # SP = 4 sends every 4th data point
+ # NP — Number of points. It indicates how many points should be transmitted.
+ # For example:
+ # NP = 0 sends all data points.
+ # NP = 50 sends a maximum of 50 data points.
+ # FP — First point. It specifies the address of the first data point to be sent.
+ # For example:
+ # FP = 0 corresponds to the first data point.
+ # FP = 1 corresponds to the second data point
+ self.write(cstr)
+
+ qstr = f'C{chNum}:WAVEFORM? DAT2'
+ wfRaw=self.query_binary_values(qstr, datatype='b', header_fmt='ieee', container=np.array, chunk_size=(Npnts+100))
+ # expected full reply: 'C1:WF DAT2,#9000000140.........'
+ return(wfRaw, availableNpnts, Npnts)
+
+ def getChanVoltsPerDiv(self, chNum):
+ qstr = f'C{chNum}:VDIV?'
+ rstr = self.query(qstr)
+ # expected reply to query: 'C1:VDIV 1.04E+00V'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='V')
+ return(float(numberString))
+
+ def getChanOffset(self, chNum):
+ qstr = f'C{chNum}:OFST?'
+ rstr = self.query(qstr)
+ # expected reply to query: 'C1:OFST -1.27E+00V'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='V')
+ return(float(numberString))
+
+ def getTimePerDiv(self):
+ qstr = f'TDIV?'
+ rstr = self.query(qstr)
+ # expected reply to query: 'TDIV 2.00E-08S'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='S')
+ return(float(numberString))
+
+ def getTrigDelay(self):
+ qstr = f'TRIG_DELAY?'
+ rstr = self.query(qstr)
+ # expected reply to query: 'TRDL -0.00E+00S'
+ prefix, numberString, unit = self.response2numStr(rstr, firstSeparator=' ', unit='S')
+ return(float(numberString))
+
+
+ def getWaveform(self, chNum, availableNpnts=None, maxRequiredPoints=None):
+ wfRaw, availableNpnts, Npnts = self.getRawWaveform(chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints)
+ VoltageOffset = self.getChanOffset(chNum)
+ VoltsPerDiv = self.getChanVoltsPerDiv(chNum)
+ return( wfRaw * VoltsPerDiv * self.vertDivOnScreen/250 -VoltageOffset, availableNpnts )
+
+ def getTimeTrace(self, availableNpnts=None, maxRequiredPoints=None):
+ (sparsing, Npnts, availableNpnts, maxRequiredPoints) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints)
+ sampleRate = self.getSampleRate()
+ timePerDiv = self.getTimePerDiv()
+ trigDelay = self.getTrigDelay()
+ if Npnts is None and sparsing is None:
+ # using channel 1 as reference
+ Npnts = self.getAvailableNumberOfPoints(1)
+ t = np.arange(Npnts) / sampleRate * sparsing;
+ t = t - timePerDiv * self.horizDivOnScreen/2 - trigDelay
+ return(t)
+
+ def getTrace(self, chNum, availableNpnts=None, maxRequiredPoints=None):
+ wfVoltage, availableNpnts = self.getWaveform( chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints)
+ t = self.getTimeTrace(availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints)
+ tr = Trace( f'Ch{chNum}' )
+ tr.xlabel = 'Time'
+ tr.xunit = 'S'
+ tr.ylabel = 'Voltage'
+ tr.yunit = 'V'
+ tr.x = t
+ tr.y = wfVoltage
+ return( tr )
+
+
+if __name__ == '__main__':
+ import pyvisa
+ print("testing")
+ rm = pyvisa.ResourceManager()
+ print(rm.list_resources())
+ instr=rm.open_resource('TCPIP::192.168.0.61::INSTR')
+ scope = SDS1104X(instr)
+ print(f'ID: {scope.idn}')
+ print(f'Ch1 mean: {scope.mean(1)}')
+ print(f'Ch1 available points: {scope.getAvailableNumberOfPoints(1)}')
+ print(f'Sample Rate: {scope.getSampleRate()}')
+ print(f'Time per Div: {scope.getTimePerDiv()}')
+ print(f'Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}')
+ print(f'Ch1 Voltage Offset: {scope.getChanOffset(1)}')
+ ch1 = scope.getTrace(1)
+
+
+
diff --git a/qolab/hardware/scpi.py b/qolab/hardware/scpi.py
new file mode 100644
index 0000000..279ef24
--- /dev/null
+++ b/qolab/hardware/scpi.py
@@ -0,0 +1,63 @@
+"""
+provide basic class to operate SCPI capable instruments
+"""
+
+class SCPIinstr:
+ """ Basic class which support SCPI commands """
+ """
+ Do not instantiate directly, use
+ rm = pyvisa.ResourceManager()
+ SCPIinstr(rm.open_resource('TCPIP::192.168.0.2::INSTR'))
+ """
+ def __init__(self, resource):
+ self.resource = resource
+
+ # convenience pyvisa functions
+ self.write = self.resource.write
+ self.read = self.resource.read
+ self.query = self.resource.query
+ self.read_bytes = self.resource.read_bytes
+ self.read_binary_values = self.resource.read_binary_values
+ self.query_binary_values = self.resource.query_binary_values
+
+ @property
+ def idn(self):
+ return self.query("*IDN?")
+
+ def clear_status(self):
+ self.write("*CLS")
+
+ def set_event_status_enable(self):
+ self.write("*ESE")
+
+ def query_event_status_enable(self):
+ self.query("*ESE?")
+
+ def query_event_status_register(self):
+ self.query("*ESR?")
+
+ def set_wait_until_finished(self):
+ self.query("*OPC")
+
+ def wait_until_finished(self):
+ self.query("*OPC?")
+
+ def reset(self):
+ self.write("*RST")
+
+ def set_service_request_enable(self):
+ self.write("*SRE")
+
+ def query_service_request_enable(self):
+ self.query("*SRE?")
+
+ def query_status_byte(self):
+ self.query("*STB?")
+
+ def self_test_result(self):
+ self.query("*TST?")
+
+ def wait(self):
+ self.write("*WAI")
+
+
diff --git a/qolab/test/test_scope_sds1104x.py b/qolab/test/test_scope_sds1104x.py
new file mode 100644
index 0000000..c5c890f
--- /dev/null
+++ b/qolab/test/test_scope_sds1104x.py
@@ -0,0 +1,19 @@
+import pyvisa
+from qolab.hardware.scope import SDS1104X
+
+if __name__ == '__main__':
+ print("Testing SDS1104X")
+ rm = pyvisa.ResourceManager()
+ print(rm.list_resources())
+ instr=rm.open_resource('TCPIP::192.168.0.61::INSTR')
+ scope = SDS1104X(instr)
+ print(f'ID: {scope.idn}')
+ print(f'Ch1 mean: {scope.mean(1)}')
+ print(f'Ch1 available points: {scope.getAvailableNumberOfPoints(1)}')
+ print(f'Sample Rate: {scope.getSampleRate()}')
+ print(f'Time per Div: {scope.getTimePerDiv()}')
+ print(f'Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}')
+ print(f'Ch1 Voltage Offset: {scope.getChanOffset(1)}')
+ ch1 = scope.getTrace(1)
+ ch1.plot()
+