Skip to content

Commit

Permalink
Migrate to cats effect fs2 3 (#179)
Browse files Browse the repository at this point in the history
* Converted project to use cats-effect 3.x and fs2 3.x.
It may be worth looking into tightening our library dependency on cats-effect-kernel instead of the whole library.

NOTE: tests are broken and will need to be fixed.

* Added in weaver-test. Migrated test files to cats-effect/fs2 3.x. Made TestUtils.readFileFromResource use a Stream instead of reading with IO and a Resource.

* Fixed issue with DiscordClient and `expect` swallowing response bodies. `withHeaders` was replacing all of the headers and Discord was requiring a `Content-Type` header.

* Fixed IntelliJ import

* Added in integration test basis. Could definitely be improved and is a little slow...

* Removed my now invalidated test bot token...

* Added in environment variable for dissonance IT bot.

* Changed the IT channel to be on the main Dissonance Discord server.

* Alphabetized dependencies.

* Reformat for re-build.
  • Loading branch information
hogiyogi597 authored Apr 22, 2021
1 parent 00179ad commit e567f07
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 132 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ jobs:
if: matrix.scala == '2.13.5'
run: sbt ++${{ matrix.scala }} scalafmtCheckAll
- name: Run tests
env:
DISSONANCE_IT_TOKEN: ${{secrets.DISSONANCE_IT_TOKEN}}
run: sbt ++${{ matrix.scala }} clean coverage test
2 changes: 2 additions & 0 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ jobs:
with:
java-version: 11
- name: Generate code coverage
env:
DISSONANCE_IT_TOKEN: ${{secrets.DISSONANCE_IT_TOKEN}}
run: sbt ++2.13.5 'project core' clean coverage test coverageReport
- name: Upload coverage
uses: codecov/codecov-action@v1
8 changes: 5 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Dependencies._

lazy val dissonance = project.in(file("."))
lazy val dissonance = project
.in(file("."))
.settings(commonSettings, releaseSettings, publish / skip := true)
.aggregate(core)

Expand All @@ -13,6 +14,7 @@ lazy val core = project
fork := true, // Fork to separate process
connectInput := true, // Connects stdin to sbt during forked runs
outputStrategy := Some(StdoutOutput), // Get rid of output prefix
testFrameworks += new TestFramework("weaver.framework.CatsEffect")
)

lazy val docs = project
Expand All @@ -38,7 +40,7 @@ lazy val commonSettings = Seq(
case _ => Nil
}) ++ dependencies,
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1"),
addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.11.3" cross CrossVersion.full),
addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.11.3" cross CrossVersion.full)
)

lazy val releaseSettings = Seq(
Expand All @@ -61,4 +63,4 @@ lazy val releaseSettings = Seq(
)
)

Global / onChangedBuildSource := ReloadOnSourceChanges
Global / onChangedBuildSource := ReloadOnSourceChanges
22 changes: 13 additions & 9 deletions core/src/main/scala/dissonance/Discord.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ package dissonance
import java.io.IOException

import cats.Applicative
import cats.effect.std.Queue
import cats.effect._
import cats.effect.concurrent._
import cats.syntax.all._
import dissonance.Discord._
import dissonance.data.ControlMessage._
import dissonance.data._
import dissonance.data.events.Ready
import dissonance.utils._
import fs2.Stream
import fs2.concurrent.Queue
import io.circe.Json
import io.circe.parser._
import io.circe.syntax._
Expand All @@ -24,10 +23,11 @@ import org.http4s.client.jdkhttpclient._
import org.http4s.headers._
import org.http4s.implicits._
import org.http4s.{headers => _, _}
import org.typelevel.ci.CIString

import scala.concurrent.duration._

