Source code for species.analysis.fit_spectrum

"""
Module with functionalities for fitting a calibration spectrum.
"""

import math

from multiprocessing import Pool, cpu_count

import emcee
import numpy as np

from species.analysis import photometry
from species.data import database
from species.read import read_object, read_calibration


[docs]def lnprob(param, bounds, modelpar, objphot, specphot, bands): """ Internal function for the posterior probability. Parameters ---------- param : numpy.ndarray Values of the main scaling parameter and optionally additional band-dependent scaling parameters. bounds : dict Boundaries of the main scaling parameter. modelpar : list(str, ) Parameter names. objphot : list(tuple(float, float), ) Photometry of the object. specphot : list(float, ) Synthetic photometry of the calibration spectrum for the same filters as the photometry of the object. bands : bool Use band-dependent scaling parameters in addition to the main scaling parameter which is used for the full spectrum. Returns ------- float Log posterior probability. """ for i, item in enumerate(modelpar): if bounds[item][0] <= param[i] <= bounds[item][1]: ln_prior = 0. else: ln_prior = -np.inf break if math.isinf(ln_prior): ln_prob = -np.inf else: chisq = 0. for i, _ in enumerate(objphot): if bands: chisq += (objphot[i][0] - param[0]*param[i+1]*specphot[i])**2 / objphot[i][1]**2 else: chisq += (objphot[i][0] - param[0]*specphot[i])**2 / objphot[i][1]**2 ln_prob = ln_prior - 0.5*chisq return ln_prob
[docs]class FitSpectrum: """ Class for fitting a calibration spectrum to photometric data. """ def __init__(self, object_name, filters, spectrum, bounds): """ Parameters ---------- object_name : str Object name in the database. filters : tuple(str, ) Filter IDs for which the photometry is selected. All available photometry of the object is selected if set to None. spectrum : str Calibration spectrum. bounds : dict Boundaries of the scaling parameter, as {'scaling':(min, max)}. Returns ------- None """ self.object = read_object.ReadObject(object_name) self.spectrum = spectrum self.bounds = bounds self.objphot = [] self.specphot = [] if filters is None: species_db = database.Database() objectbox = species_db.get_object(object_name, filters=None, inc_phot=True, inc_spec=False) filters = objectbox.filters for item in filters: readcalib = read_calibration.ReadCalibration(self.spectrum, item) calibspec = readcalib.get_spectrum() synphot = photometry.SyntheticPhotometry(item) spec_phot = synphot.spectrum_to_flux(calibspec.wavelength, calibspec.flux) self.specphot.append(spec_phot[0]) obj_phot = self.object.get_photometry(item) self.objphot.append((obj_phot[2], obj_phot[3])) self.modelpar = ['scaling']
[docs] def run_mcmc(self, nwalkers, nsteps, guess, tag, bands=False): """ Function to run the MCMC sampler. Parameters ---------- nwalkers : int Number of walkers. nsteps : int Number of steps per walker. guess : dict Guess of the scaling parameter. tag : str Database tag where the MCMC samples are stored. bands : bool Use band-dependent scaling parameters in addition to the main scaling parameter which is used for the full spectrum. Returns ------- None """ print('Running MCMC...') if bands: ndim = 1 + len(self.objphot) else: ndim = 1 initial = np.zeros((nwalkers, ndim)) initial[:, 0] = guess['scaling'] + np.random.normal(0, 1e-1*guess['scaling'], nwalkers) if ndim > 1: for i in range(1, ndim): initial[:, i] = 1. + np.random.normal(0, 0.1, nwalkers) self.modelpar.append('scaling'+str(i)) self.bounds['scaling'+str(i)] = (0., 1e2) with Pool(processes=cpu_count()): ens_sampler = emcee.EnsembleSampler(nwalkers, ndim, lnprob, args=([self.bounds, self.modelpar, self.objphot, self.specphot, bands])) ens_sampler.run_mcmc(initial, nsteps, progress=True) species_db = database.Database() species_db.add_samples(sampler='emcee', samples=ens_sampler.chain, ln_prob=ens_sampler.lnprobability, mean_accept=np.mean(ens_sampler.acceptance_fraction), spectrum=('calibration', self.spectrum), tag=tag, modelpar=self.modelpar, distance=None, spec_labels=None)