Skip to content

Commit

Permalink
✨ Added pub/sub model for updates using go channels
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Erwin committed Sep 18, 2024
1 parent 79b0dab commit 51aedd8
Showing 1 changed file with 106 additions and 6 deletions.
112 changes: 106 additions & 6 deletions new-backend/internal/ingestor/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package ingestor
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"math/big"
"os"
"strconv"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -24,18 +26,70 @@ const (
imageSize = 512
)

// Event types
const (
EventTypeImageRender = "image_render"
EventTypeDiscordNotification = "discord_notification"
// Add more event types as needed
)

type Event struct {
Type string
Payload json.RawMessage
}

type PubSub struct {
subscribers map[string][]chan Event
mu sync.RWMutex
}

func NewPubSub() *PubSub {
return &PubSub{
subscribers: make(map[string][]chan Event),
}
}

func (ps *PubSub) Subscribe(eventType string) <-chan Event {
ps.mu.Lock()
defer ps.mu.Unlock()
ch := make(chan Event, 100)
ps.subscribers[eventType] = append(ps.subscribers[eventType], ch)
return ch
}

func (ps *PubSub) Publish(event Event) {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, ch := range ps.subscribers[event.Type] {
select {
case ch <- event:
default:
// Channel full, event dropped
}
}
}

type Ingestor struct {
logger *zap.Logger
queries *db.Queries
etherscanClient *EtherscanClient
pubSub *PubSub
}

func NewIngestor(logger *zap.Logger, sqlDB *sql.DB, apiKey string) *Ingestor {
return &Ingestor{
pubSub := NewPubSub()
ingestor := &Ingestor{
logger: logger,
queries: db.New(sqlDB), // This should now work correctly
queries: db.New(sqlDB),
etherscanClient: NewEtherscanClient(apiKey, logger),
pubSub: pubSub,
}

// Start subscribers
go ingestor.imageRenderSubscriber(pubSub.Subscribe(EventTypeImageRender))
go ingestor.discordNotificationSubscriber(pubSub.Subscribe(EventTypeDiscordNotification))

return ingestor
}

func (i *Ingestor) IngestTransactions(ctx context.Context) error {
Expand Down Expand Up @@ -231,10 +285,6 @@ func (i *Ingestor) processTransaction(ctx context.Context, tx *EtherscanTransact
fmt.Printf("setTile called with location: %s, image: %s, url: %s, price: %s ETH\n",
location.String(), image, url, priceEthStr)

// Render and save the image
if err := i.renderAndSaveImage(location, image, blockNumber.Int64()); err != nil {
return fmt.Errorf("failed to render and save image: %w", err)
}
// Add to dataHistory
dataHistory := db.InsertDataHistoryParams{
TileID: int32(location.Int64()),
Expand All @@ -251,6 +301,21 @@ func (i *Ingestor) processTransaction(ctx context.Context, tx *EtherscanTransact
if _, err := i.queries.InsertDataHistory(ctx, dataHistory); err != nil {
return fmt.Errorf("failed to insert data history: %w", err)
}

// Publish event for image rendering
imageRenderPayload, _ := json.Marshal(map[string]interface{}{
"location": location.String(),
"imageData": image,
"blockNumber": blockNumber,
})
i.pubSub.Publish(Event{Type: EventTypeImageRender, Payload: imageRenderPayload})

// Publish event for Discord notification
discordPayload, _ := json.Marshal(map[string]interface{}{
"message": fmt.Sprintf("Tile %s updated by %s", location.String(), tx.From),
"url": url,
})
i.pubSub.Publish(Event{Type: EventTypeDiscordNotification, Payload: discordPayload})
}

// Add other cases as needed
Expand Down Expand Up @@ -288,6 +353,41 @@ func (i *Ingestor) updateLastProcessedBlock(ctx context.Context, blockNumber int
return nil
}

func (i *Ingestor) imageRenderSubscriber(ch <-chan Event) {
for event := range ch {
var payload struct {
Location string `json:"location"`
ImageData string `json:"imageData"`
BlockNumber int64 `json:"blockNumber"`
}
json.Unmarshal(event.Payload, &payload)

location, _ := new(big.Int).SetString(payload.Location, 10)
if err := i.renderAndSaveImage(location, payload.ImageData, payload.BlockNumber); err != nil {
i.logger.Error("Failed to render and save image",
zap.Error(err),
zap.String("location", payload.Location),
zap.Int64("blockNumber", payload.BlockNumber))
}
}
}

func (i *Ingestor) discordNotificationSubscriber(ch <-chan Event) {
for event := range ch {
var payload struct {
Message string `json:"message"`
URL string `json:"url"`
}
json.Unmarshal(event.Payload, &payload)

// Implement Discord notification logic here
i.logger.Info("Discord notification",
zap.String("message", payload.Message),
zap.String("url", payload.URL))
// You would typically call a Discord API client here
}
}

func (i *Ingestor) renderAndSaveImage(location *big.Int, imageData string, blockNumber int64) error {
// Create the directory if it doesn't exist
dirPath := fmt.Sprintf("cache/%s", location.String())
Expand Down

0 comments on commit 51aedd8

Please sign in to comment.