Skip to content

Commit

Permalink
feat: refactors resource event
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhengYa-0110 authored and SongZhen0704 committed Feb 20, 2025
1 parent e152fef commit 483b3fe
Show file tree
Hide file tree
Showing 60 changed files with 982 additions and 1,066 deletions.
9 changes: 8 additions & 1 deletion server/controller/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/deepflowio/deepflow/server/controller/native_field"
"github.com/deepflowio/deepflow/server/controller/prometheus"
"github.com/deepflowio/deepflow/server/controller/recorder"
"github.com/deepflowio/deepflow/server/controller/recorder/event"
"github.com/deepflowio/deepflow/server/controller/report"
"github.com/deepflowio/deepflow/server/controller/statsd"
"github.com/deepflowio/deepflow/server/controller/tagrecorder"
Expand Down Expand Up @@ -140,7 +141,13 @@ func Start(ctx context.Context, configPath, serverLogFile string, shared *server
router.SetInitStageForHealthChecker("Manager init")
// Start the resource manager.
// One cloud and one recorder are started for each cloud platform.
m := manager.NewManager(cfg.ManagerCfg, shared.ResourceEventQueue)
err := event.GetSubscriberManager().Start(shared.ResourceEventQueue)
if err != nil {
log.Errorf("resource event subscriber manager start failed: %s", err.Error())
time.Sleep(time.Second)
os.Exit(0)
}
m := manager.NewManager(cfg.ManagerCfg)
m.Start()

router.SetInitStageForHealthChecker("Trisolaris init")
Expand Down
21 changes: 9 additions & 12 deletions server/controller/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,20 @@ import (
"github.com/deepflowio/deepflow/server/controller/recorder"
recordercfg "github.com/deepflowio/deepflow/server/controller/recorder/config"
"github.com/deepflowio/deepflow/server/libs/logger"
"github.com/deepflowio/deepflow/server/libs/queue"
)

var log = logger.MustGetLogger("manager")

type Manager struct {
cfg config.ManagerConfig
taskMap map[string]*Task
mutex sync.RWMutex
resourceEventQueue *queue.OverwriteQueue
cfg config.ManagerConfig
taskMap map[string]*Task
mutex sync.RWMutex
}

func NewManager(cfg config.ManagerConfig, resourceEventQueue *queue.OverwriteQueue) *Manager {
func NewManager(cfg config.ManagerConfig) *Manager {
return &Manager{
cfg: cfg,
taskMap: make(map[string]*Task),
resourceEventQueue: resourceEventQueue,
cfg: cfg,
taskMap: make(map[string]*Task),
}
}

Expand Down Expand Up @@ -240,7 +237,7 @@ func (m *Manager) run(ctx context.Context) {
for _, lcuuid := range addDomainLcuuids.ToSlice() {
addedLcuuid := lcuuid.(string)
domain := lcuuidToDomain[addedLcuuid]
task := NewTask(orgID, domain, m.cfg.TaskCfg, ctx, m.resourceEventQueue)
task := NewTask(orgID, domain, m.cfg.TaskCfg, ctx)
if task == nil || task.Cloud == nil {
log.Errorf("domain (%s) init failed", domain.Name, logger.NewORGPrefix(orgID))
continue
Expand All @@ -263,7 +260,7 @@ func (m *Manager) run(ctx context.Context) {
log.Infof("domain (%s) oldDomainConfig: %s", newDomain.Name, oldDomainConfig, logger.NewORGPrefix(orgID))
log.Infof("domain (%s) newDomainConfig: %s", newDomain.Name, newDomain.Config, logger.NewORGPrefix(orgID))
m.taskMap[domainLcuuid].Stop()
task := NewTask(orgID, newDomain, m.cfg.TaskCfg, ctx, m.resourceEventQueue)
task := NewTask(orgID, newDomain, m.cfg.TaskCfg, ctx)
if task == nil || task.Cloud == nil {
log.Errorf("domain (%s) init failed", newDomain.Name, logger.NewORGPrefix(orgID))
continue
Expand All @@ -279,7 +276,7 @@ func (m *Manager) run(ctx context.Context) {
if oldDomainName != newDomain.Name {
if m.taskMap[domainLcuuid].Cloud.GetBasicInfo().Type == common.KUBERNETES {
m.taskMap[domainLcuuid].Stop()
task := NewTask(orgID, newDomain, m.cfg.TaskCfg, ctx, m.resourceEventQueue)
task := NewTask(orgID, newDomain, m.cfg.TaskCfg, ctx)
if task == nil || task.Cloud == nil {
log.Errorf("domain (%s) init failed", newDomain.Name, logger.NewORGPrefix(orgID))
continue
Expand Down
4 changes: 2 additions & 2 deletions server/controller/manager/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Task struct {
subDomainRefreshSignals cmap.ConcurrentMap[string, *queue.OverwriteQueue] // key: subDomainLcuuid
}

func NewTask(orgID int, domain metadbmodel.Domain, cfg config.TaskConfig, ctx context.Context, resourceEventQueue *queue.OverwriteQueue) *Task {
func NewTask(orgID int, domain metadbmodel.Domain, cfg config.TaskConfig, ctx context.Context) *Task {
tCtx, tCancel := context.WithCancel(ctx)
t := &Task{
tCtx: tCtx,
Expand All @@ -68,7 +68,7 @@ func NewTask(orgID int, domain metadbmodel.Domain, cfg config.TaskConfig, ctx co
log.Errorf("domain: %s %s, failed to create cloud task", domain.Name, domain.Lcuuid, t.LogPrefixORGID)
return nil
}
rcd := recorder.NewRecorder(tCtx, cfg.RecorderCfg, resourceEventQueue, orgID, domain.Lcuuid)
rcd := recorder.NewRecorder(tCtx, cfg.RecorderCfg, orgID, domain.Lcuuid)
if rcd == nil {
log.Errorf("domain: %s %s, failed to create recorder", domain.Name, domain.Lcuuid, t.LogPrefixORGID)
return nil
Expand Down
2 changes: 1 addition & 1 deletion server/controller/recorder/cache/diffbase/diff_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type DataSet struct {

LogController

Regions map[string]*Region
Regions map[string]*Region // TODO change to private
AZs map[string]*AZ
SubDomains map[string]*SubDomain
Hosts map[string]*Host
Expand Down
12 changes: 12 additions & 0 deletions server/controller/recorder/common/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (m *Metadata) Copy() *Metadata {
}
}

// GetDB returns the DB field of m, the *metadb.DB this Metadata carries.
func (m *Metadata) GetDB() *metadb.DB {
return m.DB
}

// GetORGID returns the ORGID field of m.
func (m *Metadata) GetORGID() int {
return m.ORGID
}
Expand All @@ -67,6 +71,14 @@ func (m *Metadata) GetTeamID() int {
}
}

// GetDomainInfo returns the Domain field of m without copying it.
// NOTE(review): SetDomain assigns this field; presumably it is nil until
// SetDomain has been called — confirm before dereferencing the result.
func (m *Metadata) GetDomainInfo() *DomainInfo {
return m.Domain
}

// GetSubDomainInfo returns the SubDomain field of m without copying it.
// NOTE(review): presumably nil when no sub-domain is associated with this
// Metadata — confirm against the code that assigns SubDomain.
func (m *Metadata) GetSubDomainInfo() *SubDomainInfo {
return m.SubDomain
}

func (m *Metadata) SetDomain(domain metadbmodel.Domain) {
m.Domain = &DomainInfo{domain}
m.LogPrefixes = append(m.LogPrefixes, logger.NewTeamPrefix(domain.TeamID))
Expand Down
51 changes: 30 additions & 21 deletions server/controller/recorder/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (
rcommon "github.com/deepflowio/deepflow/server/controller/recorder/common"
"github.com/deepflowio/deepflow/server/controller/recorder/config"
"github.com/deepflowio/deepflow/server/controller/recorder/listener"
"github.com/deepflowio/deepflow/server/controller/recorder/pubsub"
"github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message"
"github.com/deepflowio/deepflow/server/controller/recorder/statsd"
"github.com/deepflowio/deepflow/server/controller/recorder/updater"
"github.com/deepflowio/deepflow/server/controller/trisolaris/refresh"
"github.com/deepflowio/deepflow/server/libs/queue"
)

const (
Expand All @@ -48,20 +49,29 @@ type domain struct {
metadata *rcommon.Metadata
statsd *statsd.DomainStatsd

eventQueue *queue.OverwriteQueue
cache *cache.Cache
subDomains *subDomains

pubsub pubsub.AnyChangePubSub
msgMetadata *message.Metadata
}

func newDomain(ctx context.Context, cfg config.RecorderConfig, eventQueue *queue.OverwriteQueue, md *rcommon.Metadata) *domain {
func newDomain(ctx context.Context, cfg config.RecorderConfig, md *rcommon.Metadata) *domain {
cacheMng := cache.NewCacheManager(ctx, cfg, md)
return &domain{
metadata: md,
statsd: statsd.NewDomainStatsd(md),

eventQueue: eventQueue,
cache: cacheMng.DomainCache,
subDomains: newSubDomains(ctx, cfg, eventQueue, md, cacheMng),
subDomains: newSubDomains(ctx, cfg, md, cacheMng),

pubsub: pubsub.GetPubSub(pubsub.PubSubTypeWholeDomain).(pubsub.AnyChangePubSub),
msgMetadata: message.NewMetadata(
md.GetORGID(),
message.MetadataDomainLcuuid(md.GetDomainInfo().Lcuuid),
message.MetadataToolDataSet(cacheMng.DomainCache.ToolDataSet),
message.MetadataDB(md.GetDB()),
),
}
}

Expand Down Expand Up @@ -141,11 +151,10 @@ func (d *domain) refresh(cloudData cloudmodel.Resource) {

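// Specify the order in which resources are created and updated.
// Basic principle: resources without dependencies go first; resources with high real-time requirements go first.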
listener := listener.NewWholeDomain(d.metadata.Domain.Lcuuid, d.cache, d.eventQueue)
domainUpdatersInUpdateOrder := d.getUpdatersInOrder(cloudData)
d.executeUpdaters(domainUpdatersInUpdateOrder)
d.notifyOnResourceChanged(domainUpdatersInUpdateOrder)
listener.OnUpdatersCompleted()
d.pubsub.PublishChange(d.msgMetadata)

d.updateSyncedAt(cloudData.SyncAt)

Expand All @@ -154,8 +163,8 @@ func (d *domain) refresh(cloudData cloudmodel.Resource) {

func (d *domain) getUpdatersInOrder(cloudData cloudmodel.Resource) []updater.ResourceUpdater {
ip := updater.NewIP(d.cache, cloudData.IPs, nil)
ip.GetLANIP().RegisterListener(listener.NewLANIP(d.cache, d.eventQueue))
ip.GetWANIP().RegisterListener(listener.NewWANIP(d.cache, d.eventQueue))
ip.GetLANIP().RegisterListener(listener.NewLANIP(d.cache))
ip.GetWANIP().RegisterListener(listener.NewWANIP(d.cache))

return []updater.ResourceUpdater{
updater.NewRegion(d.cache, cloudData.Regions).RegisterListener(
Expand All @@ -167,21 +176,21 @@ func (d *domain) getUpdatersInOrder(cloudData cloudmodel.Resource) []updater.Res
updater.NewVPC(d.cache, cloudData.VPCs).RegisterListener(
listener.NewVPC(d.cache)),
updater.NewHost(d.cache, cloudData.Hosts).RegisterListener(
listener.NewHost(d.cache, d.eventQueue)),
listener.NewHost(d.cache)),
updater.NewVM(d.cache, cloudData.VMs).RegisterListener(
listener.NewVM(d.cache, d.eventQueue)).BuildStatsd(d.statsd),
listener.NewVM(d.cache)).BuildStatsd(d.statsd),
updater.NewPodCluster(d.cache, cloudData.PodClusters).RegisterListener(
listener.NewPodCluster(d.cache)),
updater.NewPodNode(d.cache, cloudData.PodNodes).RegisterListener(
listener.NewPodNode(d.cache, d.eventQueue)),
listener.NewPodNode(d.cache)),
updater.NewPodNamespace(d.cache, cloudData.PodNamespaces).RegisterListener(
listener.NewPodNamespace(d.cache)),
updater.NewPodIngress(d.cache, cloudData.PodIngresses).RegisterListener(
listener.NewPodIngress(d.cache)),
updater.NewPodIngressRule(d.cache, cloudData.PodIngressRules).RegisterListener(
listener.NewPodIngressRule(d.cache)),
updater.NewPodService(d.cache, cloudData.PodServices).RegisterListener(
listener.NewPodService(d.cache, d.eventQueue)),
listener.NewPodService(d.cache)),
updater.NewPodIngressRuleBackend(d.cache, cloudData.PodIngressRuleBackends).RegisterListener(
listener.NewPodIngressRuleBackend(d.cache)),
updater.NewPodServicePort(d.cache, cloudData.PodServicePorts).RegisterListener(
Expand All @@ -193,35 +202,35 @@ func (d *domain) getUpdatersInOrder(cloudData cloudmodel.Resource) []updater.Res
updater.NewPodReplicaSet(d.cache, cloudData.PodReplicaSets).RegisterListener(
listener.NewPodReplicaSet(d.cache)),
updater.NewPod(d.cache, cloudData.Pods).RegisterListener(
listener.NewPod(d.cache, d.eventQueue)).BuildStatsd(d.statsd),
listener.NewPod(d.cache)).BuildStatsd(d.statsd),
updater.NewNetwork(d.cache, cloudData.Networks).RegisterListener(
listener.NewNetwork(d.cache)),
updater.NewSubnet(d.cache, cloudData.Subnets).RegisterListener(
listener.NewSubnet(d.cache)),
updater.NewVRouter(d.cache, cloudData.VRouters).RegisterListener(
listener.NewVRouter(d.cache, d.eventQueue)),
listener.NewVRouter(d.cache)),
updater.NewRoutingTable(d.cache, cloudData.RoutingTables).RegisterListener(
listener.NewRoutingTable(d.cache)),
updater.NewDHCPPort(d.cache, cloudData.DHCPPorts).RegisterListener(
listener.NewDHCPPort(d.cache, d.eventQueue)),
listener.NewDHCPPort(d.cache)),
updater.NewNATGateway(d.cache, cloudData.NATGateways).RegisterListener(
listener.NewNATGateway(d.cache, d.eventQueue)),
listener.NewNATGateway(d.cache)),
updater.NewNATVMConnection(d.cache, cloudData.NATVMConnections).RegisterListener(
listener.NewNATVMConnection(d.cache)),
updater.NewNATRule(d.cache, cloudData.NATRules).RegisterListener(
listener.NewNATRule(d.cache)),
updater.NewLB(d.cache, cloudData.LBs).RegisterListener(
listener.NewLB(d.cache, d.eventQueue)),
listener.NewLB(d.cache)),
updater.NewLBVMConnection(d.cache, cloudData.LBVMConnections).RegisterListener(
listener.NewLBVMConnection(d.cache)),
updater.NewLBListener(d.cache, cloudData.LBListeners).RegisterListener(
listener.NewLBListener(d.cache)),
updater.NewLBTargetServer(d.cache, cloudData.LBTargetServers).RegisterListener(
listener.NewLBTargetServer(d.cache)),
updater.NewRDSInstance(d.cache, cloudData.RDSInstances).RegisterListener(
listener.NewRDSInstance(d.cache, d.eventQueue)),
listener.NewRDSInstance(d.cache)),
updater.NewRedisInstance(d.cache, cloudData.RedisInstances).RegisterListener(
listener.NewRedisInstance(d.cache, d.eventQueue)),
listener.NewRedisInstance(d.cache)),
updater.NewPeerConnection(d.cache, cloudData.PeerConnections).RegisterListener(
listener.NewPeerConnection(d.cache)),
updater.NewCEN(d.cache, cloudData.CENs).RegisterListener(
Expand All @@ -236,7 +245,7 @@ func (d *domain) getUpdatersInOrder(cloudData cloudmodel.Resource) []updater.Res
updater.NewVMPodNodeConnection(d.cache, cloudData.VMPodNodeConnections).RegisterListener( // VMPodNodeConnection需放在最后
listener.NewVMPodNodeConnection(d.cache)),
updater.NewProcess(d.cache, cloudData.Processes).RegisterListener(
listener.NewProcess(d.cache, d.eventQueue)),
listener.NewProcess(d.cache)),
}
}

Expand Down
Loading

0 comments on commit 483b3fe

Please sign in to comment.