aboutsummaryrefslogtreecommitdiff
path: root/tests/test_tableflow.py
blob: 41f36203cce3103c1db32c4df3c7dc3f5e37f6e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import pytest
import qolab.tableflow as tblfl
import pandas as pd


def test_table_load_noinputs():
    assert tblfl.loadInOutTables() == (None, None)
    assert tblfl.loadInOutTables(
        inputFileName=None, outputFileName="non_existing_file"
    ) == (None, None)


def test_wrong_comment_in_table_file_to_load():
    with pytest.raises(Exception):
        # should raise ParserError
        tblfl.loadInOutTables(
            inputFileName="tests/tableflow_test_data/tableIn1.csv",
            outputFileName=None,
            comment="%",
        )


def test_right_comment_in_table_file_to_load():
    tIn, tOut = tblfl.loadInOutTables(
        inputFileName="tests/tableflow_test_data/tableIn1.csv",
        outputFileName=None,
        comment="#",
    )
    assert type(tIn) == pd.core.frame.DataFrame


def test_table_equality_with_no_output_file_name():
    tIn, tOut = tblfl.loadInOutTables(
        inputFileName="tests/tableflow_test_data/tableIn1.csv",
        outputFileName=None,
        comment="#",
    )
    assert type(tIn) == pd.core.frame.DataFrame
    assert type(tOut) == pd.core.frame.DataFrame
    assert tIn.equals(tOut)
    col0 = tIn.keys()[0]
    vBefore = tIn.at[0, col0]
    tIn.at[0, col0] = vBefore + 1
    assert not tIn.equals(tOut)


def test_table_load_with_in_out_file_names():
    # different filenames, same content for ease of testing
    tIn, tOut = tblfl.loadInOutTables(
        inputFileName="tests/tableflow_test_data/tableIn1.csv",
        outputFileName="tests/tableflow_test_data/tableOut1nonProcessed.csv",
        comment="#",
    )
    assert type(tIn) == pd.core.frame.DataFrame
    assert type(tOut) == pd.core.frame.DataFrame
    assert tIn.equals(tOut)

    # different filenames, different content
    tIn, tOut = tblfl.loadInOutTables(
        inputFileName="tests/tableflow_test_data/tableIn1.csv",
        outputFileName="tests/tableflow_test_data/tableOut1pariallyProcessed.csv",
        comment="#",
    )
    assert type(tIn) == pd.core.frame.DataFrame
    assert type(tOut) == pd.core.frame.DataFrame
    assert not tIn.equals(tOut)
    assert "out1" in tOut.columns
    assert "out1" not in tIn.columns


def test_for_existing_row():
    tbl1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, 4, 6]})
    r = pd.Series({"a": 2, "b": 4})
    assert tblfl.ilocRowOrAdd(tbl1, r) == 1


def test_for_existing_row_with_NA():
    # NA in both table and raw should return a hit
    tbl1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, pd.NA, 6]})
    r = pd.Series({"a": 2, "b": pd.NA})
    assert tblfl.ilocRowOrAdd(tbl1, r) == 1

    # should insert new row
    tbl1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, 4, 6]})
    r = pd.Series({"a": 2, "b": pd.NA})
    assert tblfl.ilocRowOrAdd(tbl1, r) == 3

    # should insert new row
    tbl1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, 4, 6]})
    r = pd.Series({"a": 2, "b": pd.NA})
    assert tblfl.ilocRowOrAdd(tbl1, r) == 3


def test_for_nonexisting_row_and_its_insertion():
    tbl1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, 4, 6]})
    r = pd.Series({"a": 2, "b": 10})
    assert len(tbl1) == 3
    assert tblfl.ilocRowOrAdd(tbl1, r) == 3
    assert len(tbl1) == 4


def test_isRedoNeeded():
    r = pd.Series({"a": 2, "b": 4, "c": pd.NA})
    assert not tblfl.isRedoNeeded(r, ["a", "b"])
    assert tblfl.isRedoNeeded(r, ["c"])
    assert tblfl.isRedoNeeded(r, ["non_existing"])
    assert not tblfl.isRedoNeeded(r, ["b", "c"])


def test_reflowTable():
    tIn, tOut = tblfl.loadInOutTables(
        inputFileName="tests/tableflow_test_data/tableIn1.csv",
        outputFileName="tests/tableflow_test_data/tableOut1pariallyProcessed.csv",
        comment="#",
    )
    # check for warnings
    with pytest.warns(UserWarning):
        tblfl.reflowTable(tIn, tOut)

    with pytest.warns(UserWarning):
        tblfl.reflowTable(tIn, tOut, postProcessedColums=["dummyName"])

    def frow(row):
        return row

    with pytest.warns(UserWarning):
        tblfl.reflowTable(tIn, tOut, process_row_func=frow)

    # now run reflow
    def frow(row, extraInfo=None):
        row["out1"] = row["x"] * row["x"]
        return row

    assert len(tIn) != len(tOut)
    tblfl.reflowTable(
        tIn, tOut, process_row_func=frow, postProcessedColums=["out1", "out2"]
    )
    assert len(tIn) == len(tOut)
    assert (tOut["out1"] == tOut["x"] * tOut["x"]).all()

    # check that reflow is done
    tOut.loc[tOut["x"] == 1, "out1"] = pd.NA
    tblfl.reflowTable(
        tIn, tOut, process_row_func=frow, postProcessedColums=["out1", "out2"]
    )
    assert (tOut["out1"] == tOut["x"] * tOut["x"]).all()

    # check that reflow is not reprocessed
    tOut.loc[tOut["x"] == 1, "out1"] = 12121  # crazy number
    tblfl.reflowTable(
        tIn, tOut, process_row_func=frow, postProcessedColums=["out1", "out2"]
    )
    assert (tOut.loc[tOut["x"] == 1, "out1"] == 12121).all()  # should not change

    # now we are forcing redo
    tOut.loc[tOut["x"] == 1, "out1"] = 12121  # crazy number
    tblfl.reflowTable(
        tIn,
        tOut,
        process_row_func=frow,
        postProcessedColums=["out1", "out2"],
        redo=True,
    )
    assert not (tOut.loc[tOut["x"] == 1, "out1"] == 12121).all()  # must not be the same
    assert (tOut["out1"] == tOut["x"] * tOut["x"]).all()