#!/usr/bin/env python3
"""
.. module:: simplifiedLikelihoods
:synopsis: Code that implements the simplified likelihoods as presented
in CMS-NOTE-2017-001, see https://cds.cern.ch/record/2242860.
Simplified likelihoods v2 (arXiv:1809.05548) are partly implemented.
.. moduleauthor:: Wolfgang Waltenberger <wolfgang.waltenberger@gmail.com>
"""
from scipy import stats, optimize, integrate, special, linalg
from numpy import sqrt, exp, log, sign, array, ndarray
from functools import reduce
from smodels.statistics.basicStats import CLsfromNLL, determineBrentBracket
from smodels.statistics.exceptions import SModelSStatisticsError as SModelSError
from typing import Text, Optional, Union, Tuple
import numpy as np
import math
import copy
try:
from smodels.base.smodelsLogging import logger
except ModuleNotFoundError:
def getLogger():
"""
Configure the logging facility. Maybe adapted to fit into
your framework.
"""
import logging
logger = logging.getLogger("SL")
formatter = logging.Formatter("%(module)s - %(levelname)s: %(message)s")
ch = logging.StreamHandler()
ch.setFormatter(formatter)
# ch.setLevel(logging.DEBUG)
logger.addHandler(ch)
return logger
logger = getLogger()
[docs]class Data:
"""A very simple observed container to collect all the data
needed to fully define a specific statistical model"""
def __init__(
self,
observed,
backgrounds,
covariance,
third_moment=None,
nsignal=None,
name="model",
deltas_rel=0.2,
lumi=None,
):
"""
:param observed: number of observed events per dataset
:param backgrounds: expected bg per dataset
:param covariance: uncertainty in background, as a covariance matrix
:param nsignal: number of signal events in each dataset
:param name: give the model a name, just for convenience
:param deltas_rel: the assumed relative error on the signal hypotheses.
The default is 20%.
:param lumi: luminosity of dataset in 1/fb, or None
"""
self.observed = self.convert(observed) # Make sure observed number of events are integers
## self.observed = np.around(self.convert(observed)) #Make sure observed number of events are integers
self.backgrounds = self.convert(backgrounds)
self.n = len(self.observed)
self.covariance = self._convertCov(covariance)
self.nsignal = self.convert(nsignal)
self.lumi = lumi
if self.nsignal is None:
if len(self.backgrounds) == 1:
# doesnt matter, does it?
self.nsignal = np.array([1.0])
self.signal_rel = self.convert(1.0)
elif self.nsignal.sum():
self.signal_rel = self.nsignal / self.nsignal.sum()
else:
self.signal_rel = array([0.0] * len(self.nsignal))
self.third_moment = self.convert(third_moment)
if (
type(self.third_moment) != type(None)
and np.sum([abs(x) for x in self.third_moment]) < 1e-10
):
self.third_moment = None
self.name = name
self.deltas_rel = deltas_rel
self._computeABC()
[docs] def totalCovariance(self, nsig):
"""get the total covariance matrix, taking into account
also signal uncertainty for the signal hypothesis <nsig>.
If nsig is None, the predefined signal hypothesis is taken.
"""
if self.isLinear():
cov_tot = self.V + self.var_s(nsig)
else:
cov_tot = self.covariance + self.var_s(nsig)
return cov_tot
[docs] def zeroSignal(self):
"""
Is the total number of signal events zero?
"""
if self.nsignal is None:
return True
return len(self.nsignal[self.nsignal > 0.0]) == 0
[docs] def var_s(self, nsig=None):
"""
The signal variances. Convenience function.
:param nsig: If None, it will use the model expected number of signal events,
otherwise will return the variances for the input value using the relative
signal uncertainty defined for the model.
"""
if nsig is None:
nsig = self.nsignal
else:
nsig = self.convert(nsig)
return np.diag((nsig * self.deltas_rel) ** 2)
[docs] def isScalar(self, obj):
"""
Determine if obj is a scalar (float or int)
"""
if isinstance(obj, ndarray):
## need to treat separately since casting array([0.]) to float works
return False
try:
_ = float(obj)
return True
except (ValueError, TypeError):
pass
return False
[docs] def convert(self, obj):
"""
Convert object to numpy arrays.
If object is a float or int, it is converted to a one element
array.
"""
if type(obj) == type(None):
return obj
if self.isScalar(obj):
return array([obj])
return array(obj)
def __str__(self):
return self.name + " (%d dims)" % self.n
def _convertCov(self, obj):
if self.isScalar(obj):
return array([[obj]])
if isinstance(obj[0], list):
return array(obj)
if isinstance(obj[0], float):
## if the matrix is flattened, unflatten it.
return array([obj[self.n * i : self.n * (i + 1)] for i in range(self.n)])
return obj
def _computeABC(self):
"""
Compute the terms A, B, C, rho, V. Corresponds with
Eqs. 1.27-1.30 in arXiv:1809.05548
"""
self.V = self.covariance
if self.third_moment is None:
self.A = None
self.B = None
self.C = None
return
covD = self.diagCov()
C = []
for m2, m3 in zip(covD, self.third_moment):
if m3 == 0.0:
m3 = 1e-30
k = -np.sign(m3) * sqrt(2.0 * m2)
dm = sqrt(8.0 * m2**3 / m3**2 - 1.0)
C.append(k * np.cos(4.0 * np.pi / 3.0 + np.arctan(dm) / 3.0))
self.C = np.array(C) ## C, as define in Eq. 1.27 (?) in the second paper
self.B = sqrt(covD - 2 * self.C**2) ## B, as defined in Eq. 1.28(?)
self.A = self.backgrounds - self.C ## A, Eq. 1.30(?)
self.rho = np.array([[0.0] * self.n] * self.n) ## Eq. 1.29 (?)
for x in range(self.n):
for y in range(x, self.n):
bxby = self.B[x] * self.B[y]
cxcy = self.C[x] * self.C[y]
e = (4.0 * cxcy) ** (-1) * (
sqrt(bxby**2 + 8 * cxcy * self.covariance[x][y]) - bxby
)
self.rho[x][y] = e
self.rho[y][x] = e
self.sandwich()
# self.V = sandwich ( self.B, self.rho )
[docs] def sandwich(self):
"""
Sandwich product
"""
ret = np.array([[0.0] * len(self.B)] * len(self.B))
for x in range(len(self.B)):
for y in range(x, len(self.B)):
T = self.B[x] * self.B[y] * self.rho[x][y]
ret[x][y] = T
ret[y][x] = T
self.V = ret
[docs] def isLinear(self):
"""
Statistical model is linear, i.e. no quadratic term in poissonians
"""
return type(self.C) == type(None)
[docs] def diagCov(self):
"""
Diagonal elements of covariance matrix. Convenience function.
"""
return np.diag(self.covariance)
[docs] def correlations(self):
"""
Correlation matrix, computed from covariance matrix.
Convenience function.
"""
if hasattr(self, "corr"):
return self.corr
self.corr = copy.deepcopy(self.covariance)
for x in range(self.n):
self.corr[x][x] = 1.0
for y in range(x + 1, self.n):
rho = self.corr[x][y] / sqrt(self.covariance[x][x] * self.covariance[y][y])
self.corr[x][y] = rho
self.corr[y][x] = rho
return self.corr
[docs] def rel_signals(self, mu):
"""
Returns the number of expected relative signal events, for all datasets,
given total signal strength mu. For mu=1, the sum of the numbers = 1.
:param mu: Total number of signal events summed over all datasets.
"""
return mu * self.signal_rel
[docs] def nsignals(self, mu):
"""
Returns the number of expected signal events, for all datasets,
given total signal strength mu.
:param mu: Total number of signal events summed over all datasets.
"""
return mu * self.nsignal
[docs]class LikelihoodComputer:
debug_mode = False
def __init__(self, data ):
"""
:param data: a Data object.
"""
self.origModel = copy.deepcopy ( data )
self.model = data
[docs] def dNLLdMu(self, mu, theta_hat = None ):
"""
d (- ln L)/d mu, if L is the likelihood. The function
whose root gives us muhat, i.e. the mu that maximizes
the likelihood.
:param mu: total number of signal events
:param theta_hat: array with nuisance parameters, if None then
compute them
"""
if type(mu) in [ list, np.ndarray ] and len(mu)==1:
mu = float(mu[0])
if theta_hat == None:
theta_hat, _ = self.findThetaHat ( mu )
nsig = self.model.nsignal
if not self.model.isLinear():
logger.debug("implemented only for linear model")
# n_pred^i := mu s_i + b_i + theta_i
# NLL = sum_i [ - n_obs^i * ln ( n_pred^i ) + n_pred^i ]
# d NLL / d mu = sum_i [ - ( n_obs^i * s_ i ) / n_pred_i + s_i ]
# Define relative signal strengths:
n_pred = mu * nsig + self.model.backgrounds + theta_hat
for ctr, d in enumerate(n_pred):
if d == 0.0:
if (self.model.observed[ctr] * nsig[ctr]) == 0.0:
# logger.debug("zero denominator, but numerator also zero, so we set denom to 1.")
n_pred[ctr] = 1e-5
else:
# n_pred[ctr]=1e-5
raise Exception(
"we have a zero value in the denominator at pos %d, with a non-zero numerator. dont know how to handle."
% ctr
)
ret = -self.model.observed * nsig / n_pred + nsig
if type(ret) in [array, ndarray, list]:
ret = sum(ret)
return ret
[docs] def extendedOutput(self, extended_output, default=None):
if extended_output:
ret = {"muhat": default, "sigma_mu": default, "lmax": default}
return ret
return default
[docs] def findAvgr(self, theta_hat ):
"""from the difference observed - background, find got inital
values for lower and upper"""
mu_c = self.model.observed - self.model.backgrounds - theta_hat
mu_r, wmu_r = [], []
hessian = self.d2NLLdMu2 ( 1., theta_hat )
wtot = 0.0
for s in zip(mu_c, self.model.nsignal, hessian):
if s[1] > 1e-16:
w = 1. # 1e-5
if s[2] > 0.0:
w = s[2]
wtot += w
r = s[0] / s[1]
mu_r.append( r )
wmu_r.append(w * r )
if len(mu_r)==0:
return None, None, None
ret = min(mu_r), sum(wmu_r) / wtot, max(mu_r)
return ret
[docs] def d2NLLdMu2 ( self, mu, theta_hat, allowZeroHessian=True ):
""" the hessian of the likelihood of mu, at mu,
which is the Fisher information
which is approximately the inverse of the covariance
:param allowZeroHessian: if false and sum(observed)==0, then replace \
observed with expected
"""
# nll=-nobs*ln(mu*s + b + theta) + ( mu*s + b + theta)
# d nll / d mu = - nobs * s / ( mu*s + b + theta) + s
# d2nll / dmu2 = nobs * s**2 / ( mu*s + b + theta )**2
n_pred = mu * self.model.nsignal + self.model.backgrounds + theta_hat
for i, s in enumerate(n_pred):
if s == 0.0: # the denominator in the hessian is 0?
if (self.model.observed[i] * self.model.nsignal[i]) == 0.0:
# logger.debug("zero denominator, but numerator also zero, so we set denom to 1.")
n_pred[i] = 1.0
else:
raise Exception( f"we have a zero value in the denominator at pos {i}, with a non-zero numerator. dont know how to handle." )
obs = self.model.observed
if sum(obs)==0 and not allowZeroHessian:
obs = self.model.backgrounds
hessian = obs * self.model.nsignal**2 / n_pred**2
if sum(hessian) == 0.0 and not allowZeroHessian:
# if all observations are zero, we replace them by the expectations
if sum(self.model.observed) == 0:
hessian = self.model.nsignal**2 / n_pred
return hessian
#def findMuHat(
[docs] def findMuHatViaBracketing( self, allowNegativeSignals=False,
extended_output=False, nll=False ):
"""
Find the most likely signal strength mu via a brent bracketing technique
given the relative signal strengths in each dataset (signal region).
:param allowNegativeSignals: if true, then also allow for negative values
:param extended_output: if true, return also sigma_mu, the estimate \
of the error of mu_hat, and lmax, the likelihood at mu_hat
:param nll: if true, return nll instead of lmax in the extended output
:returns: mu_hat, i.e. the maximum likelihood estimate of mu, if \
extended output is requested, it returns a dictionary with mu_hat, \
sigma_mu -- the standard deviation around mu_hat, and lmax, \
i.e. the likelihood at mu_hat
"""
if (self.model.backgrounds == self.model.observed).all():
return self.extendedOutput(extended_output, 0.0)
nsig = self.model.nsignal
if type(nsig) in [list, ndarray]:
nsig = array(nsig)
nsig[nsig == 0.0] = 1e-20
if sum(nsig < 0.0):
raise Exception("Negative relative signal strengths!")
## we need a very rough initial guess for mu(hat), to come
## up with a first theta
# self.nsig = array([0.]*len(self.model.observed))
self.mu = 1.
## we start with theta_hat being all zeroes
# theta_hat = array([0.]*len(self.model.observed))
mu_hat_old, mu_hat = 0.0, 1.0
ctr = 0
widener = 3.0
while (
abs(mu_hat - mu_hat_old) > 1e-10
and abs(mu_hat - mu_hat_old) / (mu_hat + mu_hat_old) > 0.5e-2
and ctr < 20
):
theta_hat, _ = self.findThetaHat(mu_hat)
ctr += 1
mu_hat_old = mu_hat
minr, avgr, maxr = self.findAvgr( theta_hat )
# for i,s in enumerate ( signal_rel ):
# if abs(s) < 1e-19:
# mu_c[i]=0.
## find mu_hat by finding the root of 1/L dL/dmu. We know
## that the zero has to be between min(mu_c) and max(mu_c).
lstarters = [ avgr - 0.2 * abs(avgr), minr, 0.0, -1.0, 1.0, 10.0, -0.1, \
0.1, -100.0, 100.0, -1000.0, ]
closestl, closestr = None, float("inf")
for lower in lstarters:
lower_v = self.dNLLdMu(lower, theta_hat)
if lower_v < 0.0:
break
if lower_v < closestr:
closestl, closestr = lower, lower_v
if lower_v > 0.0:
logger.debug(
f"did not find a lower value with rootfinder(lower) < 0. Closest: f({closestl})={closestr}"
)
return self.extendedOutput(extended_output, 0.0)
ustarters = [ avgr + 0.2 * abs(avgr), maxr, 0.0, 1.0, 10.0, -1.0 - 0.1,\
0.1, 100.0, -100.0, 1000.0, -1000.0, 0.01, -0.01, ]
closestl, closestr = None, float("inf")
for upper in ustarters:
upper_v = self.dNLLdMu(upper, theta_hat)
if upper_v > 0.0:
break
if upper_v < closestr:
closestl, closestr = upper, upper_v
if upper_v < 0.0:
logger.debug("did not find an upper value with rootfinder(upper) > 0.")
return self.extendedOutput(extended_output, 0.0)
mu_hat = optimize.brentq(self.dNLLdMu, lower, upper, args=(theta_hat, ), rtol=1e-9)
if not allowNegativeSignals and mu_hat < 0.0:
mu_hat = 0.0
theta_hat, _ = self.findThetaHat(mu_hat)
self.theta_hat = theta_hat
if extended_output:
sigma_mu = self.getSigmaMu(mu_hat, theta_hat)
llhd = self.likelihood( mu_hat, nll=nll)
# print ( f"returning {allowNegativeSignals}: mu_hat {mu_hat}+-{sigma_mu} llhd {llhd}" )
ret = {"muhat": mu_hat, "sigma_mu": sigma_mu, "lmax": llhd}
return ret
return mu_hat
[docs] def getSigmaMu(self, mu, theta_hat):
"""
Get an estimate for the standard deviation of mu at <mu>, from
the inverse hessian
"""
if not self.model.isLinear():
logger.debug("implemented only for linear model")
# d^2 mu NLL / d mu^2 = sum_i [ n_obs^i * s_i**2 / n_pred^i**2 ]
hessian = self.d2NLLdMu2 ( mu, theta_hat, allowZeroHessian = False )
hessian = sum ( hessian )
if hessian == 0.0:
hessian = 1e-10
"""
# if all observations are zero, we replace them by the expectations
if sum(self.model.observed) == 0:
hessian = sum(nsig**2 / n_pred)
"""
stderr = float(np.sqrt(1.0 / hessian))
return stderr
# Define integrand (gaussian_(bg+signal)*poisson(nobs)):
# def prob(x0, x1 )
[docs] def llhdOfTheta(self, theta, nll = True ):
""" likelihood for nuicance parameters theta, given signal strength \
self.mu. notice, by default it returns nll
:param theta: nuisance parameters
:params nll: if True, compute negative log likelihood
"""
# theta = array ( thetaA )
# ntot = self.model.backgrounds + self.nsig
nsig = self.mu * self.model.nsignal
if self.model.isLinear():
lmbda = self.model.backgrounds + nsig + theta
else:
lmbda = nsig + self.model.A + theta + self.model.C * theta**2 / self.model.B**2
lmbda[lmbda <= 0.0] = 1e-30 ## turn zeroes to small values
obs = self.model.observed
def is_integer(x):
if type(x) in [int, np.int64]:
return True
if type(x) in [float]:
return x.is_integer()
return False
## not needed for now
allintegers = np.all([is_integer(i) for i in obs])
if nll:
if allintegers:
poisson = stats.poisson.logpmf(obs, lmbda)
else:
poisson = -lmbda + obs * np.log(lmbda) - special.loggamma(obs + 1)
else:
if allintegers:
poisson = stats.poisson.pmf(obs, lmbda)
else:
# poisson = np.exp(-lmbda)*np.power(lmbda,obs)/special.gamma(obs+1)
logpoiss = -lmbda + obs * np.log(lmbda) - special.loggamma(obs + 1)
poisson = np.exp(logpoiss)
try:
M = [0.0] * len(theta)
C = self.model.V
# if self.model.n == 1: I think not a good idea
# C = self.model.totalCovariance(self.nsig)
dTheta = theta - M
expon = -0.5 * np.dot(np.dot(dTheta, self.weight), dTheta) + self.logcoeff
# print ( "expon", expon, "coeff", self.coeff )
if nll:
gaussian = expon # np.log ( self.coeff )
# gaussian2 = stats.multivariate_normal.logpdf(theta,mean=M,cov=C)
ret = -gaussian - sum(poisson)
else:
gaussian = np.exp(expon)
# gaussian = self.coeff * np.exp ( expon )
# gaussian2 = stats.multivariate_normal.pdf(theta,mean=M,cov=C)
ret = gaussian * (reduce(lambda x, y: x * y, poisson))
return float(ret)
except ValueError as e:
raise Exception("ValueError %s, %s" % (e, self.model.V))
# raise Exception("ValueError %s, %s" % ( e, self.model.totalCovariance(self.nsig) ))
# raise Exception("ValueError %s, %s" % ( e, self.model.V ))
[docs] def dNLLdTheta(self, theta):
"""the derivative of nll as a function of the thetas.
Makes it easier to find the maximum likelihood."""
# print ( f"nsig {self.nsig} {self.model.nsignal}" )
nsig = self.mu * self.model.nsignal
if self.model.isLinear():
xtot = theta + self.model.backgrounds + nsig
xtot[xtot <= 0.0] = 1e-30 ## turn zeroes to small values
nllp_ = self.ones - self.model.observed / xtot + np.dot(theta, self.weight)
return nllp_
lmbda = nsig + self.model.A + theta + self.model.C * theta**2 / self.model.B**2
lmbda[lmbda <= 0.0] = 1e-30 ## turn zeroes to small values
# nllp_ = ( self.ones - self.model.observed / lmbda + np.dot( theta , self.weight ) ) * ( self.ones + 2*self.model.C * theta / self.model.B**2 )
T = self.ones + 2 * self.model.C / self.model.B**2 * theta
nllp_ = T - self.model.observed / lmbda * (T) + np.dot(theta, self.weight)
return nllp_
[docs] def d2NLLdTheta2(self, theta):
"""the Hessian of nll as a function of the thetas.
Makes it easier to find the maximum likelihood."""
# xtot = theta + self.ntot
nsig = self.mu * self.model.nsignal
if self.model.isLinear():
xtot = theta + self.model.backgrounds + nsig
xtot[xtot <= 0.0] = 1e-30 ## turn zeroes to small values
nllh_ = self.weight + np.diag(self.model.observed / (xtot**2))
return nllh_
lmbda = nsig + self.model.A + theta + self.model.C * theta**2 / self.model.B**2
lmbda[lmbda <= 0.0] = 1e-30 ## turn zeroes to small values
T = self.ones + 2 * self.model.C / self.model.B**2 * theta
# T_i = 1_i + 2*c_i/B_i**2 * theta_i
nllh_ = (
self.weight
+ np.diag(self.model.observed * T**2 / (lmbda**2))
- np.diag(self.model.observed / lmbda * 2 * self.model.C / self.model.B**2)
+ np.diag(2 * self.model.C / self.model.B**2)
)
return nllh_
[docs] def getThetaHat(self, nobs, nb, mu, covb, max_iterations):
""" Compute nuisance parameter theta that maximizes our likelihood \
(poisson*gauss) -- by setting dNLL/dTheta to zero
:param mu: signal strength
:returns: theta_hat
"""
# ntot = mu * nsig + nb
# nll = - nobs ln(ntot + theta) - ntot - theta - theta**2/(2 delta**2)
# dnll/dtheta = - nobs / ( ntot + theta ) + 1 + theta / delta**2
# theta**2 + theta * ( delta**2 + ntot ) + delta**2 * ( ntot-nobs) = 0
nsig = mu * self.model.nsignal
self.mu = mu
sigma2 = covb + self.model.var_s(nsig) ## np.diag ( (self.model.deltas)**2 )
## for now deal with variances only
ntot = nb + nsig
cov = np.array(sigma2)
# weight = cov**(-1) ## weight matrix
# weight = linalg.inv(cov)
diag_cov = np.diag(cov)
# first: no covariances:
q = diag_cov * (ntot - nobs)
p = ntot + diag_cov
thetamaxes = []
# thetamax = -p / 2.0 * (1 - sign(p) * sqrt(1.0 - 4 * q / p**2))
thetamax = -p / 2.0 + sign(p) * sqrt( p**2/4 - q)
thetamaxes.append(thetamax)
ndims = len(p)
def distance(theta1, theta2):
for ctr, i in enumerate(theta1):
if i == 0.0:
theta1[ctr] = 1e-20
for ctr, i in enumerate(theta2):
if i == 0.0:
theta2[ctr] = 1e-20
return sum(np.abs(theta1 - theta2) / np.abs(theta1 + theta2))
ictr = 0
while ictr < max_iterations:
ictr += 1
q = diag_cov * (ntot - nobs)
p = ntot + diag_cov
for i in range(ndims):
# q[i] = diag_cov[i] * ( ntot[i] - nobs[i] )
# p[i] = ntot[i] + diag_cov[i]
for j in range(ndims):
if i == j:
continue
dq = thetamax[j] * ntot[i] * diag_cov[i] * self.weight[i, j]
dp = thetamax[j] * self.weight[i, j] * diag_cov[i]
if abs(dq / q[i]) > 0.3:
# logger.warning ( "too big a change in iteration." )
dq = np.abs(0.3 * q[i]) * np.sign(dq)
if abs(dp / p[i]) > 0.3:
# logger.warning ( "too big a change in iteration." )
dp = np.abs(0.3 * p[i]) * np.sign(dp)
q[i] += dq
p[i] += dp
# thetamax = -p / 2.0 * (1 - sign(p) * sqrt(1.0 - 4 * q / p**2))
thetamax = -p / 2.0 + sign(p) * sqrt( p**2/4 - q)
thetamaxes.append(thetamax)
if len(thetamaxes) > 2:
d1 = distance(thetamaxes[-2], thetamax)
d2 = distance(thetamaxes[-3], thetamaxes[-2])
if d1 > d2:
raise Exception("diverging when computing thetamax: %f > %f" % (d1, d2))
if d1 < 1e-5:
return thetamax
return thetamax
[docs] def findThetaHat(self, mu : float ):
"""Compute nuisance parameters theta that maximize our likelihood
(poisson*gauss).
"""
if isinstance(mu,(np.ndarray,list)):
mu = mu[0]
mu = float(mu)
if np.isinf ( mu ):
return None
nsig = mu * self.model.nsignal
## first step is to disregard the covariances and solve the
## quadratic equations
ini = self.getThetaHat(
self.model.observed, self.model.backgrounds, mu, self.model.covariance, 0
)
self.cov_tot = self.model.V
# if self.model.n == 1:
# self.cov_tot = self.model.totalCovariance ( nsig )
# if not self.model.isLinear():
# self.cov_tot = self.model.V + self.model.var_s(nsig)
# self.cov_tot = self.model.totalCovariance (nsig)
self.weight = np.linalg.inv(self.cov_tot)
# self.coeff = 1.
logdet = np.linalg.slogdet(self.cov_tot)
self.logcoeff = -self.model.n / 2 * np.log(2 * np.pi) - 0.5 * logdet[1]
# self.coeff = (2*np.pi)**(-self.model.n/2) * np.exp(-.5* logdet[1] )
# print ( "coeff", self.coeff, "n", self.model.n, "det", np.linalg.slogdet ( self.cov_tot ) )
# print ( "cov_tot", self.cov_tot[:10] )
self.ones = 1.0
if type(self.model.observed) in [list, ndarray]:
self.ones = np.ones(len(self.model.observed))
self.gammaln = special.gammaln(self.model.observed + 1)
try:
ret_c = optimize.fmin_ncg(
self.llhdOfTheta,
ini,
fprime=self.dNLLdTheta,
fhess=self.d2NLLdTheta2,
full_output=True,
disp=0,
)
# then always continue with TNC
if type(self.model.observed) in [int, float]:
bounds = [(-10 * self.model.observed, 10 * self.model.observed)]
else:
bounds = [(-10 * x, 10 * x) for x in self.model.observed]
ini = ret_c
ret_c = optimize.fmin_tnc(
self.llhdOfTheta, ret_c[0], fprime=self.dNLLdTheta, disp=0, bounds=bounds
)
# print ( "[findThetaHat] mu=%s bg=%s observed=%s V=%s, nsig=%s theta=%s, nll=%s" % ( self.nsig[0]/self.model.efficiencies[0], self.model.backgrounds, self.model.observed,self.model.covariance, self.nsig, ret_c[0], self.nllOfNuisances(ret_c[0]) ) )
if ret_c[-1] not in [0, 1, 2]:
return ret_c[0], ret_c[-1]
else:
return ret_c[0], 0
logger.debug("tnc worked.")
ret = ret_c[0]
return ret, -2
except (IndexError, ValueError) as e:
logger.error("exception: %s. ini[-3:]=%s" % (e, ini[-3:]))
raise Exception("cov-1=%s" % (self.model.covariance + self.model.var_s(nsig)) ** (-1))
return ini, -1
[docs] def likelihood(self, mu : float, return_nll : bool = False ):
"""compute the profiled likelihood for mu.
:param mu: float Parameter of interest, signal strength
:param return_nll: if true, return nll instead of likelihood
:returns: profile likelihood and error code (0=no error)
"""
# compute the profiled (not normalized) likelihood of observing
# nsig signal events
theta_hat, _ = self.findThetaHat(mu)
if self.debug_mode:
self.theta_hat = theta_hat
ret = self.llhdOfTheta( theta_hat, return_nll )
return ret
[docs] def lmax(self, return_nll=False, allowNegativeSignals=False):
"""convenience function, computes likelihood for nsig = nobs-nbg,
:param return_nll: return nll instead of likelihood
:param allowNegativeSignals: if False, then negative nsigs are replaced with 0.
"""
if len(self.model.observed) == 1:
dn = self.model.observed - self.model.backgrounds
if not allowNegativeSignals and dn[0] < 0.0:
dn = [0.0]
muhat = float(dn[0])
if abs(self.model.nsignal) > 1e-100:
muhat = float(dn[0] / self.model.nsignal[0])
#sigma_mu2 = np.sqrt(self.model.observed[0] / self.model.nsignal[0] + self.model.covariance[0][0] )
theta_hat = self.findThetaHat( muhat )
sigma_mu = self.getSigmaMu ( muhat, theta_hat[0] )
ret= self.likelihood( return_nll=return_nll, mu = muhat )
# print ( "sigma_mu", sigma_mu, "old", sigma_mu2 )
return { "lmax": ret, "muhat": muhat, "sigma_mu": sigma_mu }
fmh = self.findMuHat( allowNegativeSignals=allowNegativeSignals,
extended_output=True, return_nll=return_nll
)
muhat_, sigma_mu, lmax = fmh["muhat"], fmh["sigma_mu"], fmh["lmax"]
lmax = self.likelihood ( return_nll=return_nll, mu=muhat_ )
ret = { "lmax": lmax, "muhat": float ( muhat_ ), "sigma_mu": sigma_mu }
return ret
[docs] def findMuHat(
#def findMuHatViaGradientDescent(
self,
allowNegativeSignals=False,
extended_output=False,
return_nll=False,
):
"""
Find the most likely signal strength mu via gradient descent
given the relative signal strengths in each dataset (signal region).
:param allowNegativeSignals: if true, then also allow for negative values
:param extended_output: if true, return also sigma_mu, the estimate of the error of mu_hat,
and lmax, the likelihood at mu_hat
:param return_nll: if true, return nll instead of lmax in the extended output
:returns: mu_hat, i.e. the maximum likelihood estimate of mu, if extended \
output is requested, it returns mu_hat, sigma_mu \
-- the standard deviation around mu_hat, and llhd, the likelihood at mu_hat
"""
theta_hat, _ = self.findThetaHat( 0. )
minr, avgr, maxr = self.findAvgr( theta_hat )
theta_hat, _ = self.findThetaHat( avgr )
minr, avgr, maxr = self.findAvgr( theta_hat )
def myllhd( mu ):
if isinstance(mu,(np.ndarray,list)):
mu = mu[0]
theta = self.findThetaHat ( mu=float(mu) )
ret = self.likelihood(return_nll=True, mu = mu )
return ret
import scipy.optimize
ominr = minr
if minr > 0.:
minr = .5 * minr
if minr < 0.:
minr = 2.*minr
if maxr > 0.:
maxr = 3.*maxr + 1e-5
if maxr <= 0.:
maxr = .3 * maxr + 1e-5
bounds = [(minr,maxr)]
if not allowNegativeSignals:
bounds = [(0, max(maxr,1e-5))]
assert bounds[0][1] > bounds[0][0], f"bounds are in wrong order: {bounds}"
o = scipy.optimize.minimize( myllhd, x0=avgr, bounds=bounds, jac = self.dNLLdMu )
llhd = o.fun
if not return_nll:
llhd = np.exp(-o.fun)
"""
hess = o.hess_inv
try:
hess = hess.todense()
except Exception as e:
pass
"""
mu_hat = float(o.x[0])
if extended_output:
sigma_mu = self.getSigmaMu ( mu_hat, theta_hat )
llhd = self.likelihood( mu_hat, return_nll=return_nll)
# sigma_mu = float(np.sqrt(hess[0][0]))
ret = {"muhat": mu_hat, "sigma_mu": sigma_mu, "lmax": llhd }
return ret
return mu_hat
[docs] def chi2(self ):
"""
Computes the chi2 for a given number of observed events nobs given
the predicted background nb, error on this background deltab,
expected number of signal events nsig and the relative error on
signal (deltas_rel).
:param nsig: number of signal events
:return: chi2 (float)
"""
# Compute the likelhood for the null hypothesis (signal hypothesis) H0:
llhd = self.likelihood(1., return_nll=True)
# Compute the maximum likelihood H1, which sits at nsig = nobs - nb
# (keeping the same % error on signal):
if len(self.model.observed) == 1:
# TODO this nsig initiation seems wrong and changing maxllhd to likelihood
# fails ./testStatistics.py : zero division error in L115
mu_hat = ( self.model.observed - self.model.backgrounds ) / self.model.nsignal
maxllhd = self.likelihood (mu_hat, return_nll=True )
else:
maxllhd = self.lmax( return_nll=True, allowNegativeSignals=False)
chi2 = 2 * (llhd - maxllhd)
if not np.isfinite(chi2):
logger.error("chi2 is not a finite number! %s,%s,%s" % (chi2, llhd, maxllhd))
# Return the test statistic -2log(H0/H1)
return chi2
[docs]class UpperLimitComputer:
debug_mode = False
def __init__(self, cl: float = 0.95):
"""
:param cl: desired quantile for limits
"""
self.cl = cl
[docs] def getUpperLimitOnSigmaTimesEff(
self, model, expected=False, trylasttime=False
):
"""upper limit on the fiducial cross section sigma times efficiency,
summed over all signal regions, i.e. sum_i xsec^prod_i eff_i
obtained from the defined Data (using the signal prediction
for each signal region/dataset), by using
the q_mu test statistic from the CCGV paper (arXiv:1007.1727).
:params expected: if false, compute observed,
true: compute a priori expected, "posteriori":
compute a posteriori expected
:params trylasttime: if True, then dont try extra
:returns: upper limit on fiducial cross section
"""
ul = self.getUpperLimitOnMu( model, expected=expected,
trylasttime=trylasttime)
if ul == None:
return ul
if model.lumi is None:
logger.error(f"asked for upper limit on fiducial xsec, but no lumi given with the data")
return ul
xsec = sum(model.nsignal) / model.lumi
return ul * xsec
[docs] def getCLsRootFunc(
self,
model: Data,
expected: Optional[Union[bool, Text]] = False,
trylasttime: Optional[bool] = False,
) -> Tuple:
"""
Obtain the function "CLs-alpha[0.05]" whose root defines the upper limit,
plus mu_hat and sigma_mu
:param model: statistical model
:param expected: false: compute observed, true: compute a priori expected, \
"posteriori": compute a posteriori expected
:param trylasttime: if True, then dont try extra
:return: mu_hat, sigma_mu, CLs-alpha
"""
if model.zeroSignal():
"""only zeroes in efficiencies? cannot give a limit!"""
return None, None, None
oldmodel = model
if expected:
model = copy.deepcopy(oldmodel)
model.observed = copy.deepcopy ( model.backgrounds )
if expected == "posteriori":
tempc = LikelihoodComputer(oldmodel )
theta_hat_, _ = tempc.findThetaHat(0 )
if model.isLinear():
model.observed = model.backgrounds + theta_hat_
else:
model.observed = model.A + theta_hat_ + model.C * theta_hat_**2 / model.B**2
#for i, d in enumerate(model.backgrounds):
# d += theta_hat_[i]
### FIXME!
computer = LikelihoodComputer(model )
mu_hat = computer.findMuHat( allowNegativeSignals=False, extended_output=False)
theta_hat0, _ = computer.findThetaHat( 0. )
sigma_mu = computer.getSigmaMu(mu_hat, theta_hat0)
nll0 = computer.likelihood( mu_hat, return_nll=True)
aModel = copy.deepcopy(model)
if model.isLinear():
aModel.observed = array([x + y for x, y in zip(model.backgrounds, theta_hat0)]) # old
else:
aModel.observed = model.A + theta_hat0 + model.C * theta_hat0**2 / model.B**2
aModel.name = aModel.name + "A"
# print ( f"SL finding mu hat with {aModel.signal_rel}: mu_hatA, obs: {aModel.observed}" )
compA = LikelihoodComputer(aModel )
## compute
mu_hatA = compA.findMuHat()
# TODO convert rel_signals to signals
nll0A = compA.likelihood( mu=mu_hatA, return_nll=True)
# return 1.
def clsRoot(mu: float, return_type: Text = "CLs-alpha") -> float:
"""
Calculate the root
:param mu: float POI
:param return_type: (Text) can be "CLs-alpha", "1-CLs", "CLs"
CLs-alpha: returns CLs - 0.05
1-CLs: returns 1-CLs value
CLs: returns CLs value
"""
nll = computer.likelihood(mu, return_nll=True)
nllA = compA.likelihood(mu, return_nll=True)
return CLsfromNLL(nllA, nll0A, nll, nll0, return_type=return_type)
return mu_hat, sigma_mu, clsRoot
[docs] def getUpperLimitOnMu(
self, model, expected=False, trylasttime=False
):
"""upper limit on the signal strength multiplier mu
obtained from the defined Data (using the signal prediction
for each signal regio/dataset), by using
the q_mu test statistic from the CCGV paper (arXiv:1007.1727).
:params expected: if false, compute observed,
true: compute a priori expected, "posteriori":
compute a posteriori expected
:params trylasttime: if True, then dont try extra
:returns: upper limit on the signal strength multiplier mu
"""
mu_hat, sigma_mu, clsRoot = self.getCLsRootFunc(
model, expected, trylasttime
)
if mu_hat == None:
return None
try:
a, b = determineBrentBracket(mu_hat, sigma_mu, clsRoot, allowNegative=False )
except SModelSError as e:
return None
mu_lim = optimize.brentq(clsRoot, a, b, rtol=1e-03, xtol=1e-06)
logger.debug ( f"muhat={mu_hat}+-{sigma_mu} a,b={a,b} mu_lim={mu_lim}" )
return mu_lim
[docs] def computeCLs(
self,
model: Data,
expected: Union[bool, Text] = False,
trylasttime: bool = False,
return_type: Text = "1-CLs",
) -> float:
"""
Compute the exclusion confidence level of the model (1-CLs).
:param model: statistical model
:param expected: if false, compute observed,
true: compute a priori expected, "posteriori":
compute a posteriori expected
:param trylasttime: if True, then dont try extra
:param return_type: (Text) can be "CLs-alpha", "1-CLs", "CLs"
CLs-alpha: returns CLs - 0.05 (alpha)
1-CLs: returns 1-CLs value
CLs: returns CLs value
"""
_, _, clsRoot = self.getCLsRootFunc(model, expected, trylasttime )
ret = clsRoot(1.0, return_type=return_type)
# its not an uppser limit on mu, its on nsig
return ret
if __name__ == "__main__":
C = [
18774.2,
-2866.97,
-5807.3,
-4460.52,
-2777.25,
-1572.97,
-846.653,
-442.531,
-2866.97,
496.273,
900.195,
667.591,
403.92,
222.614,
116.779,
59.5958,
-5807.3,
900.195,
1799.56,
1376.77,
854.448,
482.435,
258.92,
134.975,
-4460.52,
667.591,
1376.77,
1063.03,
664.527,
377.714,
203.967,
106.926,
-2777.25,
403.92,
854.448,
664.527,
417.837,
238.76,
129.55,
68.2075,
-1572.97,
222.614,
482.435,
377.714,
238.76,
137.151,
74.7665,
39.5247,
-846.653,
116.779,
258.92,
203.967,
129.55,
74.7665,
40.9423,
21.7285,
-442.531,
59.5958,
134.975,
106.926,
68.2075,
39.5247,
21.7285,
11.5732,
]
nsignal = [x / 100.0 for x in [47, 29.4, 21.1, 14.3, 9.4, 7.1, 4.7, 4.3]]
m = Data(
observed=[1964, 877, 354, 182, 82, 36, 15, 11],
backgrounds=[2006.4, 836.4, 350.0, 147.1, 62.0, 26.2, 11.1, 4.7],
covariance=C,
# third_moment = [ 0.1, 0.02, 0.1, 0.1, 0.003, 0.0001, 0.0002, 0.0005 ],
third_moment=[0.0] * 8,
nsignal=nsignal,
name="CMS-NOTE-2017-001 model",
)
ulComp = UpperLimitComputer( cl=0.95)
ul = ulComp.getUpperLimitOnMu(m )
cls = ulComp.computeCLs(m )
print("ul (profiled)", ul)
print("CLs (profiled)", cls)
"""
results:
old ul= 180.999844
ul (profiled) 180.68039063387553
CLs (profiled) 0.75
"""