diff options
author | Eugeniy E. Mikhailov <evgmik@gmail.com> | 2024-07-13 18:24:56 -0400 |
---|---|---|
committer | Eugeniy E. Mikhailov <evgmik@gmail.com> | 2024-07-13 18:24:56 -0400 |
commit | 52bfae2b52b76186e61e9d5555fb5bfe021d29d9 (patch) | |
tree | 8445f772db7172907debda4c4dd0c7d353230fac /qolab/file_utils | |
parent | f4371ce3253f83775357aa8245fd785336c72f49 (diff) | |
download | qolab-52bfae2b52b76186e61e9d5555fb5bfe021d29d9.tar.gz qolab-52bfae2b52b76186e61e9d5555fb5bfe021d29d9.zip |
black formatter
Diffstat (limited to 'qolab/file_utils')
-rw-r--r-- | qolab/file_utils/__init__.py | 125 |
1 files changed, 76 insertions, 49 deletions
diff --git a/qolab/file_utils/__init__.py b/qolab/file_utils/__init__.py index cbc9f2a..ab674ae 100644 --- a/qolab/file_utils/__init__.py +++ b/qolab/file_utils/__init__.py @@ -3,7 +3,8 @@ import re import os from datetime import date -def filename2os_fname( fname ): + +def filename2os_fname(fname): r"""Translate Windows or Linux filename to OS dependent style. Takes in account the notion of 'Z:' drive on different systems. @@ -11,83 +12,104 @@ def filename2os_fname( fname ): Example: Z:\\dir1\\dir2\\file <==> /mnt/qol_grp_data/dir1/dir2/file """ - if platform.system() == 'Windows': - fname = re.sub('/mnt/qol_grp_data', 'Z:', fname) + if platform.system() == "Windows": + fname = re.sub("/mnt/qol_grp_data", "Z:", fname) else: - fname = re.sub('Z:', '/mnt/qol_grp_data', fname) - fname = re.sub(r'\\', '/', fname) + fname = re.sub("Z:", "/mnt/qol_grp_data", fname) + fname = re.sub(r"\\", "/", fname) fname = os.path.normpath(fname) - return (fname) + return fname def get_runnum(data_dir): - r""" Reads, increments data counter and saves it back in the provided `data_dir`. + r"""Reads, increments data counter and saves it back in the provided `data_dir`. If necessary creates counter file and full path to it. Examples": get_runnum('Z:\\Ramsi_EIT\\data\\') get_runnum('/mnt/qol_grp_data/data') """ - data_dir = filename2os_fname( data_dir ); + data_dir = filename2os_fname(data_dir) if not os.path.exists(data_dir): - os.mkdir(data_dir) + os.mkdir(data_dir) if not os.path.isdir(data_dir): print(f"ERROR: cannot create directory for data: {data_dir}") print("Will use current dir for storage") data_dir = "." - runnumpath = os.path.join(data_dir, 'autofile') + runnumpath = os.path.join(data_dir, "autofile") # convert to OS dependent way - runnumpath = filename2os_fname( runnumpath ); + runnumpath = filename2os_fname(runnumpath) if not os.path.exists(runnumpath): - os.mkdir(runnumpath) - runnum_file = os.path.join(runnumpath, 'runnum.dat'); - runnum_file = filename2os_fname( runnum_file ); + os.mkdir(runnumpath) + runnum_file = os.path.join(runnumpath, "runnum.dat") + runnum_file = filename2os_fname(runnum_file) run_number = 0 if os.path.isfile(runnum_file): - with open(runnum_file, 'r') as f: + with open(runnum_file, "r") as f: content = f.readlines() run_number = int(content[0]) f.close() # Increment it and fold if needed - run_number = run_number + 1; + run_number = run_number + 1 # Important: we are using five digit counters to synchronize # with qol_get_next_data_file.m if run_number > 99999: run_number = 0 - with open(runnum_file, 'w') as f: - f.write(f'{run_number}') + with open(runnum_file, "w") as f: + f.write(f"{run_number}") f.close() - return(run_number) - -def get_next_data_file(prefix, savepath, run_number=None, datestr=None, date_format='%Y%m%d', extension='dat'): - """Generate a filename according to a standard naming scheme + return run_number + + +def get_next_data_file( + prefix, + savepath, + run_number=None, + datestr=None, + date_format="%Y%m%d", + extension="dat", +): + """Generate a filename according to a standard naming scheme fname = os.path.join(savepath, f'{prefix}_{datestr}_{run_number:05d}.{extension}') if run_number is missing, acquires it with `get_runnum( savepath )` """ if run_number is None: - run_number = get_runnum( savepath ) + run_number = get_runnum(savepath) today = date.today() if datestr is None: datestr = today.strftime(date_format) - fname = os.path.join(savepath, f'{prefix}_{datestr}_{run_number:05d}.{extension}') - return(fname) + fname = os.path.join(savepath, f"{prefix}_{datestr}_{run_number:05d}.{extension}") + return fname + def infer_compression(fname): """Infers compression algorithm from filename extension""" - compression = None # usual suspect + compression = None # usual suspect b, fext = os.path.splitext(fname) - if fext == '.gz': - compression = 'gzip' - elif ( fext == '.bz') or (fext == '.bz2'): - compression = 'bzip' + if fext == ".gz": + compression = "gzip" + elif (fext == ".bz") or (fext == ".bz2"): + compression = "bzip" return compression -def save_table_with_header(fname, data, header='', comment_symbol='%', skip_headers_if_file_exist=False, item_format='e', item_separator='\t', compressionmethod=None, compresslevel=9, match_filename_to_compression=True): + +def save_table_with_header( + fname, + data, + header="", + comment_symbol="%", + skip_headers_if_file_exist=False, + item_format="e", + item_separator="\t", + compressionmethod=None, + compresslevel=9, + match_filename_to_compression=True, +): r"""Saves output to CSV or TSV file with specially formatted header. The file is appended if needed. @@ -122,29 +144,34 @@ def save_table_with_header(fname, data, header='', comment_symbol='%', skip_head compression_infered = infer_compression(fname) if (compression_infered != compressionmethod) and match_filename_to_compression: if compressionmethod is None: - fname += '.dat' - if compressionmethod == 'gzip': - fname += '.gz' - elif compressionmethod == 'bzip': - fname += '.bz' + fname += ".dat" + if compressionmethod == "gzip": + fname += ".gz" + elif compressionmethod == "bzip": + fname += ".bz" file_exist_flag = os.path.exists(fname) - item_format=str.join('', ['{', f':{item_format}', '}']) - _open = open # standard file handler - if compressionmethod == 'gzip': + item_format = str.join("", ["{", f":{item_format}", "}"]) + _open = open # standard file handler + if compressionmethod == "gzip": import gzip - _open = lambda fname, mode: gzip.open( fname, mode=mode, compresslevel = compresslevel) - if compressionmethod == 'bzip': + + _open = lambda fname, mode: gzip.open( + fname, mode=mode, compresslevel=compresslevel + ) + if compressionmethod == "bzip": import bz2 - _open = lambda fname, mode: bz2.open( fname, mode=mode, compresslevel = compresslevel) - with _open(fname, mode='ab') as f: + + _open = lambda fname, mode: bz2.open( + fname, mode=mode, compresslevel=compresslevel + ) + with _open(fname, mode="ab") as f: if not (file_exist_flag and skip_headers_if_file_exist): for l in header: - f.write(f'{comment_symbol} {l}\n'.encode('utf-8')) + f.write(f"{comment_symbol} {l}\n".encode("utf-8")) if data is not None: for r in data: - l=item_separator.join( map(item_format.format, r)) - f.write(l.encode('utf-8')) - f.write('\n'.encode('utf-8')) + l = item_separator.join(map(item_format.format, r)) + f.write(l.encode("utf-8")) + f.write("\n".encode("utf-8")) f.close() - return(fname) - + return fname |