Skip to content
This repository has been archived by the owner on May 14, 2022. It is now read-only.

Commit

Permalink
Improve visibility in the task publisher event stream processing (#1203)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbak authored Jan 12, 2022
1 parent 1a7218a commit 9ad5af1
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,44 @@
import com.netflix.titus.grpc.protogen.Task;
import com.netflix.titus.runtime.endpoint.v3.grpc.GrpcJobManagementModelConverters;
import com.netflix.titus.supplementary.taskspublisher.es.ElasticSearchUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class TaskEventsGenerator {

private static final int THREAD_POOL_LIMIT = 500;

private final Logger logger = LoggerFactory.getLogger(TaskEventsGenerator.class);

private final Map<String, String> taskDocumentBaseContext;
private TitusClient titusClient;
private final TitusClient titusClient;
private final Scheduler scheduler;
private ConnectableFlux<TaskDocument> taskEvents;

public TaskEventsGenerator(TitusClient titusClient,
Map<String, String> taskDocumentBaseContext) {
this.titusClient = titusClient;
this.taskDocumentBaseContext = taskDocumentBaseContext;
this.scheduler = Schedulers.newBoundedElastic(THREAD_POOL_LIMIT, Integer.MAX_VALUE, "taskEventsGenerator", 60, true);
buildEventStream();
}

public void shutdown() {
scheduler.dispose();
}

public ConnectableFlux<TaskDocument> getTaskEvents() {
return taskEvents;
}

private void buildEventStream() {
taskEvents = titusClient.getJobAndTaskUpdates()
.publishOn(Schedulers.elastic())
.publishOn(scheduler)
.flatMap(jobOrTaskUpdate -> jobOrTaskUpdate.hasTask() ? Flux.just(jobOrTaskUpdate.getTask()) : Flux.empty())
.map(task -> {
final Mono<Job> jobById = titusClient.getJobById(task.getJobId());
Expand All @@ -63,6 +76,7 @@ private void buildEventStream() {
return TaskDocument.fromV3Task(coreTask, coreJob, ElasticSearchUtils.DATE_FORMAT, buildTaskContext(task));
}).flux();
})
.doOnError(error -> logger.error("TitusClient event stream error", error))
.retryWhen(TaskPublisherRetryUtil.buildRetryHandler(TaskPublisherRetryUtil.INITIAL_RETRY_DELAY_MS,
TaskPublisherRetryUtil.MAX_RETRY_DELAY_MS, -1))
.publish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,28 @@ public void checkPublisherState() {
mockTitusClient(numTasks),
Collections.emptyMap());

EsPublisher esPublisher = new EsPublisher(taskEventsGenerator, mockElasticSearchClient(),
mockEsPublisherConfiguration(), new DefaultRegistry());
esPublisher.activate();

final CountDownLatch latch = new CountDownLatch(1);
Flux.interval(Duration.ofSeconds(1), Schedulers.elastic())
.take(1)
.doOnNext(i -> {
final int numTasksUpdated = esPublisher.getNumTasksPublished();
final int numErrors = esPublisher.getNumErrorsInPublishing();
assertThat(numErrors).isEqualTo(0);
assertThat(numTasksUpdated).isGreaterThanOrEqualTo(numTasks);
latch.countDown();
}).subscribe();
try {
latch.await(2, TimeUnit.MINUTES);
} catch (InterruptedException e) {
fail("Timeout in checkPublisherState ", e);
EsPublisher esPublisher = new EsPublisher(taskEventsGenerator, mockElasticSearchClient(),
mockEsPublisherConfiguration(), new DefaultRegistry());
esPublisher.activate();

final CountDownLatch latch = new CountDownLatch(1);
Flux.interval(Duration.ofSeconds(1), Schedulers.elastic())
.take(1)
.doOnNext(i -> {
final int numTasksUpdated = esPublisher.getNumTasksPublished();
final int numErrors = esPublisher.getNumErrorsInPublishing();
assertThat(numErrors).isEqualTo(0);
assertThat(numTasksUpdated).isGreaterThanOrEqualTo(numTasks);
latch.countDown();
}).subscribe();
try {
latch.await(2, TimeUnit.MINUTES);
} catch (InterruptedException e) {
fail("Timeout in checkPublisherState ", e);
}
} finally {
taskEventsGenerator.shutdown();
}
}
}

0 comments on commit 9ad5af1

Please sign in to comment.