# Source code for att.llm.features
"""Topological feature extraction for ML prediction (Direction 2).
Vectorizes persistence diagrams into fixed-length feature vectors suitable
for logistic regression, random forests, etc. Supports summary statistics
and persistence image features.
"""
from __future__ import annotations
import numpy as np
from sklearn.decomposition import PCA
from att.topology.persistence import PersistenceAnalyzer
class TopologicalFeatureExtractor:
    """Extract fixed-length topological feature vectors from point clouds.

    Parameters
    ----------
    max_dim : int
        Maximum homology dimension.
    n_pca_components : int
        PCA dimensions before PH computation.
    subsample : int or None
        Max points per cloud for PH.
    feature_set : str
        "summary" (8 features per dim) or "image" (summary + flattened PI).
    pi_resolution : int
        Persistence image resolution (only used when feature_set="image").
    pi_sigma : float
        Persistence image Gaussian bandwidth.
    seed : int
        Random seed.
    """

    # Summary feature names per homology dimension.  Order must match the
    # values appended in _summarize_diagram.
    _SUMMARY_NAMES = [
        "persistence_entropy",
        "total_persistence",
        "n_features",
        "max_lifetime",
        "mean_lifetime",
        "std_lifetime",
        "max_birth",
        "mean_birth",
    ]

    def __init__(
        self,
        max_dim: int = 1,
        n_pca_components: int = 50,
        subsample: int | None = 200,
        feature_set: str = "summary",
        pi_resolution: int = 20,
        pi_sigma: float = 0.1,
        seed: int = 42,
    ):
        self.max_dim = max_dim
        self.n_pca_components = n_pca_components
        self.subsample = subsample
        self.feature_set = feature_set
        self.pi_resolution = pi_resolution
        self.pi_sigma = pi_sigma
        self.seed = seed

    @property
    def feature_names(self) -> list[str]:
        """List of feature names matching the output columns.

        Summary features come first (grouped by homology dimension),
        followed by flattened persistence-image pixels when
        feature_set="image" — the same order extract_single emits.
        """
        names = [
            f"H{dim}_{name}"
            for dim in range(self.max_dim + 1)
            for name in self._SUMMARY_NAMES
        ]
        if self.feature_set == "image":
            names.extend(
                f"H{dim}_pi_{i}_{j}"
                for dim in range(self.max_dim + 1)
                for i in range(self.pi_resolution)
                for j in range(self.pi_resolution)
            )
        return names

    @property
    def n_features(self) -> int:
        """Total number of features (summary + optional image pixels)."""
        n_total = len(self._SUMMARY_NAMES) * (self.max_dim + 1)
        if self.feature_set == "image":
            n_total += (self.pi_resolution ** 2) * (self.max_dim + 1)
        return n_total

    def _summarize_diagram(self, diagrams: list[np.ndarray]) -> np.ndarray:
        """Extract summary statistics from persistence diagrams.

        Essential classes (infinite death time — e.g. the H0 class that
        never dies in a Rips filtration) and zero-persistence points are
        excluded, so all returned statistics are finite.  Birth statistics
        are computed over the same filtered point set as the lifetime
        statistics so the two describe the same features.

        Returns (n_summary_features,) array; a dimension with no surviving
        points contributes a zero block.
        """
        features = []
        for dim in range(self.max_dim + 1):
            dgm = diagrams[dim] if dim < len(diagrams) else np.empty((0, 2))
            if len(dgm) == 0:
                features.extend([0.0] * len(self._SUMMARY_NAMES))
                continue
            lifetimes = dgm[:, 1] - dgm[:, 0]
            # Drop essential (infinite-death) classes and zero-persistence
            # points; apply the identical mask to births for consistency.
            mask = np.isfinite(lifetimes) & (lifetimes > 0)
            lifetimes = lifetimes[mask]
            births = dgm[mask, 0]
            if len(lifetimes) == 0:
                features.extend([0.0] * len(self._SUMMARY_NAMES))
                continue
            # Persistence entropy: Shannon entropy of normalized lifetimes.
            total = lifetimes.sum()
            probs = lifetimes / (total + 1e-15)
            entropy = -np.sum(probs * np.log(probs + 1e-15))
            features.extend([
                float(entropy),             # persistence_entropy
                float(total),               # total_persistence
                float(len(lifetimes)),      # n_features
                float(np.max(lifetimes)),   # max_lifetime
                float(np.mean(lifetimes)),  # mean_lifetime
                float(np.std(lifetimes)),   # std_lifetime
                float(np.max(births)),      # max_birth
                float(np.mean(births)),     # mean_birth
            ])
        return np.array(features)

    def extract_single(self, cloud: np.ndarray) -> np.ndarray:
        """Extract topological features from a single point cloud.

        Parameters
        ----------
        cloud : (n_points, d) point cloud.

        Returns
        -------
        (n_features,) feature vector.  Clouds with fewer than 3 points
        yield an all-zero vector (PH is not meaningful there).
        """
        n_pts = cloud.shape[0]
        if n_pts < 3:
            return np.zeros(self.n_features)
        # PCA components cannot exceed min(n_samples - 1, n_dims).
        n_comp = min(self.n_pca_components, n_pts - 1, cloud.shape[1])
        pca = PCA(n_components=n_comp)
        cloud_pca = pca.fit_transform(cloud)
        pa = PersistenceAnalyzer(max_dim=self.max_dim, backend="ripser")
        sub = min(n_pts, self.subsample) if self.subsample else None
        result = pa.fit_transform(cloud_pca, subsample=sub, seed=self.seed)
        diagrams = result["diagrams"]
        summary = self._summarize_diagram(diagrams)
        if self.feature_set == "summary":
            return summary
        # Image features: flattened persistence image per dimension,
        # zero-filled for dimensions the analyzer did not return.
        images = pa.to_image(
            resolution=self.pi_resolution,
            sigma=self.pi_sigma,
        )
        image_features = []
        for dim in range(self.max_dim + 1):
            img = images[dim] if dim < len(images) else np.zeros(
                (self.pi_resolution, self.pi_resolution)
            )
            image_features.append(img.ravel())
        return np.concatenate([summary, *image_features])

    def extract_batch(
        self,
        loader,
        layer: int = -1,
    ) -> tuple[np.ndarray, list[str]]:
        """Extract features for all problems in a loader, per difficulty level.

        Computes PH on the level-cloud at the given layer for each difficulty
        level, producing one feature vector per level.

        Parameters
        ----------
        loader : HiddenStateLoader
        layer : int
            Layer index (-1 = final layer).

        Returns
        -------
        X : (n_levels, n_features) feature matrix.
        feature_names : list of feature name strings.
        """
        levels = sorted(loader.unique_levels.tolist())
        X = np.zeros((len(levels), self.n_features))
        for i, level in enumerate(levels):
            cloud = loader.get_level_cloud(level, layer=layer)
            X[i] = self.extract_single(cloud)
        return X, self.feature_names

    def extract_per_problem(
        self,
        loader,
        layer: int = -1,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Extract features per problem using token trajectories.

        Each problem's token trajectory (T_i, d) is treated as a point cloud.
        Trajectories that are missing or have fewer than 3 tokens keep an
        all-zero feature row.

        Parameters
        ----------
        loader : HiddenStateLoader
        layer : int
            Not used for token trajectories (included for API consistency).

        Returns
        -------
        X : (n_problems, n_features) feature matrix.
        levels : (n_problems,) difficulty levels.
        """
        n = loader.n_problems
        X = np.zeros((n, self.n_features))
        for i in range(n):
            traj = loader.token_trajectories[i]
            if traj is not None and len(traj) >= 3:
                X[i] = self.extract_single(traj)
        return X, loader.levels.copy()