Source code for tools.pyhfInterface

#!/usr/bin/env python3

"""
.. module:: pyhfInterface
   :synopsis: Code that delegates the computation of limits and likelihoods to
              pyhf.

.. moduleauthor:: Gael Alguero <gaelalguero@gmail.com>
.. moduleauthor:: Wolfgang Waltenberger <wolfgang.waltenberger@gmail.com>

"""
import jsonpatch
import warnings
import jsonschema
import copy

import warnings
warnings.filterwarnings("ignore")

jsonver = ""
try:
    import importlib.metadata

    jsonver = importlib.metadata.version("jsonschema")
except Exception as e:
    try:
        from jsonschema import __version__ as jsonver
    except Exception as e:
        pass
if jsonver[0] == "2":
    # if jsonschema.__version__[0] == "2": ## deprecated
    print(
        "[SModelS:pyhfInterface] jsonschema is version %s, we need > 3.x.x"
        % (jsonschema.__version__)
    )
    sys.exit()

import time, sys, os

try:
    import pyhf
except ModuleNotFoundError:
    print("[SModelS:pyhfInterface] pyhf import failed. Is the module installed?")
    sys.exit(-1)

ver = pyhf.__version__.split(".")
if ver[1] == "4" or (ver[1] == "5" and ver[2] in ["0", "1"]):
    print("[SModelS:pyhfInterface] WARNING you are using pyhf v%s." % pyhf.__version__)
    print("[SModelS:pyhfInterface] We recommend pyhf >= 0.5.2. Please try to update pyhf ASAP!")

pyhfinfo = {
    "backend": "numpy",
    "hasgreeted": False,
    "backendver": "?",
    "ver": ver,
    "required": "0.6.1".split("."),
}

try:
    pyhf.set_backend(b"pytorch")
    import torch

    pyhfinfo["backend"] = "pytorch"
    pyhfinfo["backendver"] = torch.__version__
except pyhf.exceptions.ImportBackendError as e:
    print(
        "[SModelS:pyhfInterface] WARNING could not set pytorch as the pyhf backend, falling back to the default."
    )
    print("[SModelS:pyhfInterface] We however recommend that pytorch be installed.")
    import numpy

    pyhfinfo["backendver"] = numpy.version.full_version

    warnings.filterwarnings("ignore", r"invalid value encountered in log")

from scipy import optimize
import numpy as np
from smodels.tools.smodelsLogging import logger
import logging
import math

logging.getLogger("pyhf").setLevel(logging.CRITICAL)


