From 26b9032b4c2d246f71aabda63a2e8a338a4c5aff Mon Sep 17 00:00:00 2001 From: rlawrenc <6463856+rlawrenc@users.noreply.github.com> Date: Tue, 10 Feb 2026 22:09:20 +0000 Subject: [PATCH] update distribution files --- lib/Distribution/embedDB.c | 932 +++++++++++++++++++++++++------------ lib/Distribution/embedDB.h | 44 +- 2 files changed, 666 insertions(+), 310 deletions(-) diff --git a/lib/Distribution/embedDB.c b/lib/Distribution/embedDB.c index ee42905..bf90397 100644 --- a/lib/Distribution/embedDB.c +++ b/lib/Distribution/embedDB.c @@ -483,7 +483,7 @@ int8_t embedDBInitVarDataFromFile(embedDBState *state); int8_t shiftRecordLevelConsistencyBlocks(embedDBState *state); void embedDBInitSplineFromFile(embedDBState *state); int32_t getMaxError(embedDBState *state, void *buffer); -void updateMaxiumError(embedDBState *state, void *buffer); +void updateMaximumError(embedDBState *state, void *buffer); int8_t embedDBSetupVarDataStream(embedDBState *state, void *key, embedDBVarDataStream **varData, id_t recordNumber); uint32_t cleanSpline(embedDBState *state, uint32_t minPageNumber); void readToWriteBuf(embedDBState *state); @@ -634,7 +634,7 @@ int8_t embedDBInit(embedDBState *state, size_t indexMaxError) { return -1; } - /* Initalize the spline structure if being used */ + /* Initialize the spline structure if being used */ if (!EMBEDDB_USING_BINARY_SEARCH(state->parameters)) { if (state->numSplinePoints < 4) { #ifdef PRINT_ERRORS @@ -763,7 +763,7 @@ int8_t embedDBInitDataFromFile(embedDBState *state) { hasData = true; maxLogicalPageId = logicalPageId; physicalPageId++; - updateMaxiumError(state, buffer); + updateMaximumError(state, buffer); count++; i = 2; } else { @@ -784,7 +784,7 @@ int8_t embedDBInitDataFromFile(embedDBState *state) { if (validData && logicalPageId == maxLogicalPageId + 1) { maxLogicalPageId = logicalPageId; physicalPageId++; - updateMaxiumError(state, buffer); + updateMaximumError(state, buffer); moreToRead = !(readPage(state, physicalPageId)); count++; } else { @@ -807,7 +807,7 @@ int8_t embedDBInitDataFromFile(embedDBState *state) { physicalPageId = (physicalPageId + pagesToBlockBoundary) % state->numDataPages; moreToRead = !(readPage(state, physicalPageId)); - /* there should have been more to read becuase the file should not be empty at this point if it was not empty at the previous block */ + /* there should have been more to read because the file should not be empty at this point if it was not empty at the previous block */ if (!moreToRead) { return -1; } @@ -864,7 +864,7 @@ int8_t embedDBInitDataFromFileWithRecordLevelConsistency(embedDBState *state) { hasPermanentData = true; maxLogicalPageId = logicalPageId; physicalPageId++; - updateMaxiumError(state, buffer); + updateMaximumError(state, buffer); count++; i = 4; } else { @@ -882,7 +882,7 @@ int8_t embedDBInitDataFromFileWithRecordLevelConsistency(embedDBState *state) { if (validData && logicalPageId == maxLogicalPageId + 1) { maxLogicalPageId = logicalPageId; physicalPageId++; - updateMaxiumError(state, buffer); + updateMaximumError(state, buffer); moreToRead = !(readPage(state, physicalPageId)); count++; } else { @@ -916,7 +916,7 @@ int8_t embedDBInitDataFromFileWithRecordLevelConsistency(embedDBState *state) { /* record-level consistency recovery algorithm */ uint32_t numPagesRead = 0; uint32_t numPagesToRead = blockSize * 2; - uint32_t rlcMaxLogicialPageNumber = UINT32_MAX; + uint32_t rlcMaxLogicalPageNumber = UINT32_MAX; uint32_t rlcMaxRecordCount = UINT32_MAX; uint32_t rlcMaxPage = UINT32_MAX; moreToRead = !(readPage(state, physicalPageId)); @@ -929,7 +929,7 @@ int8_t embedDBInitDataFromFileWithRecordLevelConsistency(embedDBState *state) { uint32_t numRecords = EMBEDDB_GET_COUNT(buffer); if (rlcMaxRecordCount == UINT32_MAX || numRecords > rlcMaxRecordCount) { rlcMaxRecordCount = numRecords; - rlcMaxLogicialPageNumber = logicalPageId; + rlcMaxLogicalPageNumber = logicalPageId; rlcMaxPage = numPagesRead; } } @@ -938,11 +938,11 @@ int8_t embedDBInitDataFromFileWithRecordLevelConsistency(embedDBState *state) { numPagesRead++; } - /* need to find larged record-level consistency page to place back into the buffer and either one or both of the record-level consistency pages */ + /* need to find large record-level consistency page to place back into the buffer and either one or both of the record-level consistency pages */ uint32_t eraseStartingPage = 0; uint32_t eraseEndingPage = 0; uint32_t numBlocksToErase = 0; - if (rlcMaxLogicialPageNumber == UINT32_MAX) { + if (rlcMaxLogicalPageNumber == UINT32_MAX) { eraseStartingPage = state->rlcPhysicalStartingPage % state->numDataPages; numBlocksToErase = 2; } else { @@ -1079,7 +1079,7 @@ int8_t embedDBInitIndex(embedDBState *state) { int8_t embedDBInitIndexFromFile(embedDBState *state) { id_t logicalIndexPageId = 0; - id_t maxLogicaIndexPageId = 0; + id_t maxLogicalIndexPageId = 0; id_t physicalIndexPageId = 0; /* This will become zero if there is no more to read */ @@ -1091,13 +1091,13 @@ int8_t embedDBInitIndexFromFile(embedDBState *state) { while (moreToRead && count < state->numIndexPages) { memcpy(&logicalIndexPageId, buffer, sizeof(id_t)); - if (count == 0 || logicalIndexPageId == maxLogicaIndexPageId + 1) { - maxLogicaIndexPageId = logicalIndexPageId; + if (count == 0 || logicalIndexPageId == maxLogicalIndexPageId + 1) { + maxLogicalIndexPageId = logicalIndexPageId; physicalIndexPageId++; moreToRead = !(readIndexPage(state, physicalIndexPageId)); count++; } else { - haveWrappedInMemory = logicalIndexPageId == maxLogicaIndexPageId - state->numIndexPages + 1; + haveWrappedInMemory = logicalIndexPageId == maxLogicalIndexPageId - state->numIndexPages + 1; break; } } @@ -1105,20 +1105,20 @@ int8_t embedDBInitIndexFromFile(embedDBState *state) { if (count == 0) return 0; - state->nextIdxPageId = maxLogicaIndexPageId + 1; + state->nextIdxPageId = maxLogicalIndexPageId + 1; id_t physicalPageIDOfSmallestData = 0; if (haveWrappedInMemory) { physicalPageIDOfSmallestData = logicalIndexPageId % state->numIndexPages; } readIndexPage(state, physicalPageIDOfSmallestData); memcpy(&(state->minIndexPageId), buffer, sizeof(id_t)); - state->numAvailIndexPages = state->numIndexPages + state->minIndexPageId - maxLogicaIndexPageId - 1; + state->numAvailIndexPages = state->numIndexPages + state->minIndexPageId - maxLogicalIndexPageId - 1; return 0; } int8_t embedDBInitVarData(embedDBState *state) { - // Initialize variable data outpt buffer + // Initialize variable data output buffer initBufferPage(state, EMBEDDB_VAR_WRITE_BUFFER(state->parameters)); state->variableDataHeaderSize = state->keySize + sizeof(id_t); @@ -1219,7 +1219,7 @@ int8_t embedDBInitVarDataFromFile(embedDBState *state) { physicalVariablePageId = (physicalVariablePageId + pagesToBlockBoundary) % state->numVarPages; moreToRead = !(readVariablePage(state, physicalVariablePageId)); - /* there should have been more to read becuase the file should not be empty at this point if it was not empty at the previous block */ + /* there should have been more to read because the file should not be empty at this point if it was not empty at the previous block */ if (!moreToRead) { return -1; } @@ -1477,7 +1477,7 @@ int8_t embedDBPut(embedDBState *state, void *key, void *data) { memcpy((void *)((int8_t *)buf + EMBEDDB_IDX_HEADER_SIZE + state->bitmapSize * idxcount), bm, state->bitmapSize); } - updateMaxiumError(state, state->buffer); + updateMaximumError(state, state->buffer); count = 0; initBufferPage(state, 0); @@ -1561,7 +1561,7 @@ int8_t shiftRecordLevelConsistencyBlocks(embedDBState *state) { uint32_t eraseStartingPage = state->rlcPhysicalStartingPage; uint32_t eraseEndingPage = 0; - /* if we have wraped, we need to erase an additional block as the block we are shifting into is not empty */ + /* if we have wrapped, we need to erase an additional block as the block we are shifting into is not empty */ bool haveWrapped = (state->minDataPageId % state->numDataPages) == ((state->rlcPhysicalStartingPage + numRecordLevelConsistencyPages) % state->numDataPages); uint32_t numBlocksToErase = haveWrapped ? 2 : 3; @@ -1597,7 +1597,7 @@ int8_t shiftRecordLevelConsistencyBlocks(embedDBState *state) { return 0; } -void updateMaxiumError(embedDBState *state, void *buffer) { +void updateMaximumError(embedDBState *state, void *buffer) { // Calculate error within the page int32_t maxError = getMaxError(state, buffer); if (state->maxError < maxError) { @@ -2059,7 +2059,7 @@ void embedDBCloseIterator(embedDBIterator *it) { /** * @brief Flushes output buffer. * @param state algorithm state structure - * @returns 0 if successul and a non-zero value otherwise + * @returns 0 if successful and a non-zero value otherwise */ int8_t embedDBFlushVar(embedDBState *state) { /* Check if we actually have any variable data in the buffer */ @@ -2089,7 +2089,7 @@ int8_t embedDBFlushVar(embedDBState *state) { /** * @brief Flushes output buffer. * @param state algorithm state structure - * @returns 0 if successul and a non-zero value otherwise + * @returns 0 if successful and a non-zero value otherwise */ int8_t embedDBFlush(embedDBState *state) { // As the first buffer is the data write buffer, no address change is required @@ -2281,7 +2281,7 @@ int8_t embedDBNextVar(embedDBState *state, embedDBIterator *it, void *key, void * @param state embedDB algorithm state structure * @param key Key for the record * @param varData Return variable for variable data as a embedDBVarDataStream (Unallocated). Returns NULL if no variable data. **Be sure to free the stream after you are done with it** - * @return Returns 0 if sucessfull or no variable data for the record, 1 if the records variable data was overwritten, 2 if the page failed to read, and 3 if the memorey failed to allocate. + * @return Returns 0 if successful or no variable data for the record, 1 if the records variable data was overwritten, 2 if the page failed to read, and 3 if the memory failed to allocate. */ int8_t embedDBSetupVarDataStream(embedDBState *state, void *key, embedDBVarDataStream **varData, id_t recordNumber) { void *dataBuf = (int8_t *)state->buffer + state->pageSize * EMBEDDB_DATA_READ_BUFFER; @@ -2521,7 +2521,7 @@ int8_t writeTemporaryPage(embedDBState *state, void *buffer) { /** * @brief Calculates the number of spline points not in use by embedDB and deletes them * @param state embedDB algorithm state structure - * @param key The minimim key embedDB still needs points for + * @param key The minimum key embedDB still needs points for * @return Returns the number of points deleted */ uint32_t cleanSpline(embedDBState *state, uint32_t minPageNumber) { @@ -2601,7 +2601,7 @@ id_t writeVariablePage(embedDBState *state, void *buffer) { return -1; } - // Make sure the address being witten to wraps around + // Make sure the address being written to wraps around id_t physicalPageId = state->nextVarPageId % state->numVarPages; // Erase data if needed @@ -3473,6 +3473,7 @@ void closeOrderBy(embedDBOperator *op) { * @param dbState The database state * @param input The operator that this operator can pull records from * @param colNum The column that is being sorted on + * @param limit The first values to be read and sorted - not like a true limit at the moment * @param compareFn The function being used to make comparisons between row data */ embedDBOperator *createOrderByOperator(embedDBState *dbState, embedDBOperator *input, int8_t colNum, int32_t limit, int8_t (*compareFn)(void *a, void *b)) { @@ -3834,7 +3835,7 @@ void closeKeyJoin(embedDBOperator *op) { } /** - * @brief Creates an operator for perfoming an equijoin on the keys (sorted and distinct) of two tables + * @brief Creates an operator for performing an equi-join on the keys (sorted and distinct) of two tables */ embedDBOperator *createKeyJoinOperator(embedDBOperator *input1, embedDBOperator *input2) { embedDBOperator *op = malloc(sizeof(embedDBOperator)); @@ -4583,7 +4584,9 @@ void executeRules(embedDBState *state, void *key, void *data) { handleCustomQuery(state, state->rules[i], key, data); break; default: +#ifdef PRINT_ERRORS printf("ERROR: Unsupported rule type\n"); +#endif } } } @@ -4662,7 +4665,9 @@ embedDBOperator *createOperator(embedDBState *state, activeRule *rule, void ***a it->minKey = minKeyPtr; } } else { +#ifdef PRINT_ERRORS printf("ERROR: Unsupported key size\n"); +#endif return NULL; } @@ -4686,7 +4691,9 @@ embedDBOperator *createOperator(embedDBState *state, activeRule *rule, void ***a aggFunc = createMinAggregate(rule->colNum, rule->schema->columnSizes[rule->colNum]); break; default: +#ifdef PRINT_ERRORS printf("ERROR: Unsupported rule type\n"); +#endif } embedDBAggregateFunc *aggFuncs = (embedDBAggregateFunc *)malloc(1 * sizeof(embedDBAggregateFunc)); @@ -4730,7 +4737,9 @@ void executeComparison(activeRule *rule, void *aggregateValue, Comparator compar if (comparisonResult != 0) rule->callback(aggregateValue, data, rule->context); break; default: +#ifdef PRINT_ERRORS printf("ERROR: Unsupported operation\n"); +#endif } } @@ -4749,7 +4758,9 @@ void handleGetMinMax(embedDBState *state, activeRule *rule, void *key, void *dat int64_t minmax = GetMinMax64(state, rule, key); executeComparison(rule, &minmax, int64Comparator, data); } else { +#ifdef PRINT_ERRORS printf("ERROR: Unsupported column size\n"); +#endif } } @@ -4769,7 +4780,9 @@ void handleCustomQuery(embedDBState *state, activeRule *rule, void *key, void *d executeComparison(rule, result, doubleComparator, data); break; default: +#ifdef PRINT_ERRORS printf("ERROR: Unsupported return type\n"); +#endif } } @@ -4816,8 +4829,14 @@ void handleCustomQuery(embedDBState *state, activeRule *rule, void *key, void *d // #define DEBUG_OUTPUT 1 // #define DEBUG_READ 1 // #define DEBUG_HEAP 0 - +// #define ADAPTIVE_SORT_PRINT // #define ADAPTIVE_SORT_PRINT_FINISH +#if defined(DEBUG) || defined(DEBUG_OUTPUT) || defined(DEBUG_READ) || defined(DEBUG_HEAP) || defined(ADAPTIVE_SORT_PRINT) || defined(ADAPTIVE_SORT_PRINT_FINISH) +#else +#ifndef debug_log +#define debug_log(...) ((void)0) +#endif +#endif /** * Prints the contents of the heap. Used for debugging. @@ -4829,22 +4848,22 @@ void print_heap(char *buffer, int32_t heap_start_offset, int heap_size, int list int j; for (aa = 0; aa < 1; aa++) { addr = buffer + heap_start_offset; - printf("heap: "); + debug_log("heap: "); for (j = 0; j < heap_size; j++) - printf(" %d", *(int32_t *)(addr - j * es->record_size)); - printf("| "); + debug_log(" %d", *(int32_t *)(addr - j * es->record_size + es->key_offset)); + debug_log("| "); } - printf(" "); + debug_log(" "); // Prints the list for (aa = 0; aa < 1; aa++) { addr = buffer + es->page_size; - printf("list: "); + debug_log("list: "); for (j = 0; j < list_size; j++) - printf(" %d", *(int32_t *)(addr + j * es->record_size)); - printf("| "); + debug_log(" %d", *(int32_t *)(addr + j * es->record_size + es->key_offset)); + debug_log("| "); } - printf("\n"); + debug_log("\n"); } /** @@ -4903,11 +4922,11 @@ int adaptive_sort( /* Note: Could be int8_t as larger than 255 is above cutoff for using MinSort. */ uint8_t numDistinctInRun = 0; /* Number of distinct values in current run */ - int optimistic = true; + int optimistic = false; if (optimistic) { // Do FLASH MinSort init first #ifdef DEBUG - printf("*Optimistic*\n"); + debug_log("*Optimistic*\n"); #endif MinSortState ms; @@ -4923,18 +4942,18 @@ int adaptive_sort( int32_t nobSortCost = numPasses * (10 + writeToReadRatio) / 10; #ifdef DEBUG - printf("Adaptive calculation.\n"); - printf("NOB sort cost. # runs: %d", numSublist); - printf(" # passes: %d cost: %d\n", numPasses, nobSortCost); - printf("MinSort cost. Num sublists: %d ", numSublist); - printf(" Avg. distinct/sublist: %d\n", avgDistinct / 10); + debug_log("Adaptive calculation.\n"); + debug_log("NOB sort cost. # runs: %d", numSublist); + debug_log(" # passes: %d cost: %d\n", numPasses, nobSortCost); + debug_log("MinSort cost. Num sublists: %d ", numSublist); + debug_log(" Avg. distinct/sublist: %d\n", avgDistinct / 10); #endif if (avgDistinct < nobSortCost) // if (true) { #ifdef DEBUG - printf("Performing MinSort Optimistic\n"); + debug_log("Performing MinSort Optimistic\n"); #endif int16_t count = 0; @@ -4952,7 +4971,10 @@ int adaptive_sort( if (count == values_per_page) { *((int32_t *)outputBuffer) = blockIndex; /* Block index */ *((int16_t *)(outputBuffer + BLOCK_COUNT_OFFSET)) = count; /* Block record count */ - +#ifdef DEBUG + debug_log("Writing page adaptive sort: blockIndex=%d, count=%d, filePosition=%ld\n", + blockIndex, count, lastWritePos / PAGE_SIZE); +#endif // Write block to the ouput file if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->write(outputBuffer, blockIndex, es->page_size, outputFile)) { return 9; // Return error code if writing to the output file fails @@ -4963,10 +4985,11 @@ int adaptive_sort( metric->num_writes++; #ifdef DEBUG_OUTPUT - printf("Wrote output block. Block index: %d\n", blockIndex); + debug_log("Wrote output block. Block index: %d\n", blockIndex); for (int k = 0; k < values_per_page; k++) { - printf("%3d: 1 Output Record: %d\n", k, outputBuffer + es->headerSize + k * es->record_size + es->key_offset); + debug_log("%3d: 1 Output Record: %d\n", k, outputBuffer + es->headerSize + k * es->record_size + es->key_offset); } + #endif } } @@ -4975,7 +4998,10 @@ int adaptive_sort( if (count > 0) { *((int32_t *)outputBuffer) = blockIndex; /* Block index */ *((int16_t *)(outputBuffer + BLOCK_COUNT_OFFSET)) = count; /* Block record count */ - +#ifdef DEBUG + debug_log("Writing last page adaptive: blockIndex=%d, count=%d, filePosition=%ld\n", + blockIndex, count, lastWritePos / PAGE_SIZE); +#endif if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->write(outputBuffer, blockIndex, es->page_size, outputFile)) { return 9; // Return error code if writing to the output file fails } @@ -4985,10 +5011,11 @@ int adaptive_sort( metric->num_writes++; #ifdef DEBUG_OUTPUT - printf("Wrote output block. Block index: %d\n", blockIndex); + debug_log("Wrote output block. Block index: %d\n", blockIndex); for (int k = 0; k < values_per_page; k++) { - printf("%3d: 2 Output Record: %d\n", k, *(uint32_t *)(outputBuffer + es->headerSize + k * es->record_size + es->key_offset)); + debug_log("%3d: 2 Output Record: %d\n", k, *(uint32_t *)(outputBuffer + es->headerSize + k * es->record_size + es->key_offset)); } + #endif } @@ -5012,41 +5039,108 @@ int adaptive_sort( int32_t heapStartOffset = bufferSizeInBlocks * es->page_size - es->record_size; int32_t listSize = 0; - void *lastOutputKey = malloc(es->record_size); /* Pointer to memory storing value of last key output */ + void *lastOutputKey = malloc(es->record_size); int8_t haveOutputKey = 0; int32_t sublistSize = 0; /* size in blocks */ int32_t outputCount = 0; /* number of values in output block */ int32_t recordsLeft = 0; /* number of records in buffer */ void *heapVal, *inputVal; - // Fill all blocks other than the first with tuples - addr = buffer + es->page_size; - for (i = 0; i < (bufferSizeInBlocks - 1) * tuplesPerPage; i++) { + // Calculate safe initial heap size + // Need space for: heap (grows down) + list (grows up) + input page (block 0) + // Available buffer space = blocks 1 to (bufferSizeInBlocks-1) + // Reserve last block's worth of space for the list to grow safely + int32_t maxRecordsInBuffer = (bufferSizeInBlocks - 1) * tuplesPerPage; + +#ifdef DEBUG + debug_log("DEBUG: Buffer setup:\n"); + debug_log(" bufferSizeInBlocks=%d, tuplesPerPage=%d\n", bufferSizeInBlocks, tuplesPerPage); + debug_log(" maxRecordsInBuffer=%d", maxRecordsInBuffer); + debug_log(" heapStartOffset=%d, page_size=%d, record_size=%d\n", + heapStartOffset, es->page_size, es->record_size); + debug_log(" Buffer layout: [I/O Page 0][Data Pages 1-%d][Heap grows down from top]\n", bufferSizeInBlocks - 1); +#endif + + ((file_iterator_state_t *)iteratorState)->fileInterface->seek(0, outputFile); + lastWritePos = 0; + addr = buffer + es->page_size; // Start after I/O block + for (i = 0; i < maxRecordsInBuffer; i++) { status = !iterator(sortData, addr); if (status == 0) - break; + break; // No more records available recordsRead++; addr += es->record_size; } +#ifdef DEBUG + debug_log("DEBUG: Initial load completed:\n"); + debug_log(" recordsRead=%d (requested %d)\n", recordsRead, maxRecordsInBuffer); + debug_log(" Data spans from offset %d to %d\n", + (int)(es->page_size), (int)(es->page_size + recordsRead * es->record_size)); +#endif + recordsLeft = recordsRead; // Update metrics - metric->num_reads += bufferSizeInBlocks - 1; + // Note: num_reads tracks page reads, but we read data during initial fill via iterator + // The iterator handles its own page reads, so don't double-count here metric->num_runs++; - // Build heap from tuples in filled blocks + // Build heap from tuples in reverse order + // Start from the end of loaded data and work backwards + addr = buffer + es->page_size + recordsRead * es->record_size; for (i = 0; i < recordsRead; i++) { addr -= es->record_size; + memcpy(tupleBuffer, addr, es->record_size); metric->num_memcpys++; shiftUp_rev(buffer + heapStartOffset, tupleBuffer, heapSize, es, metric); heapSize++; } +#ifdef DEBUG + debug_log("DEBUG: Heap construction completed:\n"); + debug_log(" heapSize=%d, listSize=%d\n", heapSize, listSize); + + // Memory layout verification + int32_t heapBottom = heapStartOffset - (heapSize - 1) * es->record_size; + int32_t listTop = es->page_size + listSize * es->record_size; + int32_t gapSize = heapBottom - listTop; + + debug_log("DEBUG: Memory layout:\n"); + debug_log(" I/O block: offset 0 - %d\n", es->page_size); + debug_log(" List top: offset %d\n", listTop); + debug_log(" Gap: %d bytes (%d records)\n", gapSize, gapSize / es->record_size); + debug_log(" Heap bottom: offset %d\n", heapBottom); + debug_log(" Heap top: offset %d\n", heapStartOffset); + debug_log(" Buffer end: offset %d\n", bufferSizeInBlocks * es->page_size); + + if (heapBottom <= listTop) { + debug_log("ERROR: Heap and list overlap! This will cause corruption.\n"); + free(lastOutputKey); + return 9; + } +#endif + +#ifdef DEBUG_HEAP + print_heap(buffer, heapStartOffset, heapSize, listSize, es); +#endif + +#ifdef DEBUG + debug_log("DEBUG: About to enter main loop\n"); + debug_log(" heapSize=%d, listSize=%d, recordsLeft=%d\n", heapSize, listSize, recordsLeft); + debug_log(" Iterator position: recordsRead=%d, totalRecords=%d\n", + ((file_iterator_state_t *)iteratorState)->recordsRead, + ((file_iterator_state_t *)iteratorState)->totalRecords); +#endif + // Read each block and sort while (recordsLeft != 0) { recordsRead = 0; +#ifdef DEBUG + debug_log("\n=== Main loop iteration: sublistSize=%d, outputCount=%d, heapSize=%d, listSize=%d, recordsLeft=%d ===\n", + sublistSize, outputCount, heapSize, listSize, recordsLeft); +#endif // Read in page addr = buffer + es->headerSize; @@ -5063,6 +5157,16 @@ int adaptive_sort( print_heap(buffer, heapStartOffset, heapSize, listSize, es); #endif +#ifdef DEBUG + debug_log("DEBUG: Main loop iteration - read %d records\n", recordsRead); + if (recordsRead > 0) { + debug_log(" First record value: %d, Last record value: %d\n", + *(int32_t *)(buffer + es->headerSize + es->key_offset), + *(int32_t *)(buffer + es->headerSize + (recordsRead - 1) * es->record_size + es->key_offset)); + } + debug_log(" heapSize before processing: %d, listSize: %d\n", heapSize, listSize); +#endif + if (recordsRead > 1) { // Sort page using in memory quick sort metric->num_reads += 1; @@ -5078,14 +5182,15 @@ int adaptive_sort( // If first value in heap is smaller than lastOutputValue then start new sublist, otherwise continue with previous one. heapVal = buffer + heapStartOffset; - if (lastOutputKey == NULL || es->compare_fcn(heapVal, lastOutputKey) < 0) { + if (lastOutputKey == NULL || es->compare_fcn(heapVal + es->key_offset, lastOutputKey + es->key_offset) < 0) { // Start new sublist numSublist++; // Track number of distinct values per sublist avgDistinct = avgDistinct + (numDistinctInRun - avgDistinct / 10) * 10 / numSublist; #ifdef DEBUG - printf("Number of distinct values in sublist: %d Running average: %d\n", numDistinctInRun, avgDistinct / 10); + debug_log("Number of distinct values in sublist: %d Running average: %d\n", numDistinctInRun, avgDistinct / 10); + #endif numDistinctInRun = 1; @@ -5099,11 +5204,53 @@ int adaptive_sort( // Swap output records into output buffer from heap if smaller than records currently there. (I/O block is id zero) for (i = 0; i < tuplesPerPage; i++) { + /* + * HEAP-EMPTY START NEW RUN TRANSITION + */ + if (heapSize == 0) { + if (listSize > 0) { + // Finish current run and start a new one + numSublist++; + metric->num_runs++; + + sublistSize = 0; + outputCount = 0; + haveOutputKey = 0; + +#ifdef DEBUG + debug_log("DEBUG: Heap empty → starting new run, promoting list (%d records)\n", + listSize); +#endif + + // Promote frozen list to heap + for (int32_t k = listSize - 1; k >= 0; k--) { + shiftUp_rev(buffer + heapStartOffset, + buffer + es->page_size + k * es->record_size, + heapSize, es, metric); + heapSize++; + } + listSize = 0; + + // Restart filling the output page for the new run + i = -1; + continue; + } else { + // No heap, no list → nothing left to output + break; + } + } +#ifdef DEBUG + if (i < 3 || i == tuplesPerPage - 1) { // Only log first 3 and last iteration + debug_log(" Inner loop i=%d: recordsRead=%d, outputCount=%d, recordsLeft=%d, heapSize=%d\n", + i, recordsRead, outputCount, recordsLeft, heapSize); + } +#endif // Check if we've read all records from the current page - if (recordsRead == 0) { + if (recordsRead == 0 || i >= recordsRead) { // Check if there are any records left - if (recordsLeft <= 0) + if (recordsLeft <= 0) { break; + } // Just copy over from heap memcpy(buffer + es->headerSize + i * es->record_size, buffer + heapStartOffset, es->record_size); /* Heap into input/output block */ @@ -5130,7 +5277,8 @@ int adaptive_sort( // Track number of distinct values per sublist avgDistinct = avgDistinct + (numDistinctInRun - avgDistinct / 10) * 10 / numSublist; #ifdef DEBUG - printf("Number of distinct values in sublist: %d Running average: %d\n", numDistinctInRun, avgDistinct / 10); + debug_log("Number of distinct values in sublist: %d Running average: %d\n", numDistinctInRun, avgDistinct / 10); + #endif numDistinctInRun = 1; @@ -5217,15 +5365,32 @@ int adaptive_sort( } // Add Page Headers +#ifdef DEBUG + debug_log("About to write block: sublistSize=%d, outputCount=%d, numSublist=%d\n", + sublistSize, outputCount, numSublist); + debug_log(" First 3 output values:"); + for (int dbg = 0; dbg < 3 && dbg < outputCount; dbg++) { + debug_log(" %d", *(int32_t *)(buffer + es->headerSize + dbg * es->record_size + es->key_offset)); + } + debug_log("\n"); +#endif + if (outputCount == 0) { + continue; // Skip to next iteration + } *((int32_t *)buffer) = sublistSize; - *((int16_t *)(buffer + BLOCK_COUNT_OFFSET)) = (int8_t)outputCount; + *((int16_t *)(buffer + BLOCK_COUNT_OFFSET)) = (int16_t)outputCount; memcpy(tupleBuffer, buffer + (outputCount - 1) * es->record_size + es->headerSize, es->key_size); memcpy(lastOutputKey, tupleBuffer, es->record_size); metric->num_memcpys += 2; // Store the last key output temporarily in tuple buffer as once write out then read new block it would be gone // Write the output block +#ifdef DEBUG + debug_log("Writing page adaptive writeRel: blockIndex=%d, count=%d, filePosition=%ld\n", + sublistSize, outputCount, lastWritePos / PAGE_SIZE); +#endif ((file_iterator_state_t *)iteratorState)->fileInterface->writeRel(buffer, PAGE_SIZE, 1, outputFile); + if (((file_iterator_state_t *)iteratorState)->fileInterface->error(outputFile)) { // File write error free(lastOutputKey); @@ -5233,33 +5398,66 @@ int adaptive_sort( } #ifdef DEBUG_OUTPUT - printf("Wrote block. Sublist: %d ", numSublist); - printf(" Idx: %d\n", sublistSize); - // printf("Offset: %lu\n", ftell(outputFile)-es->page_size); + debug_log("Wrote block. Sublist: %d ", numSublist); + debug_log(" Idx: %d\n", sublistSize); + // debug_log("Offset: %lu\n", ftell(outputFile) - es->page_size); for (int k = 0; k < tuplesPerPage; k++) { - printf("%3d: 3 Output Record: %d\n", k, *(uint32_t *)(buffer + es->headerSize + k * es->record_size + es->key_offset)); + debug_log("%3d: 3 Output Record: %d\n", k, *(uint32_t *)(buffer + es->headerSize + k * es->record_size + es->key_offset)); } + #endif metric->num_writes += 1; + lastWritePos += es->page_size; sublistSize++; outputCount = 0; +#ifdef DEBUG + if (recordsLeft == 0) { + debug_log("DEBUG: Exiting main loop - heapSize=%d, listSize=%d, outputCount=%d, sublistSize=%d\n", + heapSize, listSize, outputCount, sublistSize); + } +#endif } /* while records left */ - // free(lastOutputKey); numSublist = metric->num_runs; #ifdef ADAPTIVE_SORT_PRINT - printf("Gen time: %d\n", metric->genTime); + debug_log("Gen time: %d\n", metric->genTime); + #endif // Track number of distinct values per sublist avgDistinct = avgDistinct + (numDistinctInRun - avgDistinct / 10) * 10 / numSublist; #ifdef ADAPTIVE_SORT_PRINT - printf("Final number of distinct values in sublist: %d Average: %d\n", numDistinctInRun, avgDistinct); + debug_log("Final number of distinct values in sublist: %d Average: %d\n", numDistinctInRun, avgDistinct); + #endif numDistinctInRun = 0; } /* end pessmistic */ +#ifdef DEBUG + debug_log("\n=== REPLACEMENT SELECTION COMPLETE ===\n"); + debug_log("Number of sublists created: %d\n", numSublist); + debug_log("Output file size: %ld bytes (%ld blocks)\n", lastWritePos, lastWritePos / es->page_size); + debug_log("About to start merge phase...\n\n"); + + // Read and display what's in each block + for (int debugBlock = 0; debugBlock < lastWritePos / es->page_size; debugBlock++) { + ((file_iterator_state_t *)iteratorState)->fileInterface->seek(debugBlock * es->page_size, outputFile); + ((file_iterator_state_t *)iteratorState)->fileInterface->readRel(buffer, es->page_size, 1, outputFile); + + uint32_t blockIdx = *((uint32_t *)buffer); + uint16_t count = *((uint16_t *)(buffer + BLOCK_COUNT_OFFSET)); + + debug_log("Block %d: blockIdx=%u, count=%u, first 10 values:", debugBlock, blockIdx, count); + for (int v = 0; v < count && v < 10; v++) { + debug_log(" %d", *(int32_t *)(buffer + es->headerSize + v * es->record_size + es->key_offset)); + } + if (count > 10) debug_log(" ..."); + debug_log("\n"); + } + debug_log("=================================\n\n"); +#endif + // No merge phase necessary if (numSublist == 1) { ((file_iterator_state_t *)iteratorState)->fileInterface->flush(outputFile); @@ -5285,11 +5483,12 @@ int adaptive_sort( int32_t nobSortCost = numPasses * (10 + writeToReadRatio) / 10; #ifdef ADAPTIVE_SORT_PRINT - printf("Adaptive calculation.\n"); - printf("NOB sort cost. # runs: %d", numSublist); - printf(" # passes: %d cost: %d\n", numPasses, nobSortCost); - printf("MinSort cost. Num sublists: %d ", numSublist); - printf(" Avg. distinct/sublist: %d\n", avgDistinct / 10); + debug_log("Adaptive calculation.\n"); + debug_log("NOB sort cost. # runs: %d", numSublist); + debug_log(" # passes: %d cost: %d\n", numPasses, nobSortCost); + debug_log("MinSort cost. Num sublists: %d ", numSublist); + debug_log(" Avg. distinct/sublist: %d\n", avgDistinct / 10); + #endif // Make decision to use either no output buffer sort or MinSort @@ -5302,20 +5501,20 @@ int adaptive_sort( if (sublistVersionPossible) { // Use better performing version of minsort #ifdef ADAPTIVE_SORT_PRINT - printf("Performing MinSort with sorted sublists\n"); + debug_log("Performing MinSort with sorted sublists\n"); #endif ((file_iterator_state_t *)iteratorState)->file = outputFile; - *resultFilePtr = 0; - flash_minsort_sublist(iteratorState, tupleBuffer, outputFile, buffer, bufferSizeBytes, es, resultFilePtr, metric, compareFn, numSublist); *resultFilePtr = lastWritePos; + flash_minsort_sublist(iteratorState, tupleBuffer, outputFile, buffer, bufferSizeBytes, es, resultFilePtr, metric, compareFn, numSublist); + //*resultFilePtr = lastWritePos; } else { // Use normal version of minsort. Do not have enough space to index a value per sublist. Assumes data is not sorted in each region #ifdef ADAPTIVE_SORT_PRINT - printf("Performing MinSort\n"); + debug_log("Performing MinSort\n"); #endif ((file_iterator_state_t *)iteratorState)->file = outputFile; + *resultFilePtr = lastWritePos; flash_minsort(iteratorState, tupleBuffer, outputFile, buffer, bufferSizeBytes, es, resultFilePtr, metric, compareFn); - *resultFilePtr = 0; } } else { /* */ @@ -5349,7 +5548,6 @@ int adaptive_sort( int16_t space = 0; int16_t outputCursor; int8_t destBlk; - int32_t other = 0; // Verify all memory has been allocated successfully if (record2 == NULL) { @@ -5365,7 +5563,7 @@ int adaptive_sort( // if (numSublist >= 32 && numSublist <= 64)// && avgDistinct/10 < 32) // { // // Switch to MinSort to finish off - // printf("Finishing sort with MinSort with sorted sublists\n"); + // debug_log("Finishing sort with MinSort with sorted sublists\n"); // ((file_iterator_state_t*) iteratorState)->file = outputFile; // // *resultFilePtr = lastMergeStart; // // fflush(outputFile); @@ -5383,7 +5581,8 @@ int adaptive_sort( lastWritePos = 0; } #ifdef ADAPTIVE_SORT_PRINT - printf("Pass number: %u Comparisons: %lu MemCopies: %lu TransferIn: %lu TransferOut: %lu TransferOther: %lu Other: %lu\n", passNumber, metric->num_compar, metric->num_memcpys, numShiftIntoOutput, numShiftOutOutput, numShiftOtherBlock, other); + debug_log("Pass number: %u Comparisons: %lu MemCopies: %lu TransferIn: %lu TransferOut: %lu TransferOther: %lu\n", passNumber, metric->num_compar, metric->num_memcpys, numShiftIntoOutput, numShiftOutOutput, numShiftOtherBlock); + #endif passNumber++; @@ -5460,7 +5659,8 @@ int adaptive_sort( #ifdef DEBUG void *buffer0Rec = (void *)buffer + es->headerSize; void *currentRec = (void *)buffer + i * es->page_size + es->headerSize; - printf("Swapping in buffer 0. Current key: %d New key: %d\n", *(uint32_t *)(buffer0Rec + es->key_offset), *(uint32_t *)(currentRec + es->key_offset)); + debug_log("Swapping in buffer 0. Current key: %d New key: %d\n", *(uint32_t *)(buffer0Rec + es->key_offset), *(uint32_t *)(currentRec + es->key_offset)); + #endif // Perform swap sublsBlkPos[i] = sublsFilePtr[0]; /* Note: Using subls_blk_pos[i] as a temp variable during swap */ // TODO: Update swap to not be variable length @@ -5501,8 +5701,9 @@ int adaptive_sort( #ifdef DEBUG_READ void *firstRec = (void *)buffer + i * es->page_size + es->headerSize; void *lastRec = (void *)buffer + i * es->page_size + es->headerSize + (*((int16_t *)(buffer + i * es->page_size + BLOCK_COUNT_OFFSET)) - 1) * es->record_size; - printf("Read Sublist: %d Block: %d NumRec: %d First key: %d Last key: %d\n", i, (int32_t) * (buffer + i * es->page_size), - *((int16_t *)(buffer + i * es->page_size + BLOCK_COUNT_OFFSET)), *(uint32_t *)(firstRec + es->key_offset), *(uint32_t *)(lastRec + es->key_offset)); + debug_log("Read Sublist: %d Block: %d NumRec: %d First key: %d Last key: %d\n", i, (int32_t) * (buffer + i * es->page_size), + *((int16_t *)(buffer + i * es->page_size + BLOCK_COUNT_OFFSET)), *(uint32_t *)(firstRec + es->key_offset), *(uint32_t *)(lastRec + es->key_offset)); + #endif // Initialize record1 to start of each block and record2 to empty record1[i] = i * es->page_size + es->headerSize; @@ -5573,21 +5774,22 @@ int adaptive_sort( #ifdef DEBUG void *buf = (void *)buffer + resultRecOffset; - printf("Smallest Record: %d From list: %d\n", *(uint32_t *)(buf + es->key_offset), resultBlock); - printf("List status: 0: (%d, %d) 1: (%d, %d) 2: (%d, %d) ResultList: %d\n", record1[0], record2[0], - record1[1], record2[1], record1[2], record2[2], resultBlock); + debug_log("Smallest Record: %d From list: %d\n", *(uint32_t *)(buf + es->key_offset), resultBlock); + debug_log("List status: 0: (%d, %d) 1: (%d, %d) 2: (%d, %d) ResultList: %d\n", record1[0], record2[0], + record1[1], record2[1], record1[2], record2[2], resultBlock); if (*(uint32_t *)(buf + es->key_offset) == 27391) { /* Output all block contents */ for (int l = 0; l < 2; l++) { - printf("Current block: %d # records: %d\n", l, tuplesPerPage); + debug_log("Current block: %d # records: %d\n", l, tuplesPerPage); for (int k = 0; k < tuplesPerPage; k++) { void *buf = (void *)(buffer + es->headerSize + k * es->record_size + l * es->page_size); - printf("%d: Record: %d Address: %p\n", k, buf + es->key_size, buf); + debug_log("%d: Record: %d Address: %p\n", k, buf + es->key_size, buf); } } - printf("HERE\n"); + debug_log("HERE\n"); } + #endif /* Add smallest tuple to output position in buffer (may already be in output buffer) */ @@ -5608,7 +5810,8 @@ int adaptive_sort( numShiftOutOutput++; #ifdef DEBUG void *buf = (void *)(buffer + record1[OUTPUT_BLOCK_ID]); - printf("Output record moved to list %d Key: %d\n", resultBlock, *(uint32_t *)(buf + es->key_size)); + debug_log("Output record moved to list %d Key: %d\n", resultBlock, *(uint32_t *)(buf + es->key_size)); + #endif /* Move result record into output block (record1[output_block]==record2[output_block]) */ metric->num_memcpys++; @@ -5688,6 +5891,10 @@ int adaptive_sort( *((int16_t *)(buffer + BLOCK_COUNT_OFFSET)) = (int16_t)tuplesPerPage; ((file_iterator_state_t *)iteratorState)->fileInterface->seek(lastWritePos, outputFile); +#ifdef DEBUG + debug_log("Writing page adaptive writeRel 2: blockIndex=%d, count=%d, filePosition=%ld\n", + currentBlockId, tuplesPerPage, lastWritePos / PAGE_SIZE); +#endif ((file_iterator_state_t *)iteratorState)->fileInterface->writeRel(buffer + OUTPUT_BLOCK_ID * es->page_size, PAGE_SIZE, 1, outputFile); if (((file_iterator_state_t *)iteratorState)->fileInterface->error(outputFile)) { // File read error @@ -5702,11 +5909,12 @@ int adaptive_sort( record2[OUTPUT_BLOCK_ID] = -1; metric->num_writes++; #ifdef DEBUG_OUTPUT - printf("Wrote output block: %d # records: %d\n", *((int32_t *)buffer), tuplesPerPage); + debug_log("Wrote output block: %d # records: %d\n", *((int32_t *)buffer), tuplesPerPage); for (int k = 0; k < tuplesPerPage; k++) { void *buf = (void *)(buffer + es->headerSize + k * es->record_size); - printf("%3d: 4 Output Record: %d Address: %p\n", k, *(uint32_t *)(buf + es->key_offset), buf); + debug_log("%3d: 4 Output Record: %d Address: %p\n", k, *(uint32_t *)(buf + es->key_offset), buf); } + #endif } @@ -5752,17 +5960,18 @@ int adaptive_sort( if (destBlk > bufferSizeInBlocks) { #ifdef ADAPTIVE_SORT_PRINT - printf("Incorrect destination block. List 1: (%d, %d) List 2: (%d, %d) List 3: (%d, %d) ResultList: %d\n", record1[0], record2[0], - record1[1], record2[1], record1[2], record2[2], resultBlock); + debug_log("Incorrect destination block. List 1: (%d, %d) List 2: (%d, %d) List 3: (%d, %d) ResultList: %d\n", record1[0], record2[0], + record1[1], record2[1], record1[2], record2[2], resultBlock); /* Output all block contents */ for (int l = 0; l < 3; l++) { - printf("Current block: %d # records: %d\n", l, tuplesPerPage); + debug_log("Current block: %d # records: %d\n", l, tuplesPerPage); for (int k = 0; k < tuplesPerPage; k++) { void *buf = (void *)(buffer + es->headerSize + k * es->record_size + l * es->page_size); - printf("%d: Record: %d Address: %p\n", k, buf + es->key_offset, buf); + debug_log("%d: Record: %d Address: %p\n", k, buf + es->key_offset, buf); } } + #endif } } @@ -5780,7 +5989,8 @@ int adaptive_sort( for (i = 0; i < numTransferThisPass; i++) { #ifdef DEBUG void *buf = (void *)(buffer + originPtr); - printf("Empty output block case. Moved output record back from list %d Key: %d\n", resultBlock, *(uint32_t *)(buf + es->key_offset)); + debug_log("Empty output block case. Moved output record back from list %d Key: %d\n", resultBlock, *(uint32_t *)(buf + es->key_offset)); + #endif numShiftIntoOutput++; /* Get top value from heap */ @@ -5801,7 +6011,8 @@ int adaptive_sort( record1[destBlk] = record1[destBlk] - es->record_size; #ifdef DEBUG void *buf = (void *)(buffer + originPtr); - printf("Moved output record back from list %d Key: %d\n", resultBlock, buf + es->key_offset); + debug_log("Moved output record back from list %d Key: %d\n", resultBlock, buf + es->key_offset); + #endif numShiftIntoOutput++; @@ -5811,7 +6022,8 @@ int adaptive_sort( metric->num_compar++; #ifdef DEBUG void *buf = (void *)(buffer + insert_ptr + es->record_size); - printf("Compare with list %d Key: %d\n", resultBlock, buf + es->key_offset); + debug_log("Compare with list %d Key: %d\n", resultBlock, buf + es->key_offset); + #endif if (0 < es->compare_fcn(buffer + originPtr + es->key_offset, buffer + insert_ptr + es->record_size + es->key_offset)) { /* shift next_val down */ @@ -5838,7 +6050,8 @@ int adaptive_sort( #ifdef DEBUG void *buf = (void *)(buffer + originPtr); - printf("Moved output record to list %d Key: %d\n", destBlk, buf + es->key_offset); + debug_log("Moved output record to list %d Key: %d\n", destBlk, buf + es->key_offset); + #endif numShiftOtherBlock++; @@ -5875,11 +6088,12 @@ int adaptive_sort( record2[resultBlock] = -1; record1[resultBlock] = resultBlock * es->page_size + es->headerSize; #ifdef DEBUG_READ - printf("Read block sublist: %d\n", resultBlock); + debug_log("Read block sublist: %d\n", resultBlock); void *firstRec = (void *)buffer + resultBlock * es->page_size + es->headerSize; void *lastRec = (void *)buffer + resultBlock * es->page_size + es->headerSize + (*((int16_t *)(buffer + resultBlock * es->page_size + BLOCK_COUNT_OFFSET)) - 1) * es->record_size; - printf("Read Sublist: %d Block: %d NumRec: %d First key: %d Last key: %d\n", resultBlock, (int32_t) * (buffer + resultBlock * es->page_size), - *((int16_t *)(buffer + resultBlock * es->page_size + BLOCK_COUNT_OFFSET)), firstRec + es->key_offset, lastRec + es->key_offset); + debug_log("Read Sublist: %d Block: %d NumRec: %d First key: %d Last key: %d\n", resultBlock, (int32_t) * (buffer + resultBlock * es->page_size), + *((int16_t *)(buffer + resultBlock * es->page_size + BLOCK_COUNT_OFFSET)), firstRec + es->key_offset, lastRec + es->key_offset); + #endif } } /* end if is the non output block empty */ @@ -5946,7 +6160,8 @@ int adaptive_sort( /* move the record */ #ifdef DEBUG void *buf = (void *)(buffer + outputCursor); - printf("Output list empty so moved record in output to list %d Key: %d\n", destBlk, *(uint32_t *)(buf + es->key_offset)); + debug_log("Output list empty so moved record in output to list %d Key: %d\n", destBlk, *(uint32_t *)(buf + es->key_offset)); + #endif numShiftOutOutput++; metric->num_memcpys++; @@ -5977,11 +6192,12 @@ int adaptive_sort( int16_t numRecords = *((int16_t *)(buffer + BLOCK_COUNT_OFFSET)); #ifdef DEBUG_READ - printf("Read block sublist: 0\n"); + debug_log("Read block sublist: 0\n"); void *firstRec = (void *)buffer + es->headerSize; void *lastRec = (void *)buffer + es->headerSize + (*((int16_t *)(buffer + BLOCK_COUNT_OFFSET)) - 1) * es->record_size; - printf("Read Sublist: %d Block: %d NumRec: %d First key: %d Last key: %d\n", 0, (int32_t) * (buffer + 0 * es->page_size), - *((int16_t *)(buffer + BLOCK_COUNT_OFFSET)), firstRec + es->key_offset, lastRec + es->key_offset); + debug_log("Read Sublist: %d Block: %d NumRec: %d First key: %d Last key: %d\n", 0, (int32_t) * (buffer + 0 * es->page_size), + *((int16_t *)(buffer + BLOCK_COUNT_OFFSET)), firstRec + es->key_offset, lastRec + es->key_offset); + #endif metric->num_reads += 1; @@ -6037,7 +6253,7 @@ int adaptive_sort( /* end of run */ } - if (record2[0] > 0) { /* Tuples in output block to write out */ + if (record2[0] != -1) { /* Tuples in output block to write out */ // fseek(outputFile, lastWritePos, SEEK_SET); // if (0 == fwrite(buffer + OUTPUT_BLOCK_ID * es->page_size, (size_t)es->page_size, 1, outputFile)) // { /* File write error - arduino prints 1st value nmemb times if nmemb != 1 */ @@ -6051,6 +6267,10 @@ int adaptive_sort( currentBlockId++; ((file_iterator_state_t *)iteratorState)->fileInterface->seek(lastWritePos, outputFile); +#ifdef DEBUG + debug_log("Writing page adaptive write rel 3: blockIndex=%d, count=%d, filePosition=%ld\n", + currentBlockId, (int16_t)(record2[0] - es->headerSize) / es->record_size + 1, lastWritePos / PAGE_SIZE); +#endif ((file_iterator_state_t *)iteratorState)->fileInterface->writeRel(buffer + OUTPUT_BLOCK_ID * es->page_size, PAGE_SIZE, 1, outputFile); if (((file_iterator_state_t *)iteratorState)->fileInterface->error(outputFile)) { // File write error @@ -6066,11 +6286,12 @@ int adaptive_sort( metric->num_writes += 1; #ifdef DEBUG_OUTPUT - printf("Wrote output block here.\n"); + debug_log("Wrote output block here.\n"); for (int k = 0; k < tuplesPerPage; k++) { void *buf = (void *)(buffer + es->headerSize + k * es->record_size); - printf("%3d: 5 Output Record: %d Address: %p\n", k, *(uint32_t *)(buf + es->key_offset), buf); // TODO: Update to no use test_record_t + debug_log("%3d: 5 Output Record: %d Address: %p\n", k, *(uint32_t *)(buf + es->key_offset), buf); // TODO: Update to no use test_record_t } + #endif } @@ -6082,7 +6303,8 @@ int adaptive_sort( } /* end of merge */ *resultFilePtr = lastMergeStart; #ifdef ADAPTIVE_SORT_PRINT_FINISH - printf("Complete. Comparisons: %u Writes: %u Reads: %u Memcpys:\n", metric->num_compar, metric->num_writes, metric->num_reads, metric->num_memcpys); + debug_log("Complete. Comparisons: %u Writes: %u Reads: %u Memcpys:\n", metric->num_compar, metric->num_writes, metric->num_reads, metric->num_memcpys); + #endif /* cleanup */ @@ -6140,6 +6362,12 @@ This is no output sort with block headers and iterator input. Heap used when mov // #define DEBUG 1 // #define DEBUG_OUTPUT 1 // #define DEBUG_READ 1 +#if defined(DEBUG) || defined(DEBUG_OUTPUT) || defined(DEBUG_READ) +#else +#ifndef debug_log +#define debug_log(...) ((void)0) +#endif +#endif #ifndef INT_MAX #define INT_MAX 0xFFFFFFFF @@ -6152,13 +6380,18 @@ This is no output sort with block headers and iterator input. Heap used when mov * @param es Sorting configuration, including page and record sizes. * @param metric Metrics tracking structure for performance analysis. */ -void readPageMinSort(MinSortState *ms, int pageNum, external_sort_t *es, metrics_t *metric) { +int8_t readPageMinSort(MinSortState *ms, int pageNum, external_sort_t *es, metrics_t *metric) { file_iterator_state_t *is = (file_iterator_state_t *)ms->iteratorState; void *fp = is->file; - +#ifdef DEBUG + debug_log("DEBUG: READ_PAGE %d (Offset %d)\n", pageNum, pageNum * es->page_size); +#endif // Read page into the buffer if (0 == is->fileInterface->read(ms->buffer, pageNum, es->page_size, fp)) { - printf("MINSORT: Failed to read block.\n"); +#ifdef DEBUG + debug_log("MINSORT: Failed to read block.\n"); +#endif + return 0; } metric->num_reads++; @@ -6166,12 +6399,13 @@ void readPageMinSort(MinSortState *ms, int pageNum, external_sort_t *es, metrics ms->lastBlockIdx = pageNum; #ifdef DEBUG_READ - printf("Reading block: %d\n", pageNum); + debug_log("Reading block: %d\n", pageNum); for (int k = 0; k < 31; k++) { test_record_t *buf = (void *)(ms->buffer + es->headerSize + k * es->record_size); - printf("%d: Record: %d\n", k, buf->key); + debug_log("%d: Record: %d\n", k, buf->key); } #endif + return 1; } /** @@ -6224,7 +6458,7 @@ void init_MinSort(MinSortState *ms, external_sort_t *es, metrics_t *metric, int8 ms->records_per_block = (es->page_size - es->headerSize) / es->record_size; j = (ms->memoryAvailable - 2 * es->page_size - 2 * es->key_size - INT_SIZE) / (es->key_size + sizeof(uint8_t)); #ifdef FLASH_MINSORT_PRINT - printf("Memory overhead: %d Max regions: %d\r\n", 2 * es->key_size + INT_SIZE, j); + debug_log("Memory overhead: %d Max regions: %d\r\n", 2 * es->key_size + INT_SIZE, j); #endif ms->blocks_per_region = (uint32_t)ceil((double)ms->numBlocks / j); ms->numRegions = (uint32_t)ceil((double)ms->numBlocks / ms->blocks_per_region); @@ -6235,9 +6469,9 @@ void init_MinSort(MinSortState *ms, external_sort_t *es, metrics_t *metric, int8 ms->min_initialized = (int8_t *)(ms->min + es->key_size * ms->numRegions); #ifdef DEBUG - printf("Memory overhead: %d Max regions: %d\r\n", 2 * SORT_KEY_SIZE + INT_SIZE, j); - printf("Page size: %d, Memory size: %d Record size: %d, Number of records: %lu, Number of blocks: %d, Blocks per region: %d Regions: %d\r\n", - es->page_size, ms->memoryAvailable, ms->record_size, ms->num_records, ms->numBlocks, ms->blocks_per_region, ms->numRegions); + // printf("Memory overhead: %d Max regions: %d\r\n", 2 * SORT_KEY_SIZE + INT_SIZE, j); + debug_log("Page size: %d, Memory size: %d Record size: %d, Number of records: %lu, Number of blocks: %d, Blocks per region: %d Regions: %d\r\n", + es->page_size, ms->memoryAvailable, ms->record_size, ms->num_records, ms->numBlocks, ms->blocks_per_region, ms->numRegions); #endif /* Initialize each region’s minimum value */ @@ -6245,28 +6479,21 @@ void init_MinSort(MinSortState *ms, external_sort_t *es, metrics_t *metric, int8 ms->min_initialized[i] = false; } - /* Populate each region’s minimum key by scanning blocks */ + /* Populate each region's minimum key by scanning blocks */ for (i = 0; i < ms->numBlocks; i++) { - readPageMinSort(ms, i, es, metric); // Load block i into buffer + readPageMinSort(ms, i, es, metric); regionIdx = i / ms->blocks_per_region; - // Set inital value to first read. - // ms->min[regionIdx] = getValuePtr(ms, 0, es); - memcpy(getMinRegionPtr(ms, regionIdx, es), getValuePtr(ms, 0, es), es->key_size); - metric->num_memcpys++; - ms->min_initialized[regionIdx] = true; - - /* Process remaining records in the block */ - for (j = 1; j < ms->records_per_block; j++) { + for (j = 0; j < ms->records_per_block; j++) { if (((i * ms->records_per_block) + j) < ms->num_records) { val = getValuePtr(ms, j, es); metric->num_compar++; - /* Update region’s minimum if current record is smaller */ - if (compareFn(val, getMinRegionPtr(ms, regionIdx, es)) == -1) { + // Only update if this is the first record for the region OR we found a new minimum + if (!ms->min_initialized[regionIdx] || compareFn(val, getMinRegionPtr(ms, regionIdx, es)) == -1) { memcpy(getMinRegionPtr(ms, regionIdx, es), val, es->key_size); - metric->num_memcpys++; ms->min_initialized[regionIdx] = true; + metric->num_memcpys++; } } else break; @@ -6275,7 +6502,7 @@ void init_MinSort(MinSortState *ms, external_sort_t *es, metrics_t *metric, int8 #ifdef DEBUG for (i = 0; i < ms->numRegions; i++) - printf("Region: %d Min: %d\r\n", i, ms->min[i]); + debug_log("Region: %d Min: %d\r\n", i, *(int *)getMinRegionPtr(ms, i, es)); #endif /* Allocate memory for current and next keys */ @@ -6339,7 +6566,7 @@ char *next_MinSort(MinSortState *ms, external_sort_t *es, void *tupleBuffer, met for (k = startIndex / ms->records_per_block; k < ms->blocks_per_region; k++) { curBlk = startBlk + k; - if (curBlk > ms->numBlocks) { + if (curBlk >= ms->numBlocks) { break; } @@ -6363,7 +6590,7 @@ char *next_MinSort(MinSortState *ms, external_sort_t *es, void *tupleBuffer, met #ifdef DEBUG test_record_t *buf = (test_record_t *)(ms->buffer + es->headerSize + i * es->record_size); buf = (test_record_t *)tupleBuffer; - printf("Returning tuple: %d\n", buf->key); + debug_log("Returning tuple: %d\n", buf->key); #endif i++; // Move to the next record ms->tuplesOut++; @@ -6383,7 +6610,7 @@ char *next_MinSort(MinSortState *ms, external_sort_t *es, void *tupleBuffer, met done: #ifdef DEBUG - printf("Updating minimum in region\r\n"); + debug_log("Updating minimum in region\r\n"); #endif // After processing the current block, scan the rest of the region to find a smaller record if possible @@ -6415,7 +6642,7 @@ char *next_MinSort(MinSortState *ms, external_sort_t *es, void *tupleBuffer, met if (compareFn(dataVal, ms->current) == 0) { ms->nextIdx = k * ms->records_per_block + i; #ifdef DEBUG - printf("Next tuple at: %d k: %d i: %d\r\n", ms->nextIdx, k, i); + debug_log("Next tuple at: %d k: %d i: %d\r\n", ms->nextIdx, k, i); #endif goto done2; } @@ -6445,7 +6672,7 @@ char *next_MinSort(MinSortState *ms, external_sort_t *es, void *tupleBuffer, met } #ifdef DEBUG - printf("Updated minimum in block to: %d\r\n", ms->min[ms->regionIdx]); + debug_log("Updated minimum in block to: %d\r\n", ms->min[ms->regionIdx]); #endif } @@ -6502,9 +6729,11 @@ int flash_minsort( metrics_t *metric, int8_t (*compareFn)(void *a, void *b)) { #ifdef DEBUG - printf("*Flash Minsort*\n"); + debug_log("*Flash Minsort*\n"); #endif +#ifndef ARDUINO clock_t start = clock(); +#endif MinSortState ms; ms.buffer = buffer; @@ -6517,6 +6746,7 @@ int flash_minsort( int32_t blockIndex = 0; int16_t values_per_page = (es->page_size - es->headerSize) / es->record_size; uint8_t *outputBuffer = buffer + es->page_size; + unsigned long lastWritePos = *resultFilePtr; // test_record_t *buf; // Main sorting loop: fetches and writes sorted records in blocks @@ -6528,31 +6758,41 @@ int flash_minsort( if (count == values_per_page) { // Write block *((int32_t *)outputBuffer) = blockIndex; /* Block index */ *((int16_t *)(outputBuffer + BLOCK_COUNT_OFFSET)) = count; /* Block record count */ - count = 0; // Reset count for the next block // Write the block to the output file using the file interface's write method - if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->write(outputBuffer, blockIndex, es->page_size, outputFile)) { + ((file_iterator_state_t *)iteratorState)->fileInterface->seek(lastWritePos, outputFile); +#ifdef DEBUG + debug_log("Writing page flash minsort: blockIndex=%d, count=%d, filePosition=%ld\n", + blockIndex, count, count / PAGE_SIZE); +#endif + if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->writeRel(outputBuffer, es->page_size, 1, outputFile)) { return 9; // Return error code if writing to the output file fails } #ifdef DEBUG - printf("Wrote output block. Block index: %d\n", blockIndex); + debug_log("Wrote output block. Block index: %d\n", blockIndex); for (int k = 0; k < values_per_page; k++) { test_record_t *buf = (void *)(outputBuffer + es->headerSize + k * es->record_size); - printf("%d: Output Record: %d\n", k, buf->key); + debug_log("%d: Output Record: %d\n", k, buf->key); } #endif metric->num_writes++; + lastWritePos += es->page_size; blockIndex++; + count = 0; } } // Write the last block if there are remaining records if (count > 0) { + ((file_iterator_state_t *)iteratorState)->fileInterface->seek(lastWritePos, outputFile); *((int32_t *)outputBuffer) = blockIndex; /* Block index */ *((int16_t *)(outputBuffer + BLOCK_COUNT_OFFSET)) = count; /* Block record count */ - - if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->write(outputBuffer, blockIndex, es->page_size, outputFile)) { +#ifdef DEBUG + debug_log("Writing page flash minsort: blockIndex=%d, count=%d, filePosition=%ld\n", + blockIndex, count, count / PAGE_SIZE); +#endif + if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->writeRel(outputBuffer, es->page_size, 1, outputFile)) { return 9; // Return error code if writing to the output file fails } metric->num_writes++; @@ -6561,19 +6801,18 @@ int flash_minsort( } #ifdef DEBUG - printf("Number of sorted records: %d", ms.num_records); + debug_log("Number of sorted records: %d", ms.num_records); #endif ((file_iterator_state_t *)iteratorState)->fileInterface->flush(outputFile); close_MinSort(&ms, es); - +#ifndef ARDUINO clock_t end = clock(); - - *resultFilePtr = 0; +#endif #ifdef DEBUG - printf("Complete. Comparisons: %d MemCopies: %d\n", metric->num_compar, metric->num_memcpys); + debug_log("Complete. Comparisons: %d MemCopies: %d\n", metric->num_compar, metric->num_memcpys); #endif return 0; // Successful completion @@ -6616,19 +6855,26 @@ int flash_minsort( */ /******************************************************************************/ -/* -#define DEBUG 1 -#define DEBUG_OUTPUT 1 -#define DEBUG_READ 1 -*/ +// #define DEBUG 1 +// #define DEBUG_OUTPUT 1 +// #define DEBUG_READ 1 +#if defined(DEBUG) || defined(DEBUG_OUTPUT) || defined(DEBUG_READ) +#else +#ifndef debug_log +#define debug_log(...) ((void)0) +#endif +#endif -void readPage_sublist(MinSortStateSublist *ms, int pageNum, external_sort_t *es, metrics_t *metric) { +int8_t readPage_sublist(MinSortStateSublist *ms, int pageNum, external_sort_t *es, metrics_t *metric) { file_iterator_state_t *is = (file_iterator_state_t *)ms->iteratorState; void *fp = is->file; // Read page into the buffer if (0 == is->fileInterface->read(ms->buffer, pageNum, es->page_size, fp)) { - printf("MINSORT SUBLIST: Failed to read block.\n"); +#ifdef DEBUG + debug_log("MINSORT SUBLIST: Failed to read block.\n"); +#endif + return 0; } metric->num_reads++; @@ -6636,12 +6882,13 @@ void readPage_sublist(MinSortStateSublist *ms, int pageNum, external_sort_t *es, ms->lastBlockIdx = pageNum; #ifdef DEBUG_READ - printf("Reading block: %d Offset: %lu\n", pageNum, offset); + debug_log("Reading block: %d Offset: %lu\n", pageNum, es->key_offset); for (int k = 0; k < 31; k++) { test_record_t *buf = (void *)(ms->buffer + es->headerSize + k * es->record_size); - printf("%d: Record: %d\n", k, buf->key); + debug_log("%d: Record: %d\n", k, buf->key); } #endif + return 1; } int32_t getBlockId(MinSortStateSublist *ms) { @@ -6675,7 +6922,7 @@ void init_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, metrics_ // j = (ms->memoryAvailable - 2 * SORT_KEY_SIZE - INT_SIZE) / SORT_KEY_SIZE; j = (ms->memoryAvailable) / (SORT_KEY_SIZE + sizeof(uint8_t)); #ifdef FLASH_MINSORT_PRINT - printf("Memory overhead: %d Max regions: %d\r\n", 2 * SORT_KEY_SIZE + INT_SIZE, j); + debug_log("Memory overhead: %d Max regions: %d\r\n", 2 * SORT_KEY_SIZE + INT_SIZE, j); #endif // Memory allocation // Allocate minimum index in separate memory space (block 0 is input buffer, block 1 is output buffer) @@ -6689,8 +6936,8 @@ void init_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, metrics_ ms->min_set = malloc(ms->numRegions * sizeof(uint8_t)); ms->offset = malloc(ms->numRegions * sizeof(long)); #ifdef FLASH_MINSORT_PRINT - printf("Page size: %d, Memory size: %d Record size: %d, Number of records: %lu, Number of blocks: %d, Regions: %d\r\n", - es->page_size, ms->memoryAvailable, ms->record_size, ms->num_records, ms->numBlocks, ms->numRegions); + debug_log("Page size: %d, Memory size: %d Record size: %d, Number of records: %lu, Number of blocks: %d, Regions: %d\r\n", + es->page_size, ms->memoryAvailable, ms->record_size, ms->num_records, ms->numBlocks, ms->numRegions); #endif for (i = 0; i < ms->numRegions; i++) @@ -6707,12 +6954,12 @@ void init_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, metrics_ int numBlocksSublist = *(int32_t *)ms->buffer; /* Retrieve block id (indexed from 0) to compute count of blocks in sublist */ #if DEBUG - printf("Read block: %d", lastBlock); - printf(" Num: %d\n", numBlocksSublist); + debug_log("Read block: %d", lastBlock); + debug_log(" Num: %d\n", numBlocksSublist); for (int k = 0; k < 31; k++) { test_record_t *buf = (void *)(ms->buffer + es->headerSize + k * es->record_size); - printf("%d: Record: %d\n", k, buf->key); + debug_log("%d: Record: %d\n", k, buf->key); } #endif lastBlock = lastBlock - numBlocksSublist; @@ -6723,22 +6970,22 @@ void init_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, metrics_ memcpy(ms->min + es->record_size * regionIdx, getTuple_sublist(ms, 0, es), es->value_size); metric->num_memcpys++; ms->min_set[regionIdx] = true; - ms->offset[regionIdx] = lastBlock * es->page_size + es->headerSize + ms->fileOffset; + ms->offset[regionIdx] = lastBlock * es->page_size + es->headerSize; #if DEBUG - printf("New min. Index: %d", regionIdx); - printf(" Min: %u", ms->min[regionIdx]); - printf(" Offset: %lu\n", ms->offset[regionIdx]); + debug_log("New min. Index: %d", regionIdx); + debug_log(" Min: %u", ms->min[regionIdx]); + debug_log(" Offset: %lu\n", ms->offset[regionIdx]); #endif regionIdx--; lastBlock--; } #ifdef DEBUG - printf("Region summary\n"); + debug_log("Region summary\n"); for (i = 0; i < ms->numRegions; i++) { - printf("Reg: %d", i); - printf(" Min: %u", ms->min[i]); - printf(" Offset: %lu\n", ms->offset[i]); + debug_log("Reg: %d", i); + debug_log(" Min: %u", ms->min[i]); + debug_log(" Offset: %lu\n", ms->offset[i]); } #endif @@ -6789,10 +7036,17 @@ char *next_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, void *t curBlk = startIndex / es->page_size; // Smallest value is at current index - if (curBlk != ms->lastBlockIdx) { // Read block into buffer - readPage_sublist(ms, curBlk, es, metric); + if (curBlk != ms->lastBlockIdx) { + /* Checking for read failure here */ + if (0 == readPage_sublist(ms, curBlk, es, metric)) { + // If we can't read the block, this region is exhausted. + ms->offset[ms->regionIdx] = -1; + ms->min_set[ms->regionIdx] = false; + // Recursive call to try again with this region disabled + return next_MinSort_sublist(ms, es, tupleBuffer, metric); + } } - } else { // Use next record in current block + } else { i = ms->nextIdx; } @@ -6802,7 +7056,7 @@ char *next_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, void *t #ifdef DEBUG test_record_t *buf = (test_record_t *)(ms->buffer + es->headerSize + i * es->record_size); buf = (test_record_t *)tupleBuffer; - printf("Returning tuple: %d\n", buf->key); + debug_log("Returning tuple: %d\n", buf->key); #endif // Advance to next tuple in block @@ -6814,10 +7068,15 @@ char *next_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, void *t i = 0; int32_t currentBlockId = getBlockId(ms); curBlk++; - readPage_sublist(ms, curBlk, es, metric); - if (currentBlockId >= getBlockId(ms)) { - // Transitioned to a block in a new sublist - // ms->min[ms->regionIdx] = INT_MAX; + + if (0 == readPage_sublist(ms, curBlk, es, metric)) { + // Read failed (EOF). Mark region as finished. + ms->offset[ms->regionIdx] = -1; + ms->min_set[ms->regionIdx] = false; + } + /* Only process the new block if read was successful */ + else if (currentBlockId >= getBlockId(ms)) { + // Transitioned to a block in a new sublist (ID check) ms->offset[ms->regionIdx] = -1; ms->min_set[ms->regionIdx] = false; } else { @@ -6843,7 +7102,7 @@ char *next_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, void *t } #ifdef DEBUG - printf("Updated minimum in block to: %d\r\n", ms->min[ms->regionIdx]); + debug_log("Updated minimum in block to: %d\r\n", ms->min[ms->regionIdx]); #endif return tupleBuffer; @@ -6909,7 +7168,7 @@ int flash_minsort_sublist( int32_t blockIndex = 0; int16_t values_per_page = (es->page_size - es->headerSize) / es->record_size; char *outputBuffer = buffer + es->page_size; - unsigned long lastWritePos = ms.fileOffset + es->num_pages * es->page_size; + unsigned long lastWritePos = *resultFilePtr; // Write while (next_MinSort_sublist(&ms, es, (char *)(outputBuffer + count * es->record_size + es->headerSize), metric) != NULL) { @@ -6924,6 +7183,10 @@ int flash_minsort_sublist( // Force seek to end of file as outputFile is also inputFile and have been reading it ((file_iterator_state_t *)iteratorState)->fileInterface->seek(lastWritePos, outputFile); // Write the block to the output file using the file interface's write method +#ifdef DEBUG + debug_log("Writing page flash minsort sublist: blockIndex=%d, count=%d, filePosition=%ld\n", + blockIndex, count, lastWritePos / PAGE_SIZE); +#endif if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->writeRel(outputBuffer, es->page_size, 1, outputFile)) { return 9; // Return error code if writing to the output file fails } @@ -6936,7 +7199,7 @@ int flash_minsort_sublist( printf("Last write pos: %lu Block: %d\n", lastWritePos, blockIndex); */ #ifdef DEBUG_OUTPUT - printf("Wrote output block. Block index: %d\n", blockIndex); + debug_log("Wrote output block. Block index: %d\n", blockIndex); for (int k = 0; k < values_per_page; k++) { test_record_t *buf = (void *)(outputBuffer + es->headerSize + k * es->record_size); printf("%d: Output Record: %d\n", k, buf->key); @@ -6950,11 +7213,13 @@ int flash_minsort_sublist( if (count > 0) { // fseek(outputFile, lastWritePos, SEEK_SET); ((file_iterator_state_t *)iteratorState)->fileInterface->seek(lastWritePos, outputFile); - - *((int32_t *)buffer) = blockIndex; /* Block index */ - *((int16_t *)(buffer + BLOCK_COUNT_OFFSET)) = count; /* Block record count */ - - if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->write(outputBuffer, es->page_size, 1, outputFile)) { + *((int32_t *)outputBuffer) = blockIndex; /* Block index */ + *((int16_t *)(outputBuffer + BLOCK_COUNT_OFFSET)) = count; /* Block record count */ +#ifdef DEBUG + debug_log("Writing last page minsort sublist: blockIndex=%d, count=%d, filePosition=%ld\n", + blockIndex, count, lastWritePos / PAGE_SIZE); +#endif + if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->writeRel(outputBuffer, es->page_size, 1, outputFile)) { return 9; // Return error code if writing to the output file fails } @@ -6967,7 +7232,6 @@ int flash_minsort_sublist( close_MinSort_sublist(&ms, es); - *resultFilePtr = 0; free(ms.min); free(ms.offset); free(ms.current); @@ -7260,13 +7524,19 @@ void shiftUp_rev(char *buffer, /************************************************************sortWrapper.c************************************************************/ -#define PRINT_METRIC +#ifndef ARDUINO +#endif -// External declaration for setupFile function -extern void *setupFile(const char *filename); +// #define PRINT_METRIC +// #define DEBUG +// #define PRINT_ERRORS +#if defined(DEBUG) || defined(PRINT_METRIC) || defined(PRINT_ERRORS) +#else +#ifndef debug_log +#define debug_log(...) ((void)0) +#endif -// Forward declaration for pure in-memory sort (no file I/O) -file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op); +#endif /** * @brief Pure in-memory sort that avoids file I/O completely for very small datasets @@ -7275,21 +7545,28 @@ file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op); * @return file_iterator_state_t* Iterator for reading sorted results from memory */ file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op) { - printf("DEBUG: Starting pure in-memory sort\n"); - +#ifdef DEBUG + debug_log("DEBUG: Starting pure in-memory sort\n"); +#endif int record_count = 0; while (exec(op->input)) { record_count++; if (record_count > 10) { // Safety limit - printf("ERROR: Too many records for pure in-memory sort\n"); +#ifdef PRINT_ERRORS + debug_log("ERROR: Too many records for pure in-memory sort\n"); +#endif return NULL; } } - printf("DEBUG: Found %d records for pure in-memory sort\n", record_count); +#ifdef DEBUG + debug_log("DEBUG: Found %d records for pure in-memory sort\n", record_count); +#endif if (record_count == 0) { - printf("DEBUG: No records to sort\n"); +#ifdef DEBUG + debug_log("DEBUG: No records to sort\n"); +#endif file_iterator_state_t *iteratorState = malloc(sizeof(file_iterator_state_t)); if (iteratorState == NULL) { return NULL; @@ -7307,7 +7584,9 @@ file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op) void *buffer = malloc(record_count * data->recordSize); if (buffer == NULL) { - printf("ERROR: Failed to allocate memory for pure in-memory sort\n"); +#ifdef PRINT_ERRORS + debug_log("ERROR: Failed to allocate memory for pure in-memory sort\n"); +#endif return NULL; } @@ -7322,23 +7601,31 @@ file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op) records_read++; } - printf("DEBUG: Read %d records into memory buffer\n", records_read); +#ifdef DEBUG + debug_log("DEBUG: Read %d records into memory buffer\n", records_read); +#endif // Sort the records in memory using quicksort metrics_t metrics = {0}; int sort_result = in_memory_quick_sort(buffer, records_read, data->recordSize, data->keyOffset, data->compareFn, &metrics); if (sort_result != 0) { - printf("ERROR: In-memory sort failed\n"); +#ifdef PRINT_ERRORS + debug_log("ERROR: In-memory sort failed\n"); +#endif free(buffer); return NULL; } - printf("DEBUG: Pure in-memory sort completed successfully\n"); +#ifdef DEBUG + debug_log("DEBUG: Pure in-memory sort completed successfully\n"); +#endif file_iterator_state_t *iteratorState = malloc(sizeof(file_iterator_state_t)); if (iteratorState == NULL) { - printf("ERROR: Failed to allocate iterator state\n"); +#ifdef PRINT_ERRORS + debug_log("ERROR: Failed to allocate iterator state\n"); +#endif free(buffer); return NULL; } @@ -7366,14 +7653,15 @@ file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op) * @param file The file being written to * @return int8_t */ -int8_t writePageWithHeader(void *buffer, const uint32_t blockIndex, const uint32_t numberOfValues, const uint32_t pageSize, const embedDBFileInterface *fileInterface, void *file) { - memcpy(buffer, &blockIndex, sizeof(int32_t)); - memcpy(buffer + sizeof(uint32_t), &numberOfValues, sizeof(int16_t)); +int8_t writePageWithHeader(void *buffer, const uint32_t blockIndex, const uint16_t numberOfValues, const uint32_t pageSize, const embedDBFileInterface *fileInterface, void *file) { + memcpy(buffer, &blockIndex, sizeof(uint32_t)); + memcpy(buffer + sizeof(uint32_t), &numberOfValues, sizeof(uint16_t)); fileInterface->write(buffer, blockIndex, pageSize, file); - if (fileInterface->error(file)) { - printf("ERROR: SORT: Failed to write unsorted data"); +#ifdef PRINT_ERRORS + debug_log("ERROR: SORT: Failed to write unsorted data"); +#endif return 1; } @@ -7385,7 +7673,7 @@ int8_t writePageWithHeader(void *buffer, const uint32_t blockIndex, const uint32 * * @param data The operator data * @param op The previous operator - * @param unsortedFile A prexisting file that the row data will be writen to + * @param unsortedFile A preexisting file that the row data will be written to * @param recordSize The size of the data * @param keySize The size of the key * @param keyOffset The offset of the key with in the record (# of bytes) @@ -7394,14 +7682,21 @@ int8_t writePageWithHeader(void *buffer, const uint32_t blockIndex, const uint32 */ uint32_t loadRowData(sortData *data, embedDBOperator *op, void *unsortedFile) { uint32_t count = 0; - int32_t blockIndex = 0; - int16_t valuesPerPage = (PAGE_SIZE - BLOCK_HEADER_SIZE) / data->recordSize; + uint32_t blockIndex = 0; + uint16_t valuesPerPage = (PAGE_SIZE - BLOCK_HEADER_SIZE) / data->recordSize; + +#ifdef DEBUG + debug_log("DEBUG loadRowData: PAGE_SIZE=%d, BLOCK_HEADER_SIZE=%d, recordSize=%d, valuesPerPage=%d\n", + PAGE_SIZE, BLOCK_HEADER_SIZE, data->recordSize, valuesPerPage); +#endif void *buffer = malloc(PAGE_SIZE); if (buffer == NULL) { - printf("ERROR: SORT: buffer malloc failed"); - return 0; +#ifdef PRINT_ERRORS + debug_log("ERROR: SORT: buffer malloc failed"); +#endif + return 1; } // Write row data to file @@ -7413,15 +7708,16 @@ uint32_t loadRowData(sortData *data, embedDBOperator *op, void *unsortedFile) { buffer = NULL; return 0; } - blockIndex++; } // Offset of the data in the page - uint32_t rowOffset = count % valuesPerPage * data->recordSize + BLOCK_HEADER_SIZE; + uint32_t rowOffset = (count % valuesPerPage) * data->recordSize + BLOCK_HEADER_SIZE; if (rowOffset + data->recordSize > PAGE_SIZE) { - printf("ERROR: SORT: error calculating row offset"); +#ifdef PRINT_ERRORS + debug_log("ERROR: SORT: error calculating row offset"); +#endif free(buffer); buffer = NULL; return 0; @@ -7429,6 +7725,26 @@ uint32_t loadRowData(sortData *data, embedDBOperator *op, void *unsortedFile) { // Write data to buffer memcpy((uint8_t *)buffer + rowOffset, op->input->recordBuffer, data->recordSize); +#ifdef DEBUG + if (count < 10) { + debug_log("DEBUG loadRowData record %d: ", count); + for (int i = 0; i < data->recordSize; i++) { + debug_log("%02x ", ((uint8_t *)op->input->recordBuffer)[i]); + } + debug_log("\n"); + + // Also show what we wrote to the buffer + debug_log("DEBUG wrote to buffer at offset %d: ", rowOffset); + for (int i = 0; i < data->recordSize; i++) { + debug_log("%02x ", ((uint8_t *)buffer)[rowOffset + i]); + } + debug_log("\n"); + } + if (count < 10 || count % 1000 == 0) { + int32_t *keyPtr = (int32_t *)(op->input->recordBuffer + data->keyOffset); + debug_log("DEBUG loadRowData: count=%d, rowOffset=%d, key=%d\n", count, rowOffset, *keyPtr); + } +#endif count++; @@ -7444,9 +7760,12 @@ uint32_t loadRowData(sortData *data, embedDBOperator *op, void *unsortedFile) { buffer = NULL; return 0; } - data->fileInterface->flush(unsortedFile); +#ifdef DEBUG + debug_log("DEBUG loadRowData: finished, totalRecords=%d\n", count); +#endif + // Clean up free(buffer); buffer = NULL; @@ -7464,57 +7783,68 @@ void prepareSort(embedDBOperator *op) { data->keyOffset = getColOffsetFromSchema(op->schema, data->colNum); data->recordSize = getRecordSizeFromSchema(op->schema); data->keySize = op->schema->columnSizes[data->colNum]; +#ifdef DEBUG + debug_log("DEBUG prepareSort: recordSize=%d, keySize=%d, keyOffset=%d, colNum=%d\n", + data->recordSize, data->keySize, data->keyOffset, data->colNum); + debug_log("DEBUG prepareSort: schema has %d columns\n", op->schema->numCols); + for (int i = 0; i < op->schema->numCols; i++) { + debug_log(" Column %d: size=%d\n", i, op->schema->columnSizes[i]); + } +#endif // A columns size will be negative if the column is signed // and positive if value is unsigned if (data->keySize < 0) { data->keySize = -1 * data->keySize; } - -#ifdef ARDUINO - // For Arduino Due, use pure in-memory sort to completely avoid SD card I/O issues - data->fileIterator = startPureMemorySort(data, op); - if (data->fileIterator == NULL) { - printf("ERROR: Pure memory sort failed\n"); + if (data->fileInterface == NULL || data->fileInterface->setup == NULL) { +#ifdef PRINT_ERRORS + debug_log("ERROR: File interface or setup function not provided while initializing ORDER BY operator\n"); +#endif return; } - return; -#endif - // Set up files - void *unsortedFile = setupFile(SORT_DATA_LOCATION); - void *sortedFile = setupFile(SORT_ORDER_LOCATION); + char *tmp1 = data->fileInterface->tempFilePath(); + char *tmp2 = data->fileInterface->tempFilePath(); + + void *unsortedFile = data->fileInterface->setup(tmp1); + void *sortedFile = data->fileInterface->setup(tmp2); + free(tmp1); + free(tmp2); if (unsortedFile == NULL || sortedFile == NULL) { #ifdef PRINT_ERRORS - printf("ERROR: Failed to open files while initializing ORDER BY operator"); + debug_log("ERROR: Failed to allocate file handles while initializing ORDER BY operator\n"); #endif return; } - const uint8_t unsortedOpen = data->fileInterface->open(unsortedFile, EMBEDDB_FILE_MODE_W_PLUS_B); const uint8_t sortedOpen = data->fileInterface->open(sortedFile, EMBEDDB_FILE_MODE_W_PLUS_B); if (!unsortedOpen || !sortedOpen) { #ifdef PRINT_ERRORS - printf("ERROR: Failed to open files while initializing ORDER BY operator"); + debug_log("ERROR: Failed to open files while initializing ORDER BY operator"); #endif return; } // Load row data data->count = loadRowData(data, op, unsortedFile); - // Start sorting file_iterator_state_t *iteratorState = startSort(data, unsortedFile, sortedFile); if (iteratorState == NULL) { - printf("ERROR: Sort failed"); +#ifdef PRINT_ERRORS + debug_log("ERROR: Sort failed"); +#endif return; } // Finish iteratorState->file = sortedFile; data->fileInterface->close(unsortedFile); + if (data->fileInterface->removeFile) { + data->fileInterface->removeFile(unsortedFile); + } data->fileIterator = iteratorState; } @@ -7539,19 +7869,14 @@ file_iterator_state_t *startSort(sortData *data, void *unsortedFile, void *sorte es.page_size = PAGE_SIZE; es.num_pages = (uint32_t)ceil((float)data->count / ((es.page_size - es.headerSize) / es.record_size)); -// Reduce buffer size for Arduino -#ifdef ARDUINO - const int buffer_max_pages = 1; // Reduced to minimum for Arduino -#else const int buffer_max_pages = 4; -#endif char *buffer = malloc(buffer_max_pages * es.page_size + es.record_size); char *tuple_buffer = buffer + es.page_size * buffer_max_pages; if (buffer == NULL) { #ifdef PRINT_ERRORS - printf("ERROR: SORT: buffer malloc failed m\n"); + debug_log("ERROR: SORT: buffer malloc failed m\n"); #endif return NULL; } @@ -7560,7 +7885,7 @@ file_iterator_state_t *startSort(sortData *data, void *unsortedFile, void *sorte file_iterator_state_t *iteratorState = malloc(sizeof(file_iterator_state_t)); if (iteratorState == NULL) { #ifdef PRINT_ERRORS - printf("Error: SORT: iterator malloc failed\n"); + debug_log("Error: SORT: iterator malloc failed\n"); #endif free(buffer); buffer = NULL; @@ -7569,7 +7894,7 @@ file_iterator_state_t *startSort(sortData *data, void *unsortedFile, void *sorte iteratorState->file = unsortedFile; iteratorState->recordsRead = 0; - iteratorState->totalRecords = data->count; // Total records from the previous while loop + iteratorState->totalRecords = data->count; iteratorState->recordSize = es.record_size; iteratorState->fileInterface = data->fileInterface; iteratorState->currentRecord = 0; @@ -7584,38 +7909,24 @@ file_iterator_state_t *startSort(sortData *data, void *unsortedFile, void *sorte long result_file_ptr = 0; int err; -// Use simpler sort for Arduino with small datasets -#ifdef ARDUINO - printf("DEBUG: Starting Arduino sort with %d records\n", data->count); - if (data->count <= 100) { // Use flash_minsort for all datasets on Arduino (more memory efficient) - printf("DEBUG: Using flash_minsort for small dataset\n"); - err = flash_minsort(iteratorState, tuple_buffer, sortedFile, buffer, buffer_max_pages * es.page_size, &es, &result_file_ptr, &metrics, data->compareFn); - } else { - printf("DEBUG: Using flash_minsort for large dataset\n"); - // Use flash_minsort for larger datasets (more memory efficient than adaptive_sort) - err = flash_minsort(iteratorState, tuple_buffer, sortedFile, buffer, buffer_max_pages * es.page_size, &es, &result_file_ptr, &metrics, data->compareFn); - } - printf("DEBUG: Arduino sort completed with error code: %d\n", err); -#else - // Use adaptive sort on desktop + int8_t runGenOnly = false; // Run full sort operation int8_t writeReadRatio = 19; // 1.97 * 10 => 19 err = adaptive_sort(readNextRecord, iteratorState, tuple_buffer, sortedFile, buffer, buffer_max_pages, &es, &result_file_ptr, &metrics, data->compareFn, runGenOnly, writeReadRatio, data); -#endif #ifdef PRINT_METRIC - printf("\tComplete. Comparisons: %d Writes: %d Reads: %d Memcpys: %d\n", metrics.num_compar, metrics.num_writes, metrics.num_reads, metrics.num_memcpys); + debug_log("\tComplete. Comparisons: %d Writes: %d Reads: %d Memcpys: %d\n", metrics.num_compar, metrics.num_writes, metrics.num_reads, metrics.num_memcpys); #endif iteratorState->resultFile = result_file_ptr; #ifdef PRINT_ERRORS if (8 == err) { - printf("Out of memory!\n"); + debug_log("Out of memory!\n"); } else if (10 == err) { - printf("File Read Error!\n"); + debug_log("File Read Error!\n"); } else if (9 == err) { - printf("File Write Error!\n"); + debug_log("File Write Error!\n"); } #endif @@ -7643,64 +7954,89 @@ uint8_t readNextRecord(void *data, void *buffer) { return 1; // No more records left to read } -#ifdef ARDUINO - // For pure memory sort on Arduino, read directly from memory buffer - if (iteratorState->file != NULL && iteratorState->resultFile == 0) { - memcpy(buffer, (char *)iteratorState->file + iteratorState->recordsRead * iteratorState->recordSize, - iteratorState->recordSize); - iteratorState->recordsRead++; - iteratorState->currentRecord++; - return 0; - } -#endif - uint32_t recordPerPage = (PAGE_SIZE - BLOCK_HEADER_SIZE) / iteratorState->recordSize; // Read next page if current buffer is empty if (iteratorState->currentRecord % recordPerPage == 0 || iteratorState->recordsRead == 0) { - iteratorState->fileInterface->seek(iteratorState->currentRecord / recordPerPage * PAGE_SIZE + iteratorState->resultFile, iteratorState->file); + uint32_t seekOffset = iteratorState->resultFile + (iteratorState->currentRecord / recordPerPage) * PAGE_SIZE; + + iteratorState->fileInterface->seek(seekOffset, iteratorState->file); iteratorState->fileInterface->readRel(((sortData *)data)->readBuffer, PAGE_SIZE, 1, iteratorState->file); +#ifdef DEBUG + if (iteratorState->recordsRead == 0 || iteratorState->recordsRead % 1000 == 0) { + debug_log("DEBUG readNextRecord: pageNum=%d, seekOffset=%d, recordsRead=%d\n", + iteratorState->currentRecord / recordPerPage, seekOffset, iteratorState->recordsRead); + } +#endif + if (((sortData *)data)->fileInterface->error(iteratorState->file)) { - printf("ERROR: SORT: next record read failed"); +#ifdef PRINT_ERRORS + debug_log("ERROR: SORT: next record read failed"); +#endif return 2; } } - // Copy result to ouput buffer - memcpy(buffer, ((sortData *)data)->readBuffer + BLOCK_HEADER_SIZE + iteratorState->recordSize * (iteratorState->currentRecord % recordPerPage), iteratorState->recordSize); - iteratorState->recordsRead++; - iteratorState->currentRecord++; + // Copy result to output buffer + uint16_t valuesInPage; + memcpy(&valuesInPage, ((sortData *)data)->readBuffer + sizeof(uint32_t), + sizeof(uint16_t)); + uint32_t recordIndexInPage = iteratorState->currentRecord % recordPerPage; +#ifdef DEBUG + +#endif + + if (recordIndexInPage >= valuesInPage) { + return 1; + } + uint32_t copyOffset = BLOCK_HEADER_SIZE + iteratorState->recordSize * recordIndexInPage; + memcpy(buffer, ((sortData *)data)->readBuffer + copyOffset, iteratorState->recordSize); #ifdef DEBUG - printf("DEBUG: ROWDATA from file:\n"); - for (int i = 0; i < iteratorState->recordSize - SORT_KEY_SIZE; i++) { - printf("%2x ", ((uint8_t *)buffer)[i]); + if (iteratorState->recordsRead < 10 || iteratorState->recordsRead % 1000 == 0) { + int32_t *keyPtr = (int32_t *)(buffer + ((sortData *)data)->keyOffset); + debug_log("DEBUG readNextRecord: recordsRead=%d, currentRecord=%d, pageIdx=%d, recordInPage=%d, copyOffset=%d, key=%d\n", + iteratorState->recordsRead, iteratorState->currentRecord, iteratorState->currentRecord / recordPerPage, + recordIndexInPage, copyOffset, *keyPtr); + uint32_t blockIdx; + memcpy(&blockIdx, ((sortData *)data)->readBuffer, sizeof(uint32_t)); + debug_log("READ PAGE hdr: blockIdx=%u values=%u\n", + blockIdx, valuesInPage); + debug_log("PAGE HEADER: page=%d values=%d\n", + iteratorState->currentRecord / recordPerPage, + valuesInPage); + int32_t *key0 = (int32_t *)(((sortData *)data)->readBuffer + BLOCK_HEADER_SIZE + ((sortData *)data)->keyOffset); + int32_t *keyLast = (int32_t *)(((sortData *)data)->readBuffer + BLOCK_HEADER_SIZE + (recordPerPage - 1) * iteratorState->recordSize + ((sortData *)data)->keyOffset); + debug_log(" First key on page: %d, Last key on page: %d\n", *key0, *keyLast); } - printf("\n"); #endif + iteratorState->recordsRead++; + iteratorState->currentRecord++; + + // #ifdef DEBUG + // printf("DEBUG: ROWDATA from file:\n"); + // for (int i = 0; i < iteratorState->recordSize - SORT_KEY_SIZE; i++) { + // printf("%2x ", ((uint8_t *)buffer)[i]); + // } + // printf("\n"); + // #endif return 0; } void closeSort(file_iterator_state_t *iteratorState) { -#ifdef ARDUINO - // For pure memory sort, we need to free the memory buffer - if (iteratorState->file != NULL && iteratorState->resultFile == 0) { - free(iteratorState->file); - iteratorState->file = NULL; - return; - } -#endif - if (iteratorState->file != NULL) { iteratorState->fileInterface->close(iteratorState->file); + if (iteratorState->fileInterface->removeFile) { + iteratorState->fileInterface->removeFile(iteratorState->file); + } iteratorState->file = NULL; } } /** - * @brief Initalizes default metric values + * @brief Initializes default metric values * * @return metrics_t */ diff --git a/lib/Distribution/embedDB.h b/lib/Distribution/embedDB.h index 1416277..76f2fbd 100644 --- a/lib/Distribution/embedDB.h +++ b/lib/Distribution/embedDB.h @@ -356,7 +356,7 @@ typedef struct { /** * @brief Erases a span of paes from file - * @param startPage The first page to earse + * @param startPage The first page to erase * @param pageSize The page to erase up to (exclusive) * @param file The file data that was stored in embedDBState->dataFile etc * @return 1 for success and 0 for failure @@ -412,6 +412,24 @@ typedef struct { */ int32_t (*tell)(void *file); + /** + * @brief Pointer to external function for file setup + */ + void *(*setup)(const char *filename); + /** + * @brief Pointer to external function for file teardown + */ + void (*teardown)(void *file); + /** + * @brief Pointer to platform specific tmp file path + */ + char *(*tempFilePath)(void); + + /** + * @brief Pointer to file for deletion + */ + int8_t (*removeFile)(void *file); + } embedDBFileInterface; struct activeRule; @@ -435,7 +453,7 @@ typedef struct { id_t nextIdxPageId; /* Next logical page id for index. Page id is an incrementing value and may not always be same as physical page id. */ id_t nextVarPageId; /* Page number of next var page to be written */ uint32_t nextRLCPhysicalPageLocation; /* Physical page number for the location for the next record-level-consistency page */ - uint32_t rlcPhysicalStartingPage; /* Physical page number for the starting page of the record-level consistnecy pages */ + uint32_t rlcPhysicalStartingPage; /* Physical page number for the starting page of the record-level consistency pages */ id_t currentVarLoc; /* Current variable address offset to write at (bytes from beginning of file) */ void *buffer; /* Pre-allocated memory buffer for use by algorithm */ spline *spl; /* Spline model */ @@ -608,14 +626,14 @@ uint32_t embedDBVarDataStreamRead(embedDBState *state, embedDBVarDataStream *str /** * @brief Flushes output buffer. * @param state algorithm state structure - * @returns 0 if successul and a non-zero value otherwise + * @returns 0 if successful and a non-zero value otherwise */ int8_t embedDBFlush(embedDBState *state); /** * @brief Flushes output buffer. * @param state algorithm state structure - * @returns 0 if successul and a non-zero value otherwise + * @returns 0 if successful and a non-zero value otherwise */ int8_t embedDBFlushVar(embedDBState *state); @@ -668,7 +686,7 @@ id_t writeIndexPage(embedDBState *state, void *buffer); id_t writeVariablePage(embedDBState *state, void *buffer); /** - * @brief Writes a temporary page when using record-levek-consistency to storage. + * @brief Writes a temporary page when using record-level-consistency to storage. * @param state embedDB algorithm state structure * @param pageNum Page number to read * @return Returns 0 for success and non-zero value for an error. @@ -1778,12 +1796,6 @@ void shiftUp_rev(char *buffer, #ifndef SORT_WRAPPER_H #define SORT_WRAPPER_H -#if defined(DESKTOP) -#endif - -#define SORT_DATA_LOCATION "sort_data.bin" -#define SORT_ORDER_LOCATION "sort_order.bin" - typedef struct embedDBOperator embedDBOperator; typedef struct sortData { @@ -1801,7 +1813,7 @@ typedef struct sortData { } sortData; /** - * @brief Initalizes default metric values + * @brief Initializes default metric values * * @return metrics_t */ @@ -1821,6 +1833,14 @@ metrics_t initMetric(); */ uint32_t loadRowData(sortData *data, embedDBOperator *op, void *unsortedFile); +/** + * @brief Pure in-memory sort that avoids file I/O completely for very small datasets + * @param data Sort configuration data + * @param op The operator to read data from + * @return file_iterator_state_t* Iterator for reading sorted results from memory + */ +file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op); + /** * @brief The data given in the unsortedFile is sorted and stored in the sortedFile *