Skip to content

Commit

Permalink
More hybrid features
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed Mar 27, 2021
1 parent 33d71c8 commit 225713b
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 92 deletions.
116 changes: 76 additions & 40 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ func GetPrioritizedApps(ctx context.Context, user User) ([]WorkflowApp, error) {
allApps := []WorkflowApp{}
//log.Printf("[INFO] LOOPING REAL APPS: %d. Private: %d", len(user.PrivateApps))

// 1. Caching apps locally
cacheKey := fmt.Sprintf("apps_%s", user.Id)
if project.CacheDb {
cache, err := GetCache(ctx, cacheKey)
Expand Down Expand Up @@ -971,66 +972,101 @@ func GetPrioritizedApps(ctx context.Context, user User) ([]WorkflowApp, error) {
}
}

query = datastore.NewQuery("workflowapp").Filter("public =", true).Limit(limit)
for {
it := project.Dbclient.Run(ctx, query)
// Find public apps
publicApps := []WorkflowApp{}
publicAppsKey := fmt.Sprintf("public_apps")
if project.CacheDb {
cache, err := GetCache(ctx, publicAppsKey)
if err == nil {
cacheData := []byte(cache.([]uint8))
err = json.Unmarshal(cacheData, &publicApps)
if err == nil {
return allApps, nil
} else {
log.Printf("Failed unmarshaling PUBLIC apps: %s", err)
log.Printf("DATALEN: %d", len(cacheData))
}
} else {
log.Printf("[INFO] Failed getting cache for PUBLIC apps: %s", err)
}
}

if len(publicApps) == 0 {
query = datastore.NewQuery("workflowapp").Filter("public =", true).Limit(limit)
for {
innerApp := WorkflowApp{}
_, err := it.Next(&innerApp)
if err != nil {
if strings.Contains(fmt.Sprintf("%s", err), "cannot load field") {
log.Printf("[WARNING] Error in public app load: %s.", err)
continue
it := project.Dbclient.Run(ctx, query)

for {
innerApp := WorkflowApp{}
_, err := it.Next(&innerApp)
if err != nil {
if strings.Contains(fmt.Sprintf("%s", err), "cannot load field") {
log.Printf("[WARNING] Error in public app load: %s.", err)
continue
}

log.Printf("[WARNING] No more apps (public)? Breaking: %s.", err)
break
}

log.Printf("[WARNING] No more apps (public)? Breaking: %s.", err)
break
//log.Printf("APP: %s", innerApp.Name)
found := false
//log.Printf("ACTIONS: %d - %s", len(app.Actions), app.Name)
for _, loopedApp := range allApps {
if loopedApp.Name == innerApp.Name || loopedApp.ID == innerApp.ID {
found = true
break
}
}

if !found {
publicApps = append(publicApps, innerApp)
}
}

//log.Printf("APP: %s", innerApp.Name)
found := false
//log.Printf("ACTIONS: %d - %s", len(app.Actions), app.Name)
for _, loopedApp := range allApps {
if loopedApp.Name == innerApp.Name || loopedApp.ID == innerApp.ID {
found = true
if err != iterator.Done {
//log.Printf("[INFO] Failed fetching results: %v", err)
//break
}

// Get the cursor for the next page of results.
nextCursor, err := it.Cursor()
if err != nil {
log.Printf("Cursorerror: %s", err)
break
} else {
//log.Printf("NEXTCURSOR: %s", nextCursor)
nextStr := fmt.Sprintf("%s", nextCursor)
if cursorStr == nextStr {
break
}

cursorStr = nextStr
query = query.Start(nextCursor)
//cursorStr = nextCursor
//break
}

if !found {
allApps = append(allApps, innerApp)
if len(allApps) > maxLen {
break
}
}

if err != iterator.Done {
//log.Printf("[INFO] Failed fetching results: %v", err)
//break
newbody, err := json.Marshal(publicApps)
if err != nil {
return allApps, nil
}

// Get the cursor for the next page of results.
nextCursor, err := it.Cursor()
err = SetCache(ctx, publicAppsKey, newbody)
if err != nil {
log.Printf("Cursorerror: %s", err)
break
log.Printf("[INFO] Error setting app cache item for %s: %v", publicAppsKey, err)
} else {
//log.Printf("NEXTCURSOR: %s", nextCursor)
nextStr := fmt.Sprintf("%s", nextCursor)
if cursorStr == nextStr {
break
}

cursorStr = nextStr
query = query.Start(nextCursor)
//cursorStr = nextCursor
//break
}

if len(allApps) > maxLen {
break
log.Printf("[INFO] Set app cache for %s", publicAppsKey)
}
}

allApps = append(allApps, publicApps...)

if len(allApps) > 0 {
newbody, err := json.Marshal(allApps)
if err != nil {
Expand Down
124 changes: 72 additions & 52 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,38 +173,33 @@ func HandleGetOrg(resp http.ResponseWriter, request *http.Request) {
return
}

//FIXME : cleanup org before marshal
admin := false
userFound := false
for _, foundUser := range org.Users {
if foundUser.Id == user.Id {
for _, inneruser := range org.Users {
if inneruser.Id == user.Id {
userFound = true

if inneruser.Role == "admin" {
admin = true
}

break
}
}

// FIXME
if org.Users == nil || len(org.Users) == 0 {
log.Printf("No users found in org. Checking if ok")
if org.Name == user.Username {
user.PrivateApps = []WorkflowApp{}
user.Role = "admin"
org.Users = append(org.Users, user)
_ = SetOrg(ctx, *org, org.Id)
} else {
log.Printf("Couldn't find user in org")
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "User doesn't have access to org (1)"}`))
return
}
} else if !userFound {
log.Printf("Couldn't find user in org")
_ = admin
if !userFound {
log.Printf("[WARNING] User %s isn't a part of org %s (get)", user.Id, org.Id)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "User doesn't have access to org"}`))
return

}

org.Users = []User{}
org.SyncConfig.Apikey = ""
org.SyncConfig.Source = ""

newjson, err := json.Marshal(org)
if err != nil {
log.Printf("Failed unmarshal of org: %s", err)
Expand Down Expand Up @@ -1251,7 +1246,7 @@ func GetWorkflowExecutions(resp http.ResponseWriter, request *http.Request) {
if workflow.OrgId == user.ActiveOrg.Id && user.Role == "admin" {
log.Printf("[INFO] User %s is accessing %s executions as admin", user.Username, workflow.ID)
} else {
log.Printf("[WARNING] Wrong user (%s) for workflow %s (get execution)", user.Username, workflow.ID)
log.Printf("[WARNING] Wrong user (%s) for workflow %s (get workflow)", user.Username, workflow.ID)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
Expand Down Expand Up @@ -2155,7 +2150,6 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {

// Here to check access rights
ctx := getContext(request)

tmpworkflow, err := GetWorkflow(ctx, fileId)
if err != nil {
log.Printf("Failed getting the workflow locally (save workflow): %s", err)
Expand All @@ -2165,45 +2159,67 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
}

// FIXME - have a check for org etc too..
workflow := Workflow{}
if user.Id != tmpworkflow.Owner {
if tmpworkflow.OrgId == user.ActiveOrg.Id && user.Role == "admin" {
log.Printf("[INFO] User %s is accessing %s executions as admin", user.Username, tmpworkflow.ID)
} else if tmpworkflow.Public {
log.Printf("\n\nSHOULD CREATE A NEW WORKFLOW FOR THE USER :O\n\n")

workflow = *tmpworkflow
workflow.ID = uuid.NewV4().String()
workflow.Public = false
workflow.Owner = user.Id
workflow.Org = []Org{
user.ActiveOrg,
}
workflow.ExecutingOrg = user.ActiveOrg
workflow.OrgId = user.ActiveOrg.Id
workflow.PreviouslySaved = false

err = SetWorkflow(ctx, workflow, workflow.ID)
if err != nil {
log.Printf("[WARNING] Failed saving NEW version of public %s for user %s: %s", tmpworkflow.ID, user.Username, err)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
}

resp.WriteHeader(200)
resp.Write([]byte(fmt.Sprintf(`{"success": true, "new_id": "%s"}`, workflow.ID)))
return
} else {
log.Printf("Wrong user (%s) for workflow %s (save)", user.Username, tmpworkflow.ID)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
}
}

//log.Printf("PRE BODY")
body, err := ioutil.ReadAll(request.Body)
if err != nil {
log.Printf("Failed hook unmarshaling: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
}
} else {
body, err := ioutil.ReadAll(request.Body)
if err != nil {
log.Printf("Failed hook unmarshaling: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
}

var workflow Workflow
err = json.Unmarshal([]byte(body), &workflow)
//log.Printf(string(body))
if err != nil {
log.Printf(string(body))
log.Printf("[ERROR] Failed workflow unmarshaling: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
return
}
err = json.Unmarshal([]byte(body), &workflow)
if err != nil {
log.Printf(string(body))
log.Printf("[ERROR] Failed workflow unmarshaling: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
return
}

if workflow.Public {
log.Printf("[WARNING] Rolling back public as the user set it to true themselves")
workflow.Public = false
}
if workflow.Public {
log.Printf("[WARNING] Rolling back public as the user set it to true themselves")
workflow.Public = false
}

if len(workflow.PublishedId) > 0 {
log.Printf("[INFO] Workflow %s has the published ID %s", workflow.ID, workflow.PublishedId)
//PubishedId string `json:"published_id" yaml:"published_id"`
if len(workflow.PublishedId) > 0 {
log.Printf("[INFO] Workflow %s has the published ID %s", workflow.ID, workflow.PublishedId)
}
}

// FIXME - auth and check if they should have access
Expand Down Expand Up @@ -3468,7 +3484,7 @@ func GetSpecificWorkflow(resp http.ResponseWriter, request *http.Request) {

user, err := HandleApiAuthentication(resp, request)
if err != nil {
log.Printf("Api authentication failed in getting specific workflow: %s", err)
log.Printf("[WARNING] Api authentication failed in getting specific workflow: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
Expand Down Expand Up @@ -3500,20 +3516,24 @@ func GetSpecificWorkflow(resp http.ResponseWriter, request *http.Request) {
ctx := getContext(request)
workflow, err := GetWorkflow(ctx, fileId)
if err != nil {
log.Printf("Workflow %s doesn't exist.", fileId)
log.Printf("[WARNING] Workflow %s doesn't exist.", fileId)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "Item already exists."}`))
return
}

//log.Printf("%#v", workflow)

// CHECK orgs of user, or if user is owner
// FIXME - add org check too, and not just owner
// Check workflow.Sharing == private / public / org too
if user.Id != workflow.Owner || len(user.Id) == 0 {
if workflow.OrgId == user.ActiveOrg.Id && user.Role == "admin" {
log.Printf("[INFO] User %s is accessing workflow %s as admin", user.Username, workflow.ID)
} else if workflow.Public {
log.Printf("[INFO] Letting user %s access workflow %s because it's public", user.Username, workflow.ID)
} else {
log.Printf("Wrong user (%s) for workflow %s (get workflow)", user.Username, workflow.ID)
log.Printf("[WARNING] Wrong user (%s) for workflow %s (get workflow)", user.Username, workflow.ID)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
Expand Down Expand Up @@ -4440,7 +4460,6 @@ func AbortExecution(resp http.ResponseWriter, request *http.Request) {
func SanitizeWorkflow(workflow Workflow) Workflow {
log.Printf("[INFO] Sanitizing workflow %s", workflow.ID)

workflow.Owner = ""
for _, trigger := range workflow.Triggers {
_ = trigger
}
Expand All @@ -4453,6 +4472,7 @@ func SanitizeWorkflow(workflow Workflow) Workflow {
_ = variable
}

workflow.Owner = ""
workflow.Org = []Org{}
workflow.OrgId = ""
workflow.ExecutingOrg = Org{}
Expand Down
1 change: 1 addition & 0 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ type Defaults struct {
type SyncConfig struct {
Interval int64 `json:"interval" datastore:"interval"`
Apikey string `json:"api_key" datastore:"api_key"`
Source string `json:"source" datastore:"source"`
}

type PaymentSubscription struct {
Expand Down

0 comments on commit 225713b

Please sign in to comment.