diff --git a/.github/workflows/self_hosted_e2e.yaml b/.github/workflows/self_hosted_e2e.yaml index c3631c784..d1e059fdd 100644 --- a/.github/workflows/self_hosted_e2e.yaml +++ b/.github/workflows/self_hosted_e2e.yaml @@ -42,6 +42,7 @@ jobs: DAPR_DASHBOARD_PINNED_VERSION: 0.13.0 DAPR_RUNTIME_LATEST_STABLE_VERSION: DAPR_DASHBOARD_LATEST_STABLE_VERSION: + GOLANG_PROTOBUF_REGISTRATION_CONFLICT: warn PODMAN_VERSION: 4.4.4 strategy: # TODO: Remove this when our E2E tests are stable for podman on MacOS. diff --git a/.gitignore b/.gitignore index 19d8ec1aa..605a37812 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,8 @@ cli # CLI's auto-generated components directory **/components +# Auto generated deploy dir inside .dapr directory +**/.dapr/deploy # Auto generated logs dir inside .dapr directory **/.dapr/logs diff --git a/cmd/init.go b/cmd/init.go index 08f960b5a..1b5c638eb 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -33,6 +33,7 @@ var ( wait bool timeout uint slimMode bool + devMode bool runtimeVersion string dashboardVersion string allNamespaces bool @@ -68,6 +69,9 @@ dapr init --image-registry # Initialize Dapr in Kubernetes dapr init -k +# Initialize Dapr in Kubernetes in dev mode +dapr init -k --dev + # Initialize Dapr in Kubernetes and wait for the installation to complete (default timeout is 300s/5m) dapr init -k --wait --timeout 600 @@ -127,6 +131,7 @@ dapr init --runtime-path DashboardVersion: dashboardVersion, EnableMTLS: enableMTLS, EnableHA: enableHA, + EnableDev: devMode, Args: values, Wait: wait, Timeout: timeout, @@ -202,6 +207,7 @@ func init() { defaultContainerRuntime := string(utils.DOCKER) InitCmd.Flags().BoolVarP(&kubernetesMode, "kubernetes", "k", false, "Deploy Dapr to a Kubernetes cluster") + InitCmd.Flags().BoolVarP(&devMode, "dev", "", false, "Use Dev mode. Deploy Redis, Zipkin also in the Kubernetes cluster") InitCmd.Flags().BoolVarP(&wait, "wait", "", false, "Wait for Kubernetes initialization to complete") InitCmd.Flags().UintVarP(&timeout, "timeout", "", 300, "The wait timeout for the Kubernetes installation") InitCmd.Flags().BoolVarP(&slimMode, "slim", "s", false, "Exclude placement service, Redis and Zipkin containers from self-hosted installation") diff --git a/cmd/run.go b/cmd/run.go index bf89f68a2..65d3ed40b 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -27,11 +27,12 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/dapr/cli/pkg/kubernetes" "github.com/dapr/cli/pkg/metadata" "github.com/dapr/cli/pkg/print" runExec "github.com/dapr/cli/pkg/runexec" + "github.com/dapr/cli/pkg/runfileconfig" "github.com/dapr/cli/pkg/standalone" - "github.com/dapr/cli/pkg/standalone/runfileconfig" daprsyscall "github.com/dapr/cli/pkg/syscall" "github.com/dapr/cli/utils" ) @@ -64,6 +65,7 @@ var ( apiListenAddresses string runFilePath string appChannelAddress string + enableRunK8s bool ) const ( @@ -105,6 +107,12 @@ dapr run --run-file dapr.yaml # Run multiple apps by providing a directory path containing the run config file(dapr.yaml) dapr run --run-file /path/to/directory + +# Run multiple apps in Kubernetes by proficing path of a run config file +dapr run --run-file dapr.yaml -k + +# Run multiple apps in Kubernetes by providing a directory path containing the run config file(dapr.yaml) +dapr run --run-file /path/to/directory -k `, Args: cobra.MinimumNArgs(0), PreRun: func(cmd *cobra.Command, args []string) { @@ -117,7 +125,7 @@ dapr run --run-file /path/to/directory print.FailureStatusEvent(os.Stderr, "Failed to get run file path: %v", err) os.Exit(1) } - executeRunWithAppsConfigFile(runConfigFilePath) + executeRunWithAppsConfigFile(runConfigFilePath, enableRunK8s) return } if len(args) == 0 { @@ -457,6 +465,7 @@ func init() { RunCmd.Flags().IntVar(&appHealthTimeout, "app-health-probe-timeout", 0, "Timeout for app health probes in milliseconds") RunCmd.Flags().IntVar(&appHealthThreshold, "app-health-threshold", 0, "Number of consecutive failures for the app to be considered unhealthy") RunCmd.Flags().BoolVar(&enableAPILogging, "enable-api-logging", false, "Log API calls at INFO verbosity. Valid values are: true or false") + RunCmd.Flags().BoolVarP(&enableRunK8s, "kubernetes", "k", false, "Run the multi-app run template against Kubernetes environment.") RunCmd.Flags().StringVar(&apiListenAddresses, "dapr-listen-addresses", "", "Comma separated list of IP addresses that sidecar will listen to") RunCmd.Flags().StringVarP(&runFilePath, "run-file", "f", "", "Path to the run template file for the list of apps to run") RunCmd.Flags().StringVarP(&appChannelAddress, "app-channel-address", "", utils.DefaultAppChannelAddress, "The network address the application listens on") @@ -507,11 +516,11 @@ func executeRun(runTemplateName, runFilePath string, apps []runfileconfig.App) ( // A custom writer used for trimming ASCII color codes from logs when writing to files. var customAppLogWriter io.Writer - daprdLogWriterCloser := getLogWriter(app.DaprdLogWriteCloser, app.DaprdLogDestination) + daprdLogWriterCloser := runfileconfig.GetLogWriter(app.DaprdLogWriteCloser, app.DaprdLogDestination) if len(runConfig.Command) == 0 { print.StatusEvent(os.Stdout, print.LogWarning, "No application command found for app %q present in %s", runConfig.AppID, runFilePath) - appDaprdWriter = getAppDaprdWriter(app, true) + appDaprdWriter = runExec.GetAppDaprdWriter(app, true) appLogWriter = app.DaprdLogWriteCloser } else { err = app.CreateAppLogFile() @@ -520,8 +529,8 @@ func executeRun(runTemplateName, runFilePath string, apps []runfileconfig.App) ( exitWithError = true break } - appDaprdWriter = getAppDaprdWriter(app, false) - appLogWriter = getLogWriter(app.AppLogWriteCloser, app.AppLogDestination) + appDaprdWriter = runExec.GetAppDaprdWriter(app, false) + appLogWriter = runfileconfig.GetLogWriter(app.AppLogWriteCloser, app.AppLogDestination) } customAppLogWriter = print.CustomLogWriter{W: appLogWriter} runState, err := startDaprdAndAppProcesses(&runConfig, app.AppDirPath, sigCh, @@ -590,43 +599,6 @@ func executeRun(runTemplateName, runFilePath string, apps []runfileconfig.App) ( return exitWithError, closeError } -// getAppDaprdWriter returns the writer for writing logs common to both daprd, app and stdout. -func getAppDaprdWriter(app runfileconfig.App, isAppCommandEmpty bool) io.Writer { - var appDaprdWriter io.Writer - if isAppCommandEmpty { - if app.DaprdLogDestination != standalone.Console { - appDaprdWriter = io.MultiWriter(os.Stdout, app.DaprdLogWriteCloser) - } else { - appDaprdWriter = os.Stdout - } - } else { - if app.AppLogDestination != standalone.Console && app.DaprdLogDestination != standalone.Console { - appDaprdWriter = io.MultiWriter(app.AppLogWriteCloser, app.DaprdLogWriteCloser, os.Stdout) - } else if app.AppLogDestination != standalone.Console { - appDaprdWriter = io.MultiWriter(app.AppLogWriteCloser, os.Stdout) - } else if app.DaprdLogDestination != standalone.Console { - appDaprdWriter = io.MultiWriter(app.DaprdLogWriteCloser, os.Stdout) - } else { - appDaprdWriter = os.Stdout - } - } - return appDaprdWriter -} - -// getLogWriter returns the log writer based on the log destination. -func getLogWriter(fileLogWriterCloser io.WriteCloser, logDestination standalone.LogDestType) io.Writer { - var logWriter io.Writer - switch logDestination { - case standalone.Console: - logWriter = os.Stdout - case standalone.File: - logWriter = fileLogWriterCloser - case standalone.FileAndConsole: - logWriter = io.MultiWriter(os.Stdout, fileLogWriterCloser) - } - return logWriter -} - func logInformationalStatusToStdout(app runfileconfig.App) { print.InfoStatusEvent(os.Stdout, "Started Dapr with app id %q. HTTP Port: %d. gRPC Port: %d", app.AppID, app.RunConfig.HTTPPort, app.RunConfig.GRPCPort) @@ -652,9 +624,8 @@ func gracefullyShutdownAppsAndCloseResources(runState []*runExec.RunExec, apps [ return err } -func executeRunWithAppsConfigFile(runFilePath string) { - config := runfileconfig.RunFileConfig{} - apps, err := config.GetApps(runFilePath) +func executeRunWithAppsConfigFile(runFilePath string, k8sEnabled bool) { + config, apps, err := getRunConfigFromRunFile(runFilePath) if err != nil { print.StatusEvent(os.Stdout, print.LogFailure, "Error getting apps from config file: %s", err) os.Exit(1) @@ -663,7 +634,13 @@ func executeRunWithAppsConfigFile(runFilePath string) { print.StatusEvent(os.Stdout, print.LogFailure, "No apps to run") os.Exit(1) } - exitWithError, closeErr := executeRun(config.Name, runFilePath, apps) + var exitWithError bool + var closeErr error + if !k8sEnabled { + exitWithError, closeErr = executeRun(config.Name, runFilePath, apps) + } else { + exitWithError, closeErr = kubernetes.Run(runFilePath, config) + } if exitWithError { if closeErr != nil { print.StatusEvent(os.Stdout, print.LogFailure, "Error closing resources: %s", closeErr) @@ -672,6 +649,12 @@ func executeRunWithAppsConfigFile(runFilePath string) { } } +func getRunConfigFromRunFile(runFilePath string) (runfileconfig.RunFileConfig, []runfileconfig.App, error) { + config := runfileconfig.RunFileConfig{} + apps, err := config.GetApps(runFilePath) + return config, apps, err +} + // startDaprdAndAppProcesses is a function to start the App process and the associated Daprd process. // This should be called as a blocking function call. func startDaprdAndAppProcesses(runConfig *standalone.RunConfig, commandDir string, sigCh chan os.Signal, diff --git a/cmd/stop.go b/cmd/stop.go index 3540bac90..fe2141092 100644 --- a/cmd/stop.go +++ b/cmd/stop.go @@ -20,11 +20,15 @@ import ( "github.com/spf13/cobra" + "github.com/dapr/cli/pkg/kubernetes" "github.com/dapr/cli/pkg/print" "github.com/dapr/cli/pkg/standalone" ) -var stopAppID string +var ( + stopAppID string + stopK8s bool +) var StopCmd = &cobra.Command{ Use: "stop", @@ -38,6 +42,12 @@ dapr stop --run-file dapr.yaml # Stop multiple apps by providing a directory path containing the run config file(dapr.yaml) dapr stop --run-file /path/to/directory + +# Stop and delete Kubernetes deployment of multiple apps by providing a run config file +dapr stop --run-file dapr.yaml -k + +# Stop and delete Kubernetes deployment of multiple apps by providing a directory path containing the run config file(dapr.yaml) +dapr stop --run-file /path/to/directory -k `, Run: func(cmd *cobra.Command, args []string) { var err error @@ -47,13 +57,23 @@ dapr stop --run-file /path/to/directory print.FailureStatusEvent(os.Stderr, "Failed to get run file path: %v", err) os.Exit(1) } - err = executeStopWithRunFile(runFilePath) + if !stopK8s { + err = executeStopWithRunFile(runFilePath) + if err != nil { + print.FailureStatusEvent(os.Stderr, "Failed to stop Dapr and app processes: %s", err) + } else { + print.SuccessStatusEvent(os.Stdout, "Dapr and app processes stopped successfully") + } + return + } + config, _, cErr := getRunConfigFromRunFile(runFilePath) + if cErr != nil { + print.FailureStatusEvent(os.Stderr, "Failed to parse run template file %q: %s", runFilePath, cErr.Error()) + } + err = kubernetes.Stop(runFilePath, config) if err != nil { - print.FailureStatusEvent(os.Stderr, "Failed to stop Dapr and app processes: %s", err) - } else { - print.SuccessStatusEvent(os.Stdout, "Dapr and app processes stopped successfully") + print.FailureStatusEvent(os.Stderr, "Error stopping deployments from multi-app run template: %v", err) } - return } if stopAppID != "" { args = append(args, stopAppID) @@ -78,6 +98,7 @@ dapr stop --run-file /path/to/directory func init() { StopCmd.Flags().StringVarP(&stopAppID, "app-id", "a", "", "The application id to be stopped") StopCmd.Flags().StringVarP(&runFilePath, "run-file", "f", "", "Path to the run template file for the list of apps to stop") + StopCmd.Flags().BoolVarP(&stopK8s, "kubernetes", "k", false, "Stop deployments in Kunernetes based on multi-app run file") StopCmd.Flags().BoolP("help", "h", false, "Print this help message") RootCmd.AddCommand(StopCmd) } diff --git a/cmd/uninstall.go b/cmd/uninstall.go index a4885c935..f451dd781 100644 --- a/cmd/uninstall.go +++ b/cmd/uninstall.go @@ -30,6 +30,7 @@ import ( var ( uninstallNamespace string uninstallKubernetes bool + uninstallDev bool uninstallAll bool uninstallContainerRuntime string ) @@ -48,6 +49,15 @@ dapr uninstall --all # Uninstall from Kubernetes dapr uninstall -k +# Uninstall from Kubernetes and remove CRDs +dapr uninstall -k --all + +# Uninstall from Kubernetes remove dev deployments of Redis, Zipkin +dapr uninstall -k --dev + +# Uninstall from Kubernetes remove dev deployments of Redis, Zipkin and CRDs +dapr uninstall -k --dev --all + # Uninstall Dapr from non-default install directory # This will remove the .dapr directory present in the path dapr uninstall --runtime-path @@ -66,7 +76,7 @@ dapr uninstall --runtime-path } print.InfoStatusEvent(os.Stdout, "Removing Dapr from your cluster...") - err = kubernetes.Uninstall(uninstallNamespace, uninstallAll, timeout) + err = kubernetes.Uninstall(uninstallNamespace, uninstallAll, uninstallDev, timeout) } else { if !utils.IsValidContainerRuntime(uninstallContainerRuntime) { print.FailureStatusEvent(os.Stdout, "Invalid container runtime. Supported values are docker and podman.") @@ -87,6 +97,7 @@ dapr uninstall --runtime-path func init() { UninstallCmd.Flags().BoolVarP(&uninstallKubernetes, "kubernetes", "k", false, "Uninstall Dapr from a Kubernetes cluster") + UninstallCmd.Flags().BoolVarP(&uninstallDev, "dev", "", false, "Uninstall Dapr Redis and Zipking installations from Kubernetes cluster") UninstallCmd.Flags().UintVarP(&timeout, "timeout", "", 300, "The timeout for the Kubernetes uninstall") UninstallCmd.Flags().BoolVar(&uninstallAll, "all", false, "Remove .dapr directory, Redis, Placement and Zipkin containers on local machine, and CRDs on a Kubernetes cluster") UninstallCmd.Flags().String("network", "", "The Docker network from which to remove the Dapr runtime") diff --git a/pkg/kubernetes/components.go b/pkg/kubernetes/components.go index ada308b28..a18352146 100644 --- a/pkg/kubernetes/components.go +++ b/pkg/kubernetes/components.go @@ -25,6 +25,7 @@ import ( "github.com/dapr/cli/pkg/age" "github.com/dapr/cli/utils" v1alpha1 "github.com/dapr/dapr/pkg/apis/components/v1alpha1" + "github.com/dapr/dapr/pkg/client/clientset/versioned" ) // ComponentsOutput represent a Dapr component. @@ -46,19 +47,35 @@ func PrintComponents(name, namespace, outputFormat string) error { return nil, err } - list, err := client.ComponentsV1alpha1().Components(namespace).List(meta_v1.ListOptions{}) - // This means that the Dapr Components CRD is not installed and - // therefore no component items exist. - if apierrors.IsNotFound(err) { - list = &v1alpha1.ComponentList{ - Items: []v1alpha1.Component{}, - } - } else if err != nil { - return nil, err + return listComponents(client, namespace) + }, name, outputFormat) +} + +func listComponents(client versioned.Interface, namespace string) (*v1alpha1.ComponentList, error) { + list, err := client.ComponentsV1alpha1().Components(namespace).List(meta_v1.ListOptions{}) + // This means that the Dapr Components CRD is not installed and + // therefore no component items exist. + if apierrors.IsNotFound(err) { + list = &v1alpha1.ComponentList{ + Items: []v1alpha1.Component{}, } + } else if err != nil { + return nil, err + } - return list, nil - }, name, outputFormat) + return list, nil +} + +func getComponent(client versioned.Interface, namespace string, componentName string) (*v1alpha1.Component, error) { + c, err := client.ComponentsV1alpha1().Components(namespace).Get(componentName, meta_v1.GetOptions{}) + // This means that the Dapr Components CRD is not installed and + // therefore no component items exist. + if apierrors.IsNotFound(err) { + return &v1alpha1.Component{}, nil + } else if err != nil { + return nil, err + } + return c, err } func writeComponents(writer io.Writer, getConfigFunc func() (*v1alpha1.ComponentList, error), name, outputFormat string) error { diff --git a/pkg/kubernetes/configurations.go b/pkg/kubernetes/configurations.go index 1933b2d0f..97f005218 100644 --- a/pkg/kubernetes/configurations.go +++ b/pkg/kubernetes/configurations.go @@ -26,6 +26,7 @@ import ( "github.com/dapr/cli/pkg/age" "github.com/dapr/cli/utils" v1alpha1 "github.com/dapr/dapr/pkg/apis/configuration/v1alpha1" + "github.com/dapr/dapr/pkg/client/clientset/versioned" ) type configurationsOutput struct { @@ -66,6 +67,18 @@ func PrintConfigurations(name, namespace, outputFormat string) error { }, name, outputFormat) } +func getDaprConfiguration(client versioned.Interface, namespace string, configurationName string) (*v1alpha1.Configuration, error) { + c, err := client.ConfigurationV1alpha1().Configurations(namespace).Get(configurationName, meta_v1.GetOptions{}) + // This means that the Dapr Configurations CRD is not installed and + // therefore no configuration items exist. + if apierrors.IsNotFound(err) { + return &v1alpha1.Configuration{}, nil + } else if err != nil { + return nil, err + } + return c, err +} + func writeConfigurations(writer io.Writer, getConfigFunc func() (*v1alpha1.ConfigurationList, error), name, outputFormat string) error { confs, err := getConfigFunc() if err != nil { diff --git a/pkg/kubernetes/kubernetes.go b/pkg/kubernetes/kubernetes.go index e2b6ac285..c3fef7ede 100644 --- a/pkg/kubernetes/kubernetes.go +++ b/pkg/kubernetes/kubernetes.go @@ -33,13 +33,27 @@ import ( "github.com/dapr/cli/pkg/print" cli_ver "github.com/dapr/cli/pkg/version" "github.com/dapr/cli/utils" + "github.com/dapr/dapr/pkg/client/clientset/versioned" ) const ( daprReleaseName = "dapr" dashboardReleaseName = "dapr-dashboard" - daprHelmRepo = "https://dapr.github.io/helm-charts" latestVersion = "latest" + + // dev mode constants. + thirdPartyDevNamespace = "default" + zipkinChartName = "zipkin" + redisChartName = "redis" + zipkinReleaseName = "dapr-dev-zipkin" + redisReleaseName = "dapr-dev-redis" + redisVersion = "6.2" + bitnamiHelmRepo = "https://charts.bitnami.com/bitnami" + daprHelmRepo = "https://dapr.github.io/helm-charts" + zipkinHelmRepo = "https://openzipkin.github.io/zipkin" + stateStoreComponentName = "statestore" + pubsubComponentName = "pubsub" + zipkingConfigurationName = "appconfig" ) type InitConfiguration struct { @@ -48,6 +62,7 @@ type InitConfiguration struct { Namespace string EnableMTLS bool EnableHA bool + EnableDev bool Args []string Wait bool Timeout uint @@ -60,7 +75,8 @@ type InitConfiguration struct { // Init deploys the Dapr operator using the supplied runtime version. func Init(config InitConfiguration) error { - err := installWithConsole(daprReleaseName, config.Version, "Dapr control plane", config) + helmRepoDapr := utils.GetEnv("DAPR_HELM_REPO_URL", daprHelmRepo) + err := installWithConsole(daprReleaseName, config.Version, helmRepoDapr, "Dapr control plane", config) if err != nil { return err } @@ -75,19 +91,53 @@ func Init(config InitConfiguration) error { } } - err = installWithConsole(dashboardReleaseName, config.DashboardVersion, "Dapr dashboard", config) + err = installWithConsole(dashboardReleaseName, config.DashboardVersion, helmRepoDapr, "Dapr dashboard", config) + if err != nil { + return err + } + + if config.EnableDev { + redisChartVals := []string{ + "image.tag=" + redisVersion, + } + + err = installThirdPartyWithConsole(redisReleaseName, redisChartName, latestVersion, bitnamiHelmRepo, "Dapr Redis", redisChartVals, config) + if err != nil { + return err + } + + err = installThirdPartyWithConsole(zipkinReleaseName, zipkinChartName, latestVersion, zipkinHelmRepo, "Dapr Zipkin", []string{}, config) + if err != nil { + return err + } + + err = initDevConfigs() + if err != nil { + return err + } + } + return nil +} + +func installThirdPartyWithConsole(releaseName, chartName, releaseVersion, helmRepo string, prettyName string, chartValues []string, config InitConfiguration) error { + installSpinning := print.Spinner(os.Stdout, "Deploying the "+prettyName+" with "+releaseVersion+" version to your cluster...") + defer installSpinning(print.Failure) + + // releaseVersion of chart will always be latest version. + err := installThirdParty(releaseName, chartName, latestVersion, helmRepo, chartValues, config) if err != nil { return err } + installSpinning(print.Success) return nil } -func installWithConsole(releaseName string, releaseVersion string, prettyName string, config InitConfiguration) error { +func installWithConsole(releaseName, releaseVersion, helmRepo string, prettyName string, config InitConfiguration) error { installSpinning := print.Spinner(os.Stdout, "Deploying the "+prettyName+" with "+releaseVersion+" version to your cluster...") defer installSpinning(print.Failure) - err := install(releaseName, releaseVersion, config) + err := install(releaseName, releaseVersion, helmRepo, config) if err != nil { return err } @@ -156,9 +206,9 @@ func locateChartFile(dirPath string) (string, error) { return filepath.Join(dirPath, files[0].Name()), nil } -func daprChart(version string, releaseName string, config *helm.Configuration) (*chart.Chart, error) { +func getHelmChart(version, releaseName, helmRepo string, config *helm.Configuration) (*chart.Chart, error) { pull := helm.NewPullWithOpts(helm.WithConfig(config)) - pull.RepoURL = utils.GetEnv("DAPR_HELM_REPO_URL", daprHelmRepo) + pull.RepoURL = helmRepo pull.Username = utils.GetEnv("DAPR_HELM_REPO_USERNAME", "") pull.Password = utils.GetEnv("DAPR_HELM_REPO_PASSWORD", "") @@ -188,7 +238,7 @@ func daprChart(version string, releaseName string, config *helm.Configuration) ( return loader.Load(chartPath) } -func chartValues(config InitConfiguration, version string) (map[string]interface{}, error) { +func daprChartValues(config InitConfiguration, version string) (map[string]interface{}, error) { chartVals := map[string]interface{}{} err := utils.ValidateImageVariant(config.ImageVariant) if err != nil { @@ -227,7 +277,7 @@ func chartValues(config InitConfiguration, version string) (map[string]interface return chartVals, nil } -func install(releaseName string, releaseVersion string, config InitConfiguration) error { +func install(releaseName, releaseVersion, helmRepo string, config InitConfiguration) error { err := createNamespace(config.Namespace) if err != nil { return err @@ -238,7 +288,7 @@ func install(releaseName string, releaseVersion string, config InitConfiguration return err } - daprChart, err := daprChart(releaseVersion, releaseName, helmConf) + daprChart, err := getHelmChart(releaseVersion, releaseName, helmRepo, helmConf) if err != nil { return err } @@ -261,7 +311,7 @@ func install(releaseName string, releaseVersion string, config InitConfiguration installClient.Wait = config.Wait installClient.Timeout = time.Duration(config.Timeout) * time.Second - values, err := chartValues(config, version) + values, err := daprChartValues(config, version) if err != nil { return err } @@ -273,6 +323,38 @@ func install(releaseName string, releaseVersion string, config InitConfiguration return nil } +func installThirdParty(releaseName, chartName, releaseVersion, helmRepo string, chartVals []string, config InitConfiguration) error { + helmConf, err := helmConfig(thirdPartyDevNamespace) + if err != nil { + return err + } + + helmChart, err := getHelmChart(releaseVersion, chartName, helmRepo, helmConf) + if err != nil { + return err + } + + installClient := helm.NewInstall(helmConf) + installClient.ReleaseName = releaseName + installClient.Namespace = thirdPartyDevNamespace + installClient.Wait = config.Wait + installClient.Timeout = time.Duration(config.Timeout) * time.Second + + values := map[string]interface{}{} + + for _, val := range chartVals { + if err = strvals.ParseInto(val, values); err != nil { + return err + } + } + + if _, err = installClient.Run(helmChart, values); err != nil { + return err + } + + return nil +} + func debugLogf(format string, v ...interface{}) { } @@ -290,3 +372,163 @@ func confirmExist(cfg *helm.Configuration, releaseName string) (bool, error) { return true, nil } + +func checkAndOverWriteFile(filePath string, b []byte) error { + _, err := os.Stat(filePath) + if os.IsNotExist(err) { + // #nosec G306 + if err = os.WriteFile(filePath, b, 0o644); err != nil { + return err + } + } + return nil +} + +func isComponentPresent(client versioned.Interface, namespace string, componentName string) (bool, error) { + c, err := getComponent(client, namespace, componentName) + if err != nil { + return false, err + } + return c.Name == componentName, err +} + +func isConfigurationPresent(client versioned.Interface, namespace string, configurationName string) (bool, error) { + c, err := getDaprConfiguration(client, namespace, configurationName) + if err != nil { + return false, err + } + return c.Name == configurationName, nil +} + +func initDevConfigs() error { + redisStatestore := ` +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.redis + version: v1 + metadata: + # These settings will work out of the box if you use helm install + # bitnami/redis. If you have your own setup, replace + # redis-master:6379 with your own Redis master address, and the + # Redis password with your own Secret's name. For more information, + # see https://docs.dapr.io/operations/components/component-secrets . + - name: redisHost + value: dapr-dev-redis-master:6379 + - name: redisPassword + secretKeyRef: + name: dapr-dev-redis + key: redis-password +auth: + secretStore: kubernetes +` + + redisPubsub := ` +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: pubsub +spec: + type: pubsub.redis + version: v1 + metadata: + # These settings will work out of the box if you use helm install + # bitnami/redis. If you have your own setup, replace + # redis-master:6379 with your own Redis master address, and the + # Redis password with your own Secret's name. For more information, + # see https://docs.dapr.io/operations/components/component-secrets . + - name: redisHost + value: dapr-dev-redis-master:6379 + - name: redisPassword + secretKeyRef: + name: dapr-dev-redis + key: redis-password +auth: + secretStore: kubernetes +` + + zipkinConfig := ` +apiVersion: dapr.io/v1alpha1 +kind: Configuration +metadata: + name: appconfig +spec: + tracing: + samplingRate: "1" + zipkin: + endpointAddress: "http://dapr-dev-zipkin.default.svc.cluster.local:9411/api/v2/spans" +` + tempDirPath, err := createTempDir() + defer os.RemoveAll(tempDirPath) + if err != nil { + return err + } + client, err := DaprClient() + if err != nil { + return err + } + present, err := isComponentPresent(client, thirdPartyDevNamespace, stateStoreComponentName) + if present || err != nil { + if err != nil { + print.WarningStatusEvent(os.Stderr, "Error listing components, skipping default dev component creation.") + } else { + print.WarningStatusEvent(os.Stderr, "Component with name %q already present in namespace %q. Skipping component creation.", stateStoreComponentName, thirdPartyDevNamespace) + } + return err + } + + redisPath := filepath.Join(tempDirPath, "redis-statestore.yaml") + err = checkAndOverWriteFile(redisPath, []byte(redisStatestore)) + if err != nil { + return err + } + print.InfoStatusEvent(os.Stdout, "Applying %q component to Kubernetes %q namespace.", stateStoreComponentName, thirdPartyDevNamespace) + _, err = utils.RunCmdAndWait("kubectl", "apply", "-f", redisPath) + if err != nil { + return err + } + + present, err = isComponentPresent(client, thirdPartyDevNamespace, pubsubComponentName) + if present || err != nil { + if err != nil { + print.WarningStatusEvent(os.Stderr, "Error listing components, skipping default dev component creation.") + } else { + print.WarningStatusEvent(os.Stderr, "Component with name %q already present in namespace %q. Skipping component creation.", pubsubComponentName, thirdPartyDevNamespace) + } + return err + } + redisPath = filepath.Join(tempDirPath, "redis-pubsub.yaml") + err = checkAndOverWriteFile(redisPath, []byte(redisPubsub)) + if err != nil { + return err + } + print.InfoStatusEvent(os.Stdout, "Applying %q component to Kubernetes %q namespace.", pubsubComponentName, thirdPartyDevNamespace) + _, err = utils.RunCmdAndWait("kubectl", "apply", "-f", redisPath) + if err != nil { + return err + } + + present, err = isConfigurationPresent(client, thirdPartyDevNamespace, zipkingConfigurationName) + if present || err != nil { + if err != nil { + print.WarningStatusEvent(os.Stderr, "Error listing configurations, skipping default dev configuration creation.") + } else { + print.WarningStatusEvent(os.Stderr, "Configuration with name %q already present in namespace %q. Skipping configuration creation.", zipkingConfigurationName, thirdPartyDevNamespace) + } + return err + } + zipkinPath := filepath.Join(tempDirPath, "zipkin-config.yaml") + err = checkAndOverWriteFile(zipkinPath, []byte(zipkinConfig)) + if err != nil { + return err + } + print.InfoStatusEvent(os.Stdout, "Applying %q zipkin configuration to Kubernetes %q namespace.", zipkingConfigurationName, thirdPartyDevNamespace) + _, err = utils.RunCmdAndWait("kubectl", "apply", "-f", zipkinPath) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/kubernetes/logs.go b/pkg/kubernetes/logs.go index 1cf062971..9dd8d450a 100644 --- a/pkg/kubernetes/logs.go +++ b/pkg/kubernetes/logs.go @@ -14,17 +14,32 @@ limitations under the License. package kubernetes import ( + "bufio" "context" + "errors" "fmt" "io" "os" + "strings" + "time" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + + "github.com/dapr/cli/pkg/print" ) const ( daprdContainerName = "daprd" appIDContainerArgName = "--app-id" + + // number of retries when trying to list pods for getting logs. + maxListingRetry = 10 + // delay between retries of pod listing. + listingDelay = 200 * time.Microsecond + // delay before retrying for getting logs. + streamingDelay = 100 * time.Millisecond ) // Logs fetches Dapr sidecar logs from Kubernetes. @@ -84,3 +99,99 @@ func Logs(appID, podName, namespace string) error { return nil } + +// streamContainerLogsToDisk streams all containers logs for the given selector to a given disk directory. +func streamContainerLogsToDisk(ctx context.Context, appID string, appLogWriter, daprdLogWriter io.Writer, podClient v1.PodInterface) error { + var err error + var podList *corev1.PodList + counter := 0 + for { + podList, err = getPods(ctx, appID, podClient) + if err != nil { + return fmt.Errorf("error listing the pod with label %s=%s: %w", daprAppIDKey, appID, err) + } + if len(podList.Items) != 0 { + break + } + counter++ + if counter == maxListingRetry { + return fmt.Errorf("error getting logs: error listing the pod with label %s=%s after %d retires", daprAppIDKey, appID, maxListingRetry) + } + // Retry after a delay. + time.Sleep(listingDelay) + } + + for _, pod := range podList.Items { + print.InfoStatusEvent(os.Stdout, "Streaming logs for containers in pod %q", pod.GetName()) + for _, container := range pod.Spec.Containers { + fileWriter := daprdLogWriter + if container.Name != daprdContainerName { + fileWriter = appLogWriter + } + + // create a go routine for each container to stream logs into file/console. + go func(pod, containerName, appID string, fileWriter io.Writer) { + loop: + for { + req := podClient.GetLogs(pod, &corev1.PodLogOptions{ + Container: containerName, + Follow: true, + }) + stream, err := req.Stream(ctx) + if err != nil { + switch { + case strings.Contains(err.Error(), "Pending"): + // Retry after a delay. + time.Sleep(streamingDelay) + continue loop + case strings.Contains(err.Error(), "ContainerCreating"): + // Retry after a delay. + time.Sleep(streamingDelay) + continue loop + case errors.Is(err, context.Canceled): + return + default: + return + } + } + defer stream.Close() + + if containerName != daprdContainerName { + streamScanner := bufio.NewScanner(stream) + for streamScanner.Scan() { + fmt.Fprintln(fileWriter, print.Blue(fmt.Sprintf("== APP - %s == %s", appID, streamScanner.Text()))) + } + } else { + _, err = io.Copy(fileWriter, stream) + if err != nil { + switch { + case errors.Is(err, context.Canceled): + return + default: + return + } + } + } + + return + } + }(pod.GetName(), container.Name, appID, fileWriter) + } + } + + return nil +} + +func getPods(ctx context.Context, appID string, podClient v1.PodInterface) (*corev1.PodList, error) { + listCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + labelSelector := fmt.Sprintf("%s=%s", daprAppIDKey, appID) + fmt.Println("Select", labelSelector) + podList, err := podClient.List(listCtx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + cancel() + if err != nil { + return nil, err + } + return podList, nil +} diff --git a/pkg/kubernetes/pods.go b/pkg/kubernetes/pods.go index 6212667f5..ee88fdabf 100644 --- a/pkg/kubernetes/pods.go +++ b/pkg/kubernetes/pods.go @@ -15,24 +15,31 @@ package kubernetes import ( "context" + "errors" + "fmt" "strings" - core_v1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/watch" k8s "k8s.io/client-go/kubernetes" ) -func ListPodsInterface(client k8s.Interface, labelSelector map[string]string) (*core_v1.PodList, error) { - opts := v1.ListOptions{} +const podWatchErrTemplate = "error creating pod watcher" + +var errPodUnknown error = errors.New("pod in unknown/failed state") + +func ListPodsInterface(client k8s.Interface, labelSelector map[string]string) (*corev1.PodList, error) { + opts := metav1.ListOptions{} if labelSelector != nil { opts.LabelSelector = labels.FormatLabels(labelSelector) } - return client.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), opts) + return client.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), opts) } -func ListPods(client *k8s.Clientset, namespace string, labelSelector map[string]string) (*core_v1.PodList, error) { - opts := v1.ListOptions{} +func ListPods(client *k8s.Clientset, namespace string, labelSelector map[string]string) (*corev1.PodList, error) { + opts := metav1.ListOptions{} if labelSelector != nil { opts.LabelSelector = labels.FormatLabels(labelSelector) } @@ -41,8 +48,8 @@ func ListPods(client *k8s.Clientset, namespace string, labelSelector map[string] // CheckPodExists returns a boolean representing the pod's existence and the namespace that the given pod resides in, // or empty if not present in the given namespace. -func CheckPodExists(client *k8s.Clientset, namespace string, labelSelector map[string]string, deployName string) (bool, string) { - opts := v1.ListOptions{} +func CheckPodExists(client k8s.Interface, namespace string, labelSelector map[string]string, deployName string) (bool, string) { + opts := metav1.ListOptions{} if labelSelector != nil { opts.LabelSelector = labels.FormatLabels(labelSelector) } @@ -53,7 +60,7 @@ func CheckPodExists(client *k8s.Clientset, namespace string, labelSelector map[s } for _, pod := range podList.Items { - if pod.Status.Phase == core_v1.PodRunning { + if pod.Status.Phase == corev1.PodRunning { if strings.HasPrefix(pod.Name, deployName) { return true, pod.Namespace } @@ -61,3 +68,61 @@ func CheckPodExists(client *k8s.Clientset, namespace string, labelSelector map[s } return false, "" } + +func createPodWatcher(ctx context.Context, client k8s.Interface, namespace, appID string) (watch.Interface, error) { + labelSelector := fmt.Sprintf("%s=%s", daprAppIDKey, appID) + + opts := metav1.ListOptions{ + TypeMeta: metav1.TypeMeta{}, + LabelSelector: labelSelector, + } + + return client.CoreV1().Pods(namespace).Watch(ctx, opts) +} + +func waitPodDeleted(ctx context.Context, client k8s.Interface, namespace, appID string) error { + watcher, err := createPodWatcher(ctx, client, namespace, appID) + if err != nil { + return fmt.Errorf("%s : %w", podWatchErrTemplate, err) + } + + defer watcher.Stop() + + for { + select { + case event := <-watcher.ResultChan(): + + if event.Type == watch.Deleted { + return nil + } + + case <-ctx.Done(): + return fmt.Errorf("error context cancelled while waiting for pod deletion: %w", context.Canceled) + } + } +} + +func waitPodRunning(ctx context.Context, client k8s.Interface, namespace, appID string) error { + watcher, err := createPodWatcher(ctx, client, namespace, appID) + if err != nil { + return fmt.Errorf("%s : %w", podWatchErrTemplate, err) + } + + defer watcher.Stop() + + for { + select { + case event := <-watcher.ResultChan(): + pod := event.Object.(*corev1.Pod) + + if pod.Status.Phase == corev1.PodRunning { + return nil + } else if pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodUnknown { + return fmt.Errorf("error waiting for pod run: %w", errPodUnknown) + } + + case <-ctx.Done(): + return fmt.Errorf("error context cancelled while waiting for pod run: %w", context.Canceled) + } + } +} diff --git a/pkg/kubernetes/renew_certificate.go b/pkg/kubernetes/renew_certificate.go index 9caf426b1..43d19006b 100644 --- a/pkg/kubernetes/renew_certificate.go +++ b/pkg/kubernetes/renew_certificate.go @@ -112,7 +112,8 @@ func renewCertificate(rootCert, issuerCert, issuerKey []byte, timeout uint, imag return err } - daprChart, err := daprChart(daprVersion, "dapr", helmConf) + helmRepo := utils.GetEnv("DAPR_HELM_REPO_URL", daprHelmRepo) + daprChart, err := getHelmChart(daprVersion, "dapr", helmRepo, helmConf) if err != nil { return err } diff --git a/pkg/kubernetes/run.go b/pkg/kubernetes/run.go index e7e3d35a3..e441e2be0 100644 --- a/pkg/kubernetes/run.go +++ b/pkg/kubernetes/run.go @@ -13,24 +13,416 @@ limitations under the License. package kubernetes -// RunConfig represents the application configuration parameters. -type RunConfig struct { - AppID string - AppPort int - HTTPPort int - GRPCPort int - CodeDirectory string - Arguments []string - Image string -} - -// RunOutput represents the run output. -type RunOutput struct { - Message string -} - -// Run executes the application based on the run configuration. -func Run(config *RunConfig) (*RunOutput, error) { - //nolint - return nil, nil +import ( + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + "syscall" + "time" + + appV1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + k8s "k8s.io/client-go/kubernetes" + podsv1 "k8s.io/client-go/kubernetes/typed/core/v1" + + // Specifically use k8s sig yaml to marshal into json, then convert to yaml. + k8sYaml "sigs.k8s.io/yaml" + + "github.com/dapr/cli/pkg/print" + "github.com/dapr/cli/pkg/runfileconfig" + daprsyscall "github.com/dapr/cli/pkg/syscall" + "github.com/dapr/cli/utils" +) + +const ( + serviceKind = "Service" + deploymentKind = "Deployment" + serviceAPIVersion = "v1" + deploymentAPIVersion = "apps/v1" + loadBalanceType = "LoadBalancer" + daprEnableAnnotationKey = "dapr.io/enabled" + serviceFileName = "service.yaml" + deploymentFileName = "deployment.yaml" + appLabelKey = "app" + nameKey = "name" + labelsKey = "labels" + tcpProtocol = "TCP" + + podCreationDeletionTimeout = 1 * time.Minute +) + +type deploymentConfig struct { + Kind string `json:"kind"` + APIVersion string `json:"apiVersion"` + Metadata map[string]any `json:"metadata"` + Spec appV1.DeploymentSpec `json:"spec"` +} + +type serviceConfig struct { + Kind string `json:"kind"` + APIVersion string `json:"apiVersion"` + Metadata map[string]any `json:"metadata"` + Spec corev1.ServiceSpec `json:"spec"` +} + +type runState struct { + serviceFilePath string + deploymentFilePath string + app runfileconfig.App + logCancel context.CancelFunc +} + +// Run executes the application based on the run file configuration. +// Run creates a temporary `deploy` folder within the app/.dapr directory and then applies that to the context pointed to +// kubectl client. +func Run(runFilePath string, config runfileconfig.RunFileConfig) (bool, error) { + // At this point, we expect the runfile to be parsed and the values within config + // Validations and default setting will only be done after this point. + var exitWithError bool + + // get k8s client PodsInterface. + client, cErr := Client() + if cErr != nil { + // exit with error. + return true, fmt.Errorf("error getting k8s client: %w", cErr) + } + + namespace := corev1.NamespaceDefault + podsInterface := client.CoreV1().Pods(namespace) + + // setup a monitoring context for shutdown call from another cli process. + monitoringContext, monitoringCancel := context.WithCancel(context.Background()) + defer monitoringCancel() + + // setup shutdown notify channel. + sigCh := make(chan os.Signal, 1) + daprsyscall.SetupShutdownNotify(sigCh) + + runStates := []runState{} + + for _, app := range config.Apps { + print.StatusEvent(os.Stdout, print.LogInfo, "Validating config and starting app %q", app.RunConfig.AppID) + // Set defaults if zero value provided in config yaml. + app.RunConfig.SetDefaultFromSchema() + + // Validate validates the configs for k8s and modifies appId etc. + err := app.RunConfig.ValidateK8s() + if err != nil { + print.FailureStatusEvent(os.Stderr, "Error validating run config for app %q present in %s: %s", app.RunConfig.AppID, runFilePath, err.Error()) + exitWithError = true + break + } + + var svc serviceConfig + // create default service config. + if app.ContainerConfiguration.CreateService { + svc = createServiceConfig(app) + } + + // create default deployment config. + dep := createDeploymentConfig(app) + if err != nil { + print.FailureStatusEvent(os.Stderr, "Error creating deployment file for app %q present in %s: %s", app.RunConfig.AppID, runFilePath, err.Error()) + exitWithError = true + break + } + // overwrite /.dapr/deploy/service.yaml. + // overwrite /.dapr/deploy/deployment.yaml. + + err = writeYamlFile(app, svc, dep) + if err != nil { + print.FailureStatusEvent(os.Stderr, "Error creating deployment/service yaml files: %s", err.Error()) + exitWithError = true + break + } + + deployDir := app.GetDeployDir() + print.InfoStatusEvent(os.Stdout, "Deploying app %q to Kubernetes", app.AppID) + serviceFilePath := filepath.Join(deployDir, serviceFileName) + deploymentFilePath := filepath.Join(deployDir, deploymentFileName) + rState := runState{} + if app.CreateService { + print.InfoStatusEvent(os.Stdout, "Deploying service YAML %q to Kubernetes", serviceFilePath) + err = deployYamlToK8s(serviceFilePath) + if err != nil { + print.FailureStatusEvent(os.Stderr, "Error deploying service yaml file %q : %s", serviceFilePath, err.Error()) + exitWithError = true + break + } + rState.serviceFilePath = serviceFilePath + } + + print.InfoStatusEvent(os.Stdout, "Deploying deployment YAML %q to Kubernetes", deploymentFilePath) + err = deployYamlToK8s(deploymentFilePath) + if err != nil { + print.FailureStatusEvent(os.Stderr, "Error deploying deployment yaml file %q : %s", deploymentFilePath, err.Error()) + exitWithError = true + break + } + + // create log files and save state. + err = app.CreateDaprdLogFile() + if err != nil { + print.StatusEvent(os.Stderr, print.LogFailure, "Error getting daprd log file for app %q present in %s: %s", app.AppID, runFilePath, err.Error()) + exitWithError = true + break + } + err = app.CreateAppLogFile() + if err != nil { + print.StatusEvent(os.Stderr, print.LogFailure, "Error getting app log file for app %q present in %s: %s", app.AppID, runFilePath, err.Error()) + exitWithError = true + break + } + + daprdLogWriter := runfileconfig.GetLogWriter(app.DaprdLogWriteCloser, app.DaprdLogDestination) + // appDaprdWriter := runExec.GetAppDaprdWriter(app, false). + appLogWriter := runfileconfig.GetLogWriter(app.AppLogWriteCloser, app.AppLogDestination) + customAppLogWriter := print.CustomLogWriter{W: appLogWriter} + ctx, cancel := context.WithTimeout(context.Background(), podCreationDeletionTimeout) + err = waitPodRunning(ctx, client, namespace, app.AppID) + cancel() + if err != nil { + print.WarningStatusEvent(os.Stderr, "Error deploying pod to Kubernetes. See logs directly from Kubernetes command line.") + // Close the log files since there is deployment error, and the container might be in crash loop back off state. + app.CloseAppLogFile() + app.CloseDaprdLogFile() + } else { + logContext, cancel := context.WithCancel(context.Background()) + rState.logCancel = cancel + err = setupLogs(logContext, app.AppID, daprdLogWriter, customAppLogWriter, podsInterface) + if err != nil { + print.StatusEvent(os.Stderr, print.LogWarning, "Error setting up logs for app %q present in %q . See logs directly from Kubernetes command line.: %s ", app.AppID, runFilePath, err.Error()) + } + } + + rState.deploymentFilePath = deploymentFilePath + rState.app = app + + // append runSate only on successful k8s deploy. + runStates = append(runStates, rState) + + print.InfoStatusEvent(os.Stdout, "Writing log files to directory : %s", app.GetLogsDir()) + } + + // If all apps have been started and there are no errors in starting the apps wait for signal from sigCh. + if !exitWithError { + print.InfoStatusEvent(os.Stdout, "Starting to monitor Kubernetes pods for deletion.") + go monitorK8sPods(monitoringContext, client, namespace, runStates, sigCh) + // After all apps started wait for sigCh. + <-sigCh + monitoringCancel() + print.InfoStatusEvent(os.Stdout, "Stopping Kubernetes pods monitoring.") + // To add a new line in Stdout. + fmt.Println() + print.InfoStatusEvent(os.Stdout, "Received signal to stop. Deleting K8s Dapr app deployments.") + } + + closeErr := gracefullyShutdownK8sDeployment(runStates, client, namespace) + return exitWithError, closeErr +} + +func createServiceConfig(app runfileconfig.App) serviceConfig { + return serviceConfig{ + Kind: serviceKind, + APIVersion: serviceAPIVersion, + Metadata: map[string]any{ + nameKey: app.RunConfig.AppID, + labelsKey: map[string]string{ + appLabelKey: app.AppID, + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Protocol: tcpProtocol, + Port: 80, + TargetPort: intstr.FromInt(app.AppPort), + }, + }, + Selector: map[string]string{ + appLabelKey: app.AppID, + }, + Type: loadBalanceType, + }, + } +} + +func createDeploymentConfig(app runfileconfig.App) deploymentConfig { + replicas := int32(1) + dep := deploymentConfig{ + Kind: deploymentKind, + APIVersion: deploymentAPIVersion, + Metadata: map[string]any{ + nameKey: app.AppID, + }, + } + + dep.Spec = appV1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + appLabelKey: app.AppID, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + appLabelKey: app.AppID, + }, + Annotations: app.RunConfig.GetAnnotations(), + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: app.AppID, + Image: app.ContainerImage, + Env: getEnv(app), + ImagePullPolicy: corev1.PullAlways, + }, + }, + }, + }, + } + // Set dapr.io/enable annotation. + dep.Spec.Template.ObjectMeta.Annotations[daprEnableAnnotationKey] = "true" + + // set containerPort only if app port is present. + if app.AppPort != 0 { + dep.Spec.Template.Spec.Containers[0].Ports = []corev1.ContainerPort{ + { + ContainerPort: int32(app.AppPort), + }, + } + } + + return dep +} + +func getEnv(app runfileconfig.App) []corev1.EnvVar { + envs := app.GetEnv() + envVars := make([]corev1.EnvVar, len(envs)) + i := 0 + for k, v := range app.GetEnv() { + envVars[i] = corev1.EnvVar{ + Name: k, + Value: v, + } + i++ + } + return envVars +} + +func writeYamlFile(app runfileconfig.App, svc serviceConfig, dep deploymentConfig) error { + var yamlBytes []byte + var err error + var writeFile io.WriteCloser + deployDir := app.GetDeployDir() + if app.CreateService { + yamlBytes, err = k8sYaml.Marshal(svc) + if err != nil { + return fmt.Errorf("error marshalling service yaml: %w", err) + } + serviceFilePath := filepath.Join(deployDir, serviceFileName) + writeFile, err = os.Create(serviceFilePath) + if err != nil { + return fmt.Errorf("error creating file %s : %w", serviceFilePath, err) + } + _, err = writeFile.Write(yamlBytes) + if err != nil { + writeFile.Close() + return fmt.Errorf("error writing to file %s : %w", serviceFilePath, err) + } + writeFile.Close() + } + yamlBytes, err = k8sYaml.Marshal(dep) + if err != nil { + return fmt.Errorf("error marshalling deployment yaml: %w", err) + } + deploymentFilePath := filepath.Join(deployDir, deploymentFileName) + writeFile, err = os.Create(deploymentFilePath) + if err != nil { + return fmt.Errorf("error creating file %s : %w", deploymentFilePath, err) + } + _, err = writeFile.Write(yamlBytes) + if err != nil { + writeFile.Close() + return fmt.Errorf("error writing to file %s : %w", deploymentFilePath, err) + } + writeFile.Close() + return nil +} + +func deployYamlToK8s(yamlToDeployPath string) error { + _, err := utils.RunCmdAndWait("kubectl", "apply", "-f", yamlToDeployPath) + if err != nil { + return fmt.Errorf("error deploying the yaml %s to Kubernetes: %w", yamlToDeployPath, err) + } + return nil +} + +func deleteYamlK8s(yamlToDeletePath string) error { + print.InfoStatusEvent(os.Stdout, "Deleting %q from Kubernetes", yamlToDeletePath) + _, err := utils.RunCmdAndWait("kubectl", "delete", "-f", yamlToDeletePath) + if err != nil { + return fmt.Errorf("error deploying the yaml %s to Kubernetes: %w", yamlToDeletePath, err) + } + return nil +} + +func setupLogs(ctx context.Context, appID string, daprdLogWriter, appLogWriter io.Writer, podInterface podsv1.PodInterface) error { + return streamContainerLogsToDisk(ctx, appID, appLogWriter, daprdLogWriter, podInterface) +} + +func gracefullyShutdownK8sDeployment(runStates []runState, client k8s.Interface, namespace string) error { + errs := make([]error, 0, len(runStates)*4) + for _, r := range runStates { + if len(r.serviceFilePath) != 0 { + errs = append(errs, deleteYamlK8s(r.serviceFilePath)) + } + errs = append(errs, deleteYamlK8s(r.deploymentFilePath)) + labelSelector := map[string]string{ + daprAppIDKey: r.app.AppID, + } + if ok, _ := CheckPodExists(client, namespace, labelSelector, r.app.AppID); ok { + ctx, cancel := context.WithTimeout(context.Background(), podCreationDeletionTimeout) + err := waitPodDeleted(ctx, client, namespace, r.app.AppID) + cancel() + if err != nil { + // swallowing err here intentionally. + print.WarningStatusEvent(os.Stderr, "Error waiting for pods to be deleted. Final logs might only be partially available.") + } + } + + // shutdown logs. + r.logCancel() + errs = append(errs, r.app.CloseAppLogFile(), r.app.CloseDaprdLogFile()) + } + return errors.Join(errs...) +} + +func monitorK8sPods(ctx context.Context, client k8s.Interface, namespace string, runStates []runState, sigCh chan os.Signal) { + // for each app wait for pod to be deleted, if all pods are deleted, then send shutdown signal to the cli process. + + wg := sync.WaitGroup{} + + for _, r := range runStates { + go func(appID string, wg *sync.WaitGroup) { + err := waitPodDeleted(ctx, client, namespace, r.app.AppID) + if err != nil && strings.Contains(err.Error(), podWatchErrTemplate) { + print.WarningStatusEvent(os.Stderr, "Error monitoring Kubernetes pod(s) for app %q.", appID) + } + wg.Done() + }(r.app.AppID, &wg) + wg.Add(1) + } + wg.Wait() + // Send signal to gracefully close log writers and shut down process. + sigCh <- syscall.SIGINT } diff --git a/pkg/kubernetes/stop.go b/pkg/kubernetes/stop.go new file mode 100644 index 000000000..04db05568 --- /dev/null +++ b/pkg/kubernetes/stop.go @@ -0,0 +1,51 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "context" + "errors" + "fmt" + "path/filepath" + + corev1 "k8s.io/api/core/v1" + + "github.com/dapr/cli/pkg/runfileconfig" +) + +func Stop(runFilePath string, config runfileconfig.RunFileConfig) error { + errs := []error{} + // get k8s client. + client, cErr := Client() + if cErr != nil { + return fmt.Errorf("error getting k8s client for monitoring pod deletion: %w", cErr) + } + + namespace := corev1.NamespaceDefault + for _, app := range config.Apps { + deployDir := app.GetDeployDir() + serviceFilePath := filepath.Join(deployDir, serviceFileName) + deploymentFilePath := filepath.Join(deployDir, deploymentFileName) + if app.CreateService { + errs = append(errs, deleteYamlK8s(serviceFilePath)) + } + errs = append(errs, deleteYamlK8s(deploymentFilePath)) + ctx, cancel := context.WithTimeout(context.Background(), podCreationDeletionTimeout) + + // Ignoring errors here as it will anyway be printed in the other dapr cli process. + waitPodDeleted(ctx, client, namespace, app.AppID) + cancel() + } + return errors.Join(errs...) +} diff --git a/pkg/kubernetes/testdata/runfile/apps.yaml b/pkg/kubernetes/testdata/runfile/apps.yaml new file mode 100644 index 000000000..70aeda7e7 --- /dev/null +++ b/pkg/kubernetes/testdata/runfile/apps.yaml @@ -0,0 +1,11 @@ +version: 1 +common: +apps: + - appDirPath: ./nodeapp/ + appPort: 3000 + containerImage: ghcr.io/dapr/samples/hello-k8s-node:latest + createService: true + env: + APP_PORT: 3000 + - appDirPath: ./pythonapp/ + containerImage: ghcr.io/dapr/samples/hello-k8s-python:latest \ No newline at end of file diff --git a/pkg/standalone/testdata/runfileconfig/app/resources/.gitkeep b/pkg/kubernetes/testdata/runfile/nodeapp/.gitkeep similarity index 100% rename from pkg/standalone/testdata/runfileconfig/app/resources/.gitkeep rename to pkg/kubernetes/testdata/runfile/nodeapp/.gitkeep diff --git a/pkg/standalone/testdata/runfileconfig/app_precedence_rule/.gitkeep b/pkg/kubernetes/testdata/runfile/pythonapp/.gitkeep similarity index 100% rename from pkg/standalone/testdata/runfileconfig/app_precedence_rule/.gitkeep rename to pkg/kubernetes/testdata/runfile/pythonapp/.gitkeep diff --git a/pkg/kubernetes/uninstall.go b/pkg/kubernetes/uninstall.go index 9a866954b..7a0240920 100644 --- a/pkg/kubernetes/uninstall.go +++ b/pkg/kubernetes/uninstall.go @@ -24,7 +24,7 @@ import ( ) // Uninstall removes Dapr from a Kubernetes cluster. -func Uninstall(namespace string, uninstallAll bool, timeout uint) error { +func Uninstall(namespace string, uninstallAll bool, uninstallDev bool, timeout uint) error { config, err := helmConfig(namespace) if err != nil { return err @@ -48,6 +48,11 @@ func Uninstall(namespace string, uninstallAll bool, timeout uint) error { // Deleting Dashboard here is for versions >= 1.11. uninstallClient.Run(dashboardReleaseName) + if uninstallDev { + // uninstall dapr-dev-zipkin and dapr-dev-redis as best effort. + uninstallThirdParty() + } + _, err = uninstallClient.Run(daprReleaseName) if err != nil { @@ -65,3 +70,14 @@ func Uninstall(namespace string, uninstallAll bool, timeout uint) error { return nil } + +func uninstallThirdParty() { + print.InfoStatusEvent(os.Stdout, "Removing dapr-dev-redis and dapr-dev-zipkin from the cluster...") + // Uninstall dapr-dev-redis and dapr-dev-zipkin from k8s as best effort. + config, _ := helmConfig(thirdPartyDevNamespace) + + uninstallClient := helm.NewUninstall(config) + + uninstallClient.Run(redisReleaseName) + uninstallClient.Run(zipkinReleaseName) +} diff --git a/pkg/kubernetes/upgrade.go b/pkg/kubernetes/upgrade.go index 80bc0aac2..f2c944ce7 100644 --- a/pkg/kubernetes/upgrade.go +++ b/pkg/kubernetes/upgrade.go @@ -57,6 +57,7 @@ type UpgradeConfig struct { } func Upgrade(conf UpgradeConfig) error { + helmRepo := utils.GetEnv("DAPR_HELM_REPO_URL", daprHelmRepo) status, err := GetDaprResourcesStatus() if err != nil { return err @@ -75,7 +76,7 @@ func Upgrade(conf UpgradeConfig) error { return err } - controlPlaneChart, err := daprChart(conf.RuntimeVersion, "dapr", helmConf) + controlPlaneChart, err := getHelmChart(conf.RuntimeVersion, "dapr", helmRepo, helmConf) if err != nil { return err } @@ -109,7 +110,7 @@ func Upgrade(conf UpgradeConfig) error { var dashboardChart *chart.Chart if conf.DashboardVersion != "" { - dashboardChart, err = daprChart(conf.DashboardVersion, dashboardReleaseName, helmConf) + dashboardChart, err = getHelmChart(conf.DashboardVersion, dashboardReleaseName, helmRepo, helmConf) if err != nil { return err } @@ -176,7 +177,7 @@ func Upgrade(conf UpgradeConfig) error { } } else { // We need to install Dashboard since it does not exist yet. - err = install(dashboardReleaseName, conf.DashboardVersion, InitConfiguration{ + err = install(dashboardReleaseName, conf.DashboardVersion, helmRepo, InitConfiguration{ DashboardVersion: conf.DashboardVersion, Namespace: upgradeClient.Namespace, Wait: upgradeClient.Wait, diff --git a/pkg/runexec/runexec.go b/pkg/runexec/runexec.go index abf066705..4eab89153 100644 --- a/pkg/runexec/runexec.go +++ b/pkg/runexec/runexec.go @@ -16,8 +16,10 @@ package runexec import ( "fmt" "io" + "os" "os/exec" + "github.com/dapr/cli/pkg/runfileconfig" "github.com/dapr/cli/pkg/standalone" ) @@ -129,3 +131,26 @@ func NewOutput(config *standalone.RunConfig) (*RunOutput, error) { DaprGRPCPort: config.GRPCPort, }, nil } + +// GetAppDaprdWriter returns the writer for writing logs common to both daprd, app and stdout. +func GetAppDaprdWriter(app runfileconfig.App, isAppCommandEmpty bool) io.Writer { + var appDaprdWriter io.Writer + if isAppCommandEmpty { + if app.DaprdLogDestination != standalone.Console { + appDaprdWriter = io.MultiWriter(os.Stdout, app.DaprdLogWriteCloser) + } else { + appDaprdWriter = os.Stdout + } + } else { + if app.AppLogDestination != standalone.Console && app.DaprdLogDestination != standalone.Console { + appDaprdWriter = io.MultiWriter(app.AppLogWriteCloser, app.DaprdLogWriteCloser, os.Stdout) + } else if app.AppLogDestination != standalone.Console { + appDaprdWriter = io.MultiWriter(app.AppLogWriteCloser, os.Stdout) + } else if app.DaprdLogDestination != standalone.Console { + appDaprdWriter = io.MultiWriter(app.DaprdLogWriteCloser, os.Stdout) + } else { + appDaprdWriter = os.Stdout + } + } + return appDaprdWriter +} diff --git a/pkg/standalone/runfileconfig/run_file_config.go b/pkg/runfileconfig/run_file_config.go similarity index 69% rename from pkg/standalone/runfileconfig/run_file_config.go rename to pkg/runfileconfig/run_file_config.go index 422d4d6b9..50f17a977 100644 --- a/pkg/standalone/runfileconfig/run_file_config.go +++ b/pkg/runfileconfig/run_file_config.go @@ -27,6 +27,7 @@ const ( daprdLogFileNamePrefix = "daprd" logFileExtension = ".log" logsDir = "logs" + deployDir = "deploy" ) // RunFileConfig represents the complete configuration options for the run file. @@ -38,14 +39,21 @@ type RunFileConfig struct { Name string `yaml:"name,omitempty"` } +// ContainerConfiguration represents the application container configuration parameters. +type ContainerConfiguration struct { + ContainerImage string `yaml:"containerImage"` + CreateService bool `yaml:"createService"` +} + // App represents the configuration options for the apps in the run file. type App struct { - standalone.RunConfig `yaml:",inline"` - AppDirPath string `yaml:"appDirPath"` - AppLogFileName string - DaprdLogFileName string - AppLogWriteCloser io.WriteCloser - DaprdLogWriteCloser io.WriteCloser + standalone.RunConfig `yaml:",inline"` + ContainerConfiguration `yaml:",inline"` + AppDirPath string `yaml:"appDirPath"` + AppLogFileName string + DaprdLogFileName string + AppLogWriteCloser io.WriteCloser + DaprdLogWriteCloser io.WriteCloser } // Common represents the configuration options for the common section in the run file. @@ -59,6 +67,12 @@ func (a *App) GetLogsDir() string { return logsPath } +func (a *App) GetDeployDir() string { + logsPath := filepath.Join(a.AppDirPath, standalone.DefaultDaprDirName, deployDir) + os.MkdirAll(logsPath, 0o755) + return logsPath +} + // CreateAppLogFile creates the log file, sets internal file handle // and returns error if any. func (a *App) CreateAppLogFile() error { @@ -104,14 +118,32 @@ func (a *App) createLogFile(logType string) (*os.File, error) { func (a *App) CloseAppLogFile() error { if a.AppLogWriteCloser != nil { - return a.AppLogWriteCloser.Close() + err := a.AppLogWriteCloser.Close() + a.AppLogWriteCloser = nil + return err } return nil } func (a *App) CloseDaprdLogFile() error { if a.DaprdLogWriteCloser != nil { - return a.DaprdLogWriteCloser.Close() + err := a.DaprdLogWriteCloser.Close() + a.DaprdLogWriteCloser = nil + return err } return nil } + +// GetLogWriter returns the log writer based on the log destination. +func GetLogWriter(fileLogWriterCloser io.WriteCloser, logDestination standalone.LogDestType) io.Writer { + var logWriter io.Writer + switch logDestination { + case standalone.Console: + logWriter = os.Stdout + case standalone.File: + logWriter = fileLogWriterCloser + case standalone.FileAndConsole: + logWriter = io.MultiWriter(os.Stdout, fileLogWriterCloser) + } + return logWriter +} diff --git a/pkg/standalone/runfileconfig/run_file_config_parser.go b/pkg/runfileconfig/run_file_config_parser.go similarity index 100% rename from pkg/standalone/runfileconfig/run_file_config_parser.go rename to pkg/runfileconfig/run_file_config_parser.go diff --git a/pkg/standalone/runfileconfig/run_file_config_parser_test.go b/pkg/runfileconfig/run_file_config_parser_test.go similarity index 93% rename from pkg/standalone/runfileconfig/run_file_config_parser_test.go rename to pkg/runfileconfig/run_file_config_parser_test.go index 7527a4c2f..8fd009e73 100644 --- a/pkg/standalone/runfileconfig/run_file_config_parser_test.go +++ b/pkg/runfileconfig/run_file_config_parser_test.go @@ -25,13 +25,13 @@ import ( ) var ( - validRunFilePath = filepath.Join("..", "testdata", "runfileconfig", "test_run_config.yaml") - invalidRunFilePath1 = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_invalid_path.yaml") - invalidRunFilePath2 = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_empty_app_dir.yaml") - runFileForPrecedenceRule = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_precedence_rule.yaml") - runFileForPrecedenceRuleDaprDir = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_precedence_rule_dapr_dir.yaml") - runFileForLogDestination = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_log_destination.yaml") - runFileForMultiResourcePaths = filepath.Join("..", "testdata", "runfileconfig", "test_run_config_multiple_resources_paths.yaml") + validRunFilePath = filepath.Join(".", "testdata", "test_run_config.yaml") + invalidRunFilePath1 = filepath.Join(".", "testdata", "test_run_config_invalid_path.yaml") + invalidRunFilePath2 = filepath.Join(".", "testdata", "test_run_config_empty_app_dir.yaml") + runFileForPrecedenceRule = filepath.Join(".", "testdata", "test_run_config_precedence_rule.yaml") + runFileForPrecedenceRuleDaprDir = filepath.Join(".", "testdata", "test_run_config_precedence_rule_dapr_dir.yaml") + runFileForLogDestination = filepath.Join(".", "testdata", "test_run_config_log_destination.yaml") + runFileForMultiResourcePaths = filepath.Join(".", "testdata", "test_run_config_multiple_resources_paths.yaml") ) func TestRunConfigFile(t *testing.T) { diff --git a/pkg/standalone/testdata/runfileconfig/app/config.yaml b/pkg/runfileconfig/testdata/app/config.yaml similarity index 100% rename from pkg/standalone/testdata/runfileconfig/app/config.yaml rename to pkg/runfileconfig/testdata/app/config.yaml diff --git a/pkg/standalone/testdata/runfileconfig/backend/.dapr/resources/.gitkeep b/pkg/runfileconfig/testdata/app/resources/.gitkeep similarity index 100% rename from pkg/standalone/testdata/runfileconfig/backend/.dapr/resources/.gitkeep rename to pkg/runfileconfig/testdata/app/resources/.gitkeep diff --git a/pkg/standalone/testdata/runfileconfig/backend/.gitkeep b/pkg/runfileconfig/testdata/app_precedence_rule/.gitkeep similarity index 100% rename from pkg/standalone/testdata/runfileconfig/backend/.gitkeep rename to pkg/runfileconfig/testdata/app_precedence_rule/.gitkeep diff --git a/pkg/standalone/testdata/runfileconfig/backend/.dapr/config.yaml b/pkg/runfileconfig/testdata/backend/.dapr/config.yaml similarity index 100% rename from pkg/standalone/testdata/runfileconfig/backend/.dapr/config.yaml rename to pkg/runfileconfig/testdata/backend/.dapr/config.yaml diff --git a/pkg/standalone/testdata/runfileconfig/webapp/resources/.gitkeep b/pkg/runfileconfig/testdata/backend/.dapr/resources/.gitkeep similarity index 100% rename from pkg/standalone/testdata/runfileconfig/webapp/resources/.gitkeep rename to pkg/runfileconfig/testdata/backend/.dapr/resources/.gitkeep diff --git a/pkg/standalone/testdata/runfileconfig/custom_dapr_dir/config.yaml b/pkg/runfileconfig/testdata/backend/.gitkeep similarity index 100% rename from pkg/standalone/testdata/runfileconfig/custom_dapr_dir/config.yaml rename to pkg/runfileconfig/testdata/backend/.gitkeep diff --git a/pkg/standalone/testdata/runfileconfig/webapp/config.yaml b/pkg/runfileconfig/testdata/custom_dapr_dir/config.yaml similarity index 100% rename from pkg/standalone/testdata/runfileconfig/webapp/config.yaml rename to pkg/runfileconfig/testdata/custom_dapr_dir/config.yaml diff --git a/pkg/standalone/testdata/runfileconfig/test_run_config.yaml b/pkg/runfileconfig/testdata/test_run_config.yaml similarity index 100% rename from pkg/standalone/testdata/runfileconfig/test_run_config.yaml rename to pkg/runfileconfig/testdata/test_run_config.yaml diff --git a/pkg/standalone/testdata/runfileconfig/test_run_config_empty_app_dir.yaml b/pkg/runfileconfig/testdata/test_run_config_empty_app_dir.yaml similarity index 100% rename from pkg/standalone/testdata/runfileconfig/test_run_config_empty_app_dir.yaml rename to pkg/runfileconfig/testdata/test_run_config_empty_app_dir.yaml diff --git a/pkg/standalone/testdata/runfileconfig/test_run_config_invalid_path.yaml b/pkg/runfileconfig/testdata/test_run_config_invalid_path.yaml similarity index 100% rename from pkg/standalone/testdata/runfileconfig/test_run_config_invalid_path.yaml rename to pkg/runfileconfig/testdata/test_run_config_invalid_path.yaml diff --git a/pkg/standalone/testdata/runfileconfig/test_run_config_log_destination.yaml b/pkg/runfileconfig/testdata/test_run_config_log_destination.yaml similarity index 100% rename from pkg/standalone/testdata/runfileconfig/test_run_config_log_destination.yaml rename to pkg/runfileconfig/testdata/test_run_config_log_destination.yaml diff --git a/pkg/standalone/testdata/runfileconfig/test_run_config_multiple_resources_paths.yaml b/pkg/runfileconfig/testdata/test_run_config_multiple_resources_paths.yaml similarity index 100% rename from pkg/standalone/testdata/runfileconfig/test_run_config_multiple_resources_paths.yaml rename to pkg/runfileconfig/testdata/test_run_config_multiple_resources_paths.yaml diff --git a/pkg/standalone/testdata/runfileconfig/test_run_config_precedence_rule.yaml b/pkg/runfileconfig/testdata/test_run_config_precedence_rule.yaml similarity index 100% rename from pkg/standalone/testdata/runfileconfig/test_run_config_precedence_rule.yaml rename to pkg/runfileconfig/testdata/test_run_config_precedence_rule.yaml diff --git a/pkg/standalone/testdata/runfileconfig/test_run_config_precedence_rule_dapr_dir.yaml b/pkg/runfileconfig/testdata/test_run_config_precedence_rule_dapr_dir.yaml similarity index 100% rename from pkg/standalone/testdata/runfileconfig/test_run_config_precedence_rule_dapr_dir.yaml rename to pkg/runfileconfig/testdata/test_run_config_precedence_rule_dapr_dir.yaml diff --git a/pkg/runfileconfig/testdata/webapp/config.yaml b/pkg/runfileconfig/testdata/webapp/config.yaml new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/runfileconfig/testdata/webapp/resources/.gitkeep b/pkg/runfileconfig/testdata/webapp/resources/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/standalone/run.go b/pkg/standalone/run.go index f43fb4a31..7e7e5c9d0 100644 --- a/pkg/standalone/run.go +++ b/pkg/standalone/run.go @@ -47,39 +47,46 @@ const ( // RunConfig represents the application configuration parameters. type RunConfig struct { SharedRunConfig `yaml:",inline"` - AppID string `env:"APP_ID" arg:"app-id" yaml:"appID"` + AppID string `env:"APP_ID" arg:"app-id" annotation:"dapr.io/app-id" yaml:"appID"` AppChannelAddress string `env:"APP_CHANNEL_ADDRESS" arg:"app-channel-address" ifneq:"127.0.0.1" yaml:"appChannelAddress"` - AppPort int `env:"APP_PORT" arg:"app-port" yaml:"appPort" default:"-1"` + AppPort int `env:"APP_PORT" arg:"app-port" annotation:"dapr.io/app-port" yaml:"appPort" default:"-1"` HTTPPort int `env:"DAPR_HTTP_PORT" arg:"dapr-http-port" yaml:"daprHTTPPort" default:"-1"` GRPCPort int `env:"DAPR_GRPC_PORT" arg:"dapr-grpc-port" yaml:"daprGRPCPort" default:"-1"` ProfilePort int `arg:"profile-port" yaml:"profilePort" default:"-1"` Command []string `yaml:"command"` - MetricsPort int `env:"DAPR_METRICS_PORT" arg:"metrics-port" yaml:"metricsPort" default:"-1"` - UnixDomainSocket string `arg:"unix-domain-socket" yaml:"unixDomainSocket"` + MetricsPort int `env:"DAPR_METRICS_PORT" arg:"metrics-port" annotation:"dapr.io/metrics-port" yaml:"metricsPort" default:"-1"` + UnixDomainSocket string `arg:"unix-domain-socket" annotation:"dapr.io/unix-domain-socket-path" yaml:"unixDomainSocket"` InternalGRPCPort int `arg:"dapr-internal-grpc-port" yaml:"daprInternalGRPCPort" default:"-1"` } // SharedRunConfig represents the application configuration parameters, which can be shared across many apps. type SharedRunConfig struct { - ConfigFile string `arg:"config" yaml:"configFilePath"` - AppProtocol string `arg:"app-protocol" yaml:"appProtocol" default:"http"` - APIListenAddresses string `arg:"dapr-listen-addresses" yaml:"apiListenAddresses"` - EnableProfiling bool `arg:"enable-profiling" yaml:"enableProfiling"` - LogLevel string `arg:"log-level" yaml:"logLevel"` - MaxConcurrency int `arg:"app-max-concurrency" yaml:"appMaxConcurrency" default:"-1"` - PlacementHostAddr string `arg:"placement-host-address" yaml:"placementHostAddress"` - ComponentsPath string `arg:"components-path"` // Deprecated in run template file: use ResourcesPaths instead. - ResourcesPath string `yaml:"resourcesPath"` // Deprecated in run template file: use ResourcesPaths instead. - ResourcesPaths []string `arg:"resources-path" yaml:"resourcesPaths"` - AppSSL bool `arg:"app-ssl" yaml:"appSSL"` - MaxRequestBodySize int `arg:"dapr-http-max-request-size" yaml:"daprHTTPMaxRequestSize" default:"-1"` - HTTPReadBufferSize int `arg:"dapr-http-read-buffer-size" yaml:"daprHTTPReadBufferSize" default:"-1"` - EnableAppHealth bool `arg:"enable-app-health-check" yaml:"enableAppHealthCheck"` - AppHealthPath string `arg:"app-health-check-path" yaml:"appHealthCheckPath"` - AppHealthInterval int `arg:"app-health-probe-interval" ifneq:"0" yaml:"appHealthProbeInterval"` - AppHealthTimeout int `arg:"app-health-probe-timeout" ifneq:"0" yaml:"appHealthProbeTimeout"` - AppHealthThreshold int `arg:"app-health-threshold" ifneq:"0" yaml:"appHealthThreshold"` - EnableAPILogging bool `arg:"enable-api-logging" yaml:"enableApiLogging"` + // Specifically omitted from annotations see https://github.com/dapr/cli/issues/1324 + ConfigFile string `arg:"config" yaml:"configFilePath"` + AppProtocol string `arg:"app-protocol" annotation:"dapr.io/app-protocol" yaml:"appProtocol" default:"http"` + APIListenAddresses string `arg:"dapr-listen-addresses" annotation:"dapr.io/sidecar-listen-address" yaml:"apiListenAddresses"` + EnableProfiling bool `arg:"enable-profiling" annotation:"dapr.io/enable-profiling" yaml:"enableProfiling"` + LogLevel string `arg:"log-level" annotation:"dapr.io.log-level" yaml:"logLevel"` + MaxConcurrency int `arg:"app-max-concurrency" annotation:"dapr.io/app-max-concurrerncy" yaml:"appMaxConcurrency" default:"-1"` + // Speicifcally omitted from annotations similar to config file path above. + PlacementHostAddr string `arg:"placement-host-address" yaml:"placementHostAddress"` + // Speicifcally omitted from annotations similar to config file path above. + ComponentsPath string `arg:"components-path"` // Deprecated in run template file: use ResourcesPaths instead. + // Speicifcally omitted from annotations similar to config file path above. + ResourcesPath string `yaml:"resourcesPath"` // Deprecated in run template file: use ResourcesPaths instead. + // Speicifcally omitted from annotations similar to config file path above. + ResourcesPaths []string `arg:"resources-path" yaml:"resourcesPaths"` + // Speicifcally omitted from annotations as appSSL is deprecated. + AppSSL bool `arg:"app-ssl" yaml:"appSSL"` + MaxRequestBodySize int `arg:"dapr-http-max-request-size" annotation:"dapr.io/http-max-request-size" yaml:"daprHTTPMaxRequestSize" default:"-1"` + HTTPReadBufferSize int `arg:"dapr-http-read-buffer-size" annotation:"dapr.io/http-read-buffer-size" yaml:"daprHTTPReadBufferSize" default:"-1"` + EnableAppHealth bool `arg:"enable-app-health-check" annotation:"dapr.io/enable-app-health-check" yaml:"enableAppHealthCheck"` + AppHealthPath string `arg:"app-health-check-path" annotation:"dapr.io/app-health-check-path" yaml:"appHealthCheckPath"` + AppHealthInterval int `arg:"app-health-probe-interval" annotation:"dapr.io/app-health-probe-interval" ifneq:"0" yaml:"appHealthProbeInterval"` + AppHealthTimeout int `arg:"app-health-probe-timeout" annotation:"dapr.io/app-health-probe-timeout" ifneq:"0" yaml:"appHealthProbeTimeout"` + AppHealthThreshold int `arg:"app-health-threshold" annotation:"dapr.io/app-health-threshold" ifneq:"0" yaml:"appHealthThreshold"` + EnableAPILogging bool `arg:"enable-api-logging" annotation:"dapr.io/enable-api-logging" yaml:"enableApiLogging"` + // Specifically omitted from annotations see https://github.com/dapr/cli/issues/1324 . DaprdInstallPath string `yaml:"runtimePath"` Env map[string]string `yaml:"env"` DaprdLogDestination LogDestType `yaml:"daprdLogDestination"` @@ -213,6 +220,36 @@ func (config *RunConfig) Validate() error { return nil } +func (config *RunConfig) ValidateK8s() error { + meta, err := newDaprMeta() + if err != nil { + return err + } + + if config.AppID == "" { + config.AppID = meta.newAppID() + } + if config.AppPort < 0 { + config.AppPort = 0 + } + err = config.validatePort("MetricsPort", &config.MetricsPort, meta) + if err != nil { + return err + } + if config.MaxConcurrency < 1 { + config.MaxConcurrency = -1 + } + if config.MaxRequestBodySize < 0 { + config.MaxRequestBodySize = -1 + } + + if config.HTTPReadBufferSize < 0 { + config.HTTPReadBufferSize = -1 + } + + return nil +} + type DaprMeta struct { ExistingIDs map[string]bool ExistingPorts map[int]bool @@ -409,6 +446,50 @@ func (config *RunConfig) getAppProtocol() string { } } +func (config *RunConfig) GetEnv() map[string]string { + env := map[string]string{} + schema := reflect.ValueOf(*config) + for i := 0; i < schema.NumField(); i++ { + valueField := schema.Field(i).Interface() + typeField := schema.Type().Field(i) + key := typeField.Tag.Get("env") + if len(key) == 0 { + continue + } + if value, ok := valueField.(int); ok && value <= 0 { + // ignore unset numeric variables. + continue + } + + value := fmt.Sprintf("%v", reflect.ValueOf(valueField)) + env[key] = value + } + for k, v := range config.Env { + env[k] = v + } + return env +} + +func (config *RunConfig) GetAnnotations() map[string]string { + annotations := map[string]string{} + schema := reflect.ValueOf(*config) + for i := 0; i < schema.NumField(); i++ { + valueField := schema.Field(i).Interface() + typeField := schema.Type().Field(i) + key := typeField.Tag.Get("annotation") + if len(key) == 0 { + continue + } + if value, ok := valueField.(int); ok && value <= 0 { + // ignore unset numeric variables. + continue + } + value := fmt.Sprintf("%v", reflect.ValueOf(valueField)) + annotations[key] = value + } + return annotations +} + func GetDaprCommand(config *RunConfig) (*exec.Cmd, error) { daprCMD, err := lookupBinaryFilePath(config.DaprdInstallPath, "daprd") if err != nil { diff --git a/tests/e2e/common/common.go b/tests/e2e/common/common.go index ffc3d8d3f..59ee0c6e4 100644 --- a/tests/e2e/common/common.go +++ b/tests/e2e/common/common.go @@ -48,6 +48,10 @@ const ( numHAPods = 13 numNonHAPods = 5 + + thirdPartyDevNamespace = "default" + devRedisReleaseName = "dapr-dev-redis" + devZipkinReleaseName = "dapr-dev-zipkin" ) type VersionDetails struct { @@ -61,6 +65,7 @@ type VersionDetails struct { } type TestOptions struct { + DevEnabled bool HAEnabled bool MTLSEnabled bool ApplyComponentChanges bool @@ -189,12 +194,12 @@ func GetTestsOnInstall(details VersionDetails, opts TestOptions) []TestCase { func GetTestsOnUninstall(details VersionDetails, opts TestOptions) []TestCase { return []TestCase{ - {"uninstall " + details.RuntimeVersion, uninstallTest(opts.UninstallAll)}, // waits for pod deletion. + {"uninstall " + details.RuntimeVersion, uninstallTest(opts.UninstallAll, opts.DevEnabled)}, // waits for pod deletion. {"cluster not exist", kubernetesTestOnUninstall()}, {"crds exist on uninstall " + details.RuntimeVersion, CRDTest(details, opts)}, {"clusterroles not exist " + details.RuntimeVersion, ClusterRolesTest(details, opts)}, {"clusterrolebindings not exist " + details.RuntimeVersion, ClusterRoleBindingsTest(details, opts)}, - {"check components exist on uninstall " + details.RuntimeVersion, componentsTestOnUninstall(opts.UninstallAll)}, + {"check components exist on uninstall " + details.RuntimeVersion, componentsTestOnUninstall(opts)}, {"check httpendpoints exist on uninstall " + details.RuntimeVersion, httpEndpointsTestOnUninstall(opts)}, {"check mtls error " + details.RuntimeVersion, uninstallMTLSTest()}, {"check status error " + details.RuntimeVersion, statusTestOnUninstall()}, @@ -293,13 +298,19 @@ func ComponentsTestOnInstallUpgrade(opts TestOptions) func(t *testing.T) { output, err = spawn.Command("kubectl", "apply", "-f", "../testdata/statestore.yaml") t.Log(output) require.NoError(t, err, "expected no error on kubectl apply") - require.Equal(t, "component.dapr.io/statestore created\ncomponent.dapr.io/statestore created\n", output, "expceted output to match") + // if Dev install, statestore in default namespace will already be created as part of dev install once, so the above command output will be + // changed to statestore configured for the default namespace statestore. + if opts.DevEnabled { + require.Equal(t, "component.dapr.io/statestore configured\ncomponent.dapr.io/statestore created\n", output, "expceted output to match") + } else { + require.Equal(t, "component.dapr.io/statestore created\ncomponent.dapr.io/statestore created\n", output, "expceted output to match") + } } t.Log("check applied component exists") output, err := spawn.Command(daprPath, "components", "-k") require.NoError(t, err, "expected no error on calling dapr components") - componentOutputCheck(t, output, false) + componentOutputCheck(t, opts, output) } } @@ -747,6 +758,10 @@ func installTest(details VersionDetails, opts TestOptions) func(t *testing.T) { "-n", DaprTestNamespace, "--log-as-json", } + if opts.DevEnabled { + t.Log("install dev mode") + args = append(args, "--dev") + } if !details.UseDaprLatestVersion { // TODO: Pass dashboard-version also when charts are released. args = append(args, "--runtime-version", details.RuntimeVersion) @@ -776,10 +791,13 @@ func installTest(details VersionDetails, opts TestOptions) func(t *testing.T) { require.NoError(t, err, "init failed") validatePodsOnInstallUpgrade(t, details) + if opts.DevEnabled { + validateThirdpartyPodsOnInit(t) + } } } -func uninstallTest(all bool) func(t *testing.T) { +func uninstallTest(all bool, devEnabled bool) func(t *testing.T) { return func(t *testing.T) { output, err := EnsureUninstall(all) t.Log(output) @@ -792,11 +810,23 @@ func uninstallTest(all bool) func(t *testing.T) { go waitPodDeletion(t, done, podsDeleted) select { case <-podsDeleted: - t.Log("pods were deleted as expected on uninstall") - return + t.Log("dapr pods were deleted as expected on uninstall") case <-time.After(2 * time.Minute): done <- struct{}{} - t.Error("timeout verifying pods were deleted as expectedx") + t.Error("timeout verifying pods were deleted as expected") + return + } + if devEnabled { + t.Log("waiting for dapr dev pods to be deleted") + go waitPodDeletionDev(t, done, podsDeleted) + select { + case <-podsDeleted: + t.Log("dapr dev pods were deleted as expected on uninstall dev") + return + case <-time.After(2 * time.Minute): + done <- struct{}{} + t.Error("timeout verifying pods were deleted as expected") + } } } } @@ -823,7 +853,7 @@ func uninstallMTLSTest() func(t *testing.T) { } } -func componentsTestOnUninstall(all bool) func(t *testing.T) { +func componentsTestOnUninstall(opts TestOptions) func(t *testing.T) { return func(t *testing.T) { daprPath := GetDaprPath() // On Dapr uninstall CRDs are not removed, consequently the components will not be removed. @@ -831,10 +861,10 @@ func componentsTestOnUninstall(all bool) func(t *testing.T) { // For now the components remain. output, err := spawn.Command(daprPath, "components", "-k") require.NoError(t, err, "expected no error on calling dapr components") - componentOutputCheck(t, output, all) + componentOutputCheck(t, opts, output) // If --all, then the below does not need to run. - if all { + if opts.UninstallAll { output, err = spawn.Command("kubectl", "delete", "-f", "../testdata/namespace.yaml") require.NoError(t, err, "expected no error on kubectl delete") t.Log(output) @@ -898,29 +928,37 @@ func statusTestOnUninstall() func(t *testing.T) { } } -func componentOutputCheck(t *testing.T, output string, all bool) { +func componentOutputCheck(t *testing.T, opts TestOptions, output string) { output = strings.TrimSpace(output) // remove empty string. lines := strings.Split(output, "\n") for i, line := range lines { t.Logf("num:%d line:%+v", i, line) } - if all { + if opts.UninstallAll { assert.Equal(t, 2, len(lines), "expected at 0 components and 2 output lines") return } lines = strings.Split(output, "\n")[2:] // remove header and warning message. - assert.Equal(t, 2, len(lines), "expected 2 components") // default and test namespace components. + if opts.DevEnabled { + // default, test statestore. + // default pubsub. + // 3 components. + assert.Equal(t, 3, len(lines), "expected 3 components") + } else { + assert.Equal(t, 2, len(lines), "expected 2 components") // default and test namespace components. - // for fresh cluster only one component yaml has been applied. - testNsFields := strings.Fields(lines[0]) - defaultNsFields := strings.Fields(lines[1]) + // for fresh cluster only one component yaml has been applied. + testNsFields := strings.Fields(lines[0]) + defaultNsFields := strings.Fields(lines[1]) - // Fields splits on space, so Created time field might be split again. - namespaceComponentOutputCheck(t, testNsFields, "test") - namespaceComponentOutputCheck(t, defaultNsFields, "default") + // Fields splits on space, so Created time field might be split again. + // Scopes are only applied in for this scenario in tests. + namespaceComponentOutputCheck(t, testNsFields, "test") + namespaceComponentOutputCheck(t, defaultNsFields, "default") + } } func namespaceComponentOutputCheck(t *testing.T, fields []string, namespace string) { @@ -943,6 +981,41 @@ func httpEndpointOutputCheck(t *testing.T, output string) { assert.Contains(t, output, "httpendpoint") } +func validateThirdpartyPodsOnInit(t *testing.T) { + ctx := context.Background() + ctxt, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + k8sClient, err := getClient() + require.NoError(t, err) + list, err := k8sClient.CoreV1().Pods(thirdPartyDevNamespace).List(ctxt, v1.ListOptions{ + Limit: 100, + }) + require.NoError(t, err) + notFound := map[string]struct{}{ + devRedisReleaseName: {}, + devZipkinReleaseName: {}, + } + prefixes := map[string]string{ + devRedisReleaseName: "dapr-dev-redis-master-", + devZipkinReleaseName: "dapr-dev-zipkin-", + } + for _, pod := range list.Items { + t.Log(pod.ObjectMeta.Name) + for component, prefix := range prefixes { + if pod.Status.Phase != core_v1.PodRunning { + continue + } + if !pod.Status.ContainerStatuses[0].Ready { + continue + } + if strings.HasPrefix(pod.ObjectMeta.Name, prefix) { + delete(notFound, component) + } + } + } + assert.Empty(t, notFound) +} + func validatePodsOnInstallUpgrade(t *testing.T, details VersionDetails) { ctx := context.Background() ctxt, cancel := context.WithTimeout(ctx, 10*time.Second) @@ -1010,6 +1083,52 @@ func validatePodsOnInstallUpgrade(t *testing.T, details VersionDetails) { assert.Empty(t, notFound) } +func waitPodDeletionDev(t *testing.T, done, podsDeleted chan struct{}) { + for { + select { + case <-done: // if timeout was reached. + return + default: + break + } + ctx := context.Background() + ctxt, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + k8sClient, err := getClient() + require.NoError(t, err, "error getting k8s client for pods check") + list, err := k8sClient.CoreV1().Pods(thirdPartyDevNamespace).List(ctxt, v1.ListOptions{ + Limit: 100, + }) + require.NoError(t, err) + found := map[string]struct{}{ + devRedisReleaseName: {}, + devZipkinReleaseName: {}, + } + prefixes := map[string]string{ + devRedisReleaseName: "dapr-dev-redis-master-", + devZipkinReleaseName: "dapr-dev-zipkin-", + } + for _, pod := range list.Items { + t.Log(pod.ObjectMeta.Name) + for component, prefix := range prefixes { + if pod.Status.Phase != core_v1.PodRunning { + continue + } + if !pod.Status.ContainerStatuses[0].Ready { + continue + } + if strings.HasPrefix(pod.ObjectMeta.Name, prefix) { + delete(found, component) + } + } + } + if len(found) == 2 { + podsDeleted <- struct{}{} + } + time.Sleep(15 * time.Second) + } +} + func waitPodDeletion(t *testing.T, done, podsDeleted chan struct{}) { for { select { diff --git a/tests/e2e/kubernetes/kubernetes_test.go b/tests/e2e/kubernetes/kubernetes_test.go index fecd713b4..7952e6ad3 100644 --- a/tests/e2e/kubernetes/kubernetes_test.go +++ b/tests/e2e/kubernetes/kubernetes_test.go @@ -118,6 +118,40 @@ func TestKubernetesHAModeMTLSDisabled(t *testing.T) { } } +func TestKubernetesDev(t *testing.T) { + // ensure clean env for test + ensureCleanEnv(t, false) + + // setup tests + tests := []common.TestCase{} + tests = append(tests, common.GetTestsOnInstall(currentVersionDetails, common.TestOptions{ + DevEnabled: true, + HAEnabled: false, + MTLSEnabled: true, + ApplyComponentChanges: true, + CheckResourceExists: map[common.Resource]bool{ + common.CustomResourceDefs: true, + common.ClusterRoles: true, + common.ClusterRoleBindings: true, + }, + })...) + + tests = append(tests, common.GetTestsOnUninstall(currentVersionDetails, common.TestOptions{ + DevEnabled: true, + UninstallAll: true, + CheckResourceExists: map[common.Resource]bool{ + common.CustomResourceDefs: false, + common.ClusterRoles: false, + common.ClusterRoleBindings: false, + }, + })...) + + // execute tests + for _, tc := range tests { + t.Run(tc.Name, tc.Callable) + } +} + func TestKubernetesNonHAModeMTLSEnabled(t *testing.T) { // ensure clean env for test ensureCleanEnv(t, false)