Skip to content

Instantly share code, notes, and snippets.

@barronh
Created May 23, 2025 20:10
Show Gist options
  • Save barronh/e2892b80c9dedbf4f184edafff6b1410 to your computer and use it in GitHub Desktop.
Save barronh/e2892b80c9dedbf4f184edafff6b1410 to your computer and use it in GitHub Desktop.
Convert a CSV File into a CMAQ-ready point source files.
# Public API: grid-definition helper, the main CSV-to-IOAPI converter, and
# a CSV template generator for users to fill out.
__all__ = ['open_griddesc', 'csv2inln', 'templatecsv']
# Module docstring assigned explicitly so it can follow __all__.
__doc__ = """
csv2inln
========
This module is meant to help make CMAQ-ready point source emission files from
a CSV file. By default, the emissions are time-independent for a day. However,
a tfactor_utc variable can be supplied to add a diurnal scaling factor.
Prerequisites
-------------
python >= 3.9
numpy
pandas
netcdf4
xarray
pyproj
Install with the command:
```bash
pip install numpy pandas netcdf4 xarray pyproj
```
Example
-------
Create a CMAQ-ready emission file with one point that emits
```python
from csv2inln import csv2inln
import pandas as pd
# Generally, you would make the CSV separately.
csvpath = 'path/to/test.csv'
# Here, I am loading it inline
csvpath = pd.DataFrame.from_records([
{
'LATITUDE(degrees)': 28.2, 'LONGITUDE(degrees)': 83.6,
'STKDM(m)': 1., 'STKHT(m)': 37., 'STKTK(degrees K)': 335.15, 'STKVE(m/s)': 2.2,
'PEC(g/s)': 0.09,'CO(moles/s)': 0.02
},
{
'LATITUDE(degrees)': 30.2, 'LONGITUDE(degrees)': 85.6,
'STKDM(m)': 1., 'STKHT(m)': 37., 'STKTK(degrees K)': 335.15, 'STKVE(m/s)': 2.2,
'PEC(g/s)': 0.09,'CO(moles/s)': 0.02
},
])
# Define the output paths
stkpath = 'stkprops.nc'
inlnpath = 'eminln.nc'
# Define the GRID (see GRIDDESC)
gdattrs = {
"P_ALP": 1.0, "P_BET": 45.0, "P_GAM": -98.0, "XCENT": -98.0, "YCENT": 90.0,
"GDTYP": 6, "GDNAM": "36ASIA1", "NCOLS": 187, "NROWS": 187,
"XORIG": -4302000.0, "YORIG": 3366000.0, "XCELL": 36000.0, "YCELL": 36000.0,
"SDATE": 2025001, "STIME": 0, "TSTEP": 10000, # used to define emission times
}
# Process the data
csv2inln(csvpath, stkpath, inlnpath, gdattrs)
```
"""
import os
import numpy as np
import pyproj
import netCDF4
import pandas as pd
import xarray as xr
def open_griddesc(attrs, earth_radius=6370000.):
    """
    Create a proj4 formatted grid definition using IOAPI attrs and earth_radius

    Arguments
    ---------
    attrs : dict-like
        Mappable of IOAPI grid properties (GDTYP, P_ALP, P_BET, XCENT, YCENT,
        XORIG, YORIG, XCELL, YCELL, ...) that supports the items method
    earth_radius : float
        Assumed radius of the earth in meters. 6370000 is the WRF default.
        Ignored if attrs already supplies earth_radius.

    Returns
    -------
    ds : xarray.Dataset
        Dataset carrying the input attrs plus a crs_proj4 attribute. Because
        x_0/y_0 remove the grid origin and to_meter=XCELL rescales, the
        projection maps lon/lat to fractional column/row coordinates.

    Raises
    ------
    ValueError
        If GDTYP is not 1 (lon/lat), 2 (Lambert conic conformal),
        6 (polar stereographic), or 7 (Mercator).
    """
    props = {k: v for k, v in attrs.items()}
    # Negated origin as false easting/northing places the grid's lower-left
    # corner at projected (0, 0); with to_meter=XCELL, outputs are in cells.
    props['x_0'] = -props['XORIG']
    props['y_0'] = -props['YORIG']
    props.setdefault('earth_radius', earth_radius)
    if props['GDTYP'] == 1:
        projstr = '+proj=lonlat +R={earth_radius}'.format(**props)
    elif props['GDTYP'] == 2:
        projstr = (
            '+proj=lcc +lat_1={P_ALP} +lat_2={P_BET} +lat_0={YCENT}'
            ' +lon_0={XCENT} +R={earth_radius} +x_0={x_0} +y_0={y_0}'
            ' +to_meter={XCELL} +no_defs'
        ).format(**props)
    elif props['GDTYP'] == 6:
        # IOAPI polar stereographic uses P_ALP = +1/-1 for north/south pole,
        # so the projection latitude of origin is +90 or -90.
        projstr = (
            '+proj=stere +lat_0={lat_0} +lat_ts={P_BET} +lon_0={XCENT}'
            ' +x_0={x_0} +y_0={y_0} +R={earth_radius} +to_meter={XCELL}'
            ' +no_defs'
        ).format(lat_0=props['P_ALP'] * 90, **props)
    elif props['GDTYP'] == 7:
        projstr = (
            '+proj=merc +R={earth_radius} +lat_ts=0 +lon_0={XCENT}'
            ' +x_0={x_0} +y_0={y_0} +to_meter={XCELL}'
            ' +no_defs'
        ).format(**props)
    else:
        # BUGFIX: message previously said "GDTYPE", which is not the IOAPI
        # attribute name and could confuse users checking their GRIDDESC.
        raise ValueError('GDTYP {GDTYP} not implemented'.format(**props))
    ds = xr.Dataset(attrs=attrs)
    ds.attrs['crs_proj4'] = projstr
    return ds
