
Enable readColumn to read all rows #53

Merged: 15 commits, Dec 20, 2024
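This PR makes readColumn's rowLimit optional: the parameter is now number | undefined, and any value that fails the new hasRowLimit check (undefined, but also Infinity, NaN, or a negative number) means "read every row in the column chunk". A minimal sketch of the new calling convention, reusing the reader/metadata setup shown in the tests below (freshReader is a hypothetical second reader, since readColumn advances the reader's offset):

// Sketch only: reader, column, schemaPath, columnArrayBuffer, asyncBuffer, and
// compressors are set up exactly as in test/column.test.js below.
const allRows = readColumn(reader, undefined, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
// A finite, non-negative rowLimit still caps (and validates) the row count:
const freshReader = { view: new DataView(columnArrayBuffer), offset: 0 }
const firstTwo = readColumn(freshReader, 2, column.meta_data, schemaPath, { file: asyncBuffer, compressors })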
18 changes: 11 additions & 7 deletions src/column.js
@@ -10,7 +10,7 @@ import { concat } from './utils.js'
* Parse column data from a buffer.
*
* @param {DataReader} reader
* @param {number} rowLimit maximum number of rows to read
* @param {number | undefined} rowLimit maximum number of rows to read (undefined reads all rows)
* @param {ColumnMetaData} columnMetadata column metadata
* @param {SchemaTree[]} schemaPath schema path for the column
* @param {ParquetReadOptions} options read options
@@ -22,8 +22,10 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
let dictionary = undefined
/** @type {any[]} */
const rowData = []
const hasRowLimit = rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)

while (rowData.length < rowLimit) {
while (!hasRowLimit || rowData.length < rowLimit) {
if (reader.offset >= reader.view.byteLength - 1) break // end of reader
// parse column header
const header = parquetHeader(reader)
// assert(header.compressed_page_size !== undefined)
@@ -93,11 +95,13 @@ }
}
reader.offset += header.compressed_page_size
}
if (rowData.length < rowLimit) {
throw new Error(`parquet row data length ${rowData.length} does not match row group limit ${rowLimit}`)
}
if (rowData.length > rowLimit) {
rowData.length = rowLimit // truncate to row limit
if (hasRowLimit) {
if (rowData.length < rowLimit) {
throw new Error(`parquet row data length ${rowData.length} does not match row group limit ${rowLimit}`)
}
if (rowData.length > rowLimit) {
rowData.length = rowLimit // truncate to row limit
}
}
return rowData
}
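The hasRowLimit guard added above is what separates a capped read from "read everything". A standalone illustration of the same predicate and how it classifies the values exercised by the tests below (a sketch for illustration, not library code):

// The same predicate as the hasRowLimit constant in readColumn, extracted for illustration.
const hasRowLimit = rowLimit => rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)
hasRowLimit(undefined) // false: the while loop runs until the reader is exhausted
hasRowLimit(Infinity)  // false: isFinite(Infinity) is false, so this also reads all rows
hasRowLimit(NaN)       // false: NaN >= 0 is false, so treated as no limit
hasRowLimit(2)         // true: stop after 2 rows, then validate/truncate the result
hasRowLimit(0)         // true: returns an empty array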
83 changes: 83 additions & 0 deletions test/column.test.js
@@ -0,0 +1,83 @@
import { compressors } from 'hyparquet-compressors'
import { describe, expect, it } from 'vitest'
import { parquetMetadata } from '../src/hyparquet.js'
import { getSchemaPath } from '../src/schema.js'
import { getColumnRange, readColumn } from '../src/column.js'
import { asyncBufferFromFile } from '../src/utils.js'

describe('readColumn', () => {
it('read columns when rowLimit is undefined', async () => {
Review comment from @severo (Contributor), Dec 20, 2024: we could factor these four tests with https://vitest.dev/api/#test-for, as they only differ in the last two lines. I'll send a PR.

@severo: #55
const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
const asyncBuffer = await asyncBufferFromFile(testFile)
const arrayBuffer = await asyncBuffer.slice(0)
const metadata = parquetMetadata(arrayBuffer)

const column = metadata.row_groups[0].columns[0]
if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
const reader = { view: new DataView(columnArrayBuffer), offset: 0 }

const rowLimit = undefined
const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
const expected = [null, 1, -2, NaN, 0, -1, -0, 2]
expect(result).toEqual(expected)
})

it('read columns when rowLimit is Infinity', async () => {
const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
const asyncBuffer = await asyncBufferFromFile(testFile)
const arrayBuffer = await asyncBuffer.slice(0)
const metadata = parquetMetadata(arrayBuffer)

const column = metadata.row_groups[0].columns[0]
if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
const reader = { view: new DataView(columnArrayBuffer), offset: 0 }

const rowLimit = Infinity
const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
const expected = [null, 1, -2, NaN, 0, -1, -0, 2]
expect(result).toEqual(expected)
})

it('read columns when rowLimit is defined', async () => {
const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
const asyncBuffer = await asyncBufferFromFile(testFile)
const arrayBuffer = await asyncBuffer.slice(0)
const metadata = parquetMetadata(arrayBuffer)

const column = metadata.row_groups[0].columns[0]
if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
const reader = { view: new DataView(columnArrayBuffer), offset: 0 }

const rowLimit = 2
const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
expect(result.length).toBe(rowLimit)
})

it('read columns when rowLimit is 0', async () => {
const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
const asyncBuffer = await asyncBufferFromFile(testFile)
const arrayBuffer = await asyncBuffer.slice(0)
const metadata = parquetMetadata(arrayBuffer)

const column = metadata.row_groups[0].columns[0]
if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
const reader = { view: new DataView(columnArrayBuffer), offset: 0 }

const rowLimit = 0
const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
expect(result.length).toBe(rowLimit)
})

})
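For reference, a sketch of the factoring suggested in the review comment above (the actual refactor is in #55 and may differ). It assumes Vitest's it.for API, which passes each case object to the test body unspread and supports $rowLimit placeholders in the title:

import { compressors } from 'hyparquet-compressors'
import { describe, expect, it } from 'vitest'
import { parquetMetadata } from '../src/hyparquet.js'
import { getSchemaPath } from '../src/schema.js'
import { getColumnRange, readColumn } from '../src/column.js'
import { asyncBufferFromFile } from '../src/utils.js'

const allRows = [null, 1, -2, NaN, 0, -1, -0, 2]

describe('readColumn (factored)', () => {
  it.for([
    { rowLimit: undefined, expected: allRows }, // no limit: read everything
    { rowLimit: Infinity, expected: allRows }, // non-finite: read everything
    { rowLimit: 2, expected: allRows.slice(0, 2) },
    { rowLimit: 0, expected: [] },
  ])('read columns when rowLimit is $rowLimit', async ({ rowLimit, expected }) => {
    const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
    const asyncBuffer = await asyncBufferFromFile(testFile)
    const arrayBuffer = await asyncBuffer.slice(0)
    const metadata = parquetMetadata(arrayBuffer)

    const column = metadata.row_groups[0].columns[0]
    if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
    const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
    const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
    const schemaPath = getSchemaPath(metadata.schema, column.meta_data.path_in_schema ?? [])
    const reader = { view: new DataView(columnArrayBuffer), offset: 0 }

    const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
    expect(result).toEqual(expected)
  })
})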