Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions core/src/main/scala/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import nelson.cleanup.ExpirationPolicy
import nelson.docker.Docker
import nelson.health.KubernetesHealthClient
import nelson.logging.{WorkflowLogger,LoggingOp}
import nelson.notifications.{SlackHttp,SlackOp,EmailOp,EmailServer}
import nelson.notifications.{SlackHttp,SlackOp,EmailOp,EmailServer,WebHookHttp,WebHookOp}
import nelson.scheduler.{KubernetesShell, SchedulerOp}
import nelson.storage.StoreOp
import nelson.vault._
Expand All @@ -39,7 +39,7 @@ import javax.net.ssl.SSLContext

import journal.Logger

import org.http4s.Uri
import org.http4s.{Headers, Uri}
import org.http4s.client.Client
import org.http4s.client.blaze._

Expand Down Expand Up @@ -215,6 +215,16 @@ final case class EmailConfig(
useSSL: Boolean = true
)

final case class WebHookSubscriberConfig(
endpoint: Uri,
headers: Headers,
params: List[(String, String)]
)

final case class WebHookConfig(
subscribers: List[WebHookSubscriberConfig]
)

final case class CacheConfig(
stackStatusCache: Cache[(String,String,String), DeploymentStatus]
)
Expand Down Expand Up @@ -266,7 +276,8 @@ final case class Interpreters(
git: Github.GithubOp ~> IO,
storage: StoreOp ~> IO,
slack: Option[SlackOp ~> IO],
email: Option[EmailOp ~> IO]
email: Option[EmailOp ~> IO],
webhook: Option[WebHookOp ~> IO]
)

/**
Expand Down Expand Up @@ -363,6 +374,8 @@ final case class NelsonConfig(

lazy val email = interpreters.email

lazy val webhook = interpreters.webhook

lazy val auditor = new Auditor(auditQueue, git.systemUsername)

// i've currently assigned these pretty arbitrary values
Expand Down Expand Up @@ -449,6 +462,7 @@ object Config {
httpClient <- http
gitClient = new Github.GithubHttp(gitcfg, httpClient, timeout, pools.defaultExecutor)
slackClient = readSlack(cfg.subconfig("nelson.slack")).map(new SlackHttp(_, httpClient))
webhookClient = Option(new WebHookHttp(httpClient))
} yield {
NelsonConfig(
git = gitcfg,
Expand All @@ -467,7 +481,7 @@ object Config {
template = readTemplate(cfg),
http = httpClient,
pools = pools,
interpreters = Interpreters(gitClient,storage,slackClient,email),
interpreters = Interpreters(gitClient,storage,slackClient,email,webhookClient),
workflowLogger = wflogger,
bannedClients = readBannedClients(cfg.subconfig("nelson.banned-clients")),
ui = readUI(cfg.subconfig("nelson.ui")),
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/notifications/Notification.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
package nelson
package notifications

import org.http4s._

sealed trait NotificationSubscription
final case class SlackSubscription(channel: SlackChannel) extends NotificationSubscription
final case class EmailSubscription(recipient: EmailAddress) extends NotificationSubscription
final case class WebHookSubscription(uri: Uri, headers: Headers, params: List[(String, String)]) extends NotificationSubscription

final case class NotificationSubscriptions(slack: List[SlackSubscription], email: List[EmailSubscription])
final case class NotificationSubscriptions(slack: List[SlackSubscription], email: List[EmailSubscription], webhook: List[WebHookSubscription])

object NotificationSubscriptions {
val empty: NotificationSubscriptions = NotificationSubscriptions(Nil,Nil)
val empty: NotificationSubscriptions = NotificationSubscriptions(Nil,Nil,Nil)
}

130 changes: 130 additions & 0 deletions core/src/main/scala/notifications/NotificationEvent.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package nelson
package notifications

import Manifest._
import argonaut._, Argonaut._

sealed trait NotificationEvent

final case class DeployEvent(
unit: UnitDef,
actionConfig: ActionConfig
) extends NotificationEvent

final case class DecommissionEvent(
d: Datacenter.Deployment
) extends NotificationEvent

object NotificationEvent {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it looks like for webhooks you're sending big deploy and destroy blobs to the endpoint - I assume the intention is for (presumably custom) endpoints to then take that blob and do something useful with it? Put differently, this isn't specific to any specific kind of webhook for some service right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Precisely, the goal is to provide real-time context for events occurring within Nelson to any desired consumer.


def deploy(unit: UnitDef @@ Versioned, actionConfig: ActionConfig): DeployEvent = {
DeployEvent(Versioned.unwrap(unit), actionConfig)
}

def decommission(d: Datacenter.Deployment): DecommissionEvent = DecommissionEvent(d)

/**
* Encodes the final web hook payload by matching on the event class.
*
* The notification model here is influenced heavily by the Github event model, wherein
* a causal event is a name as well as an event class, the latter of which dictates its
* actual structure. This provides greater flexibility for the consumer of the payload
* who can then react contextually, e.g. act on initial deploy but not redeploy.
*
* This behavior is not currently type-encoded but should be if the number of event types is expanded.
*/
implicit def encodeEventDetail: EncodeJson[NotificationEvent] = EncodeJson {
case d: DeployEvent =>
("action" := "deploy") ->:
("deployed" := encodeDeploy(d)) ->:
jEmptyObject
case d: DecommissionEvent =>
("action" := "decommission") ->:
("decommissioned" := encodeDecommission(d)) ->:
jEmptyObject
}

