Skip to content

Commit

Permalink
Await all parts of migration (#244)
Browse files Browse the repository at this point in the history
Summary:
Right now we await only last furure in this migration but we are not guaranteed that all the previous futures finish before this one. Each future consists of several separate queries to ZK...

JIRA issues: DCOS-39324
  • Loading branch information
alenkacz authored Aug 21, 2018
1 parent d9e5298 commit 02dc83f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
4 changes: 3 additions & 1 deletion jobs/src/main/scala/dcos/metronome/migration/Migration.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package dcos.metronome
package migration

import scala.concurrent.ExecutionContext

/** Handles the state migration */
trait Migration {

/** This call will block until the migration completed */
def migrate(): Unit
def migrate()(implicit ec: ExecutionContext): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,29 @@ import dcos.metronome.repository.impl.kv.{ JobHistoryPathResolver, JobRunPathRes
import mesosphere.util.state.{ PersistentStore, PersistentStoreManagement, PersistentStoreWithNestedPathsSupport }
import org.slf4j.LoggerFactory

import scala.async.Async.{ async, await }
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }
import scala.concurrent.{ Await, ExecutionContext, Future }

class MigrationImpl(store: PersistentStore) extends Migration {
import MigrationImpl._

override def migrate(): Unit = {
override def migrate()(implicit ec: ExecutionContext): Unit = {
Await.result(initializeStore(), Duration.Inf)
log.info("Migration successfully applied for version")
}

private[this] def initializeStore(): Future[Unit] = store match {
case store: PersistentStoreManagement with PersistentStoreWithNestedPathsSupport =>
store.initialize()
store.createPath(JobSpecPathResolver.basePath)
store.createPath(JobRunPathResolver.basePath)
store.createPath(JobHistoryPathResolver.basePath)
case _: PersistentStore => Future.successful(())
private[this] def initializeStore()(implicit ec: ExecutionContext): Future[Unit] = async {
store match {
case store: PersistentStoreManagement with PersistentStoreWithNestedPathsSupport =>
await(store.initialize())
await(store.createPath(JobSpecPathResolver.basePath))
await(store.createPath(JobRunPathResolver.basePath))
await(store.createPath(JobHistoryPathResolver.basePath))
case _: PersistentStore =>
log.info("Unsupported type of persistent store. Not running any migrations.")
Future.successful(())
}
}

}
Expand Down

0 comments on commit 02dc83f

Please sign in to comment.