Source code for taurex.optimizer.optimizer

from re import M
from taurex.log import Logger, disableLogging, enableLogging
import numpy as np
import math
from taurex import OutputSize
from taurex.core.priors import PriorMode
from taurex.data.citation import Citable


def compile_params(fitparams, driveparams, fit_priors=None):
    """Collect enabled fitting/derived parameters and build their priors.

    Parameters
    ----------
    fitparams : dict
        Mapping of fitting-parameter tuples of the form
        ``(name, latex, fget, fset, mode, to_fit, bounds)``.
    driveparams : dict
        Mapping of derived-parameter tuples of the form
        ``(name, latex, fget, compute)``.
    fit_priors : dict, optional
        Existing name->prior mapping; reused (and extended in place) so
        user-supplied priors take precedence over auto-built ones.

    Returns
    -------
    tuple
        ``(fitting_parameters, fitting_priors, priors_by_name,
        derived_parameters)``.
    """
    from taurex.core.priors import Uniform, LogUniform

    known_priors = fit_priors or {}
    enabled_fits = []
    priors = []
    enabled_derived = []

    for entry in fitparams.values():
        name, _latex, _fget, _fset, mode, to_fit, bounds = entry
        if not to_fit:
            continue
        enabled_fits.append(entry)
        if name in known_priors:
            # A prior was already registered for this name; reuse it.
            priors.append(known_priors[name])
        else:
            # Auto-build a (log-)uniform prior from the fitting bounds.
            prior = LogUniform(lin_bounds=bounds) if mode == 'log' \
                else Uniform(bounds=bounds)
            priors.append(prior)
            known_priors[name] = prior

    for entry in driveparams.values():
        _name, _latex, _fget, compute = entry
        if compute:
            enabled_derived.append(entry)

    return enabled_fits, priors, known_priors, enabled_derived
class Optimizer(Logger, Citable):
    """
    Base class handling the fitting and optimization of forward models.

    It compiles and manages the fitting parameters of forward models; in
    its current form it cannot fit by itself and requires a derived class
    to implement :func:`compute_fit`.

    Parameters
    ----------
    name: str
        Name to be used in logging
    observed : :class:`~taurex.data.spectrum.spectrum.BaseSpectrum`, optional
        See :func:`set_observed`
    model : :class:`~taurex.model.model.ForwardModel`, optional
        See :func:`set_model`
    sigma_fraction: float, optional
        Fraction of weights to use in computing the error. (Default: 0.1)
    """

    def __init__(self, name, observed=None, model=None, sigma_fraction=0.1):
        super().__init__(name)
        # Internal state first, then delegate to the setters.
        self._model_callback = None
        self._sigma_fraction = sigma_fraction
        self._fit_priors = {}
        self.fitting_parameters = []
        self.fitting_priors = []
        self.set_model(model)
        self.set_observed(observed)
