Source code for att.surrogates.core
"""Surrogate generation for significance testing."""
import numpy as np
from scipy.spatial.distance import cdist
from att.config.seed import get_rng
[docs]
def phase_randomize(
X: np.ndarray,
n_surrogates: int = 100,
seed: int | None = None,
) -> np.ndarray:
"""Generate amplitude-adjusted phase-randomized (AAFT) surrogates.
Preserves the power spectrum and marginal distribution of X while
destroying nonlinear coupling and phase relationships.
Parameters
----------
X : 1D time series of length n_samples
n_surrogates : number of surrogates to generate
seed : random seed for reproducibility
Returns
-------
(n_surrogates, n_samples) array of surrogate time series
"""
X = np.asarray(X).ravel()
n = len(X)
rng = get_rng(seed)
# Precompute FFT of original
fft_x = np.fft.rfft(X)
amplitudes = np.abs(fft_x)
# Sorted original values for amplitude adjustment
sorted_x = np.sort(X)
surrogates = np.empty((n_surrogates, n))
for i in range(n_surrogates):
# Random phases for non-DC, non-Nyquist frequencies
n_freq = len(fft_x)
random_phases = rng.uniform(0, 2 * np.pi, n_freq)
# DC component: keep phase 0
random_phases[0] = 0.0
# Nyquist: phase must be 0 or pi for real-valued output
if n % 2 == 0:
random_phases[-1] = rng.choice([0.0, np.pi])
# Apply random phases to original amplitudes
randomized_fft = amplitudes * np.exp(1j * random_phases)
surrogate = np.fft.irfft(randomized_fft, n=n)
# Amplitude adjustment: rank-reorder to match original distribution
rank_order = np.argsort(np.argsort(surrogate))
surrogates[i] = sorted_x[rank_order]
return surrogates
[docs]
def time_shuffle(
X: np.ndarray,
n_surrogates: int = 100,
block_size: int | None = None,
seed: int | None = None,
) -> np.ndarray:
"""Generate time-shuffled surrogates.
Destroys temporal structure while preserving univariate statistics.
With block_size > 1, preserves short-range autocorrelation within blocks.
Parameters
----------
X : 1D time series of length n_samples
n_surrogates : number of surrogates to generate
block_size : if None, iid permutation; if int, shuffle blocks of this size
seed : random seed for reproducibility
Returns
-------
(n_surrogates, n_samples) array of surrogate time series
"""
X = np.asarray(X).ravel()
n = len(X)
rng = get_rng(seed)
surrogates = np.empty((n_surrogates, n))
if block_size is None or block_size <= 1:
for i in range(n_surrogates):
surrogates[i] = rng.permutation(X)
else:
n_blocks = int(np.ceil(n / block_size))
for i in range(n_surrogates):
# Split into blocks
blocks = [X[j * block_size: (j + 1) * block_size] for j in range(n_blocks)]
# Shuffle block order
perm = rng.permutation(n_blocks)
shuffled = np.concatenate([blocks[p] for p in perm])
surrogates[i] = shuffled[:n]
return surrogates
[docs]
def twin_surrogate(
X: np.ndarray,
n_surrogates: int = 100,
embedding_dim: int = 3,
embedding_delay: int = 1,
recurrence_threshold: float | None = None,
seed: int | None = None,
) -> np.ndarray:
"""Generate twin surrogates from recurrence structure (Thiel et al. 2006).
Preserves the recurrence properties of the attractor while destroying
the specific temporal ordering. Tests specifically for deterministic
coupling structure in the dynamics.
Parameters
----------
X : 1D time series of length n_samples
n_surrogates : number of surrogates to generate
embedding_dim : dimension for Takens delay embedding
embedding_delay : delay for Takens embedding
recurrence_threshold : distance threshold for recurrence matrix;
if None, uses 10th percentile of pairwise distances
seed : random seed for reproducibility
Returns
-------
(n_surrogates, n_output) array where n_output = n_samples - (embedding_dim - 1) * embedding_delay
"""
X = np.asarray(X).ravel()
n = len(X)
rng = get_rng(seed)
# 1. Delay-embed the time series
pad = (embedding_dim - 1) * embedding_delay
n_embedded = n - pad
embedded = np.empty((n_embedded, embedding_dim))
for d in range(embedding_dim):
embedded[:, d] = X[d * embedding_delay: d * embedding_delay + n_embedded]
# 2. Pairwise distance matrix
dist_matrix = cdist(embedded, embedded, metric="euclidean")
# 3. Build recurrence matrix
if recurrence_threshold is None:
# Use 10th percentile of all pairwise distances (excluding diagonal zeros)
upper_tri = dist_matrix[np.triu_indices(n_embedded, k=1)]
recurrence_threshold = np.percentile(upper_tri, 10)
recurrence = dist_matrix < recurrence_threshold
# 4. Precompute twin lists: twins[i] = indices j where recurrence[i,j] is True
# (simplified definition: twins are recurrent neighbors)
twins = [np.where(recurrence[i])[0] for i in range(n_embedded)]
# 5. Generate surrogates
surrogates = np.empty((n_surrogates, n_embedded))
for s in range(n_surrogates):
trajectory = np.empty(n_embedded, dtype=int)
# Pick random starting index
trajectory[0] = rng.integers(0, n_embedded)
for t in range(1, n_embedded):
current = trajectory[t - 1]
tw = twins[current]
if len(tw) > 0:
# Jump to a random twin's next time step
chosen_twin = tw[rng.integers(0, len(tw))]
# Advance to next step, wrapping around if at the end
trajectory[t] = (chosen_twin + 1) % n_embedded
else:
# No twins — advance sequentially
trajectory[t] = (current + 1) % n_embedded
# Extract the first coordinate of the embedded point at each visited index
surrogates[s] = embedded[trajectory, 0]
return surrogates