Skip to content

Commit

Permalink
Add checks to validate referential integrity in the data (#7)
Browse files Browse the repository at this point in the history
* Add checks to validate referential integrity

* Add unit tests for referential integrity validation

* Use EXISTS in referential integrity validation queries
  • Loading branch information
prathamesh0 authored Jun 1, 2022
1 parent effbdc3 commit cc935dc
Show file tree
Hide file tree
Showing 7 changed files with 707 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ integrationtest: | $(GINKGO) $(GOOSE)
test: | $(GINKGO) $(GOOSE)
go vet ./...
go fmt ./...
$(GINKGO) -r validator_test/ -v
$(GINKGO) -r pkg/validator/ validator_test/ -v

build:
go fmt ./...
Expand Down
224 changes: 224 additions & 0 deletions pkg/validator/ref_integrity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package validator

import (
"fmt"

"github.com/jmoiron/sqlx"
)

// ValidateReferentialIntegrity validates referential integrity at the given height
func ValidateReferentialIntegrity(db *sqlx.DB, blockNumber uint64) error {

err := ValidateHeaderCIDsRef(db, blockNumber)
if err != nil {
return err
}

err = ValidateUncleCIDsRef(db, blockNumber)
if err != nil {
return err
}

err = ValidateTransactionCIDsRef(db, blockNumber)
if err != nil {
return err
}

err = ValidateReceiptCIDsRef(db, blockNumber)
if err != nil {
return err
}

err = ValidateStateCIDsRef(db, blockNumber)
if err != nil {
return err
}

err = ValidateStorageCIDsRef(db, blockNumber)
if err != nil {
return err
}

err = ValidateStateAccountsRef(db, blockNumber)
if err != nil {
return err
}

err = ValidateAccessListElementsRef(db, blockNumber)
if err != nil {
return err
}

err = ValidateLogCIDsRef(db, blockNumber)
if err != nil {
return err
}

return nil
}

// ValidateHeaderCIDsRef does a reference integrity check on references in eth.header_cids table
func ValidateHeaderCIDsRef(db *sqlx.DB, blockNumber uint64) error {
err := ValidateIPFSBlocks(db, blockNumber, "eth.header_cids", "mh_key")
if err != nil {
return err
}

return nil
}

// ValidateUncleCIDsRef does a reference integrity check on references in eth.uncle_cids table
func ValidateUncleCIDsRef(db *sqlx.DB, blockNumber uint64) error {
var exists bool
err := db.Get(&exists, UncleCIDsRefHeaderCIDs, blockNumber)
if err != nil {
return err
}
if exists {
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.header_cids")
}

err = ValidateIPFSBlocks(db, blockNumber, "eth.uncle_cids", "mh_key")
if err != nil {
return err
}

return nil
}

// ValidateTransactionCIDsRef does a reference integrity check on references in eth.header_cids table
func ValidateTransactionCIDsRef(db *sqlx.DB, blockNumber uint64) error {
var exists bool
err := db.Get(&exists, TransactionCIDsRefHeaderCIDs, blockNumber)
if err != nil {
return err
}
if exists {
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.header_cids")
}

err = ValidateIPFSBlocks(db, blockNumber, "eth.transaction_cids", "mh_key")
if err != nil {
return err
}

return nil
}

// ValidateReceiptCIDsRef does a reference integrity check on references in eth.receipt_cids table
func ValidateReceiptCIDsRef(db *sqlx.DB, blockNumber uint64) error {
var exists bool
err := db.Get(&exists, ReceiptCIDsRefTransactionCIDs, blockNumber)
if err != nil {
return err
}
if exists {
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.transaction_cids")
}

err = ValidateIPFSBlocks(db, blockNumber, "eth.receipt_cids", "leaf_mh_key")
if err != nil {
return err
}

return nil
}

// ValidateStateCIDsRef does a reference integrity check on references in eth.state_cids table
func ValidateStateCIDsRef(db *sqlx.DB, blockNumber uint64) error {
var exists bool
err := db.Get(&exists, StateCIDsRefHeaderCIDs, blockNumber)
if err != nil {
return err
}
if exists {
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.header_cids")
}

err = ValidateIPFSBlocks(db, blockNumber, "eth.state_cids", "mh_key")
if err != nil {
return err
}

return nil
}

// ValidateStorageCIDsRef does a reference integrity check on references in eth.storage_cids table
func ValidateStorageCIDsRef(db *sqlx.DB, blockNumber uint64) error {
var exists bool
err := db.Get(&exists, StorageCIDsRefStateCIDs, blockNumber)
if err != nil {
return err
}
if exists {
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.state_cids")
}

err = ValidateIPFSBlocks(db, blockNumber, "eth.storage_cids", "mh_key")
if err != nil {
return err
}

return nil
}

// ValidateStateAccountsRef does a reference integrity check on references in eth.state_accounts table
func ValidateStateAccountsRef(db *sqlx.DB, blockNumber uint64) error {
var exists bool
err := db.Get(&exists, StateAccountsRefStateCIDs, blockNumber)
if err != nil {
return err
}
if exists {
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.state_cids")
}

return nil
}

// ValidateAccessListElementsRef does a reference integrity check on references in eth.access_list_elements table
func ValidateAccessListElementsRef(db *sqlx.DB, blockNumber uint64) error {
var exists bool
err := db.Get(&exists, AccessListElementsRefTransactionCIDs, blockNumber)
if err != nil {
return err
}
if exists {
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.transaction_cids")
}

return nil
}

// ValidateLogCIDsRef does a reference integrity check on references in eth.log_cids table
func ValidateLogCIDsRef(db *sqlx.DB, blockNumber uint64) error {
var exists bool
err := db.Get(&exists, LogCIDsRefReceiptCIDs, blockNumber)
if err != nil {
return err
}
if exists {
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.receipt_cids")
}

err = ValidateIPFSBlocks(db, blockNumber, "eth.log_cids", "leaf_mh_key")
if err != nil {
return err
}

return nil
}

// ValidateIPFSBlocks does a reference integrity check between the given CID table and IPFS blocks table on MHKey and block number
func ValidateIPFSBlocks(db *sqlx.DB, blockNumber uint64, CIDTable string, mhKeyField string) error {
var exists bool
err := db.Get(&exists, fmt.Sprintf(CIDsRefIPLDBlocks, CIDTable, mhKeyField), blockNumber)
if err != nil {
return err
}
if exists {
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "public.blocks")
}

return nil
}
119 changes: 119 additions & 0 deletions pkg/validator/ref_integrity_queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package validator

