Skip to content

Commit

Permalink
blocks: use txn hash not block hash
Browse files Browse the repository at this point in the history
  • Loading branch information
iquidus committed Dec 5, 2023
1 parent f5fe5a6 commit 2cada80
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 57 deletions.
54 changes: 27 additions & 27 deletions common/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (b *RawBlock) Convert(rpcClient RPCClient) (Block, error) {
var logs []Log
for i, txn := range b.Transactions {
// get transaction receipts
receipt, err := rpcClient.GetTransactionReceipt(b.Hash)
receipt, err := rpcClient.GetTransactionReceipt(txn.Hash)
if err != nil {
return Block{}, err
}
Expand All @@ -44,14 +44,14 @@ func (b *RawBlock) Convert(rpcClient RPCClient) (Block, error) {

// get logs
for _, log := range receipt.Logs {
logs[len(logs)] = log.Convert(txns[i])
logs = append(logs, log.Convert(txns[i]))
}
}

return Block{
Number: util.DecodeHex(b.Number),
Timestamp: util.DecodeHex(b.Timestamp),
// Transactions: txns,
Number: util.DecodeHex(b.Number),
Timestamp: util.DecodeHex(b.Timestamp),
Transactions: txns,
Hash: b.Hash,
ParentHash: b.ParentHash,
BaseFeePerGas: baseFeePerGas,
Expand All @@ -75,28 +75,28 @@ func (b *RawBlock) Convert(rpcClient RPCClient) (Block, error) {
}

type Block struct {
Number uint64 `bson:"number" json:"number"`
Timestamp uint64 `bson:"timestamp" json:"timestamp"`
Hash string `bson:"hash" json:"hash"`
ParentHash string `bson:"parentHash" json:"parentHash"`
// Transactions []Transaction `bson:"transactions" json:"transactions,omitempty"`
BaseFeePerGas string `bson:"baseFeePerGas" json:"baseFeePerGas,omitempty"`
GasUsed uint64 `bson:"gasUsed" json:"gasUsed,omitempty"`
GasLimit uint64 `bson:"gasLimit" json:"gasLimit,omitempty"`
MixHash string `bson:"mixHash" json:"mixHash,omitempty"`
StateRoot string `bson:"stateRoot" json:"stateRoot,omitempty"`
TotalDifficulty string `bson:"totalDifficulty" json:"totalDifficulty,omitempty"`
Sha3Uncles string `bson:"sha3Uncles" json:"sha3Uncles,omitempty"`
Miner string `bson:"miner" json:"miner,omitempty"`
Difficulty string `bson:"difficulty" json:"difficulty,omitempty"`
Nonce string `bson:"nonce" json:"nonce,omitempty"`
TransactionCount uint64 `bson:"transactionCount" json:"transactionCount,omitempty"`
TransactionsRoot string `bson:"transactionsRoot" json:"transactionsRoot,omitempty"`
ReceiptsRoot string `bson:"receiptsRoot" json:"receiptsRoot,omitempty"`
LogsBloom string `bson:"logsBloom" json:"logsBloom,omitempty"`
ExtraData string `bson:"extraData" json:"extraData,omitempty"`
Uncles []string `bson:"uncles" json:"uncles,omitempty"`
Logs []Log `bson:"logs" json:"logs,omitempty"`
Number uint64 `bson:"number" json:"number"`
Timestamp uint64 `bson:"timestamp" json:"timestamp"`
Hash string `bson:"hash" json:"hash"`
ParentHash string `bson:"parentHash" json:"parentHash"`
Transactions []Transaction `bson:"transactions" json:"transactions,omitempty"`
BaseFeePerGas string `bson:"baseFeePerGas" json:"baseFeePerGas,omitempty"`
GasUsed uint64 `bson:"gasUsed" json:"gasUsed,omitempty"`
GasLimit uint64 `bson:"gasLimit" json:"gasLimit,omitempty"`
MixHash string `bson:"mixHash" json:"mixHash,omitempty"`
StateRoot string `bson:"stateRoot" json:"stateRoot,omitempty"`
TotalDifficulty string `bson:"totalDifficulty" json:"totalDifficulty,omitempty"`
Sha3Uncles string `bson:"sha3Uncles" json:"sha3Uncles,omitempty"`
Miner string `bson:"miner" json:"miner,omitempty"`
Difficulty string `bson:"difficulty" json:"difficulty,omitempty"`
Nonce string `bson:"nonce" json:"nonce,omitempty"`
TransactionCount uint64 `bson:"transactionCount" json:"transactionCount,omitempty"`
TransactionsRoot string `bson:"transactionsRoot" json:"transactionsRoot,omitempty"`
ReceiptsRoot string `bson:"receiptsRoot" json:"receiptsRoot,omitempty"`
LogsBloom string `bson:"logsBloom" json:"logsBloom,omitempty"`
ExtraData string `bson:"extraData" json:"extraData,omitempty"`
Uncles []string `bson:"uncles" json:"uncles,omitempty"`
Logs []Log `bson:"logs" json:"logs,omitempty"`
}

// TODO(iquidus): write a compact function for Block
Expand Down
34 changes: 7 additions & 27 deletions crawler/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (c *Crawler) reorg() error {
for i := len(sidechain) - 1; i >= 0; i-- {
c.cache.Push(sidechain[i])
c.logger.Info("Adding remote block", "number", sidechain[i].Number, "hash", sidechain[i].Hash)
err := c.sendBlockMessage(dropped[i])
err := c.sendBlockMessage(&dropped[i])
if err != nil {
return errors.New("Failed to send reorg hook: " + err.Error())
}
Expand Down Expand Up @@ -230,14 +230,14 @@ Logs:
return ret
}

func (c *Crawler) sendBlockMessage(block common.Block) error {
func (c *Crawler) sendBlockMessage(block *common.Block) error {
for _, ktopic := range c.cfg.Kafka.Params {
nb := block
filteredLogs := filterLogs(nb.Logs, ktopic.Addresses, ktopic.Topics)
nb.Logs = filteredLogs
// nb := block
// filteredLogs := filterLogs(nb.Logs, ktopic.Addresses, ktopic.Topics)
// nb.Logs = filteredLogs
var bp = kafka.Payload{
Status: "ACCEPTED",
Block: nb,
Block: *block,
Version: 1,
}
payload, err := json.Marshal(bp)
Expand Down Expand Up @@ -303,31 +303,11 @@ func (c *Crawler) syncBlock(block common.Block, task *syncronizer.Task) {
}

// handle block hook here
err = c.sendBlockMessage(block)
err = c.sendBlockMessage(&block)
if err != nil {
c.logger.Error("Failed to send block hook", "err", err)
}

// TODO(iquidus): Running a getlogs with each filter is expensive
// migrate to a single getlogs call (or derive from block itself), and filter locally

// handle getlogs requests
// logopts := c.cfg.Kafka.Events
// for i := 0; i < len(logopts); i++ {
// logs, err := c.rpc.GetLogs(logopts[i].Addresses, block.Hash, logopts[i].Topics)
// if err != nil {
// c.logger.Error("Failed to get logs", "err", err)
// return
// }
// if len(logs) > 0 {
// // handle webhook here
// err = c.sendEventsMessage(logs, logopts[i].Topic)
// if err != nil {
// c.logger.Error("Failed to send getlogs hook", "err", err)
// }
// }
// }

// write block to state
err = c.state.Update(block)
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions kafka/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kafka

import (
"context"
"log"

"github.com/segmentio/kafka-go"
)
Expand Down Expand Up @@ -33,7 +32,6 @@ func (k *Reader) FetchMessage(ctx context.Context, messages chan<- kafka.Message
case <-ctx.Done():
return ctx.Err()
case messages <- message:
log.Printf("message fetched and sent to channel: %v \n", string(message.Value))
}
}
}
Expand All @@ -47,7 +45,6 @@ func (k *Reader) CommitMessages(ctx context.Context, commits <-chan kafka.Messag
if err != nil {
return err
}
log.Printf("committed a msg: %v \n", string(message.Value))
}
}
}

0 comments on commit 2cada80

Please sign in to comment.