"""Utility functions for growth curve analysis.
This module provides utility functions for data validation, smoothing,
derivative calculations, and RMSE computation.
"""
import numpy as np
from scipy.signal import savgol_filter
import growthcurves as gc
from growthcurves.models import MODEL_REGISTRY, evaluate_parametric_model
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
no_fit_dictionary = {
"max_od": 0.0,
"N0": np.nan,
"mu_max": 0.0,
"intrinsic_growth_rate": None,
"doubling_time": np.nan,
"exp_phase_start": np.nan,
"exp_phase_end": np.nan,
"time_at_umax": np.nan,
"od_at_umax": np.nan,
"fit_t_min": np.nan,
"fit_t_max": np.nan,
"fit_method": None,
"model_rmse": np.nan,
}
# -----------------------------------------------------------------------------
# Utility Functions
# -----------------------------------------------------------------------------
[docs]
def validate_data(t, N, min_points=10):
"""
Validate and clean input growth curve N.
This function filters out invalid N points that would cause problems in
growth curve analysis, particularly when taking logarithms for exponential
growth calculations.
Filters out:
- Non-finite values (NaN, inf, -inf) in t or OD
- Non-positive OD values (N <= 0), which are invalid for log transformations
- Datasets with insufficient N points or no t variation
Parameters:
t: Time array
N: OD measurement array
min_points: Minimum number of valid N points required (default: 10)
Returns:
Tuple of (time_clean, data_clean) with only valid N points, or (None, None)
if the N doesn't meet minimum requirements.
Examples:
>>> t = np.array([0, 1, 2, 3, 4])
>>> N = np.array([0.0, 0.1, 0.2, np.nan, 0.4]) # Has zero and NaN
>>> time_clean, data_clean = validate_data(t, N)
>>> print(time_clean)
[1 2 4]
>>> print(data_clean)
[0.1 0.2 0.4]
"""
t = np.asarray(t, dtype=float)
N = np.asarray(N, dtype=float)
# Filter: keep only finite values where N > 0
mask = np.isfinite(t) & np.isfinite(N) & (N > 0)
t, N = t[mask], N[mask]
# Check minimum requirements
if len(t) < min_points or np.ptp(t) <= 0:
return None, None
return t, N
[docs]
def compute_rmse(y_observed, y_predicted, in_log_space=False):
"""
Calculate root mean square error between observed and predicted values.
Parameters:
y_observed: Observed values
y_predicted: Predicted values
in_log_space: If True, compute RMSE in log space (default: False)
Returns:
RMSE value (float), or np.nan if no valid data points
Note:
- Parametric models use in_log_space=False (linear space)
- Non-parametric models (spline, sliding window) use in_log_space=False
when data is already log-transformed
"""
mask = np.isfinite(y_observed) & np.isfinite(y_predicted)
if mask.sum() == 0:
return np.nan
obs = y_observed[mask]
pred = y_predicted[mask]
# Apply log transformation if requested and values are positive
if in_log_space:
if np.any(obs <= 0) or np.any(pred <= 0):
return np.nan
obs = np.log(obs)
pred = np.log(pred)
residuals = obs - pred
return float(np.sqrt(np.mean(residuals**2)))
[docs]
def compute_mu_max(t, N):
"""
Calculate the maximum specific growth rate from a fitted curve.
The specific growth rate is μ = (1/N) * dN/dt = d(ln(N))/dt
Parameters:
t: Time array
y_fit: Fitted OD values
Returns:
Maximum specific growth rate (t^-1)
"""
# Use dense t points for accurate derivative
t_dense = np.linspace(t.min(), t.max(), 1000)
N_interp = np.interp(t_dense, t, N)
# Calculate specific growth rate: μ = (1/N) * dN/dt
dN_dt = np.gradient(N_interp, t_dense)
# Avoid division by very small values
N_safe = np.maximum(N_interp, 1e-10)
mu = dN_dt / N_safe
# Return maximum specific growth rate
return float(np.max(mu))
[docs]
def smooth(N, window=11, poly=1, passes=2):
"""Apply Savitzky-Golay smoothing filter."""
n = len(N)
if n < 7:
return N
w = int(window) | 1 # Ensure odd
w = min(w, n if n % 2 else n - 1)
p = min(int(poly), w - 1)
for _ in range(passes):
N = savgol_filter(N, w, p, mode="interp")
return N
def _linear_interpolate_crossing(t, values, threshold, search_condition):
"""
Find t at which values cross a threshold using linear interpolation.
Parameters:
t: Time array
values: Value array (e.g., specific growth rate)
threshold: Threshold value to find crossing for
search_condition: Boolean array indicating where to search for crossing
Returns:
Float t at threshold crossing, or None if no crossing found
"""
if not np.any(search_condition):
return None
crossing_idx = int(np.argmax(search_condition))
if crossing_idx == 0:
return float(t[crossing_idx])
# Linear interpolation between points
t0, t1 = t[crossing_idx - 1], t[crossing_idx]
v0, v1 = values[crossing_idx - 1], values[crossing_idx]
if v1 == v0:
frac = 0.0
else:
frac = (threshold - v0) / (v1 - v0)
return float(t0 + frac * (t1 - t0))
[docs]
def compute_phase_boundaries_mu_threshold(
t, N, mu_max, lag_threshold=0.15, exp_threshold=0.15
):
"""
Calculate lag and exponential phase end times from specific growth rate.
Parameters:
t: Time array
N: OD values (should be from fitted/idealized curve)
mu_max: Pre-calculated maximum specific growth rate (required)
lag_threshold: Fraction of μ_max for lag phase end detection
exp_threshold: Fraction of μ_max for exponential phase end detection
Returns:
Tuple of (lag_end, exp_end) times.
"""
if len(t) < 5 or np.ptp(t) <= 0:
return float(t[0]) if len(t) > 0 else np.nan, (
float(t[-1]) if len(t) > 0 else np.nan
)
# Calculate specific growth rate curve for finding phase boundaries
_, mu = compute_instantaneous_mu(t, N)
mu = np.nan_to_num(mu, nan=0.0) # Replace NaN with 0
mu = np.maximum(mu, 0) # Only consider positive growth
# Use provided mu_max
peak_val = float(mu_max)
peak_idx = np.argmax(mu)
if peak_val <= 0:
return float(t[0]), float(t[-1])
lag_threshold = lag_threshold * peak_val
exp_threshold = exp_threshold * peak_val
# Find lag phase end (first crossing of lag threshold)
lag_end = _linear_interpolate_crossing(t, mu, lag_threshold, mu >= lag_threshold)
if lag_end is None:
lag_end = float(t[0])
# Find exp phase end (first crossing below exp threshold after peak)
after_peak = np.arange(len(t)) > peak_idx
exp_end = _linear_interpolate_crossing(
t, mu, exp_threshold, after_peak & (mu <= exp_threshold)
)
if exp_end is None:
exp_end = float(t[-1])
return lag_end, max(exp_end, lag_end)
[docs]
def compute_phase_boundaries_tangent(
t, N, time_at_umax, od_at_umax, mu_max, baseline_od=None, plateau_od=None
):
"""
Calculate exponential phase boundaries using tangent line method.
This method extends the tangent line at the point of maximum growth rate
(μ_max) down to the baseline (lag phase) and up to the plateau (stationary
phase). The intersection points define the start and end of exponential phase.
At the point of maximum growth rate, the tangent line in LOG space is:
ln(OD(t)) = ln(od_at_umax) + μ_max * (t - time_at_umax)
Which gives in linear space:
OD(t) = od_at_umax * exp(μ_max * (t - time_at_umax))
Parameters:
t: Time array
N: OD values (should be from fitted/idealized curve)
time_at_umax: Time at which μ_max occurs
od_at_umax: OD value at time_at_umax
mu_max: Maximum specific growth rate (μ_max)
baseline_od: Baseline OD (lag phase level). If None, uses min(N)
plateau_od: Plateau OD (stationary phase level). If None, uses max(N)
Returns:
Tuple of (exp_phase_start, exp_phase_end) times.
Note:
This method is more appropriate for non-parametric fits where the
exponential phase is well-defined by the tangent at μ_max.
"""
if mu_max <= 0 or od_at_umax <= 0:
# Invalid parameters, return full range
return float(t[0]) if len(t) > 0 else np.nan, (
float(t[-1]) if len(t) > 0 else np.nan
)
# Determine baseline and plateau if not provided
if baseline_od is None:
baseline_od = float(np.min(N))
if plateau_od is None:
plateau_od = float(np.max(N))
# Ensure baseline < od_at_umax < plateau
baseline_od = min(baseline_od, od_at_umax * 0.9)
plateau_od = max(plateau_od, od_at_umax * 1.1)
# Tangent line in LOG space: ln(OD(t)) = ln(od_at_umax) + μ_max * (t - time_at_umax)
# Which gives: OD(t) = od_at_umax * exp(μ_max * (t - time_at_umax))
# Solving for t when OD = baseline_od:
# baseline_od = od_at_umax * exp(μ_max * (t_start - time_at_umax))
# ln(baseline_od / od_at_umax) = μ_max * (t_start - time_at_umax)
# t_start = time_at_umax + ln(baseline_od / od_at_umax) / μ_max
t_start = time_at_umax + np.log(baseline_od / od_at_umax) / mu_max
# Solving for t when OD = plateau_od:
# plateau_od = od_at_umax * exp(μ_max * (t_end - time_at_umax))
# ln(plateau_od / od_at_umax) = μ_max * (t_end - time_at_umax)
# t_end = time_at_umax + ln(plateau_od / od_at_umax) / μ_max
t_end = time_at_umax + np.log(plateau_od / od_at_umax) / mu_max
# Constrain to N range
t_start = max(float(t[0]), t_start)
t_end = min(float(t[-1]), t_end)
# Ensure start < end
if t_start >= t_end:
# Fallback: use time_at_umax as midpoint
t_range = float(t[-1] - t[0])
t_start = max(float(t[0]), time_at_umax - 0.3 * t_range)
t_end = min(float(t[-1]), time_at_umax + 0.3 * t_range)
return float(t_start), float(t_end)
[docs]
def compute_phase_boundaries(
t,
N,
method="tangent",
time_at_umax=None,
od_at_umax=None,
mu_max=None,
baseline_od=None,
plateau_od=None,
lag_threshold=0.15,
exp_threshold=0.15,
):
"""
Calculate exponential phase boundaries using specified method.
This unified function allows choosing between different methods for
determining the start and end of the exponential growth phase.
Parameters:
t: Time array
N: OD values (should be from fitted/idealized curve)
method: Method to use for phase boundary calculation:
- "tangent": Tangent line method (requires time_at_umax, od_at_umax, mu_max)
- "threshold": Threshold-based method using fractions of μ_max
time_at_umax: Time at which μ_max occurs (required for "tangent" method)
od_at_umax: OD value at time_at_umax (required for "tangent" method)
mu_max: Maximum specific growth rate (required for "tangent" method)
baseline_od: Baseline OD for tangent method (defaults to min(N))
plateau_od: Plateau OD for tangent method (defaults to max(N))
lag_threshold: Fraction of μ_max for lag phase end
(threshold method, default: 0.15)
exp_threshold: Fraction of μ_max for exp phase end
(threshold method, default: 0.15)
Returns:
Tuple of (exp_phase_start, exp_phase_end) times.
Examples:
>>> # Tangent method (for non-parametric fits)
>>> exp_start, exp_end = calculate_phase_boundaries(
... t, N, method="tangent",
... time_at_umax=5.0, od_at_umax=0.5, mu_max=0.8
... )
>>> # Threshold method (for parametric fits)
>>> exp_start, exp_end = calculate_phase_boundaries(
... t, N, method="threshold", lag_threshold=0.15, exp_threshold=0.15
... )
"""
if method == "tangent":
if time_at_umax is None or od_at_umax is None or mu_max is None:
raise ValueError(
"Tangent method requires time_at_umax, od_at_umax, and mu_max"
)
return compute_phase_boundaries_tangent(
t, N, time_at_umax, od_at_umax, mu_max, baseline_od, plateau_od
)
elif method == "threshold":
return compute_phase_boundaries_mu_threshold(
t, N, mu_max, lag_threshold, exp_threshold
)
else:
raise ValueError(f"Unknown method '{method}'. Choose 'tangent' or 'threshold'.")
# -----------------------------------------------------------------------------
# Growth Statistics Extraction
# -----------------------------------------------------------------------------
def _extract_stats_mech_logistic(
fit_result,
t,
N,
lag_threshold=0.15,
exp_threshold=0.15,
phase_boundary_method="threshold",
):
"""
Extract growth statistics from mechanistic logistic model fit.
ODE: dN/dt = μ * (1 - N/K) * N
Parameters:
fit_result: Dict containing 'params' with mu, K, N0
t: Time array
N: OD values
lag_threshold, exp_threshold: Phase detection thresholds
phase_boundary_method: "threshold" or "tangent"
Returns:
Growth statistics dictionary.
"""
params = fit_result.get("params", {})
# Extract model parameters
K = float(params["K"]) # Carrying capacity above baseline
y0 = float(params["y0"]) # Baseline OD
mu_intrinsic = float(params["mu"]) # Intrinsic growth rate
# Evaluate model
N_fit = evaluate_parametric_model(t, "mech_logistic", params)
mu_max = compute_mu_max(t, N_fit)
# Dense grid for accurate calculations
t_dense = np.linspace(t.min(), t.max(), 500)
N_dense = evaluate_parametric_model(t_dense, "mech_logistic", params)
# Calculate specific growth rate curve
N_safe = np.maximum(N_dense, 1e-10)
mu_dense = np.gradient(np.log(N_safe), t_dense)
# Find t of maximum specific growth rate
max_mu_idx = int(np.argmax(mu_dense))
time_at_umax = float(t_dense[max_mu_idx])
od_at_umax = float(N_dense[max_mu_idx])
if mu_max <= 0:
stats = bad_fit_stats()
stats["max_od"] = y0 + K
stats["N0"] = y0
stats["intrinsic_growth_rate"] = mu_intrinsic
stats["fit_method"] = "model_fitting_mech_logistic"
return stats
# Phase boundaries using specified method
exp_phase_start, exp_phase_end = compute_phase_boundaries(
t_dense,
N_dense,
method=phase_boundary_method,
time_at_umax=time_at_umax,
od_at_umax=od_at_umax,
mu_max=mu_max,
baseline_od=float(np.min(N_dense)),
plateau_od=y0 + K,
lag_threshold=lag_threshold,
exp_threshold=exp_threshold,
)
# Doubling t from mu_max
doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan
# RMSE
rmse = compute_rmse(N, N_fit)
return {
"max_od": y0 + K,
"N0": y0,
"mu_max": float(mu_max),
"intrinsic_growth_rate": mu_intrinsic,
"doubling_time": float(doubling_time),
"exp_phase_start": exp_phase_start,
"exp_phase_end": max(exp_phase_end, exp_phase_start),
"time_at_umax": time_at_umax,
"od_at_umax": od_at_umax,
"fit_t_min": float(t.min()),
"fit_t_max": float(t.max()),
"fit_method": "model_fitting_mech_logistic",
"model_rmse": rmse,
}
def _extract_stats_mech_gompertz(
fit_result,
t,
N,
lag_threshold=0.15,
exp_threshold=0.15,
phase_boundary_method="threshold",
):
"""
Extract growth statistics from mechanistic Gompertz model fit.
ODE: dN/dt = μ * log(K/N) * N
Parameters:
fit_result: Dict containing 'params' with mu, K, N0
t: Time array
N: OD values
lag_threshold, exp_threshold: Phase detection thresholds
phase_boundary_method: "threshold" or "tangent"
Returns:
Growth statistics dictionary.
"""
from .models import evaluate_parametric_model
params = fit_result.get("params", {})
# Extract model parameters
K = float(params["K"]) # Carrying capacity above baseline
y0 = float(params["y0"]) # Baseline OD
mu_intrinsic = float(params["mu"]) # Intrinsic growth rate
# Evaluate model
N_fit = evaluate_parametric_model(t, "mech_gompertz", params)
mu_max = compute_mu_max(t, N_fit)
# Dense grid for accurate calculations
t_dense = np.linspace(t.min(), t.max(), 500)
N_dense = evaluate_parametric_model(t_dense, "mech_gompertz", params)
# Calculate specific growth rate curve
N_safe = np.maximum(N_dense, 1e-10)
mu_dense = np.gradient(np.log(N_safe), t_dense)
# Find t of maximum specific growth rate
max_mu_idx = int(np.argmax(mu_dense))
time_at_umax = float(t_dense[max_mu_idx])
od_at_umax = float(N_dense[max_mu_idx])
if mu_max <= 0:
stats = bad_fit_stats()
stats["max_od"] = y0 + K
stats["N0"] = y0
stats["intrinsic_growth_rate"] = mu_intrinsic
stats["fit_method"] = "model_fitting_mech_gompertz"
return stats
# Phase boundaries using specified method
exp_phase_start, exp_phase_end = compute_phase_boundaries(
t_dense,
N_dense,
method=phase_boundary_method,
time_at_umax=time_at_umax,
od_at_umax=od_at_umax,
mu_max=mu_max,
baseline_od=float(np.min(N_dense)),
plateau_od=y0 + K,
lag_threshold=lag_threshold,
exp_threshold=exp_threshold,
)
# Doubling t from mu_max
doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan
# RMSE
rmse = compute_rmse(N, N_fit)
return {
"max_od": y0 + K,
"N0": y0,
"mu_max": float(mu_max),
"intrinsic_growth_rate": mu_intrinsic,
"doubling_time": float(doubling_time),
"exp_phase_start": exp_phase_start,
"exp_phase_end": max(exp_phase_end, exp_phase_start),
"time_at_umax": time_at_umax,
"od_at_umax": od_at_umax,
"fit_t_min": float(t.min()),
"fit_t_max": float(t.max()),
"fit_method": "model_fitting_mech_gompertz",
"model_rmse": rmse,
}
def _extract_stats_mech_richards(
fit_result,
t,
N,
lag_threshold=0.15,
exp_threshold=0.15,
phase_boundary_method="threshold",
):
"""
Extract growth statistics from mechanistic Richards model fit.
ODE: dN/dt = μ * (1 - (N/K)^β) * N
Parameters:
fit_result: Dict containing 'params' with mu, K, N0, beta
t: Time array
N: OD values
lag_threshold, exp_threshold: Phase detection thresholds
phase_boundary_method: "threshold" or "tangent"
Returns:
Growth statistics dictionary.
"""
from .models import evaluate_parametric_model
params = fit_result.get("params", {})
# Extract model parameters
K = float(params["K"]) # Carrying capacity above baseline
y0 = float(params["y0"]) # Baseline OD
mu_intrinsic = float(params["mu"]) # Intrinsic growth rate
# Evaluate model
y_fit = evaluate_parametric_model(t, "mech_richards", params)
mu_max = compute_mu_max(t, y_fit)
# Dense grid for accurate calculations
t_dense = np.linspace(t.min(), t.max(), 500)
N_dense = evaluate_parametric_model(t_dense, "mech_richards", params)
# Calculate specific growth rate curve
N_safe = np.maximum(N_dense, 1e-10)
mu_dense = np.gradient(np.log(N_safe), t_dense)
# Find t of maximum specific growth rate
max_mu_idx = int(np.argmax(mu_dense))
time_at_umax = float(t_dense[max_mu_idx])
od_at_umax = float(N_dense[max_mu_idx])
if mu_max <= 0:
stats = bad_fit_stats()
stats["max_od"] = y0 + K
stats["N0"] = y0
stats["intrinsic_growth_rate"] = mu_intrinsic
stats["fit_method"] = "model_fitting_mech_richards"
return stats
# Phase boundaries using specified method
exp_phase_start, exp_phase_end = compute_phase_boundaries(
t_dense,
N_dense,
method=phase_boundary_method,
time_at_umax=time_at_umax,
od_at_umax=od_at_umax,
mu_max=mu_max,
baseline_od=float(np.min(N_dense)),
plateau_od=y0 + K,
lag_threshold=lag_threshold,
exp_threshold=exp_threshold,
)
# Doubling t from mu_max
doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan
# RMSE
rmse = compute_rmse(N, y_fit)
return {
"max_od": y0 + K,
"N0": y0,
"mu_max": float(mu_max),
"intrinsic_growth_rate": mu_intrinsic,
"doubling_time": float(doubling_time),
"exp_phase_start": exp_phase_start,
"exp_phase_end": max(exp_phase_end, exp_phase_start),
"time_at_umax": time_at_umax,
"od_at_umax": od_at_umax,
"fit_t_min": float(t.min()),
"fit_t_max": float(t.max()),
"fit_method": "model_fitting_mech_richards",
"model_rmse": rmse,
}
def _extract_stats_mech_baranyi(
fit_result,
t,
N,
lag_threshold=0.15,
exp_threshold=0.15,
phase_boundary_method="threshold",
):
"""
Extract growth statistics from mechanistic Baranyi-Roberts model fit.
ODE: dN/dt = μ * A(t) * (1 - N/K) * N
where A(t) = exp(μ*t) / (exp(h0) - 1 + exp(μ*t))
Parameters:
fit_result: Dict containing 'params' with mu, K, N0, h0
t: Time array
N: OD values
lag_threshold, exp_threshold: Phase detection thresholds
phase_boundary_method: "threshold" or "tangent"
Returns:
Growth statistics dictionary.
"""
from .models import evaluate_parametric_model
params = fit_result.get("params", {})
# Extract model parameters
K = float(params["K"]) # Carrying capacity above baseline
y0 = float(params["y0"]) # Baseline OD
mu_intrinsic = float(params["mu"]) # Intrinsic growth rate
# Evaluate model
y_fit = evaluate_parametric_model(t, "mech_baranyi", params)
mu_max = compute_mu_max(t, y_fit)
# Dense grid for accurate calculations
t_dense = np.linspace(t.min(), t.max(), 500)
N_dense = evaluate_parametric_model(t_dense, "mech_baranyi", params)
# Calculate specific growth rate curve
N_safe = np.maximum(N_dense, 1e-10)
mu_dense = np.gradient(np.log(N_safe), t_dense)
# Find t of maximum specific growth rate
max_mu_idx = int(np.argmax(mu_dense))
time_at_umax = float(t_dense[max_mu_idx])
od_at_umax = float(N_dense[max_mu_idx])
if mu_max <= 0:
stats = bad_fit_stats()
stats["max_od"] = y0 + K
stats["N0"] = y0
stats["intrinsic_growth_rate"] = mu_intrinsic
stats["fit_method"] = "model_fitting_mech_baranyi"
return stats
# Lag time is the fitted h0 parameter directly
exp_phase_start = float(params["h0"])
# Exp phase end using specified method
_, exp_phase_end = compute_phase_boundaries(
t_dense,
N_dense,
method=phase_boundary_method,
time_at_umax=time_at_umax,
od_at_umax=od_at_umax,
mu_max=mu_max,
baseline_od=float(np.min(N_dense)),
plateau_od=y0 + K,
lag_threshold=lag_threshold,
exp_threshold=exp_threshold,
)
# Doubling t from mu_max
doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan
# RMSE
rmse = compute_rmse(N, y_fit)
return {
"max_od": y0 + K,
"N0": y0,
"mu_max": float(mu_max),
"intrinsic_growth_rate": mu_intrinsic,
"doubling_time": float(doubling_time),
"exp_phase_start": exp_phase_start,
"exp_phase_end": max(exp_phase_end, exp_phase_start),
"time_at_umax": time_at_umax,
"od_at_umax": od_at_umax,
"fit_t_min": float(t.min()),
"fit_t_max": float(t.max()),
"fit_method": "model_fitting_mech_baranyi",
"model_rmse": rmse,
}
def _extract_stats_phenom_logistic(
fit_result,
t,
N,
lag_threshold=0.15,
exp_threshold=0.15,
phase_boundary_method="tangent",
):
"""
Extract growth statistics from phenomenological logistic model fit.
ln(Nt/N0) = A / (1 + exp(4 * μ_max * (λ - t) / A + 2))
Parameters:
fit_result: Dict containing 'params' with A, mu_max, lam, N0
t: Time array
N: OD values
lag_threshold, exp_threshold: Phase detection thresholds
phase_boundary_method: "threshold" or "tangent"
Returns:
Growth statistics dictionary.
"""
from .models import evaluate_parametric_model
params = fit_result.get("params", {})
# Extract model parameters
float(params["A"]) # Maximum ln(OD/OD0)
mu_max = float(params["mu_max"]) # Maximum specific growth rate (fitted parameter)
lam = float(params["lam"]) # Lag t
N0 = float(params["N0"]) # Initial OD
# Evaluate model
y_fit = evaluate_parametric_model(t, "phenom_logistic", params)
# Dense grid for accurate calculations
t_dense = np.linspace(t.min(), t.max(), 500)
N_dense = evaluate_parametric_model(t_dense, "phenom_logistic", params)
# Calculate specific growth rate curve
N_safe = np.maximum(N_dense, 1e-10)
mu_dense = np.gradient(np.log(N_safe), t_dense)
# Find t of maximum specific growth rate
max_mu_idx = int(np.argmax(mu_dense))
time_at_umax = float(t_dense[max_mu_idx])
od_at_umax = float(N_dense[max_mu_idx])
# Max OD is the maximum of the fitted model trace
max_od = float(np.max(N_dense))
if mu_max <= 0:
stats = bad_fit_stats()
stats["max_od"] = max_od
stats["N0"] = N0
stats["intrinsic_growth_rate"] = (
None # Phenomenological: no intrinsic parameter
)
stats["fit_method"] = "model_fitting_phenom_logistic"
return stats
# Exp phase start is λ for phenomenological models
exp_phase_start = lam
# Exp phase end using specified method
_, exp_phase_end = compute_phase_boundaries(
t_dense,
N_dense,
method=phase_boundary_method,
time_at_umax=time_at_umax,
od_at_umax=od_at_umax,
mu_max=mu_max,
baseline_od=float(np.min(N_dense)),
plateau_od=float(np.max(N_dense)),
lag_threshold=lag_threshold,
exp_threshold=exp_threshold,
)
# Doubling t from mu_max
doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan
# RMSE
rmse = compute_rmse(N, y_fit)
return {
"max_od": max_od,
"N0": N0,
"mu_max": float(mu_max),
"intrinsic_growth_rate": None, # Phenomenological: no intrinsic parameter
"doubling_time": float(doubling_time),
"exp_phase_start": exp_phase_start,
"exp_phase_end": max(exp_phase_end, exp_phase_start),
"time_at_umax": time_at_umax,
"od_at_umax": od_at_umax,
"fit_t_min": float(t.min()),
"fit_t_max": float(t.max()),
"fit_method": "model_fitting_phenom_logistic",
"model_rmse": rmse,
}
def _extract_stats_phenom_gompertz(
fit_result,
t,
N,
lag_threshold=0.15,
exp_threshold=0.15,
phase_boundary_method="tangent",
):
"""
Extract growth statistics from phenomenological Gompertz model fit.
ln(Nt/N0) = A * exp(-exp(μ_max * e * (λ - t) / A + 1))
Parameters:
fit_result: Dict containing 'params' with A, mu_max, lam, N0
t: Time array
N: OD values
lag_threshold, exp_threshold: Phase detection thresholds
phase_boundary_method: "threshold" or "tangent"
Returns:
Growth statistics dictionary.
"""
from .models import evaluate_parametric_model
params = fit_result.get("params", {})
# Extract model parameters
float(params["A"]) # Maximum ln(OD/OD0)
mu_max = float(params["mu_max"]) # Maximum specific growth rate (fitted parameter)
lam = float(params["lam"]) # Lag t
N0 = float(params["N0"]) # Initial OD
# Evaluate model
y_fit = evaluate_parametric_model(t, "phenom_gompertz", params)
# Dense grid for accurate calculations
t_dense = np.linspace(t.min(), t.max(), 500)
N_dense = evaluate_parametric_model(t_dense, "phenom_gompertz", params)
# Calculate specific growth rate curve
N_safe = np.maximum(N_dense, 1e-10)
mu_dense = np.gradient(np.log(N_safe), t_dense)
# Find t of maximum specific growth rate
max_mu_idx = int(np.argmax(mu_dense))
time_at_umax = float(t_dense[max_mu_idx])
od_at_umax = float(N_dense[max_mu_idx])
# Max OD is the maximum of the fitted model trace
max_od = float(np.max(N_dense))
if mu_max <= 0:
stats = bad_fit_stats()
stats["max_od"] = max_od
stats["N0"] = N0
stats["intrinsic_growth_rate"] = (
None # Phenomenological: no intrinsic parameter
)
stats["fit_method"] = "model_fitting_phenom_gompertz"
return stats
# Exp phase start is λ for phenomenological models
exp_phase_start = lam
# Exp phase end using specified method
_, exp_phase_end = compute_phase_boundaries(
t_dense,
N_dense,
method=phase_boundary_method,
time_at_umax=time_at_umax,
od_at_umax=od_at_umax,
mu_max=mu_max,
baseline_od=float(np.min(N_dense)),
plateau_od=float(np.max(N_dense)),
lag_threshold=lag_threshold,
exp_threshold=exp_threshold,
)
# Doubling t from mu_max
doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan
# RMSE
rmse = compute_rmse(N, y_fit)
return {
"max_od": max_od,
"N0": N0,
"mu_max": float(mu_max),
"intrinsic_growth_rate": None, # Phenomenological: no intrinsic parameter
"doubling_time": float(doubling_time),
"exp_phase_start": exp_phase_start,
"exp_phase_end": max(exp_phase_end, exp_phase_start),
"time_at_umax": time_at_umax,
"od_at_umax": od_at_umax,
"fit_t_min": float(t.min()),
"fit_t_max": float(t.max()),
"fit_method": "model_fitting_phenom_gompertz",
"model_rmse": rmse,
}
def _extract_stats_phenom_gompertz_modified(
fit_result,
t,
N,
lag_threshold=0.15,
exp_threshold=0.15,
phase_boundary_method="tangent",
):
"""
Extract growth statistics from phenomenological modified Gompertz model fit.
ln(Nt/N0) = A * exp(-exp(μ_max * e * (λ - t) / A + 1)) + A * exp(α * (t - t_shift))
Parameters:
fit_result: Dict containing 'params' with A, mu_max, lam, alpha, t_shift, N0
t: Time array
N: OD values
lag_threshold, exp_threshold: Phase detection thresholds
phase_boundary_method: "threshold" or "tangent"
Returns:
Growth statistics dictionary.
"""
from .models import evaluate_parametric_model
params = fit_result.get("params", {})
# Extract model parameters
float(params["A"]) # Maximum ln(OD/OD0)
mu_max = float(params["mu_max"]) # Maximum specific growth rate (fitted parameter)
lam = float(params["lam"]) # Lag t
N0_param = float(params["N0"]) # Fitted normalization parameter
# Evaluate model
y_fit = evaluate_parametric_model(t, "phenom_gompertz_modified", params)
# Dense grid for accurate calculations
t_dense = np.linspace(t.min(), t.max(), 500)
N_dense = evaluate_parametric_model(t_dense, "phenom_gompertz_modified", params)
# For the modified Gompertz form, the fitted N0 parameter is not always the
# model value at the earliest timepoint. Use model-predicted initial OD for stats.
first_idx = int(np.argmin(t))
N0 = float(y_fit[first_idx]) if len(y_fit) > 0 else N0_param
# Calculate specific growth rate curve
N_safe = np.maximum(N_dense, 1e-10)
mu_dense = np.gradient(np.log(N_safe), t_dense)
# Find t of maximum specific growth rate
max_mu_idx = int(np.argmax(mu_dense))
time_at_umax = float(t_dense[max_mu_idx])
od_at_umax = float(N_dense[max_mu_idx])
# Max OD is the maximum of the fitted model trace
max_od = float(np.max(N_dense))
if mu_max <= 0:
stats = bad_fit_stats()
stats["max_od"] = max_od
stats["N0"] = N0
stats["intrinsic_growth_rate"] = (
None # Phenomenological: no intrinsic parameter
)
stats["fit_method"] = "model_fitting_phenom_gompertz_modified"
return stats
# Exp phase start is λ for phenomenological models
exp_phase_start = lam
# Exp phase end using specified method
_, exp_phase_end = compute_phase_boundaries(
t_dense,
N_dense,
method=phase_boundary_method,
time_at_umax=time_at_umax,
od_at_umax=od_at_umax,
mu_max=mu_max,
baseline_od=float(np.min(N_dense)),
plateau_od=float(np.max(N_dense)),
lag_threshold=lag_threshold,
exp_threshold=exp_threshold,
)
# Doubling t from mu_max
doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan
# RMSE
rmse = compute_rmse(N, y_fit)
return {
"max_od": max_od,
"N0": N0,
"mu_max": float(mu_max),
"intrinsic_growth_rate": None, # Phenomenological: no intrinsic parameter
"doubling_time": float(doubling_time),
"exp_phase_start": exp_phase_start,
"exp_phase_end": max(exp_phase_end, exp_phase_start),
"time_at_umax": time_at_umax,
"od_at_umax": od_at_umax,
"fit_t_min": float(t.min()),
"fit_t_max": float(t.max()),
"fit_method": "model_fitting_phenom_gompertz_modified",
"model_rmse": rmse,
}
def _extract_stats_phenom_richards(
fit_result,
t,
N,
lag_threshold=0.15,
exp_threshold=0.15,
phase_boundary_method="tangent",
):
"""
Extract growth statistics from phenomenological Richards model fit.
ln(Nt/N0)= A * (1 + ν * exp(1 + ν + μ_max * (1 + ν)^(1 + 1/ν) * (λ - t) / A))^(-1/ν)
Parameters:
fit_result: Dict containing 'params' with A, mu_max, lam, nu, N0
t: Time array
N: OD values
lag_threshold, exp_threshold: Phase detection thresholds
phase_boundary_method: "threshold" or "tangent"
Returns:
Growth statistics dictionary.
"""
from .models import evaluate_parametric_model
params = fit_result.get("params", {})
# Extract model parameters
float(params["A"]) # Maximum ln(OD/OD0)
mu_max = float(params["mu_max"]) # Maximum specific growth rate (fitted parameter)
lam = float(params["lam"]) # Lag t
N0 = float(params["N0"]) # Initial OD
# Evaluate model
y_fit = evaluate_parametric_model(t, "phenom_richards", params)
# Dense grid for accurate calculations
t_dense = np.linspace(t.min(), t.max(), 500)
N_dense = evaluate_parametric_model(t_dense, "phenom_richards", params)
# Calculate specific growth rate curve
N_safe = np.maximum(N_dense, 1e-10)
mu_dense = np.gradient(np.log(N_safe), t_dense)
# Find t of maximum specific growth rate
max_mu_idx = int(np.argmax(mu_dense))
time_at_umax = float(t_dense[max_mu_idx])
od_at_umax = float(N_dense[max_mu_idx])
# Max OD is the maximum of the fitted model trace
max_od = float(np.max(N_dense))
if mu_max <= 0:
stats = bad_fit_stats()
stats["max_od"] = max_od
stats["N0"] = N0
stats["intrinsic_growth_rate"] = (
None # Phenomenological: no intrinsic parameter
)
stats["fit_method"] = "model_fitting_phenom_richards"
return stats
# Exp phase start is λ for phenomenological models
exp_phase_start = lam
# Exp phase end using specified method
_, exp_phase_end = compute_phase_boundaries(
t_dense,
N_dense,
method=phase_boundary_method,
time_at_umax=time_at_umax,
od_at_umax=od_at_umax,
mu_max=mu_max,
baseline_od=float(np.min(N_dense)),
plateau_od=float(np.max(N_dense)),
lag_threshold=lag_threshold,
exp_threshold=exp_threshold,
)
# Doubling t from mu_max
doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan
# RMSE
rmse = compute_rmse(N, y_fit)
return {
"max_od": max_od,
"N0": N0,
"mu_max": float(mu_max),
"intrinsic_growth_rate": None, # Phenomenological: no intrinsic parameter
"doubling_time": float(doubling_time),
"exp_phase_start": exp_phase_start,
"exp_phase_end": max(exp_phase_end, exp_phase_start),
"time_at_umax": time_at_umax,
"od_at_umax": od_at_umax,
"fit_t_min": float(t.min()),
"fit_t_max": float(t.max()),
"fit_method": "model_fitting_phenom_richards",
"model_rmse": rmse,
}
def _extract_stats_parametric(
fit_result,
t,
N,
lag_threshold=0.15,
exp_threshold=0.15,
phase_boundary_method="threshold",
):
"""
Dispatcher for parametric model statistics extraction.
This function reads the model type and dispatches to the appropriate
model-specific extraction function.
Handles: mech_* and phenom_* models.
Parameters:
fit_result: Dict from fit_* functions (contains 'params' and 'model_type')
t: Time array (hours) used for fitting
N: OD values used for fitting
lag_threshold: Fraction of peak growth rate for lag phase detection
exp_threshold: Fraction of peak growth rate for exponential phase end detection
phase_boundary_method: Method for phase boundary calculation
Returns:
Growth statistics dictionary.
"""
model_type = fit_result.get("model_type")
# Dispatch to model-specific extraction function
if model_type == "mech_logistic":
return _extract_stats_mech_logistic(
fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method
)
elif model_type == "mech_gompertz":
return _extract_stats_mech_gompertz(
fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method
)
elif model_type == "mech_richards":
return _extract_stats_mech_richards(
fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method
)
elif model_type == "mech_baranyi":
return _extract_stats_mech_baranyi(
fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method
)
elif model_type == "phenom_logistic":
return _extract_stats_phenom_logistic(
fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method
)
elif model_type == "phenom_gompertz":
return _extract_stats_phenom_gompertz(
fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method
)
elif model_type == "phenom_gompertz_modified":
return _extract_stats_phenom_gompertz_modified(
fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method
)
elif model_type == "phenom_richards":
return _extract_stats_phenom_richards(
fit_result, t, N, lag_threshold, exp_threshold, phase_boundary_method
)
else:
# Unknown parametric model
raise ValueError(
f"Unknown model type for parametric stats extraction: {model_type}"
)
def _extract_stats_sliding_window(
fit_result,
t,
N,
lag_threshold=0.15,
exp_threshold=0.15,
phase_boundary_method="tangent",
):
"""
Extract growth statistics from sliding window fits.
Parameters:
fit_result: Dict from fit_* functions (contains 'params' and 'model_type')
t: Time array (hours) used for fitting
N: OD values used for fitting
lag_threshold: Fraction of peak growth rate for lag phase detection
exp_threshold: Fraction of peak growth rate for exponential phase end detection
phase_boundary_method: "threshold" or "tangent" (default: "tangent")
Returns:
Growth statistics dictionary.
"""
params = fit_result.get("params", {})
valid_mask = np.isfinite(t) & np.isfinite(N) & (N > 0)
t_clean = t[valid_mask]
y_clean = N[valid_mask]
if len(t_clean) < 5 or np.ptp(t_clean) <= 0:
return bad_fit_stats()
y_smooth = smooth(y_clean, 11, 1)
mu_max = params["slope"]
doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan
max_od = float(np.max(y_clean))
# Calculate N0 as mean of first 10 points
N0 = float(np.mean(y_clean[: min(10, len(y_clean))]))
time_at_umax = params["time_at_umax"]
# Evaluate the fitted line at time_at_umax to get the correct OD value
# Fitted line is: ln(OD) = slope * t + intercept, so OD = exp(slope * t + intercept)
od_at_umax = float(np.exp(params["slope"] * time_at_umax + params["intercept"]))
# Calculate phase boundaries using specified method
baseline_od = float(np.min(y_clean))
plateau_od = max_od
exp_start, exp_end = compute_phase_boundaries(
t_clean,
y_smooth,
method=phase_boundary_method,
time_at_umax=time_at_umax,
od_at_umax=od_at_umax,
mu_max=mu_max,
baseline_od=baseline_od,
plateau_od=plateau_od,
lag_threshold=lag_threshold,
exp_threshold=exp_threshold,
)
window_points = params.get("window_points", 15)
t_window_start = params.get("fit_t_min", np.nan)
t_window_end = params.get("fit_t_max", np.nan)
window_centers = []
window_indices = []
for i in range(len(t_clean) - window_points + 1):
window_centers.append(float(t_clean[i : i + window_points].mean()))
window_indices.append(i)
if len(window_centers) > 0:
best_idx = min(
range(len(window_centers)),
key=lambda i: abs(window_centers[i] - time_at_umax),
)
start_idx = window_indices[best_idx]
t_window = t_clean[start_idx : start_idx + window_points]
y_window = y_clean[start_idx : start_idx + window_points]
if len(t_window) > 0:
t_window_start = float(t_window.min())
t_window_end = float(t_window.max())
slope = params["slope"]
intercept = params["intercept"]
y_fit_window = np.exp(slope * t_window + intercept)
# RMSE in log space (sliding window fits linear model in log space)
rmse = compute_rmse(y_window, y_fit_window, in_log_space=True)
else:
rmse = np.nan
return {
"max_od": max_od,
"N0": N0,
"mu_max": mu_max,
"intrinsic_growth_rate": None, # Non-parametric: no intrinsic parameter
"doubling_time": doubling_time,
"exp_phase_start": exp_start,
"exp_phase_end": exp_end,
"time_at_umax": time_at_umax,
"od_at_umax": od_at_umax,
"fit_t_min": t_window_start,
"fit_t_max": t_window_end,
"fit_method": "model_fitting_sliding_window",
"model_rmse": rmse,
}
def _extract_stats_spline(
fit_result,
t,
N,
lag_threshold=0.15,
exp_threshold=0.15,
phase_boundary_method="tangent",
):
"""
Extract growth statistics from spline fits.
The spline fitting pipeline:
1. Initial region identified using 30% threshold of instantaneous mu
(defines fit_t_min/fit_t_max)
2. Spline fitted to log-transformed N in that region
3. mu_max calculated as maximum derivative of fitted spline
4. Phase boundaries calculated here using tangent or threshold method
Parameters:
fit_result: Dict from fit_* functions (contains 'params' and 'model_type')
t: Time array (hours) used for fitting
N: OD values used for fitting
lag_threshold: Fraction of peak growth rate for lag phase detection
(threshold method)
exp_threshold: Fraction of peak growth rate for exponential phase end
(threshold method)
phase_boundary_method: "threshold" or "tangent" (default: "tangent")
Returns:
Growth statistics dictionary.
"""
from .models import spline_from_params
params = fit_result.get("params", {})
# Use stored mu_max and time_at_umax from the original fit
# These were calculated from the spline during fitting
mu_max = params.get("mu_max")
time_at_umax = params.get("time_at_umax")
fit_t_min = params.get("fit_t_min")
fit_t_max = params.get("fit_t_max")
if mu_max is None or time_at_umax is None or fit_t_min is None or fit_t_max is None:
return bad_fit_stats()
mu_max = float(mu_max)
# Validate and clean N
valid_mask = np.isfinite(t) & np.isfinite(N) & (N > 0)
t_clean = t[valid_mask]
y_clean = N[valid_mask]
if len(t_clean) < 5 or np.ptp(t_clean) <= 0:
return bad_fit_stats()
y_smooth = smooth(y_clean, 11, 1)
# Extract the exponential phase N using stored fit_t_min and fit_t_max
exp_mask = (t_clean >= fit_t_min) & (t_clean <= fit_t_max)
t_exp = t_clean[exp_mask]
y_exp = y_clean[exp_mask]
if len(t_exp) < 5:
return bad_fit_stats()
# Reconstruct spline to calculate od_at_umax and RMSE
y_log_exp = np.log(y_exp)
try:
spline = spline_from_params(params)
# Evaluate spline at time_at_umax to get the OD value
# Spline is fitted to log(y), so exponentiate to get actual OD
od_at_umax = float(np.exp(spline(time_at_umax)))
# Calculate RMSE in log space
y_log_fit = spline(t_exp)
rmse = compute_rmse(y_log_exp, y_log_fit, in_log_space=False)
except Exception:
od_at_umax = np.nan
rmse = np.nan
# Calculate basic stats
doubling_time = np.log(2) / mu_max if mu_max > 0 else np.nan
max_od = float(np.max(y_clean))
N0 = float(np.mean(y_clean[: min(10, len(y_clean))]))
# Calculate phase boundaries using specified method
baseline_od = float(np.min(y_clean))
plateau_od = max_od
exp_start, exp_end = compute_phase_boundaries(
t_clean,
y_smooth,
method=phase_boundary_method,
time_at_umax=time_at_umax,
od_at_umax=od_at_umax,
mu_max=mu_max,
baseline_od=baseline_od,
plateau_od=plateau_od,
lag_threshold=lag_threshold,
exp_threshold=exp_threshold,
)
return {
"max_od": max_od,
"N0": N0,
"mu_max": mu_max,
"intrinsic_growth_rate": None, # Non-parametric: no intrinsic parameter
"doubling_time": doubling_time,
"exp_phase_start": exp_start,
"exp_phase_end": exp_end,
"time_at_umax": time_at_umax,
"od_at_umax": od_at_umax,
"fit_t_min": fit_t_min,
"fit_t_max": fit_t_max,
"fit_method": "model_fitting_spline",
"model_rmse": rmse,
}
# -----------------------------------------------------------------------------
# Derivative Functions
# -----------------------------------------------------------------------------
[docs]
def bad_fit_stats():
"""Return default stats for failed fits."""
return no_fit_dictionary.copy()
# -----------------------------------------------------------------------------
# No-Growth Detection
# -----------------------------------------------------------------------------
[docs]
def detect_no_growth(
t,
N,
growth_stats=None,
min_data_points=5,
min_signal_to_noise=5.0,
min_od_increase=0.05,
min_growth_rate=1e-6,
):
"""
Detect whether a growth curve shows no significant growth.
Performs multiple checks to determine if a well should be marked as "no growth":
1. All OD values are <= 0
2. Insufficient N points
3. Low signal-to-noise ratio (max/min OD ratio)
4. Insufficient OD increase (flat curve)
5. Zero or near-zero growth rate (from fitted stats)
Parameters:
t: Time array
N: OD values (baseline-corrected)
growth_stats: Optional dict of fitted growth statistics
(from extract_stats or sliding_window_fit).
If provided, growth rate is checked.
min_data_points: Minimum number of valid N points required (default: 5)
min_signal_to_noise: Minimum ratio of max/min OD values (default: 5.0)
min_od_increase: Minimum absolute OD increase required (default: 0.05)
min_growth_rate: Minimum specific growth rate to be considered growth
(default: 1e-6)
Returns:
Dict with:
- is_no_growth: bool, True if no growth detected
- reason: str, description of why it was flagged (or "growth detected")
- checks: dict with individual check results
"""
t = np.asarray(t, dtype=float)
N = np.asarray(N, dtype=float)
# Check if all OD values are <= 0
if np.all(N <= 0):
return {
"is_no_growth": True,
"reason": "All OD values <= 0",
"checks": {
"has_sufficient_data": False,
"has_sufficient_snr": False,
"has_sufficient_od_increase": False,
"has_positive_growth_rate": False,
},
}
# Filter to finite positive values
mask = np.isfinite(t) & np.isfinite(N) & (N > 0)
t_valid = t[mask]
y_valid = N[mask]
checks = {
"has_sufficient_data": True,
"has_sufficient_snr": True,
"has_sufficient_od_increase": True,
"has_positive_growth_rate": True,
}
# Check 1: Minimum N points
if len(t_valid) < min_data_points:
checks["has_sufficient_data"] = False
return {
"is_no_growth": True,
"reason": f"Insufficient N points ({len(t_valid)} < {min_data_points})",
"checks": checks,
}
# Check 2: Signal-to-noise ratio (max/min OD)
y_min = np.min(y_valid)
y_max = np.max(y_valid)
if y_min > 0:
snr = y_max / y_min
else:
snr = np.inf if y_max > 0 else 0.0
if snr < min_signal_to_noise:
checks["has_sufficient_snr"] = False
r = f"Signal-to-noise ratio below threshold ({snr:.2f} < {min_signal_to_noise})"
return {
"is_no_growth": True,
"reason": r,
"checks": checks,
}
# Check 3: Minimum OD increase (detects flat curves)
od_increase = y_max - y_min
if od_increase < min_od_increase:
checks["has_sufficient_od_increase"] = False
return {
"is_no_growth": True,
"reason": (
f"Insufficient OD increase ({od_increase:.4f} < {min_od_increase})"
),
"checks": checks,
}
# Check 4: Growth rate from fitted statistics (if provided)
if growth_stats is not None:
mu = growth_stats.get("mu_max")
if mu is None or not np.isfinite(mu) or mu < min_growth_rate:
checks["has_positive_growth_rate"] = False
mu_str = f"{mu:.6f}" if mu is not None and np.isfinite(mu) else "N/A"
return {
"is_no_growth": True,
"reason": f"μ(max) below minimum ({mu_str} < {min_growth_rate})",
"checks": checks,
}
return {
"is_no_growth": False,
"reason": "growth detected",
"checks": checks,
}
[docs]
def is_no_growth(growth_stats):
"""
Simple check if growth stats indicate no growth (failed or missing fit).
This is a convenience function for quick checks on growth_stats dicts.
For more comprehensive checks including raw data analysis, use detect_no_growth().
Parameters:
growth_stats: Dict from extract_stats or sliding_window_fit
Returns:
bool: True if no growth detected (empty stats or zero growth rate)
"""
if not growth_stats:
return True
mu = growth_stats.get("mu_max", 0.0)
return mu is None or mu == 0.0
# -----------------------------------------------------------------------------
# Derivative and Growth Rate Calculation Functions
# -----------------------------------------------------------------------------
[docs]
def compute_first_derivative(t, N):
"""
Compute the first derivative of a growth curve.
Parameters
----------
t : array_like
Time array
N : array_like
OD600 values (baseline-corrected)
Returns
-------
tuple of (np.ndarray, np.ndarray)
Tuple of (t, dN) where dy is the first derivative dy/dt
"""
t = np.asarray(t, dtype=float)
N = np.asarray(N, dtype=float)
dN = np.gradient(N, t)
return t, dN
[docs]
def compute_instantaneous_mu(t, N):
"""
Compute the instantaneous specific growth rate (μ = 1/N × dN/dt).
The specific growth rate μ represents the rate of population growth per unit
of population. It is calculated as μ = (1/N) × d(N)/dt.
Parameters
----------
t : array_like
Time array
N : array_like
OD600 values (baseline-corrected)
Returns
-------
tuple of (np.ndarray, np.ndarray)
Tuple (t, mu) where mu is the specific growth rate μ = (1/N) × d(N)/dt
"""
t = np.asarray(t, dtype=float)
N = np.asarray(N, dtype=float)
dN = np.gradient(N, t)
# Avoid division by zero - set mu to nan where N is too small
mu = np.where(np.abs(N) > 1e-10, dN / N, np.nan)
return t, mu
[docs]
def compute_sliding_window_growth_rate(t, N, window_points=15):
"""
Compute instantaneous specific growth rate using a sliding window approach.
For each t point, fits a linear regression to log(N) vs t in a window
centered at that point. The slope of the regression is the instantaneous
specific growth rate μ at that t.
This method is more robust to noise than direct differentiation but requires
more N points.
Parameters
----------
t : array_like
Time array
N : array_like
OD600 values (baseline-corrected, must be positive)
window_points : int, optional
Number of points in each sliding window (default: 15)
Returns
-------
tuple of (np.ndarray, np.ndarray)
Tuple of (time_out, mu_out) where mu_out is the sliding window growth rate.
Returns arrays with NaN for points where window fitting failed.
"""
t = np.asarray(t, dtype=float)
N = np.asarray(N, dtype=float)
if len(t) < window_points:
return t, np.full_like(t, np.nan)
# Compute log(N), handling non-positive values
y_log = np.where(N > 0, np.log(N), np.nan)
mu = np.full_like(t, np.nan)
half_window = window_points // 2
for i in range(len(t)):
# Define window boundaries
start = max(0, i - half_window)
end = min(len(t), i + half_window + 1)
# Ensure window has enough points
if end - start < max(3, window_points // 2):
continue
t_win = t[start:end]
y_log_win = y_log[start:end]
# Skip if we have NaN values in the window
valid_mask = np.isfinite(y_log_win)
if valid_mask.sum() < 3:
continue
t_win_valid = t_win[valid_mask]
y_log_win_valid = y_log_win[valid_mask]
# Fit linear regression: log(N) = slope * t + intercept
# slope = μ (specific growth rate)
try:
if np.ptp(t_win_valid) > 0:
slope, _ = np.polyfit(t_win_valid, y_log_win_valid, 1)
if np.isfinite(slope):
mu[i] = slope
except (np.linalg.LinAlgError, ValueError):
continue
return t, mu
[docs]
def compare_methods(
t: np.ndarray,
N: np.ndarray,
model_family: str = "all",
phase_boundary_method: str = None,
**fit_kwargs,
) -> tuple:
"""
Fit multiple models and extract growth statistics for comparison.
This all-in-one convenience function fits all models in a specified family,
extracts their statistics, and returns both for analysis and visualization.
Parameters
----------
t : numpy.ndarray
Time array for fitting
N : numpy.ndarray
OD N array for fitting
model_family : str, optional
Which family of models to fit. Options:
- "mechanistic" : All mechanistic models (mech_logistic, mech_gompertz, etc.)
- "phenomenological" : All phenomenological models
- "all" : All available models
Default: "all"
phase_boundary_method : str, optional
Method for calculating phase boundaries ("threshold" or "tangent").
If None, uses default for each model type.
**fit_kwargs
Additional keyword arguments to pass to fitting functions
(e.g., spline_s=100, window_points=15)
Returns
-------
tuple of (dict, dict)
Returns (fits, stats) where:
- fits: Dictionary mapping method names to fit result dictionaries
- stats: Dictionary mapping method names to statistics dictionaries
The stats dictionary can be passed directly to plot_growth_stats_comparison().
Examples
--------
>>> # Fit and compare all mechanistic models
>>> fits, stats = gc.inference.compare_methods(t,
N,
model_family="mechanistic")
>>>
>>> # Plot comparison
>>> fig = gc.plot.plot_growth_stats_comparison(stats, title="Mechanistic Models")
>>> fig.show()
>>>
>>> # Fit phenomenological models with tangent phase boundaries
>>> fits, stats = gc.inference.compare_methods(
... t, N,
... model_family="phenomenological",
... phase_boundary_method="tangent"
... )
"""
# Get list of models to fit
if model_family == "all":
models_to_fit = (
MODEL_REGISTRY["mechanistic"]
+ MODEL_REGISTRY["phenomenological"]
+ MODEL_REGISTRY["non_parametric"]
)
elif model_family in MODEL_REGISTRY:
models_to_fit = MODEL_REGISTRY[model_family]
else:
raise ValueError(
f"Unknown model_family '{model_family}'. "
f"Choose from: {list(MODEL_REGISTRY.keys()) + ['all']}"
)
# spline_kwargs = ["spline_s", "k"]
# sliding_window_kwargs = ["window_points", "poly_order"]
# Fit all models
fits = {}
stats = {}
for model_name in models_to_fit:
fits[model_name], stats[model_name] = gc.fit_model(
t=t,
N=N,
model_name=model_name,
phase_boundary_method=phase_boundary_method,
**fit_kwargs,
)
return fits, stats