def template(nstk, path, keyprops, attrs):
    """
    Create a skeleton IOAPI-style point-source netCDF file.

    Arguments
    ---------
    nstk : int
        Number of stacks in the file (becomes the ROW dimension)
    path : str
        Where to save the file
    keyprops : list
        Per-variable property dicts; dtype is popped, long_name names the
        variable, and remaining keys become variable attributes, e.g.
        [
            {'dtype': 'f', 'long_name': 'ISTACK          ', 'units': 'none            ', 'var_desc': 'Stack groups...'},
            ...
            {'dtype': 'i', 'long_name': 'LPING           ', 'units': 'none            ', 'var_desc': '1=PING SOURCE in domain, 0=otherwise'}
        ]
    attrs : dict
        File level properties to override defaults (e.g. SDATE, STIME,
        TSTEP, grid properties). NCOLS/NROWS are always forced to 1/nstk.

    Returns
    -------
    f : netCDF4.Dataset
        Open dataset (NETCDF3_CLASSIC) with dimensions, global attributes,
        TFLAG, and one variable per keyprops entry, ready for values.
    """
    # IOAPI VAR-LIST is a concatenation of 16-character variable names
    varlist = ''.join([p['long_name'].ljust(16)[:16] for p in keyprops])
    nvar = len(keyprops)
    # Open file
    f = netCDF4.Dataset(path, mode='w', format='NETCDF3_CLASSIC')
    # Define dimensions; point files use ROW=nstk, COL=1
    f.createDimension('TSTEP', None)
    f.createDimension('DATE-TIME', 2)
    f.createDimension('LAY', 1)
    f.createDimension('VAR', nvar)
    f.createDimension('ROW', nstk)
    f.createDimension('COL', 1)
    # Add dummy properties dimensions
    f.setncattr('IOAPI_VERSION', 'ioapi-3.2: $Id: init3.F90 247 2023-03-22 15:59:19Z coats $'.ljust(80))
    f.setncattr('EXEC_ID', '????????????????'.ljust(80))
    f.setncattr('FTYPE', 1)
    f.setncattr('CDATE', 2024205)
    f.setncattr('CTIME', 160925)
    f.setncattr('WDATE', 2024205)
    f.setncattr('WTIME', 160925)
    f.setncattr('SDATE', 2022001)
    f.setncattr('STIME', 0)
    f.setncattr('TSTEP', 10000)
    f.setncattr('NTHIK', 1)
    f.setncattr('NCOLS', 1)
    f.setncattr('NROWS', nstk)
    f.setncattr('NLAYS', 1)
    f.setncattr('NVARS', nvar)
    f.setncattr('VGTYP', -9999)
    f.setncattr('VGTOP', -9e+36)
    f.setncattr('VGLVLS', np.array([0., 0.], dtype='f'))
    f.setncattr('UPNAM', 'CSV2INLN        ')
    f.setncattr('VAR-LIST', varlist)
    f.setncattr('FILEDESC', 'Point source stack groups file'.ljust(80 * 60)[:80 * 60])
    f.setncattr('HISTORY', ''.ljust(80 * 60)[:80 * 60])
    # overwrite defaults
    for pk, pv in attrs.items():
        f.setncattr(pk, pv)
    # Re-force point-file shape in case attrs carried grid NCOLS/NROWS
    f.setncattr('NCOLS', 1)
    f.setncattr('NROWS', nstk)
    # Define variables
    TFLAG = f.createVariable('TFLAG', 'i', ('TSTEP', 'VAR', 'DATE-TIME'))
    TFLAG.setncattr('units', '<YYYYDDD,HHMMSS>')
    TFLAG.setncattr('long_name', 'TFLAG           ')
    TFLAG.setncattr('var_desc', 'Timestep-valid flags:  (1) YYYYDDD or (2) HHMMSS'.ljust(80))
    for keyprop in keyprops:
        # Shallow copy so pop('dtype') does not mutate the caller's dicts
        keyprop = {k: v for k, v in keyprop.items()}
        key = keyprop['long_name'].strip()
        dtype = keyprop.pop('dtype')
        v = f.createVariable(key, dtype, ('TSTEP', 'LAY', 'ROW', 'COL'))
        for pk, pv in keyprop.items():
            v.setncattr(pk, pv)
    # BUGFIX: read SDATE/STIME back from the file (defaults possibly
    # overridden by attrs) rather than indexing attrs directly, which
    # raised KeyError when the caller omitted either key.
    f['TFLAG'][0, :, 0] = f.getncattr('SDATE')
    f['TFLAG'][0, :, 1] = f.getncattr('STIME')
    return f
