Skip to content

Commit

Permalink
fix clean collection meta command (#229)
Browse files Browse the repository at this point in the history
Signed-off-by: longjiquan <jiquan.long@zilliz.com>
  • Loading branch information
longjiquan committed Dec 14, 2023
1 parent b4d65c5 commit 94afd38
Showing 1 changed file with 28 additions and 11 deletions.
39 changes: 28 additions & 11 deletions states/etcd/remove/collection_dropping.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,34 @@ func cleanCollectionDropMeta(cli clientv3.KV, basePath string, collection *model
fmt.Printf("Collection %s[%d] key is empty string, cannot perform cleanup", collection.Schema.Name, collection.ID)
return
}
// remove collection meta
ops = append(ops, clientv3.OpDelete(collection.Key()))
fmt.Printf("Collection Meta: %s\n", collection.Key())
// remove collection field meta
fieldsPrefix := path.Join(basePath, fmt.Sprintf("root-coord/fields/%d", collection.ID)) + "/"
fmt.Printf("Collection Field Meta(Prefix): %s\n", collection.Key())
ops = append(ops, clientv3.OpDelete(fieldsPrefix, clientv3.WithPrefix()))
// remove collection partition meta
partitionsPrefix := path.Join(basePath, fmt.Sprintf("root-coord/partitions/%d", collection.ID)) + "/"
fmt.Printf("Collection Partition Meta(Prefix): %s\n", partitionsPrefix)
ops = append(ops, clientv3.OpDelete(partitionsPrefix, clientv3.WithPrefix()))

// better to remove collection meta finally for better atomicity.
// TODO: alias meta can't be cleaned conveniently.
prefixes := []string{
// remove collection field meta
path.Join(basePath, fmt.Sprintf("root-coord/fields/%d", collection.ID)) + "/",
path.Join(basePath, common.SnapshotPrefix, fmt.Sprintf("root-coord/fields/%d", collection.ID)) + "/",

// remove collection partition meta
path.Join(basePath, fmt.Sprintf("root-coord/partitions/%d", collection.ID)) + "/",
path.Join(basePath, common.SnapshotPrefix, fmt.Sprintf("root-coord/partitions/%d", collection.ID)) + "/",
}

var collectionKey string
if collection.DBID != 0 {
collectionKey = fmt.Sprintf("root-coord/database/collection-info/%d/%d", collection.DBID, collection.ID)
} else {
collectionKey = fmt.Sprintf("root-coord/collection/%d", collection.ID)
}

// collection will have timestamp suffix, which should be also removed by prefix.
prefixes = append(prefixes, path.Join(basePath, collectionKey))
prefixes = append(prefixes, path.Join(basePath, common.SnapshotPrefix, collectionKey))

for _, prefix := range prefixes {
ops = append(ops, clientv3.OpDelete(prefix, clientv3.WithPrefix()))
fmt.Printf("%s will be cleaned\n", prefix)
}

channelWatchInfos, err := common.ListChannelWatch(context.Background(), cli, basePath, etcdversion.GetVersion(), func(cw *models.ChannelWatch) bool {
return cw.Vchan.CollectionID == collection.ID
Expand Down

0 comments on commit 94afd38

Please sign in to comment.