Skip to content

Commit

Permalink
Sanitize with -Xsource:3 continued
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed May 21, 2024
1 parent bd54eb1 commit 918d92d
Show file tree
Hide file tree
Showing 98 changed files with 360 additions and 377 deletions.
8 changes: 3 additions & 5 deletions src/main/java/actor/DemoMessagesActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ public static void main(String[] args) {
.addTask(
CoordinatedShutdown.PhaseBeforeServiceUnbind(),
"stop",
() -> {
return akka.pattern.Patterns.ask(demoActor, new Stop(), Duration.ofSeconds(5))
.thenApply(reply -> Done.getInstance());
});
() -> Patterns.ask(demoActor, new Stop(), Duration.ofSeconds(5))
.thenApply(reply -> Done.getInstance()));

//Tell: Fire and forget
demoActor.tell(new GreetingTell("Hi tell"), ActorRef.noSender());
Expand Down Expand Up @@ -70,7 +68,7 @@ public Stop() {
/**
* Create Props for an actor of this type.
*
* @param initValue The inital value for the counterTell is passed to this actor’s constructor.
* @param initValue The initial value for the counterTell is passed to this actor’s constructor.
* @return a Props for creating this actor, which can then be further configured
* (e.g. calling `.withDispatcher()` on it)
*/
Expand Down
1 change: 0 additions & 1 deletion src/main/java/actor/HelloWorldMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ public static void main(String[] args) throws Exception {

system.tell(new HelloWorldMain.Start("World"));
system.tell(new HelloWorldMain.Start("Akka"));
//TODO Add ask messages

Thread.sleep(3000);
system.terminate();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/util/ConnectionStatusChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private boolean testHttp(String endpointURL, SSLContext sslContext) {
try {
HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
httpClient.execute(new HttpOptions(endpointURL));
LOGGER.info("...successfully connected via HTTP OPTONS request to: {}", endpointURL);
LOGGER.info("...successfully connected via HTTP OPTIONS request to: {}", endpointURL);
return true;
} catch (Exception e) {
LOGGER.warn("...unable to connect to: {}. Reason: ", endpointURL, e);
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/akkahttp/HttpFileEcho.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ object HttpFileEcho extends App with JsonProtocol {
}
}

def roundtripClient(id: Int, address: String, port: Int) = {
def roundtripClient(id: Int, address: String, port: Int): Unit = {
val fileHandle = uploadClient(id, address, port)
fileHandle.onComplete {
case Success(each) => downloadClient(id, each, address, port)
Expand Down Expand Up @@ -188,7 +188,7 @@ object HttpFileEcho extends App with JsonProtocol {

def upload(file: File): Future[FileHandle] = {

def delayRequestSoTheServerIsNotHammered() = {
def delayRequestSoTheServerIsNotHammered(): Unit = {
val (start, end) = (1000, 5000)
val rnd = new scala.util.Random
val sleepTime = start + rnd.nextInt((end - start) + 1)
Expand Down Expand Up @@ -239,7 +239,7 @@ object HttpFileEcho extends App with JsonProtocol {
.runWith(FileIO.toPath(Paths.get(localFile.getAbsolutePath)))
}

def download(remoteFileHandle: FileHandle, localFile: File) = {
def download(remoteFileHandle: FileHandle, localFile: File): Unit = {

val result = for {
reqEntity <- Marshal(remoteFileHandle).to[RequestEntity]
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/akkahttp/ReverseProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import org.slf4j.{Logger, LoggerFactory}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.parallel.CollectionConverters.ImmutableIterableIsParallelizable
import scala.concurrent.duration.DurationInt
import scala.concurrent.*
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}

/**
Expand Down Expand Up @@ -94,7 +94,7 @@ object ReverseProxy extends App {
clients.par.foreach(clientID => httpClient(clientID, proxyHost, proxyPort, mode, requestsPerClient))

def httpClient(clientId: Int, proxyHost: String, proxyPort: Int, targetHost: Mode, nbrOfRequests: Int) = {
def logResponse(response: HttpResponse) = {
def logResponse(response: HttpResponse): Unit = {
val id = response.getHeader("X-Correlation-ID").orElse(RawHeader("X-Correlation-ID", "N/A")).value()
val msg = response.entity.dataBytes.runReduce(_ ++ _).map(data => data.utf8String)
msg.onComplete(msg => logger.info(s"Client: $clientId got response: ${response.status.intValue()} for id: $id and msg: ${msg.getOrElse("N/A")}"))
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/akkahttp/WebsocketChatEcho.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ object WebsocketChatEcho extends App with ClientCommon {
.map(addr => logger.info(s"Server bound to: $addr"))
}

private def clientWebSocketClientFlow(clientName: String, address: String, port: Int) = {
private def clientWebSocketClientFlow(clientName: String, address: String, port: Int): Unit = {

val webSocketNonReusableFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] = Http().webSocketClientFlow(WebSocketRequest(s"ws://$address:$port/echochat"))

Expand Down
14 changes: 7 additions & 7 deletions src/main/scala/akkahttp/WebsocketEcho.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ trait ClientCommon {
//see https://github.com/akka/akka-http/issues/65
case TextMessage.Strict(text) => logger.info(s"Client received TextMessage.Strict: $text")
case TextMessage.Streamed(textStream) => textStream.runFold("")(_ + _).onComplete(value => logger.info(s"Client received TextMessage.Streamed: ${value.get}"))
case BinaryMessage.Strict(binary) => //do nothing
case BinaryMessage.Strict(_) => // binary, do nothing
case BinaryMessage.Streamed(binaryStream) => binaryStream.runWith(Sink.ignore)
}

// see https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html?language=scala#half-closed-websockets
def namedSource(clientname: String) = {
def namedSource(clientname: String): Source[Message, Promise[Option[Message]]] = {
Source
.tick(1.second, 1.second, "tick")
.zipWithIndex
Expand All @@ -50,7 +50,7 @@ trait ClientCommon {
.concatMat(Source.maybe[Message])(Keep.right)
}

def browserClient() = {
def browserClient(): AnyVal = {
val os = System.getProperty("os.name").toLowerCase
if (os == "mac os x") Process("open src/main/resources/WebsocketEcho.html").!
else if (os == "windows 10") Seq("cmd", "/c", "start src/main/resources/WebsocketEcho.html").!
Expand Down Expand Up @@ -175,7 +175,7 @@ object WebsocketEcho extends App with WebSocketDirectives with ClientCommon {
}
}

def singleWebSocketRequestClient(id: Int, address: String, port: Int) = {
def singleWebSocketRequestClient(id: Int, address: String, port: Int): Unit = {

val webSocketNonReusableFlow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
Expand All @@ -191,7 +191,7 @@ object WebsocketEcho extends App with WebSocketDirectives with ClientCommon {
completionPromise.future.onComplete(closed => logger.info(s"Client: $id singleWebSocketRequestClient closed: $closed"))
}

def webSocketClientFlowClient(id: Int, address: String, port: Int) = {
def webSocketClientFlowClient(id: Int, address: String, port: Int): Unit = {

val webSocketNonReusableFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] = Http().webSocketClientFlow(WebSocketRequest(s"ws://$address:$port/echo"))

Expand All @@ -207,7 +207,7 @@ object WebsocketEcho extends App with WebSocketDirectives with ClientCommon {
closed.onComplete(closed => logger.info(s"Client: $id webSocketClientFlowClient closed: $closed"))
}

def singleWebSocketRequestSourceQueueClient(id: Int, address: String, port: Int) = {
def singleWebSocketRequestSourceQueueClient(id: Int, address: String, port: Int): Unit = {

val (source, sourceQueue) = {
val p = Promise[SourceQueue[Message]]()
Expand Down Expand Up @@ -248,7 +248,7 @@ object WebsocketEcho extends App with WebSocketDirectives with ClientCommon {
sourceQueueWithComplete.complete()
}

def actorClient(id: Int, address: String, port: Int) = {
def actorClient(id: Int, address: String, port: Int): Unit = {

val sourceBackpressure = Source.actorRefWithBackpressure[TextMessage](
ackMessage = "ack",
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/akkahttp/WebsocketEchoActors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ object WebsocketEchoActors extends App with ClientCommon {
case class CloseConnection(uuid: UUID)
}

def server(address: String, port: Int) = {
def server(address: String, port: Int): Unit = {

val chatRef = system.actorOf(Props[ChatRef]())

Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/akkahttp/oidc/CORSHandler.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package akkahttp.oidc

import org.apache.pekko.http.scaladsl.model.HttpMethods._
import org.apache.pekko.http.scaladsl.model.headers._
import org.apache.pekko.http.scaladsl.model.HttpMethods.*
import org.apache.pekko.http.scaladsl.model.headers.*
import org.apache.pekko.http.scaladsl.model.{HttpResponse, StatusCodes}
import org.apache.pekko.http.scaladsl.server.Directives.{complete, options, respondWithHeaders, _}
import org.apache.pekko.http.scaladsl.server.Directives.{complete, options, respondWithHeaders, *}
import org.apache.pekko.http.scaladsl.server.{Directive0, Route}

trait CORSHandler {
Expand Down
27 changes: 9 additions & 18 deletions src/main/scala/akkahttp/oidc/OIDCKeycloak.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ import io.circe.parser.decode
import io.circe.syntax.*
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model.headers.{HttpChallenge, OAuth2BearerToken}
import org.apache.pekko.http.scaladsl.model.*
import org.apache.pekko.http.scaladsl.model.headers.{HttpChallenge, OAuth2BearerToken}
import org.apache.pekko.http.scaladsl.server.Directives.*
import org.apache.pekko.http.scaladsl.server.{AuthenticationFailedRejection, Directive1, RejectionHandler, Route}
import org.apache.pekko.http.scaladsl.server.{AuthenticationFailedRejection, Directive1, Route}
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal
import org.apache.pekko.util.Timeout
import org.keycloak.TokenVerifier
import org.keycloak.adapters.KeycloakDeploymentBuilder
import org.keycloak.admin.client.{CreatedResponseUtil, Keycloak, KeycloakBuilder}
Expand All @@ -27,7 +26,6 @@ import java.security.{KeyFactory, PublicKey}
import java.time.Duration
import java.util
import java.util.{Base64, Collections}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.sys.process.{Process, stringSeqToProcess}
Expand Down Expand Up @@ -77,17 +75,17 @@ object OIDCKeycloak extends App with CORSHandler with JsonSupport {

def initAdminClient() = {
val keycloakAdminClient = KeycloakBuilder.builder()
.serverUrl(keycloak.getAuthServerUrl())
.serverUrl(keycloak.getAuthServerUrl)
.realm("master")
.clientId(adminClientId)
.username(keycloak.getAdminUsername())
.password(keycloak.getAdminPassword())
.username(keycloak.getAdminUsername)
.password(keycloak.getAdminPassword)
.build()
logger.info("Connected to Keycloak server version: " + keycloakAdminClient.serverInfo().getInfo().getSystemInfo().getVersion())
logger.info("Connected to Keycloak server version: " + keycloakAdminClient.serverInfo().getInfo.getSystemInfo.getVersion)
keycloakAdminClient
}

def createTestUser(keycloakAdminClient: Keycloak) = {
def createTestUser(keycloakAdminClient: Keycloak): Unit = {
val username = "test"
val password = "test"
val usersResource = keycloakAdminClient.realm("test").users()
Expand Down Expand Up @@ -118,7 +116,7 @@ object OIDCKeycloak extends App with CORSHandler with JsonSupport {
logger.info(s"User $username/$password may sign in via: http://localhost:${keycloak.getHttpPort}/realms/test/account")
}

def createClientConfig(keycloakAdminClient: Keycloak) = {
def createClientConfig(keycloakAdminClient: Keycloak): Unit = {
val clientId = "my-test-client"
val clientRepresentation = new ClientRepresentation()
clientRepresentation.setClientId(clientId)
Expand All @@ -144,14 +142,7 @@ object OIDCKeycloak extends App with CORSHandler with JsonSupport {
keycloakAdminClient
}

def runBackendServer(keycloak: KeycloakContainer) = {

implicit def rejectionHandler = RejectionHandler.newBuilder().handle {
case AuthenticationFailedRejection(reason, _) => complete(StatusCodes.Unauthorized, reason.toString)
}.result().mapRejectionResponse(addCORSHeaders)

implicit val timeout: Timeout = Timeout(5.seconds)

def runBackendServer(keycloak: KeycloakContainer): Unit = {
val config = new AdapterConfig()
config.setAuthServerUrl(keycloak.getAuthServerUrl)
config.setRealm("test")
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/alpakka/amqp/AmqpEcho.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object AmqpEcho extends App {
* @param id
* @param rabbitMQContainer
*/
def pubSubClient(id: Int, rabbitMQContainer: RabbitMQContainer) = {
def pubSubClient(id: Int, rabbitMQContainer: RabbitMQContainer): Unit = {
val connectionProvider =
AmqpCachedConnectionProvider(
AmqpDetailsConnectionProvider(
Expand Down Expand Up @@ -93,7 +93,7 @@ object AmqpEcho extends App {
* @param id
* @param rabbitMQContainer
*/
def rpcScenario(id: Int, rabbitMQContainer: RabbitMQContainer) = {
def rpcScenario(id: Int, rabbitMQContainer: RabbitMQContainer): Unit = {
val mappedPort = rabbitMQContainer.getAmqpPort
val amqpUri = s"amqp://$host:$mappedPort"
val connectionProvider = AmqpCachedConnectionProvider(AmqpUriConnectionProvider(amqpUri))
Expand Down Expand Up @@ -190,7 +190,7 @@ object AmqpEcho extends App {
writeResult
}

private def receiveFromQueueAck(id: Int, connectionProvider: AmqpCachedConnectionProvider, queueDeclaration: QueueDeclaration, noOfSentMsg: Int, queueNameFull: String) = {
private def receiveFromQueueAck(id: Int, connectionProvider: AmqpCachedConnectionProvider, queueDeclaration: QueueDeclaration, noOfSentMsg: Int, queueNameFull: String): Unit = {
logger.info(s"Starting receiveFromQueueAck: $queueNameFull...")

val amqpSource = AmqpSource.committableSource(
Expand Down Expand Up @@ -225,7 +225,7 @@ object AmqpEcho extends App {
}
}

private def sendToExchange(id: Int, connectionProvider: AmqpCachedConnectionProvider, exchangeName: String, exchangeDeclaration: ExchangeDeclaration) = {
private def sendToExchange(id: Int, connectionProvider: AmqpCachedConnectionProvider, exchangeName: String, exchangeDeclaration: ExchangeDeclaration): Unit = {
// Wait until the receiver has registered
Thread.sleep(1000)
logger.info(s"Starting sendToExchange: $exchangeName...")
Expand Down
7 changes: 4 additions & 3 deletions src/main/scala/alpakka/clickhousedb/ClickhouseDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package alpakka.clickhousedb
import com.crobox.clickhouse.ClickhouseClient
import com.crobox.clickhouse.stream.{ClickhouseSink, Insert}
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Framing, Sink, Source}
import org.apache.pekko.util.ByteString
Expand Down Expand Up @@ -43,15 +44,15 @@ class ClickhouseDB(httpPort: Int) {
result.trim
}

def writeAll(noOfRecords: Integer) = {
def writeAll(noOfRecords: Integer): Future[Done] = {
Source(1 to noOfRecords)
.map(id => Insert("test.my_table", s"{\"myfloat_nullable\": $id, \"mystr\": $id, \"myint_id\": $id}"))
.wireTap((insert: Insert) => logger.debug(s"Insert record with type JSONEachRow: $insert"))
.runWith(ClickhouseSink.toSink(tweakedConf, client))
}

// The most intuitive way to read the streamed records
def readAllSource() = {
def readAllSource(): Future[Int] = {
val resultFut = client.source("SELECT * FROM test.my_table ORDER BY myint_id ASC FORMAT JSONEachRow SETTINGS output_format_json_named_tuples_as_objects=1;")
.wireTap((line: String) => logger.debug(s"Raw JSON record: $line"))
.runWith(Sink.seq)
Expand All @@ -60,7 +61,7 @@ class ClickhouseDB(httpPort: Int) {
}

// An alternative way to read, allows for more control, eg while massaging the result
def readAllSourceByteString() = {
def readAllSourceByteString(): Future[Int] = {
val resultFut = client.sourceByteString("SELECT * FROM test.my_table ORDER BY myint_id ASC FORMAT JSONEachRow SETTINGS output_format_json_named_tuples_as_objects=1;")
.wireTap((allLines: ByteString) => logger.debug("Raw JSON records all-in-one: \n" + allLines.utf8String))
.via(Framing.delimiter(ByteString.fromString(System.lineSeparator()), 1024))
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/alpakka/dynamodb/DynamoDBEcho.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class DynamoDBEcho(urlWithMappedPort: URI, accessKey: String, secretKey: String,

private val testTableName = "testTable"

val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))
val credentialsProvider: StaticCredentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))
implicit val client: DynamoDbAsyncClient = createAsyncClient()

def run(noOfItems: Int): Future[Int] = {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/alpakka/env/FileServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ object FileServer extends App {
}
}

def randomSleeper() = {
def randomSleeper(): Unit = {
val (start, end) = (1000, 10000)
val rnd = new scala.util.Random
val sleepTime = start + rnd.nextInt((end - start) + 1)
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/alpakka/env/KafkaServerTestcontainers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ class KafkaServerTestcontainers {
val imageName = s"confluentinc/cp-kafka:$kafkaVersion"
val originalPort = 9093
var mappedPort = 1111
val kafkaContainer = new KafkaContainer(DockerImageName.parse(imageName)).
val kafkaContainer: KafkaContainer = new KafkaContainer(DockerImageName.parse(imageName)).
withExposedPorts(originalPort)

def run() = {
def run(): Unit = {
kafkaContainer.start()
mappedPort = kafkaContainer.getMappedPort(originalPort)
logger.info(s"Running Kafka: $imageName on mapped port: $mappedPort")
}

def stop() = {
def stop(): Unit = {
kafkaContainer.stop()
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/main/scala/alpakka/env/WebsocketServer.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package alpakka.env

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.actor.{ActorSystem, Terminated}
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model.ws._
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.http.scaladsl.model.ws.*
import org.apache.pekko.http.scaladsl.server.Directives.*
import org.apache.pekko.http.scaladsl.server.Route
import org.apache.pekko.http.scaladsl.server.directives.WebSocketDirectives
import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source}
import org.slf4j.{Logger, LoggerFactory}

import scala.concurrent.duration._
import scala.concurrent.duration.*
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.util.{Failure, Success}
Expand All @@ -27,11 +27,11 @@ class WebsocketServer extends WebSocketDirectives {
val (address, port) = ("127.0.0.1", 6002)
var serverBinding: Future[Http.ServerBinding] = _

def run() = {
def run(): Unit = {
server(address, port)
}

def stop() = {
def stop(): Future[Terminated] = {
logger.info("About to shutdown...")
val fut = serverBinding.map(serverBinding => serverBinding.terminate(hardDeadline = 3.seconds))
logger.info("Waiting for connections to terminate...")
Expand All @@ -41,7 +41,7 @@ class WebsocketServer extends WebSocketDirectives {
}
}

private def server(address: String, port: Int) = {
private def server(address: String, port: Int): Unit = {

def echoFlow: Flow[Message, Message, Any] =
Flow[Message].mapConcat {
Expand Down
Loading

0 comments on commit 918d92d

Please sign in to comment.