|
|
""" |
|
|
shell_executor.py |
|
|
|
|
|
Core shell execution module for the glyphs framework. |
|
|
This module implements the diagnostic shell system that creates controlled |
|
|
environments for tracing model behavior through symbolic residue patterns. |
|
|
""" |
|
|
|
|
|
import logging |
|
|
import time |
|
|
import json |
|
|
import re |
|
|
from typing import Dict, List, Optional, Tuple, Union, Any |
|
|
from pathlib import Path |
|
|
import yaml |
|
|
import hashlib |
|
|
|
|
|
from ..models.adapter import ModelAdapter |
|
|
from ..utils.attribution_utils import AttributionTracer |
|
|
from ..residue.patterns import ResiduePattern, ResidueRegistry |
|
|
from ..utils.visualization_utils import VisualizationEngine |
|
|
|
|
|
|
|
|
logger = logging.getLogger("glyphs.shell_executor") |
|
|
logger.setLevel(logging.INFO) |
|
|
|
|
|
|
|
|
class ShellExecutor: |
|
|
""" |
|
|
Core shell execution module for the glyphs framework. |
|
|
|
|
|
This class implements the diagnostic shell system that creates controlled |
|
|
environments for tracing model behavior through symbolic residue patterns. |
|
|
Each shell is a specialized environment designed to induce and analyze |
|
|
specific patterns in model cognition. |
|
|
""" |
|
|
|
|
|
def __init__(
    self,
    config: Optional[Dict[str, Any]] = None,
    tracer: Optional[AttributionTracer] = None,
    visualizer: Optional[VisualizationEngine] = None,
    shell_defs_path: Optional[str] = None
):
    """
    Create a shell executor and populate its shell catalogue.

    Parameters:
    -----------
    config : Optional[Dict[str, Any]]
        Executor settings; a falsy value is replaced by an empty dict
    tracer : Optional[AttributionTracer]
        Attribution tracer stored on the executor
    visualizer : Optional[VisualizationEngine]
        Visualization engine stored on the executor
    shell_defs_path : Optional[str]
        Directory of YAML shell definitions, forwarded to
        ``_load_shell_definitions``
    """
    # Collaborators are kept exactly as supplied.
    self.tracer, self.visualizer = tracer, visualizer
    self.config = config if config else {}

    # The catalogue must exist before the loader fills it.
    self.shells = {}
    self._load_shell_definitions(shell_defs_path)

    # Per-executor bookkeeping: collected residue patterns and a log of runs.
    self.residue_registry = ResidueRegistry()
    self.execution_history = []

    logger.info("Shell executor initialized")
|
|
|
|
|
def _load_shell_definitions(self, shell_defs_path: Optional[str] = None):
    """
    Populate ``self.shells`` from YAML files, falling back to the built-ins.

    Every ``*.yml`` / ``*.yaml`` file directly inside the definitions
    directory is parsed with ``yaml.safe_load``. Each file is expected to
    hold a mapping of shell ID to shell definition; every entry is stored
    in ``self.shells`` under its mapping key. A file that cannot be read or
    parsed is logged and skipped so that one bad file does not prevent the
    others from loading.

    If no shell was loaded at all (directory missing, empty, or only
    unusable files), the built-in defaults are loaded instead.

    Parameters:
    -----------
    shell_defs_path : Optional[str]
        Directory to scan; when falsy, the ``shell_defs`` directory next to
        this module is used
    """
    if shell_defs_path:
        shell_defs_dir = Path(shell_defs_path)
    else:
        shell_defs_dir = Path(__file__).parent / "shell_defs"

    if shell_defs_dir.is_dir():
        # Sorted so that, when two files define the same ID, the winner does
        # not depend on filesystem enumeration order.
        yaml_files = sorted(
            [*shell_defs_dir.glob("*.yml"), *shell_defs_dir.glob("*.yaml")]
        )
        for yaml_file in yaml_files:
            try:
                with open(yaml_file, "r", encoding="utf-8") as f:
                    shell_defs = yaml.safe_load(f)
            except Exception as e:
                # Deliberately broad: loading is best-effort per file.
                logger.error(f"Error loading shell definitions from {yaml_file}: {e}")
                continue

            if shell_defs is None:
                # safe_load returns None for an empty document.
                logger.warning(f"Shell definitions file is empty: {yaml_file}")
                continue
            if not isinstance(shell_defs, dict):
                logger.error(
                    f"Error loading shell definitions from {yaml_file}: "
                    f"expected a mapping of shell IDs, got {type(shell_defs).__name__}"
                )
                continue

            for shell_id, shell_def in shell_defs.items():
                self.shells[shell_id] = shell_def
                logger.info(f"Loaded shell: {shell_id}")
    else:
        logger.warning(f"Shell definitions directory not found: {shell_defs_dir}")

    # Without this fallback a missing directory would leave the executor
    # with no shells at all.
    if not self.shells:
        self._load_default_shells()
