Source code for growthcurves.preprocessing

"""Data preprocessing utilities for growth curve analysis.

This module provides functions for common preprocessing steps such as blank
subtraction and path length correction.
"""

from functools import partial

import numpy as np
from pyod.models.ecod import ECOD


[docs] def blank_subtraction(N: np.ndarray, blank: np.ndarray) -> np.ndarray: """ Subtract blank values from time data series of growth measurements. Performs element-wise subtraction of blank measurements from measurements. This is commonly used for baseline correction in optical density measurements. Parameters ---------- N : numpy.ndarray Data series to be corrected (e.g., OD measurements) blank : numpy.ndarray Blank/background measurements to subtract. Must be the same length as N, or a scalar value to subtract from all N points. Returns ------- numpy.ndarray Blank-subtracted N Examples -------- >>> import numpy as np >>> N = np.array([0.5, 0.6, 0.7, 0.8]) >>> blank = np.array([0.05, 0.05, 0.05, 0.05]) >>> corrected = blank_subtraction(N, blank) >>> corrected array([0.45, 0.55, 0.65, 0.75]) """ # Handle numpy arrays data_array = np.asarray(N, dtype=float) blank_array = np.asarray(blank, dtype=float) return data_array - blank_array
[docs] def path_correct(N: np.ndarray, path_length_cm: float) -> np.ndarray: """ Correct optical density measurements to a standard 1 cm path length. Normalizes OD measurements taken at a specific path length to what they would be at a 1 cm path length using Beer-Lambert law (OD is proportional to path length). Parameters ---------- N : numpy.ndarray Optical density measurements to correct path_length_cm : float Actual path length of the measurement in centimeters (must be > 0) Returns ------- numpy.ndarray Path-corrected N normalized to 1 cm path length Raises ------ ValueError If path_length_cm is not positive Examples -------- >>> import numpy as np >>> # Measurement taken with 0.5 cm path length >>> N = np.array([0.25, 0.30, 0.35]) >>> corrected = path_correct(N, path_length_cm=0.5) >>> corrected # OD values as if measured with 1 cm path array([0.5, 0.6, 0.7]) >>> # Measurement taken with 2 cm path length >>> N = np.array([1.0, 1.2, 1.4]) >>> corrected = path_correct(N, path_length_cm=2.0) >>> corrected array([0.5, 0.6, 0.7]) Notes ----- The correction uses the relationship: OD_1cm = OD_measured / path_length This assumes the Beer-Lambert law holds (linear relationship between absorbance and path length), which is typically valid for OD < 1.0-1.5. """ if path_length_cm <= 0: raise ValueError(f"Path length must be positive, got {path_length_cm} cm") # Handle numpy arrays data_array = np.asarray(N, dtype=float) return data_array / float(path_length_cm)
[docs] def out_of_iqr_window( values: np.ndarray, factor: float = 1.5, position: str = "center" ) -> bool: """Return True if the selected value is an outlier based on the IQR method. Parameters ---------- values : numpy.ndarray Input window of values. factor : float, default=1.5 IQR multiplier used to define outlier bounds. position : {"center", "first", "last"}, default="center" Which value in the window to test as the target point. Raises ------ ValueError If `position` is invalid. If `position="center"` and the input array does not have an odd number of elements. """ if position == "center": if len(values) % 2 == 0: raise ValueError( "Input array must have an odd number of elements when " "position='center'." ) center = values[len(values) // 2] elif position == "first": center = values[0] elif position == "last": center = values[-1] else: raise ValueError("position must be one of: 'center', 'first', 'last'.") if np.isnan(center): return False q1 = np.nanquantile(values, 0.25) q3 = np.nanquantile(values, 0.75) iqr = q3 - q1 lower_bound = q1 - factor * iqr upper_bound = q3 + factor * iqr return (center < lower_bound) or (center > upper_bound)
[docs] def detect_outliers_iqr(N: np.array, window_size: int, factor: float = 1.5) -> np.array: """Return a boolean array indicating whether each value is an outlier based on the IQR method. The sliding window size gives for the middle values the central window points IQR status. For the first and last points in data series N, the first and last point in window is respectively used instead of the center to label a value as an outlier. """ edge_idx = window_size // 2 windows = np.lib.stride_tricks.sliding_window_view(N, window_size) # use partial to fix factor argument for out_of_iqr_window _out_of_iqr_window = partial(out_of_iqr_window, factor=factor) window_flags = np.apply_along_axis(_out_of_iqr_window, 1, windows) edge_window_size = window_size + window_size // 2 - 1 start_windows = np.lib.stride_tricks.sliding_window_view( N[:edge_window_size], window_size ) start_window_mask = np.apply_along_axis( _out_of_iqr_window, 1, start_windows, position="first", # passed to out_of_iqr_window ) end_windows = np.lib.stride_tricks.sliding_window_view( N[-(edge_window_size):], window_size ) end_window_mask = np.apply_along_axis( _out_of_iqr_window, 1, end_windows, position="last", # passed to out_of_iqr_window ) # create mask of same length as array N mask = np.full(N.shape, False) mask[:edge_idx] = start_window_mask # [:edge_idx] mask[edge_idx:-edge_idx] = window_flags mask[-edge_idx:] = end_window_mask return mask
[docs] def detect_outliers_hampel( N: np.ndarray, window: int = 15, factor: float = 3.0 ) -> np.ndarray: """Return a boolean array indicating whether each value is an outlier using the Hampel identifier. For each point, computes the median and MAD of a symmetric neighbourhood window and flags points whose MAD z-score exceeds `factor`. Parameters ---------- N : numpy.ndarray Input time series of OD values. window : int, default=15 Total number of neighbours to include (window // 2 on each side). factor : float, default=3.0 MAD z-score threshold. Points with score > factor are flagged. Returns ------- numpy.ndarray Boolean mask of the same length as N where True indicates an outlier. """ n = len(N) half = window // 2 mask = np.zeros(n, dtype=bool) for i in range(n): nb = N[max(0, i - half) : min(n, i + half + 1)] med = np.median(nb) mad = np.median(np.abs(nb - med)) if mad > 1e-12: mask[i] = abs(N[i] - med) / (1.4826 * mad) > factor return mask
[docs] def detect_outliers_ecod(N: np.ndarray, factor: float = 3.5) -> np.ndarray: """Return a boolean array indicating whether each value is an outlier using ECOD (Empirical Cumulative Distribution-based Outlier Detection). Builds a 3-feature matrix per point — absolute rolling-mean residual, raw OD value, and first difference — then fits ECOD and flags points whose MAD z-score of the decision score exceeds `factor`. Parameters ---------- N : numpy.ndarray Input time series of OD values. factor : float, default=3.5 MAD z-score threshold for flagging outliers. Higher values flag fewer, more extreme points. Returns ------- numpy.ndarray Boolean mask of the same length as N where True indicates an outlier. """ n = len(N) half = 15 // 2 residual = np.zeros(n) for i in range(n): win = N[max(0, i - half) : min(n, i + half + 1)] residual[i] = N[i] - win.mean() d = np.diff(N, prepend=N[0]) X = np.column_stack([np.abs(residual), N, d]) clf = ECOD() clf.fit(X) scores = clf.decision_scores_ med = np.median(scores) mad = np.median(np.abs(scores - med)) if mad < 1e-12: return np.zeros(n, dtype=bool) mad_z = np.abs(scores - med) / (1.4826 * mad) return mad_z > factor
[docs] def detect_outliers(N: np.ndarray, method: str = "ecod", **kwargs) -> np.ndarray: """Detect outliers in a growth curve time series. Entry point that dispatches to the chosen detection method. All methods return a boolean mask of the same length as ``N``. Parameters ---------- N : numpy.ndarray Input time series of OD values. method : {"iqr", "ecod", "hampel"}, default="ecod" Outlier detection method to use: - ``"iqr"`` — sliding-window IQR method (:func:`detect_outliers_iqr`). Kwargs: ``window_size`` (int, required), ``factor`` (float, default 1.5). - ``"ecod"`` — ECOD method (:func:`detect_outliers_ecod`). Kwargs: ``factor`` (float, default 3.5). - ``"hampel"`` — Hampel identifier (:func:`detect_outliers_hampel`). Kwargs: ``window`` (int, default 15), ``factor`` (float, default 3.0). **kwargs Additional keyword arguments forwarded to the chosen method. Returns ------- numpy.ndarray Boolean mask of the same length as N where True indicates an outlier. Raises ------ ValueError If ``method`` is not recognised. Examples -------- >>> mask = detect_outliers(N, method="iqr", window_size=11, factor=1.5) >>> mask = detect_outliers(N, method="ecod", factor=3.5) >>> mask = detect_outliers(N, method="hampel", window=15, factor=3.0) """ if method == "iqr": return detect_outliers_iqr(N, **kwargs) elif method == "ecod": return detect_outliers_ecod(N, **kwargs) elif method == "hampel": return detect_outliers_hampel(N, **kwargs) else: raise ValueError( f"Unknown method '{method}'. Choose from: 'iqr', 'ecod', 'hampel'." )
if __name__ == "__main__": # Example usage data = np.array([20, 1, 2, 3, 4, 5, 20, 6, 7, 8, 9, 10, 25]) print(out_of_iqr_window(data)) # rolling windows of size 5: shape -> (len(data)-4, 5) window_size = 5 edge_idx = window_size // 2 windows = np.lib.stride_tricks.sliding_window_view(data, window_size) # outlier flag for each 5-value window (checks center element of each window) # center points (indices 2..-3) window_flags = np.apply_along_axis(out_of_iqr_window, 1, windows) edge_window_size = window_size + window_size // 2 - 1 start_windows = np.lib.stride_tricks.sliding_window_view( data[:edge_window_size], window_size ) start_window_flags = np.apply_along_axis( out_of_iqr_window, 1, start_windows, position="first", # passed to out_of_iqr_window ) end_windows = np.lib.stride_tricks.sliding_window_view( data[-(edge_window_size):], window_size ) end_window_flags = np.apply_along_axis( out_of_iqr_window, 1, end_windows, position="last", # passed to out_of_iqr_window ) print("center windows flags:", window_flags) print("start edge windows:", start_windows) print("start edge window flags:", start_window_flags) print("end edge windows:", end_windows) print("end edge window flags:", end_window_flags) # optional: align back to original array length (center indices only) flags = np.full(data.shape, False) flags[:edge_idx] = start_window_flags # [:edge_idx] flags[edge_idx:-edge_idx] = window_flags flags[-edge_idx:] = end_window_flags print("windows:\n", windows) print("window_flags:", window_flags) print("aligned flags:", flags) mask = detect_outliers_iqr(data, window_size=5) print("final mask:", mask) assert (mask == flags).all()