/* ---- lib/Debug/debug_print.c ---- */

/*
 * Low-level write() used by debug_log().
 * NOTE(review): the header names were stripped from the extracted text;
 * <io.h> (declares _write) and <unistd.h> (declares write) are assumed here.
 */
#if defined(_WIN32) || defined(_WIN64)
#include <io.h>
#define write _write
#else
#include <unistd.h>
#endif

/**
 * @brief Formats a printf-style message and writes it to file descriptor 2.
 *
 * The message is formatted into a 256-byte stack buffer, so at most 255
 * characters are emitted; anything longer is truncated. Nothing is written
 * when formatting fails or produces an empty string.
 *
 * @param format printf-style format string
 * @param ...    arguments consumed by @p format
 */
void debug_log(const char *format, ...) {
    char buf[256];
    va_list ap;

    va_start(ap, format);
    int n = vsnprintf(buf, sizeof(buf), format, ap);
    va_end(ap);

    if (n <= 0) {
        return;
    }

    /*
     * vsnprintf reports the length the complete message would have had.
     * Only sizeof(buf) - 1 characters are actually stored (the last byte is
     * the terminating NUL), so clamp to that and never emit the terminator.
     */
    if ((size_t)n >= sizeof(buf)) {
        n = (int)sizeof(buf) - 1;
    }

    /* Use low-level write to avoid stdio buffering/locks that can block under some debuggers/targets */
    (void)write(2, buf, (unsigned int)n);
}

/* ---- lib/Desktop-File-Interface/desktopFileInterface.c ---- */

/*
 * NOTE(review): the real definition sits above the visible hunk. The layout
 * below is reconstructed from the hunk context (`FILE *file;`) and from
 * setupFile()'s `fileInfo->filename = calloc(1, nameLen + 1)` -- confirm it
 * against the file and drop this copy when splicing FILE_REMOVE in.
 */
typedef struct {
    char *filename;
    FILE *file;
} FILE_INFO;

/**
 * @brief Closes the file if it is open and deletes it from disk.
 *
 * @param file FILE_INFO handle; may be NULL
 * @return 1 when there was nothing to remove (NULL handle or NULL filename)
 *         or when remove() succeeded, 0 when remove() failed
 */
int8_t FILE_REMOVE(void *file) {
    if (file == NULL) {
        return 1;
    }
    FILE_INFO *fileInfo = (FILE_INFO *)file;

    /* The stream must be closed before the file can be deleted. */
    if (fileInfo->file != NULL) {
        fclose(fileInfo->file);
        fileInfo->file = NULL;
    }

    /* No name recorded: nothing to delete, which is not an error. */
    if (fileInfo->filename == NULL) {
        return 1;
    }

    if (remove(fileInfo->filename) != 0) {
#ifdef PRINT_ERRORS
        perror("ERROR: Failed to remove temp file");
#endif
        return 0;
    }
    return 1;
}
/**
 * @brief Produces a path for a new temporary file.
 *
 * Windows: asks _tempnam() for a name with the "embeddb_" prefix and returns
 * that (already heap-allocated) string; if _tempnam() fails, falls back to a
 * relative "embeddb_<rand>.tmp" name.
 * Elsewhere: mkstemp() creates "/tmp/embeddb_<rand>XXXXXX" with the X's
 * replaced; the descriptor is closed straight away and the empty file is
 * left on disk for the caller to open by name.
 *
 * @return Heap-allocated, NUL-terminated path that the caller must free(),
 *         or NULL when no temporary file could be created or memory ran out.
 */
