Source code for roger.models.oneD_event.oneD_event

from pathlib import Path
import h5netcdf
import pandas as pd
import numpy as onp

from roger import RogerSetup, roger_routine, roger_kernel, KernelOutput
from roger.variables import allocate
from roger.core.operators import numpy as npx, update, at
import roger.lookuptables as lut


[docs]class ONEDEVENTSetup(RogerSetup): """A 1D model for a single event. """ _base_path = Path(__file__).parent _input_dir = _base_path / "input" # custom helper functions def _read_var_from_nc(self, var, path_dir, file): nc_file = path_dir / file with h5netcdf.File(nc_file, "r", decode_vlen_strings=False) as infile: var_obj = infile.variables[var] return npx.array(var_obj) def _read_var_from_csv(self, var, path_dir, file): csv_file = path_dir / file infile = pd.read_csv(csv_file, sep=';', skiprows=1) var_obj = infile.loc[:, var] return npx.array(var_obj)[:, npx.newaxis] def _get_nitt(self, path_dir, file): nc_file = path_dir / file with h5netcdf.File(nc_file, "r", decode_vlen_strings=False) as infile: var_obj = infile.variables['Time'] return len(onp.array(var_obj)) def _get_runlen(self, path_dir, file): nc_file = path_dir / file with h5netcdf.File(nc_file, "r", decode_vlen_strings=False) as infile: var_obj = infile.variables['dt'] return onp.sum(var_obj) @roger_routine def set_settings(self, state): settings = state.settings settings.identifier = "ONEDEVENT" # output frequency (in seconds) settings.output_frequency = 600 # total grid numbers in x-- and y-direction settings.nx, settings.ny = 1, 1 # derive total number of time steps from forcing settings.nitt = self._get_nitt(self._input_dir, 'forcing.nc') # length of simulation (in seconds) settings.runlen = self._get_runlen(self._input_dir, 'forcing.nc') # spatial discretization (in meters) settings.dx = 1 settings.dy = 1 # origin of spatial grid settings.x_origin = 0.0 settings.y_origin = 0.0 # enable specific processes settings.enable_groundwater_boundary = False settings.enable_lateral_flow = True @roger_routine( dist_safe=False, local_variables=[ "dt_secs", "dt", "x", "y", ], ) def set_grid(self, state): vs = state.variables settings = state.settings # temporal grid vs.dt_secs = 10 * 60 vs.dt = 1 / 6 # spatial grid dx = allocate(state.dimensions, ("x")) dx = update(dx, at[:], settings.dx) dy = allocate(state.dimensions, ("y")) dy = update(dy, at[:], settings.dy) # distance from origin vs.x = update(vs.x, at[3:-2], settings.x_origin + npx.cumsum(dx[3:-2])) vs.y = update(vs.y, at[3:-2], settings.y_origin + npx.cumsum(dy[3:-2])) @roger_routine def set_look_up_tables(self, state): vs = state.variables # land use-dependent interception storage vs.lut_ilu = update(vs.lut_ilu, at[:, :], lut.ARR_ILU) # land use-dependent ground cover vs.lut_gc = update(vs.lut_gc, at[:, :], lut.ARR_GC) # land use-dependent maximum ground cover vs.lut_gcm = update(vs.lut_gcm, at[:, :], lut.ARR_GCM) # land use-dependent maximum ground cover vs.lut_is = update(vs.lut_is, at[:, :], lut.ARR_IS) # land use-dependent rooting depth vs.lut_rdlu = update(vs.lut_rdlu, at[:, :], lut.ARR_RDLU) # macropore flow velocities vs.lut_mlms = update(vs.lut_mlms, at[:, :], lut.ARR_MLMS) @roger_routine def set_topography(self, state): pass @roger_routine def set_parameters_setup(self, state): vs = state.variables # land use ID (see README for description) vs.lu_id = update(vs.lu_id, at[2:-2, 2:-2], 8) # degree of sealing (-) vs.sealing = update(vs.sealing, at[2:-2, 2:-2], 0) # surface slope (-) vs.slope = update(vs.slope, at[2:-2, 2:-2], 0.05) # convert slope to percentage vs.slope_per = update(vs.slope_per, at[2:-2, 2:-2], vs.slope[2:-2, 2:-2] * 100) # total surface depression storage (mm) vs.S_dep_tot = update(vs.S_dep_tot, at[2:-2, 2:-2], 0) # soil depth (mm) vs.z_soil = update(vs.z_soil, at[2:-2, 2:-2], 2000) # density of vertical macropores (1/m2) vs.dmpv = update(vs.dmpv, at[2:-2, 2:-2], 50) # density of horizontal macropores (1/m2) vs.dmph = update(vs.dmph, at[2:-2, 2:-2], 50) # total length of vertical macropores (mm) vs.lmpv = update(vs.lmpv, at[2:-2, 2:-2], 1000) # air capacity (-) vs.theta_ac = update(vs.theta_ac, at[2:-2, 2:-2], 0.1) # usable field capacity (-) vs.theta_ufc = update(vs.theta_ufc, at[2:-2, 2:-2], 0.1) # permanent wilting point (-) vs.theta_pwp = update(vs.theta_pwp, at[2:-2, 2:-2], 0.2) # saturated hydraulic conductivity (mm/h) vs.ks = update(vs.ks, at[2:-2, 2:-2], 5) # hydraulic conductivity of bedrock/saturated zone (mm/h) vs.kf = update(vs.kf, at[2:-2, 2:-2], 2500) @roger_routine def set_parameters(self, state): pass @roger_routine def set_initial_conditions_setup(self, state): pass @roger_routine def set_initial_conditions(self, state): vs = state.variables # interception storage of upper surface layer (mm) vs.S_int_top = update(vs.S_int_top, at[2:-2, 2:-2, :vs.taup1], 0) # snow water equivalent stored in upper surface layer (mm) vs.swe_top = update(vs.swe_top, at[2:-2, 2:-2, :vs.taup1], 0) # interception storage of lower surface layer (mm) vs.S_int_ground = update(vs.S_int_ground, at[2:-2, 2:-2, :vs.taup1], 0) # snow water equivalent stored in lower surface layer (mm) vs.swe_ground = update(vs.swe_ground, at[2:-2, 2:-2, :vs.taup1], 0) # surface depression storage (mm) vs.S_dep = update(vs.S_dep, at[2:-2, 2:-2, :vs.taup1], 0) # snow cover storage (mm) vs.S_snow = update(vs.S_snow, at[2:-2, 2:-2, :vs.taup1], 0) # snow water equivalent of snow cover (mm) vs.swe = update(vs.swe, at[2:-2, 2:-2, :vs.taup1], 0) # soil water content of root zone/upper soil layer (-) vs.theta_rz = update(vs.theta_rz, at[2:-2, 2:-2, :vs.taup1], 0.3) # soil water content of subsoil/lower soil layer (-) vs.theta_ss = update(vs.theta_ss, at[2:-2, 2:-2, :vs.taup1], 0.3) @roger_routine def set_boundary_conditions_setup(self, state): pass @roger_routine def set_boundary_conditions(self, state): pass @roger_routine def set_forcing_setup(self, state): pass @roger_routine( dist_safe=False, local_variables=[ "itt", "prec", "ta", "event_id", "tau", ], ) def set_forcing(self, state): vs = state.variables vs.prec = update(vs.prec, at[2:-2, 2:-2, vs.tau], self._read_var_from_nc("PREC", self._input_dir, 'forcing.nc')[:, :, vs.itt]) vs.ta = update(vs.ta, at[2:-2, 2:-2], self._read_var_from_nc("TA", self._input_dir, 'forcing.nc')[:, :, vs.itt]) vs.event_id = update(vs.event_id, at[vs.tau], 1) @roger_routine def set_diagnostics(self, state): pass @roger_routine def after_timestep(self, state): vs = state.variables # shift variables backwards vs.update(after_timestep_kernel(state))
@roger_kernel def after_timestep_kernel(state): vs = state.variables vs.ta = update( vs.ta, at[2:-2, 2:-2, vs.taum1], vs.ta[2:-2, 2:-2, vs.tau], ) vs.S_sur = update( vs.S_sur, at[2:-2, 2:-2, vs.taum1], vs.S_sur[2:-2, 2:-2, vs.tau], ) vs.S_int_top = update( vs.S_int_top, at[2:-2, 2:-2, vs.taum1], vs.S_int_top[2:-2, 2:-2, vs.tau], ) vs.S_int_ground = update( vs.S_int_ground, at[2:-2, 2:-2, vs.taum1], vs.S_int_ground[2:-2, 2:-2, vs.tau], ) vs.S_dep = update( vs.S_dep, at[2:-2, 2:-2, vs.taum1], vs.S_dep[2:-2, 2:-2, vs.tau], ) vs.S_snow = update( vs.S_snow, at[2:-2, 2:-2, vs.taum1], vs.S_snow[2:-2, 2:-2, vs.tau], ) vs.swe = update( vs.swe, at[2:-2, 2:-2, vs.taum1], vs.swe[2:-2, 2:-2, vs.tau], ) vs.S_rz = update( vs.S_rz, at[2:-2, 2:-2, vs.taum1], vs.S_rz[2:-2, 2:-2, vs.tau], ) vs.S_ss = update( vs.S_ss, at[2:-2, 2:-2, vs.taum1], vs.S_ss[2:-2, 2:-2, vs.tau], ) vs.S_s = update( vs.S_s, at[2:-2, 2:-2, vs.taum1], vs.S_s[2:-2, 2:-2, vs.tau], ) vs.S = update( vs.S, at[2:-2, 2:-2, vs.taum1], vs.S[2:-2, 2:-2, vs.tau], ) vs.z_sat = update( vs.z_sat, at[2:-2, 2:-2, vs.taum1], vs.z_sat[2:-2, 2:-2, vs.tau], ) vs.z_wf = update( vs.z_wf, at[2:-2, 2:-2, vs.taum1], vs.z_wf[2:-2, 2:-2, vs.tau], ) vs.z_wf_t0 = update( vs.z_wf_t0, at[2:-2, 2:-2, vs.taum1], vs.z_wf_t0[2:-2, 2:-2, vs.tau], ) vs.z_wf_t1 = update( vs.z_wf_t1, at[2:-2, 2:-2, vs.taum1], vs.z_wf_t1[2:-2, 2:-2, vs.tau], ) vs.y_mp = update( vs.y_mp, at[2:-2, 2:-2, vs.taum1], vs.y_mp[2:-2, 2:-2, vs.tau], ) vs.y_sc = update( vs.y_sc, at[2:-2, 2:-2, vs.taum1], vs.y_sc[2:-2, 2:-2, vs.tau], ) vs.theta_rz = update( vs.theta_rz, at[2:-2, 2:-2, vs.taum1], vs.theta_rz[2:-2, 2:-2, vs.tau], ) vs.theta_ss = update( vs.theta_ss, at[2:-2, 2:-2, vs.taum1], vs.theta_ss[2:-2, 2:-2, vs.tau], ) vs.theta = update( vs.theta, at[2:-2, 2:-2, vs.taum1], vs.theta[2:-2, 2:-2, vs.tau], ) vs.k_rz = update( vs.k_rz, at[2:-2, 2:-2, vs.taum1], vs.k_rz[2:-2, 2:-2, vs.tau], ) vs.k_ss = update( vs.k_ss, at[2:-2, 2:-2, vs.taum1], vs.k_ss[2:-2, 2:-2, vs.tau], ) vs.k = update( vs.k, at[2:-2, 2:-2, vs.taum1], vs.k[2:-2, 2:-2, vs.tau], ) vs.h_rz = update( vs.h_rz, at[2:-2, 2:-2, vs.taum1], vs.h_rz[2:-2, 2:-2, vs.tau], ) vs.h_ss = update( vs.h_ss, at[2:-2, 2:-2, vs.taum1], vs.h_ss[2:-2, 2:-2, vs.tau], ) vs.h = update( vs.h, at[2:-2, 2:-2, vs.taum1], vs.h[2:-2, 2:-2, vs.tau], ) vs.prec = update( vs.prec, at[2:-2, 2:-2, vs.taum1], vs.prec[2:-2, 2:-2, vs.tau], ) return KernelOutput( ta=vs.ta, S_sur=vs.S_sur, S_int_top=vs.S_int_top, S_int_ground=vs.S_int_ground, S_dep=vs.S_dep, S_snow=vs.S_snow, swe=vs.swe, S_rz=vs.S_rz, S_ss=vs.S_ss, S_s=vs.S_s, S=vs.S, z_sat=vs.z_sat, z_wf=vs.z_wf, z_wf_t0=vs.z_wf_t0, z_wf_t1=vs.z_wf_t1, y_mp=vs.y_mp, y_sc=vs.y_sc, theta_rz=vs.theta_rz, theta_ss=vs.theta_ss, theta=vs.theta, h_rz=vs.h_rz, h_ss=vs.h_ss, h=vs.h, k_rz=vs.k_rz, k_ss=vs.k_ss, k=vs.k, prec=vs.prec, )