# known variables requried by CMAQ stack_props file
_stkprops = [
{'dtype': 'i', 'long_name': 'ISTACK ', 'units': 'none ', 'var_desc': 'Stack group number'.ljust(80)},
{'dtype': 'f', 'long_name': 'LATITUDE ', 'units': 'degrees ', 'var_desc': 'Latitude'.ljust(80)},
{'dtype': 'f', 'long_name': 'LONGITUDE ', 'units': 'degrees ', 'var_desc': 'Longitude'.ljust(80)},
{'dtype': 'f', 'long_name': 'STKDM ', 'units': 'm ', 'var_desc': 'Inside stack diameter'.ljust(80)},
{'dtype': 'f', 'long_name': 'STKHT ', 'units': 'm ', 'var_desc': 'Stack height above ground surface'.ljust(80)},
{'dtype': 'f', 'long_name': 'STKTK ', 'units': 'degrees K ', 'var_desc': 'Stack exit temperature'.ljust(80)},
{'dtype': 'f', 'long_name': 'STKVE ', 'units': 'm/s ', 'var_desc': 'Stack exit velocity'.ljust(80)},
{'dtype': 'f', 'long_name': 'STKFLW ', 'units': 'm**3/s ', 'var_desc': 'Stack exit flow rate'.ljust(80)},
{'dtype': 'i', 'long_name': 'STKCNT ', 'units': 'none ', 'var_desc': 'Number of stacks in group'.ljust(80)},
{'dtype': 'i', 'long_name': 'ROW ', 'units': 'none ', 'var_desc': 'Grid row number'.ljust(80)},
{'dtype': 'i', 'long_name': 'COL ', 'units': 'none ', 'var_desc': 'Grid column number'.ljust(80)},
{'dtype': 'f', 'long_name': 'XLOCA ', 'units': 'm ', 'var_desc': 'Projection x coordinate'.ljust(80)},
{'dtype': 'f', 'long_name': 'YLOCA ', 'units': 'm ', 'var_desc': 'Projection y coordinate'.ljust(80)},
{'dtype': 'i', 'long_name': 'IFIP ', 'units': 'none ', 'var_desc': 'FIPS CODE'.ljust(80)},
{'dtype': 'i', 'long_name': 'LMAJOR ', 'units': 'none ', 'var_desc': '1= MAJOR SOURCE in domain, 0=otherwise'.ljust(80)},
{'dtype': 'i', 'long_name': 'LPING ', 'units': 'none ', 'var_desc': '1=PING SOURCE in domain, 0=otherwise'.ljust(80)}
]
def templatecsv(csvpath):
    """
    Create a template to be filled out by the user.

    Arguments
    ---------
    csvpath : str
        Path for a template csv to be created for you to fill out. The
        header row combines stack-property columns (from _stkprops) with
        example emission columns.

    Returns
    -------
    None

    Raises
    ------
    FileExistsError
        If csvpath already exists (refuses to overwrite user work).
    """
    if os.path.exists(csvpath):
        # FileExistsError subclasses OSError (== IOError), so existing
        # callers catching IOError still work.
        raise FileExistsError(f'{csvpath} exists; delete to remake')
    # Column headers follow the "NAME(units)" convention csv2inln parses
    stkcols = [sp['long_name'].strip() + '(' + sp['units'].strip() + ')' for sp in _stkprops]
    ecols = ['CO(moles/s)', 'PEC(g/s)', 'Add more columns(g/s)']
    with open(csvpath, 'w') as cf:
        # BUGFIX: terminate the header row with a newline so the file is a
        # valid text file and data rows appended later start on a new line.
        cf.write(','.join(stkcols + ecols) + '\n')
    print(f'INFO :: {csvpath} created')
    print('INFO :: Fill in columns with the following:')
    for sp in _stkprops:
        print('INFO :: - ' + sp['long_name'].strip() + '(' + sp['units'].strip() + '):', sp['var_desc'].strip())
    print('WARN :: ROW, COL, XLOCA, YLOCA, IFIP, LMAJOR, LPING are all optional.')
    print('WARN :: do not leave blank; remove columns if not specified')
