Created
June 5, 2025 01:19
-
-
Save haberda/9d549c458df841af14725e00922d0d9e to your computer and use it in GitHub Desktop.
Contains the code required to implement spatial inversion for aerial gamma-ray data.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
import rasterio | |
from rasterio.transform import from_origin | |
from rasterio.warp import calculate_default_transform, reproject, Resampling | |
from pyproj import Transformer | |
import scipy.sparse as sp | |
from scipy.optimize import minimize, Bounds | |
from matplotlib.path import Path | |
from scipy.spatial import ConvexHull | |
from tqdm import tqdm | |
import matplotlib.pyplot as plt | |
import time | |
# If we want to try CUDA we could implement Cg and possibly oothers | |
# import cupy as cp | |
# from cupyx.scipy.sparse import csr_matrix, vstack, coo_matrix, diags | |
# from cupyx.scipy.sparse.linalg import cg, LinearOperator, lsqr, lsmr | |
# ---------------------- | |
# 1. Load and reproject observation data | |
# ---------------------- | |
def read_observations(obs_file, cols, src_crs, utm_crs, alt_in_ft=False): | |
""" | |
Load observation CSV, reproject to UTM, compute or read variance, | |
and convert altitude from feet to meters if needed. | |
""" | |
df = pd.read_csv(obs_file) | |
# Transform coordinates to UTM | |
transformer = Transformer.from_crs(src_crs, utm_crs, always_xy=True) | |
xs, ys = transformer.transform(df[cols['x']], df[cols['y']]) | |
# Extract measurement values | |
d = df[cols['value']].to_numpy() | |
# Estimate variance if not provided | |
if 'var' in cols and cols['var'] in df.columns: | |
var = df[cols['var']].to_numpy() | |
else: | |
var = np.maximum(d, 1.0) # assume Poisson error if missing | |
# Convert altitude if needed | |
h = df[cols['alt']].to_numpy() | |
if alt_in_ft: | |
h *= 0.3048 | |
return xs, ys, d, var, h | |
# ---------------------- | |
# 2. Load and reproject DEM to target CRS | |
# ---------------------- | |
def read_dem(dem_file, utm_crs): | |
""" | |
Load DEM GeoTIFF and reproject to desired CRS if needed. | |
Returns flattened coordinate and elevation arrays. | |
""" | |
src = rasterio.open(dem_file) | |
if src.crs.to_string() != utm_crs: | |
# Reproject DEM | |
transform, width, height = calculate_default_transform( | |
src.crs, utm_crs, src.width, src.height, *src.bounds) | |
profile = src.meta.copy() | |
profile.update({'crs': utm_crs, 'transform': transform, | |
'width': width, 'height': height}) | |
arr = np.empty((height, width), dtype=src.dtypes[0]) | |
reproject(source=rasterio.band(src, 1), destination=arr, | |
src_transform=src.transform, src_crs=src.crs, | |
dst_transform=transform, dst_crs=utm_crs, | |
resampling=Resampling.bilinear) | |
memfile = rasterio.io.MemoryFile() | |
dem = memfile.open(**profile) | |
dem.write(arr, 1) | |
else: | |
dem = src | |
# Extract DEM coordinates | |
# arr = dem.read(1) | |
# rows, cols = np.indices(arr.shape) | |
# xs, ys = rasterio.transform.xy(dem.transform, rows, cols) | |
return dem#, np.array(xs).ravel(), np.array(ys).ravel(), arr.ravel() | |
# ---------------------- | |
# 3. Generate solution grid | |
# ---------------------- | |
def solution_grid(ob_x, ob_y, dem, buffer_m, cell_size): | |
""" | |
Parameters | |
---------- | |
ob_x : np 1d array | |
Observed X values. | |
ob_y : np 1d array | |
Observed Y values. | |
buffer_m : number | |
Buffer around the solution grid in m. | |
cell_size : number | |
The desired cell size for the solution grid. | |
Returns | |
------- | |
X.ravel() | |
np array containing sol grid X values. | |
Y.ravel() | |
np array containing sol grid Y values. | |
Z.ravel() | |
np array containing sol grid Z values sampled from the DEM. | |
nx : number | |
How many cells in the X direction. | |
ny : number | |
How many cells in the Y direction. | |
""" | |
xmin, ymin, xmax, ymax = compute_grid_bounds(ob_x, ob_y, buffer_m=buffer_m) | |
nx = int((xmax - xmin) // cell_size) | |
ny = int((ymax - ymin) // cell_size) | |
xs = np.linspace(xmin + cell_size/2, xmax - cell_size/2, nx) | |
ys = np.linspace(ymin + cell_size/2, ymax - cell_size/2, ny) | |
X, Y = np.meshgrid(xs, ys) | |
return X.ravel(), Y.ravel(), sample_dem_heights(dem, X.ravel(), Y.ravel()), nx, ny | |
def compute_grid_bounds(ob_x, ob_y, buffer_m=200): | |
""" | |
Compute model bounds around observations with a buffer (in meters). | |
""" | |
xmin, xmax = ob_x.min() - buffer_m, ob_x.max() + buffer_m | |
ymin, ymax = ob_y.min() - buffer_m, ob_y.max() + buffer_m | |
return xmin, ymin, xmax, ymax | |
def sample_dem_heights(dem, x_coords, y_coords): | |
""" | |
Sample DEM at arbitrary (x, y) coordinates using rasterio.sample(). | |
Returns array of sampled terrain heights (same shape as input). | |
""" | |
coords = list(zip(x_coords, y_coords)) | |
zs = np.array([val[0] for val in dem.sample(coords)]) | |
return zs | |
# ---------------------- | |
# 4. Forward models | |
# ---------------------- | |
def generate_triangle_mesh(X, Y, Z, nx, ny): | |
""" | |
Convert raveled (1D) structured grid coordinates into a triangular mesh. | |
Parameters | |
---------- | |
X, Y, Z : 1D np.ndarray | |
Flattened meshgrid coordinates, each of length nx * ny. | |
nx, ny : int | |
Number of points in x and y directions (grid shape is (ny, nx)). | |
Returns | |
------- | |
mesh : 'triangles': (2*num_cells, 3, 3) array of triangle vertices | |
""" | |
def expand_by_interpolation(m): | |
ny, nx = m.shape | |
m_expanded = np.zeros((ny + 1, nx + 1)) | |
# Copy the original data | |
m_expanded[:ny, :nx] = m | |
# Interpolate last column (excluding corner) | |
m_expanded[:ny, nx] = m[:, -1] + (m[:, -1] - m[:, -2]) | |
# Interpolate last row (excluding corner) | |
m_expanded[ny, :nx] = m[-1, :] + (m[-1, :] - m[-2, :]) | |
# Bottom-right corner (bilinear extrapolation) | |
m_expanded[ny, nx] = ( | |
m[-1, -1] | |
+ (m[-1, -1] - m[-2, -1]) | |
+ (m[-1, -1] - m[-1, -2]) | |
- (m[-2, -2] - m[-1, -2] - m[-2, -1] + m[-1, -1]) | |
) | |
return m_expanded | |
# Reshape from raveled (1D) to 2D (ny rows, nx cols) | |
X2D = X.reshape((ny, nx)) | |
Y2D = Y.reshape((ny, nx)) | |
Z2D = Z.reshape((ny, nx)) | |
# Triangle creation requires slicing off the last row and col, this creates other problems, for now interpolate and create a fake row and col to be sacrificed | |
X2D, Y2D, Z2D = expand_by_interpolation(X2D), expand_by_interpolation(Y2D), expand_by_interpolation(Z2D) | |
# Slice each corner of the grid cell | |
X1, Y1, Z1 = X2D[:-1, :-1], Y2D[:-1, :-1], Z2D[:-1, :-1] # lower-left | |
X2, Y2, Z2 = X2D[:-1, 1:], Y2D[:-1, 1:], Z2D[:-1, 1:] # lower-right | |
X3, Y3, Z3 = X2D[1:, :-1], Y2D[1:, :-1], Z2D[1:, :-1] # upper-left | |
X4, Y4, Z4 = X2D[1:, 1:], Y2D[1:, 1:], Z2D[1:, 1:] # upper-right | |
# Form 3D coordinates for each triangle vertex (flattened) | |
A1 = np.column_stack((X1.ravel(), Y1.ravel(), Z1.ravel())) | |
B1 = np.column_stack((X2.ravel(), Y2.ravel(), Z2.ravel())) | |
C1 = np.column_stack((X3.ravel(), Y3.ravel(), Z3.ravel())) | |
A2 = B1 | |
B2 = np.column_stack((X4.ravel(), Y4.ravel(), Z4.ravel())) | |
C2 = C1 | |
# Stack triangles | |
triangle1 = np.stack((A1, B1, C1), axis=1) | |
triangle2 = np.stack((A2, B2, C2), axis=1) | |
all_triangles = np.concatenate((triangle1, triangle2), axis=0) | |
return { | |
'triangles': all_triangles, | |
'triangle1': triangle1, | |
'triangle2': triangle2 | |
} | |
def triangle_geometry(triangles): | |
""" | |
Compute surface normals, centroids, areas, and normalized area magnitudes. | |
Parameters | |
---------- | |
triangles : np.ndarray of shape (N, 3, 3) | |
Array of N triangles. Each triangle is defined by 3 vertices in 3D: | |
[[x1, y1, z1], [x2, y2, z2], [x3, y3, z3]]. | |
Returns | |
------- | |
geometry : dict with: | |
- 'normals': (N, 3) unit normal vectors | |
- 'centroids': (N, 3) average of the 3 vertices | |
- 'areas': (N,) surface area of each triangle | |
- 'area_normalized': (N,) area scaled to [0, 1] | |
""" | |
A = triangles[:, 0, :] | |
B = triangles[:, 1, :] | |
C = triangles[:, 2, :] | |
# Compute two edges for each triangle | |
AB = B - A | |
AC = C - A | |
# Cross product gives normal vector | |
normals = np.cross(AB, AC) | |
norm_mag = np.linalg.norm(normals, axis=1) | |
# Normalize the normals | |
unit_normals = np.divide(normals, norm_mag[:, np.newaxis], | |
out=np.zeros_like(normals), | |
where=norm_mag[:, np.newaxis] != 0) | |
# Centroids: average of the 3 vertices | |
centroids = (A + B + C) / 3.0 | |
return { | |
'normals': unit_normals, | |
'centroids': centroids, | |
} | |
# ---------------------- | |
# Triangle-based sensitivity matrix builder | |
def build_triangle_sensitivity(ob_x, ob_y, ob_h, X, Y, Z, nx, ny, mu, max_dist=None, normalize=False): | |
triangles = generate_triangle_mesh(X, Y, Z, nx, ny) | |
triangle_geo_1 = triangle_geometry(triangles['triangle1']) | |
triangle_geo_2 = triangle_geometry(triangles['triangle2']) | |
""" | |
Build forward model G using triangle kernel. | |
Each grid cell contributes two triangles; contributions are summed per cell. | |
""" | |
detector_pos = np.column_stack((ob_x, ob_y, ob_h)) | |
G = sp.csr_matrix(triangle_kernel_multi(triangle_geo_1['centroids'], triangle_geo_1['normals'], detector_pos, mu) + triangle_kernel_multi(triangle_geo_2['centroids'], triangle_geo_2['normals'], detector_pos, mu)) | |
if normalize: | |
G = normalize_fm(G,ob_h) | |
return G | |
# ---------------------- | |
# Triangle-based forward model kernel | |
def triangle_kernel_multi(triangle_centroids, triangle_normals, detector_pos, mu, max_dist=None): | |
""" | |
Compute contributions from triangle surfaces to multiple detectors. | |
Parameters | |
---------- | |
triangle_centroids : (N, 3) | |
Centroid of each triangle. | |
triangle_normals : (N, 3) | |
Unit normal vector of each triangle. | |
detector_pos : (M, 3) | |
Positions of M detectors. | |
mu : float | |
Attenuation coefficient. | |
max_dist: float | |
The maximum distance to calculate the kernel for each observation | |
Returns | |
------- | |
contributions : (M, N) | |
Each row corresponds to a detector, each column to a pair of triangles. | |
""" | |
if max_dist is None: | |
max_dist = 1000 | |
# Shape: (M, 1, 3) - (1, N, 3) → (M, N, 3) | |
v = detector_pos[:, None, :] - triangle_centroids[None, :, :] # (M, N, 3) | |
r = np.linalg.norm(v, axis=2) # (M, N) | |
v_hat = v / np.maximum(r[..., None], 1e-8) # (M, N, 3) | |
# Use pre-normalized triangle normals | |
n_hat = triangle_normals[None, :, :] # (1, N, 3) | |
# Cosine between v_hat and n_hat (dot product) | |
cosine = np.sum(v_hat * n_hat, axis=2) # (M, N) | |
# Raw dot product for kernel | |
dot_vn = np.sum(v * triangle_normals[None, :, :], axis=2) # (M, N) | |
# Contribution formula: same as single-detector case, vectorized | |
contrib = (dot_vn / np.maximum(r**3, 1e-8)) * np.exp(-mu * r) | |
# Filter for back-facing triangles (cosine <= 0) | |
contrib[cosine <= 0] = 0.0 | |
if max_dist is not None: | |
contrib[r>max_dist] = 0.0 | |
return contrib | |
# ---------------------- | |
# Simple forward model | |
# ---------------------- | |
def build_sensitivity(ob_x, ob_y, ob_h, prism_x, prism_y, prism_z, cell_size, mu, normalize=False, integration_pts=4, terrain_subsample=False, dem=None): | |
""" | |
Vectorized sparse sensitivity matrix G (obs x cells). | |
Uses precomputed cell corner offsets and vectorized kernel evaluation. | |
""" | |
Nobs = len(ob_x) | |
Ncells = len(prism_x) | |
# Precompute corner offsets for each subcell | |
offsets = np.array([[-.5, -.5], [-.5, .5], [.5, -.5], [.5, .5]]) * cell_size | |
# Compute full set of subcell coordinates (Ncells x 4 x 2) | |
cx = prism_x[:, None] + offsets[:, 0] | |
cy = prism_y[:, None] + offsets[:, 1] | |
if terrain_subsample and dem is not None: | |
# DEM-sampled terrain at subcell corners (Ncells x 4) | |
flat_cx = cx.ravel() | |
flat_cy = cy.ravel() | |
zs_corner = sample_dem_heights(dem, flat_cx, flat_cy).reshape(Ncells, 4) | |
else: | |
# Use prism_z as terrain height for all corners | |
zs_corner = np.repeat(prism_z[:, None], 4, axis=1) | |
rows, cols, vals = [], [], [] | |
for i in tqdm(range(Nobs), desc='Building G'): | |
dx = ob_x[i] - cx # shape (Ncells, 4) | |
dy = ob_y[i] - cy | |
h_eff = ob_h[i] - zs_corner # shape (Ncells, 4) | |
r2 = h_eff**2 + dx**2 + dy**2 | |
kernel_vals = np.exp(-mu * np.sqrt(r2)) / (r2 ** 1.5) | |
I = np.sum(kernel_vals, axis=1) * (cell_size**2 / 4.0) | |
valid = I > 0 | |
rows.extend([i] * np.count_nonzero(valid)) | |
cols.extend(np.where(valid)[0]) | |
vals.extend(I[valid]) | |
G = sp.csr_matrix((vals, (rows, cols)), shape=(Nobs, Ncells)) | |
if normalize: | |
G = normalize_fm(G,ob_h) | |
return G | |
# ---------------------- | |
def kernel_f(x, y, h, mu): | |
""" | |
Radiation transport kernel: exponential attenuation and geometric spread. | |
""" | |
r2 = h*h + x*x + y*y | |
return np.exp(-mu * np.sqrt(r2)) / (r2 ** 1.5) | |
def normalize_fm(G): | |
row_sums = np.array(G.sum(axis=1)).flatten() | |
row_indices, col_indices = G.nonzero() | |
G.data /= row_sums[row_indices] | |
return G | |
def initial_guess(G,d): | |
""" | |
This function calculates an initial guess that is a weighted average directly using the G matrix as weights. | |
Parameters | |
---------- | |
G : M*N CSR Matrix | |
Sparse forward model operator. | |
d : Observed measurments of M length | |
Vector of observed data. | |
Returns | |
------- | |
NP array of M*1 | |
Weighted average intial guess. | |
""" | |
d_sparse = sp.csr_matrix(d).T | |
G_numerator_sp = G.multiply(d_sparse) | |
G_numerator = G_numerator_sp.sum(axis=0).flatten() | |
G_weight = G.sum(axis=0).flatten() | |
return np.array(G_numerator/(G_weight + np.finfo(float).eps)).flatten() | |
# ---------------------- | |
# 6. Solver wrapper - This is the magic | |
# ---------------------- | |
def minimize_wrapper(G, d, L, lam, method='L-BFGS-B', maxiter=1000): | |
# Setup an objective and gradient functions | |
def objective(m, G, d, L, lam): | |
residual = G @ m - d | |
reg_term = L @ m | |
return 0.5 * np.sum(residual**2) + 0.5 * lam * np.sum(reg_term**2) | |
def gradient(m, G, d, L, lam): | |
return G.T @ (G @ m - d) + lam * L.T @ (L @ m) | |
bounds = Bounds(np.min(d), np.max(d)) # Set bounds to be min/max of observed data, could be more clever | |
result = minimize( | |
objective, x0, | |
args=(G, d, L, lam), | |
method=method, | |
jac=gradient, | |
bounds=bounds, | |
options={'maxiter': maxiter} | |
) | |
if result.success: | |
print(f"The result reached convergence in {result.nit} iterations.") | |
else: | |
print("The result did not convergence and reached max iterations.") | |
return result | |
def sparse_gradient(nr, nc): | |
""" | |
Builds a sparse gradient matrix operator. | |
Parameters: | |
nr (int): number of rows | |
nc (int): number of columns | |
Returns: | |
scipy.sparse.csr_matrix: A sparse matrix of shape (2*nr*nc, nr*nc) | |
""" | |
C, R = np.meshgrid(np.arange(1, nc + 1), np.arange(1, nr + 1)) | |
pixel_inds = np.arange(1, nr * nc + 1).reshape((nr, nc)) | |
npx = nr * (nc - 1) + nc * (nr - 1) | |
gradmat = np.zeros((2 * npx, 3), dtype=int) | |
kndx = 0 | |
for indx in range(nr): | |
for jndx in range(nc): | |
col = C[indx, jndx] | |
row = R[indx, jndx] | |
pix = pixel_inds[indx, jndx] | |
if col < nc: | |
gradmat[kndx, :] = [pix, pix, -1] | |
kndx += 1 | |
pix_p1 = pixel_inds[indx, jndx + 1] | |
gradmat[kndx, :] = [pix, pix_p1, 1] | |
kndx += 1 | |
if row < nr: | |
gradmat[kndx, :] = [pix + nr * nc, pix, -1] | |
kndx += 1 | |
pix_p1 = pixel_inds[indx + 1, jndx] | |
gradmat[kndx, :] = [pix + nr * nc, pix_p1, 1] | |
kndx += 1 | |
# Convert to zero-based indexing for Python | |
i = gradmat[:kndx, 0] - 1 | |
j = gradmat[:kndx, 1] - 1 | |
s = gradmat[:kndx, 2] | |
grad = sp.csr_matrix((s, (i, j)), shape=(2 * nr * nc, nr * nc)) | |
return grad | |
# Plotting | |
def plot_results(m, m_mask, G, d, var, ob_x, ob_y, ny, nx, dem, xmin, ymax, cell_size, fixed_bounds=None): | |
"""Plot inversion maps and residuals side-by-side.""" | |
methods = list(m.keys()) | |
results = [m_mask[k].reshape((ny, nx)) for k in methods] | |
if fixed_bounds is not None: | |
vmin = min(fixed_bounds) | |
vmax = max(fixed_bounds) | |
else: | |
all_vals = np.concatenate(results) | |
vmin, vmax = all_vals.min(), all_vals.max() | |
fig, axs = plt.subplots(2, len(results), figsize=(5 * len(results), 8), sharey='row') | |
if len(results) == 1: | |
axs = np.array([[axs[0]], [axs[0]]]) | |
for j, (title, result) in enumerate(zip(methods, results)): | |
im = axs[0, j].imshow(np.flipud(result), origin='lower', extent=(dem.bounds.left, dem.bounds.right, dem.bounds.bottom, dem.bounds.top), vmin=vmin, vmax=vmax) | |
axs[0, j].set_title(title) | |
axs[0, j].set_xlabel("Easting (m)") | |
axs[0, j].set_ylabel("Northing (m)") | |
predicted = G @ m[title] | |
residual = d - predicted | |
scatter = axs[1, j].scatter(ob_x, ob_y, c=residual, cmap='RdBu', vmin=-np.max(np.abs(residual)), vmax=np.max(np.abs(residual)), s=5) | |
axs[1, j].set_title(f"Residual: {title}") | |
axs[1, j].set_xlabel("Easting (m)") | |
axs[1, j].set_ylabel("Northing (m)") | |
fig.colorbar(im, ax=axs[0], orientation='vertical', shrink=0.8, label='Concentration') | |
fig.colorbar(scatter, ax=axs[1], orientation='vertical', shrink=0.8, label='Residual (obs - pred)') | |
plt.savefig('results/plots/inversion_comparison.png', dpi=300) | |
plt.close() | |
print("Saved: results/plots/inversion_comparison.png") | |
def save_geotiffs(m, dem, xmin, ymax, ny, nx, cell_size): | |
"""Save inversion results to individual GeoTIFFs.""" | |
transform = from_origin(xmin, ymax, cell_size, cell_size) | |
profile = dem.profile.copy() | |
profile.update({'dtype': 'float32', 'count': 1, 'height': ny, 'width': nx, 'transform': transform}) | |
for title, result in m.items(): | |
out_grid = result.reshape((ny, nx)).astype('float32') | |
filename = f'results/geotiffs/inversion_{title.lower()}.tif' | |
with rasterio.open(filename, 'w', **profile) as dst: | |
dst.write(out_grid, 1) | |
print(f"Saved: {filename}") | |
print('Done: inversion_result.png and inversion_result.tif') | |
def mask_results_to_observations(m_dict, prism_x, prism_y, ob_x, ob_y, nx, ny): | |
""" | |
Mask inversion result dictionary to the bounds of the observations (removes buffer). | |
Sets out-of-bound cells to NaN. | |
""" | |
x_min_obs, x_max_obs = np.min(ob_x), np.max(ob_x) | |
y_min_obs, y_max_obs = np.min(ob_y), np.max(ob_y) | |
in_bounds = ( | |
(prism_x >= x_min_obs) & (prism_x <= x_max_obs) & | |
(prism_y >= y_min_obs) & (prism_y <= y_max_obs) | |
) | |
mask_grid = in_bounds.reshape((ny, nx)) | |
masked = {} | |
for key, m_val in m_dict.items(): | |
m_grid = np.flipud(m_val.reshape((ny, nx))) | |
m_masked = np.where(mask_grid, m_grid, np.nan) | |
masked[key] = m_masked | |
return masked | |
def mask_to_convex_hull(m_dict, prism_x, prism_y, ob_x, ob_y, nx, ny): | |
# Build the convex hull from observations | |
obs_points = np.vstack([ob_x, ob_y]).T | |
hull = ConvexHull(obs_points) | |
path = Path(obs_points[hull.vertices]) | |
# Build test points and mask | |
points = np.vstack([prism_x.ravel(), prism_y.ravel()]).T | |
mask_flat = path.contains_points(points) | |
mask_grid = mask_flat.reshape((ny, nx)) | |
# Reshape prism_x, prism_y if needed | |
# And FLIP prism_y vertically to match spatial layout | |
prism_x = prism_x.reshape((ny, nx)) | |
prism_y = prism_y.reshape((ny, nx)) | |
# If the mask is rotated, you may need to transpose or flip it | |
# Here we flip the mask horizontally to correct the rotation | |
mask_grid = np.rot90(mask_grid, k=-1).T # Rotate clockwise | |
# Apply to each result | |
masked = {} | |
for key, m_val in m_dict.items(): | |
m_grid = np.flipud(m_val.reshape((ny, nx))) | |
m_masked = np.where(mask_grid, m_grid, np.nan) | |
masked[key] = m_masked | |
return masked | |
# ---------------------- | |
# Error Metrics | |
def compute_rmse(observed, predicted): | |
"""Compute Root Mean Square Error.""" | |
residual = observed - predicted | |
return np.sqrt(np.mean(residual**2)) | |
# ---------------------- | |
# 7. Main script | |
# ---------------------- | |
if __name__ == '__main__': | |
'''Start the setup by loading data and generating solution grid''' | |
# Input files and column mapping | |
# GW data | |
obs_file = 'data/data.csv' | |
# dem_file = 'data/srtm_38_03.asc3.tif' | |
dem_file = 'data/dem.tif' | |
cols = {'x': 'Longitude', 'y': 'Latitude', 'value': 'MyGammaValue', 'alt': 'AGL'} | |
# crs's for data projection | |
src_crs = 'EPSG:4326' | |
utm_crs = 'EPSG:32632' | |
# Set inversion parameters | |
mu = 0.00633212076 # 1D Attenuation Coefficient m^-1 K: 0.00633212076, U: 0.00577269228, Th: 0.00471790758 | |
maxiter = 1000 # Maximim number of iterations, usually converges under 200 its | |
tol = 1e-6 # Convergence tol | |
convex_hull = True # Use a convex hull calculation to trim the result to the input data | |
# Load input data | |
ob_x, ob_y, d, var, ob_h = read_observations(obs_file, cols, src_crs, utm_crs, alt_in_ft=False) | |
d *= 1 # You can use this as an alpha if you want to convert from cps or something | |
dem = read_dem(dem_file, utm_crs) | |
det_z = ob_h + sample_dem_heights(dem, ob_x, ob_y) # Get the terrain heights below the detector and add to the AGL value (assumes same units m) | |
# Solution grid params | |
cell_size = 5 # Can be whatever grid size is desired in m | |
buffer_m = 500 # Additional buffer space to add to all edges of solution grid. Should be no less than 200-500m | |
# The ideal lam (smoothness scalar) is a function of the spatial resolution for the most part. You will have to play with this value manually. Below is an early fit I generated by guess and check, they are probably too low. | |
lam_x = np.array([15,20,25,30,35,40,50,100]) # Cell size array for which ideal lam is identified | |
lam_ls_y = np.array([9e-2,1e-1,1.5e-1,1.75e-1,2e-1,2.15e-1,2.5e-1,5e-1]) | |
lam = np.interp(cell_size, lam_x, lam_ls_y) | |
# Build regular solution grid | |
X, Y, Z, nx, ny = solution_grid(ob_x, ob_y, dem, buffer_m, cell_size) | |
'''Start the inversion process''' | |
t = time.time() | |
# Flag for triangle-based forward model, computes fast, but makes a dense matrix. Set to false to build sparse using simple model | |
use_triangle_model = True | |
print("Constructing forward model.") | |
# Assemble forward model matrix | |
if use_triangle_model: | |
G = build_triangle_sensitivity(ob_x, ob_y, det_z, X, Y, Z, nx, ny, mu) | |
else: | |
G = build_sensitivity(ob_x, ob_y, det_z, X, Y, Z, cell_size, mu) | |
print(f'Forward model is constructed in {round(time.time() - t)} seconds, starting inversion...') | |
m = {} # store all results | |
# The 'initial guess' function uses the forward model weights to calculate a weighted average per solution cell grid. Tends to be very close to the real result. | |
m["x0"] = initial_guess(G,d) | |
# Generate initialization around the expected median | |
x0 = np.ones(G.shape[1])*np.median(m["x0"]) | |
# Normalize forward model | |
G = normalize_fm(G) | |
t1 = time.time() | |
t = time.time() | |
L = sparse_gradient(ny,nx) | |
result = minimize_wrapper(G, d, L, lam, method='L-BFGS-B', maxiter=1000) | |
m["Min"] = result.x | |
print(f'Inversion complete in {round(time.time() - t1)} seconds, computing error metrics...') | |
# Start plotting process | |
# Mask the data so we don't plot the whole solution, the edges have no observed data | |
if convex_hull: | |
m_mask = mask_to_convex_hull(m, X, Y, ob_x, ob_y, nx, ny) | |
else: | |
m_mask = mask_results_to_observations(m, X, Y, ob_x, ob_y, nx, ny) | |
fixed_bounds = [np.min(d),np.max(d)] | |
xmin, xmax = ob_x.min() - buffer_m, ob_x.max() + buffer_m | |
ymin, ymax = ob_y.min() - buffer_m, ob_y.max() + buffer_m | |
plot_results(m, m_mask, G, d, var, ob_x, ob_y, ny, nx, dem, xmin, ymax, cell_size, fixed_bounds) | |
save_geotiffs(m_mask, dem, xmin, ymax, ny, nx, cell_size) | |
# Compute and print RMSE | |
methods = list(m.keys()) | |
results = [m[k].reshape((ny, nx)) for k in methods] | |
for method in methods: | |
pred = G @ m[method] | |
rmse = compute_rmse(d, pred) | |
print(f"{method}: RMSE = {rmse:.4f}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment