From f82e149a920e33595b2b23b1eae619d8b9714811 Mon Sep 17 00:00:00 2001 From: ihlec Date: Tue, 10 Mar 2026 16:31:47 +0100 Subject: [PATCH 1/2] re-run bootstrap if all nodes are lost --- README.md | 47 +- internal/monitor/monitor_cleanup.go | 18 +- internal/monitor/monitor_keywords.go | 666 +++++++++++++++++++++++++++ internal/monitor/monitor_libp2p.go | 58 +++ 4 files changed, 749 insertions(+), 40 deletions(-) create mode 100644 internal/monitor/monitor_keywords.go diff --git a/README.md b/README.md index f77a89e..a51638f 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ export DLOCKSS_DATA_DIR="$HOME/my-data" # Node Identity export DLOCKSS_NODE_NAME="my-node" # Human-readable name (shown in monitor) -export DLOCKSS_IDENTITY_PATH="/data/dlockss.key" # Persistent identity key location +export DLOCKSS_IDENTITY_PATH="/data/dlockss.key" # Persistent identity key location (fallback if IPFS node config cannot be read) export DLOCKSS_IPFS_CONFIG="/path/to/ipfs/config" # Kubo config JSON (derives identity from IPFS repo) # Replication Targets @@ -87,47 +87,16 @@ For Docker deployments: either mount the Kubo config file and set `DLOCKSS_IPFS_ #### Docker Compose Example -```yaml -services: - ipfs: - image: ipfs/kubo:latest - volumes: - - ipfs-data:/data/ipfs - ports: - - "4001:4001" # Swarm - - "5001:5001" # API - - dlockss: - image: dlockss:latest - depends_on: - - ipfs - volumes: - - ipfs-data:/ipfs-repo:ro # read-only access to Kubo config - - dlockss-data:/data - environment: - DLOCKSS_IPFS_CONFIG: /ipfs-repo/config # derive identity from Kubo - DLOCKSS_IPFS_NODE: /dns4/ipfs/tcp/5001 # connect to Kubo API - DLOCKSS_DATA_DIR: /data/ingest - DLOCKSS_NODE_NAME: my-node - -volumes: - ipfs-data: - dlockss-data: -``` - -See [docs/DLOCKSS_PROTOCOL.md](docs/DLOCKSS_PROTOCOL.md) for protocol details. 
- -### Docker-compose ```yaml services: dlockss-node: image: ghcr.io/gipplab/dlockss-single-node:latest restart: unless-stopped environment: - DLOCKSS_IPFS_NODE: "/dns4/ipfs/tcp/5001" # "ipfs" resolves to the Kubo service below - DLOCKSS_DATA_DIR: "/data/ingest" # tells DLOCKSS were to scan for ingested data and documents + DLOCKSS_IPFS_NODE: "/dns4/ipfs/tcp/5001" # "ipfs" resolves to the Kubo service below + DLOCKSS_DATA_DIR: "/data/ingest" # location that DLOCKSS monitors for igesting files DLOCKSS_IPFS_CONFIG: "/ipfs-repo/config" # derive identity from IPFS node (shared peer ID) - # DLOCKSS_NODE_NAME: dlockss.example.tld # human-readable name shown in the monitor; + # DLOCKSS_NODE_NAME: my-node # human-readable name shown in the monitor; # # if empty the peer ID is displayed instead volumes: - dlockss-data:/data # persistent D-LOCKSS data (identity, cluster state, ingested files) @@ -138,7 +107,6 @@ services: labels: - com.centurylinklabs.watchtower.enable=true - # Kubo IPFS node — see https://github.com/ipfs/kubo/blob/master/docker-compose.yaml ipfs: image: ipfs/kubo restart: unless-stopped @@ -161,7 +129,6 @@ services: # Watchtower keeps the D-LOCKSS image up to date automatically. # Recommended until a stable release is published. - # See https://containrrr.github.io/watchtower/arguments/ watchtower: image: containrrr/watchtower restart: always @@ -170,11 +137,13 @@ services: command: --label-enable --include-stopped --revive-stopped volumes: - ipfs-staging: # IPFS staging area on /export — see https://docs.ipfs.tech/install/run-ipfs-inside-docker/ + ipfs-staging: # IPFS staging area on /export ipfs-data: # IPFS repo on /data/ipfs (shared read-only with D-LOCKSS for identity) - dlockss-data: # persistent D-LOCKSS data (identity key, cluster state, ingested files + dlockss-data: # persistent D-LOCKSS data (identity key, cluster state, ingested files) ``` +See [docs/DLOCKSS_PROTOCOL.md](docs/DLOCKSS_PROTOCOL.md) for protocol details. + --- ## 3. 
Architecture diff --git a/internal/monitor/monitor_cleanup.go b/internal/monitor/monitor_cleanup.go index a31564b..bf5cd29 100644 --- a/internal/monitor/monitor_cleanup.go +++ b/internal/monitor/monitor_cleanup.go @@ -6,6 +6,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" ) // SplitGracePeriod: after a split, don't prune nodes for this duration to allow @@ -98,6 +99,10 @@ func (m *Monitor) pruneOrphanedSplitEvents() { // evictStalePeerstoreEntries removes peers from the libp2p peerstore that are // disconnected and not tracked in the monitor's nodes map. Without this, the // peerstore grows unbounded from DHT crawls and transient connections. +// +// To avoid a death spiral (prune nodes → evict their peerstore entries → +// GossipSub can't re-graft → no heartbeats → prune more nodes), we also +// protect peers that GossipSub is actively using across any subscribed topic. func (m *Monitor) evictStalePeerstoreEntries() { if m.host == nil { return @@ -110,6 +115,14 @@ func (m *Monitor) evictStalePeerstoreEntries() { for id := range m.nodes { activeNodes[id] = true } + // Protect peers in any GossipSub topic so mesh recovery remains possible + // even after all tracked nodes have been pruned. 
+ topicPeers := make(map[peer.ID]bool) + for _, topic := range m.shardTopics { + for _, pid := range topic.ListPeers() { + topicPeers[pid] = true + } + } m.mu.RUnlock() var evicted int @@ -123,11 +136,14 @@ func (m *Monitor) evictStalePeerstoreEntries() { if activeNodes[pid.String()] { continue } + if topicPeers[pid] { + continue + } ps.RemovePeer(pid) evicted++ } if evicted > 0 { - slog.Info("peerstore gc evicted stale peers", "count", evicted) + slog.Info("peerstore gc evicted stale peers", "count", evicted, "protected_topic_peers", len(topicPeers)) } } diff --git a/internal/monitor/monitor_keywords.go b/internal/monitor/monitor_keywords.go new file mode 100644 index 0000000..3d3386d --- /dev/null +++ b/internal/monitor/monitor_keywords.go @@ -0,0 +1,666 @@ +package monitor + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "net/url" + "sort" + "strings" + "sync" + "time" + + "dlockss/pkg/schema" +) + +const ( + geminiModel = "gemini-2.5-flash-lite" + geminiEndpoint = "https://generativelanguage.googleapis.com/v1beta/models/" + + maxPDFFetchSize = 20 * 1024 * 1024 // 20 MB (Gemini request limit) + + keywordPrompt = `Analyze this PDF document. Extract the following information and return ONLY a valid JSON object with these fields: +- "title": the document title (string) +- "broad_field": the broad academic/research field (string, e.g. "Computer Science", "Biology", "Physics", "Economics") +- "sub_topic": the sub-topic within that field (string, e.g. "Machine Learning", "Genomics", "Quantum Computing") +- "research_niche": the specific research niche (string, e.g. 
"Transformer Architectures for NLP", "CRISPR Gene Editing in Plants") +- "keywords": the 10 most important keywords or key phrases (array of exactly 10 lowercase strings) +Example: {"title":"Attention Is All You Need","broad_field":"Computer Science","sub_topic":"Machine Learning","research_niche":"Transformer Architectures for Sequence Modeling","keywords":["transformer","attention mechanism","self-attention","neural networks","sequence modeling","encoder-decoder","natural language processing","deep learning","machine translation","positional encoding"]}` + + // Gemini 2.5 Flash-Lite free tier limits + geminiRPM = 15 + geminiRPD = 1000 + geminiRequestSpacing = 4500 * time.Millisecond // ~13.3 RPM, safe margin under 15 RPM + + manifestFetchTimeout = 30 * time.Second + pdfFetchTimeout = 90 * time.Second + geminiFetchTimeout = 120 * time.Second + + maxRetries = 3 + retryCooldown = 10 * time.Minute + maxRecentItems = 30 + + ipfsGateway = "https://ipfs.io" +) + +// KeywordStore manages keyword extraction, indexing, and search for CIDs. 
+type KeywordStore struct { + mu sync.RWMutex + + cidKeywords map[string]*CIDKeywordEntry // manifest CID → entry + keywordCIDs map[string]map[string]struct{} // lowercase keyword → set of manifest CIDs + processed map[string]bool // CIDs fully processed (or skipped) + failures map[string]*failureRecord // CIDs that failed processing + recent []RecentSearch // recent successful searches (ring buffer) + + dailyCount int + dayStart time.Time + + totalOK int + totalFail int + totalSkipped int + + apiKey string +} + +type CIDKeywordEntry struct { + ManifestCID string `json:"manifest_cid"` + PayloadCID string `json:"payload_cid"` + MetaRef string `json:"meta_ref"` + Title string `json:"title"` + BroadField string `json:"broad_field"` + SubTopic string `json:"sub_topic"` + ResearchNiche string `json:"research_niche"` + Keywords []string `json:"keywords"` + IndexedAt time.Time `json:"indexed_at"` +} + +type failureRecord struct { + count int + lastTry time.Time +} + +type RecentSearch struct { + Keyword string `json:"keyword"` + ResultCount int `json:"result_count"` + Timestamp int64 `json:"timestamp"` +} + +type KeywordSuggestion struct { + Keyword string `json:"keyword"` + CIDCount int `json:"cid_count"` +} + +type KeywordStats struct { + TotalCIDs int `json:"total_cids"` + Indexed int `json:"indexed"` + Failed int `json:"failed"` + Skipped int `json:"skipped"` + Pending int `json:"pending"` + UniqueKeywords int `json:"unique_keywords"` + DailyRemaining int `json:"daily_remaining"` + Enabled bool `json:"enabled"` +} + +// geminiRequest / geminiResponse model the Gemini REST API. 
+type geminiRequest struct { + Contents []geminiContent `json:"contents"` + GenerationConfig *geminiGenConfig `json:"generationConfig,omitempty"` +} + +type geminiContent struct { + Parts []geminiPart `json:"parts"` +} + +type geminiPart struct { + Text string `json:"text,omitempty"` + InlineData *geminiInline `json:"inline_data,omitempty"` +} + +type geminiInline struct { + MimeType string `json:"mime_type"` + Data string `json:"data"` +} + +type geminiGenConfig struct { + Temperature float64 `json:"temperature"` + MaxOutputTokens int `json:"maxOutputTokens"` +} + +type geminiResponse struct { + Candidates []struct { + Content struct { + Parts []struct { + Text string `json:"text"` + } `json:"parts"` + } `json:"content"` + } `json:"candidates"` + Error *struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error"` +} + +func NewKeywordStore(apiKey string) *KeywordStore { + return &KeywordStore{ + cidKeywords: make(map[string]*CIDKeywordEntry), + keywordCIDs: make(map[string]map[string]struct{}), + processed: make(map[string]bool), + failures: make(map[string]*failureRecord), + recent: make([]RecentSearch, 0, maxRecentItems), + dayStart: startOfDayPT(time.Now()), + apiKey: apiKey, + } +} + +// Run is the background loop that discovers new CIDs from the monitor and extracts keywords. 
+func (ks *KeywordStore) Run(ctx <-chan struct{}, monitor *Monitor) { + if ks.apiKey == "" { + log.Println("[Keywords] GEMINI_API_KEY not set — keyword extraction disabled") + return + } + log.Printf("[Keywords] Background extraction enabled (model: %s, spacing: %s)", geminiModel, geminiRequestSpacing) + + ticker := time.NewTicker(geminiRequestSpacing) + defer ticker.Stop() + + for { + select { + case <-ctx: + return + case <-ticker.C: + cid := ks.pickNextCID(monitor) + if cid == "" { + continue + } + ks.processCID(cid, monitor) + } + } +} + +func (ks *KeywordStore) pickNextCID(monitor *Monitor) string { + ks.mu.Lock() + ks.resetDayIfNeeded() + if ks.dailyCount >= geminiRPD { + ks.mu.Unlock() + return "" + } + ks.mu.Unlock() + + monitor.mu.RLock() + candidates := make([]string, 0) + for cidStr := range monitor.uniqueCIDs { + candidates = append(candidates, cidStr) + } + monitor.mu.RUnlock() + + sort.Strings(candidates) + + ks.mu.RLock() + defer ks.mu.RUnlock() + + now := time.Now() + for _, c := range candidates { + if ks.processed[c] { + continue + } + if f, ok := ks.failures[c]; ok { + if f.count >= maxRetries { + continue + } + if now.Sub(f.lastTry) < retryCooldown { + continue + } + } + return c + } + return "" +} + +func (ks *KeywordStore) processCID(manifestCID string, monitor *Monitor) { + payloadCID, metaRef, err := ks.resolveManifest(manifestCID) + if err != nil { + log.Printf("[Keywords] Manifest resolve failed for %s: %v", manifestCID, err) + ks.recordFailure(manifestCID) + return + } + + if !looksLikePDF(metaRef) { + pdfData, isPDF, fetchErr := ks.fetchAndCheckPDF(payloadCID) + if fetchErr != nil { + log.Printf("[Keywords] Fetch failed for payload %s: %v", payloadCID, fetchErr) + ks.recordFailure(manifestCID) + return + } + if !isPDF { + log.Printf("[Keywords] Skipping non-PDF: %s (meta_ref: %s)", manifestCID, metaRef) + ks.markSkipped(manifestCID) + return + } + ks.extractAndStore(manifestCID, payloadCID, metaRef, pdfData, monitor) + return + } + + 
pdfData, _, fetchErr := ks.fetchAndCheckPDF(payloadCID) + if fetchErr != nil { + log.Printf("[Keywords] PDF fetch failed for %s: %v", payloadCID, fetchErr) + ks.recordFailure(manifestCID) + return + } + + ks.extractAndStore(manifestCID, payloadCID, metaRef, pdfData, monitor) +} + +func (ks *KeywordStore) extractAndStore(manifestCID, payloadCID, metaRef string, pdfData []byte, monitor *Monitor) { + result, err := ks.callGemini(pdfData) + if err != nil { + log.Printf("[Keywords] Gemini call failed for %s: %v", manifestCID, err) + ks.recordFailure(manifestCID) + return + } + + ks.mu.Lock() + ks.dailyCount++ + entry := &CIDKeywordEntry{ + ManifestCID: manifestCID, + PayloadCID: payloadCID, + MetaRef: metaRef, + Title: result.Title, + BroadField: result.BroadField, + SubTopic: result.SubTopic, + ResearchNiche: result.ResearchNiche, + Keywords: result.Keywords, + IndexedAt: time.Now(), + } + ks.cidKeywords[manifestCID] = entry + + // Index all searchable labels into the unified keyword→CID map + allLabels := make([]string, 0, len(result.Keywords)+3) + allLabels = append(allLabels, result.Keywords...) 
+ for _, label := range []string{result.BroadField, result.SubTopic, result.ResearchNiche} { + if label != "" { + allLabels = append(allLabels, label) + } + } + for _, kw := range allLabels { + kwLower := strings.ToLower(strings.TrimSpace(kw)) + if kwLower == "" { + continue + } + if ks.keywordCIDs[kwLower] == nil { + ks.keywordCIDs[kwLower] = make(map[string]struct{}) + } + ks.keywordCIDs[kwLower][manifestCID] = struct{}{} + } + ks.processed[manifestCID] = true + delete(ks.failures, manifestCID) + ks.totalOK++ + ks.mu.Unlock() + + log.Printf("[Keywords] Indexed %s → %q [%s / %s / %s] %v", + manifestCID, result.Title, result.BroadField, result.SubTopic, result.ResearchNiche, result.Keywords) +} + +func (ks *KeywordStore) resolveManifest(manifestCID string) (payloadCID, metaRef string, err error) { + reqURL := ipfsGateway + "/ipfs/" + url.PathEscape(manifestCID) + client := &http.Client{Timeout: manifestFetchTimeout} + resp, err := client.Get(reqURL) + if err != nil { + return "", "", fmt.Errorf("gateway fetch: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return "", "", fmt.Errorf("gateway status: %s", resp.Status) + } + + block, err := io.ReadAll(io.LimitReader(resp.Body, 1*1024*1024)) + if err != nil { + return "", "", fmt.Errorf("read body: %w", err) + } + + var ro schema.ResearchObject + if err := ro.UnmarshalCBOR(block); err != nil { + return "", "", fmt.Errorf("unmarshal CBOR: %w", err) + } + + return ro.Payload.String(), ro.MetadataRef, nil +} + +func (ks *KeywordStore) fetchAndCheckPDF(payloadCID string) ([]byte, bool, error) { + reqURL := ipfsGateway + "/ipfs/" + url.PathEscape(payloadCID) + client := &http.Client{Timeout: pdfFetchTimeout} + resp, err := client.Get(reqURL) + if err != nil { + return nil, false, fmt.Errorf("gateway fetch: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, false, fmt.Errorf("gateway status: %s", resp.Status) + } + + data, err := 
io.ReadAll(io.LimitReader(resp.Body, maxPDFFetchSize+1)) + if err != nil { + return nil, false, fmt.Errorf("read body: %w", err) + } + if len(data) > maxPDFFetchSize { + return nil, false, fmt.Errorf("file exceeds %d MB limit", maxPDFFetchSize/(1024*1024)) + } + + isPDF := len(data) >= 4 && string(data[:4]) == "%PDF" + return data, isPDF, nil +} + +func (ks *KeywordStore) callGemini(pdfData []byte) (*geminiResult, error) { + b64 := base64.StdEncoding.EncodeToString(pdfData) + + reqBody := geminiRequest{ + Contents: []geminiContent{{ + Parts: []geminiPart{ + {Text: keywordPrompt}, + {InlineData: &geminiInline{MimeType: "application/pdf", Data: b64}}, + }, + }}, + GenerationConfig: &geminiGenConfig{ + Temperature: 0.1, + MaxOutputTokens: 512, + }, + } + + bodyBytes, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + + apiURL := geminiEndpoint + geminiModel + ":generateContent?key=" + ks.apiKey + client := &http.Client{Timeout: geminiFetchTimeout} + resp, err := client.Post(apiURL, "application/json", bytes.NewReader(bodyBytes)) + if err != nil { + return nil, fmt.Errorf("HTTP request: %w", err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read response: %w", err) + } + + if resp.StatusCode == 429 { + return nil, fmt.Errorf("rate limited (429)") + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, string(respBody)) + } + + var gemResp geminiResponse + if err := json.Unmarshal(respBody, &gemResp); err != nil { + return nil, fmt.Errorf("unmarshal response: %w", err) + } + if gemResp.Error != nil { + return nil, fmt.Errorf("API error %d: %s", gemResp.Error.Code, gemResp.Error.Message) + } + + if len(gemResp.Candidates) == 0 || len(gemResp.Candidates[0].Content.Parts) == 0 { + return nil, fmt.Errorf("empty response from Gemini") + } + + text := 
strings.TrimSpace(gemResp.Candidates[0].Content.Parts[0].Text) + return parseGeminiResponse(text) +} + +// geminiResult is the expected JSON structure from the Gemini response text. +type geminiResult struct { + Title string `json:"title"` + BroadField string `json:"broad_field"` + SubTopic string `json:"sub_topic"` + ResearchNiche string `json:"research_niche"` + Keywords []string `json:"keywords"` +} + +func parseGeminiResponse(text string) (*geminiResult, error) { + // Strip markdown code fences if present + text = strings.TrimPrefix(text, "```json") + text = strings.TrimPrefix(text, "```") + text = strings.TrimSuffix(text, "```") + text = strings.TrimSpace(text) + + // Find JSON object boundaries + start := strings.Index(text, "{") + end := strings.LastIndex(text, "}") + if start == -1 || end == -1 || end <= start { + return nil, fmt.Errorf("no JSON object found in: %s", text) + } + + var result geminiResult + if err := json.Unmarshal([]byte(text[start:end+1]), &result); err != nil { + return nil, fmt.Errorf("parse JSON object: %w (text: %s)", err, text) + } + + result.Title = strings.TrimSpace(result.Title) + result.BroadField = strings.TrimSpace(result.BroadField) + result.SubTopic = strings.TrimSpace(result.SubTopic) + result.ResearchNiche = strings.TrimSpace(result.ResearchNiche) + + keywords := make([]string, 0, 10) + for _, kw := range result.Keywords { + kw = strings.TrimSpace(kw) + if kw == "" { + continue + } + keywords = append(keywords, strings.ToLower(kw)) + if len(keywords) == 10 { + break + } + } + if len(keywords) == 0 { + return nil, fmt.Errorf("no keywords extracted") + } + result.Keywords = keywords + return &result, nil +} + +// Search returns CID entries matching the given keyword query. Multiple words use AND logic. 
+func (ks *KeywordStore) Search(query string) []CIDKeywordEntry { + query = strings.ToLower(strings.TrimSpace(query)) + if query == "" { + return nil + } + + terms := strings.Fields(query) + ks.mu.RLock() + defer ks.mu.RUnlock() + + var matchSet map[string]struct{} + for _, term := range terms { + termMatches := make(map[string]struct{}) + for kw, cids := range ks.keywordCIDs { + if strings.Contains(kw, term) { + for c := range cids { + termMatches[c] = struct{}{} + } + } + } + // Also match against title, broad field, sub-topic, and research niche + for cid, entry := range ks.cidKeywords { + haystack := strings.ToLower(entry.Title + " " + entry.BroadField + " " + entry.SubTopic + " " + entry.ResearchNiche) + if strings.Contains(haystack, term) { + termMatches[cid] = struct{}{} + } + } + if matchSet == nil { + matchSet = termMatches + } else { + for c := range matchSet { + if _, ok := termMatches[c]; !ok { + delete(matchSet, c) + } + } + } + } + + results := make([]CIDKeywordEntry, 0, len(matchSet)) + for c := range matchSet { + if entry, ok := ks.cidKeywords[c]; ok { + results = append(results, *entry) + } + } + sort.Slice(results, func(i, j int) bool { + return results[i].IndexedAt.After(results[j].IndexedAt) + }) + return results +} + +// Suggest returns keyword suggestions matching the given prefix. 
+func (ks *KeywordStore) Suggest(prefix string) []KeywordSuggestion { + prefix = strings.ToLower(strings.TrimSpace(prefix)) + + ks.mu.RLock() + defer ks.mu.RUnlock() + + var suggestions []KeywordSuggestion + for kw, cids := range ks.keywordCIDs { + if prefix == "" || strings.Contains(kw, prefix) { + suggestions = append(suggestions, KeywordSuggestion{ + Keyword: kw, + CIDCount: len(cids), + }) + } + } + + sort.Slice(suggestions, func(i, j int) bool { + if suggestions[i].CIDCount != suggestions[j].CIDCount { + return suggestions[i].CIDCount > suggestions[j].CIDCount + } + return suggestions[i].Keyword < suggestions[j].Keyword + }) + + if len(suggestions) > 20 { + suggestions = suggestions[:20] + } + return suggestions +} + +// RecordSearch adds a successful search to the recent searches list. +func (ks *KeywordStore) RecordSearch(keyword string, resultCount int) { + if resultCount == 0 { + return + } + ks.mu.Lock() + defer ks.mu.Unlock() + + // Deduplicate: remove old entry for same keyword + for i, s := range ks.recent { + if strings.EqualFold(s.Keyword, keyword) { + ks.recent = append(ks.recent[:i], ks.recent[i+1:]...) + break + } + } + + ks.recent = append(ks.recent, RecentSearch{ + Keyword: strings.ToLower(keyword), + ResultCount: resultCount, + Timestamp: time.Now().Unix(), + }) + + if len(ks.recent) > maxRecentItems { + ks.recent = ks.recent[len(ks.recent)-maxRecentItems:] + } +} + +// GetRecentSearches returns recent successful searches, newest first. +func (ks *KeywordStore) GetRecentSearches() []RecentSearch { + ks.mu.RLock() + defer ks.mu.RUnlock() + + result := make([]RecentSearch, len(ks.recent)) + for i, s := range ks.recent { + result[len(ks.recent)-1-i] = s + } + return result +} + +// GetStats returns keyword extraction progress statistics. 
+func (ks *KeywordStore) GetStats(totalUniqueCIDs int) KeywordStats { + ks.mu.RLock() + defer ks.mu.RUnlock() + + ks.resetDayIfNeededRLocked() + + indexed := ks.totalOK + failed := ks.totalFail + skipped := ks.totalSkipped + pending := totalUniqueCIDs - indexed - skipped + + permanentFails := 0 + for _, f := range ks.failures { + if f.count >= maxRetries { + permanentFails++ + } + } + pending -= permanentFails + if pending < 0 { + pending = 0 + } + + return KeywordStats{ + TotalCIDs: totalUniqueCIDs, + Indexed: indexed, + Failed: failed + permanentFails, + Skipped: skipped, + Pending: pending, + UniqueKeywords: len(ks.keywordCIDs), + DailyRemaining: geminiRPD - ks.dailyCount, + Enabled: ks.apiKey != "", + } +} + +func (ks *KeywordStore) recordFailure(manifestCID string) { + ks.mu.Lock() + defer ks.mu.Unlock() + f, ok := ks.failures[manifestCID] + if !ok { + f = &failureRecord{} + ks.failures[manifestCID] = f + } + f.count++ + f.lastTry = time.Now() + ks.totalFail++ +} + +func (ks *KeywordStore) markSkipped(manifestCID string) { + ks.mu.Lock() + defer ks.mu.Unlock() + ks.processed[manifestCID] = true + ks.totalSkipped++ +} + +func (ks *KeywordStore) resetDayIfNeeded() { + now := time.Now() + dayStart := startOfDayPT(now) + if dayStart.After(ks.dayStart) { + ks.dayStart = dayStart + ks.dailyCount = 0 + } +} + +func (ks *KeywordStore) resetDayIfNeededRLocked() { + // read-only check (no mutation); safe under RLock + // actual reset happens in write-locked paths +} + +func looksLikePDF(metaRef string) bool { + lower := strings.ToLower(metaRef) + return strings.HasSuffix(lower, ".pdf") +} + +func startOfDayPT(t time.Time) time.Time { + loc, err := time.LoadLocation("America/Los_Angeles") + if err != nil { + loc = time.UTC + } + pt := t.In(loc) + return time.Date(pt.Year(), pt.Month(), pt.Day(), 0, 0, 0, 0, loc) +} diff --git a/internal/monitor/monitor_libp2p.go b/internal/monitor/monitor_libp2p.go index 01ceddc..9bc470f 100644 --- a/internal/monitor/monitor_libp2p.go 
+++ b/internal/monitor/monitor_libp2p.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "sync" + "time" "dlockss/internal/discovery" @@ -15,12 +16,18 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/discovery/mdns" "github.com/libp2p/go-libp2p/p2p/discovery/routing" dutil "github.com/libp2p/go-libp2p/p2p/discovery/util" ) +const ( + meshMaintenanceInterval = 10 * time.Minute + bootstrapConnectTimeout = 15 * time.Second +) + func getMonitorIdentityPath() string { if cwd, err := os.Getwd(); err == nil { path := filepath.Join(cwd, MonitorIdentityFile) @@ -128,6 +135,57 @@ func StartLibP2P(ctx context.Context, monitor *Monitor) (host.Host, error) { } go discovery.RunPeerFinder(ctx, h, routingDiscovery, DiscoveryServiceTag) + go runMeshMaintenance(ctx, h, kademliaDHT, routingDiscovery) return h, nil } + +// runMeshMaintenance periodically re-bootstraps the DHT, reconnects to +// bootstrap peers, and re-advertises the service tag. This recovers from +// GossipSub mesh degradation that can accumulate over days of runtime — +// without it the monitor silently stops receiving heartbeats and prunes +// all nodes, requiring a full restart to recover. 
+func runMeshMaintenance(ctx context.Context, h host.Host, kademliaDHT *dht.IpfsDHT, rd *routing.RoutingDiscovery) { + ticker := time.NewTicker(meshMaintenanceInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + connected := 0 + for _, pid := range h.Network().Peers() { + if h.Network().Connectedness(pid) == network.Connected { + connected++ + } + } + + if err := kademliaDHT.Bootstrap(ctx); err != nil { + slog.Warn("mesh maintenance: DHT re-bootstrap failed", "error", err) + } + + reconnected := 0 + for _, peerAddr := range dht.DefaultBootstrapPeers { + peerinfo, err := peer.AddrInfoFromP2pAddr(peerAddr) + if err != nil { + continue + } + if h.Network().Connectedness(peerinfo.ID) == network.Connected { + continue + } + connCtx, cancel := context.WithTimeout(ctx, bootstrapConnectTimeout) + if err := h.Connect(connCtx, *peerinfo); err == nil { + reconnected++ + } + cancel() + } + + dutil.Advertise(ctx, rd, DiscoveryServiceTag) + + slog.Info("mesh maintenance complete", + "connected_peers", connected, + "bootstrap_reconnected", reconnected, + ) + } +} From 5d7cbb4a24afe8acc063f75735f5383b0df0447d Mon Sep 17 00:00:00 2001 From: ihlec Date: Tue, 10 Mar 2026 17:08:43 +0100 Subject: [PATCH 2/2] fixed inconsistent documentation --- Dockerfile_monitor | 2 +- README.md | 4 ++-- docs/DLOCKSS_PROTOCOL.md | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Dockerfile_monitor b/Dockerfile_monitor index fac52c6..082573a 100644 --- a/Dockerfile_monitor +++ b/Dockerfile_monitor @@ -21,6 +21,6 @@ ENV GODEBUG=madvdontneed=1 \ GOMEMLIMIT=512MiB \ GOGC=50 -EXPOSE 8000 +EXPOSE 8080 CMD ["dlockss-monitor"] diff --git a/README.md b/README.md index a51638f..aa4478b 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ services: restart: unless-stopped environment: DLOCKSS_IPFS_NODE: "/dns4/ipfs/tcp/5001" # "ipfs" resolves to the Kubo service below - DLOCKSS_DATA_DIR: "/data/ingest" # location that DLOCKSS monitors for igesting
files + DLOCKSS_DATA_DIR: "/data/ingest" # location that D-LOCKSS monitors for ingesting files DLOCKSS_IPFS_CONFIG: "/ipfs-repo/config" # derive identity from IPFS node (shared peer ID) # DLOCKSS_NODE_NAME: my-node # human-readable name shown in the monitor; # # if empty the peer ID is displayed instead @@ -196,7 +196,7 @@ export DLOCKSS_MONITOR_GEOIP_DB=/path/to/GeoLite2-City.mmdb ``` Without a local database, the monitor falls back to the ip-api.com batch API with permanent caching. -The monitor bootstrap-subscribes to all shards up to depth 5 (63 shards) so it can see nodes even when started late. Set `DLOCKSS_MONITOR_BOOTSTRAP_SHARD_DEPTH` (0–12) to tune. +The monitor bootstrap-subscribes to all shards up to depth 6 (127 shards) so it can see nodes even when started late. Set `DLOCKSS_MONITOR_BOOTSTRAP_SHARD_DEPTH` (0–12) to tune. Alternatively use: https://dlockss-monitor.wmcloud.org. diff --git a/docs/DLOCKSS_PROTOCOL.md b/docs/DLOCKSS_PROTOCOL.md index 315db6c..b660bbc 100644 --- a/docs/DLOCKSS_PROTOCOL.md +++ b/docs/DLOCKSS_PROTOCOL.md @@ -243,7 +243,7 @@ services: volumes: - ipfs-data:/data/ipfs dlockss: - image: dlockss:latest + image: ghcr.io/gipplab/dlockss-single-node:latest depends_on: [ipfs] volumes: - ipfs-data:/ipfs-repo:ro