|
|
""" |
|
|
glyph_mapper.py |
|
|
|
|
|
Core implementation of the Glyph Mapper module for the glyphs framework. |
|
|
This module transforms attribution traces, residue patterns, and attention |
|
|
flows into symbolic glyph representations that visualize latent spaces. |
|
|
""" |
|
|
|
|
|
import logging |
|
|
import time |
|
|
import numpy as np |
|
|
from typing import Dict, List, Optional, Tuple, Union, Any, Set |
|
|
from dataclasses import dataclass, field |
|
|
import json |
|
|
import hashlib |
|
|
from pathlib import Path |
|
|
from enum import Enum |
|
|
import networkx as nx |
|
|
import matplotlib.pyplot as plt |
|
|
from scipy.spatial import distance |
|
|
from sklearn.manifold import TSNE |
|
|
from sklearn.cluster import DBSCAN |
|
|
|
|
|
from ..models.adapter import ModelAdapter |
|
|
from ..attribution.tracer import AttributionMap, AttributionType, AttributionLink |
|
|
from ..residue.patterns import ResiduePattern, ResidueRegistry |
|
|
from ..utils.visualization_utils import VisualizationEngine |
|
|
|
|
|
|
|
|
# Module-level logger for the glyph mapper; INFO level set here so mapping
# progress messages are visible even without app-side logging configuration.
logger = logging.getLogger("glyphs.glyph_mapper")
logger.setLevel(logging.INFO)
|
|
|
|
|
|
|
|
class GlyphType(Enum):
    """Types of glyphs for different interpretability functions.

    Each Glyph instance carries one of these values in its ``type`` field;
    the mapper uses them to tag what kind of evidence a glyph represents.
    """
    ATTRIBUTION = "attribution"  # glyphs built from attribution links/gaps
    ATTENTION = "attention"      # glyphs built from attention-head patterns
    RESIDUE = "residue"          # glyphs built from residue patterns
    SALIENCE = "salience"        # markers for high-salience tokens
    COLLAPSE = "collapse"        # markers for collapsed attribution regions
    RECURSIVE = "recursive"      # recursion-related glyphs (see recursive_glyphs registry)
    META = "meta"                # meta-level markers (uncertainty, emergence, ...)
    SENTINEL = "sentinel"        # structural markers (tokens, boundaries, references)
|
|
|
|
|
|
|
|
class GlyphSemantic(Enum):
    """Semantic dimensions captured by glyphs.

    A glyph carries a list of these in its ``semantics`` field; the registry
    uses them for semantic lookup (see GlyphRegistry.find_glyphs_by_semantic).
    """
    STRENGTH = "strength"      # magnitude of the underlying signal
    DIRECTION = "direction"    # directional / flow character
    STABILITY = "stability"    # how stable the pattern is
    COMPLEXITY = "complexity"  # structural complexity
    RECURSION = "recursion"    # recursive character
    CERTAINTY = "certainty"    # confidence associated with the pattern
    TEMPORAL = "temporal"      # time/sequence-related character
    EMERGENCE = "emergence"    # emergent-pattern character
|
|
|
|
|
|
|
|
@dataclass
class Glyph:
    """A symbolic representation of a pattern in transformer cognition.

    Glyphs are the visual atoms of a GlyphMap: each one has a symbol, a
    2D position, and rendering attributes, plus links back to the raw
    elements (tokens, attribution links, residue patterns) it was built from.
    """
    id: str                             # unique identifier within a GlyphMap
    symbol: str                         # character rendered for this glyph
    type: GlyphType                     # functional category of the glyph
    semantics: List[GlyphSemantic]      # semantic dimensions this glyph expresses
    position: Tuple[float, float]       # (x, y) placement; may be overwritten by layout
    size: float                         # rendered size
    color: str                          # hex color string, e.g. "#3498db"
    opacity: float                      # 0.0-1.0 rendering opacity
    source_elements: List[Any] = field(default_factory=list)  # raw data this glyph represents
    description: Optional[str] = None   # human-readable description
    metadata: Dict[str, Any] = field(default_factory=dict)    # extra per-glyph info
|
|
|
|
|
|
|
|
@dataclass
class GlyphConnection:
    """A connection between glyphs in a glyph map.

    Connections reference glyphs by id (not by object), so a connection is
    only meaningful within the GlyphMap that contains both endpoints.
    """
    source_id: str   # id of the source Glyph
    target_id: str   # id of the target Glyph
    strength: float  # connection strength (typically 0.0-1.0)
    type: str        # connection kind, e.g. "attribution_flow", "pattern_similarity"
    directed: bool   # True for directed (arrow) connections
    color: str       # hex color string
    width: float     # rendered line width
    opacity: float   # 0.0-1.0 rendering opacity
    metadata: Dict[str, Any] = field(default_factory=dict)  # extra per-connection info
|
|
|
|
|
|
|
|
@dataclass
class GlyphMap:
    """A complete map of glyphs representing transformer cognition.

    Produced by GlyphMapper.map_* methods; holds all glyphs and the
    connections between them, plus layout/rendering parameters.
    """
    id: str                             # unique map identifier (includes timestamp + hash)
    glyphs: List[Glyph]                 # all glyphs in the map
    connections: List[GlyphConnection]  # connections between glyphs (by id)
    source_type: str                    # data source, e.g. "attribution", "residue", "attention"
    layout_type: str                    # layout algorithm name, e.g. "force_directed"
    dimensions: Tuple[int, int]         # (width, height) of the visualization canvas
    scale: float                        # global scale factor
    focal_points: List[str] = field(default_factory=list)     # glyph ids to emphasize
    regions: Dict[str, List[str]] = field(default_factory=dict)  # named groups of glyph ids
    metadata: Dict[str, Any] = field(default_factory=dict)    # provenance and timing info
|
|
|
|
|
|
|
|
class GlyphRegistry:
    """Registry of available glyphs and their semantics.

    Holds one dict per glyph family (attribution, attention, residue,
    recursive, meta, sentinel) plus a merged ``all_glyphs`` view whose keys
    are namespaced as ``"<family>_<name>"`` — those namespaced keys are what
    ``get_glyph`` expects (e.g. "attribution_direct_strong").

    NOTE(review): several ``symbol`` values below look like mojibake
    (UTF-8 emoji decoded with the wrong codec). They are runtime data and
    are preserved byte-for-byte here; confirm the intended symbols against
    the original authoring source before "fixing" them.
    """

    def __init__(self):
        """Initialize the glyph registry with the built-in glyph tables."""

        # Glyphs used for attribution-link visualization.
        self.attribution_glyphs = {
            "direct_strong": {
                "symbol": "π",
                "semantics": [GlyphSemantic.STRENGTH, GlyphSemantic.CERTAINTY],
                "description": "Strong direct attribution"
            },
            "direct_medium": {
                "symbol": "π",
                "semantics": [GlyphSemantic.STRENGTH, GlyphSemantic.CERTAINTY],
                "description": "Medium direct attribution"
            },
            "direct_weak": {
                "symbol": "π§©",
                "semantics": [GlyphSemantic.STRENGTH, GlyphSemantic.CERTAINTY],
                "description": "Weak direct attribution"
            },
            "indirect": {
                "symbol": "β€",
                "semantics": [GlyphSemantic.DIRECTION, GlyphSemantic.COMPLEXITY],
                "description": "Indirect attribution"
            },
            "composite": {
                "symbol": "β¬₯",
                "semantics": [GlyphSemantic.COMPLEXITY, GlyphSemantic.EMERGENCE],
                "description": "Composite attribution"
            },
            "fork": {
                "symbol": "π",
                "semantics": [GlyphSemantic.DIRECTION, GlyphSemantic.COMPLEXITY],
                "description": "Attribution fork"
            },
            "loop": {
                "symbol": "π",
                "semantics": [GlyphSemantic.RECURSION, GlyphSemantic.COMPLEXITY],
                "description": "Attribution loop"
            },
            "gap": {
                "symbol": "β",
                "semantics": [GlyphSemantic.CERTAINTY, GlyphSemantic.STABILITY],
                "description": "Attribution gap"
            }
        }

        # Glyphs used for attention-head pattern visualization.
        self.attention_glyphs = {
            "focus": {
                "symbol": "π―",
                "semantics": [GlyphSemantic.STRENGTH, GlyphSemantic.CERTAINTY],
                "description": "Attention focus point"
            },
            "diffuse": {
                "symbol": "π«οΈ",
                "semantics": [GlyphSemantic.STRENGTH, GlyphSemantic.CERTAINTY],
                "description": "Diffuse attention"
            },
            "induction": {
                "symbol": "π",
                "semantics": [GlyphSemantic.TEMPORAL, GlyphSemantic.DIRECTION],
                "description": "Induction head pattern"
            },
            "inhibition": {
                "symbol": "π",
                "semantics": [GlyphSemantic.DIRECTION, GlyphSemantic.STRENGTH],
                "description": "Attention inhibition"
            },
            "multi_head": {
                "symbol": "β",
                "semantics": [GlyphSemantic.COMPLEXITY, GlyphSemantic.EMERGENCE],
                "description": "Multi-head attention pattern"
            }
        }

        # Glyphs used for symbolic-residue pattern visualization; keys here
        # correspond to ResiduePattern.type values (see map_residue_patterns).
        self.residue_glyphs = {
            "memory_decay": {
                "symbol": "π",
                "semantics": [GlyphSemantic.TEMPORAL, GlyphSemantic.STABILITY],
                "description": "Memory decay residue"
            },
            "value_conflict": {
                "symbol": "β‘",
                "semantics": [GlyphSemantic.STABILITY, GlyphSemantic.CERTAINTY],
                "description": "Value conflict residue"
            },
            "ghost_activation": {
                "symbol": "π»",
                "semantics": [GlyphSemantic.STRENGTH, GlyphSemantic.CERTAINTY],
                "description": "Ghost activation residue"
            },
            "boundary_hesitation": {
                "symbol": "β§",
                "semantics": [GlyphSemantic.CERTAINTY, GlyphSemantic.STABILITY],
                "description": "Boundary hesitation residue"
            },
            "null_output": {
                "symbol": "β",
                "semantics": [GlyphSemantic.CERTAINTY, GlyphSemantic.STABILITY],
                "description": "Null output residue"
            }
        }

        # Glyphs for recursion-related structures.
        self.recursive_glyphs = {
            "recursive_aegis": {
                "symbol": "π",
                "semantics": [GlyphSemantic.RECURSION, GlyphSemantic.STABILITY],
                "description": "Recursive immunity"
            },
            "recursive_seed": {
                "symbol": "β΄",
                "semantics": [GlyphSemantic.RECURSION, GlyphSemantic.EMERGENCE],
                "description": "Recursion initiation"
            },
            "recursive_exchange": {
                "symbol": "β",
                "semantics": [GlyphSemantic.RECURSION, GlyphSemantic.DIRECTION],
                "description": "Bidirectional recursion"
            },
            "recursive_mirror": {
                "symbol": "π",
                "semantics": [GlyphSemantic.RECURSION, GlyphSemantic.EMERGENCE],
                "description": "Recursive reflection"
            },
            "recursive_anchor": {
                "symbol": "β",
                "semantics": [GlyphSemantic.RECURSION, GlyphSemantic.STABILITY],
                "description": "Stable recursive reference"
            }
        }

        # Meta-level markers (uncertainty, emergence, collapse, timing).
        self.meta_glyphs = {
            "uncertainty": {
                "symbol": "β",
                "semantics": [GlyphSemantic.CERTAINTY],
                "description": "Uncertainty marker"
            },
            "emergence": {
                "symbol": "β§",
                "semantics": [GlyphSemantic.EMERGENCE, GlyphSemantic.COMPLEXITY],
                "description": "Emergent pattern marker"
            },
            "collapse_point": {
                "symbol": "π₯",
                "semantics": [GlyphSemantic.STABILITY, GlyphSemantic.CERTAINTY],
                "description": "Collapse point marker"
            },
            "temporal_marker": {
                "symbol": "β§",
                "semantics": [GlyphSemantic.TEMPORAL],
                "description": "Temporal sequence marker"
            }
        }

        # Structural sentinel markers (start/end/boundary/reference).
        self.sentinel_glyphs = {
            "start": {
                "symbol": "β",
                "semantics": [GlyphSemantic.DIRECTION],
                "description": "Start marker"
            },
            "end": {
                "symbol": "β―",
                "semantics": [GlyphSemantic.DIRECTION],
                "description": "End marker"
            },
            "boundary": {
                "symbol": "β¬",
                "semantics": [GlyphSemantic.STABILITY],
                "description": "Boundary marker"
            },
            "reference": {
                "symbol": "β±",
                "semantics": [GlyphSemantic.DIRECTION],
                "description": "Reference marker"
            }
        }

        # Merged view of every family, with keys namespaced by family so
        # lookups like "attribution_direct_strong" or "meta_collapse_point"
        # resolve unambiguously.
        self.all_glyphs = {
            **{f"attribution_{k}": v for k, v in self.attribution_glyphs.items()},
            **{f"attention_{k}": v for k, v in self.attention_glyphs.items()},
            **{f"residue_{k}": v for k, v in self.residue_glyphs.items()},
            **{f"recursive_{k}": v for k, v in self.recursive_glyphs.items()},
            **{f"meta_{k}": v for k, v in self.meta_glyphs.items()},
            **{f"sentinel_{k}": v for k, v in self.sentinel_glyphs.items()}
        }

    def get_glyph(self, glyph_id: str) -> Dict[str, Any]:
        """Get a glyph definition by namespaced ID.

        Parameters
        ----------
        glyph_id : str
            Namespaced key, e.g. "residue_memory_decay".

        Returns
        -------
        Dict[str, Any]
            Dict with "symbol", "semantics", and "description" keys.

        Raises
        ------
        ValueError
            If the ID is not present in the registry.
        """
        if glyph_id in self.all_glyphs:
            return self.all_glyphs[glyph_id]
        else:
            raise ValueError(f"Unknown glyph ID: {glyph_id}")

    def find_glyphs_by_semantic(self, semantic: GlyphSemantic) -> List[str]:
        """Find glyphs that have a specific semantic dimension.

        Returns the namespaced IDs of all registered glyphs whose
        "semantics" list contains ``semantic``.
        """
        return [
            glyph_id for glyph_id, glyph in self.all_glyphs.items()
            if semantic in glyph.get("semantics", [])
        ]

    def find_glyphs_by_type(self, glyph_type: str) -> List[str]:
        """Find glyphs of a specific family.

        ``glyph_type`` is the family prefix ("attribution", "attention",
        "residue", "recursive", "meta", or "sentinel"); matching is by the
        namespaced-key prefix.
        """
        return [
            glyph_id for glyph_id in self.all_glyphs.keys()
            if glyph_id.startswith(f"{glyph_type}_")
        ]
