"""Trace containers (single trace, XY pair, set of Y traces sharing one X).

Each trace keeps its metadata in a ``config`` dictionary which is serialized
as a YAML header when the trace is saved, and parsed back by the loader
functions of this module.
"""
import csv
import datetime

import numpy as np
import yaml

from qolab.file_utils import save_table_with_header


def headerFromDictionary(d, prefix=''):
    """Dump dictionary *d* as block-style YAML and return it as a list of lines.

    Every line is prepended with *prefix*.  Key order of *d* is preserved.
    ``yaml.dump`` terminates its output with a newline, so the last element
    of the returned list is the bare *prefix*.
    """
    dumped = yaml.dump(d, default_flow_style=False, sort_keys=False)
    return [prefix + line for line in dumped.split('\n')]


def from_timestamps_to_dates(timestamps):
    """Convert an iterable of POSIX timestamps to a list of ``datetime`` objects.

    Each element may be a scalar, a numeric string or a size-1 array (e.g. a
    row of an (N, 1) column array).
    """
    # ``.item()`` extracts the scalar from a size-1 array of any rank; calling
    # ``float()`` directly on such arrays is deprecated in recent NumPy.
    return [
        datetime.datetime.fromtimestamp(np.asarray(ts, dtype=float).item())
        for ts in timestamps
    ]


def loadTraceRawHeaderAndData(fname):
    """Read a tab-separated trace file and return ``(header, data)``.

    Lines whose first field starts with ``'% '`` are treated as YAML header
    lines (only the first tab-separated field is used); every other non-empty
    line is parsed as a row of floats.

    Returns
    -------
    header
        Result of ``yaml.load`` (SafeLoader) on the collected header lines;
        ``None`` when the file has no header lines.
    data
        2-D float array with one row per data line, or ``None`` when the
        file contains no data lines.
    """
    header_lines = []
    rows = []
    # newline='' is what the csv module documentation asks for when opening files
    with open(fname, newline='') as csvfile:
        for row in csv.reader(csvfile, delimiter='\t'):
            if not row:
                # blank line: csv.reader yields an empty list, nothing to parse
                continue
            if row[0][0:2] == '% ':
                header_lines.append(row[0][2:])
            else:
                rows.append(np.array(row, dtype=float))
    header = yaml.load('\n'.join(header_lines), Loader=yaml.SafeLoader)
    # Stack once at the end (linear instead of quadratic); the result is
    # always 2-D, even when the file holds a single data row.
    data = np.vstack(rows) if rows else None
    return (header, data)


def loadTrace(fname):
    """Load file *fname* and rebuild the trace object it describes."""
    (header, data) = loadTraceRawHeaderAndData(fname)
    return traceFromHeaderAndData(header, data)


def traceFromHeaderAndData(header, data=None):
    """Build a trace object from a parsed *header* and optional *data* table.

    ``header['config']['model']`` selects the class: ``'Trace'``,
    ``'TraceXY'`` or ``'TraceSetSameX'``.  For the composite models the
    sub-trace headers are read from ``header['TraceX']`` and
    ``header['TraceY']``; column 0 of *data* becomes the X values and the
    following columns the Y values.  When *data* is ``None`` the traces are
    created without replacing their (empty) values.

    Returns the constructed trace, or ``None`` (after printing an error
    message) when the header has no ``'config'`` entry or names an unknown
    model.
    """
    # An empty YAML header is loaded as None, so check the type first.
    if not isinstance(header, dict) or 'config' not in header:
        print('Error: trace has no config')
        return None
    config = header['config']
    label = config['label'] if 'label' in config else None
    if 'model' not in config:
        print('Error: unknown trace model')
        return None
    model = config['model']

    if model == 'Trace':
        tr = Trace(label)
        if data is not None:
            tr.values = data
    elif model == 'TraceXY':
        tx = traceFromHeaderAndData(header['TraceX'])
        ty = traceFromHeaderAndData(header['TraceY'])
        if data is not None:
            tx.values = data[:, 0]
            ty.values = data[:, 1]
        tr = TraceXY(label)
        tr.x = tx
        tr.y = ty
    elif model == 'TraceSetSameX':
        tx = traceFromHeaderAndData(header['TraceX'])
        if data is not None:
            tx.values = data[:, 0]
        tr = TraceSetSameX(label)
        tr.addTraceX(tx)
        # Y traces occupy data columns 1, 2, ... in header order
        for column, (ylabel, yheader) in enumerate(header['TraceY'].items(), start=1):
            ty = traceFromHeaderAndData(yheader)
            if data is not None:
                ty.values = data[:, column]
            trxy = TraceXY(ylabel)
            trxy.x = tx
            trxy.y = ty
            tr.addTrace(trxy)
    else:
        print(f'Error: unknown trace model: {model}')
        return None

    # the stored config replaces the defaults set by the constructor
    tr.config = config
    return tr