implicit def encodeDeploy: EncodeJson[DeployEvent] = EncodeJson { ev =>

val ns = ev.actionConfig.namespace
val dc = ev.actionConfig.datacenter
val plan = ev.actionConfig.plan
val env = plan.environment
val unit = ev.unit

("namespace" := ns.name.asString) ->:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More a note for the future than a review comment, since we're moving to Protobuf (#159) we may want these as protocols as well, esp. since this is quite a bit of information. But fine for now :)

("datacenter" := dc.name) ->:
("plan" :=
("name" := plan.name) ->:
("schedule" := env.schedule.flatMap(_.toCron())) ->:
("health_checks" := env.healthChecks.map(mkHealthCheck)) ->:
("constraints" := env.constraints.map(_.fieldName)) ->:
("bindings" := env.bindings.map(b => b.name -> b.value)) ->:
("resources" := env.resources.mapValues(_.toString)) ->: jEmptyObject
) ->:
("unit" :=
("name" := unit.name) ->:
("description" := unit.description) ->:
("artifact" := unit.deployable.map(mkDeployable)) ->:
("ports" := unit.ports.map(mkPorts)) ->:
("dependencies" := unit.dependencies.mapValues(_.toString)) ->:
("resources" := unit.resources.map(_.name)) ->: jEmptyObject
) ->:
jEmptyObject
}

implicit def encodeDecommission: EncodeJson[DecommissionEvent] = EncodeJson { ev =>
val deployment = ev.d
val unit = deployment.unit

("namespace" := deployment.namespace.name.asString) ->:
("datacenter" := deployment.namespace.datacenter) ->:
("deployment" :=
("id" := deployment.id) ->:
("hash" := deployment.hash) ->:
("guid" := deployment.guid) ->:
("plan" := deployment.plan) ->:
("stack" := deployment.stackName.toString) ->:
("deploy_time" := deployment.deployTime.toString) ->:
("workflow" := deployment.workflow) ->: jEmptyObject
) ->:
("unit" :=
("name" := unit.name) ->:
("description" := unit.description) ->:
("ports" := unit.ports.map(mkPort)) ->:
("dependencies" := unit.dependencies.map(sn => (sn.serviceType, sn.version.toString))) ->:
("resources" := unit.resources) ->: jEmptyObject
) ->:
jEmptyObject
}

private def mkDeployable(d: Manifest.Deployable) =
("name" := d.name) ->:
("version" := d.version.toString) ->:
("deployable" := (d.output match { case Deployable.Container(i) => i })) ->:
jEmptyObject

