Source code for att.transitions.detector
"""Sliding-window persistent homology for topology transition detection."""
import numpy as np
from att.topology.persistence import PersistenceAnalyzer
[docs]
class TransitionDetector:
"""Detect topological transitions via sliding-window persistent homology.
Parameters
----------
window_size : int
Number of points per window.
step_size : int
Step between consecutive windows.
max_dim : int
Maximum homology dimension.
backend : str
PersistenceAnalyzer backend ("ripser" or "gudhi").
subsample : int or None
Points to subsample per window (None = use all).
"""
[docs]
def __init__(
self,
window_size: int = 500,
step_size: int = 50,
max_dim: int = 1,
backend: str = "ripser",
subsample: int | None = None,
):
self.window_size = window_size
self.step_size = step_size
self.max_dim = max_dim
self.backend = backend
self.subsample = subsample
self._result = None
[docs]
def fit_transform(
self,
X: np.ndarray,
seed: int | None = None,
embedding_dim: int | None = None,
embedding_delay: int | None = None,
) -> dict:
"""Run sliding-window PH on input data.
Parameters
----------
X : array
If 2D (n_points, dim): pre-embedded point cloud. Windows the cloud directly.
If 1D (n_samples,): time series. Embeds each window separately
(requires embedding_dim and embedding_delay).
seed : random seed for subsampling
embedding_dim : embedding dimension (required for 1D input)
embedding_delay : embedding delay (required for 1D input)
Returns
-------
dict with keys:
topology_timeseries: list of fit_transform results per window
distances: list of bottleneck distances between consecutive windows
image_distances: list of L2 distances between consecutive persistence images
window_centers: array of center sample indices
transition_scores: array (same as image_distances, the default score)
"""
X = np.asarray(X)
is_1d = X.ndim == 1
if is_1d:
if embedding_dim is None or embedding_delay is None:
raise ValueError("1D input requires embedding_dim and embedding_delay")
from att.embedding.takens import TakensEmbedder
# Generate windows
n_samples = len(X)
window_starts = list(
range(0, n_samples - self.window_size + 1, self.step_size)
)
if not window_starts:
raise ValueError(
f"Input ({n_samples} points) too short for window_size={self.window_size}"
)
# Phase 1: Compute PH per window
analyzers = []
topology_timeseries = []
window_centers = []
for start in window_starts:
end = start + self.window_size
if is_1d:
embedder = TakensEmbedder(delay=embedding_delay, dimension=embedding_dim)
embedder.fit(X[start:end])
cloud = embedder.transform(X[start:end])
else:
cloud = X[start:end]
pa = PersistenceAnalyzer(max_dim=self.max_dim, backend=self.backend)
result = pa.fit_transform(cloud, subsample=self.subsample, seed=seed)
analyzers.append(pa)
topology_timeseries.append(result)
window_centers.append(start + self.window_size // 2)
window_centers = np.array(window_centers)
# Phase 2: Compute shared birth/persistence ranges across ALL windows
all_births = []
all_persistences = []
for res in topology_timeseries:
for dgm in res["diagrams"]:
if len(dgm) > 0:
all_births.extend(dgm[:, 0].tolist())
pers = dgm[:, 1] - dgm[:, 0]
all_persistences.extend(pers[pers > 1e-10].tolist())
if all_births and all_persistences:
birth_range = (min(all_births), max(all_births))
persistence_range = (0.0, max(all_persistences))
else:
birth_range = (0.0, 1.0)
persistence_range = (0.0, 1.0)
# Phase 3: Re-compute images on shared grid
shared_images = []
for pa in analyzers:
imgs = pa.to_image(birth_range=birth_range, persistence_range=persistence_range)
shared_images.append(imgs)
# Phase 4: Compute distances between consecutive windows
distances = []
image_distances = []
for i in range(len(analyzers) - 1):
# Bottleneck distance
d = analyzers[i].distance(analyzers[i + 1], metric="bottleneck")
distances.append(d)
# L2 image distance (sum across dimensions)
img_dist = 0.0
for dim in range(self.max_dim + 1):
diff = shared_images[i][dim] - shared_images[i + 1][dim]
img_dist += float(np.sqrt(np.sum(diff**2)))
image_distances.append(img_dist)
distances = np.array(distances)
image_distances = np.array(image_distances)
self._result = {
"topology_timeseries": topology_timeseries,
"distances": distances,
"image_distances": image_distances,
"window_centers": window_centers,
"transition_scores": image_distances,
"_analyzers": analyzers,
"_shared_images": shared_images,
}
return self._result
[docs]
def detect_changepoints(
self,
method: str = "cusum",
threshold: float | None = None,
) -> list[int]:
"""Detect changepoints in the transition score series.
Parameters
----------
method : "cusum" or "threshold"
threshold : detection threshold. Default: mean + 2*std for cusum,
mean + 2*std for threshold.
Returns
-------
List of indices into window_centers[:-1] where transitions detected.
"""
if self._result is None:
raise RuntimeError("Call fit_transform first.")
scores = self._result["transition_scores"]
if len(scores) == 0:
return []
if method == "cusum":
return self._cusum_changepoints(scores, threshold)
elif method == "threshold":
if threshold is None:
threshold = float(np.mean(scores) + 2 * np.std(scores))
return [int(i) for i in np.where(scores > threshold)[0]]
else:
raise ValueError(f"Unknown method: {method}")
@staticmethod
def _cusum_changepoints(
scores: np.ndarray,
threshold: float | None = None,
) -> list[int]:
"""Forward CUSUM changepoint detection.
Accumulates positive deviations from the mean. A changepoint is
detected where the cumulative sum exceeds the threshold, then
the accumulator resets.
"""
mean = float(np.mean(scores))
std = float(np.std(scores))
if threshold is None:
threshold = mean + 2 * std
cusum = 0.0
changepoints = []
for i, s in enumerate(scores):
cusum = max(0.0, cusum + (s - mean))
if cusum > threshold:
changepoints.append(i)
cusum = 0.0 # Reset after detection
return changepoints
[docs]
def plot_timeline(self, ground_truth: list[int] | None = None):
"""Plot transition timeline. Delegates to viz module."""
from att.viz.plotting import plot_transition_timeline
return plot_transition_timeline(self, ground_truth=ground_truth)