Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/distributed create entity #1321

Merged
merged 33 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0dee194
wip: removeattribute after csr call
thomasBousselin Jan 16, 2025
f8d1dae
wip: getFilteredAndRemoved on ExpandedEntity
thomasBousselin Jan 16, 2025
a3928a2
feat: functionning call for distributeCreateEntity
thomasBousselin Jan 17, 2025
099433c
feat: createEntity with error gestion
thomasBousselin Jan 20, 2025
37a7af9
fix: minor formating for pr
thomasBousselin Jan 20, 2025
74bf13c
fix: build
thomasBousselin Jan 20, 2025
fcfd294
fix: existing tests
thomasBousselin Jan 20, 2025
3a10c7e
refactor: BatchEntityError.error as ProblemDetail
thomasBousselin Jan 21, 2025
7da65b8
feat: first easy fixes
thomasBousselin Jan 21, 2025
bc78dbd
feat: case when the entity is entirely merged + test for create entit…
thomasBousselin Jan 22, 2025
245ed78
fix: verify entity before distribution + accept non Problem Detail error
thomasBousselin Jan 22, 2025
4733f84
fix: case receive 207
thomasBousselin Jan 22, 2025
04a5456
refactor: rename InternalCsrFilter to RegistrationInfoFilter
thomasBousselin Jan 23, 2025
060fae4
refactor: put entity filter from the csr info in CSR class
thomasBousselin Jan 23, 2025
4f93e3d
fix: test on getAssociatedAttributes
thomasBousselin Jan 23, 2025
0485871
feat: test for postDistributedInformation
thomasBousselin Jan 23, 2025
9287baf
feat: test for distributeCreateEntityForContextSources
thomasBousselin Jan 23, 2025
fff3c2f
feat: test for distributeCreateEntity
thomasBousselin Jan 24, 2025
76abdf2
fix: only call with content-type json-ld (as @context always is on th…
thomasBousselin Jan 27, 2025
f80b3f8
wip: add success to multistatus result
thomasBousselin Feb 3, 2025
1d25690
fix: tests
thomasBousselin Feb 3, 2025
7a6f3f2
Apply suggestions from code review
thomasBousselin Feb 5, 2025
1e60319
fix: pr comments
thomasBousselin Feb 5, 2025
c416194
fix: pr comments
thomasBousselin Feb 5, 2025
5be9d25
fix: test
thomasBousselin Feb 5, 2025
0a68a6c
fix: test and no conflict for inclusive
thomasBousselin Feb 5, 2025
f817a79
feat: PR fixes
thomasBousselin Feb 7, 2025
b64f020
fix: PR comments
thomasBousselin Feb 7, 2025
d0b9ac9
Apply suggestions from code review
thomasBousselin Feb 7, 2025
439aecc
fix: PR comments
thomasBousselin Feb 7, 2025
0473491
Update search-service/src/test/kotlin/com/egm/stellio/search/csr/serv…
thomasBousselin Feb 7, 2025
df06705
fix: detekt after suggestion
thomasBousselin Feb 7, 2025
ee26f8d
Feature/local parameter (#1323)
thomasBousselin Feb 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
<ID>LongParameterList:EntityAttributeService.kt$EntityAttributeService$( entityUri: URI, ngsiLdAttributes: List&lt;NgsiLdAttribute&gt;, expandedAttributes: ExpandedAttributes, disallowOverwrite: Boolean, createdAt: ZonedDateTime, sub: Sub? )</ID>
<ID>LongParameterList:TemporalEntityHandler.kt$TemporalEntityHandler$( @RequestHeader httpHeaders: HttpHeaders, @PathVariable entityId: URI, @PathVariable attrId: String, @PathVariable instanceId: URI, @RequestBody requestBody: Mono&lt;String&gt;, @AllowedParameters(notImplemented = [QP.LOCAL, QP.VIA]) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; )</ID>
<ID>LongParameterList:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$( entityId: URI, attributeName: ExpandedTerm, datasetId: URI?, attributePayload: ExpandedAttributeInstance, ngsiLdAttributeInstance: NgsiLdAttributeInstance, defaultCreatedAt: ZonedDateTime )</ID>
<ID>MaxLineLength:DistributedEntityConsumptionServiceTests.kt$DistributedEntityConsumptionServiceTests$fun</ID>
<ID>MaximumLineLength:DistributedEntityConsumptionServiceTests.kt$DistributedEntityConsumptionServiceTests$</ID>
<ID>MaxLineLength:DistributedEntityProvisionServiceTests.kt$DistributedEntityProvisionServiceTests$fun</ID>
<ID>MaximumLineLength:DistributedEntityProvisionServiceTests.kt$DistributedEntityProvisionServiceTests$</ID>
<ID>NestedBlockDepth:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$override fun migrate(context: Context)</ID>
<ID>SwallowedException:TemporalQueryUtils.kt$e: IllegalArgumentException</ID>
</CurrentIssues>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.egm.stellio.search.csr.model
import com.egm.stellio.shared.model.EntityTypeSelection
import java.net.URI

data class CSRFilters( // we should use a combination of EntitiesQuery TemporalQuery (when we implement all operations)
open class CSRFilters( // we should use a combination of EntitiesQuery TemporalQuery (when we implement all operations)
val ids: Set<URI> = emptySet(),
val typeSelection: EntityTypeSelection? = null,
val idPattern: String? = null,
Expand All @@ -13,12 +13,24 @@ data class CSRFilters( // we should use a combination of EntitiesQuery TemporalQ
ids: Set<URI> = emptySet(),
typeSelection: EntityTypeSelection? = null,
idPattern: String? = null,
operations: List<Operation>
operations: List<Operation>?
) :
this(
ids = ids,
typeSelection = typeSelection,
idPattern = idPattern,
csf = operations.joinToString("|") { "${ContextSourceRegistration::operations.name}==${it.key}" }
csf = operations?.joinToString("|") { "${ContextSourceRegistration::operations.name}==${it.key}" }
)

constructor(
ids: Set<URI> = emptySet(),
types: Set<String>,
idPattern: String? = null,
operations: List<Operation>? = null
) : this(
ids = ids,
typeSelection = types.joinToString("|"),
idPattern = idPattern,
operations = operations
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.BadRequestDataException
import com.egm.stellio.shared.model.ExpandedEntity
import com.egm.stellio.shared.model.ExpandedTerm
import com.egm.stellio.shared.model.toAPIException
import com.egm.stellio.shared.util.DataTypes
import com.egm.stellio.shared.util.JSON_LD_MEDIA_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_CONTEXT
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_CSR_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_RELATIONSHIP_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.compactTerm
import com.egm.stellio.shared.util.JsonLdUtils.expandJsonLdTerm
import com.egm.stellio.shared.util.JsonUtils.deserializeAs
Expand Down Expand Up @@ -154,6 +158,46 @@ data class ContextSourceRegistration(
if (!id.isAbsolute)
BadRequestDataException(invalidUriMessage("$id")).left()
else Unit.right()

fun getAssociatedAttributes(
registrationInfoFilter: RegistrationInfoFilter,
entity: ExpandedEntity,
): Set<ExpandedTerm> {
val matchingRegistrationsInfo = getMatchingInformation(registrationInfoFilter)

val properties =
bobeal marked this conversation as resolved.
Show resolved Hide resolved
if (matchingRegistrationsInfo.any { it.propertyNames == null }) null
else matchingRegistrationsInfo.flatMap { it.propertyNames!! }.toSet()

val relationships =
if (matchingRegistrationsInfo.any { it.relationshipNames == null }) null
else matchingRegistrationsInfo.flatMap { it.relationshipNames!! }.toSet()

return entity.getAttributes().filter { (term, attribute) ->
val attributeType = attribute.first()[JSONLD_TYPE]?.first()
if (NGSILD_RELATIONSHIP_TYPE.uri == attributeType) {
relationships == null || term in relationships
} else {
properties == null || term in properties
}
}.keys
}

private fun getMatchingInformation(registrationInfoFilter: RegistrationInfoFilter): List<RegistrationInfo> =
information.filter { info ->
info.entities?.any { entityInfo ->
entityInfo.id?.let { registrationInfoFilter.ids.contains(it) } ?: true &&
entityInfo.types.let { types ->
types.any {
registrationInfoFilter.types?.contains(it) ?: true
}
} &&
entityInfo.idPattern?.let { pattern ->
registrationInfoFilter.ids.any { pattern.toRegex().matches(it.toString()) }
} ?: true
} ?: true
}

companion object {

fun deserialize(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.egm.stellio.search.csr.model

import java.net.URI

class RegistrationInfoFilter(
ids: Set<URI> = emptySet(),
val types: Set<String>? = null,
idPattern: String? = null,
operations: List<Operation>? = null
) : CSRFilters(
ids = ids,
typeSelection = types?.joinToString("|"),
idPattern = idPattern,
operations = operations
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package com.egm.stellio.search.csr.service

import arrow.core.Either
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.search.csr.model.CSRFilters
import com.egm.stellio.search.csr.model.ContextSourceRegistration
import com.egm.stellio.search.csr.model.Mode
import com.egm.stellio.search.csr.model.Operation
import com.egm.stellio.search.csr.model.RegistrationInfoFilter
import com.egm.stellio.search.entity.web.BatchEntityError
import com.egm.stellio.search.entity.web.BatchEntitySuccess
import com.egm.stellio.search.entity.web.BatchOperationResult
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.BadGatewayException
import com.egm.stellio.shared.model.CompactedEntity
import com.egm.stellio.shared.model.ConflictException
import com.egm.stellio.shared.model.ContextSourceException
import com.egm.stellio.shared.model.ErrorType
import com.egm.stellio.shared.model.ExpandedEntity
import com.egm.stellio.shared.model.ExpandedTerm
import com.egm.stellio.shared.model.GatewayTimeoutException
import com.egm.stellio.shared.util.JSON_LD_CONTENT_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.compactEntity
import com.egm.stellio.shared.util.toUri
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.stereotype.Service
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBodyOrNull
import org.springframework.web.reactive.function.client.awaitExchange
import java.net.URI

@Service
class DistributedEntityProvisionService(
private val contextSourceRegistrationService: ContextSourceRegistrationService,
) {
val createPath = "/ngsi-ld/v1/entities"

val logger: Logger = LoggerFactory.getLogger(javaClass)

suspend fun distributeCreateEntity(
entity: ExpandedEntity,
contexts: List<String>,
): Pair<BatchOperationResult, ExpandedEntity?> {
val csrFilters =
CSRFilters(
ids = setOf(entity.id.toUri()),
types = entity.types.toSet()
)
val result = BatchOperationResult()
val registrationInfoFilter =
RegistrationInfoFilter(ids = setOf(entity.id.toUri()), types = entity.types.toSet())

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(
filters = csrFilters,
).groupBy { it.mode }

val entityAfterExclusive = distributeCreateEntityForContextSources(
matchingCSR[Mode.EXCLUSIVE],
registrationInfoFilter,
entity,
contexts,
result
)
if (entityAfterExclusive == null) return result to null

val entityAfterRedirect = distributeCreateEntityForContextSources(
matchingCSR[Mode.REDIRECT],
registrationInfoFilter,
entityAfterExclusive,
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
contexts,
result
)
if (entityAfterRedirect == null) return result to null

distributeCreateEntityForContextSources(
matchingCSR[Mode.INCLUSIVE],
registrationInfoFilter,
entityAfterRedirect,
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
contexts,
result
)
return result to entityAfterRedirect
}

internal suspend fun distributeCreateEntityForContextSources(
csrs: List<ContextSourceRegistration>?,
registrationInfoFilter: RegistrationInfoFilter,
entity: ExpandedEntity,
contexts: List<String>,
resultToUpdate: BatchOperationResult
): ExpandedEntity? {
val allProcessedAttrs = mutableSetOf<ExpandedTerm>()
csrs?.forEach { csr ->
csr.getAssociatedAttributes(registrationInfoFilter, entity)
.let { attrs ->
allProcessedAttrs.addAll(attrs)
if (attrs.isEmpty()) Unit
else if (csr.operations.any {
it == Operation.CREATE_ENTITY ||
it == Operation.UPDATE_OPS ||
it == Operation.REDIRECTION_OPS
}
) {
postDistributedInformation(
compactEntity(entity.filterAttributes(attrs, emptySet()), contexts),
csr,
createPath
).fold(
{
resultToUpdate.errors.add(
BatchEntityError(
entityId = entity.id.toUri(),
registrationId = csr.id,
error = it.toProblemDetail()
)
)
},
{ resultToUpdate.success.add(BatchEntitySuccess(csr.id)) }
)
} else if (csr.mode != Mode.INCLUSIVE) {
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
resultToUpdate.errors.add(
BatchEntityError(
entityId = entity.id.toUri(),
registrationId = csr.id,
error = ConflictException(
"The csr: ${csr.id} does not support the creation of entities"
).toProblemDetail()
)
)
}
}
}
return if (allProcessedAttrs.isNotEmpty()) {
val remainingEntity = entity.omitAttributes(allProcessedAttrs)
if (remainingEntity.hasNonCoreAttributes()) remainingEntity else null
} else entity
}

internal suspend fun postDistributedInformation(
entity: CompactedEntity,
csr: ContextSourceRegistration,
path: String,
): Either<APIException, Unit> = either {
val uri = URI("${csr.endpoint}$path")

val request = WebClient.create()
.method(HttpMethod.POST)
.uri { uriBuilder ->
uriBuilder.scheme(uri.scheme)
.host(uri.host)
.port(uri.port)
.path(uri.path)
.build()
}.headers { newHeaders ->
newHeaders[HttpHeaders.CONTENT_TYPE] = JSON_LD_CONTENT_TYPE
}.bodyValue(entity)

return runCatching {
val (statusCode, response, _) = request.awaitExchange { response ->
Triple(response.statusCode(), response.awaitBodyOrNull<String>(), response.headers())
}
if (statusCode.value() == HttpStatus.MULTI_STATUS.value()) {
ContextSourceException(
type = ErrorType.MULTI_STATUS.type,
status = HttpStatus.MULTI_STATUS,
title = "Context source returned 207",
detail = response ?: "no message"
).left()
} else if (statusCode.is2xxSuccessful) {
logger.info("Successfully post data to CSR ${csr.id} at $uri")
Unit.right()
} else if (response == null) {
val message = "No error message received from CSR ${csr.id} at $uri"
logger.warn(message)
BadGatewayException(message).left()
} else {
logger.warn("Error creating an entity for CSR at $uri: $response")
ContextSourceException.fromResponse(response).left()
}
}.fold(
onSuccess = { it },
onFailure = { e ->
logger.warn("Error contacting CSR at $uri: ${e.message}")
logger.warn(e.stackTraceToString())
GatewayTimeoutException(
"Error connecting to CSR at $uri: \"${e.cause}:${e.message}\""
).left()
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import com.egm.stellio.search.entity.web.JsonLdNgsiLdEntity
import com.egm.stellio.search.entity.web.entityId
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.BadRequestDataException
import com.egm.stellio.shared.model.toAPIException
import com.egm.stellio.shared.util.Sub
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
Expand Down Expand Up @@ -90,7 +91,7 @@ class EntityOperationService(
entityService.createEntity(jsonLdNgsiLdEntity.second, jsonLdNgsiLdEntity.first, sub).map {
BatchEntitySuccess(jsonLdNgsiLdEntity.entityId())
}.mapLeft { apiException ->
BatchEntityError(jsonLdNgsiLdEntity.entityId(), arrayListOf(apiException.message))
BatchEntityError(jsonLdNgsiLdEntity.entityId(), apiException.toProblemDetail())
}.bind()
}
}.fold(
Expand All @@ -114,7 +115,7 @@ class EntityOperationService(
BatchEntitySuccess(id)
}
.mapLeft { apiException ->
BatchEntityError(id, arrayListOf(apiException.message))
BatchEntityError(id, apiException.toProblemDetail())
}.bind()
}
}.fold(
Expand Down Expand Up @@ -242,10 +243,10 @@ class EntityOperationService(
}.map {
BatchEntitySuccess(entity.entityId(), it)
}.mapLeft {
BatchEntityError(entity.entityId(), arrayListOf(it.message))
BatchEntityError(entity.entityId(), it.toProblemDetail())
}
}.fold(
onFailure = { BatchEntityError(entity.entityId(), arrayListOf(it.message!!)).left() },
onFailure = { BatchEntityError(entity.entityId(), it.toAPIException().toProblemDetail()).left() },
onSuccess = { it }
)

Expand Down
Loading
Loading