"""Galvanometric XY shift detection and correction."""
from __future__ import annotations
import numpy as np
from scipy.ndimage import median_filter
[docs]
def detect_galvo_band_in_tile(tile_aip: np.ndarray, min_drop_ratio: float = 0.40) -> tuple:
"""Detect a galvo return dark band in the AIP of a single assembled mosaic tile.
Companion to :func:`detect_galvo_shift` for use when only the assembled
OME-Zarr mosaic is available and the raw ``.bin`` tiles no longer exist.
Each zarr chunk corresponds to one OCT tile (the zarr chunk shape equals the
tile size), so this function can be run per chunk to detect and characterise
any unfixed galvo artifact.
Parameters
----------
tile_aip : np.ndarray
2-D average intensity projection of a single tile,
shape ``(n_alines, n_bscans)``.
min_drop_ratio : float
Minimum relative intensity drop compared to the surrounding tissue
baseline to be classified as a dark band. Default 0.40 (40 % drop).
Returns
-------
tuple
``(band_start, band_width, confidence)`` -- pixel coordinates of the
detected band within the tile (along the A-line axis) and a confidence
score in [0, 1]. Returns ``(0, 0, 0.0)`` when no band is detected.
"""
n_alines = tile_aip.shape[0]
profile = median_filter(tile_aip.mean(axis=1), size=5)
baseline = float(np.percentile(profile, 75))
if baseline <= 1.0:
return 0, 0, 0.0
threshold = baseline * (1.0 - min_drop_ratio)
dark_mask = profile < threshold
if not dark_mask.any():
return 0, 0, 0.0
dark_idx = np.where(dark_mask)[0]
gaps = np.where(np.diff(dark_idx) > 2)[0]
groups = np.split(dark_idx, gaps + 1) if len(gaps) else [dark_idx]
best_group = max(groups, key=lambda g: float(np.sum(threshold - profile[g].clip(max=threshold))))
band_start = int(best_group[0])
band_end = int(best_group[-1]) + 1
band_width = band_end - band_start
if band_width > n_alines * 0.20:
return 0, 0, 0.0
confidence = _compute_dark_band_confidence(tile_aip, band_start, band_end)
return band_start, band_width, float(confidence)
[docs]
def detect_galvo_shift(aip: np.ndarray, n_pixel_return: int = 40) -> tuple:
"""Detect galvo shift artifact in an average intensity projection.
The galvo return region creates a dark horizontal band in OCT data.
This function locates the band by finding gradient pairs separated by
n_pixel_return pixels, then validates using dark band consistency.
Parameters
----------
aip : np.ndarray
Average intensity projection of shape (n_alines, n_bscans).
n_pixel_return : int
Width of galvo return region in pixels (from acquisition metadata).
Returns
-------
tuple
(shift, confidence) where shift is the circular shift needed to move
the galvo region to the edge, and confidence (0-1) indicates detection
reliability. Apply fix when confidence >= 0.5.
"""
n_alines = aip.shape[0]
profile = median_filter(aip.mean(axis=1), 5)
gradient = np.abs(np.diff(profile))
n = len(gradient) - n_pixel_return
if n <= 0:
return 0, 0.0
similarities = gradient[:n] * gradient[n_pixel_return : n_pixel_return + n]
shift_idx = np.argmax(similarities)
shift = n_alines - shift_idx - n_pixel_return
boundary_pos = shift_idx
boundary_end = boundary_pos + n_pixel_return
confidence = _compute_dark_band_confidence(aip, int(boundary_pos), int(boundary_end))
return int(shift), float(confidence)
[docs]
def detect_galvo_for_slice(
tiles: list,
n_extra: int,
threshold: float = 0.6,
n_samples: int = 5,
axial_resolution: float | None = None,
min_intensity: float = 20.0,
) -> tuple:
"""Detect galvo shift for a slice by sampling multiple tiles.
Parameters
----------
tiles : list
List of tile paths for the slice.
n_extra : int
Number of extra A-lines (galvo return pixels) from acquisition metadata.
threshold : float
Confidence threshold for applying fix (default: 0.6).
n_samples : int
Maximum number of tiles to sample (default: 5).
axial_resolution : float, optional
Axial resolution for OCT loading.
min_intensity : float
Minimum mean intensity for a tile to be considered valid.
Returns
-------
tuple
(shift, confidence) where shift is 0 if confidence < threshold.
"""
from linumpy.microscope.oct import OCT
if not tiles or n_extra <= 0:
return 0, 0.0
n_tiles = len(tiles)
center_start = int(n_tiles * 0.2)
center_end = int(n_tiles * 0.8)
sample_indices = np.linspace(center_start, max(center_end - 1, center_start), min(n_samples, n_tiles), dtype=int)
sample_indices = list(dict.fromkeys(sample_indices))
detections = []
for idx in sample_indices:
if len(detections) >= n_samples:
break
oct_obj = OCT(tiles[idx], axial_resolution) if axial_resolution else OCT(tiles[idx])
vol = oct_obj.load_image(crop=False, fix_galvo_shift=False, fix_camera_shift=False)
aip = vol.mean(axis=0)
if np.mean(aip) < min_intensity:
continue
shift, conf = detect_galvo_shift(aip, n_pixel_return=n_extra)
detections.append((shift, conf))
if not detections:
return 0, 0.0
shifts = np.array([d[0] for d in detections])
confidences = np.array([d[1] for d in detections])
best_idx = np.argmax(confidences)
best_shift = shifts[best_idx]
best_confidence = confidences[best_idx]
if len(shifts) > 1:
shift_tolerance = max(n_extra // 4, 5)
n_consistent = np.sum(np.abs(shifts - best_shift) <= shift_tolerance)
consistency_factor = (n_consistent / len(shifts)) ** 0.5
best_confidence *= consistency_factor
if best_confidence >= threshold:
return int(best_shift), float(best_confidence)
return 0, float(best_confidence)
def _compute_dark_band_confidence(aip: np.ndarray, boundary_pos: int, boundary_end: int) -> float:
"""Compute confidence that a dark band exists at the detected position.
Real galvo artifacts create a consistent dark horizontal band visible
across all B-scans. This is the key discriminator vs tissue boundaries.
Parameters
----------
aip : np.ndarray
Average intensity projection of shape (n_alines, n_bscans).
boundary_pos : int
Start position of detected galvo region.
boundary_end : int
End position of detected galvo region.
Returns
-------
float
Confidence score (0-1).
"""
n_alines, n_bscans = aip.shape
n_pixel_return = boundary_end - boundary_pos
if boundary_pos < 0 or boundary_end > n_alines or n_pixel_return < 5:
return 0.0
margin = max(10, n_pixel_return // 2)
before_start = max(0, boundary_pos - margin * 2)
before_end = boundary_pos
after_start = boundary_end
after_end = min(n_alines, boundary_end + margin * 2)
if before_end <= before_start or after_end <= after_start:
return 0.0
n_check = min(n_bscans, 20)
column_indices = np.linspace(0, n_bscans - 1, n_check, dtype=int)
cols = aip[:, column_indices]
before_vals = cols[before_start:before_end, :].mean(axis=0)
galvo_vals = cols[boundary_pos:boundary_end, :].mean(axis=0)
after_vals = cols[after_start:after_end, :].mean(axis=0)
surrounding = (before_vals + after_vals) / 2
valid_mask = surrounding >= 10
valid_cols = int(np.sum(valid_mask))
if valid_cols == 0:
return 0.0
surrounding_v = surrounding[valid_mask]
galvo_v = galvo_vals[valid_mask]
drop_mask = galvo_v < surrounding_v
drop_count = int(np.sum(drop_mask))
rel_drops = np.where(drop_mask, (surrounding_v - galvo_v) / surrounding_v, 0.0)
total_drop = float(np.sum(rel_drops))
significant_drops = int(np.sum(rel_drops > 0.10))
consistency = drop_count / valid_cols
significant_ratio = significant_drops / valid_cols
avg_drop = total_drop / max(drop_count, 1)
if consistency < 0.5:
return consistency * 0.3
score = consistency * 0.40 + significant_ratio * 0.35 + min(avg_drop / 0.3, 1.0) * 0.25
return float(np.clip(score, 0.0, 1.0))
[docs]
def fix_galvo_shift(vol: np.ndarray, shift: int = 0, axis: int = 1) -> np.ndarray:
"""Apply circular shift to move galvo return region to edge of volume.
Parameters
----------
vol : np.ndarray
OCT volume data.
shift : int
Number of pixels to shift.
axis : int
Axis along which to shift (default: 1 for A-line axis).
Returns
-------
np.ndarray
Shifted volume. Crop with vol[:, :n_alines, :] to remove galvo region.
"""
if shift == 0:
return vol
return np.roll(vol, shift, axis=axis)