Merged
14 changes: 3 additions & 11 deletions README.md
@@ -149,9 +149,9 @@ See [docs/DLOCKSS_PROTOCOL.md](docs/DLOCKSS_PROTOCOL.md) for protocol details.
D-LOCKSS acts as a self-healing, sharded storage cluster using the IPFS/Libp2p stack.

### Key Components
-1. **Shard Manager:** Dynamically splits responsibilities based on peer count to maintain scalability.
+1. **Shard Manager:** Dynamically splits responsibilities based on peer count to maintain scalability. Delegates lifecycle decisions (split/merge/discovery) to a `lifecycleManager` and replication to a `replicationManager`.
2. **Cluster Manager:** Manages embedded **IPFS Cluster** instances (one per shard) using **CRDTs** for state consensus; nodes in a shard sync and pin content assigned to that shard.
-3. **File Watcher:** Monitors the data directory to automatically ingest content.
+3. **File Watcher:** Monitors the data directory to automatically ingest content (via `handleWatcherEvent` / `handleNewDirectory`).
4. **Storage Monitor:** Protects nodes from disk exhaustion by rejecting custodial requests when full.
5. **BadBits Manager:** Enforces content blocking (e.g., DMCA) based on configured country codes.

@@ -186,14 +186,6 @@ go build -o dlockss-monitor ./cmd/dlockss-monitor
```
Open http://localhost:8080. The monitor displays each node's **name** (if configured via `DLOCKSS_NODE_NAME`), falling back to the Peer ID. Names propagate via HEARTBEAT/JOIN messages and appear in the node table, charts, and shard modals. Client-side aliases (EDIT button) override server-side names. Each node has **one peer ID**: when `DLOCKSS_IPFS_CONFIG` is set (e.g. in testnet), D-LOCKSS uses the IPFS repo identity so the same ID appears in the monitor and in `node_x.ipfs.log`.
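A minimal sketch of the naming behavior described above; the name and repo path values here are illustrative examples, not defaults shipped with D-LOCKSS:

```shell
# Illustrative value: this name is shown in the monitor instead of the Peer ID
export DLOCKSS_NODE_NAME="archive-node-1"
# Hypothetical testnet repo path; when set, D-LOCKSS reuses the IPFS repo identity
export DLOCKSS_IPFS_CONFIG="$HOME/testnet/node_1/.ipfs"
echo "$DLOCKSS_NODE_NAME"
```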

-For geographic region display, optionally provide a GeoIP database:
-```bash
-./dlockss-monitor --geoip-db /path/to/GeoLite2-City.mmdb
-# or via environment variable:
-export DLOCKSS_MONITOR_GEOIP_DB=/path/to/GeoLite2-City.mmdb
-```
-Without a local database, the monitor falls back to the ip-api.com batch API with permanent caching.
-
The monitor bootstrap-subscribes to all shards up to depth 6 (127 shards) so it can see nodes even when started late. Set `DLOCKSS_MONITOR_BOOTSTRAP_SHARD_DEPTH` (0–12) to tune.
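The 127-shard figure follows from the binary shard tree: subscribing to every shard up to depth *d* covers 2^(d+1) − 1 shards, so depth 6 gives 127 and the maximum depth 12 gives 8191. A small sketch (the helper name is ours, not from the codebase):

```go
package main

import "fmt"

// shardCount returns the number of shards in a complete binary shard
// tree up to the given depth, counting the root as depth 0: 2^(depth+1) - 1.
func shardCount(depth int) int {
	return (1 << (depth + 1)) - 1
}

func main() {
	fmt.Println(shardCount(6)) // monitor default depth → 127 shards
}
```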

Alternatively use: https://dlockss-monitor.wmcloud.org.
@@ -207,7 +199,7 @@ go test ./... -v
```

### Project Status
-* **Current Phase:** Production — active refactoring for code quality and operational robustness (see [Code Elegance Plan](docs/CODE_ELEGANCE_PLAN.md)).
+* **Current Phase:** Production — structural refactoring complete (see [Code Elegance Plan](docs/CODE_ELEGANCE_PLAN.md)). Config uses nested sub-structs (`Sharding`, `Replication`, `Files`, `Security`, `Orphan`). ShardManager delegates to `replicationManager` and `lifecycleManager`.

---

10 changes: 1 addition & 9 deletions cmd/dlockss-monitor/main.go
@@ -4,7 +4,6 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"log/slog"
@@ -21,9 +20,6 @@ import (
func main() {
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})))

-geoipDB := flag.String("geoip-db", "", "Path to a MaxMind/DB-IP .mmdb GeoIP database file")
-flag.Parse()

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

@@ -49,11 +45,7 @@ func main() {
slog.Info("topic name from env", "topic", cfg.TopicName)
}

-geoDBPath := *geoipDB
-if geoDBPath == "" {
-geoDBPath = os.Getenv("DLOCKSS_MONITOR_GEOIP_DB")
-}
-m := monitor.NewMonitor(cfg, geoDBPath)
+m := monitor.NewMonitor(cfg)
defer m.Close()

h, err := monitor.StartLibP2P(ctx, m)
26 changes: 7 additions & 19 deletions cmd/dlockss/main.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@ import (
"dlockss/internal/managers/shard"
"dlockss/internal/managers/storage"
"dlockss/internal/signing"
"dlockss/internal/telemetry"
"dlockss/internal/trust"
"dlockss/pkg/ipfs"
"dlockss/pkg/schema"
@@ -176,8 +175,8 @@ func main() {
go discovery.RunPeerFinder(ctx, h, routingDiscovery, cfg.DiscoveryServiceTag)

// Trust (optional: load peers if file exists)
-trustMgr := trust.NewTrustManager(cfg.TrustMode)
-if err := trustMgr.LoadTrustedPeers(cfg.TrustStorePath); err != nil && !os.IsNotExist(err) {
+trustMgr := trust.NewTrustManager(cfg.Security.TrustMode)
+if err := trustMgr.LoadTrustedPeers(cfg.Security.TrustStorePath); err != nil && !os.IsNotExist(err) {
slog.Warn("trust store load failed", "error", err)
}

@@ -191,8 +190,7 @@
defer dstore.Close()

rateLimiter := common.NewRateLimiter(cfg.RateLimitWindow, cfg.MaxMessagesPerWindow)
-metrics := telemetry.NewMetricsManager(cfg)
-storageMgr := storage.NewStorageManager(cfg, dht, metrics, badBitsFilter)
+storageMgr := storage.NewStorageManager(cfg, dht, badBitsFilter)
signer := signing.NewSigner(signing.SignerConfig{
Cfg: cfg,
Host: h,
@@ -213,7 +211,7 @@
}
// Provide manifest in its own goroutine with its own timeout.
go func() {
-pctx, pcancel := context.WithTimeout(ctx, cfg.DHTProvideTimeout)
+pctx, pcancel := context.WithTimeout(ctx, cfg.Files.DHTProvideTimeout)
defer pcancel()
storageMgr.ProvideFile(pctx, manifestCIDStr)
}()
@@ -224,7 +222,7 @@
// this call adds the missing pin entry. Blocks are already local
// from the manifest's recursive pin so this returns quickly.
go func() {
-pctx, pcancel := context.WithTimeout(ctx, cfg.DHTProvideTimeout)
+pctx, pcancel := context.WithTimeout(ctx, cfg.Files.DHTProvideTimeout)
defer pcancel()
manifestCID, err := cid.Decode(manifestCIDStr)
if err != nil {
@@ -274,7 +272,6 @@
PubSub: ps,
IPFSClient: ipfsClient,
Storage: storageMgr,
-Metrics: metrics,
Signer: signer,
RateLimiter: rateLimiter,
Cluster: clusterMgr,
@@ -286,17 +283,7 @@
clusterMgr.SetShardPeerProvider(shardMgr) // CRDT Peers() and allocations use real shard membership
announcePinned = shardMgr.AnnouncePinned

-metrics.RegisterProviders(shardMgr, storageMgr, rateLimiter)
-metrics.RegisterClusterProvider(clusterMgr) // cluster-style metrics: pins/peers/allocations per shard
-metrics.SetPeerID(h.ID().String())
-
-// Telemetry and API
-tc := telemetry.NewTelemetryClient(cfg, h, ps, metrics)
-if tc != nil {
-tc.SetShardPublisher(shardMgr, shardMgr)
-tc.Start(ctx)
-}
-apiServer := api.NewAPIServer(cfg.APIPort, metrics)
+apiServer := api.NewAPIServer(cfg.APIPort)
apiServer.Start()

// File processor and watcher
@@ -330,6 +317,7 @@

<-ctx.Done()
slog.Info("shutting down")
+fp.Stop()
if err := shardMgr.Close(); err != nil {
slog.Error("shard manager close error", "error", err)
}
11 changes: 7 additions & 4 deletions docs/DLOCKSS_PROTOCOL.md
@@ -171,15 +171,18 @@ Nodes in any shard (not just root) periodically run discovery to join existing d

### 6.2 Replication & Repair

-Replication is handled by the **Cluster Manager** and **LocalPinTracker** per shard:
+Replication is handled by the **Cluster Manager**, **LocalPinTracker**, and **replicationManager** (a delegate of ShardManager):

1. **Pinning**: When a responsible node ingests or accepts a file, it calls `ClusterManager.Pin(ctx, shardID, cid, ...)` on the shard's embedded cluster. Allocations are chosen deterministically from Peers() (shard mesh via ShardPeerProvider).
2. **State Sync**: The CRDT (Merkle-DAG based) propagates pin/unpin to all peers in the shard via PubSub (`dlockss-shard-<id>`).
3. **Local Pin Tracker**:
* Each node runs a `LocalPinTracker` per shard that polls CRDT State() (and on TriggerSync).
* For each pin in state, if this node is in **Allocations** (or Allocations is empty), it pins the ManifestCID locally via IPFS. The **onPinSynced** callback then: (a) registers the file with StorageManager, (b) announces PINNED, (c) resolves the PayloadCID from the manifest, (d) **pins the PayloadCID as its own root** so Kubo's reprovider (`pinned` strategy) re-announces it, and (e) provides both ManifestCID and PayloadCID to the DHT. On the ingesting node the payload is already a pin root from `ImportFile`; on replicas only the ManifestCID was pinned, so step (d) adds the missing pin entry — blocks are already local from the manifest's recursive pin so this returns quickly.
* Pins no longer in state or no longer allocated are unpinned locally and onPinRemoved is called.
-4. **Repair**: Under-replicated files trigger ReplicationRequest on the shard topic; peers that have the file JoinShard(targetShard), Pin, TriggerSync. CRDT sync and LocalPinTracker then replicate to allocated peers.
+4. **Repair (replicationManager)**: The `replicationManager` (in `shard_replication.go`) periodically broadcasts `ReplicationRequest` messages for all pinned manifests (cooldown: 5 minutes per manifest). Receiving peers handle requests as follows:
+* **Already pinned**: Ensure cluster membership and trigger CRDT sync.
+* **Not pinned (auto-replication)**: Fetch the manifest via `PinRecursive` (timeout: 5 minutes), add to cluster via `ClusterPinIfAbsent`, and trigger sync. Concurrency is bounded by a semaphore (default: 5 concurrent auto-replications).
+* Legacy manifests (containing a `ts` field) are silently ignored.

### 6.2.1 Heartbeat-Driven Re-Pin and Re-Provide

@@ -260,8 +263,8 @@ volumes:

### 7.2 Integrity & Authenticity
* **Content Addressing**: CIDs guarantee content integrity.
-* **Signatures**: All `ResearchObjects` and protocol messages are signed by the sender's private key.
-* **Nonces**: Protocol messages include nonces to prevent replay attacks.
+* **Signatures**: All `ResearchObjects` and protocol messages are signed by the sender's private key. The `Signer` type (in `internal/signing/`) handles signing and verification, with `verifySignedMessage` decomposed into focused steps: field validation (`validateMessageFields`), timestamp checking (`checkTimestamp`), public key retrieval (`fetchPublicKey`), and cryptographic verification (`verifySignatureBytes`).
+* **Nonces**: Protocol messages include nonces (generated by the signing package's internal `newNonce` function) to prevent replay attacks. A `nonceStore` tracks recently seen nonces.

### 7.3 Liar Detection
* Nodes verify that the actual file size matches the `TotalSize` claimed in the `ResearchObject` manifest.
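The size check can be sketched in a few lines; the function and parameter names are ours for illustration, not the actual schema:

```go
package main

import "fmt"

// verifyClaimedSize sketches liar detection: compare the manifest's
// claimed TotalSize against the bytes actually observed for the file.
func verifyClaimedSize(claimed, actual int64) error {
	if claimed != actual {
		return fmt.Errorf("size mismatch: manifest claims %d bytes, got %d", claimed, actual)
	}
	return nil
}

func main() {
	fmt.Println(verifyClaimedSize(1024, 1024)) // <nil>
	fmt.Println(verifyClaimedSize(1024, 512))  // size mismatch: manifest claims 1024 bytes, got 512
}
```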