EIO Differ
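Diffs the eplusout.eio files produced by two EnergyPlus testfiles runs (a "base" and a "mod" directory, one sub-directory per IDF), aggregates every difference into pandas DataFrames, and writes CSV, XLSX, and optional filterable HTML reports. A typical invocation might look like this (the script file name and paths are illustrative):

    python eio_differ.py --base-testfiles-dir ./base/testfiles --mod-testfiles-dir ./mod/testfiles --output-html
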
import argparse
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
import pandas as pd
from tqdm.auto import tqdm
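
# Lines containing any of these substrings are run-specific noise (version stamps,
# run times, view-factor dumps, ...) and are dropped before diffing.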
SKIP_STRINGS = [
    "Program Version,EnergyPlus",
    "Version,",
    "<Version>,",
    "EnergyPlus Completed",
    "EnergyPlus Terminated",
    "DElight input generated",
    "(idf)=",
    "(user input)=",
    "(input file)=",
    "(IDF Directory)=",
    "(Current Working Directory)=",
    '(Current Working Directory)"=',
    "ReadVars Run Time",
    "EnergyPlus Program Version",
    "PythonPlugin: Class",
    "ExpandObjects Finished. Time:",
    "EnergyPlus, Version",
    "EnergyPlus Run Time=",
    "ParametricPreprocessor Finished. Time:",
    "ExpandObjects Finished with Error(s). Time:",
    "Elapsed time: ",
    "End of Data",
    "Surface View Factor - Zone/Enclosure Information,",
    "Approximate or User Input Solar ViewFactors,",
    "Approximate or User Input ViewFactors,",
    "Final Solar ViewFactors,",
    "Final ViewFactors,",
    "Script F Factors,",
    "Script F Factor,",
    "Schedule Details Report=Timestep",
    "Schedule Details Report=Hourly",
    "! <Solar View Factor Information>",
    "! <Surface View Factor and Grey Interchange Information>",
    "AirflowNetwork: Advanced Single-Sided Model",
]

BROKEN_HEADERS = {
    # data key: actual header
    "Program Control:Threads/Parallel Sims": "Program Control Information:Threads/Parallel Sims",
    "Surface View Factor - Surface Information": "View Factor - Surface Information",
    "ScheduleTypeLimits": "ScheduleType",
    "Schedule:Week:Daily": "WeekSchedule",
    "RefrigerationDoorMixing Airflow Stats Nominal": "RefrigerationDoorMixing Airflow Stats Nominal",
    "Walk In": "Refrigeration Walk In Cooler",
    "Walk In Surfaces Facing Zone": "Walk-In Surfaces Facing Zone",
}
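
# Tables whose header line is missing from the .eio output entirely; their columns are
# reconstructed by hand here so the data rows can still be parsed.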
MISSING_HEADERS = {
    "WindowMaterial:Glazing": [
        "Material Name",
        "Optical Data Type",
        "Spectral Data Set Name",
        "Thickness {m}",
        "Solar Transmittance",
        "Front Solar Reflectance",
        "Back Solar Reflectance",
        "Visible Transmittance",
        "Front Visible Reflectance",
        "Back Visible Reflectance",
        "Infrared Transmittance",
        "Front Thermal Emissivity",
        "Back Thermal Emissivity",
        "Conductivity {W/m-K}",
        "Dirt Factor",
        "Solar Diffusing",
    ],
    "Chilled Water Tank Information": [
        "Type",
        "Name",
        "Volume {m3}",
        "Use Side Design Flow Rate {m3/s}",
        "Source Side Design Flow Rate {m3/s}",
    ],
    "WindowMaterial:Gas": ["Material Name", "GasType", "Thickness {m}"],
}

def fixup_airflow_stats(content):
    """Ensure the '! <ZoneInfiltration Airflow Stats Nominal>' header starts on its own line."""
    lookup = "! <ZoneInfiltration Airflow Stats Nominal>"
    i = content.find(lookup)
    if i > 0 and content[i - 1] != "\n":
        content = content[:i] + "\n" + content[i:]
    return content

def split_eio_header_body_lines(eio_path: Path) -> tuple[list[str], list[str]]:
    """Split an .eio file into header lines (starting with '!') and data lines, dropping known noise."""
    content = eio_path.read_text()
    content = fixup_airflow_stats(content=content)
    lines = [line for line in content.splitlines() if line and not any(x in line for x in SKIP_STRINGS)]
    header_lines = []
    body_lines = []
    for line in lines:
        if line.startswith("!"):
            header_lines.append(line.strip())
        else:
            body_lines.append(line.strip())
    return header_lines, body_lines

def get_eio_header_dict(header_lines: list[str]) -> dict[str, list[str]]:
    """Parse '! <Table Name>,Col1,Col2,...' header lines into a table-name -> column-list dict."""
    headers = {}
    for header_line in header_lines:
        split = [x.strip() for x in header_line.split(",")]
        index = split[0].split("<")[1].split(">")[0].strip()
        columns = split[1:]
        if columns and not columns[-1]:
            columns = columns[:-1]
        headers[index] = columns
    return headers
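
# For example, assuming a typical EnergyPlus header line such as:
#   ! <Zone Information>,Zone Name,North Axis {deg},...
# this yields headers["Zone Information"] == ["Zone Name", "North Axis {deg}", ...]
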
def read_eio_tables(eio_path: Path) -> tuple[dict[str, pd.DataFrame], dict[str, pd.DataFrame]]:
    """Parse one .eio file into per-table DataFrames: a numeric version and a raw-string version."""
    header_lines, body_lines = split_eio_header_body_lines(eio_path=eio_path)
    headers = get_eio_header_dict(header_lines=header_lines)
    series = {}
    for body_line in body_lines:
        split = [x.strip() for x in body_line.split(",")]
        name = split[0]
        values = split[1:]
        if name in BROKEN_HEADERS:
            name = BROKEN_HEADERS[name]
        index = None
        if name not in headers:
            # PLAIN MISSING
            if name in MISSING_HEADERS:
                headers[name] = MISSING_HEADERS[name]
            elif name in ["Refrigeration Low-Stage Compressor", "Refrigeration High-Stage Compressor"]:
                index = headers["Refrigeration Compressor"]
            elif name in ["Medium Temperature Refrigeration Case", "Low Temperature Refrigeration Case"]:
                index = headers["Refrigeration Case"]
            elif name in ["Low Temperature Refrigeration Walk In Cooler"]:
                index = headers["Refrigeration Walk In Cooler"]
            elif name in ["High Pressure Refrigeration Compressor", "Low Pressure Refrigeration Compressor"]:
                index = headers["Refrigeration Compressor"]
            elif name == "SHELF":
                values.insert(0, name)
                name = "Shelf Details"
            elif "COUPLEDSLAB" in name or "COUPLEDBASEMENT" in name:
                values.insert(0, name)
                name = "Domain Name"
            elif name in ["FOUNDATION KIVA 1", "<Default Foundation>", "SLAB FOUNDATION", "BASEMENT FOUNDATION"]:
                values.insert(0, name)
                name = "Kiva Foundation Name"
            else:
                msg = f"Not found: '{name}' in '{eio_path}'"
                # print(msg)
                raise ValueError(msg)
        if index is None:
            index = headers[name]
        values = values[: len(index)]
        accept_failure = False
        if name == "CTF" and values[0] == "0":
            values.append(None)
        if name == "Environment:Daylight Saving" and len(values) < len(index):
            values = values + [None] * (len(index) - len(values))
        if name in ["Zone Surfaces", "Shading Surfaces"]:
            index = [x for x in index if "Vertices are shown" not in x]
        if name in ["HeatTransfer Surface", "Shading Surface"]:
            if "{etc}" in index:
                index.remove("{etc}")
            if len(values) < len(index):
                values = values + [None] * (len(index) - len(values))
            accept_failure = True
        if name == "Enclosure/Window Adjacency Daylighting Matrix":
            # 'Adjacent Enclosure Names - 1st 100 (max)' is the last one
            if len(index) < len(values):
                index = index[:2] + [f"Adjacent Enclosure Name_{i}" for i in range(len(values) - 2)]
                accept_failure = True
        if name in ["Kiva Foundation Name", "Frame/Divider Surface", "WindowConstruction:Complex"]:
            accept_failure = True
        if name == "WindowMaterial:Glazing:EquivalentLayer":
            index = [x for x in index if x]
        if name == "Refrigeration Walk In Cooler":
            accept_failure = True
        if name == "Water Cooling Coil Capacity Information":
            accept_failure = True
        try:
            s = pd.Series(data=values, index=index, name=name)
        except Exception:
            if accept_failure:
                # s = pd.Series({k: v for k, v in zip(index, values)}, name=name)
                s = pd.Series({k: (values[i] if i < len(values) else None) for i, k in enumerate(index)}, name=name)
            else:
                # return values, index, name
                raise ValueError(f"Failed for {values=}, {index=}, {name=} for {eio_path=}")
        if name not in series:
            series[name] = []
        series[name].append(s)
    dataframes = {}
    string_dataframes = {}
    for name, s in series.items():
        df = pd.concat(s, axis=1).T  # pd.DataFrame(s)
        df.index.name = "table_name"
        df.reset_index(drop=False, inplace=True)
        df.set_index(df.columns[1], inplace=True)
        df.index.name = "key"
        df.columns.name = "column"
        string_dataframes[name] = df.copy()
        for col in df.columns:
            try:
                df[col] = pd.to_numeric(df[col])
            except (ValueError, TypeError):
                pass
        dataframes[name] = df
    return dataframes, string_dataframes
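
# Quick sanity check on a single file (the path and table name are illustrative):
#   dfs, dfs_str = read_eio_tables(eio_path=Path("base/1ZoneUncontrolled/eplusout.eio"))
#   print(dfs["Zone Information"])
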
def diff_eio(
    idf_file: str, base_testfiles_dir: Path, mod_testfiles_dir: Path
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    base_eio = base_testfiles_dir / idf_file / "eplusout.eio"
    mod_eio = mod_testfiles_dir / idf_file / "eplusout.eio"
    if not base_eio.is_file() and not mod_eio.is_file():
        print(f"No EIO for {idf_file}")
        return (pd.DataFrame(), pd.DataFrame(), pd.DataFrame())
    assert base_eio.is_file()
    assert mod_eio.is_file()
    dfs_base, dfs_str_base = read_eio_tables(eio_path=base_eio)
    dfs_mod, dfs_str_mod = read_eio_tables(eio_path=mod_eio)
    assert dfs_base.keys() == dfs_mod.keys()
    diffs = {}
    str_diffs = {}
    has_diff = False
    for key, df_base in dfs_base.items():
        df_mod = dfs_mod[key]
        df_str_base = dfs_str_base[key]
        df_str_mod = dfs_str_mod[key]
        if not df_str_base.equals(df_str_mod):
            has_diff = True
            df_diff = df_base.compare(df_mod, keep_equal=False, result_names=("base", "mod")).stack(
                0, future_stack=True
            )
            df_str_diff = (
                df_str_base.fillna("None")
                .compare(df_str_mod.fillna("None"), keep_equal=False, result_names=("base", "mod"))
                .stack(0, future_stack=True)
            )
            diffs[key] = df_diff
            str_diffs[key] = df_str_diff.dropna(how="all", axis=0)
    if not has_diff:
        return (pd.DataFrame(), pd.DataFrame(), pd.DataFrame())
    df_diff = pd.concat(diffs, names=["table_name", "key", "column"])
    df_str_diff = pd.concat(str_diffs, names=["table_name", "key", "column"])
    df_diff = df_diff.swaplevel("key", "column", axis=0)
    df_str_diff = df_str_diff.swaplevel("key", "column", axis=0)
    df_diff_as_str = df_str_diff.loc[df_diff.index]
    return (
        df_diff,
        df_str_diff,
        df_diff_as_str,
    )
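
# diff_eio returns three frames: the numeric comparison, the raw-string comparison, and the
# string view re-indexed to the numeric diffs (handy for spotting formatting-only changes).
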
def run_parallel_diff(
    idf_files: list[str], base_testfiles_dir: Path, mod_testfiles_dir: Path
) -> tuple[dict[str, pd.DataFrame], dict[str, pd.DataFrame], dict[str, pd.DataFrame], list[str]]:
    dfs_diff = {}
    dfs_str_diff = {}
    dfs_diff_as_str = {}
    failed = []
    max_workers = multiprocessing.cpu_count()
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        future_to_idf = {
            executor.submit(diff_eio, idf_file, base_testfiles_dir, mod_testfiles_dir): idf_file
            for idf_file in idf_files
        }
        with tqdm(total=len(future_to_idf)) as pbar:
            for future in as_completed(future_to_idf):
                idf_file = future_to_idf[future]
                try:
                    df_diff, df_str_diff, df_diff_as_str = future.result()
                    if not df_diff.empty:
                        dfs_diff[idf_file] = df_diff
                        dfs_str_diff[idf_file] = df_str_diff
                        dfs_diff_as_str[idf_file] = df_diff_as_str
                except Exception as e:
                    print(f"Failed: {idf_file}")
                    print(e)
                    failed.append(idf_file)
                pbar.update(1)
    return dfs_diff, dfs_str_diff, dfs_diff_as_str, failed
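
# A minimal driver sketch, assuming base/ and mod/ each contain one directory per IDF:
#   diffs, str_diffs, diffs_as_str, failed = run_parallel_diff(
#       idf_files=["1ZoneUncontrolled"], base_testfiles_dir=Path("base"), mod_testfiles_dir=Path("mod")
#   )
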
def generate_filter_html_page(
    df: pd.DataFrame, html_path: Path, filter_cols: list[str] | None = None, verbose: bool = False
):
    # Generate a basic styled HTML table
    if filter_cols is None:
        filter_cols = ["table_name", "equal", "exponential"]
    assert isinstance(filter_cols, list)
    df_reset = df.reset_index(drop=False)
    html_table = df_reset.to_html(classes="display nowrap", index=False, table_id="myTable")
    targets = [i for i, c in enumerate(df_reset.columns) if c in filter_cols]
    script_filter = f"""
    $(document).ready(function() {{
        $('#myTable').DataTable({{
            paging: false,  // disables pagination
            // scrollY: false,  // optional: avoid scroll area
            // scrollCollapse: false,
            dom: 'Plfrtip',  // Enables SearchPanes (P) and the standard layout
            searchPanes: {{
                cascadePanes: true,
                viewTotal: true
            }},
            columnDefs: [
                {{
                    searchPanes: {{
                        show: true
                    }},
                    targets: {targets}
                }},
                {{
                    searchPanes: {{
                        show: false
                    }},
                    targets: '_all'
                }}
            ]
        }});
    }});
    """
    # Embed in a full HTML document with DataTables JS/CSS
    html_full = f"""<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>DataFrame Viewer</title>
    <!-- DataTables CSS -->
    <link rel="stylesheet" type="text/css"
          href="https://cdn.datatables.net/1.13.6/css/jquery.dataTables.min.css"/>
    <link rel="stylesheet" type="text/css"
          href="https://cdn.datatables.net/searchpanes/2.2.0/css/searchPanes.dataTables.min.css"/>
    <!-- jQuery -->
    <script src="https://code.jquery.com/jquery-3.7.1.js"></script>
    <!-- DataTables JS -->
    <script src="https://cdn.datatables.net/1.13.6/js/jquery.dataTables.min.js"></script>
    <script src="https://cdn.datatables.net/searchpanes/2.2.0/js/dataTables.searchPanes.min.js"></script>
    <script src="https://cdn.datatables.net/select/1.7.0/js/dataTables.select.min.js"></script>
    <style>
        body {{ font-family: Calibri, sans-serif; padding: 2em; }}
        table.dataTable thead th {{ background-color: #f2f2f2; }}
        div.dt-searchPanes {{ margin-bottom: 20px; }}
    </style>
</head>
<body>
    <h1>DataFrame Viewer</h1>
    {html_table}
    <script>
    {script_filter}
    </script>
</body>
</html>
"""
    # Write to file
    html_path.write_text(html_full)
    if verbose:
        print(f"Wrote '{html_path}'")
def full_eio_diff_aggregation(
    idf_files: list[str], base_testfiles_dir: Path, mod_testfiles_dir: Path, verbose: bool = False
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, list[str]]:
    dfs_diff, dfs_str_diff, dfs_diff_as_str, failed = run_parallel_diff(
        idf_files=idf_files, base_testfiles_dir=base_testfiles_dir, mod_testfiles_dir=mod_testfiles_dir
    )
    if failed and verbose:
        print(f"The following failed: {failed}")
    if not dfs_diff:
        print("No differences found.")
        return pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), failed
    df_diff = pd.concat(dfs_diff)
    df_str_diff = pd.concat(dfs_str_diff)
    df_diff_as_str = pd.concat(dfs_diff_as_str)
    df_diff.index.names = ["idf_file"] + df_diff.index.names[1:]
    df_str_diff.index.names = ["idf_file"] + df_str_diff.index.names[1:]
    df_diff_as_str.index.names = ["idf_file"] + df_diff_as_str.index.names[1:]
    df_diff = df_diff.sort_values(by="base")
    df_str_diff = df_str_diff.sort_values(by="base", key=lambda x: x.astype(float).abs())
    df_diff_as_str = df_diff_as_str.sort_values(by="base", key=lambda x: x.astype(float).abs())
    if verbose:
        print(f"df_diff.shape={df_diff.shape}")
    df_diff.drop_duplicates(inplace=True)
    if verbose:
        print(f"df_diff.shape={df_diff.shape}")
        print(f"df_str_diff.shape={df_str_diff.shape}")
    df_str_diff.drop_duplicates(inplace=True)
    if verbose:
        print(f"df_str_diff.shape={df_str_diff.shape}")
        print(f"df_diff_as_str.shape={df_diff_as_str.shape}")
    df_diff_as_str = df_diff_as_str.loc[df_diff.index]
    if verbose:
        print(f"df_diff_as_str.shape={df_diff_as_str.shape}")
    df_diff_as_str.drop_duplicates(inplace=True)
    if verbose:
        print(f"df_diff_as_str.shape={df_diff_as_str.shape}")
    df_diff_as_str["base_value"] = df_diff_as_str["base"].astype(float)
    df_diff_as_str["mod_value"] = df_diff_as_str["mod"].astype(float)
    df_diff_as_str["equal"] = df_diff_as_str["base_value"] == df_diff_as_str["mod_value"]
    df_str_diff["base_value"] = df_str_diff["base"].astype(float)
    df_str_diff["mod_value"] = df_str_diff["mod"].astype(float)
    df_str_diff["equal"] = df_str_diff["base_value"] == df_str_diff["mod_value"]
    df_diff["equal"] = df_diff["base"] == df_diff["mod"]
    return df_diff, df_str_diff, df_diff_as_str, failed
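
# In the returned string frames, "equal" is True when base and mod parse to the same float,
# i.e. the underlying value is unchanged and only its text representation differs.
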
def dump_dataframes_to_excel_with_filter(
    sheet_name_to_df: dict[str, pd.DataFrame], xlsx_path: Path, verbose: bool = False
):
    # Save to Excel with an autofilter (via an Excel Table) on each sheet
    with pd.ExcelWriter(xlsx_path, engine="xlsxwriter") as writer:
        for sheet_name, df in sheet_name_to_df.items():
            df_reset = df.reset_index()
            df_reset.to_excel(writer, sheet_name=sheet_name, index=False)
            # Access the XlsxWriter workbook and worksheet objects.
            workbook = writer.book
            worksheet = writer.sheets[sheet_name]
            num_index_cols = df.index.nlevels
            max_row, max_col = df_reset.shape
            header = df_reset.columns.tolist()
            # Add an Excel Table starting at A1 (row=0, col=0)
            worksheet.add_table(
                0,
                0,
                max_row,
                max_col - 1,  # + num_index_cols,
                {
                    "columns": [{"header": col} for col in header],
                    "name": sheet_name.replace(" ", "_"),
                    "style": "Table Style Medium 9",
                },
            )
            # Freeze panes below the header row and after the index columns
            worksheet.freeze_panes(1, num_index_cols)
            # Former index columns are rendered bold
            bold_centered_format = workbook.add_format(
                {
                    "bold": True,
                    "align": "center",
                    "valign": "vcenter",
                }
            )
            # All other cells are centered
            centered_format = workbook.add_format(
                {
                    "align": "center",
                    "valign": "vcenter",
                }
            )
            # Adjust column widths to fit the content
            for i, col in enumerate(header):
                col_values = df_reset.iloc[:, i].astype(str)
                max_len = max([len(str(col))] + col_values.map(len).tolist())
                cell_format = bold_centered_format if i < num_index_cols else centered_format
                worksheet.set_column(i, i, min(max_len + 2, 80), cell_format)  # Cap at 80 chars
            # Apply autofilter to the header row.
            # worksheet.autofilter(0, 0, max_row, max_col - 1 + num_index_cols)
    if verbose:
        print(f"Wrote '{xlsx_path}'")
def valid_dir(path_str: str) -> Path:
    path = Path(path_str)
    if not path.is_dir():
        raise argparse.ArgumentTypeError(f"'{path}' is not a valid directory")
    return path

def parse_args():
    parser = argparse.ArgumentParser(description="Diff .eio files between testfiles directories.")
    parser.add_argument("--base-testfiles-dir", type=valid_dir, required=True, help="Path to base testfiles directory")
    parser.add_argument(
        "--mod-testfiles-dir", type=valid_dir, required=True, help="Path to modified testfiles directory"
    )
    parser.add_argument("--verbose", action="store_true", help="Enable verbose output")
    parser.add_argument("--single-file", type=str, metavar="IDF_NAME", help="Only run diff on a single IDF file")
    parser.add_argument("--output-html", action="store_true", help="Generate HTML pages in the output directory")
    parser.add_argument(
        "--out-dir", type=Path, default=Path.cwd(), help="Output directory (default: current working directory)"
    )
    return parser.parse_args()

if __name__ == "__main__":
    args = parse_args()
    idf_files = []
    if args.single_file is None:
        idf_files = [x.name for x in args.mod_testfiles_dir.iterdir() if x.is_dir()]
    else:
        if not (p := args.mod_testfiles_dir / args.single_file).is_dir():
            raise IOError(f"Could not find '{p}'")
        if not (p := args.base_testfiles_dir / args.single_file).is_dir():
            raise IOError(f"Could not find '{p}'")
        idf_files = [args.single_file]
    out_dir = args.out_dir
    out_dir.mkdir(parents=True, exist_ok=True)
    df_diff, df_str_diff, df_diff_as_str, failed = full_eio_diff_aggregation(
        idf_files=idf_files,
        base_testfiles_dir=args.base_testfiles_dir,
        mod_testfiles_dir=args.mod_testfiles_dir,
        verbose=args.verbose,
    )
    if df_diff.empty:
        print("No differences found.")
        exit(0)
    # Flag values written in exponent notation (e.g. 1.5E-002) to help spot formatting-only changes
    df_str_diff["exponential"] = df_str_diff["base"].str.contains("E")
    df_diff_as_str["exponential"] = df_diff_as_str["base"].str.contains("E")
    csv_path = (out_dir / "df_str_diff.csv").resolve()
    df_str_diff.to_csv(csv_path)
    print(f"Wrote '{csv_path}'")
    xlsx_path = (out_dir / "all_diffs.xlsx").resolve()
    dump_dataframes_to_excel_with_filter(
        sheet_name_to_df={
            "df_diff": df_diff,
            "df_str_diff": df_str_diff,
            "df_diff_as_str": df_diff_as_str,
        },
        xlsx_path=xlsx_path,
        verbose=True,
    )
    print(df_str_diff[~df_str_diff["equal"]].reset_index().to_markdown(index=False))
    if args.output_html:
        generate_filter_html_page(df=df_diff, html_path=(out_dir / "df_diff.html").resolve(), verbose=args.verbose)
        generate_filter_html_page(
            df=df_str_diff, html_path=(out_dir / "df_str_diff.html").resolve(), verbose=args.verbose
        )
        generate_filter_html_page(
            df=df_diff_as_str, html_path=(out_dir / "df_diff_as_str.html").resolve(), verbose=args.verbose
        )