"""Trace containers (Trace, TraceXY, TraceSetSameX) and file load/save helpers.

Traces are stored as TSV tables preceded by a YAML header whose lines are
prefixed with '% '.
"""
from qolab.file_utils import save_table_with_header, infer_compression
import datetime
import numpy as np
import yaml
import pandas
import os


def headerFromDictionary(d, prefix=''):
    """Converts dictionary to YAML format with optional prefix for every line.

    Returns a list of strings, one per YAML line. Because the YAML dump ends
    with a newline, the last element is the bare `prefix`.
    """
    dumped = yaml.dump(d, default_flow_style=False, sort_keys=False)
    return [prefix + line for line in dumped.split('\n')]


def from_timestamps_to_dates(timestamps):
    """Formats timestamps to datetime format.

    Every item is converted with `float()` and interpreted by
    `datetime.datetime.fromtimestamp` (local time zone).
    """
    return [datetime.datetime.fromtimestamp(float(ts)) for ts in timestamps]


def loadTraceRawHeaderAndData(fname, tryCompressedIfMissing=True):
    """Load trace file and return header and data.

    Attempts to load a compressed file if the main is missing
    and `tryCompressedIfMissing` is set to `True`.
    E.g. if we try to load 'data.dat' file and it is missing,
    attempt to load 'data.dat.gz'.
    This option is mainly for compatibility with old scripts
    which are not aware of compressed files.

    Returns
    -------
    (header, data)
        `header` is the parsed YAML header (all scalars are strings, since
        `yaml.BaseLoader` is used); `data` is a 2D numpy array with the table.
    """
    headerstr = []
    data = None
    if (not os.path.exists(fname)) and tryCompressedIfMissing:
        # attempt to locate compressed file for the missing base file
        for ext in ['gz', 'bz', 'bz2']:
            candidate = fname + '.' + ext
            if os.path.exists(candidate):
                fname = candidate
                break

    # we will try to guess if the file compressed
    _open = open
    compression = infer_compression(fname)
    if compression == 'gzip':
        # TODO improve detection: gzip files have first 2 bytes set to b'\x1f\x8b'
        import gzip
        _open = gzip.open
    elif compression == 'bzip':
        # TODO improve detection: bzip files have first 2 bytes set to b'BZ'
        import bz2
        _open = bz2.open

    with _open(fname, mode='rb') as tracefile:
        # Reading yaml header prefixed by '% '
        # It sits at the top and below is just data in TSV format
        while True:
            ln = tracefile.readline()
            if ln[0:2] == b'% ':
                headerstr.append(ln[2:].decode('utf-8'))
            else:
                break
        # BaseLoader builds only plain strings/lists/dicts, no arbitrary objects
        header = yaml.load('\n'.join(headerstr), Loader=yaml.BaseLoader)
        # Now we load the data itself.
        # Note: pandas reads csv faster by factor of 8 than numpy.genfromtxt
        tracefile.seek(0)  # rewind file to the beginning
        df = pandas.read_csv(tracefile, comment='%', delimiter='\t', header=None)
        data = df.to_numpy()
    return (header, data)


def loadTrace(fname, tryCompressedIfMissing=True):
    """Load trace file and build the matching trace object from it."""
    (header, data) = loadTraceRawHeaderAndData(
        fname, tryCompressedIfMissing=tryCompressedIfMissing
    )
    return traceFromHeaderAndData(header, data)


def traceFromHeaderAndData(header, data=None):
    """Generate trace class from its description (header) and data.

    `header` must contain a 'config' dictionary with a 'model' entry
    ('Trace', 'TraceXY' or 'TraceSetSameX'). If `data` is None the trace
    structure is created without values.

    Returns the trace object, or None (after printing an error) when the
    header lacks 'config'/'model' or the model is unknown.
    """
    if 'config' not in header:
        print('Error: trace has no config')
        return None
    config = header['config']
    label = config['label'] if 'label' in config else None
    if 'model' not in config:
        print('Error: unknown trace model')
        return None
    model = config['model']

    if model == 'Trace':
        tr = Trace(label)
        if data is not None:
            tr.values = data
    elif model == 'TraceXY':
        tx = traceFromHeaderAndData(header['TraceX'])
        ty = traceFromHeaderAndData(header['TraceY'])
        if data is not None:
            tx.values = data[:, 0]
            ty.values = data[:, 1]
        tr = TraceXY(label)
        tr.x = tx
        tr.y = ty
    elif model == 'TraceSetSameX':
        tx = traceFromHeaderAndData(header['TraceX'])
        if data is not None:
            tx.values = data[:, 0]
        tr = TraceSetSameX(label)
        tr.addTraceX(tx)
        # column 0 is X, the Y traces follow in header order
        for col, (ylabel, yheader) in enumerate(header['TraceY'].items(), start=1):
            ty = traceFromHeaderAndData(yheader)
            if data is not None:
                ty.values = data[:, col]
            trxy = TraceXY(ylabel)
            trxy.x = tx
            trxy.y = ty
            tr.addTrace(trxy)
    else:
        print(f'Error: unknown trace model: {model}')
        return None
    tr.config = config
    return tr


