Skip to content

Commit

Permalink
feat: update domain daily trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
askyrie authored and SongZhen0704 committed Aug 20, 2024
1 parent ca0bc9f commit 5a6ffa7
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 30 deletions.
27 changes: 26 additions & 1 deletion cli/ctl/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func RegisterCloudCommand() *cobra.Command {
Use: "cloud",
Short: "debug cloud data commands",
Run: func(cmd *cobra.Command, args []string) {
fmt.Printf("please run with 'info | task'.\n")
fmt.Printf("please run with 'info | task | trigger'.\n")
},
}

Expand Down Expand Up @@ -68,6 +68,16 @@ func RegisterCloudCommand() *cobra.Command {
}
cloud.AddCommand(task)

trigger := &cobra.Command{
Use: "trigger domain-lcuuid",
Short: "trigger domain",
Example: "deepflow-ctl cloud trigger domain-lcuuid",
Run: func(cmd *cobra.Command, args []string) {
triggerDomain(cmd, args)
},
}
cloud.AddCommand(trigger)

return cloud
}

Expand Down Expand Up @@ -145,3 +155,18 @@ func getTask(cmd *cobra.Command, args []string) {
}
common.PrettyPrint(resp.Get("DATA"))
}

func triggerDomain(cmd *cobra.Command, args []string) {
if len(args) == 0 {
fmt.Fprintln(os.Stderr, "must specify domain-lcuuid.")
return
}
server := common.GetServerInfo(cmd)
url := fmt.Sprintf("http://%s:%d/v1/trigger-domain/%s/", server.IP, server.Port, args[0])
resp, err := common.CURLResponseRawJson("GET", url, []common.HTTPOption{common.WithTimeout(common.GetTimeout(cmd))}...)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
common.PrettyPrint(resp)
}
61 changes: 54 additions & 7 deletions server/controller/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ var log = logging.MustGetLogger("cloud")

