From a7a3ec64e867cee366f8ede3b33ff7e8b2e459fc Mon Sep 17 00:00:00 2001 From: Nahom Amare Date: Sun, 15 Mar 2026 16:08:17 +0300 Subject: [PATCH] feat: notify 0box after file operation --- .../0chain.net/blobbercore/config/config.go | 9 ++++ .../0chain.net/blobbercore/handler/handler.go | 4 ++ .../blobbercore/handler/handler_common.go | 1 + .../blobbercore/handler/sync_notify.go | 47 +++++++++++++++++++ 4 files changed, 61 insertions(+) create mode 100644 code/go/0chain.net/blobbercore/handler/sync_notify.go diff --git a/code/go/0chain.net/blobbercore/config/config.go b/code/go/0chain.net/blobbercore/config/config.go index 9420b2276..91472c373 100644 --- a/code/go/0chain.net/blobbercore/config/config.go +++ b/code/go/0chain.net/blobbercore/config/config.go @@ -59,6 +59,9 @@ func SetupDefaultConfig() { viper.SetDefault("kv.pebble_cache", 4*1024*1024*1024) viper.SetDefault("kv.pebble_memtable_size", 256*1024*1024) viper.SetDefault("kv.pebble_max_open_files", 10000) + + viper.SetDefault("0box.sync_notify_url", "") + viper.SetDefault("0box.sync_notify_enabled", false) } /*SetupConfig - setup the configuration system */ @@ -171,6 +174,9 @@ type Config struct { PebbleCache int64 PebbleMemtableSize int64 PebbleMaxOpenFiles int + + SyncNotifyURL string + SyncNotifyEnabled bool } /*Configuration of the system */ @@ -321,6 +327,9 @@ func ReadConfig(deploymentMode int) { Configuration.PebbleCache = viper.GetInt64("kv.pebble_cache") Configuration.PebbleMemtableSize = viper.GetInt64("kv.pebble_memtable_size") Configuration.PebbleMaxOpenFiles = viper.GetInt("kv.pebble_max_open_files") + + Configuration.SyncNotifyURL = viper.GetString("0box.sync_notify_url") + Configuration.SyncNotifyEnabled = viper.GetBool("0box.sync_notify_enabled") } // StorageSCConfiguration will include all the required sc configs to operate blobber diff --git a/code/go/0chain.net/blobbercore/handler/handler.go b/code/go/0chain.net/blobbercore/handler/handler.go index ccd9afb73..ace660ee0 100644 --- a/code/go/0chain.net/blobbercore/handler/handler.go +++ b/code/go/0chain.net/blobbercore/handler/handler.go @@ -1779,6 +1779,8 @@ func RevokeShare(ctx context.Context, r *http.Request) (interface{}, error) { return nil, err } + notifySyncEvent(allocationID, clientID, "share_updated") + resp := map[string]interface{}{ "status": http.StatusNoContent, "message": "Path successfully removed from allocation", @@ -1922,6 +1924,8 @@ func InsertShare(ctx context.Context, r *http.Request) (interface{}, error) { return nil, common.NewError("share_info_insert", "Unable to save share info") } + notifySyncEvent(allocationID, clientID, "share_updated") + return map[string]interface{}{"message": "Share info added successfully"}, nil } diff --git a/code/go/0chain.net/blobbercore/handler/handler_common.go b/code/go/0chain.net/blobbercore/handler/handler_common.go index 333d6d5c6..5854d1324 100644 --- a/code/go/0chain.net/blobbercore/handler/handler_common.go +++ b/code/go/0chain.net/blobbercore/handler/handler_common.go @@ -191,6 +191,7 @@ func WithStatusConnectionForWM(handler common.StatusCodeResponderF) common.Statu // Save the write marker data if blobberRes.WriteMarker != nil { writemarker.SaveMarkerData(allocationID, blobberRes.WriteMarker.WM.Timestamp, blobberRes.WriteMarker.WM.ChainLength) + notifySyncEvent(allocationID, blobberRes.WriteMarker.WM.ClientID, "file_updated") trie := blobberRes.Trie if trie != nil && blobberRes.WriteMarker.WM.AllocationRoot != blobberRes.WriteMarker.WM.PreviousAllocationRoot { _ = trie.DeleteNodes() diff --git a/code/go/0chain.net/blobbercore/handler/sync_notify.go b/code/go/0chain.net/blobbercore/handler/sync_notify.go new file mode 100644 index 000000000..a6b62e656 --- /dev/null +++ b/code/go/0chain.net/blobbercore/handler/sync_notify.go @@ -0,0 +1,47 @@ +package handler + +import ( + "encoding/json" + + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" + . "github.com/0chain/blobber/code/go/0chain.net/core/logging" + "github.com/0chain/blobber/code/go/0chain.net/core/util" + "go.uber.org/zap" +) + +func notifySyncEvent(allocationID, clientID, eventType string) { + if !config.Configuration.SyncNotifyEnabled || config.Configuration.SyncNotifyURL == "" { + return + } + + Logger.Info("sync_notify_queued", + zap.String("allocation_id", allocationID), + zap.String("client_id", clientID), + zap.String("event_type", eventType), + zap.String("notify_url", config.Configuration.SyncNotifyURL)) + + payload, err := json.Marshal(map[string]string{ + "allocation_id": allocationID, + "client_id": clientID, + "event_type": eventType, + }) + if err != nil { + Logger.Error("sync_notify_marshal_failed", zap.Error(err)) + return + } + + go func() { + resp, err := util.SendPostRequest(config.Configuration.SyncNotifyURL, payload, nil) + if err != nil { + Logger.Error("sync_notify_failed", + zap.String("allocation_id", allocationID), + zap.String("event_type", eventType), + zap.Error(err)) + } else { + Logger.Info("sync_notify_sent", + zap.String("allocation_id", allocationID), + zap.String("event_type", eventType), + zap.String("response", string(resp))) + } + }() +}