Source code for taurex.util.util

"""General utility functions"""
import math
import re
import numpy as np
mass = {
  "H":	1.00794,
  "He":	4.002602,
  "Li":	6.941,
  "Be":	9.012182,
  "B":	10.811,
  "C":	12.011,
  "N":	14.00674,
  "O":	15.9994,
  "F":	18.9984032,
  "Ne":	20.1797,
  "Na":	22.989768,
  "Mg":	24.3050,
  "Al":	26.981539,
  "Si":	28.0855,
  "P":	30.973762,
  "S":	32.066,
  "Cl":	35.4527,
  "Ar":	39.948,
  "K":	39.0983,
  "Ca":	40.078,
  "Sc":	44.955910,
  "Ti":	47.88,
  "V":	50.9415,
  "Cr":	51.9961,
  "Mn":	54.93805,
  "Fe":	55.847,
  "Co":	58.93320,
  "Ni":	58.6934,
  "Cu":	63.546,
  "Zn":	65.39,
  "Ga":	69.723,
  "Ge":	72.61,
  "As":	74.92159,
  "Se":	78.96,
  "Br":	79.904,
  "Kr":	83.80,
  "Rb":	85.4678,
  "Sr":	87.62,
  "Y":	88.90585,
  "Zr":	91.224,
  "Nb":	92.90638,
  "Mo":	95.94,
  "Tc":	98,
  "Ru":	101.07,
  "Rh":	102.90550,
  "Pd":	106.42,
  "Ag":	107.8682,
  "Cd":	112.411,
  "In":	114.82,
  "Sn":	118.710,
  "Sb":	121.757,
  "Te":	127.60,
  "I":	126.90447,
  "Xe":	131.29,
  "Cs":	132.90543,
  "Ba":	137.327,
  "La":	138.9055,
  "Ce":	140.115,
  "Pr":	140.90765,
  "Nd":	144.24,
  "Pm":	145,
  "Sm":	150.36,
  "Eu":	151.965,
  "Gd":	157.25,
  "Tb":	158.92534,
  "Dy":	162.50,
  "Ho":	164.93032,
  "Er":	167.26,
  "Tm":	168.93421,
  "Yb":	173.04,
  "Lu":	174.967,
  "Hf":	178.49,
  "Ta":	180.9479,
  "W":	183.85,
  "Re":	186.207,
  "Os":	190.2,
  "Ir":	192.22,
  "Pt":	195.08,
  "Au":	196.96654,
  "Hg":	200.59,
  "Tl":	204.3833,
  "Pb":	207.2,
  "Bi":	208.98037,
  "Po":	209,
  "At":	210,
  "Rn":	222,
  "Fr":	223,
  "Ra":	226.0254,
  "Ac":	227,
  "Th":	232.0381,
  "Pa":	213.0359,
  "U":	238.0289,
  "Np":	237.0482,
  "Pu":	244,
  "Am":	243,
  "Cm":	247,
  "Bk":	247,
  "Cf":	251,
  "Es":	252,
  "Fm":	257,
  "Md":	258,
  "No":	259,
  "Lr":	260,
  "Rf":	261,
  "Db":	262,
  "Sg":	263,
  "Bh":	262,
  "Hs":	265,
  "Mt":	266,
  "e-": 5.4857990907e-4,
}


