Skip to content

Commit

Permalink
Merge branch 'main' into etag-prepro
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-parker committed Sep 19, 2024
2 parents b422c16 + 767221a commit 1498dec
Show file tree
Hide file tree
Showing 72 changed files with 4,915 additions and 2,985 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/backend-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,17 @@ jobs:
- name: Shorten sha
run: echo "sha=${sha::7}" >> $GITHUB_ENV
- uses: actions/checkout@v4
- name: Add filename hash to environment
run: |
# This needs to be a separate step because hashFiles is done before the run steps
find backend -type f -print | sort | sha256sum > backend/filename_hash
cat backend/filename_hash
- name: Generate files hash
id: files-hash
run: |
DIR_HASH=$(echo -n ${{ hashFiles('backend/**', '.github/workflows/backend-image.yml') }})
echo "DIR_HASH=$DIR_HASH${{ env.BUILD_ARM == 'true' && '-arm' || '' }}" >> $GITHUB_ENV
rm backend/filename_hash
- name: Setup Docker metadata
id: dockerMetadata
uses: docker/metadata-action@v5
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/e2e-k3d.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
env:
ALL_BROWSERS: ${{ github.ref == 'refs/heads/main' || github.event.inputs.all_browsers && 'true' || 'false' }}
sha: ${{ github.event.pull_request.head.sha || github.sha }}
wait_timeout: ${{ github.ref == 'refs/heads/main' && 900 || 180 }}
wait_timeout: ${{ github.ref == 'refs/heads/main' && 900 || 240 }}
steps:
- name: Shorten sha
run: echo "sha=${sha::7}" >> $GITHUB_ENV
Expand Down
37 changes: 37 additions & 0 deletions .github/workflows/ena-submission-tests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: ena-submission-tests
on:
  pull_request:
    paths:
      - "ena-submission/**"
      # Must match this workflow file's actual extension (.yaml); with the
      # previous ".yml" spelling, changes to the workflow itself never
      # triggered a run.
      - ".github/workflows/ena-submission-tests.yaml"
  push:
    branches:
      - main
  workflow_dispatch:
concurrency:
  # On main, key on run_id so every push gets its own group; on branches,
  # key on the ref so superseded runs of the same PR are cancelled.
  group: ci-${{ github.ref == 'refs/heads/main' && github.run_id || github.ref }}-ena-submission-tests
  cancel-in-progress: true
jobs:
  unitTests:
    name: Unit Tests
    runs-on: codebuild-loculus-ci-${{ github.run_id }}-${{ github.run_attempt }}
    timeout-minutes: 15
    steps:
      - uses: actions/checkout@v4
      - name: Set up micromamba
        uses: mamba-org/setup-micromamba@v1
        with:
          environment-file: ena-submission/environment.yml
          micromamba-version: 'latest'
          init-shell: >-
            bash
            powershell
          cache-environment: true
          post-cleanup: 'all'
      - name: Run tests
        run: |
          micromamba activate loculus-ena-submission
          python3 scripts/test_ena_submission.py
        shell: micromamba-shell {0}
        working-directory: ena-submission
2 changes: 2 additions & 0 deletions backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ The service listens, by default, to **port 8079**: <http://localhost:8079/swagge
Note: When using a postgresSQL development platform (e.g. pgAdmin) the hostname is 127.0.0.1 and not localhost - this is defined in the `deploy.py` file.
Note that we also use Flyway in the ena-submission pod to create an additional database schema, `ena-submission`; that schema is not managed here.
### Operating the backend behind a proxy
When running the backend behind a proxy, the proxy needs to set X-Forwarded headers:
Expand Down
2 changes: 1 addition & 1 deletion backend/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ dependencies {
implementation "org.postgresql:postgresql:42.7.4"
implementation "org.apache.commons:commons-csv:1.11.0"
implementation "org.springdoc:springdoc-openapi-starter-webmvc-ui:2.6.0"
implementation "org.flywaydb:flyway-database-postgresql:10.17.3"
implementation "org.flywaydb:flyway-database-postgresql:10.18.0"
implementation "org.jetbrains.exposed:exposed-spring-boot-starter:0.54.0"
implementation "org.jetbrains.exposed:exposed-jdbc:0.54.0"
implementation "org.jetbrains.exposed:exposed-json:0.54.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,10 @@ enum class PreprocessingStatus {
FINISHED,
}

