Skip to content

Commit

Permalink
380: Get lacking parent vertex from the connected nodes. (#382)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartossh authored Mar 6, 2024
1 parent 3316617 commit 5e009ee
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 81 deletions.
Binary file modified artefacts/test_wallet
Binary file not shown.
1 change: 1 addition & 0 deletions protobuf/gossip.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ service GossipAPI {
rpc Discover(ConnectionData) returns (ConnectedNodes) {}
rpc GossipVrx(VrxMsgGossip) returns (google.protobuf.Empty) {}
rpc GossipTrx(TrxMsgGossip) returns (google.protobuf.Empty) {}
rpc GetVertex(SignedHash) returns (Vertex) {}
}
13 changes: 12 additions & 1 deletion src/accountant/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
ErrTransferringfundsFailure = errors.New("transferring spice failure")
ErrEntityNotfund = errors.New("entity not fund")
ErrBreak = errors.New("just break")
ErrParentDoesNotExists = errors.New("parent doesn't exists")
)

type signatureVerifier interface {
Expand Down Expand Up @@ -556,7 +557,7 @@ func (ab *AccountingBook) addLeafMemorized(ctx context.Context, m memory) error
)
return ErrLeafRejected
}
return nil
return ErrParentDoesNotExists
}
existringLeaf, ok := item.(*Vertex)
if !ok {
Expand Down Expand Up @@ -1115,3 +1116,13 @@ func (ab *AccountingBook) ReadDAGTransactionsByAddress(ctx context.Context, addr
func (ab *AccountingBook) Address() string {
return ab.signer.Address()
}

// ReadVertex reads vertex by its hash from the vertex if exists or returns error otherwise.
func (ab *AccountingBook) ReadVertex(ctx context.Context, h [32]byte) (Vertex, error) {
v, err := ab.readVertex(h[:])
if err != nil {
ab.log.Info(fmt.Sprintf("reading vertex hash %v failed, %s", h, err.Error()))
return Vertex{}, ErrVertexHashNotfund
}
return v, nil
}
4 changes: 2 additions & 2 deletions src/accountant/replier.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
)

const (
maxArraySize = 250
maxRepeats = 5
maxArraySize = 500
maxRepeats = 25
)

const longevity = time.Minute
Expand Down
14 changes: 9 additions & 5 deletions src/cmd/wallet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/bartossh/Computantis/src/walletmiddleware"
"github.com/pterm/pterm"
"github.com/urfave/cli/v2"
"golang.org/x/exp/rand"
)

