Source code for spatiocoexistence.py_tools

import numpy as np
from numpy.typing import NDArray
from spatiocoexistence.crowding import crowding_indices
from pathlib import Path
import pandas as pd
from spatiocoexistence.processes import (
    calculate_reduced_growth,
    calculate_survival_rate,
    calculate_recruitment_rate,
)
import re


def get_BA_and_k(
    CI_CS: NDArray[np.float64],
    CI_C: NDArray[np.float64],
    CI_HS: NDArray[np.float64],
    CI_H: NDArray[np.float64],
    species: NDArray[np.int_],
    dbh: NDArray[np.float64],
    max_x: float,
    max_y: float,
    radius: float,
) -> tuple[
    NDArray[np.float64],
    NDArray[np.float64],
    NDArray[np.float64],
    NDArray[np.float64],
    NDArray[np.float64],
    NDArray[np.float64],
    NDArray[np.int_],
    NDArray[np.int_],
]:
    """
    BA stands for basal area, and k represents aggregation.

    ff is for comparing a focal species with conspecifics.
    fh is for comparing a focal species with heterospecifics.
    Returns numpy arrays for all outputs.
    """
    unique_species = np.unique(species)
    n_species = unique_species.size
    BA_ff = np.zeros(n_species)
    BA_fh = np.zeros(n_species)
    k_ff = np.zeros(n_species)
    k_fh = np.zeros(n_species)
    n_BA_ff = np.zeros(n_species)
    n_BA_fh = np.zeros(n_species)
    abundance_con = np.zeros(n_species, dtype=int)
    abundance_het = np.zeros(n_species, dtype=int)
    c = 2 * np.pi * radius / (int(max_x + 0.5) * int(max_y + 0.5))
    all_individuals = np.arange(len(species))
    for idx, f in enumerate(unique_species):
        conspecifics = np.where(species == f)[0]
        heterospecifics = np.delete(all_individuals, conspecifics)
        CI_CS_mean = np.mean(CI_CS[conspecifics])
        CI_C_mean = np.mean(CI_C[conspecifics])
        CI_HS_mean = np.mean(CI_HS[conspecifics])
        CI_H_mean = np.mean(CI_H[conspecifics])
        dbh_C_mean = np.mean(dbh[conspecifics])
        dbh_H_mean = np.mean(dbh[heterospecifics])
        ff = CI_CS_mean / CI_C_mean if CI_CS_mean != 0 else 0
        fh = CI_HS_mean / CI_H_mean if CI_HS_mean != 0 else 0
        BA_ff[idx] = ff
        n_BA_ff[idx] = ff / dbh_C_mean if dbh_C_mean != 0 else 0
        k_ff[idx] = CI_C_mean / (c * len(conspecifics)) if len(conspecifics) > 0 else 0
        BA_fh[idx] = fh
        n_BA_fh[idx] = fh / dbh_H_mean if dbh_H_mean != 0 else 0
        k_fh[idx] = (
            CI_H_mean / (c * len(heterospecifics)) if len(heterospecifics) > 0 else 0
        )
        abundance_con[idx] = len(conspecifics)
        abundance_het[idx] = len(heterospecifics)
    return BA_ff, BA_fh, n_BA_ff, n_BA_fh, k_ff, k_fh, abundance_con, abundance_het

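# Illustrative usage sketch, not part of the original module: the array sizes,
# seed, species count, and plot dimensions below are arbitrary assumptions for
# demonstration only.
def _example_get_BA_and_k() -> None:
    rng = np.random.default_rng(0)
    n = 200
    crowding = rng.random((4, n))  # stand-ins for CI_CS, CI_C, CI_HS, CI_H
    species = rng.integers(0, 5, size=n)
    dbh = rng.uniform(0.1, 2.0, size=n)
    BA_ff, BA_fh, n_BA_ff, n_BA_fh, k_ff, k_fh, ab_con, ab_het = get_BA_and_k(
        crowding[0], crowding[1], crowding[2], crowding[3],
        species, dbh, max_x=100.0, max_y=50.0, radius=10.0,
    )
    # One entry per species present in the input.
    assert BA_ff.shape == np.unique(species).shape
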
def reduced_growth(
    CI_CS: NDArray[np.float64],
    CI_HS: NDArray[np.float64],
    beta_gr: float | None = None,
) -> NDArray[np.float64]:
    """Calculate the reduced growth using the formula from Cython."""
    # Use the Cython function, passing beta_gr, which can be None to use the
    # global value.
    return calculate_reduced_growth(CI_CS, CI_HS, beta_gr)

def survival(
    CI_CS: NDArray[np.float64],
    CI_HS: NDArray[np.float64],
    CI_CS_dead: NDArray[np.float64],
    CI_HS_dead: NDArray[np.float64],
    dbh: NDArray[np.float64],
    reduced: bool = True,
) -> NDArray[np.float64]:
    """Calculate the reduced survival using the formula from Cython."""
    # apply_bg should be False when reduced=True, and True when reduced=False.
    return calculate_survival_rate(
        CI_CS, CI_HS, CI_CS_dead, CI_HS_dead, dbh, apply_bg=not reduced
    )

# TODO: these calculations need to be updated.
def reduced_recruitment(
    CI_CS: NDArray[np.float64],
    CI_HS: NDArray[np.float64],
    CI_CS_d: NDArray[np.float64],
    CI_HS_d: NDArray[np.float64],
    dbh: NDArray[np.float64],
    species: NDArray[np.int_],
) -> NDArray[np.float64]:
    """Calculate the reduced recruitment using the formula from Cython."""
    # This passes straight through to the Cython function.
    return calculate_recruitment_rate(CI_CS, CI_HS, CI_CS_d, CI_HS_d, dbh, species)

def size_class(bins: int = 32, delta_size: float = 0.2558) -> np.ndarray:
    """Calculate the size classes that are needed for further analysis."""
    return np.exp(delta_size * np.arange(bins))

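# Illustrative sketch, not part of the original module: the class edges grow
# geometrically, since exp(delta_size * i) == exp(delta_size) ** i. With the
# defaults, the first edges are roughly 1.00, 1.29, 1.67, 2.15, ...
def _example_size_class() -> None:
    edges = size_class(bins=5)
    # Successive ratios are constant: exp(0.2558) ~= 1.2915.
    ratios = edges[1:] / edges[:-1]
    assert np.allclose(ratios, np.exp(0.2558))
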
def mean_count_size_class(
    size_class: np.ndarray,
    parameter: NDArray[np.float64],
    dbh: NDArray[np.float64],
    count: bool = False,
) -> np.ndarray:
    """Get the mean values of the specified parameter for each size class."""
    mean_count = np.zeros(len(size_class) - 1)
    for i in range(len(size_class) - 1):
        indices = np.where((dbh >= size_class[i]) & (dbh < size_class[i + 1]))[0]
        if indices.size != 0:
            if not count:
                mean_count[i] = np.mean(parameter[indices])
            else:
                mean_count[i] = len(parameter[indices])
        else:
            mean_count[i] = 0
    return mean_count

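# Illustrative usage sketch with arbitrary made-up values: bin trees by dbh
# into the geometric size classes and average a per-tree quantity per bin.
def _example_mean_count_size_class() -> None:
    rng = np.random.default_rng(1)
    dbh = rng.uniform(0.5, 20.0, size=500)
    growth = rng.random(500)  # stand-in for any per-individual parameter
    edges = size_class(bins=16)
    per_class_mean = mean_count_size_class(edges, growth, dbh)
    per_class_count = mean_count_size_class(edges, growth, dbh, count=True)
    # One value per size class (edges define len(edges) - 1 classes).
    assert per_class_mean.shape == per_class_count.shape == (len(edges) - 1,)
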
def create_initial_inventory(
    n_species: int = 80,
    n_repro: int = 22646,
    n_saplings: int = 10000,
    initial_dbh_rp: float = 6.64,
    initial_dbh_sa: float = 0.397,
    dim_x: float = 1000,
    dim_y: float = 500,
    radius: float = 20,
    num_threads: int = 1,
) -> np.ndarray:
    """
    Create an initial inventory for a uniformly distributed forest.

    The abundance per species is derived from the mean species density of
    BCI. The trees/individuals of each species are then placed at random
    locations according to their abundance.

    The initial dbh can also be calculated as np.sqrt(4 * mean_BA / np.pi)
    with mean_BA = 6.64.

    ToDo: status codes: -2 (dead, memory), -1 (just died), 0 (alive),
    1 (recruit), 2 (reproductive).
    """
    # Mean density of species based on BCI data.
    mean_density_rp = int(n_repro / n_species * (dim_x * dim_y / (1000 * 500)))
    mean_density_sa = int(n_saplings / n_species * (dim_x * dim_y / (1000 * 500)))
    abundance_rp = int(0.5 * mean_density_rp) + np.random.randint(
        low=0, high=mean_density_rp, size=n_species
    )
    abundance_sa = int(0.5 * mean_density_sa) + np.random.randint(
        low=0, high=mean_density_sa, size=n_species
    )
    number_trees = abundance_rp.sum() + abundance_sa.sum()
    dtype = np.dtype(
        [
            ("x", "f8"),
            ("y", "f8"),
            ("dbh", "f8"),
            ("species", "int64"),
            ("status", "int32"),
            ("CI_CS", "f8"),
            ("CI_HS", "f8"),
            ("CI_CS_d", "f8"),
            ("CI_HS_d", "f8"),
        ]
    )
    inventory = np.zeros(number_trees, dtype=dtype)
    inventory["status"] = int(1)
    index = 0
    for sp in range(n_species):
        for _ in range(abundance_rp[sp]):
            inventory["x"][index] = np.random.uniform(0, dim_x)
            inventory["y"][index] = np.random.uniform(0, dim_y)
            inventory["species"][index] = sp
            inventory["dbh"][index] = initial_dbh_rp
            index += 1
        for _ in range(abundance_sa[sp]):
            inventory["x"][index] = np.random.uniform(0, dim_x)
            inventory["y"][index] = np.random.uniform(0, dim_y)
            inventory["species"][index] = sp
            inventory["dbh"][index] = initial_dbh_sa
            index += 1
    # Ensure species ids start at 0.
    inventory["species"] -= np.min(inventory["species"])
    CI_CS, CI_HS, CI_CS_d, CI_HS_d = crowding_indices(
        inventory["x"],
        inventory["y"],
        inventory["species"],
        inventory["status"],
        radius,
        cell_size=0,
        dbh=inventory["dbh"],
        num_threads=num_threads,
    )
    inventory["CI_CS"] = CI_CS
    inventory["CI_HS"] = CI_HS
    inventory["CI_CS_d"] = CI_CS_d
    inventory["CI_HS_d"] = CI_HS_d
    return inventory

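# Illustrative usage sketch with arbitrary, scaled-down plot dimensions (the
# default densities are scaled by plot area, so very small n_repro values
# would yield an empty density range): create a small random inventory on a
# 100 m x 50 m plot.
def _example_create_initial_inventory() -> np.ndarray:
    inv = create_initial_inventory(dim_x=100, dim_y=50)
    # Every individual starts as a recruit (status 1).
    assert np.all(inv["status"] == 1)
    return inv
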
def read_initial_inventory(file_name: Path, radius: float = 10) -> np.ndarray:
    """Read an initial inventory from a .phy file and compute crowding indices."""
    dtype = np.dtype(
        [
            ("x", "f8"),
            ("y", "f8"),
            ("dbh", "f8"),
            ("species", "int64"),
            ("status", "int32"),
            ("CI_CS", "f8"),
            ("CI_HS", "f8"),
            ("CI_CS_d", "f8"),
            ("CI_HS_d", "f8"),
        ]
    )
    if file_name.suffix != ".phy":
        # Only .phy files are handled; fail loudly instead of falling through
        # with an undefined inventory.
        raise ValueError(f"Unsupported inventory file format: {file_name.suffix}")
    input_data = pd.read_csv(file_name, sep=r"\s+")
    inventory = np.zeros(input_data.shape[0], dtype=dtype)
    inventory["x"] = input_data.iloc[:, 0]
    inventory["y"] = input_data.iloc[:, 1]
    inventory["dbh"] = input_data.iloc[:, 4] / 100
    inventory["species"] = np.unique(input_data["Acro"], return_inverse=True)[1]
    inventory["status"] = input_data.iloc[:, 7].astype(np.int32)
    CI_CS, CI_HS, CI_CS_d, CI_HS_d = crowding_indices(
        inventory["x"],
        inventory["y"],
        inventory["species"],
        inventory["status"],
        radius,
        cell_size=0,
        dbh=inventory["dbh"],
    )
    inventory["CI_CS"] = CI_CS
    inventory["CI_HS"] = CI_HS
    inventory["CI_CS_d"] = CI_CS_d
    inventory["CI_HS_d"] = CI_HS_d
    return inventory

def read_param_file(
    file: Path, skip_header: int = 0, skip_footer: int = 0
) -> dict[str, float | int | str]:
    """Parse a whitespace-delimited parameter file into a dict of typed values."""
    params: dict[str, float | int | str] = {}
    with open(file) as f:
        lines = f.readlines()
    lines = lines[skip_header : len(lines) - skip_footer if skip_footer > 0 else None]
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # The following lines are special cases.
        if line == "Parameter ranges from mean and var":
            continue  # Skip this header line.
        if line.startswith("None"):
            continue  # Skip this line about the matrix.
        # Handle regular parameter lines.
        parts = line.split(None, 1)  # Split on the first whitespace.
        if len(parts) < 2:
            continue
        value_str, rest = parts
        # Extract the parameter name (everything before the first --).
        if "--" in rest:
            name = rest.split("--")[0].strip()
        else:
            name = rest.strip()
        # Convert the value to an appropriate type.
        value: float | int | str
        try:
            if "." in value_str or "e" in value_str.lower():
                value = float(value_str)
            else:
                value = int(value_str)
        except ValueError:
            value = value_str
        params[name] = value
    # Clean up dict keys, because sometimes old values are saved in the string.
    cleaned = {re.sub(r"\s+[\d.]+", "", k).strip(): v for k, v in params.items()}
    return cleaned

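# Illustrative sketch of the parameter-file format this parser expects, as an
# assumption inferred from the parsing logic above: each line starts with the
# value, followed by the parameter name, optionally followed by "--" comments.
def _example_read_param_file() -> None:
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        file = Path(tmp) / "params.txt"
        file.write_text(
            "500 tmax -- number of census steps\n"
            "0.06 beta_gr -- growth competition\n"
            "# a comment line that is skipped\n"
        )
        params = read_param_file(file)
        assert params == {"tmax": 500, "beta_gr": 0.06}
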
PARAMS_PYTHONIC_MAP = {
    # Core parameters
    "tmax": "t_max",  # number of census steps to simulate
    "Qsize": "cell_size",  # quadrat size in meters
    "Qdim1": "quadrat_dim_x",  # x width in number of quadrats
    "Qdim2": "quadrat_dim_y",  # y width in number of quadrats
    "NrSp": "num_species",  # initial number of species
    "ZOI": "neighborhood_radius",  # neighborhood radius (zone of influence)
    # Reproduction and survival
    "Rep": "mean_reproduction",  # mean reproduction rate
    "RangeRep": "reproduction_range",  # range of variability in reproduction rate
    "Surv": "background_survival",  # background survival rate
    "SurvSapl": "background_survival_sap",  # background survival rate for saplings
    # Dispersal
    "Sigma": "sigma_dispersal",  # sigma parameter of dispersal kernel
    "RangeSigma": "sigma_range",  # range of interspecific variability of sigma
    "ProbLD": "prob_long_distance",  # proportion of recruits with long-distance dispersal
    "RangeProbLD": "prob_ld_range",  # interspecific variability of probLD
    # Competition parameters
    "beta": "beta",  # competition index scaling factor
    "RangeBeta": "beta_range",  # interspecific variability of beta
    "betaH": "beta_H",  # competition matrix value for heterospecifics
    "RangeBetaH": "beta_H_range",  # range of interspecific variability of betaH
    # Establishment
    "betaEst": "beta_establishment",  # competition index scaling for establishment
    "RangebetaEst": "beta_establishment_range",  # interspecific variability of betaEst
    "betaHEst": "beta_H_establishment",  # rel. strength of het. vs conspecific competition on establishment
    "RangeBetaHEst": "beta_H_establishment_range",  # interspecific variability in betaHEst
    # Dead competition
    "betaDead": "beta_d",  # dead competition parameter
    "RangeBetaDead": "beta_d_range",  # range of interspecific variability of beta_d
    "BetaHDead": "beta_H_d",  # dead competition for heterospecifics
    "RangeBetaHDead": "beta_H_d_range",  # range of interspecific variability of beta_H_d
    # Establishment dead competition
    "betaEstDead": "beta_d_establishment",  # dead competition for establishment
    "BetaEstHDead": "beta_H_d_establishment",  # dead competition for heterospecifics on establishment
    # Size-dependency
    "BetaEstSize": "beta_size_establishment",  # coefficient of size-dependency of establishment
    "BetaSize": "beta_size",  # reproductive: coefficient of size-dependency of survival
    # Clustering
    "# clusters/sp": "clusters_per_species",  # number of cluster centers per species
    "var clusters": "cluster_variation",  # probability cluster center changes location
    "PShared clust": "shared_cluster_prop",  # proportion of shared cluster centers
    # Growth
    "r_model": "max_growth_rate",  # max size increase per time step
    "RangeR_model": "growth_rate_range",  # interspecific variability in max growth rate
    "c_model": "growth_exponent",  # power-law exponent of growth with dbh
    "RangeC_model": "growth_exponent_range",  # interspecific variability in growth exponent
    # Growth competition
    "beta_gr": "beta_growth",  # reduction in growth due to competition
    "RangeBeta_gr": "beta_growth_range",  # interspecific variability in beta_gr
    "betaH_gr": "beta_H_growth",  # rel. strength of het. vs conspecific competition on growth
    "RangeBetaH_gr": "beta_H_growth_range",  # interspecific variability in betaH_gr
    # Size and other parameters
    "Max dbh (dm)": "max_dbh",  # maximal dbh in decimeters
    "Min rec surv": "min_recruit_survival",  # minimal survival rate for recruits
    "Mean BA": "mean_basal_area",  # mean basal area for initial condition
    "Dbh recruits": "recruit_dbh",  # dbh of recruits in dm
    "RepSize": "reproductive_size",  # mean reproductive size (in dm)
    "RangeRepSize": "reproductive_size_range",  # interspecific variability in mean reproductive size
    "Time lag": "time_lag",  # time lag in census periods
}

def map_params(params: dict) -> dict:
    """Map the parameters to pythonic naming and scale certain values."""
    mapped = {}
    for key, value in params.items():
        py_key = PARAMS_PYTHONIC_MAP.get(key, key)
        # Divide by 100 if the key is "reproductive_size" or "recruit_dbh".
        if py_key in ("reproductive_size", "recruit_dbh"):
            try:
                mapped[py_key] = float(value) / 100
            except Exception:
                mapped[py_key] = value
        else:
            mapped[py_key] = value
    return mapped

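# Illustrative usage sketch with made-up raw values: file parameter names are
# mapped to their pythonic equivalents, and the two size parameters are
# rescaled by 1/100 (mm to dm, per the comments in _default_params).
def _example_map_params() -> None:
    raw = {"tmax": 500, "Dbh recruits": 50, "RepSize": 100}
    mapped = map_params(raw)
    assert mapped == {"t_max": 500, "recruit_dbh": 0.5, "reproductive_size": 1.0}
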
def _default_params() -> dict:
    """
    Default values as defined in the parameter file.

    This function should be used to initialize a new model or reset
    parameters between tests to avoid test interference.
    """
    default_params = {
        # Core parameters
        "t_max": 500,  # number of census steps to simulate, each step is 5 years
        "cell_size": 10,  # quadrat size in m (technical parameter to optimize search of neighbors)
        "quadrat_dim_x": 100,  # x width in # quadrats
        "quadrat_dim_y": 50,  # y width in # quadrats
        "num_species": 120,  # initial number of species in the simulation
        "neighborhood_radius": 10,  # neighborhood radius, maximum range of plant interactions
        # Reproduction and survival
        "mean_reproduction": 0.173,  # mean number of recruits per individual within the 5-year timestep
        "reproduction_range": 0.00,  # range of variability in reproductive rate
        "background_survival": 0.903,  # background survival rate (5 yrs) without neighbors
        "background_survival_sap": 0.8983,  # background survival rate (5 yrs) of saplings without neighbors
        # Dispersal
        "sigma_dispersal": 14,  # parameter sigma of Thomas process, standard deviation of Gaussian dispersal kernel
        "sigma_range": 12.0,  # range of interspecific variability of sigma
        "prob_long_distance": 0.5,  # proportion of recruits distributed independently of parents around cluster centers
        "prob_ld_range": 0.00,  # interspecific variability of probLD
        # Competition parameters
        "beta": 0.000,  # factor beta scales competition index CI to survival: exp(-beta * CI)
        "beta_growth": 0.06,  # reduction in growth due to competition: exp(-beta_gr * (CI_CS + betaH_gr * CI_HS))
        "beta_H": 0.00,  # value of competition matrix for heterospecifics
        "beta_H_growth": 0.23,  # rel. strength of het. vs conspecific competition on growth
        "growth_exponent": 0.4,  # power-law exponent of growth with dbh
        "max_growth_rate": 0.135,  # max size increase in dm/5 years
        "clusters_per_species": 10,  # number of cluster centers per species per 50 ha
        "cluster_variation": 1.0,  # probability that a cluster center changes location in one timestep
        "shared_cluster_prop": 0.0,  # proportion of cluster centers that are shared for all species
        "min_recruit_survival": 0.89,  # minimal survival rate of a location to be accepted for recruits
        "recruit_dbh": 0.5,  # dbh of recruits in dm (converted from mm in set_parameters)
        "reproductive_size": 1.0,  # mean reproductive size in dm (converted from mm in set_parameters)
        "max_dbh": 200,  # maximal dbh (dm)
        # Dead competition
        "beta_d": 0.0133,  # dead competition parameter
        "beta_H_d": 0.00,  # dead competition for heterospecifics
        # Size-dependency
        "beta_size": 0.00,  # reproductive: coefficient of size-dependency of survival
        # Establishment
        "beta_establishment": 0.0048,  # factor betaEst scales competition index CI to establishment: exp(-betaEst * CI)
        "beta_H_establishment": 0.0,  # rel. strength of het. vs conspecific competition on establishment
        "beta_d_establishment": 0.0,  # dead competition for establishment
        "beta_H_d_establishment": 0.0,  # dead competition for heterospecifics on establishment
        "beta_size_establishment": 0.0361,  # coefficient of size-dependency of survival of saplings
    }
    return default_params

def species_abundance_distribution(
    species: NDArray[np.int_], abundance_class: np.ndarray
):
    """Get the number of species per abundance class."""
    _, counts = np.unique(species, return_counts=True)
    return np.histogram(counts, bins=abundance_class)

def log_likelihood(
    simulated_mean: np.ndarray,
    simulated_std: np.ndarray,
    reference_par: np.ndarray,
):
    """
    Calculate the negative log-likelihood for a certain parameter.

    Under a Gaussian error model the log-likelihood is

        log(L) = -sum_n( (x_n - mu_n)^2 / (2 * sigma_n^2) + log(sqrt(2*pi) * sigma_n) ),

    and this function returns -log(L).
    """
    log_likelihood = np.sum(
        (np.array(reference_par) - np.array(simulated_mean)) ** 2
        / (2 * np.array(simulated_std) ** 2)
        - np.log(1 / (np.sqrt(2 * np.pi) * simulated_std))
    )
    return log_likelihood

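# Illustrative check (assumes scipy is available, which this module does not
# itself require): under the Gaussian error model, the value returned above
# should equal the negative sum of normal log-densities.
def _example_log_likelihood() -> None:
    from scipy.stats import norm

    mu = np.array([1.0, 2.0, 3.0])
    sigma = np.array([0.5, 0.5, 1.0])
    x = np.array([1.2, 1.8, 3.5])
    nll = log_likelihood(mu, sigma, x)
    assert np.isclose(nll, -norm.logpdf(x, loc=mu, scale=sigma).sum())
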
def get_CI_CS_histogram(inventory, bins):
    """Histogram counts of the conspecific crowding index CI_CS."""
    CI_CS = inventory["CI_CS"]
    counts, _ = np.histogram(CI_CS, bins=bins)
    return counts

def get_CI_HS_histogram(inventory, bins):
    """Histogram counts of the heterospecific crowding index CI_HS."""
    CI_HS = inventory["CI_HS"]
    counts, _ = np.histogram(CI_HS, bins=bins)
    return counts

def get_reduced_growth_histogram(inventory, bins, beta_gr=0.084):
    """Histogram counts of reduced growth computed from the crowding indices."""
    CI_CS = inventory["CI_CS"]
    CI_HS = inventory["CI_HS"]
    vals = reduced_growth(CI_CS, CI_HS, beta_gr=beta_gr)
    counts, _ = np.histogram(vals, bins=bins)
    return counts

def get_survival_histogram(inventory, bins):
    """Histogram counts of survival rates computed from the crowding indices."""
    CI_CS = inventory["CI_CS"]
    CI_HS = inventory["CI_HS"]
    CI_CS_d = inventory["CI_CS_d"]
    CI_HS_d = inventory["CI_HS_d"]
    dbh = inventory["dbh"]
    vals = survival(CI_CS, CI_HS, CI_CS_d, CI_HS_d, dbh)
    counts, _ = np.histogram(vals, bins=bins)
    return counts

def get_recruitment_histogram(inventory, bins):
    """Histogram counts of recruitment rates computed from the crowding indices."""
    CI_CS = inventory["CI_CS"]
    CI_HS = inventory["CI_HS"]
    CI_CS_d = inventory["CI_CS_d"]
    CI_HS_d = inventory["CI_HS_d"]
    dbh = inventory["dbh"]
    species = inventory["species"]
    vals = reduced_recruitment(CI_CS, CI_HS, CI_CS_d, CI_HS_d, dbh, species)
    counts, _ = np.histogram(vals, bins=bins)
    return counts

def get_SAD_histogram(inventory, bins):
    """Species abundance distribution, normalized by the number of species."""
    species = inventory["species"]
    unique_species, species_counts = np.unique(species, return_counts=True)
    sad_hist, _ = np.histogram(
        species_counts,
        bins=bins,
        weights=np.ones_like(species_counts) / len(unique_species),
    )
    return sad_hist

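# Illustrative usage sketch with arbitrary bin choices, assuming the crowding
# indices are nonnegative: build a small inventory and summarize it with the
# histogram helpers above.
def _example_histograms() -> None:
    inv = create_initial_inventory(dim_x=100, dim_y=50)
    ci_bins = np.linspace(0, np.max(inv["CI_CS"]) + 1e-9, 21)
    ci_counts = get_CI_CS_histogram(inv, ci_bins)
    sad = get_SAD_histogram(inv, bins=np.arange(1, 50))
    # The bins span all CI_CS values, so every individual is counted once.
    assert ci_counts.sum() == len(inv)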