package com.dragos.kafkacsvloader.integration

import com.dragos.kafkacsvloader.avro.AvroRecordMapper
import com.dragos.kafkacsvloader.avro.AvroSchemaLoader
import com.dragos.kafkacsvloader.avro.RowMappingResult
import com.dragos.kafkacsvloader.csv.CsvParser
import com.dragos.kafkacsvloader.kafka.KafkaProducerClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.kotest.matchers.types.shouldBeInstanceOf
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.Network
import org.testcontainers.utility.DockerImageName
import java.io.File
import java.time.Duration
import java.util.Properties

class KafkaIntegrationTest {

    companion object {
        private lateinit var network: Network
        private lateinit var kafka: KafkaContainer
        private lateinit var schemaRegistry: GenericContainer<*>

        private lateinit var bootstrapServers: String
        private lateinit var schemaRegistryUrl: String

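        // Containers are started once per test class via @BeforeAll on the
        // companion object; a shared Docker network lets the Schema Registry
        // reach the broker under its "kafka" network alias.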
        @JvmStatic
        @BeforeAll
        fun setup() {
            network = Network.newNetwork()

            // Start Kafka
            kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.3"))
                .withNetwork(network)
                .withNetworkAliases("kafka")
            kafka.start()

            // Start Schema Registry
            schemaRegistry = GenericContainer(DockerImageName.parse("confluentinc/cp-schema-registry:7.5.3"))
                .withNetwork(network)
                .withExposedPorts(8081)
                .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
                .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
                .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092")
            schemaRegistry.start()

            bootstrapServers = kafka.bootstrapServers
            schemaRegistryUrl = "http://${schemaRegistry.host}:${schemaRegistry.getMappedPort(8081)}"

            println("Kafka started at: $bootstrapServers")
            println("Schema Registry started at: $schemaRegistryUrl")
        }

        @JvmStatic
        @AfterAll
        fun teardown() {
            schemaRegistry.stop()
            kafka.stop()
            network.close()
        }
    }

    @TempDir
    lateinit var tempDir: File

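    // End-to-end happy path: parse a CSV file, map rows to Avro GenericRecords,
    // produce them to Kafka, then consume them back and verify every field.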
    @Test
    fun `should load CSV data into Kafka with Avro schema end-to-end`() {
        // Given: Create test schema
        val schemaContent = """
            {
              "type": "record",
              "name": "User",
              "namespace": "com.dragos.test",
              "fields": [
                {"name": "id", "type": "int"},
                {"name": "name", "type": "string"},
                {"name": "email", "type": "string"},
                {"name": "age", "type": "int"},
                {"name": "active", "type": "boolean"}
              ]
            }
        """.trimIndent()
        val schemaFile = File(tempDir, "user.avsc").apply {
            writeText(schemaContent)
        }
        val schema = AvroSchemaLoader.loadFromFile(schemaFile.absolutePath)

        // Given: Create test CSV
        val csvContent = """
            id,name,email,age,active
            1,Alice,alice@example.com,30,true
            2,Bob,bob@example.com,25,false
            3,Charlie,charlie@example.com,35,true
        """.trimIndent()
        val csvFile = File(tempDir, "users.csv").apply {
            writeText(csvContent)
        }
        val csvData = CsvParser.parse(csvFile.absolutePath)

        // Given: A unique Kafka topic per run, so reruns against a shared broker never read stale records
        val topic = "test-users-" + System.currentTimeMillis()

        // When: Send data to Kafka
        KafkaProducerClient(bootstrapServers, schemaRegistryUrl).use { producer ->
            csvData.rows.forEach { row ->
                val result = AvroRecordMapper.mapRow(schema, row)
                if (result is RowMappingResult.Success) {
                    val key = row["id"]
                    producer.sendSync(topic, key, result.record)
                }
            }
        }
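        // sendSync is expected to block until the broker acknowledges each record,
        // so every row should be in the topic before the consumer below starts polling.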

        // Then: Consume and verify
        val consumer = createConsumer()
        consumer.subscribe(listOf(topic))

        val records = mutableListOf<GenericRecord>()
        val startTime = System.currentTimeMillis()
        val timeout = 30_000L // 30 seconds

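        // A single poll may return fewer records than were produced, so keep
        // polling until all three arrive or the timeout elapses.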
        while (records.size < 3 && (System.currentTimeMillis() - startTime) < timeout) {
            val polled = consumer.poll(Duration.ofSeconds(2))
            polled.forEach { record ->
                records.add(record.value() as GenericRecord)
            }
        }

        consumer.close()

        // Verify we received all 3 records
        records.size shouldBe 3

        // Verify field values. Avro decodes string fields as org.apache.avro.util.Utf8,
        // so compare via toString() rather than casting to String.
        val alice = records.find { it.get("name").toString() == "Alice" }
        alice shouldNotBe null
        alice?.get("id") shouldBe 1
        alice?.get("email").toString() shouldBe "alice@example.com"
        alice?.get("age") shouldBe 30
        alice?.get("active") shouldBe true

        val bob = records.find { it.get("name").toString() == "Bob" }
        bob shouldNotBe null
        bob?.get("id") shouldBe 2
        bob?.get("email").toString() shouldBe "bob@example.com"
        bob?.get("age") shouldBe 25
        bob?.get("active") shouldBe false

        val charlie = records.find { it.get("name").toString() == "Charlie" }
        charlie shouldNotBe null
        charlie?.get("id") shouldBe 3
        charlie?.get("email").toString() shouldBe "charlie@example.com"
        charlie?.get("age") shouldBe 35
        charlie?.get("active") shouldBe true
    }

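    // Rows that fail schema validation should surface as RowMappingResult.Failure
    // from the mapper instead of throwing or producing a corrupt record.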
    @Test
    fun `should handle validation errors gracefully`() {
        // Given: Create test schema
        val schemaContent = """
            {
              "type": "record",
              "name": "User",
              "namespace": "com.dragos.test",
              "fields": [
                {"name": "id", "type": "int"},
                {"name": "name", "type": "string"}
              ]
            }
        """.trimIndent()
        val schemaFile = File(tempDir, "user.avsc").apply {
            writeText(schemaContent)
        }
        val schema = AvroSchemaLoader.loadFromFile(schemaFile.absolutePath)

        // Given: CSV with a non-numeric value in an int column
        val csvContent = """
            id,name
            not-a-number,Alice
        """.trimIndent()
        val csvFile = File(tempDir, "invalid.csv").apply {
            writeText(csvContent)
        }
        val csvData = CsvParser.parse(csvFile.absolutePath)

        // When: Try to map the invalid row
        val result = AvroRecordMapper.mapRow(schema, csvData.rows.first())

        // Then: Should fail with a validation error
        result.shouldBeInstanceOf<RowMappingResult.Failure>()
    }

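    // Fresh consumer with a unique group id so each test reads the topic from offset zero.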
    private fun createConsumer(): KafkaConsumer<String, GenericRecord> {
        val props = Properties().apply {
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-" + System.currentTimeMillis())
            put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer::class.java.name)
            put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
            // Deserialize to GenericRecord; no generated SpecificRecord classes exist for the test schema
            put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false)
        }
        return KafkaConsumer(props)
    }
}