Skip to content

Commit

Permalink
Merge pull request eclipse-ditto#10 from bosch-iot-things/feature/fix…
Browse files Browse the repository at this point in the history
…-entity-id

Feature/fix entity
  • Loading branch information
Yannic92 authored and GitHub Enterprise committed Apr 12, 2021
2 parents 3f32158 + a0a9707 commit 8cca671
Show file tree
Hide file tree
Showing 154 changed files with 498 additions and 458 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.entity.type.WithEntityType;
import org.eclipse.ditto.model.base.headers.DittoHeadersSettable;
import org.eclipse.ditto.signals.base.WithId;
import org.eclipse.ditto.model.base.entity.id.WithEntityId;

/**
* Common interface for all Thing related changes.
*
* @since 1.0.0
*/
public interface Change extends WithId, WithEntityType, DittoHeadersSettable<Change>, Acknowledgeable {
public interface Change extends WithEntityId, WithEntityType, DittoHeadersSettable<Change>, Acknowledgeable {

/**
* Returns the {@link ChangeAction} which caused this change.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public String getManifest() {
}

@Override
public ThingId getThingEntityId() {
public ThingId getEntityId() {
return (ThingId) ack.getEntityId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.model.base.entity.id.WithEntityId;
import org.eclipse.ditto.signals.base.WithOptionalEntity;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import org.eclipse.ditto.signals.commands.things.modify.CreateThing;
import org.eclipse.ditto.signals.commands.things.modify.DeleteThing;
Expand Down Expand Up @@ -645,21 +647,18 @@ public void registerForThingChanges(final String registrationId, final Consumer<
* @param protocolCommandAck the expected acknowledgement.
* @param futureToCompleteOrFailAfterAck the future to complete or fail after receiving the expected acknowledgement
* or not.
* @param adaptableToMessage function to convert an adaptable into a message.
* @return the subscription ID.
*/
protected AdaptableBus.SubscriptionId subscribe(
@Nullable final AdaptableBus.SubscriptionId previousSubscriptionId,
final Classification.StreamingType streamingType,
final String protocolCommand,
final String protocolCommandAck,
final CompletableFuture<Void> futureToCompleteOrFailAfterAck,
final Function<Adaptable, Message<?>> adaptableToMessage) {
final CompletableFuture<Void> futureToCompleteOrFailAfterAck) {

return subscribeAndPublishMessage(previousSubscriptionId, streamingType, protocolCommand, protocolCommandAck,
futureToCompleteOrFailAfterAck, adaptable -> bus -> {
final Message<?> message = adaptableToMessage.apply(adaptable);
bus.notify(message.getSubject(), message);
asThingMessage(adaptable).ifPresent(message -> bus.notify(message.getSubject(), message));
});
}

Expand Down Expand Up @@ -745,17 +744,31 @@ protected void unsubscribe(@Nullable final AdaptableBus.SubscriptionId subscript
}
}

protected static Message<?> asThingMessage(final Adaptable adaptable) {
/**
* Build a {@link Message} out of the given {@link Adaptable}.
*
* @param adaptable from which the things {@link Message} shall be build from.
* @return empty if the adaptable doesn't provide a thingId, or the build {@link Message}.
*/
private static Optional<Message<?>> asThingMessage(final Adaptable adaptable) {
final Signal<?> signal = PROTOCOL_ADAPTER.fromAdaptable(adaptable);
final ThingId thingId = ThingId.of(signal.getEntityId());
final MessageHeaders messageHeaders =
MessageHeaders.newBuilder(MessageDirection.FROM, thingId, signal.getType())
.correlationId(signal.getDittoHeaders().getCorrelationId().orElse(null))
.build();
return Message.newBuilder(messageHeaders)
.payload(signal)
.extra(adaptable.getPayload().getExtra().orElse(null))
.build();
final Optional<ThingId> thingIdOptional = WithEntityId.getEntityIdOfType(ThingId.class, signal);
final Message<?> message;
if (thingIdOptional.isPresent()) {
final ThingId thingId = thingIdOptional.get();
final MessageHeaders messageHeaders = MessageHeaders
.newBuilder(MessageDirection.FROM, thingId, signal.getType())
.correlationId(signal.getDittoHeaders().getCorrelationId().orElse(null))
.build();
message = Message.newBuilder(messageHeaders)
.payload(signal)
.extra(adaptable.getPayload().getExtra().orElse(null))
.build();
} else {
LOGGER.warn("Cannot build ThingMessage out of Signal without an ThingId: <{}>", signal);
message = null;
}
return Optional.ofNullable(message);
}

private static void adjoin(final CompletionStage<?> stage, final CompletableFuture<Void> future) {
Expand All @@ -780,7 +793,9 @@ private static void assertThatThingHasId(final Thing thing) {
}

private CompletionStage<List<Thing>> sendRetrieveThingsMessage(final RetrieveThings command) {
return askThingCommand(command, RetrieveThingsResponse.class, RetrieveThingsResponse::getThings);
return sendSignalAndExpectResponse(command, RetrieveThingsResponse.class, RetrieveThingsResponse::getThings,
ErrorResponse.class,
ErrorResponse::getDittoRuntimeException);
}

@Nullable
Expand Down
Loading

0 comments on commit 8cca671

Please sign in to comment.