Source code for galport.averager

"""
=============================================
averager: Averaging Functions for Time Series
=============================================

This module provides tools for computing averaged quantities from
time series data, with a focus on action-angle variables in galactic
dynamics. It implements mean-preserving spline interpolation and
extrema-based averaging methods.

Main Functions
--------------
``value(t, x, **kwargs)``
    Average a time series between its extrema points.
    Supports different averaging types and can return
    minima, maxima, frequency, and phase angle.

``action(t, xv, act=None, **kwargs)``
    Calculate averaged action-angle variables from orbital trajectories.
    Can compute secular variations, frequency derivatives, and
    bar-specific quantities.

Helper Functions
----------------
``find_peaks_with_limitations(t, x, **kwargs)``
    Find peaks in a time series with optional filtering
    to remove spurious peaks based on frequency and amplitude criteria.
"""

import numpy as np
from .mpspline import MeanPreservingInterpolation as MPSpline
from scipy.interpolate import CubicSpline, interp1d
from scipy.signal import find_peaks
from typing import Optional


__all__ = [
    'value',
    'action',
    'find_peaks_with_limitations'
]

def _create_splines(t, xv, act, spline_expansion):
    """Create high-resolution spline interpolations."""
    x = xv[:, 0]
    y = xv[:, 1]
    z = xv[:, 2]
    vx = xv[:, 3]
    vy = xv[:, 4]
    vz = xv[:, 5]
    R = np.hypot(x, y)
    vR = (x*vx + y*vy) / R
    Lz = (x*vy - y*vx) if act is None else act[:, 2]

    sin_dphi = (x[:-1]*y[1:] - x[1:]*y[:-1]) / R[:-1] / R[1:]
    cos_dphi = (x[:-1]*x[1:] + y[1:]*y[:-1]) / R[:-1] / R[1:]

    dphi = np.arctan2(sin_dphi, cos_dphi)
    phi = np.zeros(len(t))
    phi[0] = np.arctan2(y[0], x[0])
    phi[1:] = np.cumsum(dphi) + phi[0]

    t_spline = np.linspace(t[0], t[-1], len(t) * spline_expansion)
    R_spline = CubicSpline(t, R)(t_spline)
    vR_spline = CubicSpline(t, vR)(t_spline)
    z_spline = CubicSpline(t, z)(t_spline)
    vz_spline = CubicSpline(t, vz)(t_spline)
    phi_spline = CubicSpline(t, phi)(t_spline)
    Lz_spline = CubicSpline(t, Lz)(t_spline)

    if act is None:
        JR_spline = None
        Jz_spline = None
    elif np.nan in act:
        JR_spline = None
        Jz_spline = None
    else:
        JR_spline = CubicSpline(t, act[:, 0])(t_spline)
        Jz_spline = CubicSpline(t, act[:, 1])(t_spline)
        
    return t_spline, R_spline, vR_spline, z_spline, vz_spline, phi_spline, \
        JR_spline, Jz_spline, Lz_spline


def _find_tedges_for_mpspline(t, t_extrema, border_type='apocenters'):
    """Find t edges as variables for mean-preserving spline """
    n_extrema = len(t_extrema)
    if n_extrema <= 3:
        return np.r_[t[0], t_extrema, t[-1]]
    elif border_type == 'nbody':
        return np.r_[t[0], t_extrema]
    else:
        return t_extrema


def _find_values_for_mpspline(J, border_type='apocenters'):
    """Find actions for mean preserving spline, depend on border type"""
    n_intervals = len(J)
    if n_intervals < 3:
        return np.r_[J[0], J, J[-1]]
    elif border_type == 'nbody':
        return np.r_[J[0], J]
    else:
        return J


def _calculate_frequency(t_extrema, border_type):
    """Calculate frequencies between extrema points."""
    frequency = np.zeros(len(t_extrema) - 1)
    frequency = 2 * np.pi / np.diff(t_extrema)
    return _find_values_for_mpspline(frequency, border_type)


def _calculate_omega(t_apocenter, n_apocenter, phi, border_type, positive):
    """Calculate mean omega between apocenters."""
    omega = np.zeros(len(t_apocenter) - 1)
    delta_phi = phi[n_apocenter[1:]] - phi[n_apocenter[:-1]]

    delta_phi = np.where(delta_phi > 0, delta_phi, 2*np.pi + delta_phi) \
        if positive else delta_phi
    omega = (delta_phi) / \
            (t_apocenter[1:] - t_apocenter[:-1])

    return _find_values_for_mpspline(omega, border_type)


def _calculate_action_by_integrating(x, vx, extrema_indices, border_type):
    """Calculate actions by integrating between extrema."""
    J_aver = np.zeros(len(extrema_indices) - 1)
    for j in range(len(extrema_indices) - 1):
        n0, n1 = extrema_indices[j], extrema_indices[j+1]
        J_aver[j] = np.trapezoid(vx[n0:n1], x=x[n0:n1]) / (2*np.pi)
    return _find_values_for_mpspline(J_aver, border_type)


def _calculate_average_value(value, extrema_indices, border_type):
    """Calculate actions by averaging input actions between extrema."""
    value_aver = np.zeros(len(extrema_indices)-1)
    for j in range(len(extrema_indices) - 1):
        n0, n1 = extrema_indices[j], extrema_indices[j+1]
        value_aver[j] = np.mean(value[n0:n1])
    return _find_values_for_mpspline(value_aver, border_type)


