"""
Utility functions for data processing.
"""
import warnings
import numpy as np
from scipy.interpolate import griddata
[docs]def update_sptype(sptypes):
"""
Function to update a list with spectral types to two characters (e.g., M8, L3, or T1).
Parameters
----------
sptypes : np.ndarray
Input spectral types.
Returns
-------
np.ndarray
Updated spectral types.
"""
sptype_list = ['O', 'B', 'A', 'F', 'G', 'K', 'M', 'L', 'T', 'Y']
for i, spt_item in enumerate(sptypes):
if spt_item == 'None':
pass
elif spt_item == 'null':
sptypes[i] = 'None'
else:
for list_item in sptype_list:
try:
sp_index = spt_item.index(list_item)
sptypes[i] = spt_item[sp_index:sp_index+2]
except ValueError:
pass
return sptypes
[docs]def update_filter(filter_in):
"""
Function to update a filter ID from the Vizier Photometry viewer VOTable to the filter ID from
the SVO Filter Profile Service.
Parameters
----------
filter_in : str
Filter ID in the format of the Vizier Photometry viewer.
Returns
-------
str
Filter ID in the format of the SVO Filter Profile Service.
"""
if filter_in[0:5] == b'2MASS':
filter_out = str(b'2MASS/2MASS.'+filter_in[6:])
elif filter_in[0:4] == b'WISE':
filter_out = str(b'WISE/WISE.'+filter_in[5:])
elif filter_in[0:10] == b'GAIA/GAIA2':
filter_out = str(filter_in[0:9]+b'0'+filter_in[10:])
else:
filter_out = None
return filter_out
[docs]def sort_data(param_teff,
param_logg,
param_feh,
param_co,
param_fsed,
wavelength,
flux):
"""
Parameters
----------
param_teff : np.ndarray
Array with the effective temperature (K) of each spectrum.
param_logg : np.ndarray
Array with the log10 surface gravity (cgs) of each spectrum.
param_feh : np.ndarray, None
Array with the metallicity of each spectrum. Not used if set to None.
param_co : np.ndarray, None
Array with the carbon-to-oxygen ratio of each spectrum. Not used if set to None.
param_fsed : np.ndarray, None
Array with the sedimentation parameter of each spectrum. Not used if set to None.
wavelength : np.ndarray
Array with the wavelengths (um).
flux : np.ndarray
Array with the spectra (n_spectra, n_wavelengths).
Returns
-------
list(np.ndarray, )
List with the unique values of the atmosphere parameters (each in a separate array), an
array with the wavelengths, and a multidimensional array with the sorted spectra.
"""
teff_unique = np.unique(param_teff)
logg_unique = np.unique(param_logg)
spec_shape = [teff_unique.shape[0], logg_unique.shape[0]]
if param_feh is not None:
feh_unique = np.unique(param_feh)
spec_shape.append(feh_unique.shape[0])
if param_co is not None:
co_unique = np.unique(param_co)
spec_shape.append(co_unique.shape[0])
if param_fsed is not None:
fsed_unique = np.unique(param_fsed)
spec_shape.append(fsed_unique.shape[0])
spec_shape.append(wavelength.shape[0])
spectrum = np.zeros(spec_shape)
for i in range(param_teff.shape[0]):
# The parameter order: Teff, log(g), [Fe/H], C/O, f_sed
# Not all parameters have to be included but the order matters
index_teff = np.argwhere(teff_unique == param_teff[i])[0][0]
index_logg = np.argwhere(logg_unique == param_logg[i])[0][0]
spec_select = [index_teff, index_logg]
if param_feh is not None:
index_feh = np.argwhere(feh_unique == param_feh[i])[0][0]
spec_select.append(index_feh)
if param_co is not None:
index_co = np.argwhere(co_unique == param_co[i])[0][0]
spec_select.append(index_co)
if param_fsed is not None:
index_fsed = np.argwhere(fsed_unique == param_fsed[i])[0][0]
spec_select.append(index_fsed)
spec_select.append(...)
spectrum[tuple(spec_select)] = flux[i]
sorted_data = [teff_unique, logg_unique]
if param_feh is not None:
sorted_data.append(feh_unique)
if param_co is not None:
sorted_data.append(co_unique)
if param_fsed is not None:
sorted_data.append(fsed_unique)
sorted_data.append(wavelength)
sorted_data.append(spectrum)
return sorted_data
[docs]def write_data(model,
parameters,
database,
data_sorted):
"""
Function for writing the model spectra and parameters to the database.
Parameters
----------
model : str
Atmosphere model.
parameters : list(str, )
Model parameters.
database: h5py._hl.files.File
Database.
data_sorted : list(np.ndarray, )
Sorted model data with the parameter values, wavelength points (um), and flux
densities (W m-2 um-1).
Returns
-------
NoneType
None
"""
if f'models/{model}' in database:
del database[f'models/{model}']
dset = database.create_group(f'models/{model}')
dset.attrs['n_param'] = len(parameters)
for i, item in enumerate(parameters):
dset.attrs[f'parameter{i}'] = item
database.create_dataset(f'models/{model}/{item}',
data=data_sorted[i])
database.create_dataset(f'models/{model}/wavelength',
data=data_sorted[len(parameters)])
database.create_dataset(f'models/{model}/flux',
data=data_sorted[len(parameters)+1])
[docs]def add_missing(model,
parameters,
database):
"""
Function for adding missing grid points by linearly interpolating the available grid points.
Parameters
----------
model : str
Atmosphere model.
parameters : list(str, )
Model parameters.
database : h5py._hl.files.File
Database.
Returns
-------
NoneType
None
"""
print(f'Number of grid points per parameter:')
grid_shape = []
param_data = []
for i, item in enumerate(parameters):
grid_shape.append(database[f'models/{model}/{item}'].shape[0])
param_data.append(np.asarray(database[f'models/{model}/{item}']))
print(f' - {item}: {grid_shape[i]}')
teff = np.asarray(database[f'models/{model}/teff'])
wavelength = np.asarray(database[f'models/{model}/wavelength'])
flux = np.asarray(database[f'models/{model}/flux'])
count_total = 0
count_missing = 0
print('Fixing missing grid points:')
if len(parameters) == 2:
find_missing = np.zeros(grid_shape, dtype=bool)
values = []
points = [[], []]
new_points = [[], []]
new_flux = np.zeros((grid_shape[0], grid_shape[1], wavelength.shape[0]))
for i in range(grid_shape[0]):
for j in range(grid_shape[1]):
if np.count_nonzero(flux[i, j, ...]) == 0:
find_missing[i, j] = True
else:
points[0].append(param_data[0][i])
points[1].append(param_data[1][j])
values.append(flux[i, j, ...])
new_points[0].append(param_data[0][i])
new_points[1].append(param_data[1][j])
count_total += 1
values = np.asarray(values)
points = np.asarray(points)
new_points = np.asarray(new_points)
test = griddata(points.T, values, new_points.T, method='linear')
for item in test:
if np.isnan(item[0]):
count_missing += 1
count_interp = np.sum(find_missing) - count_missing
count = 0
for i in range(grid_shape[0]):
for j in range(grid_shape[1]):
new_flux[i, j, :] = test[count, :]
count += 1
elif len(parameters) == 3:
find_missing = np.zeros(grid_shape, dtype=bool)
values = []
points = [[], [], []]
new_points = [[], [], []]
new_flux = np.zeros((grid_shape[0], grid_shape[1], grid_shape[2], wavelength.shape[0]))
for i in range(grid_shape[0]):
for j in range(grid_shape[1]):
for k in range(grid_shape[2]):
if np.count_nonzero(flux[i, j, k, ...]) == 0:
find_missing[i, j, k] = True
else:
points[0].append(param_data[0][i])
points[1].append(param_data[1][j])
points[2].append(param_data[2][k])
values.append(flux[i, j, k, ...])
new_points[0].append(param_data[0][i])
new_points[1].append(param_data[1][j])
new_points[2].append(param_data[2][k])
count_total += 1
values = np.asarray(values)
points = np.asarray(points)
new_points = np.asarray(new_points)
test = griddata(points.T, values, new_points.T, method='linear')
for item in test:
if np.isnan(item[0]):
count_missing += 1
count_interp = np.sum(find_missing) - count_missing
count = 0
for i in range(grid_shape[0]):
for j in range(grid_shape[1]):
for k in range(grid_shape[2]):
new_flux[i, j, k, :] = test[count, :]
count += 1
elif len(parameters) == 4:
find_missing = np.zeros(grid_shape, dtype=bool)
values = []
points = [[], [], [], []]
new_points = [[], [], [], []]
new_flux = np.zeros((grid_shape[0], grid_shape[1], grid_shape[2], grid_shape[3], wavelength.shape[0]))
for i in range(grid_shape[0]):
for j in range(grid_shape[1]):
for k in range(grid_shape[2]):
for m in range(grid_shape[3]):
if np.count_nonzero(flux[i, j, k, m, ...]) == 0:
find_missing[i, j, k, m] = True
else:
points[0].append(param_data[0][i])
points[1].append(param_data[1][j])
points[2].append(param_data[2][k])
points[3].append(param_data[3][m])
values.append(flux[i, j, k, m, ...])
new_points[0].append(param_data[0][i])
new_points[1].append(param_data[1][j])
new_points[2].append(param_data[2][k])
new_points[3].append(param_data[3][m])
count_total += 1
values = np.asarray(values)
points = np.asarray(points)
new_points = np.asarray(new_points)
test = griddata(points.T, values, new_points.T, method='linear')
for item in test:
if np.isnan(item[0]):
count_missing += 1
count_interp = np.sum(find_missing) - count_missing
count = 0
for i in range(grid_shape[0]):
for j in range(grid_shape[1]):
for k in range(grid_shape[2]):
for m in range(grid_shape[3]):
new_flux[i, j, k, m, :] = test[count, :]
count += 1
elif len(parameters) == 5:
find_missing = np.zeros(grid_shape, dtype=bool)
values = []
points = [[], [], [], [], []]
new_points = [[], [], [], [], []]
new_flux = np.zeros((grid_shape[0], grid_shape[1], grid_shape[2], grid_shape[3], grid_shape[4], wavelength.shape[0]))
for i in range(grid_shape[0]):
for j in range(grid_shape[1]):
for k in range(grid_shape[2]):
for m in range(grid_shape[3]):
for n in range(grid_shape[4]):
if np.count_nonzero(flux[i, j, k, m, n, ...]) == 0:
find_missing[i, j, k, m, n] = True
else:
points[0].append(param_data[0][i])
points[1].append(param_data[1][j])
points[2].append(param_data[2][k])
points[3].append(param_data[3][m])
points[4].append(param_data[4][n])
values.append(flux[i, j, k, m, n, ...])
new_points[0].append(param_data[0][i])
new_points[1].append(param_data[1][j])
new_points[2].append(param_data[2][k])
new_points[3].append(param_data[3][m])
new_points[4].append(param_data[4][n])
count_total += 1
values = np.asarray(values)
points = np.asarray(points)
new_points = np.asarray(new_points)
test = griddata(points.T, values, new_points.T, method='linear')
for item in test:
if np.isnan(item[0]):
count_missing += 1
count_interp = np.sum(find_missing) - count_missing
count = 0
for i in range(grid_shape[0]):
for j in range(grid_shape[1]):
for k in range(grid_shape[2]):
for m in range(grid_shape[3]):
for n in range(grid_shape[4]):
new_flux[i, j, k, m, n, :] = test[count, :]
count += 1
else:
raise ValueError('The add_missing function is currently not compatible with more than 5 '
'parameters.')
# if len(parameters) == 4:
# check_constant = np.zeros(grid_shape, dtype=bool)
#
# for z in range(5):
# for i in range(grid_shape[0]):
# for j in range(grid_shape[1]):
# for k in range(grid_shape[2]):
# for m in range(grid_shape[3]):
# if z == 0:
# count_total += 1
#
# index = (i, j, k, m, ...)
#
# if np.count_nonzero(flux[index]) == 0:
# for dim_index in range(len(grid_shape)):
#
# if index[dim_index] > 0 and index[dim_index] < grid_shape[dim_index]-1:
# index_low = [i, j, k, m, ...]
# index_up = [i, j, k, m, ...]
#
# index_low[dim_index] = index_low[dim_index] - 1
# index_up[dim_index] = index_up[dim_index] + 1
#
# index_low = tuple(index_low)
# index_up = tuple(index_up)
#
# if np.count_nonzero(flux[index_low]) != 0 and np.count_nonzero(flux[index_up]) != 0:
# scaling = (param_data[dim_index][index[dim_index]] - param_data[dim_index][index_low[dim_index]]) / (param_data[dim_index][index_up[dim_index]] - param_data[dim_index][index_low[dim_index]])
# flux[index] = flux[index_low]*(1.-scaling) + flux[index_up]*scaling
# count_interp += 1
# break
#
# for z in range(2):
# for i in range(grid_shape[0]):
# for j in range(grid_shape[1]):
# for k in range(grid_shape[2]):
# for m in range(grid_shape[3]):
# index = (i, j, k, m, ...)
#
# if np.count_nonzero(flux[index]) == 0:
# for dim_index in range(len(grid_shape)):
#
# if index[dim_index] > 0:
# index_low = [i, j, k, m, ...]
# index_low[dim_index] = index_low[dim_index] - 1
# index_low = tuple(index_low)
#
# if np.count_nonzero(flux[index_low]) != 0:
# if z == 0 and check_constant[index_low[:-1]]:
# continue
#
# flux[index] = flux[index_low]
# count_same += 1
# check_constant[index] = True
# print(param_data[0][i], param_data[1][j], param_data[2][k], param_data[3][m])
# break
#
# elif index[dim_index] < grid_shape[dim_index]-1:
# index_up = [i, j, k, m, ...]
# index_up[dim_index] = index_up[dim_index] + 1
# index_up = tuple(index_up)
#
# if np.count_nonzero(flux[index_up]) != 0:
# if z == 0 and check_constant[index_up[:-1]]:
# continue
#
# flux[index] = flux[index_up]
# count_same += 1
# check_constant[index] = True
# print(param_data[0][i], param_data[1][j], param_data[2][k], param_data[3][m])
# break
#
# if np.count_nonzero(flux[index]) == 0:
# print(z)
# if z == 1:
# count_missing += 1
#
# warnings.warn(f'It is not possible to add the missing grid position '
# f'at ({param_data[0][i]}, {param_data[1][j]}, '
# f'{param_data[2][k]}, {param_data[3][m]}). '
# f'Storing a spectrum with only zeros instead.')
#
# else:
# raise ValueError(f'Interpolation of missing grid points is not implemented for '
# f'{len(parameters)} parameters.')
print(f' - Number of stored grid points: {count_total}')
print(f' - Number of interpolated grid points: {count_interp}')
print(f' - Number of missing grid points: {count_missing}')
del database[f'models/{model}/flux']
database.create_dataset(f'models/{model}/flux',
data=flux)
[docs]def correlation_to_covariance(cor_matrix,
spec_sigma):
"""
Parameters
----------
cor_matrix : np.ndarray
Correlation matrix of the spectrum.
spec_sigma : np.ndarray
Uncertainties (W m-2 um-1).
Returns
-------
np.ndarrays
Covariance matrix of the spectrum.
"""
cov_matrix = np.zeros(cor_matrix.shape)
for i in range(cor_matrix.shape[0]):
for j in range(cor_matrix.shape[1]):
cov_matrix[i, j] = cor_matrix[i, j]*spec_sigma[i]*spec_sigma[j]
if i == j:
assert cor_matrix[i, j] == 1.
return cov_matrix