Skip to content

Commit

Permalink
(BIDS-2634) reordered transactions, fix almost-empty table (#2663)
Browse files Browse the repository at this point in the history
* (BIDS-2634) reordered transactions, fix almost-empty table

* (BIDS-2634) reverse sorting for /address tables

* (BIDS-2634) workaround: sort misplaced reversePaddedIndex 0 into correct position

* (BIDS-2634) handle trace position 0

* (BIDS-2634) changed comment

* (BIDS-2634) return all index keys

* (BIDS-2634) using constant (i)tx limits

* (BIDS-2634) fixed last key handling

* (BIDS-2634) adjusted error message
  • Loading branch information
remoterami committed Feb 6, 2024
1 parent ed188e4 commit 9d21f02
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 13 deletions.
173 changes: 164 additions & 9 deletions db/bigtable_eth1.go
Original file line number Diff line number Diff line change
Expand Up @@ -1941,6 +1941,138 @@ func (bigtable *Bigtable) TransformWithdrawals(block *types.Eth1Block, cache *fr
return bulkData, bulkMetadataUpdates, nil
}

type IndexKeys struct {
indexes []string
keys []string
}

type SortByIndexes IndexKeys

func (sbi SortByIndexes) Len() int {
return len(sbi.indexes)
}

func (sbi SortByIndexes) Swap(i, j int) {
sbi.indexes[i], sbi.indexes[j] = sbi.indexes[j], sbi.indexes[i]
sbi.keys[i], sbi.keys[j] = sbi.keys[j], sbi.keys[i]
}

func (sbi SortByIndexes) Less(i, j int) bool {
i_splits := strings.Split(sbi.indexes[i], ":")
j_splits := strings.Split(sbi.indexes[j], ":")
if len(i_splits) != len(j_splits) || len(i_splits) < 7 {
utils.LogError(nil, "unexpected bigtable transaction indices", 0, map[string]interface{}{"index_i": sbi.indexes[i], "index_j": sbi.indexes[j]})
return false
}

// block
if i_splits[5] != j_splits[5] {
return i_splits[5] < j_splits[5]
}
// tx idx
if i_splits[6] != j_splits[6] {
if i_splits[6] == strconv.Itoa(TX_PER_BLOCK_LIMIT) || j_splits[6] == strconv.Itoa(TX_PER_BLOCK_LIMIT) {
return j_splits[6] == strconv.Itoa(TX_PER_BLOCK_LIMIT)
}
return i_splits[6] < j_splits[6]
}
// itx idx
if len(i_splits) > 7 && i_splits[7] != j_splits[7] {
if i_splits[7] == strconv.Itoa(ITX_PER_TX_LIMIT) || j_splits[7] == strconv.Itoa(ITX_PER_TX_LIMIT) {
return j_splits[7] == strconv.Itoa(ITX_PER_TX_LIMIT)
}
return i_splits[7] < j_splits[7]
}
// shouldn't happen, this means we've the same key twice
utils.LogError(nil, "unexpected bigtable transaction indices", 0, map[string]interface{}{"index_i": sbi.indexes[i], "index_j": sbi.indexes[j]})
return false
}

func (bigtable *Bigtable) rearrangeReversePaddedIndexZero(ctx context.Context, indexes, keys []string) ([]string, []string) {
if len(indexes) < 2 {
return indexes, keys
}

// first find out if we've a (sub)transaction with index 0 whose block/transaction has maybe not been completed by the request
// if we find one, make sure we do complete the request by querying the remainder from bigtable. So we won't miss that (i)tx next time (=> ignoring the query limit)
for i := 0; i < len(indexes); i++ {
splits := strings.Split(indexes[i], ":")
if len(splits) < 7 {
utils.LogError(nil, "unexpected bigtable transaction index", 0, map[string]interface{}{"index": indexes[i]})
continue
}

if splits[6] != strconv.Itoa(TX_PER_BLOCK_LIMIT) && len(splits) > 7 && splits[7] != strconv.Itoa(ITX_PER_TX_LIMIT) {
continue
}
// check if results list all following (i)txs already
for i++; i < len(indexes); i++ {
next_splits := strings.Split(indexes[i], ":")
if len(next_splits) < 7 {
utils.LogError(nil, "unexpected bigtable transaction index", 0, map[string]interface{}{"index": indexes[i]})
continue
}
if next_splits[5] != splits[5] || (len(splits) > 7 && next_splits[6] != splits[6]) {
// next block/tx
break
}
}
if i == len(indexes) {
// block/tx maybe isn't fully included in results, request all missing entries (ignoring the query limit)
i, err := strconv.Atoi(splits[5])
if err != nil {
utils.LogError(err, "error converting bigtable transaction index timestamp", 0, map[string]interface{}{"index": splits[5]})
continue
}
splits[5] = fmt.Sprintf("%d", i+1)
rowRange := gcp_bigtable.NewRange(indexes[len(indexes)-1]+"\x00", strings.Join(splits[:6], ":"))
err = bigtable.tableData.ReadRows(ctx, rowRange, func(row gcp_bigtable.Row) bool {
keys = append(keys, strings.TrimPrefix(row[DEFAULT_FAMILY][0].Column, "f:"))
indexes = append(indexes, row.Key())
return true
})
if err != nil {
utils.LogError(err, "error reading from bigtable", 0)
}
break
}
i--
}

// sort
ik := IndexKeys{indexes, keys}
sort.Sort(SortByIndexes(ik))

return ik.indexes, ik.keys
}

func skipBlockIfLastTxIndex(key string) string {
splits := strings.Split(key, ":")
if len(splits) < 7 {
utils.LogError(nil, "unexpected bigtable transaction index", 0, map[string]interface{}{"index": key})
return key
}
if len(splits) == 8 && splits[7] == strconv.Itoa(ITX_PER_TX_LIMIT) && splits[6] != strconv.Itoa(TX_PER_BLOCK_LIMIT) {
i, err := strconv.Atoi(splits[6])
if err != nil {
utils.LogError(err, "error converting bigtable transaction index", 0, map[string]interface{}{"index": splits[6]})
} else {
splits[6] = fmt.Sprintf("%d", i+1)
}
splits = splits[:7]
}
if splits[6] == strconv.Itoa(TX_PER_BLOCK_LIMIT) {
i, err := strconv.Atoi(splits[5])
if err != nil {
utils.LogError(err, "error converting bigtable transaction index timestamp", 0, map[string]interface{}{"index": splits[5]})
} else {
splits[5] = fmt.Sprintf("%d", i+1)
return strings.Join(splits[:6], ":") + ":"
}
}
return key
}

func (bigtable *Bigtable) GetEth1TxsForAddress(prefix string, limit int64) ([]*types.Eth1TransactionIndexed, []string, error) {

tmr := time.AfterFunc(REPORT_TIMEOUT, func() {
Expand Down Expand Up @@ -1974,6 +2106,8 @@ func (bigtable *Bigtable) GetEth1TxsForAddress(prefix string, limit int64) ([]*t
return data, nil, nil
}

indexes, keys = bigtable.rearrangeReversePaddedIndexZero(ctx, indexes, keys)

err = bigtable.tableData.ReadRows(ctx, gcp_bigtable.RowList(keys), func(row gcp_bigtable.Row) bool {
b := &types.Eth1TransactionIndexed{}
err := proto.Unmarshal(row[DEFAULT_FAMILY][0].Value, b)
Expand All @@ -1986,7 +2120,7 @@ func (bigtable *Bigtable) GetEth1TxsForAddress(prefix string, limit int64) ([]*t
return true
})
if err != nil {
logger.WithError(err).WithField("prefix", prefix).WithField("limit", limit).Errorf("error reading rows in bigtable_eth1 / GetEth1TxForAddress")
logger.WithError(err).WithField("prefix", prefix).WithField("limit", limit).Errorf("error reading rows in bigtable_eth1 / GetEth1TxsForAddress")
return nil, nil, err
}

Expand Down Expand Up @@ -2104,6 +2238,10 @@ func (bigtable *Bigtable) GetAddressTransactionsTableData(address []byte, pageTo
if err != nil {
return nil, fmt.Errorf("error parsing Eth1InternalTransactionIndexed tx index: %v", err)
}
if tx_idx == TX_PER_BLOCK_LIMIT {
// handle "reversePaddedIndex 0"-issue
tx_idx = 0
}
tx_idx = TX_PER_BLOCK_LIMIT - tx_idx
if tx_idx < 0 {
return nil, fmt.Errorf("invalid Eth1InternalTransactionIndexed tx index: %d", tx_idx)
Expand Down Expand Up @@ -2151,7 +2289,7 @@ func (bigtable *Bigtable) GetAddressTransactionsTableData(address []byte, pageTo

token := ""
if len(keys) > 0 {
token = keys[len(keys)-1]
token = skipBlockIfLastTxIndex(keys[len(keys)-1])
}

data := &types.DataTableResponse{
Expand Down Expand Up @@ -2386,6 +2524,8 @@ func (bigtable *Bigtable) GetEth1BtxForAddress(prefix string, limit int64) ([]*t
return data, "", nil
}

indexes, keys = bigtable.rearrangeReversePaddedIndexZero(ctx, indexes, keys)

err = bigtable.tableData.ReadRows(ctx, gcp_bigtable.RowList(keys), func(row gcp_bigtable.Row) bool {
b := &types.Eth1BlobTransactionIndexed{}
err := proto.Unmarshal(row[DEFAULT_FAMILY][0].Value, b)
Expand All @@ -2406,7 +2546,7 @@ func (bigtable *Bigtable) GetEth1BtxForAddress(prefix string, limit int64) ([]*t
}
}

return data, indexes[len(indexes)-1], nil
return data, skipBlockIfLastTxIndex(indexes[len(indexes)-1]), nil
}

func (bigtable *Bigtable) GetAddressBlobTableData(address []byte, pageToken string) (*types.DataTableResponse, error) {
Expand Down Expand Up @@ -2487,7 +2627,6 @@ func (bigtable *Bigtable) GetEth1ItxsForAddress(prefix string, limit int64) ([]*

keysMap := make(map[string]*types.Eth1InternalTransactionIndexed, limit)
err := bigtable.tableData.ReadRows(ctx, rowRange, func(row gcp_bigtable.Row) bool {

keys = append(keys, strings.TrimPrefix(row[DEFAULT_FAMILY][0].Column, "f:"))
indexes = append(indexes, row.Key())
return true
Expand All @@ -2499,6 +2638,8 @@ func (bigtable *Bigtable) GetEth1ItxsForAddress(prefix string, limit int64) ([]*
return data, nil, nil
}

indexes, keys = bigtable.rearrangeReversePaddedIndexZero(ctx, indexes, keys)

err = bigtable.tableData.ReadRows(ctx, gcp_bigtable.RowList(keys), func(row gcp_bigtable.Row) bool {
b := &types.Eth1InternalTransactionIndexed{}
err := proto.Unmarshal(row[DEFAULT_FAMILY][0].Value, b)
Expand All @@ -2525,6 +2666,7 @@ func (bigtable *Bigtable) GetEth1ItxsForAddress(prefix string, limit int64) ([]*
}
}

indexes[len(indexes)-1] = skipBlockIfLastTxIndex(indexes[len(indexes)-1])
return data, indexes, nil
}

Expand Down Expand Up @@ -2564,6 +2706,10 @@ func (bigtable *Bigtable) GetAddressInternalTableData(address []byte, pageToken
if err != nil {
return nil, fmt.Errorf("error parsing Eth1InternalTransactionIndexed tx index: %v", err)
}
if tx_idx == TX_PER_BLOCK_LIMIT {
// handle "reversePaddedIndex 0"-issue
tx_idx = 0
}
tx_idx = TX_PER_BLOCK_LIMIT - tx_idx
if tx_idx < 0 {
return nil, fmt.Errorf("invalid Eth1InternalTransactionIndexed tx index: %d", tx_idx)
Expand All @@ -2573,6 +2719,10 @@ func (bigtable *Bigtable) GetAddressInternalTableData(address []byte, pageToken
if err != nil {
return nil, fmt.Errorf("error parsing Eth1InternalTransactionIndexed trace index: %v", err)
}
if trace_idx == ITX_PER_TX_LIMIT {
// handle "reversePaddedIndex 0"-issue
trace_idx = 0
}
trace_idx = ITX_PER_TX_LIMIT - trace_idx
if tx_idx < 0 {
return nil, fmt.Errorf("invalid Eth1InternalTransactionIndexed trace index: %d", trace_idx)
Expand Down Expand Up @@ -2615,7 +2765,7 @@ func (bigtable *Bigtable) GetAddressInternalTableData(address []byte, pageToken

token := ""
if len(keys) > 0 {
token = keys[len(keys)-1]
token = skipBlockIfLastTxIndex(keys[len(keys)-1])
}

data := &types.DataTableResponse{
Expand Down Expand Up @@ -2841,6 +2991,8 @@ func (bigtable *Bigtable) GetEth1ERC20ForAddress(prefix string, limit int64) ([]
return data, "", nil
}

indexes, keys = bigtable.rearrangeReversePaddedIndexZero(ctx, indexes, keys)

err = bigtable.tableData.ReadRows(ctx, gcp_bigtable.RowList(keys), func(row gcp_bigtable.Row) bool {
b := &types.Eth1ERC20Indexed{}
err := proto.Unmarshal(row[DEFAULT_FAMILY][0].Value, b)
Expand All @@ -2862,7 +3014,7 @@ func (bigtable *Bigtable) GetEth1ERC20ForAddress(prefix string, limit int64) ([]
}
}

return data, indexes[len(indexes)-1], nil
return data, skipBlockIfLastTxIndex(indexes[len(indexes)-1]), nil
}

func (bigtable *Bigtable) GetAddressErc20TableData(address []byte, pageToken string) (*types.DataTableResponse, error) {
Expand Down Expand Up @@ -2968,6 +3120,8 @@ func (bigtable *Bigtable) GetEth1ERC721ForAddress(prefix string, limit int64) ([
return data, "", nil
}

indexes, keys = bigtable.rearrangeReversePaddedIndexZero(ctx, indexes, keys)

err = bigtable.tableData.ReadRows(ctx, gcp_bigtable.RowList(keys), func(row gcp_bigtable.Row) bool {
b := &types.Eth1ERC721Indexed{}
err := proto.Unmarshal(row[DEFAULT_FAMILY][0].Value, b)
Expand All @@ -2988,7 +3142,7 @@ func (bigtable *Bigtable) GetEth1ERC721ForAddress(prefix string, limit int64) ([
data = append(data, d)
}
}
return data, indexes[len(indexes)-1], nil
return data, skipBlockIfLastTxIndex(indexes[len(indexes)-1]), nil
}

func (bigtable *Bigtable) GetAddressErc721TableData(address []byte, pageToken string) (*types.DataTableResponse, error) {
Expand Down Expand Up @@ -3078,6 +3232,8 @@ func (bigtable *Bigtable) GetEth1ERC1155ForAddress(prefix string, limit int64) (
return data, "", nil
}

indexes, keys = bigtable.rearrangeReversePaddedIndexZero(ctx, indexes, keys)

err = bigtable.tableData.ReadRows(ctx, gcp_bigtable.RowList(keys), func(row gcp_bigtable.Row) bool {
b := &types.ETh1ERC1155Indexed{}
err := proto.Unmarshal(row[DEFAULT_FAMILY][0].Value, b)
Expand All @@ -3098,7 +3254,7 @@ func (bigtable *Bigtable) GetEth1ERC1155ForAddress(prefix string, limit int64) (
data = append(data, d)
}
}
return data, indexes[len(indexes)-1], nil
return data, skipBlockIfLastTxIndex(indexes[len(indexes)-1]), nil
}

func (bigtable *Bigtable) GetAddressErc1155TableData(address []byte, pageToken string) (*types.DataTableResponse, error) {
Expand Down Expand Up @@ -3789,7 +3945,6 @@ func (bigtable *Bigtable) GetAddressContractInteractionsAtITransactions(itransac
}

// convenience function to get contract interaction status per parity trace
// 2nd parameter specifies [tx_idx, trace_idx] for each internal tx
func (bigtable *Bigtable) GetAddressContractInteractionsAtParityTraces(traces []*rpc.ParityTraceResult) ([][2]types.ContractInteractionType, error) {
requests := make([]contractInteractionAtRequest, 0, len(traces)*2)
for i, itx := range traces {
Expand Down
5 changes: 3 additions & 2 deletions handlers/eth1Transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func getTransactionDataStartingWithPageToken(pageToken string) *types.DataTableR
}
}
}
if pageTokenId == 0 {
if pageTokenId == 0 && pageToken != "0" {
pageTokenId = services.LatestEth1BlockNumber()
}

Expand Down Expand Up @@ -87,7 +87,8 @@ func getTransactionDataStartingWithPageToken(pageToken string) *types.DataTableR
}

var wg errgroup.Group
for i, v := range t {
for i := len(t) - 1; i >= 0; i-- {
v := t[i]
wg.Go(func() error {
method := "Transfer"
{
Expand Down
2 changes: 1 addition & 1 deletion templates/execution/address.html
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@
content.innerHTML = col
el.appendChild(content)
infLoading.insertAdjacentElement("beforebegin", el)
drawCallback()
}
}
drawCallback()
} else if (data.pagingToken === "") {
infLoading.remove()
} else if (data && data.data && data.data.length == 0) {
Expand Down
2 changes: 1 addition & 1 deletion templates/execution/token.html
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@
el.classList.add('text-truncate')
el.innerHTML = col
infLoading.insertAdjacentElement("beforebegin", el)
drawCallback()
}
}
drawCallback()
} else if (data && data.data) {
if (data.data.length < 25) {
infLoading.innerText = ''
Expand Down

0 comments on commit 9d21f02

Please sign in to comment.