From 82329daf90b7f087198ce8c5a7fe19c4437caed6 Mon Sep 17 00:00:00 2001
From: NehharShah
Date: Wed, 12 Nov 2025 16:29:11 -0500
Subject: [PATCH 1/3] Add support for TIME64 datatype

---
 flow/activities/flowable_core.go              |  2 +-
 flow/connectors/bigquery/bigquery.go          |  1 +
 flow/connectors/bigquery/qrep.go              |  2 +-
 flow/connectors/bigquery/qrep_avro_sync.go    |  2 +-
 flow/connectors/clickhouse/cdc.go             |  5 ++-
 flow/connectors/clickhouse/clickhouse.go      |  2 +
 flow/connectors/clickhouse/normalize.go       |  4 +-
 flow/connectors/clickhouse/normalize_query.go | 17 +++++++-
 flow/connectors/core.go                       |  2 +-
 .../connectors/elasticsearch/elasticsearch.go |  2 +-
 flow/connectors/eventhub/eventhub.go          |  2 +-
 flow/connectors/kafka/kafka.go                |  2 +-
 flow/connectors/mysql/cdc.go                  |  7 ++--
 flow/connectors/mysql/qrep.go                 |  2 +-
 flow/connectors/mysql/schema.go               |  2 +-
 flow/connectors/mysql/type_conversion.go      | 11 +++++-
 flow/connectors/postgres/postgres.go          |  3 +-
 .../postgres/postgres_schema_delta_test.go    |  8 ++--
 flow/connectors/pubsub/pubsub.go              |  2 +-
 flow/connectors/s3/s3.go                      |  2 +-
 .../snowflake/merge_stmt_generator.go         |  2 +-
 flow/connectors/snowflake/snowflake.go        |  7 ++--
 flow/e2e/clickhouse_mysql_test.go             | 39 +++++++++++++++++++
 flow/e2e/snowflake_schema_delta_test.go       |  9 +++--
 flow/model/qvalue/kind.go                     | 10 +++++
 flow/shared/constants.go                      |  3 ++
 flow/workflows/qrep_flow.go                   |  1 +
 flow/workflows/setup_flow.go                  |  1 +
 protos/flow.proto                             |  1 +
 29 files changed, 120 insertions(+), 33 deletions(-)

diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go
index 412f6effff..434d3d69d6 100644
--- a/flow/activities/flowable_core.go
+++ b/flow/activities/flowable_core.go
@@ -320,7 +320,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	defer dstClose(ctx)
 
 	syncState.Store(shared.Ptr("updating schema"))
-	if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas); err != nil {
+	if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas, config.Version); err != nil {
 		return nil, fmt.Errorf("failed to sync schema: %w", err)
 	}
 
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 83c602c8d2..e7fce002e7 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -239,6 +239,7 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(
 	flowJobName string,
 	_ []*protos.TableMapping,
 	schemaDeltas []*protos.TableSchemaDelta,
+	_ uint32,
 ) error {
 	for _, schemaDelta := range schemaDeltas {
 		if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go
index 71a360e1da..22412a88a0 100644
--- a/flow/connectors/bigquery/qrep.go
+++ b/flow/connectors/bigquery/qrep.go
@@ -87,7 +87,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
 	}
 
 	if err := c.ReplayTableSchemaDeltas(
-		ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta},
+		ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta}, config.Version,
	); err != nil {
 		return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
 	}
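[Aside, not part of the patch: the hunks above all thread a new trailing internal-version argument into ReplayTableSchemaDeltas. A minimal, self-contained sketch of the gating idiom this enables; the constant layout mirrors the iota block this patch adds to flow/shared/constants.go further down, but the ordinals here are illustrative only.]

```go
package main

import "fmt"

// Illustrative stand-ins for the iota-based constants in flow/shared/constants.go.
const (
	internalVersionFirst uint32 = iota
	internalVersionClickHouseTime64
	totalInternalVersions
	internalVersionLatest = totalInternalVersions - 1
)

// timeColumnType shows the shape of the gate: mirrors created at or after the
// ClickHouseTime64 internal version get the new TIME mapping, while existing
// mirrors keep the legacy DateTime64 mapping.
func timeColumnType(internalVersion uint32) string {
	if internalVersion >= internalVersionClickHouseTime64 {
		return "Time64(3)"
	}
	return "DateTime64(6)"
}

func main() {
	fmt.Println(timeColumnType(internalVersionFirst))  // DateTime64(6)
	fmt.Println(timeColumnType(internalVersionLatest)) // Time64(3)
}
```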
diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index 2b2faace75..cf234031aa 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -98,7 +98,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 		slog.String(string(shared.FlowNameKey), req.FlowJobName),
 		slog.String("dstTableName", rawTableName))
 
-	if err := s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil {
+	if err := s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version); err != nil {
 		return nil, fmt.Errorf("failed to sync schema changes: %w", err)
 	}
 
diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go
index 5ac23dd3b3..9076f89c98 100644
--- a/flow/connectors/clickhouse/cdc.go
+++ b/flow/connectors/clickhouse/cdc.go
@@ -131,7 +131,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
 	}
 	warnings := numericTruncator.Warnings()
 
-	if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil {
+	if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version); err != nil {
 		return nil, fmt.Errorf("failed to sync schema changes: %w", err)
 	}
 
@@ -165,6 +165,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
 	flowJobName string,
 	tableMappings []*protos.TableMapping,
 	schemaDeltas []*protos.TableSchemaDelta,
+	internalVersion uint32,
 ) error {
 	if len(schemaDeltas) == 0 {
 		return nil
 	}
@@ -188,7 +189,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
 		for _, addedColumn := range schemaDelta.AddedColumns {
 			qvKind := types.QValueKind(addedColumn.Type)
 			clickHouseColType, err := qvalue.ToDWHColumnType(
-				ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled,
+				ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled, internalVersion,
 			)
 			if err != nil {
 				return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err)
diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index 90c5baba24..e60d1e8208 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -443,6 +443,8 @@ func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType
 		qkind = types.QValueKindUUID
 	case "DateTime64(6)", "Nullable(DateTime64(6))", "DateTime64(9)", "Nullable(DateTime64(9))":
 		qkind = types.QValueKindTimestamp
+	case "Time64(3)", "Nullable(Time64(3))":
+		qkind = types.QValueKindTime
 	case "Date32", "Nullable(Date32)":
 		qkind = types.QValueKindDate
 	case "Float32", "Nullable(Float32)":
diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index ff6741d6a6..a7f4962ca9 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -66,6 +66,7 @@ func (c *ClickHouseConnector) SetupNormalizedTable(
 		destinationTableIdentifier,
 		sourceTableSchema,
 		c.chVersion,
+		config.Version,
 	)
 	if err != nil {
 		return false, fmt.Errorf("error while generating create table sql for destination ClickHouse table: %w", err)
@@ -85,6 +86,7 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
 	tableIdentifier string,
 	tableSchema *protos.TableSchema,
 	chVersion *chproto.Version,
+	internalVersion uint32,
 ) ([]string, error) {
 	var engine string
 	tmEngine := protos.TableEngine_CH_ENGINE_REPLACING_MERGE_TREE
@@ -203,7 +205,7 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
 		if clickHouseType == "" {
 			var err error
clickHouseType, err = qvalue.ToDWHColumnType( - ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column, tableSchema.NullableEnabled || columnNullableEnabled, + ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column, tableSchema.NullableEnabled || columnNullableEnabled, internalVersion, ) if err != nil { return nil, fmt.Errorf("error while converting column type to ClickHouse type: %w", err) diff --git a/flow/connectors/clickhouse/normalize_query.go b/flow/connectors/clickhouse/normalize_query.go index 44f95dafec..a0cdfec3d8 100644 --- a/flow/connectors/clickhouse/normalize_query.go +++ b/flow/connectors/clickhouse/normalize_query.go @@ -123,7 +123,7 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error if clickHouseType == "" { var err error clickHouseType, err = qvalue.ToDWHColumnType( - ctx, colType, t.env, protos.DBType_CLICKHOUSE, t.chVersion, column, schema.NullableEnabled || columnNullableEnabled, + ctx, colType, t.env, protos.DBType_CLICKHOUSE, t.chVersion, column, schema.NullableEnabled || columnNullableEnabled, t.version, ) if err != nil { return "", fmt.Errorf("error while converting column type to clickhouse type: %w", err) @@ -131,6 +131,21 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error } switch clickHouseType { + case "Time64(3)", "Nullable(Time64(3))": + // Time64 is a time-of-day type, parse from JSON string + // toTime64 converts string to Time64(3), returns NULL if string is NULL or invalid + fmt.Fprintf(&projection, + "toTime64(JSONExtractString(_peerdb_data, %s),3) AS %s,", + peerdb_clickhouse.QuoteLiteral(colName), + peerdb_clickhouse.QuoteIdentifier(dstColName), + ) + if t.enablePrimaryUpdate { + fmt.Fprintf(&projectionUpdate, + "toTime64(JSONExtractString(_peerdb_match_data, %s),3) AS %s,", + peerdb_clickhouse.QuoteLiteral(colName), + peerdb_clickhouse.QuoteIdentifier(dstColName), + ) + } case "Date32", "Nullable(Date32)": fmt.Fprintf(&projection, "toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, %s),6,'UTC')) AS %s,", diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 37d493a48c..edd140ae1f 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -185,7 +185,7 @@ type CDCSyncConnectorCore interface { // This could involve adding multiple columns. // Connectors which are non-normalizing should implement this as a nop. 
ReplayTableSchemaDeltas(ctx context.Context, env map[string]string, flowJobName string, - tableMappings []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta) error + tableMappings []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, internalVersion uint32) error } type CDCSyncConnector interface { diff --git a/flow/connectors/elasticsearch/elasticsearch.go b/flow/connectors/elasticsearch/elasticsearch.go index 4c08d91a08..89edbb6f36 100644 --- a/flow/connectors/elasticsearch/elasticsearch.go +++ b/flow/connectors/elasticsearch/elasticsearch.go @@ -94,7 +94,7 @@ func (esc *ElasticsearchConnector) CreateRawTable(ctx context.Context, // we handle schema changes by not handling them since no mapping is being enforced right now func (esc *ElasticsearchConnector) ReplayTableSchemaDeltas(ctx context.Context, env map[string]string, - flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32, ) error { return nil } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index eefbd22e23..d5d9908059 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -371,7 +371,7 @@ func (c *EventHubConnector) CreateRawTable(ctx context.Context, req *protos.Crea } func (c *EventHubConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string, - flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32, ) error { return nil } diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index a33de879a3..266b8e959e 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -162,7 +162,7 @@ func (c *KafkaConnector) CreateRawTable(ctx context.Context, req *protos.CreateR } func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string, - flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32, ) error { return nil } diff --git a/flow/connectors/mysql/cdc.go b/flow/connectors/mysql/cdc.go index 2256fa969e..bda24852e8 100644 --- a/flow/connectors/mysql/cdc.go +++ b/flow/connectors/mysql/cdc.go @@ -43,7 +43,7 @@ func (c *MySqlConnector) GetTableSchema( ) (map[string]*protos.TableSchema, error) { res := make(map[string]*protos.TableSchema, len(tableMappings)) for _, tm := range tableMappings { - tableSchema, err := c.getTableSchemaForTable(ctx, env, tm, system) + tableSchema, err := c.getTableSchemaForTable(ctx, env, tm, system, version) if err != nil { c.logger.Info("error fetching schema", slog.String("table", tm.SourceTableIdentifier), slog.Any("error", err)) return nil, err @@ -60,6 +60,7 @@ func (c *MySqlConnector) getTableSchemaForTable( env map[string]string, tm *protos.TableMapping, system protos.TypeSystem, + version uint32, ) (*protos.TableSchema, error) { qualifiedTable, err := common.ParseTableIdentifier(tm.SourceTableIdentifier) if err != nil { @@ -109,7 +110,7 @@ func (c *MySqlConnector) getTableSchemaForTable( if err != nil { return nil, err } - qkind, err := QkindFromMysqlColumnType(dataType) + qkind, err := QkindFromMysqlColumnType(dataType, version) if err != nil { return nil, err } @@ -723,7 +724,7 @@ func (c *MySqlConnector) processAlterTableQuery(ctx 
context.Context, catalogPool slog.String("tableName", sourceTableName)) continue } - qkind, err := QkindFromMysqlColumnType(col.Tp.InfoSchemaStr()) + qkind, err := QkindFromMysqlColumnType(col.Tp.InfoSchemaStr(), req.InternalVersion) if err != nil { return err } diff --git a/flow/connectors/mysql/qrep.go b/flow/connectors/mysql/qrep.go index 9605f5ca9d..379162ddbe 100644 --- a/flow/connectors/mysql/qrep.go +++ b/flow/connectors/mysql/qrep.go @@ -185,7 +185,7 @@ func (c *MySqlConnector) PullQRepRecords( stream *model.QRecordStream, ) (int64, int64, error) { tableSchema, err := c.getTableSchemaForTable(ctx, config.Env, - &protos.TableMapping{SourceTableIdentifier: config.WatermarkTable}, protos.TypeSystem_Q) + &protos.TableMapping{SourceTableIdentifier: config.WatermarkTable}, protos.TypeSystem_Q, config.Version) if err != nil { return 0, 0, fmt.Errorf("failed to get schema for watermark table %s: %w", config.WatermarkTable, err) } diff --git a/flow/connectors/mysql/schema.go b/flow/connectors/mysql/schema.go index c429b8db96..776131a118 100644 --- a/flow/connectors/mysql/schema.go +++ b/flow/connectors/mysql/schema.go @@ -109,7 +109,7 @@ func (c *MySqlConnector) GetColumns(ctx context.Context, version uint32, schema if err != nil { return nil, err } - qkind, err := QkindFromMysqlColumnType(columnType) + qkind, err := QkindFromMysqlColumnType(columnType, version) if err != nil { return nil, err } diff --git a/flow/connectors/mysql/type_conversion.go b/flow/connectors/mysql/type_conversion.go index b12038ac08..204d70aab9 100644 --- a/flow/connectors/mysql/type_conversion.go +++ b/flow/connectors/mysql/type_conversion.go @@ -4,10 +4,11 @@ import ( "fmt" "strings" + "github.com/PeerDB-io/peerdb/flow/shared" "github.com/PeerDB-io/peerdb/flow/shared/types" ) -func QkindFromMysqlColumnType(ct string) (types.QValueKind, error) { +func QkindFromMysqlColumnType(ct string, version uint32) (types.QValueKind, error) { // https://mariadb.com/docs/server/reference/data-types/date-and-time-data-types/timestamp#tab-current-1 ct, _ = strings.CutSuffix(ct, " /* mariadb-5.3 */") ct, _ = strings.CutSuffix(ct, " zerofill") @@ -24,7 +25,13 @@ func QkindFromMysqlColumnType(ct string) (types.QValueKind, error) { return types.QValueKindBytes, nil case "date": return types.QValueKindDate, nil - case "datetime", "timestamp", "time": + case "datetime", "timestamp": + return types.QValueKindTimestamp, nil + case "time": + // For new versions, map TIME to QValueKindTime instead of QValueKindTimestamp + if version >= shared.InternalVersion_ClickHouseTime64 { + return types.QValueKindTime, nil + } return types.QValueKindTimestamp, nil case "decimal", "numeric": return types.QValueKindNumeric, nil diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index c805a937c7..58bd7c9244 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -652,7 +652,7 @@ func syncRecordsCore[Items model.Items]( return nil, err } - if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil { + if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version); err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -1179,6 +1179,7 @@ func (c *PostgresConnector) ReplayTableSchemaDeltas( flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + _ uint32, ) error { if len(schemaDeltas) == 0 { 
return nil diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index fd27ac9400..8218be89cb 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -67,7 +67,7 @@ func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { Nullable: true, }, }, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, shared.InternalVersion_Latest, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) @@ -117,7 +117,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, shared.InternalVersion_Latest, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) @@ -150,7 +150,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, shared.InternalVersion_Latest, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) @@ -183,7 +183,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, shared.InternalVersion_Latest, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) diff --git a/flow/connectors/pubsub/pubsub.go b/flow/connectors/pubsub/pubsub.go index 8c69d6d7b8..0d61ebe49e 100644 --- a/flow/connectors/pubsub/pubsub.go +++ b/flow/connectors/pubsub/pubsub.go @@ -75,7 +75,7 @@ func (c *PubSubConnector) CreateRawTable(ctx context.Context, req *protos.Create } func (c *PubSubConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string, - flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32, ) error { return nil } diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index abc857cbcf..e507ab5d56 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -117,7 +117,7 @@ func (c *S3Connector) SyncRecords(ctx context.Context, req *model.SyncRecordsReq } func (c *S3Connector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string, - flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32, ) error { return nil } diff --git a/flow/connectors/snowflake/merge_stmt_generator.go b/flow/connectors/snowflake/merge_stmt_generator.go index fe9721be27..f343497d88 100644 --- a/flow/connectors/snowflake/merge_stmt_generator.go +++ b/flow/connectors/snowflake/merge_stmt_generator.go @@ -36,7 +36,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(ctx context.Context, env map[stri for _, column := range columns { genericColumnType := column.Type qvKind := types.QValueKind(genericColumnType) - sfType, err := qvalue.ToDWHColumnType(ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, column, normalizedTableSchema.NullableEnabled) + sfType, err 
:= qvalue.ToDWHColumnType(ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, column, normalizedTableSchema.NullableEnabled, 0) if err != nil { return "", fmt.Errorf("failed to convert column type %s to snowflake type: %w", genericColumnType, err) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 5fbd27b9b4..1c29c70d28 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -341,6 +341,7 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas( flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + _ uint32, ) error { if len(schemaDeltas) == 0 { return nil @@ -366,7 +367,7 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas( for _, addedColumn := range schemaDelta.AddedColumns { qvKind := types.QValueKind(addedColumn.Type) sfColtype, err := qvalue.ToDWHColumnType( - ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, addedColumn, schemaDelta.NullableEnabled, + ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, addedColumn, schemaDelta.NullableEnabled, 0, ) if err != nil { return fmt.Errorf("failed to convert column type %s to snowflake type: %w", @@ -451,7 +452,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( return nil, err } - if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil { + if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version); err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -664,7 +665,7 @@ func generateCreateTableSQLForNormalizedTable( normalizedColName := SnowflakeIdentifierNormalize(column.Name) qvKind := types.QValueKind(genericColumnType) sfColType, err := qvalue.ToDWHColumnType( - ctx, qvKind, config.Env, protos.DBType_SNOWFLAKE, nil, column, tableSchema.NullableEnabled, + ctx, qvKind, config.Env, protos.DBType_SNOWFLAKE, nil, column, tableSchema.NullableEnabled, 0, ) if err != nil { slog.WarnContext(ctx, fmt.Sprintf("failed to convert column type %s to snowflake type", genericColumnType), diff --git a/flow/e2e/clickhouse_mysql_test.go b/flow/e2e/clickhouse_mysql_test.go index 5e0c88a428..3ae1078eaf 100644 --- a/flow/e2e/clickhouse_mysql_test.go +++ b/flow/e2e/clickhouse_mysql_test.go @@ -3,6 +3,7 @@ package e2e import ( "fmt" "math" + "strconv" "strings" "github.com/stretchr/testify/require" @@ -115,6 +116,44 @@ func (s ClickHouseSuite) Test_MySQL_Time() { EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\",d,dt,tm,t") + // Verify that TIME column uses Time64(3) when ClickHouse version >= 25.6 and internal version >= InternalVersion_ClickHouseTime64 + ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig()) + require.NoError(s.t, err) + defer ch.Close() + + var columnType string + err = ch.QueryRow(s.t.Context(), fmt.Sprintf( + "SELECT type FROM system.columns WHERE database = currentDatabase() AND table = %s AND name = 't'", + clickhouse.QuoteLiteral(dstTableName), + )).Scan(&columnType) + require.NoError(s.t, err) + + chVersion, err := s.connector.GetVersion(s.t.Context()) + require.NoError(s.t, err) + + // Check if ClickHouse version >= 25.6 + versionParts := strings.Split(chVersion, ".") + if len(versionParts) >= 2 { + major, _ := strconv.Atoi(versionParts[0]) + minor, _ := strconv.Atoi(versionParts[1]) + + // If ClickHouse >= 25.6 and using latest internal version, should use Time64(3) + if major 
> 25 || (major == 25 && minor >= 6) { + if flowConnConfig.Version >= shared.InternalVersion_ClickHouseTime64 { + require.Contains(s.t, columnType, "Time64(3)", + "Expected Time64(3) for TIME column when ClickHouse >= 25.6 and internal version >= InternalVersion_ClickHouseTime64, got %s", columnType) + } else { + // Old version should use DateTime64(6) + require.Contains(s.t, columnType, "DateTime64(6)", + "Expected DateTime64(6) for TIME column with old internal version, got %s", columnType) + } + } else { + // Older ClickHouse versions should use DateTime64(6) + require.Contains(s.t, columnType, "DateTime64(6)", + "Expected DateTime64(6) for TIME column when ClickHouse < 25.6, got %s", columnType) + } + } + env.Cancel(s.t.Context()) RequireEnvCanceled(s.t, env) } diff --git a/flow/e2e/snowflake_schema_delta_test.go b/flow/e2e/snowflake_schema_delta_test.go index 333df110c6..ce92ff1e68 100644 --- a/flow/e2e/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake_schema_delta_test.go @@ -10,6 +10,7 @@ import ( connsnowflake "github.com/PeerDB-io/peerdb/flow/connectors/snowflake" "github.com/PeerDB-io/peerdb/flow/e2eshared" "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/shared" "github.com/PeerDB-io/peerdb/flow/shared/types" ) @@ -63,7 +64,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { TypeModifier: -1, }, }, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, 0, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) @@ -171,7 +172,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() { SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, 0, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) @@ -250,7 +251,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() { SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, 0, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) @@ -305,7 +306,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, 0, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 814694de7f..009c41d2db 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -8,6 +8,7 @@ import ( "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/internal" + "github.com/PeerDB-io/peerdb/flow/shared" "github.com/PeerDB-io/peerdb/flow/shared/datatypes" "github.com/PeerDB-io/peerdb/flow/shared/types" ) @@ -57,6 +58,7 @@ func ToDWHColumnType( dwhVersion *chproto.Version, column *protos.FieldDescription, nullableEnabled bool, + internalVersion uint32, ) (string, error) { var colType string switch dwhType { @@ -88,6 +90,14 @@ func ToDWHColumnType( colType = fmt.Sprintf("Array(%s)", colType) } else if (kind == types.QValueKindJSON || kind == types.QValueKindJSONB) && ShouldUseNativeJSONType(ctx, env, dwhVersion) { colType = "JSON" + } else if kind == 
types.QValueKindTime && internalVersion >= shared.InternalVersion_ClickHouseTime64 && dwhVersion != nil { + // Time64 was introduced in ClickHouse 25.6 + if chproto.CheckMinVersion(chproto.Version{Major: 25, Minor: 6, Patch: 0}, *dwhVersion) { + colType = "Time64(3)" + } else { + // Fall back to DateTime64(6) for older ClickHouse versions + colType = types.QValueKindToClickHouseTypeMap[kind] + } } else if val, ok := types.QValueKindToClickHouseTypeMap[kind]; ok { colType = val } else { diff --git a/flow/shared/constants.go b/flow/shared/constants.go index ec1bd3bb46..d926c24035 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -42,6 +42,9 @@ const ( InternalVersion_JsonEscapeDotsInKeys // MongoDB: `_id` column values stored as-is without redundant quotes InternalVersion_MongoDBIdWithoutRedundantQuotes + // ClickHouse: use Time64(3) data type for QValueKindTime when ClickHouse version >= 25.6 + // MySQL: map TIME type to QValueKindTime instead of QValueKindTimestamp + InternalVersion_ClickHouseTime64 TotalNumberOfInternalVersions InternalVersion_Latest = TotalNumberOfInternalVersions - 1 diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 38318c30b7..da86179438 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -161,6 +161,7 @@ func (q *QRepFlowExecution) setupWatermarkTableOnDestination(ctx workflow.Contex FlowName: q.config.FlowJobName, Env: q.config.Env, IsResync: q.config.DstTableFullResync, + Version: q.config.Version, } if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig).Get(ctx, nil); err != nil { diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index f4c79eb67c..cae1ffd1c0 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -213,6 +213,7 @@ func (s *SetupFlowExecution) setupNormalizedTables( FlowName: flowConnectionConfigs.FlowJobName, Env: flowConnectionConfigs.Env, IsResync: flowConnectionConfigs.Resync, + Version: flowConnectionConfigs.Version, } if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig).Get(ctx, nil); err != nil { diff --git a/protos/flow.proto b/protos/flow.proto index 87c278b0a8..5c0936d09c 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -263,6 +263,7 @@ message SetupNormalizedTableBatchInput { string flow_name = 6; string peer_name = 7; bool is_resync = 8; + uint32 version = 9; } message SetupNormalizedTableOutput { From 8f54f378ff6b3383924bfd3db21bdcda70a1f6a5 Mon Sep 17 00:00:00 2001 From: NehharShah Date: Thu, 13 Nov 2025 07:24:50 -0500 Subject: [PATCH 2/3] added backward compatibility test --- flow/e2e/clickhouse_mysql_test.go | 71 +++++++++++++++++++++++++++---- 1 file changed, 62 insertions(+), 9 deletions(-) diff --git a/flow/e2e/clickhouse_mysql_test.go b/flow/e2e/clickhouse_mysql_test.go index 3ae1078eaf..0f0b77945f 100644 --- a/flow/e2e/clickhouse_mysql_test.go +++ b/flow/e2e/clickhouse_mysql_test.go @@ -116,7 +116,8 @@ func (s ClickHouseSuite) Test_MySQL_Time() { EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\",d,dt,tm,t") - // Verify that TIME column uses Time64(3) when ClickHouse version >= 25.6 and internal version >= InternalVersion_ClickHouseTime64 + // Verify that TIME column uses Time64(3) when ClickHouse version >= 25.6 (assumes latest internal version) + // Backward compatibility with older internal versions is tested in Test_MySQL_Time_BackwardCompatibility ch, err := 
connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig()) require.NoError(s.t, err) defer ch.Close() @@ -139,14 +140,8 @@ func (s ClickHouseSuite) Test_MySQL_Time() { // If ClickHouse >= 25.6 and using latest internal version, should use Time64(3) if major > 25 || (major == 25 && minor >= 6) { - if flowConnConfig.Version >= shared.InternalVersion_ClickHouseTime64 { - require.Contains(s.t, columnType, "Time64(3)", - "Expected Time64(3) for TIME column when ClickHouse >= 25.6 and internal version >= InternalVersion_ClickHouseTime64, got %s", columnType) - } else { - // Old version should use DateTime64(6) - require.Contains(s.t, columnType, "DateTime64(6)", - "Expected DateTime64(6) for TIME column with old internal version, got %s", columnType) - } + require.Contains(s.t, columnType, "Time64(3)", + "Expected Time64(3) for TIME column when ClickHouse >= 25.6 and using latest internal version, got %s", columnType) } else { // Older ClickHouse versions should use DateTime64(6) require.Contains(s.t, columnType, "DateTime64(6)", @@ -158,6 +153,64 @@ func (s ClickHouseSuite) Test_MySQL_Time() { RequireEnvCanceled(s.t, env) } +func (s ClickHouseSuite) Test_MySQL_Time_BackwardCompatibility() { + if _, ok := s.source.(*MySqlSource); !ok { + s.t.Skip("only applies to mysql") + } + + srcTableName := "test_datetime_backward_compat" + srcFullName := s.attachSchemaSuffix(srcTableName) + quotedSrcFullName := "\"" + strings.ReplaceAll(srcFullName, ".", "\".\"") + "\"" + dstTableName := "test_datetime_backward_compat_dst" + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + "key" TEXT NOT NULL, + t TIME NOT NULL + ) + `, quotedSrcFullName))) + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %s ("key",t) VALUES + ('init','14:21.654321')`, + quotedSrcFullName))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix(srcTableName), + TableNameMapping: map[string]string{srcFullName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + // Explicitly set to old internal version to test backward compatibility + flowConnConfig.Version = shared.InternalVersion_First + + tc := NewTemporalClient(s.t) + env := ExecutePeerflow(s.t, tc, flowConnConfig) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,\"key\",t") + + // Verify that TIME column uses DateTime64(6) even with ClickHouse >= 25.6 when using old internal version + ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig()) + require.NoError(s.t, err) + defer ch.Close() + + var columnType string + err = ch.QueryRow(s.t.Context(), fmt.Sprintf( + "SELECT type FROM system.columns WHERE database = currentDatabase() AND table = %s AND name = 't'", + clickhouse.QuoteLiteral(dstTableName), + )).Scan(&columnType) + require.NoError(s.t, err) + + // With old internal version, should always use DateTime64(6) regardless of ClickHouse version + require.Contains(s.t, columnType, "DateTime64(6)", + "Expected DateTime64(6) for TIME column with old internal version (InternalVersion_First), got %s", columnType) + + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + func (s ClickHouseSuite) Test_MySQL_Bit() { if _, ok := s.source.(*MySqlSource); !ok { s.t.Skip("only applies to mysql") From 
8bd22b88c1d2b04c01171710c2cadfe598c2f6c0 Mon Sep 17 00:00:00 2001 From: Joy Gao <17896160+jgao54@users.noreply.github.com> Date: Wed, 21 Jan 2026 06:14:59 -1000 Subject: [PATCH 3/3] fix time64 support --- flow/activities/flowable_core.go | 4 +- flow/connectors/bigquery/qrep_avro_sync.go | 4 +- flow/connectors/clickhouse/avro_sync.go | 11 ++- flow/connectors/clickhouse/cdc.go | 6 +- flow/connectors/clickhouse/clickhouse.go | 2 +- flow/connectors/clickhouse/normalize.go | 11 ++- flow/connectors/clickhouse/normalize_query.go | 30 +++--- flow/connectors/clickhouse/table_function.go | 32 ++++++- .../clickhouse/table_function_test.go | 14 ++- flow/connectors/mysql/cdc.go | 7 +- flow/connectors/mysql/qrep.go | 2 +- flow/connectors/mysql/qvalue_convert.go | 13 ++- flow/connectors/mysql/schema.go | 2 +- flow/connectors/mysql/type_conversion.go | 9 +- flow/e2e/clickhouse.go | 21 ++++- flow/e2e/clickhouse_mysql_test.go | 92 ------------------- flow/e2e/clickhouse_test.go | 71 ++++++++++++++ flow/model/qvalue/avro_converter.go | 15 ++- flow/model/qvalue/equals.go | 6 +- flow/model/qvalue/kind.go | 20 ++-- flow/model/record_items.go | 5 +- .../clickhouse/settings.go} | 4 +- .../clickhouse/settings_test.go} | 2 +- flow/shared/constants.go | 2 +- flow/shared/types/time_format.go | 32 +++++++ flow/shared/types/time_format_test.go | 29 ++++++ 26 files changed, 279 insertions(+), 167 deletions(-) rename flow/{connectors/clickhouse/setting_util.go => pkg/clickhouse/settings.go} (95%) rename flow/{connectors/clickhouse/setting_util_test.go => pkg/clickhouse/settings_test.go} (99%) create mode 100644 flow/shared/types/time_format.go create mode 100644 flow/shared/types/time_format_test.go diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 434d3d69d6..1a7ad54b91 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -320,7 +320,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon defer dstClose(ctx) syncState.Store(shared.Ptr("updating schema")) - if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas, config.Version); err != nil { + if err := dstConn.ReplayTableSchemaDeltas( + ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas, config.Version, + ); err != nil { return nil, fmt.Errorf("failed to sync schema: %w", err) } diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index cf234031aa..f2d3b4d263 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -98,7 +98,9 @@ func (s *QRepAvroSyncMethod) SyncRecords( slog.String(string(shared.FlowNameKey), req.FlowJobName), slog.String("dstTableName", rawTableName)) - if err := s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version); err != nil { + if err := s.connector.ReplayTableSchemaDeltas( + ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version, + ); err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } diff --git a/flow/connectors/clickhouse/avro_sync.go b/flow/connectors/clickhouse/avro_sync.go index e1e0604b9c..0cb604ff90 100644 --- a/flow/connectors/clickhouse/avro_sync.go +++ b/flow/connectors/clickhouse/avro_sync.go @@ -293,11 +293,14 @@ func (s *ClickHouseAvroSyncMethod) pushS3DataToClickHouseForSnapshot( } numParts = max(numParts, 1) - 
chSettings := NewCHSettings(s.chVersion) - chSettings.Add(SettingThrowOnMaxPartitionsPerInsertBlock, "0") - chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1") + chSettings := peerdb_clickhouse.NewCHSettings(s.chVersion) + chSettings.Add(peerdb_clickhouse.SettingThrowOnMaxPartitionsPerInsertBlock, "0") + chSettings.Add(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths, "1") if config.Version >= shared.InternalVersion_JsonEscapeDotsInKeys { - chSettings.Add(SettingJsonTypeEscapeDotsInKeys, "1") + chSettings.Add(peerdb_clickhouse.SettingJsonTypeEscapeDotsInKeys, "1") + } + if config.Version >= shared.InternalVersion_ClickHouseTime64 { + chSettings.Add(peerdb_clickhouse.SettingEnableTimeTime64Type, "1") } // Process each chunk file individually diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 9076f89c98..353a95c8a3 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -229,7 +229,8 @@ func (c *ClickHouseConnector) RenameTables( req *protos.RenameTablesInput, ) (*protos.RenameTablesOutput, error) { onCluster := c.onCluster() - dropTableSQLWithCHSetting := dropTableIfExistsSQL + NewCHSettingsString(c.chVersion, SettingMaxTableSizeToDrop, "0") + dropTableSQLWithCHSetting := dropTableIfExistsSQL + + peerdb_clickhouse.NewCHSettingsString(c.chVersion, peerdb_clickhouse.SettingMaxTableSizeToDrop, "0") for _, renameRequest := range req.RenameTableOptions { if renameRequest.CurrentName == renameRequest.NewName { c.logger.Info("table rename is nop, probably Null table engine, skipping rename for it", @@ -303,7 +304,8 @@ func (c *ClickHouseConnector) SyncFlowCleanup(ctx context.Context, jobName strin // delete raw table if exists rawTableIdentifier := c.GetRawTableName(jobName) onCluster := c.onCluster() - dropTableSQLWithCHSetting := dropTableIfExistsSQL + NewCHSettingsString(c.chVersion, SettingMaxTableSizeToDrop, "0") + dropTableSQLWithCHSetting := dropTableIfExistsSQL + + peerdb_clickhouse.NewCHSettingsString(c.chVersion, peerdb_clickhouse.SettingMaxTableSizeToDrop, "0") if err := c.execWithLogging(ctx, fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(rawTableIdentifier), onCluster), ); err != nil { diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index e60d1e8208..5c3bc7958a 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -443,7 +443,7 @@ func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType qkind = types.QValueKindUUID case "DateTime64(6)", "Nullable(DateTime64(6))", "DateTime64(9)", "Nullable(DateTime64(9))": qkind = types.QValueKindTimestamp - case "Time64(3)", "Nullable(Time64(3))": + case "Time64(6)", "Nullable(Time64(6))": qkind = types.QValueKindTime case "Date32", "Nullable(Date32)": qkind = types.QValueKindDate diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index a7f4962ca9..7badd15901 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -21,6 +21,7 @@ import ( "github.com/PeerDB-io/peerdb/flow/model/qvalue" peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse" "github.com/PeerDB-io/peerdb/flow/pkg/common" + "github.com/PeerDB-io/peerdb/flow/shared" "github.com/PeerDB-io/peerdb/flow/shared/types" ) @@ -205,7 +206,8 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable( if clickHouseType == "" { var err error clickHouseType, err 
= qvalue.ToDWHColumnType( - ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column, tableSchema.NullableEnabled || columnNullableEnabled, internalVersion, + ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column, + tableSchema.NullableEnabled || columnNullableEnabled, internalVersion, ) if err != nil { return nil, fmt.Errorf("error while converting column type to ClickHouse type: %w", err) @@ -265,9 +267,14 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable( } } + settings := peerdb_clickhouse.NewCHSettings(chVersion) if allowNullableKey { - stmtBuilder.WriteString(NewCHSettingsString(chVersion, SettingAllowNullableKey, "1")) + settings.Add(peerdb_clickhouse.SettingAllowNullableKey, "1") } + if internalVersion >= shared.InternalVersion_ClickHouseTime64 { + settings.Add(peerdb_clickhouse.SettingEnableTimeTime64Type, "1") + } + stmtBuilder.WriteString(settings.String()) if c.Config.Cluster != "" { fmt.Fprintf(&stmtBuilderDistributed, " ENGINE = Distributed(%s,%s,%s", diff --git a/flow/connectors/clickhouse/normalize_query.go b/flow/connectors/clickhouse/normalize_query.go index a0cdfec3d8..8928263fcc 100644 --- a/flow/connectors/clickhouse/normalize_query.go +++ b/flow/connectors/clickhouse/normalize_query.go @@ -131,17 +131,15 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error } switch clickHouseType { - case "Time64(3)", "Nullable(Time64(3))": - // Time64 is a time-of-day type, parse from JSON string - // toTime64 converts string to Time64(3), returns NULL if string is NULL or invalid + case "Time64(6)", "Nullable(Time64(6))": fmt.Fprintf(&projection, - "toTime64(JSONExtractString(_peerdb_data, %s),3) AS %s,", + "toTime64OrNull(JSONExtractString(_peerdb_data, %s)) AS %s,", peerdb_clickhouse.QuoteLiteral(colName), peerdb_clickhouse.QuoteIdentifier(dstColName), ) if t.enablePrimaryUpdate { fmt.Fprintf(&projectionUpdate, - "toTime64(JSONExtractString(_peerdb_match_data, %s),3) AS %s,", + "toTime64OrNull(JSONExtractString(_peerdb_match_data, %s)) AS %s,", peerdb_clickhouse.QuoteLiteral(colName), peerdb_clickhouse.QuoteIdentifier(dstColName), ) @@ -160,18 +158,19 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error ) } case "DateTime64(6)", "Nullable(DateTime64(6))": + // Handle legacy path where TIME is stored as DateTime64 (before Time64 support) if colType == types.QValueKindTime || colType == types.QValueKindTimeTZ { - // parseDateTime64BestEffortOrNull for hh:mm:ss puts the year as current year - // (or previous year if result would be in future) so explicitly anchor to unix epoch fmt.Fprintf(&projection, - "parseDateTime64BestEffortOrNull('1970-01-01 ' || JSONExtractString(_peerdb_data, %s),6,'UTC') AS %s,", + "CAST(toTime64OrNull(JSONExtractString(_peerdb_data, %s)), '%s') AS %s,", peerdb_clickhouse.QuoteLiteral(colName), + clickHouseType, peerdb_clickhouse.QuoteIdentifier(dstColName), ) if t.enablePrimaryUpdate { fmt.Fprintf(&projectionUpdate, - "parseDateTime64BestEffortOrNull('1970-01-01 ' || JSONExtractString(_peerdb_match_data, %s),6,'UTC') AS %s,", + "CAST(toTime64OrNull(JSONExtractString(_peerdb_match_data, %s)), '%s') AS %s,", peerdb_clickhouse.QuoteLiteral(colName), + clickHouseType, peerdb_clickhouse.QuoteIdentifier(dstColName), ) } @@ -312,14 +311,17 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error t.lastNormBatchID, t.endBatchID, peerdb_clickhouse.QuoteLiteral(t.TableName)) } - chSettings := NewCHSettings(t.chVersion) - 
chSettings.Add(SettingThrowOnMaxPartitionsPerInsertBlock, "0") - chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1") + chSettings := peerdb_clickhouse.NewCHSettings(t.chVersion) + chSettings.Add(peerdb_clickhouse.SettingThrowOnMaxPartitionsPerInsertBlock, "0") + chSettings.Add(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths, "1") if t.cluster { - chSettings.Add(SettingParallelDistributedInsertSelect, "0") + chSettings.Add(peerdb_clickhouse.SettingParallelDistributedInsertSelect, "0") } if t.version >= shared.InternalVersion_JsonEscapeDotsInKeys { - chSettings.Add(SettingJsonTypeEscapeDotsInKeys, "1") + chSettings.Add(peerdb_clickhouse.SettingJsonTypeEscapeDotsInKeys, "1") + } + if t.version >= shared.InternalVersion_ClickHouseTime64 { + chSettings.Add(peerdb_clickhouse.SettingEnableTimeTime64Type, "1") } insertIntoSelectQuery := fmt.Sprintf("INSERT INTO %s %s %s%s", diff --git a/flow/connectors/clickhouse/table_function.go b/flow/connectors/clickhouse/table_function.go index 9bcd762bb7..eb267d5f0c 100644 --- a/flow/connectors/clickhouse/table_function.go +++ b/flow/connectors/clickhouse/table_function.go @@ -52,8 +52,36 @@ func jsonFieldExpressionConverter( return fmt.Sprintf("CAST(%s, 'JSON')", sourceFieldIdentifier), nil } +func timeFieldExpressionConverter( + _ context.Context, + config *insertFromTableFunctionConfig, + sourceFieldIdentifier string, + field types.QField, +) (string, error) { + if field.Type != types.QValueKindTime { + return sourceFieldIdentifier, nil + } + + // Skip for BigQuery source - TIME comes as Int64 from Parquet, not String + if config.config.SourceType == protos.DBType_BIGQUERY { + return sourceFieldIdentifier, nil + } + + // Handle legacy path where TIME is stored as DateTime64, before ClickHouse supported Time64 type + if !qvalue.ShouldUseTime64Type(config.connector.chVersion, config.config.Version) { + destType := "DateTime64(6)" + if field.Nullable { + destType = "Nullable(DateTime64(6))" + } + return fmt.Sprintf("CAST(toTime64OrNull(%s), '%s')", sourceFieldIdentifier, destType), nil + } + + return fmt.Sprintf("toTime64OrNull(%s)", sourceFieldIdentifier), nil +} + var defaultFieldExpressionConverters = []fieldExpressionConverter{ jsonFieldExpressionConverter, + timeFieldExpressionConverter, } // buildInsertFromTableFunctionQuery builds a complete INSERT query from a table function expression @@ -62,7 +90,7 @@ func buildInsertFromTableFunctionQuery( ctx context.Context, config *insertFromTableFunctionConfig, tableFunctionExpr string, - chSettings *CHSettings, + chSettings *peerdb_clickhouse.CHSettings, ) (string, error) { fieldExpressionConverters := defaultFieldExpressionConverters fieldExpressionConverters = append(fieldExpressionConverters, config.fieldExpressionConverters...) 
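[Aside, not part of the patch: the hunk above registers timeFieldExpressionConverter in the converter list. A self-contained sketch of the two expression shapes it emits; the useTime64 flag here stands in for qvalue.ShouldUseTime64Type, which consults both the ClickHouse server version and the mirror's internal version.]

```go
package main

import "fmt"

// timeExpr mirrors, in simplified form, what the converter wraps around a
// source field expression for a TIME column.
func timeExpr(sourceField string, useTime64, nullable bool) string {
	if !useTime64 {
		// Legacy path: mirrors predating Time64 keep DateTime64(6) columns.
		destType := "DateTime64(6)"
		if nullable {
			destType = "Nullable(DateTime64(6))"
		}
		return fmt.Sprintf("CAST(toTime64OrNull(%s), '%s')", sourceField, destType)
	}
	return fmt.Sprintf("toTime64OrNull(%s)", sourceField)
}

func main() {
	fmt.Println(timeExpr("`t`", true, false))
	// toTime64OrNull(`t`)
	fmt.Println(timeExpr("`t`", false, true))
	// CAST(toTime64OrNull(`t`), 'Nullable(DateTime64(6))')
}
```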
@@ -142,7 +170,7 @@ func buildInsertFromTableFunctionQueryWithPartitioning( tableFunctionExpr string, partitionIndex uint64, totalPartitions uint64, - chSettings *CHSettings, + chSettings *peerdb_clickhouse.CHSettings, ) (string, error) { var query strings.Builder diff --git a/flow/connectors/clickhouse/table_function_test.go b/flow/connectors/clickhouse/table_function_test.go index eb209c1e33..41eb062b23 100644 --- a/flow/connectors/clickhouse/table_function_test.go +++ b/flow/connectors/clickhouse/table_function_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/PeerDB-io/peerdb/flow/generated/protos" + peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse" "github.com/PeerDB-io/peerdb/flow/shared/types" ) @@ -33,14 +34,16 @@ func TestBuildInsertFromTableFunctionQuery(t *testing.T) { } tableFunctionExpr := "s3('s3://bucket/key', 'format')" - chSettings := NewCHSettings(&chproto.Version{Major: 25, Minor: 8}) - chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1") + chSettings := peerdb_clickhouse.NewCHSettings(&chproto.Version{Major: 25, Minor: 8}) + chSettings.Add(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths, "1") // without partitioning query, err := buildInsertFromTableFunctionQuery(ctx, config, tableFunctionExpr, chSettings) require.NoError(t, err) - require.Equal(t, fmt.Sprintf("INSERT INTO `t1`(`id`,`name`) SELECT `id`,`name` FROM s3('s3://bucket/key', 'format') SETTINGS %s=%s", - string(SettingTypeJsonSkipDuplicatedPaths), "1"), query) + require.Equal(t, + fmt.Sprintf("INSERT INTO `t1`(`id`,`name`) SELECT `id`,`name` FROM s3('s3://bucket/key', 'format') SETTINGS %s=%s", + string(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths), "1"), + query) // with partitioning totalPartitions := uint64(8) @@ -49,6 +52,7 @@ func TestBuildInsertFromTableFunctionQuery(t *testing.T) { require.NoError(t, err) require.Equal(t, query, "INSERT INTO `t1`(`id`,`name`) SELECT `id`,`name` FROM s3('s3://bucket/key', 'format')"+ - fmt.Sprintf(" WHERE cityHash64(`id`) %% 8 = %d SETTINGS %s=%s", idx, string(SettingTypeJsonSkipDuplicatedPaths), "1")) + fmt.Sprintf(" WHERE cityHash64(`id`) %% 8 = %d SETTINGS %s=%s", + idx, string(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths), "1")) } } diff --git a/flow/connectors/mysql/cdc.go b/flow/connectors/mysql/cdc.go index bda24852e8..2256fa969e 100644 --- a/flow/connectors/mysql/cdc.go +++ b/flow/connectors/mysql/cdc.go @@ -43,7 +43,7 @@ func (c *MySqlConnector) GetTableSchema( ) (map[string]*protos.TableSchema, error) { res := make(map[string]*protos.TableSchema, len(tableMappings)) for _, tm := range tableMappings { - tableSchema, err := c.getTableSchemaForTable(ctx, env, tm, system, version) + tableSchema, err := c.getTableSchemaForTable(ctx, env, tm, system) if err != nil { c.logger.Info("error fetching schema", slog.String("table", tm.SourceTableIdentifier), slog.Any("error", err)) return nil, err @@ -60,7 +60,6 @@ func (c *MySqlConnector) getTableSchemaForTable( env map[string]string, tm *protos.TableMapping, system protos.TypeSystem, - version uint32, ) (*protos.TableSchema, error) { qualifiedTable, err := common.ParseTableIdentifier(tm.SourceTableIdentifier) if err != nil { @@ -110,7 +109,7 @@ func (c *MySqlConnector) getTableSchemaForTable( if err != nil { return nil, err } - qkind, err := QkindFromMysqlColumnType(dataType, version) + qkind, err := QkindFromMysqlColumnType(dataType) if err != nil { return nil, err } @@ -724,7 +723,7 @@ func (c *MySqlConnector) processAlterTableQuery(ctx 
context.Context, catalogPool slog.String("tableName", sourceTableName)) continue } - qkind, err := QkindFromMysqlColumnType(col.Tp.InfoSchemaStr(), req.InternalVersion) + qkind, err := QkindFromMysqlColumnType(col.Tp.InfoSchemaStr()) if err != nil { return err } diff --git a/flow/connectors/mysql/qrep.go b/flow/connectors/mysql/qrep.go index 379162ddbe..9605f5ca9d 100644 --- a/flow/connectors/mysql/qrep.go +++ b/flow/connectors/mysql/qrep.go @@ -185,7 +185,7 @@ func (c *MySqlConnector) PullQRepRecords( stream *model.QRecordStream, ) (int64, int64, error) { tableSchema, err := c.getTableSchemaForTable(ctx, config.Env, - &protos.TableMapping{SourceTableIdentifier: config.WatermarkTable}, protos.TypeSystem_Q, config.Version) + &protos.TableMapping{SourceTableIdentifier: config.WatermarkTable}, protos.TypeSystem_Q) if err != nil { return 0, 0, fmt.Errorf("failed to get schema for watermark table %s: %w", config.WatermarkTable, err) } diff --git a/flow/connectors/mysql/qvalue_convert.go b/flow/connectors/mysql/qvalue_convert.go index 2622ff8519..c4307b5b45 100644 --- a/flow/connectors/mysql/qvalue_convert.go +++ b/flow/connectors/mysql/qvalue_convert.go @@ -58,9 +58,11 @@ func qkindFromMysqlType(mytype byte, unsigned bool, charset uint16) (types.QValu return types.QValueKindInvalid, nil case mysql.MYSQL_TYPE_DATE, mysql.MYSQL_TYPE_NEWDATE: return types.QValueKindDate, nil - case mysql.MYSQL_TYPE_TIMESTAMP, mysql.MYSQL_TYPE_TIME, mysql.MYSQL_TYPE_DATETIME, - mysql.MYSQL_TYPE_TIMESTAMP2, mysql.MYSQL_TYPE_DATETIME2, mysql.MYSQL_TYPE_TIME2: + case mysql.MYSQL_TYPE_TIMESTAMP, mysql.MYSQL_TYPE_DATETIME, + mysql.MYSQL_TYPE_TIMESTAMP2, mysql.MYSQL_TYPE_DATETIME2: return types.QValueKindTimestamp, nil + case mysql.MYSQL_TYPE_TIME, mysql.MYSQL_TYPE_TIME2: + return types.QValueKindTime, nil case mysql.MYSQL_TYPE_YEAR: return types.QValueKindInt16, nil case mysql.MYSQL_TYPE_BIT: @@ -309,6 +311,8 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie } return types.QValueNumeric{Val: val}, nil case types.QValueKindTimestamp: + // Deprecated: we mapped MySQL time to QValueKindTimestamp before ClickHouse supported Time64 + // Keep code path for backwards compatibility if mytype == mysql.MYSQL_TYPE_TIME || mytype == mysql.MYSQL_TYPE_TIME2 { tm, err := processTime(unsafeString) if err != nil { @@ -325,9 +329,8 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie } return types.QValueTimestamp{Val: val}, nil case types.QValueKindTime: - // deprecated: most databases expect time to be time part of datetime - // mysql it's a +/- 800 hour range to represent duration - // keep codepath for backwards compat when mysql time was mapped to QValueKindTime + // MySQL TIME supports range: [-838:59:59.000000, 838:59:59.999999] + // ClickHouse Time64 supports range: [-999:59:59.000000, 999:59:59.999999] tm, err := processTime(unsafeString) if err != nil { return nil, err diff --git a/flow/connectors/mysql/schema.go b/flow/connectors/mysql/schema.go index 776131a118..c429b8db96 100644 --- a/flow/connectors/mysql/schema.go +++ b/flow/connectors/mysql/schema.go @@ -109,7 +109,7 @@ func (c *MySqlConnector) GetColumns(ctx context.Context, version uint32, schema if err != nil { return nil, err } - qkind, err := QkindFromMysqlColumnType(columnType, version) + qkind, err := QkindFromMysqlColumnType(columnType) if err != nil { return nil, err } diff --git a/flow/connectors/mysql/type_conversion.go b/flow/connectors/mysql/type_conversion.go index 204d70aab9..ad4e615944 100644 
--- a/flow/connectors/mysql/type_conversion.go +++ b/flow/connectors/mysql/type_conversion.go @@ -4,11 +4,10 @@ import ( "fmt" "strings" - "github.com/PeerDB-io/peerdb/flow/shared" "github.com/PeerDB-io/peerdb/flow/shared/types" ) -func QkindFromMysqlColumnType(ct string, version uint32) (types.QValueKind, error) { +func QkindFromMysqlColumnType(ct string) (types.QValueKind, error) { // https://mariadb.com/docs/server/reference/data-types/date-and-time-data-types/timestamp#tab-current-1 ct, _ = strings.CutSuffix(ct, " /* mariadb-5.3 */") ct, _ = strings.CutSuffix(ct, " zerofill") @@ -28,11 +27,7 @@ func QkindFromMysqlColumnType(ct string, version uint32) (types.QValueKind, erro case "datetime", "timestamp": return types.QValueKindTimestamp, nil case "time": - // For new versions, map TIME to QValueKindTime instead of QValueKindTimestamp - if version >= shared.InternalVersion_ClickHouseTime64 { - return types.QValueKindTime, nil - } - return types.QValueKindTimestamp, nil + return types.QValueKindTime, nil case "decimal", "numeric": return types.QValueKindNumeric, nil case "float": diff --git a/flow/e2e/clickhouse.go b/flow/e2e/clickhouse.go index 1bfc2b58c6..df3b6727e2 100644 --- a/flow/e2e/clickhouse.go +++ b/flow/e2e/clickhouse.go @@ -190,7 +190,16 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch for idx, ty := range colTypes { fieldDesc := tableSchema.Columns[idx] - row = append(row, reflect.New(ty.ScanType()).Interface()) + // Workaround: driver's ScanType() returns time.Time which fails to scan into time.Duration + // https://github.com/ClickHouse/clickhouse-go/issues/1757 + dbTypeName := ty.DatabaseTypeName() + if dbTypeName == "Time64(6)" { + row = append(row, new(time.Duration)) + } else if dbTypeName == "Nullable(Time64(6))" { + row = append(row, new(*time.Duration)) + } else { + row = append(row, reflect.New(ty.ScanType()).Interface()) + } batch.Schema.Fields = append(batch.Schema.Fields, types.QField{ Name: ty.Name(), Type: types.QValueKind(fieldDesc.Type), @@ -261,6 +270,13 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch } else { qrow = append(qrow, types.QValueTimestamp{Val: **v}) } + case **time.Duration: + if *v == nil { + qrow = append(qrow, types.QValueNull(types.QValueKindTime)) + } else { + // Time64(6) stores microseconds, but time.Duration is nanoseconds + qrow = append(qrow, types.QValueTime{Val: **v * 1000}) + } case **uint8: if *v == nil { qrow = append(qrow, types.QValueNull(types.QValueKindUInt8)) @@ -301,6 +317,9 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch } case *time.Time: qrow = append(qrow, types.QValueTimestamp{Val: *v}) + case *time.Duration: + // Time64(6) stores microseconds, but time.Duration is nanoseconds + qrow = append(qrow, types.QValueTime{Val: *v * 1000}) case *[]time.Time: qrow = append(qrow, types.QValueArrayTimestamp{Val: *v}) case **decimal.Decimal: diff --git a/flow/e2e/clickhouse_mysql_test.go b/flow/e2e/clickhouse_mysql_test.go index 0f0b77945f..5e0c88a428 100644 --- a/flow/e2e/clickhouse_mysql_test.go +++ b/flow/e2e/clickhouse_mysql_test.go @@ -3,7 +3,6 @@ package e2e import ( "fmt" "math" - "strconv" "strings" "github.com/stretchr/testify/require" @@ -116,97 +115,6 @@ func (s ClickHouseSuite) Test_MySQL_Time() { EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\",d,dt,tm,t") - // Verify that TIME column uses Time64(3) when ClickHouse version >= 25.6 (assumes latest internal version) - 
diff --git a/flow/e2e/clickhouse_mysql_test.go b/flow/e2e/clickhouse_mysql_test.go
index 0f0b77945f..5e0c88a428 100644
--- a/flow/e2e/clickhouse_mysql_test.go
+++ b/flow/e2e/clickhouse_mysql_test.go
@@ -3,7 +3,6 @@ package e2e
 import (
 "fmt"
 "math"
- "strconv"
 "strings"

 "github.com/stretchr/testify/require"
@@ -116,97 +115,6 @@ func (s ClickHouseSuite) Test_MySQL_Time() {
 EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\",d,dt,tm,t")

- // Verify that TIME column uses Time64(3) when ClickHouse version >= 25.6 (assumes latest internal version)
- // Backward compatibility with older internal versions is tested in Test_MySQL_Time_BackwardCompatibility
- ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig())
- require.NoError(s.t, err)
- defer ch.Close()
-
- var columnType string
- err = ch.QueryRow(s.t.Context(), fmt.Sprintf(
- "SELECT type FROM system.columns WHERE database = currentDatabase() AND table = %s AND name = 't'",
- clickhouse.QuoteLiteral(dstTableName),
- )).Scan(&columnType)
- require.NoError(s.t, err)
-
- chVersion, err := s.connector.GetVersion(s.t.Context())
- require.NoError(s.t, err)
-
- // Check if ClickHouse version >= 25.6
- versionParts := strings.Split(chVersion, ".")
- if len(versionParts) >= 2 {
- major, _ := strconv.Atoi(versionParts[0])
- minor, _ := strconv.Atoi(versionParts[1])
-
- // If ClickHouse >= 25.6 and using latest internal version, should use Time64(3)
- if major > 25 || (major == 25 && minor >= 6) {
- require.Contains(s.t, columnType, "Time64(3)",
- "Expected Time64(3) for TIME column when ClickHouse >= 25.6 and using latest internal version, got %s", columnType)
- } else {
- // Older ClickHouse versions should use DateTime64(6)
- require.Contains(s.t, columnType, "DateTime64(6)",
- "Expected DateTime64(6) for TIME column when ClickHouse < 25.6, got %s", columnType)
- }
- }
-
- env.Cancel(s.t.Context())
- RequireEnvCanceled(s.t, env)
-}
-
-func (s ClickHouseSuite) Test_MySQL_Time_BackwardCompatibility() {
- if _, ok := s.source.(*MySqlSource); !ok {
- s.t.Skip("only applies to mysql")
- }
-
- srcTableName := "test_datetime_backward_compat"
- srcFullName := s.attachSchemaSuffix(srcTableName)
- quotedSrcFullName := "\"" + strings.ReplaceAll(srcFullName, ".", "\".\"") + "\""
- dstTableName := "test_datetime_backward_compat_dst"
-
- require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`
- CREATE TABLE IF NOT EXISTS %s (
- id SERIAL PRIMARY KEY,
- "key" TEXT NOT NULL,
- t TIME NOT NULL
- )
- `, quotedSrcFullName)))
-
- require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %s ("key",t) VALUES
- ('init','14:21.654321')`,
- quotedSrcFullName)))
-
- connectionGen := FlowConnectionGenerationConfig{
- FlowJobName: s.attachSuffix(srcTableName),
- TableNameMapping: map[string]string{srcFullName: dstTableName},
- Destination: s.Peer().Name,
- }
- flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
- flowConnConfig.DoInitialSnapshot = true
- // Explicitly set to old internal version to test backward compatibility
- flowConnConfig.Version = shared.InternalVersion_First
-
- tc := NewTemporalClient(s.t)
- env := ExecutePeerflow(s.t, tc, flowConnConfig)
- SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
-
- EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,\"key\",t")
-
- // Verify that TIME column uses DateTime64(6) even with ClickHouse >= 25.6 when using old internal version
- ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig())
- require.NoError(s.t, err)
- defer ch.Close()
-
- var columnType string
- err = ch.QueryRow(s.t.Context(), fmt.Sprintf(
- "SELECT type FROM system.columns WHERE database = currentDatabase() AND table = %s AND name = 't'",
- clickhouse.QuoteLiteral(dstTableName),
- )).Scan(&columnType)
- require.NoError(s.t, err)
-
- // With old internal version, should always use DateTime64(6) regardless of ClickHouse version
- require.Contains(s.t, columnType, "DateTime64(6)",
- "Expected DateTime64(6) for TIME column with old internal version (InternalVersion_First), got %s", columnType)
-
 env.Cancel(s.t.Context())
 RequireEnvCanceled(s.t, env)
 }
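The deleted tests parsed the server version string by hand; the consolidated check now lives behind a single helper (ShouldUseTime64Type, later in this patch). A stand-alone sketch of the cutoff comparison the deleted test inlined; atLeast is an illustrative helper, not a function in the codebase:

package main

import "fmt"

// atLeast reports whether a ClickHouse server version (major.minor) reaches
// the 25.6 cutoff that introduced Time64.
func atLeast(major, minor int) bool {
	return major > 25 || (major == 25 && minor >= 6)
}

func main() {
	fmt.Println(atLeast(25, 6)) // true
	fmt.Println(atLeast(25, 5)) // false
	fmt.Println(atLeast(26, 0)) // true
}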
diff --git a/flow/e2e/clickhouse_test.go b/flow/e2e/clickhouse_test.go
index 3609ae400e..1a02d4732c 100644
--- a/flow/e2e/clickhouse_test.go
+++ b/flow/e2e/clickhouse_test.go
@@ -1300,6 +1300,77 @@ func (s ClickHouseSuite) Test_Types_CH() {
 RequireEnvCanceled(s.t, env)
 }

+func (s ClickHouseSuite) Test_Time64() {
+ _, isPostgres := s.source.(*PostgresSource)
+ _, isMySQL := s.source.(*MySqlSource)
+ if !isPostgres && !isMySQL {
+ s.t.Skip("only applies to postgres and mysql")
+ }
+
+ srcTableName := "test_time"
+ srcFullName := s.attachSchemaSuffix(srcTableName)
+ dstTableName := srcTableName + "_dst"
+
+ require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`
+ CREATE TABLE IF NOT EXISTS %s (
+ id SERIAL PRIMARY KEY,
+ t TIME NOT NULL,
+ t_nullable TIME,
+ t_nullable_2 TIME
+ )
+ `, srcFullName)))
+ require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(
+ `INSERT INTO %s (t, t_nullable, t_nullable_2) VALUES ('14:21:00', '08:30:00.123456', NULL)`, srcFullName)))
+
+ connectionGen := FlowConnectionGenerationConfig{
+ FlowJobName: s.attachSuffix(srcTableName),
+ TableNameMapping: map[string]string{srcFullName: dstTableName},
+ Destination: s.Peer().Name,
+ }
+ flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
+ flowConnConfig.DoInitialSnapshot = true
+
+ flowConnConfig.TableMappings[0].Columns = []*protos.ColumnSetting{
+ {SourceName: "t_nullable", NullableEnabled: true},
+ {SourceName: "t_nullable_2", NullableEnabled: true},
+ }
+ tc := NewTemporalClient(s.t)
+ env := ExecutePeerflow(s.t, tc, flowConnConfig)
+ SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
+
+ EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,t,t_nullable,t_nullable_2")
+
+ require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(
+ `INSERT INTO %s (t, t_nullable, t_nullable_2) VALUES ('00:00:00', '23:59:59.999999', NULL)`, srcFullName)))
+
+ EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,t,t_nullable,t_nullable_2")
+
+ ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig())
+ require.NoError(s.t, err)
+ defer ch.Close()
+ assertColumnType := func(columnName string, expectedColumnType string) {
+ var columnType string
+ row := ch.QueryRow(
+ s.t.Context(),
+ fmt.Sprintf(
+ "select type from system.columns where database = '%s' and table = '%s' and name = '%s'",
+ s.Peer().GetClickhouseConfig().Database,
+ dstTableName,
+ columnName,
+ ),
+ )
+ require.NoError(s.t, row.Err())
+ require.NoError(s.t, row.Scan(&columnType))
+ require.Equal(s.t, expectedColumnType, columnType, "unexpected type for column %s", columnName)
+ }
+ assertColumnType("t", "Time64(6)")
+ assertColumnType("t_nullable", "Nullable(Time64(6))")
+ assertColumnType("t_nullable_2", "Nullable(Time64(6))")
+
+ env.Cancel(s.t.Context())
+ RequireEnvCanceled(s.t, env)
+}
+
 func (s ClickHouseSuite) Test_InfiniteTimestamp() {
 if _, ok := s.source.(*PostgresSource); !ok {
 s.t.Skip("only applies to postgres")
diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go
index 048ac2a038..f241afb693 100644
--- a/flow/model/qvalue/avro_converter.go
+++ b/flow/model/qvalue/avro_converter.go
@@ -107,10 +107,8 @@ func GetAvroSchemaFromQValueKind(
 }
 return avro.NewPrimitiveSchema(avro.Int, avro.NewPrimitiveLogicalSchema(avro.Date)), nil
 case types.QValueKindTime, types.QValueKindTimeTZ:
- if targetDWH == protos.DBType_SNOWFLAKE {
- return avro.NewPrimitiveSchema(avro.String, nil), nil
- }
- return avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimeMicros)), nil
+ // TIME types are emitted as strings (see FormatExtendedTimeDuration), compatible with both DateTime64 and Time64
+ return avro.NewPrimitiveSchema(avro.String, nil), nil
 case types.QValueKindTimestamp, types.QValueKindTimestampTZ:
 if targetDWH == protos.DBType_SNOWFLAKE {
 return avro.NewPrimitiveSchema(avro.String, nil), nil
@@ -355,14 +353,13 @@ func QValueToAvro(
 }

 func (c *QValueAvroConverter) processGoTime(t time.Duration, so sizeOpt) (any, int64) {
- // Snowflake has issues with avro timestamp types, returning as string form
- // See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file
 if c.TargetDWH == protos.DBType_SNOWFLAKE {
+ // Snowflake TIME must be in range [0, 24h)
 t = max(min(t, 86399999999*time.Microsecond), 0)
- s := time.Time{}.Add(t).Format("15:04:05.999999")
- return s, stringSize(s, so)
 }
- return t, constSize(8, so)
+
+ s := types.FormatExtendedTimeDuration(t)
+ return s, stringSize(s, so)
 }

 func (c *QValueAvroConverter) processGeneralTime(t time.Time, format string, avroVal int64, so sizeOpt) (any, int64) {
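For context on what changes on the wire, here is a small sketch of the old and new Avro schemas, assuming the avro package is github.com/hamba/avro/v2, which matches the constructors used in the diff; the main wrapper is illustrative only:

package main

import (
	"fmt"

	"github.com/hamba/avro/v2"
)

func main() {
	// Before: TIME was a long carrying the time-micros logical type.
	oldSchema := avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimeMicros))

	// After: a plain string, so ClickHouse can parse it into DateTime64 or
	// Time64 alike, and Snowflake keeps its string-based TIME handling.
	newSchema := avro.NewPrimitiveSchema(avro.String, nil)

	fmt.Println(oldSchema.String())
	fmt.Println(newSchema.String())
}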
diff --git a/flow/model/qvalue/equals.go b/flow/model/qvalue/equals.go
index d990ce7fa7..6e0449a7c4 100644
--- a/flow/model/qvalue/equals.go
+++ b/flow/model/qvalue/equals.go
@@ -188,7 +188,11 @@ func compareGoTime(value1, value2 any) bool {
 t2 = tm.Sub(time.Unix(0, 0).UTC())
 }

- return ok1 && ok2 && t1 == t2
+ // Workaround for Go driver behavior where precision is lost when
+ // scanning into time.Duration (QValue Equal is only used for e2e testing)
+ // https://github.com/ClickHouse/clickhouse-go/issues/1757
+ // TODO: should be t1 == t2 once upstream is fixed
+ return ok1 && ok2 && t1.Milliseconds() == t2.Milliseconds()
 }

 func compareGoDate(value1, value2 any) bool {
diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go
index 009c41d2db..4aea3ccd96 100644
--- a/flow/model/qvalue/kind.go
+++ b/flow/model/qvalue/kind.go
@@ -8,6 +8,7 @@ import (

 "github.com/PeerDB-io/peerdb/flow/generated/protos"
 "github.com/PeerDB-io/peerdb/flow/internal"
+ peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
 "github.com/PeerDB-io/peerdb/flow/shared"
 "github.com/PeerDB-io/peerdb/flow/shared/datatypes"
 "github.com/PeerDB-io/peerdb/flow/shared/types"
@@ -90,14 +91,8 @@ func ToDWHColumnType(
 colType = fmt.Sprintf("Array(%s)", colType)
 } else if (kind == types.QValueKindJSON || kind == types.QValueKindJSONB) && ShouldUseNativeJSONType(ctx, env, dwhVersion) {
 colType = "JSON"
- } else if kind == types.QValueKindTime && internalVersion >= shared.InternalVersion_ClickHouseTime64 && dwhVersion != nil {
- // Time64 was introduced in ClickHouse 25.6
- if chproto.CheckMinVersion(chproto.Version{Major: 25, Minor: 6, Patch: 0}, *dwhVersion) {
- colType = "Time64(3)"
- } else {
- // Fall back to DateTime64(6) for older ClickHouse versions
- colType = types.QValueKindToClickHouseTypeMap[kind]
- }
+ } else if (kind == types.QValueKindTime || kind == types.QValueKindTimeTZ) && ShouldUseTime64Type(dwhVersion, internalVersion) {
+ colType = "Time64(6)"
 } else if val, ok := types.QValueKindToClickHouseTypeMap[kind]; ok {
 colType = val
 } else {
@@ -126,3 +121,12 @@ func ShouldUseNativeJSONType(ctx context.Context, env map[string]string, chVersi
 isJsonEnabled, _ := internal.PeerDBEnableClickHouseJSON(ctx, env)
 return isJsonSupported && isJsonEnabled
 }
+
+func ShouldUseTime64Type(chVersion *chproto.Version, internalVersion uint32) bool {
+ if chVersion == nil {
+ return false
+ }
+ // Gate on both the minimum supported ClickHouse version and the PeerDB internal version, for backwards compatibility
+ minSupportedVersion, exists := peerdb_clickhouse.GetMinVersion(peerdb_clickhouse.SettingEnableTimeTime64Type)
+ return exists && chproto.CheckMinVersion(minSupportedVersion, *chVersion) && internalVersion >= shared.InternalVersion_ClickHouseTime64
+}
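The Time64 gate combines three conditions: a known server version, the enable_time_time64_type cutoff (25.6), and a new-enough PeerDB internal version. A self-contained restatement under stated assumptions: checkMinVersion mirrors chproto.CheckMinVersion, and internalVersionClickHouseTime64 is a placeholder ordinal, since the real value is assigned by the iota block in flow/shared/constants.go.

package main

import "fmt"

type chVersion struct{ major, minor, patch int }

// checkMinVersion mirrors chproto.CheckMinVersion: true when v >= min.
func checkMinVersion(min, v chVersion) bool {
	if v.major != min.major {
		return v.major > min.major
	}
	if v.minor != min.minor {
		return v.minor > min.minor
	}
	return v.patch >= min.patch
}

// Placeholder ordinal for illustration only.
const internalVersionClickHouseTime64 uint32 = 99

func shouldUseTime64(v *chVersion, internalVersion uint32) bool {
	if v == nil {
		return false // unknown server version: fall back to DateTime64(6)
	}
	min := chVersion{25, 6, 0} // enable_time_time64_type minimum version
	return checkMinVersion(min, *v) && internalVersion >= internalVersionClickHouseTime64
}

func main() {
	v := chVersion{25, 8, 1}
	fmt.Println(shouldUseTime64(&v, internalVersionClickHouseTime64))   // true
	fmt.Println(shouldUseTime64(nil, internalVersionClickHouseTime64))  // false
	fmt.Println(shouldUseTime64(&v, internalVersionClickHouseTime64-1)) // false: older mirrors keep DateTime64(6)
}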
diff --git a/flow/model/record_items.go b/flow/model/record_items.go
index e828a31f0a..0333cafa04 100644
--- a/flow/model/record_items.go
+++ b/flow/model/record_items.go
@@ -4,7 +4,6 @@ import (
 "encoding/json"
 "fmt"
 "math"
- "time"

 "github.com/PeerDB-io/peerdb/flow/shared/datatypes"
 "github.com/PeerDB-io/peerdb/flow/shared/types"
@@ -154,9 +153,9 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]any, error) {
 case types.QValueDate:
 jsonStruct[col] = v.Val.Format("2006-01-02")
 case types.QValueTime:
- jsonStruct[col] = time.Time{}.Add(v.Val).Format("15:04:05.999999")
+ jsonStruct[col] = types.FormatExtendedTimeDuration(v.Val)
 case types.QValueTimeTZ:
- jsonStruct[col] = time.Time{}.Add(v.Val).Format("15:04:05.999999")
+ jsonStruct[col] = types.FormatExtendedTimeDuration(v.Val)
 case types.QValueArrayDate:
 dateArr := v.Val
 formattedDateArr := make([]string, 0, len(dateArr))
diff --git a/flow/connectors/clickhouse/setting_util.go b/flow/pkg/clickhouse/settings.go
similarity index 95%
rename from flow/connectors/clickhouse/setting_util.go
rename to flow/pkg/clickhouse/settings.go
index 9f5309865a..02278f0a6e 100644
--- a/flow/connectors/clickhouse/setting_util.go
+++ b/flow/pkg/clickhouse/settings.go
@@ -1,4 +1,4 @@
-package connclickhouse
+package clickhouse

 import (
 "maps"
@@ -20,6 +20,7 @@ const (
 SettingThrowOnMaxPartitionsPerInsertBlock CHSetting = "throw_on_max_partitions_per_insert_block"
 SettingParallelDistributedInsertSelect CHSetting = "parallel_distributed_insert_select"
 SettingMaxTableSizeToDrop CHSetting = "max_table_size_to_drop"
+ SettingEnableTimeTime64Type CHSetting = "enable_time_time64_type"
 )

 // CHSettingMinVersions maps setting names to their minimum required ClickHouse versions
@@ -28,6 +29,7 @@ var CHSettingMinVersions = map[CHSetting]chproto.Version{
 SettingJsonTypeEscapeDotsInKeys: {Major: 25, Minor: 8, Patch: 0},
 SettingTypeJsonSkipDuplicatedPaths: {Major: 24, Minor: 8, Patch: 0},
 SettingMaxTableSizeToDrop: {Major: 23, Minor: 12, Patch: 0},
+ SettingEnableTimeTime64Type: {Major: 25, Minor: 6, Patch: 0},
 }

 type CHSetting string
diff --git a/flow/connectors/clickhouse/setting_util_test.go b/flow/pkg/clickhouse/settings_test.go
similarity index 99%
rename from flow/connectors/clickhouse/setting_util_test.go
rename to flow/pkg/clickhouse/settings_test.go
index 8b98570153..5795a0c3d7 100644
--- a/flow/connectors/clickhouse/setting_util_test.go
+++ b/flow/pkg/clickhouse/settings_test.go
@@ -1,4 +1,4 @@
-package connclickhouse
+package clickhouse

 import (
 "testing"
diff --git a/flow/shared/constants.go b/flow/shared/constants.go
index d926c24035..62d62a6321 100644
--- a/flow/shared/constants.go
+++ b/flow/shared/constants.go
@@ -42,7 +42,7 @@ const (
 InternalVersion_JsonEscapeDotsInKeys
 // MongoDB: `_id` column values stored as-is without redundant quotes
 InternalVersion_MongoDBIdWithoutRedundantQuotes
- // ClickHouse: use Time64(3) data type for QValueKindTime when ClickHouse version >= 25.6
+ // ClickHouse: use Time64(6) data type for QValueKindTime when ClickHouse version >= 25.6
 // MySQL: map TIME type to QValueKindTime instead of QValueKindTimestamp
 InternalVersion_ClickHouseTime64
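The bespoke formatter introduced below exists because Go's time.Time-based formatting cannot represent MySQL TIME values, which span -838:59:59 to 838:59:59. A short demonstration of the failure mode (values here are illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	// A duration beyond 24h: formatting through a zero time.Time silently
	// drops whole days instead of keeping the hour count.
	d := 838*time.Hour + 59*time.Minute + 59*time.Second
	fmt.Println(time.Time{}.Add(d).Format("15:04:05.999999")) // 22:59:59 (34 days lost)

	// A negative duration wraps backward into the previous day instead of
	// producing "-1:00:00.000000".
	fmt.Println(time.Time{}.Add(-time.Hour).Format("15:04:05")) // 23:00:00
}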
diff --git a/flow/shared/types/time_format.go b/flow/shared/types/time_format.go
new file mode 100644
index 0000000000..7069412cc5
--- /dev/null
+++ b/flow/shared/types/time_format.go
@@ -0,0 +1,32 @@
+package types
+
+import (
+ "fmt"
+ "time"
+)
+
+// FormatExtendedTimeDuration formats a time.Duration as a time string.
+// The extended format [-]H:MM:SS.xxxxxx (hours unpadded, possibly exceeding two digits) supports:
+// - Negative values (e.g., "-1:30:00.000000")
+// - Hours exceeding 24 (e.g., "838:59:59.999999")
+//
+// The extended format is compatible with ClickHouse's Time64 type and toTime64OrNull().
+func FormatExtendedTimeDuration(d time.Duration) string {
+ negative := d < 0
+ if negative {
+ d = -d
+ }
+
+ totalMicros := d.Microseconds()
+ totalSeconds := totalMicros / 1_000_000
+ micros := totalMicros % 1_000_000
+
+ hours := totalSeconds / 3600
+ minutes := (totalSeconds % 3600) / 60
+ seconds := totalSeconds % 60
+
+ if negative {
+ return fmt.Sprintf("-%d:%02d:%02d.%06d", hours, minutes, seconds, micros)
+ }
+ return fmt.Sprintf("%d:%02d:%02d.%06d", hours, minutes, seconds, micros)
+}
diff --git a/flow/shared/types/time_format_test.go b/flow/shared/types/time_format_test.go
new file mode 100644
index 0000000000..e4ff06c0c7
--- /dev/null
+++ b/flow/shared/types/time_format_test.go
@@ -0,0 +1,29 @@
+package types
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestFormatExtendedTimeDuration(t *testing.T) {
+ tests := map[string]time.Duration{
+ "0:00:00.000000": 0,
+ "14:21:00.000000": 14*time.Hour + 21*time.Minute,
+ "8:30:00.123456": 8*time.Hour + 30*time.Minute + 123456*time.Microsecond,
+ "24:00:00.000000": 24 * time.Hour,
+ "-1:30:00.000000": -(1*time.Hour + 30*time.Minute),
+ "-2:15:30.500000": -(2*time.Hour + 15*time.Minute + 30*time.Second + 500000*time.Microsecond),
+ "838:59:59.999999": 838*time.Hour + 59*time.Minute + 59*time.Second + 999999*time.Microsecond,
+ "-838:59:59.000000": -(838*time.Hour + 59*time.Minute + 59*time.Second),
+ "999:59:59.999999": 999*time.Hour + 59*time.Minute + 59*time.Second + 999999*time.Microsecond,
+ "-999:59:59.999999": -(999*time.Hour + 59*time.Minute + 59*time.Second + 999999*time.Microsecond),
+ }
+
+ for expected, duration := range tests {
+ t.Run(expected, func(t *testing.T) {
+ require.Equal(t, expected, FormatExtendedTimeDuration(duration))
+ })
+ }
+}
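A brief usage sketch of the new formatter; it assumes the PeerDB module is on the import path, and the toTime64OrNull call in the comment is illustrative of how ClickHouse consumes the output, not code in this patch:

package main

import (
	"fmt"
	"time"

	"github.com/PeerDB-io/peerdb/flow/shared/types"
)

func main() {
	// Strings in this shape can be fed straight to ClickHouse, e.g.
	// SELECT toTime64OrNull('-1:30:00.000000', 6), which accepts the
	// extended format that plain DateTime64 parsing would reject.
	fmt.Println(types.FormatExtendedTimeDuration(-(90 * time.Minute))) // -1:30:00.000000
}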