Source code for att.embedding.joint

"""Joint delay embedding for multi-system analysis."""

import numpy as np

from att.embedding.delay import estimate_delay
from att.embedding.dimension import estimate_dimension


[docs] class JointEmbedder: """Construct joint delay embeddings with per-channel delay estimation. Using "auto" is strongly recommended for systems with different timescales. Parameters ---------- delays : list[int] or "auto" Per-channel delays. "auto" estimates independently per channel via AMI. dimensions : list[int] or "auto" Per-channel embedding dimensions. "auto" estimates per channel via FNN. """
[docs] def __init__( self, delays: list[int] | str = "auto", dimensions: list[int] | str = "auto", ): self.delays = delays self.dimensions = dimensions self.delays_: list[int] | None = None self.dimensions_: list[int] | None = None
[docs] def fit(self, channels: list[np.ndarray]) -> "JointEmbedder": """Estimate per-channel parameters. Stores .delays_ and .dimensions_.""" n_channels = len(channels) if self.delays == "auto": self.delays_ = [estimate_delay(np.asarray(ch).ravel()) for ch in channels] else: self.delays_ = [int(d) for d in self.delays] if len(self.delays_) != n_channels: raise ValueError(f"Expected {n_channels} delays, got {len(self.delays_)}") if self.dimensions == "auto": self.dimensions_ = [ estimate_dimension(np.asarray(ch).ravel(), self.delays_[i]) for i, ch in enumerate(channels) ] else: self.dimensions_ = [int(d) for d in self.dimensions] if len(self.dimensions_) != n_channels: raise ValueError(f"Expected {n_channels} dimensions, got {len(self.dimensions_)}") return self
[docs] def transform(self, channels: list[np.ndarray]) -> np.ndarray: """Construct joint delay vectors by concatenating per-channel embeddings. Input: list of 1D arrays, each (n_samples,) Output: (n_valid_samples, sum(dimensions)) """ if self.delays_ is None or self.dimensions_ is None: raise RuntimeError("Call .fit() before .transform()") embeddings = [] min_length = float("inf") for i, ch in enumerate(channels): ch = np.asarray(ch).ravel() d = self.dimensions_[i] tau = self.delays_[i] n = len(ch) - (d - 1) * tau if n <= 0: raise ValueError( f"Channel {i} too short ({len(ch)}) for delay={tau}, dim={d}." ) cloud = np.zeros((n, d)) for j in range(d): cloud[:, j] = ch[j * tau: j * tau + n] embeddings.append(cloud) min_length = min(min_length, n) # Truncate all to same length (determined by most restrictive channel) truncated = [emb[:min_length] for emb in embeddings] return np.hstack(truncated)
[docs] def transform_marginals(self, channels: list[np.ndarray]) -> list[np.ndarray]: """Return individually embedded point clouds for marginal comparison.""" if self.delays_ is None or self.dimensions_ is None: raise RuntimeError("Call .fit() before .transform_marginals()") marginals = [] for i, ch in enumerate(channels): ch = np.asarray(ch).ravel() d = self.dimensions_[i] tau = self.delays_[i] n = len(ch) - (d - 1) * tau cloud = np.zeros((n, d)) for j in range(d): cloud[:, j] = ch[j * tau: j * tau + n] marginals.append(cloud) return marginals
[docs] def fit_transform(self, channels: list[np.ndarray]) -> np.ndarray: """Fit and transform in one call.""" return self.fit(channels).transform(channels)