Source code for climt._core.initialization

from sympl import (
    DataArray, DiagnosticComponent, combine_component_properties, get_constant,
    set_constant
)
from .._components import RRTMGShortwave, RRTMGLongwave
import numpy as np
from datetime import datetime
from scipy.interpolate import CubicSpline
import pkg_resources


def get_atmosphere_grid(grid_state,
                        interface=False,
                        horizontal=False):
    y, x = grid_state['latitude'].shape
    y_dim, x_dim = grid_state['latitude'].dims
    z = grid_state['atmosphere_hybrid_sigma_pressure_a_coordinate_on_interface_levels'].shape[0]

    if horizontal:
        return (y, x), (y_dim, x_dim)
    if interface:
        return (z, y, x), ('interface_levels', y_dim, x_dim)
    else:
        return (z-1, y, x), ('mid_levels', y_dim, x_dim)


def get_surface_grid(grid_state,
                     interface=False,
                     horizontal=False):
    return tuple(grid_state['latitude'].shape), tuple(grid_state['latitude'].dims)


def get_land_grid(grid_state,
                  interface=False,
                  horizontal=False):
    y, x = grid_state['latitude'].shape
    y_dim, x_dim = grid_state['latitude'].dims

    if horizontal:
        return (y, x), (y_dim, x_dim)
    else:
        raise NotImplementedError(
            '3D land grids are not yet supported')


def get_ocean_grid(grid_state,
                   interface=False,
                   horizontal=False):
    y, x = grid_state['latitude'].shape
    y_dim, x_dim = grid_state['latitude'].dims

    if horizontal:
        return (y, x), (y_dim, x_dim)
    else:
        raise NotImplementedError(
            '3D ocean grids are not yet supported')


def get_ice_grid(grid_state,
                 interface=False,
                 horizontal=False):
    y, x = grid_state['latitude'].shape
    y_dim, x_dim = grid_state['latitude'].dims
    z = grid_state['height_on_ice_interface_levels'].shape[0]

    if horizontal:
        return (y, x), (y_dim, x_dim)
    if interface:
        return (z, y, x), ('ice_interface_levels', y_dim, x_dim)
    else:
        return (z-1, y, x), ('ice_mid_levels', y_dim, x_dim)


def get_scalar_grid(grid_state,
                    interface=False,
                    horizontal=False):
    return (), ()


domain_shape_descriptor = {
    'atmosphere': get_atmosphere_grid,
    'surface': get_surface_grid,
    'land': get_land_grid,
    'ocean': get_ocean_grid,
    'ice': get_ice_grid,
    'scalar': get_scalar_grid,
}


class RRTMGLongwaveDefaultValues(DiagnosticComponent):

    input_properties = {
        'air_pressure': {
            'dims': ['*', 'mid_levels'],
            'units': 'Pa',
        },
    }

    diagnostic_properties = {
        'surface_longwave_emissivity': {
            'dims': ['num_longwave_bands', '*'],
            'units': 'dimensionless',
        },
        'longwave_optical_thickness_due_to_cloud': {
            'dims': ['mid_levels', '*', 'num_longwave_bands'],
            'units': 'dimensionless',
        },
        'longwave_optical_thickness_due_to_aerosol': {
            'dims': ['num_longwave_bands', 'mid_levels', '*'],
            'units': 'dimensionless',
        },
    }

    def array_call(self, state):
        ncol, nz = state['air_pressure'].shape
        diagnostics = {
            'surface_longwave_emissivity': np.ones(
                [RRTMGLongwave.num_longwave_bands, ncol]
            ),
            'longwave_optical_thickness_due_to_cloud': np.zeros(
                [nz, ncol, RRTMGLongwave.num_longwave_bands]),
            'longwave_optical_thickness_due_to_aerosol': np.zeros(
                [RRTMGLongwave.num_longwave_bands, nz, ncol]),
        }
        return diagnostics


