-
Notifications
You must be signed in to change notification settings - Fork 0
feat: add per-user memory with LLM extraction, dedup, and decay #67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,31 +1,11 @@ | ||
| -- Koopa Database Schema - Down Migration | ||
| -- Drops all objects created by 000001_init_schema.up.sql in reverse order | ||
|
|
||
| -- ============================================================================ | ||
| -- Drop Messages Table | ||
| -- ============================================================================ | ||
|
|
||
| DROP TABLE IF EXISTS memories; | ||
| DROP TABLE IF EXISTS messages; | ||
|
|
||
| -- ============================================================================ | ||
| -- Drop Sessions Table (including indexes) | ||
| -- ============================================================================ | ||
|
|
||
| DROP INDEX IF EXISTS idx_sessions_owner_id; | ||
| DROP INDEX IF EXISTS idx_sessions_updated_at; | ||
| DROP TABLE IF EXISTS sessions; | ||
|
|
||
| -- ============================================================================ | ||
| -- Drop Documents Table (including indexes) | ||
| -- ============================================================================ | ||
|
|
||
| DROP INDEX IF EXISTS idx_documents_owner; | ||
| DROP INDEX IF EXISTS idx_documents_metadata_gin; | ||
| DROP INDEX IF EXISTS idx_documents_source_type; | ||
| DROP INDEX IF EXISTS idx_documents_embedding; | ||
| DROP TABLE IF EXISTS documents; | ||
|
|
||
| -- ============================================================================ | ||
| -- Drop Extensions | ||
| -- Note: Only drop if no other schemas depend on it | ||
| -- ============================================================================ | ||
|
|
||
| DROP EXTENSION IF EXISTS vector; | ||
|
Comment on lines
+1
to
11
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The migration script does not wrap the DROP statements in a transaction. If an error occurs partway through, the database could be left in an inconsistent state. Recommendation: Wrap all statements in a transaction block (e.g., `BEGIN; ... COMMIT;`) so that a failure partway through rolls back every drop. |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,47 +1,46 @@ | ||
| -- Koopa Database Schema | ||
| -- Consolidated migration for sessions, messages, and documents | ||
| -- NOTE: All CREATE statements use IF NOT EXISTS for idempotent execution | ||
| -- Koopa Database Schema (consolidated) | ||
| -- All tables: sessions, messages, documents, memories | ||
|
|
||
| -- Enable pgvector extension (required for vector search) | ||
| CREATE EXTENSION IF NOT EXISTS vector; | ||
|
|
||
| -- ============================================================================ | ||
| -- Documents Table (for RAG / Knowledge Store) | ||
| -- Used by Genkit PostgreSQL Plugin with custom column names | ||
| -- Documents Table (RAG / Knowledge Store) | ||
| -- ============================================================================ | ||
|
|
||
| CREATE TABLE IF NOT EXISTS documents ( | ||
| id TEXT PRIMARY KEY, | ||
| content TEXT NOT NULL, | ||
| embedding vector(768) NOT NULL, -- gemini-embedding-001 truncated via OutputDimensionality | ||
| source_type TEXT, -- Metadata column for filtering | ||
| metadata JSONB -- Additional metadata in JSON format | ||
| embedding vector(768) NOT NULL, | ||
| source_type TEXT, | ||
| metadata JSONB, | ||
| owner_id TEXT | ||
| ); | ||
|
|
||
| -- HNSW index for fast vector similarity search | ||
| CREATE INDEX IF NOT EXISTS idx_documents_embedding ON documents | ||
| USING hnsw (embedding vector_cosine_ops) | ||
| WITH (m = 16, ef_construction = 64); | ||
|
|
||
| -- Index for filtering by source_type | ||
| CREATE INDEX IF NOT EXISTS idx_documents_source_type ON documents(source_type); | ||
|
|
||
| -- Enables fast queries like: WHERE metadata @> '{"key": "value"}' | ||
| CREATE INDEX IF NOT EXISTS idx_documents_metadata_gin | ||
| ON documents USING GIN (metadata jsonb_path_ops); | ||
|
|
||
| CREATE INDEX IF NOT EXISTS idx_documents_owner ON documents(owner_id); | ||
|
|
||
| -- ============================================================================ | ||
| -- Sessions Table | ||
| -- ============================================================================ | ||
|
|
||
| CREATE TABLE IF NOT EXISTS sessions ( | ||
| id UUID PRIMARY KEY DEFAULT gen_random_uuid(), | ||
| title TEXT, | ||
| owner_id TEXT NOT NULL DEFAULT '', | ||
|
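There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Potential Data Integrity and Security Issue: The `owner_id` column is declared `NOT NULL DEFAULT ''`, so a session inserted without an explicit owner silently receives an empty-string owner instead of being rejected. Consider removing the default.
Example: `owner_id TEXT NOT NULL`. Or, if nullable: `owner_id TEXT` |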
||
| created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), | ||
| updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() | ||
| ); | ||
|
|
||
| CREATE INDEX IF NOT EXISTS idx_sessions_updated_at ON sessions(updated_at DESC); | ||
| CREATE INDEX IF NOT EXISTS idx_sessions_owner_id ON sessions(owner_id, updated_at DESC); | ||
|
|
||
| -- ============================================================================ | ||
| -- Messages Table | ||
|
|
@@ -55,7 +54,60 @@ CREATE TABLE IF NOT EXISTS messages ( | |
| sequence_number INTEGER NOT NULL, | ||
| created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), | ||
|
|
||
| -- UNIQUE constraint automatically creates index on (session_id, sequence_number) | ||
| CONSTRAINT unique_message_sequence UNIQUE (session_id, sequence_number), | ||
| CONSTRAINT message_role_check CHECK (role IN ('user', 'assistant', 'system', 'tool')) | ||
| ); | ||
|
|
||
| -- ============================================================================ | ||
| -- Memories Table (user memory with vector search, decay, dedup) | ||
| -- ============================================================================ | ||
|
|
||
| CREATE TABLE IF NOT EXISTS memories ( | ||
| id UUID PRIMARY KEY DEFAULT gen_random_uuid(), | ||
| owner_id TEXT NOT NULL, | ||
| content TEXT NOT NULL, | ||
| embedding vector(768) NOT NULL, | ||
| category TEXT NOT NULL DEFAULT 'contextual' | ||
| CHECK (category IN ('identity', 'preference', 'project', 'contextual')), | ||
| source_session_id UUID REFERENCES sessions(id) ON DELETE SET NULL, | ||
| active BOOLEAN NOT NULL DEFAULT true, | ||
| created_at TIMESTAMPTZ NOT NULL DEFAULT now(), | ||
| updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), | ||
| importance SMALLINT NOT NULL DEFAULT 5 | ||
| CHECK (importance BETWEEN 1 AND 10), | ||
| access_count INTEGER NOT NULL DEFAULT 0, | ||
| last_accessed_at TIMESTAMPTZ, | ||
| decay_score REAL NOT NULL DEFAULT 1.0 | ||
| CHECK (decay_score BETWEEN 0.0 AND 1.0), | ||
| superseded_by UUID REFERENCES memories(id) ON DELETE SET NULL, | ||
| CONSTRAINT memories_no_self_supersede | ||
| CHECK (superseded_by IS NULL OR superseded_by != id), | ||
| expires_at TIMESTAMPTZ, | ||
| search_text tsvector | ||
| GENERATED ALWAYS AS (to_tsvector('english', content)) STORED | ||
| ); | ||
|
|
||
| CREATE INDEX idx_memories_embedding ON memories | ||
| USING hnsw (embedding vector_cosine_ops) | ||
| WITH (m = 16, ef_construction = 64); | ||
|
|
||
| CREATE INDEX idx_memories_owner ON memories(owner_id); | ||
|
|
||
| CREATE INDEX idx_memories_owner_active_category | ||
| ON memories(owner_id, active, category); | ||
|
|
||
| CREATE UNIQUE INDEX idx_memories_owner_content_unique | ||
| ON memories(owner_id, md5(content)) WHERE active = true; | ||
|
Comment on lines
+99
to
+100
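There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Deduplication Vulnerability via MD5 Hash Collisions: The unique index on `memories(owner_id, md5(content))` relies on MD5, so two different contents whose hashes collide would be treated as duplicates for the same owner. Consider a stronger hash. Example (if using SHA256; `digest` requires the pgcrypto extension): `ON memories(owner_id, encode(digest(content, 'sha256'), 'hex')) WHERE active = true;` |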
||
|
|
||
| CREATE INDEX idx_memories_search_text ON memories USING gin (search_text); | ||
|
|
||
| CREATE INDEX idx_memories_decay_candidates | ||
| ON memories (owner_id, updated_at) | ||
| WHERE active = true AND superseded_by IS NULL; | ||
|
|
||
| CREATE INDEX idx_memories_superseded_by ON memories (superseded_by) | ||
| WHERE superseded_by IS NOT NULL; | ||
|
|
||
| CREATE INDEX idx_memories_expires_at | ||
| ON memories (expires_at) | ||
| WHERE expires_at IS NOT NULL AND active = true; | ||
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| package app | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "log/slog" | ||
| "sync" | ||
|
|
@@ -16,6 +17,7 @@ import ( | |
|
|
||
| "github.com/koopa0/koopa/internal/chat" | ||
| "github.com/koopa0/koopa/internal/config" | ||
| "github.com/koopa0/koopa/internal/memory" | ||
| "github.com/koopa0/koopa/internal/security" | ||
| "github.com/koopa0/koopa/internal/session" | ||
| "github.com/koopa0/koopa/internal/tools" | ||
|
|
@@ -32,6 +34,7 @@ type App struct { | |
| DocStore *postgresql.DocStore | ||
| Retriever ai.Retriever | ||
| SessionStore *session.Store | ||
| MemoryStore *memory.Store | ||
| PathValidator *security.Path | ||
| Tools []ai.Tool // Pre-registered Genkit tools (for chat agent) | ||
|
|
||
|
|
@@ -41,8 +44,10 @@ type App struct { | |
| Network *tools.Network | ||
| Knowledge *tools.Knowledge // nil if retriever unavailable | ||
|
|
||
| // Lifecycle management (unexported) | ||
| // Lifecycle management (unexported except bgCtx for agent construction) | ||
| bgCtx context.Context // Outlives individual requests; canceled by Close(). | ||
| cancel func() | ||
| wg sync.WaitGroup // tracks background goroutines (scheduler, memory extraction) | ||
| dbCleanup func() | ||
| otelCleanup func() | ||
| closeOnce sync.Once | ||
|
|
@@ -53,8 +58,9 @@ type App struct { | |
| // | ||
| // Shutdown order: | ||
| // 1. Cancel context (signals background tasks to stop) | ||
| // 2. Close DB pool | ||
| // 3. Flush OTel spans | ||
| // 2. Wait for background goroutines (scheduler) to exit | ||
| // 3. Close DB pool | ||
| // 4. Flush OTel spans | ||
| func (a *App) Close() error { | ||
| a.closeOnce.Do(func() { | ||
| slog.Info("shutting down application") | ||
|
Comment on lines
58
to
66
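There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Resource cleanup error handling is missing in `Close`. The method does not collect any errors from `dbCleanup` or `otelCleanup`; consider aggregating them, for example:
var err error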
if a.dbCleanup != nil {
if cerr := a.dbCleanup(); cerr != nil {
err = errors.Join(err, cerr)
}
}
// ... same for otelCleanup
return err |
||
|
|
@@ -64,12 +70,15 @@ func (a *App) Close() error { | |
| a.cancel() | ||
| } | ||
|
|
||
| // 2. Close DB pool | ||
| // 2. Wait for background goroutines to finish | ||
| a.wg.Wait() | ||
|
|
||
| // 3. Close DB pool | ||
| if a.dbCleanup != nil { | ||
| a.dbCleanup() | ||
| } | ||
|
|
||
| // 3. Flush OTel spans | ||
| // 4. Flush OTel spans | ||
| if a.otelCleanup != nil { | ||
| a.otelCleanup() | ||
| } | ||
|
|
@@ -82,13 +91,16 @@ func (a *App) Close() error { | |
| // Setup guarantees all dependencies are non-nil. | ||
| func (a *App) CreateAgent() (*chat.Agent, error) { | ||
| agent, err := chat.New(chat.Config{ | ||
| Genkit: a.Genkit, | ||
| SessionStore: a.SessionStore, | ||
| Logger: slog.Default(), | ||
| Tools: a.Tools, | ||
| ModelName: a.Config.FullModelName(), | ||
| MaxTurns: a.Config.MaxTurns, | ||
| Language: a.Config.Language, | ||
| Genkit: a.Genkit, | ||
| SessionStore: a.SessionStore, | ||
| MemoryStore: a.MemoryStore, | ||
| Logger: slog.Default(), | ||
| Tools: a.Tools, | ||
| ModelName: a.Config.FullModelName(), | ||
| MaxTurns: a.Config.MaxTurns, | ||
| Language: a.Config.Language, | ||
| BackgroundCtx: a.bgCtx, | ||
| WG: &a.wg, | ||
| }) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("creating chat agent: %w", err) | ||
|
Comment on lines
91
to
106
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Dependency guarantees in `CreateAgent`: The comment claims Setup guarantees all dependencies are non-nil, but this is not enforced in code. If Setup fails, callers could end up using an App whose fields are nil; consider validating the required fields before constructing the agent. |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ import ( | |
|
|
||
| "github.com/koopa0/koopa/db" | ||
| "github.com/koopa0/koopa/internal/config" | ||
| "github.com/koopa0/koopa/internal/memory" | ||
| "github.com/koopa0/koopa/internal/rag" | ||
| "github.com/koopa0/koopa/internal/security" | ||
| "github.com/koopa0/koopa/internal/session" | ||
|
|
@@ -78,6 +79,12 @@ func Setup(ctx context.Context, cfg *config.Config) (_ *App, retErr error) { | |
|
|
||
| a.SessionStore = provideSessionStore(pool) | ||
|
|
||
| memStore, err := provideMemoryStore(pool, embedder) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| a.MemoryStore = memStore | ||
|
|
||
| path, err := providePathValidator() | ||
| if err != nil { | ||
| return nil, err | ||
|
|
@@ -88,10 +95,21 @@ func Setup(ctx context.Context, cfg *config.Config) (_ *App, retErr error) { | |
| return nil, err | ||
| } | ||
|
|
||
| // Set up lifecycle management | ||
| _, cancel := context.WithCancel(ctx) | ||
| // Set up lifecycle management. | ||
| bgCtx, cancel := context.WithCancel(ctx) | ||
| a.bgCtx = bgCtx | ||
| a.cancel = cancel | ||
|
|
||
| // Start memory decay scheduler if memory store is available. | ||
| if memStore != nil { | ||
| scheduler := memory.NewScheduler(memStore, slog.Default()) | ||
| a.wg.Add(1) | ||
| go func() { | ||
| defer a.wg.Done() | ||
| scheduler.Run(bgCtx) | ||
| }() | ||
| } | ||
|
|
||
| return a, nil | ||
| } | ||
|
|
||
|
|
@@ -299,6 +317,15 @@ func provideSessionStore(pool *pgxpool.Pool) *session.Store { | |
| return session.New(sqlc.New(pool), pool, nil) | ||
| } | ||
|
|
||
| // provideMemoryStore creates a memory store backed by pgvector. | ||
| func provideMemoryStore(pool *pgxpool.Pool, embedder ai.Embedder) (*memory.Store, error) { | ||
| store, err := memory.NewStore(pool, embedder, slog.Default()) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("creating memory store: %w", err) | ||
| } | ||
| return store, nil | ||
| } | ||
|
|
||
| // providePathValidator creates a path validator instance. | ||
| // Denies access to prompts/ to protect system prompt files from tool-based access. | ||
| func providePathValidator() (*security.Path, error) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Security/Configurability Issue: The `providePathValidator` function hard-codes the list of denied directories: `return security.NewPath([]string{"."}, []string{"prompts"})`. If the directory structure changes or additional sensitive directories are introduced, this approach will not scale. Consider making the denial list configurable via application settings or environment variables to enhance security and maintainability. Recommended Solution:
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indexes are dropped after their associated tables (e.g., indexes on 'sessions' and 'documents' are dropped after the tables themselves). This can cause errors if the table no longer exists when attempting to drop the index. Recommendation: Drop all indexes before dropping their associated tables to avoid dependency errors.