Return typed arrays in onChunk. Change readColumn to return DecodedArray[]. (#67)

Refactored readColumn to collect decoded pages as chunks instead of merging them with `concat`. This avoids extra copying and allocation and lets typed arrays pass through to onChunk untouched.
Kenny Daniel, 2025-03-10 23:33:47 -07:00 (committed by GitHub)
parent a9467f6c3d
commit d7f8d39de3
5 changed files with 98 additions and 52 deletions
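
From the caller's point of view, the visible change is that `onChunk` may now hand back typed arrays instead of plain arrays. A minimal usage sketch (the `hyparquet` import path and the test file name are assumptions; the `parquetRead`/`onChunk` shape is the one exercised in the tests below):

```js
import { parquetRead, asyncBufferFromFile } from 'hyparquet' // import path assumed

const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
await parquetRead({
  file,
  columns: ['b'],
  onChunk({ columnName, columnData, rowStart, rowEnd }) {
    // columnData was previously always a plain Array; after this change it can be
    // a typed array such as Int32Array, depending on the column's physical type
    console.log(columnName, columnData, rowStart, rowEnd)
  },
})
```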

@@ -104,7 +104,7 @@ export function assembleLists(
* https://github.com/apache/parquet-format/blob/apache-parquet-format-2.10.0/LogicalTypes.md#nested-types
*
* @import {SchemaTree} from '../src/types.d.ts'
* @param {Map<string, any[]>} subcolumnData
* @param {Map<string, DecodedArray>} subcolumnData
* @param {SchemaTree} schema top-level schema element
* @param {number} [depth] depth of nested structure
*/
@@ -180,7 +180,7 @@ export function assembleNested(subcolumnData, schema, depth = 0) {
}
/**
* @param {any[]} arr
* @param {DecodedArray} arr
* @param {number} depth
*/
function flattenAtDepth(arr, depth) {
@@ -194,8 +194,8 @@ function flattenAtDepth(arr, depth) {
}
/**
* @param {any[]} keys
* @param {any[]} values
* @param {DecodedArray} keys
* @param {DecodedArray} values
* @param {number} depth
* @returns {any[]}
*/

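For reference, `DecodedArray` (imported from `src/types.d.ts`) is the union of everything the page decoders can produce. A rough JSDoc sketch of its shape; the exact union members live in `types.d.ts`, so treat the list below as illustrative only:

```js
/**
 * Illustrative shape only: a decoded page is either a plain JavaScript
 * array or one of the typed arrays produced by the decoders (e.g. Int32Array).
 * @typedef {any[] | Int32Array | BigInt64Array | Float32Array | Float64Array | Uint8Array} DecodedArraySketch
 */
```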
@@ -4,7 +4,6 @@ import { convertWithDictionary } from './convert.js'
import { decompressPage, readDataPage, readDataPageV2, readDictionaryPage } from './datapage.js'
import { getMaxDefinitionLevel } from './schema.js'
import { deserializeTCompactProtocol } from './thrift.js'
import { concat } from './utils.js'
/**
* Parse column data from a buffer.
@@ -14,21 +13,20 @@ import { concat } from './utils.js'
* @param {ColumnMetaData} columnMetadata column metadata
* @param {SchemaTree[]} schemaPath schema path for the column
* @param {ParquetReadOptions} options read options
* @returns {any[]} array of values
* @returns {DecodedArray[]}
*/
export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compressors, utf8 }) {
const { element } = schemaPath[schemaPath.length - 1]
/** @type {DecodedArray[]} */
const chunks = []
/** @type {DecodedArray | undefined} */
let dictionary = undefined
/** @type {any[]} */
const rowData = []
const hasRowLimit = rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)
let rowCount = 0
while (!hasRowLimit || rowData.length < rowLimit) {
while (!hasRowLimit || rowCount < rowLimit) {
if (reader.offset >= reader.view.byteLength - 1) break // end of reader
// parse column header
const header = parquetHeader(reader)
// assert(header.compressed_page_size !== undefined)
const header = parquetHeader(reader) // column header
// read compressed_page_size bytes starting at offset
const compressedBytes = new Uint8Array(
@@ -36,8 +34,6 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
)
// parse page data by type
/** @type {DecodedArray} */
let values
if (header.type === 'DATA_PAGE') {
const daph = header.data_page_header
if (!daph) throw new Error('parquet data page header is undefined')
@@ -47,13 +43,15 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
// assert(!daph.statistics?.null_count || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
// convert types, dereference dictionary, and assemble lists
values = convertWithDictionary(dataPage, dictionary, element, daph.encoding, utf8)
let values = convertWithDictionary(dataPage, dictionary, element, daph.encoding, utf8)
if (repetitionLevels.length || definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
assembleLists(
rowData, definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
const chunk = assembleLists(
[], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
)
chunks.push(chunk)
rowCount += chunk.length
} else {
// wrap nested flat data by depth
for (let i = 2; i < schemaPath.length; i++) {
@@ -61,7 +59,8 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
values = Array.from(values, e => [e])
}
}
concat(rowData, values)
chunks.push(values)
rowCount += values.length
}
} else if (header.type === 'DATA_PAGE_V2') {
const daph2 = header.data_page_header_v2
@@ -72,15 +71,18 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
)
// convert types, dereference dictionary, and assemble lists
values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8)
const values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8)
if (repetitionLevels.length || definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
assembleLists(
rowData, definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
const chunk = assembleLists(
[], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
)
chunks.push(chunk)
rowCount += chunk.length
} else {
concat(rowData, values)
chunks.push(values)
rowCount += values.length
}
} else if (header.type === 'DICTIONARY_PAGE') {
const diph = header.dictionary_page_header
@@ -96,14 +98,16 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
reader.offset += header.compressed_page_size
}
if (hasRowLimit) {
if (rowData.length < rowLimit) {
throw new Error(`parquet row data length ${rowData.length} does not match row group limit ${rowLimit}}`)
if (rowCount < rowLimit) {
throw new Error(`parquet row data length ${rowCount} does not match row group limit ${rowLimit}}`)
}
if (rowData.length > rowLimit) {
rowData.length = rowLimit // truncate to row limit
if (rowCount > rowLimit) {
// truncate last chunk to row limit
const lastChunk = chunks[chunks.length - 1]
chunks[chunks.length - 1] = lastChunk.slice(0, rowLimit - (rowCount - lastChunk.length))
}
}
return rowData
return chunks
}
/**

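The core of the refactor is visible above: rather than `concat`-ing every decoded page into one growing `rowData` array, `readColumn` now pushes each page's values (or each assembled list chunk) onto `chunks` and tracks `rowCount` separately. A self-contained sketch of the difference, using a hypothetical `pages` list in place of the real data page decoding:

```js
// Hypothetical decoded pages, standing in for what the data page decoders return.
const pages = [new Int32Array([1, 2, 3]), new Int32Array([4, 5])]

// Old approach: copy every element into one growing plain array
// (roughly what concat() did, and typed-array-ness is lost).
const rowData = []
for (const values of pages) {
  rowData.push(...values)
}

// New approach: keep each decoded page as-is and only count rows;
// typed arrays pass through to the caller untouched.
const chunks = []
let rowCount = 0
for (const values of pages) {
  chunks.push(values)
  rowCount += values.length
}

console.log(rowData)          // [ 1, 2, 3, 4, 5 ]
console.log(chunks, rowCount) // [ Int32Array(3), Int32Array(2) ]  5
```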
@@ -96,10 +96,11 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
// Top-level columns to assemble
const { children } = getSchemaPath(metadata.schema, [])[0]
const subcolumnNames = new Map(children.map(child => [child.element.name, getSubcolumns(child)]))
/** @type {Map<string, DecodedArray[]>} */
const subcolumnData = new Map() // columns to assemble as maps
// read column data
for (let columnIndex = 0; columnIndex < rowGroup.columns.length; columnIndex++) {
const columnMetadata = rowGroup.columns[columnIndex].meta_data
for (let i = 0; i < rowGroup.columns.length; i++) {
const columnMetadata = rowGroup.columns[i].meta_data
if (!columnMetadata) throw new Error('parquet column metadata is undefined')
// skip columns that are not requested
@@ -133,35 +134,43 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
promises.push(buffer.then(arrayBuffer => {
const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
const reader = { view: new DataView(arrayBuffer), offset: bufferOffset }
/** @type {any[] | undefined} */
let columnData = readColumn(reader, rowLimit, columnMetadata, schemaPath, options)
// assert(columnData.length === Number(rowGroup.num_rows)
const columnData = readColumn(reader, rowLimit, columnMetadata, schemaPath, options)
/** @type {DecodedArray[] | undefined} */
let chunks = columnData
// TODO: fast path for non-nested columns
// Save column data for assembly
const subcolumn = columnMetadata.path_in_schema.join('.')
subcolumnData.set(subcolumn, columnData)
columnData = undefined
subcolumnData.set(subcolumn, chunks)
chunks = undefined
const subcolumns = subcolumnNames.get(columnName)
if (subcolumns?.every(name => subcolumnData.has(name))) {
// For every subcolumn, flatten and assemble the column
const flatData = new Map(subcolumns.map(name => [name, flatten(subcolumnData.get(name))]))
// We have all data needed to assemble a top level column
assembleNested(subcolumnData, schemaPath[1])
columnData = subcolumnData.get(columnName)
if (!columnData) {
assembleNested(flatData, schemaPath[1])
const flatColumn = flatData.get(columnName)
if (flatColumn) {
chunks = [flatColumn]
subcolumns.forEach(name => subcolumnData.delete(name))
subcolumnData.set(columnName, chunks)
} else {
throw new Error(`parquet column data not assembled: ${columnName}`)
}
}
// do not emit column data until structs are fully parsed
if (!columnData) return
if (!chunks) return
// notify caller of column data
options.onChunk?.({
columnName,
columnData,
rowStart: groupStart,
rowEnd: groupStart + columnData.length,
})
for (const chunk of chunks) {
options.onChunk?.({
columnName,
columnData: chunk,
rowStart: groupStart,
rowEnd: groupStart + rowLimit,
})
}
}))
}
await Promise.all(promises)
@@ -173,7 +182,7 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
.filter(name => !columns || columns.includes(name))
const columnOrder = columns || includedColumnNames
const includedColumns = columnOrder
.map(name => includedColumnNames.includes(name) ? subcolumnData.get(name) : undefined)
.map(name => includedColumnNames.includes(name) ? flatten(subcolumnData.get(name)) : undefined)
for (let row = 0; row < rowLimit; row++) {
if (options.rowFormat === 'object') {
@@ -194,11 +203,27 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
return []
}
/**
* Flatten a list of lists into a single list.
*
* @param {DecodedArray[] | undefined} chunks
* @returns {DecodedArray}
*/
function flatten(chunks) {
if (!chunks) return []
if (chunks.length === 1) return chunks[0]
/** @type {any[]} */
const output = []
for (const chunk of chunks) {
concat(output, chunk)
}
return output
}
/**
* Return a list of sub-columns needed to construct a top-level column.
*
* @import {ParquetReadOptions, RowGroup, SchemaTree} from '../src/types.d.ts'
* @import {DecodedArray, ParquetReadOptions, RowGroup, SchemaTree} from '../src/types.d.ts'
* @param {SchemaTree} schema
* @param {string[]} output
* @returns {string[]}

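One consequence of the new `flatten` helper above: a column that decodes into a single chunk keeps its typed-array representation, while a multi-chunk column is merged once into a plain array, and `readRowGroup` only flattens when it actually needs one contiguous array. Illustrative calls, with results as comments (`flatten` is internal to the row group reader and not exported):

```js
flatten([new Int32Array([1, 2, 3])])                      // the same Int32Array, passed through
flatten([new Int32Array([1, 2]), new Int32Array([3, 4])]) // [1, 2, 3, 4] as a plain array
flatten(undefined)                                        // []
```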
@@ -9,9 +9,9 @@ const values = [null, 1, -2, NaN, 0, -1, -0, 2]
describe('readColumn', () => {
it.for([
{ rowLimit: undefined, expected: values },
{ rowLimit: Infinity, expected: values },
{ rowLimit: 2, expected: values.slice(0, 2) },
{ rowLimit: undefined, expected: [values] },
{ rowLimit: Infinity, expected: [values] },
{ rowLimit: 2, expected: [values.slice(0, 2)] },
{ rowLimit: 0, expected: [] },
])('readColumn with rowLimit %p', async ({ rowLimit, expected }) => {
const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
@@ -29,4 +29,21 @@ describe('readColumn', () => {
const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
expect(result).toEqual(expected)
})
it('readColumn should return a typed array', async () => {
const testFile = 'test/files/datapage_v2.snappy.parquet'
const asyncBuffer = await asyncBufferFromFile(testFile)
const arrayBuffer = await asyncBuffer.slice(0)
const metadata = parquetMetadata(arrayBuffer)
const column = metadata.row_groups[0].columns[1] // second column
if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
const columnData = readColumn(reader, Infinity, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
expect(columnData[0]).toBeInstanceOf(Int32Array)
})
})

@@ -41,7 +41,7 @@ describe('parquetRead', () => {
})
})
it('read a single column', async () => {
it('read a single column as typed array', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
await parquetRead({
file,
@@ -49,11 +49,11 @@
onChunk(chunk) {
expect(chunk).toEqual({
columnName: 'b',
columnData: [1, 2, 3, 4, 5],
columnData: new Int32Array([1, 2, 3, 4, 5]),
rowStart: 0,
rowEnd: 5,
})
expect(chunk.columnData).toBeInstanceOf(Array)
expect(chunk.columnData).toBeInstanceOf(Int32Array)
},
})
})