9
9
"os"
10
10
"strconv"
11
11
"sync"
12
+ "sync/atomic"
12
13
"time"
13
14
14
15
"github.com/ethereum/go-ethereum/common"
@@ -74,6 +75,8 @@ type Ingestor struct {
74
75
queries * db.Queries
75
76
etherscanClient * EtherscanClient
76
77
pubSub * PubSub
78
+ renderSignal chan struct {}
79
+ isRendering atomic.Bool
77
80
}
78
81
79
82
func NewIngestor (logger * zap.Logger , sqlDB * sql.DB , apiKey string ) * Ingestor {
@@ -83,11 +86,11 @@ func NewIngestor(logger *zap.Logger, sqlDB *sql.DB, apiKey string) *Ingestor {
83
86
queries : db .New (sqlDB ),
84
87
etherscanClient : NewEtherscanClient (apiKey , logger ),
85
88
pubSub : pubSub ,
89
+ renderSignal : make (chan struct {}, 1 ),
86
90
}
87
91
88
- // Start subscribers
89
- go ingestor .imageRenderSubscriber (pubSub .Subscribe (EventTypeImageRender ))
90
- go ingestor .discordNotificationSubscriber (pubSub .Subscribe (EventTypeDiscordNotification ))
92
+ // Start the continuous rendering process
93
+ go ingestor .continuousRenderProcess ()
91
94
92
95
return ingestor
93
96
}
@@ -302,15 +305,10 @@ func (i *Ingestor) processTransaction(ctx context.Context, tx *EtherscanTransact
302
305
return fmt .Errorf ("failed to insert data history: %w" , err )
303
306
}
304
307
305
- // Publish event for image rendering
306
- imageRenderPayload , _ := json .Marshal (map [string ]interface {}{
307
- "location" : location .String (),
308
- "imageData" : image ,
309
- "blockNumber" : blockNumber ,
310
- })
311
- i .pubSub .Publish (Event {Type : EventTypeImageRender , Payload : imageRenderPayload })
308
+ // Signal that new data is available to render
309
+ i .signalNewData ()
312
310
313
- // Publish event for Discord notification
311
+ // Keep the Discord notification
314
312
discordPayload , _ := json .Marshal (map [string ]interface {}{
315
313
"message" : fmt .Sprintf ("Tile %s updated by %s" , location .String (), tx .From ),
316
314
"url" : url ,
@@ -353,39 +351,82 @@ func (i *Ingestor) updateLastProcessedBlock(ctx context.Context, blockNumber int
353
351
return nil
354
352
}
355
353
356
- func (i * Ingestor ) imageRenderSubscriber (ch <- chan Event ) {
357
- for event := range ch {
358
- var payload struct {
359
- Location string `json:"location"`
360
- ImageData string `json:"imageData"`
361
- BlockNumber int64 `json:"blockNumber"`
362
- }
363
- json .Unmarshal (event .Payload , & payload )
364
-
365
- location , _ := new (big.Int ).SetString (payload .Location , 10 )
366
- if err := i .renderAndSaveImage (location , payload .ImageData , payload .BlockNumber ); err != nil {
367
- i .logger .Error ("Failed to render and save image" ,
368
- zap .Error (err ),
369
- zap .String ("location" , payload .Location ),
370
- zap .Int64 ("blockNumber" , payload .BlockNumber ))
354
+ func (i * Ingestor ) signalNewData () {
355
+ select {
356
+ case i .renderSignal <- struct {}{}:
357
+ default :
358
+ // Channel already has a signal, no need to send another
359
+ }
360
+ }
361
+
362
+ func (i * Ingestor ) continuousRenderProcess () {
363
+ for {
364
+ // Wait for a signal that new data is available
365
+ <- i .renderSignal
366
+
367
+ // Set the rendering flag
368
+ if ! i .isRendering .CompareAndSwap (false , true ) {
369
+ // If already rendering, continue waiting
370
+ continue
371
371
}
372
+
373
+ // Start rendering in a separate goroutine
374
+ go func () {
375
+ defer i .isRendering .Store (false )
376
+
377
+ for {
378
+ if err := i .processDataHistory (context .Background ()); err != nil {
379
+ i .logger .Error ("Failed to process data history" , zap .Error (err ))
380
+ return
381
+ }
382
+
383
+ // Check if there's more data to process
384
+ select {
385
+ case <- i .renderSignal :
386
+ // More data available, continue processing
387
+ continue
388
+ default :
389
+ // No more data, exit the rendering loop
390
+ return
391
+ }
392
+ }
393
+ }()
372
394
}
373
395
}
374
396
375
- func (i * Ingestor ) discordNotificationSubscriber (ch <- chan Event ) {
376
- for event := range ch {
377
- var payload struct {
378
- Message string `json:"message"`
379
- URL string `json:"url"`
397
+ func (i * Ingestor ) processDataHistory (ctx context.Context ) error {
398
+ lastProcessedID , err := i .queries .GetLastProcessedDataHistoryID (ctx )
399
+ if err != nil {
400
+ return fmt .Errorf ("failed to get last processed data history ID: %w" , err )
401
+ }
402
+
403
+ // Fetch a batch of unprocessed data history entries
404
+ history , err := i .queries .GetUnprocessedDataHistory (ctx , db.GetUnprocessedDataHistoryParams {
405
+ ID : lastProcessedID ,
406
+ Limit : 100 , // Process in batches of 100
407
+ })
408
+ if err != nil {
409
+ return fmt .Errorf ("failed to get unprocessed data history: %w" , err )
410
+ }
411
+
412
+ if len (history ) == 0 {
413
+ // No more data to process
414
+ return nil
415
+ }
416
+
417
+ for _ , row := range history {
418
+ location := big .NewInt (int64 (row .TileID ))
419
+ if err := i .renderAndSaveImage (location , row .Image , row .BlockNumber ); err != nil {
420
+ return fmt .Errorf ("failed to render and save image: %w" , err )
380
421
}
381
- json .Unmarshal (event .Payload , & payload )
382
422
383
- // Implement Discord notification logic here
384
- i .logger .Info ("Discord notification" ,
385
- zap .String ("message" , payload .Message ),
386
- zap .String ("url" , payload .URL ))
387
- // You would typically call a Discord API client here
423
+ // Update the last processed ID
424
+ if err := i .queries .UpdateLastProcessedDataHistoryID (ctx , row .ID ); err != nil {
425
+ return fmt .Errorf ("failed to update last processed data history ID: %w" , err )
426
+ }
388
427
}
428
+
429
+ return nil
389
430
}
390
431
391
432
func (i * Ingestor ) renderAndSaveImage (location * big.Int , imageData string , blockNumber int64 ) error {
0 commit comments