import logging
import math
import numpy as np
from astropy import units as u
from astropy.nddata.bitmask import (
bitfield_to_boolean_mask,
interpret_bit_flags,
)
from scipy import interpolate
from spherical_geometry.polygon import SphericalPolygon # type: ignore[import-untyped]
__all__ = [
"calc_pixmap",
"build_driz_weight",
"build_mask",
"compute_mean_pixel_area",
"get_tmeasure",
"is_flux_density",
"is_imaging_wcs",
"resample_range",
]
log = logging.getLogger(__name__)
def calc_pixmap(wcs_from, wcs_to, shape=None, disable_bbox="to", stepsize=1, order=1):
"""
Calculate pixel coordinates of one WCS corresponding to the native pixel grid of another WCS.
.. note::
This function assumes that output frames of ``wcs_from`` and ``wcs_to``
WCS have the same units.
Parameters
----------
wcs_from : object
A WCS object representing the coordinate system you are
converting from. This object's ``array_shape`` (or ``pixel_shape``)
property will be used to define the shape of the pixel map array.
If ``shape`` parameter is provided, it will take precedence
over this object's ``array_shape`` value.
wcs_to : object
A WCS object representing the coordinate system you are
converting to.
shape : tuple, None, optional
A tuple of integers indicating the shape of the output array in the
``numpy.ndarray`` order. When provided, it takes precedence over the
``wcs_from.array_shape`` property.
disable_bbox : str, optional
Indicates whether to use or not to use the bounding box of either
(both) ``wcs_from`` or (and) ``wcs_to`` when computing pixel map.
Allowable values: "to", "from", "both", "none". When
``disable_bbox`` is "none", pixel coordinates outside of the bounding
box are set to `NaN` only if ``wcs_from`` or (and) ``wcs_to`` sets
world coordinates to NaN when input pixel coordinates are outside of
the bounding box.
stepsize : int, optional
If ``stepsize>1``, perform the full WCS calculation on a sparser
grid and use interpolation to fill in the rest of the pixels. This
option speeds up pixel map computation by reducing the number of WCS
calls, though at the cost of reduced pixel map accuracy. The loss
of accuracy is typically negligible if the underlying distortion
correction is smooth, but if the distortion is non-smooth,
``stepsize>1`` is not recommended. Large ``stepsize`` values are
automatically reduced to no more than 1/10 of image size.
Default 1.
order : int, optional
Order of the 2D spline to interpolate the sparse pixel mapping
if stepsize>1. Supported values are: 1 (bilinear) or 3 (bicubic).
This Parameter is ignored when ``stepsize <= 1``. Default 1.
Returns
-------
pixmap : numpy.ndarray
A three dimensional array representing the transformation between
the two. The last dimension is of length two and contains the x and
y coordinates of a pixel center, respectively. The other two
coordinates correspond to the two coordinates of the image the first
WCS is from.
Raises
------
ValueError
A `ValueError` is raised when output pixel map shape cannot be
determined from provided inputs.
Notes
-----
When ``shape`` is not provided and ``wcs_from.array_shape`` is not set
(i.e., it is `None`), `calc_pixmap` will attempt to determine pixel map
shape from the ``bounding_box`` property of the input ``wcs_from`` object.
If ``bounding_box`` is not available, a `ValueError` will be raised.
"""
if (bbox_from := getattr(wcs_from, "bounding_box", None)) is not None:
try:
# to avoid dependency on astropy just to check whether
# the bounding box is an instance of
# modeling.bounding_box.ModelBoundingBox, we try to
# directly use and bounding_box(order='F') and if it fails,
# fall back to converting the bounding box to a tuple
# (of intervals):
bbox_from = bbox_from.bounding_box(order="F")
except AttributeError:
bbox_from = tuple(bbox_from)
if (bbox_to := getattr(wcs_to, "bounding_box", None)) is not None:
try:
# to avoid dependency on astropy just to check whether
# the bounding box is an instance of
# modeling.bounding_box.ModelBoundingBox, we try to
# directly use and bounding_box(order='F') and if it fails,
# fall back to converting the bounding box to a tuple
# (of intervals):
bbox_to = bbox_to.bounding_box(order="F")
except AttributeError:
bbox_to = tuple(bbox_to)
if shape is None:
shape = wcs_from.array_shape
if shape is None and bbox_from is not None:
if (ndim := np.ndim(bbox_from)) == 1:
bbox_from = (bbox_from,)
if ndim > 1:
shape = tuple(math.ceil(lim[1] + 0.5) for lim in bbox_from[::-1])
if shape is None:
raise ValueError('The "from" WCS must have pixel_shape property set.')
# temporarily disable the bounding box for the "from" WCS:
if disable_bbox in ["from", "both"] and bbox_from is not None:
wcs_from.bounding_box = None
if disable_bbox in ["to", "both"] and bbox_to is not None:
wcs_to.bounding_box = None
# find integer boundaries of of the pixel grid to be computed:
if bbox_from is None:
xmin, xmax = 0, shape[1] - 1
ymin, ymax = 0, shape[0] - 1
else:
((xmin, xmax), (ymin, ymax)) = bbox_from
xmin = max(0, int(math.ceil(xmin)))
xmax = min(shape[1] - 1, int(math.floor(xmax)))
ymin = max(0, int(math.ceil(ymin)))
ymax = min(shape[0] - 1, int(math.floor(ymax)))
eff_width = xmax - xmin + 1
eff_height = ymax - ymin + 1
try:
if stepsize == 1 or max(eff_width, eff_height) <= 10:
y, x = np.indices(shape, dtype=np.float64)
x, y = wcs_to.world_to_pixel_values(*wcs_from.pixel_to_world_values(x, y))
else:
if order not in [1, 3]:
raise ValueError("Interpolation order should be either 1 or 3.")
# Because of the way RectBivariateSpline works, we need to pick
# only those points that have finite coordinates. While using
# bounding boxes does not guarantee that all points within them
# will have finite world coordinates, it is the best we can do
# without calling the WCS on all pixels.
y_in, x_in = np.arange(ymin, ymax + 1), np.arange(xmin, xmax + 1)
# Number of points so that step size is no larger than requested
# but at least 10 points are used (or the number of input pixels
# in each dimension, if smaller than that).
npts_x = max(int(math.ceil(eff_width / stepsize)), min(10, eff_width))
npts_y = max(int(math.ceil(eff_height / stepsize)), min(10, eff_height))
x_coarse = np.linspace(xmin, xmax, npts_x)
y_coarse = np.linspace(ymin, ymax, npts_y)
sparsegrid = np.meshgrid(x_coarse, y_coarse)
pixmap_coarse = wcs_to.world_to_pixel_values(
*wcs_from.pixel_to_world_values(sparsegrid[0], sparsegrid[1])
)
if np.all(np.isfinite(pixmap_coarse[0])) and np.all(np.isfinite(pixmap_coarse[1])):
fx = interpolate.RectBivariateSpline(y_coarse, x_coarse, pixmap_coarse[0], kx=order, ky=order)
fy = interpolate.RectBivariateSpline(y_coarse, x_coarse, pixmap_coarse[1], kx=order, ky=order)
# Evaluate the spline on the full grid
x = fx(y_in, x_in)
y = fy(y_in, x_in)
if not (xmin == 0 and ymin == 0 and xmax == shape[1] - 1 and ymax == shape[0] - 1):
# we need to create a full grid and inject the interpolated
# values into it:
x_full = np.full(shape, np.nan)
y_full = np.full(shape, np.nan)
x_full[ymin : ymax + 1, xmin : xmax + 1] = x
y_full[ymin : ymax + 1, xmin : xmax + 1] = y
x, y = x_full, y_full
else:
# revert to the full WCS calculation if there are any
# non-finite values in the coarse grid:
y, x = np.indices(shape, dtype=np.float64)
x, y = wcs_to.world_to_pixel_values(*wcs_from.pixel_to_world_values(x, y))
finally:
if bbox_from is not None:
wcs_from.bounding_box = bbox_from
if bbox_to is not None:
wcs_to.bounding_box = bbox_to
pixmap = np.dstack([x, y])
return pixmap
def resample_range(data_shape, bbox=None): # noqa: D103
# Find range of input pixels to resample:
if len(data_shape) != 2:
raise ValueError("Expected 'data_shape' to be of length 2.")
if bbox is None:
xmin = ymin = 0
xmax = max(data_shape[1] - 1, 0)
ymax = max(data_shape[0] - 1, 0)
else:
if len(bbox) != 2:
raise ValueError("Expected bounding box to be of length 2.")
((x1, x2), (y1, y2)) = bbox
xmin = max(0, int(x1 + 0.5))
ymin = max(0, int(y1 + 0.5))
xmax = max(xmin, min(data_shape[1] - 1, int(x2 + 0.5)))
ymax = max(ymin, min(data_shape[0] - 1, int(y2 + 0.5)))
return xmin, xmax, ymin, ymax
def build_mask(dqarr, good_bits, flag_name_map=None):
"""Build a bit mask from an input DQ array and a bitvalue flag.
In the returned bit mask, 1 is good, 0 is bad
"""
good_bits = interpret_bit_flags(good_bits, flag_name_map=flag_name_map)
dqmask = bitfield_to_boolean_mask(
dqarr,
good_bits,
good_mask_value=1,
dtype=np.uint8,
flag_name_map=flag_name_map,
)
return dqmask
def build_driz_weight(model, weight_type=None, good_bits=None, flag_name_map=None):
"""
Build drizzle weight map.
Create a weight map that is used for weighting input images when
they are co-added to the output model.
Parameters
----------
model : dict
Input model: a dictionary of relevant keywords and values.
weight_type : str or None, optional
The weighting type ("exptime", "ivm", or "ivm-sky") for adding models' data.
For ``weight_type="ivm"`` and ``weight_type="ivm-sky"``,
the weighting will be determined
per-pixel using the inverse of either the read noise (VAR_RNOISE) or
sky variance (VAR_SKY) arrays, respectively. If the array does not
exist, the weight is set to 1 for all pixels (i.e., equal weighting).
If ``weight_type="exptime"``, the weight will be set equal to the
measurement time when available and to the exposure time otherwise for
pixels not flagged in the DQ array of the model. The default value of
`None` will set weights to 1 for pixels not flagged in the DQ array of
the model. Pixels flagged as "bad" in the DQ array will have their
weights set to 0.
good_bits : int, str, None, optional
An integer bit mask, `None`, a Python list of bit flags, a comma-,
or ``'|'``-separated, ``'+'``-separated string list of integer
bit flags or mnemonic flag names that indicate what bits in models'
DQ bitfield array should be *ignored* (i.e., zeroed).
See `Resample` for more information.
flag_name_map : astropy.nddata.BitFlagNameMap, dict, None, optional
A `~astropy.nddata.BitFlagNameMap` object or a dictionary that provides
mapping from mnemonic bit flag names to integer bit values in order to
translate mnemonic flags to numeric values when ``bit_flags``
that are comma- or '+'-separated list of mnemonic bit flag names.
"""
data = model["data"]
dq = model["dq"]
dqmask = bitfield_to_boolean_mask(
dq,
good_bits,
good_mask_value=1,
dtype=np.uint8,
flag_name_map=flag_name_map,
)
if weight_type == "ivm":
inv_variance = _get_inverse_variance(
model["var_rnoise"] if "var_rnoise" in model else None,
data.shape,
"var_rnoise",
)
result = inv_variance * dqmask
elif weight_type == "ivm-sky":
inv_sky_variance = _get_inverse_variance(
model["var_sky"] if "var_sky" in model else None,
data.shape,
"var_sky",
)
result = inv_sky_variance * dqmask
elif weight_type == "exptime":
exptime, _ = get_tmeasure(model)
result = np.float32(exptime) * dqmask
elif weight_type is None:
result = dqmask
else:
raise ValueError(
f"Invalid weight type: {repr(weight_type)}."
"Allowed weight types are 'ivm', 'ivm-sky', 'exptime', or None."
)
return result.astype(np.float32)
def get_tmeasure(model):
"""
Get tmeasure from datamodel.
Check if the measurement_time keyword is present in the datamodel
for use in exptime weighting. If not, revert to using exposure_time.
Returns a tuple of (exptime, is_measurement_time)
"""
try:
tmeasure = model["measurement_time"]
except KeyError:
return model["exposure_time"], False
if tmeasure is None:
return model["exposure_time"], False
else:
return tmeasure, True
[docs]
def is_imaging_wcs(wcs):
"""Return `True` if ``wcs`` is an imaging WCS and `False` otherwise."""
imaging = all(ax == "SPATIAL" for ax in wcs.output_frame.axes_type)
return imaging
def compute_mean_pixel_area(wcs, shape=None):
"""
Compute mean pixel area.
Computes the average pixel area (in steradians) based on input WCS
using pixels within either the bounding box (if available) or the entire
data array as defined either by ``wcs.array_shape`` or the ``shape``
argument.
Parameters
----------
shape : tuple, optional
Shape of the region over which average pixel area will be computed.
When not provided, pixel average will be estimated over a region
defined by ``wcs.array_shape``.
Returns
-------
pix_area : float
Pixel area in steradians.
Notes
-----
This function takes the outline of the region in which the average is
computed (a rectangle defined by either the bounding box or
``wcs.array_shape`` or the ``shape``) and projects it to world coordinates.
It then uses ``spherical_geometry`` to compute the area of the polygon
defined by this outline on the sky. In order to minimize errors due to
distortions in the ``wcs``, the code defines the outline using pixels
spaced no more than 15 pixels apart along the border of the rectangle
in which the average is computed.
"""
if (shape := (shape or wcs.array_shape)) is None:
raise ValueError(
"Either WCS must have 'array_shape' attribute set or 'shape' argument must be supplied."
)
valid_polygon = False
spatial_idx = np.where(np.array(wcs.output_frame.axes_type) == "SPATIAL")[0]
ny, nx = shape
if wcs.bounding_box is None:
((xmin, xmax), (ymin, ymax)) = ((-0.5, nx - 0.5), (-0.5, ny - 0.5))
else:
((xmin, xmax), (ymin, ymax)) = wcs.bounding_box
if xmin > xmax:
(xmin, xmax) = (xmax, xmin)
if ymin > ymax:
(ymin, ymax) = (ymax, ymin)
xmin = max(0, int(xmin + 0.5))
xmax = min(nx - 1, int(xmax - 0.5))
ymin = max(0, int(ymin + 0.5))
ymax = min(ny - 1, int(ymax - 0.5))
k = 0
dxy = [1, -1, -1, 1]
while xmin < xmax and ymin < ymax:
try:
(x, y, image_area, center, b, r, t, l) = _get_boundary_points(
xmin=xmin,
xmax=xmax,
ymin=ymin,
ymax=ymax,
dx=min((xmax - xmin) // 4, 15),
dy=min((ymax - ymin) // 4, 15),
)
except ValueError:
return None
world = wcs(x, y)
ra = world[spatial_idx[0]]
dec = world[spatial_idx[1]]
limits = [ymin, xmax, ymax, xmin]
for _ in range(4):
sl = [b, r, t, l][k]
if not (np.all(np.isfinite(ra[sl])) and np.all(np.isfinite(dec[sl]))):
limits[k] += dxy[k]
ymin, xmax, ymax, xmin = limits
k = (k + 1) % 4
break
k = (k + 1) % 4
else:
valid_polygon = True
break
ymin, xmax, ymax, xmin = limits
if not valid_polygon:
return None
world = wcs(*center)
wcenter = (world[spatial_idx[0]], world[spatial_idx[1]])
sky_area = SphericalPolygon.from_radec(ra, dec, center=wcenter).area()
if sky_area > 2 * np.pi:
log.warning("Unexpectedly large computed sky area for an image. Setting area to: 4*Pi - area")
sky_area = 4 * np.pi - sky_area
if image_area == 0:
log.error("Image area is zero; cannot compute pixel area.")
return None
pix_area = sky_area / image_area
return pix_area
def _get_boundary_points(xmin, xmax, ymin, ymax, dx=None, dy=None, shrink=0): # noqa: E741
"""
Get boundary points.
Creates a list of ``x`` and ``y`` coordinates of points along the perimiter
of the rectangle defined by ``xmin``, ``xmax``, ``ymin``, ``ymax``, and
``shrink`` in counter-clockwise order.
Parameters
----------
xmin : int
X-coordinate of the left edge of a rectangle.
xmax : int
X-coordinate of the right edge of a rectangle.
ymin : int
Y-coordinate of the bottom edge of a rectangle.
ymax : int
Y-coordinate of the top edge of a rectangle.
dx : int, float, None, optional
Desired spacing between adjacent points along horizontal edges of
the rectangle.
dy : int, float, None, optional
Desired spacing between adjacent points along vertical edges of
the rectangle.
shrink : int, optional
Amount to be applied to input ``xmin``, ``xmax``, ``ymin``, ``ymax``
to reduce the rectangle size.
Returns
-------
x : numpy.ndarray
An array of X-coordinates of points along the perimiter
of the rectangle defined by ``xmin``, ``xmax``, ``ymin``, ``ymax``, and
``shrink`` in counter-clockwise order.
y : numpy.ndarray
An array of Y-coordinates of points along the perimiter
of the rectangle defined by ``xmin``, ``xmax``, ``ymin``, ``ymax``, and
``shrink`` in counter-clockwise order.
area : float
Area in units of pixels of the region defined by ``xmin``, ``xmax``,
``ymin``, ``ymax``, and ``shrink``.
center : tuple
A tuple of pixel coordinates at the center of the rectangle defined
by ``xmin``, ``xmax``, ``ymin``, ``ymax``.
bottom : slice
A `slice` object that allows selection of pixels from ``x`` and ``y``
arrays along the bottom edge of the rectangle.
right : slice
A `slice` object that allows selection of pixels from ``x`` and ``y``
arrays along the right edge of the rectangle.
top : slice
A `slice` object that allows selection of pixels from ``x`` and ``y``
arrays along the top edge of the rectangle.
left : slice
A `slice` object that allows selection of pixels from ``x`` and ``y``
arrays along the left edge of the rectangle.
"""
nx = xmax - xmin + 1
ny = ymax - ymin + 1
if dx is None or dx <= 0:
dx = nx
if dy is None or dy <= 0:
dy = ny
if nx - 2 * shrink < 1 or ny - 2 * shrink < 1:
raise ValueError("Image size is too small.")
sx = max(1, int(np.ceil(nx / dx)))
sy = max(1, int(np.ceil(ny / dy)))
xmin += shrink
xmax -= shrink
ymin += shrink
ymax -= shrink
size = 2 * sx + 2 * sy
x = np.empty(size)
y = np.empty(size)
bottom = np.s_[0:sx] # bottom edge
right = np.s_[sx : sx + sy] # right edge
top = np.s_[sx + sy : 2 * sx + sy] # top edge
left = np.s_[2 * sx + sy : 2 * sx + 2 * sy] # noqa: E741 left edge
x[bottom] = np.linspace(xmin, xmax, sx, False)
y[bottom] = ymin
x[right] = xmax
y[right] = np.linspace(ymin, ymax, sy, False)
x[top] = np.linspace(xmax, xmin, sx, False)
y[top] = ymax
x[left] = xmin
y[left] = np.linspace(ymax, ymin, sy, False)
area = (xmax - xmin) * (ymax - ymin)
center = (0.5 * (xmin + xmax), 0.5 * (ymin + ymax))
return x, y, area, center, bottom, right, top, left
def is_flux_density(bunit):
"""
Differentiate between surface brightness and flux density data units.
Parameters
----------
bunit : str or `~astropy.units.Unit`
Data units, e.g. 'MJy' (is flux density) or 'MJy/sr' (is not).
Returns
-------
bool
True if the units are equivalent to flux density units.
"""
try:
flux_density = u.Unit(bunit).is_equivalent(u.Jy)
except (ValueError, TypeError):
flux_density = False
return flux_density
def _get_inverse_variance(array, data_shape, array_name):
"""
Compute the inverse variance array for weighting.
Parameters
----------
array : numpy.ndarray or None
Input variance array.
data_shape : tuple
Expected shape of the output array.
Returns
-------
inv : numpy.ndarray
Inverse variance array. If input array is missing or has wrong shape,
returns an array filled with ones of shape `data_shape`. Otherwise the inverse of the variance
array is returned, with invalid values replaced by zeros.
"""
if array is not None and array.shape == data_shape:
with np.errstate(divide="ignore", invalid="ignore"):
inv = 1.0 / array
inv[~np.isfinite(inv)] = 0 # zeros for bad pixels
else:
log.warning(
f"'{array_name}' array not available. Setting drizzle weight map to 1",
)
inv = np.full(data_shape, 1, dtype=np.float32) # ones for missing/misshaped array
return inv