Mirror of https://github.com/asadbek064/hyparquet.git, synced 2026-01-09 04:26:38 +00:00
Enable readColumn to read all rows (#53)
* Enable readColumn to read all rows
* Refactor readColumn to use hasRowLimit
* Simplify hasRowLimit condition
* Check less common condition first
* add readColumn test files
* implement readColumn tests for undefined rowLimits
* remove unused variable
* return early if no metadata is present
* address tsc warnings
* add comparison
* clarify that undefined is valid for rowLimit
* remove test files
* verify edge case works when rowLimit is undefined
* add test cases for readColumn

---------

Co-authored-by: Brian Park <park-brian@users.noreply.github.com>
parent 7ce11ad844
commit 9992316748
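In short, readColumn(reader, rowLimit, columnMetadata, schemaPath, options) now accepts undefined for rowLimit and reads the whole column in that case (and, per the guard, Infinity or negative values behave the same way). A self-contained sketch of the hasRowLimit predicate introduced in the diff below (illustrative only, not the library code):

// Mirrors the guard added in this commit:
//   const hasRowLimit = rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)
function hasRowLimit(rowLimit) {
  return rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)
}

console.log(hasRowLimit(undefined)) // false -> no limit, read all rows
console.log(hasRowLimit(Infinity))  // false -> no limit, read all rows
console.log(hasRowLimit(NaN))       // false -> NaN fails both numeric checks
console.log(hasRowLimit(-1))        // false -> negative limits also mean "no limit"
console.log(hasRowLimit(0))         // true  -> read zero rows
console.log(hasRowLimit(2))         // true  -> read exactly two rows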
src/column.js
@@ -10,7 +10,7 @@ import { concat } from './utils.js'
  * Parse column data from a buffer.
  *
  * @param {DataReader} reader
- * @param {number} rowLimit maximum number of rows to read
+ * @param {number | undefined} rowLimit maximum number of rows to read (undefined reads all rows)
  * @param {ColumnMetaData} columnMetadata column metadata
  * @param {SchemaTree[]} schemaPath schema path for the column
  * @param {ParquetReadOptions} options read options
@@ -22,8 +22,10 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
   let dictionary = undefined
   /** @type {any[]} */
   const rowData = []
+  const hasRowLimit = rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)
+
-  while (rowData.length < rowLimit) {
+  while (!hasRowLimit || rowData.length < rowLimit) {
     if (reader.offset >= reader.view.byteLength - 1) break // end of reader
     // parse column header
     const header = parquetHeader(reader)
     // assert(header.compressed_page_size !== undefined)
@@ -93,11 +95,13 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
     }
     reader.offset += header.compressed_page_size
   }
-  if (rowData.length < rowLimit) {
-    throw new Error(`parquet row data length ${rowData.length} does not match row group limit ${rowLimit}`)
-  }
-  if (rowData.length > rowLimit) {
-    rowData.length = rowLimit // truncate to row limit
+  if (hasRowLimit) {
+    if (rowData.length < rowLimit) {
+      throw new Error(`parquet row data length ${rowData.length} does not match row group limit ${rowLimit}`)
+    }
+    if (rowData.length > rowLimit) {
+      rowData.length = rowLimit // truncate to row limit
+    }
   }
   return rowData
 }
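When a finite, non-negative rowLimit is supplied, the tail of the function (last hunk above) enforces it in both directions: decoding fewer rows than requested throws, and decoding more truncates. A standalone toy illustration of that post-loop check, reusing the names from the diff (not the library code):

// Pretend the page-reading loop decoded three rows against a limit of two.
const rowLimit = 2
const rowData = ['a', 'b', 'c']
if (rowData.length < rowLimit) {
  throw new Error(`parquet row data length ${rowData.length} does not match row group limit ${rowLimit}`)
}
if (rowData.length > rowLimit) {
  rowData.length = rowLimit // truncate to row limit
}
console.log(rowData) // ['a', 'b']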
test/column.test.js (new file, 83 lines)
@@ -0,0 +1,83 @@
import { compressors } from 'hyparquet-compressors'
import { describe, expect, it } from 'vitest'
import { parquetMetadata } from '../src/hyparquet.js'
import { getSchemaPath } from '../src/schema.js'
import { getColumnRange, readColumn } from '../src/column.js'
import { asyncBufferFromFile } from '../src/utils.js'

describe('readColumn', () => {
  it('read columns when rowLimit is undefined', async () => {
    const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
    const asyncBuffer = await asyncBufferFromFile(testFile)
    const arrayBuffer = await asyncBuffer.slice(0)
    const metadata = parquetMetadata(arrayBuffer)

    const column = metadata.row_groups[0].columns[0]
    if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
    const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
    const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
    const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
    const reader = { view: new DataView(columnArrayBuffer), offset: 0 }

    const rowLimit = undefined
    const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
    const expected = [null, 1, -2, NaN, 0, -1, -0, 2]
    expect(result).toEqual(expected)
  })

  it('read columns when rowLimit is Infinity', async () => {
    const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
    const asyncBuffer = await asyncBufferFromFile(testFile)
    const arrayBuffer = await asyncBuffer.slice(0)
    const metadata = parquetMetadata(arrayBuffer)

    const column = metadata.row_groups[0].columns[0]
    if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
    const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
    const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
    const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
    const reader = { view: new DataView(columnArrayBuffer), offset: 0 }

    const rowLimit = Infinity
    const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
    const expected = [null, 1, -2, NaN, 0, -1, -0, 2]
    expect(result).toEqual(expected)
  })

  it('read columns when rowLimit is defined', async () => {
    const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
    const asyncBuffer = await asyncBufferFromFile(testFile)
    const arrayBuffer = await asyncBuffer.slice(0)
    const metadata = parquetMetadata(arrayBuffer)

    const column = metadata.row_groups[0].columns[0]
    if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
    const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
    const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
    const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
    const reader = { view: new DataView(columnArrayBuffer), offset: 0 }

    const rowLimit = 2
    const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
    expect(result.length).toBe(rowLimit)
  })

  it('read columns when rowLimit is 0', async () => {
    const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
    const asyncBuffer = await asyncBufferFromFile(testFile)
    const arrayBuffer = await asyncBuffer.slice(0)
    const metadata = parquetMetadata(arrayBuffer)

    const column = metadata.row_groups[0].columns[0]
    if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
    const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
    const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
    const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
    const reader = { view: new DataView(columnArrayBuffer), offset: 0 }

    const rowLimit = 0
    const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
    expect(result.length).toBe(rowLimit)
  })

})
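Assuming the repository's existing vitest setup (the suite imports vitest and hyparquet-compressors directly), the new tests can be run in isolation with npx vitest run test/column.test.js. The expected arrays exercise float16 edge cases from the fixture: null, NaN, and negative zero.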