from __future__ import annotations
import numpy as np
from numpy.typing import NDArray
from pathlib import Path
from ._tools import (
read_initial_inventory,
create_initial_inventory,
)
from ..crowding import crowding_indices
from ._tools import _check_inventory_data
import matplotlib.pyplot as plt
from ..model._processes import (
_calculate_reduced_growth,
_calculate_survival_rate,
_calculate_recruitment_rate,
)
from ._cy_utils import (
_species_abundance,
_count_saplings,
_count_reproductives,
)
[docs]
class Inventory:
"""
This inventory class stores forest data from the simulation and the ForestGEO network.
Instance methods offer statistics for analyzing the data directly.
Some statistics require crowding indices, which is why they are calculated if they are not provided in the data.
:no-index:
"""
def _init_inventory(self, data: np.ndarray, radius: float = 10.0):
self.radius = radius
self._data = data
[docs]
@classmethod
def from_data(
cls,
data: np.ndarray | Path | Inventory,
radius: float = 10.0,
):
"""
Create an Inventory instance from existing data, a file path, or another Inventory.
If an Inventory is provided, its data and radius are copied.
If a Path is provided, the file is read and validated.
If a numpy array is provided, it is validated for correct structure and types.
:param data: Structured numpy array, file path, or Inventory to initialize from.
:param radius: Neighborhood radius for crowding indices (default: 10.0).
:param returns: A new Inventory instance initialized from the provided data.
"""
if isinstance(data, Inventory):
arr = data.data
radius = data.radius
elif isinstance(data, Path):
arr = read_initial_inventory(data)
else:
arr = _check_inventory_data(data, radius)
obj = cls.__new__(cls)
obj._init_inventory(arr, radius)
return obj
[docs]
@classmethod
def from_random(
cls,
n_species: int,
dim_x: float,
dim_y: float,
radius: float = 10.0,
num_threads: int = 1,
):
"""
Create an Inventory instance from randomly spread trees. These trees will have the same density
as trees in the BCI plot.
:param n_species: Number of different species to include in the inventory.
:param dim_x: Width of the plot area.
:param dim_y: Height of the plot area.
:param radius: Neighborhood radius for crowding indices (default: 10.0).
:param num_threads: Number of threads to use for inventory creation (default: 1).
:returns: A new Inventory instance with randomly generated data.
"""
arr = create_initial_inventory(
n_species=n_species,
dim_x=dim_x,
dim_y=dim_y,
radius=radius,
num_threads=num_threads,
)
obj = cls.__new__(cls)
obj._init_inventory(arr, radius)
return obj
@property
def data(self) -> np.ndarray:
"""
The full structured numpy array representing the inventory.
Read-only: do not assign directly, use field setters or methods.
"""
return self._data
@property
def x(self):
"""X-coordinates of individuals (np.ndarray, dtype float64).
This property can be updated/set. Setting it will automatically recalculate all crowding indices.
"""
return self.data["x"]
@x.setter
def x(self, value: np.ndarray[np.float64]) -> None:
if not (isinstance(value, np.ndarray) and value.dtype == np.float64):
raise TypeError("x must be a numpy.ndarray of dtype float64")
self.data["x"] = value
self._update_crowding_indices()
@property
def y(self):
"""Y-coordinates of individuals (np.ndarray, dtype float64).
This property can be updated/set. Setting it will automatically recalculate all crowding indices.
"""
return self.data["y"]
@y.setter
def y(self, value: np.ndarray[np.float64]) -> None:
if not (isinstance(value, np.ndarray) and value.dtype == np.float64):
raise TypeError("y must be a numpy.ndarray of dtype float64")
self.data["y"] = value
self._update_crowding_indices()
@property
def dbh(self):
"""Diameter at breast height (np.ndarray, dtype float64).
This property can be updated/set. Setting it will automatically recalculate all crowding indices.
"""
return self.data["dbh"]
@dbh.setter
def dbh(self, value: np.ndarray[np.float64]) -> None:
if not (isinstance(value, np.ndarray) and value.dtype == np.float64):
raise TypeError("dbh must be a numpy.ndarray of dtype float64")
self.data["dbh"] = value
self._update_crowding_indices()
@property
def species(self):
"""Species IDs (np.ndarray, dtype int64).
This property can be updated/set. Setting it will automatically recalculate all crowding indices.
"""
return self.data["species"]
@species.setter
def species(self, value: np.ndarray[np.int64]) -> None:
if not (isinstance(value, np.ndarray) and value.dtype == np.int64):
raise TypeError("species must be a numpy.ndarray of dtype int64")
self.data["species"] = value
self._update_crowding_indices()
@property
def status(self):
"""Status codes (np.ndarray, dtype int32).
This property can be updated/set. Setting it will automatically recalculate all crowding indices.
"""
return self.data["status"]
@status.setter
def status(self, value: np.ndarray[np.int32]) -> None:
if not (isinstance(value, np.ndarray) and value.dtype == np.int32):
raise TypeError("status must be a numpy.ndarray of dtype int32")
self.data["status"] = value
self._update_crowding_indices()
@property
def CI_CS(self):
"""Conspecific crowding index."""
return self.data["CI_CS"]
@property
def CI_HS(self):
"""Heterospecific crowding index."""
return self.data["CI_HS"]
@property
def CI_CS_d(self):
"""Conspecific crowding index (distance-weighted)."""
return self.data["CI_CS_d"]
@property
def CI_HS_d(self):
"""Heterospecific crowding index (distance-weighted)."""
return self.data["CI_HS_d"]
@property
def CI_C(self):
"""Conspecific crowding index (unweighted)."""
return self.data["CI_C"]
@property
def CI_H(self):
"""Heterospecific crowding index (unweighted)."""
return self.data["CI_H"]
@property
def CI_C_d(self):
"""Conspecific crowding index (distance-weighted, unweighted)."""
return self.data["CI_C_d"]
@property
def CI_H_d(self):
"""Heterospecific crowding index (distance-weighted, unweighted)."""
return self.data["CI_H_d"]
def _update_crowding_indices(self):
"""
Recalculate crowding indices in-place for the current inventory data.
"""
# Calculate all crowding indices and update fields
CI_CS, CI_HS, CI_CS_d, CI_HS_d = crowding_indices(
self.x,
self.y,
self.species,
self.status,
self.radius,
dbh=self.dbh,
)
CI_C, CI_H, CI_C_d, CI_H_d = crowding_indices(
self.x,
self.y,
self.species,
self.status,
self.radius,
)
self.data["CI_CS"] = CI_CS
self.data["CI_HS"] = CI_HS
self.data["CI_CS_d"] = CI_CS_d
self.data["CI_HS_d"] = CI_HS_d
self.data["CI_C"] = CI_C
self.data["CI_H"] = CI_H
self.data["CI_C_d"] = CI_C_d
self.data["CI_H_d"] = CI_H_d
[docs]
def get_BA_and_k(self) -> tuple[
NDArray[np.float64],
NDArray[np.float64],
NDArray[np.float64],
NDArray[np.float64],
NDArray[np.float64],
NDArray[np.float64],
NDArray[np.int_],
NDArray[np.int_],
]:
"""
Calculates summary statistics for BA and k using inventory data.
BA stands for basal area, and k represents aggregation.
ff is for comparing a focal species with conspecifics.
fh is for comparing a focal species with heterospecifics.
:returns: Tuple of arrays (BA_ff, BA_fh, n_BA_ff, n_BA_fh, k_ff, k_fh, abundance_con, abundance_het).
:rtype: tuple
"""
CI_CS = self.CI_CS
CI_C = self.CI_C
CI_HS = self.CI_HS
CI_H = self.CI_H
species = self.species
dbh = self.dbh
unique_species = np.unique(species)
n_species = unique_species.size
BA_ff = np.zeros(n_species)
BA_fh = np.zeros(n_species)
k_ff = np.zeros(n_species)
k_fh = np.zeros(n_species)
n_BA_ff = np.zeros(n_species)
n_BA_fh = np.zeros(n_species)
abundance_con = np.zeros(n_species, dtype=int)
abundance_het = np.zeros(n_species, dtype=int)
max_x = np.max(self.x) if self.x.size else 0.0
max_y = np.max(self.y) if self.y.size else 0.0
c = (
2
* np.pi
* self.radius
/ (int(max_x + 0.5) * int(max_y + 0.5) if (max_x > 0 and max_y > 0) else 1)
)
all_individuals = np.arange(len(species))
for idx, f in enumerate(unique_species):
conspecifics = np.where(species == f)[0]
heterospecifics = np.delete(all_individuals, conspecifics)
CI_CS_mean = np.mean(CI_CS[conspecifics]) if conspecifics.size else 0.0
CI_C_mean = np.mean(CI_C[conspecifics]) if conspecifics.size else 0.0
CI_HS_mean = np.mean(CI_HS[conspecifics]) if conspecifics.size else 0.0
CI_H_mean = np.mean(CI_H[conspecifics]) if conspecifics.size else 0.0
dbh_C_mean = np.mean(dbh[conspecifics]) if conspecifics.size else 0.0
dbh_H_mean = np.mean(dbh[heterospecifics]) if heterospecifics.size else 0.0
# Use denominator guards to avoid divide-by-zero and spurious zeros
ff = (CI_CS_mean / CI_C_mean) if CI_C_mean != 0 else 0.0
fh = (CI_HS_mean / CI_H_mean) if CI_H_mean != 0 else 0.0
BA_ff[idx] = ff
n_BA_ff[idx] = ff / dbh_C_mean if dbh_C_mean != 0 else 0.0
k_ff[idx] = (
CI_C_mean / (c * len(conspecifics))
if len(conspecifics) > 0 and c != 0
else 0.0
)
BA_fh[idx] = fh
n_BA_fh[idx] = fh / dbh_H_mean if dbh_H_mean != 0 else 0.0
k_fh[idx] = (
CI_H_mean / (c * len(heterospecifics))
if len(heterospecifics) > 0 and c != 0
else 0.0
)
abundance_con[idx] = len(conspecifics)
abundance_het[idx] = len(heterospecifics)
return BA_ff, BA_fh, n_BA_ff, n_BA_fh, k_ff, k_fh, abundance_con, abundance_het
[docs]
def reduced_growth(self, beta_gr: float | None = None) -> NDArray[np.float64]:
"""
Calculate the reduced growth using the formula from Cython.
Reduced means, to only account for the exponential part of the growth function.
:param beta_gr: Growth reduction parameter.
:returns: Array of reduced growth values.
:rtype: NDArray[np.float64]
"""
return _calculate_reduced_growth(self.CI_CS, self.CI_HS, beta_gr)
[docs]
def survival(self, reduced: bool = True) -> NDArray[np.float64]:
"""
Calculate the reduced survival probabilities using the formula from mortality in cython.
Reduced means, to only account for the exponential part of the survival probability.
:param reduced: If True, calculate reduced survival; if False, apply background mortality.
:returns: Array of survival probabilities.
:rtype: NDArray[np.float64]
"""
return _calculate_survival_rate(
self.CI_CS,
self.CI_HS,
self.CI_CS_d,
self.CI_HS_d,
self.dbh,
apply_bg=(not reduced),
)
[docs]
def reduced_recruitment(self) -> NDArray[np.float64]:
"""
Calculate the recruitment probability using the formula from recruitment in cython.
Reduced means, to only account for the exponential part of the recruitment probability.
:returns:
Array of recruitment probabilities.
"""
return _calculate_recruitment_rate(
self.CI_CS,
self.CI_HS,
self.CI_CS_d,
self.CI_HS_d,
self.dbh,
self.species,
)
[docs]
def species_abundance_distribution(self, abundance_class: np.ndarray) -> tuple:
"""
Get the amount of species per abundance class.
:param abundance_class: Array of abundance class boundaries (bins).
:returns: Distribution of species counts per abundance class.
"""
_, counts = np.unique(self.species, return_counts=True)
return np.histogram(counts, bins=abundance_class)
[docs]
def get_CI_CS_distribution(self, bins: np.ndarray) -> np.ndarray:
"""
Get distribution counts for conspecific crowding index (CI_CS).
:param bins: Array of bin edges for distribution.
:returns: Distribution of CI_CS.
"""
counts, _ = np.histogram(self.CI_CS, bins=bins)
return counts
[docs]
def get_CI_HS_distribution(self, bins: np.ndarray) -> np.ndarray:
"""
Get distribution counts for heterospecific crowding index (CI_HS).
:param bins: Array of bin edges for distribution.
:returns: Distribution of CI_HS.
"""
counts, _ = np.histogram(self.CI_HS, bins=bins)
return counts
[docs]
def get_reduced_growth_distribution(
self, bins: np.ndarray, beta_gr: float = 0.084
) -> np.ndarray:
"""
Get distribution counts for reduced growth values.
:param bins: Array of bin edges for distribution.
:param beta_gr: Growth reduction parameter.
:returns: Distribution of reduced growth.
"""
vals = self.reduced_growth(beta_gr=beta_gr)
counts, _ = np.histogram(vals, bins=bins)
return counts
[docs]
def get_survival_distribution(self, bins: np.ndarray) -> np.ndarray:
"""
Get distribution counts for survival probabilities.
:param bins: Array of bin edges for distribution.
:returns: Distribution of survival probabilities.
"""
vals = self.survival()
counts, _ = np.histogram(vals, bins=bins)
return counts
[docs]
def get_recruitment_distribution(self, bins: np.ndarray) -> np.ndarray:
"""
Get distribution counts for recruitment probabilities.
:param bins: Array of bin edges for distribution.
:returns: Distribution of recruitment probabilities.
"""
vals = self.reduced_recruitment()
counts, _ = np.histogram(vals, bins=bins)
return counts
[docs]
def get_SAD_distribution(self, bins: np.ndarray) -> np.ndarray:
"""
Get distribution counts for species abundance distribution (SAD).
:param bins: Array of bin edges for abundance classes.
:returns: Distribution of SAD.
"""
species = self.species
unique_species, species_counts = np.unique(species, return_counts=True)
sad_hist, _ = np.histogram(
species_counts,
bins=bins,
weights=(
np.ones_like(species_counts) * 1 / len(unique_species)
if len(unique_species) > 0
else None
),
)
return sad_hist
[docs]
def count_reproductives(
self, reproductive_size: float
) -> tuple[np.ndarray[np.int32], np.ndarray[np.int32]]:
"""
Count the number of reproductive trees.
All individuals are checked to be alive and larger or equally large than reproductive_size.
:param reproductive_size: The size upon which an individual can reproduce.
:returns: Amount of reproductives per species, indices of reproductive trees.
"""
return _count_reproductives(
self.dbh, self.species, self.status, reproductive_size
)
[docs]
def count_saplings(self, reproductive_size: float) -> int:
"""
Count the number of saplings.
All individuals are checked to be alive and smaller than reproductive_size.
:param reproductive_size: The size upon which an individual can reproduce.
:returns: The total amount of saplings.
"""
return _count_saplings(self.dbh, self.status, reproductive_size)
[docs]
def species_abundance(self) -> np.ndarray[np.int32]:
"""
Count the amount of individuals per species.
:returns: Array of individual counts per species.
"""
n_species = np.max(self.species)
return _species_abundance(self.species, n_species)
[docs]
def extend_inventory(self, other: Inventory | np.ndarray) -> None:
"""
Extend the current inventory by appending another inventory's data.
This method concatenates all fields from the other inventory to the current one
and recalculates all crowding indices for the combined dataset.
:param other: Another Inventory object or structured numpy array to append.
:raises TypeError: If other is not an Inventory or compatible structured array.
:raises ValueError: If the data structures are incompatible.
"""
# Convert other to data array if it's an Inventory
if isinstance(other, Inventory):
other_data = other.data
elif isinstance(other, np.ndarray):
# Validate that it's a structured array with the right fields
other_data = _check_inventory_data(other, self.radius)
else:
raise TypeError(
"other must be an Inventory object or a structured numpy array"
)
# Check that both arrays have the same dtype
if self._data.dtype != other_data.dtype:
raise ValueError(
f"Cannot extend: dtype mismatch. "
f"Current: {self._data.dtype}, Other: {other_data.dtype}"
)
# Concatenate the structured arrays
combined_data = np.concatenate([self._data, other_data])
# Update internal data
self._data = combined_data
# Recalculate all crowding indices for the combined dataset
self._update_crowding_indices()
[docs]
def plot(
self,
threshold: int = 50,
filter: float = 1.0,
scale: float = 7.5,
figsize=(22, 10),
):
"""
Plot the current state of the inventory.
:param title: The title of the plot.
:param threshold: Minimum abundance of conspecifics ToDo.
:param filter: Filter out all trees below that size for inventory plot.
:param figsize: Figure size tuple (width, height).
"""
# Lazy import to avoid circular dependency
from ._plotting import display
fig, axs = plt.subplots(5, 5, figsize=figsize)
display(
self,
threshold=threshold,
filter=filter,
scale=scale,
abundances=self.species_abundance(),
axs=axs,
)
plt.show()
[docs]
def plot_map(
self,
filter: float = 1.0,
scale: float = 7.5,
figsize: tuple | None = None,
mask: np.ndarray | None = None,
):
"""
Plot only the spatial map of the inventory (trees as scatter plot).
:param title: The title of the plot.
:param filter: Minimum DBH of trees to be plotted.
:param scale: Scaling factor for point size based on DBH.
:param figsize: Figure size tuple (width, height).
:param mask: Boolean mask to filter which individuals to plot (optional).
"""
# Lazy import to avoid circular dependency
from ._plotting import plot_inventory_on_ax
if figsize is None:
figsize = (10, int(10 * np.max(self.y) / np.max(self.x)))
fig, ax = plt.subplots(figsize=figsize)
plot_inventory_on_ax(
ax=ax,
inventory=self,
filter=filter,
scale=scale,
mask=mask,
)
plt.tight_layout()
plt.show()