Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for fs2 2.3.0 and scala 2.13 #44

Open
wants to merge 4 commits into
base: series/0.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
language : scala

scala:
- 2.11.12
- 2.12.6
- 2.12.11
- 2.13.0

dist: trusty

Expand Down
42 changes: 28 additions & 14 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ lazy val contributors = Seq(

lazy val commonSettings = Seq(
organization := "com.spinoco",
scalaVersion := "2.12.6",
crossScalaVersions := Seq("2.11.12", "2.12.6"),
scalaVersion := "2.12.11",
crossScalaVersions := Seq("2.12.11", "2.13.0"),
scalacOptions ++= Seq(
"-feature",
"-deprecation",
Expand All @@ -19,23 +19,37 @@ lazy val commonSettings = Seq(
"-language:existentials",
"-language:postfixOps",
"-Xfatal-warnings",
"-Yno-adapted-args",
"-Ywarn-value-discard",
"-Ywarn-unused-import"
"-Ywarn-value-discard"
),
scalacOptions ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, v)) if v >= 13 =>
Seq("-Ymacro-annotations", "-Ywarn-unused:imports")
case _ =>
Seq("-Yno-adapted-args", "-Ywarn-unused-import")
}
},
scalacOptions in (Compile, console) ~= {_.filterNot("-Ywarn-unused-import" == _)},
scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value,
libraryDependencies ++= Seq(
compilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full)
, "org.scodec" %% "scodec-bits" % "1.1.4"
, "org.scodec" %% "scodec-core" % "1.10.3"
, "com.spinoco" %% "protocol-http" % "0.3.15"
, "com.spinoco" %% "protocol-websocket" % "0.3.15"
, "co.fs2" %% "fs2-core" % "1.0.0"
, "co.fs2" %% "fs2-io" % "1.0.0"
, "com.spinoco" %% "fs2-crypto" % "0.4.0"
, "org.scalacheck" %% "scalacheck" % "1.13.4" % "test"
"org.scalacheck" %% "scalacheck" % "1.14.3" % "test"
, "com.spinoco" %% "protocol-http" % "0.4.0-M1"
, "com.spinoco" %% "protocol-websocket" % "0.4.0-M1"
, "co.fs2" %% "fs2-core" % "2.3.0"
, "co.fs2" %% "fs2-io" % "2.3.0"
),
libraryDependencies ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, v)) if v <= 12 =>
Seq(
compilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full)
)
case _ =>
// if scala 2.13.0-M4 or later, macro annotations merged into scala-reflect
// https://github.com/scala/scala/pull/6606
Nil
}
},
scmInfo := Some(ScmInfo(url("https://github.com/Spinoco/fs2-http"), "git@github.com:Spinoco/fs2-http.git")),
homepage := None,
licenses += ("MIT", url("http://opensource.org/licenses/MIT")),
Expand Down
33 changes: 15 additions & 18 deletions src/main/scala/spinoco/fs2/http/HttpClient.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package spinoco.fs2.http

import java.nio.channels.AsynchronousChannelGroup

import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Sync, Timer}
import java.util.concurrent.TimeUnit

import cats.Applicative
import javax.net.ssl.SSLContext
import cats.effect._
import fs2._
import fs2.concurrent.SignallingRef
import fs2.io.tcp.Socket
import fs2.io.tcp.{Socket, SocketGroup}
import fs2.io.tls.TLSContext
import scodec.{Codec, Decoder, Encoder}
import spinoco.fs2.http.internal.{addressForRequest, clientLiftToSecure, readWithTimeout}
import spinoco.fs2.http.sse.{SSEDecoder, SSEEncoding}
Expand All @@ -17,7 +17,6 @@ import spinoco.protocol.http.header._
import spinoco.protocol.mime.MediaType
import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._


