-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathlinksystem.go
218 lines (194 loc) · 8.11 KB
/
linksystem.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package engine
import (
"bytes"
"errors"
"io"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipni/go-libipni/ingest/schema"
provider "github.com/ipni/index-provider"
"github.com/libp2p/go-libp2p/core/peer"
)
var (
	// errNoEntries is returned by the main link system's read opener when it
	// is asked to load schema.NoEntries.
	errNoEntries = errors.New("no entries; see schema.NoEntries")
	// ErrEntriesLinkMismatch signals that the link generated from chunking the
	// multihashes returned by provider.MultihashLister does not match the
	// previously generated link. This error is most likely caused by the
	// lister returning inconsistent multihashes for the same key.
	ErrEntriesLinkMismatch = errors.New("regenerated link from multihash lister did not match the original link; multihashes returned by the lister for the same key are not consistent")
)
// mkLinkSystem creates the main engine link system.
//
// Reads resolve a link as follows:
//  1. schema.NoEntries short-circuits with errNoEntries.
//  2. If the CID is present in the engine datastore and its bytes decode to a
//     node that isAdvertisement accepts, those bytes are served as-is.
//  3. Otherwise the CID is treated as an entries chunk and served from the
//     entries chunker cache. On a cache miss the whole chunk chain is
//     regenerated through the registered multihash lister, and the regenerated
//     root must equal the requested CID.
//
// If e.options.storageReadOpenerErrorHook is set, every read error is passed
// through it before being returned. Writes are committed to the engine
// datastore, keyed by the string form of the block's CID.
func (e *Engine) mkLinkSystem() ipld.LinkSystem {
	lsys := cidlink.DefaultLinkSystem()
	storageReadOpener := func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
		// If link corresponds to schema.NoEntries return error immediately.
		if lnk == schema.NoEntries {
			return nil, errNoEntries
		}
		ctx := lctx.Ctx
		c := lnk.(cidlink.Link).Cid
		log.Debugf("Triggered ReadOpener from engine's linksystem with cid (%s)", c)

		// Advertisements are stored in the main datastore. Other data may also
		// live there, so a hit is decoded and checked below before serving it.
		val, err := e.ds.Get(ctx, datastore.NewKey(c.String()))
		// errors.Is (as used for the key-mapping lookup below) keeps a wrapped
		// not-found error on the "fall through to entries" path instead of
		// failing the read.
		if err != nil && !errors.Is(err, datastore.ErrNotFound) {
			log.Errorf("Error getting object from datastore in linksystem: %s", err)
			return nil, err
		}
		// If data was retrieved from the datastore, this may be an advertisement.
		if len(val) != 0 {
			// Decode the node to check its type to see if it is an Advertisement.
			n, err := decodeIPLDNode(bytes.NewBuffer(val))
			if err != nil {
				log.Errorf("Could not decode IPLD node for potential advertisement: %s", err)
				return nil, err
			}
			// If this was an advertisement, then return it.
			if isAdvertisement(n) {
				log.Debugw("Retrieved advertisement from datastore", "cid", c, "size", len(val))
				return bytes.NewBuffer(val), nil
			}
			log.Debugw("Retrieved non-advertisement object from datastore", "cid", c, "size", len(val))
		}

		// Not an advertisement, so this means we are receiving ingestion data.
		// If no lister registered return error
		if e.mhLister == nil {
			log.Error("No multihash lister has been registered in engine")
			return nil, provider.ErrNoMultihashLister
		}

		log.Debugw("Checking cache for data", "cid", c)
		// Check if the key is already cached.
		b, err := e.entriesChunker.GetRawCachedChunk(ctx, lnk)
		if err != nil {
			log.Errorf("Error fetching cached list for Cid (%s): %s", c, err)
			return nil, err
		}

		// If we don't have the link, generate the linked list of entries in
		// cache so it is ready to serve for this and future ingestion.
		//
		// The reason for caching this is because the indexer requests each
		// chunk entry, and a specific subset of entries cannot be read from a
		// car. So all entry chunks are kept in cache to serve to the indexer.
		// The cache uses the entry chunk CID as a key that maps to the entry
		// chunk data.
		if b == nil {
			log.Infow("Entry for CID is not cached, generating chunks", "cid", c)
			// If the link is not found, it means that the root link of the list has
			// not been generated and we need to get the relationship between the cid
			// received and the contextID so the lister knows how to
			// regenerate the list of CIDs. It's enough to fetch *any* provider's mapping
			// as same entries from different providers would result into the same chunks
			key, err := e.getCidKeyMap(ctx, c)
			if err != nil {
				if errors.Is(err, datastore.ErrNotFound) {
					log.Error("No mapping between CID and contextID to provider identity found. Treating ad as skippable.")
					// We have to return ipld.ErrNotExists because the version of storetheindex Boost is using depends
					// on the old HTTP publisher that only treats this error as 404.
					return nil, ipld.ErrNotExists{}
				}
				log.Errorf("Error fetching relationship between CID and contextID: %s", err)
				return nil, err
			}

			// Get the car iterator needed to create the entry chunks.
			// Normally for removal this is not needed since the indexer
			// deletes all indexes for the contextID in the removal
			// advertisement. Only if the removal had no contextID would the
			// indexer ask for entry chunks to remove.
			//
			// Named providerID so it does not shadow the imported provider package.
			providerID, err := peer.IDFromBytes(key.Provider)
			if err != nil {
				return nil, err
			}
			mhIter, err := e.mhLister(ctx, providerID, key.ContextID)
			if err != nil {
				return nil, err
			}

			// Store the linked list entries in cache as we generate them. We
			// use the cache linksystem that stores entries in an in-memory
			// datastore.
			regeneratedLink, err := e.entriesChunker.Chunk(ctx, mhIter)
			if err != nil {
				log.Errorf("Error generating linked list from multihash lister: %s", err)
				return nil, err
			}
			// Checked assertion: a nil or non-CID link from the chunker is a
			// mismatch to report, not a reason to panic.
			regeneratedCidLink, ok := regeneratedLink.(cidlink.Link)
			if !ok || !c.Equals(regeneratedCidLink.Cid) {
				log.Errorw("Regeneration of entries link from multihash iterator did not match the original link. Check that multihash iterator consistently returns the same entries for the same key.", "want", lnk, "got", regeneratedLink)
				return nil, ErrEntriesLinkMismatch
			}
		} else {
			log.Debugw("Found cache entry for CID", "cid", c)
		}

		// Return the linked list node.
		val, err = e.entriesChunker.GetRawCachedChunk(ctx, lnk)
		if err != nil {
			log.Errorf("Error fetching cached list for CID (%s): %s", c, err)
			return nil, err
		}

		// If no value was populated it means that nothing was found
		// in the multiple datastores.
		if len(val) == 0 {
			log.Errorf("No object found in linksystem for CID (%s)", c)
			return nil, datastore.ErrNotFound
		}

		return bytes.NewBuffer(val), nil
	}

	// If error hook provided, call error hook function on storageReadOpener
	// error. Otherwise, only call storageReadOpener.
	if e.options.storageReadOpenerErrorHook != nil {
		lsys.StorageReadOpener = func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
			r, err := storageReadOpener(lctx, lnk)
			if err != nil {
				return r, e.options.storageReadOpenerErrorHook(lctx, lnk, err)
			}
			return r, nil
		}
	} else {
		lsys.StorageReadOpener = storageReadOpener
	}

	// Buffer the block and persist it under its CID once the link is known.
	lsys.StorageWriteOpener = func(lctx ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) {
		buf := bytes.NewBuffer(nil)
		return buf, func(lnk ipld.Link) error {
			c := lnk.(cidlink.Link).Cid
			return e.ds.Put(lctx.Ctx, datastore.NewKey(c.String()), buf.Bytes())
		}, nil
	}
	return lsys
}
// vanillaLinkSystem returns a link system that reads and writes blocks
// directly against the engine datastore, keyed by the string form of each
// link's CID.
//
// Unlike mkLinkSystem it has no advertisement detection, entry-chunk caching
// or read error hook. It is mainly used to load advertisements that were
// previously stored, via their link.
func (e *Engine) vanillaLinkSystem() ipld.LinkSystem {
	// dsKey maps a (CID) link to the datastore key it is stored under.
	dsKey := func(l ipld.Link) datastore.Key {
		return datastore.NewKey(l.(cidlink.Link).Cid.String())
	}

	ls := cidlink.DefaultLinkSystem()
	ls.StorageReadOpener = func(lctx ipld.LinkContext, l ipld.Link) (io.Reader, error) {
		data, err := e.ds.Get(lctx.Ctx, dsKey(l))
		if err == nil {
			return bytes.NewBuffer(data), nil
		}
		return nil, err
	}
	ls.StorageWriteOpener = func(lctx ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) {
		var pending bytes.Buffer
		commit := func(l ipld.Link) error {
			return e.ds.Put(lctx.Ctx, dsKey(l), pending.Bytes())
		}
		return &pending, commit, nil
	}
	return ls
}
// decodeIPLDNode decodes the DAG-JSON content of r into a node built with
// basicnode's Any prototype. A decode error is returned unchanged, with a nil
// node.
func decodeIPLDNode(r io.Reader) (ipld.Node, error) {
	builder := basicnode.Prototype.Any.NewBuilder()
	if err := dagjson.Decode(builder, r); err != nil {
		return nil, err
	}
	return builder.Build(), nil
}
// isAdvertisement loosely reports whether n is an advertisement rather than
// some other stored node (such as an entries chunk).
//
// This is only a heuristic: n counts as an advertisement when looking up its
// `Signature` field yields a non-nil node. Any lookup error is deliberately
// ignored and treated as "not an advertisement".
func isAdvertisement(n ipld.Node) bool {
	signature, _ := n.LookupByString("Signature")
	return signature != nil
}