Source code for jwst.master_background.master_background_step

import logging
from pathlib import Path

from scipy.signal import medfilt
from stdatamodels.jwst import datamodels
from stdatamodels.properties import merge_tree

from jwst.combine_1d.combine1d import combine_1d_spectra
from jwst.datamodels import ModelContainer
from jwst.master_background.expand_to_2d import expand_to_2d
from jwst.stpipe import Step, record_step_status

__all__ = ["MasterBackgroundStep"]

log = logging.getLogger(__name__)


[docs] class MasterBackgroundStep(Step): """Compute and subtract master background from spectra.""" class_alias = "master_background" spec = """ median_kernel = integer(default=1) # Moving-median boxcar size to filter the background user_background = string(default=None) # Path to user-supplied master background save_background = boolean(default=False) # Save computed master background force_subtract = boolean(default=False) # Force subtracting master background output_use_model = boolean(default=True) """ # noqa: E501
[docs] def process(self, input_data): """ Compute and subtract a master background spectrum. Parameters ---------- input_data : `~stdatamodels.jwst.datamodels.ImageModel`, \ `~stdatamodels.jwst.datamodels.IFUImageModel`, \ `~jwst.datamodels.container.ModelContainer`, str Input target datamodel(s) or association file to which master background subtraction is to be applied. Returns ------- result : `~stdatamodels.jwst.datamodels.ImageModel`, \ `~stdatamodels.jwst.datamodels.IFUImageModel`, \ `~jwst.datamodels.container.ModelContainer` The background-subtracted science datamodel(s) """ output_model = self.prepare_output(input_data) # First check if we should even do the subtraction. If not, bail. if not self._do_sub(output_model): record_step_status(output_model, "master_background", success=False) return output_model # Check that data is a supported datamodel. If not, bail. if not isinstance( output_model, ( ModelContainer, datamodels.MultiSlitModel, datamodels.ImageModel, datamodels.SlitModel, datamodels.IFUImageModel, ), ): log.warning( f"Input {input_data} of type {type(output_model)} cannot be handled. Step skipped." ) record_step_status(output_model, "master_background", success=False) return output_model # If user-supplied master background, subtract it if self.user_background: if isinstance(output_model, ModelContainer): result, bg_container = split_container(output_model) asn_id = result.asn_table["asn_id"] del bg_container background_2d_collection = ModelContainer() user_bg_name = Path(self.user_background).name for model in result: background_2d = expand_to_2d(model, self.user_background) subtract_2d_background(model, background_2d) background_2d_collection.append(background_2d) # Record name of user-supplied master background spectrum model.meta.background.master_background_file = user_bg_name else: # Use user-supplied master background and subtract it asn_id = None background_2d = expand_to_2d(output_model, self.user_background) background_2d_collection = ModelContainer([background_2d]) result = subtract_2d_background(output_model, background_2d) # Record name of user-supplied master background spectrum result.meta.background.master_background_file = Path(self.user_background).name # Save the computed 2d background if requested by user. The user has supplied # the master background so just save the expanded 2d background if self.save_background: self.save_container( background_2d_collection, suffix="masterbg2d", force=True, asn_id=asn_id ) # Compute master background and subtract it else: # Input must be a container to continue if not isinstance(output_model, ModelContainer): log.warning( f"Input {input_data} of type {type(output_model)} cannot be " "handled without user-supplied background. Step skipped." ) record_step_status(output_model, "master_background", success=False) return output_model result, background_data = split_container(output_model) if len(background_data) == 0: msg = ( "No background data found in input container, " "and no user-supplied background provided. Skipping step." ) log.warning(msg) record_step_status(output_model, "master_background", success=False) return output_model asn_id = result.asn_table["asn_id"] for model in background_data: # Check if the background members are nodded x1d extractions # or background from dedicated background exposures. this_is_ifu_extended = False if ( model.meta.exposure.type == "NRS_IFU" and model.spec[0].source_type == "EXTENDED" ): this_is_ifu_extended = True if model.meta.exposure.type == "MIR_MRS": # always treat as extended for MIRI MRS this_is_ifu_extended = True # Use "bkgdtarg is False" so we don't also get None cases # for simulated data that didn't bother populating this # keyword. if model.meta.observation.bkgdtarg is False or this_is_ifu_extended: log.debug("Copying BACKGROUND column to SURF_BRIGHT") copy_background_to_surf_bright(model) master_background = combine_1d_spectra( background_data, exptime_key="exposure_time", ) # If requested, apply a moving-median boxcar filter to the # master background spectrum. # Round down even kernel sizes because only odd kernel sizes are supported. if self.median_kernel % 2 == 0: self.median_kernel -= 1 log.info( "Even median filter kernels are not supported." f" Rounding the median kernel size down to {self.median_kernel}." ) if self.median_kernel > 1: log.info(f"Applying moving-median boxcar of width {self.median_kernel}.") master_background.spec[0].spec_table["surf_bright"] = medfilt( master_background.spec[0].spec_table["surf_bright"], kernel_size=[self.median_kernel], ) master_background.spec[0].spec_table["flux"] = medfilt( master_background.spec[0].spec_table["flux"], kernel_size=[self.median_kernel], ) background_data.close() background_2d_collection = ModelContainer() for model in result: background_2d = expand_to_2d(model, master_background) subtract_2d_background(model, background_2d) background_2d_collection.append(background_2d) # Save the computed background if requested by user if self.save_background: self.save_model(master_background, suffix="masterbg1d", force=True, asn_id=asn_id) self.save_container( background_2d_collection, suffix="masterbg2d", force=True, asn_id=asn_id ) record_step_status(result, "master_background", success=True) # Clean up intermediate background models background_2d_collection.close() return result
def _do_sub(self, input_model): """ Decide if subtraction is to be done. Encapsulates logic that checks if background step has already been run on the data, or if the user has selected to force_subtract regardless. Returns ------- do_sub : bool If ``True``, do the subtraction """ # If force_subtract is set, always return True if self.force_subtract: return True # Otherwise, check if the input data is a model container. do_sub = True if isinstance(input_model, ModelContainer): # Loop over the container and see if the background was # subtracted in calspec2. If all data was background subtracted, # skip master bkg subtraction. # If there is a mixture of some being background subtracted, don't # subtract and print warning message isub = 0 for indata in input_model: if ( indata.meta.cal_step.bkg_subtract == "COMPLETE" or indata.meta.cal_step.master_background == "COMPLETE" ): do_sub = False isub += 1 if not do_sub and isub == len(input_model): log.info("Not subtracting master background, background was subtracted in calspec2") log.info( "To force the master background to be subtracted from this data, " "run again and set force_subtract = True." ) if not do_sub and isub != len(input_model): log.warning("Not subtracting master background.") log.warning( "Input data contains a mixture of data with and without " "background subtraction done in calspec2." ) log.warning( "To force the master background to be subtracted from this data, " "run again and set force_subtract = True." ) else: # input data is a single file if ( input_model.meta.cal_step.bkg_subtract == "COMPLETE" or input_model.meta.cal_step.master_background == "COMPLETE" ): do_sub = False log.info("Not subtracting master background, background was subtracted in calspec2") log.info( "To force the master background to be subtracted from this data, " "run again and set force_subtract = True." ) return do_sub
[docs] def save_container(self, container, suffix="", asn_id="", force=True): """Save all models in container for intermediate background subtraction.""" for i, model in enumerate(container): self.save_model(model, suffix=suffix, force=force, asn_id=asn_id, idx=i)
def copy_background_to_surf_bright(spectrum): """Copy the background column to the surf_bright column in a MultiSpecModel in-place.""" for spec in spectrum.spec: spec.spec_table["SURF_BRIGHT"][:] = spec.spec_table["BACKGROUND"].copy() spec.spec_table["SB_ERROR"][:] = spec.spec_table["BKGD_ERROR"].copy() # Zero out the background column for safety spec.spec_table["BACKGROUND"][:] = 0 # Set BERROR to dummy val of 0.0, as in extract_1d currently spec.spec_table["BKGD_ERROR"][:] = 0.0 def split_container(container): """ Divide a ModelContainer with science and background into one of each. Parameters ---------- container : ModelContainer Input model container Returns ------- science : ModelContainer Container for science data. background : ModelContainer Container for background data. """ background = ModelContainer() science = ModelContainer() for ind_science in container.ind_asn_type("science"): science.append(container[ind_science]) for ind_bkgd in container.ind_asn_type("background"): background.append(container[ind_bkgd]) # Pass along the association table to the output science container science.asn_pool_name = container.asn_pool_name science.asn_table_name = container.asn_table_name merge_tree(science.asn_table, container.asn_table) # Prune the background members from the table for p in science.asn_table["products"]: p["members"] = [m for m in p["members"] if m["exptype"].lower() != "background"] return science, background def subtract_2d_background(source, background): """ Subtract a 2D background. Parameters ---------- source : `~stdatamodels.jwst.datamodels.JwstDataModel` or \ `~jwst.datamodels.container.ModelContainer` The input science data, updated in place. background : `~stdatamodels.jwst.datamodels.JwstDataModel` The input background data. Must be the same datamodel type as `source`. For a `~jwst.datamodels.container.ModelContainer`, the source and background models in the input containers must match one-to-one. Returns ------- source : `~stdatamodels.jwst.datamodels.JwstDataModel` Input data with background subtracted. """ def _subtract_2d_background(model, background): # Handle individual NIRSpec FS, NIRSpec MOS if isinstance(model, datamodels.MultiSlitModel): for slit, slitbg in zip(model.slits, background.slits, strict=False): slit.data -= slitbg.data slit.dq |= slitbg.dq # Handle MIRI LRS, MIRI MRS and NIRSpec IFU else: model.data -= background.data model.dq |= background.dq if isinstance(source, ModelContainer): # Handle containers of many datamodels for model, bg in zip(source, background, strict=False): _subtract_2d_background(model, bg) else: # Handle single datamodels _subtract_2d_background(source, background) return source