def csv2inln(csvpath, stkpath, inlnpath, gdattrs, tfactor_utc=None):
    """
    Convert CSV to stack

    Arguments
    ---------
    csvpath : str or pandas.DataFrame
        Path for input (or an equivalent DataFrame), which must have columns:
        - LATITUDE(degrees), LONGITUDE(degrees) : stack 2d location in WGS84
        - STKDM(m), STKHT(m) : stack diameter (m) and height (m)
        - STKTK(K), STKVE(m/s) : stack temperature (K) and velocity (m/s)
        Optional properties:
        - ISTACK(none), STKCNT(none) : stack group number (none) and stacks in group (none)
        - XLOCA(m), YLOCA(m), ROW(none), COL(none): defaults based on GDNAM and lat/lon
        - IFIP(none), LMAJOR(none), LPING(none) : default to 0
        Optional emission rates:
        - CO(moles/s), PEC(g/s) : these are just examples of the two units expected.
    stkpath : str
        Path to save stack properties
    inlnpath : str
        Path to save inline emissions
    gdattrs : dict
        Grid attributes sufficient to define an IOAPI grid. SDATE, STIME, and
        TSTEP should also be provided.
    tfactor_utc : array-like or xarray.DataArray
        Must have dimensions ('time',) or ('time', 'stack'). Defaults to
        ('time',) with 1s for 25 steps.

    Returns
    -------
    None

    Raises
    ------
    IOError
        If either output path already exists.
    ValueError
        If a CSV column's units disagree with the output variable's units.
    """
    if os.path.exists(stkpath) or os.path.exists(inlnpath):
        raise IOError(f'{stkpath} or {inlnpath} exist; delete to remake')
    if tfactor_utc is None:
        tfactor_utc = np.ones(25, dtype='f')
    if not isinstance(tfactor_utc, xr.DataArray):
        # BUGFIX: coerce to ndarray so plain lists (no .shape) are accepted
        tfactor_utc = np.asarray(tfactor_utc, dtype='f')
        dims = ('time',)
        if len(tfactor_utc.shape) == 2:
            print(f'WARN :: assuming {tfactor_utc.shape} are (time, stack)')
            dims = ('time', 'stack')
        tfactor_utc = xr.DataArray(tfactor_utc, dims=dims)
    gf = open_griddesc(gdattrs)
    # proj maps lon/lat to fractional grid-cell coordinates (to_meter=XCELL)
    proj = pyproj.Proj(gf.crs_proj4)
    if isinstance(csvpath, pd.DataFrame):
        cdf = csvpath
        csvpath = '<inline>'
    else:
        cdf = pd.read_csv(csvpath)
    nstk = cdf.shape[0]
    # Split "NAME(units)" headers into emission columns (g/s, moles/s)
    # and stack-property columns (everything else)
    keyunits = [k.partition('(')[::2] for k in cdf.columns]
    emisunits = [(k, u[:-1]) for k, u in keyunits if u.strip() in ('g/s)', 'moles/s)')]
    propunits = [(k, u[:-1]) for k, u in keyunits if u.strip() not in ('g/s)', 'moles/s)')]
    stkf = template(nstk, stkpath, _stkprops, gf.attrs)
    now = pd.to_datetime('now', utc=True)
    fdesc = f'Stack properties loaded for {nstk} from {csvpath} on {now:%Y-%m-%dT%H:%M:%SZ}'
    stkf.setncattr('FILEDESC', fdesc.ljust(80 * 60)[:80 * 60])
    vshape = stkf['LONGITUDE'].shape
    # Defaults for optional properties; CSV columns below may overwrite
    stkf['ISTACK'][:] = (np.arange(nstk) + 1).reshape(vshape)
    stkf['STKCNT'][:] = 1
    stkf['IFIP'][:] = 0
    stkf['LMAJOR'][:] = 1
    stkf['LPING'][:] = 0
    x, y = proj(cdf['LONGITUDE(degrees)'], cdf['LATITUDE(degrees)'])
    # BUGFIX: floor before casting; astype('i') truncates toward zero, which
    # mis-binned stacks with negative (off-grid) fractional coordinates.
    # For in-domain points (x, y >= 0) floor and truncation agree.
    stkf['ROW'][:] = (np.floor(y).astype('i') + 1).reshape(vshape)
    stkf['COL'][:] = (np.floor(x).astype('i') + 1).reshape(vshape)
    # Convert cell coordinates back to projection meters for XLOCA/YLOCA
    stkf['XLOCA'][:] = (x * gf.attrs['XCELL'] + gf.attrs['XORIG']).reshape(vshape)
    stkf['YLOCA'][:] = (y * gf.attrs['YCELL'] + gf.attrs['YORIG']).reshape(vshape)
    for k, u in propunits:
        if k not in stkf.variables:
            print(f'WARN :: not using {k}')
            continue
        print(f'INFO :: {k}')
        du = stkf[k].units
        if du.strip().lower() != u.strip().lower():
            raise ValueError(f'FAIL:: "{du}" != "{u}"')
        stkf[k][:] = cdf[f'{k}({u})'].values.reshape(vshape)  # natively in appropriate units
    # Exit flow rate from velocity times stack cross-sectional area
    stkf['STKFLW'][:] = np.pi * stkf['STKVE'][:] * (stkf['STKDM'][:] / 2)**2
    inlnprops = [
        {'dtype': 'f', 'long_name': k.ljust(16), 'units': u.ljust(16), 'var_desc': k.ljust(80)}
        for k, u in emisunits
    ]
    inlnf = template(nstk, inlnpath, inlnprops, gf.attrs)
    # Build 25 hourly time steps starting at SDATE/STIME (YYYYDDDHHMMSS)
    # NOTE(review): hard-coded hourly/25 steps; TSTEP attribute is not used
    sdate = pd.to_datetime(gf.attrs['SDATE'] * 1000000 + gf.attrs['STIME'], format='%Y%j%H%M%S')
    dates = pd.date_range(sdate, periods=25, freq='1h')
    jdate = dates.strftime('%Y%j').astype('i').values[:, None, None]
    hhmmss = dates.strftime('%H%M%S').astype('i').values[:, None, None]
    nt = len(dates)
    inlnf['TFLAG'][:nt, :, 0] = jdate
    inlnf['TFLAG'][:nt, :, 1] = hhmmss
    for k, u in emisunits:
        if k not in inlnf.variables:
            print(f'WARN :: not using {k}')
            continue
        du = inlnf[k].units
        if du.strip().lower() != u.strip().lower():
            raise ValueError(f'FAIL:: "{du}" != "{u}"')
        # Broadcast (time[, stack]) factors against per-stack rates, then
        # insert LAY and COL singleton axes: (TSTEP, LAY, ROW, COL)
        vals = tfactor_utc * xr.DataArray(cdf[f'{k}({u})'].values, dims=('stack',))
        inlnf[k][:nt] = vals.values[:, None, :, None]  # natively in appropriate units
if __name__ == '__main__':
    # Example driver: build stack-property and inline-emission files for the
    # same CSV on two 36-km polar stereographic grids.
    base = 'brick_kiln_data_nepal'
    csvpath = f'{base}.csv'
    # Per-domain name, column/row counts, and lower-left origin (m)
    domains = [
        ('36NHEMI2', 561, 561, -10098000., -10098000.),
        ('36ASIA1', 250, 185, -4302000.0, 3366000.0),
    ]
    for gdnam, ncols, nrows, xorig, yorig in domains:
        stkpath = f'stack_props_{base}_{gdnam}.nc'
        inlnpath = f'emis_inln_{base}_{gdnam}.nc'
        gdattrs = {
            "P_ALP": 1.0, "P_BET": 45.0, "P_GAM": -98.0, "XCENT": -98.0, "YCENT": 90.0,
            "GDTYP": 6, "GDNAM": gdnam, "NCOLS": ncols, "NROWS": nrows,
            "XORIG": xorig, "YORIG": yorig, "XCELL": 36000.0, "YCELL": 36000.0,
            "SDATE": 2025001, "STIME": 0, "TSTEP": 10000,  # used to define emission times
        }
        csv2inln(csvpath, stkpath, inlnpath, gdattrs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment