Skip to content

Commit

Permalink
Add configuration to set max connection pool size for HttpClient
Browse files Browse the repository at this point in the history
  • Loading branch information
waisingyiu committed Apr 18, 2024
1 parent 267035a commit effa81f
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,20 @@ class AndroidSender(val config: FcmWorkerConfiguration, val firebaseAppName: Opt
logger.info(s"Using thread pool size: ${config.threadPoolSize}")
logger.info(s"Topics for individual send: ${config.allowedTopicsForIndividualSend.mkString(",")}")
logger.info(s"Concurrency for individual send: ${config.concurrencyForIndividualSend}")
logger.info(s"Connection pool size for each HttpClient: ${config.connectionPoolSizeForHttpClient}")

override implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
override implicit val timer: Timer[IO] = IO.timer(ec)

val fcmFirebase: Try[FcmFirebase] = FcmFirebase(config.fcmConfig, firebaseAppName)

val fcmClients: Seq[Try[FcmClient]] = Seq.fill(20)(fcmFirebase).map(firebase => firebase.map(FcmClient(_)))
val fcmClients: Seq[Try[FcmClient]] = Seq.fill(20)(fcmFirebase).map(firebase => firebase.map(FcmClient(_, config.connectionPoolSizeForHttpClient)))

val deliveryServiceStream: Stream[IO, Fcm[IO]] =
Stream.emits(fcmClients).covary[IO].flatMap(_.fold(e => Stream.raiseError[IO](e), c => Stream.eval[IO, Fcm[IO]]( IO.delay(new Fcm(c)))))

override val deliveryService: IO[Fcm[IO]] =
fcmFirebase.map(FcmClient(_)).fold(e => IO.raiseError(e), c => IO.delay(new Fcm(c)))
fcmFirebase.map(FcmClient(_, config.connectionPoolSizeForHttpClient)).fold(e => IO.raiseError(e), c => IO.delay(new Fcm(c)))

override val maxConcurrency = config.concurrencyForIndividualSend
override val batchConcurrency = 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ case class FcmWorkerConfiguration(
fcmConfig: FcmConfig,
threadPoolSize: Int,
allowedTopicsForIndividualSend: List[String],
concurrencyForIndividualSend: Int
concurrencyForIndividualSend: Int,
connectionPoolSizeForHttpClient: Int
) extends WorkerConfiguration {
def isIndividualSend(topics: List[String]): Boolean =
topics.forall(topic => allowedTopicsForIndividualSend.exists(topic.startsWith(_)))
Expand Down Expand Up @@ -137,7 +138,8 @@ object Configuration {
),
config.getInt("fcm.threadPoolSize"),
getStringList("fcm.allowedTopicsForIndividualSend"),
getOptionalInt("fcm.concurrencyForIndividualSend", 100)
getOptionalInt("fcm.concurrencyForIndividualSend", 100),
getOptionalInt("fcm.connectionPoolSizeForHttpClient", 0)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,10 @@ object FcmFirebase {
}

object FcmClient {
def apply(firebase: FcmFirebase): FcmClient =
new FcmClient(firebase.firebaseMessaging, firebase.firebaseApp, firebase.config, firebase.projectId, firebase.credential, firebase.jsonFactory)
def apply(firebase: FcmFirebase, connectionPoolSize: Int): FcmClient = {
System.setProperty("jdk.httpclient.connectionPoolSize", s"${connectionPoolSize}")
new FcmClient(firebase.firebaseMessaging, firebase.firebaseApp, firebase.config, firebase.projectId, firebase.credential, firebase.jsonFactory)
}
}

object FirebaseHelpers {
Expand Down

0 comments on commit effa81f

Please sign in to comment.