Source code for spectacle.fitting.mcmc_fitter

import emcee
import numpy as np
import scipy.optimize as op

from astropy.modeling.fitting import LevMarLSQFitter
from astropy.modeling.fitting import (_fitter_to_model_params,
                                      _model_to_fit_params)
from astropy.table import QTable
from pathos.multiprocessing import Pool


def lnprior(theta, model):
    # Convert the array of parameter values back into model parameters
    _, fit_params_indices = _model_to_fit_params(model)
    bounds = np.array(list(model.bounds.values()))[fit_params_indices]

    if all([(bounds[i][0] or -np.inf)
                    <= theta[i] <= (bounds[i][1] or np.inf)
            for i in range(len(theta))]):
        return 0.0

    return -np.inf


def lnlike(theta, x, y, yerr, model):
    # Convert the array of parameter values back into model parameters
    _fitter_to_model_params(model, theta)

    mod_y = model(x)

    # inv_sigma2 = 1.0 / (yerr ** 2 + mod_y ** 2 * np.exp(2 * lnf))
    # res = -0.5 * (np.sum((y - mod_y) ** 2 * inv_sigma2 - np.log(inv_sigma2)))
    if np.sum(yerr) > 0:
        res = -0.5 * np.sum(((y - mod_y)/yerr) ** 2)
    else:
        res = -0.5 * np.sum((y - mod_y) ** 2)

    return res


def lnprob(theta, x, y, yerr, model):
    lp = lnprior(theta, model)

    if not np.isfinite(lp):
        return -np.inf

    ll = lnlike(theta, x, y, yerr, model)

    return lp + ll


[docs]class EmceeFitter: def __init__(self): self._uncertainties = {} @property def uncertainties(self): return self._uncertainties @property def errors(self): tab = QTable([self._uncertainties['param_names'], self._uncertainties['param_fit'], self._uncertainties['param_err'][:, 0], self._uncertainties['param_err'][:, 1], self._uncertainties['param_units']], names=('name', 'value', 'min_uncert', 'max_uncert', 'unit')) return tab
[docs] def __call__(self, model, x, y, yerr=None, nwalkers=500, steps=200, nprocs=1): model = model.copy() # If no errors are provided, assume all errors are normalized if yerr is None: yerr = np.zeros(shape=x.shape) # Retrieve the parameters that are not considered fixed or tied fit_params, fit_params_indices = _model_to_fit_params(model) # fit_params = np.append(fit_params, np.log(1)) # TODO: The following two seem to be unnecessary, however, the fits # don't work well without *both* of them. Need to rethink. fitter = LevMarLSQFitter() fit_model = fitter(model, x, y) fit_params = fit_model.parameters[fit_params_indices] # Perform a quick optimization of the parameters nll = lambda *args: -lnlike(*args) result = op.minimize(nll, fit_params, args=(x, y, yerr, model)) fit_params = result["x"] _fitter_to_model_params(model, fit_params) # Cache the number of dimensions of the problem, and walker count ndim = len(fit_params) # Initialize starting positions of walkers in a Gaussian ball pos = [fit_params * (1 + 1e-8 * np.random.randn(ndim)) for _ in range(nwalkers)] with Pool(nprocs) as pool: sampler = emcee.EnsembleSampler(nwalkers, ndim, lnprob, args=(x, y, yerr, model), pool=pool) sampler.run_mcmc(pos, steps) # for result in sampler.sample(pos, iterations=steps, storechain=False): # position = result[0] # with open("chain.dat", "a") as f: # for k in range(position.shape[0]): # f.write("{0:4d} {1:s}\n".format(k, " ".join(position[k]))) burnin = int(steps * 0.1) samples = sampler.chain[:, burnin:, :].reshape((-1, ndim)) # Compute the quantiles. res = list(map(lambda v: (v[1], v[2] - v[1], v[1] - v[0]), zip(*np.percentile(samples, [16, 50, 84], axis=0)))) theta = [x[0] for x in res] _fitter_to_model_params(model, theta) self._uncertainties['param_names'] = model.param_names self._uncertainties['param_fit'] = model.parameters errs = np.array([(0.0, 0.0) for _ in range(len(model.parameters))]) errs[fit_params_indices] = np.array([(res[i][1], res[i][2]) for i in range(len(res))]) self._uncertainties['param_err'] = errs self._uncertainties['param_units'] = [getattr(model, p).unit for p in model.param_names] return model