#!/usr/bin/env python3
##########################################################################
# basf2 (Belle II Analysis Software Framework) #
# Author: The Belle II Collaboration #
# #
# See git log for contributors and copyright holders. #
# This file is licensed under LGPL-3.0, see LICENSE.md. #
##########################################################################
"""
The core classes of the skim package are defined in ``skim.core``: ``BaseSkim`` and
``CombinedSkim``.
* ``BaseSkim`` is an abstract base class from which all skims inherit. It defines
template functions for a skim, and includes attributes describing the skim metadata.
* ``CombinedSkim`` is a class for combining ``BaseSkim`` objects into a single steering
file.
"""
from abc import ABC, abstractmethod
import warnings
import basf2 as b2
from modularAnalysis import applyCuts, summaryOfLists
from skim.registry import Registry
from skim.utils.flags import InitialiseSkimFlag, UpdateSkimFlag
from skim.utils.testfiles import get_test_file
from skim.utils.misc import _hashable_list
[docs]
class BaseSkim(ABC):
"""Base class for skims. Initialises a skim name, and creates template functions
required for each skim.
See `writing-skims` for information on how to use this to define a new skim.
"""
NoisyModules = None
"""List of module types to be silenced. This may be necessary in certain skims in
order to keep log file sizes small.
.. tip::
The elements of this list should be the module *type*, which is not necessarily
the same as the module name. The module type can be inspected in Python via
``module.type()``.
.. seealso::
This attribute is used by `BaseSkim.set_skim_logging`.
"""
TestSampleProcess = "mixed"
"""MC process of test file. `BaseSkim.TestFiles` passes this property to
`skim.utils.testfiles.get_test_file` to retrieve an appropriate file location.
Defaults to a :math:`B^{0}\\overline{B^{0}}` sample.
"""
MergeDataStructures = {}
"""Dict of ``str -> function`` pairs to determine if any special data structures
should be merged when combining skims. Currently, this is only used to merge FEI
config parameters when running multiple FEI skims at once, so that it can be run
just once with all the necessary arguments."""
ApplyHLTHadronCut = False
"""If this property is set to True, then the HLT selection for ``hlt_hadron`` will
be applied to the skim lists when the skim is added to the path.
"""
produce_on_tau_samples = True
"""If this property is set to False, then ``b2skim-prod`` will not produce data
production requests for this skim on taupair MC samples. This decision may be made
for one of two reasons:
* The retention rate of the skim on taupair samples is basically zero, so there is
no point producing the skim for these samples.
* The retention rate of the skim on taupair samples is too high (>20%), so the
production system may struggle to handle the jobs.
"""
produces_mdst_by_default = False
"""Special property for combined systematics skims, which produce MDST output instead of
uDST. This property is used by ``b2skim-prod`` to set the ``DataLevel`` parameter in
the ``DataDescription`` block for this skim to ``mdst`` instead of ``udst``.
"""
validation_sample = None
"""
MDST sample to use for validation histograms. Must be a valid location of a
validation dataset (see documentation for `basf2.find_file`).
"""
mc = True
"""
Include Monte Carlo quantities in skim output.
"""
analysisGlobaltag = None
"""
Analysis globaltag.
"""
pidGlobaltag = None
"""
PID globaltag.
"""
@property
@abstractmethod
def __description__(self):
pass
@property
@abstractmethod
def __category__(self):
pass
@property
@abstractmethod
def __authors__(self):
pass
@property
@abstractmethod
def __contact__(self):
pass
@property
def code(self):
"""Eight-digit code assigned to this skim in the registry."""
return Registry.encode_skim_name(self.name)
def __init__(
self,
*,
OutputFileName=None,
additionalDataDescription=None,
udstOutput=True,
validation=False,
mc=True,
analysisGlobaltag=None,
pidGlobaltag=None,
):
"""Initialise the BaseSkim class.
Parameters:
OutputFileName (str): Name to give output uDST files. If none given, then
defaults to eight-number skim code.
additionalDataDescription (dict): additional data description to be added to the output file metadata.
udstOutput (bool): If True, add uDST output to the path.
validation (bool): If True, build lists and write validation histograms
instead of writing uDSTs.
mc (bool): If True, include MC quantities in output.
analysisGlobaltag (str): Analysis globaltag.
pidGlobaltag (str): PID globaltag.
"""
self.name = self.__class__.__name__
self.OutputFileName = OutputFileName
self.additionalDataDescription = additionalDataDescription
self._udstOutput = udstOutput
self._validation = validation
self.mc = mc
self.analysisGlobaltag = analysisGlobaltag
self.pidGlobaltag = pidGlobaltag
if self.NoisyModules is None:
self.NoisyModules = []
[docs]
def load_standard_lists(self, path):
"""
Load any standard lists. This code will be run before any
`BaseSkim.additional_setup` and `BaseSkim.build_lists`.
Note:
This is separated into its own function so that when skims are combined, any
standard lists used by two skims can be loaded just once.
Parameters:
path (basf2.Path): Skim path to be processed.
"""
[docs]
def additional_setup(self, path):
"""
Perform any setup steps necessary before running the skim.
Warning:
Standard particle lists should *not* be loaded in here. This should be done
by overriding the method `BaseSkim.load_standard_lists`. This is crucial for
avoiding loading lists twice when combining skims for production.
Parameters:
path (basf2.Path): Skim path to be processed.
"""
# Abstract method to ensure that it is overridden whenever `BaseSkim` is inherited
[docs]
@abstractmethod
def build_lists(self, path):
"""Create the skim lists to be saved in the output uDST. This function is where
the main skim cuts should be applied. This function should return a list of
particle list names.
Parameters:
path (basf2.Path): Skim path to be processed.
.. versionchanged:: release-06-00-00
Previously, this function was expected to set the attribute
`BaseSkim.SkimLists`. Now this is handled by `BaseSkim`, and this function is
expected to return the list of particle list names.
"""
[docs]
def validation_histograms(self, path):
"""Create validation histograms for the skim.
Parameters:
path (basf2.Path): Skim path to be processed.
"""
# Everything beyond this point can remain as-is when defining a skim
def __call__(self, path):
"""Produce the skim particle lists and write uDST file.
Parameters:
path (basf2.Path): Skim path to be processed.
"""
self._MainPath = path
self.initialise_skim_flag(path)
self.load_standard_lists(path)
self.additional_setup(path)
# At this point, BaseSkim.skim_event_cuts may have been run, so pass
# self._ConditionalPath for the path if it is not None (otherwise just pass the
# regular path)
self.SkimLists = self.build_lists(self._ConditionalPath or path)
self.apply_hlt_hadron_cut_if_required(self._ConditionalPath or path)
self.update_skim_flag(self._ConditionalPath or path)
if self._udstOutput:
self.output_udst(self._ConditionalPath or path)
if self._validation:
if self._method_unchanged("validation_histograms"):
b2.B2FATAL(f"No validation histograms defined for {self} skim.")
self.validation_histograms(self._ConditionalPath or path)
self.set_skim_logging()
@property
def postskim_path(self):
"""
Return the skim path.
* If `BaseSkim.skim_event_cuts` has been run, then the skim lists will only be
created on a conditional path, so subsequent modules should be added to the
conditional path.
* If `BaseSkim.skim_event_cuts` has not been run, then the main analysis path is
returned.
"""
if not self._MainPath:
raise ValueError("Skim has not been added to the path yet!")
return self._ConditionalPath or self._MainPath
SkimLists = []
"""
List of particle lists reconstructed by the skim. This attribute should only be
accessed after running the ``__call__`` method.
"""
_MainPath = None
"""Main analysis path."""
_ConditionalPath = None
"""Conditional path to be set by `BaseSkim.skim_event_cuts` if event-level cuts are applied."""
[docs]
def skim_event_cuts(self, cut, *, path):
"""Apply event-level cuts in a skim-safe way.
Parameters:
cut (str): Event-level cut to be applied.
path (basf2.Path): Skim path to be processed.
Returns:
Path on which the rest of this skim should be processed.
On this path, only events which passed the event-level cut will
be processed further.
.. Tip::
If running this function in `BaseSkim.additional_setup` or
`BaseSkim.build_lists`, redefine the ``path`` to the path returned by
`BaseSkim.skim_event_cuts`, *e.g.*
.. code-block:: python
def build_lists(self, path):
path = self.skim_event_cuts("nTracks>4", path=path)
# rest of skim list building...
.. Note::
The motivation for using this function over `applyEventCuts` is that
`applyEventCuts` completely removes events from processing. If we combine
multiple skims in a single steering file (which is done in production), and
the first has a set of event-level cuts, then all the remaining skims will
never even see those events.
Internally, this function creates a new path, which is only processed for
events passing the event-level cut. To avoid issues around particles not
being available on the main path (leading to noisy error logs), we need to
add the rest of the skim to this path. So this new path is assigned to the
attribute ``BaseSkim._ConditionalPath``, and ``BaseSkim.__call__`` will run
all remaining methods on this path.
"""
if self._ConditionalPath is not None:
b2.B2FATAL(
"BaseSkim.skim_event_cuts cannot be run twice in one skim. "
"Please join your event-level cut strings into a single string."
)
ConditionalPath = b2.Path()
self._ConditionalPath = ConditionalPath
eselect = path.add_module("VariableToReturnValue", variable=f"passesEventCut({cut})")
eselect.if_value('=1', ConditionalPath, b2.AfterConditionPath.CONTINUE)
return ConditionalPath
@property
def TestFiles(self):
"""
Location of test MDST sample. To modify this, set the property
`BaseSkim.TestSampleProcess`, and this function will find an appropriate test
sample from the list in
``/group/belle2/dataprod/MC/SkimTraining/SampleLists/TestFiles.yaml``
If no sample can be found, an empty list is returned.
"""
try:
return [str(get_test_file(process=self.TestSampleProcess))]
except FileNotFoundError:
# Could not find TestFiles.yaml
# (Don't issue a warning, since this will just show up as noise during grid processing)
return []
except ValueError:
b2.B2WARNING(
f"Could not find '{self.TestSampleProcess}' sample in TestFiles.yaml"
)
# Could not find sample in YAML file
return []
@property
def flag(self):
"""
Event-level variable indicating whether an event passes the skim or not. To use
the skim flag without writing uDST output, use the argument ``udstOutput=False``
when instantiating the skim class.
"""
return f"passes_{self}"
[docs]
def initialise_skim_flag(self, path):
"""
Add the module `skim.utils.flags.InitialiseSkimFlag` to the path, which
initialises flag for this skim to zero.
"""
path.add_module(InitialiseSkimFlag(self))
[docs]
def update_skim_flag(self, path):
"""
Add the module `skim.utils.flags.UpdateSkimFlag` to the path, which
updates flag for this skim.
.. Warning::
If a conditional path has been created before this, then this function
*must* run on the conditional path, since the skim lists are not guaranteed
to exist for all events on the main path.
"""
path.add_module(UpdateSkimFlag(self))
def _method_unchanged(self, method):
"""Check if the method of the class is the same as in its parent class, or if it has
been overridden.
Useful for determining if *e.g.* `validation_histograms` has been defined for a
particular skim.
"""
cls = self.__class__
ParentsWithAttr = [parent for parent in cls.__mro__[1:] if hasattr(parent, method)]
if ParentsWithAttr:
# Look for oldest ancestor which as that attribute, to handle inheritance.
# In the case of `validation_histograms`, this will be `BaseSkim`.
OldestParentWithAttr = ParentsWithAttr[-1]
return getattr(cls, method) == getattr(OldestParentWithAttr, method)
else:
return False
def __str__(self):
return self.name
def __name__(self):
return self.name
[docs]
def set_skim_logging(self):
"""Turns the log level to ERROR for selected modules to decrease the total size
of the skim log files. Additional modules can be silenced by setting the attribute
`NoisyModules` for an individual skim.
Parameters:
path (basf2.Path): Skim path to be processed.
.. warning::
This method works by inspecting the modules added to the path, and setting
the log level to ERROR. This method should be called *after* all
skim-related modules are added to the path.
"""
b2.set_log_level(b2.LogLevel.INFO)
NoisyModules = ["ParticleLoader", "ParticleVertexFitter"] + self.NoisyModules
# Set log level of modules on both main path and conditional path
paths = filter(None, (self._MainPath, self._ConditionalPath))
modules = [module for path in paths for module in path.modules()]
for module in modules:
if module.type() in set(NoisyModules):
module.set_log_level(b2.LogLevel.ERROR)
[docs]
def output_udst(self, path):
"""Write the skim particle lists to an output uDST and print a summary of the
skim list statistics.
Parameters:
path (basf2.Path): Skim path to be processed.
"""
# Keep this import here to avoid ROOT hijacking the argument parser
import udst # noqa
# Make a fuss if self.SkimLists is empty
if len(self.SkimLists) == 0:
b2.B2FATAL(
f"No skim list names defined in self.SkimLists for {self} skim!"
)
udst.add_skimmed_udst_output(
skimDecayMode=self.code,
skimParticleLists=self.SkimLists,
outputFile=self.OutputFileName,
dataDescription=self.additionalDataDescription,
mc=self.mc,
path=path,
)
summaryOfLists(self.SkimLists, path=path)
[docs]
def apply_hlt_hadron_cut_if_required(self, path):
"""Apply the ``hlt_hadron`` selection if the property ``ApplyHLTHadronCut`` is True.
Parameters:
path (basf2.Path): Skim path to be processed.
"""
hlt_hadron = "SoftwareTriggerResult(software_trigger_cut&skim&accept_hadron)"
if self.ApplyHLTHadronCut:
for SkimList in self.SkimLists:
applyCuts(SkimList, f"{hlt_hadron}==1", path=path)
[docs]
class CombinedSkim(BaseSkim):
"""Class for creating combined skims which can be run using similar-looking methods
to `BaseSkim` objects.
A steering file which combines skims can be as simple as the following:
.. code-block:: python
import basf2 as b2
import modularAnalysis as ma
from skim.WGs.foo import OneSkim, TwoSkim, RedSkim, BlueSkim
path = b2.Path()
ma.inputMdstList([], path=path)
skims = CombinedSkim(OneSkim(), TwoSkim(), RedSkim(), BlueSkim())
skims(path) # load standard lists, create skim lists, and save to uDST
path.process()
When skims are combined using this class, the `BaseSkim.NoisyModules` lists of each
skim are combined and all silenced.
The heavy-lifting functions `BaseSkim.additional_setup`, `BaseSkim.build_lists` and
`BaseSkim.output_udst` are modified to loop over the corresponding functions of each
individual skim. The `load_standard_lists` method is also modified to load all
required lists, without accidentally loading a list twice.
Calling an instance of the `CombinedSkim` class will load all the required particle
lists, then run all the setup steps, then the list building functions, and then all
the output steps.
"""
__authors__ = ["Phil Grace"]
__description__ = None
__category__ = "combined"
__contact__ = None
def __init__(
self,
*skims,
NoisyModules=None,
additionalDataDescription=None,
udstOutput=None,
mdstOutput=False,
mdst_kwargs=None,
CombinedSkimName="CombinedSkim",
OutputFileName=None,
mc=None,
analysisGlobaltag=None,
pidGlobaltag=None,
):
"""Initialise the CombinedSkim class.
Parameters:
*skims (BaseSkim): One or more (instantiated) skim objects.
NoisyModules (list(str)): Additional modules to silence.
additionalDataDescription (dict): Overrides corresponding setting of all individual skims.
udstOutput (bool): Overrides corresponding setting of all individual skims.
mdstOutput (bool): Write a single MDST output file containing events which
pass any of the skims in this combined skim.
mdst_kwargs (dict): kwargs to be passed to `mdst.add_mdst_output`. Only used
if ``mdstOutput`` is True.
CombinedSkimName (str): Sets output of ``__str__`` method of this combined skim.
OutputFileName (str): If mdstOutput=True, this option sets the name of the combined output file.
If mdstOutput=False, this option does nothing.
mc (bool): If True, include MC quantities in output.
analysisGlobaltag (str): Analysis globaltag.
pidGlobaltag (str): PID globaltag.
"""
if NoisyModules is None:
NoisyModules = []
# Check that we were passed only BaseSkim objects
if not all([isinstance(skim, BaseSkim) for skim in skims]):
raise NotImplementedError(
"Must pass only `BaseSkim` type objects to `CombinedSkim`."
)
self.Skims = skims
self.name = CombinedSkimName
for skim in self:
skim.NoisyModules += NoisyModules
# empty but needed for functions inherited from baseSkim to work
self.SkimLists = []
if additionalDataDescription is not None:
for skim in self:
skim.additionalDataDescription = additionalDataDescription
self._udstOutput = udstOutput
if udstOutput is not None:
for skim in self:
skim._udstOutput = udstOutput
self.mc = mc
if mc is not None:
for skim in self:
skim.mc = mc
self.analysisGlobaltag = analysisGlobaltag
if analysisGlobaltag is not None:
for skim in self:
skim.analysisGlobaltag = analysisGlobaltag
self.pidGlobaltag = pidGlobaltag
if pidGlobaltag is not None:
for skim in self:
skim.pidGlobaltag = pidGlobaltag
self._mdstOutput = mdstOutput
self.mdst_kwargs = mdst_kwargs or {}
self.mdst_kwargs.update(OutputFileName=OutputFileName)
if mc is not None:
self.mdst_kwargs.update(mc=mc)
self.merge_data_structures()
def __str__(self):
return self.name
def __name__(self):
return self.name
def __call__(self, path):
for skim in self:
skim._MainPath = path
self.initialise_skim_flag(path)
self.load_standard_lists(path)
self.additional_setup(path)
self.build_lists(path)
self.apply_hlt_hadron_cut_if_required(path)
self.update_skim_flag(path)
self._check_duplicate_list_names()
self.output_udst(path)
self.output_mdst_if_any_flag_passes(path=path, **self.mdst_kwargs)
self.set_skim_logging()
def __iter__(self):
yield from self.Skims
[docs]
def load_standard_lists(self, path):
"""Add all required standard list loading to the path.
Note:
To avoid loading standard lists twice, this function creates dummy paths
that are passed through ``load_standard_lists`` for each skim. These dummy
paths are then inspected, and a list of unique module-parameter combinations
is added to the main skim path.
Parameters:
path (basf2.Path): Skim path to be processed.
"""
ModulesAndParams = []
for skim in self:
DummyPath = b2.Path()
skim.load_standard_lists(DummyPath)
# Create a hashable data object to store the information about which
# standard lists have been added to the path.
ModulesAndParams.extend(tuple([
(
module.type(),
tuple(sorted(
(param.name, _hashable_list(param.values) if isinstance(param.values, list) else param.values)
for param in module.available_params()
if param.values != param.default
)),
)
for module in DummyPath.modules()
]))
# Take this data structure and convert it to a dict. This removes any duplicate entries.
ModulesAndParams = dict.fromkeys(ModulesAndParams)
# Add the (now unique) module+param combinations to the main path
for module, params in ModulesAndParams:
path.add_module(module, **dict(params))
[docs]
def additional_setup(self, path):
"""Run the `BaseSkim.additional_setup` function of each skim.
Parameters:
path (basf2.Path): Skim path to be processed.
"""
for skim in self:
skim.additional_setup(path)
[docs]
def build_lists(self, path):
"""Run the `BaseSkim.build_lists` function of each skim.
Parameters:
path (basf2.Path): Skim path to be processed.
"""
for skim in self:
skim.SkimLists = skim.build_lists(skim._ConditionalPath or path)
[docs]
def output_udst(self, path):
"""Run the `BaseSkim.output_udst` function of each skim.
Parameters:
path (basf2.Path): Skim path to be processed.
"""
for skim in self:
if skim._udstOutput:
skim.output_udst(skim._ConditionalPath or path)
[docs]
def output_mdst_if_any_flag_passes(self, *, path, **kwargs):
"""
Add MDST output to the path if the event passes any of the skim flags.
EventExtraInfo is included in the MDST output so that the flags are available in
the output.
The ``CombinedSkimName`` parameter in the `CombinedSkim` initialisation is used
for the output filename if ``filename`` is not included in kwargs.
Parameters:
path (basf2.Path): Skim path to be processed.
**kwargs: Passed on to `mdst.add_mdst_output`.
"""
from mdst import add_mdst_output
if not self._mdstOutput:
return
sum_flags = " + ".join(f"eventExtraInfo({f})" for f in self.flags)
variable = f"formula({sum_flags})"
passes_flag_path = b2.Path()
passes_flag = path.add_module("VariableToReturnValue", variable=variable)
passes_flag.if_value(">0", passes_flag_path, b2.AfterConditionPath.CONTINUE)
filename = kwargs.get("filename", kwargs.get("OutputFileName", self.code))
if filename is None:
filename = self.code
if not filename.endswith(".mdst.root"):
filename += ".mdst.root"
kwargs["filename"] = filename
if "OutputFileName" in kwargs.keys():
del kwargs["OutputFileName"]
kwargs.setdefault("dataDescription", {})
# If the combinedSkim is not in the registry getting the code will throw a LookupError.
# There is no requirement that a combinedSkim with single MDST output is
# registered so set the skimDecayMode to ``None`` if no code is defined.
try:
skim_code = self.code
except LookupError:
skim_code = None
kwargs["dataDescription"].setdefault("skimDecayMode", skim_code)
try:
kwargs["additionalBranches"] += ["EventExtraInfo"]
except KeyError:
kwargs["additionalBranches"] = ["EventExtraInfo"]
add_mdst_output(path=passes_flag_path, **kwargs)
[docs]
def apply_hlt_hadron_cut_if_required(self, path):
"""Run the `BaseSkim.apply_hlt_hadron_cut_if_required` function for each skim.
Parameters:
path (basf2.Path): Skim path to be processed.
"""
for skim in self:
skim.apply_hlt_hadron_cut_if_required(skim._ConditionalPath or path)
[docs]
def set_skim_logging(self):
"""Run `BaseSkim.set_skim_logging` for each skim."""
for skim in self:
skim.set_skim_logging()
@property
def TestFiles(self):
return list({f for skim in self for f in skim.TestFiles})
@property
def flags(self):
"""
List of flags for each skim in combined skim.
"""
return [skim.flag for skim in self]
@property
def flag(self):
"""
Event-level variable indicating whether an event passes the combinedSkim or not.
"""
return f"passes_{self}"
[docs]
def initialise_skim_flag(self, path):
"""
Add the module `skim.utils.flags.InitialiseSkimFlag` to the path, to
initialise flags for each skim.
"""
path.add_module(InitialiseSkimFlag(*self))
[docs]
def update_skim_flag(self, path):
"""
Add the module `skim.utils.flags.UpdateSkimFlag` to the conditional path
of each skims.
"""
for skim in self:
skim.postskim_path.add_module(UpdateSkimFlag(skim))
@property
def produce_on_tau_samples(self):
"""
Corresponding value of this attribute for each individual skim.
A warning is issued if the individual skims in combined skim contain a mix of
True and False for this property.
"""
produce_on_tau = [skim.produce_on_tau_samples for skim in self]
if all(produce_on_tau):
return True
elif all(not TauBool for TauBool in produce_on_tau):
return False
else:
warnings.warn(
(
"The individual skims in the combined skim contain a mix of True and "
"False for the attribute `produce_on_tau_samples`.\n The default in "
"this case is to allow the combined skim to be produced on tau samples.\n"
" Skims included in the problematic combined skim: "
f"{', '.join(skim.name for skim in self)}"
),
RuntimeWarning,
)
return True
[docs]
def merge_data_structures(self):
"""Read the values of `BaseSkim.MergeDataStructures` and merge data structures
accordingly.
For example, if ``MergeDataStructures`` has the value ``{"FEIChannelArgs":
_merge_boolean_dicts.__func__}``, then ``_merge_boolean_dicts`` is run on all
input skims with the attribute ``FEIChannelArgs``, and the value of
``FEIChannelArgs`` for that skim is set to the result.
In the FEI skims, this is used to merge configs which are passed to a cached
function, thus allowing us to apply the FEI once with all the required particles
available.
"""
for iSkim, skim in enumerate(self.Skims):
for attribute, MergingFunction in skim.MergeDataStructures.items():
SkimsWithAttribute = [skim for skim in self if hasattr(skim, attribute)]
setattr(
self.Skims[iSkim],
attribute,
MergingFunction(*[getattr(skim, attribute) for skim in SkimsWithAttribute])
)
def _check_duplicate_list_names(self):
"""Check for duplicate particle list names.
.. Note::
Skims cannot be relied on to define their particle list names in advance, so
this function can only be run after `build_lists` is run.
"""
ParticleListLists = [skim.SkimLists for skim in self]
ParticleLists = [lst for L in ParticleListLists for lst in L]
DuplicatedParticleLists = {
ParticleList
for ParticleList in ParticleLists
if ParticleLists.count(ParticleList) > 1
}
if DuplicatedParticleLists:
raise ValueError(
f"Non-unique output particle list names in combined skim! "
f"{', '.join(DuplicatedParticleLists)}"
)