From 3a6ef5a6ab633ba4bf80c40e96ce1f6ac31a51db Mon Sep 17 00:00:00 2001 From: Cory Parent Date: Fri, 7 Dec 2018 23:06:36 -0500 Subject: [PATCH] add webhook notification --- core/src/main/scala/Config.scala | 22 ++- .../scala/notifications/Notification.scala | 7 +- .../notifications/NotificationEvent.scala | 130 ++++++++++++++++++ .../src/main/scala/notifications/Notify.scala | 10 +- .../main/scala/notifications/WebHook.scala | 41 ++++++ .../main/scala/yaml/ManifestV1Parser.scala | 37 ++++- .../nelson/manifest.v1.everything.yml | 3 + core/src/test/scala/Fixtures.scala | 29 +++- core/src/test/scala/NelsonSuite.scala | 12 +- .../test/scala/yaml/ManifestYamlSpec.scala | 16 ++- .../hugo/content/documentation/manifest.md | 13 +- 11 files changed, 299 insertions(+), 21 deletions(-) create mode 100644 core/src/main/scala/notifications/NotificationEvent.scala create mode 100644 core/src/main/scala/notifications/WebHook.scala diff --git a/core/src/main/scala/Config.scala b/core/src/main/scala/Config.scala index 78a2497d..7f6cc1fe 100644 --- a/core/src/main/scala/Config.scala +++ b/core/src/main/scala/Config.scala @@ -23,7 +23,7 @@ import nelson.cleanup.ExpirationPolicy import nelson.docker.Docker import nelson.health.KubernetesHealthClient import nelson.logging.{WorkflowLogger,LoggingOp} -import nelson.notifications.{SlackHttp,SlackOp,EmailOp,EmailServer} +import nelson.notifications.{SlackHttp,SlackOp,EmailOp,EmailServer,WebHookHttp,WebHookOp} import nelson.scheduler.{KubernetesShell, SchedulerOp} import nelson.storage.StoreOp import nelson.vault._ @@ -39,7 +39,7 @@ import javax.net.ssl.SSLContext import journal.Logger -import org.http4s.Uri +import org.http4s.{Headers, Uri} import org.http4s.client.Client import org.http4s.client.blaze._ @@ -215,6 +215,16 @@ final case class EmailConfig( useSSL: Boolean = true ) +final case class WebHookSubscriberConfig( + endpoint: Uri, + headers: Headers, + params: List[(String, String)] +) + +final case class WebHookConfig( + subscribers: List[WebHookSubscriberConfig] +) + final case class CacheConfig( stackStatusCache: Cache[(String,String,String), DeploymentStatus] ) @@ -266,7 +276,8 @@ final case class Interpreters( git: Github.GithubOp ~> IO, storage: StoreOp ~> IO, slack: Option[SlackOp ~> IO], - email: Option[EmailOp ~> IO] + email: Option[EmailOp ~> IO], + webhook: Option[WebHookOp ~> IO] ) /** @@ -363,6 +374,8 @@ final case class NelsonConfig( lazy val email = interpreters.email + lazy val webhook = interpreters.webhook + lazy val auditor = new Auditor(auditQueue, git.systemUsername) // i've currently assigned these pretty arbitrary values @@ -449,6 +462,7 @@ object Config { httpClient <- http gitClient = new Github.GithubHttp(gitcfg, httpClient, timeout, pools.defaultExecutor) slackClient = readSlack(cfg.subconfig("nelson.slack")).map(new SlackHttp(_, httpClient)) + webhookClient = Option(new WebHookHttp(httpClient)) } yield { NelsonConfig( git = gitcfg, @@ -467,7 +481,7 @@ object Config { template = readTemplate(cfg), http = httpClient, pools = pools, - interpreters = Interpreters(gitClient,storage,slackClient,email), + interpreters = Interpreters(gitClient,storage,slackClient,email,webhookClient), workflowLogger = wflogger, bannedClients = readBannedClients(cfg.subconfig("nelson.banned-clients")), ui = readUI(cfg.subconfig("nelson.ui")), diff --git a/core/src/main/scala/notifications/Notification.scala b/core/src/main/scala/notifications/Notification.scala index 7784fbcc..c5592a01 100644 --- a/core/src/main/scala/notifications/Notification.scala +++ b/core/src/main/scala/notifications/Notification.scala @@ -17,14 +17,15 @@ package nelson package notifications +import org.http4s._ sealed trait NotificationSubscription final case class SlackSubscription(channel: SlackChannel) extends NotificationSubscription final case class EmailSubscription(recipient: EmailAddress) extends NotificationSubscription +final case class WebHookSubscription(uri: Uri, headers: Headers, params: List[(String, String)]) extends NotificationSubscription -final case class NotificationSubscriptions(slack: List[SlackSubscription], email: List[EmailSubscription]) +final case class NotificationSubscriptions(slack: List[SlackSubscription], email: List[EmailSubscription], webhook: List[WebHookSubscription]) object NotificationSubscriptions { - val empty: NotificationSubscriptions = NotificationSubscriptions(Nil,Nil) + val empty: NotificationSubscriptions = NotificationSubscriptions(Nil,Nil,Nil) } - diff --git a/core/src/main/scala/notifications/NotificationEvent.scala b/core/src/main/scala/notifications/NotificationEvent.scala new file mode 100644 index 00000000..1d68b059 --- /dev/null +++ b/core/src/main/scala/notifications/NotificationEvent.scala @@ -0,0 +1,130 @@ +package nelson +package notifications + +import Manifest._ +import argonaut._, Argonaut._ + +sealed trait NotificationEvent + +final case class DeployEvent( + unit: UnitDef, + actionConfig: ActionConfig +) extends NotificationEvent + +final case class DecommissionEvent( + d: Datacenter.Deployment +) extends NotificationEvent + +object NotificationEvent { + + def deploy(unit: UnitDef @@ Versioned, actionConfig: ActionConfig): DeployEvent = { + DeployEvent(Versioned.unwrap(unit), actionConfig) + } + + def decommission(d: Datacenter.Deployment): DecommissionEvent = DecommissionEvent(d) + + /** + * Encodes the final web hook payload by matching on the event class. + * + * The notification model here is influenced heavily by the Github event model, wherein + * a causal event is a name as well as an event class, the latter of which dictates its + * actual structure. This provides greater flexibility for the consumer of the payload + * who can then react contextually, e.g. act on initial deploy but not redeploy. + * + * This behavior is not currently type-encoded but should be if the number of event types is expanded. + */ + implicit def encodeEventDetail: EncodeJson[NotificationEvent] = EncodeJson { + case d: DeployEvent => + ("action" := "deploy") ->: + ("deployed" := encodeDeploy(d)) ->: + jEmptyObject + case d: DecommissionEvent => + ("action" := "decommission") ->: + ("decommissioned" := encodeDecommission(d)) ->: + jEmptyObject + } + + implicit def encodeDeploy: EncodeJson[DeployEvent] = EncodeJson { ev => + + val ns = ev.actionConfig.namespace + val dc = ev.actionConfig.datacenter + val plan = ev.actionConfig.plan + val env = plan.environment + val unit = ev.unit + + ("namespace" := ns.name.asString) ->: + ("datacenter" := dc.name) ->: + ("plan" := + ("name" := plan.name) ->: + ("schedule" := env.schedule.flatMap(_.toCron())) ->: + ("health_checks" := env.healthChecks.map(mkHealthCheck)) ->: + ("constraints" := env.constraints.map(_.fieldName)) ->: + ("bindings" := env.bindings.map(b => b.name -> b.value)) ->: + ("resources" := env.resources.mapValues(_.toString)) ->: jEmptyObject + ) ->: + ("unit" := + ("name" := unit.name) ->: + ("description" := unit.description) ->: + ("artifact" := unit.deployable.map(mkDeployable)) ->: + ("ports" := unit.ports.map(mkPorts)) ->: + ("dependencies" := unit.dependencies.mapValues(_.toString)) ->: + ("resources" := unit.resources.map(_.name)) ->: jEmptyObject + ) ->: + jEmptyObject + } + + implicit def encodeDecommission: EncodeJson[DecommissionEvent] = EncodeJson { ev => + val deployment = ev.d + val unit = deployment.unit + + ("namespace" := deployment.namespace.name.asString) ->: + ("datacenter" := deployment.namespace.datacenter) ->: + ("deployment" := + ("id" := deployment.id) ->: + ("hash" := deployment.hash) ->: + ("guid" := deployment.guid) ->: + ("plan" := deployment.plan) ->: + ("stack" := deployment.stackName.toString) ->: + ("deploy_time" := deployment.deployTime.toString) ->: + ("workflow" := deployment.workflow) ->: jEmptyObject + ) ->: + ("unit" := + ("name" := unit.name) ->: + ("description" := unit.description) ->: + ("ports" := unit.ports.map(mkPort)) ->: + ("dependencies" := unit.dependencies.map(sn => (sn.serviceType, sn.version.toString))) ->: + ("resources" := unit.resources) ->: jEmptyObject + ) ->: + jEmptyObject + } + + private def mkDeployable(d: Manifest.Deployable) = + ("name" := d.name) ->: + ("version" := d.version.toString) ->: + ("deployable" := (d.output match { case Deployable.Container(i) => i })) ->: + jEmptyObject + + private def mkHealthCheck(h: Manifest.HealthCheck) = + ("name" := h.name) ->: + ("path" := h.path) ->: + ("port" := h.portRef) ->: + ("protocol" := h.protocol) ->: + ("interval" := h.interval.toMillis) ->: + ("timeout" := h.timeout.toMillis) ->: + jEmptyObject + + private def mkPort(p: Manifest.Port) = + ("default" := p.isDefault) ->: + ("ref" := p.ref) ->: + ("port" := p.port) ->: + ("protocol" := p.protocol) ->: + jEmptyObject + + private def mkPort(p: Datacenter.Port) = + ("ref" := p.name) ->: + ("port" := p.port) ->: + ("protocol" := p.protocol) ->: + jEmptyObject + + private def mkPorts(ps: Ports) = ps.nel.map(mkPort).toList +} diff --git a/core/src/main/scala/notifications/Notify.scala b/core/src/main/scala/notifications/Notify.scala index 275e472d..7a6891c4 100644 --- a/core/src/main/scala/notifications/Notify.scala +++ b/core/src/main/scala/notifications/Notify.scala @@ -48,7 +48,8 @@ object Notify { val msg = deployedTemplate(actionConfig.datacenter.name,actionConfig.namespace.name,sn) val sub = s"Deployed $sn in ${actionConfig.datacenter.name} ${actionConfig.namespace.name.asString}" sendSlack(actionConfig.notifications.slack.map(_.channel), msg)(cfg.slack) productR - sendEmail(actionConfig.notifications.email.map(_.recipient), sub, msg)(cfg.email) + sendEmail(actionConfig.notifications.email.map(_.recipient), sub, msg)(cfg.email) productR + sendWebHooks(actionConfig.notifications.webhook, NotificationEvent.deploy(unit, actionConfig))(cfg.webhook) } def sendDecommissionedNotifications(dc: Datacenter, ns: Namespace, d: Datacenter.Deployment)(cfg: NelsonConfig): IO[Unit] = { @@ -78,6 +79,7 @@ object Notify { n <- fetchNotifications(d) _ <- sendSlack(n.slack.map(_.channel), msg)(cfg.slack) _ <- sendEmail(n.email.map(_.recipient), subject, msg)(cfg.email) + _ <- sendWebHooks(n.webhook, NotificationEvent.decommission(d))(cfg.webhook) } yield () } @@ -93,6 +95,12 @@ object Notify { SlackOp.send(cs, msg).foldMap(interp) } + private def sendWebHooks(ss: List[WebHookSubscription], ev: NotificationEvent)(i: Option[WebHookOp ~> IO]): IO[Unit] = + if (ss.isEmpty) IO.unit + else i.fold(log(s"webhook notification was not sent because webhooks are not configured")) { interp => + WebHookOp.send(ss, ev).foldMap(interp) + } + private val logger = Logger[Notify.type] private def log(msg: String): IO[Unit] = diff --git a/core/src/main/scala/notifications/WebHook.scala b/core/src/main/scala/notifications/WebHook.scala new file mode 100644 index 00000000..1cd36e26 --- /dev/null +++ b/core/src/main/scala/notifications/WebHook.scala @@ -0,0 +1,41 @@ +package nelson +package notifications + +import cats.~> +import cats.effect.IO +import cats.free.Free +import cats.implicits._ + +import org.http4s.Method.POST +import org.http4s.Request +import org.http4s.argonaut._ +import org.http4s.client.Client + +sealed abstract class WebHookOp[A] extends Product with Serializable + +object WebHookOp { + + type WebHookOpF[A] = Free[WebHookOp, A] + + final case class SendWebHookNotification(subscribers: List[WebHookSubscription], ev: NotificationEvent) extends WebHookOp[Unit] + + def send(subscribers: List[WebHookSubscription], ev: NotificationEvent): WebHookOpF[Unit] = + Free.liftF(SendWebHookNotification(subscribers, ev)) +} + +final class WebHookHttp(client: Client[IO]) extends (WebHookOp ~> IO) { + import argonaut._, Argonaut._ + import WebHookOp._ + + def apply[A](op: WebHookOp[A]): IO[A] = op match { + case SendWebHookNotification(subscriptions, ev) => + subscriptions.traverse_(s => send(s, ev)) + } + + def send(s: WebHookSubscription, ev: NotificationEvent): IO[Unit] = + client.expect[String](Request[IO]( + method = POST, + uri = s.params.foldLeft(s.uri)((u, p) => u.withQueryParam(p._1, p._2)), + headers = s.headers) + .withBody(ev.asJson)).void +} diff --git a/core/src/main/scala/yaml/ManifestV1Parser.scala b/core/src/main/scala/yaml/ManifestV1Parser.scala index e2861a01..06cacad5 100644 --- a/core/src/main/scala/yaml/ManifestV1Parser.scala +++ b/core/src/main/scala/yaml/ManifestV1Parser.scala @@ -30,7 +30,7 @@ import cats.implicits._ import java.net.URI import java.nio.file.Paths -import java.util.{ArrayList => JList} +import java.util.{ArrayList => JList, HashMap => JMap} import scala.beans.BeanProperty import scala.collection.JavaConverters._ @@ -326,7 +326,8 @@ object ManifestV1Parser { def parseNotifications(raw: NotificationYaml): YamlValidation[NotificationSubscriptions] = { val slack = parseSlackNotifications(raw.slack).map(_.map(SlackSubscription(_))) val email = parseEmailNotifications(raw.email).map(_.map(EmailSubscription(_))) - (slack, email).mapN(NotificationSubscriptions.apply) + val webhook = parseWebHookNotifications(raw.webhook).map(_.map(sc => WebHookSubscription(sc.endpoint, sc.headers, sc.params))) + (slack, email, webhook).mapN(NotificationSubscriptions.apply) } private[this] val validEmail = """(?i)\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,4}\b""".r @@ -341,6 +342,27 @@ object ManifestV1Parser { def parseSlackNotifications(raw: NotificationSlackYaml): YamlValidation[List[String]] = raw.channels.asScala.toList.validNel + def parseWebHookConfiguration(raw: WebHookConfigurationYaml): YamlValidation[WebHookSubscriberConfig] = { + import org.http4s._ + + def parseWebHookUri(str: String): YamlValidation[Uri] = + Uri.fromString(str).leftMap(_ => invalidURI(str)).toValidatedNel + + def parseHeaders(hs: collection.mutable.Map[String, String]): YamlValidation[Headers] = + Headers(hs.map(h => Header(h._1, h._2)).toList).validNel + + def parseParams(ps: collection.mutable.Map[String, String]): YamlValidation[List[(String, String)]] = + ps.toList.validNel + + (parseWebHookUri(raw.uri), + parseHeaders(raw.headers.asScala), + parseParams(raw.params.asScala)) + .mapN((u, hs, ps) => WebHookSubscriberConfig(u, hs, ps)) + } + + def parseWebHookNotifications(raw: NotificationWebHookYaml): YamlValidation[List[WebHookSubscriberConfig]] = + raw.subscribers.asScala.toList.traverse(parseWebHookConfiguration) + def parseAlerting(unitName: UnitName, rawAlerting: AlertingYaml): YamlValidation[Alerting] = parsePrometheusAlerting(unitName, rawAlerting.prometheus).map(Alerting.apply) @@ -634,6 +656,7 @@ class PrometheusRuleYaml { class NotificationYaml { @BeanProperty var slack: NotificationSlackYaml = new NotificationSlackYaml @BeanProperty var email: NotificationEmailYaml = new NotificationEmailYaml + @BeanProperty var webhook: NotificationWebHookYaml = new NotificationWebHookYaml } class NotificationSlackYaml { @@ -643,3 +666,13 @@ class NotificationSlackYaml { class NotificationEmailYaml { @BeanProperty var recipients: JList[String] = new java.util.ArrayList } + +class WebHookConfigurationYaml { + @BeanProperty var uri: String = _ + @BeanProperty var headers: JMap[String, String] = new java.util.HashMap + @BeanProperty var params: JMap[String, String] = new java.util.HashMap +} + +class NotificationWebHookYaml { + @BeanProperty var subscribers: JList[WebHookConfigurationYaml] = new java.util.ArrayList +} diff --git a/core/src/test/resources/nelson/manifest.v1.everything.yml b/core/src/test/resources/nelson/manifest.v1.everything.yml index 39037fb2..a14c25e8 100644 --- a/core/src/test/resources/nelson/manifest.v1.everything.yml +++ b/core/src/test/resources/nelson/manifest.v1.everything.yml @@ -197,3 +197,6 @@ notifications: channels: - development - general + webhook: + subscribers: + - uri: https://localhost:80/ diff --git a/core/src/test/scala/Fixtures.scala b/core/src/test/scala/Fixtures.scala index a4d87fb2..c6e88668 100644 --- a/core/src/test/scala/Fixtures.scala +++ b/core/src/test/scala/Fixtures.scala @@ -46,6 +46,7 @@ object Fixtures { implicit lazy val arbGithubRelease: Arbitrary[Github.Release] = Arbitrary(genGithubRelease) implicit lazy val arbSlackSub: Arbitrary[notifications.SlackSubscription] = Arbitrary(genSlackSubscription) implicit lazy val arbEmailSub: Arbitrary[notifications.EmailSubscription] = Arbitrary(genEmailSubscription) + implicit lazy val arbWebHookSub: Arbitrary[notifications.WebHookSubscription] = Arbitrary(genWebHookSubscription) implicit lazy val arbRegex: Arbitrary[Regex] = Arbitrary(genRegex) implicit lazy val arbDeployment: Arbitrary[Datacenter.Deployment] = Arbitrary(genDeployment) implicit lazy val arbTrafficShiftPolicy: Arbitrary[TrafficShiftPolicy] = Arbitrary(genTrafficShiftPolicy) @@ -281,11 +282,33 @@ object Fixtures { a <- arbitrary[String] } yield notifications.EmailSubscription(a) - val getNotifications: Gen[notifications.NotificationSubscriptions] = + val genWebHookSubscription: Gen[notifications.WebHookSubscription] = { + import org.http4s.{Header, Headers, Uri} + + type StringPair = (String, String) + + val genUri: Gen[Option[Uri]] = + for { + s <- Gen.oneOf("http", "https") + h <- Gen.identifier + p <- Gen.choose(80, 1024) + sc <- Gen.choose(0, 10) + ss <- Gen.listOfN(sc, Gen.identifier) + } yield Uri.fromString(s"$s://$h:$p/${ss.mkString("/")}").toOption + + for { + a <- genUri suchThat (_.nonEmpty) + b <- Gen.listOfN(5, arbitrary[StringPair]).map(ps => Headers(ps.map(h => Header(h._1, h._2)))) + c <- Gen.listOfN(5, arbitrary[StringPair]) + } yield notifications.WebHookSubscription(a.get, b, c) + } + + val genNotifications: Gen[notifications.NotificationSubscriptions] = for { a <- arbitrary[List[notifications.SlackSubscription]] b <- arbitrary[List[notifications.EmailSubscription]] - } yield notifications.NotificationSubscriptions(a,b) + c <- arbitrary[List[notifications.WebHookSubscription]] + } yield notifications.NotificationSubscriptions(a,b,c) val genPlan: Gen[Manifest.Plan] = for { @@ -304,7 +327,7 @@ object Fixtures { b <- Gen.listOfN(2, genManifestUnitDef) c <- Gen.listOfN(2, genManifestUnitDef) d <- Gen.listOfN(4, alphaNumStr) - e <- getNotifications + e <- genNotifications f <- genManifestDeployable h <- genPlan i <- Gen.listOfN(2, genLoadbalancer) diff --git a/core/src/test/scala/NelsonSuite.scala b/core/src/test/scala/NelsonSuite.scala index 9d959679..a7842c7c 100644 --- a/core/src/test/scala/NelsonSuite.scala +++ b/core/src/test/scala/NelsonSuite.scala @@ -20,7 +20,7 @@ import cats.~> import cats.effect.IO import cats.implicits._ -import nelson.notifications.{SlackOp,EmailOp} +import nelson.notifications.{SlackOp,EmailOp,WebHookOp} import doobie._ import doobie.implicits._ @@ -123,6 +123,14 @@ trait NelsonSuite } } + lazy val testWebHook: WebHookOp ~> IO = new (WebHookOp ~> IO) { + import WebHookOp._ + def apply[A](op: WebHookOp[A]): IO[A] = op match { + case SendWebHookNotification(subscribers, ev) => + IO.unit + } + } + import docker._ lazy val testDocker = new (DockerOp ~> IO) { def apply[A](op: DockerOp[A]) = op match { @@ -190,7 +198,7 @@ trait NelsonSuite database = dbConfig, // let each suite get its own h2 dockercfg = DockerConfig(sys.env.getOrElse("DOCKER_HOST", "unix:///var/run/docker.sock"), true), datacenters = List(datacenter(testName).copy(interpreters = testInterpreters)), - interpreters = Interpreters(GitFixtures.interpreter,stg,Some(testSlack),Some(testEmail)) + interpreters = Interpreters(GitFixtures.interpreter,stg,Some(testSlack),Some(testEmail),Some(testWebHook)) ) } diff --git a/core/src/test/scala/yaml/ManifestYamlSpec.scala b/core/src/test/scala/yaml/ManifestYamlSpec.scala index b0b0fa5d..6e5c5e64 100644 --- a/core/src/test/scala/yaml/ManifestYamlSpec.scala +++ b/core/src/test/scala/yaml/ManifestYamlSpec.scala @@ -17,12 +17,16 @@ package nelson package yaml -import java.nio.file.Paths +import cats.instances.either._ +import cats.syntax.foldable._ + +import org.http4s.Uri.uri import org.scalatest.{FlatSpec,Matchers} + +import java.nio.file.Paths + import scala.concurrent.duration._ -import cats.instances.either._ -import cats.syntax.foldable._ class ManifestYamlSpec extends FlatSpec with Matchers with SnakeCharmer { import Manifest._ @@ -152,7 +156,11 @@ class ManifestYamlSpec extends FlatSpec with Matchers with SnakeCharmer { )), notifications = NotificationSubscriptions( List(SlackSubscription("development"),SlackSubscription("general")), - List(EmailSubscription("baxter@example.com"))), + List(EmailSubscription("baxter@example.com")), + List(WebHookSubscription( + uri = uri("https://localhost:80/"), + headers = org.http4s.Headers.empty, + params = List.empty))), loadbalancers = List( Loadbalancer("howdy-lb", Vector(Route(Port("default",8444,"http"), BackendDestination("foobar", "default")), diff --git a/docs/src/hugo/content/documentation/manifest.md b/docs/src/hugo/content/documentation/manifest.md index b04977b5..fc426827 100644 --- a/docs/src/hugo/content/documentation/manifest.md +++ b/docs/src/hugo/content/documentation/manifest.md @@ -343,12 +343,12 @@ eventually cleaned up by nelson. No other intervention is needed by the user aft Notifications -Nelson can notify you about your deployment results via slack and/or email. Notifications are sent for a deployment when: +Nelson can notify you about your deployment results via slack, email and/or generic webhook. Notifications are sent for a deployment when: * a deployment has successfully completed or failed * a deployment has been decommissioned -The following is a simple example that configure both email and slack notifications: +The following is a simple example that configures email, slack, and webhook notifications: ``` @@ -361,6 +361,15 @@ notifications: channels: - infrastructure - devops + webhook: + subscribers: + - uri: https://fakehooks.com/_foo + - uri: https://fakehooks.com/_bar + headers: + Authorization: "token 75b413aed31d0f6b964af9735d2b8d0d" + - uri: https://fakehooks.com/_bar + params: + auth_key: 75b413aed31d0f6b964af9735d2b8d0d ```

Units