Skip to content

Commit

Permalink
try to remove some uses of ActorMaterializer (#347)
Browse files Browse the repository at this point in the history
* try to remove some uses of ActorMaterializer

* compile issue

* try to fix tests

* more changes

* Update PekkoHttp1020MigrationSpec.scala
  • Loading branch information
pjfanning committed Oct 30, 2023
1 parent 72221db commit 87f55da
Show file tree
Hide file tree
Showing 43 changed files with 28 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,12 @@

package docs.http.javadsl;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.ConnectHttp;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.http.javadsl.server.PathMatcher1;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.stream.ActorMaterializer;
import org.apache.pekko.stream.javadsl.Flow;

import static org.apache.pekko.http.javadsl.common.PartialApplication.*;
import static org.apache.pekko.http.javadsl.server.PathMatchers.*;
Expand Down
3 changes: 0 additions & 3 deletions docs/src/test/java/docs/http/javadsl/Http2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.stream.ActorMaterializer;
import org.apache.pekko.stream.Materializer;

// #bindAndHandleSecure
// #bindAndHandlePlain
Expand All @@ -46,7 +44,6 @@ void testBindAndHandleAsync() {
Function<HttpRequest, CompletionStage<HttpResponse>> asyncHandler =
r -> CompletableFuture.completedFuture(HttpResponse.create());
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
HttpsConnectionContext httpsConnectionContext = null;

// #bindAndHandleSecure
Expand Down
5 changes: 0 additions & 5 deletions docs/src/test/java/docs/http/javadsl/UnmarshalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@
import static org.junit.Assert.assertEquals;
import org.apache.pekko.http.javadsl.testkit.JUnitRouteTest;

import org.apache.pekko.util.ByteString;
import org.apache.pekko.stream.ActorMaterializer;
import org.apache.pekko.stream.Materializer;
import org.junit.Test;

// #imports
import org.apache.pekko.http.javadsl.model.*;
import org.apache.pekko.http.javadsl.unmarshalling.StringUnmarshallers;
import org.apache.pekko.http.javadsl.unmarshalling.Unmarshaller;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.pekko.http.javadsl.model.ws.WebSocketUpgradeResponse;
import org.apache.pekko.http.javadsl.settings.ClientConnectionSettings;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.ActorMaterializer;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Keep;
Expand All @@ -47,7 +46,7 @@ public class WebSocketClientExampleTest {
public void testSingleWebSocketRequest() {
// #single-WebSocket-request
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
Materializer materializer = Materializer.createMaterializer(system);
Http http = Http.get(system);

// print each incoming text message
Expand Down Expand Up @@ -101,7 +100,7 @@ public void testSingleWebSocketRequest() {
public void halfClosedWebSocketClosingExample() {

final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final Materializer materializer = Materializer.createMaterializer(system);
final Http http = Http.get(system);

// #half-closed-WebSocket-closing
Expand All @@ -119,7 +118,7 @@ public void halfClosedWebSocketClosingExample() {

public void halfClosedWebSocketWorkingExample() {
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final Materializer materializer = Materializer.createMaterializer(system);
final Http http = Http.get(system);

// #half-closed-WebSocket-working
Expand All @@ -141,7 +140,7 @@ public void halfClosedWebSocketWorkingExample() {

public void halfClosedWebSocketFiniteWorkingExample() {
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final Materializer materializer = Materializer.createMaterializer(system);
final Http http = Http.get(system);

// #half-closed-WebSocket-finite
Expand Down Expand Up @@ -184,7 +183,7 @@ public void testAuthorizedSingleWebSocketRequest() {
public void testWebSocketClientFlow() {
// #WebSocket-client-flow
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
Materializer materializer = Materializer.createMaterializer(system);
Http http = Http.get(system);

// print each incoming text message
Expand Down Expand Up @@ -240,7 +239,7 @@ public void testSingleWebSocketRequestWithHttpsProxyExample() {
// #https-proxy-singleWebSocket-request-example

final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final Materializer materializer = Materializer.createMaterializer(system);

final Flow<Message, Message, NotUsed> flow =
Flow.fromSinkAndSource(
Expand Down Expand Up @@ -268,7 +267,7 @@ public void testSingleWebSocketRequestWithHttpsProxyExample() {
public void testSingleWebSocketRequestWithHttpsProxyExampleWithAuth() {

final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final Materializer materializer = Materializer.createMaterializer(system);

final Flow<Message, Message, NotUsed> flow =
Flow.fromSinkAndSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.CoordinatedShutdown;
import org.apache.pekko.http.javadsl.*;
import org.apache.pekko.http.javadsl.marshallers.jackson.Jackson;
import org.apache.pekko.http.javadsl.model.*;
import org.apache.pekko.http.javadsl.model.headers.Connection;
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.http.javadsl.server.Directives;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.http.javadsl.unmarshalling.Unmarshaller;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.stream.ActorMaterializer;
import org.apache.pekko.stream.IOResult;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.FileIO;
Expand All @@ -51,7 +48,7 @@ public class HttpServerExampleDocTest {
public static void bindingExample() throws Exception {
// #binding-example
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
Materializer materializer = Materializer.createMaterializer(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind(ConnectHttp.toHost("localhost", 8080));
Expand All @@ -73,7 +70,7 @@ public static void bindingExample() throws Exception {
public static void bindingFailureExample() throws Exception {
// #binding-failure-handling
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
Materializer materializer = Materializer.createMaterializer(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind(ConnectHttp.toHost("localhost", 80));
Expand Down Expand Up @@ -101,7 +98,7 @@ public static void bindingFailureExample() throws Exception {
public static void connectionSourceFailureExample() throws Exception {
// #incoming-connections-source-failure-handling
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
Materializer materializer = Materializer.createMaterializer(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind(ConnectHttp.toHost("localhost", 8080));
Expand Down Expand Up @@ -137,7 +134,7 @@ public static void connectionSourceFailureExample() throws Exception {
public static void connectionStreamFailureExample() throws Exception {
// #connection-stream-failure-handling
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
Materializer materializer = Materializer.createMaterializer(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind(ConnectHttp.toHost("localhost", 8080));
Expand Down Expand Up @@ -186,7 +183,7 @@ public static void fullServerExample() throws Exception {
// #full-server-example
try {
// #full-server-example
final Materializer materializer = ActorMaterializer.create(system);
final Materializer materializer = Materializer.createMaterializer(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind(ConnectHttp.toHost("localhost", 8080));
Expand Down Expand Up @@ -265,7 +262,6 @@ class Bid {

final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final Unmarshaller<HttpEntity, Bid> asBid = Jackson.unmarshaller(Bid.class);

Expand All @@ -287,7 +283,6 @@ void consumeEntityUsingRawDataBytes() {
// #consume-raw-dataBytes
final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final Route s =
put(
Expand All @@ -302,7 +297,7 @@ void consumeEntityUsingRawDataBytes() {
final CompletionStage<IOResult> res =
bytes.runWith(
FileIO.toPath(new File("/tmp/example.out").toPath()),
materializer);
system);

return onComplete(
() -> res,
Expand Down Expand Up @@ -347,7 +342,6 @@ void discardEntityManuallyCloseConnections() {
// #discard-close-connections
final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final Route s =
put(
Expand All @@ -364,7 +358,7 @@ void discardEntityManuallyCloseConnections() {
// right away:
bytes.runWith(
Sink.cancelled(),
materializer); // "brutally" closes the connection
system); // "brutally" closes the connection

// Closing connections, method 2 (graceful):
// consider draining connection and replying with `Connection:
Expand All @@ -381,7 +375,7 @@ void discardEntityManuallyCloseConnections() {
public static void gracefulTerminationExample() throws Exception {
// #graceful-termination
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
Materializer materializer = Materializer.createMaterializer(system);

CompletionStage<ServerBinding> binding =
Http.get(system)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.pekko.http.javadsl.Http;
import static org.apache.pekko.http.javadsl.server.Directives.*;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.stream.ActorMaterializer;
import org.apache.pekko.stream.Materializer;

@SuppressWarnings("deprecation")
Expand All @@ -29,7 +28,7 @@ public static void main(String[] args) {
// only worked with classic actor system
org.apache.pekko.actor.ActorSystem system =
org.apache.pekko.actor.ActorSystem.create("TheSystem");
Materializer mat = ActorMaterializer.create(system);
Materializer mat = Materializer.createMaterializer(system);
Route route = get(() -> complete("Hello World!"));
Http.get(system)
.bindAndHandle(route.flow(system), ConnectHttp.toHost("localhost", 8080), mat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.pekko.japi.JavaPartialFunction;
import org.apache.pekko.japi.function.Function;

import org.apache.pekko.stream.ActorMaterializer;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.stream.ActorMaterializer;
import org.apache.pekko.stream.ActorMaterializerSettings;
import org.apache.pekko.stream.javadsl.FileIO;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.util.ByteString;
Expand All @@ -44,7 +43,6 @@
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.FiniteDuration;

import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -58,13 +56,6 @@
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

import static org.apache.pekko.http.javadsl.server.Directives.complete;
import static org.apache.pekko.http.javadsl.server.Directives.get;
import static org.apache.pekko.http.javadsl.server.Directives.onSuccess;
import static org.apache.pekko.http.javadsl.server.Directives.path;
import static org.apache.pekko.http.javadsl.server.Directives.pathPrefix;
import static org.apache.pekko.http.javadsl.server.Directives.post;

// #extract
import static org.apache.pekko.http.javadsl.server.Directives.extract;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package docs.http.scaladsl.server
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.server.Route
import org.apache.pekko.stream.{ ActorMaterializer, Materializer }
import scala.annotation.nowarn

@nowarn("msg=is deprecated")
Expand All @@ -27,7 +26,6 @@ class PekkoHttp1020MigrationSpec {
// #old-binding
// only worked with classic actor system
implicit val system = org.apache.pekko.actor.ActorSystem("TheSystem")
implicit val mat: Materializer = ActorMaterializer()
val route: Route =
get {
complete("Hello world")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import pekko.http.impl.util.enhanceString_
import pekko.http.scaladsl.model.HttpRequest
import pekko.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings }
import pekko.http.scaladsl.{ ClientTransport, Http }
import pekko.stream.ActorMaterializer
import pekko.stream.scaladsl.Flow
import pekko.util.ByteString
import com.typesafe.config.ConfigFactory
Expand All @@ -43,7 +42,6 @@ class ConnectionPoolBenchmark extends CommonBenchmark {
var maxConnections: String = _

implicit var system: ActorSystem = _
implicit var mat: ActorMaterializer = _
implicit def ec: ExecutionContext = system.dispatcher

private var poolSettings: ConnectionPoolSettings = _
Expand Down Expand Up @@ -77,7 +75,6 @@ class ConnectionPoolBenchmark extends CommonBenchmark {
""")
.withFallback(ConfigFactory.load())
system = ActorSystem("PekkoHttpBenchmarkSystem", config)
mat = ActorMaterializer()

val responseBytes = ByteString(
"""HTTP/1.1 200 OK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import pekko.dispatch.ExecutionContexts
import pekko.http.CommonBenchmark
import pekko.http.scaladsl.model.{ ContentTypes, HttpEntity }
import pekko.stream.scaladsl.Source
import pekko.stream.{ ActorMaterializer, Materializer }
import pekko.util.ByteString
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations.{ Benchmark, Param, Setup, TearDown }
Expand All @@ -31,14 +30,13 @@ class HttpEntityBenchmark extends CommonBenchmark {
var entityType: String = _

implicit var system: ActorSystem = _
implicit var mat: Materializer = _

var entity: HttpEntity = _

@Benchmark
def discardBytes(): Unit = {
val latch = new CountDownLatch(1)
entity.discardBytes(mat)
entity.discardBytes(system)
.future
.onComplete(_ => latch.countDown())(ExecutionContexts.parasitic)
latch.await()
Expand All @@ -54,7 +52,6 @@ class HttpEntityBenchmark extends CommonBenchmark {
""")
.withFallback(ConfigFactory.load())
system = ActorSystem("PekkoHttpBenchmarkSystem", config)
mat = ActorMaterializer()

entity = entityType match {
case "strict" =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model.HttpRequest
import pekko.http.scaladsl.model.HttpResponse
import pekko.http.scaladsl.settings.ServerSettings
import pekko.stream.ActorMaterializer
import pekko.stream.scaladsl.Flow
import pekko.stream.scaladsl.Source
import pekko.stream.scaladsl.TLSPlacebo
Expand All @@ -37,7 +36,6 @@ class ServerProcessingBenchmark extends CommonBenchmark {

var httpFlow: Flow[ByteString, ByteString, Any] = _
implicit var system: ActorSystem = _
implicit var mat: ActorMaterializer = _

@Benchmark
@OperationsPerInvocation(10000)
Expand All @@ -62,7 +60,7 @@ class ServerProcessingBenchmark extends CommonBenchmark {
""")
.withFallback(ConfigFactory.load())
system = ActorSystem("PekkoHttpBenchmarkSystem", config)
mat = ActorMaterializer()

httpFlow =
Flow[HttpRequest].map(_ => response).join(
HttpServerBluePrint(ServerSettings(system), NoLogging, false, Http().dateHeaderRendering).atop(
Expand Down
Loading

0 comments on commit 87f55da

Please sign in to comment.