[docs]def calculate_weight(chem): s = split_molecule_elements(chem) compoundweight = 0.0 for element, count in s.items(): compoundweight += mass[element] * count return compoundweight
# def split_molecule_elements(chem): # s = re.findall('([A-Z][a-z]?)([0-9]*)', chem) # return s
[docs]def tokenize_molecule(molecule): import re return re.findall('[A-Z][a-z]?|\d+|.', molecule)
[docs]def merge_elements(elem1, elem2, factor=1): return {elem: elem1.get(elem, 0) + elem2.get(elem,0)*factor for elem in set(elem1)| set(elem2)}
[docs]def split_molecule_elements(molecule=None, tokens=None): from taurex.util.util import mass elems = {} if molecule: tokens = tokenize_molecule(molecule) length = 0 while length < len(tokens): token = tokens[length] if token in mass: if token not in elems: elems[token] = 0 try: peek = int(tokens[length+1]) length +=1 except IndexError: peek = 1 except ValueError: peek = 1 elems[token] += peek elif token in '{([': length += 1 sub_elems,moved = split_molecule_elements(tokens=tokens[length:]) length += moved try: peek = int(tokens[length+1]) length +=1 except IndexError: peek = 1 except ValueError: peek = 1 elems = merge_elements(elems, sub_elems,peek) elif token in '}])': return elems, length length+=1 return elems
[docs]def sanitize_molecule_string(molecule): """ Cleans a molecule string to match up with molecule naming in TauREx3. e.g: H2O -> H2O 1H2-16O -> H2O Parameters ---------- molecule: str Molecule to sanitize Returns ------- str: Sanitized name """ return ''.join([''.join(s) for s in re.findall('([A-Z][a-z]?)([0-9]*)', molecule)])
_mol_latex = { 'HE': 'He', 'H2': 'H$_2$', 'N2': 'N$_2$', 'O2': 'O$_2$', 'CO2': 'CO$_2$', 'CH4': 'CH$_4$', 'CO': 'CO', 'NH3': 'NH$_3$', 'H2O': 'H$_2$O', 'C2H2': 'C$_2$H$_2$', 'HCN': 'HCN', 'H2S': 'H$_2$S', 'SIO2': 'SiO$_2$', 'SO2': 'SO$_2$', } """Latex versions of molecule names"""
[docs]def get_molecular_weight(gasname): """ For a given molecule return the molecular weight in atomic units Parameters ---------- gasname : str Name of molecule Returns ------- float : molecular weight in amu or 0 if not found """ from taurex.constants import AMU mu = calculate_weight(gasname) return mu * AMU
[docs]def molecule_texlabel(gasname): """ For a given molecule return its latex form Parameters ---------- gasname : str Name of molecule Returns ------- str : Latex form of the molecule or just the passed name if not found """ gasname = gasname try: return _mol_latex[gasname] except KeyError: return gasname
[docs]def bindown(original_bin,original_data,new_bin,last_point=None): """ This method quickly bins down by taking the mean. The numpy histogram function is exploited to do this quickly Parameters ---------- original_bin: :obj:`numpy.array` The original bins for the that we want to bin down original_data: :obj:`numpy.array` The associated data that will be averaged along the new bins new_bin: :obj:`numpy.array` The new binnings we want to use (must have less points than the original) Returns ------- :obj:`array` Binned mean of ``original_data`` """ import numpy as np #print(original_bin.shape,original_data.shape) #if last_point is None: # last_point = new_bin[-1]*2 #calc_bin = np.append(new_bin,last_point) #return(np.histogram(original_bin, calc_bin, weights=original_data)[0] / # np.histogram(original_bin,calc_bin)[0]) filter_lhs = np.zeros(new_bin.shape[0]+1) filter_lhs[0] = new_bin[0] filter_lhs[0] -= (new_bin[1] - new_bin[0])/2 filter_lhs[-1] = new_bin[-1] filter_lhs[-1] += (new_bin[-1] - new_bin[-2])/2 filter_lhs[1:-1] = (new_bin[1:] + new_bin[:-1])/2 axis = len(original_data.shape)-1 if axis: digitized = np.digitize(original_bin,filter_lhs,right=True) axis = len(original_data.shape)-1 bin_means = [original_data[...,digitized == i].mean(axis=axis) for i in range(1, len(filter_lhs))] return np.column_stack(bin_means) return np.histogram(original_bin, filter_lhs, weights=original_data)[0]/np.histogram(original_bin,filter_lhs)[0]
[docs]def movingaverage(a, n=3) : """ Computes moving average Parameters ---------- a : :obj:`array` Array to compute average n : int Averaging window Returns ------- :obj:`array` Resultant array """ import numpy as np ret = np.cumsum(a) ret[n:] = ret[n:] - ret[:-n] return ret[n - 1:] / n
[docs]def quantile_corner(x, q, weights=None): """ * Taken from corner.py __author__ = "Dan Foreman-Mackey (danfm@nyu.edu)" __copyright__ = "Copyright 2013-2015 Daniel Foreman-Mackey" Like numpy.percentile, but: * Values of q are quantiles [0., 1.] rather than percentiles [0., 100.] * scalar q not supported (q must be iterable) * optional weights on x Parameters ---------- x : :obj:`array` Input array or object that can be converted to an array. q : :obj:`array` or float Percentile or sequence of percentiles to compute, which must be between 0 and 1 inclusive. weights : :obj:`array` or float , optional Weights on x Returns ------- percentile : scalar or ndarray """ import numpy as np if weights is None: return np.percentile(x, [100. * qi for qi in q]) else: idx = np.argsort(x) xsorted = x[idx] cdf = np.add.accumulate(weights[idx]) cdf /= cdf[-1] return np.interp(q, cdf, xsorted).tolist()
[docs]def loadtxt2d(intext): """ Wraps loadtext and either returns a 2d array or 1d array """ try: return np.loadtxt(intext, ndmin=2) except: return np.loadtxt(intext)
[docs]def read_error_line(l): """ Reads line? """ print('_read_error_line -> line>', l) name, values = l.split(' ', 1) print('_read_error_line -> name>', name) print('_read_error_line -> values>', values) name = name.strip(': ').strip() values = values.strip(': ').strip() v, error = values.split(" +/- ") return name, float(v), float(error)
[docs]def read_error_into_dict(l, d): """ Reads the error into dict? """ name, v, error = read_error_line(l) d[name.lower()] = v d['%s error' % name.lower()] = error
[docs]def read_table(txt, d=None, title=None): """ Yeah whatever i give up """ from io import StringIO import numpy as np if title is None: title, table = txt.split("\n", 1) else: table = txt header, table = table.split("\n", 1) data = loadtxt2d(StringIO(table)) if d is not None: d[title.strip().lower()] = data if len(data.shape) == 1: data = np.reshape(data, (1, -1)) return data
[docs]def decode_string_array(f): """Helper to decode strings from hdf5""" sl = list(f) return [s[0].decode('utf-8') for s in sl]
[docs]def recursively_save_dict_contents_to_output(output, dic): """ Will recursive write a dictionary into output. Parameters ---------- output : :class:`~taurex.output.output.Output` or :class:`~taurex.output.output.OutputGroup` Group (or root) in output file to write to dic : :obj:`dict` Dictionary we want to write """ import numpy as np for key, item in dic.items(): try: store_thing(output, key, item) except TypeError: raise ValueError('Cannot save %s type'%type(item))
[docs]def store_thing(output, key, item): if isinstance(item, (float,int,np.int64,np.float64,)): output.write_scalar(key,item) elif isinstance(item,(np.ndarray,)): output.write_array(key,item) elif isinstance(item,(str,)): output.write_string(key,item) elif isinstance(item,(list,tuple,)): if isinstance(item,tuple): item = list(item) if True in [isinstance(x,str) for x in item]: output.write_string_array(key,item) else: try: output.write_array(key,np.array(item)) except TypeError: for idx,val in enumerate(item): new_key = '{}{}'.format(key,idx) store_thing(output,new_key,val) elif isinstance(item, dict): group = output.create_group(key) recursively_save_dict_contents_to_output(group, item) else: raise TypeError
[docs]def weighted_avg_and_std(values, weights, axis=None): """ Computes weight average and standard deviation Parameters ---------- values : :obj:`array` Input array weights : :obj:`array` Must be same shape as ``values`` axis : int , optional axis to perform weighting """ import numpy as np average = np.average(values, weights=weights,axis=axis) variance = np.average((values-average)**2, weights=weights, axis=axis) # Fast and numerically precise return (average, np.sqrt(variance))
[docs]def random_int_iter(total, fraction): import random n_points = int(total*fraction) samples = random.sample(range(total), n_points) for x in samples: yield x
[docs]def compute_bin_edges(wngrid): import numpy as np diff = np.diff(wngrid)/2 edges = np.concatenate([[wngrid[0]-(wngrid[1]-wngrid[0])/2], wngrid[:-1]+diff, [(wngrid[-1]-wngrid[-2])/2 + wngrid[-1]]]) return edges, np.abs(np.diff(edges))
[docs]def clip_native_to_wngrid(native_grid, wngrid): min_wngrid = wngrid.min() max_wngrid = wngrid.max() #Compute the maximum width wnwidths = compute_bin_edges(wngrid)[-1] wn_min = min_wngrid - wnwidths.max() wn_max = max_wngrid + wnwidths.max() native_filter = (native_grid >= wn_min) & (native_grid <= wn_max) return native_grid[native_filter]
[docs]def wnwidth_to_wlwidth(wngrid, wnwidth): return 10000*wnwidth/(wngrid**2)
[docs]def class_from_keyword(keyword, class_filter=None): from ..parameter.classfactory import ClassFactory cf = ClassFactory() combined_classes = [] if class_filter is None: combined_classes = list(cf.temperatureKlasses) + \ list(cf.pressureKlasses) + \ list(cf.chemistryKlasses) + \ list(cf.gasKlasses) + \ list(cf.planetKlasses) + \ list(cf.starKlasses) + \ list(cf.modelKlasses) + \ list(cf.contributionKlasses) else: if hasattr(class_filter, '__len__'): for x in class_filter: combined_classes += list(cf.list_from_base(x)) else: combined_classes = list(cf.list_from_base(class_filter)) for x in combined_classes: try: if keyword in x.input_keywords(): return x except NotImplementedError: continue return None
[docs]def class_for_name(class_name): from ..parameter.classfactory import ClassFactory cf = ClassFactory() combined_classes = list(cf.temperatureKlasses) + \ list(cf.pressureKlasses) + \ list(cf.chemistryKlasses) + \ list(cf.gasKlasses) + \ list(cf.planetKlasses) + \ list(cf.starKlasses) + \ list(cf.modelKlasses) + \ list(cf.contributionKlasses) try: class_name = class_name.decode() except (UnicodeDecodeError, AttributeError): pass combined_classes_name = [c.__name__ for c in combined_classes] if class_name in combined_classes_name: return combined_classes[combined_classes_name.index(class_name)] else: raise Exception(f'Class of name {class_name} does not exist')
[docs]def create_grid_res(resolution, wave_min, wave_max): # # R = l/Dl # l = (l-1)+Dl/2 + (Dl-1)/2 # # --> (R - 1/2)*Dl = (l-1) + (Dl-1)/2 # # wave_list = [] width_list = [] wave = wave_min width = wave/resolution while wave < wave_max: width = wave / (resolution - 0.5) + width/2/(resolution - 0.5) wave = resolution * width width_list.append(width) wave_list.append(wave) return np.array((wave_list ,width_list)).T
[docs]def conversion_factor(from_unit, to_unit): import astropy.units as u try: from_conv = u.Unit(from_unit) except: from_conv = u.Unit(from_unit, format="cds") try: to_conv = u.Unit(to_unit) except: to_conv = u.Unit(to_unit, format="cds") return from_conv.to(to_unit)
[docs]def compute_dz(altitude): dz = np.zeros_like(altitude) dz[:-1] = np.diff(altitude) dz[-1] = altitude[-1] - altitude[-2] return dz
[docs]def has_duplicates(arr): return len(arr) != len(set(arr))
[docs]def find_closest_pair(arr, value) -> (int, int): """ Will find the indices that lie to the left and right of the value arr[left] <= value <= arr[right] If the value is less than the array minimum then it will always return left=0 and right=1 If the value is above the maximum Parameters ---------- arr: :obj:`array` Array to search, must be sorted value: float Value to find in array Returns ------- left: int right: int """ right = arr.searchsorted(value) right = max(min(arr.shape[0]-1, right),1) left = right-1 left = max(0, left) return left, right
[docs]def ensure_string_utf8(val): output = val try: output = val.decode() except (UnicodeDecodeError, AttributeError,): pass return output