"""
Provide basic class to build an operational scope

Created by Eugeniy E. Mikhailov 2021/11/29
"""

import numpy as np
from qolab.hardware.scpi import SCPIinstr
from qolab.hardware.basic import BasicInstrument
from qolab.data.trace import TraceSetSameX
import time
import logging

logging.basicConfig(
    format="%(asctime)s %(levelname)8s %(name)s: %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def calcSparsingAndNumPoints(availableNpnts=None, maxRequiredPoints=None):
    """Calculate sparsing (decimation step) and number of sparsed points.

    Parameters
    ----------
    availableNpnts: int or None (throws error)
        Number of available points. If set to None exit with error
    maxRequiredPoints: int or None (throws error)
        Number of requested points after decimation.
        If availableNpnts <= maxRequiredPoints * 2, decimation is impossible
        and we will get up to factor of 2 more than requested.

    Return
    ------
    (sparsing, Npnts, availableNpnts, maxRequiredPoints)
        ``sparsing`` is the decimation step (1 means no decimation) and
        ``Npnts`` is the number of points left after decimation; the two
        inputs are passed through unchanged.

    Raises
    ------
    ValueError
        If either argument is None, or if decimation is required while
        maxRequiredPoints is not positive.
    """
    if availableNpnts is None:
        raise ValueError("Invalid availableNpnts value, must be int.")
    if maxRequiredPoints is None:
        raise ValueError("Invalid maxRequiredPoints value, must be int.")

    if availableNpnts <= maxRequiredPoints * 2:
        # not enough points to skip even every other one
        return (1, availableNpnts, availableNpnts, maxRequiredPoints)

    if maxRequiredPoints <= 0:
        # would otherwise divide by zero or produce a negative step
        raise ValueError("Invalid maxRequiredPoints value, must be positive.")

    sparsing = int(availableNpnts // maxRequiredPoints)
    Npnts = int(availableNpnts // sparsing)
    return (sparsing, Npnts, availableNpnts, maxRequiredPoints)


class Scope(BasicInstrument):
    """Minimal class to implement a scope.

    Intended to be used as a parent for hardware aware scopes.

    Provide a minimal set of methods to be implemented by a scope.
    """

    def __init__(self, *args, **kwds):
        BasicInstrument.__init__(self, *args, **kwds)
        self.config["Device type"] = "Scope"
        self.config["Device model"] = "Generic Scope Without Hardware interface"
        self.config["FnamePrefix"] = "scope"
        self.numberOfChannels = 0
        # deviceProperties must have 'get' and preferably 'set' methods available,
        # i.e. 'SampleRate' needs getSampleRate() and love to have setSampleRate(value)
        # they will be used to obtain config and set device according to it
        self.deviceProperties.update(
            {"SampleRate", "TimePerDiv", "TrigDelay", "TriggerMode", "Roll", "Run"}
        )
        # same is applied to channelProperties
        # but we need setter/getter with channel number
        # i.e. VoltsPerDiv must provide
        # getChanVoltsPerDiv(chNum) and setChanVoltsPerDiv(chNum, value)
        self.channelProperties = {
            "VoltsPerDiv",
            "VoltageOffset",
        }

    def getTrace(
        self, chNum, availableNpnts=None, maxRequiredPoints=None, decimate=True
    ):
        """Return the trace of channel ``chNum``; must be provided by a subclass.

        Should work with minimal arguments list but might be faster
        if parameters provided: less IO requests.

        Implementation hint kept from the original template: to get
        synchronous channels one can remember the trigger mode, switch the
        scope to 'STOP', acquire, and restore the old mode afterwards
        (skipping the restore if it already was 'STOP').
        """
        raise NotImplementedError("getTrace function is not implemented")

    def getTriggerMode(self):
        """Return trigger mode; we expect NORM, AUTO, SINGLE."""
        raise NotImplementedError("getTriggerMode function is not implemented")

    def setTriggerMode(self, mode):
        """Set trigger mode; we expect NORM, AUTO, SINGLE."""
        raise NotImplementedError("setTriggerMode function is not implemented")

    def getRun(self):
        """Is acquisition running or stopped."""
        raise NotImplementedError("getRun function is not implemented")

    def setRun(self, val):
        """Either enable run or stop the acquisition."""
        raise NotImplementedError("setRun function is not implemented")

    def _waitUntillStop(self, timeout=1):
        """Wait until scope in the stop state.

        Just because we ask for a scope to stop, does not mean that it is
        stopped. It can still wait for a trigger or until the time span
        is filled.

        Polls ``getRun()`` every 10 ms; logs a warning (does not raise) if
        the scope is still running when the timeout expires.

        Parameter
        ---------
        timeout : float
            timeout in seconds, default is 1 second
        """
        # monotonic clock: the deadline must not move if wall time is adjusted
        starttime = time.monotonic()
        deadline = starttime + timeout
        while time.monotonic() < deadline:
            if not self.getRun():
                logger.debug(
                    f"Scope stopped within {time.monotonic()-starttime} seconds."
                )
                return
            time.sleep(0.010)
        logger.warning(
            f"Scope did not reach STOP state within {timeout=} sec, try to increase it."
        )

    def getAllTraces(self, availableNpnts=None, maxRequiredPoints=None, decimate=True):
        """Acquire traces from all channels into one ``TraceSetSameX``.

        The scope is stopped (if it was running) for the duration of the
        readout to get synchronous channels, and returned to its previous
        run state afterwards, even if reading a channel fails.

        Parameters are passed as is to ``getTrace`` for every channel
        from 1 to ``numberOfChannels``.
        """
        allTraces = TraceSetSameX("scope traces")
        allTraces.config["tags"]["DAQ"] = self.getConfig()
        old_run_status = self.getRun()
        if old_run_status:  # avoid unnecessary status change
            self.setRun(False)  # stop if currently running
            self._waitUntillStop()
        try:
            # to get synchronous channels
            for chNum in range(1, self.numberOfChannels + 1):
                allTraces.addTrace(
                    self.getTrace(
                        chNum,
                        availableNpnts=availableNpnts,
                        maxRequiredPoints=maxRequiredPoints,
                        decimate=decimate,
                    )
                )
        finally:
            # restore scope to the before acquisition mode,
            # also when a channel readout raised
            if old_run_status:  # avoid unnecessary status change
                self.setRun(old_run_status)  # start running if it was old run state
        return allTraces

    def chanAutoScale(self, chNum, margin=0.25, timeout=5):
        """Auto scale channel to fit signal on screen.

        Tunes Volts per division and Channel offset to fit signal on screen
        (vertically). Repeats acquire/adjust cycles until both the top and
        bottom margins are between ``margin / 2`` and ``margin``, or until
        the timeout expires, in which case a warning is logged.

        Uses ``self.vertDivOnScreen``, which is not set by this base class.

        Parameters
        ----------
        chNum : int
            Channel to auto scale
        margin: float
            How much extra space (margin) to have with respect to full screen.
            Default is 0.25 (i.e. 25%), i.e. 1 vertical division at top and bottom
            for 8 division scope.
        timeout : float
            Time in seconds allowed for the scaling attempts, default is 5.
        """
        starttime = time.monotonic()
        deadline = starttime + timeout
        scaled_correctly = False
        while (not scaled_correctly) and (time.monotonic() < deadline):
            # NOTE(review): the statements from here down to the margin checks
            # were lost in the reviewed copy of this file and are reconstructed
            # from the surviving code below (margins as a fraction of the full
            # screen, offset taken as the voltage at screen center).
            # Presumably the trace exposes samples as `tr.y.values`.
            # TODO confirm against the upstream source before relying on it.
            tr = self.getTrace(chNum)
            tr_max = tr.y.values.max()
            tr_min = tr.y.values.min()
            cur_offset = self.getChanVoltageOffset(chNum)
            screen_span = self.getChanVoltsPerDiv(chNum) * self.vertDivOnScreen
            top_margin = (cur_offset + screen_span / 2 - tr_max) / screen_span
            bottom_margin = (tr_min - (cur_offset - screen_span / 2)) / screen_span

            is_top_margin_good = (top_margin > margin / 2) and (top_margin < margin)
            is_bottom_margin_good = (bottom_margin > margin / 2) and (
                bottom_margin < margin
            )
            if is_bottom_margin_good and is_top_margin_good:
                scaled_correctly = True
                break
            # center the signal and leave 0.75*margin of the screen on each side,
            # i.e. in the middle of the accepted (margin/2, margin) window
            offset = (tr_max + tr_min) / 2
            vPerDiv = (tr_max - tr_min) / (self.vertDivOnScreen * (1 - margin * 1.5))
            self.setChanVoltageOffset(chNum, offset)
            self.setChanVoltsPerDiv(chNum, vPerDiv)
        if not scaled_correctly:
            # report on failure only, not on a success that landed at the deadline
            logger.warning(
                f"Scope did not make proper channel {chNum} scaling"
                f" within {timeout=} sec, try to increase it."
            )

    def plot(self, **kwargs):
        """Acquire all traces (``kwargs`` go to ``getAllTraces``) and plot them."""
        allTraces = self.getAllTraces(**kwargs)
        allTraces.plot()

    def save(
        self,
        fname=None,
        item_format="e",
        availableNpnts=None,
        maxRequiredPoints=None,
        decimate=True,
        extension="dat",
    ):
        """Acquire all traces and save them to a file.

        Parameters
        ----------
        fname : str or None
            File name to save to. If None, the name is obtained from
            ``self.getNextDataFile(extension=extension)``.
        item_format : str
            Stored in the trace set config as "item_format", default is "e".
        availableNpnts, maxRequiredPoints, decimate
            Passed as is to ``getAllTraces``.
        extension : str
            Extension for the auto generated file name, default is "dat".

        Return
        ------
        File name the data was saved to.
        """
        allTraces = self.getAllTraces(
            availableNpnts=availableNpnts,
            maxRequiredPoints=maxRequiredPoints,
            decimate=decimate,
        )
        allTraces.config["item_format"] = item_format
        if fname is None:
            fname = self.getNextDataFile(extension=extension)
        allTraces.save(fname)
        print(f"Data saved to: {fname}")
        return fname


class ScopeSCPI(SCPIinstr, Scope):
    """SCPI aware scope.

    Use as a parent for a hardware aware scope classes.

    Example
    -------
    >>> rm = pyvisa.ResourceManager()
    >>> ScopeSCPI(rm.open_resource('TCPIP::192.168.0.2::INSTR'))
    """

    def __init__(self, resource, *args, **kwds):
        # both parents are initialized explicitly: SCPIinstr gets the resource,
        # Scope gets the remaining arguments
        SCPIinstr.__init__(self, resource)
        Scope.__init__(self, *args, **kwds)
        self.config["DeviceId"] = self.idn.strip()