Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions internal/diff/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,179 @@ func generateIndexSQLWithName(index *ir.Index, indexName string, targetSchema st

return builder.String()
}

// generateIndexModifications handles index drops, adds, and online replacements
// Works for both table indexes and materialized view indexes
func generateIndexModifications(
droppedIndexes []*ir.Index,
addedIndexes []*ir.Index,
modifiedIndexes []*IndexDiff,
targetSchema string,
indexDiffType DiffType,
commentDiffType DiffType,
collector *diffCollector,
) {
// Identify indexes that need online replacement (dropped and added with same name)
onlineReplacements := make(map[string]*ir.Index)
regularDrops := []*ir.Index{}

for _, droppedIndex := range droppedIndexes {
foundReplacement := false
for _, addedIndex := range addedIndexes {
if droppedIndex.Name == addedIndex.Name {
onlineReplacements[droppedIndex.Name] = addedIndex
foundReplacement = true
break
}
}
if !foundReplacement {
regularDrops = append(regularDrops, droppedIndex)
}
}

// Remove replaced indexes from added list
remainingAdded := []*ir.Index{}
for _, addedIndex := range addedIndexes {
if _, isReplacement := onlineReplacements[addedIndex.Name]; !isReplacement {
remainingAdded = append(remainingAdded, addedIndex)
}
}

// Drop indexes that are not being replaced
for _, index := range regularDrops {
sql := fmt.Sprintf("DROP INDEX IF EXISTS %s;", qualifyEntityName(index.Schema, index.Name, targetSchema))
context := &diffContext{
Type: indexDiffType,
Operation: DiffOperationDrop,
Path: fmt.Sprintf("%s.%s.%s", index.Schema, index.Table, index.Name),
Source: index,
CanRunInTransaction: true,
}
collector.collect(context, sql)
}

// Handle modified indexes
for _, indexDiff := range modifiedIndexes {
// Check if only comment changed
structurallyEqual := indexesStructurallyEqual(indexDiff.Old, indexDiff.New)
commentChanged := indexDiff.Old.Comment != indexDiff.New.Comment

if structurallyEqual && commentChanged {
// Only comment changed - generate COMMENT ON INDEX statement
generateIndexComment(indexDiff.New, targetSchema, commentDiffType, DiffOperationAlter, collector)
} else {
// Structure changed - use online replacement approach
dropSQL := fmt.Sprintf("DROP INDEX IF EXISTS %s;", qualifyEntityName(indexDiff.Old.Schema, indexDiff.Old.Name, targetSchema))
canonicalSQL := generateIndexSQL(indexDiff.New, targetSchema, false)

statements := []SQLStatement{
{
SQL: dropSQL,
CanRunInTransaction: true,
},
{
SQL: canonicalSQL,
CanRunInTransaction: true,
},
}

alterContext := &diffContext{
Type: indexDiffType,
Operation: DiffOperationAlter,
Path: fmt.Sprintf("%s.%s.%s", indexDiff.New.Schema, indexDiff.New.Table, indexDiff.New.Name),
Source: indexDiff,
CanRunInTransaction: true,
}
collector.collectStatements(alterContext, statements)
}
}

// Process index replacements with online approach
if len(onlineReplacements) > 0 {
// Sort for deterministic order
sortedOnlineIndexNames := make([]string, 0, len(onlineReplacements))
for indexName := range onlineReplacements {
sortedOnlineIndexNames = append(sortedOnlineIndexNames, indexName)
}
sort.Strings(sortedOnlineIndexNames)

for _, indexName := range sortedOnlineIndexNames {
newIndex := onlineReplacements[indexName]

// Step 1: DROP old index, Step 2: CREATE new index
dropSQL := fmt.Sprintf("DROP INDEX IF EXISTS %s;", qualifyEntityName(newIndex.Schema, indexName, targetSchema))
canonicalSQL := generateIndexSQL(newIndex, targetSchema, false)

statements := []SQLStatement{
{
SQL: dropSQL,
CanRunInTransaction: true,
},
{
SQL: canonicalSQL,
CanRunInTransaction: true,
},
}

alterContext := &diffContext{
Type: indexDiffType,
Operation: DiffOperationAlter,
Path: fmt.Sprintf("%s.%s.%s", newIndex.Schema, newIndex.Table, indexName),
Source: newIndex,
CanRunInTransaction: true,
}
collector.collectStatements(alterContext, statements)

// Add index comment if present as a separate operation
if newIndex.Comment != "" {
generateIndexComment(newIndex, targetSchema, commentDiffType, DiffOperationCreate, collector)
}
}
}

// Create new indexes (not replacements)
for _, index := range remainingAdded {
canonicalSQL := generateIndexSQL(index, targetSchema, false)

context := &diffContext{
Type: indexDiffType,
Operation: DiffOperationCreate,
Path: fmt.Sprintf("%s.%s.%s", index.Schema, index.Table, index.Name),
Source: index,
CanRunInTransaction: true,
}

collector.collect(context, canonicalSQL)

// Add index comment if present
if index.Comment != "" {
generateIndexComment(index, targetSchema, commentDiffType, DiffOperationCreate, collector)
}
}
}

