Skip to content

Commit

Permalink
add event subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
stackdump committed Jul 20, 2024
1 parent 6f2d117 commit 5d04664
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 8 deletions.
5 changes: 5 additions & 0 deletions internal/config/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
Host = "127.0.0.1" // Default host, can be overridden by HOST env var
NewRelicLicense string // New Relic License Key for monitoring, set via ldflags
NewRelicApp string // New Relic Application Name, set via ldflags
Address string // Address for the faucet, set via ldflags
)

func init() {
Expand All @@ -38,4 +39,8 @@ func init() {
if listenHost, hostSet := os.LookupEnv("HOST"); hostSet {
Host = listenHost
}
Address = os.Getenv("ADDRESS")
if Address == "" {
Address = "0x5fbdb2315678afecb367f032d93f642f64180aa3"
}
}
46 changes: 41 additions & 5 deletions internal/service/block_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,35 @@ import (
"context"
"database/sql"
"fmt"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/lib/pq"
"github.com/pflow-dev/pflow-eth/internal/config"
"github.com/pflow-dev/pflow-eth/metamodel"
"math/big"
"os"
"time"
)

const notify_channel = "node_sync_channel"

func (s *Service) subscribeToBlockchainEvents(sink chan *metamodel.MetamodelSignaledEvent) (event.Subscription, error) {
address := common.HexToAddress(config.Address)
m, err := metamodel.NewMetamodel(address, s.Client)
if err != nil {
panic(err)
}
opts := &bind.WatchOpts{Context: context.Background()}
scalars := []*big.Int{}
scalars = append(scalars, big.NewInt(0))
scalars = append(scalars, big.NewInt(2))
scalars = append(scalars, big.NewInt(1))

// REVIEW: can we listen to all events?
return m.WatchSignaledEvent(opts, sink, []uint8{0, 1}, []uint8{0, 1, 2, 3}, scalars)
}

func (s *Service) getNotificationListener() *pq.Listener {
_, err := sql.Open("postgres", config.DbConn)
if err != nil {
Expand Down Expand Up @@ -42,8 +63,22 @@ func (s *Service) runBlockIndexer(ctx context.Context, networks map[string]bool)
listener := s.getNotificationListener()
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
eventChannel := make(chan *metamodel.MetamodelSignaledEvent)

eventSub, subError := s.subscribeToBlockchainEvents(eventChannel)
if subError != nil {
fmt.Printf("event subscription disabled: %s", subError)
}
start := time.Now()

syncBlockchainNetworks := func() {
for schema, enabled := range networks {
if enabled {
s.syncNodeWithBlockchain(schema)
}
}
}

for {
select {
case <-ctx.Done():
Expand All @@ -52,6 +87,8 @@ func (s *Service) runBlockIndexer(ctx context.Context, networks map[string]bool)
"networks": networks,
"duration": time.Since(start).String(),
})
eventSub.Unsubscribe()
_ = listener.UnlistenAll()
os.Exit(0)
case notification := <-listener.Notify:
if schema := notification.Extra; networks[schema] == true {
Expand All @@ -67,11 +104,10 @@ func (s *Service) runBlockIndexer(ctx context.Context, networks map[string]bool)
fmt.Println("notification from unknown schema: " + schema)
}
case <-ticker.C:
for schema, enabled := range networks {
if enabled {
s.syncNodeWithBlockchain(schema)
}
}
syncBlockchainNetworks()
case evt := <-eventChannel:
fmt.Printf("Received event: %v\n", evt)
syncBlockchainNetworks()
}
}
}
2 changes: 1 addition & 1 deletion internal/service/context_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type ModelApiResponse struct {
Error string `json:"error,omitempty"`
}

func (s Service) ContextHandler(w http.ResponseWriter, r *http.Request) {
func (s *Service) ContextHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")

queryValues := r.URL.Query()
Expand Down
2 changes: 0 additions & 2 deletions internal/service/raw_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ func (s *Service) RawTransactionHandler(w http.ResponseWriter, r *http.Request)
return
}

// TODO: record the transaction in the database

response := map[string]string{"txHash": txHash}
json.NewEncoder(w).Encode(response)
}

0 comments on commit 5d04664

Please sign in to comment.