Skip to content

Broken Insertion Cursor #173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ extension DiskPersistence.Datastore.Index {

var bytesForEntry: Bytes?
var isEntryComplete = false
var previousBlock: DiskPersistence.CursorBlock? = nil
var previousCompleteBlock: DiskPersistence.CursorBlock? = nil
var currentBlock: DiskPersistence.CursorBlock? = nil
var pageIndex = startingPageIndex

Expand Down Expand Up @@ -613,8 +613,15 @@ extension DiskPersistence.Datastore.Index {
/// In the final position, lets save and continue.
bytesForEntry?.append(contentsOf: bytes)
case .tail(let bytes):
/// In the first position, lets skip it.
guard bytesForEntry != nil else { continue }
/// In the first position, lets skip it, but add it as the last complete block in case the next one is a step too far.
guard bytesForEntry != nil else {
previousCompleteBlock = DiskPersistence.CursorBlock(
pageIndex: pageIndex,
page: page,
blockIndex: blockIndex
)
continue
}
/// In the final position, lets save and stop.
bytesForEntry?.append(contentsOf: bytes)
isEntryComplete = true
Expand All @@ -625,7 +632,14 @@ extension DiskPersistence.Datastore.Index {
page: page,
blockIndex: blockIndex
)
defer { previousBlock = currentBlock }

/// Make sure to only keep a reference to the end of the last complete block, so if we roll back, we'll have a valid cursor
defer {
switch block {
case .complete, .tail: previousCompleteBlock = currentBlock
case .head, .slice: break
}
}

if let bytes = bytesForEntry, isEntryComplete {
let entry = try DatastorePageEntry(bytes: bytes, isPartial: false)
Expand All @@ -643,7 +657,7 @@ extension DiskPersistence.Datastore.Index {
persistence: datastore.snapshot.persistence,
datastore: datastore,
index: self,
insertAfter: previousBlock
insertAfter: previousCompleteBlock
)
}

Expand All @@ -658,7 +672,7 @@ extension DiskPersistence.Datastore.Index {
persistence: datastore.snapshot.persistence,
datastore: datastore,
index: self,
insertAfter: previousBlock
insertAfter: previousCompleteBlock
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,66 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase {
XCTAssertEqual(result, expectedIndex, file: file, line: line)
}

func assertInsertionCursor(
proposedEntry: UInt8,
pages: [[DatastorePageEntryBlock]],
pageIndex: Int?,
blockIndex: Int?,
file: StaticString = #filePath,
line: UInt = #line
) async throws {
let persistence = DiskPersistence(readOnlyURL: temporaryStoreURL)
let snapshot = Snapshot(
id: .init(rawValue: "Snapshot"),
persistence: persistence
)
let datastore = DiskPersistence.Datastore(
id: .init(rawValue: "Datastore"),
snapshot: snapshot
)
let index = DiskPersistence.Datastore.Index(
datastore: datastore,
id: .primary(manifest: .init(rawValue: "Index")),
manifest: DatastoreIndexManifest(
id: .init(rawValue: "Index"),
orderedPages: pages.enumerated().map { (index, _) in
.existing(.init(rawValue: "Page \(index)"))
}
)
)

var pageLookup: [DatastorePageIdentifier : DiskPersistence<ReadOnly>.Datastore.Page] = [:]
var pageInfos: [DatastoreIndexManifest.PageInfo] = []

for (index, blocks) in pages.enumerated() {
let pageID = DatastorePageIdentifier(rawValue: "Page \(index)")
let page = DiskPersistence<ReadOnly>.Datastore.Page(
datastore: datastore,
id: .init(
index: .primary(manifest: .init(rawValue: "Index")),
page: pageID
),
blocks: blocks
)
pageLookup[pageID] = page
pageInfos.append(.existing(pageID))
}

let result = try await index.insertionCursor(for: RangeBoundExpression.including(proposedEntry), in: pageInfos) { pageID in
pageLookup[pageID]!
} comparator: { lhs, rhs in
lhs.sortOrder(comparedTo: rhs.headers[0][0], order: .ascending)
}

XCTAssertTrue(result.persistence === persistence, file: file, line: line)
XCTAssertTrue(result.datastore === datastore, file: file, line: line)
XCTAssertEqual(result.index, index, file: file, line: line)
XCTAssertEqual(result.insertAfter?.pageIndex, pageIndex, file: file, line: line)
let pageID = await result.insertAfter?.page.id.page
XCTAssertEqual(pageID, pageIndex.map({ DatastorePageIdentifier(rawValue: "Page \($0)") }), file: file, line: line)
XCTAssertEqual(result.insertAfter?.blockIndex, blockIndex, file: file, line: line)
}

func testEmptyPagesSearch() async throws {
try await assertPageSearch(proposedEntry: 0, pages: [], expectedIndex: nil)
try await assertPageSearch(proposedEntry: 0, pages: [], expectedIndex: nil)
Expand Down Expand Up @@ -330,4 +390,24 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase {
wait(for: [exp], timeout: 200.0)
}
}

func testNonContiguousInsertionCursor() async throws {
let entry1 = DatastorePageEntry(headers: [[1]], content: [1]).blocks(remainingPageSpace: 1024, maxPageSpace: 1024)
let entry2 = DatastorePageEntry(headers: [[2]], content: [2]).blocks(remainingPageSpace: 1024, maxPageSpace: 1024)
let entry3 = DatastorePageEntry(headers: [[3]], content: Array(repeating: 3, count: 100)).blocks(remainingPageSpace: 20, maxPageSpace: 60)
let entry4 = DatastorePageEntry(headers: [[4]], content: [4]).blocks(remainingPageSpace: 1024, maxPageSpace: 1024)

let pages = [
[entry1[0], entry2[0], entry3[0]],
[entry3[1]],
[entry3[2], entry4[0]]
]

try await assertInsertionCursor(proposedEntry: 0, pages: pages, pageIndex: nil, blockIndex: nil)
try await assertInsertionCursor(proposedEntry: 1, pages: pages, pageIndex: nil, blockIndex: nil)
try await assertInsertionCursor(proposedEntry: 2, pages: pages, pageIndex: 0, blockIndex: 0)
try await assertInsertionCursor(proposedEntry: 3, pages: pages, pageIndex: 0, blockIndex: 1)
try await assertInsertionCursor(proposedEntry: 4, pages: pages, pageIndex: 2, blockIndex: 0)
try await assertInsertionCursor(proposedEntry: 5, pages: pages, pageIndex: 2, blockIndex: 1)
}
}