Source code for linumpy.gpu.array_ops

"""GPU-accelerated array operations for linumpy.

Provides GPU versions of normalization, clipping, and thresholding.
Note: Simple reductions (mean, max) should use numpy directly - GPU offers no benefit.
"""

from typing import Any

import numpy as np

from . import GPU_AVAILABLE, to_cpu


[docs] def normalize_percentile(image: Any, p_low: Any = 1, p_high: Any = 99, use_gpu: Any = True) -> Any: """ GPU-accelerated percentile-based normalization. Parameters ---------- image : np.ndarray Input image p_low : float Lower percentile for normalization (0-100) p_high : float Upper percentile for normalization (0-100) use_gpu : bool Whether to use GPU Returns ------- np.ndarray Normalized image in [0, 1] range """ if use_gpu and GPU_AVAILABLE: import cupy as cp img_gpu = cp.asarray(image.astype(np.float32)) low, high = cp.percentile(img_gpu, [p_low, p_high]) if high - low < 1e-10: return to_cpu(cp.zeros_like(img_gpu)) normalized = (img_gpu - low) / (high - low) normalized = cp.clip(normalized, 0, 1) return to_cpu(normalized) else: low, high = np.percentile(image, [p_low, p_high]) if high - low < 1e-10: return np.zeros_like(image, dtype=np.float32) normalized = (image - low) / (high - low) return np.clip(normalized, 0, 1).astype(np.float32)
[docs] def normalize_minmax(image: Any, use_gpu: Any = True) -> Any: """ GPU-accelerated min-max normalization. Parameters ---------- image : np.ndarray Input image use_gpu : bool Whether to use GPU Returns ------- np.ndarray Normalized image in [0, 1] range """ if use_gpu and GPU_AVAILABLE: import cupy as cp img_gpu = cp.asarray(image.astype(np.float32)) vmin, vmax = cp.min(img_gpu), cp.max(img_gpu) if vmax - vmin < 1e-10: return to_cpu(cp.zeros_like(img_gpu)) normalized = (img_gpu - vmin) / (vmax - vmin) return to_cpu(normalized) else: vmin, vmax = np.min(image), np.max(image) if vmax - vmin < 1e-10: return np.zeros_like(image, dtype=np.float32) return ((image - vmin) / (vmax - vmin)).astype(np.float32)
[docs] def clip_percentile(image: Any, p_low: Any = 0.5, p_high: Any = 99.5, use_gpu: Any = True) -> Any: """ GPU-accelerated percentile clipping. Parameters ---------- image : np.ndarray Input image p_low : float Lower percentile to clip p_high : float Upper percentile to clip use_gpu : bool Whether to use GPU Returns ------- np.ndarray Clipped image """ if use_gpu and GPU_AVAILABLE: import cupy as cp img_gpu = cp.asarray(image) low, high = cp.percentile(img_gpu, [p_low, p_high]) clipped = cp.clip(img_gpu, low, high) return to_cpu(clipped) else: low, high = np.percentile(image, [p_low, p_high]) return np.clip(image, low, high)
[docs] def compute_percentiles_memory_efficient( image: np.ndarray, percentiles: list, use_gpu: bool = True, max_samples: int = 10_000_000 ) -> list: """ Compute percentiles using subsampling to reduce memory usage. For large arrays, computing exact percentiles requires sorting the entire array, which can cause memory issues. This function uses random subsampling to estimate percentiles with minimal memory overhead. Parameters ---------- image : np.ndarray Input image percentiles : list List of percentiles to compute (0-100) use_gpu : bool Whether to use GPU max_samples : int Maximum number of samples to use for percentile estimation Returns ------- list Computed percentile values """ flat = image.ravel() # Subsample if the array is too large if flat.size > max_samples: # Use random sampling for memory efficiency rng = np.random.default_rng(42) # Fixed seed for reproducibility indices = rng.choice(flat.size, size=max_samples, replace=False) sample = flat[indices] else: sample = flat if use_gpu and GPU_AVAILABLE: import cupy as cp try: sample_gpu = cp.asarray(sample) result = [float(cp.percentile(sample_gpu, p).get()) for p in percentiles] del sample_gpu cp.get_default_memory_pool().free_all_blocks() return result except cp.cuda.memory.OutOfMemoryError: # Fall back to CPU if GPU runs out of memory pass return [float(np.percentile(sample, p)) for p in percentiles]
[docs] def compute_nonzero_percentile_memory_efficient( image: np.ndarray, percentile: float, use_gpu: bool = True, max_samples: int = 10_000_000 ) -> float: """ Compute percentile of non-zero values using subsampling. Parameters ---------- image : np.ndarray Input image percentile : float Percentile to compute (0-100) use_gpu : bool Whether to use GPU max_samples : int Maximum number of samples to use Returns ------- float Computed percentile value """ flat = image.ravel() nonzero_mask = flat > 0 nonzero_vals = flat[nonzero_mask] if nonzero_vals.size == 0: return 0.0 # Subsample if too large if nonzero_vals.size > max_samples: rng = np.random.default_rng(42) indices = rng.choice(nonzero_vals.size, size=max_samples, replace=False) sample = nonzero_vals[indices] else: sample = nonzero_vals if use_gpu and GPU_AVAILABLE: import cupy as cp try: sample_gpu = cp.asarray(sample) result = float(cp.percentile(sample_gpu, percentile).get()) del sample_gpu cp.get_default_memory_pool().free_all_blocks() return result except cp.cuda.memory.OutOfMemoryError: pass return float(np.percentile(sample, percentile))
[docs] def apply_flatfield_correction(image: Any, flatfield: Any, darkfield: Any = None, use_gpu: Any = True) -> Any: """ GPU-accelerated flatfield correction. Corrected = (Image - Darkfield) / (Flatfield - Darkfield) Parameters ---------- image : np.ndarray Input image flatfield : np.ndarray Flatfield image darkfield : np.ndarray, optional Darkfield image use_gpu : bool Whether to use GPU Returns ------- np.ndarray Corrected image """ if use_gpu and GPU_AVAILABLE: import cupy as cp img_gpu = cp.asarray(image.astype(np.float32)) flat_gpu = cp.asarray(flatfield.astype(np.float32)) if darkfield is not None: dark_gpu = cp.asarray(darkfield.astype(np.float32)) numerator = img_gpu - dark_gpu denominator = flat_gpu - dark_gpu else: numerator = img_gpu denominator = flat_gpu # Avoid division by zero denominator = cp.where(cp.abs(denominator) < 1e-10, 1.0, denominator) corrected = numerator / denominator return to_cpu(corrected) else: if darkfield is not None: numerator = image.astype(np.float32) - darkfield denominator = flatfield.astype(np.float32) - darkfield else: numerator = image.astype(np.float32) denominator = flatfield.astype(np.float32) denominator = np.where(np.abs(denominator) < 1e-10, 1.0, denominator) return numerator / denominator
[docs] def compute_std_projection(volume: Any, axis: Any = 0, use_gpu: Any = True) -> Any: """ GPU-accelerated standard deviation projection. Parameters ---------- volume : np.ndarray Input volume axis : int Axis along which to compute std use_gpu : bool Whether to use GPU Returns ------- np.ndarray Standard deviation projection """ if use_gpu and GPU_AVAILABLE: import cupy as cp vol_gpu = cp.asarray(volume) result = cp.std(vol_gpu, axis=axis) return to_cpu(result) else: return np.std(volume, axis=axis)
[docs] def threshold_otsu(image: Any, use_gpu: Any = True) -> Any: """ GPU-accelerated Otsu thresholding. Parameters ---------- image : np.ndarray Input image use_gpu : bool Whether to use GPU Returns ------- float Otsu threshold value """ if use_gpu and GPU_AVAILABLE: import cupy as cp img_gpu = cp.asarray(image.astype(np.float32)) # Compute histogram hist, bin_edges = cp.histogram(img_gpu.ravel(), bins=256) bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 hist = hist.astype(cp.float64) hist_norm = hist / hist.sum() # Cumulative sums weight1 = cp.cumsum(hist_norm) weight2 = cp.cumsum(hist_norm[::-1])[::-1] # Cumulative means mean1 = cp.cumsum(hist_norm * bin_centers) / weight1 mean2 = (cp.cumsum((hist_norm * bin_centers)[::-1]) / weight2[::-1])[::-1] # Between-class variance variance = weight1[:-1] * weight2[1:] * (mean1[:-1] - mean2[1:]) ** 2 # Find maximum idx = cp.argmax(variance) threshold = float(bin_centers[idx].get()) # Free GPU memory del img_gpu, hist, bin_edges, bin_centers, hist_norm, weight1, weight2, mean1, mean2, variance cp.get_default_memory_pool().free_all_blocks() return threshold else: from skimage.filters import threshold_otsu as sk_otsu return sk_otsu(image)
[docs] def apply_xy_shift(image: Any, _reference: Any, dy: Any, dx: Any, use_gpu: Any = True) -> Any: """ GPU-accelerated XY shift application. Parameters ---------- image : np.ndarray Image to shift reference : np.ndarray Reference image (determines output shape) dy : float Y shift in pixels dx : float X shift in pixels use_gpu : bool Whether to use GPU Returns ------- np.ndarray Shifted image """ # Get a representative non-zero value for out-of-bounds fill nonzero_vals = image[image > 0] cval = float(np.percentile(nonzero_vals, 1)) if len(nonzero_vals) > 0 else 0.0 if use_gpu and GPU_AVAILABLE: import cupy as cp from cupyx.scipy.ndimage import shift as cp_shift img_gpu = cp.asarray(image.astype(np.float32)) # Apply shift with edge value fill to avoid black dots if image.ndim == 2: shifted = cp_shift(img_gpu, [dy, dx], order=1, cval=cval) else: # 3D shifted = cp_shift(img_gpu, [0, dy, dx], order=1, cval=cval) return to_cpu(shifted) else: from scipy.ndimage import shift as scipy_shift if image.ndim == 2: return scipy_shift(image, [dy, dx], order=1, cval=cval) else: return scipy_shift(image, [0, dy, dx], order=1, cval=cval)