From 54f3383602bd5eff99c4c4e1c4f837b85d18ce76 Mon Sep 17 00:00:00 2001 From: Peter Banda Date: Thu, 19 Dec 2024 10:05:50 +0100 Subject: [PATCH] Removing AWS stream frame decoder --- .../impl/AwsEventStreamFrameDecoder.scala | 59 ------------------- 1 file changed, 59 deletions(-) delete mode 100644 anthropic-client/src/main/scala/io/cequence/openaiscala/anthropic/service/impl/AwsEventStreamFrameDecoder.scala diff --git a/anthropic-client/src/main/scala/io/cequence/openaiscala/anthropic/service/impl/AwsEventStreamFrameDecoder.scala b/anthropic-client/src/main/scala/io/cequence/openaiscala/anthropic/service/impl/AwsEventStreamFrameDecoder.scala deleted file mode 100644 index 0a9555ab..00000000 --- a/anthropic-client/src/main/scala/io/cequence/openaiscala/anthropic/service/impl/AwsEventStreamFrameDecoder.scala +++ /dev/null @@ -1,59 +0,0 @@ -package io.cequence.openaiscala.anthropic.service.impl - -import akka.stream._ -import akka.stream.stage._ -import akka.util.ByteString - -class AwsEventStreamFrameDecoder extends GraphStage[FlowShape[ByteString, ByteString]] { - val in = Inlet[ByteString]("AwsEventStreamFrameDecoder.in") - val out = Outlet[ByteString]("AwsEventStreamFrameDecoder.out") - override val shape = FlowShape(in, out) - - private implicit val order = java.nio.ByteOrder.BIG_ENDIAN - - override def createLogic(attrs: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - var buffer = ByteString.empty - - setHandler( - in, - new InHandler { - override def onPush(): Unit = { - buffer ++= grab(in) - emitFrames() - } - override def onUpstreamFinish(): Unit = { - emitFrames() - if (buffer.isEmpty) completeStage() - else failStage(new RuntimeException("Truncated frame at stream end")) - } - } - ) - - setHandler( - out, - new OutHandler { - override def onPull(): Unit = { - if (!hasBeenPulled(in)) pull(in) - } - } - ) - - def emitFrames(): Unit = { - while (buffer.size >= 4) { - val totalLength = buffer.iterator.getInt - - if (buffer.size < 4 + totalLength) { - // not enough data yet - return - } - val frame = buffer.slice(4, 4 + totalLength) - buffer = buffer.drop(4 + totalLength) - emit(out, frame) - } - - if (!hasBeenPulled(in) && !isClosed(in)) { - pull(in) - } - } - } -}