from __future__ import annotations
import sys
from dataclasses import dataclass
from logging import debug, error, info, warning
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import scipy
from astropy.io import fits
from PIL import Image as PILImage
from micropolarray.cameras import Camera, PolarCam
from micropolarray.image import Image
from micropolarray.polarization_functions import AoLP, DoLP, pB
from micropolarray.processing.chen_wan_liang_calibration import _ifov_jitcorrect
from micropolarray.processing.congrid import congrid
from micropolarray.processing.demodulation import Demodulator
from micropolarray.processing.demosaic import (
demosaic,
merge_polarizations,
split_polarizations,
)
from micropolarray.processing.nrgf import roi_from_polar
from micropolarray.processing.rebin import micropolarray_rebin
from micropolarray.processing.shift import shift_micropol
from micropolarray.utils import (
_make_abs_and_create_dir,
fix_data,
mean_minus_std,
mean_plus_std,
timer,
)
[docs]@dataclass
class PolParam:
"""Auxiliary class for polarization parameters.
Members:
ID (str): parameter identifier
data (np.array): parameter image as numpy 2D array
title (str): brief title of the parameter, useful for plotting
measure_unit (str): initial measure units of the parameter
fix_data (bool): controls whether data has to be constrained to [0, 4096] interval (not implemented yet)
"""
ID: str
data: np.ndarray
title: str
measure_unit: str
fix_data: bool = False
DEFAULT_ANGLES_DIC = None # sets the micropolarizer orientations with a dictionary {angle : position in superpix 1->3}
[docs]def set_default_angles(angles_dic: dict):
"""Sets the default micropolarizer orientations for images.
Args:
angles_dic (dict): dictionary {value : pos} where value is the angle in degrees from -90 to 90 and pos is the pixel position in superpixel, from 0 to 3 (position [y, x], fast index x)
"""
global DEFAULT_ANGLES_DIC
DEFAULT_ANGLES_DIC = angles_dic
[docs]class MicropolImage(Image):
"""Micro-polarizer array image class. Can be initialized from a 2d array, a list of 1 or more file names (use the boolean keyword averageimages to select if sum or average is taken) or another MicropolImage. Dark and flat micropolarray images can also be provided to automatically correct the result."""
first_call = True # Avoid repeating messages
def __init__(
self,
initializer: str | np.ndarray | list | MicropolImage,
angle_dic: dict = None,
dark: MicropolImage = None,
flat: MicropolImage = None,
averageimages: bool = True,
):
self._initialize_internal_variables()
if angle_dic is None:
global DEFAULT_ANGLES_DIC
if DEFAULT_ANGLES_DIC is None:
if MicropolImage.first_call:
warning(
f"Micropolarizer orientation dictionary defaults to {PolarCam().angle_dic}, set it via set_default_angles(camera)\n"
)
MicropolImage.first_call = False
DEFAULT_ANGLES_DIC = PolarCam().angle_dic
angle_dic = DEFAULT_ANGLES_DIC
self.angle_dic = angle_dic
if type(initializer) is list and len(initializer) > 1:
self._num_of_images = len(initializer)
else:
self._num_of_images = 1
super().__init__(
initializer=initializer, averageimages=averageimages
) # Call generic Image() constructor
if type(initializer) is MicropolImage:
self._import_image_parameters(initializer)
else:
self.Stokes_vec = self._get_theo_Stokes_vec_components()
# Apply corrections if needed
if dark is not None:
self.subtract_dark(dark=dark)
if flat is not None:
self.correct_flat(flat=flat)
def _initialize_internal_variables(self):
self._data = None
self._is_demodulated = False
self._is_demosaiced = False
self._binning = 1
self._flat_subtracted = False
self._dark_subtracted = False
self.demosaiced_images = None
def _import_image_parameters(self, image: MicropolImage):
self.data = image.data
self.header = image.header
self.angle_dic = image.angle_dic
self.Stokes_vec = image.Stokes_vec
self._is_demodulated = image._is_demodulated
self._is_demosaiced = image._is_demosaiced
self._binning = image._binning
self._dark_subtracted = image._dark_subtracted
self._flat_subtracted = image._flat_subtracted
self.demosaiced_images = image.demosaiced_images
# ----------------------------------------------------------------
# -------------------- POLARIZATION PROPERTIES -------------------
# ----------------------------------------------------------------
@property
def I(self) -> PolParam:
return PolParam("I", self.Stokes_vec[0], "Stokes I", "DN", fix_data=False)
@property
def Q(self) -> PolParam:
return PolParam("Q", self.Stokes_vec[1], "Stokes Q", "DN", fix_data=False)
@property
def U(self) -> PolParam:
return PolParam("U", self.Stokes_vec[2], "Stokes U", "DN", fix_data=False)
@property
def pB(self) -> PolParam:
return PolParam(
"pB",
pB(self.Stokes_vec),
"Polarized brightness",
"DN",
fix_data=False,
)
@property
def AoLP(self) -> PolParam:
return PolParam(
"AoLP",
AoLP(self.Stokes_vec),
"Angle of Linear Polarization",
"rad",
fix_data=False,
)
@property
def DoLP(self) -> PolParam:
return PolParam(
"DoLP",
DoLP(self.Stokes_vec),
"Degree of Linear Polarization",
"",
fix_data=False,
)
@property
def polparam_list(self) -> list:
return [self.I, self.Q, self.U, self.pB, self.AoLP, self.DoLP]
@property
def single_pol_subimages(self):
return split_polarizations(self.data)
@property
def pol0(self) -> PolParam:
return PolParam(
"0",
self.single_pol_subimages[self.angle_dic[0]],
"0 deg orientation pixels",
"DN",
fix_data=False,
)
@property
def pol45(self) -> PolParam:
return PolParam(
"45",
self.single_pol_subimages[self.angle_dic[45]],
"45 deg orientation pixels",
"DN",
fix_data=False,
)
@property
def pol_45(self) -> PolParam:
return PolParam(
"-45",
self.single_pol_subimages[self.angle_dic[-45]],
"-45 deg orientation pixels",
"DN",
fix_data=False,
)
@property
def pol90(self) -> PolParam:
return PolParam(
"90",
self.single_pol_subimages[self.angle_dic[90]],
"90 deg orientation pixels",
"DN",
fix_data=False,
)
# ----------------------------------------------------------------
# ---------------------- STOKES COMPONENTS -----------------------
# ----------------------------------update_dim------------------------------
def _update_data_and_Stokes(self, newdata: np.ndarray = None):
if newdata is not None:
self.data = newdata
self.Stokes_vec = self._get_theo_Stokes_vec_components()
[docs] def demodulate(
self,
demodulator: Demodulator,
demosaicing: bool = False,
) -> MicropolImage:
"""Returns a MicropolImage with polarization parameters calculated from the demodulation tensor provided.
Args:
demodulator (Demodulator): Demodulator object containing the demodulation tensor components (see processing.new_demodulation)
demosaicing (bool, optional): wether to apply demosaicing to the image or not. Set it to False if demodulation matrices have half the dimension of the image. Defaults to True.
Raises:
ValueError: raised if image and demodulator do not have the same dimension, for example in case of different binning
Returns:
MicropolImage: copy of the input imagreturn e with I, Q, U, pB, DoLP, AoLP calculated from the demodulation tensor.
"""
info("Demodulating...")
demodulated_image = MicropolImage(self)
demodulated_image.Stokes_vec = demodulated_image._get_Stokes_from_demodulator(
demodulator, demosaicing
)
demodulated_image._is_demodulated = True
info("Image correctly demodulated")
return demodulated_image
def _get_theo_Stokes_vec_components(self) -> np.array:
"""
Computes stokes vector components from four polarized images at four angles, angle_dic describes the coupling between
poled_images_array[i] <--> angle_dic[i]
Return:
stokes vector, shape=(3, poled_images.shape[1], poled_images.shape[0])
"""
if self._is_demosaiced:
subimages = self.demosaiced_images
else:
subimages = self.single_pol_subimages
I = 0.5 * np.sum(subimages, axis=0)
Q = subimages[self.angle_dic[0]] - subimages[self.angle_dic[90]]
U = subimages[self.angle_dic[45]] - subimages[self.angle_dic[-45]]
S = np.array([I, Q, U], dtype=float)
return S
def _get_Stokes_from_demodulator(
self,
demodulator: Demodulator,
demosaicing: bool,
) -> np.array:
"""
Computes stokes vector components from four polarized images at four angles, angle_dic describes the coupling between
poled_images_array[i] <--> angle_dic[i]. Calculates:
I = M_00 * I_1 + M_01 * I_2 + M_02 * I_3 + M_03 * I_4
Q = M_10 * I_1 + M_11 * I_2 + M_12 * I_3 + M_13 * I_4
U = M_20 * I_1 + M_21 * I_2 + M_22 * I_3 + M_23 * I_4
Return:
stokes vector, shape=(3, poled_images.shape[1], poled_images.shape[0])
"""
# Corrected with demodulation matrices, S.shape = (4, n, m)
num_of_malus_parameters = 3 # 3 multiplication params
pixels_in_superpix = 4
fit_found_flags = demodulator.fit_found_flags
if demosaicing:
self.demosaic()
splitted_pols = self.demosaiced_images
mij = np.ones(
shape=(
demodulator.mij.shape[0],
demodulator.mij.shape[1],
demodulator.mij.shape[2] * 2,
demodulator.mij.shape[3] * 2,
),
dtype=float,
)
for i in range(num_of_malus_parameters):
for j in range(pixels_in_superpix):
demo_component = demodulator.mij[i, j]
mij[i, j, :, :] = merge_polarizations(
np.repeat(demo_component[np.newaxis, :, :], 4, axis=0)
) # repeat necessary to avoid numba problems
fit_found_flags = merge_polarizations(
np.array(4 * [demodulator.fit_found_flags.astype(float)])
) # idk why but fit_found_flags is >f8 as default
else:
mij = demodulator.mij
splitted_pols = self.single_pol_subimages
"""
print(mij.shape)
print(splitted_pols.shape)
pixel_values = np.array(
[splitted_pol for splitted_pol in splitted_pols],
dtype=float,
)
if (mij[0, 0].shape[0] != pixel_values[0].shape[0]) or (
mij[0, 0].shape[1] != pixel_values[0].shape[1]
):
raise ValueError(
f"demodulation matrix {mij[0,0].shape} and images {pixel_values[0].shape} have different shapes. Check that binning is correct."
) # sanity check
T_ij = np.zeros(
shape=(
num_of_malus_parameters,
pixels_in_superpix,
*splitted_pols[0].shape,
)
)
for i in range(num_of_malus_parameters):
for j in range(pixels_in_superpix):
temp_tij = np.multiply(
mij[i, j, :, :], pixel_values[j, :, :]
) # Matrix product
T_ij[i, j, :, :] = temp_tij
I = T_ij[0, 0] + T_ij[0, 1] + T_ij[0, 2] + T_ij[0, 3]
Q = T_ij[1, 0] + T_ij[1, 1] + T_ij[1, 2] + T_ij[1, 3]
U = T_ij[2, 0] + T_ij[2, 1] + T_ij[2, 2] + T_ij[2, 3]
"""
I, Q, U = np.matmul(
mij,
np.expand_dims(splitted_pols, axis=0),
axes=[(-4, -3), (-3, -4), (-4, -3)],
)[
:, 0
] # expand dims needed for matmul, output [3, 1, y, x]
S = np.array([I, Q, U], dtype=float)
# use theo stokes where fit wasn't found
if demodulator.fit_found_flags is not None:
theo_S = self.Stokes_vec
S = np.where(fit_found_flags == 1.0, S, theo_S)
return S
[docs] def subtract_dark(self, dark: MicropolImage) -> MicropolImage:
"""Correctly subtracts the input dark image from the image
Args:
dark (MicropolImage): dark to subtract
Returns:
MicropolImage: copy of input image with dark subtracted
"""
self.data = self.data - dark.data
self.data = np.where(self.data >= 0, self.data, 0) # Fix data
self.Stokes_vec = self._get_theo_Stokes_vec_components()
self._dark_subtracted = True
return self
[docs] def correct_flat(self, flat: MicropolImage) -> MicropolImage:
"""Normalizes the flat and uses it to correStokes_vecct the image.
Args:
flat (MicropolImage): flat image, does not need to be normalized.
Returns:
MicropolImage: copy of input image corrected by flat
"""
normalized_flat = flat.data / np.mean(flat.data)
self.data = np.divide(
self.data,
normalized_flat,
where=normalized_flat != 0.0,
)
# self.data = np.where(self.data >= 0, self.data, 0)
# self.data = np.where(self.data < 4096, self.data, 4096)
self.Stokes_vec = self._get_theo_Stokes_vec_components()
self._flat_subtracted = True
return self
[docs] def correct_ifov(self) -> MicropolImage:
"""Corrects differences in single pixels fields of view inside each superpixel
Returns:
MicropolImage: image with data corrected for field of view differences
"""
corrected_data = self.data.copy()
corrected_data = _ifov_jitcorrect(self.data, self.height, self.width)
self._update_data_and_Stokes(corrected_data)
return self
# ----------------------------------------------------------------
# ------------------------------ SHOW ----------------------------
# ----------------------------------------------------------------
[docs] def show_with_pol_params(self, cmap="Greys_r") -> tuple:
"""Returns a tuple containing figure and axis of the plotted
data, and figure and axis of polarization parameters (3x2
subplots). User must callplt.show after this is called.
Args:
cmap (str, optional): colormap string. Defaults to "Greys_r".
Returns:
tuple: a (figure, axis, figure, axis) couple same as
matplotlib.pyplot.subplots for the image data and another for
the six polarization parameters
"""
data_ratio = self.data.shape[0] / self.data.shape[1]
image_fig, imageax = plt.subplots(dpi=200, constrained_layout=True)
avg = np.mean(self.data)
stdev = np.std(self.data)
mappable = imageax.imshow(
self.data,
cmap=cmap,
vmin=avg - stdev,
vmax=avg + stdev,
)
if avg < 1.0e5:
format = "3.2f"
else:
format = ".1e"
imageax.set_title(
f"Image data (avrg {avg:{format}}+-{stdev:{format}})",
color="black",
)
imageax.set_xlabel("x [px]")
imageax.set_ylabel("y [px]")
image_fig.colorbar(
mappable, ax=imageax, label="[DN]", fraction=data_ratio * 0.05
)
stokes_fig, stokesax = self.show_pol_params(cmap=cmap)
return image_fig, imageax, stokes_fig, stokesax
[docs] def show_pol_params(self, cmap="Greys_r", figsize=None, **kwargs) -> tuple:
"""Returns a tuple containing figure and axis of polarization parameters (3x2 subplots). User must call plt.show after this is called.
Args:
cmap (str, optional): colormap string. Defaults to "Greys_r".
Returns:
tuple: a (figure, axis) couple same as
matplotlib.pyplot.subplots for the six polarization parameters
"""
data_ratio = self.data.shape[0] / self.data.shape[1]
if figsize is None:
figsize = (14, 6)
stokes_fig, stokesax = plt.subplots(
2, 3, figsize=figsize, constrained_layout=True, **kwargs
)
stokesax = stokesax.ravel()
for parameter, axis in zip(self.polparam_list, stokesax):
avg = np.mean(parameter.data)
stdev = np.std(parameter.data)
mappable_stokes = axis.imshow(
parameter.data,
cmap=cmap,
vmin=avg - stdev,
vmax=avg + stdev,
)
if avg < 1.0e5:
format = "3.2f"
else:
format = ".1e"
axis.set_title(
parameter.title + f" (avrg {avg:{format}}+-{stdev:{format}})",
color="black",
)
axis.set_xlabel("x [px]")
axis.set_ylabel("y [px]")
stokes_fig.colorbar(
mappable_stokes,
ax=axis,
label=parameter.measure_unit,
fraction=data_ratio * 0.05,
)
return stokes_fig, stokesax
[docs] def show_single_pol_images(self, cmap="Greys_r", **kwargs):
"""Plots the four polarizations images.
Args:
cmap (str, optional): colormap for the plot. Defaults to "Greys_r".
**kwargs: arguments passed to matplotlib.pyplot.imshow.
Returns:
tuple: a (figure, axis) couple same as matplotlib.pyplot.subplots
"""
data_ratio = self.data.shape[0] / self.data.shape[1]
fig, ax = plt.subplots(2, 2, figsize=(12, 9), constrained_layout=True)
ax = ax.ravel()
polslist = [self.pol0, self.pol45, self.pol90, self.pol_45]
for pol, axis in zip(polslist, ax):
mappable = axis.imshow(pol.data, cmap=cmap, **kwargs)
axis.set_title(pol.title)
axis.set_xlabel("x [px]")
axis.set_ylabel("y [px]")
fig.colorbar(
mappable,
ax=axis,
label=pol.measure_unit,
fraction=data_ratio * 0.05,
pad=0.01,
)
return fig, ax
[docs] def show_demo_images(self, cmap="Greys_r", vmin=None, vmax=None, **kwargs):
"""Plots the four demosaiced images.
Args:
cmap (str, optional): colormap for the plot. Defaults to "Greys_r".
**kwargs: arguments passed to matplotlib.pyplot.imshow.
Returns:
tuple: a (figure, axis) couple same as matplotlib.pyplot.subplots
"""
if not self._is_demosaiced:
error("Image is not yet demosaiced.")
data_ratio = self.data.shape[0] / self.data.shape[1]
fig, ax = plt.subplots(2, 2, figsize=(12, 9), constrained_layout=True)
ax = ax.ravel()
demo_images_list = self.demosaiced_images
for i, single_demo_ax in enumerate(ax):
if vmin is None:
this_vmin = mean_minus_std(demo_images_list[i])
if vmax is None:
this_vmax = mean_plus_std(demo_images_list[i])
mappable = single_demo_ax.imshow(
demo_images_list[i],
cmap=cmap,
vmin=this_vmin,
vmax=this_vmax,
**kwargs,
)
single_demo_ax.set_title(
f"Demosaiced image {list(self.angle_dic.keys())[list(self.angle_dic.values()).index(i)]}"
)
single_demo_ax.set_xlabel("x [px]")
single_demo_ax.set_ylabel("y [px]")
fig.colorbar(
mappable,
ax=single_demo_ax,
label="DN",
fraction=data_ratio * 0.05,
pad=0.01,
)
return fig, ax
[docs] def show_pol_param(
self, polparam: str, cmap="Greys_r", vmin=None, vmax=None, **kwargs
):
"""Plots a single polarization parameter given as input
Args:
polparam (str): image PolParam containing the parameter to plot. Can be one among [I, Q, U, pB, AoLP, DoLP]
cmap (str, optional): colormap for the plot. Defaults to "Greys_r".
**kwargs: arguments passed to matplotlib.pyplot.imshow.
Returns:
tuple: a (figure, axis) couple same as matplotlib.pyplot.subplots
"""
polparam = getattr(self, polparam)
data_ratio = self.data.shape[0] / self.data.shape[1]
fig, ax = plt.subplots(dpi=200)
if vmin is None:
vmin = mean_minus_std(polparam.data)
if vmax is None:
vmax = mean_plus_std(polparam.data)
mappable = ax.imshow(
polparam.data,
cmap=cmap,
vmin=vmin,
vmax=vmax,
**kwargs,
)
ax.set_title(polparam.title)
ax.set_xlabel("x [px]")
ax.set_ylabel("y [px]")
fig.colorbar(
mappable,
ax=ax,
label=polparam.measure_unit,
fraction=data_ratio * 0.05,
)
return fig, ax
[docs] def show_histogram(self, split_pols: bool = True, **kwargs):
"""Print the histogram of the flattened image data
Args:
split_pols (bool, optional): Whether to overplot histograms of same family pixels separately. Defaults to False.
**kwargs (int, optional): arguments to pass to numpy.histogram(), like bins and range.
Returns:
tuple: fig, ax tuple as returned by matplotlib.pyplot.subplots
"""
fig, ax = super().show_histogram(**kwargs)
if split_pols:
for i, single_pol_subimage in enumerate(self.single_pol_subimages):
subhist = np.histogram(single_pol_subimage, **kwargs)
ax.stairs(*subhist, label=f"pixel {i}")
return fig, ax
# ----------------------------------------------------------------
# -------------------------- SAVING ------------------------------
# ----------------------------------------------------------------
[docs] def save_single_pol_images(
self, filename: str, fixto: list[float, float] = None
) -> None:
"""Saves the four polarized images as fits files
Args:
filename (str): filename of the output image. The four images will be saved as filename_POLXX.fits
fixto (list[float, float], optional): set the minimum and maximum value for the output images. Defaults to None.
Raises:
ValueError: an invalid file name is provided
"""
polslist = [self.pol0, self.pol45, self.pol90, self.pol_45]
filepath = Path(_make_abs_and_create_dir(filename))
if filepath.suffix != ".fits":
raise ValueError("filename must be a valid file name, not folder.")
group_filepath = filepath.joinpath(filepath.parent, filepath.stem)
for single_pol in polslist:
hdr = self.header.copy()
hdr["POL"] = (single_pol.ID, "Micropolarizer orientation")
if fixto:
data = fix_data(single_pol.data, *fixto)
else:
data = single_pol.data
hdu = fits.PrimaryHDU(
data=data,
header=hdr,
do_not_scale_image_data=True,
uint=False,
)
filename_with_ID = str(
group_filepath.joinpath(
str(group_filepath) + "POL" + str(single_pol.ID) + ".fits"
)
)
hdu.writeto(filename_with_ID, overwrite=True)
info(f'All params successfully saved to "{filename}"')
[docs] def save_param_as_fits(
self,
polparam: str,
filename: str,
fixto: list[float, float] = None,
) -> None:
"""Saves chosen polarization parameter as a fits file
Args:
polparam (str): polarization parameter to save. Can be one among [I, Q, U, pB, AoLP, DoLP]
filename (str): filename of the output image.
fixto (list[float, float], optional): set the minimum and maximum value for the output images. Defaults to None.
Raises:
ValueError: filename is not a valid .fits file
"""
polparam = getattr(self, polparam)
filepath = Path(_make_abs_and_create_dir(filename))
if filepath.suffix != ".fits":
raise ValueError("filename must be a valid file name, not folder.")
hdr = self.header.copy()
hdr["PARAM"] = (str(polparam.title), "Polarization parameter")
hdr["UNITS"] = (str(polparam.measure_unit), "Measure units")
if fixto:
data = fix_data(polparam.data, *fixto)
else:
data = polparam.data
hdu = fits.PrimaryHDU(
data=data,
header=hdr,
do_not_scale_image_data=True,
uint=False,
)
filename_with_ID = str(
filepath.joinpath(
filepath.parent, filepath.stem + "_" + polparam.ID + ".fits"
)
)
# filename = _make_abs_and_create_dir(filename)
# filename_with_ID = (
# filename.split(".")[-2] + "_" + polparam.ID + ".fits"
# )
hdu.writeto(filename_with_ID, overwrite=True)
info(f'"{filename_with_ID}" {polparam.ID} successfully saved')
[docs] def save_all_pol_params_as_fits(self, filename: str) -> None:
"""Saves the image and all polarization parameters as fits file with the same name
Args:
filename (str): filename of the output image. Will be saved as filename_[I, Q, U, pB, AoLP, DoLP].fits
Raises:
ValueError: filename is not a valid .fits file
"""
filepath = Path(filename)
if filepath.suffix != ".fits":
raise ValueError("filename must be a valid file name, not folder.")
filepath = Path(_make_abs_and_create_dir(filename))
group_filename = str(filepath.joinpath(filepath.parent, filepath.stem))
for param in self.polparam_list:
hdr = self.header.copy()
hdr["PARAM"] = (str(param.title), "Polarization parameter")
hdr["UNITS"] = (str(param.measure_unit), "Measure units")
if param.fix_data:
data = fix_data(param.data)
else:
data = param.data
hdu = fits.PrimaryHDU(
data=data,
header=hdr,
do_not_scale_image_data=True,
uint=False,
)
filename_with_ID = group_filename + "_" + param.ID + ".fits"
hdu.writeto(filename_with_ID, overwrite=True)
info(f'All params successfully saved to "{group_filename}"')
[docs] def save_demosaiced_images_as_fits(
self, filename: str, fixto: list[float, float] = None
) -> None:
"""Saves the four demosaiced images as fits files
Args:
filename (str): filename of the output image. The four images will be saved as filename_POLXX.fits
fixto (list[float, float], optional): set the minimum and maximum value for the output images. Defaults to None.
Raises:
ValueError: an invalid file name is provided
"""
if not self._is_demosaiced:
raise ValueError("Demosaiced images not yet calculated.")
imageHdr = self.header.copy()
filepath = Path(filename)
if not filepath.suffix:
raise ValueError("filename must be a valid file name, not folder.")
filepath = Path(_make_abs_and_create_dir(filename))
group_filename = str(filepath.joinpath(filepath.parent, filepath.stem))
for i, demo_image in enumerate(self.demosaiced_images):
POL_ID = list(self.angle_dic.keys())[list(self.angle_dic.values()).index(i)]
imageHdr["POL"] = (int(POL_ID), "Micropolarizer orientation")
if fixto:
data = fix_data(demo_image, *fixto)
else:
data = demo_image
hdu = fits.PrimaryHDU(
data=data,
header=imageHdr,
do_not_scale_image_data=True,
uint=False,
)
new_filename = group_filename + "_POL" + str(POL_ID) + ".fits"
hdu.writeto(new_filename, overwrite=True)
info(f'Demosaiced images successfully saved to "{group_filename}_POLX.fits"')
# ----------------------------------------------------------------
# -------------------- DATA MANIPULATION -------------------------
# ----------------------------------------------------------------
[docs] def demosaic(self, demosaic_mode="adjacent") -> MicropolImage:
"""Returns a demosaiced copy of the image with updated polarization parameters. Demoisacing is done IN PLACE and
using the THEORETICAL MATRIX. If demodulation and demosaicing are required, please use demodulate(demosaic=True)
Args:
demosaic_mode (str, optional): demosaicing mode (see processing.demosaic). Defaults to "adjacent".
Returns:
MicropolImage: demosaiced image
"""
self.demosaiced_images = demosaic(self.data, option=demosaic_mode)
self._is_demosaiced = True
self.Stokes_vec = self._get_theo_Stokes_vec_components()
return self
[docs] def rebin(self, binning: int) -> MicropolImage:
"""Rebins the micropolarizer array image, binned each
binningxbinning. Sum bins by default.
Args:
binning (int): binning to perform. A value of n will be translated in a nxn binning.
Raises:
ValueError: negative binning provided
Returns:
MicropolImage: copy of the input image, rebinned.
"""
if binning <= 0:
raise ValueError(f"Negative binning {binning}x{binning}")
rebinned_image = MicropolImage(self)
rebinned_data = micropolarray_rebin(
np.array(rebinned_image.data, dtype=float),
binning,
)
"""
new_stokes = []
for i in range(3):
new_stokes.append(
standard_rebin(
np.array(self.Stokes_vec[i], dtype=float),
binning,
)
)
rebinned_image.Stokes_vec = np.array(new_stokes, dtype=float)
rebinned_image.data = rebinned_data
"""
rebinned_image._update_data_and_Stokes(rebinned_data)
return rebinned_image
[docs] def congrid(self, newdim_y: int, newdim_x: int) -> MicropolImage:
"""Reshapes a MicropolImage into any new lenght and width. This is done separately for each pixel family.
Args:
newdim_y (int): new height
newdim_x (int): new width
Returns:
MicropolImage: image with reshaped data.
"""
# Trim to nearest superpixel
if (newdim_y % 2) or (newdim_x % 2):
while newdim_y % 2:
newdim_y = newdim_y - 1
while newdim_x % 2:
newdim_x = newdim_x - 1
warning(
f"New dimension was incompatible with superpixels. Trimmed to ({newdim_y}, {newdim_x})"
)
new_subdims = [int(newdim_y / 2), int(newdim_x / 2)]
congridded_pol_images = np.zeros(shape=(4, *new_subdims), dtype=float)
for subimage_i, pol_subimage in enumerate(self.single_pol_subimages):
congridded_pol_images[subimage_i] = congrid(pol_subimage, new_subdims)
newdata = merge_polarizations(congridded_pol_images)
newimage = MicropolImage(self)
newimage.data = newdata
newimage.Stokes_vec = [
congrid(stokes_component, [newdim_y, newdim_x])
for stokes_component in self.Stokes_vec
]
return newimage
[docs] def rotate(self, angle: float) -> MicropolImage:
"""Rotates an image of angle degrees, counter-clockwise."""
single_pols = self.single_pol_subimages
for i in range(4):
image = PILImage.fromarray(single_pols[i])
image = image.rotate(angle)
single_pols[i] = np.asarray(image, dtype=float)
data = merge_polarizations(single_pols)
Stokes_vec = self.Stokes_vec
for i in range(3):
image = PILImage.fromarray(Stokes_vec[i])
image = image.rotate(angle)
Stokes_vec[i] = np.asarray(image, dtype=float)
newimage = MicropolImage(self)
newimage.data = data
newimage.Stokes_vec = Stokes_vec
return newimage
[docs] def mask_occulter(
self,
y: int = PolarCam().occulter_pos_last[0],
x: int = PolarCam().occulter_pos_last[1],
r: int = PolarCam().occulter_pos_last[2],
overoccult: int = 0,
) -> None:
"""Masks occulter for all image parameters
Args:
y (int, optional): Occulter y position. Defaults to PolarCam().occulter_pos_last[0].
x (int, optional): Occulter x position. Defaults to PolarCam().occulter_pos_last[1].
r (int, optional): Occulter radius. Defaults to PolarCam().occulter_pos_last[2].
overoccult (int, optional): Pixels to overoccult. Defaults to 0.
camera (_type_, optional): Camera image type. Defaults to PolarCam().
Returns:
None
"""
# y, x, r = camera.occulter_pos_last
r = r + overoccult
self.data = roi_from_polar(
self.data,
(y, x),
[r, 2 * np.max([self.height, self.width])],
)
if self._is_demosaiced:
self.demosaiced_images = [
roi_from_polar(
data,
(y, x),
(r, 2 * np.max([self.height, self.width])),
)
for data in self.demosaiced_images
]
if not self._is_demosaiced:
y = int(y / 2)
x = int(x / 2)
r = int(r / 2)
for i in range(3):
self.Stokes_vec[i] = roi_from_polar(
self.Stokes_vec[i],
(y, x),
[r, 2 * np.max(self.Stokes_vec[i].shape)],
include_superpixels=False,
)
[docs] def shift(self, y: int, x: int) -> MicropolImage:
"""Shifts image by y, x pixels and fills with 0 the remaining space. Positive numbers for up/right shift and negative for down/left shift. Image is split into polarizations, each one is shifted, then they are merged again.
Args:
y (int): vertical shift in pix
x (int): horizontal shift in pix
Returns:
MicropolImage: shifted image copied from the original
"""
# newdata = shift(self.data, y, x)
newdata = shift_micropol(self.data, y, x)
newimage = MicropolImage(self)
newimage._update_data_and_Stokes(newdata)
return newimage
[docs] def clean_hot_pixels(self, flagged_hot_pix_map: MicropolImage):
"""Returns a copy of the image with gaussian smeared pixels where flagged_hot_pix_map == 1.
Args:
flagged_hot_pix_map (MicropolImage): hot pixels map.
Returns:
MicropolImage: copy of the original image, gaussian smeared where flagged_hot_pix_map == 1
"""
subimages = self.single_pol_subimages
blurred_subimages = np.array(
[scipy.ndimage.median_filter(subimage, size=2) for subimage in subimages]
)
flagged_subimages = flagged_hot_pix_map.single_pol_subimages
subimages = np.where(flagged_subimages == 1, blurred_subimages, subimages)
newimage = MicropolImage(self)
newimage._update_data_and_Stokes(merge_polarizations(subimages))
return newimage
# ----------------------------------------------------------------
# ------------------------ OVERLOADING ---------------------------
# ----------------------------------------------------------------
def __add__(self, second) -> MicropolImage:
if type(self) is type(second):
newdata = self.data + second.data
newimage = MicropolImage(self)
newimage._update_data_and_Stokes(newdata)
return newimage
else:
newdata = self.data + second
return MicropolImage(newdata, angle_dic=self.angle_dic)
def __sub__(self, second) -> MicropolImage:
if type(self) is type(second):
newdata = self.data - second.data
newimage = MicropolImage(self)
newimage._update_data_and_Stokes(newdata)
return newimage
else:
newdata = self.data - second
return MicropolImage(newdata, angle_dic=self.angle_dic)
def __mul__(self, second) -> MicropolImage:
if type(self) is type(second):
newdata = self.data * second.data
newimage = MicropolImage(self)
newimage._update_data_and_Stokes(newdata)
return newimage
else:
newdata = self.data * second
return MicropolImage(newdata, angle_dic=self.angle_dic)
def __truediv__(self, second) -> MicropolImage:
if type(self) is type(second):
newdata = np.divide(self.data, second.data, where=second.data != 0.0)
newimage = MicropolImage(self)
newimage._update_data_and_Stokes(newdata)
return newimage
else:
# newdata = np.where(second != 0, self.data / second, 4096)
newdata = np.divide(self.data, second, where=second != 0.0)
return MicropolImage(newdata, angle_dic=self.angle_dic)
# provide shorter aliases
# PolarcamImage = MicropolImage
# MicropolImage = MicropolImage