class RRTMGShortwaveDefaultValues(DiagnosticComponent):

    input_properties = {
        'air_pressure': {
            'dims': ['mid_levels', '*'],
            'units': 'Pa',
        },
    }

    diagnostic_properties = {
        'shortwave_optical_thickness_due_to_cloud': {
            'dims': ['mid_levels', '*', 'num_shortwave_bands'],
            'units': 'dimensionless',
        },
        'cloud_asymmetry_parameter': {
            'dims': ['mid_levels', '*', 'num_shortwave_bands'],
            'units': 'dimensionless',
        },
        'cloud_forward_scattering_fraction': {
            'dims': ['mid_levels', '*', 'num_shortwave_bands'],
            'units': 'dimensionless',
        },
        'single_scattering_albedo_due_to_cloud': {
            'dims': ['mid_levels', '*', 'num_shortwave_bands'],
            'units': 'dimensionless',
        },
        'shortwave_optical_thickness_due_to_aerosol': {
            'dims': ['num_shortwave_bands', 'mid_levels', '*'],
            'units': 'dimensionless',
        },
        'aerosol_asymmetry_parameter': {
            'dims': ['num_shortwave_bands', 'mid_levels', '*'],
            'units': 'dimensionless',
        },
        'single_scattering_albedo_due_to_aerosol': {
            'dims': ['num_shortwave_bands', 'mid_levels', '*'],
            'units': 'dimensionless',
        },
        'aerosol_optical_depth_at_55_micron': {
            'dims': ['num_ecmwf_aerosols', 'mid_levels', '*'],
            'units': 'dimensionless',
        },
    }

    def array_call(self, state):
        nz, ncol = state['air_pressure'].shape
        diagnostics = {
            'shortwave_optical_thickness_due_to_cloud':
                np.zeros([nz, ncol, RRTMGShortwave.num_shortwave_bands]),
            'cloud_asymmetry_parameter':
                0.85 * np.ones([nz, ncol, RRTMGShortwave.num_shortwave_bands]),
            'cloud_forward_scattering_fraction':
                0.8 * np.ones([nz, ncol, RRTMGShortwave.num_shortwave_bands]),
            'single_scattering_albedo_due_to_cloud':
                0.9 * np.ones([nz, ncol, RRTMGShortwave.num_shortwave_bands]),
            'shortwave_optical_thickness_due_to_aerosol':
                np.zeros([RRTMGShortwave.num_shortwave_bands, nz, ncol]),
            'aerosol_asymmetry_parameter':
                np.zeros([RRTMGShortwave.num_shortwave_bands, nz, ncol]),
            'single_scattering_albedo_due_to_aerosol':
                0.5 * np.ones([RRTMGShortwave.num_shortwave_bands, nz, ncol]),
            'aerosol_optical_depth_at_55_micron':
                np.zeros([RRTMGShortwave.num_ecmwf_aerosols, nz, ncol]),
        }
        return diagnostics


class ConstantDefaultValue(DiagnosticComponent):

    input_properties = {}
    diagnostic_properties = {}

    def __init__(
            self, output_name, output_value, output_units,
            dtype=None, domain=None, **kwargs):
        if dtype is None:
            self._dtype = np.float64
        else:
            self._dtype = dtype
        self._output_name = output_name
        self._output_value = output_value
        self._output_units = output_units
        self.store_domain_properties(domain)

        self.diagnostic_properties = {
            output_name: {
                'dims': ['*'],
                'units': output_units,
            },
        }
        super(ConstantDefaultValue, self).__init__(**kwargs)

    def store_domain_properties(self, domain):
        self._interface = False
        self._horizontal = False
        if domain is None:
            self._domain = 'scalar'
            return

        domain_and_type = domain.split('_')
        self._domain = domain_and_type[0]

        if len(domain_and_type) > 1:
            if domain_and_type[1] == 'horizontal':
                self._horizontal = True
            elif domain_and_type[1] == 'interface':
                self._interface = True
            else:
                NotImplementedError(
                    'Unknown domain descriptor {}'.format(domain))

    def __call__(self, grid_state):
        shape, dims = domain_shape_descriptor[self._domain](
            grid_state, self._interface, self._horizontal)

        quantity_np_array = np.broadcast_to(
            np.array(self._output_value, dtype=self._dtype),
            shape).copy()

        quantity_array = DataArray(
            quantity_np_array,
            dims=dims,
            name=self._output_name,
            attrs=dict(units=self._output_units))

        return {self._output_name: quantity_array}

    def array_call(self, state):
        return


