from .binner import Binner
from taurex.util.util import compute_bin_edges
import numpy as np
from taurex import OutputSize
[docs]class FluxBinner(Binner):
"""
Bins to a wavenumber grid given by ``wngrid`` using a
more accurate method that takes into account the amount
of contribution from each native bin. This method also
handles cases where bins are not continuous and/or
overlapping.
Parameters
----------
wngrid: :obj:`array`
Wavenumber grid
wngrid_width: :obj:`array`, optional
Must have same shape as ``wngrid``
Full bin widths for each wavenumber grid point
given in ``wngrid``. If not provided then
this is automatically computed from ``wngrid``.
"""
def __init__(self, wngrid, wngrid_width=None):
super().__init__()
sort_grid = wngrid.argsort()
self._wngrid = wngrid[sort_grid]
self._wngrid_width = wngrid_width
if self._wngrid_width is None:
self._wngrid_width = compute_bin_edges(self._wngrid)[-1]
elif hasattr(self._wngrid_width, '__len__'):
if len(self._wngrid_width) != len(self._wngrid):
raise ValueError('Wavenumber width should be signel value or '
'same shape as wavenumber grid')
self._wngrid_width = wngrid_width[sort_grid]
if not hasattr(self._wngrid_width, '__len__'):
self._wngrid_width = np.ones_like(self._wngrid)*self._wngrid_width
[docs] def bindown(self, wngrid, spectrum, grid_width=None, error=None):
"""
Bins down spectrum.
Parameters
----------
wngrid : :obj:`array`
The wavenumber grid of the spectrum to be binned down.
spectrum: :obj:`array`
The spectra we wish to bin-down. Must be same shape as
``wngrid``.
grid_width: :obj:`array`, optional
Wavenumber grid full-widths for the spectrum to be binned down.
Must be same shape as ``wngrid``.
Optional.
error: :obj:`array`, optional
Associated errors or noise of the spectrum. Must be same shape
as ``wngrid``.Optional parameter.
Returns
-------
binned_wngrid : :obj:`array`
New wavenumber grid
spectrum: :obj:`array`
Binned spectrum.
grid_width: :obj:`array`
New grid-widths
error: :obj:`array` or None
Binned error if given else ``None``
"""
sorted_input = wngrid.argsort()
wngrid = wngrid[sorted_input]
spectrum = spectrum[..., sorted_input]
if error is not None:
error = error[..., sorted_input]
bin_spectrum = np.zeros(spectrum[..., 0].shape + self._wngrid.shape)
if error is not None:
bin_error = np.zeros(spectrum[..., 0].shape + self._wngrid.shape)
else:
bin_error = None
old_spect_wn = wngrid
old_spect_flux = spectrum
old_spect_err = error
old_spect_width = grid_width
if old_spect_width is None:
old_spect_width = compute_bin_edges(old_spect_wn)[-1]
old_spect_min = old_spect_wn - old_spect_width/2
old_spect_max = old_spect_wn + old_spect_width/2
new_spec_lhs = self._wngrid
new_spec_rhs = self._wngrid_width
new_spec_wn = self._wngrid
new_spec_wn_min = new_spec_lhs - new_spec_rhs/2
new_spec_wn_max = new_spec_lhs + new_spec_rhs/2
save_start = 0
save_stop = 0
for idx, res in enumerate(zip(new_spec_wn, new_spec_wn_min,
new_spec_wn_max)):
wn, wn_min, wn_max = res
sum_spectrum = 0
sum_noise = 0
sum_weight = 0
save_start = np.searchsorted(old_spect_max, wn_min, side='right')
save_stop = np.searchsorted(old_spect_min[1:], wn_max,
side='right')
save_stop = min(save_stop, old_spect_min.shape[0]-1)
save_start = min(save_start, old_spect_min.shape[0]-1)
if not wn_min <= old_spect_max[save_start] or not \
old_spect_min[save_stop] <= wn_max:
continue
spect_min = old_spect_min[save_start:save_stop+1]
spect_max = old_spect_max[save_start:save_stop+1]
weight = (np.minimum(wn_max, spect_max) -
np.maximum(spect_min, wn_min))/(wn_max-wn_min)
sum_weight = np.sum(weight)
sum_spectrum = np.sum(weight/sum_weight *
old_spect_flux[..., save_start:save_stop+1],
axis=-1)
if error is not None:
sum_noise = np.sum(weight * weight *
old_spect_err[..., save_start:save_stop+1]**2,
axis=0)
sum_noise = np.sqrt(sum_noise / sum_weight/sum_weight)
bin_spectrum[..., idx] = sum_spectrum
if error is not None:
bin_error[idx] = sum_noise
return self._wngrid, bin_spectrum, bin_error, self._wngrid_width
[docs] def generate_spectrum_output(self, model_output,
output_size=OutputSize.heavy):
output = super().generate_spectrum_output(model_output,
output_size=output_size)
output['binned_wngrid'] = self._wngrid
output['binned_wlgrid'] = 10000/self._wngrid
output['binned_wnwidth'] = self._wngrid_width
output['binned_wlwidth'] = 1.0/self._wngrid_width
return output