Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ coverage.xml
# Code documentation for internal usage
doc_*.*
example_*.txt
echo cycle_log.jsonl
echo terminal.txt


# Build/dist
build/
Expand Down
625 changes: 558 additions & 67 deletions README.md

Large diffs are not rendered by default.

133 changes: 133 additions & 0 deletions cca8_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ class EnvObservation:
- raw_sensors : numeric/tensor-like channels (distances, images, IMU, ...)
- predicates : discrete tokens suitable for insertion into WorldGraph
- cues : tokens routed into Features/Columns
- nav_patches : processed local navigation map fragments (NavPatch v0; JSON-safe dicts)
- env_meta : lightweight metadata (episode id, uncertainties, etc.)

Note: these are *observations*, not beliefs. WorldGraph+Columns are where
Expand All @@ -294,6 +295,7 @@ class EnvObservation:
raw_sensors: Dict[str, Any] = field(default_factory=dict)
predicates: List[str] = field(default_factory=list)
cues: List[str] = field(default_factory=list)
nav_patches: List[Dict[str, Any]] = field(default_factory=list)
env_meta: Dict[str, Any] = field(default_factory=dict)


Expand Down Expand Up @@ -670,6 +672,13 @@ def observe(self, env_state: EnvState) -> EnvObservation:
cues: List[str] = []
meta: Dict[str, Any] = {}

# --- nav patches (v0 stub) ---
# We do NOT model raw pixels. Instead, we emit small, JSON-safe NavPatch dicts
# representing processed spatial structure (hazards, shelter region, mom region, etc.).
# These are designed to be stored as separate Column engrams later.
patches: List[Dict[str, Any]] = []


# --- raw channels ---
dx = env_state.mom_position[0] - env_state.kid_position[0]
dy = env_state.mom_position[1] - env_state.kid_position[1]
Expand Down Expand Up @@ -737,13 +746,137 @@ def observe(self, env_state: EnvState) -> EnvObservation:
meta["time_since_birth"] = env_state.time_since_birth
meta["scenario_stage"] = env_state.scenario_stage

# --- NavPatch stub stream ---
# These are NOT used for policy selection yet. They are Phase X scaffolding so the agent-side
# predictive matching loop can start generating top-K match traces, and so later WorkingMap code
# can attach patch_refs to entities.

#nav_patches = self._stub_nav_patches(env_state) #commented out since variable not used

# Scene-level patch (zone + position)
try:
patches.append({
"schema": "navpatch_v1",
"local_id": "p_scene",
"entity_id": "scene",
"role": "scene",
"frame": "ego_schematic_v1",
"extent": {"type": "aabb", "x0": -2.0, "y0": -2.0, "x1": 2.0, "y1": 2.0},
"tags": [
f"zone:{env_state.zone}",
f"position:{env_state.position}",
f"stage:{env_state.scenario_stage}",
],
"layers": {},
"obs": {"source": "PerceptionAdapter.observe"},
})
except Exception:
pass

# Entity-linked patches (hazard/shelter/mom)
def _add_simple_patch(local_id: str, entity_id: str, role: str, tags: list[str]) -> None:
patches.append({
"schema": "navpatch_v1",
"local_id": local_id,
"entity_id": entity_id,
"role": role,
"frame": "ego_schematic_v1",
"extent": {"type": "aabb", "x0": -1.0, "y0": -1.0, "x1": 1.0, "y1": 1.0},
"tags": list(tags),
"layers": {},
"obs": {"source": "PerceptionAdapter.observe"},
})

try:
_add_simple_patch("p_cliff", "cliff", "hazard", [f"hazard:cliff:{env_state.cliff_distance}"])
_add_simple_patch("p_shelter", "shelter", "shelter", [f"proximity:shelter:{env_state.shelter_distance}"])
_add_simple_patch("p_mom", "mom", "agent", [f"proximity:mom:{env_state.mom_distance}"])
except Exception:
pass

return EnvObservation(
raw_sensors=raw,
predicates=preds,
cues=cues,
nav_patches=patches,
env_meta=meta,
)

def _stub_nav_patches(self, env_state: EnvState) -> List[Dict[str, Any]]:
"""Return a minimal list of NavPatch-like dicts derived from EnvState.

Purpose
-------
The newborn-goat storyboard is not yet a full navigation environment. However, we still want
an *intermediate representation* that looks like the eventual NavPatch stream so we can:

1) test JSON-safe patch schemas end-to-end,
2) run a simple prototype matching loop against Column memory,
3) log top-K candidate matches for interpretability and future learning.

Design
------
- The dict schema is intentionally small and JSON-safe.
- We avoid any heavy geometry (images, point clouds) in this stub.
- The agent-side code computes patch signatures and matching; this function only emits patches.

Schema (v0)
-----------
Each patch is a dict with keys:

v : str, version label ("navpatch_v1")
id : str, short patch id within the observation (e.g., "p_zone")
kind : str, coarse patch family ("zone" | "hazard" | "affordance" | ...)
tags : list[str], human-readable tags (order is not significant)
geom : dict[str, Any], small structured features used for matching (JSON-safe)
meta : dict[str, Any], volatile tick info (NOT part of signature)

Returns
-------
list[dict[str, Any]]
Patch list for this observation tick.
"""
zone = getattr(env_state, "zone", None) or "unknown"
cliff = getattr(env_state, "cliff_distance", None) or "unknown"
shelter = getattr(env_state, "shelter_distance", None) or "unknown"

# The 'zone' patch is a coarse summary used for future route/context selection.
p_zone = {
"v": "navpatch_v1",
"id": "p_zone",
"kind": "zone",
"tags": [f"zone:{zone}"],
"geom": {
"zone": zone,
"position": getattr(env_state, "position", None) or "unknown",
"cliff_distance": cliff,
"shelter_distance": shelter,
},
"meta": {
"source": "PerceptionAdapter",
"step_index": int(getattr(env_state, "step_index", 0) or 0),
},
}

# The 'cliff hazard' patch is the first concrete hazard-like patch in the storyboard.
p_cliff = {
"v": "navpatch_v1",
"id": "p_cliff",
"kind": "hazard",
"tags": ["hazard:cliff", f"cliff:{cliff}", f"shelter:{shelter}"],
"geom": {
"cliff_distance": cliff,
"shelter_distance": shelter,
"zone": zone,
},
"meta": {
"source": "PerceptionAdapter",
"step_index": int(getattr(env_state, "step_index", 0) or 0),
},
}

return [p_zone, p_cliff]


# ---------------------------------------------------------------------------
# HybridEnvironment: orchestrator
Expand Down
Loading