class Trace:
    """A labeled array of values together with its ``config`` metadata."""

    def __init__(self, label):
        self.config = {}
        self.config['label'] = label
        self.config['model'] = 'Trace'
        self.config['version'] = '0.1'
        # 'type' is useful to indicate way of representation,
        # makes sense on y_vs_x traces:
        #   if set to None we have normal y vs x
        #   if set to 'timestamp' x will be converted to datetime dates
        self.config['type'] = None
        self.config['item_format'] = '.15e'
        self.config['tags'] = {}
        self.last_saved_pos = 0
        self._trace_specific_init()
        self.clear_data()

    def _trace_specific_init(self):
        """Set up the attributes specific to this trace model."""
        self.config['unit'] = None
        self.values = np.empty(0)
        self.last_saved_pos = 0

    def clear_last_saved_pos(self):
        """Reset the save position so the next ``save`` starts from row 0."""
        self.last_saved_pos = 0

    def clear_data(self):
        """Drop all stored values (keeping their dtype) and reset the save position."""
        self.clear_last_saved_pos()
        if self.values is not None:
            self.values = np.empty(0, dtype=self.values.dtype)

    def plot(self):
        """Plot the values versus their index with matplotlib."""
        import matplotlib.pyplot as plt
        plt.plot(self.values, label=self.config['label'])
        plt.xlabel('index')
        plt.ylabel(f"{self.config['unit']}")
        plt.legend()
        plt.grid()

    def getConfig(self):
        """Return ``{'config': <shallow copy of self.config>}``."""
        return {'config': self.config.copy()}

    def getData(self):
        """Return the stored values array."""
        return self.values

    def getHeader(self, prefix=''):
        """Return the YAML header lines of this trace, each prepended with *prefix*."""
        # the original implementation silently dropped the requested prefix
        return headerFromDictionary(self.getConfig(), prefix=prefix)

    def save(self, fname, last_saved_pos=None, skip_headers_if_file_exist=False, **kwargs):
        """Save rows not yet saved via ``save_table_with_header``.

        Parameters
        ----------
        fname
            File name handed to ``save_table_with_header``.
        last_saved_pos
            Index of the first row to write; defaults to ``self.last_saved_pos``.
        skip_headers_if_file_exist
            Forwarded to ``save_table_with_header``; forced to ``True`` when
            saving starts from a non-zero row.
        **kwargs
            Forwarded to ``save_table_with_header``.

        Returns the value returned by ``save_table_with_header`` and records
        the number of data rows in ``self.last_saved_pos``.
        """
        if last_saved_pos is None:
            last_saved_pos = self.last_saved_pos
        data = self.getData()
        if data.ndim == 1:
            # values grown with addPoint are 1-D; present them as one column
            # so the row/column slice below is valid
            data = data[:, np.newaxis]
        if last_saved_pos > 0:
            skip_headers_if_file_exist = True
        fname = save_table_with_header(
            fname,
            data[last_saved_pos:, :],
            self.getHeader(),
            item_format=self.config['item_format'],
            skip_headers_if_file_exist=skip_headers_if_file_exist,
            **kwargs,
        )
        self.last_saved_pos = data.shape[0]
        return fname

    def addPoint(self, val):
        """Append *val* to the values (``np.append`` flattens the array)."""
        self.values = np.append(self.values, val)


class TraceXY(Trace):
    """A pair of traces, ``x`` and ``y``, saved and plotted as y versus x."""

    def __init__(self, label):
        super().__init__(label)
        self.config['model'] = 'TraceXY'

    def _trace_specific_init(self):
        """Start with no X and no Y trace attached."""
        self.x = None
        self.y = None

    def clear_data(self):
        """Clear the data of the attached X and Y traces and reset the save position."""
        self.clear_last_saved_pos()
        if self.x is not None:
            self.x.clear_data()
        if self.y is not None:
            self.y.clear_data()

    def plot(self):
        """Plot y versus x; x is shown as dates when ``config['type']`` is 'timestamp'."""
        import matplotlib.pyplot as plt
        x = self.x.values
        if self.config['type'] == 'timestamp':
            x = from_timestamps_to_dates(x)
        plt.plot(x, self.y.values, label=self.config['label'])
        plt.xlabel(f"{self.x.config['label']} ({self.x.config['unit']})")
        plt.ylabel(f"{self.y.config['label']} ({self.y.config['unit']})")
        plt.legend()
        plt.grid()

    def getConfig(self):
        """Return own config plus the configs of X and Y under 'TraceX'/'TraceY'."""
        return {
            'config': self.config.copy(),
            'TraceX': self.x.getConfig(),
            'TraceY': self.y.getConfig(),
        }

    def getData(self):
        """Return a 2-D array with the X values followed by the Y values as columns."""
        data = self.x.values
        if data.ndim == 1:
            data = data[:, np.newaxis]
        vals = self.y.values
        if vals.ndim == 1:
            vals = vals[:, np.newaxis]
        return np.concatenate((data, vals), 1)

    def addPoint(self, valX, valY):
        """Append *valX* to the X trace and *valY* to the Y trace."""
        self.x.values = np.append(self.x.values, valX)
        self.y.values = np.append(self.y.values, valY)


