From 9ec478b16ffb6124a55ef7340470b997616f8409 Mon Sep 17 00:00:00 2001 From: Wai Sing Yiu Date: Thu, 18 Apr 2024 12:01:07 +0100 Subject: [PATCH 1/5] Set connection pool size for HttpClient --- notificationworkerlambda/cdk/lib/senderworker.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/notificationworkerlambda/cdk/lib/senderworker.ts b/notificationworkerlambda/cdk/lib/senderworker.ts index 56b1fb7f0..dd9827781 100644 --- a/notificationworkerlambda/cdk/lib/senderworker.ts +++ b/notificationworkerlambda/cdk/lib/senderworker.ts @@ -107,7 +107,8 @@ class SenderWorker extends cdkcore.Construct { Stage: scope.stage, Stack: scope.stack, App: id, - Platform: props.platform + Platform: props.platform, + JAVA_TOOL_OPTIONS: "-Djdk.httpclient.connectionPoolSize=2" }, memorySize: 10240, description: `sends notifications for ${id}`, From 77384ce6e24f3e6bf0edfecf3cae9aae8cd591fb Mon Sep 17 00:00:00 2001 From: Wai Sing Yiu Date: Thu, 18 Apr 2024 12:13:27 +0100 Subject: [PATCH 2/5] Pre-create a list of HttpClient instances --- .../notifications/worker/AndroidSender.scala | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/AndroidSender.scala b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/AndroidSender.scala index 555387f3f..54d998948 100644 --- a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/AndroidSender.scala +++ b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/AndroidSender.scala @@ -8,7 +8,7 @@ import com.gu.notifications.worker.delivery.DeliveryException.InvalidToken import com.gu.notifications.worker.delivery.{BatchDeliverySuccess, DeliveryClient, DeliveryException} import com.gu.notifications.worker.delivery.fcm.{Fcm, FcmClient} import com.gu.notifications.worker.models.{LatencyMetrics, SendingResults} -import com.gu.notifications.worker.tokens.{BatchNotification, ChunkedTokens} +import com.gu.notifications.worker.tokens.{BatchNotification, ChunkedTokens, IndividualNotification} import com.gu.notifications.worker.utils.{Cloudwatch, CloudwatchImpl, Reporting} import fs2.{Pipe, Stream} @@ -40,18 +40,24 @@ class AndroidSender(val config: FcmWorkerConfiguration, val firebaseAppName: Opt override implicit val timer: Timer[IO] = IO.timer(ec) val fcmFirebase: Try[FcmFirebase] = FcmFirebase(config.fcmConfig, firebaseAppName) + + val fcmClients: Seq[Try[FcmClient]] = Seq.fill(20)(fcmFirebase).map(firebase => firebase.map(FcmClient(_))) + + val deliveryServiceStream: Stream[IO, Fcm[IO]] = + Stream.emits(fcmClients).covary[IO].flatMap(_.fold(e => Stream.raiseError[IO](e), c => Stream.eval[IO, Fcm[IO]]( IO.delay(new Fcm(c))))) + override val deliveryService: IO[Fcm[IO]] = - fcmFirebase.fold(e => IO.raiseError(e), c => IO.delay(new Fcm(FcmClient(c)))) + fcmFirebase.map(FcmClient(_)).fold(e => IO.raiseError(e), c => IO.delay(new Fcm(c))) override val maxConcurrency = config.concurrencyForIndividualSend override val batchConcurrency = 100 //override the deliverChunkedTokens method to validate the success of sending batch notifications to the FCM client. This implementation could be refactored in the future to make it more streamlined with APNs override def deliverChunkedTokens(chunkedTokenStream: Stream[IO, (ChunkedTokens, Long, Instant, Int)]): Stream[IO, Unit] = { - chunkedTokenStream.map { - case (chunkedTokens, sentTime, functionStartTime, sqsMessageBatchSize) => + chunkedTokenStream.parZip(deliveryServiceStream.repeat).map { + case ((chunkedTokens, sentTime, functionStartTime, sqsMessageBatchSize), deliveryServiceForInd) => if (config.isIndividualSend(chunkedTokens.notification.topic.map(_.toString()))) - deliverIndividualNotificationStream(Stream.emits(chunkedTokens.toNotificationToSends).covary[IO]) + deliverIndividualNotificationStream(Stream.emits(chunkedTokens.toNotificationToSends).covary[IO], deliveryServiceForInd) .broadcastTo( reportSuccesses(chunkedTokens, sentTime, functionStartTime, sqsMessageBatchSize), cleanupFailures, @@ -68,6 +74,12 @@ class AndroidSender(val config: FcmWorkerConfiguration, val firebaseAppName: Opt }.parJoin(batchConcurrency) } + def deliverIndividualNotificationStream(individualNotificationStream: Stream[IO, IndividualNotification], deliveryService: Fcm[IO]): Stream[IO, Either[DeliveryException, FcmClient#Success]] = for { + resp <- individualNotificationStream.map(individualNotification => deliveryService.send(individualNotification.notification, individualNotification.token)) + .parJoin(maxConcurrency) + .evalTap(Reporting.log(s"Sending failure: ")) + } yield resp + def deliverBatchNotificationStream[C <: FcmClient](batchNotificationStream: Stream[IO, BatchNotification]): Stream[IO, Either[DeliveryException, C#BatchSuccess]] = for { deliveryService <- Stream.eval(deliveryService) resp <- batchNotificationStream.map(batchNotification => deliveryService.sendBatch(batchNotification.notification, batchNotification.token)) From 267035adb76610fd5c97cd818137c17780f30e54 Mon Sep 17 00:00:00 2001 From: Wai Sing Yiu Date: Thu, 18 Apr 2024 12:14:04 +0100 Subject: [PATCH 3/5] Add log data for validation --- .../com/gu/notifications/worker/delivery/fcm/FcmClient.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/delivery/fcm/FcmClient.scala b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/delivery/fcm/FcmClient.scala index 3e94817b5..94121ddea 100644 --- a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/delivery/fcm/FcmClient.scala +++ b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/delivery/fcm/FcmClient.scala @@ -76,7 +76,7 @@ class FcmClient (firebaseMessaging: FirebaseMessaging, firebaseApp: FirebaseApp, logger.info(Map( "worker.individualRequestLatency" -> Duration.between(start, requestCompletionTime).toMillis, "notificationId" -> notificationId, - ), "Individual send request completed") + ), s"Individual send request completed - ${this.toString()}") onAPICallComplete(parseSendResponse(notificationId, token, response, requestCompletionTime)) } } From 234cebd6282da6a831d6af2d394322a86f5331ca Mon Sep 17 00:00:00 2001 From: Wai Sing Yiu Date: Thu, 18 Apr 2024 18:14:05 +0100 Subject: [PATCH 4/5] Revert changes about connection pool --- notificationworkerlambda/cdk/lib/senderworker.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/notificationworkerlambda/cdk/lib/senderworker.ts b/notificationworkerlambda/cdk/lib/senderworker.ts index dd9827781..2343d4c0c 100644 --- a/notificationworkerlambda/cdk/lib/senderworker.ts +++ b/notificationworkerlambda/cdk/lib/senderworker.ts @@ -108,7 +108,6 @@ class SenderWorker extends cdkcore.Construct { Stack: scope.stack, App: id, Platform: props.platform, - JAVA_TOOL_OPTIONS: "-Djdk.httpclient.connectionPoolSize=2" }, memorySize: 10240, description: `sends notifications for ${id}`, From d4be7e93ab3148ff898bb3586fc3d61168d72546 Mon Sep 17 00:00:00 2001 From: Wai Sing Yiu Date: Fri, 19 Apr 2024 11:22:52 +0100 Subject: [PATCH 5/5] Add to log which API is being used --- .../main/scala/com/gu/notifications/worker/AndroidSender.scala | 2 +- .../com/gu/notifications/worker/SenderRequestHandler.scala | 2 +- .../main/scala/com/gu/notifications/worker/utils/Logging.scala | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/AndroidSender.scala b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/AndroidSender.scala index 54d998948..29271df22 100644 --- a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/AndroidSender.scala +++ b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/AndroidSender.scala @@ -95,7 +95,7 @@ class AndroidSender(val config: FcmWorkerConfiguration, val firebaseAppName: Opt } input .fold(SendingResults.empty) { case (acc, resp) => SendingResults.aggregateBatch(acc, chunkedTokens.tokens.size, resp) } - .evalTap(logInfoWithFields(logFields(env, chunkedTokens.notification, chunkedTokens.tokens.size, sentTime, functionStartTime, Configuration.platform, sqsMessageBatchSize), prefix = s"Results $notificationLog: ").andThen(_.map(cloudwatch.sendPerformanceMetrics(env.stage, enableAwsMetric)))) + .evalTap(logInfoWithFields(logFields(env, chunkedTokens.notification, chunkedTokens.tokens.size, sentTime, functionStartTime, Configuration.platform, sqsMessageBatchSize, messagingApi = Some("Batch")), prefix = s"Results $notificationLog: ").andThen(_.map(cloudwatch.sendPerformanceMetrics(env.stage, enableAwsMetric)))) .through(cloudwatch.sendResults(env.stage, Configuration.platform)) } diff --git a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/SenderRequestHandler.scala b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/SenderRequestHandler.scala index 88459262d..57b695c4f 100644 --- a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/SenderRequestHandler.scala +++ b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/SenderRequestHandler.scala @@ -43,7 +43,7 @@ trait SenderRequestHandler[C <: DeliveryClient] extends Logging { } input.fold(SendingResults.empty) { case (acc, resp) => SendingResults.aggregate(acc, resp) } - .evalTap(logInfoWithFields(logFields(env, chunkedTokens.notification, chunkedTokens.tokens.size, sentTime, functionStartTime, Configuration.platform, sqsMessageBatchSize = sqsMessageBatchSize), prefix = s"Results $notificationLog: ").andThen(_.map(cloudwatch.sendPerformanceMetrics(env.stage, enableAwsMetric)))) + .evalTap(logInfoWithFields(logFields(env, chunkedTokens.notification, chunkedTokens.tokens.size, sentTime, functionStartTime, Configuration.platform, sqsMessageBatchSize = sqsMessageBatchSize, messagingApi = Some("Individual")), prefix = s"Results $notificationLog: ").andThen(_.map(cloudwatch.sendPerformanceMetrics(env.stage, enableAwsMetric)))) .through(cloudwatch.sendResults(env.stage, Configuration.platform)) } diff --git a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/utils/Logging.scala b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/utils/Logging.scala index 785e7916b..0071ff45e 100644 --- a/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/utils/Logging.scala +++ b/notificationworkerlambda/src/main/scala/com/gu/notifications/worker/utils/Logging.scala @@ -51,6 +51,7 @@ trait Logging { functionStartTime: Instant, maybePlatform: Option[Platform], sqsMessageBatchSize: Int, + messagingApi: Option[String] = None )(end: Instant): Map[String, Any] = { val processingTime = Duration.between(functionStartTime, end).toMillis val processingRate = numberOfTokens.toDouble / processingTime * 1000 @@ -67,7 +68,7 @@ trait Logging { "worker.notificationProcessingEndTime.millis" -> end.toEpochMilli, "sqsMessageBatchSize" -> sqsMessageBatchSize, "worker.chunkTokenSize" -> numberOfTokens, - ) + ) ++ messagingApi.map(s => Map("worker.messagingApi" -> s)).getOrElse(Map()) } def logStartAndCount(acc: Int, chunkedTokens: ChunkedTokens): Int = {