char *tempFilePath(void) {
    /* Local scratch buffer: keeps the function reentrant. */
    char path[256];

#if defined(_WIN32) || defined(_WIN64)
    /* _tempnam() returns malloc'd storage, so it can be handed out as-is. */
    char *generated = _tempnam(NULL, "embeddb_");
    if (generated != NULL) {
        return generated;
    }
    /* Fallback */
    snprintf(path, sizeof(path), "embeddb_%lu.tmp", (unsigned long)rand());
#else
    /* POSIX systems */
    snprintf(path, sizeof(path), "/tmp/embeddb_%luXXXXXX", (unsigned long)rand());

    int fd = mkstemp(path);
    if (fd < 0) {
        /* The template was not turned into a real file; do not hand it out. */
        return NULL;
    }
    close(fd);
#endif

    size_t size = strlen(path) + 1;
    char *out = malloc(size);
    if (out != NULL) {
        memcpy(out, path, size);
    }
    return out;
}
b/lib/Desktop-File-Interface/desktopFileInterface.h index 25b20b9a..1baf1467 100644 --- a/lib/Desktop-File-Interface/desktopFileInterface.h +++ b/lib/Desktop-File-Interface/desktopFileInterface.h @@ -17,7 +17,7 @@ extern "C" { /* File functions */ embedDBFileInterface *getFileInterface(); embedDBFileInterface *getMockEraseFileInterface(); -void *setupFile(char *filename); +void *setupFile(const char *filename); void tearDownFile(void *file); #ifdef __cplusplus diff --git a/lib/SD-File-Interface/SDFileInterface.c b/lib/SD-File-Interface/SDFileInterface.c index 58da4602..578965cb 100644 --- a/lib/SD-File-Interface/SDFileInterface.c +++ b/lib/SD-File-Interface/SDFileInterface.c @@ -40,7 +40,7 @@ typedef struct { SD_FILE *sdFile; } SD_FILE_INFO; -void *setupSDFile(char *filename) { +void *setupSDFile(const char *filename) { SD_FILE_INFO *fileInfo = malloc(sizeof(SD_FILE_INFO)); int nameLen = strlen(filename); fileInfo->filename = calloc(1, nameLen + 1); @@ -57,6 +57,22 @@ void tearDownSDFile(void *file) { free(file); } +int8_t FILE_REMOVE(void *file) { + if (file == NULL) return 1; + SD_FILE_INFO *fileInfo = (SD_FILE_INFO *)file; + + if (fileInfo->sdFile != NULL) { + sd_fclose(fileInfo->sdFile); + fileInfo->sdFile = NULL; + } + + if (fileInfo->filename != NULL) { + int result = sd_remove(fileInfo->filename); + return (result == 0); + } + return 1; +} + int8_t FILE_READ(void *buffer, uint32_t pageNum, uint32_t pageSize, void *file) { SD_FILE_INFO *fileInfo = (SD_FILE_INFO *)file; sd_fseek(fileInfo->sdFile, pageSize * pageNum, SEEK_SET); @@ -65,31 +81,25 @@ int8_t FILE_READ(void *buffer, uint32_t pageNum, uint32_t pageSize, void *file) int8_t FILE_WRITE(void *buffer, uint32_t pageNum, uint32_t pageSize, void *file) { SD_FILE_INFO *fileInfo = (SD_FILE_INFO *)file; + if (fileInfo->sdFile == NULL) return 0; + size_t fileSize = sd_length(fileInfo->sdFile); - size_t requiredSize = pageNum * pageSize; - if (fileSize < pageNum * pageSize) { - int8_t seekSuccess = 
sd_fseek(fileInfo->sdFile, fileSize, SEEK_SET); - if (seekSuccess == -1) { - return -1; - } - size_t currentSize = fileSize; - uint32_t max = UINT32_MAX; - uint32_t writeSuccess = 0; - while (currentSize < requiredSize) { - writeSuccess = sd_fwrite(&max, sizeof(uint32_t), 1, fileInfo->sdFile); - if (writeSuccess == 0) - return -1; - currentSize += 4; + size_t requiredSize = (size_t)pageNum * pageSize; + + if (fileSize < requiredSize) { + sd_fseek(fileInfo->sdFile, 0, SEEK_END); + uint8_t zero = 0; + while (sd_length(fileInfo->sdFile) < requiredSize) { + if (sd_fwrite(&zero, 1, 1, fileInfo->sdFile) != 1) return 0; } } - int8_t seekSuccess = sd_fseek(fileInfo->sdFile, pageNum * pageSize, SEEK_SET); - if (seekSuccess == -1) { - return -1; + + if (sd_fseek(fileInfo->sdFile, requiredSize, SEEK_SET) != 0) return 0; + + if (sd_fwrite(buffer, pageSize, 1, fileInfo->sdFile) == 1) { + return 1; } - int8_t writeSuccess = sd_fwrite(buffer, pageSize, 1, fileInfo->sdFile) == pageSize; - if (seekSuccess == -1) - return 0; - return 1; + return 0; } int8_t FILE_ERASE(uint32_t startPage, uint32_t endPage, uint32_t pageSize, void *file) { @@ -103,6 +113,16 @@ int8_t FILE_CLOSE(void *file) { return 1; } +int8_t FILE_READ_REL(void *buffer, uint32_t size, uint32_t n, void *file) { + SD_FILE_INFO *fileInfo = (SD_FILE_INFO *)file; + return sd_fread(buffer, size, n, fileInfo->sdFile); +} + +int8_t FILE_WRITE_REL(void *buffer, uint32_t size, uint32_t n, void *file) { + SD_FILE_INFO *fileInfo = (SD_FILE_INFO *)file; + return sd_fwrite(buffer, size, n, fileInfo->sdFile); +} + int8_t FILE_FLUSH(void *file) { SD_FILE_INFO *fileInfo = (SD_FILE_INFO *)file; return sd_fflush(fileInfo->sdFile) == 0; @@ -126,6 +146,40 @@ int8_t FILE_OPEN(void *file, uint8_t mode) { } } +int32_t FILE_TELL(void *file) { + SD_FILE_INFO *fileInfo = (SD_FILE_INFO *)file; + if (fileInfo == NULL || fileInfo->sdFile == NULL) { + return -1; + } + return (int32_t)sd_ftell(fileInfo->sdFile); +} + +char 
*sdFat_tempFilePath(void) { + char tempPathBuffer[32]; + snprintf(tempPathBuffer, sizeof(tempPathBuffer), "TMP%lu.DAT", random()); + + char *out = malloc(strlen(tempPathBuffer) + 1); + if (out) { + strcpy(out, tempPathBuffer); + } + return out; +} + +int8_t FILE_SEEK(uint32_t n, void *file) { + SD_FILE_INFO *fileInfo = (SD_FILE_INFO *)file; + return sd_fseek(fileInfo->sdFile, n, SEEK_SET); +} + +int8_t FILE_ERROR(void *file) { + if (file == NULL) return 1; + SD_FILE_INFO *fileInfo = (SD_FILE_INFO *)file; + + if (sd_ferror(fileInfo->sdFile)) { + return 1; + } + return 0; +} + embedDBFileInterface *getSDInterface() { embedDBFileInterface *fileInterface = malloc(sizeof(embedDBFileInterface)); fileInterface->close = FILE_CLOSE; @@ -133,6 +187,15 @@ embedDBFileInterface *getSDInterface() { fileInterface->write = FILE_WRITE; fileInterface->erase = FILE_ERASE; fileInterface->open = FILE_OPEN; + fileInterface->seek = FILE_SEEK; fileInterface->flush = FILE_FLUSH; + fileInterface->error = FILE_ERROR; + fileInterface->readRel = FILE_READ_REL; + fileInterface->writeRel = FILE_WRITE_REL; + fileInterface->tell = FILE_TELL; + fileInterface->setup = setupSDFile; + fileInterface->teardown = tearDownSDFile; + fileInterface->removeFile = FILE_REMOVE; + fileInterface->tempFilePath = sdFat_tempFilePath; return fileInterface; } diff --git a/lib/SD-File-Interface/SDFileInterface.h b/lib/SD-File-Interface/SDFileInterface.h index 9267de42..56f467ec 100644 --- a/lib/SD-File-Interface/SDFileInterface.h +++ b/lib/SD-File-Interface/SDFileInterface.h @@ -52,7 +52,7 @@ extern "C" { #include "sdcard_c_iface.h" embedDBFileInterface *getSDInterface(); -void *setupSDFile(char *filename); +void *setupSDFile(const char *filename); void tearDownSDFile(void *file); #ifdef __cplusplus diff --git a/lib/SD-Wrapper/sdcard_c_iface.cpp b/lib/SD-Wrapper/sdcard_c_iface.cpp index 2f72f1e7..7217d20b 100644 --- a/lib/SD-Wrapper/sdcard_c_iface.cpp +++ b/lib/SD-Wrapper/sdcard_c_iface.cpp @@ -104,7 +104,7 @@ SD_FILE 
*sd_fopen(const char *filename, const char *mode) { size_t sd_fread(void *ptr, size_t size, size_t nmemb, SD_FILE *stream) { /* read is the size of bytes * num of size-bytes */ - int16_t num_bytes = stream->f.read((char *)ptr, size * nmemb); + int32_t num_bytes = stream->f.read((char *)ptr, size * nmemb); if (num_bytes < 0) return 0; @@ -112,24 +112,44 @@ size_t sd_fread(void *ptr, size_t size, size_t nmemb, SD_FILE *stream) { } int sd_fseek(SD_FILE *stream, unsigned long int offset, int whence) { - if (NULL == stream) - return -1; + if (NULL == stream) return -1; - bool result = stream->f.seek(offset); - if (!result) - return -1; - return 0; + unsigned long absolute_pos = offset; + if (whence == SEEK_CUR) { + absolute_pos = stream->f.position() + offset; + } else if (whence == SEEK_END) { + absolute_pos = stream->f.size() - offset; + } + return stream->f.seek(absolute_pos) ? 0 : -1; } size_t sd_fwrite(void *ptr, size_t size, size_t nmemb, SD_FILE *stream) { size_t total_count = size * nmemb; size_t bytes_written = stream->f.write(ptr, total_count); - if (total_count != bytes_written) - return 0; - return total_count; + if (bytes_written == 0) return 0; + return bytes_written / size; } size_t sd_length(SD_FILE *stream) { return stream->f.size(); } + +int sd_remove(const char *filename) { + if (sdcard->remove(filename)) { + return 0; + } + return -1; +} + +long sd_ftell(SD_FILE *stream) { + if (stream == NULL) { + return -1; + } + return (long)stream->f.position(); +} + +int sd_ferror(SD_FILE *stream) { + if (stream == NULL) return 1; + return stream->f.getWriteError() ? 1 : 0; +} diff --git a/lib/SD-Wrapper/sdcard_c_iface.h b/lib/SD-Wrapper/sdcard_c_iface.h index 2dc785ea..80059533 100644 --- a/lib/SD-Wrapper/sdcard_c_iface.h +++ b/lib/SD-Wrapper/sdcard_c_iface.h @@ -168,6 +168,22 @@ sd_fwrite( */ size_t sd_length(SD_FILE *stream); +/** +@brief Remove (delete) a file from the SD card. +@param filename The name of the file to delete. 
+@returns 0 on success, -1 on failure. +*/ +int sd_remove(const char *filename); + +/** +@brief Find location of current file position. +@param stream A pointer to a C file struct type associated with an SD file object. +@returns The current position in the file, or -1 if stream is NULL. +*/ +long sd_ftell(SD_FILE *stream); + +int sd_ferror(SD_FILE *stream); + void init_sdcard(void *sd); #if defined(__cplusplus) diff --git a/makefile b/makefile index 0840eae4..d8a3674c 100644 --- a/makefile +++ b/makefile @@ -54,7 +54,7 @@ DEV_TEST_OBJECTS = $(EMBEDDB_OBJECTS) $(QUERY_OBJECTS) $(EMBEDDB_FILE_INTERFACE) TEST_FLAGS = -I. -I$(PATHU) -I $(PATHS) -I$(PATH_UTILITY) -I$(PATH_FILE_INTERFACE) -D TEST -EXAMPLE_FLAGS = -I. -I$(PATHS) -I$(PATH_UTILITY) -I$(PATH_FILE_INTERFACE) -I$(PATH_DISTRIBUTION) -DPRINT_ERRORS +EXAMPLE_FLAGS = -I. -I$(PATHS) -I$(PATH_UTILITY) -I$(PATH_FILE_INTERFACE) -I$(PATH_DISTRIBUTION) TEST_DIST_FLAGS = -I. -I$(PATHS) -I$(PATHU) -I$(PATH_FILE_INTERFACE) -I$(PATH_DISTRIBUTION) -I$(PATH_UTILITY) -DDIST -D TEST override CFLAGS += $(if $(filter test-dist,$(MAKECMDGOALS)), $(TEST_DIST_FLAGS), $(if $(filter test,$(MAKECMDGOALS)),$(TEST_FLAGS),$(EXAMPLE_FLAGS)) ) diff --git a/platformio.ini b/platformio.ini index e685579d..44e7ffd7 100644 --- a/platformio.ini +++ b/platformio.ini @@ -20,9 +20,9 @@ build_src_filter = lib_ignore = Dataflash, Dataflash-File-Interface, Dataflash-Wrapper, Distribution, Due, Mega, Memboard, SD-File-Interface, SD-Test, SD-Wrapper, SdFat, Serial-Wrapper, Unity-Desktop build_flags = -lm - -DPRINT_ERRORS extra_scripts = pre:scripts/create_build_folder.py lib_deps = +build_type = debug [env:desktop-dist] diff --git a/src/benchmarks/sortBenchmark.h b/src/benchmarks/sortBenchmark.h index 33a0ca49..174c4cd7 100644 --- a/src/benchmarks/sortBenchmark.h +++ b/src/benchmarks/sortBenchmark.h @@ -163,7 +163,7 @@ void sort_order_last(int32_t numValues, embedDBState* stateUWA, embedDBSchema* b embedDBOperator* projColsOrderBy = createProjectionOperator(scanOpOrderBy, 2, 
projColsOB); embedDBOperator* orderByOp = createOrderByOperator(stateUWA, projColsOrderBy, 1, numValues, merge_sort_int32_comparator); orderByOp->init(orderByOp); - int32_t* recordBuffer = orderByOp->recordBuffer; + int32_t* recordBuffer = (int32_t*)orderByOp->recordBuffer; for (uint32_t i = 0; i < 10; i++) { if (!exec(orderByOp)) { @@ -188,7 +188,7 @@ void sort_order_first(int32_t numValues, embedDBState* stateUWA, embedDBSchema* uint8_t projColsOB[] = {0, 1}; embedDBOperator* projColsOrderBy = createProjectionOperator(orderByOp, 2, projColsOB); projColsOrderBy->init(projColsOrderBy); - int32_t* recordBuffer = projColsOrderBy->recordBuffer; + int32_t* recordBuffer = (int32_t*)projColsOrderBy->recordBuffer; for (uint32_t i = 0; i < 10; i++) { if (!exec(projColsOrderBy)) { diff --git a/src/embedDB/embedDB.c b/src/embedDB/embedDB.c index 6ce7c6c0..8a4ffec6 100644 --- a/src/embedDB/embedDB.c +++ b/src/embedDB/embedDB.c @@ -61,7 +61,7 @@ int8_t embedDBInitVarDataFromFile(embedDBState *state); int8_t shiftRecordLevelConsistencyBlocks(embedDBState *state); void embedDBInitSplineFromFile(embedDBState *state); int32_t getMaxError(embedDBState *state, void *buffer); -void updateMaxiumError(embedDBState *state, void *buffer); +void updateMaximumError(embedDBState *state, void *buffer); int8_t embedDBSetupVarDataStream(embedDBState *state, void *key, embedDBVarDataStream **varData, id_t recordNumber); uint32_t cleanSpline(embedDBState *state, uint32_t minPageNumber); void readToWriteBuf(embedDBState *state); @@ -212,7 +212,7 @@ int8_t embedDBInit(embedDBState *state, size_t indexMaxError) { return -1; } - /* Initalize the spline structure if being used */ + /* Initialize the spline structure if being used */ if (!EMBEDDB_USING_BINARY_SEARCH(state->parameters)) { if (state->numSplinePoints < 4) { #ifdef PRINT_ERRORS @@ -341,7 +341,7 @@ int8_t embedDBInitDataFromFile(embedDBState *state) { hasData = true; maxLogicalPageId = logicalPageId; physicalPageId++; - 
updateMaxiumError(state, buffer); + updateMaximumError(state, buffer); count++; i = 2; } else { @@ -362,7 +362,7 @@ int8_t embedDBInitDataFromFile(embedDBState *state) { if (validData && logicalPageId == maxLogicalPageId + 1) { maxLogicalPageId = logicalPageId; physicalPageId++; - updateMaxiumError(state, buffer); + updateMaximumError(state, buffer); moreToRead = !(readPage(state, physicalPageId)); count++; } else { @@ -385,7 +385,7 @@ int8_t embedDBInitDataFromFile(embedDBState *state) { physicalPageId = (physicalPageId + pagesToBlockBoundary) % state->numDataPages; moreToRead = !(readPage(state, physicalPageId)); - /* there should have been more to read becuase the file should not be empty at this point if it was not empty at the previous block */ + /* there should have been more to read because the file should not be empty at this point if it was not empty at the previous block */ if (!moreToRead) { return -1; } @@ -442,7 +442,7 @@ int8_t embedDBInitDataFromFileWithRecordLevelConsistency(embedDBState *state) { hasPermanentData = true; maxLogicalPageId = logicalPageId; physicalPageId++; - updateMaxiumError(state, buffer); + updateMaximumError(state, buffer); count++; i = 4; } else { @@ -460,7 +460,7 @@ int8_t embedDBInitDataFromFileWithRecordLevelConsistency(embedDBState *state) { if (validData && logicalPageId == maxLogicalPageId + 1) { maxLogicalPageId = logicalPageId; physicalPageId++; - updateMaxiumError(state, buffer); + updateMaximumError(state, buffer); moreToRead = !(readPage(state, physicalPageId)); count++; } else { @@ -494,7 +494,7 @@ int8_t embedDBInitDataFromFileWithRecordLevelConsistency(embedDBState *state) { /* record-level consistency recovery algorithm */ uint32_t numPagesRead = 0; uint32_t numPagesToRead = blockSize * 2; - uint32_t rlcMaxLogicialPageNumber = UINT32_MAX; + uint32_t rlcMaxLogicalPageNumber = UINT32_MAX; uint32_t rlcMaxRecordCount = UINT32_MAX; uint32_t rlcMaxPage = UINT32_MAX; moreToRead = !(readPage(state, physicalPageId)); @@ 
-507,7 +507,7 @@ int8_t embedDBInitDataFromFileWithRecordLevelConsistency(embedDBState *state) { uint32_t numRecords = EMBEDDB_GET_COUNT(buffer); if (rlcMaxRecordCount == UINT32_MAX || numRecords > rlcMaxRecordCount) { rlcMaxRecordCount = numRecords; - rlcMaxLogicialPageNumber = logicalPageId; + rlcMaxLogicalPageNumber = logicalPageId; rlcMaxPage = numPagesRead; } } @@ -516,11 +516,11 @@ int8_t embedDBInitDataFromFileWithRecordLevelConsistency(embedDBState *state) { numPagesRead++; } - /* need to find larged record-level consistency page to place back into the buffer and either one or both of the record-level consistency pages */ + /* need to find large record-level consistency page to place back into the buffer and either one or both of the record-level consistency pages */ uint32_t eraseStartingPage = 0; uint32_t eraseEndingPage = 0; uint32_t numBlocksToErase = 0; - if (rlcMaxLogicialPageNumber == UINT32_MAX) { + if (rlcMaxLogicalPageNumber == UINT32_MAX) { eraseStartingPage = state->rlcPhysicalStartingPage % state->numDataPages; numBlocksToErase = 2; } else { @@ -657,7 +657,7 @@ int8_t embedDBInitIndex(embedDBState *state) { int8_t embedDBInitIndexFromFile(embedDBState *state) { id_t logicalIndexPageId = 0; - id_t maxLogicaIndexPageId = 0; + id_t maxLogicalIndexPageId = 0; id_t physicalIndexPageId = 0; /* This will become zero if there is no more to read */ @@ -669,13 +669,13 @@ int8_t embedDBInitIndexFromFile(embedDBState *state) { while (moreToRead && count < state->numIndexPages) { memcpy(&logicalIndexPageId, buffer, sizeof(id_t)); - if (count == 0 || logicalIndexPageId == maxLogicaIndexPageId + 1) { - maxLogicaIndexPageId = logicalIndexPageId; + if (count == 0 || logicalIndexPageId == maxLogicalIndexPageId + 1) { + maxLogicalIndexPageId = logicalIndexPageId; physicalIndexPageId++; moreToRead = !(readIndexPage(state, physicalIndexPageId)); count++; } else { - haveWrappedInMemory = logicalIndexPageId == maxLogicaIndexPageId - state->numIndexPages + 1; + 
haveWrappedInMemory = logicalIndexPageId == maxLogicalIndexPageId - state->numIndexPages + 1; break; } } @@ -683,20 +683,20 @@ int8_t embedDBInitIndexFromFile(embedDBState *state) { if (count == 0) return 0; - state->nextIdxPageId = maxLogicaIndexPageId + 1; + state->nextIdxPageId = maxLogicalIndexPageId + 1; id_t physicalPageIDOfSmallestData = 0; if (haveWrappedInMemory) { physicalPageIDOfSmallestData = logicalIndexPageId % state->numIndexPages; } readIndexPage(state, physicalPageIDOfSmallestData); memcpy(&(state->minIndexPageId), buffer, sizeof(id_t)); - state->numAvailIndexPages = state->numIndexPages + state->minIndexPageId - maxLogicaIndexPageId - 1; + state->numAvailIndexPages = state->numIndexPages + state->minIndexPageId - maxLogicalIndexPageId - 1; return 0; } int8_t embedDBInitVarData(embedDBState *state) { - // Initialize variable data outpt buffer + // Initialize variable data output buffer initBufferPage(state, EMBEDDB_VAR_WRITE_BUFFER(state->parameters)); state->variableDataHeaderSize = state->keySize + sizeof(id_t); @@ -797,7 +797,7 @@ int8_t embedDBInitVarDataFromFile(embedDBState *state) { physicalVariablePageId = (physicalVariablePageId + pagesToBlockBoundary) % state->numVarPages; moreToRead = !(readVariablePage(state, physicalVariablePageId)); - /* there should have been more to read becuase the file should not be empty at this point if it was not empty at the previous block */ + /* there should have been more to read because the file should not be empty at this point if it was not empty at the previous block */ if (!moreToRead) { return -1; } @@ -1055,7 +1055,7 @@ int8_t embedDBPut(embedDBState *state, void *key, void *data) { memcpy((void *)((int8_t *)buf + EMBEDDB_IDX_HEADER_SIZE + state->bitmapSize * idxcount), bm, state->bitmapSize); } - updateMaxiumError(state, state->buffer); + updateMaximumError(state, state->buffer); count = 0; initBufferPage(state, 0); @@ -1139,7 +1139,7 @@ int8_t shiftRecordLevelConsistencyBlocks(embedDBState *state) 
{ uint32_t eraseStartingPage = state->rlcPhysicalStartingPage; uint32_t eraseEndingPage = 0; - /* if we have wraped, we need to erase an additional block as the block we are shifting into is not empty */ + /* if we have wrapped, we need to erase an additional block as the block we are shifting into is not empty */ bool haveWrapped = (state->minDataPageId % state->numDataPages) == ((state->rlcPhysicalStartingPage + numRecordLevelConsistencyPages) % state->numDataPages); uint32_t numBlocksToErase = haveWrapped ? 2 : 3; @@ -1175,7 +1175,7 @@ int8_t shiftRecordLevelConsistencyBlocks(embedDBState *state) { return 0; } -void updateMaxiumError(embedDBState *state, void *buffer) { +void updateMaximumError(embedDBState *state, void *buffer) { // Calculate error within the page int32_t maxError = getMaxError(state, buffer); if (state->maxError < maxError) { @@ -1637,7 +1637,7 @@ void embedDBCloseIterator(embedDBIterator *it) { /** * @brief Flushes output buffer. * @param state algorithm state structure - * @returns 0 if successul and a non-zero value otherwise + * @returns 0 if successful and a non-zero value otherwise */ int8_t embedDBFlushVar(embedDBState *state) { /* Check if we actually have any variable data in the buffer */ @@ -1667,7 +1667,7 @@ int8_t embedDBFlushVar(embedDBState *state) { /** * @brief Flushes output buffer. * @param state algorithm state structure - * @returns 0 if successul and a non-zero value otherwise + * @returns 0 if successful and a non-zero value otherwise */ int8_t embedDBFlush(embedDBState *state) { // As the first buffer is the data write buffer, no address change is required @@ -1859,7 +1859,7 @@ int8_t embedDBNextVar(embedDBState *state, embedDBIterator *it, void *key, void * @param state embedDB algorithm state structure * @param key Key for the record * @param varData Return variable for variable data as a embedDBVarDataStream (Unallocated). Returns NULL if no variable data. 
**Be sure to free the stream after you are done with it** - * @return Returns 0 if sucessfull or no variable data for the record, 1 if the records variable data was overwritten, 2 if the page failed to read, and 3 if the memorey failed to allocate. + * @return Returns 0 if successful or no variable data for the record, 1 if the records variable data was overwritten, 2 if the page failed to read, and 3 if the memory failed to allocate. */ int8_t embedDBSetupVarDataStream(embedDBState *state, void *key, embedDBVarDataStream **varData, id_t recordNumber) { void *dataBuf = (int8_t *)state->buffer + state->pageSize * EMBEDDB_DATA_READ_BUFFER; @@ -2099,7 +2099,7 @@ int8_t writeTemporaryPage(embedDBState *state, void *buffer) { /** * @brief Calculates the number of spline points not in use by embedDB and deletes them * @param state embedDB algorithm state structure - * @param key The minimim key embedDB still needs points for + * @param key The minimum key embedDB still needs points for * @return Returns the number of points deleted */ uint32_t cleanSpline(embedDBState *state, uint32_t minPageNumber) { @@ -2179,7 +2179,7 @@ id_t writeVariablePage(embedDBState *state, void *buffer) { return -1; } - // Make sure the address being witten to wraps around + // Make sure the address being written to wraps around id_t physicalPageId = state->nextVarPageId % state->numVarPages; // Erase data if needed diff --git a/src/embedDB/embedDB.h b/src/embedDB/embedDB.h index 68964504..7b97b80c 100644 --- a/src/embedDB/embedDB.h +++ b/src/embedDB/embedDB.h @@ -176,7 +176,7 @@ typedef struct { /** * @brief Erases a span of paes from file - * @param startPage The first page to earse + * @param startPage The first page to erase * @param pageSize The page to erase up to (exclusive) * @param file The file data that was stored in embedDBState->dataFile etc * @return 1 for success and 0 for failure @@ -232,6 +232,24 @@ typedef struct { */ int32_t (*tell)(void *file); + /** + * @brief Pointer to 
external function for file setup + */ + void *(*setup)(const char *filename); + /** + * @brief Pointer to external function for file teardown + */ + void (*teardown)(void *file); + /** + * @brief Pointer to platform specific tmp file path + */ + char *(*tempFilePath)(void); + + /** + * @brief Pointer to file for deletion + */ + int8_t (*removeFile)(void *file); + } embedDBFileInterface; struct activeRule; @@ -255,7 +273,7 @@ typedef struct { id_t nextIdxPageId; /* Next logical page id for index. Page id is an incrementing value and may not always be same as physical page id. */ id_t nextVarPageId; /* Page number of next var page to be written */ uint32_t nextRLCPhysicalPageLocation; /* Physical page number for the location for the next record-level-consistency page */ - uint32_t rlcPhysicalStartingPage; /* Physical page number for the starting page of the record-level consistnecy pages */ + uint32_t rlcPhysicalStartingPage; /* Physical page number for the starting page of the record-level consistency pages */ id_t currentVarLoc; /* Current variable address offset to write at (bytes from beginning of file) */ void *buffer; /* Pre-allocated memory buffer for use by algorithm */ spline *spl; /* Spline model */ @@ -428,14 +446,14 @@ uint32_t embedDBVarDataStreamRead(embedDBState *state, embedDBVarDataStream *str /** * @brief Flushes output buffer. * @param state algorithm state structure - * @returns 0 if successul and a non-zero value otherwise + * @returns 0 if successful and a non-zero value otherwise */ int8_t embedDBFlush(embedDBState *state); /** * @brief Flushes output buffer. 
* @param state algorithm state structure - * @returns 0 if successul and a non-zero value otherwise + * @returns 0 if successful and a non-zero value otherwise */ int8_t embedDBFlushVar(embedDBState *state); @@ -488,7 +506,7 @@ id_t writeIndexPage(embedDBState *state, void *buffer); id_t writeVariablePage(embedDBState *state, void *buffer); /** - * @brief Writes a temporary page when using record-levek-consistency to storage. + * @brief Writes a temporary page when using record-level-consistency to storage. * @param state embedDB algorithm state structure * @param pageNum Page number to read * @return Returns 0 for success and non-zero value for an error. diff --git a/src/embedDBExample.h b/src/embedDBExample.h index ea4a3a58..8da2ab57 100644 --- a/src/embedDBExample.h +++ b/src/embedDBExample.h @@ -2,7 +2,7 @@ /** * @file embedDBExample.h * @author EmbedDB Team (See Authors.md) - * @brief This file includes and example for insterting and retrieving sequential records for EmbeDB. + * @brief This file includes an example for inserting and retrieving sequential records for EmbedDB. 
* @copyright Copyright 2023 * EmbedDB Team * @par Redistribution and use in source and binary forms, with or without diff --git a/src/query-interface/activeRules.c b/src/query-interface/activeRules.c index e7c63064..8fb1ba16 100644 --- a/src/query-interface/activeRules.c +++ b/src/query-interface/activeRules.c @@ -71,7 +71,9 @@ void executeRules(embedDBState* state, void* key, void* data) { handleCustomQuery(state, state->rules[i], key, data); break; default: +#ifdef PRINT_ERRORS printf("ERROR: Unsupported rule type\n"); +#endif } } } @@ -150,7 +152,9 @@ embedDBOperator* createOperator(embedDBState* state, activeRule* rule, void*** a it->minKey = minKeyPtr; } } else { +#ifdef PRINT_ERRORS printf("ERROR: Unsupported key size\n"); +#endif return NULL; } @@ -174,7 +178,9 @@ embedDBOperator* createOperator(embedDBState* state, activeRule* rule, void*** a aggFunc = createMinAggregate(rule->colNum, rule->schema->columnSizes[rule->colNum]); break; default: +#ifdef PRINT_ERRORS printf("ERROR: Unsupported rule type\n"); +#endif } embedDBAggregateFunc* aggFuncs = (embedDBAggregateFunc*)malloc(1 * sizeof(embedDBAggregateFunc)); @@ -218,7 +224,9 @@ void executeComparison(activeRule* rule, void* aggregateValue, Comparator compar if (comparisonResult != 0) rule->callback(aggregateValue, data, rule->context); break; default: +#ifdef PRINT_ERRORS printf("ERROR: Unsupported operation\n"); +#endif } } @@ -237,7 +245,9 @@ void handleGetMinMax(embedDBState* state, activeRule* rule, void* key, void* dat int64_t minmax = GetMinMax64(state, rule, key); executeComparison(rule, &minmax, int64Comparator, data); } else { +#ifdef PRINT_ERRORS printf("ERROR: Unsupported column size\n"); +#endif } } @@ -257,6 +267,8 @@ void handleCustomQuery(embedDBState* state, activeRule* rule, void* key, void* d executeComparison(rule, result, doubleComparator, data); break; default: +#ifdef PRINT_ERRORS printf("ERROR: Unsupported return type\n"); +#endif } } diff --git 
a/src/query-interface/advancedQueries.c b/src/query-interface/advancedQueries.c index f0903708..0b085f05 100644 --- a/src/query-interface/advancedQueries.c +++ b/src/query-interface/advancedQueries.c @@ -536,6 +536,7 @@ void closeOrderBy(embedDBOperator* op) { * @param dbState The database state * @param input The operator that this operator can pull records from * @param colNum The column that is being sorted on + * @param limit The first values to be read and sorted - not like a true limit at the moment * @param compareFn The function being used to make comparisons between row data */ embedDBOperator* createOrderByOperator(embedDBState* dbState, embedDBOperator* input, int8_t colNum, int32_t limit, int8_t (*compareFn)(void* a, void* b)) { @@ -897,7 +898,7 @@ void closeKeyJoin(embedDBOperator* op) { } /** - * @brief Creates an operator for perfoming an equijoin on the keys (sorted and distinct) of two tables + * @brief Creates an operator for performing an equi-join on the keys (sorted and distinct) of two tables */ embedDBOperator* createKeyJoinOperator(embedDBOperator* input1, embedDBOperator* input2) { embedDBOperator* op = malloc(sizeof(embedDBOperator)); diff --git a/src/query-interface/sort/adaptive_sort.c b/src/query-interface/sort/adaptive_sort.c index ed7d6ad1..07f9e036 100644 --- a/src/query-interface/sort/adaptive_sort.c +++ b/src/query-interface/sort/adaptive_sort.c @@ -54,8 +54,15 @@ // #define DEBUG_OUTPUT 1 // #define DEBUG_READ 1 // #define DEBUG_HEAP 0 - +// #define ADAPTIVE_SORT_PRINT // #define ADAPTIVE_SORT_PRINT_FINISH +#if defined(DEBUG) || defined(DEBUG_OUTPUT) || defined(DEBUG_READ) || defined(DEBUG_HEAP) || defined(ADAPTIVE_SORT_PRINT) || defined(ADAPTIVE_SORT_PRINT_FINISH) +#include "debug_print.h" +#else +#ifndef debug_log +#define debug_log(...) ((void)0) +#endif +#endif /** * Prints the contents of the heap. Used for debugging. 
@@ -67,22 +74,22 @@ void print_heap(char* buffer, int32_t heap_start_offset, int heap_size, int list int j; for (aa = 0; aa < 1; aa++) { addr = buffer + heap_start_offset; - printf("heap: "); + debug_log("heap: "); for (j = 0; j < heap_size; j++) - printf(" %d", *(int32_t*)(addr - j * es->record_size)); - printf("| "); + debug_log(" %d", *(int32_t*)(addr - j * es->record_size + es->key_offset)); + debug_log("| "); } - printf(" "); + debug_log(" "); // Prints the list for (aa = 0; aa < 1; aa++) { addr = buffer + es->page_size; - printf("list: "); + debug_log("list: "); for (j = 0; j < list_size; j++) - printf(" %d", *(int32_t*)(addr + j * es->record_size)); - printf("| "); + debug_log(" %d", *(int32_t*)(addr + j * es->record_size + es->key_offset)); + debug_log("| "); } - printf("\n"); + debug_log("\n"); } /** @@ -141,11 +148,11 @@ int adaptive_sort( /* Note: Could be int8_t as larger than 255 is above cutoff for using MinSort. */ uint8_t numDistinctInRun = 0; /* Number of distinct values in current run */ - int optimistic = true; + int optimistic = false; if (optimistic) { // Do FLASH MinSort init first #ifdef DEBUG - printf("*Optimistic*\n"); + debug_log("*Optimistic*\n"); #endif MinSortState ms; @@ -161,18 +168,18 @@ int adaptive_sort( int32_t nobSortCost = numPasses * (10 + writeToReadRatio) / 10; #ifdef DEBUG - printf("Adaptive calculation.\n"); - printf("NOB sort cost. # runs: %d", numSublist); - printf(" # passes: %d cost: %d\n", numPasses, nobSortCost); - printf("MinSort cost. Num sublists: %d ", numSublist); - printf(" Avg. distinct/sublist: %d\n", avgDistinct / 10); + debug_log("Adaptive calculation.\n"); + debug_log("NOB sort cost. # runs: %d", numSublist); + debug_log(" # passes: %d cost: %d\n", numPasses, nobSortCost); + debug_log("MinSort cost. Num sublists: %d ", numSublist); + debug_log(" Avg. 
distinct/sublist: %d\n", avgDistinct / 10); #endif if (avgDistinct < nobSortCost) // if (true) { #ifdef DEBUG - printf("Performing MinSort Optimistic\n"); + debug_log("Performing MinSort Optimistic\n"); #endif int16_t count = 0; @@ -190,7 +197,10 @@ int adaptive_sort( if (count == values_per_page) { *((int32_t*)outputBuffer) = blockIndex; /* Block index */ *((int16_t*)(outputBuffer + BLOCK_COUNT_OFFSET)) = count; /* Block record count */ - +#ifdef DEBUG + debug_log("Writing page adaptive sort: blockIndex=%d, count=%d, filePosition=%ld\n", + blockIndex, count, lastWritePos / PAGE_SIZE); +#endif // Write block to the ouput file if (0 == ((file_iterator_state_t*)iteratorState)->fileInterface->write(outputBuffer, blockIndex, es->page_size, outputFile)) { return 9; // Return error code if writing to the output file fails @@ -201,10 +211,11 @@ int adaptive_sort( metric->num_writes++; #ifdef DEBUG_OUTPUT - printf("Wrote output block. Block index: %d\n", blockIndex); + debug_log("Wrote output block. Block index: %d\n", blockIndex); for (int k = 0; k < values_per_page; k++) { - printf("%3d: 1 Output Record: %d\n", k, outputBuffer + es->headerSize + k * es->record_size + es->key_offset); + debug_log("%3d: 1 Output Record: %d\n", k, outputBuffer + es->headerSize + k * es->record_size + es->key_offset); } + #endif } } @@ -213,7 +224,10 @@ int adaptive_sort( if (count > 0) { *((int32_t*)outputBuffer) = blockIndex; /* Block index */ *((int16_t*)(outputBuffer + BLOCK_COUNT_OFFSET)) = count; /* Block record count */ - +#ifdef DEBUG + debug_log("Writing last page adaptive: blockIndex=%d, count=%d, filePosition=%ld\n", + blockIndex, count, lastWritePos / PAGE_SIZE); +#endif if (0 == ((file_iterator_state_t*)iteratorState)->fileInterface->write(outputBuffer, blockIndex, es->page_size, outputFile)) { return 9; // Return error code if writing to the output file fails } @@ -223,10 +237,11 @@ int adaptive_sort( metric->num_writes++; #ifdef DEBUG_OUTPUT - printf("Wrote output block. 
Block index: %d\n", blockIndex); + debug_log("Wrote output block. Block index: %d\n", blockIndex); for (int k = 0; k < values_per_page; k++) { - printf("%3d: 2 Output Record: %d\n", k, *(uint32_t*)(outputBuffer + es->headerSize + k * es->record_size + es->key_offset)); + debug_log("%3d: 2 Output Record: %d\n", k, *(uint32_t*)(outputBuffer + es->headerSize + k * es->record_size + es->key_offset)); } + #endif } @@ -250,41 +265,108 @@ int adaptive_sort( int32_t heapStartOffset = bufferSizeInBlocks * es->page_size - es->record_size; int32_t listSize = 0; - void* lastOutputKey = malloc(es->record_size); /* Pointer to memory storing value of last key output */ + void* lastOutputKey = malloc(es->record_size); int8_t haveOutputKey = 0; int32_t sublistSize = 0; /* size in blocks */ int32_t outputCount = 0; /* number of values in output block */ int32_t recordsLeft = 0; /* number of records in buffer */ void *heapVal, *inputVal; - // Fill all blocks other than the first with tuples - addr = buffer + es->page_size; - for (i = 0; i < (bufferSizeInBlocks - 1) * tuplesPerPage; i++) { + // Calculate safe initial heap size + // Need space for: heap (grows down) + list (grows up) + input page (block 0) + // Available buffer space = blocks 1 to (bufferSizeInBlocks-1) + // Reserve last block's worth of space for the list to grow safely + int32_t maxRecordsInBuffer = (bufferSizeInBlocks - 1) * tuplesPerPage; + +#ifdef DEBUG + debug_log("DEBUG: Buffer setup:\n"); + debug_log(" bufferSizeInBlocks=%d, tuplesPerPage=%d\n", bufferSizeInBlocks, tuplesPerPage); + debug_log(" maxRecordsInBuffer=%d", maxRecordsInBuffer); + debug_log(" heapStartOffset=%d, page_size=%d, record_size=%d\n", + heapStartOffset, es->page_size, es->record_size); + debug_log(" Buffer layout: [I/O Page 0][Data Pages 1-%d][Heap grows down from top]\n", bufferSizeInBlocks - 1); +#endif + + ((file_iterator_state_t*)iteratorState)->fileInterface->seek(0, outputFile); + lastWritePos = 0; + addr = buffer + es->page_size; // 
Start after I/O block + for (i = 0; i < maxRecordsInBuffer; i++) { status = !iterator(sortData, addr); if (status == 0) - break; + break; // No more records available recordsRead++; addr += es->record_size; } +#ifdef DEBUG + debug_log("DEBUG: Initial load completed:\n"); + debug_log(" recordsRead=%d (requested %d)\n", recordsRead, maxRecordsInBuffer); + debug_log(" Data spans from offset %d to %d\n", + (int)(es->page_size), (int)(es->page_size + recordsRead * es->record_size)); +#endif + recordsLeft = recordsRead; // Update metrics - metric->num_reads += bufferSizeInBlocks - 1; + // Note: num_reads tracks page reads, but we read data during initial fill via iterator + // The iterator handles its own page reads, so don't double-count here metric->num_runs++; - // Build heap from tuples in filled blocks + // Build heap from tuples in reverse order + // Start from the end of loaded data and work backwards + addr = buffer + es->page_size + recordsRead * es->record_size; for (i = 0; i < recordsRead; i++) { addr -= es->record_size; + memcpy(tupleBuffer, addr, es->record_size); metric->num_memcpys++; shiftUp_rev(buffer + heapStartOffset, tupleBuffer, heapSize, es, metric); heapSize++; } +#ifdef DEBUG + debug_log("DEBUG: Heap construction completed:\n"); + debug_log(" heapSize=%d, listSize=%d\n", heapSize, listSize); + + // Memory layout verification + int32_t heapBottom = heapStartOffset - (heapSize - 1) * es->record_size; + int32_t listTop = es->page_size + listSize * es->record_size; + int32_t gapSize = heapBottom - listTop; + + debug_log("DEBUG: Memory layout:\n"); + debug_log(" I/O block: offset 0 - %d\n", es->page_size); + debug_log(" List top: offset %d\n", listTop); + debug_log(" Gap: %d bytes (%d records)\n", gapSize, gapSize / es->record_size); + debug_log(" Heap bottom: offset %d\n", heapBottom); + debug_log(" Heap top: offset %d\n", heapStartOffset); + debug_log(" Buffer end: offset %d\n", bufferSizeInBlocks * es->page_size); + + if (heapBottom <= listTop) { + 
debug_log("ERROR: Heap and list overlap! This will cause corruption.\n"); + free(lastOutputKey); + return 9; + } +#endif + +#ifdef DEBUG_HEAP + print_heap(buffer, heapStartOffset, heapSize, listSize, es); +#endif + +#ifdef DEBUG + debug_log("DEBUG: About to enter main loop\n"); + debug_log(" heapSize=%d, listSize=%d, recordsLeft=%d\n", heapSize, listSize, recordsLeft); + debug_log(" Iterator position: recordsRead=%d, totalRecords=%d\n", + ((file_iterator_state_t*)iteratorState)->recordsRead, + ((file_iterator_state_t*)iteratorState)->totalRecords); +#endif + // Read each block and sort while (recordsLeft != 0) { recordsRead = 0; +#ifdef DEBUG + debug_log("\n=== Main loop iteration: sublistSize=%d, outputCount=%d, heapSize=%d, listSize=%d, recordsLeft=%d ===\n", + sublistSize, outputCount, heapSize, listSize, recordsLeft); +#endif // Read in page addr = buffer + es->headerSize; @@ -301,6 +383,16 @@ int adaptive_sort( print_heap(buffer, heapStartOffset, heapSize, listSize, es); #endif +#ifdef DEBUG + debug_log("DEBUG: Main loop iteration - read %d records\n", recordsRead); + if (recordsRead > 0) { + debug_log(" First record value: %d, Last record value: %d\n", + *(int32_t*)(buffer + es->headerSize + es->key_offset), + *(int32_t*)(buffer + es->headerSize + (recordsRead - 1) * es->record_size + es->key_offset)); + } + debug_log(" heapSize before processing: %d, listSize: %d\n", heapSize, listSize); +#endif + if (recordsRead > 1) { // Sort page using in memory quick sort metric->num_reads += 1; @@ -316,14 +408,15 @@ int adaptive_sort( // If first value in heap is smaller than lastOutputValue then start new sublist, otherwise continue with previous one. 
heapVal = buffer + heapStartOffset; - if (lastOutputKey == NULL || es->compare_fcn(heapVal, lastOutputKey) < 0) { + if (lastOutputKey == NULL || es->compare_fcn(heapVal + es->key_offset, lastOutputKey + es->key_offset) < 0) { // Start new sublist numSublist++; // Track number of distinct values per sublist avgDistinct = avgDistinct + (numDistinctInRun - avgDistinct / 10) * 10 / numSublist; #ifdef DEBUG - printf("Number of distinct values in sublist: %d Running average: %d\n", numDistinctInRun, avgDistinct / 10); + debug_log("Number of distinct values in sublist: %d Running average: %d\n", numDistinctInRun, avgDistinct / 10); + #endif numDistinctInRun = 1; @@ -337,11 +430,53 @@ int adaptive_sort( // Swap output records into output buffer from heap if smaller than records currently there. (I/O block is id zero) for (i = 0; i < tuplesPerPage; i++) { + /* + * HEAP-EMPTY START NEW RUN TRANSITION + */ + if (heapSize == 0) { + if (listSize > 0) { + // Finish current run and start a new one + numSublist++; + metric->num_runs++; + + sublistSize = 0; + outputCount = 0; + haveOutputKey = 0; + +#ifdef DEBUG + debug_log("DEBUG: Heap empty → starting new run, promoting list (%d records)\n", + listSize); +#endif + + // Promote frozen list to heap + for (int32_t k = listSize - 1; k >= 0; k--) { + shiftUp_rev(buffer + heapStartOffset, + buffer + es->page_size + k * es->record_size, + heapSize, es, metric); + heapSize++; + } + listSize = 0; + + // Restart filling the output page for the new run + i = -1; + continue; + } else { + // No heap, no list → nothing left to output + break; + } + } +#ifdef DEBUG + if (i < 3 || i == tuplesPerPage - 1) { // Only log first 3 and last iteration + debug_log(" Inner loop i=%d: recordsRead=%d, outputCount=%d, recordsLeft=%d, heapSize=%d\n", + i, recordsRead, outputCount, recordsLeft, heapSize); + } +#endif // Check if we've read all records from the current page - if (recordsRead == 0) { + if (recordsRead == 0 || i >= recordsRead) { // Check if 
there are any records left - if (recordsLeft <= 0) + if (recordsLeft <= 0) { break; + } // Just copy over from heap memcpy(buffer + es->headerSize + i * es->record_size, buffer + heapStartOffset, es->record_size); /* Heap into input/output block */ @@ -368,7 +503,8 @@ int adaptive_sort( // Track number of distinct values per sublist avgDistinct = avgDistinct + (numDistinctInRun - avgDistinct / 10) * 10 / numSublist; #ifdef DEBUG - printf("Number of distinct values in sublist: %d Running average: %d\n", numDistinctInRun, avgDistinct / 10); + debug_log("Number of distinct values in sublist: %d Running average: %d\n", numDistinctInRun, avgDistinct / 10); + #endif numDistinctInRun = 1; @@ -455,15 +591,32 @@ int adaptive_sort( } // Add Page Headers +#ifdef DEBUG + debug_log("About to write block: sublistSize=%d, outputCount=%d, numSublist=%d\n", + sublistSize, outputCount, numSublist); + debug_log(" First 3 output values:"); + for (int dbg = 0; dbg < 3 && dbg < outputCount; dbg++) { + debug_log(" %d", *(int32_t*)(buffer + es->headerSize + dbg * es->record_size + es->key_offset)); + } + debug_log("\n"); +#endif + if (outputCount == 0) { + continue; // Skip to next iteration + } *((int32_t*)buffer) = sublistSize; - *((int16_t*)(buffer + BLOCK_COUNT_OFFSET)) = (int8_t)outputCount; + *((int16_t*)(buffer + BLOCK_COUNT_OFFSET)) = (int16_t)outputCount; memcpy(tupleBuffer, buffer + (outputCount - 1) * es->record_size + es->headerSize, es->key_size); memcpy(lastOutputKey, tupleBuffer, es->record_size); metric->num_memcpys += 2; // Store the last key output temporarily in tuple buffer as once write out then read new block it would be gone // Write the output block +#ifdef DEBUG + debug_log("Writing page adaptive writeRel: blockIndex=%d, count=%d, filePosition=%ld\n", + sublistSize, outputCount, lastWritePos / PAGE_SIZE); +#endif ((file_iterator_state_t*)iteratorState)->fileInterface->writeRel(buffer, PAGE_SIZE, 1, outputFile); + if 
(((file_iterator_state_t*)iteratorState)->fileInterface->error(outputFile)) { // File write error free(lastOutputKey); @@ -471,33 +624,66 @@ int adaptive_sort( } #ifdef DEBUG_OUTPUT - printf("Wrote block. Sublist: %d ", numSublist); - printf(" Idx: %d\n", sublistSize); - // printf("Offset: %lu\n", ftell(outputFile)-es->page_size); + debug_log("Wrote block. Sublist: %d ", numSublist); + debug_log(" Idx: %d\n", sublistSize); + // debug_log("Offset: %lu\n", ftell(outputFile) - es->page_size); for (int k = 0; k < tuplesPerPage; k++) { - printf("%3d: 3 Output Record: %d\n", k, *(uint32_t*)(buffer + es->headerSize + k * es->record_size + es->key_offset)); + debug_log("%3d: 3 Output Record: %d\n", k, *(uint32_t*)(buffer + es->headerSize + k * es->record_size + es->key_offset)); } + #endif metric->num_writes += 1; + lastWritePos += es->page_size; sublistSize++; outputCount = 0; +#ifdef DEBUG + if (recordsLeft == 0) { + debug_log("DEBUG: Exiting main loop - heapSize=%d, listSize=%d, outputCount=%d, sublistSize=%d\n", + heapSize, listSize, outputCount, sublistSize); + } +#endif } /* while records left */ - // free(lastOutputKey); numSublist = metric->num_runs; #ifdef ADAPTIVE_SORT_PRINT - printf("Gen time: %d\n", metric->genTime); + debug_log("Gen time: %d\n", metric->genTime); + #endif // Track number of distinct values per sublist avgDistinct = avgDistinct + (numDistinctInRun - avgDistinct / 10) * 10 / numSublist; #ifdef ADAPTIVE_SORT_PRINT - printf("Final number of distinct values in sublist: %d Average: %d\n", numDistinctInRun, avgDistinct); + debug_log("Final number of distinct values in sublist: %d Average: %d\n", numDistinctInRun, avgDistinct); + #endif numDistinctInRun = 0; } /* end pessmistic */ +#ifdef DEBUG + debug_log("\n=== REPLACEMENT SELECTION COMPLETE ===\n"); + debug_log("Number of sublists created: %d\n", numSublist); + debug_log("Output file size: %ld bytes (%ld blocks)\n", lastWritePos, lastWritePos / es->page_size); + debug_log("About to start merge 
phase...\n\n"); + + // Read and display what's in each block + for (int debugBlock = 0; debugBlock < lastWritePos / es->page_size; debugBlock++) { + ((file_iterator_state_t*)iteratorState)->fileInterface->seek(debugBlock * es->page_size, outputFile); + ((file_iterator_state_t*)iteratorState)->fileInterface->readRel(buffer, es->page_size, 1, outputFile); + + uint32_t blockIdx = *((uint32_t*)buffer); + uint16_t count = *((uint16_t*)(buffer + BLOCK_COUNT_OFFSET)); + + debug_log("Block %d: blockIdx=%u, count=%u, first 10 values:", debugBlock, blockIdx, count); + for (int v = 0; v < count && v < 10; v++) { + debug_log(" %d", *(int32_t*)(buffer + es->headerSize + v * es->record_size + es->key_offset)); + } + if (count > 10) debug_log(" ..."); + debug_log("\n"); + } + debug_log("=================================\n\n"); +#endif + // No merge phase necessary if (numSublist == 1) { ((file_iterator_state_t*)iteratorState)->fileInterface->flush(outputFile); @@ -523,11 +709,12 @@ int adaptive_sort( int32_t nobSortCost = numPasses * (10 + writeToReadRatio) / 10; #ifdef ADAPTIVE_SORT_PRINT - printf("Adaptive calculation.\n"); - printf("NOB sort cost. # runs: %d", numSublist); - printf(" # passes: %d cost: %d\n", numPasses, nobSortCost); - printf("MinSort cost. Num sublists: %d ", numSublist); - printf(" Avg. distinct/sublist: %d\n", avgDistinct / 10); + debug_log("Adaptive calculation.\n"); + debug_log("NOB sort cost. # runs: %d", numSublist); + debug_log(" # passes: %d cost: %d\n", numPasses, nobSortCost); + debug_log("MinSort cost. Num sublists: %d ", numSublist); + debug_log(" Avg. 
distinct/sublist: %d\n", avgDistinct / 10); + #endif // Make decision to use either no output buffer sort or MinSort @@ -540,20 +727,20 @@ int adaptive_sort( if (sublistVersionPossible) { // Use better performing version of minsort #ifdef ADAPTIVE_SORT_PRINT - printf("Performing MinSort with sorted sublists\n"); + debug_log("Performing MinSort with sorted sublists\n"); #endif ((file_iterator_state_t*)iteratorState)->file = outputFile; - *resultFilePtr = 0; - flash_minsort_sublist(iteratorState, tupleBuffer, outputFile, buffer, bufferSizeBytes, es, resultFilePtr, metric, compareFn, numSublist); *resultFilePtr = lastWritePos; + flash_minsort_sublist(iteratorState, tupleBuffer, outputFile, buffer, bufferSizeBytes, es, resultFilePtr, metric, compareFn, numSublist); + //*resultFilePtr = lastWritePos; } else { // Use normal version of minsort. Do not have enough space to index a value per sublist. Assumes data is not sorted in each region #ifdef ADAPTIVE_SORT_PRINT - printf("Performing MinSort\n"); + debug_log("Performing MinSort\n"); #endif ((file_iterator_state_t*)iteratorState)->file = outputFile; + *resultFilePtr = lastWritePos; flash_minsort(iteratorState, tupleBuffer, outputFile, buffer, bufferSizeBytes, es, resultFilePtr, metric, compareFn); - *resultFilePtr = 0; } } else { /* */ @@ -587,7 +774,6 @@ int adaptive_sort( int16_t space = 0; int16_t outputCursor; int8_t destBlk; - int32_t other = 0; // Verify all memory has been allocated successfully if (record2 == NULL) { @@ -603,7 +789,7 @@ int adaptive_sort( // if (numSublist >= 32 && numSublist <= 64)// && avgDistinct/10 < 32) // { // // Switch to MinSort to finish off - // printf("Finishing sort with MinSort with sorted sublists\n"); + // debug_log("Finishing sort with MinSort with sorted sublists\n"); // ((file_iterator_state_t*) iteratorState)->file = outputFile; // // *resultFilePtr = lastMergeStart; // // fflush(outputFile); @@ -621,7 +807,8 @@ int adaptive_sort( lastWritePos = 0; } #ifdef ADAPTIVE_SORT_PRINT 
- printf("Pass number: %u Comparisons: %lu MemCopies: %lu TransferIn: %lu TransferOut: %lu TransferOther: %lu Other: %lu\n", passNumber, metric->num_compar, metric->num_memcpys, numShiftIntoOutput, numShiftOutOutput, numShiftOtherBlock, other); + debug_log("Pass number: %u Comparisons: %lu MemCopies: %lu TransferIn: %lu TransferOut: %lu TransferOther: %lu\n", passNumber, metric->num_compar, metric->num_memcpys, numShiftIntoOutput, numShiftOutOutput, numShiftOtherBlock); + #endif passNumber++; @@ -698,7 +885,8 @@ int adaptive_sort( #ifdef DEBUG void* buffer0Rec = (void*)buffer + es->headerSize; void* currentRec = (void*)buffer + i * es->page_size + es->headerSize; - printf("Swapping in buffer 0. Current key: %d New key: %d\n", *(uint32_t*)(buffer0Rec + es->key_offset), *(uint32_t*)(currentRec + es->key_offset)); + debug_log("Swapping in buffer 0. Current key: %d New key: %d\n", *(uint32_t*)(buffer0Rec + es->key_offset), *(uint32_t*)(currentRec + es->key_offset)); + #endif // Perform swap sublsBlkPos[i] = sublsFilePtr[0]; /* Note: Using subls_blk_pos[i] as a temp variable during swap */ // TODO: Update swap to not be variable length @@ -739,8 +927,9 @@ int adaptive_sort( #ifdef DEBUG_READ void* firstRec = (void*)buffer + i * es->page_size + es->headerSize; void* lastRec = (void*)buffer + i * es->page_size + es->headerSize + (*((int16_t*)(buffer + i * es->page_size + BLOCK_COUNT_OFFSET)) - 1) * es->record_size; - printf("Read Sublist: %d Block: %d NumRec: %d First key: %d Last key: %d\n", i, (int32_t) * (buffer + i * es->page_size), - *((int16_t*)(buffer + i * es->page_size + BLOCK_COUNT_OFFSET)), *(uint32_t*)(firstRec + es->key_offset), *(uint32_t*)(lastRec + es->key_offset)); + debug_log("Read Sublist: %d Block: %d NumRec: %d First key: %d Last key: %d\n", i, (int32_t) * (buffer + i * es->page_size), + *((int16_t*)(buffer + i * es->page_size + BLOCK_COUNT_OFFSET)), *(uint32_t*)(firstRec + es->key_offset), *(uint32_t*)(lastRec + es->key_offset)); + #endif // 
Initialize record1 to start of each block and record2 to empty record1[i] = i * es->page_size + es->headerSize; @@ -811,21 +1000,22 @@ int adaptive_sort( #ifdef DEBUG void* buf = (void*)buffer + resultRecOffset; - printf("Smallest Record: %d From list: %d\n", *(uint32_t*)(buf + es->key_offset), resultBlock); - printf("List status: 0: (%d, %d) 1: (%d, %d) 2: (%d, %d) ResultList: %d\n", record1[0], record2[0], - record1[1], record2[1], record1[2], record2[2], resultBlock); + debug_log("Smallest Record: %d From list: %d\n", *(uint32_t*)(buf + es->key_offset), resultBlock); + debug_log("List status: 0: (%d, %d) 1: (%d, %d) 2: (%d, %d) ResultList: %d\n", record1[0], record2[0], + record1[1], record2[1], record1[2], record2[2], resultBlock); if (*(uint32_t*)(buf + es->key_offset) == 27391) { /* Output all block contents */ for (int l = 0; l < 2; l++) { - printf("Current block: %d # records: %d\n", l, tuplesPerPage); + debug_log("Current block: %d # records: %d\n", l, tuplesPerPage); for (int k = 0; k < tuplesPerPage; k++) { void* buf = (void*)(buffer + es->headerSize + k * es->record_size + l * es->page_size); - printf("%d: Record: %d Address: %p\n", k, buf + es->key_size, buf); + debug_log("%d: Record: %d Address: %p\n", k, buf + es->key_size, buf); } } - printf("HERE\n"); + debug_log("HERE\n"); } + #endif /* Add smallest tuple to output position in buffer (may already be in output buffer) */ @@ -846,7 +1036,8 @@ int adaptive_sort( numShiftOutOutput++; #ifdef DEBUG void* buf = (void*)(buffer + record1[OUTPUT_BLOCK_ID]); - printf("Output record moved to list %d Key: %d\n", resultBlock, *(uint32_t*)(buf + es->key_size)); + debug_log("Output record moved to list %d Key: %d\n", resultBlock, *(uint32_t*)(buf + es->key_size)); + #endif /* Move result record into output block (record1[output_block]==record2[output_block]) */ metric->num_memcpys++; @@ -926,6 +1117,10 @@ int adaptive_sort( *((int16_t*)(buffer + BLOCK_COUNT_OFFSET)) = (int16_t)tuplesPerPage; 
((file_iterator_state_t*)iteratorState)->fileInterface->seek(lastWritePos, outputFile); +#ifdef DEBUG + debug_log("Writing page adaptive writeRel 2: blockIndex=%d, count=%d, filePosition=%ld\n", + currentBlockId, tuplesPerPage, lastWritePos / PAGE_SIZE); +#endif ((file_iterator_state_t*)iteratorState)->fileInterface->writeRel(buffer + OUTPUT_BLOCK_ID * es->page_size, PAGE_SIZE, 1, outputFile); if (((file_iterator_state_t*)iteratorState)->fileInterface->error(outputFile)) { // File read error @@ -940,11 +1135,12 @@ int adaptive_sort( record2[OUTPUT_BLOCK_ID] = -1; metric->num_writes++; #ifdef DEBUG_OUTPUT - printf("Wrote output block: %d # records: %d\n", *((int32_t*)buffer), tuplesPerPage); + debug_log("Wrote output block: %d # records: %d\n", *((int32_t*)buffer), tuplesPerPage); for (int k = 0; k < tuplesPerPage; k++) { void* buf = (void*)(buffer + es->headerSize + k * es->record_size); - printf("%3d: 4 Output Record: %d Address: %p\n", k, *(uint32_t*)(buf + es->key_offset), buf); + debug_log("%3d: 4 Output Record: %d Address: %p\n", k, *(uint32_t*)(buf + es->key_offset), buf); } + #endif } @@ -990,17 +1186,18 @@ int adaptive_sort( if (destBlk > bufferSizeInBlocks) { #ifdef ADAPTIVE_SORT_PRINT - printf("Incorrect destination block. List 1: (%d, %d) List 2: (%d, %d) List 3: (%d, %d) ResultList: %d\n", record1[0], record2[0], - record1[1], record2[1], record1[2], record2[2], resultBlock); + debug_log("Incorrect destination block. 
List 1: (%d, %d) List 2: (%d, %d) List 3: (%d, %d) ResultList: %d\n", record1[0], record2[0], + record1[1], record2[1], record1[2], record2[2], resultBlock); /* Output all block contents */ for (int l = 0; l < 3; l++) { - printf("Current block: %d # records: %d\n", l, tuplesPerPage); + debug_log("Current block: %d # records: %d\n", l, tuplesPerPage); for (int k = 0; k < tuplesPerPage; k++) { void* buf = (void*)(buffer + es->headerSize + k * es->record_size + l * es->page_size); - printf("%d: Record: %d Address: %p\n", k, buf + es->key_offset, buf); + debug_log("%d: Record: %d Address: %p\n", k, buf + es->key_offset, buf); } } + #endif } } @@ -1018,7 +1215,8 @@ int adaptive_sort( for (i = 0; i < numTransferThisPass; i++) { #ifdef DEBUG void* buf = (void*)(buffer + originPtr); - printf("Empty output block case. Moved output record back from list %d Key: %d\n", resultBlock, *(uint32_t*)(buf + es->key_offset)); + debug_log("Empty output block case. Moved output record back from list %d Key: %d\n", resultBlock, *(uint32_t*)(buf + es->key_offset)); + #endif numShiftIntoOutput++; /* Get top value from heap */ @@ -1039,7 +1237,8 @@ int adaptive_sort( record1[destBlk] = record1[destBlk] - es->record_size; #ifdef DEBUG void* buf = (void*)(buffer + originPtr); - printf("Moved output record back from list %d Key: %d\n", resultBlock, buf + es->key_offset); + debug_log("Moved output record back from list %d Key: %d\n", resultBlock, buf + es->key_offset); + #endif numShiftIntoOutput++; @@ -1049,7 +1248,8 @@ int adaptive_sort( metric->num_compar++; #ifdef DEBUG void* buf = (void*)(buffer + insert_ptr + es->record_size); - printf("Compare with list %d Key: %d\n", resultBlock, buf + es->key_offset); + debug_log("Compare with list %d Key: %d\n", resultBlock, buf + es->key_offset); + #endif if (0 < es->compare_fcn(buffer + originPtr + es->key_offset, buffer + insert_ptr + es->record_size + es->key_offset)) { /* shift next_val down */ @@ -1076,7 +1276,8 @@ int adaptive_sort( #ifdef 
DEBUG void* buf = (void*)(buffer + originPtr); - printf("Moved output record to list %d Key: %d\n", destBlk, buf + es->key_offset); + debug_log("Moved output record to list %d Key: %d\n", destBlk, buf + es->key_offset); + #endif numShiftOtherBlock++; @@ -1113,11 +1314,12 @@ int adaptive_sort( record2[resultBlock] = -1; record1[resultBlock] = resultBlock * es->page_size + es->headerSize; #ifdef DEBUG_READ - printf("Read block sublist: %d\n", resultBlock); + debug_log("Read block sublist: %d\n", resultBlock); void* firstRec = (void*)buffer + resultBlock * es->page_size + es->headerSize; void* lastRec = (void*)buffer + resultBlock * es->page_size + es->headerSize + (*((int16_t*)(buffer + resultBlock * es->page_size + BLOCK_COUNT_OFFSET)) - 1) * es->record_size; - printf("Read Sublist: %d Block: %d NumRec: %d First key: %d Last key: %d\n", resultBlock, (int32_t) * (buffer + resultBlock * es->page_size), - *((int16_t*)(buffer + resultBlock * es->page_size + BLOCK_COUNT_OFFSET)), firstRec + es->key_offset, lastRec + es->key_offset); + debug_log("Read Sublist: %d Block: %d NumRec: %d First key: %d Last key: %d\n", resultBlock, (int32_t) * (buffer + resultBlock * es->page_size), + *((int16_t*)(buffer + resultBlock * es->page_size + BLOCK_COUNT_OFFSET)), firstRec + es->key_offset, lastRec + es->key_offset); + #endif } } /* end if is the non output block empty */ @@ -1184,7 +1386,8 @@ int adaptive_sort( /* move the record */ #ifdef DEBUG void* buf = (void*)(buffer + outputCursor); - printf("Output list empty so moved record in output to list %d Key: %d\n", destBlk, *(uint32_t*)(buf + es->key_offset)); + debug_log("Output list empty so moved record in output to list %d Key: %d\n", destBlk, *(uint32_t*)(buf + es->key_offset)); + #endif numShiftOutOutput++; metric->num_memcpys++; @@ -1215,11 +1418,12 @@ int adaptive_sort( int16_t numRecords = *((int16_t*)(buffer + BLOCK_COUNT_OFFSET)); #ifdef DEBUG_READ - printf("Read block sublist: 0\n"); + debug_log("Read block sublist: 
0\n"); void* firstRec = (void*)buffer + es->headerSize; void* lastRec = (void*)buffer + es->headerSize + (*((int16_t*)(buffer + BLOCK_COUNT_OFFSET)) - 1) * es->record_size; - printf("Read Sublist: %d Block: %d NumRec: %d First key: %d Last key: %d\n", 0, (int32_t) * (buffer + 0 * es->page_size), - *((int16_t*)(buffer + BLOCK_COUNT_OFFSET)), firstRec + es->key_offset, lastRec + es->key_offset); + debug_log("Read Sublist: %d Block: %d NumRec: %d First key: %d Last key: %d\n", 0, (int32_t) * (buffer + 0 * es->page_size), + *((int16_t*)(buffer + BLOCK_COUNT_OFFSET)), firstRec + es->key_offset, lastRec + es->key_offset); + #endif metric->num_reads += 1; @@ -1275,7 +1479,7 @@ int adaptive_sort( /* end of run */ } - if (record2[0] > 0) { /* Tuples in output block to write out */ + if (record2[0] != -1) { /* Tuples in output block to write out */ // fseek(outputFile, lastWritePos, SEEK_SET); // if (0 == fwrite(buffer + OUTPUT_BLOCK_ID * es->page_size, (size_t)es->page_size, 1, outputFile)) // { /* File write error - arduino prints 1st value nmemb times if nmemb != 1 */ @@ -1289,6 +1493,10 @@ int adaptive_sort( currentBlockId++; ((file_iterator_state_t*)iteratorState)->fileInterface->seek(lastWritePos, outputFile); +#ifdef DEBUG + debug_log("Writing page adaptive write rel 3: blockIndex=%d, count=%d, filePosition=%ld\n", + currentBlockId, (int16_t)(record2[0] - es->headerSize) / es->record_size + 1, lastWritePos / PAGE_SIZE); +#endif ((file_iterator_state_t*)iteratorState)->fileInterface->writeRel(buffer + OUTPUT_BLOCK_ID * es->page_size, PAGE_SIZE, 1, outputFile); if (((file_iterator_state_t*)iteratorState)->fileInterface->error(outputFile)) { // File write error @@ -1304,11 +1512,12 @@ int adaptive_sort( metric->num_writes += 1; #ifdef DEBUG_OUTPUT - printf("Wrote output block here.\n"); + debug_log("Wrote output block here.\n"); for (int k = 0; k < tuplesPerPage; k++) { void* buf = (void*)(buffer + es->headerSize + k * es->record_size); - printf("%3d: 5 Output Record: %d 
Address: %p\n", k, *(uint32_t*)(buf + es->key_offset), buf); // TODO: Update to no use test_record_t + debug_log("%3d: 5 Output Record: %d Address: %p\n", k, *(uint32_t*)(buf + es->key_offset), buf); // TODO: Update to no use test_record_t } + #endif } @@ -1320,7 +1529,8 @@ int adaptive_sort( } /* end of merge */ *resultFilePtr = lastMergeStart; #ifdef ADAPTIVE_SORT_PRINT_FINISH - printf("Complete. Comparisons: %u Writes: %u Reads: %u Memcpys:\n", metric->num_compar, metric->num_writes, metric->num_reads, metric->num_memcpys); + debug_log("Complete. Comparisons: %u Writes: %u Reads: %u Memcpys:\n", metric->num_compar, metric->num_writes, metric->num_reads, metric->num_memcpys); + #endif /* cleanup */ diff --git a/src/query-interface/sort/flash_minsort.c b/src/query-interface/sort/flash_minsort.c index ac1435a2..a9835618 100644 --- a/src/query-interface/sort/flash_minsort.c +++ b/src/query-interface/sort/flash_minsort.c @@ -52,6 +52,13 @@ This is no output sort with block headers and iterator input. Heap used when mov // #define DEBUG 1 // #define DEBUG_OUTPUT 1 // #define DEBUG_READ 1 +#if defined(DEBUG) || defined(DEBUG_OUTPUT) || defined(DEBUG_READ) +#include "debug_print.h" +#else +#ifndef debug_log +#define debug_log(...) ((void)0) +#endif +#endif #ifndef INT_MAX #define INT_MAX 0xFFFFFFFF @@ -64,13 +71,18 @@ This is no output sort with block headers and iterator input. Heap used when mov * @param es Sorting configuration, including page and record sizes. * @param metric Metrics tracking structure for performance analysis. 
*/ -void readPageMinSort(MinSortState *ms, int pageNum, external_sort_t *es, metrics_t *metric) { +int8_t readPageMinSort(MinSortState *ms, int pageNum, external_sort_t *es, metrics_t *metric) { file_iterator_state_t *is = (file_iterator_state_t *)ms->iteratorState; void *fp = is->file; - +#ifdef DEBUG + debug_log("DEBUG: READ_PAGE %d (Offset %d)\n", pageNum, pageNum * es->page_size); +#endif // Read page into the buffer if (0 == is->fileInterface->read(ms->buffer, pageNum, es->page_size, fp)) { - printf("MINSORT: Failed to read block.\n"); +#ifdef DEBUG + debug_log("MINSORT: Failed to read block.\n"); +#endif + return 0; } metric->num_reads++; @@ -78,12 +90,13 @@ void readPageMinSort(MinSortState *ms, int pageNum, external_sort_t *es, metrics ms->lastBlockIdx = pageNum; #ifdef DEBUG_READ - printf("Reading block: %d\n", pageNum); + debug_log("Reading block: %d\n", pageNum); for (int k = 0; k < 31; k++) { test_record_t *buf = (void *)(ms->buffer + es->headerSize + k * es->record_size); - printf("%d: Record: %d\n", k, buf->key); + debug_log("%d: Record: %d\n", k, buf->key); } #endif + return 1; } /** @@ -136,7 +149,7 @@ void init_MinSort(MinSortState *ms, external_sort_t *es, metrics_t *metric, int8 ms->records_per_block = (es->page_size - es->headerSize) / es->record_size; j = (ms->memoryAvailable - 2 * es->page_size - 2 * es->key_size - INT_SIZE) / (es->key_size + sizeof(uint8_t)); #ifdef FLASH_MINSORT_PRINT - printf("Memory overhead: %d Max regions: %d\r\n", 2 * es->key_size + INT_SIZE, j); + debug_log("Memory overhead: %d Max regions: %d\r\n", 2 * es->key_size + INT_SIZE, j); #endif ms->blocks_per_region = (uint32_t)ceil((double)ms->numBlocks / j); ms->numRegions = (uint32_t)ceil((double)ms->numBlocks / ms->blocks_per_region); @@ -147,9 +160,9 @@ void init_MinSort(MinSortState *ms, external_sort_t *es, metrics_t *metric, int8 ms->min_initialized = (int8_t *)(ms->min + es->key_size * ms->numRegions); #ifdef DEBUG - printf("Memory overhead: %d Max regions: %d\r\n", 
2 * SORT_KEY_SIZE + INT_SIZE, j); - printf("Page size: %d, Memory size: %d Record size: %d, Number of records: %lu, Number of blocks: %d, Blocks per region: %d Regions: %d\r\n", - es->page_size, ms->memoryAvailable, ms->record_size, ms->num_records, ms->numBlocks, ms->blocks_per_region, ms->numRegions); + // printf("Memory overhead: %d Max regions: %d\r\n", 2 * SORT_KEY_SIZE + INT_SIZE, j); + debug_log("Page size: %d, Memory size: %d Record size: %d, Number of records: %lu, Number of blocks: %d, Blocks per region: %d Regions: %d\r\n", + es->page_size, ms->memoryAvailable, ms->record_size, ms->num_records, ms->numBlocks, ms->blocks_per_region, ms->numRegions); #endif /* Initialize each region’s minimum value */ @@ -157,28 +170,21 @@ void init_MinSort(MinSortState *ms, external_sort_t *es, metrics_t *metric, int8 ms->min_initialized[i] = false; } - /* Populate each region’s minimum key by scanning blocks */ + /* Populate each region's minimum key by scanning blocks */ for (i = 0; i < ms->numBlocks; i++) { - readPageMinSort(ms, i, es, metric); // Load block i into buffer + readPageMinSort(ms, i, es, metric); regionIdx = i / ms->blocks_per_region; - // Set inital value to first read. 
- // ms->min[regionIdx] = getValuePtr(ms, 0, es); - memcpy(getMinRegionPtr(ms, regionIdx, es), getValuePtr(ms, 0, es), es->key_size); - metric->num_memcpys++; - ms->min_initialized[regionIdx] = true; - - /* Process remaining records in the block */ - for (j = 1; j < ms->records_per_block; j++) { + for (j = 0; j < ms->records_per_block; j++) { if (((i * ms->records_per_block) + j) < ms->num_records) { val = getValuePtr(ms, j, es); metric->num_compar++; - /* Update region’s minimum if current record is smaller */ - if (compareFn(val, getMinRegionPtr(ms, regionIdx, es)) == -1) { + // Only update if this is the first record for the region OR we found a new minimum + if (!ms->min_initialized[regionIdx] || compareFn(val, getMinRegionPtr(ms, regionIdx, es)) == -1) { memcpy(getMinRegionPtr(ms, regionIdx, es), val, es->key_size); - metric->num_memcpys++; ms->min_initialized[regionIdx] = true; + metric->num_memcpys++; } } else break; @@ -187,7 +193,7 @@ void init_MinSort(MinSortState *ms, external_sort_t *es, metrics_t *metric, int8 #ifdef DEBUG for (i = 0; i < ms->numRegions; i++) - printf("Region: %d Min: %d\r\n", i, ms->min[i]); + debug_log("Region: %d Min: %d\r\n", i, *(int *)getMinRegionPtr(ms, i, es)); #endif /* Allocate memory for current and next keys */ @@ -251,7 +257,7 @@ char *next_MinSort(MinSortState *ms, external_sort_t *es, void *tupleBuffer, met for (k = startIndex / ms->records_per_block; k < ms->blocks_per_region; k++) { curBlk = startBlk + k; - if (curBlk > ms->numBlocks) { + if (curBlk >= ms->numBlocks) { break; } @@ -275,7 +281,7 @@ char *next_MinSort(MinSortState *ms, external_sort_t *es, void *tupleBuffer, met #ifdef DEBUG test_record_t *buf = (test_record_t *)(ms->buffer + es->headerSize + i * es->record_size); buf = (test_record_t *)tupleBuffer; - printf("Returning tuple: %d\n", buf->key); + debug_log("Returning tuple: %d\n", buf->key); #endif i++; // Move to the next record ms->tuplesOut++; @@ -295,7 +301,7 @@ char *next_MinSort(MinSortState *ms, 
external_sort_t *es, void *tupleBuffer, met done: #ifdef DEBUG - printf("Updating minimum in region\r\n"); + debug_log("Updating minimum in region\r\n"); #endif // After processing the current block, scan the rest of the region to find a smaller record if possible @@ -327,7 +333,7 @@ char *next_MinSort(MinSortState *ms, external_sort_t *es, void *tupleBuffer, met if (compareFn(dataVal, ms->current) == 0) { ms->nextIdx = k * ms->records_per_block + i; #ifdef DEBUG - printf("Next tuple at: %d k: %d i: %d\r\n", ms->nextIdx, k, i); + debug_log("Next tuple at: %d k: %d i: %d\r\n", ms->nextIdx, k, i); #endif goto done2; } @@ -357,7 +363,7 @@ char *next_MinSort(MinSortState *ms, external_sort_t *es, void *tupleBuffer, met } #ifdef DEBUG - printf("Updated minimum in block to: %d\r\n", ms->min[ms->regionIdx]); + debug_log("Updated minimum in block to: %d\r\n", ms->min[ms->regionIdx]); #endif } @@ -414,9 +420,11 @@ int flash_minsort( metrics_t *metric, int8_t (*compareFn)(void *a, void *b)) { #ifdef DEBUG - printf("*Flash Minsort*\n"); + debug_log("*Flash Minsort*\n"); #endif +#ifndef ARDUINO clock_t start = clock(); +#endif MinSortState ms; ms.buffer = buffer; @@ -429,6 +437,7 @@ int flash_minsort( int32_t blockIndex = 0; int16_t values_per_page = (es->page_size - es->headerSize) / es->record_size; uint8_t *outputBuffer = buffer + es->page_size; + unsigned long lastWritePos = *resultFilePtr; // test_record_t *buf; // Main sorting loop: fetches and writes sorted records in blocks @@ -440,31 +449,41 @@ int flash_minsort( if (count == values_per_page) { // Write block *((int32_t *)outputBuffer) = blockIndex; /* Block index */ *((int16_t *)(outputBuffer + BLOCK_COUNT_OFFSET)) = count; /* Block record count */ - count = 0; // Reset count for the next block // Write the block to the output file using the file interface's write method - if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->write(outputBuffer, blockIndex, es->page_size, outputFile)) { + 
((file_iterator_state_t *)iteratorState)->fileInterface->seek(lastWritePos, outputFile); +#ifdef DEBUG + debug_log("Writing page flash minsort: blockIndex=%d, count=%d, filePosition=%ld\n", + blockIndex, count, count / PAGE_SIZE); +#endif + if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->writeRel(outputBuffer, es->page_size, 1, outputFile)) { return 9; // Return error code if writing to the output file fails } #ifdef DEBUG - printf("Wrote output block. Block index: %d\n", blockIndex); + debug_log("Wrote output block. Block index: %d\n", blockIndex); for (int k = 0; k < values_per_page; k++) { test_record_t *buf = (void *)(outputBuffer + es->headerSize + k * es->record_size); - printf("%d: Output Record: %d\n", k, buf->key); + debug_log("%d: Output Record: %d\n", k, buf->key); } #endif metric->num_writes++; + lastWritePos += es->page_size; blockIndex++; + count = 0; } } // Write the last block if there are remaining records if (count > 0) { + ((file_iterator_state_t *)iteratorState)->fileInterface->seek(lastWritePos, outputFile); *((int32_t *)outputBuffer) = blockIndex; /* Block index */ *((int16_t *)(outputBuffer + BLOCK_COUNT_OFFSET)) = count; /* Block record count */ - - if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->write(outputBuffer, blockIndex, es->page_size, outputFile)) { +#ifdef DEBUG + debug_log("Writing page flash minsort: blockIndex=%d, count=%d, filePosition=%ld\n", + blockIndex, count, count / PAGE_SIZE); +#endif + if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->writeRel(outputBuffer, es->page_size, 1, outputFile)) { return 9; // Return error code if writing to the output file fails } metric->num_writes++; @@ -473,19 +492,18 @@ int flash_minsort( } #ifdef DEBUG - printf("Number of sorted records: %d", ms.num_records); + debug_log("Number of sorted records: %d", ms.num_records); #endif ((file_iterator_state_t *)iteratorState)->fileInterface->flush(outputFile); close_MinSort(&ms, es); - +#ifndef 
ARDUINO clock_t end = clock(); - - *resultFilePtr = 0; +#endif #ifdef DEBUG - printf("Complete. Comparisons: %d MemCopies: %d\n", metric->num_compar, metric->num_memcpys); + debug_log("Complete. Comparisons: %d MemCopies: %d\n", metric->num_compar, metric->num_memcpys); #endif return 0; // Successful completion diff --git a/src/query-interface/sort/flash_minsort_sublist.c b/src/query-interface/sort/flash_minsort_sublist.c index d2ce3722..daa6d778 100644 --- a/src/query-interface/sort/flash_minsort_sublist.c +++ b/src/query-interface/sort/flash_minsort_sublist.c @@ -46,19 +46,27 @@ #include "in_memory_sort.h" -/* -#define DEBUG 1 -#define DEBUG_OUTPUT 1 -#define DEBUG_READ 1 -*/ +// #define DEBUG 1 +// #define DEBUG_OUTPUT 1 +// #define DEBUG_READ 1 +#if defined(DEBUG) || defined(DEBUG_OUTPUT) || defined(DEBUG_READ) +#include "debug_print.h" +#else +#ifndef debug_log +#define debug_log(...) ((void)0) +#endif +#endif -void readPage_sublist(MinSortStateSublist *ms, int pageNum, external_sort_t *es, metrics_t *metric) { +int8_t readPage_sublist(MinSortStateSublist *ms, int pageNum, external_sort_t *es, metrics_t *metric) { file_iterator_state_t *is = (file_iterator_state_t *)ms->iteratorState; void *fp = is->file; // Read page into the buffer if (0 == is->fileInterface->read(ms->buffer, pageNum, es->page_size, fp)) { - printf("MINSORT SUBLIST: Failed to read block.\n"); +#ifdef DEBUG + debug_log("MINSORT SUBLIST: Failed to read block.\n"); +#endif + return 0; } metric->num_reads++; @@ -66,12 +74,13 @@ void readPage_sublist(MinSortStateSublist *ms, int pageNum, external_sort_t *es, ms->lastBlockIdx = pageNum; #ifdef DEBUG_READ - printf("Reading block: %d Offset: %lu\n", pageNum, offset); + debug_log("Reading block: %d Offset: %lu\n", pageNum, es->key_offset); for (int k = 0; k < 31; k++) { test_record_t *buf = (void *)(ms->buffer + es->headerSize + k * es->record_size); - printf("%d: Record: %d\n", k, buf->key); + debug_log("%d: Record: %d\n", k, buf->key); } #endif + 
return 1; } int32_t getBlockId(MinSortStateSublist *ms) { @@ -105,7 +114,7 @@ void init_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, metrics_ // j = (ms->memoryAvailable - 2 * SORT_KEY_SIZE - INT_SIZE) / SORT_KEY_SIZE; j = (ms->memoryAvailable) / (SORT_KEY_SIZE + sizeof(uint8_t)); #ifdef FLASH_MINSORT_PRINT - printf("Memory overhead: %d Max regions: %d\r\n", 2 * SORT_KEY_SIZE + INT_SIZE, j); + debug_log("Memory overhead: %d Max regions: %d\r\n", 2 * SORT_KEY_SIZE + INT_SIZE, j); #endif // Memory allocation // Allocate minimum index in separate memory space (block 0 is input buffer, block 1 is output buffer) @@ -119,8 +128,8 @@ void init_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, metrics_ ms->min_set = malloc(ms->numRegions * sizeof(uint8_t)); ms->offset = malloc(ms->numRegions * sizeof(long)); #ifdef FLASH_MINSORT_PRINT - printf("Page size: %d, Memory size: %d Record size: %d, Number of records: %lu, Number of blocks: %d, Regions: %d\r\n", - es->page_size, ms->memoryAvailable, ms->record_size, ms->num_records, ms->numBlocks, ms->numRegions); + debug_log("Page size: %d, Memory size: %d Record size: %d, Number of records: %lu, Number of blocks: %d, Regions: %d\r\n", + es->page_size, ms->memoryAvailable, ms->record_size, ms->num_records, ms->numBlocks, ms->numRegions); #endif for (i = 0; i < ms->numRegions; i++) @@ -137,12 +146,12 @@ void init_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, metrics_ int numBlocksSublist = *(int32_t *)ms->buffer; /* Retrieve block id (indexed from 0) to compute count of blocks in sublist */ #if DEBUG - printf("Read block: %d", lastBlock); - printf(" Num: %d\n", numBlocksSublist); + debug_log("Read block: %d", lastBlock); + debug_log(" Num: %d\n", numBlocksSublist); for (int k = 0; k < 31; k++) { test_record_t *buf = (void *)(ms->buffer + es->headerSize + k * es->record_size); - printf("%d: Record: %d\n", k, buf->key); + debug_log("%d: Record: %d\n", k, buf->key); } #endif lastBlock = 
lastBlock - numBlocksSublist; @@ -153,22 +162,22 @@ void init_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, metrics_ memcpy(ms->min + es->record_size * regionIdx, getTuple_sublist(ms, 0, es), es->value_size); metric->num_memcpys++; ms->min_set[regionIdx] = true; - ms->offset[regionIdx] = lastBlock * es->page_size + es->headerSize + ms->fileOffset; + ms->offset[regionIdx] = lastBlock * es->page_size + es->headerSize; #if DEBUG - printf("New min. Index: %d", regionIdx); - printf(" Min: %u", ms->min[regionIdx]); - printf(" Offset: %lu\n", ms->offset[regionIdx]); + debug_log("New min. Index: %d", regionIdx); + debug_log(" Min: %u", ms->min[regionIdx]); + debug_log(" Offset: %lu\n", ms->offset[regionIdx]); #endif regionIdx--; lastBlock--; } #ifdef DEBUG - printf("Region summary\n"); + debug_log("Region summary\n"); for (i = 0; i < ms->numRegions; i++) { - printf("Reg: %d", i); - printf(" Min: %u", ms->min[i]); - printf(" Offset: %lu\n", ms->offset[i]); + debug_log("Reg: %d", i); + debug_log(" Min: %u", ms->min[i]); + debug_log(" Offset: %lu\n", ms->offset[i]); } #endif @@ -219,10 +228,17 @@ char *next_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, void *t curBlk = startIndex / es->page_size; // Smallest value is at current index - if (curBlk != ms->lastBlockIdx) { // Read block into buffer - readPage_sublist(ms, curBlk, es, metric); + if (curBlk != ms->lastBlockIdx) { + /* Checking for read failure here */ + if (0 == readPage_sublist(ms, curBlk, es, metric)) { + // If we can't read the block, this region is exhausted. 
+ ms->offset[ms->regionIdx] = -1; + ms->min_set[ms->regionIdx] = false; + // Recursive call to try again with this region disabled + return next_MinSort_sublist(ms, es, tupleBuffer, metric); + } } - } else { // Use next record in current block + } else { i = ms->nextIdx; } @@ -232,7 +248,7 @@ char *next_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, void *t #ifdef DEBUG test_record_t *buf = (test_record_t *)(ms->buffer + es->headerSize + i * es->record_size); buf = (test_record_t *)tupleBuffer; - printf("Returning tuple: %d\n", buf->key); + debug_log("Returning tuple: %d\n", buf->key); #endif // Advance to next tuple in block @@ -244,10 +260,15 @@ char *next_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, void *t i = 0; int32_t currentBlockId = getBlockId(ms); curBlk++; - readPage_sublist(ms, curBlk, es, metric); - if (currentBlockId >= getBlockId(ms)) { - // Transitioned to a block in a new sublist - // ms->min[ms->regionIdx] = INT_MAX; + + if (0 == readPage_sublist(ms, curBlk, es, metric)) { + // Read failed (EOF). Mark region as finished. 
+ ms->offset[ms->regionIdx] = -1; + ms->min_set[ms->regionIdx] = false; + } + /* Only process the new block if read was successful */ + else if (currentBlockId >= getBlockId(ms)) { + // Transitioned to a block in a new sublist (ID check) ms->offset[ms->regionIdx] = -1; ms->min_set[ms->regionIdx] = false; } else { @@ -273,7 +294,7 @@ char *next_MinSort_sublist(MinSortStateSublist *ms, external_sort_t *es, void *t } #ifdef DEBUG - printf("Updated minimum in block to: %d\r\n", ms->min[ms->regionIdx]); + debug_log("Updated minimum in block to: %d\r\n", ms->min[ms->regionIdx]); #endif return tupleBuffer; @@ -339,7 +360,7 @@ int flash_minsort_sublist( int32_t blockIndex = 0; int16_t values_per_page = (es->page_size - es->headerSize) / es->record_size; char *outputBuffer = buffer + es->page_size; - unsigned long lastWritePos = ms.fileOffset + es->num_pages * es->page_size; + unsigned long lastWritePos = *resultFilePtr; // Write while (next_MinSort_sublist(&ms, es, (char *)(outputBuffer + count * es->record_size + es->headerSize), metric) != NULL) { @@ -354,6 +375,10 @@ int flash_minsort_sublist( // Force seek to end of file as outputFile is also inputFile and have been reading it ((file_iterator_state_t *)iteratorState)->fileInterface->seek(lastWritePos, outputFile); // Write the block to the output file using the file interface's write method +#ifdef DEBUG + debug_log("Writing page flash minsort sublist: blockIndex=%d, count=%d, filePosition=%ld\n", + blockIndex, count, lastWritePos / PAGE_SIZE); +#endif if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->writeRel(outputBuffer, es->page_size, 1, outputFile)) { return 9; // Return error code if writing to the output file fails } @@ -366,7 +391,7 @@ int flash_minsort_sublist( printf("Last write pos: %lu Block: %d\n", lastWritePos, blockIndex); */ #ifdef DEBUG_OUTPUT - printf("Wrote output block. Block index: %d\n", blockIndex); + debug_log("Wrote output block. 
Block index: %d\n", blockIndex); for (int k = 0; k < values_per_page; k++) { test_record_t *buf = (void *)(outputBuffer + es->headerSize + k * es->record_size); printf("%d: Output Record: %d\n", k, buf->key); @@ -380,11 +405,13 @@ int flash_minsort_sublist( if (count > 0) { // fseek(outputFile, lastWritePos, SEEK_SET); ((file_iterator_state_t *)iteratorState)->fileInterface->seek(lastWritePos, outputFile); - - *((int32_t *)buffer) = blockIndex; /* Block index */ - *((int16_t *)(buffer + BLOCK_COUNT_OFFSET)) = count; /* Block record count */ - - if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->write(outputBuffer, es->page_size, 1, outputFile)) { + *((int32_t *)outputBuffer) = blockIndex; /* Block index */ + *((int16_t *)(outputBuffer + BLOCK_COUNT_OFFSET)) = count; /* Block record count */ +#ifdef DEBUG + debug_log("Writing last page minsort sublist: blockIndex=%d, count=%d, filePosition=%ld\n", + blockIndex, count, lastWritePos / PAGE_SIZE); +#endif + if (0 == ((file_iterator_state_t *)iteratorState)->fileInterface->writeRel(outputBuffer, es->page_size, 1, outputFile)) { return 9; // Return error code if writing to the output file fails } @@ -397,7 +424,6 @@ int flash_minsort_sublist( close_MinSort_sublist(&ms, es); - *resultFilePtr = 0; free(ms.min); free(ms.offset); free(ms.current); diff --git a/src/query-interface/sort/sortWrapper.c b/src/query-interface/sort/sortWrapper.c index 576c0120..c07d0e17 100644 --- a/src/query-interface/sort/sortWrapper.c +++ b/src/query-interface/sort/sortWrapper.c @@ -1,14 +1,21 @@ #include "sortWrapper.h" #include "query-interface/sort/in_memory_sort.h" +#ifndef ARDUINO +#include "unistd.h" +#endif -#define PRINT_METRIC - -// External declaration for setupFile function -extern void *setupFile(const char *filename); +// #define PRINT_METRIC +// #define DEBUG +// #define PRINT_ERRORS +#if defined(DEBUG) || defined(PRINT_METRIC) || defined(PRINT_ERRORS) +#include "debug_print.h" +#else +#ifndef debug_log +#define 
debug_log(...) ((void)0) +#endif -// Forward declaration for pure in-memory sort (no file I/O) -file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op); +#endif /** * @brief Pure in-memory sort that avoids file I/O completely for very small datasets @@ -17,21 +24,28 @@ file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op); * @return file_iterator_state_t* Iterator for reading sorted results from memory */ file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op) { - printf("DEBUG: Starting pure in-memory sort\n"); - +#ifdef DEBUG + debug_log("DEBUG: Starting pure in-memory sort\n"); +#endif int record_count = 0; while (exec(op->input)) { record_count++; if (record_count > 10) { // Safety limit - printf("ERROR: Too many records for pure in-memory sort\n"); +#ifdef PRINT_ERRORS + debug_log("ERROR: Too many records for pure in-memory sort\n"); +#endif return NULL; } } - printf("DEBUG: Found %d records for pure in-memory sort\n", record_count); +#ifdef DEBUG + debug_log("DEBUG: Found %d records for pure in-memory sort\n", record_count); +#endif if (record_count == 0) { - printf("DEBUG: No records to sort\n"); +#ifdef DEBUG + debug_log("DEBUG: No records to sort\n"); +#endif file_iterator_state_t *iteratorState = malloc(sizeof(file_iterator_state_t)); if (iteratorState == NULL) { return NULL; @@ -49,7 +63,9 @@ file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op) void *buffer = malloc(record_count * data->recordSize); if (buffer == NULL) { - printf("ERROR: Failed to allocate memory for pure in-memory sort\n"); +#ifdef PRINT_ERRORS + debug_log("ERROR: Failed to allocate memory for pure in-memory sort\n"); +#endif return NULL; } @@ -64,23 +80,31 @@ file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op) records_read++; } - printf("DEBUG: Read %d records into memory buffer\n", records_read); +#ifdef DEBUG + debug_log("DEBUG: Read %d records into memory 
buffer\n", records_read); +#endif // Sort the records in memory using quicksort metrics_t metrics = {0}; int sort_result = in_memory_quick_sort(buffer, records_read, data->recordSize, data->keyOffset, data->compareFn, &metrics); if (sort_result != 0) { - printf("ERROR: In-memory sort failed\n"); +#ifdef PRINT_ERRORS + debug_log("ERROR: In-memory sort failed\n"); +#endif free(buffer); return NULL; } - printf("DEBUG: Pure in-memory sort completed successfully\n"); +#ifdef DEBUG + debug_log("DEBUG: Pure in-memory sort completed successfully\n"); +#endif file_iterator_state_t *iteratorState = malloc(sizeof(file_iterator_state_t)); if (iteratorState == NULL) { - printf("ERROR: Failed to allocate iterator state\n"); +#ifdef PRINT_ERRORS + debug_log("ERROR: Failed to allocate iterator state\n"); +#endif free(buffer); return NULL; } @@ -108,14 +132,15 @@ file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op) * @param file The file being written to * @return int8_t */ -int8_t writePageWithHeader(void *buffer, const uint32_t blockIndex, const uint32_t numberOfValues, const uint32_t pageSize, const embedDBFileInterface *fileInterface, void *file) { - memcpy(buffer, &blockIndex, sizeof(int32_t)); - memcpy(buffer + sizeof(uint32_t), &numberOfValues, sizeof(int16_t)); +int8_t writePageWithHeader(void *buffer, const uint32_t blockIndex, const uint16_t numberOfValues, const uint32_t pageSize, const embedDBFileInterface *fileInterface, void *file) { + memcpy(buffer, &blockIndex, sizeof(uint32_t)); + memcpy(buffer + sizeof(uint32_t), &numberOfValues, sizeof(uint16_t)); fileInterface->write(buffer, blockIndex, pageSize, file); - if (fileInterface->error(file)) { - printf("ERROR: SORT: Failed to write unsorted data"); +#ifdef PRINT_ERRORS + debug_log("ERROR: SORT: Failed to write unsorted data"); +#endif return 1; } @@ -127,7 +152,7 @@ int8_t writePageWithHeader(void *buffer, const uint32_t blockIndex, const uint32 * * @param data The operator data * @param op 
The previous operator - * @param unsortedFile A prexisting file that the row data will be writen to + * @param unsortedFile A preexisting file that the row data will be written to * @param recordSize The size of the data * @param keySize The size of the key * @param keyOffset The offset of the key with in the record (# of bytes) @@ -136,14 +161,21 @@ int8_t writePageWithHeader(void *buffer, const uint32_t blockIndex, const uint32 */ uint32_t loadRowData(sortData *data, embedDBOperator *op, void *unsortedFile) { uint32_t count = 0; - int32_t blockIndex = 0; - int16_t valuesPerPage = (PAGE_SIZE - BLOCK_HEADER_SIZE) / data->recordSize; + uint32_t blockIndex = 0; + uint16_t valuesPerPage = (PAGE_SIZE - BLOCK_HEADER_SIZE) / data->recordSize; + +#ifdef DEBUG + debug_log("DEBUG loadRowData: PAGE_SIZE=%d, BLOCK_HEADER_SIZE=%d, recordSize=%d, valuesPerPage=%d\n", + PAGE_SIZE, BLOCK_HEADER_SIZE, data->recordSize, valuesPerPage); +#endif void *buffer = malloc(PAGE_SIZE); if (buffer == NULL) { - printf("ERROR: SORT: buffer malloc failed"); - return 0; +#ifdef PRINT_ERRORS + debug_log("ERROR: SORT: buffer malloc failed"); +#endif + return 1; } // Write row data to file @@ -155,15 +187,16 @@ uint32_t loadRowData(sortData *data, embedDBOperator *op, void *unsortedFile) { buffer = NULL; return 0; } - blockIndex++; } // Offset of the data in the page - uint32_t rowOffset = count % valuesPerPage * data->recordSize + BLOCK_HEADER_SIZE; + uint32_t rowOffset = (count % valuesPerPage) * data->recordSize + BLOCK_HEADER_SIZE; if (rowOffset + data->recordSize > PAGE_SIZE) { - printf("ERROR: SORT: error calculating row offset"); +#ifdef PRINT_ERRORS + debug_log("ERROR: SORT: error calculating row offset"); +#endif free(buffer); buffer = NULL; return 0; @@ -171,6 +204,26 @@ uint32_t loadRowData(sortData *data, embedDBOperator *op, void *unsortedFile) { // Write data to buffer memcpy((uint8_t *)buffer + rowOffset, op->input->recordBuffer, data->recordSize); +#ifdef DEBUG + if (count < 10) { + 
debug_log("DEBUG loadRowData record %d: ", count); + for (int i = 0; i < data->recordSize; i++) { + debug_log("%02x ", ((uint8_t *)op->input->recordBuffer)[i]); + } + debug_log("\n"); + + // Also show what we wrote to the buffer + debug_log("DEBUG wrote to buffer at offset %d: ", rowOffset); + for (int i = 0; i < data->recordSize; i++) { + debug_log("%02x ", ((uint8_t *)buffer)[rowOffset + i]); + } + debug_log("\n"); + } + if (count < 10 || count % 1000 == 0) { + int32_t *keyPtr = (int32_t *)(op->input->recordBuffer + data->keyOffset); + debug_log("DEBUG loadRowData: count=%d, rowOffset=%d, key=%d\n", count, rowOffset, *keyPtr); + } +#endif count++; @@ -186,9 +239,12 @@ uint32_t loadRowData(sortData *data, embedDBOperator *op, void *unsortedFile) { buffer = NULL; return 0; } - data->fileInterface->flush(unsortedFile); +#ifdef DEBUG + debug_log("DEBUG loadRowData: finished, totalRecords=%d\n", count); +#endif + // Clean up free(buffer); buffer = NULL; @@ -206,57 +262,68 @@ void prepareSort(embedDBOperator *op) { data->keyOffset = getColOffsetFromSchema(op->schema, data->colNum); data->recordSize = getRecordSizeFromSchema(op->schema); data->keySize = op->schema->columnSizes[data->colNum]; +#ifdef DEBUG + debug_log("DEBUG prepareSort: recordSize=%d, keySize=%d, keyOffset=%d, colNum=%d\n", + data->recordSize, data->keySize, data->keyOffset, data->colNum); + debug_log("DEBUG prepareSort: schema has %d columns\n", op->schema->numCols); + for (int i = 0; i < op->schema->numCols; i++) { + debug_log(" Column %d: size=%d\n", i, op->schema->columnSizes[i]); + } +#endif // A columns size will be negative if the column is signed // and positive if value is unsigned if (data->keySize < 0) { data->keySize = -1 * data->keySize; } - -#ifdef ARDUINO - // For Arduino Due, use pure in-memory sort to completely avoid SD card I/O issues - data->fileIterator = startPureMemorySort(data, op); - if (data->fileIterator == NULL) { - printf("ERROR: Pure memory sort failed\n"); + if 
(data->fileInterface == NULL || data->fileInterface->setup == NULL) { +#ifdef PRINT_ERRORS + debug_log("ERROR: File interface or setup function not provided while initializing ORDER BY operator\n"); +#endif return; } - return; -#endif - // Set up files - void *unsortedFile = setupFile(SORT_DATA_LOCATION); - void *sortedFile = setupFile(SORT_ORDER_LOCATION); + char *tmp1 = data->fileInterface->tempFilePath(); + char *tmp2 = data->fileInterface->tempFilePath(); + + void *unsortedFile = data->fileInterface->setup(tmp1); + void *sortedFile = data->fileInterface->setup(tmp2); + free(tmp1); + free(tmp2); if (unsortedFile == NULL || sortedFile == NULL) { #ifdef PRINT_ERRORS - printf("ERROR: Failed to open files while initializing ORDER BY operator"); + debug_log("ERROR: Failed to allocate file handles while initializing ORDER BY operator\n"); #endif return; } - const uint8_t unsortedOpen = data->fileInterface->open(unsortedFile, EMBEDDB_FILE_MODE_W_PLUS_B); const uint8_t sortedOpen = data->fileInterface->open(sortedFile, EMBEDDB_FILE_MODE_W_PLUS_B); if (!unsortedOpen || !sortedOpen) { #ifdef PRINT_ERRORS - printf("ERROR: Failed to open files while initializing ORDER BY operator"); + debug_log("ERROR: Failed to open files while initializing ORDER BY operator"); #endif return; } // Load row data data->count = loadRowData(data, op, unsortedFile); - // Start sorting file_iterator_state_t *iteratorState = startSort(data, unsortedFile, sortedFile); if (iteratorState == NULL) { - printf("ERROR: Sort failed"); +#ifdef PRINT_ERRORS + debug_log("ERROR: Sort failed"); +#endif return; } // Finish iteratorState->file = sortedFile; data->fileInterface->close(unsortedFile); + if (data->fileInterface->removeFile) { + data->fileInterface->removeFile(unsortedFile); + } data->fileIterator = iteratorState; } @@ -281,19 +348,14 @@ file_iterator_state_t *startSort(sortData *data, void *unsortedFile, void *sorte es.page_size = PAGE_SIZE; es.num_pages = (uint32_t)ceil((float)data->count / 
((es.page_size - es.headerSize) / es.record_size)); -// Reduce buffer size for Arduino -#ifdef ARDUINO - const int buffer_max_pages = 1; // Reduced to minimum for Arduino -#else const int buffer_max_pages = 4; -#endif char *buffer = malloc(buffer_max_pages * es.page_size + es.record_size); char *tuple_buffer = buffer + es.page_size * buffer_max_pages; if (buffer == NULL) { #ifdef PRINT_ERRORS - printf("ERROR: SORT: buffer malloc failed m\n"); + debug_log("ERROR: SORT: buffer malloc failed m\n"); #endif return NULL; } @@ -302,7 +364,7 @@ file_iterator_state_t *startSort(sortData *data, void *unsortedFile, void *sorte file_iterator_state_t *iteratorState = malloc(sizeof(file_iterator_state_t)); if (iteratorState == NULL) { #ifdef PRINT_ERRORS - printf("Error: SORT: iterator malloc failed\n"); + debug_log("Error: SORT: iterator malloc failed\n"); #endif free(buffer); buffer = NULL; @@ -311,7 +373,7 @@ file_iterator_state_t *startSort(sortData *data, void *unsortedFile, void *sorte iteratorState->file = unsortedFile; iteratorState->recordsRead = 0; - iteratorState->totalRecords = data->count; // Total records from the previous while loop + iteratorState->totalRecords = data->count; iteratorState->recordSize = es.record_size; iteratorState->fileInterface = data->fileInterface; iteratorState->currentRecord = 0; @@ -326,38 +388,24 @@ file_iterator_state_t *startSort(sortData *data, void *unsortedFile, void *sorte long result_file_ptr = 0; int err; -// Use simpler sort for Arduino with small datasets -#ifdef ARDUINO - printf("DEBUG: Starting Arduino sort with %d records\n", data->count); - if (data->count <= 100) { // Use flash_minsort for all datasets on Arduino (more memory efficient) - printf("DEBUG: Using flash_minsort for small dataset\n"); - err = flash_minsort(iteratorState, tuple_buffer, sortedFile, buffer, buffer_max_pages * es.page_size, &es, &result_file_ptr, &metrics, data->compareFn); - } else { - printf("DEBUG: Using flash_minsort for large dataset\n"); - // 
Use flash_minsort for larger datasets (more memory efficient than adaptive_sort) - err = flash_minsort(iteratorState, tuple_buffer, sortedFile, buffer, buffer_max_pages * es.page_size, &es, &result_file_ptr, &metrics, data->compareFn); - } - printf("DEBUG: Arduino sort completed with error code: %d\n", err); -#else - // Use adaptive sort on desktop + int8_t runGenOnly = false; // Run full sort operation int8_t writeReadRatio = 19; // 1.97 * 10 => 19 err = adaptive_sort(readNextRecord, iteratorState, tuple_buffer, sortedFile, buffer, buffer_max_pages, &es, &result_file_ptr, &metrics, data->compareFn, runGenOnly, writeReadRatio, data); -#endif #ifdef PRINT_METRIC - printf("\tComplete. Comparisons: %d Writes: %d Reads: %d Memcpys: %d\n", metrics.num_compar, metrics.num_writes, metrics.num_reads, metrics.num_memcpys); + debug_log("\tComplete. Comparisons: %d Writes: %d Reads: %d Memcpys: %d\n", metrics.num_compar, metrics.num_writes, metrics.num_reads, metrics.num_memcpys); #endif iteratorState->resultFile = result_file_ptr; #ifdef PRINT_ERRORS if (8 == err) { - printf("Out of memory!\n"); + debug_log("Out of memory!\n"); } else if (10 == err) { - printf("File Read Error!\n"); + debug_log("File Read Error!\n"); } else if (9 == err) { - printf("File Write Error!\n"); + debug_log("File Write Error!\n"); } #endif @@ -385,64 +433,89 @@ uint8_t readNextRecord(void *data, void *buffer) { return 1; // No more records left to read } -#ifdef ARDUINO - // For pure memory sort on Arduino, read directly from memory buffer - if (iteratorState->file != NULL && iteratorState->resultFile == 0) { - memcpy(buffer, (char *)iteratorState->file + iteratorState->recordsRead * iteratorState->recordSize, - iteratorState->recordSize); - iteratorState->recordsRead++; - iteratorState->currentRecord++; - return 0; - } -#endif - uint32_t recordPerPage = (PAGE_SIZE - BLOCK_HEADER_SIZE) / iteratorState->recordSize; // Read next page if current buffer is empty if (iteratorState->currentRecord % 
recordPerPage == 0 || iteratorState->recordsRead == 0) { - iteratorState->fileInterface->seek(iteratorState->currentRecord / recordPerPage * PAGE_SIZE + iteratorState->resultFile, iteratorState->file); + uint32_t seekOffset = iteratorState->resultFile + (iteratorState->currentRecord / recordPerPage) * PAGE_SIZE; + + iteratorState->fileInterface->seek(seekOffset, iteratorState->file); iteratorState->fileInterface->readRel(((sortData *)data)->readBuffer, PAGE_SIZE, 1, iteratorState->file); +#ifdef DEBUG + if (iteratorState->recordsRead == 0 || iteratorState->recordsRead % 1000 == 0) { + debug_log("DEBUG readNextRecord: pageNum=%d, seekOffset=%d, recordsRead=%d\n", + iteratorState->currentRecord / recordPerPage, seekOffset, iteratorState->recordsRead); + } +#endif + if (((sortData *)data)->fileInterface->error(iteratorState->file)) { - printf("ERROR: SORT: next record read failed"); +#ifdef PRINT_ERRORS + debug_log("ERROR: SORT: next record read failed"); +#endif return 2; } } - // Copy result to ouput buffer - memcpy(buffer, ((sortData *)data)->readBuffer + BLOCK_HEADER_SIZE + iteratorState->recordSize * (iteratorState->currentRecord % recordPerPage), iteratorState->recordSize); - iteratorState->recordsRead++; - iteratorState->currentRecord++; + // Copy result to output buffer + uint16_t valuesInPage; + memcpy(&valuesInPage, ((sortData *)data)->readBuffer + sizeof(uint32_t), + sizeof(uint16_t)); + uint32_t recordIndexInPage = iteratorState->currentRecord % recordPerPage; +#ifdef DEBUG + +#endif + + if (recordIndexInPage >= valuesInPage) { + return 1; + } + uint32_t copyOffset = BLOCK_HEADER_SIZE + iteratorState->recordSize * recordIndexInPage; + memcpy(buffer, ((sortData *)data)->readBuffer + copyOffset, iteratorState->recordSize); #ifdef DEBUG - printf("DEBUG: ROWDATA from file:\n"); - for (int i = 0; i < iteratorState->recordSize - SORT_KEY_SIZE; i++) { - printf("%2x ", ((uint8_t *)buffer)[i]); + if (iteratorState->recordsRead < 10 || iteratorState->recordsRead % 
1000 == 0) { + int32_t *keyPtr = (int32_t *)(buffer + ((sortData *)data)->keyOffset); + debug_log("DEBUG readNextRecord: recordsRead=%d, currentRecord=%d, pageIdx=%d, recordInPage=%d, copyOffset=%d, key=%d\n", + iteratorState->recordsRead, iteratorState->currentRecord, iteratorState->currentRecord / recordPerPage, + recordIndexInPage, copyOffset, *keyPtr); + uint32_t blockIdx; + memcpy(&blockIdx, ((sortData *)data)->readBuffer, sizeof(uint32_t)); + debug_log("READ PAGE hdr: blockIdx=%u values=%u\n", + blockIdx, valuesInPage); + debug_log("PAGE HEADER: page=%d values=%d\n", + iteratorState->currentRecord / recordPerPage, + valuesInPage); + int32_t *key0 = (int32_t *)(((sortData *)data)->readBuffer + BLOCK_HEADER_SIZE + ((sortData *)data)->keyOffset); + int32_t *keyLast = (int32_t *)(((sortData *)data)->readBuffer + BLOCK_HEADER_SIZE + (recordPerPage - 1) * iteratorState->recordSize + ((sortData *)data)->keyOffset); + debug_log(" First key on page: %d, Last key on page: %d\n", *key0, *keyLast); } - printf("\n"); #endif + iteratorState->recordsRead++; + iteratorState->currentRecord++; + + // #ifdef DEBUG + // printf("DEBUG: ROWDATA from file:\n"); + // for (int i = 0; i < iteratorState->recordSize - SORT_KEY_SIZE; i++) { + // printf("%2x ", ((uint8_t *)buffer)[i]); + // } + // printf("\n"); + // #endif return 0; } void closeSort(file_iterator_state_t *iteratorState) { -#ifdef ARDUINO - // For pure memory sort, we need to free the memory buffer - if (iteratorState->file != NULL && iteratorState->resultFile == 0) { - free(iteratorState->file); - iteratorState->file = NULL; - return; - } -#endif - if (iteratorState->file != NULL) { iteratorState->fileInterface->close(iteratorState->file); + if (iteratorState->fileInterface->removeFile) { + iteratorState->fileInterface->removeFile(iteratorState->file); + } iteratorState->file = NULL; } } /** - * @brief Initalizes default metric values + * @brief Initializes default metric values * * @return metrics_t */ diff --git 
a/src/query-interface/sort/sortWrapper.h b/src/query-interface/sort/sortWrapper.h index 1dcdf9ac..533052f7 100644 --- a/src/query-interface/sort/sortWrapper.h +++ b/src/query-interface/sort/sortWrapper.h @@ -11,13 +11,6 @@ #include "flash_minsort.h" #include "in_memory_sort.h" -#if defined(DESKTOP) -#include -#endif - -#define SORT_DATA_LOCATION "sort_data.bin" -#define SORT_ORDER_LOCATION "sort_order.bin" - typedef struct embedDBOperator embedDBOperator; typedef struct sortData { @@ -35,7 +28,7 @@ typedef struct sortData { } sortData; /** - * @brief Initalizes default metric values + * @brief Initializes default metric values * * @return metrics_t */ @@ -55,6 +48,14 @@ metrics_t initMetric(); */ uint32_t loadRowData(sortData *data, embedDBOperator *op, void *unsortedFile); +/** + * @brief Pure in-memory sort that avoids file I/O completely for very small datasets + * @param data Sort configuration data + * @param op The operator to read data from + * @return file_iterator_state_t* Iterator for reading sorted results from memory + */ +file_iterator_state_t *startPureMemorySort(sortData *data, embedDBOperator *op); + /** * @brief The data given in the unsortedFile is sorted and stored in the sortedFile * diff --git a/test/test_buffered_read_iterator/test_buffered_read_iterator.cpp b/test/test_buffered_read_iterator/test_buffered_read_iterator.cpp index da6a6264..e5e76f8b 100644 --- a/test/test_buffered_read_iterator/test_buffered_read_iterator.cpp +++ b/test/test_buffered_read_iterator/test_buffered_read_iterator.cpp @@ -123,7 +123,7 @@ void embedDBIterator_should_return_records_in_storage_and_in_write_buffer(void) while (embedDBNext(state, &it, &itKey, itData)) { uint32_t actualDataValue; memcpy(&actualDataValue, itData, sizeof(int)); - snprintf(message, 100, "embedDBIterator returned the wrong data value for key %li.", key); + snprintf(message, 100, "embedDBIterator returned the wrong data value for key %u.", key); 
TEST_ASSERT_EQUAL_UINT32_MESSAGE(expectedDataValue, actualDataValue, message); expectedDataValue += 5; numRecordsReturned += 1; @@ -169,7 +169,7 @@ void embedDBIterator_should_return_records_in_storage_and_in_write_buffer_with_f /* test data and keys are returned correctly */ while (embedDBNext(state, &it, &actualKeyValue, returnedDataValue)) { TEST_ASSERT_EQUAL_UINT32_MESSAGE(expectedKeyValue, actualKeyValue, "embedDBIterator returned an unexpected key value"); - snprintf(message, 100, "embedDBIterator did not return the correct data for key %li).", expectedKeyValue); + snprintf(message, 100, "embedDBIterator did not return the correct data for key %u).", expectedKeyValue); memcpy(&actualDataValue, returnedDataValue, sizeof(float)); TEST_ASSERT_EQUAL_FLOAT_MESSAGE(expectedDataValue, actualDataValue, message); expectedKeyValue += 3; @@ -216,7 +216,7 @@ void embedDBIterator_should_return_keys_in_write_buffer_when_no_data_has_been_fl while (embedDBNext(state, &it, &actualKeyValue, returnedDataBuffer)) { TEST_ASSERT_EQUAL_UINT32_MESSAGE(key, actualKeyValue, "embedDBIterator returned an unexpected key value"); memcpy(&acutalDataValue, returnedDataBuffer, sizeof(uint32_t)); - snprintf(message, 100, "embedDBIterator did not return the correct data for key %li).", key); + snprintf(message, 100, "embedDBIterator did not return the correct data for key %u).", key); TEST_ASSERT_EQUAL_UINT32_MESSAGE(data, acutalDataValue, message); data += 15; key += 1; @@ -261,7 +261,7 @@ void embedDBIterator_should_filter_and_rechieve_records_by_data_value(void) { /* assert returned records have correct values */ while (embedDBNext(state, &it, &itKey, itData)) { TEST_ASSERT_EQUAL_UINT32_MESSAGE(expectedKeyValue, itKey, "embedDBIterator returned a key value which should have been filtered out"); - snprintf(message, 100, "embedDBIterator did not return the correct data for key %li).", expectedKeyValue); + snprintf(message, 100, "embedDBIterator did not return the correct data for key %u).", 
expectedKeyValue); TEST_ASSERT_EQUAL_UINT32_MESSAGE(expectedDataValue, itData[0], message); expectedKeyValue += 1; expectedDataValue += 5; diff --git a/test/test_embedDB_data_recovery/test_embedDB_data_recovery.cpp b/test/test_embedDB_data_recovery/test_embedDB_data_recovery.cpp index cf12045b..eff41b6e 100644 --- a/test/test_embedDB_data_recovery/test_embedDB_data_recovery.cpp +++ b/test/test_embedDB_data_recovery/test_embedDB_data_recovery.cpp @@ -224,9 +224,9 @@ void embedDB_inserts_correctly_into_data_file_after_reload() { /* Records inserted before reload */ for (int i = 0; i < 3654; i++) { int8_t getResult = embedDBGet(state, &key, recordBuffer); - snprintf(message, 100, "EmbedDB get encountered an error fetching the data for key %li.", key); + snprintf(message, 100, "EmbedDB get encountered an error fetching the data for key %u.", key); TEST_ASSERT_EQUAL_INT8_MESSAGE(0, getResult, message); - snprintf(message, 100, "EmbedDB get did not return correct data for a record inserted before reloading (key %li).", key); + snprintf(message, 100, "EmbedDB get did not return correct data for a record inserted before reloading (key %u).", key); TEST_ASSERT_EQUAL_MEMORY_MESSAGE(&data, ((int64_t *)recordBuffer), state->dataSize, message); key++; data++; @@ -235,9 +235,9 @@ void embedDB_inserts_correctly_into_data_file_after_reload() { data = 11; for (int i = 0; i < 42; i++) { int8_t getResult = embedDBGet(state, &key, recordBuffer); - snprintf(message, 100, "EmbedDB get encountered an error fetching the data for key %li.", key); + snprintf(message, 100, "EmbedDB get encountered an error fetching the data for key %u.", key); TEST_ASSERT_EQUAL_INT8_MESSAGE(0, getResult, message); - snprintf(message, 100, "EmbedDB get did not return correct data for a record inserted after reloading (key %li).", key); + snprintf(message, 100, "EmbedDB get did not return correct data for a record inserted after reloading (key %u).", key); TEST_ASSERT_EQUAL_MEMORY_MESSAGE(&data, ((int64_t 
*)recordBuffer), state->dataSize, message); key++; data++; @@ -258,9 +258,9 @@ void embedDB_correctly_gets_records_after_reload_with_wrapped_data() { /* Records inserted before reload */ for (int i = 0; i < 3678; i++) { getResult = embedDBGet(state, &key, recordBuffer); - snprintf(message, 100, "EmbedDB get encountered an error fetching the data for key %li.", key); + snprintf(message, 100, "EmbedDB get encountered an error fetching the data for key %u.", key); TEST_ASSERT_EQUAL_INT8_MESSAGE(0, getResult, message); - snprintf(message, 100, "EmbedDB get did not return correct data for a record inserted before reloading (key %li).", key); + snprintf(message, 100, "EmbedDB get did not return correct data for a record inserted before reloading (key %u).", key); TEST_ASSERT_EQUAL_MEMORY_MESSAGE(&data, ((int64_t *)recordBuffer), state->dataSize, message); key++; data++; @@ -293,9 +293,9 @@ void embedDB_queries_correctly_with_non_liner_data_after_reload() { uint32_t i; for (i = 0; i < 3822; i++) { int8_t getResult = embedDBGet(state, &key, recordBuffer); - snprintf(message, 80, "EmbedDB get encountered an error fetching the data for key %li.", key); + snprintf(message, 80, "EmbedDB get encountered an error fetching the data for key %u.", key); TEST_ASSERT_EQUAL_INT8_MESSAGE(0, getResult, message); - snprintf(message, 100, "EmbedDB get did not return correct data for a record inserted before reloading (key %li).", key); + snprintf(message, 100, "EmbedDB get did not return correct data for a record inserted before reloading (key %u).", key); TEST_ASSERT_EQUAL_MEMORY_MESSAGE(&data, recordBuffer, sizeof(int64_t), message); key += increment; data += 1; diff --git a/test/test_embedDB_multiple_instances/test_embedDB_multiple_instances.cpp b/test/test_embedDB_multiple_instances/test_embedDB_multiple_instances.cpp index 27e92b8b..46fe1ead 100644 --- a/test/test_embedDB_multiple_instances/test_embedDB_multiple_instances.cpp +++ 
b/test/test_embedDB_multiple_instances/test_embedDB_multiple_instances.cpp @@ -67,9 +67,9 @@ #else #include "desktopFileInterface.h" #define FILE_TYPE FILE -#define DATA_FILE_PATH "build/artifacts/dataFile%li.bin" -#define INDEX_FILE_PATH "build/artifacts/indexFile%li.bin" -#define VAR_DATA_FILE_PATH "build/artifacts/varFile%li.bin" +#define DATA_FILE_PATH "build/artifacts/dataFile%u.bin" +#define INDEX_FILE_PATH "build/artifacts/indexFile%u.bin" +#define VAR_DATA_FILE_PATH "build/artifacts/varFile%u.bin" #endif #include "unity.h" @@ -128,9 +128,9 @@ void queryRecords(embedDBState *state, int32_t numberOfRecords, int32_t starting char message[120]; for (int32_t i = 0; i < numberOfRecords; i++) { int8_t getResult = embedDBGet(state, &key, &dataBuffer); - snprintf(message, 120, "embedDBGet returned a non-zero value when getting key %li from state %li", key, i); + snprintf(message, 120, "embedDBGet returned a non-zero value when getting key %u from state %u", key, i); TEST_ASSERT_EQUAL_INT8_MESSAGE(0, getResult, message); - snprintf(message, 120, "embedDBGet did not return the correct data for key %li from state %li", key, i); + snprintf(message, 120, "embedDBGet did not return the correct data for key %u from state %u", key, i); TEST_ASSERT_EQUAL_INT32_MESSAGE(data, dataBuffer, message); key++; data++; @@ -151,7 +151,7 @@ void insertRecordsFromFile(embedDBState *state, const char *fileName, int32_t nu for (int16_t i = 0; i < count; i++) { void *buf = (infileBuffer + headerSize + i * state->recordSize); int8_t putResult = embedDBPut(state, buf, (void *)((int8_t *)buf + 4)); - snprintf(message, 100, "embedDBPut returned non-zero value for insert of key %li", *((uint32_t *)buf)); + snprintf(message, 100, "embedDBPut returned non-zero value for insert of key %u", *((uint32_t *)buf)); TEST_ASSERT_EQUAL_INT8_MESSAGE(0, putResult, message); numInserted++; if (numInserted >= numRecords) { @@ -182,7 +182,7 @@ void insertRecordsFromFileWithVarData(embedDBState *state, const 
char *fileName, memcpy(&key, buf, sizeof(uint32_t)); snprintf(varData, 30, "Hello world %li", key); int8_t putResult = embedDBPutVar(state, buf, (void *)((int8_t *)buf + 4), varData, strlen(varData)); - snprintf(message, 100, "embedDBPut returned non-zero value for insert of key %li", key); + snprintf(message, 100, "embedDBPut returned non-zero value for insert of key %u", key); TEST_ASSERT_EQUAL_INT8_MESSAGE(0, putResult, message); numInserted++; if (numInserted >= numRecords) { @@ -213,9 +213,9 @@ void queryRecordsFromFile(embedDBState *state, const char *fileName, int32_t num int8_t getResult = embedDBGet(state, buf, dataBuffer); uint32_t key = 0; memcpy(&key, buf, sizeof(uint32_t)); - snprintf(message, 100, "embedDBGet was not able to find the data for key %li", key); + snprintf(message, 100, "embedDBGet was not able to find the data for key %u", key); TEST_ASSERT_EQUAL_INT8_MESSAGE(0, getResult, message); - snprintf(message, 100, "embedDBGet did not return the correct data for key %li", key); + snprintf(message, 100, "embedDBGet did not return the correct data for key %u", key); TEST_ASSERT_EQUAL_MEMORY_MESSAGE((int8_t *)buf + 4, dataBuffer, state->dataSize, message); numRead++; if (numRead >= numRecords) @@ -246,17 +246,17 @@ void queryRecordsFromFileWithVarData(embedDBState *state, const char *fileName, void *buf = (infileBuffer + headerSize + i * (state->keySize + state->dataSize)); uint32_t key = 0; memcpy(&key, buf, sizeof(uint32_t)); - snprintf(varDataExpected, 30, "Hello world %li", key); + snprintf(varDataExpected, 30, "Hello world %u", key); embedDBVarDataStream *stream = NULL; int8_t getResult = embedDBGetVar(state, buf, dataBuffer, &stream); - snprintf(message, 100, "embedDBGetVar was not able to find the data for key %li", key); + snprintf(message, 100, "embedDBGetVar was not able to find the data for key %u", key); TEST_ASSERT_EQUAL_INT8_MESSAGE(0, getResult, message); - snprintf(message, 100, "embedDBGetBar did not return the correct data for key 
%li", key); + snprintf(message, 100, "embedDBGetBar did not return the correct data for key %u", key); TEST_ASSERT_EQUAL_MEMORY_MESSAGE((int8_t *)buf + 4, dataBuffer, state->dataSize, message); uint32_t streamBytesRead = embedDBVarDataStreamRead(state, stream, varDataBuffer, strlen(varDataExpected)); - snprintf(message, 100, "embedDBGetVar did not return the correct number of bytes read for key %li.", key); + snprintf(message, 100, "embedDBGetVar did not return the correct number of bytes read for key %u.", key); TEST_ASSERT_EQUAL_UINT32_MESSAGE(strlen(varDataExpected), streamBytesRead, message); - snprintf(message, 100, "embedDBGetVar did not return the correct variable data for key %li", key); + snprintf(message, 100, "embedDBGetVar did not return the correct variable data for key %u", key); TEST_ASSERT_EQUAL_MEMORY_MESSAGE(varDataExpected, varDataBuffer, strlen(varDataExpected), message); numRead++; diff --git a/test/test_sort/test_sort_query_interface.cpp b/test/test_sort/test_sort_query_interface.cpp index 85c5a509..3f43f220 100644 --- a/test/test_sort/test_sort_query_interface.cpp +++ b/test/test_sort/test_sort_query_interface.cpp @@ -6,31 +6,11 @@ #endif -#ifdef ARDUINO -// For Arduino, setupFile is not used since we use pure memory sort -// But we need to define it for linking compatibility -#ifdef __cplusplus -extern "C" { -#endif - -void* setupSDFile(char* filename); - -void* setupFile(const char* filename) { - return setupSDFile((char*)filename); -} - -#ifdef __cplusplus -} -#endif -#endif - #define STORAGE_TYPE 0 -#ifdef ARDUINO -// pio test --environment due --filter "test_sort" - #if defined(MEMBOARD) && STORAGE_TYPE == 1 #include "dataflashFileInterface.h" +#include "memboardTestSetup.h" #endif #if defined(MEGA) @@ -41,80 +21,80 @@ void* setupFile(const char* filename) { #include "dueTestSetup.h" #endif -#include "SDFileInterface.h" +#ifdef ARDUINO #define FILE_TYPE SD_FILE +#include "SDFileInterface.h" #define getFileInterface getSDInterface 
+#define setupFile setupSDFile #define tearDownFile tearDownSDFile - +#define DATA_FILE_PATH "dataFile.bin" #define clock millis #define DATA_FILE_PATH_UWA "dataFileUWA.bin" #define INDEX_FILE_PATH_UWA "indexFileUWA.bin" #define DATA_FILE_PATH_SEA "dataFileSEA.bin" #define INDEX_FILE_PATH_SEA "indexFileSEA.bin" - #else - #define FILE_TYPE FILE #include "desktopFileInterface.h" #define DATA_FILE_PATH_UWA "build/artifacts/dataFileUWA.bin" #define INDEX_FILE_PATH_UWA "build/artifacts/indexFileUWA.bin" #define DATA_FILE_PATH_SEA "build/artifacts/dataFileSEA.bin" #define INDEX_FILE_PATH_SEA "build/artifacts/indexFileSEA.bin" - #endif #include "unity.h" -embedDBState* stateUWA; +embedDBState* state; embedDBSchema* baseSchema; void setUp() { if (STORAGE_TYPE == 1) { TEST_FAIL_MESSAGE("Dataflash is not currently supported. Defaulting to SD card interface."); } - stateUWA = (embedDBState*)malloc(sizeof(embedDBState)); - stateUWA->keySize = 4; - stateUWA->dataSize = 12; - stateUWA->compareKey = int32Comparator; - stateUWA->compareData = int32Comparator; - stateUWA->pageSize = 512; - stateUWA->eraseSizeInPages = 4; - stateUWA->numDataPages = 20000; - stateUWA->numIndexPages = 1000; - stateUWA->numSplinePoints = 30; + state = (embedDBState*)malloc(sizeof(embedDBState)); + state->keySize = 4; + state->dataSize = 12; + state->compareKey = int32Comparator; + state->compareData = int32Comparator; + state->pageSize = 512; + state->eraseSizeInPages = 4; + state->numDataPages = 20000; + state->numIndexPages = 1000; + state->numSplinePoints = 120; /* Setup files */ char dataPath[] = DATA_FILE_PATH_UWA, indexPath[] = INDEX_FILE_PATH_UWA; - stateUWA->fileInterface = getFileInterface(); - stateUWA->dataFile = setupFile(dataPath); - stateUWA->indexFile = setupFile(indexPath); - - stateUWA->bufferSizeInBlocks = 4; - stateUWA->buffer = malloc(stateUWA->bufferSizeInBlocks * stateUWA->pageSize); - stateUWA->parameters = EMBEDDB_USE_BMAP | EMBEDDB_USE_INDEX | EMBEDDB_RESET_DATA; - 
stateUWA->bitmapSize = 2; - stateUWA->inBitmap = inBitmapInt16; - stateUWA->updateBitmap = updateBitmapInt16; - stateUWA->buildBitmapFromRange = buildBitmapInt16FromRange; - int8_t initResult = embedDBInit(stateUWA, 1); + state->fileInterface = getFileInterface(); + + state->dataFile = state->fileInterface->setup(dataPath); + state->indexFile = state->fileInterface->setup(indexPath); + + state->bufferSizeInBlocks = 4; + state->buffer = malloc(state->bufferSizeInBlocks * state->pageSize); + state->parameters = EMBEDDB_USE_BMAP | EMBEDDB_USE_INDEX | EMBEDDB_RESET_DATA; + state->bitmapSize = 2; + state->inBitmap = inBitmapInt16; + state->updateBitmap = updateBitmapInt16; + state->buildBitmapFromRange = buildBitmapInt16FromRange; + int8_t initResult = embedDBInit(state, 1); if (initResult != 0) { TEST_FAIL_MESSAGE("There was an error setting up the state of the UWA dataset."); } - stateUWA->rules = NULL; - stateUWA->numRules = 0; + state->rules = NULL; + state->numRules = 0; - int8_t colSizes[] = {4, 12}; - int8_t colSignedness[] = {embedDB_COLUMN_UNSIGNED, embedDB_COLUMN_UNSIGNED}; - ColumnType colTypes[] = {embedDB_COLUMN_UINT32, embedDB_COLUMN_UINT32}; - baseSchema = embedDBCreateSchema(2, colSizes, colSignedness, colTypes); + int8_t colSizes[] = {4, 4, 4, 4}; + int8_t colSignedness[] = {embedDB_COLUMN_UNSIGNED, embedDB_COLUMN_SIGNED, embedDB_COLUMN_SIGNED, embedDB_COLUMN_SIGNED}; + ColumnType colTypes[] = {embedDB_COLUMN_UINT32, embedDB_COLUMN_INT32, embedDB_COLUMN_INT32, embedDB_COLUMN_INT32}; + baseSchema = embedDBCreateSchema(4, colSizes, colSignedness, colTypes); } void tearDown() { - embedDBClose(stateUWA); - tearDownFile(stateUWA->dataFile); - tearDownFile(stateUWA->indexFile); - free(stateUWA->fileInterface); - free(stateUWA->buffer); - free(stateUWA); + embedDBClose(state); + tearDownFile(state->dataFile); + tearDownFile(state->indexFile); + free(state->fileInterface); + free(state->buffer); + free(state); embedDBFreeSchema(&baseSchema); } @@ -147,10 
+127,17 @@ void insertNValues(embedDBState* state, int n, int mode) { } break; case 1: + key = 1; for (int i = n; i >= 0; i--) { - key = i; value = i; embedDBPut(state, &key, &value); + key++; + } + for (int i = 0, data = n; i <= n; i++) { + key = i + 1; + embedDBGet(state, (void*)&key, (void*)&value); + TEST_ASSERT_MESSAGE(value == data, "value isn't equal to extracted data"); + data--; } break; default: @@ -158,127 +145,90 @@ void insertNValues(embedDBState* state, int n, int mode) { } } +void debugBinData(embedDBOperator* op, uint32_t numValues, uint8_t col) { + op->init(op); + int32_t* buffer = (int32_t*)op->recordBuffer; + printf("\n"); + for (uint32_t i = 0; i <= numValues; ++i) { + exec(op); + printf("%i ", (int32_t)buffer[col]); + } + printf("\n"); + // fflush(stdout); +} + void runTestSequentialValues() { // Insert test data -#ifdef ARDUINO - insertNValues(stateUWA, 1, 0); -#else - insertNValues(stateUWA, 10, 0); -#endif + insertNValues(state, 300, 1); embedDBIterator it; it.minKey = NULL; it.maxKey = NULL; it.minData = NULL; it.maxData = NULL; - embedDBInitIterator(stateUWA, &it); - - embedDBOperator* scanOpOrderBy = createTableScanOperator(stateUWA, &it, baseSchema); + embedDBInitIterator(state, &it); + embedDBOperator* scanOpOrderBy = createTableScanOperator(state, &it, baseSchema); uint8_t projColsOB[] = {0, 1}; embedDBOperator* projColsOrderBy = createProjectionOperator(scanOpOrderBy, 2, projColsOB); - embedDBOperator* orderByOp = createOrderByOperator(stateUWA, projColsOrderBy, 1, 3, int32Comparator); + embedDBOperator* orderByOp = createOrderByOperator(state, projColsOrderBy, 1, -1, int32Comparator); + // debugBinData(orderByOp, 70, 1); orderByOp->init(orderByOp); int32_t* recordBuffer = (int32_t*)orderByOp->recordBuffer; - uint32_t previous = 0; + exec(orderByOp); + int32_t previous = ((int32_t)recordBuffer[1]); int recordCount = 0; while (exec(orderByOp)) { - TEST_ASSERT_GREATER_OR_EQUAL_UINT32_MESSAGE(previous, ((uint32_t)recordBuffer[1]) / 10.0, 
"Sort value is not greater than or equal to previous value."); - previous = ((uint32_t)recordBuffer[1]) / 10.0; + TEST_ASSERT_GREATER_OR_EQUAL_INT32_MESSAGE(previous, ((int32_t)recordBuffer[1]), "Sort value is not greater than or equal to previous value."); + previous = ((int32_t)recordBuffer[1]); recordCount++; - - // Safety break to prevent infinite loop - if (recordCount >= 10) { - break; - } } orderByOp->close(orderByOp); embedDBFreeOperatorRecursive(&orderByOp); } -void runTestUsingUWA500k() { - printf("Advanced Query Example.\n"); - embedDBState* stateUWA = (embedDBState*)malloc(sizeof(embedDBState)); - stateUWA->keySize = 4; - stateUWA->dataSize = 12; - stateUWA->compareKey = int32Comparator; - stateUWA->compareData = int32Comparator; - stateUWA->pageSize = 512; - stateUWA->eraseSizeInPages = 4; - stateUWA->numDataPages = 20000; - stateUWA->numIndexPages = 1000; - stateUWA->numSplinePoints = 30; - - if (STORAGE_TYPE == 1) { - TEST_FAIL_MESSAGE("Dataflash is not currently supported. 
Defaulting to SD card interface."); - } - - /* Setup files */ - char dataPath[] = DATA_FILE_PATH_UWA, indexPath[] = INDEX_FILE_PATH_UWA; - stateUWA->fileInterface = getFileInterface(); - stateUWA->dataFile = setupFile(dataPath); - stateUWA->indexFile = setupFile(indexPath); - - stateUWA->bufferSizeInBlocks = 4; - stateUWA->buffer = malloc(stateUWA->bufferSizeInBlocks * stateUWA->pageSize); - stateUWA->parameters = EMBEDDB_USE_BMAP | EMBEDDB_USE_INDEX | EMBEDDB_RESET_DATA; - stateUWA->bitmapSize = 2; - stateUWA->inBitmap = inBitmapInt16; - stateUWA->updateBitmap = updateBitmapInt16; - stateUWA->buildBitmapFromRange = buildBitmapInt16FromRange; - int8_t initResult = embedDBInit(stateUWA, 1); - if (initResult != 0) { - TEST_FAIL_MESSAGE("There was an error setting up the state of the UWA dataset."); - } - - int8_t colSizes[] = {4, 4, 4, 4}; - int8_t colSignedness[] = {embedDB_COLUMN_UNSIGNED, embedDB_COLUMN_SIGNED, embedDB_COLUMN_SIGNED, embedDB_COLUMN_SIGNED}; - ColumnType colTypes[] = {embedDB_COLUMN_UINT32, embedDB_COLUMN_INT32, embedDB_COLUMN_INT32, embedDB_COLUMN_INT32}; - embedDBSchema* baseSchema = embedDBCreateSchema(4, colSizes, colSignedness, colTypes); - +void runTestUsingSEA100k() { // Insert data - const char datafileName[] = "data/uwa500K.bin"; - insertData(stateUWA, datafileName); + const char datafileName[] = "data/sea100K.bin"; + insertData(state, datafileName); embedDBIterator it; it.minKey = NULL; it.maxKey = NULL; it.minData = NULL; it.maxData = NULL; - embedDBInitIterator(stateUWA, &it); + embedDBInitIterator(state, &it); - embedDBOperator* scanOpOrderBy = createTableScanOperator(stateUWA, &it, baseSchema); + embedDBOperator* scanOpOrderBy = createTableScanOperator(state, &it, baseSchema); + // debugBinData(scanOpOrderBy, 200, 0); uint8_t projColsOB[] = {0, 1}; embedDBOperator* projColsOrderBy = createProjectionOperator(scanOpOrderBy, 2, projColsOB); - embedDBOperator* orderByOp = createOrderByOperator(stateUWA, projColsOrderBy, 1, -1, 
int32Comparator); + // debugBinData(projColsOrderBy, 300, 1); + embedDBOperator* orderByOp = createOrderByOperator(state, projColsOrderBy, 1, -1, int32Comparator); + // debugBinData(orderByOp, 100000, 1); + orderByOp->init(orderByOp); + int32_t* recordBuffer = (int32_t*)orderByOp->recordBuffer; - uint32_t previous = 0; + exec(orderByOp); + int32_t previous = ((int32_t)recordBuffer[1]) / 10.0; // Result of the sort - while (exec(orderByOp)) { - TEST_ASSERT_GREATER_OR_EQUAL_UINT32_MESSAGE(previous, ((uint32_t)recordBuffer[1]) / 10.0, "Sort value is not greater than or equal to previous value previous values."); - previous = ((uint32_t)recordBuffer[1]) / 10.0; + TEST_ASSERT_GREATER_OR_EQUAL_INT32_MESSAGE(previous, ((int32_t)recordBuffer[1]) / 10.0, "Sort value is not greater than or equal to previous value previous values."); + previous = ((int32_t)recordBuffer[1]) / 10.0; } orderByOp->close(orderByOp); embedDBFreeOperatorRecursive(&orderByOp); - // Close embedDB - embedDBClose(stateUWA); - tearDownFile(stateUWA->dataFile); - tearDownFile(stateUWA->indexFile); - free(stateUWA->fileInterface); - free(stateUWA->buffer); - free(stateUWA); - embedDBFreeSchema(&baseSchema); } int runUnityTests() { UNITY_BEGIN(); RUN_TEST(runTestSequentialValues); + RUN_TEST(runTestUsingSEA100k); return UNITY_END(); }