Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e0eb490
Fix: Include task_id in ScheduleMessageItem serialization
Nov 29, 2025
2606fc7
Fix(Scheduler): Correct event log creation and task_id serialization
Nov 29, 2025
b3a6f1b
Feat(Scheduler): Add conditional detailed logging for KB updates
Nov 29, 2025
4b2cc2f
Fix(Scheduler): Correct create_event_log call sites
Nov 29, 2025
d8726ec
Fix(Scheduler): Deserialize task_id in ScheduleMessageItem.from_dict
Nov 29, 2025
b8cc42a
Refactor(Config): Centralize RabbitMQ config override logic
Nov 29, 2025
b6ebee6
Revert "Refactor(Config): Centralize RabbitMQ config override logic"
Nov 29, 2025
702d3e1
Fix(Redis): Convert None task_id to empty string during serialization
Nov 29, 2025
975e585
Feat(Log): Add diagnostic log to /product/add endpoint
Nov 29, 2025
bceaf68
Merge branch 'dev' into hotfix/task-id-loss
glin93 Nov 29, 2025
82a95c4
Feat(Log): Add comprehensive diagnostic logs for /product/add flow
Nov 29, 2025
c5631cc
Feat(Log): Add comprehensive diagnostic logs for /product/add flow an…
Nov 29, 2025
600fe24
Fix(rabbitmq): Use env vars for KB updates and improve logging
Nov 29, 2025
1da7c71
Fix(rabbitmq): Explicitly use MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME and…
Nov 29, 2025
f32399b
Fix(add_handler): Update diagnostic log timestamp
Nov 29, 2025
42fea63
Fix(add_handler): Update diagnostic log timestamp again (auto-updated)
Nov 29, 2025
003a169
Update default scheduler redis stream prefix
Nov 29, 2025
6b5d5c6
Update diagnostic timestamp in add handler
Nov 29, 2025
5339b08
Allow optional log_content in scheduler event log
Nov 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added dump.rdb
Binary file not shown.
4 changes: 3 additions & 1 deletion src/memos/api/handlers/add_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ def handle_add_memories(self, add_req: APIADDRequest) -> MemoryResponse:
Returns:
MemoryResponse with added memory information
"""
self.logger.info(f"[AddHandler] Add Req is: {add_req}")
self.logger.info(
f"[DIAGNOSTIC] server_router -> add_handler.handle_add_memories called (Modified at 2025-11-29 18:46). Full request: {add_req.model_dump_json(indent=2)}"
)

if add_req.info:
exclude_fields = list_all_fields()
Expand Down
1 change: 1 addition & 0 deletions src/memos/api/routers/product_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def get_all_memories(memory_req: GetMemoryPlaygroundRequest):
@router.post("/add", summary="add a new memory", response_model=SimpleResponse)
def create_memory(memory_req: MemoryCreateRequest):
"""Create a new memory for a specific user."""
logger.info("DIAGNOSTIC: /product/add endpoint called. This confirms the new code is deployed.")
# Initialize status_tracker outside try block to avoid NameError in except blocks
status_tracker = None

Expand Down
3 changes: 3 additions & 0 deletions src/memos/mem_os/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,9 @@ def process_textual_memory():
timestamp=datetime.utcnow(),
task_id=task_id,
)
logger.info(
f"[DIAGNOSTIC] core.add: Submitting message to scheduler: {message_item.model_dump_json(indent=2)}"
)
self.mem_scheduler.memos_message_queue.submit_messages(
messages=[message_item]
)
Expand Down
5 changes: 5 additions & 0 deletions src/memos/mem_scheduler/base_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,11 @@ def _submit_web_logs(
Args:
messages: Single log message or list of log messages
"""
messages_list = [messages] if isinstance(messages, ScheduleLogForWebItem) else messages
for message in messages_list:
logger.info(
f"[DIAGNOSTIC] base_scheduler._submit_web_logs called. Message to publish: {message.model_dump_json(indent=2)}"
)
if self.rabbitmq_config is None:
return

