Documentation/stack (#76)
* Refactor of the code to follow the diamond architecture guidelines.

Creation of the API layer.
Simplification of logic.
Execution of the data generator and Flink in 05-main, and of Spark in 04-processor-spark, using sbt.
Update of aliases.sbt

* Cleaning of overloaded dependencies from the tree in build.sbt

* Changing module names in build.sbt to make the tree easier to read visually.

* WIP.

Updating documentation

* Change the -c- in data generator to -u-

* Delete runSpark.sh and finish the HowTo

* Try new anchor for going back to the index

* Update all the "go back to index" links so they work on GitHub Pages.
Remove the "go to index" under the titles:
- "Run with Docker"
- "Data Generator image"

* Remove all the "Go back to Index" links for levels h4 and h5

* Remove three files that are not needed anymore

* Fix style

* Remove unused plugins.sbt

* Update versions in plugins.sbt

* Update library version and rename a final case class in Vulcan (data-generator) to avoid an ambiguous reference between it and the anonymous class io.confluent.kafka.serializers.KafkaAvroSerializer {...}

* Replace the extension methods on the refined configuration types with Conversion types. This removes the need for an additional method call using dot notation on the values.
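A minimal sketch of the Conversion-based approach described above. The `NonEmptyString` refined type and the `connect` function are invented for illustration; the project's actual refined types and call sites differ.

```scala
import scala.language.implicitConversions

// Hypothetical refined type, standing in for the project's NonEmptyString.
opaque type NonEmptyString = String

object NonEmptyString:

  def apply(value: String): Either[String, NonEmptyString] =
    if value.nonEmpty then Right(value) else Left("empty string")

  // The given Conversion replaces an extension method: wherever a String
  // is expected, the refined value is adapted implicitly, with no extra
  // dot-notation call on the value.
  given Conversion[NonEmptyString, String] = n => n

// A consumer that expects a plain String.
def connect(bootstrapServers: String): String =
  s"connecting to $bootstrapServers"

// NonEmptyString("localhost:9092").map(s => connect(s)) compiles without
// any unwrapping method on `s`; the Conversion does the adaptation.
```

The design trade-off: a `given Conversion` makes call sites quieter than extension methods, at the cost of making the widening from refined type to base type implicit.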

* Expanded index and links to all the new levels.

* Add final in front of case classes and classes that should not be extended.

* Replace the safeApply method with a Try plus .toEither on the valueOf method that is internal to enums. Include tests and fix a bug in the apply for positive ints.
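A hedged sketch of that change, using a representative enum and a positive-int refinement. The member names, the `from` methods, and the error types are assumptions for illustration, not the project's actual definitions.

```scala
import scala.util.Try

// Scala 3 enums get a synthetic valueOf on the companion, which throws
// IllegalArgumentException for unknown names.
enum KafkaAutoOffsetReset:
  case earliest, latest, none

object KafkaAutoOffsetReset:

  // Try(...).toEither turns the throwing valueOf into a total function,
  // removing the need for a hand-written safeApply.
  def from(value: String): Either[Throwable, KafkaAutoOffsetReset] =
    Try(valueOf(value)).toEither

// The fixed positive-int constructor must reject zero as well as negatives.
opaque type PositiveInt = Int

object PositiveInt:

  def from(value: Int): Either[String, PositiveInt] =
    if value > 0 then Right(value) else Left(s"$value is not positive")
```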

* Fix Style and typo

* Abstract over the configuration.
- Move the definition of configuration to the api layer (for now kafka-consumer and flink)
- Change the API of kafka-consumer and flink to receive a configuration that inherits the api layer definition of the configuration
- Implement the configuration loader for both kafka-consumer and flink in the configuration modules ciris and pureconfig
- Update the ciris modules to match the application.conf files
- Update the module main to be able to run Flink loading ciris or pureconfig configurations
- Update the build.sbt to remove unwanted dependencies and add a couple of TODOs
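The bullets above can be sketched as follows: the api layer owns the configuration trait, each configuration module (ciris, pureconfig) supplies a concrete implementation, and the processor only depends on the trait. All names in this sketch are illustrative, not the project's actual ones.

```scala
// Api-layer definition of the configuration (simplified to one field;
// the real trait nests broker/consumer/producer configurations).
trait FlinkProcessorConfigurationI:
  val brokerAddress: String

// Hypothetical result of the pureconfig loader reading application.conf.
final case class PureconfigFlinkConfiguration(brokerAddress: String)
    extends FlinkProcessorConfigurationI

// Hypothetical result of the ciris loader reading the environment.
final case class CirisFlinkConfiguration(brokerAddress: String)
    extends FlinkProcessorConfigurationI

// The processor sees only the api-layer trait, so main can run it with
// the result of either loader.
def describe(config: FlinkProcessorConfigurationI): String =
  s"flink processor -> ${config.brokerAddress}"
```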

* Delete kafkaConsumerGivens.scala

* Flink:
- Rename DataProcessor to FlinkProcessor

* Spark:
- Remove the Spark conf from ciris since no effect system is needed for Spark
- Rename of some vals in the API configuration to match the application.conf
- Modification of SparkProcessorAPI removing the effect and moving the configuration type to the process method
- Creation of the implementation of the configuration loader in pureconfig
- Rename of DataProcessor to SparkProcessor
- Rename of SparkDataProcessor to SparkEngine
- Update of build.sbt and Main.s

* Spark:
- Deletion of the configuration that was present in the spark module

* Data Generator:
- build.sbt - cleaning up dependencies and unused libraries
- Delete configuration from module data-generator
- Update DataGenerator.scala to DataGeneratorAPI
- Update DataGeneratorAPI moving the type of the config to the method generate
- New main using Ciris in module main
- New main using Pureconfig in module main
- Creation of the trait DataGeneratorConfigurationI in the common api
- Implementation of the configuration in both ciris and pureconfig

* Fix a wrongly pointed import and remove unnecessary ScalaDoc

* ReadMe:
- Scala 3 section
- Frameworks section
- Libraries section
- Tooling section
dagmendez authored Sep 7, 2023
1 parent f9843e0 commit 0c05abb
Showing 62 changed files with 1,662 additions and 867 deletions.
@@ -34,7 +34,7 @@ object model:
* @param distance
* value of the aggregation, in meters.
*/
case class TotalDistanceByTrip(tripId: TripId, distance: Meters)
final case class TotalDistanceByTrip(tripId: TripId, distance: Meters)

/**
* Total distance cycled by a given user.
@@ -46,7 +46,7 @@ object model:
* @param distance
* Distance cycled in meters for the given user ID.
*/
case class TotalDistanceByUser(userId: UserId, distance: Meters)
final case class TotalDistanceByUser(userId: UserId, distance: Meters)

/**
* Current speed of the given trip ID.
@@ -60,7 +60,7 @@ object model:
* @todo
* maybe a new parameter with a timestamp should be added to capture the time of the speed in the bicycle.
*/
case class CurrentSpeed(tripId: TripId, speed: Speed)
final case class CurrentSpeed(tripId: TripId, speed: Speed)

/**
* Total range of the given bicycle for the given trip.
@@ -74,4 +74,4 @@ object model:
* @param remainingRange
* Remaining range in meters calculated from the remaining energy in the battery.
*/
case class TotalRange(tripId: TripId, bicycleId: BicycleId, remainingRange: Meters)
final case class TotalRange(tripId: TripId, bicycleId: BicycleId, remainingRange: Meters)
@@ -21,7 +21,7 @@ import scala.concurrent.duration.FiniteDuration
import com.fortyseven.domain.model.types.refinedTypes.*

/**
* Contains case classes that represent atomic information captured by sensors.
* Contains final case classes that represent atomic information captured by sensors.
*/
object model:

@@ -36,7 +36,7 @@ object model:
* @param longitude
* Expressed in a refined type that is a subset of Double (-180.0, 180.0).
*/
case class GPSPosition(latitude: Latitude, longitude: Longitude)
final case class GPSPosition(latitude: Latitude, longitude: Longitude)

/**
* WheelRotation
@@ -46,7 +46,7 @@ object model:
* @param frequency
* Expressed in a refined type, that is a subset of Double (>=0.0).
*/
case class WheelRotation(frequency: Hz)
final case class WheelRotation(frequency: Hz)

/**
* BatteryCharge.
@@ -57,7 +57,7 @@ object model:
* @param percentage
* Expressed in a refined type that is a subset of Double (0.00, 100.00).
*/
case class BatteryCharge(percentage: Percentage)
final case class BatteryCharge(percentage: Percentage)

/**
* BatteryHealth.
@@ -68,7 +68,7 @@ object model:
* @param remaining
* Expressed in a refined type that is a subset of Double (0.0, 100.0).
*/
case class BatteryHealth(remaining: Percentage)
final case class BatteryHealth(remaining: Percentage)

/**
* PneumaticPressure.
@@ -79,7 +79,7 @@ object model:
* @param pressure
* Expressed in a refined type that is a subset of Double (>=0.0).
*/
case class PneumaticPressure(pressure: Bar)
final case class PneumaticPressure(pressure: Bar)

/**
* BreaksUsage.
@@ -90,7 +90,7 @@ object model:
* @param finiteDuration
* Expressed in seconds [[scala.concurrent.duration.FiniteDuration]].
*/
case class BreaksUsage(finiteDuration: FiniteDuration)
final case class BreaksUsage(finiteDuration: FiniteDuration)

/**
* BreaksHealth.
@@ -101,4 +101,4 @@ object model:
* @param remaining
* Expressed in a refined type that is a subset of Double (0.0, 100.0).
*/
case class BreaksHealth(remaining: Percentage)
final case class BreaksHealth(remaining: Percentage)
@@ -16,6 +16,8 @@

package com.fortyseven.common.api

trait DataGeneratorAPI[F[_], Configuration]:

  def generate(configuration: ConfigurationAPI[F, Configuration]): F[Unit]
import com.fortyseven.common.configuration.DataGeneratorConfigurationI

trait DataGeneratorAPI[F[_]]:

  def generate[Configuration <: DataGeneratorConfigurationI](configuration: Configuration): F[Unit]
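A hedged sketch of how the reshaped API might be implemented and called: the Configuration type parameter moves from the trait to the generate method, bounded by the api-layer trait. `TryDataGenerator`, `DemoConfiguration`, and the simplified one-field configuration are invented for illustration.

```scala
import scala.util.Try

// Simplified stand-in for the api-layer configuration trait.
trait DataGeneratorConfigurationI:
  val topicName: String

// The new shape: the effect stays on the trait, the configuration type
// is a bounded parameter of generate.
trait DataGeneratorAPI[F[_]]:
  def generate[Configuration <: DataGeneratorConfigurationI](configuration: Configuration): F[Unit]

final case class DemoConfiguration(topicName: String) extends DataGeneratorConfigurationI

// A toy interpreter using Try as the effect type.
object TryDataGenerator extends DataGeneratorAPI[Try]:

  def generate[C <: DataGeneratorConfigurationI](configuration: C): Try[Unit] =
    Try(println(s"generating events for topic ${configuration.topicName}"))
```

Any loader's configuration can be passed as long as it extends the api-layer trait, e.g. `TryDataGenerator.generate(DemoConfiguration("sensor-topic"))`.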
@@ -0,0 +1,43 @@
/*
* Copyright 2023 Xebia Functional
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.fortyseven.common.configuration

import scala.concurrent.duration.FiniteDuration

import com.fortyseven.common.configuration.refinedTypes.{KafkaCompressionType, NonEmptyString, PositiveInt}

trait DataGeneratorConfigurationI:
  val kafka: DataGeneratorKafkaConfigurationI
  val schemaRegistry: DataGeneratorSchemaRegistryConfigurationI

trait DataGeneratorKafkaConfigurationI:
  val broker: DataGeneratorBrokerConfigurationI
  val producer: DataGeneratorProducerConfigurationI

trait DataGeneratorBrokerConfigurationI:
  val bootstrapServers: NonEmptyString

trait DataGeneratorProducerConfigurationI:
  val topicName: NonEmptyString
  val valueSerializerClass: NonEmptyString
  val maxConcurrent: PositiveInt
  val compressionType: KafkaCompressionType
  val commitBatchWithinSize: PositiveInt
  val commitBatchWithinTime: FiniteDuration

trait DataGeneratorSchemaRegistryConfigurationI:
  val schemaRegistryUrl: NonEmptyString
@@ -0,0 +1,50 @@
/*
* Copyright 2023 Xebia Functional
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.fortyseven.common.configuration

import scala.concurrent.duration.FiniteDuration

import com.fortyseven.common.configuration.refinedTypes.{KafkaAutoOffsetReset, KafkaCompressionType, NonEmptyString, PositiveInt}

trait FlinkProcessorConfigurationI:
  val kafka: FlinkProcessorKafkaConfigurationI
  val schemaRegistry: FlinkProcessorSchemaRegistryConfigurationI

trait FlinkProcessorKafkaConfigurationI:
  val broker: FlinkProcessorBrokerConfigurationI
  val consumer: Option[FlinkProcessorConsumerConfigurationI]
  val producer: Option[FlinkProcessorProducerConfigurationI]

trait FlinkProcessorBrokerConfigurationI:
  val brokerAddress: NonEmptyString

trait FlinkProcessorConsumerConfigurationI:
  val topicName: NonEmptyString
  val autoOffsetReset: KafkaAutoOffsetReset
  val groupId: NonEmptyString
  val maxConcurrent: PositiveInt

trait FlinkProcessorProducerConfigurationI:
  val topicName: NonEmptyString
  val valueSerializerClass: NonEmptyString
  val maxConcurrent: PositiveInt
  val compressionType: KafkaCompressionType
  val commitBatchWithinSize: PositiveInt
  val commitBatchWithinTime: FiniteDuration

trait FlinkProcessorSchemaRegistryConfigurationI:
  val schemaRegistryUrl: NonEmptyString
@@ -0,0 +1,43 @@
/*
* Copyright 2023 Xebia Functional
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.fortyseven.common.configuration

import scala.concurrent.duration.FiniteDuration

import com.fortyseven.common.configuration.refinedTypes.*

trait KafkaConsumerConfigurationI:
  val broker: KafkaConsumerBrokerConfigurationI
  val consumer: Option[KafkaConsumerConsumerConfigurationI]
  val producer: Option[KafkaConsumerProducerConfigurationI]

trait KafkaConsumerBrokerConfigurationI:
  val brokerAddress: NonEmptyString

trait KafkaConsumerConsumerConfigurationI:
  val topicName: NonEmptyString
  val autoOffsetReset: KafkaAutoOffsetReset
  val groupId: NonEmptyString
  val maxConcurrent: PositiveInt

trait KafkaConsumerProducerConfigurationI:
  val topicName: NonEmptyString
  val valueSerializerClass: NonEmptyString
  val maxConcurrent: PositiveInt
  val compressionType: KafkaCompressionType
  val commitBatchWithinSize: PositiveInt
  val commitBatchWithinTime: FiniteDuration
@@ -0,0 +1,48 @@
/*
* Copyright 2023 Xebia Functional
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.fortyseven.common.configuration

import scala.concurrent.duration.FiniteDuration

import com.fortyseven.common.configuration.refinedTypes.NonEmptyString

trait SparkProcessorConfigurationI:
  val app: SparkProcessorApplicationConfigurationI
  val streaming: SparkProcessorStreamingConfigurationI
  val reader: SparkProcessorReaderConfigurationI
  val writer: SparkProcessorWriterConfigurationI

trait SparkProcessorApplicationConfigurationI:
  val appName: NonEmptyString
  val masterUrl: NonEmptyString

trait SparkProcessorStreamingConfigurationI:
  val backpressureEnabled: Boolean
  val blockInterval: FiniteDuration
  val stopGracefullyOnShutdown: Boolean

trait SparkProcessorReaderConfigurationI:
  val kafka: SparkProcessorKafkaConfigurationI

trait SparkProcessorWriterConfigurationI:
  val format: NonEmptyString

trait SparkProcessorKafkaConfigurationI:
  val bootstrapServers: NonEmptyString
  val topic: NonEmptyString
  val startingOffsets: NonEmptyString
  val endingOffsets: NonEmptyString