/**
 * Status of a released sequence entry version relative to all other
 * versions of the same accession.
 *
 * Constant order is preserved; do not reorder (ordinals may be relied upon).
 */
enum class VersionStatus {
    /** Not the highest version, and a higher version is a revocation. */
    REVOKED,

    /** Not the highest version, and no higher version is a revocation. */
    REVISED,

    /** The highest version of the sequence entry. */
    LATEST_VERSION,
}

enum class CompressionFormat(val compressionName: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.loculus.backend.controller

import io.swagger.v3.oas.annotations.Operation
import io.swagger.v3.oas.annotations.Parameter
import io.swagger.v3.oas.annotations.headers.Header
import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.media.Schema
import io.swagger.v3.oas.annotations.responses.ApiResponse
Expand Down Expand Up @@ -32,6 +33,7 @@ import org.loculus.backend.api.UnprocessedData
import org.loculus.backend.api.WarningsFilter
import org.loculus.backend.auth.AuthenticatedUser
import org.loculus.backend.auth.HiddenParam
import org.loculus.backend.model.RELEASED_DATA_RELATED_TABLES
import org.loculus.backend.model.ReleasedDataModel
import org.loculus.backend.model.SubmissionParams
import org.loculus.backend.model.SubmitModel
Expand Down Expand Up @@ -64,7 +66,7 @@ private val log = KotlinLogging.logger { }
@RequestMapping("/{organism}")
@Validated
@SecurityRequirement(name = "bearerAuth")
class SubmissionController(
open class SubmissionController(
private val submitModel: SubmitModel,
private val releasedDataModel: ReleasedDataModel,
private val submissionDatabaseService: SubmissionDatabaseService,
Expand Down Expand Up @@ -243,15 +245,29 @@ class SubmissionController(
schema = Schema(implementation = ProcessedData::class),
),
],
headers = [
Header(
name = "x-total-records",
description = "The total number of records sent in responseBody",
schema = Schema(type = "integer"),
),
Header(
name = "eTag",
description = "Last database write Etag",
schema = Schema(type = "integer"),
),
],
)
@ApiResponse(responseCode = "304", description = "Not Modified")
@GetMapping("/get-released-data", produces = [MediaType.APPLICATION_NDJSON_VALUE])
fun getReleasedData(
@PathVariable @Valid organism: Organism,
@RequestParam compression: CompressionFormat?,
@Parameter(description = "(Optional) Only retrieve all released data if Etag has changed.")
@RequestHeader(value = HttpHeaders.IF_NONE_MATCH, required = false) ifNoneMatch: String?,
): ResponseEntity<StreamingResponseBody> {
val lastDatabaseWriteETag = releasedDataModel.getLastDatabaseWriteETag()
val lastDatabaseWriteETag = releasedDataModel.getLastDatabaseWriteETag(
RELEASED_DATA_RELATED_TABLES,
)
if (ifNoneMatch == lastDatabaseWriteETag) {
return ResponseEntity.status(HttpStatus.NOT_MODIFIED).build()
}
Expand All @@ -260,6 +276,15 @@ class SubmissionController(
headers.eTag = lastDatabaseWriteETag
headers.contentType = MediaType.APPLICATION_NDJSON
compression?.let { headers.add(HttpHeaders.CONTENT_ENCODING, it.compressionName) }

val totalRecords = submissionDatabaseService.countReleasedSubmissions(organism)
headers.add("x-total-records", totalRecords.toString())
// TODO(https://github.com/loculus-project/loculus/issues/2778)
// There's a possibility that the totalRecords change between the count and the actual query
// this is not too bad, if the client ends up with a few more records than expected
// We just need to make sure the etag used is from before the count
// Alternatively, we could read once to file while counting and then stream the file

val streamBody = streamTransactioned(compression) { releasedDataModel.getReleasedData(organism) }
return ResponseEntity.ok().headers(headers).body(streamBody)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ and roll back the whole transaction.
const val GET_RELEASED_DATA_DESCRIPTION = """
Get released data as a stream of NDJSON.
This returns all accession versions that have the status 'APPROVED_FOR_RELEASE'.
Optionally add HttpHeader If-Modified-Since in unix timestamp (in seconds),
to only retrieve all released data if the database has changed since If-Modified-Since.
"""

const val GET_RELEASED_DATA_RESPONSE_DESCRIPTION = """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import org.loculus.backend.api.DataUseTerms
import org.loculus.backend.api.GeneticSequence
import org.loculus.backend.api.Organism
import org.loculus.backend.api.ProcessedData
import org.loculus.backend.api.SiloVersionStatus
import org.loculus.backend.api.VersionStatus
import org.loculus.backend.config.BackendConfig
import org.loculus.backend.service.submission.RawProcessedData
import org.loculus.backend.service.submission.SubmissionDatabaseService
import org.loculus.backend.service.submission.UpdateTrackerTable
import org.loculus.backend.utils.Accession
import org.loculus.backend.utils.Version
import org.loculus.backend.utils.toTimestamp
Expand All @@ -28,6 +29,16 @@ import org.springframework.transaction.annotation.Transactional

private val log = KotlinLogging.logger { }

// Database tables whose writes are relevant to the released-data endpoint.
// Presumably used to scope the last-database-write ETag so that updates to
// unrelated tables do not invalidate client caches — TODO(review): confirm
// against the callers of getLastDatabaseWriteETag.
val RELEASED_DATA_RELATED_TABLES: List<String> =
listOf(
"sequence_entries",
"sequence_entries_preprocessed_data",
"external_metadata",
"current_processing_pipeline",
"metadata_upload_aux_table",
"sequence_upload_aux_table",
)

@Service
open class ReleasedDataModel(
private val submissionDatabaseService: SubmissionDatabaseService,
Expand All @@ -45,10 +56,17 @@ open class ReleasedDataModel(
}

@Transactional(readOnly = true)
open fun getLastDatabaseWriteETag(): String {
val lastUpdateTime = UpdateTrackerTable.selectAll()
open fun getLastDatabaseWriteETag(tableNames: List<String>? = null): String {
val query = UpdateTrackerTable.select(UpdateTrackerTable.lastTimeUpdatedDbColumn).apply {
tableNames?.let {
where { UpdateTrackerTable.tableNameColumn inList it }
}
}

val lastUpdateTime = query
.mapNotNull { it[UpdateTrackerTable.lastTimeUpdatedDbColumn] }
.maxOrNull()
// Replace not strictly necessary but does no harm and a) shows UTC, b) simplifies silo import script logic
?.replace(" ", "Z")
?: ""
return "\"$lastUpdateTime\"" // ETag must be enclosed in double quotes
Expand All @@ -59,7 +77,7 @@ open class ReleasedDataModel(
latestVersions: Map<Accession, Version>,
latestRevocationVersions: Map<Accession, Version>,
): ProcessedData<GeneticSequence> {
val siloVersionStatus = computeSiloVersionStatus(rawProcessedData, latestVersions, latestRevocationVersions)
val versionStatus = computeVersionStatus(rawProcessedData, latestVersions, latestRevocationVersions)

val currentDataUseTerms = computeDataUseTerm(rawProcessedData)
val restrictedDataUseTermsUntil = if (currentDataUseTerms is DataUseTerms.Restricted) {
Expand All @@ -81,7 +99,7 @@ open class ReleasedDataModel(
("submittedAtTimestamp" to LongNode(rawProcessedData.submittedAtTimestamp.toTimestamp())) +
("releasedAtTimestamp" to LongNode(rawProcessedData.releasedAtTimestamp.toTimestamp())) +
("releasedDate" to TextNode(rawProcessedData.releasedAtTimestamp.toUtcDateString())) +
("versionStatus" to TextNode(siloVersionStatus.name)) +
("versionStatus" to TextNode(versionStatus.name)) +
("dataUseTerms" to TextNode(currentDataUseTerms.type.name)) +
("dataUseTermsRestrictedUntil" to restrictedDataUseTermsUntil) +
("versionComment" to TextNode(rawProcessedData.versionComment))
Expand Down Expand Up @@ -114,22 +132,26 @@ open class ReleasedDataModel(
DataUseTerms.Open
}

// Determines the VersionStatus of one accession version:
// LATEST_VERSION: this is the highest version of the sequence entry.
// REVOKED: not the highest version, and a higher version is a revocation.
// REVISED: not the highest version, and no higher version is a revocation.
// Note: a revocation entry is itself only REVOKED when there is a *higher*
// version that is a revocation.
private fun computeVersionStatus(
    rawProcessedData: RawProcessedData,
    latestVersions: Map<Accession, Version>,
    latestRevocationVersions: Map<Accession, Version>,
): VersionStatus {
    // The highest version overall is LATEST_VERSION, regardless of revocations.
    if (latestVersions[rawProcessedData.accession] == rawProcessedData.version) {
        return VersionStatus.LATEST_VERSION
    }

    // A strictly higher revocation version exists -> this version is REVOKED.
    latestRevocationVersions[rawProcessedData.accession]?.let { revocationVersion ->
        if (revocationVersion > rawProcessedData.version) {
            return VersionStatus.REVOKED
        }
    }

    return VersionStatus.REVISED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,16 @@ open class SubmissionDatabaseService(
.associate { it[SequenceEntriesView.accessionColumn] to it[maxVersionExpression]!! }
}

// Counts the entries that streamReleasedSubmissions would emit; the two
// queries must stay in sync with each other.
fun countReleasedSubmissions(organism: Organism): Long {
    val releasedEntries = SequenceEntriesView
        .select(SequenceEntriesView.accessionColumn)
        .where {
            SequenceEntriesView.statusIs(Status.APPROVED_FOR_RELEASE) and
                SequenceEntriesView.organismIs(organism)
        }
    return releasedEntries.count()
}

// Make sure to keep in sync with countReleasedSubmissions query
fun streamReleasedSubmissions(organism: Organism): Sequence<RawProcessedData> = SequenceEntriesView.join(
DataUseTermsTable,
JoinType.LEFT,
Expand Down Expand Up @@ -986,36 +996,31 @@ open class SubmissionDatabaseService(
Clock.System.now().toEpochMilliseconds() - timeToStaleInSeconds * 1000,
).toLocalDateTime(TimeZone.UTC)

transaction {
// Check if there are any stale sequences before attempting to delete
val staleSequencesExist = SequenceEntriesPreprocessedDataTable
.selectAll()
.where {
SequenceEntriesPreprocessedDataTable.statusIs(PreprocessingStatus.IN_PROCESSING) and
(SequenceEntriesPreprocessedDataTable.startedProcessingAtColumn.less(staleDateTime))
}
.limit(1)
.count() > 0
// Check if there are any stale sequences before attempting to delete
val staleSequencesExist = SequenceEntriesPreprocessedDataTable
.selectAll()
.where {
SequenceEntriesPreprocessedDataTable.statusIs(PreprocessingStatus.IN_PROCESSING) and
(SequenceEntriesPreprocessedDataTable.startedProcessingAtColumn.less(staleDateTime))
}
.limit(1)
.count() > 0

if (staleSequencesExist) {
val numberDeleted = SequenceEntriesPreprocessedDataTable.deleteWhere {
statusIs(IN_PROCESSING) and startedProcessingAtColumn.less(staleDateTime)
}
log.info { "Cleaned up $numberDeleted stale sequences in processing" }
} else {
log.info { "No stale sequences found for cleanup" }
if (staleSequencesExist) {
val numberDeleted = SequenceEntriesPreprocessedDataTable.deleteWhere {
statusIs(IN_PROCESSING) and startedProcessingAtColumn.less(staleDateTime)
}
log.info { "Cleaned up $numberDeleted stale sequences in processing" }
} else {
log.info { "No stale sequences found for cleanup" }
}
}

fun useNewerProcessingPipelineIfPossible(): Long? {
log.info("Checking for newer processing pipeline versions")
return transaction {
val newVersion = findNewPreprocessingPipelineVersion()

if (newVersion == null) {
return@transaction null
}
?: return@transaction null

val pipelineNeedsUpdate = CurrentProcessingPipelineTable
.selectAll().where { CurrentProcessingPipelineTable.versionColumn neq newVersion }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.loculus.backend.service.submission

import org.jetbrains.exposed.sql.Table

const val UPDATE_TRACKER_TABLE_NAME = "table_update_tracker"

/**
 * Exposed mapping of the table-update tracker: one row per tracked database
 * table, holding the last time that table was written. Used to compute the
 * ETag for the released-data endpoint.
 */
object UpdateTrackerTable : Table(UPDATE_TRACKER_TABLE_NAME) {
    val tableNameColumn = text("table_name")
    val lastTimeUpdatedDbColumn = text("last_time_updated")

    override val primaryKey = PrimaryKey(tableNameColumn)
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ SELECT create_update_trigger_for_table('groups_table');
SELECT create_update_trigger_for_table('current_processing_pipeline');
SELECT create_update_trigger_for_table('metadata_upload_aux_table');
SELECT create_update_trigger_for_table('sequence_upload_aux_table');
SELECT create_update_trigger_for_table('user_groups_table');
Loading

0 comments on commit 1498dec

Please sign in to comment.