diff --git a/src/assemble.js b/src/assemble.js index d1865cb..c9df73d 100644 --- a/src/assemble.js +++ b/src/assemble.js @@ -8,21 +8,19 @@ import { isListLike, isMapLike } from './schema.js' * * @typedef {import('./types.d.ts').DecodedArray} DecodedArray * @typedef {import('./types.d.ts').FieldRepetitionType} FieldRepetitionType + * @param {any[]} output * @param {number[] | undefined} definitionLevels * @param {number[]} repetitionLevels * @param {DecodedArray} values * @param {(FieldRepetitionType | undefined)[]} repetitionPath * @param {number} maxDefinitionLevel definition level that corresponds to non-null - * @param {number} maxRepetitionLevel repetition level that corresponds to a new row - * @returns {DecodedArray} array of values + * @returns {any[]} */ export function assembleLists( - definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel, maxRepetitionLevel + output, definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel ) { const n = definitionLevels?.length || repetitionLevels.length let valueIndex = 0 - /** @type {any[]} */ - const output = [] // Track state of nested structures const containerStack = [output] @@ -31,6 +29,18 @@ export function assembleLists( let currentDefLevel = 0 // list depth let currentRepLevel = 0 + if (repetitionLevels[0]) { + // continue previous row + while (currentDepth < repetitionPath.length - 2 && currentRepLevel < repetitionLevels[0]) { + // go into last list + currentContainer = currentContainer.at(-1) + containerStack.push(currentContainer) + currentDepth++ + if (repetitionPath[currentDepth] !== 'REQUIRED') currentDefLevel++ + if (repetitionPath[currentDepth] === 'REPEATED') currentRepLevel++ + } + } + for (let i = 0; i < n; i++) { // assert(currentDefLevel === containerStack.length - 1) const def = definitionLevels?.length ? 
definitionLevels[i] : maxDefinitionLevel @@ -77,10 +87,7 @@ export function assembleLists( } // Handle edge cases for empty inputs or single-level data - if (output.length === 0) { - if (values.length > 0 && maxRepetitionLevel === 0) { - return values // flat list - } + if (!output.length) { // return max definition level of nested lists for (let i = 0; i < maxDefinitionLevel; i++) { /** @type {any[]} */ diff --git a/src/column.js b/src/column.js index 7b2ae17..0dee558 100644 --- a/src/column.js +++ b/src/column.js @@ -3,7 +3,7 @@ import { convertWithDictionary } from './convert.js' import { decompressPage, readDataPage, readDictionaryPage } from './datapage.js' import { readDataPageV2 } from './datapageV2.js' import { parquetHeader } from './header.js' -import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' +import { getMaxDefinitionLevel } from './schema.js' import { concat } from './utils.js' /** @@ -12,21 +12,20 @@ import { concat } from './utils.js' * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData * @typedef {import('./types.js').DecodedArray} DecodedArray * @param {import('./types.js').DataReader} reader - * @param {import('./types.js').RowGroup} rowGroup row group metadata + * @param {number} rowLimit maximum number of rows to read * @param {ColumnMetaData} columnMetadata column metadata * @param {import('./types.js').SchemaTree[]} schemaPath schema path for the column * @param {import('./hyparquet.js').ParquetReadOptions} options read options * @returns {any[]} array of values */ -export function readColumn(reader, rowGroup, columnMetadata, schemaPath, { compressors, utf8 }) { +export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compressors, utf8 }) { const { element } = schemaPath[schemaPath.length - 1] /** @type {DecodedArray | undefined} */ let dictionary = undefined - let seen = 0 /** @type {any[]} */ const rowData = [] - while (seen < rowGroup.num_rows) { + while (rowData.length < rowLimit) { // 
parse column header const header = parquetHeader(reader) // assert(header.compressed_page_size !== undefined) @@ -45,17 +44,15 @@ export function readColumn(reader, rowGroup, columnMetadata, schemaPath, { compr const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors) const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata) - seen += daph.num_values - // assert(!daph.statistics || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length)) + // assert(!daph.statistics?.null_count || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length)) // convert types, dereference dictionary, and assemble lists values = convertWithDictionary(dataPage, dictionary, element, daph.encoding, utf8) if (repetitionLevels.length || definitionLevels?.length) { const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) - const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) const repetitionPath = schemaPath.map(({ element }) => element.repetition_type) - values = assembleLists( - definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel, maxRepetitionLevel + assembleLists( + rowData, definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel ) } else { // wrap nested flat data by depth @@ -64,9 +61,8 @@ export function readColumn(reader, rowGroup, columnMetadata, schemaPath, { compr values = Array.from(values, e => [e]) } } + concat(rowData, values) } - // assert(BigInt(values.length) === rowGroup.num_rows) - concat(rowData, values) } else if (header.type === 'DATA_PAGE_V2') { const daph2 = header.data_page_header_v2 if (!daph2) throw new Error('parquet data page header v2 is undefined') @@ -74,19 +70,18 @@ export function readColumn(reader, rowGroup, columnMetadata, schemaPath, { compr const { definitionLevels, repetitionLevels, dataPage } = readDataPageV2( compressedBytes, header, schemaPath, 
columnMetadata, compressors ) - seen += daph2.num_values // convert types, dereference dictionary, and assemble lists values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8) if (repetitionLevels.length || definitionLevels?.length) { const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) - const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) const repetitionPath = schemaPath.map(({ element }) => element.repetition_type) - values = assembleLists( - definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel, maxRepetitionLevel + assembleLists( + rowData, definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel ) + } else { + concat(rowData, values) } - concat(rowData, values) } else if (header.type === 'DICTIONARY_PAGE') { const diph = header.dictionary_page_header if (!diph) throw new Error('parquet dictionary page header is undefined') @@ -100,8 +95,8 @@ export function readColumn(reader, rowGroup, columnMetadata, schemaPath, { compr } reader.offset += header.compressed_page_size } - if (rowData.length !== Number(rowGroup.num_rows)) { - throw new Error(`parquet row data length ${rowData.length} does not match row group length ${rowGroup.num_rows}}`) + if (rowData.length < rowLimit) { + throw new Error(`parquet row data length ${rowData.length} does not match row group limit ${rowLimit}`) } return rowData } diff --git a/src/datapage.js b/src/datapage.js index 1597f0f..71f122c 100644 --- a/src/datapage.js +++ b/src/datapage.js @@ -25,7 +25,9 @@ export function readDataPage(bytes, daph, schemaPath, { type }) { // repetition and definition levels const repetitionLevels = readRepetitionLevels(reader, daph, schemaPath) + // assert(!repetitionLevels.length || repetitionLevels.length === daph.num_values) const { definitionLevels, numNulls } = readDefinitionLevels(reader, daph, schemaPath) + // assert(!definitionLevels.length || definitionLevels.length === daph.num_values) // read values 
based on encoding const nValues = daph.num_values - numNulls diff --git a/src/read.js b/src/read.js index aa5baf5..427338a 100644 --- a/src/read.js +++ b/src/read.js @@ -51,7 +51,8 @@ export async function parquetRead(options) { // if row group overlaps with row range, read it if (groupStart + groupRows >= rowStart && (rowEnd === undefined || groupStart < rowEnd)) { // read row group - const groupData = await readRowGroup(options, rowGroup, groupStart) + const rowLimit = rowEnd && rowEnd - groupStart + const groupData = await readRowGroup(options, rowGroup, groupStart, rowLimit) if (onComplete) { // filter to rows in range const start = Math.max(rowStart - groupStart, 0) @@ -78,11 +79,13 @@ export async function parquetRead(options) { * @param {Compressors} [options.compressors] * @param {RowGroup} rowGroup row group to read * @param {number} groupStart row index of the first row in the group + * @param {number} [rowLimit] max rows to read from this group * @returns {Promise} resolves to row data */ -async function readRowGroup(options, rowGroup, groupStart) { +export async function readRowGroup(options, rowGroup, groupStart, rowLimit) { const { file, metadata, columns } = options if (!metadata) throw new Error('parquet metadata not found') + if (rowLimit === undefined || rowLimit > rowGroup.num_rows) rowLimit = Number(rowGroup.num_rows) // loop through metadata to find min/max bytes to read let [groupStartByte, groupEndByte] = [file.byteLength, 0] @@ -151,7 +154,7 @@ async function readRowGroup(options, rowGroup, groupStart) { const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema) const reader = { view: new DataView(arrayBuffer), offset: bufferOffset } /** @type {any[] | undefined} */ - let columnData = readColumn(reader, rowGroup, columnMetadata, schemaPath, options) + let columnData = readColumn(reader, rowLimit, columnMetadata, schemaPath, options) // assert(columnData.length === Number(rowGroup.num_rows) // TODO: fast path for 
non-nested columns diff --git a/test/assemble.test.js b/test/assemble.test.js index aba4f96..0817766 100644 --- a/test/assemble.test.js +++ b/test/assemble.test.js @@ -13,22 +13,10 @@ describe('assembleLists', () => { /** @type {FieldRepetitionType[]} */ const nestedOptional = [undefined, 'OPTIONAL', 'REPEATED', 'OPTIONAL', 'REPEATED', 'OPTIONAL'] - it('should not change flat objects', () => { - const values = ['a', 'b'] - const result = assembleLists([], [], values, [undefined, 'REQUIRED'], 0, 0) - expect(result).toEqual(['a', 'b']) - }) - - it('should not change nested required objects', () => { - const values = ['a', 'b'] - const result = assembleLists([], [], values, [undefined, 'REQUIRED', 'REQUIRED'], 0, 0) - expect(result).toEqual(['a', 'b']) - }) - it('should assemble objects with non-null values', () => { const repetitionLevels = [0, 1] const values = ['a', 'b'] - const result = assembleLists([], repetitionLevels, values, nonnullable, 1, 1) + const result = assembleLists([], [], repetitionLevels, values, nonnullable, 1) expect(result).toEqual([['a', 'b']]) }) @@ -36,26 +24,26 @@ describe('assembleLists', () => { const definitionLevels = [3, 0, 3] const repetitionLevels = [0, 1, 1] const values = ['a', 'c'] - const result = assembleLists(definitionLevels, repetitionLevels, values, nullable, 3, 1) + const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable, 3) expect(result).toEqual([[['a', null, 'c']]]) }) it('should handle empty lists', () => { - expect(assembleLists([], [], [], nonnullable, 0, 0)).toEqual([]) - expect(assembleLists([], [], [], nonnullable, 1, 0)).toEqual([[]]) + expect(assembleLists([], [], [], [], nonnullable, 0)).toEqual([]) + expect(assembleLists([], [], [], [], nonnullable, 1)).toEqual([[]]) }) it('should handle multiple lists', () => { const repetitionLevels = [0, 0] const values = [22, 33] - const result = assembleLists([], repetitionLevels, values, nonnullable, 1, 1) + const result = assembleLists([], 
[], repetitionLevels, values, nonnullable, 1) expect(result).toEqual([[22], [33]]) }) it('should handle multiple lists (6)', () => { const repetitionLevels = [0, 1, 1, 0, 1, 1] const values = [1, 2, 3, 4, 5, 6] - const result = assembleLists([], repetitionLevels, values, nonnullable, 1, 1) + const result = assembleLists([], [], repetitionLevels, values, nonnullable, 1) expect(result).toEqual([[1, 2, 3], [4, 5, 6]]) }) @@ -63,23 +51,24 @@ describe('assembleLists', () => { const definitionLevels = [3, 3, 0, 3, 3] const repetitionLevels = [0, 1, 0, 0, 1] const values = ['a', 'b', 'd', 'e'] - const result = assembleLists(definitionLevels, repetitionLevels, values, nullable, 3, 1) + const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable, 3) expect(result).toEqual([[['a', 'b']], [], [['d', 'e']]]) }) - // it('should handle continuing a row from the previous page', () => { - // const definitionLevels = [3, 3, 3, 1] - // const repetitionLevels = [1, 0, 1, 0] - // const values = ['a', 'b', 'c', 'd'] - // const result = assembleObjects(definitionLevels, repetitionLevels, values, nullable, 3, 1) - // expect(result).toEqual([['b', 'c'], [undefined]]) - // }) + it('should handle continuing a row from the previous page', () => { + const definitionLevels = [3, 3, 3, 1] + const repetitionLevels = [1, 0, 1, 0] + const values = ['b', 'c', 'd', 'e'] + const prev = [[['a']]] + const result = assembleLists(prev, definitionLevels, repetitionLevels, values, nullable, 3) + expect(result).toEqual([[['a', 'b']], [['c', 'd']], [[]]]) + }) it('should handle nested arrays', () => { // from nullable.impala.parquet const repetitionLevels = [0, 2, 1, 2] const values = [1, 2, 3, 4] - const result = assembleLists([], repetitionLevels, values, nestedRequired, 2, 2) + const result = assembleLists([], [], repetitionLevels, values, nestedRequired, 2) expect(result).toEqual([[[1, 2], [3, 4]]]) }) @@ -90,7 +79,7 @@ describe('assembleLists', () => { const values = ['k1', 
'k2', 'k1', 'k2', 'k1', 'k3'] /** @type {FieldRepetitionType[]} */ const repetitionPath = ['REQUIRED', 'OPTIONAL', 'REPEATED', 'REQUIRED'] // map key required - const result = assembleLists(definitionLevels, repetitionLevels, values, repetitionPath, 2, 1) + const result = assembleLists([], definitionLevels, repetitionLevels, values, repetitionPath, 2) expect(result).toEqual([ [['k1', 'k2']], [['k1', 'k2']], @@ -104,12 +93,12 @@ describe('assembleLists', () => { it('should handle empty lists with definition level', () => { // from nonnullable.impala.parquet - expect(assembleLists([0], [0], [], nonnullable, 1, 2)).toEqual([[]]) + expect(assembleLists([], [0], [0], [], nonnullable, 1)).toEqual([[]]) }) it('should handle nonnullable lists', () => { // from nonnullable.impala.parquet - expect(assembleLists([1], [0], [-1], nonnullable, 1, 2)).toEqual([[-1]]) + expect(assembleLists([], [1], [0], [-1], nonnullable, 1)).toEqual([[-1]]) }) it('should handle nullable int_array', () => { @@ -118,7 +107,7 @@ describe('assembleLists', () => { const definitionLevels = [3, 3, 3, 2, 3, 3, 2, 3, 2, 1, 0, 0] const repetitionLevels = [0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 0] const values = [1, 2, 3, 1, 2, 3] - const result = assembleLists(definitionLevels, repetitionLevels, values, nullable, 3, 1) + const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable, 3) expect(result).toEqual([ [[1, 2, 3]], [[null, 1, 2, null, 3, null]], @@ -134,7 +123,7 @@ describe('assembleLists', () => { const definitionLevels = [5, 5, 5, 5, 4, 5, 5, 4, 5, 4, 5, 3, 2, 2, 1, 0, 0, 2, 5, 5] const repetitionLevels = [0, 2, 1, 2, 0, 2, 2, 2, 1, 2, 2, 1, 1, 0, 0, 0, 0, 0, 1, 2] const values = [1, 2, 3, 4, 1, 2, 3, 4, 5, 6] - const result = assembleLists(definitionLevels, repetitionLevels, values, nestedOptional, 5, 2) + const result = assembleLists([], definitionLevels, repetitionLevels, values, nestedOptional, 5) expect(result).toEqual([ [[[[1, 2]], [[3, 4]]]], [[[[null, 1, 2, null]], [[3, 
null, 4]], [[]], []]], @@ -150,7 +139,7 @@ describe('assembleLists', () => { const definitionLevels = [3, 4, 3, 3] const repetitionLevels = [0, 1, 1, 1] const values = ['k1'] - const result = assembleLists(definitionLevels, repetitionLevels, values, nullable, 4, 2) + const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable, 4) expect(result).toEqual([[[null, 'k1', null, null]]]) }) @@ -158,7 +147,7 @@ describe('assembleLists', () => { const definitionLevels = [3, 5, 3, 3] const repetitionLevels = [0, 1, 1, 1] const values = ['v1'] - const result = assembleLists(definitionLevels, repetitionLevels, values, nullable, 5, 2) + const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable, 5) expect(result).toEqual([[[null, 'v1', null, null]]]) }) @@ -169,7 +158,7 @@ describe('assembleLists', () => { const values = [1, 2, 3, 1, 2, 3, 1, 2] /** @type {FieldRepetitionType[]} */ const repetitionPath = [undefined, 'OPTIONAL', 'REPEATED', 'REQUIRED'] - const result = assembleLists(definitionLevels, repetitionLevels, values, repetitionPath, 2, 1) + const result = assembleLists([], definitionLevels, repetitionLevels, values, repetitionPath, 2) expect(result).toEqual([[[1, 2, 3]], [], [], [[1, 2, 3]], [[1, 2]]]) }) @@ -179,21 +168,21 @@ describe('assembleLists', () => { const repetitionLevels = [0] /** @type {FieldRepetitionType[]} */ const repetitionPath = [undefined, 'REQUIRED', 'REQUIRED', 'REPEATED', 'REQUIRED', 'REQUIRED', 'REPEATED', 'REQUIRED'] - const result = assembleLists(definitionLevels, repetitionLevels, [], repetitionPath, 2, 2) + const result = assembleLists([], definitionLevels, repetitionLevels, [], repetitionPath, 2) expect(result).toEqual([[]]) }) it('should handle dzenilee', () => { const repetitionLevels = [0, 1, 1, 0, 1, 1] const values = ['a', 'b', 'c', 'd', 'e', 'f'] - const result = assembleLists([], repetitionLevels, values, nullable, 3, 1) + const result = assembleLists([], [], repetitionLevels, 
values, nullable, 3) expect(result).toEqual([[['a', 'b', 'c']], [['d', 'e', 'f']]]) }) it('handle complex.parquet with nested require', () => { const definitionLevels = [1, 1] const values = ['a', 'b'] - const result = assembleLists(definitionLevels, [], values, [undefined, 'OPTIONAL', 'REQUIRED', 'REQUIRED'], 1, 0) + const result = assembleLists([], definitionLevels, [], values, [undefined, 'OPTIONAL', 'REQUIRED', 'REQUIRED'], 1) expect(result).toEqual([['a'], ['b']]) }) }) diff --git a/test/read.test.js b/test/read.test.js index 3a749e5..6d1b472 100644 --- a/test/read.test.js +++ b/test/read.test.js @@ -21,9 +21,21 @@ describe('parquetRead', () => { const file = fileToAsyncBuffer('test/files/rowgroups.parquet') await parquetRead({ file, - rowEnd: 2, + rowStart: 2, + rowEnd: 4, onComplete: rows => { - expect(toJson(rows)).toEqual([[1], [2]]) + expect(toJson(rows)).toEqual([[3], [4]]) + }, + }) + }) + + it('filter by row overestimate', async () => { + const file = fileToAsyncBuffer('test/files/rowgroups.parquet') + await parquetRead({ + file, + rowEnd: 100, + onComplete: rows => { + expect(toJson(rows)).toEqual([[1], [2], [3], [4], [5], [6], [7], [8], [9], [10], [11], [12], [13], [14], [15]]) }, }) })