Source code for statistics.simplifiedLikelihoods

#!/usr/bin/env python3

"""
.. module:: simplifiedLikelihoods
   :synopsis: Code that implements the simplified likelihoods as presented
              in CMS-NOTE-2017-001, see https://cds.cern.ch/record/2242860.
              Simplified likelihoods v2 (arXiv:1809.05548) are partly implemented.

.. moduleauthor:: Wolfgang Waltenberger <wolfgang.waltenberger@gmail.com>
"""

from scipy import stats, optimize, integrate, special, linalg
from numpy import sqrt, exp, log, sign, array, ndarray
from functools import reduce
from smodels.base.physicsUnits import UnitXSec
from smodels.statistics.basicStats import CLsfromNLL, determineBrentBracket
from smodels.statistics.exceptions import SModelSStatisticsError as SModelSError
from smodels.statistics.basicStats import observed, apriori, aposteriori, \
         NllEvalType, exponentiateNLL
from typing import Text, Optional, Union, Tuple
from smodels.statistics.basicStats import findRoot
from smodels.tools.caching import roundCache, lru_cache

import numpy as np
import math
import copy
import warnings

try:
    from smodels.base.smodelsLogging import logger
except ModuleNotFoundError:

    def getLogger():
        """
        Configure the logging facility. Maybe adapted to fit into
        your framework.
        """

        import logging

        logger = logging.getLogger("SL")
        formatter = logging.Formatter("%(module)s - %(levelname)s: %(message)s")
        ch = logging.StreamHandler()
        ch.setFormatter(formatter)
        # ch.setLevel(logging.DEBUG)
        logger.addHandler(ch)
        return logger

    logger = getLogger()


