Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

sentinel cid implementation #113

Merged
merged 30 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f400c13
feat: sentinel cid caboose impl
AmeanAsad Jun 15, 2023
b373c32
enhancement: remove unused code
AmeanAsad Jun 15, 2023
a0c57e3
fix: caboose_test
AmeanAsad Jun 15, 2023
25e4809
fix: formatting
AmeanAsad Jun 15, 2023
1227a10
fix: go fmt checks
AmeanAsad Jun 15, 2023
2f2a92b
fix: math/rand deprecation
AmeanAsad Jun 15, 2023
8593539
enhancement: add jwt key to caboose config
AmeanAsad Jun 16, 2023
838e281
feat: move sentinel cid fetching
AmeanAsad Jun 18, 2023
a0115bb
add unit tests
AmeanAsad Jun 18, 2023
043e90e
feat: add tests for sentinel cids
AmeanAsad Jun 19, 2023
8f0dd8e
fix: go fmt
AmeanAsad Jun 19, 2023
509fe86
add unit tests
AmeanAsad Jun 19, 2023
d741115
remove println
AmeanAsad Jun 19, 2023
9e687a1
add fetch sentinel cid tests
AmeanAsad Jun 19, 2023
970df5c
clarify default value
AmeanAsad Jun 19, 2023
8610eea
Merge branch 'main' into aa/sentinel-cids
AmeanAsad Jun 28, 2023
13cf8e9
modify default sentinel cid period
AmeanAsad Jun 30, 2023
8779617
Merge branch 'aa/sentinel-cids' of github.com:filecoin-saturn/caboose…
AmeanAsad Jun 30, 2023
d597216
feat: add prom metrics for sentinel cid calls
AmeanAsad Jul 5, 2023
4296810
change sentinel cid req format
AmeanAsad Jul 5, 2023
44d5802
add sentinel cid logging
AmeanAsad Jul 5, 2023
c20f01c
add error handling for sentinel cid lookups
AmeanAsad Jul 5, 2023
7c552c0
go fmt fix
AmeanAsad Jul 5, 2023
67a80e1
add temporary verbose debugging
AmeanAsad Jul 6, 2023
6707f99
fix logging
AmeanAsad Jul 6, 2023
9e05dd2
remove unused auth
AmeanAsad Jul 7, 2023
e7643e9
formatting
AmeanAsad Jul 7, 2023
007d8d7
rename to compliance cids
AmeanAsad Jul 10, 2023
0a7da6e
update compliance cid period
AmeanAsad Jul 10, 2023
784216f
remove verbose logging
AmeanAsad Jul 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package caboose

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/filecoin-saturn/caboose/tieredhashing"
Expand All @@ -32,7 +32,7 @@ type Config struct {
// OrchestratorClient is the HTTP client to use when communicating with the Saturn orchestrator.
OrchestratorClient *http.Client
// OrchestratorOverride replaces calls to the orchestrator with a fixed response.
OrchestratorOverride []string
OrchestratorOverride []tieredhashing.NodeInfo

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to our Saturn retrieval requests.
LoggingEndpoint url.URL
Expand Down Expand Up @@ -87,7 +87,7 @@ const DefaultMaxRetries = 3
const DefaultMirrorFraction = 0.1

const maxBlockSize = 4194305 // 4 Mib + 1 byte
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes?maxNodes=200"
const DefaultPoolRefreshInterval = 5 * time.Minute

// we cool off sending requests to Saturn for a cid for a certain duration
Expand Down Expand Up @@ -189,7 +189,13 @@ func NewCaboose(config *Config) (*Caboose, error) {
config.MirrorFraction = DefaultMirrorFraction
}
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
var overrideNodes []tieredhashing.NodeInfo
err := json.Unmarshal([]byte(override), &overrideNodes)
if err != nil {
goLogger.Warnf("Error parsing BackendOverrideKey:", "err", err)
return nil, err
}
config.OrchestratorOverride = overrideNodes
}

