Skip to content

Commit

Permalink
treat inputStream as stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Kamil-Lontkowski committed Mar 3, 2025
1 parent 87ed9f0 commit 8895a75
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private[client4] trait BodyToHttpClient[F[_], S, R] {
case StreamBody(s) => streamToPublisher(s.asInstanceOf[streams.BinaryStream])
case m: MultipartBody[_] =>
val baseContentType = contentType.getOrElse("multipart/form-data")
val (body, boundary) = multiPartBodyBuilder.multipartBodyPublisher(m.parts)(monad)
val (body, boundary) = multiPartBodyBuilder(m.parts)(monad)
builder.header(HeaderNames.ContentType, s"$baseContentType; boundary=$boundary")
body
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._

trait MultipartBodyBuilder[BinaryStream, F[_]] {
def multipartBodyPublisher(
def apply(
parts: Seq[Part[GenericRequestBody[_]]]
)(implicit m: MonadError[F]): (F[HttpRequest.BodyPublisher], String)
}

class NonStreamMultipartBodyBuilder[BinaryStream, F[_]] extends MultipartBodyBuilder[BinaryStream, F] {
override def multipartBodyPublisher(
trait NonStreamMultipartBodyBuilder[BinaryStream, F[_]] extends MultipartBodyBuilder[BinaryStream, F] {
override def apply(
parts: Seq[Part[GenericRequestBody[_]]]
)(implicit m: MonadError[F]): (F[HttpRequest.BodyPublisher], String) = {
val multipartBuilder = new MultiPartBodyPublisher()
Expand Down Expand Up @@ -72,7 +72,9 @@ trait StreamMultipartBodyBuilder[BinaryStream, F[_]] extends MultipartBodyBuilde

def toPublisher(stream: BinaryStream): F[HttpRequest.BodyPublisher]

override def multipartBodyPublisher(
def inputStreamToStream(stream: InputStream): BinaryStream

override def apply(
parts: Seq[Part[GenericRequestBody[_]]]
)(implicit m: MonadError[F]): (F[HttpRequest.BodyPublisher], String) = {
val boundary: String = UUID.randomUUID.toString
Expand All @@ -96,22 +98,28 @@ trait StreamMultipartBodyBuilder[BinaryStream, F[_]] extends MultipartBodyBuilde
case ByteBufferBody(b, _) =>
if ((b: Buffer).isReadOnly) {
val buffer = new ByteBufferBackedInputStream(b)
concatBytesToStream(accumulatedStream, encodeBytes(buffer.readAllBytes(), partHeaders, boundary))
concatStreams(accumulatedStream, encodeStream(inputStreamToStream(buffer), partHeaders, boundary))
} else
concatBytesToStream(accumulatedStream, encodeBytes(b.array(), partHeaders, boundary))
case InputStreamBody(b, _) =>
concatBytesToStream(accumulatedStream, encodeBytes(b.readAllBytes(), partHeaders, boundary))
concatStreams(accumulatedStream, encodeStream(inputStreamToStream(b), partHeaders, boundary))
case StreamBody(s) =>
concatStreams(
concatBytesToStream(accumulatedStream, encodeHeaders(partHeaders, boundary)),
concatBytesToStream(s.asInstanceOf[BinaryStream], CRLFBytes)
)
concatStreams(accumulatedStream, encodeStream(s.asInstanceOf[BinaryStream], partHeaders, boundary))
case _: MultipartBody[_] => throwNestedMultipartNotAllowed
}
}
(toPublisher(concatBytesToStream(resultStream, lastBoundary(boundary))), boundary)
}

private def encodeStream(stream: BinaryStream, headers: Map[String, String], boundary: String): BinaryStream =
concatBytesToStream(
concatStreams(
byteArrayToStream(encodeHeaders(headers, boundary)),
stream
),
CRLFBytes
)

private def concatBytesToStream(stream: BinaryStream, array: Array[Byte]): BinaryStream =
concatStreams(stream, byteArrayToStream(array))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import java.net.http.HttpResponse
import java.net.http.HttpResponse.BodyHandlers
import java.nio.ByteBuffer
import java.io.File
import java.io.InputStream
import java.{util => ju}
import java.util.concurrent.Flow.Publisher
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -70,6 +71,8 @@ class HttpClientFs2Backend[F[_]: ConcurrentEffect: ContextShift] private (
new StreamMultipartBodyBuilder[streams.BinaryStream, F] {
override def fileToStream(file: File): streams.BinaryStream = fs2.io.file.readAll(file.toPath, blocker, 8192)
override def byteArrayToStream(array: Array[Byte]): streams.BinaryStream = Stream.emits(array)
override def inputStreamToStream(stream: InputStream): streams.BinaryStream =
fs2.io.readInputStream(monad.unit(stream), 8192, blocker, closeAfterUse = true)
override def concatStreams(
stream1: streams.BinaryStream,
stream2: streams.BinaryStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import java.net.http.HttpResponse
import java.net.http.HttpResponse.BodyHandlers
import java.nio.ByteBuffer
import java.io.File
import java.io.InputStream
import java.util
import java.{util => ju}
import java.util.concurrent.Flow.Publisher
Expand Down Expand Up @@ -75,6 +76,8 @@ class HttpClientFs2Backend[F[_]: Async] private (
override def fileToStream(file: File): streams.BinaryStream =
Files.forAsync[F].readAll(Path.fromNioPath(file.toPath))
override def byteArrayToStream(array: Array[Byte]): streams.BinaryStream = Stream.emits(array)
override def inputStreamToStream(stream: InputStream): streams.BinaryStream =
fs2.io.readInputStream(monad.unit(stream), 8192, closeAfterUse = true)
override def concatStreams(
stream1: streams.BinaryStream,
stream2: streams.BinaryStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import java.net.http.HttpResponse
import java.net.http.HttpResponse.BodyHandlers
import java.nio.ByteBuffer
import java.io.File
import java.io.InputStream
import java.util
import java.{util => ju}
import java.util.concurrent.Flow.Publisher
Expand Down Expand Up @@ -91,6 +92,8 @@ class HttpClientZioBackend private (
new StreamMultipartBodyBuilder[ZioStreams.BinaryStream, Task] {
override def fileToStream(file: File): streams.BinaryStream = ZStream.fromFile(file, 8192)
override def byteArrayToStream(array: Array[Byte]): streams.BinaryStream = ZStream.fromIterable(array)
override def inputStreamToStream(stream: InputStream): streams.BinaryStream =
ZStream.fromInputStream(stream, 8192)
override def concatStreams(
stream1: streams.BinaryStream,
stream2: streams.BinaryStream
Expand Down

0 comments on commit 8895a75

Please sign in to comment.