[docs] def set_model(self, model): from taurex.model import ForwardModel """ Sets the model to be optimized/fit Parameters ---------- model : :class:`~taurex.model.model.ForwardModel` The forward model we wish to optimize """ self._model: ForwardModel = model
[docs] def set_observed(self, observed): from taurex.spectrum import BaseSpectrum """ Sets the observation to optimize the model to Parameters ---------- observed : :class:`~taurex.data.spectrum.spectrum.BaseSpectrum` Observed spectrum we will optimize to """ self._observed: BaseSpectrum = observed if observed is not None: self._binner = observed.create_binner()
[docs] def compile_params(self): from taurex.core.priors import Uniform, LogUniform """ Goes through and compiles all parameters within the model that we will be retrieving. Called before :func:`compute_fit` """ self.info('Initializing parameters') self.fitting_parameters = [] self.derived_parameters = [] self.fitting_priors = [] # param_name,param_latex, # fget.__get__(self),fset.__get__(self), # default_fit,default_bounds # for params in self._model.fittingParameters.values(): self.fitting_parameters, \ self.fitting_priors, \ _model_priors, \ self.derived_parameters = \ compile_params(self._model.fittingParameters, self._model.derivedParameters, self._fit_priors) self._fit_priors.update(_model_priors) obs_fit, obs_prior, _obs_priors, obs_deriv = \ compile_params( self._observed.fittingParameters, self._observed.derivedParameters, self._fit_priors ) self.fitting_parameters.extend(obs_fit) self.fitting_priors.extend(obs_prior) self._fit_priors.update(_obs_priors) self.derived_parameters.extend(obs_deriv) # name, latex, fget, fset, mode, to_fit, bounds = params # self.debug('Checking fitting parameter {}'.format(params)) # if to_fit: # self.fitting_parameters.append(params) # if name not in self._fit_priors: # prior = None # if mode == 'log': # prior = LogUniform(lin_bounds=bounds) # else: # prior = Uniform(bounds=bounds) # self.fitting_priors.append(prior) # self._fit_priors[name] = prior # else: # self.fitting_priors.append(self._fit_priors[name]) # for params in self._model.derivedParameters.values(): # name, latex, fget, compute = params # if compute: # self.derived_parameters.append(params) self.info('-------FITTING---------------') self.info('Parameters to be fit:') for params, priors in zip(self.fitting_parameters, self.fitting_priors): name, latex, fget, fset, mode, to_fit, bounds = params self.info('{}: Value: {} Type:{} Params:{}'.format( name, fget(), priors.__class__.__name__, priors.params()))
[docs] def update_model(self, fit_params): """ Updates the model with new parameters Parameters ---------- fit_params : :obj:`list` A list of new values to apply to the model. The list of values are assumed to be in the same order as the parameters given by :func:`fit_names` """ if len(fit_params) != len(self.fitting_parameters): self.error('Trying to update model with more fitting parameters' ' than enabled') self.error(f'No. enabled parameters:{len(self.fitting_parameters)}' f' Update length: {len(fit_params)}') raise ValueError('Trying to update model with more fitting' ' parameters than enabled') for value, param, priors in zip(fit_params, self.fitting_parameters, self.fitting_priors): name, latex, fget, fset, mode, to_fit, bounds = param # if mode == 'log': # value = 10**value fset(priors.prior(value))
@property def fit_values_nomode(self): """ Returns a list of the current values of a fitting parameter. Regardless of the ``mode`` setting Returns ------- :obj:`list`: List of each value of a fitting parameter """ return [c[2]() for c in self.fitting_parameters] @property def fit_values(self): """ Returns a list of the current values of a fitting parameter. This respects the ``mode`` setting Returns ------- :obj:`list`: List of each value of a fitting parameter """ return [c[2]() if c[4] == 'linear' else math.log10(c[2]()) for c in self.fitting_parameters] @property def fit_boundaries(self): """ Returns the fitting boundaries of the parameter Returns ------- :obj:`list`: List of boundaries for each fitting parameter. It takes the form of a python :obj:`tuple` with the form ( ``bound_min`` , ``bound_max`` ) """ return [c[-1] if c[4] == 'linear' else (math.log10(c[-1][0]), math.log10(c[-1][1])) for c in self.fitting_parameters] @property def fit_names(self): """ Returns the names of the model parameters we will be fitting Returns ------- :obj:`list`: List of names of parameters that will be fit """ return [c[0] if self._fit_priors[c[0]].priorMode is PriorMode.LINEAR else 'log_{}'.format(c[0]) for c in self.fitting_parameters] @property def fit_latex(self): """ Returns the names of the parameters in LaTeX format Returns ------- :obj:`list` List of parameter names in LaTeX format """ return [c[1] if self._fit_priors[c[0]].priorMode is PriorMode.LINEAR else 'log({})'.format(c[1]) for c in self.fitting_parameters] @property def derived_names(self): """ Returns a list of the current values of a fitting parameter. This respects the ``mode`` setting Returns ------- :obj:`list`: List of each value of a fitting parameter """ return [c[0] for c in self.derived_parameters] @property def derived_latex(self): """ Returns a list of the current values of a fitting parameter. 
This respects the ``mode`` setting Returns ------- :obj:`list`: List of each value of a fitting parameter """ return [c[1] for c in self.derived_parameters] @property def derived_values(self): """ Returns a list of the current values of a fitting parameter. This respects the ``mode`` setting Returns ------- :obj:`list`: List of each value of a fitting parameter """ return [c[2]() for c in self.derived_parameters]
[docs] def enable_fit(self, parameter): """ Enables fitting of the parameter Parameters ---------- parameter : str Name of the parameter we want to fit """ obj = self._model if parameter in self._model.fittingParameters \ else self._observed name, latex, fget, fset, mode, to_fit, bounds = \ obj.fittingParameters[parameter] to_fit = True obj.fittingParameters[parameter] = ( name, latex, fget, fset, mode, to_fit, bounds)
[docs] def enable_derived(self, parameter): obj = self._model if parameter in self._model.derivedParameters \ else self._observed name, latex, fget, compute = \ obj.derivedParameters[parameter] compute = True obj.derivedParameters[parameter] = ( name, latex, fget, compute )
[docs] def disable_fit(self, parameter): """ Disables fitting of the parameter Parameters ---------- parameter : str Name of the parameter we do not want to fit """ obj = self._model if parameter in self._model.fittingParameters \ else self._observed name, latex, fget, fset, mode, to_fit, bounds = \ obj.fittingParameters[parameter] to_fit = False obj.fittingParameters[parameter] = ( name, latex, fget, fset, mode, to_fit, bounds)
[docs] def disable_derived(self, parameter): obj = self._model if parameter in self._model.fittingParameters \ else self._observed name, latex, fget, compute = \ obj.derivedParameters[parameter] compute = False obj.derivedParameters[parameter] = ( name, latex, fget, compute )
[docs] def set_boundary(self, parameter, new_boundaries): """ Sets the boundary of the parameter Parameters ---------- parameter : str Name of the parameter we want to change new_boundaries : tuple of float New fitting boundaries, with the form ( ``bound_min`` , ``bound_max`` ). These should not take into account the ``mode`` setting of a fitting parameter. """ obj = self._model if parameter in self._model.fittingParameters \ else self._observed name, latex, fget, fset, mode, to_fit, bounds = \ obj.fittingParameters[parameter] bounds = new_boundaries obj.fittingParameters[parameter] = ( name, latex, fget, fset, mode, to_fit, bounds)
[docs] def set_factor_boundary(self, parameter, factors): """ Sets the boundary of the parameter based on a factor Parameters ---------- parameter : str Name of the parameter we want to change factor : tuple of float To be written """ obj = self._model if parameter in self._model.fittingParameters \ else self._observed name, latex, fget, fset, mode, to_fit, bounds = \ obj.fittingParameters[parameter] value = fget() new_boundaries = factors[0]*value, factors[1]*value bounds = new_boundaries obj.fittingParameters[parameter] = ( name, latex, fget, fset, mode, to_fit, bounds)
[docs] def set_mode(self, parameter, new_mode): """ Sets the fitting mode of a parameter Parameters ---------- parameter : str Name of the parameter we want to change new_mode : ``linear`` or ``log`` Sets whether the parameter is fit in linear or log space """ obj = self._model if parameter in self._model.fittingParameters \ else self._observed new_mode = new_mode.lower() name, latex, fget, fset, mode, to_fit, bounds = \ obj.fittingParameters[parameter] if new_mode not in ('log', 'linear',): self.error('Incorrect mode set for fit parameter,') raise ValueError obj.fittingParameters[parameter] = ( name, latex, fget, fset, new_mode, to_fit, bounds)
[docs] def set_prior(self, parameter, prior): obj = self._model if parameter in self._model.fittingParameters \ else self._observed if parameter not in obj.fittingParameters: self.error('Fitting parameter %s does not exist', parameter) raise ValueError('Fitting parameter does not exist') self._fit_priors[parameter] = prior
[docs] def chisq_trans(self, fit_params, data, datastd): """ Computes the Chi-Squared between the forward model and observation. The steps taken are: 1. Forward model (FM) is updated with :func:`update_model` 2. FM is then computed at its native grid then binned. 3. Chi-squared between FM and observation is computed Parameters ---------- fit_params : list of parameter values List of new parameter values to update the model data : obj:`ndarray` Observed spectrum datastd : obj:`ndarray` Observed spectrum error Returns ------- float : chi-squared """ from taurex.exceptions import InvalidModelException self.update_model(fit_params) mydata = self._observed.spectrum #myerror = self._observed.errorBar obs_bins = self._observed.wavenumberGrid try: _, final_model, _, _ = self._binner.bin_model( self._model.model(wngrid=obs_bins)) except InvalidModelException: return np.nan res = (mydata.ravel() - final_model.ravel()) / datastd.ravel() res = np.nansum(res*res) if res == 0: res = np.nan return res
[docs] def compute_fit(self): """ Unimplemented. When inheriting this should be overwritten to perform the actual fit. Raises ------ NotImplementedError Raised when a derived class does override this function """ raise NotImplementedError
[docs] def fit(self, output_size=OutputSize.heavy): import time """ Performs fit. """ from tabulate import tabulate self.compile_params() fit_names = self.fit_names prior_type = [p.__class__.__name__ for p in self.fitting_priors] args = [p.params() for p in self.fitting_priors] fit_values = self.fit_values self.info('') self.info('-------------------------------------') self.info('------Retrieval Parameters-----------') self.info('-------------------------------------') self.info('') self.info('Dimensionality of fit: %s', len(fit_names)) self.info('') output = tabulate(zip(fit_names, fit_values, prior_type, args), headers=['Param', 'Value', 'Type', 'Args']) self.info('\n%s\n\n', output) self.info('') start_time = time.time() disableLogging() # Compute fit here self.compute_fit() enableLogging() end_time = time.time() self.info('Sampling time %s s', end_time-start_time) solution = self.generate_solution(output_size=output_size) self.info('') self.info('-------------------------------------') self.info('------Final results------------------') self.info('-------------------------------------') self.info('') self.info('Dimensionality of fit: %s', len(fit_names)) self.info('') for idx, optimized_map, optimized_median, values in self.get_solution(): self.info('\n%s', '---Solution {}------'.format(idx)) output = tabulate(zip(fit_names, optimized_map, optimized_median), headers=[ 'Param', 'MAP', 'Median']) self.info('\n%s\n\n', output) return solution
[docs] def write_optimizer(self, output): """ Writes optimizer settings under the 'Optimizer' heading in an output file Parameters ---------- output: :class:`~taurex.output.output.Output` or :class:`~taurex.output.output.OutputGroup` Group (or root) in output file to write to Returns ------- :class:`~taurex.output.output.Output` or :class:`~taurex.output.output.OutputGroup` Group (or root) in output file written to """ output.write_string('optimizer', self.__class__.__name__) output.write_string_array('fit_parameter_names', self.fit_names) output.write_string_array('fit_parameter_latex', self.fit_latex) output.write_array('fit_boundary_low', np.array( [x.boundaries()[0] for x in self.fitting_priors])) output.write_array('fit_boundary_high', np.array( [x.boundaries()[1] for x in self.fitting_priors])) if len(self.derived_names) > 0: output.write_string_array( 'derived_parameter_names', self.derived_names) output.write_string_array( 'derived_parameter_latex', self.derived_latex) return output
[docs] def write_fit(self, output): """ Writes basic fitting parameters into output Parameters ---------- output : :class:`~taurex.output.output.Output` or :class:`~taurex.output.output.OutputGroup` Group (or root) in output file to write to Returns ------- :class:`~taurex.output.output.Output` or :class:`~taurex.output.output.OutputGroup` Group (or root) in output file written to """ fit = output.create_group('FitParams') fit.write_string('fit_format', self.__class__.__name__) fit.write_string_array('fit_parameter_names', self.fit_names) fit.write_string_array('fit_parameter_latex', self.fit_latex) fit.write_array('fit_boundary_low', np.array( [x[0] for x in self.fit_boundaries])) fit.write_array('fit_boundary_high', np.array( [x[1] for x in self.fit_boundaries])) # This is the last sampled value ... should not be recorded to avoid confusion. # fit.write_list('fit_parameter_values',self.fit_values) # fit.write_list('fit_parameter_values_nomode',self.fit_values_nomode) return output
[docs] def generate_profiles(self, solution, binning): from taurex import mpi """Generates sigma plots for profiles""" sample_list = None if mpi.get_rank() == 0: sample_list = list(self.sample_parameters(solution)) sample_list = mpi.broadcast(sample_list) self.debug('We all got %s', sample_list) self.info('------------Variance generation step------------------') self.info('We are sampling %s points for the profiles', len(sample_list)) rank = mpi.get_rank() size = mpi.nprocs() enableLogging() self.info('I will only iterate through partitioned %s ' 'points (the rest is in parallel)', len(sample_list)//size) disableLogging() def sample_iter(): count = 0 for parameters, weight in sample_list[rank::size]: self.update_model(parameters) enableLogging() if rank == 0 and count % 10 == 0 and count > 0: self.info('Progress {}%'.format( count*100.0 / (len(sample_list)/size))) disableLogging() yield weight count += 1 return self._model.compute_error(sample_iter, wngrid=binning, binner=self._binner)
    def generate_solution(self, output_size=OutputSize.heavy):
        """
        Generates a dictionary with all solutions and other useful parameters

        For each solution from :func:`get_solution` this stores the extra
        values provided by the child class, the MAP spectrum (with
        contributions when possible), and the median-parameter profiles
        augmented with the sampled variance from :func:`generate_profiles`.
        If derived parameters are enabled, their traces are computed in a
        second pass over the solutions.
        """
        # NOTE(review): generate_profile_dict is imported but not used in
        # this method — presumably leftover from an earlier version.
        from taurex.util.output import generate_profile_dict, \
            store_contributions

        solution_dict = {}

        self.info('Post-processing - Generating spectra and profiles')

        # Loop through each solution, grab optimized parameters and anything
        # else we want to store
        for solution, optimized_map, \
                optimized_median, values in self.get_solution():

            enableLogging()
            self.info('Computing solution %s', solution)
            sol_values = {}

            # Include extra stuff we might want to store (provided by the
            # child)
            for k, v in values:
                sol_values[k] = v

            # Update the model with optimized map values
            self.update_model(optimized_map)

            opt_result = self._model.model(cutoff_grid=False)  # Run the model

            sol_values['Spectra'] = self._binner.generate_spectrum_output(
                opt_result, output_size=output_size)

            # Contribution storage is best-effort; failures are logged and
            # skipped rather than aborting post-processing.
            try:
                sol_values['Spectra']['Contributions'] = store_contributions(
                    self._binner, self._model, output_size=output_size-3)
            except Exception as e:
                self.warning(
                    'Not bothering to store contributions since its broken')
                self.warning('%s ', str(e))

            # Update with the optimized median
            self.update_model(optimized_median)

            self._model.model(cutoff_grid=False)

            # Store profiles here
            sol_values['Profiles'] = self._model.generate_profiles()
            profile_dict, spectrum_dict = self.generate_profiles(
                solution, self._observed.wavenumberGrid)

            # Merge sampled variance results into the stored profiles and
            # spectra, updating in place where keys already exist.
            for k, v in profile_dict.items():
                if k in sol_values['Profiles']:
                    sol_values['Profiles'][k].update(v)
                else:
                    sol_values['Profiles'][k] = v

            for k, v in spectrum_dict.items():
                if k in sol_values['Spectra']:
                    sol_values['Spectra'][k].update(v)
                else:
                    sol_values['Spectra'][k] = v

            solution_dict['solution{}'.format(solution)] = sol_values

        if len(self.derived_names) > 0:
            # Compute derived parameter traces in a second pass.
            for solution, optimized_map, \
                    optimized_median, values in self.get_solution():
                solution_dict[f'solution{solution}']['derived_params'] = {}
                derived_dict = self.compute_derived_trace(solution)
                if derived_dict is None:
                    continue
                solution_dict[f'solution{solution}']['derived_params'].update(
                    derived_dict)

        enableLogging()
        self.info('Post-processing - Complete')

        return solution_dict
    def compute_derived_trace(self, solution):
        """
        Computes the trace, quantiles and mean of each enabled derived
        parameter by replaying posterior samples through the model
        (MPI-partitioned). Returns ``None`` when no derived parameters
        are enabled, otherwise a dict keyed ``'<name>_derived'``.
        """
        from taurex.util.util import quantile_corner
        # NOTE(review): AMU and weight_comb are unused here — presumably
        # leftovers from an earlier version.
        from taurex.constants import AMU
        from taurex import mpi

        enableLogging()
        samples = self.get_samples(solution)
        weights = self.get_weights(solution)
        len_samples = len(samples)

        rank = mpi.get_rank()

        num_procs = mpi.nprocs()

        count = 0

        # Per-parameter pair of (value trace, weight trace).
        derived_param = {p: ([], []) for p in self.derived_names}

        weight_comb = []

        if len(self.derived_names) == 0:
            # Caller (generate_solution) checks for None explicitly.
            return

        self.info('Computing derived parameters......')
        disableLogging()
        # Strided partition: each rank evaluates every num_procs-th sample.
        for idx in range(rank, len_samples, num_procs):
            enableLogging()
            # NOTE(review): count is never incremented, so this progress
            # message can never fire (count > 0 is always False) — likely
            # a latent bug.
            if rank == 0 and count % 10 == 0 and count > 0:
                self.info('Progress {}%'.format(
                    idx*100.0 / len_samples))
            disableLogging()

            parameters = samples[idx]
            weight = weights[idx]
            self.update_model(parameters)
            self._model.initialize_profiles()
            for p, v in zip(self.derived_names, self.derived_values):
                derived_param[p][0].append(v)
                derived_param[p][1].append(weight)

        result_dict = {}

        sorted_weights = weights.argsort()

        for param, (trace, w) in derived_param.items():

            # Gather per-rank partial traces across all ranks.
            # I cant remember why this works
            all_trace = np.array(mpi.allreduce(trace, op='SUM'))
            # I cant remember why this works
            all_weight = np.array(mpi.allreduce(w, op='SUM'))

            all_weight_sort = all_weight.argsort()

            # Sort them into the right order
            all_weight[sorted_weights] = all_weight[all_weight_sort]
            all_trace[sorted_weights] = all_trace[all_weight_sort]

            q_16, q_50, q_84 = \
                quantile_corner(np.array(all_trace), [0.16, 0.5, 0.84],
                                weights=np.array(all_weight))

            mean = np.average(all_trace, weights=all_weight, axis=0)
            derived = {
                'value': q_50,
                'sigma_m': q_50-q_16,
                'sigma_p': q_84-q_50,
                'trace': all_trace,
                'mean': mean
            }

            result_dict[f'{param}_derived'] = derived

        return result_dict
[docs] def sample_parameters(self, solution): """ Read traces and weights and return a random ``sigma_fraction`` sample of them Parameters ---------- solution: a solution output from sampler Yields ------ traces: :obj:`array` Traces of a particular sample weight: float Weight of sample """ from taurex.util.util import random_int_iter samples = self.get_samples(solution) weights = self.get_weights(solution) iterator = random_int_iter(samples.shape[0], self._sigma_fraction) for x in iterator: w = weights[x]+1e-300 yield samples[x, :], w
[docs] def get_solution(self): """ ** Requires implementation ** Generator for solutions and their median and MAP values Yields ------ solution_no: int Solution number map: :obj:`array` Map values median: :obj:`array` Median values extra: :obj:`list` List of tuples of extra information to store. Must be of form ``(name, data)`` """ raise NotImplementedError
[docs] def get_samples(self, solution_id): raise NotImplementedError
[docs] def get_weights(self, solution_id): raise NotImplementedError
[docs] def write(self, output): """ Creates 'Optimizer' them respectively Parameters ---------- output : :class:`~taurex.output.output.Output` or :class:`~taurex.output.output.OutputGroup` Group (or root) in output file to write to """ opt = output.create_group('Optimizer') self.write_optimizer(opt)
[docs] @classmethod def input_keywords(self): raise NotImplementedError