diff --git a/codepropertygraph/src/main/scala/io/shiftleft/passes/CpgPass.scala b/codepropertygraph/src/main/scala/io/shiftleft/passes/CpgPass.scala index e8699131f..16a5e9e4d 100644 --- a/codepropertygraph/src/main/scala/io/shiftleft/passes/CpgPass.scala +++ b/codepropertygraph/src/main/scala/io/shiftleft/passes/CpgPass.scala @@ -9,8 +9,8 @@ import org.slf4j.{Logger, LoggerFactory, MDC} import java.util.concurrent.{TimeUnit, TimeoutException} import java.util.function.{BiConsumer, Supplier} import scala.annotation.nowarn -import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration.{Duration, DurationLong} +import scala.concurrent.{Await, Future} import scala.util.{Failure, Success, Try} /* CpgPass @@ -162,30 +162,16 @@ abstract class ForkJoinParallelCpgPass[T <: AnyRef](cpg: Cpg, @nowarn outName: S case 1 => runOnPart(externalBuilder, parts(0).asInstanceOf[T]) case _ => - val stream = - if (!isParallel) - java.util.Arrays - .stream(parts) - .sequential() - else - java.util.Arrays - .stream(parts) - .parallel() - val diff = stream.collect( - new Supplier[DiffGraphBuilder] { - override def get(): DiffGraphBuilder = - Cpg.newDiffGraphBuilder - }, - new BiConsumer[DiffGraphBuilder, AnyRef] { - override def accept(builder: DiffGraphBuilder, part: AnyRef): Unit = - runOnPart(builder, part.asInstanceOf[T]) - }, - new BiConsumer[DiffGraphBuilder, DiffGraphBuilder] { - override def accept(leftBuilder: DiffGraphBuilder, rightBuilder: DiffGraphBuilder): Unit = - leftBuilder.absorb(rightBuilder) + implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global + val futures: List[Future[DiffGraphBuilder]] = parts.map { part => + val future = Future { + val diffGraphBuilder = Cpg.newDiffGraphBuilder + runOnPart(diffGraphBuilder, part.asInstanceOf[T]) + diffGraphBuilder } - ) - externalBuilder.absorb(diff) + future + }.toList + Await.result(Future.sequence(futures), Duration.Inf).foreach(externalBuilder.absorb) } nParts } finally { diff --git a/codepropertygraph/src/main/scala/io/shiftleft/utils/StatsCollector.scala b/codepropertygraph/src/main/scala/io/shiftleft/utils/StatsCollector.scala index fb8cf1600..a58d42e8b 100644 --- a/codepropertygraph/src/main/scala/io/shiftleft/utils/StatsCollector.scala +++ b/codepropertygraph/src/main/scala/io/shiftleft/utils/StatsCollector.scala @@ -13,6 +13,10 @@ object StatsLogger extends DataLogger { ): Unit = { logger.foreach(log => log.initiateNewStage(stageName, stageFullName, additionalMetaDataToLog)) } def endLastStage(): Unit = { logger.foreach(log => log.endLastStage()) } + + def justLogMessage(message: String): Unit = { + logger.foreach(log => log.justLogMessage(message)) + } } trait DataLogger { @@ -25,4 +29,6 @@ trait DataLogger { ): Unit def endLastStage(): Unit + + def justLogMessage(message: String): Unit }