latest_vs_15811b8

62 removals
692 lines
56 additions
686 lines
"""This module stores the Dataset class, which is the primary class for I/O.
"""This module stores the Dataset class, which is the primary class for I/O.


NOTE: Replaces `e3sm_diags.driver.utils.dataset`.
NOTE: Replaces `e3sm_diags.driver.utils.dataset`.


This Dataset class operates on `xr.Dataset` objects, which are created using
This Dataset class operates on `xr.Dataset` objects, which are created using
netCDF files. These `xr.Dataset` objects contain either the reference or test variable.
netCDF files. These `xr.Dataset` objects contain either the reference or test variable.
This variable can either be from a climatology file or a time series file.
This variable can either be from a climatology file or a time series file.
If the variable is from a time series file, the climatology of the variable is
If the variable is from a time series file, the climatology of the variable is
calculated. Reference and test variables can also be derived using other
calculated. Reference and test variables can also be derived using other
variables from dataset files.
variables from dataset files.
"""
"""
from __future__ import annotations
from __future__ import annotations


import collections
import collections
import fnmatch
import fnmatch
import glob
import glob
import os
import os
import re
import re
from datetime import datetime
from typing import TYPE_CHECKING, Callable, Dict, Literal, Tuple
from typing import TYPE_CHECKING, Callable, Dict, Literal, Tuple


import pandas as pd
import xarray as xr
import xarray as xr
import xcdat as xc
import xcdat as xc


from e3sm_diags.derivations.derivations import (
from e3sm_diags.derivations.derivations import (
DERIVED_VARIABLES,
DERIVED_VARIABLES,
FUNC_NEEDS_TARGET_VAR,
FUNC_NEEDS_TARGET_VAR,
DerivedVariableMap,
DerivedVariableMap,
DerivedVariablesMap,
DerivedVariablesMap,
)
)
from e3sm_diags.driver import LAND_FRAC_KEY, LAND_OCEAN_MASK_PATH, OCEAN_FRAC_KEY
from e3sm_diags.driver import LAND_FRAC_KEY, LAND_OCEAN_MASK_PATH, OCEAN_FRAC_KEY
from e3sm_diags.driver.utils.climo_xr import CLIMO_FREQS, ClimoFreq, climo
from e3sm_diags.driver.utils.climo_xr import CLIMO_FREQ, CLIMO_FREQS, climo
from e3sm_diags.logger import custom_logger
from e3sm_diags.logger import custom_logger


if TYPE_CHECKING:
if TYPE_CHECKING:
from e3sm_diags.parameter.core_parameter import CoreParameter
from e3sm_diags.parameter.core_parameter import CoreParameter




logger = custom_logger(__name__)
logger = custom_logger(__name__)


# A constant variable that defines the pattern for time series filenames.
# A constant variable that defines the pattern for time series filenames.
# Example: "ts_global_200001_200112.nc" (<VAR>_<SITE>_<TS_EXT_FILEPATTERN>)
# Example: "ts_global_200001_200112.nc" (<VAR>_<SITE>_<TS_EXT_FILEPATTERN>)
TS_EXT_FILEPATTERN = r"_.{13}.nc"
TS_EXT_FILEPATTERN = r"_.{13}.nc"
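# Illustrative check of the pattern above (a sketch, not part of the module;
# "ts" is the variable and "global" the site, taken from the example filename):
#
#     >>> import re
#     >>> bool(re.search("ts_global" + TS_EXT_FILEPATTERN, "ts_global_200001_200112.nc"))
#     True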




def squeeze_time_dim(ds: xr.Dataset) -> xr.Dataset:
"""Squeeze single coordinate climatology time dimensions.

For example, "ANN" averages over the year and collapses the time dim.
Time bounds are also dropped if they exist.

Parameters
----------
ds : xr.Dataset
The dataset with a time dimension.

Returns
-------
xr.Dataset
The dataset with the single-coordinate time dimension squeezed and its bounds dropped, if applicable.
"""
time_dim = xc.get_dim_coords(ds, axis="T")

if len(time_dim) == 1:
ds = ds.squeeze(dim=time_dim.name)
ds = ds.drop_vars(time_dim.name)

bnds_key = time_dim.attrs.get("bounds")
if bnds_key is not None and bnds_key in ds.data_vars.keys():
ds = ds.drop_vars(bnds_key)

return ds
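# Minimal usage sketch for `squeeze_time_dim` (illustrative only; the tiny
# single-coordinate "time" axis below stands in for a climatology such as "ANN"):
#
#     >>> import numpy as np
#     >>> import xarray as xr
#     >>> ds = xr.Dataset(
#     ...     data_vars={"ts": (("time", "lat"), np.ones((1, 2)))},
#     ...     coords={
#     ...         "time": ("time", [0], {"axis": "T", "standard_name": "time"}),
#     ...         "lat": ("lat", [-45.0, 45.0]),
#     ...     },
#     ... )
#     >>> "time" in squeeze_time_dim(ds).dims
#     False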


class Dataset:
class Dataset:
def __init__(
def __init__(
self,
self,
parameter: CoreParameter,
parameter: CoreParameter,
data_type: Literal["ref", "test"],
data_type: Literal["ref", "test"],
):
):
# The CoreParameter object with a list of parameters.
# The CoreParameter object with a list of parameters.
self.parameter = parameter
self.parameter = parameter


# The type of data for the Dataset object to store.
# The type of data for the Dataset object to store.
self.data_type = data_type
self.data_type = data_type


# The path, start year, and end year based on the dataset type.
# The path, start year, and end year based on the dataset type.
if self.data_type == "ref":
if self.data_type == "ref":
self.root_path = self.parameter.reference_data_path
self.root_path = self.parameter.reference_data_path
elif self.data_type == "test":
elif self.data_type == "test":
self.root_path = self.parameter.test_data_path
self.root_path = self.parameter.test_data_path
else:
else:
raise ValueError(
raise ValueError(
f"The `type` ({self.data_type}) for this Dataset object is invalid."
f"The `type` ({self.data_type}) for this Dataset object is invalid."
"Valid options include 'ref' or 'test'."
"Valid options include 'ref' or 'test'."
)
)


# If the underlying data is a time series, set the `start_yr` and
# If the underlying data is a time series, set the `start_yr` and
# `end_yr` attrs based on the data type (ref or test). Note, these attrs
# `end_yr` attrs based on the data type (ref or test). Note, these attrs
# are different for the `area_mean_time_series` parameter.
# are different for the `area_mean_time_series` parameter.
if self.is_time_series:
if self.is_time_series:
# FIXME: This conditional should not assume the first set is
# FIXME: This conditional should not assume the first set is
# area_mean_time_series. If area_mean_time_series is at another
# area_mean_time_series. If area_mean_time_series is at another
# index, this check will not catch it.
# index, this check will not catch it.
if self.parameter.sets[0] in ["area_mean_time_series"]:
if self.parameter.sets[0] in ["area_mean_time_series"]:
self.start_yr = self.parameter.start_yr # type: ignore
self.start_yr = self.parameter.start_yr # type: ignore
self.end_yr = self.parameter.end_yr # type: ignore
self.end_yr = self.parameter.end_yr # type: ignore
elif self.data_type == "ref":
elif self.data_type == "ref":
self.start_yr = self.parameter.ref_start_yr # type: ignore
self.start_yr = self.parameter.ref_start_yr # type: ignore
self.end_yr = self.parameter.ref_end_yr # type: ignore
self.end_yr = self.parameter.ref_end_yr # type: ignore
elif self.data_type == "test":
elif self.data_type == "test":
self.start_yr = self.parameter.test_start_yr # type: ignore
self.start_yr = self.parameter.test_start_yr # type: ignore
self.end_yr = self.parameter.test_end_yr # type: ignore
self.end_yr = self.parameter.test_end_yr # type: ignore


# The derived variables defined in E3SM Diags. If the `CoreParameter`
# The derived variables defined in E3SM Diags. If the `CoreParameter`
# object contains additional user derived variables, they are added
# object contains additional user derived variables, they are added
# to `self.derived_vars`.
# to `self.derived_vars`.
self.derived_vars_map = self._get_derived_vars_map()
self.derived_vars_map = self._get_derived_vars_map()


# Whether the data is sub-monthly or not.
# Whether the data is sub-monthly or not.
self.is_sub_monthly = False
self.is_sub_monthly = False
if self.parameter.sets[0] in ["diurnal_cycle", "arm_diags"]:
if self.parameter.sets[0] in ["diurnal_cycle", "arm_diags"]:
self.is_sub_monthly = True
self.is_sub_monthly = True


# A boolean to keep track of whether the source variable(s) for a
# target variable contains a wildcard ("?"). This is used to determine
# whether to pass a list of wildcard variables as args to derived
# variable function (True), or to unpack an expected number of variables
# as args to a derived variable function (False).
self.is_src_vars_wildcard = False

@property
@property
def is_time_series(self):
def is_time_series(self):
if self.parameter.ref_timeseries_input or self.parameter.test_timeseries_input:
if self.parameter.ref_timeseries_input or self.parameter.test_timeseries_input:
return True
return True
else:
else:
return False
return False


@property
@property
def is_climo(self):
def is_climo(self):
return not self.is_time_series
return not self.is_time_series


def _get_derived_vars_map(self) -> DerivedVariablesMap:
def _get_derived_vars_map(self) -> DerivedVariablesMap:
"""Get the defined derived variables.
"""Get the defined derived variables.


If the user-defined derived variables are in the input parameters,
If the user-defined derived variables are in the input parameters,
append parameters.derived_variables to the correct part of the derived
append parameters.derived_variables to the correct part of the derived
variables dictionary.
variables dictionary.


Returns
Returns
-------
-------
DerivedVariablesMap
DerivedVariablesMap
A dictionary mapping the key of a derived variable to an ordered
A dictionary mapping the key of a derived variable to an ordered
dictionary that maps a tuple of source variable(s) to a derivation
dictionary that maps a tuple of source variable(s) to a derivation
function.
function.
"""
"""
dvars: DerivedVariablesMap = DERIVED_VARIABLES.copy()
dvars: DerivedVariablesMap = DERIVED_VARIABLES.copy()
user_dvars: DerivedVariablesMap = getattr(self.parameter, "derived_variables")
user_dvars: DerivedVariablesMap = getattr(self.parameter, "derived_variables")


# If the user-defined derived vars already exist, create a
# If the user-defined derived vars already exist, create a
# new OrderedDict that combines the user-defined entries with the
# new OrderedDict that combines the user-defined entries with the
# existing ones in `e3sm_diags`. The user-defined entry should
# existing ones in `e3sm_diags`. The user-defined entry should
# be the highest priority and must be first in the OrderedDict.
# be the highest priority and must be first in the OrderedDict.
if user_dvars is not None:
if user_dvars is not None:
for key, ordered_dict in user_dvars.items():
for key, ordered_dict in user_dvars.items():
if key in dvars.keys():
if key in dvars.keys():
dvars[key] = collections.OrderedDict(**ordered_dict, **dvars[key])
dvars[key] = collections.OrderedDict(**ordered_dict, **dvars[key])
else:
else:
dvars[key] = ordered_dict
dvars[key] = ordered_dict


return dvars
return dvars
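# Sketch of the priority behavior described above (illustrative only; the
# source-variable tuples and lambdas are made up, and plain dict unpacking is
# used here just to show the resulting key order):
#
#     >>> import collections
#     >>> builtin = collections.OrderedDict({("PRECC", "PRECL"): lambda a, b: a + b})
#     >>> user = collections.OrderedDict({("MY_PR",): lambda a: a})
#     >>> merged = collections.OrderedDict({**user, **builtin})
#     >>> list(merged)  # the user-defined entry is found first
#     [('MY_PR',), ('PRECC', 'PRECL')]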


# Attribute related methods
# Attribute related methods
# --------------------------------------------------------------------------
# --------------------------------------------------------------------------
def get_name_yrs_attr(self, season: ClimoFreq | None = None) -> str:
def get_name_yrs_attr(self, season: CLIMO_FREQ | None = None) -> str:
"""Get the diagnostic name and 'yrs_averaged' attr as a single string.
"""Get the diagnostic name and 'yrs_averaged' attr as a single string.


This method is used to update either `parameter.test_name_yrs` or
This method is used to update either `parameter.test_name_yrs` or
`parameter.ref_name_yrs`, depending on `self.data_type`.
`parameter.ref_name_yrs`, depending on `self.data_type`.


If the dataset contains a climatology, attempt to get "yrs_averaged"
If the dataset contains a climatology, attempt to get "yrs_averaged"
from the global attributes of the netCDF file. If this attribute cannot
from the global attributes of the netCDF file. If this attribute cannot
be retrieved, only return the diagnostic name.
be retrieved, only return the diagnostic name.


Parameters
Parameters
----------
----------
season : CLIMO_FREQ | None, optional
season : CLIMO_FREQ | None, optional
The climatology frequency, by default None.
The climatology frequency, by default None.


Returns
Returns
-------
-------
str
str
The name and years average string.
The name and years average string.
Example: "historical_H1 (2000-2002)"
Example: "historical_H1 (2000-2002)"


Notes
Notes
-----
-----
Replaces `e3sm_diags.driver.utils.general.get_name_and_yrs`
Replaces `e3sm_diags.driver.utils.general.get_name_and_yrs`
"""
"""
if self.data_type == "test":
if self.data_type == "test":
diag_name = self._get_test_name()
diag_name = self._get_test_name()
elif self.data_type == "ref":
elif self.data_type == "ref":
diag_name = self._get_ref_name()
diag_name = self._get_ref_name()


if self.is_climo:
if self.is_climo:
if season is None:
if season is None:
raise ValueError(
raise ValueError(
"A `season` argument must be supplied for climatology datasets "
"A `season` argument must be supplied for climatology datasets "
"to try to get the global attribute 'yrs_averaged'."
"to try to get the global attribute 'yrs_averaged'."
)
)


yrs_averaged_attr = self._get_global_attr_from_climo_dataset(
yrs_averaged_attr = self._get_global_attr_from_climo_dataset(
"yrs_averaged", season
"yrs_averaged", season
)
)


if yrs_averaged_attr is None:
if yrs_averaged_attr is None:
return diag_name
return diag_name


elif self.is_time_series:
elif self.is_time_series:
yrs_averaged_attr = f"{self.start_yr}-{self.end_yr}"
yrs_averaged_attr = f"{self.start_yr}-{self.end_yr}"


return f"{diag_name} ({yrs_averaged_attr})"
return f"{diag_name} ({yrs_averaged_attr})"


def _get_test_name(self) -> str:
def _get_test_name(self) -> str:
"""Get the diagnostic test name.
"""Get the diagnostic test name.


Returns
Returns
-------
-------
str
str
The diagnostic test name.
The diagnostic test name.


Notes
Notes
-----
-----
Replaces `e3sm_diags.driver.utils.general.get_name`
Replaces `e3sm_diags.driver.utils.general.get_name`
"""
"""
if self.parameter.short_test_name != "":
if self.parameter.short_test_name != "":
return self.parameter.short_test_name
return self.parameter.short_test_name
elif self.parameter.test_name != "":
elif self.parameter.test_name != "":
return self.parameter.test_name
return self.parameter.test_name


raise AttributeError(
raise AttributeError(
"Either `parameter.short_test_name` or `parameter.test_name attributes` "
"Either `parameter.short_test_name` or `parameter.test_name attributes` "
"must be set to get the name and years attribute for test datasets."
"must be set to get the name and years attribute for test datasets."
)
)


def _get_ref_name(self) -> str:
def _get_ref_name(self) -> str:
"""Get the diagnostic reference name.
"""Get the diagnostic reference name.


Returns
Returns
-------
-------
str
str
The diagnostic reference name.
The diagnostic reference name.


Notes
Notes
-----
-----
Replaces `e3sm_diags.driver.utils.general.get_name`
Replaces `e3sm_diags.driver.utils.general.get_name`
"""
"""
if self.parameter.short_ref_name != "":
if self.parameter.short_ref_name != "":
return self.parameter.short_ref_name
return self.parameter.short_ref_name
elif self.parameter.reference_name != "":
elif self.parameter.reference_name != "":
return self.parameter.reference_name
return self.parameter.reference_name
elif self.parameter.ref_name != "":
elif self.parameter.ref_name != "":
return self.parameter.ref_name
return self.parameter.ref_name


raise AttributeError(
raise AttributeError(
"Either `parameter.short_ref_name`, `parameter.reference_name`, or "
"Either `parameter.short_ref_name`, `parameter.reference_name`, or "
"`parameter.ref_name` must be set to get the name and years attribute for "
"`parameter.ref_name` must be set to get the name and years attribute for "
"reference datasets."
"reference datasets."
)
)


return self.parameter.ref_name
return self.parameter.ref_name


def _get_global_attr_from_climo_dataset(
def _get_global_attr_from_climo_dataset(
self, attr: str, season: ClimoFreq
self, attr: str, season: CLIMO_FREQ
) -> str | None:
) -> str | None:
"""Get the global attribute from the climo file based on the season.
"""Get the global attribute from the climo file based on the season.


Parameters
Parameters
----------
----------
attr : str
attr : str
The attribute to get (e.g., "Convention").
The attribute to get (e.g., "Convention").
season : CLIMO_FREQ
season : CLIMO_FREQ
The climatology frequency.
The climatology frequency.


Returns
Returns
-------
-------
str | None
str | None
The attribute string if it exists, otherwise None.
The attribute string if it exists, otherwise None.
"""
"""
attr_val = None
attr_val = None


try:
try:
filepath = self._get_climo_filepath(season)
filepath = self._get_climo_filepath(season)
except OSError:
except OSError:
pass
pass
else:
else:
ds = self._open_climo_dataset(filepath)
ds = self._open_climo_dataset(filepath)
attr_val = ds.attrs.get(attr)
attr_val = ds.attrs.get(attr)


return attr_val
return attr_val


# --------------------------------------------------------------------------
# --------------------------------------------------------------------------
# Climatology related methods
# Climatology related methods
# --------------------------------------------------------------------------
# --------------------------------------------------------------------------
def get_ref_climo_dataset(
def get_ref_climo_dataset(
self, var_key: str, season: ClimoFreq, ds_test: xr.Dataset
self, var_key: str, season: CLIMO_FREQ, ds_test: xr.Dataset
):
):
"""Get the reference climatology dataset for the variable and season.
"""Get the reference climatology dataset for the variable and season.


If the reference climatology does not exist or could not be found, it
If the reference climatology does not exist or could not be found, it
will be considered a model-only run. For this case the test dataset
will be considered a model-only run. For this case the test dataset
is returned as a default value and subsequent metrics calculations will
is returned as a default value and subsequent metrics calculations will
only be performed on the original test dataset.
only be performed on the original test dataset.


Parameters
Parameters
----------
----------
var_key : str
var_key : str
The key of the variable.
The key of the variable.
season : CLIMO_FREQ
season : CLIMO_FREQ
The climatology frequency.
The climatology frequency.
ds_test : xr.Dataset
ds_test : xr.Dataset
The test dataset, which is returned if the reference climatology
The test dataset, which is returned if the reference climatology
does not exist or could not be found.
does not exist or could not be found.


Returns
Returns
-------
-------
xr.Dataset
xr.Dataset
The reference climatology if it exists or a copy of the test dataset
The reference climatology if it exists or a copy of the test dataset
if it does not exist.
if it does not exist.


Raises
Raises
------
------
RuntimeError
RuntimeError
If `self.data_type` is not "ref".
If `self.data_type` is not "ref".
"""
"""
# TODO: This logic was carried over from legacy implementation. It
# TODO: This logic was carried over from legacy implementation. It
# can probably be improved on by setting `ds_ref = None` and not
# can probably be improved on by setting `ds_ref = None` and not
# performing unnecessary operations on `ds_ref` for model-only runs,
# performing unnecessary operations on `ds_ref` for model-only runs,
# since it is the same as `ds_test`. In addition, returning ds_test
# since it is the same as `ds_test`. In addition, returning ds_test
# makes debugging more difficult.
# makes debugging more difficult.
if self.data_type == "ref":
if self.data_type == "ref":
try:
try:
ds_ref = self.get_climo_dataset(var_key, season)
ds_ref = self.get_climo_dataset(var_key, season)
self.model_only = False
self.model_only = False
except (RuntimeError, IOError):
except (RuntimeError, IOError):
ds_ref = ds_test.copy()
ds_ref = ds_test.copy()
self.model_only = True
self.model_only = True


logger.info("Cannot process reference data, analyzing test data only.")
logger.info("Cannot process reference data, analyzing test data only.")
else:
else:
raise RuntimeError(
raise RuntimeError(
"`Dataset._get_ref_dataset` only works with "
"`Dataset._get_ref_dataset` only works with "
f"`self.data_type == 'ref'`, not {self.data_type}."
f"`self.data_type == 'ref'`, not {self.data_type}."
)
)


return ds_ref
return ds_ref
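# Sketch of the model-only fallback behavior described above (illustrative;
# `param` and `ds_test` are placeholders, not a specific configuration):
#
#     >>> ds_ref_obj = Dataset(param, data_type="ref")
#     >>> ds_ref = ds_ref_obj.get_ref_climo_dataset("PRECT", "ANN", ds_test)
#     >>> ds_ref_obj.model_only  # True if the reference climatology was not found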


def get_climo_dataset(self, var: str, season: ClimoFreq) -> xr.Dataset:
def get_climo_dataset(self, var: str, season: CLIMO_FREQ) -> xr.Dataset:
"""Get the dataset containing the climatology variable.
"""Get the dataset containing the climatology variable.


These variables can either be from the test data or reference data.
These variables can either be from the test data or reference data.
If the variable is already a climatology variable, then get it directly
If the variable is already a climatology variable, then get it directly
from the dataset. If the variable is a time series variable, get the
from the dataset. If the variable is a time series variable, get the
variable from the dataset and compute the climatology based on the
variable from the dataset and compute the climatology based on the
selected frequency.
selected frequency.


Parameters
Parameters
----------
----------
var : str
var : str
The key of the climatology or time series variable to get the
The key of the climatology or time series variable to get the
dataset for.
dataset for.
season : CLIMO_FREQ
season : CLIMO_FREQ
The season for the climatology.
The season for the climatology.


Returns
Returns
-------
-------
xr.Dataset
xr.Dataset
The dataset containing the climatology variable.
The dataset containing the climatology variable.


Raises
Raises
------
------
ValueError
ValueError
If the specified variable is not a valid string.
If the specified variable is not a valid string.
ValueError
ValueError
If the specified season is not a valid string.
If the specified season is not a valid string.
ValueError
ValueError
If unable to determine if the variable is a reference or test
If unable to determine if the variable is a reference or test
variable and where to find the variable (climatology or time series
variable and where to find the variable (climatology or time series
file).
file).
"""
"""
self.var = var
self.var = var


if not isinstance(self.var, str) or self.var == "":
if not isinstance(self.var, str) or self.var == "":
raise ValueError("The `var` argument is not a valid string.")
raise ValueError("The `var` argument is not a valid string.")
if not isinstance(season, str) or season not in CLIMO_FREQS:
if not isinstance(season, str) or season not in CLIMO_FREQS:
raise ValueError(
raise ValueError(
"The `season` argument is not a valid string. Options include: "
"The `season` argument is not a valid string. Options include: "
f"{CLIMO_FREQS}"
f"{CLIMO_FREQS}"
)
)


if self.is_climo:
if self.is_climo:
ds = self._get_climo_dataset(season)
ds = self._get_climo_dataset(season)
elif self.is_time_series:
elif self.is_time_series:
ds = self.get_time_series_dataset(var)
ds = self.get_time_series_dataset(var)
ds[self.var] = climo(ds, self.var, season)
ds[self.var] = climo(ds, self.var, season)


return ds
return ds
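# Typical call pattern for this method (a hedged sketch; `param` is a
# placeholder CoreParameter object, not a specific configuration):
#
#     >>> ds_test = Dataset(param, data_type="test")
#     >>> ds_ann = ds_test.get_climo_dataset("PRECT", season="ANN")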


def _get_climo_dataset(self, season: str) -> xr.Dataset:
def _get_climo_dataset(self, season: str) -> xr.Dataset:
"""Get the climatology dataset for the variable and season.
"""Get the climatology dataset for the variable and season.


Parameters
Parameters
----------
----------
season : str
season : str
The season for the climatology.
The season for the climatology.


Returns
Returns
-------
-------
xr.Dataset
xr.Dataset
The climatology dataset.
The climatology dataset.


Raises
Raises
------
------
IOError
IOError
If the variable was not found in the dataset or able to be derived
If the variable was not found in the dataset or able to be derived
using other datasets.
using other datasets.
"""
"""
filepath = self._get_climo_filepath(season)
filepath = self._get_climo_filepath(season)
ds = self._open_climo_dataset(filepath)
ds = self._open_climo_dataset(filepath)


if self.var in self.derived_vars_map:
if self.var in self.derived_vars_map:
ds = self._get_dataset_with_derived_climo_var(ds)
ds = self._get_dataset_with_derived_climo_var(ds)
elif self.var in ds.data_vars.keys():
elif self.var in ds.data_vars.keys():
pass
pass
else:
else:
raise IOError(
raise IOError(
f"Variable '{self.var}' was not in the file '{filepath}', nor was "
f"Variable '{self.var}' was not in the file '{filepath}', nor was "
"it defined in the derived variables dictionary."
"it defined in the derived variables dictionary."
)
)


ds = squeeze_time_dim(ds)
ds = self._squeeze_time_dim(ds)


# slat and slon are lat lon pair for staggered FV grid included in
# slat and slon are lat lon pair for staggered FV grid included in
# remapped files.
# remapped files.
if "slat" in ds.dims:
if "slat" in ds.dims:
ds = ds.drop_dims(["slat", "slon"])
ds = ds.drop_dims(["slat", "slon"])


all_vars = list(ds.data_vars.keys())
all_vars = list(ds.data_vars.keys())
keep_bnds = [var for var in all_vars if "bnd" or "bound" in var]
keep_bnds = [var for var in all_vars if "bnd" in var]
# ds = ds[[self.var, 'lat_bnds', 'lon_bnds']]
ds = ds[[self.var] + keep_bnds]
ds = ds[[self.var] + keep_bnds]


# NOTE: There seems to be an issue with `open_mfdataset()` and
# NOTE: There seems to be an issue with `open_mfdataset()` and
# using the multiprocessing scheduler defined in e3sm_diags,
# using the multiprocessing scheduler defined in e3sm_diags,
# resulting in timeouts and resource locking.
# resulting in timeouts and resource locking.
# To avoid this, we load the multi-file dataset into memory before
# To avoid this, we load the multi-file dataset into memory before
# performing downstream operations.
# performing downstream operations.
# Related GH issue: https://github.com/pydata/xarray/issues/3781
# Related GH issue: https://github.com/pydata/xarray/issues/3781
ds.load(scheduler="sync")
ds.load(scheduler="sync")


return ds
return ds


def _open_climo_dataset(self, filepath: str) -> xr.Dataset:
def _open_climo_dataset(self, filepath: str) -> xr.Dataset:
"""Open a climatology dataset.
"""Open a climatology dataset.


Some climatology files have "time" as a scalar variable. If the scalar
Some climatology files have "time" as a scalar variable. If the scalar
variable is a single integer instead of a 1D array with a length
variable is a single integer instead of a 1D array with a length
matching the equivalent dimension size, Xarray will `raise ValueError:
matching the equivalent dimension size, Xarray will `raise ValueError:
dimension 'time' already exists as a scalar variable`. For this case,
dimension 'time' already exists as a scalar variable`. For this case,
the "time" scalar variable is dropped when opening the dataset.
the "time" scalar variable is dropped when opening the dataset.


If the scalar variable is dropped or the climatology file only has a
If the scalar variable is dropped or the climatology file only has a
"time" dimension without coordinates, new "time" coordinates will be
"time" dimension without coordinates, new "time" coordinates will be
added to the dataset.
added to the dataset.


Related issue: https://github.com/pydata/xarray/issues/1709
Related issue: https://github.com/pydata/xarray/issues/1709


Parameters
Parameters
----------
----------
filepath : str
filepath : str
The path to a climatology dataset.
The path to a climatology dataset.


Returns
Returns
-------
-------
xr.Dataset
xr.Dataset
The climatology dataset.
The climatology dataset.


Raises
Raises
------
------
ValueError
ValueError
Raised for all ValueErrors other than "dimension 'time' already
Raised for all ValueErrors other than "dimension 'time' already
exists as a scalar variable".
exists as a scalar variable".
"""
"""
# Time coordinates are decoded because there might be cases where
# a multi-file climatology dataset has different units between files
# but raw encoded time values overlap. Decoding with Xarray allows
# concatenation of datasets with this issue (e.g., `area_cycle_zonal_mean`
# set with the MERRA2_Aerosols climatology datasets).
# No need to decode times because the climatology is already calculated.
# Times only need to be decoded if climatology is being calculated
# (time series datasets).
# NOTE: This GitHub issue explains why the "coords" and "compat" args
# NOTE: This GitHub issue explains why the "coords" and "compat" args
# are defined as they are below: https://github.com/xCDAT/xcdat/issues/641
# are defined as they are below: https://github.com/xCDAT/xcdat/issues/641
args = {
args = {
"paths": filepath,
"paths": filepath,
"decode_times": True,
"decode_times": False,
"add_bounds": ["X", "Y"],
"add_bounds": ["X", "Y"],
"coords": "minimal",
"coords": "minimal",
"compat": "override",
"compat": "override",
"chunks": "auto",
}
}
time_coords = xr.DataArray(
name="time",
dims=["time"],
data=[0],
attrs={"axis": "T", "standard_name": "time"},
)


try:
try:
ds = xc.open_mfdataset(**args)
ds = xc.open_mfdataset(**args)
except ValueError as e: # pragma: no cover
except ValueError as e: # pragma: no cover
# FIXME: Need to fix the test that covers this code block.
# FIXME: Need to fix the test that covers this code block.
msg = str(e)
msg = str(e)


if "dimension 'time' already exists as a scalar variable" in msg:
if "dimension 'time' already exists as a scalar variable" in msg:
ds = xc.open_mfdataset(**args, drop_variables=["time"])
ds = xc.open_mfdataset(**args, drop_variables=["time"])
else:
else:
raise ValueError(msg)
raise ValueError(msg)


if "time" not in ds.coords:
if "time" not in ds.coords:
ds["time"] = xr.DataArray(
ds["time"] = time_coords
name="time",
dims=["time"],
data=[0],
attrs={"axis": "T", "standard_name": "time"},
)


return ds
return ds
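# Sketch of the placeholder "time" coordinate assignment above, for the case
# where a climatology file has a "time" dimension but no coordinate values
# (illustrative only):
#
#     >>> import xarray as xr
#     >>> ds = xr.Dataset({"ts": ("time", [285.0])})  # dimension without a coordinate
#     >>> "time" in ds.coords
#     False
#     >>> ds["time"] = xr.DataArray(
#     ...     name="time", dims=["time"], data=[0],
#     ...     attrs={"axis": "T", "standard_name": "time"},
#     ... )
#     >>> "time" in ds.coords
#     True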


def _get_climo_filepath(self, season: str) -> str:
def _get_climo_filepath(self, season: str) -> str:
"""Return the path to the climatology file.
"""Return the path to the climatology file.


There are three patterns for matching a file, with the first match
There are three patterns for matching a file, with the first match
being returned if any match is found:
being returned if any match is found:


1. Using the reference/test file parameters if they are set (`ref_file`,
1. Using the reference/test file parameters if they are set (`ref_file`,
`test_file`).
`test_file`).
- {reference_data_path}/{ref_file}
- {reference_data_path}/{ref_file}
- {test_data_path}/{test_file}
- {test_data_path}/{test_file}
2. Using the reference/test name and season.
2. Using the reference/test name and season.
- {reference_data_path}/{ref_name}_{season}.nc
- {reference_data_path}/{ref_name}_{season}.nc
- {test_data_path}/{test_name}_{season}.nc
- {test_data_path}/{test_name}_{season}.nc
3. Using the reference or test name as a nested directory with the same
3. Using the reference or test name as a nested directory with the same
name as the filename with a season.
name as the filename with a season.
- General match pattern:
- General match pattern:
- {reference_data_path}/{ref_name}/{ref_name}_{season}.nc
- {reference_data_path}/{ref_name}/{ref_name}_{season}.nc
- {test_data_path}/{test_name}/{test_name}_{season}.nc
- {test_data_path}/{test_name}/{test_name}_{season}.nc
- Pattern for model-only data for season in "ANN", "DJF", "MAM", "JJA",
- Pattern for model-only data for season in "ANN", "DJF", "MAM", "JJA",
or "SON":
or "SON":
- {reference_data_path}/{ref_name}/{ref_name}.*{season}.*.nc
- {reference_data_path}/{ref_name}/{ref_name}.*{season}.*.nc
- {test_data_path}/{test_name}/{test_name}.*{season}.*.nc
- {test_data_path}/{test_name}/{test_name}.*{season}.*.nc


Parameters
Parameters
----------
----------
season : str
season : str
The season for the climatology.
The season for the climatology.


Returns
Returns
-------
-------
str
str
The path to the climatology file.
The path to the climatology file.
"""
"""
# First pattern attempt.
# First pattern attempt.
filepath = self._get_climo_filepath_with_params()
filepath = self._get_climo_filepath_with_params()


# Second and third pattern attempts.
# Second and third pattern attempts.
if filepath is None:
if filepath is None:
if self.data_type == "ref":
if self.data_type == "ref":
filename = self.parameter.ref_name
filename = self.parameter.ref_name
elif self.data_type == "test":
elif self.data_type == "test":
filename = self.parameter.test_name
filename = self.parameter.test_name
if season == "ANNUALCYCLE":
if season == "ANNUALCYCLE":
filepath = self._find_climo_filepath(filename, "01")
filepath = self._find_climo_filepath(filename, "01")
# find the path for 12 monthly mean files
# find the path for 12 monthly mean files
if filepath:
if filepath:
filename_01 = filepath.split("/")[-1]
filename_01 = filepath.split("/")[-1]
filepath = filepath.replace(
filepath = filepath.replace(
# f"{filename_01}", f"{filename}_[0-1][0-9]_*_*climo.nc"
f"{filename_01}", f"{filename}_[0-1][0-9]_*_*climo.nc"
# The AOD_550 dataset has the pattern AOD_550_01_climo.nc; other datasets have e.g. ERA5_ANN_197901_201912_climo.nc
f"{filename_01}",
f"{filename}_[0-1][0-9]_*climo.nc",
)
)
else:
else:
filepath = self._find_climo_filepath(filename, season)
filepath = self._find_climo_filepath(filename, season)


# If absolutely no filename was found, then raise an error.
# If absolutely no filename was found, then raise an error.
if filepath is None:
if filepath is None:
raise IOError(
raise IOError(
f"No file found for '{filename}' and '{season}' in {self.root_path}"
f"No file found for '{filename}' and '{season}' in {self.root_path}"
)
)


return filepath
return filepath
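# Illustration of the ANNUALCYCLE monthly-file glob pattern built above
# (a sketch; the ERA5/AOD_550 filenames come from the comment above and are
# examples, not required inputs):
#
#     >>> import fnmatch
#     >>> fnmatch.fnmatch("ERA5_01_197901_201912_climo.nc", "ERA5_[0-1][0-9]_*climo.nc")
#     True
#     >>> fnmatch.fnmatch("AOD_550_01_climo.nc", "AOD_550_[0-1][0-9]_*climo.nc")
#     True
#     >>> fnmatch.fnmatch("ERA5_ANN_197901_201912_climo.nc", "ERA5_[0-1][0-9]_*climo.nc")
#     False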


def _get_climo_filepath_with_params(self) -> str | None:
def _get_climo_filepath_with_params(self) -> str | None:
"""Get the climatology filepath using parameters.
"""Get the climatology filepath using parameters.


Returns
Returns
-------
-------
str | None
str | None
The filepath using the `ref_file` or `test_file` parameter if they
The filepath using the `ref_file` or `test_file` parameter if they
are set.
are set.
"""
"""
filepath = None
filepath = None


if self.data_type == "ref":
if self.data_type == "ref":
if self.parameter.ref_file != "":
if self.parameter.ref_file != "":
filepath = os.path.join(self.root_path, self.parameter.ref_file)
filepath = os.path.join(self.root_path, self.parameter.ref_file)


elif self.data_type == "test":
elif self.data_type == "test":
if hasattr(self.parameter, "test_file"):
if hasattr(self.parameter, "test_file"):
filepath = os.path.join(self.root_path, self.parameter.test_file)
filepath = os.path.join(self.root_path, self.parameter.test_file)


return filepath
return filepath


def _find_climo_filepath(self, filename: str, season: str) -> str | None:
def _find_climo_filepath(self, filename: str, season: str) -> str | None:
"""Find the climatology filepath for the variable.
"""Find the climatology filepath for the variable.


Parameters
Parameters
----------
----------
filename : str
filename : str
The filename for the climatology variable.
The filename for the climatology variable.
season : str
season : str
The season for climatology.
The season for climatology.


Returns
Returns
-------
-------
str | None
str | None
The filepath for the climatology variable.
The filepath for the climatology variable.
"""
"""
# First attempt: try to find the climatology file based on season.
# First attempt: try to find the climatology file based on season.
# Example: {path}/{filename}_{season}.nc
# Example: {path}/{filename}_{season}.nc
filepath = self._find_climo_filepath_with_season(
filepath = self._find_climo_filepath_with_season(
self.root_path, filename, season
self.root_path, filename, season
)
)


# Second attempt: try looking for the file nested in a folder, based on
# Second attempt: try looking for the file nested in a folder, based on
# the test_name.
# the test_name.
# Example: {path}/{filename}/{filename}_{season}.nc
# Example: {path}/{filename}/{filename}_{season}.nc
# data_path/some_file/some_file_ANN.nc
# data_path/some_file/some_file_ANN.nc
if filepath is None:
if filepath is None:
nested_root_path = os.path.join(self.root_path, filename)
nested_root_path = os.path.join(self.root_path, filename)


if os.path.exists(nested_root_path):
if os.path.exists(nested_root_path):
filepath = self._find_climo_filepath_with_season(
filepath = self._find_climo_filepath_with_season(
nested_root_path, filename, season
nested_root_path, filename, season
)
)


return filepath
return filepath


def _find_climo_filepath_with_season(
def _find_climo_filepath_with_season(
self, root_path: str, filename: str, season: str
self, root_path: str, filename: str, season: str
) -> str | None:
) -> str | None:
"""Find climatology filepath with a root path, filename, and season.
"""Find climatology filepath with a root path, filename, and season.


Parameters
Parameters
----------
----------
root_path : str
root_path : str
The root path containing `.nc` files. The `.nc` files can be nested
The root path containing `.nc` files. The `.nc` files can be nested
in sub-directories within the root path.
in sub-directories within the root path.
filename : str
filename : str
The filename for the climatology variable.
The filename for the climatology variable.
season : str
season : str
The season for climatology.
The season for climatology.


Returns
Returns
-------
-------
str | None
str | None
The climatology filepath based on season, if it exists.
The climatology filepath based on season, if it exists.
"""
"""
files_in_dir = sorted(os.listdir(root_path))
files_in_dir = sorted(os.listdir(root_path))


# If the filename is followed by _<SEASON>.
# If the filename is followed by _<SEASON>.
for file in files_in_dir:
for file in files_in_dir:
if file.startswith(filename + "_" + season):
if file.startswith(filename + "_" + season):
return os.path.join(root_path, file)
return os.path.join(root_path, file)


# For model-only data, the <SEASON> string can be anywhere in the
# For model-only data, the <SEASON> string can be anywhere in the
# filename if the season is in ["ANN", "DJF", "MAM", "JJA", "SON"].
# filename if the season is in ["ANN", "DJF", "MAM", "JJA", "SON"].
if season in ["ANN", "DJF", "MAM", "JJA", "SON"]:
if season in ["ANN", "DJF", "MAM", "JJA", "SON"]:
for file in files_in_dir:
for file in files_in_dir:
if file.startswith(filename) and season in file:
if file.startswith(filename) and season in file:
return os.path.join(root_path, file)
return os.path.join(root_path, file)


return None
return None
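# Worked example of the two lookups above (a sketch; the filename and the
# `name` prefix are hypothetical):
#
#     >>> files = ["v2.LR.historical_0101.ANN.000101-002012_climo.nc"]
#     >>> name = "v2.LR.historical_0101"
#     >>> [f for f in files if f.startswith(name + "_" + "ANN")]
#     []
#     >>> [f for f in files if f.startswith(name) and "ANN" in f]  # model-only fallback
#     ['v2.LR.historical_0101.ANN.000101-002012_climo.nc']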


def _get_dataset_with_derived_climo_var(self, ds: xr.Dataset) -> xr.Dataset:
def _get_dataset_with_derived_climo_var(self, ds: xr.Dataset) -> xr.Dataset:
"""Get the dataset containing the derived variable (`self.var`).
"""Get the dataset containing the derived variable (`self.var`).


Parameters
Parameters
----------
----------
ds: xr.Dataset
ds: xr.Dataset
The climatology
The climatology dataset, which should contain the source variables
for deriving the target variable.

Returns
-------
xr.Dataset
The dataset with the derived variable.

Raises
------
IOError
If the datasets for the target variable and source variables were
not found in the data directory, or the target variable cannot be
found directly in the dataset.
"""
# An OrderedDict mapping possible source variables to the function
# for deriving the variable of interest.
# Example: {('PRECC', 'PRECL'): func, ('pr',): func1, ...}
target_var = self.var
target_var_map = self.derived_vars_map[target_var]

# Get the first valid set of source variables and its derivation function.
# The source variables are checked to exist in the dataset object
# and the derivation function is used to derive the target variable.
# Example:
# For target variable "PRECT": {('PRECC', 'PRECL'): func}
matching_target_var_map = self._get_matching_climo_src_vars(ds, target_var_map)

if matching_target_var_map is not None:
# NOTE: Since there's only one set of vars, we get the first and only set
# of vars from the derived variable dictionary.
# 1. Get the derivation function.
derivation_func = list(matching_target_var_map.values())[0]

# 2. Get the derivation function arguments using source variab