c := Caboose{
Expand Down
27 changes: 18 additions & 9 deletions caboose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"

"github.com/filecoin-saturn/caboose"
"github.com/filecoin-saturn/caboose/tieredhashing"
"github.com/ipfs/go-cid"
Expand All @@ -17,13 +26,6 @@ import (
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
)

func TestCidCoolDown(t *testing.T) {
Expand Down Expand Up @@ -212,11 +214,18 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt
ch := &CabooseHarness{}

ch.pool = make([]*ep, n)
purls := make([]string, n)
purls := make([]tieredhashing.NodeInfo, n)
for i := 0; i < len(ch.pool); i++ {
ch.pool[i] = &ep{}
ch.pool[i].Setup()
purls[i] = strings.TrimPrefix(ch.pool[i].server.URL, "https://")
ip := strings.TrimPrefix(ch.pool[i].server.URL, "https://")
purls[i] = tieredhashing.NodeInfo{
IP: ip,
ID: "node-id",
Weight: rand.Intn(100),
Distance: rand.Float32(),
SentinelCid: "sentinel-cid",
}
}
ch.goodOrch = true
orch := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
13 changes: 13 additions & 0 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package caboose

import (
"context"
"crypto/rand"
"errors"
"fmt"
"hash/crc32"
"io"
"math/big"
"net/http"
"os"
"strconv"
Expand All @@ -32,6 +34,7 @@ const (
saturnRetryAfterKey = "Retry-After"
resourceTypeCar = "car"
resourceTypeBlock = "block"
sentinelCidPeriod = 200
)

var (
Expand All @@ -48,6 +51,16 @@ var (
func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int) (b blocks.Block, rm tieredhashing.ResponseMetrics, e error) {
reqUrl := fmt.Sprintf(saturnReqTmpl, c)

rand, _ := rand.Int(rand.Reader, big.NewInt(sentinelCidPeriod))
AmeanAsad marked this conversation as resolved.
Show resolved Hide resolved
if rand == big.NewInt(1) {
sc, _ := p.th.GetSentinelCid(from)
if len(sc) > 0 {
sentinelCid, _ := cid.Decode(sc)
sentinelReqUrl := fmt.Sprintf(saturnReqTmpl, sentinelCid)
go p.fetchResource(ctx, from, sentinelReqUrl, "application/vnd.ipld.raw", attempt, func(rsrc string, r io.Reader) error { return nil })
}
}

rm, e = p.fetchResource(ctx, from, reqUrl, "application/vnd.ipld.raw", attempt, func(rsrc string, r io.Reader) error {
block, err := io.ReadAll(io.LimitReader(r, maxBlockSize))
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/asecurityteam/rolling v0.0.0-20230418204413-b4052899307d
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/google/uuid v1.3.0
github.com/ipfs/boxo v0.8.0-rc2.0.20230329082438-360b031ed895
github.com/ipfs/go-block-format v0.1.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE=
github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/gddo v0.0.0-20180823221919-9d8ff1c67be5 h1:yrv1uUvgXH/tEat+wdvJMRJ4g51GlIydtDpU9pFjaaI=
github.com/golang/gddo v0.0.0-20180823221919-9d8ff1c67be5/go.mod h1:xEhNfoBDX1hzLm2Nf80qUvZ2sVwoMZ8d6IE2SrsQfh4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
Expand Down
63 changes: 58 additions & 5 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"os"
"sync"
Expand All @@ -18,6 +19,8 @@ import (

"github.com/filecoin-saturn/caboose/tieredhashing"

"github.com/golang-jwt/jwt/v5"

"github.com/ipfs/boxo/path"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
Expand All @@ -28,26 +31,76 @@ const (
tierMainToUnknown = "main-to-unknown"
tierUnknownToMain = "unknown-to-main"
BackendOverrideKey = "CABOOSE_BACKEND_OVERRIDE"
CabooseJwtIssuer = "caboose-client"
)

// authenticateReq adds authentication to a request when a JWT_SECRET is present as an environment variable.
func authenticateReq(req *http.Request) (*http.Request, error) {

// Check for existense of an auth secret.
jwtKey := []byte(os.Getenv("JWT_SECRET"))
AmeanAsad marked this conversation as resolved.
Show resolved Hide resolved
if len(jwtKey) == 0 {
goLogger.Warnw("No JWT SECRET found")
return req, nil
}

claims := &jwt.MapClaims{
"ExpiresAt": time.Now().Add(10 * time.Minute).Unix(), // Token expires after 10 minutes
"Issuer": CabooseJwtIssuer,
}

token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
ss, err := token.SignedString(jwtKey)

if err != nil {
goLogger.Warnw("failed to generate JWT", "err", err)
return nil, err
}

req.Header.Add("Authorization", "Bearer "+ss)

return req, nil

}

// loadPool refreshes the set of Saturn endpoints in the pool by fetching an updated list of responsive Saturn nodes from the
// Saturn Orchestrator.
func (p *pool) loadPool() ([]string, error) {
func (p *pool) loadPool() ([]tieredhashing.NodeInfo, error) {

if p.config.OrchestratorOverride != nil {
return p.config.OrchestratorOverride, nil
}
resp, err := p.config.OrchestratorClient.Get(p.config.OrchestratorEndpoint.String())

client := p.config.OrchestratorClient

req, err := http.NewRequest("GET", p.config.OrchestratorEndpoint.String(), nil)

if err != nil {
goLogger.Warnw("failed to get backends from orchestrator", "err", err, "endpoint", p.config.OrchestratorEndpoint.String())
goLogger.Warnw("failed to create request to orchestrator", "err", err, "endpoint", p.config.OrchestratorEndpoint)
return nil, err
}

req, err = authenticateReq(req)

if err != nil {
goLogger.Warnw("failed to authenticate request to orchestrator", "err", err, "endpoint", p.config.OrchestratorEndpoint)
return nil, err
}

resp, err := client.Do(req)
if err != nil {
goLogger.Warnw("failed to get backends from orchestrator", "err", err, "endpoint", p.config.OrchestratorEndpoint)
return nil, err
}
defer resp.Body.Close()

responses := make([]string, 0)
responses := make([]tieredhashing.NodeInfo, 0)

if err := json.NewDecoder(resp.Body).Decode(&responses); err != nil {
goLogger.Warnw("failed to decode backends from orchestrator", "err", err, "endpoint", p.config.OrchestratorEndpoint.String())
return nil, err
}

goLogger.Infow("got backends from orchestrators", "cnt", len(responses), "endpoint", p.config.OrchestratorEndpoint.String())
return responses, nil
}
Expand Down Expand Up @@ -115,7 +168,7 @@ func (p *pool) doRefresh() {
}
}

func (p *pool) refreshWithNodes(newEP []string) {
func (p *pool) refreshWithNodes(newEP []tieredhashing.NodeInfo) {
p.lk.Lock()
defer p.lk.Unlock()

Expand Down
19 changes: 17 additions & 2 deletions pool_refresh_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package caboose

import (
"math/rand"
"testing"

"github.com/filecoin-saturn/caboose/tieredhashing"
"github.com/stretchr/testify/require"
"testing"
)

func TestPoolRefresh(t *testing.T) {
Expand Down Expand Up @@ -59,7 +61,20 @@ func TestPoolRefreshWithLatencyDistribution(t *testing.T) {
}

func andAndAssertPool(t *testing.T, p *pool, nodes []string, expectedMain, expectedUnknown, expectedTotal, expectedNew int) {
p.refreshWithNodes(nodes)

parsedNodes := make([]tieredhashing.NodeInfo, 0)

for _, n := range nodes {
parsedNodes = append(parsedNodes, tieredhashing.NodeInfo{
IP: n,
ID: n,
Weight: rand.Intn(100),
Distance: rand.Float32(),
SentinelCid: n,
})
}

p.refreshWithNodes(parsedNodes)
nds := p.th.GetPerf()
require.Equal(t, expectedTotal, len(nds))
mts := p.th.GetPoolMetrics()
Expand Down
17 changes: 16 additions & 1 deletion pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/tls"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -94,19 +95,33 @@ func TestPoolMiroring(t *testing.T) {
e.lk.Lock()
e.resp = carBytes.Bytes()
eURL := strings.TrimPrefix(e.server.URL, "https://")
eNodeInfo := tieredhashing.NodeInfo{
IP: eURL,
ID: eURL,
Weight: rand.Intn(100),
Distance: rand.Float32(),
SentinelCid: "node1",
}
e.lk.Unlock()

e2 := ep{}
e2.Setup()
e2.lk.Lock()
e2.resp = carBytes.Bytes()
e2URL := strings.TrimPrefix(e2.server.URL, "https://")
e2NodeInfo := tieredhashing.NodeInfo{
IP: e2URL,
ID: e2URL,
Weight: rand.Intn(100),
Distance: rand.Float32(),
SentinelCid: "node2",
}
e2.lk.Unlock()

conf := Config{
OrchestratorEndpoint: &url.URL{},
OrchestratorClient: http.DefaultClient,
OrchestratorOverride: []string{eURL, e2URL},
OrchestratorOverride: []tieredhashing.NodeInfo{eNodeInfo, e2NodeInfo},
LoggingEndpoint: url.URL{},
LoggingClient: http.DefaultClient,
LoggingInterval: time.Hour,
Expand Down
Loading