From bf40a752b16bc1b2dadb5a805b0a6806d3322410 Mon Sep 17 00:00:00 2001 From: fearlessfe <505380967@qq.com> Date: Thu, 12 Sep 2024 08:11:51 +0800 Subject: [PATCH] feat: add state api --- cmd/shisui/main.go | 44 ++++++++++++++- portalnetwork/history/storage.go | 9 ++-- portalnetwork/history/storage_test.go | 2 +- portalnetwork/state/api.go | 75 ++++++++++++++++++++++++++ portalnetwork/state/network.go | 77 +++++++++++++++++++++++++++ portalnetwork/storage/config.go | 1 + 6 files changed, 202 insertions(+), 6 deletions(-) create mode 100644 portalnetwork/state/api.go create mode 100644 portalnetwork/state/network.go diff --git a/cmd/shisui/main.go b/cmd/shisui/main.go index 0c5c3c825b87..d77a0ec6da61 100644 --- a/cmd/shisui/main.go +++ b/cmd/shisui/main.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/portalnetwork/beacon" "github.com/ethereum/go-ethereum/portalnetwork/history" + "github.com/ethereum/go-ethereum/portalnetwork/state" "github.com/ethereum/go-ethereum/portalnetwork/storage" "github.com/ethereum/go-ethereum/portalnetwork/web3" "github.com/ethereum/go-ethereum/rpc" @@ -135,6 +136,13 @@ func startPortalRpcServer(config Config, conn discover.UDPConn, addr string) err } } + if slices.Contains(config.Networks, portalwire.State.Name()) { + err = initState(config, server, conn, localNode, discV5) + if err != nil { + return err + } + } + httpServer := &http.Server{ Addr: addr, Handler: server, @@ -167,7 +175,8 @@ func initDiscV5(config Config, conn discover.UDPConn) (*discover.UDPv5, *enode.L } func initHistory(config Config, server *rpc.Server, conn discover.UDPConn, localNode *enode.LocalNode, discV5 *discover.UDPv5) error { - db, err := history.NewDB(config.DataDir) + networkName := portalwire.History.Name() + db, err := history.NewDB(config.DataDir, networkName) if err != nil { return err } @@ -175,6 +184,7 @@ func initHistory(config Config, server *rpc.Server, conn discover.UDPConn, local StorageCapacityMB: config.DataCapacity, DB: db, NodeId: localNode.ID(), + NetworkName: networkName, }) if err != nil { return err @@ -239,6 +249,38 @@ func initBeacon(config Config, server *rpc.Server, conn discover.UDPConn, localN return beaconNetwork.Start() } +func initState(config Config, server *rpc.Server, conn discover.UDPConn, localNode *enode.LocalNode, discV5 *discover.UDPv5) error { + networkName := portalwire.State.Name() + db, err := history.NewDB(config.DataDir, networkName) + if err != nil { + return err + } + contentStorage, err := history.NewHistoryStorage(storage.PortalStorageConfig{ + StorageCapacityMB: config.DataCapacity, + DB: db, + NodeId: localNode.ID(), + NetworkName: networkName, + }) + if err != nil { + return err + } + contentQueue := make(chan *discover.ContentElement, 50) + + protocol, err := discover.NewPortalProtocol(config.Protocol, portalwire.State, config.PrivateKey, conn, localNode, discV5, contentStorage, contentQueue) + + if err != nil { + return err + } + api := discover.NewPortalAPI(protocol) + stateNetworkAPI := state.NewStateNetworkAPI(api) + err = server.RegisterName("portal", stateNetworkAPI) + if err != nil { + return err + } + historyNetwork := state.NewStateNetwork(protocol) + return historyNetwork.Start() +} + func getPortalConfig(ctx *cli.Context) (*Config, error) { config := &Config{ Protocol: discover.DefaultPortalProtocolConfig(), diff --git a/portalnetwork/history/storage.go b/portalnetwork/history/storage.go index 3c440a8ca058..70ccadfc1109 100644 --- a/portalnetwork/history/storage.go +++ b/portalnetwork/history/storage.go @@ -4,6 +4,7 @@ import ( "bytes" "database/sql" "errors" + "fmt" "math/big" "os" "path" @@ -73,8 +74,8 @@ func greater(a, b []byte) int { return bytes.Compare(a, b) } -func NewDB(dataDir string) (*sql.DB, error) { - dbPath := path.Join(dataDir, "history") +func NewDB(dataDir string, network string) (*sql.DB, error) { + dbPath := path.Join(dataDir, network) err := os.MkdirAll(dbPath, 0755) if err != nil { return nil, err @@ -93,7 +94,7 @@ func NewDB(dataDir string) (*sql.DB, error) { }, }) }) - sqlDb, err := sql.Open("sqlite3_custom", path.Join(dbPath, sqliteName)) + sqlDb, err := sql.Open("sqlite3_custom", path.Join(dbPath, fmt.Sprintf("%s.sqlite", network))) return sqlDb, err } @@ -102,7 +103,7 @@ func NewHistoryStorage(config storage.PortalStorageConfig) (storage.ContentStora nodeId: config.NodeId, sqliteDB: config.DB, storageCapacityInBytes: config.StorageCapacityMB * 1000000, - log: log.New("history_storage"), + log: log.New("storage", config.NetworkName), } hs.radius.Store(storage.MaxDistance) err := hs.createTable() diff --git a/portalnetwork/history/storage_test.go b/portalnetwork/history/storage_test.go index a5441dce67b7..a725b472b133 100644 --- a/portalnetwork/history/storage_test.go +++ b/portalnetwork/history/storage_test.go @@ -28,7 +28,7 @@ func genBytes(length int) []byte { } func newContentStorage(storageCapacityInMB uint64, nodeId enode.ID, nodeDataDir string) (*ContentStorage, error) { - db, err := NewDB(nodeDataDir) + db, err := NewDB(nodeDataDir, "history") if err != nil { return nil, err } diff --git a/portalnetwork/state/api.go b/portalnetwork/state/api.go new file mode 100644 index 000000000000..a09f88d595a8 --- /dev/null +++ b/portalnetwork/state/api.go @@ -0,0 +1,75 @@ +package state + +import ( + "github.com/ethereum/go-ethereum/p2p/discover" +) + +type API struct { + *discover.PortalProtocolAPI +} + +func (p *API) StateRoutingTableInfo() *discover.RoutingTableInfo { + return p.RoutingTableInfo() +} + +func (p *API) StateAddEnr(enr string) (bool, error) { + return p.AddEnr(enr) +} + +func (p *API) StateGetEnr(nodeId string) (string, error) { + return p.GetEnr(nodeId) +} + +func (p *API) StateDeleteEnr(nodeId string) (bool, error) { + return p.DeleteEnr(nodeId) +} + +func (p *API) StateLookupEnr(nodeId string) (string, error) { + return p.LookupEnr(nodeId) +} + +func (p *API) StatePing(enr string) (*discover.PortalPongResp, error) { + return p.Ping(enr) +} + +func (p *API) StateFindNodes(enr string, distances []uint) ([]string, error) { + return p.FindNodes(enr, distances) +} + +func (p *API) StateFindContent(enr string, contentKey string) (interface{}, error) { + return p.FindContent(enr, contentKey) +} + +func (p *API) StateOffer(enr string, contentKey string, contentValue string) (string, error) { + return p.Offer(enr, contentKey, contentValue) +} + +func (p *API) StateRecursiveFindNodes(nodeId string) ([]string, error) { + return p.RecursiveFindNodes(nodeId) +} + +func (p *API) StateRecursiveFindContent(contentKeyHex string) (*discover.ContentInfo, error) { + return p.RecursiveFindContent(contentKeyHex) +} + +func (p *API) StateLocalContent(contentKeyHex string) (string, error) { + return p.LocalContent(contentKeyHex) +} + +func (p *API) StateStore(contentKeyHex string, contextHex string) (bool, error) { + return p.Store(contentKeyHex, contextHex) +} + +func (p *API) StateGossip(contentKeyHex, contentHex string) (int, error) { + return p.Gossip(contentKeyHex, contentHex) +} + +func (p *API) StateTraceRecursiveFindContent(contentKeyHex string) (*discover.TraceContentResult, error) { + return p.TraceRecursiveFindContent(contentKeyHex) +} + +func NewStateNetworkAPI(portalProtocolAPI *discover.PortalProtocolAPI) *API { + return &API{ + portalProtocolAPI, + } +} diff --git a/portalnetwork/state/network.go b/portalnetwork/state/network.go new file mode 100644 index 000000000000..8e793dc26c54 --- /dev/null +++ b/portalnetwork/state/network.go @@ -0,0 +1,77 @@ +package state + +import ( + "context" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +type StateNetwork struct { + portalProtocol *discover.PortalProtocol + closeCtx context.Context + closeFunc context.CancelFunc + log log.Logger +} + +func NewStateNetwork(portalProtocol *discover.PortalProtocol) *StateNetwork { + ctx, cancel := context.WithCancel(context.Background()) + + return &StateNetwork{ + portalProtocol: portalProtocol, + closeCtx: ctx, + closeFunc: cancel, + log: log.New("sub-protocol", "state"), + } +} + +func (h *StateNetwork) Start() error { + err := h.portalProtocol.Start() + if err != nil { + return err + } + go h.processContentLoop(h.closeCtx) + h.log.Debug("state network start successfully") + return nil +} + +func (h *StateNetwork) Stop() { + h.closeFunc() + h.portalProtocol.Stop() +} + +func (h *StateNetwork) processContentLoop(ctx context.Context) { + contentChan := h.portalProtocol.GetContent() + for { + select { + case <-ctx.Done(): + return + case contentElement := <-contentChan: + err := h.validateContents(contentElement.ContentKeys, contentElement.Contents) + if err != nil { + h.log.Error("validate content failed", "err", err) + continue + } + + go func(ctx context.Context) { + select { + case <-ctx.Done(): + return + default: + var gossippedNum int + gossippedNum, err = h.portalProtocol.Gossip(&contentElement.Node, contentElement.ContentKeys, contentElement.Contents) + h.log.Trace("gossippedNum", "gossippedNum", gossippedNum) + if err != nil { + h.log.Error("gossip failed", "err", err) + return + } + } + }(ctx) + } + } +} + +func (h *StateNetwork) validateContents(contentKeys [][]byte, contents [][]byte) error { + // TODO + panic("implement me") +} diff --git a/portalnetwork/storage/config.go b/portalnetwork/storage/config.go index 3dad16f24465..13a5e53f0c5b 100644 --- a/portalnetwork/storage/config.go +++ b/portalnetwork/storage/config.go @@ -12,4 +12,5 @@ type PortalStorageConfig struct { DB *sql.DB NodeId enode.ID Spec *common.Spec + NetworkName string }