Source code for phasespace.phasespace

#!/usr/bin/env python3
# =============================================================================
# @file   phasespace.py
# @author Albert Puig (albert.puig@cern.ch)
# @date   25.02.2019
# =============================================================================
"""Implementation of the Raubold and Lynch method to generate n-body events.

The code is based on the GENBOD function (W515 from CERNLIB), documented in:

F. James, Monte Carlo Phase Space, CERN 68-15 (1968)
"""

from __future__ import annotations

import functools
import inspect
from collections.abc import Callable
from math import pi
from typing import TYPE_CHECKING, NoReturn

import numpy as np
import tensorflow as tf
import tensorflow.experimental.numpy as tnp

from . import kinematics as kin
from .backend import function, function_jit_fixedshape
from .random import SeedLike, get_rng

if TYPE_CHECKING:
    import vector


RELAX_SHAPES = False


[docs] def process_list_to_tensor(lst): """Convert a list to a tensor. The list is converted to a tensor and transposed to get the proper shape. Notes: If ``lst`` is a tensor, nothing is done to it other than convert it to ``tf.float64``. Args: lst (list): List to convert. Returns: ``~tf.Tensor`` """ if isinstance(lst, list): lst = tnp.transpose(tnp.asarray(lst, dtype=tnp.float64)) return tnp.asarray(lst, dtype=tnp.float64)
[docs] @function def pdk(a, b, c): """Calculate the PDK (2-body phase space) function. Based on Eq. (9.17) in CERN 68-15 (1968). Args: a (``~tf.Tensor``): :math:`M_{i+1}` in Eq. (9.17). b (``~tf.Tensor``): :math:`M_{i}` in Eq. (9.17). c (``~tf.Tensor``): :math:`m_{i+1}` in Eq. (9.17). Returns: ``~tf.Tensor`` """ x = (a - b - c) * (a + b + c) * (a - b + c) * (a + b - c) return tnp.sqrt(x) / (tnp.asarray(2.0, dtype=tnp.float64) * a)
[docs] class GenParticle: """Representation of a particle. Instances of this class can be combined with each other to build decay chains, which can then be used to generate phase space events through the ``generate`` or ``generate_tensor`` method. A ``GenParticle`` must have - a ``name``, which is ensured not to clash with any others in the decay chain. - a ``mass``, which can be either a number or a function to generate it according to a certain distribution. The returned ``~tf.Tensor`` needs to have shape ``(nevents,)``. In this case, the particle is not considered as having a fixed mass and the ``has_fixed_mass`` method will return False. It may also have: - Children, ie, decay products, which are also ``GenParticle`` instances. Args: name (str): Name of the particle. mass (float, array-like, callable): Mass of the particle. If it's a float, it gets converted to array-like. """ def __init__( self, name: str, mass: Callable | int | float | np.typing.ArrayLike ) -> None: # noqa self.name = name self.children = [] self._mass_val = mass if callable(mass): @functools.wraps(mass) def mass_preprocessed(*args, mass=mass, **kwargs): return tnp.atleast_1d( tnp.asarray(mass(*args, **kwargs), dtype=tnp.float64) ) elif not isinstance(mass, tf.Variable): mass_preprocessed = tnp.atleast_1d(tnp.asarray(mass, dtype=tnp.float64)) else: mass_preprocessed = mass self._mass = mass_preprocessed self._generate_called = False # not yet called, children can be set def __repr__(self): return "<phasespace.GenParticle: name='{}' mass={} children=[{}]>".format( self.name, f"{self._mass_val:.2f}" if self.has_fixed_mass else "variable", ", ".join(child.name for child in self.children), ) def _do_names_clash(self, particles): def get_list_of_names(part): output = [part.name] for child in part.children: output.extend(get_list_of_names(child)) return output names_to_check = [self.name] for part in particles: names_to_check.extend(get_list_of_names(part)) # Find top dup_names = {name for name in names_to_check if names_to_check.count(name) > 1} if dup_names: return dup_names return None
[docs] @function def get_mass( self, min_mass: tf.Tensor = None, max_mass: tf.Tensor = None, n_events: tf.Tensor | tf.Variable = None, seed: SeedLike = None, ) -> tf.Tensor: """Get the particle mass. If the particle is resonant, the mass function will be called with the ``min_mass``, ``max_mass``, ``n_events`` and optionally a ``seed`` parameter. Args: min_mass (tensor): Lower mass range. Defaults to None, which is only valid in the case of fixed mass. max_mass (tensor): Upper mass range. Defaults to None, which is only valid in the case of fixed mass. n_events (): Number of events to produce. Has to be specified if the particle is resonant. seed (``SeedLike``): The seed can be a number or a ``tf.random.Generator`` that are used as a seed to create a random number generator inside the function or directly as the random number generator instance, respectively. Returns: ``~tf.Tensor``: Mass of the particles, either a scalar or shape ``(nevents,)`` Raises: ValueError: If the mass is requested and has not been set. """ if self.has_fixed_mass: mass = self._mass else: seed = get_rng(seed) min_mass = tnp.reshape(min_mass, (n_events,)) max_mass = tnp.reshape(max_mass, (n_events,)) signature = inspect.signature(self._mass) if "seed" in signature.parameters: mass = self._mass(min_mass, max_mass, n_events, seed=seed) else: mass = self._mass(min_mass, max_mass, n_events) return mass
@property def has_fixed_mass(self): """bool: Is the mass a callable function?""" return not callable(self._mass)
[docs] def set_children(self, *children): """Assign children. Args: children (GenParticle): Two or more children to assign to the current particle. Returns: self Raises: ValueError: If there is an inconsistency in the parent/children relationship, ie, if children were already set, if their parent was or if less than two children were given. KeyError: If there is a particle name clash. RuntimeError: If ``generate`` was already called before. """ # self._set_cache_validity(False) if self._generate_called: raise RuntimeError( "Cannot set children after the first call to `generate`." ) if self.children: raise ValueError("Children already set!") if len(children) <= 1: raise ValueError( f"Have to set at least 2 children, not {len(children)} for a particle to decay" ) # Check name clashes if name_clash := self._do_names_clash(children): raise KeyError(f"Particle name {name_clash} already used") self.children = children return self
@property def has_children(self): """bool: Does the particle have children?""" return bool(self.children) @property def has_grandchildren(self): """bool: Does the particle have grandchildren?""" if not self.children: return False return any(child.has_children for child in self.children) @staticmethod def _preprocess(momentum, n_events): """Preprocess momentum input and determine number of events to generate. Both ``momentum`` and ``n_events`` are converted to tensors if they are not already. Args: momentum: Momentum vector, of shape ``(4, x)``, where x is optional. n_events: Number of events to generate. If ``None``, the number of events to generate is calculated from the shape of ``momentum``. Returns: tuple: Processed ``momentum`` and ``n_events``. Raises: tf.errors.InvalidArgumentError: If the number of events deduced from the shape of ``momentum`` is inconsistent with ``n_events``. """ momentum = process_list_to_tensor(momentum) # Check sanity of inputs if len(momentum.shape) not in (1, 2): raise ValueError(f"Bad shape for momentum -> {list(momentum.shape)}") # Check compatibility of inputs if len(momentum.shape) == 2: if n_events is not None: momentum_shape = momentum.shape[0] if momentum_shape is None: momentum_shape = tf.shape(momentum)[0] momentum_shape = tnp.asarray(momentum_shape, tnp.int64) else: momentum_shape = tnp.asarray(momentum_shape, dtype=tnp.int64) # tf.assert_equal( # n_events, # momentum_shape, # message="Conflicting inputs -> momentum_shape and n_events", # ) if n_events is None: if len(momentum.shape) == 2: n_events = momentum.shape[0] if n_events is None: # dynamic shape n_events = tf.shape(momentum)[0] n_events = tnp.asarray(n_events, dtype=tnp.int64) else: n_events = tnp.asarray(1, dtype=tnp.int64) n_events = tnp.asarray(n_events, dtype=tnp.int64) # Now preparation of tensors if len(momentum.shape) == 1: momentum = tnp.expand_dims(momentum, axis=0) return momentum, n_events @staticmethod @function_jit_fixedshape def _get_w_max(available_mass, masses): emmax = available_mass + tnp.take(masses, indices=[0], axis=1) emmin = tnp.zeros_like(emmax, dtype=tnp.float64) w_max = tnp.ones_like(emmax, dtype=tnp.float64) for i in range(1, masses.shape[1]): emmin += tnp.take(masses, [i - 1], axis=1) emmax += tnp.take(masses, [i], axis=1) w_max *= pdk(emmax, emmin, tnp.take(masses, [i], axis=1)) return w_max def _generate(self, momentum, n_events, rng): """Generate an n-body decay according to the Raubold and Lynch method. The number and mass of the children particles are taken from self.children. Notes: This method generates the same results as the GENBOD routine. Args: momentum (tensor): Momentum of the parent particle. All generated particles will be boosted to that momentum. n_events (int): Number of events to generate. rng: Random number generator. Returns: tuple: Result of the generation (per-event weights, maximum weights, output particles and their output masses). """ self._generate_called = True if not self.children: raise ValueError("No children have been configured") p_top, n_events = self._preprocess(momentum, n_events) top_mass = tnp.broadcast_to(kin.mass(p_top), (n_events, 1)) n_particles = len(self.children) # Prepare masses def recurse_stable(part): output_mass = 0 for child in part.children: if child.has_fixed_mass: output_mass += child.get_mass() else: output_mass += recurse_stable(child) return output_mass mass_from_stable = tnp.broadcast_to( tnp.sum( [child.get_mass() for child in self.children if child.has_fixed_mass], axis=0, ), (n_events, 1), ) max_mass = top_mass - mass_from_stable masses = [] for child in self.children: if child.has_fixed_mass: masses.append(tnp.broadcast_to(child.get_mass(), (n_events, 1))) else: # Recurse that particle to know the minimum mass we need to generate min_mass = tnp.broadcast_to(recurse_stable(child), (n_events, 1)) mass = child.get_mass(min_mass, max_mass, n_events) mass = tnp.reshape(mass, (n_events, 1)) max_mass -= mass masses.append(mass) masses = tnp.concatenate(masses, axis=-1) # if len(masses.shape) == 1: # masses = tnp.expand_dims(masses, axis=0) available_mass = top_mass - tnp.sum(masses, axis=1, keepdims=True) tf.debugging.assert_greater_equal( available_mass, tnp.zeros_like(available_mass, dtype=tnp.float64), message="Forbidden decay", name="mass_check", ) # Calculate the max weight, initial beta, etc w_max = self._get_w_max(available_mass, masses) p_top_boost = kin.boost_components(p_top) # Start the generation random_numbers = rng.uniform((n_events, n_particles - 2), dtype=tnp.float64) random = tnp.concatenate( [ tnp.zeros((n_events, 1), dtype=tnp.float64), tnp.sort(random_numbers, axis=1), tnp.ones((n_events, 1), dtype=tnp.float64), ], axis=1, ) if random.shape[1] is None: random.set_shape((None, n_particles)) # random = tnp.expand_dims(random, axis=-1) sum_ = tnp.zeros((n_events, 1), dtype=tnp.float64) inv_masses = [] # TODO(Mayou36): rewrite with cumsum? for i in range(n_particles): sum_ += tnp.take(masses, [i], axis=1) inv_masses.append(tnp.take(random, [i], axis=1) * available_mass + sum_) generated_particles, weights = self._generate_part2( inv_masses, masses, n_events, n_particles, rng=rng ) # Final boost of all particles generated_particles = [ kin.lorentz_boost(part, p_top_boost) for part in generated_particles ] return ( tnp.reshape(weights, (n_events,)), tnp.reshape(w_max, (n_events,)), generated_particles, masses, ) @staticmethod @function def _generate_part2(inv_masses, masses, n_events, n_particles, rng): pds = [] # Calculate weights of the events for i in range(n_particles - 1): pds.append( pdk( inv_masses[i + 1], inv_masses[i], tnp.take(masses, [i + 1], axis=1), ) ) weights = tnp.prod(pds, axis=0) zero_component = tnp.zeros_like(pds[0], dtype=tnp.float64) generated_particles = [ tnp.concatenate( [ zero_component, pds[0], zero_component, tnp.sqrt( tnp.square(pds[0]) + tnp.square(tnp.take(masses, [0], axis=1)) ), ], axis=1, ) ] part_num = 1 while True: generated_particles.append( tnp.concatenate( [ zero_component, -pds[part_num - 1], zero_component, tnp.sqrt( tnp.square(pds[part_num - 1]) + tnp.square(tnp.take(masses, [part_num], axis=1)) ), ], axis=1, ) ) cos_z = tnp.asarray(2.0, dtype=tnp.float64) * rng.uniform( (n_events, 1), dtype=tnp.float64 ) - tnp.asarray(1.0, dtype=tnp.float64) sin_z = tnp.sqrt(tnp.asarray(1.0, dtype=tnp.float64) - cos_z * cos_z) ang_y = ( tnp.asarray(2.0, dtype=tnp.float64) * tnp.asarray(pi, dtype=tnp.float64) * rng.uniform((n_events, 1), dtype=tnp.float64) ) cos_y = tnp.cos(ang_y) sin_y = tnp.sin(ang_y) # Do the rotations for j in range(part_num + 1): px = kin.x_component(generated_particles[j]) py = kin.y_component(generated_particles[j]) # Rotate about z # TODO(Mayou36): only list? will be overwritten below anyway, but can `*_component` handle it? generated_particles[j] = tnp.concatenate( [ cos_z * px - sin_z * py, sin_z * px + cos_z * py, kin.z_component(generated_particles[j]), kin.time_component(generated_particles[j]), ], axis=1, ) # Rotate about y px = kin.x_component(generated_particles[j]) pz = kin.z_component(generated_particles[j]) generated_particles[j] = tnp.concatenate( [ cos_y * px - sin_y * pz, kin.y_component(generated_particles[j]), sin_y * px + cos_y * pz, kin.time_component(generated_particles[j]), ], axis=1, ) if part_num == (n_particles - 1): break betas = pds[part_num] / tnp.sqrt( tnp.square(pds[part_num]) + tnp.square(inv_masses[part_num]) ) generated_particles = [ kin.lorentz_boost( part, tnp.concatenate([zero_component, betas, zero_component], axis=1), ) for part in generated_particles ] part_num += 1 return generated_particles, weights @function def _recursive_generate( self, n_events, boost_to=None, recalculate_max_weights=False, rng: SeedLike = None, ): """Recursively generate normalized n-body phase space as tensorflow tensors. Events are generated in the rest frame of the particle, unless ``boost_to`` is given. Notes: In this method, the event weights are returned normalized to their maximum. Args: n_events (int): Number of events to generate. boost_to (tensor, optional): Momentum vector of shape ``(x, 4)``, where x is optional, to where the resulting events will be boosted. If not specified, events are generated in the rest frame of the particle. recalculate_max_weights (bool, optional): Recalculate the maximum weight of the event using all the particles of the tree? This is necessary for the top particle of a decay, otherwise the maximum weight calculation is going to be wrong (particles from subdecays would not be taken into account). Defaults to False. rng (``SeedLike``): The seed can be a number or a ``tf.random.Generator`` that are used as a seed to create a random number generator inside the function or directly as the random number generator instance, respectively. Returns: tuple: Result of the generation (per-event weights, maximum weights, output particles and their output masses). Raises: tf.errors.InvalidArgumentError: If the decay is kinematically forbidden. ValueError: If ``n_events`` and the size of ``boost_to`` don't match. See ``GenParticle.generate_unnormalized``. """ if boost_to is not None: momentum = boost_to else: if self.has_fixed_mass: momentum = tnp.broadcast_to( tnp.stack((0.0, 0.0, 0.0, self.get_mass()[0]), axis=-1), (n_events, 4), ) else: raise ValueError("Cannot use resonance as top particle") weights, weights_max, parts, children_masses = self._generate( momentum, n_events, rng=rng ) output_particles = { child.name: parts[child_num] for child_num, child in enumerate(self.children) } output_masses = { child.name: tnp.take(children_masses, [child_num], axis=1) for child_num, child in enumerate(self.children) } for child_num, child in enumerate(self.children): if child.has_children: ( child_weights, _, child_gen_particles, child_masses, ) = child._recursive_generate( n_events=n_events, boost_to=parts[child_num], recalculate_max_weights=False, rng=rng, ) weights *= child_weights output_particles.update(child_gen_particles) output_masses.update(child_masses) if recalculate_max_weights: def build_mass_tree(particle, leaf): if particle.has_children: leaf[particle.name] = {} for child in particle.children: build_mass_tree(child, leaf[particle.name]) else: leaf[particle.name] = output_masses[particle.name] def get_flattened_values(dict_): output = [] for val in dict_.values(): if isinstance(val, dict): output.extend(get_flattened_values(val)) else: output.append(val) return output def recurse_w_max(parent_mass, current_mass_tree): available_mass = parent_mass - sum( get_flattened_values(current_mass_tree) ) masses = [] w_max = tnp.ones_like(available_mass) for child, child_mass in current_mass_tree.items(): if isinstance(child_mass, dict): w_max *= recurse_w_max( parent_mass - sum( get_flattened_values( { ch_it: ch_m_it for ch_it, ch_m_it in current_mass_tree.items() if ch_it != child } ) ), child_mass, ) masses.append(sum(get_flattened_values(child_mass))) else: masses.append(child_mass) masses = tnp.concatenate(masses, axis=1) w_max *= self._get_w_max(available_mass, masses) return w_max mass_tree = {} build_mass_tree(self, mass_tree) momentum = process_list_to_tensor(momentum) if len(momentum.shape) == 1: momentum = tnp.expand_dims(momentum, axis=-1) weights_max = tnp.reshape( recurse_w_max(kin.mass(momentum), mass_tree[self.name]), (n_events,) ) return weights, weights_max, output_particles, output_masses
[docs] def generate( self, n_events: int | tf.Tensor | tf.Variable, boost_to: tf.Tensor | vector.Momentum | None = None, normalize_weights: bool = True, seed: SeedLike = None, *, as_vectors: bool | None = None, ) -> tuple[tf.Tensor, dict[str, tf.Tensor]]: """Generate normalized n-body phase space as tensorflow tensors. Any TensorFlow tensor can always be converted to a numpy array with the method ``numpy()``. Events are generated in the rest frame of the particle, unless ``boost_to`` is given. Notes: In this method, the event weights are returned normalized to their maximum. Args: n_events (int): Number of events to generate. boost_to (optional): Momentum vector of shape ``(x, 4)``, where x is optional, to where the resulting events will be boosted in the (px, py, pz, E) format. Can also be a ``vector`` momentum Lorentz vector. If not specified, events are generated in the rest frame of the particle. normalize_weights (bool, optional): Normalize the event weight to its max? seed (``SeedLike``): The seed can be a number or a ``tf.random.Generator`` that are used as a seed to create a random number generator inside the function or directly as the random number generator instance, respectively. as_vectors (bool, optional): If True, the output momenta are returned as ``vector`` objects. Returns: tuple: Result of the generation, which varies with the value of ``normalize_weights``: - If True, the tuple elements are the normalized event weights as a tensor of shape ``(n_events,)``, and the momenta generated particles as a dictionary of tensors of shape ``(4, n_events)`` with particle names as keys. - If False, the tuple weights are the unnormalized event weights as a tensor of shape ``(n_events,)``, the maximum per-event weights as a tensor of shape ``(n_events,)`` and the momenta generated particles as a dictionary of tensors of shape ``(4, n_events)`` with particle names as keys. Raises: tf.errors.InvalidArgumentError: If the the decay is kinematically forbidden. ValueError: If ``n_events`` and the size of ``boost_to`` don't match. See ``GenParticle.generate_unnormalized``. """ rng = get_rng(seed) if boost_to is not None: try: import vector except ImportError as error: _raise_missing_vector_package(error) if isinstance(boost_to, vector.Vector): if not ( isinstance(boost_to, vector.Momentum) and isinstance(boost_to, vector.Lorentz) ): raise ValueError("boost_to has to be a momentum Lorentz vector.") try: boost_to = boost_to.to_pxpypzenergy() except Exception as error: raise ValueError( "boost_to has to be a momentum Lorentz vector, failed to convert to pxpypzenergy." ) from error boost_to = tnp.stack( [boost_to.px, boost_to.py, boost_to.pz, boost_to.energy], axis=-1 ) message = ( f"The number of events requested ({n_events}) doesn't match the boost_to input size " f"of {boost_to.shape}" ) if n_events is not None: if boost_to.shape[0] not in (n_events, 1): raise ValueError(message) if not isinstance(n_events, tf.Variable): n_events = tnp.asarray(n_events, dtype=tnp.int64) weights, weights_max, parts, _ = self._recursive_generate( n_events=n_events, boost_to=boost_to, recalculate_max_weights=self.has_grandchildren, rng=rng, ) parts = to_vectors(parts) if as_vectors else parts return ( (weights / weights_max, parts) if normalize_weights else (weights, weights_max, parts) )
[docs] def generate_tensor( self, n_events: int, boost_to=None, normalize_weights: bool = True, ): """Generate normalized n-body phase space as numpy arrays. Events are generated in the rest frame of the particle, unless ``boost_to`` is given. Notes: In this method, the event weights are returned normalized to their maximum. Args: n_events (int): Number of events to generate. boost_to (optional): Momentum vector of shape ``(x, 4)``, where x is optional, to where the resulting events will be boosted. If not specified, events are generated in the rest frame of the particle. normalize_weights (bool, optional): Normalize the event weight to its max Returns: tuple: Result of the generation, which varies with the value of ``normalize_weights``: - If True, the tuple elements are the normalized event weights as an array of shape ``(n_events,)``, and the momenta generated particles as a dictionary of arrays of shape ``(4, n_events)`` with particle names as keys. - If False, the tuple weights are the unnormalized event weights as an array of shape ``(n_events,)``, the maximum per-event weights as an array of shape ``(n_events,)`` and the momenta generated particles as a dictionary of arrays of shape ``(4, n_events)`` with particle names as keys. Raises: tf.errors.InvalidArgumentError: If the the decay is kinematically forbidden. ValueError: If ``n_events`` and the size of ``boost_to`` don't match. See ``GenParticle.generate_unnormalized``. """ # Run generation raise RuntimeError( "This function is removed. Use `generate` which does not return a Tensor as well." )
# legacy class to warn user about name change
[docs] class Particle: """Deprecated Particle class. Renamed to GenParticle. """ def __init__(self): raise NameError( "'Particle' has been renamed to 'GenParticle'. Please update your code accordingly." "For more information, see: https://github.com/zfit/phasespace/issues/22" )
[docs] def nbody_decay(mass_top: float, masses: list, top_name: str = "", names: list = None): """Shortcut to build an n-body decay of a GenParticle. If the particle names are not given, the top particle is called 'top' and the children 'p_{i}', where i corresponds to their position in the ``masses`` sequence. Args: mass_top (tensor, list): Mass of the top particle. Can be a list of 4-vectors. masses (list): Masses of the child particles. top_name (str, optional): Name of the top particle. If not given, the top particle is named top. names (list, optional): Names of the child particles. If not given, they are build as 'p_{i}', where i is given by their ordering in the ``masses`` list. Returns: ``GenParticle``: Particle decay. Raises: ValueError: If the length of ``masses`` and ``names`` doesn't match. """ if not top_name: top_name = "top" if not names: names = [f"p_{num}" for num in range(len(masses))] if len(names) != len(masses): raise ValueError("Mismatch in length between children masses and their names.") return GenParticle(top_name, mass=mass_top).set_children( *(GenParticle(names[num], mass=mass) for num, mass in enumerate(masses)) )
[docs] def generate_decay(*args, **kwargs): """Deprecated.""" raise NameError( "'generate_decay' has been removed. A similar behavior can be accomplished with 'nbody_decay'. " "For more information see https://github.com/zfit/phasespace/issues/22" )
[docs] def to_vectors(particles: dict[str, tf.Tensor]) -> dict[str, vector.Momentum]: """Convert a dictionary of particles to a dictionary of ``vector.Momentum`` instances. Args: particles (dict): Dictionary of particles, with the keys being the particle names and the values being the momenta. Returns: dict: Dictionary of ``vector.Momentum`` instances with numpy arrays """ try: import vector except ImportError as error: _raise_missing_vector_package(error) newparticles = {} for name, particle in particles.items(): px, py, pz, e = np.moveaxis(particle, -1, 0) # numpy "unstack" newparticles[name] = vector.array(dict(px=px, py=py, pz=pz, energy=e)) return newparticles
def _raise_missing_vector_package(exception: ImportError) -> NoReturn: raise ImportError( "To use `boost_to`, the `vector` package has to be installed." ) from exception