From 4ae0ce69e3ecba13d2bf7e76617850d1f8484b56 Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Tue, 12 Nov 2024 14:09:30 -0800 Subject: [PATCH] bulk-cdk-core-extract: fix mistaken assumption about non-global streams (#48466) --- .../io/airbyte/cdk/read/StateManager.kt | 9 ++---- .../cdk/read/StateManagerGlobalStatesTest.kt | 29 ++++++++++--------- .../connectors/source-mysql/metadata.yaml | 2 +- .../source/mysql/MysqlJdbcEncryption.kt | 2 +- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt index 412eb54b5b72..32301851737d 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt @@ -37,18 +37,15 @@ class StateManager( .mapKeys { it.key.id } } else { val globalStreams: Map = - global.streams.associateWith { initialStreamStates[it] } + global.streams.associateWith { initialStreamStates[it] } + + initialStreamStates.filterKeys { global.streams.contains(it).not() } this.global = GlobalStateManager( global = global, initialGlobalState = initialGlobalState, initialStreamStates = globalStreams, ) - nonGlobal = - initialStreamStates - .filterKeys { !globalStreams.containsKey(it) } - .mapValues { NonGlobalStreamStateManager(it.key, it.value) } - .mapKeys { it.key.id } + nonGlobal = emptyMap() } } diff --git a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerGlobalStatesTest.kt b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerGlobalStatesTest.kt index 6f21a1d532c4..163834dd0653 100644 --- a/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerGlobalStatesTest.kt +++ b/airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/StateManagerGlobalStatesTest.kt @@ -78,16 +78,11 @@ class StateManagerGlobalStatesTest { |"global":{"shared_state":{"cdc":"starting"}, |"stream_states":[ |{"stream_descriptor":{"name":"KV","namespace":"PUBLIC"}, - |"stream_state":{"initial_sync":"ongoing"}} + |"stream_state":{"initial_sync":"ongoing"}}, + |{"stream_descriptor":{"name":"EVENTS","namespace":"PUBLIC"}, + |"stream_state":{"full_refresh":"ongoing"}} |]}, - |"sourceStats":{"recordCount":123.0} - |} - """.trimMargin(), - """{ - |"type":"STREAM", - |"stream":{"stream_descriptor":{"name":"EVENTS","namespace":"PUBLIC"}, - |"stream_state":{"full_refresh":"ongoing"}}, - |"sourceStats":{"recordCount":456.0} + |"sourceStats":{"recordCount":579.0} |} """.trimMargin(), ) @@ -124,7 +119,9 @@ class StateManagerGlobalStatesTest { |"global":{"shared_state":{"cdc":"starting"}, |"stream_states":[ |{"stream_descriptor":{"name":"KV","namespace":"PUBLIC"}, - |"stream_state":{"initial_sync":"ongoing"}} + |"stream_state":{"initial_sync":"ongoing"}}, + |{"stream_descriptor":{"name":"EVENTS","namespace":"PUBLIC"}, + |"stream_state":{}} |]},"sourceStats":{"recordCount":123.0} |} """.trimMargin(), @@ -147,7 +144,9 @@ class StateManagerGlobalStatesTest { |"global":{"shared_state":{"cdc":"starting"}, |"stream_states":[ |{"stream_descriptor":{"name":"KV","namespace":"PUBLIC"}, - |"stream_state":{"initial_sync":"completed"}} + |"stream_state":{"initial_sync":"completed"}}, + |{"stream_descriptor":{"name":"EVENTS","namespace":"PUBLIC"}, + |"stream_state":{}} |]},"sourceStats":{"recordCount":1245.0} |} """.trimMargin(), @@ -197,7 +196,9 @@ class StateManagerGlobalStatesTest { |"global":{"shared_state":{"cdc":"starting"}, |"stream_states":[ |{"stream_descriptor":{"name":"KV","namespace":"PUBLIC"}, - |"stream_state":{"initial_sync":"completed"}} + |"stream_state":{"initial_sync":"completed"}}, + |{"stream_descriptor":{"name":"EVENTS","namespace":"PUBLIC"}, + |"stream_state":{}} |]},"sourceStats":{"recordCount":789.0} |} """.trimMargin(), @@ -245,7 +246,9 @@ class StateManagerGlobalStatesTest { |"global":{"shared_state":{"cdc":"ongoing"}, |"stream_states":[ |{"stream_descriptor":{"name":"KV","namespace":"PUBLIC"}, - |"stream_state":{"initial_sync":"completed"}} + |"stream_state":{"initial_sync":"completed"}}, + |{"stream_descriptor":{"name":"EVENTS","namespace":"PUBLIC"}, + |"stream_state":{}} |]}, |"sourceStats":{"recordCount":741.0} |} diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 35cc478ed2bc..cd58a33782da 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.9.0-rc.5 + dockerImageTag: 3.9.0-rc.6 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcEncryption.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcEncryption.kt index ab21dbc88774..f7bf5a456144 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcEncryption.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcEncryption.kt @@ -11,7 +11,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging import java.net.MalformedURLException import java.net.URI import java.nio.file.FileSystems -import java.util.* +import java.util.UUID private val log = KotlinLogging.logger {}