const (
Expand All @@ -25,8 +26,6 @@ const (
actionReadAddress
)

const pauseDuration = time.Second * 2

const usage = `Wallet CLI tool allows to create a new Wallet or act on the local Wallet by using keys from different formats and transforming them between formats.
Please use with the best security practices. GOBINARY is safer to move between machines as this file format is encrypted with AES key.
Tool provides Spice and Contract transfer, reading balance, reading contracts, approving and rejecting contracts.`
Expand Down Expand Up @@ -305,7 +304,7 @@ func runTransactionOps(cfg fileoperations.Config, nodeURL string) error {
continue
}
spinnerInfo, _ := pterm.DefaultSpinner.Start("Sending transaction ...")
time.Sleep(pauseDuration)
time.Sleep(getPause())
if err := c.ProposeTransaction(ctx, receiver, subject, melange, []byte{}); err != nil {
spinnerInfo.Stop()
printError(fmt.Errorf("cannot propose transaction due to, %e", err))
Expand All @@ -315,7 +314,7 @@ func runTransactionOps(cfg fileoperations.Config, nodeURL string) error {
printSuccess()
case "Check balance":
spinnerInfo, _ := pterm.DefaultSpinner.Start("Checking balance ...")
time.Sleep(pauseDuration)
time.Sleep(getPause())
melange, err := c.ReadBalance(ctx)
if err != nil {
spinnerInfo.Stop()
Expand All @@ -332,7 +331,7 @@ func runTransactionOps(cfg fileoperations.Config, nodeURL string) error {
printSuccess()
case "Read Transactions":
spinnerInfo, _ := pterm.DefaultSpinner.Start("Reading transactions ...")
time.Sleep(pauseDuration)
time.Sleep(getPause())
transactions, err := c.ReadDAGTransactions(ctx)
if err != nil {
spinnerInfo.Stop()
Expand Down Expand Up @@ -453,3 +452,8 @@ func printWarning(warning string) {
pterm.Warning.Printf(" %s\n", warning)
pterm.Warning.Println("")
}

func getPause() time.Duration {
p := rand.Intn(5)
return time.Duration(p) * time.Second
}
115 changes: 89 additions & 26 deletions src/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"

"github.com/bartossh/Computantis/src/accountant"
Expand All @@ -35,17 +36,19 @@ var (
ErrNilTrx = errors.New("transaction is nil")
ErrNilTrxMsgGossip = errors.New("transaction gossip is nil")
ErrNilSignedHash = errors.New("signed hash is nil")
ErrInvalidSignature = errors.New("invalid signature")
)

const (
vertexCacheLongevity = time.Second * 10
)

const (
vertexGossipChCapacity = 100
trxGossipChCappacity = 100
rejectHashChCapacity = 80
totalRetries = 10
vertexGossipChCapacity = 100
trxGossipChCappacity = 100
rejectHashChCapacity = 80
totalRetries = 10
maxVertexParentProcessingCount = 250
)

type nodeData struct {
Expand Down Expand Up @@ -81,6 +84,7 @@ type accounter interface {
StreamDAG(ctx context.Context) <-chan *accountant.Vertex
LoadDag(cancelF context.CancelCauseFunc, cVrx <-chan *accountant.Vertex)
DagLoaded() bool
ReadVertex(ctx context.Context, h [32]byte) (accountant.Vertex, error)
}

type piper interface {
Expand All @@ -90,17 +94,18 @@ type piper interface {

type gossiper struct {
protobufcompiled.UnimplementedGossipAPIServer
accounter accounter
verifier signatureVerifier
signer accountant.Signer
log logger.Logger
trxCache providers.AwaitedTrxCacheProviderBalanceCacher
flash providers.FlashbackMemoryHashProviderAddressRemover
piper piper
nodes map[string]nodeData
url string
mux sync.RWMutex
timeout time.Duration
accounter accounter
verifier signatureVerifier
signer accountant.Signer
log logger.Logger
trxCache providers.AwaitedTrxCacheProviderBalanceCacher
flash providers.FlashbackMemoryHashProviderAddressRemover
piper piper
nodes map[string]nodeData
url string
mux sync.RWMutex
timeout time.Duration
processingParentCount atomic.Int32
}

// RunGRPC runs the service application that exposes the GRPC API for gossip protocol.
Expand All @@ -117,17 +122,18 @@ func RunGRPC(ctx context.Context, cfg Config, l logger.Logger, t time.Duration,
defer cancel()

g := gossiper{
accounter: a,
verifier: v,
signer: s,
log: l,
trxCache: trxCache,
flash: flash,
piper: p,
nodes: make(map[string]nodeData),
url: cfg.URL,
mux: sync.RWMutex{},
timeout: t,
accounter: a,
verifier: v,
signer: s,
log: l,
trxCache: trxCache,
flash: flash,
piper: p,
nodes: make(map[string]nodeData),
url: cfg.URL,
mux: sync.RWMutex{},
timeout: t,
processingParentCount: atomic.Int32{},
}

switch cfg.LoadDagURL {
Expand Down Expand Up @@ -402,6 +408,19 @@ func (g *gossiper) GossipTrx(ctx context.Context, tg *protobufcompiled.TrxMsgGos
return &emptypb.Empty{}, nil
}

// GetVertex reads vertex from the accountant internal DAG by its hash if exists or returns error otherwise.
func (g *gossiper) GetVertex(ctx context.Context, in *protobufcompiled.SignedHash) (*protobufcompiled.Vertex, error) {
if err := g.verifier.Verify(in.Data, in.Signature, [32]byte(in.Hash), in.Address); err != nil {
g.log.Error(fmt.Sprintf("get vertex endpoint failed to verify signature of transaction [ %x ] for address: %s, %s", in.Hash, in.Address, err))
return nil, ErrInvalidSignature
}
vrx, err := g.accounter.ReadVertex(ctx, [32]byte(in.Data))
if err != nil {
return nil, err
}
return mapAccountantVertexToProtoVertex(&vrx), nil
}

func (g *gossiper) updateDag(ctx context.Context, url string) error {
nd, err := connectToNode(url)
if err != nil {
Expand Down Expand Up @@ -611,12 +630,56 @@ func (g *gossiper) updateNodesConnectionsFromGensisNode(ctx context.Context, gen
return nil
}

func (g *gossiper) processLackingParent(ctx context.Context, h [32]byte) {
if g.processingParentCount.Load() == maxVertexParentProcessingCount {
return
}
g.processingParentCount.Add(1)

digest, signature := g.signer.Sign(h[:])
for _, nd := range g.nodes {

vrx, err := nd.client.GetVertex(ctx, &protobufcompiled.SignedHash{
Address: g.signer.Address(),
Data: h[:],
Hash: digest[:],
Signature: signature,
})
if err != nil || vrx == nil {
continue
}
v := mapProtoVertexToAccountantVertex(vrx)
if err := g.accounter.AddLeaf(ctx, &v); err != nil {
if errors.Is(err, accountant.ErrParentDoesNotExists) {
parentHases := [][32]byte{v.LeftParentHash}
if v.LeftParentHash != v.RightParentHash {
parentHases = append(parentHases, v.RightParentHash)
}
for _, parentHash := range parentHases {
go g.processLackingParent(ctx, parentHash)
}
}
}
break
}
g.processingParentCount.Add(-1)
}

func (g *gossiper) sendToAccountant(ctx context.Context, vg *protobufcompiled.Vertex) error {
if vg.Transaction == nil {
return ErrNilTrx
}
v := mapProtoVertexToAccountantVertex(vg)
if err := g.accounter.AddLeaf(ctx, &v); err != nil {
if errors.Is(err, accountant.ErrParentDoesNotExists) {
parentHases := [][32]byte{v.LeftParentHash}
if v.LeftParentHash != v.RightParentHash {
parentHases = append(parentHases, v.RightParentHash)
}
for _, parentHash := range parentHases {
go g.processLackingParent(ctx, parentHash)
}
}
return err
}
return nil
Expand Down
37 changes: 22 additions & 15 deletions src/protobufcompiled/gossip.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5e009ee

Please sign in to comment.