# eof_calculations.py
import numpy as np
from .utils import calculate_anomaly, compute_weights, compute_rotated_eofs
import xarray as xr
from .preprocess_data import load_data, rename_dims_to_standard, adjust_longitude, regridding, adjust_latitude
[docs]
def calculate_global_mean_sst(data, lat_name='lat', lon_name='lon'):
"""
Calculate the global mean sea surface temperature (SST) anomaly.
Parameters:
data (xarray.DataArray): Input data array containing SST values.
lat_name (str, optional): Name of the latitude coordinate in the data array. Default is 'lat'.
lon_name (str, optional): Name of the longitude coordinate in the data array. Default is 'lon'.
Returns:
xarray.DataArray: The global mean SST anomaly.
"""
return calculate_anomaly(data).weighted(compute_weights(data)).mean(dim=[lat_name, lon_name])
[docs]
def global_sst_trend_and_enso(data=None, path=None, var=None, clim_start=None, clim_end=None, desired=None,
start_time=None, end_time=None, to_range='0_360', standardize=False,
normalize_pattern=True, normalize_index=False):
"""
Calculate global SST warming trend and ENSO patterns using EOF analysis.
This function computes the global SST warming trend, ENSO index, and their
corresponding variance fractions using the first two EOF modes of SST anomalies.
It first removes the annual cycle climatology and then applies EOF analysis.
Parameters:
----------
data : xarray.DataArray or xarray.Dataset, optional
Gridded SST data with dimensions (time, lat, lon). If not provided,
'path' and 'var' must be used to load the data.
path : str, optional
Directory path to the data. Required if 'data' is not provided.
var : str, optional
Name of the variable to load from the dataset.
clim_start : int, optional
Climatology start year.
clim_end : int, optional
Climatology end year (if both None, the entire period is used for climatology).
desired : list of str, optional
Desired outputs, which can include:
['sst_trend_pattern', 'sst_trend_timeseries', 'variance_fraction_trend',
'enso_pattern', 'enso_index', 'variance_fraction_enso'].
Default is to return all.
start_time : int, optional
Start year for loading data.
end_time : int, optional
End year for loading data.
to_range : str, optional, default '0_360'
Longitude range for data adjustment. Options are '0_360' or '-180_180'.
standardize : bool, optional, default False
Whether to normalize the data with its standard deviation before EOF calculation.
normalize_pattern : bool, optional, default True
Whether to normalize spatial components (patterns) with singular values.
normalize_index : bool, optional, default False
Whether to normalize time series (scores) with singular values.
Returns:
-------
List containing the desired outputs: SST trend pattern, trend timeseries,
variance fraction of the trend, ENSO pattern, ENSO index, and variance fraction of ENSO.
"""
if desired is None:
desired = [
'sst_trend_pattern', 'sst_trend_timeseries', 'variance_fraction_trend',
'enso_pattern', 'enso_index', 'variance_fraction_enso'
]
if data is not None:
data = adjust_latitude(adjust_longitude(rename_dims_to_standard(data.sel(time=slice(start_time, end_time))), to_range=to_range))
elif path and var:
data = adjust_longitude(
rename_dims_to_standard(
load_data(
path, var, start_time=start_time, end_time=end_time
)
),
to_range=to_range
)
else:
print("No valid data !!!")
data_anom = calculate_anomaly(data, clim_start=clim_start, clim_end=clim_end)
solver = compute_rotated_eofs(
data_anom, rotated=False, n_modes=2, standardize=standardize, use_coslat=True
)
eofs_ = solver.components(normalized=normalize_pattern)
pcs_ = solver.scores(normalized=normalize_index)
var_frac_ = solver.explained_variance_ratio()
result_dict = {
'sst_trend_pattern': eofs_[0].squeeze(),
'sst_trend_timeseries': (pcs_[0] / pcs_[0].std()).squeeze(),
'variance_fraction_trend': var_frac_[0].squeeze(),
'enso_pattern': eofs_[1].squeeze(),
'enso_index': (pcs_[1] / pcs_[1].std()).squeeze(),
'variance_fraction_enso': var_frac_[1].squeeze()
}
return_desired = [result_dict[key] for key in desired if key in result_dict]
return return_desired[0] if len(return_desired) == 1 else return_desired
[docs]
def compute_regional_eof_modes(data=None, path=None, var=None, clim_start=None, clim_end=None, desired=None,
start_time=None, end_time=None, lat_s=90, lat_e=-90, lon_s=0, lon_e=360,
to_range='0_360', n_modes=1, remove_trend=False, rotated=None,
use_coslat=True, standardize=False, normalize_pattern=True, normalize_index=False):
"""
Calculate regional EOF (Empirical Orthogonal Functions) modes from gridded SST data.
This generalized function computes EOFs for a specified regional domain, allowing for
trend removal, rotation, and different normalization options.
Parameters:
----------
data : xarray.DataArray or xarray.Dataset, optional
Gridded SST data with dimensions (time, lat, lon). Use 'path' and 'var'
instead if you want to load the data from a file.
path : str, optional
Directory path to the data. Required if 'data' is not provided.
var : str, optional
Name of the variable to load from the dataset.
clim_start : int, optional
Climatology start year.
clim_end : int, optional
Climatology end year (if both None, the entire period is used for climatology).
desired : list, optional
Desired outputs, which can be ['regional_patterns', 'regional_timeseries',
'variance_fractions_regional']. Default is all three.
start_time : int, optional
Start year for loading data.
end_time : int, optional
End year for loading data.
lat_s : float, optional
Start latitude for the regional selection. Default is 90.
lat_e : float, optional
End latitude for the regional selection. Default is -90.
lon_s : float, optional
Start longitude for the regional selection. Default is 0.
lon_e : float, optional
End longitude for the regional selection. Default is 360.
to_range : str, optional
Target longitude range. Default is '0_360'. Use '-180_180' if needed.
n_modes : int, optional
Number of EOF modes to compute. Default is 1.
remove_trend : bool, optional
Whether to remove the global trend before calculating EOFs. Default is False.
rotated : str, optional
Whether to apply Varimax or Promax rotation to the EOFs. Default is False.
use_coslat : bool, optional
Whether to use cosine latitude weighting. Default is True.
standardize : bool, optional
Whether to normalize the data with its standard deviation before EOF calculation. Default is False.
normalize_pattern : bool, optional
Whether to normalize spatial components (patterns) with singular values. Default is True.
normalize_index : bool, optional
Whether to normalize the time series (scores) with singular values. Default is False.
Returns:
-------
List containing the desired outputs: regional patterns, timeseries, and/or variance fractions.
"""
if desired is None:
desired = ['regional_patterns', 'regional_timeseries', 'variance_fractions_regional']
if data is not None:
if start_time!=None and end_time!=None:
data = adjust_latitude(adjust_longitude(rename_dims_to_standard(data.sel(time=slice(start_time, end_time))), to_range=to_range))
else:
data = adjust_latitude(adjust_longitude(rename_dims_to_standard(data), to_range=to_range))
elif path and var:
data = adjust_longitude(
rename_dims_to_standard(
load_data(
path, var, start_time=start_time, end_time=end_time
)
),
to_range=to_range
)
else:
print("No valid data !!!")
data_anom_1 = calculate_anomaly(data, clim_start=clim_start, clim_end=clim_end)
data_anom = data_anom_1.sel(lat=slice(lat_s, lat_e), lon=slice(lon_s, lon_e))
if remove_trend:
global_mean_sst = calculate_global_mean_sst(data_anom_1, lat_name='lat', lon_name='lon')
data_anom -= global_mean_sst
if rotated is not None:
n_modes = 10 ### High number of modes to redistribute variance while rotating
else:
n_modes = n_modes
solver = compute_rotated_eofs(
data_anom, rotated=rotated, n_modes=n_modes,
standardize=standardize, use_coslat=use_coslat
)
result_dict = {
'regional_patterns': solver.components(normalized=normalize_pattern).squeeze(),
'regional_timeseries': (solver.scores(normalized=normalize_index) /
solver.scores(normalized=normalize_index).std()).squeeze(),
'variance_fractions_regional': solver.explained_variance_ratio().squeeze()
}
return_desired = [result_dict[key] for key in desired if key in result_dict]
return return_desired[0] if len(return_desired) == 1 else return_desired
[docs]
def compute_pdo(data=None, path=None, var=None, clim_start=None, clim_end=None, desired=None,
start_time=None, end_time=None, to_range='0_360', standardize=False,
normalize_pattern=True, normalize_index=False, lat_s=70, lat_e=20, lon_s=110,
lon_e=260, remove_trend=False):
"""
Calculate the PDO (Pacific Decadal Oscillation) index and pattern.
This function calculates the conventional PDO index, pattern, and variance fraction
as the first EOF modes of SST anomaly in the North Pacific (above 20°N) after
removing the annual cycle climatology and global mean SST (to account for the warming trend).
Parameters:
----------
data : xarray.DataArray or xarray.Dataset, optional
Gridded SST data with dimensions (time, lat, lon). Use 'path' and 'var'
instead if you want to load the data from a file.
path : str, optional
Directory path to the data. Required if 'data' is not provided.
var : str, optional
Name of the variable to load from the dataset.
clim_start : int, optional
Climatology start year.
clim_end : int, optional
Climatology end year (if both None, whole period will be chosen for climatology).
desired : list, optional
Desired outputs, which can be ['pdo_pattern', 'pdo_index', 'variance_fraction_pdo'].
Default is all three.
start_time : int, optional
Start year for loading data.
end_time : int, optional
End year for loading data.
to_range : str, optional
Target longitude range. Default is '0_360'. Use '-180_180' if needed.
standardize : bool, optional
Whether to normalize the data with its standard deviation.
normalize_pattern : bool, optional
Whether to normalize spatial components with singular values. Default is True.
normalize_index : bool, optional
Whether to normalize the time series (scores) with singular values. Default is False.
remove_trend : bool, optional
Whether to remove the global trend. Default is False, in which case mode 2 is used.
If True, mode 1 is used.
Returns:
-------
List containing the desired outputs: PDO pattern, PDO index, and/or variance fraction.
"""
if desired is None:
desired = ['pdo_pattern', 'pdo_index', 'variance_fraction_pdo']
n_modes = 1 if remove_trend else 2
if data is not None:
if start_time!=None and end_time!=None:
data = adjust_latitude(adjust_longitude(rename_dims_to_standard(data.sel(time=slice(start_time, end_time))), to_range=to_range))
else:
data = adjust_latitude(adjust_longitude(rename_dims_to_standard(data), to_range=to_range))
elif path and var:
data = adjust_longitude(
rename_dims_to_standard(
load_data(
path, var, start_time=start_time, end_time=end_time
)
),
to_range=to_range
)
else:
print("No valid data !!!")
data_anomaly = calculate_anomaly(data)
data_pdo = data_anomaly.sel(lat=slice(lat_s, lat_e), lon=slice(lon_s, lon_e))
if remove_trend:
data_pdo_anomaly = data_pdo - calculate_global_mean_sst(data_anomaly, lat_name='lat', lon_name='lon')
else:
data_pdo_anomaly = calculate_anomaly(data_pdo, clim_start=clim_start, clim_end=clim_end)
solver = compute_rotated_eofs(
data_pdo_anomaly, rotated=False, n_modes=n_modes,
standardize=standardize, use_coslat=True
)
result_dict = {
'pdo_pattern': solver.components()[n_modes - 1].squeeze(),
'pdo_index': (solver.scores()[n_modes - 1] / solver.scores()[n_modes - 1].std()).squeeze(),
'variance_fraction_pdo': solver.explained_variance_ratio()[n_modes - 1].squeeze()
}
return_desired = [result_dict[key] for key in desired if key in result_dict]
return return_desired[0] if len(return_desired) == 1 else return_desired
[docs]
def compute_amo(data=None, path=None, var=None, clim_start=None, clim_end=None, desired=None,
start_time=None, end_time=None, lat_s=70, lat_e=0, lon_s=280, lon_e=360,
to_range='0_360'):
"""
Calculate the AMO (Atlantic Multidecadal Oscillation) index and pattern.
This function calculates the conventional AMO index and AMO pattern as the
area-averaged SST anomalies and time-averaged SST anomalies in the North Atlantic
(0°N-70°N) after removing the annual cycle climatology and global mean SST (to
account for the warming trend).
Parameters:
----------
data : xarray.DataArray or xarray.Dataset, optional
Gridded SST data with dimensions (time, lat, lon). Use 'path' and 'var'
instead if you want to load the data from a file.
path : str, optional
Directory path to the data. Required if 'data' is not provided.
var : str, optional
Name of the variable to load from the dataset.
start_time : int, optional
Start year for loading data.
end_time : int, optional
End year for loading data.
clim_start : int, optional
Climatology start year.
clim_end : int, optional
Climatology end year (if both None, whole period will be chosen for climatology).
desired : list, optional
Desired outputs, which can be ['amo_pattern', 'amo_index']. Default is both.
lat_s : float, optional
Start latitude for the region. Default is 70 (North).
lat_e : float, optional
End latitude for the region. Default is 0 (Equator).
lon_s : float, optional
Start longitude for the region. Default is 280 (Western Atlantic).
lon_e : float, optional
End longitude for the region. Default is 360.
to_range : str, optional
Target longitude range. Default is '0_360'. Use '-180_180' if needed.
Returns:
-------
List containing the desired outputs: AMO pattern and/or AMO index.
"""
if desired is None:
desired = ['amo_pattern', 'amo_index']
if data is not None:
if start_time!=None and end_time!=None:
data = adjust_latitude(adjust_longitude(rename_dims_to_standard(data.sel(time=slice(start_time, end_time))), to_range=to_range))
else:
data = adjust_latitude(adjust_longitude(rename_dims_to_standard(data), to_range=to_range))
elif path and var:
data = adjust_longitude(
rename_dims_to_standard(
load_data(
path, var, start_time=start_time, end_time=end_time
)
),
to_range=to_range
)
else:
print("No valid data !!!")
north_atlantic_sst = data.sel(lat=slice(lat_s, lat_e), lon=slice(lon_s, lon_e))
data_anomalies = calculate_anomaly(
north_atlantic_sst, clim_start=clim_start, clim_end=clim_end
)
global_mean_data = calculate_global_mean_sst(data, lat_name='lat', lon_name='lon')
amo_index = (
data_anomalies.weighted(compute_weights(data_anomalies)).mean(('lat', 'lon'))
- global_mean_data
)
amo_pattern = (
xr.cov(calculate_anomaly(data), amo_index, dim='time')
/ amo_index.var()
)
result_dict = {
'amo_pattern': amo_pattern,
'amo_index': amo_index
}
return_desired = [result_dict[key] for key in desired if key in result_dict]
return return_desired[0] if len(return_desired) == 1 else return_desired
[docs]
def compute_nao(data=None, path=None, var=None, clim_start=None, clim_end=None, desired=None, \
lat_s=None, lat_e=None, use_coslat=None, standardize=None, to_range=None, n_modes=10, nao_mode=None, \
start_time=None, end_time=None, rotated='Varimax'):
'''
This function calculates the NAO index, NAO pattern, and variance fraction.
It is calculated as the second EOF mode of 500mb geopotential height
anomaly above 20N after removing the annual cycle climatology.
Parameters:
- data : xarray.DataArray or xarray.Dataset
Processed gridded data in the shape of (time, lat, lon).
- path : str, optional
Path to the data file (if data is to be loaded directly).
- var : str, optional
Variable name in the dataset (used if loading from a file).
- clim_start : int, optional
Climatology start year.
- clim_end : int, optional
Climatology end year. If None, the whole period will be used.
- desired : list, optional
List of desired outputs, e.g., ['nao_pattern', 'nao_index', 'variance_fraction_nao'].
- lat_s : float, optional
Start latitude for selecting the northern region.
- lat_e : float, optional
End latitude for selecting the northern region.
- use_coslat : bool, optional
Whether to apply cosine latitude weighting. Default is True.
- standardize : bool, optional
If True, standardizes the data. Default is False.
- to_range : str, optional
Longitude range, either '0_360' or '-180_180'. Default is '0_360'.
- nao_mode : int, optional
The EOF mode number to be considered as NAO. Default is 1.
- start_time, end_time : str, optional
Time range for selecting data.
- rotated : str, optional
Rotation method for EOFs. Options: None, 'Varimax', 'Promax'. Default is 'Varimax'.
'''
lat_s = lat_s if lat_s is not None else 90
lat_e = lat_e if lat_e is not None else 20
standardize = standardize if standardize is not None else False
desired = desired if desired is not None else ['nao_pattern', 'nao_index', 'variance_fraction_nao']
to_range = to_range if to_range is not None else '0_360'
nao_mode = nao_mode if nao_mode is not None else 1
if data is not None:
if start_time!=None and end_time!=None:
data = adjust_latitude(adjust_longitude(rename_dims_to_standard(data.sel(time=slice(start_time, end_time))), to_range=to_range))
else:
data = adjust_latitude(adjust_longitude(rename_dims_to_standard(data), to_range=to_range))
elif path and var:
data = adjust_longitude(
rename_dims_to_standard(
load_data(
path, var, start_time=start_time, end_time=end_time
)
),
to_range=to_range
)
else:
print("No valid data !!!")
if data is None:
raise ValueError("Data must be provided either directly or via path and var.")
north_geop = data.sel(lat=slice(lat_s, lat_e))
data_anomalies = calculate_anomaly(north_geop, clim_start=clim_start, clim_end=clim_end)
eofs_result = compute_rotated_eofs(
data_anomalies, rotated=rotated, n_modes=n_modes,
standardize=standardize, use_coslat=use_coslat
)
nao_index = eofs_result.scores()[nao_mode-1] / eofs_result.scores()[nao_mode-1].std()
nao_pattern = eofs_result.components()[nao_mode-1]
variance_fraction_nao = eofs_result.explained_variance_ratio()[nao_mode-1]
result_dict = {
'nao_pattern': nao_pattern.squeeze(),
'nao_index': nao_index.squeeze(),
'variance_fraction_nao': variance_fraction_nao.squeeze(),
}
return_desired = [result_dict[key] for key in desired if key in result_dict]
return return_desired[0] if len(return_desired) == 1 else return_desired