Created
July 23, 2022 22:02
-
-
Save rwest/b44e7f171fd727d715bdc67b747339a2 to your computer and use it in GitHub Desktop.
Cantera flux methods
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A bunch of methods that I have used at some point to make, manipulate, add, | |
process, integrate, and plot flux diagrams in Cantera. Taken out of context, | |
and probably not working. But parts of them could be helpful or show one | |
way of doing things, like getting a DIY integrated-total-flux. | |
Richard West <[email protected]> | |
CC-BY attribution suggested :-) | |
""" | |
def get_current_fluxes(self): | |
""" | |
Get all the current fluxes. | |
Returns a dict like: | |
`fluxes['gas']['C'] = ct.ReactionPathDiagram(self.gas, 'C')` etc. | |
""" | |
fluxes = {"gas": dict(), "surf": dict()} | |
for element in "HOCN": | |
fluxes["gas"][element] = ct.ReactionPathDiagram(self.gas, element) | |
fluxes["surf"][element] = ct.ReactionPathDiagram(self.surf, element) | |
element = "X" # just the surface | |
fluxes["surf"][element] = ct.ReactionPathDiagram(self.surf, element) | |
return fluxes | |
def add_fluxes(self): | |
""" | |
Add the current fluxes to the stored totals. | |
""" | |
gas_fluxes = self.total_fluxes["gas"] | |
surf_fluxes = self.total_fluxes["surf"] | |
for element in "HOCN": | |
try: | |
gas_fluxes[element].add(ct.ReactionPathDiagram(self.gas, element)) | |
except KeyError: | |
gas_fluxes[element] = ct.ReactionPathDiagram(self.gas, element) | |
try: | |
surf_fluxes[element].add(ct.ReactionPathDiagram(self.surf, element)) | |
except KeyError: | |
surf_fluxes[element] = ct.ReactionPathDiagram(self.surf, element) | |
# Now do the 'X' for just the surface | |
element = "X" | |
try: | |
surf_fluxes[element].add(ct.ReactionPathDiagram(self.surf, element)) | |
except KeyError: | |
surf_fluxes[element] = ct.ReactionPathDiagram(self.surf, element) | |
def save_flux_data(self, filename_stem=None): | |
""" | |
Save the current and integrated fluxes. | |
Also returns them. | |
""" | |
fluxes = { | |
"current": self.get_current_fluxes(), | |
"integrated": self.total_fluxes, | |
} | |
flux_strings = dict() | |
for flux_type, d1 in fluxes.items(): | |
flux_strings[flux_type] = dict() | |
for phase, d2 in d1.items(): | |
flux_strings[flux_type][phase] = dict() | |
for element, flux in d2.items(): | |
flux_strings[flux_type][phase][element] = flux.get_data() | |
flux_strings[flux_type]["combined"] = dict() | |
flux_strings[flux_type]["combined"]["mass"] = self.combine_fluxes(d1) | |
if filename_stem: | |
path = os.path.join(self.results_directory, filename_stem + ".json") | |
with open(path, "w") as f: | |
json.dump(flux_strings, f) | |
return flux_strings | |
def combine_fluxes(self, fluxes_dict): | |
""" | |
Combined a dict of dicts of flux diagrams into one. | |
Fluxes should be a dict with entries like | |
fluxes['gas']['C'] = ct.ReactionPathDiagram(self.gas, 'C') | |
Returns the flux diagram a string in the format you'd get from | |
ct.ReactionPathdiagram.get_data() | |
""" | |
# getting the entire net rates of the system | |
temp_flux_data = dict() | |
species = set() | |
for element in "HOCN": | |
for phase in ("gas", "surf"): | |
data = fluxes_dict[phase][element].get_data().strip().splitlines() | |
if not data: | |
# eg. if there's no gas-phase reactions involving C | |
continue | |
species.update(data[0].split()) # First line is a list of species | |
for line in data[1:]: # skip the first line | |
s1, s2, fwd, rev = line.split() | |
these_fluxes = np.array([float(fwd), float(rev)]) | |
if all(these_fluxes == 0): | |
continue | |
# multiply by atomic mass of the element | |
these_fluxes *= MOLECULAR_WEIGHTS[element] | |
# for surface reactions, multiply by the catalyst area per volume in a reactor | |
if phase == "surf": | |
these_fluxes *= self.cat_area_per_gas_volume | |
try: | |
# Try adding in this direction | |
temp_flux_data[(s1, s2)] += these_fluxes | |
except KeyError: | |
try: | |
# Try adding in reverse direction | |
temp_flux_data[(s2, s1)] -= these_fluxes | |
except KeyError: | |
# Neither direction there yet, so create in this direction | |
temp_flux_data[(s1, s2)] = these_fluxes | |
output = " ".join(species) + "\n" | |
output += "\n".join( | |
f"{s1} {s2} {fwd} {rev}" for (s1, s2), (fwd, rev) in temp_flux_data.items() | |
) | |
return output | |
def write_flux_dot(flux_data_string, out_file_path, threshold=0.01, title=""): | |
""" | |
Takes a flux data string fromatted as from ct.ReactionPathdiagram.get_data() | |
(or from combine_fluxes) and makes a graphviz .dot file. | |
Fluxes below 'threshold' are not plotted. | |
""" | |
output = ["digraph reaction_paths {", "center=1;"] | |
flux_data = {} | |
species_dict = {} | |
flux_data_lines = flux_data_string.splitlines() | |
species = flux_data_lines[0].split() | |
for line in flux_data_lines[1:]: | |
s1, s2, fwd, rev = line.split() | |
net = float(fwd) + float(rev) | |
if net < 0.0: # if net is negative, switch s1 and s2 so it is positive | |
flux_data[(s2, s1)] = -1 * net | |
else: | |
flux_data[(s1, s2)] = net | |
# renaming species to dot compatible names | |
if s1 not in species_dict: | |
species_dict[s1] = "s" + str(len(species_dict) + 1) | |
if s2 not in species_dict: | |
species_dict[s2] = "s" + str(len(species_dict) + 1) | |
# getting the arrow widths | |
largest_rate = max(flux_data.values()) | |
added_species = {} # dictionary of species that show up on the diagram | |
for (s1, s2), net in flux_data.items(): # writing the node connections | |
flux_ratio = net / largest_rate | |
if abs(flux_ratio) < threshold: | |
continue # don't include the paths that are below the threshold | |
pen_width = ( | |
1.0 - 4.0 * np.log10(flux_ratio / threshold) / np.log10(threshold) + 1.0 | |
) | |
# pen_width = ((net - smallest_rate) / (largest_rate - smallest_rate)) * 4 + 2 | |
arrow_size = min(6.0, 0.5 * pen_width) | |
output.append( | |
f'{species_dict[s1]} -> {species_dict[s2]} [fontname="Helvetica", penwidth={pen_width:.2f}, arrowsize={arrow_size:.2f}, color="0.7, {flux_ratio+0.5:.3f}, 0.9", label="{flux_ratio:0.3g}"];' | |
) | |
added_species[s1] = species_dict[s1] | |
added_species[s2] = species_dict[s2] | |
for ( | |
species, | |
s_index, | |
) in added_species.items(): # writing the species translations | |
output.append(f'{s_index} [ fontname="Helvetica", label="{species}"];') | |
title_string = (r"\l " + title) if title else "" | |
output.append(f' label = "Scale = {largest_rate}{title_string}";') | |
output.append(' fontname = "Helvetica";') | |
output.append("}\n") | |
directory = os.path.split(out_file_path)[0] | |
if directory: | |
os.makedirs(directory, exist_ok=True) | |
with open(out_file_path, "w") as out_file: | |
out_file.write("\n".join(output)) | |
return "\n".join(output) | |
def prettydot(dotfilepath, strip_line_labels=False): | |
""" | |
Make a prettier version of the dot file (flux diagram) | |
Assumes the species pictures are stored in a directory | |
called 'species_pictures' alongside the dot file. | |
""" | |
import os, sys, re | |
import subprocess | |
pictures_directory = os.path.join(os.path.split(dotfilepath)[0], "species_pictures") | |
if strip_line_labels: | |
print("stripping edge (line) labels") | |
# replace this: | |
# s10 [ fontname="Helvetica", label="C11H23J"]; | |
# with this: | |
# s10 [ shapefile="mols/C11H23J.png" label="" width="1" height="1" imagescale=true fixedsize=true color="white" ]; | |
reSize = re.compile('size="5,6"\;page="5,6"') | |
reNode = re.compile( | |
'(?P<node>s\d+)\ \[\ fontname="Helvetica",\ label="(?P<label>[^"]*)"\]\;' | |
) | |
rePicture = re.compile("(?P<smiles>.+?)\((?P<id>\d+)\)\.png") | |
reLabel = re.compile("(?P<name>.+?)\((?P<id>\d+)\)$") | |
species_pictures = dict() | |
for picturefile in os.listdir(pictures_directory): | |
match = rePicture.match(picturefile) | |
if match: | |
species_pictures[match.group("id")] = picturefile | |
else: | |
pass | |
# print(picturefile, "didn't look like a picture") | |
filepath = dotfilepath | |
if not open(filepath).readline().startswith("digraph"): | |
raise ValueError("{0} - not a digraph".format(filepath)) | |
infile = open(filepath) | |
prettypath = filepath + "-pretty" | |
outfile = open(prettypath, "w") | |
for line in infile: | |
(line, changed_size) = reSize.subn('size="12,12";page="12,12"', line) | |
match = reNode.search(line) | |
if match: | |
label = match.group("label") | |
idmatch = reLabel.match(label) | |
if idmatch: | |
idnumber = idmatch.group("id") | |
if idnumber in species_pictures: | |
line = ( | |
f'%s [ image="{pictures_directory}/%s" label="" width="0.5" height="0.5" imagescale=false fixedsize=false color="none" ];\n' | |
% (match.group("node"), species_pictures[idnumber]) | |
) | |
# rankdir="LR" to make graph go left>right instead of top>bottom | |
if strip_line_labels: | |
line = re.sub('label\s*=\s*"\s*[\d.]+"', 'label=""', line) | |
# change colours | |
line = re.sub('color="0.7,\ (.*?),\ 0.9"', r'color="1.0, \1, 0.7*\1"', line) | |
outfile.write(line) | |
outfile.close() | |
infile.close() | |
# print(f"Graph saved to: {prettypath}") | |
print(f"Graph saved to: {pictures_directory}") | |
return prettypath |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment