""" Created by Eugeniy E. Mikhailov 2024/07/18 """ from qolab.hardware.basic import BasicInstrument from qolab.hardware.scpi import SCPI_PROPERTY from ._basic import ScopeSCPI, calcSparsingAndNumPoints from qolab.data.trace import Trace, TraceXY import numpy as np import scipy.signal from pyvisa.errors import VisaIOError import logging logging.basicConfig( format="%(asctime)s %(levelname)8s %(name)s: %(message)s", datefmt="%m/%d/%Y %H:%M:%S", ) logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) class RigolDS1054z(ScopeSCPI): """Rigol 1054 scope""" vertDivOnScreen = 8 horizDivOnScreen = 12 def __init__(self, resource, *args, **kwds): super().__init__(resource, *args, **kwds) self.config["Device model"] = "Rigol DS1054z" self.resource.read_termination = "\n" self.numberOfChannels = 4 self.maxRequiredPoints = 1200 self.resource.timeout = 500 # timeout in ms # desired number of points per channel, can return twice more TimePerDiv = SCPI_PROPERTY( scpi_prfx=":TIMEBASE:MAIN:SCALE", ptype=float, doc="Scope Time per Division", ) def getTimePerDiv(self): return self.TimePerDiv def setTimePerDiv(self, value): self.TimePerDiv = value TrigDelay = SCPI_PROPERTY( scpi_prfx=":TIMEBASE:MAIN:OFFSET", ptype=float, doc="Scope Time Offset or Trigger Delay", ) @BasicInstrument.tsdb_append def getTrigDelay(self): return self.TrigDelay @BasicInstrument.tsdb_append def setTrigDelay(self, value): self.TrigDelay = value @BasicInstrument.tsdb_append def getChanVoltageOffset(self, chNum): qstr = f":CHANnel{chNum}:OFFSet?" rstr = self.query(qstr) return float(rstr) @BasicInstrument.tsdb_append def setChanVoltageOffset(self, chNum, val): cstr = f":CHANnel{chNum}:OFFSet {val}" self.write(cstr) @BasicInstrument.tsdb_append def getChanVoltsPerDiv(self, chNum): qstr = f":CHANnel{chNum}:SCALe?" rstr = self.query(qstr) return float(rstr) @BasicInstrument.tsdb_append def setChanVoltsPerDiv(self, chNum, vPerDiv): cstr = f":CHANnel{chNum}:SCALe {vPerDiv}" self.write(cstr) @BasicInstrument.tsdb_append def getTriggerStatus(self): """Get Trigger Status. We expect TD, WAIT, RUN, AUTO, or STOP. """ res = self.query(":TRIGger:STATus?") return res @BasicInstrument.tsdb_append def getTriggerMode(self): """Get trigger mode. We expect AUTO, NORM, or SING (for Single) """ res = self.query(":TRIGger:SWEep?") return res @BasicInstrument.tsdb_append def setTriggerMode(self, val): """Set trigger mode. Takes AUTO, NORMal, or SINGle """ self.write(f"TRIGger:SWEep {val}") def stop(self): self.write(":STOP") def run(self): self.write(":RUN") def getRawWaveform( self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True ): """ Get raw channel waveform in binary format. Parameters ---------- chNum : int Scope channel to use: 1, 2, 3, or 4 availableNpnts : int or None (default) Available number of points. Do not set it if you want it auto detected. maxRequiredPoints : int Maximum number of required points, if we ask less than available we well get sparse set which proportionally fills all available time range. decimate : False or True (default) Decimate should be read as apply the low pass filter or not, technically for both setting we get decimation (i.e. smaller than available at the scope number of points). The name came from ``scipy.signal.decimate`` filtering function. If ``decimate=True`` is used, we get all available points and then low-pass filter them to get ``maxRequiredPoints`` The result is less noisy then, but transfer time from the instrument is longer. If ``decimate=False``, then it we are skipping points to get needed number but we might see aliasing, if there is a high frequency noise and sparing > 1. Unless you know what you doing, it is recommended to use ``decimate=True``. """ # if RAW is used the scope should be in STOP state self.write(f":WAVeform:SOURce CHAN{chNum}") self.write( ":WAVeform:MODE RAW" ) # {NORMal|MAXimum|RAW} RAW gives maximum number of points self.write( ":WAVeform:FORMat BYTE" ) # {WORD|BYTE|ASCii}, scope is 8 bit, BYTE is enough preamble = self.query(":WAVeform:PREamble?").split(",") """ Format is ,,,,,,,,, Wherein, : 0 (BYTE), 1 (WORD) or 2 (ASC). : 0 (NORMal), 1 (MAXimum) or 2 (RAW). : an integer between 1 and 12000000. After the memory depth option is installed, is an integer between 1 and 24000000. : the number of averages in the average sample mode and 1 in other modes. : the time difference between two neighboring points in the X direction. : the start time of the waveform data in the X direction. : the reference time of the data point in the X direction. : the waveform increment in the Y direction. : the vertical offset relative to the "Vertical Reference Position" in the Y direction. : the vertical reference position in the Y direction. """ availableNpnts = int(preamble[2]) wfRaw = np.zeros(availableNpnts, dtype=np.uint8) maxreadable = 250_000 # the maximum number of bytes readable in one go chunk_size = 70_000 # unfortunately large chunk size prone to read errors errCnt = 0 strt = 1 stp = min(chunk_size, availableNpnts) errorFreeChunkSize = [] errorProneChunkSize = [] while strt <= availableNpnts: stp = strt - 1 + chunk_size stp = min(stp, availableNpnts) chunk_size = stp-strt+1 # reading requested number of points in chunks self.write(f":WAVeform:STARt {strt}") self.write(f":WAVeform:STOP {stp}") qstr = ":WAVeform:DATA?" try: wfRawChunk = self.query_binary_values( qstr, datatype="b", header_fmt="ieee", container=np.array, chunk_size=(chunk_size + 100), ) if len(wfRawChunk) == 0: logger.info("Got empty chunk. Redoing.") continue # we need to repeat chunk read if len(wfRawChunk) != chunk_size: logger.info("Expected chunk with length" + f" {chunk_size} but got {len(wfRawChunk)}") logger.info(f"Current pointers are {strt=} {stp=} with {chunk_size=}") logger.info("Redoing, chunk reading.") continue # we need to repeat chunk read wfRaw[strt - 1 : stp] = wfRawChunk """ All this craziness with tuning chunk_size and catching VisaIOError is because Rigol usbtmc connection is buggy. It present itself as high speed device over USB, but set incompatible packet size of 64 while the USB standard dictates 512. In linux dmesg complains: 'bulk endpoint 0x3 has invalid maxpacket 64' """ strt += chunk_size errorFreeChunkSize.append(chunk_size) chunk_size = min(maxreadable, int(chunk_size * 1.1)) except VisaIOError as err: logger.info(f"Detected recoverable {err}") errCnt += 1 errorProneChunkSize.append(chunk_size) logger.debug(f"Visa error count is {errCnt} while reading raw chunk the scope") logger.debug(f"Current pointers are {strt=} {stp=} with {chunk_size=}") if len(errorFreeChunkSize) > 10: chunk_size = int(np.mean(errorFreeChunkSize)) else: chunk_size = max(1, int(np.mean(errorProneChunkSize) * 0.8)) logger.debug(f"New {chunk_size=}") logger.debug("Redoing, chunk reading.") pass # we repeat this loop iteration again logger.debug(f"final {chunk_size=}") if True: return wfRaw, errorFreeChunkSize, errorProneChunkSize ( sparsing, Npnts, availableNpnts, maxRequiredPoints, ) = self.calcSparsingAndNumPoints(availableNpnts, maxRequiredPoints) if decimate: Npnts = availableNpnts # get all of them and decimate later if (sparsing == 1 and Npnts == availableNpnts) or decimate: # We are getting all points of the trace # Apparently sparsing has no effect with this command # and effectively uses SP=1 for any sparsing # but I want to make sure and force it cstr = "WAVEFORM_SETUP NP,0,FP,0,SP,1" # technically when we know Npnts and sparsing # we can use command from the follow up 'else' clause else: # we just ask every point with 'sparsing' interval # fast to grab but we could do better with more advance decimate # method, which allow better precision for the price # of longer acquisition time cstr = f"WAVEFORM_SETUP SP,{sparsing},NP,{Npnts},FP,0" # Note: it is not enough to provide sparsing (SP), # number of points (NP) needed to be calculated properly too! # From the manual # WAVEFORM_SETUP SP,,NP,,FP, # SP Sparse point. It defines the interval between data points. # For example: # SP = 0 sends all data points. # SP = 1 sends all data points. # SP = 4 sends every 4th data point # NP — Number of points. It indicates how many points should be transmitted. # For example: # NP = 0 sends all data points. # NP = 50 sends a maximum of 50 data points. # FP — First point. It specifies the address of the first data point # to be sent. # For example: # FP = 0 corresponds to the first data point. # FP = 1 corresponds to the second data point self.write(cstr) trRaw = Trace(f"Ch{chNum}") qstr = f"C{chNum}:WAVEFORM? DAT2" # expected full reply: 'C1:WF DAT2,#9000000140.........' try: wfRaw = self.query_binary_values( qstr, datatype="b", header_fmt="ieee", container=np.array, chunk_size=(Npnts + 100), ) if self.resource.interface_type == InterfaceType.usb: # Somehow on windows (at least with USB interface) # there is a lingering empty string which we need to flush out r = self.read() if r != "": print(f"WARNING: We expected an empty string but got {r=}") trRaw.values = wfRaw.reshape(wfRaw.size, 1) if decimate and sparsing != 1: numtaps = 3 # not sure it is the best case trRaw.values = scipy.signal.decimate( trRaw.values, sparsing, numtaps, axis=0 ) except ValueError as err: # most likely we get crazy number of points # self.read() # flushing the bogus output of previous command print(f"Error {err=}: getting waveform failed for {qstr=}") wfRaw = np.array([]) trRaw.config["unit"] = "Count" trRaw.config["tags"]["Decimate"] = decimate return (trRaw, availableNpnts, Npnts) def getWaveform( self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True ): """ For decimate use see ``getRawWaveform``. In short decimate=True is slower but more precise. """ trRaw, availableNpnts, Npnts = self.getRawWaveform( chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints, decimate=decimate, ) VoltageOffset = self.getChanVoltageOffset(chNum) VoltsPerDiv = self.getChanVoltsPerDiv(chNum) tr = trRaw tr.values = ( trRaw.values * VoltsPerDiv * self.vertDivOnScreen / 250 - VoltageOffset ) tr.config["unit"] = "Volt" tr.config["tags"]["VoltageOffset"] = VoltageOffset tr.config["tags"]["VoltsPerDiv"] = VoltsPerDiv return (tr, availableNpnts) def getTrace( self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True ): old_trg_status = self.getTriggerStatus() self.stop() # to get synchronous channels wfVoltage, availableNpnts = self.getWaveform( chNum, availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints, decimate=decimate, ) t = self.getTimeTrace( availableNpnts=availableNpnts, maxRequiredPoints=maxRequiredPoints ) tr = TraceXY(f"Ch{chNum}") tr.x = t tr.y = wfVoltage # restore scope to the before acquisition mode if old_trg_status != "STOP": self.run() return tr if __name__ == "__main__": import pyvisa print("testing") rm = pyvisa.ResourceManager() print(rm.list_resources()) # instr = rm.open_resource("TCPIP::192.168.0.62::INSTR") instr = rm.open_resource("USB0::0x1AB1::0x04CE::DS1ZA170502787::0::INSTR") scope = RigolDS1054z(instr) print(f"ID: {scope.idn}") print(f"TimePerDiv = {scope.TimePerDiv}") # print(f'Ch1 mean: {scope.mean(1)}') print(f"Ch1 available points: {scope.getAvailableNumberOfPoints(1)}") print(f"Sample Rate: {scope.getSampleRate()}") print(f"Time per Div: {scope.getTimePerDiv()}") print(f"Ch1 Volts per Div: {scope.getChanVoltsPerDiv(1)}") print(f"Ch1 Voltage Offset: {scope.getChanVoltageOffset(1)}") print("------ Header start -------------") print(str.join("\n", scope.getHeader())) print("------ Header ends -------------") ch1 = scope.getTrace(1) traces = scope.getAllTraces()