"""ConeDetector: directed projection geometry via depth-stratified topology.
Core idea: if a source attractor C projects through receivers at increasing
depth (C -> A3 -> A5), the joint state-space cross-section topology should
change systematically with depth along the projection axis. This progressive
topological enrichment IS the cone.
Key methods:
- estimate_projection_axis(): conditional-mean PCA of receiver cloud
conditioned on source state quantiles
- slice_at_depth(): extract cross-section point clouds at each depth bin
- availability_profile(): Betti numbers as function of depth (core output)
- coupling_influence_subspace(): CCA-based low-dimensional projection
- depth_asymmetry(): binding score difference [C;A5] vs [C;A3]
"""
from __future__ import annotations
import numpy as np
from sklearn.cross_decomposition import CCA
from att.embedding.joint import JointEmbedder
from att.embedding.takens import TakensEmbedder
from att.topology.persistence import PersistenceAnalyzer
class ConeDetector:
    """Detect conical projection geometry in directed attractor networks.

    Core idea: if a source attractor projects through receivers at
    increasing depth, the joint state-space cross-section topology should
    change systematically with depth along the projection axis. This
    progressive topological enrichment IS the cone.

    Parameters
    ----------
    n_depth_bins : int
        Number of bins along the projection axis for depth slicing.
        Start with 5 for statistical power (~3200+ pts/bin at 80k steps).
    max_dim : int
        Maximum homology dimension for persistence computation.
    n_quantiles : int
        Number of source-state quantiles for axis estimation.
    cca_components : int
        Number of CCA dimensions for coupling-influence subspace.
    """

    def __init__(
        self,
        n_depth_bins: int = 5,
        max_dim: int = 1,
        n_quantiles: int = 20,
        cca_components: int = 3,
    ) -> None:
        self.n_depth_bins = n_depth_bins
        self.max_dim = max_dim
        self.n_quantiles = n_quantiles
        self.cca_components = cca_components
        # Fitted state (populated by fit())
        self._source_embedded: np.ndarray | None = None
        self._receiver_cloud: np.ndarray | None = None
        self._projection_axis: np.ndarray | None = None
        self._depth_projections: np.ndarray | None = None
        self._cca_subspace: np.ndarray | None = None
        self._availability: dict | None = None

    @staticmethod
    def _bin_mask(
        values: np.ndarray,
        edges: np.ndarray,
        i: int,
        n_bins: int,
    ) -> np.ndarray:
        """Boolean mask selecting ``values`` falling into quantile bin ``i``.

        Bins are half-open ``[lo, hi)`` except the last, which is closed
        ``[lo, hi]`` so the global maximum is not dropped. Shared by
        estimate_projection_axis, slice_at_depth and availability_profile.
        """
        lo, hi = edges[i], edges[i + 1]
        if i < n_bins - 1:
            return (values >= lo) & (values < hi)
        return (values >= lo) & (values <= hi)

    def fit(
        self,
        source_ts: np.ndarray,
        receiver_channels: list[np.ndarray],
        source_embedder: JointEmbedder | None = None,
        receiver_embedder: JointEmbedder | None = None,
    ) -> "ConeDetector":
        """Fit the cone detector on source + receiver time series.

        Parameters
        ----------
        source_ts : 1D array
            Source node time series (e.g., C's x-component).
        receiver_channels : list of 1D arrays
            Receiver node time series (e.g., [A3_x, B3_x, A5_x, B5_x]).
        source_embedder : optional pre-configured embedder for source
        receiver_embedder : optional pre-configured embedder for receivers

        Returns self for fluent API.
        """
        # Embed source (single channel)
        src_emb = source_embedder or TakensEmbedder(delay="auto", dimension="auto")
        self._source_embedded = src_emb.fit_transform(source_ts)
        # Joint-embed all receiver channels
        rcv_emb = receiver_embedder or JointEmbedder(delays="auto", dimensions="auto")
        self._receiver_cloud = rcv_emb.fit_transform(receiver_channels)
        # Tail-align (both embedders trim from the start)
        n = min(len(self._source_embedded), len(self._receiver_cloud))
        self._source_embedded = self._source_embedded[-n:]
        self._receiver_cloud = self._receiver_cloud[-n:]
        # Estimate projection axis and project
        self._projection_axis = self.estimate_projection_axis()
        self._depth_projections = self._receiver_cloud @ self._projection_axis
        # Compute coupling-influence subspace
        self._cca_subspace = self.coupling_influence_subspace()
        return self

    def estimate_projection_axis(self) -> np.ndarray:
        """Estimate the cone's projection axis via conditional-mean PCA.

        For each quantile of the source's state, compute the conditional
        mean of the receiver joint cloud. The first PC of those conditional
        means is the projection axis — the direction along which the
        source's variation maximally structures the receiver cloud.

        Returns: unit vector in receiver embedding space.
        """
        source_vals = self._source_embedded[:, 0]
        edges = np.quantile(source_vals, np.linspace(0, 1, self.n_quantiles + 1))
        # Conditional means of receiver cloud per source quantile bin
        conditional_means = []
        for i in range(self.n_quantiles):
            mask = self._bin_mask(source_vals, edges, i, self.n_quantiles)
            if mask.sum() > 0:
                conditional_means.append(self._receiver_cloud[mask].mean(axis=0))
        # Fallback: PCA of full cloud if too few conditional means
        # (e.g. a near-constant source collapses all quantile edges).
        if len(conditional_means) < 3:
            centered = self._receiver_cloud - self._receiver_cloud.mean(axis=0)
            _, _, Vt = np.linalg.svd(centered, full_matrices=False)
            return Vt[0] / np.linalg.norm(Vt[0])
        cond_matrix = np.array(conditional_means)
        cond_centered = cond_matrix - cond_matrix.mean(axis=0)
        _, _, Vt = np.linalg.svd(cond_centered, full_matrices=False)
        axis = Vt[0]
        # SVD rows are already unit norm; renormalize defensively.
        return axis / np.linalg.norm(axis)

    def slice_at_depth(self, depth_bin: int) -> np.ndarray:
        """Extract the cross-section point cloud at a given depth bin.

        Parameters
        ----------
        depth_bin : int in [0, n_depth_bins)

        Returns: (n_points_in_bin, embedding_dim) array

        Raises
        ------
        ValueError
            If ``depth_bin`` is outside [0, n_depth_bins).
        """
        if depth_bin < 0 or depth_bin >= self.n_depth_bins:
            raise ValueError(
                f"depth_bin must be in [0, {self.n_depth_bins}), got {depth_bin}"
            )
        edges = np.quantile(
            self._depth_projections,
            np.linspace(0, 1, self.n_depth_bins + 1),
        )
        mask = self._bin_mask(
            self._depth_projections, edges, depth_bin, self.n_depth_bins
        )
        return self._receiver_cloud[mask]

    def availability_profile(
        self,
        subspace: str = "full",
        subsample: int | None = 2000,
    ) -> dict:
        """Compute the availability profile: Betti numbers vs depth.

        This is the core output — the shape of the cone expressed as
        topology-vs-depth.

        Parameters
        ----------
        subspace : "full" for full Takens embedding, "cca" for
            coupling-influence subspace
        subsample : max points per depth bin for persistence computation

        Returns
        -------
        dict with:
            'depths': array of bin centers
            'betti_0': array of Betti_0 per bin
            'betti_1': array of Betti_1 per bin
            'persistence_entropy': list of entropies per bin
            'diagrams': list of persistence diagrams per bin
            'is_monotonic': bool — does Betti_1 increase with depth?
            'trend_slope': float — linear regression slope of Betti_1 vs depth
        """
        edges = np.quantile(
            self._depth_projections,
            np.linspace(0, 1, self.n_depth_bins + 1),
        )
        centers = (edges[:-1] + edges[1:]) / 2
        betti_0 = np.zeros(self.n_depth_bins, dtype=int)
        betti_1 = np.zeros(self.n_depth_bins, dtype=int)
        all_entropy = []
        all_diagrams = []
        # Fixed seed: subsampling must be reproducible across calls.
        rng = np.random.default_rng(42)
        for i in range(self.n_depth_bins):
            # Select points in this depth bin
            mask = self._bin_mask(
                self._depth_projections, edges, i, self.n_depth_bins
            )
            if subspace == "cca" and self._cca_subspace is not None:
                cloud = self._cca_subspace[mask]
            else:
                cloud = self._receiver_cloud[mask]
            # Subsample if needed
            if subsample is not None and len(cloud) > subsample:
                idx = rng.choice(len(cloud), size=subsample, replace=False)
                cloud = cloud[idx]
            # Compute persistent homology
            pa = PersistenceAnalyzer(max_dim=self.max_dim, backend="ripser")
            result = pa.fit_transform(cloud)
            diagrams = result["diagrams"]
            all_diagrams.append(diagrams)
            all_entropy.append(result["persistence_entropy"])
            # Count Betti numbers (features with persistence > 10% of max)
            for dim in range(min(2, len(diagrams))):
                dgm = diagrams[dim]
                if len(dgm) == 0:
                    continue
                persistence = dgm[:, 1] - dgm[:, 0]
                # NOTE(review): if diagrams can contain infinite deaths
                # (ripser's H0 essential bar), max_pers is inf and the
                # threshold filters everything — confirm that
                # PersistenceAnalyzer strips or clamps infinite bars.
                max_pers = persistence.max()
                threshold = 0.1 * max_pers if max_pers > 0 else 0
                count = int((persistence > threshold).sum())
                if dim == 0:
                    betti_0[i] = count
                elif dim == 1:
                    betti_1[i] = count
        # Trend analysis: least-squares slope of Betti_1 against depth
        if self.n_depth_bins >= 2:
            trend_slope = float(np.polyfit(centers, betti_1, 1)[0])
        else:
            trend_slope = 0.0
        # Non-strict monotonicity: plateaus are allowed
        is_monotonic = all(
            betti_1[j + 1] >= betti_1[j] for j in range(len(betti_1) - 1)
        )
        self._availability = {
            "depths": centers,
            "betti_0": betti_0,
            "betti_1": betti_1,
            "persistence_entropy": all_entropy,
            "diagrams": all_diagrams,
            "is_monotonic": bool(is_monotonic),
            "trend_slope": trend_slope,
        }
        return self._availability

    def coupling_influence_subspace(self) -> np.ndarray:
        """Estimate the low-dim subspace where source maximally predicts receivers.

        Uses Canonical Correlation Analysis (CCA) between the embedded
        source state and the receiver joint cloud.

        Returns: (n_points, cca_components) projected receiver cloud
        """
        # CCA cannot extract more components than either view's dimension.
        n_components = min(
            self.cca_components,
            self._source_embedded.shape[1],
            self._receiver_cloud.shape[1],
        )
        cca = CCA(n_components=n_components)
        cca.fit(self._source_embedded, self._receiver_cloud)
        _, receiver_cca = cca.transform(
            self._source_embedded, self._receiver_cloud
        )
        return receiver_cca

    def depth_asymmetry(
        self,
        source_ts: np.ndarray,
        shallow_ts: np.ndarray,
        deep_ts: np.ndarray,
        subsample: int | None = None,
        seed: int | None = None,
    ) -> dict:
        """Compute binding asymmetry between shallow and deep pairings.

        Compares [source; shallow] binding vs [source; deep] binding.
        A positive asymmetry (deep > shallow) indicates the cone opens
        with depth.

        This is a simpler measure than the full availability profile,
        using ATT's existing BindingDetector directly.

        Parameters
        ----------
        source_ts : 1D source time series (C's x)
        shallow_ts : 1D shallow receiver time series (A3's x)
        deep_ts : 1D deep receiver time series (A5's x)
        subsample : max points for persistence computation (saves memory)
        seed : random seed for subsampling

        Returns
        -------
        dict with:
            'shallow_binding': float
            'deep_binding': float
            'asymmetry': float (deep - shallow)
            'ratio': float (deep / shallow, if shallow > 0)
        """
        # Local import to avoid a module-level import cycle with att.binding.
        from att.binding import BindingDetector

        bd_shallow = BindingDetector(max_dim=self.max_dim)
        bd_shallow.fit(source_ts, shallow_ts, subsample=subsample, seed=seed)
        score_shallow = bd_shallow.binding_score()
        bd_deep = BindingDetector(max_dim=self.max_dim)
        bd_deep.fit(source_ts, deep_ts, subsample=subsample, seed=seed)
        score_deep = bd_deep.binding_score()
        return {
            "shallow_binding": score_shallow,
            "deep_binding": score_deep,
            "asymmetry": score_deep - score_shallow,
            # Epsilon floor guards against division by a zero shallow score.
            "ratio": score_deep / max(score_shallow, 1e-10),
        }

    def full_chain_emergence(
        self,
        source_ts: np.ndarray,
        shallow_ts: np.ndarray,
        deep_ts: np.ndarray,
        subsample: int | None = None,
        seed: int | None = None,
    ) -> dict:
        """Test whether the full chain [C; A3; A5] has emergent topology.

        Compares the 3-way joint binding against the max of all pairwise
        bindings. Emergent topology = 3-way > max(pairwise).

        Parameters
        ----------
        subsample : max points for persistence computation (saves memory)
        seed : random seed for subsampling

        Returns
        -------
        dict with:
            'pairwise_bindings': dict of pair -> score
            'full_chain_binding': float
            'max_pairwise': float
            'emergence': float (full_chain - max_pairwise)
            'has_emergence': bool
        """
        # Local import to avoid a module-level import cycle with att.binding.
        from att.binding import BindingDetector

        # Pairwise bindings
        pairs = {
            "source_shallow": (source_ts, shallow_ts),
            "source_deep": (source_ts, deep_ts),
            "shallow_deep": (shallow_ts, deep_ts),
        }
        pairwise_bindings = {}
        for name, (a, b) in pairs.items():
            bd = BindingDetector(max_dim=self.max_dim)
            bd.fit(a, b, subsample=subsample, seed=seed)
            pairwise_bindings[name] = bd.binding_score()
        max_pairwise = max(pairwise_bindings.values())
        # 3-way joint: embed all three, compute PI subtraction vs marginals
        embedder = JointEmbedder(delays="auto", dimensions="auto")
        joint_cloud = embedder.fit_transform([source_ts, shallow_ts, deep_ts])
        marginal_clouds = embedder.transform_marginals(
            [source_ts, shallow_ts, deep_ts]
        )
        # Compute PH for joint and each marginal
        joint_pa = PersistenceAnalyzer(max_dim=self.max_dim)
        joint_pa.fit_transform(joint_cloud, subsample=subsample, seed=seed)
        marginal_pas = []
        for mc in marginal_clouds:
            pa = PersistenceAnalyzer(max_dim=self.max_dim)
            pa.fit_transform(mc, subsample=subsample, seed=seed)
            marginal_pas.append(pa)
        # Shared (birth, persistence) ranges so persistence images of the
        # joint and marginal diagrams are directly comparable pixel-wise.
        all_diagrams = [joint_pa.diagrams_] + [mp.diagrams_ for mp in marginal_pas]
        birth_min, birth_max = float("inf"), float("-inf")
        pers_max = 0.0
        for diags in all_diagrams:
            for dgm in diags:
                if len(dgm) > 0:
                    birth_min = min(birth_min, dgm[:, 0].min())
                    birth_max = max(birth_max, dgm[:, 0].max())
                    pers_max = max(pers_max, (dgm[:, 1] - dgm[:, 0]).max())
        # All diagrams empty: fall back to a unit range.
        if birth_min == float("inf"):
            birth_min, birth_max, pers_max = 0.0, 1.0, 1.0
        img_kwargs = {
            "birth_range": (birth_min, birth_max),
            "persistence_range": (0.0, pers_max),
        }
        joint_images = joint_pa.to_image(**img_kwargs)
        marginal_image_sets = [mp.to_image(**img_kwargs) for mp in marginal_pas]
        # Baseline: element-wise max across all marginal images; only the
        # positive residual (joint topology not explained by any marginal)
        # counts toward the emergence score.
        full_chain_score = 0.0
        for dim in range(len(joint_images)):
            baseline = np.maximum.reduce(
                [mis[dim] for mis in marginal_image_sets]
            )
            residual = joint_images[dim] - baseline
            full_chain_score += float(np.sum(np.maximum(residual, 0)))
        emergence = full_chain_score - max_pairwise
        return {
            "pairwise_bindings": pairwise_bindings,
            "full_chain_binding": full_chain_score,
            "max_pairwise": max_pairwise,
            "emergence": emergence,
            "has_emergence": emergence > 0,
        }