Skip to content

Commit

Permalink
feat: use internal p2p in mtm
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Dec 18, 2024
1 parent 2cb576b commit 9a0966e
Show file tree
Hide file tree
Showing 27 changed files with 653 additions and 1,466 deletions.
54 changes: 29 additions & 25 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"go.opentelemetry.io/otel/attribute"

"github.com/libsv/go-p2p"
"github.com/ordishs/go-bitcoin"
"google.golang.org/grpc"

Expand All @@ -26,9 +25,12 @@ import (
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection"
"github.com/bitcoin-sv/arc/internal/metamorph"
"github.com/bitcoin-sv/arc/internal/metamorph/bcnet/metamorph_p2p"
"github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api"
"github.com/bitcoin-sv/arc/internal/metamorph/store"
"github.com/bitcoin-sv/arc/internal/metamorph/store/postgresql"

"github.com/bitcoin-sv/arc/internal/p2p"
"github.com/bitcoin-sv/arc/internal/version"
)

Expand All @@ -45,9 +47,8 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore

var (
metamorphStore store.MetamorphStore
peerHandler *metamorph.PeerHandler
pm metamorph.PeerManager
statusMessageCh chan *metamorph.TxStatusMessage
pm *p2p.PeerManager
statusMessageCh chan *metamorph_p2p.TxStatusMessage
mqClient metamorph.MessageQueue
processor *metamorph.Processor
server *metamorph.Server
Expand Down Expand Up @@ -84,7 +85,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore

stopFn := func() {
logger.Info("Shutting down metamorph")
disposeMtm(logger, server, processor, peerHandler, mqClient, metamorphStore, healthServer, shutdownFns)
disposeMtm(logger, server, processor, pm, mqClient, metamorphStore, healthServer, shutdownFns)
logger.Info("Shutdown complete")
}

Expand All @@ -93,7 +94,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
return nil, fmt.Errorf("failed to create metamorph store: %v", err)
}

pm, peerHandler, statusMessageCh, err = initPeerManager(logger, metamorphStore, arcConfig)
pm, statusMessageCh, err = initPeerManager(logger, metamorphStore, arcConfig)
if err != nil {
stopFn()
return nil, err
Expand Down Expand Up @@ -165,7 +166,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
processor, err = metamorph.NewProcessor(
metamorphStore,
cacheStore,
pm,
p2p.NewNetworkMessenger(logger, pm),
statusMessageCh,
processorOpts...,
)
Expand Down Expand Up @@ -276,50 +277,53 @@ func NewMetamorphStore(dbConfig *config.DbConfig, tracingConfig *config.TracingC
return s, err
}

func initPeerManager(logger *slog.Logger, s store.MetamorphStore, arcConfig *config.ArcConfig) (p2p.PeerManagerI, *metamorph.PeerHandler, chan *metamorph.TxStatusMessage, error) {
func initPeerManager(logger *slog.Logger, s store.MetamorphStore, arcConfig *config.ArcConfig) (*p2p.PeerManager, chan *metamorph_p2p.TxStatusMessage, error) {
network, err := config.GetNetwork(arcConfig.Network)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to get network: %v", err)
return nil, nil, fmt.Errorf("failed to get network: %v", err)
}

logger.Info("Assuming bitcoin network", "network", network)

messageCh := make(chan *metamorph.TxStatusMessage, 10000)
messageCh := make(chan *metamorph_p2p.TxStatusMessage, 10000)
var pmOpts []p2p.PeerManagerOptions
if arcConfig.Metamorph.MonitorPeers {
pmOpts = append(pmOpts, p2p.WithRestartUnhealthyPeers())
}

pm := p2p.NewPeerManager(logger.With(slog.String("module", "peer-handler")), network, pmOpts...)
pm := p2p.NewPeerManager(logger.With(slog.String("module", "peer-mng")), network, pmOpts...)

peerHandler := metamorph.NewPeerHandler(s, messageCh)
msgHandler := metamorph_p2p.NewMsgHandler(logger.With(slog.String("module", "peer-msg-handler")), s, messageCh)

peerOpts := []p2p.PeerOptions{
p2p.WithRetryReadWriteMessageInterval(5 * time.Second),
p2p.WithPingInterval(30*time.Second, 1*time.Minute),
p2p.WithPingInterval(30*time.Second, 2*time.Minute),
p2p.WithNrOfWriteHandlers(8),
p2p.WithWriteChannelSize(4096),
}

if version.Version != "" {
peerOpts = append(peerOpts, p2p.WithUserAgent("ARC", version.Version))
}

l := logger.With(slog.String("module", "peer"))
for _, peerSetting := range arcConfig.Broadcasting.Unicast.Peers {
peerURL, err := peerSetting.GetP2PUrl()
if err != nil {
return nil, nil, nil, fmt.Errorf("error getting peer url: %v", err)
return nil, nil, fmt.Errorf("error getting peer url: %v", err)
}

var peer *p2p.Peer
peer, err = p2p.NewPeer(logger.With(slog.String("module", "peer")), peerURL, peerHandler, network, peerOpts...)
if err != nil {
return nil, nil, nil, fmt.Errorf("error creating peer %s: %v", peerURL, err)
peer := p2p.NewPeer(l, msgHandler, peerURL, network, peerOpts...)
ok := peer.Connect()
if !ok {
return nil, nil, fmt.Errorf("cannot connect to peer %s", peerURL)
}

if err = pm.AddPeer(peer); err != nil {
return nil, nil, nil, fmt.Errorf("error adding peer %s: %v", peerURL, err)
return nil, nil, fmt.Errorf("error adding peer %s: %v", peerURL, err)
}
}

return pm, peerHandler, messageCh, nil
return pm, messageCh, nil
}

func initGrpcCallbackerConn(address, prometheusEndpoint string, grpcMsgSize int, tracingConfig *config.TracingConfig) (callbacker_api.CallbackerAPIClient, error) {
Expand All @@ -336,14 +340,14 @@ func initGrpcCallbackerConn(address, prometheusEndpoint string, grpcMsgSize int,
}

func disposeMtm(l *slog.Logger, server *metamorph.Server, processor *metamorph.Processor,
peerHandler *metamorph.PeerHandler, mqClient metamorph.MessageQueue,
peerManaager *p2p.PeerManager, mqClient metamorph.MessageQueueClient,
metamorphStore store.MetamorphStore, healthServer *grpc_opts.GrpcServer,
shutdownFns []func(),
) {
// dispose the dependencies in the correct order:
// 1. server - ensure no new request will be received
// 2. processor - ensure all started job are complete
// 3. peerHandler
// 3. peerManaager
// 4. mqClient
// 5. store
// 6. healthServer
Expand All @@ -355,8 +359,8 @@ func disposeMtm(l *slog.Logger, server *metamorph.Server, processor *metamorph.P
if processor != nil {
processor.Shutdown()
}
if peerHandler != nil {
peerHandler.Shutdown()
if peerManaager != nil {
peerManaager.Shutdown()
}
if mqClient != nil {
mqClient.Shutdown()
Expand Down
2 changes: 1 addition & 1 deletion config/example_config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
logLevel: DEBUG # mode of logging. Value can be one of TRACE | DEBUG | INFO | WARN | ERROR
logFormat: text # format of logging. Value can be one of text | json | tint
logLevel: DEBUG # mode of logging. Value can be one of TRACE | DEBUG | INFO | WARN | ERROR
profilerAddr: localhost:9999 # address to start profiler server on (optional)
prometheus:
enabled: false # if true, then prometheus metrics are enabled
Expand Down
5 changes: 0 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down Expand Up @@ -120,8 +119,6 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.1.14 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/ordishs/go-utils v1.0.51 // indirect
github.com/paulmach/orb v0.9.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/perimeterx/marshmallow v1.1.5 // indirect
Expand All @@ -141,8 +138,6 @@ require (
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
Expand Down
13 changes: 0 additions & 13 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ github.com/ClickHouse/ch-go v0.55.0 h1:jw4Tpx887YXrkyL5DfgUome/po8MLz92nz2heOQ6R
github.com/ClickHouse/ch-go v0.55.0/go.mod h1:kQT2f+yp2p+sagQA/7kS6G3ukym+GQ5KAu1kuFAFDiU=
github.com/ClickHouse/clickhouse-go/v2 v2.9.1 h1:IeE2bwVvAba7Yw5ZKu98bKI4NpDmykEy6jUaQdJJCk8=
github.com/ClickHouse/clickhouse-go/v2 v2.9.1/go.mod h1:teXfZNM90iQ99Jnuht+dxQXCuhDZ8nvvMoTJOFrcmcg=
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
Expand Down Expand Up @@ -142,8 +140,6 @@ github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwn
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0 h1:CWyXh/jylQWp2dtiV33mY4iSSp6yf4lmn+c7/tN+ObI=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0/go.mod h1:nCLIt0w3Ept2NwF8ThLmrppXsfT07oC8k0XNDxd8sVU=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 h1:MJG/KsmcqMwFAkh8mTnAwhyKoB+sTAnY4CACC110tbU=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -271,12 +267,8 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
github.com/opencontainers/runc v1.1.14 h1:rgSuzbmgz5DUJjeSnw337TxDbRuqjs6iqQck/2weR6w=
github.com/opencontainers/runc v1.1.14/go.mod h1:E4C2z+7BxR7GHXp0hAY53mek+x49X1LjPNeMTfRGvOA=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/ordishs/go-bitcoin v1.0.86 h1:OuLnaOfzCe/dHFlCredPFSJKQLOQIuUuuJj/faPtJnE=
github.com/ordishs/go-bitcoin v1.0.86/go.mod h1:O3lqD8unDlwLXTmQTT4F5x/Gl3xgP4IgMQDFxTmi9V4=
github.com/ordishs/go-utils v1.0.51 h1:XgBphXkjoUxRdahzyNRpQ5NnB96ygkggIqqzX6ruaFo=
github.com/ordishs/go-utils v1.0.51/go.mod h1:AlHKaGdyFidMIzXcltV/dPtcfoHlhcJl42H4d482dh8=
github.com/ory/dockertest/v3 v3.11.0 h1:OiHcxKAvSDUwsEVh2BjxQQc/5EHz9n0va9awCtNGuyA=
github.com/ory/dockertest/v3 v3.11.0/go.mod h1:VIPxS1gwT9NpPOrfD3rACs8Y9Z7yhzO4SB194iUDnUI=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
Expand Down Expand Up @@ -335,7 +327,6 @@ github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKk
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand All @@ -348,10 +339,6 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o=
github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg=
github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
Expand Down
18 changes: 9 additions & 9 deletions internal/blocktx/integration_test/reorg_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestReorg(t *testing.T) {

const blockHash822011 = "bf9be09b345cc2d904b59951cc8a2ed452d8d143e2e25cde64058270fb3a667a"

//blockHash := testutils.RevChainhash(t, blockHash822011)
blockHash := testutils.RevChainhash(t, blockHash822011)
prevBlockHash := testutils.RevChainhash(t, "00000000000000000a00c377b260a3219b0c314763f486bc363df7aa7e22ad72")
txHash, err := chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b")
require.NoError(t, err)
Expand All @@ -114,7 +114,7 @@ func TestReorg(t *testing.T) {

// should become LONGEST
blockMessage := &bcnet.BlockMessage{
//Hash: blockHash,
Hash: blockHash,
Header: &wire.BlockHeader{
Version: 541065216,
PrevBlock: *prevBlockHash, // NON-existent in the db
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestReorg(t *testing.T) {
txhash822015Competing = "b16cea53fc823e146fbb9ae4ad3124f7c273f30562585ad6e4831495d609f430"
)

// blockHash := testutils.RevChainhash(t, blockHash822015Fork)
blockHash := testutils.RevChainhash(t, blockHash822015Fork)
prevBlockHash := testutils.RevChainhash(t, blockHash822014StartOfChain)
txHash := testutils.RevChainhash(t, txhash822015)
txHash2 := testutils.RevChainhash(t, txhash822015Competing) // should not be published - is already in the longest chain
Expand All @@ -157,7 +157,7 @@ func TestReorg(t *testing.T) {

// should become STALE
blockMessage := &bcnet.BlockMessage{
//Hash: blockHash,
Hash: blockHash,
Header: &wire.BlockHeader{
Version: 541065216,
PrevBlock: *prevBlockHash, // block with status LONGEST at height 822014
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestReorg(t *testing.T) {
// should become LONGEST
// reorg should happen
blockMessage := &bcnet.BlockMessage{
//Hash: blockHash,
Hash: blockHash,
Header: &wire.BlockHeader{
Version: 541065216,
PrevBlock: *prevhash, // block with status STALE at height 822015
Expand Down Expand Up @@ -294,14 +294,14 @@ func TestReorg(t *testing.T) {
blockHash822023Orphan = "0000000000000000082131979a4e25a5101912a5f8461e18f306d23e158161cd"
)

//blockHash := testutils.RevChainhash(t, blockHash822021)
blockHash := testutils.RevChainhash(t, blockHash822021)
txHash := testutils.RevChainhash(t, "de0753d9ce6f92e340843cbfdd11e58beff8c578956ecdec4c461b018a26b8a9")
merkleRoot := testutils.RevChainhash(t, "de0753d9ce6f92e340843cbfdd11e58beff8c578956ecdec4c461b018a26b8a9")
prevhash := testutils.RevChainhash(t, blockHash822020Orphan)

// should become STALE
blockMessage := &bcnet.BlockMessage{
//Hash: blockHash,
Hash: blockHash,
Header: &wire.BlockHeader{
Version: 541065216,
PrevBlock: *prevhash, // block with status ORPHANED at height 822020 - connected to STALE chain
Expand Down Expand Up @@ -366,15 +366,15 @@ func TestReorg(t *testing.T) {
txhash822017 = "ece2b7e40d98749c03c551b783420d6e3fdc3c958244bbf275437839585829a6"
)

//blockHash := testutils.RevChainhash(t, blockHash822021)
blockHash := testutils.RevChainhash(t, blockHash822021)
prevhash := testutils.RevChainhash(t, blockHash822020Orphan)
txHash := testutils.RevChainhash(t, "3e15f823a7de25c26ce9001d4814a6f0ebc915a1ca4f1ba9cfac720bd941c39c")
merkleRoot := testutils.RevChainhash(t, "3e15f823a7de25c26ce9001d4814a6f0ebc915a1ca4f1ba9cfac720bd941c39c")

// should become LONGEST
// reorg should happen
blockMessage := &bcnet.BlockMessage{
//Hash: blockHash,
Hash: blockHash,
Header: &wire.BlockHeader{
Version: 541065216,
PrevBlock: *prevhash, // block with status ORPHANED at height 822020 - connected to STALE chain
Expand Down
Loading

0 comments on commit 9a0966e

Please sign in to comment.