Source code for species.util.dust_util

"""
Utility functions for dust cross sections and extinction.
"""

import os
import configparser

from typing import Dict, List, Tuple, Union

import h5py
import numpy as np

from typeguard import typechecked
from scipy.interpolate import interp1d, RegularGridInterpolator
from scipy.stats import lognorm

from species.read.read_filter import ReadFilter
from species.data.misc_data.dust_data import add_cross_sections, add_optical_constants


[docs] @typechecked def check_dust_database() -> str: """ Function to check if the dust data is present in the database and add the data if needed. Returns ------- str Path of the HDF5 database. """ config_file = os.path.join(os.getcwd(), "species_config.ini") config = configparser.ConfigParser() config.read(config_file) database_path = config["species"]["database"] data_folder = config["species"]["data_folder"] with h5py.File(database_path, "a") as hdf5_file: if "dust" not in hdf5_file: add_optical_constants(data_folder, hdf5_file) add_cross_sections(data_folder, hdf5_file) return database_path
[docs] @typechecked def log_normal_distribution( radius_g: float, sigma_g: float, n_bins: int ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Function for returning a log-normal size distribution. See Eq. 9 in Ackerman & Marley (2001). Parameters ---------- radius_g : float Mean geometric radius (um). sigma_g : float Geometric standard deviation (dimensionless). n_bins : int Number of logarithmically-spaced radius bins. Returns ------- np.ndarray Number of grains in each radius bin, normalized to a total of 1 grain. np.ndarray Widths of the radius bins (um). np.ndarray Grain radii (um). """ if sigma_g == 1.0: # The log-normal distribution is equal to a delta # function with sigma_g = 1 radii = np.array([radius_g]) r_width = np.array([np.nan]) dn_grains = np.array([1.0]) else: # Get the radius interval which contains 99.999% # of the distribution interval = lognorm.interval( 1.0 - 1e-5, np.log(sigma_g), loc=0.0, scale=radius_g ) # Create bin boundaries (um), so +1 because there # are n_bins+1 bin boundaries r_bins = np.logspace(np.log10(interval[0]), np.log10(interval[1]), n_bins + 1) # Width of the radius bins (um) r_width = np.diff(r_bins) # Grain radii (um) at which the size distribution is sampled radii = (r_bins[1:] + r_bins[:-1]) / 2.0 # Number of grains per radius bin width, normalized to an # integrated value of 1 grain, that is, # np.sum(dn_dr*r_width) = 1 # The log-normal distribution from Ackerman & Marley 2001 # gives the same result as scipy.stats.lognorm.pdf with # s = log(sigma_g) and scale=radius_g dn_dr = lognorm.pdf(radii, s=np.log(sigma_g), loc=0.0, scale=radius_g) # Number of grains for each radius bin dn_grains = dn_dr * r_width return dn_grains, r_width, radii
[docs] @typechecked def power_law_distribution( exponent: float, radius_min: float, radius_max: float, n_bins: int ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Function for returning a power-law size distribution. Parameters ---------- exponent : float Exponent of the power-law size distribution, dn/dr = r**exponent. radius_min : float Minimum grain radius (um). radius_max : float Maximum grain radius (um). n_bins : int Number of logarithmically-spaced radius bins. Returns ------- np.ndarray Number of grains in each radius bin, normalized to a total of 1 grain. np.ndarray Widths of the radius bins (um). np.ndarray Grain radii (um). """ # Create bin boundaries (um), so +1 because there # are n_sizes+1 bin boundaries (um) r_bins = np.logspace(np.log10(radius_min), np.log10(radius_max), n_bins + 1) # Width of the radius bins (um) r_width = np.diff(r_bins) # Grains radii (um) at which the size distribution is sampled radii = (r_bins[1:] + r_bins[:-1]) / 2.0 # Number of grains per radius bins size dn_dr = radii**exponent # Normalize the size distribution to 1 grain dn_dr /= np.sum(r_width * dn_dr) # Number of grains for each radius bin dn_grains = dn_dr * r_width return dn_grains, r_width, radii
[docs] @typechecked def dust_cross_section( dn_grains: np.ndarray, radii: np.ndarray, wavelength: float, n_index: float, k_index: float, ) -> np.float64: """ Function for calculating the extinction cross section for a size distribution of dust grains. Parameters ---------- dn_grains : np.ndarray Number of grains in each radius bin, normalized to a total of 1 grain. radii : np.ndarray Grain radii (um). wavelength : float Wavelength (um). n_index : float Real part of the refractive index. k_index : float Imaginary part of the refractive index. Returns ------- float Extinction cross section (um2) """ # Importing here because it causes an error with Python 3.10 import PyMieScatt c_ext = 0.0 for i, item in enumerate(radii): # From the PyMieScatt documentation: When using PyMieScatt, # pay close attention to the units of the your inputs and # outputs. Wavelength and particle diameters are always in # nanometers, efficiencies are unitless, cross-sections are # in nm2, coefficients are in Mm-1, and size distribution # concentration is always in cm-3. mie = PyMieScatt.MieQ( complex(n_index, k_index), wavelength * 1e3, # (nm) 2.0 * item * 1e3, # diameter (nm) asDict=True, asCrossSection=False, ) if "Qext" in mie: c_ext += np.pi * item**2 * mie["Qext"] * dn_grains[i] # (um2) else: raise ValueError("Qext not found in the PyMieScatt dictionary.") return c_ext # (um2)
[docs] @typechecked def calc_reddening( filters_color: Tuple[str, str], extinction: Tuple[str, float], composition: str = "MgSiO3", structure: str = "crystalline", radius_g: float = 1.0, ) -> Tuple[float, float]: """ Function for calculating the reddening of a color given the extinction for a given filter. A log-normal size distribution with a geometric standard deviation of 2 is used as parametrization for the grain sizes (Ackerman & Marley 2001). Parameters ---------- filters_color : tuple(str, str) Filter names for which the extinction is calculated. extinction : str Filter name and extinction (mag). composition : str Dust composition ('MgSiO3' or 'Fe'). structure : str Grain structure ('crystalline' or 'amorphous'). radius_g : float Geometric radius of the grain size distribution (um). Returns ------- float Extinction (mag) for ``filters_color[0]``. float Extinction (mag) for ``filters_color[1]``. """ database_path = check_dust_database() h5_file = h5py.File(database_path, "r") filters = [extinction[0], filters_color[0], filters_color[1]] dn_grains, _, radii = log_normal_distribution(radius_g, 2.0, 100) c_ext = {} for item in filters: h5_file.close() read_filt = ReadFilter(item) filter_wavel = read_filt.mean_wavelength() h5_file = h5py.File(database_path, "r") if composition == "MgSiO3" and structure == "crystalline": for i in range(3): data = h5_file[f"dust/mgsio3/crystalline/axis_{i+1}"] wavel_index = (np.abs(data[:, 0] - filter_wavel)).argmin() # Average cross section of the three axes if i == 0: c_ext[item] = ( dust_cross_section( dn_grains, radii, data[wavel_index, 0], data[wavel_index, 1], data[wavel_index, 2], ) / 3.0 ) else: c_ext[item] += ( dust_cross_section( dn_grains, radii, data[wavel_index, 0], data[wavel_index, 1], data[wavel_index, 2], ) / 3.0 ) else: if composition == "MgSiO3" and structure == "amorphous": data = h5_file["dust/mgsio3/amorphous/"] elif composition == "Fe" and structure == "crystalline": data = h5_file["dust/fe/crystalline/"] elif composition == "Fe" and structure == "amorphous": data = h5_file["dust/fe/amorphous/"] wavel_index = (np.abs(data[:, 0] - filter_wavel)).argmin() c_ext[item] += ( dust_cross_section( dn_grains, radii, data[wavel_index, 0], data[wavel_index, 1], data[wavel_index, 2], ) / 3.0 ) h5_file.close() return ( extinction[1] * c_ext[filters_color[0]] / c_ext[extinction[0]], extinction[1] * c_ext[filters_color[1]] / c_ext[extinction[0]], )
[docs] @typechecked def interp_lognorm( inc_phot: List[str], inc_spec: List[str], ) -> Tuple[ Dict[str, Union[RegularGridInterpolator, List[RegularGridInterpolator]]], np.ndarray, np.ndarray, ]: """ Function for interpolating the log-normal dust cross sections for each filter and spectrum. Parameters ---------- inc_phot : list(str) List with filter names. Not used if the list is empty. inc_spec : list(str) List with the spectrum names (as stored in the database with :func:`~species.data.database.Database.add_object`). Not used if the list is empty. Returns ------- dict Dictionary with the extinction cross section for each filter and spectrum. np.ndarray Grid points of the geometric mean radius. np.ndarray Grid points of the geometric standard deviation. """ database_path = check_dust_database() with h5py.File(database_path, "r") as h5_file: cross_section = np.asarray( h5_file["dust/lognorm/mgsio3/crystalline/cross_section"] ) wavelength = np.asarray(h5_file["dust/lognorm/mgsio3/crystalline/wavelength"]) radius_g = np.asarray(h5_file["dust/lognorm/mgsio3/crystalline/radius_g"]) sigma_g = np.asarray(h5_file["dust/lognorm/mgsio3/crystalline/sigma_g"]) print("Grid boundaries of the dust opacities:") print(f" - Wavelength (um) = {wavelength[0]:.2f} - {wavelength[-1]:.2f}") print(f" - Geometric mean radius (um) = {radius_g[0]:.2e} - {radius_g[-1]:.2e}") print(f" - Geometric standard deviation = {sigma_g[0]:.2f} - {sigma_g[-1]:.2f}") inc_phot.append("Generic/Bessell.V") cross_sections = {} for phot_item in inc_phot: read_filt = ReadFilter(phot_item) filt_trans = read_filt.get_filter() cross_phot = np.zeros((radius_g.shape[0], sigma_g.shape[0])) for i in range(radius_g.shape[0]): for j in range(sigma_g.shape[0]): cross_interp = interp1d( wavelength, cross_section[:, i, j], kind="linear", bounds_error=True ) cross_tmp = cross_interp(filt_trans[:, 0]) integral1 = np.trapz(filt_trans[:, 1] * cross_tmp, x=filt_trans[:, 0]) integral2 = np.trapz(filt_trans[:, 1], x=filt_trans[:, 0]) # Filter-weighted average of the extinction cross section cross_phot[i, j] = integral1 / integral2 cross_sections[phot_item] = RegularGridInterpolator( (radius_g, sigma_g), cross_phot, method="linear", bounds_error=True ) print("Interpolating dust opacities...", end="") if len(inc_spec) > 0: cross_tmp = RegularGridInterpolator( (wavelength, radius_g, sigma_g), cross_section, method="linear", bounds_error=True, ) cross_sections["spectrum"] = cross_tmp print(" [DONE]") return cross_sections, radius_g, sigma_g
[docs] @typechecked def interp_powerlaw( inc_phot: List[str], inc_spec: List[str], ) -> Tuple[ Dict[str, Union[RegularGridInterpolator, List[RegularGridInterpolator]]], np.ndarray, np.ndarray, ]: """ Function for interpolating the power-law dust cross sections for each filter and spectrum. Parameters ---------- inc_phot : list(str) List with filter names. Not used if the list is empty. inc_spec : list(str) List with the spectrum names (as stored in the database with :func:`~species.data.database.Database.add_object`). Not used if the list is empty. Returns ------- dict Dictionary with the extinction cross section for each filter and spectrum. np.ndarray Grid points of the maximum radius. np.ndarray Grid points of the power-law exponent. """ database_path = check_dust_database() with h5py.File(database_path, "r") as h5_file: cross_section = np.asarray( h5_file["dust/powerlaw/mgsio3/crystalline/cross_section"] ) wavelength = np.asarray(h5_file["dust/powerlaw/mgsio3/crystalline/wavelength"]) radius_max = np.asarray(h5_file["dust/powerlaw/mgsio3/crystalline/radius_max"]) exponent = np.asarray(h5_file["dust/powerlaw/mgsio3/crystalline/exponent"]) print("Grid boundaries of the dust opacities:") print(f" - Wavelength (um) = {wavelength[0]:.2f} - {wavelength[-1]:.2f}") print(f" - Maximum radius (um) = {radius_max[0]:.2e} - {radius_max[-1]:.2e}") print(f" - Power-law exponent = {exponent[0]:.2f} - {exponent[-1]:.2f}") inc_phot.append("Generic/Bessell.V") cross_sections = {} for phot_item in inc_phot: read_filt = ReadFilter(phot_item) filt_trans = read_filt.get_filter() cross_phot = np.zeros((radius_max.shape[0], exponent.shape[0])) for i in range(radius_max.shape[0]): for j in range(exponent.shape[0]): cross_interp = interp1d( wavelength, cross_section[:, i, j], kind="linear", bounds_error=True ) cross_tmp = cross_interp(filt_trans[:, 0]) integral1 = np.trapz(filt_trans[:, 1] * cross_tmp, x=filt_trans[:, 0]) integral2 = np.trapz(filt_trans[:, 1], x=filt_trans[:, 0]) # Filter-weighted average of the extinction cross section cross_phot[i, j] = integral1 / integral2 cross_sections[phot_item] = RegularGridInterpolator( (radius_max, exponent), cross_phot, method="linear", bounds_error=True ) print("Interpolating dust opacities...", end="") if len(inc_spec) > 0: cross_tmp = RegularGridInterpolator( (wavelength, radius_max, exponent), cross_section, method="linear", bounds_error=True, ) cross_sections["spectrum"] = cross_tmp print(" [DONE]") return cross_sections, radius_max, exponent
[docs] @typechecked def ism_extinction( av_mag: float, rv_red: float, wavelengths: Union[np.ndarray, List[float], float] ) -> np.ndarray: """ Function for calculating the optical and IR extinction with the empirical relation from `Cardelli et al. (1989) <https://ui.adsabs.harvard.edu/abs/1989ApJ...345..245C/abstract>`_. Parameters ---------- av_mag : float Extinction (mag) in the $V$ band. rv_red : float Reddening in the $V$ band, ``R_V = A_V / E(B-V)``. wavelengths : np.ndarray, list(float), float Array or list with the wavelengths (um) for which the extinction is calculated. It is also possible to provide a single value as float. Returns ------- np.ndarray Extinction (mag) at ``wavelengths``. """ if isinstance(wavelengths, float): wavelengths = np.array([wavelengths]) elif isinstance(wavelengths, list): wavelengths = np.array(wavelengths) x_wavel = 1.0 / wavelengths y_wavel = x_wavel - 1.82 a_coeff = np.zeros(x_wavel.size) b_coeff = np.zeros(x_wavel.size) indices = np.where(x_wavel < 1.1)[0] if len(indices) > 0: a_coeff[indices] = 0.574 * x_wavel[indices] ** 1.61 b_coeff[indices] = -0.527 * x_wavel[indices] ** 1.61 indices = np.where(x_wavel >= 1.1)[0] if len(indices) > 0: a_coeff[indices] = ( 1.0 + 0.17699 * y_wavel[indices] - 0.50447 * y_wavel[indices] ** 2 - 0.02427 * y_wavel[indices] ** 3 + 0.72085 * y_wavel[indices] ** 4 + 0.01979 * y_wavel[indices] ** 5 - 0.77530 * y_wavel[indices] ** 6 + 0.32999 * y_wavel[indices] ** 7 ) b_coeff[indices] = ( 1.41338 * y_wavel[indices] + 2.28305 * y_wavel[indices] ** 2 + 1.07233 * y_wavel[indices] ** 3 - 5.38434 * y_wavel[indices] ** 4 - 0.62251 * y_wavel[indices] ** 5 + 5.30260 * y_wavel[indices] ** 6 - 2.09002 * y_wavel[indices] ** 7 ) return av_mag * (a_coeff + b_coeff / rv_red)
[docs] @typechecked def apply_ism_ext( wavelengths: np.ndarray, flux: np.ndarray, v_band_ext: float, v_band_red: float ) -> np.ndarray: """ Function for applying ISM extinction to a spectrum. wavelengths : np.ndarray Wavelengths (um) of the spectrum. flux : np.ndarray Fluxes (W m-2 um-1) of the spectrum. v_band_ext : float Extinction (mag) in the $V$ band. v_band_red : float Reddening in the $V$ band. Returns ------- np.ndarray Fluxes (W m-2 um-1) with the extinction applied. """ ext_mag = ism_extinction(v_band_ext, v_band_red, wavelengths) return flux * 10.0 ** (-0.4 * ext_mag)
[docs] @typechecked def convert_to_av( filter_name: str, filter_ext: float, v_band_red: float = 3.1 ) -> float: """ Function for converting the extinction in any filter from the `SVO Filter Profile Service <http://svo2.cab.inta-csic. es/svo/theory/fps/>`_ to a visual extinction, :math:`A_V`. This is done by simply scaling the extinction so at the mean wavelength of the filter. filter_name : str Filter name for which the extinction will be converted to a visual extinction (i.e. :math:`A_V`). filter_ext : float Extinction (mag) for the ``filter_name``. v_band_red : float Reddening in the $V$ band. Returns ------- float Visual extinction (i.e. :math:`A_V`) for which the extinction in the ``filter_name`` band is ``filter_ext``. """ av_test = 1.0 # Mean wavelength for filter_name read_filt = ReadFilter(filter_name) filt_wavel = np.array([read_filt.mean_wavelength()]) # Calculate test extinction for A_V = 1.0 # at mean wavelength of filter_name ext_ref = ism_extinction(av_test, v_band_red, filt_wavel)[0] # Scaling for A_V = 1.0 to the A_V for which # extinction of filter_name is filter_ext scaling = filter_ext / ext_ref # Should be the same as filter_ext # filter_ext_test = ism_extinction(scaling * av_test, v_band_red, filt_wavel)[0] return scaling * av_test