diff --git a/src/ParquetMsg.chpl b/src/ParquetMsg.chpl index 160160bed..5803dc827 100644 --- a/src/ParquetMsg.chpl +++ b/src/ParquetMsg.chpl @@ -172,7 +172,7 @@ module ParquetMsg { } proc readStrFilesByName(ref A: [] ?t, filenames: [] string, sizes: [] int, dsetname: string) throws { - extern proc c_readStrColumnByName(filename, arr_chpl, colname, batchSize, errMsg): int; + extern proc c_readStrColumnByName(filename, arr_chpl, colname, numElems, batchSize, errMsg): int; var (subdoms, length) = getSubdomains(sizes); coforall loc in A.targetLocales() do on loc { @@ -188,7 +188,7 @@ module ParquetMsg { var col: [filedom] t; if c_readStrColumnByName(filename.localize().c_str(), c_ptrTo(col), - dsetname.localize().c_str(), + dsetname.localize().c_str(), filedom.size, batchSize, c_ptrTo(pqErr.errMsg)) == ARROWERROR { pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); } diff --git a/src/parquet/ReadParquet.cpp b/src/parquet/ReadParquet.cpp index 2ab2958ca..244276d1b 100644 --- a/src/parquet/ReadParquet.cpp +++ b/src/parquet/ReadParquet.cpp @@ -98,7 +98,7 @@ int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t startIdx, std::share return i; } -int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg) { +int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t numElems, int64_t batchSize, char** errMsg) { try { int64_t ty = cpp_getType(filename, colname, errMsg); @@ -131,23 +131,37 @@ int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* co column_reader = row_group_reader->Column(idx); if(ty == ARROWSTRING) { - int16_t definition_level; // nullable type and only reading single records in batch auto chpl_ptr = (unsigned char*)chpl_arr; parquet::ByteArrayReader* reader = static_cast(column_reader.get()); - while (reader->HasNext()) { - parquet::ByteArray value; - (void)reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read); - // if values_read is 0, that means that it was a null value - if(values_read > 0) { - for(int j = 0; j < value.len; j++) { - chpl_ptr[i] = value.ptr[j]; + int totalProcessed = 0; + std::vector values(batchSize); + while (reader->HasNext() && totalProcessed < numElems) { + std::vector definition_levels(batchSize,-1); + if((numElems - totalProcessed) < batchSize) // adjust batchSize if needed + batchSize = numElems - totalProcessed; + + (void)reader->ReadBatch(batchSize, definition_levels.data(), nullptr, values.data(), &values_read); + totalProcessed += values_read; + int j = 0; + int numProcessed = 0; + while(j < batchSize) { + if(definition_levels[j] == 1) { + for(int k = 0; k < values[numProcessed].len; k++) { + chpl_ptr[i] = values[numProcessed].ptr[k]; + i++; + } + i++; // skip one space so the strings are null terminated with a 0 + numProcessed++; + } else if(definition_levels[j] == 0) { i++; + } else { + j = batchSize; // exit loop, not read } + j++; } - i++; // skip one space so the strings are null terminated with a 0 - } + } } } return 0; @@ -744,8 +758,8 @@ int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* c } extern "C" { - int c_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg) { - return cpp_readStrColumnByName(filename, chpl_arr, colname, batchSize, errMsg); + int c_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t numElems, int64_t batchSize, char** errMsg) { + return cpp_readStrColumnByName(filename, chpl_arr, colname, numElems, batchSize, errMsg); } int c_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_chpl, const char* colname, int64_t numElems, int64_t startIdx, int64_t batchSize, int64_t byteLength, bool hasNonFloatNulls, char** errMsg) { diff --git a/src/parquet/ReadParquet.h b/src/parquet/ReadParquet.h index 6a90ee07e..c449204bf 100644 --- a/src/parquet/ReadParquet.h +++ b/src/parquet/ReadParquet.h @@ -15,9 +15,9 @@ #include extern "C" { #endif - int c_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg); + int c_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t numElems, int64_t batchSize, char** errMsg); - int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t batchSize, char** errMsg); + int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t numElems, int64_t batchSize, char** errMsg); int c_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_chpl, const char* colname, int64_t numElems, int64_t startIdx,