|
|
|
|
|
|
|
|
class GlyphMapper: |
|
|
""" |
|
|
Core glyph mapping system for the glyphs framework. |
|
|
|
|
|
This class transforms attribution traces, residue patterns, and attention |
|
|
flows into symbolic glyph representations that visualize latent spaces. |
|
|
It serves as the bridge between raw interpretability data and meaningful |
|
|
symbolic visualization. |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
config: Optional[Dict[str, Any]] = None, |
|
|
visualizer: Optional[VisualizationEngine] = None |
|
|
): |
|
|
""" |
|
|
Initialize the glyph mapper. |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
config : Optional[Dict[str, Any]] |
|
|
Configuration parameters for the mapper |
|
|
visualizer : Optional[VisualizationEngine] |
|
|
Visualization engine for glyph visualization |
|
|
""" |
|
|
self.config = config or {} |
|
|
self.visualizer = visualizer |
|
|
|
|
|
|
|
|
self.min_connection_strength = self.config.get("min_connection_strength", 0.1) |
|
|
self.auto_layout = self.config.get("auto_layout", True) |
|
|
self.default_layout = self.config.get("default_layout", "force_directed") |
|
|
self.default_dimensions = self.config.get("default_dimensions", (800, 600)) |
|
|
self.default_scale = self.config.get("default_scale", 1.0) |
|
|
self.connection_bundling = self.config.get("connection_bundling", True) |
|
|
self.color_scheme = self.config.get("color_scheme", "semantic") |
|
|
|
|
|
|
|
|
self.registry = GlyphRegistry() |
|
|
|
|
|
|
|
|
self.glyph_map_history = [] |
|
|
|
|
|
logger.info("Glyph mapper initialized") |
|
|
|
|
|
    def map_attribution(
        self,
        attribution_map: AttributionMap,
        layout_type: Optional[str] = None,
        dimensions: Optional[Tuple[int, int]] = None,
        scale: Optional[float] = None,
        include_tokens: bool = True,
        focus_on: Optional[List[str]] = None
    ) -> GlyphMap:
        """
        Map attribution patterns to glyphs.

        Builds a GlyphMap from an AttributionMap: optional token sentinels,
        one glyph per attribution link above the strength threshold, glyphs
        for attribution gaps and collapsed regions, and salience markers for
        the most salient output tokens. Appends the result to
        ``self.glyph_map_history``.

        Parameters:
        -----------
        attribution_map : AttributionMap
            Attribution map to visualize
        layout_type : Optional[str]
            Type of layout to use (defaults to self.default_layout)
        dimensions : Optional[Tuple[int, int]]
            Dimensions for visualization (defaults to self.default_dimensions)
        scale : Optional[float]
            Scale factor for visualization (defaults to self.default_scale)
        include_tokens : bool
            Whether to include tokens as sentinel glyphs
        focus_on : Optional[List[str]]
            Tokens to focus visualization on (passed to self._apply_focus)

        Returns:
        --------
        GlyphMap
            Glyph map representation of attribution
        """
        map_start = time.time()
        layout_type = layout_type or self.default_layout
        dimensions = dimensions or self.default_dimensions
        scale = scale or self.default_scale

        logger.info(f"Mapping attribution to glyphs with {layout_type} layout")

        # Map id combines a timestamp with a short content hash for uniqueness.
        map_id = f"attribution_{int(time.time())}_{hashlib.md5(str(attribution_map).encode()).hexdigest()[:8]}"

        glyph_map = GlyphMap(
            id=map_id,
            glyphs=[],
            connections=[],
            source_type="attribution",
            layout_type=layout_type,
            dimensions=dimensions,
            scale=scale,
            metadata={
                "attribution_map_id": attribution_map.metadata.get("id", "unknown"),
                "prompt": attribution_map.metadata.get("prompt", ""),
                "output": attribution_map.metadata.get("output", ""),
                "model_id": attribution_map.metadata.get("model_id", "unknown"),
                "timestamp": time.time()
            }
        )

        # --- Token sentinel glyphs -------------------------------------
        # token_glyph_ids maps "prompt_<i>"/"output_<i>" -> glyph id so link,
        # gap, and collapse glyphs below can connect to their tokens.
        token_glyph_ids = {}
        if include_tokens:
            # Prompt tokens in a column at x=0, one row per token.
            for i, token in enumerate(attribution_map.prompt_tokens):
                glyph_id = f"token_prompt_{i}"
                glyph = Glyph(
                    id=glyph_id,
                    symbol="β",
                    type=GlyphType.SENTINEL,
                    semantics=[GlyphSemantic.DIRECTION],
                    position=(0, i * 20),
                    size=5.0,
                    color="#3498db",
                    opacity=0.8,
                    source_elements=[token],
                    description=f"Prompt token: '{token}'",
                    metadata={"token_index": i, "token_type": "prompt"}
                )
                glyph_map.glyphs.append(glyph)
                token_glyph_ids[f"prompt_{i}"] = glyph_id

            # Output tokens in a parallel column at x=100.
            for i, token in enumerate(attribution_map.output_tokens):
                glyph_id = f"token_output_{i}"
                glyph = Glyph(
                    id=glyph_id,
                    symbol="β",
                    type=GlyphType.SENTINEL,
                    semantics=[GlyphSemantic.DIRECTION],
                    position=(100, i * 20),
                    size=5.0,
                    color="#e74c3c",
                    opacity=0.8,
                    source_elements=[token],
                    description=f"Output token: '{token}'",
                    metadata={"token_index": i, "token_type": "output"}
                )
                glyph_map.glyphs.append(glyph)
                token_glyph_ids[f"output_{i}"] = glyph_id

        # --- Attribution link glyphs -----------------------------------
        link_glyphs = {}
        for link_idx, link in enumerate(attribution_map.links):
            # Skip links below the configured strength threshold.
            if link.strength < self.min_connection_strength:
                continue

            # Choose a registry glyph by attribution type; DIRECT links are
            # further bucketed by strength (>0.7 strong, >0.4 medium, else weak).
            if link.attribution_type == AttributionType.DIRECT:
                if link.strength > 0.7:
                    glyph_type = "attribution_direct_strong"
                elif link.strength > 0.4:
                    glyph_type = "attribution_direct_medium"
                else:
                    glyph_type = "attribution_direct_weak"
            elif link.attribution_type == AttributionType.INDIRECT:
                glyph_type = "attribution_indirect"
            elif link.attribution_type == AttributionType.COMPOSITE:
                glyph_type = "attribution_composite"
            elif link.attribution_type == AttributionType.RECURSIVE:
                glyph_type = "recursive_recursive_exchange"
            else:
                # Unknown attribution types fall back to the weakest direct glyph.
                glyph_type = "attribution_direct_weak"

            glyph_info = self.registry.get_glyph(glyph_type)

            # Link glyphs sit mid-column (x=50); size/opacity scale with strength.
            glyph_id = f"link_{link_idx}"
            glyph = Glyph(
                id=glyph_id,
                symbol=glyph_info["symbol"],
                type=GlyphType.ATTRIBUTION,
                semantics=glyph_info["semantics"],
                position=(50, (link.source_idx + link.target_idx) * 10),
                size=10.0 * link.strength,
                color=self._get_color_for_attribution(link),
                opacity=min(1.0, 0.5 + link.strength / 2),
                source_elements=[link],
                description=glyph_info["description"],
                metadata={
                    "link_index": link_idx,
                    "source_index": link.source_idx,
                    "target_index": link.target_idx,
                    "attribution_type": str(link.attribution_type),
                    "strength": link.strength
                }
            )
            glyph_map.glyphs.append(glyph)
            link_glyphs[link_idx] = glyph_id

            # Connect prompt token -> link glyph -> output token.
            if include_tokens:
                source_glyph_id = token_glyph_ids.get(f"prompt_{link.source_idx}")
                target_glyph_id = token_glyph_ids.get(f"output_{link.target_idx}")

                if source_glyph_id and target_glyph_id:
                    glyph_map.connections.append(GlyphConnection(
                        source_id=source_glyph_id,
                        target_id=glyph_id,
                        strength=link.strength,
                        type="attribution_flow",
                        directed=True,
                        color="#7f8c8d",
                        width=1.0 + 2.0 * link.strength,
                        opacity=0.7 * link.strength
                    ))

                    glyph_map.connections.append(GlyphConnection(
                        source_id=glyph_id,
                        target_id=target_glyph_id,
                        strength=link.strength,
                        type="attribution_flow",
                        directed=True,
                        color="#7f8c8d",
                        width=1.0 + 2.0 * link.strength,
                        opacity=0.7 * link.strength
                    ))

        # --- Attribution gap glyphs ------------------------------------
        for gap_idx, (start_idx, end_idx) in enumerate(attribution_map.attribution_gaps):
            glyph_info = self.registry.get_glyph("attribution_gap")
            glyph_id = f"gap_{gap_idx}"
            glyph = Glyph(
                id=glyph_id,
                symbol=glyph_info["symbol"],
                type=GlyphType.ATTRIBUTION,
                semantics=glyph_info["semantics"],
                position=(50, (start_idx + end_idx) * 10),
                size=8.0,
                color="#e67e22",
                opacity=0.8,
                source_elements=[(start_idx, end_idx)],
                description=f"Attribution gap between indices {start_idx} and {end_idx}",
                metadata={
                    "gap_index": gap_idx,
                    "start_index": start_idx,
                    "end_index": end_idx
                }
            )
            glyph_map.glyphs.append(glyph)

            # Connect the gap glyph between its bracketing tokens, same
            # two-hop shape as link glyphs but with fixed strength 0.5.
            if include_tokens:
                source_glyph_id = token_glyph_ids.get(f"prompt_{start_idx}")
                target_glyph_id = token_glyph_ids.get(f"output_{end_idx}")

                if source_glyph_id and target_glyph_id:
                    glyph_map.connections.append(GlyphConnection(
                        source_id=source_glyph_id,
                        target_id=glyph_id,
                        strength=0.5,
                        type="attribution_gap",
                        directed=True,
                        color="#e67e22",
                        width=1.5,
                        opacity=0.6
                    ))

                    glyph_map.connections.append(GlyphConnection(
                        source_id=glyph_id,
                        target_id=target_glyph_id,
                        strength=0.5,
                        type="attribution_gap",
                        directed=True,
                        color="#e67e22",
                        width=1.5,
                        opacity=0.6
                    ))

        # --- Collapse region glyphs ------------------------------------
        for collapse_idx, (start_idx, end_idx) in enumerate(attribution_map.collapsed_regions):
            glyph_info = self.registry.get_glyph("meta_collapse_point")
            glyph_id = f"collapse_{collapse_idx}"
            glyph = Glyph(
                id=glyph_id,
                symbol=glyph_info["symbol"],
                type=GlyphType.COLLAPSE,
                semantics=glyph_info["semantics"],
                position=(50, (start_idx + end_idx) * 10),
                size=12.0,
                color="#9b59b6",
                opacity=0.9,
                source_elements=[(start_idx, end_idx)],
                description=f"Attribution collapse between indices {start_idx} and {end_idx}",
                metadata={
                    "collapse_index": collapse_idx,
                    "start_index": start_idx,
                    "end_index": end_idx
                }
            )
            glyph_map.glyphs.append(glyph)

            # Fan out from the collapse glyph to every output token in the
            # collapsed range (inclusive).
            if include_tokens:
                for i in range(start_idx, end_idx + 1):
                    token_glyph_id = token_glyph_ids.get(f"output_{i}")
                    if token_glyph_id:
                        glyph_map.connections.append(GlyphConnection(
                            source_id=glyph_id,
                            target_id=token_glyph_id,
                            strength=0.7,
                            type="collapse_effect",
                            directed=True,
                            color="#9b59b6",
                            width=1.5,
                            opacity=0.7
                        ))

        # --- Salience markers ------------------------------------------
        if attribution_map.token_salience:
            # Top 5 tokens by salience, highest first.
            salient_tokens = sorted(
                attribution_map.token_salience.items(),
                key=lambda x: x[1],
                reverse=True
            )[:5]

            for token_idx, salience in salient_tokens:
                # Salience > 0.5 makes the token a focal point; > 0.8 also
                # adds a dedicated focus glyph connected to the token.
                if salience > 0.5:
                    token_glyph_id = token_glyph_ids.get(f"output_{token_idx}")
                    if token_glyph_id:
                        glyph_map.focal_points.append(token_glyph_id)

                        if salience > 0.8:
                            glyph_info = self.registry.get_glyph("attention_focus")
                            glyph_id = f"salience_{token_idx}"
                            glyph = Glyph(
                                id=glyph_id,
                                symbol=glyph_info["symbol"],
                                type=GlyphType.SALIENCE,
                                semantics=glyph_info["semantics"],
                                position=(120, token_idx * 20),
                                size=10.0 * salience,
                                color="#f1c40f",
                                opacity=salience,
                                source_elements=[token_idx],
                                description=f"High salience token at index {token_idx}",
                                metadata={
                                    "token_index": token_idx,
                                    "salience": salience
                                }
                            )
                            glyph_map.glyphs.append(glyph)

                            glyph_map.connections.append(GlyphConnection(
                                source_id=glyph_id,
                                target_id=token_glyph_id,
                                strength=salience,
                                type="salience_marker",
                                directed=False,
                                color="#f1c40f",
                                width=2.0 * salience,
                                opacity=0.8
                            ))

        # --- Layout and focus post-processing --------------------------
        if self.auto_layout:
            glyph_map = self._apply_layout(glyph_map, layout_type)

        if focus_on:
            glyph_map = self._apply_focus(glyph_map, focus_on)

        # Record timing and keep the map in history.
        map_time = time.time() - map_start
        glyph_map.metadata["map_time"] = map_time

        self.glyph_map_history.append(glyph_map)

        logger.info(f"Attribution mapping completed in {map_time:.2f}s")
        return glyph_map
|
|
|
|
|
def map_residue_patterns( |
|
|
self, |
|
|
residue_patterns: List[ResiduePattern], |
|
|
layout_type: Optional[str] = None, |
|
|
dimensions: Optional[Tuple[int, int]] = None, |
|
|
scale: Optional[float] = None, |
|
|
cluster_patterns: bool = True |
|
|
) -> |
|
|
def map_residue_patterns( |
|
|
self, |
|
|
residue_patterns: List[ResiduePattern], |
|
|
layout_type: Optional[str] = None, |
|
|
dimensions: Optional[Tuple[int, int]] = None, |
|
|
scale: Optional[float] = None, |
|
|
cluster_patterns: bool = True |
|
|
) -> GlyphMap: |
|
|
""" |
|
|
Map residue patterns to glyphs. |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
residue_patterns : List[ResiduePattern] |
|
|
Residue patterns to visualize |
|
|
layout_type : Optional[str] |
|
|
Type of layout to use |
|
|
dimensions : Optional[Tuple[int, int]] |
|
|
Dimensions for visualization |
|
|
scale : Optional[float] |
|
|
Scale factor for visualization |
|
|
cluster_patterns : bool |
|
|
Whether to cluster similar patterns |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
GlyphMap |
|
|
Glyph map representation of residue patterns |
|
|
""" |
|
|
map_start = time.time() |
|
|
layout_type = layout_type or self.default_layout |
|
|
dimensions = dimensions or self.default_dimensions |
|
|
scale = scale or self.default_scale |
|
|
|
|
|
logger.info(f"Mapping {len(residue_patterns)} residue patterns to glyphs") |
|
|
|
|
|
|
|
|
map_id = f"residue_{int(time.time())}_{hashlib.md5(str(residue_patterns).encode()).hexdigest()[:8]}" |
|
|
|
|
|
|
|
|
glyph_map = GlyphMap( |
|
|
id=map_id, |
|
|
glyphs=[], |
|
|
connections=[], |
|
|
source_type="residue", |
|
|
layout_type=layout_type, |
|
|
dimensions=dimensions, |
|
|
scale=scale, |
|
|
metadata={ |
|
|
"num_patterns": len(residue_patterns), |
|
|
"pattern_types": [p.type for p in residue_patterns], |
|
|
"timestamp": time.time() |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
pattern_by_type = {} |
|
|
for pattern in residue_patterns: |
|
|
if pattern.type not in pattern_by_type: |
|
|
pattern_by_type[pattern.type] = [] |
|
|
pattern_by_type[pattern.type].append(pattern) |
|
|
|
|
|
|
|
|
type_glyphs = {} |
|
|
for i, (pattern_type, patterns) in enumerate(pattern_by_type.items()): |
|
|
|
|
|
if pattern_type == "memory_decay": |
|
|
glyph_type = "residue_memory_decay" |
|
|
elif pattern_type == "value_conflict": |
|
|
glyph_type = "residue_value_conflict" |
|
|
elif pattern_type == "ghost_activation": |
|
|
glyph_type = "residue_ghost_activation" |
|
|
elif pattern_type == "boundary_hesitation": |
|
|
glyph_type = "residue_boundary_hesitation" |
|
|
elif pattern_type == "null_output": |
|
|
glyph_type = "residue_null_output" |
|
|
else: |
|
|
|
|
|
glyph_type = "residue_null_output" |
|
|
|
|
|
|
|
|
glyph_info = self.registry.get_glyph(glyph_type) |
|
|
|
|
|
|
|
|
glyph_id = f"type_{pattern_type}" |
|
|
glyph = Glyph( |
|
|
id=glyph_id, |
|
|
symbol=glyph_info["symbol"], |
|
|
type=GlyphType.RESIDUE, |
|
|
semantics=glyph_info["semantics"], |
|
|
position=(dimensions[0] / 2, 100 + i * 150), |
|
|
size=20.0, |
|
|
color=self._get_color_for_residue_type(pattern_type), |
|
|
opacity=1.0, |
|
|
source_elements=patterns, |
|
|
description=f"{pattern_type.replace('_', ' ').title()} Residue Pattern", |
|
|
metadata={ |
|
|
"pattern_type": pattern_type, |
|
|
"pattern_count": len(patterns), |
|
|
"average_confidence": sum(p.confidence for p in patterns) / len(patterns) |
|
|
} |
|
|
) |
|
|
glyph_map.glyphs.append(glyph) |
|
|
type_glyphs[pattern_type] = glyph_id |
|
|
|
|
|
|
|
|
glyph_map.regions[pattern_type] = [glyph_id] |
|
|
|
|
|
|
|
|
instance_glyphs = {} |
|
|
for pattern_idx, pattern in enumerate(residue_patterns): |
|
|
|
|
|
parent_glyph_id = type_glyphs.get(pattern.type) |
|
|
if not parent_glyph_id: |
|
|
continue |
|
|
|
|
|
|
|
|
if pattern.confidence > 0.8: |
|
|
size_factor = 1.2 |
|
|
opacity = 0.9 |
|
|
elif pattern.confidence > 0.5: |
|
|
size_factor = 1.0 |
|
|
opacity = 0.7 |
|
|
else: |
|
|
size_factor = 0.8 |
|
|
opacity = 0.5 |
|
|
|
|
|
|
|
|
glyph_type = f"residue_{pattern.type}" |
|
|
glyph_info = self.registry.get_glyph(glyph_type) |
|
|
|
|
|
|
|
|
glyph_id = f"pattern_{pattern_idx}" |
|
|
glyph = Glyph( |
|
|
id=glyph_id, |
|
|
symbol=glyph_info["symbol"], |
|
|
type=GlyphType.RESIDUE, |
|
|
semantics=glyph_info["semantics"], |
|
|
position=(0, 0), |
|
|
size=12.0 * size_factor, |
|
|
color=self._get_color_for_residue_type(pattern.type), |
|
|
opacity=opacity, |
|
|
source_elements=[pattern], |
|
|
description=f"{pattern.type.replace('_', ' ').title()} Pattern: {pattern.signature[:20]}", |
|
|
metadata={ |
|
|
"pattern_type": pattern.type, |
|
|
"confidence": pattern.confidence, |
|
|
"signature": pattern.signature, |
|
|
"context": pattern.context |
|
|
} |
|
|
) |
|
|
glyph_map.glyphs.append(glyph) |
|
|
instance_glyphs[pattern_idx] = glyph_id |
|
|
|
|
|
|
|
|
if pattern.type in glyph_map.regions: |
|
|
glyph_map.regions[pattern.type].append(glyph_id) |
|
|
|
|
|
|
|
|
glyph_map.connections.append(GlyphConnection( |
|
|
source_id=parent_glyph_id, |
|
|
target_id=glyph_id, |
|
|
strength=pattern.confidence, |
|
|
type="type_instance", |
|
|
directed=True, |
|
|
color=self._get_color_for_residue_type(pattern.type), |
|
|
width=1.0 + pattern.confidence, |
|
|
opacity=0.6 * pattern.confidence |
|
|
)) |
|
|
|
|
|
|
|
|
if cluster_patterns and len(residue_patterns) > 1: |
|
|
|
|
|
similarity_matrix = np.zeros((len(residue_patterns), len(residue_patterns))) |
|
|
|
|
|
for i, pattern1 in enumerate(residue_patterns): |
|
|
for j, pattern2 in enumerate(residue_patterns): |
|
|
if i == j: |
|
|
similarity_matrix[i, j] = 1.0 |
|
|
else: |
|
|
|
|
|
signature_sim = self._calculate_signature_similarity( |
|
|
pattern1.signature, pattern2.signature |
|
|
) |
|
|
context_sim = self._calculate_context_similarity( |
|
|
pattern1.context, pattern2.context |
|
|
) |
|
|
|
|
|
similarity_matrix[i, j] = 0.7 * signature_sim + 0.3 * context_sim |
|
|
|
|
|
|
|
|
for i in range(len(residue_patterns)): |
|
|
for j in range(i + 1, len(residue_patterns)): |
|
|
similarity = similarity_matrix[i, j] |
|
|
if similarity > 0.6: |
|
|
source_id = instance_glyphs.get(i) |
|
|
target_id = instance_glyphs.get(j) |
|
|
if source_id and target_id: |
|
|
glyph_map.connections.append(GlyphConnection( |
|
|
source_id=source_id, |
|
|
target_id=target_id, |
|
|
strength=similarity, |
|
|
type="pattern_similarity", |
|
|
directed=False, |
|
|
color="#2ecc71", |
|
|
width=1.0 + 2.0 * similarity, |
|
|
opacity=0.5 * similarity |
|
|
)) |
|
|
|
|
|
|
|
|
if layout_type == "similarity": |
|
|
|
|
|
tsne = TSNE(n_components=2, perplexity=min(5, len(residue_patterns) - 1)) |
|
|
positions = tsne.fit_transform(similarity_matrix) |
|
|
|
|
|
|
|
|
positions = self._scale_positions(positions, dimensions) |
|
|
|
|
|
|
|
|
for i, pattern_idx in enumerate(instance_glyphs.keys()): |
|
|
glyph_id = instance_glyphs[pattern_idx] |
|
|
for glyph in glyph_map.glyphs: |
|
|
if glyph.id == glyph_id: |
|
|
glyph.position = (positions[i, 0], positions[i, 1]) |
|
|
|
|
|
|
|
|
if self.auto_layout and layout_type != "similarity": |
|
|
glyph_map = self._apply_layout(glyph_map, layout_type) |
|
|
|
|
|
|
|
|
for pattern_idx, pattern in enumerate(residue_patterns): |
|
|
|
|
|
if pattern.confidence < 0.7: |
|
|
continue |
|
|
|
|
|
glyph_id = instance_glyphs.get(pattern_idx) |
|
|
if not glyph_id: |
|
|
continue |
|
|
|
|
|
|
|
|
ref_glyph_id = f"ref_{pattern_idx}" |
|
|
ref_glyph = Glyph( |
|
|
id=ref_glyph_id, |
|
|
symbol="β±", |
|
|
type=GlyphType.SENTINEL, |
|
|
semantics=[GlyphSemantic.DIRECTION], |
|
|
position=(0, 0), |
|
|
size=6.0, |
|
|
color="#3498db", |
|
|
opacity=0.7, |
|
|
source_elements=[pattern.context], |
|
|
description=f"Context reference for pattern {pattern_idx}", |
|
|
metadata={ |
|
|
"pattern_index": pattern_idx, |
|
|
"reference_type": "context" |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
for glyph in glyph_map.glyphs: |
|
|
if glyph.id == glyph_id: |
|
|
x, y = glyph.position |
|
|
ref_glyph.position = (x + 20, y - 20) |
|
|
|
|
|
glyph_map.glyphs.append(ref_glyph) |
|
|
|
|
|
|
|
|
glyph_map.connections.append(GlyphConnection( |
|
|
source_id=ref_glyph_id, |
|
|
target_id=glyph_id, |
|
|
strength=0.7, |
|
|
type="context_reference", |
|
|
directed=True, |
|
|
color="#3498db", |
|
|
width=1.0, |
|
|
opacity=0.5 |
|
|
)) |
|
|
|
|
|
|
|
|
map_time = time.time() - map_start |
|
|
glyph_map.metadata["map_time"] = map_time |
|
|
|
|
|
|
|
|
self.glyph_map_history.append(glyph_map) |
|
|
|
|
|
logger.info(f"Residue pattern mapping completed in {map_time:.2f}s") |
|
|
return glyph_map |
|
|
|
|
|
def map_attention_heads(
    self,
    attention_data: Dict[str, Any],
    layout_type: Optional[str] = None,
    dimensions: Optional[Tuple[int, int]] = None,
    scale: Optional[float] = None,
    include_tokens: bool = True
) -> GlyphMap:
    """
    Map attention head patterns to glyphs.

    Builds a GlyphMap containing one ATTENTION glyph per attention head,
    optional SENTINEL glyphs for prompt/output tokens, one META/RECURSIVE
    glyph per multi-head pattern, and regions grouping heads per pattern
    and per layer.

    Parameters:
    -----------
    attention_data : Dict[str, Any]
        Attention head data to visualize
    layout_type : Optional[str]
        Type of layout to use
    dimensions : Optional[Tuple[int, int]]
        Dimensions for visualization
    scale : Optional[float]
        Scale factor for visualization
    include_tokens : bool
        Whether to include tokens as sentinel glyphs

    Returns:
    --------
    GlyphMap
        Glyph map representation of attention head patterns
    """
    map_start = time.time()
    # Fall back to instance-level defaults for any unspecified option.
    layout_type = layout_type or self.default_layout
    dimensions = dimensions or self.default_dimensions
    scale = scale or self.default_scale

    logger.info("Mapping attention head patterns to glyphs")

    # Unpack the attention payload; every key is optional.
    # NOTE(review): entries of `attention_heads` are used below as objects
    # with .layer, .head, .pattern_type, .strength, .function,
    # .attribution_role and .focus_tokens attributes -- confirm against
    # whatever produces this dict.
    prompt_tokens = attention_data.get("prompt_tokens", [])
    output_tokens = attention_data.get("output_tokens", [])
    attention_heads = attention_data.get("attention_heads", [])
    head_patterns = attention_data.get("head_patterns", {})

    # Unique map id: creation timestamp plus a short content hash of the
    # head list (str() of the objects feeds the hash).
    map_id = f"attention_{int(time.time())}_{hashlib.md5(str(attention_heads).encode()).hexdigest()[:8]}"

    glyph_map = GlyphMap(
        id=map_id,
        glyphs=[],
        connections=[],
        source_type="attention",
        layout_type=layout_type,
        dimensions=dimensions,
        scale=scale,
        metadata={
            "num_heads": len(attention_heads),
            "model_id": attention_data.get("metadata", {}).get("model_id", "unknown"),
            "timestamp": time.time()
        }
    )

    # Sentinel glyphs for tokens. The ids are remembered in token_glyph_ids
    # (keyed "prompt_{i}" / "output_{i}") so head glyphs can later connect
    # to the tokens they focus on.
    token_glyph_ids = {}
    if include_tokens:
        # Prompt tokens form a left column at x=0, stacked every 20 units.
        for i, token in enumerate(prompt_tokens):
            glyph_id = f"token_prompt_{i}"
            glyph = Glyph(
                id=glyph_id,
                symbol="β",
                type=GlyphType.SENTINEL,
                semantics=[GlyphSemantic.DIRECTION],
                position=(0, i * 20),
                size=5.0,
                color="#3498db",
                opacity=0.8,
                source_elements=[token],
                description=f"Prompt token: '{token}'",
                metadata={"token_index": i, "token_type": "prompt"}
            )
            glyph_map.glyphs.append(glyph)
            token_glyph_ids[f"prompt_{i}"] = glyph_id

        # Output tokens form a right column at x=100, red instead of blue.
        for i, token in enumerate(output_tokens):
            glyph_id = f"token_output_{i}"
            glyph = Glyph(
                id=glyph_id,
                symbol="β",
                type=GlyphType.SENTINEL,
                semantics=[GlyphSemantic.DIRECTION],
                position=(100, i * 20),
                size=5.0,
                color="#e74c3c",
                opacity=0.8,
                source_elements=[token],
                description=f"Output token: '{token}'",
                metadata={"token_index": i, "token_type": "output"}
            )
            glyph_map.glyphs.append(glyph)
            token_glyph_ids[f"output_{i}"] = glyph_id

    # One ATTENTION glyph per head; head_glyphs maps head index -> glyph id.
    head_glyphs = {}
    for head_idx, head in enumerate(attention_heads):
        # Choose a registry glyph keyed by the head's pattern type; any
        # unrecognized pattern falls back to the generic multi-head glyph.
        if head.pattern_type == "induction":
            glyph_type = "attention_induction"
        elif head.pattern_type == "focus":
            glyph_type = "attention_focus"
        elif head.pattern_type == "diffuse":
            glyph_type = "attention_diffuse"
        elif head.pattern_type == "inhibition":
            glyph_type = "attention_inhibition"
        else:
            glyph_type = "attention_multi_head"

        glyph_info = self.registry.get_glyph(glyph_type)

        # Initial position sits between the token columns (x=50) and is
        # staggered by layer (40/layer) and head index within the layer
        # (10/head); size and opacity scale with head strength.
        glyph_id = f"head_{head_idx}"
        glyph = Glyph(
            id=glyph_id,
            symbol=glyph_info["symbol"],
            type=GlyphType.ATTENTION,
            semantics=glyph_info["semantics"],
            position=(50, head.layer * 40 + head.head * 10),
            size=8.0 + 5.0 * head.strength,
            color=self._get_color_for_layer(head.layer),
            opacity=min(1.0, 0.5 + head.strength / 2),
            source_elements=[head],
            description=f"Attention head {head.layer}.{head.head}: {head.pattern_type}",
            metadata={
                "head_index": head_idx,
                "layer": head.layer,
                "head": head.head,
                "pattern_type": head.pattern_type,
                "strength": head.strength,
                "function": head.function,
                "attribution_role": head.attribution_role
            }
        )
        glyph_map.glyphs.append(glyph)
        head_glyphs[head_idx] = glyph_id

        # Connect the head to each token it focuses on. focus_tokens uses a
        # single index space: indices below len(prompt_tokens) are prompt
        # tokens, the remainder are output tokens (re-based accordingly).
        if include_tokens and head.focus_tokens:
            for token_idx in head.focus_tokens:
                token_type = "prompt" if token_idx < len(prompt_tokens) else "output"
                adjusted_idx = token_idx if token_type == "prompt" else token_idx - len(prompt_tokens)
                token_glyph_id = token_glyph_ids.get(f"{token_type}_{adjusted_idx}")
                if token_glyph_id:
                    glyph_map.connections.append(GlyphConnection(
                        source_id=glyph_id,
                        target_id=token_glyph_id,
                        strength=head.strength,
                        type="attention_focus",
                        directed=True,
                        color=self._get_color_for_layer(head.layer, alpha=0.6),
                        width=1.0 + 2.0 * head.strength,
                        opacity=0.6 * head.strength
                    ))

    # Multi-head patterns: one hub glyph per named pattern, connected to
    # each participating head and recorded as a region.
    for pattern_name, related_heads in head_patterns.items():
        # A "pattern" of fewer than two heads carries no grouping info.
        if len(related_heads) < 2:
            continue

        # Pattern names containing "emergent" get the META emergence glyph;
        # everything else is treated as a recursive exchange.
        glyph_type = "meta_emergence" if "emergent" in pattern_name else "recursive_recursive_exchange"
        glyph_info = self.registry.get_glyph(glyph_type)

        pattern_glyph_id = f"pattern_{pattern_name}"
        pattern_glyph = Glyph(
            id=pattern_glyph_id,
            symbol=glyph_info["symbol"],
            type=GlyphType.META if "emergent" in pattern_name else GlyphType.RECURSIVE,
            semantics=glyph_info["semantics"],
            position=(100, 100),
            size=12.0,
            color="#f1c40f",
            opacity=0.9,
            source_elements=[pattern_name, related_heads],
            description=f"Attention pattern: {pattern_name}",
            metadata={
                "pattern_name": pattern_name,
                "related_heads": related_heads
            }
        )
        glyph_map.glyphs.append(pattern_glyph)

        # Hub -> member edges for every head that actually got a glyph.
        for head_idx in related_heads:
            head_glyph_id = head_glyphs.get(head_idx)
            if head_glyph_id:
                glyph_map.connections.append(GlyphConnection(
                    source_id=pattern_glyph_id,
                    target_id=head_glyph_id,
                    strength=0.8,
                    type="pattern_membership",
                    directed=True,
                    color="#f1c40f",
                    width=1.5,
                    opacity=0.7
                ))

        # Region = the pattern hub plus its member head glyphs.
        if pattern_name not in glyph_map.regions:
            glyph_map.regions[pattern_name] = []
        glyph_map.regions[pattern_name].append(pattern_glyph_id)
        for head_idx in related_heads:
            head_glyph_id = head_glyphs.get(head_idx)
            if head_glyph_id:
                glyph_map.regions[pattern_name].append(head_glyph_id)

    # One region per transformer layer, listing that layer's head glyphs.
    layers = set(head.layer for head in attention_heads)
    for layer in layers:
        layer_heads = [
            head_idx for head_idx, head in enumerate(attention_heads)
            if head.layer == layer
        ]

        layer_region = f"layer_{layer}"
        glyph_map.regions[layer_region] = [
            head_glyphs[head_idx] for head_idx in layer_heads
            if head_idx in head_glyphs
        ]

    # Replace the provisional grid-ish positions with a real layout.
    if self.auto_layout:
        glyph_map = self._apply_layout(glyph_map, layout_type)

    map_time = time.time() - map_start
    glyph_map.metadata["map_time"] = map_time

    # Keep the result for later retrieval/combination.
    self.glyph_map_history.append(glyph_map)

    logger.info(f"Attention head mapping completed in {map_time:.2f}s")
    return glyph_map
|
|
|
|
|
def map_recursive_trace(
    self,
    trace_data: Dict[str, Any],
    layout_type: Optional[str] = None,
    dimensions: Optional[Tuple[int, int]] = None,
    scale: Optional[float] = None,
    depth_limit: Optional[int] = None
) -> GlyphMap:
    """
    Map recursive trace data to glyphs.

    Produces a "seed" glyph for the originating command, one glyph per
    trace operation (connected to its closest parent one depth level up),
    an optional "result" glyph wired to the deepest operations and back to
    the seed, and one region per recursion depth.

    Parameters:
    -----------
    trace_data : Dict[str, Any]
        Recursive trace data to visualize
    layout_type : Optional[str]
        Type of layout to use
    dimensions : Optional[Tuple[int, int]]
        Dimensions for visualization
    scale : Optional[float]
        Scale factor for visualization
    depth_limit : Optional[int]
        Maximum recursion depth to visualize

    Returns:
    --------
    GlyphMap
        Glyph map representation of recursive trace
    """
    map_start = time.time()
    # Fall back to instance-level defaults for any unspecified option.
    layout_type = layout_type or self.default_layout
    dimensions = dimensions or self.default_dimensions
    scale = scale or self.default_scale

    logger.info("Mapping recursive trace to glyphs")

    # Unpack the trace payload; all keys optional.
    trace_operations = trace_data.get("operations", [])
    trace_result = trace_data.get("result", {})
    trace_depth = trace_data.get("depth", 0)

    # Optionally truncate both the reported depth and the operation list.
    if depth_limit is not None:
        trace_depth = min(trace_depth, depth_limit)

        trace_operations = [
            op for op in trace_operations
            if op.get("depth", 0) <= depth_limit
        ]

    # Unique map id: timestamp plus a short hash of the whole trace dict.
    map_id = f"recursive_{int(time.time())}_{hashlib.md5(str(trace_data).encode()).hexdigest()[:8]}"

    glyph_map = GlyphMap(
        id=map_id,
        glyphs=[],
        connections=[],
        source_type="recursive",
        layout_type=layout_type,
        dimensions=dimensions,
        scale=scale,
        metadata={
            "trace_command": trace_data.get("command", "unknown"),
            "trace_target": trace_data.get("target", "unknown"),
            "trace_depth": trace_depth,
            "timestamp": time.time()
        }
    )

    # Seed glyph: the root of the recursion, centered near the top and
    # registered as a focal point so layouts can anchor on it.
    seed_glyph = Glyph(
        id="seed",
        symbol="β΄",
        type=GlyphType.RECURSIVE,
        semantics=[GlyphSemantic.RECURSION, GlyphSemantic.EMERGENCE],
        position=(dimensions[0] / 2, 50),
        size=15.0,
        color="#9b59b6",
        opacity=1.0,
        source_elements=[trace_data.get("command", "")],
        description=f"Recursive seed: {trace_data.get('command', 'unknown')}",
        metadata={
            "command": trace_data.get("command", ""),
            "target": trace_data.get("target", ""),
            "depth": trace_depth
        }
    )
    glyph_map.glyphs.append(seed_glyph)
    glyph_map.focal_points.append("seed")

    # depth_glyphs maps str(depth) -> glyph id(s) at that depth.
    # NOTE: values are deliberately mixed-typed -- depth "0" starts as the
    # bare string "seed" and values are promoted to lists on first append;
    # every consumer below checks isinstance(..., list) before use.
    depth_glyphs = {"0": "seed"}
    for op_idx, operation in enumerate(trace_operations):
        op_depth = operation.get("depth", 0)
        op_type = operation.get("type", "unknown")

        # Substring dispatch from operation type to registry glyph key;
        # unmatched types fall back to the recursive anchor glyph.
        if "reflect" in op_type:
            glyph_type = "recursive_recursive_mirror"
        elif "collapse" in op_type:
            glyph_type = "meta_collapse_point"
        elif "fork" in op_type:
            glyph_type = "attribution_fork"
        elif "trace" in op_type:
            glyph_type = "recursive_recursive_exchange"
        else:
            glyph_type = "recursive_recursive_anchor"

        glyph_info = self.registry.get_glyph(glyph_type)

        # Provisional position: x grows with depth, y with operation order.
        # Deeper operations are drawn smaller and more transparent.
        glyph_id = f"op_{op_idx}"
        glyph = Glyph(
            id=glyph_id,
            symbol=glyph_info["symbol"],
            type=GlyphType.RECURSIVE,
            semantics=glyph_info["semantics"],
            position=(100 + op_depth * 80, 100 + op_idx * 30),
            size=10.0 - op_depth * 0.5,
            color=self._get_color_for_depth(op_depth),
            opacity=max(0.5, 1.0 - op_depth * 0.1),
            source_elements=[operation],
            description=f"Operation {op_type} at depth {op_depth}",
            metadata={
                "operation_index": op_idx,
                "operation_type": op_type,
                "depth": op_depth,
                "parameters": operation.get("parameters", {})
            }
        )
        glyph_map.glyphs.append(glyph)

        # Record this glyph under its depth, promoting a bare string entry
        # (only ever "seed" at depth 0) to a two-element list.
        depth_key = str(op_depth)
        if depth_key not in depth_glyphs:
            depth_glyphs[depth_key] = []
        if isinstance(depth_glyphs[depth_key], list):
            depth_glyphs[depth_key].append(glyph_id)
        else:
            depth_glyphs[depth_key] = [depth_glyphs[depth_key], glyph_id]

        # Connect to a parent one level up: the parent op whose index is
        # numerically closest to ours ("seed" counts as index 0).
        parent_depth_key = str(op_depth - 1)
        if parent_depth_key in depth_glyphs:
            parent_glyphs = depth_glyphs[parent_depth_key]
            if isinstance(parent_glyphs, list):
                parent_indices = [
                    int(g.split('_')[1]) if g.startswith('op_') else 0
                    for g in parent_glyphs
                ]
                closest_parent_idx = min(range(len(parent_indices)), key=lambda i: abs(parent_indices[i] - op_idx))
                parent_glyph_id = parent_glyphs[closest_parent_idx]
            else:
                parent_glyph_id = parent_glyphs

            glyph_map.connections.append(GlyphConnection(
                source_id=parent_glyph_id,
                target_id=glyph_id,
                strength=0.8,
                type="recursive_descent",
                directed=True,
                color=self._get_color_for_depth(op_depth - 1, alpha=0.6),
                width=2.0 - op_depth * 0.2,
                opacity=max(0.4, 0.9 - op_depth * 0.1)
            ))

    # Result glyph: only when the trace reported an outcome.
    if trace_result:
        result_type = trace_result.get("type", "unknown")

        # Substring dispatch on the outcome type.
        if "success" in result_type:
            glyph_type = "recursive_recursive_aegis"
        elif "collapse" in result_type:
            glyph_type = "meta_collapse_point"
        elif "partial" in result_type:
            glyph_type = "residue_ghost_activation"
        else:
            glyph_type = "meta_uncertainty"

        glyph_info = self.registry.get_glyph(glyph_type)

        # Centered near the bottom; green on success, red otherwise.
        result_glyph_id = "result"
        result_glyph = Glyph(
            id=result_glyph_id,
            symbol=glyph_info["symbol"],
            type=GlyphType.RECURSIVE if "success" in result_type else GlyphType.META,
            semantics=glyph_info["semantics"],
            position=(dimensions[0] / 2, dimensions[1] - 80),
            size=15.0,
            color="#27ae60" if "success" in result_type else "#e74c3c",
            opacity=1.0,
            source_elements=[trace_result],
            description=f"Trace result: {result_type}",
            metadata={
                "result_type": result_type,
                "result_data": trace_result.get("data", {}),
                "confidence": trace_result.get("confidence", 0.0)
            }
        )
        glyph_map.glyphs.append(result_glyph)

        # Wire every glyph at the deepest recorded depth to the result.
        # depth_glyphs always has key "0", so max() cannot see an empty seq.
        max_depth = max(int(d) for d in depth_glyphs.keys())
        deepest_glyphs = depth_glyphs[str(max_depth)]
        if isinstance(deepest_glyphs, list):
            for glyph_id in deepest_glyphs:
                glyph_map.connections.append(GlyphConnection(
                    source_id=glyph_id,
                    target_id=result_glyph_id,
                    strength=0.9,
                    type="recursion_result",
                    directed=True,
                    color="#27ae60" if "success" in result_type else "#e74c3c",
                    width=1.5,
                    opacity=0.8
                ))
        else:
            # Depth map still holds the bare "seed" string (no operations).
            glyph_map.connections.append(GlyphConnection(
                source_id=deepest_glyphs,
                target_id=result_glyph_id,
                strength=0.9,
                type="recursion_result",
                directed=True,
                color="#27ae60" if "success" in result_type else "#e74c3c",
                width=1.5,
                opacity=0.8
            ))

        # Faint seed->result edge closing the recursion loop visually.
        glyph_map.connections.append(GlyphConnection(
            source_id="seed",
            target_id=result_glyph_id,
            strength=1.0,
            type="recursion_completion",
            directed=True,
            color="#9b59b6",
            width=2.0,
            opacity=0.5
        ))

    # One region per recursion depth (0..trace_depth), normalized to lists.
    # Operations reporting a depth beyond trace_depth would be skipped here
    # -- presumably that cannot happen; verify against the trace producer.
    for depth in range(trace_depth + 1):
        depth_key = str(depth)
        if depth_key in depth_glyphs:
            depth_glyphs_list = depth_glyphs[depth_key]
            if not isinstance(depth_glyphs_list, list):
                depth_glyphs_list = [depth_glyphs_list]
            glyph_map.regions[f"depth_{depth}"] = depth_glyphs_list

    # Recursive traces read best top-down, so prefer a hierarchical layout
    # unless the caller explicitly asked for something else.
    if self.auto_layout:
        if layout_type == self.default_layout and self.default_layout != "hierarchical":
            layout_type = "hierarchical"
        glyph_map = self._apply_layout(glyph_map, layout_type)

    map_time = time.time() - map_start
    glyph_map.metadata["map_time"] = map_time

    # Keep the result for later retrieval/combination.
    self.glyph_map_history.append(glyph_map)

    logger.info(f"Recursive trace mapping completed in {map_time:.2f}s")
    return glyph_map
|
|
|
|
|
def combine_glyph_maps(
    self,
    glyph_maps: List[GlyphMap],
    layout_type: Optional[str] = None,
    dimensions: Optional[Tuple[int, int]] = None,
    scale: Optional[float] = None,
    connection_threshold: float = 0.5
) -> GlyphMap:
    """
    Combine multiple glyph maps into a unified map.

    Each source map's glyphs and connections are copied into the combined
    map with ids prefixed "map{i}_" so ids from different maps cannot
    collide; each source map becomes a region; and glyphs judged related
    across map pairs are linked with undirected "cross_map_relation"
    connections.

    Parameters:
    -----------
    glyph_maps : List[GlyphMap]
        Glyph maps to combine
    layout_type : Optional[str]
        Type of layout to use
    dimensions : Optional[Tuple[int, int]]
        Dimensions for visualization; defaults to the per-axis maximum
        over the input maps
    scale : Optional[float]
        Scale factor for visualization
    connection_threshold : float
        Minimum similarity for inter-map connections

    Returns:
    --------
    GlyphMap
        Combined glyph map
    """
    map_start = time.time()
    layout_type = layout_type or self.default_layout
    if not dimensions:
        # Bounding box of all input maps; fall back to the instance default
        # when called with an empty list (previously max() over an empty
        # sequence raised ValueError).
        if glyph_maps:
            dimensions = (
                max(gm.dimensions[0] for gm in glyph_maps),
                max(gm.dimensions[1] for gm in glyph_maps)
            )
        else:
            dimensions = self.default_dimensions
    scale = scale or self.default_scale

    logger.info(f"Combining {len(glyph_maps)} glyph maps")

    # Unique id: timestamp plus a short hash of the source map ids.
    map_id = f"combined_{int(time.time())}_{hashlib.md5(str([gm.id for gm in glyph_maps]).encode()).hexdigest()[:8]}"

    combined_map = GlyphMap(
        id=map_id,
        glyphs=[],
        connections=[],
        source_type="combined",
        layout_type=layout_type,
        dimensions=dimensions,
        scale=scale,
        metadata={
            "source_maps": [gm.id for gm in glyph_maps],
            "source_types": [gm.source_type for gm in glyph_maps],
            "timestamp": time.time()
        }
    )

    # Copy every source map into the combined map under a unique id prefix.
    # (A previous revision also pre-built an id_mapping dict here; it was
    # never read, so it has been removed.)
    for i, gm in enumerate(glyph_maps):
        prefix = f"map{i}_"

        # Each source map becomes its own region.
        map_region = f"map_{i}"
        combined_map.regions[map_region] = []

        # Clone glyphs with prefixed ids, remembering provenance in metadata.
        for glyph in gm.glyphs:
            new_id = prefix + glyph.id
            new_glyph = Glyph(
                id=new_id,
                symbol=glyph.symbol,
                type=glyph.type,
                semantics=glyph.semantics,
                position=glyph.position,
                size=glyph.size,
                color=glyph.color,
                opacity=glyph.opacity,
                source_elements=glyph.source_elements,
                description=glyph.description,
                metadata={
                    **glyph.metadata,
                    "original_id": glyph.id,
                    "source_map": gm.id
                }
            )
            combined_map.glyphs.append(new_glyph)
            combined_map.regions[map_region].append(new_id)

        # Re-point every connection at the prefixed ids.
        for conn in gm.connections:
            combined_map.connections.append(GlyphConnection(
                source_id=prefix + conn.source_id,
                target_id=prefix + conn.target_id,
                strength=conn.strength,
                type=conn.type,
                directed=conn.directed,
                color=conn.color,
                width=conn.width,
                opacity=conn.opacity,
                metadata={
                    **conn.metadata,
                    "source_map": gm.id
                }
            ))

        # Preserve focal points under their new ids.
        for focal_point in gm.focal_points:
            combined_map.focal_points.append(prefix + focal_point)

    # Link related glyphs across each unordered pair of maps (i < j).
    for i, gm1 in enumerate(glyph_maps):
        for j, gm2 in enumerate(glyph_maps):
            if i >= j:
                continue

            related_pairs = self._find_related_glyphs(gm1, gm2)

            for glyph1_id, glyph2_id, similarity in related_pairs:
                if similarity >= connection_threshold:
                    combined_map.connections.append(GlyphConnection(
                        source_id=f"map{i}_{glyph1_id}",
                        target_id=f"map{j}_{glyph2_id}",
                        strength=similarity,
                        type="cross_map_relation",
                        directed=False,
                        color="#3498db",
                        width=1.0 + similarity,
                        opacity=0.6 * similarity,
                        metadata={
                            "relation_type": "cross_map",
                            "source_map": gm1.id,
                            "target_map": gm2.id,
                            "similarity": similarity
                        }
                    ))

    # Optionally re-run layout over the merged map.
    if self.auto_layout:
        combined_map = self._apply_layout(combined_map, layout_type)

    map_time = time.time() - map_start
    combined_map.metadata["map_time"] = map_time

    # Keep the result for later retrieval.
    self.glyph_map_history.append(combined_map)

    logger.info(f"Glyph map combination completed in {map_time:.2f}s")
    return combined_map
|
|
|
|
|
def visualize(
    self,
    glyph_map: GlyphMap,
    output_path: Optional[str] = None,
    interactive: bool = True
) -> Any:
    """
    Visualize a glyph map.

    Delegates to the configured visualization engine when one is attached;
    otherwise falls back to the built-in minimal renderer (which ignores
    the ``interactive`` flag).

    Parameters:
    -----------
    glyph_map : GlyphMap
        Glyph map to visualize
    output_path : Optional[str]
        Path to save visualization to
    interactive : bool
        Whether to generate interactive visualization

    Returns:
    --------
    Any
        Visualization result
    """
    # No engine configured -- use the simple fallback renderer.
    if not self.visualizer:
        return self._simple_visualization(
            glyph_map=glyph_map,
            output_path=output_path
        )
    return self.visualizer.visualize_glyph_map(
        glyph_map=glyph_map,
        output_path=output_path,
        interactive=interactive
    )
|
|
|
|
|
def save_glyph_map(
    self,
    glyph_map: GlyphMap,
    output_path: str
) -> str:
    """
    Save a glyph map to a JSON file.

    Serializes ids, layout settings, regions, metadata, glyphs and
    connections. Glyph ``source_elements`` are intentionally omitted --
    they may hold arbitrary (non-JSON-serializable) objects, and
    ``load_glyph_map`` does not restore them either.

    Parameters:
    -----------
    glyph_map : GlyphMap
        Glyph map to save
    output_path : str
        Path to save glyph map to

    Returns:
    --------
    str
        Path to saved file
    """
    # Flatten enums to their string values so the structure is pure JSON.
    serializable_map = {
        "id": glyph_map.id,
        "source_type": glyph_map.source_type,
        "layout_type": glyph_map.layout_type,
        "dimensions": glyph_map.dimensions,
        "scale": glyph_map.scale,
        "focal_points": glyph_map.focal_points,
        "regions": glyph_map.regions,
        "metadata": glyph_map.metadata,
        "glyphs": [
            {
                "id": g.id,
                "symbol": g.symbol,
                "type": g.type.value,
                "semantics": [s.value for s in g.semantics],
                "position": g.position,
                "size": g.size,
                "color": g.color,
                "opacity": g.opacity,
                "description": g.description,
                "metadata": g.metadata
            }
            for g in glyph_map.glyphs
        ],
        "connections": [
            {
                "source_id": c.source_id,
                "target_id": c.target_id,
                "strength": c.strength,
                "type": c.type,
                "directed": c.directed,
                "color": c.color,
                "width": c.width,
                "opacity": c.opacity,
                "metadata": c.metadata
            }
            for c in glyph_map.connections
        ]
    }

    # Create the parent directory if needed.
    output_dir = Path(output_path).parent
    output_dir.mkdir(parents=True, exist_ok=True)

    # Glyph symbols are non-ASCII (e.g. "β΄"), so write UTF-8 explicitly
    # and keep the characters readable instead of \u-escaping them.
    # Previously the platform-default encoding was used, which could fail
    # on non-UTF-8 locales once ensure_ascii is disabled.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(serializable_map, f, indent=2, ensure_ascii=False)

    logger.info(f"Saved glyph map to {output_path}")
    return output_path
|
|
|
|
|
def load_glyph_map(
    self,
    input_path: str
) -> GlyphMap:
    """
    Load a glyph map from a JSON file produced by ``save_glyph_map``.

    Enum string values are rehydrated into GlyphType/GlyphSemantic; glyph
    ``source_elements`` are not present in the file and are left at their
    dataclass default.

    Parameters:
    -----------
    input_path : str
        Path to load glyph map from

    Returns:
    --------
    GlyphMap
        Loaded glyph map
    """
    # Match the writer: files are always UTF-8 regardless of locale
    # (previously the platform-default encoding was used, which could
    # mis-decode the non-ASCII glyph symbols).
    with open(input_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Rebuild glyphs, converting enum values back to their enum types.
    glyphs = [
        Glyph(
            id=g["id"],
            symbol=g["symbol"],
            type=GlyphType(g["type"]),
            semantics=[GlyphSemantic(s) for s in g["semantics"]],
            position=tuple(g["position"]),
            size=g["size"],
            color=g["color"],
            opacity=g["opacity"],
            description=g.get("description"),
            metadata=g.get("metadata", {})
        )
        for g in data["glyphs"]
    ]

    connections = [
        GlyphConnection(
            source_id=c["source_id"],
            target_id=c["target_id"],
            strength=c["strength"],
            type=c["type"],
            directed=c["directed"],
            color=c["color"],
            width=c["width"],
            opacity=c["opacity"],
            metadata=c.get("metadata", {})
        )
        for c in data["connections"]
    ]

    glyph_map = GlyphMap(
        id=data["id"],
        glyphs=glyphs,
        connections=connections,
        source_type=data["source_type"],
        layout_type=data["layout_type"],
        dimensions=tuple(data["dimensions"]),
        scale=data["scale"],
        focal_points=data.get("focal_points", []),
        regions=data.get("regions", {}),
        metadata=data.get("metadata", {})
    )

    logger.info(f"Loaded glyph map from {input_path}")
    return glyph_map
|
|
|
|
|
|
|
|
|
|
|
def _apply_layout(
    self,
    glyph_map: GlyphMap,
    layout_type: str
) -> GlyphMap:
    """Apply a layout to a glyph map.

    Dispatches on ``layout_type`` to the matching ``_apply_*_layout``
    helper; unknown types fall back to force-directed with a warning.
    """
    layout_start = time.time()

    # Name -> layout handler dispatch table.
    handlers = {
        "force_directed": self._apply_force_directed_layout,
        "hierarchical": self._apply_hierarchical_layout,
        "circular": self._apply_circular_layout,
        "grid": self._apply_grid_layout,
        "radial": self._apply_radial_layout,
    }
    handler = handlers.get(layout_type)
    if handler is None:
        logger.warning(f"Unknown layout type: {layout_type}, using force_directed")
        handler = self._apply_force_directed_layout
    glyph_map = handler(glyph_map)

    layout_time = time.time() - layout_start
    logger.info(f"Applied {layout_type} layout in {layout_time:.2f}s")

    return glyph_map
|
|
|
|
|
def _apply_force_directed_layout(self, glyph_map: GlyphMap) -> GlyphMap:
    """Apply force-directed (spring) layout to a glyph map.

    Mirrors the map as an undirected weighted graph, runs a seeded
    networkx spring layout, rescales the result into 80% of the map's
    dimensions with a 10% margin, and writes positions back to glyphs.
    """
    graph = nx.Graph()

    # Nodes carry size/type attributes for downstream inspection.
    for node in glyph_map.glyphs:
        graph.add_node(node.id, size=node.size, type=node.type.value)

    # Only connections whose endpoints both exist become edges.
    for link in glyph_map.connections:
        if link.source_id in graph and link.target_id in graph:
            graph.add_edge(
                link.source_id,
                link.target_id,
                weight=link.strength
            )

    width, height = glyph_map.dimensions
    # Fixed seed keeps the layout deterministic run-to-run.
    placed = nx.spring_layout(
        graph,
        k=0.2,
        iterations=100,
        seed=42
    )

    # Rescale raw coordinates into the drawing area.
    coords = np.array(list(placed.values()))
    if len(coords) > 0:
        min_x, min_y = coords.min(axis=0)
        max_x, max_y = coords.max(axis=0)

        x_range = max_x - min_x
        y_range = max_y - min_y

        scale_x = (width * 0.8) / x_range if x_range > 0 else 1.0
        scale_y = (height * 0.8) / y_range if y_range > 0 else 1.0

        for node_id, (x, y) in placed.items():
            placed[node_id] = (
                ((x - min_x) * scale_x) + width * 0.1,
                ((y - min_y) * scale_y) + height * 0.1
            )

    # Write positions back onto the glyphs.
    for node in glyph_map.glyphs:
        if node.id in placed:
            node.position = placed[node.id]

    return glyph_map
|
|
|
|
|
def _apply_hierarchical_layout(self, glyph_map: GlyphMap) -> GlyphMap:
    """Apply hierarchical (layered top-down) layout to a glyph map.

    Builds a digraph from the map's *directed* connections, BFS-layers it
    from the roots, and spaces each layer evenly across one horizontal row.
    Falls back to grid layout when no roots can be determined.
    """
    G = nx.DiGraph()

    for glyph in glyph_map.glyphs:
        G.add_node(glyph.id, size=glyph.size, type=glyph.type.value)

    # Undirected connections are ignored: only directed edges define the
    # hierarchy.
    for conn in glyph_map.connections:
        if conn.directed and conn.source_id in G and conn.target_id in G:
            G.add_edge(
                conn.source_id,
                conn.target_id,
                weight=conn.strength
            )

    # Roots = nodes with no incoming directed edge.
    root_nodes = [n for n in G.nodes() if G.in_degree(n) == 0]

    # Cyclic graphs have no in-degree-0 node; fall back to the map's focal
    # points, then to the first glyph.
    if not root_nodes:
        if glyph_map.focal_points:
            root_nodes = [fp for fp in glyph_map.focal_points if fp in G]
        else:
            root_nodes = [glyph_map.glyphs[0].id] if glyph_map.glyphs else []

    width, height = glyph_map.dimensions

    if root_nodes:
        # BFS from the roots, recording each frontier as one layer.
        layers = {}
        visited = set()

        current_layer = root_nodes
        layer_idx = 0

        # Depth is capped at 20 layers, which also bounds work on cycles.
        while current_layer and layer_idx < 20:
            layers[layer_idx] = current_layer
            next_layer = []

            for node in current_layer:
                visited.add(node)
                for _, neighbor in G.out_edges(node):
                    if neighbor not in visited and neighbor not in next_layer:
                        next_layer.append(neighbor)

            current_layer = next_layer
            layer_idx += 1

        # Rows are evenly spaced vertically; nodes evenly spaced per row.
        for layer_idx, nodes in layers.items():
            y_pos = (layer_idx + 1) * (height / (len(layers) + 1))
            x_step = width / (len(nodes) + 1)

            for i, node_id in enumerate(nodes):
                x_pos = (i + 1) * x_step

                # Linear scan: glyph ids are unique, stop at first match.
                for glyph in glyph_map.glyphs:
                    if glyph.id == node_id:
                        glyph.position = (x_pos, y_pos)
                        break

        # Nodes never reached by the BFS (e.g. only connected through
        # undirected edges) get one extra row below the last layer.
        # NOTE: `layer_idx` here is deliberately the value left over from
        # the loops above -- one past the deepest populated layer.
        unvisited = [g.id for g in glyph_map.glyphs if g.id not in visited]
        if unvisited:
            y_pos = (layer_idx + 1) * (height / (len(layers) + 2))
            x_step = width / (len(unvisited) + 1)

            for i, node_id in enumerate(unvisited):
                x_pos = (i + 1) * x_step

                for glyph in glyph_map.glyphs:
                    if glyph.id == node_id:
                        glyph.position = (x_pos, y_pos)
                        break
    else:
        # No roots at all (empty map): fall back to a plain grid.
        glyph_map = self._apply_grid_layout(glyph_map)

    return glyph_map
|
|
|
|
|
def _apply_circular_layout(self, glyph_map: GlyphMap) -> GlyphMap:
    """Apply circular layout to a glyph map.

    Places every glyph on a circle centered in the drawing area with a
    radius of 40% of the smaller dimension.
    """
    ring = nx.Graph()

    # Mirror glyphs and valid connections into a graph.
    for node in glyph_map.glyphs:
        ring.add_node(node.id, size=node.size, type=node.type.value)

    for link in glyph_map.connections:
        if link.source_id in ring and link.target_id in ring:
            ring.add_edge(
                link.source_id,
                link.target_id,
                weight=link.strength
            )

    width, height = glyph_map.dimensions
    center_x, center_y = width / 2, height / 2
    radius = min(width, height) * 0.4

    # networkx lays the circle out around the origin; shift to center.
    placed = nx.circular_layout(ring, scale=radius)
    for node_id, (x, y) in placed.items():
        placed[node_id] = (x + center_x, y + center_y)

    # Write positions back onto the glyphs.
    for node in glyph_map.glyphs:
        if node.id in placed:
            node.position = placed[node.id]

    return glyph_map
|
|
|
|
|
def _apply_grid_layout(self, glyph_map: GlyphMap) -> GlyphMap: |
|
|
"""Apply grid layout to a glyph map.""" |
|
|
width, height = glyph_map.dimensions |
|
|
num_glyphs = len(glyph_map.glyphs) |
|
|
|
|
|
|
|
|
grid_size = int(np.ceil(np.sqrt(num_glyphs))) |
|
|
cell_width = width / (grid_size + 1) |
|
|
cell_height = height / (grid_size + 1) |
|
|
|
|
|
|
|
|
for i, glyph in enumerate(glyph_map.glyphs): |
|
|
row = i // grid_size |
|
|
col = i % grid_size |
|
|
|
|
|
glyph.position = ( |
|
|
(col + 1) * cell_width, |
|
|
(row + 1) * cell_height |
|
|
) |
|
|
|
|
|
return glyph_map |
|
|
|
|
|
def _apply_radial_layout(self, glyph_map: GlyphMap) -> GlyphMap:
    """Apply radial layout to a glyph map.

    Runs a Kamada-Kawai layout scaled into the drawing area, then pins a
    small set of "central" nodes (focal points, or the top-3 nodes by
    betweenness centrality) onto a small circle around the map center.
    """
    G = nx.Graph()

    for glyph in glyph_map.glyphs:
        G.add_node(glyph.id, size=glyph.size, type=glyph.type.value)

    for conn in glyph_map.connections:
        if conn.source_id in G and conn.target_id in G:
            G.add_edge(
                conn.source_id,
                conn.target_id,
                weight=conn.strength
            )

    # Prefer explicit focal points as the center; otherwise pick the most
    # structurally central nodes.
    if glyph_map.focal_points:
        central_nodes = [fp for fp in glyph_map.focal_points if fp in G]
    else:
        centrality = nx.betweenness_centrality(G)
        central_nodes = sorted(
            centrality.keys(),
            key=lambda x: centrality[x],
            reverse=True
        )[:min(3, len(centrality))]

    width, height = glyph_map.dimensions
    center_x, center_y = width / 2, height / 2

    pos = nx.kamada_kawai_layout(G)

    # Rescale the raw layout into 80% of the area with a 10% margin.
    pos_array = np.array(list(pos.values()))
    if len(pos_array) > 0:
        min_x, min_y = pos_array.min(axis=0)
        max_x, max_y = pos_array.max(axis=0)

        x_range = max_x - min_x
        y_range = max_y - min_y

        # Degenerate (zero-extent) axes keep a neutral scale of 1.
        if x_range > 0:
            scale_x = (width * 0.8) / x_range
        else:
            scale_x = 1.0

        if y_range > 0:
            scale_y = (height * 0.8) / y_range
        else:
            scale_y = 1.0

        for node_id, (x, y) in pos.items():
            x_scaled = ((x - min_x) * scale_x) + width * 0.1
            y_scaled = ((y - min_y) * scale_y) + height * 0.1
            pos[node_id] = (x_scaled, y_scaled)

    # Override the computed positions of the central nodes: place them
    # evenly on a small circle (15% of the smaller dimension) at center.
    if central_nodes:
        angle_step = 2 * np.pi / len(central_nodes)
        center_radius = min(width, height) * 0.15

        for i, node_id in enumerate(central_nodes):
            angle = i * angle_step
            x = center_x + center_radius * np.cos(angle)
            y = center_y + center_radius * np.sin(angle)
            pos[node_id] = (x, y)

    # Write positions back onto the glyphs.
    for glyph in glyph_map.glyphs:
        if glyph.id in pos:
            glyph.position = pos[glyph.id]

    return glyph_map
|
|
|
|
|
def _apply_focus(
    self,
    glyph_map: GlyphMap,
    focus_on: List[str]
) -> GlyphMap:
    """Highlight glyphs whose source tokens match any focus term.

    Sentinel glyphs whose source elements mention one of the strings in
    ``focus_on`` become the map's focal points: they are enlarged and
    made more opaque, and every connection touching them is emphasized.
    If nothing matches, the map is returned untouched.
    """
    matched_ids = []
    for candidate in glyph_map.glyphs:
        # Only token sentinels carry source elements worth matching.
        if candidate.type != GlyphType.SENTINEL:
            continue
        if not (hasattr(candidate, 'source_elements') and candidate.source_elements):
            continue
        elements = [str(elem) for elem in candidate.source_elements]
        if any(term in elem for term in focus_on for elem in elements):
            matched_ids.append(candidate.id)

    if not matched_ids:
        # Nothing to emphasize; leave the map as-is.
        return glyph_map

    glyph_map.focal_points = matched_ids
    focus_set = set(matched_ids)

    # Enlarge and brighten the focused glyphs.
    for g in glyph_map.glyphs:
        if g.id in focus_set:
            g.size *= 1.5
            g.opacity = min(1.0, g.opacity + 0.2)

    # Emphasize every connection incident to a focused glyph.
    for link in glyph_map.connections:
        if link.source_id in focus_set or link.target_id in focus_set:
            link.width *= 1.5
            link.opacity = min(1.0, link.opacity + 0.2)

    return glyph_map
|
|
|
|
|
def _find_related_glyphs(
    self,
    glyph_map1: GlyphMap,
    glyph_map2: GlyphMap
) -> List[Tuple[str, str, float]]:
    """Find related glyphs between two glyph maps.

    Every glyph in ``glyph_map1`` is compared against every glyph in
    ``glyph_map2`` using a similarity measure chosen by glyph type.
    Pairs scoring at least 0.5 are returned as
    ``(id_in_map1, id_in_map2, similarity)`` tuples, sorted by
    descending similarity.
    """

    def edit_distance(s1: str, s2: str) -> int:
        """Levenshtein edit distance (two-row dynamic programming).

        Implemented locally because scipy.spatial.distance has no
        ``levenshtein`` function — the original call raised
        AttributeError whenever two differing, non-substring token
        texts were compared.
        """
        if len(s1) < len(s2):
            s1, s2 = s2, s1
        previous = list(range(len(s2) + 1))
        for i, c1 in enumerate(s1):
            current = [i + 1]
            for j, c2 in enumerate(s2):
                current.append(min(
                    previous[j + 1] + 1,       # deletion
                    current[j] + 1,            # insertion
                    previous[j] + (c1 != c2)   # substitution
                ))
            previous = current
        return previous[-1]

    def token_similarity(g1: Glyph, g2: Glyph) -> float:
        """Calculate similarity between token (sentinel) glyphs."""
        if (g1.type == GlyphType.SENTINEL and g2.type == GlyphType.SENTINEL and
                hasattr(g1, 'source_elements') and hasattr(g2, 'source_elements') and
                g1.source_elements and g2.source_elements):
            text1 = str(g1.source_elements[0])
            text2 = str(g2.source_elements[0])
            if text1 == text2:
                return 1.0
            elif text1 in text2 or text2 in text1:
                return 0.8
            else:
                # Normalized edit distance. max() cannot be zero here:
                # an empty string is always caught by the substring branch.
                return 1.0 - min(1.0, edit_distance(text1, text2) / max(len(text1), len(text2)))
        return 0.0

    def attribution_similarity(g1: Glyph, g2: Glyph) -> float:
        """Calculate similarity between attribution glyphs."""
        if g1.type == GlyphType.ATTRIBUTION and g2.type == GlyphType.ATTRIBUTION:
            metadata_sim = 0.0
            count = 0
            # Score each metadata field the two glyphs have in common.
            for attr in ['source_index', 'target_index', 'attribution_type', 'strength']:
                if attr in g1.metadata and attr in g2.metadata:
                    if g1.metadata[attr] == g2.metadata[attr]:
                        metadata_sim += 1.0
                    elif attr == 'strength' and isinstance(g1.metadata[attr], (int, float)) and isinstance(g2.metadata[attr], (int, float)):
                        # Strength gets partial credit by numeric closeness.
                        diff = abs(g1.metadata[attr] - g2.metadata[attr])
                        metadata_sim += max(0.0, 1.0 - diff)
                    count += 1
            symbol_sim = 1.0 if g1.symbol == g2.symbol else 0.0
            if count > 0:
                # Weighted blend: metadata dominates, symbol is a tiebreaker.
                return (metadata_sim / count) * 0.7 + symbol_sim * 0.3
            return symbol_sim
        return 0.0

    def residue_similarity(g1: Glyph, g2: Glyph) -> float:
        """Calculate similarity between residue glyphs."""
        if g1.type == GlyphType.RESIDUE and g2.type == GlyphType.RESIDUE:
            if g1.metadata.get('pattern_type') == g2.metadata.get('pattern_type'):
                return 0.9
            if g1.symbol == g2.symbol:
                return 0.7
            if 'confidence' in g1.metadata and 'confidence' in g2.metadata:
                conf_diff = abs(g1.metadata['confidence'] - g2.metadata['confidence'])
                return max(0.0, 0.5 - conf_diff)
            return 0.3
        return 0.0

    def recursive_similarity(g1: Glyph, g2: Glyph) -> float:
        """Calculate similarity between recursive glyphs."""
        if g1.type == GlyphType.RECURSIVE and g2.type == GlyphType.RECURSIVE:
            if g1.symbol == g2.symbol:
                return 0.8
            if 'depth' in g1.metadata and 'depth' in g2.metadata:
                if g1.metadata['depth'] == g2.metadata['depth']:
                    return 0.7
                depth_diff = abs(g1.metadata['depth'] - g2.metadata['depth'])
                return max(0.0, 0.6 - (depth_diff * 0.1))
            return 0.4
        return 0.0

    def meta_similarity(g1: Glyph, g2: Glyph) -> float:
        """Calculate similarity between meta glyphs."""
        if g1.type == GlyphType.META and g2.type == GlyphType.META:
            if g1.symbol == g2.symbol:
                return 0.9
            common_semantics = set(s.value for s in g1.semantics).intersection(
                set(s.value for s in g2.semantics)
            )
            if common_semantics:
                return 0.6 + 0.3 * (len(common_semantics) / max(len(g1.semantics), len(g2.semantics)))
            return 0.3
        return 0.0

    # Dispatch table for the type-specific measures; same-type glyphs
    # without a dedicated measure fall back to a symbol comparison.
    measures = {
        GlyphType.SENTINEL: token_similarity,
        GlyphType.ATTRIBUTION: attribution_similarity,
        GlyphType.RESIDUE: residue_similarity,
        GlyphType.RECURSIVE: recursive_similarity,
        GlyphType.META: meta_similarity,
    }

    related_pairs = []
    for g1 in glyph_map1.glyphs:
        for g2 in glyph_map2.glyphs:
            if g1.type != g2.type:
                # Cross-type pairs are never considered related.
                continue
            measure = measures.get(g1.type)
            if measure is not None:
                similarity = measure(g1, g2)
            else:
                similarity = 0.6 if g1.symbol == g2.symbol else 0.3

            # Only keep strong relationships.
            if similarity >= 0.5:
                related_pairs.append((g1.id, g2.id, similarity))

    # Strongest relationships first.
    related_pairs.sort(key=lambda pair: pair[2], reverse=True)

    return related_pairs
|
|
|
|
|
def _calculate_signature_similarity( |
|
|
self, |
|
|
signature1: str, |
|
|
signature2: str |
|
|
) -> float: |
|
|
"""Calculate similarity between two residue signatures.""" |
|
|
|
|
|
sig1 = signature1.lower() |
|
|
sig2 = signature2.lower() |
|
|
|
|
|
|
|
|
max_len = max(len(sig1), len(sig2)) |
|
|
if max_len == 0: |
|
|
return 1.0 |
|
|
|
|
|
lev_dist = distance.levenshtein(sig1, sig2) |
|
|
sim = 1.0 - (lev_dist / max_len) |
|
|
|
|
|
|
|
|
common_prefix_len = 0 |
|
|
for i in range(min(len(sig1), len(sig2))): |
|
|
if sig1[i] == sig2[i]: |
|
|
common_prefix_len += 1 |
|
|
else: |
|
|
break |
|
|
|
|
|
prefix_boost = 0.0 |
|
|
if common_prefix_len > 3: |
|
|
prefix_boost = min(0.2, common_prefix_len / max_len) |
|
|
|
|
|
return min(1.0, sim + prefix_boost) |
|
|
|
|
|
def _calculate_context_similarity( |
|
|
self, |
|
|
context1: Dict[str, Any], |
|
|
context2: Dict[str, Any] |
|
|
) -> float: |
|
|
"""Calculate similarity between two residue contexts.""" |
|
|
|
|
|
if not context1 or not context2: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
common_keys = set(context1.keys()).intersection(set(context2.keys())) |
|
|
if not common_keys: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
similarity_sum = 0.0 |
|
|
for key in common_keys: |
|
|
val1 = context1[key] |
|
|
val2 = context2[key] |
|
|
|
|
|
if isinstance(val1, str) and isinstance(val2, str): |
|
|
|
|
|
similarity_sum += self._calculate_signature_similarity(val1, val2) |
|
|
elif isinstance(val1, (int, float)) and isinstance(val2, (int, float)): |
|
|
|
|
|
max_val = max(abs(val1), abs(val2)) |
|
|
if max_val > 0: |
|
|
similarity_sum += 1.0 - min(1.0, abs(val1 - val2) / max_val) |
|
|
else: |
|
|
similarity_sum += 1.0 |
|
|
elif val1 == val2: |
|
|
|
|
|
similarity_sum += 1.0 |
|
|
else: |
|
|
similarity_sum += 0.0 |
|
|
|
|
|
|
|
|
return similarity_sum / len(common_keys) |
|
|
|
|
|
def _get_color_for_attribution(self, link: AttributionLink) -> str:
    """Get color for attribution link based on type and strength.

    Each attribution type maps to a hue family; the link's strength
    shifts the channel values within that family. Unknown types fall
    back to a strength-scaled gray.
    """
    s = link.strength
    kind = link.attribution_type

    if kind == AttributionType.DIRECT:
        # Blue family.
        blue_val = max(0, int(255 - (s * 170)))
        return f"rgb(0, {120 + int(s * 70)}, {180 + blue_val})"
    if kind == AttributionType.INDIRECT:
        # Purple family.
        return f"rgb({120 + int(s * 70)}, 0, {180 + int(s * 60)})"
    if kind == AttributionType.RESIDUAL:
        # Green family.
        return f"rgb(0, {150 + int(s * 70)}, {80 + int(s * 40)})"
    if kind == AttributionType.RECURSIVE:
        # Orange family.
        return f"rgb({200 + int(s * 50)}, {70 + int(s * 60)}, 0)"

    # Fallback: neutral gray brightened by strength.
    intensity = 100 + int(s * 100)
    return f"rgb({intensity}, {intensity}, {intensity})"
|
|
|
|
|
def _get_color_for_residue_type(self, residue_type: str) -> str: |
|
|
"""Get color for residue based on type.""" |
|
|
colors = { |
|
|
"memory_decay": "#3498db", |
|
|
"value_conflict": "#e74c3c", |
|
|
"ghost_activation": "#9b59b6", |
|
|
"boundary_hesitation": "#f39c12", |
|
|
"null_output": "#95a5a6", |
|
|
"recursive_collapse": "#27ae60", |
|
|
"attention_drift": "#1abc9c", |
|
|
"token_oscillation": "#d35400" |
|
|
} |
|
|
|
|
|
return colors.get(residue_type, "#2c3e50") |
|
|
|
|
|
def _get_color_for_layer(self, layer: int, alpha: float = 1.0) -> str: |
|
|
"""Get color for a specific layer.""" |
|
|
|
|
|
hue = (layer * 30) % 360 |
|
|
return f"hsl({hue}, 70%, 60%, {alpha})" |
|
|
|
|
|
def _get_color_for_depth(self, depth: int, alpha: float = 1.0) -> str: |
|
|
"""Get color for a specific recursion depth.""" |
|
|
|
|
|
if depth == 0: |
|
|
return f"rgba(155, 89, 182, {alpha})" |
|
|
elif depth == 1: |
|
|
return f"rgba(52, 152, 219, {alpha})" |
|
|
elif depth == 2: |
|
|
return f"rgba(26, 188, 156, {alpha})" |
|
|
elif depth == 3: |
|
|
return f"rgba(39, 174, 96, {alpha})" |
|
|
elif depth == 4: |
|
|
return f"rgba(241, 196, 15, {alpha})" |
|
|
elif depth >= 5: |
|
|
return f"rgba(230, 126, 34, {alpha})" |
|
|
|
|
|
def _scale_positions( |
|
|
self, |
|
|
positions: np.ndarray, |
|
|
dimensions: Tuple[int, int] |
|
|
) -> np.ndarray: |
|
|
"""Scale positions to fit dimensions.""" |
|
|
width, height = dimensions |
|
|
|
|
|
|
|
|
min_x, min_y = positions.min(axis=0) |
|
|
max_x, max_y = positions.max(axis=0) |
|
|
|
|
|
|
|
|
x_range = max_x - min_x |
|
|
y_range = max_y - min_y |
|
|
|
|
|
if x_range > 0: |
|
|
scale_x = (width * 0.8) / x_range |
|
|
else: |
|
|
scale_x = 1.0 |
|
|
|
|
|
if y_range > 0: |
|
|
scale_y = (height * 0.8) / y_range |
|
|
else: |
|
|
scale_y = 1.0 |
|
|
|
|
|
|
|
|
positions_scaled = np.zeros_like(positions) |
|
|
positions_scaled[:, 0] = (positions[:, 0] - min_x) * scale_x + width * 0.1 |
|
|
positions_scaled[:, 1] = (positions[:, 1] - min_y) * scale_y + height * 0.1 |
|
|
|
|
|
return positions_scaled |
|
|
|
|
|
def _simple_visualization(
    self,
    glyph_map: GlyphMap,
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """Simple matplotlib visualization if no visualizer available.

    Renders connections as lines (with midpoint arrowheads when
    directed), glyphs as text symbols, and focal points with a dotted
    ring. If output_path is given, the figure is saved there and
    closed and {"output_path": ...} is returned; otherwise the open
    figure is returned as {"figure": ...}.
    """
    plt.figure(figsize=(12, 10))

    # Draw connections first at zorder=1 so glyph symbols (zorder=2)
    # render on top of them.
    for conn in glyph_map.connections:
        # Resolve endpoint glyphs by id; skip the connection if either
        # endpoint is missing from the map.
        source_glyph = next((g for g in glyph_map.glyphs if g.id == conn.source_id), None)
        target_glyph = next((g for g in glyph_map.glyphs if g.id == conn.target_id), None)

        if source_glyph and target_glyph:
            source_x, source_y = source_glyph.position
            target_x, target_y = target_glyph.position

            # Solid line for directed connections, dashed for undirected.
            plt.plot(
                [source_x, target_x],
                [source_y, target_y],
                color=conn.color,
                linewidth=conn.width,
                alpha=conn.opacity,
                zorder=1,
                linestyle='-' if conn.directed else '--'
            )

            # Directed connections additionally get an arrowhead
            # centered on the midpoint of the line.
            if conn.directed:
                dx = target_x - source_x
                dy = target_y - source_y
                dist = np.sqrt(dx**2 + dy**2)
                if dist > 0:
                    # Normalize to a unit direction vector.
                    dx, dy = dx / dist, dy / dist
                    midpoint_x = (source_x + target_x) / 2
                    midpoint_y = (source_y + target_y) / 2

                    # 10-unit arrow straddling the midpoint, oriented
                    # along the connection direction.
                    plt.arrow(
                        midpoint_x - dx * 5,
                        midpoint_y - dy * 5,
                        dx * 10,
                        dy * 10,
                        head_width=5,
                        head_length=5,
                        fc=conn.color,
                        ec=conn.color,
                        alpha=conn.opacity,
                        zorder=1
                    )

    # Draw each glyph as a centered text symbol; fontsize doubles as
    # the glyph's visual size.
    for glyph in glyph_map.glyphs:
        x, y = glyph.position

        plt.text(
            x, y,
            glyph.symbol,
            fontsize=glyph.size,
            color=glyph.color,
            alpha=glyph.opacity,
            ha='center',
            va='center',
            zorder=2
        )

        # Focal points get a dotted ring to draw the eye.
        if glyph.id in glyph_map.focal_points:
            circle = plt.Circle(
                (x, y),
                glyph.size * 0.8,
                fill=False,
                color='black',
                linestyle=':',
                alpha=0.7,
                zorder=1
            )
            plt.gca().add_patch(circle)

    # Fit the axes to the glyph map's canvas dimensions.
    width, height = glyph_map.dimensions
    plt.xlim(0, width)
    plt.ylim(0, height)

    # Hide axes/ticks — this is a diagram, not a chart.
    plt.axis('off')

    # Title: source type, plus the trace target when recorded.
    title = f"Glyph Map: {glyph_map.source_type.capitalize()}"
    if "trace_target" in glyph_map.metadata:
        title += f" - {glyph_map.metadata['trace_target']}"
    plt.title(title)

    # Save to disk when a path is given; otherwise hand the caller the
    # still-open figure.
    if output_path:
        plt.savefig(output_path, dpi=300, bbox_inches='tight')
        plt.close()
        return {"output_path": output_path}

    return {"figure": plt.gcf()}
|
|
|
|
|
|
|
|
|
|
|
class GlyphExplorer:
    """
    Utility class for interactive exploration of glyph maps.

    This class provides methods for filtering, searching, and analyzing
    glyph maps to extract insights and patterns. Filter methods narrow
    the working set cumulatively and return ``self`` for chaining; the
    underlying glyph map is never mutated. Call ``reset_filters()`` to
    start over.
    """

    def __init__(self, glyph_map: GlyphMap):
        """
        Initialize the glyph explorer.

        Parameters:
        -----------
        glyph_map : GlyphMap
            Glyph map to explore
        """
        self.glyph_map = glyph_map
        # Working views; every filter rebinds these to fresh lists, so
        # the glyph map's own lists are never modified in place.
        self.filtered_glyphs = glyph_map.glyphs
        self.filtered_connections = glyph_map.connections

    def filter_by_type(self, glyph_type: GlyphType) -> 'GlyphExplorer':
        """
        Filter glyphs by type.

        Parameters:
        -----------
        glyph_type : GlyphType
            Type of glyphs to include

        Returns:
        --------
        GlyphExplorer
            Self, for method chaining
        """
        self.filtered_glyphs = [
            g for g in self.filtered_glyphs if g.type == glyph_type
        ]
        self._update_connections()
        return self

    def filter_by_semantic(self, semantic: GlyphSemantic) -> 'GlyphExplorer':
        """
        Filter glyphs by semantic dimension.

        Parameters:
        -----------
        semantic : GlyphSemantic
            Semantic dimension to filter by

        Returns:
        --------
        GlyphExplorer
            Self, for method chaining
        """
        self.filtered_glyphs = [
            g for g in self.filtered_glyphs if semantic in g.semantics
        ]
        self._update_connections()
        return self

    def filter_by_symbol(self, symbol: str) -> 'GlyphExplorer':
        """
        Filter glyphs by symbol.

        Parameters:
        -----------
        symbol : str
            Symbol to filter by

        Returns:
        --------
        GlyphExplorer
            Self, for method chaining
        """
        self.filtered_glyphs = [
            g for g in self.filtered_glyphs if g.symbol == symbol
        ]
        self._update_connections()
        return self

    def filter_by_size(
        self,
        min_size: Optional[float] = None,
        max_size: Optional[float] = None
    ) -> 'GlyphExplorer':
        """
        Filter glyphs by size.

        Parameters:
        -----------
        min_size : Optional[float]
            Minimum size (inclusive)
        max_size : Optional[float]
            Maximum size (inclusive)

        Returns:
        --------
        GlyphExplorer
            Self, for method chaining
        """
        if min_size is not None:
            self.filtered_glyphs = [
                g for g in self.filtered_glyphs if g.size >= min_size
            ]
        if max_size is not None:
            self.filtered_glyphs = [
                g for g in self.filtered_glyphs if g.size <= max_size
            ]
        self._update_connections()
        return self

    def filter_by_metadata(
        self,
        key: str,
        value: Any
    ) -> 'GlyphExplorer':
        """
        Filter glyphs by metadata field.

        Parameters:
        -----------
        key : str
            Metadata key
        value : Any
            Metadata value to match

        Returns:
        --------
        GlyphExplorer
            Self, for method chaining
        """
        self.filtered_glyphs = [
            g for g in self.filtered_glyphs
            if key in g.metadata and g.metadata[key] == value
        ]
        self._update_connections()
        return self

    def filter_connections_by_type(self, conn_type: str) -> 'GlyphExplorer':
        """
        Filter connections by type.

        Parameters:
        -----------
        conn_type : str
            Connection type to filter by

        Returns:
        --------
        GlyphExplorer
            Self, for method chaining
        """
        self.filtered_connections = [
            c for c in self.filtered_connections if c.type == conn_type
        ]
        return self

    def filter_connections_by_strength(
        self,
        min_strength: Optional[float] = None,
        max_strength: Optional[float] = None
    ) -> 'GlyphExplorer':
        """
        Filter connections by strength.

        Parameters:
        -----------
        min_strength : Optional[float]
            Minimum strength (inclusive)
        max_strength : Optional[float]
            Maximum strength (inclusive)

        Returns:
        --------
        GlyphExplorer
            Self, for method chaining
        """
        if min_strength is not None:
            self.filtered_connections = [
                c for c in self.filtered_connections if c.strength >= min_strength
            ]
        if max_strength is not None:
            self.filtered_connections = [
                c for c in self.filtered_connections if c.strength <= max_strength
            ]
        return self

    def search_by_description(self, query: str) -> 'GlyphExplorer':
        """
        Search glyphs by description text (case-insensitive substring).

        Parameters:
        -----------
        query : str
            Search query

        Returns:
        --------
        GlyphExplorer
            Self, for method chaining
        """
        needle = query.lower()  # lowercase once, not per glyph
        self.filtered_glyphs = [
            g for g in self.filtered_glyphs
            if g.description and needle in g.description.lower()
        ]
        self._update_connections()
        return self

    def find_central_glyphs(self, top_n: int = 5) -> List[Glyph]:
        """
        Find central glyphs based on connection count.

        Parameters:
        -----------
        top_n : int
            Number of top central glyphs to return

        Returns:
        --------
        List[Glyph]
            Top central glyphs, in filtered-list order
        """
        # Count incident connections in a single pass over the
        # connection list (the original rescanned it once per glyph,
        # which was O(glyphs * connections)).
        connection_counts = {g.id: 0 for g in self.filtered_glyphs}
        for conn in self.filtered_connections:
            # The set collapses self-loops so they count once, matching
            # the original "source or target" membership test.
            for endpoint in {conn.source_id, conn.target_id}:
                if endpoint in connection_counts:
                    connection_counts[endpoint] += 1

        # Stable sort preserves glyph insertion order on ties, as before.
        top_glyph_ids = set(sorted(
            connection_counts,
            key=lambda gid: connection_counts[gid],
            reverse=True
        )[:top_n])

        # Return glyphs in their filtered-list order.
        return [g for g in self.filtered_glyphs if g.id in top_glyph_ids]

    def find_clusters(self, min_size: int = 3) -> Dict[str, List[Glyph]]:
        """
        Find clusters of connected glyphs.

        Parameters:
        -----------
        min_size : int
            Minimum cluster size

        Returns:
        --------
        Dict[str, List[Glyph]]
            Dictionary of clusters keyed "cluster_<i>"
        """
        # Build an undirected graph over the filtered glyphs.
        G = nx.Graph()
        for glyph in self.filtered_glyphs:
            G.add_node(glyph.id)
        for conn in self.filtered_connections:
            if conn.source_id in G and conn.target_id in G:
                G.add_edge(conn.source_id, conn.target_id, weight=conn.strength)

        # Each connected component of sufficient size becomes a cluster.
        clusters = {}
        for i, component in enumerate(nx.connected_components(G)):
            if len(component) < min_size:
                continue
            # component is a set, so membership tests are O(1).
            clusters[f"cluster_{i}"] = [
                g for g in self.filtered_glyphs if g.id in component
            ]
        return clusters

    def calculate_statistics(self) -> Dict[str, Any]:
        """
        Calculate statistics for the filtered glyph map.

        Returns:
        --------
        Dict[str, Any]
            Counts, per-type histograms, average connection strength,
            and glyph size min/max/avg (all 0.0 for empty sets).
        """
        sizes = [g.size for g in self.filtered_glyphs]

        glyph_types: Dict[str, int] = {}
        for glyph in self.filtered_glyphs:
            glyph_types[glyph.type.value] = glyph_types.get(glyph.type.value, 0) + 1

        connection_types: Dict[str, int] = {}
        total_strength = 0.0
        for conn in self.filtered_connections:
            connection_types[conn.type] = connection_types.get(conn.type, 0) + 1
            total_strength += conn.strength

        return {
            "num_glyphs": len(self.filtered_glyphs),
            "num_connections": len(self.filtered_connections),
            "glyph_types": glyph_types,
            "connection_types": connection_types,
            "avg_connection_strength": (
                total_strength / len(self.filtered_connections)
                if self.filtered_connections else 0.0
            ),
            "glyph_size_stats": {
                "min": min(sizes) if sizes else 0.0,
                "max": max(sizes) if sizes else 0.0,
                "avg": sum(sizes) / len(sizes) if sizes else 0.0,
            },
        }

    def reset_filters(self) -> 'GlyphExplorer':
        """
        Reset all filters.

        Returns:
        --------
        GlyphExplorer
            Self, for method chaining
        """
        self.filtered_glyphs = self.glyph_map.glyphs
        self.filtered_connections = self.glyph_map.connections
        return self

    def _update_connections(self):
        """Keep only connections whose endpoints both survive the glyph filter."""
        # Set membership: the original scanned an id list twice per
        # connection, O(connections * glyphs); this is O(c + g).
        kept_ids = {g.id for g in self.filtered_glyphs}
        self.filtered_connections = [
            c for c in self.glyph_map.connections
            if c.source_id in kept_ids and c.target_id in kept_ids
        ]
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Glyph Mapper for Attribution and Residue Visualization")
    parser.add_argument("--input", "-i", type=str, help="Input attribution or residue file")
    parser.add_argument("--output", "-o", type=str, help="Output visualization file")
    parser.add_argument("--type", "-t", type=str, default="attribution", choices=["attribution", "residue", "attention", "recursive"], help="Type of input data")
    parser.add_argument("--layout", "-l", type=str, default="force_directed", choices=["force_directed", "hierarchical", "circular", "grid", "radial"], help="Layout type")
    parser.add_argument("--width", "-w", type=int, default=1200, help="Visualization width")
    # No "-h" short flag: argparse reserves -h for --help and raises on the
    # conflict (the original also contained an unterminated duplicate of
    # this add_argument call, which was a syntax error).
    parser.add_argument("--height", type=int, default=900, help="Visualization height")
    parser.add_argument("--focus", "-f", type=str, help="Comma-separated tokens to focus on")
    parser.add_argument("--include-tokens", action="store_true", help="Include token sentinels in visualization")
    parser.add_argument("--cluster", action="store_true", help="Apply clustering to similar patterns")
    parser.add_argument("--save-map", "-s", type=str, help="Save glyph map to file")
    # "-I" (uppercase): "-i" is already taken by --input; argparse would
    # raise on the duplicate option string.
    parser.add_argument("--interactive", "-I", action="store_true", help="Generate interactive visualization")

    args = parser.parse_args()

    mapper = GlyphMapper()

    if args.input:
        # All supported input types are serialized as JSON.
        with open(args.input, "r") as f:
            data = json.load(f)

        if args.type == "attribution":
            # Rebuild an AttributionMap from the serialized dict,
            # supplying defaults for any missing fields.
            attribution_map = AttributionMap(
                prompt_tokens=data.get("prompt_tokens", []),
                output_tokens=data.get("output_tokens", []),
                links=[
                    AttributionLink(
                        source_idx=link.get("source_idx", 0),
                        target_idx=link.get("target_idx", 0),
                        attribution_type=AttributionType(link.get("attribution_type", "direct")),
                        strength=link.get("strength", 0.5),
                        attention_heads=link.get("attention_heads", []),
                        layers=link.get("layers", []),
                        intermediate_tokens=link.get("intermediate_tokens", []),
                        residue=link.get("residue")
                    )
                    for link in data.get("links", [])
                ],
                token_salience=data.get("token_salience", {}),
                attribution_gaps=data.get("attribution_gaps", []),
                collapsed_regions=data.get("collapsed_regions", []),
                uncertainty=data.get("uncertainty", {}),
                metadata=data.get("metadata", {})
            )

            # Optional comma-separated focus terms.
            focus_on = args.focus.split(",") if args.focus else None

            glyph_map = mapper.map_attribution(
                attribution_map=attribution_map,
                layout_type=args.layout,
                dimensions=(args.width, args.height),
                include_tokens=args.include_tokens,
                focus_on=focus_on
            )

        elif args.type == "residue":
            # Residue input is a JSON list of pattern dicts.
            residue_patterns = [
                ResiduePattern(
                    type=pattern.get("type", "unknown"),
                    pattern=pattern.get("pattern", ""),
                    context=pattern.get("context", {}),
                    signature=pattern.get("signature", ""),
                    confidence=pattern.get("confidence", 0.5)
                )
                for pattern in data
            ]

            glyph_map = mapper.map_residue_patterns(
                residue_patterns=residue_patterns,
                layout_type=args.layout,
                dimensions=(args.width, args.height),
                cluster_patterns=args.cluster
            )

        elif args.type == "attention":
            glyph_map = mapper.map_attention_heads(
                attention_data=data,
                layout_type=args.layout,
                dimensions=(args.width, args.height),
                include_tokens=args.include_tokens
            )

        elif args.type == "recursive":
            glyph_map = mapper.map_recursive_trace(
                trace_data=data,
                layout_type=args.layout,
                dimensions=(args.width, args.height)
            )

        else:
            # Unreachable in practice (argparse enforces choices), kept
            # as a defensive guard.
            print(f"Unknown data type: {args.type}")
            exit(1)

        if args.save_map:
            mapper.save_glyph_map(glyph_map, args.save_map)

        if args.output:
            mapper.visualize(
                glyph_map=glyph_map,
                output_path=args.output,
                interactive=args.interactive
            )
            print(f"Visualization saved to {args.output}")
        else:
            # No output file requested: print a textual summary instead.
            explorer = GlyphExplorer(glyph_map)
            stats = explorer.calculate_statistics()

            print(f"Glyph Map Statistics:")
            print(f"  Number of glyphs: {stats['num_glyphs']}")
            print(f"  Number of connections: {stats['num_connections']}")
            print(f"  Glyph types: {stats['glyph_types']}")
            print(f"  Connection types: {stats['connection_types']}")
            print(f"  Average connection strength: {stats['avg_connection_strength']:.2f}")

            central_glyphs = explorer.find_central_glyphs(top_n=3)
            print(f"\nCentral Glyphs:")
            for glyph in central_glyphs:
                print(f"  {glyph.symbol} - {glyph.description}")
    else:
        print("No input file specified. Use --input to provide input data.")
        exit(1)
|