Source code for

Module with functionalities for the analysis of emission lines.

import configparser
import os
import warnings

from typing import Dict, List, Optional, Tuple, Union

import h5py
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np

    import ultranest
        "UltraNest could not be imported. Perhaps "
        "because cython was not correctly compiled?"

from astropy import units as u
from astropy.modeling.fitting import LinearLSQFitter
from astropy.modeling.polynomial import Polynomial1D
from astropy.nddata import StdDevUncertainty
from scipy.interpolate import interp1d, InterpolatedUnivariateSpline
from specutils import Spectrum1D
from specutils.fitting import fit_generic_continuum
from typeguard import typechecked

from species.core import constants
from import ReadObject
from species.util.model_util import gaussian_spectrum
from species.util.spec_util import create_wavelengths

[docs] class EmissionLine: """ Class for the analysis of emission lines. """ @typechecked def __init__( self, object_name: str, spec_name: str, hydrogen_line: Optional[str] = None, lambda_rest: Optional[float] = None, wavel_range: Optional[Tuple[float, float]] = None, ) -> None: """ Parameters ---------- object_name : str Object name as stored in the database with :func:`` or :func:``. spec_name : str Name of the spectrum that is stored at the object data of ``object_name``. hydrogen_line : str, None Name of the hydrogen line that will be analyzed. The names available lines can be checked with the :func:` list_hydrogen_lines` method. If the argument is set to ``None`` then provide the rest wavelength as argument of ``lambda_rest``. lambda_rest : float, None Rest wavelength (um) of the emission line. The parameter if used for calculating the radial velocity and its uncertainty. The argument can be set to ``None`` and will be ignored if the argument of ``hydrogen_line`` is used. wavel_range : tuple(float, float), None Wavelength range (um) that is cropped from the spectrum. The full spectrum is used if the argument is set to ``None``. Returns ------- NoneType None """ self.object_name = object_name self.spec_name = spec_name self.hydrogen_line = hydrogen_line self.object = ReadObject(object_name) self.parallax = self.object.get_parallax()[0] self.spectrum = self.object.get_spectrum()[spec_name][0] if wavel_range is None: self.wavel_range = (self.spectrum[0, 0], self.spectrum[-1, 0]) else: self.wavel_range = wavel_range indices = np.where( (self.spectrum[:, 0] >= wavel_range[0]) & (self.spectrum[:, 0] <= wavel_range[1]) )[0] self.spectrum = self.spectrum[indices,] config_file = os.path.join(os.getcwd(), "species_config.ini") config = configparser.ConfigParser() self.database = config["species"]["database"] h5_database = h5py.File(self.database, "r") if "accretion" not in h5_database: h5_database.close() from import Database species_db = Database() species_db.add_accretion() if self.hydrogen_line is None and lambda_rest is not None: self.lambda_rest = lambda_rest else: if self.hydrogen_line is None: self.list_hydrogen_lines() warnings.warn( "Provide an argument for either " "the 'hydrogen_line' or 'lambda_rest' " "parameter. Only one of the two " "arguments should be set to None." ) self.hydrogen_line = input( "Please provide the name " "of the hydrogen line: " ) h5_database = h5py.File(self.database, "r") line_names = np.array(h5_database["accretion/hydrogen_lines"], dtype=str) line_wavel = np.array(h5_database["accretion/wavelengths"]) if self.hydrogen_line not in line_names: raise ValueError( "The hydrogen line with the name " f"'{self.hydrogen_line}' is not found. Please " "use the 'list_hydrogen_lines' method to check " "which hydrogen lines are available." ) line_idx = np.argwhere(line_names == self.hydrogen_line)[0][0] # Vacuum rest wavelength (um) self.lambda_rest = line_wavel[line_idx] print(f"Hydrogen line = {self.hydrogen_line}") print(f"Rest wavelength (um) = {self.lambda_rest:.4f}") self.spec_vrad = ( 1e-3 * constants.LIGHT * (self.spectrum[:, 0] - self.lambda_rest) / self.lambda_rest ) self.continuum_flux = np.full(self.spectrum.shape[0], 0.0) self.continuum_check = False
[docs] @typechecked def list_hydrogen_lines(self) -> List[str]: """ Function to list the hydrogen lines for which an accretion luminosity relation :math:`L_\\mathrm{acc}(L_\\mathrm{line}`) was fit explicitly provided in `Aoyama et al. (2021) < abstract>`_ [Ao21] or `Marleau & Aoyama (2022) < abstract>`_ [MA22]. These names can be set as argument ``hydrogen_line``. In that case, the measured line luminosity from :func:` fit_gaussian` will be automatically converted to an accretion luminosity with the relation from [Ao21] and [MA22]. Returns ------- list(str) List with the names of the hydrogen lines for which there are coefficients available for the accretion relation. """ with h5py.File(self.database, "r") as h5_file: line_names = list(h5_file["accretion/hydrogen_lines/"]) # Convert from bytes to strings for i, item in enumerate(line_names): if isinstance(item, bytes): line_names[i] = item.decode("utf-8") print(f"Available hydrogen lines:\n{line_names}") return line_names
[docs] @typechecked def subtract_continuum( self, poly_degree: int = 3, plot_filename: Optional[str] = "continuum.pdf", spec_filename: Optional[str] = None, ) -> None: """ Method for fitting the continuum with a polynomial function of the following form: :math:`P = \\sum_{i=0}^{i=n}C_{i} * x^{i}`. The spectrum is first smoothed with a median filter and then fitted with a linear least squares algorithm. Parameters ---------- poly_degree : int Degree of the polynomial series. plot_filename : str, None Filename for the plots with the continuum fit and the continuum-subtracted spectrum. The plot is shown in an interface window if the argument is set to ``None``. spec_filename : str, None Output text file for writing the continuum-subtracted spectrum. The file will not be created if the argument is set to ``None``. Returns ------- NoneType None """ # Fit continuum print("Fitting continuum...", end="", flush=True) spec_extract = Spectrum1D( flux=self.spectrum[:, 1] * u.W, spectral_axis=self.spectrum[:, 0] *, uncertainty=StdDevUncertainty(self.spectrum[:, 2] * u.W), ) g1_fit = fit_generic_continuum( spec_extract, median_window=3, model=Polynomial1D(poly_degree), fitter=LinearLSQFitter(), ) continuum_fit = g1_fit(spec_extract.spectral_axis) print(" [DONE]") # Subtract continuum spec_cont_sub = spec_extract - continuum_fit self.continuum_flux = continuum_fit / u.W # Create plot if plot_filename is None: print("Plotting continuum fit...", end="", flush=True) else: print(f"Plotting continuum fit: {plot_filename}...", end="", flush=True) plt.rcParams[""] = "serif" plt.rcParams["mathtext.fontset"] = "dejavuserif" plt.rcParams["axes.axisbelow"] = False plt.figure(figsize=(6, 6)) gs = mpl.gridspec.GridSpec(2, 1) gs.update(wspace=0, hspace=0.1, left=0, right=1, bottom=0, top=1) ax1 = plt.subplot(gs[0, 0]) ax2 = plt.subplot(gs[1, 0]) ax3 = ax1.twiny() ax4 = ax2.twiny() ax1.tick_params( axis="both", which="major", colors="black", labelcolor="black", direction="in", width=1, length=5, labelsize=12, top=False, bottom=True, left=True, right=True, labelbottom=False, ) ax1.tick_params( axis="both", which="minor", colors="black", labelcolor="black", direction="in", width=1, length=3, labelsize=12, top=False, bottom=True, left=True, right=True, labelbottom=False, ) ax2.tick_params( axis="both", which="major", colors="black", labelcolor="black", direction="in", width=1, length=5, labelsize=12, top=False, bottom=True, left=True, right=True, ) ax2.tick_params( axis="both", which="minor", colors="black", labelcolor="black", direction="in", width=1, length=3, labelsize=12, top=False, bottom=True, left=True, right=True, ) ax3.tick_params( axis="both", which="major", colors="black", labelcolor="black", direction="in", width=1, length=5, labelsize=12, top=True, bottom=False, left=True, right=True, ) ax3.tick_params( axis="both", which="minor", colors="black", labelcolor="black", direction="in", width=1, length=3, labelsize=12, top=True, bottom=False, left=True, right=True, ) ax4.tick_params( axis="both", which="major", colors="black", labelcolor="black", direction="in", width=1, length=5, labelsize=12, top=True, bottom=False, left=True, right=True, labeltop=False, ) ax4.tick_params( axis="both", which="minor", colors="black", labelcolor="black", direction="in", width=1, length=3, labelsize=12, top=True, bottom=False, left=True, right=True, labeltop=False, ) ax1.set_ylabel("Flux (W m$^{-2}$ µm$^{-1}$)", fontsize=16) ax2.set_xlabel("Wavelength (µm)", fontsize=16) ax2.set_ylabel("Flux (W m$^{-2}$ µm$^{-1}$)", fontsize=16) ax3.set_xlabel("Velocity (km s$^{-1}$)", fontsize=16) ax1.get_yaxis().set_label_coords(-0.1, 0.5) ax2.get_xaxis().set_label_coords(0.5, -0.1) ax2.get_yaxis().set_label_coords(-0.1, 0.5) ax3.get_xaxis().set_label_coords(0.5, 1.12) ax1.plot( spec_extract.spectral_axis, spec_extract.flux, color="black", label=self.spec_name, ) ax1.plot( spec_extract.spectral_axis, continuum_fit, color="tab:blue", label="Continuum fit", ) ax2.plot( spec_cont_sub.spectral_axis, spec_cont_sub.flux, color="black", label="Continuum subtracted", ) ax3.plot(self.spec_vrad, spec_extract.flux, ls="-", lw=0.0) ax4.plot(self.spec_vrad, spec_cont_sub.flux, ls="-", lw=0.0) ax1.legend(loc="upper right", frameon=False, fontsize=12.0) ax2.legend(loc="upper right", frameon=False, fontsize=12.0) print(" [DONE]") if plot_filename is None: else: plt.savefig(plot_filename, bbox_inches="tight") plt.clf() plt.close() # Overwrite original spectrum with continuum-subtracted spectrum self.spectrum[:, 1] = spec_cont_sub.flux self.continuum_check = True if spec_filename is not None: print( f"Writing continuum-subtracted spectrum: {spec_filename}...", end="", flush=True, ) header = "Wavelength (um) - Flux (W m-2 um-1) - Error (W m-2 um-1)" np.savetxt(spec_filename, self.spectrum, header=header) print(" [DONE]")
[docs] @typechecked def integrate_flux( self, wavel_int: Tuple[float, float], interp_kind: str = "linear", plot_filename: Optional[str] = "int_line.pdf", ) -> Tuple[np.float64, np.float64]: """ Method for calculating the integrated line flux and error. The spectrum is first interpolated to :math:`R = 100000` and then integrated across the specified wavelength range with the composite trapezoidal rule of ``np.trapz``. The error is estimated with a Monte Carlo approach from 1000 samples. The accretion luminosity is also calculated with the relation from `Aoyama et al. (2021) < abs/2021ApJ...917L..30A/abstract>`_ and `Marleau & Aoyama (2022) < 2022RNAAS...6..262M/abstract>`_ if the argument of ``hydrogen_line`` was set when creating an instance of the class. Parameters ---------- wavel_int : tuple(float, float) Wavelength range (um) across which the flux will be integrated. interp_kind : str Kind of interpolation kind for ``scipy.interpolate.interp1d`` (default: 'linear'). plot_filename : str, None Filename for the plot with the interpolated line profile. The plot is shown in an interface window if the argument is set to ``None``. Returns ------- float Integrated line flux (W m-2). float Flux error (W m-2). """ if plot_filename is None: print("Plotting integrated line...", end="", flush=True) else: print(f"Plotting integrated line: {plot_filename}...", end="", flush=True) n_samples = 1000 wavel_high_res = create_wavelengths(wavel_int, 1e5) # Creating plot plt.rcParams[""] = "serif" plt.rcParams["mathtext.fontset"] = "dejavuserif" plt.rcParams["axes.axisbelow"] = False plt.figure(figsize=(6, 3)) gs = mpl.gridspec.GridSpec(1, 1) gs.update(wspace=0, hspace=0, left=0, right=1, bottom=0, top=1) ax1 = plt.subplot(gs[0, 0]) ax2 = ax1.twiny() ax1.tick_params( axis="both", which="major", colors="black", labelcolor="black", direction="in", width=1, length=5, labelsize=12, top=False, bottom=True, left=True, right=True, ) ax1.tick_params( axis="both", which="minor", colors="black", labelcolor="black", direction="in", width=1, length=3, labelsize=12, top=False, bottom=True, left=True, right=True, ) ax2.tick_params( axis="both", which="major", colors="black", labelcolor="black", direction="in", width=1, length=5, labelsize=12, top=True, bottom=False, left=False, right=True, ) ax2.tick_params( axis="both", which="minor", colors="black", labelcolor="black", direction="in", width=1, length=3, labelsize=12, top=True, bottom=False, left=False, right=True, ) ax1.set_xlabel("Wavelength (µm)", fontsize=16) ax1.set_ylabel("Flux (W m$^{-2}$ µm$^{-1}$)", fontsize=16) ax2.set_xlabel("Velocity (km s$^{-1}$)", fontsize=16) ax1.get_xaxis().set_label_coords(0.5, -0.12) ax1.get_yaxis().set_label_coords(-0.1, 0.5) ax2.get_xaxis().set_label_coords(0.5, 1.12) ax1.plot( self.spectrum[:, 0], self.spectrum[:, 1], color="black", label=self.spec_name, ) ax2.plot(self.spec_vrad, self.spectrum[:, 1], ls="-", lw=0.0) flux_sample = np.zeros(n_samples) fwhm_sample = np.zeros(n_samples) mean_sample = np.zeros(n_samples) vrad_sample = np.zeros(n_samples) lum_sample = np.zeros(n_samples) for i in range(n_samples): # Sample fluxes from random errors spec_rand = np.random.normal(self.spectrum[:, 1], self.spectrum[:, 2]) # Interpolate sampled spectrum spec_interp = interp1d( self.spectrum[:, 0], spec_rand, kind=interp_kind, bounds_error=False ) # Resample to high-resolution wavelengths flux_rand = spec_interp(wavel_high_res) # Integrate line flux (W m-2) flux_sample[i] = np.trapz(flux_rand, x=wavel_high_res) # Line luminosity (Lsun) lum_sample[i] = ( 4.0 * np.pi * (1e3 * constants.PARSEC / self.parallax) ** 2 * flux_sample[i] ) lum_sample[i] /= constants.L_SUN # (Lsun) # Weighted (with flux) mean wavelength (um) mean_sample[i] = np.trapz( wavel_high_res * flux_rand, x=wavel_high_res ) / np.trapz(flux_rand, x=wavel_high_res) # Radial velocity (km s-1) vrad_sample[i] = ( 1e-3 * constants.LIGHT * (mean_sample[i] - self.lambda_rest) / self.lambda_rest ) # Find full width at half maximum spline = InterpolatedUnivariateSpline( wavel_high_res, flux_rand - np.max(flux_rand) / 2.0 ) root = spline.roots() diff = root - mean_sample[i] root1 = np.amax(diff[diff < 0.0]) root2 = np.amin(diff[diff > 0.0]) fwhm_sample[i] = 1e-3 * constants.LIGHT * (root2 - root1) / mean_sample[i] # Add 30 samples to the plot if i == 0: ax1.plot( wavel_high_res, flux_rand, ls="-", lw=0.5, color="gray", alpha=0.4, label="Random samples", ) elif i < 30: ax1.plot( wavel_high_res, flux_rand, ls="-", lw=0.5, color="gray", alpha=0.4 ) # Line flux from original, interpolated spectrum spec_interp = interp1d( self.spectrum[:, 0], self.spectrum[:, 1], kind=interp_kind, bounds_error=False, ) flux_high_res = spec_interp(wavel_high_res) line_flux = np.trapz(flux_high_res, x=wavel_high_res) ax1.plot( wavel_high_res, flux_high_res, color="tab:blue", label="High resolution" ) ax1.legend(loc="upper right", frameon=False, fontsize=12.0) print(" [DONE]") if plot_filename is None: else: plt.savefig(plot_filename, bbox_inches="tight") plt.clf() plt.close() wavel_mean, wavel_std = np.mean(mean_sample), np.std(mean_sample) print(f"Mean wavelength (nm) = {1e3*wavel_mean:.2f} +/- {1e3*wavel_std:.2f}") fwhm_mean, fwhm_std = np.mean(fwhm_sample), np.std(fwhm_sample) print(f"FWHM (km s-1) = {fwhm_mean:.2f} +/- {fwhm_std:.2f}") vrad_mean, vrad_std = np.mean(vrad_sample), np.std(vrad_sample) print(f"Radial velocity (km s-1) = {vrad_mean:.1f} +/- {vrad_std:.1f}") line_error = np.std(flux_sample) print(f"Line flux (W m-2) = {line_flux:.2e} +/- {line_error:.2e}") line_lum_mean = np.mean(lum_sample) line_lum_std = np.std(lum_sample) print(f"Line luminosity (Lsun) = {line_lum_mean:.2e} +/- {line_lum_std:.2e}") line_lum_mean = np.mean(np.log10(lum_sample)) line_lum_std = np.std(np.log10(lum_sample)) print( f"Line luminosity log10(L/Lsun) = {line_lum_mean:.2f} +/- {line_lum_std:.2f}" ) if self.hydrogen_line is not None: log_acc_sample = np.log10(self.accretion_luminosity(lum_sample)) log_acc_mean = np.mean(log_acc_sample) log_acc_std = np.std(log_acc_sample) print( "Inflating the uncertainty on the " "accretion luminosity by 0.3 dex\n to " "account for the model uncertainty " "(see Aoyama et al. 2021)..." ) log_acc_std = np.sqrt(log_acc_std**2 + 0.3**2) print( "Accretion luminosity log10(L/Lsun) = " f"{log_acc_mean:.2f} +/- {log_acc_std:.2f}" ) # acc_lum_new = np.log10(acc_lum_sample) + np.random.normal( # 0.0, acc_lum_std, size=acc_lum_sample.size # ) # # acc_lum_mean = np.mean(10.0**acc_lum_new) # acc_lum_std = np.std(10.0**acc_lum_new) # # print( # f"Accretion luminosity (Lsun): {acc_lum_mean:.2e} +/- {acc_lum_std:.2e}" # ) return line_flux, line_error
[docs] @typechecked def fit_gaussian( self, tag: str, min_num_live_points: float = 400, bounds: Optional[Dict[str, Union[Tuple[float, float]]]] = None, output: str = "ultranest/", plot_filename: Optional[str] = "line_fit.pdf", show_status: bool = True, double_gaussian: bool = False, ) -> None: """ Method for fitting a Gaussian profile to an emission line and using ``UltraNest`` for sampling the posterior distributions and estimating the evidence. Parameters ---------- tag : str Database tag where the posterior samples will be stored. min_num_live_points : int Minimum number of live points (see bounds : dict(str, tuple(float, float)), None The boundaries that are used for the uniform priors of the 3 Gaussian parameters (``gauss_amplitude``, ``gauss_mean``, and ``gauss_sigma``). Conservative prior boundaries will be estimated from the spectrum if the argument is set to ``None`` or if any of the required parameters is missing in the ``bounds`` dictionary. output : str Path that is used for the output files from ``UltraNest``. plot_filename : str Filename for the plot with the best-fit line profile. The plot is shown in an interface window if the argument is set to ``None``. show_status : bool Print information about the convergence. double_gaussian : bool Set to ``True`` for fitting a double instead of a single Gaussian. In that case, the ``bounds`` dictionary may also contain ``'gauss_amplitude_2'``, ``'gauss_mean_2'``, and ``'gauss_sigma_2'`` (otherwise conservative parameter boundaries are estimated from the data). Returns ------- NoneType None """ high_spec_res = 1e5 @typechecked def gaussian_function( amplitude: float, mean: float, sigma: float, wavel: np.ndarray ): return amplitude * np.exp(-0.5 * (wavel - mean) ** 2 / sigma**2) # Model parameters modelpar = ["gauss_amplitude", "gauss_mean", "gauss_sigma"] if double_gaussian: modelpar.append("gauss_amplitude_2") modelpar.append("gauss_mean_2") modelpar.append("gauss_sigma_2") # Create a dictionary with the cube indices of the parameters cube_index = {} for i, item in enumerate(modelpar): cube_index[item] = i # Check if all prior boundaries are present if bounds is None: bounds = {} if "gauss_amplitude" not in bounds: bounds["gauss_amplitude"] = (0.0, 2.0 * np.amax(self.spectrum[:, 1])) if "gauss_mean" not in bounds: bounds["gauss_mean"] = (self.spectrum[0, 0], self.spectrum[-1, 0]) if "gauss_sigma" not in bounds: bounds["gauss_sigma"] = (0.0, self.spectrum[-1, 0] - self.spectrum[0, 0]) if double_gaussian: if "gauss_amplitude_2" not in bounds: bounds["gauss_amplitude_2"] = (0.0, 2.0 * np.amax(self.spectrum[:, 1])) if "gauss_mean_2" not in bounds: bounds["gauss_mean_2"] = (self.spectrum[0, 0], self.spectrum[-1, 0]) if "gauss_sigma_2" not in bounds: bounds["gauss_sigma_2"] = ( 0.0, self.spectrum[-1, 0] - self.spectrum[0, 0], ) # Get the MPI rank of the process try: from mpi4py import MPI mpi_rank = MPI.COMM_WORLD.Get_rank() except ModuleNotFoundError: mpi_rank = 0 # Create the output folder if required if mpi_rank == 0 and not os.path.exists(output): os.mkdir(output) @typechecked def lnprior_ultranest(cube: np.ndarray) -> np.ndarray: """ Function for transforming the unit cube into the parameter cube. Parameters ---------- cube : np.ndarray Array with unit parameters. Returns ------- np.ndarray Array with physical parameters. """ params = cube.copy() for item in cube_index: # Uniform priors for all parameters params[cube_index[item]] = ( bounds[item][0] + (bounds[item][1] - bounds[item][0]) * params[cube_index[item]] ) return params @typechecked def lnlike_ultranest(params: np.ndarray) -> np.float64: """ Function for calculating the log-likelihood for the sampled parameter cube. Parameters ---------- params : np.ndarray Cube with physical parameters. Returns ------- float Log-likelihood. """ data_flux = self.spectrum[:, 1] data_var = self.spectrum[:, 2] ** 2 model_flux = gaussian_function( params[cube_index["gauss_amplitude"]], params[cube_index["gauss_mean"]], params[cube_index["gauss_sigma"]], self.spectrum[:, 0], ) if double_gaussian: model_flux += gaussian_function( params[cube_index["gauss_amplitude_2"]], params[cube_index["gauss_mean_2"]], params[cube_index["gauss_sigma_2"]], self.spectrum[:, 0], ) chi_sq = -0.5 * (data_flux - model_flux) ** 2 / data_var return np.nansum(chi_sq) sampler = ultranest.ReactiveNestedSampler( modelpar, lnlike_ultranest, transform=lnprior_ultranest, resume="subfolder", log_dir=output, ) result = show_status=show_status, viz_callback=False, min_num_live_points=min_num_live_points, ) # Log-evidence ln_z = result["logz"] ln_z_error = result["logzerr"] print(f"Log-evidence = {ln_z:.2f} +/- {ln_z_error:.2f}") # Best-fit parameters print("Best-fit parameters (mean +/- std):") for i, item in enumerate(modelpar): mean = np.mean(result["samples"][:, i]) std = np.std(result["samples"][:, i]) print(f" - {item} = {mean:.2e} +/- {std:.2e}") # Maximum likelihood sample print("Maximum likelihood sample:") max_lnlike = result["maximum_likelihood"]["logl"] print(f" - Log-likelihood = {max_lnlike:.2e}") for i, item in enumerate(result["maximum_likelihood"]["point"]): print(f" - {modelpar[i]} = {item:.2e}") # Posterior samples samples = result["samples"] # Best-fit model parameters model_param = { "gauss_amplitude": np.median(samples[:, 0]), "gauss_mean": np.median(samples[:, 1]), "gauss_sigma": np.median(samples[:, 2]), } if double_gaussian: model_param["gauss_amplitude_2"] = np.median(samples[:, 3]) model_param["gauss_mean_2"] = np.median(samples[:, 4]) model_param["gauss_sigma_2"] = np.median(samples[:, 5]) best_model = gaussian_spectrum( self.wavel_range, model_param, spec_res=high_spec_res, double_gaussian=double_gaussian, ) # Interpolate high-resolution continuum if self.continuum_check: cont_interp = interp1d( self.spectrum[:, 0], self.continuum_flux, bounds_error=False ) cont_high_res = cont_interp(best_model.wavelength) else: cont_high_res = np.full(best_model.wavelength.shape[0], 0.0) # Add FWHM velocity modelpar.append("gauss_fwhm") gauss_mean = samples[:, 1] # (um) gauss_fwhm = 2.0 * np.sqrt(2.0 * np.log(2.0)) * samples[:, 2] # (um) vel_fwhm = 1e-3 * constants.LIGHT * gauss_fwhm / gauss_mean # (km s-1) vel_fwhm = vel_fwhm[..., np.newaxis] samples = np.append(samples, vel_fwhm, axis=1) # Add line flux and luminosity print("Calculating line fluxes...", end="", flush=True) modelpar.append("line_flux") modelpar.append("log_line_lum") if self.hydrogen_line is not None: modelpar.append("log_acc_lum") line_flux = np.zeros(samples.shape[0]) line_lum = np.zeros(samples.shape[0]) if self.continuum_check: modelpar.append("line_eq_width") eq_width = np.zeros(samples.shape[0]) for i in range(samples.shape[0]): model_param = { "gauss_amplitude": samples[i, 0], "gauss_mean": samples[i, 1], "gauss_sigma": samples[i, 2], } if double_gaussian: model_param["gauss_amplitude_2"] = samples[i, 3] model_param["gauss_mean_2"] = samples[i, 4] model_param["gauss_sigma_2"] = samples[i, 5] model_box = gaussian_spectrum( self.wavel_range, model_param, spec_res=high_spec_res, double_gaussian=double_gaussian, ) line_flux[i] = np.trapz(model_box.flux, x=model_box.wavelength) # (W m-2) line_lum[i] = ( 4.0 * np.pi * (1e3 * constants.PARSEC / self.parallax) ** 2 * line_flux[i] ) # (W) line_lum[i] /= constants.L_SUN # (Lsun) if self.continuum_check: # Normalize the spectrum to the continuum spec_norm = (model_box.flux + cont_high_res) / cont_high_res # Check if the flux is NaN (due to interpolation errors at the spectrum edge) indices = ~np.isnan(spec_norm) eq_width[i] = np.trapz( 1.0 - spec_norm[indices], x=model_box.wavelength[indices] ) # (um) eq_width[i] *= 1e4 # (A) print(" [DONE]") line_flux = line_flux[..., np.newaxis] samples = np.append(samples, line_flux, axis=1) line_lum = line_lum[..., np.newaxis] samples = np.append(samples, np.log10(line_lum), axis=1) if self.hydrogen_line is not None: log_acc_lum = np.log10(self.accretion_luminosity(line_lum[:, 0])) print( "Inflating the uncertainty on the " "accretion luminosity by 0.3 dex\n to " "account for the model uncertainty " "(see Aoyama et al. 2021)..." ) log_acc_lum += np.random.normal(0.0, 0.3, size=log_acc_lum.size) log_acc_lum = log_acc_lum[..., np.newaxis] samples = np.append(samples, log_acc_lum, axis=1) if self.continuum_check: eq_width = eq_width[..., np.newaxis] samples = np.append(samples, eq_width, axis=1) if self.lambda_rest is not None: # Radial velocity (km s-1) if double_gaussian: # Weighted (with Gaussian amplitudes) mean of the central wavelength v_fit = ( samples[:, 0] * samples[:, 1] + samples[:, 3] * samples[:, 4] ) / (samples[:, 0] + samples[:, 3]) else: v_fit = samples[:, 1] v_rad = ( 1e-3 * constants.LIGHT * (v_fit - self.lambda_rest) / self.lambda_rest ) v_rad = v_rad[..., np.newaxis] modelpar.append("line_vrad") samples = np.append(samples, v_rad, axis=1) # Log-likelihood ln_prob = result["weighted_samples"]["logl"] # Log-evidence ln_z = result["logz"] ln_z_error = result["logzerr"] print(f"Log-evidence = {ln_z:.2f} +/- {ln_z_error:.2f}") # Get the MPI rank of the process try: from mpi4py import MPI mpi_rank = MPI.COMM_WORLD.Get_rank() except ModuleNotFoundError: mpi_rank = 0 # Dictionary with attributes that will be stored attr_dict = { "spec_type": "model", "spec_name": "gaussian", "ln_evidence": (ln_z, ln_z_error), "parallax": self.parallax, } # Add samples to the database if mpi_rank == 0: # Writing the samples to the database is only # possible when using a single process from import Database species_db = Database() species_db.add_samples( sampler="ultranest", samples=samples, ln_prob=ln_prob, tag=tag, modelpar=modelpar, attr_dict=attr_dict, ) # Create plot if plot_filename is None: print("Plotting best-fit line profile...", end="", flush=True) else: print( f"Plotting best-fit line profile: {plot_filename}...", end="", flush=True, ) plt.rcParams[""] = "serif" plt.rcParams["mathtext.fontset"] = "dejavuserif" plt.rcParams["axes.axisbelow"] = False plt.figure(figsize=(6, 6)) gs = mpl.gridspec.GridSpec(2, 1) gs.update(wspace=0, hspace=0.1, left=0, right=1, bottom=0, top=1) ax1 = plt.subplot(gs[0, 0]) ax2 = plt.subplot(gs[1, 0]) ax3 = ax1.twiny() ax4 = ax2.twiny() ax1.tick_params( axis="both", which="major", colors="black", labelcolor="black", direction="in", width=1, length=5, labelsize=12, top=False, bottom=True, left=True, right=True, labelbottom=False, ) ax1.tick_params( axis="both", which="minor", colors="black", labelcolor="black", direction="in", width=1, length=3, labelsize=12, top=False, bottom=True, left=True, right=True, labelbottom=False, ) ax2.tick_params( axis="both", which="major", colors="black", labelcolor="black", direction="in", width=1, length=5, labelsize=12, top=False, bottom=True, left=True, right=True, ) ax2.tick_params( axis="both", which="minor", colors="black", labelcolor="black", direction="in", width=1, length=3, labelsize=12, top=False, bottom=True, left=True, right=True, ) ax3.tick_params( axis="both", which="major", colors="black", labelcolor="black", direction="in", width=1, length=5, labelsize=12, top=True, bottom=False, left=True, right=True, ) ax3.tick_params( axis="both", which="minor", colors="black", labelcolor="black", direction="in", width=1, length=3, labelsize=12, top=True, bottom=False, left=True, right=True, ) ax4.tick_params( axis="both", which="major", colors="black", labelcolor="black", direction="in", width=1, length=5, labelsize=12, top=True, bottom=False, left=True, right=True, labeltop=False, ) ax4.tick_params( axis="both", which="minor", colors="black", labelcolor="black", direction="in", width=1, length=3, labelsize=12, top=True, bottom=False, left=True, right=True, labeltop=False, ) ax1.set_ylabel("Flux (W m$^{-2}$ µm$^{-1}$)", fontsize=16) ax2.set_xlabel("Wavelength (µm)", fontsize=16) ax2.set_ylabel("Flux (W m$^{-2}$ µm$^{-1}$)", fontsize=16) ax3.set_xlabel("Velocity (km s$^{-1}$)", fontsize=16) ax1.get_yaxis().set_label_coords(-0.1, 0.5) ax2.get_xaxis().set_label_coords(0.5, -0.1) ax2.get_yaxis().set_label_coords(-0.1, 0.5) ax3.get_xaxis().set_label_coords(0.5, 1.12) ax1.plot( self.spectrum[:, 0], self.spectrum[:, 1] + self.continuum_flux, color="black", label=self.spec_name, ) ax1.plot( best_model.wavelength, best_model.flux + cont_high_res, color="tab:blue", label="Best-fit model (continuum + line)", ) ax2.plot( self.spectrum[:, 0], self.spectrum[:, 1], color="black", label=self.spec_name, ) ax2.plot( best_model.wavelength, best_model.flux, color="tab:blue", label="Best-fit line profile", ) ax3.plot( self.spec_vrad, self.spectrum[:, 1] + self.continuum_flux, ls="-", lw=0.0 ) ax4.plot(self.spec_vrad, self.spectrum[:, 1], ls="-", lw=0.0) ax1.legend(loc="upper left", frameon=False, fontsize=12.0) ax2.legend(loc="upper left", frameon=False, fontsize=12.0) print(" [DONE]") if plot_filename is None: else: plt.savefig(plot_filename, bbox_inches="tight") plt.clf() plt.close()
[docs] @typechecked def accretion_luminosity( self, line_lum: Union[float, np.ndarray], ) -> Union[float, np.ndarray]: """ Method for calculating the accretion luminosity from the (hydrogen) line luminosity with the relation from `Aoyama et al. (2021) < 2021ApJ...917L..30A/abstract>`_ and extrapolated values from `Marleau & Aoyama (2022) <https://ui.adsabs.harvard. edu/abs/2022RNAAS...6..262M/abstract>`_. Parameters ---------- line_lum : float, np.array Line luminosity (:math:`L_\\odot`) or array with line luminosities. Returns ------- float, np.ndarray Accretion luminosity (:math:`L_\\odot`) or array with accretion luminosities. """ with h5py.File(self.database, "r") as h5_file: line_names = np.array(h5_file["accretion/hydrogen_lines"], dtype=str) coefficients = np.array(h5_file["accretion/coefficients"]) line_idx = np.argwhere(line_names == self.hydrogen_line)[0][0] a_coeff, b_coeff = coefficients[line_idx,] # Equation C1 in Aoymama et al. (2021) log_acc_lum = a_coeff * np.log10(line_lum) + b_coeff return 10.0**log_acc_lum