class PressureFunctionDiagnosticComponent(DiagnosticComponent):
    """Defines a quantity as a function of pressure."""

    input_properties = {
        'air_pressure': {
            'dims': ['mid_levels', '*'],
            'units': 'Pa',
            'alias': 'p'
        },
        'surface_air_pressure': {
            'dims': ['*'],
            'units': 'Pa',
            'alias': 'ps'
        }
    }

    diagnostic_properties = {}

    def __init__(
            self, output_name, output_function, output_units,
            mid_or_interface_levels='mid'):
        """

        Args:
            output_name : string
                Name of the output. If outputting on interface levels, do *not*
                include '_on_interface_levels' at the end of the name.
            output_function : func
                A function which takes a numpy array of pressure as its only
                argument and returns a numpy array of the output quantity with
                the same dimensions.
            output_units: string
                The units of the output quantity
            mid_or_interface_levels: 'mid' or 'interface'
                Whether this diagnostic should perform the calculation on
                mid levels or interface levels.
        """
        vertical_dimension = 'mid_levels'
        if mid_or_interface_levels == 'interface':
            vertical_dimension = 'interface_levels'
            output_name += '_on_interface_levels'
            self.input_properties = {
                'air_pressure_on_interface_levels': {
                    'dims': ['interface_levels', '*'],
                    'units': 'Pa',
                    'alias': 'p'
                },
                'surface_air_pressure': {
                    'dims': ['*'],
                    'units': 'Pa',
                    'alias': 'ps'
                }
            }
        elif mid_or_interface_levels == 'mid':
            pass
        else:
            raise ValueError(
                "Argument mid_or_interface_levels must be 'mid' or 'interface'")
        self.diagnostic_properties = {
            output_name: {
                'dims': [vertical_dimension, '*'],
                'units': output_units
            }
        }
        self._output_function = output_function
        self._output_name = output_name
        super(PressureFunctionDiagnosticComponent, self).__init__()

    def array_call(self, raw_state):
        return {
            self._output_name: self._output_function(raw_state['p'], raw_state['ps'])
        }


'''
def horizontal_broadcast_if_needed(output, nx=None, ny=None):
    """
    Take a tuple of numpy arrays output. If nx and ny are not None, broadcast
    each of those numpy arrays along new dimensions of the correct length
    before returning them.
    """
    output_list = list(output)
    if ny is not None:
        for i in range(len(output_list)):
            output_list[i] = expand_new_last_dim(output_list[i], ny)
    if nx is not None:
        for i in range(len(output_list)):
            output_list[i] = expand_new_last_dim(output_list[i], nx)
    return tuple(output_list)


def expand_new_last_dim(var, length):
    indexer = tuple(slice(0, n) for n in var.shape) + (None,)
    return np.repeat(var[indexer], length, axis=-1)
'''


def leggauss(deg):
    """
    Gauss-Legendre quadrature.
    Computes the sample points and weights for Gauss-Legendre quadrature.
    These sample points and weights will correctly integrate polynomials of
    degree :math:`2*deg - 1` or less over the interval :math:`[-1, 1]` with
    the weight function :math:`f(x) = 1`.
    Parameters
    ----------
    deg : int
        Number of sample points and weights. It must be >= 1.
    Returns
    -------
    x : ndarray
        1-D ndarray containing the sample points.
    y : ndarray
        1-D ndarray containing the weights.
    Notes
    -----
    .. versionadded:: 1.7.0
    The results have only been tested up to degree 100, higher degrees may
    be problematic. The weights are determined by using the fact that
    .. math:: w_k = c / (L'_n(x_k) * L_{n-1}(x_k))
    where :math:`c` is a constant independent of :math:`k` and :math:`x_k`
    is the k'th root of :math:`L_n`, and then scaling the results to get
    the right value when integrating 1.
    """
    ideg = int(deg)
    if ideg != deg or ideg < 1:
        raise ValueError("deg must be a non-negative integer")

    # first approximation of roots. We use the fact that the companion
    # matrix is symmetric in this case in order to obtain better zeros.
    c = np.array([0]*deg + [1])
    m = np.polynomial.legendre.legcompanion(c)
    x = np.linalg.eigvalsh(m)

    # improve roots by one application of Newton
    dy = np.polynomial.legendre.legval(x, c)
    df = np.polynomial.legendre.legval(x, np.polynomial.legendre.legder(c))
    x -= dy/df

    # compute the weights. We scale the factor to avoid possible numerical
    # overflow.
    fm = np.polynomial.legendre.legval(x, c[1:])
    fm /= np.abs(fm).max()
    df /= np.abs(df).max()
    w = 1/(fm * df)

    # for Legendre we can also symmetrize
    w = (w + w[::-1])/2
    x = (x - x[::-1])/2

    # scale w to get the right value
    w *= 2. / w.sum()

    return x, w


