Source code for species.read.read_spectrum

"""
Module with reading functionalities for spectral libraries.
"""

import os

from configparser import ConfigParser
from typing import List, Optional

import h5py
import numpy as np

from typeguard import typechecked

from species.core.box import PhotometryBox, SpectrumBox, create_box
from species.data.spec_data.add_spec_data import add_spec_library
from species.phot.syn_phot import SyntheticPhotometry
from species.read.read_filter import ReadFilter


[docs] class ReadSpectrum: """ Class for reading spectral library data from the database. """ @typechecked def __init__(self, spec_library: str, filter_name: Optional[str] = None) -> None: """ Parameters ---------- spec_library : str Name of the spectral library ('irtf', 'spex', 'kesseli+2017', 'bonnefoy+2014', 'allers+2013', or 'vega'). filter_name : str, None Filter name for the wavelength range. Full spectra are read if the argument is set to ``None``. Returns ------- NoneType None """ self.spec_library = spec_library self.filter_name = filter_name if filter_name is None: self.wavel_range = None else: transmission = ReadFilter(filter_name) self.wavel_range = transmission.wavelength_range() config_file = os.path.join(os.getcwd(), "species_config.ini") config = ConfigParser() config.read(config_file) self.database = config["species"]["database"] self.data_folder = config["species"]["data_folder"]
[docs] @typechecked def get_spectrum( self, sptypes: Optional[List[str]] = None, exclude_nan: bool = True ) -> SpectrumBox: """ Function for selecting spectra from the database. Parameters ---------- sptypes : list(str), None Spectral types to select from a library. The spectral types should be indicated with two characters (e.g. 'M5', 'L2', 'T3'). All spectra are selected if set to ``None``. For each object in the ``spec_library``, the requested ``sptypes`` are first compared with the optical spectral type and, if not available, secondly the near-infrared spectral type. exclude_nan : bool Exclude wavelength points for which the flux is NaN. Returns ------- species.core.box.SpectrumBox Box with the spectra. """ with h5py.File(self.database, "a") as hdf5_file: if f"spectra/{self.spec_library}" not in hdf5_file: add_spec_library( self.data_folder, hdf5_file, self.spec_library, sptypes ) list_wavelength = [] list_flux = [] list_error = [] list_name = [] list_simbad = [] list_sptype = [] list_parallax = [] list_spec_res = [] with h5py.File(self.database, "r") as hdf5_file: for item in hdf5_file[f"spectra/{self.spec_library}"]: dset = hdf5_file[f"spectra/{self.spec_library}/{item}"] wavelength = dset[:, 0] # (um) flux = dset[:, 1] # (W m-2 um-1) error = dset[:, 2] # (W m-2 um-1) if exclude_nan: nan_index = np.isnan(flux) wavelength = wavelength[~nan_index] flux = flux[~nan_index] error = error[~nan_index] if self.wavel_range is None: wl_index = np.arange(0, len(wavelength), 1) else: wl_index = ( (flux > 0.0) & (wavelength > self.wavel_range[0]) & (wavelength < self.wavel_range[1]) ) count = np.count_nonzero(wl_index) if count > 0: index = np.where(wl_index)[0] if index[0] > 0: wl_index[index[0] - 1] = True if index[-1] < len(wl_index) - 1: wl_index[index[-1] + 1] = True list_wavelength.append(wavelength[wl_index]) list_flux.append(flux[wl_index]) list_error.append(error[wl_index]) attrs = dset.attrs if "name" in attrs: if isinstance(dset.attrs["name"], str): list_name.append(dset.attrs["name"]) else: list_name.append(dset.attrs["name"].decode("utf-8")) else: list_name.append("") if "simbad" in attrs: if isinstance(dset.attrs["simbad"], str): list_simbad.append(dset.attrs["simbad"]) else: list_simbad.append(dset.attrs["simbad"].decode("utf-8")) else: list_simbad.append("") if "sptype" in attrs: if isinstance(dset.attrs["sptype"], str): list_sptype.append(dset.attrs["sptype"]) else: list_sptype.append(dset.attrs["sptype"].decode("utf-8")) else: list_sptype.append("None") if "parallax" in attrs: list_parallax.append( (dset.attrs["parallax"], dset.attrs["parallax_error"]) ) else: list_parallax.append((np.nan, np.nan)) if "spec_res" in attrs: list_spec_res.append(dset.attrs["spec_res"]) else: list_spec_res.append(np.nan) else: list_wavelength.append(np.array([])) list_flux.append(np.array([])) list_error.append(np.array([])) list_name.append("") list_simbad.append("") list_sptype.append("None") list_parallax.append((np.nan, np.nan)) list_spec_res.append(np.nan) spec_box = SpectrumBox() spec_box.spec_library = self.spec_library if sptypes is not None: spec_box.wavelength = [] spec_box.flux = [] spec_box.error = [] spec_box.name = [] spec_box.simbad = [] spec_box.sptype = [] spec_box.parallax = [] spec_box.spec_res = [] for item in sptypes: for i, spec_item in enumerate(list_sptype): if item == spec_item[:2]: spec_box.wavelength.append(list_wavelength[i]) spec_box.flux.append(list_flux[i]) spec_box.error.append(list_error[i]) spec_box.name.append(list_name[i]) spec_box.simbad.append(list_simbad[i]) spec_box.sptype.append(list_sptype[i]) spec_box.parallax.append(list_parallax[i]) spec_box.spec_res.append(list_spec_res[i]) else: spec_box.wavelength = list_wavelength spec_box.flux = list_flux spec_box.error = list_error spec_box.name = list_name spec_box.simbad = list_simbad spec_box.sptype = list_sptype spec_box.parallax = list_parallax spec_box.spec_res = list_spec_res return spec_box
[docs] @typechecked def get_flux(self, sptypes: Optional[List[str]] = None) -> PhotometryBox: """ Function for calculating the average flux density for the ``filter_name``. Parameters ---------- sptypes : list(str), None Spectral types to select from a library. The spectral types should be indicated with two characters (e.g. 'M5', 'L2', 'T3'). All spectra are selected if set to ``None``. Returns ------- species.core.box.PhotometryBox Box with the synthetic photometry. """ spec_box = self.get_spectrum(sptypes=sptypes, exclude_nan=True) n_spectra = len(spec_box.wavelength) filter_profile = ReadFilter(filter_name=self.filter_name) mean_wavel = filter_profile.mean_wavelength() wavelengths = np.full(n_spectra, mean_wavel) filters = np.full(n_spectra, self.filter_name) synphot = SyntheticPhotometry(filter_name=self.filter_name) phot_flux = [] for i in range(n_spectra): flux = synphot.spectrum_to_flux( wavelength=spec_box.wavelength[i], flux=spec_box.flux[i], error=spec_box.error[i], ) phot_flux.append(flux) phot_flux = np.asarray(phot_flux) return create_box( boxtype="photometry", name=spec_box.name, sptype=spec_box.sptype, wavelength=wavelengths, flux=phot_flux, app_mag=None, abs_mag=None, filter_name=filters, )
[docs] @typechecked def get_magnitude(self, sptypes: Optional[List[str]] = None) -> PhotometryBox: """ Function for calculating the apparent magnitude for the specified ``filter_name``. Parameters ---------- sptypes : list(str), None Spectral types to select from the library. The spectral types should be indicated with two characters (e.g. 'M5', 'L2', 'T3'). All spectra are selected if set to ``None``. Returns ------- species.core.box.PhotometryBox Box with the synthetic photometry. """ spec_box = self.get_spectrum(sptypes=sptypes, exclude_nan=True) n_spectra = len(spec_box.wavelength) filter_profile = ReadFilter(filter_name=self.filter_name) mean_wavel = filter_profile.mean_wavelength() wavelengths = np.full(n_spectra, mean_wavel) filters = np.full(n_spectra, self.filter_name) synphot = SyntheticPhotometry(filter_name=self.filter_name) app_mag = [] abs_mag = [] for i in range(n_spectra): if np.isnan(spec_box.parallax[i][0]): app_tmp = (np.nan, np.nan) abs_tmp = (np.nan, np.nan) else: app_tmp, abs_tmp = synphot.spectrum_to_magnitude( spec_box.wavelength[i], spec_box.flux[i], error=spec_box.error[i], parallax=( float(spec_box.parallax[i][0]), float(spec_box.parallax[i][1]), ), ) app_mag.append(app_tmp) abs_mag.append(abs_tmp) return create_box( boxtype="photometry", name=spec_box.name, sptype=spec_box.sptype, wavelength=wavelengths, flux=None, app_mag=np.asarray(app_mag), abs_mag=np.asarray(abs_mag), filter_name=filters, )