private def mkHealthCheck(h: Manifest.HealthCheck) =
("name" := h.name) ->:
("path" := h.path) ->:
("port" := h.portRef) ->:
("protocol" := h.protocol) ->:
("interval" := h.interval.toMillis) ->:
("timeout" := h.timeout.toMillis) ->:
jEmptyObject

private def mkPort(p: Manifest.Port) =
("default" := p.isDefault) ->:
("ref" := p.ref) ->:
("port" := p.port) ->:
("protocol" := p.protocol) ->:
jEmptyObject

private def mkPort(p: Datacenter.Port) =
("ref" := p.name) ->:
("port" := p.port) ->:
("protocol" := p.protocol) ->:
jEmptyObject

private def mkPorts(ps: Ports) = ps.nel.map(mkPort).toList
}
10 changes: 9 additions & 1 deletion core/src/main/scala/notifications/Notify.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ object Notify {
val msg = deployedTemplate(actionConfig.datacenter.name,actionConfig.namespace.name,sn)
val sub = s"Deployed $sn in ${actionConfig.datacenter.name} ${actionConfig.namespace.name.asString}"
sendSlack(actionConfig.notifications.slack.map(_.channel), msg)(cfg.slack) productR
sendEmail(actionConfig.notifications.email.map(_.recipient), sub, msg)(cfg.email)
sendEmail(actionConfig.notifications.email.map(_.recipient), sub, msg)(cfg.email) productR
sendWebHooks(actionConfig.notifications.webhook, NotificationEvent.deploy(unit, actionConfig))(cfg.webhook)
}

def sendDecommissionedNotifications(dc: Datacenter, ns: Namespace, d: Datacenter.Deployment)(cfg: NelsonConfig): IO[Unit] = {
Expand Down Expand Up @@ -78,6 +79,7 @@ object Notify {
n <- fetchNotifications(d)
_ <- sendSlack(n.slack.map(_.channel), msg)(cfg.slack)
_ <- sendEmail(n.email.map(_.recipient), subject, msg)(cfg.email)
_ <- sendWebHooks(n.webhook, NotificationEvent.decommission(d))(cfg.webhook)
} yield ()
}

Expand All @@ -93,6 +95,12 @@ object Notify {
SlackOp.send(cs, msg).foldMap(interp)
}

private def sendWebHooks(ss: List[WebHookSubscription], ev: NotificationEvent)(i: Option[WebHookOp ~> IO]): IO[Unit] =
if (ss.isEmpty) IO.unit
else i.fold(log(s"webhook notification was not sent because webhooks are not configured")) { interp =>
WebHookOp.send(ss, ev).foldMap(interp)
}

private val logger = Logger[Notify.type]

private def log(msg: String): IO[Unit] =
Expand Down
41 changes: 41 additions & 0 deletions core/src/main/scala/notifications/WebHook.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package nelson
package notifications

import cats.~>
import cats.effect.IO
import cats.free.Free
import cats.implicits._

import org.http4s.Method.POST
import org.http4s.Request
import org.http4s.argonaut._
import org.http4s.client.Client

sealed abstract class WebHookOp[A] extends Product with Serializable

object WebHookOp {

type WebHookOpF[A] = Free[WebHookOp, A]

final case class SendWebHookNotification(subscribers: List[WebHookSubscription], ev: NotificationEvent) extends WebHookOp[Unit]

def send(subscribers: List[WebHookSubscription], ev: NotificationEvent): WebHookOpF[Unit] =
Free.liftF(SendWebHookNotification(subscribers, ev))
}

