Skip to content

Commit

Permalink
Change port
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed May 22, 2024
1 parent 4be426b commit 3ea0a89
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 42 deletions.
6 changes: 3 additions & 3 deletions src/main/scala/alpakka/file/uploader/DirectoryWatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class DirectoryWatcher(uploadDir: Path, processedDir: Path) {
throw new IllegalArgumentException(errorMessage)
}

val uploadSourceQueue: SourceQueueWithComplete[Path] = Source
private val uploadSourceQueue: SourceQueueWithComplete[Path] = Source
.queue[Path](bufferSize = 1000, OverflowStrategy.backpressure, maxConcurrentOffers = 1000)
.mapAsync(1)(path => uploadAndMove(path))
.to(Sink.ignore)
Expand Down Expand Up @@ -89,7 +89,7 @@ class DirectoryWatcher(uploadDir: Path, processedDir: Path) {
observer.addListener(listener)
monitor.addObserver(observer)
monitor.start()
uploadDirPath
monitor
}

private def addToUploadQueue(path: Path) = {
Expand Down Expand Up @@ -120,7 +120,7 @@ class DirectoryWatcher(uploadDir: Path, processedDir: Path) {
}

def stop(): Future[Terminated] = {
logger.info("About to shutdown DirectoryWatcher...")
logger.info("About to shutdown DirectoryWatcher/Uploader...")
uploader.stop()
}
}
Expand Down
18 changes: 8 additions & 10 deletions src/main/scala/alpakka/file/uploader/Uploader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ import scala.concurrent.{Await, ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

/**
* Upload file, eg from file system
* HTTP file upload to the embedded mock server
* Is used by [[DirectoryWatcher]]
*
* Also starts a mock server to handle the uploaded files
*/
class Uploader(system: ActorSystem) {
val logger: Logger = LoggerFactory.getLogger(this.getClass)
Expand All @@ -30,7 +29,7 @@ class Uploader(system: ActorSystem) {

var serverBinding: Future[Http.ServerBinding] = _

val (protocol, address, port) = ("http", "localhost", 6000)
val (protocol, address, port) = ("http", "localhost", 6010)

server(address, port)

Expand All @@ -43,7 +42,7 @@ class Uploader(system: ActorSystem) {

storeUploadedFile("uploadedFile", tempDestination) {
case (metadataFromClient: FileInfo, uploadedFile: File) =>
logger.info(s"Server stored uploaded tmp file with name: ${uploadedFile.getName} (Metadata from client: $metadataFromClient)")
logger.info(s"Mock server stored uploaded tmp file with name: ${uploadedFile.getName} (Metadata from client: $metadataFromClient)")
complete(Future(uploadedFile.getName))
}
}
Expand All @@ -52,18 +51,18 @@ class Uploader(system: ActorSystem) {
val bindingFuture = Http().newServerAt(address, port).bindFlow(routes)
bindingFuture.onComplete {
case Success(b) =>
logger.info("Server started, listening on: " + b.localAddress)
logger.info("Mock server started, listening on: {}", b.localAddress)
serverBinding = bindingFuture
case Failure(e) =>
logger.info(s"Server could not bind to $address:$port. Exception message: ${e.getMessage}")
logger.info(s"Mock server could not bind to: $address:$port. Exception message: ${e.getMessage}")
system.terminate()
}

sys.addShutdownHook {
logger.info("About to shutdown...")
val fut = bindingFuture.map(serverBinding => serverBinding.terminate(hardDeadline = 3.seconds))
val fut = bindingFuture.map(serverBinding => serverBinding.terminate(hardDeadline = 2.seconds))
logger.info("Waiting for connections to terminate...")
val onceAllConnectionsTerminated = Await.result(fut, 10.seconds)
val onceAllConnectionsTerminated = Await.result(fut, 3.seconds)
logger.info("Connections terminated")
onceAllConnectionsTerminated.flatMap { _ => system.terminate()
}
Expand Down Expand Up @@ -129,8 +128,7 @@ class Uploader(system: ActorSystem) {
logger.info("Waiting for connections to terminate...")
val onceAllConnectionsTerminated = Await.result(fut, 10.seconds)
logger.info("Connections terminated")
onceAllConnectionsTerminated.flatMap { _ => system.terminate()
}
onceAllConnectionsTerminated.flatMap(_ => system.terminate())
}

}
Expand Down
53 changes: 24 additions & 29 deletions src/test/scala/alpakka/file/DirectoryWatcherSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,42 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEachTestData, TestData}
import org.slf4j.{Logger, LoggerFactory}

import java.nio.file.{Files, Path, Paths}
import scala.concurrent.Await
import scala.concurrent.duration.*
import scala.util.Random

/**
* Designed as IT test on purpose to demonstrate
* the realistic usage of [[DirectoryWatcher]], hence we:
* - copy files to the file system before each test
* - clean up after each test
* - have a shared listener instance
* - create dir structure and copy files before each test
* - clean up files after each test
* - use a shared watcher instance
* - waitForCondition with Thread.sleep()
*/
final class DirectoryWatcherSpec extends AsyncWordSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEachTestData {
val logger: Logger = LoggerFactory.getLogger(this.getClass)

var listener: DirectoryWatcher = _
var watcher: DirectoryWatcher = _
var tmpRootDir: Path = _
var parentDir: Path = _
var uploadDir: Path = _
var processedDir: Path = _

"DirectoryWatcher" should {
"detect_files_on_startup" in {
listener = DirectoryWatcher(parentDir, processedDir)
waitForCondition(3.seconds)(listener.countFilesProcessed() == 2) shouldBe true
watcher = DirectoryWatcher(uploadDir, processedDir)
waitForCondition(3.seconds)(watcher.countFilesProcessed() == 2) shouldBe true
}

"detect_added_files_at_runtime_in_parent" in {
copyTestFileToDir(parentDir)
listener = DirectoryWatcher(parentDir, processedDir)
waitForCondition(3.seconds)(listener.countFilesProcessed() == 2 + 1) shouldBe true
copyTestFileToDir(uploadDir)
watcher = DirectoryWatcher(uploadDir, processedDir)
waitForCondition(3.seconds)(watcher.countFilesProcessed() == 2 + 1) shouldBe true
}

"detect_added_files_at_runtime_in_subdir" in {
copyTestFileToDir(parentDir.resolve("subdir"))
listener = DirectoryWatcher(parentDir, processedDir)
waitForCondition(3.seconds)(listener.countFilesProcessed() == 2 + 1) shouldBe true
copyTestFileToDir(uploadDir.resolve("subdir"))
watcher = DirectoryWatcher(uploadDir, processedDir)
waitForCondition(3.seconds)(watcher.countFilesProcessed() == 2 + 1) shouldBe true
}

"detect_added_nested_subdir_at_runtime_with_files_in_subdir" in {
Expand All @@ -52,19 +54,19 @@ final class DirectoryWatcherSpec extends AsyncWordSpec with Matchers with Before
Files.copy(sourcePath, targetPath)
Files.copy(sourcePath, targetPath2)

val targetDir = Files.createDirectories(parentDir.resolve("subdir").resolve("nestedDirWithFiles"))
val targetDir = Files.createDirectories(uploadDir.resolve("subdir").resolve("nestedDirWithFiles"))
FileUtils.copyDirectory(tmpDir.toFile, targetDir.toFile)

listener = DirectoryWatcher(parentDir, processedDir)
waitForCondition(3.seconds)(listener.countFilesProcessed() == 2 + 2) shouldBe true
watcher = DirectoryWatcher(uploadDir, processedDir)
waitForCondition(3.seconds)(watcher.countFilesProcessed() == 2 + 2) shouldBe true
}

"handle invalid parent directory path" in {
val invalidParentDir = Paths.get("/path/to/non-existent/directory")
val processedDir = Files.createTempDirectory("processed")

the[IllegalArgumentException] thrownBy {
listener = DirectoryWatcher(invalidParentDir, processedDir)
watcher = DirectoryWatcher(invalidParentDir, processedDir)
} should have message s"Invalid upload directory path: $invalidParentDir"
}
}
Expand All @@ -75,10 +77,10 @@ final class DirectoryWatcherSpec extends AsyncWordSpec with Matchers with Before
tmpRootDir = Files.createTempDirectory(testData.text)
logger.info(s"Created tmp dir: $tmpRootDir")

parentDir = tmpRootDir.resolve("upload")
uploadDir = tmpRootDir.resolve("upload")
processedDir = tmpRootDir.resolve("processed")
Files.createDirectories(parentDir)
Files.createDirectories(parentDir.resolve("subdir"))
Files.createDirectories(uploadDir)
Files.createDirectories(uploadDir.resolve("subdir"))
Files.createDirectories(processedDir)

// Populate dirs BEFORE startup
Expand All @@ -88,7 +90,7 @@ final class DirectoryWatcherSpec extends AsyncWordSpec with Matchers with Before

override protected def afterEach(testData: TestData): Unit = {
logger.info(s"Cleaning up after test: ${testData.name}")
listener.stop()
Await.result(watcher.stop(), 5.seconds)
FileUtils.deleteDirectory(tmpRootDir.toFile)
}

Expand All @@ -104,14 +106,7 @@ final class DirectoryWatcherSpec extends AsyncWordSpec with Matchers with Before
}

private def waitForCondition(maxDuration: FiniteDuration)(condition: => Boolean): Boolean = {
val startTime = System.currentTimeMillis()
var elapsed = 0.millis

while (!condition && elapsed < maxDuration) {
Thread.sleep(100)
elapsed = (System.currentTimeMillis() - startTime).millis
}

Thread.sleep(maxDuration.toMillis)
condition
}
}

0 comments on commit 3ea0a89

Please sign in to comment.