"""Standalone coupling measurement methods for benchmarking."""
import numpy as np
from scipy.signal import hilbert
from scipy.spatial.distance import cdist
def transfer_entropy(
    X: np.ndarray,
    Y: np.ndarray,
    k: int = 1,
    bins: int = 8,
    **kwargs,
) -> float:
    """Compute transfer entropy TE(X -> Y) using histogram estimation.

    Measures information transfer from X to Y beyond Y's own history.

    Parameters
    ----------
    X, Y : 1D time series (same length)
    k : history length (lag)
    bins : number of bins for discretization

    Returns
    -------
    float : TE(X -> Y) in nats
    """
    X = np.asarray(X).ravel()
    Y = np.asarray(Y).ravel()
    # Truncate both series to a common length.
    n = min(len(X), len(Y))
    X, Y = X[:n], Y[:n]
    # Discretize using percentile bins so each marginal bin is
    # roughly equally populated.
    X_d = _discretize(X, bins)
    Y_d = _discretize(Y, bins)
    # Build conditional variables:
    # TE(X->Y) = H(Y_future | Y_past) - H(Y_future | Y_past, X_past)
    Y_future = Y_d[k:]
    Y_past = Y_d[:n - k]
    X_past = X_d[:n - k]
    # Compute as the conditional mutual information MI(X_past; Y_future | Y_past)
    # = H(Y_future, Y_past) + H(X_past, Y_past) - H(Y_past)
    #   - H(Y_future, Y_past, X_past)
    h_yf_yp = _joint_entropy(Y_future, Y_past)
    h_xp_yp = _joint_entropy(X_past, Y_past)
    h_yp = _entropy(Y_past)
    h_yf_yp_xp = _triple_entropy(Y_future, Y_past, X_past)
    te = h_yf_yp + h_xp_yp - h_yp - h_yf_yp_xp
    # Finite-sample estimation noise can produce slightly negative
    # values; TE is non-negative by definition, so clamp at zero.
    return max(0.0, float(te))
def pac(
    X: np.ndarray,
    Y: np.ndarray,
    n_bins: int = 18,
    **kwargs,
) -> float:
    """Compute phase-amplitude coupling modulation index (Tort et al. 2010).

    Measures how much the amplitude of Y is modulated by the phase of X.
    Uses raw Hilbert transform (no bandpass) — appropriate for broadband
    chaotic signals where PAC is expected to return near-zero.

    Parameters
    ----------
    X, Y : 1D time series (same length)
    n_bins : number of phase bins

    Returns
    -------
    float : modulation index in [0, 1]
    """
    X = np.asarray(X).ravel()
    Y = np.asarray(Y).ravel()
    # Truncate both series to a common length.
    n = min(len(X), len(Y))
    X, Y = X[:n], Y[:n]
    # Extract phase of X and amplitude of Y via Hilbert transform.
    phase_x = np.angle(hilbert(X))
    amp_y = np.abs(hilbert(Y))
    # Bin phases into equal-width bins in [-pi, pi).
    bin_edges = np.linspace(-np.pi, np.pi, n_bins + 1)
    bin_indices = np.digitize(phase_x, bin_edges) - 1
    # Clip so a phase of exactly +pi falls into the last bin.
    bin_indices = np.clip(bin_indices, 0, n_bins - 1)
    # Mean amplitude in each phase bin.
    mean_amp = np.zeros(n_bins)
    for b in range(n_bins):
        mask = bin_indices == b
        if mask.any():
            mean_amp[b] = amp_y[mask].mean()
    # Normalize to a probability distribution over phase bins.
    total = mean_amp.sum()
    if total < 1e-15:
        return 0.0
    p = mean_amp / total
    # Modulation index: KL divergence from the uniform distribution,
    # normalized by log(n_bins) so the result lies in [0, 1].
    uniform = np.ones(n_bins) / n_bins
    # Avoid log(0) for empty phase bins.
    p_safe = np.where(p > 1e-15, p, 1e-15)
    kl = np.sum(p_safe * np.log(p_safe / uniform))
    mi = kl / np.log(n_bins)
    return float(max(0.0, mi))