// Queries to validate referential integrity in the indexed data:
// At the given block number,
// In each table, for each (would be) foreign key reference, perform left join with the referenced table on the foreign key fields.
// Select rows where there are no matching rows in the referenced table.
// If any such rows exist, there are missing entries in the referenced table.

const (
CIDsRefIPLDBlocks = `SELECT EXISTS (
SELECT *
FROM %[1]s
LEFT JOIN public.blocks ON (
%[1]s.%[2]s = blocks.key
AND %[1]s.block_number = blocks.block_number
)
WHERE
%[1]s.block_number = $1
AND blocks.key IS NULL
)`

UncleCIDsRefHeaderCIDs = `SELECT EXISTS (
SELECT *
FROM eth.uncle_cids
LEFT JOIN eth.header_cids ON (
uncle_cids.header_id = header_cids.block_hash
AND uncle_cids.block_number = header_cids.block_number
)
WHERE
uncle_cids.block_number = $1
AND header_cids.block_hash IS NULL
)`

TransactionCIDsRefHeaderCIDs = `SELECT EXISTS (
SELECT *
FROM eth.transaction_cids
LEFT JOIN eth.header_cids ON (
transaction_cids.header_id = header_cids.block_hash
AND transaction_cids.block_number = header_cids.block_number
)
WHERE
transaction_cids.block_number = $1
AND header_cids.block_hash IS NULL
)`

ReceiptCIDsRefTransactionCIDs = `SELECT EXISTS (
SELECT *
FROM eth.receipt_cids
LEFT JOIN eth.transaction_cids ON (
receipt_cids.tx_id = transaction_cids.tx_hash
AND receipt_cids.block_number = transaction_cids.block_number
)
WHERE
receipt_cids.block_number = $1
AND transaction_cids.tx_hash IS NULL
)`

StateCIDsRefHeaderCIDs = `SELECT EXISTS (
SELECT *
FROM eth.state_cids
LEFT JOIN eth.header_cids ON (
state_cids.header_id = header_cids.block_hash
AND state_cids.block_number = header_cids.block_number
)
WHERE
state_cids.block_number = $1
AND header_cids.block_hash IS NULL
)`

StorageCIDsRefStateCIDs = `SELECT EXISTS (
SELECT *
FROM eth.storage_cids
LEFT JOIN eth.state_cids ON (
storage_cids.state_path = state_cids.state_path
AND storage_cids.header_id = state_cids.header_id
AND storage_cids.block_number = state_cids.block_number
)
WHERE
storage_cids.block_number = $1
AND state_cids.state_path IS NULL
)`

StateAccountsRefStateCIDs = `SELECT EXISTS (
SELECT *
FROM eth.state_accounts
LEFT JOIN eth.state_cids ON (
state_accounts.state_path = state_cids.state_path
AND state_accounts.header_id = state_cids.header_id
AND state_accounts.block_number = state_cids.block_number
)
WHERE
state_accounts.block_number = $1
AND state_cids.state_path IS NULL
)`

AccessListElementsRefTransactionCIDs = `SELECT EXISTS (
SELECT *
FROM eth.access_list_elements
LEFT JOIN eth.transaction_cids ON (
access_list_elements.tx_id = transaction_cids.tx_hash
AND access_list_elements.block_number = transaction_cids.block_number
)
WHERE
access_list_elements.block_number = $1
AND transaction_cids.tx_hash IS NULL
)`

LogCIDsRefReceiptCIDs = `SELECT EXISTS (
SELECT *
FROM eth.log_cids
LEFT JOIN eth.receipt_cids ON (
log_cids.rct_id = receipt_cids.tx_id
AND log_cids.block_number = receipt_cids.block_number
)
WHERE
log_cids.block_number = $1
AND receipt_cids.tx_id IS NULL
)`
)
Loading

0 comments on commit cc935dc

Please sign in to comment.