@@ -31,7 +31,10 @@ public TopicTerminator(ApplicationProperties props, KafkaAdmin kafkaAdmin) {
31
31
32
32
@ Scheduled (fixedRateString = "${app.fixed-rate-string}" )
33
33
public void terminateUnusedTopics () throws ExecutionException , InterruptedException {
34
- log .info ("Terminating unused topics{}" , props .isDryRun () ? " in dry-run mode" : "" );
34
+ log .atInfo ()
35
+ .setMessage ("Terminating unused topics" )
36
+ .addKeyValue ("dry-run" , props .isDryRun ())
37
+ .log ();
35
38
try (AdminClient client = AdminClient .create (kafkaAdmin .getConfigurationProperties ())) {
36
39
final Set <String > allTopics = client .listTopics ().names ().get ();
37
40
@@ -47,11 +50,20 @@ public void terminateUnusedTopics() throws ExecutionException, InterruptedExcept
47
50
unusedTopics .removeAll (reservedTopic .getNames (client ));
48
51
}
49
52
50
- log .info ("{} topic(s) to be deleted: " , unusedTopics .size ());
53
+ log .atInfo ()
54
+ .setMessage ("Start deleting unused topics" )
55
+ .addKeyValue ("count" , unusedTopics .size ())
56
+ .log ();
51
57
if (props .isDryRun ()) {
52
- unusedTopics .forEach (t -> log .info ("Topic {} is considered unused and would be deleted in non dry-run mode" , t ));
58
+ unusedTopics .forEach (t -> log .atInfo ()
59
+ .setMessage ("NOT deleting unused topic in dry-run mode" )
60
+ .addKeyValue ("topic" , t )
61
+ .log ());
53
62
} else {
54
- unusedTopics .forEach (t -> log .info ("Delete unused topic: {}" , t ));
63
+ unusedTopics .forEach (t -> log .atInfo ()
64
+ .setMessage ("Deleting unused topic" )
65
+ .addKeyValue ("topic" , t )
66
+ .log ());
55
67
client .deleteTopics (unusedTopics );
56
68
}
57
69
}
0 commit comments