diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 5b85062..ddba6b0 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -30,16 +30,13 @@ jobs: needs: - build steps: - - name: Checkout code - uses: actions/checkout@v3 - - name: Set up Go uses: actions/setup-go@v3 with: go-version: 1.19 - - name: Install dependencies - run: make deps + - name: Checkout code + uses: actions/checkout@v3 - name: Run linter run: make lint diff --git a/cmd/main.go b/cmd/main.go index 2c95157..96497aa 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,6 +13,7 @@ import ( "log" "os/signal" "syscall" + "time" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/cors" @@ -56,56 +57,39 @@ func main() { } zapLogger.Info("DB connected successfully") - //err = migration.RunMigrations(db, cfg.MongoDb.MongoDbName, zapLogger) - //if err != nil { - // zapLogger.Fatalf("failed to run migration: %s", err) - //} - // Firebase firebaseClient, err := firebase.NewClient(cfg.Firebase.CredPath) if err != nil { zapLogger.Fatalf("failed to create firebase messaging client - %v", err) } - zapLogger.Info("Firebase client created successfully") - cloudMessagingClient, err := firebaseClient.CreateCloudMessagingClient() if err != nil { zapLogger.Fatalf("failed to create firebase cloud messaging client - %v", err) } - zapLogger.Info("Firebase cloud messaging client created successfully") - // Firebase message cloudMessagingService, err := cloudmessaging.NewCloudMessagingService(cloudMessagingClient, cfg.Firebase.AndroidChannelName) if err != nil { zapLogger.Fatalf("failed to create firebase message service - %v", err) } - zapLogger.Info("Firebase message service created successfully") - // Repository watcherRepository, err := watcher.NewRepository(db, cfg.MongoDb.MongoDbName, zapLogger) if err != nil { zapLogger.Fatalf("failed to create watcher repository - %v", err) } - zapLogger.Info("Watcher repository created successfully") - // Services watcherService, err := watcher.NewService(watcherRepository, cloudMessagingService, zapLogger, cfg.ExplorerApi, cfg.TokenPriceUrl, cfg.CallbackUrl, cfg.ExplorerToken) if err != nil { zapLogger.Fatalf("failed to create watcher service - %v", err) } - zapLogger.Info("Watcher service created successfully") - if err := watcherService.Init(context.Background()); err != nil { zapLogger.Fatalf("failed to init watchers - %v", err) } - zapLogger.Info("Watcher service initialized successfully") - // Handlers healthHandler := health.NewHandler() @@ -114,33 +98,27 @@ func main() { zapLogger.Fatalf("failed to create watcher handler - %v", err) } - zapLogger.Info("Watcher handler created successfully") - // Create config variable config := fiber.Config{ ServerHeader: "AIRDAO-Mobile-Api", // add custom server header } // Run DeleteWatchersWithStaleData on start for check and delete stale data - //if err := watcherService.DeleteWatchersWithStaleData(context.Background()); err != nil { - // zapLogger.Errorf("failed to delete watchers with stale data - %v", err) - //} - // - //zapLogger.Info("Deleted watchers with stale data successfully") - // - //// Run DeleteWatchersWithStaleData every 24 hours for check and delete stale data - //go func() { - // for { - // err := watcherService.DeleteWatchersWithStaleData(context.Background()) - // if err != nil { - // zapLogger.Errorf("failed to delete watchers with stale data - %v", err) - // } - // - // time.Sleep(24 * time.Hour) - // } - //}() - - zapLogger.Info("Deleted watchers with stale data every 24 hours successfully") + if err := watcherService.DeleteWatchersWithStaleData(context.Background()); err != nil { + zapLogger.Errorf("failed to delete watchers with stale data - %v", err) + } + + // Run DeleteWatchersWithStaleData every 24 hours for check and delete stale data + go func() { + for { + err := watcherService.DeleteWatchersWithStaleData(context.Background()) + if err != nil { + zapLogger.Errorf("failed to delete watchers with stale data - %v", err) + } + + time.Sleep(24 * time.Hour) + } + }() // Create fiber app app := fiber.New(config) @@ -169,7 +147,6 @@ func main() { if err := app.Listen(":" + cfg.Port); err != nil { zapLogger.Fatal(err) } - zapLogger.Info("Server started on port %v", cfg.Port) }() zapLogger.Infoln("Server started on port %v", cfg.Port) diff --git a/services/migration/migration.go b/services/migration/migration.go deleted file mode 100644 index 55a07f7..0000000 --- a/services/migration/migration.go +++ /dev/null @@ -1,111 +0,0 @@ -package migration - -import ( - "airdao-mobile-api/services/watcher" - "errors" - - "go.mongodb.org/mongo-driver/bson/primitive" - "go.uber.org/zap" -) - -import ( - "context" - - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" -) - -type HistoryNotificationDocument struct { - WatcherID primitive.ObjectID `json:"watcher_id" bson:"watcher_id"` - Notification *watcher.HistoryNotification `json:"notification" bson:"notification"` -} - -type Migration struct { - ID primitive.ObjectID `json:"id" bson:"_id"` - Name string `json:"name" bson:"name"` - IsApplied bool `json:"is_applied" bson:"is_applied"` -} - -func RunMigrations(db *mongo.Client, dbName string, logger *zap.SugaredLogger) error { - err := historicalNotificationMigration(db, dbName, logger) - // Add more migrations here - return err -} - -func historicalNotificationMigration(db *mongo.Client, dbName string, logger *zap.SugaredLogger) error { - if db == nil { - return errors.New("[watcher_repository] invalid user database") - } - name := "historical_notification_migration" - - migrationsCollection := db.Database(dbName).Collection("migrations") - watcherCollection := db.Database(dbName).Collection(watcher.CollectionName) - historyCollection := db.Database(dbName).Collection(watcher.HistoricalNotificationsCollectionName) - - // Check if migration has already been run - var migration Migration - if err := migrationsCollection.FindOne(context.Background(), bson.M{"name": name}).Decode(&migration); err == nil { - logger.Info("Migration already applied.") - return nil - } - - cursor, err := watcherCollection.Find(context.Background(), bson.M{}) - if err != nil { - return err - } - logger.Info("Starting migration...", name) - defer cursor.Close(context.Background()) - - for cursor.Next(context.Background()) { - var watcher watcher.Watcher - if err := cursor.Decode(&watcher); err != nil { - return err - } - - // Check if historical_notifications field is nil - if watcher.HistoricalNotifications != nil { - logger.Info("Migrating historical notifications...", watcher.ID) - - // Iterate over historical notifications only if it's not nil - for _, notification := range *watcher.HistoricalNotifications { - _, err := historyCollection.InsertOne(context.Background(), &HistoryNotificationDocument{ - WatcherID: watcher.ID, - Notification: notification, - }) - if err != nil { - return err - } - } - - // Update watcher to set historical_notifications to nil - _, err := watcherCollection.UpdateOne( - context.Background(), - bson.M{"_id": watcher.ID}, - bson.M{"$set": bson.M{"historical_notifications": nil}}, - ) - if err != nil { - return err - } - } else { - // Log a message or handle the case when historical_notifications is nil - logger.Info("No historical notifications found for watcher:", watcher.ID) - } - - } - - if err := cursor.Err(); err != nil { - return err - } - - logger.Info("Migration completed successfully.") - - // Mark migration as applied - _, err = migrationsCollection.InsertOne(context.Background(), &Migration{ - Name: name, - IsApplied: true, - }) - if err != nil { - return err - } - return nil -} diff --git a/services/watcher/repository.go b/services/watcher/repository.go index e75cae9..42565b5 100644 --- a/services/watcher/repository.go +++ b/services/watcher/repository.go @@ -7,15 +7,11 @@ import ( "time" "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" ) -const HistoricalNotificationsCollectionName = "historical_notifications" -const CollectionName = "watcher" - //go:generate mockgen -source=repository.go -destination=mocks/repository_mock.go type Repository interface { GetWatcher(ctx context.Context, filters bson.M) (*Watcher, error) @@ -28,18 +24,11 @@ type Repository interface { DeleteWatchersWithStaleData(ctx context.Context) error } -type HistoryNotificationDocument struct { - ID primitive.ObjectID `json:"id" bson:"_id"` - WatcherID primitive.ObjectID `json:"watcher_id" bson:"watcher_id"` - Notification *HistoryNotification `json:"notification" bson:"notification"` -} - type repository struct { - db *mongo.Client - dbName string - watchersCollectionName string - historyNotificationCollectionName string - logger *zap.SugaredLogger + db *mongo.Client + dbName string + dbCollectionName string + logger *zap.SugaredLogger } func NewRepository(db *mongo.Client, dbName string, logger *zap.SugaredLogger) (Repository, error) { @@ -53,12 +42,13 @@ func NewRepository(db *mongo.Client, dbName string, logger *zap.SugaredLogger) ( return nil, errors.New("[watcher_repository] invalid logger") } - return &repository{db: db, dbName: dbName, watchersCollectionName: CollectionName, historyNotificationCollectionName: HistoricalNotificationsCollectionName, logger: logger}, nil + return &repository{db: db, dbName: dbName, dbCollectionName: "watcher", logger: logger}, nil } func (r *repository) GetWatcher(ctx context.Context, filters bson.M) (*Watcher, error) { var watcher Watcher - if err := r.db.Database(r.dbName).Collection(r.watchersCollectionName).FindOne(ctx, filters).Decode(&watcher); err != nil { + + if err := r.db.Database(r.dbName).Collection(r.dbCollectionName).FindOne(ctx, filters).Decode(&watcher); err != nil { if err == mongo.ErrNoDocuments { r.logger.Errorf("unable to find watcher: %v", err) // return nil, errors.New("watcher not found") @@ -69,16 +59,11 @@ func (r *repository) GetWatcher(ctx context.Context, filters bson.M) (*Watcher, return nil, err } - if err := r.attachHistory(ctx, &watcher); err != nil { - r.logger.Errorf("unable to fetch and set history notifications: %v", err) - return nil, err - } - return &watcher, nil } func (r *repository) GetAllWatchers(ctx context.Context) ([]*Watcher, error) { - collection := r.db.Database(r.dbName).Collection(r.watchersCollectionName) + collection := r.db.Database(r.dbName).Collection(r.dbCollectionName) filter := bson.D{{}} @@ -97,12 +82,6 @@ func (r *repository) GetAllWatchers(ctx context.Context) ([]*Watcher, error) { r.logger.Errorf("Unable to decode watcher document: %v", err) return nil, nil } - - if err := r.attachHistory(ctx, watcher); err != nil { - r.logger.Errorf("unable to fetch and set history notifications: %v", err) - return nil, nil - } - watchers = append(watchers, watcher) } @@ -114,44 +93,6 @@ func (r *repository) GetAllWatchers(ctx context.Context) ([]*Watcher, error) { return watchers, nil } -// Fetches and assigns historical notifications to the watcher. -func (r *repository) attachHistory(ctx context.Context, watcher *Watcher) error { - historyNotifications := make([]*HistoryNotification, 0) - - sortField := bson.E{ - Key: "notification.timestamp", - Value: -1, - } - sortCriteria := bson.D{sortField} - - cursor, err := r.db.Database(r.dbName).Collection(r.historyNotificationCollectionName).Find(ctx, bson.M{"watcher_id": watcher.ID}, options.Find().SetSort(sortCriteria)) - if err != nil { - r.logger.Errorf("unable to find history notifications due to internal error: %v", err) - return err - } - defer cursor.Close(ctx) - - for cursor.Next(ctx) { - // fetch history notification document, set notification field in HistoryNotification struct - historyNotificationDocument := new(HistoryNotificationDocument) - if err := cursor.Decode(historyNotificationDocument); err != nil { - r.logger.Errorf("Unable to decode history notification document: %v", err) - return err - } - - historyNotifications = append(historyNotifications, historyNotificationDocument.Notification) - } - - if err := cursor.Err(); err != nil { - r.logger.Errorf("cursor iteration error: %v", err) - return err - } - - watcher.HistoricalNotifications = &historyNotifications - - return nil -} - func (r *repository) DeleteWatchersWithStaleData(ctx context.Context) error { watchers, err := r.GetAllWatchers(context.Background()) if err != nil { @@ -178,7 +119,7 @@ func (r *repository) GetWatcherList(ctx context.Context, filters bson.M, page in findOptions := options.Find().SetSkip(int64(skip)).SetLimit(int64(pageSize)) - cur, err := r.db.Database(r.dbName).Collection(r.watchersCollectionName).Find(ctx, filters, findOptions) + cur, err := r.db.Database(r.dbName).Collection(r.dbCollectionName).Find(ctx, filters, findOptions) if err != nil { r.logger.Errorf("unable to find watcher due to internal error: %v", err) return nil, err @@ -192,11 +133,6 @@ func (r *repository) GetWatcherList(ctx context.Context, filters bson.M, page in r.logger.Errorf("unable to decode watcher document: %v", err) return nil, err } - - if err := r.attachHistory(ctx, watcher); err != nil { - r.logger.Errorf("unable to fetch and set history notifications: %v", err) - return nil, err - } watchers = append(watchers, watcher) } @@ -210,11 +146,7 @@ func (r *repository) GetWatcherList(ctx context.Context, filters bson.M, page in } func (r *repository) CreateWatcher(ctx context.Context, watcher *Watcher) error { - // don't write historical notifications to watcher collection, set it to history notifications collection - historicalNotifications := watcher.HistoricalNotifications - watcher.HistoricalNotifications = nil - - _, err := r.db.Database(r.dbName).Collection(r.watchersCollectionName).InsertOne(ctx, watcher) + _, err := r.db.Database(r.dbName).Collection(r.dbCollectionName).InsertOne(ctx, watcher) if err != nil { if mongo.IsDuplicateKeyError(err) { r.logger.Errorf("failed to insert watcher to db due duplicate error: %s", err) @@ -225,28 +157,11 @@ func (r *repository) CreateWatcher(ctx context.Context, watcher *Watcher) error return errors.New("failed to create watcher") } - // add new history notifications to history notifications collection - for _, historyNotification := range *historicalNotifications { - historyNotificationDocument := &HistoryNotificationDocument{ - ID: primitive.NewObjectID(), - WatcherID: watcher.ID, - Notification: historyNotification, - } - _, err := r.db.Database(r.dbName).Collection(r.historyNotificationCollectionName).InsertOne(ctx, historyNotificationDocument) - if err != nil { - r.logger.Errorf("failed to insert history notification to db: %s", err) - return errors.New("failed to create history notification") - } - } - return nil } func (r *repository) UpdateWatcher(ctx context.Context, watcher *Watcher) error { - historicalNotifications := watcher.HistoricalNotifications - watcher.HistoricalNotifications = nil - - _, err := r.db.Database(r.dbName).Collection(r.watchersCollectionName).UpdateOne(ctx, + _, err := r.db.Database(r.dbName).Collection(r.dbCollectionName).UpdateOne(ctx, bson.M{"_id": watcher.ID}, bson.M{"$set": watcher}) @@ -260,55 +175,14 @@ func (r *repository) UpdateWatcher(ctx context.Context, watcher *Watcher) error return errors.New("failed to update watcher") } - // delete all history notifications for this watcher and add historicalNotifications to history notifications collection - _, err = r.db.Database(r.dbName).Collection(r.historyNotificationCollectionName).DeleteMany(ctx, bson.M{"watcher_id": watcher.ID}) - if err != nil { - r.logger.Errorf("failed to delete history notifications: %v", err) - return err - } - - // add updated history notifications to history notifications collection - for _, historyNotification := range *historicalNotifications { - historyNotificationDocument := &HistoryNotificationDocument{ - ID: primitive.NewObjectID(), - WatcherID: watcher.ID, - Notification: historyNotification, - } - _, err := r.db.Database(r.dbName).Collection(r.historyNotificationCollectionName).InsertOne(ctx, historyNotificationDocument) - if err != nil { - r.logger.Errorf("failed to insert history notification to db: %s", err) - return errors.New("failed to create history notification") - } - } - return nil } -// DeleteWatcher deletes the watcher and its associated history notifications from the database -// based on the provided filters. It retrieves the watcher using the filters and then uses its ID -// to delete both the watcher and its associated history notifications. func (r *repository) DeleteWatcher(ctx context.Context, filters bson.M) error { - watcher, err := r.GetWatcher(ctx, filters) + _, err := r.db.Database(r.dbName).Collection(r.dbCollectionName).DeleteOne(ctx, filters) if err != nil { - r.logger.Errorf("unable to get watcher: %v", err) - return err - } - - if watcher == nil { - r.logger.Errorf("watcher not found") - return errors.New("watcher not found") - } - - _, err = r.db.Database(r.dbName).Collection(r.watchersCollectionName).DeleteOne(ctx, filters) - if err != nil { - r.logger.Errorf("failed to delete watcher: %v", err) - return err - } - - _, err = r.db.Database(r.dbName).Collection(r.historyNotificationCollectionName).DeleteMany(ctx, bson.M{"watcher_id": watcher.ID}) - if err != nil { - r.logger.Errorf("failed to delete history notifications: %v", err) - return err + r.logger.Errorf("failed to delete watcher: %s", err) + return errors.New("failed to delete watcher") } return nil