"""
This module contains the advanced simulation method and related resources.
"""
from __future__ import annotations
import logging
from typing import Dict, Any, Union, Optional, List, Tuple
from functools import lru_cache
from concurrent.futures.thread import ThreadPoolExecutor
import numpy as np
from scipy.integrate import solve_ivp
import numba
from .. import xs
from .. import plasma
from ._advanced_helpers import (
Device,
BackgroundGas,
AdvancedModel,
ModelOptions,
DEFAULT_MODEL_OPTIONS
)
from ..utils import validate_namedtuple_field_types
from ..elements import Element
from ..physconst import Q_E, M_P, PI
from ..physconst import MINIMAL_N_1D, MINIMAL_KBT
from ._result import AdvancedResult, Rate
from ._radial_dist import (
boltzmann_radial_potential_linear_density_ebeam,
# heat_capacity
)
logger = logging.getLogger(__name__)
@numba.njit(cache=True)
def _cubic_spline(x, x1, x2, y1, y2, k1, k2):
t = (x-x1)/(x2-x1)
a = k1*(x2-x1) - (y2-y1)
b = -k2*(x2-x1) + (y2-y1)
q = (1-t) * y1 + t*y2 + t*(1-t)*((1-t)*a+t*b)
return q
@numba.njit(cache=True)
def _smooth_to_zero(x):
N1 = MINIMAL_N_1D
N2 = 1000*N1
x = x.copy()
x[x < N1] = 0
fil = np.logical_and(N1 < x, x < N2)
x[fil] = _cubic_spline(x[fil], N1, N2, 0., N2, 0., 1.)
return x
# @numba.njit(cache=True)
# def _smooth_temp_rate_to_zero(n, dkT):
# N1 = MINIMAL_N_1D/100
# N2 = MINIMAL_N_1D
# dkT = dkT.copy()
# dkT[n < N1] = 0
# fil = np.logical_and(N1 < n, n < N2)
# dkT[fil] = dkT[fil] * _cubic_spline(n[fil], N1, N2, 0., 1., 0., 1.)
# return dkT
@numba.njit(cache=True, nogil=True)
def _chunked_adv_rhs(model, t, y):
yf = np.asfortranarray(y)
ret = np.zeros_like(yf)
for k in range(yf.shape[1]):
ret[:, k] = _adv_rhs(model, t, yf[:, k], None)
return ret
@numba.njit(cache=True, nogil=True)
def _adv_rhs(model, _t, y, rates=None):
"""
The right hand side of the differential equation set.
Parameters
----------
_t : float
<s> Time, currently no effect.
y : numpy.ndarray
<1/m^3> and <eV>
Joint array of ion densities and temperatures.
Array must have the following structure:
z+1 elements holding the density for each Target in self.targets (same order)
followed by
z+1 elements holding the temperature for each Target in self.targets (same order)
rates: numba.typed.Dict[numpy.ndarray], optional
If a dictionary object is passed into rates it will be populated with the arrays
holding the reaction rates.
Returns
-------
numpy.ndarray
dy/dt
"""
# Split y into useful parts
n = y[:model.nq]
kT = y[model.nq:]
# Clip low values?
n_r = n[:]
n = _smooth_to_zero(n)
kT = np.maximum(kT, MINIMAL_KBT)
# Transposed helperarrays
q_T = np.atleast_2d(model.q).T
a_T = np.atleast_2d(model.a).T
n_T = np.atleast_2d(n).T
kT_T = np.atleast_2d(kT).T
# Preallocate output arrays
dn = np.zeros_like(n)
dkT = np.zeros_like(kT)
# Radial dynamics
if model.options.RADIAL_DYNAMICS:
# Solve radial problem
phi, _n3d, _shapes = boltzmann_radial_potential_linear_density_ebeam(
model.device.rad_grid, model.device.current, model.device.r_e, model.device.e_kin,
n_T, kT_T, q_T,
ldu=(model.device.rad_fd_l, model.device.rad_fd_d, model.device.rad_fd_u),
max_step=model.options.RADIAL_SOLVER_MAX_STEPS,
rel_diff=model.options.RADIAL_SOLVER_REL_DIFF,
)
else:
phi = model.device.rad_phi_uncomp
ix = model.device.rad_re_idx
r = model.device.rad_grid
# Boltzmann distribution shape functions
shapes = np.exp(-q_T * (phi - phi.min())/kT_T) # Works for neutrals
# Radial integrals
i_rs_re = np.trapz(shapes[:, :ix+1] * r[:ix+1], r[:ix+1])
i_rsp_re = np.trapz(shapes[:, :ix+1] * r[:ix+1] * (phi[:ix+1]-phi.min()), r[:ix+1])
i_rs_rd = np.trapz(shapes * r, r)
i_rrs_rd = np.trapz(shapes * r * r, r)
# On axis 3d density
n3d = n_T / 2 / PI / np.atleast_2d(i_rs_rd).T * np.atleast_2d(shapes[:, 0]).T
n3d = n3d.T[0] # Adjust shape
# Compute overlap factors
ion_rad = i_rrs_rd / i_rs_rd
fei = i_rs_re/i_rs_rd
fij = (ion_rad/np.atleast_2d(ion_rad).T)**2
fij = np.minimum(fij, 1.0)
# Compute effective trapping voltages
v_ax = (model.device.v_ax + model.device.v_ax_sc) - phi.min()
v_ra = -phi.min()
# Characteristic beam energies
_sc_mean = 2*np.trapz(r[:ix+1]*phi[:ix+1], r[:ix+1])/model.device.r_e**2
e_kin = model.device.e_kin + _sc_mean
if not model.options.OVERRIDE_FWHM:
e_kin_fwhm = 2.355*np.sqrt(
2*np.trapz(r[:ix+1]*(phi[:ix+1]-_sc_mean)**2, r[:ix+1])/model.device.r_e**2
)
else:
e_kin_fwhm = model.device.fwhm
# Ionisation heating (mean)
if model.options.IONISATION_HEATING:
# iheat = 2/3*(2 / self.device.r_e**2 * i_rsp_re)
iheat = 2/3 * i_rsp_re / i_rs_re
else:
iheat = np.zeros(model.nq)
# Compute some electron beam quantities
je = model.device.j / Q_E * 1e4 # electron number current density
ve = plasma.electron_velocity(e_kin)
ne = je/ve # Electron number density
# Collision rates
rij = plasma.ion_coll_rate(
np.atleast_2d(n3d).T, n3d,
kT_T, kT,
a_T, model.a,
q_T, model.q
)
ri = np.sum(rij, axis=-1)
# Thermal ion velocities
v_th = np.sqrt(8 * Q_E * kT/(PI * model.a * M_P)) # Thermal velocities
# update cross sections?
if model.options.RECOMPUTE_CROSS_SECTIONS:
eixs = np.zeros(model.nq)
rrxs = np.zeros(model.nq)
drxs = np.zeros(model.nq)
for i, trgt in enumerate(model.targets):
eixs[model.lb[i]:model.ub[i]] = xs.eixs_vec(trgt, e_kin)
rrxs[model.lb[i]:model.ub[i]] = xs.rrxs_vec(trgt, e_kin)
drxs[model.lb[i]:model.ub[i]] = xs.drxs_vec(trgt, e_kin, e_kin_fwhm)
else:
eixs = model.eixs
rrxs = model.rrxs
drxs = model.drxs
# EI
if model.options.EI:
R_ei = eixs * n * je * fei
dn -= R_ei
dn[1:] += R_ei[:-1]
dkT[1:] += R_ei[:-1] / n_r[1:] * (kT[:-1] - kT[1:])
dkT[1:] += R_ei[:-1] / n_r[1:] * iheat[:-1]
# RR
if model.options.RR:
R_rr = rrxs * n * je * fei
dn -= R_rr
dn[:-1] += R_rr[1:]
dkT[:-1] += R_rr[1:] / n_r[:-1] * (kT[1:] - kT[:-1])
dkT[:-1] -= R_rr[1:] / n_r[:-1] * iheat[1:]
# DR
if model.options.DR:
R_dr = drxs * n * je * fei
dn -= R_dr
dn[:-1] += R_dr[1:]
dkT[:-1] += R_dr[1:] / n_r[:-1] * (kT[1:] - kT[:-1])
dkT[:-1] -= R_dr[1:] / n_r[:-1] * iheat[1:]
# CX
if model.options.CX:
R_cx = np.zeros_like(n)
for g, gas in enumerate(model.bg_gases):
R_cx += model.cxxs_bggas[g] * gas.n0 * n * v_th
for j, jtrgt in enumerate(model.targets):
if jtrgt.cx: # Only compute cx with target gas if wished by user
R_cx += model.cxxs_trgts[j] * n3d[model.lb[j]] * n * v_th
dn -= R_cx
dn[:-1] += R_cx[1:]
dkT[:-1] += R_cx[1:] / n_r[:-1] * (kT[1:] - kT[:-1])
dkT[:-1] -= R_cx[1:] / n_r[:-1] * iheat[1:]
# Electron heating / Spitzer heating
if model.options.SPITZER_HEATING:
_dkT_eh = plasma.spitzer_heating(n3d, ne, kT, e_kin, model.a, model.q) * fei
dkT += _dkT_eh
# Ion-ion heat transfer (collisional thermalisation)
if model.options.COLLISIONAL_THERMALISATION:
_dkT_ct = np.sum(
fij*plasma.collisional_thermalisation(
kT_T, kT, a_T, model.a, rij
), axis=-1
)
dkT += _dkT_ct
# Axial escape
if model.options.ESCAPE_AXIAL:
w_ax = plasma.trapping_strength_axial(kT, model.q, v_ax)
R_ax_co = plasma.collisional_escape_rate(ri, w_ax) * n
R_ax_co = np.maximum(R_ax_co, 0.0)
for k in model.lb:
R_ax_co[k] = 0
dn -= R_ax_co
dkT -= 2/3*plasma.collisional_escape_rate(ri, w_ax) * w_ax * kT
# Radial escape
if model.options.ESCAPE_RADIAL:
w_ra = plasma.trapping_strength_radial(
kT, model.q, model.a, v_ra, model.device.b_ax, model.device.r_dt
)
R_ra_co = plasma.collisional_escape_rate(ri, w_ra) * n
R_ra_co = np.maximum(R_ra_co, 0.0)
for k in model.lb:
R_ra_co[k] = 0
dn -= R_ra_co
dkT -= 2/3*plasma.collisional_escape_rate(ri, w_ra) * w_ra * kT
# Kill all neutral rates - seems to improve stability - assumes nonchanging background gas
for k in model.lb:
dn[k] = 0.0
dkT[k] = 0.0
if rates is not None:
if model.options.EI:
rates[Rate.EI] = R_ei
# rates[Rate.T_EI] = R_ei * kT/n
if model.options.RR:
rates[Rate.RR] = R_rr
# rates[Rate.T_RR] = R_rr * kT/n
if model.options.CX:
rates[Rate.CX] = R_cx
# rates[Rate.T_CX] = R_cx * kT/n
if model.options.DR:
rates[Rate.DR] = R_dr
# rates[Rate.T_DR] = R_dr * kT/n
if model.options.ESCAPE_AXIAL:
rates[Rate.W_AX] = w_ax
rates[Rate.AX_CO] = R_ax_co
rates[Rate.T_AX_CO] = R_ax_co * (kT + w_ax * kT)/n_r
if model.options.ESCAPE_RADIAL:
rates[Rate.W_RA] = w_ra
rates[Rate.RA_CO] = R_ra_co
rates[Rate.T_RA_CO] = R_ra_co * (kT + w_ra * kT)/n_r
if model.options.COLLISIONAL_THERMALISATION:
rates[Rate.T_COLLISIONAL_THERMALISATION] = _dkT_ct
if model.options.SPITZER_HEATING:
rates[Rate.T_SPITZER_HEATING] = _dkT_eh
if model.options.IONISATION_HEATING:
rates[Rate.IONISATION_HEAT] = iheat
# rates[Rate.CV] = heat_capacity(self.device.rad_grid, phi, q_T, kT_T).T[0]
# rates[Rate.CHARGE_COMPENSATION] = comp
rates[Rate.F_EI] = fei
rates[Rate.E_KIN_MEAN] = np.atleast_1d(np.array(e_kin))
rates[Rate.E_KIN_FWHM] = np.atleast_1d(np.array(e_kin_fwhm))
rates[Rate.V_RA] = np.atleast_1d(np.array(v_ra))
rates[Rate.V_AX] = np.atleast_1d(np.array(v_ax))
rates[Rate.COLLISION_RATE_SELF] = np.diag(rij).copy()
rates[Rate.COLLISION_RATE_TOTAL] = ri
return np.concatenate((dn, dkT))
[docs]def advanced_simulation(device: Device, targets: Union[Element, List[Element]], t_max: float,
bg_gases: Union[BackgroundGas, List[BackgroundGas], None] = None,
options: Optional[ModelOptions] = None, rates: bool = False,
solver_kwargs: Optional[Dict[str, Any]] = None,
verbose: bool = True, n_threads: int = 1
) -> Union[AdvancedResult, Tuple[AdvancedResult, ...]]:
"""
Interface for performing advanced charge breeding simulations.
For a list of effects refer to `ebisim.simulation.ModelOptions`.
Parameters
----------
device :
Container describing the EBIS/T and specifically the electron beam.
targets :
Target(s) for which charge breeding is simulated.
t_max :
<s>
Simulated breeding time
bg_gases :
Background gas(es) which act as CX partners.
rates :
If true a 'second run' is performed to store the rates, this takes extra time and can
create quite a bit of data.
options :
Switches for effects considered in the simulation, see default values of
ebisim.simulation.ModelOptions.
solver_kwargs :
If supplied these keyword arguments are unpacked in the solver call.
Refer to the documentation of scipy.integrate.solve_ivp for more information.
By default None.
verbose :
Print a little progress indicator and some status messages, by default True.
n_threads :
How many threads to use (mostly for jacbion estimation which can evaluate the RHS
in parallel with different inputs.)
Returns
-------
An instance of the AdvancedResult class, holding the simulation parameters, timesteps and
charge state distribution including the species temperature.
"""
logger.info("Preparing advanced simulation.")
logger.info(f"device = {device}.")
logger.info(f"targets = {targets}.")
logger.info(f"t_max = {t_max}.")
logger.info(f"bg_gases = {bg_gases}.")
logger.info(f"options = {options}.")
logger.info(f"rates = {rates}.")
logger.info(f"solver_kwargs = {solver_kwargs}.")
logger.info(f"verbose = {verbose}.")
# ----- Pretreat arguments
targets = [targets] if not isinstance(targets, list) else targets
bg_gases = bg_gases or []
bg_gases = [bg_gases] if not isinstance(bg_gases, list) else bg_gases
options = options or DEFAULT_MODEL_OPTIONS
solver_kwargs = solver_kwargs or {}
solver_kwargs.setdefault("method", "Radau")
# ----- Generate AdvancedModel
logger.debug("Initialising AdvancedModel object.")
model = AdvancedModel.get(device, targets, bg_gases, options)
# ----- Validate Tuple field types
for i, tg in enumerate(targets):
if not validate_namedtuple_field_types(tg):
logger.warning(f"Unable to verify the types of Target #{i}: {tg!s}.")
for i, bg in enumerate(bg_gases):
if not validate_namedtuple_field_types(bg):
logger.warning(f"Unable to verify the types of BgGas #{i}: {bg!s}.")
if not validate_namedtuple_field_types(device):
logger.warning(f"Unable to verify the types of {device!s}.")
if not validate_namedtuple_field_types(options):
logger.warning(f"Unable to verify the types of {options!s}.")
if not validate_namedtuple_field_types(model):
logger.warning(f"Unable to verify the types of {model!s}.")
# ----- Generate Initial conditions
n_kT_initial = _assemble_initial_conditions(model)
# ----- Generate Callable
if n_threads < 2:
executor = None
def rhs_int(t, y):
return _chunked_adv_rhs(model, t, y)
else:
executor = ThreadPoolExecutor()
def rhs_int(t, y):
f = lambda ix: _chunked_adv_rhs(model, t, y[:, ix[0]:ix[1]]) # noqa: E731
ix = _multithreading_indices(y.shape[1], n_threads)
res = list(executor.map(f, ix))
return np.concatenate(res, axis=-1)
# ----- Run simulation
if verbose:
rhs_int = _IntegrationProgressWrapper(rhs_int)
logger.debug("Starting integration.")
res = solve_ivp(
rhs_int, (0, t_max), n_kT_initial, vectorized=True, **solver_kwargs
)
if isinstance(rhs_int, _IntegrationProgressWrapper):
rhs_int.finalize_integration_report(res)
# ----- Extract rates if demanded
ratebuffer = _gather_rates(model, res, verbose) if rates else None
# ----- Result assembly
if executor is not None:
executor.shutdown()
return _assemble_results(model, res, ratebuffer)
class _IntegrationProgressWrapper:
def __init__(self, rhs):
self.rhs = rhs
self.k = 0
self.k_last_emit = 0
def __call__(self, t: float, y: np.ndarray):
self._log_progress(t, y)
return self.rhs(t, y)
def _log_progress(self, t: float, y: np.ndarray) -> None:
inc = 1 if len(y.shape) == 1 else y.shape[1]
self.k = self.k + inc
if self.k - self.k_last_emit > 99:
print(f"Integration: {self.k} calls, t = {t:.4e} s", end="\r")
self.k_last_emit = self.k
def finalize_integration_report(self, res: Any) -> None:
"""
Call after integration finishes to finalize the progress display
Parameters
----------
res :
The solve_ivp result object
"""
print("\rIntegration finished:", self.k, "calls ")
print(res.message)
print(f"Calls: {self.k} of which ~{res.nfev} normal ({res.nfev/self.k:.2%}) and "
+ f"~{res.y.shape[0]*res.njev} for jacobian approximation "
+ f"({res.y.shape[0]*res.njev/self.k:.2%})")
class _RatesProgressWrapper:
def __init__(self, rhs, steps):
self.rhs = rhs
self.k = 0
self.steps = steps
def __call__(self, model: AdvancedModel, t: float, y: np.ndarray, rates: Any) -> np.ndarray:
self._log_progress()
return self.rhs(model, t, y, rates)
def _log_progress(self) -> None:
self.k += 1
if not self.k % 100:
print(f"Rates: {self.k} / {self.steps}", end="\r")
def finalize_rate_report(self) -> None:
"""
Call after rate extraction finishes to finalize the rate progress display
"""
print("Rates finished:", self.steps, "rates")
@lru_cache(maxsize=None)
def _multithreading_indices(n_cols: int, n_threads: int) -> Tuple[Tuple[int, int], ...]:
"""
Computes a balanced distribution of column indices for the multithreaded
vectorised version of the RHS function
Parameters
----------
n_cols :
Number of columns to distribute
n_threads :
Max number of available threads
Returns
-------
Tuple of start / stop index pairs
"""
cl = n_threads * [n_cols//n_threads, ]
for _k in range(n_threads):
if _k < (n_cols % n_threads):
cl[_k] += 1
indices = []
for _k in range(n_threads):
if cl[_k] > 0:
indices.append((sum(cl[:_k]), sum(cl[:_k+1])))
return tuple(indices)
def _assemble_initial_conditions(model: AdvancedModel) -> np.ndarray:
_kT0 = []
for t in model.targets: # Make sure that initial temperature is not unreasonably small
if t.kT is None or t.n is None:
raise ValueError(f"{t!s} does not provide initial conditions (n, kT).")
kT = t.kT.copy()
# I tried to reduce the value of minkT and it caused crashes, I have no solid
# argument for a value here, but it is obvius that the simulation stability is very
# sensitive to the temperature/radial well ratio. This must be normalisation issues
# since it presents itself as np.nan or np.inf being produced during the solution
# of the rate equations
minkT = np.maximum(model.device.fwhm * np.arange(t.z+1), MINIMAL_KBT)
filter_ = t.n < 1.00001 * MINIMAL_N_1D
kT[filter_] = np.maximum(kT[filter_], minkT[filter_])
if np.not_equal(kT, t.kT).any():
logger.warning(
f"Initial temperature vector adjusted for {t!s}. "
+ "This only affects charge states with densities at the minimum limit."
)
_kT0.append(kT)
_n0 = np.concatenate([t.n for t in model.targets])
_kT0 = np.concatenate(_kT0)
return np.concatenate([_n0, _kT0])
def _gather_rates(model, res, verbose):
logger.debug("Assembling rate arrays.")
# Recompute rates for final solution (this cannot be done parasitically due to
# the solver approximating the jacobian and calling rhs with bogus values).
nt = res.t.size
rhs = _RatesProgressWrapper(_adv_rhs, nt) if verbose else _adv_rhs
# Poll once to get the available rates
extractor = numba.typed.Dict.empty(
key_type=numba.typeof(Rate.EI),
value_type=numba.types.float64[::1]
)
_ = rhs(model, res.t[0], res.y[:, 0], extractor)
ratebuffer = {}
for k in extractor:
if len(extractor[k].shape) == 1:
ratebuffer[k] = np.zeros((extractor[k].size, nt))
# Poll all steps
for idx in range(nt):
_ = rhs(model, res.t[idx], res.y[:, idx], extractor)
for key, val in extractor.items():
if len(val.shape) == 1:
ratebuffer[key][:, idx] = val
if isinstance(rhs, _RatesProgressWrapper):
rhs.finalize_rate_report()
return ratebuffer
def _assemble_results(model, res, ratebuffer):
out = []
for i, trgt in enumerate(model.targets):
logger.debug(f"Assembling result of target #{i}.")
irates = {}
if ratebuffer is not None:
for key in ratebuffer.keys():
_ir = ratebuffer[key]
if _ir.shape[0] != 1:
irates[key] = _ir[model.lb[i]:model.ub[i]] # Per CS
else:
irates[key] = _ir # scalar
out.append(
AdvancedResult(
t=res.t,
N=res.y[model.lb[i]:model.ub[i]],
kbT=res.y[model.nq + model.lb[i]:model.nq + model.ub[i]],
res=res,
target=trgt,
device=model.device,
rates=irates or None,
model=model,
id_=i
)
)
if len(out) == 1:
return out[0]
return tuple(out)