[docs]class Data: """A very simple observed container to collect all the data needed to fully define a specific statistical model""" def __init__( self, observed, backgrounds, covariance, third_moment=None, nsignal=None, name : str = "model", deltas_rel : float = 0.2, lumi=None, asimov : Union[None,float] = None ): """ :param observed: number of observed events per dataset :param backgrounds: evaluationType bg per dataset :param covariance: uncertainty in background, as a covariance matrix :param nsignal: number of signal events in each dataset :param name: give the model a name, just for convenience :param deltas_rel: the assumed relative error on the signal hypotheses. The default is 20%. :param lumi: luminosity of dataset in 1/fb, or None :param asimov: None if not asimov data, else signal strength of asimov data """ self.observed = self.convert(observed) # Make sure observed number of events are integers ## self.observed = np.around(self.convert(observed)) #Make sure observed number of events are integers self.backgrounds = self.convert(backgrounds) self.n = len(self.observed) self.covariance = self._convertCov(covariance) self.nsignal = self.convert(nsignal) self.asimov = asimov self.lumi = lumi if self.nsignal is None: if len(self.backgrounds) == 1: # doesnt matter, does it? self.nsignal = np.array([1.0]) self.signal_rel = self.convert(1.0) elif self.nsignal.sum(): self.signal_rel = self.nsignal / self.nsignal.sum() else: self.signal_rel = array([0.0] * len(self.nsignal)) self.third_moment = self.convert(third_moment) if ( type(self.third_moment) != type(None) and np.sum([abs(x) for x in self.third_moment]) < 1e-10 ): self.third_moment = None self.name = name self.deltas_rel = deltas_rel self._computeABC() self.weight = np.linalg.inv(self.V )
[docs] def generateAsimovData ( self, theta_hat : list, mu : float = 0. ): """ generate a model with Asimov data out of the current model :param theta_hat: the profiled nuisance parameters :returns: Data object """ newModel = copy.deepcopy(self) if newModel.isLinear(): newModel.observed = array([mu*s + x + y for s, x, y in zip(self.nsignal, self.backgrounds, theta_hat)]) # old else: newModel.observed = mu*self.nsignal + self.A + theta_hat + self.C * theta_hat**2 / self.B**2 newModel.name = self.name + "A" newModel.asimov = mu return newModel
[docs] def totalCovariance(self, nsig : list ): """get the total covariance matrix, taking into account also signal uncertainty for the signal hypothesis <nsig>. If nsig is None, the predefined signal hypothesis is taken. """ if self.isLinear(): cov_tot = self.V + self.var_s(nsig) else: cov_tot = self.covariance + self.var_s(nsig) return cov_tot
[docs] def zeroSignal(self): """ Is the total number of signal events zero? """ if self.nsignal is None: return True return len(self.nsignal[self.nsignal > 0.0]) == 0
[docs] def var_s(self, nsig : Union[None,list] = None): """ The signal variances. Convenience function. :param nsig: If None, it will use the model evaluationType number of signal events, otherwise will return the variances for the input value using the relative signal uncertainty defined for the model. """ if nsig is None: nsig = self.nsignal else: nsig = self.convert(nsig) return np.diag((nsig * self.deltas_rel) ** 2)
[docs] def isScalar(self, obj): """ Determine if obj is a scalar (float or int) """ if isinstance(obj, ndarray): ## need to treat separately since casting array([0.]) to float works return False try: _ = float(obj) return True except (ValueError, TypeError): pass return False
[docs] def convert(self, obj): """ Convert object to numpy arrays. If object is a float or int, it is converted to a one element array. """ if type(obj) == type(None): return obj if self.isScalar(obj): return array([obj]) return array(obj)
def __str__(self): return self.name + f" ({self.n} dims)" def _convertCov(self, obj): if self.isScalar(obj): return array([[obj]]) if isinstance(obj[0], list): return array(obj) if isinstance(obj[0], float): ## if the matrix is flattened, unflatten it. return array([obj[self.n * i : self.n * (i + 1)] for i in range(self.n)]) return obj def _computeABC(self): """ Compute the terms A, B, C, rho, V. Corresponds with Eqs. 1.27-1.30 in arXiv:1809.05548 """ self.V = self.covariance if self.third_moment is None: self.A = None self.B = None self.C = None return covD = self.diagCov() C = [] for m2, m3 in zip(covD, self.third_moment): if m3 == 0.0: m3 = 1e-30 k = -np.sign(m3) * sqrt(2.0 * m2) dm = sqrt(8.0 * m2**3 / m3**2 - 1.0) C.append(k * np.cos(4.0 * np.pi / 3.0 + np.arctan(dm) / 3.0)) self.C = np.array(C) ## C, as define in Eq. 1.27 (?) in the second paper self.B = sqrt(covD - 2 * self.C**2) ## B, as defined in Eq. 1.28(?) self.A = self.backgrounds - self.C ## A, Eq. 1.30(?) self.rho = np.array([[0.0] * self.n] * self.n) ## Eq. 1.29 (?) for x in range(self.n): for y in range(x, self.n): bxby = self.B[x] * self.B[y] cxcy = self.C[x] * self.C[y] e = (4.0 * cxcy) ** (-1) * ( sqrt(bxby**2 + 8 * cxcy * self.covariance[x][y]) - bxby ) self.rho[x][y] = e self.rho[y][x] = e self.sandwich() # self.V = sandwich ( self.B, self.rho )
[docs] def sandwich(self): """ Sandwich product """ ret = np.array([[0.0] * len(self.B)] * len(self.B)) for x in range(len(self.B)): for y in range(x, len(self.B)): T = self.B[x] * self.B[y] * self.rho[x][y] ret[x][y] = T ret[y][x] = T self.V = ret
[docs] def isLinear(self): """ Statistical model is linear, i.e. no quadratic term in poissonians """ return type(self.C) == type(None)
[docs] def diagCov(self): """ Diagonal elements of covariance matrix. Convenience function. """ return np.diag(self.covariance)
[docs] def correlations(self): """ Correlation matrix, computed from covariance matrix. Convenience function. """ if hasattr(self, "corr"): return self.corr self.corr = copy.deepcopy(self.covariance) for x in range(self.n): self.corr[x][x] = 1.0 for y in range(x + 1, self.n): rho = self.corr[x][y] / sqrt(self.covariance[x][x] * self.covariance[y][y]) self.corr[x][y] = rho self.corr[y][x] = rho return self.corr
[docs] def rel_signals(self, mu): """ Returns the number of evaluationType relative signal events, for all datasets, given total signal strength mu. For mu=1, the sum of the numbers = 1. :param mu: Total number of signal events summed over all datasets. """ return mu * self.signal_rel
[docs] def nsignals(self, mu): """ Returns the number of evaluationType signal events, for all datasets, given total signal strength mu. :param mu: Total number of signal events summed over all datasets. """ return mu * self.nsignal
[docs]class LikelihoodComputer: debug_mode = False def __init__(self, data ): """ :param data: a Data object. """ self.origModel = copy.deepcopy ( data ) self.model = data self.asimovComputer = None
[docs] def transform ( self, evaluationType : NllEvalType ): """ replace the actual observations with backgrounds, if evaluationType is True or "posteriori" """ # always start from scratch self.model = copy.deepcopy ( self.origModel ) if evaluationType == observed: return self.model.observed = self.model.backgrounds if evaluationType == apriori: return if not evaluationType == aposteriori: logger.error ( f"dont know the evaluationType value {expected}" ) sys.exit(-1) thetahat, _ = self.findThetaHat(0.) if type(self.model.backgrounds) in [float, np.float64, np.float32, int, np.int64, np.int32]: thetahat = float(thetahat[0]) # obs = self.model.backgrounds + thetahat if self.model.isLinear(): obs = self.model.backgrounds + thetahat else: obs = self.model.A + thetahat + self.model.C * thetahat**2 / self.model.B**2 self.model.observed = obs
@property def name ( self ): return self.model.name
[docs] def dNLLdMu(self, mu, theta_hat = None ): """ d (- ln L)/d mu, if L is the likelihood. The function whose root gives us muhat, i.e. the mu that maximizes the likelihood. :param mu: total number of signal events :param theta_hat: array with nuisance parameters, if None then compute them """ if type(mu) in [ list, np.ndarray ] and len(mu)==1: mu = float(mu[0]) if theta_hat == None: theta_hat, _ = self.findThetaHat ( mu ) model = self.model nsig = model.nsignal if not model.isLinear(): logger.debug("implemented only for linear model") # n_pred^i := mu s_i + b_i + theta_i # NLL = sum_i [ - n_obs^i * ln ( n_pred^i ) + n_pred^i ] # d NLL / d mu = sum_i [ - ( n_obs^i * s_ i ) / n_pred_i + s_i ] # Define relative signal strengths: n_pred = mu * nsig + model.backgrounds + theta_hat for ctr, d in enumerate(n_pred): if d == 0.0: if (model.observed[ctr] * nsig[ctr]) == 0.0: # logger.debug("zero denominator, but numerator also zero, so we set denom to 1.") n_pred[ctr] = 1e-5 else: # n_pred[ctr]=1e-5 raise Exception( f"we have a zero value in the denominator at pos {ctr}, with a non-zero numerator. dont know how to handle." ) ret = -model.observed * nsig / n_pred + nsig if type(ret) in [array, ndarray, list]: ret = sum(ret) return ret
[docs] def extendedOutput(self, extended_output : bool, default : Union[None,float] = None ): if extended_output: ret = { "muhat": default, "sigma_mu": default } ret["nll_min"] = default return ret return default
[docs] def findAvgr(self, theta_hat ): """from the difference observed - background, find got inital values for lower and upper""" model = self.model mu_c = model.observed - model.backgrounds - theta_hat mu_r, wmu_r = [], [] hessian = self.d2NLLdMu2 ( 1., theta_hat ) wtot = 0.0 for s in zip(mu_c, model.nsignal, hessian): if s[1] > 1e-16: w = 1. # 1e-5 if s[2] > 0.0: w = s[2] wtot += w r = s[0] / s[1] mu_r.append( r ) wmu_r.append(w * r ) if len(mu_r)==0: return None, None, None ret = min(mu_r), sum(wmu_r) / wtot, max(mu_r) return ret
[docs] def d2NLLdMu2 ( self, mu, theta_hat, allowZeroHessian=True ): """ the hessian of the likelihood of mu, at mu, which is the Fisher information which is approximately the inverse of the covariance :param allowZeroHessian: if false and sum(observed)==0, then replace \ observed with expected """ # nll=-nobs*ln(mu*s + b + theta) + ( mu*s + b + theta) # d nll / d mu = - nobs * s / ( mu*s + b + theta) + s # d2nll / dmu2 = nobs * s**2 / ( mu*s + b + theta )**2 model = self.model n_pred = mu * model.nsignal + model.backgrounds + theta_hat for i, s in enumerate(n_pred): if s == 0.0: # the denominator in the hessian is 0? if (model.observed[i] * model.nsignal[i]) == 0.0: # logger.debug("zero denominator, but numerator also zero, so we set denom to 1.") n_pred[i] = 1.0 else: raise Exception( f"we have a zero value in the denominator at pos {i}, with a non-zero numerator. dont know how to handle." ) obs = model.observed if sum(obs)==0 and not allowZeroHessian: obs = model.backgrounds hessian = obs * model.nsignal**2 / n_pred**2 if sum(hessian) == 0.0 and not allowZeroHessian: # if all observations are zero, we replace them by the expectations if sum(model.observed) == 0: hessian = model.nsignal**2 / n_pred return hessian
#def findMuHat(
[docs] def findMuHatViaBracketing( self, allowNegativeSignals=False, extended_output=False ): """ Find the most likely signal strength mu via a brent bracketing technique given the relative signal strengths in each dataset (signal region). :param allowNegativeSignals: if true, then also allow for negative values :param extended_output: if true, return also sigma_mu, the estimate \ of the error of mu_hat, and nll_min, the nll at mu_hat :returns: mu_hat, i.e. the maximum likelihood estimate of mu, if \ extended output is requested, it returns a dictionary with mu_hat, \ sigma_mu -- the standard deviation around mu_hat, and nll_min, \ i.e. the nll at mu_hat """ model = self.model if (model.backgrounds == model.observed).all(): return self.extendedOutput(extended_output, 0.0 ) nsig = model.nsignal if type(nsig) in [list, ndarray]: nsig = array(nsig) nsig[nsig == 0.0] = 1e-20 if sum(nsig < 0.0): raise Exception("Negative relative signal strengths!") ## we need a very rough initial guess for mu(hat), to come ## up with a first theta # self.nsig = array([0.]*len(model.observed)) self.mu = 1. ## we start with theta_hat being all zeroes # theta_hat = array([0.]*len(model.observed)) mu_hat_old, mu_hat = 0.0, 1.0 ctr = 0 widener = 3.0 while ( abs(mu_hat - mu_hat_old) > 1e-10 and abs(mu_hat - mu_hat_old) / (mu_hat + mu_hat_old) > 0.5e-2 and ctr < 20 ): theta_hat, _ = self.findThetaHat(mu_hat) ctr += 1 mu_hat_old = mu_hat minr, avgr, maxr = self.findAvgr( theta_hat ) # for i,s in enumerate ( signal_rel ): # if abs(s) < 1e-19: # mu_c[i]=0. ## find mu_hat by finding the root of 1/L dL/dmu. We know ## that the zero has to be between min(mu_c) and max(mu_c). lstarters = [ avgr - 0.2 * abs(avgr), minr, 0.0, -1.0, 1.0, 10.0, -0.1, \ 0.1, -100.0, 100.0, -1000.0, ] closestl, closestr = None, float("inf") for lower in lstarters: lower_v = self.dNLLdMu(lower, theta_hat) if lower_v < 0.0: break if lower_v < closestr: closestl, closestr = lower, lower_v if lower_v > 0.0: logger.debug( f"did not find a lower value with rootfinder(lower) < 0. Closest: f({closestl})={closestr}" ) return self.extendedOutput(extended_output, 0.0 ) ustarters = [ avgr + 0.2 * abs(avgr), maxr, 0.0, 1.0, 10.0, -1.0 - 0.1,\ 0.1, 100.0, -100.0, 1000.0, -1000.0, 0.01, -0.01, ] closestl, closestr = None, float("inf") for upper in ustarters: upper_v = self.dNLLdMu(upper, theta_hat) if upper_v > 0.0: break if upper_v < closestr: closestl, closestr = upper, upper_v if upper_v < 0.0: logger.debug("did not find an upper value with rootfinder(upper) > 0.") return self.extendedOutput(extended_output, 0.0 ) mu_hat = findRoot(self.dNLLdMu, lower, upper, args=(theta_hat, ), rtol=1e-9 ) if not allowNegativeSignals and mu_hat < 0.0: mu_hat = 0.0 theta_hat, _ = self.findThetaHat(mu_hat) self.theta_hat = theta_hat if extended_output: sigma_mu = self.getSigmaMu(mu_hat, theta_hat) nll_ = self.nll( mu_hat ) # print ( f"returning {allowNegativeSignals}: mu_hat {mu_hat}+-{sigma_mu} llhd {llhd}" ) ret = {"muhat": mu_hat, "sigma_mu": sigma_mu } ret["nll_min"] = nll_ return ret return mu_hat
[docs] def getSigmaMu(self, mu, theta_hat): """ Get an estimate for the standard deviation of mu at <mu>, from the inverse hessian """ model = self.model if not model.isLinear(): logger.debug("implemented only for linear model") # d^2 mu NLL / d mu^2 = sum_i [ n_obs^i * s_i**2 / n_pred^i**2 ] hessian = self.d2NLLdMu2 ( mu, theta_hat, allowZeroHessian = False ) hessian = sum ( hessian ) if hessian == 0.0: hessian = 1e-10 """ # if all observations are zero, we replace them by the expectations if sum(model.observed) == 0: hessian = sum(nsig**2 / n_pred) """ stderr = float(np.sqrt(1.0 / hessian)) return stderr
# Define integrand (gaussian_(bg+signal)*poisson(nobs)): # def prob(x0, x1 )
[docs] def nllOfTheta(self, theta ): """ likelihood for nuicance parameters theta, given signal strength \ self.mu. notice, by default it returns nll :param theta: nuisance parameters :param nll: if True, compute negative log likelihood """ nll = True model = self.model # theta = array ( thetaA ) # ntot = model.backgrounds + self.nsig nsig = self.mu * model.nsignal if model.isLinear(): lmbda = model.backgrounds + nsig + theta else: lmbda = nsig + model.A + theta + model.C * theta**2 / model.B**2 lmbda[lmbda <= 0.0] = 1e-30 ## turn zeroes to small values obs = model.observed def is_integer(x): if type(x) in [int, np.int64]: return True if type(x) in [float]: return x.is_integer() return False ## not needed for now allintegers = np.all([is_integer(i) for i in obs]) if nll: if allintegers: poisson = stats.poisson.logpmf(obs, lmbda) else: poisson = -lmbda + obs * np.log(lmbda) - special.loggamma(obs + 1) else: if allintegers: poisson = stats.poisson.pmf(obs, lmbda) else: # poisson = np.exp(-lmbda)*np.power(lmbda,obs)/special.gamma(obs+1) logpoiss = -lmbda + obs * np.log(lmbda) - special.loggamma(obs + 1) poisson = np.exp(logpoiss) try: M = [0.0] * len(theta) C = model.V # if model.n == 1: I think not a good idea # C = model.totalCovariance(self.nsig) dTheta = theta - M expon = -0.5 * np.dot(np.dot(dTheta, self.weight), dTheta) + self.logcoeff # print ( "expon", expon, "coeff", self.coeff ) if nll: gaussian = expon # np.log ( self.coeff ) # gaussian2 = stats.multivariate_normal.logpdf(theta,mean=M,cov=C) ret = -gaussian - sum(poisson) else: gaussian = np.exp(expon) # gaussian = self.coeff * np.exp ( expon ) # gaussian2 = stats.multivariate_normal.pdf(theta,mean=M,cov=C) ret = gaussian * (reduce(lambda x, y: x * y, poisson)) return float(ret) except ValueError as e: raise Exception( f"ValueError {e}, {model.V}" )
[docs] def dNLLdTheta(self, theta): """the derivative of nll as a function of the thetas. Makes it easier to find the maximum likelihood.""" model = self.model # print ( f"nsig {self.nsig} {model.nsignal}" ) nsig = self.mu * model.nsignal if model.isLinear(): xtot = theta + model.backgrounds + nsig xtot[xtot <= 0.0] = 1e-30 ## turn zeroes to small values nllp_ = self.ones - model.observed / xtot + np.dot(theta, self.weight) return nllp_ lmbda = nsig + model.A + theta + model.C * theta**2 / model.B**2 lmbda[lmbda <= 0.0] = 1e-30 ## turn zeroes to small values # nllp_ = ( self.ones - model.observed / lmbda + np.dot( theta , self.weight ) ) * ( self.ones + 2*model.C * theta / model.B**2 ) T = self.ones + 2 * model.C / model.B**2 * theta nllp_ = T - model.observed / lmbda * (T) + np.dot(theta, self.weight) return nllp_
[docs] def d2NLLdTheta2(self, theta): """the Hessian of nll as a function of the thetas. Makes it easier to find the maximum likelihood.""" # xtot = theta + self.ntot model = self.model nsig = self.mu * model.nsignal if model.isLinear(): xtot = theta + model.backgrounds + nsig xtot[xtot <= 0.0] = 1e-30 ## turn zeroes to small values nllh_ = self.weight + np.diag(model.observed / (xtot**2)) return nllh_ lmbda = nsig + model.A + theta + model.C * theta**2 / model.B**2 lmbda[lmbda <= 0.0] = 1e-30 ## turn zeroes to small values T = self.ones + 2 * model.C / model.B**2 * theta # T_i = 1_i + 2*c_i/B_i**2 * theta_i nllh_ = ( self.weight + np.diag(model.observed * T**2 / (lmbda**2)) - np.diag(model.observed / lmbda * 2 * model.C / model.B**2) + np.diag(2 * model.C / model.B**2) ) return nllh_
[docs] def getThetaHat(self, nobs, nb, mu, covb, max_iterations): """ Compute nuisance parameter theta that maximizes our likelihood \ (poisson*gauss) -- by setting dNLL/dTheta to zero :param mu: signal strength :returns: theta_hat """ # ntot = mu * nsig + nb # nll = - nobs ln(ntot + theta) - ntot - theta - theta**2/(2 delta**2) # dnll/dtheta = - nobs / ( ntot + theta ) + 1 + theta / delta**2 # theta**2 + theta * ( delta**2 + ntot ) + delta**2 * ( ntot-nobs) = 0 model = self.model nsig = mu * model.nsignal self.mu = mu sigma2 = covb + model.var_s(nsig) ## np.diag ( (model.deltas)**2 ) ## for now deal with variances only ntot = nb + nsig cov = np.array(sigma2) # weight = cov**(-1) ## weight matrix # weight = linalg.inv(cov) diag_cov = np.diag(cov) # first: no covariances: q = diag_cov * (ntot - nobs) p = ntot + diag_cov thetamaxes = [] # thetamax = -p / 2.0 * (1 - sign(p) * sqrt(1.0 - 4 * q / p**2)) thetamax = -p / 2.0 + sign(p) * sqrt( p**2/4 - q) thetamaxes.append(thetamax) ndims = len(p) def distance(theta1, theta2): for ctr, i in enumerate(theta1): if i == 0.0: theta1[ctr] = 1e-20 for ctr, i in enumerate(theta2): if i == 0.0: theta2[ctr] = 1e-20 return sum(np.abs(theta1 - theta2) / np.abs(theta1 + theta2)) ictr = 0 while ictr < max_iterations: ictr += 1 q = diag_cov * (ntot - nobs) p = ntot + diag_cov for i in range(ndims): # q[i] = diag_cov[i] * ( ntot[i] - nobs[i] ) # p[i] = ntot[i] + diag_cov[i] for j in range(ndims): if i == j: continue dq = thetamax[j] * ntot[i] * diag_cov[i] * self.weight[i, j] dp = thetamax[j] * self.weight[i, j] * diag_cov[i] if abs(dq / q[i]) > 0.3: # logger.warning ( "too big a change in iteration." ) dq = np.abs(0.3 * q[i]) * np.sign(dq) if abs(dp / p[i]) > 0.3: # logger.warning ( "too big a change in iteration." ) dp = np.abs(0.3 * p[i]) * np.sign(dp) q[i] += dq p[i] += dp # thetamax = -p / 2.0 * (1 - sign(p) * sqrt(1.0 - 4 * q / p**2)) thetamax = -p / 2.0 + sign(p) * sqrt( p**2/4 - q) thetamaxes.append(thetamax) if len(thetamaxes) > 2: d1 = distance(thetamaxes[-2], thetamax) d2 = distance(thetamaxes[-3], thetamaxes[-2]) if d1 > d2: raise Exception( f"diverging when computing thetamax: {d1} > {d2}" ) if d1 < 1e-5: return thetamax return thetamax
[docs] def findThetaHat(self, mu : float ): """Compute nuisance parameters theta that maximize our likelihood (poisson*gauss). """ if isinstance(mu,(np.ndarray,list)): mu = mu[0] if mu == None or np.isinf ( mu ): return None mu = float(mu) model = self.model nsig = mu * model.nsignal ## first step is to disregard the covariances and solve the ## quadratic equations ini = self.getThetaHat( model.observed, model.backgrounds, mu, model.covariance, 0) self.cov_tot = model.V # if model.n == 1: # self.cov_tot = model.totalCovariance ( nsig ) # if not model.isLinear(): # self.cov_tot = model.V + model.var_s(nsig) # self.cov_tot = model.totalCovariance (nsig) self.weight = model.weight # np.linalg.inv(self.cov_tot) # self.coeff = 1. ## to catch slogdet warnings on Mac, numpy 2.3.2 ## added by WW&SK, 14/08/2025 with warnings.catch_warnings(): warnings.filterwarnings( "ignore", message="divide by zero", # pattern 1 category=RuntimeWarning ) warnings.filterwarnings( "ignore", message="invalid value encountered", # pattern 2 category=RuntimeWarning ) warnings.filterwarnings( "ignore", message="overflow encountered", # pattern 3 category=RuntimeWarning ) logdet = np.linalg.slogdet(self.cov_tot) self.logcoeff = -model.n / 2 * np.log(2 * np.pi) - 0.5 * logdet[1] # self.coeff = (2*np.pi)**(-model.n/2) * np.exp(-.5* logdet[1] ) # print ( "coeff", self.coeff, "n", model.n, "det", np.linalg.slogdet ( self.cov_tot ) ) # print ( "cov_tot", self.cov_tot[:10] ) self.ones = 1.0 if type(model.observed) in [list, ndarray]: self.ones = np.ones(len(model.observed)) self.gammaln = special.gammaln(model.observed + 1) try: ret_c = optimize.fmin_ncg( self.nllOfTheta, ini, fprime=self.dNLLdTheta, fhess=self.d2NLLdTheta2, full_output=True, disp=0, ) # then always continue with TNC if type(model.observed) in [int, float]: bounds = [(-10 * model.observed, 10 * model.observed)] else: bounds = [(-10 * x, 10 * x) for x in model.observed] ini = ret_c ret_c = optimize.fmin_tnc( self.nllOfTheta, ret_c[0], fprime=self.dNLLdTheta, disp=0, bounds=bounds ) if ret_c[-1] not in [0, 1, 2]: return ret_c[0], ret_c[-1] else: return ret_c[0], 0 logger.debug("tnc worked.") ret = ret_c[0] return ret, -2 except (IndexError, ValueError) as e: logger.error( f"exception: {e}. ini[-3:]={ini[-3:]}" ) raise Exception( f"cov-1={model.covariance + model.var_s(nsig)**(-1)}") return ini, -1
[docs] def nll(self, mu : float, evaluationType : NllEvalType=observed, asimov : Union[None,float] = None ): """compute the profiled likelihood for mu. :param mu: float Parameter of interest, signal strength :param asimov: if :returns: profile likelihood and error code (0=no error) """ if evaluationType != observed: self.transform ( evaluationType ) if asimov != None: assert abs(asimov)<1e-20, "we currently treat asimov data only with mu=0." if self.asimovComputer == None: self.asimovComputer = self.generateAsimovComputer(asimov) return self.nll(mu, asimov=None) # compute the profiled (not normalized) likelihood of observing # nsig signal events theta_hat, _ = self.findThetaHat(mu) if self.debug_mode: self.theta_hat = theta_hat ret = self.nllOfTheta( theta_hat ) return ret
""" def likelihood(self, mu : float, return_nll : bool = False, evaluationType : NllEvalType=observed, asimov : Union[None,float] = None ): # legacy, should slowly phase out nll = self.nll ( mu, evaluationType, asimov ) return exponentiateNLL ( nll, doIt = not return_nll ) """
[docs] def nll_min( self, evaluationType : NllEvalType = observed, allowNegativeSignals : bool = False ) -> dict: """ nll_min :param allowNegativeSignals: if False, then negative nsigs are replaced with 0. :returns: dictionary with muhat, sigma_mu and nll_min as keys """ model = self.model if len(model.observed) == 1: dn = model.observed - model.backgrounds if not allowNegativeSignals and dn[0] < 0.0: dn = [0.0] muhat = float(dn[0]) if abs(model.nsignal) > 1e-100: muhat = float(dn[0] / model.nsignal[0]) #sigma_mu2 = np.sqrt(model.observed[0] / model.nsignal[0] + model.covariance[0][0] ) theta_hat = self.findThetaHat( muhat ) sigma_mu = self.getSigmaMu ( muhat, theta_hat[0] ) nll_min = self.nll( mu = muhat ) # print ( "sigma_mu", sigma_mu, "old", sigma_mu2 ) ret = { "muhat": muhat, "sigma_mu": sigma_mu, "nll_min": nll_min } return ret fmh = self.findMuHat( allowNegativeSignals=allowNegativeSignals, extended_output=True ) return fmh
[docs] def findMuHat( #def findMuHatViaGradientDescent( self, allowNegativeSignals : bool = False, extended_output : bool = False, ): """ Find the most likely signal strength mu via gradient descent given the relative signal strengths in each dataset (signal region). :param allowNegativeSignals: if true, then also allow for negative values :param extended_output: if true, return also sigma_mu, the estimate of the error of mu_hat, and nll_min, the nll at mu_hat :returns: mu_hat, i.e. the maximum likelihood estimate of mu, if extended output is requested, it returns mu_hat, sigma_mu \ -- the standard deviation around mu_hat, and llhd, the likelihood at mu_hat """ theta_hat, _ = self.findThetaHat( 0. ) minr, avgr, maxr = self.findAvgr( theta_hat ) theta_hat, _ = self.findThetaHat( avgr ) minr, avgr, maxr = self.findAvgr( theta_hat ) def myllhd( mu ): if isinstance(mu,(np.ndarray,list)): mu = mu[0] theta = self.findThetaHat ( mu=float(mu) ) ret = self.nll( mu = mu ) return ret import scipy.optimize ominr = minr if minr > 0.: minr = .5 * minr if minr < 0.: minr = 2.*minr if maxr > 0.: maxr = 3.*maxr + 1e-5 if maxr <= 0.: maxr = .3 * maxr + 1e-5 bounds = [(minr,maxr)] if not allowNegativeSignals: bounds = [(0, max(maxr,1e-5))] assert bounds[0][1] > bounds[0][0], f"bounds are in wrong order: {bounds}" o = scipy.optimize.minimize( myllhd, x0=avgr, bounds=bounds, jac = self.dNLLdMu ) nll = o.fun mu_hat = float(o.x[0]) if extended_output: sigma_mu = self.getSigmaMu ( mu_hat, theta_hat ) nll = self.nll( mu_hat ) # llhd = exponentiateNLL ( nll, doIt = not return_nll ) # sigma_mu = float(np.sqrt(hess[0][0])) ret = {"muhat": mu_hat, "sigma_mu": sigma_mu } ret["nll_min"] = nll return ret return mu_hat
[docs] def generateAsimovComputer ( self, mu : float = 0. ): """ given the current computer with its model, generate a model with asimov data for a given mu. :param mu: signal strength to create Asimov data for. :returns: LikelihoodComputer, with theta_hat as data member """ theta_hat, _ = self.findThetaHat( mu ) # sigma_mu = self.getSigmaMu(mu_hat, theta_hat0) # nll0 = computer.likelihood( mu_hat, return_nll=True) aModel = self.model.generateAsimovData( theta_hat, mu ) newComputer = LikelihoodComputer(aModel ) newComputer.theta_hat = theta_hat return newComputer
[docs] def chi2(self ): """ Computes the chi2 for a given number of observed events nobs given the predicted background nb, error on this background deltab, evaluationType number of signal events nsig and the relative error on signal (deltas_rel). :param nsig: number of signal events :return: chi2 (float) """ model = self.model # Compute the likelhood for the null hypothesis (signal hypothesis) H0: nll_ = self.nll(1.) # Compute the maximum likelihood H1, which sits at nsig = nobs - nb # (keeping the same % error on signal): if len(model.observed) == 1: # TODO this nsig initiation seems wrong and changing maxllhd to likelihood # fails ./testStatistics.py : zero division error in L115 mu_hat = ( model.observed - model.backgrounds ) / model.nsignal minnll = self.nll (mu_hat ) else: minnll = self.nll_min( allowNegativeSignals=False ) chi2 = 2 * ( nll_ - minnll) if not np.isfinite(chi2): logger.error( f"chi2 is not a finite number! {chi2},{llhd},{maxllhd}") # Return the test statistic -2log(H0/H1) return chi2
[docs]class UpperLimitComputer: debug_mode = False def __init__(self, likelihoodComputer, cl: float = 0.95): """ :param cl: desired quantile for limits """ self.likelihoodComputer = likelihoodComputer self.cl = cl self.name = likelihoodComputer.model.name
[docs] def nll_min(self,**kwargs) -> float: return self.likelihoodComputer.nll_min ( **kwargs )
[docs] def nll( self, poi_test : float, evaluationType : NllEvalType, **kwargs) -> float: return self.likelihoodComputer.nll ( poi_test, evaluationType, **kwargs )
[docs] def getTotalXSec ( self ): model = self.likelihoodComputer.model return sum ( model.nsignal) / model.lumi
[docs] def getCLsRootFunc( self, evaluationType: NllEvalType=observed, trylasttime: Optional[bool] = False, nSigma : int = 0 ) -> Tuple: """ Obtain the function "CLs-alpha[0.05]" whose root defines the upper limit, plus mu_hat and sigma_mu :param model: statistical model :param evaluationType: one of: observed, apriori, aposteriori :param trylasttime: if True, then dont try extra :param nSigma: the upper limit for central value (0), + 1 sigma, - 1 sigma, etc. For error bands. :return: mu_hat, sigma_mu, CLs-alpha """ model = self.likelihoodComputer.model if model.zeroSignal(): """only zeroes in efficiencies? cannot give a limit!""" return None, None, None oldmodel = model if evaluationType in [ apriori, aposteriori ]: model = copy.deepcopy(oldmodel) model.observed = copy.deepcopy ( model.backgrounds ) if evaluationType == aposteriori: tempc = LikelihoodComputer(oldmodel ) theta_hat_, _ = tempc.findThetaHat(0 ) if model.isLinear(): model.observed = model.backgrounds + theta_hat_ else: model.observed = model.A + theta_hat_ + model.C * theta_hat_**2 / model.B**2 #for i, d in enumerate(model.backgrounds): # d += theta_hat_[i] ### FIXME! computer = LikelihoodComputer(model ) mu_hat = computer.findMuHat( allowNegativeSignals=False, extended_output=False) compA = computer.generateAsimovComputer ( 0. ) sigma_mu = computer.getSigmaMu(mu_hat, compA.theta_hat) nll0 = computer.nll( mu_hat ) theta_hat0 = compA.theta_hat # theta_hat0, _ = computer.findThetaHat( 0. ) #aModel = model.generateAsimovData( theta_hat0, 0. ) # compA = LikelihoodComputer(aModel ) ## compute mu_hatA = compA.findMuHat() # TODO convert rel_signals to signals nll0A = compA.nll( mu=mu_hatA ) # return 1. def clsRoot(mu: float, return_type: Text = "CLs-alpha") -> float: """ Calculate the root :param mu: float POI :param return_type: (Text) can be "CLs-alpha", "1-CLs", "CLs" CLs-alpha: returns CLs - 0.05 1-CLs: returns 1-CLs value CLs: returns CLs value """ nll = computer.nll(mu ) nllA = compA.nll(mu ) return CLsfromNLL(nllA, nll0A, nll, nll0, (mu_hat>mu), return_type=return_type, nSigma = nSigma ) return mu_hat, sigma_mu, clsRoot
[docs] def getUpperLimitOnMu( self, evaluationType : NllEvalType=observed, trylasttime : bool =False, nSigma : int = 0 ) -> Union[None,float]: """upper limit on the signal strength multiplier mu obtained from the defined Data (using the signal prediction for each signal regio/dataset), by using the q_mu test statistic from the CCGV paper (arXiv:1007.1727). :param evaluationType: one of: observed, apriori, aposteriori. :param trylasttime: if True, then dont try extra. :param nSigma: the upper limit for central value (0), + 1 sigma, - 1 sigma, etc. For error bands. :returns: upper limit on the signal strength multiplier mu """ mu_hat, sigma_mu, clsRoot = self.getCLsRootFunc( evaluationType = evaluationType, trylasttime = trylasttime, nSigma = nSigma ) if mu_hat == None: return None try: a, b = determineBrentBracket( mu_hat, sigma_mu, clsRoot, allowNegative=False ) except SModelSError as e: return None mu_lim = findRoot(clsRoot, a, b, rtol=1e-03, xtol=1e-06 ) logger.debug ( f"muhat={mu_hat}+-{sigma_mu} a,b={a,b} mu_lim={mu_lim}" ) return mu_lim
[docs] def CLs( self, mu : float = 1.0, evaluationType: NllEvalType=observed, trylasttime: bool = False, return_type: Text = "CLs", ) -> float: """ Compute the exclusion confidence level of the model (1-CLs). :param model: statistical model :param evaluationType: one of: observed, apriori, aposteriori :param trylasttime: if True, then dont try extra :param return_type: (Text) can be "CLs-alpha", "1-CLs", "CLs" CLs-alpha: returns CLs - 0.05 (alpha) 1-CLs: returns 1-CLs value CLs: returns CLs value """ _, _, clsRoot = self.getCLsRootFunc( evaluationType, trylasttime ) ret = clsRoot(mu, return_type=return_type) # its not an uppser limit on mu, its on nsig # print ( f"@@SL0 been asked to compute CLs mu={mu} asimov={self.likelihoodComputer.model.asimov}: {ret}" ) return ret
if __name__ == "__main__": C = [ 18774.2, -2866.97, -5807.3, -4460.52, -2777.25, -1572.97, -846.653, -442.531, -2866.97, 496.273, 900.195, 667.591, 403.92, 222.614, 116.779, 59.5958, -5807.3, 900.195, 1799.56, 1376.77, 854.448, 482.435, 258.92, 134.975, -4460.52, 667.591, 1376.77, 1063.03, 664.527, 377.714, 203.967, 106.926, -2777.25, 403.92, 854.448, 664.527, 417.837, 238.76, 129.55, 68.2075, -1572.97, 222.614, 482.435, 377.714, 238.76, 137.151, 74.7665, 39.5247, -846.653, 116.779, 258.92, 203.967, 129.55, 74.7665, 40.9423, 21.7285, -442.531, 59.5958, 134.975, 106.926, 68.2075, 39.5247, 21.7285, 11.5732, ] nsignal = [x / 100.0 for x in [47, 29.4, 21.1, 14.3, 9.4, 7.1, 4.7, 4.3]] m = Data( observed=[1964, 877, 354, 182, 82, 36, 15, 11], backgrounds=[2006.4, 836.4, 350.0, 147.1, 62.0, 26.2, 11.1, 4.7], covariance=C, # third_moment = [ 0.1, 0.02, 0.1, 0.1, 0.003, 0.0001, 0.0002, 0.0005 ], third_moment=[0.0] * 8, nsignal=nsignal, name="CMS-NOTE-2017-001 model", ) ulComp = UpperLimitComputer( LikelihoodComputer(m), cl=0.95) ul = ulComp.getUpperLimitOnMu( ) cls = ulComp.CLs( 1. ) print("ul (profiled)", ul) print("CLs (profiled)", cls) """ results: old ul= 180.999844 ul (profiled) 180.68039063387553 CLs (profiled) 0.75 """