"""Script to archive .pp files to netCDF."""
import calendar
import datetime
import shutil
import tempfile
import tarfile
from collections import namedtuple
from pathlib import Path
from enum import IntEnum

import numpy as np
import netCDF4 as ncdf
from dateutil.rrule import rrule, HOURLY


class ParameterCodes(IntEnum):
    temperature = 3236
    precipitation = 88888
    irradiance = 1235
    irradiance_longwave = 2207
    humidity = 3245
    wind_speed = 99999

PARAMETERS = {
    3236: {
        'desc': 'Temperature (Kelvin)',
        'name': 'temperature',
        'units': 'K'
    },
    88888: {
        'desc': 'Precipitation (kg/m^2 s)',
        'name': 'precipitation',
        'units': 'kg m-2 s-1'
    },
    1235: {
        'desc': 'Shortwave radiation (Watts/m^2)',
        'name': 'irradiance',
        'units': 'W m-2'
    },
    2207: {
        'desc': 'Reflected longwave radiation (Watts/m^2)',
        'name': 'irradiance_longwave',
        'units': 'W m-2'
    },
    3245: {
        'desc': 'Relative humidity (%)',
        'name': 'humidity',
        'units': '%'
    },
    99999: {
        'desc': 'Windspeed (m/s)',
        'name': 'wind_speed',
        'units': 'm s-1'
    }
}
# ideally would expand to include all 64 terms, but can't be arsed...
Header = namedtuple('Header', ['year', 'month', 'day', 'hour', 'data_len', 'ny', 'nx',
                               'field_code', 'level', 'p_lat', 'p_lon', 'zy', 'dy', 'zx', 'dx'])


def get_pp_header(file):
    # The first 64 values are the header, stored as 32-bit integers,
    # so 64 x 4 bytes each - read 64 x 4 bytes from the file.
    # WARNING - for some reason we need to read 3 extra ints; the first and last read as 256, which can't be right :/
    # I suspect the arrays are padded by 4 bytes at each end, so: padding at the start and end
    # of the header (2 x 4 bytes) plus padding at the start of the data (1 x 4 bytes) = 3 x 4 bytes extra.
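    # Assumed Fortran sequential-record layout (a sketch inferred from the padding
    # observation above, not from a format spec):
    #   [4-byte record marker][64 x 4-byte header words][4-byte record marker]
    #   [4-byte record marker][data words ...]
    # Reading 67 * 4 bytes therefore covers the whole header plus the three
    # extra 4-byte markers.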
    header_bytes = file.read(67 * 4)
    # In the Fortran code, the header is declared as containing both ints and floats
    # using EQUIVALENCE, so we view the same bytes as both an integer and a float array.
    header = np.frombuffer(header_bytes, dtype='>i', offset=4, count=64)
    rheader = np.frombuffer(header_bytes, dtype='>f', offset=4, count=64)
    # From extract_pp.f90, these are the header positions corresponding to each bit of metadata.
    # Note that we use zero-indexing, so everything is off by one relative to the Fortran version.
    yyyy = header[0]
    mm = header[1]
    dd = header[2]
    hh = header[3]
    data_len = header[14]
    # grid size in (1-indexed) positions 18 and 19
    ny = header[17]
    nx = header[18]
    fieldcode = header[41]
    level = header[33]
    # all floats
    b_plat = rheader[55]
    b_plon = rheader[56]
    b_zy = rheader[58]
    b_dy = rheader[59]
    b_zx = rheader[60]
    b_dx = rheader[61]
    return Header(yyyy, mm, dd, hh, data_len, ny, nx, fieldcode,
                  level, b_plat, b_plon, b_zy, b_dy, b_zx, b_dx)


def get_num_hours(header):
    # Use calendar.isleap rather than `year % 4 == 0`, which misclassifies
    # century years such as 1900 and 2100.
    if calendar.isleap(header.year):
        num_hours = 8784
    else:
        num_hours = 8760
    return num_hours
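

# Illustrative check: calendar.isleap(2016) -> True (8784 = 366 * 24 hours),
# calendar.isleap(2015) -> False (8760 = 365 * 24 hours).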


def get_units(header):
    return PARAMETERS[header.field_code]['units']


def get_var_name(header: Header):
    field_code = header.field_code
    var_name = PARAMETERS[field_code]['name']
    return var_name


def get_pp_contents(file, data_len) -> np.ndarray:
    """
    Read the data section of a .pp file as big-endian 32-bit floats.
    Seeks past the header explicitly, since the caller may not
    always have read the header from this file object first.
    :param file:
    :param data_len: Length of the data, from the header
    :return:
    """
    file.seek(67 * 4)
    file_contents = np.fromfile(file, dtype='>f4')
    return file_contents[:data_len]


def init_array(nx, ny, nt):
    a = np.empty((ny, nx, nt))
    a[:] = np.nan
    return a


def load_single(file_path, year):
    with file_path.open("rb") as file:
        header = get_pp_header(file)
        # Early return if this is the wrong year!
        if header.year != year:
            # return None, None
            raise ValueError('Wrong year in header: {} given, {} required'.format(header.year, year))
        start_date = datetime.datetime(header.year, header.month, header.day, header.hour)
        day_of_year = start_date.timetuple().tm_yday
        # Get the index corresponding to the file's timestamp (from zero to n_hours_in_year)
        d_idx = 24 * (day_of_year - 1) + header.hour
        arr = get_pp_contents(file, header.data_len)
        arr = arr.reshape((header.ny, header.nx))
    return d_idx, arr
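

# Illustrative index math: a file stamped 1 Feb, hour 0, has day_of_year = 32,
# so d_idx = 24 * (32 - 1) + 0 = 744.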


def load_files_to_array(files, header):
    # Some files appear to be mixed up with different years.
    # Use the year from the first file header to filter input files.
    year = header.year
    num_hours = get_num_hours(header)
    out = init_array(header.nx, header.ny, num_hours)
    for file_path in files:
        d_idx, arr = load_single(file_path, year)
        # load_single currently raises on a year mismatch; this guard only matters
        # if the soft-fail `return None, None` path is re-enabled.
        if d_idx is not None:
            out[:, :, d_idx] = arr
    return out


def configure_dset(dset, header, latitudes, longitudes, shape, start_date, units, var_name):
    dset.title = 'NWP {} data on ~4km grid, non-standard lat/lon'.format(var_name)
    dset.institution = 'MetOffice'
    dset.source = 'NWP model'
    dset.comment = 'Archived from MetOffice .pp files'
    dset.p_lat = header.p_lat
    dset.p_lon = header.p_lon
    dset.zx = header.zx
    dset.zy = header.zy
    dset.nx = header.nx
    dset.ny = header.ny
    dset.variable = var_name
    nlat, nlon, num_hours = shape
    dset.createDimension('time', num_hours)
    dset.createDimension('lat', nlat)
    dset.createDimension('lon', nlon)
    lat_var = dset.createVariable('lat', 'f4', ('lat',))
    long_var = dset.createVariable('lon', 'f4', ('lon',))
    main_var = dset.createVariable(var_name, 'f4', ('lat', 'lon', 'time',),
                                   zlib=True,
                                   least_significant_digit=4,  # helps compress size!
                                   chunksizes=(72, 72, 168)
                                   # chunksizes=(nlat, nlon, 1)
                                   )
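    # Note on chunking (an assumption about access patterns, not from the original):
    # 168 hours = one week, so (72, 72, 168) chunks favour reading week-long runs of
    # hours for a spatial tile; the commented-out (nlat, nlon, 1) alternative would
    # instead favour reading the whole grid one hour at a time.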
    main_var.units = units
    main_var.long_name = var_name
    main_var.standard_name = var_name
    # main_var.grid_mapping = 'crs'
    # main_var.crs = 4326
    # work on this later...
    lat_var.units = 'degrees_north'
    lat_var.long_name = 'latitude'
    lat_var.standard_name = 'latitude'
    lat_var[:] = latitudes
    long_var.units = 'degrees_east'
    long_var.long_name = 'longitude'
    long_var.standard_name = 'longitude'
    long_var[:] = longitudes
    times = dset.createVariable('time', 'u4', ('time',))
    times.units = 'seconds since 1970-1-1'
    times.calendar = 'gregorian'
    times.long_name = 'time'
    times.standard_name = 'time'
    rd_datetimes = list(rrule(HOURLY, dtstart=start_date, count=num_hours))
    times[:] = ncdf.date2num(rd_datetimes, units=times.units)
    # dset.set_auto_mask(True)
    return dset


def load_array_to_ncdf(outfile_name, data, var_name, header):
    # data dimensions
    ny = header.ny
    nx = header.nx
    num_hours = get_num_hours(header)
    units = get_units(header)
    # Start on the first day of the first month, hour zero
    start_date = datetime.datetime(header.year, 1, 1, 0)
    # some fiddliness with getting the right number of points and interval;
    # the generator version is an exact translation, but it seems to give the
    # same result as linspace
    rlat = np.linspace(header.zy + header.dy, header.zy + ny * header.dy, ny,
                       dtype=np.float32)
    rlon = np.linspace(header.zx + header.dx, header.zx + nx * header.dx, nx,
                       dtype=np.float32)
    # longitudes can't be > 360
    rlon[rlon > 360] -= 360
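    # Worked example with hypothetical header values: zx = 350.0, dx = 0.036, nx = 3
    # gives rlon = linspace(350.036, 350.108, 3) = [350.036, 350.072, 350.108].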
    shape = (ny, nx, num_hours)
    with ncdf.Dataset(str(outfile_name), 'w', format='NETCDF4') as dset:
        print('init netCDF at: ', outfile_name)
        dset = configure_dset(dset, header, rlat, rlon, shape, start_date, units, var_name)
        # var_data = dset.variables[var_name]
        # print('data', var_data[0, 0, 0])
        print('adding data to netcdf')
        dset.variables[var_name][:, :, :] = data


def header_from_folder(pp_folder, variable_code=None):
    if variable_code is not None:
        glob_pattern = '2*_{}_09999.pp'.format(str(variable_code).zfill(5))
    else:
        glob_pattern = '2*.pp'
    files = sorted(pp_folder.glob(glob_pattern))
    if not files:
        raise ValueError('No .pp files matching {} in {}'.format(glob_pattern, pp_folder))
    # get metadata from the first file
    with files[0].open("rb") as file:
        header = get_pp_header(file)
    return files, header


def pp_to_netcdf(pp_folder, output_folder, variable_code=None):
    """
    Archive a folder full of pp files into a single netCDF file.
    This is generally used to archive hourly files for a given variable for one year
    into a single netCDF file for that year.
    :param pp_folder:
    :param output_folder:
    :param variable_code:
    :return:
    """
    if not isinstance(pp_folder, Path):
        pp_folder = Path(pp_folder)
    if not pp_folder.is_dir():
        raise ValueError('Given path is not a directory: ', pp_folder)
    print('Getting file info')
    files, header = header_from_folder(pp_folder, variable_code)
    var_name = get_var_name(header)
    print('loading {} to netcdf'.format(var_name))
    print('load data to array')
    arr_out = load_files_to_array(files, header)
    print('save array to netcdf')
    # create .nc file with same name as folder, at same level as folder
    # outfile_name = Path(folder, '..', var_name + '{}'.format(start_date.year) + '.nc')
    ncdf_path = output_folder / (var_name + '-{}'.format(header.year) + '.nc')
    load_array_to_ncdf(ncdf_path, arr_out, var_name, header)
    print('done')
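

# Example usage (hypothetical paths):
# pp_to_netcdf(Path('/data/pp/2015_temperature'), Path('/data/nc'), variable_code=3236)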


def pp_to_netcdf_lowmem(pp_folder, output_folder, variable_code=None):
    """
    Archive a folder full of pp files into a single netCDF file.
    This is generally used to archive hourly files for a given variable for one year
    into a single netCDF file for that year.
    This alternative implementation uses much less memory by writing pp files directly
    into the netCDF file, instead of loading them all into memory and writing in bulk.
    However, it is much slower - so as long as you have plenty of RAM (about 20GB
    uncompressed, 12GB for systems with compressed RAM such as OSX), don't use this.
    :param pp_folder:
    :param output_folder:
    :param variable_code:
    :return:
    """
    if not isinstance(pp_folder, Path):
        pp_folder = Path(pp_folder)
    if not pp_folder.is_dir():
        raise ValueError('Given path is not a directory: ', pp_folder)
    print('Getting file info')
    files, header = header_from_folder(pp_folder, variable_code)
    var_name = get_var_name(header)
    print('loading {} to netcdf'.format(var_name))
    # create .nc file with same name as folder, at same level as folder
    # outfile_name = Path(folder, '..', var_name + '{}'.format(start_date.year) + '.nc')
    ncdf_path = output_folder / (var_name + '-{}'.format(header.year) + '.nc')
    # data dimensions
    ny = header.ny
    nx = header.nx
    num_hours = get_num_hours(header)
    units = get_units(header)
    # Start on the first day of the first month, hour zero
    start_date = datetime.datetime(header.year, 1, 1, 0)
    # some fiddliness with getting the right number of points and interval;
    # the generator version is an exact translation, but it seems to give the
    # same result as linspace
    rlat = np.linspace(header.zy + header.dy, header.zy + ny * header.dy, ny,
                       dtype=np.float32)
    rlon = np.linspace(header.zx + header.dx, header.zx + nx * header.dx, nx,
                       dtype=np.float32)
    # longitudes can't be > 360
    rlon[rlon > 360] -= 360
    shape = (ny, nx, num_hours)
    with ncdf.Dataset(str(ncdf_path), 'w', format='NETCDF4') as dset:
        print('init netCDF at: ', ncdf_path)
        dset = configure_dset(dset, header, rlat, rlon, shape, start_date, units, var_name)
        print('adding data to netcdf')
        for file_path in files:
            d_idx, arr = load_single(file_path, header.year)
            if d_idx is not None:
                dset.variables[var_name][:, :, d_idx] = arr
    print('done')


def archive_pp_to_netcdf(archive_path: Path, output_folder, variable_codes=None, extract=True):
    """
    Archive a .tar.gz of pp files to netCDF.
    Unfortunately, they lobbed both radiation vars together, a bit annoying...
    :param archive_path:
    :param output_folder:
    :param variable_codes: special-case arg for when the archive contains
        more than one variable; must be a list of codes
    :param extract:
    :return:
    """
    # TODO: Need to change this for Legion
    with tempfile.TemporaryDirectory() as extract_loc:
        if extract:
            print('unpacking to: ', extract_loc)
            shutil.unpack_archive(str(archive_path), extract_loc)
        else:
            # ignore the temp dir and assume the files are in the folder already
            print('using existing files in: ', str(archive_path))
            extract_loc = archive_path
        extract_loc = Path(extract_loc)
        # Some archives contain their data 'flat' (i.e. not in a top-level directory)
        # while others do have a top level. Need to work around this.
        extracted_files = list(extract_loc.glob('*.pp'))
        if len(extracted_files) > 0:
            # looks like extract_loc contains the pp files directly, with no top-level dir
            pp_folder = extract_loc
        else:
            # NOTE: if extract is False, we should not get here
            # Get the folder name from the archive, to protect against the case where
            # there is other junk in the archive extraction temp location.
            top_folder = None
            with tarfile.open(str(archive_path)) as file:
                for finfo in file:
                    # take the first member name that doesn't start with a dot
                    if not finfo.name.startswith('.'):
                        top_folder = finfo.name
                        break
            if top_folder is None:
                raise ValueError('Could not find a top-level folder in archive: ', archive_path)
            extracted_files = list((extract_loc / top_folder).glob('*.pp'))
            if len(extracted_files) > 0:
                pp_folder = extract_loc / top_folder
            else:
                pp_folder = extract_loc
        print('extracted to folder: ', pp_folder)
        print('converting pp to netcdf')
        # deal with the special case where one archive yields several variable codes
        if variable_codes is not None:
            # must be a list
            for variable_code in variable_codes:
                pp_to_netcdf(pp_folder, output_folder, variable_code=variable_code)
                # pp_to_netcdf_lowmem(pp_folder, output_folder, variable_code=variable_code)
        else:
            pp_to_netcdf(pp_folder, output_folder)
            # pp_to_netcdf_lowmem(pp_folder, output_folder)
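

# Example usage (hypothetical paths; 1235 and 2207 are the two radiation codes
# defined in PARAMETERS above, which get archived together):
# archive_pp_to_netcdf(Path('/data/archives/radiation_2015.tar.gz'), Path('/data/nc'),
#                      variable_codes=[1235, 2207])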


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Archive metoffice .tar.gz archives of .pp files to netcdf format')
    parser.add_argument('file_path', type=str,
                        help='path of the archive to process')
    parser.add_argument('output_path', type=str,
                        help='output folder path')
    parser.add_argument('--noextract', action="store_false", default=True,
                        help='skip extraction and assume the .pp files are already unpacked')
    args = parser.parse_args()
    filepath = Path(args.file_path)
    output_path = Path(args.output_path)
    do_extract = args.noextract
    print("Processing Archive:")
    print(str(filepath))
    print("Saving to folder:")
    print(str(output_path))
    print("Extracting?")
    print(do_extract)
    archive_pp_to_netcdf(filepath, output_path, extract=do_extract)