18 changes: 18 additions & 0 deletions CHANGELOG.md

@@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

 ## Unreleased
 
+### Changed
+
+- Standardized eta handling across all damage reduction recipes. Previously, the `adding_up` recipe did not use eta parameters while `risk_aversion` did; now all recipes require and use eta values consistently. ([PR #417](https://github.com/ClimateImpactLab/dscim/pull/417), [@JMGilbert](https://github.com/JMGilbert))
+  - File naming now includes the eta value for all reduced-damage outputs (e.g., `adding_up_cc_eta1.0.zarr`).
+  - Simplified `reduce_damages()` and `subset_USA_reduced_damages()` by removing the conditional logic for eta-less processing.
+  - Updated the `StackedDamages.adding_up_damages` property to reference eta-specific file paths.
+- Refactored mortality damage preprocessing to support eta-aware damage loading. `prep_mortality_damages()` now accepts multiple eta values and merges damage data across the eta dimension. ([PR #417](https://github.com/ClimateImpactLab/dscim/pull/417), [@JMGilbert](https://github.com/JMGilbert))
+- Added support for mortality version 9, which uses VLY (value of a life year) valuation with EPA population-averaged scaling. ([PR #417](https://github.com/ClimateImpactLab/dscim/pull/417), [@JMGilbert](https://github.com/JMGilbert))
+- Updated the chunking strategy in `reduce_damages()` to handle eta as a mapped dimension alongside batch operations, improving consistency in batch-dimension handling for both standard and quantile regression workflows. ([PR #417](https://github.com/ClimateImpactLab/dscim/pull/417), [@JMGilbert](https://github.com/JMGilbert))
+
+### Fixed
+
+- Pinned `scipy==1.15.3` to resolve `statsmodels==0.14.4` import issues. ([PR #417](https://github.com/ClimateImpactLab/dscim/pull/417), [@JMGilbert](https://github.com/JMGilbert))
+- Updated all unit tests to work with eta-aware damage processing, including test fixtures for mortality damages and adding-up calculations. ([PR #417](https://github.com/ClimateImpactLab/dscim/pull/417), [@JMGilbert](https://github.com/JMGilbert))
+
+### Removed
+
+- Removed the assertion preventing the `adding_up` recipe from accepting eta arguments, since this recipe now processes damages with eta values like all other recipes. ([PR #417](https://github.com/ClimateImpactLab/dscim/pull/417), [@JMGilbert](https://github.com/JMGilbert))
 
 ## [0.7.0] - 2025-08-15
 
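To make the new contract concrete, here is a minimal sketch of a driver loop under the standardized eta handling. It assumes the keyword names visible in this diff (`recipe`, `reduction`, `eta`, `sector`, `config`); the `socioec` argument, the paths, the sector name, and the eta values are illustrative assumptions, not settings taken from this PR:

```python
from dscim.preprocessing.preprocessing import reduce_damages

# Hypothetical driver loop: every recipe now takes an eta, and each
# (recipe, reduction, eta) combination writes an eta-suffixed zarr store,
# e.g. adding_up_cc_eta1.0.zarr instead of adding_up_cc.zarr.
for eta in [1.0, 2.0]:  # illustrative eta values
    for reduction in ["cc", "no_cc"]:
        reduce_damages(
            recipe="adding_up",  # previously required eta=None; now eta is used
            reduction=reduction,
            eta=eta,
            sector="mortality",        # illustrative sector key
            config="config.yaml",      # illustrative config path
            socioec="econ_vars.zarr",  # assumed socioeconomics argument
        )
```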
1 change: 1 addition & 0 deletions requirements.txt

@@ -16,3 +16,4 @@ impactlab-tools==0.6.0
 p_tqdm==1.4.2
 pyarrow==21.0.0
 numcodecs==0.15.1
+scipy==1.15.3
168 changes: 168 additions & 0 deletions src/dscim/menu/main_recipe.py

@@ -107,6 +107,9 @@ def __init__(
         extrap_formula=None,
         fair_dims=None,
         save_files=None,
+        geography="globe",
+        country_mapping_path=None,
+        individual_region=None,
         **kwargs,
     ):
         if scc_quantiles is None:

@@ -232,6 +235,12 @@ def __init__(
         self.extrap_formula = extrap_formula
         self.fair_dims = fair_dims
         self.save_files = save_files
+        self.geography = geography
+        self.individual_region = individual_region
+        if country_mapping_path is not None:
+            self.country_mapping = pd.read_csv(country_mapping_path)
+        else:
+            self.country_mapping = None
         self.__dict__.update(**kwargs)
         self.kwargs = kwargs
 

@@ -456,6 +465,135 @@ def collapsed_pop(self):
         pop = self.pop.mean(["model", "ssp"])
         return pop
 
+    def _aggregate_by_geography(
+        self,
+        data: xr.DataArray,
+        geography: str,
+    ) -> xr.DataArray:
+        """Aggregate data by geography type.
+
+        Parameters
+        ----------
+        data : xr.DataArray
+            Data with a 'region' dimension to aggregate
+        geography : str
+            One of "ir" (impact region), "country", or "globe"
+
+        Returns
+        -------
+        xr.DataArray
+            Aggregated data according to geography
+        """
+        if geography == "ir":
+            return data  # No aggregation needed
+
+        elif geography == "country":
+            if hasattr(self, "country_mapping") and self.country_mapping is not None:
+                return self._aggregate_to_country(data)
+            else:
+                raise ValueError(
+                    "Country aggregation requires country_mapping parameter"
+                )
+
+        elif geography in ("globe", "global"):
+            return (
+                data.sum(dim="region")
+                .assign_coords({"region": "globe"})
+                .expand_dims("region")
+            )
+
+        else:
+            raise ValueError(
+                f"Unknown geography: {geography}. "
+                f"Valid options: 'ir', 'country', 'globe'"
+            )
+
+    def _aggregate_to_country(self, data: xr.DataArray) -> xr.DataArray:
+        """Aggregate to country level using mapping.
+
+        Parameters
+        ----------
+        data : xr.DataArray
+            Data with a 'region' dimension to aggregate
+
+        Returns
+        -------
+        xr.DataArray
+            Data aggregated to country level
+        """
+        mapping = {
+            row["ISO"]: row["MatchedISO"]
+            if str(row["MatchedISO"]) != "nan"
+            else "nopop"
+            for _, row in self.country_mapping.iterrows()
+        }
+        territories = [mapping.get(str(r)[:3], "unknown") for r in data.region.values]
+        return data.assign_coords({"region": territories}).groupby("region").sum()
+
+    def damages_dataset(self, geography: str = "globe") -> xr.Dataset:
+        """Calculate damages as an xarray Dataset.
+
+        Parameters
+        ----------
+        geography : str, optional
+            One of "ir" (impact region), "country", or "globe" (default)
+
+        Returns
+        -------
+        xr.Dataset
+            Damages dataset with regional aggregation
+        """
+        # Get raw calculated damages (xr.DataArray)
+        damages = self.calculated_damages
+        pop = self.collapsed_pop
+
+        # Apply geography aggregation
+        aggregated = self._aggregate_by_geography(damages * pop, geography)
+
+        ds = aggregated.to_dataset(name="damages")
+
+        # Add metadata for GWR discounting
+        if "gwr" in self.discounting_type:
+            ds = ds.assign_coords(
+                ssp=str(list(self.gdp.ssp.values)),
+                model=str(list(self.gdp.model.values)),
+            )
+
+        return ds
+
+    def _filter_illegal_combinations_xr(self, ds: xr.Dataset) -> xr.Dataset:
+        """Filter illegal SSP/RCP combinations (xarray version).
+
+        Parameters
+        ----------
+        ds : xr.Dataset
+            Dataset to filter
+
+        Returns
+        -------
+        xr.Dataset
+            Filtered dataset with illegal combinations set to NaN
+        """
+        if "ssp" in ds.coords and any(ssp in ds.ssp.values for ssp in ["SSP1", "SSP5"]):
+            self.logger.info("Dropping illegal model combinations.")
+            illegal = ((ds.ssp == "SSP1") & (ds.rcp == "rcp85")) | (
+                (ds.ssp == "SSP5") & (ds.rcp == "rcp45")
+            )
+            for var in ["anomaly", "gmsl"]:
+                if var in ds:
+                    ds[var] = xr.where(illegal, np.nan, ds[var])
+
+        if "agriculture" in self.sector:
+            self.logger.info("Dropping illegal model combinations for agriculture.")
+            if "anomaly" in ds:
+                ds["anomaly"] = xr.where(
+                    (ds.gcm == "ACCESS1-0") & (ds.rcp == "rcp85"),
+                    np.nan,
+                    ds["anomaly"],
+                )
+
+        return ds
+
     @abstractmethod
     def ce_cc_calculation(self):
         """Calculate CE damages depending on discount type"""

@@ -491,6 +629,13 @@ def damage_function_points(self) -> pd.DataFrame:
         --------
         pd.DataFrame
         """
+        if self.geography != "globe":
+            return self._damage_function_points_xarray()
+        else:
+            return self._damage_function_points_pandas()
+
+    def _damage_function_points_pandas(self) -> pd.DataFrame:
+        """Original pandas-based implementation for backward compatibility."""
         df = self.global_damages_calculation()
 
         if "slr" in df.columns:

@@ -515,6 +660,29 @@ def damage_function_points(self) -> pd.DataFrame:

         return df
 
+    def _damage_function_points_xarray(self) -> pd.DataFrame:
+        """xarray-based implementation for geography support."""
+        ds = self.damages_dataset(geography=self.geography)
+
+        # Merge climate data using xarray
+        climate_vars = []
+        if "slr" in ds.coords:
+            gmsl = self.climate.gmsl.set_index(["slr", "year"]).to_xarray()
+            climate_vars.append(gmsl)
+        if "gcm" in ds.coords:
+            gmst = self.climate.gmst.set_index(["gcm", "rcp", "year"]).to_xarray()
+            climate_vars.append(gmst)
+
+        if climate_vars:
+            climate_ds = xr.merge(climate_vars)
+            ds = xr.merge([ds, climate_ds]).sel(year=ds.year)
+
+        # Filter illegal combinations
+        ds = self._filter_illegal_combinations_xr(ds)
+
+        # Convert to a DataFrame at the boundary
+        return ds.to_dataframe().reset_index()

     def damage_function_calculation(self, damage_function_points, global_consumption):
         """The damage function model fit may be: (1) ssp specific, (2) ssp-model specific, (3) unique across ssp-model.
         This depends on the type of discounting. In each case the input data passed to the fitting functions and the formatting of the returned
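For orientation, the heart of the new geography support is the region bookkeeping in `_aggregate_by_geography` and `_aggregate_to_country`. A minimal, self-contained sketch of the same two xarray patterns on toy data (region names and values are made up):

```python
import xarray as xr

# Toy damages: three impact regions over two years (values are made up).
data = xr.DataArray(
    [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
    coords={"region": ["USA.1", "USA.2", "CAN.1"], "year": [2020, 2021]},
    dims=["region", "year"],
)

# "globe": sum out regions, then restore a singleton region dimension so
# downstream code that expects a region axis keeps working.
globe = (
    data.sum(dim="region")
    .assign_coords({"region": "globe"})
    .expand_dims("region")
)
assert globe.sizes["region"] == 1

# "country": relabel each region to an ISO3 prefix, then group-sum the
# duplicates, mirroring the mapping.get(str(r)[:3], ...) lookup above.
iso3 = [str(r)[:3] for r in data.region.values]
by_country = data.assign_coords({"region": iso3}).groupby("region").sum()
print(by_country.region.values)  # ['CAN' 'USA']
```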
4 changes: 2 additions & 2 deletions src/dscim/menu/simple_storage.py

@@ -384,8 +384,8 @@ def gdppc(self):
     def adding_up_damages(self):
         """This property calls pre-calculated adding-up IR-level 'mean' over batches."""
 
-        mean_cc = f"{self.ce_path}/adding_up_cc.zarr"
-        mean_no_cc = f"{self.ce_path}/adding_up_no_cc.zarr"
+        mean_cc = f"{self.ce_path}/adding_up_cc_eta{self.eta}.zarr"
+        mean_no_cc = f"{self.ce_path}/adding_up_no_cc_eta{self.eta}.zarr"
 
         if os.path.exists(mean_cc) and os.path.exists(mean_no_cc):
             self.logger.info(
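The `adding_up_damages` change is purely a path-construction change; for example (the base path and eta value here are illustrative):

```python
ce_path = "/data/reduced"  # illustrative base path
eta = 2.0                  # illustrative eta

# Before this PR: /data/reduced/adding_up_cc.zarr
# After this PR:  /data/reduced/adding_up_cc_eta2.0.zarr
mean_cc = f"{ce_path}/adding_up_cc_eta{eta}.zarr"
mean_no_cc = f"{ce_path}/adding_up_no_cc_eta{eta}.zarr"
```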
20 changes: 19 additions & 1 deletion src/dscim/preprocessing/input_damages.py

@@ -721,6 +721,7 @@ def prep_mortality_damages(
     outpath,
     mortality_version,
     path_econ,
+    etas,
 ):
     ec = EconVars(path_econ=path_econ)
 

@@ -744,6 +745,10 @@ def prep_mortality_damages(
         scaling_deaths = "epa_row"
         scaling_costs = "epa_scaled"
         valuation = "vsl"
+    elif mortality_version == 9:
+        scaling_deaths = "epa_popavg"
+        scaling_costs = "epa_scaled"
+        valuation = "vly"
     else:
         raise ValueError("Mortality version not valid: ", str(mortality_version))
 

@@ -776,7 +781,19 @@ def prep(
             valuation=valuation,
         ).drop(["gcm", "valuation"])
 
-    data = xr.open_mfdataset(paths, preprocess=prep, parallel=True, engine="zarr")
+    d_ls = []
+    for eta in etas:
+        paths_ls = [paths.format(i, eta) for i in range(15)]
+        data = (
+            xr.open_mfdataset(
+                paths_ls, preprocess=prep, parallel=True, engine="zarr"
+            )
+            .assign_coords({"eta": eta})
+            .expand_dims("eta")
+        )
+        d_ls.append(data)
+
+    data = xr.merge(d_ls)
 
     damages = xr.Dataset(
         {

@@ -798,6 +815,7 @@ def prep(
     damages = damages.chunk(
         {
             "batch": 15,
+            "eta": 1,
             "ssp": 1,
             "model": 1,
             "rcp": 1,
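The eta loop above tags each per-eta load with a new `eta` dimension and merges the results. A self-contained sketch of that stack-along-eta pattern on toy datasets (the loader and values are stand-ins, not the real damages schema):

```python
import xarray as xr

def load_for_eta(eta):
    # Stand-in for the per-eta xr.open_mfdataset(...) call.
    return xr.Dataset(
        {"damages": ("batch", [eta * 1.0, eta * 2.0])},
        coords={"batch": [0, 1]},
    )

d_ls = []
for eta in [1.0, 2.0]:  # illustrative eta values
    data = (
        load_for_eta(eta)
        .assign_coords({"eta": eta})  # tag this slice with its eta
        .expand_dims("eta")           # promote eta to a real dimension
    )
    d_ls.append(data)

# Non-overlapping eta slices merge into one damages[eta, batch] dataset.
data = xr.merge(d_ls)
assert dict(data["damages"].sizes) == {"eta": 2, "batch": 2}
```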
54 changes: 18 additions & 36 deletions src/dscim/preprocessing/preprocessing.py

@@ -82,15 +82,6 @@ def reduce_damages(
     zero=False,
     quantreg=False,
 ):
-    if recipe == "adding_up":
-        assert (
-            eta is None
-        ), "Adding up does not take an eta argument. Please set to None."
-    # client = Client(n_workers=35, memory_limit="9G", threads_per_worker=1)
-
-    if recipe == "risk_aversion":
-        assert not quantreg, "Quantile regression is not compatible with risk aversion. Please set quantreg to False."
-
     with open(config) as stream:
         c = yaml.safe_load(stream)
         params = c["sectors"][sector]

@@ -123,15 +114,16 @@ def reduce_damages(
"model": 1,
"ssp": 1,
}
map_dims = ["eta"]
if quantreg:
chunkies["batch"] = 1
ce_batch_dims = [i for i in gdppc.dims] + [
i for i in ds.dims if i not in gdppc.dims
]
else:
ce_batch_dims = [i for i in gdppc.dims] + [
i for i in ds.dims if i not in gdppc.dims and i != "batch"
]
map_dims.append("batch")

ce_batch_dims = [i for i in gdppc.dims] + [
i for i in ds.dims if i not in gdppc.dims and i not in map_dims
]

ce_batch_coords = {c: ds[c].values for c in ce_batch_dims}
ce_batch_coords["region"] = [
i for i in gdppc.region.values if i in ce_batch_coords["region"]
Expand All @@ -145,6 +137,8 @@ def reduce_damages(
     ).chunk(chunkies)
 
     other = xr.open_zarr(damages).chunk(chunkies)
+    if "eta" in other.coords:
+        other = other.sel(eta=eta, drop=True)
 
     out = other.map_blocks(
         ce_from_chunk,

@@ -172,7 +166,7 @@ def reduce_damages(

     if recipe == "adding_up":
         out.to_zarr(
-            f"{outpath}/{recipe}_{reduction}.zarr",
+            f"{outpath}/{recipe}_{reduction}_eta{eta}.zarr",
             consolidated=True,
             mode="w",
         )

@@ -289,14 +283,9 @@ def subset_USA_reduced_damages(
     eta,
     input_path,
 ):
-    if recipe == "adding_up":
-        ds = xr.open_zarr(
-            f"{input_path}/{sector}/{recipe}_{reduction}.zarr",
-        )
-    elif recipe == "risk_aversion":
-        ds = xr.open_zarr(
-            f"{input_path}/{sector}/{recipe}_{reduction}_eta{eta}.zarr",
-        )
+    ds = xr.open_zarr(
+        f"{input_path}/{sector}/{recipe}_{reduction}_eta{eta}.zarr",
+    )
 
     US_territories = [
         "USA",

@@ -321,18 +310,11 @@ def subset_USA_reduced_damages(
     for var in subset.variables:
         subset[var].encoding.clear()
 
-    if recipe == "adding_up":
-        subset.to_zarr(
-            f"{input_path}/{sector}_USA/{recipe}_{reduction}.zarr",
-            consolidated=True,
-            mode="w",
-        )
-    elif recipe == "risk_aversion":
-        subset.to_zarr(
-            f"{input_path}/{sector}_USA/{recipe}_{reduction}_eta{eta}.zarr",
-            consolidated=True,
-            mode="w",
-        )
+    subset.to_zarr(
+        f"{input_path}/{sector}_USA/{recipe}_{reduction}_eta{eta}.zarr",
+        consolidated=True,
+        mode="w",
+    )
 
 
 def subset_USA_ssp_econ(
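One way to read the `map_dims` change in `reduce_damages`: dimensions listed in `map_dims` are the ones the certainty-equivalent output does not keep. Per this diff, eta is sliced out up front (`other.sel(eta=eta, drop=True)`), and batch is collapsed inside the chunked computation unless quantile regression keeps it. A toy illustration of just that dimension bookkeeping (names and shapes are illustrative, not the real damages schema):

```python
import xarray as xr

gdppc = xr.DataArray(
    [[1.0, 2.0]],
    coords={"ssp": ["SSP2"], "region": ["USA", "CAN"]},
    dims=["ssp", "region"],
)
ds = xr.Dataset(
    coords={
        "ssp": ["SSP2"],
        "region": ["USA", "CAN"],
        "batch": [0, 1, 2],
        "eta": [1.0, 2.0],
    }
)

quantreg = False
map_dims = ["eta"]            # eta is never kept in the CE output
if not quantreg:
    map_dims.append("batch")  # batch is dropped too unless quantile regression

# Output dims of the CE result: everything except the mapped dims.
ce_batch_dims = [i for i in gdppc.dims] + [
    i for i in ds.dims if i not in gdppc.dims and i not in map_dims
]
print(sorted(ce_batch_dims))  # ['region', 'ssp']
```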