Source code for

Module with reading functionalities for atmospheric model spectra.

import os
import math
import warnings
import configparser

import h5py
import spectres
import numpy as np

from scipy.interpolate import RegularGridInterpolator

from species.analysis import photometry
from species.core import box, constants
from import database
from import read_filter, read_calibration
from species.util import read_util

[docs]class ReadModel: """ Class for reading a model spectrum from the database. """ def __init__(self, model, wavel_range=None, filter_name=None): """ Parameters ---------- model : str Name of the atmospheric model. wavel_range : tuple(float, float), None Wavelength range (um). Full spectrum is selected if set to None. Not used if ``filter_name`` is not None. filter_name : str, None Filter ID that is used for the wavelength range. The ``wavel_range`` is used if set to None. Returns ------- NoneType None """ self.model = model self.spectrum_interp = None self.wl_points = None self.wl_index = None self.filter_name = filter_name self.wavel_range = wavel_range if self.filter_name is not None: transmission = read_filter.ReadFilter(self.filter_name) self.wavel_range = transmission.wavelength_range() config_file = os.path.join(os.getcwd(), 'species_config.ini') config = configparser.ConfigParser() config.read_file(open(config_file)) self.database = config['species']['database']
[docs] def open_database(self): """ Internal function for opening the `species` database. Returns ------- h5py._hl.files.File The HDF5 database. """ config_file = os.path.join(os.getcwd(), 'species_config.ini') config = configparser.ConfigParser() config.read_file(open(config_file)) database_path = config['species']['database'] h5_file = h5py.File(database_path, 'r') try: h5_file[f'models/{self.model}'] except KeyError: raise ValueError(f'The \'{self.model}\' model spectra are not present in the ' f'database.') return h5_file
[docs] def wavelength_points(self, hdf5_file): """ Internal function for extracting the wavelength points and indices that are used. Parameters ---------- hdf5_file : h5py._hl.files.File The HDF5 database. Returns ------- numpy.ndarray Wavelength points (um). numpy.ndarray Array with the size of the original wavelength grid. The booleans indicate if a wavelength point was used. """ wl_points = np.asarray(hdf5_file[f'models/{self.model}/wavelength']) if self.wavel_range is None: wl_index = np.ones(wl_points.shape[0], dtype=bool) else: wl_index = (wl_points > self.wavel_range[0]) & (wl_points < self.wavel_range[1]) index = np.where(wl_index)[0] if index[0]-1 >= 0: wl_index[index[0] - 1] = True if index[-1]+1 < wl_index.size: wl_index[index[-1] + 1] = True return wl_points[wl_index], wl_index
[docs] def interpolate_model(self): """ Internal function for linearly interpolating the full grid of model spectra. Returns ------- NoneType None """ h5_file = self.open_database() points = [] for item in self.get_points().values(): points.append(item) if self.wl_points is None: self.wl_points, self.wl_index = self.wavelength_points(h5_file) flux = np.asarray(h5_file[f'models/{self.model}/flux']) flux = flux[..., self.wl_index] self.spectrum_interp = RegularGridInterpolator(points=points, values=flux, method='linear', bounds_error=False, fill_value=np.nan)
[docs] def interpolate_grid(self, bounds, wavel_resample=None, smooth=False, spec_res=None): """ Internal function for linearly interpolating the grid of model spectra for a given filter or wavelength sampling, and grid boundaries. bounds : dict Dictionary with the parameter boundaries. wavel_resample : numpy.array, None Wavelength points for the resampling of the spectrum. The ``filter_name`` is used if set to None. smooth : bool Smooth the spectrum with a Gaussian line spread function. Only recommended in case the input wavelength sampling has a uniform spectral resolution. spec_res : float Spectral resolution that is used for the Gaussian filter when ``smooth=True``. Returns ------- NoneType None """ self.interpolate_model() if smooth and wavel_resample is None: raise ValueError('Smoothing is only required if the spectra are resampled to a new ' 'wavelength grid, therefore requiring the \'wavel_resample\' ' 'argument.') points = [] for key, value in self.get_points().items(): value_new = [] for i, item in enumerate(value): if bounds[key][0] <= item <= bounds[key][-1]: value_new.append(item) points.append(value_new) param = self.get_parameters() n_param = len(param) dim_size = [] for item in points: dim_size.append(len(item)) if self.filter_name is not None: dim_size.append(1) else: dim_size.append(wavel_resample.size) flux_new = np.zeros(dim_size) if n_param == 2: model_param = {} for i, item_0 in enumerate(points[0]): for j, item_1 in enumerate(points[1]): model_param[param[0]] = item_0 model_param[param[1]] = item_1 if self.filter_name is not None: flux_new[i, j] = self.get_flux(model_param)[0] else: flux_new[i, j, :] = self.get_model(model_param, spec_res=spec_res, wavel_resample=wavel_resample, smooth=smooth).flux elif n_param == 3: model_param = {} for i, item_0 in enumerate(points[0]): for j, item_1 in enumerate(points[1]): for k, item_2 in enumerate(points[2]): model_param[param[0]] = item_0 model_param[param[1]] = item_1 model_param[param[2]] = item_2 if self.filter_name is not None: flux_new[i, j, k] = self.get_flux(model_param)[0] else: flux_new[i, j, k, :] = self.get_model(model_param, spec_res=spec_res, wavel_resample=wavel_resample, smooth=smooth).flux elif n_param == 4: model_param = {} for i, item_0 in enumerate(points[0]): for j, item_1 in enumerate(points[1]): for k, item_2 in enumerate(points[2]): for l, item_3 in enumerate(points[3]): model_param[param[0]] = item_0 model_param[param[1]] = item_1 model_param[param[2]] = item_2 model_param[param[3]] = item_3 if self.filter_name is not None: flux_new[i, j, k, l] = self.get_flux(model_param)[0] else: flux_new[i, j, k, l, :] = self.get_model( model_param, spec_res=spec_res, wavel_resample=wavel_resample, smooth=smooth).flux elif n_param == 5: model_param = {} for i, item_0 in enumerate(points[0]): for j, item_1 in enumerate(points[1]): for k, item_2 in enumerate(points[2]): for l, item_3 in enumerate(points[3]): for m, item_4 in enumerate(points[4]): model_param[param[0]] = item_0 model_param[param[1]] = item_1 model_param[param[2]] = item_2 model_param[param[3]] = item_3 model_param[param[4]] = item_4 if self.filter_name is not None: flux_new[i, j, k, l, m] = self.get_flux(model_param)[0] else: flux_new[i, j, k, l, m, :] = self.get_model( model_param, spec_res=spec_res, wavel_resample=wavel_resample, smooth=smooth).flux if self.filter_name is not None: transmission = read_filter.ReadFilter(self.filter_name) self.wl_points = [transmission.mean_wavelength()] else: self.wl_points = wavel_resample self.spectrum_interp = RegularGridInterpolator(points=points, values=flux_new, method='linear', bounds_error=False, fill_value=np.nan)
[docs] def get_model(self, model_param, spec_res=None, wavel_resample=None, magnitude=False, smooth=False): """ Function for extracting a model spectrum by linearly interpolating the model grid. Parameters ---------- model_param : dict Dictionary with the model parameters and values. The values should be within the boundaries of the grid. The grid boundaries of the spectra in the database can be obtained with :func:``. spec_res : float, None Spectral resolution that is used for smoothing the spectrum with a Gaussian kernel when ``smooth=True`` and/or resampling the spectrum when ``wavel_range`` of ``FitModel`` is not None. The original wavelength points are used if both ``spec_res`` and ``wavel_resample`` are set to None. wavel_resample : numpy.ndarray, None Wavelength points (um) to which the spectrum is resampled. In that case, ``spec_res`` can still be used to smooth the spectrum. magnitude : bool Normalize the spectrum with a flux calibrated spectrum of Vega and return the magnitude instead of flux density. smooth : bool If True, the spectrum is smoothed with a Gaussian kernel to the spectral resolution of ``spec_res``. This requires either a uniform spectral resolution of the input spectra (fast) or a uniform wavelength spacing of the input spectra (slow). Returns ------- Box with the model spectrum. """ if smooth and spec_res is None: warnings.warn('The \'spec_res\' argument is required for smoothing the spectrum when ' '\'smooth\' is set to True.') grid_bounds = self.get_bounds() extra_param = ['radius', 'distance', 'mass', 'luminosity'] for key in self.get_parameters(): if key not in model_param.keys(): raise ValueError(f'The \'{key}\' parameter is required by \'{self.model}\'. ' f'The mandatory parameters are {self.get_parameters()}.') if model_param[key] < grid_bounds[key][0]: raise ValueError(f'The input value of \'{key}\' is smaller than the lower ' f'boundary of the model grid ({model_param[key]} < ' f'{grid_bounds[key][0]}).') if model_param[key] > grid_bounds[key][1]: raise ValueError(f'The input value of \'{key}\' is larger than the upper ' f'boundary of the model grid ({model_param[key]} > ' f'{grid_bounds[key][1]}).') for key in model_param.keys(): if key not in self.get_parameters() and key not in extra_param: warnings.warn(f'The \'{key}\' parameter is not required by \'{self.model}\' so ' f'the parameter will be ignored. The mandatory parameters are ' f'{self.get_parameters()}.') if 'mass' in model_param and 'radius' not in model_param: mass = 1e3 * model_param['mass'] * constants.M_JUP # (g) radius = math.sqrt(1e3 * constants.GRAVITY * mass / (10.**model_param['logg'])) # (cm) model_param['radius'] = 1e-2 * radius / constants.R_JUP # (Rjup) if self.spectrum_interp is None: self.interpolate_model() if self.wavel_range is None: wl_points = self.get_wavelengths() self.wavel_range = (wl_points[0], wl_points[-1]) parameters = [] if 'teff' in model_param: parameters.append(model_param['teff']) if 'logg' in model_param: parameters.append(model_param['logg']) if 'feh' in model_param: parameters.append(model_param['feh']) if 'co' in model_param: parameters.append(model_param['co']) if 'fsed' in model_param: parameters.append(model_param['fsed']) flux = self.spectrum_interp(parameters)[0] if 'radius' in model_param: model_param['mass'] = read_util.get_mass(model_param) if 'distance' in model_param: scaling = (model_param['radius']*constants.R_JUP)**2 / \ (model_param['distance']*constants.PARSEC)**2 flux *= scaling if smooth: flux = read_util.smooth_spectrum(wavelength=self.wl_points, flux=flux, spec_res=spec_res) if wavel_resample is not None: flux = spectres.spectres(wavel_resample, self.wl_points, flux) elif spec_res is not None: index = np.where(np.isnan(flux))[0] if index.size > 0: raise ValueError('Flux values should not contains NaNs. Please make sure that ' 'the parameter values and the wavelength range are within ' 'the grid boundaries as stored in the database.') wavel_resample = [self.wl_points[0]] while wavel_resample[-1] <= self.wl_points[-1]: wavel_resample.append(wavel_resample[-1] + wavel_resample[-1]/(2.*spec_res)) wavel_resample = np.asarray(wavel_resample[:-1]) indices = np.where((wavel_resample > self.wl_points[0]) & (wavel_resample < self.wl_points[-2]))[0] for i in range(1, 10): try: index_error = False flux = spectres.spectres(wavel_resample[indices][i:-i], self.wl_points, flux) except ValueError: index_error = True if not index_error: wavel_resample = wavel_resample[indices][i:-i] break if magnitude: quantity = 'magnitude' with h5py.File(self.database, 'r') as h5_file: try: h5_file['spectra/calibration/vega'] except KeyError: h5_file.close() species_db = database.Database() species_db.add_spectrum('vega') h5_file = h5py.File(self.database, 'r') readcalib = read_calibration.ReadCalibration('vega', filter_name=None) calibbox = readcalib.get_spectrum() if wavel_resample is not None: new_spec_wavs = wavel_resample else: new_spec_wavs = self.wl_points flux_vega, _ = spectres.spectres(new_spec_wavs, calibbox.wavelength, calibbox.flux, spec_errs=calibbox.error) flux = -2.5*np.log10(flux/flux_vega) else: quantity = 'flux' is_finite = np.where(np.isfinite(flux))[0] if wavel_resample is None: wavelength = self.wl_points[is_finite] else: wavelength = wavel_resample[is_finite] if wavelength.shape[0] == 0: raise ValueError(f'The model spectrum is empty. Perhaps the grid could not be ' f'interpolated at {model_param} because zeros are stored in the ' f'database.') spec_box = box.create_box(boxtype='model', model=self.model, wavelength=wavelength, flux=flux[is_finite], parameters=model_param, quantity=quantity) if 'radius' in spec_box.parameters: spec_box.parameters['luminosity'] = 4. * np.pi * (spec_box.parameters['radius'] * \ constants.R_JUP)**2 * constants.SIGMA_SB * spec_box.parameters['teff']**4. / \ constants.L_SUN # (Lsun) return spec_box
[docs] def get_data(self, model_param): """ Function for selecting a model spectrum (without interpolation) for a set of parameter values that coincide with the grid points. The stored grid points can be inspected with :func:``. Parameters ---------- model_param : dict Model parameters and values. Only discrete values from the original grid are possible. Else, the nearest grid values are selected. Returns ------- Box with the model spectrum. """ for key in self.get_parameters(): if key not in model_param.keys(): raise ValueError(f'The \'{key}\' parameter is required by \'{self.model}\'. ' f'The mandatory parameters are {self.get_parameters()}.') extra_param = ['radius', 'distance', 'mass', 'luminosity'] for key in model_param.keys(): if key not in self.get_parameters() and key not in extra_param: warnings.warn(f'The \'{key}\' parameter is not required by \'{self.model}\' so ' f'the parameter will be ignored. The mandatory parameters are ' f'{self.get_parameters()}.') h5_file = self.open_database() param_key = [] param_val = [] if 'teff' in model_param: param_key.append('teff') param_val.append(model_param['teff']) if 'logg' in model_param: param_key.append('logg') param_val.append(model_param['logg']) if 'feh' in model_param: param_key.append('feh') param_val.append(model_param['feh']) if 'co' in model_param: param_key.append('co') param_val.append(model_param['co']) if 'fsed' in model_param: param_key.append('fsed') param_val.append(model_param['fsed']) flux = np.asarray(h5_file[f'models/{self.model}/flux']) indices = [] for item in param_key: data = np.asarray(h5_file[f'models/{self.model}/{item}']) data_index = np.argwhere(data == model_param[item])[0] if len(data_index) == 0: raise ValueError('The parameter {item}={model_val[i]} is not found.') indices.append(data_index[0]) wl_points, wl_index = self.wavelength_points(h5_file) indices.append(wl_index) flux = flux[tuple(indices)] if 'radius' in model_param and 'distance' in model_param: scaling = (model_param['radius']*constants.R_JUP)**2 / \ (model_param['distance']*constants.PARSEC)**2 flux *= scaling h5_file.close() spec_box = box.create_box(boxtype='model', model=self.model, wavelength=wl_points, flux=flux, parameters=model_param, quantity='flux') if 'radius' in spec_box.parameters: spec_box.parameters['luminosity'] = 4. * np.pi * (spec_box.parameters['radius'] * \ constants.R_JUP)**2 * constants.SIGMA_SB * spec_box.parameters['teff']**4. / \ constants.L_SUN # (Lsun) return spec_box
[docs] def get_flux(self, model_param, synphot=None): """ Function for calculating the average flux density for the ``filter_name``. Parameters ---------- model_param : dict Model parameters and values. synphot : species.analysis.photometry.SyntheticPhotometry, None Synthetic photometry object. The object is created if set to None. Returns ------- float Average flux (W m-2 um-1). float, None Uncertainty (W m-2 um-1), which is set to None. """ if self.spectrum_interp is None: self.interpolate_model() spectrum = self.get_model(model_param) if synphot is None: synphot = photometry.SyntheticPhotometry(self.filter_name) return synphot.spectrum_to_flux(spectrum.wavelength, spectrum.flux)
[docs] def get_magnitude(self, model_param): """ Function for calculating the apparent and absolute magnitudes for the ``filter_name``. Parameters ---------- model_param : dict Model parameters and values. Returns ------- float Apparent magnitude (mag). float Absolute magnitude (mag). """ if self.spectrum_interp is None: self.interpolate_model() try: spectrum = self.get_model(model_param) except ValueError: warnings.warn(f'The set of model parameters {model_param} is outside the grid range ' f'{self.get_bounds()} so returning a NaN.') return np.nan, np.nan if spectrum.wavelength.size == 0: app_mag = np.nan abs_mag = np.nan else: synphot = photometry.SyntheticPhotometry(self.filter_name) if 'distance' in model_param: app_mag, abs_mag = synphot.spectrum_to_magnitude( spectrum.wavelength, spectrum.flux, distance=(model_param['distance'], None)) else: app_mag, abs_mag = synphot.spectrum_to_magnitude( spectrum.wavelength, spectrum.flux, distance=None) return app_mag[0], abs_mag[0]
[docs] def get_bounds(self): """ Function for extracting the grid boundaries. Returns ------- dict Boundaries of parameter grid. """ h5_file = self.open_database() parameters = self.get_parameters() bounds = {} for item in parameters: data = h5_file[f'models/{self.model}/{item}'] bounds[item] = (data[0], data[-1]) h5_file.close() return bounds
[docs] def get_wavelengths(self): """ Function for extracting the wavelength points. Returns ------- numpy.ndarray Wavelength points (um). """ with self.open_database() as h5_file: wavelength = np.asarray(h5_file[f'models/{self.model}/wavelength']) return wavelength
[docs] def get_points(self): """ Function for extracting the grid points. Returns ------- dict Parameter points of the model grid. """ points = {} h5_file = self.open_database() parameters = self.get_parameters() points = {} for item in parameters: data = h5_file[f'models/{self.model}/{item}'] points[item] = np.asarray(data) h5_file.close() return points
[docs] def get_parameters(self): """ Function for extracting the parameter names. Returns ------- list(str, ) Model parameters. """ h5_file = self.open_database() dset = h5_file[f'models/{self.model}'] if 'n_param' in dset.attrs: n_param = dset.attrs['n_param'] elif 'nparam' in dset.attrs: n_param = dset.attrs['nparam'] param = [] for i in range(n_param): param.append(dset.attrs[f'parameter{i}']) h5_file.close() return param