aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qolab/hardware/scope/rigolds1054z.py343
1 files changed, 343 insertions, 0 deletions
diff --git a/qolab/hardware/scope/rigolds1054z.py b/qolab/hardware/scope/rigolds1054z.py
new file mode 100644
index 0000000..ccb85e7
--- /dev/null
+++ b/qolab/hardware/scope/rigolds1054z.py
@@ -0,0 +1,343 @@
+"""
+Created by Eugeniy E. Mikhailov 2024/07/18
+"""
+
+from qolab.hardware.basic import BasicInstrument
+from qolab.hardware.scpi import SCPI_PROPERTY
+from ._basic import ScopeSCPI
+from qolab.hardware.scpi import response2numStr
+from qolab.data.trace import Trace, TraceXY
+import numpy as np
+import scipy.signal
+from pyvisa.constants import InterfaceType
+
+class RigolDS1054z(ScopeSCPI):
+ """Rigol 1054 scope"""
+
+ vertDivOnScreen = 8
+ horizDivOnScreen = 12
+
+ def __init__(self, resource, *args, **kwds):
+ super().__init__(resource, *args, **kwds)
+ self.config["Device model"] = "Rigol DS1054z"
+ self.resource.read_termination = "\n"
+ self.numberOfChannels = 4
+ self.maxRequiredPoints = 1200
+ # desired number of points per channel, can return twice more
+
+ TimePerDiv = SCPI_PROPERTY(
+ scpi_prfx=":TIMEBASE:MAIN:SCALE",
+ ptype=float,
+ doc="Scope Time per Division",
+ )
+
+ def getTimePerDiv(self):
+ return self.TimePerDiv
+
+ def setTimePerDiv(self, value):
+ self.TimePerDiv = value
+
+ TrigDelay = SCPI_PROPERTY(
+ scpi_prfx=":TIMEBASE:MAIN:OFFSET",
+ ptype=float,
+ doc="Scope Time Offset or Trigger Delay",
+ )
+
+ @BasicInstrument.tsdb_append
+ def getTrigDelay(self):
+ return self.TrigDelay
+
+ @BasicInstrument.tsdb_append
+ def setTrigDelay(self, value):
+ self.TrigDelay = value
+
+ @BasicInstrument.tsdb_append
+ def getChanVoltageOffset(self, chNum):
+ qstr = f":CHANnel{chNum}:OFFSet?"
+ rstr = self.query(qstr)
+ return float(rstr)
+
+ @BasicInstrument.tsdb_append
+ def setChanVoltageOffset(self, chNum, val):
+ cstr = f":CHANnel{chNum}:OFFSet {val}"
+ self.write(cstr)
+
+ @BasicInstrument.tsdb_append
+ def getChanVoltsPerDiv(self, chNum):
+ qstr = f":CHANnel{chNum}:SCALe?"
+ rstr = self.query(qstr)
+ return float(rstr)
+
+ @BasicInstrument.tsdb_append
+ def setChanVoltsPerDiv(self, chNum, vPerDiv):
+ cstr = f":CHANnel{chNum}:SCALe {vPerDiv}"
+ self.write(cstr)
+
+ @BasicInstrument.tsdb_append
+ def getTriggerStatus(self):
+ """Get Trigger Status.
+
+ We expect TD, WAIT, RUN, AUTO, or STOP.
+ """
+ res = self.query(":TRIGger:STATus?")
+ return res
+
+ @BasicInstrument.tsdb_append
+ def getTriggerMode(self):
+ """Get trigger mode.
+
+ We expect AUTO, NORM, or SING (for Single)
+ """
+ res = self.query(":TRIGger:SWEep?")
+ return res
+
+ @BasicInstrument.tsdb_append
+ def setTriggerMode(self, val):
+ """Set trigger mode.
+
+ Takes AUTO, NORMal, or SINGle
+ """
+ self.write(f"TRIGger:SWEep {val}")
+
+ def stop(self):
+ self.write(f":STOP")
+
+ def run(self):
+ self.write(f":RUN")
+
+ def getRawWaveform(
+ self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True
+ ):
+ """
+ Get raw channel waveform in binary format.
+
+ Parameters
+ ----------
+ chNum : int
+ Scope channel to use: 1, 2, 3, or 4
+ availableNpnts : int or None (default)
+ Available numnber of points. Do not set it if you want it auto detected.
+ maxRequiredPoints : int
+ Maximum number of required points, if we ask less than available
+ we well get sparse set which proportionally fills all available time range.
+ decimate : False or True (default)
+ Decimate should be read as apply the low pass filter or not, technically
+ for both setting we get decimation (i.e. smaller than available
+ at the scope number of points). The name came from
+ ``scipy.signal.decimate`` filtering function.
+ If ``decimate=True`` is used, we get all available points
+ and then low-pass filter them to get ``maxRequiredPoints``
+ The result is less noisy then, but transfer time from the instrument
+ is longer.
+ If ``decimate=False``, then it we are skipping points to get needed number
+ but we might see aliasing, if there is a high frequency noise
+ and sparing > 1. Unless you know what you doing, it is recommended
+ to use ``decimate=True``.
+ """
+
+ # if RAW is used the scope should be in STOP state
+ self.write(f":WAVeform:SOURce CHAN{chNum}")
+ self.write(":WAVeform:MODE RAW") # {NORMal|MAXimum|RAW} RAW gives maximum number of points
+ self.write(":WAVeform:FORMat BYTE") # {WORD|BYTE|ASCii}, scope is 8 bit, BYTE is enough
+ preamble = self.query(":WAVeform:PREamble?").split(",")
+ """
+ Format is
+ <format>,<type>,<points>,<count>,<xincrement>,<xorigin>,<xreference>,<yincrement>,<yorigin>,<yreference>
+ Wherein,
+ <format>: 0 (BYTE), 1 (WORD) or 2 (ASC).
+ <type>: 0 (NORMal), 1 (MAXimum) or 2 (RAW).
+ <points>: an integer between 1 and 12000000. After the memory depth option is
+ installed, <points> is an integer between 1 and 24000000.
+ <count>: the number of averages in the average sample mode and 1 in other modes.
+ <xincrement>: the time difference between two neighboring points in the X direction.
+ <xorigin>: the start time of the waveform data in the X direction.
+ <xreference>: the reference time of the data point in the X direction.
+ <yincrement>: the waveform increment in the Y direction.
+ <yorigin>: the vertical offset relative to the "Vertical Reference Position" in the Y direction.
+ <yreference>: the vertical reference position in the Y direction.
+ """
+ print(preamble)
+ Npnts = int(preamble[2])
+ wfRaw = np.zeros(Npnts, dtype=np.int8)
+ maxreadable = 200000 # 250000 is the maximum number of bytes readable in one go
+ strt = 1
+ stp = min(maxreadable,Npnts)
+ while (strt <= Npnts):
+ import time
+ stp = strt - 1 + maxreadable
+ stp = min(stp, Npnts)
+ print(f"{strt=} {stp=}")
+ # reading requested number of points in chunks of maxreadable
+ print("strt cnt")
+ self.write(f":WAVeform:STARt {strt}")
+ print(f"{self.wait_until_finished()}")
+ time.sleep(.1)
+ print("stop cnt")
+ self.write(f":WAVeform:STOP {stp}")
+ print(f"{self.wait_until_finished()}")
+ time.sleep(.1)
+ qstr = ":WAVeform:DATA?"
+ wfRawChunk = self.query_binary_values(
+ qstr,
+ datatype="b",
+ header_fmt="ieee",
+ container=np.array,
+ chunk_size=(maxreadable + 100),
+ )
+ print(f"{self.wait_until_finished()}")
+ wfRaw[strt-1:stp] = wfRawChunk
+ strt += maxreadable
+ print("waiting")
+ time.sleep(1)
+ if True:
+ return wfRaw
+
+ (
+ sparsing,
+ Npnts,
+ availableNpnts,
+ maxRequiredPoints,
+ ) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints)
+ if decimate:
+ Npnts = availableNpnts # get all of them and decimate later
+ if (sparsing == 1 and Npnts == availableNpnts) or decimate:
+ # We are getting all points of the trace
+ # Apparently sparsing has no effect with this command
+ # and effectively uses SP=1 for any sparsing
+ # but I want to make sure and force it
+ cstr = "WAVEFORM_SETUP NP,0,FP,0,SP,1"
+ # technically when we know Npnts and sparsing
+ # we can use command from the follow up 'else' clause
+ else:
+ # we just ask every point with 'sparsing' interval
+ # fast to grab but we could do better with more advance decimate
+ # method, which allow better precision for the price
+ # of longer acquisition time
+ cstr = f"WAVEFORM_SETUP SP,{sparsing},NP,{Npnts},FP,0"
+ # Note: it is not enough to provide sparsing (SP),
+ # number of points (NP) needed to be calculated properly too!
+ # From the manual
+ # WAVEFORM_SETUP SP,<sparsing>,NP,<number>,FP,<point>
+ # SP Sparse point. It defines the interval between data points.
+ # For example:
+ # SP = 0 sends all data points.
+ # SP = 1 sends all data points.
+ # SP = 4 sends every 4th data point
+ # NP — Number of points. It indicates how many points should be transmitted.
+ # For example:
+ # NP = 0 sends all data points.
+ # NP = 50 sends a maximum of 50 data points.
+ # FP — First point. It specifies the address of the first data point
+ # to be sent.
+ # For example:
+ # FP = 0 corresponds to the first data point.
+ # FP = 1 corresponds to the second data point
+ self.write(cstr)
+
+ trRaw = Trace(f"Ch{chNum}")
+
+ qstr = f"C{chNum}:WAVEFORM? DAT2"
+ # expected full reply: 'C1:WF DAT2,#9000000140.........'
+ try:
+ wfRaw = self.query_binary_values(
+ qstr,
+ datatype="b",
+ header_fmt="ieee",
+ container=np.array,
+ chunk_size=(Npnts + 100),
+ )
+ if self.resource.interface_type == InterfaceType.usb:
+ # Somehow on windows (at least with USB interface)
+ # there is a lingering empty string which we need to flush out
+ r = self.read()
+ if r != "":
+ print(f"WARNING: We expected an empty string but got {r=}")
+ trRaw.values = wfRaw.reshape(wfRaw.size, 1)
+ if decimate and sparsing != 1:
+ numtaps = 3
+ # not sure it is the best case
+ trRaw.values = scipy.signal.decimate(
+ trRaw.values, sparsing, numtaps, axis=0
+ )
+ except ValueError as err:
+ # most likely we get crazy number of points
+ # self.read() # flushing the bogus output of previous command
+ print(f"Error {err=}: getting waveform failed for {qstr=}")
+ wfRaw = np.array([])
+ trRaw.config["unit"] = "Count"
+ trRaw.config["tags"]["Decimate"] = decimate
+ return (trRaw, availableNpnts, Npnts)
+
+ def getWaveform(
+ self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True
+ ):
+ """
+ For decimate use see ``getRawWaveform``.
+
+ In short decimate=True is slower but more precise.
+ """
+ trRaw, availableNpnts, Npnts = self.getRawWaveform(
+ chNum,
+ availableNpnts=availableNpnts,
+ maxRequiredPoints=maxRequiredPoints,
+ decimate=decimate,
+ )
+ VoltageOffset = self.getChanVoltageOffset(chNum)
+ VoltsPerDiv = self.getChanVoltsPerDiv(chNum)
+ tr = trRaw
+ tr.values = (
+ trRaw.values * VoltsPerDiv * self.vertDivOnScreen / 250 - VoltageOffset
+ )
+ tr.config["unit"] = "Volt"
+ tr.config["tags"]["VoltageOffset"] = VoltageOffset
+ tr.config["tags"]["VoltsPerDiv"] = VoltsPerDiv
+ return (tr, availableNpnts)
+
+ def getTrace(
+ self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True
+ ):
+ old_trg_status = self.getTriggerStatus()
+ self.stop()
+ # to get synchronous channels
+ wfVoltage, availableNpnts = self.getWaveform(
+ chNum,
+ availableNpnts=availableNpnts,
+ maxRequiredPoints=maxRequiredPoints,
+ decimate=decimate,
+ )
+ t = self.getTimeTrace(
+ availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints
+ )
+ tr = TraceXY(f"Ch{chNum}")
+ tr.x = t
+ tr.y = wfVoltage
+ # restore scope to the before acquisition mode
+ if old_trg_mode != "STOP":
+ self.run()
+ return tr
+
+
+
+if __name__ == "__main__":
+ import pyvisa
+
+ print("testing")
+ rm = pyvisa.ResourceManager()
+ print(rm.list_resources())
+ # instr = rm.open_resource("TCPIP::192.168.0.62::INSTR")
+ instr = rm.open_resource("USB0::0x1AB1::0x04CE::DS1ZA170502787::0::INSTR")
+ scope = RigolDS1054z(instr)
+ print(f"ID: {scope.idn}")
+ print(f"TimePerDiv = scope.TimePerDiv")
+ # print(f'Ch1 mean: {scope.mean(1)}')
+ print(f"Ch1 available points: {scope.getAvailableNumberOfPoints(1)}")
+ print(f"Sample Rate: {scope.getSampleRate()}")
+ print(f"Time per Div: {scope.getTimePerDiv()}")
+ print(f"Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}")
+ print(f"Ch1 Voltage Offset: {scope.getChanVoltageOffset(1)}")
+ print("------ Header start -------------")
+ print(str.join("\n", scope.getHeader()))
+ print("------ Header ends -------------")
+ ch1 = scope.getTrace(1)
+ traces = scope.getAllTraces()
+