1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
|
import platform
import re
import os
from datetime import date
def filename2os_fname(fname):
    r"""Convert a Windows- or Linux-style file name to the style of the running OS.

    The shared drive is called 'Z:' on Windows and '/mnt/qol_grp_data' elsewhere,
    so the two spellings are swapped as needed: Z: <==> /mnt/qol_grp_data.
    When not running on Windows, backslashes are additionally turned into '/'.
    Example: Z:\\dir1\\dir2\\file <==> /mnt/qol_grp_data/dir1/dir2/file

    Note that the drive/mount text is substituted wherever it occurs in `fname`,
    not only at its beginning.

    Parameters
    ----------
    fname : string, path written in either convention.

    Returns
    -------
    string, the translated path passed through `os.path.normpath`.
    """
    running_on_windows = platform.system() == "Windows"
    if running_on_windows:
        translated = re.sub("/mnt/qol_grp_data", "Z:", fname)
    else:
        with_mount_point = re.sub("Z:", "/mnt/qol_grp_data", fname)
        translated = re.sub(r"\\", "/", with_mount_point)
    # normpath collapses duplicate separators and '..' parts for the current OS
    return os.path.normpath(translated)
def get_runnum(data_dir):
    r"""Read, increment and save back the run counter kept under `data_dir`.

    The counter is stored as text in ``<data_dir>/autofile/runnum.dat``.
    Missing directories (including intermediate ones) and the counter file
    are created when necessary. If `data_dir` cannot be created or is not a
    directory, an error is printed and the current directory is used instead.

    Examples:
    get_runnum('Z:\\Ramsi_EIT\\data\\')
    get_runnum('/mnt/qol_grp_data/data')

    Parameters
    ----------
    data_dir : string, directory holding the data, in Windows or Linux style.

    Returns
    -------
    int, the new run number: the stored value plus one (1 when there is no
    counter file yet), folded back to 0 once it would exceed 99999.

    Raises
    ------
    IndexError if the counter file exists but is empty,
    ValueError if its first line is not an integer,
    OSError if the 'autofile' directory or the counter file cannot be written.
    """
    data_dir = filename2os_fname(data_dir)
    try:
        # makedirs, unlike mkdir, also creates missing parent directories;
        # exist_ok makes an already existing directory a non-event.
        os.makedirs(data_dir, exist_ok=True)
        data_dir_usable = os.path.isdir(data_dir)
    except OSError:
        # Creation failed, or the path exists but is not a directory.
        data_dir_usable = False
    if not data_dir_usable:
        print(f"ERROR: cannot create directory for data: {data_dir}")
        print("Will use current dir for storage")
        data_dir = "."

    # convert to OS dependent way
    runnumpath = filename2os_fname(os.path.join(data_dir, "autofile"))
    os.makedirs(runnumpath, exist_ok=True)
    runnum_file = filename2os_fname(os.path.join(runnumpath, "runnum.dat"))

    run_number = 0
    if os.path.isfile(runnum_file):
        with open(runnum_file, "r") as f:
            run_number = int(f.readlines()[0])

    # Increment it and fold if needed
    run_number += 1
    # Important: we are using five digit counters to synchronize
    # with qol_get_next_data_file.m
    if run_number > 99999:
        run_number = 0

    with open(runnum_file, "w") as f:
        f.write(f"{run_number}")
    return run_number
def get_next_data_file(
    prefix,
    savepath,
    run_number=None,
    datestr=None,
    date_format="%Y%m%d",
    extension="dat",
):
    """Build a data file name following the standard naming scheme.

    The name is
    os.path.join(savepath, f'{prefix}_{datestr}_{run_number:05d}.{extension}')

    Parameters
    ----------
    prefix : string, leading part of the file name.
    savepath : string, directory part of the returned path.
    run_number : int or None (default).
        When None it is obtained with `get_runnum( savepath )`.
    datestr : string or None (default).
        When None today's date formatted with `date_format` is used.
    date_format : `strftime` format for the date, default is '%Y%m%d'.
    extension : file name extension without the dot, default is 'dat'.

    Returns
    -------
    string, the assembled path.
    """
    if run_number is None:
        run_number = get_runnum(savepath)
    stamp = date.today().strftime(date_format) if datestr is None else datestr
    basename = f"{prefix}_{stamp}_{run_number:05d}.{extension}"
    return os.path.join(savepath, basename)
