Skip to content

Commit

Permalink
[OB] Support remote argo-events execution
Browse files Browse the repository at this point in the history
josephsirak committed Jan 15, 2025

Unverified

This user has not yet uploaded their public signing key.
1 parent 7fc4271 commit 05c5626
Showing 2 changed files with 56 additions and 3 deletions.
36 changes: 34 additions & 2 deletions pkg/reconciler/eventsource/resource.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,8 @@ import (
"encoding/json"
"fmt"
"sort"
"os"
"strings"

"github.com/argoproj/argo-events/pkg/apis/events/v1alpha1"
aev1 "github.com/argoproj/argo-events/pkg/apis/events/v1alpha1"
@@ -98,7 +100,7 @@ func Reconcile(client client.Client, args *AdaptorArgs, logger *zap.SugaredLogge
logger.Errorw("error getting existing service", "error", err)
return err
}
expectedSvc, err := buildService(args)
expectedSvc, err := buildService(args, logger)
if err != nil {
eventSource.Status.MarkDeployFailed("BuildServiceFailed", "Failed to build service spec")
logger.Errorw("error building service spec", "error", err)
@@ -424,7 +426,7 @@ func getService(ctx context.Context, cl client.Client, args *AdaptorArgs) (*core
return nil, apierrors.NewNotFound(schema.GroupResource{}, "")
}

func buildService(args *AdaptorArgs) (*corev1.Service, error) {
func buildService(args *AdaptorArgs, logger *zap.SugaredLogger) (*corev1.Service, error) {
eventSource := args.EventSource
if eventSource.Spec.Service == nil {
return nil, nil
@@ -464,6 +466,36 @@ func buildService(args *AdaptorArgs) (*corev1.Service, error) {
svc.ObjectMeta.SetLabels(labels)
svc.ObjectMeta.SetAnnotations(annotations)

if os.Getenv("DEPLOYMENT_NAME") != "" {
webhookEndpoint := "webhook." + eventSource.Namespace + "." + os.Getenv("DEPLOYMENT_NAME")
if svc.ObjectMeta.Annotations == nil {
svc.ObjectMeta.Annotations = map[string]string{
"external-dns.alpha.kubernetes.io/hostname": webhookEndpoint,
}
} else {
svc.ObjectMeta.Annotations["external-dns.alpha.kubernetes.io/hostname"] = webhookEndpoint
}
}

if os.Getenv("WEBHOOK_SERVICE_TYPE") != "" {
switch os.Getenv("WEBHOOK_SERVICE_TYPE") {
case "LoadBalancer":
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
default:
svc.Spec.Type = corev1.ServiceTypeClusterIP
}
}

if os.Getenv("LOADBALANCER_IP_RANGES") != "" {
ipRangesOpts := os.Getenv("LOADBALANCER_IP_RANGES")
ipRanges := strings.Split(ipRangesOpts, ",")
for idx := range ipRanges {
ipRanges[idx] = ipRanges[idx] + "/32"
}
svc.Spec.LoadBalancerSourceRanges = append(svc.Spec.LoadBalancerSourceRanges, ipRanges...)
}

logger.Infof("created service object %#v", svc)
if err := controllerscommon.SetObjectMeta(eventSource, svc, v1alpha1.EventSourceGroupVersionKind); err != nil {
return nil, err
}
23 changes: 22 additions & 1 deletion pkg/sensors/triggers/argo-workflow/argo-workflow.go
Original file line number Diff line number Diff line change
@@ -129,6 +129,17 @@ func (t *ArgoWorkflowTrigger) Execute(ctx context.Context, events map[string]*v1
namespace = t.Sensor.Namespace
}

// name: ARGO_CLI_EXTRA_ARGS
// value: --kubeconfig /my/path/to/kubeconfig
var extraArgs []string
extraArgsEnv := os.Getenv("ARGO_CLI_EXTRA_ARGS")
if extraArgsEnv != "" {
esplit := strings.Split(extraArgsEnv, " ")
for i := range esplit {
extraArgs = append(extraArgs, esplit[i])
}
}

var cmd *exec.Cmd

switch op {
@@ -157,7 +168,10 @@ func (t *ArgoWorkflowTrigger) Execute(ctx context.Context, events map[string]*v1
if _, err := file.Write(jObj); err != nil {
return nil, fmt.Errorf("failed to write workflow json %s to the temp file %s, %w", name, file.Name(), err)
}
cmd = exec.Command("argo", "-n", namespace, "submit", file.Name())

allArgs := []string{"-n", namespace, "submit", file.Name()}
allArgs = append(allArgs, extraArgs...)
cmd = exec.Command("argo", allArgs...)
case v1alpha1.SubmitFrom:
kind := obj.GetKind()
switch strings.ToLower(kind) {
@@ -189,10 +203,17 @@ func (t *ArgoWorkflowTrigger) Execute(ctx context.Context, events map[string]*v1
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Args = append(cmd.Args, trigger.Template.ArgoWorkflow.Args...)
t.Logger.Infof("submitting command %#v", cmd)
if err := t.cmdRunner(cmd); err != nil {
return nil, fmt.Errorf("failed to execute %s command for workflow %s, %w", string(op), name, err)
}

triggerOnly := os.Getenv("TRIGGER_WF_ONLY")
if !strings.EqualFold(triggerOnly, "") {
var i interface{}
return i, nil
}

t.namespableDynamicClient = t.DynamicClient.Resource(schema.GroupVersionResource{
Group: "argoproj.io",
Version: "v1alpha1",

0 comments on commit 05c5626

Please sign in to comment.