Source code for species.fit.fit_spectrum

"""
Module with functionalities for photometric and spectroscopic
calibration. The fitting routine  be used to fit photometric
data with a calibration spectrum (e.g. extracted with
:func:`~species.read.read_model.ReadModel.get_model`) by
simply fitting a scaling parameter.
"""

import math

from typing import Dict, List, Optional, Tuple, Union

from multiprocessing import cpu_count, Pool

import emcee
import numpy as np

from typeguard import typechecked

from species.phot.syn_phot import SyntheticPhotometry
from species.read.read_calibration import ReadCalibration
from species.read.read_object import ReadObject


[docs] @typechecked def lnprob( param: np.ndarray, bounds: Dict[str, Tuple[float, float]], modelpar: List[str], objphot: List[np.ndarray], specphot: Union[ List[float], List[Tuple[SyntheticPhotometry, Tuple[np.float64, np.float64]]], ], ) -> float: """ Internal function for calculating the posterior probability. Parameters ---------- param : np.ndarray Value of the scaling parameter. bounds : dict Boundaries of the main scaling parameter. modelpar : list(str) Parameter names. objphot : list(tuple(float, float)) Photometry of the object. specphot : list(float), SyntheticPhotometry Synthetic photometry of the calibration spectrum for the same filters as the photometry of the object. Returns ------- float Log posterior probability. """ ln_prob = 0.0 for i, item in enumerate(modelpar): if bounds[item][0] <= param[i] <= bounds[item][1]: ln_prob += 0.0 else: ln_prob += -np.inf break if not math.isinf(ln_prob): for i, obj_item in enumerate(objphot): if obj_item.ndim == 1: ln_prob += ( -0.5 * (obj_item[0] - param[0] * specphot[i]) ** 2 / obj_item[1] ** 2 ) else: for j in range(obj_item.shape[1]): ln_prob += ( -0.5 * (obj_item[0, j] - param[0] * specphot[i]) ** 2 / obj_item[1, j] ** 2 ) return ln_prob
[docs] class FitSpectrum: """ Class for fitting a calibration spectrum to photometric data. """ @typechecked def __init__( self, object_name: str, filters: Optional[List[str]], spectrum: str, bounds: Dict[str, Tuple[float, float]], ) -> None: """ Parameters ---------- object_name : str Object name in the database. filters : list(str) Filter names for which the photometry is selected. All available photometry of the object is selected if the argument is set to ``None``. spectrum : str Calibration spectrum as labelled in the database. The calibration spectrum can be stored in the database with :func:`~species.data.database.Database.add_calibration`. bounds : dict Boundaries of the scaling parameter, as ``{'scaling':(min, max)}``. Returns ------- NoneType None """ self.object = ReadObject(object_name) self.spectrum = spectrum self.bounds = bounds self.objphot = [] self.specphot = [] if filters is None: from species.data.database import Database species_db = Database() objectbox = species_db.get_object( object_name, inc_phot=True, inc_spec=False ) filters = objectbox.filters for item in filters: readcalib = ReadCalibration(self.spectrum, item) calibspec = readcalib.get_spectrum() synphot = SyntheticPhotometry(item) spec_phot = synphot.spectrum_to_flux(calibspec.wavelength, calibspec.flux) self.specphot.append(spec_phot[0]) obj_phot = self.object.get_photometry(item) self.objphot.append(np.array([obj_phot[2], obj_phot[3]])) self.modelpar = ["scaling"]
[docs] @typechecked def run_mcmc( self, nwalkers: int, nsteps: int, guess: Union[Dict[str, float], Dict[str, None]], tag: str, ) -> None: """ Function to run the MCMC sampler. Parameters ---------- nwalkers : int Number of walkers. nsteps : int Number of steps per walker. guess : dict(str, float), dict(str, None) Guess of the scaling parameter. tag : str Database tag where the MCMC samples will be stored. Returns ------- NoneType None """ print("Running MCMC...") ndim = 1 initial = np.zeros((nwalkers, ndim)) for i, item in enumerate(self.modelpar): if guess[item] is not None: width = min( abs(guess[item] - self.bounds[item][0]), abs(guess[item] - self.bounds[item][1]), ) initial[:, i] = guess[item] + np.random.normal(0, 0.1 * width, nwalkers) else: initial[:, i] = np.random.uniform( low=self.bounds[item][0], high=self.bounds[item][1], size=nwalkers ) with Pool(processes=cpu_count()): ens_sampler = emcee.EnsembleSampler( nwalkers, ndim, lnprob, args=([self.bounds, self.modelpar, self.objphot, self.specphot]), ) ens_sampler.run_mcmc(initial, nsteps, progress=True) # Dictionary with attributes that will be stored attr_dict = { "spec_type": "calibration", "spec_name": self.spectrum, "mean_accept": np.mean(ens_sampler.acceptance_fraction), } # Add samples to the database from species.data.database import Database species_db = Database() species_db.add_samples( sampler="emcee", samples=ens_sampler.get_chain(), ln_prob=ens_sampler.get_log_prob(), tag=tag, modelpar=self.modelpar, attr_dict=attr_dict, )