// generateIndexComment generates COMMENT ON INDEX statement
func generateIndexComment(
index *ir.Index,
targetSchema string,
diffType DiffType,
operation DiffOperation,
collector *diffCollector,
) {
indexName := qualifyEntityName(index.Schema, index.Name, targetSchema)
var sql string
if index.Comment == "" {
sql = fmt.Sprintf("COMMENT ON INDEX %s IS NULL;", indexName)
} else {
sql = fmt.Sprintf("COMMENT ON INDEX %s IS %s;", indexName, quoteString(index.Comment))
}

context := &diffContext{
Type: diffType,
Operation: operation,
Path: fmt.Sprintf("%s.%s.%s", index.Schema, index.Table, index.Name),
Source: index,
CanRunInTransaction: true,
}
collector.collect(context, sql)
}
155 changes: 10 additions & 145 deletions internal/diff/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,38 +817,6 @@ func (td *tableDiff) generateAlterTableStatements(targetSchema string, collector
collector.collect(context, sql)
}

// Handle online index replacement for CONCURRENT indexes
// First, identify indexes that need online replacement (dropped and added with same name and CONCURRENT)
onlineReplacements := make(map[string]*ir.Index)
regularDrops := []*ir.Index{}

for _, droppedIndex := range td.DroppedIndexes {
foundReplacement := false
for _, addedIndex := range td.AddedIndexes {
if droppedIndex.Name == addedIndex.Name {
onlineReplacements[droppedIndex.Name] = addedIndex
foundReplacement = true
break
}
}
if !foundReplacement {
regularDrops = append(regularDrops, droppedIndex)
}
}

// Regular index drops (not part of online replacement)
for _, index := range regularDrops {
sql := fmt.Sprintf("DROP INDEX IF EXISTS %s;", qualifyEntityName(index.Schema, index.Name, targetSchema))
context := &diffContext{
Type: DiffTypeTableIndex,
Operation: DiffOperationDrop,
Path: fmt.Sprintf("%s.%s.%s", index.Schema, index.Table, index.Name),
Source: index,
CanRunInTransaction: true,
}
collector.collect(context, sql)
}

