Source code for species.data.database

"""
Module with functionalities for reading and writing of data.
"""

import json
import os
import warnings

from configparser import ConfigParser
from numbers import Real
from pathlib import Path

import astropy.units as u
import h5py
import numpy as np
import pooch

from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.units.quantity import Quantity
from beartype import beartype
from beartype.typing import Any, Dict, List, Optional, Tuple, Union
from scipy.integrate import simpson
from tqdm.auto import tqdm

from species.core import constants
from species.core.box import ObjectBox, ModelBox, SamplesBox, SpectrumBox, create_box
from species.util.core_util import print_section


[docs] class Database: """ Class with reading and writing functionalities for the HDF5 database. """ @beartype def __init__(self) -> None: """ Returns ------- NoneType None """ if "SPECIES_CONFIG" in os.environ: config_file = os.environ["SPECIES_CONFIG"] else: config_file = os.path.join(os.getcwd(), "species_config.ini") config = ConfigParser() config.read(config_file) self.database = config["species"]["database"] self.data_folder = config["species"]["data_folder"]
[docs] @beartype def list_content(self) -> None: """ Function for listing the content of the HDF5 database. The database structure will be descended while printing the paths of all the groups and datasets, as well as the dataset attributes. Returns ------- NoneType None """ print_section("List database content") @beartype def _descend( h5_object: Union[ h5py._hl.files.File, h5py._hl.group.Group, h5py._hl.dataset.Dataset ], seperator: str = "", ) -> None: """ Internal function for descending into an HDF5 dataset and printing its content. Parameters ---------- h5_object : h5py._hl.files.File, h5py._hl.group.Group, h5py._hl.dataset.Dataset The ``h5py`` object. separator : str Separator that is used between items. Returns ------- NoneType None """ if isinstance(h5_object, (h5py._hl.files.File, h5py._hl.group.Group)): for group_key in h5_object.keys(): print( seperator + "- " + group_key + ": " + str(h5_object[group_key]) ) _descend(h5_object[group_key], seperator=seperator + "\t") for attr_key in h5_object[group_key].attrs.keys(): print( seperator + "\t" + "- " + attr_key + " = " + str(h5_object[group_key].attrs[attr_key]) ) elif isinstance(h5_object, h5py._hl.dataset.Dataset): for attr_key in h5_object.attrs.keys(): print( seperator + "- " + attr_key + ": " + str(h5_object.attrs[attr_key]) ) with h5py.File(self.database, "r") as hdf_file: _descend(hdf_file)
[docs] @beartype def list_companions(self, verbose: bool = False) -> List[str]: """ Function for printing an overview of the companion data that are stored in the database. It will return a list with all the companion names. Each name can be used as input for :class:`~species.read.read_object.ReadObject`. Parameters ---------- verbose : bool Print details on the companion data or list only the names of the companions for which data are available. Returns ------- list(str) List with the object names that are stored in the database. """ data_file = ( Path(__file__).parent.resolve() / "companion_data/companion_data.json" ) with open(data_file, "r", encoding="utf-8") as json_file: comp_data = json.load(json_file) spec_file = ( Path(__file__).parent.resolve() / "companion_data/companion_spectra.json" ) with open(spec_file, "r", encoding="utf-8") as json_file: comp_spec = json.load(json_file) print_section("Companions with available data") comp_names = [] for planet_name, planet_dict in comp_data.items(): comp_names.append(planet_name) if verbose: print(f"Object name = {planet_name}") if "parallax" in planet_dict: parallax = planet_dict["parallax"] print(f"Parallax (pc) = {parallax[0]} +/- {parallax[1]}") if "distance" in planet_dict: distance = planet_dict["distance"] print(f"Distance (pc) = {distance[0]} +/- {distance[1]}") app_mag = planet_dict["app_mag"] for mag_key, mag_value in app_mag.items(): if isinstance(mag_value[0], list) or isinstance( mag_value[0], tuple ): for item in mag_value: print(f"{mag_key} (mag) = {item[0]} +/- {item[1]}") else: print(f"{mag_key} (mag) = {mag_value[0]} +/- {mag_value[1]}") print("References:") for ref_item in planet_dict["references"]: print(f" - {ref_item}") if planet_name in comp_spec: for key, value in comp_spec[planet_name].items(): print(f"{key} spectrum from {value[3]}") print() else: print(planet_name) return comp_names
[docs] @beartype def available_models(self, verbose: bool = True) -> Dict: """ Function for printing an overview of the available model grids that can be downloaded and added to the database with :class:`~species.data.database.Database.add_model`. Parameters ---------- verbose : bool Print details on the grids of model spectra or list only the names of the available model spectra. Returns ------- dict Dictionary with the details on the model grids. The dictionary is created from the ``model_data.json`` file in the ``species.data`` folder. """ print_section("Available model spectra") data_file = Path(__file__).parent.resolve() / "model_data/model_data.json" with open(data_file, "r", encoding="utf-8") as json_file: model_data = json.load(json_file) for model_name, model_dict in model_data.items(): if verbose: print(f" - {model_dict['name']}:") print(f" - Label = {model_name}") if "parameters" in model_dict: print(f" - Model parameters: {model_dict['parameters']}") if "teff range" in model_dict: print(f" - Teff range (K): {model_dict['teff range']}") if "wavelength range" in model_dict: print( f" - Wavelength range (um): {model_dict['wavelength range']}" ) if "lambda/d_lambda" in model_dict: print( f" - Sampling (lambda/d_lambda): {model_dict['lambda/d_lambda']}" ) if "information" in model_dict: print(f" - Extra details: {model_dict['information']}") if "file size" in model_dict: print(f" - File size: {model_dict['file size']}") if "reference" in model_dict: print(f" - Reference: {model_dict['reference']}") if "url" in model_dict: print(f" - URL: {model_dict['url']}") print() else: print(f"{model_dict['name']} -> label: {model_name}") return model_data
[docs] @beartype def delete_data(self, data_set: str) -> None: """ Function for deleting a dataset from the HDF5 database. Parameters ---------- data_set : str Group or dataset path in the HDF5 database. The content and structure of the database can be shown with :func:`~species.data.database.Database.list_content`. That could help to determine which argument should be provided as argument of ``data_set``. For example, ``data_set="models/drift-phoenix"`` will remove the model spectra of DRIFT-PHOENIX. Returns ------- NoneType None """ with h5py.File(self.database, "a") as hdf5_file: if data_set in hdf5_file: print(f"Deleting data: {data_set}...", end="", flush=True) del hdf5_file[data_set] print(" [DONE]") else: warnings.warn( f"The dataset {data_set} is not found in {self.database}." )
[docs] @beartype def add_companion( self, name: Optional[Union[Optional[str], Optional[List[str]]]] = None, verbose: bool = True, ) -> None: """ Function for adding the magnitudes and spectra of directly imaged planets and brown dwarfs from `data/companion_data/companion_data.json` and :func:`~species.data.companion_data.companion_spectra` to the database. Parameters ---------- name : str, list(str), None Name or list with names of the directly imaged planets and brown dwarfs (e.g. ``'HR 8799 b'`` or ``['HR 8799 b', '51 Eri b', 'PZ Tel B']``). All the available companion data are added if the argument is set to ``None``. verbose : bool Print details on the companion data that are added to the database. Returns ------- NoneType None """ from species.data.companion_data.companion_spectra import companion_spectra if isinstance(name, str): name = [ name, ] data_file = ( Path(__file__).parent.resolve() / "companion_data/companion_data.json" ) with open(data_file, "r", encoding="utf-8") as json_file: comp_data = json.load(json_file) if name is None: name = list(comp_data.keys()) if not verbose: print(f"Add companion: {name}") for item in name: spec_dict = companion_spectra(Path(self.data_folder), item, verbose=verbose) parallax = None # try: # # Query SIMBAD to get the parallax # simbad = Simbad() # simbad.add_votable_fields("parallax") # simbad_result = simbad.query_object(comp_data[item]["simbad"]) # # if simbad_result is not None: # par_sim = ( # simbad_result["PLX_VALUE"][0], # (mas) # simbad_result["PLX_ERROR"][0], # ) # (mas) # # if not np.ma.is_masked(par_sim[0]) and not np.ma.is_masked( # par_sim[1] # ): # parallax = (float(par_sim[0]), float(par_sim[1])) # # except urllib.error.URLError: # parallax = tuple(comp_data[item]["parallax"]) if parallax is None: parallax = tuple(comp_data[item]["parallax"]) if "app_mag" in comp_data[item]: app_mag = comp_data[item]["app_mag"] for key, value in app_mag.items(): if isinstance(value[0], list): mag_list = [] for mag_item in value: mag_list.append(tuple(mag_item)) app_mag[key] = mag_list else: app_mag[key] = tuple(value) else: app_mag = None self.add_object( object_name=item, parallax=parallax, app_mag=app_mag, spectrum=spec_dict, verbose=verbose, )
[docs] @beartype def add_dust(self) -> None: """ Function for adding optical constants of MgSiO3 and Fe, and MgSiO3 cross sections for a log-normal and power-law size distribution to the database. The optical constants have been compiled by Mollière et al. (2019) for petitRADTRANS from the following sources: - MgSiO3, crystalline - Scott & Duley (1996), ApJS, 105, 401 - Jäger et al. (1998), A&A, 339, 904 - MgSiO3, amorphous - Jäger et al. (2003), A&A, 408, 193 - Fe, crystalline - Henning & Stognienko (1996), A&A, 311, 291 - Fe, amorphous - Pollack et al. (1994), ApJ, 421, 615 Returns ------- NoneType None """ from species.data.misc_data.dust_data import ( add_cross_sections, add_optical_constants, ) with h5py.File(self.database, "a") as hdf5_file: if "dust" in hdf5_file: del hdf5_file["dust"] add_optical_constants(self.data_folder, hdf5_file) add_cross_sections(self.data_folder, hdf5_file)
[docs] @beartype def add_accretion(self) -> None: """ Function for adding the coefficients for converting line luminosities of hydrogen emission lines into accretion luminosities (see `Aoyama et al. (2021) <https://ui. adsabs.harvard.edu/abs/ 2021ApJ...917L..30A/abstract>`_ and `Marleau & Aoyama (2022) <https://ui.adsabs.harvard. edu/abs/2022RNAAS...6..262M/abstract>`_ for details). The relation is used by :class:`~species.fit.emission_line.EmissionLine` for converting the fitted line luminosity. Returns ------- NoneType None """ from species.data.misc_data.accretion_data import add_accretion_relation with h5py.File(self.database, "a") as hdf5_file: if "accretion" in hdf5_file: del hdf5_file["accretion"] add_accretion_relation(self.data_folder, hdf5_file)
[docs] @beartype def add_filter( self, filter_name: str, filename: Optional[str] = None, detector_type: str = "photon", verbose: bool = True, ) -> None: """ Function for adding a filter profile to the database, either from the SVO Filter profile Service or from an input file. Additional filters that are automatically added are Magellan/VisAO.rp, Magellan/VisAO.ip, Magellan/VisAO.zp, Magellan/VisAO.Ys, ALMA/band6, and ALMA/band7. Parameters ---------- filter_name : str Filter name from the SVO Filter Profile Service (e.g., 'Paranal/NACO.Lp') or a user-defined name if a ``filename`` is specified. filename : str, None Filename of the filter profile. The first column should contain the wavelength (um) and the second column the fractional transmission. The profile is downloaded from the SVO Filter Profile Service if the argument of ``filename`` is set to ``None``. detector_type : str The detector type ('photon' or 'energy'). The argument is only used if a ``filename`` is provided. Otherwise, for filters that are fetched from the SVO website, the detector type is read from the SVO data. The detector type determines if a wavelength factor is included in the integral for the synthetic photometry. verbose : bool Print details on the companion data that are added to the database. Returns ------- NoneType None """ if verbose: print(f"Adding filter: {filter_name}...", end="", flush=True) # filter_split = filter_name.split("/") if filename is not None: filter_profile = np.loadtxt(filename) wavelength = filter_profile[:, 0] transmission = filter_profile[:, 1] with h5py.File(self.database, "a") as hdf5_file: if f"filters/{filter_name}" in hdf5_file: del hdf5_file[f"filters/{filter_name}"] dset = hdf5_file.create_dataset( f"filters/{filter_name}", data=np.column_stack((wavelength, transmission)), dtype="f8", ) dset.attrs["det_type"] = str(detector_type) else: from species.data.filter_data.filter_data import add_filter_profile with h5py.File(self.database, "a") as hdf5_file: if f"filters/{filter_name}" in hdf5_file: del hdf5_file[f"filters/{filter_name}"] # if f"filters/{filter_split[0]}" not in hdf5_file: # hdf5_file.create_group(f"filters/{filter_split[0]}") add_filter_profile(self.data_folder, hdf5_file, filter_name) if verbose: print(" [DONE]")
[docs] @beartype def add_isochrones( self, model: Optional[str] = None, filename: Optional[str] = None, tag: Optional[str] = None, ) -> None: """ Function for adding isochrone data to the database. Parameters ---------- model : str, None Evolutionary model ('ames', 'atmo', 'atmo-chabrier2023', 'baraffe2015', 'bt-settl', linder2019', 'marleau2014', 'nextgen', 'parsec', 'saumon2008', 'sonora-bobcat', 'sonora-diamondback', 'spiegel2012'). Isochrones will be automatically downloaded. Alternatively, the ``filename`` parameter can be used in combination with ``tag``. filename : str, None Filename with the isochrone data. The argument of ``model`` will be ignored by setting the argument of ``filename``. When using ``filename``, also the argument of ``tag`` should be set. Only files with isochrone data from https://phoenix.ens-lyon.fr/Grids/ and https://perso.ens-lyon.fr/isabelle.baraffe/ are supported. The parameter is ignored by setting the argument to ``None``. tag : str, None Database tag name where the isochrone that will be stored. Setting the argument is only required in combination with the ``filename`` parameter. Otherwise, the argument can be set to ``None``. Returns ------- NoneType None """ from species.data.isochrone_data.add_isochrone import add_isochrone_grid print_section("Add isochrone grid") print(f"Evolutionary model: {model}") print(f"File name: {filename}") print(f"Database tag: {tag}") avail_models = [ "ames", "atmo", "atmo-chabrier2023", "baraffe2015", "bt-settl", "linder2019", "marleau2014", "nextgen", "parsec", "saumon2008", "sonora-bobcat", "sonora-diamondback", "spiegel2012", ] if filename is None and model not in avail_models: raise ValueError( f"The selected 'model={model}' is not supported. Please " f"select one of the following models: {avail_models}" ) with h5py.File(self.database, "a") as hdf5_file: if model == "ames": if "isochrones/ames-cond" in hdf5_file: del hdf5_file["isochrones/ames-cond"] if "isochrones/ames-dusty" in hdf5_file: del hdf5_file["isochrones/ames-dusty"] elif model == "atmo": if "isochrones/atmo-ceq" in hdf5_file: del hdf5_file["isochrones/atmo-ceq"] if "isochrones/atmo-neq-weak" in hdf5_file: del hdf5_file["isochrones/atmo-neq-weak"] if "isochrones/atmo-neq-strong" in hdf5_file: del hdf5_file["isochrones/atmo-neq-strong"] elif model == "baraffe2015": if "isochrones/baraffe2015" in hdf5_file: del hdf5_file["isochrones/baraffe2015"] elif model == "bt-settl": if "isochrones/bt-settl" in hdf5_file: del hdf5_file["isochrones/bt-settl"] elif model == "atmo-chabrier2023": if "isochrones/atmo-ceq-chabrier2023" in hdf5_file: del hdf5_file["isochrones/atmo-ceq-chabrier2023"] if "isochrones/atmo-neq-weak-chabrier2023" in hdf5_file: del hdf5_file["isochrones/atmo-neq-weak-chabrier2023"] if "isochrones/atmo-neq-strong-chabrier2023" in hdf5_file: del hdf5_file["isochrones/atmo-neq-strong-chabrier2023"] elif model == "linder2019": if "isochrones" in hdf5_file: for iso_item in list(hdf5_file["isochrones"]): if iso_item[:10] == "linder2019": del hdf5_file[f"isochrones/{iso_item}"] elif model == "marleau2014": if "isochrones/marleau2014" in hdf5_file: del hdf5_file["isochrones/marleau2014"] elif model == "nextgen": if "isochrones/nextgen" in hdf5_file: del hdf5_file["isochrones/nextgen"] elif model == "parsec": if "isochrones/parsec" in hdf5_file: del hdf5_file["isochrones/parsec"] elif model == "saumon2008": if "isochrones/saumon2008-nc_solar" in hdf5_file: del hdf5_file["isochrones/saumon2008-nc_solar"] if "isochrones/saumon2008-nc_-0.3" in hdf5_file: del hdf5_file["isochrones/saumon2008-nc_-0.3"] if "isochrones/saumon2008-nc_+0.3" in hdf5_file: del hdf5_file["isochrones/saumon2008-nc_+0.3"] if "isochrones/saumon2008-f2_solar" in hdf5_file: del hdf5_file["isochrones/saumon2008-f2_solar"] if "isochrones/saumon2008-hybrid_solar" in hdf5_file: del hdf5_file["isochrones/saumon2008-hybrid_solar"] elif model == "sonora-bobcat": if "isochrones/sonora-bobcat+0.0" in hdf5_file: del hdf5_file["isochrones/sonora-bobcat+0.0"] if "isochrones/sonora-bobcat+0.5" in hdf5_file: del hdf5_file["isochrones/sonora-bobcat+0.5"] if "isochrones/sonora-bobcat-0.5" in hdf5_file: del hdf5_file["isochrones/sonora-bobcat-0.5"] elif model == "sonora-diamondback": iso_tags = [ "nc-0.5", "nc+0.0", "nc+0.5", "hybrid-0.5", "hybrid+0.0", "hybrid+0.5", "hybrid-grav-0.5", "hybrid-grav+0.0", "hybrid-grav+0.5", ] for iso_item in iso_tags: if f"isochrones/sonora-diamondback-{iso_item}" in hdf5_file: del hdf5_file[f"isochrones/sonora-diamondback-{iso_item}"] elif model == "spiegel2012": if "isochrones/spiegel2012-hybrid+0.0" in hdf5_file: del hdf5_file["isochrones/spiegel2012-hybrid+0.0"] if "isochrones/spiegel2012-hybrid+0.5" in hdf5_file: del hdf5_file["isochrones/spiegel2012-hybrid+0.5"] if "isochrones/spiegel2012-cloudfree+0.0" in hdf5_file: del hdf5_file["isochrones/spiegel2012-cloudfree+0.0"] if "isochrones/spiegel2012-cloudfree+0.5" in hdf5_file: del hdf5_file["isochrones/spiegel2012-cloudfree+0.5"] else: if f"isochrones/{tag}" in hdf5_file: del hdf5_file[f"isochrones/{tag}"] add_isochrone_grid( self.data_folder, hdf5_file, model, filename=filename, tag=tag )
[docs] @beartype def add_model( self, model: str, wavel_range: Optional[Tuple[Real, Real]] = None, wavel_sampling: Optional[Real] = None, teff_range: Optional[Tuple[Real, Real]] = None, unpack_tar: bool = True, fit_from: Optional[Real] = None, extend_from: Optional[Real] = None, ) -> None: """ Function for adding a grid of model spectra to the database. All spectra have been resampled to logarithmically-spaced wavelengths (see :func:`~species.data.database.Database.available_models`), typically at the order of several thousand. It should be noted that the original spectra were typically calculated with a constant step size in wavenumber, so the original wavelength sampling decreased from short to long wavelengths. When fitting medium/high- resolution spectra, it is best to carefully check the result to determine if the sampling of the input grid was sufficient for modeling the spectra at the considered wavelength regime. See also :func:`~species.data.database.Database.add_custom_model` for adding a custom grid to the database. Most grids provide spectra up to the near- or mid-infrared regime. To extend the spectra towards longer wavelengths, it is possible to use the ``fit_from`` and ``extend_from`` parameters to extend the spectra with a Rayleigh-Jeans slope. Parameters ---------- model : str Model name ('ames-cond', 'ames-dusty', 'atmo-ceq', 'atmo-neq-weak', 'atmo-neq-strong', 'bt-settl', 'bt-settl-cifist', 'bt-nextgen', 'drift-phoenix', 'petitcode-cool-clear', 'petitcode-cool-cloudy', 'petitcode-hot-clear', 'petitcode-hot-cloudy', 'exo-rem', 'blackbody', bt-cond', 'bt-cond-feh, 'morley-2012', 'sonora-cholla', 'sonora-bobcat', 'sonora-bobcat-co', 'koester-wd', 'saumon2008-clear', 'saumon2008-cloudy', 'petrus2023', 'sphinx'). wavel_range : tuple(float, float), None Wavelength range (:math:`\\mu\\text{m}`) that will be stored in the database. The full wavelength range is used if the argument is set to ``None``. wavel_sampling : float, None Wavelength spacing :math:`\\lambda/\\Delta\\lambda` to which the spectra will be resampled. Typically this parameter is not needed so the argument can be set to ``None``. The only benefit of using this parameter is limiting the storage in the HDF5 database. The parameter should be used in combination with setting the ``wavel_range``. teff_range : tuple(float, float), None Range of effective temperatures (K) of which the spectra are extracted from the TAR file and added to the HDF5 database. The full grid of spectra will be added if the argument is set to ``None``. unpack_tar : bool Unpack the TAR file with the model spectra in the ``data_folder``. By default, the argument is set to ``True`` such the TAR file with the model spectra will be unpacked after downloading. In case the TAR file had already been unpacked previously, the argument can be set to ``False`` such that the unpacking will be skipped. This can save some time with unpacking large TAR files. fit_from : float, None Extend the spectra with a Rayleigh-Jeans slope. To do so, the red end of the spectra will be fitted by setting ``fit_from`` to the minimum wavelength (in um) beyond which fluxes will be included in the least-squares fit. The spectra are not extended when setting the argument to ``None``. extend_from : float, None This parameter can be used in combination with ``fit_from``. The argument of ``extend_from`` is the minimum wavelength (in um) from which the spectra will be extended with the Rayleigh-Jeans slope. The spectra will be extended from the last available wavelength when setting the argument to ``None``. Typically, the value of ``fit_from`` will be smaller than the value of ``extend_from``. Returns ------- NoneType None """ from species.data.model_data.model_spectra import add_model_grid with h5py.File(self.database, "a") as hdf5_file: add_model_grid( model_tag=model, input_path=self.data_folder, database=hdf5_file, wavel_range=wavel_range, teff_range=teff_range, wavel_sampling=wavel_sampling, unpack_tar=unpack_tar, fit_from=fit_from, extend_from=extend_from, )
[docs] @beartype def add_custom_model( self, model: str, data_path: Union[str, Path], parameters: List[str], wavel_range: Optional[Tuple[Real, Real]] = None, wavel_sampling: Optional[Real] = None, teff_range: Optional[Tuple[Real, Real]] = None, ) -> None: """ Function for adding a custom grid of model spectra to the database. The spectra are read from the ``data_path`` and should contain the ``model_name`` and ``parameters`` in the filenames in the following format example: `model-name_teff_1000_logg_4.0_feh_0.0_spec.dat` or `model-name_teff_1000_logg_4.0_feh_0.0_spec.npy`. The list with ``parameters`` should contain the same parameters as are included in the filenames. Each datafile should contain two columns with the wavelengths in :math:`\\mu\\text{m}` and the fluxes in :math:`\\text{W} \\text{m}^{-2} \\mu\\text{m}^{-1}`. Each file should contain the same number and values of wavelengths. The wavelengths should be logarithmically spaced, so at a constant :math:`\\lambda/\\Delta\\lambda`. If not, then the ``wavel_range`` and ``wavel_sampling`` parameters should be used such that the wavelengths are resampled. Parameters ---------- model : str Name of the model grid. Should be identical to the model name that is used in the filenames. data_path : str, Path Path where the files with the model spectra are located. Either a relative or absolute path. Either a string or a ``Path`` object from ``pathlib``. The model spectra can be stored as NumPy binary files (.npy) or plain text files. parameters : list(str) List with the model parameters. The following parameters are supported: ``teff`` (for :math:`T_\\mathrm{eff}`), ``logg`` (for :math:`\\log\\,g`), ``feh`` (for [Fe/H]), ``co`` (for C/O), ``fsed`` (for :math:`f_\\mathrm{sed}`), ``logkzz`` (for :math:`\\log\\,K_\\mathrm{zz}`), ``adindex`` (for :math:`\\gamma_\\mathrm{ad}`), and ``log_co_iso`` (for :math:`\\log\\,^{12}\\mathrm{CO}/^{13}\\mathrm{CO}`). Please contact the code maintainer if support for other parameters should be added. wavel_range : tuple(float, float), None Wavelength range (:math:`\\mu\\text{m}`) that will be stored in the database. The full wavelength range is used if the argument is set to ``None``. wavel_sampling : float, None Wavelength spacing :math:`\\lambda/\\Delta\\lambda` to which the spectra will be resampled. Typically this parameter is not needed so the argument can be set to ``None``. The only benefit of using this parameter is limiting the storage in the HDF5 database. The parameter should be used in combination with setting the ``wavel_range``. teff_range : tuple(float, float), None Effective temperature range (K) for adding a subset of the model grid. The full parameter grid will be added if the argument is set to ``None``. Returns ------- NoneType None """ from species.data.model_data.custom_model import add_custom_model_grid with h5py.File(self.database, "a") as hdf5_file: add_custom_model_grid( model, data_path, parameters, hdf5_file, wavel_range, teff_range, wavel_sampling, )
[docs] @beartype def add_object( self, object_name: str, parallax: Optional[Tuple[Real, Real]] = None, app_mag: Optional[ Dict[str, Union[Tuple[Real, Real], List[Tuple[Real, Real]]]] ] = None, flux_density: Optional[Dict[str, Tuple[Real, Real]]] = None, spectrum: Optional[ Dict[ str, Tuple[ Union[str, np.ndarray], Optional[Union[str, np.ndarray]], Optional[Real], ], ] ] = None, deredden: Optional[Union[Dict[str, Real], Real]] = None, verbose: bool = True, units: Optional[Dict[str, Union[str, Tuple[str, str]]]] = None, ) -> None: """ Function for adding the photometry and/or spectra of an object to the database. Parameters ---------- object_name : str Object name that will be used as label in the database. parallax : tuple(float, float), None Parallax and uncertainty (mas). Not stored if the argument is set to ``None``. app_mag : dict, None Dictionary with the filter names, apparent magnitudes, and uncertainties. For example, ``{'Paranal/NACO.Lp': (15., 0.2), 'Paranal/NACO.Mp': (13., 0.3)}``. For the use of duplicate filter names, the magnitudes have to be provided in a list, for example ``{'Paranal/NACO.Lp': [(15., 0.2), (14.5, 0.5)], 'Paranal/NACO.Mp': (13., 0.3)}``. No photometry is stored if the argument is set to ``None``. flux_density : dict, None Dictionary with filter names, flux densities (W m-2 um-1), and uncertainties (W m-1 um-1), or setting the ``units`` parameter when other flux units are used. For example, ``{'Paranal/NACO.Lp': (1e-15, 1e-16)}``. Currently, the use of duplicate filters is not implemented. The use of ``app_mag`` is preferred over ``flux_density`` because with ``flux_density`` only fluxes are stored while with ``app_mag`` both magnitudes and fluxes. However, ``flux_density`` can be used in case the magnitudes and/or filter profiles are not available. In that case, the fluxes can still be selected with ``inc_phot`` in :class:`~species.fit.fit_model.FitModel`. The ``flux_density`` can also be used for an upper limit on a photometric flux, by setting the flux to zero and the uncertainty to the 1:math:`\\sigma` upper limit. The argument of ``flux_density`` is ignored if set to ``None``. spectrum : dict, None Dictionary with the spectrum, optional covariance matrix, and resolving power of the instrument. The input data can either be a FITS or ASCII file, or as NumPy array directly. The spectra should have 3 columns, so the array shape should be (n_wavelengths, 3), with wavelength (um), flux (W m-2 um-1), and uncertainty (W m-2 um-1). The ``units`` parameter can be set for reading in data with different wavelength and/or flux units. The covariance matrix should be 2D with the same number of wavelength points as the spectrum, so the shape is (n_wavelengths, n_wavelengths). Like the spectrum, the covariance matrix can be provided as FITS or ASCII file, or as NumPy array directly. For example, when providing the input as files, ``{'SPHERE': ('spectrum.dat', 'covariance.fits', 50.)}``. No covariance data is stored if set to ``None``, for example, ``{'SPHERE': ('spectrum.dat', None, 50.)}``. The ``spectrum`` parameter is ignored if set to ``None``. For VLTI/GRAVITY data, the same FITS file can be provided as spectrum and covariance matrix. deredden : dict, float, None Dictionary with ``spectrum`` and ``app_mag`` names that will be dereddened with the provided :math:`A_V`. For example, ``deredden={'SPHERE': 1.5, 'Keck/NIRC2.J': 1.5}`` will deredden the provided spectrum named 'SPHERE' and the Keck/NIRC2 J-band photometry with a visual extinction of 1.5. For photometric fluxes, the filter-averaged extinction is used for the dereddening. verbose : bool Print details on the object data that are added to the database. units : dict, None Dictionary with the units of the data provided with ``flux_density`` and ``spectrum``. Only required if the wavelength units are not :math:`\\mu\\text{m}` and/or the flux units are not provided as :math:`\\text{W} \\text{m}^{-2} \\mu\\text{m}^{-1}`. Otherwise, the argument of ``units`` can be set to ``None`` such that it will be ignored. The dictionary keys should be the filter names as provided with ``flux_density`` and the spectrum names as provided with ``spectrum``. Supported units can be found in the docstring of :func:`~species.util.data_util.convert_units`. Returns ------- NoneType None """ from species.read.read_filter import ReadFilter if verbose: print_section("Add object") print(f"Object name: {object_name}") print(f"Units: {units}") print(f"Deredden: {deredden}") # Set default units if units is None: units = {} # First add filters here because ReadFilter # will also open the HDF5 database if app_mag is not None: for mag_item in app_mag: read_filt = ReadFilter(mag_item) if flux_density is not None: for flux_item in flux_density: read_filt = ReadFilter(flux_item) if deredden is None: deredden = {} if app_mag is not None: from species.data.spec_data.spec_vega import add_vega with h5py.File(self.database, "a") as hdf5_file: if "spectra/calibration/vega" not in hdf5_file: add_vega(self.data_folder, hdf5_file) for mag_item in app_mag: if f"filters/{mag_item}" not in hdf5_file: self.add_filter(mag_item, verbose=verbose) if flux_density is not None: from species.data.spec_data.spec_vega import add_vega with h5py.File(self.database, "a") as hdf5_file: if "spectra/calibration/vega" not in hdf5_file: add_vega(self.data_folder, hdf5_file) for flux_item in flux_density: if f"filters/{flux_item}" not in hdf5_file: self.add_filter(flux_item, verbose=verbose) hdf5_file = h5py.File(self.database, "a") if "objects" not in hdf5_file: hdf5_file.create_group("objects") if f"objects/{object_name}" not in hdf5_file: hdf5_file.create_group(f"objects/{object_name}") if parallax is not None: if verbose: print(f"Parallax (mas) = {parallax[0]:.2f} +/- {parallax[1]:.2f}") if f"objects/{object_name}/parallax" in hdf5_file: del hdf5_file[f"objects/{object_name}/parallax"] hdf5_file.create_dataset( f"objects/{object_name}/parallax", data=parallax, dtype="f8" ) # (mas) flux = {} error = {} dered_phot = {} mean_wavel = {} if app_mag is not None: if verbose: print("\nMagnitudes:") for mag_item in app_mag: read_filt = ReadFilter(mag_item) mean_wavel[mag_item] = read_filt.mean_wavelength() if isinstance(deredden, float) or mag_item in deredden: from species.util.dust_util import ism_extinction filter_profile = read_filt.get_filter() if isinstance(deredden, float): ext_mag = ism_extinction(deredden, 3.1, filter_profile[:, 0]) else: ext_mag = ism_extinction( deredden[mag_item], 3.1, filter_profile[:, 0] ) from species.phot.syn_phot import SyntheticPhotometry synphot = SyntheticPhotometry(mag_item) dered_phot[mag_item], _ = synphot.spectrum_to_flux( filter_profile[:, 0], 10.0 ** (0.4 * ext_mag) ) else: dered_phot[mag_item] = 1.0 if isinstance(app_mag[mag_item], tuple): try: from species.phot.syn_phot import SyntheticPhotometry synphot = SyntheticPhotometry(mag_item) flux[mag_item], error[mag_item] = synphot.magnitude_to_flux( app_mag[mag_item][0], app_mag[mag_item][1] ) flux[mag_item] *= dered_phot[mag_item] except KeyError: warnings.warn( f"Filter '{mag_item}' is not available on the SVO Filter " f"Profile Service so a flux calibration cannot be done. " f"Please add the filter manually with the 'add_filter' " f"function. For now, only the '{mag_item}' magnitude of " f"'{object_name}' is stored." ) # Write NaNs if the filter is not available flux[mag_item], error[mag_item] = np.nan, np.nan elif isinstance(app_mag[mag_item], list): flux_list = [] error_list = [] for i, dupl_item in enumerate(app_mag[mag_item]): try: from species.phot.syn_phot import SyntheticPhotometry synphot = SyntheticPhotometry(mag_item) flux_dupl, error_dupl = synphot.magnitude_to_flux( dupl_item[0], dupl_item[1] ) flux_dupl *= dered_phot[mag_item] except KeyError: warnings.warn( f"Filter '{mag_item}' is not available on the SVO " f"Filter Profile Service so a flux calibration cannot " f"be done. Please add the filter manually with the " f"'add_filter' function. For now, only the " f"'{mag_item}' magnitude of '{object_name}' is " f"stored." ) # Write NaNs if the filter is not available flux_dupl, error_dupl = np.nan, np.nan flux_list.append(flux_dupl) error_list.append(error_dupl) flux[mag_item] = flux_list error[mag_item] = error_list else: raise ValueError( "The values in the dictionary with magnitudes " "should be tuples or a list with tuples (in " "case duplicate filter names are required)." ) for mag_item in app_mag: if f"objects/{object_name}/{mag_item}" in hdf5_file: del hdf5_file[f"objects/{object_name}/{mag_item}"] if isinstance(app_mag[mag_item], tuple): n_phot = 1 app_mag[mag_item] = ( app_mag[mag_item][0] - 2.5 * np.log10(dered_phot[mag_item]), app_mag[mag_item][1], ) if verbose: print(f" - {mag_item}:") print( f" - Mean wavelength (um) = {mean_wavel[mag_item]:.4e}" ) print( f" - Apparent magnitude = {app_mag[mag_item][0]:.2f} +/- " f"{app_mag[mag_item][1]:.2f}" ) print( f" - Flux (W m-2 um-1) = {flux[mag_item]:.2e} +/- " f"{error[mag_item]:.2e}" ) if isinstance(deredden, float): print(f" - Dereddening A_V: {deredden}") elif mag_item in deredden: print(f" - Dereddening A_V: {deredden[mag_item]}") data = np.asarray( [ app_mag[mag_item][0], app_mag[mag_item][1], flux[mag_item], error[mag_item], ] ) elif isinstance(app_mag[mag_item], list): n_phot = len(app_mag[mag_item]) if verbose: print(f" - {mag_item} ({n_phot} values):") mag_list = [] mag_err_list = [] for i, dupl_item in enumerate(app_mag[mag_item]): dered_mag = app_mag[mag_item][i][0] - 2.5 * np.log10( dered_phot[mag_item] ) app_mag_item = (dered_mag, app_mag[mag_item][i][1]) if verbose: print( f" - Mean wavelength (um) = {mean_wavel[mag_item]:.4e}" ) print( f" - Apparent magnitude = {app_mag_item[0]:.2f} +/- " f"{app_mag_item[1]:.2f}" ) print( f" - Flux (W m-2 um-1) = {flux[mag_item][i]:.2e} +/- " f"{error[mag_item][i]:.2e}" ) if isinstance(deredden, float): print(f" - Dereddening A_V: {deredden}") elif mag_item in deredden: print(f" - Dereddening A_V: {deredden[mag_item]}") mag_list.append(app_mag_item[0]) mag_err_list.append(app_mag_item[1]) data = np.asarray( [mag_list, mag_err_list, flux[mag_item], error[mag_item]] ) # (mag), (mag), (W m-2 um-1), (W m-2 um-1) dset = hdf5_file.create_dataset( f"objects/{object_name}/{mag_item}", data=data, dtype="f8" ) dset.attrs["n_phot"] = n_phot if flux_density is not None: if verbose: print("\nFlux densities:") for flux_item in flux_density: read_filt = ReadFilter(flux_item) mean_wavel[flux_item] = read_filt.mean_wavelength() if isinstance(deredden, float) or flux_item in deredden: warnings.warn( "The deredden parameter is not supported " "by flux_density. Please use app_mag instead " "and/or open an issue on Github. Ignoring " f"the dereddening of {flux_item}." ) if f"objects/{object_name}/{flux_item}" in hdf5_file: del hdf5_file[f"objects/{object_name}/{flux_item}"] if isinstance(flux_density[flux_item], tuple): data = np.asarray( [ np.nan, np.nan, flux_density[flux_item][0], flux_density[flux_item][1], ] ) if flux_item in units: from species.util.data_util import convert_units flux_in = np.array([[mean_wavel[flux_item], data[2], data[3]]]) flux_out = convert_units( flux_in, ("um", units[flux_item]), convert_from=True ) data = [np.nan, np.nan, flux_out[0, 1], flux_out[0, 2]] if verbose: print(f" - {flux_item}:") print( f" - Mean wavelength (um) = {mean_wavel[flux_item]:.4e}" ) print( f" - Flux (W m-2 um-1) = {data[2]:.2e} +/- {data[3]:.2e}" ) # None, None, (W m-2 um-1), (W m-2 um-1) dset = hdf5_file.create_dataset( f"objects/{object_name}/{flux_item}", data=data, dtype="f8" ) dset.attrs["n_phot"] = 1 if spectrum is not None: if verbose: print("\nSpectra:") read_spec = {} read_cov = {} # Read spectra spec_nan = {} for spec_item, spec_value in spectrum.items(): if "/" in spec_item: raise ValueError( "Spectrum names can't include a slash as " f"character. Please adjust '{spec_item}'." ) if f"objects/{object_name}/spectrum/{spec_item}" in hdf5_file: del hdf5_file[f"objects/{object_name}/spectrum/{spec_item}"] if isinstance(spec_value[0], str): if spec_value[0].endswith(".fits") or spec_value[0].endswith( ".fit" ): with fits.open(spec_value[0]) as hdulist: if ( "INSTRU" in hdulist[0].header and hdulist[0].header["INSTRU"] == "GRAVITY" ): # Read data from a FITS file with the GRAVITY format if "OBJECT" in hdulist[0].header: gravity_object = hdulist[0].header["OBJECT"] else: gravity_object = None if verbose: print(" - GRAVITY spectrum:") print(f" - Object: {gravity_object}") # Wavelength (um) wavelength = hdulist[1].data["WAVELENGTH"].byteswap() wavelength = wavelength.view( wavelength.dtype.newbyteorder("=") ) # Flux (W m-2 um-1) flux = hdulist[1].data["FLUX"].byteswap() flux = flux.view(flux.dtype.newbyteorder("=")) # Covariance (W m-2 um-1)^2 covariance = hdulist[1].data["COVARIANCE"].byteswap() covariance = covariance.view( covariance.dtype.newbyteorder("=") ) # Uncorrelated uncertainties (W m-2 um-1) error = np.sqrt(np.diag(covariance)) read_spec[spec_item] = np.column_stack( [wavelength, flux, error] ) else: # Otherwise try to read a 2D dataset with 3 columns if verbose: print(" - Spectrum:") for i, hdu_item in enumerate(hdulist): data = hdu_item.data.byteswap() data = data.view(data.dtype.newbyteorder("=")) if ( data.ndim == 2 and 3 in data.shape and spec_item not in read_spec ): if spec_item in units: from species.util.data_util import ( convert_units, ) data = convert_units( data, units[spec_item], convert_from=True, ) read_spec[spec_item] = data if spec_item not in read_spec: raise ValueError( f"The spectrum data from {spec_value[0]} cannot " f"be read. The data format should be 2D with " f"3 columns." ) else: try: data = np.loadtxt(spec_value[0]) except UnicodeDecodeError: raise ValueError( f"The spectrum data from {spec_value[0]} cannot " "be read. Please provide a FITS or ASCII file." ) if data.ndim != 2 or 3 not in data.shape: raise ValueError( f"The spectrum data from {spec_value[0]} " "cannot be read. The data format " "should be 2D with 3 columns." ) if verbose: print(" - Spectrum:") if spec_item in units: from species.util.data_util import convert_units data = convert_units( data, units[spec_item], convert_from=True ) read_spec[spec_item] = data else: read_spec[spec_item] = spec_value[0] if isinstance(deredden, float): from species.util.dust_util import ism_extinction ext_mag = ism_extinction(deredden, 3.1, read_spec[spec_item][:, 0]) read_spec[spec_item][:, 1] *= 10.0 ** (0.4 * ext_mag) elif spec_item in deredden: from species.util.dust_util import ism_extinction ext_mag = ism_extinction( deredden[spec_item], 3.1, read_spec[spec_item][:, 0] ) read_spec[spec_item][:, 1] *= 10.0 ** (0.4 * ext_mag) if ( read_spec[spec_item].shape[0] == 3 and read_spec[spec_item].shape[1] != 3 ): warnings.warn( f"Transposing the data of {spec_item} because " f"the first instead of the second axis " f"has a length of 3." ) read_spec[spec_item] = read_spec[spec_item].transpose() # Remove wavelengths with NaN flux and/or error nan_idx_flux = np.isnan(read_spec[spec_item][:, 1]) nan_idx_error = np.isnan(read_spec[spec_item][:, 2]) nan_idx = nan_idx_flux + nan_idx_error # Add NaN booleans to dictionary for adjusting # the covariance matrix later on spec_nan[spec_item] = nan_idx if np.sum(nan_idx) != 0: read_spec[spec_item] = read_spec[spec_item][~nan_idx, :] warnings.warn( f"Found {np.sum(nan_idx)} fluxes with NaN in " f"the data of {spec_item}. Removing the spectral " f"fluxes that contain a NaN." ) wavelength = read_spec[spec_item][:, 0] flux = read_spec[spec_item][:, 1] error = read_spec[spec_item][:, 2] if verbose: print(f" - Database tag: {spec_item}") if isinstance(spec_value[0], str): print(f" - Filename: {spec_value[0]}") print(f" - Data shape: {read_spec[spec_item].shape}") print( f" - Wavelength range (um): {wavelength[0]:.2f} - {wavelength[-1]:.2f}" ) print(f" - Mean flux (W m-2 um-1): {np.nanmean(flux):.2e}") print(f" - Mean error (W m-2 um-1): {np.nanmean(error):.2e}") if isinstance(deredden, float): print(f" - Dereddening A_V: {deredden}") elif spec_item in deredden: print(f" - Dereddening A_V: {deredden[spec_item]}") # Read covariance matrix for spec_item, spec_value in spectrum.items(): if spec_value[1] is None: read_cov[spec_item] = None elif isinstance(spec_value[1], str): if spec_value[1].endswith(".fits") or spec_value[1].endswith( ".fit" ): with fits.open(spec_value[1]) as hdulist: if ( "INSTRU" in hdulist[0].header and hdulist[0].header["INSTRU"] == "GRAVITY" ): # Read data from a FITS file with the GRAVITY format if "OBJECT" in hdulist[0].header: gravity_object = hdulist[0].header["OBJECT"] else: gravity_object = None if verbose: print(" - GRAVITY covariance matrix:") print(f" - Object: {gravity_object}") # (W m-2 um-1)^2 read_cov[spec_item] = ( hdulist[1].data["COVARIANCE"].byteswap() ) read_cov[spec_item] = read_cov[spec_item].view( read_cov[spec_item].dtype.newbyteorder("=") ) else: if spec_item in units: warnings.warn( "The unit conversion has not been " "implemented for covariance matrices. " "Please open an issue on the Github " "page if such functionality is required " "or provide the file with covariances " "in (W m-2 um-1)^2." ) # Otherwise try to read a square, 2D dataset if verbose: print(" - Covariance matrix:") for i, hdu_item in enumerate(hdulist): data = hdu_item.data.byteswap() data = data.view(data.dtype.newbyteorder("=")) corr_warn = ( f"The matrix from {spec_value[1]} contains " f"ones along the diagonal. Converting this " f"correlation matrix into a covariance matrix." ) from species.util.data_util import ( correlation_to_covariance, ) if ( data.ndim == 2 and data.shape[0] == data.shape[1] ): if spec_item not in read_cov: if ( data.shape[0] == read_spec[spec_item].shape[0] ): if np.all(np.diag(data) == 1.0): warnings.warn(corr_warn) read_cov[spec_item] = ( correlation_to_covariance( data, read_spec[spec_item][:, 2], ) ) else: read_cov[spec_item] = data if spec_item not in read_cov: raise ValueError( f"The covariance matrix from {spec_value[1]} cannot " f"be read. The data format should be 2D with the " f"same number of wavelength points as the " f"spectrum." ) else: try: data = np.loadtxt(spec_value[1]) except UnicodeDecodeError: raise ValueError( f"The covariance matrix from {spec_value[1]} " f"cannot be read. Please provide a " f"FITS or ASCII file." ) if data.ndim != 2 or data.shape[0] != data.shape[1]: raise ValueError( f"The covariance matrix from {spec_value[1]} " f"cannot be read. The data format " f"should be 2D with the same number of " f"wavelength points as the spectrum." ) if verbose: print(" - Covariance matrix:") if np.all(np.diag(data) == 1.0): warnings.warn( f"The matrix from {spec_value[1]} contains " f"ones on the diagonal. Converting this " f" correlation matrix into a covariance " f"matrix." ) from species.util.data_util import correlation_to_covariance read_cov[spec_item] = correlation_to_covariance( data, read_spec[spec_item][:, 2] ) else: read_cov[spec_item] = data else: read_cov[spec_item] = spec_value[1] if read_cov[spec_item] is not None: # Remove the wavelengths for which the flux is NaN read_cov[spec_item] = read_cov[spec_item][~spec_nan[spec_item], :] read_cov[spec_item] = read_cov[spec_item][:, ~spec_nan[spec_item]] if verbose and read_cov[spec_item] is not None: print(f" - Database tag: {spec_item}") if isinstance(spec_value[1], str): print(f" - Filename: {spec_value[1]}") print(f" - Data shape: {read_cov[spec_item].shape}") if verbose: print(" - Instrument resolution:") for spec_item, spec_value in spectrum.items(): hdf5_file.create_dataset( f"objects/{object_name}/spectrum/{spec_item}/spectrum", data=read_spec[spec_item], dtype="f8", ) if read_cov[spec_item] is not None: hdf5_file.create_dataset( f"objects/{object_name}/spectrum/{spec_item}/covariance", data=read_cov[spec_item], dtype="f8", ) hdf5_file.create_dataset( f"objects/{object_name}/spectrum/{spec_item}/inv_covariance", data=np.linalg.inv(read_cov[spec_item]), dtype="f8", ) dset = hdf5_file[f"objects/{object_name}/spectrum/{spec_item}"] if spec_value[2] is None: if verbose: print(f" - {spec_item}: None") dset.attrs["specres"] = np.nan else: if verbose: print(f" - {spec_item}: {spec_value[2]:.1f}") dset.attrs["specres"] = float(spec_value[2]) hdf5_file.close()
[docs] @beartype def add_simple_object( self, object_name: str, simple_database: Optional[str] = None, ) -> None: """ Function for retrieving data of an individual object from the `SIMPLE database <https://simple-bd-archive.org>`_ and adding the data to the ``species`` database. This function includes code that has been adapted from `SEDkitSIMPLE <https://github.com/dr-rodriguez/SEDkitSIMPLE>`_. Parameters ---------- object_name : str Object name that will be used for the query and also used as label by which the data will be stored in the database. simple_database : str, None Path to the ``SIMPLE`` database file `SIMPLE.db`. The binary file is automatically downloaded from the `Github page <https://github.com/SIMPLE-AstroDB/SIMPLE-binary>`_ of ``SIMPLE`` when the argument is set to ``None``. Returns ------- NoneType None """ print_section("Add SIMPLE object") from astrodbkit2.astrodb import Database as astro_db from sqlalchemy import and_ if simple_database is None: simple_database = os.path.join(self.data_folder, "SIMPLE.db") url = ( "https://github.com/SIMPLE-AstroDB/" "SIMPLE-binary/raw/main/SIMPLE.db" ) if not os.path.isfile(simple_database): pooch.retrieve( url=url, known_hash=None, fname="SIMPLE.db", path=self.data_folder, progressbar=True, ) print(f"SIMPLE database: {simple_database}") simple_db = astro_db(f"sqlite:///{simple_database}") def _fetch_bibcode(ref: str) -> Optional[str]: """ Internal function for fetching a reference from the SIMPLE database Parameters ---------- ref : str Reference for which the `bibcode` should be retrieved. Returns ------- NoneType Bibcode of the reference. Returns a ``None`` in case the bibcode is not available. """ bibcode = None if hasattr(simple_db.Publications.c, "bibcode"): if hasattr(simple_db.Publications.c, "reference"): simple_query = simple_db.query(simple_db.Publications.c.bibcode) simple_result = simple_query.filter( simple_db.Publications.c.reference == ref ) bibcode = simple_result.table()["bibcode"][0] return bibcode # Get SIMPLE name of object print(f"\nObject name: {object_name}") sources = simple_db.search_object(object_name) if len(sources) > 0: if len(sources) > 1: warnings.warn( "Multiple sources found in the SIMPLE " f"database for {object_name}. The first " "retrieved source will be selected." ) simple_id = sources["source"][0] else: raise ValueError( f"The requested source, {object_name}, is " "not found in the SIMPLE database." ) print(f"Retrieved name: {simple_id}") # Get inventory of the queried object inventory = simple_db.inventory(simple_id, pretty_print=False) # Get coordinates coordinates = None if "Sources" in inventory: if len(inventory["Sources"]) == 1: data = inventory["Sources"][0] coordinates = SkyCoord(ra=data["ra"] * u.deg, dec=data["dec"] * u.deg) if coordinates is None: print("Coordinates: None") else: coord_str = coordinates.to_string( "hmsdms", alwayssign=True, precision=2, pad=True ) print(f"Coordinates: {coord_str}") # Get parallax parallax = None if "Parallaxes" in inventory: for row in inventory["Parallaxes"]: value = Quantity(row["parallax"], unit=u.mas) error = Quantity(row["parallax_error"], unit=u.mas) bibcode = _fetch_bibcode(row["reference"]) parallax = (value.value, error.value) if row["adopted"]: break if parallax is None: print("Parallax: None") else: print( f"Parallax: {(parallax)[0]:.2f} +/- " f"{(parallax)[1]:.2f} mas (bibcode: {bibcode})" ) # Get spectral type spectral_type = None if "SpectralTypes" in inventory: for row in inventory["SpectralTypes"]: bibcode = _fetch_bibcode(row["reference"]) spectral_type = row["spectral_type_string"] if row["adopted"]: break if spectral_type is None: print("Spectral type: None") else: print(f"Spectral type: {spectral_type} (bibcode: {bibcode})") # Get photometry app_mag = None if "Photometry" in inventory: app_mag = {} bibcode = {} for row in inventory["Photometry"]: filter_name = None if row["band"].split(".")[0] == "GAIA3": filter_name = "GAIA/" + row["band"] elif row["band"].split(".")[0] == "2MASS": filter_name = "2MASS/" + row["band"] elif row["band"].split(".")[0] == "WISE": filter_name = "WISE/" + row["band"] elif row["band"].split(".")[0] == "WISE": filter_name = "WISE/" + row["band"] if filter_name is not None and row["magnitude_error"] is not None: bibcode[filter_name] = _fetch_bibcode(row["reference"]) app_mag[filter_name] = (row["magnitude"], row["magnitude_error"]) if app_mag is None: print("Magnitudes: None") else: print("\nMagnitudes:") for key, value in app_mag.items(): if value[1] > 1e-2: print( f" - {key} = {value[0]:.2f} +/- {value[1]:.2f} (bibcode: {bibcode[key]})" ) elif value[1] < 1e-2 and value[1] > 1e-3: print( f" - {key} = {value[0]:.3f} +/- {value[1]:.3f} (bibcode: {bibcode[key]})" ) elif value[1] < 1e-3 and value[1] > 1e-4: print( f" - {key} = {value[0]:.4f} +/- {value[1]:.4f} (bibcode: {bibcode[key]})" ) else: print( f" - {key} = {value[0]:.5f} +/- {value[1]:.5f} (bibcode: {bibcode[key]})" ) # Get spectra spectrum = None if "Spectra" in inventory: spectrum = {} bibcode = {} for row in inventory["Spectra"]: spec_tag = f"{row['telescope']} {row['instrument']}" if row["mode"] != "Missing": spec_tag += f" {row['mode']}" simple_query = simple_db.query(simple_db.Spectra) spec_table = simple_query.filter( and_( simple_db.Spectra.c.source == simple_id, simple_db.Spectra.c.regime == row["regime"], simple_db.Spectra.c.reference == row["reference"], simple_db.Spectra.c.observation_date == row["observation_date"], ) ).astropy(spectra="spectrum", spectra_format=None) wavel = spec_table["spectrum"][0].wavelength flux = spec_table["spectrum"][0].flux wavel = wavel.to(u.micron).value flux = flux.to(u.W / u.m**2 / u.um).value if spec_table["spectrum"][0].uncertainty is not None: error = spec_table["spectrum"][0].uncertainty.quantity error = error.to(u.W / u.m**2 / u.um).value else: error = np.full(wavel.size, np.nan) spec_data = np.column_stack([wavel, flux, error]) spec_res = input( "Please provide the resolving power, " f"R = lambda/Delta_lambda, for '{spec_tag}': " ) bibcode[spec_tag] = _fetch_bibcode(row["reference"]) spectrum[spec_tag] = ( spec_data, np.diag(spec_data[:, 2] ** 2), float(spec_res), ) if spectrum is None: if app_mag is not None: print() print("Spectra: None") else: print("\nSpectra:") for key, value in spectrum.items(): print(f" - {key}") print(f" - Array shape: {value[0].shape}") print(f" - lambda/d_lambda: {value[2]}") print(f" - bibcode: {bibcode[key]}") self.add_object( object_name=object_name, parallax=parallax, app_mag=app_mag, spectrum=spectrum, verbose=False, )
[docs] @beartype def add_photometry(self, phot_library: str) -> None: """ Function for adding a photometry library to the database. Parameters ---------- phot_library : str Photometric library ('vlm-plx', 'leggett', or 'beiler2024'). Returns ------- NoneType None """ print_section("Add photometric library") print(f"Database tag: {phot_library}") with h5py.File(self.database, "a") as hdf5_file: if f"photometry/{phot_library}" in hdf5_file: del hdf5_file[f"photometry/{phot_library}"] if phot_library[0:7] == "vlm-plx": from species.data.phot_data.phot_vlm_plx import add_vlm_plx print("Library: Database of Ultracool Parallaxes") add_vlm_plx(self.data_folder, hdf5_file) elif phot_library[0:7] == "leggett": from species.data.phot_data.phot_leggett import add_leggett add_leggett(self.data_folder, hdf5_file) elif phot_library[0:10] == "beiler2024": from species.data.phot_data.phot_jwst_ydwarfs import add_jwst_ydwarfs add_jwst_ydwarfs(self.data_folder, hdf5_file)
[docs] @beartype def add_calibration( self, tag: str, filename: Optional[str] = None, data: Optional[np.ndarray] = None, units: Optional[Dict[str, str]] = None, scaling: Optional[Tuple[Real, Real]] = None, ) -> None: """ Function for adding a calibration spectrum to the database. Parameters ---------- tag : str Tag name in the database. filename : str, None Name of the file that contains the calibration spectrum. The file could be either a plain text file, in which the first column contains the wavelength (um), the second column the flux density (W m-2 um-1), and the third column the uncertainty (W m-2 um-1). Or, a FITS file can be provided in which the data is stored as a 2D array in the primary HDU. The ``data`` argument is not used if set to ``None``. data : np.ndarray, None Spectrum stored as 3D array with shape ``(n_wavelength, 3)``. The first column should contain the wavelength (um), the second column the flux density (W m-2 um-1), and the third column the error (W m-2 um-1). The ``filename`` argument is used if set to ``None``. units : dict, None Dictionary with the wavelength and flux units, e.g. ``{'wavelength': 'angstrom', 'flux': 'w m-2'}``. The default units (um and W m-2 um-1) are used if set to ``None``. scaling : tuple(float, float), None Scaling for the wavelength and flux as ``(scaling_wavelength, scaling_flux)``. Not used if set to ``None``. Returns ------- NoneType None """ if filename is None and data is None: raise ValueError( "Either the 'filename' or 'data' argument should be provided." ) if scaling is None: scaling = (1.0, 1.0) hdf5_file = h5py.File(self.database, "a") if "spectra/calibration" not in hdf5_file: hdf5_file.create_group("spectra/calibration") if "spectra/calibration/" + tag in hdf5_file: del hdf5_file["spectra/calibration/" + tag] if filename is not None: if filename[-5:] == ".fits": data = fits.getdata(filename).byteswap() data = data.view(data.dtype.newbyteorder("=")) if data.ndim != 2: raise RuntimeError( "The FITS file that is provided " "as argument of 'filename' does " "not contain a 2D dataset." ) if data.shape[1] != 3 and data.shape[0]: warnings.warn( f"Transposing the data that is read " f"from {filename} because the shape " f"is {data.shape} instead of " f"{data.shape[1], data.shape[0]}." ) data = np.transpose(data) else: data = np.loadtxt(filename) nan_idx = np.isnan(data[:, 1]) if np.sum(nan_idx) != 0: data = data[~nan_idx, :] warnings.warn( f"Found {np.sum(nan_idx)} fluxes with NaN in " f"the data of {filename}. Removing the " f"spectral fluxes that contain a NaN." ) if units is None: wavelength = scaling[0] * data[:, 0] # (um) flux = scaling[1] * data[:, 1] # (W m-2 um-1) else: if units["wavelength"] == "um": wavelength = scaling[0] * data[:, 0] # (um) elif units["wavelength"] == "angstrom": wavelength = scaling[0] * data[:, 0] * 1e-4 # (um) if units["flux"] == "w m-2 um-1": flux = scaling[1] * data[:, 1] # (W m-2 um-1) elif units["flux"] == "w m-2": if units["wavelength"] == "um": flux = scaling[1] * data[:, 1] / wavelength # (W m-2 um-1) if data.shape[1] == 3: if units is None: error = scaling[1] * data[:, 2] # (W m-2 um-1) else: if units["flux"] == "w m-2 um-1": error = scaling[1] * data[:, 2] # (W m-2 um-1) elif units["flux"] == "w m-2": if units["wavelength"] == "um": error = scaling[1] * data[:, 2] / wavelength # (W m-2 um-1) else: error = np.repeat(0.0, wavelength.size) # nan_idx = np.isnan(flux) # # if np.sum(nan_idx) != 0: # wavelength = wavelength[~nan_idx] # flux = flux[~nan_idx] # error = error[~nan_idx] # # warnings.warn( # f"Found {np.sum(nan_idx)} fluxes with NaN in " # f"the calibration spectrum. Removing the " # f"spectral fluxes that contain a NaN." # ) print(f"Adding calibration spectrum: {tag}...", end="", flush=True) hdf5_file.create_dataset( f"spectra/calibration/{tag}", data=np.vstack((wavelength, flux, error)), dtype="f8", ) hdf5_file.close() print(" [DONE]")
[docs] @beartype def add_spectra( self, spec_library: str, sptypes: Optional[List[str]] = None ) -> None: """ Function for adding empirical spectral libraries to the database. The spectra are stored together with several attributes such as spectral type, parallax, and Simbad ID. The spectra can be read with the functionalities of :class:`~species.read.read_spectrum.ReadSpectrum`. Parameters ---------- spec_library : str Spectral library ('irtf', 'spex', 'kesseli+2017', 'bonnefoy+2014', 'allers+2013'). sptypes : list(str) Spectral types ('F', 'G', 'K', 'M', 'L', 'T'). Currently only implemented for ``spec_library='irtf'``. Returns ------- NoneType None """ from species.data.spec_data.add_spec_data import add_spec_library print_section("Add spectral library") print(f"Database tag: {spec_library}") print(f"Spectral types: {sptypes}") with h5py.File(self.database, "a") as hdf5_file: if f"spectra/{spec_library}" in hdf5_file: del hdf5_file[f"spectra/{spec_library}"] add_spec_library(self.data_folder, hdf5_file, spec_library, sptypes)
[docs] @beartype def add_samples( self, tag: str, sampler: str, samples: np.ndarray, ln_prob: np.ndarray, modelpar: List[str], bounds: Dict[str, Tuple[Real, Real]], normal_prior: Dict[str, Tuple[Real, Real]], fixed_param: Dict[str, Real], spec_labels: Optional[List[str]] = None, attr_dict: Optional[Dict] = None, ): """ This function stores the posterior samples produced by :class:`~species.fit.fit_model.FitModel` in the database, including some additional attributes. Parameters ---------- tag : str Database tag. sampler : str Sampler ('multinest', 'ultranest', 'dynesty'). samples : np.ndarray Samples of the posterior. ln_prob : np.ndarray Log-likelihood for each sample. modelpar : list(str) List with the model parameter names. bounds : dict Dictionary with the (log-)uniform priors. normal_prior : dict Dictionary with the normal priors. fixed_param : dict Dictionary with the fixed parameters spec_labels : list(str), None List with the spectrum labels that are used for fitting an additional scaling parameter. Not used if set to ``None``. attr_dict : dict, None Dictionary with data that will be stored as attributes of the dataset with samples. Not used if set to ``None``. Returns ------- NoneType None """ print_section("Add posterior samples") print(f"Database tag: {tag}") print(f"Sampler: {sampler}") print(f"Samples shape: {samples.shape}") if spec_labels is None: spec_labels = [] with h5py.File(self.database, "a") as hdf5_file: if f"results/fit/{tag}" in hdf5_file: del hdf5_file[f"results/fit/{tag}"] dset = hdf5_file.create_dataset( f"results/fit/{tag}/samples", data=samples, dtype="f8" ) hdf5_file.create_dataset( f"results/fit/{tag}/ln_prob", data=ln_prob, dtype="f8" ) for key, value in bounds.items(): group_path = f"results/fit/{tag}/bounds/{key}" hdf5_file.create_dataset(group_path, data=value, dtype="f8") for key, value in normal_prior.items(): group_path = f"results/fit/{tag}/normal_prior/{key}" hdf5_file.create_dataset(group_path, data=value, dtype="f8") for key, value in fixed_param.items(): group_path = f"results/fit/{tag}/fixed_param/{key}" hdf5_file.create_dataset(group_path, data=value, dtype="f8") if "model_type" in attr_dict: dset.attrs["model_type"] = attr_dict["model_type"] elif "spec_type" in attr_dict: dset.attrs["model_type"] = attr_dict["spec_type"] if "model_name" in attr_dict: dset.attrs["model_name"] = attr_dict["model_name"] elif "spec_name" in attr_dict: dset.attrs["model_name"] = attr_dict["spec_name"] dset.attrs["n_param"] = int(len(modelpar)) dset.attrs["sampler"] = str(sampler) dset.attrs["n_bounds"] = int(len(bounds)) dset.attrs["n_normal_prior"] = int(len(normal_prior)) dset.attrs["n_fixed"] = int(len(fixed_param)) if "mean_accept" in attr_dict: mean_accept = float(attr_dict["mean_accept"]) dset.attrs["mean_accept"] = mean_accept print(f"Mean acceptance fraction: {mean_accept:.3f}") if "ln_evidence" in attr_dict: ln_evidence = attr_dict["ln_evidence"] dset.attrs["ln_evidence"] = ln_evidence[0] dset.attrs["ln_evidence_error"] = ln_evidence[1] print( f"ln(Z) = {float(ln_evidence[0]):.2f} " f"+/- {float(ln_evidence[1]):.2f}" ) count_scaling = 0 for i, item in enumerate(modelpar): dset.attrs[f"parameter{i}"] = str(item) if item in spec_labels: dset.attrs[f"scaling{count_scaling}"] = str(item) count_scaling += 1 dset.attrs["n_scaling"] = int(count_scaling) for key, value in attr_dict.items(): dset.attrs[key] = value
[docs] @beartype def get_probable_sample( self, tag: str, verbose: bool = True, ) -> Dict[str, float]: """ Function for extracting the sample parameters with the maximum likelihood. Parameters ---------- tag : str Database tag with the posterior results. verbose : bool Print output, including the parameter values. Returns ------- dict Parameters of the sample with the maximum likelihood. """ from species.util.model_util import binary_to_single, check_nearest_spec if verbose: print_section("Get sample with the maximum likelihood") print(f"Database tag: {tag}") with h5py.File(self.database, "r") as hdf5_file: dset = hdf5_file[f"results/fit/{tag}/samples"] dset_attrs = dict(dset.attrs) samples = np.asarray(dset) ln_prob = np.asarray(hdf5_file[f"results/fit/{tag}/ln_prob"]) if "n_param" in dset_attrs: n_param = dset_attrs["n_param"] else: n_param = dset_attrs["nparam"] index_max = np.argmax(ln_prob) max_sample = samples[index_max,] prob_sample = {} for i in range(n_param): par_key = dset_attrs[f"parameter{i}"] par_value = max_sample[i] prob_sample[par_key] = par_value if ( "parallax" not in prob_sample and "parallax_0" not in prob_sample and "parallax" in dset_attrs ): prob_sample["parallax"] = dset_attrs["parallax"] elif "distance" not in prob_sample and "distance" in dset_attrs: prob_sample["distance"] = dset_attrs["distance"] if "pt_smooth" in dset_attrs: prob_sample["pt_smooth"] = dset_attrs["pt_smooth"] if "ext_model" in dset_attrs: prob_sample["ext_model"] = dset_attrs["ext_model"] if verbose: print("\nParameters:") for param_key, param_value in prob_sample.items(): if isinstance(param_value, float): if -0.01 < param_value < 0.01: print(f" - {param_key} = {param_value:.2e}") else: print(f" - {param_key} = {param_value:.2f}") if "model_type" in dset_attrs and dset_attrs["model_type"] == "atmosphere": if "binary" in dset_attrs and dset_attrs["binary"]: param_0 = binary_to_single(prob_sample, 0) check_nearest_spec(dset_attrs["model_name"], param_0) param_1 = binary_to_single(prob_sample, 1) check_nearest_spec(dset_attrs["model_name"], param_1) else: check_nearest_spec(dset_attrs["model_name"], prob_sample) return prob_sample
[docs] @beartype def get_median_sample( self, tag: str, verbose: bool = True, ) -> Dict[str, float]: """ Function for extracting the median parameter values from the posterior samples. Parameters ---------- tag : str Database tag with the posterior results. verbose : bool Print output, including the parameter values. Returns ------- dict Median parameter values of the posterior distribution. """ from species.util.model_util import binary_to_single, check_nearest_spec if verbose: print_section("Get median parameters") print(f"Database tag: {tag}") with h5py.File(self.database, "r") as hdf5_file: dset = hdf5_file[f"results/fit/{tag}/samples"] samples = np.asarray(dset) dset_attrs = dict(dset.attrs) if "n_param" in dset_attrs: n_param = dset_attrs["n_param"] else: n_param = dset_attrs["nparam"] median_sample = {} for i in range(n_param): par_key = dset_attrs[f"parameter{i}"] par_value = np.median(samples[:, i]) median_sample[par_key] = par_value if ( "parallax" not in median_sample and "parallax_0" not in median_sample and "parallax" in dset_attrs ): median_sample["parallax"] = dset_attrs["parallax"] elif "distance" not in median_sample and "distance" in dset_attrs: median_sample["distance"] = dset_attrs["distance"] if "pt_smooth" in dset_attrs: median_sample["pt_smooth"] = dset_attrs["pt_smooth"] if "ext_model" in dset_attrs: median_sample["ext_model"] = dset_attrs["ext_model"] if verbose: print("\nParameters:") for param_key, param_value in median_sample.items(): if isinstance(param_value, float): if param_value < 0.01: print(f" - {param_key} = {param_value:.2e}") else: print(f" - {param_key} = {param_value:.2f}") if "model_type" in dset_attrs and dset_attrs["model_type"] == "atmosphere": if "binary" in dset_attrs and dset_attrs["binary"]: param_0 = binary_to_single(median_sample, 0) check_nearest_spec(dset_attrs["model_name"], param_0) param_1 = binary_to_single(median_sample, 1) check_nearest_spec(dset_attrs["model_name"], param_1) else: check_nearest_spec(dset_attrs["model_name"], median_sample) return median_sample
[docs] @beartype def get_compare_sample(self, tag: str, verbose: bool = True) -> Dict[str, float]: """ Function for extracting the sample parameters for which the goodness-of-fit statistic has been minimized when using :func:`~species.fit.compare_spectra.CompareSpectra.compare_model` for comparing data with a grid of model spectra. Parameters ---------- tag : str Database tag where the results from :func:`~species.fit.compare_spectra.CompareSpectra.compare_model` are stored. verbose : bool Print output, including the parameter values. Returns ------- dict Dictionary with the best-fit parameters, including optional scaling parameters for spectra that can be applied by running :func:`~species.util.read_util.update_objectbox` on an :func:`~species.core.box.ObjectBox` by providing the returned dictionary from :func:`~species.data.database.Database.get_compare_sample` as argument. """ if verbose: print_section("Get best comparison parameters") print(f"Database tag: {tag}") with h5py.File(self.database, "r") as hdf5_file: dset = hdf5_file[f"results/comparison/{tag}/goodness_of_fit"] dset_attrs = dict(dset.attrs) n_param = dset_attrs["n_param"] n_scale_spec = dset_attrs["n_scale_spec"] model_param = {} for i in range(n_param): model_param[dset_attrs[f"parameter{i}"]] = dset_attrs[f"best_param{i}"] if "parallax" in dset_attrs: model_param["parallax"] = dset_attrs["parallax"] elif "distance" in dset_attrs: model_param["distance"] = dset_attrs["distance"] model_param["radius"] = dset_attrs["radius"] for i in range(n_scale_spec): scale_spec = dset_attrs[f"scale_spec{i}"] model_param[f"scaling_{scale_spec}"] = dset_attrs[f"scaling_{scale_spec}"] if verbose: print("\nParameters:") for param_key, param_value in model_param.items(): if param_value < 0.01: print(f" - {param_key} = {param_value:.2e}") else: print(f" - {param_key} = {param_value:.2f}") return model_param
[docs] @beartype def get_mcmc_spectra( self, tag: str, random: Optional[int] = None, wavel_range: Optional[Union[Tuple[Real, Real], str]] = None, spec_res: Optional[Real] = None, wavel_resample: Optional[np.ndarray] = None, ) -> Union[ Union[List[ModelBox], List[SpectrumBox]], Tuple[List[ModelBox], List[ModelBox], List[ModelBox]], ]: """ Function for drawing random model spectra from the sampled posterior distribution. Parameters ---------- tag : str Database tag with the posterior samples. random : int, None Number of random samples. All samples are selected by setting the argument to ``None``. wavel_range : tuple(float, float), str, None Wavelength range (um) or filter name. Full spectrum is used if set to ``None``. spec_res : float, None Spectral resolution that is used for the smoothing with a Gaussian kernel. No smoothing is applied if the argument set to ``None``. wavel_resample : np.ndarray, None Wavelength points (um) to which the model spectrum will be resampled. The resampling is applied after the optional smoothing to the resolution of ``spec_res``. Returns ------- list(species.core.box.ModelBox) List with ``ModelBox`` objects. When fitting an unresolved binary system, this list contains the model spectra of the combined flux. list(species.core.box.ModelBox) The list of ``ModelBox`` objects is only returned when fitting and unresolved binary system. It contains the model spectra of the first component. list(species.core.box.ModelBox) The list of ``ModelBox`` objects is only returned when fitting and unresolved binary system. It contains the model spectra of the second component. """ print_section(f"Get posterior spectra") print(f"Database tag: {tag}") print(f"Wavelength range (um): {wavel_range}") print(f"Resolution: {spec_res}") with h5py.File(self.database, "r") as hdf5_file: dset = hdf5_file[f"results/fit/{tag}/samples"] samples = np.array(dset) dset_attrs = dict(dset.attrs) if "model_type" in dset_attrs: model_type = dset_attrs["model_type"] else: model_type = dset_attrs["type"] if "model_name" in dset_attrs: model_name = dset_attrs["model_name"] else: model_name = dset_attrs["spectrum"] if "n_param" in dset_attrs: n_param = dset_attrs["n_param"] elif "nparam" in dset_attrs: n_param = dset_attrs["nparam"] if "n_scaling" in dset_attrs: n_scaling = dset_attrs["n_scaling"] elif "nscaling" in dset_attrs: n_scaling = dset_attrs["nscaling"] else: n_scaling = 0 if "n_error" in dset_attrs: n_error = dset_attrs["n_error"] else: n_error = 0 if "binary" in dset_attrs: binary = dset_attrs["binary"] else: binary = False if "ext_filter" in dset_attrs: ext_filter = dset_attrs["ext_filter"] else: ext_filter = None ignore_param = [] for i in range(n_scaling): ignore_param.append(dset_attrs[f"scaling{i}"]) for i in range(n_error): ignore_param.append(dset_attrs[f"error{i}"]) for i in range(n_param): if dset_attrs[f"parameter{i}"][:9] == "corr_len_": ignore_param.append(dset_attrs[f"parameter{i}"]) elif dset_attrs[f"parameter{i}"][:9] == "corr_amp_": ignore_param.append(dset_attrs[f"parameter{i}"]) if spec_res is not None and model_type == "calibration": warnings.warn( "Smoothing of the spectral resolution is not " "implemented for calibration spectra." ) if "parallax" in dset_attrs: parallax = dset_attrs["parallax"] else: parallax = None if "distance" in dset_attrs: distance = dset_attrs["distance"] else: distance = None if random is None: print(f"Number of samples: {samples.shape[0]}") else: print(f"Number of samples: {random}") rng = np.random.default_rng() samples = rng.choice(samples, random, replace=False, axis=0, shuffle=False) param = [] for i in range(n_param): param.append(dset_attrs[f"parameter{i}"]) if model_type in ["model", "atmosphere"]: if model_name == "planck": from species.read.read_planck import ReadPlanck readmodel = ReadPlanck(wavel_range) elif model_name == "powerlaw": pass else: from species.read.read_model import ReadModel readmodel = ReadModel(model_name, wavel_range=wavel_range) elif model_type == "calibration": from species.read.read_calibration import ReadCalibration readcalib = ReadCalibration(model_name, filter_name=None) boxes = [] if binary: boxes_0 = [] boxes_1 = [] else: boxes_0 = None boxes_1 = None print() for i in tqdm(range(samples.shape[0])): model_param = {} for j in range(samples.shape[1]): if param[j] not in ignore_param: model_param[param[j]] = samples[i, j] if ( "parallax" not in model_param and "parallax_0" not in model_param and parallax is not None ): model_param["parallax"] = parallax elif "distance" not in model_param and distance is not None: model_param["distance"] = distance if "ext_model" in dset_attrs: model_param["ext_model"] = dset_attrs["ext_model"] if model_type in ["model", "atmosphere"]: if model_name == "planck": spec_box = readmodel.get_spectrum( model_param, spec_res, wavel_resample=wavel_resample, ) elif model_name == "powerlaw": if wavel_resample is not None: warnings.warn( "The 'wavel_resample' parameter is not " "supported by the 'powerlaw' model so " "the argument will be ignored." ) from species.util.model_util import powerlaw_spectrum spec_box = powerlaw_spectrum(wavel_range, model_param) else: from species.util.model_util import binary_to_single if binary: param_0 = binary_to_single(model_param, 0) spec_box_0 = readmodel.get_model( param_0, spec_res=spec_res, wavel_resample=wavel_resample, ext_filter=ext_filter, ) param_1 = binary_to_single(model_param, 1) spec_box_1 = readmodel.get_model( param_1, spec_res=spec_res, wavel_resample=wavel_resample, ext_filter=ext_filter, ) # Weighted flux of two spectra for atmospheric asymmetries # Or simply the same in case of an actual binary system if "spec_weight" in model_param: flux_comb = ( model_param["spec_weight"] * spec_box_0.flux + (1.0 - model_param["spec_weight"]) * spec_box_1.flux ) else: flux_comb = spec_box_0.flux + spec_box_1.flux spec_box = create_box( boxtype="model", model=model_name, wavelength=spec_box_0.wavelength, flux=flux_comb, parameters=model_param, quantity="flux", ) else: spec_box = readmodel.get_model( model_param, spec_res=spec_res, wavel_resample=wavel_resample, ext_filter=ext_filter, ) elif model_type == "calibration": spec_box = readcalib.get_spectrum(model_param) else: spec_box = None boxes.append(spec_box) if binary: boxes_0.append(spec_box_0) boxes_1.append(spec_box_1) if binary: return boxes, boxes_0, boxes_1 else: return boxes
[docs] @beartype def get_mcmc_photometry( self, tag: str, filter_name: str, random: Optional[int] = None, phot_type: str = "magnitude", flux_units: str = "W m-2 um-1", ) -> np.ndarray: """ Function for calculating synthetic magnitudes or fluxes from the posterior samples. Parameters ---------- tag : str Database tag with the posterior samples. filter_name : str Filter name for which the synthetic photometry will be computed. random : int, None Number of random samples. All samples are selected by setting the argument to ``None``. phot_type : str Photometry type ('magnitude' or 'flux'). flux_units : tuple(str, str), None Flux units that will be used when the ``phot_type`` argument is set to ``'flux``. Supported units can be found in the docstring of :func:`~species.util.data_util.convert_units`. Returns ------- np.ndarray Synthetic magnitudes or fluxes. """ from species.read.read_filter import ReadFilter if phot_type not in ["magnitude", "flux"]: raise ValueError( "The argument of 'phot_type' is not recognized " "and should be set to 'magnitude' or 'flux'." ) with h5py.File(self.database, "r") as hdf5_file: dset = hdf5_file[f"results/fit/{tag}/samples"] dset_attrs = dict(dset.attrs) samples = np.array(dset) if random is not None: rng = np.random.default_rng() samples = rng.choice(samples, random, replace=False, axis=0, shuffle=False) if "n_param" in dset_attrs: n_param = dset_attrs["n_param"] elif "nparam" in dset_attrs: n_param = dset_attrs["nparam"] if "model_type" in dset_attrs: model_type = dset_attrs["model_type"] else: model_type = dset_attrs["type"] if "model_name" in dset_attrs: model_name = dset_attrs["model_name"] else: model_name = dset_attrs["spectrum"] if "binary" in dset_attrs: binary = dset_attrs["binary"] else: binary = False if "parallax" in dset_attrs: parallax = dset_attrs["parallax"] else: parallax = None if "distance" in dset_attrs: distance = dset_attrs["distance"] else: distance = None param = [] for i in range(n_param): param.append(dset_attrs[f"parameter{i}"]) if model_type in ["model", "atmosphere"]: if model_name == "powerlaw": from species.phot.syn_phot import SyntheticPhotometry synphot = SyntheticPhotometry(filter_name) synphot.zero_point() # Set the wavel_range attribute else: from species.read.read_model import ReadModel readmodel = ReadModel(model_name, filter_name=filter_name) elif model_type == "calibration": from species.read.read_calibration import ReadCalibration readcalib = ReadCalibration(model_name, filter_name=filter_name) mcmc_phot = np.zeros((samples.shape[0])) for i in tqdm(range(samples.shape[0]), desc="Getting MCMC photometry"): model_param = {} for j in range(n_param): model_param[param[j]] = samples[i, j] if ( "parallax" not in model_param and "parallax_0" not in model_param and parallax is not None ): model_param["parallax"] = parallax elif "distance" not in model_param and distance is not None: model_param["distance"] = distance if "ext_av" in model_param and "ext_model" in dset_attrs: model_param["ext_model"] = dset_attrs["ext_model"] if model_type in ["model", "atmosphere"]: if model_name == "powerlaw": from species.util.model_util import powerlaw_spectrum pl_box = powerlaw_spectrum(synphot.wavel_range, model_param) if phot_type == "magnitude": app_mag, _ = synphot.spectrum_to_magnitude( pl_box.wavelength, pl_box.flux ) mcmc_phot[i] = app_mag[0] elif phot_type == "flux": mcmc_phot[i], _ = synphot.spectrum_to_flux( pl_box.wavelength, pl_box.flux ) else: if phot_type == "magnitude": if binary: from species.util.model_util import binary_to_single param_0 = binary_to_single(model_param, 0) mcmc_phot_0, _ = readmodel.get_magnitude(param_0) param_1 = binary_to_single(model_param, 1) mcmc_phot_1, _ = readmodel.get_magnitude(param_1) # Weighted flux of two spectra for atmospheric asymmetries # Or simply the same in case of an actual binary system if "spec_weight" in model_param: mcmc_phot[i] = ( model_param["spec_weight"] * mcmc_phot_0 + (1.0 - model_param["spec_weight"]) * mcmc_phot_1 ) else: mcmc_phot[i] = mcmc_phot_0 + mcmc_phot_1 else: mcmc_phot[i], _ = readmodel.get_magnitude(model_param) elif phot_type == "flux": if binary: from species.util.model_util import binary_to_single param_0 = binary_to_single(model_param, 0) mcmc_phot_0, _ = readmodel.get_flux(param_0) param_1 = binary_to_single(model_param, 1) mcmc_phot_1, _ = readmodel.get_flux(param_1) # Weighted flux of two spectra for atmospheric asymmetries # Or simply the same in case of an actual binary system if "spec_weight" in model_param: mcmc_phot[i] = ( model_param["spec_weight"] * mcmc_phot_0 + (1.0 - model_param["spec_weight"]) * mcmc_phot_1 ) else: mcmc_phot[i] = mcmc_phot_0 + mcmc_phot_1 else: mcmc_phot[i], _ = readmodel.get_flux(model_param) elif model_type == "calibration": if phot_type == "magnitude": app_mag, _ = readcalib.get_magnitude(model_param=model_param) mcmc_phot[i] = app_mag[0] elif phot_type == "flux": mcmc_phot[i], _ = readcalib.get_flux(model_param=model_param) if phot_type == "flux": from species.util.data_util import convert_units read_filt = ReadFilter(filter_name) filt_wavel = read_filt.mean_wavelength() wavel_ones = np.full(mcmc_phot.size, filt_wavel) data_in = np.column_stack([wavel_ones, mcmc_phot]) data_out = convert_units(data_in, ("um", flux_units), convert_from=False) mcmc_phot = data_out[:, 1] return mcmc_phot
[docs] @beartype def get_object( self, object_name: str, inc_phot: Union[bool, List[str]] = True, inc_spec: Union[bool, List[str]] = True, verbose: bool = True, ) -> ObjectBox: """ Function for extracting the photometric and/or spectroscopic data of an object from the database. The spectroscopic data contains optionally the covariance matrix and its inverse. Parameters ---------- object_name : str Object name in the database. inc_phot : bool, list(str) Include photometric data. If a boolean, either all (``True``) or none (``False``) of the data are selected. If a list, a subset of filter names (as stored in the database) can be provided. inc_spec : bool, list(str) Include spectroscopic data. If a boolean, either all (``True``) or none (``False``) of the data are selected. If a list, a subset of spectrum names (as stored in the database with :func:`~species.data.database.Database.add_object`) can be provided. verbose : bool Print output. Returns ------- species.core.box.ObjectBox Box with the object's data. """ from species.read.read_filter import ReadFilter if verbose: print_section(f"Get object") print(f"Object name: {object_name}") print(f"Include photometry: {inc_phot}") print(f"Include spectra: {inc_spec}") with h5py.File(self.database, "r") as hdf5_file: if f"objects/{object_name}" not in hdf5_file: raise ValueError( "The argument of 'object_name' is " f"set to '{object_name}' but the " "data is not found in the database." ) dset = hdf5_file[f"objects/{object_name}"] if "parallax" in dset: parallax = np.asarray(dset["parallax"]) else: parallax = None if "distance" in dset: distance = np.asarray(dset["distance"]) else: distance = None if inc_phot: magnitude = {} flux = {} mean_wavel = {} filter_width = {} for observatory in dset.keys(): if observatory not in ["parallax", "distance", "spectrum"]: for filter_name in dset[observatory]: name = f"{observatory}/{filter_name}" if isinstance(inc_phot, bool) or name in inc_phot: magnitude[name] = dset[name][0:2] flux[name] = dset[name][2:4] phot_filters = list(magnitude.keys()) else: magnitude = None flux = None phot_filters = None mean_wavel = None filter_width = None if inc_spec and f"objects/{object_name}/spectrum" in hdf5_file: spectrum = {} for item in hdf5_file[f"objects/{object_name}/spectrum"]: data_group = f"objects/{object_name}/spectrum/{item}" if isinstance(inc_spec, bool) or item in inc_spec: if f"{data_group}/covariance" not in hdf5_file: spectrum[item] = ( np.asarray(hdf5_file[f"{data_group}/spectrum"]), None, None, hdf5_file[f"{data_group}"].attrs["specres"], ) else: spectrum[item] = ( np.asarray(hdf5_file[f"{data_group}/spectrum"]), np.asarray(hdf5_file[f"{data_group}/covariance"]), np.asarray(hdf5_file[f"{data_group}/inv_covariance"]), hdf5_file[f"{data_group}"].attrs["specres"], ) else: spectrum = None if magnitude is not None: for filter_name in magnitude.keys(): read_filt = ReadFilter(filter_name) mean_wavel[filter_name] = read_filt.mean_wavelength() filter_width[filter_name] = read_filt.filter_fwhm() return create_box( "object", name=object_name, filters=phot_filters, mean_wavel=mean_wavel, filter_width=filter_width, magnitude=magnitude, flux=flux, spectrum=spectrum, parallax=parallax, distance=distance, )
[docs] @beartype def get_samples( self, tag: str, random: Optional[int] = None, json_file: Optional[str] = None, ) -> SamplesBox: """ Parameters ---------- tag: str Database tag with the samples. random : int, None Number of random samples to select. All samples are selected if the argument is set to ``None``. json_file : str, None JSON file to store the posterior samples. The data will not be written if the argument is set to ``None``. Returns ------- species.core.box.SamplesBox Box with the posterior samples. """ print_section("Get posterior samples") with h5py.File(self.database, "r") as hdf5_file: dset = hdf5_file[f"results/fit/{tag}/samples"] ln_prob = np.asarray(hdf5_file[f"results/fit/{tag}/ln_prob"]) samples = np.asarray(dset) if random is not None: indices = np.random.randint(samples.shape[0], size=random) samples = samples[indices, :] print(f"Database tag: {tag}") print(f"Random samples: {random}") print(f"Samples shape: {samples.shape}") attributes = {} for item in dset.attrs: attributes[item] = dset.attrs[item] if "model_name" in dset.attrs: model_name = dset.attrs["model_name"] else: model_name = dset.attrs["spectrum"] if "n_param" in dset.attrs: n_param = dset.attrs["n_param"] elif "nparam" in dset.attrs: n_param = dset.attrs["nparam"] if "ln_evidence" in dset.attrs: ln_evidence = dset.attrs["ln_evidence"] else: ln_evidence = None param = [] print("\nParameters:") for i in range(n_param): param.append(dset.attrs[f"parameter{i}"]) print(f" - {param[-1]}") # Printing uniform and normal priors uniform_priors = {} normal_priors = {} if "n_bounds" in attributes and attributes["n_bounds"] > 0: dset_bounds = hdf5_file[f"results/fit/{tag}/bounds"] print("\nUniform priors (min, max):") for bound_item in dset_bounds: group_path = f"results/fit/{tag}/bounds/{bound_item}" if isinstance(hdf5_file[group_path], h5py._hl.group.Group): for filter_item in hdf5_file[group_path]: # This is required for the inflation parameter # of the photometric fluxes. Since the SVO # filter names contain a slash which introduces # a subgroup in the HDF5 database prior_bound = np.array( hdf5_file[f"{group_path}/{filter_item}"] ) print( f" - {bound_item}/{filter_item} = " f"({prior_bound[0]}, {prior_bound[1]})" ) uniform_priors[f"{bound_item}/{filter_item}"] = ( prior_bound[0], prior_bound[1], ) else: prior_bound = np.array(hdf5_file[group_path]) print( f" - {bound_item} = ({prior_bound[0]}, {prior_bound[1]})" ) uniform_priors[bound_item] = (prior_bound[0], prior_bound[1]) if "n_normal_prior" in attributes and attributes["n_normal_prior"] > 0: dset_prior = hdf5_file[f"results/fit/{tag}/normal_prior"] print("\nNormal priors (mean, sigma):") for prior_item in dset_prior: group_path = f"results/fit/{tag}/normal_prior/{prior_item}" if isinstance(hdf5_file[group_path], h5py._hl.group.Group): for filter_item in hdf5_file[group_path]: # This is required for the inflation parameter # of the photometric fluxes. Since the SVO # filter names contain a slash which introduces # a subgroup in the HDF5 database norm_prior = np.array( hdf5_file[f"{group_path}/{filter_item}"] ) if -0.1 < norm_prior[0] < 0.1: print( f" - {prior_item}/{filter_item} = " f"({norm_prior[0]:.2e}, {norm_prior[1]:.2e})" ) else: print( f" - {prior_item}/{filter_item} = " f"({norm_prior[0]:.2f}, {norm_prior[1]:.2f})" ) normal_priors[f"{prior_item}/{filter_item}"] = ( norm_prior[0], norm_prior[1], ) else: norm_prior = np.array(hdf5_file[group_path]) if -0.1 < norm_prior[0] < 0.1: print( f" - {prior_item} = ({norm_prior[0]:.2e}, {norm_prior[1]:.2e})" ) else: print( f" - {prior_item} = ({norm_prior[0]:.2f}, {norm_prior[1]:.2f})" ) normal_priors[prior_item] = (norm_prior[0], norm_prior[1]) median_sample = self.get_median_sample(tag, verbose=False) prob_sample = self.get_probable_sample(tag, verbose=False) if json_file is not None: samples_dict = {} for i, item in enumerate(param): samples_dict[item] = list(samples[:, i]) with open(json_file, "w", encoding="utf-8") as out_file: json.dump(samples_dict, out_file, indent=4) print(f"\nOutput: {json_file}") return create_box( "samples", model_name=model_name, parameters=param, samples=samples, ln_prob=ln_prob, ln_evidence=ln_evidence, prob_sample=prob_sample, median_sample=median_sample, attributes=attributes, uniform_priors=uniform_priors, normal_priors=normal_priors, )
[docs] @beartype def get_evidence(self, tag: str) -> Tuple[float, float]: """ Function for returning the log-evidence (i.e. marginalized likelihood) that was computed by the nested sampling algorithm when using :class:`~species.fit.fit_model.FitModel` or :class:`~species.fit.retrieval.AtmosphericRetrieval`. Parameters ---------- tag: str Database tag with the posterior samples. Returns ------- float Log-evidence. float Uncertainty on the log-evidence. """ with h5py.File(self.database, "r") as hdf5_file: dset = hdf5_file[f"results/fit/{tag}/samples"] if "ln_evidence" in dset.attrs: ln_evidence = dset.attrs["ln_evidence"] else: ln_evidence = (None, None) return ln_evidence[0], ln_evidence[1]
[docs] @beartype def get_pt_profiles( self, tag: str, random: Optional[int] = None, out_file: Optional[str] = None ) -> Tuple[np.ndarray, np.ndarray]: """ Function for returning the pressure-temperature profiles from the posterior of the atmospheric retrieval with ``petitRADTRANS``. The data can also optionally be written to an output file. Parameters ---------- tag: str Database tag with the posterior samples from the atmospheric retrieval with :class:`~species.fit.retrieval.AtmosphericRetrieval`. random : int, None Number of random samples that will be used for the P-T profiles. All samples will be selected if set to ``None``. out_file : str, None Output file to store the P-T profiles. The data will be stored in a FITS file if the argument of ``out_file`` ends with `.fits`. Otherwise, the data will be written to a text file. The data has two dimensions with the first column containing the pressures (bar) and the remaining columns the temperature profiles (K). The data will not be written to a file if the argument is set to ``None``. Returns ------- np.ndarray Array (1D) with the pressures (bar). np.ndarray Array (2D) with the temperature profiles (K). The shape of the array is (n_pressures, n_samples). """ from species.util.retrieval_util import ( pt_ret_model, pt_spline_interp, ) hdf5_file = h5py.File(self.database, "r") dset = hdf5_file[f"results/fit/{tag}/samples"] if "model_name" in dset.attrs: model_name = dset.attrs["model_name"] else: model_name = dset.attrs["spectrum"] pt_profile = dset.attrs["pt_profile"] if model_name != "petitradtrans": raise ValueError( f"The model spectrum of the posterior samples is '{model_name}' " f"instead of 'petitradtrans'. Extracting P-T profiles is " f"therefore not possible." ) if "n_param" in dset.attrs: n_param = dset.attrs["n_param"] elif "nparam" in dset.attrs: n_param = dset.attrs["nparam"] if "temp_nodes" in dset.attrs: temp_nodes = dset.attrs["temp_nodes"] else: temp_nodes = 15 samples = np.asarray(dset) if random is None: n_profiles = samples.shape[0] else: n_profiles = random indices = np.random.randint(samples.shape[0], size=random) samples = samples[indices, :] param_index = {} for i in range(n_param): param_index[dset.attrs[f"parameter{i}"]] = i hdf5_file.close() press = np.logspace(-6, 3, 180) # (bar) temp = np.zeros((press.shape[0], n_profiles)) desc = f"Extracting the P-T profiles of {tag}" for i in tqdm(range(samples.shape[0]), desc=desc): item = samples[i, :] if pt_profile == "molliere": three_temp = np.array( [ item[param_index["t1"]], item[param_index["t2"]], item[param_index["t3"]], ] ) temp[:, i], _, _ = pt_ret_model( three_temp, 10.0 ** item[param_index["log_delta"]], item[param_index["alpha"]], item[param_index["tint"]], press, item[param_index["metallicity"]], item[param_index["c_o_ratio"]], ) elif pt_profile == "mod-molliere": temp[:, i], _, _ = pt_ret_model( None, 10.0 ** item[param_index["log_delta"]], item[param_index["alpha"]], item[param_index["tint"]], press, item[param_index["metallicity"]], item[param_index["c_o_ratio"]], ) elif pt_profile == "eddington": # Eddington approximation # delta = kappa_ir/gravity tau = press * 1e6 * 10.0 ** item[param_index["log_delta"]] temp[:, i] = ( 0.75 * item[param_index["tint"]] ** 4.0 * (2.0 / 3.0 + tau) ) ** 0.25 elif pt_profile in ["free", "monotonic"]: if "pt_smooth" in param_index: pt_smooth = item[param_index["pt_smooth"]] else: pt_smooth = 0.0 knot_press = np.logspace( np.log10(press[0]), np.log10(press[-1]), temp_nodes ) knot_temp = [] for k in range(temp_nodes): knot_temp.append(item[param_index[f"t{k}"]]) knot_temp = np.asarray(knot_temp) temp[:, i] = pt_spline_interp(knot_press, knot_temp, press, pt_smooth) if out_file is not None: data = np.hstack([press[..., np.newaxis], temp]) if out_file.endswith(".fits"): fits.writeto(out_file, data, overwrite=True) else: np.savetxt(out_file, data, header="Pressure (bar) - Temperature (K)") return press, temp
[docs] @beartype def add_empirical( self, tag: str, names: List[str], sptypes: List[str], goodness_of_fit: List[float], flux_scaling: List[np.ndarray], av_ext: List[float], rad_vel: List[float], object_name: str, spec_name: List[str], spec_library: str, ) -> None: """ Parameters ---------- tag : str Database tag where the results will be stored. names : list(str) Array with the names of the empirical spectra. sptypes : list(str) Array with the spectral types of ``names``. goodness_of_fit : list(float) Array with the goodness-of-fit values. flux_scaling : list(np.ndarray) List with arrays with the best-fit scaling values to match the library spectra with the data. The size of each array is equal to the number of spectra that are provided as argument of ``spec_name``. av_ext : list(float) Array with the visual extinction :math:`A_V`. rad_vel : list(float) Array with the radial velocities (km s-1). object_name : str Object name as stored in the database with :func:`~species.data.database.Database.add_object` or :func:`~species.data.database.Database.add_companion`. spec_name : list(str) List with spectrum names that are stored at the object data of ``object_name``. spec_library : str Name of the spectral library that was used for the empirical comparison. Returns ------- NoneType None """ with h5py.File(self.database, "a") as hdf5_file: if "results" not in hdf5_file: hdf5_file.create_group("results") if "results/empirical" not in hdf5_file: hdf5_file.create_group("results/empirical") if f"results/empirical/{tag}" in hdf5_file: del hdf5_file[f"results/empirical/{tag}"] dtype = h5py.special_dtype(vlen=str) dset = hdf5_file.create_dataset( f"results/empirical/{tag}/names", (np.size(names),), dtype=dtype ) dset[...] = names dset.attrs["object_name"] = str(object_name) dset.attrs["spec_library"] = str(spec_library) dset.attrs["n_spec_name"] = len(spec_name) for i, item in enumerate(spec_name): dset.attrs[f"spec_name{i}"] = item dset = hdf5_file.create_dataset( f"results/empirical/{tag}/sptypes", (np.size(sptypes),), dtype=dtype ) dset[...] = sptypes hdf5_file.create_dataset( f"results/empirical/{tag}/goodness_of_fit", data=goodness_of_fit, dtype="f8", ) hdf5_file.create_dataset( f"results/empirical/{tag}/flux_scaling", data=flux_scaling, dtype="f8" ) hdf5_file.create_dataset( f"results/empirical/{tag}/av_ext", data=av_ext, dtype="f8" ) hdf5_file.create_dataset( f"results/empirical/{tag}/rad_vel", data=rad_vel, dtype="f8" )
[docs] @beartype def add_comparison( self, tag: str, goodness_of_fit: np.ndarray, flux_scaling: np.ndarray, model_param: List[str], coord_points: List[np.ndarray], object_name: str, spec_name: List[str], model_name: str, scale_spec: List[str], extra_scaling: Optional[np.ndarray], inc_phot: List[str], ) -> None: """ Function for adding results obtained with :class:`~species.fit.compare_spectra.CompareSpectra` to the HDF5 database. Parameters ---------- tag : str Database tag where the results will be stored. goodness_of_fit : np.ndarray Array with the goodness-of-fit values. flux_scaling : np.ndarray Array with the best-fit scaling values to match the model spectra with the data. model_param : list(str) List with the names of the model parameters. coord_points : list(np.ndarray) List with 1D arrays of the model grid points, in the same order as ``model_param``. object_name : str Object name as stored in the database with :func:`~species.data.database.Database.add_object` or :func:`~species.data.database.Database.add_companion`. spec_name : list(str) List with spectrum names that are stored at the object data of ``object_name``. model_name : str Atmospheric model grid that is used for the comparison. scale_spec : list(str) List with spectrum names to which an additional scaling has been applied. extra_scaling : np.ndarray. None Array with extra scalings that have been applied to the spectra of ``scale_spec``. The argument can be set to ``None`` if no extra scalings have been applied. inc_phot : list(str) List with filter names of which photometric data was included with the comparison. Returns ------- NoneType None """ from species.read.read_object import ReadObject read_obj = ReadObject(object_name) parallax = read_obj.get_parallax()[0] # (mas) with h5py.File(self.database, "a") as hdf5_file: if "results" not in hdf5_file: hdf5_file.create_group("results") if "results/comparison" not in hdf5_file: hdf5_file.create_group("results/comparison") if f"results/comparison/{tag}" in hdf5_file: del hdf5_file[f"results/comparison/{tag}"] dset = hdf5_file.create_dataset( f"results/comparison/{tag}/goodness_of_fit", data=goodness_of_fit, dtype="f8", ) dset.attrs["object_name"] = str(object_name) dset.attrs["model_name"] = str(model_name) dset.attrs["n_param"] = len(model_param) dset.attrs["n_spec_name"] = len(spec_name) dset.attrs["n_scale_spec"] = len(scale_spec) dset.attrs["parallax"] = parallax dset.attrs["n_inc_phot"] = len(inc_phot) for i, item in enumerate(model_param): dset.attrs[f"parameter{i}"] = item for i, item in enumerate(spec_name): dset.attrs[f"spec_name{i}"] = item for i, item in enumerate(scale_spec): dset.attrs[f"scale_spec{i}"] = item for i, item in enumerate(inc_phot): dset.attrs[f"inc_phot{i}"] = item hdf5_file.create_dataset( f"results/comparison/{tag}/flux_scaling", data=flux_scaling, dtype="f8" ) if len(scale_spec) > 0: hdf5_file.create_dataset( f"results/comparison/{tag}/extra_scaling", data=extra_scaling, dtype="f8", ) for i, item in enumerate(coord_points): hdf5_file.create_dataset( f"results/comparison/{tag}/coord_points{i}", data=item, dtype="f8" ) # Indices of the best-fit model best_index = np.unravel_index( np.nanargmin(goodness_of_fit), goodness_of_fit.shape ) dset.attrs["best_fit"] = goodness_of_fit[best_index] print("Best-fit parameters:") print(f" - Goodness-of-fit = {goodness_of_fit[best_index]:.2e}") for param_idx, param_item in enumerate(model_param): best_param = coord_points[param_idx][best_index[param_idx]] dset.attrs[f"best_param{param_idx}"] = best_param print(f" - {param_item} = {best_param}") scaling = flux_scaling[best_index] radius = np.sqrt(scaling * (1e3 * constants.PARSEC / parallax) ** 2) # (m) radius /= constants.R_JUP # (Rjup) dset.attrs["radius"] = radius print(f" - Radius (Rjup) = {radius:.2f}") dset.attrs["scaling"] = scaling print(f" - Scaling = {scaling:.2e}") for spec_idx, spec_item in enumerate(scale_spec): scaling_idx = np.append(best_index, spec_idx) scale_tmp = extra_scaling[tuple(scaling_idx)] dset.attrs[f"scaling_{spec_item}"] = scale_tmp print(f" - {spec_item} scaling = {scale_tmp:.2f}")
[docs] def add_retrieval( self, tag: str, output_folder: str, inc_teff: bool = False ) -> None: """ Function for adding the output data from the atmospheric retrieval with :class:`~species.fit.retrieval.AtmosphericRetrieval` to the database. Parameters ---------- tag : str Database tag to store the posterior samples. output_folder : str Output folder that was used for the output files by ``MultiNest``. inc_teff : bool Calculate :math:`T_\\mathrm{eff}` for each sample by integrating the model spectrum from 0.5 to 50 um. The :math:`T_\\mathrm{eff}` samples are added to the array with samples that are stored in the database. The computation time for adding :math:`T_\\mathrm{eff}` will be long because the spectra need to be calculated and integrated for all samples. Returns ------- NoneType None """ from species.util.retrieval_util import ( list_to_dict, pt_ret_model, pt_spline_interp, quench_pressure, scale_cloud_abund, ) print("Storing samples in the database...", end="", flush=True) json_filename = os.path.join(output_folder, "params.json") with open(json_filename, encoding="utf-8") as json_file: parameters = json.load(json_file) radtrans_filename = os.path.join(output_folder, "radtrans.json") with open(radtrans_filename, encoding="utf-8") as json_file: radtrans = json.load(json_file) post_new = os.path.join(output_folder, "retrieval_post_equal_weights.dat") post_old = os.path.join(output_folder, "post_equal_weights.dat") if os.path.exists(post_new): samples = np.loadtxt(post_new) elif os.path.exists(post_old): samples = np.loadtxt(post_old) else: raise RuntimeError("Can not find the post_equal_weights.dat file.") if samples.ndim == 1: warnings.warn( f"Only 1 sample found in post_equal_weights.dat " f"of the '{output_folder}' folder." ) samples = samples[np.newaxis,] # Number of fixed parameters fixed_param = {} for i in range(samples.shape[1]): if np.amin(samples[:, i]) == np.amax(samples[:, i]): fixed_param[parameters[i]] = np.mean(samples[:, i]) with h5py.File(self.database, "a") as hdf5_file: if "results" not in hdf5_file: hdf5_file.create_group("results") if "results/fit" not in hdf5_file: hdf5_file.create_group("results/fit") if f"results/fit/{tag}" in hdf5_file: del hdf5_file[f"results/fit/{tag}"] # Store the log-likelihood hdf5_file.create_dataset( f"results/fit/{tag}/ln_prob", data=samples[:, -1], dtype="f8" ) # Remove the column with the log-likelihood value samples = samples[:, :-1] if samples.shape[1] != len(parameters): raise ValueError( "The number of parameters is not equal to the parameter size " "of the samples array." ) dset = hdf5_file.create_dataset( f"results/fit/{tag}/samples", data=samples, dtype="f8" ) dset.attrs["model_type"] = "retrieval" dset.attrs["model_name"] = "petitradtrans" dset.attrs["n_param"] = len(parameters) if "parallax" in radtrans: dset.attrs["parallax"] = radtrans["parallax"] else: dset.attrs["distance"] = radtrans["distance"] count_scale = 0 count_error = 0 for i, item in enumerate(parameters): dset.attrs[f"parameter{i}"] = item for i, item in enumerate(parameters): if item[0:6] == "scaling_": dset.attrs[f"scaling{count_scale}"] = item count_scale += 1 for i, item in enumerate(parameters): if item[0:6] == "error_": dset.attrs[f"error{count_error}"] = item count_error += 1 dset.attrs["n_scaling"] = count_scale dset.attrs["n_error"] = count_error for i, item in enumerate(radtrans["line_species"]): dset.attrs[f"line_species{i}"] = item for i, item in enumerate(radtrans["cloud_species"]): dset.attrs[f"cloud_species{i}"] = item dset.attrs["n_line_species"] = len(radtrans["line_species"]) dset.attrs["n_cloud_species"] = len(radtrans["cloud_species"]) dset.attrs["scattering"] = radtrans["scattering"] dset.attrs["pressure_grid"] = radtrans["pressure_grid"] dset.attrs["pt_profile"] = radtrans["pt_profile"] dset.attrs["chemistry"] = radtrans["chemistry"] dset.attrs["wavel_min"] = radtrans["wavel_range"][0] dset.attrs["wavel_max"] = radtrans["wavel_range"][1] if "quenching" not in radtrans or radtrans["quenching"] is None: dset.attrs["quenching"] = "None" else: dset.attrs["quenching"] = radtrans["quenching"] if "temp_nodes" not in radtrans or radtrans["temp_nodes"] is None: dset.attrs["temp_nodes"] = "None" temp_nodes = 15 else: dset.attrs["temp_nodes"] = radtrans["temp_nodes"] temp_nodes = radtrans["temp_nodes"] if "pt_smooth" in radtrans: dset.attrs["pt_smooth"] = radtrans["pt_smooth"] if "max_press" in radtrans: dset.attrs["max_press"] = radtrans["max_press"] if "abund_nodes" not in radtrans or radtrans["abund_nodes"] is None: dset.attrs["abund_nodes"] = "None" else: dset.attrs["abund_nodes"] = radtrans["abund_nodes"] if "res_mode" in radtrans: dset.attrs["res_mode"] = radtrans["res_mode"] else: dset.attrs["res_mode"] = "c-k" if ( "lbl_opacity_sampling" not in radtrans or radtrans["lbl_opacity_sampling"] is None ): dset.attrs["abund_nodes"] = "None" else: dset.attrs["lbl_opacity_sampling"] = radtrans["lbl_opacity_sampling"] dset.attrs["n_fixed"] = int(len(fixed_param)) for key, value in fixed_param.items(): group_path = f"results/fit/{tag}/fixed_param/{key}" hdf5_file.create_dataset(group_path, data=value, dtype="f8") print(" [DONE]") # Set number of pressures if radtrans["pressure_grid"] in ["standard", "smaller"]: n_pressures = 180 elif radtrans["pressure_grid"] == "clouds": n_pressures = 1440 rt_object = None from petitRADTRANS.chemistry.pre_calculated_chemistry import ( PreCalculatedEquilibriumChemistryTable, ) eq_chem = PreCalculatedEquilibriumChemistryTable() eq_chem.load() for i, cloud_item in enumerate(radtrans["cloud_species"]): if f"{cloud_item}_tau" in parameters: pressure = np.logspace(-6, 3, n_pressures) cloud_mass = np.zeros(samples.shape[0]) if rt_object is None: from petitRADTRANS.radtrans import Radtrans # Pressure array for Radtrans if self.pressure_grid in ["standard", "manual"]: radtrans_press = np.copy(pressure) elif self.pressure_grid == "smaller": radtrans_press = pressure[::3] elif self.pressure_grid == "clouds": radtrans_press = pressure[::24] rt_object = Radtrans( pressures=radtrans_press, line_species=radtrans["line_species"], rayleigh_species=["H2", "He"], cloud_species=radtrans["cloud_species"].copy(), gas_continuum_contributors=["H2-H2", "H2-He"], wavelength_boundaries=radtrans["wavel_range"], line_opacity_mode="c-k", scattering_in_emission=radtrans["scattering"], ) desc = f"Calculating mass fractions of {cloud_item}" for j in tqdm(range(samples.shape[0]), desc=desc): sample_dict = list_to_dict( parameters, samples[j,], ) if radtrans["pt_profile"] == "molliere": upper_temp = np.array( [sample_dict["t1"], sample_dict["t2"], sample_dict["t3"]] ) temp, _, _ = pt_ret_model( upper_temp, 10.0 ** sample_dict["log_delta"], sample_dict["alpha"], sample_dict["tint"], pressure, sample_dict["metallicity"], sample_dict["c_o_ratio"], ) elif ( radtrans["pt_profile"] == "free" or radtrans["pt_profile"] == "monotonic" ): knot_press = np.logspace( np.log10(pressure[0]), np.log10(pressure[-1]), temp_nodes ) knot_temp = [] for k in range(temp_nodes): knot_temp.append(sample_dict[f"t{k}"]) knot_temp = np.asarray(knot_temp) pt_smooth = sample_dict.get("pt_smooth", radtrans["pt_smooth"]) temp = pt_spline_interp( knot_press, knot_temp, pressure, pt_smooth=pt_smooth ) # Set the quenching pressure (bar) if "log_p_quench" in parameters: quench_press = 10.0 ** sample_dict["log_p_quench"] else: quench_press = None abund_in = eq_chem.interpolate_mass_fractions( np.full(pressure.shape[0], sample_dict["c_o_ratio"]), np.full(pressure.shape[0], sample_dict["metallicity"]), temp, pressure, carbon_pressure_quench=quench_press, full=False, ) # Calculate the scaled mass fraction of the clouds cloud_mass[j] = scale_cloud_abund( sample_dict, rt_object, pressure, temp, abund_in["MMW"], "equilibrium", abund_in, cloud_item, sample_dict[f"{cloud_item}_tau"], pressure_grid=radtrans["pressure_grid"], ) db_tag = f"results/fit/{tag}/samples" with h5py.File(self.database, "a") as hdf5_file: dset_attrs = hdf5_file[db_tag].attrs samples = np.asarray(hdf5_file[db_tag]) samples = np.append(samples, cloud_mass[..., np.newaxis], axis=1) del hdf5_file[db_tag] dset = hdf5_file.create_dataset(db_tag, data=samples, dtype="f8") for attr_item in dset_attrs: dset.attrs[attr_item] = dset_attrs[attr_item] n_param = dset_attrs["n_param"] + 1 dset.attrs["n_param"] = n_param if radtrans["quenching"] == "diffusion": p_quench = np.zeros(samples.shape[0]) desc = "Calculating quenching pressures" for i in tqdm(range(samples.shape[0]), desc=desc): # Convert list of parameters and samples into dictionary sample_dict = list_to_dict( parameters, samples[i,], ) # Recalculate the P-T profile from the sampled parameters pressure = np.logspace(-6, 3, n_pressures) # (bar) if radtrans["pt_profile"] == "molliere": upper_temp = np.array( [sample_dict["t1"], sample_dict["t2"], sample_dict["t3"]] ) temp, _, _ = pt_ret_model( upper_temp, 10.0 ** sample_dict["log_delta"], sample_dict["alpha"], sample_dict["tint"], pressure, sample_dict["metallicity"], sample_dict["c_o_ratio"], ) elif ( radtrans["pt_profile"] == "free" or radtrans["pt_profile"] == "monotonic" ): knot_press = np.logspace( np.log10(pressure[0]), np.log10(pressure[-1]), temp_nodes ) knot_temp = [] for k in range(temp_nodes): knot_temp.append(sample_dict[f"t{k}"]) knot_temp = np.asarray(knot_temp) if "pt_smooth" in sample_dict: pt_smooth = sample_dict["pt_smooth"] else: pt_smooth = radtrans["pt_smooth"] temp = pt_spline_interp( knot_press, knot_temp, pressure, pt_smooth=pt_smooth ) # Calculate the quenching pressure p_quench[i] = quench_pressure( pressure, temp, sample_dict["metallicity"], sample_dict["c_o_ratio"], sample_dict["logg"], sample_dict["log_kzz"], ) db_tag = f"results/fit/{tag}/samples" with h5py.File(self.database, "a") as hdf5_file: dset_attrs = hdf5_file[db_tag].attrs samples = np.asarray(hdf5_file[db_tag]) samples = np.append( samples, np.log10(p_quench[..., np.newaxis]), axis=1 ) del hdf5_file[db_tag] dset = hdf5_file.create_dataset(db_tag, data=samples, dtype="f8") for item in dset_attrs: dset.attrs[item] = dset_attrs[item] n_param = dset_attrs["n_param"] + 1 dset.attrs["n_param"] = n_param dset.attrs[f"parameter{n_param-1}"] = "log_p_quench" if inc_teff: print("Calculating Teff from the posterior samples... ") boxes, _ = self.get_retrieval_spectra( tag=tag, random=None, wavel_range=(0.5, 50.0), spec_res=100.0 ) teff = np.zeros(len(boxes)) for i, box_item in enumerate(boxes): if "parallax" in box_item.parameters: sample_distance = ( 1e3 * constants.PARSEC / box_item.parameters["parallax"] ) else: sample_distance = box_item.parameters["distance"] * constants.PARSEC sample_radius = box_item.parameters["radius"] * constants.R_JUP # Scaling for the flux back to the planet surface sample_scale = (sample_distance / sample_radius) ** 2 # Blackbody flux: sigma * Teff^4 flux_int = simpson(sample_scale * box_item.flux, x=box_item.wavelength) teff[i] = (flux_int / constants.SIGMA_SB) ** 0.25 db_tag = f"results/fit/{tag}/samples" with h5py.File(self.database, "a") as hdf5_file: dset_attrs = hdf5_file[db_tag].attrs samples = np.asarray(hdf5_file[db_tag]) samples = np.append(samples, teff[..., np.newaxis], axis=1) del hdf5_file[db_tag] dset = hdf5_file.create_dataset(db_tag, data=samples, dtype="f8") for item in dset_attrs: dset.attrs[item] = dset_attrs[item] n_param = dset_attrs["n_param"] + 1 dset.attrs["n_param"] = n_param dset.attrs[f"parameter{n_param-1}"] = "teff"
[docs] @staticmethod @beartype def get_retrieval_spectra( tag: str, random: Optional[int], wavel_range: Optional[Union[Tuple[float, float], str]] = None, spec_res: Optional[float] = None, lbl_opacity_sampling: Optional[Union[int, np.int64]] = None, ) -> Tuple[List[ModelBox], Union[Any]]: """ Function for extracting random spectra from the posterior distribution that was sampled with :class:`~species.fit.retrieval.AtmosphericRetrieval`. Parameters ---------- tag : str Database tag with the posterior samples. random : int, None Number of randomly selected samples. All samples are selected if set to ``None``. When setting ``random=0``, no random spectra are sampled (so the returned list with ``ModelBox`` objects is empty), but the :class:`~species.read.read_radtrans.ReadRadtrans` instance is still returned. wavel_range : tuple(float, float), str, None Wavelength range (um) or filter name. The wavelength range from the retrieval is adopted (i.e. the``wavel_range`` parameter of :class:`~species.fit.retrieval.AtmosphericRetrieval`) when set to ``None``. It is mandatory to set the argument to ``None`` in case the ``log_tau_cloud`` parameter has been used with the retrieval. spec_res : float, None Spectral resolution that is used for the smoothing with a Gaussian kernel. No smoothing is applied when the argument is set to ``None``. lbl_opacity_sampling : int, None This is the same parameter as in ``petitRADTRANS`` which is used with ``res_mode='lbl'`` to downsample the line-by-line opacities by selecting every ``lbl_opacity_sampling``-th wavelength from the original sampling of :math:`\\lambda/\\Delta \\lambda = 10^6`. Setting this parameter will lower the computation time. By setting the argument to ``None``, the value that was set with the retrieval will be used. Setting a lower value will be useful when calculating a low-resolution spectrum across a wide wavelength range, which may require too much memory when using the ``res_mode='lbl'`` mode. Returns ------- list(ModelBox) Boxes with the randomly sampled spectra. species.read.read_radtrans.ReadRadtrans Instance of :class:`~species.read.read_radtrans.ReadRadtrans`. """ from species.read.read_radtrans import ReadRadtrans from species.util.radtrans_util import retrieval_spectrum # Open configuration file if "SPECIES_CONFIG" in os.environ: config_file = os.environ["SPECIES_CONFIG"] else: config_file = os.path.join(os.getcwd(), "species_config.ini") config = ConfigParser() config.read(config_file) # Read path of the HDF5 database database_path = config["species"]["database"] # Open the HDF5 database with h5py.File(database_path, "r") as hdf5_file: # Read the posterior samples dset = hdf5_file[f"results/fit/{tag}/samples"] samples = np.asarray(dset) attrs_dict = dict(dset.attrs) # Select random samples if random is None: # Required for the printed output in the for loop random = samples.shape[0] else: random_indices = np.random.randint(samples.shape[0], size=random) samples = samples[random_indices, :] # Get number of model parameters if "n_param" in attrs_dict: n_param = attrs_dict["n_param"] elif "nparam" in attrs_dict: n_param = attrs_dict["nparam"] # Get number of line and cloud species n_line_species = attrs_dict["n_line_species"] n_cloud_species = attrs_dict["n_cloud_species"] # Get number of abundance nodes if "abund_nodes" in attrs_dict: if attrs_dict["abund_nodes"] == "None": abund_nodes = None else: abund_nodes = attrs_dict["abund_nodes"] else: abund_nodes = None # Convert numpy boolean to regular boolean scattering = bool(attrs_dict["scattering"]) # Get chemistry attributes chemistry = attrs_dict["chemistry"] if attrs_dict["quenching"] == "None": quenching = None else: quenching = attrs_dict["quenching"] # Get P-T profile attributes pt_profile = attrs_dict["pt_profile"] if "pressure_grid" in attrs_dict: pressure_grid = attrs_dict["pressure_grid"] else: pressure_grid = "smaller" # Get free temperarture nodes if "temp_nodes" in attrs_dict: if attrs_dict["temp_nodes"] == "None": temp_nodes = None else: temp_nodes = attrs_dict["temp_nodes"] else: temp_nodes = None # Get distance if "parallax" in attrs_dict: distance = 1e3 / attrs_dict["parallax"][0] elif "distance" in attrs_dict: distance = attrs_dict["distance"] else: distance = None # Get maximum pressure if "max_press" in attrs_dict: max_press = attrs_dict["max_press"] else: max_press = None # Get model parameters parameters = [] for i in range(n_param): parameters.append(attrs_dict[f"parameter{i}"]) parameters = np.asarray(parameters) # Get wavelength range for median cloud optical depth if "log_tau_cloud" in parameters and wavel_range is not None: cloud_wavel = (attrs_dict["wavel_min"], attrs_dict["wavel_max"]) else: cloud_wavel = None # Get wavelength range for spectrum if wavel_range is None: wavel_range = (attrs_dict["wavel_min"], attrs_dict["wavel_max"]) # Create dictionary with array indices of the model parameters indices = {} for item in parameters: indices[item] = np.argwhere(parameters == item)[0][0] # Create list with line species line_species = [] for i in range(n_line_species): line_species.append(attrs_dict[f"line_species{i}"]) # Create list with cloud species cloud_species = [] for i in range(n_cloud_species): cloud_species.append(attrs_dict[f"cloud_species{i}"]) # Get resolution mode if "res_mode" in attrs_dict: res_mode = attrs_dict["res_mode"] else: res_mode = "c-k" # High-resolution downsampling factor if lbl_opacity_sampling is None: if "lbl_opacity_sampling" in attrs_dict: if attrs_dict["lbl_opacity_sampling"] == "None": lbl_opacity_sampling = None else: lbl_opacity_sampling = attrs_dict["lbl_opacity_sampling"] else: lbl_opacity_sampling = None # Create an instance of ReadRadtrans # Afterwards, the names of the cloud_species have been shortened # from e.g. 'MgSiO3(c)_cd' to 'MgSiO3(c)' read_rad = ReadRadtrans( line_species=line_species, cloud_species=cloud_species, scattering=scattering, wavel_range=wavel_range, pressure_grid=pressure_grid, cloud_wavel=cloud_wavel, max_press=max_press, res_mode=res_mode, lbl_opacity_sampling=lbl_opacity_sampling, ) # Set quenching attribute such that the parameter of get_model is not required read_rad.quenching = quenching # pool = multiprocessing.Pool(os.cpu_count()) # processes = [] # Initiate empty list for ModelBox objects boxes = [] for i, item in enumerate(samples): print(f"\rGetting posterior spectra {i+1}/{random}...", end="") # Get smoothing parameter for P-T profile if "pt_smooth" in attrs_dict: pt_smooth = attrs_dict["pt_smooth"] elif "pt_smooth_0" in parameters: pt_smooth = {} for j in range(temp_nodes - 1): pt_smooth[f"pt_smooth_{j}"] = item[-1 * temp_nodes + j] else: pt_smooth = item[indices["pt_smooth"]] # Get smoothing parameter for abundance profiles if "abund_smooth" in attrs_dict: if attrs_dict["abund_smooth"] == "None": abund_smooth = None else: abund_smooth = attrs_dict["abund_smooth"] else: abund_smooth = None # Calculate the petitRADTRANS spectrum model_box = retrieval_spectrum( indices=indices, chemistry=chemistry, pt_profile=pt_profile, line_species=line_species, cloud_species=cloud_species, quenching=quenching, spec_res=spec_res, distance=distance, pt_smooth=pt_smooth, temp_nodes=temp_nodes, abund_nodes=abund_nodes, abund_smooth=abund_smooth, read_rad=read_rad, sample=item, ) # Add the ModelBox to the list boxes.append(model_box) # proc = pool.apply_async(retrieval_spectrum, # args=(indices, # chemistry, # pt_profile, # line_species, # cloud_species, # quenching, # spec_res, # read_rad, # item)) # # processes.append(proc) # pool.close() # # for i, item in enumerate(processes): # boxes.append(item.get(timeout=30)) # print(f'\rGetting posterior spectra {i+1}/{random}...', end='', flush=True) print(" [DONE]") return boxes, read_rad
[docs] @beartype def get_retrieval_teff( self, tag: str, wavel_range: Tuple[Real, Real], random: int = 100, ) -> Tuple[np.ndarray, np.ndarray]: """ Function for calculating :math:`T_\\mathrm{eff}` and :math:`L_\\mathrm{bol}` from randomly drawn samples of the posterior distribution that is estimated with :class:`~species.fit.retrieval.AtmosphericRetrieval`. This requires the recalculation of the spectra across a broad wavelength range (0.5-50 um). Parameters ---------- tag : str Database tag with the posterior samples. wavel_range : tuple(float, float) Wavelength range (um) across which the spectra are calculated and interpolated. A wide range should be set for an accurate calculation of the bolometric luminosity, but the boundaries should be within the available range of the opacities. random : int Number of randomly selected samples (default: 100). Returns ------- np.ndarray Array with :math:`T_\\mathrm{eff}` samples. np.ndarray Array with :math:`\\log(L/L_\\mathrm{sun})` samples. """ print(f"Calculating Teff from {random} posterior samples... ") boxes, _ = self.get_retrieval_spectra( tag=tag, random=random, wavel_range=wavel_range, spec_res=None, lbl_opacity_sampling=10000, ) t_eff = np.zeros(len(boxes)) l_bol = np.zeros(len(boxes)) for i, box_item in enumerate(boxes): if "parallax" in box_item.parameters: sample_distance = ( 1e3 * constants.PARSEC / box_item.parameters["parallax"] ) else: sample_distance = box_item.parameters["distance"] * constants.PARSEC sample_radius = box_item.parameters["radius"] * constants.R_JUP # Scaling for the flux back to the planet surface sample_scale = (sample_distance / sample_radius) ** 2 # Blackbody flux: sigma * Teff^4 flux_int = simpson(sample_scale * box_item.flux, x=box_item.wavelength) t_eff[i] = (flux_int / constants.SIGMA_SB) ** 0.25 # Bolometric luminosity: 4 * pi * R^2 * sigma * Teff^4 l_bol[i] = 4.0 * np.pi * sample_radius**2 * flux_int l_bol[i] = np.log10(l_bol[i] / constants.L_SUN) # np.savetxt(f'output/spectrum/spectrum{i:04d}.dat', # np.column_stack([box_item.wavelength, sample_scale*box_item.flux]), # header='Wavelength (um) - Flux (W m-2 um-1)') q_16_teff, q_50_teff, q_84_teff = np.nanpercentile(t_eff, [16.0, 50.0, 84.0]) print( f"Teff (K) = {q_50_teff:.2f} " f"(-{q_50_teff-q_16_teff:.2f} " f"+{q_84_teff-q_50_teff:.2f})" ) q_16_lbol, q_50_lbol, q_84_lbol = np.nanpercentile(l_bol, [16.0, 50.0, 84.0]) print( f"log(L/Lsun) = {q_50_lbol:.2f} " f"(-{q_50_lbol-q_16_lbol:.2f} " f"+{q_84_lbol-q_50_lbol:.2f})" ) with h5py.File(self.database, "a") as hdf5_file: print( f"Storing Teff (K) as attribute of results/fit/{tag}/samples...", end="", ) dset = hdf5_file[f"results/fit/{tag}/samples"] dset.attrs["teff"] = ( q_50_teff - q_16_teff, q_50_teff, q_84_teff - q_50_teff, ) print(" [DONE]") print( f"Storing log(L/Lsun) as attribute of results/fit/{tag}/samples...", end="", ) dset.attrs["log_l_bol"] = ( q_50_lbol - q_16_lbol, q_50_lbol, q_84_lbol - q_50_lbol, ) print(" [DONE]") return t_eff, l_bol
[docs] @beartype def petitcode_param( self, tag: str, sample_type: str = "median", json_file: Optional[str] = None ) -> Dict[str, float]: """ Function for converting the median or maximum likelihood posterior parameters of ``petitRADTRANS`` into a dictionary of input parameters for ``petitCODE``. Parameters ---------- tag : str Database tag with the posterior samples. sample_type : str Sample type that will be selected from the posterior ('median' or 'probable'). Either the median or maximum likelihood parameters are used. json_file : str, None JSON file to store the posterior samples. The data will not be written if the argument is set to ``None``. Returns ------- dict Dictionary with parameters for ``petitCODE``. """ from species.read.read_radtrans import ReadRadtrans from species.util.retrieval_util import ( find_cloud_deck, log_x_cloud_base, pt_ret_model, pt_spline_interp, ) if sample_type == "median": model_param = self.get_median_sample(tag, verbose=False) elif sample_type == "probable": model_param = self.get_probable_sample(tag, verbose=False) else: raise ValueError( "The argument of 'sample_type' should be set " "to either 'median' or 'probable'." ) sample_box = self.get_samples(tag) line_species = [] for i in range(sample_box.attributes["n_line_species"]): line_species.append(sample_box.attributes[f"line_species{i}"]) cloud_species = [] cloud_species_full = [] for i in range(sample_box.attributes["n_cloud_species"]): cloud_species.append(sample_box.attributes[f"cloud_species{i}"]) cloud_species_full.append(sample_box.attributes[f"cloud_species{i}"]) pcode_param = {} pcode_param["logg"] = model_param["logg"] pcode_param["metallicity"] = model_param["metallicity"] pcode_param["c_o_ratio"] = model_param["c_o_ratio"] if "fsed" in model_param: pcode_param["fsed"] = model_param["fsed"] if "log_kzz" in model_param: pcode_param["log_kzz"] = model_param["log_kzz"] if "sigma_lnorm" in model_param: pcode_param["sigma_lnorm"] = model_param["sigma_lnorm"] if "log_p_quench" in model_param: pcode_param["log_p_quench"] = model_param["log_p_quench"] p_quench = 10.0 ** model_param["log_p_quench"] else: p_quench = None if "temp_nodes" in sample_box.attributes: temp_nodes = sample_box.attributes["temp_nodes"] else: temp_nodes = 15 if "pressure_grid" in sample_box.attributes: pressure_grid = sample_box.attributes["pressure_grid"] else: pressure_grid = "smaller" pressure = np.logspace(-6.0, 3.0, 180) if sample_box.attributes["pt_profile"] == "molliere": temperature, _, _ = pt_ret_model( np.array([model_param["t1"], model_param["t2"], model_param["t3"]]), 10.0 ** model_param["log_delta"], model_param["alpha"], model_param["tint"], pressure, model_param["metallicity"], model_param["c_o_ratio"], ) else: knot_press = np.logspace( np.log10(pressure[0]), np.log10(pressure[-1]), temp_nodes ) knot_temp = [] for i in range(temp_nodes): knot_temp.append(model_param[f"t{i}"]) knot_temp = np.asarray(knot_temp) if "pt_smooth" in model_param: pt_smooth = model_param["pt_smooth"] else: pt_smooth = 0.0 temperature = pt_spline_interp( knot_press, knot_temp, pressure, pt_smooth=pt_smooth ) from petitRADTRANS.chemistry.pre_calculated_chemistry import ( PreCalculatedEquilibriumChemistryTable, ) eq_chem = PreCalculatedEquilibriumChemistryTable() eq_chem.load() # Interpolate the abundances, following chemical equilibrium abund_in, mmw, _ = eq_chem.interpolate_mass_fractions( np.full(pressure.shape[0], model_param["c_o_ratio"]), np.full(pressure.shape[0], model_param["metallicity"]), temperature, pressure, carbon_pressure_quench=p_quench, full=True, ) cloud_fractions = {} if "log_tau_cloud" in model_param: # tau_cloud = 10.0 ** model_param["log_tau_cloud"] for i, item in enumerate(cloud_species): if i == 0: cloud_fractions[item] = 0.0 else: cloud_fractions[item] = model_param[ f"{item}_{cloud_species[0]}_ratio" ] else: # tau_cloud = None for i, item in enumerate(cloud_species): cloud_fractions[item] = model_param[f"{item}_fraction"] log_x_base = log_x_cloud_base( model_param["c_o_ratio"], model_param["metallicity"], cloud_fractions ) p_base = {} for item in cloud_species: p_base_item = find_cloud_deck( item, pressure, temperature, model_param["metallicity"], model_param["c_o_ratio"], mmw=np.mean(mmw), plotting=False, ) abund_in[item] = np.zeros_like(temperature) abund_in[item][pressure < p_base_item] = ( 10.0 ** log_x_base[item] * (pressure[pressure <= p_base_item] / p_base_item) ** model_param["fsed"] ) p_base[item] = p_base_item indices = np.where(pressure <= p_base_item)[0] pcode_param[f"{item}_base"] = pressure[np.amax(indices)] # abundances = create_abund_dict( # abund_in, line_species, sample_box.attributes['chemistry'], # pressure_grid='smaller', indices=None) cloud_wavel = ( sample_box.attributes["wavel_min"], sample_box.attributes["wavel_max"], ) read_rad = ReadRadtrans( line_species=line_species, cloud_species=cloud_species, scattering=True, wavel_range=(0.5, 50.0), pressure_grid=pressure_grid, res_mode="c-k", cloud_wavel=cloud_wavel, ) print(f"Converting {tag} to petitCODE parameters...", end="", flush=True) if sample_box.attributes["quenching"] == "None": quenching = None else: quenching = sample_box.attributes["quenching"] model_box = read_rad.get_model( model_param=model_param, quenching=quenching, spec_res=500.0 ) # Distance (pc) if "parallax" in model_param: distance = 1e3 * constants.PARSEC / model_param["parallax"] else: distance = model_param["distance"] * constants.PARSEC # Radius (Rjup) radius = model_param["radius"] * constants.R_JUP # Blackbody flux: sigma * Teff^4 # Scale the flux back to the planet surface flux_int = simpson( model_box.flux * (distance / radius) ** 2, x=model_box.wavelength ) pcode_param["teff"] = (flux_int / constants.SIGMA_SB) ** 0.25 if "log_tau_cloud" in model_param: cloud_scaling = read_rad.rt_object.cloud_scaling_factor for item in cloud_species_full: cloud_abund = abund_in[item] indices = np.where(cloud_abund > 0.0)[0] pcode_param[f"{item}_abund"] = ( cloud_scaling * cloud_abund[np.amax(indices)] ) else: for item in cloud_species_full: cloud_abund = abund_in[item] indices = np.where(cloud_abund > 0.0)[0] pcode_param[f"{item}_abund"] = cloud_abund[np.amax(indices)] if json_file is not None: with open(json_file, "w", encoding="utf-8") as out_file: json.dump(pcode_param, out_file, indent=4) print(" [DONE]") return pcode_param
[docs] @beartype def get_spectral_type(self, tag: str, verbose: bool = True) -> SpectrumBox: """ Function for extracting the spectral template for which the goodness-of-fit statistic has been minimized when using :func:`~species.fit.compare_spectra.CompareSpectra.spectral_type` for comparing data with the empirical templates in a spectral library. Parameters ---------- tag : str Database tag where the results from the empirical comparison with :class:`~species.fit.compare_spectra.CompareSpectra.spectral_type` are stored. verbose : bool Print output, including the parameter values. Returns ------- SpectrumBox A ``SpectrumBox`` that contains the spectrum and attributes of the best-fit template. """ if verbose: print_section("Get best spectral type") print(f"Database tag: {tag}") with h5py.File(self.database, "r") as hdf5_file: dset = hdf5_file[f"results/empirical/{tag}/names"] object_name = dset.attrs["object_name"] spec_library = dset.attrs["spec_library"] n_spec_name = dset.attrs["n_spec_name"] if verbose: print(f"Object name: {object_name}") print(f"Spectral library: {spec_library}") if verbose: print("\nIncluded spectra:") spec_name = [] for i in range(n_spec_name): spec_name.append(dset.attrs[f"spec_name{i}"]) if verbose: print(f" - {spec_name[-1]}") names = np.array(dset) flux_scaling = np.array(hdf5_file[f"results/empirical/{tag}/flux_scaling"]) av_ext = np.array(hdf5_file[f"results/empirical/{tag}/av_ext"]) rad_vel = 1e3 * np.array( hdf5_file[f"results/empirical/{tag}/rad_vel"] ) # (m s-1) goodness_fit = np.array( [hdf5_file[f"results/empirical/{tag}/goodness_of_fit"]] ) if isinstance(names[i], str): best_name = names[i] else: best_name = names[i].decode("utf-8") with h5py.File(self.database, "r") as hdf5_file: dset = hdf5_file[f"spectra/{spec_library}/{best_name}"] spectrum = np.array(dset) sptype = dset.attrs["sptype"] simbad = dset.attrs["simbad"] parallax = dset.attrs["parallax"] spec_res = dset.attrs["spec_res"] if verbose: print("\nBest-fit spectral template:") print(f" - G_k = {goodness_fit[0][0]:.2f}") print(f" - Object name = {best_name}") print(f" - SIMBAD name = {simbad}") print(f" - Spectral type = {sptype}") print(f" - Flux scaling = {flux_scaling[0][0]:.2e}") if np.amin(av_ext) > 0.0: print(f" - Extinction A_V = {av_ext[0]}") if np.amin(rad_vel) > 0.0: print(f" - Radial velocity (km/s) = {rad_vel[0]}") return create_box( "spectrum", spectrum=spec_library, wavelength=spectrum[:, 0], flux=spectrum[:, 1], error=spectrum[:, 2], name=best_name, sptype=sptype, simbad=best_name, parallax=parallax, spec_res=spec_res, )