class Trace:
    """Base Trace class, which holds only one variable"""

    def __init__(self, label):
        self.config = {}
        self.config['label'] = label
        self.config['model'] = 'Trace'
        self.config['version'] = '0.1'
        # 'type' is useful to indicate way of representation,
        # make sense on y_vs_x traces
        # if set to none we have normal y vs x
        # if set to 'timestamp' x will be converted to datetime dates
        self.config['type'] = None
        self.config['item_format'] = '.15e'
        self.config['tags'] = {}
        self.last_saved_pos = 0
        self._trace_specific_init()
        self.clear_data()

    def _trace_specific_init(self):
        """Set up the attributes specific to this trace model."""
        self.config['unit'] = None
        self.values = np.empty(0)
        self.last_saved_pos = 0

    def clear_last_saved_pos(self):
        """Forget how much of the data was already saved."""
        self.last_saved_pos = 0

    def clear_data(self):
        """Drop all stored values (keeping their dtype) and reset save position."""
        self.clear_last_saved_pos()
        if self.values is not None:
            self.values = np.empty(0, dtype=self.values.dtype)

    def __repr__(self):
        lbl = self.config['label']
        return f"{self.__class__.__name__}('{lbl}', N={self.values.size})"

    def plot(self):
        """Plot values versus index with matplotlib.

        If config 'type' is 'timestamp' the values are converted to dates.
        """
        import matplotlib.pyplot as plt

        x = self.values
        if self.config['type'] == 'timestamp':
            x = from_timestamps_to_dates(self.values)
        plt.plot(x, label=self.config['label'])
        plt.xlabel('index')
        plt.ylabel(f"{self.config['unit']}")
        plt.legend()
        plt.grid(True)

    def getConfig(self):
        """Return dictionary describing the trace (shallow copy of config)."""
        return {'config': self.config.copy()}

    def getData(self):
        """Return the stored values."""
        return self.values

    def getHeader(self, prefix=''):
        """Return the YAML header as a list of lines, each starting with `prefix`."""
        # the requested prefix used to be ignored and '' was always used
        return headerFromDictionary(self.getConfig(), prefix=prefix)

    def save(self, fname, last_saved_pos=None, skip_headers_if_file_exist=False, **kwargs):
        """Save the trace with `save_table_with_header` and return the file name.

        Only rows starting from `last_saved_pos` (default: the position
        remembered from the previous save) are written. When that position
        is non zero the header is skipped if the file exists.
        Extra keyword arguments are passed to `save_table_with_header`.
        """
        if last_saved_pos is None:
            last_saved_pos = self.last_saved_pos
        data = np.asarray(self.getData())
        if data.ndim == 1:
            # e.g. values collected with addPoint are 1D; save them as a column
            data = data[:, np.newaxis]
        if last_saved_pos > 0:
            skip_headers_if_file_exist = True
        fname = save_table_with_header(
            fname,
            data[last_saved_pos:, :],
            self.getHeader(),
            item_format=self.config['item_format'],
            skip_headers_if_file_exist=skip_headers_if_file_exist,
            **kwargs,
        )
        self.last_saved_pos = data.shape[0]
        return fname

    def addPoint(self, val):
        """Append `val` to the values (note: np.append flattens the array)."""
        self.values = np.append(self.values, val)


class TraceXY(Trace):
    """Data structure to handle two variables X and Y, handy for Y vs X data arrangements."""

    def __init__(self, label):
        super().__init__(label)
        self.config['model'] = 'TraceXY'

    def _trace_specific_init(self):
        self.x = None
        self.y = None

    def clear_data(self):
        """Clear data of the x and y traces (if set) and reset save position."""
        self.clear_last_saved_pos()
        if self.x is not None:
            self.x.clear_data()
        if self.y is not None:
            self.y.clear_data()

    def __repr__(self):
        lbl = self.config['label']
        return f"{self.__class__.__name__}('{lbl}', {self.x}, {self.y})"

    def plot(self):
        """Plot y versus x with matplotlib.

        If the x trace config 'type' is 'timestamp', x is converted to dates.
        """
        import matplotlib.pyplot as plt

        x = self.x.values
        if self.x.config['type'] == 'timestamp':
            x = from_timestamps_to_dates(x)
        plt.plot(x, self.y.values, label=self.config['label'])
        plt.xlabel(f"{self.x.config['label']} ({self.x.config['unit']})")
        plt.ylabel(f"{self.y.config['label']} ({self.y.config['unit']})")
        plt.legend()
        plt.grid(True)

    def getConfig(self):
        """Return own config together with the configs of the x and y traces."""
        return {
            'config': self.config.copy(),
            'TraceX': self.x.getConfig(),
            'TraceY': self.y.getConfig(),
        }

    def getData(self):
        """Return 2D array with x values followed by the y values as columns."""
        data = self.x.values
        if data.ndim == 1:
            data = data[:, np.newaxis]
        vals = self.y.values
        if vals.ndim == 1:
            vals = vals[:, np.newaxis]
        return np.concatenate((data, vals), 1)

    def addPoint(self, valX, valY):
        """Append `valX` to the x trace values and `valY` to the y trace values."""
        self.x.values = np.append(self.x.values, valX)
        self.y.values = np.append(self.y.values, valY)


