Source code for linumpy.intensity.attenuation

"""Attenuation estimation and profile recovery models for OCT volumes."""

from linumpy.config.threads import worker_initializer

import itertools
import multiprocessing
from typing import Literal, overload

import numpy as np
import SimpleITK as sitk
from dipy.segment.mask import median_otsu
from scipy.interpolate import interp1d
from scipy.ndimage import (
    binary_fill_holes,
    gaussian_filter,
    median_filter,
    uniform_filter,
)
from scipy.optimize import minimize
from skimage.filters import threshold_li

from linumpy.cli.args import get_available_cpus
from linumpy.geometry.crop import mask_under_interface
from linumpy.geometry.interface import find_tissue_interface, get_interface_depth_from_mask
from linumpy.intensity.normalize import eqhist, normalize


[docs] def get_attenuation_vermeer2013( vol: np.ndarray, dz: float = 6.5e-6, mask: np.ndarray | None = None, C: np.ndarray | int | float | None = None ) -> np.ndarray: """Estimates the attenuation coefficient using the Vermeer2013 model. Parameters ---------- vol : ndarray 3D Reflectivity OCT data dz : float Axial resolution (in microns/pixel) mask : ndarray Tissue mask (optional). Every pixel above the mask will be attributed null attenuation, and pixel under the mask will be set to the last computed Aline attenuation. C: ndarray, int or float The bottom constant (DEV) Return ------ ndarray Estimated attenuation coefficient map (same size as vol) Notes ----- - This algorithm is inspired by an ultrasound attenuation compensation method. References ---------- - Vermeer et al. Depth-resolved model-based reconstruction of attenuation coefficients in optical coherence tomography. Biomed. Opt. Exp., vol5, no1, pp332-337, 2013 """ # Prepare the bottom constant (to better consider the finite Bscan dimension) if C is None: C = np.zeros(vol.shape) elif isinstance(C, (int, float)): C = np.ones(vol, dtype=float) * C elif C.ndim == 2: C = np.tile(np.reshape(C, (C.shape[0], C.shape[1], 1)), (1, 1, vol.shape[2])) # Use mask to ignore tissue layers above / under the mask if mask is None: mask = np.ones_like(vol, dtype=bool) # Compensation profile with depth for each A-line vol_p = np.ma.masked_array(vol, ~mask) profile = np.cumsum(vol_p[:, :, ::-1], axis=-1) + C profile = profile[:, :, ::-1] # Estimating the attenuation coefficient for each A-line mu = np.zeros_like(vol) mu[profile > 0] = vol[profile > 0] / profile[profile > 0] mu[profile > 0] = np.log(1 + mu[profile > 0]) / (2.0 * dz) # Convert attenuation in cm-1 mu /= 100.0 # Masking the result if mask is not None: interface = get_interface_depth_from_mask(mask) mask_above_interface = ~mask_under_interface(mask, interface, return_mask=True) mu[mask_above_interface] = 0 # Find bottom interface nx, ny, nz = mask.shape bottom_interface = (nz - get_interface_depth_from_mask(mask[:, :, ::-1]) - 1).astype(int) xx, yy = np.meshgrid(list(range(nx)), list(range(ny)), indexing="ij") bottom_mu = mu[xx, yy, bottom_interface] bottom_mu = np.tile(np.reshape(bottom_mu, (nx, ny, 1)), (1, 1, nz)) bottom_mask = mask_under_interface(mask, bottom_interface, return_mask=True) mu[bottom_mask] = bottom_mu[bottom_mask] return mu
[docs] def get_extended_attenuation_vermeer2013( vol: np.ndarray, mask: np.ndarray | None = None, k: int = 10, sigma: int = 5, sigma_bottom: int = 3, dz: int = 1, res: float = 6.5, zshift: int = 3, fill_holes: bool = False, ) -> np.ndarray: """Compute the local effective tissue attenuation using the extended Vermeer model. Parameters ---------- vol: ndarray OCT volume to process mask: ndarray Optional tissue mask. If none is given the water/tissue interface will be detected from the data. k: int Median filter kernel size (px) applied before the attenuation coefficient computation (applied in the XY direction). sigma : int Gaussian filter kernel size (px) applied axially before the exponential signal fit used to extend the Alines for the extended Vermeer signal evalution. sigma_bottom : int Gaussian filter kernel size (px) applied axially on the bottom slice signal before the extension fit. fill_holes : bool If True, fill holes in the tissue mask before computing attenuation. dz : int Number of axial pixel to consider when computing the bottom slice signal for the signal extension. res: float Axial resolution in micron / pixel zshift: int Number of pixel under the water-tissue interface to ignore while fitting the exponential function for signal extension. Returns ------- ndarray Computed attenuation coefficients. """ # First the slice is denoised with a small median filter if k > 0: vol = sitk.GetArrayFromImage(sitk.Median(sitk.GetImageFromArray(vol), (0, k, k))) # Computing tissue mask if mask is None: # Detecting the water / tissue interface interface = find_tissue_interface(vol, s_xy=3, s_z=1, order=1, use_log=True) mask = mask_under_interface(vol, interface + zshift, return_mask=True) # Lets fit an exponential function on each Aline to extend the tissue slice. exp_fit = get_gradient_attenuation(gaussian_filter(vol, (0, 0, sigma))) exp_fit = np.ma.masked_array(exp_fit, ~mask).mean(axis=2) # Fill holes left by NaN values exp_fit[np.isnan(exp_fit)] = 0 # Get the signal at the interface for each Aline interface_bottom = vol.shape[2] - get_interface_depth_from_mask(mask[:, :, ::-1]) - 1 - dz mask_bottom = mask_under_interface(vol, interface_bottom, return_mask=True) mask_bottom = (mask_bottom * mask).astype(bool) i0 = np.ma.masked_array(vol, ~mask_bottom).mean(axis=2) # Compute the end-of-scan bias epsilon = 1e-3 C = np.zeros_like(i0) C[exp_fit > epsilon] = i0[exp_fit > epsilon] / exp_fit[exp_fit > epsilon] C = gaussian_filter(C, sigma_bottom) # Compute the attenuation attn_cropped = get_attenuation_vermeer2013(vol, dz=res * 1e-06, mask=mask, C=C) # Remove NaN attn_cropped[np.isnan(attn_cropped)] = 0 # Only keep attn within the mask attn_cropped[~mask.astype(bool)] = 0 # Fill holes if fill_holes: attn_cropped = sitk.GetArrayFromImage(sitk.GrayscaleFillhole(sitk.GetImageFromArray(attn_cropped))) return attn_cropped
[docs] def get_attenuation_faber2004( vol: np.ndarray, mask: np.ndarray | None = None, dz: float = 6.5e-6, N: int = 4 ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Estimates the attenuation coefficient using the Faber2004 model. Parameters ---------- vol : ndarray 3D Reflectivity OCT data mask : ndarray Tissue mask to control which points to use in each A-lines. (if None, the whole A-line is used) dz : float Axial resolution (in microns/pixel) N : int Size of the XY uniform filter used to average A-lines together Return ------ ndarray Estimated attenuation coefficient map (computes a single mu_t per A-line) Notes ----- - This algorithm uses the confocal PSF and an single-scattering photon model. - Assumes a 4X objective setup for now. References ---------- - Faber et al. Quantitative measurement of attenuation coefficients of weakly scattering media using optical coherence tomography. Opt. Express 12, 4353-4365 (2004). """ # Average the A-line together in the XY plane if N > 0: vol = uniform_filter(vol, size=(N, N, 0)) # Computing the confocal psf parameters alpha = 2 # 1 : specular scattering; 2 : diffuse scattering n = 1.33 # Index of refraction w0 = 4.88e-6 # micron : Ligth beam width at the focal plane (need to validate) l0 = 1.030e-6 # micron : Ligth central wavelenth zr = np.pi * alpha * n * w0**2.0 / l0 # Apparent Rayleigh length (micron) # Defining the objective function for the minimization def f(x: np.ndarray, y: np.ndarray, z: np.ndarray) -> float: return np.sum((y - oct_signal_faber2004_model(z, mu_t=x[2], zR=x[1], z0=x[0])) ** 2.0) # Loop over all A-lines and computing attenuation attn = np.zeros((vol.shape[0], vol.shape[1])) r_length = np.zeros((vol.shape[0], vol.shape[1])) z_focus = np.zeros((vol.shape[0], vol.shape[1])) z = np.arange(0.0, dz * vol.shape[2], dz) for x in range(vol.shape[0]): for y in range(vol.shape[1]): mask_aline = mask[x, y, :] if mask is not None else np.ones((vol.shape[2],)).astype(bool) if np.any(mask_aline): p0 = [0.0, 100.0, 0.001] zp = np.where(mask_aline)[0][0] data = vol[x, y, :][mask_aline] data /= 1.0 * data.max() p_opt = minimize( f, p0, args=(data, z[zp::] - z[zp]), bounds=((None, None), (zr / 2.0, 2.0 * zr), (0.0, None)), ) if p_opt.success: attn[x, y] = p_opt.x[2] r_length[x, y] = p_opt.x[1] z_focus[x, y] = p_opt.x[0] + z[zp] else: print(("No convergence for (x,y)=", x, y)) return attn, r_length, z_focus
# Modele du signal utilisant la PSF confocale et single-scattering photons
[docs] def oct_signal_faber2004_model(z: np.ndarray, mu_t: float = 1.0, zR: float = 200.0, z0: float = 100.0) -> np.ndarray: """Model the oct signal using a single-scattered photons and the confocal PSF. Parameters ---------- z : (N,) ndarray Depth Position along an A-line at which the signal must be computed (in micron) mu_t : float or (N,) ndarray Attenuation coefficient (either a single value for the whole column or N values for each position) zR : float Apparent Rayleigh length (in micron) z0 : float Focal plane depth (in micron) Returns ------- ndarray OCT backscattering signal estimated at each z positions Notes ----- - This is based on the single-scattering photon model from Faber2004. References ---------- - https://www.osapublishing.org/oe/abstract.cfm?uri=oe-12-19-4353 """ psf = 1.0 / (((z - z0) / float(zR)) ** 2.0 + 1.0) iz = np.sqrt(psf * np.exp(-2 * mu_t * z)) # Should I fit the sqrt or not return iz
def _aline_fit(data: np.ndarray) -> float: """Aline fit to extract the attenuation coefficient.""" # Defining the attenuation model (biexponential) with x=[A, mu_t], y=data, z=depths def f_attn(x: np.ndarray, y: np.ndarray, z: np.ndarray) -> float: return np.sum((y - x[0] * np.exp(-2 * x[1] * z)) ** 2.0) z = np.linspace(0, len(data), len(data)) aline = np.array(data) p0 = [1.0, 0.001] # Initial condition popt = minimize(f_attn, p0, args=(aline, z), bounds=((0, None), (0, None))) return popt.x[1]
[docs] def split_aline(data: np.ndarray, mask: np.ndarray) -> tuple[list, list]: """Split an A-line into contiguous masked segments.""" data_list = [] z_list = [] this_aline = [] this_z = [] for elem, m, z in zip(data, mask, list(range(len(data))), strict=False): if m: this_aline.append(elem) this_z.append(z) else: if len(this_aline) > 0: data_list.append(this_aline) this_aline = [] z_list.append(this_z) this_z = [] if len(this_aline) > 0: data_list.append(this_aline) z_list.append(this_z) return data_list, z_list
def _split_alines_worker(param: tuple) -> tuple[list, list]: return split_aline(param[0], param[1])
[docs] def get_aline_attenuation(vol: np.ndarray, k: int = 1, mask: np.ndarray | None = None) -> np.ndarray: """Compute the attenuation coefficient for each A-lines. Parameters ---------- vol : ndarray Volume to analyze of size NxMxL k : int Number of points to compute mask : ndarray Tissue mask to limit the region where the fit is done. Returns ------- ndarray Estimated attenuation of size NxMxk """ # Computing the shape of this volume nx, ny, nz = vol.shape z = np.arange(nz) z_list = np.array_split(z, k) attn_vol = np.zeros((nx, ny, k)) for z, ik in zip(z_list, list(range(k)), strict=False): # Selecting a subsample this_vol = vol[:, :, z[0] : z[-1]] this_mask = mask[:, :, z[0] : z[-1]] if mask is not None else None # Transforming this volume into a list a_lines = np.split(this_vol.flatten(), nx * ny) if mask is not None and this_mask is not None: mask_a_lines = np.split(this_mask.flatten(), nx * ny) for A, M, ii in zip(a_lines, mask_a_lines, list(range(nx * ny)), strict=False): a_lines[ii] = A[M] # Process each Alines in parallel nproc = get_available_cpus() p = multiprocessing.Pool(nproc, initializer=worker_initializer) result = p.map(_aline_fit, a_lines) p.close() p.join() # Reshaping the output attn_vol[:, :, ik] = np.reshape(result, (nx, ny)) # Removing background attn_vol[vol.mean(axis=2) == 0] = 0 return np.squeeze(attn_vol)
[docs] def get_gradient_attenuation( vol: np.ndarray, mask: np.ndarray | None = None, return_mask: bool = False, low_thresh: float = 0.0, fill_holes: bool = False, sz: int = 3, sxy: int = 0, res: float = 1.0, ) -> np.ndarray | tuple[np.ndarray, np.ndarray]: """Compute the attenuation coefficient using the log-gradient method.""" vol_l = np.copy(vol) vol_l[vol > 0] = np.log(vol[vol > 0]) attn = -0.5 * np.gradient(vol_l, res, axis=2) # Removing 0 values and masked values attn[vol == 0] = 0 if mask is not None: attn[~mask] = 0 # Removing negative attenuation values mask_attn = attn < 0 attn[mask_attn] = 0 # Removing also small attenuation values small_mask = attn < np.percentile(attn[attn > 0], low_thresh) mask_attn[small_mask] = True attn[mask_attn] = 0 # Filling holes using morphological reconstruction. if fill_holes: vol_itk = sitk.GetImageFromArray(attn) cfilter = sitk.GrayscaleMorphologicalClosingImageFilter() cfilter.SetKernelType(sitk.sitkBall) cfilter.SetKernelRadius((sz, sxy, sxy)) output_itk = cfilter.Execute(vol_itk) attn = sitk.GetArrayFromImage(output_itk) if return_mask: return attn, mask_attn else: return attn
[docs] def get_interface_mask( vol: np.ndarray, s: int = 0, mask_tissue: bool = True, mask_water_tissue_interface: bool = True ) -> np.ndarray: """Compute a mask excluding tissue interfaces and boundaries.""" nx, ny, nz = vol.shape mask = np.ones_like(vol).astype(bool) # Get tissue mask if mask_tissue: tissue_mask = binary_fill_holes(median_otsu(eqhist(vol.mean(axis=2)), median_radius=5.0)[1]) tissue_mask = np.tile(np.reshape(tissue_mask, (nx, ny, 1)), (1, 1, nz)) mask *= tissue_mask # Get water/tissue 3D interface mask if mask_water_tissue_interface: # Computing the gradient vol_f = median_filter(vol, 5) gradient = np.gradient(vol_f) gm = gradient[0] ** 2.0 + gradient[1] ** 2.0 + gradient[2] ** 2.0 gm = normalize(gm, high_thresh=99.5) # Thresholding the gradient thresh = threshold_li(gm) interfaces_g = gm >= thresh # Converting this into a water/tissue interface depth depths = np.zeros((nx, ny)) for x, y in itertools.product(list(range(nx)), list(range(ny))): idx = np.where(interfaces_g[x, y, :]) if len(idx[0]) > 0: depths[x, y] = idx[0][0] water_tissue_mask = mask_under_interface(vol, depths, return_mask=True) mask *= water_tissue_mask # Smoothing the volume if s > 0: vol = gaussian_filter(vol, sigma=(s, s, s)) # Get tissue interface boundaries using a Canny filter vol_itk = sitk.GetImageFromArray(vol) canny_filter = sitk.CannyEdgeDetectionImageFilter() edges = sitk.GetArrayFromImage(canny_filter.Execute(vol_itk)).astype(bool) mask *= ~edges return mask
[docs] def find_interface_from_gradient(vol: np.ndarray, f: float = 0.005, remove_smooth: bool = False) -> np.ndarray: """Find the tissue-water interface depth map using gradient magnitude.""" nx, ny = vol.shape[0:2] k = int(np.round(f * 0.5 * (nx + ny))) # Computing the gradient vol_f = gaussian_filter(vol, k) gradient = np.gradient(vol_f) gm = gradient[0] ** 2.0 + gradient[1] ** 2.0 + gradient[2] ** 2.0 gm = normalize(gm, high_thresh=99.5) # Thresholding the gradient thresh = threshold_li(gm) interfaces_g = gm >= thresh # Converting this into a water/tissue interface depth depths = np.argmax(interfaces_g, axis=2) if remove_smooth: depths += k + 1 return depths
[docs] def get_heterogeneous_attenuation( vol: np.ndarray, mask: np.ndarray | None = None, fill_holes: bool = False ) -> np.ndarray: # TODO: adapt multiproc to available proc given by mpi4py """Compute heterogeneous attenuation coefficients for each A-line segment.""" nx, ny, nz = vol.shape nproc = get_available_cpus() if mask is None: # Compute the mask mask = get_interface_mask(vol) # Split the volume into Alines and Alines portions. print(f"Splitting volume into Alines portions (using {nproc} processors)") a_lines = np.split(vol.flatten(), nx * ny) a_lines_mask = np.split(mask.flatten(), nx * ny) a_lines_to_split = list(zip(a_lines, a_lines_mask, strict=False)) n_alines = len(a_lines) # Process each Alines in parallel p = multiprocessing.Pool(nproc, initializer=worker_initializer) result = p.map(_split_alines_worker, a_lines_to_split) p.close() p.join() # Debug print(("Number of Alines : ", len(a_lines_to_split))) p_count = 0 for portion in result: p_count += len(portion[0]) print(("Number of Alines portions : ", p_count)) # Compute the attenuation for each aline portions print(f"Computing attenuation for each Aline portion (using {nproc} processors)") aline_portions = [] z_portions = [] portion_idx = [] for foo, idx in zip(result, list(range(n_alines)), strict=False): aline_portions.extend(foo[0]) z_portions.extend(foo[1]) portion_idx.extend([idx] * len(foo[0])) p = multiprocessing.Pool(nproc, initializer=worker_initializer) result = p.map(_aline_fit, aline_portions) p.close() p.join() # Reshape attenuation as an Aline list # TODO: Parallelize this loop. print("Reshape attenuation as an Aline list") aline_attn = [np.zeros((nz,)) for i in range(n_alines)] for idx, z, mu in zip(portion_idx, z_portions, result, strict=False): aline_attn[idx][z] = mu # Combine local attenuations into a single volume attn_vol = np.reshape(aline_attn, (nx, ny, nz)) # Fill attenuation holes (using 1d-linear interpolation) # TODO: User inverve distance weighted interpolation instead ? if fill_holes: print("Filling attenuation holes using 1D-linear interpolation") attn_fill = np.zeros_like(attn_vol) for x in range(nx): for y in range(ny): idx = attn_vol[x, y, :] > 0 this_z = np.array(np.where(idx)[0]) this_mu = attn_vol[x, y, :] this_mu = this_mu[idx] if np.sum(idx) == 1: attn_fill[x, y, :] = attn_vol[x, y, :] elif np.sum(idx) > 1: f = interp1d(this_z, this_mu, kind="linear", bounds_error=False, fill_value=0) new_z = np.arange(nz) new_mu = f(new_z) attn_fill[x, y, :] = new_mu # Debug, should replace the original volume return attn_fill else: return attn_vol
@overload
[docs] def get_flat_agarose_profile(vol: np.ndarray, return_mask_and_profile: Literal[False] = ...) -> np.ndarray: ...
@overload def get_flat_agarose_profile( vol: np.ndarray, return_mask_and_profile: Literal[True] ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: ... def get_flat_agarose_profile( vol: np.ndarray, return_mask_and_profile: bool = False ) -> np.ndarray | tuple[np.ndarray, np.ndarray, np.ndarray]: """Estimate and remove the flat agarose intensity profile from a volume.""" nx, ny, nz = vol.shape # Get agarose mask for this slice and its intensity profile tissue_mask = binary_fill_holes(median_otsu(eqhist(vol.mean(axis=2)), median_radius=5.0)[1]) tissue_mask = np.tile(np.reshape(tissue_mask, (nx, ny, 1)), (1, 1, nz)) mask = (~tissue_mask).astype(bool) # Computing intensity profile for the agarose Alines i_profile = np.zeros((nz,)) n_alines = np.sum(mask.max(axis=2)) for x in range(nx): for y in range(ny): this_mask = mask[x, y, :] a_line = vol[x, y, :] if np.any(this_mask): i_profile += a_line / float(n_alines) # Correction profile agarose_norm = np.tile(np.reshape(i_profile, (1, 1, nz)), (nx, ny, 1)) vol_p = vol / agarose_norm.astype(float) if return_mask_and_profile: return vol_p, mask, i_profile else: return vol_p
[docs] def get_signal_from_attenuation( attn: np.ndarray, i0: np.ndarray | None = None, nz: int = 120, mask: np.ndarray | None = None, res: float = 1.0 ) -> np.ndarray: """Estimate the signal from the 2D A-Line attenuation map. Parameters ---------- attn : ndarray 2D attenuation coefficient map for each A-line i0 : ndarray 2D map of the top slice signal (optional) nz : int Number of Aline points mask : ndarray Data mask (to limit where the oct signal is simulated) res : float Axial resolution in micron / pixel Returns ------- ndarray Estimated OCT signal """ nx, ny = attn.shape attn_vol = np.zeros((nx, ny, nz)) def f_attn(x: float, z: np.ndarray) -> np.ndarray: return np.exp(-2 * x * z) for ix, iy in itertools.product(list(range(nx)), list(range(ny))): this_mask = mask[ix, iy, :].astype(bool) if mask is not None else np.ones((nz,), dtype=bool) z0 = np.where(this_mask) z0 = z0[0][0] if len(z0[0]) > 0 else 0 A = i0[ix, iy] if i0 is not None else 1 if np.any(this_mask): this_mu = attn[ix, iy] z = np.linspace(0, nz * res, nz) - z0 * res sim_profile = A * f_attn(this_mu, z) attn_vol[ix, iy, this_mask] = sim_profile[this_mask] return attn_vol