Source code for statistics.pyhfInterface

#!/usr/bin/env python3

"""
.. module:: pyhfInterface
   :synopsis: Code that delegates the computation of limits and likelihoods to
              pyhf.

.. moduleauthor:: Gael Alguero <gaelalguero@gmail.com>
.. moduleauthor:: Wolfgang Waltenberger <wolfgang.waltenberger@gmail.com>

"""

import jsonpatch
import warnings
import jsonschema
import copy
from scipy import optimize
import numpy as np
from smodels.base.smodelsLogging import logger
import logging
logging.getLogger("pyhf").setLevel(logging.CRITICAL)
# warnings.filterwarnings("ignore")
warnings.filterwarnings("ignore", r"invalid value encountered in log")
from typing import Dict, List

jsonver = ""
try:
    import importlib.metadata

    jsonver = int(importlib.metadata.version("jsonschema")[0])
except Exception as e:
    try:
        from jsonschema import __version__ as jsonver
    except Exception as e:
        pass
if jsonver < 3:
    # if jsonschema.__version__[0] == "2": ## deprecated
    print( f"[SModelS:pyhfInterface] jsonschema is version {jsonschema.__version__}, we need > 3.x.x" )
    sys.exit()

import time, sys, os

try:
    import pyhf
except ModuleNotFoundError:
    print("[SModelS:pyhfInterface] pyhf import failed. Is the module installed?")
    sys.exit(-1)

ver = pyhf.__version__

pyhfinfo = {
    "backend": "numpy",
    "hasgreeted": False,
    "backendver": np.version.full_version,
    "ver": ver,
#    "required": "0.6.1", # the required pyhf version
}

