Skip to content

Commit

Permalink
Reorganize project structure (#113)
Browse files Browse the repository at this point in the history
The `streaming` module was split into three modules:
- consumers
- graph_builders
- processors
  • Loading branch information
s-vitaliy authored Dec 6, 2024
1 parent 1e6f09e commit 70fa1ec
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.sneaksanddata.arcane.framework
package services.consumers

import services.streaming.BackfillDataGraphBuilder
import services.streaming.graph_builders.BackfillDataGraphBuilder

import org.slf4j.{Logger, LoggerFactory}
import zio.{ZIO, ZLayer}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.sneaksanddata.arcane.framework
package services.streaming
package services.streaming.consumers

import models.app.StreamContext
import models.settings.SinkSettings
import models.{ArcaneSchema, DataRow}
import services.base.SchemaProvider
import services.consumers.{BatchApplicationResult, SqlServerChangeTrackingMergeBatch, StagedVersionedBatch}
import services.lakehouse.{CatalogWriter, given_Conversion_ArcaneSchema_Schema}
import services.streaming.IcebergConsumer.{getTableName, toStagedBatch}
import IcebergConsumer.{getTableName, toStagedBatch}
import services.streaming.base.{BatchConsumer, BatchProcessor}

import org.apache.iceberg.rest.RESTCatalog
Expand All @@ -19,6 +19,11 @@ import zio.{Chunk, Task, ZIO, ZLayer}
import java.time.format.DateTimeFormatter
import java.time.{ZoneOffset, ZonedDateTime}

/**
* A trait that represents a streaming consumer.
*/
trait StreamingConsumer extends BatchConsumer[Chunk[DataRow]]

/**
* A consumer that writes the data to the staging table.
*
Expand All @@ -30,7 +35,7 @@ class IcebergConsumer(streamContext: StreamContext,
sinkSettings: SinkSettings,
catalogWriter: CatalogWriter[RESTCatalog, Table, Schema],
schemaProvider: SchemaProvider[ArcaneSchema],
mergeProcessor: BatchProcessor[StagedVersionedBatch, BatchApplicationResult]) extends BatchConsumer[Chunk[DataRow]]:
mergeProcessor: BatchProcessor[StagedVersionedBatch, BatchApplicationResult]) extends StreamingConsumer:

private val logger: Logger = LoggerFactory.getLogger(classOf[IcebergConsumer])

Expand Down Expand Up @@ -102,7 +107,7 @@ object IcebergConsumer:
/**
* The ZLayer that creates the IcebergConsumer.
*/
val layer: ZLayer[Environment, Nothing, IcebergConsumer] =
val layer: ZLayer[Environment, Nothing, StreamingConsumer] =
ZLayer {
for
streamContext <- ZIO.service[StreamContext]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.sneaksanddata.arcane.framework
package services.streaming
package services.streaming.graph_builders

import models.DataRow
import services.app.base.StreamLifetimeService
Expand Down Expand Up @@ -29,11 +29,11 @@ class BackfillDataGraphBuilder(backfillDataProvider: BackfillDataProvider,
.flatMap(batch => ZStream.fromIterable(batch.read))
.via(batchProcessor.process)

override def consume: ZSink[Any, Throwable, StreamElementType, Any, Unit] =
ZSink.foreach { e =>
logger.info(s"Received ${e.size} rows from the streaming source")
ZIO.unit
}
override def consume: ZSink[Any, Throwable, StreamElementType, Any, Unit] = batchConsumer.consume
// ZSink.foreach { e =>
// logger.info(s"Received ${e.size} rows from the streaming source")
// ZIO.unit
// }

/**
* The companion object for the VersionedDataGraphBuilder class.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.sneaksanddata.arcane.framework
package services.streaming
package services.streaming.graph_builders

import models.DataRow
import models.settings.VersionedDataGraphBuilderSettings
import services.app.base.StreamLifetimeService
import services.mssql.MsSqlConnection.{DataBatch, VersionedBatch}
import services.mssql.given_HasVersion_VersionedBatch
import services.streaming.base.{BatchConsumer, BatchProcessor, StreamGraphBuilder, VersionedDataProvider}
import services.streaming.base.{BatchProcessor, StreamGraphBuilder, VersionedDataProvider}
import services.streaming.consumers.StreamingConsumer

import org.slf4j.{Logger, LoggerFactory}
import zio.stream.{ZSink, ZStream}
Expand All @@ -23,7 +24,7 @@ class VersionedDataGraphBuilder(versionedDataGraphBuilderSettings: VersionedData
versionedDataProvider: VersionedDataProvider[Long, VersionedBatch],
streamLifetimeService: StreamLifetimeService,
batchProcessor: BatchProcessor[DataBatch, Chunk[DataRow]],
batchConsumer: BatchConsumer[Chunk[DataRow]])
batchConsumer: StreamingConsumer)
extends StreamGraphBuilder:

private val logger: Logger = LoggerFactory.getLogger(classOf[VersionedDataGraphBuilder])
Expand Down Expand Up @@ -68,7 +69,7 @@ object VersionedDataGraphBuilder:
type Environment = VersionedDataProvider[Long, VersionedBatch]
& StreamLifetimeService
& BatchProcessor[DataBatch, Chunk[DataRow]]
& BatchConsumer[Chunk[DataRow]]
& StreamingConsumer
& VersionedDataGraphBuilderSettings

/**
Expand All @@ -83,7 +84,7 @@ object VersionedDataGraphBuilder:
versionedDataProvider: VersionedDataProvider[Long, VersionedBatch],
streamLifetimeService: StreamLifetimeService,
batchProcessor: BatchProcessor[DataBatch, Chunk[DataRow]],
batchConsumer: BatchConsumer[Chunk[DataRow]]): VersionedDataGraphBuilder =
batchConsumer: StreamingConsumer): VersionedDataGraphBuilder =
new VersionedDataGraphBuilder(versionedDataGraphBuilderSettings,
versionedDataProvider,
streamLifetimeService,
Expand All @@ -102,7 +103,7 @@ object VersionedDataGraphBuilder:
dp <- ZIO.service[VersionedDataProvider[Long, VersionedBatch]]
ls <- ZIO.service[StreamLifetimeService]
bp <- ZIO.service[BatchProcessor[DataBatch, Chunk[DataRow]]]
bc <- ZIO.service[BatchConsumer[Chunk[DataRow]]]
bc <- ZIO.service[StreamingConsumer]
yield VersionedDataGraphBuilder(sss, dp, ls, bp, bc)


Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.sneaksanddata.arcane.framework
package services.streaming
package services.streaming.processors

import models.DataRow
import models.settings.GroupingSettings
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.sneaksanddata.arcane.framework
package services.streaming
package services.streaming.processors

import models.DataRow
import models.settings.GroupingSettings
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.sneaksanddata.arcane.framework
package services.streaming
package services.streaming.processors

import models.querygen.MergeQuery
import services.consumers.{BatchApplicationResult, JdbcConsumer, StagedBatch, StagedVersionedBatch}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import services.app.base.StreamLifetimeService
import services.mssql.MsSqlConnection.{DataBatch, VersionedBatch}
import services.mssql.query.{LazyQueryResult, QueryRunner, ScalarQueryResult}
import services.mssql.{ConnectionOptions, MsSqlConnection, MsSqlDataProvider}
import services.streaming.base.{BatchConsumer, BatchProcessor, VersionedDataProvider}
import services.streaming.base.{BatchProcessor, VersionedDataProvider}
import services.streaming.consumers.StreamingConsumer
import services.streaming.graph_builders.VersionedDataGraphBuilder
import services.streaming.processors.LazyListGroupingProcessor
import utils.{TestConnectionInfo, TestGroupingSettings, TestStreamLifetimeService}

import com.microsoft.sqlserver.jdbc.SQLServerDriver
Expand All @@ -18,7 +21,7 @@ import org.scalatest.matchers.should.Matchers.*
import org.scalatest.prop.TableDrivenPropertyChecks.forAll
import org.scalatest.prop.Tables.Table
import zio.stream.ZSink
import zio.{Chunk, FiberFailure, Runtime, Task, ULayer, Unsafe, ZIO, ZLayer}
import zio.{Chunk, FiberFailure, Runtime, ULayer, Unsafe, ZIO, ZLayer}

import java.sql.Connection
import java.time.Duration
Expand Down Expand Up @@ -185,7 +188,7 @@ class VersionedStreamGraphBuilderTests extends flatspec.AsyncFlatSpec with Match
test(conn)


class EmptyConsumer extends BatchConsumer[Chunk[DataRow]]:
class EmptyConsumer extends StreamingConsumer:
def consume: ZSink[Any, Throwable, Chunk[DataRow], Any, Unit] = ZSink.drain

class TestVersionedDataGraphBuilderSettings(override val lookBackInterval: Duration,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
STREAMCONTEXT__BACKFILL=false
STREAMCONTEXT__BACKFILL=true
STREAMCONTEXT__SPEC='{ "database": "IntegrationTests", "schema": "dbo", "table": "TestTable", "changeCaptureIntervalSeconds": 1, "commandTimeout": 3600, "groupingIntervalSeconds": 3, "groupsPerFile": 1, "lookBackInterval": 3600, "rowsPerGroup": 10000, "sinkLocation": "integration_tests_target_table", "catalogSettings": { "namespace": "test", "warehouse": "polaris", "catalogUri": "http://localhost:8181/api/catalog" } }'
STREAMCONTEXT__STREAM_ID=test
STREAMCONTEXT__STREAM_KIND=SqlServerChangeTracking
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import com.sneaksanddata.arcane.framework.services.consumers.JdbcConsumer
import com.sneaksanddata.arcane.framework.services.lakehouse.IcebergS3CatalogWriter
import com.sneaksanddata.arcane.framework.services.mssql.{ConnectionOptions, MsSqlConnection, MsSqlDataProvider}
import com.sneaksanddata.arcane.framework.services.streaming.base.{BatchProcessor, StreamGraphBuilder}
import com.sneaksanddata.arcane.framework.services.streaming.{BackfillGroupingProcessor, IcebergConsumer, LazyListGroupingProcessor, MergeProcessor}
import com.sneaksanddata.arcane.framework.services.streaming.consumers.IcebergConsumer
import com.sneaksanddata.arcane.framework.services.streaming.processors.{BackfillGroupingProcessor, LazyListGroupingProcessor, MergeProcessor}
import org.slf4j.MDC
import zio.logging.LogFormat
import zio.logging.backend.SLF4J
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package services

import com.sneaksanddata.arcane.framework.models.app.StreamContext
import com.sneaksanddata.arcane.framework.services.streaming.base.StreamGraphBuilder
import com.sneaksanddata.arcane.framework.services.streaming.{BackfillDataGraphBuilder, VersionedDataGraphBuilder}
import com.sneaksanddata.arcane.framework.services.streaming.graph_builders.{BackfillDataGraphBuilder, VersionedDataGraphBuilder}
import zio.{ZIO, ZLayer}

/**
Expand Down

0 comments on commit 70fa1ec

Please sign in to comment.