From 9992316748a372d47644537c185553db413fadb0 Mon Sep 17 00:00:00 2001
From: Brian Park <12477449+park-brian@users.noreply.github.com>
Date: Thu, 19 Dec 2024 21:08:22 -0500
Subject: [PATCH] Enable readColumn to read all rows (#53)

* Enable readColumn to read all rows

* Refactor readColumn to use hasRowLimit

* Simplify hasRowLimit condition

* Check less common condition first

* add readColumn test files

* implement readColumn tests for undefined rowLimits

* remove unused variable

* return early if no metadata is present

* address tsc warnings

* add comparison

* clarify that undefined is valid for rowLimit

* remove test files

* verify edge case works when rowLimit is undefined

* add test cases for readColumn

---------

Co-authored-by: Brian Park
---
 src/column.js       | 18 ++++++----
 test/column.test.js | 83 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 94 insertions(+), 7 deletions(-)
 create mode 100644 test/column.test.js

diff --git a/src/column.js b/src/column.js
index a72967d..9b76a68 100644
--- a/src/column.js
+++ b/src/column.js
@@ -10,7 +10,7 @@ import { concat } from './utils.js'
  * Parse column data from a buffer.
  *
  * @param {DataReader} reader
- * @param {number} rowLimit maximum number of rows to read
+ * @param {number | undefined} rowLimit maximum number of rows to read (undefined reads all rows)
  * @param {ColumnMetaData} columnMetadata column metadata
  * @param {SchemaTree[]} schemaPath schema path for the column
  * @param {ParquetReadOptions} options read options
@@ -22,8 +22,10 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
   let dictionary = undefined
   /** @type {any[]} */
   const rowData = []
+  const hasRowLimit = rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)
 
-  while (rowData.length < rowLimit) {
+  while (!hasRowLimit || rowData.length < rowLimit) {
+    if (reader.offset >= reader.view.byteLength - 1) break // end of reader
     // parse column header
     const header = parquetHeader(reader)
     // assert(header.compressed_page_size !== undefined)
@@ -93,11 +95,13 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
     }
     reader.offset += header.compressed_page_size
   }
-  if (rowData.length < rowLimit) {
-    throw new Error(`parquet row data length ${rowData.length} does not match row group limit ${rowLimit}}`)
-  }
-  if (rowData.length > rowLimit) {
-    rowData.length = rowLimit // truncate to row limit
+  if (hasRowLimit) {
+    if (rowData.length < rowLimit) {
+      throw new Error(`parquet row data length ${rowData.length} does not match row group limit ${rowLimit}`)
+    }
+    if (rowData.length > rowLimit) {
+      rowData.length = rowLimit // truncate to row limit
+    }
   }
   return rowData
 }
diff --git a/test/column.test.js b/test/column.test.js
new file mode 100644
index 0000000..7e5e20b
--- /dev/null
+++ b/test/column.test.js
@@ -0,0 +1,83 @@
+import { compressors } from 'hyparquet-compressors'
+import { describe, expect, it } from 'vitest'
+import { parquetMetadata } from '../src/hyparquet.js'
+import { getSchemaPath } from '../src/schema.js'
+import { getColumnRange, readColumn } from '../src/column.js'
+import { asyncBufferFromFile } from '../src/utils.js'
+
+describe('readColumn', () => {
+  it('read columns when rowLimit is undefined', async () => {
+    const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
+    const asyncBuffer = await asyncBufferFromFile(testFile)
+    const arrayBuffer = await asyncBuffer.slice(0)
+    const metadata = parquetMetadata(arrayBuffer)
+
+    const column = metadata.row_groups[0].columns[0]
+    if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
+    const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
+    const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
+    const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
+    const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
+
+    const rowLimit = undefined
+    const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
+    const expected = [null, 1, -2, NaN, 0, -1, -0, 2]
+    expect(result).toEqual(expected)
+  })
+
+  it('read columns when rowLimit is Infinity', async () => {
+    const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
+    const asyncBuffer = await asyncBufferFromFile(testFile)
+    const arrayBuffer = await asyncBuffer.slice(0)
+    const metadata = parquetMetadata(arrayBuffer)
+
+    const column = metadata.row_groups[0].columns[0]
+    if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
+    const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
+    const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
+    const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
+    const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
+
+    const rowLimit = Infinity
+    const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
+    const expected = [null, 1, -2, NaN, 0, -1, -0, 2]
+    expect(result).toEqual(expected)
+  })
+
+  it('read columns when rowLimit is defined', async () => {
+    const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
+    const asyncBuffer = await asyncBufferFromFile(testFile)
+    const arrayBuffer = await asyncBuffer.slice(0)
+    const metadata = parquetMetadata(arrayBuffer)
+
+    const column = metadata.row_groups[0].columns[0]
+    if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
+    const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
+    const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
+    const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
+    const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
+
+    const rowLimit = 2
+    const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
+    expect(result.length).toBe(rowLimit)
+  })
+
+  it('read columns when rowLimit is 0', async () => {
+    const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
+    const asyncBuffer = await asyncBufferFromFile(testFile)
+    const arrayBuffer = await asyncBuffer.slice(0)
+    const metadata = parquetMetadata(arrayBuffer)
+
+    const column = metadata.row_groups[0].columns[0]
+    if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
+    const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
+    const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
+    const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
+    const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
+
+    const rowLimit = 0
+    const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
+    expect(result.length).toBe(rowLimit)
+  })
+
+})
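
Note (editorial, not part of the patch): the sketch below restates the
hasRowLimit expression from src/column.js as a standalone function to show
which rowLimit values actually bound the read. The function wrapper is
hypothetical; in the patch the expression is a const evaluated once per call.

    // mirrors the patch: rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)
    const hasRowLimit = rowLimit =>
      rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)

    hasRowLimit(undefined) // false: read every row in the column chunk
    hasRowLimit(Infinity)  // false: isFinite rejects it, so all rows are read
    hasRowLimit(NaN)       // false: NaN >= 0 is false, so all rows are read
    hasRowLimit(-1)        // false: negative limits also read all rows
    hasRowLimit(0)         // true: rowData is truncated to zero rows
    hasRowLimit(2)         // true: the loop stops once two rows are buffered,
                           //       and any overshoot is truncated to the limit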