[docs]def find_peaks_with_limitations( t: np.ndarray, x: np.ndarray, apply_filter: bool = False, freq_ratio_lim: float = 1.4, value_ratio_lim: float = 0.1, minmax: bool = False): """ Find peaks in a time series with optional filtering to remove spurious peaks based on frequency and amplitude criteria. Parameters ---------- t : (N,) numpy array Array of time values. x : (N,) numpy array Array of values to find peaks in. apply_filter : bool, optional Whether to apply filtering conditions to remove spurious peaks. If True, peaks are filtered based on frequency ratio and value ratio criteria. Default: True freq_ratio_lim : float, optional Lower limit on the ratio of frequencies between consecutive peaks. A peak is considered spurious if its instantaneous frequency differs from neighbors by more than this factor. Specifically: ``freq_i > freq_ratio_lim * freq_{i+1} AND freq_i > freq_ratio_lim * freq_{i-1}`` Default: 1.4 value_ratio_lim : float, optional Lower limit on the value ratio for peak filtering. A peak is considered spurious if the amplitude variation between consecutive extrema is below this threshold. Calculated as: ``2 * (|x_max| - |x_min|) / (|x_max| + |x_min|) < value_ratio_lim`` Default: 0.1 minmax : bool, optional If True, also return indices and values of minima. Default: False Returns ------- n_max : (M, ) numpy array 1D integer array of indices where maxima occur. x_max : (M, ) numpy array 1D array of maximum values at the identified peak indices. If minmax=True, additionally returns: n_min : (L, )numpy array (optional) 1D integer array of indices where minima occur. x_min : (L, ) numpy array (optional) 1D array of minimum values at the identified minima indices. """ # Find peaks of x n_max = find_peaks(x)[0] n_min = find_peaks(-x)[0] x_max = x[n_max] x_min = x[n_min] if not apply_filter or len(n_max) < 3: if minmax: return n_max, x[n_max], n_min, x[n_min] else: return n_max, x[n_max] n_extrema = np.sort(np.hstack((n_min, n_max))) t_extrema = t[n_extrema] x_extrema = x[n_extrema] # Find frequency ratio filter max dt = t_extrema[1:] - t_extrema[:-1] freq_ratio_filter_left = (dt[:-1] / dt[1:] > freq_ratio_lim) freq_ratio_filter_right = (dt[1:] / dt[:-1] > freq_ratio_lim) # Find value ratio filter dx = np.abs(np.diff(x_extrema)) x_mean = np.abs(x_extrema[1:]+x_extrema[:-1]) / 2. ratio_filter = (dx / x_mean) < value_ratio_lim # Find bool array of values to be deleted del_filter = np.zeros_like(n_extrema, dtype='bool') # In this case concatenate 3 extrema in 1 conc_one = freq_ratio_filter_left[:-2] & freq_ratio_filter_right[2:] & \ ratio_filter[1:-2] & ratio_filter[2:-1] conc_one[1:][conc_one[:-1]] = 0 del_filter[2:-2] += conc_one del_filter[3:-1] += conc_one n_extrema[1:-3][conc_one] = (n_extrema[1:-3][conc_one] + n_extrema[3:-1][conc_one]) // 2 x_extrema[1:-3][conc_one] = (x_extrema[1:-3][conc_one] + x_extrema[3:-1][conc_one]) / 2. # In this case delete 2 extrema del_interval = \ freq_ratio_filter_left[:-1] & ~freq_ratio_filter_left[1:] & \ freq_ratio_filter_right[1:] & ~freq_ratio_filter_right[:-1] & \ ratio_filter[1:-1] & ~ratio_filter[2:] & ~ratio_filter[:-2] del_filter[1:-2] += del_interval del_filter[2:-1] += del_interval n_extrema = n_extrema[~del_filter] x_extrema = x_extrema[~del_filter] if n_max[0] < n_min[0]: n_max, n_min = n_extrema[::2], n_extrema[1::2] x_max, x_min = x_extrema[::2], x_extrema[1::2] else: n_min, n_max = n_extrema[::2], n_extrema[1::2] x_min, x_max = x_extrema[::2], x_extrema[1::2] return (n_max, x_max, n_min, x_min) if minmax else (n_max, x_max)
def _calculate_dot_action_naive(act, t_extrema): """Calculate dJ/dt """ t_edges_dot = (t_extrema[1:] + t_extrema[:-1])/2 delta_t = (t_extrema[2:] - t_extrema[:-2])/2 dot_action = (act[1:] - act[:-1]) / delta_t return t_edges_dot, dot_action def _calculate_average_action( t, extrema_indices, t_out, border_type, act=None, x=None, vx=None, JR_ilr=False, Lz=None, dJdt=False): """Calculate action""" if len(extrema_indices) < 2: return np.zeros((len(t_out), 2))*np.nan \ if dJdt else np.zeros(len(t_out))*np.nan t_extrema = t[extrema_indices] t_edges = _find_tedges_for_mpspline(t, t_extrema, border_type) # Find mean action between extrema_indices Lz_neg = _calculate_average_value(np.where(Lz < 0, Lz, 0), extrema_indices, border_type) if JR_ilr else 0. if (act is not None): action_average = _calculate_average_value( act, extrema_indices, border_type) - Lz_neg if dJdt: # t_edges_dot, dot_act_average = calculate_dot_action_by_averaging( # t, act, extrema_indices) t_edges_dot, dot_act_average = _calculate_dot_action_naive( action_average, t_extrema) elif (x is not None) or (vx is not None): action_average = _calculate_action_by_integrating( x, vx, extrema_indices, border_type) - Lz_neg if dJdt: t_edges_dot, dot_act_average = _calculate_dot_action_naive( action_average, t_extrema) else: ValueError('Not find action or x and vx') # Smoothing action action_mps = MPSpline(xi=np.delete(t_edges, -2), yi=action_average, x_edges=t_edges, border_type=border_type)(t_out) nanmask = np.ones_like(t_out) add0, add1 = (1, 1) if border_type == 'apocenters' else \ ((0, 1) if border_type == 'nbody' else (0, 0)) nanmask[(t_out < t_edges[0+add0]) | (t_out > t_edges[-1-add1])] = np.nan if not dJdt: return action_mps*nanmask # Smooth time derivative of action dot_action_mps = MPSpline( xi=np.delete(t_edges_dot, -2), yi=dot_act_average, x_edges=t_edges_dot, border_type=border_type)(t_out) return np.column_stack((action_mps*nanmask, dot_action_mps*nanmask)) def _calculate_frequency_and_angle(t, extrema_indices, t_out, border_type, phi=None, phi0=0., positive=True, angle=True): """Calculate frequencies and angles""" if len(extrema_indices) < 2: return np.zeros(len(t_out))*np.nan, np.zeros(len(t_out))*np.nan \ if angle else np.zeros(len(t_out))*np.nan t_extrema = t[extrema_indices] if phi is None: freq_average = _calculate_frequency(t_extrema, border_type) else: freq_average = _calculate_omega(t_extrema, extrema_indices, phi, border_type, positive) phi0 = phi[extrema_indices[0]] t_edges = _find_tedges_for_mpspline(t, t_extrema, border_type) nanmask = np.ones_like(t_out) add0, add1 = (1, 1) if border_type == 'apocenters' else \ ((0, 1) if border_type == 'nbody' else (0, 0)) nanmask[(t_out < t_edges[0+add0]) | (t_out > t_edges[-1-add1])] = np.nan if angle: frequency, angle = MPSpline( xi=np.delete(t_edges, -2), yi=freq_average, x_edges=t_edges, border_type=border_type)( t_out, integral=True, x0=t_extrema[0], f0=phi0) return frequency*nanmask, angle*nanmask frequency = MPSpline( xi=np.delete(t_edges, -2), yi=freq_average, x_edges=t_edges, border_type=border_type)(t_out) return frequency*nanmask def _calculate_averaged_aa_variables( t: np.ndarray, xv: np.ndarray, act: Optional[np.ndarray] = None, border_type: str = 'apocenters', dJdt: bool = True, JR_ilr: bool = True, positive_omega: bool = True, apply_apo_filter: bool = True, freq_ratio_lim: float = 1.4, value_ratio_lim: float = 0.1, spline_expansion: int = 100 ): """Calculate averaged action-angle variables""" # Create high-resolution splines t_spline, R_spline, vR_spline, z_spline, vz_spline, phi_spline, \ JR_spline, Jz_spline, Lz_spline = \ _create_splines(t, xv, act, spline_expansion) # Find extrema indexes n_apo_spline = find_peaks_with_limitations( t_spline, R_spline, apply_filter=apply_apo_filter, freq_ratio_lim=freq_ratio_lim, value_ratio_lim=value_ratio_lim)[0] n_zmin_spline = find_peaks(-z_spline)[0] n_zmax_spline = find_peaks(z_spline)[0] # Calculate averaged actions JR = _calculate_average_action( t_spline, n_apo_spline, t, border_type, act=JR_spline, x=R_spline, vx=vR_spline, JR_ilr=JR_ilr, Lz=Lz_spline, dJdt=dJdt) Jz_min = _calculate_average_action( t_spline, n_zmin_spline, t, border_type, act=Jz_spline, x=z_spline, vx=vz_spline, dJdt=dJdt) Jz_max = _calculate_average_action( t_spline, n_zmax_spline, t, border_type, act=Jz_spline, x=z_spline, vx=vz_spline, dJdt=dJdt) Lz = _calculate_average_action( t_spline, n_apo_spline, t, border_type, act=Lz_spline, dJdt=dJdt) # Calculate averaged frequencies and angles kappa, theta_R = _calculate_frequency_and_angle( t_spline, n_apo_spline, t, border_type, phi0=0.) omega, theta_phi = _calculate_frequency_and_angle( t_spline, n_apo_spline, t, border_type, phi=phi_spline, positive=positive_omega) if (len(n_zmin_spline) == 0) or (len(n_zmax_spline) == 0): phi0_zmin = np.pi else: phi0_zmin = np.pi if n_zmin_spline[0] > n_zmax_spline[0] else -np.pi omegaz_min, theta_z_min = _calculate_frequency_and_angle( t_spline, n_zmin_spline, t, border_type, phi0=phi0_zmin) omegaz_max, theta_z_max = _calculate_frequency_and_angle( t_spline, n_zmax_spline, t, border_type, phi0=0.) # Get mean value for vertical variables theta_z = (theta_z_min + theta_z_max) / 2. Jz = (Jz_min + Jz_max) / 2. omegaz = (omegaz_min + omegaz_max) / 2. # Output if dJdt: dot_JR, dot_Jz, dot_Lz = JR[:, 1], Jz[:, 1], Lz[:, 1] JR, Jz, Lz = JR[:, 0], Jz[:, 0], Lz[:, 0] return np.column_stack((JR, Jz, Lz, dot_JR, dot_Jz, dot_Lz, theta_R, theta_z, theta_phi, kappa, omegaz, omega)) return np.column_stack((JR, Jz, Lz, theta_R, theta_z, theta_phi, kappa, omegaz, omega)) def _calculate_bar_variables(t, act, freq, spline_expansion, freq_ratio_lim, value_ratio_lim): """Calculate secular Jv=JR+Jz+Lz/2, Ωpr=Ω-κ/2, dLz / dΩpr""" t_spline = np.linspace(t[0], t[-1], len(t) * spline_expansion) Jv_0 = act[:, 0] + act[:, 1] + act[:, 2]/2 Omega_pr_0 = freq[:, 2] - freq[:, 0]/2 Jv = CubicSpline(t[~np.isnan(Jv_0)], Jv_0[~np.isnan(Jv_0)], extrapolate=False)(t_spline) Omega_pr = CubicSpline(t[~np.isnan(Omega_pr_0)], Omega_pr_0[~np.isnan(Omega_pr_0)], extrapolate=False)(t_spline) Lz = CubicSpline(t[~np.isnan(act[:, 2])], act[~np.isnan(act[:, 2]), 2], extrapolate=False)(t_spline) n_max_spline, Lz_max_spline, n_min_spline, Lz_min_spline = \ find_peaks_with_limitations( t_spline, Lz, apply_filter=True, freq_ratio_lim=freq_ratio_lim, value_ratio_lim=value_ratio_lim, minmax=True) n_spline = np.sort(np.hstack((n_max_spline, n_min_spline))) t_edges = t_spline[n_spline] if (len(n_max_spline) < 5) or (len(n_min_spline) < 5): Omega_pr_secular = _calculate_average_action( t_spline, n_spline, t, 'secular', act=Omega_pr) Jv_secular = _calculate_average_action( t_spline, n_spline, t, 'secular', act=Jv) else: Omega_pr_secular_max = _calculate_average_action( t_spline, n_max_spline, t, 'secular', act=Omega_pr) Omega_pr_secular_min = _calculate_average_action( t_spline, n_min_spline, t, 'secular', act=Omega_pr) Omega_pr_secular = (Omega_pr_secular_max + Omega_pr_secular_min) / 2. Jv_secular_max = _calculate_average_action( t_spline, n_max_spline, t, 'secular', act=Jv) Jv_secular_min = _calculate_average_action( t_spline, n_min_spline, t, 'secular', act=Jv) Jv_secular = (Jv_secular_max + Jv_secular_min) / 2. dLzdOmegapr = (Lz[n_spline[1:]] - Lz[n_spline[:-1]]) / \ (Omega_pr[n_spline[1:]] - Omega_pr[n_spline[:-1]]) LB_derivative = interp1d(t_edges[1:], dLzdOmegapr, kind='next', bounds_error=False)(t) return np.column_stack((Jv_secular, Omega_pr_secular, LB_derivative)) ########################################################################### BORDER_TYPES = ['apocenters', 'nbody', 'apocenters2', 'secular'] AVERAGE_TYPES = ['extrema', 'mean', 'onlymax']
[docs]def value( t: np.ndarray, x: np.ndarray, average_type: str = 'extrema', border_type: str = 'apocenters', minmax: bool = False, frequency: bool = False, angle: bool = False, spline_expansion: int = 100, apply_filter: bool = False, freq_ratio_lim: float = 1.4, value_ratio_lim: float = 0.1 ): """ Average a time series between its extrema points. This function computes averaged values of a time series by identifying extrema (maxima and minima) and averaging between them. Various averaging methods are available, and additional quantities like frequency and angle can be computed. Parameters ---------- t : (N,) numpy array Array of time values. x : (N,) numpy array Array of values to be averaged. average_type : str, optional Type of averaging to perform. Options: - ``extrema`` : average between successive maxima and minima - ``mean`` : compute mean values separately for maxima and minima intervals, then average them - ``onlymax`` : average only between maxima intervals Default: 'extrema' border_type : str, optional Border processing parameter. Options: * ``apocenters`` : calculation between first and last extrema * ``nbody`` : calculation between t=0 and last extrema * ``secular`` : for secular variation calculations Default: 'apocenters' minmax : bool, optional If True, also return the minima and maxima values as functions of time. Default: False frequency : bool, optional If True, compute the frequency of oscillation between extrema. Default: False angle : bool, optional If True, compute the phase angle of oscillation. Default: False spline_expansion : int, optional Factor by which to increase the resolution for finding extrema using cubic spline interpolation. Higher values give more accurate extrema detection but increase computation time. Default: 100 apply_filter : bool, optional If True, apply frequency and amplitude filters to remove spurious extrema. Default: False freq_ratio_lim : float, optional Lower limit on the ratio of frequencies for extrema filtering. An extremum is considered spurious if its frequency differs from neighbors by more than this factor. Used when apply_filter=True. Default: 1.4 value_ratio_lim : float, optional Lower limit on the value ratio for extrema filtering. An extremum is considered spurious if the amplitude variation between consecutive extrema is below this threshold. Calculated as: ``2*(|x_max| - |x_min|)/(|x_max| + |x_min|) < value_ratio_lim`` Used when apply_filter=True. Default: 0.1 Returns ------- result : (N, n_var) numpy array 2D array of shape (len(t), n_var) where n_var depends on the selected options. The columns are organized as: ===== ========= ========================================= ================= Index Name Description Condition ===== ========= ========================================= ================= 0 x_avg Time-averaged value between extrema always 1 x_min Minimum values interpolated in time ``minmax=True`` 2 x_max Maximum values interpolated in time ``minmax=True`` 3 f Oscillation frequency ``frequency=True`` 4 φ Oscillation phase angle ``angle=True`` ===== ========= ========================================= ================= """ if average_type not in AVERAGE_TYPES: ValueError(f"Unknown calc_type: {average_type}. Expected one of:\ {list(AVERAGE_TYPES)}") n_var = 1 + minmax*2 + frequency + angle t_spline = np.linspace(t[0], t[-1], len(t) * spline_expansion) notnan_x = ~np.isnan(x) if len(t[notnan_x]) > 1: x_spline = CubicSpline(t[notnan_x], x[notnan_x], extrapolate=False)(t_spline) elif len(t[notnan_x]) == 1: x_spline = np.ones_like(t_spline)*x[notnan_x] n_max_spline, xn_max_spline, n_min_spline, xn_min_spline = \ find_peaks_with_limitations( t_spline, x_spline, apply_filter=apply_filter, freq_ratio_lim=freq_ratio_lim, value_ratio_lim=value_ratio_lim, minmax=True) t_edges_max = t_spline[n_max_spline] t_edges_min = t_spline[n_min_spline] if border_type == 'secular': if (len(n_max_spline) < 5) or (len(n_min_spline) < 5): average_type = 'extrema' if average_type in ['extrema', 'mean']: n_spline = np.sort(np.hstack((n_max_spline, n_min_spline))) t_edges = t_spline[n_spline] if average_type == 'onlymax': n_spline = n_max_spline t_edges = t_edges_max if average_type == 'mean': if (len(n_min_spline) < 3) or (len(n_max_spline) < 3): return np.zeros((len(t), n_var))*np.nan x_mean_max_aver = _calculate_average_value( x_spline, n_max_spline, 'apocenters') x_mean_min_aver = _calculate_average_value( x_spline, n_min_spline, 'apocenters') t_edges_max_sp = _find_tedges_for_mpspline( t, t_edges_max, border_type='apocenters') t_edges_min_sp = _find_tedges_for_mpspline( t, t_edges_min, border_type='apocenters') x_mean_max = MPSpline(xi=np.delete(t_edges_max_sp, -2), yi=x_mean_max_aver, x_edges=t_edges_max_sp, border_type=border_type)(t) x_mean_min = MPSpline(xi=np.delete(t_edges_min_sp, -2), yi=x_mean_min_aver, x_edges=t_edges_min_sp, border_type=border_type)(t) x_mean = (x_mean_min + x_mean_max) / 2. else: if (len(n_spline) < 3): return np.zeros((len(t), n_var))*np.nan x_aver = _calculate_average_value( x_spline, n_spline, border_type) t_edges = _find_tedges_for_mpspline( t, t_edges, border_type='apocenters') x_mean = MPSpline(xi=np.delete(t_edges, -2), yi=x_aver, x_edges=t_edges, border_type=border_type)(t) nanmask = np.ones_like(x_mean) nanmask[(t < t_edges[0]) | (t > t_edges[-1])] = np.nan x_all = x_mean*nanmask # Calculate maxima and minima as a function if minmax: x_max = np.interp(t, t_edges_max, xn_max_spline) x_min = np.interp(t, t_edges_min, xn_min_spline) x_all = np.column_stack((x_mean*nanmask, x_min*nanmask, x_max*nanmask)) if not frequency: return x_all # Calculate frequencies and angles freq_x_min = _calculate_frequency_and_angle( t_spline, n_min_spline, t, 'apocenters', angle=angle) freq_x_max = _calculate_frequency_and_angle( t_spline, n_max_spline, t, 'apocenters', angle=angle) if angle: freq_x = (freq_x_max[0] + freq_x_min[0]) / 2. angle_x = (freq_x_max[1] + freq_x_min[1]) / 2. return np.column_stack((x_all, freq_x*nanmask, angle_x*nanmask)) freq_x = (freq_x_min + freq_x_max) / 2. return np.column_stack((x_all, freq_x*nanmask))
[docs]def action( t: np.ndarray, xv: np.ndarray, act: Optional[np.ndarray] = None, dJdt: bool = False, secular: bool = False, secular_extrema: bool = False, secular_act_freq: bool = False, secular_bar_var: bool = False, border_type: str = 'apocenters', JR_ilr: bool = True, positive_omega: bool = True, apply_apo_filter: bool = True, freq_ratio_lim: float = 1.4, value_ratio_lim: float = 0.1, spline_expansion: int = 100 ): """ Calculate averaged action-angle variables from orbital trajectories. This function processes time series of positions and velocities to compute averaged actions, angles, and frequencies. It can also extract secular (long-term) variations and various derived quantities. Parameters ---------- t : (N,) numpy array Array of time values. xv : (N, 6) numpy array Array of phase-space coordinates [x, y, z, vx, vy, vz] at each time step. act : (N, 3) numpy array, optional Array of instantaneous actions [JR, Jz, Lz] from Agama. If not provided, actions will be computed internally. Default: None dJdt : bool, optional If True, calculate time derivatives of actions (dJR/dt, dJz/dt, dLz/dt). Default: False secular : bool, optional If True, calculate secular (long-term) actions and frequencies. Default: False secular_extrema : bool, optional If True, calculate secular maxima and minima of averaged actions and frequencies. Default: False secular_act_freq : bool, optional If True, calculate oscillation frequencies of secular actions. Default: False secular_bar_var : bool, optional If True, calculate bar-specific variables: * Jv = JR + Jz + Lz/2 (adiabatic invariant), * Ωpr = Ω - κ/2 (secular precession rate), * dLz/dΩpr (Lynden-Bell derivative). Default: False border_type : str, optional Border processing parameter. Options: * ``apocenters`` : calculation between first and last apocenters * ``nbody`` : calculation between t=0 and last apocenter * ``secular`` : for secular variation calculations Default: 'apocenters' JR_ilr : bool, optional If True, adjust JR for ILR by adding -Lz when Lz < 0. Default: True positive_omega : bool, optional If True, always calculate average angular velocity as a positive value (the angle between apocenters is measured in the positive direction). Default: True apply_apo_filter : bool, optional If True, apply frequency and amplitude filters to remove spurious apocenters. Default: True freq_ratio_lim : float, optional Lower limit on the ratio of frequencies for apocenter filtering. Used when apply_apo_filter=True. Default: 1.4 value_ratio_lim : float, optional Lower limit on the value ratio for apocenter filtering. Used when apply_apo_filter=True. Default: 0.1 spline_expansion : int, optional Factor by which to increase the resolution for finding extrema using cubic spline interpolation. Default: 100 Returns ------- result : (N, n_var) numpy array 2D array of shape (len(t), n_values) where n_var depends on the selected options (ranges from 9 to 36). The columns are organized as: ====== ========================= ===================================== ============================= Index Name Description Condition ====== ========================= ===================================== ============================= 0 JR radial action always 1 Jz vertical action always 2 Lz angular momentum always 3 dJR/dt time derivative of radial action ``dJdt=True`` 4 dJz/dt time derivative of vertical action ``dJdt=True`` 5 dLz/dt time derivative of angular momentum ``dJdt=True`` 6 θR radial angle always 7 θz vertical angle always 8 θφ azimuthal angle always 9 κ radial frequency always 10 ωz vertical frequency always 11 Ω azimuthal frequency always 12 JR_sec secular radial action ``secular=True`` 13 Jz_sec secular vertical action ``secular=True`` 14 Lz_sec secular angular momentum ``secular=True`` 15 κ_sec secular radial frequency ``secular=True`` 16 ωz_sec secular vertical frequency ``secular=True`` 17 Ω_sec secular azimuthal frequency ``secular=True`` 18-20 JR_max, Jz_max, Lz_max secular maxima ``secular_extrema=True`` 21-23 JR_min, Jz_min, Lz_min secular minima ``secular_extrema=True`` 24-26 κ_max, ωz_max, Ω_max secular frequency maxima ``secular_extrema=True`` 27-29 κ_min, ωz_min, Ω_min secular frequency minima ``secular_extrema=True`` 30 ΩJR oscillation frequency of JR ``secular_act_freq=True`` 31 ΩJz oscillation frequency of Jz ``secular_act_freq=True`` 32 ΩLz oscillation frequency of Lz ``secular_act_freq=True`` 33 Jv adiabatic invariant (JR+Jz+Lz/2) ``secular_bar_var=True`` 34 Ωpr secular precession rate (Ω-κ/2) ``secular_bar_var=True`` 35 dLz/dΩpr Lynden-Bell derivative ``secular_bar_var=True`` ====== ========================= ===================================== ============================= Notes ----- The function uses mean-preserving spline interpolation to compute averaged quantities between orbital turning points (apocenters or z-maxima). The number of output columns depends on which options are enabled. """ # Find mean-averaged variables if np.all(np.isfinite(xv)): result = _calculate_averaged_aa_variables( t, xv, act=act, border_type=border_type, dJdt=dJdt, JR_ilr=JR_ilr, positive_omega=positive_omega, apply_apo_filter=apply_apo_filter, freq_ratio_lim=freq_ratio_lim, value_ratio_lim=value_ratio_lim, spline_expansion=spline_expansion) else: result = np.empty((len(t), 9 + dJdt*3)) if not secular: return result averaged_actions = result[:, 0:3] averaged_frequencies = result[:, 9:12] if dJdt else result[:, 6:9] # Find secular variables if secular: len_secular = 6 len_secular = len_secular+12 if secular_extrema else len_secular len_secular = len_secular+3 if secular_act_freq else len_secular secular_result = np.zeros((len(t), len_secular))*np.nan for i in range(3): if np.any(np.isfinite(averaged_actions[:, i])): secular_result[:, i::6] = value( t, averaged_actions[:, i], average_type='mean', minmax=secular_extrema, frequency=secular_act_freq, angle=False, spline_expansion=spline_expansion, border_type='secular', apply_filter=True ).reshape(-1, len_secular//6+secular_act_freq) if np.any(np.isfinite(averaged_frequencies[:, i])): secular_result[:, i+3::6] = value( t, averaged_frequencies[:, i], average_type='mean', minmax=secular_extrema, frequency=False, angle=False, spline_expansion=spline_expansion, border_type='secular', apply_filter=True ).reshape(-1, len_secular//6) if not secular_bar_var: return np.column_stack((result, secular_result)) # Calculate secular bar variables Jv, Ω_pr = Ω-κ/2, dLz/dΩ_pr secular_bar_result = _calculate_bar_variables( t, averaged_actions, averaged_frequencies, spline_expansion, freq_ratio_lim, value_ratio_lim) return np.column_stack((result, secular_result, secular_bar_result))