Skip to content

Commit

Permalink
refactor: first try to refactor DAO Pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
sfxcode committed Dec 10, 2024
1 parent 3caf2e6 commit ec72741
Show file tree
Hide file tree
Showing 52 changed files with 448 additions and 69 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ developers := List(

licenses += ("Apache-2.0", url("https://www.apache.org/licenses/LICENSE-2.0.html"))

crossScalaVersions := Seq("3.6.0")
crossScalaVersions := Seq("2.13.15")
//crossScalaVersions := Seq("3.6.0")

scalaVersion := crossScalaVersions.value.head

Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ addSbtPlugin("com.github.sbt" % "sbt-release" % "1.4.0")

addSbtPlugin("dev.quadstingray" %% "sbt-json" % "0.7.1")

//addSbtPlugin("ch.epfl.scala" % "sbt-scala3-migrate" % "0.7.1")
addSbtPlugin("ch.epfl.scala" % "sbt-scala3-migrate" % "0.7.1")


addDependencyTreePlugin
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/dev/mongocamp/driver/MongoImplicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ trait MongoImplicits extends ObservableIncludes with ObservableImplicits {

implicit def observableToResult[T](obs: Observable[T]): T = obs.result()

implicit def findObservableToResultList[T](obs: FindObservable[T]): List[T] = obs.resultList()
implicit def findObservableToResultList[T](obs: Observable[T]): List[T] = obs.resultList()

implicit def findObservableToResultOption[T](obs: FindObservable[T]): Option[T] = obs.resultOption()
implicit def findObservableToResultOption[T](obs: Observable[T]): Option[T] = obs.resultOption()

// gridfs-dao

Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/dev/mongocamp/driver/mongodb/MongoDAO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ abstract class MongoDAO[A](provider: DatabaseProvider, collectionName: String)(i

val name: String = provider.guessName(collectionName)

val collection: MongoCollection[A] = provider.collection[A](collectionName)
val collection: MongoCollection[Document] = provider.collection[Document](collectionName)

def addChangeObserver(observer: ChangeObserver[A]): ChangeObserver[A] = {
coll.watch[A]().subscribe(observer)
Expand Down Expand Up @@ -64,7 +64,7 @@ abstract class MongoDAO[A](provider: DatabaseProvider, collectionName: String)(i
BsonConverter.fromBson(aggregationResult.get("keySet").head).asInstanceOf[List[String]]
}

protected def coll: MongoCollection[A] = collection
protected def coll: MongoCollection[Document] = collection

// internal object for raw document access
object Raw extends MongoDAO[Document](provider, collectionName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.mongodb.scala.gridfs.GridFSBucket
import scala.collection.mutable
import scala.reflect.ClassTag

class DatabaseProvider(val config: MongoConfig, val registry: CodecRegistry) extends Serializable {
class DatabaseProvider(val config: MongoConfig) extends Serializable {
private val cachedDatabaseMap = new mutable.HashMap[String, MongoDatabase]()
private val cachedMongoDAOMap = new mutable.HashMap[String, MongoDAO[Document]]()
private var cachedClient: Option[MongoClient] = None
Expand Down Expand Up @@ -67,7 +67,7 @@ class DatabaseProvider(val config: MongoConfig, val registry: CodecRegistry) ext

def database(databaseName: String = DefaultDatabaseName): MongoDatabase = {
if (!cachedDatabaseMap.contains(databaseName)) {
cachedDatabaseMap.put(databaseName, client.getDatabase(databaseName).withCodecRegistry(registry))
cachedDatabaseMap.put(databaseName, client.getDatabase(databaseName))
}
cachedDatabaseMap(databaseName)
}
Expand Down Expand Up @@ -155,16 +155,12 @@ object DatabaseProvider {
val ObjectIdKey = "_id"
val CollectionSeparator = ":"

private val CustomRegistry = fromProviders(CustomCodecProvider())

private val codecRegistry: CodecRegistry = fromRegistries(CustomRegistry, DEFAULT_CODEC_REGISTRY)

def apply(config: MongoConfig, registry: CodecRegistry = codecRegistry): DatabaseProvider = {
new DatabaseProvider(config, fromRegistries(registry, CustomRegistry, DEFAULT_CODEC_REGISTRY))
def apply(config: MongoConfig): DatabaseProvider = {
new DatabaseProvider(config)
}

def fromPath(configPath: String = MongoConfig.DefaultConfigPathPrefix, registry: CodecRegistry = codecRegistry): DatabaseProvider = {
apply(MongoConfig.fromPath(configPath), fromRegistries(registry, CustomRegistry, DEFAULT_CODEC_REGISTRY))
def fromPath(configPath: String = MongoConfig.DefaultConfigPathPrefix): DatabaseProvider = {
apply(MongoConfig.fromPath(configPath))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ import scala.concurrent.duration.durationToPair

abstract class Base[A] extends LazyLogging {

protected def coll: MongoCollection[A]
implicit def documentToObject[A](document: Document): A = {
// docment to json
// json to object
null.asInstanceOf[A]
}

protected def coll: MongoCollection[Document]

def count(filter: Bson = Document(), options: CountOptions = CountOptions()): Observable[Long] = {
coll.countDocuments(filter, options)
Expand Down
49 changes: 30 additions & 19 deletions src/main/scala/dev/mongocamp/driver/mongodb/operation/Crud.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dev.mongocamp.driver.mongodb.operation

import java.util.Date

import dev.mongocamp.driver.mongodb.database.DatabaseProvider
import dev.mongocamp.driver.mongodb.{ Converter, _ }
import org.mongodb.scala.bson.conversions.Bson
Expand All @@ -13,29 +12,34 @@ import org.mongodb.scala.{ BulkWriteResult, Observable, SingleObservable }
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import Updates._
import dev.mongocamp.driver.mongodb.bson.BsonConverter
import dev.mongocamp.driver.mongodb.sync.MongoSyncOperation

abstract class Crud[A]()(implicit ct: ClassTag[A]) extends Search[A] {

// create
def insertOne(value: A): Observable[InsertOneResult] = coll.insertOne(value)
def insertOne(value: A): Observable[InsertOneResult] = coll.insertOne(Converter.toDocument(value))

def insertOne(value: A, options: InsertOneOptions): Observable[InsertOneResult] =
coll.insertOne(value, options)
coll.insertOne(Converter.toDocument(value), options)

def insertMany(values: Seq[A]): Observable[InsertManyResult] =
coll.insertMany(values)
coll.insertMany(values.map(Converter.toDocument))

def insertMany(values: Seq[A], options: InsertManyOptions): Observable[InsertManyResult] =
coll.insertMany(values, options)
coll.insertMany(values.map(Converter.toDocument), options)

// bulk write

def bulkWrite(requests: List[WriteModel[_ <: A]], options: BulkWriteOptions): SingleObservable[BulkWriteResult] =
coll.bulkWrite(requests, options)
def bulkWrite(requests: List[WriteModel[_ <: A]], options: BulkWriteOptions): SingleObservable[BulkWriteResult] = {
// coll.bulkWrite(requests, options)
// todo
???
}

def bulkWrite(requests: List[WriteModel[_ <: A]], ordered: Boolean = true): SingleObservable[BulkWriteResult] =
def bulkWrite(requests: List[WriteModel[_ <: A]], ordered: Boolean = true): SingleObservable[BulkWriteResult] = {
bulkWrite(requests, BulkWriteOptions().ordered(ordered))
}

def bulkWriteMany(values: Seq[A], options: BulkWriteOptions): SingleObservable[BulkWriteResult] = {
val requests: ArrayBuffer[WriteModel[_ <: A]] = ArrayBuffer()
Expand All @@ -54,35 +58,42 @@ abstract class Crud[A]()(implicit ct: ClassTag[A]) extends Search[A] {
def replaceOne(value: A): Observable[UpdateResult] = {
val document = Converter.toDocument(value)
val oid = document.get(DatabaseProvider.ObjectIdKey).get
coll.replaceOne(equal(DatabaseProvider.ObjectIdKey, oid), value)
coll.replaceOne(equal(DatabaseProvider.ObjectIdKey, oid), document)
}

def replaceOne(value: A, options: ReplaceOptions): Observable[UpdateResult] = {
val document = Converter.toDocument(value)
val oid = document.get(DatabaseProvider.ObjectIdKey).get
coll.replaceOne(equal(DatabaseProvider.ObjectIdKey, oid), value, options)
coll.replaceOne(equal(DatabaseProvider.ObjectIdKey, oid), document, options)
}

def replaceOne(filter: Bson, value: A): Observable[UpdateResult] =
coll.replaceOne(filter, value)
def replaceOne(filter: Bson, value: A): Observable[UpdateResult] = {
coll.replaceOne(filter, Converter.toDocument(value))
}

def replaceOne(filter: Bson, value: A, options: ReplaceOptions): Observable[UpdateResult] =
coll.replaceOne(filter, value, options)
def replaceOne(filter: Bson, value: A, options: ReplaceOptions): Observable[UpdateResult] = {
coll.replaceOne(filter, Converter.toDocument(value), options)
}

def updateOne(filter: Bson, update: Bson): Observable[UpdateResult] =
def updateOne(filter: Bson, update: Bson): Observable[UpdateResult] = {
coll.updateOne(filter, update)
}

def updateOne(filter: Bson, update: Bson, options: UpdateOptions): Observable[UpdateResult] =
def updateOne(filter: Bson, update: Bson, options: UpdateOptions): Observable[UpdateResult] = {
coll.updateOne(filter, update, options)
}

def updateMany(filter: Bson, update: Bson): Observable[UpdateResult] =
def updateMany(filter: Bson, update: Bson): Observable[UpdateResult] = {
coll.updateMany(filter, update)
}

def updateMany(filter: Bson, update: Bson, options: UpdateOptions): Observable[UpdateResult] =
def updateMany(filter: Bson, update: Bson, options: UpdateOptions): Observable[UpdateResult] = {
coll.updateMany(filter, update, options)
}

def touchInternal(filter: Bson): Observable[UpdateResult] =
def touchInternal(filter: Bson): Observable[UpdateResult] = {
updateMany(filter, set(MongoSyncOperation.SyncColumnLastUpdate, new Date()))
}

// delete

Expand Down
42 changes: 25 additions & 17 deletions src/main/scala/dev/mongocamp/driver/mongodb/operation/Search.scala
Original file line number Diff line number Diff line change
@@ -1,45 +1,53 @@
package dev.mongocamp.driver.mongodb.operation

import dev.mongocamp.driver.mongodb._
import dev.mongocamp.driver.mongodb.bson.BsonConverter
import dev.mongocamp.driver.mongodb.bson.BsonConverter._
import dev.mongocamp.driver.mongodb.database.DatabaseProvider
import org.bson.BsonValue
import org.mongodb.scala.bson.ObjectId
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.{ AggregateObservable, DistinctObservable, Document, FindObservable, MongoCollection }
import org.mongodb.scala.{ AggregateObservable, DistinctObservable, Document, FindObservable, MongoCollection, Observable }

import scala.reflect.ClassTag

abstract class Search[A]()(implicit ct: ClassTag[A]) extends Base[A] {

protected def coll: MongoCollection[A]
protected def coll: MongoCollection[Document]

def find(
filter: Bson = Document(),
sort: Bson = Document(),
projection: Bson = Document(),
limit: Int = 0
): FindObservable[A] =
if (limit > 0) {
coll.find(filter).sort(sort).projection(projection).limit(limit)
}
else {
coll.find(filter).sort(sort).projection(projection)
}

def findById(oid: ObjectId): FindObservable[A] = find(equal(DatabaseProvider.ObjectIdKey, oid))

def find(name: String, value: Any): FindObservable[A] =
): Observable[A] = {
{
if (limit > 0) {
coll.find(filter).sort(sort).projection(projection).limit(limit)
}
else {
coll.find(filter).sort(sort).projection(projection)
}
}.map(doc => documentToObject[A](doc))
}

def findById(oid: ObjectId): Observable[A] = find(equal(DatabaseProvider.ObjectIdKey, oid))

def find(name: String, value: Any): Observable[A] = {
find(equal(name, value))
}

def distinct[S <: Any](fieldName: String, filter: Bson = Document()): DistinctObservable[BsonValue] =
def distinct[S <: Any](fieldName: String, filter: Bson = Document()): DistinctObservable[BsonValue] = {
coll.distinct[BsonValue](fieldName, filter)
}

def distinctResult[S <: Any](fieldName: String, filter: Bson = Document()): Seq[S] =
def distinctResult[S <: Any](fieldName: String, filter: Bson = Document()): Seq[S] = {
distinct(fieldName, filter).resultList().map(v => fromBson(v).asInstanceOf[S])
}

def findAggregated(pipeline: Seq[Bson], allowDiskUse: Boolean = false): AggregateObservable[A] =
coll.aggregate(pipeline).allowDiskUse(allowDiskUse)
def findAggregated(pipeline: Seq[Bson], allowDiskUse: Boolean = false): Observable[A] = {
coll.aggregate(pipeline).allowDiskUse(allowDiskUse).map(doc => documentToObject[A](doc)).asInstanceOf[AggregateObservable[A]]
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ case class MongoPaginatedFilter[A <: Any](dao: MongoDAO[A], filter: Bson = Map()
if (page <= 0) {
throw MongoCampPaginationException("page must be greater then 0.")
}
val allPages = Math.ceil(count.toDouble / rows).toInt
val skip = (page - 1) * rows
val responseList = dao.find(filter, sort, projection, rows.toInt).skip(skip.toInt).resultList(maxWait)
val allPages = Math.ceil(count.toDouble / rows).toInt
val skip = (page - 1) * rows
// todo: add projection
val responseList = dao.find(filter, sort, projection, rows.toInt).resultList(maxWait)
// val responseList = dao.find(filter, sort, projection, rows.toInt).skip(skip.toInt).resultList(maxWait)
PaginationResult(responseList, PaginationInfo(count, rows, page, allPages))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ case class MongoSyncOperation(
idColumnName: String = DatabaseProvider.ObjectIdKey
) extends LazyLogging
with Filter {
val includes: Bson = include(idColumnName, MongoSyncOperation.SyncColumnLastSync, MongoSyncOperation.SyncColumnLastUpdate)

def excecute(source: DatabaseProvider, target: DatabaseProvider): List[MongoSyncResult] =
try {
val sourceInfos: Seq[Document] = source.dao(collectionName).find().projection(includes).results(MongoSyncOperation.MaxWait)
val targetInfos: Seq[Document] = target.dao(collectionName).find().projection(includes).results(MongoSyncOperation.MaxWait)
val sourceInfos: Seq[Document] = source.dao(collectionName).find().results(MongoSyncOperation.MaxWait)
val targetInfos: Seq[Document] = target.dao(collectionName).find().results(MongoSyncOperation.MaxWait)

if (SyncDirection.SourceToTarget == syncDirection) {
val diff = sourceInfos.diff(targetInfos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class DocumentHelperSpec extends Specification {

val document = DocumentHelper.documentFromJsonString(lines.head)

document must beSome()
document.isDefined must be equalTo(true)

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ object TestDatabase extends LazyLogging {

File(ImageDAOTargetPath).createIfNotExists()

private val registry = fromProviders(classOf[Person], classOf[Friend], classOf[CodecTest], classOf[Book], classOf[Grade], classOf[Score])

val provider: DatabaseProvider = DatabaseProvider.fromPath(configPath = "unit.test.mongo", registry = fromRegistries(registry))
val provider: DatabaseProvider = DatabaseProvider.fromPath(configPath = "unit.test.mongo")

def consumeDatabaseChanges(changeStreamDocument: ChangeStreamDocument[Document]): Unit = {
if (changeStreamDocument.getOperationType != OperationType.INSERT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,16 @@ import dev.mongocamp.driver.mongodb.database.DatabaseProvider
import dev.mongocamp.driver.mongodb.model._
import dev.mongocamp.driver.mongodb.server.LocalServer
import dev.mongocamp.driver.mongodb.{GridFSDAO, MongoDAO}
import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries}
import org.bson.codecs.configuration.CodecRegistry
import org.mongodb.scala.bson.codecs.Macros._


object UniversityDatabase {
// create local test server (mongodb-java-server)
var LocalTestServer: LocalServer = _

// create codecs for custom classes
private val universityRegistry: CodecRegistry = fromProviders(classOf[Student], classOf[Score], classOf[Grade])

private val registry: CodecRegistry = fromRegistries(universityRegistry)

// create provider
val provider: DatabaseProvider = DatabaseProvider.fromPath(configPath = "unit.test.mongo.local", registry = registry)
val provider: DatabaseProvider = DatabaseProvider.fromPath(configPath = "unit.test.mongo.local")

// setup DAO objects with mongodb collection names

Expand Down
Loading

0 comments on commit ec72741

Please sign in to comment.