From 88013bab26cb61f8ee96599a518fbea855544d5b Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Sat, 13 Jul 2024 17:34:32 -0400 Subject: applied black formatter --- qolab/data/trace.py | 225 +++++++++++++++++++++++++++------------------------- 1 file changed, 119 insertions(+), 106 deletions(-) (limited to 'qolab/data/trace.py') diff --git a/qolab/data/trace.py b/qolab/data/trace.py index 28e0e42..1b2ea17 100644 --- a/qolab/data/trace.py +++ b/qolab/data/trace.py @@ -6,11 +6,11 @@ import pandas import os -def headerFromDictionary(d, prefix=''): +def headerFromDictionary(d, prefix=""): """Converts dictionary to YAML format with optional prefix for every line.""" header = [] tail = yaml.dump(d, default_flow_style=False, sort_keys=False) - tail = tail.split('\n') + tail = tail.split("\n") header.extend(tail) prefixed_header = [prefix + l for l in header] return prefixed_header @@ -19,7 +19,7 @@ def headerFromDictionary(d, prefix=''): def from_timestamps_to_dates(timestamps): """Formats timestamps to datetime format""" dates = [datetime.datetime.fromtimestamp(float(ts)) for ts in timestamps] - return (dates) + return dates def loadTraceRawHeaderAndData(fname, tryCompressedIfMissing=True): @@ -40,38 +40,40 @@ def loadTraceRawHeaderAndData(fname, tryCompressedIfMissing=True): data = None if (not os.path.exists(fname)) and tryCompressedIfMissing: # attempt to locate compressed file for the missing base file - for ext in ['gz', 'bz', 'bz2']: - if os.path.exists(fname + '.' + ext): - fname += '.' + ext + for ext in ["gz", "bz", "bz2"]: + if os.path.exists(fname + "." + ext): + fname += "." + ext break # we will try to guess if the file compressed _open = open compression = infer_compression(fname) print(compression) - if compression == 'gzip': + if compression == "gzip": # TODO improve detection by reading magic bytes: # gzip files have first 2 bytes set to b'\x1f\x8b' import gzip + _open = gzip.open - elif compression == 'bzip': + elif compression == "bzip": # TODO improve detection by reading magic bytes: # bzip files have first 2 bytes set to b'BZ' import bz2 + _open = bz2.open - with _open(fname, mode='rb') as tracefile: + with _open(fname, mode="rb") as tracefile: # Reading yaml header prefixed by '% ' # It sits at the top and below is just data in TSV format while True: ln = tracefile.readline() - if ln[0:2] == b'% ': - headerstr.append(ln[2:].decode('utf-8')) + if ln[0:2] == b"% ": + headerstr.append(ln[2:].decode("utf-8")) else: break - header = yaml.load(str.join('\n', headerstr), Loader=yaml.BaseLoader) + header = yaml.load(str.join("\n", headerstr), Loader=yaml.BaseLoader) # now we load the data itself tracefile.seek(0) # rewind file to the beginning # Note: pandas reads csv faster by factor of 8 then numpy.genfromtxt - df = pandas.read_csv(tracefile, comment='%', delimiter='\t', header=None) + df = pandas.read_csv(tracefile, comment="%", delimiter="\t", header=None) data = df.to_numpy() return (header, data) @@ -79,7 +81,8 @@ def loadTraceRawHeaderAndData(fname, tryCompressedIfMissing=True): def loadTrace(fname, tryCompressedIfMissing=True): """Load trace file.""" (header, data) = loadTraceRawHeaderAndData( - fname, tryCompressedIfMissing=tryCompressedIfMissing) + fname, tryCompressedIfMissing=tryCompressedIfMissing + ) return traceFromHeaderAndData(header, data) @@ -88,36 +91,36 @@ def traceFromHeaderAndData(header, data=None): label = None model = None tr = None - if 'config' not in header: - print('Error: trace has now config') + if "config" not in header: + print("Error: trace has now config") return None else: - if 'label' in header['config']: - label = header['config']['label'] - if 'model' not in header['config']: - print('Error: unknown trace model') + if "label" in header["config"]: + label = header["config"]["label"] + if "model" not in header["config"]: + print("Error: unknown trace model") return None else: - model = header['config']['model'] - if model == 'Trace': + model = header["config"]["model"] + if model == "Trace": tr = Trace(label) if data is not None: tr.values = data - elif model == 'TraceXY': - tx = traceFromHeaderAndData(header['TraceX']) - ty = traceFromHeaderAndData(header['TraceY']) + elif model == "TraceXY": + tx = traceFromHeaderAndData(header["TraceX"]) + ty = traceFromHeaderAndData(header["TraceY"]) if data is not None: tx.values = data[:, 0] ty.values = data[:, 1] tr = TraceXY(label) tr.x = tx tr.y = ty - elif model == 'TraceSetSameX': - tx = traceFromHeaderAndData(header['TraceX']) + elif model == "TraceSetSameX": + tx = traceFromHeaderAndData(header["TraceX"]) tx.values = data[:, 0] tr = TraceSetSameX(label) tr.addTraceX(tx) - ytrs_header = header['TraceY'] + ytrs_header = header["TraceY"] cnt = 0 for l, h in ytrs_header.items(): ty = traceFromHeaderAndData(h) @@ -129,33 +132,34 @@ def traceFromHeaderAndData(header, data=None): tr.addTrace(trxy) else: - print(f'Error: unknown trace model: {model}') + print(f"Error: unknown trace model: {model}") return None - tr.config = header['config'] + tr.config = header["config"] return tr class Trace: """Base Trace class, which holds only one variable""" + def __init__(self, label): self.config = {} - self.config['label'] = label - self.config['model'] = 'Trace' - self.config['version'] = '0.1' + self.config["label"] = label + self.config["model"] = "Trace" + self.config["version"] = "0.1" # 'type' is useful to indicate way of representation, # it makes sense on y_vs_x traces. # If set to none we have normal y vs x. # If set to 'timestamp' x will be converted to datetime dates - self.config['type'] = None - self.config['item_format'] = '.15e' - self.config['tags'] = {} + self.config["type"] = None + self.config["item_format"] = ".15e" + self.config["tags"] = {} self.last_saved_pos = 0 self._trace_specific_init() self.clear_data() def _trace_specific_init(self): - self.config['unit'] = None + self.config["unit"] = None self.values = np.empty(0) self.last_saved_pos = 0 @@ -168,49 +172,54 @@ class Trace: self.values = np.empty(0, dtype=self.values.dtype) def __repr__(self): - lbl = self.config['label'] + lbl = self.config["label"] cls_name = f"{self.__class__.__name__}('{lbl}'" - return "".join([cls_name, f', N={self.values.size}', ')']) + return "".join([cls_name, f", N={self.values.size}", ")"]) def plot(self): import matplotlib.pyplot as plt + x = self.values - if self.config['type'] is not None: - if self.config['type'] == 'timestamp': + if self.config["type"] is not None: + if self.config["type"] == "timestamp": x = from_timestamps_to_dates(self.values) - plt.plot(x, label=self.config['label']) - plt.xlabel('index') + plt.plot(x, label=self.config["label"]) + plt.xlabel("index") plt.ylabel(f"{self.config['unit']}") plt.legend() plt.grid(True) def getConfig(self): d = {} - d['config'] = self.config.copy() - return (d) + d["config"] = self.config.copy() + return d def getData(self): - return (self.values) + return self.values - def getHeader(self, prefix=''): + def getHeader(self, prefix=""): d = self.getConfig() - return headerFromDictionary(d, prefix='') + return headerFromDictionary(d, prefix="") - def save(self, fname, last_saved_pos=None, - skip_headers_if_file_exist=False, **kwargs): + def save( + self, fname, last_saved_pos=None, skip_headers_if_file_exist=False, **kwargs + ): """Save trace to file and returns its filename.""" if last_saved_pos is None: last_saved_pos = self.last_saved_pos data = self.getData() if last_saved_pos > 0: skip_headers_if_file_exist = True - fname = save_table_with_header(fname, data[last_saved_pos:, :], - self.getHeader(), - item_format=self.config['item_format'], - skip_headers_if_file_exist=skip_headers_if_file_exist, - **kwargs) + fname = save_table_with_header( + fname, + data[last_saved_pos:, :], + self.getHeader(), + item_format=self.config["item_format"], + skip_headers_if_file_exist=skip_headers_if_file_exist, + **kwargs, + ) self.last_saved_pos = data.shape[0] - return (fname) + return fname def addPoint(self, val): self.values = np.append(self.values, val) @@ -221,9 +230,10 @@ class TraceXY(Trace): It is handy for Y vs X data arrangements. """ + def __init__(self, label): super().__init__(label) - self.config['model'] = 'TraceXY' + self.config["model"] = "TraceXY" def _trace_specific_init(self): self.x = None @@ -237,20 +247,21 @@ class TraceXY(Trace): self.y.clear_data() def __repr__(self): - lbl = self.config['label'] + lbl = self.config["label"] cls_name = f"{self.__class__.__name__}('{lbl}'" # xlabel = f"{self.x.config['label']}" xparam = f", {self.x}" yparam = f", {self.y}" - return "".join([cls_name, xparam, yparam, ')']) + return "".join([cls_name, xparam, yparam, ")"]) def plot(self): import matplotlib.pyplot as plt + x = self.x.values - if self.x.config['type'] is not None: - if self.x.config['type'] == 'timestamp': + if self.x.config["type"] is not None: + if self.x.config["type"] == "timestamp": x = from_timestamps_to_dates(x) - plt.plot(x, self.y.values, label=self.config['label']) + plt.plot(x, self.y.values, label=self.config["label"]) plt.xlabel(f"{self.x.config['label']} ({self.x.config['unit']})") plt.ylabel(f"{self.y.config['label']} ({self.y.config['unit']})") plt.legend() @@ -258,12 +269,12 @@ class TraceXY(Trace): def getConfig(self): config = {} - config['config'] = self.config.copy() - config['TraceX'] = {} - config['TraceX'] = self.x.getConfig() - config['TraceY'] = {} - config['TraceY'] = self.y.getConfig() - return (config) + config["config"] = self.config.copy() + config["TraceX"] = {} + config["TraceX"] = self.x.getConfig() + config["TraceY"] = {} + config["TraceY"] = self.y.getConfig() + return config def getData(self): data = self.x.values @@ -273,7 +284,7 @@ class TraceXY(Trace): if vals.ndim == 1: vals = vals[:, np.newaxis] data = np.concatenate((data, vals), 1) - return (data) + return data def addPoint(self, valX, valY): self.x.values = np.append(self.x.values, valX) @@ -283,9 +294,10 @@ class TraceXY(Trace): class TraceSetSameX(Trace): """Data structure to handle multiple Ys vs X dependencies. It is handy for scope traces.""" + def __init__(self, label): super().__init__(label) - self.config['model'] = 'TraceSetSameX' + self.config["model"] = "TraceSetSameX" def _trace_specific_init(self): self.x = None @@ -299,36 +311,37 @@ class TraceSetSameX(Trace): tr.clear_data() def __repr__(self): - lbl = self.config['label'] + lbl = self.config["label"] cls_name = f"{self.__class__.__name__}('{lbl}'" xparam = f", x: {self.x}" yparam = f", traces: {list(self.traces.keys())}" - return "".join([cls_name, xparam, yparam, ')']) + return "".join([cls_name, xparam, yparam, ")"]) def addTraceX(self, tr): self.x = tr def addTrace(self, tr): - if tr.config['model'] == 'TraceXY': + if tr.config["model"] == "TraceXY": if self.x == None: self.x = tr.x trY = tr.y - self.traces[tr.config['label']] = trY - elif tr.config['model'] == 'Trace': + self.traces[tr.config["label"]] = trY + elif tr.config["model"] == "Trace": if self.x == None: self.x = tr else: - self.traces[tr.config['label']] = tr + self.traces[tr.config["label"]] = tr def plot(self): import matplotlib.pyplot as plt + nplots = len(self.traces.keys()) fig = plt.gcf() fig, axs = plt.subplots(nplots, 1, sharex=True, num=fig.number) cnt = 0 x = self.x.values - if self.x.config['type'] is not None: - if self.x.config['type'] == 'timestamp': + if self.x.config["type"] is not None: + if self.x.config["type"] == "timestamp": x = from_timestamps_to_dates(x) for k, tr in self.traces.items(): p = axs[cnt].plot(x, tr.values, label=k) @@ -339,26 +352,26 @@ class TraceSetSameX(Trace): plt.xlabel(f"{self.x.config['label']} ({self.x.config['unit']})") def items(self): - return (self.traces.items()) + return self.traces.items() def keys(self): - return (self.traces.keys()) + return self.traces.keys() def getTrace(self, label): tr = TraceXY(label) tr.x = self.x tr.y = self.traces[label] - return (tr) + return tr def getConfig(self): config = {} - config['config'] = self.config.copy() - config['TraceX'] = {} - config['TraceX'] = self.x.getConfig() - config['TraceY'] = {} + config["config"] = self.config.copy() + config["TraceX"] = {} + config["TraceX"] = self.x.getConfig() + config["TraceY"] = {} for k, v in self.traces.items(): - config['TraceY'][k] = v.getConfig() - return (config) + config["TraceY"][k] = v.getConfig() + return config def getData(self): data = self.x.values @@ -369,7 +382,7 @@ class TraceSetSameX(Trace): if vals.ndim == 1: vals = vals[:, np.newaxis] data = np.concatenate((data, vals), 1) - return (data) + return data def addPointToTrace(self, val, name=None): if name is None: @@ -380,33 +393,33 @@ class TraceSetSameX(Trace): self.traces[name].values = a -if __name__ == '__main__': +if __name__ == "__main__": print("Testing trace") - x = Trace('x trace') + x = Trace("x trace") x.values = np.random.normal(2, 2, (4, 1)) x.values = np.array(x.values, int) - x.config['unit'] = 's' - x.config['tags']['tag1'] = 'xxxx' - x.config['tags']['tag2'] = 'xxxx' - x.save('xtrace.dat', skip_headers_if_file_exist=True) + x.config["unit"] = "s" + x.config["tags"]["tag1"] = "xxxx" + x.config["tags"]["tag2"] = "xxxx" + x.save("xtrace.dat", skip_headers_if_file_exist=True) # print(x.getHeader()) - y = Trace('y trace') + y = Trace("y trace") y.values = np.random.normal(2, 2, (4, 1)) - y.config['unit'] = 'V' - y.config['tags']['ytag2'] = 'yyyy' - xy = TraceXY('xy trace') - xy.config['tags']['xy tag'] = 'I am xy tag' + y.config["unit"] = "V" + y.config["tags"]["ytag2"] = "yyyy" + xy = TraceXY("xy trace") + xy.config["tags"]["xy tag"] = "I am xy tag" xy.x = x xy.y = y - xy.save('xytrace.dat') + xy.save("xytrace.dat") # print(xy.getHeader()) - xyn = TraceSetSameX('many ys trace') - xyn.config['tags']['descr'] = 'I am many ys trace' - xy.config['label'] = 'y1' + xyn = TraceSetSameX("many ys trace") + xyn.config["tags"]["descr"] = "I am many ys trace" + xy.config["label"] = "y1" xyn.addTrace(xy) - xy.config['label'] = 'y2' + xy.config["label"] = "y2" xyn.addTrace(xy) - xy.config['label'] = 'y3' + xy.config["label"] = "y3" xyn.addTrace(xy) - xyn.save('xyntrace.dat') + xyn.save("xyntrace.dat") # print(xyn.getHeader()) -- cgit v1.2.3