Source code for spectacle.fitting.line_finder

import logging

import astropy.units as u
import numpy as np
from astropy.constants import c, m_e, e
from astropy.modeling import Fittable2DModel, Parameter
from astropy.modeling.fitting import LevMarLSQFitter
from astropy.modeling.models import RedshiftScaleFactor

from .curve_fitter import CurveFitter
from ..modeling import OpticalDepth1D, Spectral1D
from ..utils.detection import region_bounds
from ..utils.misc import DOPPLER_CONVERT
from ..registries import line_registry

TAU_FACTOR = (np.sqrt(np.pi) * e.esu ** 2 /
              (m_e.cgs * c.cgs)).cgs


[docs]class LineFinder1D(Fittable2DModel): inputs = ('x', 'y') outputs = ('y',) @property def input_units_allow_dimensionless(self): return {'x': False, 'y': True} threshold = Parameter(default=0, fixed=True) min_distance = Parameter(default=10.0, min=1, fixed=True) def __init__(self, ions=None, continuum=None, defaults=None, z=None, auto_fit=True, velocity_convention='relativistic', output='flux', fitter=None, with_rejection=False, fitter_args=None, *args, **kwargs): super().__init__(*args, **kwargs) self._ions = ions or [] self._continuum = continuum self._defaults = defaults or {} self._redshift_model = RedshiftScaleFactor(z=z or 0) self._model_result = None self._auto_fit = auto_fit self._output = output self._velocity_convention = velocity_convention self._fitter_args = fitter_args or {} self._fitter = fitter or CurveFitter() self._with_rejection = with_rejection @property def model_result(self): return self._model_result @property def fitter(self): return self._fitter
[docs] def __call__(self, x, *args, auto_fit=None, **kwargs): if auto_fit is not None: self._auto_fit = auto_fit if x.unit.physical_type == 'speed' and len(self._ions) != 1: raise ReferenceError("The line finder will not be able to parse " "ion information in velocity space without " "being given explicit ion reference in the " "defaults dictionary.") super().__call__(x, *args, **kwargs) return self._model_result
[docs] def evaluate(self, x, y, threshold, min_distance, *args, **kwargs): spec_mod = Spectral1D(continuum=self._continuum, output=self._output) if x.unit.physical_type in ('length', 'frequency'): x = self._redshift_model.inverse(x) # Generate the subset of the table for the ions chosen by the user sub_registry = line_registry if len(self._ions) > 0: # In this case, the user has provided a list of ions for their # spectrum. Create a subset of the line registry so that only # these ions will be searched when attempting to identify. sub_registry = line_registry.subset(self._ions) # Convert the min_distance from dispersion units to data elements. # Assumes uniform spacing. # min_ind = (np.abs(x.value - (x[0].value + min_distance))).argmin() # Find peaks regions = region_bounds(x, y, threshold=threshold, min_distance=min_distance) lines = [] for centroid, (mn_bnd, mx_bnd), is_absorption, buried in regions.values(): mn_bnd, mx_bnd = mn_bnd * x.unit, mx_bnd * x.unit sub_x, vel_mn_bnd, vel_mx_bnd = None, None, None line_kwargs = {} # For the case where the user has provided a list of ions with a # dispersion in wavelength or frequency, convert each ion to # velocity space individually to avoid making assumptions of their # kinematics. if x.unit.physical_type in ('length', 'frequency'): line = sub_registry.with_lambda(centroid) disp_equiv = u.spectral() + DOPPLER_CONVERT[ self._velocity_convention](line['wave']) with u.set_enabled_equivalencies(disp_equiv): sub_x = u.Quantity(x, 'km/s') vel_mn_bnd, vel_mx_bnd, vel_centroid = mn_bnd.to('km/s'), \ mx_bnd.to('km/s'), \ centroid.to('km/s') else: line = sub_registry.with_name(self._ions[0]) line_kwargs.update({ 'name': line['name'], 'lambda_0': line['wave'], 'gamma': line['gamma'], 'f_value': line['osc_str']}) # Estimate the doppler b and column densities for this line. # For the parameter estimator to be accurate, the spectrum must be # continuum subtracted. v_dop, col_dens, nmn_bnd, nmx_bnd = parameter_estimator( centroid=centroid, bounds=(vel_mn_bnd or mn_bnd, vel_mx_bnd or mx_bnd), x=sub_x or x, y=spec_mod.continuum(sub_x or x) - y if is_absorption else y, ion_info=line_kwargs, buried=buried) if np.isinf(col_dens) or np.isnan(col_dens): continue estimate_kwargs = { 'v_doppler': v_dop, 'column_density': col_dens, 'fixed': {}, 'bounds': {}, } # Depending on the dispersion unit information, decide whether # the fitter should consider delta values in velocity or # wavelength/frequency space. if x.unit.physical_type in ('length', 'frequency'): estimate_kwargs['delta_lambda'] = centroid - line['wave'] estimate_kwargs['fixed'].update({'delta_v': True}) estimate_kwargs['bounds'].update({ 'delta_lambda': (mn_bnd.value - line['wave'].value, mx_bnd.value - line['wave'].value)}) else: # In velocity space, the centroid *should* be zero for any # line given that the rest wavelength is taken as its lamba_0 # in conversions. Thus, the given centroid is akin to the # velocity offset. estimate_kwargs['delta_v'] = centroid estimate_kwargs['fixed'].update({'delta_lambda': True}) estimate_kwargs['bounds'].update({ 'delta_v': (mn_bnd.value, mx_bnd.value)}) line_kwargs.update(estimate_kwargs) line_kwargs.update(self._defaults.copy()) line = OpticalDepth1D(**line_kwargs) lines.append(line) logging.debug("Found %s possible lines (theshold=%s, min_distance=%s).", len(lines), threshold, min_distance) if len(lines) == 0: return np.zeros(x.shape) spec_mod = Spectral1D(lines, continuum=self._continuum, output=self._output, z=self._redshift_model.z.value) if self._auto_fit: if issubclass(self._fitter.__class__, LevMarLSQFitter): if 'maxiter' not in self._fitter_args: self._fitter_args['maxiter'] = 1000 fit_spec_mod = self._fitter(spec_mod, self._redshift_model(x), y, **self._fitter_args) else: fit_spec_mod = spec_mod # The parameter values on the underlying compound model must also be # updated given the new fitted parameters on the Spectral1D instance. # FIXME: when fitting without using line finder, these values will not # be updated in the compound model. for pn in fit_spec_mod.param_names: pv = getattr(fit_spec_mod, pn) setattr(fit_spec_mod._compound_model, pn, pv) fit_spec_mod.line_regions = regions self._model_result = fit_spec_mod return fit_spec_mod(x)
def parameter_estimator(centroid, bounds, x, y, ion_info, buried=False): # bound_low, bound_up = bounds # mid_diff = (bound_up - bound_low) # new_bound_low, new_bound_up = (bound_low - mid_diff), (bound_up + mid_diff) new_bound_low, new_bound_up = bounds mask = ((x >= new_bound_low) & (x <= new_bound_up)) mx, my = x[mask], y[mask] # Width can be estimated by the weighted 2nd moment of the x coordinate dx = mx - np.mean(mx) fwhm = 2 * np.sqrt(np.sum((dx * dx) * my) / np.sum(my)) sigma = fwhm / 2.355 # Amplitude is derived from area sum_y = np.trapz(my, x=mx) height = sum_y / (sigma * np.sqrt(2 * np.pi)) # Estimate the doppler b parameter v_dop = (np.sqrt(2) * sigma).to('km/s') # Estimate the column density col_dens = (height * v_dop / (TAU_FACTOR * ion_info['lambda_0'] * ion_info['f_value'])).to('1/cm2') # col_dens = (sum_y / (TAU_FACTOR * ion_info['lambda_0'] * ion_info['f_value'])).to('1/cm2') ln_col_dens = np.log10(col_dens.value) # if buried: # ln_col_dens -= 0.1 logging.info("""Estimated initial values: Ion: {} Centroid: {:g} ({:g}) Column density: {:g}, ({:g}) Doppler width: {:g}""".format(ion_info['name'], centroid, ion_info['lambda_0'], ln_col_dens, col_dens, v_dop)) return v_dop, ln_col_dens, new_bound_low, new_bound_up