[docs]def setBackend ( backend : str ) -> bool: """ try to setup backend to <backend> :param backend: one of: numpy (default), pytorch, jax, tensorflow :returns: True, if worked, False if failed """ try: pyhf.set_backend( backend ) pyhfinfo["backend"] = backend pyhfinfo["backendver"] = "?" module_name = backend if backend == "pytorch": module_name = "torch" from importlib.metadata import version pyhfinfo["backendver"] = version(module_name) return True except (pyhf.exceptions.ImportBackendError,pyhf.exceptions.InvalidBackend) as e: print( f"[SModelS:pyhfInterface] WARNING could not set {backend} as the pyhf backend: {e}" ) print( f"[SModelS:pyhfInterface] falling back to {pyhfinfo['backend']}." ) # print("[SModelS:pyhfInterface] We however recommend that pytorch be installed.") return False
# setBackend ( "pytorch" ) countWarning = {"llhdszero": 0} # Sets the maximum number of attempts for determining the brent bracketing interval for mu: nattempts_max = 10
[docs]def guessPyhfType ( name : str ) -> str: """ given the pyhf analysis region name, guess the type. awkward, phase this out! """ if name.startswith ( "CR" ): return "CR" if name.startswith ( "VR" ): return "VR" if name.startswith ( "SR" ): return "SR" if "CR" in name: ## arggh!! logger.debug ( f"old jsonFiles format used, and 'CR' in the middle of the region name: {name}. I will assume it is a control region but do switch to the new format ASAP" ) return "CR" return "SR"
[docs]class PyhfData: """ Holds data for use in pyhf :ivar nsignals: signal predictions dictionary of dictionaries, one for each json file, one entry per signal region :ivar inputJsons: list of json instances :ivar jsonFiles: optional list of json files :ivar nWS: number of workspaces = number of json files """ def __init__( self, nsignals : Dict[str, Dict], inputJsons, jsonFiles=None, includeCRs=False, signalUncertainty=None): self.nsignals = nsignals self.getTotalYield() self.inputJsons = inputJsons self.cached_likelihoods = { False: {}, True: {}, "posteriori": {} } ## cache of likelihoods (actually twice_nlls) self.cached_lmaxes = { False: {}, True: {}, "posteriori": {} } # cache of lmaxes (actually twice_nlls) self.cachedULs = { False: {}, True: {}, "posteriori": {}} self.cacheBestCombo = None # memorize also whats the best combo if jsonFiles is None: # If no name has been provided for the json file(s) and the channels, use fake ones jsonFiles = {} for jFile,sregions in nsignals.items(): regions = [] srname = "SR1" if len(sregions)==1: regions.append ( { "smodels": srname, "type": "SR", "pyhf": srname } ) else: for j in range( len ( sregions ) ): srname = f"SR1[{j}]" regions.append ( { "smodels": srname, "type": "SR", "pyhf": srname } ) jsonFiles[ jFile ] = regions self.jsonFiles = jsonFiles self.includeCRs = includeCRs self.signalUncertainty = signalUncertainty self.combinations = None if jsonFiles != None: self.combinations = [os.path.splitext(os.path.basename(js))[0] for js in jsonFiles] self.nWS = len(inputJsons) self.errorFlag = False self.checkConsistency() self.getWSInfo()
[docs] def getTotalYield ( self ): """ the total yield in all signal regions """ S = 0 for dct in self.nsignals.values(): for signal in dct.values(): if isinstance(signal, list): for sig in signal: S += sig else: S += signal self.totalYield = S
[docs] def createPatchForRegion ( self, region, i_ch, ch, jsName ): chname = ch['name'] chname2 = f'{ch["name"]}[0]' ## binned SRs if not region["pyhf"] in [ chname, chname2 ]: return None, None if (region['type'] == 'SR') or (region['type'] == 'CR' and self.includeCRs and region['smodels'] is not None): if region['smodels'] not in self.nsignals[jsName]: logger.error(f"Region {region['smodels']} of {jsName} not in the signal dictionary!") self.errorFlag = True return None, None nBins = len(ch["data"]) # Find all smodels names if many share the same pyhf name (for multi-bin regions) smodelsName = [] ctr = 0 if region["pyhf"] == chname: # no bins, so easy smodelsName.append(region['smodels']) else: ## bins hasAdded = True while hasAdded: hasAdded = False for rregion in self.jsonFiles[jsName]: if rregion["pyhf"] == f'{ch["name"]}[{ctr}]': smodelsName.append(rregion['smodels']) ctr+=1 hasAdded = True if len(smodelsName) != nBins: logger.error(f"Json region {region['pyhf']} has {nBins} bins, but only {len(smodelsName)} are implemented!") self.errorFlag = True return None, None signal = [] for name in smodelsName: # In case of multiple signals for one region, the dict ordering within globalInfo.jsonFiles matters signal.append(self.nsignals[jsName][name]) smodelsName = ";".join( smodelsName ) # Name of the corresponding region(s). Join all names if multiple bins. ret = { "path": f"/channels/{i_ch}/samples/0", "size": nBins, "smodelsName": smodelsName, "signal": signal }, "signalRegions" return ret ret = { 'path': f"/channels/{i_ch}", 'name': chname, 'type': region['type']}, "otherRegions" return ( ret )
[docs] def updatePyhfNames ( self, jsName : str, observations : List ): """ if no pyhf names are given, get them from the ws, in the order of the ws """ if "pyhf" in self.jsonFiles[jsName][0]: return ## we dont have the mapping smodels<->pyhf ctr = 0 #ic ( "---" ) nJsonFiles = len(self.jsonFiles[jsName]) #nObs = len(observations) #ic ( self.includeCRs, nObs, nJsonFiles ) #ic ( observations ) #ic ( self.jsonFiles[jsName] ) nSRs = 0 for observation in observations: name = observation["name"] regionType = guessPyhfType ( name ) if regionType == "SR": nSRs += 1 # ic ( nSRs, nCRs ) for observation in observations: name = observation["name"] regionType = guessPyhfType ( name ) if regionType in [ "VR" ]: region = { "pyhf": observation["name"], "smodels": None, "type": regionType } self.jsonFiles[jsName].append ( region ) continue if not self.includeCRs and regionType in [ "CR" ]: region = { "pyhf": observation["name"], "smodels": None, "type": regionType } self.jsonFiles[jsName].append ( region ) continue if self.includeCRs and regionType in [ "CR" ]: if nSRs == nJsonFiles: # and nSRs+nCRs == nObs: ## the signal regions alone do it region = { "pyhf": observation["name"], "smodels": None, "type": regionType } self.jsonFiles[jsName].append ( region ) continue if len(observation["data"])==1: if ctr < len(self.jsonFiles[jsName]): self.jsonFiles[jsName][ctr]["pyhf"]=f"{name}" self.jsonFiles[jsName][ctr]["type"]=regionType ctr += 1 else: for i in range(len(observation["data"])): if ctr < len(self.jsonFiles[jsName]): self.jsonFiles[jsName][ctr]["pyhf"]=f"{name}[{i}]" self.jsonFiles[jsName][ctr]["type"]=regionType ctr += 1
[docs] def getWSInfo(self): """ Getting informations from the json files :ivar channelsInfo: list of dictionaries (one dictionary for each json file) containing useful information about the json files - :key signalRegions: list of dictonaries with 'json path' and 'size' (number of bins) of the 'signal regions' channels in the json files - :key otherRegions: list of dictionnaries indicating the path and the name of the control and/or validation region channels """ if self.errorFlag: return # Identifying the path to the channels in the main workspace files self.channelsInfo = [] # workspace specifications if not isinstance(self.inputJsons, list): logger.error("The 'inputJsons' parameter must be of type list") self.errorFlag = True return for ws, jsName in zip(self.inputJsons, self.jsonFiles): wsChannelsInfo = {} wsChannelsInfo["signalRegions"] = [] wsChannelsInfo["otherRegions"] = [] if not "channels" in ws.keys(): logger.error( "Json file number {} is corrupted (channels are missing)".format( self.inputJsons.index(ws) ) ) self.channelsInfo = None return sigInCRs = False signalNames = self.nsignals[jsName].keys() for signalName in signalNames: for region in self.jsonFiles[jsName]: if signalName == region['smodels'] and region['type'] == 'CR': sigInCRs = True if sigInCRs and not self.includeCRs: logger.warning("Signal in CRs but includeCRs = False. CRs will still be removed.") smodelsRegions = self.nsignals[jsName].values() # CR and SR names implemented in the database if "observations" in ws: self.updatePyhfNames ( jsName, ws["observations"] ) for i_r, region in enumerate ( self.jsonFiles[jsName] ): for i_ch, ch in enumerate(ws["observations"]): ## create a patch for the region, but only if channel matches patch, patchType = self.createPatchForRegion ( region, i_ch, ch, jsName ) if patch != None: wsChannelsInfo[patchType].append(patch) break wsChannelsInfo["otherRegions"].sort( key=lambda path: int(path['path'].split("/")[-1]), reverse=True ) # Need to sort correctly the paths to the channels to be removed self.channelsInfo.append(wsChannelsInfo)
# ic ( wsChannelsInfo )
[docs] def checkConsistency(self): """ Check various inconsistencies of the PyhfData attributes :param zeroSignalsFlag: boolean identifying if all SRs of a single json are empty """ if not isinstance(self.nsignals, dict): logger.error("The 'nsignals' parameter must be of type list") self.errorFlag = True if self.nWS != len(self.nsignals): logger.error( "The number of subsignals provided is different from the number of json files" ) self.errorFlag = True self.zeroSignalsFlag = list() for jsName, subSig in zip(self.jsonFiles, self.nsignals.values()): if not isinstance(subSig, dict): logger.error("The 'nsignals' parameter must be a dictionary of dictionary") self.errorFlag = True return nBinsJson = 0 for region in self.jsonFiles[jsName]: if (region['type'] == 'SR') or (region['type'] == 'CR' and self.includeCRs and region['smodels'] is not None): nBinsJson += 1 if nBinsJson != len(subSig): logger.error( f"The number of signals ({len(subSig)}) provided is different from the number of signal bins for json ({nBinsJson}) for {jsName}" ) self.errorFlag = True allZero = all([s == 0 for s in subSig.values()]) # Checking if all signals matching this json are zero self.zeroSignalsFlag.append(allZero)
[docs]class PyhfUpperLimitComputer: """ Class that computes the upper limit using the jsons files and signal informations in the 'data' instance of 'PyhfData' """ def __init__(self, data, cl=0.95, lumi=None ): """ :param data: instance of 'PyhfData' holding the signals information :param cl: confdence level at which the upper limit is desired to be computed :ivar data: created from data :ivar nsignals: signal predictions list divided into sublists, one for each json file :ivar inputJsons: list of input json files as python json instances :ivar channelsInfo: list of channels information for the json files :ivar zeroSignalsFlag: list boolean flags in case all signals are zero for a specific json :ivar nWS: number of workspaces = number of json files :ivar patches: list of patches to be applied to the inputJsons as python dictionary instances :ivar workspaces: list of workspaces resulting from the patched inputJsons :ivar workspaces_expected: list of patched workspaces with observation yields replaced by the expected ones :ivar cl: created from cl :ivar scale: scale that is applied to the signal predictions, dynamically changes throughout the upper limit calculation :ivar alreadyBeenThere: boolean flag that identifies when nsignals accidentally passes twice at two identical values """ self.data = data self.lumi = lumi self.nsignals = copy.deepcopy ( self.data.nsignals ) logger.debug("Signals : {}".format(self.nsignals)) self.inputJsons = self.data.inputJsons self.channelsInfo = None if hasattr ( self.data, "channelsInfo" ): self.channelsInfo = self.data.channelsInfo self.zeroSignalsFlag = self.data.zeroSignalsFlag self.nWS = self.data.nWS self.includeCRs = data.includeCRs self.patches = self.patchMaker() self.workspaces = self.wsMaker() self.workspaces_expected = self.wsMaker(apriori=True) self.cl = cl self.scale = 1.0 self.sigma_mu = None self.alreadyBeenThere = ( False # boolean to detect wether self.signals has returned to an older value ) # self.checkPyhfVersion() self.welcome()
[docs] def welcome(self): """ greet the world """ if pyhfinfo["hasgreeted"]: return logger.info( f"Pyhf interface, we are using v{str(pyhfinfo['ver'])}, with {pyhfinfo['backend']} v{str(pyhfinfo['backendver'])} as backend." ) pyhfinfo["hasgreeted"] = True
[docs] def checkPyhfVersion(self): """ check the pyhf version, currently we need 0.6.1+ """ if pyhfinfo["ver"] < pyhfinfo["required"]: logger.warning( f"pyhf version is {str(pyhfinfo['ver'])}. SModelS currently requires pyhf>={str(pyhfinfo['required'])}. You have been warned." )
[docs] def rescale(self, factor): """ Rescales the signal predictions (self.nsignals) and processes again the patches and workspaces :return: updated list of patches and workspaces (self.patches, self.workspaces and self.workspaces_expected) """ for jsName in self.nsignals.keys(): for regionName in self.nsignals[jsName].keys(): self.nsignals[jsName][regionName] = self.nsignals[jsName][regionName]*factor try: self.alreadyBeenThere = self.nsignals == self.nsignals_2 except AttributeError: pass self.scale *= factor logger.debug("new signal scale : {}".format(self.scale)) self.patches = self.patchMaker() self.workspaces = self.wsMaker() self.workspaces_expected = self.wsMaker(apriori=True) try: self.nsignals_2 = self.nsignals_1.copy() # nsignals at previous-to-previous loop except AttributeError: pass self.nsignals_1 = self.nsignals.copy() # nsignals at previous loop
[docs] def changeChannelName ( self, srInfo ): """ changes the channel names in the json to match the SModelS name. FIXME this is only a hack for now, should be replaced by a self-respecting dataIdMap in the database itself, that maps the smodels names to the pyhf names explicitly. This method will then go away! """ operators= [] operator = {} # Operator for renaming the channels according to their region name from the database operator["op"] = "replace" operator["path"] = srInfo["path"].replace('samples/0','name') operator["value"] = srInfo["smodelsName"] operators.append(operator) operator = {} # Operator for renaming the observations according to their region name from the database operator["op"] = "replace" operator["path"] = srInfo["path"].replace('channels','observations').replace('samples/0','name') operator["value"] = srInfo["smodelsName"] operators.append(operator) return operators
[docs] def patchMaker(self): """ Method that creates the list of patches to be applied to the self.inputJsons workspaces, one for each region given the self.nsignals and the informations available in self.channelsInfo and the content of the self.inputJsons NB: It seems we need to include the change of the "modifiers" in the patches as well :return: the list of patches, one for each workspace """ if self.channelsInfo == None: return None # Constructing the patches to be applied on the main workspace files patches = [] for ws, info, (jsFileName,jsFile) in zip(self.inputJsons, self.channelsInfo, self.data.jsonFiles.items() ): patch = [] for srInfo in info["signalRegions"]: operator = {} # Operator for patching the signal operator["op"] = "add" operator["path"] = srInfo["path"] value = {} #value["data"] = srInfo['signal'] sr_order = srInfo["smodelsName"].split(";") nsignals = self.nsignals[jsFileName] # ic ( nsignals, sr_order, srInfo, jsFileName ) value["data"] = [ nsignals[x] for x in sr_order ] #import sys, IPython; IPython.embed( colors = "neutral" ); sys.exit() # sys.exit() value["modifiers"] = [] value["modifiers"].append({"data": None, "type": "normfactor", "name": "mu_SIG"}) value["modifiers"].append({"data": None, "type": "lumi", "name": "lumi"}) if self.data.signalUncertainty is not None and self.data.signalUncertainty != 0: # Uncomment the line below to add a MC statistical uncertainty. # value["modifiers"].append({"data": [sigBin*self.data.signalUncertainty for sigBin in value["data"]], "type": "staterror", "name": "MCError"}) value["modifiers"].append({"data": {"hi_data": [sigBin*(1+self.data.signalUncertainty) for sigBin in value["data"]], "lo_data": [sigBin*(1-self.data.signalUncertainty) for sigBin in value["data"]] }, "type": "histosys", "name": "signalUncertainty" }) value["name"] = "bsm" operator["value"] = value patch.append(operator) ## FIXME this if block is a hack, only to be used until ## smodels-database has proper dataIdMaps, mapping the smodels ## SR names to the pyhf ones. once these dataIdMaps are in place, ## they should be used instead of this hack that rewrites ## the pyhf channel names if False: # srInfo["smodelsName"]: # If the CRs/SRs have a name in the database (it is always True when running SModelS the usual way) operators = self.changeChannelName ( srInfo ) for operator in operators: patch.append(operator) for region in info["otherRegions"]: if region['type'] == 'CR' and self.includeCRs: continue else: patch.append({"op": "remove", "path": region['path']}) # operator for removing useless regions patches.append(patch) return patches
[docs] def wsMaker(self, apriori=False): """ Apply each region patch (self.patches) to his associated json (self.inputJsons) to obtain the complete workspaces :param apriori: - If set to 'True': Replace the observation data entries of each workspace by the corresponding sum of the expected yields \ - Else: The observed yields put in the workspace are the ones written in the corresponfing json dictionary :returns: the list of patched workspaces """ if self.patches == None: return None if self.nWS == 1: try: wsDict = jsonpatch.apply_patch(self.inputJsons[0], self.patches[0]) if apriori == True: # Replace the observation data entries by the corresponding sum of the expected yields for obs in wsDict["observations"]: for ch in wsDict["channels"]: # Finding matching observation and bkg channel if obs["name"] == ch["name"]: bkg = [0.0] * len(obs["data"]) for sp in ch["samples"]: if sp["name"] == "bsm": continue for iSR in range(len(obs["data"])): # Summing over all bkg samples for each bin/SR bkg[iSR] += sp["data"][iSR] # logger.debug('bkgs for channel {} :\n{}'.format(obs['name'], bkg)) obs["data"] = bkg return [pyhf.Workspace(wsDict)] except (pyhf.exceptions.InvalidSpecification, KeyError) as e: logger.error("The json file is corrupted:\n{}".format(e)) return None else: workspaces = [] for js, patch in zip(self.inputJsons, self.patches): wsDict = jsonpatch.apply_patch(js, patch) if apriori == True: # Replace the observation data entries by the corresponding sum of the expected yields for obs in wsDict["observations"]: for ch in wsDict["channels"]: # Finding matching observation and bkg channel if obs["name"] == ch["name"]: bkg = [0.0] * len(obs["data"]) for sp in ch["samples"]: if sp["name"] == "bsm": continue for iSR in range(len(obs["data"])): # Summing over all bkg samples for each bin/SR bkg[iSR] += sp["data"][iSR] # logger.debug('bkgs for channel {} :\n{}'.format(obs['name'], bkg)) obs["data"] = bkg try: ws = pyhf.Workspace(wsDict) except (pyhf.exceptions.InvalidSpecification, KeyError) as e: logger.error( "Json file number {} is corrupted:\n{}".format( self.inputJsons.index(json), e ) ) return None workspaces.append(ws) return workspaces
[docs] def backup(self): self.bu_signal = copy.deepcopy(self.data.nsignals)
[docs] def restore(self): if not hasattr(self, "bu_signal"): return self.data.nsignals = copy.deepcopy(self.bu_signal) del self.bu_signal
[docs] def get_position(self, name, model): """ :param name: name of the parameter one wants to increase :param model: the pyhf model :return: the position of the parameter that has to be modified in order to turn positive the negative total yield """ position = 0 for par in model.config.par_order: if name == par: return position else: position += model.config.param_set(par).n_parameters
[docs] def rescaleBgYields(self, init_pars, workspace, model): """ Increase initial value of nuisance parameters until the starting value of the total yield (mu*signal + background) is positive :param init_pars: list of initial parameters values one wants to increase in order to turn positive the negative total yields :param workspace: the pyhf workspace :param model: the pyhf model :return: the list of initial parameters values that gives positive total yields """ for pos,yld in enumerate(model.expected_actualdata(init_pars)): if yld < 0: sum_bins = 0 positive = False # Find the SR and the bin (the position inside the SR) corresponding to the negative total yield for channel,nbins in model.config.channel_nbins.items(): sum_bins += nbins if pos < sum_bins: SR = channel bin_num = pos-sum_bins+nbins # The position of the bin in the SR break # Find the name of the parameter that modifies the background yield for channel in workspace['channels']: if channel['name'] == SR: for sample in range(len(channel['samples'])): for modifier in channel['samples'][sample]['modifiers']: # The multiplicative modifier that will be increased is of type 'staterror' (others may work as well) # In case of a simplified json, let's take the only parameter called 'totalError' if ('type' in modifier.keys() and modifier['type'] == 'staterror') or ('type' in modifier.keys() and modifier['type'] == 'normsys') or modifier['name'] == 'totalError': name = modifier['name'] # Find the position of the parameter within the pyhf list of parameters position = self.get_position(name,model)+bin_num # Find the upper bound of the parameter that will be increased max_bound = model.config.suggested_bounds()[position][1] # If the parameter one wants to increase is not fixed, add 0.1 to its value (it is an arbitrary step) until # the total yield becomes positive or the parameter upper bound is reached if not model.config.suggested_fixed()[position]: while model.expected_actualdata(init_pars)[pos] < 0: if init_pars[position] + 0.1 < max_bound: init_pars[position] += 0.1 else: init_pars[position] = max_bound break positive = model.expected_actualdata(init_pars)[pos] >= 0 if positive: break if positive: break return init_pars
[docs] def likelihood( self, mu=1.0, workspace_index=None, return_nll=False, expected=False): """ Returns the value of the likelihood. \ Inspired by the 'pyhf.infer.mle' module but for non-log likelihood :param workspace_index: supply index of workspace to use. If None, \ choose index of best combo :param return_nll: if true, return nll, not llhd :param expected: if False, compute expected values, if True, \ compute a priori expected, if "posteriori" compute posteriori \ expected """ if workspace_index in self.data.cached_likelihoods[expected] and \ mu in self.data.cached_likelihoods[expected][workspace_index]: return self.data.cached_likelihoods[expected][workspace_index][mu] logger.debug("Calling likelihood") if type(workspace_index) == float: logger.error("workspace index is float") # logger.error("expected flag needs to be heeded!!!") with warnings.catch_warnings(): warnings.filterwarnings( "ignore", "Values in x were outside bounds during a minimize step, clipping to bounds", ) # warnings.filterwarnings ( "ignore", "", module="pyhf.exceptions" ) old_index = workspace_index if workspace_index == None: workspace_index = self.getBestCombinationIndex() if workspace_index == None: return None self.backup() try: if abs(mu - 1.0) > 1e-6: for jsName in self.data.nsignals.keys(): for regionName in self.data.nsignals[jsName].keys(): self.data.nsignals[jsName][regionName] = self.data.nsignals[jsName][regionName]*mu self.__init__(self.data, self.cl, self.lumi) ### allow this, for computation of l_SM # if self.zeroSignalsFlag[workspace_index] == True: # logger.warning( f"Workspace number {workspace_index} has zero signals" ) # return None workspace = self.updateWorkspace(workspace_index, expected=expected) # Same modifiers_settings as those used when running the 'pyhf cls' command line msettings = { "normsys": {"interpcode": "code4"}, "histosys": {"interpcode": "code4p"}, } model = workspace.model(modifier_settings=msettings) wsData = workspace.data(model) _, nllh = pyhf.infer.mle.fixed_poi_fit( 1.0, wsData, model, return_fitted_val=True, maxiter=200 ) except (pyhf.exceptions.FailedMinimization, ValueError) as e: logger.error(f"pyhf fixed_poi_fit failed for {list(self.data.jsonFiles)[workspace_index]} for mu={mu}: {e}") # lets try with different initialisation init, n_ = pyhf.infer.mle.fixed_poi_fit( 0.0, workspace.data(model), model, return_fitted_val=True, maxiter=200 ) initpars = init.tolist() initpars[model.config.poi_index] = 1. # Try to turn positive all the negative total yields (mu*signal + background) evaluated with the initial parameters initpars = self.rescaleBgYields(initpars, workspace, model) # If the a total yield is still negative with the increased initial parameters, print a message if not all([True if yld >= 0 else False for yld in model.expected_actualdata(initpars)]): logger.debug(f'Negative total yield after increasing the initial parameters for mu={mu}') try: bestFitParam, nllh = pyhf.infer.mle.fixed_poi_fit( 1.0, wsData, model, return_fitted_val=True, init_pars=initpars, maxiter=2000, ) # If a total yield is negative with the best profiled parameters, return None if not all([True if yld >= 0 else False for yld in model.expected_actualdata(bestFitParam)]): self.restore() return self.exponentiateNLL(None, not return_nll) except (pyhf.exceptions.FailedMinimization, ValueError) as e: logger.info(f"pyhf fixed_poi_fit failed twice for {list(self.data.jsonFiles)[workspace_index]} for mu={mu}: {e}") self.restore() return self.exponentiateNLL(None, not return_nll) ret = nllh.tolist() try: ret = float(ret) except: ret = float(ret[0]) # Cache the likelihood (but do we use it?) if not workspace_index in self.data.cached_likelihoods[expected]: self.data.cached_likelihoods[expected][workspace_index]={} ret = self.exponentiateNLL(ret, not return_nll) self.data.cached_likelihoods[expected][ workspace_index ][mu] = ret if old_index == None: if not None in self.data.cached_likelihoods[expected]: self.data.cached_likelihoods[expected][None]={} self.data.cached_likelihoods[expected][None][mu] = ret # print ( "now leaving the fit mu=", mu, "llhd", ret, "nsig was", self.data.nsignals ) self.restore() return ret
[docs] def getBestCombinationIndex(self): """find the index of the best expected combination""" if self.nWS == 1: return 0 if self.data.cacheBestCombo != None: return self.data.cacheBestCombo logger.debug( f"Finding best expected combination among {self.nWS} workspace(s)" ) ulMin = float("+inf") i_best = None for i_ws in range(self.nWS): if self.data.totalYield == 0.: continue if self.zeroSignalsFlag[i_ws] == True: logger.debug( f"Workspace number {i_ws} has zero signals" ) continue else: ul = self.getUpperLimitOnMu(expected=True, workspace_index=i_ws) if ul == None: continue if ul < ulMin: ulMin = ul i_best = i_ws self.data.cacheBestCombo = i_best return i_best
[docs] def exponentiateNLL(self, twice_nll, doIt): """if doIt, then compute likelihood from nll, else return nll""" if twice_nll == None: return None #if doIt: # return 0.0 #return 9000.0 if doIt: return np.exp(-twice_nll / 2.0) return twice_nll / 2.0
[docs] def compute_invhess(self, x, data, model, index, epsilon=1e-05): ''' if inv_hess is not given by the optimiser, calculate numerically by evaluating second order partial derivatives using 2 point central finite differences method :param x: parameter values given to pyhf.infer.mle.twice_nll taken from pyhf.infer.mle.fit - optimizer.x (best_fit parameter values) :param data: workspace.data(model) passed to pyhf.infer.mle.fit :param model: model passed to pyhf.infer.mle.fit :param index: index of the POI Note : If len(x) <=5, compute the entire hessian matrix and ind its inverse. Else, compute the hessian at the index of the POI and return its inverse (diagonal approximation) returns the inverse hessian at the index of the poi ''' n = len(x) if n<=5: hessian = np.zeros((n,n)) for i in range(n): for j in range(i+1): eps_i = epsilon*np.eye(n)[:,i] #identity along ith column eps_j = epsilon*np.eye(n)[:,j] #twice_nll is the objective function, hence need to find its hessian par_11 = pyhf.infer.mle.twice_nll(x + eps_i + eps_j, data, model) par_12 = pyhf.infer.mle.twice_nll(x - eps_i + eps_j, data, model) par_21 = pyhf.infer.mle.twice_nll(x + eps_i - eps_j, data, model) par_22 = pyhf.infer.mle.twice_nll(x - eps_i - eps_j, data, model) partial_xi_xj = (par_11 - par_12 - par_21 +par_22)/(4*epsilon**2) hessian[i,j] = partial_xi_xj if i!=j: hessian[j,i] = partial_xi_xj def is_positive_definite(matrix): eigenvalues = np.linalg.eigvals(matrix) return all(eig > 0 for eig in eigenvalues) if not is_positive_definite(hessian): #raise ValueError("Hessian Matrix is not positive definite") logger.warning("Hessian Matrix is not positive definite") inverse_hessian = np.linalg.inv(hessian) #return the inverse hessian at the poi return inverse_hessian[index][index] #calculate only the hessian at the poi and return its inverse (an approximation!) eps_i = epsilon*np.eye(n)[:,index] par_11 = pyhf.infer.mle.twice_nll(x + 2*eps_i, data, model) par_12 = pyhf.infer.mle.twice_nll(x, data, model) par_22 = pyhf.infer.mle.twice_nll(x - 2*eps_i, data, model) hessian = (par_11 - 2*par_12 + par_22)/(4*epsilon**2) #return the inverse hessian at the poi return 1.0/hessian
[docs] def lmax( self, workspace_index=None, return_nll=False, expected=False, allowNegativeSignals=False): """ Returns the negative log max likelihood :param return_nll: if true, return nll, not llhd :param workspace_index: supply index of workspace to use. If None, \ choose index of best combo :param expected: if False, compute expected values, if True, \ compute a priori expected, if "posteriori" compute posteriori \ expected :param allowNegativeSignals: if False, then negative nsigs are replaced \ with 0. """ if workspace_index in self.data.cached_lmaxes[expected]: return self.data.cached_lmaxes[expected][workspace_index] # logger.error("expected flag needs to be heeded!!!") logger.debug("Calling lmax") with warnings.catch_warnings(): warnings.filterwarnings( "ignore", "Values in x were outside bounds during a minimize step, clipping to bounds", ) self.__init__(self.data, self.cl, self.lumi) old_index = workspace_index if workspace_index == None: workspace_index = self.getBestCombinationIndex() if workspace_index != None: if self.zeroSignalsFlag[workspace_index] == True: logger.warning( f"Workspace number {workspace_index} has zero signals" ) return None else: workspace = self.updateWorkspace(workspace_index, expected=expected) else: return None # Same modifiers_settings as those used when running the 'pyhf cls' command line msettings = {"normsys": {"interpcode": "code4"}, "histosys": {"interpcode": "code4p"}} model = workspace.model(modifier_settings=msettings) muhat, maxNllh = model.config.suggested_init(), float("nan") sigma_mu = float("nan") # obs = workspace.data(model) try: bounds = model.config.suggested_bounds() if allowNegativeSignals: bounds[model.config.poi_index] = (-5., 10. ) try: muhat, maxNllh, o = pyhf.infer.mle.fit(workspace.data(model), model, return_fitted_val=True, par_bounds = bounds, return_result_obj = True ) #removed jacobain way of computing sigma_mu except (pyhf.exceptions.FailedMinimization,ValueError) as e: pass if hasattr ( o, "hess_inv" ): # maybe the backend gets changed sigma_mu = float ( np.sqrt ( o.hess_inv[model.config.poi_index][model.config.poi_index] ) ) * self.scale else: sigma_mu_temp = 1. try: warnings.filterwarnings( "ignore", category=RuntimeWarning ) _1, _2, o = pyhf.infer.mle.fit(workspace.data(model), model, return_fitted_val=True, return_result_obj = True, init_pars = list(muhat), method="BFGS" ) sigma_mu_temp = float ( np.sqrt ( o.hess_inv[model.config.poi_index][model.config.poi_index] ) ) except (pyhf.exceptions.FailedMinimization,ValueError) as e: pass if abs ( sigma_mu_temp - 1.0 ) > 1e-5: sigma_mu = sigma_mu_temp * self.scale else: warnings.filterwarnings( "ignore", category=RuntimeWarning ) _, _, o = pyhf.infer.mle.fit(workspace.data(model), model, return_fitted_val=True, return_result_obj = True, method="L-BFGS-B" ) sigma_mu_temp = float ( np.sqrt ( o.hess_inv.todense()[model.config.poi_index][model.config.poi_index] ) ) if abs ( sigma_mu_temp - 1.0 ) < 1e-8: # Fischer information is nonsense #Calculate inv_hess numerically inv_hess = self.compute_invhess(o.x, workspace.data(model), model, model.config.poi_index) sigma_mu_temp = float ( np.sqrt ( inv_hess)) if abs ( sigma_mu_temp - 1.0 ) > 1e-8: sigma_mu = sigma_mu_temp * self.scale else: # sigma_mu is still nonsense logger.warning("sigma_mu computation failed, even with inv_hess numercially computed. sigma_mu will be set to 1.") sigma_mu = 1. except (pyhf.exceptions.FailedMinimization, ValueError) as e: if pyhfinfo["backend"] == "pytorch": logger.warning(f"pyhf mle.fit failed {e} with {pyhfinfo['backend']} v{pyhfinfo['backendver']}. Calculating inv_hess numerically.") if pyhfinfo["backend"] == "numpy": logger.debug(f"pyhf mle.fit failed {e} with {pyhfinfo['backend']} v{pyhfinfo['backendver']}. Calculating inv_hess numerically.") #Calculate inv_hess numerically inv_hess = self.compute_invhess(o.x, workspace.data(model), model, model.config.poi_index) sigma_mu = float ( np.sqrt ( inv_hess)) * self.scale muhat = float ( muhat[model.config.poi_index]*self.scale ) try: lmax = maxNllh.tolist() except: lmax= maxNllh try: lmax = float(lmax) except: lmax = float(lmax[0]) lmax = self.exponentiateNLL(lmax, not return_nll) ret = { "lmax": lmax, "muhat": muhat, "sigma_mu": sigma_mu } self.data.cached_lmaxes[expected][workspace_index] = ret if old_index == None: self.data.cached_lmaxes[expected][None] = ret # print ( f"@@11 ret {ret}" ) return ret
[docs] def updateWorkspace(self, workspace_index=None, expected=False): """ Small method used to return the appropriate workspace :param workspace_index: the index of the workspace to retrieve from the corresponding list :param expected: if False, retuns the unmodified (but patched) workspace. Used for computing observed or aposteriori expected limits. if True, retuns the modified (and patched) workspace, where obs = sum(bkg). Used for computing apriori expected limit. """ if self.nWS == 1: if expected == True: return self.workspaces_expected[0] else: return self.workspaces[0] else: if workspace_index == None: logger.error("No workspace index was provided.") if expected == True: return self.workspaces_expected[workspace_index] else: return self.workspaces[workspace_index]
[docs] def getUpperLimitOnSigmaTimesEff(self, expected=False, workspace_index=None): """ Compute the upper limit on the fiducial cross section sigma times efficiency: - by default, the combination of the workspaces contained into self.workspaces - if workspace_index is specified, self.workspace[workspace_index] (useful for computation of the best upper limit) :param expected: - if set to 'True': uses expected SM backgrounds as signals - else: uses 'self.nsignals' :param workspace_index: - if different from 'None': index of the workspace to use for upper limit - else: choose best combo :return: the upper limit on sigma times eff at 'self.cl' level (0.95 by default) """ if self.data.totalYield == 0.: return None else: ul = self.getUpperLimitOnMu( expected=expected, workspace_index=workspace_index) if ul == None: return ul if self.lumi is None: logger.error(f"asked for upper limit on fiducial xsec, but no lumi given with the data") return ul xsec = self.data.totalYield / self.lumi return ul * xsec
# Trying a new method for upper limit computation : # re-scaling the signal predictions so that mu falls in [0, 10] instead of # looking for mu bounds # Usage of the index allows for rescaling
[docs] def getUpperLimitOnMu(self, expected=False, workspace_index=None): """ Compute the upper limit on the signal strength modifier with: - by default, the combination of the workspaces contained into self.workspaces - if workspace_index is specified, self.workspace[workspace_index] (useful for computation of the best upper limit) :param expected: - if set to 'True': uses expected SM backgrounds as signals - else: uses 'self.nsignals' :param workspace_index: - if different from 'None': index of the workspace to use for upper limit - else: choose best combo :return: the upper limit at 'self.cl' level (0.95 by default) """ self.__init__(self.data, self.cl, self.lumi) if workspace_index in self.data.cachedULs[expected]: ret = self.data.cachedULs[expected][workspace_index] return ret with warnings.catch_warnings(): warnings.filterwarnings( "ignore", "Values in x were outside bounds during a minimize step, clipping to bounds", ) startUL = time.time() logger.debug("Calling getUpperLimitOnMu") if self.data.errorFlag or self.workspaces == None: # For now, this flag can only be turned on by PyhfData.checkConsistency return None if ( all([self.zeroSignalsFlag[workspace_index] for workspace_index in range(self.nWS)]) == True ): logger.debug( f"There is (are) {self.nWS} workspace(s) and no signal(s) was (were) found" ) return None if workspace_index == None: workspace_index = self.getBestCombinationIndex() if workspace_index == None: logger.debug("Best combination index not found") return None def clsRoot(mu : float ) -> float: # If expected == False, use unmodified (but patched) workspace # If expected == True, use modified workspace where observations = sum(bkg) (and patched) # If expected == posteriori, use unmodified (but patched) workspace workspace = self.updateWorkspace(workspace_index, expected=expected) # Same modifiers_settings as those use when running the 'pyhf cls' command line msettings = { "normsys": {"interpcode": "code4"}, "histosys": {"interpcode": "code4p"}, } with warnings.catch_warnings(): warnings.simplefilter("ignore", category=(DeprecationWarning,UserWarning)) model = workspace.model(modifier_settings=msettings) bounds = model.config.suggested_bounds() bounds[model.config.poi_index] = (0, 10) start = time.time() args = {} args["return_expected"] = expected == "posteriori" args["par_bounds"] = bounds # args["maxiter"]=100000 pver = float(pyhf.__version__[:3]) stat = "qtilde" if pver < 0.6: args["qtilde"] = True else: args["test_stat"] = stat with np.testing.suppress_warnings() as sup: if pyhfinfo["backend"] == "numpy": sup.filter(RuntimeWarning, r"invalid value encountered in log") # print ("expected", expected, "return_expected", args["return_expected"], "mu", mu, "\nworkspace.data(model) :", workspace.data(model, include_auxdata = False), "\nworkspace.observations :", workspace.observations, "\nobs[data] :", workspace['observations']) # ic ( workspace["channels"][0]["samples"][0]["data"] ) # import sys, IPython; IPython.embed( colors = "neutral" ); sys.exit() try: result = pyhf.infer.hypotest(mu, workspace.data(model), model, **args) except Exception as e: logger.error(f"when testing hypothesis {mu}, caught exception: {e}") result = float("nan") if expected == "posteriori": result = [float("nan")] * 2 end = time.time() logger.debug( f"Hypotest elapsed time : {end-start:1.4f} secs" ) logger.debug(f"result for {mu} {result}") if expected == "posteriori": logger.debug("computing a-posteriori expected limit") logger.debug("expected = {}, mu = {}, result = {}".format(expected, mu, result)) try: CLs = float(result[1].tolist()) except TypeError: CLs = float(result[1][0]) else: logger.debug("expected = {}, mu = {}, result = {}".format(expected, mu, result)) CLs = float(result) # logger.debug(f"Call of clsRoot({mu}) -> {1.0 - CLs}" ) return 1.0 - self.cl - CLs # Rescaling signals so that mu is in [0, 10] factor = 3.0 wereBothLarge = False wereBothTiny = False nattempts = 0 nNan = 0 lo_mu, med_mu, hi_mu = 0.2, 1.0, 5.0 # ic ( "A", lo_mu, hi_mu, clsRoot(lo_mu), clsRoot(hi_mu) ) # print ( "starting with expected", expected ) while "mu is not in [lo_mu,hi_mu]": nattempts += 1 if nNan > 5: # logger.warning("encountered NaN 5 times while trying to determine the bounds for brent bracketing. now trying with q instead of qtilde test statistic") return None # nattempts = 0 if nattempts > nattempts_max: logger.warning( f"tried {nattempts_max} times to determine the bounds for brent bracketing. we abort now." ) return None # Computing CL(1) - 0.95 and CL(10) - 0.95 once and for all rt1 = clsRoot(lo_mu) # rt5 = clsRoot(med_mu) rt10 = clsRoot(hi_mu) # print ( "we are at",lo_mu,med_mu,hi_mu,"values at", rt1, rt5, rt10, "scale at", self.scale,"factor at", factor ) if rt1 < 0.0 and 0.0 < rt10: # Here's the real while condition break if self.alreadyBeenThere: factor = 1 + .75 * (factor - 1) logger.debug("Diminishing rescaling factor") if np.isnan(rt1): rt5 = clsRoot(med_mu) if rt5 < 0.0 and rt10 > 0.0: lo_mu = med_mu med_mu = np.sqrt(lo_mu * hi_mu) continue if rt10 < 0.0: ## also try to increase hi_mu hi_mu = hi_mu + (10.0 - hi_mu) * 0.5 med_mu = np.sqrt(lo_mu * hi_mu) nNan += 1 self.rescale(factor) continue if np.isnan(rt10): rt5 = clsRoot(med_mu) if rt5 > 0.0 and rt1 < 0.0: hi_mu = med_mu med_mu = np.sqrt(lo_mu * hi_mu) continue if rt1 > 0.0: ## also try to decrease lo_mu lo_mu = lo_mu * 0.5 med_mu = np.sqrt(lo_mu * hi_mu) nNan += 1 self.rescale(1 / factor) continue # Analyzing previous values of wereBoth*** if rt10 < 0 and rt1 < 0 and wereBothLarge: factor = 1 + (factor - 1) / 2 logger.debug("Diminishing rescaling factor") if rt10 > 0 and rt1 > 0 and wereBothTiny: factor = 1 + (factor - 1) / 2 logger.debug("Diminishing rescaling factor") # Preparing next values of wereBoth*** wereBothTiny = rt10 < 0 and rt1 < 0 wereBothLarge = rt10 > 0 and rt1 > 0 # Main rescaling code if rt10 < 0.0: self.rescale(factor) continue if rt1 > 0.0: self.rescale(1 / factor) continue # Finding the root (Brent bracketing part) logger.debug( f"Final scale : {self.scale}" ) logger.debug("Starting brent bracketing") ul = optimize.brentq(clsRoot, lo_mu, hi_mu, rtol=1e-3, xtol=1e-3) endUL = time.time() logger.debug( f"getUpperLimitOnMu elapsed time : {endUL-startUL:1.4f} secs" ) ul = ul * self.scale self.data.cachedULs[expected][workspace_index] = ul return ul # self.scale has been updated within self.rescale() method
if __name__ == "__main__": ### Needs to be updated print("Needs to be updated") # C = [ # 18774.2, # -2866.97, # -5807.3, # -4460.52, # -2777.25, # -1572.97, # -846.653, # -442.531, # -2866.97, # 496.273, # 900.195, # 667.591, # 403.92, # 222.614, # 116.779, # 59.5958, # -5807.3, # 900.195, # 1799.56, # 1376.77, # 854.448, # 482.435, # 258.92, # 134.975, # -4460.52, # 667.591, # 1376.77, # 1063.03, # 664.527, # 377.714, # 203.967, # 106.926, # -2777.25, # 403.92, # 854.448, # 664.527, # 417.837, # 238.76, # 129.55, # 68.2075, # -1572.97, # 222.614, # 482.435, # 377.714, # 238.76, # 137.151, # 74.7665, # 39.5247, # -846.653, # 116.779, # 258.92, # 203.967, # 129.55, # 74.7665, # 40.9423, # 21.7285, # -442.531, # 59.5958, # 134.975, # 106.926, # 68.2075, # 39.5247, # 21.7285, # 11.5732, # ] # nsignal = [x / 100.0 for x in [47, 29.4, 21.1, 14.3, 9.4, 7.1, 4.7, 4.3]] # m = PyhfData( # observed=[1964, 877, 354, 182, 82, 36, 15, 11], # backgrounds=[2006.4, 836.4, 350.0, 147.1, 62.0, 26.2, 11.1, 4.7], # covariance=C, # # third_moment = [ 0.1, 0.02, 0.1, 0.1, 0.003, 0.0001, 0.0002, 0.0005 ], # third_moment=[0.0] * 8, # nsignal=nsignal, # name="ATLAS-SUSY-2018-31 model", # ) # ulComp = PyhfUpperLimitComputer(cl=0.95) # # uls = ulComp.getUpperLimitOnMu ( Data ( 15,17.5,3.2,0.00454755 ) ) # # print ( "uls=", uls ) # ul_old = 131.828 * sum( # nsignal # ) # With respect to the older refernece value one must normalize the xsec # print("old ul=", ul_old) # ul = ulComp.getUpperLimitOnMu(m) # print("ul", ul)