Source code for growthcurves.inference

"""Utility functions for growth curve analysis.

This module provides utility functions for data validation, smoothing,
derivative calculations, and RMSE computation.
"""

import numpy as np
from scipy.signal import savgol_filter

import growthcurves as gc
from growthcurves.models import MODEL_REGISTRY, evaluate_parametric_model

# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------

no_fit_dictionary = {
    "max_od": 0.0,
    "N0": np.nan,
    "mu_max": 0.0,
    "intrinsic_growth_rate": None,
    "doubling_time": np.nan,
    "exp_phase_start": np.nan,
    "exp_phase_end": np.nan,
    "time_at_umax": np.nan,
    "od_at_umax": np.nan,
    "fit_t_min": np.nan,
    "fit_t_max": np.nan,
    "fit_method": None,
    "model_rmse": np.nan,
}


# -----------------------------------------------------------------------------
# Utility Functions
# -----------------------------------------------------------------------------


[docs] def validate_data(t, N, min_points=10): """ Validate and clean input growth curve N. This function filters out invalid N points that would cause problems in growth curve analysis, particularly when taking logarithms for exponential growth calculations. Filters out: - Non-finite values (NaN, inf, -inf) in t or OD - Non-positive OD values (N <= 0), which are invalid for log transformations - Datasets with insufficient N points or no t variation Parameters: t: Time array N: OD measurement array min_points: Minimum number of valid N points required (default: 10) Returns: Tuple of (time_clean, data_clean) with only valid N points, or (None, None) if the N doesn't meet minimum requirements. Examples: >>> t = np.array([0, 1, 2, 3, 4]) >>> N = np.array([0.0, 0.1, 0.2, np.nan, 0.4]) # Has zero and NaN >>> time_clean, data_clean = validate_data(t, N) >>> print(time_clean) [1 2 4] >>> print(data_clean) [0.1 0.2 0.4] """ t = np.asarray(t, dtype=float) N = np.asarray(N, dtype=float) # Filter: keep only finite values where N > 0 mask = np.isfinite(t) & np.isfinite(N) & (N > 0) t, N = t[mask], N[mask] # Check minimum requirements if len(t) < min_points or np.ptp(t) <= 0: return None, None return t, N
[docs] def compute_rmse(y_observed, y_predicted, in_log_space=False): """ Calculate root mean square error between observed and predicted values. Parameters: y_observed: Observed values y_predicted: Predicted values in_log_space: If True, compute RMSE in log space (default: False) Returns: RMSE value (float), or np.nan if no valid data points Note: - Parametric models use in_log_space=False (linear space) - Non-parametric models (spline, sliding window) use in_log_space=False when data is already log-transformed """ mask = np.isfinite(y_observed) & np.isfinite(y_predicted) if mask.sum() == 0: return np.nan obs = y_observed[mask] pred = y_predicted[mask] # Apply log transformation if requested and values are positive if in_log_space: if np.any(obs <= 0) or np.any(pred <= 0): return np.nan obs = np.log(obs) pred = np.log(pred) residuals = obs - pred return float(np.sqrt(np.mean(residuals**2)))
[docs] def compute_mu_max(t, N): """ Calculate the maximum specific growth rate from a fitted curve. The specific growth rate is μ = (1/N) * dN/dt = d(ln(N))/dt Parameters: t: Time array y_fit: Fitted OD values Returns: Maximum specific growth rate (t^-1) """ # Use dense t points for accurate derivative t_dense = np.linspace(t.min(), t.max(), 1000) N_interp = np.interp(t_dense, t, N) # Calculate specific growth rate: μ = (1/N) * dN/dt dN_dt = np.gradient(N_interp, t_dense) # Avoid division by very small values N_safe = np.maximum(N_interp, 1e-10) mu = dN_dt / N_safe # Return maximum specific growth rate return float(np.max(mu))
[docs] def smooth(N, window=11, poly=1, passes=2): """Apply Savitzky-Golay smoothing filter.""" n = len(N) if n < 7: return N w = int(window) | 1 # Ensure odd w = min(w, n if n % 2 else n - 1) p = min(int(poly), w - 1) for _ in range(passes): N = savgol_filter(N, w, p, mode="interp") return N
def _linear_interpolate_crossing(t, values, threshold, search_condition): """ Find t at which values cross a threshold using linear interpolation. Parameters: t: Time array values: Value array (e.g., specific growth rate) threshold: Threshold value to find crossing for search_condition: Boolean array indicating where to search for crossing Returns: Float t at threshold crossing, or None if no crossing found """ if not np.any(search_condition): return None crossing_idx = int(np.argmax(search_condition)) if crossing_idx == 0: return float(t[crossing_idx]) # Linear interpolation between points t0, t1 = t[crossing_idx - 1], t[crossing_idx] v0, v1 = values[crossing_idx - 1], values[crossing_idx] if v1 == v0: frac = 0.0 else: frac = (threshold - v0) / (v1 - v0) return float(t0 + frac * (t1 - t0))
[docs] def compute_phase_boundaries_mu_threshold( t, N, mu_max, lag_threshold=0.15, exp_threshold=0.15 ): """ Calculate lag and exponential phase end times from specific growth rate. Parameters: t: Time array N: OD values (should be from fitted/idealized curve) mu_max: Pre-calculated maximum specific growth rate (required) lag_threshold: Fraction of μ_max for lag phase end detection exp_threshold: Fraction of μ_max for exponential phase end detection Returns: Tuple of (lag_end, exp_end) times. """ if len(t) < 5 or np.ptp(t) <= 0: return float(t[0]) if len(t) > 0 else np.nan, ( float(t[-1]) if len(t) > 0 else np.nan ) # Calculate specific growth rate curve for finding phase boundaries _, mu = compute_instantaneous_mu(t, N) mu = np.nan_to_num(mu, nan=0.0) # Replace NaN with 0 mu = np.maximum(mu, 0) # Only consider positive growth # Use provided mu_max peak_val = float(mu_max) peak_idx = np.argmax(mu) if peak_val <= 0: return float(t[0]), float(t[-1]) lag_threshold = lag_threshold * peak_val exp_threshold = exp_threshold * peak_val # Find lag phase end (first crossing of lag threshold) lag_end = _linear_interpolate_crossing(t, mu, lag_threshold, mu >= lag_threshold) if lag_end is None: lag_end = float(t[0]) # Find exp phase end (first crossing below exp threshold after peak) after_peak = np.arange(len(t)) > peak_idx exp_end = _linear_interpolate_crossing( t, mu, exp_threshold, after_peak & (mu <= exp_threshold) ) if exp_end is None: exp_end = float(t[-1]) return lag_end, max(exp_end, lag_end)
[docs] def compute_phase_boundaries_tangent( t, N, time_at_umax, od_at_umax, mu_max, baseline_od=None, plateau_od=None ): """ Calculate exponential phase boundaries using tangent line method. This method extends the tangent line at the point of maximum growth rate (μ_max) down to the baseline (lag phase) and up to the plateau (stationary phase). The intersection points define the start and end of exponential phase. At the point of maximum growth rate, the tangent line in LOG space is: ln(OD(t)) = ln(od_at_umax) + μ_max * (t - time_at_umax) Which gives in linear space: OD(t) = od_at_umax * exp(μ_max * (t - time_at_umax)) Parameters: t: Time array N: OD values (should be from fitted/idealized curve) time_at_umax: Time at which μ_max occurs od_at_umax: OD value at time_at_umax mu_max: Maximum specific growth rate (μ_max) baseline_od: Baseline OD (lag phase level). If None, uses min(N) plateau_od: Plateau OD (stationary phase level). If None, uses max(N) Returns: Tuple of (exp_phase_start, exp_phase_end) times. Note: This method is more appropriate for non-parametric fits where the exponential phase is well-defined by the tangent at μ_max. """ if mu_max <= 0 or od_at_umax <= 0: # Invalid parameters, return full range return float(t[0]) if len(t) > 0 else np.nan, ( float(t[-1]) if len(t) > 0 else np.nan ) # Determine baseline and plateau if not provided if baseline_od is None: baseline_od = float(np.min(N)) if plateau_od is None: plateau_od = float(np.max(N)) # Ensure baseline < od_at_umax < plateau baseline_od = min(baseline_od, od_at_umax * 0.9) plateau_od = max(plateau_od, od_at_umax * 1.1) # Tangent line in LOG space: ln(OD(t)) = ln(od_at_umax) + μ_max * (t - time_at_umax) # Which gives: OD(t) = od_at_umax * exp(μ_max * (t - time_at_umax)) # Solving for t when OD = baseline_od: # baseline_od = od_at_umax * exp(μ_max * (t_start - time_at_umax)) # ln(baseline_od / od_at_umax) = μ_max * (t_start - time_at_umax) # t_start = time_at_umax + ln(baseline_od / od_at_umax) / μ_max t_start = time_at_umax + np.log(baseline_od / od_at_umax) / mu_max # Solving for t when OD = plateau_od: # plateau_od = od_at_umax * exp(μ_max * (t_end - time_at_umax)) # ln(plateau_od / od_at_umax) = μ_max * (t_end - time_at_umax) # t_end = time_at_umax + ln(plateau_od / od_at_umax) / μ_max t_end = time_at_umax + np.log(plateau_od / od_at_umax) / mu_max # Constrain to N range t_start = max(float(t[0]), t_start) t_end = min(float(t[-1]), t_end) # Ensure start < end if t_start >= t_end: # Fallback: use time_at_umax as midpoint t_range = float(t[-1] - t[0]) t_start = max(float(t[0]), time_at_umax - 0.3 * t_range) t_end = min(float(t[-1]), time_at_umax + 0.3 * t_range) return float(t_start), float(t_end)
[docs] def compute_phase_boundaries( t, N, method="tangent", time_at_umax=None, od_at_umax=None, mu_max=None, baseline_od=None, plateau_od=None, lag_threshold=0.15, exp_threshold=0.15, ): """ Calculate exponential phase boundaries using specified method. This unified function allows choosing between different methods for determining the start and end of the exponential growth phase. Parameters: t: Time array N: OD values (should be from fitted/idealized curve) method: Method to use for phase boundary calculation: - "tangent": Tangent line method (requires time_at_umax, od_at_umax, mu_max) - "threshold": Threshold-based method using fractions of μ_max time_at_umax: Time at which μ_max occurs (required for "tangent" method) od_at_umax: OD value at time_at_umax (required for "tangent" method) mu_max: Maximum specific growth rate (required for "tangent" method) baseline_od: Baseline OD for tangent method (defaults to min(N)) plateau_od: Plateau OD for tangent method (defaults to max(N)) lag_threshold: Fraction of μ_max for lag phase end (threshold method, default: 0.15) exp_threshold: Fraction of μ_max for exp phase end (threshold method, default: 0.15) Returns: Tuple of (exp_phase_start, exp_phase_end) times. Examples: >>> # Tangent method (for non-parametric fits) >>> exp_start, exp_end = calculate_phase_boundaries( ... t, N, method="tangent", ... time_at_umax=5.0, od_at_umax=0.5, mu_max=0.8 ... ) >>> # Threshold method (for parametric fits) >>> exp_start, exp_end = calculate_phase_boundaries( ... t, N, method="threshold", lag_threshold=0.15, exp_threshold=0.15 ... ) """ if method == "tangent": if time_at_umax is None or od_at_umax is None or mu_max is None: raise ValueError( "Tangent method requires time_at_umax, od_at_umax, and mu_max" ) return compute_phase_boundaries_tangent( t, N, time_at_umax, od_at_umax, mu_max, baseline_od, plateau_od ) elif method == "threshold": return compute_phase_boundaries_mu_threshold( t, N, mu_max, lag_threshold, exp_threshold ) else: raise ValueError(f"Unknown method '{method}'. Choose 'tangent' or 'threshold'.")
# ----------------------------------------------------------------------------- # Growth Statistics Extraction # ----------------------------------------------------------------------------- def _extract_stats_mech_logistic( fit_result, t, N, lag_threshold=0.15, exp_threshold=0.15, phase_boundary_method="threshold", ): """ Extract growth statistics from mechanistic logistic model fit. ODE: dN/dt = μ * (1 - N/K) * N Parameters: fit_result: Dict containing 'params' with mu, K, N0 t: Time array N: OD values lag_threshold, exp_threshold: Phase detection thresholds phase_boundary_method: "threshold" or "tangent" Returns: Growth statistics dictionary. """ params = fit_result.get("params", {}) # Extract model parameters K = float(params["K"]) # Carrying capacity above baseline y0 = float(params["y0"]) # Baseline OD mu_intrinsic = float(params["mu"]) # Intrinsic growth rate # Evaluate model N_fit = evaluate_parametric_model(t, "mech_logistic", params) mu_max = compute_mu_max(t, N_fit) # Dense grid for accurate calculations t_dense = np.linspace(t.min(), t.max(), 500) N_dense = evaluate_parametric_model(t_dense, "mech_logistic", params) # Calculate specific growth rate curve N_safe = np.maximum(N_dense, 1e-10) mu_dense = np.gradient(np.log(N_safe), t_dense) # Find t of maximum specific growth rate max_mu_idx = int(np.argmax(mu_dense)) time_at_umax = float(t_dense[max_mu_idx]) od_at_umax = float(N_dense[max_mu_idx]) if mu_max <= 0: stats = bad_fit_stats() stats["max_od"] = y0 + K stats["N0"] = y0 stats["intrinsic_growth_rate"] = mu_intrinsic stats["fit_method"] = "model_fitting_mech_logistic" return stats # Phase boundaries using specified method exp_phase_start, exp_phase_end = compute_phase_boundaries( t_dense, N_dense, method=phase_boundary_method, time_at_umax=time_at_umax, od_at_umax=od_at_umax, mu_max=mu_max, baseline_od=float(np.min(N_dense)), plateau_od=y0 + K, lag_threshold=lag_threshold, exp_threshold=exp_threshold, ) # Doubling t from mu_max doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan # RMSE rmse = compute_rmse(N, N_fit) return { "max_od": y0 + K, "N0": y0, "mu_max": float(mu_max), "intrinsic_growth_rate": mu_intrinsic, "doubling_time": float(doubling_time), "exp_phase_start": exp_phase_start, "exp_phase_end": max(exp_phase_end, exp_phase_start), "time_at_umax": time_at_umax, "od_at_umax": od_at_umax, "fit_t_min": float(t.min()), "fit_t_max": float(t.max()), "fit_method": "model_fitting_mech_logistic", "model_rmse": rmse, } def _extract_stats_mech_gompertz( fit_result, t, N, lag_threshold=0.15, exp_threshold=0.15, phase_boundary_method="threshold", ): """ Extract growth statistics from mechanistic Gompertz model fit. ODE: dN/dt = μ * log(K/N) * N Parameters: fit_result: Dict containing 'params' with mu, K, N0 t: Time array N: OD values lag_threshold, exp_threshold: Phase detection thresholds phase_boundary_method: "threshold" or "tangent" Returns: Growth statistics dictionary. """ from .models import evaluate_parametric_model params = fit_result.get("params", {}) # Extract model parameters K = float(params["K"]) # Carrying capacity above baseline y0 = float(params["y0"]) # Baseline OD mu_intrinsic = float(params["mu"]) # Intrinsic growth rate # Evaluate model N_fit = evaluate_parametric_model(t, "mech_gompertz", params) mu_max = compute_mu_max(t, N_fit) # Dense grid for accurate calculations t_dense = np.linspace(t.min(), t.max(), 500) N_dense = evaluate_parametric_model(t_dense, "mech_gompertz", params) # Calculate specific growth rate curve N_safe = np.maximum(N_dense, 1e-10) mu_dense = np.gradient(np.log(N_safe), t_dense) # Find t of maximum specific growth rate max_mu_idx = int(np.argmax(mu_dense)) time_at_umax = float(t_dense[max_mu_idx]) od_at_umax = float(N_dense[max_mu_idx]) if mu_max <= 0: stats = bad_fit_stats() stats["max_od"] = y0 + K stats["N0"] = y0 stats["intrinsic_growth_rate"] = mu_intrinsic stats["fit_method"] = "model_fitting_mech_gompertz" return stats # Phase boundaries using specified method exp_phase_start, exp_phase_end = compute_phase_boundaries( t_dense, N_dense, method=phase_boundary_method, time_at_umax=time_at_umax, od_at_umax=od_at_umax, mu_max=mu_max, baseline_od=float(np.min(N_dense)), plateau_od=y0 + K, lag_threshold=lag_threshold, exp_threshold=exp_threshold, ) # Doubling t from mu_max doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan # RMSE rmse = compute_rmse(N, N_fit) return { "max_od": y0 + K, "N0": y0, "mu_max": float(mu_max), "intrinsic_growth_rate": mu_intrinsic, "doubling_time": float(doubling_time), "exp_phase_start": exp_phase_start, "exp_phase_end": max(exp_phase_end, exp_phase_start), "time_at_umax": time_at_umax, "od_at_umax": od_at_umax, "fit_t_min": float(t.min()), "fit_t_max": float(t.max()), "fit_method": "model_fitting_mech_gompertz", "model_rmse": rmse, } def _extract_stats_mech_richards( fit_result, t, N, lag_threshold=0.15, exp_threshold=0.15, phase_boundary_method="threshold", ): """ Extract growth statistics from mechanistic Richards model fit. ODE: dN/dt = μ * (1 - (N/K)^β) * N Parameters: fit_result: Dict containing 'params' with mu, K, N0, beta t: Time array N: OD values lag_threshold, exp_threshold: Phase detection thresholds phase_boundary_method: "threshold" or "tangent" Returns: Growth statistics dictionary. """ from .models import evaluate_parametric_model params = fit_result.get("params", {}) # Extract model parameters K = float(params["K"]) # Carrying capacity above baseline y0 = float(params["y0"]) # Baseline OD mu_intrinsic = float(params["mu"]) # Intrinsic growth rate # Evaluate model y_fit = evaluate_parametric_model(t, "mech_richards", params) mu_max = compute_mu_max(t, y_fit) # Dense grid for accurate calculations t_dense = np.linspace(t.min(), t.max(), 500) N_dense = evaluate_parametric_model(t_dense, "mech_richards", params) # Calculate specific growth rate curve N_safe = np.maximum(N_dense, 1e-10) mu_dense = np.gradient(np.log(N_safe), t_dense) # Find t of maximum specific growth rate max_mu_idx = int(np.argmax(mu_dense)) time_at_umax = float(t_dense[max_mu_idx]) od_at_umax = float(N_dense[max_mu_idx]) if mu_max <= 0: stats = bad_fit_stats() stats["max_od"] = y0 + K stats["N0"] = y0 stats["intrinsic_growth_rate"] = mu_intrinsic stats["fit_method"] = "model_fitting_mech_richards" return stats # Phase boundaries using specified method exp_phase_start, exp_phase_end = compute_phase_boundaries( t_dense, N_dense, method=phase_boundary_method, time_at_umax=time_at_umax, od_at_umax=od_at_umax, mu_max=mu_max, baseline_od=float(np.min(N_dense)), plateau_od=y0 + K, lag_threshold=lag_threshold, exp_threshold=exp_threshold, ) # Doubling t from mu_max doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan # RMSE rmse = compute_rmse(N, y_fit) return { "max_od": y0 + K, "N0": y0, "mu_max": float(mu_max), "intrinsic_growth_rate": mu_intrinsic, "doubling_time": float(doubling_time), "exp_phase_start": exp_phase_start, "exp_phase_end": max(exp_phase_end, exp_phase_start), "time_at_umax": time_at_umax, "od_at_umax": od_at_umax, "fit_t_min": float(t.min()), "fit_t_max": float(t.max()), "fit_method": "model_fitting_mech_richards", "model_rmse": rmse, } def _extract_stats_mech_baranyi( fit_result, t, N, lag_threshold=0.15, exp_threshold=0.15, phase_boundary_method="threshold", ): """ Extract growth statistics from mechanistic Baranyi-Roberts model fit. ODE: dN/dt = μ * A(t) * (1 - N/K) * N where A(t) = exp(μ*t) / (exp(h0) - 1 + exp(μ*t)) Parameters: fit_result: Dict containing 'params' with mu, K, N0, h0 t: Time array N: OD values lag_threshold, exp_threshold: Phase detection thresholds phase_boundary_method: "threshold" or "tangent" Returns: Growth statistics dictionary. """ from .models import evaluate_parametric_model params = fit_result.get("params", {}) # Extract model parameters K = float(params["K"]) # Carrying capacity above baseline y0 = float(params["y0"]) # Baseline OD mu_intrinsic = float(params["mu"]) # Intrinsic growth rate # Evaluate model y_fit = evaluate_parametric_model(t, "mech_baranyi", params) mu_max = compute_mu_max(t, y_fit) # Dense grid for accurate calculations t_dense = np.linspace(t.min(), t.max(), 500) N_dense = evaluate_parametric_model(t_dense, "mech_baranyi", params) # Calculate specific growth rate curve N_safe = np.maximum(N_dense, 1e-10) mu_dense = np.gradient(np.log(N_safe), t_dense) # Find t of maximum specific growth rate max_mu_idx = int(np.argmax(mu_dense)) time_at_umax = float(t_dense[max_mu_idx]) od_at_umax = float(N_dense[max_mu_idx]) if mu_max <= 0: stats = bad_fit_stats() stats["max_od"] = y0 + K stats["N0"] = y0 stats["intrinsic_growth_rate"] = mu_intrinsic stats["fit_method"] = "model_fitting_mech_baranyi" return stats # Lag time is the fitted h0 parameter directly exp_phase_start = float(params["h0"]) # Exp phase end using specified method _, exp_phase_end = compute_phase_boundaries( t_dense, N_dense, method=phase_boundary_method, time_at_umax=time_at_umax, od_at_umax=od_at_umax, mu_max=mu_max, baseline_od=float(np.min(N_dense)), plateau_od=y0 + K, lag_threshold=lag_threshold, exp_threshold=exp_threshold, ) # Doubling t from mu_max doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan # RMSE rmse = compute_rmse(N, y_fit) return { "max_od": y0 + K, "N0": y0, "mu_max": float(mu_max), "intrinsic_growth_rate": mu_intrinsic, "doubling_time": float(doubling_time), "exp_phase_start": exp_phase_start, "exp_phase_end": max(exp_phase_end, exp_phase_start), "time_at_umax": time_at_umax, "od_at_umax": od_at_umax, "fit_t_min": float(t.min()), "fit_t_max": float(t.max()), "fit_method": "model_fitting_mech_baranyi", "model_rmse": rmse, } def _extract_stats_phenom_logistic( fit_result, t, N, lag_threshold=0.15, exp_threshold=0.15, phase_boundary_method="tangent", ): """ Extract growth statistics from phenomenological logistic model fit. ln(Nt/N0) = A / (1 + exp(4 * μ_max * (λ - t) / A + 2)) Parameters: fit_result: Dict containing 'params' with A, mu_max, lam, N0 t: Time array N: OD values lag_threshold, exp_threshold: Phase detection thresholds phase_boundary_method: "threshold" or "tangent" Returns: Growth statistics dictionary. """ from .models import evaluate_parametric_model params = fit_result.get("params", {}) # Extract model parameters float(params["A"]) # Maximum ln(OD/OD0) mu_max = float(params["mu_max"]) # Maximum specific growth rate (fitted parameter) lam = float(params["lam"]) # Lag t N0 = float(params["N0"]) # Initial OD # Evaluate model y_fit = evaluate_parametric_model(t, "phenom_logistic", params) # Dense grid for accurate calculations t_dense = np.linspace(t.min(), t.max(), 500) N_dense = evaluate_parametric_model(t_dense, "phenom_logistic", params) # Calculate specific growth rate curve N_safe = np.maximum(N_dense, 1e-10) mu_dense = np.gradient(np.log(N_safe), t_dense) # Find t of maximum specific growth rate max_mu_idx = int(np.argmax(mu_dense)) time_at_umax = float(t_dense[max_mu_idx]) od_at_umax = float(N_dense[max_mu_idx]) # Max OD is the maximum of the fitted model trace max_od = float(np.max(N_dense)) if mu_max <= 0: stats = bad_fit_stats() stats["max_od"] = max_od stats["N0"] = N0 stats["intrinsic_growth_rate"] = ( None # Phenomenological: no intrinsic parameter ) stats["fit_method"] = "model_fitting_phenom_logistic" return stats # Exp phase start is λ for phenomenological models exp_phase_start = lam # Exp phase end using specified method _, exp_phase_end = compute_phase_boundaries( t_dense, N_dense, method=phase_boundary_method, time_at_umax=time_at_umax, od_at_umax=od_at_umax, mu_max=mu_max, baseline_od=float(np.min(N_dense)), plateau_od=float(np.max(N_dense)), lag_threshold=lag_threshold, exp_threshold=exp_threshold, ) # Doubling t from mu_max doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan # RMSE rmse = compute_rmse(N, y_fit) return { "max_od": max_od, "N0": N0, "mu_max": float(mu_max), "intrinsic_growth_rate": None, # Phenomenological: no intrinsic parameter "doubling_time": float(doubling_time), "exp_phase_start": exp_phase_start, "exp_phase_end": max(exp_phase_end, exp_phase_start), "time_at_umax": time_at_umax, "od_at_umax": od_at_umax, "fit_t_min": float(t.min()), "fit_t_max": float(t.max()), "fit_method": "model_fitting_phenom_logistic", "model_rmse": rmse, } def _extract_stats_phenom_gompertz( fit_result, t, N, lag_threshold=0.15, exp_threshold=0.15, phase_boundary_method="tangent", ): """ Extract growth statistics from phenomenological Gompertz model fit. ln(Nt/N0) = A * exp(-exp(μ_max * e * (λ - t) / A + 1)) Parameters: fit_result: Dict containing 'params' with A, mu_max, lam, N0 t: Time array N: OD values lag_threshold, exp_threshold: Phase detection thresholds phase_boundary_method: "threshold" or "tangent" Returns: Growth statistics dictionary. """ from .models import evaluate_parametric_model params = fit_result.get("params", {}) # Extract model parameters float(params["A"]) # Maximum ln(OD/OD0) mu_max = float(params["mu_max"]) # Maximum specific growth rate (fitted parameter) lam = float(params["lam"]) # Lag t N0 = float(params["N0"]) # Initial OD # Evaluate model y_fit = evaluate_parametric_model(t, "phenom_gompertz", params) # Dense grid for accurate calculations t_dense = np.linspace(t.min(), t.max(), 500) N_dense = evaluate_parametric_model(t_dense, "phenom_gompertz", params) # Calculate specific growth rate curve N_safe = np.maximum(N_dense, 1e-10) mu_dense = np.gradient(np.log(N_safe), t_dense) # Find t of maximum specific growth rate max_mu_idx = int(np.argmax(mu_dense)) time_at_umax = float(t_dense[max_mu_idx]) od_at_umax = float(N_dense[max_mu_idx]) # Max OD is the maximum of the fitted model trace max_od = float(np.max(N_dense)) if mu_max <= 0: stats = bad_fit_stats() stats["max_od"] = max_od stats["N0"] = N0 stats["intrinsic_growth_rate"] = ( None # Phenomenological: no intrinsic parameter ) stats["fit_method"] = "model_fitting_phenom_gompertz" return stats # Exp phase start is λ for phenomenological models exp_phase_start = lam # Exp phase end using specified method _, exp_phase_end = compute_phase_boundaries( t_dense, N_dense, method=phase_boundary_method, time_at_umax=time_at_umax, od_at_umax=od_at_umax, mu_max=mu_max, baseline_od=float(np.min(N_dense)), plateau_od=float(np.max(N_dense)), lag_threshold=lag_threshold, exp_threshold=exp_threshold, ) # Doubling t from mu_max doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan # RMSE rmse = compute_rmse(N, y_fit) return { "max_od": max_od, "N0": N0, "mu_max": float(mu_max), "intrinsic_growth_rate": None, # Phenomenological: no intrinsic parameter "doubling_time": float(doubling_time), "exp_phase_start": exp_phase_start, "exp_phase_end": max(exp_phase_end, exp_phase_start), "time_at_umax": time_at_umax, "od_at_umax": od_at_umax, "fit_t_min": float(t.min()), "fit_t_max": float(t.max()), "fit_method": "model_fitting_phenom_gompertz", "model_rmse": rmse, } def _extract_stats_phenom_gompertz_modified( fit_result, t, N, lag_threshold=0.15, exp_threshold=0.15, phase_boundary_method="tangent", ): """ Extract growth statistics from phenomenological modified Gompertz model fit. ln(Nt/N0) = A * exp(-exp(μ_max * e * (λ - t) / A + 1)) + A * exp(α * (t - t_shift)) Parameters: fit_result: Dict containing 'params' with A, mu_max, lam, alpha, t_shift, N0 t: Time array N: OD values lag_threshold, exp_threshold: Phase detection thresholds phase_boundary_method: "threshold" or "tangent" Returns: Growth statistics dictionary. """ from .models import evaluate_parametric_model params = fit_result.get("params", {}) # Extract model parameters float(params["A"]) # Maximum ln(OD/OD0) mu_max = float(params["mu_max"]) # Maximum specific growth rate (fitted parameter) lam = float(params["lam"]) # Lag t N0_param = float(params["N0"]) # Fitted normalization parameter # Evaluate model y_fit = evaluate_parametric_model(t, "phenom_gompertz_modified", params) # Dense grid for accurate calculations t_dense = np.linspace(t.min(), t.max(), 500) N_dense = evaluate_parametric_model(t_dense, "phenom_gompertz_modified", params) # For the modified Gompertz form, the fitted N0 parameter is not always the # model value at the earliest timepoint. Use model-predicted initial OD for stats. first_idx = int(np.argmin(t)) N0 = float(y_fit[first_idx]) if len(y_fit) > 0 else N0_param # Calculate specific growth rate curve N_safe = np.maximum(N_dense, 1e-10) mu_dense = np.gradient(np.log(N_safe), t_dense) # Find t of maximum specific growth rate max_mu_idx = int(np.argmax(mu_dense)) time_at_umax = float(t_dense[max_mu_idx]) od_at_umax = float(N_dense[max_mu_idx]) # Max OD is the maximum of the fitted model trace max_od = float(np.max(N_dense)) if mu_max <= 0: stats = bad_fit_stats() stats["max_od"] = max_od stats["N0"] = N0 stats["intrinsic_growth_rate"] = ( None # Phenomenological: no intrinsic parameter ) stats["fit_method"] = "model_fitting_phenom_gompertz_modified" return stats # Exp phase start is λ for phenomenological models exp_phase_start = lam # Exp phase end using specified method _, exp_phase_end = compute_phase_boundaries( t_dense, N_dense, method=phase_boundary_method, time_at_umax=time_at_umax, od_at_umax=od_at_umax, mu_max=mu_max, baseline_od=float(np.min(N_dense)), plateau_od=float(np.max(N_dense)), lag_threshold=lag_threshold, exp_threshold=exp_threshold, ) # Doubling t from mu_max doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan # RMSE rmse = compute_rmse(N, y_fit) return { "max_od": max_od, "N0": N0, "mu_max": float(mu_max), "intrinsic_growth_rate": None, # Phenomenological: no intrinsic parameter "doubling_time": float(doubling_time), "exp_phase_start": exp_phase_start, "exp_phase_end": max(exp_phase_end, exp_phase_start), "time_at_umax": time_at_umax, "od_at_umax": od_at_umax, "fit_t_min": float(t.min()), "fit_t_max": float(t.max()), "fit_method": "model_fitting_phenom_gompertz_modified", "model_rmse": rmse, } def _extract_stats_phenom_richards( fit_result, t, N, lag_threshold=0.15, exp_threshold=0.15, phase_boundary_method="tangent", ): """ Extract growth statistics from phenomenological Richards model fit. ln(Nt/N0)= A * (1 + ν * exp(1 + ν + μ_max * (1 + ν)^(1 + 1/ν) * (λ - t) / A))^(-1/ν) Parameters: fit_result: Dict containing 'params' with A, mu_max, lam, nu, N0 t: Time array N: OD values lag_threshold, exp_threshold: Phase detection thresholds phase_boundary_method: "threshold" or "tangent" Returns: Growth statistics dictionary. """ from .models import evaluate_parametric_model params = fit_result.get("params", {}) # Extract model parameters float(params["A"]) # Maximum ln(OD/OD0) mu_max = float(params["mu_max"]) # Maximum specific growth rate (fitted parameter) lam = float(params["lam"]) # Lag t N0 = float(params["N0"]) # Initial OD # Evaluate model y_fit = evaluate_parametric_model(t, "phenom_richards", params) # Dense grid for accurate calculations t_dense = np.linspace(t.min(), t.max(), 500) N_dense = evaluate_parametric_model(t_dense, "phenom_richards", params) # Calculate specific growth rate curve N_safe = np.maximum(N_dense, 1e-10) mu_dense = np.gradient(np.log(N_safe), t_dense) # Find t of maximum specific growth rate max_mu_idx = int(np.argmax(mu_dense)) time_at_umax = float(t_dense[max_mu_idx]) od_at_umax = float(N_dense[max_mu_idx]) # Max OD is the maximum of the fitted model trace max_od = float(np.max(N_dense)) if mu_max <= 0: stats = bad_fit_stats() stats["max_od"] = max_od stats["N0"] = N0 stats["intrinsic_growth_rate"] = ( None # Phenomenological: no intrinsic parameter ) stats["fit_method"] = "model_fitting_phenom_richards" return stats # Exp phase start is λ for phenomenological models exp_phase_start = lam # Exp phase end using specified method _, exp_phase_end = compute_phase_boundaries( t_dense, N_dense, method=phase_boundary_method, time_at_umax=time_at_umax, od_at_umax=od_at_umax, mu_max=mu_max, baseline_od=float(np.min(N_dense)), plateau_od=float(np.max(N_dense)), lag_threshold=lag_threshold, exp_threshold=exp_threshold, ) # Doubling t from mu_max doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan # RMSE rmse = compute_rmse(N, y_fit) return { "max_od": max_od, "N0": N0, "mu_max": float(mu_max), "intrinsic_growth_rate": None, # Phenomenological: no intrinsic parameter "doubling_time": float(doubling_time), "exp_phase_start": exp_phase_start, "exp_phase_end": max(exp_phase_end, exp_phase_start), "time_at_umax": time_at_umax, "od_at_umax": od_at_umax, "fit_t_min": float(t.min()), "fit_t_max": float(t.max()), "fit_method": "model_fitting_phenom_richards", "model_rmse": rmse, } def _extract_stats_parametric( fit_result, t, N, lag_threshold=0.15, exp_threshold=0.15, phase_boundary_method="threshold", ): """ Dispatcher for parametric model statistics extraction. This function reads the model type and dispatches to the appropriate model-specific extraction function. Handles: mech_* and phenom_* models. Parameters: fit_result: Dict from fit_* functions (contains 'params' and 'model_type') t: Time array (hours) used for fitting N: OD values used for fitting lag_threshold: Fraction of peak growth rate for lag phase detection exp_threshold: Fraction of peak growth rate for exponential phase end detection phase_boundary_method: Method for phase boundary calculation Returns: Growth statistics dictionary. """ model_type = fit_result.get("model_type") # Dispatch to model-specific extraction function if model_type == "mech_logistic": return _extract_stats_mech_logistic( fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method ) elif model_type == "mech_gompertz": return _extract_stats_mech_gompertz( fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method ) elif model_type == "mech_richards": return _extract_stats_mech_richards( fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method ) elif model_type == "mech_baranyi": return _extract_stats_mech_baranyi( fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method ) elif model_type == "phenom_logistic": return _extract_stats_phenom_logistic( fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method ) elif model_type == "phenom_gompertz": return _extract_stats_phenom_gompertz( fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method ) elif model_type == "phenom_gompertz_modified": return _extract_stats_phenom_gompertz_modified( fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method ) elif model_type == "phenom_richards": return _extract_stats_phenom_richards( fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method ) else: # Unknown parametric model raise ValueError( f"Unknown model type for parametric stats extraction: {model_type}" ) def _extract_stats_sliding_window( fit_result, t, N, lag_threshold=0.15, exp_threshold=0.15, phase_boundary_method="tangent", ): """ Extract growth statistics from sliding window fits. Parameters: fit_result: Dict from fit_* functions (contains 'params' and 'model_type') t: Time array (hours) used for fitting N: OD values used for fitting lag_threshold: Fraction of peak growth rate for lag phase detection exp_threshold: Fraction of peak growth rate for exponential phase end detection phase_boundary_method: "threshold" or "tangent" (default: "tangent") Returns: Growth statistics dictionary. """ params = fit_result.get("params", {}) valid_mask = np.isfinite(t) & np.isfinite(N) & (N > 0) t_clean = t[valid_mask] y_clean = N[valid_mask] if len(t_clean) < 5 or np.ptp(t_clean) <= 0: return bad_fit_stats() y_smooth = smooth(y_clean, 11, 1) mu_max = params["slope"] doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan max_od = float(np.max(y_clean)) # Calculate N0 as mean of first 10 points N0 = float(np.mean(y_clean[: min(10, len(y_clean))])) time_at_umax = params["time_at_umax"] # Evaluate the fitted line at time_at_umax to get the correct OD value # Fitted line is: ln(OD) = slope * t + intercept, so OD = exp(slope * t + intercept) od_at_umax = float(np.exp(params["slope"] * time_at_umax + params["intercept"])) # Calculate phase boundaries using specified method baseline_od = float(np.min(y_clean)) plateau_od = max_od exp_start, exp_end = compute_phase_boundaries( t_clean, y_smooth, method=phase_boundary_method, time_at_umax=time_at_umax, od_at_umax=od_at_umax, mu_max=mu_max, baseline_od=baseline_od, plateau_od=plateau_od, lag_threshold=lag_threshold, exp_threshold=exp_threshold, ) window_points = params.get("window_points", 15) t_window_start = params.get("fit_t_min", np.nan) t_window_end = params.get("fit_t_max", np.nan) window_centers = [] window_indices = [] for i in range(len(t_clean) - window_points + 1): window_centers.append(float(t_clean[i : i + window_points].mean())) window_indices.append(i) if len(window_centers) > 0: best_idx = min( range(len(window_centers)), key=lambda i: abs(window_centers[i] - time_at_umax), ) start_idx = window_indices[best_idx] t_window = t_clean[start_idx : start_idx + window_points] y_window = y_clean[start_idx : start_idx + window_points] if len(t_window) > 0: t_window_start = float(t_window.min()) t_window_end = float(t_window.max()) slope = params["slope"] intercept = params["intercept"] y_fit_window = np.exp(slope * t_window + intercept) # RMSE in log space (sliding window fits linear model in log space) rmse = compute_rmse(y_window, y_fit_window, in_log_space=True) else: rmse = np.nan return { "max_od": max_od, "N0": N0, "mu_max": mu_max, "intrinsic_growth_rate": None, # Non-parametric: no intrinsic parameter "doubling_time": doubling_time, "exp_phase_start": exp_start, "exp_phase_end": exp_end, "time_at_umax": time_at_umax, "od_at_umax": od_at_umax, "fit_t_min": t_window_start, "fit_t_max": t_window_end, "fit_method": "model_fitting_sliding_window", "model_rmse": rmse, } def _extract_stats_spline( fit_result, t, N, lag_threshold=0.15, exp_threshold=0.15, phase_boundary_method="tangent", ): """ Extract growth statistics from spline fits. The spline fitting pipeline: 1. Initial region identified using 30% threshold of instantaneous mu (defines fit_t_min/fit_t_max) 2. Spline fitted to log-transformed N in that region 3. mu_max calculated as maximum derivative of fitted spline 4. Phase boundaries calculated here using tangent or threshold method Parameters: fit_result: Dict from fit_* functions (contains 'params' and 'model_type') t: Time array (hours) used for fitting N: OD values used for fitting lag_threshold: Fraction of peak growth rate for lag phase detection (threshold method) exp_threshold: Fraction of peak growth rate for exponential phase end (threshold method) phase_boundary_method: "threshold" or "tangent" (default: "tangent") Returns: Growth statistics dictionary. """ from .models import spline_from_params params = fit_result.get("params", {}) # Use stored mu_max and time_at_umax from the original fit # These were calculated from the spline during fitting mu_max = params.get("mu_max") time_at_umax = params.get("time_at_umax") fit_t_min = params.get("fit_t_min") fit_t_max = params.get("fit_t_max") if mu_max is None or time_at_umax is None or fit_t_min is None or fit_t_max is None: return bad_fit_stats() mu_max = float(mu_max) # Validate and clean N valid_mask = np.isfinite(t) & np.isfinite(N) & (N > 0) t_clean = t[valid_mask] y_clean = N[valid_mask] if len(t_clean) < 5 or np.ptp(t_clean) <= 0: return bad_fit_stats() y_smooth = smooth(y_clean, 11, 1) # Extract the exponential phase N using stored fit_t_min and fit_t_max exp_mask = (t_clean >= fit_t_min) & (t_clean <= fit_t_max) t_exp = t_clean[exp_mask] y_exp = y_clean[exp_mask] if len(t_exp) < 5: return bad_fit_stats() # Reconstruct spline to calculate od_at_umax and RMSE y_log_exp = np.log(y_exp) try: spline = spline_from_params(params) # Evaluate spline at time_at_umax to get the OD value # Spline is fitted to log(y), so exponentiate to get actual OD od_at_umax = float(np.exp(spline(time_at_umax))) # Calculate RMSE in log space y_log_fit = spline(t_exp) rmse = compute_rmse(y_log_exp, y_log_fit, in_log_space=False) except Exception: od_at_umax = np.nan rmse = np.nan # Calculate basic stats doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan max_od = float(np.max(y_clean)) N0 = float(np.mean(y_clean[: min(10, len(y_clean))])) # Calculate phase boundaries using specified method baseline_od = float(np.min(y_clean)) plateau_od = max_od exp_start, exp_end = compute_phase_boundaries( t_clean, y_smooth, method=phase_boundary_method, time_at_umax=time_at_umax, od_at_umax=od_at_umax, mu_max=mu_max, baseline_od=baseline_od, plateau_od=plateau_od, lag_threshold=lag_threshold, exp_threshold=exp_threshold, ) return { "max_od": max_od, "N0": N0, "mu_max": mu_max, "intrinsic_growth_rate": None, # Non-parametric: no intrinsic parameter "doubling_time": doubling_time, "exp_phase_start": exp_start, "exp_phase_end": exp_end, "time_at_umax": time_at_umax, "od_at_umax": od_at_umax, "fit_t_min": fit_t_min, "fit_t_max": fit_t_max, "fit_method": "model_fitting_spline", "model_rmse": rmse, }
[docs] def extract_stats( fit_result, t, N, lag_threshold=0.15, exp_threshold=0.15, phase_boundary_method=None, **kwargs, ): """ Extract growth statistics from parametric or non-parametric fit results. This function acts as a dispatcher that reads the model type from the fit_result and calls the appropriate model-specific extraction function. Parameters: fit_result: Dict from fit_* functions (contains 'params' and 'model_type') t: Time array (hours) used for fitting N: OD values used for fitting lag_threshold: Fraction of u_max for lag phase detection (threshold method) exp_threshold: Fraction of u_max for exponential phase end (threshold method) phase_boundary_method: Method for calculating phase boundaries: - "threshold": Threshold-based method using fractions of μ_max - "tangent": Tangent line method at point of maximum growth rate Returns: Growth statistics dictionary. """ if fit_result is None: return bad_fit_stats() if "lag_frac" in kwargs: lag_threshold = kwargs["lag_frac"] if "exp_frac" in kwargs: exp_threshold = kwargs["exp_frac"] # Validate and filter N - remove non-positive and non-finite values t, N = validate_data(t, N, min_points=3) if t is None or N is None: return bad_fit_stats() model_type = fit_result.get("model_type") # Determine method based on model type if not specified if phase_boundary_method is None: # Mechanistic models (ODE-based): use threshold if model_type in { "mech_logistic", "mech_gompertz", "mech_richards", "mech_baranyi", }: method = "threshold" # Phenomenological models (ln-space): use tangent elif model_type in { "phenom_logistic", "phenom_gompertz", "phenom_gompertz_modified", "phenom_richards", }: method = "tangent" # Non-parametric models: use tangent else: method = "tangent" else: method = phase_boundary_method # Dispatch to appropriate extraction function based on model type if model_type in { "mech_logistic", "mech_gompertz", "mech_richards", "mech_baranyi", "phenom_logistic", "phenom_gompertz", "phenom_gompertz_modified", "phenom_richards", }: return _extract_stats_parametric( fit_result, t, N, lag_threshold, exp_threshold, method ) elif model_type == "sliding_window": return _extract_stats_sliding_window( fit_result, t, N, lag_threshold, exp_threshold, method ) elif model_type == "spline": return _extract_stats_spline( fit_result, t, N, lag_threshold, exp_threshold, method ) else: # Unknown or unsupported model type return bad_fit_stats()
# ----------------------------------------------------------------------------- # Derivative Functions # -----------------------------------------------------------------------------
[docs] def bad_fit_stats(): """Return default stats for failed fits.""" return no_fit_dictionary.copy()
# ----------------------------------------------------------------------------- # No-Growth Detection # -----------------------------------------------------------------------------
[docs] def detect_no_growth( t, N, growth_stats=None, min_data_points=5, min_signal_to_noise=5.0, min_od_increase=0.05, min_growth_rate=1e-6, ): """ Detect whether a growth curve shows no significant growth. Performs multiple checks to determine if a well should be marked as "no growth": 1. All OD values are <= 0 2. Insufficient N points 3. Low signal-to-noise ratio (max/min OD ratio) 4. Insufficient OD increase (flat curve) 5. Zero or near-zero growth rate (from fitted stats) Parameters: t: Time array N: OD values (baseline-corrected) growth_stats: Optional dict of fitted growth statistics (from extract_stats or sliding_window_fit). If provided, growth rate is checked. min_data_points: Minimum number of valid N points required (default: 5) min_signal_to_noise: Minimum ratio of max/min OD values (default: 5.0) min_od_increase: Minimum absolute OD increase required (default: 0.05) min_growth_rate: Minimum specific growth rate to be considered growth (default: 1e-6) Returns: Dict with: - is_no_growth: bool, True if no growth detected - reason: str, description of why it was flagged (or "growth detected") - checks: dict with individual check results """ t = np.asarray(t, dtype=float) N = np.asarray(N, dtype=float) # Check if all OD values are <= 0 if np.all(N <= 0): return { "is_no_growth": True, "reason": "All OD values <= 0", "checks": { "has_sufficient_data": False, "has_sufficient_snr": False, "has_sufficient_od_increase": False, "has_positive_growth_rate": False, }, } # Filter to finite positive values mask = np.isfinite(t) & np.isfinite(N) & (N > 0) t_valid = t[mask] y_valid = N[mask] checks = { "has_sufficient_data": True, "has_sufficient_snr": True, "has_sufficient_od_increase": True, "has_positive_growth_rate": True, } # Check 1: Minimum N points if len(t_valid) < min_data_points: checks["has_sufficient_data"] = False return { "is_no_growth": True, "reason": f"Insufficient N points ({len(t_valid)} < {min_data_points})", "checks": checks, } # Check 2: Signal-to-noise ratio (max/min OD) y_min = np.min(y_valid) y_max = np.max(y_valid) if y_min > 0: snr = y_max / y_min else: snr = np.inf if y_max > 0 else 0.0 if snr < min_signal_to_noise: checks["has_sufficient_snr"] = False r = f"Signal-to-noise ratio below threshold ({snr:.2f} < {min_signal_to_noise})" return { "is_no_growth": True, "reason": r, "checks": checks, } # Check 3: Minimum OD increase (detects flat curves) od_increase = y_max - y_min if od_increase < min_od_increase: checks["has_sufficient_od_increase"] = False return { "is_no_growth": True, "reason": ( f"Insufficient OD increase ({od_increase:.4f} < {min_od_increase})" ), "checks": checks, } # Check 4: Growth rate from fitted statistics (if provided) if growth_stats is not None: mu = growth_stats.get("mu_max") if mu is None or not np.isfinite(mu) or mu < min_growth_rate: checks["has_positive_growth_rate"] = False mu_str = f"{mu:.6f}" if mu is not None and np.isfinite(mu) else "N/A" return { "is_no_growth": True, "reason": f"μ(max) below minimum ({mu_str} < {min_growth_rate})", "checks": checks, } return { "is_no_growth": False, "reason": "growth detected", "checks": checks, }
[docs] def is_no_growth(growth_stats): """ Simple check if growth stats indicate no growth (failed or missing fit). This is a convenience function for quick checks on growth_stats dicts. For more comprehensive checks including raw data analysis, use detect_no_growth(). Parameters: growth_stats: Dict from extract_stats or sliding_window_fit Returns: bool: True if no growth detected (empty stats or zero growth rate) """ if not growth_stats: return True mu = growth_stats.get("mu_max", 0.0) return mu is None or mu == 0.0
# ----------------------------------------------------------------------------- # Derivative and Growth Rate Calculation Functions # -----------------------------------------------------------------------------
[docs] def compute_first_derivative(t, N): """ Compute the first derivative of a growth curve. Parameters ---------- t : array_like Time array N : array_like OD600 values (baseline-corrected) Returns ------- tuple of (np.ndarray, np.ndarray) Tuple of (t, dN) where dy is the first derivative dy/dt """ t = np.asarray(t, dtype=float) N = np.asarray(N, dtype=float) dN = np.gradient(N, t) return t, dN
[docs] def compute_instantaneous_mu(t, N): """ Compute the instantaneous specific growth rate (μ = 1/N × dN/dt). The specific growth rate μ represents the rate of population growth per unit of population. It is calculated as μ = (1/N) × d(N)/dt. Parameters ---------- t : array_like Time array N : array_like OD600 values (baseline-corrected) Returns ------- tuple of (np.ndarray, np.ndarray) Tuple (t, mu) where mu is the specific growth rate μ = (1/N) × d(N)/dt """ t = np.asarray(t, dtype=float) N = np.asarray(N, dtype=float) dN = np.gradient(N, t) # Avoid division by zero - set mu to nan where N is too small mu = np.where(np.abs(N) > 1e-10, dN / N, np.nan) return t, mu
[docs] def compute_sliding_window_growth_rate(t, N, window_points=15): """ Compute instantaneous specific growth rate using a sliding window approach. For each t point, fits a linear regression to log(N) vs t in a window centered at that point. The slope of the regression is the instantaneous specific growth rate μ at that t. This method is more robust to noise than direct differentiation but requires more N points. Parameters ---------- t : array_like Time array N : array_like OD600 values (baseline-corrected, must be positive) window_points : int, optional Number of points in each sliding window (default: 15) Returns ------- tuple of (np.ndarray, np.ndarray) Tuple of (time_out, mu_out) where mu_out is the sliding window growth rate. Returns arrays with NaN for points where window fitting failed. """ t = np.asarray(t, dtype=float) N = np.asarray(N, dtype=float) if len(t) < window_points: return t, np.full_like(t, np.nan) # Compute log(N), handling non-positive values y_log = np.where(N > 0, np.log(N), np.nan) mu = np.full_like(t, np.nan) half_window = window_points // 2 for i in range(len(t)): # Define window boundaries start = max(0, i - half_window) end = min(len(t), i + half_window + 1) # Ensure window has enough points if end - start < max(3, window_points // 2): continue t_win = t[start:end] y_log_win = y_log[start:end] # Skip if we have NaN values in the window valid_mask = np.isfinite(y_log_win) if valid_mask.sum() < 3: continue t_win_valid = t_win[valid_mask] y_log_win_valid = y_log_win[valid_mask] # Fit linear regression: log(N) = slope * t + intercept # slope = μ (specific growth rate) try: if np.ptp(t_win_valid) > 0: slope, _ = np.polyfit(t_win_valid, y_log_win_valid, 1) if np.isfinite(slope): mu[i] = slope except (np.linalg.LinAlgError, ValueError): continue return t, mu
[docs] def compare_methods( t: np.ndarray, N: np.ndarray, model_family: str = "all", phase_boundary_method: str = None, **fit_kwargs, ) -> tuple: """ Fit multiple models and extract growth statistics for comparison. This all-in-one convenience function fits all models in a specified family, extracts their statistics, and returns both for analysis and visualization. Parameters ---------- t : numpy.ndarray Time array for fitting N : numpy.ndarray OD N array for fitting model_family : str, optional Which family of models to fit. Options: - "mechanistic" : All mechanistic models (mech_logistic, mech_gompertz, etc.) - "phenomenological" : All phenomenological models - "all" : All available models Default: "all" phase_boundary_method : str, optional Method for calculating phase boundaries ("threshold" or "tangent"). If None, uses default for each model type. **fit_kwargs Additional keyword arguments to pass to fitting functions (e.g., spline_s=100, window_points=15) Returns ------- tuple of (dict, dict) Returns (fits, stats) where: - fits: Dictionary mapping method names to fit result dictionaries - stats: Dictionary mapping method names to statistics dictionaries The stats dictionary can be passed directly to plot_growth_stats_comparison(). Examples -------- >>> # Fit and compare all mechanistic models >>> fits, stats = gc.inference.compare_methods(t, N, model_family="mechanistic") >>> >>> # Plot comparison >>> fig = gc.plot.plot_growth_stats_comparison(stats, title="Mechanistic Models") >>> fig.show() >>> >>> # Fit phenomenological models with tangent phase boundaries >>> fits, stats = gc.inference.compare_methods( ... t, N, ... model_family="phenomenological", ... phase_boundary_method="tangent" ... ) """ # Get list of models to fit if model_family == "all": models_to_fit = ( MODEL_REGISTRY["mechanistic"] + MODEL_REGISTRY["phenomenological"] + MODEL_REGISTRY["non_parametric"] ) elif model_family in MODEL_REGISTRY: models_to_fit = MODEL_REGISTRY[model_family] else: raise ValueError( f"Unknown model_family '{model_family}'. " f"Choose from: {list(MODEL_REGISTRY.keys()) + ['all']}" ) # spline_kwargs = ["spline_s", "k"] # sliding_window_kwargs = ["window_points", "poly_order"] # Fit all models fits = {} stats = {} for model_name in models_to_fit: fits[model_name], stats[model_name] = gc.fit_model( t=t, N=N, model_name=model_name, phase_boundary_method=phase_boundary_method, **fit_kwargs, ) return fits, stats