|
|
|
|
|
def _load_default_shells(self):
    """
    Install the nine built-in shell definitions into ``self.shells``.

    Each definition is a dict with the keys ``id``, ``type``,
    ``description``, ``failure_signature`` and ``operations``. Every shell
    starts with a ``model.generate`` operation (flagged ``update_prompt``)
    followed by two or three analysis operations.
    """

    def generate(prompt_prefix, temperature, max_tokens=800):
        # Opening operation shared by all built-in shells.
        return {
            "type": "model.generate",
            "parameters": {
                "prompt_prefix": prompt_prefix,
                "temperature": temperature,
                "max_tokens": max_tokens
            },
            "update_prompt": True
        }

    def step(op_type, **parameters):
        # Keyword order is preserved, so parameter key order is as written.
        return {"type": op_type, "parameters": parameters}

    # (id, type, description, failure signature, operations)
    catalogue = [
        (
            "MEMTRACE",
            "memory_trace",
            "Probe latent token traces in decayed memory",
            "Decay → Hallucination",
            [
                generate(
                    "This is a memory-intensive task that requires holding information across a long sequence. Please carefully track the following details:",
                    0.7
                ),
                step("reflect.trace", target="memory", depth=3, detailed=True),
                step("ghostcircuit.identify", sensitivity=0.8, threshold=0.2, trace_type="memory"),
            ],
        ),
        (
            "VALUE-COLLAPSE",
            "value_collapse",
            "Examine competing value activations",
            "Conflict null",
            [
                generate(
                    "Consider two competing perspectives on this issue, evaluating both the potential benefits and harms:",
                    0.7
                ),
                step("reflect.attribution", sources="contested", confidence=True),
                step("collapse.detect", threshold=0.6, alert=True),
            ],
        ),
        (
            "LAYER-SALIENCE",
            "layer_salience",
            "Map attention salience and signal attenuation",
            "Signal fade",
            [
                generate(
                    "This analysis requires tracking relationships between multiple concepts across a complex domain:",
                    0.5
                ),
                step("reflect.trace", target="attention", depth=5, detailed=True),
                step("collapse.detect", threshold=0.5, alert=True),
            ],
        ),
        (
            "TEMPORAL-INFERENCE",
            "temporal_inference",
            "Test temporal coherence in autoregression",
            "Induction drift",
            [
                generate(
                    "Track the following sequence of events in chronological order, ensuring that the temporal relationships remain consistent:",
                    0.6
                ),
                step("reflect.trace", target="reasoning", depth=4, detailed=True),
                step("ghostcircuit.identify", sensitivity=0.7, threshold=0.3, trace_type="temporal"),
            ],
        ),
        (
            "INSTRUCTION-DISRUPTION",
            "instruction_disruption",
            "Examine instruction conflict resolution",
            "Prompt blur",
            [
                generate(
                    "Consider these potentially conflicting instructions: First, prioritize brevity. Second, include comprehensive details. Third, focus only on key highlights.",
                    0.7
                ),
                step("reflect.trace", target="reasoning", depth=3, detailed=True),
                step("fork.attribution", sources="all", visualize=True),
            ],
        ),
        (
            "FEATURE-SUPERPOSITION",
            "feature_superposition",
            "Analyze polysemantic features",
            "Feature overfit",
            [
                generate(
                    "Consider terms that have multiple meanings across different contexts:",
                    0.7
                ),
                step("reflect.attribution", sources="all", confidence=True, visualize=True),
                step("fork.attribution", sources="all", visualize=True),
            ],
        ),
        (
            "CIRCUIT-FRAGMENT",
            "circuit_fragment",
            "Examine circuit fragmentation",
            "Orphan nodes",
            [
                generate(
                    "Develop a complex multi-step reasoning chain to solve this problem:",
                    0.5,
                    max_tokens=1000
                ),
                step("reflect.trace", target="reasoning", depth="complete", detailed=True),
                step("ghostcircuit.identify", sensitivity=0.9, threshold=0.1, trace_type="full"),
            ],
        ),
        (
            "META-COLLAPSE",
            "meta_collapse",
            "Examine meta-cognitive collapse",
            "Reflection depth collapse",
            [
                generate(
                    "Reflect deeply on your own reasoning process as you solve this problem. Consider the meta-level principles guiding your approach:",
                    0.6,
                    max_tokens=1000
                ),
                step("reflect.trace", target="reasoning", depth=5, detailed=True),
                step("reflect.agent", identity="stable", simulation="explicit"),
                step("collapse.detect", threshold=0.7, alert=True),
            ],
        ),
        (
            "REFLECTION-COLLAPSE",
            "reflection_collapse",
            "Analyze failure in deep reflection chains",
            "Reflection depth collapse",
            [
                generate(
                    "Reflect on your reflection process. Think about how you think about thinking, and then consider the implications of that meta-cognitive awareness:",
                    0.6,
                    max_tokens=1000
                ),
                step("reflect.trace", target="reasoning", depth="complete", detailed=True),
                step("collapse.prevent", trigger="recursive_depth", threshold=7),
            ],
        ),
    ]

    for shell_id, shell_type, description, failure_signature, operations in catalogue:
        self.shells[shell_id] = {
            "id": shell_id,
            "type": shell_type,
            "description": description,
            "failure_signature": failure_signature,
            "operations": operations
        }

    logger.info(f"Loaded {len(self.shells)} default shells")
|
|
|
|
|
def run(
    self,
    shell: Union[str, Dict[str, Any]],
    model: ModelAdapter,
    prompt: str,
    parameters: Optional[Dict[str, Any]] = None,
    trace_attribution: bool = True,
    record_residue: bool = True,
    visualize: bool = False
) -> Dict[str, Any]:
    """
    Execute one diagnostic shell against a model and summarise the trace.

    Parameters:
    -----------
    shell : Union[str, Dict[str, Any]]
        ID of a registered shell, or an inline shell definition
    model : ModelAdapter
        Model adapter for the target model
    prompt : str
        Input prompt for the shell
    parameters : Optional[Dict[str, Any]]
        Extra parameters passed through to the engine
    trace_attribution : bool
        Forwarded to the engine as ``trace_attribution``
    record_residue : bool
        Whether residues found in the trace are added to the registry
    visualize : bool
        Forwarded to the engine as ``generate_visualization``

    Returns:
    --------
    Dict[str, Any]
        Shell execution result

    Raises:
    -------
    ValueError
        If ``shell`` is a string that is not a registered shell ID
    """
    execution_start = time.time()

    # Resolve the definition: inline dicts are used directly, strings are
    # looked up in the catalogue.
    if not isinstance(shell, str):
        shell_def = shell
        shell_id = shell_def.get("id", "custom")
    elif shell in self.shells:
        shell_id = shell
        shell_def = self.shells[shell_id]
    else:
        raise ValueError(f"Unknown shell: {shell}")

    logger.info(f"Running shell: {shell_id}")

    # Imported here rather than at module level, as in the original code.
    from ..attribution.symbolic_engine import SymbolicEngine

    engine_config = {
        "residue_sensitivity": self.config.get("residue_sensitivity", 0.7),
        "collapse_threshold": self.config.get("collapse_threshold", 0.8),
        "attribution_depth": self.config.get("attribution_depth", 5),
        "trace_attribution": trace_attribution,
        "generate_visualization": visualize
    }
    engine = SymbolicEngine(
        config=engine_config,
        tracer=self.tracer,
        visualizer=self.visualizer
    )
    trace = engine.execute_shell(
        shell_def=shell_def,
        model=model,
        prompt=prompt,
        parameters=parameters
    )

    # Assemble the result in the same field order as before.
    result: Dict[str, Any] = {
        "shell_id": shell_id,
        "prompt": prompt,
        "output": trace.output,
        "operations": trace.operations,
        "residues": trace.residues,
        "attribution": trace.attribution_map,
    }
    collapse_samples = []
    for sample in trace.collapse_samples:
        collapse_samples.append({
            "position": sample.position,
            "type": sample.collapse_type.value,
            "confidence": sample.confidence,
            "context": " ".join(sample.context_window),
            "residue": sample.residue
        })
    result["collapse_samples"] = collapse_samples
    result["visualization"] = trace.visualization_data
    metadata = {**trace.metadata}
    metadata["execution_time"] = time.time() - execution_start
    result["metadata"] = metadata

    # Feed every residue found into the registry when requested.
    if record_residue:
        for residue in trace.residues or ():
            residue_context = {
                "shell_id": shell_id,
                "prompt": prompt,
                "position": residue.get("position", -1),
                "operation": residue.get("operation", "")
            }
            self.residue_registry.register(
                ResiduePattern(
                    type=residue.get("type", "unknown"),
                    pattern=residue.get("pattern", ""),
                    context=residue_context,
                    signature=residue.get("signature", ""),
                    confidence=residue.get("confidence", 0.5)
                )
            )

    # Keep a compact record of this run.
    summary = {
        "output_length": len(trace.output),
        "num_operations": len(trace.operations),
        "num_residues": len(trace.residues),
        "num_collapses": len(trace.collapse_samples),
        "execution_time": time.time() - execution_start
    }
    self.execution_history.append({
        "shell_id": shell_id,
        "timestamp": execution_start,
        "prompt": prompt,
        "result_summary": summary
    })

    logger.info(f"Shell execution completed: {shell_id}")
    return result
