Source code for skim.utils.misc
#!/usr/bin/env python3
##########################################################################
# basf2 (Belle II Analysis Software Framework) #
# Author: The Belle II Collaboration #
# #
# See git log for contributors and copyright holders. #
# This file is licensed under LGPL-3.0, see LICENSE.md. #
##########################################################################
"""
Miscellaneous utility functions for skim experts.
"""
import subprocess
import json
import re
from pathlib import Path
from skim.registry import Registry
[docs]
def get_file_metadata(filename):
    """
    Query ``b2file-metadata-show`` for the metadata of a file.

    Parameters:
        filename (str): Path of the file whose metadata should be read.

    Returns:
        dict: The ``--json`` output of ``b2file-metadata-show``, parsed with `json.loads`.

    Raises:
        FileNotFoundError: Raised if ``filename`` does not exist.
        subprocess.CalledProcessError: Raised if ``b2file-metadata-show`` exits with
            a non-zero status.
    """
    # Fail early with a clear message rather than relying on the tool's own error.
    path = Path(filename)
    if not path.exists():
        raise FileNotFoundError(f"Could not find file {filename}")

    command = ["b2file-metadata-show", "--json", str(filename)]
    completed = subprocess.run(command, stdout=subprocess.PIPE, check=True)
    raw_json = completed.stdout.decode("utf-8")
    return json.loads(raw_json)
[docs]
def get_eventN(filename):
    """
    Count the events stored in a file, as reported by ``b2file-metadata-show``.

    Parameters:
        filename (str): File to get number of events from.

    Returns:
        int: The ``nEvents`` entry of the file metadata, converted to an integer.
    """
    metadata = get_file_metadata(filename)
    event_count = metadata["nEvents"]
    return int(event_count)
[docs]
def resolve_skim_modules(SkimsOrModules, *, LocalModule=None):
    """
    Produce an ordered list of skims, by expanding any Python skim module names into a
    list of skims in that module. Also produce a dict of skims grouped by Python module.

    Parameters:
        SkimsOrModules (list(str)): Names of skims and/or skim modules. A name which
            is neither in ``Registry.names`` nor in ``Registry.modules`` is skipped.
        LocalModule (str): Optional name of a local module (with or without a
            trailing ``.py``). If given, all skims are grouped under this single
            module name instead of under their registered modules.

    Returns:
        tuple(list, dict): The list of skims in the order requested, and a dict
        mapping each module name to the sorted list of its requested skims.

    Raises:
        RuntimeError: Raised if a skim is listed twice.
        ValueError: Raised if ``LocalModule`` is passed and skims are normally expected
            from more than one module.
    """
    skims = []
    for name in SkimsOrModules:
        if name in Registry.names:
            skims.append(name)
        elif name in Registry.modules:
            skims.extend(Registry.get_skims_in_module(name))

    # Single pass duplicate detection (instead of calling list.count for every entry)
    seen = set()
    duplicates = set()
    for skim in skims:
        if skim in seen:
            duplicates.add(skim)
        seen.add(skim)

    if duplicates:
        # Sort the names so that the error message is reproducible
        raise RuntimeError(
            f"Skim{'s'*(len(duplicates)>1)} requested more than once: {', '.join(sorted(duplicates))}"
        )

    # Look up the module of each skim only once
    module_of = {skim: Registry.get_skim_module(skim) for skim in skims}
    modules = sorted(set(module_of.values()))

    if LocalModule:
        if len(modules) > 1:
            raise ValueError(
                f"Local module {LocalModule} specified, but the combined skim expects "
                "skims from more than one module. No steering file written."
            )
        # Remove only a literal ".py" suffix. str.rstrip(".py") would strip any run of
        # trailing ".", "p" and "y" characters (e.g. "happy.py" -> "ha").
        if LocalModule.endswith(".py"):
            local_name = LocalModule[: -len(".py")]
        else:
            local_name = LocalModule
        modules = {local_name: sorted(skims)}
    else:
        modules = {
            module: sorted(skim for skim in skims if module_of[skim] == module)
            for module in modules
        }

    return skims, modules
