Source code for spectacle.modeling.models

import operator
from functools import wraps
import logging

import astropy.units as u
import numpy as np
from scipy.stats import chisquare
from astropy.convolution import Kernel1D
from astropy.modeling import Fittable1DModel, FittableModel, Parameter
from astropy.modeling.models import Const1D, RedshiftScaleFactor
from astropy.modeling.fitting import LevMarLSQFitter
from astropy.table import QTable
from collections import OrderedDict

from .converters import FluxConvert, FluxDecrementConvert
from .lsfs import COSLSFModel, GaussianLSFModel, LSFModel
from .profiles import OpticalDepth1D
from ..utils.misc import DOPPLER_CONVERT
from ..analysis import delta_v_90, equivalent_width, full_width_half_max
from ..analysis.region_finder import find_regions

__all__ = ['Spectral1D']


[docs]class Spectral1D(Fittable1DModel): """ Base representation of a compound model containing a variable number of :class:`~spectacle.modeling.profiles.OpticalDepth1D` line model features. Parameters ---------- lines : str, :class:`~OpticalDepth1D`, list The line information used to compose the spectral model. This can be either a string, in which case the line information is retrieve from the ion registry; an instance of :class:`~OpticalDepth1D`; or a list of either of the previous two types. continuum : :class:`~Fittable1DModel`, optional An astropy model representing the continuum of the spectrum. If not provided, a :class:`~Const1D` model is used. z : float, optional The redshift applied to the spectral model. Default = 0. lsf : :class:`~spectacle.modeling.lsfs.LSFModel`, :class:`~astropy.convolution.Kernel1D`, str, optional The line spread function applied to the spectral model. It can be a pre-defined kernel model, or a convolution kernel, or a string referencing the built-in Hubble COS lsf, or a Gaussian lsf. Optional keyword arguments can be passed through. """ inputs = ('x',) outputs = ('y',) @property def input_units_allow_dimensionless(self): return {'x': False} @property def input_units(self): # Apply input unit restriction only when the model is not being fitted, # otherwise, allow the fitter's parameter-less usage of the model. if any([x.unit is not None for x in [getattr(self, y) for y in self.param_names]]): return {'x': u.Unit('Angstrom')} else: return {'x': None} @property def input_units_equivalencies(self): rest_wavelength = self.rest_wavelength if len(self.lines) > 0 and (self.is_single_ion or self.rest_wavelength.value == 0): rest_wavelength = self.lines[0].lambda_0.quantity disp_equiv = u.spectral() + DOPPLER_CONVERT[ self._velocity_convention](rest_wavelength) return {'x': disp_equiv} def __new__(cls, lines=None, continuum=None, z=None, lsf=None, output=None, velocity_convention=None, rest_wavelength=None, copy=False, input_redshift=None, **kwargs): # If the cls already contains parameter attributes, assume that this is # being called as part of a copy operation and return the class as-is. if (lines is None and continuum is None and z is None and output is None and velocity_convention is None and rest_wavelength is None): return super().__new__(cls) output = output or 'optical_depth' velocity_convention = velocity_convention or 'relativistic' rest_wavelength = rest_wavelength or u.Quantity(0, 'Angstrom') z = z or 0 input_redshift = input_redshift or 0 # If no continuum is provided, or the continuum provided is not a # model, use a constant model to represent the continuum. if continuum is not None: if not issubclass(type(continuum), FittableModel): if isinstance(continuum, (float, int)): continuum = Const1D(amplitude=continuum, fixed={'amplitude': True}) else: raise ValueError("Continuum must be a number or `FittableModel`.") else: # If the continuum model is an astropy model, ensure that it # can handle inputs with units or wrap otherwise. if not continuum._supports_unit_fitting: continuum = _wrap_unitless_model(continuum) else: continuum = Const1D(amplitude=0) if output not in ('flux', 'flux_decrement', 'optical_depth'): raise ValueError("Parameter 'output' must be one of 'flux', " "'flux_decrement', 'optical_depth'.") # Parse the lines argument which can be a list, a quantity, or a string _lines = [] if isinstance(lines, Fittable1DModel): _lines.append(lines) elif isinstance(lines, str): _lines.append(OpticalDepth1D(name=lines)) elif isinstance(lines, list): for line in lines: if isinstance(line, str): _lines.append(OpticalDepth1D(name=line)) elif isinstance(line, u.Quantity): _lines.append(OpticalDepth1D(lambda_0=line)) elif isinstance(line, Fittable1DModel): _lines.append(line) # Parse the lsf information, if provided if lsf is not None and not isinstance(lsf, LSFModel): if isinstance(lsf, Kernel1D): lsf = LSFModel(kernel=lsf) elif isinstance(lsf, str): if lsf == 'cos': lsf = COSLSFModel() elif lsf == 'gaussian': lsf = GaussianLSFModel(kwargs.pop('stddev', 1)) else: raise ValueError("Kernel must be of type 'LSFModel', or " "'Kernel1D'; or a string with value 'cos' " "or 'gaussian'.") # Compose the line-based compound model taking into consideration # the redshift, continuum, and dispersion conversions. rs = RedshiftScaleFactor(z, fixed={'z': True}, name="redshift").inverse irs = RedshiftScaleFactor(input_redshift, fixed={'z': True}, name="input_redshift") if lines is not None and len(_lines) > 0: ln = np.sum(_lines) if output == 'flux_decrement': compound_model = rs | ((ln | FluxDecrementConvert()) + continuum) elif output == 'flux': compound_model = rs | ((ln | FluxConvert()) + continuum) else: compound_model = rs | (ln + continuum) else: compound_model = rs | continuum # Check for any lsf kernels that have been added if lsf is not None: compound_model |= lsf # Model parameter members are setup in the model's compound meta class. # After we've attached the parameters to this fittable model, call the # __new__ and __init__ meta methods again to ensure proper creation. members = {} members.update(cls.__dict__) # Delete all previous parameter definitions living on the class for k, v in members.items(): if isinstance(v, Parameter): delattr(cls, k) # Create a dictionary to pass as the parameter unit definitions to the # new class. This ensures fitters know this model supports units. data_units = OrderedDict() # Attach all of the compound model parameters to this model for param_name in compound_model.param_names: param = getattr(compound_model, param_name) members[param_name] = param.copy(fixed=param.fixed) data_units[param_name] = param.unit members['_parameter_units_for_data_units'] = lambda *pufdu: data_units new_cls = type('Spectral{}'.format(compound_model.__name__), (cls, ), members) # Ensure that the class is recorded in the global scope so that the # serialization done can access the stored class definitions. new_cls.__module__ = '__main__' # globals()[new_cls.__name__] = new_cls globals()[compound_model.__name__] = compound_model.__class__ instance = super().__new__(new_cls) # Define the instance-level parameters setattr(instance, '_continuum', continuum) setattr(instance, '_compound_model', compound_model) setattr(instance, '_velocity_convention', velocity_convention) setattr(instance, '_rest_wavelength', rest_wavelength) setattr(instance, '_output', output) return instance def __init__(self, *args, **kwargs): super().__init__()
[docs] def __call__(self, x, *args, **kwargs): """ Override the default call function to handle fixed parameters based on the unit of the provided dispersion. Notes ----- The initial call to this method from fitter will provide unit information on the dispersion. Subsequent calls do not. It is assumed that subsequent calls provide dispersion as km/s. """ def _fix_deltas(model, fix_delta_v, fix_delta_lambda): for k in model.fixed: if 'delta_v' in k: getattr(model, k).fixed = fix_delta_v elif 'delta_lambda' in k: getattr(model, k).fixed = fix_delta_lambda if isinstance(x, u.Quantity): if x.unit.physical_type in ('length', 'frequency'): _fix_deltas(self, True, False) _fix_deltas(self._compound_model, True, False) elif x.unit.physical_type == 'speed': _fix_deltas(self, False, True) _fix_deltas(self._compound_model, False, True) return super().__call__(x, *args, **kwargs)
[docs] def evaluate(self, x, *args, **kwargs): # For the input dispersion to be unit-ful, especially when fitting x = u.Quantity(x, 'Angstrom') # For the parameters to be unit-ful especially when used in fitting. # TODO: fix arguments being passed with extra dimension. args = [u.Quantity(val[0], unit) if unit is not None else val[0] for val, unit in zip(args, self._parameter_units_for_data_units().values())] # Create a new compound class given the potentially different parameter # units so as to preserve unit-ful behavior self._compound_model = self._compound_model.__class__(*args, **kwargs) return self._compound_model(x)
[docs] def rejection_criteria(self, x, y, auto_fit=True): """ Implementation of the Akaike Information Criteria with Correction (AICC) (Akaike 1974; Liddle 2007; King et al. 2011). Used to determine whether lines can be safely removed from the compound model without loss of information. Parameters ---------- x : :class:`~astropy.units.Quantity` The dispersion data. y : array-like The expected flux or tau data. auto_fit : bool Whether the model fit should be re-evaluated for every removed line. Returns ------- final_model : :class:`~spectacle.Spectral1D` The new spectral model with the least complexity. """ base_aicc, chi2, cmplx = self._aicc(x, y, self) final_model = self finished = False while not finished: for i in range(len(final_model.lines)): lines = [x for x in final_model.lines] removed_line = lines.pop(i) if len(lines) == 0: finished = True break new_spec = self._copy(lines=lines) if auto_fit: fitter = LevMarLSQFitter() new_spec = fitter(new_spec, x, y) aicc, chi2, cmplx = self._aicc(x, y, new_spec) if base_aicc - aicc > 5: final_model = new_spec base_aicc = aicc break else: finished = True return final_model, base_aicc, chi2, cmplx
def _aicc(self, x, y, model): chi2, pval = chisquare(f_obs=model(x)**2, f_exp=y**2) p = len([k for x in model.lines for k, v in x.fixed.items() if not v]) n = x.size cmplx = (2 * p * n) / (n - p - 1) tot = chi2 + cmplx return tot, chi2, cmplx @property def continuum(self): return self._continuum @property def velocity_convention(self): return self._velocity_convention @property def rest_wavelength(self): return self._rest_wavelength @rest_wavelength.setter def rest_wavelength(self, value): self._rest_wavelength = value @property def redshift(self): """ The redshift at which the data given to. Returns ------- : float The redshift value. """ return next((x for x in self._compound_model if isinstance(x, RedshiftScaleFactor) and x.name == 'redshift')).inverse.z.value
[docs] def with_redshift(self, value): """Generate a new spectral model with the given redshift.""" return self._copy(z=value)
def _input_redshift(self): """The defined redshift at which dispersion values are provided.""" return next((x for x in self._compound_model if isinstance(x, RedshiftScaleFactor) and x.name == 'input_redshift')).z.value @property def lines(self): """ The collection of profiles representing the absorption or emission lines in the spectrum model. Returns ------- : list A list of :class:`~spectacle.modeling.profiles.Voigt1D` models. """ return [x for x in self._compound_model if isinstance(x, OpticalDepth1D)] @property def is_single_ion(self): """ Whether this spectrum represents a collection of single ions or a collection of multiple different ions. Returns ------- : bool Is the spectrum composed of a collection of single ions. """ return len(set([x.name for x in self.lines])) == 1 @property def lsf_kernel(self): return next((x for x in self._compound_model if isinstance(x, LSFModel)), None) @property def output_type(self): """ The data output of this spectral model. It could one of 'flux', 'flux_decrement', or 'optical_depth'. Returns ------- : str The output type of the model. """ return self._output def _copy(self, **kwargs): """ Copy the spectral model, optionally overriding any previous values. Parameters ---------- kwargs : dict A dictionary holding the values desired to be overwritten. Returns ------- : :class:`~spectacle.modeling.models.Spectral1D` The new spectral model. """ new_kwargs = dict( lines=[line.copy() for line in self.lines], continuum=self.continuum, z=self.redshift, output=self.output_type, lsf=self.lsf_kernel, velocity_convention=self.velocity_convention, rest_wavelength=self.rest_wavelength, # input_redshift=self._input_redshift() ) new_kwargs.update(kwargs) return Spectral1D(**new_kwargs, copy=True) @property def as_flux(self): """New spectral model that produces flux output.""" return self._copy(output='flux') @property def as_flux_decrement(self): """New spectral model that produces flux decrement output.""" return self._copy(output='flux_decrement') @property def as_optical_depth(self): """New spectral model that produces optical depth output.""" return self._copy(output='optical_depth')
[docs] def with_continuum(self, continuum): """New spectral model defined with a different continuum.""" return self._copy(continuum=continuum)
[docs] def with_lsf(self, kernel=None, **kwargs): """New spectral model with a line spread function.""" return self._copy(lsf=kernel, **kwargs)
[docs] def with_line(self, *args, reset=False, **kwargs): """ Add a new line to the spectral model. Returns ------- : :class:`~spectacle.modeling.models.Spectral1D` The new spectral model. """ args = list(args) if len(args) > 0: if isinstance(args[0], OpticalDepth1D): new_line = args[0] elif isinstance(args[0], str): name = args.pop(0) new_line = OpticalDepth1D(name=name, *args, **kwargs) elif isinstance(args[0], u.Quantity): lambda_0 = args.pop(0) new_line = OpticalDepth1D(lambda_0=lambda_0, *args, **kwargs) else: new_line = OpticalDepth1D(*args, **kwargs) return self._copy( lines=self.lines + [new_line] if not reset else [new_line])
[docs] def with_lines(self, lines, reset=False): """ Create a new spectral model with the added lines. Parameters ---------- lines : list List of :class:`~spectacle.modeling.profiles.OpticalDepth1D` line objects. Returns ------- : :class:`~spectacle.modeling.models.Spectral1D` The new spectral model. """ if not all([isinstance(x, OpticalDepth1D) for x in lines]): raise ValueError("All lines must be `OpticalDepth1D` objects.") return self._copy(lines=self.lines + lines if not reset else lines)
[docs] @u.quantity_input(x=['length', 'speed', 'frequency']) def line_stats(self, x): """ Calculate statistics over individual line profiles. Parameters ---------- x : :class:`~u.Quantity` The input dispersion in either wavelength/frequency or velocity space. Returns ------- tab : :class:`~astropy.table.QTable` A table detailing the calculated statistics. """ tab = QTable(names=['name', 'wave', 'col_dens', 'v_dop', 'delta_v', 'delta_lambda', 'ew', 'dv90', 'fwhm'], dtype=('S10', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8')) tab['wave'].unit = u.AA tab['v_dop'].unit = u.km / u.s tab['ew'].unit = u.AA tab['dv90'].unit = u.km / u.s tab['fwhm'].unit = u.AA tab['delta_v'].unit = u.km / u.s tab['delta_lambda'].unit = u.AA for line in self.lines: disp_equiv = u.spectral() + DOPPLER_CONVERT[ self.velocity_convention](line.lambda_0.quantity) with u.set_enabled_equivalencies(disp_equiv): vel = x.to('km/s') wav = x.to('Angstrom') # Generate the spectrum1d object for this line profile ew = equivalent_width(wav, line(wav)) dv90 = delta_v_90(vel, line(wav)) fwhm = full_width_half_max(wav, line(wav)) tab.add_row([line.name, line.lambda_0, line.column_density, line.v_doppler, line.delta_v, line.delta_lambda, ew, dv90, fwhm]) return tab
[docs] @u.quantity_input(x=['length', 'speed', 'frequency'], rest_wavelength='length') def region_stats(self, x, rest_wavelength, rel_tol=1e-2, abs_tol=1e-5): """ Calculate statistics over arbitrary line regions given some tolerance from the continuum. Parameters ---------- x : :class:`~u.Quantity` The input dispersion in either wavelength/frequency or velocity space. rest_wavelength : :class:`~u.Quantity` The rest frame wavelength used in conversions between wavelength/ frequency and velocity space. rel_tol : float The relative tolerance parameter. abs_tol : float The absolute tolerance parameter. Returns ------- tab : :class:`~astropy.table.QTable` A table detailing the calculated statistics. """ y = self(x) if self.output_type == 'flux': y = self.continuum(x) - y else: y -= self.continuum(x) # Calculate the regions in the raw data # absolute(a - b) <= (atol + rtol * absolute(b)) regions = {(reg[0], reg[1]): [] for reg in find_regions(y, rel_tol=rel_tol, abs_tol=abs_tol)} tab = QTable(names=['region_start', 'region_end', 'rest_wavelength', 'ew', 'dv90', 'fwhm'], dtype=('f8', 'f8', 'f8', 'f8', 'f8', 'f8')) tab['region_start'].unit = x.unit tab['region_end'].unit = x.unit tab['rest_wavelength'].unit = u.AA tab['ew'].unit = u.AA tab['dv90'].unit = u.km / u.s tab['fwhm'].unit = u.AA for mn_bnd, mx_bnd in regions: mask = (x > x[mn_bnd]) & (x < x[mx_bnd]) x_reg = x[mask] y_reg = y[mask] disp_equiv = u.spectral() + DOPPLER_CONVERT[ self.velocity_convention](rest_wavelength) with u.set_enabled_equivalencies(disp_equiv): vel = x_reg.to('km/s') wav = x_reg.to('Angstrom') # Generate the spectrum1d object for this line profile ew = equivalent_width(wav, y_reg) dv90 = delta_v_90(vel, y_reg) fwhm = full_width_half_max(wav, y_reg) tab.add_row([x[mn_bnd], x[mx_bnd], rest_wavelength, ew, dv90, fwhm]) return tab
def _wrap_unitless_model(model): """ Wraps a model that does not support inputs with units, decorating its evaluate method to strip any input units. Parameters ---------- model : :class:`astropy.modeling.models.Fittable1DModel` The model whose evaluate method will be wrapped. Returns ------- model : :class:`astropy.modeling.models.Fittable1DModel` The model instance whose evaluate method has been wrapped. """ def decorator(func): @wraps(func) def wrapper(x, *args, **kwargs): if isinstance(x, u.Quantity): x = x.value return func(x, *args, **kwargs) return wrapper setattr(model, 'evaluate', decorator(model.evaluate)) return model # Model arithmetic operators OPERATORS = {'+': operator.add, '-': operator.sub, '*': operator.mul, '/': operator.truediv, '**': operator.pow, '&': operator.and_, '|': operator.or_} def _strip_units(compound_model, x=None): """ Remove the units of a given compound model. Parameters ---------- compound_model : :class:`~Fittable1D` Compound model for which the units will be removed. x : :class:`~astropy.units.Quantity`, optional The dispersion array that will be passed into the compound model. If provided, relevant parameters of the compound model will be converted to the unit. Returns ------- : :class:`~Fittable1D` Compound model without units. """ leaf_idx = -1 compound_model = compound_model.copy() if x is not None: compound_model._input_units = {'x': x.unit} parameter_units = {pn: getattr(sm, pn).unit for sm in compound_model for pn in sm.param_names} def getter(idx, model): # By indexing on self[] this will return an instance of the # model, with all the appropriate parameters set sub_mod = compound_model[idx] for pn in sub_mod.param_names: param = getattr(sub_mod, pn) if param.unit is not None: # if x is not None and isinstance(x, u.Quantity): # print("Value", sub_mod.lambda_0.quantity) # with u.set_enabled_equivalencies( # u.spectral() + u.doppler_relativistic(lambda_0)): # quant = param.quantity.to(x.unit) # else: quant = param.quantity param.value = quant.value param._set_unit(None, force=True) return sub_mod unitless_model = compound_model._tree.evaluate(OPERATORS, getter=getter).__class__ unitless_model._parameter_units = parameter_units # Set custom call to return a Spectrum1D object # _set_custom_call(unitless_model) return unitless_model, parameter_units def _apply_units(compound_model, parameter_units): """ Applies a set of units to a compound model. Parameters ---------- compound_model : :class:`~Fittable1D` Unitless compound model. parameter_units : dict Dictionary containing a mapping between parameter names in the sub models of the compound model and the units to be associated with them. Returns ------- : :class:`~Fittable1D` Compound model with units. """ leaf_idx = -1 def getter(idx, model): # By indexing on self[] this will return an instance of the # model, with all the appropriate parameters set sub_mod = compound_model[idx] for pn in sub_mod.param_names: param = getattr(sub_mod, pn) unit = parameter_units.get(pn) param._set_unit(unit, force=True) return sub_mod unitful_model = compound_model._tree.evaluate(OPERATORS, getter=getter).__class__ # Set custom call to return a Spectrum1D object # _set_custom_call(unitful_model) return unitful_model