Created
March 11, 2019 07:43
-
-
Save mikaem/3966135fda6c15aba0999853d9045ac9 to your computer and use it in GitHub Desktop.
MHD solver using shenfun and integrator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mpi4py import MPI | |
import numpy as np | |
from shenfun import * | |
nu = 0.000625 | |
eta = 0.01 | |
end_time = 0.1 | |
dt = 0.01 # no adaptive time stepping | |
comm = MPI.COMM_WORLD | |
N = (2**5, 2**5, 2**5) # 32^3 | |
L = [2*np.pi,4*np.pi,6*np.pi] # from TGMHD.py demo from spectralDNS | |
dim = 3 | |
# Define basis/tensor product spaces | |
V0 = Basis(N[0], 'F', dtype='D') # Complex-Complex | |
V1 = Basis(N[1], 'F', dtype='D') # Complex-Complex | |
V2 = Basis(N[2], 'F', dtype='d') # Real-Complex | |
T = TensorProductSpace(comm, (V0, V1, V2), **{'planner_effort': 'FFTW_MEASURE'}) # x,y,z now Fourier basis | |
TV = VectorTensorProductSpace(T) # Vector implies any function "u" will now have 3 components | |
VM = MixedTensorProductSpace([T]*2*dim) | |
u = TrialFunction(T) # Galerkin method uses the 'weak/variational' forumation | |
v = TestFunction(T) | |
# Mesh variables | |
X = T.local_mesh(True) | |
K = T.local_wavenumbers(scaled=True) | |
for i in range(dim): | |
X[i] = X[i].astype(float) | |
K[i] = K[i].astype(float) | |
K2 = np.zeros(T.local_shape(True), dtype=float) | |
for i in range(dim): | |
K2 += K[i]*K[i] | |
K_over_K2 = np.zeros(TV.local_shape(), dtype=float) # TV = vector | |
for i in range(dim): | |
K_over_K2[i] = K[i] / np.where(K2 == 0, 1, K2) | |
# Dealiased grid | |
kw = {'padding_factor': 1, | |
'dealias_direct': '2/3-rule'} | |
dtypes = ['D','D','d'] | |
Vp = [Basis(N[i], 'F', domain=(0, L[i]), | |
dtype=dtypes[i], **kw) for i in range(dim)] | |
Tp = TensorProductSpace(comm, Vp, dtype=float, | |
collapse_fourier=True, | |
**{'planner_effort': 'FFTW_MEASURE'}) | |
VTp = VectorTensorProductSpace(Tp) | |
VMp = MixedTensorProductSpace([Tp]*2*dim) | |
ub_dealias = Array(VMp) | |
UB = Array(VM) # Both V and B vectors: 6 components | |
P = Array(T) # Pressure scalar: 1 component | |
curl = Array(TV) # Vector: 3 components | |
UB_hat = Function(VM) # K-space V,B vectors | |
P_hat = Function(T) # K-space Pressure scalar | |
#dU = Function(VM) | |
#Source = Array(VM) | |
ZZ_hat = np.zeros((3, 3) + Tp.local_shape(True), dtype=complex) # Work array | |
# Create views into large data structures | |
U, B = UB[:3], UB[3:] | |
U_hat, B_hat = UB_hat[:3], UB_hat[3:] | |
# Primary variable | |
#u = UB_hat | |
def set_Elsasser(c, ZZ, K): | |
c[:3] = -1j*(K[0]*(ZZ[:, 0] + ZZ[0, :]) | |
+ K[1]*(ZZ[:, 1] + ZZ[1, :]) | |
+ K[2]*(ZZ[:, 2] + ZZ[2, :]))/2.0 | |
c[3:] = 1j*(K[0]*(ZZ[0, :] - ZZ[:, 0]) | |
+ K[1]*(ZZ[1, :] - ZZ[:, 1]) | |
+ K[2]*(ZZ[2, :] - ZZ[:, 2]))/2.0 | |
return c | |
def divergenceConvection(c, z0, z1, Tp, K, ZZ_hat): | |
"""Divergence convection using Elsasser variables | |
z0=U+B | |
z1=U-B | |
""" | |
for i in range(3): | |
for j in range(3): | |
ZZ_hat[i, j] = Tp.forward(z0[i]*z1[j], ZZ_hat[i, j]) | |
c = set_Elsasser(c, ZZ_hat, K) | |
return c | |
def NonlinearRHS(self, ub, ub_hat, dU, **params): # called getConvection in MHD.py | |
global ub_dealias, Tp, VMp, K, ZZ_hat, K_over_K2, P_hat | |
ub_dealias = VMp.backward(UB_hat, ub_dealias) | |
u_dealias = ub_dealias[:3] | |
b_dealias = ub_dealias[3:] | |
# Compute convective term and place in dU | |
dU = divergenceConvection(dU, u_dealias+b_dealias, u_dealias-b_dealias, | |
Tp, K, ZZ_hat) | |
u_hat = ub_hat[:3] | |
b_hat = ub_hat[3:] | |
# Compute pressure (To get actual pressure multiply by 1j) | |
P_hat = np.sum(dU[:3]*K_over_K2, 0, out=P_hat) | |
# Add pressure gradient | |
for i in range(3): | |
dU[i] -= P_hat*K[i] | |
return dU | |
def LinearRHS(self, **params): # called pressure diffusion in MHD.py | |
L = inner(div(grad(u)), v).scale | |
L = np.broadcast_to(L[np.newaxis, ...], (6,)+L.shape).copy() | |
L[:3] *= nu | |
L[3:] *= eta | |
return L | |
if __name__ == '__main__': | |
for integrator in (RK4, ETDRK4): | |
# Initialization | |
U[0] = np.sin(X[0])*np.cos(X[1])*np.cos(X[2]) | |
U[1] = -np.cos(X[0])*np.sin(X[1])*np.cos(X[2]) | |
U[2] = 0 | |
B[0] = np.sin(X[0])*np.sin(X[1])*np.cos(X[2]) | |
B[1] = np.cos(X[0])*np.cos(X[1])*np.cos(X[2]) | |
B[2] = 0 | |
UB[:3], UB[3:] = U, B | |
UB_hat = UB.forward(UB_hat) | |
# Solve | |
integ = integrator(VMp, L=LinearRHS, N=NonlinearRHS) | |
integ.setup(dt) | |
UB_hat = integ.solve(UB, UB_hat, dt, (0, end_time)) | |
# Check accuracy | |
UB = UB_hat.backward(UB) | |
U,B = UB[:3], UB[3:] | |
k = comm.reduce(0.5*np.sum(U.astype(np.float64)*U.astype(np.float64))/np.prod(np.array(N))) | |
b = comm.reduce(0.5*np.sum(B.astype(np.float64)*B.astype(np.float64))/np.prod(np.array(N))) | |
if comm.Get_rank() == 0: | |
print(k,b) | |
assert np.round(k - 0.124565408177, 7) == 0 | |
assert np.round(b - 0.124637762143, 7) == 0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment