Skip to content

Commit

Permalink
load model from contract or server
Browse files Browse the repository at this point in the history
  • Loading branch information
stackdump committed Jul 22, 2024
1 parent 5d04664 commit f413bc7
Show file tree
Hide file tree
Showing 36 changed files with 615 additions and 535 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ go-build:

.PHONY: archive
archive:
git archive --format=zip --output=pflow.$$(date -I).zip main
git archive --format=zip --output=pflow-eth.$$(date -I).zip main

.PHONY: generate
generate:
Expand Down
12 changes: 4 additions & 8 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
WIP
---
This repo is currently a testbed for working with multiple streams of contract events
TODO: experiment with multiple streams of contract events & subscribe/store

BACKLOG
-------
- [ ] finish unpacking the petri-net model from (on-chain call or server)
- [ ] use model definition to populate UI labels for known values

- [ ] upgrade web2/web3 when wallet is connected - send signal command from wallet when connected instead of API
- [ ] make 'transaction builder' using the petri-net GUI
- [ ] launch MVP: sepolia-optimism.pflow.xyz

ICEBOX
-------
- [ ] experiment with multiple streams of contract events & subscribe/store
- [ ] when viewing model history let left/right arrows show what happened in each frame
- [ ] (UX) make status / green / ok compare w/ timeclock
- [ ] consider optimizing sync by monitoring the sequence value in the contract
- [ ] support monitoring multiple contracts

DONE
----
- [x] make faucet route work
- [x] add control panel for developers to reset/restart/update config
- [x] Connect Wallet button (use ethers - since it seems typechain generates with this)
- [x] include the network id in the status bar
- [x] REVIEW: A: (no) should we build a top-level callback system? to trigger gui refresh
? should this object still be called Metamodel
- [x] enhance the pflowDSL so metamodel declaration looks more like solidity
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
restart: on-failure
volumes:
- ./volumes/db/data:/var/lib/postgresql/data:Z
app:
dapp:
build:
context: ../
dockerfile: Dockerfile
Expand Down
5 changes: 3 additions & 2 deletions internal/config/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (

// Global variables, set at build time with ldflags or dynamically via environment variables.
var (
JsBuild = "51c9bf03" // Update to match the path ./public/p/static/js/main.<JsBuild>.js
CssBuild = "16b9238e" // Update to match the path ./public/p/static/css/main.<CssBuild>.css
JsBuild = "41682e86" // Update to match the path ./public/p/static/js/main.<JsBuild>.js
CssBuild = "1b3b3d9c" // Update to match the path ./public/p/static/css/main.<CssBuild>.css
Endpoint = "http://hardhat:8545" // Default endpoint, can be overridden by ENDPOINT env var
DbConn = "dbname=pflow user=pflow password=pflow sslmode=disable host=db" // Default DB connection string, can be overridden by DB_HOST env var
Port = "8083" // Default port, can be overridden by PORT env var
Expand All @@ -28,6 +28,7 @@ func init() {
// Override Endpoint if ENDPOINT environment variable is set.
if endpoint := os.Getenv("ENDPOINT"); endpoint != "" {
Endpoint = endpoint
fmt.Printf("Endpoint set to %s\n", endpoint)
}

// Override Port if PORT environment variable is set.
Expand Down
96 changes: 74 additions & 22 deletions internal/service/block_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,39 @@ import (
"fmt"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/lib/pq"
"github.com/pflow-dev/pflow-eth/internal/config"
"github.com/pflow-dev/pflow-eth/metamodel"
"math/big"
"os"
"strings"
"time"
)

const notify_channel = "node_sync_channel"

func convertToWebSocketURL(url string) string {
if strings.HasPrefix(url, "http://") {
return strings.Replace(url, "http://", "ws://", 1)
} else if strings.HasPrefix(url, "https://") {
return strings.Replace(url, "https://", "wss://", 1)
}
return url
}

func (s *Service) subscribeToBlockchainEvents(sink chan *metamodel.MetamodelSignaledEvent) (event.Subscription, error) {
address := common.HexToAddress(config.Address)
m, err := metamodel.NewMetamodel(address, s.Client)
client, dialErr := ethclient.Dial(convertToWebSocketURL(config.Endpoint))
if dialErr != nil {
return nil, dialErr
}
m, err := metamodel.NewMetamodel(address, client)
if err != nil {
panic(err)
}
opts := &bind.WatchOpts{Context: context.Background()}
scalars := []*big.Int{}
scalars = append(scalars, big.NewInt(0))
scalars = append(scalars, big.NewInt(2))
scalars = append(scalars, big.NewInt(1))

// REVIEW: can we listen to all events?
return m.WatchSignaledEvent(opts, sink, []uint8{0, 1}, []uint8{0, 1, 2, 3}, scalars)
return m.WatchSignaledEvent(opts, sink, nil, nil, nil)
}

func (s *Service) getNotificationListener() *pq.Listener {
Expand All @@ -55,6 +63,19 @@ func (s *Service) getNotificationListener() *pq.Listener {
return listener
}

func (s *Service) SignalEvent(evt *metamodel.MetamodelSignaledEvent) {
next := make(map[string]interface{})
next["sequence"] = evt.Sequence.Int64()
next["role"] = evt.Role
next["action"] = evt.ActionId
next["scalar"] = evt.Scalar.Int64()

next["address"] = evt.Raw.Address
next["block"] = evt.Raw.BlockNumber
next["tx"] = evt.Raw.TxHash
s.Event("signaled_event", next)
}

func (s *Service) runBlockIndexer(ctx context.Context, networks map[string]bool) {
s.Event("block_indexer_started", map[string]interface{}{
"pid": os.Getpid(),
Expand All @@ -67,17 +88,40 @@ func (s *Service) runBlockIndexer(ctx context.Context, networks map[string]bool)

eventSub, subError := s.subscribeToBlockchainEvents(eventChannel)
if subError != nil {
fmt.Printf("event subscription disabled: %s", subError)
fmt.Printf("event_subscription disabled: %s\n", subError)
} else {
fmt.Printf("event_subscription: enabled\n")
}
start := time.Now()
highestSeq := int64(-1)

syncBlockchainNetworks := func() {
syncByBlocks := func() {
for schema, enabled := range networks {
if enabled {
s.syncNodeWithBlockchain(schema)
}
}
}
refreshDataView := func() {
for schema, enabled := range networks {
if enabled {
s.refreshDataView(schema)
}
}
}

insertBlock := func(address common.Address, blockNumber uint64) {
for schema, enabled := range networks {
if enabled {
// FIXME: also check if the address belongs to the schema
s.setSearchPath(schema)
if s.InsertBlockNumber(blockNumber) {
s.addressHeights[address] = blockNumber
refreshDataView()
}
}
}
}

for {
select {
Expand All @@ -92,22 +136,30 @@ func (s *Service) runBlockIndexer(ctx context.Context, networks map[string]bool)
os.Exit(0)
case notification := <-listener.Notify:
if schema := notification.Extra; networks[schema] == true {
fmt.Println("refresh notification from schema: " + schema)
s.confirmSentTransactions(schema)
for {
if !s.importNextTransaction(schema) {
break
}
}
s.computeTransactionStates(schema)
} else {
fmt.Println("notification from unknown schema: " + schema)
panic("notification from unknown schema: " + schema)
}
case <-ticker.C:
syncBlockchainNetworks()
onChainSequence := s.getContractSequence()
if highestSeq < 0 {
highestSeq = s.getHightestSequence()
}
if highestSeq != onChainSequence {
syncByBlocks()
}
case evt := <-eventChannel:
fmt.Printf("Received event: %v\n", evt)
syncBlockchainNetworks()
if highestSeq < 0 { // initialize from db
highestSeq = s.getHightestSequence()
}
insertBlock(evt.Raw.Address, evt.Raw.BlockNumber)
s.SignalEvent(evt)
if highestSeq+1 == evt.Sequence.Int64() {
highestSeq = evt.Sequence.Int64()
} else {
highestSeq = -1
syncByBlocks()
}
}
}
}
4 changes: 2 additions & 2 deletions internal/service/control_panel_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (s *Service) ResetDb() bool {
return err == nil
}

func (s *Service) InsertBlockNumber(blockNumber int) bool {
func (s *Service) InsertBlockNumber(blockNumber uint64) bool {
stmt, err := s.NodeDb.Prepare("INSERT INTO block_numbers (block_number) VALUES ($1)")
if err != nil {
return false
Expand All @@ -26,7 +26,7 @@ func (s *Service) InsertBlockNumber(blockNumber int) bool {
return err == nil
}

func (s Service) ControlPanelHandler(w http.ResponseWriter, r *http.Request) {
func (s *Service) ControlPanelHandler(w http.ResponseWriter, r *http.Request) {
if s.getNetwork(r.Host) != "hardhat" {
respondWithError(w, http.StatusBadRequest, "ControlPanel is only available on hardhat network")
return
Expand Down
4 changes: 3 additions & 1 deletion internal/service/ping_status_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func getBuild() string {
hexString[0:8], hexString[8:12], hexString[12:16], hexString[16:20], hexString[20:32])
}

func (s Service) SyncStatsHandler(w http.ResponseWriter, r *http.Request) {
func (s *Service) SyncStatsHandler(w http.ResponseWriter, r *http.Request) {
s.setSearchPathForRequest(r)

query := `SELECT sync_data FROM node_sync_data_view LIMIT 1;`
Expand Down Expand Up @@ -45,5 +45,7 @@ func (s Service) SyncStatsHandler(w http.ResponseWriter, r *http.Request) {
return
}
status["build"] = getBuild()
// add addressHeights
status["height"] = s.addressHeights
err = json.NewEncoder(w).Encode(status)
}
Loading

0 comments on commit f413bc7

Please sign in to comment.