diff --git a/http/src/main/kotlin/io/wavebeans/http/JavalinServletHandler.kt b/http/src/main/kotlin/io/wavebeans/http/JavalinServletHandler.kt index a9eb10f8..2f16510f 100644 --- a/http/src/main/kotlin/io/wavebeans/http/JavalinServletHandler.kt +++ b/http/src/main/kotlin/io/wavebeans/http/JavalinServletHandler.kt @@ -6,6 +6,7 @@ import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.channel.socket.SocketChannel import io.netty.handler.codec.http.* +import io.wavebeans.lib.table.TableRegistry import mu.KotlinLogging import javax.servlet.http.HttpServlet @@ -17,6 +18,15 @@ class JavalinServletHandler( companion object { private val log = KotlinLogging.logger { } + + fun newInstance(tableRegistry: TableRegistry): JavalinServletHandler { + return JavalinServletHandler(DefaultJavalinApp( + listOf( + { it.tableService(tableRegistry) }, + { it.audioService(tableRegistry) } + ) + )) + } } private val javalin by lazy { Javalin.createStandalone() } diff --git a/http/src/main/kotlin/io/wavebeans/http/WbHttpService.kt b/http/src/main/kotlin/io/wavebeans/http/WbHttpService.kt index 6a23b354..608a5f2a 100644 --- a/http/src/main/kotlin/io/wavebeans/http/WbHttpService.kt +++ b/http/src/main/kotlin/io/wavebeans/http/WbHttpService.kt @@ -25,21 +25,18 @@ class WbHttpService( private val log = KotlinLogging.logger { } } - private val handlers: List by lazy { - listOf( - JavalinServletHandler(DefaultJavalinApp( - listOf( - { it.tableService(tableRegistry) }, - { it.audioService(tableRegistry) } - ) - )) - ) - } + private val handlers: MutableList = ArrayList() + private val bossGroup: EventLoopGroup = NioEventLoopGroup() private val workerGroup: EventLoopGroup = NioEventLoopGroup() private var server: ChannelFuture? = null private var communicatorServer: Server? = null + fun addHandler(handler: WbNettyHandler): WbHttpService { + handlers += handler + return this + } + fun start(andWait: Boolean = false): WbHttpService { startCommunicator() startNetty(andWait) diff --git a/http/src/main/kotlin/io/wavebeans/http/rtsp/MediaAnnouncement.kt b/http/src/main/kotlin/io/wavebeans/http/rtsp/MediaAnnouncement.kt new file mode 100644 index 00000000..97d623bb --- /dev/null +++ b/http/src/main/kotlin/io/wavebeans/http/rtsp/MediaAnnouncement.kt @@ -0,0 +1,9 @@ +package io.wavebeans.http.rtsp + +data class MediaAnnouncement( + val media: String, + val port: Int, + val numberOfPorts: Int, + val transport: String, + val fmtList: List +) \ No newline at end of file diff --git a/http/src/main/kotlin/io/wavebeans/http/rtsp/RtpMapping.kt b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtpMapping.kt new file mode 100644 index 00000000..28e4b8f1 --- /dev/null +++ b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtpMapping.kt @@ -0,0 +1,8 @@ +package io.wavebeans.http.rtsp + +data class RtpMapping( + val payloadType: Int, + val encoding: String, + val clockRate: Int, + val encodingParameters: String? +) \ No newline at end of file diff --git a/http/src/main/kotlin/io/wavebeans/http/rtsp/RtpsRecordControllerHandler.kt b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtpsRecordControllerHandler.kt new file mode 100644 index 00000000..3d031fa7 --- /dev/null +++ b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtpsRecordControllerHandler.kt @@ -0,0 +1,214 @@ +package io.wavebeans.http.rtsp + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandler +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelInboundHandlerAdapter +import io.netty.channel.socket.SocketChannel +import io.netty.handler.codec.http.DefaultFullHttpResponse +import io.netty.handler.codec.http.FullHttpRequest +import io.netty.handler.codec.http.HttpObjectAggregator +import io.netty.handler.codec.rtsp.* +import io.wavebeans.http.WbNettyHandler +import mu.KotlinLogging +import java.io.ByteArrayOutputStream +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import kotlin.random.Random + +@ChannelHandler.Sharable +class RtpsRecordControllerHandler( + val bufferHandler: (Long, RtpMapping, ByteArray) -> Unit, + val tearDownHandler: (Long, RtpMapping) -> Unit, + val sessionTtl: Long = 60_000L +) : ChannelInboundHandlerAdapter(), WbNettyHandler { + + companion object { + private val log = KotlinLogging.logger { } + } + + private val channels = ConcurrentHashMap() + private val sessions = ConcurrentHashMap() + private val rtspRecordReceiverHandler = RtspRecordReceiverHandler() + private val scheduler = Executors.newSingleThreadScheduledExecutor() + + override fun init() { + scheduler.scheduleAtFixedRate({ + sessions.forEach { (sessionId, session) -> + if (session.isTooOld(sessionTtl)) { + doTearDown(sessionId) + } + } + }, 0, 500, TimeUnit.MILLISECONDS) + } + + override fun initChannel(channel: SocketChannel) { + channel.pipeline().addLast(rtspRecordReceiverHandler) + .addLast(RtspDecoder()) + .addLast(HttpObjectAggregator(4 * 1024)) + .addLast(RtspEncoder()) + .addLast(this) + } + + override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { + val channelId = ctx.channel().id().asLongText() + channels[channelId]?.let { it.accessedLastTime = System.currentTimeMillis() } + log.trace { "Received $msg over channelId=$channelId" } + if (msg is FullHttpRequest && msg.protocolVersion() == RtspVersions.RTSP_1_0) { + log.trace { "Handling method ${msg.method()} on ${msg.uri()}" } + val response = DefaultFullHttpResponse(RtspVersions.RTSP_1_0, RtspResponseStatuses.OK) + msg.headers()[RtspHeaderNames.CSEQ]?.let { response.headers().add(RtspHeaderNames.CSEQ, it) } + val sessionId = msg.headers()[RtspHeaderNames.SESSION] + when (msg.method()) { + RtspMethods.OPTIONS -> { + response.headers().add( + RtspHeaderNames.PUBLIC, + listOf( + RtspMethods.OPTIONS, + RtspMethods.ANNOUNCE, + RtspMethods.SETUP, + RtspMethods.RECORD, + RtspMethods.TEARDOWN, + ).joinToString(", ") + ) + } + RtspMethods.ANNOUNCE -> { + channels.computeIfAbsent(channelId) { + log.info { "New channel $channelId registered" } + RtspControllerChannelState.create() + } + val msgContent = msg.content() + handleAnnounce(msgContent, channelId, msg.uri()) + } + RtspMethods.SETUP -> { + val transport = checkNotNull(msg.headers()[RtspHeaderNames.TRANSPORT]) { "Header ${RtspHeaderNames.TRANSPORT} is not found but required" } + val sid = doSetup(channelId, transport) + response.headers().add(RtspHeaderNames.TRANSPORT, transport) + response.headers().add(RtspHeaderNames.SESSION, sid) + } + RtspMethods.RECORD -> { + val sid = checkNotNull(sessionId) { "Header `${RtspHeaderNames.SESSION}` is not found but required." }.toLong() + doRecord(sid) + } + RtspMethods.TEARDOWN -> { + val sid = checkNotNull(sessionId) { "Header `${RtspHeaderNames.SESSION}` is not found but required." }.toLong() + doTearDown(sid) + } + else -> throw UnsupportedOperationException() + } + log.trace { "Responding $response" } + ctx.write(response) + ctx.flush() + } + } + + override fun close() { + log.info { "Session clean up scheduler shutting down" } + scheduler.shutdown() + if (!scheduler.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + scheduler.shutdown() + } + log.info { "Session clean up scheduler has shut down" } + } + + private fun doRecord(sessionId: Long) { + val session = sessions.getValue(sessionId) + session.accessed() + log.debug { "[sid=$sessionId] Started record" } + rtspRecordReceiverHandler.registerSession(session) + } + + private fun doSetup(channelId: String, transport: String): Long { + val sessionId = Random.nextLong(Long.MAX_VALUE) + + val values = transport.split(";") + if (values[0] != "RTP/AVP/TCP") throw UnsupportedOperationException("Only TCP is supported") + if (values[1] != "unicast") throw UnsupportedOperationException("Only `unicast` is supported") + val mode = values.firstOrNull { it.startsWith("mode") } + ?.split("=", limit = 2) + ?.get(1) + if (mode?.toLowerCase() != "record") throw UnsupportedOperationException("mode=$mode is not supported") + + val interleaved = values.firstOrNull { it.startsWith("interleaved") } + ?.split("=", limit = 2) + ?.get(1) + ?: throw UnsupportedOperationException("interleaved must be specified") + val (data, manage) = interleaved.split("-", limit = 2).map { it.toInt() } + + val channelState = channels.getValue(channelId) + val channelMappings = channelState.announcements.map { + require(it.fmtList.size == 1) { "fmtList != 1 is not supported" } + val fmt = it.fmtList.first() + require(fmt >= 96) { "Built in formats are not supported, only custom >= 96" } + val rtmMap = checkNotNull(channelState.rtpMappings.firstOrNull { it.payloadType == fmt }) { + "Format $fmt is not found among mappings: ${channelState.rtpMappings}" + } + it.port to rtmMap + }.toMap() + + val session = RtspRecordingSession.create(channelId, sessionId, data, manage, channelMappings, bufferHandler, tearDownHandler) + require(sessions.putIfAbsent(sessionId, session) == null) { + "Can't create session with id $sessionId as it already exists o_O" + } + channels.remove(channelId) // no longer needed channel state -- proper session started + return sessionId + } + + private fun handleAnnounce(msgContent: ByteBuf, channelId: String, uri: String) { + val buffer = ByteArrayOutputStream() + msgContent.readBytes(buffer, msgContent.readableBytes()) + val content = String(buffer.toByteArray()) + .split("[\r\n]+".toRegex()) + .filterNot(String::isEmpty) + log.debug { "[channelId=$channelId] Announced the following content:\n${content.joinToString("\n")}" } + + // search for media name and transport address + require(content.single { it.startsWith("v=") } == "v=0") { "Only version 0 is supported" } + val announced = content.filter { it.startsWith("m=") } + .map { mediaAnnouncementString -> + val d = mediaAnnouncementString.removePrefix("m=").split(" ") + require(d.size >= 4) { "The Media Announcement `$mediaAnnouncementString` doesn't have all expected elements (>=4)." } + val media = d[0] + val (port, portNumber) = if (d[1].indexOf('/') < 0) { + listOf(d[1].toInt(), 1) + } else { + d[1].split("/", limit = 2).map { it.toInt() } + } + val transport = d[2] + val fmtList = d.subList(3, d.size).map { it.toInt() } + require(fmtList.size == 1) { "fmtList != 1 is not supported" } + val fmt = fmtList.first() + require(fmt >= 96) { "Built in formats are not supported, only custom >= 96" } + MediaAnnouncement(media, port, portNumber, transport, fmtList) + } + + log.debug { "[channelId=$channelId] Announced media: $announced" } + + val rtpMappings = content.filter { it.startsWith("a=rtpmap:") } + .map { rtpMapString -> + val (payloadType, format) = rtpMapString.removePrefix("a=rtpmap:") + .split(" ", limit = 2) + .let { Pair(it[0].toInt(), it[1]) } + val (encoding, clockRate, encodingParameters) = format.split("/") + .let { Triple(it[0], it[1].toInt(), if (it.size > 2) it[2] else null) } + RtpMapping(payloadType, encoding, clockRate, encodingParameters) + } + log.debug { "[channelId=$channelId] RTP mappings: $rtpMappings" } + + channels.getValue(channelId).apply { + this.announcements.addAll(announced) + this.rtpMappings.addAll(rtpMappings) + this.path = uri + } + } + + private fun doTearDown(sessionId: Long) { + val session = sessions.remove(sessionId) ?: return + rtspRecordReceiverHandler.unregisterSession(session.nettyChannelId) + val mapping = checkNotNull(session.channelMappings.get(session.dataChannel)) { "Can't determine mapping for session=$session" } + log.info { "Finished buffered streaming. Tearing down with mapping=$mapping..." } + session.tearDownHandler.invoke(sessionId, mapping) + + } +} \ No newline at end of file diff --git a/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspControllerChannelState.kt b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspControllerChannelState.kt new file mode 100644 index 00000000..612dcdce --- /dev/null +++ b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspControllerChannelState.kt @@ -0,0 +1,13 @@ +package io.wavebeans.http.rtsp + +data class RtspControllerChannelState( + val announcements: MutableSet, + val rtpMappings: MutableSet, + var accessedLastTime: Long, + var path: String = "/" +) { + + companion object { + fun create(): RtspControllerChannelState = RtspControllerChannelState(HashSet(), HashSet(), System.currentTimeMillis()) + } +} \ No newline at end of file diff --git a/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspRecordReceiverChannelState.kt b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspRecordReceiverChannelState.kt new file mode 100644 index 00000000..a25e168d --- /dev/null +++ b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspRecordReceiverChannelState.kt @@ -0,0 +1,21 @@ +package io.wavebeans.http.rtsp + +class RtspRecordReceiverChannelState( + val session: RtspRecordingSession, + // 0 - searching for the packet, + // 1 - waiting for the channel, + // 2 - waiting for the packet size byte 0, + // 3 - waiting for the packet size byte 1, + // 4 - reading the buffer + var currentState: Int = 0, + var currentBuffer: ByteArray = ByteArray(0), + var currentPacketSize: Int = 0, + var bytesLeftToRead: Int = 0, + var currentChannel: Int = 0, +) { + companion object { + fun create(session: RtspRecordingSession): RtspRecordReceiverChannelState { + return RtspRecordReceiverChannelState(session) + } + } +} \ No newline at end of file diff --git a/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspRecordReceiverHandler.kt b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspRecordReceiverHandler.kt new file mode 100644 index 00000000..e8c9fb3a --- /dev/null +++ b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspRecordReceiverHandler.kt @@ -0,0 +1,172 @@ +package io.wavebeans.http.rtsp + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelInboundHandlerAdapter +import io.netty.handler.codec.rtsp.RtspMethods +import mu.KotlinLogging +import java.io.ByteArrayOutputStream +import java.util.concurrent.ConcurrentHashMap +import kotlin.math.min + +class RtspRecordReceiverHandler : ChannelInboundHandlerAdapter() { + + private val log = KotlinLogging.logger { } + + private val passthroughTokens = listOf( + RtspMethods.DESCRIBE.name(), + RtspMethods.OPTIONS.name(), + RtspMethods.ANNOUNCE.name(), + RtspMethods.RECORD.name(), + RtspMethods.PAUSE.name(), + RtspMethods.PLAY.name(), + RtspMethods.TEARDOWN.name(), + RtspMethods.GET_PARAMETER.name(), + RtspMethods.REDIRECT.name(), + RtspMethods.SET_PARAMETER.name(), + RtspMethods.SETUP.name(), + ).map { it.toByteArray() } + + private val passthroughTokenMaxLength = passthroughTokens.maxOf { it.size } + private val channelStates = ConcurrentHashMap() + + fun registerSession(session: RtspRecordingSession) { + log.debug { "Attempting register session $session" } + require( + channelStates.putIfAbsent( + session.nettyChannelId, + RtspRecordReceiverChannelState.create(session) + ) == null + ) { + "Channel ${session.nettyChannelId} is already registered" + } + } + + fun unregisterSession( + channelId: String + ) { + log.debug { "Attempting unregister session for channel $channelId" } + channelStates.remove(channelId) + } + + override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { + log.trace { "Received $msg" } + if (msg is ByteBuf) { + val previewSize = min(passthroughTokenMaxLength, msg.readableBytes()) + val preview = ByteArray(previewSize) + msg.copy(0, previewSize).readBytes(preview) + if (passthroughTokens.any { it.contentEquals(preview.copyOfRange(0, it.size)) }) { + log.trace { "Passing through $msg" } + ctx.fireChannelRead(msg) + } else { + val channelId = ctx.channel().id().asLongText() + log.trace { "Handling $msg as RTP for channelId=$channelId" } + val state = checkNotNull(channelStates[channelId]) { + "Channel $channelId should have been registered by that moment" + } + state.session.accessed() + val buffer = ByteArrayOutputStream() + msg.readBytes(buffer, msg.readableBytes()) // read the rest + msg.release() + val bytes = buffer.toByteArray() + ctx.fireChannelReadComplete() + var i = 0 + while (i < bytes.size) { + when (state.currentState) { + 0 -> { + if (bytes[i] == '$'.toByte()) state.currentState++ + } + 1 -> { + state.currentChannel = bytes[i].toInt() and 0xFF + state.currentState++ + } + 2 -> { + state.currentPacketSize = (bytes[i].toInt() and 0xFF) shl 8 + state.currentState++ + } + 3 -> { + state.currentPacketSize = state.currentPacketSize or (bytes[i].toInt() and 0xFF) + state.currentState++ + log.trace { "Located packet of channel=${state.currentChannel}, bytesInThePacket=${state.currentPacketSize}." } + state.currentBuffer = ByteArray(state.currentPacketSize) + state.bytesLeftToRead = state.currentPacketSize + } + 4 -> { + if (state.bytesLeftToRead > 0) { + state.currentBuffer[state.currentPacketSize - state.bytesLeftToRead] = bytes[i] + state.bytesLeftToRead-- + } + if (state.bytesLeftToRead == 0) { + log.trace { "Read the packet of channel=${state.currentChannel}, bytesInThePacket=${state.currentPacketSize}" } + // read RTP header: https://tools.ietf.org/html/rfc3550#section-5.1 + val i1 = state.currentBuffer.take(4) + .mapIndexed { j, b -> (b.toLong() and 0xFF) shl (8 * (3 - j)) } + .reduce { acc, j -> acc or j } + val version = (i1 ushr 30) and 0x03 + require(version == 2L) { "RTPHeader.version=$version. Version 2 is supported only." } + val padding = (i1 ushr 29) and 0x01 + val extension = (i1 ushr 28) and 0x01 + require(extension == 0L) { "RTPHeader.extension=$extension. Non-0 value is not implemented." } + val csrcCount = (i1 ushr 24) and 0x07 + require(csrcCount == 0L) { "RTPHeader.csrcCount=$csrcCount. Non-0 is not implemented." } + val marker = (i1 ushr 23) and 0x01 +// require(marker == 0L) { "RTPHeader.marker=$marker. Non-0 value is not implemented." } + val payload = (i1 ushr 16) and 0x7F + val sequenceNumber = i1 and 0xFFFF + val timestamp = state.currentBuffer.drop(4).take(4) + .mapIndexed { j, b -> (b.toLong() and 0xFF) shl (8 * (3 - j)) } + .reduce { acc, j -> acc or j } + val ssrc = state.currentBuffer.drop(8).take(4) + .mapIndexed { j, b -> (b.toLong() and 0xFF) shl (8 * (3 - j)) } + .reduce { acc, j -> acc or j } + val csrc = (0 until csrcCount).map { + state.currentBuffer.drop(12 + 4 * i).take(4) + .mapIndexed { j, b -> (b.toLong() and 0xFF) shl (8 * (3 - j)) } + .reduce { acc, j -> acc or j } + } + + log.trace { + """ + RTP Header: + version=$version + padding=$padding + extension=$extension + csrcCount=$csrcCount + marker=$marker + payload=$payload + sequenceNumber=$sequenceNumber + timestamp=$timestamp + ssrc=$ssrc + csrc=$csrc + """.trimIndent() + } + + val rtpHeaderSize = 12 + csrcCount.toInt() + + if (state.currentChannel == state.session.dataChannel) { + state.session.bufferHandler.invoke( + state.session.sessionId, + state.session.channelMappings.getValue(state.currentChannel), + state.currentBuffer.copyOfRange(rtpHeaderSize, state.currentBuffer.size) + ) + } + state.currentState = 0 + } + } + else -> throw UnsupportedOperationException("state=${state.currentState}") + } + i++ + } + log.trace { "Finished the buffer with state=${state.currentState}, bytesLeftToRead=${state.bytesLeftToRead}" } + } + } else { + throw UnsupportedOperationException("$msg is unsupported") + } + } + + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + // Close the connection when an exception is raised. + log.error(cause) { "Error in $ctx" } + ctx.close() + } +} \ No newline at end of file diff --git a/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspRecordingSession.kt b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspRecordingSession.kt new file mode 100644 index 00000000..1f19a153 --- /dev/null +++ b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspRecordingSession.kt @@ -0,0 +1,40 @@ +package io.wavebeans.http.rtsp + +data class RtspRecordingSession( + val nettyChannelId: String, + val sessionId: Long, + val dataChannel: Int, + val manageChannel: Int, + val channelMappings: Map, + val bufferHandler: ((Long, RtpMapping, ByteArray) -> Unit), + val tearDownHandler: ((Long, RtpMapping) -> Unit), + private var accessedLastTime: Long = Long.MAX_VALUE +) { + companion object { + fun create( + nettyChannelId: String, + sessionId: Long, + dataChannel: Int, + manageChannel: Int, + channelMappings: Map, + bufferHandler: (Long, RtpMapping, ByteArray) -> Unit, + tearDownHandler: ((Long, RtpMapping) -> Unit) + ): RtspRecordingSession { + return RtspRecordingSession( + nettyChannelId, + sessionId, + dataChannel, + manageChannel, + channelMappings, + bufferHandler, + tearDownHandler + ) + } + } + + fun accessed() { + accessedLastTime = System.currentTimeMillis() + } + + fun isTooOld(ttl: Long): Boolean = System.currentTimeMillis() - accessedLastTime > ttl +} \ No newline at end of file diff --git a/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspSessionClient.kt b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspSessionClient.kt new file mode 100644 index 00000000..08dc94d4 --- /dev/null +++ b/http/src/main/kotlin/io/wavebeans/http/rtsp/RtspSessionClient.kt @@ -0,0 +1,234 @@ +package io.wavebeans.http.rtsp + +import io.netty.bootstrap.Bootstrap +import io.netty.channel.* +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.handler.codec.http.* +import io.netty.handler.codec.rtsp.* +import io.netty.handler.logging.LogLevel +import io.netty.handler.logging.LoggingHandler +import mu.KotlinLogging +import java.io.BufferedInputStream +import java.io.Closeable +import java.io.InputStream +import java.nio.charset.Charset +import java.util.concurrent.* +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference +import kotlin.random.Random + +class RtspSessionClient( + private val host: String, + private val port: Int, + path: String +) : Closeable { + + private val log = KotlinLogging.logger { } + + private val executor = Executors.newSingleThreadExecutor() + private val workerGroup: EventLoopGroup = NioEventLoopGroup() + + private val requestSemaphore = Semaphore(1) + + @Volatile + private var resultLatch = CountDownLatch(1) + private val result = AtomicReference(null) + private val client = client() + private val uri = "rtsp://$host:$port$path" + + private var sessionId: String? = null + private val cseq = AtomicInteger(1) + private val streamCounter = AtomicInteger(Random.nextInt(Int.MAX_VALUE / 4)) + + inner class RtspClientHandler : ChannelInboundHandlerAdapter() { + + override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { + log.info { "RtspClientHandler.channelRead(ctx=$ctx,msg=$msg)" } + if (msg is FullHttpResponse) { + result.set(msg) + } else { + result.set(UnsupportedOperationException("$msg is unsupported")) + } + resultLatch.countDown() + } + } + + fun options(): Future { + val request = DefaultFullHttpRequest(RtspVersions.RTSP_1_0, RtspMethods.OPTIONS, "*") + request.headers().add(RtspHeaderNames.CSEQ, cseq.getAndIncrement()) + return doRequest(request) { + log.info { "OPTIONS request resulted with $it" } + } + } + + fun announce( + title: String, + format: String, + formatId: Int, + port: Int + ): Future { + val buffer = client.alloc().buffer() + val contentLength = buffer.writeCharSequence(""" + v=0 + o=$title + m=audio $port RTP/AVP $formatId + a=rtpmap:$formatId $format + """.trimIndent(), Charset.defaultCharset()) + val request = DefaultFullHttpRequest( + RtspVersions.RTSP_1_0, + RtspMethods.ANNOUNCE, + uri, + buffer + ) + request.headers() + .add(RtspHeaderNames.CSEQ, cseq.getAndIncrement()) + .add(HttpHeaderNames.CONTENT_TYPE, "application/sdp") + .add(HttpHeaderNames.CONTENT_LENGTH, contentLength) + return doRequest(request) { + log.info { "ANNOUNCE request resulted with $it" } + } + } + + fun setup(mode: String, dataPort: Int): Future { + require(dataPort % 2 == 0) { "Data port $dataPort should be even" } + val managePort = dataPort + 1 + val request = DefaultFullHttpRequest( + RtspVersions.RTSP_1_0, + RtspMethods.SETUP, + uri + ) + request.headers() + .add(RtspHeaderNames.CSEQ, cseq.getAndIncrement()) + .add(RtspHeaderNames.TRANSPORT, "RTP/AVP/TCP;unicast;mode=$mode;interleaved=$dataPort-$managePort") + return doRequest(request) { + log.info { "SETUP request resulted with $it" } + sessionId = it.headers()[RtspHeaderNames.SESSION] + } + } + + fun record(): Future { + val request = DefaultFullHttpRequest( + RtspVersions.RTSP_1_0, + RtspMethods.RECORD, + uri + ) + request.headers() + .add(RtspHeaderNames.CSEQ, cseq.getAndIncrement()) + .add(RtspHeaderNames.SESSION, sessionId!!) + return doRequest(request) { + log.info { "RECORD request resulted with $it" } + } + } + + fun streamData(formatId: Int, channel: Int, stream: InputStream, maxPacketSize: Int = 1024) { + val buf = client.alloc().buffer() + val b = ByteArray(maxPacketSize) + val packetHeader = ByteArray(4) + packetHeader[0] = '$'.toByte() + packetHeader[1] = channel.toByte() + val rtpHeaderSize = 12 + val rtpHeader = ByteArray(rtpHeaderSize) + rtpHeader[0] = ( + (2 shl 6) or // version + (0 shl 5) or // padding + (0 shl 4) or // extension + (0) // csrc count + ).toByte() + rtpHeader[1] = ( + (0 shl 7) or // marker + (formatId and 0x7F) // payload type + ).toByte() + val ssrc = sessionId.hashCode() + rtpHeader[8] = (ssrc ushr 24 and 0xFF).toByte() + rtpHeader[9] = (ssrc ushr 16 and 0xFF).toByte() + rtpHeader[10] = (ssrc ushr 8 and 0xFF).toByte() + rtpHeader[11] = (ssrc and 0xFF).toByte() + BufferedInputStream(stream).use { reader -> + val bytesRead = reader.read(b) + val contentSize = bytesRead + rtpHeaderSize + packetHeader[2] = (contentSize ushr 8 and 0xFF).toByte() + packetHeader[3] = (contentSize and 0xFF).toByte() + val counter = streamCounter.getAndIncrement() + rtpHeader[2] = (counter ushr 8 and 0xFF).toByte() + rtpHeader[3] = (counter and 0xFF).toByte() + val timestamp = (System.currentTimeMillis() % Int.MAX_VALUE.toLong()).toInt() + rtpHeader[4] = (timestamp ushr 24 and 0xFF).toByte() + rtpHeader[5] = (timestamp ushr 16 and 0xFF).toByte() + rtpHeader[6] = (timestamp ushr 8 and 0xFF).toByte() + rtpHeader[7] = (timestamp and 0xFF).toByte() + + buf.writeBytes(packetHeader) + buf.writeBytes(rtpHeader) + buf.writeBytes(b, 0, bytesRead) + client.writeAndFlush(buf).sync() + } + } + + fun tearDown(): Future { + val request = DefaultFullHttpRequest( + RtspVersions.RTSP_1_0, + RtspMethods.TEARDOWN, + uri + ) + request.headers() + .add(RtspHeaderNames.CSEQ, cseq.getAndIncrement()) + .add(RtspHeaderNames.SESSION, sessionId) + return doRequest(request) { + log.info { "TEARDOWN request resulted with $it" } + } + } + + override fun close() { + client.close().sync() + workerGroup.shutdownGracefully().sync() + executor.shutdown() + if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + executor.shutdownNow() + } + } + + private fun doRequest(request: HttpRequest, resultHandler: (I) -> O): Future { + if (!requestSemaphore.tryAcquire()) throw IllegalStateException("Another request is in progress") + return executor.submit(Callable { + resultLatch = CountDownLatch(1) + client.writeAndFlush(request) + val result = awaitResult() + requestSemaphore.release() + resultHandler(result) + }) + } + + private fun awaitResult(timeoutMs: Long = 5000): T { + if (resultLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) { + val result = checkNotNull(result.get()) { "Result is returned as null" } + if (result is Exception) throw result + @Suppress("UNCHECKED_CAST") + return result as T + } else { + throw TimeoutException("Didn't get result within $timeoutMs ms") + } + } + + private fun client(): Channel { + val b = Bootstrap() + b.group(workerGroup) + b.channel(NioSocketChannel::class.java) + b.option(ChannelOption.SO_KEEPALIVE, true) + b.handler(object : ChannelInitializer() { + @Throws(Exception::class) + override fun initChannel(ch: SocketChannel) { + ch.pipeline() + .addLast(LoggingHandler(RtspSessionClient::class.java, LogLevel.TRACE)) + .addLast(RtspEncoder()) + .addLast(RtspDecoder()) + .addLast(HttpObjectAggregator(4 * 1024)) + .addLast(RtspClientHandler()) + } + }) + + // Start the client. + return b.connect(host, port).sync().channel() + } +} \ No newline at end of file diff --git a/http/src/test/kotlin/io/wavebeans/http/WbHttpServiceIntegrationSpec.kt b/http/src/test/kotlin/io/wavebeans/http/WbHttpServiceIntegrationSpec.kt index b6699d3b..8cb1af0d 100644 --- a/http/src/test/kotlin/io/wavebeans/http/WbHttpServiceIntegrationSpec.kt +++ b/http/src/test/kotlin/io/wavebeans/http/WbHttpServiceIntegrationSpec.kt @@ -38,10 +38,13 @@ object WbHttpServiceIntegrationSpec : Spek({ OkHttp(c) } + val tableRegistry = TableRegistry.default + describe("Table service") { val port = findFreePort() - val httpService = WbHttpService(serverPort = port, gracePeriodMillis = 100) + val httpService = WbHttpService(serverPort = port, gracePeriodMillis = 100, tableRegistry = tableRegistry) + .addHandler(JavalinServletHandler.newInstance(tableRegistry)) beforeGroup { httpService.start() } afterGroup { httpService.close() } @@ -98,7 +101,8 @@ object WbHttpServiceIntegrationSpec : Spek({ describe("Audio service") { val port = findFreePort() - val httpService = WbHttpService(serverPort = port, gracePeriodMillis = 100) + val httpService = WbHttpService(serverPort = port, gracePeriodMillis = 100, tableRegistry = tableRegistry) + .addHandler(JavalinServletHandler.newInstance(tableRegistry)) beforeGroup { httpService.start() } @@ -129,12 +133,14 @@ object WbHttpServiceIntegrationSpec : Spek({ val communicatorPort = findFreePort() val facilitatorPort1 = findFreePort() val facilitatorPort2 = findFreePort() + val tableRegistryInner = TableRegistryImpl() val httpService = WbHttpService( serverPort = httpPort, gracePeriodMillis = 100, communicatorPort = communicatorPort, - tableRegistry = TableRegistryImpl() //just isolated instance - ) + tableRegistry = tableRegistryInner //just isolated instance + ).addHandler(JavalinServletHandler.newInstance(tableRegistryInner)) + val tableName = "tableDistributed" diff --git a/http/src/test/kotlin/io/wavebeans/http/rtsp/RtpsServerSpec.kt b/http/src/test/kotlin/io/wavebeans/http/rtsp/RtpsServerSpec.kt new file mode 100644 index 00000000..8e4c6ea5 --- /dev/null +++ b/http/src/test/kotlin/io/wavebeans/http/rtsp/RtpsServerSpec.kt @@ -0,0 +1,102 @@ +package io.wavebeans.http.rtsp + +import assertk.all +import assertk.assertThat +import assertk.assertions.isEqualTo +import assertk.assertions.isNotNull +import assertk.assertions.isNull +import assertk.assertions.prop +import assertk.catch +import io.wavebeans.http.WbHttpService +import io.wavebeans.tests.findFreePort +import org.spekframework.spek2.Spek +import org.spekframework.spek2.lifecycle.CachingMode +import org.spekframework.spek2.lifecycle.CachingMode.SCOPE +import org.spekframework.spek2.style.specification.describe +import java.io.ByteArrayInputStream +import java.io.ByteArrayOutputStream + +class RtpsServerSpec : Spek({ + describe("Base RECORD flow") { + + val content = "1234567890abcdefghijklmnopqrstuvwxyz".toByteArray() + + val buffer by memoized(CachingMode.TEST) { + ByteArrayOutputStream() + } + + val mappingContainer by memoized(CachingMode.TEST) { + Array(1) { null } + } + + val bufferHandler = { _: Long, _: RtpMapping, buf: ByteArray -> + buffer.write(buf) + } + + val tearDownHandler = { _: Long, mapping: RtpMapping -> + mappingContainer[0] = mapping + } + + val port = findFreePort() + + val server by memoized(SCOPE) { + WbHttpService(port).addHandler(RtpsRecordControllerHandler(bufferHandler, tearDownHandler)) + } + + beforeGroup { + server.start() + } + + afterGroup { + server.close() + } + + it("should access via designated path") { + + RtspSessionClient("localhost", port, "/test").use { client -> + assertThat( + catch { client.options().get() }, + "should run OPTIONS" + ).isNull() + + val formatId = 96 + val channel = 0 + assertThat( + catch { client.announce("test track", "L8/44100/1", formatId, channel).get() }, + "should ANNOUNCE the content" + ).isNull() + + assertThat( + catch { client.setup("record", channel).get() }, + "should SETUP the session" + ).isNull() + + assertThat( + catch { client.record().get() }, + "should initiate RECORD of the session" + ).isNull() + + assertThat( + catch { client.streamData(formatId, channel, ByteArrayInputStream(content)) }, + "should stream data" + ).isNull() + + assertThat( + catch { client.tearDown().get() }, + "should tear down the session" + ).isNull() + + assertThat( + buffer.toByteArray(), + "should have written the content" + ).isEqualTo(content) + + assertThat(mappingContainer[0], "should have specified mapping").isNotNull().all { + prop("encoding") { it.encoding.toLowerCase() }.isEqualTo("l8") + prop("clockRate") { it.clockRate }.isEqualTo(44100) + prop("encodingParameters") { it.encodingParameters?.toLowerCase() }.isEqualTo("1") + } + } + } + } +}) \ No newline at end of file diff --git a/http/src/test/resources/logback-test.xml b/http/src/test/resources/logback-test.xml index 71530433..de768305 100644 --- a/http/src/test/resources/logback-test.xml +++ b/http/src/test/resources/logback-test.xml @@ -10,7 +10,7 @@ - + \ No newline at end of file