Expand Down Expand Up @@ -114,12 +113,10 @@ trait HttpClient[F[_]] {
def apply[F[_] : ConcurrentEffect : ContextShift : Timer](
requestCodec : Codec[HttpRequestHeader]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need still the timer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is required for the websocket client, as we need to do pings there.

, responseCodec : Codec[HttpResponseHeader]
, sslExecutionContext: => ExecutionContext
, sslContext : => SSLContext
)(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = Sync[F].delay {
lazy val sslCtx = sslContext
lazy val sslS = sslExecutionContext

)(
socketGroup: SocketGroup
, tlsContext: TLSContext
):F[HttpClient[F]] = Sync[F].delay {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for Sync, I think Applicative will do

new HttpClient[F] {
def request(
request: HttpRequest[F]
Expand All @@ -128,10 +125,10 @@ trait HttpClient[F[_]] {
, timeout: Duration
): Stream[F, HttpResponse[F]] = {
Stream.eval(addressForRequest[F](request.scheme, request.host)).flatMap { address =>
Stream.resource(io.tcp.client[F](address))
.evalMap { socket =>
if (!request.isSecure) Applicative[F].pure(socket)
else clientLiftToSecure[F](sslS, sslCtx)(socket, request.host)
Stream.resource(socketGroup.client(address))
.flatMap { socket =>
if (!request.isSecure) Stream.emit(socket)
else Stream.resource(clientLiftToSecure[F](tlsContext)(socket, request.host))
}
.flatMap { impl.request[F](request, chunkSize, maxResponseHeaderSize, timeout, requestCodec, responseCodec ) }}
}
Expand All @@ -143,7 +140,7 @@ trait HttpClient[F[_]] {
, chunkSize: Int
, maxFrameSize: Int
): Stream[F, Option[HttpResponseHeader]] =
WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec, sslS, sslCtx)
WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec)(socketGroup, tlsContext)


def sse[A : SSEDecoder](rq: HttpRequest[F], maxResponseHeaderSize: Int, chunkSize: Int): Stream[F, A] =
Expand Down Expand Up @@ -174,7 +171,7 @@ trait HttpClient[F[_]] {
timeout match {
case fin: FiniteDuration =>
eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { start =>
HttpRequest.toStream(request, requestCodec).to(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ =>
HttpRequest.toStream(request, requestCodec).through(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ =>
eval(SignallingRef[F, Boolean](true)).flatMap { timeoutSignal =>
eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { sent =>
val remains = fin - (sent - start).millis
Expand All @@ -186,7 +183,7 @@ trait HttpClient[F[_]] {
}}}}

case _ =>
HttpRequest.toStream(request, requestCodec).to(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ =>
HttpRequest.toStream(request, requestCodec).through(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ =>
socket.reads(chunkSize, None) through HttpResponse.fromStream[F](maxResponseHeaderSize, responseCodec)
}
}
Expand Down
35 changes: 18 additions & 17 deletions src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self =>

/** yields to true, if body of this request shall be chunked **/
lazy val bodyIsChunked : Boolean =
withHeaders(internal.bodyIsChunked)
withHeaders(spinoco.fs2.http.internal.bodyIsChunked)

/** allows to stream arbitrary sized stream of `A` to remote party (i.e. upload) **/
def withStreamBody[A](body: Stream[F, A])(implicit E: StreamBodyEncoder[F, A]): Self = {
Expand All @@ -37,7 +37,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self =>

/** sets body size to supplied value **/
def withBodySize(sz: Long): Self =
updateHeaders(withHeaders(internal.swapHeader(`Content-Length`(sz))))
updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Content-Length`(sz))))

/** gets body size, if one specified **/
def bodySize: Option[Long] =
Expand All @@ -46,6 +46,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self =>
protected def body: Stream[F, Byte]

/** encodes body `A` given BodyEncoder exists **/

def withBody[A](a: A)(implicit W: BodyEncoder[A], ev: RaiseThrowable[F]): Self = {
W.encode(a) match {
case Failure(err) => updateBody(body = Stream.raiseError(new Throwable(s"failed to encode $a: $err")))
Expand All @@ -70,7 +71,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self =>
withHeaders { _.collectFirst { case `Content-Type`(ct) => ct } match {
case None => F.pure(Attempt.failure(Err("Content type is not known")))
case Some(ct) =>
F.map(self.body.chunks.map(util.chunk2ByteVector).compile.toVector) { bs =>
F.map(self.body.chunks.map(_.toByteVector).compile.toVector) { bs =>
if (bs.isEmpty) Attempt.failure(Err("Body is empty"))
else D.decode(bs.reduce(_ ++ _), ct)
}
Expand All @@ -79,15 +80,15 @@ sealed trait HttpRequestOrResponse[F[_]] { self =>

/** gets body as stream of byteVectors **/
def bodyAsByteVectorStream:Stream[F,ByteVector] =
self.body.chunks.map(util.chunk2ByteVector)
self.body.chunks.map(_.toByteVector)

/** decodes body as string with encoding supplied in ContentType **/
def bodyAsString(implicit F: Sync[F]): F[Attempt[String]] =
bodyAs[String](BodyDecoder.stringDecoder, F)

/** updates content type to one specified **/
def withContentType(ct: ContentType): Self =
updateHeaders(withHeaders(internal.swapHeader(`Content-Type`(ct))))
updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Content-Type`(ct))))

/** gets ContentType, if one specififed **/
def contentType: Option[ContentType] =
Expand All @@ -96,7 +97,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self =>

/** configures encoding as chunked **/
def chunkedEncoding: Self =
updateHeaders(withHeaders(internal.swapHeader(`Transfer-Encoding`(List("chunked")))))
updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Transfer-Encoding`(List("chunked")))))

def withHeaders[A](f: List[HttpHeader] => A): A = self match {
case HttpRequest(_,_,header,_) => f(header.headers)
Expand Down Expand Up @@ -190,10 +191,10 @@ object HttpRequest {
)
, body = Stream.empty)

def post[F[_] : RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] =
def post[F[_]: RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] =
get(uri).withMethod(HttpMethod.POST).withBody(a)

def put[F[_] : RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] =
def put[F[_]: RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] =
get(uri).withMethod(HttpMethod.PUT).withBody(a)

def delete[F[_]](uri: Uri): HttpRequest[F] =
Expand All @@ -210,11 +211,11 @@ object HttpRequest {
* @tparam F
* @return
*/
def fromStream[F[_] : RaiseThrowable](
def fromStream[F[_]: RaiseThrowable](
maxHeaderSize: Int
, headerCodec: Codec[HttpRequestHeader]
): Pipe[F, Byte, (HttpRequestHeader, Stream[F, Byte])] = {
import internal._
import spinoco.fs2.http.internal._
_ through httpHeaderAndBody(maxHeaderSize) flatMap { case (header, bodyRaw) =>
headerCodec.decodeValue(header.bits) match {
case Failure(err) => Stream.raiseError(new Throwable(s"Decoding of the request header failed: $err"))
Expand All @@ -240,11 +241,11 @@ object HttpRequest {
* @param request request to convert to stream
* @param headerCodec Codec to convert the header to bytes
*/
def toStream[F[_] : RaiseThrowable](
def toStream[F[_]: RaiseThrowable](
request: HttpRequest[F]
, headerCodec: Codec[HttpRequestHeader]
): Stream[F, Byte] = Stream.suspend {
import internal._
import spinoco.fs2.http.internal._

headerCodec.encode(request.header) match {
case Failure(err) => Stream.raiseError(new Throwable(s"Encoding of the header failed: $err"))
Expand Down Expand Up @@ -283,7 +284,7 @@ final case class HttpResponse[F[_]](
def sseBody[A](in: Stream[F, A])(implicit E: SSEEncoder[A], ev: RaiseThrowable[F]): Self =
self
.updateBody(in through SSEEncoding.encodeA[F, A])
.updateHeaders(withHeaders(internal.swapHeader(`Content-Type`(ContentType.TextContent(MediaType.`text/event-stream`, None)))))
.updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Content-Type`(ContentType.TextContent(MediaType.`text/event-stream`, None)))))
}


Expand All @@ -301,11 +302,11 @@ object HttpResponse {
/**
* Decodes stream of bytes as HttpResponse.
*/
def fromStream[F[_] : RaiseThrowable](
def fromStream[F[_]: RaiseThrowable](
maxHeaderSize: Int
, responseCodec: Codec[HttpResponseHeader]
): Pipe[F,Byte, HttpResponse[F]] = {
import internal._
import spinoco.fs2.http.internal._

_ through httpHeaderAndBody(maxHeaderSize) flatMap { case (header, bodyRaw) =>
responseCodec.decodeValue(header.bits) match {
Expand All @@ -325,11 +326,11 @@ object HttpResponse {


/** Encodes response to stream of bytes **/
def toStream[F[_] : RaiseThrowable](
def toStream[F[_]: RaiseThrowable](
response: HttpResponse[F]
, headerCodec: Codec[HttpResponseHeader]
): Stream[F, Byte] = Stream.suspend {
import internal._
import spinoco.fs2.http.internal._

headerCodec.encode(response.header) match {
case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode http response : $response :$err "))
Expand Down
15 changes: 7 additions & 8 deletions src/main/scala/spinoco/fs2/http/HttpServer.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package spinoco.fs2.http

import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup

import cats.effect.{ConcurrentEffect, Sync, Timer}
import cats.effect.{ConcurrentEffect, ContextShift, Sync, Timer}
import cats.syntax.all._
import fs2._
import fs2.concurrent.SignallingRef
import fs2.io.tcp.SocketGroup
import scodec.Codec
import spinoco.protocol.http.codec.{HttpRequestHeaderCodec, HttpResponseHeaderCodec}
import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader, HttpStatusCode}
Expand Down Expand Up @@ -36,7 +36,7 @@ object HttpServer {
* Request is not suplied if failure happened before request was constructed.
*
*/
def apply[F[_] : ConcurrentEffect : Timer](
def apply[F[_] : ConcurrentEffect : Timer: ContextShift](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we (and same for client) get rid of apply[F] as being constructor? Lets do mk, and have apply as summoner of the instance

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed them both to mk, however server cannot have summoner instance as the result is Stream[F, Unit].

maxConcurrent: Int = Int.MaxValue
, receiveBufferSize: Int = 256 * 1024
, maxHeaderSize: Int = 10 *1024
Expand All @@ -48,19 +48,18 @@ object HttpServer {
, requestFailure : Throwable => Stream[F, HttpResponse[F]]
, sendFailure: (Option[HttpRequestHeader], HttpResponse[F], Throwable) => Stream[F, Nothing]
)(
implicit
AG: AsynchronousChannelGroup
socketGroup: SocketGroup
): Stream[F, Unit] = {
import Stream._
import internal._
import spinoco.fs2.http.internal._
val (initial, readDuration) = requestHeaderReceiveTimeout match {
case fin: FiniteDuration => (true, fin)
case _ => (false, 0.millis)
}

io.tcp.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource =>
socketGroup.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource =>
Stream.resource(resource).flatMap { socket =>
eval(SignallingRef(initial)).flatMap { timeoutSignal =>
eval(SignallingRef[F, Boolean](initial)).flatMap { timeoutSignal =>
readWithTimeout[F](socket, readDuration, timeoutSignal.get, receiveBufferSize)
.through(HttpRequest.fromStream(maxHeaderSize, requestCodec))
.flatMap { case (request, body) =>
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/spinoco/fs2/http/body/StreamBodyEncoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object StreamBodyEncoder {
StreamBodyEncoder.instance(ContentType.BinaryContent(MediaType.`application/octet-stream`, None)) { _.flatMap { bv => Stream.chunk(ByteVectorChunk(bv)) } }

/** encoder that encodes utf8 string, with `text/plain` utf8 content type **/
def utf8StringEncoder[F[_]](implicit F: MonadError[F, Throwable]) : StreamBodyEncoder[F, String] =
def utf8StringEncoder[F[_]: RaiseThrowable](implicit F: MonadError[F, Throwable]) : StreamBodyEncoder[F, String] =
byteVectorEncoder mapInF[String] { s =>
ByteVector.encodeUtf8(s) match {
case Right(bv) => F.pure(bv)
Expand All @@ -57,7 +57,7 @@ object StreamBodyEncoder {
} withContentType ContentType.TextContent(MediaType.`text/plain`, Some(MIMECharset.`UTF-8`))

/** a convenience wrapper to convert body encoder to StreamBodyEncoder **/
def fromBodyEncoder[F[_] : RaiseThrowable, A](implicit E: BodyEncoder[A]):StreamBodyEncoder[F, A] =
def fromBodyEncoder[F[_]: RaiseThrowable, A](implicit E: BodyEncoder[A]):StreamBodyEncoder[F, A] =
StreamBodyEncoder.instance(E.contentType) { _.flatMap { a =>
E.encode(a) match {
case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode: $err ($a)"))
Expand Down
Loading