"""
Motor-position-based tile placement for mosaic stitching.
Consolidated from linum_stitch_3d_refined.py and linum_stitch_motor_only.py.
"""
import logging
from pathlib import Path
from typing import Any
import numpy as np
[docs]
logger = logging.getLogger(__name__)
[docs]
def compute_motor_positions(
nx: int, ny: int, tile_shape: tuple, overlap_fraction: float, scale_factor: float = 1.0, rotation_deg: float = 0.0
) -> tuple:
"""Compute tile positions based on motor grid (ideal positions).
Assumes a regular grid where tiles are spaced by (1 - overlap) * tile_size.
Optionally applies scale factor and rotation to test hypotheses about
stage calibration issues.
Parameters
----------
nx, ny : int
Number of tiles in each direction.
tile_shape : tuple
Tile dimensions (z, height, width).
overlap_fraction : float
Expected overlap between tiles (0-1).
scale_factor : float
Scale applied to step size (default 1.0 = no scaling).
rotation_deg : float
Global grid rotation in degrees (default 0.0).
Returns
-------
positions : list
List of (row_pos, col_pos) pixel positions for each tile.
step_y : int
Y step in pixels.
step_x : int
X step in pixels.
"""
tile_height, tile_width = tile_shape[1], tile_shape[2]
step_y = int(tile_height * (1.0 - overlap_fraction))
step_x = int(tile_width * (1.0 - overlap_fraction))
step_y = int(step_y * scale_factor)
step_x = int(step_x * scale_factor)
rotation_matrix: np.ndarray | None = None
if rotation_deg != 0.0:
theta = np.radians(rotation_deg)
cos_t, sin_t = np.cos(theta), np.sin(theta)
rotation_matrix = np.array([[cos_t, -sin_t], [sin_t, cos_t]])
positions = []
for i in range(nx):
for j in range(ny):
pos = np.array([i * step_y, j * step_x])
if rotation_deg != 0.0 and rotation_matrix is not None:
pos = np.dot(rotation_matrix, pos)
positions.append(pos.astype(int) if rotation_deg != 0.0 else (int(pos[0]), int(pos[1])))
return positions, step_y, step_x
[docs]
def compute_registration_refinements(
volume: np.ndarray,
tile_shape: tuple,
nx: int,
ny: int,
overlap_fraction: float,
max_refinement_px: float = 10.0,
*,
histogram_match: bool = False,
max_empty_fraction: float | None = None,
use_gpu: bool = False,
) -> dict:
"""Correlate neighboring tiles within a slice to measure displacement errors.
Phase-correlates overlapping regions of adjacent tiles (horizontal and
vertical neighbors) to measure the difference between expected and actual
tile positions. Returns both clamped residuals for blend refinement and
unclamped absolute displacements for fitting the affine displacement model
(Lefebvre et al. 2017, Eqs 1-6).
Note: this operates on tiles *within a single slice* -- it is entirely
separate from the Z-slice pairwise registration (``linum_register_pairwise.py``).
Parameters
----------
volume : np.ndarray
The mosaic grid volume (Z, nx*tile_h, ny*tile_w).
tile_shape : tuple
Tile dimensions (z, height, width).
nx, ny : int
Number of tiles in each direction.
overlap_fraction : float
Expected overlap fraction (0-1).
max_refinement_px : float
Maximum residual shift retained for blend refinement. Larger residuals
are clamped. Does not affect the absolute displacements in 'pairs'.
histogram_match : bool, keyword-only
If True, match the intensity histogram of the second overlap to the
first before phase correlation. Improves robustness when tile-edge
illumination is uneven; disabled by default to preserve existing
behaviour.
max_empty_fraction : float or None, keyword-only
If set, use an Otsu threshold on the central plane to classify
tissue vs background, and skip any pair whose overlap contains more
than this fraction of background pixels (mirrors the behaviour of
``linumpy.registration.transforms.estimate_mosaic_transform``).
When ``None`` (default), the prior ``mean(overlap > 0) < 0.1``
heuristic is used.
use_gpu : bool, keyword-only
If True, run the pairwise phase correlations via
:func:`linumpy.gpu.fft_ops.phase_correlation` (CuPy-accelerated).
Falls back silently to the CPU path when CuPy / a CUDA device is
not available. Default is False.
Returns
-------
dict with keys 'horizontal', 'vertical', 'pairs', 'stats'.
'pairs' is a list of dicts with keys 'row_delta', 'col_delta',
'measured_dy', 'measured_dx' -- the absolute observed pixel
displacements used for affine model estimation.
"""
from linumpy.registration.transforms import pair_wise_phase_correlation
gpu_phase_correlation: Any = None
if use_gpu:
try:
from linumpy.gpu import GPU_AVAILABLE
from linumpy.gpu.fft_ops import phase_correlation as _gpu_phase_correlation
if GPU_AVAILABLE:
gpu_phase_correlation = _gpu_phase_correlation
else:
logger.info("use_gpu=True but no CUDA device detected; falling back to CPU phase correlation")
except ImportError as e:
logger.info("use_gpu=True but GPU stack unavailable (%s); falling back to CPU", e)
def _phase_correlate(ov1: np.ndarray, ov2: np.ndarray) -> tuple[float, float]:
"""Return (axis-0 shift, axis-1 shift) for vol2 relative to vol1."""
if gpu_phase_correlation is not None:
translation, _ = gpu_phase_correlation(ov1, ov2, use_gpu=True)
return float(translation[0]), float(translation[1])
axis0, axis1 = pair_wise_phase_correlation(ov1, ov2)
return float(axis0), float(axis1)
tile_height, tile_width = tile_shape[1], tile_shape[2]
overlap_y = int(tile_height * overlap_fraction)
overlap_x = int(tile_width * overlap_fraction)
# Expected step sizes (what a diagonal model would predict)
step_y = tile_height * (1.0 - overlap_fraction)
step_x = tile_width * (1.0 - overlap_fraction)
refinements = {
"horizontal": {},
"vertical": {},
"pairs": [], # absolute displacements for affine estimation
"stats": {"total_pairs": 0, "valid_pairs": 0, "clamped_pairs": 0, "mean_refinement": 0.0, "max_refinement": 0.0},
}
all_shifts = []
z_mid = volume.shape[0] // 2
empty_threshold: float | None = None
if max_empty_fraction is not None:
from skimage.filters import threshold_otsu
plane = np.asarray(volume[z_mid])
positive = plane[plane > 0]
if positive.size > 0:
empty_threshold = float(threshold_otsu(positive))
match_histograms_fn = None
if histogram_match:
from skimage.exposure import match_histograms as _match_histograms
match_histograms_fn = _match_histograms
def _is_empty(ov: np.ndarray) -> bool:
if empty_threshold is not None and max_empty_fraction is not None:
return bool(np.sum(ov <= empty_threshold) > max_empty_fraction * ov.size)
return bool(np.mean(ov > 0) < 0.1)
# Horizontal refinements (between columns: tile (i,j) → (i,j+1))
# The expected displacement is (0, step_x); registration measures residual
for i in range(nx):
for j in range(ny - 1):
r1_start = i * tile_height
r1_end = (i + 1) * tile_height
c1_end = (j + 1) * tile_width
c2_start = (j + 1) * tile_width
overlap1 = volume[z_mid, r1_start:r1_end, c1_end - overlap_x : c1_end]
overlap2 = volume[z_mid, r1_start:r1_end, c2_start : c2_start + overlap_x]
if _is_empty(overlap1) or _is_empty(overlap2):
continue
if match_histograms_fn is not None:
overlap2 = match_histograms_fn(overlap2, overlap1)
refinements["stats"]["total_pairs"] += 1
try:
dy, dx = _phase_correlate(overlap1, overlap2)
# Store absolute displacement for affine estimation (unclamped)
# Horizontal pair: row_delta=0, col_delta=1
# Measured position = expected_step + residual
refinements["pairs"].append(
{
"row_delta": 0,
"col_delta": 1,
"measured_dy": float(dy), # cross-axis residual
"measured_dx": float(step_x + dx), # along-axis: step + residual
}
)
magnitude = np.sqrt(dx**2 + dy**2)
if magnitude > max_refinement_px:
scale = max_refinement_px / magnitude
dx *= scale
dy *= scale
refinements["stats"]["clamped_pairs"] += 1
refinements["horizontal"][(i, j)] = {"dx": float(dx), "dy": float(dy)}
refinements["stats"]["valid_pairs"] += 1
all_shifts.append(magnitude)
except Exception as e:
logger.debug("Registration failed for h-pair (%d,%d)-(%d,%d): %s", i, j, i, j + 1, e)
# Vertical refinements (between rows: tile (i,j) → (i+1,j))
# The expected displacement is (step_y, 0); registration measures residual
for i in range(nx - 1):
for j in range(ny):
r1_end = (i + 1) * tile_height
r2_start = (i + 1) * tile_height
c_start = j * tile_width
c_end = (j + 1) * tile_width
overlap1 = volume[z_mid, r1_end - overlap_y : r1_end, c_start:c_end]
overlap2 = volume[z_mid, r2_start : r2_start + overlap_y, c_start:c_end]
if _is_empty(overlap1) or _is_empty(overlap2):
continue
if match_histograms_fn is not None:
overlap2 = match_histograms_fn(overlap2, overlap1)
refinements["stats"]["total_pairs"] += 1
try:
dy, dx = _phase_correlate(overlap1, overlap2)
# Store absolute displacement for affine estimation (unclamped)
# Vertical pair: row_delta=1, col_delta=0
refinements["pairs"].append(
{
"row_delta": 1,
"col_delta": 0,
"measured_dy": float(step_y + dy), # along-axis: step + residual
"measured_dx": float(dx), # cross-axis residual
}
)
magnitude = np.sqrt(dx**2 + dy**2)
if magnitude > max_refinement_px:
scale = max_refinement_px / magnitude
dx *= scale
dy *= scale
refinements["stats"]["clamped_pairs"] += 1
refinements["vertical"][(i, j)] = {"dx": float(dx), "dy": float(dy)}
refinements["stats"]["valid_pairs"] += 1
all_shifts.append(magnitude)
except Exception as e:
logger.debug("Registration failed for v-pair (%d,%d)-(%d,%d): %s", i, j, i + 1, j, e)
if all_shifts:
refinements["stats"]["mean_refinement"] = float(np.mean(all_shifts))
refinements["stats"]["max_refinement"] = float(np.max(all_shifts))
return refinements
[docs]
def estimate_affine_from_pairs(pairs: list, tile_shape: tuple, overlap_fraction: float) -> tuple[np.ndarray, dict]:
"""Estimate a 2x2 affine displacement model from neighbor tile correlations.
Fits the Lefebvre et al. (2017) motor displacement model using
least-squares on the absolute (step + residual) displacements returned
by :func:`compute_registration_refinements`.
Note: this uses phase correlation between *neighboring tiles within a
single slice*, not the Z-slice pairwise registration that appears
elsewhere in the pipeline.
The model is: ``pixel_pos = A @ [i, j]^T`` where *A* is a general 2x2
matrix. Off-diagonal terms capture the scan-to-stage rotation (θ) and
the non-perpendicularity of the motor axes (φ).
Parameters
----------
pairs : list of dict
Each dict has 'row_delta', 'col_delta', 'measured_dy', 'measured_dx'.
tile_shape : tuple
Tile dimensions (z, height, width).
overlap_fraction : float
Expected overlap fraction (for diagnostics only).
Returns
-------
transform : np.ndarray
Fitted 2x2 affine matrix mapping tile index to pixel position.
diagnostics : dict
Extracted displacement model parameters (θ, φ, Ox, Oy) and fit
residual statistics.
"""
if not pairs:
# Fallback to diagonal model
step_y = tile_shape[1] * (1.0 - overlap_fraction)
step_x = tile_shape[2] * (1.0 - overlap_fraction)
transform = np.array([[step_y, 0.0], [0.0, step_x]])
return transform, {"fallback": True, "reason": "no pairs"}
n = len(pairs)
# System: A_mat @ x = b_vec, where A_mat has rows [r, c, 0, 0] (for dy) and [0, 0, r, c] (for dx),
# and x = [a, b, c, d]^T are the four elements of the 2x2 transform matrix.
a_mat = np.zeros((2 * n, 4))
b_vec = np.zeros((2 * n, 1))
for idx, p in enumerate(pairs):
r, c = p["row_delta"], p["col_delta"]
a_mat[2 * idx, :] = [r, c, 0, 0]
b_vec[2 * idx, 0] = p["measured_dy"]
a_mat[2 * idx + 1, :] = [0, 0, r, c]
b_vec[2 * idx + 1, 0] = p["measured_dx"]
result = np.linalg.lstsq(a_mat, b_vec, rcond=None)
transform = result[0].reshape((2, 2))
residuals = result[1] if len(result[1]) > 0 else np.array([0.0])
# Extract Lefebvre displacement model parameters for diagnostics
diagnostics = _extract_displacement_params(transform, tile_shape, overlap_fraction)
diagnostics["n_pairs"] = n
diagnostics["lstsq_residual"] = float(np.sum(residuals))
diagnostics["fallback"] = False
return transform, diagnostics
[docs]
def pool_pairs_and_fit_global_affine(
volumes: list[tuple[str, Any]],
overlap_fraction: float,
*,
histogram_match: bool = False,
max_empty_fraction: float | None = None,
n_samples: int | None = None,
seed: int = 0,
use_gpu: bool = False,
) -> tuple[np.ndarray, dict]:
"""Pool neighbor-tile pair measurements across many mosaic grids and fit one affine.
For each ``(slice_id, path)`` entry, load only the central Z plane of the
OME-Zarr volume and call :func:`compute_registration_refinements` with the
supplied options. All resulting pairs are concatenated, optionally
sub-sampled with a deterministic seed, and fed to
:func:`estimate_affine_from_pairs` for a single 2x2 affine fit.
Parameters
----------
volumes : list of (slice_id, path)
Each ``path`` must be a string or :class:`pathlib.Path` pointing at a
``*.ome.zarr`` mosaic grid.
overlap_fraction : float
Expected tile overlap fraction (must match acquisition).
histogram_match : bool, keyword-only
Forwarded to :func:`compute_registration_refinements`.
max_empty_fraction : float or None, keyword-only
Forwarded to :func:`compute_registration_refinements`.
n_samples : int or None, keyword-only
If set and the pooled pair count exceeds this value, a reproducible
random sub-sample of size ``n_samples`` is drawn before fitting.
seed : int, keyword-only
Seed used when sub-sampling. Ignored when ``n_samples`` is None.
use_gpu : bool, keyword-only
Forwarded to :func:`compute_registration_refinements`.
Returns
-------
transform : np.ndarray
Fitted 2x2 affine matrix.
diagnostics : dict
Full diagnostics including per-slice stats, pooled pair count,
chosen backend label, and the output of
:func:`estimate_affine_from_pairs`.
"""
import random as _random
from linumpy.io.zarr import read_omezarr
tile_shape_ref: tuple | None = None
all_pairs: list[dict] = []
per_slice_stats: list[dict] = []
for slice_id, zarr_path in volumes:
vol, _ = read_omezarr(zarr_path, level=0)
tile_shape = tuple(vol.chunks)
if len(tile_shape) != 3:
logger.warning("slice %s: unexpected chunks %s, skipping", slice_id, tile_shape)
continue
if tile_shape_ref is None:
tile_shape_ref = tile_shape
elif tile_shape[1:] != tile_shape_ref[1:]:
logger.warning(
"slice %s: tile shape %s differs from reference %s -- pooling across different "
"tile sizes is not supported. Skipping.",
slice_id,
tile_shape,
tile_shape_ref,
)
continue
nx = vol.shape[1] // tile_shape[1]
ny = vol.shape[2] // tile_shape[2]
if nx == 0 or ny == 0:
logger.warning("slice %s: too few tiles (nx=%d ny=%d), skipping", slice_id, nx, ny)
continue
z_mid_full = vol.shape[0] // 2
logger.info(
"slice %s: shape=%s tile=%s grid=%dx%d z_mid=%d (hist_match=%s empty_frac=%s use_gpu=%s)",
slice_id,
tuple(vol.shape),
tile_shape,
nx,
ny,
z_mid_full,
histogram_match,
max_empty_fraction,
use_gpu,
)
z_plane = np.asarray(vol[z_mid_full : z_mid_full + 1])
refinements = compute_registration_refinements(
z_plane,
tile_shape,
nx,
ny,
overlap_fraction,
histogram_match=histogram_match,
max_empty_fraction=max_empty_fraction,
use_gpu=use_gpu,
)
pairs = refinements["pairs"]
stats = dict(refinements["stats"])
stats["slice_id"] = slice_id
stats["nx"] = int(nx)
stats["ny"] = int(ny)
per_slice_stats.append(stats)
logger.info(
"slice %s: %d valid pairs collected (total=%d)",
slice_id,
stats["valid_pairs"],
stats["total_pairs"],
)
all_pairs.extend(pairs)
if tile_shape_ref is None:
raise ValueError("No usable mosaic grids produced pair measurements.")
total_pooled = len(all_pairs)
logger.info("pooled pair count: %d", total_pooled)
sampled = False
if n_samples is not None and total_pooled > n_samples:
rng = _random.Random(seed)
all_pairs = rng.sample(all_pairs, n_samples)
sampled = True
logger.info("random-sampled to %d pairs (seed=%d)", len(all_pairs), seed)
transform, fit_diag = estimate_affine_from_pairs(all_pairs, tile_shape_ref, overlap_fraction)
diagnostics: dict[str, Any] = {
"n_volumes": len(per_slice_stats),
"n_pairs_pooled_total": total_pooled,
"n_pairs_used": len(all_pairs),
"tile_shape": list(tile_shape_ref),
"overlap_fraction": overlap_fraction,
"histogram_match": bool(histogram_match),
"max_empty_fraction": max_empty_fraction,
"sampled_n": n_samples,
"seed": seed if sampled else None,
"backend": "gpu" if use_gpu else "cpu",
"transform": transform.tolist(),
"displacement_model": _extract_displacement_params(transform, tile_shape_ref, overlap_fraction),
"lstsq_residual": fit_diag.get("lstsq_residual"),
"fallback": fit_diag.get("fallback", False),
"per_slice_stats": per_slice_stats,
}
return transform, diagnostics
def _extract_displacement_params(transform: np.ndarray, tile_shape: tuple, overlap_fraction: float) -> dict:
"""Extract Lefebvre motor model parameters from a 2x2 affine transform.
Given the fitted transform ``A`` where ``(dy, dx) = A @ (row_delta, col_delta)``,
recover the scan-to-stage rotation θ, the motor-axis angle φ, and the
effective per-direction overlap fractions Ox, Oy.
Derivation (Lefebvre et al. 2017, Eqs. 1-6). In image coordinates
(y-down, x-right) the horizontal motor step (``col_delta = 1``) has
image displacement
(dy, dx) = (b, d) = nx·(1 - Ox)·(-sin θ, cos θ)
so that ``θ = arctan2(-b, d)`` and ``Ox = 1 - sqrt(b**2 + d**2) / nx``
with ``nx = tile_w``. The vertical motor step (``row_delta = 1``) has
(dy, dx) = (a, c) = ny·(1 - Oy)·(sin(φ - θ), cos(φ - θ))
so that ``φ - θ = arctan2(a, c)`` and ``Oy = 1 - sqrt(a**2 + c**2) / ny`` with
``ny = tile_h``. Perfectly perpendicular motors correspond to
``φ = 90°`` (not zero).
Parameters
----------
transform : np.ndarray
2x2 affine matrix fitted by :func:`estimate_affine_from_pairs`.
tile_shape : tuple
Tile dimensions (z, height, width).
overlap_fraction : float
Expected overlap fraction (for comparison).
Returns
-------
dict with 'theta_deg', 'phi_deg', 'Ox_fraction', 'Oy_fraction',
'off_diagonal_px'.
"""
a, b = transform[0, 0], transform[0, 1]
c, d = transform[1, 0], transform[1, 1]
tile_h, tile_w = tile_shape[1], tile_shape[2]
# θ: scan-to-stage rotation, from the horizontal motor step (b, d) (Eq. 3).
# tan(θ) = -b / d
theta_rad = np.arctan2(-b, d) if abs(d) > 1e-6 else 0.0
# φ - θ: from the vertical motor step (a, c) (Eq. 4).
# tan(φ - θ) = a / c (image-frame y-down convention folds the paper's
# negative-sine into the atan2 arguments).
phi_minus_theta = np.arctan2(a, c) if abs(c) > 1e-6 else np.pi / 2.0
phi_rad = phi_minus_theta + theta_rad
# Ox: overlap along the horizontal motor axis (Eq. 5).
horizontal_step = np.sqrt(b**2 + d**2)
ox_fraction = 1.0 - horizontal_step / tile_w
# Oy: overlap along the vertical motor axis (Eq. 6).
vertical_step = np.sqrt(a**2 + c**2)
oy_fraction = 1.0 - vertical_step / tile_h
return {
"theta_deg": float(np.degrees(theta_rad)),
"phi_deg": float(np.degrees(phi_rad)),
"Ox_fraction": float(ox_fraction),
"Oy_fraction": float(oy_fraction),
"expected_overlap": float(overlap_fraction),
"off_diagonal_px": [float(b), float(c)],
"transform": transform.tolist(),
}
[docs]
def compute_affine_positions(nx: int, ny: int, transform: np.ndarray) -> list[tuple[int, int]]:
"""Compute tile positions using a 2x2 affine displacement model.
This is the corrected version of :func:`compute_motor_positions` that
accounts for scan-to-stage rotation (θ) and non-perpendicular motor
axes (φ) via the off-diagonal terms in the transform matrix.
Parameters
----------
nx, ny : int
Number of tiles in each direction.
transform : np.ndarray
2x2 affine matrix mapping tile index (i, j) to pixel position
(row_px, col_px).
Returns
-------
positions : list of (int, int)
Pixel positions for each tile, row-major order.
"""
positions = []
for i in range(nx):
for j in range(ny):
pos = transform @ np.array([i, j], dtype=float)
positions.append((round(pos[0]), round(pos[1])))
return positions
[docs]
def compute_affine_output_shape(nx: int, ny: int, tile_shape: tuple, transform: np.ndarray) -> tuple[int, int, int]:
"""Compute the output mosaic shape from affine tile positions.
With off-diagonal terms, tiles may extend beyond what the diagonal model
predicts. This computes the bounding box over all tile corner positions.
Parameters
----------
nx, ny : int
Number of tiles in each direction.
tile_shape : tuple
Tile dimensions (z, height, width).
transform : np.ndarray
2x2 affine matrix.
Returns
-------
(nz, output_height, output_width) : tuple of int
"""
nz = tile_shape[0]
tile_h, tile_w = tile_shape[1], tile_shape[2]
# Check all four corner tiles
corners = [(0, 0), (nx - 1, 0), (0, ny - 1), (nx - 1, ny - 1)]
max_row, max_col = 0, 0
min_row, min_col = 0, 0
for i, j in corners:
pos = transform @ np.array([i, j], dtype=float)
# Tile occupies [pos[0], pos[0]+tile_h) x [pos[1], pos[1]+tile_w)
min_row = min(min_row, pos[0])
min_col = min(min_col, pos[1])
max_row = max(max_row, pos[0] + tile_h)
max_col = max(max_col, pos[1] + tile_w)
output_height = int(np.ceil(max_row - min_row))
output_width = int(np.ceil(max_col - min_col))
return (nz, output_height, output_width)
[docs]
def apply_blend_shift_refinement(tile: np.ndarray, refinements_for_tile: list) -> np.ndarray:
"""Apply registration refinement by shifting tile data in overlap regions.
Applies a small sub-pixel shift (averaged from all neighbors) to improve
blending quality without changing the tile's position in the mosaic.
Parameters
----------
tile : np.ndarray
3D tile data (Z, Y, X).
refinements_for_tile : list
List of dicts with 'dx', 'dy' refinements from neighbors.
Returns
-------
np.ndarray
Shifted tile (or unmodified if shift is negligible).
"""
from scipy.ndimage import shift as ndi_shift
if not refinements_for_tile:
return tile
total_dy = sum(ref.get("dy", 0) for ref in refinements_for_tile)
total_dx = sum(ref.get("dx", 0) for ref in refinements_for_tile)
count = len(refinements_for_tile)
avg_dy = total_dy / count / 2
avg_dx = total_dx / count / 2
if abs(avg_dy) < 0.1 and abs(avg_dx) < 0.1:
return tile
nonzero_vals = tile[tile > 0]
cval = float(np.percentile(nonzero_vals, 1)) if len(nonzero_vals) > 0 else 0.0
shifted = ndi_shift(tile, (0, avg_dy, avg_dx), order=1, mode="constant", cval=cval)
return shifted
[docs]
def compare_motor_vs_registration(
motor_positions: list | tuple, reg_positions: list | tuple, output_path: str | None = None
) -> dict:
"""Compare motor-based positions with registration-based positions.
Used diagnostically to identify stage calibration issues (systematic offset,
dilation/scaling) and registration drift.
Parameters
----------
motor_positions : list
List of (row, col) positions from motor grid.
reg_positions : list
List of (row, col) positions from image registration.
output_path : str or None
If provided, save comparison JSON to this path.
Returns
-------
dict
Statistics including mean/std/max differences and diagnostic flags.
"""
import json
motor_arr = np.array(motor_positions)
reg_arr = np.array(reg_positions)
diff = reg_arr - motor_arr
comparison: dict[str, Any] = {
"n_tiles": len(motor_positions),
"mean_diff_y": float(np.mean(diff[:, 0])),
"mean_diff_x": float(np.mean(diff[:, 1])),
"std_diff_y": float(np.std(diff[:, 0])),
"std_diff_x": float(np.std(diff[:, 1])),
"max_diff_y": float(np.max(np.abs(diff[:, 0]))),
"max_diff_x": float(np.max(np.abs(diff[:, 1]))),
"mean_magnitude": float(np.mean(np.sqrt(diff[:, 0] ** 2 + diff[:, 1] ** 2))),
"max_magnitude": float(np.max(np.sqrt(diff[:, 0] ** 2 + diff[:, 1] ** 2))),
}
if abs(comparison["mean_diff_y"]) > 5 or abs(comparison["mean_diff_x"]) > 5:
comparison["systematic_offset"] = True
comparison["offset_warning"] = (
f"Systematic offset detected: ({comparison['mean_diff_y']:.1f}, {comparison['mean_diff_x']:.1f}) pixels"
)
else:
comparison["systematic_offset"] = False
tile_indices = np.arange(len(motor_positions))
diff_magnitude = np.sqrt(diff[:, 0] ** 2 + diff[:, 1] ** 2)
if len(tile_indices) > 10:
correlation = np.corrcoef(tile_indices, diff_magnitude)[0, 1]
comparison["index_error_correlation"] = float(correlation)
if abs(correlation) > 0.5:
comparison["dilation_indicator"] = True
comparison["dilation_warning"] = (
f"Error increases with tile index (r={correlation:.2f}), suggesting dilation/scaling"
)
else:
comparison["dilation_indicator"] = False
if output_path:
with Path(output_path).open("w") as f:
json.dump(comparison, f, indent=2)
return comparison