diff --git a/pkg/reconciler/eventsource/resource.go b/pkg/reconciler/eventsource/resource.go index decc44ebcc..d4eae8422d 100644 --- a/pkg/reconciler/eventsource/resource.go +++ b/pkg/reconciler/eventsource/resource.go @@ -6,6 +6,8 @@ import ( "encoding/json" "fmt" "sort" + "os" + "strings" "github.com/argoproj/argo-events/pkg/apis/events/v1alpha1" aev1 "github.com/argoproj/argo-events/pkg/apis/events/v1alpha1" @@ -98,7 +100,7 @@ func Reconcile(client client.Client, args *AdaptorArgs, logger *zap.SugaredLogge logger.Errorw("error getting existing service", "error", err) return err } - expectedSvc, err := buildService(args) + expectedSvc, err := buildService(args, logger) if err != nil { eventSource.Status.MarkDeployFailed("BuildServiceFailed", "Failed to build service spec") logger.Errorw("error building service spec", "error", err) @@ -424,7 +426,7 @@ func getService(ctx context.Context, cl client.Client, args *AdaptorArgs) (*core return nil, apierrors.NewNotFound(schema.GroupResource{}, "") } -func buildService(args *AdaptorArgs) (*corev1.Service, error) { +func buildService(args *AdaptorArgs, logger *zap.SugaredLogger) (*corev1.Service, error) { eventSource := args.EventSource if eventSource.Spec.Service == nil { return nil, nil @@ -464,6 +466,36 @@ func buildService(args *AdaptorArgs) (*corev1.Service, error) { svc.ObjectMeta.SetLabels(labels) svc.ObjectMeta.SetAnnotations(annotations) + if os.Getenv("DEPLOYMENT_NAME") != "" { + webhookEndpoint := "webhook." + eventSource.Namespace + "." + os.Getenv("DEPLOYMENT_NAME") + if svc.ObjectMeta.Annotations == nil { + svc.ObjectMeta.Annotations = map[string]string{ + "external-dns.alpha.kubernetes.io/hostname": webhookEndpoint, + } + } else { + svc.ObjectMeta.Annotations["external-dns.alpha.kubernetes.io/hostname"] = webhookEndpoint + } + } + + if os.Getenv("WEBHOOK_SERVICE_TYPE") != "" { + switch os.Getenv("WEBHOOK_SERVICE_TYPE") { + case "LoadBalancer": + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + default: + svc.Spec.Type = corev1.ServiceTypeClusterIP + } + } + + if os.Getenv("LOADBALANCER_IP_RANGES") != "" { + ipRangesOpts := os.Getenv("LOADBALANCER_IP_RANGES") + ipRanges := strings.Split(ipRangesOpts, ",") + for idx := range ipRanges { + ipRanges[idx] = ipRanges[idx] + "/32" + } + svc.Spec.LoadBalancerSourceRanges = append(svc.Spec.LoadBalancerSourceRanges, ipRanges...) + } + + logger.Infof("created service object %#v", svc) if err := controllerscommon.SetObjectMeta(eventSource, svc, v1alpha1.EventSourceGroupVersionKind); err != nil { return nil, err } diff --git a/pkg/sensors/triggers/argo-workflow/argo-workflow.go b/pkg/sensors/triggers/argo-workflow/argo-workflow.go index 08506c8ff0..434af8b86a 100644 --- a/pkg/sensors/triggers/argo-workflow/argo-workflow.go +++ b/pkg/sensors/triggers/argo-workflow/argo-workflow.go @@ -129,6 +129,17 @@ func (t *ArgoWorkflowTrigger) Execute(ctx context.Context, events map[string]*v1 namespace = t.Sensor.Namespace } + // name: ARGO_CLI_EXTRA_ARGS + // value: --kubeconfig /my/path/to/kubeconfig + var extraArgs []string + extraArgsEnv := os.Getenv("ARGO_CLI_EXTRA_ARGS") + if extraArgsEnv != "" { + esplit := strings.Split(extraArgsEnv, " ") + for i := range esplit { + extraArgs = append(extraArgs, esplit[i]) + } + } + var cmd *exec.Cmd switch op { @@ -157,7 +168,10 @@ func (t *ArgoWorkflowTrigger) Execute(ctx context.Context, events map[string]*v1 if _, err := file.Write(jObj); err != nil { return nil, fmt.Errorf("failed to write workflow json %s to the temp file %s, %w", name, file.Name(), err) } - cmd = exec.Command("argo", "-n", namespace, "submit", file.Name()) + + allArgs := []string{"-n", namespace, "submit", file.Name()} + allArgs = append(allArgs, extraArgs...) + cmd = exec.Command("argo", allArgs...) case v1alpha1.SubmitFrom: kind := obj.GetKind() switch strings.ToLower(kind) { @@ -189,10 +203,17 @@ func (t *ArgoWorkflowTrigger) Execute(ctx context.Context, events map[string]*v1 cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr cmd.Args = append(cmd.Args, trigger.Template.ArgoWorkflow.Args...) + t.Logger.Infof("submitting command %#v", cmd) if err := t.cmdRunner(cmd); err != nil { return nil, fmt.Errorf("failed to execute %s command for workflow %s, %w", string(op), name, err) } + triggerOnly := os.Getenv("TRIGGER_WF_ONLY") + if !strings.EqualFold(triggerOnly, "") { + var i interface{} + return i, nil + } + t.namespableDynamicClient = t.DynamicClient.Resource(schema.GroupVersionResource{ Group: "argoproj.io", Version: "v1alpha1",