Source code for jwst.targ_centroid.targ_centroid_step

import logging

import jwst.datamodels as dm
from jwst.assign_wcs.miri import retrieve_filter_offset
from jwst.stpipe import Step
from jwst.targ_centroid.targ_centroid import (
    BadFitError,
    NoFinitePixelsError,
    WCSError,
    center_from_ta_image,
    find_dither_position,
)

# Public API of this module: only the step class is exported.
__all__ = ["TargCentroidStep"]

# Module-level logger, named after this module; used by the step for all messages.
log = logging.getLogger(__name__)


class TargCentroidStep(Step):
    """
    Determine position of target source from TA verification image.

    The step measures the source position on a target acquisition (TA)
    verification image and translates it into the frame of the science
    exposure by undoing the TA dither offset and applying the science
    exposure's subarray origin, dither offset, and the filter offset of the
    TA filter.

    The step is skipped by default (``skip = True`` in ``spec``) and only
    handles ``MIR_LRS-FIXEDSLIT`` and ``MIR_LRS-SLITLESS`` science exposures
    paired with a ``MIR_TACONFIRM`` image.  Any failure along the way is
    logged and results in the step being marked ``SKIPPED`` rather than
    raising.
    """

    class_alias = "targ_centroid"

    spec = """
    ta_file = string(default=None)  # Target acquisition image file name
    skip = boolean(default=True)  # Skip this step by default
    """  # noqa: E501

    reference_file_types = ["filteroffset"]

    def process(self, step_input):
        """
        Determine target position from TA verification image.

        Parameters
        ----------
        step_input : str, `~jwst.datamodels.container.ModelContainer`, \
                     `~stdatamodels.jwst.datamodels.ImageModel`, \
                     `~stdatamodels.jwst.datamodels.CubeModel`
            The input data model or association.  When the input resolves to
            a container, the science and TA members are taken from it and
            the TA member replaces ``self.ta_file``.

        Returns
        -------
        result : `~jwst.datamodels.container.ModelContainer`, \
                 `~stdatamodels.jwst.datamodels.ImageModel`, \
                 `~stdatamodels.jwst.datamodels.CubeModel`
            The output data model or association with TA centering applied.
            On success ``ta_xpos``/``ta_ypos`` and
            ``source_xpos``/``source_ypos`` are set on the science model and
            ``meta.cal_step.targ_centroid`` is ``"COMPLETE"``; otherwise the
            status is ``"SKIPPED"``.
        """
        result = self.prepare_output(step_input)

        if isinstance(result, dm.ModelContainer):
            container = result
            # Extract science and TA image from container
            result, ta_model = self._ta_image_from_container(result)
            self.ta_file = ta_model
        else:
            container = None

        # Ensure TA file is provided.  The string comparison covers both a
        # real ``None`` and the literal text "None" coming from the step spec.
        if str(self.ta_file).lower() == "none":
            log.error("No target acquisition file provided. Step will be SKIPPED.")
            return self._skipped_result(container, result)

        # Check that this is a point source (warning only; processing continues)
        if result.meta.target.source_type != "POINT":
            log.warning(
                "TargCentroidStep is only intended for point sources. "
                f"Input data has source_type={result.meta.target.source_type}. "
                "Step may not perform as expected."
            )

        # Check exposure type
        exp_type = result.meta.exposure.type
        if exp_type not in ("MIR_LRS-FIXEDSLIT", "MIR_LRS-SLITLESS"):
            log.error(
                "TA centering is only implemented for MIR_LRS-FIXEDSLIT and"
                " MIR_LRS-SLITLESS modes. Step will be SKIPPED."
            )
            return self._skipped_result(container, result)

        log.info(f"Performing TA centering using file: {self.ta_file}")
        with dm.open(self.ta_file) as ta_model:
            # Check TA verification image and exposure type
            ta_exptype = ta_model.meta.exposure.type
            if (not isinstance(ta_model, dm.ImageModel)) or (ta_exptype != "MIR_TACONFIRM"):
                log.error(
                    "TA verification image must be an ImageModel with exposure type MIR_TACONFIRM. "
                    f"Input TA file is of type {type(ta_model)} with exposure type {ta_exptype}. "
                    "Step will be SKIPPED."
                )
                return self._skipped_result(container, result)

            # Find expected source position by assigning WCS to TA model
            # and translating dither offsets to detector coordinates
            try:
                ref_center, ta_dither_offset = find_dither_position(ta_model)
            except WCSError as e:
                log.error(f"Error when assigning WCS to {self.ta_file}: {e}. Step will be SKIPPED.")
                return self._skipped_result(container, result)
            log.debug(f"Reference center in detector subarray (0-indexed): {ref_center}")

            # Put attributes needed later into local variables so we can exit context manager
            ta_image = ta_model.data
            ta_filter = ta_model.meta.instrument.filter
            filteroffset_file = self.get_reference_file(ta_model, "filteroffset")
            subarray_name = ta_model.meta.subarray.name
            xstart = ta_model.meta.subarray.xstart  # 1-indexed in FITS
            ystart = ta_model.meta.subarray.ystart  # 1-indexed in FITS
            log.debug(f"TA subarray: {subarray_name}, origin: ({xstart}, {ystart})")

        # Compute centroid from TA image.  Two coordinate pairs come back; the
        # ``*_sub`` pair is stored as the TA position, the other pair is carried
        # forward into the science frame (presumably full-frame detector
        # coordinates, given ``subarray_origin`` is passed in -- TODO confirm).
        try:
            (x_center, y_center), (x_center_sub, y_center_sub) = center_from_ta_image(
                ta_image,
                ref_center,
                subarray_origin=(xstart, ystart),
            )
        except (NoFinitePixelsError, BadFitError) as e:
            log.error(f"Error during TA centering: {e}. Step will be SKIPPED.")
            return self._skipped_result(container, result)

        # Store TA centering results in output model
        result.ta_xpos = x_center_sub
        result.ta_ypos = y_center_sub
        log.info(
            "Fitted source position on TA verification image: "
            f"({result.ta_xpos:.2f}, {result.ta_ypos:.2f})"
        )

        # Undo dither offset of TA image.
        # At this stage we should be near the reference point from the specwcs,
        # (326.13, 300.7) for slit data
        # (38.5, 829.0) for slitless data
        x_center -= ta_dither_offset[0]
        y_center -= ta_dither_offset[1]
        log.debug(
            "Undoing TA verification image dither offsets: "
            f"x={ta_dither_offset[0]}, y={ta_dither_offset[1]}"
            f" -> new position: x={x_center}, y={y_center} should be near reference point."
        )

        # Apply science exposure subarray offset.
        # NOTE(review): xstart/ystart were read from the TA model's
        # meta.subarray above, although the message describes them as the
        # science exposure's origin.  The two agree only when both exposures
        # use the same subarray -- confirm that this is guaranteed.
        log.debug(
            f"Applying science exposure subarray origin offsets: xstart={xstart}, ystart={ystart}"
        )
        x_center -= xstart - 1  # origin is 1-indexed, hence the "- 1"
        y_center -= ystart - 1

        # Apply dither offset from science exposure.
        # NOTE(review): unlike the TA call above, a WCSError raised here is not
        # caught and would propagate to the caller -- confirm this is intended.
        _, (offset_x, offset_y) = find_dither_position(result)
        log.debug(f"Applying science exposure dither offsets: x={offset_x}, y={offset_y}")
        x_center += offset_x
        y_center += offset_y

        # Apply filter offsets for the filter the TA image was taken with
        with dm.FilteroffsetModel(filteroffset_file) as filteroffset:
            col_offset, row_offset = retrieve_filter_offset(filteroffset, ta_filter)
        log.debug(f"Applying filter offsets: column={col_offset}, row={row_offset}")
        x_center += col_offset
        y_center += row_offset

        # Store final source position in science model and set completion flag
        result.source_xpos = x_center
        result.source_ypos = y_center
        result.meta.cal_step.targ_centroid = "COMPLETE"
        log.info("TA centering complete.")
        log.info(
            "Final source position in science data frame: "
            f"({result.source_xpos:.2f}, {result.source_ypos:.2f})"
        )

        # Reconstruct the container if needed
        return self._rebuild_container(container, result)

    def _skipped_result(self, container, sci_model):
        """
        Mark the science model as skipped and build the step's return value.

        Parameters
        ----------
        container : ModelContainer or None
            The original input container, or None if the input was a single model.
        sci_model : DataModel
            The science data model; its ``meta.cal_step.targ_centroid`` is
            set to ``"SKIPPED"`` in place.

        Returns
        -------
        ModelContainer or DataModel
            The container with the science model put back, or the science
            model itself when there is no container.
        """
        sci_model.meta.cal_step.targ_centroid = "SKIPPED"
        return self._rebuild_container(container, sci_model)

    def _ta_image_from_container(self, container):
        """
        Extract the science model and TA image from a container.

        Only the first member of each association type is used.

        Parameters
        ----------
        container : ModelContainer
            The input container.

        Returns
        -------
        sci_model : DataModel
            The first member with association type "science".
        ta_model : DataModel or None
            The first member with association type "target_acquisition",
            or None if the container has no such member.
        """
        sci_idx = container.ind_asn_type("science")
        sci_model = container[sci_idx[0]]

        ta_idx = container.ind_asn_type("target_acquisition")
        ta_model = container[ta_idx[0]] if len(ta_idx) else None
        return sci_model, ta_model

    def _rebuild_container(self, container, updated_sci_model):
        """
        Put the updated science model back into its container, if there is one.

        Parameters
        ----------
        container : ModelContainer or None
            The original container if present. If None, the updated science
            model is returned as is.
        updated_sci_model : DataModel
            The updated science data model.

        Returns
        -------
        ModelContainer or DataModel
            The container, modified in place so that its first "science"
            member is ``updated_sci_model``; or ``updated_sci_model`` itself
            when ``container`` is None.
        """
        if container is None:
            return updated_sci_model

        sci_idx = container.ind_asn_type("science")
        container[sci_idx[0]] = updated_sci_model
        return container