4 changes: 3 additions & 1 deletion flow/activities/flowable_core.go
@@ -320,7 +320,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
defer dstClose(ctx)

syncState.Store(shared.Ptr("updating schema"))
-if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas); err != nil {
+if err := dstConn.ReplayTableSchemaDeltas(
+ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas, config.Version,
+); err != nil {
return nil, fmt.Errorf("failed to sync schema: %w", err)
}

1 change: 1 addition & 0 deletions flow/connectors/bigquery/bigquery.go
@@ -239,6 +239,7 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(
flowJobName string,
_ []*protos.TableMapping,
schemaDeltas []*protos.TableSchemaDelta,
+_ uint32,
) error {
for _, schemaDelta := range schemaDeltas {
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep.go
@@ -87,7 +87,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
}

if err := c.ReplayTableSchemaDeltas(
-ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta},
+ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta}, config.Version,
); err != nil {
return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
}
4 changes: 3 additions & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
@@ -98,7 +98,9 @@ func (s *QRepAvroSyncMethod) SyncRecords(
slog.String(string(shared.FlowNameKey), req.FlowJobName),
slog.String("dstTableName", rawTableName))

-if err := s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil {
+if err := s.connector.ReplayTableSchemaDeltas(
+ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version,
+); err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

11 changes: 7 additions & 4 deletions flow/connectors/clickhouse/avro_sync.go
@@ -293,11 +293,14 @@ func (s *ClickHouseAvroSyncMethod) pushS3DataToClickHouseForSnapshot(
}
numParts = max(numParts, 1)

-chSettings := NewCHSettings(s.chVersion)
-chSettings.Add(SettingThrowOnMaxPartitionsPerInsertBlock, "0")
-chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1")
+chSettings := peerdb_clickhouse.NewCHSettings(s.chVersion)
+chSettings.Add(peerdb_clickhouse.SettingThrowOnMaxPartitionsPerInsertBlock, "0")
+chSettings.Add(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths, "1")
if config.Version >= shared.InternalVersion_JsonEscapeDotsInKeys {
-chSettings.Add(SettingJsonTypeEscapeDotsInKeys, "1")
+chSettings.Add(peerdb_clickhouse.SettingJsonTypeEscapeDotsInKeys, "1")
}
+if config.Version >= shared.InternalVersion_ClickHouseTime64 {
+chSettings.Add(peerdb_clickhouse.SettingEnableTimeTime64Type, "1")
+}

// Process each chunk file individually
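The CHSettings builder used above (and again in normalize_query.go below) appears to collect name=value pairs and render them as a trailing SETTINGS clause, with newer settings gated on the flow's internal version. A self-contained sketch of that presumed pattern — the type, constant, and setting names here are illustrative stand-ins, not the real peerdb_clickhouse API:

package main

import (
	"fmt"
	"strings"
)

// chSettings mimics the presumed shape of peerdb_clickhouse.CHSettings:
// it collects name=value pairs and renders them as a SETTINGS clause.
type chSettings struct{ pairs []string }

func (s *chSettings) Add(name, value string) { s.pairs = append(s.pairs, name+"="+value) }

func (s *chSettings) String() string {
	if len(s.pairs) == 0 {
		return ""
	}
	return " SETTINGS " + strings.Join(s.pairs, ", ")
}

func main() {
	// Hypothetical internal-version constant mirroring the gating in the diff:
	// flows created at or after the Time64 version opt in to the new type.
	const internalVersionClickHouseTime64 = 2
	flowVersion := uint32(2)

	var s chSettings
	s.Add("throw_on_max_partitions_per_insert_block", "0")
	if flowVersion >= internalVersionClickHouseTime64 {
		s.Add("enable_time_time64_type", "1")
	}
	fmt.Println("INSERT INTO dst SELECT ... FROM s3(...)" + s.String())
	// Output: INSERT INTO dst SELECT ... FROM s3(...) SETTINGS throw_on_max_partitions_per_insert_block=0, enable_time_time64_type=1
}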
11 changes: 7 additions & 4 deletions flow/connectors/clickhouse/cdc.go
@@ -131,7 +131,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
}
warnings := numericTruncator.Warnings()

-if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil {
+if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version); err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

@@ -165,6 +165,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
flowJobName string,
tableMappings []*protos.TableMapping,
schemaDeltas []*protos.TableSchemaDelta,
+internalVersion uint32,
) error {
if len(schemaDeltas) == 0 {
return nil
@@ -188,7 +189,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
for _, addedColumn := range schemaDelta.AddedColumns {
qvKind := types.QValueKind(addedColumn.Type)
clickHouseColType, err := qvalue.ToDWHColumnType(
-ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled,
+ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled, internalVersion,
)
if err != nil {
return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err)
@@ -228,7 +229,8 @@ func (c *ClickHouseConnector) RenameTables(
req *protos.RenameTablesInput,
) (*protos.RenameTablesOutput, error) {
onCluster := c.onCluster()
-dropTableSQLWithCHSetting := dropTableIfExistsSQL + NewCHSettingsString(c.chVersion, SettingMaxTableSizeToDrop, "0")
+dropTableSQLWithCHSetting := dropTableIfExistsSQL +
+peerdb_clickhouse.NewCHSettingsString(c.chVersion, peerdb_clickhouse.SettingMaxTableSizeToDrop, "0")
for _, renameRequest := range req.RenameTableOptions {
if renameRequest.CurrentName == renameRequest.NewName {
c.logger.Info("table rename is nop, probably Null table engine, skipping rename for it",
@@ -302,7 +304,8 @@ func (c *ClickHouseConnector) SyncFlowCleanup(ctx context.Context, jobName strin
// delete raw table if exists
rawTableIdentifier := c.GetRawTableName(jobName)
onCluster := c.onCluster()
-dropTableSQLWithCHSetting := dropTableIfExistsSQL + NewCHSettingsString(c.chVersion, SettingMaxTableSizeToDrop, "0")
+dropTableSQLWithCHSetting := dropTableIfExistsSQL +
+peerdb_clickhouse.NewCHSettingsString(c.chVersion, peerdb_clickhouse.SettingMaxTableSizeToDrop, "0")
if err := c.execWithLogging(ctx,
fmt.Sprintf(dropTableSQLWithCHSetting, peerdb_clickhouse.QuoteIdentifier(rawTableIdentifier), onCluster),
); err != nil {
2 changes: 2 additions & 0 deletions flow/connectors/clickhouse/clickhouse.go
@@ -443,6 +443,8 @@ func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType
qkind = types.QValueKindUUID
case "DateTime64(6)", "Nullable(DateTime64(6))", "DateTime64(9)", "Nullable(DateTime64(9))":
qkind = types.QValueKindTimestamp
+case "Time64(6)", "Nullable(Time64(6))":
+qkind = types.QValueKindTime
case "Date32", "Nullable(Date32)":
qkind = types.QValueKindDate
case "Float32", "Nullable(Float32)":
13 changes: 11 additions & 2 deletions flow/connectors/clickhouse/normalize.go
@@ -21,6 +21,7 @@ import (
"github.com/PeerDB-io/peerdb/flow/model/qvalue"
peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
"github.com/PeerDB-io/peerdb/flow/pkg/common"
+"github.com/PeerDB-io/peerdb/flow/shared"
"github.com/PeerDB-io/peerdb/flow/shared/types"
)

@@ -66,6 +67,7 @@ func (c *ClickHouseConnector) SetupNormalizedTable(
destinationTableIdentifier,
sourceTableSchema,
c.chVersion,
+config.Version,
)
if err != nil {
return false, fmt.Errorf("error while generating create table sql for destination ClickHouse table: %w", err)
@@ -85,6 +87,7 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
tableIdentifier string,
tableSchema *protos.TableSchema,
chVersion *chproto.Version,
+internalVersion uint32,
) ([]string, error) {
var engine string
tmEngine := protos.TableEngine_CH_ENGINE_REPLACING_MERGE_TREE
@@ -203,7 +206,8 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
if clickHouseType == "" {
var err error
clickHouseType, err = qvalue.ToDWHColumnType(
-ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column, tableSchema.NullableEnabled || columnNullableEnabled,
+ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column,
+tableSchema.NullableEnabled || columnNullableEnabled, internalVersion,
)
if err != nil {
return nil, fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
@@ -263,9 +267,14 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
}
}

+settings := peerdb_clickhouse.NewCHSettings(chVersion)
if allowNullableKey {
-stmtBuilder.WriteString(NewCHSettingsString(chVersion, SettingAllowNullableKey, "1"))
+settings.Add(peerdb_clickhouse.SettingAllowNullableKey, "1")
}
+if internalVersion >= shared.InternalVersion_ClickHouseTime64 {
+settings.Add(peerdb_clickhouse.SettingEnableTimeTime64Type, "1")
+}
+stmtBuilder.WriteString(settings.String())

if c.Config.Cluster != "" {
fmt.Fprintf(&stmtBuilderDistributed, " ENGINE = Distributed(%s,%s,%s",
37 changes: 27 additions & 10 deletions flow/connectors/clickhouse/normalize_query.go
@@ -123,14 +123,27 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error
if clickHouseType == "" {
var err error
clickHouseType, err = qvalue.ToDWHColumnType(
-ctx, colType, t.env, protos.DBType_CLICKHOUSE, t.chVersion, column, schema.NullableEnabled || columnNullableEnabled,
+ctx, colType, t.env, protos.DBType_CLICKHOUSE, t.chVersion, column, schema.NullableEnabled || columnNullableEnabled, t.version,
)
if err != nil {
return "", fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
}

switch clickHouseType {
+case "Time64(6)", "Nullable(Time64(6))":
+fmt.Fprintf(&projection,
+"toTime64OrNull(JSONExtractString(_peerdb_data, %s)) AS %s,",
+peerdb_clickhouse.QuoteLiteral(colName),
+peerdb_clickhouse.QuoteIdentifier(dstColName),
+)
+if t.enablePrimaryUpdate {
+fmt.Fprintf(&projectionUpdate,
+"toTime64OrNull(JSONExtractString(_peerdb_match_data, %s)) AS %s,",
+peerdb_clickhouse.QuoteLiteral(colName),
+peerdb_clickhouse.QuoteIdentifier(dstColName),
+)
+}
case "Date32", "Nullable(Date32)":
fmt.Fprintf(&projection,
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, %s),6,'UTC')) AS %s,",
@@ -145,18 +158,19 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error
)
}
case "DateTime64(6)", "Nullable(DateTime64(6))":
+// Handle legacy path where TIME is stored as DateTime64 (before Time64 support)
if colType == types.QValueKindTime || colType == types.QValueKindTimeTZ {
-// parseDateTime64BestEffortOrNull for hh:mm:ss puts the year as current year
-// (or previous year if result would be in future) so explicitly anchor to unix epoch
fmt.Fprintf(&projection,
-"parseDateTime64BestEffortOrNull('1970-01-01 ' || JSONExtractString(_peerdb_data, %s),6,'UTC') AS %s,",
+"CAST(toTime64OrNull(JSONExtractString(_peerdb_data, %s)), '%s') AS %s,",
peerdb_clickhouse.QuoteLiteral(colName),
+clickHouseType,
peerdb_clickhouse.QuoteIdentifier(dstColName),
)
if t.enablePrimaryUpdate {
fmt.Fprintf(&projectionUpdate,
-"parseDateTime64BestEffortOrNull('1970-01-01 ' || JSONExtractString(_peerdb_match_data, %s),6,'UTC') AS %s,",
+"CAST(toTime64OrNull(JSONExtractString(_peerdb_match_data, %s)), '%s') AS %s,",
peerdb_clickhouse.QuoteLiteral(colName),
+clickHouseType,
peerdb_clickhouse.QuoteIdentifier(dstColName),
)
}
@@ -297,14 +311,17 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error
t.lastNormBatchID, t.endBatchID, peerdb_clickhouse.QuoteLiteral(t.TableName))
}

-chSettings := NewCHSettings(t.chVersion)
-chSettings.Add(SettingThrowOnMaxPartitionsPerInsertBlock, "0")
-chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1")
+chSettings := peerdb_clickhouse.NewCHSettings(t.chVersion)
+chSettings.Add(peerdb_clickhouse.SettingThrowOnMaxPartitionsPerInsertBlock, "0")
+chSettings.Add(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths, "1")
if t.cluster {
-chSettings.Add(SettingParallelDistributedInsertSelect, "0")
+chSettings.Add(peerdb_clickhouse.SettingParallelDistributedInsertSelect, "0")
}
if t.version >= shared.InternalVersion_JsonEscapeDotsInKeys {
-chSettings.Add(SettingJsonTypeEscapeDotsInKeys, "1")
+chSettings.Add(peerdb_clickhouse.SettingJsonTypeEscapeDotsInKeys, "1")
}
+if t.version >= shared.InternalVersion_ClickHouseTime64 {
+chSettings.Add(peerdb_clickhouse.SettingEnableTimeTime64Type, "1")
+}

insertIntoSelectQuery := fmt.Sprintf("INSERT INTO %s %s %s%s",
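For a TIME column t, the projections generated above take one of two shapes, depending on whether the destination column is the new Time64(6) or the legacy DateTime64(6). A small sketch replaying the diff's format strings, with the QuoteLiteral/QuoteIdentifier results written out by hand:

package main

import "fmt"

func main() {
	colName := "'t'"    // as peerdb_clickhouse.QuoteLiteral would render it
	dstColName := "`t`" // as peerdb_clickhouse.QuoteIdentifier would render it

	// New path: destination column is Time64(6); parse the raw JSON string directly.
	fmt.Printf("toTime64OrNull(JSONExtractString(_peerdb_data, %s)) AS %s,\n", colName, dstColName)

	// Legacy path: destination column stays DateTime64(6); parse as Time64,
	// then cast back to the column's declared type.
	clickHouseType := "Nullable(DateTime64(6))"
	fmt.Printf("CAST(toTime64OrNull(JSONExtractString(_peerdb_data, %s)), '%s') AS %s,\n",
		colName, clickHouseType, dstColName)
}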
32 changes: 30 additions & 2 deletions flow/connectors/clickhouse/table_function.go
@@ -52,8 +52,36 @@ func jsonFieldExpressionConverter(
return fmt.Sprintf("CAST(%s, 'JSON')", sourceFieldIdentifier), nil
}

+func timeFieldExpressionConverter(
+_ context.Context,
+config *insertFromTableFunctionConfig,
+sourceFieldIdentifier string,
+field types.QField,
+) (string, error) {
+if field.Type != types.QValueKindTime {
+return sourceFieldIdentifier, nil
+}
+
+// Skip for BigQuery source - TIME comes as Int64 from Parquet, not String
+if config.config.SourceType == protos.DBType_BIGQUERY {
+return sourceFieldIdentifier, nil
+}
+
+// Handle legacy path where TIME is stored as DateTime64, before ClickHouse supported Time64 type
+if !qvalue.ShouldUseTime64Type(config.connector.chVersion, config.config.Version) {
+destType := "DateTime64(6)"
+if field.Nullable {
+destType = "Nullable(DateTime64(6))"
+}
+return fmt.Sprintf("CAST(toTime64OrNull(%s), '%s')", sourceFieldIdentifier, destType), nil
+}
+
+return fmt.Sprintf("toTime64OrNull(%s)", sourceFieldIdentifier), nil
+}
+
var defaultFieldExpressionConverters = []fieldExpressionConverter{
jsonFieldExpressionConverter,
+timeFieldExpressionConverter,
}

// buildInsertFromTableFunctionQuery builds a complete INSERT query from a table function expression
@@ -62,7 +90,7 @@ func buildInsertFromTableFunctionQuery(
ctx context.Context,
config *insertFromTableFunctionConfig,
tableFunctionExpr string,
-chSettings *CHSettings,
+chSettings *peerdb_clickhouse.CHSettings,
) (string, error) {
fieldExpressionConverters := defaultFieldExpressionConverters
fieldExpressionConverters = append(fieldExpressionConverters, config.fieldExpressionConverters...)
@@ -142,7 +170,7 @@ func buildInsertFromTableFunctionQueryWithPartitioning(
tableFunctionExpr string,
partitionIndex uint64,
totalPartitions uint64,
-chSettings *CHSettings,
+chSettings *peerdb_clickhouse.CHSettings,
) (string, error) {
var query strings.Builder

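Putting the three branches of timeFieldExpressionConverter together: for a TIME field t, it yields a passthrough for BigQuery sources, a parse-then-cast for legacy DateTime64(6) destinations, and a direct parse once Time64 is usable. A sketch printing the three resulting expressions (identifier quoting inlined by hand):

package main

import "fmt"

func main() {
	src := "`t`"
	// BigQuery source: passthrough — TIME arrives as Int64 from Parquet.
	fmt.Println(src)
	// Legacy destination (pre-Time64): parse, then cast back to DateTime64(6).
	fmt.Printf("CAST(toTime64OrNull(%s), 'Nullable(DateTime64(6))')\n", src)
	// Time64-capable destination: parse directly.
	fmt.Printf("toTime64OrNull(%s)\n", src)
}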
14 changes: 9 additions & 5 deletions flow/connectors/clickhouse/table_function_test.go
@@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
+peerdb_clickhouse "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
"github.com/PeerDB-io/peerdb/flow/shared/types"
)

@@ -33,14 +34,16 @@ func TestBuildInsertFromTableFunctionQuery(t *testing.T) {
}

tableFunctionExpr := "s3('s3://bucket/key', 'format')"
-chSettings := NewCHSettings(&chproto.Version{Major: 25, Minor: 8})
-chSettings.Add(SettingTypeJsonSkipDuplicatedPaths, "1")
+chSettings := peerdb_clickhouse.NewCHSettings(&chproto.Version{Major: 25, Minor: 8})
+chSettings.Add(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths, "1")

// without partitioning
query, err := buildInsertFromTableFunctionQuery(ctx, config, tableFunctionExpr, chSettings)
require.NoError(t, err)
-require.Equal(t, fmt.Sprintf("INSERT INTO `t1`(`id`,`name`) SELECT `id`,`name` FROM s3('s3://bucket/key', 'format') SETTINGS %s=%s",
-string(SettingTypeJsonSkipDuplicatedPaths), "1"), query)
+require.Equal(t,
+fmt.Sprintf("INSERT INTO `t1`(`id`,`name`) SELECT `id`,`name` FROM s3('s3://bucket/key', 'format') SETTINGS %s=%s",
+string(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths), "1"),
+query)

// with partitioning
totalPartitions := uint64(8)
@@ -49,6 +52,7 @@ func TestBuildInsertFromTableFunctionQuery(t *testing.T) {
require.NoError(t, err)
require.Equal(t, query,
"INSERT INTO `t1`(`id`,`name`) SELECT `id`,`name` FROM s3('s3://bucket/key', 'format')"+
-fmt.Sprintf(" WHERE cityHash64(`id`) %% 8 = %d SETTINGS %s=%s", idx, string(SettingTypeJsonSkipDuplicatedPaths), "1"))
+fmt.Sprintf(" WHERE cityHash64(`id`) %% 8 = %d SETTINGS %s=%s",
+idx, string(peerdb_clickhouse.SettingTypeJsonSkipDuplicatedPaths), "1"))
}
}
2 changes: 1 addition & 1 deletion flow/connectors/core.go
@@ -185,7 +185,7 @@ type CDCSyncConnectorCore interface {
// This could involve adding multiple columns.
// Connectors which are non-normalizing should implement this as a nop.
ReplayTableSchemaDeltas(ctx context.Context, env map[string]string, flowJobName string,
-tableMappings []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta) error
+tableMappings []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, internalVersion uint32) error
}

type CDCSyncConnector interface {
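For non-normalizing connectors the new internalVersion parameter is simply ignored, as the Elasticsearch/EventHub/Kafka stubs below show. A minimal sketch of such a no-op implementation against the updated signature (the connector and package names are illustrative; the protos import path is taken from the diff):

package myconnector

import (
	"context"

	"github.com/PeerDB-io/peerdb/flow/generated/protos"
)

type NopConnector struct{}

// ReplayTableSchemaDeltas is a nop: there is no destination schema to evolve.
// The trailing uint32 is the new internalVersion parameter, unused here.
func (c *NopConnector) ReplayTableSchemaDeltas(
	_ context.Context,
	_ map[string]string,
	_ string,
	_ []*protos.TableMapping,
	_ []*protos.TableSchemaDelta,
	_ uint32,
) error {
	return nil
}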
2 changes: 1 addition & 1 deletion flow/connectors/elasticsearch/elasticsearch.go
@@ -94,7 +94,7 @@ func (esc *ElasticsearchConnector) CreateRawTable(ctx context.Context,

// we handle schema changes by not handling them since no mapping is being enforced right now
func (esc *ElasticsearchConnector) ReplayTableSchemaDeltas(ctx context.Context, env map[string]string,
-flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta,
+flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32,
) error {
return nil
}
2 changes: 1 addition & 1 deletion flow/connectors/eventhub/eventhub.go
@@ -371,7 +371,7 @@ func (c *EventHubConnector) CreateRawTable(ctx context.Context, req *protos.Crea
}

func (c *EventHubConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string,
-flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta,
+flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32,
) error {
return nil
}
2 changes: 1 addition & 1 deletion flow/connectors/kafka/kafka.go
@@ -162,7 +162,7 @@ func (c *KafkaConnector) CreateRawTable(ctx context.Context, req *protos.CreateR
}

func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string,
-flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta,
+flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32,
) error {
return nil
}