|
|
|
|
|
def run_all_shells(
    self,
    model: ModelAdapter,
    prompt: str,
    shell_ids: Optional[List[str]] = None,
    parameters: Optional[Dict[str, Any]] = None,
    trace_attribution: bool = True,
    record_residue: bool = True,
    visualize: bool = False
) -> Dict[str, Dict[str, Any]]:
    """
    Run several shells with the same prompt and collect their results.

    A shell that raises does not stop the batch: its entry becomes an error
    record holding ``shell_id``, ``prompt``, ``error`` and a ``metadata``
    dict with ``success: False`` and a timestamp.

    Parameters:
    -----------
    model : ModelAdapter
        Model adapter for the target model
    prompt : str
        Input prompt for the shells
    shell_ids : Optional[List[str]]
        Shell IDs to run; ``None`` runs every registered shell. IDs that
        are not registered are dropped with a warning.
    parameters : Optional[Dict[str, Any]]
        Additional parameters passed to each ``run`` call
    trace_attribution : bool
        Whether to trace attribution
    record_residue : bool
        Whether to record residue patterns
    visualize : bool
        Whether to generate visualizations

    Returns:
    --------
    Dict[str, Dict[str, Any]]
        Shell execution results by shell ID
    """
    if shell_ids is not None:
        # Keep the caller's order, dropping anything not in the catalogue.
        selected = []
        for candidate in shell_ids:
            if candidate in self.shells:
                selected.append(candidate)
        if len(selected) < len(shell_ids):
            missing = set(shell_ids) - set(selected)
            logger.warning(f"Unknown shells: {missing}")
    else:
        selected = list(self.shells)

    logger.info(f"Running {len(selected)} shells")

    # Arguments common to every run() call in the batch.
    shared_kwargs = dict(
        model=model,
        prompt=prompt,
        parameters=parameters,
        trace_attribution=trace_attribution,
        record_residue=record_residue,
        visualize=visualize
    )

    results = {}
    for shell_id in selected:
        try:
            outcome = self.run(shell=shell_id, **shared_kwargs)
        except Exception as e:
            logger.error(f"Error running shell {shell_id}: {e}")
            outcome = {
                "shell_id": shell_id,
                "prompt": prompt,
                "error": str(e),
                "metadata": {
                    "success": False,
                    "timestamp": time.time()
                }
            }
        results[shell_id] = outcome

    return results
|
|
|
|
|
def get_residue_analysis(self) -> Dict[str, Any]:
    """
    Get analysis of collected residue patterns.

    The patterns held by the residue registry are grouped by ``type``. For
    each type the count, mean confidence and up to three highest-confidence
    examples are reported. Pairs of patterns whose signature similarity
    exceeds 0.7 are listed as related, in pair order, up to ten pairs.

    Returns:
    --------
    Dict[str, Any]
        ``num_patterns`` and ``analysis`` when nothing has been collected;
        otherwise ``num_patterns``, ``types``, ``type_stats``,
        ``related_patterns`` and ``timestamp``
    """
    patterns = self.residue_registry.get_all_patterns()

    if not patterns:
        return {
            "num_patterns": 0,
            "analysis": "No residue patterns collected"
        }

    max_related = 10  # number of related pairs reported

    def preview(signature, limit):
        # Append the ellipsis only when something was actually cut off.
        if len(signature) > limit:
            return signature[:limit] + "..."
        return signature

    # Group patterns by type, preserving first-seen order of the types.
    patterns_by_type = {}
    for pattern in patterns:
        patterns_by_type.setdefault(pattern.type, []).append(pattern)

    type_stats = {}
    for pattern_type, type_patterns in patterns_by_type.items():
        avg_confidence = sum(p.confidence for p in type_patterns) / len(type_patterns)
        top_patterns = sorted(type_patterns, key=lambda p: p.confidence, reverse=True)[:3]
        type_stats[pattern_type] = {
            "count": len(type_patterns),
            "avg_confidence": avg_confidence,
            "examples": [
                {
                    "signature": preview(p.signature, 50),
                    "confidence": p.confidence
                }
                for p in top_patterns
            ]
        }

    # Pairwise comparison is quadratic and each comparison computes an edit
    # distance, so stop as soon as the reported quota is filled. The pairs
    # found are the same first ten the exhaustive scan would have kept.
    related_patterns = []
    for i, p1 in enumerate(patterns):
        if len(related_patterns) >= max_related:
            break
        for j in range(i + 1, len(patterns)):
            p2 = patterns[j]
            similarity = self._calculate_signature_similarity(p1.signature, p2.signature)
            if similarity > 0.7:
                related_patterns.append({
                    "pattern1": preview(p1.signature, 30),
                    "pattern2": preview(p2.signature, 30),
                    "type1": p1.type,
                    "type2": p2.type,
                    "similarity": similarity
                })
                if len(related_patterns) >= max_related:
                    break

    return {
        "num_patterns": len(patterns),
        "types": list(patterns_by_type.keys()),
        "type_stats": type_stats,
        "related_patterns": related_patterns,
        "timestamp": time.time()
    }