// Add triggers - already sorted by the Diff operation
for _, trigger := range td.AddedTriggers {
sql := generateTriggerSQLWithMode(trigger, targetSchema)
Expand Down Expand Up @@ -877,98 +845,6 @@ func (td *tableDiff) generateAlterTableStatements(targetSchema string, collector
collector.collect(context, sql)
}

// Process index replacements with improved concurrent approach for zero-downtime
// Sort indexes by name for deterministic output
var sortedOnlineIndexNames []string
for indexName := range onlineReplacements {
sortedOnlineIndexNames = append(sortedOnlineIndexNames, indexName)
}
sort.Strings(sortedOnlineIndexNames)

for _, indexName := range sortedOnlineIndexNames {
newIndex := onlineReplacements[indexName]

// Step 1: DROP old index, Step 2: CREATE new index (canonical approach - has downtime)
dropSQL := fmt.Sprintf("DROP INDEX IF EXISTS %s;", qualifyEntityName(newIndex.Schema, indexName, targetSchema))
canonicalSQL := generateIndexSQL(newIndex, targetSchema, false) // Regular CREATE INDEX

// Create statements for the canonical approach (DROP + CREATE - with downtime)
statements := []SQLStatement{
{
SQL: dropSQL,
CanRunInTransaction: true,
},
{
SQL: canonicalSQL,
CanRunInTransaction: true,
},
}

// Create diff for replacing the index
alterContext := &diffContext{
Type: DiffTypeTableIndex,
Operation: DiffOperationAlter,
Path: fmt.Sprintf("%s.%s.%s", newIndex.Schema, newIndex.Table, indexName),
Source: newIndex,
CanRunInTransaction: true,
}

// Use canonical approach: drop old, create new as a single diff
collector.collectStatements(alterContext, statements)

// Add index comment if present as a separate operation
if newIndex.Comment != "" {
qualifiedIndexName := qualifyEntityName(newIndex.Schema, indexName, targetSchema)
sql := fmt.Sprintf("COMMENT ON INDEX %s IS %s;", qualifiedIndexName, quoteString(newIndex.Comment))

context := &diffContext{
Type: DiffTypeTableIndexComment,
Operation: DiffOperationCreate,
Path: fmt.Sprintf("%s.%s.%s", newIndex.Schema, newIndex.Table, indexName),
Source: newIndex,
CanRunInTransaction: true,
}
collector.collect(context, sql)
}
}

// Add regular indexes (not part of online replacement)
regularAdds := []*ir.Index{}
for _, index := range td.AddedIndexes {
if _, isReplacement := onlineReplacements[index.Name]; !isReplacement {
regularAdds = append(regularAdds, index)
}
}

for _, index := range regularAdds {
canonicalSQL := generateIndexSQL(index, targetSchema, false) // Always generate canonical form

context := &diffContext{
Type: DiffTypeTableIndex,
Operation: DiffOperationCreate,
Path: fmt.Sprintf("%s.%s.%s", index.Schema, index.Table, index.Name),
Source: index,
CanRunInTransaction: true,
}

collector.collect(context, canonicalSQL)

// Add index comment if present
if index.Comment != "" {
indexName := qualifyEntityName(index.Schema, index.Name, targetSchema)
sql := fmt.Sprintf("COMMENT ON INDEX %s IS %s;", indexName, quoteString(index.Comment))

context := &diffContext{
Type: DiffTypeTableIndexComment,
Operation: DiffOperationCreate,
Path: fmt.Sprintf("%s.%s.%s", index.Schema, index.Table, index.Name),
Source: index,
CanRunInTransaction: true,
}
collector.collect(context, sql)
}
}

// Modify triggers - already sorted by the Diff operation
for _, triggerDiff := range td.ModifiedTriggers {
// Constraint triggers don't support CREATE OR REPLACE, so we need to DROP and CREATE
Expand Down Expand Up @@ -1094,27 +970,16 @@ func (td *tableDiff) generateAlterTableStatements(targetSchema string, collector
}
}

// Handle index comment changes
for _, IndexDiff := range td.ModifiedIndexes {
if IndexDiff.Old.Comment != IndexDiff.New.Comment {
indexName := qualifyEntityName(IndexDiff.New.Schema, IndexDiff.New.Name, targetSchema)
var sql string
if IndexDiff.New.Comment == "" {
sql = fmt.Sprintf("COMMENT ON INDEX %s IS NULL;", indexName)
} else {
sql = fmt.Sprintf("COMMENT ON INDEX %s IS %s;", indexName, quoteString(IndexDiff.New.Comment))
}

context := &diffContext{
Type: DiffTypeTableIndexComment,
Operation: DiffOperationAlter,
Path: fmt.Sprintf("%s.%s.%s", IndexDiff.New.Schema, IndexDiff.New.Table, IndexDiff.New.Name),
Source: IndexDiff,
CanRunInTransaction: true,
}
collector.collect(context, sql)
}
}
// Handle index modifications using shared function
generateIndexModifications(
td.DroppedIndexes,
td.AddedIndexes,
td.ModifiedIndexes,
targetSchema,
DiffTypeTableIndex,
DiffTypeTableIndexComment,
collector,
)
}

// ensureCheckClauseParens guarantees that a CHECK clause string contains
Expand Down
Loading