def infer_compression(fname):
    """Guess the compression algorithm from the last extension of `fname`.

    Returns 'gzip' for '.gz', 'bzip' for '.bz' or '.bz2', and None for
    anything else. The comparison is case sensitive.
    """
    algorithm_by_suffix = {".gz": "gzip", ".bz": "bzip", ".bz2": "bzip"}
    suffix = os.path.splitext(fname)[1]
    # unknown suffixes fall through to None, i.e. no compression
    return algorithm_by_suffix.get(suffix)
def _open_for_append(fname, compressionmethod, compresslevel):
    """Open `fname` for binary append, through gzip or bz2 when requested."""
    if compressionmethod == "gzip":
        import gzip

        return gzip.open(fname, mode="ab", compresslevel=compresslevel)
    if compressionmethod == "bzip":
        import bz2

        # bz2 accepts levels 1..9 only; gzip's 0 is mapped to the lowest bz2 level
        return bz2.open(fname, mode="ab", compresslevel=max(1, compresslevel))
    return open(fname, mode="ab")


def save_table_with_header(
    fname,
    data,
    header="",
    comment_symbol="%",
    skip_headers_if_file_exist=False,
    item_format="e",
    item_separator="\t",
    compressionmethod=None,
    compresslevel=9,
    match_filename_to_compression=True,
):
    r"""Saves output to CSV or TSV file with specially formatted header.

    The file is appended if needed.
    It is possible to compress output file.

    Parameters
    ----------
    fname : string, full path of the saved file.
    data : array type (python or numpy), iterated row by row; every row must
        itself be iterable. None writes no data rows.
    header : list or array of header strings to put at the beginning of the record.
        A single string is split into its lines (so '' means no header),
        None also means no header.
    comment_symbol : prefix for the header lines, default is '%'.
        Note that headers are actually prefixed with <comment_symbol> and <space>.
        Historically it is chosen as '%' to make files compatible with Matlab `load`.
        '#' symbol is also a good choice.
    skip_headers_if_file_exist : True or False (default).
        When True skip addition of headers in already existing file.
        Useful when appending to file with the same headers.
    item_format : output format like in formatted strings, examples are 'e', '.15e', 'f'
    item_separator : how to separate columns, '\t' is default.
        Natural choices are either ',' (comma) or '\t' (tab).
    compressionmethod : compression method
        - None : no compression (default)
        - gzip : gzip method of compression
        - bzip : bzip2 method of compression
        Any other value is written without compression.
    compresslevel : 9 (default) the highest compression, 0 no compression at all,
        as it is defined for gzip in Lib/gzip.py.
        For bzip the level is raised to 1 when a smaller value is given.
    match_filename_to_compression: True (default) or False
        If True changes the filename suffix in accordance with compression method,
        e.g. 'data.dat' -> 'data.dat.gz' if compression is set to 'gzip'
        ('.bz' is appended for 'bzip'; '.dat' is appended when no compression
        is requested but the name looks compressed),
        otherwise assumes that users know what they do.

    Returns
    -------
    string, the name of the file actually written (OS dependent style,
    possibly with an added suffix).
    """
    fname = filename2os_fname(fname)
    if match_filename_to_compression and infer_compression(fname) != compressionmethod:
        if compressionmethod is None:
            fname += ".dat"
        elif compressionmethod == "gzip":
            fname += ".gz"
        elif compressionmethod == "bzip":
            fname += ".bz"

    # Must be checked before opening: append mode creates a missing file.
    file_existed = os.path.exists(fname)
    write_header = not (file_existed and skip_headers_if_file_exist)

    if header is None:
        header = []
    elif isinstance(header, str):
        # Iterating a plain string would emit one header line per character.
        header = header.splitlines()

    item_template = "{:" + item_format + "}"
    with _open_for_append(fname, compressionmethod, compresslevel) as f:
        if write_header:
            for line in header:
                f.write(f"{comment_symbol} {line}\n".encode("utf-8"))
        if data is not None:
            for row in data:
                text = item_separator.join(map(item_template.format, row))
                f.write(f"{text}\n".encode("utf-8"))
    return fname
|