def crqa(
    X: np.ndarray,
    Y: np.ndarray,
    embedding_dim: int = 3,
    delay: int = 1,
    radius: float | None = None,
    min_line: int = 2,
    **kwargs,
) -> float:
    """Compute cross-recurrence quantification analysis determinism.

    Parameters
    ----------
    X, Y : 1D time series
    embedding_dim : embedding dimension for delay vectors
    delay : time delay for embedding
    radius : recurrence threshold; if None, auto-select 10th percentile of distances
    min_line : minimum diagonal line length for determinism

    Returns
    -------
    float : determinism (fraction of recurrence points on diagonal lines >= min_line)
    """
    X = np.asarray(X).ravel()
    Y = np.asarray(Y).ravel()
    # Delay embedding of each series.
    X_emb = _delay_embed(X, embedding_dim, delay)
    Y_emb = _delay_embed(Y, embedding_dim, delay)
    # Truncate to the same number of embedded points.
    n = min(len(X_emb), len(Y_emb))
    X_emb = X_emb[:n]
    Y_emb = Y_emb[:n]
    # Subsample for speed if too large (cdist below is O(n^2)).
    max_points = 1000
    if n > max_points:
        step = n // max_points
        X_emb = X_emb[::step]
        Y_emb = Y_emb[::step]
        n = min(len(X_emb), len(Y_emb))
    # Cross-distance matrix between the two embedded trajectories.
    dists = cdist(X_emb, Y_emb, metric="euclidean")
    # Auto-select radius if not provided; fall back to a wider
    # percentile if the 10th percentile is degenerate (~0).
    if radius is None:
        radius = np.percentile(dists, 10)
        if radius < 1e-10:
            radius = np.percentile(dists, 20)
    # Cross-recurrence matrix: 1 where trajectories are within radius.
    crm = (dists <= radius).astype(int)
    total_recurrence = crm.sum()
    if total_recurrence == 0:
        return 0.0
    # Determinism: fraction of recurrence points lying on diagonal
    # lines of length >= min_line.
    diag_points = 0
    rows, cols = crm.shape
    for offset in range(-rows + 1, cols):
        diag = np.diag(crm, offset)
        diag_points += _count_line_points(diag, min_line)
    det = diag_points / total_recurrence if total_recurrence > 0 else 0.0
    return float(np.clip(det, 0.0, 1.0))
# ---- Private helpers ----
def _discretize(x: np.ndarray, bins: int) -> np.ndarray:
"""Discretize continuous values into bins via percentile edges."""
edges = np.percentile(x, np.linspace(0, 100, bins + 1))
edges[-1] += 1e-10 # include max value
return np.digitize(x, edges[1:])
def _entropy(x: np.ndarray) -> float:
"""Shannon entropy of discrete variable."""
_, counts = np.unique(x, return_counts=True)
p = counts / counts.sum()
return -np.sum(p * np.log(p + 1e-15))
def _joint_entropy(x: np.ndarray, y: np.ndarray) -> float:
"""Joint entropy of two discrete variables."""
xy = np.stack([x, y], axis=1)
_, counts = np.unique(xy, axis=0, return_counts=True)
p = counts / counts.sum()
return -np.sum(p * np.log(p + 1e-15))
def _triple_entropy(x: np.ndarray, y: np.ndarray, z: np.ndarray) -> float:
"""Joint entropy of three discrete variables."""
xyz = np.stack([x, y, z], axis=1)
_, counts = np.unique(xyz, axis=0, return_counts=True)
p = counts / counts.sum()
return -np.sum(p * np.log(p + 1e-15))
def _delay_embed(x: np.ndarray, dim: int, delay: int) -> np.ndarray:
"""Create delay embedding vectors."""
n = len(x) - (dim - 1) * delay
if n <= 0:
return np.empty((0, dim))
return np.array([x[i * delay: i * delay + n] for i in range(dim)]).T
def _count_line_points(diag: np.ndarray, min_line: int) -> int:
"""Count points on lines of length >= min_line in a binary diagonal."""
count = 0
current_len = 0
for val in diag:
if val:
current_len += 1
else:
if current_len >= min_line:
count += current_len
current_len = 0
if current_len >= min_line:
count += current_len
return count