"""
Module with functionalities for retrieving the age and
bulk parameters of one or multiple planets and/or brown
dwarfs in a system.
"""
import configparser
import os
import warnings
from typing import Dict, List, Optional, Tuple, Union
import h5py
import numpy as np
try:
import pymultinest
except:
warnings.warn(
"PyMultiNest could not be imported. "
"Perhaps because MultiNest was not build "
"and/or found at the LD_LIBRARY_PATH "
"(Linux) or DYLD_LIBRARY_PATH (Mac)?"
)
from scipy import stats
from tqdm.auto import tqdm
from typeguard import typechecked
from species.core import constants
from species.read.read_isochrone import ReadIsochrone
[docs]
class FitEvolution:
"""
Class for retrieving evolutionary parameters from the bolometric
luminosity of one or multiple planets in a planetary system.
Optionally, dynamical mass and/or radius priors can be applied
A single age is retrieved, so assuming that a difference
in formation time is negligible at the age of the system.
"""
@typechecked
def __init__(
self,
evolution_model: str,
object_lbol: Optional[Union[Tuple[float, float], List[Tuple[float, float]]]],
object_mass: Optional[
Union[Tuple[float, float], List[Optional[Tuple[float, float]]]]
] = None,
object_radius: Optional[
Union[Tuple[float, float], List[Optional[Tuple[float, float]]]]
] = None,
bounds: Optional[
Dict[
str,
Union[
Tuple[float, float],
Tuple[Optional[Tuple[float, float]], Optional[Tuple[float, float]]],
List[Tuple[float, float]],
],
]
] = None,
) -> None:
"""
Parameters
----------
evolution_model : str
Database tag of the isochrone data (e.g. 'ames-cond',
'sonora+0.0', 'atmo-ceq'). When using an incorrect
argument, and error message is printed that includes
a list with the isochrone models that are available
in the current ``species`` database.
object_lbol : tuple(float, float), list(tuple(float, float))
List with tuples that contain :math:`\\log10{L/L_\\odot}`
and the related uncertainty for one or multiple objects.
The list should follow the alphabetical order of companion
characters (i.e. b, c, d, etc.) to make sure that the
labels are correctly shown when plotting results.
object_mass : tuple(float, float), list(tuple(float, float)), None
Optional list with tuples that contain the (dynamical)
masses and the related uncertainty for one or multiple
objects. These masses we be used as Gaussian prior with
the fit. The order should be identical to ``object_lbol``.
object_radius : tuple(float, float), list(tuple(float, float)), None
Optional list with tuples that contain the radii (e.g.
from and SED fit) and the related uncertainty for one
or multiple objects. These radii we be used as Gaussian
prior with the fit. The order should be identical to
``object_lbol``.
bounds : dict(str, tuple(float, float)), None
The boundaries that are used for the uniform or
log-uniform priors. Fixing a parameter is possible by
providing the same value as lower and upper boundary
of the parameter (e.g. ``bounds={'y_frac': (0.25, 0.25)``.
Returns
-------
NoneType
None
"""
self.evolution_model = evolution_model
self.object_lbol = object_lbol
self.object_mass = object_mass
self.object_radius = object_radius
self.bounds = bounds
self.fix_param = {}
if isinstance(self.object_lbol, tuple):
self.object_lbol = [self.object_lbol]
self.object_mass = [self.object_mass]
self.n_planets = 1
if isinstance(self.object_lbol, list):
self.n_planets = len(self.object_lbol)
else:
self.n_planets = 1
if self.object_mass is None:
self.object_mass = []
for i in range(self.n_planets):
self.object_mass.append(None)
if self.object_radius is None:
self.object_radius = []
for i in range(self.n_planets):
self.object_radius.append(None)
config_file = os.path.join(os.getcwd(), "species_config.ini")
config = configparser.ConfigParser()
config.read(config_file)
self.database = config["species"]["database"]
self.database_path = config["species"]["database"]
# Add grid with evolution data
with h5py.File(self.database_path, "r") as h5_file:
found_model = bool(f"isochrones/{evolution_model}" in h5_file)
tag_list = list(h5_file["isochrones"])
if not found_model:
raise ValueError(
f"The isochrones of '{evolution_model}' "
"are not found in the database. Please "
"add the isochrones by using the "
"add_isochrones method of Database. The "
"following isochrone data are found in "
f"the database: {tag_list}"
)
# Model parameters
self.model_par = ["age"]
for i in range(self.n_planets):
self.model_par.append(f"mass_{i}")
[docs]
@typechecked
def run_multinest(
self,
tag: str,
n_live_points: int = 1000,
output: str = "multinest/",
) -> None:
"""
Function to run the ``PyMultiNest`` wrapper of the
``MultiNest`` sampler. While ``PyMultiNest`` can be
installed with ``pip`` from the PyPI repository,
``MultiNest`` has to to be build manually. See the
``PyMultiNest`` documentation for details:
http://johannesbuchner.github.io/PyMultiNest/install.html.
Note that the library path of ``MultiNest`` should be set
to the environmental variable ``LD_LIBRARY_PATH`` on a
Linux machine and ``DYLD_LIBRARY_PATH`` on a Mac.
Alternatively, the variable can be set before importing
the ``species`` package, for example:
.. code-block:: python
>>> import os
>>> os.environ['DYLD_LIBRARY_PATH'] = '/path/to/MultiNest/lib'
>>> import species
Parameters
----------
tag : str
Database tag where the samples will be stored.
n_live_points : int
Number of live points.
output : str
Path that is used for the output files from MultiNest.
Returns
-------
NoneType
None
"""
try:
from mpi4py import MPI
mpi_rank = MPI.COMM_WORLD.Get_rank()
except ModuleNotFoundError:
mpi_rank = 0
# Create the output folder if required
if mpi_rank == 0 and not os.path.exists(output):
os.mkdir(output)
read_iso = ReadIsochrone(
tag=self.evolution_model,
create_regular_grid=False,
)
# Prior boundaries
if self.bounds is not None:
bounds_grid = {}
for key, value in read_iso.grid_points().items():
if key in ["age", "mass"]:
bounds_grid[key] = (value[0], value[-1])
for key, value in bounds_grid.items():
if key not in self.bounds:
# Set the parameter boundaries to the grid
# boundaries if set to None or not found
if key == "age":
self.bounds[key] = bounds_grid[key]
else:
for i in range(self.n_planets):
if f"{key}_{i}" not in self.bounds:
self.bounds[f"{key}_{i}"] = bounds_grid[key]
else:
if self.bounds[key][0] < bounds_grid[key][0]:
warnings.warn(
f"The lower bound on {key} "
f"({self.bounds[key][0]}) is smaller than "
f"the lower bound from the available "
f"evolution model grid "
f"({bounds_grid[key][0]}). The lower bound "
f"of the {key} prior will be adjusted to "
f"{bounds_grid[key][0]}."
)
if key == "age":
self.bounds[key] = (
bounds_grid[key][0],
self.bounds[key][1],
)
else:
for i in range(self.n_planets):
self.bounds[f"{key}_{i}"] = (
bounds_grid[key][0],
self.bounds[key][1],
)
if self.bounds[key][1] > bounds_grid[key][1]:
warnings.warn(
f"The upper bound on {key} "
f"({self.bounds[key][1]}) is larger than the "
f"upper bound from the available evolution "
f"model grid ({bounds_grid[key][1]}). The "
f"bound of the {key} prior will be adjusted "
f"to {bounds_grid[key][1]}."
)
if key == "age":
self.bounds[key] = (
self.bounds[key][0],
bounds_grid[key][1],
)
else:
for i in range(self.n_planets):
if f"{key}_{i}" in self.bounds:
self.bounds[f"{key}_{i}"] = (
self.bounds[f"{key}_{i}"][0],
bounds_grid[key][1],
)
else:
self.bounds[f"{key}_{i}"] = (
self.bounds[key][0],
bounds_grid[key][1],
)
if key != "age":
for i in range(self.n_planets):
self.bounds[f"{key}_{i}"] = self.bounds[key]
del self.bounds[key]
for i in range(self.n_planets):
if f"inflate_lbol{i}" in self.bounds:
self.model_par.append(f"inflate_lbol{i}")
for i in range(self.n_planets):
if f"inflate_mass{i}" in self.bounds:
if self.object_mass[i] is None:
warnings.warn(
f"The object_mass with index "
f"{i} is set to None so the "
f"inflate_mass{i} parameter "
f"will be excluded."
)
del self.bounds[f"inflate_mass{i}"]
else:
self.model_par.append(f"inflate_mass{i}")
else:
# Set all parameter boundaries to the grid boundaries
self.bounds = {}
for key, value in read_iso.grid_points().items():
if key == "age":
self.bounds[key] = (value[0], value[-1])
elif key == "mass":
for i in range(self.n_planets):
self.bounds[f"{key}_{i}"] = (value[0], value[-1])
# Check if parameters are fixed
del_param = []
for key, value in self.bounds.items():
if value[0] == value[1] and value[0] is not None and value[1] is not None:
self.fix_param[key] = value[0]
del_param.append(key)
for item in del_param:
del self.bounds[item]
self.model_par.remove(item)
if self.fix_param:
print(f"Fixing {len(self.fix_param)} parameters:")
for key, value in self.fix_param.items():
print(f" - {key} = {value}")
print(f"Fitting {len(self.model_par)} parameters:")
for item in self.model_par:
print(f" - {item}")
print("Prior boundaries:")
for key, value in self.bounds.items():
print(f" - {key} = {value}")
# Create a dictionary with the cube indices of the parameters
cube_index = {}
for i, item in enumerate(self.model_par):
cube_index[item] = i
@typechecked
def ln_prior(cube, n_dim, n_param) -> None:
"""
Function to transform the unit cube into the parameter
cube. It is not clear how to pass additional arguments
to the function, therefore it is placed here.
Parameters
----------
cube : pymultinest.run.LP_c_double
Unit cube.
n_dim : int
Number of dimensions.
n_param : int
Number of parameters.
Returns
-------
NoneType
None
"""
for i, key in enumerate(self.model_par):
if key != "age":
obj_idx = int(key.split("_")[-1])
if key[:4] == "mass" and self.object_mass[obj_idx] is not None:
# Gaussian mass prior
sigma = self.object_mass[obj_idx][1]
if f"inflate_mass{obj_idx}" in self.bounds:
sigma += (
self.bounds[f"inflate_mass{obj_idx}"][0]
+ (
self.bounds[f"inflate_mass{obj_idx}"][1]
- self.bounds[f"inflate_mass{obj_idx}"][0]
)
* cube[cube_index[f"inflate_mass{obj_idx}"]]
)
cube[cube_index[f"mass_{obj_idx}"]] = stats.norm.ppf(
cube[cube_index[f"mass_{obj_idx}"]],
loc=self.object_mass[obj_idx][0],
scale=sigma,
)
else:
cube[i] = (
self.bounds[key][0]
+ (self.bounds[key][1] - self.bounds[key][0]) * cube[i]
)
@typechecked
def ln_like(params, n_dim, n_param) -> np.float64:
"""
Function for return the log-likelihood for
the sampled parameter cube.
Parameters
----------
params : pymultinest.run.LP_c_double
Cube with physical parameters.
n_dim : int
Number of dimensions. This parameter is mandatory
but not used by the function.
n_param : int
Number of parameters. This parameter is mandatory
but not used by the function.
Returns
-------
float
Log-likelihood.
"""
chi_square = 0.0
for i in range(self.n_planets):
param_names = [
"age",
f"mass_{i}",
]
param_val = []
for item in param_names:
if item in self.fix_param:
param_val.append(self.fix_param[item])
else:
param_val.append(params[cube_index[item]])
if f"inflate_lbol{i}" in self.bounds:
lbol_var = (
self.object_lbol[i][1] + params[cube_index[f"inflate_lbol{i}"]]
) ** 2.0
else:
lbol_var = self.object_lbol[i][1] ** 2
iso_box = read_iso.get_isochrone(
age=params[cube_index["age"]],
masses=np.array([params[cube_index[f"mass_{i}"]]]),
filters_color=None,
filter_mag=None,
)
chi_square += (
self.object_lbol[i][0] - iso_box.log_lum[0]
) ** 2 / lbol_var
# Only required when fitting the
# inflation on the Lbol variance
chi_square += np.log(2.0 * np.pi * lbol_var)
if self.object_mass[i] is not None:
# Only required when fitting the
# inflation on the mass variance
if f"inflate_mass{i}" in self.bounds:
mass_var = (
self.object_mass[i][1]
+ params[cube_index[f"inflate_mass{i}"]]
) ** 2.0
else:
mass_var = self.object_mass[i][1] ** 2
chi_square += np.log(2.0 * np.pi * mass_var)
# Radius prior
if self.object_radius[i] is not None:
chi_square += (
self.object_radius[i][0] - iso_box.radius[0]
) ** 2 / self.object_radius[i][1] ** 2
# ln_like += -0.5 * weight * (obj_item[0] - phot_flux) ** 2 / phot_var
# ln_like += -0.5 * weight * np.log(2.0 * np.pi * phot_var)
if np.isnan(chi_square):
log_like = -np.inf
elif np.isinf(chi_square):
log_like = -np.inf
else:
log_like = -0.5 * chi_square
return log_like
pymultinest.run(
ln_like,
ln_prior,
len(self.model_par),
outputfiles_basename=output,
resume=False,
n_live_points=n_live_points,
)
# Create the Analyzer object
analyzer = pymultinest.analyse.Analyzer(
len(self.model_par), outputfiles_basename=output
)
# Get a dictionary with the ln(Z) and its errors, the
# individual modes and their parameters quantiles of
# the parameter posteriors
sampling_stats = analyzer.get_stats()
# Nested sampling global log-evidence
ln_z = sampling_stats["nested sampling global log-evidence"]
ln_z_error = sampling_stats["nested sampling global log-evidence error"]
print(f"Nested sampling global log-evidence: {ln_z:.2f} +/- {ln_z_error:.2f}")
# Nested sampling global log-evidence
ln_z = sampling_stats["nested importance sampling global log-evidence"]
ln_z_error = sampling_stats[
"nested importance sampling global log-evidence error"
]
print(
f"Nested importance sampling global log-evidence: {ln_z:.2f} +/- {ln_z_error:.2f}"
)
# Get the best-fit (highest likelihood) point
print("Sample with the highest likelihood:")
best_params = analyzer.get_best_fit()
max_lnlike = best_params["log_likelihood"]
print(f" - Log-likelihood = {max_lnlike:.2f}")
for i, item in enumerate(best_params["parameters"]):
print(f" - {self.model_par[i]} = {item:.2f}")
# Get the posterior samples
samples = analyzer.get_equal_weighted_posterior()
analyzer = pymultinest.analyse.Analyzer(
len(self.model_par), outputfiles_basename=output
)
sampling_stats = analyzer.get_stats()
ln_z = sampling_stats["nested sampling global log-evidence"]
ln_z_error = sampling_stats["nested sampling global log-evidence error"]
print(f"Nested sampling global log-evidence: {ln_z:.2f} +/- {ln_z_error:.2f}")
ln_z = sampling_stats["nested importance sampling global log-evidence"]
ln_z_error = sampling_stats[
"nested importance sampling global log-evidence error"
]
print(
f"Nested importance sampling global log-evidence: {ln_z:.2f} +/- {ln_z_error:.2f}"
)
print("Sample with the highest likelihood:")
best_params = analyzer.get_best_fit()
max_lnlike = best_params["log_likelihood"]
print(f" - Log-likelihood = {max_lnlike:.2f}")
for i, item in enumerate(best_params["parameters"]):
print(f" - {self.model_par[i]} = {item:.2f}")
samples = analyzer.get_equal_weighted_posterior()
ln_prob = samples[:, -1]
# Adding the fixed parameters to the samples
if self.fix_param:
samples_tmp = samples[:, :-1]
self.model_par = ["age"]
for i in range(self.n_planets):
self.model_par.append(f"mass_{i}")
for i in range(self.n_planets):
if f"inflate_lbol{i}" in self.bounds:
self.model_par.append(f"inflate_lbol{i}")
for i in range(self.n_planets):
if f"inflate_mass{i}" in self.bounds:
self.model_par.append(f"inflate_mass{i}")
samples = np.zeros((samples_tmp.shape[0], len(self.model_par)))
for i, key in enumerate(self.model_par):
if key in self.fix_param:
samples[:, i] = np.full(samples_tmp.shape[0], self.fix_param[key])
else:
samples[:, i] = samples_tmp[:, cube_index[key]]
else:
samples = samples[:, :-1]
# Recreate cube_index dictionary because of included fix_param
cube_index = {}
for i, item in enumerate(self.model_par):
cube_index[item] = i
# Add atmospheric parameters (R, Teff, and log(g))
print("Extracting the posteriors of Teff, R, and log(g)...")
radius = np.zeros((samples.shape[0], self.n_planets))
log_g = np.zeros((samples.shape[0], self.n_planets))
t_eff = np.zeros((samples.shape[0], self.n_planets))
for j in tqdm(range(self.n_planets)):
for i in tqdm(range(samples.shape[0]), leave=False):
age = samples[i, cube_index["age"]]
mass = samples[i, cube_index[f"mass_{j}"]]
iso_box = read_iso.get_isochrone(
age=age,
masses=np.array([mass]),
filters_color=None,
filter_mag=None,
)
radius[i, j] = iso_box.radius[0]
log_g[i, j] = iso_box.logg[0]
l_bol = 10.0 ** iso_box.log_lum[0] * constants.L_SUN
t_eff[i, j] = (
l_bol
/ (
4.0
* np.pi
* (radius[i, j] * constants.R_JUP) ** 2
* constants.SIGMA_SB
)
) ** 0.25
for i in range(self.n_planets):
self.model_par.append(f"teff_evol_{i}")
for i in range(self.n_planets):
self.model_par.append(f"radius_evol_{i}")
for i in range(self.n_planets):
self.model_par.append(f"logg_evol_{i}")
samples = np.hstack((samples, t_eff, radius, log_g))
# Recreate cube_index dictionary because of
# derived parameters that were included
cube_index = {}
for i, item in enumerate(self.model_par):
cube_index[item] = i
# Remove outliers
# percent = np.percentile(samples, (1.0, 99.0), axis=0)
#
# for i, item in enumerate(percent[0]):
# if i == 0:
# indices = samples[:, i] < item
# else:
# indices += samples[:, i] < item
#
# samples = samples[~indices, :]
#
# for i, item in enumerate(percent[1]):
# if i == 0:
# indices = samples[:, i] > item
# else:
# indices += samples[:, i] > item
#
# samples = samples[~indices, :]
# Apply uncertainty inflation
for i in range(self.n_planets):
if f"inflate_lbol{i}" in self.bounds:
# sigma_add = np.median(samples[:, cube_index[f"inflate_lbol{i}"]])
index_prob = np.argmax(ln_prob)
sigma_add = samples[index_prob, cube_index[f"inflate_lbol{i}"]]
self.object_lbol[i] = (
self.object_lbol[i][0],
self.object_lbol[i][1] + sigma_add,
)
for i in range(self.n_planets):
if f"inflate_mass{i}" in self.bounds:
# sigma_add = np.median(samples[:, cube_index[f"inflate_lbol{i}"]])
index_prob = np.argmax(ln_prob)
sigma_add = samples[index_prob, cube_index[f"inflate_mass{i}"]]
self.object_mass[i] = (
self.object_mass[i][0],
self.object_mass[i][1] + sigma_add,
)
# Set object_radius to posterior value if argument was None
for i, item in enumerate(self.object_radius):
if item is None:
radius_samples = samples[:, cube_index[f"radius_evol_{i}"]]
self.object_radius[i] = (
np.mean(radius_samples),
np.std(radius_samples),
)
# Adjust object_mass to posterior value
# for i, item in enumerate(self.object_mass):
# mass_samples = samples[:, cube_index[f"mass_{i}"]]
# self.object_mass[i] = (np.mean(mass_samples), np.std(mass_samples))
# Adjust object_radius to posterior value
# for i, item in enumerate(self.object_radius):
# radius_samples = samples[:, cube_index[f"radius_evol_{i}"]]
# self.object_radius[i] = (np.mean(radius_samples), np.std(radius_samples))
# Set object_mass and object_radius to NaN if no prior was provided
for i, item in enumerate(self.object_mass):
if f"mass_{i}" in self.fix_param:
self.object_mass[i] = (self.fix_param[f"mass_{i}"], 0.0)
elif item is None:
self.object_mass[i] = np.nan
for i, item in enumerate(self.object_radius):
if item is None:
self.object_radius[i] = np.nan
# Dictionary with attributes that will be stored
attr_dict = {
"spec_type": "model",
"spec_name": "evolution",
"evolution_model": self.evolution_model,
"ln_evidence": (ln_z, ln_z_error),
"n_planets": self.n_planets,
"object_lbol": self.object_lbol,
"object_mass": self.object_mass,
"object_radius": self.object_radius,
}
# Add samples to the database
if mpi_rank == 0:
# Writing the samples to the database is only
# possible when using a single process
from species.data.database import Database
species_db = Database()
species_db.add_samples(
sampler="multinest",
samples=samples,
ln_prob=ln_prob,
tag=tag,
modelpar=self.model_par,
attr_dict=attr_dict,
)