class TraceSetSameX(Trace):
    """A set of Y traces, keyed by label, which all share one X trace."""

    def __init__(self, label):
        super().__init__(label)
        self.config['model'] = 'TraceSetSameX'

    def _trace_specific_init(self):
        """Start with no X trace and an empty dictionary of Y traces."""
        self.x = None
        self.traces = {}

    def clear_data(self):
        """Clear the data of X and of every Y trace and reset the save position."""
        self.clear_last_saved_pos()
        if self.x is not None:
            self.x.clear_data()
        for tr in self.traces.values():
            tr.clear_data()

    def addTraceX(self, tr):
        """Use *tr* as the shared X trace."""
        self.x = tr

    def addTrace(self, tr):
        """Add a trace to the set.

        A 'TraceXY' contributes its Y trace under the label of *tr* (and its
        X trace when no X is set yet).  A plain 'Trace' becomes the X trace
        when none is set, otherwise it is stored as a Y trace.  Traces of any
        other model are ignored.
        """
        model = tr.config['model']
        if model == 'TraceXY':
            if self.x is None:
                self.x = tr.x
            self.traces[tr.config['label']] = tr.y
        elif model == 'Trace':
            if self.x is None:
                self.x = tr
            else:
                self.traces[tr.config['label']] = tr

    def plot(self):
        """Plot every Y trace versus X in its own subplot; all share the x axis."""
        import matplotlib.pyplot as plt
        nplots = len(self.traces)
        x = self.x.values
        if self.config['type'] == 'timestamp':
            x = from_timestamps_to_dates(x)
        ax1 = None
        for cnt, (k, tr) in enumerate(self.traces.items(), start=1):
            if cnt == 1:
                ax1 = plt.subplot(nplots, 1, cnt)
            else:
                plt.subplot(nplots, 1, cnt, sharex=ax1)
            plt.plot(x, tr.values, label=k)
            plt.ylabel(f"{tr.config['label']} ({tr.config['unit']})")
            plt.legend()
            plt.grid()
        plt.xlabel(f"{self.x.config['label']} ({self.x.config['unit']})")

    def items(self):
        """Return the ``(label, trace)`` view of the Y traces."""
        return self.traces.items()

    def keys(self):
        """Return the view of the Y trace labels."""
        return self.traces.keys()

    def getTrace(self, label):
        """Return a new ``TraceXY`` pairing the shared X with the Y trace *label*."""
        tr = TraceXY(label)
        tr.x = self.x
        tr.y = self.traces[label]
        return tr

    def getConfig(self):
        """Return own config, the X config under 'TraceX' and Y configs under 'TraceY'."""
        return {
            'config': self.config.copy(),
            'TraceX': self.x.getConfig(),
            'TraceY': {k: v.getConfig() for k, v in self.traces.items()},
        }

    def getData(self):
        """Return a 2-D array: X values in column 0, then one column block per Y trace."""
        data = self.x.values
        if data.ndim == 1:
            data = data[:, np.newaxis]
        for v in self.traces.values():
            vals = v.values
            if vals.ndim == 1:
                vals = vals[:, np.newaxis]
            data = np.concatenate((data, vals), 1)
        return data

    def addPointToTrace(self, val, name=None):
        """Append *val* to the Y trace called *name*, or to X when *name* is None."""
        if name is None:
            self.x.values = np.append(self.x.values, val)
        else:
            self.traces[name].values = np.append(self.traces[name].values, val)


if __name__ == '__main__':
    print("Testing trace")
    x = Trace('x trace')
    x.values = np.random.normal(2, 2, (4, 1))
    x.values = np.array(x.values, int)
    x.config['unit'] = 's'
    x.config['tags']['tag1'] = 'xxxx'
    x.config['tags']['tag2'] = 'xxxx'
    x.save('xtrace.dat', skip_headers_if_file_exist=True)
    # print(x.getHeader())

    y = Trace('y trace')
    y.values = np.random.normal(2, 2, (4, 1))
    y.config['unit'] = 'V'
    y.config['tags']['ytag2'] = 'yyyy'

    xy = TraceXY('xy trace')
    xy.config['tags']['xy tag'] = 'I am xy tag'
    xy.x = x
    xy.y = y
    xy.save('xytrace.dat')
    # print(xy.getHeader())

    xyn = TraceSetSameX('many ys trace')
    xyn.config['tags']['descr'] = 'I am many ys trace'
    # the same xy object is added three times under different labels
    xy.config['label'] = 'y1'
    xyn.addTrace(xy)
    xy.config['label'] = 'y2'
    xyn.addTrace(xy)
    xy.config['label'] = 'y3'
    xyn.addTrace(xy)
    xyn.save('xyntrace.dat')
    # print(xyn.getHeader())