Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions crackle_planning/crackle_planning/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor

# MOVE LATER
import numpy as np
import openai
from openai import OpenAI
from _keys import openai_key
os.environ["OPENAI_API_KEY"] = str(openai_key)
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

class PlannerAPI:
def __init__(self, use_ros: bool):
if use_ros:
Expand Down Expand Up @@ -44,6 +52,45 @@ def _shutdown_ros(self):
except Exception as e:
print(f"[PlannerAPI] ROS shutdown warning: {e}")

# def find(self, object_name : str, mapped_objects): # This function is to locate the named object by matching it with the most similar mapped object

# MOVE LATER
def embedding_from_string(text: str, model: str = "text-embedding-3-small") -> np.ndarray:
"""Get an embedding as a NumPy vector."""
resp = client.embeddings.create(model=model, input=text)
# print("str: ", text, " | embedding: ", np.array(resp.data[0].embedding, dtype=np.float32))
return np.array(resp.data[0].embedding, dtype=np.float32)

# MOVE LATER
def recommendations_from_strings(
strings: List[str],
index_of_source_string: int,
model: str = "text-embedding-3-small",
) -> List[int]:
"""Return nearest neighbors of a given string (including itself first)."""

# 1) embeddings for all strings
embeddings = [embedding_from_string(s, model=model) for s in strings]

# 2) source embedding
query = embeddings[index_of_source_string]

# 3) cosine *distance* = 1 - cosine similarity
q_norm = np.linalg.norm(query)
distances = []
for e in embeddings:
denom = q_norm * np.linalg.norm(e)
sim = float(query @ e / denom) if denom != 0 else 0.0
distances.append(1.0 - sim)

# 4) nearest neighbors = smallest distance first
indices_of_nearest_neighbors = list(np.argsort(distances))

closest_idx = indices_of_nearest_neighbors[1]
if distances[closest_idx] <= 0.5:
return strings[closest_idx]
return None

def pick_up(self, object_name : str): # This function allows us to pick up object at called object_name
if (self.gripper_occupied):
print("Gripper is already holding an object.")
Expand Down
2 changes: 1 addition & 1 deletion crackle_planning/crackle_planning/_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def get_command(self, prompt: str, ros_interface: any = None):
"Assume 'self' is already in scope so no need to pass it in. "
"Do not redefine these. For example, if you were to pick up and "
"place an object called 'x y', code would look like this:\n"
"api.pick_up(\"x_y\")\\api.place()"
"api.pick_up(\"x_y\")\\api.place()."
),
},
"emotion": {
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
10 changes: 5 additions & 5 deletions crackle_planning/crackle_planning/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import pyaudio
import os
from _api import PlannerAPI
from planner import main_planner
# from planner import main_planner
import wave
from openai import OpenAI
from _llm import GptAPI
Expand Down Expand Up @@ -65,12 +65,12 @@ def __init__(self):
# Path to your custom TFLite model
custom_model_path = os.path.join(
os.path.dirname(__file__),
"lee_oh.tflite",
"leeoh.tflite",
)
print("custom_model_path:", custom_model_path)

# Key you'll use in the prediction dict
self.WAKEWORD_NAME = "lee_oh"
self.WAKEWORD_NAME = "leeoh"

from openwakeword.model import Model

Expand Down Expand Up @@ -129,7 +129,7 @@ async def listening_thread(name: str, stop_event: asyncio.Event):
audio = np.frombuffer(self._mic_stream.read(CHUNK), dtype=np.int16)
prediction = self._owwModel.predict(audio)
score = prediction[self.WAKEWORD_NAME]
#print(score)

if score > 0.1:
print(f"Wake word detected with score {score:.3f}")
self._state = CrackleState.LISTENING
Expand All @@ -156,7 +156,7 @@ async def handle_task(self):
# ...
await asyncio.sleep(2) # Simulate task execution time
print("Entering TASK state: Executing task...")
main_planner() #main_planner()
# main_planner()
pass

async def handle_resetting(self):
Expand Down
1 change: 1 addition & 0 deletions crackle_planning/crackle_planning/ros_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import numpy as np
from crackle_interfaces.srv import PickupObject, LookAt


from sensor_msgs.msg import Image
from rclpy.callback_groups import ReentrantCallbackGroup
from geometry_msgs.msg import Vector3Stamped
Expand Down
Loading