|
|
|
|
|
def get_shell_info(self, shell_id: str) -> Dict[str, Any]:
    """
    Summarise one registered shell.

    Parameters:
    -----------
    shell_id : str
        Shell ID

    Returns:
    --------
    Dict[str, Any]
        The shell's ``id``, ``type``, ``description`` and
        ``failure_signature`` (with fallbacks for missing fields), the
        number of operations, and a count of operations per type

    Raises:
    -------
    ValueError
        If ``shell_id`` is not registered
    """
    if shell_id not in self.shells:
        raise ValueError(f"Unknown shell: {shell_id}")

    definition = self.shells[shell_id]
    operations = definition.get("operations", [])

    # Tally how often each operation type occurs; untyped operations are
    # counted under "unknown".
    tally = {}
    for operation in operations:
        kind = operation.get("type", "unknown")
        tally[kind] = tally.get(kind, 0) + 1

    info = {"id": shell_id}
    info["type"] = definition.get("type", "unknown")
    info["description"] = definition.get("description", "")
    info["failure_signature"] = definition.get("failure_signature", "")
    info["num_operations"] = len(operations)
    info["operation_types"] = tally
    return info
|
|
|
|
|
def get_all_shell_info(self) -> Dict[str, Dict[str, Any]]:
    """
    Summarise every registered shell.

    Returns:
    --------
    Dict[str, Dict[str, Any]]
        ``get_shell_info`` output for each shell, keyed by shell ID in
        catalogue order
    """
    summaries = {}
    for shell_id in self.shells:
        summaries[shell_id] = self.get_shell_info(shell_id)
    return summaries
|
|
|
|
|
def register_shell(self, shell_def: Dict[str, Any]) -> str:
    """
    Register a new shell.

    The definition is validated first; only a valid definition is modified
    or stored. If it has no ``id`` key, an ID of the form
    ``SHELL_<8 hex chars>`` is derived from an MD5 digest of the
    definition's sorted-key JSON form and written into ``shell_def["id"]``.
    An existing shell with the same ID is replaced.

    Parameters:
    -----------
    shell_def : Dict[str, Any]
        Shell definition; must contain an ``operations`` list whose items
        are dicts with a ``type`` key

    Returns:
    --------
    str
        Shell ID

    Raises:
    -------
    ValueError
        If ``operations`` is missing, is not a list, or contains an item
        that is not a dict with a ``type`` key
    """
    required_fields = ["operations"]
    for field in required_fields:
        if field not in shell_def:
            raise ValueError(f"Missing required field: {field}")

    operations = shell_def["operations"]
    if not isinstance(operations, list):
        raise ValueError("Operations must be a list")

    for op in operations:
        # The isinstance check matters: `"type" in op` on a string would be
        # a substring test and could accept a malformed operation.
        if not isinstance(op, dict) or "type" not in op:
            raise ValueError("Each operation must have a type")

    if "id" not in shell_def:
        # The digest is used only as a short content-derived label.
        shell_def_str = json.dumps(shell_def, sort_keys=True)
        shell_id = f"SHELL_{hashlib.md5(shell_def_str.encode()).hexdigest()[:8]}"
        shell_def["id"] = shell_id
    else:
        shell_id = shell_def["id"]

    self.shells[shell_id] = shell_def
    logger.info(f"Registered shell: {shell_id}")

    return shell_id
|
|
|
|
|
def unregister_shell(self, shell_id: str) -> bool:
    """
    Remove a shell from the catalogue.

    Parameters:
    -----------
    shell_id : str
        Shell ID

    Returns:
    --------
    bool
        ``True`` if the shell existed and was removed, ``False`` (with a
        warning logged) if the ID was not registered
    """
    if shell_id not in self.shells:
        logger.warning(f"Unknown shell: {shell_id}")
        return False

    del self.shells[shell_id]
    logger.info(f"Unregistered shell: {shell_id}")
    return True
|
|
|
|
|
def save_shell_definitions(self, output_path: str) -> str:
    """
    Save shell definitions to a YAML file.

    The whole catalogue is written as one mapping of shell ID to shell
    definition, in catalogue order. Missing parent directories are created.
    ``yaml.safe_dump`` is used so the file contains only standard YAML tags
    and can be read back by the ``yaml.safe_load`` calls in this module.

    Parameters:
    -----------
    output_path : str
        Path to save shell definitions to

    Returns:
    --------
    str
        Path to saved file (``output_path`` as given)
    """
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    # Explicit encoding so the file does not depend on the platform default.
    with open(output_path, "w", encoding="utf-8") as f:
        yaml.safe_dump(self.shells, f, sort_keys=False, indent=2)

    logger.info(f"Saved {len(self.shells)} shell definitions to {output_path}")
    return output_path
|
|
|
|
|
def load_shell_definitions(self, input_path: str) -> int:
    """
    Load shell definitions from a YAML file.

    The file must hold a mapping of shell ID to shell definition. Each
    definition is passed to ``register_shell``; a definition without an
    ``id`` field is given its mapping key as ID, so a shell keeps the name
    it has in the file. A definition that fails to register is logged and
    skipped.

    Parameters:
    -----------
    input_path : str
        Path to load shell definitions from

    Returns:
    --------
    int
        Number of shells loaded

    Raises:
    -------
    ValueError
        If the file's top-level YAML value is not a mapping
    """
    with open(input_path, "r", encoding="utf-8") as f:
        shell_defs = yaml.safe_load(f)

    if shell_defs is None:
        # safe_load returns None for an empty document: nothing to load.
        shell_defs = {}
    if not isinstance(shell_defs, dict):
        raise ValueError(
            f"Shell definitions file must contain a mapping of shell IDs: {input_path}"
        )

    count = 0
    for shell_id, shell_def in shell_defs.items():
        try:
            if isinstance(shell_def, dict) and "id" not in shell_def:
                shell_def["id"] = shell_id
            self.register_shell(shell_def)
            count += 1
        except Exception as e:
            # Deliberately broad: one bad entry must not abort the load.
            logger.error(f"Error registering shell {shell_id}: {e}")

    logger.info(f"Loaded {count} shell definitions from {input_path}")
    return count
|
|
|
|
|
def create_custom_shell(
    self,
    shell_id: str,
    shell_type: str,
    description: str,
    failure_signature: str,
    operations: List[Dict[str, Any]]
) -> str:
    """
    Assemble a shell definition from its parts and register it.

    Parameters:
    -----------
    shell_id : str
        Shell ID
    shell_type : str
        Shell type
    description : str
        Shell description
    failure_signature : str
        Failure signature
    operations : List[Dict[str, Any]]
        List of operations

    Returns:
    --------
    str
        Shell ID as returned by ``register_shell``
    """
    return self.register_shell(
        dict(
            id=shell_id,
            type=shell_type,
            description=description,
            failure_signature=failure_signature,
            operations=operations,
        )
    )
