Skip to content

Commit

Permalink
feat: add state api
Browse files Browse the repository at this point in the history
  • Loading branch information
fearlessfe authored and GrapeBaBa committed Sep 12, 2024
1 parent 8dc76c1 commit bf40a75
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 6 deletions.
44 changes: 43 additions & 1 deletion cmd/shisui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/portalnetwork/beacon"
"github.com/ethereum/go-ethereum/portalnetwork/history"
"github.com/ethereum/go-ethereum/portalnetwork/state"
"github.com/ethereum/go-ethereum/portalnetwork/storage"
"github.com/ethereum/go-ethereum/portalnetwork/web3"
"github.com/ethereum/go-ethereum/rpc"
Expand Down Expand Up @@ -135,6 +136,13 @@ func startPortalRpcServer(config Config, conn discover.UDPConn, addr string) err
}
}

if slices.Contains(config.Networks, portalwire.State.Name()) {
err = initState(config, server, conn, localNode, discV5)
if err != nil {
return err
}
}

httpServer := &http.Server{
Addr: addr,
Handler: server,
Expand Down Expand Up @@ -167,14 +175,16 @@ func initDiscV5(config Config, conn discover.UDPConn) (*discover.UDPv5, *enode.L
}

func initHistory(config Config, server *rpc.Server, conn discover.UDPConn, localNode *enode.LocalNode, discV5 *discover.UDPv5) error {
db, err := history.NewDB(config.DataDir)
networkName := portalwire.History.Name()
db, err := history.NewDB(config.DataDir, networkName)
if err != nil {
return err
}
contentStorage, err := history.NewHistoryStorage(storage.PortalStorageConfig{
StorageCapacityMB: config.DataCapacity,
DB: db,
NodeId: localNode.ID(),
NetworkName: networkName,
})
if err != nil {
return err
Expand Down Expand Up @@ -239,6 +249,38 @@ func initBeacon(config Config, server *rpc.Server, conn discover.UDPConn, localN
return beaconNetwork.Start()
}

func initState(config Config, server *rpc.Server, conn discover.UDPConn, localNode *enode.LocalNode, discV5 *discover.UDPv5) error {
networkName := portalwire.State.Name()
db, err := history.NewDB(config.DataDir, networkName)
if err != nil {
return err
}
contentStorage, err := history.NewHistoryStorage(storage.PortalStorageConfig{
StorageCapacityMB: config.DataCapacity,
DB: db,
NodeId: localNode.ID(),
NetworkName: networkName,
})
if err != nil {
return err
}
contentQueue := make(chan *discover.ContentElement, 50)

protocol, err := discover.NewPortalProtocol(config.Protocol, portalwire.State, config.PrivateKey, conn, localNode, discV5, contentStorage, contentQueue)

if err != nil {
return err
}
api := discover.NewPortalAPI(protocol)
stateNetworkAPI := state.NewStateNetworkAPI(api)
err = server.RegisterName("portal", stateNetworkAPI)
if err != nil {
return err
}
historyNetwork := state.NewStateNetwork(protocol)
return historyNetwork.Start()
}

func getPortalConfig(ctx *cli.Context) (*Config, error) {
config := &Config{
Protocol: discover.DefaultPortalProtocolConfig(),
Expand Down
9 changes: 5 additions & 4 deletions portalnetwork/history/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"database/sql"
"errors"
"fmt"
"math/big"
"os"
"path"
Expand Down Expand Up @@ -73,8 +74,8 @@ func greater(a, b []byte) int {
return bytes.Compare(a, b)
}

func NewDB(dataDir string) (*sql.DB, error) {
dbPath := path.Join(dataDir, "history")
func NewDB(dataDir string, network string) (*sql.DB, error) {
dbPath := path.Join(dataDir, network)
err := os.MkdirAll(dbPath, 0755)
if err != nil {
return nil, err
Expand All @@ -93,7 +94,7 @@ func NewDB(dataDir string) (*sql.DB, error) {
},
})
})
sqlDb, err := sql.Open("sqlite3_custom", path.Join(dbPath, sqliteName))
sqlDb, err := sql.Open("sqlite3_custom", path.Join(dbPath, fmt.Sprintf("%s.sqlite", network)))
return sqlDb, err
}

Expand All @@ -102,7 +103,7 @@ func NewHistoryStorage(config storage.PortalStorageConfig) (storage.ContentStora
nodeId: config.NodeId,
sqliteDB: config.DB,
storageCapacityInBytes: config.StorageCapacityMB * 1000000,
log: log.New("history_storage"),
log: log.New("storage", config.NetworkName),
}
hs.radius.Store(storage.MaxDistance)
err := hs.createTable()
Expand Down
2 changes: 1 addition & 1 deletion portalnetwork/history/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func genBytes(length int) []byte {
}

func newContentStorage(storageCapacityInMB uint64, nodeId enode.ID, nodeDataDir string) (*ContentStorage, error) {
db, err := NewDB(nodeDataDir)
db, err := NewDB(nodeDataDir, "history")
if err != nil {
return nil, err
}
Expand Down
75 changes: 75 additions & 0 deletions portalnetwork/state/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package state

import (
"github.com/ethereum/go-ethereum/p2p/discover"
)

type API struct {
*discover.PortalProtocolAPI
}

func (p *API) StateRoutingTableInfo() *discover.RoutingTableInfo {
return p.RoutingTableInfo()
}

func (p *API) StateAddEnr(enr string) (bool, error) {
return p.AddEnr(enr)
}

func (p *API) StateGetEnr(nodeId string) (string, error) {
return p.GetEnr(nodeId)
}

func (p *API) StateDeleteEnr(nodeId string) (bool, error) {
return p.DeleteEnr(nodeId)
}

func (p *API) StateLookupEnr(nodeId string) (string, error) {
return p.LookupEnr(nodeId)
}

func (p *API) StatePing(enr string) (*discover.PortalPongResp, error) {
return p.Ping(enr)
}

func (p *API) StateFindNodes(enr string, distances []uint) ([]string, error) {
return p.FindNodes(enr, distances)
}

func (p *API) StateFindContent(enr string, contentKey string) (interface{}, error) {
return p.FindContent(enr, contentKey)
}

func (p *API) StateOffer(enr string, contentKey string, contentValue string) (string, error) {
return p.Offer(enr, contentKey, contentValue)
}

func (p *API) StateRecursiveFindNodes(nodeId string) ([]string, error) {
return p.RecursiveFindNodes(nodeId)
}

func (p *API) StateRecursiveFindContent(contentKeyHex string) (*discover.ContentInfo, error) {
return p.RecursiveFindContent(contentKeyHex)
}

func (p *API) StateLocalContent(contentKeyHex string) (string, error) {
return p.LocalContent(contentKeyHex)
}

func (p *API) StateStore(contentKeyHex string, contextHex string) (bool, error) {
return p.Store(contentKeyHex, contextHex)
}

func (p *API) StateGossip(contentKeyHex, contentHex string) (int, error) {
return p.Gossip(contentKeyHex, contentHex)
}

func (p *API) StateTraceRecursiveFindContent(contentKeyHex string) (*discover.TraceContentResult, error) {
return p.TraceRecursiveFindContent(contentKeyHex)
}

func NewStateNetworkAPI(portalProtocolAPI *discover.PortalProtocolAPI) *API {
return &API{
portalProtocolAPI,
}
}
77 changes: 77 additions & 0 deletions portalnetwork/state/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package state

import (
"context"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
)

type StateNetwork struct {
portalProtocol *discover.PortalProtocol
closeCtx context.Context
closeFunc context.CancelFunc
log log.Logger
}

func NewStateNetwork(portalProtocol *discover.PortalProtocol) *StateNetwork {
ctx, cancel := context.WithCancel(context.Background())

return &StateNetwork{
portalProtocol: portalProtocol,
closeCtx: ctx,
closeFunc: cancel,
log: log.New("sub-protocol", "state"),
}
}

func (h *StateNetwork) Start() error {
err := h.portalProtocol.Start()
if err != nil {
return err
}
go h.processContentLoop(h.closeCtx)
h.log.Debug("state network start successfully")
return nil
}

func (h *StateNetwork) Stop() {
h.closeFunc()
h.portalProtocol.Stop()
}

func (h *StateNetwork) processContentLoop(ctx context.Context) {
contentChan := h.portalProtocol.GetContent()
for {
select {
case <-ctx.Done():
return
case contentElement := <-contentChan:
err := h.validateContents(contentElement.ContentKeys, contentElement.Contents)
if err != nil {
h.log.Error("validate content failed", "err", err)
continue
}

go func(ctx context.Context) {
select {
case <-ctx.Done():
return
default:
var gossippedNum int
gossippedNum, err = h.portalProtocol.Gossip(&contentElement.Node, contentElement.ContentKeys, contentElement.Contents)
h.log.Trace("gossippedNum", "gossippedNum", gossippedNum)
if err != nil {
h.log.Error("gossip failed", "err", err)
return
}
}
}(ctx)
}
}
}

func (h *StateNetwork) validateContents(contentKeys [][]byte, contents [][]byte) error {
// TODO
panic("implement me")
}
1 change: 1 addition & 0 deletions portalnetwork/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ type PortalStorageConfig struct {
DB *sql.DB
NodeId enode.ID
Spec *common.Spec
NetworkName string
}

0 comments on commit bf40a75

Please sign in to comment.