Error diagnostics
mdbartos opened this issue · 1 comments
mdbartos commented
Create module to handle error diagnostics.
mdbartos commented
Existing error estimation code:
def superlink_error(self):
NK = self.NK
nk = self.nk
_kI = self._kI
_ki = self._ki
_I_1k = self._I_1k
_I_Np1k = self._I_Np1k
_Q_uk = self._Q_uk
_Q_dk = self._Q_dk
_h_Ik = self._h_Ik
_Q_ik = self._Q_ik
_D_Ik = self._D_Ik
_E_Ik = self._E_Ik
_a_ik = self._a_ik
_b_ik = self._b_ik
_c_ik = self._c_ik
_P_ik = self._P_ik
_A_SIk = self._A_SIk
_A_ik = self._A_ik
_dt = self._dt
min_depth = self.min_depth
g = 9.81
# For each superlink...
cont_err = np.zeros((nk + 1).sum())
mom_err = np.zeros(nk.sum())
for k in range(NK):
# Set up solution matrix
nlinks = nk[k]
njunctions = nlinks + 1
N = nlinks + njunctions
I_k = (_kI == k)
i_k = (_ki == k)
rows = np.arange(0, N, 2)
cols = np.arange(0, njunctions)
Q_u = _Q_uk[k]
Q_d = _Q_dk[k]
Ak = np.zeros((N, N))
bk = np.zeros(N)
# Fill right-hand side matrix
Ak[rows, cols] = _E_Ik[I_k]
Ak[rows[:-1], njunctions + cols[:-1]] = 1.
Ak[rows[1:], njunctions + cols[:-1]] = -1.
Ak[1 + rows[:-1], cols[:-1]] = g * _A_ik[i_k]
Ak[1 + rows[:-1], cols[1:]] = -g * _A_ik[i_k]
Ak[1 + rows[:-1], njunctions + cols[:-1]] = _b_ik[i_k]
Ak[1 + rows[1:-1], njunctions + cols[:-2]] = _a_ik[i_k][1:]
Ak[1 + rows[:-2], njunctions + cols[1:-1]] = _c_ik[i_k][:-1]
# Fill left-hand side vector
bk[rows] = _D_Ik[I_k]
bk[1 + rows[:-1]] = _P_ik[i_k]
bk[0] += Q_u
bk[-1] -= Q_d
bk[1] -= _a_ik[i_k][0] * Q_u
bk[-2] -= _c_ik[i_k][-1] * Q_d
# Solve superlink equations simultaneously
x_hat = np.zeros(N)
x_hat[:njunctions] = _h_Ik[I_k]
x_hat[njunctions:] = _Q_ik[i_k]
err_k = bk - (Ak @ x_hat)
cont_err[I_k] = err_k[:njunctions]
mom_err[i_k] = err_k[njunctions:]
return cont_err, mom_err
def superlink_flow_error(self):
# Import instance variables
nk = self.nk
_is_end = self._is_end
_is_start = self._is_start
_h_Ik = self._h_Ik
_U_Ik = self._U_Ik
_V_Ik = self._V_Ik
_W_Ik = self._W_Ik
_X_Ik = self._X_Ik
_Y_Ik = self._Y_Ik
_Z_Ik = self._Z_Ik
_h_uk = self._h_uk
_h_dk = self._h_dk
# Compute internal flow estimates in both directions
Q_ik_b = self._Q_i_next_b(_X_Ik[~_is_end], _h_Ik[~_is_end],
_Y_Ik[~_is_end], _Z_Ik[~_is_end],
np.repeat(_h_dk, nk))
Q_ik_f = self._Q_im1k_next_f(_U_Ik[~_is_end], _h_Ik[~_is_start],
_V_Ik[~_is_end], _W_Ik[~_is_end],
np.repeat(_h_uk, nk))
return Q_ik_b, Q_ik_f
def superlink_depth_error(self):
# Import instance variables
nk = self.nk
_is_end = self._is_end
_is_start = self._is_start
_Q_ik = self._Q_ik
_U_Ik = self._U_Ik
_V_Ik = self._V_Ik
_W_Ik = self._W_Ik
_X_Ik = self._X_Ik
_Y_Ik = self._Y_Ik
_Z_Ik = self._Z_Ik
_h_uk = self._h_uk
_h_dk = self._h_dk
min_depth = self.min_depth
# Create arrays
n_junctions = (nk + 1).sum()
h_Ik_b = np.empty(n_junctions)
h_Ik_f = np.empty(n_junctions)
# Compute internal flow estimates in both directions
h_Ik_b[~_is_end] = self._h_Ik_next_b(_Q_ik, _Y_Ik[~_is_end], _Z_Ik[~_is_end],
np.repeat(_h_dk, nk), _X_Ik[~_is_end])
h_Ik_f[~_is_start] = self._h_Ik_next_f(_Q_ik, _V_Ik[~_is_end], _W_Ik[~_is_end],
np.repeat(_h_uk, nk), _U_Ik[~_is_end])
# Ensure end depths are consistent with boundary conditions
h_Ik_b[_is_start] = _h_uk
h_Ik_b[_is_end] = _h_dk
h_Ik_f[_is_start] = _h_uk
h_Ik_f[_is_end] = _h_dk
h_Ik_b = np.maximum(h_Ik_b, min_depth)
h_Ik_f = np.maximum(h_Ik_f, min_depth)
return h_Ik_b, h_Ik_f
def recurrence_convergence(self, dt=None, max_iter=20, rtol=1e-12):
# Import instance variables
_A_SIk = self._A_SIk
if dt is None:
dt = self.dt
# Loop until convergence
err_prev = 0.
for _ in range(max_iter):
Q_ik_b, Q_ik_f = self.superlink_flow_error()
h_Ik_b, h_Ik_f = self.superlink_depth_error()
herr = ((_A_SIk * (h_Ik_b - h_Ik_f) / dt)**2).sum()
ferr = ((Q_ik_b - Q_ik_f)**2).sum()
err_next = herr + ferr
derr = err_next - err_prev
if abs(derr) < rtol:
break
self._Q_ik = (Q_ik_b + Q_ik_f) / 2
self._h_Ik = (h_Ik_b + h_Ik_f) / 2
err_prev = err_next
def superlink_continuity_error(self, dt=None):
# Import instance variables
NK = self.NK
_ki = self._ki
_kI = self._kI
_h_Ik_next = self._h_Ik
_Q_ik_next = self._Q_ik
_h_Ik = self.states['_h_Ik']
_Q_ik = self.states['_Q_ik']
_A_SIk = self._A_SIk
_Q_uk = self._Q_uk
_Q_dk = self._Q_dk
if dt is None:
dt = self._dt
# Superlink continuity error
dSk = np.zeros(NK)
dSIk = _A_SIk * (_h_Ik_next - _h_Ik) / dt
dQik = _Q_ik_next - _Q_ik
np.add.at(dSk, _kI, dSIk)
np.add.at(dSk, _ki, dQik)
_err_k = _Q_uk - _Q_dk - dSk
# Export instance variables
self._err_k = _err_k
def superjunction_continuity_error(self, Q_in=None, dt=None):
# Import instance variables
M = self.M
_J_uk = self._J_uk
_J_dk = self._J_dk
_A_sj = self._A_sj
H_j_next = self.H_j
H_j = self.states['H_j']
_Q_uk = self._Q_uk
_Q_dk = self._Q_dk
bc = self.bc
if Q_in is None:
Q_in = self._Q_in
if dt is None:
dt = self._dt
# Superjunction continuity error
dQj = np.zeros(M)
np.add.at(dQj, _J_uk, -_Q_uk)
np.add.at(dQj, _J_dk, _Q_dk)
dQj += Q_in
dSj = _A_sj * (H_j_next - H_j) / dt
_err_j = dQj - dSj
_err_j[bc] = 0.
# Export instance variables
self._err_j = _err_j