Source code for joseki.accessor

"""Accessor module."""

from __future__ import annotations

import datetime
import logging
import typing as t

import numpy as np
import pint
import xarray as xr

from ._version import __version__
from .constants import AIR_MAIN_CONSTITUENTS_MOLAR_FRACTION, MM, K
from .profiles.schema import schema
from .profiles.util import molar_mass, utcnow
from .units import to_quantity, ureg

logger = logging.getLogger(__name__)


[docs] def molecular_mass(m: str) -> pint.Quantity: """Return the average molecular mass of a molecule. Args: m: Molecule formula. Returns: Average molecular mass. """ return MM[m] * ureg("dalton")
def _scaling_factor( initial_amount: pint.Quantity, target_amount: pint.Quantity ) -> float: """Compute scaling factor given initial and target amounts. Args: initial_amount: Initial amount. target_amount: Target amount. Raises: ValueError: when the initial amount has a zero magnitude and the target amount has a non-zero magnitude. Returns: Scaling factor """ if initial_amount.m == 0.0: if target_amount.m == 0.0: return 0.0 else: raise ValueError( f"Cannot compute scaling factor when initial amount has " f"magnitude of zero and target amount has a non-zero magnitude " f"(got {target_amount})." ) else: factor = (target_amount / initial_amount).m_as(ureg.dimensionless) return float(factor)
[docs] @xr.register_dataset_accessor("joseki") class JosekiAccessor: # pragma: no cover """Joseki accessor.""" def __init__(self, xarray_obj): self._obj = xarray_obj @property def molecules(self) -> t.List[str]: """Return list of molecules.""" return [c[2:] for c in self._obj.data_vars if c.startswith("x_")] @property def column_number_density(self) -> t.Dict[str, pint.Quantity]: r"""Compute column number density. Returns: A mapping of molecule and column number density. Notes: The column number density is given by: .. math:: N_{\mathrm{M}} = \int n_{\mathrm{M}}(z) \, \mathrm{d} z with .. math:: n_{\mathrm{M}}(z) = x_{\mathrm{M}}(z) \, n(z) where * :math:`z` is the altitude, * :math:`x_{\mathrm{M}}(z)` is the mole fraction of molecule M at altitude :math:`z`, * :math:`n(z)` is the air number density at altitude :math:`z`, * :math:`n_{\mathrm{M}}(z)` is the number density of molecule M at altitude :math:`z`. The integration is performed using the trapezoidal rule. """ ds = self._obj logger.debug("Computing column number density using the trapezoidal rule.") _column_number_density = {} for m in self.molecules: integral = (ds[f"x_{m}"] * ds.n).integrate( coord="z" ) # integrate using the trapezoidal rule units = " ".join([ds[var].attrs["units"] for var in [f"x_{m}", "n", "z"]]) _column_number_density[m] = ( integral.values * ureg.Unit(units) ).to_base_units() return _column_number_density @property def column_mass_density(self) -> t.Dict[str, pint.Quantity]: r"""Compute column mass density. Returns: A mapping of molecule and column mass density. Notes: The column mass density is given by: .. math:: \sigma_{\mathrm{M}} = N_{\mathrm{M}} \, m_{\mathrm{M}} where * :math:`N_{\mathrm{M}}` is the column number density of molecule M, * :math:`m_{\mathrm{M}}` is the molecular mass of molecule M. """ _column_number_density = self.column_number_density return { m: (molecular_mass(m) * _column_number_density[m]).to("kg/m^2") for m in self.molecules } @property def number_density_at_sea_level(self) -> t.Dict[str, pint.Quantity]: """Compute number density at sea level. Returns: A mapping of molecule and number density at sea level. """ ds = self._obj n = to_quantity(ds.n.isel(z=0)) return {m: (to_quantity(ds[f"x_{m}"].isel(z=0)) * n) for m in self.molecules} @property def mass_density_at_sea_level(self) -> t.Dict[str, pint.Quantity]: """Compute mass density at sea level. Returns: A mapping of molecule and mass density at sea level. """ _number_density_at_sea_level = self.number_density_at_sea_level return { m: (molecular_mass(m) * _number_density_at_sea_level[m]).to("kg/m^3") for m in self.molecules } @property def mole_fraction_at_sea_level(self) -> t.Dict[str, pint.Quantity]: """Compute mole fraction at sea level. Returns: A mapping of molecule and mole fraction at sea level. """ ds = self._obj return {m: to_quantity(ds[f"x_{m}"].isel(z=0)).item() for m in self.molecules} @property def mole_fraction(self) -> xr.DataArray: """Extract mole fraction and tabulate as a function of (m, z). Returns: Mole fraction. """ ds = self._obj molecules = self.molecules concatenated = xr.concat([ds[f"x_{m}"] for m in molecules], dim="m") concatenated["m"] = ("m", molecules, {"long_name": "molecule"}) concatenated.attrs.update( { "standard_name": "mole_fraction", "long_name": "mole fraction", "units": "dimensionless", } ) concatenated.name = "x" return concatenated @property def mass_fraction(self) -> xr.DataArray: """Extract mass fraction and tabulate as a function of (m, z). Returns: Mass fraction. """ x = self.mole_fraction m_air = self.air_molar_mass m = molar_mass(molecules=self.molecules) y = (x * m / m_air).rename("y") y.attrs.update( { "standard_name": "mass_fraction_in_air", "long_name": "mass fraction", "units": "kg * kg^-1", } ) return y @property def air_molar_mass(self) -> xr.DataArray: r""" Compute air molar mass as a function of altitude. Returns: Air molar mass. Notes: The air molar mass is given by: .. math:: M_{\mathrm{air}} = \frac{ \sum_{\mathrm{M}} x_{\mathrm{M}} \, m_{\mathrm{M}} }{ \sum_{\mathrm{M}} x_{\mathrm{M}} } where * :math:`x_{\mathrm{M}}` is the mole fraction of molecule M, * :math:`m_{\mathrm{M}}` is the molar mass of molecule M. To compute the air molar mass accurately, the mole fraction of molecular nitrogen (N2), molecular oxygen (O2), and argon (Ar) are required. If these are not present in the dataset, they are computed using the assumption that the mole fraction of these molecules are constant with altitude and set to the following values: * molecular nitrogen (N2): 0.78084 * molecular oxygen (O2): 0.209476 * argon (Ar): 0.00934 are independent of altitude. Since nothing guarantees that the mole fraction sum is equal to one, the air molar mass is computed as the sum of the mole fraction weighted molar mass divided by the sum of the mole fraction. """ ds = self._obj # for molar mass computation to be accurate, main air constituents # must be present in the dataset ds_copy = ds.copy(deep=True) for m in AIR_MAIN_CONSTITUENTS_MOLAR_FRACTION: if f"x_{m}" not in ds: value = AIR_MAIN_CONSTITUENTS_MOLAR_FRACTION[m] ds_copy[f"x_{m}"] = ("z", np.full_like(ds.n, value)) ds_copy[f"x_{m}"].attrs.update({"units": "dimensionless"}) # compute air molar mass x = ds_copy.joseki.mole_fraction molecules = x.m.values mm = xr.DataArray( data=np.array([MM[m] for m in molecules]), coords={"m": ("m", molecules)}, attrs={"units": "dimensionless"}, ) mm_average = (x * mm).sum(dim="m") / (x.sum(dim="m")) mm_average.attrs.update( { "long_name": "air molar mass", "units": "kg/mol", } ) return mm_average @property def air_number_density(self) -> xr.DataArray: ds = self._obj p = to_quantity(ds.p).m_as("pascal") t = to_quantity(ds.t).m_as("kelvin") k_b = K.m_as("joule / kelvin") n_air = p / (k_b * t) / ureg.m**3 result = xr.DataArray( n_air.m, coords={"z": ("z", ds.z.values, ds.z.attrs)}, attrs={ "standard_name": "air_number_density", "long_name": "air number density", "units": "m^-3", }, ) return result @property def number_density(self) -> xr.DataArray: """ Compute number densities for each species as a function of altitude and return them as a data array. """ ds = self._obj das = [] da_air = self.air_number_density n_air = to_quantity(da_air).m_as("m^-3") for m in self.molecules: da = xr.DataArray( ds[f"x_{m}"].values * n_air, coords={"z": ("z", ds.z.values, ds.z.attrs)}, attrs={ "standard_name": "number_density", "long_name": "number density", "units": "m^-3", }, ).expand_dims({"m": [m]}) das.append(da) result = xr.concat(das, dim="m") return result
[docs] def scaling_factors( self, target: t.MutableMapping[str, pint.Quantity | dict | xr.DataArray] ) -> t.MutableMapping[str, float]: """Compute scaling factor(s) to reach specific target amount(s). Args: target: Mapping of molecule and target amount. Raises: ValueError: If a target amount has dimensions that are not supported. Returns: Mapping of molecule and scaling factors. Notes: For each molecule in the ``target`` mapping, the target amount is interpreted, depending on its dimensions (indicated in square brackets), as: * a column number density [``length^-2``], * a column mass density [``mass * length^-2``], * a number density at sea level [``length^-3``], * a mass density at sea level [``mass * length^-3``], * a mole fraction at sea level [``dimensionless``] The scaling factor is then evaluated as the ratio of the target amount with the original amount, for each molecule. See Also: `rescale` """ compute_initial_amount = { "[length]^-2": self.column_number_density, "[mass] * [length]^-2": self.column_mass_density, "[length]^-3": self.number_density_at_sea_level, "[mass] * [length]^-3": self.mass_density_at_sea_level, "": self.mole_fraction_at_sea_level, } factors = {} for m, target_amount in target.items(): target_amount = to_quantity(target_amount) initial_amount = None for dim in compute_initial_amount.keys(): if target_amount.check(dim): initial_amount = compute_initial_amount[dim][m] if initial_amount is None: raise ValueError factors[m] = _scaling_factor( initial_amount=initial_amount, target_amount=target_amount ) return factors
[docs] def rescale( self, factors: t.MutableMapping[str, float], check_x_sum: bool = False ) -> xr.Dataset: """Rescale molecules concentration in atmospheric profile. Args: factors: A mapping of molecule and scaling factor. check_x_sum: if True, check that mole fraction sums are never larger than one. Raises: ValueError: if ``check_x_sum`` is ``True`` and the dataset is not valid. Returns: Rescaled dataset (new object). """ ds = self._obj # update mole fraction x_new = {} for m in factors: with xr.set_options(keep_attrs=True): x_new[f"x_{m}"] = ds[f"x_{m}"] * factors[m] ds = ds.assign(x_new) # validate rescaled dataset try: ds.joseki.validate(check_x_sum=check_x_sum) except ValueError as e: raise ValueError("Cannot rescale") from e # update history attribute now = utcnow() for m in factors.keys(): ds.attrs["history"] += ( f"\n{now} - rescaled {m}'s mole fraction using a scaling " f"factor of {factors[m]:.3f} - joseki, version {__version__}" ) return ds
[docs] def rescale_to( self, target: t.Mapping[str, pint.Quantity | dict | xr.DataArray], check_x_sum: bool = False, ) -> xr.Dataset: """ Rescale mole fractions to match target molecular total column densities. Args: target: Mapping of molecule and target total column density. Total column must be either a column number density [``length^-2``], a column mass density [``mass * length^-2``], a number density at sea level [``length^-3``], a mass density at sea level [``mass * length^-3``], a mole fraction at sea level [``dimensionless``]. check_x_sum: if True, check that mole fraction sums are never larger than one. Returns: Rescaled dataset (new object). """ return self.rescale( factors=self.scaling_factors(target=target), check_x_sum=check_x_sum, )
[docs] def drop_molecules(self, molecules: t.List[str]) -> xr.Dataset: """Drop molecules from dataset. Args: molecules: List of molecules to drop. Returns: Dataset with molecules dropped. """ ds = self._obj # update history attribute now = utcnow() ds.attrs["history"] += ( f"\n{now} - dropped mole fraction data for molecules " f"{', '.join(molecules)} - joseki, version {__version__}" ) return ds.drop_vars([f"x_{m}" for m in molecules])
[docs] def validate( self, check_x_sum: bool = False, ret_true_if_valid: bool = False ) -> bool: """Validate atmosphere thermophysical profile dataset schema. Returns: ``True`` if the dataset complies with the schema, else `False`. """ return schema.validate( ds=self._obj, check_x_sum=check_x_sum, ret_true_if_valid=ret_true_if_valid, )
@property def is_valid(self): """ Return ``True`` if the dataset complies with the schema, else `False`. """ try: self.validate(ret_true_if_valid=True) return True except ValueError: return False