final class WebHookHttp(client: Client[IO]) extends (WebHookOp ~> IO) {
import argonaut._, Argonaut._
import WebHookOp._

def apply[A](op: WebHookOp[A]): IO[A] = op match {
case SendWebHookNotification(subscriptions, ev) =>
subscriptions.traverse_(s => send(s, ev))
}

def send(s: WebHookSubscription, ev: NotificationEvent): IO[Unit] =
client.expect[String](Request[IO](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double checking, does client.expect[String] accept any response, or specifically only strings (e.g. if the webhook sent back a JSON blob will this fail? I seem to vaguely remember some edge cases here but maybe I'm crazy. Is there a way to make this fire-and-forget or something, if that is desirable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great point, this should be fire and forget. We really shouldn't care about the response body, only the response code and that should only drive different logging behaviors imo.

method = POST,
uri = s.params.foldLeft(s.uri)((u, p) => u.withQueryParam(p._1, p._2)),
headers = s.headers)
.withBody(ev.asJson)).void
}
37 changes: 35 additions & 2 deletions core/src/main/scala/yaml/ManifestV1Parser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import cats.implicits._

import java.net.URI
import java.nio.file.Paths
import java.util.{ArrayList => JList}
import java.util.{ArrayList => JList, HashMap => JMap}

import scala.beans.BeanProperty
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -326,7 +326,8 @@ object ManifestV1Parser {
def parseNotifications(raw: NotificationYaml): YamlValidation[NotificationSubscriptions] = {
val slack = parseSlackNotifications(raw.slack).map(_.map(SlackSubscription(_)))
val email = parseEmailNotifications(raw.email).map(_.map(EmailSubscription(_)))
(slack, email).mapN(NotificationSubscriptions.apply)
val webhook = parseWebHookNotifications(raw.webhook).map(_.map(sc => WebHookSubscription(sc.endpoint, sc.headers, sc.params)))
(slack, email, webhook).mapN(NotificationSubscriptions.apply)
}

private[this] val validEmail = """(?i)\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,4}\b""".r
Expand All @@ -341,6 +342,27 @@ object ManifestV1Parser {
def parseSlackNotifications(raw: NotificationSlackYaml): YamlValidation[List[String]] =
raw.channels.asScala.toList.validNel

def parseWebHookConfiguration(raw: WebHookConfigurationYaml): YamlValidation[WebHookSubscriberConfig] = {
import org.http4s._

def parseWebHookUri(str: String): YamlValidation[Uri] =
Uri.fromString(str).leftMap(_ => invalidURI(str)).toValidatedNel

def parseHeaders(hs: collection.mutable.Map[String, String]): YamlValidation[Headers] =
Headers(hs.map(h => Header(h._1, h._2)).toList).validNel

def parseParams(ps: collection.mutable.Map[String, String]): YamlValidation[List[(String, String)]] =
ps.toList.validNel

(parseWebHookUri(raw.uri),
parseHeaders(raw.headers.asScala),
parseParams(raw.params.asScala))
.mapN((u, hs, ps) => WebHookSubscriberConfig(u, hs, ps))
}

def parseWebHookNotifications(raw: NotificationWebHookYaml): YamlValidation[List[WebHookSubscriberConfig]] =
raw.subscribers.asScala.toList.traverse(parseWebHookConfiguration)

def parseAlerting(unitName: UnitName, rawAlerting: AlertingYaml): YamlValidation[Alerting] =
parsePrometheusAlerting(unitName, rawAlerting.prometheus).map(Alerting.apply)

Expand Down Expand Up @@ -634,6 +656,7 @@ class PrometheusRuleYaml {
class NotificationYaml {
@BeanProperty var slack: NotificationSlackYaml = new NotificationSlackYaml
@BeanProperty var email: NotificationEmailYaml = new NotificationEmailYaml
@BeanProperty var webhook: NotificationWebHookYaml = new NotificationWebHookYaml
}

class NotificationSlackYaml {
Expand All @@ -643,3 +666,13 @@ class NotificationSlackYaml {
class NotificationEmailYaml {
@BeanProperty var recipients: JList[String] = new java.util.ArrayList
}

class WebHookConfigurationYaml {
@BeanProperty var uri: String = _
@BeanProperty var headers: JMap[String, String] = new java.util.HashMap
@BeanProperty var params: JMap[String, String] = new java.util.HashMap
}

class NotificationWebHookYaml {
@BeanProperty var subscribers: JList[WebHookConfigurationYaml] = new java.util.ArrayList
}
3 changes: 3 additions & 0 deletions core/src/test/resources/nelson/manifest.v1.everything.yml
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,6 @@ notifications:
channels:
- development
- general
webhook:
subscribers:
- uri: https://localhost:80/
Loading