Source code for experiment.expResultObj

"""
.. module:: expResultObj
   :synopsis: Contains class that encapsulates an experimental result

.. moduleauthor:: Veronika Magerl <v.magerl@gmx.at>
.. moduleauthor:: Andre Lessa <lessa.a.p@gmail.com>

"""

import os
from smodels.experiment import infoObj
from smodels.experiment import datasetObj
from smodels.experiment import metaObj
from smodels.experiment.exceptions import SModelSExperimentError
from smodels.tools.smodelsLogging import logger
from smodels.tools.stringTools import cleanWalk

try:
    import cPickle as serializer
except ImportError as e:
    import pickle as serializer

class ExpResult(object):
    """
    Object containing the information and data corresponding to an
    experimental result (experimental conference note or publication).

    :ivar path: path to the experimental result folder
                (i.e. ATLAS-CONF-2013-047)
    :ivar globalInfo: Info object holding the data in <path>/globalInfo.txt
    :ivar datasets: List of DataSet objects corresponding to the dataset
                    folders in <path>
    """

    def __init__(self, path=None, discard_zeroes=True):
        """
        Load the global info and all datasets found below ``path``.

        If ``path`` is empty or None, the constructor returns immediately
        and none of the instance attributes are set.

        :param path: Path to the experimental result folder
        :param discard_zeroes: Discard maps with only zeroes
        :raises SModelSExperimentError: if ``path`` is not a directory
        :raises TypeError: if ``<path>/globalInfo.txt`` does not exist
        """
        if not path:
            return
        if not os.path.isdir(path):
            raise SModelSExperimentError("%s is not a path" % path)
        self.discard_zeroes = discard_zeroes
        self.path = path

        infoFile = os.path.join(path, "globalInfo.txt")
        if not os.path.isfile(infoFile):
            message = "globalInfo.txt file not found in " + path
            logger.error(message)
            # TypeError is kept for backward compatibility with callers
            # that catch it; it now carries the message as well.
            raise TypeError(message)
        self.globalInfo = infoObj.Info(infoFile)

        # Visit the folders in a deterministic (sorted) order.
        folders = sorted((root, files) for root, _, files in cleanWalk(path))

        self.datasets = []
        hasOrder = hasattr(self.globalInfo, "datasetOrder")
        datasets = {}  # dataId -> DataSet, only filled if an order is given
        for root, files in folders:
            if 'dataInfo.txt' not in files:
                continue  # not a data folder
            # Build data set
            try:
                dataset = datasetObj.DataSet(root, self.globalInfo,
                                             discard_zeroes=discard_zeroes)
                if hasOrder:
                    datasets[dataset.dataInfo.dataId] = dataset
                else:
                    self.datasets.append(dataset)
            except TypeError:
                # Best effort: a folder whose dataset raises TypeError
                # is skipped.
                continue

        if not hasOrder:
            return

        dsOrder = self.globalInfo.datasetOrder
        if isinstance(dsOrder, str):
            ## for debugging only, we allow a single dataset
            dsOrder = [dsOrder]
        for dsname in dsOrder:
            # A name listed in datasetOrder without a matching data folder
            # raises KeyError here.
            self.datasets.append(datasets[dsname])
        # NOTE(review): self.datasets is built one-to-one from dsOrder, so
        # this check cannot fail; presumably len(datasets) was meant.
        # Kept unchanged to not alter which databases load.
        if len(self.datasets) != len(dsOrder):
            raise SModelSExperimentError(
                "lengths of datasets and datasetOrder mismatch")

    def writePickle(self, dbVersion):
        """
        Write the pickle file ``<path>/.<pickle file name>``. The file holds
        the Meta object followed by this ExpResult.

        :param dbVersion: database version, passed on to the Meta object
        """
        meta = metaObj.Meta(self.path, self.discard_zeroes,
                            databaseVersion=dbVersion)
        pclfile = "%s/.%s" % (self.path, meta.getPickleFileName())
        logger.debug("writing expRes pickle file %s, mtime=%s"
                     % (pclfile, meta.cTime()))
        ptcl = serializer.HIGHEST_PROTOCOL
        # ptcl = 2
        # The context manager closes the file even if dumping fails.
        with open(pclfile, "wb") as f:
            serializer.dump(meta, f, protocol=ptcl)
            serializer.dump(self, f, protocol=ptcl)

    def __eq__(self, other):
        """
        Two results are equal if their globalInfo objects are equal and
        their datasets are pairwise equal, in the same order.
        """
        # NOTE(review): __eq__ is defined without __hash__, so instances
        # are unhashable under Python 3.
        if self.globalInfo != other.globalInfo:
            return False
        if len(self.datasets) != len(other.datasets):
            return False
        for myds, otherds in zip(self.datasets, other.datasets):
            if myds != otherds:
                return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def id(self):
        """ Return the 'id' entry of the global info. """
        return self.globalInfo.getInfo('id')

    def __str__(self):
        """
        Build the description string
        ``<id>: <dataId>,...(<#dataIds>):<txname>,...(<#txnames>)``.

        If no dataset has a non-empty ID, the blank after ``<id>:`` is
        dropped; if there are no txnames, the colon in front of the txname
        list is dropped.
        """
        resultId = self.globalInfo.getInfo('id')
        namedIds = [dataId for dataId in
                    (dataset.getID() for dataset in self.datasets)
                    if dataId]
        if namedIds:
            head = resultId + ": " + ",".join(namedIds)
        else:
            head = resultId + ":"
        label = "%s(%d):" % (head, len(namedIds))

        # Unique txname strings, in order of first appearance.
        txnames = []
        for dataset in self.datasets:
            for txname in dataset.txnameList:
                if txname.txName not in txnames:
                    txnames.append(txname.txName)
        if txnames:
            label += ",".join(txnames)
        else:
            label = label[:-1]
        return "%s(%d)" % (label, len(txnames))

    def getDataset(self, dataId):
        """
        Retrieve dataset by dataId.

        :param dataId: dataset ID to look for
        :return: the first dataset whose ID equals dataId, or None
        """
        for dataset in self.datasets:
            if dataset.getID() == dataId:
                return dataset
        return None

    def getTxNames(self):
        """
        Returns a list of all TxName objects appearing in all datasets.
        """
        txnames = []
        for dataset in self.datasets:
            txnames += dataset.txnameList
        return txnames

    def getEfficiencyFor(self, txname, mass, dataset=None):
        """
        Convenience function. Get the efficiency for a specific dataset
        for a specific txname. Equivalent to:
        self.getDataset ( dataset ).getEfficiencyFor ( txname, mass )

        :return: the efficiency, or None if the dataset is not found
        """
        dataset = self.getDataset(dataset)
        if dataset:
            return dataset.getEfficiencyFor(txname, mass)
        return None

    def hasCovarianceMatrix(self):
        """ True if the global info has a "covariance" attribute. """
        return hasattr(self.globalInfo, "covariance")

    # this feature is not yet ready
    # def isUncorrelatedWith ( self, other ):
    #     can this expResult be safely assumed to be approximately
    #     uncorrelated with "other"? "Other" can be another expResult,
    #     or a dataset of an expResult.
    #     if self == other: return False
    #     if other.globalInfo.dirName ( 1 ) != self.globalInfo.dirName ( 1 ):
    #         return True
    #     # print ( "%s combinable with %s?" % ( self.globalInfo.id, other.globalInfo.id ) )
    #     if hasattr ( self.globalInfo, "combinableWith" ):
    #         #print ( "check: %s, %s" % (other.globalInfo.id, self.globalInfo.combinableWith) )
    #         if other.globalInfo.id in self.globalInfo.combinableWith:
    #             return True
    #     if hasattr ( other.globalInfo, "combinableWith" ):
    #         if self.globalInfo.id in other.globalInfo.combinableWith:
    #             return True
    #     return None ## FIXME implement

    def getUpperLimitFor(self, dataID=None, alpha=0.05, expected=False,
                         txname=None, mass=None, compute=False):
        """
        Computes the 95% upper limit (UL) on the signal cross section
        according to the type of result.
        For an Efficiency Map type, returns the UL for the signal*efficiency
        for the given dataSet ID (signal region). For an Upper Limit type,
        returns the UL for the signal*BR for the given mass array and
        Txname.

        :param dataID: dataset ID (string) (only for efficiency-map type
                       results)
        :param alpha: Can be used to change the C.L. value. The default
                      value is 0.05 (= 95% C.L.) (only for efficiency-map
                      results)
        :param expected: Compute expected limit, i.e. Nobserved = NexpectedBG
                         (only for efficiency-map results)
        :param txname: TxName object or txname string (only for UL-type
                       results)
        :param mass: Mass array with units (only for UL-type results)
        :param compute: If True, the upper limit will be computed
                        from expected and observed number of events.
                        If False, the value listed in the database will be
                        used instead.
        :return: upper limit (Unum object), or None if the dataset is not
                 found
        """
        dataset = self.getDataset(dataID)
        if dataset:
            return dataset.getUpperLimitFor(mass=mass, expected=expected,
                                            txnames=txname, compute=compute,
                                            alpha=alpha)
        logger.error("Dataset ID %s not found in experimental result %s",
                     dataID, self)
        return None

    def getValuesFor(self, attribute=None):
        """
        Returns a list for the possible values appearing in the ExpResult
        for the required attribute (sqrts,id,constraint,...).

        Fields are collected from this object and, recursively, from every
        attribute (or entry of a list attribute) whose string
        representation contains '<smodels.experiment'.

        :param attribute: name of a field in the database (string). If not
                          defined it will return a dictionary with all
                          fields and their respective values
        :return: list of values for the field (duplicates removed where
                 the values are hashable), the full dictionary if no
                 attribute is given, or False if the field is not found
        """
        # Work queue of (field, value) pairs. Entries are only appended and
        # read through an index, so nothing is removed from a list while it
        # is being iterated and stored values are never compared.
        pending = list(self.__dict__.items())
        valuesDict = {}
        position = 0
        while position < len(pending):
            field, value = pending[position]
            position += 1
            if '<smodels.experiment' not in str(value):
                valuesDict.setdefault(field, []).append(value)
            elif isinstance(value, list):
                for entry in value:
                    pending.extend(entry.__dict__.items())
            else:
                pending.extend(value.__dict__.items())

        # Try to keep only the set of unique values
        for key, val in valuesDict.items():
            try:
                valuesDict[key] = list(set(val))
            except TypeError:
                # unhashable values: keep the list as collected
                pass

        if not attribute:
            return valuesDict
        if attribute not in valuesDict:
            logger.warning("Could not find field %s in %s",
                           attribute, self.path)
            return False
        return valuesDict[attribute]

    def getAttributes(self, showPrivate=False):
        """
        Checks for all the fields/attributes it contains as well as the
        attributes of its objects if they belong to smodels.experiment.

        :param showPrivate: if True, also returns the protected fields
                            (_field)
        :return: list of field names (strings)
        """
        fields = list(set(self.getValuesFor().keys()))
        if not showPrivate:
            fields = [field for field in fields
                      if not field.startswith("_")]
        return fields

    def getTxnameWith(self, restrDict=None):
        """
        Returns the TxName objects matching the restrictions, which are
        given as a dictionary.

        :param restrDict: dictionary containing the fields and their
                          required values, e.g. {'txname' : 'T1'}.
                          Defaults to no restrictions, which yields an
                          empty list.
        :return: list of TxName objects if zero or more than one entry
                 matches, or a single TxName object if exactly one matches.
        """
        # NOTE(review): a txname is added once per restriction it matches
        # (getInfo(tag) == value), so several restrictions combine as OR
        # and may yield duplicates, and a list of allowed values is not
        # expanded. Kept unchanged for backward compatibility.
        if restrDict is None:
            restrDict = {}
        txnameList = []
        for tag, value in restrDict.items():
            for txname in self.getTxNames():
                txval = txname.getInfo(tag)
                if txval is False:
                    continue
                if txval == value:
                    txnameList.append(txname)
        if len(txnameList) == 1:
            txnameList = txnameList[0]
        return txnameList

    def __lt__(self, other):
        """
        experimental results are sorted alphabetically
        according to their description strings
        """
        return str(self) < str(other)