"""
Module with reading functionalities for spectral libraries.
"""
import os
import configparser
import h5py
import numpy as np
from species.analysis import photometry
from species.core import box
from species.data import database
from species.read import read_filter
[docs]class ReadSpectrum:
"""
Class for reading spectral library data from the database.
"""
def __init__(self,
spec_library,
filter_name=None):
"""
Parameters
----------
spec_library : str
Name of the spectral library ('irtf', 'spex') or other type of spectrum ('vega').
filter_name : str, None
Filter ID for the wavelength range. Full spectra are read if set to None.
Returns
-------
NoneType
None
"""
self.spec_library = spec_library
self.filter_name = filter_name
if filter_name is None:
self.wavel_range = None
else:
transmission = read_filter.ReadFilter(filter_name)
self.wavel_range = transmission.wavelength_range()
config_file = os.path.join(os.getcwd(), 'species_config.ini')
config = configparser.ConfigParser()
config.read_file(open(config_file))
self.database = config['species']['database']
[docs] def get_spectrum(self,
sptypes=None,
exclude_nan=True):
"""
Function for selecting spectra from the database.
Parameters
----------
sptypes : list('str', )
Spectral types to select from a library. The spectral types should be indicated with
two characters (e.g. 'M5', 'L2', 'T3'). All spectra are selected if set to None.
exclude_nan : bool
Exclude wavelength points for which the flux is NaN.
Returns
-------
species.core.box.SpectrumBox
Box with the spectra.
"""
h5_file = h5py.File(self.database, 'r')
try:
h5_file[f'spectra/{self.spec_library}']
except KeyError:
h5_file.close()
species_db = database.Database()
species_db.add_spectrum(self.spec_library, sptypes)
h5_file = h5py.File(self.database, 'r')
list_wavelength = []
list_flux = []
list_error = []
list_name = []
list_simbad = []
list_sptype = []
list_distance = []
for item in h5_file[f'spectra/{self.spec_library}']:
data = h5_file[f'spectra/{self.spec_library}/{item}']
wavelength = data[0, :] # (um)
flux = data[1, :] # (W m-2 um-1)
error = data[2, :] # (W m-2 um-1)
if exclude_nan:
indices = np.isnan(flux)
indices = np.logical_not(indices)
indices = np.where(indices)[0]
wavelength = wavelength[indices]
flux = flux[indices]
error = error[indices]
if self.wavel_range is None:
wl_index = np.arange(0, len(wavelength), 1)
else:
wl_index = (flux > 0.) & (wavelength > self.wavel_range[0]) & \
(wavelength < self.wavel_range[1])
count = np.count_nonzero(wl_index)
if count > 0:
index = np.where(wl_index)[0]
if index[0] > 0:
wl_index[index[0] - 1] = True
if index[-1] < len(wl_index)-1:
wl_index[index[-1] + 1] = True
list_wavelength.append(wavelength[wl_index])
list_flux.append(flux[wl_index])
list_error.append(error[wl_index])
attrs = data.attrs
if 'name' in attrs:
list_name.append(data.attrs['name'].decode('utf-8'))
else:
list_name.append('')
if 'simbad' in attrs:
list_simbad.append(data.attrs['simbad'].decode('utf-8'))
else:
list_simbad.append('')
if 'sptype' in attrs:
list_sptype.append(data.attrs['sptype'].decode('utf-8'))
else:
list_sptype.append('None')
if 'distance' in attrs:
list_distance.append((data.attrs['distance'], data.attrs['distance_error']))
else:
list_distance.append((np.nan, np.nan))
else:
list_wavelength.append(np.array([]))
list_flux.append(np.array([]))
list_error.append(np.array([]))
list_name.append('')
list_simbad.append('')
list_sptype.append('None')
list_distance.append((np.nan, np.nan))
specbox = box.SpectrumBox()
specbox.spec_library = self.spec_library
specbox.wavelength = np.asarray(list_wavelength)
specbox.flux = np.asarray(list_flux)
specbox.error = np.asarray(list_error)
specbox.name = np.asarray(list_name)
specbox.simbad = np.asarray(list_simbad)
specbox.sptype = np.asarray(list_sptype)
specbox.distance = np.asarray(list_distance)
if sptypes is not None:
indices = None
for item in sptypes:
if indices is None:
indices = np.where(np.chararray.startswith(specbox.sptype, item))[0]
else:
ind_tmp = np.where(np.chararray.startswith(specbox.sptype, item))[0]
indices = np.append(indices, ind_tmp)
specbox.wavelength = specbox.wavelength[indices]
specbox.flux = specbox.flux[indices]
specbox.error = specbox.error[indices]
specbox.name = specbox.name[indices]
specbox.simbad = specbox.simbad[indices]
specbox.sptype = specbox.sptype[indices]
specbox.distance = specbox.distance[indices]
return specbox
[docs] def get_flux(self,
sptypes=None):
"""
Function for calculating the average flux density for the ``filter_name``.
Parameters
----------
sptypes : list('str', )
Spectral types to select from a library. The spectral types should be indicated with
two characters (e.g. 'M5', 'L2', 'T3'). All spectra are selected if set to None.
Returns
-------
species.core.box.PhotometryBox
Box with the synthetic photometry.
"""
specbox = self.get_spectrum(sptypes=sptypes,
exclude_nan=True)
n_spectra = specbox.wavelength.shape[0]
filter_profile = read_filter.ReadFilter(filter_name=self.filter_name)
mean_wavel = filter_profile.mean_wavelength()
wavelengths = np.full(specbox.wavelength.shape[0], mean_wavel)
filters = np.full(specbox.wavelength.shape[0], self.filter_name)
synphot = photometry.SyntheticPhotometry(filter_name=self.filter_name)
phot_flux = []
for i in range(n_spectra):
flux = synphot.spectrum_to_flux(wavelength=specbox.wavelength[i],
flux=specbox.flux[i],
error=specbox.error[i])
phot_flux.append(flux)
phot_flux = np.asarray(phot_flux)
return box.create_box(boxtype='photometry',
name=specbox.name,
sptype=specbox.sptype,
wavelength=wavelengths,
flux=phot_flux,
app_mag=None,
abs_mag=None,
filter_name=filters)
[docs] def get_magnitude(self,
sptypes=None):
"""
Function for calculating the apparent magnitude for the ``filter_name``.
Parameters
----------
sptypes : list('str', )
Spectral types to select from a library. The spectral types should be indicated with
two characters (e.g. 'M5', 'L2', 'T3'). All spectra are selected if set to None.
Returns
-------
species.core.box.PhotometryBox
Box with the synthetic photometry.
"""
specbox = self.get_spectrum(sptypes=sptypes,
exclude_nan=True)
n_spectra = specbox.wavelength.shape[0]
filter_profile = read_filter.ReadFilter(filter_name=self.filter_name)
mean_wavel = filter_profile.mean_wavelength()
wavelengths = np.full(specbox.wavelength.shape[0], mean_wavel)
filters = np.full(specbox.wavelength.shape[0], self.filter_name)
synphot = photometry.SyntheticPhotometry(filter_name=self.filter_name)
app_mag = []
abs_mag = []
for i in range(n_spectra):
if np.isnan(specbox.distance[i][0]):
app_tmp = (np.nan, np.nan)
abs_tmp = (np.nan, np.nan)
else:
app_tmp, abs_tmp = synphot.spectrum_to_magnitude(wavelength=specbox.wavelength[i],
flux=specbox.flux[i],
error=specbox.error[i],
distance=specbox.distance[i])
app_mag.append(app_tmp)
abs_mag.append(abs_tmp)
return box.create_box(boxtype='photometry',
name=specbox.name,
sptype=specbox.sptype,
wavelength=wavelengths,
flux=None,
app_mag=np.asarray(app_mag),
abs_mag=np.asarray(abs_mag),
filter_name=filters)