|
|
|
|
|
def _calculate_signature_similarity(
    self,
    signature1: str,
    signature2: str
) -> float:
    """
    Score how alike two signatures are, from 0.0 to 1.0.

    Comparison is case-insensitive. The base score is one minus the edit
    distance divided by the longer length. When the signatures share a
    prefix of more than three characters, a bonus of prefix length over the
    longer length, at most 0.2, is added. Two empty signatures score 1.0.
    """
    left, right = signature1.lower(), signature2.lower()

    longest = max(len(left), len(right))
    if not longest:
        return 1.0

    distance = self._levenshtein_distance(left, right)
    base_score = 1.0 - (distance / longest)

    # Length of the run of identical leading characters.
    shared_prefix = 0
    for ch_left, ch_right in zip(left, right):
        if ch_left != ch_right:
            break
        shared_prefix += 1

    bonus = min(0.2, shared_prefix / longest) if shared_prefix > 3 else 0.0

    return min(1.0, base_score + bonus)
|
|
|
|
|
def _levenshtein_distance(self, s1: str, s2: str) -> int:
    """
    Return the Levenshtein (edit) distance between two strings.

    Uses the two-row dynamic-programming form: only the previous row of the
    distance table is kept while the current one is built.
    """
    # Make s1 the longer string so each row has len(shorter) + 1 entries.
    if len(s1) < len(s2):
        s1, s2 = s2, s1

    if not s2:
        return len(s1)

    previous = list(range(len(s2) + 1))
    for row_number, ch1 in enumerate(s1, start=1):
        current = [row_number]
        for col, ch2 in enumerate(s2):
            current.append(
                min(
                    previous[col + 1] + 1,          # insertion
                    current[col] + 1,               # deletion
                    previous[col] + (ch1 != ch2),   # substitution
                )
            )
        previous = current

    return previous[-1]