def gaussian_latitudes(n):
    x, weights = leggauss(n)
    edges = np.empty([n+1])
    edges[0] = -1
    edges[1:-1] = -1 + weights[:-1].cumsum()
    edges[-1] = 1
    return -np.rad2deg(np.arcsin(x)), -np.rad2deg(np.arcsin(edges))


[docs]def get_grid( nx=None, ny=None, nz=28, n_ice_interface_levels=10, p_surf_in_Pa=None, p_toa_in_Pa=None, proportion_sigma_levels=0.1, proportion_isobaric_levels=0.25, x_name='lon', y_name='lat', latitude_grid='gaussian'): """ Args: nx : int, optional Number of longitudinal points. ny : int, optional Number of latitudinal points. nz : int, optional Number of vertical mid levels. n_ice_interface_levels (int, optional): Number of vertical interface levels to use for ice. Use None to disable the ice vertical grid. p_surf_in_Pa : float, optional Surface pressure in Pa. x_name : str, optional Name of latitudinal dimension y_name : str, optional Name of longitudinal dimension latitude_grid : 'gaussian' or 'regular' Type of spacing to use for the latitudinal grid. Returns: grid_state: dict A model state containing grid quantities. """ if p_surf_in_Pa is None: p_surf_in_Pa = get_constant('reference_air_pressure', 'Pa') if p_toa_in_Pa is None: p_toa_in_Pa = get_constant('top_of_model_pressure', 'Pa') else: set_constant('top_of_model_pressure', p_toa_in_Pa, 'Pa') if nx is None: nx = 1 if ny is None: ny = 1 return_state = get_hybrid_sigma_pressure_levels(nz+1, p_surf_in_Pa, p_toa_in_Pa, proportion_isobaric_levels, proportion_sigma_levels) return_state['surface_air_pressure'] = DataArray( np.ones((ny, nx))*p_surf_in_Pa, dims=[y_name, x_name], attrs={'units': 'Pa'} ) return_state['time'] = datetime(2000, 1, 1) return_state.update(HybridSigmaPressureDiagnosticComponent()(return_state)) if nx is not None: two_dim_lons = np.ones((ny, nx)) two_dim_lons[:] = np.linspace(0., 360., nx*2, endpoint=False)[:-1:2][np.newaxis, :] return_state['longitude'] = DataArray( two_dim_lons, dims=[y_name, x_name], attrs={'units': 'degrees_east'}, ) if ny is not None: two_dim_lats = np.ones((ny, nx)) if latitude_grid.lower() == 'regular': two_dim_lats[:] = np.linspace(-90., 90., ny*2+1, endpoint=True)[1:-1:2][:, np.newaxis] return_state['latitude'] = DataArray( two_dim_lats, dims=[y_name, x_name], attrs={'units': 'degrees_north'}, ) elif latitude_grid.lower() == 'gaussian': lat, lat_interface = gaussian_latitudes(ny) two_dim_lats[:] = lat[:, np.newaxis] return_state['latitude'] = DataArray( two_dim_lats, dims=[y_name, x_name], attrs={'units': 'degrees_north'}, ) else: raise ValueError( 'latitude_grid can be either regular or gaussian. ' + 'Other grid types are currently not supported.') if n_ice_interface_levels is not None: return_state['height_on_ice_interface_levels'] = DataArray( np.zeros(n_ice_interface_levels), dims=['ice_interface_levels'], attrs={'units': 'm'}, ) return return_state
class HybridSigmaPressureDiagnosticComponent(DiagnosticComponent): input_properties = { 'atmosphere_hybrid_sigma_pressure_a_coordinate_on_interface_levels': { 'units': 'dimensionless', 'dims': ['interface_levels', '*'], 'alias': 'a_coord', }, 'atmosphere_hybrid_sigma_pressure_b_coordinate_on_interface_levels': { 'units': 'dimensionless', 'dims': ['interface_levels', '*'], 'alias': 'b_coord', }, 'surface_air_pressure': { 'units': 'Pa', 'dims': ['*'], }, } diagnostic_properties = { 'air_pressure': { 'units': 'Pa', 'dims': ['mid_levels', '*'], }, 'air_pressure_on_interface_levels': { 'units': 'Pa', 'dims': ['interface_levels', '*'], }, } def array_call(self, state): model_top_pressure = get_constant('top_of_model_pressure', 'Pa') p_interface = ( state['a_coord'] + state['b_coord'] * (state['surface_air_pressure'][None, :] - model_top_pressure)) delta_p = p_interface[1:, :] - p_interface[:-1, :] Rd = get_constant('gas_constant_of_dry_air', 'J kg^-1 K^-1') Cpd = get_constant('heat_capacity_of_dry_air_at_constant_pressure', 'J kg^-1 K^-1') rk = Rd/Cpd p = ( (p_interface[1:, :]**(rk+1) - p_interface[:-1, :]**(rk+1)) / ( (rk+1) * delta_p ) ) ** (1./rk) assert not np.any(np.isnan(p)) return { 'air_pressure': p, 'air_pressure_on_interface_levels': p_interface, } def get_hybrid_sigma_pressure_levels(num_levels=28, reference_pressure=1e5, model_top_pressure=20, proportion_isobaric_levels=0.25, proportion_sigma_levels=0.1): ''' Calculate the values of ak and bk using the algorithm from `[Eckermann]`_ (2008) for their NEWHYB2 coordinate system. The pressure thickness distribution is given by a sine shaped curve with a maximum at the middle of the pressure range. The nominal lower surface pressure of 1000 mb is used to calculate the coordinates, but this does not put any restriction on the actual surface pressure. Args: num_levels: int, optional The number of vertical levels reference_pressure: float, optional The reference surface pressure model_top_pressure: float, optional The pressure at the top of the model proportion_isobaric_levels: float, optional The proportion of levels where bk = 0 proportion_sigma_levels: float, optional The proportion of levels where ak = 0 Returns: hybrid_coords: dict dictionary containing ak, bk, and sigma levels Reference: Eckermann, S: Hybrid \sigma-p Coordinate Choices for a Global Model, Monthly Weather Review Jan 2009. .. _[Eckermann]: https://journals.ametsoc.org/doi/pdf/10.1175/2008MWR2537.1 ''' thickness_dist = np.sin(np.linspace(0.1, np.pi-0.1, num_levels-1)) thickness_dist /= np.sum(thickness_dist) thickness_dist *= (reference_pressure - model_top_pressure) pressure_levels = np.zeros(num_levels) pressure_levels[0] = model_top_pressure pressure_levels[1:] = model_top_pressure + np.cumsum(thickness_dist) sigma_interface = (pressure_levels - model_top_pressure)/(reference_pressure - model_top_pressure) ak = np.zeros(num_levels) bk = np.zeros(num_levels) num_isobaric_levels = int(proportion_isobaric_levels*num_levels) num_sigma_levels = int(proportion_sigma_levels*num_levels) ak[0:num_isobaric_levels] = pressure_levels[0:num_isobaric_levels] isobaric_sigma_level = sigma_interface[num_isobaric_levels-1] for level in range(num_isobaric_levels, num_levels - num_sigma_levels): sigma_value = sigma_interface[level] b_level = (sigma_value - isobaric_sigma_level)/(1 - isobaric_sigma_level) r_level = get_exponent_for_sigma(b_level, num_sigma_levels) bk[level] = b_level**r_level ak[level] = model_top_pressure + (sigma_value - bk[level])*(reference_pressure - model_top_pressure) for level in range(num_levels - num_sigma_levels, num_levels): sigma_value = sigma_interface[level] bk[level] = (sigma_interface[level] - isobaric_sigma_level)/(1 - isobaric_sigma_level) ak[level] = model_top_pressure + (sigma_value - bk[level])*(reference_pressure - model_top_pressure) ak = ak[::-1] bk = bk[::-1] return { 'atmosphere_hybrid_sigma_pressure_a_coordinate_on_interface_levels': DataArray( ak, dims=['interface_levels'], attrs={'units': 'dimensionless'}, ), 'atmosphere_hybrid_sigma_pressure_b_coordinate_on_interface_levels': DataArray( bk, dims=['interface_levels'], attrs={'units': 'dimensionless'}, ), } def get_exponent_for_sigma(b_half, num_sigma_levels): # These values correspond to the NEWHYB2 coordinate r_p = 2.2 r_sigma = 1.0 S = 5 if num_sigma_levels > 0: r_sigma = 1 else: r_sigma = 1.35 return r_p + (r_sigma - r_p)*np.arctan(S*b_half)/np.arctan(S) default_values = { 'air_temperature': {'value': 290., 'units': 'degK', 'domain': 'atmosphere'}, 'northward_wind': {'value': 0., 'units': 'm/s', 'domain': 'atmosphere'}, 'eastward_wind': {'value': 0., 'units': 'm/s', 'domain': 'atmosphere'}, 'divergence_of_wind': {'value': 0., 'units': 's^-1', 'domain': 'atmosphere'}, 'atmosphere_relative_vorticity': {'value': 0., 'units': 's^-1', 'domain': 'atmosphere'}, 'specific_humidity': {'value': 0., 'units': 'kg/kg', 'domain': 'atmosphere'}, 'mole_fraction_of_carbon_dioxide_in_air': {'value': 330e-6, 'units': 'dimensionless', 'domain': 'atmosphere'}, 'mole_fraction_of_methane_in_air': {'value': 0., 'units': 'dimensionless', 'domain': 'atmosphere'}, 'mole_fraction_of_nitrous_oxide_in_air': {'value': 0., 'units': 'dimensionless', 'domain': 'atmosphere'}, 'mole_fraction_of_oxygen_in_air': {'value': 0.21, 'units': 'dimensionless', 'domain': 'atmosphere'}, 'mole_fraction_of_nitrogen_in_air': {'value': 0.78, 'units': 'dimensionless', 'domain': 'atmosphere'}, 'mole_fraction_of_hydrogen_in_air': {'value': 500e-9, 'units': 'dimensionless', 'domain': 'atmosphere'}, 'mole_fraction_of_cfc11_in_air': {'value': 0., 'units': 'dimensionless', 'domain': 'atmosphere'}, 'mole_fraction_of_cfc12_in_air': {'value': 0., 'units': 'dimensionless', 'domain': 'atmosphere'}, 'mole_fraction_of_cfc22_in_air': {'value': 0., 'units': 'dimensionless', 'domain': 'atmosphere'}, 'mole_fraction_of_carbon_tetrachloride_in_air': {'value': 0., 'units': 'dimensionless', 'domain': 'atmosphere'}, 'cloud_area_fraction_in_atmosphere_layer': {'value': 0., 'units': 'dimensionless', 'domain': 'atmosphere'}, 'mass_content_of_cloud_ice_in_atmosphere_layer': {'value': 0., 'units': 'kg m^-2', 'domain': 'atmosphere'}, 'mass_content_of_cloud_liquid_water_in_atmosphere_layer': {'value': 0., 'units': 'kg m^-2', 'domain': 'atmosphere'}, 'cloud_ice_particle_size': {'value': 20., 'units': 'micrometer', 'domain': 'atmosphere'}, 'cloud_water_droplet_radius': {'value': 10., 'units': 'micrometer', 'domain': 'atmosphere'}, 'cloud_base_mass_flux': {'value': 0., 'units': 'kg m^-2 s^-1', 'domain': 'atmosphere_horizontal'}, 'zenith_angle': {'value': 0., 'units': 'radians', 'domain': 'atmosphere_horizontal'}, 'downwelling_shortwave_flux_in_air': {'value': 0., 'units': 'W m^-2', 'domain': 'atmosphere_interface'}, 'downwelling_longwave_flux_in_air': {'value': 0., 'units': 'W m^-2', 'domain': 'atmosphere_interface'}, 'upwelling_shortwave_flux_in_air': {'value': 0., 'units': 'W m^-2', 'domain': 'atmosphere_interface'}, 'upwelling_longwave_flux_in_air': {'value': 0., 'units': 'W m^-2', 'domain': 'atmosphere_interface'}, 'surface_specific_humidity': {'value': 0., 'units': 'kg/kg', 'domain': 'surface'}, 'surface_temperature': {'value': 300., 'units': 'degK', 'domain': 'surface'}, 'soil_surface_temperature': {'value': 300., 'units': 'degK', 'domain': 'surface'}, 'surface_geopotential': {'value': 0., 'units': 'm^2 s^-2', 'domain': 'surface'}, 'surface_thermal_capacity': {'value': 4.1813e3, 'units': 'J kg^-1 degK^-1', 'domain': 'surface'}, 'depth_of_slab_surface': {'value': 50., 'units': 'm', 'domain': 'surface'}, 'surface_material_density': {'value': 1000., 'units': 'kg m^-3', 'domain': 'surface'}, 'surface_albedo_for_direct_shortwave': {'value': 0.06, 'units': 'dimensionless', 'domain': 'surface'}, 'surface_albedo_for_diffuse_shortwave': {'value': 0.06, 'units': 'dimensionless', 'domain': 'surface'}, 'surface_albedo_for_direct_near_infrared': {'value': 0.06, 'units': 'dimensionless', 'domain': 'surface'}, 'surface_albedo_for_diffuse_near_infrared': {'value': 0.06, 'units': 'dimensionless', 'domain': 'surface'}, 'surface_roughness_length': {'value': 0.0002, 'units': 'dimensionless', 'domain': 'surface'}, 'surface_drag_coefficient_for_heat_in_air': {'value': 0.0012, 'units': 'dimensionless', 'domain': 'surface'}, 'surface_drag_coefficient_for_momentum_in_air': {'value': 0.0012, 'units': 'dimensionless', 'domain': 'surface'}, 'area_type': {'value': 'sea', 'units': 'dimensionless', 'dtype': 'a100', 'domain': 'surface'}, 'surface_upward_sensible_heat_flux': {'value': 0., 'units': 'W m^-2', 'domain': 'surface'}, 'surface_upward_latent_heat_flux': {'value': 0., 'units': 'W m^-2', 'domain': 'surface'}, 'soil_type': {'value': 'clay', 'units': 'dimensionless', 'dtype': 'a100', 'domain': 'land_horizontal'}, 'soil_temperature': {'value': 274., 'units': 'degK', 'domain': 'land_horizontal'}, 'soil_layer_thickness': {'value': 50., 'units': 'm', 'domain': 'land_horizontal'}, 'upward_heat_flux_at_ground_level_in_soil': {'value': 0., 'units': 'W m^-2', 'domain': 'land_horizontal'}, 'heat_capacity_of_soil': {'value': 2000., 'units': 'J kg^-1 degK^-1', 'domain': 'land_horizontal'}, 'sea_water_density': {'value': 1.029e3, 'units': 'kg m^-3', 'domain': 'ocean_horizontal'}, 'sea_surface_temperature': {'value': 300., 'units': 'degK', 'domain': 'ocean_horizontal'}, 'ocean_mixed_layer_thickness': {'value': 50., 'units': 'm', 'domain': 'ocean_horizontal'}, 'snow_and_ice_temperature': {'value': 270., 'units': 'degK', 'domain': 'ice_interface'}, 'heat_flux_into_sea_water_due_to_sea_ice': {'value': 0., 'units': 'W m^-2', 'domain': 'ice_horizontal'}, 'land_ice_thickness': {'value': 0., 'units': 'm', 'domain': 'ice_horizontal'}, 'sea_ice_thickness': {'value': 0., 'units': 'm', 'domain': 'ice_horizontal'}, 'surface_snow_thickness': {'value': 0., 'units': 'm', 'domain': 'ice_horizontal'}, 'solar_cycle_fraction': {'value': 0., 'units': 'dimensionless', 'domain': None}, 'flux_adjustment_for_earth_sun_distance': {'value': 1.0, 'units': 'dimensionless', 'domain': None}, 'lwe_thickness_of_soil_moisture_content': {'value': 0, 'units': 'm', 'domain': 'surface'}, 'convective_precipitation_rate': {'value': 0., 'units': 'mm day^-1', 'domain': 'surface'}, 'stratiform_precipitation_rate': {'value': 0., 'units': 'm s^-1', 'domain': 'surface'}, } for d in default_values.values(): if 'grid' not in d.keys(): d['grid'] = 'air_pressure' def aggregate_input_properties(component_list): """ Takes in a list of objects with an input_properties dictionary as an attribute, and returns an input_properties dictionary that satisfies all of those components. """ return combine_component_properties(component_list, 'input_properties') def get_init_diagnostic(name, grid_state): """ Takes in a quantity name. Returns a DiagnosticComponent object which calculates that quantity from a grid state. """ # First, check if the quantity is in the default_values dict, and return # a constructed ConstantDefaultValue DiagnosticComponent if it is. if name in default_values: return ConstantDefaultValue( name, default_values[name]['value'], default_values[name]['units'], dtype=default_values[name].get('dtype', None), domain=default_values[name]['domain'], ) elif name[-20:] == '_on_interface_levels' and name[:-20] in default_values: return ConstantDefaultValue( name, default_values[name[:-20]]['value'], default_values[name[:-20]]['units'], dtype=default_values[name[:-20]].get('dtype', None), domain=default_values[name[:-20]]['domain'] + '_interface', ) # If it isn't, check if there is a diagnostic defined in some library of DiagnosticComponent # classes (probably a list stored here) that can calculate the quantity, # and return that one. else: for diag in init_diagnostics: if name in diag.diagnostic_properties: return diag # If it still isn't, raise an exception because we can't calculate it. raise NotImplementedError('No initialization method for quantity name {}'.format(name)) def get_diagnostics_for(input_properties, grid_state): diagnostic_list = [] for name in input_properties.keys(): if name not in grid_state.keys(): diagnostic_list.append(get_init_diagnostic(name, grid_state)) return diagnostic_list def compute_all_diagnostics(state, diagnostic_list): return_dict = {} for diagnostic in diagnostic_list: return_dict.update(diagnostic(state)) return return_dict
[docs]def get_default_state( component_list, grid_state=None, n_ice_interface_levels=30): """ Retrieves a reasonable initial state for the set of components given. Args: component_list (list): Components for which to retrieve an initial state. grid_state (dict, optional): An initial state containing grid quantities. If none is given, a default will be created. n_ice_interface_levels (int, optional): Number of vertical interface levels to use for ice. Use None to disable the ice vertical grid. Returns: default_state (dict): A reasonable initial state. """ grid_state = grid_state or get_grid( n_ice_interface_levels=n_ice_interface_levels) input_properties = aggregate_input_properties(component_list) diagnostic_list = get_diagnostics_for(input_properties, grid_state) return_state = {} return_state.update(grid_state) return_state.update(compute_all_diagnostics(grid_state, diagnostic_list)) # return_state = broadcast_dims_to_match_component_properties(return_state, component_list) return return_state
def init_ozone(p, ps): p_ref = 1e5*np.linspace(0.998, 0.001, 30) ozone_ref = np.load( pkg_resources.resource_filename('climt._data', 'ozone_profile.npy') ) spline = CubicSpline(p_ref[::-1], ozone_ref[::-1]) # x must be increasing return spline(p) init_diagnostics = [ PressureFunctionDiagnosticComponent( 'longwave_optical_depth', lambda p, ps: 1. * (1. - p/ps), 'dimensionless', 'interface', ), PressureFunctionDiagnosticComponent( 'mole_fraction_of_ozone_in_air', init_ozone, 'mole/mole', 'mid' ), RRTMGShortwaveDefaultValues(), RRTMGLongwaveDefaultValues(), ] for diag in init_diagnostics: output_overlap = set(diag.diagnostic_properties.keys()).intersection( default_values.keys()) if len(output_overlap) > 0: raise AssertionError( 'Initialization diagnostic {} outputs quantity(ies) {} for which a ' 'default value is also present. The default value should probably ' 'be removed.'.format(diag.__class__.__name__, output_overlap)) for diag2 in init_diagnostics: output_overlap = set(diag.diagnostic_properties.keys()).intersection( diag2.diagnostic_properties.keys()) if diag2 is diag: pass elif len(output_overlap) > 0: raise AssertionError( 'Initialization diagnostics {} and {} both outputs quantity(ies)' ' {}. One of these should probably ' 'be removed.'.format( diag.__class__.__name__, diag2.__class__.__name__, output_overlap ) )