type Cloud struct {
orgID int
synchronizing bool
db *mysql.DB
cfg config.CloudConfig
cCtx context.Context
cCancel context.CancelFunc
mutex sync.RWMutex
triggerTime time.Time
basicInfo model.BasicInfo
resource model.Resource
platform platform.Platform
Expand All @@ -74,12 +76,22 @@ func NewCloud(orgID int, domain mysql.Domain, cfg config.CloudConfig, ctx contex
return nil
}

// maybe all types will be supported later
var triggerTime time.Time
if domain.Type == common.QINGCLOUD || domain.Type == common.QINGCLOUD_PRIVATE {
triggerTime, err = time.ParseInLocation("15:04", cfg.QingCloudConfig.DailyTriggerTime, time.Local)
if err != nil {
log.Errorf("parse qing config daily trigger time failed: (%s)", err.Error())
}
}

log.Infof("org (%d) cloud task (%s) init success", orgID, domain.Name)

cCtx, cCancel := context.WithCancel(ctx)
return &Cloud{
orgID: orgID,
db: mysqlDB,
orgID: orgID,
db: mysqlDB,
triggerTime: triggerTime,
basicInfo: model.BasicInfo{
OrgID: orgID,
TeamID: domain.TeamID,
Expand Down Expand Up @@ -287,6 +299,11 @@ func (c *Cloud) getCloudGatherInterval() int {
}

func (c *Cloud) getCloudData() {
log.Infof("org (%d) cloud (%s) assemble data starting", c.orgID, c.basicInfo.Name)

c.synchronizing = true
defer func() { c.synchronizing = false }()

var cResource model.Resource
var cloudCost float64
if c.basicInfo.Type != common.KUBERNETES {
Expand Down Expand Up @@ -327,6 +344,7 @@ func (c *Cloud) getCloudData() {
}
// trigger recorder refresh domain resource
c.domainRefreshSignal.Put(struct{}{})
log.Infof("org (%d) cloud (%s) assemble data complete", c.orgID, c.basicInfo.Name)
}

func (c *Cloud) sendStatsd(cloudCost float64) {
Expand All @@ -336,15 +354,40 @@ func (c *Cloud) sendStatsd(cloudCost float64) {
statsd.MetaStatsd.RegisterStatsdTable(c)
}

func (c *Cloud) ClientTrigger() error {
if c.synchronizing {
return fmt.Errorf("cloud (%s) is synchronizing, please try again later", c.basicInfo.Name)
}
go c.getCloudData()
return nil
}

func (c *Cloud) dailyTrigger() bool {
if c.triggerTime.IsZero() {
return true
}

now := time.Now()
dailyTime := time.Date(now.Year(), now.Month(), now.Day(), c.triggerTime.Hour(), c.triggerTime.Minute(), 0, 0, time.Local)
timeSub := now.Sub(dailyTime)
if timeSub >= 0 && timeSub <= time.Minute {
return true
}
log.Infof("now is not trigger time (%s), task (%s) not running", dailyTime.Format(common.GO_BIRTHDAY), c.basicInfo.Name)
return false
}

func (c *Cloud) run() {
log.Infof("org (%d) cloud (%s) started", c.orgID, c.basicInfo.Name)

if err := c.platform.CheckAuth(); err != nil {
log.Errorf("org (%d) cloud (%+v) check auth failed", c.orgID, c.basicInfo)
}
log.Infof("org (%d) cloud (%s) assemble data starting", c.orgID, c.basicInfo.Name)
c.getCloudData()
log.Infof("org (%d) cloud (%s) assemble data complete", c.orgID, c.basicInfo.Name)

// execute immediately upon startup
if c.dailyTrigger() {
c.getCloudData()
}

cloudGatherInterval := c.getCloudGatherInterval()
c.basicInfo.Interval = cloudGatherInterval
Expand All @@ -354,9 +397,13 @@ func (c *Cloud) run() {
for {
select {
case <-ticker.C:
log.Infof("org (%d) cloud (%s) assemble data starting", c.orgID, c.basicInfo.Name)
if c.synchronizing {
continue
}
if !c.dailyTrigger() {
continue
}
c.getCloudData()
log.Infof("org (%d) cloud (%s) assemble data complete", c.orgID, c.basicInfo.Name)
case <-c.cCtx.Done():
log.Infof("org (%d) cloud (%s) stopped", c.orgID, c.basicInfo.Name)
return
Expand Down
21 changes: 0 additions & 21 deletions server/controller/cloud/qingcloud/qingcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ type QingCloud struct {
httpTimeout int
MaxRetries uint
RetryDuration uint
DailyTriggerTime time.Time

defaultVPCName string
defaultVxnetName string
Expand Down Expand Up @@ -116,15 +115,6 @@ func NewQingCloud(orgID int, domain mysql.Domain, cfg cloudconfig.CloudConfig) (
url = "https://api.qingcloud.com"
}

var dailyTriggerTime time.Time
if cfg.QingCloudConfig.DailyTriggerTime != "" {
dailyTriggerTime, err = time.ParseInLocation("15:04", cfg.QingCloudConfig.DailyTriggerTime, time.Local)
if err != nil {
log.Errorf("parse qing config daily trigger time failed: (%s)", err.Error())
return nil, err
}
}

return &QingCloud{
orgID: orgID,
teamID: domain.TeamID,
Expand All @@ -141,7 +131,6 @@ func NewQingCloud(orgID int, domain mysql.Domain, cfg cloudconfig.CloudConfig) (
MaxRetries: cfg.QingCloudConfig.MaxRetries,
RetryDuration: cfg.QingCloudConfig.RetryDuration,
DisableSyncLBListener: cfg.QingCloudConfig.DisableSyncLBListener,
DailyTriggerTime: dailyTriggerTime,

defaultVPCName: domain.Name + "_default_vpc",
defaultVxnetName: "vxnet-0",
Expand Down Expand Up @@ -335,16 +324,6 @@ func (q *QingCloud) GetStatter() statsd.StatsdStatter {
func (q *QingCloud) GetCloudData() (model.Resource, error) {
var resource model.Resource

if !q.DailyTriggerTime.IsZero() {
now := time.Now()
triggerTime := time.Date(now.Year(), now.Month(), now.Day(), q.DailyTriggerTime.Hour(), q.DailyTriggerTime.Minute(), 0, 0, time.Local)
timeSub := now.Sub(triggerTime)
if timeSub < time.Second || timeSub > time.Minute {
log.Infof("now is not the trigger time (%s), the task is not running", triggerTime.Format(common.GO_BIRTHDAY))
return resource, nil
}
}

// every tasks must init
q.CloudStatsd = statsd.NewCloudStatsd()

Expand Down
2 changes: 1 addition & 1 deletion server/controller/cloud/volcengine/cen.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (v *VolcEngine) getCens(sess *session.Session) ([]model.CEN, error) {
}

if len(vpcLcuuids) == 0 {
log.Infof("cen (%s) not bind vpc")
log.Infof("cen (%s) not bind vpc", cenName)
continue
}
cens = append(cens, model.CEN{
Expand Down
9 changes: 9 additions & 0 deletions server/controller/http/router/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (d *Debug) RegisterTo(e *gin.Engine) {
e.GET("/v1/tasks/", getCloudBasicInfos(d.m))
e.GET("/v1/tasks/:lcuuid/", getCloudBasicInfo(d.m))
e.GET("/v1/info/:lcuuid/", getCloudResource(d.m))
e.GET("/v1/trigger-domain/:lcuuid/", triggerDomain(d.m))
e.GET("/v1/genesis/:type/", getGenesisSyncData(d.g, true))
e.GET("/v1/sync/:type/", getGenesisSyncData(d.g, false))
e.GET("/v1/agent-stats/:vtapID/", getAgentStats(d.g))
Expand Down Expand Up @@ -87,6 +88,14 @@ func getCloudResource(m *manager.Manager) gin.HandlerFunc {
})
}

func triggerDomain(m *manager.Manager) gin.HandlerFunc {
return gin.HandlerFunc(func(c *gin.Context) {
lcuuid := c.Param("lcuuid")
err := service.TriggerDomain(lcuuid, m)
JsonResponse(c, nil, err)
})
}

func getKubernetesGatherBasicInfos(m *manager.Manager) gin.HandlerFunc {
return gin.HandlerFunc(func(c *gin.Context) {
data, err := service.GetKubernetesGatherBasicInfos(c.Param("lcuuid"), m)
Expand Down
4 changes: 4 additions & 0 deletions server/controller/http/service/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func GetCloudResource(lcuuid string, m *manager.Manager) (resp cloudmodel.Resour
}
}

func TriggerDomain(lcuuid string, m *manager.Manager) error {
return m.TriggerDomain(lcuuid)
}

func TriggerKubernetesRefresh(domainLcuuid, subDomainLcuuid string, version int, m *manager.Manager) error {
return m.TriggerKubernetesRefresh(domainLcuuid, subDomainLcuuid, version)
}
Expand Down
10 changes: 10 additions & 0 deletions server/controller/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ func (m *Manager) GetCloudResource(lcuuid string) (model.Resource, error) {
return cloudResource, nil
}

func (m *Manager) TriggerDomain(lcuuid string) error {
m.mutex.RLock()
defer m.mutex.RUnlock()
task, ok := m.taskMap[lcuuid]
if !ok {
return fmt.Errorf("domain (%s) not found", lcuuid)
}
return task.Cloud.ClientTrigger()
}

func (m *Manager) GetKubernetesGatherBasicInfos(lcuuid string) ([]gathermodel.KubernetesGatherBasicInfo, error) {
var k8sGatherBasicInfos []gathermodel.KubernetesGatherBasicInfo

Expand Down

0 comments on commit 5a6ffa7

Please sign in to comment.