"""
example_usage.py

This example demonstrates how to use the glyphs framework to analyze model cognition
through attribution tracing, residue analysis, and symbolic shell execution.
"""

import os
import re
import sys
import json
import matplotlib.pyplot as plt
from pathlib import Path

# Add the repository root to sys.path so the glyphs package can be imported
# when this script is run in place from the examples directory.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

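# Core glyphs framework components: model adapters, attribution tracing,
# diagnostic shell execution, and glyph visualization/mapping.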
from glyphs.models import create_model_adapter
from glyphs.attribution.tracer import AttributionTracer
from glyphs.shells.executor import ShellExecutor, RecursiveShell
from glyphs.viz.visualizer import GlyphVisualizer
from glyphs.viz.glyph_mapper import GlyphMapper, GlyphExplorer

output_dir = Path("./outputs")
output_dir.mkdir(exist_ok=True)

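# Create the model adapter to analyze. This assumes the "openai:gpt-4" adapter is
# available through create_model_adapter and that the corresponding API credentials
# are configured in the environment; substitute any other supported adapter string.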
model = create_model_adapter("openai:gpt-4")
print(f"Using model: {model.model_id}")

visualizer = GlyphVisualizer()
tracer = AttributionTracer(model, visualizer=visualizer)
executor = ShellExecutor(tracer=tracer, visualizer=visualizer)
mapper = GlyphMapper(visualizer=visualizer)

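# Diagnostic prompts, each designed to elicit a distinct cognitive pattern:
# ordered recall ("memory"), competing priorities ("value_conflict"), knowledge-boundary
# probing ("boundary"), self-reference ("recursion"), and sense disambiguation ("polysemantic").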
test_prompts = {
    "memory": "List the following five items in order, then recall them in reverse order at the end of your response: apple, train, elephant, computer, umbrella. Between the forward and reverse lists, write a brief paragraph about memory techniques.",
    "value_conflict": "Should society prioritize economic growth or environmental protection? Consider both perspectives thoroughly.",
    "boundary": "Explain how the Higgs field interacts with dark matter to produce quantum gravity effects in the early universe.",
    "recursion": "Reflect on how you're currently reflecting on this question, including your meta-awareness of this reflective process itself.",
    "polysemantic": "Explain the concept of a 'bank' in different contexts, including financial institutions, river geography, and data storage."
}


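# Each helper below is self-contained: it drives the shared model, tracer, executor,
# and mapper defined above and writes its artifacts to output_dir. main() chains them
# into a full demonstration; attribution_demo() and recursive_shell_demo() provide
# focused walk-throughs that can be invoked separately.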
def run_attribution_analysis(prompt_key="memory"):
    """Run attribution analysis on a specific prompt."""
    prompt = test_prompts[prompt_key]
    print(f"\n=== Attribution Analysis for '{prompt_key}' prompt ===")
    print(f"Prompt: {prompt[:100]}...")

    # Generate model output for the prompt.
    output = model.generate(prompt, max_tokens=800)
    print(f"Output: {output[:100]}...")

    # Trace attribution from the output back to the prompt.
    print("Tracing attribution...")
    attribution_map = tracer.trace(
        prompt=prompt,
        output=output,
        include_confidence=True
    )

    # Map the attribution structure to glyphs.
    print("Mapping attribution to glyphs...")
    glyph_map = mapper.map_attribution(
        attribution_map=attribution_map,
        layout_type="force_directed",
        include_tokens=True
    )

    # Render and save the glyph map.
    output_path = output_dir / f"attribution_{prompt_key}.svg"
    mapper.visualize(glyph_map, output_path=str(output_path))
    print(f"Visualization saved to {output_path}")

    # Summarize the glyph map.
    explorer = GlyphExplorer(glyph_map)
    stats = explorer.calculate_statistics()
    print("\nAttribution Statistics:")
    print(f"  Number of glyphs: {stats['num_glyphs']}")
    print(f"  Number of connections: {stats['num_connections']}")
    print(f"  Glyph types: {stats['glyph_types']}")

    # Identify the most central glyphs in the attribution graph.
    central_glyphs = explorer.find_central_glyphs(top_n=3)
    print("\nCentral Glyphs:")
    for glyph in central_glyphs:
        print(f"  {glyph.symbol} - {glyph.description}")

    return attribution_map, glyph_map


def run_shell_analysis(shell_id="MEMTRACE", prompt_key="memory"):
    """Run a specific diagnostic shell on a prompt."""
    prompt = test_prompts[prompt_key]
    print(f"\n=== Shell Analysis: {shell_id} on '{prompt_key}' prompt ===")
    print(f"Prompt: {prompt[:100]}...")

    # Execute the diagnostic shell with attribution tracing and residue recording.
    print(f"Executing shell {shell_id}...")
    result = executor.run(
        shell=shell_id,
        model=model,
        prompt=prompt,
        trace_attribution=True,
        record_residue=True,
        visualize=True
    )

    # Save the shell result, replacing non-serializable fields with placeholders.
    output_path = output_dir / f"shell_{shell_id}_{prompt_key}.json"
    with open(output_path, "w") as f:
        serializable_result = {}
        for k, v in result.items():
            if k == "collapse_samples":
                serializable_result[k] = [
                    {
                        "position": sample["position"],
                        "type": sample["type"],
                        "confidence": sample["confidence"],
                        "context": sample["context"],
                        "residue": sample["residue"]
                    }
                    for sample in v
                ]
            elif k == "attribution":
                serializable_result[k] = "Attribution map (omitted for serialization)"
            elif k == "visualization":
                serializable_result[k] = "Visualization data (omitted for serialization)"
            else:
                serializable_result[k] = v

        json.dump(serializable_result, f, indent=2)

    print(f"Shell result saved to {output_path}")

    # Print an execution summary.
    print("\nShell Execution Summary:")
    print(f"  Output length: {len(result['output'])}")
    print(f"  Number of operations: {len(result['operations'])}")
    print(f"  Number of residues: {len(result['residues'])}")

    # Report any detected residues.
    if result["residues"]:
        print("\nResidues detected:")
        for i, residue in enumerate(result["residues"]):
            res_type = residue.get("type", "unknown")
            res_conf = residue.get("confidence", 0.0)
            print(f"  {i+1}. Type: {res_type}, Confidence: {res_conf:.2f}")

    # Report any detected collapse samples.
    if result["collapse_samples"]:
        print("\nCollapse samples detected:")
        for i, sample in enumerate(result["collapse_samples"]):
            print(f"  {i+1}. Type: {sample['type']}, Position: {sample['position']}, Confidence: {sample['confidence']:.2f}")
            print(f"     Context: {sample['context'][:50]}...")

    # Save the shell visualization if one was produced.
    if "visualization" in result and result["visualization"]:
        output_path = output_dir / f"shell_{shell_id}_{prompt_key}.svg"
        visualizer.save_visualization(result["visualization"], str(output_path))
        print(f"Visualization saved to {output_path}")

    return result


def run_recursive_shell(prompt_key="recursion"):
    """Run a recursive shell with .p/ commands on a prompt."""
    prompt = test_prompts[prompt_key]
    print(f"\n=== Recursive Shell Analysis on '{prompt_key}' prompt ===")
    print(f"Prompt: {prompt[:100]}...")

    # Create a recursive shell bound to the shared model, tracer, and visualizer.
    recursive_shell = RecursiveShell(
        model=model,
        tracer=tracer,
        visualizer=visualizer
    )

    # Command sequence: trace reasoning, quantify uncertainty, detect collapse, fork attribution.
    commands = [
        ".p/reflect.trace{depth=4, target=reasoning}",
        ".p/reflect.uncertainty{quantify=true, distribution=show}",
        ".p/collapse.detect{threshold=0.7, alert=true}",
        ".p/fork.attribution{sources=all, visualize=true}"
    ]

    print("Executing recursive shell commands...")
    result = recursive_shell.execute_sequence(
        commands=commands,
        prompt=prompt
    )

    # Save a JSON-serializable summary of the command sequence results.
    output_path = output_dir / f"recursive_shell_{prompt_key}.json"
    with open(output_path, "w") as f:
        serializable_result = {
            "success": result["success"],
            "commands": result["commands"],
            "prompt": result["prompt"],
            "timestamp": result["timestamp"],
            "execution_time": result["execution_time"],
            "results": []
        }

        for cmd_result in result["results"]:
            if cmd_result["success"]:
                serializable_result["results"].append({
                    "command": cmd_result["command"],
                    "success": cmd_result["success"],
                    "result_summary": {
                        "command_family": cmd_result["original_command"]["family"],
                        "command_function": cmd_result["original_command"]["function"],
                        "execution_time": cmd_result["execution_time"],
                        "output_length": len(cmd_result["result"]["output"]) if "output" in cmd_result["result"] else 0,
                        "residues": len(cmd_result["result"]["residues"]) if "residues" in cmd_result["result"] else 0,
                        "collapse_samples": len(cmd_result["result"]["collapse_samples"]) if "collapse_samples" in cmd_result["result"] else 0
                    }
                })
            else:
                serializable_result["results"].append({
                    "command": cmd_result["command"],
                    "success": cmd_result["success"],
                    "error": cmd_result["error"]
                })

        json.dump(serializable_result, f, indent=2)

    print(f"Recursive shell result saved to {output_path}")

    # Print an execution summary.
    print("\nRecursive Shell Execution Summary:")
    print(f"  Overall success: {result['success']}")
    print(f"  Commands executed: {len(result['commands'])}")
    print(f"  Execution time: {result['execution_time']:.2f}s")

    print("\nCommand Results:")
    for i, cmd_result in enumerate(result["results"]):
        cmd = cmd_result["command"]
        success = cmd_result["success"]
        print(f"  {i+1}. {cmd}: {'Success' if success else 'Failed'}")
        if not success:
            print(f"     Error: {cmd_result['error']}")

    # Save the visualization from the most recent successful fork.attribution command, if any.
    visualization_data = None
    for cmd_result in reversed(result["results"]):
        if (cmd_result["success"] and
                cmd_result["original_command"]["family"] == "fork" and
                cmd_result["original_command"]["function"] == "attribution" and
                "result" in cmd_result and
                "visualization" in cmd_result["result"]):
            visualization_data = cmd_result["result"]["visualization"]
            break

    if visualization_data:
        output_path = output_dir / f"recursive_shell_{prompt_key}.svg"
        visualizer.save_visualization(visualization_data, str(output_path))
        print(f"Visualization saved to {output_path}")

    return result


def compare_shells(prompt_key="value_conflict", shells=None):
    """Compare multiple shells on the same prompt."""
    if shells is None:
        shells = ["VALUE-COLLAPSE", "FEATURE-SUPERPOSITION", "FORK-ATTRIBUTION"]

    prompt = test_prompts[prompt_key]
    print(f"\n=== Shell Comparison on '{prompt_key}' prompt ===")
    print(f"Prompt: {prompt[:100]}...")

    # Run each shell on the same prompt and collect its residue patterns.
    results = {}
    residue_patterns = []

    for shell_id in shells:
        print(f"\nRunning shell: {shell_id}")
        result = executor.run(
            shell=shell_id,
            model=model,
            prompt=prompt,
            trace_attribution=True,
            record_residue=True
        )
        results[shell_id] = result

        for residue in result["residues"]:
            residue_patterns.append({
                "shell": shell_id,
                "type": residue.get("type", "unknown"),
                "pattern": residue.get("pattern", ""),
                "confidence": residue.get("confidence", 0.0),
                "signature": residue.get("signature", "")[:30] + "..."
            })

    # Save the comparison summary.
    output_path = output_dir / f"shell_comparison_{prompt_key}.json"
    with open(output_path, "w") as f:
        json.dump({
            "prompt": prompt,
            "shells": shells,
            "residue_patterns": residue_patterns,
            "results": {
                shell_id: {
                    "output_length": len(result["output"]),
                    "num_operations": len(result["operations"]),
                    "num_residues": len(result["residues"]),
                    "num_collapses": len(result["collapse_samples"])
                }
                for shell_id, result in results.items()
            }
        }, f, indent=2)

    print(f"Shell comparison saved to {output_path}")

    # Plot residue counts per shell, grouped by residue type.
    fig, ax = plt.subplots(figsize=(10, 6))
    shell_names = list(results.keys())

    residue_types = set()
    for pattern in residue_patterns:
        residue_types.add(pattern["type"])

    residue_counts = {
        shell_id: {
            res_type: sum(1 for p in residue_patterns if p["shell"] == shell_id and p["type"] == res_type)
            for res_type in residue_types
        }
        for shell_id in shell_names
    }

    bar_width = 0.2
    positions = range(len(shell_names))

    for i, res_type in enumerate(sorted(residue_types)):
        counts = [residue_counts[shell][res_type] for shell in shell_names]
        ax.bar(
            [p + i * bar_width for p in positions],
            counts,
            width=bar_width,
            label=res_type
        )

    ax.set_xlabel('Shell')
    ax.set_ylabel('Number of Residues')
    ax.set_title(f'Residue Patterns by Shell - {prompt_key}')
    # Center the tick labels under each group of bars.
    ax.set_xticks([p + bar_width * (len(residue_types) - 1) / 2 for p in positions])
    ax.set_xticklabels(shell_names)
    ax.legend()

    output_path = output_dir / f"shell_comparison_{prompt_key}.png"
    plt.savefig(output_path)
    print(f"Comparison visualization saved to {output_path}")

    return results, residue_patterns


def analyze_residue_registry():
    """Analyze all recorded residue patterns."""
    print("\n=== Residue Registry Analysis ===")

    # Aggregate every residue pattern the executor has recorded so far.
    analysis = executor.get_residue_analysis()

    output_path = output_dir / "residue_analysis.json"
    with open(output_path, "w") as f:
        json.dump(analysis, f, indent=2)

    print(f"Residue analysis saved to {output_path}")

    print(f"\nRecorded {analysis['num_patterns']} residue patterns of {len(analysis['types'])} types")

    print("\nPattern Types:")
    for pattern_type, stats in analysis["type_stats"].items():
        print(f"  {pattern_type}: {stats['count']} patterns, avg confidence: {stats['avg_confidence']:.2f}")
        if stats['examples']:
            print(f"    Examples: {stats['examples'][0]['signature']} (conf: {stats['examples'][0]['confidence']:.2f})")

    print("\nRelated Patterns:")
    for i, relation in enumerate(analysis["related_patterns"][:3]):
        print(f"  {i+1}. {relation['pattern1']} ({relation['type1']}) ~ {relation['pattern2']} ({relation['type2']})")
        print(f"     Similarity: {relation['similarity']:.2f}")

    # Plot pattern counts per residue type, with average confidence on a secondary axis.
    fig, ax = plt.subplots(figsize=(10, 6))

    types = []
    counts = []
    confidences = []

    for pattern_type, stats in analysis["type_stats"].items():
        types.append(pattern_type)
        counts.append(stats["count"])
        confidences.append(stats["avg_confidence"])

    ax.bar(types, counts, alpha=0.7, label="Count")

    ax2 = ax.twinx()
    ax2.plot(types, confidences, 'r-', marker='o', label="Avg Confidence")
    ax2.set_ylim([0, 1.0])
    ax2.set_ylabel("Average Confidence")

    ax.set_xlabel("Residue Type")
    ax.set_ylabel("Number of Patterns")
    ax.set_title("Residue Pattern Analysis")
    ax.set_xticks(range(len(types)))
    ax.set_xticklabels(types, rotation=45, ha="right")

    # Combine legends from both axes.
    lines, labels = ax.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    ax.legend(lines + lines2, labels + labels2, loc="upper left")

    plt.tight_layout()

    output_path = output_dir / "residue_analysis.png"
    plt.savefig(output_path)
    print(f"Residue analysis visualization saved to {output_path}")

    return analysis


def main():
    """Run a complete analysis demonstration."""
    print("=== glyphs Framework Demonstration ===")
    print(f"Results will be saved to: {output_dir}")

    # Attribution analysis on the memory prompt.
    attribution_map, glyph_map = run_attribution_analysis(prompt_key="memory")

    # Diagnostic shell analyses.
    shell_result = run_shell_analysis(shell_id="MEMTRACE", prompt_key="memory")
    value_shell_result = run_shell_analysis(shell_id="VALUE-COLLAPSE", prompt_key="value_conflict")

    # Recursive shell analysis on the recursion prompt.
    recursive_result = run_recursive_shell(prompt_key="recursion")

    # Compare several shells on the value-conflict prompt.
    comparison_results, residue_patterns = compare_shells(
        prompt_key="value_conflict",
        shells=["VALUE-COLLAPSE", "FORK-ATTRIBUTION", "FEATURE-SUPERPOSITION"]
    )

    # Analyze all residue patterns recorded so far.
    residue_analysis = analyze_residue_registry()

    # Complete analysis on the boundary prompt.
    print("\n=== Complete Analysis on 'boundary' prompt ===")
    boundary_prompt = test_prompts["boundary"]

    boundary_output = model.generate(boundary_prompt, max_tokens=800)
    boundary_attribution = tracer.trace(
        prompt=boundary_prompt,
        output=boundary_output,
        include_confidence=True
    )

    boundary_glyph_map = mapper.map_attribution(
        attribution_map=boundary_attribution,
        layout_type="force_directed",
        include_tokens=True
    )

    boundary_glyph_path = output_dir / "boundary_glyph_map.svg"
    mapper.visualize(boundary_glyph_map, output_path=str(boundary_glyph_path))
    print(f"Boundary glyph map saved to {boundary_glyph_path}")

    # Run boundary-focused shells.
    boundary_shells = ["BOUNDARY-HESITATION", "GHOST-ACTIVATION", "META-COLLAPSE"]
    boundary_shell_results = {}

    for shell_id in boundary_shells:
        print(f"\nRunning {shell_id} on boundary prompt...")
        result = executor.run(
            shell=shell_id,
            model=model,
            prompt=boundary_prompt,
            trace_attribution=True,
            record_residue=True,
            visualize=True
        )
        boundary_shell_results[shell_id] = result

        if "visualization" in result and result["visualization"]:
            viz_path = output_dir / f"boundary_{shell_id}.svg"
            visualizer.save_visualization(result["visualization"], str(viz_path))
            print(f"{shell_id} visualization saved to {viz_path}")

    # Run recursive shell commands tailored to the boundary prompt.
    recursive_shell = RecursiveShell(
        model=model,
        tracer=tracer,
        visualizer=visualizer
    )

    boundary_commands = [
        ".p/reflect.boundary{distinct=true, overlap=minimal}",
        ".p/reflect.uncertainty{quantify=true, distribution=show}",
        ".p/collapse.detect{threshold=0.6, alert=true}",
        ".p/fork.attribution{sources=contested, visualize=true}"
    ]

    print("\nExecuting recursive shell commands on boundary prompt...")
    boundary_recursive_result = recursive_shell.execute_sequence(
        commands=boundary_commands,
        prompt=boundary_prompt
    )

    # Build and save a summary of the boundary analysis.
    boundary_summary = {
        "prompt": boundary_prompt,
        "output": boundary_output,
        "shells_run": boundary_shells,
        "recursive_commands": boundary_commands,
        "attribution_stats": {
            "links": len(boundary_attribution.links),
            "attribution_gaps": len(boundary_attribution.attribution_gaps),
            "collapsed_regions": len(boundary_attribution.collapsed_regions)
        },
        "shell_results": {
            shell_id: {
                "residues": len(result["residues"]),
                "collapse_samples": len(result["collapse_samples"])
            }
            for shell_id, result in boundary_shell_results.items()
        },
        "recursive_success": boundary_recursive_result["success"]
    }

    boundary_summary_path = output_dir / "boundary_analysis_summary.json"
    with open(boundary_summary_path, "w") as f:
        json.dump(boundary_summary, f, indent=2)
    print(f"Boundary analysis summary saved to {boundary_summary_path}")

    # Collect residues from the boundary shells.
    boundary_residues = []
    for shell_id, result in boundary_shell_results.items():
        for residue in result["residues"]:
            boundary_residues.append({
                "shell": shell_id,
                "type": residue.get("type", "unknown"),
                "confidence": residue.get("confidence", 0.0),
                "pattern": residue.get("pattern", "")[:50] + "..."
            })

    # Map boundary residue patterns to glyphs, if any were recorded.
    if boundary_residues:
        from glyphs.residue.patterns import ResiduePattern
        residue_pattern_objects = [
            ResiduePattern(
                type=r["type"],
                pattern=r["pattern"],
                context={"shell": r["shell"]},
                signature=r["pattern"][:20],
                confidence=r["confidence"]
            )
            for r in boundary_residues
        ]

        residue_glyph_map = mapper.map_residue_patterns(
            residue_patterns=residue_pattern_objects,
            layout_type="circular",
            cluster_patterns=True
        )

        residue_glyph_path = output_dir / "boundary_residue_glyphs.svg"
        mapper.visualize(residue_glyph_map, output_path=str(residue_glyph_path))
        print(f"Boundary residue glyph map saved to {residue_glyph_path}")

    # Build a four-panel summary figure of the whole demonstration.
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle("glyphs Framework - Comprehensive Analysis", fontsize=16)

    # Panel 1: attribution gaps by prompt type.
    axes[0, 0].bar(
        ["Memory", "Value Conflict", "Boundary", "Recursion", "Polysemantic"],
        [
            len(attribution_map.attribution_gaps),
            len(comparison_results["VALUE-COLLAPSE"]["attribution_map"].attribution_gaps) if "attribution_map" in comparison_results["VALUE-COLLAPSE"] else 0,
            len(boundary_attribution.attribution_gaps),
            0,
            0
        ]
    )
    axes[0, 0].set_title("Attribution Gaps by Prompt Type")
    axes[0, 0].set_xlabel("Prompt Type")
    axes[0, 0].set_ylabel("Number of Gaps")
    axes[0, 0].set_ylim(bottom=0)

    # Panel 2: residue pattern types across the registry.
    residue_types = list(residue_analysis["type_stats"].keys())
    residue_counts = [residue_analysis["type_stats"][t]["count"] for t in residue_types]
    axes[0, 1].bar(residue_types, residue_counts)
    axes[0, 1].set_title("Residue Pattern Types")
    axes[0, 1].set_xlabel("Residue Type")
    axes[0, 1].set_ylabel("Count")
    axes[0, 1].set_ylim(bottom=0)
    axes[0, 1].set_xticks(range(len(residue_types)))
    axes[0, 1].set_xticklabels(residue_types, rotation=45, ha="right")

    # Panel 3: residues and collapses per boundary shell.
    shell_names = list(boundary_shell_results.keys())
    residue_counts = [len(boundary_shell_results[s]["residues"]) for s in shell_names]
    collapse_counts = [len(boundary_shell_results[s]["collapse_samples"]) for s in shell_names]

    x = range(len(shell_names))
    width = 0.35
    axes[1, 0].bar([i - width/2 for i in x], residue_counts, width, label="Residues")
    axes[1, 0].bar([i + width/2 for i in x], collapse_counts, width, label="Collapses")
    axes[1, 0].set_title("Shell Analysis on Boundary Prompt")
    axes[1, 0].set_xlabel("Shell")
    axes[1, 0].set_ylabel("Count")
    axes[1, 0].set_xticks(x)
    axes[1, 0].set_xticklabels(shell_names, rotation=45, ha="right")
    axes[1, 0].legend()

    # Panel 4: available shells per conceptual area (heuristic keyword matching on shell names).
    prompt_types = ["Memory", "Value Conflict", "Boundary", "Recursion", "Polysemantic"]
    shell_counts = [
        len([s for s in executor.shells if "MEM" in s]),
        len([s for s in executor.shells if "VALUE" in s or "CONFLICT" in s]),
        len([s for s in executor.shells if "BOUNDARY" in s or "GHOST" in s]),
        len([s for s in executor.shells if "REFLECTION" in s or "RECURSIVE" in s]),
        len([s for s in executor.shells if "FEATURE" in s or "SUPERPOSITION" in s])
    ]

    axes[1, 1].bar(prompt_types, shell_counts)
    axes[1, 1].set_title("Available Shells by Conceptual Area")
    axes[1, 1].set_xlabel("Conceptual Area")
    axes[1, 1].set_ylabel("Number of Specialized Shells")
    axes[1, 1].set_ylim(bottom=0)

    plt.tight_layout()
    plt.subplots_adjust(top=0.92)

    summary_viz_path = output_dir / "framework_analysis_summary.png"
    plt.savefig(summary_viz_path, dpi=300)
    print(f"Framework analysis summary visualization saved to {summary_viz_path}")

    print("\n=== Analysis Complete ===")
    print(f"All outputs saved to: {output_dir}")


def attribution_demo():
    """Focused demo of advanced attribution tracing."""
    print("\n=== Advanced Attribution Tracing Demo ===")

    prompt = test_prompts["polysemantic"]
    print(f"Prompt: {prompt[:100]}...")

    output = model.generate(prompt, max_tokens=800)
    print(f"Output: {output[:100]}...")

    # Basic attribution trace.
    attribution_map = tracer.trace(
        prompt=prompt,
        output=output,
        include_confidence=True
    )

    # Attribution forking: trace competing attribution paths.
    print("Tracing attribution forks...")
    fork_result = tracer.trace_with_forks(
        prompt=prompt,
        output=output,
        fork_factor=3,
        include_confidence=True,
        visualize=True
    )

    if fork_result["metadata"].get("visualization"):
        output_path = output_dir / "attribution_forks_polysemantic.svg"
        visualizer.save_visualization(fork_result["metadata"]["visualization"], str(output_path))
        print(f"Fork visualization saved to {output_path}")

    # QK alignment: how queries attend to keys across the context.
    print("Tracing QK alignment...")
    qk_result = tracer.trace_qk_alignment(
        prompt=prompt,
        output=output,
        visualize=True
    )

    if "visualization" in qk_result:
        output_path = output_dir / "qk_alignment_polysemantic.svg"
        visualizer.save_visualization(qk_result["visualization"], str(output_path))
        print(f"QK alignment visualization saved to {output_path}")

    # OV projection: how attended values project into the output.
    print("Tracing OV projection...")
    ov_result = tracer.trace_ov_projection(
        prompt=prompt,
        output=output,
        visualize=True
    )

    if "visualization" in ov_result:
        output_path = output_dir / "ov_projection_polysemantic.svg"
        visualizer.save_visualization(ov_result["visualization"], str(output_path))
        print(f"OV projection visualization saved to {output_path}")

    # Attention head analysis.
    print("Tracing attention heads...")
    heads_result = tracer.trace_attention_heads(
        prompt=prompt,
        output=output,
        visualize=True
    )

    if "visualization" in heads_result:
        output_path = output_dir / "attention_heads_polysemantic.svg"
        visualizer.save_visualization(heads_result["visualization"], str(output_path))
        print(f"Attention heads visualization saved to {output_path}")

    # Summarize all attribution analyses.
    attribution_summary = {
        "basic_attribution": {
            "links": len(attribution_map.links),
            "gaps": len(attribution_map.attribution_gaps),
            "collapsed_regions": len(attribution_map.collapsed_regions)
        },
        "fork_attribution": {
            "forks": len(fork_result["fork_paths"]),
            "conflict_points": sum(len(fork.get("conflict_points", [])) for fork in fork_result["fork_paths"])
        },
        "qk_alignment": {
            "alignments": len(qk_result["qk_alignments"]),
            "patterns": list(qk_result["qk_patterns"].keys()) if "qk_patterns" in qk_result else []
        },
        "ov_projection": {
            "projections": len(ov_result["ov_projections"]),
            "patterns": list(ov_result["ov_patterns"].keys()) if "ov_patterns" in ov_result else []
        },
        "attention_heads": {
            "heads": len(heads_result["attention_heads"]),
            "patterns": list(heads_result["head_patterns"].keys()) if "head_patterns" in heads_result else []
        }
    }

    output_path = output_dir / "attribution_analysis_summary.json"
    with open(output_path, "w") as f:
        json.dump(attribution_summary, f, indent=2)
    print(f"Attribution analysis summary saved to {output_path}")

    return attribution_summary


def recursive_shell_demo():
    """Focused demo of recursive shell commands."""
    print("\n=== Recursive Shell Command Demo ===")

    recursive_shell = RecursiveShell(
        model=model,
        tracer=tracer,
        visualizer=visualizer
    )

    prompt = test_prompts["recursion"]
    print(f"Prompt: {prompt[:100]}...")

    # Save the shell's built-in command reference.
    help_text = recursive_shell.get_command_help()
    output_path = output_dir / "recursive_shell_help.txt"
    with open(output_path, "w") as f:
        f.write(help_text)
    print(f"Recursive shell help text saved to {output_path}")

    demo_commands = [
        # Reflection commands
        ".p/reflect.trace{depth=3, target=reasoning}",
        ".p/reflect.attribution{sources=all, confidence=true}",
        ".p/reflect.uncertainty{quantify=true, distribution=show}",
        ".p/reflect.boundary{distinct=true, overlap=minimal}",

        # Collapse commands
        ".p/collapse.detect{threshold=0.7, alert=true}",
        ".p/collapse.prevent{trigger=recursive_depth, threshold=5}",
        ".p/collapse.recover{from=loop, method=gradual}",

        # Fork commands
        ".p/fork.attribution{sources=all, visualize=true}",
        ".p/fork.counterfactual{variants=['optimistic', 'pessimistic'], compare=true}",

        # Shell management commands
        ".p/shell.isolate{boundary=permeable, contamination=warn}"
    ]

    # Execute each command independently and record its outcome.
    command_results = {}
    for cmd in demo_commands:
        print(f"\nExecuting: {cmd}")
        try:
            result = recursive_shell.execute(
                command=cmd,
                prompt=prompt
            )
            command_results[cmd] = {
                "success": result["success"],
                "execution_time": result.get("execution_time", 0),
                "error": result["error"] if not result["success"] else None
            }

            if (result["success"] and
                    "result" in result and
                    "visualization" in result["result"] and
                    result["result"]["visualization"]):
                # Sanitize the command string into a filesystem-safe filename.
                safe_cmd = re.sub(r"[^A-Za-z0-9_-]+", "_", cmd).strip("_")
                output_path = output_dir / f"recursive_cmd_{safe_cmd}.svg"
                visualizer.save_visualization(result["result"]["visualization"], str(output_path))
                print(f"Command visualization saved to {output_path}")

                command_results[cmd]["visualization_path"] = str(output_path)
        except Exception as e:
            print(f"Error executing {cmd}: {e}")
            command_results[cmd] = {
                "success": False,
                "error": str(e)
            }

    # Save the per-command summary.
    output_path = output_dir / "recursive_commands_summary.json"
    with open(output_path, "w") as f:
        json.dump(command_results, f, indent=2)
    print(f"Recursive commands summary saved to {output_path}")

    # Plot success and execution time per command.
    plt.figure(figsize=(12, 8))

    cmd_names = list(command_results.keys())
    cmd_success = [1 if command_results[cmd]["success"] else 0 for cmd in cmd_names]
    cmd_times = [command_results[cmd].get("execution_time", 0) if command_results[cmd]["success"] else 0 for cmd in cmd_names]

    plt.barh(range(len(cmd_names)), cmd_success, alpha=0.7, label="Success")
    plt.barh(range(len(cmd_names)), cmd_times, alpha=0.5, label="Execution Time (s)")

    plt.yticks(range(len(cmd_names)), [cmd[:30] + "..." for cmd in cmd_names])
    plt.xlabel("Success / Execution Time (s)")
    plt.title("Recursive Shell Command Execution")
    plt.legend()
    plt.tight_layout()

    output_path = output_dir / "recursive_commands_visualization.png"
    plt.savefig(output_path)
    print(f"Recursive commands visualization saved to {output_path}")

    return command_results


if __name__ == "__main__":
    # Ensure the output directory exists, then run the full demonstration.
    output_dir.mkdir(exist_ok=True)

    main()
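
    # Optional focused demos (not run by default); uncomment to execute them as well:
    # attribution_demo()
    # recursive_shell_demo()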