diff --git a/Dockerfile b/Dockerfile index da856a6..a3c13f4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,19 +7,19 @@ RUN apt-get update && \ apt-get install -y --no-install-recommends \ openjdk-11-jdk-headless \ curl \ - gnupg \ - && echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | tee /etc/apt/sources.list.d/sbt.list \ - && curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | gpg --dearmor > /etc/apt/trusted.gpg.d/sbt.gpg \ - && apt-get update \ - && apt-get install -y sbt \ - && apt-get clean \ - && rm -rf /var/lib/apt/lists/* + gnupg && \ + echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | tee /etc/apt/sources.list.d/sbt.list && \ + curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | gpg --dearmor > /etc/apt/trusted.gpg.d/sbt.gpg && \ + apt-get update && \ + apt-get install -y sbt && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* # Setup SBT RUN mkdir -p project && \ echo 'sbt.version=1.8.2' > project/build.properties && \ echo 'addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.16")\naddSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")' > project/plugins.sbt && \ - echo 'name := "erlangcast"\nversion := "0.1.0"\nscalaVersion := "2.13.8"' > build.sbt + echo 'name := "scalacast"\nversion := "0.1.0"\nscalaVersion := "2.13.8"' > build.sbt COPY src/scala ./src/scala/ RUN sbt compile @@ -30,7 +30,7 @@ WORKDIR /app ENV NODE_NAME_1=node1@127.0.0.1 \ NODE_NAME_2=node2@127.0.0.1 \ - COOKIE=erlangcast_cookie \ + COOKIE=scalacast_cookie \ DEBIAN_FRONTEND=noninteractive # Install runtime dependencies @@ -38,8 +38,8 @@ RUN apt-get update && \ apt-get install -y --no-install-recommends \ openjdk-11-jre-headless \ netcat-openbsd \ - curl \ - && apt-get clean && \ + curl && \ + apt-get clean && \ rm -rf /var/lib/apt/lists/* # Setup directories diff --git a/rebar.config b/rebar.config deleted file mode 100644 index 782970f..0000000 --- a/rebar.config +++ /dev/null @@ -1,50 +0,0 @@ -{erl_opts, [ - debug_info, - {parse_transform, lager_transform} -]}. - -{deps, [ - {lager, "3.9.2"}, - {cowboy, "2.9.0"}, - {jiffy, "1.1.1"}, - {sync, "0.1.3"} -]}. - -{shell, [ - {config, "config/sys.config"}, - {apps, [ - sasl, - lager, - cowboy, - erlangcast - ]} -]}. - -{relx, [ - {release, {erlangcast, "0.1.0"}, [ - kernel, - stdlib, - sasl, - lager, - cowboy, - erlangcast - ]}, - {dev_mode, true}, - {include_erts, false}, - {extended_start_script, true} -]}. - -{profiles, [ - {prod, [ - {relx, [ - {dev_mode, false}, - {include_erts, true} - ]} - ]}, - {test, [ - {deps, [ - {meck, "0.9.2"}, - {proper, "1.4.0"} - ]} - ]} -]}. \ No newline at end of file diff --git a/scripts/monitor.sh b/scripts/monitor.sh index 9b7ce67..5e153ca 100644 --- a/scripts/monitor.sh +++ b/scripts/monitor.sh @@ -36,6 +36,8 @@ monitor_logs() { restart_services() { log_message "Restarting all services..." # Add logic to restart all services + systemctl restart erlangcast || handle_error "Failed to restart erlangcast service" + systemctl restart another_service || handle_error "Failed to restart another_service" } # Main monitoring function diff --git a/scripts/run_local.py b/scripts/run_local.py index a7c7f01..24adee3 100644 --- a/scripts/run_local.py +++ b/scripts/run_local.py @@ -5,7 +5,7 @@ def run_app(port1, port2, camera1, camera2, restart): if restart: cmd = f"sbt run & sbt run --setcookie port2 --name port2@127.0.0.1" else: - cmd = f"rebar3 shell & rebar3 shell --setcookie port2 --name port2@127.0.0.1" + cmd = f"sbt run & sbt run --setcookie port2 --name port2@127.0.0.1" subprocess.run(cmd, shell=True) if __name__ == "__main__": diff --git a/src/scala/main/scala/centralized_server.scala b/src/scala/main/scala/centralized_server.scala index 18b18d8..aea125a 100644 --- a/src/scala/main/scala/centralized_server.scala +++ b/src/scala/main/scala/centralized_server.scala @@ -13,13 +13,19 @@ object CentralizedServer { if (isRunning) { restartServer(port) } else { - serverSocket = Some(new ServerSocket(port)) - isRunning = true - println(s"Centralized server started on port $port") + try { + serverSocket = Some(new ServerSocket(port)) + isRunning = true + println(s"Centralized server started on port $port") - while (isRunning) { - val clientSocket = serverSocket.get.accept() - handleClientRequest(clientSocket) + while (isRunning) { + val clientSocket = serverSocket.get.accept() + handleClientRequest(clientSocket) + } + } catch { + case e: Exception => + println(s"Error starting server: ${e.getMessage}") + stopServer() } } } @@ -31,17 +37,23 @@ object CentralizedServer { } def handleClientRequest(clientSocket: Socket): Future[Unit] = Future { - val in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream)) - val out = new PrintWriter(clientSocket.getOutputStream, true) + try { + val in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream)) + val out = new PrintWriter(clientSocket.getOutputStream, true) - val request = in.readLine() - println(s"Received request: $request") + val request = in.readLine() + println(s"Received request: $request") - // Handle the request and send a response - val response = s"Response to: $request" - out.println(response) + // Handle the request and send a response + val response = s"Response to: $request" + out.println(response) - clientSocket.close() + clientSocket.close() + } catch { + case e: Exception => + println(s"Error handling client request: ${e.getMessage}") + clientSocket.close() + } } def restartServer(port: Int): Future[Unit] = { diff --git a/src/scala/main/scala/fault_tolerance.scala b/src/scala/main/scala/fault_tolerance.scala index 8d1c2f2..c2b9e14 100644 --- a/src/scala/main/scala/fault_tolerance.scala +++ b/src/scala/main/scala/fault_tolerance.scala @@ -24,11 +24,20 @@ object FaultTolerance { def handleError(error: String): Future[Unit] = Future { println(s"Handling error: $error") errorCount += 1 + // Add detailed error handling logic + error match { + case "Critical" => println("Critical error occurred. Taking necessary actions.") + case "Warning" => println("Warning: Please check the system.") + case _ => println("Unknown error type.") + } } def recover(error: String): Future[Unit] = Future { println(s"Recovering from error: $error") errorCount -= 1 + // Add recovery mechanisms + if (errorCount < 0) errorCount = 0 + println("System recovered successfully.") } def restart(): Future[Unit] = { diff --git a/src/scala/main/scala/peer_discovery.scala b/src/scala/main/scala/peer_discovery.scala index 916c7d1..d8112d2 100644 --- a/src/scala/main/scala/peer_discovery.scala +++ b/src/scala/main/scala/peer_discovery.scala @@ -28,9 +28,14 @@ object PeerDiscovery { } def handleMessage(message: String): Future[Unit] = Future { - println(s"Received peer discovery message: $message") - // Handle incoming peer discovery message - peers += message + try { + println(s"Received peer discovery message: $message") + // Handle incoming peer discovery message + peers += message + } catch { + case e: Exception => + println(s"Error handling peer discovery message: ${e.getMessage}") + } } def broadcastMessage(message: String): Future[Unit] = Future { diff --git a/src/scala/main/scala/reliable_transmission.scala b/src/scala/main/scala/reliable_transmission.scala index 48a5a44..5b7d6f3 100644 --- a/src/scala/main/scala/reliable_transmission.scala +++ b/src/scala/main/scala/reliable_transmission.scala @@ -11,7 +11,8 @@ object ReliableTransmission { def sendData(peer: String, data: String): Future[Unit] = { def attemptSend(retryCount: Int): Future[Unit] = { if (retryCount >= retryLimit) { - restart() + handleTransmissionError(peer, "Retry limit reached") + Future.failed(new Exception("Retry limit reached")) } else { // Simulate sending data to peer println(s"Sending data to $peer: $data") diff --git a/src/scala/main/scala/scala_connector.scala b/src/scala/main/scala/scala_connector.scala deleted file mode 100644 index 551cbe1..0000000 --- a/src/scala/main/scala/scala_connector.scala +++ /dev/null @@ -1,49 +0,0 @@ -import org.apache.pekko.actor.{Actor, ActorSystem, Props} -import org.apache.pekko.pattern.ask -import org.apache.pekko.util.Timeout -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} -import scala.util.{Failure, Success} - -object ScalaConnector { - implicit val timeout: Timeout = Timeout(5.seconds) - val system: ActorSystem = ActorSystem("ScalaErlangSystem") - val erlangActor = system.actorOf(Props[ErlangActor], "erlangActor") - - def sendToErlang(message: String): Unit = { - if (system.whenTerminated.isCompleted) { - restart() - } else { - erlangActor ! message - } - } - - def receiveFromErlang(): Future[String] = { - (erlangActor ? "receive").mapTo[String] - } - - def restart(): Future[Unit] = { - for { - _ <- system.terminate() - _ <- system.whenTerminated - newSystem = ActorSystem("ScalaErlangSystem") - newErlangActor = newSystem.actorOf(Props[ErlangActor], "erlangActor") - } yield { - system = newSystem - erlangActor = newErlangActor - } - } - - class ErlangActor extends Actor { - def receive: Receive = { - case message: String => - // Handle message from Scala to Erlang - println(s"Sending message to Erlang: $message") - // Simulate sending message to Erlang - sender() ! "ack" - case "receive" => - // Simulate receiving message from Erlang - sender() ! "Message from Erlang" - } - } -} diff --git a/tests/python/test_peer_discovery.py b/tests/python/test_peer_discovery.py index 39c74a0..d7859ba 100644 --- a/tests/python/test_peer_discovery.py +++ b/tests/python/test_peer_discovery.py @@ -1,22 +1,22 @@ import unittest -from peer_discovery import start, stop, handle_message, broadcast_message +from peer_discovery import PeerDiscovery class TestPeerDiscovery(unittest.TestCase): def test_start(self): - result = start() + result = PeerDiscovery.start() self.assertEqual(result, 'ok') def test_stop(self): - result = stop() + result = PeerDiscovery.stop() self.assertEqual(result, 'ok') def test_handle_message(self): - result = handle_message("Test message") + result = PeerDiscovery.handleMessage("Test message") self.assertEqual(result, 'ok') def test_broadcast_message(self): - result = broadcast_message("Test message") + result = PeerDiscovery.broadcastMessage("Test message") self.assertEqual(result, 'ok') if __name__ == '__main__': diff --git a/tests/python/test_reliable_transmission.py b/tests/python/test_reliable_transmission.py index 5ee47b6..e8f9e9d 100644 --- a/tests/python/test_reliable_transmission.py +++ b/tests/python/test_reliable_transmission.py @@ -1,17 +1,17 @@ import unittest -from reliable_transmission import send_data, receive_data +from src.scala.main.scala.reliable_transmission import ReliableTransmission class TestReliableTransmission(unittest.TestCase): def test_send_data(self): peer = "peer1" data = "Test data" - result = send_data(peer, data) + result = ReliableTransmission.sendData(peer, data) self.assertEqual(result, 'ok') def test_receive_data(self): data_handler = lambda data: self.assertEqual(data, "Test data") - result = receive_data(data_handler) + result = ReliableTransmission.receiveData(data_handler) self.assertEqual(result, 'ok') if __name__ == '__main__': diff --git a/tests/python/test_video_chunking.py b/tests/python/test_video_chunking.py index ec171f6..ec297d4 100644 --- a/tests/python/test_video_chunking.py +++ b/tests/python/test_video_chunking.py @@ -1,18 +1,18 @@ import unittest -from video_chunking import chunk_video, get_chunk +from src.scala.main.scala.video_chunking import VideoChunking class TestVideoChunking(unittest.TestCase): def test_chunk_video(self): video_path = "path/to/test_video.mp4" chunk_size = 1048576 - result = chunk_video(video_path, chunk_size) + result = VideoChunking.chunkVideo(video_path, chunk_size) self.assertEqual(len(result), 5) # Assuming the video is chunked into 5 parts def test_get_chunk(self): video_path = "path/to/test_video.mp4" chunk_index = 1 - result = get_chunk(video_path, chunk_index) + result = VideoChunking.getChunk(video_path, chunk_index) self.assertEqual(len(result), 1048576) # Assuming each chunk is 1MB if __name__ == '__main__': diff --git a/tests/python/test_video_streaming.py b/tests/python/test_video_streaming.py index 284bddd..46e9cb4 100644 --- a/tests/python/test_video_streaming.py +++ b/tests/python/test_video_streaming.py @@ -1,6 +1,6 @@ import unittest import os -from video_streaming import start_hls_streaming, start_dash_streaming, get_hls_playlist, get_dash_manifest +from src.scala.main.scala.video_streaming import start_hls_streaming, start_dash_streaming, get_hls_playlist, get_dash_manifest class TestVideoStreaming(unittest.TestCase): diff --git a/tests/scala/test_scala_connector.scala b/tests/scala/test_scala_connector.scala deleted file mode 100644 index f3aa35e..0000000 --- a/tests/scala/test_scala_connector.scala +++ /dev/null @@ -1,24 +0,0 @@ -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers -import scala.concurrent.Await -import scala.concurrent.duration._ - -class TestScalaConnector extends AnyFlatSpec with Matchers { - - "sendToErlang" should "send a message to Erlang" in { - ScalaConnector.sendToErlang("Test message") - // Assuming some mechanism to verify the message was sent - // This is a placeholder for actual verification logic - true should be (true) - } - - "receiveFromErlang" should "receive a message from Erlang" in { - val message = Await.result(ScalaConnector.receiveFromErlang(), 5.seconds) - message should be ("Message from Erlang") - } - - "restart" should "restart the Scala connector" in { - val result = Await.result(ScalaConnector.restart(), 5.seconds) - result shouldBe () - } -}