Source code for species.read.read_calibration

"""
Module with reading functionalities for calibration spectra.
"""

import configparser
import os

from typing import Dict, Optional, Tuple

import h5py
import numpy as np
import spectres

from scipy import interpolate, optimize

from typeguard import typechecked

from species.core.box import SpectrumBox, create_box
from species.phot.syn_phot import SyntheticPhotometry
from species.read.read_filter import ReadFilter
from species.util.spec_util import create_wavelengths, smooth_spectrum


[docs] class ReadCalibration: """ Class for reading a calibration spectrum from the database. """ @typechecked def __init__(self, tag: str, filter_name: Optional[str] = None) -> None: """ Parameters ---------- tag : str Database tag of the calibration spectrum. filter_name : str, None Filter that is used for the wavelength range. The full spectrum is read if the argument is set to ``None``. Returns ------- NoneType None """ self.tag = tag self.filter_name = filter_name if filter_name is None: self.wavel_range = None else: transmission = ReadFilter(filter_name) self.wavel_range = transmission.wavelength_range() config_file = os.path.join(os.getcwd(), "species_config.ini") config = configparser.ConfigParser() config.read(config_file) self.database = config["species"]["database"]
[docs] @typechecked def resample_spectrum( self, wavel_points: np.ndarray, model_param: Optional[Dict[str, float]] = None, spec_res: Optional[float] = None, apply_mask: bool = False, interp_highres: bool = False, ) -> SpectrumBox: """ Function for resampling the spectrum and optional uncertainties onto a new wavelength grid. Parameters ---------- wavel_points : np.ndarray Wavelengths (um). model_param : dict, None Dictionary with the model parameters, which should only contain the ``'scaling'`` keyword. No scaling is applied if the argument of ``model_param`` is set to ``None``. spec_res : float, None Spectral resolution that is used for smoothing the spectrum before resampling the wavelengths. No smoothing is applied if the argument is set to ``None``. The smoothing can only be applied to spectra with a constant spectral resolution (which is the case for all model spectra that are supported by ``species``) or a constant wavelength spacing. The first smoothing approach is fastest. apply_mask : bool Exclude negative values and NaNs. interp_highres : bool Oversample the spectrum to $R = 10000$, such that the ``spec_res`` parameter can be applied on a spectrum with constant $\\lambda/\\Delta\\lambda$. The uncertainties are crudely propagated with an interpolation as well and should only be considered as estimate. Returns ------- species.core.box.SpectrumBox Box with the resampled spectrum. """ calib_box = self.get_spectrum(apply_mask=apply_mask) if interp_highres: flux_interp = interpolate.interp1d( calib_box.wavelength, calib_box.flux, bounds_error=False, fill_value="extrapolate", ) sigma_interp = interpolate.interp1d( calib_box.wavelength, calib_box.error, bounds_error=False, fill_value="extrapolate", ) wavel_range = (calib_box.wavelength[0], calib_box.wavelength[-1]) wavel_highres = create_wavelengths(wavel_range, 10000.0) calib_box.wavelength = wavel_highres calib_box.flux = flux_interp(wavel_highres) calib_box.error = sigma_interp(wavel_highres) if spec_res is not None: calib_box.flux = smooth_spectrum( wavelength=calib_box.wavelength, flux=calib_box.flux, spec_res=spec_res, ) flux_interp = interpolate.interp1d( calib_box.wavelength, calib_box.flux, bounds_error=False, fill_value="extrapolate", ) sigma_interp = interpolate.interp1d( calib_box.wavelength, calib_box.error, bounds_error=False, fill_value="extrapolate", ) flux_new = flux_interp(wavel_points) error_new = sigma_interp(wavel_points) else: if spec_res is not None: calib_box.flux = smooth_spectrum( wavelength=calib_box.wavelength, flux=calib_box.flux, spec_res=spec_res, ) flux_new, error_new = spectres.spectres( wavel_points, calib_box.wavelength, calib_box.flux, spec_errs=calib_box.error, fill=0.0, verbose=False, ) if model_param is not None: flux_new = model_param["scaling"] * flux_new error_new = model_param["scaling"] * error_new return create_box( boxtype="spectrum", spectrum="calibration", wavelength=wavel_points, flux=flux_new, error=error_new, name=self.tag, )
[docs] @typechecked def get_spectrum( self, model_param: Optional[Dict[str, float]] = None, apply_mask: bool = False, wavel_sampling: Optional[float] = None, extrapolate: bool = False, min_wavelength: Optional[float] = None, ) -> SpectrumBox: """ Function for selecting the calibration spectrum. Parameters ---------- model_param : dict, None Model parameters. Should contain the 'scaling' value. Not used if set to ``None``. apply_mask : bool Exclude negative values and NaN values. wavel_sampling : float, None Wavelength sampling :math:`\\lambda/\\Delta\\lambda`. The original wavelength points are used if set to ``None``. extrapolate : bool Extrapolate to 6 um by fitting a power law function. min_wavelength : float, None Minimum wavelength used for fitting the power law function. All data is used if set to ``None``. Returns ------- species.core.box.SpectrumBox Box with the spectrum. """ with h5py.File(self.database, "r") as h5_file: cal_spec = np.array(h5_file[f"spectra/calibration/{self.tag}"]) wavelength = cal_spec[0,] flux = cal_spec[1,] error = cal_spec[2,] if apply_mask: indices = np.where(flux > 0.0)[0] wavelength = wavelength[indices] flux = flux[indices] error = error[indices] if model_param is not None: flux = model_param["scaling"] * flux error = model_param["scaling"] * error if self.wavel_range is None: wl_index = np.ones(wavelength.size, dtype=bool) else: wl_index = ( (flux > 0.0) & (wavelength > self.wavel_range[0]) & (wavelength < self.wavel_range[1]) ) count = np.count_nonzero(wl_index) if count > 0: index = np.where(wl_index)[0] if index[0] > 0: wl_index[index[0] - 1] = True if index[-1] < len(wl_index) - 1: wl_index[index[-1] + 1] = True wavelength = wavelength[wl_index] flux = flux[wl_index] error = error[wl_index] if extrapolate: def _power_law(wavelength, offset, scaling, power_index): return offset + scaling * wavelength**power_index if min_wavelength: indices = np.where(wavelength > min_wavelength)[0] else: indices = np.arange(0, wavelength.size, 1) popt, pcov = optimize.curve_fit( f=_power_law, xdata=wavelength[indices], ydata=flux[indices], p0=(0.0, np.mean(flux[indices]), -1.0), sigma=error[indices], ) sigma = np.sqrt(np.diag(pcov)) print("Fit result for f(x) = a + b*x^c:") print(f"a = {popt[0]} +/- {sigma[0]}") print(f"b = {popt[1]} +/- {sigma[1]}") print(f"c = {popt[2]} +/- {sigma[2]}") while wavelength[-1] <= 6.0: wl_add = wavelength[-1] + wavelength[-1] / 1000.0 wavelength = np.append(wavelength, wl_add) flux = np.append(flux, _power_law(wl_add, popt[0], popt[1], popt[2])) error = np.append(error, 0.0) if wavel_sampling is not None: wavelength_new = create_wavelengths( (wavelength[0], wavelength[-1]), wavel_sampling ) flux_new, error_new = spectres.spectres( wavelength_new, wavelength, flux, spec_errs=error, fill=0.0, verbose=True, ) wavelength = wavelength_new flux = flux_new error = error_new return create_box( boxtype="spectrum", spectrum="calibration", wavelength=wavelength, flux=flux, error=error, name=self.tag, )
[docs] @typechecked def get_flux( self, model_param: Optional[Dict[str, float]] = None ) -> Tuple[float, float]: """ Function for calculating the average flux for the ``filter_name``. Parameters ---------- model_param : dict, None Model parameters. Should contain the 'scaling' value. Not used if set to ``None``. Returns ------- Returns ------- float Average flux (W m-2 um-1). float Uncertainty (W m-2 um-1). """ specbox = self.get_spectrum(model_param=model_param, apply_mask=True) synphot = SyntheticPhotometry(self.filter_name) return synphot.spectrum_to_flux( specbox.wavelength, specbox.flux, error=specbox.flux )
[docs] @typechecked def get_magnitude( self, model_param: Optional[Dict[str, float]] = None, distance: Optional[Tuple[float, float]] = None, ) -> Tuple[Tuple[float, Optional[float]], Tuple[Optional[float], Optional[float]]]: """ Function for calculating the apparent magnitude for the ``filter_name``. Parameters ---------- model_param : dict, None Model parameters. Should contain the 'scaling' value. Not used if set to ``None``. distance : tuple(float, float), None Distance and uncertainty to the calibration object (pc). Not used if set to ``None``, in which case the returned absolute magnitude is ``(None, None)``. Returns ------- tuple(float, float) Apparent magnitude and uncertainty. tuple(float, float), tuple(None, None) Absolute magnitude and uncertainty. """ specbox = self.get_spectrum(model_param=model_param, apply_mask=True) if np.count_nonzero(specbox.error) == 0: error = None else: error = specbox.error synphot = SyntheticPhotometry(self.filter_name) return synphot.spectrum_to_magnitude( specbox.wavelength, specbox.flux, error=error, distance=distance )