Source code for theory.clusterTools

"""
.. module:: clusterTools
   :synopsis: Module holding the ElementCluster class and cluster methods used to combine similar elements according
      to the analysis.
        
.. moduleauthor:: Andre Lessa <lessa.a.p@gmail.com>
        
"""

from smodels.theory import crossSection
from smodels.theory.auxiliaryFunctions import massAvg, massPosition, distance
from smodels.tools.physicsUnits import fb, MeV
from smodels.theory.exceptions import SModelSTheoryError as SModelSError

from smodels.tools.smodelsLogging import logger


[docs]class ElementCluster(object): """ An instance of this class represents a cluster. This class is used to store the relevant information about a cluster of elements and to manipulate this information. :ivar elements: list of elements in the cluster (Element objects) """ def __init__(self): self.elements = [] def __iter__(self): return iter(self.elements) def __getitem__(self, iel): return self.elements[iel]
[docs] def getTotalXSec(self): """ Return the sum over the cross sections of all elements belonging to the cluster. :returns: sum of weights of all the elements in the cluster (XSectionList object) """ totxsec = crossSection.XSectionList() for el in self.elements: totxsec.combineWith(el.weight) if len(totxsec) != 1: logger.error("Cluster total cross section should have a single value") raise SModelSError() return totxsec[0]
[docs] def getAvgMass(self): """ Return the average mass of all elements belonging to the cluster. If the cluster does not refer to a TxName (i.e. in efficiency map results) AND the cluster contains more than one element (assuming they differ in the masses), returns None. :returns: average mass array """ def similar ( a1, a2 ): if len(a1) != len(a2): return False for l1,l2 in zip ( a1,a2 ): if len(l1) != len(l2): return False for e1, e2 in zip (l1,l2 ): d=abs ( (e1-e2).asNumber(MeV) ) if d>.1: return False return True if self.getDataType() == 'efficiencyMap': if len(self.elements) > 1: ret = self.elements[0].getMasses() for i in self.elements[1:]: if not similar ( i.getMasses(), ret ): return None return ret else: return self.elements[0].getMasses() elif self.getDataType() == 'upperLimit': massList = [el.getMasses() for el in self.elements] weights = [el.weight.getMaxXsec() / fb for el in self.elements] return massAvg(massList,weights=weights)
[docs] def getPIDs(self): """ Return the list of all PIDs appearing in all elements in the cluster, i.e. [ [[pdg1, pdg2,...],[pdg3,pdg4,...]], [[pdg1', pdg2',...],[pdg3',pdg4',...]] :returns: list of PIDs """ PIDs = [] for el in self: for pidList in el.getPIDs(): if not pidList in PIDs: PIDs.append(pidList) return PIDs
[docs] def getIDs(self): """ Return list of all element IDs appearing in the cluster :return: list of element IDs """ IDs = [] for el in self: if not el.elID in IDs: IDs.append(el.elID) return IDs
[docs] def getDataType(self): """ Checks to which type of data (efficiency map or upper limit) the cluster refers to. It uses the cluster.txnames attribute. If not defined, returns None :return: upperLimits or efficiencyMap (string) """ if not hasattr(self, 'txnames') or not self.txnames: return None else: #Check the data types #for txname in self.txnames: # if not txname.txnameData._data: # txname.txnameData.loadData() #Make sure the _data is loaded dataTag = list(set([txname.txnameData.dataTag for txname in self.txnames])) if len(dataTag) != 1: logger.error("A single cluster contain mixed data types!") raise SModelSError() elif 'upperLimit' in dataTag[0]: return 'upperLimit' elif 'efficiencyMap' in dataTag[0]: return 'efficiencyMap' else: logger.error("Unknown data type %s" % (dataTag[0])) raise SModelSError()
[docs]class IndexCluster(object): """ An instance of this class represents a cluster storing element indices. This auxiliary class is used to store element indices and positions in upper limit space. It is only used by the clustering algorithm. :ivar indices: list of integers mapping the cluster elements to their position in the list (1st element -> index 0, 2nd element -> index 1,...) :ivar avgPosition: position in upper limit space for the cluster average mass :ivar massMap: dictionary with indices as keys and the corresponding element mass as values :ivar positionMap: dictionary with indices as keys and the corresponding element position in upper limit space as values :ivar weightMap: dictionary with indices as keys and the corresponding element weight as values :ivar txdata: TxNameData object to be used for computing distances in UL space """ def __init__(self, massMap=None, posMap=None, wMap=None, indices=set([]), txdata = None): self.indices = indices self.avgPosition = None self.massMap = massMap self.positionMap = posMap self.weightMap = wMap self.txdata = txdata if massMap and posMap and wMap and len(self.indices) > 0: self.avgPosition = self._getAvgPosition() def __eq__(self, other): if type(self) != type(other): return False elif set(self.indices) != set(other.indices): return False else: return True def __iter__(self): return iter(list(self.indices)) def __getitem__(self, iel): return list(self.indices)[iel]
[docs] def copy(self): """ Returns a copy of the index cluster (faster than deepcopy). """ newcluster = IndexCluster() newcluster.indices = set(list(self.indices)[:]) newcluster.avgPosition = self.avgPosition if type(self.positionMap) == type(dict()): newcluster.positionMap = dict(self.positionMap.items()) else: newcluster.positionMap = None if type(self.massMap) == type(dict()): newcluster.massMap = dict(self.massMap.items()) else: newcluster.massMap = None if type(self.weightMap) == type(dict()): newcluster.weightMap = dict(self.weightMap.items()) else: newcluster.weightMap = None newcluster.txdata = self.txdata return newcluster
[docs] def add(self, iels): """ Add an index or a list of indices to the list of indices and update the avgPosition value. """ if type(iels) == type(int()): ilist = [iels] else: ilist = iels indices = list(self.indices).extend(ilist) self.indices = set(indices) self.avgPosition = self._getAvgPosition()
[docs] def remove(self, iels): """ Remove an index or a list of indices to the list of indices and update the avgPosition value. """ if type(iels) == type(int()): ilist = [iels] else: ilist = iels indices = list(self.indices) for iel in ilist: indices.remove(iel) self.indices = set(indices) self.avgPosition = self._getAvgPosition()
def _getAvgPosition(self): """ Return the average position in upper limit space for all indices belonging to the cluster. """ if len(list(self.indices)) == 1: return self.positionMap[self[0]] masses = [self.massMap[iel] for iel in self] weights = [self.weightMap[iel] for iel in self] clusterMass = massAvg(masses,weights=weights) avgPos = self.txdata.getValueFor(clusterMass) if avgPos is None: return False else: return (avgPos/fb).asNumber() def _getDistanceTo(self, obj): """ Return the maximum distance between any elements belonging to the cluster and the object obj. obj can be a position in upper limit space or an element index. """ dmax = 0. if type(obj) == type(int()) and obj >= 0: pos = self.positionMap[obj] elif type(obj) == type(1.): pos = obj else: logger.error("Unknown object type (must be an element index or " "position)") raise SModelSError() for jel in self: dmax = max(dmax, distance(pos, self.positionMap[jel])) return dmax def _getMaxInternalDist(self): """ Return the maximum distance between any pair of elements belonging to the cluster as well as the cluster center and any element. """ dmax = 0. if self.avgPosition is None: self.avgPosition = self._getAvgPosition() for iel in self: dmax = max(dmax, distance(self.positionMap[iel], self.avgPosition)) dmax = max(dmax, self._getDistanceTo(iel)) return dmax
[docs]def groupAll(elements): """ Create a single cluster containing all the elements. Skip mother elements which contain the daughter in the list (avoids double counting). :param elements: List of Element objects :return: ElementCluster object containing all unique elements """ cluster = ElementCluster() cluster.elements = [] allmothers = [] #Collect the list of all mothers: for el in elements: allmothers += [elMom[1].elID for elMom in el.motherElements] for el in elements: #Skip the element if it is a mother of another element in the list if any((elMom is el.elID) for elMom in allmothers): continue cluster.elements.append(el) #Collect the txnames appearing in the cluster cluster.txnames = list(set([el.txname for el in cluster.elements])) cluster.txnames.sort() return cluster
[docs]def clusterElements(elements, maxDist): """ Cluster the original elements according to their mass distance. :parameter elements: list of elements (Element objects) :parameter txname: TxName object to be used for computing distances in UL space :parameter maxDist: maximum mass distance for clustering two elements :returns: list of clusters (ElementCluster objects) """ if len(elements) == 0: return [] txnames = list(set([el.txname for el in elements])) if len(txnames) != 1: logger.error("Clustering elements with different Txnames!") raise SModelSError() txdata = txnames[0].txnameData # ElementCluster elements by their mass: clusters = _doCluster(elements, txdata, maxDist) for cluster in clusters: cluster.txnames = txnames return clusters
def _doCluster(elements, txdata, maxDist): """ Cluster algorithm to cluster elements. :parameter elements: list of all elements to be clustered :parameter txdata: TxNameData object to be used for computing distances in UL space :parameter maxDist: maximum mass distance for clustering two elements :returns: a list of ElementCluster objects containing the elements belonging to the cluster """ # First build the element:mass, element:position in UL space # and element:maxWeight (in fb) dictionaries #(Combine elements with identical masses) massMap = {} posMap = {} weightMap = {} for iel, el in enumerate(elements): if not el.getMasses() in massMap.values(): massMap[iel] = el.getMasses() posMap[iel] = massPosition(massMap[iel], txdata) weightMap[iel] = el.weight.getMaxXsec() / fb else: j = list(massMap.keys())[list(massMap.values()).index(el.getMasses())] weightMap[j] += el.weight.getMaxXsec() / fb # Start with maximal clusters clusterList = [] for iel in posMap: indices = [iel] for jel in posMap: if distance(posMap[iel], posMap[jel]) <= maxDist: indices.append(jel) indexCluster = IndexCluster(massMap, posMap, weightMap, set(indices),txdata) #Ignore cluster which average mass falls oustide the grid: if indexCluster.avgPosition: clusterList.append(indexCluster) #Split the maximal clusters until all elements inside each cluster are #less than maxDist apart from each other and the cluster average position #is less than maxDist apart from all elements finalClusters = [] newClusters = True while newClusters: newClusters = [] for indexCluster in clusterList: # cluster is good if indexCluster._getMaxInternalDist() < maxDist: if not indexCluster in finalClusters: finalClusters.append(indexCluster) continue # Distance to cluster center (average) distAvg = indexCluster._getDistanceTo(indexCluster.avgPosition) #Loop over cluster elements and if element distance or cluster #average distance falls outside the cluster, remove element for iel in indexCluster: dist = indexCluster._getDistanceTo(iel) if max(dist, distAvg) > maxDist: newcluster = indexCluster.copy() newcluster.remove(iel) if not newcluster in newClusters: #Ignore cluster which average mass falls oustide the grid: if newcluster.avgPosition: newClusters.append(newcluster) clusterList = newClusters # Check for oversized list of indexCluster (too time consuming) if len(clusterList) > 100: logger.warning("ElementCluster failed, using unclustered masses") finalClusters = [] clusterList = [] # finalClusters = finalClusters + clusterList # Add clusters of individual masses (just to be safe) for iel in massMap: finalClusters.append(IndexCluster(massMap, posMap, weightMap, set([iel]))) # Clean up clusters (remove redundant clusters) for ic, clusterA in enumerate(finalClusters): if clusterA is None: continue for jc, clusterB in enumerate(finalClusters): if clusterB is None: continue if ic != jc and clusterB.indices.issubset(clusterA.indices): finalClusters[jc] = None while finalClusters.count(None) > 0: finalClusters.remove(None) # Transform index clusters to element clusters: clusterList = [] for indexCluster in finalClusters: cluster = ElementCluster() masses = [massMap[iel] for iel in indexCluster] for el in elements: if el.getMasses() in masses: cluster.elements.append(el) clusterList.append(cluster) return clusterList