Skip to content

Commit

Permalink
Fix test case
Browse files Browse the repository at this point in the history
  • Loading branch information
waisingyiu committed Apr 11, 2024
1 parent 4f5ebdf commit 7fa4264
Show file tree
Hide file tree
Showing 9 changed files with 315,425 additions and 11 deletions.
143,016 changes: 143,016 additions & 0 deletions depend.txt

Large diffs are not rendered by default.

172,374 changes: 172,374 additions & 0 deletions depend2.txt

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions docs/testing/17-event-bridge.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@




For android-beta, android, ios,
46 - unknown application error

Total start - 1625 + 46
Total end- 1625 + 46

Received message:
Android - 795+12 = 807
ios - 648+32 = 680
AndroidBeta - 134+2 = 136
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,16 @@ class AndroidSender(val config: FcmWorkerConfiguration, val firebaseAppName: Opt

logger.info(s"Using thread pool size: ${config.threadPoolSize}")
logger.info(s"Topics for individual send: ${config.allowedTopicsForIndividualSend.mkString(",")}")
logger.info(s"Concurrency for individual send: ${config.concurrencyForIndividualSend}")

override implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
override implicit val timer: Timer[IO] = IO.timer(ec)

override val deliveryService: IO[Fcm[IO]] =
FcmClient(config.fcmConfig, firebaseAppName).fold(e => IO.raiseError(e), c => IO.delay(new Fcm(c)))
override val maxConcurrency = 100

def isIndividualSend(topics: List[Topic], selectedTopics: List[String]): Boolean =
topics.forall(topic => selectedTopics.exists(topic.toString.startsWith(_)))

override val maxConcurrency = config.concurrencyForIndividualSend
override val batchConcurrency = 100

//override the deliverChunkedTokens method to validate the success of sending batch notifications to the FCM client. This implementation could be refactored in the future to make it more streamlined with APNs
override def deliverChunkedTokens(chunkedTokenStream: Stream[IO, (ChunkedTokens, Long, Instant, Int)]): Stream[IO, Unit] = {
chunkedTokenStream.map {
Expand All @@ -62,13 +61,13 @@ class AndroidSender(val config: FcmWorkerConfiguration, val firebaseAppName: Opt
cleanupBatchFailures(chunkedTokens.notification.id),
trackBatchProgress(chunkedTokens.notification.id))
}
}.parJoin(maxConcurrency)
}.parJoin(batchConcurrency)
}

def deliverBatchNotificationStream[C <: FcmClient](batchNotificationStream: Stream[IO, BatchNotification]): Stream[IO, Either[DeliveryException, C#BatchSuccess]] = for {
deliveryService <- Stream.eval(deliveryService)
resp <- batchNotificationStream.map(batchNotification => deliveryService.sendBatch(batchNotification.notification, batchNotification.token))
.parJoin(maxConcurrency)
.parJoin(batchConcurrency)
.evalTap(Reporting.logBatch(s"Sending failure: "))
} yield resp

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ case class FcmWorkerConfiguration(
fcmConfig: FcmConfig,
threadPoolSize: Int,
allowedTopicsForIndividualSend: List[String],
concurrencyForIndividualSend: Int
) extends WorkerConfiguration {
def isIndividualSend(topics: List[String]): Boolean =
topics.forall(topic => allowedTopicsForIndividualSend.exists(topic.startsWith(_)))
Expand Down Expand Up @@ -120,6 +121,12 @@ object Configuration {
config.getString(path).split(",").toList
else
List()

def getOptionalInt(path: String, defVal: Int): Int =
if (config.hasPath(path))
config.getInt(path)
else
defVal

FcmWorkerConfiguration(
config.getString("cleaningSqsUrl"),
Expand All @@ -129,7 +136,8 @@ object Configuration {
dryRun = config.getBoolean("dryrun")
),
config.getInt("fcm.threadPoolSize"),
getStringList("fcm.allowedTopicsForIndividualSend")
getStringList("fcm.allowedTopicsForIndividualSend"),
getOptionalInt("fcm.concurrencyForIndividualSend", 100)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ class IOSSender(val config: ApnsWorkerConfiguration, val metricNs: String) exten
override val deliveryService: IO[Apns[IO]] =
ApnsClient(config.apnsConfig).fold(e => IO.raiseError(e), c => IO.delay(new Apns(c)))
override val maxConcurrency = config.apnsConfig.maxConcurrency

override val batchConcurrency = config.apnsConfig.maxConcurrency
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ trait SenderRequestHandler[C <: DeliveryClient] extends Logging {
val cleaningClient: CleaningClient
val cloudwatch: Cloudwatch
val maxConcurrency: Int
val batchConcurrency: Int

def env = Env()

Expand Down Expand Up @@ -88,7 +89,7 @@ trait SenderRequestHandler[C <: DeliveryClient] extends Logging {
reportLatency(chunkedTokens, chunkedTokens.metadata),
cleanupFailures,
trackProgress(chunkedTokens.notification.id))
}.parJoin(maxConcurrency)
}.parJoin(batchConcurrency)
}

def handleChunkTokens(event: SQSEvent, context: Context): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class FcmWorkerConfigurationSpec extends Specification with Matchers {
cleaningSqsUrl = "cleaning-sqs-url",
fcmConfig = FcmConfig(serviceAccountKey = "key", debug = false, dryRun = false),
threadPoolSize = 50,
allowedTopicsForIndividualSend = selectedTopics)
allowedTopicsForIndividualSend = selectedTopics,
concurrencyForIndividualSend = 200)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class SenderRequestHandlerSpec extends Specification with Matchers {
val workerRequestHandler = new SenderRequestHandler[ApnsClient] {

override val maxConcurrency = 100
override val batchConcurrency = 100

override def deliveryService: IO[DeliveryService[IO, ApnsClient]] = IO.pure(new DeliveryService[IO, ApnsClient] {
override def send(notification: Notification, token: String): Stream[IO, Either[DeliveryException, ApnsDeliverySuccess]] = {
Expand Down

0 comments on commit 7fa4264

Please sign in to comment.