"""Persistent homology computation via Ripser/GUDHI."""
import numpy as np
from att.config.seed import get_rng
class PersistenceAnalyzer:
"""Compute persistent homology on point clouds.
Parameters
----------
max_dim : int
Max homology dimension (0=components, 1=loops, 2=voids).
backend : str
"ripser" (fast for H0+H1) or "gudhi" (H2+, alpha complexes).
use_witness : bool
Use witness complex for large point clouds (n > 2000).
n_landmarks : int
Number of landmarks for witness complex.
"""
[docs]
def __init__(
self,
max_dim: int = 2,
backend: str = "ripser",
use_witness: bool = False,
n_landmarks: int = 500,
):
self.max_dim = max_dim
self.backend = backend
self.use_witness = use_witness
self.n_landmarks = n_landmarks
self.diagrams_: list[np.ndarray] | None = None
self._cloud: np.ndarray | None = None
def _ripser_compute(self, cloud: np.ndarray) -> list[np.ndarray]:
"""Compute persistence via Ripser."""
import ripser
result = ripser.ripser(cloud, maxdim=self.max_dim)
diagrams = []
for dim in range(self.max_dim + 1):
dgm = result["dgms"][dim]
# Remove infinite death features for H0
if dim == 0:
finite_mask = np.isfinite(dgm[:, 1])
dgm = dgm[finite_mask]
diagrams.append(dgm)
return diagrams
@staticmethod
def _maxmin_landmarks(cloud: np.ndarray, n_landmarks: int) -> np.ndarray:
"""Greedy farthest-point (maxmin) landmark sampling.
Parameters
----------
cloud : (n_points, dimension) point cloud
n_landmarks : number of landmarks to select
Returns
-------
(n_landmarks,) array of indices into cloud
"""
n = len(cloud)
n_landmarks = min(n_landmarks, n)
indices = np.zeros(n_landmarks, dtype=int)
# Start from point 0
indices[0] = 0
# Distance from each point to nearest landmark so far
dist_to_nearest = np.full(n, np.inf)
for i in range(1, n_landmarks):
# Update distances with the most recently added landmark
last = indices[i - 1]
d = np.linalg.norm(cloud - cloud[last], axis=1)
dist_to_nearest = np.minimum(dist_to_nearest, d)
# Pick the point farthest from all current landmarks
indices[i] = np.argmax(dist_to_nearest)
return indices
def _gudhi_witness_compute(self, cloud: np.ndarray) -> list[np.ndarray]:
"""Compute persistence via GUDHI Euclidean strong witness complex.
Parameters
----------
cloud : (n_points, dimension) point cloud (witnesses)
Returns
-------
list of persistence diagrams, one per homology dimension
"""
import gudhi
landmark_indices = self._maxmin_landmarks(cloud, self.n_landmarks)
landmarks = cloud[landmark_indices]
witness_complex = gudhi.EuclideanStrongWitnessComplex(
witnesses=cloud, landmarks=landmarks
)
stree = witness_complex.create_simplex_tree(
max_alpha_square=float("inf"),
limit_dimension=self.max_dim + 1,
)
stree.compute_persistence()
diagrams = []
for dim in range(self.max_dim + 1):
pairs = stree.persistence_intervals_in_dimension(dim)
if len(pairs) > 0:
dgm = np.array(pairs)
finite_mask = np.isfinite(dgm[:, 1])
dgm = dgm[finite_mask]
else:
dgm = np.empty((0, 2))
diagrams.append(dgm)
return diagrams
def _gudhi_compute(self, cloud: np.ndarray) -> list[np.ndarray]:
"""Compute persistence via GUDHI alpha or witness complex."""
if self.use_witness:
return self._gudhi_witness_compute(cloud)
import gudhi
alpha = gudhi.AlphaComplex(points=cloud)
st = alpha.create_simplex_tree()
st.compute_persistence()
diagrams = []
for dim in range(self.max_dim + 1):
pairs = st.persistence_intervals_in_dimension(dim)
if len(pairs) > 0:
dgm = np.array(pairs)
finite_mask = np.isfinite(dgm[:, 1])
dgm = dgm[finite_mask]
else:
dgm = np.empty((0, 2))
diagrams.append(dgm)
return diagrams
def _compute_betti_curves(
self, diagrams: list[np.ndarray], n_grid: int = 100
) -> list[np.ndarray]:
"""Compute Betti curves (Betti number as function of filtration parameter)."""
all_births = []
all_deaths = []
for dgm in diagrams:
if len(dgm) > 0:
all_births.extend(dgm[:, 0])
all_deaths.extend(dgm[:, 1])
if not all_births:
return [np.zeros(n_grid) for _ in diagrams]
eps_min = min(all_births)
eps_max = max(all_deaths) if all_deaths else eps_min + 1
grid = np.linspace(eps_min, eps_max, n_grid)
curves = []
for dgm in diagrams:
betti = np.zeros(n_grid)
for birth, death in dgm:
betti += (grid >= birth) & (grid < death)
curves.append(betti)
return curves
def _compute_entropy(self, diagrams: list[np.ndarray]) -> list[float]:
"""Compute persistence entropy per dimension."""
entropies = []
for dgm in diagrams:
if len(dgm) == 0:
entropies.append(0.0)
continue
lifetimes = dgm[:, 1] - dgm[:, 0]
lifetimes = lifetimes[lifetimes > 0]
if len(lifetimes) == 0:
entropies.append(0.0)
continue
total = lifetimes.sum()
probs = lifetimes / total
entropy = -np.sum(probs * np.log(probs + 1e-15))
entropies.append(float(entropy))
return entropies
[docs]
def distance(self, other: "PersistenceAnalyzer", metric: str = "bottleneck") -> float:
"""Compute distance between persistence diagrams.
Parameters
----------
other : another PersistenceAnalyzer with computed diagrams
metric : "bottleneck", "wasserstein_1", "wasserstein_2"
Returns
-------
float : maximum distance across all dimensions
"""
if self.diagrams_ is None or other.diagrams_ is None:
raise RuntimeError("Both analyzers must have computed diagrams.")
import persim
max_dist = 0.0
n_dims = min(len(self.diagrams_), len(other.diagrams_))
for dim in range(n_dims):
dgm1 = self.diagrams_[dim]
dgm2 = other.diagrams_[dim]
if len(dgm1) == 0 and len(dgm2) == 0:
continue
# Ensure non-empty diagrams for persim
if len(dgm1) == 0:
dgm1 = np.array([[0, 0]])
if len(dgm2) == 0:
dgm2 = np.array([[0, 0]])
if metric == "bottleneck":
d = persim.bottleneck(dgm1, dgm2)
elif metric.startswith("wasserstein"):
# persim.wasserstein only supports order 1
d = persim.wasserstein(dgm1, dgm2)
else:
raise ValueError(f"Unknown metric: {metric}")
max_dist = max(max_dist, d)
return max_dist
[docs]
def to_image(
self,
resolution: int = 50,
sigma: float = 0.1,
birth_range: tuple[float, float] | None = None,
persistence_range: tuple[float, float] | None = None,
) -> list[np.ndarray]:
"""Convert diagrams to persistence images.
Parameters
----------
resolution : int
Grid size for the persistence image (resolution x resolution).
sigma : float
Gaussian kernel bandwidth.
birth_range : (min, max) or None
Explicit birth-axis range. If None, computed from data per diagram.
persistence_range : (min, max) or None
Explicit persistence-axis range. If None, computed from data per diagram.
Returns
-------
list of (resolution, resolution) arrays, one per homology dimension.
"""
if self.diagrams_ is None:
raise RuntimeError("Call fit_transform first.")
images = []
for dgm in self.diagrams_:
if len(dgm) == 0:
images.append(np.zeros((resolution, resolution)))
continue
# Transform to birth-persistence coordinates
births = dgm[:, 0]
persistence = dgm[:, 1] - dgm[:, 0]
mask = persistence > 1e-10
if mask.sum() == 0:
images.append(np.zeros((resolution, resolution)))
continue
births = births[mask]
persistence = persistence[mask]
# Weight by persistence (linear ramp)
weights = persistence / persistence.max()
# Grid ranges — use explicit if provided, else compute from data
if birth_range is not None:
b_min, b_max = birth_range
else:
b_min, b_max = births.min(), births.max()
if persistence_range is not None:
p_min, p_max = persistence_range
else:
p_min, p_max = 0.0, persistence.max()
b_pad = (b_max - b_min) * 0.1 + 1e-6
p_pad = (p_max - p_min) * 0.1 + 1e-6
b_grid = np.linspace(b_min - b_pad, b_max + b_pad, resolution)
p_grid = np.linspace(max(0, p_min - p_pad), p_max + p_pad, resolution)
# Compute persistence image as sum of weighted Gaussians
img = np.zeros((resolution, resolution))
for b, p, w in zip(births, persistence, weights):
gb = np.exp(-((b_grid - b) ** 2) / (2 * sigma ** 2))
gp = np.exp(-((p_grid - p) ** 2) / (2 * sigma ** 2))
img += w * np.outer(gp, gb)
images.append(img)
return images
[docs]
def to_landscape(self, n_layers: int = 5, n_grid: int = 100) -> list[np.ndarray]:
"""Convert diagrams to persistence landscapes.
Returns one (n_layers, n_grid) array per homology dimension.
"""
if self.diagrams_ is None:
raise RuntimeError("Call fit_transform first.")
landscapes = []
for dgm in self.diagrams_:
if len(dgm) == 0:
landscapes.append(np.zeros((n_layers, n_grid)))
continue
births = dgm[:, 0]
deaths = dgm[:, 1]
t_min = births.min()
t_max = deaths.max()
grid = np.linspace(t_min, t_max, n_grid)
# Each feature defines a tent function
tents = np.zeros((len(dgm), n_grid))
for i, (b, d) in enumerate(dgm):
mid = (b + d) / 2.0
half_life = (d - b) / 2.0
if half_life < 1e-10:
continue
tents[i] = np.maximum(0, half_life - np.abs(grid - mid))
# k-th landscape is the k-th largest tent value at each grid point
landscape = np.zeros((n_layers, n_grid))
for j in range(n_grid):
vals = np.sort(tents[:, j])[::-1]
for k in range(min(n_layers, len(vals))):
landscape[k, j] = vals[k]
landscapes.append(landscape)
return landscapes
[docs]
def plot(self, kind: str = "diagram"):
"""Plot persistence results.
kind: 'diagram', 'barcode', 'betti_curve', 'landscape', 'image'
"""
# Defer to viz module
from att.viz import plotting
if kind == "diagram":
return plotting.plot_persistence_diagram(self.diagrams_)
elif kind == "barcode":
return plotting.plot_barcode(self.diagrams_)
elif kind == "betti_curve":
curves = self._compute_betti_curves(self.diagrams_)
return plotting.plot_betti_curve(curves)
elif kind == "image":
images = self.to_image()
return plotting.plot_persistence_image(images)
elif kind == "landscape":
landscapes = self.to_landscape()
# Basic plot
import matplotlib.pyplot as plt
fig, axes = plt.subplots(1, len(landscapes), figsize=(5 * len(landscapes), 4))
if len(landscapes) == 1:
axes = [axes]
for i, (ls, ax) in enumerate(zip(landscapes, axes)):
for k in range(ls.shape[0]):
ax.plot(ls[k], label=f"λ_{k}")
ax.set_title(f"H{i} Landscape")
ax.legend()
return fig
else:
raise ValueError(f"Unknown plot kind: {kind}")