#!/usr/bin/env python3
# =============================================================================
# @file phasespace.py
# @author Albert Puig (albert.puig@cern.ch)
# @date 25.02.2019
# =============================================================================
"""Implementation of the Raubold and Lynch method to generate n-body events.
The code is based on the GENBOD function (W515 from CERNLIB), documented in:
F. James, Monte Carlo Phase Space, CERN 68-15 (1968)
"""
from __future__ import annotations
import inspect
import warnings
from collections.abc import Callable
from math import pi
import tensorflow as tf
import tensorflow.experimental.numpy as tnp
from . import kinematics as kin
from .backend import function, function_jit_fixedshape
from .random import SeedLike, get_rng
RELAX_SHAPES = False
[docs]def process_list_to_tensor(lst):
"""Convert a list to a tensor.
The list is converted to a tensor and transposed to get the proper shape.
Note:
If `lst` is a tensor, nothing is done to it other than convert it to `tf.float64`.
Arguments:
lst (list): List to convert.
Return:
~`tf.Tensor`
"""
if isinstance(lst, list):
lst = tnp.transpose(tnp.asarray(lst, dtype=tnp.float64))
return tnp.asarray(lst, dtype=tnp.float64)
[docs]@function
def pdk(a, b, c):
"""Calculate the PDK (2-body phase space) function.
Based on Eq. (9.17) in CERN 68-15 (1968).
Arguments:
a (~`tf.Tensor`): :math:`M_{i+1}` in Eq. (9.17).
b (~`tf.Tensor`): :math:`M_{i}` in Eq. (9.17).
c (~`tf.Tensor`): :math:`m_{i+1}` in Eq. (9.17).
Return:
~`tf.Tensor`
"""
x = (a - b - c) * (a + b + c) * (a - b + c) * (a + b - c)
return tnp.sqrt(x) / (tnp.asarray(2.0, dtype=tnp.float64) * a)
[docs]class GenParticle:
"""Representation of a particle.
Instances of this class can be combined with each other to build decay chains,
which can then be used to generate phase space events through the `generate`
or `generate_tensor` method.
A `GenParticle` must have
+ a `name`, which is ensured not to clash with any others in
the decay chain.
+ a `mass`, which can be either a number or a function to generate it according to
a certain distribution. The returned ~`tf.Tensor` needs to have shape (nevents,).
In this case, the particle is not considered as having a
fixed mass and the `has_fixed_mass` method will return False.
It may also have:
+ Children, ie, decay products, which are also `GenParticle` instances.
Arguments:
name (str): Name of the particle.
mass (float, ~`tf.Tensor`, callable): Mass of the particle. If it's a float, it get
converted to a `tf.constant`.
"""
def __init__(self, name: str, mass: Callable | int | float) -> None: # noqa
self.name = name
self.children = []
self._mass_val = mass
if not callable(mass) and not isinstance(mass, tf.Variable):
mass = tnp.asarray(mass, dtype=tnp.float64)
else:
mass = tf.function(
mass, autograph=False, experimental_relax_shapes=RELAX_SHAPES
)
self._mass = mass
self._generate_called = False # not yet called, children can be set
def __repr__(self):
return "<phasespace.GenParticle: name='{}' mass={} children=[{}]>".format(
self.name,
f"{self._mass_val:.2f}" if self.has_fixed_mass else "variable",
", ".join(child.name for child in self.children),
)
def _do_names_clash(self, particles):
def get_list_of_names(part):
output = [part.name]
for child in part.children:
output.extend(get_list_of_names(child))
return output
names_to_check = [self.name]
for part in particles:
names_to_check.extend(get_list_of_names(part))
# Find top
dup_names = {name for name in names_to_check if names_to_check.count(name) > 1}
if dup_names:
return dup_names
return None
[docs] @function
def get_mass(
self,
min_mass: tf.Tensor = None,
max_mass: tf.Tensor = None,
n_events: tf.Tensor | tf.Variable = None,
seed: SeedLike = None,
) -> tf.Tensor:
"""Get the particle mass.
If the particle is resonant, the mass function will be called with the
`min_mass`, `max_mass`, `n_events` and optionally a `seed` parameter.
Arguments:
min_mass (tensor): Lower mass range. Defaults to None, which
is only valid in the case of fixed mass.
max_mass (tensor): Upper mass range. Defaults to None, which
is only valid in the case of fixed mass.
n_events (): Number of events to produce. Has to be specified if the particle is resonant.
seed (`SeedLike`): The seed can be a number or a `tf.random.Generator` that are used
as a seed to create a random number generator inside the function or directly as
the random number generator instance, respectively.
Return:
~`tf.Tensor`: Mass of the particles, either a scalar or shape (nevents,)
Raise:
ValueError: If the mass is requested and has not been set.
"""
if self.has_fixed_mass:
mass = self._mass
else:
seed = get_rng(seed)
min_mass = tnp.reshape(min_mass, (n_events,))
max_mass = tnp.reshape(max_mass, (n_events,))
signature = inspect.signature(self._mass)
if "seed" in signature.parameters:
mass = self._mass(min_mass, max_mass, n_events, seed=seed)
else:
mass = self._mass(min_mass, max_mass, n_events)
return mass
@property
def has_fixed_mass(self):
"""bool: Is the mass a callable function?"""
return not callable(self._mass)
[docs] def set_children(self, *children):
"""Assign children.
Arguments:
children (GenParticle): Two or more children to assign to the current particle.
Return:
self
Raise:
ValueError: If there is an inconsistency in the parent/children relationship, ie,
if children were already set, if their parent was or if less than two children were given.
KeyError: If there is a particle name clash.
RuntimeError: If `generate` was already called before.
"""
# self._set_cache_validity(False)
if self._generate_called:
raise RuntimeError(
"Cannot set children after the first call to `generate`."
)
if self.children:
raise ValueError("Children already set!")
if len(children) <= 1:
raise ValueError(
f"Have to set at least 2 children, not {len(children)} for a particle to decay"
)
# Check name clashes
name_clash = self._do_names_clash(children)
if name_clash:
raise KeyError(f"Particle name {name_clash} already used")
self.children = children
return self
@property
def has_children(self):
"""bool: Does the particle have children?"""
return bool(self.children)
@property
def has_grandchildren(self):
"""bool: Does the particle have grandchildren?"""
if not self.children:
return False
return any(child.has_children for child in self.children)
@staticmethod
def _preprocess(momentum, n_events):
"""Preprocess momentum input and determine number of events to generate.
Both `momentum` and `n_events` are converted to tensors if they
are not already.
Arguments:
`momentum`: Momentum vector, of shape (4, x), where x is optional.
`n_events`: Number of events to generate. If `None`, the number of events
to generate is calculated from the shape of `momentum`.
Return:
tuple: Processed `momentum` and `n_events`.
Raise:
tf.errors.InvalidArgumentError: If the number of events deduced from the
shape of `momentum` is inconsistent with `n_events`.
"""
momentum = process_list_to_tensor(momentum)
# Check sanity of inputs
if len(momentum.shape) not in (1, 2):
raise ValueError(f"Bad shape for momentum -> {list(momentum.shape)}")
# Check compatibility of inputs
if len(momentum.shape) == 2:
if n_events is not None:
momentum_shape = momentum.shape[0]
if momentum_shape is None:
momentum_shape = tf.shape(momentum)[0]
momentum_shape = tnp.asarray(momentum_shape, tnp.int64)
else:
momentum_shape = tnp.asarray(momentum_shape, dtype=tnp.int64)
tf.assert_equal(
n_events,
momentum_shape,
message="Conflicting inputs -> momentum_shape and n_events",
)
if n_events is None:
if len(momentum.shape) == 2:
n_events = momentum.shape[0]
if n_events is None: # dynamic shape
n_events = tf.shape(momentum)[0]
n_events = tnp.asarray(n_events, dtype=tnp.int64)
else:
n_events = tnp.asarray(1, dtype=tnp.int64)
n_events = tnp.asarray(n_events, dtype=tnp.int64)
# Now preparation of tensors
if len(momentum.shape) == 1:
momentum = tnp.expand_dims(momentum, axis=0)
return momentum, n_events
@staticmethod
@function_jit_fixedshape
def _get_w_max(available_mass, masses):
emmax = available_mass + tnp.take(masses, indices=[0], axis=1)
emmin = tnp.zeros_like(emmax, dtype=tnp.float64)
w_max = tnp.ones_like(emmax, dtype=tnp.float64)
for i in range(1, masses.shape[1]):
emmin += tnp.take(masses, [i - 1], axis=1)
emmax += tnp.take(masses, [i], axis=1)
w_max *= pdk(emmax, emmin, tnp.take(masses, [i], axis=1))
return w_max
def _generate(self, momentum, n_events, rng):
"""Generate a n-body decay according to the Raubold and Lynch method.
The number and mass of the children particles are taken from self.children.
Note:
This method generates the same results as the GENBOD routine.
Arguments:
momentum (tensor): Momentum of the parent particle. All generated particles
will be boosted to that momentum.
n_events (int): Number of events to generate.
Return:
tuple: Result of the generation (per-event weights, maximum weights, output particles
and their output masses).
"""
self._generate_called = True
if not self.children:
raise ValueError("No children have been configured")
p_top, n_events = self._preprocess(momentum, n_events)
top_mass = tnp.broadcast_to(kin.mass(p_top), (n_events, 1))
n_particles = len(self.children)
# Prepare masses
def recurse_stable(part):
output_mass = 0
for child in part.children:
if child.has_fixed_mass:
output_mass += child.get_mass()
else:
output_mass += recurse_stable(child)
return output_mass
mass_from_stable = tnp.broadcast_to(
tnp.sum(
[child.get_mass() for child in self.children if child.has_fixed_mass],
axis=0,
),
(n_events, 1),
)
max_mass = top_mass - mass_from_stable
masses = []
for child in self.children:
if child.has_fixed_mass:
masses.append(tnp.broadcast_to(child.get_mass(), (n_events, 1)))
else:
# Recurse that particle to know the minimum mass we need to generate
min_mass = tnp.broadcast_to(recurse_stable(child), (n_events, 1))
mass = child.get_mass(min_mass, max_mass, n_events)
mass = tnp.reshape(mass, (n_events, 1))
max_mass -= mass
masses.append(mass)
masses = tnp.concatenate(masses, axis=-1)
# if len(masses.shape) == 1:
# masses = tnp.expand_dims(masses, axis=0)
available_mass = top_mass - tnp.sum(masses, axis=1, keepdims=True)
tf.debugging.assert_greater_equal(
available_mass,
tnp.zeros_like(available_mass, dtype=tnp.float64),
message="Forbidden decay",
name="mass_check",
) # Calculate the max weight, initial beta, etc
w_max = self._get_w_max(available_mass, masses)
p_top_boost = kin.boost_components(p_top)
# Start the generation
random_numbers = rng.uniform((n_events, n_particles - 2), dtype=tnp.float64)
random = tnp.concatenate(
[
tnp.zeros((n_events, 1), dtype=tnp.float64),
tnp.sort(random_numbers, axis=1),
tnp.ones((n_events, 1), dtype=tnp.float64),
],
axis=1,
)
if random.shape[1] is None:
random.set_shape((None, n_particles))
# random = tnp.expand_dims(random, axis=-1)
sum_ = tnp.zeros((n_events, 1), dtype=tnp.float64)
inv_masses = []
# TODO(Mayou36): rewrite with cumsum?
for i in range(n_particles):
sum_ += tnp.take(masses, [i], axis=1)
inv_masses.append(tnp.take(random, [i], axis=1) * available_mass + sum_)
generated_particles, weights = self._generate_part2(
inv_masses, masses, n_events, n_particles, rng=rng
)
# Final boost of all particles
generated_particles = [
kin.lorentz_boost(part, p_top_boost) for part in generated_particles
]
return (
tnp.reshape(weights, (n_events,)),
tnp.reshape(w_max, (n_events,)),
generated_particles,
masses,
)
@staticmethod
@function
def _generate_part2(inv_masses, masses, n_events, n_particles, rng):
pds = []
# Calculate weights of the events
for i in range(n_particles - 1):
pds.append(
pdk(
inv_masses[i + 1],
inv_masses[i],
tnp.take(masses, [i + 1], axis=1),
)
)
weights = tnp.prod(pds, axis=0)
zero_component = tnp.zeros_like(pds[0], dtype=tnp.float64)
generated_particles = [
tnp.concatenate(
[
zero_component,
pds[0],
zero_component,
tnp.sqrt(
tnp.square(pds[0]) + tnp.square(tnp.take(masses, [0], axis=1))
),
],
axis=1,
)
]
part_num = 1
while True:
generated_particles.append(
tnp.concatenate(
[
zero_component,
-pds[part_num - 1],
zero_component,
tnp.sqrt(
tnp.square(pds[part_num - 1])
+ tnp.square(tnp.take(masses, [part_num], axis=1))
),
],
axis=1,
)
)
cos_z = tnp.asarray(2.0, dtype=tnp.float64) * rng.uniform(
(n_events, 1), dtype=tnp.float64
) - tnp.asarray(1.0, dtype=tnp.float64)
sin_z = tnp.sqrt(tnp.asarray(1.0, dtype=tnp.float64) - cos_z * cos_z)
ang_y = (
tnp.asarray(2.0, dtype=tnp.float64)
* tnp.asarray(pi, dtype=tnp.float64)
* rng.uniform((n_events, 1), dtype=tnp.float64)
)
cos_y = tnp.cos(ang_y)
sin_y = tnp.sin(ang_y)
# Do the rotations
for j in range(part_num + 1):
px = kin.x_component(generated_particles[j])
py = kin.y_component(generated_particles[j])
# Rotate about z
# TODO(Mayou36): only list? will be overwritten below anyway, but can `*_component` handle it?
generated_particles[j] = tnp.concatenate(
[
cos_z * px - sin_z * py,
sin_z * px + cos_z * py,
kin.z_component(generated_particles[j]),
kin.time_component(generated_particles[j]),
],
axis=1,
)
# Rotate about y
px = kin.x_component(generated_particles[j])
pz = kin.z_component(generated_particles[j])
generated_particles[j] = tnp.concatenate(
[
cos_y * px - sin_y * pz,
kin.y_component(generated_particles[j]),
sin_y * px + cos_y * pz,
kin.time_component(generated_particles[j]),
],
axis=1,
)
if part_num == (n_particles - 1):
break
betas = pds[part_num] / tnp.sqrt(
tnp.square(pds[part_num]) + tnp.square(inv_masses[part_num])
)
generated_particles = [
kin.lorentz_boost(
part,
tnp.concatenate([zero_component, betas, zero_component], axis=1),
)
for part in generated_particles
]
part_num += 1
return generated_particles, weights
@function
def _recursive_generate(
self,
n_events,
boost_to=None,
recalculate_max_weights=False,
rng: SeedLike = None,
):
"""Recursively generate normalized n-body phase space as tensorflow tensors.
Events are generated in the rest frame of the particle, unless `boost_to` is given.
Note:
In this method, the event weights are returned normalized to their maximum.
Arguments:
n_events (int): Number of events to generate.
boost_to (tensor, optional): Momentum vector of shape (x, 4), where x is optional, to where
the resulting events will be boosted. If not specified, events are generated
in the rest frame of the particle.
recalculate_max_weights (bool, optional): Recalculate the maximum weight of the event
using all the particles of the tree? This is necessary for the top particle of a decay,
otherwise the maximum weight calculation is going to be wrong (particles from subdecays
would not be taken into account). Defaults to False.
seed (`SeedLike`): The seed can be a number or a `tf.random.Generator` that are used
as a seed to create a random number generator inside the function or directly as
the random number generator instance, respectively.
Return:
tuple: Result of the generation (per-event weights, maximum weights, output particles
and their output masses).
Raise:
tf.errors.InvalidArgumentError: If the the decay is kinematically forbidden.
ValueError: If `n_events` and the size of `boost_to` don't match.
See `GenParticle.generate_unnormalized`.
"""
if boost_to is not None:
momentum = boost_to
else:
if self.has_fixed_mass:
momentum = tnp.broadcast_to(
tnp.stack((0.0, 0.0, 0.0, self.get_mass()), axis=-1), (n_events, 4)
)
else:
raise ValueError("Cannot use resonance as top particle")
weights, weights_max, parts, children_masses = self._generate(
momentum, n_events, rng=rng
)
output_particles = {
child.name: parts[child_num]
for child_num, child in enumerate(self.children)
}
output_masses = {
child.name: tnp.take(children_masses, [child_num], axis=1)
for child_num, child in enumerate(self.children)
}
for child_num, child in enumerate(self.children):
if child.has_children:
(
child_weights,
_,
child_gen_particles,
child_masses,
) = child._recursive_generate(
n_events=n_events,
boost_to=parts[child_num],
recalculate_max_weights=False,
rng=rng,
)
weights *= child_weights
output_particles.update(child_gen_particles)
output_masses.update(child_masses)
if recalculate_max_weights:
def build_mass_tree(particle, leaf):
if particle.has_children:
leaf[particle.name] = {}
for child in particle.children:
build_mass_tree(child, leaf[particle.name])
else:
leaf[particle.name] = output_masses[particle.name]
def get_flattened_values(dict_):
output = []
for val in dict_.values():
if isinstance(val, dict):
output.extend(get_flattened_values(val))
else:
output.append(val)
return output
def recurse_w_max(parent_mass, current_mass_tree):
available_mass = parent_mass - sum(
get_flattened_values(current_mass_tree)
)
masses = []
w_max = tnp.ones_like(available_mass)
for child, child_mass in current_mass_tree.items():
if isinstance(child_mass, dict):
w_max *= recurse_w_max(
parent_mass
- sum(
get_flattened_values(
{
ch_it: ch_m_it
for ch_it, ch_m_it in current_mass_tree.items()
if ch_it != child
}
)
),
child_mass,
)
masses.append(sum(get_flattened_values(child_mass)))
else:
masses.append(child_mass)
masses = tnp.concatenate(masses, axis=1)
w_max *= self._get_w_max(available_mass, masses)
return w_max
mass_tree = {}
build_mass_tree(self, mass_tree)
momentum = process_list_to_tensor(momentum)
if len(momentum.shape) == 1:
momentum = tnp.expand_dims(momentum, axis=-1)
weights_max = tnp.reshape(
recurse_w_max(kin.mass(momentum), mass_tree[self.name]), (n_events,)
)
return weights, weights_max, output_particles, output_masses
[docs] def generate(
self,
n_events: int | tf.Tensor | tf.Variable,
boost_to: tf.Tensor | None = None,
normalize_weights: bool = True,
seed: SeedLike = None,
) -> tuple[tf.Tensor, dict[str, tf.Tensor]]:
"""Generate normalized n-body phase space as tensorflow tensors.
Any TensorFlow tensor can always be converted to a numpy array with the method `numpy()`.
Events are generated in the rest frame of the particle, unless `boost_to` is given.
Note:
In this method, the event weights are returned normalized to their maximum.
Arguments:
n_events (int): Number of events to generate.
boost_to (optional): Momentum vector of shape (x, 4), where x is optional, to where
the resulting events will be boosted. If not specified, events are generated
in the rest frame of the particle.
normalize_weights (bool, optional): Normalize the event weight to its max?
seed (`SeedLike`): The seed can be a number or a `tf.random.Generator` that are used
as a seed to create a random number generator inside the function or directly as
the random number generator instance, respectively.
Return:
tuple: Result of the generation, which varies with the value of `normalize_weights`:
+ If True, the tuple elements are the normalized event weights as a tensor of shape
(n_events, ), and the momenta generated particles as a dictionary of tensors of shape
(4, n_events) with particle names as keys.
+ If False, the tuple weights are the unnormalized event weights as a tensor of shape
(n_events, ), the maximum per-event weights as a tensor of shape (n_events, ) and the
momenta generated particles as a dictionary of tensors of shape (4, n_events) with particle
names as keys.
Raise:
tf.errors.InvalidArgumentError: If the the decay is kinematically forbidden.
ValueError: If `n_events` and the size of `boost_to` don't match.
See `GenParticle.generate_unnormalized`.
"""
rng = get_rng(seed)
if boost_to is not None:
message = (
f"The number of events requested ({n_events}) doesn't match the boost_to input size "
f"of {boost_to.shape}"
)
tf.assert_equal(tf.shape(boost_to)[0], tf.shape(n_events), message=message)
if not isinstance(n_events, tf.Variable):
n_events = tnp.asarray(n_events, dtype=tnp.int64)
weights, weights_max, parts, _ = self._recursive_generate(
n_events=n_events,
boost_to=boost_to,
recalculate_max_weights=self.has_grandchildren,
rng=rng,
)
return (
(weights / weights_max, parts)
if normalize_weights
else (weights, weights_max, parts)
)
[docs] def generate_tensor(
self,
n_events: int,
boost_to=None,
normalize_weights: bool = True,
):
"""Generate normalized n-body phase space as numpy arrays.
Events are generated in the rest frame of the particle, unless `boost_to` is given.
Note:
In this method, the event weights are returned normalized to their maximum.
Arguments:
n_events (int): Number of events to generate.
boost_to (optional): Momentum vector of shape (x, 4), where x is optional, to where
the resulting events will be boosted. If not specified, events are generated
in the rest frame of the particle.
normalize_weights (bool, optional): Normalize the event weight to its max
Return:
tuple: Result of the generation, which varies with the value of `normalize_weights`:
+ If True, the tuple elements are the normalized event weights as an array of shape
(n_events, ), and the momenta generated particles as a dictionary of arrays of shape
(4, n_events) with particle names as keys.
+ If False, the tuple weights are the unnormalized event weights as an array of shape
(n_events, ), the maximum per-event weights as an array of shape (n_events, ) and the
momenta generated particles as a dictionary of arrays of shape (4, n_events) with particle
names as keys.
Raise:
tf.errors.InvalidArgumentError: If the the decay is kinematically forbidden.
ValueError: If `n_events` and the size of `boost_to` don't match.
See `GenParticle.generate_unnormalized`.
"""
# Run generation
warnings.warn(
"This function is depreceated. Use `generate` which does not return a Tensor as well."
)
generate_tf = self.generate(n_events, boost_to, normalize_weights)
# self._cache = generate_tf
# self._set_cache_validity(True, propagate=True)
return generate_tf
# legacy class to warn user about name change
[docs]class Particle:
"""Deprecated Particle class.
Renamed to GenParticle.
"""
def __init__(self):
raise NameError(
"'Particle' has been renamed to 'GenParticle'. Please update your code accordingly."
"For more information, see: https://github.com/zfit/phasespace/issues/22"
)
[docs]def nbody_decay(mass_top: float, masses: list, top_name: str = "", names: list = None):
"""Shortcut to build an n-body decay of a GenParticle.
If the particle names are not given, the top particle is called 'top' and the
children 'p_{i}', where i corresponds to their position in the `masses` sequence.
Arguments:
mass_top (tensor, list): Mass of the top particle. Can be a list of 4-vectors.
masses (list): Masses of the child particles.
name_top (str, optional): Name of the top particle. If not given, the top particle is
named top.
names (list, optional): Names of the child particles. If not given, they are build as
'p_{i}', where i is given by their ordering in the `masses` list.
Return:
`GenParticle`: Particle decay.
Raise:
ValueError: If the length of `masses` and `names` doesn't match.
"""
if not top_name:
top_name = "top"
if not names:
names = [f"p_{num}" for num in range(len(masses))]
if len(names) != len(masses):
raise ValueError("Mismatch in length between children masses and their names.")
return GenParticle(top_name, mass_top).set_children(
*(GenParticle(names[num], mass=mass) for num, mass in enumerate(masses))
)
[docs]def generate_decay(*args, **kwargs):
"""Deprecated."""
raise NameError(
"'generate_decay' has been removed. A similar behavior can be accomplished with 'nbody_decay'. "
"For more information see https://github.com/zfit/phasespace/issues/22"
)
# EOF