Skip to content

Commit

Permalink
Merge pull request #382 from meshplus/feat/adapt_one2Multi
Browse files Browse the repository at this point in the history
feat: adapt one to multi IBTP
  • Loading branch information
peterzhuzhuzhuz authored Jan 6, 2023
2 parents 932fda9 + d18aef3 commit 266d25e
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 57 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/libp2p/go-libp2p-core v0.6.1
github.com/meshplus/bitxhub-core v1.3.1-0.20221216095518-2b59e2647227
github.com/meshplus/bitxhub-kit v1.2.1-0.20220412092457-5836414df781
github.com/meshplus/bitxhub-model v1.2.1-0.20221216071922-172cad9ff0c6
github.com/meshplus/bitxhub-model v1.2.1-0.20230103095329-f8638b97544e
github.com/meshplus/go-bitxhub-client v1.4.1-0.20220412093230-11ca79f069fc
github.com/meshplus/go-lightp2p v0.0.0-20221205091217-47f605aa3067
github.com/mitchellh/go-homedir v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,8 @@ github.com/meshplus/bitxhub-model v1.2.1-0.20220412064024-c35cae241eb2/go.mod h1
github.com/meshplus/bitxhub-model v1.2.1-0.20221216071442-fd226b10045c/go.mod h1:EehJ/neXJcXy0o1jglfdD1/vsU7tsXveNr9pGZTDtrY=
github.com/meshplus/bitxhub-model v1.2.1-0.20221216071922-172cad9ff0c6 h1:UX6pc0ml0VVSXI8toZOgy7XiRrM4OMG1mykx5DWtZWI=
github.com/meshplus/bitxhub-model v1.2.1-0.20221216071922-172cad9ff0c6/go.mod h1:EehJ/neXJcXy0o1jglfdD1/vsU7tsXveNr9pGZTDtrY=
github.com/meshplus/bitxhub-model v1.2.1-0.20230103095329-f8638b97544e h1:MSaLAMmzZpmlCzZIp3I6jxpxpMasB3SpH8w/wvr4aRQ=
github.com/meshplus/bitxhub-model v1.2.1-0.20230103095329-f8638b97544e/go.mod h1:EehJ/neXJcXy0o1jglfdD1/vsU7tsXveNr9pGZTDtrY=
github.com/meshplus/go-bitxhub-client v1.4.1-0.20220412093230-11ca79f069fc h1:3J52XPtUFf0gvJHn61EBr+pI268/edQv544w+TWgEwU=
github.com/meshplus/go-bitxhub-client v1.4.1-0.20220412093230-11ca79f069fc/go.mod h1:pvV1elDWb8ITsOccfJa6CXEHHHb7yURmtGSOngTPL8c=
github.com/meshplus/go-lightp2p v0.0.0-20200817105923-6b3aee40fa54/go.mod h1:G89UJaeqCQFxFdp8wzy1AdKfMtDEhpySau0pjDNeeaw=
Expand Down
2 changes: 1 addition & 1 deletion internal/adapt/appchain_adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (a *AppchainAdapter) SendIBTP(ibtp *pb.IBTP) error {
a.logger.Info("appchain adapter submit ibtp success")
} else {
result := &pb.Result{}
if proof.TxStatus != pb.TransactionStatus_BEGIN {
if proof.TxStatus == pb.TransactionStatus_BEGIN_FAILURE || proof.TxStatus == pb.TransactionStatus_BEGIN_ROLLBACK {
content := &pb.Content{}
if err := content.Unmarshal(pd.Content); err != nil {
return fmt.Errorf("unmarshal content of ibtp %s: %w", ibtp.ID(), err)
Expand Down
1 change: 0 additions & 1 deletion internal/adapt/appchain_adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/meshplus/pier/internal/adapt"

"github.com/meshplus/pier/internal/repo"

"github.com/hashicorp/go-plugin"
Expand Down
159 changes: 109 additions & 50 deletions internal/adapt/bxh_adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
rpcx "github.com/meshplus/go-bitxhub-client"
"github.com/meshplus/pier/internal/adapt"
"github.com/meshplus/pier/internal/repo"
"github.com/meshplus/pier/internal/utils"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -123,7 +124,11 @@ func (b *BxhAdapter) QueryIBTP(id string, isReq bool) (*pb.IBTP, error) {
return nil, err
}

// TODO: figure out what scenes will trigger
// when pier broken, if multi IBTP need rollback, but src chain does not end rollback
// 1. pier restart and handleMissing IBTP
// 2. bxh adapter will queryIBTP(isReq is false) because of src adapter receipt is lower than bxh adapter
// 3. bxh status is not begin(Global state have been changed)
// 4. need get Interchain for rollback
if txStatus != pb.TransactionStatus_BEGIN {
ibtp, err = b.getIBTPByID(id, true)
if err != nil {
Expand Down Expand Up @@ -177,7 +182,7 @@ func (b *BxhAdapter) SendIBTP(ibtp *pb.IBTP) error {
"ibtp_id": ibtp.ID(),
"ibtp_type": ibtp.Type,
"msg": string(receipt.Ret),
}).Error("Receipt result for ibtp")
}).Error("get Receipt from bxh")
// if no rule bind for this appchain or appchain not available, exit pier
errMsg := string(receipt.Ret)

Expand All @@ -200,48 +205,6 @@ func (b *BxhAdapter) SendIBTP(ibtp *pb.IBTP) error {
return nil
}

/* redundant code:
** if target err occur, bxh will actively notify src and dest appchain rollback. bxh receipt type is success,
** There is no need for pier actively handle error.
*/

//// if target service is unavailable(maybe enter wrong format serviceID), just throw error
//if strings.Contains(errMsg, InvalidTargetService) {
// b.logger.Errorf("ignore invalid IBTP ID: invalid target service: %s", errMsg)
// return nil
//}

//// if target chain is not available, this ibtp should be rollback
//if strings.Contains(errMsg, TargetAppchainNotAvailable) ||
// strings.Contains(errMsg, TargetServiceNotAvailable) ||
// strings.Contains(errMsg, TargetBitXHubNotAvailable) {
// var isReq bool
// switch ibtp.Category() {
// case pb.IBTP_REQUEST:
// isReq = true
// case pb.IBTP_RESPONSE:
// isReq = false
// default:
// b.logger.Warnf("ignore invalid IBTP ID %s", ibtp.ID())
// return nil
// }
//
// // get multiSign
// if err := retry.Retry(func(attempt uint) error {
// proof, err := b.getMultiSign(ibtp, isReq)
// if err != nil {
// return fmt.Errorf("multiSign %w", err)
// }
// ibtp.Proof = proof
// return nil
// }, strategy.Wait(1*time.Second)); err != nil {
// b.logger.Errorf("get ibtp multiSign from bxh err: %w", err)
// }
//
// b.ibtpC <- ibtp
// return nil
//}

// check index
if strings.Contains(errMsg, ibtpIndexExist) {
// if ibtp index is lower than index recorded on bitxhub, then ignore this ibtp
Expand All @@ -250,13 +213,24 @@ func (b *BxhAdapter) SendIBTP(ibtp *pb.IBTP) error {

if strings.Contains(errMsg, ibtpIndexWrong) {
// if index is wrong , means appchain's ibtp_index > bxh's ibtp_index+1
// TODO: redundant code? maybe it's impossible to occur?
b.logger.Error("invalid ibtp %s", ibtp.ID())
// it may happen in one to multi IBTP:
// for ex: service3-9 is waiting for service2-9's receipt,
// but service3-10's receipt arrived before than service2-9's receipt,
// because it is not received service2-9's receipt in bxh,
// so all child ibtp receipt counter is not updated(still 8);
// when bxh receive service3-10's receipt, check ibtp index will return error

//b.logger.Errorf("invalid ibtp %s", ibtp.ID())
retErr = &adapt.SendIbtpError{
Err: errMsg,
Status: adapt.Index_Wrong,
}

go func() {
err1 := b.handleIndexWrong(ibtp)
if err1 != nil {
b.logger.Errorf("handleIndexWrong err:", err1)
}
}()
return nil
}
if strings.Contains(errMsg, invalidIBTP) {
Expand All @@ -277,6 +251,15 @@ func (b *BxhAdapter) SendIBTP(ibtp *pb.IBTP) error {
return nil
}

// if the receipt had already rollback need not to rollback
if strings.Contains(errMsg, otherRollback) {
retErr = &adapt.SendIbtpError{
Err: errMsg,
Status: adapt.Other_Error,
}
return nil
}

// when bxh trigger timeout rollback, destPier's bxhAdapter want to send receipt to bxh.
// bxhAdapter will receive ibtpRollback error from bxh, bxhAdapter processing steps are as follows:
// step1: bxhAdapter QueryIBTP() from bxh, return the ibtp which receive same ibtp sending from srcAppchain to bxh.
Expand All @@ -286,7 +269,7 @@ func (b *BxhAdapter) SendIBTP(ibtp *pb.IBTP) error {
// step4: destAppchain receive the ibtp, will rollback according to the txStatus(TransactionStatus_BEGIN_ROLLBACK).
// step5: desAppchain rollback finished, send receipt(type is pb.IBTP_RECEIPT_ROLLBACK) to bxh.
// step6: bxh change status to TransactionStatus_ROLLBACK, it's done.
if strings.Contains(errMsg, ibtpRollback) {
if strings.Contains(errMsg, ibtpRollback) || strings.Contains(errMsg, ibtpDstRollback) {
b.logger.WithFields(logrus.Fields{
"id": ibtp.ID(),
"err": errMsg,
Expand Down Expand Up @@ -511,7 +494,7 @@ func (b *BxhAdapter) handleInterchainTxWrapper(w *pb.InterchainTxWrapper, i int)
b.logger.WithFields(logrus.Fields{
"ibtp_id": ibtp.ID(),
"type": ibtp.Type,
}).Debugf("Sync IBTP from bitxhub")
}).Info("Sync IBTP from bitxhub")

var isReq bool
switch ibtp.Category() {
Expand All @@ -526,6 +509,7 @@ func (b *BxhAdapter) handleInterchainTxWrapper(w *pb.InterchainTxWrapper, i int)

proof, err := b.getSign(ibtp, isReq)
if err != nil {
b.logger.WithFields(logrus.Fields{"id": ibtp.ID(), "type": ibtp.Type}).Errorf("get sign err:%s", err)
return false
}
ibtp.Proof = proof
Expand All @@ -552,7 +536,28 @@ func (b *BxhAdapter) handleInterchainTxWrapper(w *pb.InterchainTxWrapper, i int)

for _, id := range w.MultiTxIbtps {
if err := retry.Retry(func(attempt uint) error {
ibtp, err := b.QueryIBTP(id, true)
var isReq bool
// if dest pier get MultiTxIbtps, need dst chain Rollback
// so query child Interchain ibtp
_, to, _, err := utils.ParseIBTPID(id)
if err != nil {
b.logger.Errorf("MultiTxIbtps parse ibtp id[%s] err:%s", id, err)
return err
}
_, chainID, _, err := utils.ParseFullServiceID(to)
if err != nil {
b.logger.Errorf("MultiTxIbtps parse fullServiceid[%s] err:%s", to, err)
return err
}
if chainID == b.appchainId {
isReq = true
b.logger.Warningf("dest pier get MultiTxIbtps [%s], "+
"need query interchain for dest chain rollback", id)
}

// for src pier, need handle all MultiIBTPs
// so query child ibtp receipt
ibtp, err := b.QueryIBTP(id, isReq)
if err != nil {
b.logger.Warnf("query multitx ibtp %s: %v", ibtp, err)
} else {
Expand All @@ -569,6 +574,7 @@ func (b *BxhAdapter) handleInterchainTxWrapper(w *pb.InterchainTxWrapper, i int)
"count": len(w.Transactions),
"index": i,
"timeout IDs": w.TimeoutIbtps,
"multi IDs": w.MultiTxIbtps,
}).Info("Handle interchain tx wrapper")
return true
}
Expand Down Expand Up @@ -700,3 +706,56 @@ func (b *BxhAdapter) getIBTPByID(id string, isReq bool) (*pb.IBTP, error) {

return retIBTP, nil
}

func (b *BxhAdapter) getReceiptCounterByIBTPID(fromId, toId string) (uint64, error) {
queryTx, err := b.client.GenerateContractTx(pb.TransactionData_BVM, constant.InterchainContractAddr.Address(),
"GetInterchain", rpcx.String(fromId))
if err != nil {
return 0, err
}
queryTx.Nonce = 1
receipt, err := b.client.SendView(queryTx)
if err != nil {
return 0, err
}

if !receipt.IsSuccess() {
return 0, fmt.Errorf("get ibtp[%s] Interchain Counter failed", fromId)
}

ic := &pb.Interchain{}
err = ic.Unmarshal(receipt.Ret)
if err != nil {
return 0, fmt.Errorf("unmarshal interchain err: %s", err)
}

return ic.ReceiptCounter[toId], nil
}

func (b *BxhAdapter) handleIndexWrong(ibtp *pb.IBTP) error {
if ibtp.Category() != pb.IBTP_RESPONSE {
return nil
}
if err := retry.Retry(func(attempt uint) error {
bxhIndex, err := b.getReceiptCounterByIBTPID(ibtp.From, ibtp.To)
if err != nil {
return err
}
if bxhIndex+1 < ibtp.Index {
err = fmt.Errorf("current index is too high, bxh counter is %d, current %d", bxhIndex, ibtp.Index)
b.logger.Warningf("retry handleIndexWrong:%s", err.Error())
return err
}
return nil
}, strategy.Wait(5*time.Second)); err != nil {
b.logger.Errorf("Retry to get tx status")
}

b.logger.WithFields(logrus.Fields{"id": ibtp.ID(), "index": ibtp.Index}).Warningf("handleIndexWrong: start send ibtp")
err := b.SendIBTP(ibtp)
if err != nil {
b.logger.Errorf("handleIndexWrong, retry send ibtp err:%s", err)
return err
}
return nil
}
2 changes: 2 additions & 0 deletions internal/adapt/bxh_adapter/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
noBindRule = "appchain didn't register rule"
InvalidTargetService = "invalid target service"
ibtpRollback = "state BEGIN_ROLLBACK get unexpected receipt"
ibtpDstRollback = "state BEGIN_FAILURE get unexpected receipt"
otherRollback = "state ROLLBACK get unexpected receipt"
)

func getTxView(client rpcx.Client, tx *pb.BxhTransaction) []byte {
Expand Down
2 changes: 1 addition & 1 deletion internal/exchanger/exchanger.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (ex *Exchanger) listenIBTPFromSrcAdapt(servicePair string) {
// if err occurs, try to get new ibtp and resend
if err, ok := err.(*adapt.SendIbtpError); ok {
if err.NeedRetry() {
ex.logger.Errorf("send IBTP to Adapt:%s", ex.destAdaptName, "error", err.Error())
ex.logger.Errorf("send IBTP to Adapt:%s err:%s", ex.destAdaptName, err.Error())
// query to new ibtp
ibtp = ex.queryIBTP(ex.srcAdapt, ibtp.ID(), ex.isIBTPBelongSrc(ibtp))
return fmt.Errorf("retry sending ibtp")
Expand Down
6 changes: 3 additions & 3 deletions internal/exchanger/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package exchanger

import (
"fmt"
"strings"
"time"

"github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
Expand All @@ -11,8 +14,6 @@ import (
"github.com/meshplus/pier/internal/adapt/appchain_adapter"
"github.com/meshplus/pier/internal/repo"
"github.com/sirupsen/logrus"
"strings"
"time"
)

func (ex *Exchanger) handleMissingIBTPByServicePair(begin, end uint64, fromAdapt, toAdapt adapt.Adapt, srcService, targetService string, isReq bool) {
Expand Down Expand Up @@ -57,7 +58,6 @@ func (ex *Exchanger) recover(srcServiceMeta map[string]*pb.Interchain, destServi
// handle src -> dest
ex.logger.Info("Start To Recover IBTPs!")
for _, interchain := range srcServiceMeta {

for k, count := range interchain.InterchainCounter {
destCount, ok := destServiceMeta[interchain.ID].InterchainCounter[k]
if !ok {
Expand Down

0 comments on commit 266d25e

Please sign in to comment.