class _hashable_list(list):
def __hash__(self):
return hash(tuple(self))
def _sphinxify_decay(decay_string):
"""Format the given decay string by using LaTeX commands instead of plain-text.
Output is formatted for use with Sphinx (ReStructured Text).
This is a utility function for autogenerating skim documentation.
Parameters:
decay_string (str): A decay descriptor.
Returns:
sphinxed_string (str): LaTeX version of the decay descriptor.
"""
decay_string = re.sub("^(B.):generic", "\\1_{\\\\text{had}}", decay_string)
decay_string = decay_string.replace(":generic", "")
decay_string = decay_string.replace(":semileptonic", "_{\\text{SL}}")
decay_string = decay_string.replace(":FSP", "_{FSP}")
decay_string = decay_string.replace(":V0", "_{V0}")
decay_string = re.sub("_[0-9]+", "", decay_string)
# Note: these are applied from top to bottom, so if you have
# both B0 and anti-B0, put anti-B0 first.
substitutes = [
("==>", "\\to"),
("->", "\\to"),
("gamma", "\\gamma"),
("p+", "p"),
("anti-p-", "\\bar{p}"),
("pi+", "\\pi^+"),
("pi-", "\\pi^-"),
("pi0", "\\pi^0"),
("K_S0", "K^0_S"),
("K_L0", "K^0_L"),
("mu+", "\\mu^+"),
("mu-", "\\mu^-"),
("tau+", "\\tau^+"),
("tau-", "\\tau^-"),
("nu", "\\nu"),
("K+", "K^+"),
("K-", "K^-"),
("e+", "e^+"),
("e-", "e^-"),
("J/psi", "J/\\psi"),
("anti-Lambda_c-", "\\Lambda^{-}_{c}"),
("anti-Sigma+", "\\overline{\\Sigma}^{+}"),
("anti-Lambda0", "\\overline{\\Lambda}^{0}"),
("anti-D0*", "\\overline{D}^{0*}"),
("anti-D*0", "\\overline{D}^{0*}"),
("anti-D0", "\\overline{D}^0"),
("anti-B0", "\\overline{B}^0"),
("Sigma+", "\\Sigma^{+}"),
("Lambda_c+", "\\Lambda^{+}_{c}"),
("Lambda0", "\\Lambda^{0}"),
("D+", "D^+"),
("D-", "D^-"),
("D0", "D^0"),
("D*+", "D^{+*}"),
("D*-", "D^{-*}"),
("D*0", "D^{0*}"),
("D_s+", "D^+_s"),
("D_s-", "D^-_s"),
("D_s*+", "D^{+*}_s"),
("D_s*-", "D^{-*}_s"),
("B+", "B^+"),
("B-", "B^-"),
("B0", "B^0"),
("B_s0", "B^0_s"),
("K*0", "K^{0*}"),
]
tex_string = decay_string
for (key, value) in substitutes:
tex_string = tex_string.replace(key, value)
return f":math:`{tex_string}`"
[docs]
def fancy_skim_header(SkimClass):
    """Class decorator which builds a summary header for a skim and prepends it to the
    class docstring. Add this just above the definition of a skim.

    The header lists the skim description, name, LFN code, category, authors and
    contacts, and mentions the ``hlt_hadron`` selection if ``ApplyHLTHadronCut`` is set.

    Also ensures the documentation of the template functions like `BaseSkim.build_lists`
    is not repeated in every skim documentation.

    .. code-block:: python

        @fancy_skim_header
        class MySkimName(BaseSkim):
            # docstring here describing your skim, and explaining cuts.

    Parameters:
        SkimClass (type): The skim class to decorate.

    Returns:
        type: The same class, with its docstring modified in place.
    """
    # Separators recognised between names: commas, "and", "&", and newlines
    name_separators = r",\s+and\s+|\s+and\s+|,\s+&\s+|\s+&\s+|,\s+|\s*\n\s*"

    SkimName = SkimClass.__name__
    SkimCode = Registry.encode_skim_name(SkimName)

    # Fall back to placeholders for any metadata which was left empty
    authors = SkimClass.__authors__ or ["(no authors listed)"]
    description = SkimClass.__description__ or "(no description)"
    contact = SkimClass.__contact__ or "(no contact listed)"
    category = SkimClass.__category__ or "(no category listed)"

    if isinstance(authors, str):
        # A single string is split into individual names, and any whitespace left
        # on either side of a name is removed
        authors = [
            re.sub(r"^\s+|\s+$", "", author)
            for author in re.split(name_separators, authors)
        ]

    if isinstance(category, list):
        category = ", ".join(category)

    # Contacts of the form "NAME <EMAIL>" or "NAME (EMAIL)" become mailto links;
    # anything else is kept as given
    contacts = []
    for entry in re.split(name_separators, contact):
        match = re.match("([^<>()`]+) [<(]([^<>()`]+@[^<>()`]+)[>)]", entry)
        if match:
            name, email = match[1], match[2]
            entry = f"`{name} <mailto:{email}>`_"
        contacts.append(entry)

    header = f"""
Note:
* **Skim description**: {description}
* **Skim name**: {SkimName}
* **Skim LFN code**: {SkimCode}
* **Category**: {category}
* **Author{"s"*(len(authors) > 1)}**: {", ".join(authors)}
* **Contact{"s"*(len(contacts) > 1)}**: {", ".join(contacts)}
"""

    if SkimClass.ApplyHLTHadronCut:
        HLTLine = "*This skim includes a selection on the HLT flag* ``hlt_hadron``."
        header = f"{header.rstrip()}\n\n {HLTLine}\n"

    # An empty docstring (or one which was not redefined) is replaced by the header alone
    existing_doc = SkimClass.__doc__
    if existing_doc:
        SkimClass.__doc__ = header + "\n\n" + existing_doc.lstrip("\n")
    else:
        SkimClass.__doc__ = header

    # If documentation of template functions not redefined, make sure BaseSkim docstring is not repeated
    template_functions = (
        "load_standard_lists",
        "build_lists",
        "validation_histograms",
        "additional_setup",
    )
    for function_name in template_functions:
        template = getattr(SkimClass, function_name)
        template.__doc__ = template.__doc__ or ""

    return SkimClass
[docs]
def dry_run_steering_file(SteeringFile):
    """
    Check if the steering file at the given path can be run with the "--dry-run" option.

    Parameters:
        SteeringFile (str, pathlib.Path): Path of the steering file to check.

    Raises:
        RuntimeError: Raised if ``basf2 --dry-run`` exits with a non-zero status. The
            message contains the captured standard output and standard error.
    """
    command = ["basf2", "--dry-run", "-i", "i.root", "-o", "o.root", str(SteeringFile)]
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    if result.returncode == 0:
        return

    # Include everything the script printed, so the failure can be diagnosed
    stdout = result.stdout.decode("utf-8")
    stderr = result.stderr.decode("utf-8")
    raise RuntimeError(
        f"An error occurred while dry-running steering file {SteeringFile}\n"
        f"Script output:\n{stdout}\n{stderr}"
    )