From effa81f56046e2356881e3bfeac75c983aef5ac2 Mon Sep 17 00:00:00 2001 From: Wai Sing Yiu Date: Thu, 18 Apr 2024 13:01:26 +0100 Subject: [PATCH] Add configuration to set max connection pool size for HttpClient --- .../scala/com/gu/notifications/worker/AndroidSender.scala | 5 +++-- .../scala/com/gu/notifications/worker/Configuration.scala | 6 ++++-- .../gu/notifications/worker/delivery/fcm/FcmClient.scala | 6 ++++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/AndroidSender.scala b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/AndroidSender.scala index 54d998948..7fc1a5f31 100644 --- a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/AndroidSender.scala +++ b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/AndroidSender.scala @@ -35,19 +35,20 @@ class AndroidSender(val config: FcmWorkerConfiguration, val firebaseAppName: Opt logger.info(s"Using thread pool size: ${config.threadPoolSize}") logger.info(s"Topics for individual send: ${config.allowedTopicsForIndividualSend.mkString(",")}") logger.info(s"Concurrency for individual send: ${config.concurrencyForIndividualSend}") + logger.info(s"Connection pool size for each HttpClient: ${config.connectionPoolSizeForHttpClient}") override implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec) override implicit val timer: Timer[IO] = IO.timer(ec) val fcmFirebase: Try[FcmFirebase] = FcmFirebase(config.fcmConfig, firebaseAppName) - val fcmClients: Seq[Try[FcmClient]] = Seq.fill(20)(fcmFirebase).map(firebase => firebase.map(FcmClient(_))) + val fcmClients: Seq[Try[FcmClient]] = Seq.fill(20)(fcmFirebase).map(firebase => firebase.map(FcmClient(_, config.connectionPoolSizeForHttpClient))) val deliveryServiceStream: Stream[IO, Fcm[IO]] = Stream.emits(fcmClients).covary[IO].flatMap(_.fold(e => Stream.raiseError[IO](e), c => Stream.eval[IO, Fcm[IO]]( IO.delay(new Fcm(c))))) override val deliveryService: IO[Fcm[IO]] = - fcmFirebase.map(FcmClient(_)).fold(e => IO.raiseError(e), c => IO.delay(new Fcm(c))) + fcmFirebase.map(FcmClient(_, config.connectionPoolSizeForHttpClient)).fold(e => IO.raiseError(e), c => IO.delay(new Fcm(c))) override val maxConcurrency = config.concurrencyForIndividualSend override val batchConcurrency = 100 diff --git a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/Configuration.scala b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/Configuration.scala index 8c8bc8385..6d61d8129 100644 --- a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/Configuration.scala +++ b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/Configuration.scala @@ -36,7 +36,8 @@ case class FcmWorkerConfiguration( fcmConfig: FcmConfig, threadPoolSize: Int, allowedTopicsForIndividualSend: List[String], - concurrencyForIndividualSend: Int + concurrencyForIndividualSend: Int, + connectionPoolSizeForHttpClient: Int ) extends WorkerConfiguration { def isIndividualSend(topics: List[String]): Boolean = topics.forall(topic => allowedTopicsForIndividualSend.exists(topic.startsWith(_))) @@ -137,7 +138,8 @@ object Configuration { ), config.getInt("fcm.threadPoolSize"), getStringList("fcm.allowedTopicsForIndividualSend"), - getOptionalInt("fcm.concurrencyForIndividualSend", 100) + getOptionalInt("fcm.concurrencyForIndividualSend", 100), + getOptionalInt("fcm.connectionPoolSizeForHttpClient", 0) ) } diff --git a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/delivery/fcm/FcmClient.scala b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/delivery/fcm/FcmClient.scala index 94121ddea..a00fc953c 100644 --- a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/delivery/fcm/FcmClient.scala +++ b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/delivery/fcm/FcmClient.scala @@ -198,8 +198,10 @@ object FcmFirebase { } object FcmClient { - def apply(firebase: FcmFirebase): FcmClient = - new FcmClient(firebase.firebaseMessaging, firebase.firebaseApp, firebase.config, firebase.projectId, firebase.credential, firebase.jsonFactory) + def apply(firebase: FcmFirebase, connectionPoolSize: Int): FcmClient = { + System.setProperty("jdk.httpclient.connectionPoolSize", s"${connectionPoolSize}") + new FcmClient(firebase.firebaseMessaging, firebase.firebaseApp, firebase.config, firebase.projectId, firebase.credential, firebase.jsonFactory) + } } object FirebaseHelpers {