"""
Utility functions for model spectra.
"""
import json
import warnings
from itertools import product
from numbers import Real
from pathlib import Path
import numpy as np
from astropy import units as u
from beartype import beartype
from beartype.typing import Dict, Optional, Tuple, Union
# from PyAstronomy.pyasl import fastRotBroad
from scipy.interpolate import RegularGridInterpolator
from spectres.spectral_resampling_numba import spectres_numba
from species.core import constants
from species.core.box import ModelBox, create_box
from species.util.dust_util import ism_extinction
from species.util.spec_util import create_wavelengths, smooth_spectrum
[docs]
@beartype
def convert_model_name(in_name: str) -> str:
"""
Function for updating a model name for use in plots.
Parameters
----------
in_name : str
Model name as used by species.
Returns
-------
str
Updated model name for plots.
"""
data_file = (
Path(__file__).parent.resolve().parents[0] / "data/model_data/model_data.json"
)
with open(data_file, "r", encoding="utf-8") as json_file:
model_data = json.load(json_file)
if in_name in model_data.keys():
out_name = model_data[in_name]["name"]
elif in_name == "planck":
out_name = "Blackbody"
else:
out_name = in_name
warnings.warn(
f"The model name '{in_name}' is not known "
"so the output name will not get adjusted "
"for plot purposes"
)
return out_name
[docs]
@beartype
def powerlaw_spectrum(
wavel_range: Tuple[Real, Real],
model_param: Dict[str, Real],
spec_res: Real = 100.0,
) -> ModelBox:
"""
Function for calculating a power-law spectrum. The power-law
function is calculated in log(wavelength)-log(flux) space but
stored in the :class:`~species.core.box.ModelBox` in linear
wavelength-flux space.
Parameters
----------
wavel_range : tuple(float, float)
Tuple with the minimum and maximum wavelength (um).
model_param : dict
Dictionary with the model parameters. Should contain
`'log_powerlaw_a'`, `'log_powerlaw_b'`, and `'log_powerlaw_c'`.
spec_res : float
Spectral resolution (default: 100).
Returns
-------
species.core.box.ModelBox
Box with the power-law spectrum.
"""
wavel = create_wavelengths((wavel_range[0], wavel_range[1]), spec_res)
wavel *= 1e3 # (um) -> (nm)
log_flux = (
model_param["log_powerlaw_a"]
+ model_param["log_powerlaw_b"]
* np.log10(wavel) ** model_param["log_powerlaw_c"]
)
model_box = create_box(
boxtype="model",
model="powerlaw",
wavelength=1e-3 * wavel, # (um)
flux=10.0**log_flux, # (W m-2 um-1)
parameters=model_param,
quantity="flux",
)
return model_box
[docs]
@beartype
def gaussian_spectrum(
wavel_range: Tuple[Real, Real],
model_param: Dict[str, Real],
spec_res: Real = 100.0,
double_gaussian: bool = False,
) -> ModelBox:
"""
Function for calculating a Gaussian spectrum (i.e. for an emission
line).
Parameters
----------
wavel_range : tuple(float, float)
Tuple with the minimum and maximum wavelength (um).
model_param : dict
Dictionary with the model parameters. Should contain
``'gauss_amplitude'``, ``'gauss_mean'``, ``'gauss_sigma'``,
and optionally ``'gauss_offset'``.
spec_res : float
Spectral resolution (default: 100).
double_gaussian : bool
Set to ``True`` for returning a double Gaussian function.
In that case, ``model_param`` should also contain
``'gauss_amplitude_2'``, ``'gauss_mean_2'``, and
``'gauss_sigma_2'``.
Returns
-------
species.core.box.ModelBox
Box with the Gaussian spectrum.
"""
wavel = create_wavelengths((wavel_range[0], wavel_range[1]), spec_res)
gauss_exp = np.exp(
-0.5
* (wavel - model_param["gauss_mean"]) ** 2
/ model_param["gauss_sigma"] ** 2
)
flux = model_param["gauss_amplitude"] * gauss_exp
if double_gaussian:
gauss_exp = np.exp(
-0.5
* (wavel - model_param["gauss_mean_2"]) ** 2
/ model_param["gauss_sigma_2"] ** 2
)
flux += model_param["gauss_amplitude_2"] * gauss_exp
if "gauss_offset" in model_param:
flux += model_param["gauss_offset"]
model_box = create_box(
boxtype="model",
model="gaussian",
wavelength=wavel,
flux=flux,
parameters=model_param,
quantity="flux",
)
return model_box
[docs]
@beartype
def binary_to_single(param_dict: Dict[str, Real], star_index: int) -> Dict[str, Real]:
"""
Function for converting a dictionary with atmospheric parameters
of a binary system to a dictionary of parameters for one of the
two stars.
Parameters
----------
param_dict : dict
Dictionary with the atmospheric parameters of both stars. The
keywords end either with ``_0`` or ``_1`` that correspond with
``star_index=0`` or ``star_index=1``.
star_index : int
Star index (0 or 1) that is used for the parameters in
``param_dict``.
Returns
-------
dict
Dictionary with the parameters of the selected star.
"""
new_dict = {}
for param_key, param_value in param_dict.items():
if star_index == 0 and param_key[-1] == "0":
new_dict[param_key[:-2]] = param_value
elif star_index == 1 and param_key[-1] == "1":
new_dict[param_key[:-2]] = param_value
elif param_key[-2:] not in ["_0", "_1"]:
new_dict[param_key] = param_value
return new_dict
[docs]
@beartype
def apply_obs(
model_flux: np.ndarray,
model_wavel: Optional[np.ndarray] = None,
model_param: Optional[Dict[str, Real]] = None,
data_wavel: Optional[np.ndarray] = None,
spec_res: Optional[Real] = None,
rot_broad: Optional[Real] = None,
limb_dark: Optional[Real] = 0.6,
rad_vel: Optional[Real] = None,
cross_sections: Optional[RegularGridInterpolator] = None,
ext_model: Optional[str] = None,
) -> np.ndarray:
"""
Function for post-processing of a model spectrum. This will
apply a rotational broadening, radial velocity shift, extinction,
scaling, instrumental broadening, and wavelength resampling.
Each of the steps are optional, depending on the arguments that
are set and the parameters in the ``model_param`` dictionary.
Parameters
----------
model_flux : np.ndarray
Array with the fluxes of the model spectrum.
model_wavel : np.ndarray, None
Array with the wavelengths of the model spectrum. Used by
most of the steps, except the flux scaling.
model_param : dict(str, float), None
Dictionary with the model parameters. Not used if the
argument is set to ``None``.
data_wavel : np.ndarray, None
Array with the wavelengths of the data used for the
wavelength resampling. Not applied if the argument is
set to ``None``.
spec_res : float, None
Spectral resolution of the data used for the
instrumental broadening. Not applied if the argument
is set to ``None`` or np.nan.
rot_broad : float, None
Rotational broadening :math:`v\\sin{i}` (km/s). Not
applied if the argument is set to ``None``.
limb_dark : float
Linear limb darkening coefficient in the range of 0.0
to 1.0 (default: 0.6). This parameter is only applied
in combination with setting ``rot_broad``.
rad_vel : float, None
Radial velocity (km/s). Not applied if the argument
is set to ``None``.
cross_sections : RegularGridInterpolator, None
Interpolated cross sections for fitting extinction by dust
grains with a log-normal or power-law size distribution.
ext_model : str, None
Name with the extinction model from the ``dust-extinction``
package (see `list of available models
<https://dust-extinction.readthedocs.io/en/latest/
dust_extinction/choose_model.html>`_). For example,
set the argument to ``'CCM89'`` to use the extinction
relation from `Cardelli et al. (1989) <https://ui.adsabs.
harvard.edu/abs/1989ApJ...345..245C/abstract>`_.
Returns
-------
np.ndarray
Array with the processed model fluxes.
"""
# Set empty dictionary if None
if model_param is None:
model_param = {}
# Apply rotational broadening
if rot_broad is not None:
model_flux = rot_int_cmj(
wavel=model_wavel,
flux=model_flux,
vsini=rot_broad,
eps=limb_dark,
)
# Apply extinction
if "ism_ext" in model_param:
ism_reddening = model_param.get("ism_red", 3.1)
ext_filt = ism_extinction(
model_param["ism_ext"],
ism_reddening,
model_wavel,
)
model_flux *= 10.0 ** (-0.4 * ext_filt)
if "ext_av" in model_param:
import dust_extinction.parameter_averages as dust_ext
ext_object = getattr(dust_ext, ext_model)()
if "ext_rv" in model_param:
ext_object.Rv = model_param["ext_rv"]
# Wavelength range (um) for which the extinction is defined
ext_wavel = (1.0 / ext_object.x_range[1], 1.0 / ext_object.x_range[0])
if model_wavel[0] < ext_wavel[0] or model_wavel[-1] > ext_wavel[1]:
warnings.warn(
"The wavelength range of the model spectrum "
f"({model_wavel[0]:.3f}-{model_wavel[-1]:.3f} "
"um) does not fully lie within the available "
"wavelength range of the extinction model "
f"({ext_wavel[0]:.3f}-{ext_wavel[1]:.3f} um). "
"The extinction will therefore not be applied "
"to fluxes of which the wavelength lies "
"outside the range of the extinction model."
)
wavel_select = (model_wavel > ext_wavel[0]) & (model_wavel < ext_wavel[1])
model_flux[wavel_select] *= ext_object.extinguish(
model_wavel[wavel_select] * u.micron, Av=model_param["ext_av"]
)
elif "lognorm_ext" in model_param:
cross_tmp = cross_sections(
(
model_wavel,
10.0 ** model_param["lognorm_radius"],
model_param["lognorm_sigma"],
)
)
model_flux *= np.exp(-model_param["lognorm_ext"] * cross_tmp)
elif "powerlaw_ext" in model_param:
cross_tmp = cross_sections(
(
model_wavel,
10.0 ** model_param["powerlaw_max"],
model_param["powerlaw_exp"],
)
)
model_flux *= np.exp(-model_param["powerlaw_ext"] * cross_tmp)
# elif self.ext_filter is not None:
# ism_reddening = all_param.get("ism_red", 3.1)
#
# av_required = convert_to_av(
# filter_name=self.ext_filter,
# filter_ext=all_param[f"phot_ext_{self.ext_filter}"],
# v_band_red=ism_reddening,
# )
#
# ext_spec = ism_extinction(
# av_required, ism_reddening, self.spectrum[spec_item][0][:, 0]
# )
#
# model_flux *= 10.0 ** (-0.4 * ext_spec)
# Scale the spectrum by (radius/distance)^2
if "radius" in model_param:
model_flux *= (model_param["radius"] * constants.R_JUP) ** 2 / (
1e3 * constants.PARSEC / model_param["parallax"]
) ** 2
elif "flux_scaling" in model_param:
model_flux *= model_param["flux_scaling"]
elif "log_flux_scaling" in model_param:
model_flux *= 10.0 ** model_param["log_flux_scaling"]
# Apply radial velocity shift
if rad_vel is not None:
# Wavelength shift in um
# rad_vel in km s-1 and constants.LIGHT in m s-1
wavel_shift = model_wavel * 1e3 * rad_vel / constants.LIGHT
# Resampling will introduce a few NaNs at the edge of the flux
# array but that should not influence the fit given the total
# number of wavelength points of a typical spectrum
model_flux = spectres_numba(
model_wavel,
model_wavel + wavel_shift,
model_flux,
spec_errs=None,
fill=np.nan,
verbose=False,
)
# Apply instrument broadening
if spec_res is not None and not np.isnan(spec_res):
model_flux = smooth_spectrum(model_wavel, model_flux, spec_res)
# Resample wavelengths to data
if data_wavel is not None:
# The 'fill' should not happen because model_wavel is
# 20 wavelength points broader than data_wavel, but
# spectres sometimes sets the outer fluxes to 'fill'
# depending on the spectral resolution of the data
model_flux = spectres_numba(
data_wavel,
model_wavel,
model_flux,
spec_errs=None,
fill=np.nan,
verbose=False,
)
# Set the flux to the neighboring wavelength
# if a NaN is present at the edge.
# TODO this is quick hack and not the best solution
if np.isnan(model_flux[0]):
model_flux[0] = model_flux[1]
if np.isnan(model_flux[-1]):
model_flux[0] = model_flux[-2]
# Shift the spectrum by a constant
# if "flux_offset" in model_param:
# model_flux += model_param["flux_offset"]
return model_flux
[docs]
@beartype
def rot_int_cmj(
wavel: np.ndarray,
flux: np.ndarray,
vsini: Real,
eps: Real = 0.6,
nr: int = 10,
ntheta: int = 100,
dif: Real = 0.0,
):
"""
A routine to quickly rotationally broaden a spectrum in linear time.
This function has been adopted from `Carvalho & Johns-Krull (2023)
<https://ui.adsabs.harvard.edu/abs/2023RNAAS...7...91C>`_.
Parameters
----------
wavel : np.ndarray
Array with the wavelengths.
flux : np.ndarray
Array with the fluxes
vsini : float
Projected rotational velocity (km s-1).
eps : float
Coefficient of the limb darkening law (default: 0.0).
nr : int
Number of radial bins on the projected disk (default: 10).
ntheta : int
Number of azimuthal bins in the largest radial annulus
(default: 100). Note: the number of bins at each r is
int(r*ntheta) where r < 1.
dif : float
Differential rotation coefficient (default = 0.0), applied
according to the law Omeg(th)/Omeg(eq) = (1 - dif/2 - (dif/2)
cos(2 th)). Dif = 0.675 reproduces the law proposed by Smith,
1994, A&A, Vol. 287, p. 523-534, to unify WTTS and CTTS.
Dif = 0.23 is similar to observed solar differential rotation.
Note: the th in the expression above is the stellar co-latitude,
which is not the same as the integration variable used in the
function. This is a disk integration routine.
Returns
-------
np.ndarray
Array with the rotationally broadened spectrum.
"""
ns = np.zeros(flux.shape)
tarea = 0.0
dr = 1.0 / nr
for j in range(nr):
r = dr / 2.0 + j * dr
area = (
((r + dr / 2.0) ** 2 - (r - dr / 2.0) ** 2)
/ int(ntheta * r)
* (1.0 - eps + eps * np.cos(np.arcsin(r)))
)
for k in range(int(ntheta * r)):
th = np.pi / int(ntheta * r) + k * 2.0 * np.pi / int(ntheta * r)
vl = r * vsini * np.sin(th)
if dif != 0:
vl *= (1.0 - dif / 2.0 - dif / 2.0 * np.cos(2.0 * np.arccos(r * np.cos(th))))
ns += area * np.interp(
wavel + wavel * vl / (1e-3 * constants.LIGHT), wavel, flux
)
tarea += area
return ns / tarea
[docs]
@beartype
def check_nearest_spec(model_name: str, model_param: Dict[str, Real]):
"""
Check if the nearest grid points of the requested model parameters
have a spectrum stored in the database. For some grids, spectra
are missing for certain parameters, in which case a spectrum with
zero fluxes has been stored in the database. Interpolating from
such a grid point will therefore give an inaccurate spectrum, so
it is important to check if for example the best-fit parameters
from a fit are close to grid points with a missing spectrum.
Parameters
----------
model_name : str
Name of the atmosphere model.
model_param : dict
Dictionary with the model parameters.
Returns
-------
NoneType
None
"""
from species.read.read_model import ReadModel
read_model = ReadModel(model_name)
model_points = read_model.get_points()
near_low = {}
near_high = {}
param_idx = []
for param_key, param_value in model_points.items():
near_idx = np.argsort(np.abs(model_param[param_key] - param_value))
near_low[param_key] = param_value[near_idx[0]]
near_high[param_key] = param_value[near_idx[1]]
param_idx.append((near_idx[0], near_idx[1]))
for comb_item in list(product(*param_idx)):
model_param = {}
for param_idx, param_key in enumerate(model_points):
model_param[param_key] = model_points[param_key][comb_item[param_idx]]
model_box = read_model.get_data(model_param)
if np.count_nonzero(model_box.flux) == 0:
warnings.warn(
"The selected parameters have a nearest grid point for "
f"which a spectrum is not available: {model_param}. "
"Therefore, zeros had been stored for the spectrum at "
"this grid point. Interpolating from this grid point "
"will therefore be inaccurate. When using FitModel, it "
"is best to adjust the prior range in the 'bounds' "
"parameter accordingly to exclude the parameter space "
"for which model spectrum is missing. See also details "
"printed when the model spectra are added to the "
"database with add_model()."
)