From ace4b2cc75e187e070dfac8d40a0761481508554 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Thu, 18 Jul 2024 20:54:49 -0400 Subject: draft of Rigol DS1054z scope --- qolab/hardware/scope/rigolds1054z.py | 343 +++++++++++++++++++++++++++++++++++ 1 file changed, 343 insertions(+) create mode 100644 qolab/hardware/scope/rigolds1054z.py (limited to 'qolab/hardware/scope') diff --git a/qolab/hardware/scope/rigolds1054z.py b/qolab/hardware/scope/rigolds1054z.py new file mode 100644 index 0000000..ccb85e7 --- /dev/null +++ b/qolab/hardware/scope/rigolds1054z.py @@ -0,0 +1,343 @@ +""" +Created by Eugeniy E. Mikhailov 2024/07/18 +""" + +from qolab.hardware.basic import BasicInstrument +from qolab.hardware.scpi import SCPI_PROPERTY +from ._basic import ScopeSCPI +from qolab.hardware.scpi import response2numStr +from qolab.data.trace import Trace, TraceXY +import numpy as np +import scipy.signal +from pyvisa.constants import InterfaceType + +class RigolDS1054z(ScopeSCPI): + """Rigol 1054 scope""" + + vertDivOnScreen = 8 + horizDivOnScreen = 12 + + def __init__(self, resource, *args, **kwds): + super().__init__(resource, *args, **kwds) + self.config["Device model"] = "Rigol DS1054z" + self.resource.read_termination = "\n" + self.numberOfChannels = 4 + self.maxRequiredPoints = 1200 + # desired number of points per channel, can return twice more + + TimePerDiv = SCPI_PROPERTY( + scpi_prfx=":TIMEBASE:MAIN:SCALE", + ptype=float, + doc="Scope Time per Division", + ) + + def getTimePerDiv(self): + return self.TimePerDiv + + def setTimePerDiv(self, value): + self.TimePerDiv = value + + TrigDelay = SCPI_PROPERTY( + scpi_prfx=":TIMEBASE:MAIN:OFFSET", + ptype=float, + doc="Scope Time Offset or Trigger Delay", + ) + + @BasicInstrument.tsdb_append + def getTrigDelay(self): + return self.TrigDelay + + @BasicInstrument.tsdb_append + def setTrigDelay(self, value): + self.TrigDelay = value + + @BasicInstrument.tsdb_append + def getChanVoltageOffset(self, chNum): + qstr = f":CHANnel{chNum}:OFFSet?" + rstr = self.query(qstr) + return float(rstr) + + @BasicInstrument.tsdb_append + def setChanVoltageOffset(self, chNum, val): + cstr = f":CHANnel{chNum}:OFFSet {val}" + self.write(cstr) + + @BasicInstrument.tsdb_append + def getChanVoltsPerDiv(self, chNum): + qstr = f":CHANnel{chNum}:SCALe?" + rstr = self.query(qstr) + return float(rstr) + + @BasicInstrument.tsdb_append + def setChanVoltsPerDiv(self, chNum, vPerDiv): + cstr = f":CHANnel{chNum}:SCALe {vPerDiv}" + self.write(cstr) + + @BasicInstrument.tsdb_append + def getTriggerStatus(self): + """Get Trigger Status. + + We expect TD, WAIT, RUN, AUTO, or STOP. + """ + res = self.query(":TRIGger:STATus?") + return res + + @BasicInstrument.tsdb_append + def getTriggerMode(self): + """Get trigger mode. + + We expect AUTO, NORM, or SING (for Single) + """ + res = self.query(":TRIGger:SWEep?") + return res + + @BasicInstrument.tsdb_append + def setTriggerMode(self, val): + """Set trigger mode. + + Takes AUTO, NORMal, or SINGle + """ + self.write(f"TRIGger:SWEep {val}") + + def stop(self): + self.write(f":STOP") + + def run(self): + self.write(f":RUN") + + def getRawWaveform( + self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True + ): + """ + Get raw channel waveform in binary format. + + Parameters + ---------- + chNum : int + Scope channel to use: 1, 2, 3, or 4 + availableNpnts : int or None (default) + Available numnber of points. Do not set it if you want it auto detected. + maxRequiredPoints : int + Maximum number of required points, if we ask less than available + we well get sparse set which proportionally fills all available time range. + decimate : False or True (default) + Decimate should be read as apply the low pass filter or not, technically + for both setting we get decimation (i.e. smaller than available + at the scope number of points). The name came from + ``scipy.signal.decimate`` filtering function. + If ``decimate=True`` is used, we get all available points + and then low-pass filter them to get ``maxRequiredPoints`` + The result is less noisy then, but transfer time from the instrument + is longer. + If ``decimate=False``, then it we are skipping points to get needed number + but we might see aliasing, if there is a high frequency noise + and sparing > 1. Unless you know what you doing, it is recommended + to use ``decimate=True``. + """ + + # if RAW is used the scope should be in STOP state + self.write(f":WAVeform:SOURce CHAN{chNum}") + self.write(":WAVeform:MODE RAW") # {NORMal|MAXimum|RAW} RAW gives maximum number of points + self.write(":WAVeform:FORMat BYTE") # {WORD|BYTE|ASCii}, scope is 8 bit, BYTE is enough + preamble = self.query(":WAVeform:PREamble?").split(",") + """ + Format is + ,,,,,,,,, + Wherein, + : 0 (BYTE), 1 (WORD) or 2 (ASC). + : 0 (NORMal), 1 (MAXimum) or 2 (RAW). + : an integer between 1 and 12000000. After the memory depth option is + installed, is an integer between 1 and 24000000. + : the number of averages in the average sample mode and 1 in other modes. + : the time difference between two neighboring points in the X direction. + : the start time of the waveform data in the X direction. + : the reference time of the data point in the X direction. + : the waveform increment in the Y direction. + : the vertical offset relative to the "Vertical Reference Position" in the Y direction. + : the vertical reference position in the Y direction. + """ + print(preamble) + Npnts = int(preamble[2]) + wfRaw = np.zeros(Npnts, dtype=np.int8) + maxreadable = 200000 # 250000 is the maximum number of bytes readable in one go + strt = 1 + stp = min(maxreadable,Npnts) + while (strt <= Npnts): + import time + stp = strt - 1 + maxreadable + stp = min(stp, Npnts) + print(f"{strt=} {stp=}") + # reading requested number of points in chunks of maxreadable + print("strt cnt") + self.write(f":WAVeform:STARt {strt}") + print(f"{self.wait_until_finished()}") + time.sleep(.1) + print("stop cnt") + self.write(f":WAVeform:STOP {stp}") + print(f"{self.wait_until_finished()}") + time.sleep(.1) + qstr = ":WAVeform:DATA?" + wfRawChunk = self.query_binary_values( + qstr, + datatype="b", + header_fmt="ieee", + container=np.array, + chunk_size=(maxreadable + 100), + ) + print(f"{self.wait_until_finished()}") + wfRaw[strt-1:stp] = wfRawChunk + strt += maxreadable + print("waiting") + time.sleep(1) + if True: + return wfRaw + + ( + sparsing, + Npnts, + availableNpnts, + maxRequiredPoints, + ) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints) + if decimate: + Npnts = availableNpnts # get all of them and decimate later + if (sparsing == 1 and Npnts == availableNpnts) or decimate: + # We are getting all points of the trace + # Apparently sparsing has no effect with this command + # and effectively uses SP=1 for any sparsing + # but I want to make sure and force it + cstr = "WAVEFORM_SETUP NP,0,FP,0,SP,1" + # technically when we know Npnts and sparsing + # we can use command from the follow up 'else' clause + else: + # we just ask every point with 'sparsing' interval + # fast to grab but we could do better with more advance decimate + # method, which allow better precision for the price + # of longer acquisition time + cstr = f"WAVEFORM_SETUP SP,{sparsing},NP,{Npnts},FP,0" + # Note: it is not enough to provide sparsing (SP), + # number of points (NP) needed to be calculated properly too! + # From the manual + # WAVEFORM_SETUP SP,,NP,,FP, + # SP Sparse point. It defines the interval between data points. + # For example: + # SP = 0 sends all data points. + # SP = 1 sends all data points. + # SP = 4 sends every 4th data point + # NP — Number of points. It indicates how many points should be transmitted. + # For example: + # NP = 0 sends all data points. + # NP = 50 sends a maximum of 50 data points. + # FP — First point. It specifies the address of the first data point + # to be sent. + # For example: + # FP = 0 corresponds to the first data point. + # FP = 1 corresponds to the second data point + self.write(cstr) + + trRaw = Trace(f"Ch{chNum}") + + qstr = f"C{chNum}:WAVEFORM? DAT2" + # expected full reply: 'C1:WF DAT2,#9000000140.........' + try: + wfRaw = self.query_binary_values( + qstr, + datatype="b", + header_fmt="ieee", + container=np.array, + chunk_size=(Npnts + 100), + ) + if self.resource.interface_type == InterfaceType.usb: + # Somehow on windows (at least with USB interface) + # there is a lingering empty string which we need to flush out + r = self.read() + if r != "": + print(f"WARNING: We expected an empty string but got {r=}") + trRaw.values = wfRaw.reshape(wfRaw.size, 1) + if decimate and sparsing != 1: + numtaps = 3 + # not sure it is the best case + trRaw.values = scipy.signal.decimate( + trRaw.values, sparsing, numtaps, axis=0 + ) + except ValueError as err: + # most likely we get crazy number of points + # self.read() # flushing the bogus output of previous command + print(f"Error {err=}: getting waveform failed for {qstr=}") + wfRaw = np.array([]) + trRaw.config["unit"] = "Count" + trRaw.config["tags"]["Decimate"] = decimate + return (trRaw, availableNpnts, Npnts) + + def getWaveform( + self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True + ): + """ + For decimate use see ``getRawWaveform``. + + In short decimate=True is slower but more precise. + """ + trRaw, availableNpnts, Npnts = self.getRawWaveform( + chNum, + availableNpnts=availableNpnts, + maxRequiredPoints=maxRequiredPoints, + decimate=decimate, + ) + VoltageOffset = self.getChanVoltageOffset(chNum) + VoltsPerDiv = self.getChanVoltsPerDiv(chNum) + tr = trRaw + tr.values = ( + trRaw.values * VoltsPerDiv * self.vertDivOnScreen / 250 - VoltageOffset + ) + tr.config["unit"] = "Volt" + tr.config["tags"]["VoltageOffset"] = VoltageOffset + tr.config["tags"]["VoltsPerDiv"] = VoltsPerDiv + return (tr, availableNpnts) + + def getTrace( + self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True + ): + old_trg_status = self.getTriggerStatus() + self.stop() + # to get synchronous channels + wfVoltage, availableNpnts = self.getWaveform( + chNum, + availableNpnts=availableNpnts, + maxRequiredPoints=maxRequiredPoints, + decimate=decimate, + ) + t = self.getTimeTrace( + availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints + ) + tr = TraceXY(f"Ch{chNum}") + tr.x = t + tr.y = wfVoltage + # restore scope to the before acquisition mode + if old_trg_mode != "STOP": + self.run() + return tr + + + +if __name__ == "__main__": + import pyvisa + + print("testing") + rm = pyvisa.ResourceManager() + print(rm.list_resources()) + # instr = rm.open_resource("TCPIP::192.168.0.62::INSTR") + instr = rm.open_resource("USB0::0x1AB1::0x04CE::DS1ZA170502787::0::INSTR") + scope = RigolDS1054z(instr) + print(f"ID: {scope.idn}") + print(f"TimePerDiv = scope.TimePerDiv") + # print(f'Ch1 mean: {scope.mean(1)}') + print(f"Ch1 available points: {scope.getAvailableNumberOfPoints(1)}") + print(f"Sample Rate: {scope.getSampleRate()}") + print(f"Time per Div: {scope.getTimePerDiv()}") + print(f"Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}") + print(f"Ch1 Voltage Offset: {scope.getChanVoltageOffset(1)}") + print("------ Header start -------------") + print(str.join("\n", scope.getHeader())) + print("------ Header ends -------------") + ch1 = scope.getTrace(1) + traces = scope.getAllTraces() + -- cgit v1.2.3