"""
.. module:: expAuxiliaryFunctions
:synopsis: A collection of functions used to evaluate fuzzy the conditions.
.. moduleauthor:: Andre Lessa <lessa.a.p@gmail.com>
"""
from smodels.base.physicsUnits import standardUnits, GeV
from smodels.experiment.exceptions import SModelSExperimentError as SModelSError
from smodels.base.smodelsLogging import logger
import unum
import re
import numpy as np
from collections import OrderedDict, deque
try:
from collections.abc import Iterable
except (ImportError, ModuleNotFoundError):
from collections import Iterable
minWidth = 1e-30 # Any width below this can be safely considered to be zero
maxWidth = 1e50 # Any width above this can be safely considered to be infinity
[docs]def cSim(*weights):
"""
Define the auxiliar similar function.
Return the maximum relative difference between any element weights of the
list, normalized to [0,1].
:returns: List of values.
"""
if any(not isinstance(w, (float, unum.Unum)) for w in weights):
logger.error("Trying to evaluate with invalid objects %s")
raise SModelSError()
x = np.array([removeUnits(w) for w in weights], dtype=float)
# Convert x to a matrix
xM = np.reshape(x, (len(x), 1))
# Compute the differences between all values
d = np.abs(xM - xM.transpose())
# Compute the average between all values
s = np.abs(xM + xM.transpose())
# Compute the relative difference for
# the entries which have a non-zero average
delta = d[np.nonzero(s)]/s[np.nonzero(s)]
if len(delta) > 0:
return delta.max()
else:
return 0.
[docs]def cGtr(weightA, weightB):
"""
Define the auxiliary greater function.
Return a number between 0 and 1 depending on how much it is violated
(0 = A > B, 1 = A << B).
:returns: XSectioList object with the values for each label.
"""
if any(not isinstance(w, (float, unum.Unum)) for w in [weightA, weightB]):
logger.error("Trying to evaluate with invalid objects")
raise SModelSError()
# Evaluate the inequality for each cross section info
a = removeUnits(weightA)
b = removeUnits(weightB)
if a + b == 0.:
return None
else:
result = (abs(a - b) - (a - b)) / (2.*(a + b))
return result
[docs]def removeUnits(value, stdUnits=standardUnits, returnUnit=False):
"""
Remove units from unum objects. Uses the units defined
in base.physicsUnits.standard units to normalize the data.
:param value: Object containing units (e.g. [[100*GeV,100.*GeV],3.*pb])
:param standardUnits: Unum unit or Array of unum units defined to
normalize the data.
:param returnUnit: If True also resturns the unit corresponding to the returned
value.
:return: Object normalized to standard units (e.g. [[100,100],3000]).
If returnUnit = True, returns a tuple with the value and its unit (e.g. 100,GeV).
For unitless values return 1.0 as the unit.
"""
if isinstance(stdUnits, unum.Unum):
stdunits = [stdUnits]
else:
stdunits = stdUnits
if isinstance(value, list):
return [removeUnits(x, stdunits) for x in value]
if isinstance(value, tuple):
return tuple([removeUnits(x, stdunits) for x in value])
elif isinstance(value, dict):
return dict([[removeUnits(x, stdunits), removeUnits(y, stdunits)]
for x, y in value.items()])
elif isinstance(value, unum.Unum):
# Check if value has unit or not:
if not value._unit:
return value.asNumber()
# Now try to normalize it by one of the standard pre-defined units:
for unit in stdunits:
y = (value/unit).normalize()
if not y._unit:
val = value.asNumber(unit)
if returnUnit:
return val, unit
else:
return val
raise SModelSError("Could not normalize unit value %s using the standard units: %s"
% (str(value), str(standardUnits)))
else:
if returnUnit:
return value, 1.0
else:
return value
[docs]def addUnit(obj, unit):
"""
Add unit to object.
If the object is a nested list, adds the unit to all of its elements.
:param obj: Object without units (e.g. [[100,100.]])
:param unit: Unit to be added to the object (Unum object, e.g. GeV)
:return: Object with units (e.g. [[100*GeV,100*GeV]])
"""
if isinstance(obj, list):
return [addUnit(x, unit) for x in obj]
elif isinstance(obj, tuple):
return tuple([addUnit(x, unit) for x in obj])
elif isinstance(obj, dict):
return dict([[addUnit(x, unit), addUnit(y, unit)]
for x, y in obj.items()])
elif isinstance(obj, (float, int, unum.Unum)):
return obj*unit
else:
return obj
[docs]def flattenArray(objList):
"""
Flatten any nested list to a 1D list.
:param objList: Any list or nested list of objects (e.g. [[[(100.,1e-10),100.],1.],[[200.,200.],2.],..]
:return: 1D list (e.g. [100.,1e-10,100.,1.,200.,200.,2.,..])
"""
ret = []
for obj in objList:
if isinstance(obj, Iterable) and not isinstance(obj, str):
ret.extend(flattenArray(obj))
else:
ret.append(obj)
return ret
[docs]def reshapeList(objList, shapeArray):
"""
Reshape a flat list according to the shape of shapeArray.
The number of elements in shapeArray should equal the length of objList.
:param objList: 1D list of objects (e.g. [200,100,'*',50,'*'])
:param shapeArray: Nested array (e.g. [[float,float,'*',float],'*'])
:return: Array with elements from objList shaped according to shapeArray
(e.g. [[200.,100.,'*',50],'*'])
"""
if isinstance(shapeArray, list):
return [reshapeList(objList, v) for v in shapeArray]
else:
return objList.pop(0)
[docs]def index_bisect(inlist, el):
"""
Return the index where to insert item el in inlist.
inlist is assumed to be sorted and a comparison function (lt or cmp)
must exist for el and the other elements of the list.
If el already appears in the list, inlist.insert(el) will
insert just before the leftmost el already there.
"""
lo = 0
hi = len(inlist)
while lo < hi:
mid = (lo+hi)//2
if inlist[mid] < el:
lo = mid+1
else:
hi = mid
return lo
[docs]def smsInStr(instring):
"""
Parse instring and return a list of elements appearing in instring.
instring can also be a list of strings.
:param instring: string containing elements (e.g. "[[['e+']],[['e-']]]+[[['mu+']],[['mu-']]]")
:returns: list of elements appearing in instring in string format
"""
outstr = ""
if isinstance(instring, str):
outstr = instring
elif isinstance(instring, list):
for st in instring:
if not isinstance(st, str):
logger.error("Input must be a string or a list of strings")
raise SModelSError()
# Combine list of strings in a single string
outstr += st
else:
raise SModelSError("syntax error in constraint/condition: ``%s''."
"Check your constraints and conditions in your database." % str(instring))
outstr = outstr.replace(" ", "")
if 'PV' in outstr and '>' in outstr:
if '{' not in outstr or '}' not in outstr:
raise SModelSError("Elements in %s should be enclosed by curly brackets" % outstr)
elements = re.findall(r"\{(.*?)\}", outstr)
return elements
elements = []
elStr = ""
bCounter = 0
# Parse the string and looks for matching ['s and ]'s, when the matching is
# complete, store element
for c in outstr:
if c == '[':
bCounter += 1
elif c == ']':
bCounter -= 1
if bCounter != 0:
elStr += c
elif elStr:
elements.append(elStr + c)
elStr = ""
# Check if there are not unmatched ['s and/or ]'s in the string
if bCounter != 0:
raise SModelSError("Wrong input (incomplete elements?) " + instring)
# Make sure single quotes are used for the particle strings:
newElements = []
for el in elements:
# Remove all single quotes
el = el.replace("'", "")
# Get particles without quotes
ptcList = el.replace(']', '').replace('[', '').split(',')
ptcList = [ptc for ptc in ptcList if ptc]
newEl = el[:]
iptc = 0
# Replace particle strings by their values with single quotes
for ptc in ptcList:
# Search for ptc only starting after the last replacement
iptc += newEl[iptc:].find(ptc)
newEl = newEl[:iptc] + newEl[iptc:].replace(ptc, "'%s'" % ptc, 1)
iptc += len(ptc) + 1 # Update the position
newElements.append(newEl)
return newElements
[docs]def getValuesForObj(obj, attribute):
"""
Loops over all attributes in the object and in its attributes
and returns a list of values for the desired attribute:
:param obj: Any object with a __dict__ attribute
:param attribute: String for the desired attribute
:return: List with unique attribute values. If the attribute is not found, returns empty list.
"""
values = []
try:
objDict = obj.__dict__.items()
except AttributeError:
return values
for attr, value in objDict:
if attribute == attr:
values += [value]
elif isinstance(value, Iterable):
values += [getValuesForObj(v, attribute) for v in value if v is not obj]
elif value is not obj:
values += getValuesForObj(value, attribute)
values = list(filter(lambda a: (not isinstance(a, list)) or a != [], values))
values = flattenArray(values)
uniqueValues = [v for n, v in enumerate(values) if v not in values[:n]]
return uniqueValues
[docs]def bracketToProcessStr(stringSMS,
finalState=None, intermediateState=None,
returnNodeDict=False):
"""
:parameter stringSMS: string describing the SMS in bracket notation
(e.g. [[[e+],[jet]],[[e-],[jet]]])
:parameter finalState: list containing the final state labels for each branch
(e.g. ['MET', 'HSCP'] or ['MET','MET'])
:parameter intermediateState: nested list containing intermediate state labels
for each branch (e.g. [['gluino'], ['gluino']])
:param returnNodeDict: If True, return a dictionary mapping the nested bracket
indices to the particle nodes
({(branchIndex,vertexIndex) : nodeIndex})
:return: process string in new format (str) and dictionary nodes dictionary (if returnNodeDict=True)
"""
# Make sure all particle labels are enclosed by single quotes:
stringSMS = stringSMS.replace("'","").replace(" ","")
# Add quotes
addQuotes = lambda match: "[%s]" %(",".join(["'%s'" %ptc for ptc in match.group(1).split(',')]))
# Find all strings surrounded by brackets and replace
# them by the string with quotes added
stringSMS = re.sub(r"\[([^\[\]]+)\]", addQuotes, stringSMS)
# Convert string to nested list:
smsList = eval(stringSMS)
# Collect all BSM states in each branch
# and convert inclusive branches
branches = []
bsmStates = []
for ibr,br in enumerate(smsList):
bsmStates.append([])
if br == ['*']:
branches.append([['*anySM']])
bsmStates[ibr] = ['InclusiveNode']
else:
branches.append([sorted([ptc if ptc != '*' else 'anySM'
for ptc in vt]) for vt in br])
if intermediateState is None:
bsmStates[ibr] = ['anyBSM']*len(br)
else:
bsmStates[ibr] = intermediateState[ibr][:]
if finalState is None:
bsmStates[ibr].append('MET')
else:
bsmStates[ibr].append(finalState[ibr])
# Create decay string for primary vertex:
bsmNodesDict = {}
decayStrings = []
daughtersStr = []
momStr = 'PV(0)'
nodeIndex = 1
for ibr,bsmBranch in enumerate(bsmStates):
bsmNodesDict[(ibr,0)] = nodeIndex
nodeIndex += 1
daughtersStr.append('%s(%i)' %(bsmBranch[0],bsmNodesDict[(ibr,0)]))
decStr = '(%s > %s)' %(momStr,','.join(daughtersStr))
decayStrings.append(decStr)
# Create decay string for all branches:
smNodesDict = {}
for ibr,bsmBranch in enumerate(bsmStates):
for idec,bsm in enumerate(bsmBranch[:-1]):
# Define node index for BSM mom
if (ibr,idec) not in bsmNodesDict:
bsmNodesDict[(ibr,idec)] = nodeIndex
nodeIndex += 1
# Create string with node index for BSM mom
momStr = '%s(%i)' %(bsm,bsmNodesDict[(ibr,idec)])
daughtersStr = []
# Define node index for BSM daughter
if (ibr,idec+1) not in bsmNodesDict:
bsmNodesDict[(ibr,idec+1)] = nodeIndex
nodeIndex += 1
# Create string with node index for mom
daughtersStr.append('%s(%i)' %(bsmBranch[idec+1],bsmNodesDict[(ibr,idec+1)]))
# Define node indices for SM daughters:
for ism,_ in enumerate(branches[ibr][idec]):
smNodesDict[(ibr,idec,ism)] = nodeIndex
nodeIndex += 1
# Create string with node index for mom
daughtersStr += ['%s(%i)' %(sm,smNodesDict[(ibr,idec,ism)])
for ism,sm in enumerate(branches[ibr][idec])]
# Create decay string:
decStr = '(%s > %s)' %(momStr, ','.join(daughtersStr))
decayStrings.append(decStr)
processStr = ', '.join(decayStrings)
if returnNodeDict:
return processStr,bsmNodesDict
else:
return processStr
[docs]def getAttributesFrom(obj, skipIDs=[]):
"""
Loops over all attributes in the object and return a list
of the attributes.
:param obj: Any object with a __dict__ attribute
:param skipIDs: List of object ids. Any object which has its id on the list
will be ignored (useful to avoid recursion).
:return: List with unique attribute labels.
"""
if id(obj) in skipIDs or isinstance(obj, (tuple, list, float, int, unum.Unum)):
return []
else:
newSkipIDs = skipIDs[:] + [id(obj)]
attributes = []
try:
objDict = obj.__dict__.items()
except AttributeError:
return attributes
for attr, value in objDict:
attributes.append(attr)
if isinstance(value, list):
attributes += [getAttributesFrom(v, newSkipIDs) for v in value]
elif isinstance(value, dict):
attributes += [getAttributesFrom(v, newSkipIDs) for v in value.values()]
else:
attributes += getAttributesFrom(value, newSkipIDs)
attributes = list(filter(lambda a: a != [], attributes))
return list(set(flattenArray(attributes)))
[docs]def rescaleWidth(width):
"""
The function that is applied to all widths to
map it into a better variable for interpolation.
It grows logarithmically from zero (for width=0.)
to a large number (machine dependent) for width = infinity.
:param width: Width value (in GeV) with or without units
:return x: Coordinate value (float)
"""
if isinstance(width, unum.Unum):
w = width.asNumber(GeV)
else:
w = width
w = (min(w, maxWidth)/minWidth) # Normalize the width and convert it to some finite number (if not finite)
if w < 1e-10: # The log function misbehaves for very small values of w (step behavior), so we use log(1+x) = x for x << 1
return w
else:
return np.log(1+w)
[docs]def unscaleWidth(x):
"""
Maps a coordinate value back to width.
The mapping is such that x=0->width=0 and x=very large -> width = inf.
:param x: Coordinate value (float)
:return width: Width value without units
"""
with np.errstate(over='ignore'): # Temporarily disable overflow error message
# The small increase in x is required to
# enforce unscaleWidth(widthToCoordinae(np.inf)) = np.inf
width = minWidth*(np.exp(x)-1)
if width > maxWidth:
width = np.inf
return width
[docs]def removeInclusives(massArray, shapeArray):
"""
Remove all entries corresponding to '*' in shapeArray.
If shapeArray contains entries = '*', the corresponding entries
in value will be removed from the output.
:param massArray: Array to be formatted (e.g. [[200.,100.],[200.,100.]] or [[200.,'*'],'*'],0.4])
:param shapeArray: Array with format info (e.g. ['*',[float,float]])
:return: formatted array (e.g. [[200.,100.]] or [[200.]],0.4])
"""
if shapeArray == '*':
return None
elif isinstance(massArray, list):
if len(shapeArray) != len(massArray):
raise SModelSError("Input value and data shape mismatch (%s,%s)"
% (len(shapeArray), len(massArray)))
else:
return [removeInclusives(xi, shapeArray[i]) for i, xi in enumerate(massArray)
if not removeInclusives(xi, shapeArray[i]) is None]
else:
return massArray
[docs]def addInclusives(massList, shapeArray):
"""
Add entries corresponding to '*' in shapeArray.
If shapeArray contains entries = '*', the corresponding entries
in value will be added from the output.
:param massList: 1D array of floats. Its dimension should be equal to the number
of non "*" items in shapeArray (e.g. [200.0,100.0])
:param shapeArray: 1D array containing the data type and '*'. The
values of data type are ignored (e.g. [float,'*',float,'*','*']).
:return: original array with '*' inserted at the correct entries.
"""
if isinstance(shapeArray, list):
return [addInclusives(massList, v) for v in shapeArray]
elif shapeArray != '*':
return massList.pop(0)
else:
return shapeArray
[docs]def sortParticleList(ptcList):
"""
sorts a list of particle or particle list objects by their label
:param ptcList: list to be sorted containing particle or particle list objects
:return: sorted list of particles
"""
newPtcList = sorted(ptcList, key=lambda x: x.label)
return newPtcList
[docs]def cleanWalk ( topdir ):
""" perform os.walk, but ignore all hidden files and directories """
import os
ret = []
for root, d_, f_ in os.walk ( topdir ):
isHidden=False
tokens = root.split("/")
for token in tokens:
if len(token)>0 and token[0]==".":
isHidden=True
break
if isHidden:
continue
dirs,files = [],[]
for d in d_:
if not d[0]==".":
dirs.append ( d )
for f in f_:
if not f[0]==".":
files.append ( f )
ret.append ( [ root, dirs, files ] )
return ret
[docs]def concatenateLines ( oldcontent ):
""" of all lines in the list "oldcontent", concatenate the ones
that end with '\' or ',' """
content=[] ## concatenate lines that end with "," or "\"
tmp=""
import re
for i,line in enumerate ( oldcontent ):
tmp+=line.strip()
## if next line starts with tab or whitespace or "}",
## merge the lines
if i < len(oldcontent)-1 and re.match("[ \t}]",oldcontent[i+1] ):
# if next line starts with white space, we add also
continue
if tmp != "" and tmp[-1] not in [ ",", '\\' ]:
content.append ( tmp )
tmp=""
if tmp != "" and tmp[-1] == '\\':
tmp=tmp[:-1] # remove trailing \ (but keep trailing ,)
return content