Expand Down
3 changes: 2 additions & 1 deletion src/memos/mem_scheduler/general_modules/scheduler_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ def create_event_log(
metadata: list[dict],
memory_len: int,
memcube_name: str | None = None,
log_content: str | None = None,
) -> ScheduleLogForWebItem:
item = self.create_autofilled_log_item(
log_content="",
log_content=log_content or "",
label=label,
from_memory_type=from_memory_type,
to_memory_type=to_memory_type,
Expand Down
14 changes: 13 additions & 1 deletion src/memos/mem_scheduler/general_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,16 +367,19 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
if kb_log_content:
event = self.create_event_log(
label="knowledgeBaseUpdate",
# 1. 移除 log_content 参数
# 2. 补充 memory_type
from_memory_type=USER_INPUT_TYPE,
to_memory_type=LONG_TERM_MEMORY_TYPE,
user_id=msg.user_id,
mem_cube_id=msg.mem_cube_id,
mem_cube=self.current_mem_cube,
memcube_log_content=kb_log_content,
metadata=None, # Per design doc for KB logs
metadata=None,
memory_len=len(kb_log_content),
memcube_name=self._map_memcube_name(msg.mem_cube_id),
)
# 3. 后置赋值 log_content
event.log_content = (
f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
)
Expand Down Expand Up @@ -474,6 +477,9 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
logger.error(f"Error: {e}", exc_info=True)

def _mem_read_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
logger.info(
f"[DIAGNOSTIC] general_scheduler._mem_read_message_consumer called. Received messages: {[msg.model_dump_json(indent=2) for msg in messages]}"
)
logger.info(f"Messages {messages} assigned to {MEM_READ_LABEL} handler.")

def process_message(message: ScheduleMessageItem):
Expand Down Expand Up @@ -538,6 +544,9 @@ def _process_memories_with_reader(
task_id: str | None = None,
info: dict | None = None,
) -> None:
logger.info(
f"[DIAGNOSTIC] general_scheduler._process_memories_with_reader called. mem_ids: {mem_ids}, user_id: {user_id}, mem_cube_id: {mem_cube_id}, task_id: {task_id}"
)
"""
Process memories using mem_reader for enhanced memory processing.

Expand Down Expand Up @@ -635,6 +644,9 @@ def _process_memories_with_reader(
}
)
if kb_log_content:
logger.info(
f"[DIAGNOSTIC] general_scheduler._process_memories_with_reader: Creating event log for KB update. Label: knowledgeBaseUpdate, user_id: {user_id}, mem_cube_id: {mem_cube_id}, task_id: {task_id}. KB content: {json.dumps(kb_log_content, indent=2)}"
)
event = self.create_event_log(
label="knowledgeBaseUpdate",
from_memory_type=USER_INPUT_TYPE,
Expand Down
2 changes: 2 additions & 0 deletions src/memos/mem_scheduler/schemas/message_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def to_dict(self) -> dict:
"content": self.content,
"timestamp": self.timestamp.isoformat(),
"user_name": self.user_name,
"task_id": self.task_id if self.task_id is not None else "",
}

@classmethod
Expand All @@ -97,6 +98,7 @@ def from_dict(cls, data: dict) -> "ScheduleMessageItem":
content=data["content"],
timestamp=datetime.fromisoformat(data["timestamp"]),
user_name=data.get("user_name"),
task_id=data.get("task_id"),
)


Expand Down
8 changes: 7 additions & 1 deletion src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class SchedulerRedisQueue(RedisSchedulerModule):
def __init__(
self,
stream_key_prefix: str = os.getenv(
"MEMSCHEDULER_REDIS_STREAM_KEY_PREFIX", "scheduler:messages:stream"
"MEMSCHEDULER_REDIS_STREAM_KEY_PREFIX",
"scheduler:messages:stream:v2",
),
consumer_group: str = "scheduler_group",
consumer_name: str | None = "scheduler_consumer",
Expand Down Expand Up @@ -78,6 +79,11 @@ def __init__(
# Task tracking for mem_scheduler_wait compatibility
self._unfinished_tasks = 0

logger.info(
f"[REDIS_QUEUE] Initialized with stream_prefix='{self.stream_key_prefix}', "
f"consumer_group='{self.consumer_group}', consumer_name='{self.consumer_name}'"
)

# Auto-initialize Redis connection
if self.auto_initialize_redis():
self._is_connected = True
Expand Down
26 changes: 24 additions & 2 deletions src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import os
import ssl
import threading
import time
Expand Down Expand Up @@ -270,15 +271,36 @@ def rabbitmq_publish_message(self, message: dict):
"""
import pika

exchange_name = self.rabbitmq_exchange_name
routing_key = self.rabbit_queue_name

if message.get("label") == "knowledgeBaseUpdate":
kb_specific_exchange_name = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME")

if kb_specific_exchange_name:
exchange_name = kb_specific_exchange_name

routing_key = "" # User specified empty routing key for KB updates

logger.info(
f"[DIAGNOSTIC] Publishing KB Update message. "
f"ENV_EXCHANGE_NAME_USED: {kb_specific_exchange_name is not None}. "
f"Current configured Exchange: {exchange_name}, Routing Key: '{routing_key}'."
)
logger.info(f" - Message Content: {json.dumps(message, indent=2)}")

with self._rabbitmq_lock:
if not self.is_rabbitmq_connected():
logger.error("Cannot publish - no active connection")
return False

logger.info(
f"[DIAGNOSTIC] rabbitmq_service.rabbitmq_publish_message: Attempting to publish message. Exchange: {exchange_name}, Routing Key: {routing_key}, Message Content: {json.dumps(message, indent=2)}"
)
try:
self.rabbitmq_channel.basic_publish(
exchange=self.rabbitmq_exchange_name,
routing_key=self.rabbit_queue_name,
exchange=exchange_name,
routing_key=routing_key,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
Expand Down
4 changes: 4 additions & 0 deletions src/memos/multi_mem_cube/single_cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ def add_memories(self, add_req: APIADDRequest) -> list[dict[str, Any]]:
This is basically your current handle_add_memories logic,
but scoped to a single cube_id.
"""
sync_mode = add_req.async_mode or self._get_sync_mode()
self.logger.info(
f"[DIAGNOSTIC] single_cube.add_memories called for cube_id: {self.cube_id}. sync_mode: {sync_mode}. Request: {add_req.model_dump_json(indent=2)}"
)
user_context = UserContext(
user_id=add_req.user_id,
mem_cube_id=self.cube_id,
Expand Down
Loading