"""
Module with reading functionalities for calibration spectra.
"""
import configparser
import os
from numbers import Real
import h5py
import numpy as np
from beartype import beartype
from beartype.typing import Dict, Optional, Tuple
from scipy.optimize import curve_fit
from spectres.spectral_resampling_numba import spectres_numba
from species.core.box import SpectrumBox, create_box
from species.phot.syn_phot import SyntheticPhotometry
from species.read.read_filter import ReadFilter
from species.util.spec_util import create_wavelengths, smooth_spectrum
[docs]
class ReadCalibration:
"""
Class for reading a calibration spectrum from the database.
"""
@beartype
def __init__(self, tag: str, filter_name: Optional[str] = None) -> None:
"""
Parameters
----------
tag : str
Database tag of the calibration spectrum.
filter_name : str, None
Filter that is used for the wavelength range. The full
spectrum is read if the argument is set to ``None``.
Returns
-------
NoneType
None
"""
self.tag = tag
self.filter_name = filter_name
if filter_name is None:
self.wavel_range = None
else:
transmission = ReadFilter(filter_name)
self.wavel_range = transmission.wavelength_range()
if "SPECIES_CONFIG" in os.environ:
config_file = os.environ["SPECIES_CONFIG"]
else:
config_file = os.path.join(os.getcwd(), "species_config.ini")
config = configparser.ConfigParser()
config.read(config_file)
self.database = config["species"]["database"]
[docs]
@beartype
def resample_spectrum(
self,
wavel_points: np.ndarray,
model_param: Optional[Dict[str, Real]] = None,
spec_res: Optional[Real] = None,
apply_mask: bool = False,
interp_highres: bool = False,
) -> SpectrumBox:
"""
Function for resampling the spectrum and optional
uncertainties onto a new wavelength grid.
Parameters
----------
wavel_points : np.ndarray
Wavelengths (um).
model_param : dict, None
Dictionary with the model parameters, which should only
contain the ``'scaling'`` keyword. No scaling is applied if
the argument of ``model_param`` is set to ``None``.
spec_res : float, None
Spectral resolution that is used for smoothing the spectrum
before resampling the wavelengths. No smoothing is applied
if the argument is set to ``None``. The smoothing can only
be applied to spectra with a constant
$\\lambda/\\Delta\\lambda$ (which is the case for all model
spectra that are supported by ``species``) or a constant
wavelength spacing. The ``interp_highres`` parameter can be
set to ``True`` to resample the spectrum to
$\\lambda/\\Delta\\lambda = 10000$ before smoothing the
spectrum.
apply_mask : bool
Exclude negative values and NaNs.
interp_highres : bool
Oversample the spectrum to
$\\lambda/\\Delta\\lambda = 10000$, such that the smoothing
by the ``spec_res`` parameter will be applied on a spectrum
with constant logarithmically spaced wavelengths.
Returns
-------
species.core.box.SpectrumBox
Box with the resampled spectrum.
"""
calib_box = self.get_spectrum(apply_mask=apply_mask)
if interp_highres:
wavel_highres = create_wavelengths(
(calib_box.wavelength[0], calib_box.wavelength[-1]), 10000.0
)
flux_highres, error_highres = spectres_numba(
wavel_highres,
calib_box.wavelength,
calib_box.flux,
spec_errs=calib_box.error,
fill=np.nan,
verbose=True,
)
if spec_res is not None:
flux_highres = smooth_spectrum(
wavelength=wavel_highres,
flux=flux_highres,
spec_res=spec_res,
)
flux_new, error_new = spectres_numba(
wavel_points,
wavel_highres,
flux_highres,
spec_errs=error_highres,
fill=np.nan,
verbose=True,
)
else:
if spec_res is not None:
calib_box.flux = smooth_spectrum(
wavelength=calib_box.wavelength,
flux=calib_box.flux,
spec_res=spec_res,
)
flux_new, error_new = spectres_numba(
wavel_points,
calib_box.wavelength,
calib_box.flux,
spec_errs=calib_box.error,
fill=np.nan,
verbose=True,
)
if model_param is not None:
flux_new = model_param["scaling"] * flux_new
error_new = model_param["scaling"] * error_new
return create_box(
boxtype="spectrum",
spectrum="calibration",
wavelength=wavel_points,
flux=flux_new,
error=error_new,
name=self.tag,
)
[docs]
@beartype
def get_spectrum(
self,
model_param: Optional[Dict[str, Real]] = None,
apply_mask: bool = False,
wavel_sampling: Optional[Real] = None,
extrapolate: bool = False,
min_wavelength: Optional[Real] = None,
) -> SpectrumBox:
"""
Function for selecting the calibration spectrum.
Parameters
----------
model_param : dict, None
Model parameters. Should contain the 'scaling' value. Not
used if set to ``None``.
apply_mask : bool
Exclude negative values and NaN values.
wavel_sampling : float, None
Wavelength sampling :math:`\\lambda/\\Delta\\lambda`. The
original wavelength points are used if set to ``None``.
extrapolate : bool
Extrapolate to 6 um by fitting a power law function.
min_wavelength : float, None
Minimum wavelength used for fitting the power law function.
All data is used if set to ``None``.
Returns
-------
species.core.box.SpectrumBox
Box with the spectrum.
"""
with h5py.File(self.database, "r") as h5_file:
cal_spec = np.array(h5_file[f"spectra/calibration/{self.tag}"])
wavelength = cal_spec[0,]
flux = cal_spec[1,]
error = cal_spec[2,]
if apply_mask:
indices = np.where(flux > 0.0)[0]
wavelength = wavelength[indices]
flux = flux[indices]
error = error[indices]
if model_param is not None:
flux = model_param["scaling"] * flux
error = model_param["scaling"] * error
if self.wavel_range is None:
wl_index = np.ones(wavelength.size, dtype=bool)
else:
wl_index = (
(flux > 0.0)
& (wavelength > self.wavel_range[0])
& (wavelength < self.wavel_range[1])
)
count = np.count_nonzero(wl_index)
if count > 0:
index = np.where(wl_index)[0]
if index[0] > 0:
wl_index[index[0] - 1] = True
if index[-1] < len(wl_index) - 1:
wl_index[index[-1] + 1] = True
wavelength = wavelength[wl_index]
flux = flux[wl_index]
error = error[wl_index]
if extrapolate:
def _power_law(wavelength, offset, scaling, power_index):
return offset + scaling * wavelength**power_index
if min_wavelength:
indices = np.where(wavelength > min_wavelength)[0]
else:
indices = np.arange(0, wavelength.size, 1)
popt, pcov = curve_fit(
f=_power_law,
xdata=wavelength[indices],
ydata=flux[indices],
p0=(0.0, np.mean(flux[indices]), -1.0),
sigma=error[indices],
)
sigma = np.sqrt(np.diag(pcov))
print("Fit result for f(x) = a + b*x^c:")
print(f"a = {popt[0]} +/- {sigma[0]}")
print(f"b = {popt[1]} +/- {sigma[1]}")
print(f"c = {popt[2]} +/- {sigma[2]}")
while wavelength[-1] <= 6.0:
wl_add = wavelength[-1] + wavelength[-1] / 1000.0
wavelength = np.append(wavelength, wl_add)
flux = np.append(flux, _power_law(wl_add, popt[0], popt[1], popt[2]))
error = np.append(error, 0.0)
if wavel_sampling is not None:
wavelength_new = create_wavelengths(
(wavelength[0], wavelength[-1]), wavel_sampling
)
flux_new, error_new = spectres_numba(
wavelength_new,
wavelength,
flux,
spec_errs=error,
fill=np.nan,
verbose=True,
)
# Fluxes at the edges on the resampled spectrum might be NaN
flux_nan = np.isnan(flux_new)
wavelength = wavelength_new[~flux_nan]
flux = flux_new[~flux_nan]
error = error_new[~flux_nan]
return create_box(
boxtype="spectrum",
spectrum="calibration",
wavelength=wavelength,
flux=flux,
error=error,
name=self.tag,
)
[docs]
@beartype
def get_flux(
self, model_param: Optional[Dict[str, Real]] = None
) -> Tuple[Real, Real]:
"""
Function for calculating the average flux for the
``filter_name``.
Parameters
----------
model_param : dict, None
Model parameters. Should contain the 'scaling' value. Not
used if set to ``None``.
Returns
-------
Returns
-------
float
Average flux (W m-2 um-1).
float
Uncertainty (W m-2 um-1).
"""
specbox = self.get_spectrum(model_param=model_param, apply_mask=True)
synphot = SyntheticPhotometry(self.filter_name)
return synphot.spectrum_to_flux(
specbox.wavelength, specbox.flux, error=specbox.flux
)
[docs]
@beartype
def get_magnitude(
self,
model_param: Optional[Dict[str, Real]] = None,
distance: Optional[Tuple[Real, Real]] = None,
) -> Tuple[Tuple[Real, Optional[Real]], Tuple[Optional[Real], Optional[Real]]]:
"""
Function for calculating the apparent magnitude for the
``filter_name``.
Parameters
----------
model_param : dict, None
Model parameters. Should contain the 'scaling' value. Not
used if set to ``None``.
distance : tuple(float, float), None
Distance and uncertainty to the calibration object (pc).
Not used if set to ``None``, in which case the returned
absolute magnitude is ``(None, None)``.
Returns
-------
tuple(float, float)
Apparent magnitude and uncertainty.
tuple(float, float), tuple(None, None)
Absolute magnitude and uncertainty.
"""
specbox = self.get_spectrum(model_param=model_param, apply_mask=True)
if np.count_nonzero(specbox.error) == 0:
error = None
else:
error = specbox.error
synphot = SyntheticPhotometry(self.filter_name)
return synphot.spectrum_to_magnitude(
specbox.wavelength, specbox.flux, error=error, distance=distance
)