Source code for theory.clusterTools

.. module:: clusterTools
   :synopsis: Module holding the ElementCluster class and cluster methods used to combine similar elements according
      to the analysis.

.. moduleauthor:: Andre Lessa <>


from smodels.theory import crossSection
from smodels.theory.element import Element
from smodels.experiment.datasetObj import DataSet,CombinedDataSet
from import fb
from smodels.theory.exceptions import SModelSTheoryError as SModelSError
from smodels.theory.auxiliaryFunctions import average
import numpy as np
from import logger

[docs]class AverageElement(Element): """ Represents an element or list of elements containing only the basic attributes required for clustering or computing efficiencies/upper limits. Its properties are given by the average properties of the elements it represents and its weight is given by the total weight of all elements. """ def __init__(self,elements=[]): if any(not isinstance(el,Element) for el in elements): raise SModelSError("An AverageElement must be created from a list of Element objects.") #Define relevant properties to be stored and averaged over:['mass','totalwidth','txname'] self.elements = elements[:] if self.elements: for attr in setattr(self,attr,self.getAverage(attr)) self.weight = self.elements[0].weight.copy() for el in self.elements[1:]: self.weight += el.weight def __str__(self): """ Simply returns "averageElement", since the element has no well defined branches/final states (in general). :returns: averageElement (string) """ return "averageElement" def __cmp__(self,other): """ Compares the element with other. Only the properties defined in are used for comparison. :param other: element to be compared (Element or AverageElement object) :return: -1 if self < other, 0 if self == other, +1, if self > other. """ if not isinstance(other,(Element,AverageElement)): return -1 otherProperties = [getattr(other,attr) for attr in] selfProperties = [getattr(self,attr) for attr in] comp = (selfProperties > otherProperties) - (otherProperties > selfProperties) return comp def __eq__(self,other): return self.__cmp__(other)==0 def __lt__(self,other): return self.__cmp__(other)<0 def __gt__(self,other): return self.__cmp__(other)>0 def __neq__(self,other): return not self.__eq__(other) def __getattr__(self, attr): """ Returns the attribute of self (necessary to overwrite the Element class method). :param attr: Attribute name :return: Attribute value """ if not attr in self.__dict__: raise AttributeError ( "%s not in AverageElement" % attr ) else: return self.__dict__[attr]
[docs] def getAverage(self,attribute,weighted=True,nround=5): """ Compute the average value for the attribute using the elements in self.elements. If weighted = True, compute the weighted average using the elements weights. :param attribute: Name of attribute to be averaged over (string) :param weighted: If True, uses the element weights to compute a weighted average :param nround: If greater than zero and the returning attibute is numeric, will round it to this number of significant digits. :return: Average value of attribute. """ if not self.elements: return None if len(self.elements) == 1: return getattr(self.elements[0],attribute) values = [getattr(el,attribute) for el in self.elements] if weighted: weights = [el.weight.getMaxXsec().asNumber(fb) for el in self.elements] else: weights = [1.]*len(self.elements) return average(values,weights,nround)
[docs] def contains(self,element): """ Check if the average element contains the element :param element: Element object :return: True/False """ if any(el is element for el in self.elements): return True return False
[docs]class ElementCluster(object): """ An instance of this class represents a cluster of elements. This class is used to store the relevant information about a cluster of elements and to manipulate this information. """ def __init__(self, elements = [], dataset = None, distanceMatrix = None): self.elements = elements self.dataset = dataset self.maxInternalDist = 0. self._distanceMatrix = distanceMatrix #Compute maximal internal distance if self.elements and not self._distanceMatrix is None: self.maxInternalDist = max([self.getDistanceTo(el) for el in self]) def __eq__(self, other): if type(self) != type(other): return False elif set(self.indices()) != set(other.indices()): return False else: return True def __iter__(self): return iter(self.elements) def __getitem__(self, iel): return self.elements[iel] def __len__(self): return len(self.elements) def __str__(self): return str(self.elements) def __repr__(self): return str(self.elements)
[docs] def indices(self): """ Return a list of element indices appearing in cluster """ indices = [el._index for el in self] return indices
[docs] def getTotalXSec(self): """ Return the sum over the cross sections of all elements belonging to the cluster. :returns: sum of weights of all the elements in the cluster (XSectionList object) """ totxsec = crossSection.XSectionList() for el in self: totxsec += el.weight if len(totxsec) != 1: logger.error("Cluster total cross section should have a single value") raise SModelSError() return totxsec[0]
[docs] def getDataType(self): """ Checks to which type of data (efficiency map or upper limit) the cluster refers to. It uses the cluster.dataset attribute. If not defined, returns None :return: upperLimits or efficiencyMap (string) """ if self.dataset: dataType = self.dataset.getType() else: dataType = None return dataType
[docs] def averageElement(self): """ Computes the average element for the cluster. The average element is an empty element, but with its mass and width given by the average over the cluster elements. :return: Element object """ avgEl = AverageElement(self.elements[:]) if self.dataset: avgEl._upperLimit = self.dataset.getUpperLimitFor(avgEl, txnames=avgEl.txname) avgEl._index = None return avgEl
[docs] def copy(self): """ Returns a copy of the index cluster (faster than deepcopy). """ newcluster = ElementCluster(self.elements[:],self.dataset,self._distanceMatrix) newcluster.maxInternalDist = self.maxInternalDist return newcluster
[docs] def add(self, elements): """ Add an element or list of elements. :param elements: Element object or list of elements """ if not isinstance(elements,list): elementList = [elements] else: elementList = elements for el in elementList: if el._index in self.indices(): continue self.elements.append(el) #Update internal distance: self.maxInternalDist = max(self.maxInternalDist,self.getDistanceTo(el))
[docs] def remove(self, elements): """ Remove an element or a list of element from the cluster. :param elements: Element object or list of elements """ if not isinstance(elements,list): elementList = [elements] else: elementList = elements for el in elementList: indices = self.indices() if not el._index in indices: continue iel = indices.index(el._index) self.elements.pop(iel) #Update internal distance: self.maxInternalDist = max([self.getDistanceTo(elB) for elB in self])
[docs] def getDistanceTo(self, element): """ Return the maximum distance between any elements belonging to the cluster and element. :parameter element: Element object :return: maximum distance (float) """ if not hasattr(element, '_upperLimit'): element._upperLimit = self.dataset.getUpperLimitFor(element, txnames=element.txname) if element._upperLimit is None: return None #Use pre-computed distances for regular (non-averge) elements if not element._index is None: return max([self._distanceMatrix[element._index,el._index] for el in self]) dmax = 0. for el in self: if not hasattr(el, '_upperLimit'): el._upperLimit = self.dataset.getUpperLimitFor(el, txnames=el.txname) dmax = max(dmax,relativeDistance(element,el,self.dataset)) return dmax
[docs] def isConsistent(self,maxDist): """ Checks if the cluster is consistent. Computes an average element in the cluster and checks if this average element belongs to the cluster according to the maximum allowed distance between cluster elements. :return: True/False if the cluster is/is not consistent. """ avgElement = self.averageElement() if avgElement._upperLimit is None: return False dmax = self.getDistanceTo(avgElement) if dmax > maxDist: return False return True
[docs]def relativeDistance(el1, el2, dataset): """ Defines the relative distance between two elements according to their upper limit values. The distance is defined as d = 2*|ul1-ul2|/(ul1+ul2). :parameter el1: Element object :parameter el2: Element object :returns: relative distance """ if not hasattr(el1,'_upperLimit'): el1._upperLimit = dataset.getUpperLimitFor(el1, txnames=el1.txname) if not hasattr(el2,'_upperLimit'): el2._upperLimit = dataset.getUpperLimitFor(el2, txnames=el2.txname) ul1 = el1._upperLimit ul2 = el2._upperLimit if ul1 is None or ul2 is None: return None if (ul1+ul2).asNumber(fb) == 0.: return 0. ulDistance = 2.*abs(ul1 - ul2)/(ul1 + ul2) return ulDistance
[docs]def clusterElements(elements, maxDist, dataset): """ Cluster the original elements according to their distance in upper limit space. :parameter elements: list of elements (Element objects) :parameter dataset: Dataset object to be used when computing distances in upper limit space :parameter maxDist: maximum distance for clustering two elements :returns: list of clusters (ElementCluster objects) """ if len(elements) == 0: return [] if any(not isinstance(el,Element) for el in elements): raise SModelSError("Asked to cluster non Element objects") if not isinstance(dataset,(DataSet,CombinedDataSet)): raise SModelSError("A dataset object must be defined for clustering") #Make sure only unique elements are clustered together (avoids double counting weights) #Sort element, so the ones with highest contribution (weight) come first: elementList = sorted(elements, key = lambda el: el.weight.getMaxXsec(), reverse=True) #Remove duplicated elements: elementsUnique = [] for el in elementList: #Skip the element if it is related to any another element in the list if any(el.isRelatedTo(elB) for elB in elementsUnique): continue elementsUnique.append(el) #Get txname list only with the txnames from unique elements used for clustering txnames = list(set([el.txname for el in elementsUnique])) if dataset.getType() == 'upperLimit' and len(txnames) != 1 : logger.error("Clustering elements with different Txnames for an UL result.") raise SModelSError() if dataset.getType() == 'upperLimit': #Group according to upper limit values clusters = doCluster(elementsUnique, dataset, maxDist) elif dataset.getType() == 'efficiencyMap': #Group all elements together distanceMatrix = np.zeros((len(elementsUnique),len(elementsUnique))) cluster = ElementCluster(dataset=dataset,distanceMatrix=distanceMatrix) for iel,el in enumerate(elementsUnique): el._index = iel cluster.elements = elementsUnique clusters = [cluster] for cluster in clusters: cluster.txnames = txnames return clusters
[docs]def groupElements(elements,dataset): """ Group elements into groups where the average element identical to all the elements in group. The groups contain all elements which share the same mass,width and upper limit and can be replaced by their average element when building clusters. :parameter elements: list of all elements to be grouped :parameter dataset: Dataset object to be used when computing distances in upper limit space :returns: a list of AverageElement objects which represents a group of elements with same mass, width and upper limit. """ # First make sure all elements contain their upper limits for el in elements: if not hasattr(el,'._upperLimit'): el._upperLimit = dataset.getUpperLimitFor(el,txnames=el.txname) if el._upperLimit is None: raise SModelSError("Trying to cluster element outside the grid.") #Group elements if they have the same UL #and give the same average element (same mass and same width) avgElements = [] for iA,elA in enumerate(elements): avgEl = AverageElement([elA]) avgEl._upperLimit = elA._upperLimit for iB,elB in enumerate(elements): if iB <= iA: continue if elA._upperLimit != elB._upperLimit: continue if avgEl != elB: continue avgEl.elements.append(elB) avgEl.weight += elB.weight if not avgEl in avgElements: avgElements.append(avgEl) #Make sure each element belongs to a average element: for el in elements: nclusters = sum([avgEl.contains(el) for avgEl in avgElements]) if nclusters != 1: raise SModelSError("Error computing average elements. Element %s belongs to %i average elements." %(str(el),nclusters)) return avgElements
[docs]def doCluster(elements, dataset, maxDist): """ Cluster algorithm to cluster elements. :parameter elements: list of all elements to be clustered :parameter dataset: Dataset object to be used when computing distances in upper limit space :parameter maxDist: maximum distance for clustering two elements :returns: a list of ElementCluster objects containing the elements belonging to the cluster """ #Get average elements: averageElements = groupElements(elements,dataset) #Index average elements: elementList = sorted(averageElements, key = lambda el: el._upperLimit) for iel,el in enumerate(elementList): el._index = iel #Pre-compute all necessary distances: distanceMatrix = np.zeros((len(elementList),len(elementList))) for iel,elA in enumerate(elementList): for jel,elB in enumerate(elementList): if jel <= iel: continue distanceMatrix[iel,jel] = relativeDistance(elA, elB, dataset) distanceMatrix = distanceMatrix + distanceMatrix.T #Start building maximal clusters clusterList = [] for el in elementList: cluster = ElementCluster([],dataset,distanceMatrix) for elB in elementList: if distanceMatrix[el._index,elB._index] <= maxDist: cluster.add(elB) if not cluster.elements: continue if cluster.averageElement()._upperLimit is None: continue if not cluster in clusterList: clusterList.append(cluster) #Split the maximal clusters until all elements inside each cluster are #less than maxDist apart from each other and the cluster average position #is less than maxDist apart from all elements finalClusters = [] while clusterList: newClusters = [] for cluster in clusterList: #Check if maximal internal distance is below maxDist isConsistent = cluster.isConsistent(maxDist) if isConsistent and cluster.maxInternalDist < maxDist: if not cluster in finalClusters: finalClusters.append(cluster) #Cluster violates maxDist: else: #Loop over cluster elements and if element distance #falls outside the cluster, remove element for el in cluster: if cluster.getDistanceTo(el) > maxDist or not isConsistent: newcluster = cluster.copy() newcluster.remove(el) if newcluster.averageElement()._upperLimit is None: continue if newcluster in newClusters: continue newClusters.append(newcluster) clusterList = newClusters # Check for oversized list of indexCluster (too time consuming) if len(clusterList) > 100: logger.warning("ElementCluster failed, using unclustered masses") finalClusters = [] clusterList = [] # finalClusters = finalClusters + clusterList # Add clusters of individual masses (just to be safe) for el in elementList: finalClusters.append(ElementCluster([el],dataset,distanceMatrix)) # Clean up clusters (remove redundant clusters) for ic, clusterA in enumerate(finalClusters): if clusterA is None: continue for jc, clusterB in enumerate(finalClusters): if clusterB is None: continue if ic != jc and set(clusterB.indices()).issubset(set(clusterA.indices())): finalClusters[jc] = None while finalClusters.count(None) > 0: finalClusters.remove(None) #Replace average elements by the original elements: for cluster in finalClusters: originalElements = [] for avgEl in cluster.elements[:]: originalElements += avgEl.elements[:] cluster.elements = originalElements[:] return finalClusters