#!/usr/bin/env python3
"""
.. module:: statsTools
:synopsis: a module that contains the class responsible for
all statistical computations. Designed to
eventually become simply a frontend for spey
.. moduleauthor:: Wolfgang Waltenberger <wolfgang.waltenberger@gmail.com>
"""
__all__ = [ "StatsComputer", "getCompRetrieverModule" ]
from typing import Union, Text, Dict, List, Optional
from smodels.statistics.exceptions import SModelSStatisticsError as SModelSError
from smodels.base.smodelsLogging import logger
from smodels.base.physicsUnits import fb, UnitLumi
from smodels.statistics.simplifiedLikelihoods import LikelihoodComputer, UpperLimitComputer, Data
from smodels.statistics.pyhfInterface import PyhfData, PyhfUpperLimitComputer
from smodels.statistics.basicStats import observed, apriori, NllEvalType, \
exponentiateNLL
from smodels.statistics.truncatedGaussians import TruncatedGaussians
from smodels.statistics.analysesCombinations import AnaCombLikelihoodComputer
from smodels.experiment.datasetObj import DataSet,CombinedDataSet
from smodels.base.physicsUnits import UnitXSec
from smodels.tools.caching import roundCache, lru_cache
from typing import Union, Text, Optional
[docs]def getCompRetrieverModule():
""" very single convenience function to centralize
switching between our stats code and spey. """
from smodels.base import runtime
if runtime._experimental["spey"]:
from smodels.statistics.speyTools import SpeyRetriever as CompRetriever
return CompRetriever
else:
from smodels.statistics.statsTools import CompRetriever
return CompRetriever
class CompRetriever:
""" simple class that retrieves and constructs the sub computers,
in StatsComputer """
@classmethod
def forMultiBinSL(cls,dataset, nsig, deltas_rel : float ) -> list:
""" get a subcomputer for simplified likelihood sr-combination.
:param dataset: CombinedDataSet object
:param nsig: Number of signal events for each SR
:deltas_rel: Relative uncertainty for the signal
:returns: a subcomputer
"""
dataType = "SL"
covs = dataset.globalInfo.cachedModels
offset = 0
subComputers = []
for covname,cov in covs.items():
if not covname.endswith ( ".cov" ):
continue
if type(cov) != list:
raise SModelSError( f"covariance field has wrong type: {type(cov)}" )
if len(cov) < 1:
raise SModelSError( f"covariance matrix has length {len(cov)}." )
n = len(cov)
# FIXME are we sure thats right, its not the one below?
#nobs = [ x.dataInfo.observedN for x in dataset.origdatasets[offset:offset+n] ]
#bg = [ x.dataInfo.expectedBG for x in dataset.origdatasets[offset:offset+n] ]
#third_momenta = [ getattr ( x.dataInfo, "thirdMoment", None ) for x in dataset.origdatasets[offset:offset+n] ]
nobs = [ x.dataInfo.observedN for x in dataset._datasets[offset:offset+n] ]
bg = [ x.dataInfo.expectedBG for x in dataset._datasets[offset:offset+n] ]
nsig = nsig[offset:offset+n]
third_momenta = [ getattr ( x.dataInfo, "thirdMoment", None ) for x in dataset._datasets[offset:offset+n] ]
c = third_momenta.count ( None )
if c > 0:
if c < len(third_momenta):
logger.warning ( f"third momenta given for some but not all signal regions in {dataset.globalInfo.id}" )
third_momenta = None
data = Data( nobs, bg, cov, third_moment=third_momenta,
nsignal = nsig,
deltas_rel = deltas_rel, lumi=dataset.getLumi(),
name = covname )
likelihoodComputer = LikelihoodComputer ( data )
computer = UpperLimitComputer ( likelihoodComputer )
computer.allowNegativeSignals = False
computer.dataType = dataType
subComputers.append ( computer )
offset += n
return subComputers
@classmethod
def forSingleBin( cls, dataset, nsig, deltas_rel : Optional[float],
lumi : Optional[UnitLumi]=None ) -> list:
""" get a sub computer for an efficiency map (single bin).
:param dataset: DataSet object
:param nsig: Number of signal events for each SR
:deltas_rel: Relative uncertainty for the signal
:returns: a sub computer
"""
data = Data( dataset.dataInfo.observedN, dataset.dataInfo.expectedBG,
dataset.dataInfo.bgError**2, deltas_rel = deltas_rel,
nsignal = nsig, lumi = lumi )
likelihoodComputer = LikelihoodComputer ( data )
computer = UpperLimitComputer ( likelihoodComputer )
computer.dataType = "1bin"
computer.allowNegativeSignals = False
return [ computer ]
@classmethod
def forNNs(cls, dataset, nsig, deltas_rel : Optional[float] ) -> list:
""" get a sub computer for an NN combination.
:param dataset: CombinedDataSet object
:param nsig: Number of signal events for each SR
:deltas_rel: Relative uncertainty for the signal
:returns: a sub computer
"""
globalInfo = dataset.globalInfo
statModels = globalInfo.statModels
labelToONNX = {}
labelToSModelS = {}
hasJsonsWithoutMLModels = False
for sr in globalInfo.srMappings:
# nsignals[ sr["onnx"] ] = 0.
if sr["label"] != None:
labelToONNX [ sr["label"] ] = sr["onnx"]
labelToSModelS [ sr["label"] ] = sr["smodels"]
## translate the signal from smodels names to pyhf names
#for smname,onnxname in translator.items():
# nsignals[onnxname] = self.nsig[smname]
from smodels.statistics.nnInterface import NNData, NNUpperLimitComputer
subComputers = cls.forPyhf ( dataset, nsig, deltas_rel )
for srSetName, model_tuples in globalInfo.statModels.items():
f_signals = {}
for sr in globalInfo.srMappings:
f_signals[ sr["onnx"] ] = 0.
for label in globalInfo.srSets[srSetName]:
smodelsName = labelToSModelS[label]
if smodelsName in nsig:
f_signals[ labelToONNX[label] ] = \
nsig[ smodelsName ]
model_tuple = model_tuples[0]
modelfilename = model_tuple[1]
model_type = model_tuple[0]
if "pyhf" in model_type:
continue
data = NNData( f_signals, dataset )
upperLimitComputer = NNUpperLimitComputer(data,
lumi=dataset.getLumi(),
onnxfilename = modelfilename )
upperLimitComputer.allowNegativeSignals = False
upperLimitComputer.dataType = "nn"
subComputers.append ( upperLimitComputer )
return subComputers
@classmethod
def forPyhf(cls, dataset, nsig, deltas_rel : Optional[float]) -> list:
""" get a sub computer for pyhf combination.
:param dataset: CombinedDataSet object
:param nsig: Number of signal events for each SR
:deltas_rel: Relative uncertainty for the signal
:returns: a sub computer
"""
globalInfo = dataset.globalInfo
datasets = [ds.getID() for ds in dataset.origdatasets]
jsonFiles, jsons = [], []
jsonDictNames = {}
for srSetName, model_tuples in globalInfo.statModels.items():
model_tuple = model_tuples[0]
model_type = model_tuple[0]
model_name = model_tuple[1]
if not "pyhf" in model_type:
continue
jsonFiles.append ( model_name )
jsons.append ( globalInfo.cachedModels[model_name] )
jsonDictNames[model_name]=cls.getAll ( \
globalInfo.srSets[srSetName], globalInfo.srMappingsDict )
jsonRegions = [ region for regions in jsonDictNames.values() for region in regions ]
"""
for ds in datasets:
if not ds in jsonRegions:
logger.info(f'Region {ds} does not appear in any json file for {globalInfo.id}')
"""
logger.debug(f"list of datasets: {datasets}")
# Constructing the list of signals with subsignals matching each json
nsignals = {}
for jsName in jsonFiles:
nsignals.update( { jsName: {} } )
for name, nsig in nsig.items():
for jsName in nsignals.keys():
if name in jsonDictNames[jsName]:
nsignals[jsName].update( { name: nsig } )
includeCRs = False
if hasattr(globalInfo,'includeCRs'):
includeCRs = globalInfo.includeCRs
signalUncertainty = None
if hasattr(globalInfo,"signalUncertainty"):
signalUncertainty = globalInfo.signalUncertainty
r_jsonFiles = {} ## here we reconstruct the old jsonFiles dict
## (for now, maybe we will see that there is a smarter way)
for srSetName, model_tuples in globalInfo.statModels.items():
model_tuple = model_tuples[0]
if not model_tuple[0] in [ "pyhf", "full_pyhf" ]:
continue
region_names = globalInfo.srSets[srSetName]
regions = cls.getRegions ( region_names,
globalInfo.srMappingsDict )
if model_tuple[1] in r_jsonFiles:
raise SModelSError ( f"model {model} mentioned more than once in {globalInfo.path}" )
r_jsonFiles[model_tuple[1]]=regions
subComputers = []
for nsignal,json,(jsonFileName,r_jsonFile) in zip(nsignals.values(),jsons,r_jsonFiles.items() ):
# Loading the jsonFiles, unless we already have them (because we pickled)
data = PyhfData(nsignal, json, r_jsonFile,
includeCRs, signalUncertainty, globalInfo,
jsonFileName = jsonFileName )
upperLimitComputer = PyhfUpperLimitComputer( data,
lumi=dataset.getLumi() )
upperLimitComputer.dataType = "pyhf"
upperLimitComputer.allowNegativeSignals = False
subComputers.append ( upperLimitComputer )
return subComputers
@classmethod
def forTruncatedGaussian(cls,theorypred, corr : float =0.6 ) -> list:
""" get a sub computer for truncated gaussians
:param theorypred: TheoryPrediction object
:param corr: correction factor: \
ULexp_mod = ULexp / (1. - corr*((ULobs-ULexp)/(ULobs+ULexp))) \
a factor of corr = 0.6 is proposed.
:returns: list of subComputers (with a single entry)
"""
# marked as experimental feature
if not hasattr(theorypred, "avgElement"):
logger.error( f"theory prediction {theorypred.analysisId()} has no average element! why??" )
return None
eul = theorypred.dataset.getUpperLimitFor(
element=theorypred.avgElement, txnames=theorypred.txnames, evaluationType=apriori
)
if eul is None:
return None
eul = eul / theorypred.xsection
ul = theorypred.dataset.getUpperLimitFor(
element=theorypred.avgElement, txnames=theorypred.txnames,
evaluationType=observed) / theorypred.xsection
kwargs = { "upperLimitOnMu": float(ul),
"expectedUpperLimitOnMu": float(eul),
"corr": corr }
computer = TruncatedGaussians ( **kwargs )
return [ computer ]
@classmethod
def forAnalysesComb(cls,theoryPredictions, deltas_rel : Optional[float]) \
-> list:
""" get a sub computer for combination of analyses
:param theoryPredictions: list of TheoryPrediction objects
:param deltas_rel: relative error for the signal
:returns: a sub computer
"""
# Only allow negative signal if all theory predictions allow for it
allowNegativeSignals = all([tp.statsComputer.allowNegativeSignals
for tp in theoryPredictions])
computer = AnaCombLikelihoodComputer( theoryPredictions=theoryPredictions,
deltas_rel=deltas_rel )
computer.dataType = "analysesComb"
computer.allowNegativeSignals = allowNegativeSignals
return [ computer ]
@classmethod
def getAll ( obj, srSet : list, srMappingsDict : dict ) -> list:
""" for the srSet, return list only of signal regions,
drop others """
ret = []
for r in srSet:
if r in srMappingsDict:
m = srMappingsDict[r]
if True: # m["type"] == "SR":
ret.append ( r )
return ret
@classmethod
def getRegions ( obj, region_labels : list, srMappingsDict : dict ) -> list:
""" given a list of the region_labels, return the
corresponding list of the region dictionaries """
ret = []
for label in region_labels:
if not label in srMappingsDict:
continue
ret.append ( srMappingsDict[label] )
return ret
[docs]class StatsComputer:
""" this is the stats computer, it takes the subcomputers
upon construction, and handles all the delegations to them,
getting the most sensitive model, etc
"""
def __init__ ( self, subComputers : list ):
"""
Initialise. allowNegativeSignals is true if its true for all
subcomputers.
"""
self.subComputers = subComputers
self.allowNegativeSignals = True
for computer in subComputers:
if not self.allowNegativeSignals:
self.allowNegativeSignals = False
break
[docs] def get_five_values ( self, evaluationType : NllEvalType,
return_nll : bool = False,
check_for_maxima : bool = False )-> Dict:
"""
Return the Five Values: l(bsm), l(sm), muhat, l(muhat), sigma(mu_hat)
:param check_for_maxima: if true, then check lmax against l(sm) and l(bsm)
correct, if necessary
"""
assert return_nll == True, f"get_five_values return_nll {return_nll}"
ret = self.nll_min ( evaluationType = evaluationType )
if ret is None:
return {}
nll_min = ret[ "nll_min" ]
nllbsm = self.nll ( poi_test = 1., evaluationType=evaluationType )
nllsm = self.nll ( poi_test = 0., evaluationType=evaluationType )
ret["nllbsm"] = nllbsm
ret["nllsm"] = nllsm
if check_for_maxima:
if nllsm < nll_min: ## if return_nll is off, its the other way
muhat = ret["muhat"]
logger.debug(f"nllsm={nllsm:.2g} < nll_min({muhat:.2g})={nll_min:.2g}: will correct")
ret[ "nll_min" ] = nllsm
ret["muhat"] = 0.0
if nllbsm < nll_min:
muhat = ret["muhat"]
logger.debug(f"nllbsm={nllbsm:.2g} < nll_min({muhat:.2g})={nll_min:.2g}: will correct")
ret[ "nll_min" ] = nllbsm
ret["muhat"] = 1.0
return ret
[docs] def nll ( self, poi_test : float, evaluationType : NllEvalType,
asimov : Union[None,float] = None, **kwargs ) -> float:
""" simple frontend to individual computers """
msm = self.getMostSensitiveModel()
self.transform ( evaluationType )
kwargs.update ( { "evaluationType": evaluationType, "asimov": asimov } )
# kwargs = { "evaluationType": evaluationType, "asimov": asimov }
idx = msm["idx"]
if idx == None:
return None
ret = self.subComputers[ idx ].nll ( poi_test, **kwargs)
return ret
[docs] def likelihood ( self, poi_test : float, evaluationType : NllEvalType,
return_nll : bool, asimov : Union[None,float] = None,
**kwargs ) -> float:
""" convenience function, should become obsolete longterm """
nll = self.nll ( poi_test, evaluationType, asimov, **kwargs )
return exponentiateNLL ( nll, doIt = not return_nll )
[docs] def CLs ( self, poi_test : float = 1.,
evaluationType : NllEvalType=observed,
**kwargs ) -> Union[float,None]:
""" compute CLs value for a given value of the poi """
ret = self.getMostSensitiveModel()
# print ( f"@@ST0 getMostSensitiveModel eType {evaluationType} ret {ret}" )
idx = ret["idx"]
if idx == None:
return None
# self.transform ( evaluationType )
if hasattr ( self.subComputers[ idx ] , "CLs" ):
return self.subComputers[ idx ].CLs ( poi_test,
evaluationType = evaluationType, **kwargs )
return None
[docs] def restore ( self, evaluationType ):
""" SL only. transform the data to evaluationType or observed """
if evaluationType != observed:
return
for subComputer in self.subComputers:
if subComputer is None:
continue
if subComputer.dataType in [ "pyhf", "truncGaussian", "analysesComb" ]:
continue
subComputer.model = subComputer.origModel
[docs] def nll_min ( self, evaluationType : NllEvalType, ** kwargs ) -> dict:
"""
:returns: dictionary with muhat, sigma_mu and nll_min as keys
"""
msm = self.getMostSensitiveModel()
if msm["idx"] == None:
return { "nll_min": float("nan" ), "mu_hat": float("nan"),
"sigma_mu": float("nan") }
computer = self.subComputers[ msm["idx"] ]
self.transform ( evaluationType )
ret = computer.nll_min (
evaluationType = evaluationType,
allowNegativeSignals = computer.allowNegativeSignals, **kwargs )
return ret
[docs] @lru_cache
def getMostSensitiveModel ( self ) -> dict:
""" convenience function to get the most significant model
:returns: dictionary with idx of the computer, ul_min,
limit_on_xsecs
and the name of the most sensitive model
"""
ul_min,idx,name = float("inf"), None, ""
for i,computer in enumerate ( self.subComputers ):
if computer is None:
continue
ul = computer.getUpperLimitOnMu ( evaluationType=apriori )
# print ( f"@@0 getMostSensitiveModel {i} {computer.name} {ul}" )
if ul != None and ul < ul_min:
ul_min, idx, name = ul, i, computer.name
return { "idx": idx, "ul_min": ul_min, "name": name }
[docs] def getTotalXSec ( self ):
""" get the total yield, summing over all computers """
ret = 0.*fb
for computer in self.subComputers:
add = computer.getTotalXSec()
ret += add
return ret
[docs] def getUpperLimit ( self, evaluationType : NllEvalType,
limit_on_xsec : bool = False,
nSigma : int = 0, **kwargs ) -> Union[float,UnitXSec,None]:
"""
Simple frontend to the upperlimit computers, later to spey.poi_upper_limit
:param limit_on_xsec: if True, then return the limit on the
cross section
:param nSigma: the upper limit for central value (0),
+ 1 sigma, - 1 sigma, etc. For error bands.
:param kwargs: e.g. pmSigma
"""
msm = self.getMostSensitiveModel()
idx = msm["idx"]
if idx == None:
return None
if idx >= len(self.subComputers):
raise SModelSError ( f"most sensitive model is {msm} but we only have {len(self.subComputers)} subComputers" )
ulmu = self.subComputers[ idx ].getUpperLimitOnMu(
evaluationType = evaluationType, nSigma = nSigma, **kwargs )
if ulmu == None or not limit_on_xsec:
return ulmu
ret = ulmu * self.getTotalXSec()
return ret
class SimpleStatsDataSet:
""" a very simple data class that can replace a smodels.dataset,
for 1d SL data only. used for testing and in dataPreparation """
class SimpleInfo:
def __init__ ( self, observedN : float, evaluationTypeBG : float,
bgError : float ):
self.observedN = observedN
self.expectedBG = expectedBG
self.bgError = bgError
class GlobalInfo:
def __init__ ( self, lumi ):
self.id = "SimpleStatsDataSet"
self.lumi = lumi
def __init__ ( self, observedN : float, evaluationTypeBG : float,
bgError : float, lumi : fb ):
""" initialise the dataset with all relevant stats """
self.dataInfo = self.SimpleInfo ( observedN, evaluationTypeBG, bgError )
self.globalInfo = self.GlobalInfo( lumi )
def getLumi ( self ):
return self.globalInfo.lumi
def getType ( self ):
return "efficiencyMap"
if __name__ == "__main__":
# nobs,bg,bgerr,lumi = 3., 4.1, 0.6533758489567854, 35.9/fb
# nobs,bg,bgerr,lumi = 0, 1., 0.2, 35.9/fb
# nobs,bg,bgerr,lumi = 0, 0.001, 0.01, 35.9/fb
nobs,bg,bgerr,lumi = 3905,3658.3,238.767, 35.9/fb
dataset = SimpleStatsDataSet ( nobs, bg, bgerr, lumi )
computer = StatsComputer ( dataset, 1. )
ul = computer.getUpperLimit ( evaluationType = observed,
limit_on_xsec = True )
print ( "ul", ul )
ule = computer.getUpperLimit ( evaluationType = apriori,
limit_on_xsec = True )
print ( "ule", ule )