Source code for species.analysis.fit_planck

"""
Module with functionalities for fitting a Planck spectrum.
"""

import math

from multiprocessing import Pool, cpu_count

import emcee
import spectres
import numpy as np

from species.analysis import photometry
from species.data import database
from species.read import read_object, read_planck


[docs]def lnprior(param, bounds): """ Internal function for the prior probability. Parameters ---------- param : numpy.ndarray Parameter values. bounds : dict Parameter boundaries for 'teff' and 'radius'. The values should be provided in a list such that multiple Planck functions can be combined, e.g. ``{'teff': [(1000., 2000.), (500., 1500.)], 'radius': [(0.5, 1.5), (1.5, 2.0)]}``. Returns ------- float Log prior probability. """ ln_prior = 0 for i, item in enumerate(bounds): if bounds[item][0] <= param[i] <= bounds[item][1]: ln_prior += 0. else: ln_prior = -np.inf break return ln_prior
[docs]def lnlike(param, bounds, objphot, synphot, distance, spectrum): """ Internal function for the likelihood function. Parameters ---------- param : numpy.ndarray Parameter values. bounds : dict Parameter boundaries for 'teff' and 'radius'. The values should be provided in a list such that multiple Planck functions can be combined, e.g. ``{'teff': [(1000., 2000.), (500., 1500.)], 'radius': [(0.5, 1.5), (1.5, 2.0)]}``. objphot : list(tuple(float, float), ) List with the photometric fluxes and uncertainties. synphot : list(species.analysis.photometry.SyntheticPhotometry, ) List with the :class:`~species.analysis.photometry.SyntheticPhotometry` objects for calculation of synthetic photometry from the model spectra. distance : float Distance (pc). spectrum : numpy.ndarray, None Spectrum array with the wavelength (um), flux (W m-2 um-1), and error (W m-2 um-1). Not used if set to None. Returns ------- float Log likelihood probability. """ paramdict = {} for i, item in enumerate(bounds.keys()): paramdict[item] = param[i] paramdict['distance'] = distance chisq = 0. if objphot is not None: for i, obj_item in enumerate(objphot): readplanck = read_planck.ReadPlanck(filter_name=synphot[i].filter_name) flux = readplanck.get_flux(paramdict, synphot=synphot[i])[0] chisq += (obj_item[0]-flux)**2 / obj_item[1]**2 if spectrum is not None: for i, item in enumerate(spectrum.keys()): readplanck = read_planck.ReadPlanck((0.9*spectrum[item][0][0, 0], 1.1*spectrum[item][0][-1, 0])) model = readplanck.get_spectrum(paramdict, 100.) flux_new = spectres.spectres(spectrum[item][0][:, 0], model.wavelength, model.flux, spec_errs=None) if spectrum[item][1] is not None: spec_diff = spectrum[item][0][:, 1] - flux_new dot_tmp = np.dot(np.transpose(spec_diff), np.linalg.inv(spectrum[item][1])) chisq += np.dot(dot_tmp, spec_diff) else: chisq += np.nansum((spectrum[item][0][:, 1] - flux_new)**2 / spectrum[item][0][:, 2]**2) return -0.5*chisq
[docs]def lnprob(param, bounds, objphot, synphot, distance, spectrum): """ Internal function for the posterior probability. Parameters ---------- param : numpy.ndarray Parameter values. bounds : dict Parameter boundaries for 'teff' and 'radius'. The values should be provided in a list such that multiple Planck functions can be combined, e.g. ``{'teff': [(1000., 2000.), (500., 1500.)], 'radius': [(0.5, 1.5), (1.5, 2.0)]}``. objphot : list(tuple(float, float), ) List with the photometric fluxes and uncertainties. synphot : list(species.analysis.photometry.SyntheticPhotometry, ) List with the :class:`~species.analysis.photometry.SyntheticPhotometry` objects for calculation of synthetic photometry from the model spectra. distance : float Distance (pc). spectrum : numpy.ndarray, None Spectrum array with the wavelength (um), flux (W m-2 um-1), and error (W m-2 um-1). Not used if set to None. Returns ------- float Log posterior probability. """ ln_prior = lnprior(param, bounds) if math.isinf(ln_prior): ln_prob = -np.inf else: ln_prob = ln_prior + lnlike(param, bounds, objphot, synphot, distance, spectrum) if np.isnan(ln_prob): ln_prob = -np.inf return ln_prob
[docs]class FitPlanck: """ Class for fitting Planck spectra to photometric and/or spectroscopic data. """ def __init__(self, object_name, filters, bounds, inc_phot=True, inc_spec=True): """ Parameters ---------- object_name : str Object name in the database. filters : tuple(str, ) Filter names for which the photometry is selected. All available photometric data of the object are used if set to None. bounds : dict Parameter boundaries for 'teff' and 'radius'. The values should be provided either as float or as list of floats such that multiple Planck functions can be combined, e.g. ``{'teff': [(1000., 2000.), (500., 1500.)], 'radius': [(0.5, 1.5), (1.5, 2.0)]}``. inc_phot : bool Include photometric data with the fit. inc_spec : bool Include spectroscopic data with the fit. Returns ------- NoneType None """ if not inc_phot and not inc_spec: raise ValueError('No photometric or spectroscopic data has been selected.') if 'teff' not in bounds or 'radius' not in bounds: raise ValueError('The \'bounds\' dictionary should contain \'teff\' and \'radius\'.') self.model = 'planck' self.object = read_object.ReadObject(object_name) self.distance = self.object.get_distance() if isinstance(bounds['teff'], list) and isinstance(bounds['radius'], list): self.modelpar = [] self.bounds = {} for i, item in enumerate(bounds['teff']): self.modelpar.append(f'teff_{i}') self.modelpar.append(f'radius_{i}') self.bounds[f'teff_{i}'] = bounds['teff'][i] self.bounds[f'radius_{i}'] = bounds['radius'][i] else: self.modelpar = ['teff', 'radius'] self.bounds = bounds if inc_phot: self.synphot = [] self.objphot = [] if not filters: species_db = database.Database() objectbox = species_db.get_object(object_name, None) filters = objectbox.filters for item in filters: sphot = photometry.SyntheticPhotometry(item) self.synphot.append(sphot) obj_phot = self.object.get_photometry(item) self.objphot.append((obj_phot[2], obj_phot[3])) else: self.synphot = None self.objphot = None if inc_spec: self.spectrum = self.object.get_spectrum() else: self.spectrum = None
[docs] def run_mcmc(self, nwalkers, nsteps, guess, tag): """ Function to run the MCMC sampler. Parameters ---------- nwalkers : int Number of walkers. nsteps : int Number of steps per walker. guess : dict, None Guess for the 'teff' and 'radius'. Random values between the boundary values are used if a value is set to None. The values should be provided either as float or in a list of floats such that multiple Planck functions can be combined, e.g. ``{'teff': [1500., 1000.], 'radius': [1., 2.]``. tag : str Database tag where the MCMC samples are stored. Returns ------- NoneType None """ print('Running MCMC...') ndim = len(self.bounds) if ndim == 2: sigma = {'teff': 5., 'radius': 0.01} else: sigma = {} for i, item in enumerate(guess['teff']): sigma[f'teff_{i}'] = 5. guess[f'teff_{i}'] = guess['teff'][i] for i, item in enumerate(guess['radius']): sigma[f'radius_{i}'] = 0.01 guess[f'radius_{i}'] = guess['radius'][i] del guess['teff'] del guess['radius'] initial = np.zeros((nwalkers, ndim)) for i, item in enumerate(self.modelpar): if guess[item] is not None: initial[:, i] = guess[item] + np.random.normal(0, sigma[item], nwalkers) else: initial[:, i] = np.random.uniform(low=self.bounds[item][0], high=self.bounds[item][1], size=nwalkers) with Pool(processes=cpu_count()): ens_sampler = emcee.EnsembleSampler(nwalkers, ndim, lnprob, args=([self.bounds, self.objphot, self.synphot, self.distance[0], self.spectrum])) ens_sampler.run_mcmc(initial, nsteps, progress=True) species_db = database.Database() species_db.add_samples(sampler='emcee', samples=ens_sampler.chain, ln_prob=ens_sampler.lnprobability, mean_accept=np.mean(ens_sampler.acceptance_fraction), spectrum=('model', self.model), tag=tag, modelpar=self.modelpar, distance=self.distance[0], spec_labels=None)