Source code for species.data.model_data.custom_model

"""
Module for adding a custom grid of model spectra to the database.
"""

import os
import warnings

from typing import List, Optional, Tuple

import h5py
import spectres
import numpy as np

from typeguard import typechecked

from species.util.core_util import print_section
from species.util.data_util import sort_data, write_data, add_missing
from species.util.spec_util import create_wavelengths


[docs] @typechecked def add_custom_model_grid( model_name: str, data_path: str, parameters: List[str], database: h5py._hl.files.File, wavel_range: Optional[Tuple[float, float]], teff_range: Optional[Tuple[float, float]], wavel_sampling: Optional[float], ) -> None: """ Function for adding a custom grid of model spectra to the database. The spectra are read from the ``data_path`` and should contain the ``model_name`` and ``parameters`` in the filenames in the following format example: `model-name_teff_1000_logg_4.0_feh_0.0_spec.dat`. The list with ``parameters`` should contain the same parameters as are included in the filename. Each datafile should contain two columns with the wavelengths in :math:`\\mu\\text{m}` and the fluxes in :math:`\\text{W} \\text{m}^{-2} \\mu\\text{m}^{-1}`. Each file should contain the same number and values of wavelengths. The wavelengths should be logarithmically sampled, so with a constant :math:`\\lambda/\\Delta\\lambda`. If not, then the ``wavel_range`` and ``wavel_sampling`` parameters should be used such that the wavelengths are resampled when reading the data into the ``species`` database. Parameters ---------- model_name : str Name of the model grid. Should be identical to the model name that is included in the filenames. data_path : str Path where the files with the model spectra are located. parameters : list(str) List with the model parameters. The following parameters are supported: ``teff`` (for :math:`T_\\mathrm{eff}`), ``logg`` (for :math:`\\log\\,g`), ``feh`` (for [Fe/H]), ``c_o_ratio`` (for C/O), ``fsed`` (for :math:`f_\\mathrm{sed}`), ``log_kzz`` (for :math:`\\log\\,K_\\mathrm{zz}`), and ``ad_index`` (for :math:`\\gamma_\\mathrm{ad}`). Please contact the code maintainer if support for other parameters should be added. database : h5py._hl.files.File Database. wavel_range : tuple(float, float), None Wavelength range (:math:`\\mu\\text{m}`). The original wavelength points are used if the argument is set to ``None``. teff_range : tuple(float, float), None Effective temperature range (K) for adding a subset of the model grid. The full parameter grid will be added if the argument is set to ``None``. wavel_sampling : float, None Wavelength spacing :math:`\\lambda/\\Delta\\lambda` to which the spectra will be resampled. Typically this parameter is not needed so the argument can be set to ``None``. The only benefit of using this parameter is limiting the storage in the HDF5 database. The parameter should be used in combination with setting the ``wavel_range``. Returns ------- NoneType None """ print_section("Add custom grid of model spectra") if wavel_sampling is not None and wavel_range is None: warnings.warn( "The 'wavel_sampling' parameter can only be " "used in combination with the 'wavel_range' " "parameter. The model spectra are therefore " "not resampled." ) wavel_sampling = None teff = [] if "logg" in parameters: logg = [] else: logg = None if "feh" in parameters: feh = [] else: feh = None if "c_o_ratio" in parameters: c_o_ratio = [] else: c_o_ratio = None if "fsed" in parameters: fsed = [] else: fsed = None if "log_kzz" in parameters: log_kzz = [] else: log_kzz = None if "ad_index" in parameters: ad_index = [] else: ad_index = None flux = [] if wavel_range is not None: print(f"Wavelength range (um) = {wavel_range[0]} - {wavel_range[1]}") if wavel_range is not None and wavel_sampling is not None: wavelength = create_wavelengths(wavel_range, wavel_sampling) resample_spectra = True else: wavelength = None resample_spectra = False print(f"Sampling (lambda/d_lambda) = {wavel_sampling}") if teff_range is not None: print(f"Teff range (K) = {teff_range[0]} - {teff_range[1]}") print_message = "" count = 0 for _, _, file_list in os.walk(data_path): for filename in sorted(file_list): if filename[: len(model_name)] == model_name: file_split = filename.split("_") param_index = file_split.index("teff") + 1 teff_val = float(file_split[param_index]) if teff_range is not None: if teff_val < teff_range[0] or teff_val > teff_range[1]: continue teff.append(teff_val) if logg is not None: param_index = file_split.index("logg") + 1 logg.append(float(file_split[param_index])) if feh is not None: param_index = file_split.index("feh") + 1 feh.append(float(file_split[param_index])) if c_o_ratio is not None: param_index = file_split.index("co") + 1 c_o_ratio.append(float(file_split[param_index])) if fsed is not None: param_index = file_split.index("fsed") + 1 fsed.append(float(file_split[param_index])) if log_kzz is not None: param_index = file_split.index("logkzz") + 1 log_kzz.append(float(file_split[param_index])) if ad_index is not None: param_index = file_split.index("adindex") + 1 ad_index.append(float(file_split[param_index])) empty_message = len(print_message) * " " print(f"\r{empty_message}", end="") print_message = f"Adding {model_name} model spectra... {filename}" print(f"\r{print_message}", end="") data_wavel, data_flux = np.loadtxt( os.path.join(data_path, filename), unpack=True ) if wavel_range is None: if wavelength is None: wavelength = np.copy(data_wavel) # (um) if np.all(np.diff(wavelength) < 0): raise ValueError( "The wavelengths are not all sorted by increasing value." ) flux.append(data_flux) # (W m-2 um-1) else: if not resample_spectra: if wavelength is None: wavelength = np.copy(data_wavel) # (um) if np.all(np.diff(wavelength) < 0): raise ValueError( "The wavelengths are not all sorted by increasing value." ) wavel_select = (wavel_range[0] < wavelength) & ( wavelength < wavel_range[1] ) wavelength = wavelength[wavel_select] flux.append(data_flux[wavel_select]) # (W m-2 um-1) else: flux_resample = spectres.spectres( wavelength, data_wavel, data_flux, spec_errs=None, fill=np.nan, verbose=False, ) if np.isnan(np.sum(flux_resample)): raise ValueError( f"Resampling is only possible if the new wavelength " f"range ({wavelength[0]} - {wavelength[-1]} um) falls " f"sufficiently far within the wavelength range " f"({data_wavel[0]} - {data_wavel[-1]} um) of the input " f"spectra." ) flux.append(flux_resample) # (W m-2 um-1) count += 1 empty_message = len(print_message) * " " print(f"\r{empty_message}", end="") print_message = f"Adding {model_name} model spectra... [DONE]" print(f"\r{print_message}") if logg is not None: logg = np.asarray(logg) if feh is not None: feh = np.asarray(feh) if c_o_ratio is not None: c_o_ratio = np.asarray(c_o_ratio) if fsed is not None: fsed = np.asarray(fsed) if log_kzz is not None: log_kzz = np.asarray(log_kzz) if ad_index is not None: ad_index = np.asarray(ad_index) if wavelength is None: raise ValueError( "No files have been found. Please check " "the arguments of 'model', 'data_path', " "and 'parameters' of the add_custom_model " "method to make sure that the correct " "folder and files names are selected." ) data_sorted = sort_data( np.asarray(teff), logg, feh, c_o_ratio, fsed, log_kzz, ad_index, wavelength, np.asarray(flux), ) if wavel_sampling is None: wavel_sampling = np.mean( 0.5 * (wavelength[1:] + wavelength[:-1]) / np.diff(wavelength) ) write_data(model_name, parameters, wavel_sampling, database, data_sorted) add_missing(model_name, parameters, database)