[docs]def getLogger(): """ Configure the logging facility. Maybe adapted to fit into your framework. """ import logging logger = logging.getLogger("pyhfInterface") # formatter = logging.Formatter('%(module)s - %(levelname)s: %(message)s') # ch = logging.StreamHandler() # ch.setFormatter(formatter) # ch.setLevel(logging.DEBUG) # logger.addHandler(ch) logger.setLevel(logging.DEBUG) return logger
countWarning = {"llhdszero": 0} # logger=getLogger()
[docs]class PyhfData: """ Holds data for use in pyhf :ivar nsignals: signal predictions list divided into sublists, one for each json file :ivar inputJsons: list of json instances :ivar jsonFiles: optional list of json files :ivar nWS: number of workspaces = number of json files """ def __init__(self, nsignals, inputJsons, jsonFiles=None ): self.nsignals = nsignals # fb self.getTotalYield() self.inputJsons = inputJsons self.cached_likelihoods = {} ## cache of likelihoods (actually twice_nlls) self.cached_lmaxes = {} # cache of lmaxes (actually twice_nlls) self.cachedULs = {False: {}, True: {}, "posteriori": {}} self.jsonFiles = jsonFiles self.combinations = None if jsonFiles != None: self.combinations = [os.path.splitext(os.path.basename(js))[0] for js in jsonFiles] self.nWS = len(inputJsons) self.errorFlag = False self.getWSInfo() self.checkConsistency()
[docs] def getTotalYield ( self ): """ the total yield in all signal regions """ S = sum ( [ sum(x) for x in self.nsignals ] ) self.totalYield = S
[docs] def getWSInfo(self): """ Getting informations from the json files :ivar channelsInfo: list of dictionaries (one dictionary for each json file) containing useful information about the json files - :key signalRegions: list of dictonaries with 'json path' and 'size' (number of bins) of the 'signal regions' channels in the json files - :key otherRegions: list of strings indicating the path to the control and validation region channels """ # Identifying the path to the SR and VR channels in the main workspace files self.channelsInfo = [] # workspace specifications if not isinstance(self.inputJsons, list): logger.error("The `inputJsons` parameter must be of type list") self.errorFlag = True return for ws in self.inputJsons: wsChannelsInfo = {} wsChannelsInfo["signalRegions"] = [] wsChannelsInfo["otherRegions"] = [] if not "channels" in ws.keys(): logger.error( "Json file number {} is corrupted (channels are missing)".format( self.inputJsons.index(ws) ) ) self.channelsInfo = None return for i_ch, ch in enumerate(ws["channels"]): if ch["name"][:2] == "SR": # if channel name starts with 'SR' wsChannelsInfo["signalRegions"].append( { "path": "/channels/" + str(i_ch) + "/samples/0", # Path of the new sample to add (signal prediction) "size": len(ch["samples"][0]["data"]), } ) # Number of bins else: wsChannelsInfo["otherRegions"].append("/channels/" + str(i_ch)) wsChannelsInfo["otherRegions"].sort( key=lambda path: path.split("/")[-1], reverse=True ) # Need to sort correctly the paths to the channels to be removed self.channelsInfo.append(wsChannelsInfo)
[docs] def checkConsistency(self): """ Check various inconsistencies of the PyhfData attributes :param zeroSignalsFlag: boolean identifying if all SRs of a single json are empty """ if not isinstance(self.nsignals, list): logger.error("The `nsignals` parameter must be of type list") self.errorFlag = True if self.nWS != len(self.nsignals): logger.error( "The number of subsignals provided is different from the number of json files" ) self.errorFlag = True self.zeroSignalsFlag = list() if self.channelsInfo == None: return for wsInfo, subSig in zip(self.channelsInfo, self.nsignals): if not isinstance(subSig, list): logger.error("The `nsignals` parameter must be a two dimensional list") self.errorFlag = True nBinsJson = 0 for sr in wsInfo["signalRegions"]: nBinsJson += sr["size"] if nBinsJson != len(subSig): logger.error( "The number of signals provided is different from the number of bins for json number {} and channel number {}".format( self.channelsInfo.index(wsInfo), self.nsignals.index(subSig) ) ) self.errorFlag = True allZero = all([s == 0 for s in subSig]) # Checking if all signals matching this json are zero self.zeroSignalsFlag.append(allZero)
[docs]class PyhfUpperLimitComputer: """ Class that computes the upper limit using the jsons files and signal informations in the `data` instance of `PyhfData` """ def __init__(self, data, cl=0.95, includeCRs=False, lumi=None ): """ :param data: instance of `PyhfData` holding the signals information :param cl: confdence level at which the upper limit is desired to be computed :ivar data: created from :param data: :ivar nsignals: signal predictions list divided into sublists, one for each json file :ivar inputJsons: list of input json files as python json instances :ivar channelsInfo: list of channels information for the json files :ivar zeroSignalsFlag: list boolean flags in case all signals are zero for a specific json :ivar nWS: number of workspaces = number of json files :ivar patches: list of patches to be applied to the inputJsons as python dictionary instances :ivar workspaces: list of workspaces resulting from the patched inputJsons ;ivar workspaces_expected: list of patched workspaces with observation yields replaced by the expected ones :ivar cl: created from :param cl: :ivar scale: scale that is applied to the signal predictions, dynamically changes throughout the upper limit calculation :ivar alreadyBeenThere: boolean flag that identifies when the :ivar nsignals: accidentally passes twice at two identical values """ self.data = data self.lumi = lumi self.nsignals = copy.deepcopy ( self.data.nsignals ) logger.debug("Signals : {}".format(self.nsignals)) self.inputJsons = self.data.inputJsons self.channelsInfo = self.data.channelsInfo self.zeroSignalsFlag = self.data.zeroSignalsFlag self.nWS = self.data.nWS self.includeCRs = includeCRs self.patches = self.patchMaker() self.workspaces = self.wsMaker() self.workspaces_expected = self.wsMaker(apriori=True) self.cl = cl self.scale = 1.0 self.sigma_mu = None self.alreadyBeenThere = ( False # boolean to detect wether self.signals has returned to an older value ) self.checkPyhfVersion() self.welcome()
[docs] def welcome(self): """greet the world""" if pyhfinfo["hasgreeted"]: return logger.info( f"Pyhf interface, we are using v{'.'.join(pyhfinfo['ver'])}, with {pyhfinfo['backend']} v{pyhfinfo['backendver']} as backend." ) pyhfinfo["hasgreeted"] = True
[docs] def checkPyhfVersion(self): """check the pyhf version, currently we need 0.6.1+""" if pyhfinfo["ver"] < pyhfinfo["required"]: logger.warning( f"pyhf version is {'.'.join(pyhfinfo['ver'])}. SModelS currently requires pyhf>={'.'.join(pyhfinfo['required'])}. You have been warned." )
[docs] def rescale(self, factor): """ Rescales the signal predictions (self.nsignals) and processes again the patches and workspaces :return: updated list of patches and workspaces (self.patches, self.workspaces and self.workspaces_expected) """ self.nsignals = [[sig * factor for sig in ws] for ws in self.nsignals] try: self.alreadyBeenThere = self.nsignals == self.nsignals_2 except AttributeError: pass self.scale *= factor logger.debug("new signal scale : {}".format(self.scale)) self.patches = self.patchMaker() self.workspaces = self.wsMaker() self.workspaces_expected = self.wsMaker(apriori=True) try: self.nsignals_2 = self.nsignals_1.copy() # nsignals at previous-to-previous loop except AttributeError: pass self.nsignals_1 = self.nsignals.copy() # nsignals at previous loop
[docs] def patchMaker(self): """ Method that creates the list of patches to be applied to the `self.inputJsons` workspaces, one for each region given the `self.nsignals` and the informations available in `self.channelsInfo` and the content of the `self.inputJsons` NB: It seems we need to include the change of the "modifiers" in the patches as well :return: the list of patches, one for each workspace """ if self.channelsInfo == None: return None nsignals = self.nsignals # Constructing the patches to be applied on the main workspace files patches = [] for ws, info, subSig in zip(self.inputJsons, self.channelsInfo, self.nsignals): patch = [] for srInfo in info["signalRegions"]: nBins = srInfo["size"] operator = {} operator["op"] = "add" operator["path"] = srInfo["path"] value = {} value["data"] = subSig[:nBins] subSig = subSig[nBins:] value["modifiers"] = [] value["modifiers"].append({"data": None, "type": "normfactor", "name": "mu_SIG"}) value["modifiers"].append({"data": None, "type": "lumi", "name": "lumi"}) value["name"] = "bsm" operator["value"] = value patch.append(operator) if self.includeCRs: logger.debug("keeping the CRs") else: for path in info["otherRegions"]: patch.append({"op": "remove", "path": path}) patches.append(patch) return patches
[docs] def wsMaker(self, apriori=False): """ Apply each region patch (self.patches) to his associated json (self.inputJsons) to obtain the complete workspaces :param apriori: - If set to `True`: Replace the observation data entries of each workspace by the corresponding sum of the expected yields \ - Else: The observed yields put in the workspace are the ones written in the corresponfing json dictionary :returns: the list of patched workspaces """ if self.patches == None: return None if self.nWS == 1: try: wsDict = jsonpatch.apply_patch(self.inputJsons[0], self.patches[0]) if apriori == True: # Replace the observation data entries by the corresponding sum of the expected yields for obs in wsDict["observations"]: for ch in wsDict["channels"]: # Finding matching observation and bkg channel if obs["name"] == ch["name"]: bkg = [0.0] * len(obs["data"]) for sp in ch["samples"]: if sp["name"] == "bsm": continue for iSR in range(len(obs["data"])): # Summing over all bkg samples for each bin/SR bkg[iSR] += sp["data"][iSR] # logger.debug('bkgs for channel {} :\n{}'.format(obs['name'], bkg)) obs["data"] = bkg return [pyhf.Workspace(wsDict)] except (pyhf.exceptions.InvalidSpecification, KeyError) as e: logger.error("The json file is corrupted:\n{}".format(e)) return None else: workspaces = [] for js, patch in zip(self.inputJsons, self.patches): wsDict = jsonpatch.apply_patch(js, patch) if apriori == True: # Replace the observation data entries by the corresponding sum of the expected yields for obs in wsDict["observations"]: for ch in wsDict["channels"]: # Finding matching observation and bkg channel if obs["name"] == ch["name"]: bkg = [0.0] * len(obs["data"]) for sp in ch["samples"]: if sp["name"] == "bsm": continue for iSR in range(len(obs["data"])): # Summing over all bkg samples for each bin/SR bkg[iSR] += sp["data"][iSR] # logger.debug('bkgs for channel {} :\n{}'.format(obs['name'], bkg)) obs["data"] = bkg try: ws = pyhf.Workspace(wsDict) except (pyhf.exceptions.InvalidSpecification, KeyError) as e: logger.error( "Json file number {} is corrupted:\n{}".format( self.inputJsons.index(json), e ) ) return None workspaces.append(ws) return workspaces
[docs] def backup(self): self.bu_signal = copy.deepcopy(self.data.nsignals)
[docs] def restore(self): if not hasattr(self, "bu_signal"): return self.data.nsignals = copy.deepcopy(self.bu_signal) del self.bu_signal
[docs] def get_position(self, name, model): """ :param name: name of the parameter one wants to increase :param model: the pyhf model :return: the position of the parameter that has to be modified in order to turn positive the negative total yield """ position = 0 for par in model.config.par_order: if name == par: return position else: position += model.config.param_set(par).n_parameters
[docs] def rescaleBgYields(self, init_pars, workspace, model): """ :param init_pars: list of initial parameters values one wants to increase in order to turn positive the negative total yields :param workspace: the pyhf workspace :param model: the pyhf model :return: the list of initial parameters values that gives positive total yields """ for pos,yld in enumerate(model.expected_actualdata(init_pars)): # If a total yield (mu*signal + background) evaluated with the initial parameters is negative, increase a parameter to turn it positive if yld < 0: sum_bins = 0 # Find the SR and the bin (the position inside the SR) corresponding to the negative total yield for channel,nbins in model.config.channel_nbins.items(): sum_bins += nbins if pos < sum_bins: SR = channel # Bin number = the position of the bin in the SR # (i.e. the position of the negative total yield in the list, starting at the corresponding SR) bin_num = pos-sum_bins+nbins break # Find the name of the parameter that modifies the background yield of the negative total yield for channel in workspace['channels']: if channel['name'] == SR: for sample in range(len(channel['samples'])): for modifier in channel['samples'][sample]['modifiers']: # The multiplicative modifier (i.e. parameter) that will be increased is a 'staterror' one (others may work as well) # In case of a simplified json, let's take the only parameter called 'totalError' if ('type' in modifier.keys() and modifier['type'] == 'staterror') or modifier['name'] == 'totalError': name = modifier['name'] break # Find the position of the parameter within the pyhf list of parameters position = self.get_position(name,model)+bin_num # Find the upper bound of the parameter that will be increased max_bound = model.config.suggested_bounds()[position][1] # If the parameter one wants to increase is not fixed, add 0.1 to its value (it is an arbitrary step) until # the total yield becomes positive or the parameter upper bound is reached if not model.config.suggested_fixed()[position]: while model.expected_actualdata(init_pars)[pos] < 0: if init_pars[position] + 0.1 < max_bound: init_pars[position] += 0.1 else: init_pars[position] = max_bound break return init_pars
[docs] def likelihood( self, mu=1.0, workspace_index=None, return_nll=False, expected=False): """ Returns the value of the likelihood. \ Inspired by the `pyhf.infer.mle` module but for non-log likelihood :param workspace_index: supply index of workspace to use. If None, \ choose index of best combo :param return_nll: if true, return nll, not llhd :param expected: if False, compute expected values, if True, \ compute a priori expected, if "posteriori" compute posteriori \ expected """ logger.debug("Calling likelihood") if type(workspace_index) == float: logger.error("workspace index is float") # logger.error("expected flag needs to be heeded!!!") with warnings.catch_warnings(): warnings.filterwarnings( "ignore", "Values in x were outside bounds during a minimize step, clipping to bounds", ) # warnings.filterwarnings ( "ignore", "", module="pyhf.exceptions" ) if workspace_index == None: workspace_index = self.getBestCombinationIndex() if workspace_index == None: return None self.backup() try: if abs(mu - 1.0) > 1e-6: for i, ns in enumerate(self.data.nsignals): for j, v in enumerate(ns): self.data.nsignals[i][j] = v * mu self.__init__(self.data, self.cl, self.includeCRs, self.lumi) ### allow this, for computation of l_SM # if self.zeroSignalsFlag[workspace_index] == True: # logger.warning("Workspace number %d has zero signals" % workspace_index) # return None workspace = self.updateWorkspace(workspace_index, expected=expected) # Same modifiers_settings as those used when running the 'pyhf cls' command line msettings = { "normsys": {"interpcode": "code4"}, "histosys": {"interpcode": "code4p"}, } model = workspace.model(modifier_settings=msettings) wsData = workspace.data(model) _, nllh = pyhf.infer.mle.fixed_poi_fit( 1.0, wsData, model, return_fitted_val=True, maxiter=200 ) except (pyhf.exceptions.FailedMinimization, ValueError) as e: logger.error(f"pyhf fixed_poi_fit failed for {list(self.data.jsonFiles)[workspace_index]} for mu={mu}: {e}") # lets try with different initialisation init, n_ = pyhf.infer.mle.fixed_poi_fit( 0.0, workspace.data(model), model, return_fitted_val=True, maxiter=200 ) initpars = init.tolist() initpars[model.config.poi_index] = 1. # Try to turn positive all the negative total yields (mu*signal + background) evaluated with the initial parameters initpars = self.rescaleBgYields(initpars, workspace, model) # If the a total yield is still negative with the increased initial parameters, print a message if not all([True if yld >= 0 else False for yld in model.expected_actualdata(initpars)]): logger.debug(f'Negative total yield after increasing the initial parameters for mu={mu}') try: bestFitParam, nllh = pyhf.infer.mle.fixed_poi_fit( 1.0, wsData, model, return_fitted_val=True, init_pars=initpars, maxiter=2000, ) # If a total yield is negative with the best profiled parameters, return None if not all([True if yld >= 0 else False for yld in model.expected_actualdata(bestFitParam)]): self.restore() return self.exponentiateNLL(None, not return_nll) except (pyhf.exceptions.FailedMinimization, ValueError) as e: logger.info(f"pyhf fixed_poi_fit failed twice for {list(self.data.jsonFiles)[workspace_index]} for mu={mu}: {e}") self.restore() return self.exponentiateNLL(None, not return_nll) ret = nllh.tolist() try: ret = float(ret) except: ret = float(ret[0]) # Cache the likelihood (but do we use it?) self.data.cached_likelihoods[ workspace_index ] = ret ret = self.exponentiateNLL(ret, not return_nll) # print ( "now leaving the fit mu=", mu, "llhd", ret, "nsig was", self.data.nsignals ) self.restore() return ret
[docs] def getBestCombinationIndex(self): """find the index of the best expected combination""" if self.nWS == 1: return 0 logger.debug("Finding best expected combination among %d workspace(s)" % self.nWS) ulMin = float("+inf") i_best = None for i_ws in range(self.nWS): if self.data.totalYield == 0.: continue if self.zeroSignalsFlag[i_ws] == True: logger.debug("Workspace number %d has zero signals" % i_ws) continue else: ul = self.getUpperLimitOnMu(expected=True, workspace_index=i_ws) if ul == None: continue if ul < ulMin: ulMin = ul i_best = i_ws return i_best
[docs] def exponentiateNLL(self, twice_nll, doIt): """if doIt, then compute likelihood from nll, else return nll""" if twice_nll == None: return None #if doIt: # return 0.0 #return 9000.0 if doIt: return np.exp(-twice_nll / 2.0) return twice_nll / 2.0
[docs] def getSigmaMu(self, workspace): """given a workspace, compute a rough estimate of sigma_mu, the uncertainty of mu_hat""" obss, bgs, bgVars, nsig = {}, {}, {}, {} channels = workspace.channels for chdata in workspace["channels"]: if not chdata["name"] in channels: continue bg = 0.0 var = 0.0 ns = 0.0 for sample in chdata["samples"]: if sample["name"] == "bsm": ns = sample["data"][0] else: tbg = sample["data"][0] bg += tbg mult_factor_hi = 1. mult_factor_lo = 1. add_factor_hi = 0. add_factor_lo = 0. # To be tested - Don't take aux data into account for modifier in sample["modifiers"]: if modifier["type"] in ["normfactor","shapesys","staterror","shapefactor"]: if type(modifier["data"]) == list: mult_factor_hi *= modifier["data"][0] mult_factor_lo *= modifier["data"][0] elif modifier["type"] == "normsys": if type(modifier["data"]) == dict: mult_factor_hi *= modifier["data"]["hi"] mult_factor_lo *= modifier["data"]["lo"] elif modifier["type"] == "histosys": # If many histosys modifiers, only the last one counts if type(modifier["data"]) == dict: add_factor_hi = modifier["data"]["hi_data"][0] - tbg add_factor_lo = modifier["data"]["lo_data"][0] - tbg elif modifier["type"] == "lumi": for param in workspace["measurements"][0]["config"]["parameters"]: if param["name"] == "lumi": mult_factor_hi *= max(param["bounds"][0]) mult_factor_lo *= min(param["bounds"][0]) hi = mult_factor_hi*(tbg+add_factor_hi) lo = mult_factor_lo*(tbg+add_factor_lo) delta = max((hi - tbg, tbg - lo)) var += delta**2 nsig[chdata["name"]] = ns bgs[chdata["name"]] = bg bgVars[chdata["name"]] = var for chdata in workspace["observations"]: if not chdata["name"] in channels: continue obss[chdata["name"]] = chdata["data"][0] vars = [] for c in channels: # poissonian error if nsig[c]==0.: nsig[c]=1e-5 poiss = abs(obss[c]-bgs[c]) / nsig[c] gauss = bgVars[c] / nsig[c]**2 vars.append ( poiss + gauss ) var_mu = np.sum ( vars ) n = len ( obss ) # print ( f" sigma_mu from pyhf uncorr {var_mu} {n} " ) sigma_mu = float ( np.sqrt ( var_mu / (n**2) ) ) # self.sigma_mu = sigma_mu return sigma_mu
#import IPython #IPython.embed() #sys.exit()
[docs] def lmax( self, workspace_index=None, return_nll=False, expected=False, allowNegativeSignals=False): """ Returns the negative log max likelihood :param return_nll: if true, return nll, not llhd :param workspace_index: supply index of workspace to use. If None, \ choose index of best combo :param expected: if False, compute expected values, if True, \ compute a priori expected, if "posteriori" compute posteriori \ expected :param allowNegativeSignals: if False, then negative nsigs are replaced \ with 0. """ # logger.error("expected flag needs to be heeded!!!") logger.debug("Calling lmax") with warnings.catch_warnings(): warnings.filterwarnings( "ignore", "Values in x were outside bounds during a minimize step, clipping to bounds", ) self.__init__(self.data, self.cl, self.includeCRs, self.lumi) if workspace_index == None: workspace_index = self.getBestCombinationIndex() if workspace_index != None: if self.zeroSignalsFlag[workspace_index] == True: logger.warning("Workspace number %d has zero signals" % workspace_index) return None else: workspace = self.updateWorkspace(workspace_index, expected=expected) else: return None # Same modifiers_settings as those used when running the 'pyhf cls' command line msettings = {"normsys": {"interpcode": "code4"}, "histosys": {"interpcode": "code4p"}} model = workspace.model(modifier_settings=msettings) # obs = workspace.data(model) try: bounds = model.config.suggested_bounds() if allowNegativeSignals: bounds[model.config.poi_index] = (-5., 10. ) muhat, maxNllh = pyhf.infer.mle.fit(workspace.data(model), model, return_fitted_val=True, par_bounds = bounds ) if False: # get sigma_mu from hessian pyhf.set_backend(pyhf.tensorlib, 'minuit') muhat, maxNllh,o = pyhf.infer.mle.fit(workspace.data(model), model, return_fitted_val=True, par_bounds = bounds, return_result_obj = True ) sigma_mu = float ( np.sqrt ( o.hess_inv[0][0] ) ) * self.scale # print ( f"\n>>> sigma_mu from hessian {sigma_mu:.2f}" ) pyhf.set_backend(pyhf.tensorlib, 'scipy') muhat = float ( muhat[model.config.poi_index]*self.scale ) except (pyhf.exceptions.FailedMinimization, ValueError) as e: logger.error(f"pyhf mle.fit failed {e}") muhat, maxNllh = float("nan"), float("nan") try: lmax = maxNllh.tolist() except: lmax= maxNllh try: lmax = float(lmax) except: lmax = float(lmax[0]) sigma_mu = self.getSigmaMu ( workspace ) lmax = self.exponentiateNLL(lmax, not return_nll) ret = { "lmax": lmax, "muhat": muhat, "sigma_mu": sigma_mu } self.data.cached_lmaxes[workspace_index] = ret return ret
[docs] def updateWorkspace(self, workspace_index=None, expected=False): """ Small method used to return the appropriate workspace :param workspace_index: the index of the workspace to retrieve from the corresponding list :param expected: if False, retuns the unmodified (but patched) workspace. Used for computing observed or aposteriori expected limits. if True, retuns the modified (and patched) workspace, where obs = sum(bkg). Used for computing apriori expected limit. """ if self.nWS == 1: if expected == True: return self.workspaces_expected[0] else: return self.workspaces[0] else: if workspace_index == None: logger.error("No workspace index was provided.") if expected == True: return self.workspaces_expected[workspace_index] else: return self.workspaces[workspace_index]
[docs] def getUpperLimitOnSigmaTimesEff(self, expected=False, workspace_index=None): """ Compute the upper limit on the fiducial cross section sigma times efficiency: - by default, the combination of the workspaces contained into self.workspaces - if workspace_index is specified, self.workspace[workspace_index] (useful for computation of the best upper limit) :param expected: - if set to `True`: uses expected SM backgrounds as signals - else: uses `self.nsignals` :param workspace_index: - if different from `None`: index of the workspace to use for upper limit - else: choose best combo :return: the upper limit on sigma times eff at `self.cl` level (0.95 by default) """ if self.data.totalYield == 0.: return None else: ul = self.getUpperLimitOnMu( expected=expected, workspace_index=workspace_index) if ul == None: return ul if self.lumi is None: logger.error(f"asked for upper limit on fiducial xsec, but no lumi given with the data") return ul xsec = self.data.totalYield / self.lumi return ul * xsec
# Trying a new method for upper limit computation : # re-scaling the signal predictions so that mu falls in [0, 10] instead of # looking for mu bounds # Usage of the index allows for rescaling
[docs] def getUpperLimitOnMu(self, expected=False, workspace_index=None): """ Compute the upper limit on the signal strength modifier with: - by default, the combination of the workspaces contained into self.workspaces - if workspace_index is specified, self.workspace[workspace_index] (useful for computation of the best upper limit) :param expected: - if set to `True`: uses expected SM backgrounds as signals - else: uses `self.nsignals` :param workspace_index: - if different from `None`: index of the workspace to use for upper limit - else: choose best combo :return: the upper limit at `self.cl` level (0.95 by default) """ self.__init__(self.data, self.cl, self.includeCRs, self.lumi) if workspace_index in self.data.cachedULs[expected]: ret = self.data.cachedULs[expected][workspace_index] return ret with warnings.catch_warnings(): warnings.filterwarnings( "ignore", "Values in x were outside bounds during a minimize step, clipping to bounds", ) startUL = time.time() logger.debug("Calling getUpperLimitOnMu") if self.data.errorFlag or self.workspaces == None: # For now, this flag can only be turned on by PyhfData.checkConsistency return None if ( all([self.zeroSignalsFlag[workspace_index] for workspace_index in range(self.nWS)]) == True ): logger.debug( "There is (are) %d workspace(s) and no signal(s) was (were) found" % self.nWS ) return None if workspace_index == None: workspace_index = self.getBestCombinationIndex() if workspace_index == None: logger.debug("Best combination index not found") return None def root_func(mu): # If expected == False, use unmodified (but patched) workspace # If expected == True, use modified workspace where observations = sum(bkg) (and patched) # If expected == posteriori, use unmodified (but patched) workspace workspace = self.updateWorkspace(workspace_index, expected=expected) # Same modifiers_settings as those use when running the 'pyhf cls' command line msettings = { "normsys": {"interpcode": "code4"}, "histosys": {"interpcode": "code4p"}, } with warnings.catch_warnings(): warnings.simplefilter("ignore", category=(DeprecationWarning,UserWarning)) model = workspace.model(modifier_settings=msettings) bounds = model.config.suggested_bounds() bounds[model.config.poi_index] = (0, 10) start = time.time() args = {} args["return_expected"] = expected == "posteriori" args["par_bounds"] = bounds # args["maxiter"]=100000 pver = float(pyhf.__version__[:3]) stat = "qtilde" if pver < 0.6: args["qtilde"] = True else: args["test_stat"] = stat with np.testing.suppress_warnings() as sup: if pyhfinfo["backend"] == "numpy": sup.filter(RuntimeWarning, r"invalid value encountered in log") # print ("expected", expected, "return_expected", args["return_expected"], "mu", mu, "\nworkspace.data(model) :", workspace.data(model, include_auxdata = False), "\nworkspace.observations :", workspace.observations, "\nobs[data] :", workspace['observations']) try: result = pyhf.infer.hypotest(mu, workspace.data(model), model, **args) except Exception as e: logger.info(f"when testing hypothesis {mu}, caught exception: {e}") result = float("nan") if expected == "posteriori": result = [float("nan")] * 2 end = time.time() logger.debug("Hypotest elapsed time : %1.4f secs" % (end - start)) logger.debug(f"result for {mu} {result}") if expected == "posteriori": logger.debug("computing a-posteriori expected limit") logger.debug("expected = {}, mu = {}, result = {}".format(expected, mu, result)) try: CLs = float(result[1].tolist()) except TypeError: CLs = float(result[1][0]) else: logger.debug("expected = {}, mu = {}, result = {}".format(expected, mu, result)) CLs = float(result) # logger.debug("Call of root_func(%f) -> %f" % (mu, 1.0 - CLs)) return 1.0 - self.cl - CLs # Rescaling signals so that mu is in [0, 10] factor = 3.0 wereBothLarge = False wereBothTiny = False nattempts = 0 nNan = 0 lo_mu, med_mu, hi_mu = 0.2, 1.0, 5.0 # print ( "starting with expected", expected ) while "mu is not in [lo_mu,hi_mu]": nattempts += 1 if nNan > 5: # logger.warning("encountered NaN 5 times while trying to determine the bounds for brent bracketing. now trying with q instead of qtilde test statistic") return None # nattempts = 0 if nattempts > 10: logger.warning( "tried 10 times to determine the bounds for brent bracketing. we abort now." ) return None # Computing CL(1) - 0.95 and CL(10) - 0.95 once and for all rt1 = root_func(lo_mu) # rt5 = root_func(med_mu) rt10 = root_func(hi_mu) # print ( "we are at",lo_mu,med_mu,hi_mu,"values at", rt1, rt5, rt10, "scale at", self.scale,"factor at", factor ) if rt1 < 0.0 and 0.0 < rt10: # Here's the real while condition break if self.alreadyBeenThere: factor = 1 + (factor - 1) / 2 logger.debug("Diminishing rescaling factor") if np.isnan(rt1): rt5 = root_func(med_mu) if rt5 < 0.0 and rt10 > 0.0: lo_mu = med_mu med_mu = np.sqrt(lo_mu * hi_mu) continue if rt10 < 0.0: ## also try to increase hi_mu hi_mu = hi_mu + (10.0 - hi_mu) * 0.5 med_mu = np.sqrt(lo_mu * hi_mu) nNan += 1 self.rescale(factor) continue if np.isnan(rt10): rt5 = root_func(med_mu) if rt5 > 0.0 and rt1 < 0.0: hi_mu = med_mu med_mu = np.sqrt(lo_mu * hi_mu) continue if rt1 > 0.0: ## also try to decrease lo_mu lo_mu = lo_mu * 0.5 med_mu = np.sqrt(lo_mu * hi_mu) nNan += 1 self.rescale(1 / factor) continue # Analyzing previous values of wereBoth*** if rt10 < 0 and rt1 < 0 and wereBothLarge: factor = 1 + (factor - 1) / 2 logger.debug("Diminishing rescaling factor") if rt10 > 0 and rt1 > 0 and wereBothTiny: factor = 1 + (factor - 1) / 2 logger.debug("Diminishing rescaling factor") # Preparing next values of wereBoth*** wereBothTiny = rt10 < 0 and rt1 < 0 wereBothLarge = rt10 > 0 and rt1 > 0 # Main rescaling code if rt10 < 0.0: self.rescale(factor) continue if rt1 > 0.0: self.rescale(1 / factor) continue # Finding the root (Brent bracketing part) logger.debug("Final scale : %f" % self.scale) logger.debug("Starting brent bracketing") ul = optimize.brentq(root_func, lo_mu, hi_mu, rtol=1e-3, xtol=1e-3) endUL = time.time() logger.debug("getUpperLimitOnMu elpased time : %1.4f secs" % (endUL - startUL)) ul = ul * self.scale self.data.cachedULs[expected][workspace_index] = ul return ul # self.scale has been updated within self.rescale() method
if __name__ == "__main__": C = [ 18774.2, -2866.97, -5807.3, -4460.52, -2777.25, -1572.97, -846.653, -442.531, -2866.97, 496.273, 900.195, 667.591, 403.92, 222.614, 116.779, 59.5958, -5807.3, 900.195, 1799.56, 1376.77, 854.448, 482.435, 258.92, 134.975, -4460.52, 667.591, 1376.77, 1063.03, 664.527, 377.714, 203.967, 106.926, -2777.25, 403.92, 854.448, 664.527, 417.837, 238.76, 129.55, 68.2075, -1572.97, 222.614, 482.435, 377.714, 238.76, 137.151, 74.7665, 39.5247, -846.653, 116.779, 258.92, 203.967, 129.55, 74.7665, 40.9423, 21.7285, -442.531, 59.5958, 134.975, 106.926, 68.2075, 39.5247, 21.7285, 11.5732, ] nsignal = [x / 100.0 for x in [47, 29.4, 21.1, 14.3, 9.4, 7.1, 4.7, 4.3]] m = PyhfData( observed=[1964, 877, 354, 182, 82, 36, 15, 11], backgrounds=[2006.4, 836.4, 350.0, 147.1, 62.0, 26.2, 11.1, 4.7], covariance=C, # third_moment = [ 0.1, 0.02, 0.1, 0.1, 0.003, 0.0001, 0.0002, 0.0005 ], third_moment=[0.0] * 8, nsignal=nsignal, name="ATLAS-SUSY-2018-31 model", ) ulComp = PyhfUpperLimitComputer(cl=0.95) # uls = ulComp.getUpperLimitOnMu ( Data ( 15,17.5,3.2,0.00454755 ) ) # print ( "uls=", uls ) ul_old = 131.828 * sum( nsignal ) # With respect to the older refernece value one must normalize the xsec print("old ul=", ul_old) ul = ulComp.getUpperLimitOnMu(m) print("ul", ul)