class TraceSetSameX(Trace):
    """Data structure to handle multiple Ys vs X dependencies, handy for scope traces."""

    def __init__(self, label):
        super().__init__(label)
        self.config['model'] = 'TraceSetSameX'

    def _trace_specific_init(self):
        self.x = None
        self.traces = {}

    def clear_data(self):
        """Clear data of x (if set) and of all y traces, reset save position."""
        self.clear_last_saved_pos()
        if self.x is not None:
            self.x.clear_data()
        for tr in self.traces.values():
            tr.clear_data()

    def __repr__(self):
        lbl = self.config['label']
        ylabels = list(self.traces.keys())
        return f"{self.__class__.__name__}('{lbl}', x: {self.x}, traces: {ylabels})"

    def addTraceX(self, tr):
        """Set the common x trace."""
        self.x = tr

    def addTrace(self, tr):
        """Add a trace to the set.

        A 'TraceXY' contributes its y under the trace label (its x becomes
        the common x only if none is set yet). A plain 'Trace' becomes the
        common x if none is set yet, otherwise it is stored as a y trace.
        """
        model = tr.config['model']
        if model == 'TraceXY':
            if self.x is None:
                self.x = tr.x
            self.traces[tr.config['label']] = tr.y
        elif model == 'Trace':
            if self.x is None:
                self.x = tr
            else:
                self.traces[tr.config['label']] = tr

    def plot(self):
        """Plot every y trace versus the common x in vertically stacked subplots."""
        import matplotlib.pyplot as plt

        nplots = len(self.traces)
        fig = plt.gcf()
        # squeeze=False: always get a 2D array of axes, even for a single trace
        fig, axs = plt.subplots(
            nplots, 1, sharex=True, num=fig.number, squeeze=False
        )
        x = self.x.values
        if self.x.config['type'] == 'timestamp':
            x = from_timestamps_to_dates(x)
        for ax, (k, tr) in zip(axs[:, 0], self.traces.items()):
            ax.plot(x, tr.values, label=k)
            ax.set_ylabel(f"{tr.config['label']} ({tr.config['unit']})")
            ax.legend()
            ax.grid(True)
        plt.xlabel(f"{self.x.config['label']} ({self.x.config['unit']})")

    def items(self):
        """Return (label, y trace) pairs."""
        return self.traces.items()

    def keys(self):
        """Return labels of the y traces."""
        return self.traces.keys()

    def getTrace(self, label):
        """Return a TraceXY made of the common x and the y trace named `label`."""
        tr = TraceXY(label)
        tr.x = self.x
        tr.y = self.traces[label]
        return tr

    def getConfig(self):
        """Return own config together with the configs of x and all y traces."""
        return {
            'config': self.config.copy(),
            'TraceX': self.x.getConfig(),
            'TraceY': {k: v.getConfig() for k, v in self.traces.items()},
        }

    def getData(self):
        """Return 2D array: x values followed by every y trace as columns."""
        data = self.x.values
        if data.ndim == 1:
            data = data[:, np.newaxis]
        for v in self.traces.values():
            vals = v.values
            if vals.ndim == 1:
                vals = vals[:, np.newaxis]
            data = np.concatenate((data, vals), 1)
        return data

    def addPointToTrace(self, val, name=None):
        """Append `val` to the y trace `name`, or to x when `name` is None."""
        if name is None:
            self.x.values = np.append(self.x.values, val)
        else:
            target = self.traces[name]
            target.values = np.append(target.values, val)


if __name__ == '__main__':
    print("Testing trace")
    x = Trace('x trace')
    x.values = np.random.normal(2, 2, (4, 1))
    x.values = np.array(x.values, int)
    x.config['unit'] = 's'
    x.config['tags']['tag1'] = 'xxxx'
    x.config['tags']['tag2'] = 'xxxx'
    x.save('xtrace.dat', skip_headers_if_file_exist=True)
    # print(x.getHeader())

    y = Trace('y trace')
    y.values = np.random.normal(2, 2, (4, 1))
    y.config['unit'] = 'V'
    y.config['tags']['ytag2'] = 'yyyy'

    xy = TraceXY('xy trace')
    xy.config['tags']['xy tag'] = 'I am xy tag'
    xy.x = x
    xy.y = y
    xy.save('xytrace.dat')
    # print(xy.getHeader())

    xyn = TraceSetSameX('many ys trace')
    xyn.config['tags']['descr'] = 'I am many ys trace'
    xy.config['label'] = 'y1'
    xyn.addTrace(xy)
    xy.config['label'] = 'y2'
    xyn.addTrace(xy)
    xy.config['label'] = 'y3'
    xyn.addTrace(xy)
    xyn.save('xyntrace.dat')
    # print(xyn.getHeader())