diff --git a/src/column.js b/src/column.js
index 68169f9..8ab09a8 100644
--- a/src/column.js
+++ b/src/column.js
@@ -10,12 +10,11 @@ import { deserializeTCompactProtocol } from './thrift.js'
  * Parse column data from a buffer.
  *
  * @param {DataReader} reader
- * @param {number} rowGroupStart skip this many rows in the row group
- * @param {number} rowGroupEnd read up to this index in the row group (Infinity reads all rows)
+ * @param {RowGroupSelect} rowGroupSelect row group selection
  * @param {ColumnDecoder} columnDecoder column decoder params
  * @returns {DecodedArray[]}
  */
-export function readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder) {
+export function readColumn(reader, { selectStart, selectEnd }, columnDecoder) {
   const { element, utf8 } = columnDecoder
   /** @type {DecodedArray[]} */
   const chunks = []
@@ -23,7 +22,7 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder) {
   let dictionary = undefined
   let rowCount = 0
 
-  while (rowCount < rowGroupEnd) {
+  while (rowCount < selectEnd) {
     if (reader.offset >= reader.view.byteLength - 1) break // end of reader
 
     // read page header
@@ -35,7 +34,7 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder) {
     } else {
       const lastChunk = chunks.at(-1)
       const lastChunkLength = lastChunk?.length || 0
-      const values = readPage(reader, header, columnDecoder, dictionary, lastChunk, rowGroupStart - rowCount)
+      const values = readPage(reader, header, columnDecoder, dictionary, lastChunk, selectStart - rowCount)
       if (lastChunk === values) {
         // continued from previous page
         rowCount += values.length - lastChunkLength
@@ -45,15 +44,11 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder) {
       }
     }
   }
-  if (isFinite(rowGroupEnd)) {
-    if (rowCount < rowGroupEnd) {
-      throw new Error(`parquet row data length ${rowCount} does not match row group limit ${rowGroupEnd}}`)
-    }
-    if (rowCount > rowGroupEnd) {
-      // truncate last chunk to row limit
-      const lastChunk = chunks[chunks.length - 1]
-      chunks[chunks.length - 1] = lastChunk.slice(0, rowGroupEnd - (rowCount - lastChunk.length))
-    }
+  // assert(rowCount >= selectEnd)
+  if (rowCount > selectEnd) {
+    // truncate last chunk to row limit
+    const lastChunk = chunks[chunks.length - 1]
+    chunks[chunks.length - 1] = lastChunk.slice(0, selectEnd - (rowCount - lastChunk.length))
   }
   return chunks
 }
@@ -150,7 +145,7 @@ export function getColumnRange({ dictionary_page_offset, data_page_offset, total
 /**
  * Read parquet header from a buffer.
  *
- * @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ColumnDecoder} from '../src/types.d.ts'
+ * @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ColumnDecoder, RowGroupSelect} from '../src/types.d.ts'
  * @param {DataReader} reader
  * @returns {PageHeader}
  */
diff --git a/src/read.js b/src/read.js
index 0505b3e..3f691cf 100644
--- a/src/read.js
+++ b/src/read.js
@@ -67,8 +67,11 @@ export async function readRowGroup(options, rowGroup, groupStart) {
   const { file, metadata, columns, rowStart, rowEnd } = options
   if (!metadata) throw new Error('parquet metadata not found')
   const numRows = Number(rowGroup.num_rows)
-  const rowGroupStart = Math.max((rowStart || 0) - groupStart, 0)
-  const rowGroupEnd = rowEnd === undefined ? numRows : Math.min(rowEnd - groupStart, numRows)
+  // index within the group to start and stop reading:
+  const selectStart = Math.max((rowStart || 0) - groupStart, 0)
+  const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, numRows)
+  /** @type {RowGroupSelect} */
+  const rowGroupSelect = { groupStart, selectStart, selectEnd, numRows }
 
   // loop through metadata to find min/max bytes to read
   let [groupStartByte, groupEndByte] = [file.byteLength, 0]
@@ -143,7 +146,7 @@ export async function readRowGroup(options, rowGroup, groupStart) {
       compressors: options.compressors,
       utf8: options.utf8,
     }
-    const columnData = readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder)
+    const columnData = readColumn(reader, rowGroupSelect, columnDecoder)
 
     /** @type {DecodedArray[] | undefined} */
     let chunks = columnData
@@ -168,13 +171,15 @@ export async function readRowGroup(options, rowGroup, groupStart) {
     // do not emit column data until structs are fully parsed
     if (!chunks) return
     // notify caller of column data
-    for (const chunk of chunks) {
-      options.onChunk?.({
-        columnName,
-        columnData: chunk,
-        rowStart: groupStart,
-        rowEnd: groupStart + chunk.length,
-      })
+    if (options.onChunk) {
+      for (const chunk of chunks) {
+        options.onChunk({
+          columnName,
+          columnData: chunk,
+          rowStart: groupStart,
+          rowEnd: groupStart + chunk.length,
+        })
+      }
     }
   }))
 }
@@ -188,8 +193,8 @@ export async function readRowGroup(options, rowGroup, groupStart) {
     .map(name => includedColumnNames.includes(name) ? flatten(subcolumnData.get(name)) : undefined)
 
   // transpose columns into rows
-  const groupData = new Array(rowGroupEnd)
-  for (let row = rowGroupStart; row < rowGroupEnd; row++) {
+  const groupData = new Array(selectEnd)
+  for (let row = selectStart; row < selectEnd; row++) {
     if (options.rowFormat === 'object') {
       // return each row as an object
       /** @type {Record<string, any>} */
@@ -228,7 +233,7 @@ function flatten(chunks) {
 /**
  * Return a list of sub-columns needed to construct a top-level column.
  *
- * @import {DecodedArray, ParquetReadOptions, RowGroup, SchemaTree} from '../src/types.d.ts'
+ * @import {DecodedArray, ParquetReadOptions, RowGroup, RowGroupSelect, SchemaTree} from '../src/types.d.ts'
  * @param {SchemaTree} schema
  * @param {string[]} output
  * @returns {string[]}
diff --git a/src/types.d.ts b/src/types.d.ts
index 23b1339..961435b 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -386,3 +386,10 @@ export interface ColumnDecoder {
   compressors?: Compressors
   utf8?: boolean
 }
+
+export interface RowGroupSelect {
+  groupStart: number // row index of the first row in the group
+  selectStart: number // row index in the group to start reading
+  selectEnd: number // row index in the group to stop reading
+  numRows: number // total number of rows in the group
+}
diff --git a/test/column.test.js b/test/column.test.js
index b6e4ed3..34d85a6 100644
--- a/test/column.test.js
+++ b/test/column.test.js
@@ -6,12 +6,15 @@ import { asyncBufferFromFile } from '../src/utils.js'
 
 const values = [null, 1, -2, NaN, 0, -1, -0, 2]
 
+/**
+ * @import {RowGroupSelect} from '../src/types.d.ts'
+ */
 describe('readColumn', () => {
   it.for([
-    { rowGroupEnd: Infinity, expected: [values] },
-    { rowGroupEnd: 2, expected: [values.slice(0, 2)] },
-    { rowGroupEnd: 0, expected: [] },
-  ])('readColumn with rowGroupEnd %p', async ({ rowGroupEnd, expected }) => {
+    { selectEnd: Infinity, expected: [values] },
+    { selectEnd: 2, expected: [values.slice(0, 2)] },
+    { selectEnd: 0, expected: [] },
+  ])('readColumn with selectEnd %p', async ({ selectEnd, expected }) => {
     const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
     const file = await asyncBufferFromFile(testFile)
     const arrayBuffer = await file.slice(0)
@@ -30,8 +33,14 @@ describe('readColumn', () => {
       schemaPath,
       codec: column.meta_data.codec,
     }
+    const rowGroupSelect = {
+      groupStart: 0,
+      selectStart: 0,
+      selectEnd,
+      numRows: values.length,
+    }
 
-    const result = readColumn(reader, 0, rowGroupEnd, columnDecoder)
+    const result = readColumn(reader, rowGroupSelect, columnDecoder)
     expect(result).toEqual(expected)
   })
 
@@ -54,8 +63,14 @@ describe('readColumn', () => {
       schemaPath,
       codec: column.meta_data.codec,
     }
+    const rowGroupSelect = {
+      groupStart: 0,
+      selectStart: 0,
+      selectEnd: Infinity,
+      numRows: Number(column.meta_data.num_values),
+    }
 
-    const columnData = readColumn(reader, 0, Infinity, columnDecoder)
+    const columnData = readColumn(reader, rowGroupSelect, columnDecoder)
     expect(columnData[0]).toBeInstanceOf(Int32Array)
   })
 })
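
Usage sketch (not part of the diff): `readColumn` now takes a single `RowGroupSelect` in place of the separate `rowGroupStart`/`rowGroupEnd` arguments, and `readRowGroup` clamps the caller's global row range to each group. Below is a minimal illustration of that clamping math, reusing the exact expressions from `readRowGroup` above; the `planRowGroupSelects` helper and the `rowGroups` array shape are hypothetical, for illustration only.

```js
/**
 * Hypothetical helper: map a global [rowStart, rowEnd) row range onto
 * per-group RowGroupSelect objects, using the same clamping math as
 * readRowGroup in this diff.
 *
 * @param {{ num_rows: number | bigint }[]} rowGroups row groups in file order
 * @param {number} [rowStart] inclusive global start row
 * @param {number} [rowEnd] exclusive global end row
 */
function planRowGroupSelects(rowGroups, rowStart, rowEnd) {
  const selects = []
  let groupStart = 0 // global row index of the first row in this group
  for (const rowGroup of rowGroups) {
    const numRows = Number(rowGroup.num_rows)
    // indexes within the group to start and stop reading
    const selectStart = Math.max((rowStart || 0) - groupStart, 0)
    const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, numRows)
    // skip groups whose selection is empty
    if (selectStart < selectEnd) {
      selects.push({ groupStart, selectStart, selectEnd, numRows })
    }
    groupStart += numRows
  }
  return selects
}
```

For example, with two 100-row groups, `planRowGroupSelects(rowGroups, 150, 180)` would return only a selection for the second group, `{ groupStart: 100, selectStart: 50, selectEnd: 80, numRows: 100 }`. Omitting `rowEnd` leaves `selectEnd` clamped to `numRows`, matching the old `Infinity` behavior.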