Source code for experiment.expResultObj

.. module:: expResultObj
   :synopsis: Contains class that encapsulates an experimental result

.. moduleauthor:: Veronika Magerl <>
.. moduleauthor:: Andre Lessa <>


import os
from smodels.experiment import infoObj
from smodels.experiment import datasetObj
from smodels.experiment import metaObj
from smodels.experiment.exceptions import SModelSExperimentError
from import logger
from import cleanWalk

    import cPickle as serializer
except ImportError as e:
    import pickle as serializer

[docs]class ExpResult(object): """ Object containing the information and data corresponding to an experimental result (experimental conference note or publication). :ivar path: path to the experimental result folder (i.e. ATLAS-CONF-2013-047) :ivar globalInfo: Info object holding the data in <path>/globalInfo.txt :ivar datasets: List of DataSet objects corresponding to the dataset folders in <path> """ def __init__(self, path = None, discard_zeroes = True): """ :param path: Path to the experimental result folder :param discard_zeroes: Discard maps with only zeroes """ if not path: return if not os.path.isdir ( path ): raise SModelSExperimentError ( "%s is not a path" % path ) self.discard_zeroes = discard_zeroes self.path = path if not os.path.isfile(os.path.join(path, "globalInfo.txt")): logger.error("globalInfo.txt file not found in " + path) raise TypeError self.globalInfo = infoObj.Info(os.path.join(path, "globalInfo.txt")) datasets = {} folders=[] for root, _, files in cleanWalk(path): folders.append ( (root, files) ) folders.sort() self.datasets = [] hasOrder = hasattr ( self.globalInfo, "datasetOrder" ) for root, files in folders: if 'dataInfo.txt' in files: # data folder found # Build data set try: dataset = datasetObj.DataSet(root, self.globalInfo, discard_zeroes = discard_zeroes ) if hasOrder: datasets[dataset.dataInfo.dataId]=dataset else: self.datasets.append ( dataset ) except TypeError: continue if not hasOrder: return dsOrder = self.globalInfo.datasetOrder if type ( dsOrder ) == str: ## for debugging only, we allow a single dataset dsOrder = [ dsOrder ] for dsname in dsOrder: self.datasets.append ( datasets[dsname] ) if len(self.datasets) != len(dsOrder): raise SModelSExperimentError ( "lengths of datasets and datasetOrder mismatch" )
[docs] def writePickle(self, dbVersion): """ write the pickle file """ meta = metaObj.Meta ( self.path, self.discard_zeroes, databaseVersion=dbVersion ) pclfile = "%s/.%s" % ( self.path, meta.getPickleFileName() ) logger.debug ( "writing expRes pickle file %s, mtime=%s" % (pclfile, meta.cTime() ) ) f=open( pclfile, "wb" ) # ptcl = serializer.HIGHEST_PROTOCOL ptcl = min ( serializer.HIGHEST_PROTOCOL, 4 ) serializer.dump(meta, f, protocol=ptcl) serializer.dump(self, f, protocol=ptcl) f.close()
def __eq__(self, other ): if self.globalInfo != other.globalInfo: return False if len(self.datasets) != len ( other.datasets ): return False for (myds,otherds) in zip ( self.datasets, other.datasets ): if myds != otherds: return False return True def __ne__(self, other ): return not self.__eq__ ( other )
[docs] def id(self): return self.globalInfo.getInfo('id')
def __str__(self): label = self.globalInfo.getInfo('id') + ": " dataIDs = [dataset.getID() for dataset in self.datasets] ct_dids=0 if dataIDs: for dataid in dataIDs: if dataid: ct_dids+=1 label += dataid + "," label = "%s(%d):" % ( label[:-1], ct_dids ) txnames = [] for dataset in self.datasets: for txname in dataset.txnameList: tx = txname.txName if not tx in txnames: txnames.append(tx) if isinstance(txnames, list): for txname in txnames: label += txname + ',' label = "%s(%d)," % (label[:-1], len(txnames) ) else: label += txnames + ',' return label[:-1]
[docs] def getDataset(self, dataId ): """ retrieve dataset by dataId """ for dataset in self.datasets: if dataset.getID() == dataId: return dataset return None
[docs] def getTxNames(self): """ Returns a list of all TxName objects appearing in all datasets. """ txnames = [] for dataset in self.datasets: txnames += dataset.txnameList return txnames
[docs] def getEfficiencyFor(self, txname, mass, dataset=None): """ Convenience function. Get the efficiency for a specific dataset for a a specific txname. Equivalent to: self.getDataset ( dataset ).getEfficiencyFor ( txname, mass ) """ dataset = self.getDataset( dataset ) if dataset: return dataset.getEfficiencyFor( txname, mass ) return None
[docs] def hasCovarianceMatrix( self ): return hasattr(self.globalInfo, "covariance")
""" this feature is not yet ready def isUncorrelatedWith ( self, other ): can this expResult be safely assumed to be approximately uncorrelated with "other"? "Other" can be another expResult, or a dataset of an expResult. if self == other: return False if other.globalInfo.dirName ( 1 ) != self.globalInfo.dirName ( 1 ): return True # print ( "%s combinable with %s?" % (, ) ) if hasattr ( self.globalInfo, "combinableWith" ): #print ( "check: %s, %s" % (, self.globalInfo.combinableWith) ) if in self.globalInfo.combinableWith: return True if hasattr ( other.globalInfo, "combinableWith" ): if in other.globalInfo.combinableWith: return True return None ## FIXME implement """
[docs] def getUpperLimitFor(self, dataID=None, alpha=0.05, expected=False, txname=None, mass=None, compute=False): """ Computes the 95% upper limit (UL) on the signal cross section according to the type of result. For an Efficiency Map type, returns the UL for the signal*efficiency for the given dataSet ID (signal region). For an Upper Limit type, returns the UL for the signal*BR for the given mass array and Txname. :param dataID: dataset ID (string) (only for efficiency-map type results) :param alpha: Can be used to change the C.L. value. The default value is 0.05 (= 95% C.L.) (only for efficiency-map results) :param expected: Compute expected limit, i.e. Nobserved = NexpectedBG (only for efficiency-map results) :param txname: TxName object or txname string (only for UL-type results) :param mass: Mass array with units (only for UL-type results) :param compute: If True, the upper limit will be computed from expected and observed number of events. If False, the value listed in the database will be used instead. :return: upper limit (Unum object) """ dataset = self.getDataset(dataID) if dataset: upperLimit = dataset.getUpperLimitFor(mass=mass,expected = expected, txnames = txname, compute=compute,alpha=alpha) return upperLimit else: logger.error("Dataset ID %s not found in experimental result %s" %(dataID,self)) return None
[docs] def hasJsonFile( self ): return hasattr(self.globalInfo, "jsonFiles")
[docs] def getValuesFor(self, attribute=None): """ Returns a list for the possible values appearing in the ExpResult for the required attribute (sqrts,id,constraint,...). If there is a single value, returns the value itself. :param attribute: name of a field in the database (string). If not defined it will return a dictionary with all fields and their respective values :return: list of values or value """ fieldDict = list ( self.__dict__.items() ) valuesDict = {} while fieldDict: for field, value in fieldDict: if not '<smodels.experiment' in str(value): if not field in valuesDict: valuesDict[field] = [value] else: valuesDict[field].append(value) else: if isinstance(value, list): for entry in value: fieldDict += entry.__dict__.items() else: fieldDict += value.__dict__.items() fieldDict.remove((field, value)) # Try to keep only the set of unique values for key, val in valuesDict.items(): try: valuesDict[key] = list(set(val)) except TypeError: pass if not attribute: return valuesDict elif not attribute in valuesDict.keys(): logger.warning("Could not find field %s in %s", attribute, self.path) return False else: return valuesDict[attribute]
[docs] def getAttributes(self, showPrivate=False): """ Checks for all the fields/attributes it contains as well as the attributes of its objects if they belong to smodels.experiment. :param showPrivate: if True, also returns the protected fields (_field) :return: list of field names (strings) """ fields = self.getValuesFor().keys() fields = list(set(fields)) if not showPrivate: for field in fields[:]: if "_" == field[0]: fields.remove(field) return fields
[docs] def getTxnameWith(self, restrDict={}): """ Returns a list of TxName objects satisfying the restrictions. The restrictions specified as a dictionary. :param restrDict: dictionary containing the fields and their allowed values. E.g. {'txname' : 'T1', 'axes' : ....} The dictionary values can be single entries or a list of values. For the fields not listed, all values are assumed to be allowed. :return: list of TxName objects if more than one txname matches the selection criteria or a single TxName object, if only one matches the selection. """ txnameList = [] for tag, value in restrDict.items(): for txname in self.getTxNames(): txval = txname.getInfo(tag) if txval is False: continue elif txval == value: txnameList.append(txname) if len(txnameList) == 1: txnameList = txnameList[0] return txnameList
def __lt__ ( self, other ): """ experimental results are sorted alphabetically according to their description strings """ return str(self) < str(other)