Source code for climt._components.dry_convection.component

from sympl import (
    get_constant, Stepper, initialize_numpy_arrays_with_properties
)
import numpy as np


[docs]class DryConvectiveAdjustment(Stepper): """ A conservative scheme to keep the temperature profile close to the dry adiabat if it is super-adiabatic. """ input_properties = { 'air_temperature': { 'units': 'degK', 'dims': ['mid_levels', '*'], }, 'air_pressure': { 'units': 'Pa', 'dims': ['mid_levels', '*'], }, 'air_pressure_on_interface_levels': { 'units': 'Pa', 'dims': ['interface_levels', '*'], 'alias': 'P_int', }, 'specific_humidity': { 'units': 'kg/kg', 'dims': ['mid_levels', '*'], }, } output_properties = { 'air_temperature': { 'units': 'degK', }, 'specific_humidity': { 'units': 'kg/kg', }, } diagnostic_properties = {} def array_call(self, state, time_step): self._Cpd = get_constant('heat_capacity_of_dry_air_at_constant_pressure', 'J/kg/degK') self._Cvap = get_constant('heat_capacity_of_vapor_phase', 'J/kg/K') self._Rdair = get_constant('gas_constant_of_dry_air', 'J/kg/degK') self._Pref = get_constant('reference_air_pressure', 'Pa') self._Rv = get_constant('gas_constant_of_vapor_phase', 'J/kg/K') q = state['specific_humidity'] output_arrays = initialize_numpy_arrays_with_properties( self.output_properties, state, self.input_properties ) output_temperature = output_arrays['air_temperature'] output_temperature[:] = state['air_temperature'] output_q = output_arrays['specific_humidity'] output_q[:] = q rd_cp = self.gas_constant(q)/self.heat_capacity(q) theta = state['air_temperature']*(self._Pref/state['air_pressure'])**rd_cp num_levels = q.shape[0] pdiff = state['P_int'][:-1, :] - state['P_int'][1:, :] theta_q = theta*(1 + output_q*self._Rv/self._Rdair - output_q) for column in range(q.shape[-1]): for level in range(num_levels-1, -1, -1): dp = pdiff[:, column] theta_sum = np.cumsum(theta_q[level::, column]) divisor = np.arange(1, num_levels - level+1) theta_avg = (theta_sum/divisor)[1::] theta_lesser = (theta_avg > theta_q[level+1::, column]) if np.sum(theta_lesser) == 0: continue convect_to_level = len(theta_lesser) - np.argmax(theta_lesser[::-1]) if level == 0: convect_to_level = max(convect_to_level, 1) if convect_to_level == 0: continue stable_level = level + convect_to_level q_conv = output_q[level:stable_level, column] t_conv = output_temperature[level:stable_level, column] dp_conv = dp[level:stable_level] p_conv_high = state['P_int'][level, column] p_conv_low = state['P_int'][stable_level, column] enthalpy = self.heat_capacity(q_conv)*t_conv integral_enthalpy = np.sum(enthalpy*dp_conv) mean_conv_q = np.sum(q_conv*dp_conv)/(p_conv_high - p_conv_low) output_q[level:stable_level, column] = mean_conv_q rdcp_conv = self.gas_constant(mean_conv_q)/self.heat_capacity(mean_conv_q) theta_coeff = ( state['air_pressure'][level:stable_level, column]/self._Pref)**rdcp_conv integral_theta_den = np.sum(self.heat_capacity(q_conv)*theta_coeff*dp_conv) mean_theta = integral_enthalpy/integral_theta_den output_temperature[level:stable_level, column] = mean_theta*theta_coeff return {}, output_arrays def heat_capacity(self, q): """ Calculate heat capacity based on amount of q """ return self._Cpd*(1-q) + self._Cvap*q def gas_constant(self, q): """ Calculate gas constant based on amount of q """ return self._Rdair*(1-q) + self._Rv*q