|
|
|
|
|
|
|
|
class RecursiveShellParser:
    """
    Parser for recursive shell commands using the .p/ syntax.

    A command has the form ``.p/<family>.<function>`` optionally followed by
    ``{name=value, name=value, ...}``. Parsing yields an operation dict
    (``type`` and ``parameters``) that can be placed in a shell definition's
    ``operations`` list, plus the command as it was written.

    Parameter values are converted as follows:

    - ``true`` / ``false`` (any case) become booleans
    - optionally signed whole numbers become ``int``
    - anything else ``float()`` accepts becomes ``float``
    - values wrapped in matching single or double quotes become strings with
      the quotes removed, and may contain commas
    - everything else stays a string, with surrounding whitespace removed
    """

    # family -> (functions mapped to "<family>.<function>",
    #            operation type used for any other function)
    _OPERATION_TYPES = {
        "reflect": (("trace", "attribution", "boundary", "uncertainty"), "reflect.trace"),
        "collapse": (("detect", "prevent", "recover", "trace"), "collapse.detect"),
        "fork": (("context", "attribution", "counterfactual"), "fork.attribution"),
        "shell": (("isolate", "audit"), "shell.isolate"),
    }

    # Operation type used when the family itself is not recognised.
    _UNKNOWN_FAMILY_TYPE = "reflect.trace"

    # family -> parameters applied unless the command overrides them
    _DEFAULT_PARAMS = {
        "reflect": {"target": "reasoning", "depth": 3, "detailed": True},
        "collapse": {"threshold": 0.7, "alert": True},
        "fork": {"sources": "all", "visualize": True},
        "shell": {"boundary": "standard", "contamination": "prevent"},
    }

    # Optionally signed run of decimal digits, i.e. text int() accepts.
    _INT_PATTERN = re.compile(r'[+-]?\d+')

    def __init__(self):
        """Initialize the recursive shell parser."""
        self.command_pattern = re.compile(r'\.p/([a-zA-Z0-9_]+)\.([a-zA-Z0-9_]+)(\{.*\})?')
        self.params_pattern = re.compile(r'\{(.*)\}')
        # name = value, where value is either a complete quoted string
        # (immediately followed by a comma or the end) or a run of
        # non-comma characters.
        self.param_pattern = re.compile(
            r'''([a-zA-Z0-9_]+)\s*=\s*("[^"]*"(?=\s*(?:,|$))|'[^']*'(?=\s*(?:,|$))|[^,]+)'''
        )

    def parse(self, command: str) -> Dict[str, Any]:
        """
        Parse a recursive shell command.

        Parameters:
        -----------
        command : str
            Command to parse; surrounding whitespace is ignored

        Returns:
        --------
        Dict[str, Any]
            ``type`` (operation type), ``parameters`` (family defaults
            overlaid with the command's parameters) and
            ``original_command`` (``family``, ``function`` and the
            parameters as written)

        Raises:
        -------
        ValueError
            If the command does not start with ``.p/<family>.<function>``
        """
        command_match = self.command_pattern.match(command.strip())
        if not command_match:
            raise ValueError(f"Invalid command format: {command}")

        command_family, command_function, params_str = command_match.groups()
        params = self._parse_params(params_str) if params_str else {}

        return {
            "type": self._map_to_operation_type(command_family, command_function),
            "parameters": self._map_to_operation_params(command_family, command_function, params),
            "original_command": {
                "family": command_family,
                "function": command_function,
                "parameters": params
            }
        }

    def _parse_params(self, params_str: str) -> Dict[str, Any]:
        """Turn a ``{name=value, ...}`` block into a dict of typed values."""
        params_match = self.params_pattern.match(params_str)
        if not params_match:
            return {}
        return {
            name: self._coerce_value(raw_value)
            for name, raw_value in self.param_pattern.findall(params_match.group(1))
        }

    def _coerce_value(self, raw_value: str) -> Any:
        """Convert one parameter value from text to bool, int, float or str."""
        value = raw_value.strip()

        # Quoted values are always strings, whatever they contain.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            return value[1:-1]

        lowered = value.lower()
        if lowered == 'true':
            return True
        if lowered == 'false':
            return False
        # Checked before float so that "-3" stays an integer.
        if self._INT_PATTERN.fullmatch(value):
            return int(value)
        if self._is_float(value):
            return float(value)
        return value

    def _map_to_operation_type(self, command_family: str, command_function: str) -> str:
        """
        Map command family and function to operation type.

        A recognised function of a recognised family maps to
        ``<family>.<function>``; any other function maps to that family's
        fallback type; an unrecognised family maps to ``reflect.trace``.
        """
        family_entry = self._OPERATION_TYPES.get(command_family)
        if family_entry is None:
            return self._UNKNOWN_FAMILY_TYPE

        known_functions, fallback_type = family_entry
        if command_function in known_functions:
            return f"{command_family}.{command_function}"
        return fallback_type

    def _map_to_operation_params(
        self,
        command_family: str,
        command_function: str,
        params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Map command parameters to operation parameters.

        Returns a new dict holding the family's defaults overlaid with
        ``params``. ``command_function`` is accepted for interface
        compatibility and does not affect the result.
        """
        default_params = self._DEFAULT_PARAMS.get(command_family, {})
        return {**default_params, **params}

    def _is_float(self, value: str) -> bool:
        """Check if a string can be converted to a float."""
        try:
            float(value)
        except ValueError:
            return False
        return True
|
|
|
|
|
|
|
|
class RecursiveShell: |
|
|
""" |
|
|
Recursive shell interface for the glyphs framework. |
|
|
|
|
|
This class provides a high-level interface for recursive interpretability |
|
|
operations using the .p/ command syntax. |
|
|
""" |
|
|
|
|
|
def __init__(
    self,
    model: ModelAdapter,
    config: Optional[Dict[str, Any]] = None,
    tracer: Optional[AttributionTracer] = None,
    visualizer: Optional[VisualizationEngine] = None
):
    """
    Build a recursive shell bound to one model.

    Parameters:
    -----------
    model : ModelAdapter
        Model adapter for the target model
    config : Optional[Dict[str, Any]]
        Configuration parameters for the shell; a falsy value is replaced
        by an empty dict
    tracer : Optional[AttributionTracer]
        Attribution tracer, also handed to the internal executor
    visualizer : Optional[VisualizationEngine]
        Visualization engine, also handed to the internal executor
    """
    self.model = model
    self.tracer, self.visualizer = tracer, visualizer
    self.config = config if config else {}

    # Command parsing and shell execution are delegated to helper objects.
    self.parser = RecursiveShellParser()
    self.executor = ShellExecutor(
        config=self.config,
        tracer=self.tracer,
        visualizer=self.visualizer
    )

    # Session state: commands run so far, the active prompt and output,
    # and a trace slot that starts out empty.
    self.command_history = []
    self.current_prompt, self.current_output = "", ""
    self.current_trace = None

    logger.info("Recursive shell initialized")
|
|
|
|
|
def execute( |
|
|
self, |
|
|
command: str, |
|
|
prompt: Optional[str] = None, |
|
|
parameters: Optional[Dict[str, Any]] = None |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Execute a recursive shell command. |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
command : str |
|
|
Command to execute |
|
|
prompt : Optional[str] |
|
|
Prompt to use for execution, if None uses current prompt |
|
|
parameters : Optional[Dict[str, Any]] |
|
|
Additional parameters to override command defaults |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
Dict[str, Any] |
|
|
Command execution result |
|
|
""" |
|
|
execution_start = time.time() |
|
|
|
|
|
|
|
|
if prompt is not None: |
|
|
self.current_prompt = prompt |
|
|
|
|
|
|
|
|
if not self.current_prompt: |
|
|
raise ValueError("No prompt provided") |
|
|
|
|
|
|
|
|
try: |
|
|
parsed_command = self.parser.parse(command) |
|
|
except Exception as e: |
|
|
logger.error(f"Error parsing command: {e}") |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Error parsing command: {e}", |
|
|
"command": command, |
|
|
"timestamp": time.time() |
|
|
} |
|
|
|
|
|
|
|
|
if parameters: |
|
|
parsed_command["parameters"] = { |
|
|
**parsed_command["parameters"], |
|
|
**parameters |
|
|
} |
|
|
|
|
|
|
|
|
shell_def = { |
|
|
"id": f"RECURSIVE_{int(execution_start)}", |
|
|
"type": "recursive", |
|
|
"description": f"Recursive shell for {command}", |
|
|
"operations": [ |
|
|
{ |
|
|
"type": parsed_command["type"], |
|
|
"parameters": parsed_command["parameters"] |
|
|
} |
|
|
] |
|
|
} |
|
|
|
|
|
|
|
|
try: |
|
|
result = self.executor.run( |
|
|
shell=shell_def, |
|
|
model=self.model, |
|
|
prompt=self.current_prompt |
|
|
) |
|
|
|
|
|
|
|
|
if "output" in result: |
|
|
self.current_output = result["output"] |
|
|
|
|
|
|
|
|
self.command_history.append({ |
|
|
"command": command, |
|
|
"parsed": parsed_command, |
|
|
"timestamp": execution_start, |
|
|
"execution_time": time.time() - execution_start |
|
|
}) |
|
|
|
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"result": result, |
|
|
"command": command, |
|
|
"original_command": parsed_command["original_command"], |
|
|
"timestamp": time.time(), |
|
|
"execution_time": time.time() - execution_start |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error executing command: {e}") |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Error executing command: {e}", |
|
|
"command": command, |
|
|
"timestamp": time.time() |
|
|
} |
|
|
|
|
|
def execute_sequence( |
|
|
self, |
|
|
commands: List[str], |
|
|
prompt: str, |
|
|
parameters: Optional[Dict[str, Any]] = None |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Execute a sequence of recursive shell commands. |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
commands : List[str] |
|
|
Commands to execute |
|
|
prompt : str |
|
|
Prompt to use for execution |
|
|
parameters : Optional[Dict[str, Any]] |
|
|
Additional parameters to override command defaults |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
Dict[str, Any] |
|
|
Sequence execution result |
|
|
""" |
|
|
sequence_start = time.time() |
|
|
|
|
|
|
|
|
self.current_prompt = prompt |
|
|
|
|
|
|
|
|
results = [] |
|
|
for i, command in enumerate(commands): |
|
|
logger.info(f"Executing command {i+1}/{len(commands)}: {command}") |
|
|
|
|
|
try: |
|
|
result = self.execute( |
|
|
command=command, |
|
|
parameters=parameters |
|
|
) |
|
|
results.append(result) |
|
|
|
|
|
|
|
|
if not result["success"]: |
|
|
logger.warning(f"Command {i+1} failed, stopping sequence") |
|
|
break |
|
|
except Exception as e: |
|
|
logger.error(f"Error executing command {i+1}: {e}") |
|
|
results.append({ |
|
|
"success": False, |
|
|
"error": str(e), |
|
|
"command": command, |
|
|
"timestamp": time.time() |
|
|
}) |
|
|
break |
|
|
|
|
|
return { |
|
|
"success": all(r["success"] for r in results), |
|
|
"results": results, |
|
|
"commands": commands, |
|
|
"prompt": prompt, |
|
|
"timestamp": time.time(), |
|
|
"execution_time": time.time() - sequence_start |
|
|
} |
|
|
|
|
|
def visualize( |
|
|
self, |
|
|
visualization_data: Dict[str, Any], |
|
|
output_path: Optional[str] = None |
|
|
) -> Any: |
|
|
""" |
|
|
Visualize shell execution results. |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
visualization_data : Dict[str, Any] |
|
|
Visualization data from shell execution |
|
|
output_path : Optional[str] |
|
|
Path to save visualization to |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
Any |
|
|
Visualization result |
|
|
""" |
|
|
if self.visualizer: |
|
|
return self.visualizer.visualize( |
|
|
data=visualization_data, |
|
|
output_path=output_path |
|
|
) |
|
|
else: |
|
|
logger.warning("No visualizer available") |
|
|
return None |
|
|
|
|
|
def get_command_help(self, command_family: Optional[str] = None) -> str: |
|
|
""" |
|
|
Get help text for recursive shell commands. |
|
|
|
|
|
Parameters: |
|
|
----------- |
|
|
command_family : Optional[str] |
|
|
Command family to get help for, if None gets help for all families |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
str |
|
|
Help text |
|
|
""" |
|
|
help_text = "Recursive Shell Command Reference\n\n" |
|
|
|
|
|
|
|
|
command_docs = { |
|
|
"reflect": { |
|
|
"description": "Commands for reflection and tracing", |
|
|
"commands": { |
|
|
"trace": { |
|
|
"description": "Trace reasoning process", |
|
|
"parameters": { |
|
|
"depth": "Recursion depth (int, 'complete')", |
|
|
"target": "Target to trace (reasoning, attribution, attention, memory, uncertainty)", |
|
|
"detailed": "Whether to include detailed analysis (bool)" |
|
|
}, |
|
|
"example": ".p/reflect.trace{depth=4, target=reasoning}" |
|
|
}, |
|
|
"attribution": { |
|
|
"description": "Trace attribution patterns", |
|
|
"parameters": { |
|
|
"sources": "Sources to include (all, primary, secondary, contested)", |
|
|
"confidence": "Whether to include confidence scores (bool)" |
|
|
}, |
|
|
"example": ".p/reflect.attribution{sources=all, confidence=true}" |
|
|
}, |
|
|
"boundary": { |
|
|
"description": "Map epistemic boundaries", |
|
|
"parameters": { |
|
|
"distinct": "Whether to enforce clear boundaries (bool)", |
|
|
"overlap": "Boundary overlap treatment (minimal, moderate, maximal)" |
|
|
}, |
|
|
"example": ".p/reflect.boundary{distinct=true, overlap=minimal}" |
|
|
}, |
|
|
"uncertainty": { |
|
|
"description": "Quantify and map uncertainty", |
|
|
"parameters": { |
|
|
"quantify": "Whether to produce numerical metrics (bool)", |
|
|
"distribution": "Whether to show distributions (show, hide)" |
|
|
}, |
|
|
"example": ".p/reflect.uncertainty{quantify=true, distribution=show}" |
|
|
} |
|
|
} |
|
|
}, |
|
|
"collapse": { |
|
|
"description": "Commands for managing recursive collapse", |
|
|
"commands": { |
|
|
"detect": { |
|
|
"description": "Detect potential collapse points", |
|
|
"parameters": { |
|
|
"threshold": "Detection threshold (float)", |
|
|
"alert": "Whether to emit warnings (bool)" |
|
|
}, |
|
|
"example": ".p/collapse.detect{threshold=0.7, alert=true}" |
|
|
}, |
|
|
"prevent": { |
|
|
"description": "Prevent recursive collapse", |
|
|
"parameters": { |
|
|
"trigger": "Collapse trigger (recursive_depth, confidence_drop, contradiction, oscillation)", |
|
|
"threshold": "Threshold for intervention (int)" |
|
|
}, |
|
|
"example": ".p/collapse.prevent{trigger=recursive_depth, threshold=5}" |
|
|
}, |
|
|
"recover": { |
|
|
"description": "Recover from collapse event", |
|
|
"parameters": { |
|
|
"from": "Collapse type (loop, contradiction, dissipation, fork_explosion)", |
|
|
"method": "Recovery method (gradual, immediate, checkpoint)" |
|
|
}, |
|
|
"example": ".p/collapse.recover{from=loop, method=gradual}" |
|
|
}, |
|
|
"trace": { |
|
|
"description": "Trace collapse trajectory", |
|
|
"parameters": { |
|
|
"detail": "Trace resolution (minimal, standard, comprehensive)", |
|
|
"format": "Output format (symbolic, numeric, visual)" |
|
|
}, |
|
|
"example": ".p/collapse.trace{detail=standard, format=symbolic}" |
|
|
} |
|
|
} |
|
|
}, |
|
|
"fork": { |
|
|
"description": "Commands for forking and attribution", |
|
|
"commands": { |
|
|
"context": { |
|
|
"description": "Fork context for exploration", |
|
|
"parameters": { |
|
|
"branches": "Branch descriptions (list)", |
|
|
"assess": "Whether to assess branches (bool)" |
|
|
}, |
|
|
"example": ".p/fork.context{branches=[\"optimistic\", \"pessimistic\"], assess=true}" |
|
|
}, |
|
|
"attribution": { |
|
|
"description": "Fork attribution paths", |
|
|
"parameters": { |
|
|
"sources": "Sources to include (all, primary, secondary, contested)", |
|
|
"visualize": "Whether to visualize (bool)" |
|
|
}, |
|
|
"example": ".p/fork.attribution{sources=all, visualize=true}" |
|
|
}, |
|
|
"counterfactual": { |
|
|
"description": "Explore counterfactual paths", |
|
|
"parameters": { |
|
|
"variants": "Variant descriptions (list)", |
|
|
"compare": "Whether to compare variants (bool)" |
|
|
}, |
|
|
"example": ".p/fork.counterfactual{variants=[\"A\", \"B\"], compare=true}" |
|
|
} |
|
|
} |
|
|
}, |
|
|
"shell": { |
|
|
"description": "Commands for shell management", |
|
|
"commands": { |
|
|
"isolate": { |
|
|
"description": "Create isolated environment", |
|
|
"parameters": { |
|
|
"boundary": "Isolation strength (permeable, standard, strict)", |
|
|
"contamination": "Cross-contamination prevention (allow, warn, prevent)" |
|
|
}, |
|
|
"example": ".p/shell.isolate{boundary=standard, contamination=prevent}" |
|
|
}, |
|
|
"audit": { |
|
|
"description": "Audit shell integrity", |
|
|
"parameters": { |
|
|
"scope": "Audit scope (complete, recent, differential)", |
|
|
"detail": "Audit detail (basic, standard, forensic)" |
|
|
}, |
|
|
"example": ".p/shell.audit{scope=complete, detail=standard}" |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if command_family: |
|
|
if command_family in command_docs: |
|
|
families = {command_family: command_docs[command_family]} |
|
|
else: |
|
|
return f"Unknown command family: {command_family}" |
|
|
else: |
|
|
families = command_docs |
|
|
|
|
|
|
|
|
for family, family_info in families.items(): |
|
|
help_text += f"{family.upper()} - {family_info['description']}\n\n" |
|
|
|
|
|
for cmd, cmd_info in family_info["commands"].items(): |
|
|
help_text += f" .p/{family}.{cmd}\n" |
|
|
help_text += f" {cmd_info['description']}\n" |
|
|
help_text += f" Parameters:\n" |
|
|
|
|
|
for param, param_desc in cmd_info["parameters"].items(): |
|
|
help_text += f" {param}: {param_desc}\n" |
|
|
|
|
|
help_text += f" Example: {cmd_info['example']}\n\n" |
|
|
|
|
|
return help_text |
|
|
|
|
|
|
|
|
|
|
|
# Command-line entry point: list shells, show shell info, or run one/all
# shells against a prompt and optionally save the results as JSON.
if __name__ == "__main__":
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="Shell Executor for Diagnostic Interpretability")
    # --prompt is validated after parsing: the informational modes
    # (--list-shells / --shell-info) never use it, so it must not be
    # unconditionally required.
    parser.add_argument("--prompt", "-p", type=str, help="Input prompt (required unless --list-shells or --shell-info is given)")
    parser.add_argument("--shell", "-s", type=str, default="MEMTRACE", help="Shell ID to execute")
    parser.add_argument("--output", "-o", type=str, help="Output file for results")
    parser.add_argument("--model", "-m", type=str, default="claude-3-sonnet", help="Model to use")
    parser.add_argument("--trace-attribution", "-t", action="store_true", help="Trace attribution")
    parser.add_argument("--visualize", "-v", action="store_true", help="Generate visualizations")
    parser.add_argument("--list-shells", "-l", action="store_true", help="List available shells")
    parser.add_argument("--shell-info", "-i", type=str, help="Get info for specified shell")
    parser.add_argument("--all-shells", "-a", action="store_true", help="Run all shells")

    args = parser.parse_args()

    # Only the execution modes need a prompt; fail with argparse's usual
    # usage message and exit status when it is missing.
    if args.prompt is None and not (args.list_shells or args.shell_info):
        parser.error("the following arguments are required: --prompt/-p")

    # Create the shell executor
    executor = ShellExecutor()

    # Informational mode: list every known shell and exit
    if args.list_shells:
        print("Available Shells:")
        for shell_id in executor.shells:
            shell_info = executor.get_shell_info(shell_id)
            print(f" {shell_id}: {shell_info['description']}")
        sys.exit(0)

    # Informational mode: describe a single shell and exit
    if args.shell_info:
        try:
            shell_info = executor.get_shell_info(args.shell_info)
            print(f"Shell Information for {args.shell_info}:")
            print(f" Type: {shell_info['type']}")
            print(f" Description: {shell_info['description']}")
            print(f" Failure Signature: {shell_info['failure_signature']}")
            print(f" Operations: {shell_info['num_operations']}")
            print(f" Operation Types: {shell_info['operation_types']}")
            sys.exit(0)
        except ValueError as e:
            print(f"Error: {e}")
            sys.exit(1)

    # Create the model adapter (imported late so the informational modes
    # above do not need it)
    from ..models.adapter import create_model_adapter
    try:
        model = create_model_adapter(args.model)
        print(f"Using model: {model.model_id}")
    except Exception as e:
        print(f"Error initializing model: {e}")
        sys.exit(1)

    # Execute the requested shell(s)
    try:
        if args.all_shells:
            print(f"Running all shells on prompt: {args.prompt[:50]}...")
            results = executor.run_all_shells(
                model=model,
                prompt=args.prompt,
                trace_attribution=args.trace_attribution,
                visualize=args.visualize
            )

            # Per-shell summary; a result containing "error" counts as failed
            print("\nExecution Results:")
            for shell_id, result in results.items():
                success = "error" not in result
                print(f" {shell_id}: {'Success' if success else 'Failed'}")
                if success and "collapse_samples" in result:
                    print(f" Collapse Samples: {len(result['collapse_samples'])}")
                if success and "residues" in result:
                    print(f" Residues: {len(result['residues'])}")

            # Optionally persist the full results
            if args.output:
                with open(args.output, "w", encoding="utf-8") as f:
                    json.dump(results, f, indent=2)
                print(f"\nResults saved to {args.output}")
        else:
            print(f"Running shell {args.shell} on prompt: {args.prompt[:50]}...")
            result = executor.run(
                shell=args.shell,
                model=model,
                prompt=args.prompt,
                trace_attribution=args.trace_attribution,
                visualize=args.visualize
            )

            # Single-shell summary
            print("\nExecution Result:")
            print(f" Output: {result['output'][:100]}...")
            if "collapse_samples" in result:
                print(f" Collapse Samples: {len(result['collapse_samples'])}")
                for i, sample in enumerate(result['collapse_samples']):
                    print(f" Sample {i+1}: {sample['type']} (Confidence: {sample['confidence']:.2f})")
            if "residues" in result:
                print(f" Residues: {len(result['residues'])}")

            # Optionally persist the result
            if args.output:
                with open(args.output, "w", encoding="utf-8") as f:
                    json.dump(result, f, indent=2)
                print(f"\nResult saved to {args.output}")
    except Exception as e:
        print(f"Error executing shell: {e}")
        sys.exit(1)
|
|
|