|
30 | 30 | import org.eclipse.hono.application.client.TimeUntilDisconnectNotification;
|
31 | 31 | import org.eclipse.hono.application.client.amqp.AmqpApplicationClient;
|
32 | 32 | import org.eclipse.hono.application.client.amqp.ProtonBasedApplicationClient;
|
| 33 | +import org.eclipse.hono.application.client.kafka.KafkaApplicationClient; |
33 | 34 | import org.eclipse.hono.application.client.kafka.impl.KafkaApplicationClientImpl;
|
34 | 35 | import org.eclipse.hono.client.ServiceInvocationException;
|
35 | 36 | import org.eclipse.hono.client.amqp.config.ClientConfigProperties;
|
|
46 | 47 | import io.vertx.core.CompositeFuture;
|
47 | 48 | import io.vertx.core.Future;
|
48 | 49 | import io.vertx.core.Handler;
|
| 50 | +import io.vertx.core.Promise; |
49 | 51 | import io.vertx.core.Vertx;
|
50 | 52 | import io.vertx.core.buffer.Buffer;
|
51 | 53 | import io.vertx.core.json.JsonObject;
|
@@ -173,10 +175,17 @@ protected void consumeData() {
|
173 | 175 | ac.addReconnectListener(c -> LOG.info("reconnected to Hono"));
|
174 | 176 | }
|
175 | 177 |
|
| 178 | + final Promise<Void> readyTracker = Promise.promise(); |
| 179 | + if (client instanceof KafkaApplicationClient kafkaApplicationClient) { |
| 180 | + kafkaApplicationClient.addOnClientReadyHandler(readyTracker); |
| 181 | + } else { |
| 182 | + readyTracker.complete(); |
| 183 | + } |
176 | 184 | client.start()
|
177 |
| - .compose(v -> CompositeFuture.all(createEventConsumer(), createTelemetryConsumer())) |
178 |
| - .onSuccess(ok -> startup.complete(client)) |
179 |
| - .onFailure(startup::completeExceptionally); |
| 185 | + .compose(ok -> readyTracker.future()) |
| 186 | + .compose(v -> CompositeFuture.all(createEventConsumer(), createTelemetryConsumer())) |
| 187 | + .onSuccess(ok -> startup.complete(client)) |
| 188 | + .onFailure(startup::completeExceptionally); |
180 | 189 |
|
181 | 190 | try {
|
182 | 191 | startup.join();
|
|
0 commit comments