class Discord[F[_]: Concurrent](token: String, val httpClient: Client[F], wsClient: WSClient[F])(implicit cs: ContextShift[F], t: Timer[F]) {
class Discord[F[_]: Async](token: String, val httpClient: Client[F], wsClient: WSClient[F])(implicit t: Temporal[F]) {
type SequenceNumber = Ref[F, Option[Int]]
type SessionId = Ref[F, Option[String]]
type Acks = Queue[F, Unit]
Expand Down Expand Up @@ -103,7 +103,7 @@ class Discord[F[_]: Concurrent](token: String, val httpClient: Client[F], wsClie
case Hello(intervalDuration) =>
interval.complete(intervalDuration) >> identifyOrResume(state.sessionId, state.sequenceNumber, shard, intents).flatMap(connection.send).as(Result(None))
case HeartBeatAck =>
state.acks.enqueue1(()).as(Result(None))
state.acks.offer(()).as(Result(None))
case Heartbeat(d) =>
putStrLn(s"Heartbeat received: $d").as(Result(None))
case Reconnect =>
Expand All @@ -121,7 +121,7 @@ class Discord[F[_]: Concurrent](token: String, val httpClient: Client[F], wsClie
}

private def connection(uri: Uri): Stream[F, WSConnectionHighLevel[F]] =
Stream.resource(wsClient.connectHighLevel(WSRequest(uri, Headers.of(headers(token)))))
Stream.resource(wsClient.connectHighLevel(WSRequest(uri, Headers(headers(token)))))

private def heartbeatInterval: Stream[F, HeartbeatInterval] =
Stream.eval(Deferred[F, FiniteDuration])
Expand All @@ -138,7 +138,7 @@ class Discord[F[_]: Concurrent](token: String, val httpClient: Client[F], wsClie
val heartbeats = Stream.eval(sendHeartbeat) ++ Stream.repeatEval(sendHeartbeat).metered(interval)

// TODO: Something besides true, false
(heartbeats.as(true) merge acks.dequeue.as(false)).zipWithPrevious.flatMap {
(heartbeats.as(true) merge Stream.emit(acks.take.as(false))).zipWithPrevious.flatMap {
case (Some(true), true) => Stream.raiseError[F](Errors.NoHeartbeatAck)
case _ => Stream.emit(())
}
Expand All @@ -162,11 +162,15 @@ class Discord[F[_]: Concurrent](token: String, val httpClient: Client[F], wsClie
}

object Discord {
def make[F[_]: ConcurrentEffect](token: String)(implicit cs: ContextShift[F], t: Timer[F]): Resource[F, Discord[F]] =
Resource.eval(utils.javaClient.map(javaClient => new Discord(token, JdkHttpClient[F](javaClient), JdkWSClient[F](javaClient))))
def make[F[_]: Async](token: String): Resource[F, Discord[F]] =
for {
javaClient <- Resource.eval(utils.javaClient)
javaHttpClient <- JdkHttpClient[F](javaClient)
javaWsClient <- JdkWSClient[F](javaClient)
} yield new Discord(token, javaHttpClient, javaWsClient)

val apiEndpoint = uri"https://discordapp.com/api/v8"
def headers(token: String): Authorization = Authorization(Credentials.Token("Bot".ci, token))
def headers(token: String): Authorization = Authorization(Credentials.Token(CIString("Bot"), token))

sealed trait EventResult extends Product with Serializable { val terminate: Boolean }
case class Result(event: Option[Event]) extends EventResult { val terminate = false }
Expand Down
87 changes: 48 additions & 39 deletions core/src/main/scala/dissonance/DiscordClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import org.http4s.client.jdkhttpclient.JdkHttpClient
import org.http4s.multipart.{Multipart, Part}
import org.http4s.{Request, Status, Uri}

class DiscordClient[F[_]: Sync](token: String, client: Client[F])(implicit cs: ContextShift[F]) {
class DiscordClient[F[_]: Async](token: String, client: Client[F]) {

def sendMessage(message: String, channelId: Snowflake, tts: Boolean = false): F[Message] =
client
.expect[Message](
.fetchAs[Message](
Request[F]()
.withMethod(POST)
.withUri(apiEndpoint.addPath(s"channels/$channelId/messages"))
Expand All @@ -35,7 +35,7 @@ class DiscordClient[F[_]: Sync](token: String, client: Client[F])(implicit cs: C
"tts" -> tts.asJson
)
)
.withHeaders(headers(token))
.putHeaders(headers(token))
)

def deleteMessage(channelId: Snowflake, messageId: Snowflake): F[Unit] =
Expand All @@ -44,21 +44,21 @@ class DiscordClient[F[_]: Sync](token: String, client: Client[F])(implicit cs: C
Request[F]()
.withMethod(DELETE)
.withUri(apiEndpoint.addPath(s"channels/$channelId/messages/$messageId"))
.withHeaders(headers(token))
.putHeaders(headers(token))
)
.handleErrorWith(_ => Applicative[F].unit) // Throws: java.io.IOException: unexpected content length header with 204 response

def sendEmbed(embed: Embed, channelId: Snowflake): F[Message] =
client
.expect[Message](
.fetchAs[Message](
Request[F]()
.withMethod(POST)
.withUri(apiEndpoint.addPath(s"channels/$channelId/messages"))
.withEntity(
// TODO Case class here
Json.obj("embed" -> embed.asJson)
)
.withHeaders(headers(token))
.putHeaders(headers(token))
)

def sendInteractionResponse(interactionResponse: InteractionResponse, interactionId: Snowflake, interactionToken: String): F[Unit] =
Expand All @@ -71,32 +71,32 @@ class DiscordClient[F[_]: Sync](token: String, client: Client[F])(implicit cs: C
)
.handleErrorWith(_ => Applicative[F].unit) // Throws: java.io.IOException: unexpected content length header with 204 response

def sendEmbedWithFileImage(embed: Embed, file: File, channelId: Snowflake, blocker: Blocker): F[Message] = {
def sendEmbedWithFileImage(embed: Embed, file: File, channelId: Snowflake): F[Message] = {
val multipart = Multipart[F](
Vector(
Part.fileData[F]("file", file, blocker),
Part.fileData[F]("file", file),
Part.formData("payload_json", Json.obj("embed" -> embed.withImage(Image(Some(Uri.unsafeFromString(s"attachment://${file.getName}")), None, None, None)).asJson).noSpaces)
)
)
client
.expect[Message](
.fetchAs[Message](
Request[F]()
.withMethod(POST)
.withUri(apiEndpoint.addPath(s"channels/$channelId/messages"))
.withEntity(multipart)
.withHeaders(headers(token) :: multipart.headers.toList: _*)
.putHeaders(multipart.headers.headers, headers(token))
)
}

def sendFile(file: File, channelId: Snowflake, blocker: Blocker): F[Message] = {
val multipart = Multipart[F](Vector(Part.fileData[F]("file", file, blocker)))
def sendFile(file: File, channelId: Snowflake): F[Message] = {
val multipart = Multipart[F](Vector(Part.fileData[F]("file", file)))
client
.expect[Message](
.fetchAs[Message](
Request[F]()
.withMethod(POST)
.withUri(apiEndpoint.addPath(s"channels/$channelId/messages"))
.withEntity(multipart)
.withHeaders(headers(token) :: multipart.headers.toList: _*)
.putHeaders(multipart.headers.headers, headers(token))
)
}

Expand All @@ -106,14 +106,14 @@ class DiscordClient[F[_]: Sync](token: String, client: Client[F])(implicit cs: C
Request[F]()
.withMethod(PUT)
.withUri(apiEndpoint.addPath(s"channels/$channelId/messages/$messageId/reactions/$emoji/@me"))
.withHeaders(headers(token))
.putHeaders(headers(token))
)
.handleErrorWith(_ => Applicative[F].unit) // Throws: java.io.IOException: unexpected content length header with 204 response

def addEmoji(guildId: Snowflake, name: String, emojiData: Array[Byte], roles: List[Snowflake] = Nil): F[Emoji] = {
val imageData = "data:;base64," + fs2.Stream.emits(emojiData).through(fs2.text.base64.encode).compile.foldMonoid
client
.expect[Emoji](
.fetchAs[Emoji](
Request[F]()
.withMethod(POST)
.withUri(apiEndpoint.addPath(s"guilds/$guildId/emojis"))
Expand All @@ -125,31 +125,31 @@ class DiscordClient[F[_]: Sync](token: String, client: Client[F])(implicit cs: C
"roles" -> roles.asJson
)
)
.withHeaders(headers(token))
.putHeaders(headers(token))
)
}

def listEmojis(guildId: Snowflake): F[List[Emoji]] =
client
.expect[List[Emoji]](
.fetchAs[List[Emoji]](
Request[F]()
.withMethod(GET)
.withUri(apiEndpoint.addPath(s"guilds/$guildId/emojis"))
.withHeaders(headers(token))
.putHeaders(headers(token))
)

def getChannelMessage(channelId: Snowflake, messageId: Snowflake): F[Message] =
client
.expect[Message](
.fetchAs[Message](
Request[F]()
.withMethod(GET)
.withUri(apiEndpoint.addPath(s"channels/$channelId/messages/$messageId"))
.withHeaders(headers(token))
.putHeaders(headers(token))
)

def createWebhook(name: String, avatar: Option[ImageDataUri], channelId: Snowflake): F[Webhook] =
client
.expect[Webhook](
.fetchAs[Webhook](
Request[F]()
.withMethod(POST)
.withUri(apiEndpoint.addPath(s"channels/$channelId/webhooks"))
Expand All @@ -159,47 +159,47 @@ class DiscordClient[F[_]: Sync](token: String, client: Client[F])(implicit cs: C
"avatar" -> avatar.asJson
)
)
.withHeaders(headers(token))
.putHeaders(headers(token))
)

def getChannelWebhooks(channelId: Snowflake): F[List[Webhook]] =
client
.expect[List[Webhook]](
.fetchAs[List[Webhook]](
Request[F]()
.withMethod(GET)
.withUri(apiEndpoint.addPath(s"channels/$channelId/webhooks"))
.withHeaders(headers(token))
.putHeaders(headers(token))
)

def getGuildWebhooks(guildId: Snowflake): F[List[Webhook]] =
client
.expect[List[Webhook]](
.fetchAs[List[Webhook]](
Request[F]()
.withMethod(GET)
.withUri(apiEndpoint.addPath(s"guilds/$guildId/webhooks"))
.withHeaders(headers(token))
.putHeaders(headers(token))
)

def getWebhook(webhookId: Snowflake): F[Webhook] =
client
.expect[Webhook](
.fetchAs[Webhook](
Request[F]()
.withMethod(GET)
.withUri(apiEndpoint.addPath(s"webhooks/$webhookId"))
.withHeaders(headers(token))
.putHeaders(headers(token))
)

def getWebhookWithToken(webhookId: Snowflake, token: String): F[Webhook] =
client
.expect[Webhook](
.fetchAs[Webhook](
Request[F]()
.withMethod(GET)
.withUri(apiEndpoint.addPath(s"webhooks/$webhookId/$token"))
)

def modifyWebhook(webhookId: Snowflake, name: Option[String], avatar: Option[ImageDataUri], channelId: Option[Snowflake]): F[Webhook] =
client
.expect[Webhook](
.fetchAs[Webhook](
Request[F]()
.withMethod(PATCH)
.withUri(apiEndpoint.addPath(s"webhooks/$webhookId"))
Expand All @@ -211,12 +211,12 @@ class DiscordClient[F[_]: Sync](token: String, client: Client[F])(implicit cs: C
"channel_id" -> channelId.asJson
)
)
.withHeaders(headers(token))
.putHeaders(headers(token))
)

def modifyWebhookWithToken(webhookId: Snowflake, name: Option[String], avatar: Option[ImageDataUri], channelId: Option[Snowflake], token: String): F[Webhook] =
client
.expect[Webhook](
.fetchAs[Webhook](
Request[F]()
.withMethod(PATCH)
.withUri(apiEndpoint.addPath(s"webhooks/$webhookId/$token"))
Expand All @@ -236,7 +236,7 @@ class DiscordClient[F[_]: Sync](token: String, client: Client[F])(implicit cs: C
Request[F]()
.withMethod(DELETE)
.withUri(apiEndpoint.addPath(s"webhooks/$webhookId"))
.withHeaders(headers(token))
.putHeaders(headers(token))
)

def deleteWebhookWithToken(webhookId: Snowflake, token: String): F[Status] =
Expand All @@ -248,10 +248,16 @@ class DiscordClient[F[_]: Sync](token: String, client: Client[F])(implicit cs: C
)

def executeWebhookWithResponse(webhook: Webhook, webhookMessage: WebhookMessage): F[Message] =
client.expect[Message](createExecuteWebhookRequest(webhook, webhookMessage, wait = true))
client
.fetchAs[Message](
createExecuteWebhookRequest(webhook, webhookMessage, wait = true)
)

def executeWebhook(webhook: Webhook, webhookMessage: WebhookMessage): F[Status] =
client.status(createExecuteWebhookRequest(webhook, webhookMessage, wait = false))
client
.status(
createExecuteWebhookRequest(webhook, webhookMessage, wait = false)
)

// TODO: Handle uploading files which requires multipart/form-data
private def createExecuteWebhookRequest(webhook: Webhook, webhookMessage: WebhookMessage, wait: Boolean): Request[F] =
Expand All @@ -263,14 +269,17 @@ class DiscordClient[F[_]: Sync](token: String, client: Client[F])(implicit cs: C
.withQueryParam("wait", wait)
)
.withEntity(webhookMessage)
.withHeaders(headers(token))
.putHeaders(headers(token))

// TODO: Add Slack and Github Webhooks
}

object DiscordClient {
def make[F[_]: ConcurrentEffect](token: String)(implicit cs: ContextShift[F]): Resource[F, DiscordClient[F]] =
Resource.eval(utils.javaClient.map(javaClient => new DiscordClient(token, JdkHttpClient[F](javaClient))))
def make[F[_]: Async](token: String): Resource[F, DiscordClient[F]] =
for {
javaHttpClient <- Resource.eval(utils.javaClient)
javaClient <- JdkHttpClient[F](javaHttpClient)
} yield new DiscordClient(token, javaClient)

type AllowedMentions = Unit // TODO: Implement this

Expand Down
Loading

0 comments on commit e567f07

Please sign in to comment.