From 13672e8865ff8154fa0395b112ec52a5ff60876a Mon Sep 17 00:00:00 2001 From: Ken Erwin Date: Sun, 22 Sep 2024 20:44:24 -0400 Subject: [PATCH] still bugs --- new-backend/internal/db/querier.go | 2 +- new-backend/internal/db/queries.sql.go | 10 +---- new-backend/internal/db/queries/queries.sql | 3 +- new-backend/internal/ingestor/ingestor.go | 45 +++++++++++++++---- .../internal/utils/decompress_tile_code.go | 1 - new-backend/internal/utils/render_image.go | 11 ++++- 6 files changed, 49 insertions(+), 23 deletions(-) diff --git a/new-backend/internal/db/querier.go b/new-backend/internal/db/querier.go index 9e9096d..de68ef5 100644 --- a/new-backend/internal/db/querier.go +++ b/new-backend/internal/db/querier.go @@ -22,7 +22,7 @@ type Querier interface { GetPurchaseHistoryByTileId(ctx context.Context, tileID int32) ([]PurchaseHistory, error) GetTileById(ctx context.Context, id int32) (Tile, error) GetTilesByOwner(ctx context.Context, owner string) ([]Tile, error) - GetUnprocessedDataHistory(ctx context.Context, arg GetUnprocessedDataHistoryParams) ([]DataHistory, error) + GetUnprocessedDataHistory(ctx context.Context, id int32) ([]DataHistory, error) GetWrappedTiles(ctx context.Context) ([]Tile, error) InsertDataHistory(ctx context.Context, arg InsertDataHistoryParams) (int32, error) InsertPixelMapTransaction(ctx context.Context, arg InsertPixelMapTransactionParams) (int32, error) diff --git a/new-backend/internal/db/queries.sql.go b/new-backend/internal/db/queries.sql.go index 8ece935..74aa6cf 100644 --- a/new-backend/internal/db/queries.sql.go +++ b/new-backend/internal/db/queries.sql.go @@ -329,16 +329,10 @@ const getUnprocessedDataHistory = `-- name: GetUnprocessedDataHistory :many SELECT id, time_stamp, block_number, tx, log_index, image, price, url, updated_by, tile_id FROM data_histories WHERE id > $1 ORDER BY id ASC -LIMIT $2 ` -type GetUnprocessedDataHistoryParams struct { - ID int32 `json:"id"` - Limit int32 `json:"limit"` -} - -func (q *Queries) GetUnprocessedDataHistory(ctx context.Context, arg GetUnprocessedDataHistoryParams) ([]DataHistory, error) { - rows, err := q.db.QueryContext(ctx, getUnprocessedDataHistory, arg.ID, arg.Limit) +func (q *Queries) GetUnprocessedDataHistory(ctx context.Context, id int32) ([]DataHistory, error) { + rows, err := q.db.QueryContext(ctx, getUnprocessedDataHistory, id) if err != nil { return nil, err } diff --git a/new-backend/internal/db/queries/queries.sql b/new-backend/internal/db/queries/queries.sql index 1788379..c268fde 100644 --- a/new-backend/internal/db/queries/queries.sql +++ b/new-backend/internal/db/queries/queries.sql @@ -134,8 +134,7 @@ RETURNING COALESCE(CAST(value AS INTEGER), 0)::INT4; -- name: GetUnprocessedDataHistory :many SELECT * FROM data_histories WHERE id > $1 -ORDER BY id ASC -LIMIT $2; +ORDER BY id ASC; -- name: UpdateLastProcessedDataHistoryID :exec INSERT INTO current_state (state, value) diff --git a/new-backend/internal/ingestor/ingestor.go b/new-backend/internal/ingestor/ingestor.go index 6098820..16c6ebb 100644 --- a/new-backend/internal/ingestor/ingestor.go +++ b/new-backend/internal/ingestor/ingestor.go @@ -102,6 +102,9 @@ func NewIngestor(logger *zap.Logger, sqlDB *sql.DB, apiKey string) *Ingestor { // Start the continuous rendering process go ingestor.continuousRenderProcess() + // Trigger an initial render + ingestor.signalNewData() + return ingestor } @@ -575,10 +578,7 @@ func (i *Ingestor) processDataHistory(ctx context.Context) error { } // Fetch a batch of unprocessed data history entries - history, err := i.queries.GetUnprocessedDataHistory(ctx, db.GetUnprocessedDataHistoryParams{ - ID: lastProcessedID, - Limit: 100, // Process in batches of 100 - }) + history, err := i.queries.GetUnprocessedDataHistory(ctx, lastProcessedID) if err != nil { return fmt.Errorf("failed to get unprocessed data history: %w", err) } @@ -636,13 +636,34 @@ func (i *Ingestor) renderAndSaveImage(location *big.Int, imageData string, block // Render the image using the existing RenderImage function err := utils.RenderImage(imageData, imageSize, imageSize, blockFilePath) if err != nil { - return fmt.Errorf("failed to render image: %w", err) + i.logger.Error("Failed to render block image", + zap.Error(err), + zap.String("path", blockFilePath), + zap.String("imageData", imageData)) + return fmt.Errorf("failed to render block image: %w", err) } // Render the image using the existing RenderImage function err = utils.RenderImage(imageData, imageSize, imageSize, latestFilePath) if err != nil { - return fmt.Errorf("failed to render image: %w", err) + i.logger.Error("Failed to render latest image", + zap.Error(err), + zap.String("path", latestFilePath), + zap.String("imageData", imageData)) + return fmt.Errorf("failed to render latest image: %w", err) + } + + // Verify that the files were created + if _, err := os.Stat(blockFilePath); os.IsNotExist(err) { + i.logger.Error("Block image file not created", + zap.String("path", blockFilePath)) + return nil + } + + if _, err := os.Stat(latestFilePath); os.IsNotExist(err) { + i.logger.Error("Latest image file not created", + zap.String("path", latestFilePath)) + return fmt.Errorf("latest image file not created: %s", latestFilePath) } i.logger.Info("Image rendered and saved", @@ -653,7 +674,14 @@ func (i *Ingestor) renderAndSaveImage(location *big.Int, imageData string, block func (i *Ingestor) processTileUpdate(ctx context.Context, location *big.Int, image, url string, priceWei *big.Int, tx *EtherscanTransaction, timestamp, blockNumber int64, transactionIndex int32) error { var priceEthStr string - if priceWei != nil && priceWei.Sign() > 0 { + if priceWei == nil { + // Fetch the current price from the database + currentTile, err := i.queries.GetTileById(ctx, int32(location.Int64())) + if err != nil { + return fmt.Errorf("failed to get current tile data: %w", err) + } + priceEthStr = currentTile.Price + } else if priceWei.Sign() > 0 { // Convert Wei to Ether priceEth := new(big.Float).Quo(new(big.Float).SetInt(priceWei), big.NewFloat(1e18)) @@ -668,8 +696,7 @@ func (i *Ingestor) processTileUpdate(ctx context.Context, location *big.Int, ima priceEthStr = strings.TrimRight(strings.TrimRight(fmt.Sprintf("%.18f", priceEth), "0"), ".") } } else { - // If price is nil or zero, set it to an empty string or a default value - priceEthStr = "" // or you could use "0" if you prefer + priceEthStr = "0" } i.logger.Info("Tile update", diff --git a/new-backend/internal/utils/decompress_tile_code.go b/new-backend/internal/utils/decompress_tile_code.go index 1e9f206..e45bc1f 100644 --- a/new-backend/internal/utils/decompress_tile_code.go +++ b/new-backend/internal/utils/decompress_tile_code.go @@ -51,7 +51,6 @@ func uncompressedDecoder(tileCodeString string) (string, error) { if strings.HasPrefix(tileCodeString, ImageCompressed) || strings.HasPrefix(tileCodeString, ImageCompressedV2) { return "", fmt.Errorf("not an uncompressed image") } - log.Println("Not a compressed image, returning as-is") return tileCodeString, nil } diff --git a/new-backend/internal/utils/render_image.go b/new-backend/internal/utils/render_image.go index 0af0912..f80ad50 100644 --- a/new-backend/internal/utils/render_image.go +++ b/new-backend/internal/utils/render_image.go @@ -12,7 +12,14 @@ import ( ) func RenderImage(tileImageData string, sizeX, sizeY int, outputPath string) error { - if len(tileImageData) < 768 { + // First try to decompress the tile image data + decompressedImage, err := DecompressTileCode(tileImageData) + if err != nil { + return fmt.Errorf("failed to decompress tile image data: %w", err) + } + + if len(decompressedImage) < 768 { + fmt.Printf("decompressed tile image data is too short: %d bytes", len(decompressedImage)) return nil } @@ -24,7 +31,7 @@ func RenderImage(tileImageData string, sizeX, sizeY int, outputPath string) erro for i := 0; i < 256; i++ { x, y := i%16, i/16 - hexStr := tileImageData[i*3 : i*3+3] + hexStr := decompressedImage[i*3 : i*3+3] r := parseHexChar(hexStr[0]) g := parseHexChar(hexStr[1]) b := parseHexChar(hexStr[2])