Source code for species.util.fit_util

"""
Utility functions for fit results.
"""

import os
import warnings

from configparser import ConfigParser

import h5py
import numpy as np

from beartype import beartype
from beartype.typing import Dict, List, Optional, Union
from spectres.spectral_resampling_numba import spectres_numba

from species.core.box import ObjectBox, ResidualsBox, SynphotBox, create_box
from species.phot.syn_phot import SyntheticPhotometry
from species.read.read_calibration import ReadCalibration
from species.read.read_filter import ReadFilter
from species.read.read_model import ReadModel
from species.read.read_planck import ReadPlanck
from species.read.read_radtrans import ReadRadtrans
from species.util.core_util import print_section
from species.util.model_util import binary_to_single, powerlaw_spectrum
from species.util.retrieval_util import convolve_spectrum


[docs] @beartype def multi_photometry( datatype: str, spectrum: str, filters: List[str], parameters: Dict[str, float], radtrans: Optional[ReadRadtrans] = None, verbose: bool = True, ) -> SynphotBox: """ Function for calculating synthetic photometry for a list of filters and a specified atmosphere model and related parameters. This function can for example be used for calculating the synthetic photometry from a best-fit model spectrum. It returns a :class:`~species.core.box_types.SynphotBox` that can be provided as input to :func:`~species.plot.plot_spectrum.plot_spectrum`. Parameters ---------- datatype : str Data type ('model' or 'calibration'). spectrum : str Spectrum name (e.g., 'drift-phoenix', 'bt-settl-cifist', planck', 'powerlaw', 'petitradtrans'). filters : list(str) List with the filter names. parameters : dict Dictionary with the model parameters. radtrans : ReadRadtrans, None Instance of :class:`~species.read.read_radtrans.ReadRadtrans`. Only required with ``spectrum='petitradtrans'`. Make sure that the ``wavel_range`` of the ``ReadRadtrans`` instance is sufficiently broad to cover all the ``filters``. The argument can be set to ``None`` for any other model than petitRADTRANS. verbose : bool Print output. Returns ------- species.core.box.SynphotBox Box with synthetic photometry. """ if verbose: print_section("Calculate multi-photometry") print(f"Data type: {datatype}") print(f"Spectrum name: {spectrum}") print("\nParameters:") for param_key, param_value in parameters.items(): if isinstance(param_value, float): if -0.1 < param_value < 0.1: print(f" - {param_key} = {param_value:.2e}") else: print(f" - {param_key} = {param_value:.2f}") mean_wavel = {} for filter_item in filters: read_filt = ReadFilter(filter_item) mean_wavel[filter_item] = read_filt.mean_wavelength() flux = {} if datatype == "model": if spectrum == "petitradtrans": # Calculate the petitRADTRANS spectrum only once radtrans_box = radtrans.get_model(parameters) for item in filters: if spectrum == "petitradtrans": # Use an instance of SyntheticPhotometry instead # of get_flux from ReadRadtrans in order to not # recalculate the spectrum syn_phot = SyntheticPhotometry(item) flux[item], _ = syn_phot.spectrum_to_flux( radtrans_box.wavelength, radtrans_box.flux ) elif spectrum == "powerlaw": synphot = SyntheticPhotometry(item) # Set the wavel_range attribute synphot.zero_point() powerl_box = powerlaw_spectrum(synphot.wavel_range, parameters) flux[item] = synphot.spectrum_to_flux( powerl_box.wavelength, powerl_box.flux )[0] else: if spectrum == "planck": readmodel = ReadPlanck(filter_name=item) else: readmodel = ReadModel(spectrum, filter_name=item) try: if ( spectrum != "planck" and "teff_0" in parameters and "teff_1" in parameters ): # Binary system param_0 = binary_to_single(parameters, 0) model_flux_0 = readmodel.get_flux(param_0)[0] param_1 = binary_to_single(parameters, 1) model_flux_1 = readmodel.get_flux(param_1)[0] # Weighted flux of two spectra for atmospheric asymmetries # Or simply the same in case of an actual binary system if "spec_weight" in parameters: flux[item] = ( parameters["spec_weight"] * model_flux_0 + (1.0 - parameters["spec_weight"]) * model_flux_1 ) else: flux[item] = model_flux_0 + model_flux_1 else: # Single object flux[item] = readmodel.get_flux(parameters)[0] except IndexError: flux[item] = np.nan warnings.warn( f"The wavelength range of the {item} filter does not " f"match with the wavelength range of {spectrum}. The " f"flux is set to NaN." ) elif datatype == "calibration": for item in filters: readcalib = ReadCalibration(spectrum, filter_name=item) flux[item] = readcalib.get_flux(parameters)[0] app_mag = {} abs_mag = {} for phot_key, phot_value in flux.items(): syn_phot = SyntheticPhotometry(phot_key) if "parallax" in parameters: app_mag[phot_key], abs_mag[phot_key] = syn_phot.flux_to_magnitude( flux=phot_value, error=None, parallax=(parameters["parallax"], None) ) elif "distance" in parameters: app_mag[phot_key], abs_mag[phot_key] = syn_phot.flux_to_magnitude( flux=phot_value, error=None, distance=(parameters["distance"], None) ) else: app_mag[phot_key], abs_mag[phot_key] = syn_phot.flux_to_magnitude( flux=phot_value, error=None ) if verbose: print("\nMagnitudes:") for phot_key, phot_value in app_mag.items(): if phot_value[1] is None: print(f" - {phot_key} = {phot_value[0]:.2f}") else: print(f" - {phot_key} = {phot_value[0]:.2f} +/- {phot_value[1]:.2f}") print("\nFluxes (W m-2 um-1):") for phot_key, phot_value in flux.items(): print(f" - {phot_key} = {phot_value:.2e}") return create_box( "synphot", name="synphot", flux=flux, wavelength=mean_wavel, app_mag=app_mag, abs_mag=abs_mag, )
[docs] @beartype def get_residuals( parameters: Dict[str, float], objectbox: ObjectBox, tag: Optional[str] = None, inc_phot: Union[bool, List[str]] = True, inc_spec: Union[bool, List[str]] = True, radtrans: Optional[ReadRadtrans] = None, model_name: Optional[str] = None, datatype: Optional[str] = None, spectrum: Optional[str] = None, ) -> ResidualsBox: """ Function for calculating the residuals from fitting model or calibration spectra to a set of spectra and/or photometry. This function also calculates the (reduced) :math:`\\chi^2`, taking into account the covariances in case these have been provided with a spectrum. Parameters ---------- parameters : dict Parameters and values for the spectrum objectbox : ObjectBox Box with the photometry and/or spectra of an object. A scaling and/or error inflation of the spectra should be applied with :func:`~species.util.box_util.update_objectbox` beforehand. tag : str, None Database tag with the sampling results. By setting the argument, the reduced :math:`\\chi^2` will be calculated and the ``model_name`` does not need to be set, since the model from the fit is automatically selected. Setting the argument to ``None`` requires setting the ``model_name``, which can be used for calculating the residuals for any set of model parameters, not associated with a fit. inc_phot : bool, list(str) Include photometric data in the fit. If a boolean, either all (``True``) or none (``False``) of the data are selected. If a list, a subset of filter names (as stored in the database) can be provided. inc_spec : bool, list(str) Include spectroscopic data in the fit. If a boolean, either all (``True``) or none (``False``) of the data are selected. If a list, a subset of spectrum names (as stored in the database with :func:`~species.data.database.Database.add_object`) can be provided. radtrans : ReadRadtrans, None Instance of :class:`~species.read.read_radtrans.ReadRadtrans`. Only required with ``spectrum='petitradtrans'``. Make sure that the ``wavel_range`` of the ``ReadRadtrans`` instance is sufficiently broad to cover all the photometric and spectroscopic data of ``inc_phot`` and ``inc_spec``. Not used if the argument is set to ``None``. model_name : str, None The atmospheric model that will be used for calculating the residuals. The argument is mandatory when ``tag=None``, and optional otherwise. If the argument of ``tag`` is not ``None`` then the model is automatically selected, but this can be overruled by setting the argument of ``model_name``. Returns ------- species.core.box.ResidualsBox Box with the residuals. """ # Check deprecated parameters if datatype is not None: warnings.warn( "The 'datatype' parameter is no longer " "used by the 'get_residuals()' function. " "Instead, the 'tag' parameter should be set, " "which points to the sampling results as " "stored in the database.", DeprecationWarning, ) if spectrum is not None: warnings.warn( "The 'spectrum' parameter is no longer " "used by the 'get_residuals()' function. " "Instead, the 'tag' parameter should be set, " "which points to the sampling results as " "stored in the database.", DeprecationWarning, ) # Read sampling results print_section("Calculate residuals") if "SPECIES_CONFIG" in os.environ: config_file = os.environ["SPECIES_CONFIG"] else: config_file = os.path.join(os.getcwd(), "species_config.ini") config = ConfigParser() config.read(config_file) database_path = config["species"]["database"] print(f"Database tag: {tag}") if tag is not None: with h5py.File(database_path, "r") as hdf5_file: if f"results/fit/{tag}/samples" in hdf5_file: # Samples from FitModel or AtmosphericRetrieval dset = hdf5_file[f"results/fit/{tag}/samples"] if "model_name" in dset.attrs: if dset.attrs["model_name"] == "petitradtrans": results_type = "AtmosphericRetrieval" else: results_type = "FitModel" else: results_type = "FitModel" print(f"Results type: {results_type}") elif f"results/comparison/{tag}/goodness_of_fit" in hdf5_file: # Samples from CompareSpectra results_type = "CompareSpectra" dset = hdf5_file[f"results/comparison/{tag}/goodness_of_fit"] print("Selected results: CompareSpectra") else: raise ValueError( f"The '{tag}' tag is not found in the database. " "Please specify modeling results from either " "FitModel, AtmosphericRetrieval, or CompareSpectra." ) if model_name is None: if "model_name" in dset.attrs: model_name = dset.attrs["model_name"] elif "spectrum" in dset.attrs: model_name = dset.attrs["spectrum"] elif "model" in dset.attrs: model_name = dset.attrs["model"] else: raise ValueError( "The attribute with the model name is " f"not found in the results of '{tag}'." ) else: warnings.warn( "The argument of 'model_name' is set, so the " "model name that is stored as attribute of " "the provided database 'tag' will be ignored." ) print(f"Model: {model_name}") if "binary" in dset.attrs: binary = dset.attrs["binary"] print(f"Binary: {binary}") n_param = dset.attrs["n_param"] if results_type in ["FitModel", "AtmosphericRetrieval"]: if "n_fixed" in dset.attrs: n_fixed = dset.attrs["n_fixed"] else: n_fixed = 0 warnings.warn( "The 'fixed_param' group is not found in " f"the results of {tag}. Probably the " "results were obtained with an older " "version of the package. The number " "of fixed parameters will be set to " "zero. The reduced chi^2 may not be " "accurate in case parameters were " "fixed when running FitModel." ) else: # TODO not yet implemented for CompareSpectra warnings.warn( "Not yet implemented for CompareSpectra. Please " "open an issue on the Github page if needed. The " "number of fixed parameters will be set to zero." ) n_fixed = 0 print("\nModel parameters:") for param_idx in range(n_param): param_item = dset.attrs[f"parameter{param_idx}"] print(f" - {param_item}") if n_fixed == 0: print("\nFixed parameters: none") else: dset = hdf5_file[f"results/fit/{tag}/fixed_param"] print("\nFixed parameters:") for param_item in hdf5_file[f"results/fit/{tag}/fixed_param"]: print(f" - {param_item}") print(f"\nInclude photometry: {inc_phot}") print(f"Include spectra: {inc_spec}") res_phot = None res_spec = None chi2_stat = 0 n_dof = 0 # Photometry residuals if inc_phot and objectbox.filters is not None: if isinstance(inc_phot, bool) and inc_phot: inc_phot = objectbox.filters model_phot = multi_photometry( datatype="model", spectrum=model_name, filters=inc_phot, parameters=parameters, radtrans=radtrans, verbose=False, ) res_phot = {} for phot_item in inc_phot: transmission = ReadFilter(phot_item) res_phot[phot_item] = np.zeros(objectbox.flux[phot_item].shape) if objectbox.flux[phot_item].ndim == 1: res_phot[phot_item][0] = transmission.mean_wavelength() res_phot[phot_item][1] = ( objectbox.flux[phot_item][0] - model_phot.flux[phot_item] ) / objectbox.flux[phot_item][1] chi2_stat += res_phot[phot_item][1] ** 2 n_dof += 1 elif objectbox.flux[phot_item].ndim == 2: for j in range(objectbox.flux[phot_item].shape[1]): res_phot[phot_item][0, j] = transmission.mean_wavelength() res_phot[phot_item][1, j] = ( objectbox.flux[phot_item][0, j] - model_phot.flux[phot_item] ) / objectbox.flux[phot_item][1, j] chi2_stat += res_phot[phot_item][1, j] ** 2 n_dof += 1 # Spectra residuals if inc_spec and objectbox.spectrum is not None: res_spec = {} if model_name == "petitradtrans": # Calculate the petitRADTRANS spectrum only once # Smoothing and resampling not with get_model model_box = radtrans.get_model(parameters) for spec_key in objectbox.spectrum: if isinstance(inc_spec, bool) or spec_key in inc_spec: wavel_range = ( 0.9 * objectbox.spectrum[spec_key][0][0, 0], 1.1 * objectbox.spectrum[spec_key][0][-1, 0], ) wl_new = objectbox.spectrum[spec_key][0][:, 0] spec_res = objectbox.spectrum[spec_key][3] if model_name == "planck": # Resampling to the new wavelength points # is done by the get_spectrum method readmodel = ReadPlanck(wavel_range=wavel_range) model_box = readmodel.get_spectrum( model_param=parameters, spec_res=1000.0, wavel_resample=wl_new, ) flux_new = model_box.flux elif model_name == "petitradtrans": # Smoothing to the instrument resolution flux_smooth = convolve_spectrum( model_box.wavelength, model_box.flux, spec_res ) # Resampling to the new wavelength points flux_new = spectres_numba( wl_new, model_box.wavelength, flux_smooth, spec_errs=None, fill=np.nan, verbose=True, ) else: # Resampling to the new wavelength points # is done by the get_model method readmodel = ReadModel(model_name, wavel_range=wavel_range) if "teff_0" in parameters and "teff_1" in parameters: # Binary system param_0 = binary_to_single(parameters, 0) model_box_0 = readmodel.get_model( param_0, spec_res=spec_res, wavel_resample=wl_new, ) param_1 = binary_to_single(parameters, 1) model_box_1 = readmodel.get_model( param_1, spec_res=spec_res, wavel_resample=wl_new, ) # Weighted flux of two spectra for atmospheric asymmetries # Or simply the same in case of an actual binary system if "spec_weight" in parameters: flux_comb = ( parameters["spec_weight"] * model_box_0.flux + (1.0 - parameters["spec_weight"]) * model_box_1.flux ) else: flux_comb = model_box_0.flux + model_box_1.flux model_box = create_box( boxtype="model", model=model_name, wavelength=wl_new, flux=flux_comb, parameters=parameters, quantity="flux", ) else: # Single object model_box = readmodel.get_model( parameters, spec_res=spec_res, wavel_resample=wl_new, ) flux_new = model_box.flux data_spec = objectbox.spectrum[spec_key][0] diff_spec = data_spec[:, 1] - flux_new res_tmp = diff_spec / data_spec[:, 2] res_spec[spec_key] = np.column_stack([wl_new, res_tmp]) count_nan = np.sum(np.isnan(res_spec[spec_key][:, 1])) if objectbox.spectrum[spec_key][2] is None: if ( f"corr_len_{spec_key}" in parameters and f"corr_amp_{spec_key}" in parameters ): # Covariance model (Wang et al. 2020) wavel = objectbox.spectrum[spec_key][0][:, 0] # (um) wavel_j, wavel_i = np.meshgrid(wavel, wavel) error = objectbox.spectrum[spec_key][0][:, 1] # (W m-2 um-1) error_j, error_i = np.meshgrid(error, error) corr_len = 10.0 ** parameters[f"corr_len_{spec_key}"] # (um) corr_amp = parameters[f"corr_amp_{spec_key}"] cov_matrix = ( corr_amp**2 * error_i * error_j * np.exp(-((wavel_i - wavel_j) ** 2) / (2.0 * corr_len**2)) + (1.0 - corr_amp**2) * np.eye(wavel.shape[0]) * error_i**2 ) chi2_stat += diff_spec @ np.linalg.inv(cov_matrix) @ diff_spec else: chi2_stat += np.nansum(res_spec[spec_key][:, 1] ** 2) else: # Use objectbox.spectrum[spec_key][2] which contains # the inverse of the covariance matrix of a spectrum if data_spec.shape[0] != objectbox.spectrum[spec_key][2].shape[0]: raise ValueError( f"The '{spec_key}' spectrum has {data_spec.shape[0]} " "wavelengths but the covariance matrix has been " f"specified for {objectbox.spectrum[spec_key][2].shape[0]} " "wavelengths. Could it be that the spectrum has been " f"resampled? Please make sure that '{spec_key}' spectrum has " "the same number of wavelengths as its covariance matrix." ) chi2_stat += diff_spec @ objectbox.spectrum[spec_key][2] @ diff_spec n_dof += res_spec[spec_key].shape[0] - count_nan print("\nResiduals (sigma):") if res_phot is not None: for phot_item in inc_phot: if res_phot[phot_item].ndim == 1: print(f" - {phot_item} = {res_phot[phot_item][1]:.2f}") elif res_phot[phot_item].ndim == 2: for j in range(res_phot[phot_item].shape[1]): print(f" - {phot_item} = {res_phot[phot_item][1, j]:.2f}") if res_spec is not None: for spec_key in objectbox.spectrum: if isinstance(inc_spec, bool) or spec_key in inc_spec: print( f" - {spec_key}: min = {np.nanmin(res_spec[spec_key]):.2f}, " f"max = {np.nanmax(res_spec[spec_key]):.2f}" ) if tag is not None: print(f"\nNumber of data points = {n_dof}") print(f"Number of model parameters = {n_param}") print(f"Number of fixed parameters = {n_fixed}") n_dof -= n_param - n_fixed print(f"Number of degrees of freedom = {n_dof}") chi2_red = chi2_stat / n_dof else: n_dof = None chi2_red = None print(f"\nchi2 = {chi2_stat:.2f}") if tag is not None: print(f"reduced chi2 = {chi2_red:.2f}") return create_box( boxtype="residuals", name=objectbox.name, photometry=res_phot, spectrum=res_spec, chi2=chi2_stat, n_dof=n_dof, chi2_red=chi2_red, )