from qolab.file_utils import save_table_with_header import datetime import numpy as np import yaml def headerFromDictionary(d, prefix=''): header = [] tail = yaml.dump(d, default_flow_style=False, sort_keys=False) tail = tail.split('\n') header.extend(tail) prefixed_header = [prefix+l for l in header] return prefixed_header def from_timestamps_to_dates(timestamps): dates = [datetime.datetime.fromtimestamp(float(ts)) for ts in timestamps] return(dates) class Trace: def __init__(self, label): self.config = {} self.config['label'] = label # 'type' is useful to indicate way of representation, make sense on y_vs_x traces # if set to none we have normal y vs x # if set to 'timestamp' x will be converted to datetime dates self.config['type'] = None self.config['tags'] = {} self.item_format='.15e' self.last_saved_pos = 0 self._trace_specific_init() self.clear_data() def _trace_specific_init(self): self.config['unit'] = None self.values = np.empty(0) self.last_saved_pos = 0 def clear_last_saved_pos(self): self.last_saved_pos = 0 def clear_data(self): self.clear_last_saved_pos() if self.values is not None: self.values = np.empty(0, dtype=self.values.dtype) def plot(self): import matplotlib.pyplot as plt plt.plot(self.values, label=self.config['label']) plt.xlabel('index') plt.ylabel(f"{self.config['unit']}") plt.legend() plt.grid() def getConfig(self): return( self.config ) def getData(self): return( self.values ) def getHeader(self, prefix=''): return headerFromDictionary(self.getConfig(), prefix='') def save(self, fname, last_saved_pos=None, skip_headers_if_file_exist=False, **kwargs): if last_saved_pos is None: last_saved_pos = self.last_saved_pos data = self.getData() if last_saved_pos > 0: skip_headers_if_file_exist=True fname = save_table_with_header(fname, data[last_saved_pos:,:], self.getHeader(), item_format=self.item_format, skip_headers_if_file_exist=skip_headers_if_file_exist, **kwargs) self.last_saved_pos = data.shape[0] return(fname) def addPoint(self, val): self.values = np.append(self.values, val) class TraceXY(Trace): def __init__(self, label): super().__init__(label) def _trace_specific_init(self): self.x = None self.y = None def clear_data(self): self.clear_last_saved_pos() if self.x is not None: self.x.clear_data() if self.y is not None: self.y.clear_data() def plot(self): import matplotlib.pyplot as plt x=self.x.values if self.config['type'] is not None: if self.config['type'] == 'timestamp': x = from_timestamps_to_dates(x) plt.plot(x, self.y.values, label=self.config['label']) plt.xlabel(f"{self.x.config['label']} ({self.x.config['unit']})") plt.ylabel(f"{self.y.config['label']} ({self.y.config['unit']})") plt.legend() plt.grid() def getConfig(self): config = self.config.copy() config['TraceX'] = {} config['TraceX'] = self.x.getConfig() config['TraceY'] = {} config['TraceY'] = self.y.getConfig() return( config ) def getData(self): data=self.x.values if data.ndim == 1: data = data[:, np.newaxis] vals = self.y.values if vals.ndim == 1: vals = vals[:, np.newaxis] data=np.concatenate((data, vals), 1) return( data ) def addPoint(self, valX, valY): self.x.values = np.append(self.x.values, valX) self.y.values = np.append(self.y.values, valY) class TraceSetSameX(Trace): def __init__(self, label): super().__init__(label) def _trace_specific_init(self): self.x = None self.traces={} def clear_data(self): self.clear_last_saved_pos() if self.x is not None: self.x.clear_data() for k, tr in self.traces.items(): tr.clear_data() def addTrace(self, tr): if len(self.traces) == 0: self.x = tr.x trY = tr.y self.traces[tr.config['label']]=trY def plot(self): import matplotlib.pyplot as plt nplots = len(self.traces.keys()) cnt=0 x=self.x.values if self.config['type'] is not None: if self.config['type'] == 'timestamp': x = from_timestamps_to_dates(x) for k, tr in self.traces.items(): cnt+=1 if cnt == 1: ax1=plt.subplot(nplots, 1, cnt) else: plt.subplot(nplots, 1, cnt, sharex=ax1) plt.plot(x, tr.values, label=k) plt.ylabel(f"{tr.config['label']} ({tr.config['unit']})") plt.legend() plt.grid() plt.xlabel(f"{self.x.config['label']} ({self.x.config['unit']})") def items(self): return (self.traces.items()) def keys(self): return (self.traces.keys()) def getTrace(self, label): tr = TraceXY(label) tr.x = self.x tr.y = self.traces[label] return (tr) def getConfig(self): config = self.config.copy() config['TraceX'] = {} config['TraceX'] = self.x.getConfig() config['TraceY'] = {} for k,v in self.traces.items(): config['TraceY'][k] = v.getConfig() return( config ) def getData(self): data=self.x.values if data.ndim == 1: data = data[:, np.newaxis] for k,v in self.traces.items(): vals = v.values if vals.ndim == 1: vals = vals[:, np.newaxis] data=np.concatenate((data, vals), 1) return( data ) def addPointToTrace(self, val, name=None): if name is None: self.x.values = np.append(self.x.values, val) else: a = self.traces[name].values a = np.append(a, val) self.traces[name].values = a if __name__ == '__main__': print("Testing trace") x=Trace('x trace') x.values = np.random.normal(2,2,(4,1)) x.values = np.array(x.values, int) x.config['unit']='s' x.config['tags']['tag1'] = 'xxxx' x.config['tags']['tag2'] = 'xxxx' x.save('xtrace.dat', skip_headers_if_file_exist=True) # print(x.getHeader()) y=Trace('y trace') y.values = np.random.normal(2,2,(4,1)) y.config['unit']='V' y.config['tags']['ytag2'] = 'yyyy' xy=TraceXY('xy trace') xy.config['tags']['xy tag']= 'I am xy tag' xy.x = x xy.y = y xy.save('xytrace.dat') # print(xy.getHeader()) xyn = TraceSetSameX('many ys trace') xyn.config['tags']['descr'] = 'I am many ys trace' xy.config['label']='y1' xyn.addTrace(xy) xy.config['label']='y2' xyn.addTrace(xy) xy.config['label']='y3' xyn.addTrace(xy) xyn.save('xyntrace.dat') # print(xyn.getHeader())