Diffstat (limited to 'qolab/file_utils/__init__.py')
-rw-r--r--  qolab/file_utils/__init__.py  125
1 file changed, 76 insertions(+), 49 deletions(-)
diff --git a/qolab/file_utils/__init__.py b/qolab/file_utils/__init__.py
index cbc9f2a..ab674ae 100644
--- a/qolab/file_utils/__init__.py
+++ b/qolab/file_utils/__init__.py
@@ -3,7 +3,8 @@ import re
import os
from datetime import date
-def filename2os_fname( fname ):
+
+def filename2os_fname(fname):
r"""Translate Windows or Linux filename to OS dependent style.
Takes in account the notion of 'Z:' drive on different systems.
@@ -11,83 +12,104 @@ def filename2os_fname( fname ):
Example: Z:\\dir1\\dir2\\file <==> /mnt/qol_grp_data/dir1/dir2/file
"""
- if platform.system() == 'Windows':
- fname = re.sub('/mnt/qol_grp_data', 'Z:', fname)
+ if platform.system() == "Windows":
+ fname = re.sub("/mnt/qol_grp_data", "Z:", fname)
else:
- fname = re.sub('Z:', '/mnt/qol_grp_data', fname)
- fname = re.sub(r'\\', '/', fname)
+ fname = re.sub("Z:", "/mnt/qol_grp_data", fname)
+ fname = re.sub(r"\\", "/", fname)
fname = os.path.normpath(fname)
- return (fname)
+ return fname
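
For reference, a minimal usage sketch of the reformatted helper (the result shown in the comments depends on the host OS, following the Z: <==> /mnt/qol_grp_data mapping in the docstring):

    from qolab.file_utils import filename2os_fname

    # On Linux the Windows-style path is mapped onto the shared mount:
    #   Z:\dir1\dir2\file  ->  /mnt/qol_grp_data/dir1/dir2/file
    # On Windows the reverse substitution is applied and backslashes are kept.
    print(filename2os_fname("Z:\\dir1\\dir2\\file"))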
def get_runnum(data_dir):
- r""" Reads, increments data counter and saves it back in the provided `data_dir`.
+ r"""Reads, increments data counter and saves it back in the provided `data_dir`.
If necessary creates counter file and full path to it.
Examples":
get_runnum('Z:\\Ramsi_EIT\\data\\')
get_runnum('/mnt/qol_grp_data/data')
"""
- data_dir = filename2os_fname( data_dir );
+ data_dir = filename2os_fname(data_dir)
if not os.path.exists(data_dir):
- os.mkdir(data_dir)
+ os.mkdir(data_dir)
if not os.path.isdir(data_dir):
print(f"ERROR: cannot create directory for data: {data_dir}")
print("Will use current dir for storage")
data_dir = "."
- runnumpath = os.path.join(data_dir, 'autofile')
+ runnumpath = os.path.join(data_dir, "autofile")
# convert to OS dependent way
- runnumpath = filename2os_fname( runnumpath );
+ runnumpath = filename2os_fname(runnumpath)
if not os.path.exists(runnumpath):
- os.mkdir(runnumpath)
- runnum_file = os.path.join(runnumpath, 'runnum.dat');
- runnum_file = filename2os_fname( runnum_file );
+ os.mkdir(runnumpath)
+ runnum_file = os.path.join(runnumpath, "runnum.dat")
+ runnum_file = filename2os_fname(runnum_file)
run_number = 0
if os.path.isfile(runnum_file):
- with open(runnum_file, 'r') as f:
+ with open(runnum_file, "r") as f:
content = f.readlines()
run_number = int(content[0])
f.close()
# Increment it and fold if needed
- run_number = run_number + 1;
+ run_number = run_number + 1
# Important: we are using five digit counters to synchronize
# with qol_get_next_data_file.m
if run_number > 99999:
run_number = 0
- with open(runnum_file, 'w') as f:
- f.write(f'{run_number}')
+ with open(runnum_file, "w") as f:
+ f.write(f"{run_number}")
f.close()
- return(run_number)
-
-def get_next_data_file(prefix, savepath, run_number=None, datestr=None, date_format='%Y%m%d', extension='dat'):
- """Generate a filename according to a standard naming scheme
+ return run_number
+
+
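
A hedged sketch of how the counter behaves (the directory is illustrative; the counter itself lives in <data_dir>/autofile/runnum.dat):

    from qolab.file_utils import get_runnum

    # Each call reads the stored counter, increments it, writes it back,
    # and wraps to 0 after 99999 so the value always fits in five digits.
    run = get_runnum("/mnt/qol_grp_data/data")
    print(f"run number: {run:05d}")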
+def get_next_data_file(
+ prefix,
+ savepath,
+ run_number=None,
+ datestr=None,
+ date_format="%Y%m%d",
+ extension="dat",
+):
+ """Generate a filename according to a standard naming scheme
fname = os.path.join(savepath, f'{prefix}_{datestr}_{run_number:05d}.{extension}')
If run_number is missing, it is acquired with `get_runnum(savepath)`.
"""
if run_number is None:
- run_number = get_runnum( savepath )
+ run_number = get_runnum(savepath)
today = date.today()
if datestr is None:
datestr = today.strftime(date_format)
- fname = os.path.join(savepath, f'{prefix}_{datestr}_{run_number:05d}.{extension}')
- return(fname)
+ fname = os.path.join(savepath, f"{prefix}_{datestr}_{run_number:05d}.{extension}")
+ return fname
+
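
With the defaults this produces names of the form <savepath>/<prefix>_<YYYYMMDD>_<run:05d>.dat; a small sketch (prefix and path are illustrative, and run_number is pinned only to make the result predictable):

    from qolab.file_utils import get_next_data_file

    # Omitting run_number would pull the next value from get_runnum(savepath).
    fname = get_next_data_file("scan", "/mnt/qol_grp_data/data", run_number=7)
    # e.g. '/mnt/qol_grp_data/data/scan_20240101_00007.dat' (date part is today's date)
    print(fname)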
def infer_compression(fname):
"""Infers compression algorithm from filename extension"""
- compression = None # usual suspect
+ compression = None # usual suspect
b, fext = os.path.splitext(fname)
- if fext == '.gz':
- compression = 'gzip'
- elif ( fext == '.bz') or (fext == '.bz2'):
- compression = 'bzip'
+ if fext == ".gz":
+ compression = "gzip"
+ elif (fext == ".bz") or (fext == ".bz2"):
+ compression = "bzip"
return compression
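
The inference is purely extension based, for example:

    from qolab.file_utils import infer_compression

    infer_compression("trace.dat")      # -> None
    infer_compression("trace.dat.gz")   # -> 'gzip'
    infer_compression("trace.dat.bz2")  # -> 'bzip'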
-def save_table_with_header(fname, data, header='', comment_symbol='%', skip_headers_if_file_exist=False, item_format='e', item_separator='\t', compressionmethod=None, compresslevel=9, match_filename_to_compression=True):
+
+def save_table_with_header(
+ fname,
+ data,
+ header="",
+ comment_symbol="%",
+ skip_headers_if_file_exist=False,
+ item_format="e",
+ item_separator="\t",
+ compressionmethod=None,
+ compresslevel=9,
+ match_filename_to_compression=True,
+):
r"""Saves output to CSV or TSV file with specially formatted header.
The file is appended if needed.
@@ -122,29 +144,34 @@ def save_table_with_header(fname, data, header='', comment_symbol='%', skip_head
compression_infered = infer_compression(fname)
if (compression_infered != compressionmethod) and match_filename_to_compression:
if compressionmethod is None:
- fname += '.dat'
- if compressionmethod == 'gzip':
- fname += '.gz'
- elif compressionmethod == 'bzip':
- fname += '.bz'
+ fname += ".dat"
+ if compressionmethod == "gzip":
+ fname += ".gz"
+ elif compressionmethod == "bzip":
+ fname += ".bz"
file_exist_flag = os.path.exists(fname)
- item_format=str.join('', ['{', f':{item_format}', '}'])
- _open = open # standard file handler
- if compressionmethod == 'gzip':
+ item_format = str.join("", ["{", f":{item_format}", "}"])
+ _open = open # standard file handler
+ if compressionmethod == "gzip":
import gzip
- _open = lambda fname, mode: gzip.open( fname, mode=mode, compresslevel = compresslevel)
- if compressionmethod == 'bzip':
+
+ _open = lambda fname, mode: gzip.open(
+ fname, mode=mode, compresslevel=compresslevel
+ )
+ if compressionmethod == "bzip":
import bz2
- _open = lambda fname, mode: bz2.open( fname, mode=mode, compresslevel = compresslevel)
- with _open(fname, mode='ab') as f:
+
+ _open = lambda fname, mode: bz2.open(
+ fname, mode=mode, compresslevel=compresslevel
+ )
+ with _open(fname, mode="ab") as f:
if not (file_exist_flag and skip_headers_if_file_exist):
for l in header:
- f.write(f'{comment_symbol} {l}\n'.encode('utf-8'))
+ f.write(f"{comment_symbol} {l}\n".encode("utf-8"))
if data is not None:
for r in data:
- l=item_separator.join( map(item_format.format, r))
- f.write(l.encode('utf-8'))
- f.write('\n'.encode('utf-8'))
+ l = item_separator.join(map(item_format.format, r))
+ f.write(l.encode("utf-8"))
+ f.write("\n".encode("utf-8"))
f.close()
- return(fname)
-
+ return fname
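
A minimal usage sketch of the writer (data, header, and filename are illustrative; with compressionmethod="gzip" and the default match_filename_to_compression=True, a ".gz" suffix is appended to the returned filename):

    from qolab.file_utils import save_table_with_header

    header = ["column 1: time (s)", "column 2: signal (V)"]
    data = [[1.0, 2.0], [3.0, 4.0]]
    # Writes '%'-prefixed header lines, then tab-separated rows in '{:e}' format,
    # appending to the file if it already exists.
    out = save_table_with_header("trace.dat", data, header=header,
                                 compressionmethod="gzip")
    print(out)  # actual file written, e.g. 'trace.dat.gz'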