diff --git a/ROADMAP.md b/ROADMAP.md index cce1f67..252003d 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -500,58 +500,60 @@ This is the most complex but most rewarding "quality of life" feature. #### **Backend** -* [ ] **Add WebSocket library:** +* [x] **Add WebSocket library:** * `go get github.com/gorilla/websocket` -* [ ] **Create WebSocket Hub:** - * Create `/backend/internal/websocket/hub.go`. - * The `Hub` struct will manage active connections: `clients map[string]*websocket.Conn` (mapping `userID` to their connection). - * It needs methods: `Register(userID, conn)`, `Unregister(userID)`, and `Send(userID, message []byte)`. -* [ ] **Create GET /api/v1/ws endpoint:** - * Add this route in `routes.go`. - * The handler (`wsHandler`) upgrades the HTTP connection to a WebSocket. - * It gets the `userID` from the auth context. - * It calls `hub.Register(userID, conn)`. - * It must handle disconnects by calling `hub.Unregister(userID)`. -* [ ] **Create IMAP IDLE listener:** - * In `/backend/internal/imap/idle.go`, create `func (s *Service) StartIdleListener(ctx context.Context, userID string, hub *websocket.Hub)`. - * **Launch it:** When a user successfully connects to the WebSocket (`hub.Register`), launch this function in a **new goroutine** for that `userID`. +* [x] **Create WebSocket Hub:** + * Created `/backend/internal/websocket/hub.go`. + * The `Hub` struct manages active connections: `userID -> set of *websocket.Conn` (via a `Client` wrapper). + * It exposes methods: `Register(userID, conn)`, `Unregister(userID, client)`, and `Send(userID, message []byte)`. + * It supports multiple connections per user with a per-user limit (currently 10). +* [x] **Create GET /api/v1/ws endpoint:** + * Added the route in `cmd/server/main.go`. + * The handler (`WebSocketHandler`) upgrades the HTTP connection to a WebSocket. + * It gets the `userID` from the auth context using `GetUserIDFromContext`. + * It calls `hub.Register(userID, conn)` and starts a read loop to detect disconnects. + * On disconnect, it calls `hub.Unregister(userID, client)` and stops the IMAP IDLE listener when there are no more active connections for that user. +* [x] **Create IMAP IDLE listener:** + * Implemented in `/backend/internal/imap/idle.go` as `func (s *Service) StartIdleListener(ctx context.Context, userID string, hub *websocket.Hub)`. + * **Launch:** When a user successfully connects to the WebSocket (`hub.Register`), `WebSocketHandler` starts this function in a **new goroutine** for that `userID` (if not already running). * **Logic:** - 1. Get a *dedicated* IMAP connection (do not use the pool). + 1. Get a dedicated IMAP listener connection from the pool. 2. Run `SELECT INBOX`. - 3. Start a `for` loop (to handle disconnects). - 4. Inside the loop, run `client.Idle()`. - 5. Listen for updates. When an update arrives (e.g., `* 1 EXISTS`), call `hub.Send(userID, []byte('{"type": "new_email", "folder": "INBOX"}'))`. - 6. If `client.Idle()` returns an error (e.g., timeout), `log.Println` and `time.Sleep(10 * time.Second)` before the loop retries. + 3. Start an IDLE loop (with fallback) using `go-imap-idle`. + 4. Listen for updates via the client's `Updates` channel. When an update indicates new messages in `INBOX`, call `SyncThreadsForFolder` for `INBOX` immediately. + 5. After syncing, call `hub.Send(userID, []byte('{"type":"new_email","folder":"INBOX"}'))`. + 6. On errors (e.g., timeout), log, remove the listener connection, and retry after a short sleep. #### **Frontend** -* [ ] **Create useWebSocket hook:** - * Create `hooks/useWebSocket.ts`. - * It should be called *once* from your main `Layout.tsx`. +* [x] **Create useWebSocket hook:** + * Created `hooks/useWebSocket.ts`. + * It is called *once* from the main `Layout.tsx`. * `useEffect` on mount: - 1. `const socket = new WebSocket('ws://localhost:8080/api/v1/ws')` (use wss in prod). - 2. `socket.onmessage = (event) => { ... }` - 3. `socket.onclose = () => { ... }` - * The `onmessage` handler parses the `event.data`. - * `if (message.type === 'new_email') { ... }` -* [ ] **Invalidate cache on message:** + 1. Opens `new WebSocket(VITE_WS_URL || '/api/v1/ws')`. + 2. Sets status in a `connection.store.ts` (Zustand) to `connecting`/`connected`/`disconnected`. + 3. Handles `onmessage` and `onclose` to update connection state. + * The `onmessage` handler parses `event.data` and, when `message.type === 'new_email'`, invalidates queries for that folder. +* [x] **Invalidate cache on message:** * Inside the `onmessage` handler: - * Get the `queryClient` using `useQueryClient()`. - * Call `queryClient.invalidateQueries({ queryKey: ['threads', message.folder] })`. - * This will automatically make `TanStack Query` refetch the thread list, and the new email will appear. + * Gets the `queryClient` using `useQueryClient()`. + * Calls `queryClient.invalidateQueries({ queryKey: ['threads', message.folder] })`. + * This automatically makes `TanStack Query` refetch the thread list, and the new email appears. +* [x] **Connection status banner and manual reconnect:** + * Added a `ConnectionStatusBanner` component, shown when the WebSocket status is `disconnected`. + * The banner displays a Gmail-style "Connection lost. New emails may be delayed." message with a "Try now" link that triggers a reconnect of the WebSocket. #### **Testing** -* [ ] **Frontend Integration (RTL + Mock WebSocket):** - * You'll need a library like `mock-socket`. - * Render the `Inbox.page.tsx` (which is inside `Layout.tsx`, so the hook runs). - * Simulate a message from the mock socket: `mockSocket.send('{"type": "new_email", "folder": "INBOX"}')`. - * **Assert** that `queryClient.invalidateQueries` was called with `['threads', 'INBOX']`. -* [ ] **E2E:** - * This is the only true test. - * Log in to V-Mail. Have the Inbox page open. - * Use a *different* email client (or your `spike` script!) to send a new email to your test account. - * **Assert** the new email appears in the V-Mail inbox *without* a page reload. +* [x] **Frontend Integration (RTL + WebSocket mocking via MSW):** + * Uses `msw`'s WebSocket support instead of `mock-socket`. + * Renders a component that uses `useWebSocket` under a `QueryClientProvider`. + * Simulates a message from the mock socket: `server.send('{"type": "new_email", "folder": "INBOX"}')`. + * **Asserts** that `queryClient.invalidateQueries` was called with `{ queryKey: ['threads', 'INBOX'] }`. +* [x] **E2E:** + * Adds a new E2E test in `e2e/tests/inbox.spec.ts`. + * With the Inbox page open, the test calls `/test/add-imap-message` (a test-only backend endpoint) to append a message to `INBOX` on the IMAP server. + * **Asserts** the new email appears in the V-Mail inbox *without* a page reload. ## Milestone 6: Offline diff --git a/backend/cmd/server/main.go b/backend/cmd/server/main.go index 186343a..cefbed5 100644 --- a/backend/cmd/server/main.go +++ b/backend/cmd/server/main.go @@ -14,6 +14,7 @@ import ( "github.com/vdavid/vmail/backend/internal/crypto" "github.com/vdavid/vmail/backend/internal/db" "github.com/vdavid/vmail/backend/internal/imap" + ws "github.com/vdavid/vmail/backend/internal/websocket" ) func main() { @@ -50,6 +51,7 @@ func NewServer(cfg *config.Config, dbPool *pgxpool.Pool) http.Handler { imapPool := imap.NewPoolWithMaxWorkers(cfg.IMAPMaxWorkers) imapService := imap.NewService(dbPool, imapPool, encryptor) + wsHub := ws.NewHub(10) authHandler := api.NewAuthHandler(dbPool) settingsHandler := api.NewSettingsHandler(dbPool, encryptor) @@ -57,6 +59,8 @@ func NewServer(cfg *config.Config, dbPool *pgxpool.Pool) http.Handler { threadsHandler := api.NewThreadsHandler(dbPool, encryptor, imapService) threadHandler := api.NewThreadHandler(dbPool, encryptor, imapService) searchHandler := api.NewSearchHandler(dbPool, encryptor, imapService) + wsHandler := api.NewWebSocketHandler(dbPool, imapService, wsHub) + testHandler := api.NewTestHandler(dbPool, encryptor, imapService, wsHub) mux := http.NewServeMux() @@ -76,6 +80,13 @@ func NewServer(cfg *config.Config, dbPool *pgxpool.Pool) http.Handler { mux.Handle("/api/v1/folders", auth.RequireAuth(http.HandlerFunc(foldersHandler.GetFolders))) mux.Handle("/api/v1/threads", auth.RequireAuth(http.HandlerFunc(threadsHandler.GetThreads))) mux.Handle("/api/v1/search", auth.RequireAuth(http.HandlerFunc(searchHandler.Search))) + // WebSocket handler handles its own authentication via query parameter + // (since browsers can't set headers on WebSocket connections). + mux.Handle("/api/v1/ws", http.HandlerFunc(wsHandler.Handle)) + // Add test endpoints + if cfg.Environment == "test" { + mux.Handle("/test/add-imap-message", auth.RequireAuth(http.HandlerFunc(testHandler.AddIMAPMessage))) + } // Handle /api/v1/thread/{thread_id} pattern mux.Handle("/api/v1/thread/", auth.RequireAuth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/backend/cmd/test-server/main.go b/backend/cmd/test-server/main.go index d52258f..0ce172a 100644 --- a/backend/cmd/test-server/main.go +++ b/backend/cmd/test-server/main.go @@ -23,6 +23,7 @@ import ( "github.com/vdavid/vmail/backend/internal/imap" "github.com/vdavid/vmail/backend/internal/models" "github.com/vdavid/vmail/backend/internal/testutil" + ws "github.com/vdavid/vmail/backend/internal/websocket" ) func main() { @@ -246,12 +247,15 @@ func NewServer(cfg *config.Config, dbPool *pgxpool.Pool) http.Handler { imapPool := imap.NewPoolWithMaxWorkers(cfg.IMAPMaxWorkers) imapService := imap.NewService(dbPool, imapPool, encryptor) + tsHub := ws.NewHub(10) authHandler := api.NewAuthHandler(dbPool) settingsHandler := api.NewSettingsHandler(dbPool, encryptor) foldersHandler := api.NewFoldersHandler(dbPool, encryptor, imapPool) threadsHandler := api.NewThreadsHandler(dbPool, encryptor, imapService) threadHandler := api.NewThreadHandler(dbPool, encryptor, imapService) searchHandler := api.NewSearchHandler(dbPool, encryptor, imapService) + wsHandler := api.NewWebSocketHandler(dbPool, imapService, tsHub) + testHandler := api.NewTestHandler(dbPool, encryptor, imapService, tsHub) mux := http.NewServeMux() @@ -271,6 +275,11 @@ func NewServer(cfg *config.Config, dbPool *pgxpool.Pool) http.Handler { mux.Handle("/api/v1/folders", auth.RequireAuth(http.HandlerFunc(foldersHandler.GetFolders))) mux.Handle("/api/v1/threads", auth.RequireAuth(http.HandlerFunc(threadsHandler.GetThreads))) mux.Handle("/api/v1/search", auth.RequireAuth(http.HandlerFunc(searchHandler.Search))) + // WebSocket handler handles its own authentication via query parameter + // (since browsers can't set headers on WebSocket connections). + mux.Handle("/api/v1/ws", http.HandlerFunc(wsHandler.Handle)) + // Test endpoints are only available in test environment + mux.Handle("/test/add-imap-message", auth.RequireAuth(http.HandlerFunc(testHandler.AddIMAPMessage))) // Handle /api/v1/thread/{thread_id} pattern mux.Handle("/api/v1/thread/", auth.RequireAuth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/backend/go.mod b/backend/go.mod index ec8e7c7..bac00cf 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -4,8 +4,10 @@ go 1.25.3 require ( github.com/emersion/go-imap v1.2.1 + github.com/emersion/go-imap-idle v0.0.0-20210907174914-db2568431445 github.com/emersion/go-imap-sortthread v1.2.0 github.com/emersion/go-smtp v0.24.0 + github.com/gorilla/websocket v1.5.3 github.com/jackc/pgx/v5 v5.7.6 github.com/jhillyerd/enmime v1.3.0 github.com/joho/godotenv v1.5.1 diff --git a/backend/go.sum b/backend/go.sum index c874eba..174ae16 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -40,8 +40,11 @@ github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0omw= github.com/ebitengine/purego v0.8.4/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/emersion/go-imap v1.0.5/go.mod h1:yKASt+C3ZiDAiCSssxg9caIckWF/JG7ZQTO7GAmvicU= +github.com/emersion/go-imap v1.0.6/go.mod h1:yKASt+C3ZiDAiCSssxg9caIckWF/JG7ZQTO7GAmvicU= github.com/emersion/go-imap v1.2.1 h1:+s9ZjMEjOB8NzZMVTM3cCenz2JrQIGGo5j1df19WjTA= github.com/emersion/go-imap v1.2.1/go.mod h1:Qlx1FSx2FTxjnjWpIlVNEuX+ylerZQNFE5NsmKFSejY= +github.com/emersion/go-imap-idle v0.0.0-20210907174914-db2568431445 h1:dAGbaaU4LLupO7dnYZaELOoI3RoVDNi5DCGejLe8a7c= +github.com/emersion/go-imap-idle v0.0.0-20210907174914-db2568431445/go.mod h1:N/6S3dRTVt8xT867m+476C16+v/Fq4WZYvh2Chg0nmg= github.com/emersion/go-imap-sortthread v1.2.0 h1:EMVEJXPWAhXMWECjR82Rn/tza6MddcvTwGAdTu1vJKU= github.com/emersion/go-imap-sortthread v1.2.0/go.mod h1:UhenCBupR+vSYRnqJkpjSq84INUCsyAK1MLpogv14pE= github.com/emersion/go-message v0.11.1/go.mod h1:C4jnca5HOTo4bGN9YdqNQM9sITuT3Y0K6bSUw9RklvY= @@ -74,6 +77,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= diff --git a/backend/internal/api/auth_handler_test.go b/backend/internal/api/auth_handler_test.go index 1849ed9..f57920c 100644 --- a/backend/internal/api/auth_handler_test.go +++ b/backend/internal/api/auth_handler_test.go @@ -102,10 +102,10 @@ func TestAuthHandler_GetAuthStatus(t *testing.T) { t.Run("returns 500 when GetOrCreateUser returns an error", func(t *testing.T) { req := httptest.NewRequest("GET", "/api/v1/auth/status", nil) - // Use a cancelled context to simulate database connection failure - cancelledCtx, cancel := context.WithCancel(context.Background()) + // Use a canceled context to simulate database connection failure + canceledCtx, cancel := context.WithCancel(context.Background()) cancel() - reqCtx := context.WithValue(cancelledCtx, auth.UserEmailKey, "test@example.com") + reqCtx := context.WithValue(canceledCtx, auth.UserEmailKey, "test@example.com") req = req.WithContext(reqCtx) rr := httptest.NewRecorder() diff --git a/backend/internal/api/folders_handler_test.go b/backend/internal/api/folders_handler_test.go index a297d5a..20a2b03 100644 --- a/backend/internal/api/folders_handler_test.go +++ b/backend/internal/api/folders_handler_test.go @@ -60,11 +60,11 @@ func TestFoldersHandler_GetFolders(t *testing.T) { t.Run("returns 500 when GetOrCreateUser returns an error", func(t *testing.T) { email := "dberror@example.com" - // Use a cancelled context to simulate database connection failure - cancelledCtx, cancel := context.WithCancel(context.Background()) + // Use a canceled context to simulate database connection failure + canceledCtx, cancel := context.WithCancel(context.Background()) cancel() req := httptest.NewRequest("GET", "/api/v1/folders", nil) - reqCtx := context.WithValue(cancelledCtx, auth.UserEmailKey, email) + reqCtx := context.WithValue(canceledCtx, auth.UserEmailKey, email) req = req.WithContext(reqCtx) rr := httptest.NewRecorder() @@ -98,8 +98,11 @@ type mockIMAPPool struct { getClientPass string removeClientCalled map[string]bool // For retry scenarios: the first call returns one client, the second call returns another - retryClient imap.IMAPClient - retryClientErr error + retryClient imap.IMAPClient + retryClientErr error + listenerClient imap.ListenerClient + listenerClientErr error + removeListenerCalled map[string]bool } func (m *mockIMAPPool) WithClient(userID, server, username, password string, fn func(imap.IMAPClient) error) error { @@ -138,6 +141,20 @@ func (m *mockIMAPPool) RemoveClient(userID string) { func (m *mockIMAPPool) Close() {} +func (m *mockIMAPPool) GetListenerConnection(string, string, string, string) (imap.ListenerClient, error) { + if m.listenerClientErr != nil { + return nil, m.listenerClientErr + } + return m.listenerClient, nil +} + +func (m *mockIMAPPool) RemoveListenerConnection(userID string) { + if m.removeListenerCalled == nil { + m.removeListenerCalled = make(map[string]bool) + } + m.removeListenerCalled[userID] = true +} + // callGetFolders is a helper function that sets up and calls GetFolders handler. // It returns the response recorder for assertions. func callGetFolders(t *testing.T, handler *FoldersHandler, email string) *httptest.ResponseRecorder { diff --git a/backend/internal/api/search_handler_test.go b/backend/internal/api/search_handler_test.go index 127c9b0..1d9c873 100644 --- a/backend/internal/api/search_handler_test.go +++ b/backend/internal/api/search_handler_test.go @@ -12,6 +12,7 @@ import ( "github.com/vdavid/vmail/backend/internal/imap" "github.com/vdavid/vmail/backend/internal/models" "github.com/vdavid/vmail/backend/internal/testutil" + ws "github.com/vdavid/vmail/backend/internal/websocket" ) func TestSearchHandler_Search(t *testing.T) { @@ -321,6 +322,10 @@ func (m *mockIMAPServiceForSearch) Search(_ context.Context, _ string, query str func (m *mockIMAPServiceForSearch) Close() {} +// StartIdleListener is part of the IMAPService interface but is not used in search tests. +func (m *mockIMAPServiceForSearch) StartIdleListener(context.Context, string, *ws.Hub) { +} + type imapError struct { message string } diff --git a/backend/internal/api/settings_handler_test.go b/backend/internal/api/settings_handler_test.go index d78a731..ca4c503 100644 --- a/backend/internal/api/settings_handler_test.go +++ b/backend/internal/api/settings_handler_test.go @@ -334,8 +334,8 @@ func TestSettingsHandler_PostSettings(t *testing.T) { t.Run("returns 500 when GetUserSettings returns non-NotFound error in PostSettings", func(t *testing.T) { email := "dberror-post@example.com" - // Use a cancelled context to simulate database connection failure - cancelledCtx, cancel := context.WithCancel(context.Background()) + // Use a canceled context to simulate database connection failure + canceledCtx, cancel := context.WithCancel(context.Background()) cancel() reqBody := models.UserSettingsRequest{ @@ -351,7 +351,7 @@ func TestSettingsHandler_PostSettings(t *testing.T) { body, _ := json.Marshal(reqBody) req := httptest.NewRequest("POST", "/api/v1/settings", bytes.NewReader(body)) - reqCtx := context.WithValue(cancelledCtx, auth.UserEmailKey, email) + reqCtx := context.WithValue(canceledCtx, auth.UserEmailKey, email) req = req.WithContext(reqCtx) rr := httptest.NewRecorder() @@ -371,12 +371,12 @@ func TestSettingsHandler_PostSettings(t *testing.T) { t.Run("returns 500 when GetUserSettings returns non-NotFound error in GetSettings", func(t *testing.T) { email := "dberror-get@example.com" - // Use a cancelled context to simulate database connection failure - cancelledCtx, cancel := context.WithCancel(context.Background()) + // Use a canceled context to simulate database connection failure + canceledCtx, cancel := context.WithCancel(context.Background()) cancel() req := httptest.NewRequest("GET", "/api/v1/settings", nil) - reqCtx := context.WithValue(cancelledCtx, auth.UserEmailKey, email) + reqCtx := context.WithValue(canceledCtx, auth.UserEmailKey, email) req = req.WithContext(reqCtx) rr := httptest.NewRecorder() diff --git a/backend/internal/api/test_handler.go b/backend/internal/api/test_handler.go new file mode 100644 index 0000000..65b22b1 --- /dev/null +++ b/backend/internal/api/test_handler.go @@ -0,0 +1,188 @@ +package api + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "strings" + "time" + + "github.com/emersion/go-imap" + imapclient "github.com/emersion/go-imap/client" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/vdavid/vmail/backend/internal/crypto" + "github.com/vdavid/vmail/backend/internal/db" + imapinternal "github.com/vdavid/vmail/backend/internal/imap" + ws "github.com/vdavid/vmail/backend/internal/websocket" +) + +// TestHandler provides test-only endpoints used by E2E tests. +// These endpoints are only registered in test environments. +type TestHandler struct { + pool *pgxpool.Pool + encryptor *crypto.Encryptor + imapService imapinternal.IMAPService + hub *ws.Hub +} + +// NewTestHandler creates a new TestHandler instance. +func NewTestHandler(pool *pgxpool.Pool, encryptor *crypto.Encryptor, imapService imapinternal.IMAPService, hub *ws.Hub) *TestHandler { + return &TestHandler{ + pool: pool, + encryptor: encryptor, + imapService: imapService, + hub: hub, + } +} + +type addIMAPMessageRequest struct { + Folder string `json:"folder"` + Subject string `json:"subject"` + From string `json:"from"` + To string `json:"to"` +} + +// AddIMAPMessage appends a test message to the user's IMAP folder. +// It is used by E2E tests to simulate new incoming mail. +func (h *TestHandler) AddIMAPMessage(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + ctx := r.Context() + + userID, ok := GetUserIDFromContext(ctx, w, h.pool) + if !ok { + return + } + + req, err := h.parseRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + client, err := h.connectToIMAP(ctx, userID) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer func() { + _ = client.Logout() + }() + + if err := h.appendMessage(client, req); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + h.syncAndNotify(ctx, userID, req.Folder) + + w.WriteHeader(http.StatusNoContent) +} + +// parseRequest parses and validates the request body. +func (h *TestHandler) parseRequest(r *http.Request) (*addIMAPMessageRequest, error) { + var req addIMAPMessageRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return nil, fmt.Errorf("invalid JSON body") + } + + if req.Folder == "" { + req.Folder = "INBOX" + } + + if req.Subject == "" || req.From == "" || req.To == "" { + return nil, fmt.Errorf("subject, from, and to are required") + } + + return &req, nil +} + +// connectToIMAP connects to the IMAP server and selects the folder. +func (h *TestHandler) connectToIMAP(ctx context.Context, userID string) (*imapclient.Client, error) { + settings, err := db.GetUserSettings(ctx, h.pool, userID) + if err != nil { + log.Printf("TestHandler: failed to get user settings: %v", err) + return nil, fmt.Errorf("failed to get user settings") + } + + imapPassword, err := h.encryptor.Decrypt(settings.EncryptedIMAPPassword) + if err != nil { + log.Printf("TestHandler: failed to decrypt IMAP password: %v", err) + return nil, fmt.Errorf("failed to decrypt IMAP password") + } + + useTLS := os.Getenv("VMAIL_TEST_MODE") != "true" + + client, err := imapinternal.ConnectToIMAP(settings.IMAPServerHostname, useTLS) + if err != nil { + log.Printf("TestHandler: failed to connect to IMAP server: %v", err) + return nil, fmt.Errorf("failed to connect to IMAP server") + } + + if err := imapinternal.Login(client, settings.IMAPUsername, imapPassword); err != nil { + log.Printf("TestHandler: failed to login to IMAP server: %v", err) + _ = client.Logout() + return nil, fmt.Errorf("failed to login to IMAP server") + } + + return client, nil +} + +// appendMessage appends a message to the specified IMAP folder. +func (h *TestHandler) appendMessage(client *imapclient.Client, req *addIMAPMessageRequest) error { + // Select the folder. + if _, err := client.Select(req.Folder, false); err != nil { + log.Printf("TestHandler: failed to select folder %s: %v", req.Folder, err) + return fmt.Errorf("failed to select IMAP folder") + } + + // Construct a simple RFC 822 message. + messageID := fmt.Sprintf("", time.Now().UnixNano()) + now := time.Now() + messageBody := fmt.Sprintf(`Message-ID: %s +Date: %s +From: %s +To: %s +Subject: %s +Content-Type: text/plain; charset=utf-8 + +E2E test message. +`, messageID, now.Format(time.RFC1123Z), req.From, req.To, req.Subject) + + flags := []string{imap.SeenFlag} + if err := client.Append(req.Folder, flags, now, strings.NewReader(messageBody)); err != nil { + log.Printf("TestHandler: failed to append message: %v", err) + return fmt.Errorf("failed to append message to IMAP folder") + } + + return nil +} + +// syncAndNotify syncs the folder and sends a WebSocket notification. +func (h *TestHandler) syncAndNotify(ctx context.Context, userID, folder string) { + if err := h.imapService.SyncThreadsForFolder(ctx, userID, folder); err != nil { + log.Printf("TestHandler: failed to sync folder %s for user %s: %v", folder, userID, err) + return + } + + msg := struct { + Type string `json:"type"` + Folder string `json:"folder"` + }{ + Type: "new_email", + Folder: folder, + } + payload, err := json.Marshal(msg) + if err != nil { + log.Printf("TestHandler: failed to marshal new_email message: %v", err) + return + } + + h.hub.Send(userID, payload) +} diff --git a/backend/internal/api/thread_handler_test.go b/backend/internal/api/thread_handler_test.go index c64b2f7..38c0b4a 100644 --- a/backend/internal/api/thread_handler_test.go +++ b/backend/internal/api/thread_handler_test.go @@ -15,6 +15,7 @@ import ( "github.com/vdavid/vmail/backend/internal/imap" "github.com/vdavid/vmail/backend/internal/models" "github.com/vdavid/vmail/backend/internal/testutil" + ws "github.com/vdavid/vmail/backend/internal/websocket" ) func TestThreadHandler_GetThread(t *testing.T) { @@ -253,12 +254,12 @@ func TestThreadHandler_GetThread(t *testing.T) { email := "dberror-thread@example.com" setupTestUserAndSettings(t, pool, encryptor, email) - // Use a cancelled context to simulate database connection failure - cancelledCtx, cancel := context.WithCancel(context.Background()) + // Use a canceled context to simulate database connection failure + canceledCtx, cancel := context.WithCancel(context.Background()) cancel() req := httptest.NewRequest("GET", "/api/v1/thread/test-thread-id", nil) - reqCtx := context.WithValue(cancelledCtx, auth.UserEmailKey, email) + reqCtx := context.WithValue(canceledCtx, auth.UserEmailKey, email) req = req.WithContext(reqCtx) rr := httptest.NewRecorder() @@ -284,12 +285,12 @@ func TestThreadHandler_GetThread(t *testing.T) { t.Fatalf("Failed to save thread: %v", err) } - // Use a cancelled context to simulate database error when getting messages - cancelledCtx, cancel := context.WithCancel(context.Background()) + // Use a canceled context to simulate database error when getting messages + canceledCtx, cancel := context.WithCancel(context.Background()) cancel() req := httptest.NewRequest("GET", "/api/v1/thread/thread-db-error", nil) - reqCtx := context.WithValue(cancelledCtx, auth.UserEmailKey, email) + reqCtx := context.WithValue(canceledCtx, auth.UserEmailKey, email) req = req.WithContext(reqCtx) rr := httptest.NewRecorder() @@ -545,6 +546,10 @@ func (m *mockIMAPServiceForThread) Search(context.Context, string, string, int, func (m *mockIMAPServiceForThread) Close() {} +// StartIdleListener is part of the IMAPService interface but is not used in thread handler tests. +func (m *mockIMAPServiceForThread) StartIdleListener(context.Context, string, *ws.Hub) { +} + func TestThreadHandler_SyncsMissingBodies(t *testing.T) { pool := testutil.NewTestDB(t) defer pool.Close() diff --git a/backend/internal/api/threads_handler_test.go b/backend/internal/api/threads_handler_test.go index 491d400..f1c6024 100644 --- a/backend/internal/api/threads_handler_test.go +++ b/backend/internal/api/threads_handler_test.go @@ -14,6 +14,7 @@ import ( "github.com/vdavid/vmail/backend/internal/imap" "github.com/vdavid/vmail/backend/internal/models" "github.com/vdavid/vmail/backend/internal/testutil" + ws "github.com/vdavid/vmail/backend/internal/websocket" ) func TestThreadsHandler_GetThreads(t *testing.T) { @@ -264,6 +265,10 @@ func (m *mockIMAPService) Search(context.Context, string, string, int, int) ([]* func (m *mockIMAPService) Close() {} +// StartIdleListener is part of the IMAPService interface but is not used in threads handler tests. +func (m *mockIMAPService) StartIdleListener(context.Context, string, *ws.Hub) { +} + func TestThreadsHandler_SyncsWhenStale(t *testing.T) { pool := testutil.NewTestDB(t) defer pool.Close() @@ -413,8 +418,8 @@ func TestThreadsHandler_SyncsWhenStale(t *testing.T) { email := "threads-error@example.com" setupTestUserAndSettings(t, pool, encryptor, email) - // Use a cancelled context to simulate database error - cancelledCtx, cancel := context.WithCancel(context.Background()) + // Use a canceled context to simulate database error + canceledCtx, cancel := context.WithCancel(context.Background()) cancel() mockIMAP := &mockIMAPService{ @@ -424,7 +429,7 @@ func TestThreadsHandler_SyncsWhenStale(t *testing.T) { handler := NewThreadsHandler(pool, encryptor, mockIMAP) req := httptest.NewRequest("GET", "/api/v1/threads?folder=INBOX", nil) - reqCtx := context.WithValue(cancelledCtx, auth.UserEmailKey, email) + reqCtx := context.WithValue(canceledCtx, auth.UserEmailKey, email) req = req.WithContext(reqCtx) rr := httptest.NewRecorder() @@ -450,11 +455,11 @@ func TestThreadsHandler_SyncsWhenStale(t *testing.T) { t.Fatalf("Failed to save thread: %v", err) } - // Use a cancelled context to simulate database error when counting - // We need to create the user first, then use cancelled context - cancelledCtx, cancel := context.WithCancel(context.Background()) + // Use a canceled context to simulate database error when counting + // We need to create the user first, then use canceled context + canceledCtx, cancel := context.WithCancel(context.Background()) cancel() - reqCtx := context.WithValue(cancelledCtx, auth.UserEmailKey, email) + reqCtx := context.WithValue(canceledCtx, auth.UserEmailKey, email) req := httptest.NewRequest("GET", "/api/v1/threads?folder=INBOX", nil) req = req.WithContext(reqCtx) @@ -468,7 +473,7 @@ func TestThreadsHandler_SyncsWhenStale(t *testing.T) { handler.GetThreads(rr, req) // Note: This test is tricky because GetThreadsForFolder is called before GetThreadCountForFolder - // and both use the same context. The cancelled context will cause GetThreadsForFolder to fail first. + // and both use the same context. The canceled context will cause GetThreadsForFolder to fail first. // So we expect 500, but it's from GetThreadsForFolder, not GetThreadCountForFolder. // This still tests error handling, just at an earlier point. if rr.Code != http.StatusInternalServerError { diff --git a/backend/internal/api/ws_handler.go b/backend/internal/api/ws_handler.go new file mode 100644 index 0000000..7bee7bc --- /dev/null +++ b/backend/internal/api/ws_handler.go @@ -0,0 +1,178 @@ +package api + +import ( + "context" + "log" + "net/http" + "strings" + "sync" + + "github.com/gorilla/websocket" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/vdavid/vmail/backend/internal/auth" + "github.com/vdavid/vmail/backend/internal/db" + "github.com/vdavid/vmail/backend/internal/imap" + ws "github.com/vdavid/vmail/backend/internal/websocket" +) + +// WebSocketHandler handles the /api/v1/ws endpoint for real-time updates. +type WebSocketHandler struct { + pool *pgxpool.Pool + imap interface { + imap.IMAPService + StartIdleListener(ctx context.Context, userID string, hub *ws.Hub) + } + hub *ws.Hub + mu sync.Mutex + idleCancels map[string]context.CancelFunc +} + +// NewWebSocketHandler creates a new WebSocketHandler instance. +func NewWebSocketHandler(pool *pgxpool.Pool, imapService imap.IMAPService, hub *ws.Hub) *WebSocketHandler { + return &WebSocketHandler{ + pool: pool, + imap: imapService, + hub: hub, + idleCancels: make(map[string]context.CancelFunc), + } +} + +var wsUpgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + // For now, allow all origins. This server is expected to be used + // behind a reverse proxy in a trusted environment. + // TODO Review this decision and add CORS headers later if needed. + return true + }, +} + +// Handle upgrades the HTTP connection to a WebSocket and registers it with the Hub. +// Authentication is handled via query parameter (?token=...) since WebSocket connections +// cannot set custom headers in browsers. +// The token is validated using the same ValidateToken function used by the RequireAuth middleware. +func (h *WebSocketHandler) Handle(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + // Extract token from the query parameter + token := r.URL.Query().Get("token") + if token == "" { + // Fallback to Authorization header if query parameter is not provided. + // This allows testing with tools that can set headers. + authHeader := r.Header.Get("Authorization") + if authHeader != "" { + fields := strings.Fields(authHeader) + if len(fields) >= 2 && strings.EqualFold(fields[0], "Bearer") { + token = strings.TrimSpace(strings.Join(fields[1:], " ")) + } + } + } + + if token == "" { + log.Printf("WebSocketHandler: No token provided (neither query parameter nor Authorization header)") + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + // Validate token using the same function as the RequireAuth middleware. + userEmail, err := auth.ValidateToken(token) + if err != nil { + log.Printf("WebSocketHandler: Token validation failed: %v", err) + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + // Get or create the user by their email address. + userID, err := db.GetOrCreateUser(ctx, h.pool, userEmail) + if err != nil { + log.Printf("WebSocketHandler: Failed to get/create user: %v", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + // Upgrade the connection to a WebSocket + conn, err := wsUpgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("WebSocketHandler: failed to upgrade connection for user %s: %v", userID, err) + return + } + + // Check if this is the first connection for this user (before registering). + // This determines if we need to catch up on missed emails. + isFirstConnection := h.hub.ActiveConnections(userID) == 0 + + client := h.hub.Register(userID, conn) + if client == nil { + log.Printf("WebSocketHandler: Connection rejected for user %s (max connections exceeded)", userID) + return + } + + // Ensure an IDLE listener is running for this user. + h.ensureIdleListener(userID) + + // If this is the first connection, immediately sync INBOX to catch up on missed emails. + // This ensures emails that arrived while no WebSocket was connected are synced. + if isFirstConnection { + go func() { + // Use background context to avoid cancellation if the request context is canceled. + // The sync should complete even if the WebSocket connection is established. + syncCtx := context.Background() + if err := h.imap.SyncThreadsForFolder(syncCtx, userID, "INBOX"); err != nil { + log.Printf("WebSocketHandler: Failed to sync INBOX for user %s on connection: %v", userID, err) + } + }() + } + + // Read loop to keep the connection open and detect disconnects. + go h.readLoop(userID, client) +} + +// ensureIdleListener starts an IMAP IDLE listener for the user if one is not already running. +func (h *WebSocketHandler) ensureIdleListener(userID string) { + h.mu.Lock() + defer h.mu.Unlock() + + if _, exists := h.idleCancels[userID]; exists { + return + } + + idleCtx, cancel := context.WithCancel(context.Background()) + h.idleCancels[userID] = cancel + + // Start the IDLE listener in a separate goroutine. + go func(ctx context.Context, uid string, cancelFn context.CancelFunc) { + h.imap.StartIdleListener(ctx, uid, h.hub) + + // When the listener exits, clean up the cancel function if it's still registered. + h.mu.Lock() + delete(h.idleCancels, uid) + h.mu.Unlock() + }(idleCtx, userID, cancel) +} + +// readLoop reads messages from the WebSocket until the connection is closed. +// When the connection closes, it unregisters the client and may stop the IDLE listener +// if there are no more active connections for the user. +func (h *WebSocketHandler) readLoop(userID string, client *ws.Client) { + conn := client.Conn() + + for { + if _, _, err := conn.ReadMessage(); err != nil { + break + } + } + + // Unregister the client and close the connection. + h.hub.Unregister(userID, client) + + // If there are no active connections left for this user, stop the IDLE listener. + if h.hub.ActiveConnections(userID) == 0 { + // Temporary logging + log.Printf("WebSocketHandler: No active connections remaining for user %s, stopping IDLE listener", userID) + h.mu.Lock() + if cancel, exists := h.idleCancels[userID]; exists { + cancel() + delete(h.idleCancels, userID) + } + h.mu.Unlock() + } +} diff --git a/backend/internal/api/ws_handler_test.go b/backend/internal/api/ws_handler_test.go new file mode 100644 index 0000000..8686a38 --- /dev/null +++ b/backend/internal/api/ws_handler_test.go @@ -0,0 +1,164 @@ +package api + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/vdavid/vmail/backend/internal/imap" + "github.com/vdavid/vmail/backend/internal/models" + "github.com/vdavid/vmail/backend/internal/testutil" + ws "github.com/vdavid/vmail/backend/internal/websocket" +) + +func TestWebSocketHandler_Connection(t *testing.T) { + pool := testutil.NewTestDB(t) + defer pool.Close() + + // Create a mock IMAP service + mockIMAP := &mockIMAPServiceForWS{ + startIdleListenerCalled: false, + startIdleListenerCtx: make(chan context.Context, 1), + } + + hub := ws.NewHub(10) + handler := NewWebSocketHandler(pool, mockIMAP, hub) + + // Create test server + server := httptest.NewServer(http.HandlerFunc(handler.Handle)) + defer server.Close() + + // Convert http:// to ws:// + wsURL := "ws" + server.URL[4:] + "?token=token" + + t.Run("connects successfully and stays open", func(t *testing.T) { + conn, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer func(conn *websocket.Conn) { + err := conn.Close() + if err != nil { + t.Errorf("Failed to close connection: %v", err) + } + }(conn) + + if resp.StatusCode != http.StatusSwitchingProtocols { + t.Errorf("Expected status 101, got %d", resp.StatusCode) + } + + t.Log("Connection established successfully") + + // Verify connection stays open for at least 3-4 seconds + // We'll read messages in a goroutine and check if the connection closes prematurely + done := make(chan error, 1) + messageReceived := make(chan bool, 1) + + go func() { + for { + messageType, message, err := conn.ReadMessage() + if err != nil { + done <- err + return + } + t.Logf("Received message: type=%d, content=%s", messageType, string(message)) + select { + case messageReceived <- true: + default: + } + } + }() + + // Wait a number of seconds and check if the connection is still open + startTime := time.Now() + select { + case err := <-done: + duration := time.Since(startTime) + t.Errorf("Connection closed unexpectedly after %v: %v", duration, err) + case <-messageReceived: + t.Log("Received a message (connection is active)") + // Continue waiting to see if it stays open + select { + case err := <-done: + duration := time.Since(startTime) + t.Errorf("Connection closed after receiving message (after %v): %v", duration, err) + case <-time.After(4 * time.Second): + duration := time.Since(startTime) + t.Logf("Connection stayed open for %v after message", duration) + } + case <-time.After(5 * time.Second): + // Connection is still open - good! + duration := time.Since(startTime) + t.Logf("Connection stayed open for %v - SUCCESS", duration) + } + + // Check if IDLE listener was started + if !mockIMAP.startIdleListenerCalled { + t.Error("Expected StartIdleListener to be called") + } + }) + + t.Run("rejects connection without token", func(t *testing.T) { + wsURLNoToken := "ws" + server.URL[4:] + _, resp, err := websocket.DefaultDialer.Dial(wsURLNoToken, nil) + if err == nil { + t.Error("Expected connection to fail without token") + if resp != nil { + err := resp.Body.Close() + if err != nil { + return + } + } + } else { + t.Logf("Correctly rejected connection without token: %v", err) + } + }) + + t.Run("rejects invalid token", func(t *testing.T) { + // TODO: This test is skipped because ValidateToken is currently a stub that accepts all tokens. + // Once proper token validation is implemented, this test should verify rejection of invalid tokens. + t.Skip("Token validation is not yet implemented - ValidateToken accepts all non-empty tokens") + }) +} + +// mockIMAPServiceForWS is a mock implementation of IMAPService for WebSocket tests +type mockIMAPServiceForWS struct { + startIdleListenerCalled bool + startIdleListenerCtx chan context.Context +} + +func (m *mockIMAPServiceForWS) StartIdleListener(ctx context.Context, _ string, _ *ws.Hub) { + m.startIdleListenerCalled = true + select { + case m.startIdleListenerCtx <- ctx: + default: + } + // Block until context is canceled (simulating IDLE) + <-ctx.Done() +} + +// Implement other required IMAPService methods (return nil/empty for now) +func (m *mockIMAPServiceForWS) SyncThreadsForFolder(context.Context, string, string) error { + return nil +} + +func (m *mockIMAPServiceForWS) ShouldSyncFolder(context.Context, string, string) (bool, error) { + return false, nil +} + +func (m *mockIMAPServiceForWS) SyncFullMessage(context.Context, string, string, int64) error { + return nil +} + +func (m *mockIMAPServiceForWS) SyncFullMessages(context.Context, string, []imap.MessageToSync) error { + return nil +} + +func (m *mockIMAPServiceForWS) Search(context.Context, string, string, int, int) ([]*models.Thread, int, error) { + return nil, 0, nil +} + +func (m *mockIMAPServiceForWS) Close() {} diff --git a/backend/internal/imap/idle.go b/backend/internal/imap/idle.go new file mode 100644 index 0000000..8b1db76 --- /dev/null +++ b/backend/internal/imap/idle.go @@ -0,0 +1,150 @@ +package imap + +import ( + "context" + "encoding/json" + "log" + "time" + + idle "github.com/emersion/go-imap-idle" + imapclient "github.com/emersion/go-imap/client" + "github.com/vdavid/vmail/backend/internal/websocket" +) + +// idleListenerSleep is the backoff duration after an error before retrying IDLE. +const idleListenerSleep = 10 * time.Second + +// StartIdleListener runs an IMAP IDLE loop for a user and pushes new email events to the Hub. +// It listens on the INBOX folder only. +// This function blocks until the context is canceled. +func (s *Service) StartIdleListener(ctx context.Context, userID string, hub *websocket.Hub) { + for { + // Exit when context is canceled. + select { + case <-ctx.Done(): + return + default: + } + + // If the user has no active WebSocket connections, avoid doing work. + if hub.ActiveConnections(userID) == 0 { + time.Sleep(idleListenerSleep) + continue + } + + listener, err := s.getListenerConnection(ctx, userID) + if err != nil { + time.Sleep(idleListenerSleep) + continue + } + + // Ensure we always unlock the listener. + func() { + defer listener.Unlock() + s.runIdleLoop(ctx, userID, listener.GetClient(), hub) + }() + + // Small backoff before trying again. + time.Sleep(idleListenerSleep) + } +} + +// getListenerConnection gets settings and establishes a listener connection. +func (s *Service) getListenerConnection(ctx context.Context, userID string) (ListenerClient, error) { + settings, imapPassword, err := s.getSettingsAndPassword(ctx, userID) + if err != nil { + log.Printf("IMAP IDLE: failed to get settings for user %s: %v", userID, err) + return nil, err + } + + listener, err := s.imapPool.GetListenerConnection(userID, settings.IMAPServerHostname, settings.IMAPUsername, imapPassword) + if err != nil { + log.Printf("IMAP IDLE: failed to get listener connection for user %s: %v", userID, err) + return nil, err + } + + return listener, nil +} + +// runIdleLoop runs the IDLE command and handles mailbox updates. +func (s *Service) runIdleLoop(ctx context.Context, userID string, client *imapclient.Client, hub *websocket.Hub) { + // Select INBOX for IDLE. + if _, err := client.Select("INBOX", false); err != nil { + log.Printf("IMAP IDLE: failed to select INBOX for user %s: %v", userID, err) + s.imapPool.RemoveListenerConnection(userID) + return + } + + idleClient := idle.NewClient(client) + + // Create a channel to receive mailbox updates. + updates := make(chan imapclient.Update, 10) + client.Updates = updates + + // Start IDLE in a goroutine so we can listen for updates. + stop := make(chan struct{}) + done := make(chan error, 1) + go func() { + done <- idleClient.IdleWithFallback(stop, 5*time.Second) + }() + + for { + select { + case <-ctx.Done(): + // Stop idling and return. + close(stop) + return + case err := <-done: + if err != nil { + log.Printf("IMAP IDLE: idle loop ended with error for user %s: %v", userID, err) + s.imapPool.RemoveListenerConnection(userID) + } + return + case update := <-updates: + if update == nil { + continue + } + s.handleMailboxUpdate(ctx, userID, update, hub) + } + } +} + +// handleMailboxUpdate processes a mailbox update and syncs/notifies if needed. +func (s *Service) handleMailboxUpdate(ctx context.Context, userID string, update imapclient.Update, hub *websocket.Hub) { + // MailboxUpdate updates can indicate new messages. + mboxUpdate, ok := update.(*imapclient.MailboxUpdate) + if !ok || mboxUpdate.Mailbox == nil { + return + } + + status := mboxUpdate.Mailbox + if status.Name != "INBOX" || status.Messages == 0 { + return + } + + // Perform incremental sync for INBOX immediately. + if err := s.SyncThreadsForFolder(ctx, userID, "INBOX"); err != nil { + log.Printf("IMAP IDLE: failed to sync INBOX for user %s: %v", userID, err) + return + } + + // Notify frontend via WebSocket. + s.sendNewEmailNotification(userID, hub) +} + +// sendNewEmailNotification sends a WebSocket notification about new email. +func (s *Service) sendNewEmailNotification(userID string, hub *websocket.Hub) { + msg := struct { + Type string `json:"type"` + Folder string `json:"folder"` + }{ + Type: "new_email", + Folder: "INBOX", + } + payload, err := json.Marshal(msg) + if err != nil { + log.Printf("IMAP IDLE: failed to marshal new_email message: %v", err) + return + } + hub.Send(userID, payload) +} diff --git a/backend/internal/imap/idle_integration_test.go b/backend/internal/imap/idle_integration_test.go new file mode 100644 index 0000000..c3e84b9 --- /dev/null +++ b/backend/internal/imap/idle_integration_test.go @@ -0,0 +1,420 @@ +package imap + +import ( + "context" + "os" + "testing" + "time" + + "github.com/vdavid/vmail/backend/internal/db" + "github.com/vdavid/vmail/backend/internal/models" + "github.com/vdavid/vmail/backend/internal/testutil" +) + +// TestSyncThreadsForFolder_DetectsNewEmail tests the full flow: +// 1. Add initial emails to IMAP and sync them +// 2. List emails from database (get initial count) +// 3. Add a new email to IMAP server +// 4. Sync again (simulating what IDLE listener would do) +// 5. List emails again and verify the new email appears +func TestSyncThreadsForFolder_DetectsNewEmail(t *testing.T) { + // Set test mode to disable TLS for test IMAP server + err := os.Setenv("VMAIL_TEST_MODE", "true") + if err != nil { + t.Fatalf("Failed to set VMAIL_TEST_MODE: %v", err) + } + defer func() { + err := os.Unsetenv("VMAIL_TEST_MODE") + if err != nil { + t.Fatalf("Failed to unset VMAIL_TEST_MODE: %v", err) + } + }() + + pool := testutil.NewTestDB(t) + defer pool.Close() + + // Setup test IMAP server + server := testutil.NewTestIMAPServer(t) + defer server.Close() + + // Ensure INBOX exists + server.EnsureINBOX(t) + + encryptor := getTestEncryptor(t) + service := NewService(pool, NewPool(), encryptor) + defer service.Close() + + ctx := context.Background() + userID, err := db.GetOrCreateUser(ctx, pool, "sync-test@example.com") + if err != nil { + t.Fatalf("Failed to create user: %v", err) + } + + // Save user settings with the test IMAP server + password := server.Password() + encryptedPassword, err := encryptor.Encrypt(password) + if err != nil { + t.Fatalf("Failed to encrypt password: %v", err) + } + + // Also encrypt SMTP password (required field) + encryptedSMTPPassword, err := encryptor.Encrypt(password) + if err != nil { + t.Fatalf("Failed to encrypt SMTP password: %v", err) + } + + settings := &models.UserSettings{ + UserID: userID, + IMAPServerHostname: server.Address, + IMAPUsername: server.Username(), + EncryptedIMAPPassword: encryptedPassword, + EncryptedSMTPPassword: encryptedSMTPPassword, + } + err = db.SaveUserSettings(ctx, pool, settings) + if err != nil { + t.Fatalf("Failed to save user settings: %v", err) + } + + folderName := "INBOX" + + // Step 1: Add initial emails to IMAP + now := time.Now() + initialMessageID1 := "" + initialMessageID2 := "" + _ = server.AddMessage(t, folderName, initialMessageID1, "Initial Email 1", "from1@test.com", "to@test.com", now.Add(-2*time.Hour)) + _ = server.AddMessage(t, folderName, initialMessageID2, "Initial Email 2", "from2@test.com", "to@test.com", now.Add(-1*time.Hour)) + + // Step 2: Sync initial emails + err = service.SyncThreadsForFolder(ctx, userID, folderName) + if err != nil { + t.Fatalf("Failed to sync initial emails: %v", err) + } + + // Step 3: List emails from database (get initial count) + initialThreads, err := db.GetThreadsForFolder(ctx, pool, userID, folderName, 100, 0) + if err != nil { + t.Fatalf("Failed to get initial threads: %v", err) + } + initialCount := len(initialThreads) + t.Logf("Initial thread count: %d", initialCount) + + // Verify we have the initial emails + if initialCount < 2 { + t.Errorf("Expected at least 2 initial threads, got %d", initialCount) + } + + // Verify the initial emails are in the database + foundInitial1 := false + foundInitial2 := false + for _, thread := range initialThreads { + if thread.StableThreadID == initialMessageID1 { + foundInitial1 = true + } + if thread.StableThreadID == initialMessageID2 { + foundInitial2 = true + } + } + if !foundInitial1 { + t.Error("Initial email 1 not found in database") + } + if !foundInitial2 { + t.Error("Initial email 2 not found in database") + } + + // Step 4: Add a new email to IMAP server + newMessageID := "" + newSubject := "New Email Subject" + _ = server.AddMessage(t, folderName, newMessageID, newSubject, "newfrom@test.com", "to@test.com", now) + + // Step 5: Sync again (simulating what IDLE listener would do) + err = service.SyncThreadsForFolder(ctx, userID, folderName) + if err != nil { + t.Fatalf("Failed to sync new email: %v", err) + } + + // Step 6: List emails again and verify the new email appears + updatedThreads, err := db.GetThreadsForFolder(ctx, pool, userID, folderName, 100, 0) + if err != nil { + t.Fatalf("Failed to get updated threads: %v", err) + } + updatedCount := len(updatedThreads) + t.Logf("Updated thread count: %d", updatedCount) + + // Verify the count increased + if updatedCount <= initialCount { + t.Errorf("Expected thread count to increase, got %d (was %d)", updatedCount, initialCount) + } + + // Verify the new email is in the database + foundNewEmail := false + for _, thread := range updatedThreads { + if thread.StableThreadID == newMessageID { + foundNewEmail = true + if thread.Subject != newSubject { + t.Errorf("Expected subject %s, got %s", newSubject, thread.Subject) + } + break + } + } + if !foundNewEmail { + t.Errorf("New email with Message-ID %s not found in database. Threads: %v", newMessageID, getThreadIDs(updatedThreads)) + } + + // Verify the message was saved correctly + msg, err := db.GetMessageByMessageID(ctx, pool, userID, newMessageID) + if err != nil { + t.Fatalf("Failed to get new message from database: %v", err) + } + if msg.Subject != newSubject { + t.Errorf("Expected message subject %s, got %s", newSubject, msg.Subject) + } + if msg.IMAPFolderName != folderName { + t.Errorf("Expected folder %s, got %s", folderName, msg.IMAPFolderName) + } +} + +// TestSyncThreadsForFolder_IncrementalSync verifies that incremental sync +// correctly picks up new emails after the last synced UID. +func TestSyncThreadsForFolder_IncrementalSync(t *testing.T) { + // Set test mode to disable TLS for test IMAP server + err := os.Setenv("VMAIL_TEST_MODE", "true") + if err != nil { + t.Fatalf("Failed to set VMAIL_TEST_MODE: %v", err) + } + defer func() { + err := os.Unsetenv("VMAIL_TEST_MODE") + if err != nil { + t.Fatalf("Failed to unset VMAIL_TEST_MODE: %v", err) + } + }() + + pool := testutil.NewTestDB(t) + defer pool.Close() + + // Ensure folder_sync_timestamps table exists + ctx := context.Background() + _, err2 := pool.Exec(ctx, ` + CREATE TABLE IF NOT EXISTS folder_sync_timestamps ( + user_id UUID NOT NULL REFERENCES users (id) ON DELETE CASCADE, + folder_name TEXT NOT NULL, + synced_at TIMESTAMPTZ NOT NULL DEFAULT now(), + last_synced_uid BIGINT, + thread_count INT DEFAULT 0, + PRIMARY KEY (user_id, folder_name) + ) + `) + if err2 != nil { + t.Fatalf("Failed to ensure folder_sync_timestamps table exists: %v", err2) + } + + // Setup test IMAP server + server := testutil.NewTestIMAPServer(t) + defer server.Close() + + // Ensure INBOX exists + server.EnsureINBOX(t) + + encryptor := getTestEncryptor(t) + service := NewService(pool, NewPool(), encryptor) + defer service.Close() + + userID, err := db.GetOrCreateUser(ctx, pool, "incremental-sync-test@example.com") + if err != nil { + t.Fatalf("Failed to create user: %v", err) + } + + // Save user settings with the test IMAP server + password := server.Password() + encryptedPassword, err := encryptor.Encrypt(password) + if err != nil { + t.Fatalf("Failed to encrypt password: %v", err) + } + + encryptedSMTPPassword, err := encryptor.Encrypt(password) + if err != nil { + t.Fatalf("Failed to encrypt SMTP password: %v", err) + } + + settings := &models.UserSettings{ + UserID: userID, + IMAPServerHostname: server.Address, + IMAPUsername: server.Username(), + EncryptedIMAPPassword: encryptedPassword, + EncryptedSMTPPassword: encryptedSMTPPassword, + } + err = db.SaveUserSettings(ctx, pool, settings) + if err != nil { + t.Fatalf("Failed to save user settings: %v", err) + } + + folderName := "INBOX" + + // Add initial messages and sync + now := time.Now() + _ = server.AddMessage(t, folderName, "", "Initial 1", "from@test.com", "to@test.com", now.Add(-2*time.Hour)) + uid2 := server.AddMessage(t, folderName, "", "Initial 2", "from@test.com", "to@test.com", now.Add(-1*time.Hour)) + + // Sync initial messages + err = service.SyncThreadsForFolder(ctx, userID, folderName) + if err != nil { + t.Fatalf("Failed to sync initial messages: %v", err) + } + + // Set sync info to uid2 (we've synced up to uid2) + lastUID := int64(uid2) + err = db.SetFolderSyncInfo(ctx, pool, userID, folderName, &lastUID) + if err != nil { + t.Fatalf("Failed to set folder sync info: %v", err) + } + + // Add a new message after uid2 + newUID := server.AddMessage(t, folderName, "", "New Message", "from@test.com", "to@test.com", now) + + // Sync again - should use incremental sync + err = service.SyncThreadsForFolder(ctx, userID, folderName) + if err != nil { + t.Fatalf("Failed to sync new message: %v", err) + } + + // Verify the new message is in the database + msg, err := db.GetMessageByMessageID(ctx, pool, userID, "") + if err != nil { + t.Fatalf("Failed to get new message: %v", err) + } + + if msg.IMAPUID != int64(newUID) { + t.Errorf("Expected UID %d, got %d", newUID, msg.IMAPUID) + } + + // Verify sync info was updated + syncInfo, err := db.GetFolderSyncInfo(ctx, pool, userID, folderName) + if err != nil { + t.Fatalf("Failed to get sync info: %v", err) + } + if syncInfo == nil { + t.Fatal("Expected sync info to exist after sync") + } + + if syncInfo.LastSyncedUID == nil || *syncInfo.LastSyncedUID != int64(newUID) { + t.Errorf("Expected LastSyncedUID to be %d, got %v", newUID, syncInfo.LastSyncedUID) + } +} + +// TestSyncThreadsForFolder_CatchesUpOnMissedEmails tests that when syncing +// after a period of inactivity, all missed emails are synced. +// This simulates the scenario where emails arrive while WebSocket is disconnected, +// and then WebSocket connects and triggers a sync. +func TestSyncThreadsForFolder_CatchesUpOnMissedEmails(t *testing.T) { + // Set test mode to disable TLS for test IMAP server + err := os.Setenv("VMAIL_TEST_MODE", "true") + if err != nil { + t.Fatalf("Failed to set VMAIL_TEST_MODE: %v", err) + } + defer func() { + err := os.Unsetenv("VMAIL_TEST_MODE") + if err != nil { + t.Fatalf("Failed to unset VMAIL_TEST_MODE: %v", err) + } + }() + + pool := testutil.NewTestDB(t) + defer pool.Close() + + server := testutil.NewTestIMAPServer(t) + defer server.Close() + server.EnsureINBOX(t) + + encryptor := getTestEncryptor(t) + service := NewService(pool, NewPool(), encryptor) + defer service.Close() + + ctx := context.Background() + userID, err := db.GetOrCreateUser(ctx, pool, "catchup-test@example.com") + if err != nil { + t.Fatalf("Failed to create user: %v", err) + } + + // Save user settings + password := server.Password() + encryptedPassword, _ := encryptor.Encrypt(password) + encryptedSMTPPassword, _ := encryptor.Encrypt(password) + settings := &models.UserSettings{ + UserID: userID, + IMAPServerHostname: server.Address, + IMAPUsername: server.Username(), + EncryptedIMAPPassword: encryptedPassword, + EncryptedSMTPPassword: encryptedSMTPPassword, + } + _ = db.SaveUserSettings(ctx, pool, settings) + + folderName := "INBOX" + now := time.Now() + + // Initial sync: add and sync some emails + uid1 := server.AddMessage(t, folderName, "", "Initial 1", "from@test.com", "to@test.com", now.Add(-2*time.Hour)) + _ = server.AddMessage(t, folderName, "", "Initial 2", "from@test.com", "to@test.com", now.Add(-1*time.Hour)) + + err = service.SyncThreadsForFolder(ctx, userID, folderName) + if err != nil { + t.Fatalf("Failed to sync initial emails: %v", err) + } + + // Set sync info to uid1 (simulating that we've synced up to uid1) + // This simulates a previous sync that happened before WebSocket disconnected + lastUID := int64(uid1) + err = db.SetFolderSyncInfo(ctx, pool, userID, folderName, &lastUID) + if err != nil { + t.Fatalf("Failed to set folder sync info: %v", err) + } + + // Simulate emails arriving while WebSocket was disconnected + missedMessageID1 := "" + missedMessageID2 := "" + _ = server.AddMessage(t, folderName, missedMessageID1, "Missed Email 1", "from@test.com", "to@test.com", now.Add(-30*time.Minute)) + _ = server.AddMessage(t, folderName, missedMessageID2, "Missed Email 2", "from@test.com", "to@test.com", now) + + // Now simulate WebSocket connecting and triggering sync + // This should catch up on all missed emails using incremental sync + err = service.SyncThreadsForFolder(ctx, userID, folderName) + if err != nil { + t.Fatalf("Failed to sync missed emails: %v", err) + } + + // Verify both missed emails are in the database + msg1, err := db.GetMessageByMessageID(ctx, pool, userID, missedMessageID1) + if err != nil { + t.Fatalf("Missed email 1 not found in database: %v", err) + } + if msg1.Subject != "Missed Email 1" { + t.Errorf("Expected subject 'Missed Email 1', got %s", msg1.Subject) + } + + msg2, err := db.GetMessageByMessageID(ctx, pool, userID, missedMessageID2) + if err != nil { + t.Fatalf("Missed email 2 not found in database: %v", err) + } + if msg2.Subject != "Missed Email 2" { + t.Errorf("Expected subject 'Missed Email 2', got %s", msg2.Subject) + } + + // Verify sync info was updated to the highest UID + syncInfo, err := db.GetFolderSyncInfo(ctx, pool, userID, folderName) + if err != nil { + t.Fatalf("Failed to get sync info: %v", err) + } + if syncInfo == nil { + t.Fatal("Expected sync info to exist after sync") + } + if syncInfo.LastSyncedUID == nil { + t.Error("Expected LastSyncedUID to be set after sync") + } +} + +// getThreadIDs is a helper to extract thread IDs for debugging. +func getThreadIDs(threads []*models.Thread) []string { + ids := make([]string, len(threads)) + for i, thread := range threads { + ids[i] = thread.StableThreadID + } + return ids +} diff --git a/backend/internal/imap/pool_cleanup.go b/backend/internal/imap/pool_cleanup.go index 638d279..5e807f8 100644 --- a/backend/internal/imap/pool_cleanup.go +++ b/backend/internal/imap/pool_cleanup.go @@ -5,7 +5,7 @@ import ( ) // startCleanupGoroutine runs a background goroutine that periodically cleans up idle connections. -// The goroutine will stop when cleanupCtx is cancelled (via Pool.Close()). +// The goroutine will stop when cleanupCtx is canceled (via Pool.Close()). func (p *Pool) startCleanupGoroutine() { ticker := time.NewTicker(1 * time.Minute) go func() { @@ -13,7 +13,7 @@ func (p *Pool) startCleanupGoroutine() { for { select { case <-p.cleanupCtx.Done(): - // Context cancelled - stop the ticker and exit + // Context canceled - stop the ticker and exit return case <-ticker.C: // Periodic cleanup diff --git a/backend/internal/imap/pool_interface.go b/backend/internal/imap/pool_interface.go index 86b2d41..5544320 100644 --- a/backend/internal/imap/pool_interface.go +++ b/backend/internal/imap/pool_interface.go @@ -32,6 +32,13 @@ type IMAPPool interface { // Close closes all connections in the pool. Close() + + // GetListenerConnection gets or creates a dedicated listener client for IDLE. + // Returns a locked client that must be unlocked by the caller. + GetListenerConnection(userID, server, username, password string) (ListenerClient, error) + + // RemoveListenerConnection removes a listener connection from the pool. + RemoveListenerConnection(userID string) } // ClientWrapper wraps a go-imap client.Client to implement IMAPClient interface. diff --git a/backend/internal/imap/search_test.go b/backend/internal/imap/search_test.go index fa2c0e0..fab63cf 100644 --- a/backend/internal/imap/search_test.go +++ b/backend/internal/imap/search_test.go @@ -413,8 +413,8 @@ func TestService_buildThreadMapFromMessages(t *testing.T) { } t.Run("returns error when GetMessageByMessageID returns non-NotFound error", func(t *testing.T) { - // Create a cancelled context to simulate a database error - cancelledCtx, cancel := context.WithCancel(ctx) + // Create a canceled context to simulate a database error + canceledCtx, cancel := context.WithCancel(ctx) cancel() // Cancel immediately to cause context error imapMsg := &imap.Message{ @@ -424,7 +424,7 @@ func TestService_buildThreadMapFromMessages(t *testing.T) { }, } - _, _, err := service.buildThreadMapFromMessages(cancelledCtx, userID, []*imap.Message{imapMsg}) + _, _, err := service.buildThreadMapFromMessages(canceledCtx, userID, []*imap.Message{imapMsg}) if err == nil { t.Error("Expected error when GetMessageByMessageID returns non-NotFound error") } diff --git a/backend/internal/imap/service.go b/backend/internal/imap/service.go index 2ca7da6..468d815 100644 --- a/backend/internal/imap/service.go +++ b/backend/internal/imap/service.go @@ -392,7 +392,6 @@ func (s *Service) processFullSyncMessages(ctx context.Context, messages []*imap. // Uses incremental sync if possible (only syncs new messages since last sync). func (s *Service) SyncThreadsForFolder(ctx context.Context, userID, folderName string) error { return s.withClientAndSelectFolder(ctx, userID, folderName, func(client *imapclient.Client, mbox *imap.MailboxStatus) error { - log.Printf("Selected folder %s: %d messages", folderName, mbox.Messages) // Check if we can do incremental sync syncInfo, err := db.GetFolderSyncInfo(ctx, s.dbPool, userID, folderName) @@ -412,13 +411,13 @@ func (s *Service) SyncThreadsForFolder(ctx context.Context, userID, folderName s if err != nil { return fmt.Errorf("failed to fetch message headers: %w", err) } - log.Printf("Fetched %d message headers", len(messages)) + log.Printf("IMAP Sync: Fetched %d message headers for user %s, folder %s", len(messages), userID, folderName) s.processIncrementalMessages(ctx, messages, userID, folderName) // Update sync info with the highest UID highestUIDInt64 := int64(incResult.highestUID) if err := db.SetFolderSyncInfo(ctx, s.dbPool, userID, folderName, &highestUIDInt64); err != nil { - log.Printf("Warning: Failed to set folder sync info: %v", err) + log.Printf("IMAP Sync: Warning: Failed to set folder sync info for user %s, folder %s: %v", userID, folderName, err) } go s.updateThreadCountInBackground(userID, folderName) return nil @@ -439,13 +438,14 @@ func (s *Service) SyncThreadsForFolder(ctx context.Context, userID, folderName s return fmt.Errorf("failed to fetch message headers: %w", err) } - log.Printf("Fetched %d message headers", len(messages)) + log.Printf("IMAP Sync: Fetched %d message headers for user %s, folder %s", len(messages), userID, folderName) // Process messages: use thread structure if available, otherwise use incremental processing threadMaps := fullResult.threadMaps if threadMaps == nil { // THREAD command not supported - process messages without thread structure // (same as incremental sync) + log.Printf("IMAP Sync: THREAD command not supported, processing messages incrementally for user %s, folder %s", userID, folderName) s.processIncrementalMessages(ctx, messages, userID, folderName) } else { // Process messages using thread structure @@ -457,8 +457,9 @@ func (s *Service) SyncThreadsForFolder(ctx context.Context, userID, folderName s // Update sync info with the highest UID highestUIDInt64 := int64(fullResult.highestUID) if err := db.SetFolderSyncInfo(ctx, s.dbPool, userID, folderName, &highestUIDInt64); err != nil { - log.Printf("Warning: Failed to set folder sync info: %v", err) - // Don't fail the entire sync if timestamp update fails + log.Printf("IMAP Sync: Warning: Failed to set folder sync info for user %s, folder %s: %v", userID, folderName, err) + } else { + log.Printf("IMAP Sync: Updated sync info for user %s, folder %s (highest UID: %d)", userID, folderName, fullResult.highestUID) } // Trigger background thread count update diff --git a/backend/internal/imap/service_interface.go b/backend/internal/imap/service_interface.go index 9b06f03..63c0bb5 100644 --- a/backend/internal/imap/service_interface.go +++ b/backend/internal/imap/service_interface.go @@ -4,6 +4,7 @@ import ( "context" "github.com/vdavid/vmail/backend/internal/models" + "github.com/vdavid/vmail/backend/internal/websocket" ) // MessageToSync represents a message that needs to be synced. @@ -35,6 +36,10 @@ type IMAPService interface { // Returns threads, total count, and error. Search(ctx context.Context, userID string, query string, page, limit int) ([]*models.Thread, int, error) + // StartIdleListener runs an IMAP IDLE loop for a user and pushes events to the WebSocket hub. + // This function blocks until the context is canceled. + StartIdleListener(ctx context.Context, userID string, hub *websocket.Hub) + // Close closes the service and cleans up connections. Close() } diff --git a/backend/internal/websocket/hub.go b/backend/internal/websocket/hub.go new file mode 100644 index 0000000..544f87b --- /dev/null +++ b/backend/internal/websocket/hub.go @@ -0,0 +1,121 @@ +package websocket + +import ( + "log" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +// Client wraps a WebSocket connection. +type Client struct { + conn *websocket.Conn +} + +// Conn returns the underlying WebSocket connection. +func (c *Client) Conn() *websocket.Conn { + return c.conn +} + +// Hub manages active WebSocket connections per user. +// It supports multiple connections per user (e.g., multiple tabs). +type Hub struct { + mu sync.RWMutex + clients map[string]map[*Client]struct{} // userID -> set of clients + maxPerUser int +} + +// NewHub creates a new Hub with a per-user connection limit. +func NewHub(maxPerUser int) *Hub { + if maxPerUser <= 0 { + maxPerUser = 10 + } + return &Hub{ + clients: make(map[string]map[*Client]struct{}), + maxPerUser: maxPerUser, + } +} + +// Register adds a WebSocket connection for the given user. +// If the per-user limit is exceeded, the new connection is closed and nil is returned. +func (h *Hub) Register(userID string, conn *websocket.Conn) *Client { + h.mu.Lock() + defer h.mu.Unlock() + + userClients, ok := h.clients[userID] + if !ok { + userClients = make(map[*Client]struct{}) + h.clients[userID] = userClients + } + + if len(userClients) >= h.maxPerUser { + log.Printf("websocket: user %s exceeded max connections (%d), closing new connection", userID, h.maxPerUser) + _ = conn.WriteControl( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "too many connections for this user"), + // Use zero deadline - best effort. + // See https://pkg.go.dev/github.com/gorilla/websocket#Conn.WriteControl + // for details. + //nolint:exhaustruct + time.Time{}, + ) + _ = conn.Close() + return nil + } + + client := &Client{conn: conn} + userClients[client] = struct{}{} + return client +} + +// Unregister removes a client for the given user and closes the connection. +func (h *Hub) Unregister(userID string, client *Client) { + if client == nil { + return + } + + h.mu.Lock() + defer h.mu.Unlock() + + userClients, ok := h.clients[userID] + if !ok { + _ = client.conn.Close() + return + } + + delete(userClients, client) + + if len(userClients) == 0 { + delete(h.clients, userID) + } + + _ = client.conn.Close() +} + +// Send broadcasts a message to all active clients for the user. +func (h *Hub) Send(userID string, msg []byte) { + h.mu.RLock() + userClients := h.clients[userID] + h.mu.RUnlock() + + if len(userClients) == 0 { + return + } + + for client := range userClients { + if err := client.conn.WriteMessage(websocket.TextMessage, msg); err != nil { + log.Printf("websocket: failed to write message for user %s: %v", userID, err) + // Best-effort cleanup: unregister this client. + go h.Unregister(userID, client) + } + } +} + +// ActiveConnections returns the number of active WebSocket connections for a user. +func (h *Hub) ActiveConnections(userID string) int { + h.mu.RLock() + defer h.mu.RUnlock() + + return len(h.clients[userID]) +} diff --git a/docs/architecture.md b/docs/architecture.md index 7f8d3e7..6461520 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -134,14 +134,33 @@ unique identifier, such as the `Message-ID` header of the root/first message in ### Real-time API (WebSockets) -For real-time updates (like new emails), the front end will open a WebSocket connection. +For real-time updates (like new emails), the front end opens a WebSocket connection. -* [ ] `GET /api/v1/ws`: Upgrades the HTTP connection to a WebSocket. +* [x] `GET /api/v1/ws`: Upgrades the HTTP connection to a WebSocket. The server uses this connection to push updates to the client. + * The backend maintains a per-process **WebSocket Hub** that: + * Tracks multiple connections per user (`userID -> set of connections`). + * Limits the number of concurrent connections per user (default: 10). + * Sends messages (like new-email notifications) to all active connections for a user. + * When the first WebSocket connection for a user is established, the backend starts an **IMAP IDLE listener**: + * Uses a dedicated IMAP listener connection from the pool. + * Runs `IDLE` on the `INBOX` folder. + * On new-mail notifications, performs an **incremental sync** for `INBOX` immediately and then pushes an event to the WebSocket hub. * **Server-to-client message example:** ```json - {"type": "new_message", "folder": "INBOX"} + {"type": "new_email", "folder": "INBOX"} ``` + * The front end listens for `new_email` messages and calls `queryClient.invalidateQueries({ queryKey: ['threads', folder] })` + so `GET /threads?folder=...` refetches and the new email appears. + +**Cache TTL as fallback:** +The 5‑minute cache TTL used by `GET /threads` is now a **backup mechanism**: + +* Real-time updates (IDLE + WebSockets) cause immediate incremental syncs for `INBOX`. +* TTL-based sync still runs when: + * WebSockets are not connected or temporarily unavailable. + * The IDLE listener fails or is not yet started. + * A user navigates to a folder without real-time support. ### Technical decisions diff --git a/e2e/fixtures/auth.ts b/e2e/fixtures/auth.ts index 07984b0..d976cb7 100644 --- a/e2e/fixtures/auth.ts +++ b/e2e/fixtures/auth.ts @@ -6,10 +6,10 @@ import { Page } from '@playwright/test' * We intercept API requests and modify the Authorization header to include the email. */ export async function setupAuth(page: Page, userEmail: string = 'test@example.com') { - // Intercept all API requests and modify the Authorization header + // Intercept all API requests and test endpoints, and modify the Authorization header // to include the email in the token format "email:user@example.com" // This allows the backend to extract the email in test mode - await page.route('**/api/**', async (route) => { + const addAuthHeader = async (route: any) => { const request = route.request() const headers = { ...request.headers() } @@ -19,7 +19,12 @@ export async function setupAuth(page: Page, userEmail: string = 'test@example.co // Continue with the modified request await route.continue({ headers }) - }) + } + + // Intercept API routes + await page.route('**/api/**', addAuthHeader) + // Intercept test routes + await page.route('**/test/**', addAuthHeader) } /** diff --git a/e2e/tests/inbox.spec.ts b/e2e/tests/inbox.spec.ts index 66e76f9..64f7127 100644 --- a/e2e/tests/inbox.spec.ts +++ b/e2e/tests/inbox.spec.ts @@ -1,10 +1,6 @@ import { test, expect } from '@playwright/test' -import { - clickFirstEmail, - setupInboxForNavigation, - setupInboxTest, -} from '../utils/helpers' +import { clickFirstEmail, setupInboxForNavigation, setupInboxTest } from '../utils/helpers' /** * Test 2: Existing User Read-Only Flow @@ -137,6 +133,88 @@ test.describe('Existing User Read-Only Flow', () => { } }) + test('shows new emails in real time without page reload', async ({ page }) => { + // Capture console logs to debug WebSocket issues + const consoleMessages: string[] = [] + page.on('console', (msg) => { + const text = msg.text() + consoleMessages.push(`[${msg.type()}] ${text}`) + // Log errors and warnings immediately + if (msg.type() === 'error' || msg.type() === 'warning') { + console.log(`Browser ${msg.type()}:`, text) + } + }) + + // Capture network requests to see WebSocket connection status + const networkErrors: string[] = [] + page.on('requestfailed', (request) => { + const error = `${request.method()} ${request.url()} - ${request.failure()?.errorText}` + networkErrors.push(error) + console.log('Network error:', error) + }) + + const result = await setupInboxTest(page) + if (!result) { + // Skip if redirected to settings + return + } + + // Wait for WebSocket connection to be established. + // The connection status banner only shows when disconnected, so wait for it to not be visible. + // Give it a few seconds for the WebSocket to connect. + await page.waitForTimeout(3000) + + // Verify WebSocket is connected by checking that the connection banner is not visible + // (it only shows when status is 'disconnected') + const connectionBanner = page.locator('text=Connection lost') + const bannerVisible = await connectionBanner.isVisible().catch(() => false) + + if (bannerVisible) { + console.log('WebSocket connection banner is visible - connection may not be established') + console.log('Console messages:', consoleMessages.filter(m => m.includes('WebSocket') || m.includes('error'))) + console.log('Network errors:', networkErrors) + } + + // Capture current thread subjects (if any). + const initialSubjects = await page + .locator('[data-testid="email-subject"]') + .allInnerTexts() + + // Trigger backend helper that appends a new message to INBOX on the test IMAP server. + // The backend is expected to expose a test-only endpoint for this. + // Use page.evaluate to make the request from the page context so it goes through route interceptors + const response = await page.evaluate(async () => { + const res = await fetch('/test/add-imap-message', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + folder: 'INBOX', + subject: 'E2E Real-Time Test', + from: 'sender@example.com', + to: 'username@example.com', + }), + }) + return { status: res.status, statusText: res.statusText } + }) + + if (response.status !== 204) { + console.log(`Test endpoint returned status ${response.status}: ${response.statusText}`) + } + + // Wait for the new subject to appear without reloading the page. + await expect( + page.locator('[data-testid="email-subject"]', { hasText: 'E2E Real-Time Test' }), + ).toBeVisible({ timeout: 15000 }) + + const updatedSubjects = await page + .locator('[data-testid="email-subject"]') + .allInnerTexts() + + expect(updatedSubjects).not.toEqual(initialSubjects) + }) + test('clicking email navigates to thread with correct URL and displays body', async ({ page, }) => { diff --git a/frontend/src/components/ConnectionStatusBanner.tsx b/frontend/src/components/ConnectionStatusBanner.tsx new file mode 100644 index 0000000..62aa646 --- /dev/null +++ b/frontend/src/components/ConnectionStatusBanner.tsx @@ -0,0 +1,26 @@ +import { useConnectionStore } from '../store/connection.store' + +export default function ConnectionStatusBanner() { + const { status, triggerReconnect } = useConnectionStore() + + if (status !== 'disconnected') { + return null + } + + return ( +
+
+ Connection lost. New emails may be delayed. + +
+
+ ) +} diff --git a/frontend/src/components/Layout.tsx b/frontend/src/components/Layout.tsx index f549cc9..9f02ef3 100644 --- a/frontend/src/components/Layout.tsx +++ b/frontend/src/components/Layout.tsx @@ -1,7 +1,9 @@ import { useState, type ReactNode } from 'react' import { useKeyboardShortcuts } from '../hooks/useKeyboardShortcuts' +import { useWebSocket } from '../hooks/useWebSocket' +import ConnectionStatusBanner from './ConnectionStatusBanner' import Header from './Header' import Sidebar from './Sidebar' @@ -11,10 +13,12 @@ interface LayoutProps { export default function Layout({ children }: LayoutProps) { useKeyboardShortcuts() + useWebSocket() const [isSidebarOpen, setIsSidebarOpen] = useState(false) return (
+
{ setIsSidebarOpen(true) diff --git a/frontend/src/hooks/useWebSocket.test.tsx b/frontend/src/hooks/useWebSocket.test.tsx new file mode 100644 index 0000000..c99e1f0 --- /dev/null +++ b/frontend/src/hooks/useWebSocket.test.tsx @@ -0,0 +1,89 @@ +import { QueryClient, QueryClientProvider } from '@tanstack/react-query' +import { act, render } from '@testing-library/react' +import { describe, expect, it, vi, beforeEach, afterEach } from 'vitest' + +import { useConnectionStore } from '../store/connection.store' + +import { useWebSocket } from './useWebSocket' + +class MockSocket { + static instances: MockSocket[] = [] + onopen: (() => void) | null = null + onmessage: ((event: MessageEvent) => void) | null = null + onerror: (() => void) | null = null + onclose: ((event: CloseEvent) => void) | null = null + + url: string + + constructor(url: string) { + this.url = url + MockSocket.instances.push(this) + } + + send(): void {} + + close() { + if (this.onclose) { + this.onclose(new CloseEvent('close')) + } + } +} + +function TestComponent() { + useWebSocket() + return null +} + +function renderWithClient(queryClient: QueryClient) { + return render( + + + , + ) +} + +describe('useWebSocket', () => { + beforeEach(() => { + // Reset connection store between tests + useConnectionStore.setState({ + status: 'connecting', + lastError: null, + forceReconnectToken: 0, + }) + vi.stubGlobal('WebSocket', MockSocket as unknown as typeof WebSocket) + }) + + afterEach(() => { + MockSocket.instances = [] + vi.unstubAllGlobals() + }) + + it('invalidates threads query when new_email message is received', () => { + const queryClient = new QueryClient({ + defaultOptions: { + queries: { retry: false }, + }, + }) + const invalidateSpy = vi + .spyOn(queryClient, 'invalidateQueries') + .mockResolvedValue(undefined) + + renderWithClient(queryClient) + + // Grab the created mock socket instance. + const socket = MockSocket.instances[0] + expect(socket).toBeDefined() + + act(() => { + const event = new MessageEvent('message', { + data: JSON.stringify({ type: 'new_email', folder: 'INBOX' }), + }) + socket.onmessage?.(event) + }) + + expect(invalidateSpy).toHaveBeenCalledWith({ + queryKey: ['threads', 'INBOX'], + exact: false, + }) + }) +}) diff --git a/frontend/src/hooks/useWebSocket.ts b/frontend/src/hooks/useWebSocket.ts new file mode 100644 index 0000000..f9facd5 --- /dev/null +++ b/frontend/src/hooks/useWebSocket.ts @@ -0,0 +1,143 @@ +import { useQueryClient } from '@tanstack/react-query' +import { useEffect, useRef } from 'react' + +import { useConnectionStore } from '../store/connection.store' + +export function useWebSocket() { + const queryClient = useQueryClient() + const { setStatus, setLastError, forceReconnectToken } = useConnectionStore() + const queryClientRef = useRef(queryClient) + const socketRef = useRef(null) + const socketCreationTimeRef = useRef(null) + + // Keep refs up to date without causing re-renders + useEffect(() => { + queryClientRef.current = queryClient + }, [queryClient]) + + useEffect(() => { + // Check if we already have an active socket from a previous effect run (StrictMode) + const existingSocket = socketRef.current + if (existingSocket) { + const state = existingSocket.readyState + // Try to reuse existing socket + if (state === WebSocket.OPEN || state === WebSocket.CONNECTING) { + // Update status to match the current state + if (state === WebSocket.OPEN) { + setStatus('connected') + setLastError(null) + } else { + setStatus('connecting') + } + // Return early - don't create a new socket or set up handlers + // The existing socket will continue with its existing handlers + // No cleanup needed since we're not creating anything new + return + } + // Socket is closed or closing, create a new one + } + + setStatus('connecting') + + // Get the token (currently hardcoded as "token", same as used in API calls). + // TODO: When Authelia is implemented, this should get the actual JWT token. + const token = 'token' + + const wsEnvUrl = import.meta.env.VITE_WS_URL as string | undefined + let wsUrl: string + if (wsEnvUrl && wsEnvUrl.length > 0) { + wsUrl = wsEnvUrl + } else { + const baseUrl = `${window.location.origin.replace(/^http/, 'ws')}/api/v1/ws` + // Append token as query parameter since WebSocket connections can't set headers. + wsUrl = `${baseUrl}?token=${encodeURIComponent(token)}` + } + + // Connect + const socket = new WebSocket(wsUrl) + const socketInstance = socket + socketRef.current = socket + socketCreationTimeRef.current = Date.now() + + socket.onopen = () => { + // Only update state if this is still the current socket + if (socketRef.current === socketInstance) { + setStatus('connected') + setLastError(null) + } else { + // Connection opened but socket ref changed (StrictMode) + socket.close() + } + } + + socket.onerror = (error) => { + // eslint-disable-next-line no-console -- We do want to log this in production too + console.error('WebSocket: Error occurred', error, 'readyState:', socket.readyState) + if (socketRef.current === socketInstance) { + setStatus('disconnected') + setLastError('WebSocket error') + } + } + + socket.onclose = () => { + if (socketRef.current === socketInstance) { + socketRef.current = null + setStatus('disconnected') + } + } + + socket.onmessage = (event) => { + if (socketRef.current !== socketInstance) { + return + } + try { + const data = JSON.parse(event.data as string) as { type?: string; folder?: string } + if (data.type === 'new_email' && data.folder) { + // Invalidate all queries that start with ['threads', folder] + // This will match ['threads', folder, page, limit] for any page/limit + queryClientRef.current + .invalidateQueries({ + queryKey: ['threads', data.folder], + exact: false, // Match all queries that start with this key + }) + .catch((err: unknown) => { + // eslint-disable-next-line no-console -- Weird error, better log it + console.error('WebSocket: Failed to invalidate queries', err) + }) + } + } catch (err) { + // eslint-disable-next-line no-console -- We actually want to log this + console.error('WebSocket: Failed to parse message', err, event.data) + } + } + + return () => { + // Cleanup method + + // Only clean up if this is still the current socket + if (socketRef.current !== socketInstance) { + return + } + + const timeSinceCreation = socketCreationTimeRef.current + ? Date.now() - socketCreationTimeRef.current + : Infinity + + // In StrictMode, effects run twice. If cleanup is called very soon after creation, + // it's likely a StrictMode double-mount. Don't close immediately. + // The 100 ms is kinda arbitrary, so this is kinda a hack. + if (timeSinceCreation < 100) { + return + } + + socketRef.current = null + if ( + socket.readyState === WebSocket.CONNECTING || + socket.readyState === WebSocket.OPEN + ) { + // Close socket as cleanup + socket.close() + } + } + }, [forceReconnectToken, setLastError, setStatus]) +} diff --git a/frontend/src/store/connection.store.ts b/frontend/src/store/connection.store.ts new file mode 100644 index 0000000..e342780 --- /dev/null +++ b/frontend/src/store/connection.store.ts @@ -0,0 +1,30 @@ +import { create } from 'zustand' + +type ConnectionStatus = 'connected' | 'connecting' | 'disconnected' + +interface ConnectionState { + status: ConnectionStatus + lastError: string | null + forceReconnectToken: number + setStatus: (status: ConnectionStatus) => void + setLastError: (message: string | null) => void + triggerReconnect: () => void +} + +export const useConnectionStore = create((set) => ({ + status: 'connecting', + lastError: null, + forceReconnectToken: 0, + setStatus: (status) => { + set({ status }) + }, + setLastError: (message) => { + set({ lastError: message }) + }, + triggerReconnect: () => { + set((state) => ({ + forceReconnectToken: state.forceReconnectToken + 1, + status: 'connecting', + })) + }, +})) diff --git a/frontend/vite.config.ts b/frontend/vite.config.ts index a31d64c..758da14 100644 --- a/frontend/vite.config.ts +++ b/frontend/vite.config.ts @@ -18,6 +18,11 @@ export default defineConfig({ strictPort: true, proxy: { '/api': { + target: process.env.VITE_API_URL || 'http://localhost:11764', + changeOrigin: true, // Needed for CORS + ws: true, // Enable WebSocket proxying + }, + '/test': { target: process.env.VITE_API_URL || 'http://localhost:11764', changeOrigin: true, }, @@ -35,6 +40,11 @@ export default defineConfig({ strictPort: true, proxy: { '/api': { + target: process.env.VITE_API_URL || 'http://localhost:11764', + changeOrigin: true, // Needed for CORS + ws: true, // Enable WebSocket proxying + }, + '/test': { target: process.env.VITE_API_URL || 'http://localhost:11764', changeOrigin: true, },