This repository has been archived by the owner on Dec 9, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdht.go
139 lines (125 loc) · 2.98 KB
/
dht.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package dione
import (
ctx "context"
"errors"
"fmt"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
dht "github.com/libp2p/go-libp2p-kad-dht"
record "github.com/libp2p/go-libp2p-record"
record_pb "github.com/libp2p/go-libp2p-record/pb"
)
type CustomValidator struct{}
func (v CustomValidator) Validate(key string, _ []byte) error {
fmt.Printf("Validating %v\n", key)
return nil
}
func (v CustomValidator) Select(_ string, values [][]byte) (int, error) {
if len(values) == 0 {
return 0, errors.New("zero values, can't unwrap when only zero values are found")
} else {
return 0, nil
}
}
func newDht(h host.Host) *dht.IpfsDHT {
ret, err := dht.New(ctx.TODO(), h,
dht.NamespacedValidator("kad-m", CustomValidator{}),
dht.ProtocolPrefix("dione"),
dht.ProtocolExtension("kad"),
dht.Mode(dht.ModeServer),
)
if err != nil {
panic(err)
}
return ret
}
type GeneralCommand struct {
closest *ClosestCommand
put *PutCommand
get *GetCommand
}
type ClosestCommand struct {
key string
response chan<- []peer.ID
}
func newClosestCommand(key string) (ClosestCommand, chan []peer.ID) {
ret := new(ClosestCommand)
ret.key = key
response := make(chan []peer.ID, 1)
ret.response = response
return *ret, response
}
type PutCommand struct {
key string
value []byte
}
func newPutCommand(key string, value []byte) PutCommand {
ret := new(PutCommand)
ret.key = key
ret.value = value
return *ret
}
type GetCommand struct {
key string
response chan<- []byte
}
func newGetCommand(key string) (GetCommand, chan []byte) {
ret := new(GetCommand)
ret.key = key
response := make(chan []byte, 1)
ret.response = response
return *ret, response
}
type dhtHandler struct {
d *dht.IpfsDHT
inputs <-chan GeneralCommand
}
func newDhtHandler(d *dht.IpfsDHT) (dhtHandler, chan<- GeneralCommand) {
ret := dhtHandler{}
ret.d = d
channel := make(chan GeneralCommand, 10)
ret.inputs = channel
return ret, channel
}
func (dhtHand dhtHandler) handle() {
for command := range dhtHand.inputs {
if command.closest != nil {
responseChannel := command.closest.response
keyString := command.closest.key
peers, err := dhtHand.d.GetClosestPeers(ctx.TODO(), keyString)
if err != nil {
response := make([]peer.ID, 0)
responseChannel <- response
} else {
responseChannel <- peers
}
}
if command.put != nil {
rawKey := command.put.key
key := fmt.Sprintf("/kad-m/%v", rawKey)
rec := record.MakePutRecord(key, command.put.value)
data, err := rec.Marshal()
if err != nil {
panic(err)
}
err = dhtHand.d.PutValue(ctx.TODO(), key, data)
if err != nil {
panic(err)
}
}
if command.get != nil {
rawKey := command.get.key
key := fmt.Sprintf("/kad-m/%v", rawKey)
rawData, err := dhtHand.d.GetValue(ctx.TODO(), key)
if err != nil {
panic(err)
}
rec := new(record_pb.Record)
err = rec.Unmarshal(rawData)
if err != nil {
panic(err)
}
command.get.response <- rec.GetValue()
}
}
}