Skip to content

Commit

Permalink
Update runner threads
Browse files Browse the repository at this point in the history
  • Loading branch information
iamyulong committed Dec 6, 2024
1 parent 1dfce57 commit 5cdde70
Showing 1 changed file with 4 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@

package com.radixdlt.environment.rx;

import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;

import com.google.common.collect.ImmutableList;
import com.radixdlt.consensus.event.CoreEvent;
import com.radixdlt.consensus.event.LocalEvent;
Expand All @@ -75,7 +73,6 @@
import com.radixdlt.environment.RemoteEventProcessor;
import com.radixdlt.environment.StartProcessor;
import com.radixdlt.modules.ModuleRunner;
import com.radixdlt.utils.ThreadFactories;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
Expand All @@ -86,6 +83,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -193,17 +191,14 @@ public void start(Consumer<Throwable> errorHandler) {
return;
}

this.executorService =
newSingleThreadScheduledExecutor(ThreadFactories.daemonThreads(threadName));
var singleThreadScheduler = Schedulers.from(this.executorService);
this.executorService = Executors.newScheduledThreadPool(4);
var scheduler = Schedulers.from(this.executorService);

logger.info("Starting Runner: {}", this.threadName);

this.executorService.submit(() -> startProcessors.forEach(StartProcessor::start));
final var disposables =
this.subscriptions.stream()
.map(s -> s.subscribe(singleThreadScheduler, errorHandler))
.toList();
this.subscriptions.stream().map(s -> s.subscribe(scheduler, errorHandler)).toList();
this.compositeDisposable = new CompositeDisposable(disposables);

this.onStart.forEach(f -> f.accept(this.executorService));
Expand Down

0 comments on commit 5cdde70

Please sign in to comment.