"""
Module with reading functionalities for spectral libraries.
"""
import os
from configparser import ConfigParser
import h5py
import numpy as np
from beartype import beartype
from beartype.typing import List, Optional, Union
from species.core.box import PhotometryBox, SpectrumBox, create_box
from species.data.spec_data.add_spec_data import add_spec_library
from species.phot.syn_phot import SyntheticPhotometry
from species.read.read_filter import ReadFilter
from species.util.dust_util import ism_extinction
[docs]
class ReadSpectrum:
"""
Class for reading spectral library data from the database.
"""
@beartype
def __init__(self, spec_library: str, filter_name: Optional[str] = None) -> None:
"""
Parameters
----------
spec_library : str
Name of the spectral library ('irtf', 'spex', 'kesseli+2017',
'bonnefoy+2014', 'allers+2013', or 'vega').
filter_name : str, None
Filter name for the wavelength range. Full spectra
are read if the argument is set to ``None``.
Returns
-------
NoneType
None
"""
self.spec_library = spec_library
self.filter_name = filter_name
if filter_name is None:
self.wavel_range = None
else:
transmission = ReadFilter(filter_name)
self.wavel_range = transmission.wavelength_range()
if "SPECIES_CONFIG" in os.environ:
config_file = os.environ["SPECIES_CONFIG"]
else:
config_file = os.path.join(os.getcwd(), "species_config.ini")
config = ConfigParser()
config.read(config_file)
self.database = config["species"]["database"]
self.data_folder = config["species"]["data_folder"]
[docs]
@beartype
def get_spectrum(
self,
object_name: Optional[Union[str, List[str]]] = None,
sptypes: Optional[List[str]] = None,
exclude_nan: bool = True,
av_ext: Optional[float] = None,
) -> SpectrumBox:
"""
Function for selecting spectra from the database.
Parameters
----------
object_name : str, list(str), None
Object name of which the spectrum will be selected from the
spectral library. Either a single object or a list of object
names. By setting the argument to ``None``, the ``sptype``
parameter will be used.
sptypes : list(str), None
Spectral types to select from a library. The spectral types
should be indicated with two characters (e.g. 'M5', 'L2',
'T3'). All spectra are selected if set to ``None``. For
each object in the ``spec_library``, the requested
``sptypes`` are first compared with the optical spectral
type and, if not available, secondly the near-infrared
spectral type. To use the ``sptypes`` parameter, the
argument of ``object_name`` should be set to ``None``
exclude_nan : bool
Exclude wavelength points for which the flux is NaN.
av_ext : float, None
Visual extinction, :math:`A_V`, applied to the spectra.
The extinction is calculated with the empirical relation
from `Cardelli et al. (1989)
<https://ui.adsabs.harvard.edu/abs/1989ApJ...345..245C/
abstract>`_. The extinction is not applied if the argument
is set to ``None``.
Returns
-------
species.core.box.SpectrumBox
Box with the spectra.
"""
with h5py.File(self.database, "r") as hdf5_file:
# Check if the spectral library is found in
# 'r' mode because the 'a' mode is not possible
# when using multiprocessing
spec_found = f"spectra/{self.spec_library}" in hdf5_file
if not spec_found:
with h5py.File(self.database, "a") as hdf5_file:
add_spec_library(
self.data_folder, hdf5_file, self.spec_library, sptypes
)
list_wavelength = []
list_flux = []
list_error = []
list_name = []
list_simbad = []
list_sptype = []
list_parallax = []
list_spec_res = []
if object_name is not None:
if isinstance(object_name, str):
object_name = [object_name]
with h5py.File(self.database, "r") as hdf5_file:
for object_item in object_name:
if object_item not in hdf5_file[f"spectra/{self.spec_library}"]:
object_list = list(hdf5_file[f"spectra/{self.spec_library}"])
raise ValueError(
f"The selected 'object_name' (='{object_item}') "
"is found in the selected spectral library "
f"(='{self.spec_library}'). The following objects "
f"are available in the database: {object_list}"
)
with h5py.File(self.database, "r") as hdf5_file:
for spec_item in hdf5_file[f"spectra/{self.spec_library}"]:
if object_name is not None and spec_item not in object_name:
continue
dset = hdf5_file[f"spectra/{self.spec_library}/{spec_item}"]
wavelength = dset[:, 0] # (um)
flux = dset[:, 1] # (W m-2 um-1)
error = dset[:, 2] # (W m-2 um-1)
if exclude_nan:
nan_index = np.isnan(flux)
wavelength = wavelength[~nan_index]
flux = flux[~nan_index]
error = error[~nan_index]
if av_ext is not None:
ism_ext = ism_extinction(av_ext, 3.1, wavelength)
flux *= 10.0 ** (-0.4 * ism_ext)
if self.wavel_range is None:
wl_index = np.arange(0, len(wavelength), 1)
else:
wl_index = (
(flux > 0.0)
& (wavelength > self.wavel_range[0])
& (wavelength < self.wavel_range[1])
)
count = np.count_nonzero(wl_index)
if count > 0:
index = np.where(wl_index)[0]
if index[0] > 0:
wl_index[index[0] - 1] = True
if index[-1] < len(wl_index) - 1:
wl_index[index[-1] + 1] = True
list_wavelength.append(wavelength[wl_index])
list_flux.append(flux[wl_index])
list_error.append(error[wl_index])
attrs = dset.attrs
if "name" in attrs:
if isinstance(dset.attrs["name"], str):
list_name.append(dset.attrs["name"])
else:
list_name.append(dset.attrs["name"].decode("utf-8"))
else:
list_name.append("")
if "simbad" in attrs:
if isinstance(dset.attrs["simbad"], str):
list_simbad.append(dset.attrs["simbad"])
else:
list_simbad.append(dset.attrs["simbad"].decode("utf-8"))
else:
list_simbad.append("")
if "sptype" in attrs:
if isinstance(dset.attrs["sptype"], str):
list_sptype.append(dset.attrs["sptype"])
else:
list_sptype.append(dset.attrs["sptype"].decode("utf-8"))
else:
list_sptype.append("None")
if "parallax" in attrs:
list_parallax.append(
(dset.attrs["parallax"], dset.attrs["parallax_error"])
)
else:
list_parallax.append((np.nan, np.nan))
if "spec_res" in attrs:
list_spec_res.append(dset.attrs["spec_res"])
else:
list_spec_res.append(np.nan)
else:
list_wavelength.append(np.array([]))
list_flux.append(np.array([]))
list_error.append(np.array([]))
list_name.append("")
list_simbad.append("")
list_sptype.append("None")
list_parallax.append((np.nan, np.nan))
list_spec_res.append(np.nan)
spec_box = SpectrumBox()
spec_box.spec_library = self.spec_library
if sptypes is not None:
spec_box.wavelength = []
spec_box.flux = []
spec_box.error = []
spec_box.name = []
spec_box.simbad = []
spec_box.sptype = []
spec_box.parallax = []
spec_box.spec_res = []
for spt_item in sptypes:
for i, spec_item in enumerate(list_sptype):
if spt_item == spec_item[:2]:
spec_box.wavelength.append(list_wavelength[i])
spec_box.flux.append(list_flux[i])
spec_box.error.append(list_error[i])
spec_box.name.append(list_name[i])
spec_box.simbad.append(list_simbad[i])
spec_box.sptype.append(list_sptype[i])
spec_box.parallax.append(list_parallax[i])
spec_box.spec_res.append(list_spec_res[i])
else:
spec_box.wavelength = list_wavelength
spec_box.flux = list_flux
spec_box.error = list_error
spec_box.name = list_name
spec_box.simbad = list_simbad
spec_box.sptype = list_sptype
spec_box.parallax = list_parallax
spec_box.spec_res = list_spec_res
return spec_box
[docs]
@beartype
def get_flux(
self,
object_name: Optional[Union[str, List[str]]] = None,
sptypes: Optional[List[str]] = None,
av_ext: Optional[float] = None,
) -> PhotometryBox:
"""
Function for calculating the flux density for the filter
that is set with ``filter_name``.
Parameters
----------
object_name : str, list(str), None
Object name of which the spectrum will be selected from the
spectral library. Either a single object or a list of object
names. By setting the argument to ``None``, the ``sptype``
parameter will be used.
sptypes : list(str), None
Spectral types to select from a library. The spectral types
should be indicated with two characters (e.g. 'M5', 'L2',
'T3'). All spectra are selected if set to ``None``. For
each object in the ``spec_library``, the requested
``sptypes`` are first compared with the optical spectral
type and, if not available, secondly the near-infrared
spectral type. To use the ``sptypes`` parameter, the
argument of ``object_name`` should be set to ``None``
av_ext : float, None
Visual extinction, :math:`A_V`, applied to the spectra.
The extinction is calculated with the empirical relation
from `Cardelli et al. (1989)
<https://ui.adsabs.harvard.edu/abs/1989ApJ...345..245C/
abstract>`_. The extinction is not applied if the argument
is set to ``None``.
Returns
-------
species.core.box.PhotometryBox
Box with the synthetic photometry.
"""
spec_box = self.get_spectrum(
object_name=object_name,
sptypes=sptypes,
exclude_nan=True,
av_ext=av_ext,
)
n_spectra = len(spec_box.wavelength)
filter_profile = ReadFilter(filter_name=self.filter_name)
mean_wavel = filter_profile.mean_wavelength()
filter_wavel_list = np.full(n_spectra, mean_wavel)
filter_name_list = np.full(n_spectra, self.filter_name)
syn_phot = SyntheticPhotometry(filter_name=self.filter_name)
flux_list = []
for i in range(n_spectra):
spec_wavel = spec_box.wavelength[i]
spec_flux = spec_box.flux[i]
spec_error = spec_box.error[i]
if np.sum(np.isnan(spec_error)) == spec_error.size:
spec_error = None
phot_flux = syn_phot.spectrum_to_flux(
wavelength=spec_wavel,
flux=spec_flux,
error=spec_error,
)
flux_list.append(phot_flux)
return create_box(
boxtype="photometry",
name=spec_box.name,
sptype=spec_box.sptype,
wavelength=filter_wavel_list,
flux=np.array(flux_list),
app_mag=None,
abs_mag=None,
filter_name=filter_name_list,
)
[docs]
@beartype
def get_magnitude(
self,
object_name: Optional[Union[str, List[str]]] = None,
sptypes: Optional[List[str]] = None,
av_ext: Optional[float] = None,
) -> PhotometryBox:
"""
Function for calculating the magnitude for the filter
that is set with ``filter_name``.
Parameters
----------
object_name : str, list(str), None
Object name of which the spectrum will be selected from the
spectral library. Either a single object or a list of object
names. By setting the argument to ``None``, the ``sptype``
parameter will be used.
sptypes : list(str), None
Spectral types to select from a library. The spectral types
should be indicated with two characters (e.g. 'M5', 'L2',
'T3'). All spectra are selected if set to ``None``. For
each object in the ``spec_library``, the requested
``sptypes`` are first compared with the optical spectral
type and, if not available, secondly the near-infrared
spectral type. To use the ``sptypes`` parameter, the
argument of ``object_name`` should be set to ``None``
av_ext : float, None
Visual extinction, :math:`A_V`, applied to the spectra.
The extinction is calculated with the empirical relation
from `Cardelli et al. (1989)
<https://ui.adsabs.harvard.edu/abs/1989ApJ...345..245C/
abstract>`_. The extinction is not applied if the argument
is set to ``None``.
Returns
-------
species.core.box.PhotometryBox
Box with the synthetic photometry.
"""
spec_box = self.get_spectrum(
object_name=object_name,
sptypes=sptypes,
exclude_nan=True,
av_ext=av_ext,
)
n_spectra = len(spec_box.wavelength)
filter_profile = ReadFilter(filter_name=self.filter_name)
mean_wavel = filter_profile.mean_wavelength()
filter_wavel_list = np.full(n_spectra, mean_wavel)
filter_name_list = np.full(n_spectra, self.filter_name)
syn_phot = SyntheticPhotometry(filter_name=self.filter_name)
app_mag_list = []
abs_mag_list = []
for i in range(n_spectra):
spec_wavel = spec_box.wavelength[i]
spec_flux = spec_box.flux[i]
spec_error = spec_box.error[i]
if np.sum(np.isnan(spec_error)) == spec_error.size:
spec_error = None
if np.isnan(spec_box.parallax[i][0]):
parallax = None
else:
parallax = (
float(spec_box.parallax[i][0]),
float(spec_box.parallax[i][1]),
)
app_mag, abs_mag = syn_phot.spectrum_to_magnitude(
spec_wavel,
spec_flux,
error=spec_error,
parallax=parallax,
)
app_mag_list.append(app_mag)
abs_mag_list.append(abs_mag)
return create_box(
boxtype="photometry",
name=spec_box.name,
sptype=spec_box.sptype,
wavelength=filter_wavel_list,
flux=None,
app_mag=np.asarray(app_mag_list),
abs_mag=np.asarray(abs_mag_list),
filter_name=filter_name_list,
)