From d7f8d39de3dac77d46ac168167533783195eeb97 Mon Sep 17 00:00:00 2001
From: Kenny Daniel
Date: Mon, 10 Mar 2025 23:33:47 -0700
Subject: [PATCH] Return typed arrays in onChunk. Change readColumn to return
 DecodedArray[]. (#67)

Refactored readColumn to avoid `concat` operations, eliminating extra
copying and allocation.
---
 src/assemble.js     |  8 +++---
 src/column.js       | 50 ++++++++++++++++++-----------------
 src/read.js         | 65 ++++++++++++++++++++++++++++++++--------------
 test/column.test.js | 23 ++++++++++++++---
 test/read.test.js   |  6 ++---
 5 files changed, 100 insertions(+), 52 deletions(-)

diff --git a/src/assemble.js b/src/assemble.js
index b0376d5..ab42ba2 100644
--- a/src/assemble.js
+++ b/src/assemble.js
@@ -104,7 +104,7 @@ export function assembleLists(
  * https://github.com/apache/parquet-format/blob/apache-parquet-format-2.10.0/LogicalTypes.md#nested-types
  *
  * @import {SchemaTree} from '../src/types.d.ts'
- * @param {Map<string, any[]>} subcolumnData
+ * @param {Map<string, DecodedArray>} subcolumnData
  * @param {SchemaTree} schema top-level schema element
  * @param {number} [depth] depth of nested structure
  */
@@ -180,7 +180,7 @@ export function assembleNested(subcolumnData, schema, depth = 0) {
 }
 
 /**
- * @param {any[]} arr
+ * @param {DecodedArray} arr
  * @param {number} depth
  */
 function flattenAtDepth(arr, depth) {
@@ -194,8 +194,8 @@
 }
 
 /**
- * @param {any[]} keys
- * @param {any[]} values
+ * @param {DecodedArray} keys
+ * @param {DecodedArray} values
  * @param {number} depth
  * @returns {any[]}
  */
diff --git a/src/column.js b/src/column.js
index 9b76a68..61a97ae 100644
--- a/src/column.js
+++ b/src/column.js
@@ -4,7 +4,6 @@ import { convertWithDictionary } from './convert.js'
 import { decompressPage, readDataPage, readDataPageV2, readDictionaryPage } from './datapage.js'
 import { getMaxDefinitionLevel } from './schema.js'
 import { deserializeTCompactProtocol } from './thrift.js'
-import { concat } from './utils.js'
 
 /**
  * Parse column data from a buffer.
@@ -14,21 +13,20 @@ import { concat } from './utils.js'
  * @param {ColumnMetaData} columnMetadata column metadata
  * @param {SchemaTree[]} schemaPath schema path for the column
  * @param {ParquetReadOptions} options read options
- * @returns {any[]} array of values
+ * @returns {DecodedArray[]}
  */
 export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compressors, utf8 }) {
   const { element } = schemaPath[schemaPath.length - 1]
+  /** @type {DecodedArray[]} */
+  const chunks = []
   /** @type {DecodedArray | undefined} */
   let dictionary = undefined
-  /** @type {any[]} */
-  const rowData = []
   const hasRowLimit = rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)
+  let rowCount = 0
 
-  while (!hasRowLimit || rowData.length < rowLimit) {
+  while (!hasRowLimit || rowCount < rowLimit) {
     if (reader.offset >= reader.view.byteLength - 1) break // end of reader
 
-    // parse column header
-    const header = parquetHeader(reader)
-    // assert(header.compressed_page_size !== undefined)
+    const header = parquetHeader(reader) // column header
 
     // read compressed_page_size bytes starting at offset
     const compressedBytes = new Uint8Array(
@@ -36,8 +34,6 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
     )
 
     // parse page data by type
-    /** @type {DecodedArray} */
-    let values
     if (header.type === 'DATA_PAGE') {
       const daph = header.data_page_header
       if (!daph) throw new Error('parquet data page header is undefined')
@@ -47,13 +43,15 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
       // assert(!daph.statistics?.null_count || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
 
       // convert types, dereference dictionary, and assemble lists
-      values = convertWithDictionary(dataPage, dictionary, element, daph.encoding, utf8)
+      let values = convertWithDictionary(dataPage, dictionary, element, daph.encoding, utf8)
       if (repetitionLevels.length || definitionLevels?.length) {
         const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
         const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
-        assembleLists(
-          rowData, definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
+        const chunk = assembleLists(
+          [], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
         )
+        chunks.push(chunk)
+        rowCount += chunk.length
       } else {
         // wrap nested flat data by depth
        for (let i = 2; i < schemaPath.length; i++) {
@@ -61,7 +59,8 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
             values = Array.from(values, e => [e])
           }
         }
-        concat(rowData, values)
+        chunks.push(values)
+        rowCount += values.length
       }
     } else if (header.type === 'DATA_PAGE_V2') {
       const daph2 = header.data_page_header_v2
@@ -72,15 +71,18 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
       )
 
       // convert types, dereference dictionary, and assemble lists
-      values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8)
+      const values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8)
       if (repetitionLevels.length || definitionLevels?.length) {
         const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
         const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
-        assembleLists(
-          rowData, definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
+        const chunk = assembleLists(
+          [], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
         )
+        chunks.push(chunk)
+        rowCount += chunk.length
       } else {
-        concat(rowData, values)
+        chunks.push(values)
+        rowCount += values.length
       }
     } else if (header.type === 'DICTIONARY_PAGE') {
       const diph = header.dictionary_page_header
@@ -96,14 +98,16 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
     reader.offset += header.compressed_page_size
   }
   if (hasRowLimit) {
-    if (rowData.length < rowLimit) {
-      throw new Error(`parquet row data length ${rowData.length} does not match row group limit ${rowLimit}}`)
+    if (rowCount < rowLimit) {
+      throw new Error(`parquet row data length ${rowCount} does not match row group limit ${rowLimit}`)
     }
-    if (rowData.length > rowLimit) {
-      rowData.length = rowLimit // truncate to row limit
+    if (rowCount > rowLimit) {
+      // truncate last chunk to row limit
+      const lastChunk = chunks[chunks.length - 1]
+      chunks[chunks.length - 1] = lastChunk.slice(0, rowLimit - (rowCount - lastChunk.length))
     }
   }
-  return rowData
+  return chunks
 }
 
 /**
diff --git a/src/read.js b/src/read.js
index 78d68ed..8d2fb39 100644
--- a/src/read.js
+++ b/src/read.js
@@ -96,10 +96,11 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
   // Top-level columns to assemble
   const { children } = getSchemaPath(metadata.schema, [])[0]
   const subcolumnNames = new Map(children.map(child => [child.element.name, getSubcolumns(child)]))
+  /** @type {Map<string, DecodedArray[]>} */
   const subcolumnData = new Map() // columns to assemble as maps
   // read column data
-  for (let columnIndex = 0; columnIndex < rowGroup.columns.length; columnIndex++) {
-    const columnMetadata = rowGroup.columns[columnIndex].meta_data
+  for (let i = 0; i < rowGroup.columns.length; i++) {
+    const columnMetadata = rowGroup.columns[i].meta_data
     if (!columnMetadata) throw new Error('parquet column metadata is undefined')
 
     // skip columns that are not requested
@@ -133,35 +134,45 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
     promises.push(buffer.then(arrayBuffer => {
       const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
       const reader = { view: new DataView(arrayBuffer), offset: bufferOffset }
-      /** @type {any[] | undefined} */
-      let columnData = readColumn(reader, rowLimit, columnMetadata, schemaPath, options)
-      // assert(columnData.length === Number(rowGroup.num_rows))
+      const columnData = readColumn(reader, rowLimit, columnMetadata, schemaPath, options)
+      /** @type {DecodedArray[] | undefined} */
+      let chunks = columnData // TODO: fast path for non-nested columns
 
       // Save column data for assembly
       const subcolumn = columnMetadata.path_in_schema.join('.')
-      subcolumnData.set(subcolumn, columnData)
-      columnData = undefined
+      subcolumnData.set(subcolumn, chunks)
+      chunks = undefined
 
       const subcolumns = subcolumnNames.get(columnName)
       if (subcolumns?.every(name => subcolumnData.has(name))) {
+        // For every subcolumn, flatten and assemble the column
+        const flatData = new Map(subcolumns.map(name => [name, flatten(subcolumnData.get(name))]))
         // We have all data needed to assemble a top level column
-        assembleNested(subcolumnData, schemaPath[1])
-        columnData = subcolumnData.get(columnName)
-        if (!columnData) {
+        assembleNested(flatData, schemaPath[1])
+        const flatColumn = flatData.get(columnName)
+        if (flatColumn) {
+          chunks = [flatColumn]
+          subcolumns.forEach(name => subcolumnData.delete(name))
+          subcolumnData.set(columnName, chunks)
+        } else {
           throw new Error(`parquet column data not assembled: ${columnName}`)
         }
       }
 
       // do not emit column data until structs are fully parsed
-      if (!columnData) return
+      if (!chunks) return
 
       // notify caller of column data
-      options.onChunk?.({
-        columnName,
-        columnData,
-        rowStart: groupStart,
-        rowEnd: groupStart + columnData.length,
-      })
+      let rowStart = groupStart
+      for (const chunk of chunks) {
+        options.onChunk?.({
+          columnName,
+          columnData: chunk,
+          rowStart,
+          rowEnd: rowStart + chunk.length,
+        })
+        rowStart += chunk.length
+      }
     }))
   }
   await Promise.all(promises)
@@ -173,7 +184,7 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
     .filter(name => !columns || columns.includes(name))
   const columnOrder = columns || includedColumnNames
   const includedColumns = columnOrder
-    .map(name => includedColumnNames.includes(name) ? subcolumnData.get(name) : undefined)
+    .map(name => includedColumnNames.includes(name) ? flatten(subcolumnData.get(name)) : undefined)
 
   for (let row = 0; row < rowLimit; row++) {
     if (options.rowFormat === 'object') {
@@ -194,11 +205,27 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
   return []
 }
+/**
+ * Flatten a list of lists into a single list.
+ *
+ * @param {DecodedArray[] | undefined} chunks
+ * @returns {DecodedArray}
+ */
+function flatten(chunks) {
+  if (!chunks) return []
+  if (chunks.length === 1) return chunks[0]
+  /** @type {any[]} */
+  const output = []
+  for (const chunk of chunks) {
+    concat(output, chunk)
+  }
+  return output
+}
 
 /**
  * Return a list of sub-columns needed to construct a top-level column.
 *
- * @import {ParquetReadOptions, RowGroup, SchemaTree} from '../src/types.d.ts'
+ * @import {DecodedArray, ParquetReadOptions, RowGroup, SchemaTree} from '../src/types.d.ts'
  * @param {SchemaTree} schema
  * @param {string[]} output
  * @returns {string[]}
diff --git a/test/column.test.js b/test/column.test.js
index 73f88b7..6e8eded 100644
--- a/test/column.test.js
+++ b/test/column.test.js
@@ -9,9 +9,9 @@ const values = [null, 1, -2, NaN, 0, -1, -0, 2]
 
 describe('readColumn', () => {
   it.for([
-    { rowLimit: undefined, expected: values },
-    { rowLimit: Infinity, expected: values },
-    { rowLimit: 2, expected: values.slice(0, 2) },
+    { rowLimit: undefined, expected: [values] },
+    { rowLimit: Infinity, expected: [values] },
+    { rowLimit: 2, expected: [values.slice(0, 2)] },
     { rowLimit: 0, expected: [] },
   ])('readColumn with rowLimit %p', async ({ rowLimit, expected }) => {
     const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
@@ -29,4 +29,21 @@ describe('readColumn', () => {
     const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
     expect(result).toEqual(expected)
   })
+
+  it('readColumn should return a typed array', async () => {
+    const testFile = 'test/files/datapage_v2.snappy.parquet'
+    const asyncBuffer = await asyncBufferFromFile(testFile)
+    const arrayBuffer = await asyncBuffer.slice(0)
+    const metadata = parquetMetadata(arrayBuffer)
+
+    const column = metadata.row_groups[0].columns[1] // second column
+    if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
+    const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
+    const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
+    const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
+    const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
+
+    const columnData = readColumn(reader, Infinity, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
+    expect(columnData[0]).toBeInstanceOf(Int32Array)
+  })
 })
diff --git a/test/read.test.js b/test/read.test.js
index 02a6db5..4ad9a34 100644
--- a/test/read.test.js
+++ b/test/read.test.js
@@ -41,7 +41,7 @@ describe('parquetRead', () => {
     })
   })
 
-  it('read a single column', async () => {
+  it('read a single column as typed array', async () => {
     const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
     await parquetRead({
       file,
@@ -49,11 +49,11 @@ describe('parquetRead', () => {
       onChunk(chunk) {
         expect(chunk).toEqual({
           columnName: 'b',
-          columnData: [1, 2, 3, 4, 5],
+          columnData: new Int32Array([1, 2, 3, 4, 5]),
           rowStart: 0,
           rowEnd: 5,
         })
-        expect(chunk.columnData).toBeInstanceOf(Array)
+        expect(chunk.columnData).toBeInstanceOf(Int32Array)
       },
     })
   })
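Reviewer note, not part of the patch: a minimal sketch of what a caller sees after this change, assuming the entry points and fixture used by the tests above (the import paths are my assumption based on the repo layout). With a multi-page flat column, onChunk may now fire once per decoded page, and columnData arrives as a typed array where the physical type allows it:

import { parquetRead } from '../src/hyparquet.js'
import { asyncBufferFromFile } from '../src/utils.js'

const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
await parquetRead({
  file,
  columns: ['b'],
  onChunk({ columnName, columnData, rowStart, rowEnd }) {
    // columnData is a DecodedArray (here an Int32Array), not a plain js array
    console.log(columnName, columnData, rowStart, rowEnd)
  },
})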
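The row-limit truncation arithmetic added to readColumn is compact enough to misread, so here it is worked through with hypothetical page sizes (data invented for illustration):

// suppose two decoded pages of 4 rows each were pushed, but the row group limit is 6
const chunks = [new Int32Array([0, 1, 2, 3]), new Int32Array([4, 5, 6, 7])]
const rowLimit = 6
const rowCount = 8 // 4 + 4 rows accumulated
const lastChunk = chunks[chunks.length - 1]
// rows contributed by earlier chunks: rowCount - lastChunk.length === 4
// rows still needed from the last chunk: rowLimit - 4 === 2
chunks[chunks.length - 1] = lastChunk.slice(0, rowLimit - (rowCount - lastChunk.length))
console.log(chunks.map(c => Array.from(c))) // [ [ 0, 1, 2, 3 ], [ 4, 5 ] ]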
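With concat gone from src/column.js, flatten in src/read.js is the one remaining place chunk data is copied, and its single-chunk fast path returns the decoder's array untouched; that pass-through is what lets a one-page column reach onChunk and row assembly as an Int32Array. A standalone sketch of that behavior (the concat below is a stand-in, since the real one lives in src/utils.js):

// stand-in for concat from src/utils.js: append every value of bbb onto aaa
function concat(aaa, bbb) {
  for (const value of bbb) aaa.push(value)
}

// same shape as the flatten added to src/read.js above
function flatten(chunks) {
  if (!chunks) return []
  if (chunks.length === 1) return chunks[0] // pass through: no copy, type preserved
  const output = []
  for (const chunk of chunks) concat(output, chunk)
  return output
}

console.log(flatten([new Int32Array([1, 2, 3])])) // Int32Array(3) [ 1, 2, 3 ]
console.log(flatten([new Int32Array([1]), new Int32Array([2, 3])])) // [ 1, 2, 3 ] (plain array)
console.log(flatten(undefined)) // []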