Source code for spectacle.analysis.resample

import logging
import os

import numpy as np
import astropy.units as u

__all__ = ['Resample']


[docs]class Resample(object): """ Resample model which can be used with compound model objects. """ def __init__(self, new_dispersion): self._new_dispersion = new_dispersion
[docs] def __call__(self, x, y): _remat = resample(x, self._new_dispersion) return np.dot(_remat, y)
def resample(orig_lamb, fin_lamb, force=None, **kwargs): """ Abstraction function to decide whether to use uniform or non-uniform resampling functions. Parameters ---------- orig_lamb : ndarray Original dispersion grid. fin_lamb : ndarray Final dispersion grid. kwargs : dict Extra keyword arguments to pass to functions. Returns ------- resample_mat : ndarray The resampling matrix to be applied to data arrays. """ if isinstance(orig_lamb, u.Quantity): orig_lamb = orig_lamb.value if isinstance(fin_lamb, u.Quantity): fin_lamb = fin_lamb.value orig_space = orig_lamb[1:] - orig_lamb[:-1] fin_space = fin_lamb[1:] - fin_lamb[:-1] if np.allclose(orig_space, orig_space[0]) and np.allclose(fin_space, fin_space[0]): logging.info("Re-sampling: original and final grids are uniform.") mat = _uniform_matrix(orig_lamb, fin_lamb) else: logging.info("Re-sampling: original and final grids are non-uniform.") mat = _nonuniform_matrix(orig_lamb, fin_lamb, **kwargs) return mat def _uniform_matrix(orig_lamb, fin_lamb): """ Create a re-sampling matrix to be used in re-sampling spectra in a way that conserves flux. This is adapted from code created by the SEAGal Group. .. note:: This method assumes uniform grids. Parameters ---------- orig_lamb : ndarray The original dispersion array. fin_lamb : ndarray The desired dispersion array. Returns ------- resample_mat : ndarray An [[N_{fin_lamb}, M_{orig_lamb}]] matrix. """ # Get step size delta_orig = orig_lamb[1] - orig_lamb[0] delta_fin = fin_lamb[1] - fin_lamb[0] n_orig_lamb = len(orig_lamb) n_fin_lamb = len(fin_lamb) # Lower bin and upper bin edges orig_low = orig_lamb - delta_orig * 0.5 orig_upp = orig_lamb + delta_orig * 0.5 fin_low = fin_lamb - delta_fin * 0.5 fin_upp = fin_lamb + delta_fin * 0.5 # Create re-sampling matrix resamp_mat = np.zeros(shape=(n_fin_lamb, n_orig_lamb)) for i in range(n_fin_lamb): # Calculate the contribution of each original bin to the # resampled bin l_inf = np.where(orig_low > fin_low[i], orig_low, fin_low[i]) l_sup = np.where(orig_upp < fin_upp[i], orig_upp, fin_upp[i]) # Interval overlap of each original bin for current resampled # bin; negatives clipped dl = (l_sup - l_inf).clip(0) # This will only happen at the edges of lorig. # Discard resampled bin if it's not fully covered (> 99%) by the # original bin -- only happens at the edges of the original bins if 0 < dl.sum() < 0.99 * delta_fin: dl = 0 * orig_lamb resamp_mat[i, :] = dl resamp_mat /= delta_fin return resamp_mat def _nonuniform_matrix(orig_lamb, fin_lamb, extrapolate=False): """ Compute re-sampling matrix R_o2r, useful to convert a spectrum sampled at wavelengths lorig to a new grid lresamp. Here, there is no necessity to have constant gris as on :func:`_uniform_matrix`. This is adapted from code created by the SEAGal Group. .. warning:: lorig and lresam MUST be on ascending order! Parameters ---------- orig_lamb : array_like Original spectrum lambda array. fin_lamb : array_like Spectrum lambda array in which the spectrum should be sampled. extrapolate : boolean, optional Extrapolate values, i.e. values for fin_lamb < orig_lamb[0] are set to match orig_lamb[0] and values for fin_lamb > orig_lamb[-1] are set to match orig_lamb[-1]. Returns ------- resample_mat : ndarray Resample matrix. """ matrix = np.zeros(shape=(len(fin_lamb), len(orig_lamb))) # Define lambda ranges (low, upp) for original and resampled. lo_low = np.zeros(len(orig_lamb)) lo_low[1:] = (orig_lamb[1:] + orig_lamb[:-1]) / 2 lo_low[0] = orig_lamb[0] - (orig_lamb[1] - orig_lamb[0]) / 2 lo_upp = np.zeros(len(orig_lamb)) lo_upp[:-1] = lo_low[1:] lo_upp[-1] = orig_lamb[-1] + (orig_lamb[-1] - orig_lamb[-2]) / 2 lr_low = np.zeros(len(fin_lamb)) lr_low[1:] = (fin_lamb[1:] + fin_lamb[:-1]) / 2 lr_low[0] = fin_lamb[0] - (fin_lamb[1] - fin_lamb[0]) / 2 lr_upp = np.zeros(len(fin_lamb)) lr_upp[:-1] = lr_low[1:] lr_upp[-1] = fin_lamb[-1] + (fin_lamb[-1] - fin_lamb[-2]) / 2 # Iterate over resampled fin_lamb vector for i in range(len(fin_lamb)): # Find in which bins fin_lamb bin within orig_lamb bin bins_resamp = np.where((lr_low[i] < lo_upp) & (lr_upp[i] > lo_low))[0] # On these bins, eval fraction of resampled bin is within original bin. for j in bins_resamp: aux = 0 d_lr = lr_upp[i] - lr_low[i] d_lo = lo_upp[j] - lo_low[j] d_ir = lo_upp[j] - lr_low[i] # common section on the right d_il = lr_upp[i] - lo_low[j] # common section on the left # Case 1: resampling window is smaller than or equal to the # original window. This is where the bug was: if an original bin # is all inside the resampled bin, then all flux should go into it, # not then d_lr/d_lo fraction. if (lr_low[i] > lo_low[j]) & (lr_upp[i] < lo_upp[j]): aux += 1. # Case 2: resampling window is larger than the original window. if (lr_low[i] < lo_low[j]) & (lr_upp[i] > lo_upp[j]): aux += d_lo / d_lr # Case 3: resampling window is on the right of the original window. if (lr_low[i] > lo_low[j]) & (lr_upp[i] > lo_upp[j]): aux += d_ir / d_lr # Case 4: resampling window is on the left of the original window. if (lr_low[i] < lo_low[j]) & (lr_upp[i] < lo_upp[j]): aux += d_il / d_lr matrix[i, j] += aux # Fix extremes: extrapolate if needed if extrapolate: bins_extrapl = np.where((lr_low < lo_low[0]))[0] bins_extrapr = np.where((lr_upp > lo_upp[-1]))[0] if len(bins_extrapl) > 0 and len(bins_extrapr) > 0: io_extrapl = np.where((lo_low >= lr_low[bins_extrapl[0]]))[0][0] io_extrapr = np.where((lo_upp <= lr_upp[bins_extrapr[0]]))[0][-1] matrix[bins_extrapl, io_extrapl] = 1. matrix[bins_extrapr, io_extrapr] = 1. return matrix