"""Confocal PSF model fitting and volume-based PSF estimation."""
import contextlib
from typing import Any
import numpy as np
from scipy.ndimage import (
binary_erosion,
gaussian_filter,
gaussian_filter1d,
)
from scipy.optimize import curve_fit, minimize
from sklearn import linear_model
from linumpy.mosaic.overlap import get_overlap
[docs]
def get_average_volume(data: Any, z: int, mask: np.ndarray | None = None, s: int = 0) -> np.ndarray:
"""Compute an average volume for a specific slice.
Parameters
----------
data : Any
The dataset for which the average volume is computed.
z : int
Slice number.
mask : ndarray, optional
Mask specifying which volume contributes to the computation.
s : int, optional
Smoothing kernel size in pixel, by default 0 (no smoothing).
Returns
-------
ndarray
Average volume.
"""
average_vol = np.zeros(data.volshape, dtype=data.format)
if mask is not None:
n_vol = mask[:, :, z - 1].sum()
else:
nx, ny, nz = data.gridshape[:]
n_vol = nx * ny
mask = np.ones((nx, ny, nz))
# Loop over volumes
for vol in data.slice_iterator(z, mask=mask):
if s > 0: # Smoothing volume in xy
vol = gaussian_filter(vol, sigma=(s, s, 0))
average_vol += vol / (1.0 * n_vol)
return average_vol
[docs]
def find_focal_depth(vol: np.ndarray) -> int:
"""Detect the focal plane depth in a volume.
Parameters
----------
vol : ndarray
Volume in which the focal plane is detected
Returns
-------
int
The focal plane depth
"""
# Averaging intensity slice-by-slice
intensity_profile = np.mean(np.mean(vol, axis=0), axis=0)
# Focal plane depth
fz = np.argmax(intensity_profile)
return int(fz)
[docs]
def i_profile_piece_wise_model(
z: np.ndarray, I0: float, Imax: float, z0: float, zf: float, s: float, mu: float, k: float
) -> np.ndarray:
"""Evaluate the piece-wise OCT intensity profile model."""
i_profile = np.zeros(z.shape)
z1 = z <= z0
z2 = (z > z0) * (z <= zf)
z3 = z > zf
# Water above tissue
i_profile[z1] = I0 * np.exp(-k * z[z1])
# Tissue -> Focal plane area
i_profile[z2] = Imax * np.exp(-((z[z2] - zf) ** 2) / s**2)
# Attenuation area
i_profile[z3] = Imax * np.exp(-mu * (z[z3] - zf))
return i_profile
[docs]
def glm_volume_normalization(vol: np.ndarray, average_vol: np.ndarray) -> np.ndarray:
"""Volume intensity normalization using GLM fit.
Parameters
----------
vol : ndarray
Volume to normalize
average_vol : ndarray
Average volume representing the background or objective profile without tissue
Returns
-------
ndarray
Normalized volume
"""
nx, ny, nz = vol.shape
vol_p = np.zeros_like(vol)
# Preparing the input variables
X = np.zeros((nx * ny * nz, 4))
xx, yy, zz = np.meshgrid(list(range(nx)), list(range(ny)), list(range(nz)), indexing="ij")
X[:, 0] = np.reshape(xx, (nx * ny * nz,))
X[:, 1] = np.reshape(yy, (nx * ny * nz,))
X[:, 2] = np.reshape(zz, (nx * ny * nz,))
X[:, 3] = np.reshape(average_vol, (nx * ny * nz,))
y = np.reshape(vol, (nx * ny * nz,))
regr = linear_model.BayesianRidge()
regr.fit(X, y)
meanv_p = np.reshape(regr.predict(X), (nx, ny, nz))
vol_p = vol / meanv_p
return vol_p
[docs]
def volume_normalization(vol: np.ndarray, average_vol: np.ndarray, epsilon: float = 0.05) -> np.ndarray:
"""Volume intensity normalization.
Parameters
----------
vol : ndarray
Volume to normalize
average_vol : ndarray
Average volume representing the background or objective profile without tissue
epsilon: float (optional, 0 < epsilon < 1)
Small constant to prevent zero-division
Returns
-------
ndarray
Normalized volume.
"""
# Adjust volume intensity to be in range [0,1]
meanv = average_vol
# Apply normalization (division)
vol = vol / (meanv + epsilon) * meanv.mean() #
return vol
# Defining the radiometric transformation function
[docs]
def T_r(p: np.ndarray, x: int | np.ndarray, y: int | np.ndarray) -> np.ndarray:
"""Radiometric transformation function.
Parameters
----------
p : tuple
Radiometic function parameters to evaluate. (Should be a (6,) tuple)
x : int or ndarray
X position where the function is evaluated (int or result of meshgrid)
y : int or ndarray
Y position where the function is evaluated (int or result of meshgrid)
Returns
-------
int or ndarray
Function evaluated at given positoin
"""
return p[0] * x**2 + p[1] * x * y + p[2] * y**2 + p[3] * x + p[4] * y + p[5]
# Defining objective function f_r
[docs]
def f_r(x: np.ndarray, data: Any, z: int, pos: np.ndarray, i_mean: float) -> float:
"""Global radiometric objective function.
Parameters
----------
x : tuple
Radiometic function parameters to evaluate. (Should be a (6,) tuple)
data : (slicecode.utils.FileUtils.data object)
Data object used to iterate over volumes or images.
z : int
Slice number over which the function is evaluated.
pos : ndarray
Volume absolute positions computed with the shift_oct module
i_mean : float
Average volume / image intensity across all tiles.
Returns
-------
float
Objective function evaluation for parameters x.
Notes
-----
This method is an implementation of [Sun2006](http://onlinelibrary.wiley.com/doi/10.1111/j.1365-2818.2006.01687.x/full)
"""
f = 0 # Initial value
# Transform function
nx, ny = data.volshape[:2]
xx, yy = np.meshgrid(list(range(nx)), list(range(ny)), indexing="ij")
T = T_r(x, xx, yy)
# Loop over overlaps
for vol1, vol2, pos1, pos2 in data.neighbor_slice_iterator(z, return_pos=True):
# Getting overlap regions
real_pos1 = pos[pos1[0] - 1, pos1[1] - 1, :]
real_pos2 = pos[pos2[0] - 1, pos2[1] - 1, :]
ov1, ov2, pov1, pov2 = get_overlap(vol1, vol2, real_pos1, real_pos2)
# Getting AIP of overlap regions
im1 = np.squeeze(ov1.mean(axis=2))
im2 = np.squeeze(ov2.mean(axis=2))
# Evaluation T for overlap regions
T_im1 = T[pov1[0] : pov1[2], pov1[1] : pov1[3]]
T_im2 = T[pov2[0] : pov2[2], pov2[1] : pov2[3]]
# Updating function evaluation
f += np.sum(np.abs(im1 * T_im1 - im2 * T_im2))
# Loop over tiles
for vol in data.slice_iterator(z):
im = np.squeeze(vol.mean(axis=2))
f += np.sum(np.abs(im * T - i_mean))
return f
[docs]
def confocal_psf(z: np.ndarray, zf: float, zR: float, A: float | None = None) -> np.ndarray:
"""Confocal PSF model using a gaussian beam.
Parameters
----------
z : ndarray
Depths at which the psf is evaluated
zf : float
Focal plane depth
zR : float
Rayleigh Length
A : float
Normalization constant
Return
------
ndarray
PSF evaluated at each z locations
"""
psf = 1.0 / (((z - zf) / float(zR)) ** 2.0 + 1.0)
if A is not None:
psf *= A
return psf
[docs]
def get_slice_resolutions_from_psf(
zf: float,
zr: float,
nz: int = 120,
spacing: tuple[float, float, float] = (6.5, 6.5, 6.5),
N: int = 512,
lam: float = 1.030,
) -> np.ndarray:
"""Compute the lateral resolution at each depth using the confocal PSF model."""
res = np.zeros((nz,))
z = np.linspace(0, nz * spacing[2], nz)
w0 = np.sqrt(zr * lam / np.pi)
wz = w0 * np.sqrt(1 + ((z - zf) / float(zr)) ** 2.0)
for ii in range(nz):
zmin = -wz[ii] * 4
zmax = wz[ii] * 4
x = np.linspace(zmin, zmax, N)
edge = x < 0
psf = np.exp(-(x**2.0) / wz[ii] ** 2.0)
edge = np.convolve(edge, psf, mode="same")
edge /= edge.max()
x90 = x[np.where(edge > 0.90)[0][-1]]
x10 = x[np.where(edge < 0.1)[0][0]]
this_res = x10 - x90
res[ii] = this_res
return res
[docs]
def estimate_psf(
agarose: np.ndarray,
interface: np.ndarray | None = None,
dz: float = 6.5,
remove_saturation: bool = False,
mask_interface: bool = False,
zf: float | None = None,
fit_attn: bool = False,
) -> tuple[float, ...]:
"""Estimates the confocal PSF assuming a gaussian beam.
Parameters
----------
agarose : ndarray
Agarose volume
interface : ndarray
Water-Tissue interface map
dz : float
Z spacing in micron
remove_saturation : bool
If True, remove saturated pixels before fitting.
mask_interface : bool
If True, mask the tissue interface region before fitting.
zf : float, optional
Fixed focal plane depth. If None, it is estimated.
fit_attn : bool
If True, also fit the attenuation parameter.
Returns
-------
float
Focal plane depth
float
Rayleigh length
float
Normalization constant
Notes
-----
* If no interface is given, the whole volume is used for the psf regression
"""
i_profile = np.copy(agarose) if agarose.ndim == 1 else agarose.mean(axis=(0, 1))
nz = len(i_profile)
z = np.linspace(0, len(i_profile) * dz, len(i_profile))
# Only fit the tissue intensity (if interface is given)
if interface is not None:
z0 = np.median(interface)
i_profile = i_profile[z0::]
z = z[z0::]
else:
z0 = 0
if mask_interface:
m = gaussian_filter1d(i_profile, sigma=1, order=1)
m = m < 0
m = binary_erosion(m, iterations=1)
i_profile = i_profile[m]
z = z[m]
# Remove outliers in the tissue
if remove_saturation:
i_diff = np.diff(i_profile)
i_diff = np.insert(i_diff, 0, 0)
z_diff = np.copy(z)
med = np.median(i_diff)
MAD = np.median(np.abs(i_diff - med))
if MAD != 0:
Zscore = np.abs(0.6745 * (i_diff - med) / MAD)
outliers = Zscore > 3.5
else:
outliers = np.zeros_like(i_diff)
i_profile = i_profile[~outliers]
z = z_diff[~outliers]
# Normalization
i_profile /= i_profile.max()
# Fitting model
if zf is None:
if fit_attn:
def fo_psf(x: np.ndarray, y: np.ndarray, z: np.ndarray) -> float:
return np.sum((y - confocal_psf(z, x[0], x[1], x[2]) * np.exp(-2 * x[3] * (z - z[0]))) ** 2.0)
else:
def fo_psf(x: np.ndarray, y: np.ndarray, z: np.ndarray) -> float: # type: ignore[misc]
return np.sum((y - confocal_psf(z, x[0], x[1], x[2])) ** 2.0)
# 1st fit of the model
zf = 0.5 * (nz * dz)
zR = 250.0
A = 1.0
p0 = [zf, zR, A] # Initial parameters
param_bounds = [(0.0, zf * dz), (0.0, None), (0.0, 1.0)]
if fit_attn:
p0.append(0.0)
param_bounds.append((0.0, None))
popt_1 = minimize(fo_psf, p0, args=(i_profile, z), bounds=param_bounds)
# Detect outliers
if fit_attn:
I_err = (
i_profile - confocal_psf(z, popt_1.x[0], popt_1.x[1], popt_1.x[2]) * np.exp(-2 * popt_1.x[3] * (z - z[0]))
) ** 2.0
else:
I_err = (i_profile - confocal_psf(z, popt_1.x[0], popt_1.x[1], popt_1.x[2])) ** 2.0
err_med = np.median(I_err)
err_MAD = np.median(np.abs(I_err - err_med))
if err_MAD != 0:
err_Zscore = np.abs(0.6745 * (I_err - err_med) / err_MAD)
err_outliers = err_Zscore > 3.5
else:
err_outliers = np.zeros_like(I_err)
# 2nd fit without outliers
p0 = popt_1.x
popt_2 = minimize(
fo_psf,
p0,
args=(np.array(i_profile)[~err_outliers], np.array(z)[~err_outliers]),
bounds=param_bounds,
)
if fit_attn:
return popt_2.x[0], popt_2.x[1], popt_2.x[2], popt_2.x[3]
else:
return popt_2.x[0], popt_2.x[1], popt_2.x[2]
else:
def fo_psf(x: np.ndarray, y: np.ndarray, z: np.ndarray, zf: float) -> float: # type: ignore[misc]
return np.sum((y - confocal_psf(z, zf, x[0], x[1])) ** 2.0)
# 1st fit of the model
zR = 250.0
A = 1.0
p0 = [zR, A] # Initial parameters
popt_1 = minimize(fo_psf, p0, args=(i_profile, z, zf), bounds=((0.0, None), (0.0, 1.0)))
# Detect outliers
I_err = (i_profile - confocal_psf(z, zf, popt_1.x[0], popt_1.x[1])) ** 2.0
err_med = np.median(I_err)
err_MAD = np.median(np.abs(I_err - err_med))
if err_MAD != 0:
err_Zscore = np.abs(0.6745 * (I_err - err_med) / err_MAD)
err_outliers = err_Zscore > 3.5
else:
err_outliers = np.zeros_like(I_err)
# 2nd fit without outliers
p0 = popt_1.x
popt_2 = minimize(
fo_psf,
p0,
args=(np.array(i_profile)[~err_outliers], np.array(z)[~err_outliers], zf),
bounds=((0.0, None), (0.0, 1.0)),
)
return zf, popt_2.x[0], popt_2.x[1]
[docs]
def get_3d_psf(
vol: np.ndarray,
interface: np.ndarray,
res: float = 6.5,
use_average_rayleigh: bool = False,
remove_interface: bool = True,
zf: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Compute a 3D PSF from a given uniform volume (e.g. agarose).
Parameters
----------
vol : ndarray
Agarose / Background volume to use for the PSF computation
interface : ndarray
Water/Tissue interface depth map (in pixel)
res : float
Z axis resolution (in micron / pixel)
use_average_rayleigh : bool
Use average Rayleigh length instead of the Rayleigh map.
remove_interface : bool
If True, mask the tissue interface before computing the PSF.
zf : ndarray, optional
Fixed focal depth map. If None, it is estimated from data.
Returns
-------
ndarray
PSF
ndarray
Focal depth map
ndarray
Rayleigh length map
"""
nx, ny, nz = vol.shape
zf_map = np.zeros((nx, ny))
zr_map = np.zeros((nx, ny))
print("Computing PSF parameters for each aline")
for ix in range(nx):
for iy in range(ny):
a_line = np.array(vol[ix, iy, :])
a_line_interface = np.squeeze(interface[ix, iy])
try:
if zf is None:
params = estimate_psf(
a_line,
interface=a_line_interface,
dz=res,
mask_interface=remove_interface,
)
else:
params = estimate_psf(
a_line,
interface=a_line_interface,
dz=res,
mask_interface=remove_interface,
zf=zf[ix, iy],
)
zf_map[ix, iy] = params[0]
zr_map[ix, iy] = params[1]
except Exception:
pass
# Smoothing the zf and zr maps
print("Smoothing the zf and zr maps")
if zf is None:
zf_map = gaussian_filter(zf_map, (nx * 0.1, ny * 0.1))
zr_map = gaussian_filter(zr_map, (nx * 0.1, ny * 0.1))
# Fit parabola on zf_map
def f(x: np.ndarray, a: float, b: float, c: float, d: float, e: float, f: float) -> np.ndarray:
return a * x[0] * x[1] + b * x[0] ** 2 + c * x[1] ** 2 + d + e * x[0] + f * x[1]
xx, yy = np.meshgrid(list(range(nx)), list(range(ny)), indexing="ij")
xdata = (np.ravel(yy), np.ravel(xx))
ydata = np.ravel(zf_map)
popt, _ = curve_fit(f, xdata, ydata)
a, b, c, d, e, f = popt
print(popt)
zf_map = np.reshape(
a * xdata[0] * xdata[1] + b * xdata[0] ** 2.0 + c * xdata[1] ** 2.0 + d + e * xdata[0] + f * xdata[1],
(nx, ny),
)
# Computing this PSF
print("Creating the 3D PSF")
z = np.linspace(0, nz * res, nz)
psf = np.zeros_like(vol)
zr_ave = np.mean(zr_map)
for ix in range(nx):
for iy in range(ny):
if use_average_rayleigh:
psf[ix, iy, :] = confocal_psf(z, zf_map[ix, iy], zr_ave, 1.0)
else:
psf[ix, iy, :] = confocal_psf(z, zf_map[ix, iy], zr_map[ix, iy], 1.0)
return psf, zf_map, zr_map
[docs]
def fit_tissue_confocal_model(
iprofile: np.ndarray,
z0: int,
zr_0: float = 400.0,
res: float = 6.5,
use_bump_model: bool = False,
return_parameters: bool = False,
return_full_model: bool = False,
plot_profiles: bool = False,
fix_zr: bool = False,
) -> dict:
"""Fit a confocal tissue intensity profile model to depth data."""
nz = len(iprofile)
z = np.linspace(0, nz * res, nz)
# Removing water DC background
dc = np.min(iprofile[0:z0])
this_profile = iprofile - dc
# Normalizing the intensity profile
imax = this_profile.max()
this_profile = this_profile / float(imax)
# Fit an initial tissue model (sigmoid)
def tissue_model(x: np.ndarray | list[float], z: np.ndarray) -> np.ndarray:
c, z0, a = x[:]
signal = a / (1 + np.exp(-c * (z - z0) / float(z[-1] - z[0]))).astype(float)
return signal
def fo_signal(x: np.ndarray, y: np.ndarray, z: np.ndarray) -> float:
return np.sqrt(np.sum((y - tissue_model(x, z)) ** 2.0) / float(y.size))
p0 = [50.0, z0 * res, 1.0] # c, z0, a
popt_tissue = minimize(fo_signal, x0=p0, args=(this_profile, z))
c, z0, a = popt_tissue.x[:]
syn_tissue = tissue_model([c, z0, 1.0], z)
new_z0 = popt_tissue.x[1]
# Fit a static PSF (fixed zr, moving zf)
def confocal_model(x: np.ndarray | list[float], z: np.ndarray) -> np.ndarray:
zf, zr, a = x[:]
return confocal_psf(z, zf, zr, a)
def fo_PSF(x: np.ndarray, y: np.ndarray, z: np.ndarray, tissue: np.ndarray) -> float:
return np.sqrt(np.sum((y - tissue * confocal_model(x, z)) ** 2.0) / float(y.size))
p0 = [z[-1] * 0.5, zr_0, 1.0] # zf, zr, a
param_bounds: list[tuple[float | None, float | None]] = [(z[0], z[-1]), (zr_0, zr_0), (0.0, None)]
popt_firstpsf = minimize(fo_PSF, x0=p0, args=(this_profile, z, syn_tissue), bounds=param_bounds)
zf, zr = popt_firstpsf.x[0:2]
psf1 = confocal_model([zf, zr, 1.0], z)
# Detect the specular reflection bump at the water / tissue interface
if use_bump_model:
def bump_model(signal: np.ndarray, w: float, b: float) -> np.ndarray:
t_grad = -gaussian_filter1d(signal, w, order=2)
t_grad[t_grad < 0] = 0
if t_grad.max() > 0:
with contextlib.suppress(BaseException):
t_grad /= float(t_grad.max())
bump = b * t_grad
return bump
def bump_tissue_model(x: np.ndarray, z: np.ndarray) -> np.ndarray:
z0, c, w, a, b = x[:]
signal = tissue_model([c, z0, a], z)
return signal + bump_model(signal, w, b)
def fo_btm(x: np.ndarray, y: np.ndarray, z: np.ndarray, psf: np.ndarray) -> float:
return np.sqrt(np.sum((y - psf * bump_tissue_model(x, z)) ** 2.0) / float(y.size))
p0 = [new_z0, 60, 5, 1.0, 0.5]
param_bounds = [(z[0], z[-1]), (0, 100), (1.0, 10), (0, None), (0, None)]
popt_btm = minimize(fo_btm, x0=p0, args=(this_profile, z, psf1), bounds=param_bounds)
z0, c, w, a, b = popt_btm.x[:]
bump_tissue = bump_tissue_model(popt_btm.x, z)
tissue = tissue_model([c, z0, a], z)
bump = bump_model(tissue, w, b)
bump_mask = (bump - bump[-1]) <= 0.1 * (bump.max() - bump[-1])
bump_mask[z <= new_z0] = 0
limit = np.where(bump_mask)[0][0] * res
this_mask = z >= limit
# Optimize the PSF model (zf, zr)
# Normalize the signal to fit the tissue model.
def normalize_profile(signal: np.ndarray, psf: np.ndarray) -> np.ndarray:
normalized_signal = signal / psf
return normalized_signal
def fo_psf_normalized(x: np.ndarray, y: np.ndarray, z: np.ndarray, tissue: np.ndarray) -> float:
return np.sqrt(np.sum((tissue - normalize_profile(y, confocal_model(x, z))) ** 2.0) / float(y.size))
p0 = popt_firstpsf.x # zf, zr, a
if fix_zr:
zr_0 = p0[1]
param_bounds = [(z[0], z[-1]), (zr_0, zr_0), (0.0, None)]
else:
param_bounds = [(z[0], z[-1]), (0, None), (0.0, None)]
popt_psf = minimize(
fo_psf_normalized,
x0=p0,
args=(this_profile[this_mask], z[this_mask], bump_tissue[this_mask]),
bounds=param_bounds,
)
psf_final = confocal_model(popt_psf.x, z)
if plot_profiles:
import matplotlib.pyplot as plt
plt.figure(figsize=(8, 5))
plt.plot(
z,
iprofile,
marker="o",
linestyle="dashed",
alpha=2,
label="Original Data",
)
plt.plot(
z,
imax * psf_final * bump_tissue + dc,
label="Tissue model",
linewidth=2,
color="r",
)
plt.plot(
z,
imax * psf_final / psf_final.max() + dc,
linewidth=2,
linestyle="dashed",
color="k",
label="Confocal PSF",
)
plt.legend(loc="best", shadow=True)
plt.grid(True)
plt.show()
output: dict[str, Any] = {"psf": psf_final}
if return_full_model:
output["tissue"] = imax * bump_tissue + dc
output["tissue_psf"] = imax * psf_final * bump_tissue + dc
if return_parameters:
output["parameters"] = {
"zf": popt_psf.x[0],
"zr": popt_psf.x[1],
"a": popt_psf.x[2],
}
output["parameters"]["z0"] = popt_btm.x[0]
output["parameters"]["c"] = popt_btm.x[1]
output["parameters"]["w"] = popt_btm.x[2]
output["parameters"]["b1"] = popt_btm.x[3]
output["parameters"]["b2"] = popt_btm.x[4]
else:
if plot_profiles:
import matplotlib.pyplot as plt
plt.figure(figsize=(8, 5))
plt.plot(
z,
iprofile,
marker="o",
linestyle="dashed",
alpha=0.6,
label="Original Data",
)
plt.plot(
z,
imax * psf1 * syn_tissue * popt_firstpsf.x[2] + dc,
label="Tissue model",
linewidth=2,
color="r",
)
plt.plot(
z,
imax * psf1 / psf1.max() + dc,
linewidth=2,
linestyle="dashed",
color="k",
label="Confocal PSF",
)
plt.plot(
z,
(iprofile - dc) * psf1.max() / psf1 + dc,
marker="o",
linewidth=2,
linestyle="dashed",
color="k",
label="Compensated Data",
)
plt.legend(loc="best", shadow=True)
plt.grid(True)
plt.show()
output: dict[str, Any] = {"psf": psf1}
if return_full_model:
output["tissue"] = imax * syn_tissue + dc
output["tissue_psf"] = imax * psf1 * syn_tissue + dc
if return_parameters:
output["parameters"] = {
"zf": popt_firstpsf.x[0],
"zr": popt_firstpsf.x[1],
"a": popt_firstpsf.x[2],
}
output["parameters"]["z0"] = popt_tissue.x[0]
output["parameters"]["c"] = popt_tissue.x[1]
output["parameters"]["w"] = popt_tissue.x[2]
return output