import sys
from functools import lru_cache
from logging import info
import cv2
import matplotlib.pyplot as plt
import numpy as np
from numpy.lib.stride_tricks import as_strided
from scipy.optimize import curve_fit
from tqdm import tqdm
[docs]def roi_from_polar(
data: np.array,
center: list = None,
rho: list = None,
theta=[0, 360],
fill: float = 0.0,
return_boolean: bool = False,
include_superpixels: bool = True,
) -> np.array:
"""Returns the input array in a circular selection, otherwise an arbitrary number. If a pixel is not in the selection the ENTIRE superpixel is considered out of selection. If return_boolean is True then a boolean array is returned instead (useful for mean/stdev operations).
Args:
data (np.array): input data
center (list, optional): pixel coordinates of the circle center. Defaults to None (image center).
rho (list, optional): radius to exclude. Defaults to None (center to image border).
theta (list, optional): polar selection angle, 0 is horizonta, anti-clockwise direction. Defaults to [0, 360].
fill (float, optional): number to fill the outer selection. Defaults to 0.0.
return_boolean (bool, optional): if set to true, function returns a boolean array of the roi. Defaults to False.
include_superpixels (bool, optional): if set to true then exclude entire superpixel if one pixel is in the occulter
Returns:
np.array: array containing the input data inside the selection, and fill otherwise
"""
height, width = data.shape
theta_min, theta_max = theta
rho_min, rho_max = rho
if center is None:
center = [int(height / 2), int(width / 2)]
else:
center = list(center)
if rho is None:
rho_max = np.min([height - center[0], width - center[1]])
rho = [0.0, rho_max]
if include_superpixels:
# make a map that is HALF THE SIZE, do condition, then resize to make all the superpixel outside selection
height = int(height / 2)
width = int(width / 2)
rho_min = int(rho_min / 2)
rho_max = int(rho_max / 2)
center[0] = int(center[0] / 2)
center[1] = int(center[1] / 2)
rho_coords, phi_coords = map_polar_coordinates(
height,
width,
tuple([center[0], center[1]]),
) # cast it to a tuple (which is hashable)
theta_condition = np.logical_and(phi_coords >= theta_min, phi_coords < theta_max)
rho_condition = np.logical_and(
rho_coords > rho_min, rho_coords <= rho_max
) # half the radius because half the map
condition = np.logical_and(rho_condition, theta_condition)
if include_superpixels:
# resize condition to correct shape
condition = condition.repeat(2, axis=0).repeat(2, axis=1)
if return_boolean:
return np.where(condition, True, False)
else:
return np.where(condition, data, fill)
[docs]def tile_double(a):
height, width = a.shape
hs, ws = a.strides
tiles = as_strided(a, (height, 2, width, 2), (hs, 0, ws, 0))
return tiles.reshape(2 * height, 2 * width)
[docs]@lru_cache
def map_polar_coordinates(height, width, center):
y_center, x_center = center
i_list, j_list = np.arange(width), np.arange(height)
x_coords, y_coords = np.meshgrid(i_list, j_list)
# Map polar coordinates, 0 = horizontal dx, anti-clockwise angles
rho_coords = np.sqrt((x_coords - x_center) ** 2 + (y_coords - y_center) ** 2)
phi_coords = (
(np.arctan2(y_coords - y_center, x_coords - x_center) * 180 / np.pi) + 360
) % 360
return rho_coords, phi_coords
[docs]def nrgf(
data: np.array,
y_center: int = None,
x_center: int = None,
rho_min: int = None,
step: int = 1,
phi_to_mean=[0.0, 360],
output_phi=[0.0, 360],
) -> np.array:
"""
Performs nrgf filtering on the image, starting from center and radius. Mean is performed between phi_to_mean, 0 is horizontal right, anti-clockwise.
Args:
data (np.array): input array
y_center (int, optional): pixel y coordinate of the nrgf center. Defaults to None (image y center).
x_center (int, optional): pixel x coordinate of the nrgf center. Defaults to (image x center).
rho_min (int, optional): minimun radius in pixels to perform nrgf to. Defaults to None (radius 0).
step (int, optional): step to which apply the nrgf from center, in pixels. Defaults to 1 pixel.
phi_to_mean (list[float, float], optional): polar angle to calculate the mean value from. Defaults to [0, 360].
output_phi (list[float, float], optional): polar angle to include in output data. Defaults to [0, 360].
Returns:
np.array: nrgf-filtered input data
"""
height, width = data.shape
if (y_center is None) or (x_center is None) or (rho_min is None):
info("Calculating occulter position...")
y_center, x_center, rho_min = find_occulter_position(data)
center = [int(y_center), int(x_center)]
newdata = np.zeros(shape=data.shape, dtype=float)
i_list, j_list = np.arange(width), np.arange(height)
x_coords, y_coords = np.meshgrid(i_list, j_list)
# Map polar coordinates
rho_coords, phi_coords = map_polar_coordinates(
height, width, tuple(center)
) # cast it to a tuple (which is hashable)
mean_phi_condition = np.logical_and(
phi_coords >= phi_to_mean[0], phi_coords < phi_to_mean[1]
) # Exclude angle from mean
out_phi_condition = np.logical_and(
phi_coords >= output_phi[0], phi_coords < output_phi[1]
) # Exclude angle from filter
rho_max = int(np.max(rho_coords))
rho_step = step
print("Applying nrgf filter...")
for r in tqdm(range(rho_min, rho_max, rho_step)):
rho_condition = np.logical_and(rho_coords > r, rho_coords <= r + rho_step)
condition = np.logical_and(rho_condition, out_phi_condition)
# condition = np.logical_and(rho_condition, mean_phi_condition)
mean_condition = np.logical_and(rho_condition, mean_phi_condition)
mean_over_ROI = np.mean(data, where=mean_condition)
std_over_ROI = np.std(data, where=mean_condition)
if std_over_ROI > 0:
newdata = np.where(
condition,
(data - mean_over_ROI) / std_over_ROI,
newdata,
)
else:
newdata = np.where(condition, 0, newdata)
return newdata
[docs]def find_occulter_hough(
data: np.array, minr: float = 1, maxr: float = None, **kwargs
) -> tuple:
"""Uses Hough Gradient from cv2 and computes the coronagraph occulter coordinates. Returns Y, X, Radius of the occulter.
Args:
data (np.array): input data
Returns:
tuple: occulter y, x, r
"""
info("Searching occulter, this may take a while...")
if maxr is None:
maxr = np.max(data.shape)
minDist = np.max(data.shape) # minimum distance between circles
data = 255 * data / np.max(data)
blurred = cv2.medianBlur(data.astype("uint8"), 5)
accumulator = 10
circles = cv2.HoughCircles(
image=blurred,
method=cv2.HOUGH_GRADIENT,
dp=1.2,
minDist=minDist,
param1=200,
param2=accumulator,
minRadius=minr,
maxRadius=maxr,
)
try:
while len(circles[0]) > 1:
print(f"{len(circles[0])} circles found, retrying...")
accumulator += 5
circles = cv2.HoughCircles(
image=blurred,
method=cv2.HOUGH_GRADIENT,
dp=1.2,
minDist=minDist,
param1=200,
param2=accumulator,
minRadius=minr,
maxRadius=maxr,
)
except TypeError:
print("Failed to find the occulter.")
x, y, r = circles[0, 0]
return y, x, r # BEWARE OF THIS REVERSION: Y THEN X
[docs]def find_occulter_position(
data: np.array, method: str = "sigmoid", threshold: float = 4.0
):
"""Finds the occulter position of an image.
Args:
data (np.array): input data
method (str, optional): Method to find occulter edges. If "sigmoid" it will try to fit four sigmoids at the image edges centers, inferring the occulter edges from the parameters. If "algo" it will start from the image edge center and infer the occulter position when DN[i] > threshold*mean(DN[:i]) Defaults to "sigmoid".
threshold (float, optional): Threshold for the algo method. Defaults to 4.0.
Raises:
UnboundLocalError: couldn't converge
Returns:
list: occulter y, occulter x, occulter radius
"""
# works if occulter is not entirely inside a single quadrant, fits
# a sigmoid to find occulter bounds
half_y = int(data.shape[0] / 2)
half_x = int(data.shape[1] / 2)
values_x_0 = np.flip(data[half_y, :half_x])
values_x_1 = data[half_y, half_x:]
values_y_0 = np.flip(data[:half_y, half_x])
values_y_1 = data[half_y:, half_x]
occulter_bounds = [0.0] * 4
show = False
if show:
fig, ax = plt.subplots(2, 2, constrained_layout=True)
ax = ax.ravel()
titles = [
"Lower vertical",
"Upper vertical",
"Right horizontal",
"Left horizontal",
]
if method == "sigmoid":
for idx, half_array in enumerate(
[values_y_0, values_y_1, values_x_0, values_x_1]
):
# Artificial plateau after maximum
# max_half_array = np.max(half_array)
# for i, element in enumerate(half_array):
# if element == max_half_array:
# max_i = i
# break
# half_array[max_i:] = max_half_array
# x = np.arange(0, half_y, 1)
x = np.arange(0, int(len(half_array / 2)), 1)
params = [
# np.mean(half_array[int(len(half_array) / 2) :]),
np.max(half_array),
# np.mean(half_array[: int(len(half_array) / 2)]),
np.min(half_array),
0.5,
len(half_array) / 2,
]
params, pcov = curve_fit(sigmoid, x, half_array, params)
occulter_bounds[idx] = params[3]
if show:
axis = ax[idx]
axis.scatter(x, half_array, c="grey")
axis.plot(x, sigmoid(x, *params), c="black")
axis.set_title(titles[idx])
axis.set_xlabel("Pixels from center")
axis.set_ylabel("DN")
try:
occulter_bounds[0] = half_y - occulter_bounds[0]
occulter_bounds[1] = half_y + occulter_bounds[1]
occulter_bounds[2] = half_x - occulter_bounds[2]
occulter_bounds[3] = half_x + occulter_bounds[3]
except UnboundLocalError:
raise ValueError("Edges not found, try to change the threshold")
elif method == "algo":
# OLD, ALGORITM INSTEAD OF FITTING - less stable
min_points = 5
threshold = threshold
for i, val in enumerate(values_x_0):
if i > min_points:
mean = np.mean(values_x_0[:i])
if val > (threshold * mean):
occulter_start_x = half_x - i
break
for i, val in enumerate(values_x_1):
if i > min_points:
mean = np.mean(values_x_1[:i])
if val > (threshold * mean):
occulter_end_x = half_x + i
break
for i, val in enumerate(values_y_0):
if i > min_points:
mean = np.mean(values_y_0[:i])
if val > (threshold * mean):
occulter_start_y = half_y - i
break
for i, val in enumerate(values_y_1):
if i > min_points:
mean = np.mean(values_y_1[:i])
if val > (threshold * mean):
occulter_end_y = half_y + i
break
try:
occulter_bounds[0] = occulter_start_y
occulter_bounds[1] = occulter_end_y
occulter_bounds[2] = occulter_start_x
occulter_bounds[3] = occulter_end_x
except UnboundLocalError:
raise ValueError(
"ERROR: occulter edges not found, try to change the threshold"
)
elif method == "hugh":
pass # TODO
if show:
plt.show()
y_center = int(np.mean([occulter_bounds[:2]]))
x_center = int(np.mean([occulter_bounds[2:]]))
radius = int(
np.mean(
[
np.abs(occulter_bounds[1] - occulter_bounds[0]) / 2,
np.abs(occulter_bounds[3] - occulter_bounds[2]) / 2,
]
)
)
return [y_center, x_center, radius]
[docs]def sigmoid(x, max, min, slope, intercept):
sigma = slope * (x - intercept)
return max * np.exp(sigma) / (1 + np.exp(sigma)) + min