#!/usr/bin/env python3
"""
.. module:: simplifiedLikelihoods
:synopsis: Code that implements the simplified likelihoods as presented
in CMS-NOTE-2017-001, see https://cds.cern.ch/record/2242860.
Simplified likelihoods v2 (arXiv:1809.05548) are partly implemented.
.. moduleauthor:: Wolfgang Waltenberger <wolfgang.waltenberger@gmail.com>
"""
from scipy import stats, optimize, integrate, special, linalg
from numpy import sqrt, exp, log, sign, array, ndarray
from functools import reduce
from smodels.base.physicsUnits import UnitXSec
from smodels.statistics.basicStats import CLsfromNLL, determineBrentBracket
from smodels.statistics.exceptions import SModelSStatisticsError as SModelSError
from smodels.statistics.basicStats import observed, apriori, aposteriori, NllEvalType
from typing import Text, Optional, Union, Tuple
from smodels.statistics.basicStats import findRoot
from smodels.tools.caching import roundCache, lru_cache
import numpy as np
import math
import copy
import warnings
try:
from smodels.base.smodelsLogging import logger
except ModuleNotFoundError:
def getLogger():
"""
Configure the logging facility. Maybe adapted to fit into
your framework.
"""
import logging
logger = logging.getLogger("SL")
formatter = logging.Formatter("%(module)s - %(levelname)s: %(message)s")
ch = logging.StreamHandler()
ch.setFormatter(formatter)
# ch.setLevel(logging.DEBUG)
logger.addHandler(ch)
return logger
logger = getLogger()
[docs]class Data:
"""A very simple observed container to collect all the data
needed to fully define a specific statistical model"""
def __init__(
self,
observed,
backgrounds,
covariance,
third_moment=None,
nsignal=None,
name : str = "model",
deltas_rel : float = 0.2,
lumi=None,
asimov : Union[None,float] = None
):
"""
:param observed: number of observed events per dataset
:param backgrounds: evaluationType bg per dataset
:param covariance: uncertainty in background, as a covariance matrix
:param nsignal: number of signal events in each dataset
:param name: give the model a name, just for convenience
:param deltas_rel: the assumed relative error on the signal hypotheses.
The default is 20%.
:param lumi: luminosity of dataset in 1/fb, or None
:param asimov: None if not asimov data, else signal strength of asimov data
"""
self.observed = self.convert(observed) # Make sure observed number of events are integers
## self.observed = np.around(self.convert(observed)) #Make sure observed number of events are integers
self.backgrounds = self.convert(backgrounds)
self.n = len(self.observed)
self.covariance = self._convertCov(covariance)
self.nsignal = self.convert(nsignal)
self.asimov = asimov
self.lumi = lumi
if self.nsignal is None:
if len(self.backgrounds) == 1:
# doesnt matter, does it?
self.nsignal = np.array([1.0])
self.signal_rel = self.convert(1.0)
elif self.nsignal.sum():
self.signal_rel = self.nsignal / self.nsignal.sum()
else:
self.signal_rel = array([0.0] * len(self.nsignal))
self.third_moment = self.convert(third_moment)
if (
type(self.third_moment) != type(None)
and np.sum([abs(x) for x in self.third_moment]) < 1e-10
):
self.third_moment = None
self.name = name
self.deltas_rel = deltas_rel
self._computeABC()
self.weight = np.linalg.inv(self.V )
[docs] def generateAsimovData ( self, theta_hat : list, mu : float = 0. ):
""" generate a model with Asimov data out of the current model
:param theta_hat: the profiled nuisance parameters
:returns: Data object
"""
newModel = copy.deepcopy(self)
if newModel.isLinear():
newModel.observed = array([mu*s + x + y for s, x, y in zip(self.nsignal, self.backgrounds, theta_hat)]) # old
else:
newModel.observed = mu*self.nsignal + self.A + theta_hat + self.C * theta_hat**2 / self.B**2
newModel.name = self.name + "A"
newModel.asimov = mu
return newModel
[docs] def totalCovariance(self, nsig : list ):
"""get the total covariance matrix, taking into account
also signal uncertainty for the signal hypothesis <nsig>.
If nsig is None, the predefined signal hypothesis is taken.
"""
if self.isLinear():
cov_tot = self.V + self.var_s(nsig)
else:
cov_tot = self.covariance + self.var_s(nsig)
return cov_tot
[docs] def zeroSignal(self):
"""
Is the total number of signal events zero?
"""
if self.nsignal is None:
return True
return len(self.nsignal[self.nsignal > 0.0]) == 0
[docs] def var_s(self, nsig : Union[None,list] = None):
"""
The signal variances. Convenience function.
:param nsig: If None, it will use the model evaluationType number of signal events,
otherwise will return the variances for the input value using the relative
signal uncertainty defined for the model.
"""
if nsig is None:
nsig = self.nsignal
else:
nsig = self.convert(nsig)
return np.diag((nsig * self.deltas_rel) ** 2)
[docs] def isScalar(self, obj):
"""
Determine if obj is a scalar (float or int)
"""
if isinstance(obj, ndarray):
## need to treat separately since casting array([0.]) to float works
return False
try:
_ = float(obj)
return True
except (ValueError, TypeError):
pass
return False
[docs] def convert(self, obj):
"""
Convert object to numpy arrays.
If object is a float or int, it is converted to a one element
array.
"""
if type(obj) == type(None):
return obj
if self.isScalar(obj):
return array([obj])
return array(obj)
def __str__(self):
return self.name + f" ({self.n} dims)"
def _convertCov(self, obj):
if self.isScalar(obj):
return array([[obj]])
if isinstance(obj[0], list):
return array(obj)
if isinstance(obj[0], float):
## if the matrix is flattened, unflatten it.
return array([obj[self.n * i : self.n * (i + 1)] for i in range(self.n)])
return obj
def _computeABC(self):
"""
Compute the terms A, B, C, rho, V. Corresponds with
Eqs. 1.27-1.30 in arXiv:1809.05548
"""
self.V = self.covariance
if self.third_moment is None:
self.A = None
self.B = None
self.C = None
return
covD = self.diagCov()
C = []
for m2, m3 in zip(covD, self.third_moment):
if m3 == 0.0:
m3 = 1e-30
k = -np.sign(m3) * sqrt(2.0 * m2)
dm = sqrt(8.0 * m2**3 / m3**2 - 1.0)
C.append(k * np.cos(4.0 * np.pi / 3.0 + np.arctan(dm) / 3.0))
self.C = np.array(C) ## C, as define in Eq. 1.27 (?) in the second paper
self.B = sqrt(covD - 2 * self.C**2) ## B, as defined in Eq. 1.28(?)
self.A = self.backgrounds - self.C ## A, Eq. 1.30(?)
self.rho = np.array([[0.0] * self.n] * self.n) ## Eq. 1.29 (?)
for x in range(self.n):
for y in range(x, self.n):
bxby = self.B[x] * self.B[y]
cxcy = self.C[x] * self.C[y]
e = (4.0 * cxcy) ** (-1) * (
sqrt(bxby**2 + 8 * cxcy * self.covariance[x][y]) - bxby
)
self.rho[x][y] = e
self.rho[y][x] = e
self.sandwich()
# self.V = sandwich ( self.B, self.rho )
[docs] def sandwich(self):
"""
Sandwich product
"""
ret = np.array([[0.0] * len(self.B)] * len(self.B))
for x in range(len(self.B)):
for y in range(x, len(self.B)):
T = self.B[x] * self.B[y] * self.rho[x][y]
ret[x][y] = T
ret[y][x] = T
self.V = ret
[docs] def isLinear(self):
"""
Statistical model is linear, i.e. no quadratic term in poissonians
"""
return type(self.C) == type(None)
[docs] def diagCov(self):
"""
Diagonal elements of covariance matrix. Convenience function.
"""
return np.diag(self.covariance)
[docs] def correlations(self):
"""
Correlation matrix, computed from covariance matrix.
Convenience function.
"""
if hasattr(self, "corr"):
return self.corr
self.corr = copy.deepcopy(self.covariance)
for x in range(self.n):
self.corr[x][x] = 1.0
for y in range(x + 1, self.n):
rho = self.corr[x][y] / sqrt(self.covariance[x][x] * self.covariance[y][y])
self.corr[x][y] = rho
self.corr[y][x] = rho
return self.corr
[docs] def rel_signals(self, mu):
"""
Returns the number of evaluationType relative signal events, for all datasets,
given total signal strength mu. For mu=1, the sum of the numbers = 1.
:param mu: Total number of signal events summed over all datasets.
"""
return mu * self.signal_rel
[docs] def nsignals(self, mu):
"""
Returns the number of evaluationType signal events, for all datasets,
given total signal strength mu.
:param mu: Total number of signal events summed over all datasets.
"""
return mu * self.nsignal
[docs]class LikelihoodComputer:
debug_mode = False
def __init__(self, data ):
"""
:param data: a Data object.
"""
self.origModel = copy.deepcopy ( data )
self.model = data
self.asimovComputer = None
[docs] def dNLLdMu(self, mu, theta_hat = None ):
"""
d (- ln L)/d mu, if L is the likelihood. The function
whose root gives us muhat, i.e. the mu that maximizes
the likelihood.
:param mu: total number of signal events
:param theta_hat: array with nuisance parameters, if None then
compute them
"""
if type(mu) in [ list, np.ndarray ] and len(mu)==1:
mu = float(mu[0])
if theta_hat == None:
theta_hat, _ = self.findThetaHat ( mu )
model = self.model
nsig = model.nsignal
if not model.isLinear():
logger.debug("implemented only for linear model")
# n_pred^i := mu s_i + b_i + theta_i
# NLL = sum_i [ - n_obs^i * ln ( n_pred^i ) + n_pred^i ]
# d NLL / d mu = sum_i [ - ( n_obs^i * s_ i ) / n_pred_i + s_i ]
# Define relative signal strengths:
n_pred = mu * nsig + model.backgrounds + theta_hat
for ctr, d in enumerate(n_pred):
if d == 0.0:
if (model.observed[ctr] * nsig[ctr]) == 0.0:
# logger.debug("zero denominator, but numerator also zero, so we set denom to 1.")
n_pred[ctr] = 1e-5
else:
# n_pred[ctr]=1e-5
raise Exception(
f"we have a zero value in the denominator at pos {ctr}, with a non-zero numerator. dont know how to handle."
)
ret = -model.observed * nsig / n_pred + nsig
if type(ret) in [array, ndarray, list]:
ret = sum(ret)
return ret
[docs] def extendedOutput(self, extended_output : bool,
default : Union[None,float] = None, return_nll : bool = False ):
if extended_output:
ret = { "muhat": default, "sigma_mu": default }
if return_nll:
ret["nll_min"] = default
else:
ret["lmax"] = default
return ret
return default
[docs] def findAvgr(self, theta_hat ):
"""from the difference observed - background, find got inital
values for lower and upper"""
model = self.model
mu_c = model.observed - model.backgrounds - theta_hat
mu_r, wmu_r = [], []
hessian = self.d2NLLdMu2 ( 1., theta_hat )
wtot = 0.0
for s in zip(mu_c, model.nsignal, hessian):
if s[1] > 1e-16:
w = 1. # 1e-5
if s[2] > 0.0:
w = s[2]
wtot += w
r = s[0] / s[1]
mu_r.append( r )
wmu_r.append(w * r )
if len(mu_r)==0:
return None, None, None
ret = min(mu_r), sum(wmu_r) / wtot, max(mu_r)
return ret
[docs] def d2NLLdMu2 ( self, mu, theta_hat, allowZeroHessian=True ):
""" the hessian of the likelihood of mu, at mu,
which is the Fisher information
which is approximately the inverse of the covariance
:param allowZeroHessian: if false and sum(observed)==0, then replace \
observed with expected
"""
# nll=-nobs*ln(mu*s + b + theta) + ( mu*s + b + theta)
# d nll / d mu = - nobs * s / ( mu*s + b + theta) + s
# d2nll / dmu2 = nobs * s**2 / ( mu*s + b + theta )**2
model = self.model
n_pred = mu * model.nsignal + model.backgrounds + theta_hat
for i, s in enumerate(n_pred):
if s == 0.0: # the denominator in the hessian is 0?
if (model.observed[i] * model.nsignal[i]) == 0.0:
# logger.debug("zero denominator, but numerator also zero, so we set denom to 1.")
n_pred[i] = 1.0
else:
raise Exception( f"we have a zero value in the denominator at pos {i}, with a non-zero numerator. dont know how to handle." )
obs = model.observed
if sum(obs)==0 and not allowZeroHessian:
obs = model.backgrounds
hessian = obs * model.nsignal**2 / n_pred**2
if sum(hessian) == 0.0 and not allowZeroHessian:
# if all observations are zero, we replace them by the expectations
if sum(model.observed) == 0:
hessian = model.nsignal**2 / n_pred
return hessian
#def findMuHat(
[docs] def findMuHatViaBracketing( self, allowNegativeSignals=False,
extended_output=False, return_nll=False ):
"""
Find the most likely signal strength mu via a brent bracketing technique
given the relative signal strengths in each dataset (signal region).
:param allowNegativeSignals: if true, then also allow for negative values
:param extended_output: if true, return also sigma_mu, the estimate \
of the error of mu_hat, and lmax, the likelihood at mu_hat
:param return_nll: if true, return nll instead of lmax in the
extended output
:returns: mu_hat, i.e. the maximum likelihood estimate of mu, if \
extended output is requested, it returns a dictionary with mu_hat, \
sigma_mu -- the standard deviation around mu_hat, and lmax, \
i.e. the likelihood at mu_hat
"""
model = self.model
if (model.backgrounds == model.observed).all():
return self.extendedOutput(extended_output, 0.0, return_nll)
nsig = model.nsignal
if type(nsig) in [list, ndarray]:
nsig = array(nsig)
nsig[nsig == 0.0] = 1e-20
if sum(nsig < 0.0):
raise Exception("Negative relative signal strengths!")
## we need a very rough initial guess for mu(hat), to come
## up with a first theta
# self.nsig = array([0.]*len(model.observed))
self.mu = 1.
## we start with theta_hat being all zeroes
# theta_hat = array([0.]*len(model.observed))
mu_hat_old, mu_hat = 0.0, 1.0
ctr = 0
widener = 3.0
while (
abs(mu_hat - mu_hat_old) > 1e-10
and abs(mu_hat - mu_hat_old) / (mu_hat + mu_hat_old) > 0.5e-2
and ctr < 20
):
theta_hat, _ = self.findThetaHat(mu_hat)
ctr += 1
mu_hat_old = mu_hat
minr, avgr, maxr = self.findAvgr( theta_hat )
# for i,s in enumerate ( signal_rel ):
# if abs(s) < 1e-19:
# mu_c[i]=0.
## find mu_hat by finding the root of 1/L dL/dmu. We know
## that the zero has to be between min(mu_c) and max(mu_c).
lstarters = [ avgr - 0.2 * abs(avgr), minr, 0.0, -1.0, 1.0, 10.0, -0.1, \
0.1, -100.0, 100.0, -1000.0, ]
closestl, closestr = None, float("inf")
for lower in lstarters:
lower_v = self.dNLLdMu(lower, theta_hat)
if lower_v < 0.0:
break
if lower_v < closestr:
closestl, closestr = lower, lower_v
if lower_v > 0.0:
logger.debug(
f"did not find a lower value with rootfinder(lower) < 0. Closest: f({closestl})={closestr}"
)
return self.extendedOutput(extended_output, 0.0, return_nll )
ustarters = [ avgr + 0.2 * abs(avgr), maxr, 0.0, 1.0, 10.0, -1.0 - 0.1,\
0.1, 100.0, -100.0, 1000.0, -1000.0, 0.01, -0.01, ]
closestl, closestr = None, float("inf")
for upper in ustarters:
upper_v = self.dNLLdMu(upper, theta_hat)
if upper_v > 0.0:
break
if upper_v < closestr:
closestl, closestr = upper, upper_v
if upper_v < 0.0:
logger.debug("did not find an upper value with rootfinder(upper) > 0.")
return self.extendedOutput(extended_output, 0.0, return_nll )
mu_hat = findRoot(self.dNLLdMu, lower, upper, args=(theta_hat, ), rtol=1e-9 )
if not allowNegativeSignals and mu_hat < 0.0:
mu_hat = 0.0
theta_hat, _ = self.findThetaHat(mu_hat)
self.theta_hat = theta_hat
if extended_output:
sigma_mu = self.getSigmaMu(mu_hat, theta_hat)
llhd = self.likelihood( mu_hat, return_nll=return_nll)
# print ( f"returning {allowNegativeSignals}: mu_hat {mu_hat}+-{sigma_mu} llhd {llhd}" )
ret = {"muhat": mu_hat, "sigma_mu": sigma_mu }
if nll:
ret["nll_min"] = llhd
else:
ret["lmax"] = llhd
return ret
return mu_hat
[docs] def getSigmaMu(self, mu, theta_hat):
"""
Get an estimate for the standard deviation of mu at <mu>, from
the inverse hessian
"""
model = self.model
if not model.isLinear():
logger.debug("implemented only for linear model")
# d^2 mu NLL / d mu^2 = sum_i [ n_obs^i * s_i**2 / n_pred^i**2 ]
hessian = self.d2NLLdMu2 ( mu, theta_hat, allowZeroHessian = False )
hessian = sum ( hessian )
if hessian == 0.0:
hessian = 1e-10
"""
# if all observations are zero, we replace them by the expectations
if sum(model.observed) == 0:
hessian = sum(nsig**2 / n_pred)
"""
stderr = float(np.sqrt(1.0 / hessian))
return stderr
# Define integrand (gaussian_(bg+signal)*poisson(nobs)):
# def prob(x0, x1 )
[docs] def llhdOfTheta(self, theta, nll = True ):
""" likelihood for nuicance parameters theta, given signal strength \
self.mu. notice, by default it returns nll
:param theta: nuisance parameters
:param nll: if True, compute negative log likelihood
"""
model = self.model
# theta = array ( thetaA )
# ntot = model.backgrounds + self.nsig
nsig = self.mu * model.nsignal
if model.isLinear():
lmbda = model.backgrounds + nsig + theta
else:
lmbda = nsig + model.A + theta + model.C * theta**2 / model.B**2
lmbda[lmbda <= 0.0] = 1e-30 ## turn zeroes to small values
obs = model.observed
def is_integer(x):
if type(x) in [int, np.int64]:
return True
if type(x) in [float]:
return x.is_integer()
return False
## not needed for now
allintegers = np.all([is_integer(i) for i in obs])
if nll:
if allintegers:
poisson = stats.poisson.logpmf(obs, lmbda)
else:
poisson = -lmbda + obs * np.log(lmbda) - special.loggamma(obs + 1)
else:
if allintegers:
poisson = stats.poisson.pmf(obs, lmbda)
else:
# poisson = np.exp(-lmbda)*np.power(lmbda,obs)/special.gamma(obs+1)
logpoiss = -lmbda + obs * np.log(lmbda) - special.loggamma(obs + 1)
poisson = np.exp(logpoiss)
try:
M = [0.0] * len(theta)
C = model.V
# if model.n == 1: I think not a good idea
# C = model.totalCovariance(self.nsig)
dTheta = theta - M
expon = -0.5 * np.dot(np.dot(dTheta, self.weight), dTheta) + self.logcoeff
# print ( "expon", expon, "coeff", self.coeff )
if nll:
gaussian = expon # np.log ( self.coeff )
# gaussian2 = stats.multivariate_normal.logpdf(theta,mean=M,cov=C)
ret = -gaussian - sum(poisson)
else:
gaussian = np.exp(expon)
# gaussian = self.coeff * np.exp ( expon )
# gaussian2 = stats.multivariate_normal.pdf(theta,mean=M,cov=C)
ret = gaussian * (reduce(lambda x, y: x * y, poisson))
return float(ret)
except ValueError as e:
raise Exception( f"ValueError {e}, {model.V}" )
[docs] def dNLLdTheta(self, theta):
"""the derivative of nll as a function of the thetas.
Makes it easier to find the maximum likelihood."""
model = self.model
# print ( f"nsig {self.nsig} {model.nsignal}" )
nsig = self.mu * model.nsignal
if model.isLinear():
xtot = theta + model.backgrounds + nsig
xtot[xtot <= 0.0] = 1e-30 ## turn zeroes to small values
nllp_ = self.ones - model.observed / xtot + np.dot(theta, self.weight)
return nllp_
lmbda = nsig + model.A + theta + model.C * theta**2 / model.B**2
lmbda[lmbda <= 0.0] = 1e-30 ## turn zeroes to small values
# nllp_ = ( self.ones - model.observed / lmbda + np.dot( theta , self.weight ) ) * ( self.ones + 2*model.C * theta / model.B**2 )
T = self.ones + 2 * model.C / model.B**2 * theta
nllp_ = T - model.observed / lmbda * (T) + np.dot(theta, self.weight)
return nllp_
[docs] def d2NLLdTheta2(self, theta):
"""the Hessian of nll as a function of the thetas.
Makes it easier to find the maximum likelihood."""
# xtot = theta + self.ntot
model = self.model
nsig = self.mu * model.nsignal
if model.isLinear():
xtot = theta + model.backgrounds + nsig
xtot[xtot <= 0.0] = 1e-30 ## turn zeroes to small values
nllh_ = self.weight + np.diag(model.observed / (xtot**2))
return nllh_
lmbda = nsig + model.A + theta + model.C * theta**2 / model.B**2
lmbda[lmbda <= 0.0] = 1e-30 ## turn zeroes to small values
T = self.ones + 2 * model.C / model.B**2 * theta
# T_i = 1_i + 2*c_i/B_i**2 * theta_i
nllh_ = (
self.weight
+ np.diag(model.observed * T**2 / (lmbda**2))
- np.diag(model.observed / lmbda * 2 * model.C / model.B**2)
+ np.diag(2 * model.C / model.B**2)
)
return nllh_
[docs] def getThetaHat(self, nobs, nb, mu, covb, max_iterations):
""" Compute nuisance parameter theta that maximizes our likelihood \
(poisson*gauss) -- by setting dNLL/dTheta to zero
:param mu: signal strength
:returns: theta_hat
"""
# ntot = mu * nsig + nb
# nll = - nobs ln(ntot + theta) - ntot - theta - theta**2/(2 delta**2)
# dnll/dtheta = - nobs / ( ntot + theta ) + 1 + theta / delta**2
# theta**2 + theta * ( delta**2 + ntot ) + delta**2 * ( ntot-nobs) = 0
model = self.model
nsig = mu * model.nsignal
self.mu = mu
sigma2 = covb + model.var_s(nsig) ## np.diag ( (model.deltas)**2 )
## for now deal with variances only
ntot = nb + nsig
cov = np.array(sigma2)
# weight = cov**(-1) ## weight matrix
# weight = linalg.inv(cov)
diag_cov = np.diag(cov)
# first: no covariances:
q = diag_cov * (ntot - nobs)
p = ntot + diag_cov
thetamaxes = []
# thetamax = -p / 2.0 * (1 - sign(p) * sqrt(1.0 - 4 * q / p**2))
thetamax = -p / 2.0 + sign(p) * sqrt( p**2/4 - q)
thetamaxes.append(thetamax)
ndims = len(p)
def distance(theta1, theta2):
for ctr, i in enumerate(theta1):
if i == 0.0:
theta1[ctr] = 1e-20
for ctr, i in enumerate(theta2):
if i == 0.0:
theta2[ctr] = 1e-20
return sum(np.abs(theta1 - theta2) / np.abs(theta1 + theta2))
ictr = 0
while ictr < max_iterations:
ictr += 1
q = diag_cov * (ntot - nobs)
p = ntot + diag_cov
for i in range(ndims):
# q[i] = diag_cov[i] * ( ntot[i] - nobs[i] )
# p[i] = ntot[i] + diag_cov[i]
for j in range(ndims):
if i == j:
continue
dq = thetamax[j] * ntot[i] * diag_cov[i] * self.weight[i, j]
dp = thetamax[j] * self.weight[i, j] * diag_cov[i]
if abs(dq / q[i]) > 0.3:
# logger.warning ( "too big a change in iteration." )
dq = np.abs(0.3 * q[i]) * np.sign(dq)
if abs(dp / p[i]) > 0.3:
# logger.warning ( "too big a change in iteration." )
dp = np.abs(0.3 * p[i]) * np.sign(dp)
q[i] += dq
p[i] += dp
# thetamax = -p / 2.0 * (1 - sign(p) * sqrt(1.0 - 4 * q / p**2))
thetamax = -p / 2.0 + sign(p) * sqrt( p**2/4 - q)
thetamaxes.append(thetamax)
if len(thetamaxes) > 2:
d1 = distance(thetamaxes[-2], thetamax)
d2 = distance(thetamaxes[-3], thetamaxes[-2])
if d1 > d2:
raise Exception( f"diverging when computing thetamax: {d1} > {d2}" )
if d1 < 1e-5:
return thetamax
return thetamax
[docs] def findThetaHat(self, mu : float ):
"""Compute nuisance parameters theta that maximize our likelihood
(poisson*gauss).
"""
if isinstance(mu,(np.ndarray,list)):
mu = mu[0]
mu = float(mu)
if np.isinf ( mu ):
return None
model = self.model
nsig = mu * model.nsignal
## first step is to disregard the covariances and solve the
## quadratic equations
ini = self.getThetaHat(
model.observed, model.backgrounds, mu, model.covariance, 0)
self.cov_tot = model.V
# if model.n == 1:
# self.cov_tot = model.totalCovariance ( nsig )
# if not model.isLinear():
# self.cov_tot = model.V + model.var_s(nsig)
# self.cov_tot = model.totalCovariance (nsig)
self.weight = model.weight # np.linalg.inv(self.cov_tot)
# self.coeff = 1.
## to catch slogdet warnings on Mac, numpy 2.3.2
## added by WW&SK, 14/08/2025
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message="divide by zero", # pattern 1
category=RuntimeWarning
)
warnings.filterwarnings(
"ignore",
message="invalid value encountered", # pattern 2
category=RuntimeWarning
)
warnings.filterwarnings(
"ignore",
message="overflow encountered", # pattern 3
category=RuntimeWarning
)
logdet = np.linalg.slogdet(self.cov_tot)
self.logcoeff = -model.n / 2 * np.log(2 * np.pi) - 0.5 * logdet[1]
# self.coeff = (2*np.pi)**(-model.n/2) * np.exp(-.5* logdet[1] )
# print ( "coeff", self.coeff, "n", model.n, "det", np.linalg.slogdet ( self.cov_tot ) )
# print ( "cov_tot", self.cov_tot[:10] )
self.ones = 1.0
if type(model.observed) in [list, ndarray]:
self.ones = np.ones(len(model.observed))
self.gammaln = special.gammaln(model.observed + 1)
try:
ret_c = optimize.fmin_ncg(
self.llhdOfTheta,
ini,
fprime=self.dNLLdTheta,
fhess=self.d2NLLdTheta2,
full_output=True,
disp=0,
)
# then always continue with TNC
if type(model.observed) in [int, float]:
bounds = [(-10 * model.observed, 10 * model.observed)]
else:
bounds = [(-10 * x, 10 * x) for x in model.observed]
ini = ret_c
ret_c = optimize.fmin_tnc(
self.llhdOfTheta, ret_c[0], fprime=self.dNLLdTheta, disp=0, bounds=bounds
)
if ret_c[-1] not in [0, 1, 2]:
return ret_c[0], ret_c[-1]
else:
return ret_c[0], 0
logger.debug("tnc worked.")
ret = ret_c[0]
return ret, -2
except (IndexError, ValueError) as e:
logger.error( f"exception: {e}. ini[-3:]={ini[-3:]}" )
raise Exception( f"cov-1={model.covariance + model.var_s(nsig)**(-1)}")
return ini, -1
[docs] def likelihood(self, mu : float, return_nll : bool = False,
evaluationType : NllEvalType=observed,
asimov : Union[None,float] = None ):
"""compute the profiled likelihood for mu.
:param mu: float Parameter of interest, signal strength
:param return_nll: if true, return nll instead of likelihood
:returns: profile likelihood and error code (0=no error)
"""
if evaluationType != observed:
self.transform ( evaluationType )
if asimov != None:
assert abs(asimov)<1e-20, "we currently treat asimov data only with mu=0."
if self.asimovComputer == None:
self.asimovComputer = self.generateAsimovComputer(asimov)
return self.likelihood(mu,return_nll,asimov=None)
# compute the profiled (not normalized) likelihood of observing
# nsig signal events
theta_hat, _ = self.findThetaHat(mu)
if self.debug_mode:
self.theta_hat = theta_hat
ret = self.llhdOfTheta( theta_hat, return_nll )
return ret
[docs] def lmax(self, return_nll=False, allowNegativeSignals=False):
"""convenience function, computes likelihood for nsig = nobs-nbg,
:param return_nll: return nll instead of likelihood
:param allowNegativeSignals: if False, then negative nsigs are replaced with 0.
"""
model = self.model
if len(model.observed) == 1:
dn = model.observed - model.backgrounds
if not allowNegativeSignals and dn[0] < 0.0:
dn = [0.0]
muhat = float(dn[0])
if abs(model.nsignal) > 1e-100:
muhat = float(dn[0] / model.nsignal[0])
#sigma_mu2 = np.sqrt(model.observed[0] / model.nsignal[0] + model.covariance[0][0] )
theta_hat = self.findThetaHat( muhat )
sigma_mu = self.getSigmaMu ( muhat, theta_hat[0] )
lmax = self.likelihood( return_nll=return_nll, mu = muhat )
# print ( "sigma_mu", sigma_mu, "old", sigma_mu2 )
ret = { "muhat": muhat, "sigma_mu": sigma_mu }
if return_nll:
ret["nll_min"] = lmax
else:
ret["lmax"] = lmax
return ret
s_max = "nll_min" if return_nll else "lmax"
fmh = self.findMuHat( allowNegativeSignals=allowNegativeSignals,
extended_output=True, return_nll=return_nll
)
muhat_, sigma_mu, lmax = fmh["muhat"], fmh["sigma_mu"], fmh[s_max]
lmax = self.likelihood ( return_nll=return_nll, mu=muhat_ )
ret = { "muhat": float ( muhat_ ), "sigma_mu": sigma_mu,
s_max : fmh [ s_max ] }
return ret
[docs] def findMuHat(
#def findMuHatViaGradientDescent(
self,
allowNegativeSignals=False,
extended_output=False,
return_nll=False,
):
"""
Find the most likely signal strength mu via gradient descent
given the relative signal strengths in each dataset (signal region).
:param allowNegativeSignals: if true, then also allow for negative values
:param extended_output: if true, return also sigma_mu, the estimate of the error of mu_hat,
and lmax, the likelihood at mu_hat
:param return_nll: if true, return nll instead of lmax in the extended output
:returns: mu_hat, i.e. the maximum likelihood estimate of mu, if extended \
output is requested, it returns mu_hat, sigma_mu \
-- the standard deviation around mu_hat, and llhd, the likelihood at mu_hat
"""
theta_hat, _ = self.findThetaHat( 0. )
minr, avgr, maxr = self.findAvgr( theta_hat )
theta_hat, _ = self.findThetaHat( avgr )
minr, avgr, maxr = self.findAvgr( theta_hat )
def myllhd( mu ):
if isinstance(mu,(np.ndarray,list)):
mu = mu[0]
theta = self.findThetaHat ( mu=float(mu) )
ret = self.likelihood(return_nll=True, mu = mu )
return ret
import scipy.optimize
ominr = minr
if minr > 0.:
minr = .5 * minr
if minr < 0.:
minr = 2.*minr
if maxr > 0.:
maxr = 3.*maxr + 1e-5
if maxr <= 0.:
maxr = .3 * maxr + 1e-5
bounds = [(minr,maxr)]
if not allowNegativeSignals:
bounds = [(0, max(maxr,1e-5))]
assert bounds[0][1] > bounds[0][0], f"bounds are in wrong order: {bounds}"
o = scipy.optimize.minimize( myllhd, x0=avgr, bounds=bounds, jac = self.dNLLdMu )
llhd = o.fun
if not return_nll:
llhd = np.exp(-o.fun)
"""
hess = o.hess_inv
try:
hess = hess.todense()
except Exception as e:
pass
"""
mu_hat = float(o.x[0])
if extended_output:
sigma_mu = self.getSigmaMu ( mu_hat, theta_hat )
llhd = self.likelihood( mu_hat, return_nll=return_nll)
# sigma_mu = float(np.sqrt(hess[0][0]))
ret = {"muhat": mu_hat, "sigma_mu": sigma_mu }
if return_nll:
ret["nll_min"] = llhd
else:
ret["lmax"] = llhd
return ret
return mu_hat
[docs] def generateAsimovComputer ( self, mu : float = 0. ):
""" given the current computer with its model,
generate a model with asimov data for a given mu.
:param mu: signal strength to create Asimov data for.
:returns: LikelihoodComputer, with theta_hat as data member
"""
theta_hat, _ = self.findThetaHat( mu )
# mu_hat = self.findMuHat( allowNegativeSignals=False, extended_output=False)
# sigma_mu = self.getSigmaMu(mu_hat, theta_hat0)
# nll0 = computer.likelihood( mu_hat, return_nll=True)
aModel = self.model.generateAsimovData( theta_hat, mu )
newComputer = LikelihoodComputer(aModel )
newComputer.theta_hat = theta_hat
return newComputer
[docs] def chi2(self ):
"""
Computes the chi2 for a given number of observed events nobs given
the predicted background nb, error on this background deltab,
evaluationType number of signal events nsig and the relative error on
signal (deltas_rel).
:param nsig: number of signal events
:return: chi2 (float)
"""
model = self.model
# Compute the likelhood for the null hypothesis (signal hypothesis) H0:
llhd = self.likelihood(1., return_nll=True)
# Compute the maximum likelihood H1, which sits at nsig = nobs - nb
# (keeping the same % error on signal):
if len(model.observed) == 1:
# TODO this nsig initiation seems wrong and changing maxllhd to likelihood
# fails ./testStatistics.py : zero division error in L115
mu_hat = ( model.observed - model.backgrounds ) / model.nsignal
maxllhd = self.likelihood (mu_hat, return_nll=True )
else:
maxllhd = self.lmax( return_nll=True, allowNegativeSignals=False)
chi2 = 2 * (llhd - maxllhd)
if not np.isfinite(chi2):
logger.error( f"chi2 is not a finite number! {chi2},{llhd},{maxllhd}")
# Return the test statistic -2log(H0/H1)
return chi2
[docs]class UpperLimitComputer:
debug_mode = False
def __init__(self, likelihoodComputer, cl: float = 0.95):
"""
:param cl: desired quantile for limits
"""
self.likelihoodComputer = likelihoodComputer
self.cl = cl
[docs] def getUpperLimitOnSigmaTimesEff(
self, evaluationType : NllEvalType = observed, trylasttime : bool =False,
nSigma : int = 0 ) -> UnitXSec:
"""upper limit on the fiducial cross section sigma times efficiency,
summed over all signal regions, i.e. sum_i xsec^prod_i eff_i
obtained from the defined Data (using the signal prediction
for each signal region/dataset), by using
the q_mu test statistic from the CCGV paper (arXiv:1007.1727).
:param evaluationType: one of: observed, apriori, aposteriori
:param trylasttime: if True, then dont try extra
:param nSigma: the upper limit for central value (0),
+ 1 sigma, - 1 sigma, etc. For error bands.
:returns: upper limit on fiducial cross section
"""
model = self.likelihoodComputer.model
ul = self.getUpperLimitOnMu( evaluationType=evaluationType,
trylasttime=trylasttime, nSigma = nSigma )
if ul == None:
return ul
if model.lumi is None:
logger.error(f"asked for upper limit on fiducial xsec, but no lumi given with the data")
return ul
xsec = sum(model.nsignal) / model.lumi
return ul * xsec
[docs] def getCLsRootFunc(
self,
evaluationType: NllEvalType=observed,
trylasttime: Optional[bool] = False,
nSigma : int = 0
) -> Tuple:
"""
Obtain the function "CLs-alpha[0.05]" whose root defines the upper limit,
plus mu_hat and sigma_mu
:param model: statistical model
:param evaluationType: one of: observed, apriori, aposteriori
:param trylasttime: if True, then dont try extra
:param nSigma: the upper limit for central value (0),
+ 1 sigma, - 1 sigma, etc.
For error bands.
:return: mu_hat, sigma_mu, CLs-alpha
"""
model = self.likelihoodComputer.model
if model.zeroSignal():
"""only zeroes in efficiencies? cannot give a limit!"""
return None, None, None
oldmodel = model
if evaluationType in [ apriori, aposteriori ]:
model = copy.deepcopy(oldmodel)
model.observed = copy.deepcopy ( model.backgrounds )
if evaluationType == aposteriori:
tempc = LikelihoodComputer(oldmodel )
theta_hat_, _ = tempc.findThetaHat(0 )
if model.isLinear():
model.observed = model.backgrounds + theta_hat_
else:
model.observed = model.A + theta_hat_ + model.C * theta_hat_**2 / model.B**2
#for i, d in enumerate(model.backgrounds):
# d += theta_hat_[i]
### FIXME!
computer = LikelihoodComputer(model )
mu_hat = computer.findMuHat( allowNegativeSignals=False, extended_output=False)
compA = computer.generateAsimovComputer ( 0. )
sigma_mu = computer.getSigmaMu(mu_hat, compA.theta_hat)
nll0 = computer.likelihood( mu_hat, return_nll=True)
theta_hat0 = compA.theta_hat
# theta_hat0, _ = computer.findThetaHat( 0. )
#aModel = model.generateAsimovData( theta_hat0, 0. )
# compA = LikelihoodComputer(aModel )
## compute
mu_hatA = compA.findMuHat()
# TODO convert rel_signals to signals
nll0A = compA.likelihood( mu=mu_hatA, return_nll=True)
# return 1.
def clsRoot(mu: float, return_type: Text = "CLs-alpha") -> float:
"""
Calculate the root
:param mu: float POI
:param return_type: (Text) can be "CLs-alpha", "1-CLs", "CLs"
CLs-alpha: returns CLs - 0.05
1-CLs: returns 1-CLs value
CLs: returns CLs value
"""
nll = computer.likelihood(mu, return_nll=True)
nllA = compA.likelihood(mu, return_nll=True)
return CLsfromNLL(nllA, nll0A, nll, nll0, (mu_hat>mu),
return_type=return_type, nSigma = nSigma )
return mu_hat, sigma_mu, clsRoot
[docs] def getUpperLimitOnMu(
self, evaluationType : NllEvalType=observed, trylasttime : bool =False,
nSigma : int = 0 ) -> Union[None,float]:
"""upper limit on the signal strength multiplier mu
obtained from the defined Data (using the signal prediction
for each signal regio/dataset), by using
the q_mu test statistic from the CCGV paper (arXiv:1007.1727).
:param evaluationType: one of: observed, apriori, aposteriori.
:param trylasttime: if True, then dont try extra.
:param nSigma: the upper limit for central value (0),
+ 1 sigma, - 1 sigma, etc.
For error bands.
:returns: upper limit on the signal strength multiplier mu
"""
mu_hat, sigma_mu, clsRoot = self.getCLsRootFunc(
evaluationType = evaluationType, trylasttime = trylasttime,
nSigma = nSigma )
if mu_hat == None:
return None
try:
a, b = determineBrentBracket( mu_hat, sigma_mu, clsRoot,
allowNegative=False )
except SModelSError as e:
return None
mu_lim = findRoot(clsRoot, a, b, rtol=1e-03, xtol=1e-06 )
logger.debug ( f"muhat={mu_hat}+-{sigma_mu} a,b={a,b} mu_lim={mu_lim}" )
return mu_lim
[docs] def CLs(
self,
mu : float = 1.0,
evaluationType: NllEvalType=observed,
trylasttime: bool = False,
return_type: Text = "CLs",
) -> float:
"""
Compute the exclusion confidence level of the model (1-CLs).
:param model: statistical model
:param evaluationType: one of: observed, apriori, aposteriori
:param trylasttime: if True, then dont try extra
:param return_type: (Text) can be "CLs-alpha", "1-CLs", "CLs"
CLs-alpha: returns CLs - 0.05 (alpha)
1-CLs: returns 1-CLs value
CLs: returns CLs value
"""
_, _, clsRoot = self.getCLsRootFunc( evaluationType, trylasttime )
ret = clsRoot(mu, return_type=return_type)
# its not an uppser limit on mu, its on nsig
# print ( f"@@SL0 been asked to compute CLs mu={mu} asimov={self.likelihoodComputer.model.asimov}: {ret}" )
return ret
if __name__ == "__main__":
C = [
18774.2,
-2866.97,
-5807.3,
-4460.52,
-2777.25,
-1572.97,
-846.653,
-442.531,
-2866.97,
496.273,
900.195,
667.591,
403.92,
222.614,
116.779,
59.5958,
-5807.3,
900.195,
1799.56,
1376.77,
854.448,
482.435,
258.92,
134.975,
-4460.52,
667.591,
1376.77,
1063.03,
664.527,
377.714,
203.967,
106.926,
-2777.25,
403.92,
854.448,
664.527,
417.837,
238.76,
129.55,
68.2075,
-1572.97,
222.614,
482.435,
377.714,
238.76,
137.151,
74.7665,
39.5247,
-846.653,
116.779,
258.92,
203.967,
129.55,
74.7665,
40.9423,
21.7285,
-442.531,
59.5958,
134.975,
106.926,
68.2075,
39.5247,
21.7285,
11.5732,
]
nsignal = [x / 100.0 for x in [47, 29.4, 21.1, 14.3, 9.4, 7.1, 4.7, 4.3]]
m = Data(
observed=[1964, 877, 354, 182, 82, 36, 15, 11],
backgrounds=[2006.4, 836.4, 350.0, 147.1, 62.0, 26.2, 11.1, 4.7],
covariance=C,
# third_moment = [ 0.1, 0.02, 0.1, 0.1, 0.003, 0.0001, 0.0002, 0.0005 ],
third_moment=[0.0] * 8,
nsignal=nsignal,
name="CMS-NOTE-2017-001 model",
)
ulComp = UpperLimitComputer( LikelihoodComputer(m), cl=0.95)
ul = ulComp.getUpperLimitOnMu( )
cls = ulComp.CLs( 1. )
print("ul (profiled)", ul)
print("CLs (profiled)", cls)
"""
results:
old ul= 180.999844
ul (profiled) 180.68039063387553
CLs (profiled) 0.75
"""