Skip to content

Commit

Permalink
Remove duplicated version vector from pack (#1024)
Browse files Browse the repository at this point in the history
Unify VersionVectors in ChangePack across three scenarios:

1. Pushing pack to server: represents document's current version vector
2. Pulling pack: represents minSyncedVersionVector for GC
3. Pulling pack(snapshot): represents snapshot's version vector at creation

This commit simplifies the codebase and ensures consistent version
vector handling throughout the document synchronization process.
  • Loading branch information
JOOHOJANG authored Oct 2, 2024
1 parent 42ca4e2 commit d267c11
Show file tree
Hide file tree
Showing 12 changed files with 855 additions and 891 deletions.
20 changes: 7 additions & 13 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,19 @@ func FromChangePack(pbPack *api.ChangePack) (*change.Pack, error) {
return nil, err
}

minSyncedVersionVector, err := FromVersionVector(pbPack.MinSyncedVersionVector)
if err != nil {
return nil, err
}

minSyncedTicket, err := fromTimeTicket(pbPack.MinSyncedTicket)
if err != nil {
return nil, err
}

pack := &change.Pack{
DocumentKey: key.Key(pbPack.DocumentKey),
Checkpoint: fromCheckpoint(pbPack.Checkpoint),
Changes: changes,
Snapshot: pbPack.Snapshot,
IsRemoved: pbPack.IsRemoved,
VersionVector: versionVector,
MinSyncedVersionVector: minSyncedVersionVector,
MinSyncedTicket: minSyncedTicket,
DocumentKey: key.Key(pbPack.DocumentKey),
Checkpoint: fromCheckpoint(pbPack.Checkpoint),
Changes: changes,
Snapshot: pbPack.Snapshot,
IsRemoved: pbPack.IsRemoved,
VersionVector: versionVector,
MinSyncedTicket: minSyncedTicket,
}

return pack, nil
Expand Down
6 changes: 0 additions & 6 deletions api/docs/yorkie/v1/resources.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,6 @@ components:
description: Deprecated
title: min_synced_ticket
type: object
minSyncedVersionVector:
$ref: '#/components/schemas/yorkie.v1.VersionVector'
additionalProperties: false
description: ""
title: min_synced_version_vector
type: object
snapshot:
additionalProperties: false
description: ""
Expand Down
6 changes: 0 additions & 6 deletions api/docs/yorkie/v1/yorkie.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -470,12 +470,6 @@ components:
description: Deprecated
title: min_synced_ticket
type: object
minSyncedVersionVector:
$ref: '#/components/schemas/yorkie.v1.VersionVector'
additionalProperties: false
description: ""
title: min_synced_version_vector
type: object
snapshot:
additionalProperties: false
description: ""
Expand Down
1,646 changes: 816 additions & 830 deletions api/yorkie/v1/resources.pb.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion api/yorkie/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ message ChangePack {
repeated Change changes = 4;
bool is_removed = 6;
VersionVector version_vector = 7;
VersionVector min_synced_version_vector = 8;

TimeTicket min_synced_ticket = 5; // Deprecated
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/document/change/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ type Pack struct {
// 2. In response(Snapshot), it is the version vector of the snapshot of the document.
VersionVector time.VersionVector

// TODO(hackerwins): Consider to merge MinSyncedVersionVector with VersionVector.
// MinSyncedVersionVector is the minimum version vector taken by clients who
// attach the document.
MinSyncedVersionVector time.VersionVector

// IsRemoved is a flag that indicates whether the document is removed.
IsRemoved bool

Expand Down
12 changes: 7 additions & 5 deletions pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ func (d *Document) Update(
// ApplyChangePack applies the given change pack into this document.
func (d *Document) ApplyChangePack(pack *change.Pack) error {
// 01. Apply remote changes to both the cloneRoot and the document.
if len(pack.Snapshot) > 0 {
hasSnapshot := len(pack.Snapshot) > 0

if hasSnapshot {
d.cloneRoot = nil
d.clonePresences = nil
if err := d.doc.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq, pack.VersionVector); err != nil {
Expand Down Expand Up @@ -215,13 +217,13 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error {
d.doc.checkpoint = d.doc.checkpoint.Forward(pack.Checkpoint)

// 04. Do Garbage collection.
if !d.options.DisableGC {
d.GarbageCollect(pack.MinSyncedVersionVector)
if !d.options.DisableGC && !hasSnapshot {
d.GarbageCollect(pack.VersionVector)
}

// 05. Remove detached client's lamport from version vector if it exists
if pack.MinSyncedVersionVector != nil {
actorIDs, err := pack.MinSyncedVersionVector.Keys()
if pack.VersionVector != nil && !hasSnapshot {
actorIDs, err := pack.VersionVector.Keys()
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/document/document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ func TestDocument(t *testing.T) {
docB := document.New("doc")
docB.SetActor(actorB)
assert.Equal(t, "{}", docB.VersionVector().Marshal())
// NOTE(JOOHOJANG): Normally, docB's Lamport timestamp should be included in pack.versionVector because pack is applied after docB is attached.
// However, since this is not the case in this test method, docB's Lamport timestamp is manually added to packA's versionVector.
// In actual use, since changePacks cannot be exchanged directly between clients without going through a server, the following handling was added.
packA.VersionVector.Set(docB.ActorID(), docB.VersionVector().VersionOf(docB.ActorID()))
assert.NoError(t, docB.ApplyChangePack(packA))
assert.Equal(t, "{000000000000000000000001:2,000000000000000000000002:3}", docB.VersionVector().Marshal())

Expand Down
12 changes: 7 additions & 5 deletions pkg/document/internal_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,10 @@ func (d *InternalDocument) HasLocalChanges() bool {

// ApplyChangePack applies the given change pack into this document.
func (d *InternalDocument) ApplyChangePack(pack *change.Pack, disableGC bool) error {
hasSnapshot := len(pack.Snapshot) > 0

// 01. Apply remote changes to both the cloneRoot and the document.
if len(pack.Snapshot) > 0 {
if hasSnapshot {
if err := d.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq, pack.VersionVector); err != nil {
return err
}
Expand All @@ -167,15 +169,15 @@ func (d *InternalDocument) ApplyChangePack(pack *change.Pack, disableGC bool) er
// 03. Update the checkpoint.
d.checkpoint = d.checkpoint.Forward(pack.Checkpoint)

if !disableGC && pack.MinSyncedTicket != nil {
if _, err := d.GarbageCollect(pack.MinSyncedVersionVector); err != nil {
if !disableGC && pack.VersionVector != nil && !hasSnapshot {
if _, err := d.GarbageCollect(pack.VersionVector); err != nil {
return err
}
}

// 04. Remove detached client's lamport from version vector if it exists
if pack.MinSyncedVersionVector != nil {
actorIDs, err := pack.MinSyncedVersionVector.Keys()
if pack.VersionVector != nil && !hasSnapshot {
actorIDs, err := pack.VersionVector.Keys()
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ func PushPull(
if err != nil {
return nil, err
}
respPack.MinSyncedVersionVector = minSyncedVersionVector
if respPack.SnapshotLen() == 0 {
respPack.VersionVector = minSyncedVersionVector
}

// TODO(hackerwins): This is a previous implementation before the version
// vector was introduced. But it is necessary to support the previous
Expand Down Expand Up @@ -214,7 +216,7 @@ func PushPull(
ctx,
be,
docInfo,
minSyncedTicket,
minSyncedVersionVector,
); err != nil {
logging.From(ctx).Error(err)
}
Expand Down
14 changes: 0 additions & 14 deletions server/packs/serverpacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ type ServerPack struct {
// 2. In response(Snapshot), it is the version vector of the snapshot of the document.
VersionVector time.VersionVector

// TODO(hackerwins): Consider to merge MinSyncedVersionVector with VersionVector.
// MinSyncedVersionVector is the minimum version vector taken by clients who
// attach the document.
MinSyncedVersionVector time.VersionVector

// IsRemoved is a flag that indicates whether the document is removed.
IsRemoved bool

Expand Down Expand Up @@ -150,15 +145,6 @@ func (p *ServerPack) ToPBChangePack() (*api.ChangePack, error) {

pbPack.VersionVector = pbVersionVector

if p.MinSyncedVersionVector != nil {
pbMinSyncedVersionVector, err := converter.ToVersionVector(p.MinSyncedVersionVector)
if err != nil {
return nil, err
}

pbPack.MinSyncedVersionVector = pbMinSyncedVersionVector
}

return pbPack, nil
}

Expand Down
14 changes: 10 additions & 4 deletions server/packs/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func storeSnapshot(
ctx context.Context,
be *backend.Backend,
docInfo *database.DocInfo,
minSyncedTicket *time.Ticket,
minSyncedVersionVector time.VersionVector,
) error {
// 01. get the closest snapshot's metadata of this docInfo
docRefKey := docInfo.RefKey()
Expand Down Expand Up @@ -91,13 +91,19 @@ func storeSnapshot(
nil,
nil,
)
pack.MinSyncedTicket = minSyncedTicket

if err := doc.ApplyChangePack(pack, be.Config.SnapshotDisableGC); err != nil {
return err
}

// 04. save the snapshot of the docInfo
// 04. perform garbage collect to remove tombstones
if !be.Config.SnapshotDisableGC {
if _, err := doc.GarbageCollect(minSyncedVersionVector); err != nil {
return err
}
}

// 05. save the snapshot of the docInfo
if err := be.DB.CreateSnapshotInfo(
ctx,
docRefKey,
Expand All @@ -106,7 +112,7 @@ func storeSnapshot(
return err
}

// 05. delete changes before the smallest in `syncedseqs` to save storage.
// 06. delete changes before the smallest in `syncedseqs` to save storage.
if be.Config.SnapshotWithPurgingChanges {
if err := be.DB.PurgeStaleChanges(
ctx,
Expand Down

0 comments on commit d267c11

Please sign in to comment.