diff options
-rw-r--r-- | qolab/data/trace.py | 182 |
1 files changed, 103 insertions, 79 deletions
diff --git a/qolab/data/trace.py b/qolab/data/trace.py index bd2bdb8..28e0e42 100644 --- a/qolab/data/trace.py +++ b/qolab/data/trace.py @@ -5,45 +5,57 @@ import yaml import pandas import os + def headerFromDictionary(d, prefix=''): """Converts dictionary to YAML format with optional prefix for every line.""" header = [] tail = yaml.dump(d, default_flow_style=False, sort_keys=False) tail = tail.split('\n') header.extend(tail) - prefixed_header = [prefix+l for l in header] + prefixed_header = [prefix + l for l in header] return prefixed_header + def from_timestamps_to_dates(timestamps): """Formats timestamps to datetime format""" dates = [datetime.datetime.fromtimestamp(float(ts)) for ts in timestamps] - return(dates) + return (dates) + def loadTraceRawHeaderAndData(fname, tryCompressedIfMissing=True): """Load trace file and return header and data. - Attempts to load a compressed file if the main is missing - and `tryCompressedIfMissing` is set to `True`. - E.g. if we try to load 'data.dat' file and it is missing, attempt to load 'data.dat.gz' - this option is mainly for compatibility with old scripts which are not aware of compressed files + + Parameters + ---------- + fname: str or Path + file name path + + tryCompressedIfMissing: False or True (default) + - False : use file name as is + - True : if the provided filename is missing + attempts to load a compressed version of filename. + e.g. look for 'data.dat.gz', 'data.dat.bz', etc """ - headerstr=[] + headerstr = [] data = None if (not os.path.exists(fname)) and tryCompressedIfMissing: # attempt to locate compressed file for the missing base file for ext in ['gz', 'bz', 'bz2']: - if os.path.exists(fname+'.'+ext): - fname += '.'+ext + if os.path.exists(fname + '.' + ext): + fname += '.' + ext break # we will try to guess if the file compressed _open = open compression = infer_compression(fname) print(compression) if compression == 'gzip': - # TODO improve detection: gzip files have first 2 bytes set to b'\x1f\x8b' + # TODO improve detection by reading magic bytes: + # gzip files have first 2 bytes set to b'\x1f\x8b' import gzip _open = gzip.open elif compression == 'bzip': - # TODO improve detection: bzip files have first 2 bytes set to b'BZ' + # TODO improve detection by reading magic bytes: + # bzip files have first 2 bytes set to b'BZ' import bz2 _open = bz2.open with _open(fname, mode='rb') as tracefile: @@ -51,23 +63,26 @@ def loadTraceRawHeaderAndData(fname, tryCompressedIfMissing=True): # It sits at the top and below is just data in TSV format while True: ln = tracefile.readline() - if ln[0:2]==b'% ': + if ln[0:2] == b'% ': headerstr.append(ln[2:].decode('utf-8')) else: break - header=yaml.load(str.join('\n', headerstr), Loader=yaml.BaseLoader) + header = yaml.load(str.join('\n', headerstr), Loader=yaml.BaseLoader) # now we load the data itself - tracefile.seek(0) # rewind file to the beginning + tracefile.seek(0) # rewind file to the beginning # Note: pandas reads csv faster by factor of 8 then numpy.genfromtxt df = pandas.read_csv(tracefile, comment='%', delimiter='\t', header=None) data = df.to_numpy() - return(header, data) + return (header, data) + def loadTrace(fname, tryCompressedIfMissing=True): """Load trace file.""" - (header, data) = loadTraceRawHeaderAndData(fname, tryCompressedIfMissing=tryCompressedIfMissing) + (header, data) = loadTraceRawHeaderAndData( + fname, tryCompressedIfMissing=tryCompressedIfMissing) return traceFromHeaderAndData(header, data) + def traceFromHeaderAndData(header, data=None): """Generate trace class from it description (header) and data.""" label = None @@ -92,26 +107,26 @@ def traceFromHeaderAndData(header, data=None): tx = traceFromHeaderAndData(header['TraceX']) ty = traceFromHeaderAndData(header['TraceY']) if data is not None: - tx.values=data[:,0] - ty.values=data[:,1] + tx.values = data[:, 0] + ty.values = data[:, 1] tr = TraceXY(label) tr.x = tx tr.y = ty elif model == 'TraceSetSameX': tx = traceFromHeaderAndData(header['TraceX']) - tx.values=data[:,0] + tx.values = data[:, 0] tr = TraceSetSameX(label) tr.addTraceX(tx) ytrs_header = header['TraceY'] - cnt=0 + cnt = 0 for l, h in ytrs_header.items(): ty = traceFromHeaderAndData(h) cnt += 1 - ty.values=data[:,cnt] + ty.values = data[:, cnt] trxy = TraceXY(l) trxy.x = tx trxy.y = ty - tr.addTrace( trxy ) + tr.addTrace(trxy) else: print(f'Error: unknown trace model: {model}') @@ -119,7 +134,8 @@ def traceFromHeaderAndData(header, data=None): tr.config = header['config'] return tr - + + class Trace: """Base Trace class, which holds only one variable""" def __init__(self, label): @@ -127,11 +143,12 @@ class Trace: self.config['label'] = label self.config['model'] = 'Trace' self.config['version'] = '0.1' - # 'type' is useful to indicate way of representation, make sense on y_vs_x traces - # if set to none we have normal y vs x - # if set to 'timestamp' x will be converted to datetime dates - self.config['type'] = None - self.config['item_format']='.15e' + # 'type' is useful to indicate way of representation, + # it makes sense on y_vs_x traces. + # If set to none we have normal y vs x. + # If set to 'timestamp' x will be converted to datetime dates + self.config['type'] = None + self.config['item_format'] = '.15e' self.config['tags'] = {} self.last_saved_pos = 0 self._trace_specific_init() @@ -157,7 +174,7 @@ class Trace: def plot(self): import matplotlib.pyplot as plt - x=self.values + x = self.values if self.config['type'] is not None: if self.config['type'] == 'timestamp': x = from_timestamps_to_dates(self.values) @@ -166,35 +183,44 @@ class Trace: plt.ylabel(f"{self.config['unit']}") plt.legend() plt.grid(True) - + def getConfig(self): - d ={} + d = {} d['config'] = self.config.copy() - return( d ) + return (d) def getData(self): - return( self.values ) + return (self.values) def getHeader(self, prefix=''): d = self.getConfig() return headerFromDictionary(d, prefix='') - def save(self, fname, last_saved_pos=None, skip_headers_if_file_exist=False, **kwargs): + def save(self, fname, last_saved_pos=None, + skip_headers_if_file_exist=False, **kwargs): + """Save trace to file and returns its filename.""" if last_saved_pos is None: last_saved_pos = self.last_saved_pos data = self.getData() if last_saved_pos > 0: - skip_headers_if_file_exist=True - fname = save_table_with_header(fname, data[last_saved_pos:,:], self.getHeader(), item_format=self.config['item_format'], skip_headers_if_file_exist=skip_headers_if_file_exist, **kwargs) + skip_headers_if_file_exist = True + fname = save_table_with_header(fname, data[last_saved_pos:, :], + self.getHeader(), + item_format=self.config['item_format'], + skip_headers_if_file_exist=skip_headers_if_file_exist, + **kwargs) self.last_saved_pos = data.shape[0] - return(fname) + return (fname) def addPoint(self, val): self.values = np.append(self.values, val) class TraceXY(Trace): - """Data structure to handle two variables X and Y, handy for Y vs X data arrangements.""" + """Data structure to handle two linked variables X and Y. + + It is handy for Y vs X data arrangements. + """ def __init__(self, label): super().__init__(label) self.config['model'] = 'TraceXY' @@ -213,14 +239,14 @@ class TraceXY(Trace): def __repr__(self): lbl = self.config['label'] cls_name = f"{self.__class__.__name__}('{lbl}'" - xlabel= f"{self.x.config['label']}" - xparam= f", {self.x}" - yparam= f", {self.y}" + # xlabel = f"{self.x.config['label']}" + xparam = f", {self.x}" + yparam = f", {self.y}" return "".join([cls_name, xparam, yparam, ')']) def plot(self): import matplotlib.pyplot as plt - x=self.x.values + x = self.x.values if self.x.config['type'] is not None: if self.x.config['type'] == 'timestamp': x = from_timestamps_to_dates(x) @@ -237,32 +263,33 @@ class TraceXY(Trace): config['TraceX'] = self.x.getConfig() config['TraceY'] = {} config['TraceY'] = self.y.getConfig() - return( config ) + return (config) def getData(self): - data=self.x.values + data = self.x.values if data.ndim == 1: data = data[:, np.newaxis] vals = self.y.values if vals.ndim == 1: vals = vals[:, np.newaxis] - data=np.concatenate((data, vals), 1) - return( data ) + data = np.concatenate((data, vals), 1) + return (data) def addPoint(self, valX, valY): self.x.values = np.append(self.x.values, valX) self.y.values = np.append(self.y.values, valY) - + class TraceSetSameX(Trace): - """Data structure to handle multiple Ys vs X dependencies, handy for scope traces.""" + """Data structure to handle multiple Ys vs X dependencies. + It is handy for scope traces.""" def __init__(self, label): super().__init__(label) self.config['model'] = 'TraceSetSameX' def _trace_specific_init(self): self.x = None - self.traces={} + self.traces = {} def clear_data(self): self.clear_last_saved_pos() @@ -274,7 +301,7 @@ class TraceSetSameX(Trace): def __repr__(self): lbl = self.config['label'] cls_name = f"{self.__class__.__name__}('{lbl}'" - xparam= f", x: {self.x}" + xparam = f", x: {self.x}" yparam = f", traces: {list(self.traces.keys())}" return "".join([cls_name, xparam, yparam, ')']) @@ -286,29 +313,29 @@ class TraceSetSameX(Trace): if self.x == None: self.x = tr.x trY = tr.y - self.traces[tr.config['label']]=trY + self.traces[tr.config['label']] = trY elif tr.config['model'] == 'Trace': if self.x == None: self.x = tr else: - self.traces[tr.config['label']]=tr - + self.traces[tr.config['label']] = tr + def plot(self): import matplotlib.pyplot as plt nplots = len(self.traces.keys()) fig = plt.gcf() fig, axs = plt.subplots(nplots, 1, sharex=True, num=fig.number) - cnt=0 - x=self.x.values + cnt = 0 + x = self.x.values if self.x.config['type'] is not None: if self.x.config['type'] == 'timestamp': x = from_timestamps_to_dates(x) for k, tr in self.traces.items(): - p=axs[cnt].plot(x, tr.values, label=k) + p = axs[cnt].plot(x, tr.values, label=k) axs[cnt].set_ylabel(f"{tr.config['label']} ({tr.config['unit']})") axs[cnt].legend() axs[cnt].grid(True) - cnt+=1 + cnt += 1 plt.xlabel(f"{self.x.config['label']} ({self.x.config['unit']})") def items(self): @@ -322,27 +349,27 @@ class TraceSetSameX(Trace): tr.x = self.x tr.y = self.traces[label] return (tr) - + def getConfig(self): config = {} config['config'] = self.config.copy() config['TraceX'] = {} config['TraceX'] = self.x.getConfig() config['TraceY'] = {} - for k,v in self.traces.items(): + for k, v in self.traces.items(): config['TraceY'][k] = v.getConfig() - return( config ) + return (config) def getData(self): - data=self.x.values + data = self.x.values if data.ndim == 1: data = data[:, np.newaxis] - for k,v in self.traces.items(): + for k, v in self.traces.items(): vals = v.values if vals.ndim == 1: vals = vals[:, np.newaxis] - data=np.concatenate((data, vals), 1) - return( data ) + data = np.concatenate((data, vals), 1) + return (data) def addPointToTrace(self, val, name=None): if name is None: @@ -353,36 +380,33 @@ class TraceSetSameX(Trace): self.traces[name].values = a - -if __name__ == '__main__': +if __name__ == '__main__': print("Testing trace") - x=Trace('x trace') - x.values = np.random.normal(2,2,(4,1)) + x = Trace('x trace') + x.values = np.random.normal(2, 2, (4, 1)) x.values = np.array(x.values, int) - x.config['unit']='s' + x.config['unit'] = 's' x.config['tags']['tag1'] = 'xxxx' x.config['tags']['tag2'] = 'xxxx' x.save('xtrace.dat', skip_headers_if_file_exist=True) # print(x.getHeader()) - y=Trace('y trace') - y.values = np.random.normal(2,2,(4,1)) - y.config['unit']='V' + y = Trace('y trace') + y.values = np.random.normal(2, 2, (4, 1)) + y.config['unit'] = 'V' y.config['tags']['ytag2'] = 'yyyy' - xy=TraceXY('xy trace') - xy.config['tags']['xy tag']= 'I am xy tag' + xy = TraceXY('xy trace') + xy.config['tags']['xy tag'] = 'I am xy tag' xy.x = x xy.y = y xy.save('xytrace.dat') # print(xy.getHeader()) xyn = TraceSetSameX('many ys trace') xyn.config['tags']['descr'] = 'I am many ys trace' - xy.config['label']='y1' + xy.config['label'] = 'y1' xyn.addTrace(xy) - xy.config['label']='y2' + xy.config['label'] = 'y2' xyn.addTrace(xy) - xy.config['label']='y3' + xy.config['label'] = 'y3' xyn.addTrace(xy) xyn.save('xyntrace.dat') # print(xyn.getHeader()) - - |