Skip unnecessary pages

Do this by passing rowGroupStart and rowGroupEnd to bound the rows to
fetch within a row group. If a page falls entirely outside those bounds,
we can skip the page. Replaces rowLimit.
Kenny Daniel 2025-04-06 15:10:31 -07:00
parent ba74d58dd3
commit 6c225888c4
6 changed files with 56 additions and 30 deletions
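
In short: callers keep passing rowStart/rowEnd, but pages that end before the
requested range are no longer decoded. A minimal caller-level sketch (the
'hyparquet' import specifier is an assumption; parquetRead, asyncBufferFromFile,
rowStart, rowEnd, and onComplete are the names used in the code and tests below):

import { parquetRead, asyncBufferFromFile } from 'hyparquet' // assumed entry point

const file = await asyncBufferFromFile('data.parquet') // any parquet file path
await parquetRead({
  file,
  rowStart: 1500,
  rowEnd: 1600, // pages that end before row 1500 are skipped, not decoded
  onComplete: rows => console.log(rows.length), // 100 rows, if the file has that many
})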

@@ -53,6 +53,7 @@ export default [
'no-useless-return': 'error',
'no-var': 'error',
'object-curly-spacing': ['error', 'always'],
'object-shorthand': 'error',
'prefer-const': 'error',
'prefer-destructuring': ['warn', {
object: true,

@@ -3,46 +3,47 @@ import { Encoding, PageType } from './constants.js'
import { convertWithDictionary } from './convert.js'
import { decompressPage, readDataPage, readDataPageV2 } from './datapage.js'
import { readPlain } from './plain.js'
import { isFlatColumn } from './schema.js'
import { deserializeTCompactProtocol } from './thrift.js'
/**
* Parse column data from a buffer.
*
* @param {DataReader} reader
* @param {number | undefined} rowLimit maximum number of rows to read (undefined reads all rows)
* @param {number} rowGroupStart skip this many rows in the row group
* @param {number} rowGroupEnd read up to this index in the row group (Infinity reads all rows)
* @param {ColumnMetaData} columnMetadata column metadata
* @param {SchemaTree[]} schemaPath schema path for the column
* @param {ParquetReadOptions} options read options
* @returns {DecodedArray[]}
*/
export function readColumn(reader, rowLimit, columnMetadata, schemaPath, options) {
export function readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, schemaPath, options) {
const { element } = schemaPath[schemaPath.length - 1]
/** @type {DecodedArray[]} */
const chunks = []
/** @type {DecodedArray | undefined} */
let dictionary = undefined
const hasRowLimit = rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)
let rowCount = 0
// read dictionary
if (hasDictionary(columnMetadata)) {
dictionary = readPage(reader, columnMetadata, schemaPath, element, dictionary, options)
dictionary = readPage(reader, columnMetadata, schemaPath, element, dictionary, 0, options)
}
while (!hasRowLimit || rowCount < rowLimit) {
while (rowCount < rowGroupEnd) {
if (reader.offset >= reader.view.byteLength - 1) break // end of reader
const values = readPage(reader, columnMetadata, schemaPath, element, dictionary, options)
const values = readPage(reader, columnMetadata, schemaPath, element, dictionary, rowGroupStart - rowCount, options)
chunks.push(values)
rowCount += values.length
}
if (hasRowLimit) {
if (rowCount < rowLimit) {
throw new Error(`parquet row data length ${rowCount} does not match row group limit ${rowLimit}}`)
if (isFinite(rowGroupEnd)) {
if (rowCount < rowGroupEnd) {
throw new Error(`parquet row data length ${rowCount} does not match row group limit ${rowGroupEnd}`)
}
if (rowCount > rowLimit) {
if (rowCount > rowGroupEnd) {
// truncate last chunk to row limit
const lastChunk = chunks[chunks.length - 1]
chunks[chunks.length - 1] = lastChunk.slice(0, rowLimit - (rowCount - lastChunk.length))
chunks[chunks.length - 1] = lastChunk.slice(0, rowGroupEnd - (rowCount - lastChunk.length))
}
}
return chunks
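
The loop above reads whole pages until rowCount reaches rowGroupEnd, then trims
the overshoot from the last chunk. A standalone trace of that truncation step,
with made-up page sizes:

let rowCount = 0
const rowGroupEnd = 500
const chunks = []
for (const pageRows of [300, 300]) { // two decoded pages of 300 rows each
  chunks.push(new Array(pageRows).fill(0))
  rowCount += pageRows // 300 after page 1, 600 after page 2
  if (rowCount >= rowGroupEnd) break
}
const lastChunk = chunks[chunks.length - 1]
chunks[chunks.length - 1] = lastChunk.slice(0, rowGroupEnd - (rowCount - lastChunk.length))
// 500 - (600 - 300) = 200, so the chunks now hold 300 + 200 = 500 rows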
@@ -56,10 +57,11 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, options
* @param {SchemaTree[]} schemaPath
* @param {SchemaElement} element
* @param {DecodedArray | undefined} dictionary
* @param {number} pageStart skip this many rows in the page
* @param {ParquetReadOptions} options
* @returns {DecodedArray}
*/
export function readPage(reader, columnMetadata, schemaPath, element, dictionary, { utf8, compressors }) {
export function readPage(reader, columnMetadata, schemaPath, element, dictionary, pageStart, { utf8, compressors }) {
const header = parquetHeader(reader) // column header
// read compressed_page_size bytes
@@ -73,6 +75,11 @@ export function readPage(reader, columnMetadata, schemaPath, element, dictionary
const daph = header.data_page_header
if (!daph) throw new Error('parquet data page header is undefined')
// skip unnecessary pages (flat, non-nested columns only)
if (pageStart > daph.num_values && isFlatColumn(schemaPath)) {
return new Array(daph.num_values) // TODO: don't allocate array
}
const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors)
const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
// assert(!daph.statistics?.null_count || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
@@ -94,6 +101,11 @@ export function readPage(reader, columnMetadata, schemaPath, element, dictionary
const daph2 = header.data_page_header_v2
if (!daph2) throw new Error('parquet data page header v2 is undefined')
// skip unnecessary pages
if (pageStart > daph2.num_rows) {
return new Array(daph2.num_values) // TODO: don't allocate array
}
const { definitionLevels, repetitionLevels, dataPage } = readDataPageV2(
compressedBytes, header, schemaPath, columnMetadata, compressors
)
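
The two skip checks differ because a V1 data page header only records
num_values, which equals a row count only for flat (non-nested) columns, hence
the isFlatColumn guard; a V2 header records num_rows directly. A hedged sketch
of the decision, where pageStart is the number of rows still to skip
(rowGroupStart - rowCount):

// Illustrative only; the real checks live in readPage above.
function canSkipDataPageV1(pageStart, numValues, isFlat) {
  return isFlat && pageStart > numValues // values map 1:1 to rows only when flat
}
function canSkipDataPageV2(pageStart, numRows) {
  return pageStart > numRows // the whole page ends before the requested range
}
// canSkipDataPageV2(500, 300) === true  -> return a placeholder array, skip decoding
// canSkipDataPageV2(500, 700) === false -> decompress and decode as usual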

@@ -41,8 +41,7 @@ export async function parquetRead(options) {
// if row group overlaps with row range, read it
if (groupStart + groupRows >= rowStart && (rowEnd === undefined || groupStart < rowEnd)) {
// read row group
const rowLimit = rowEnd && rowEnd - groupStart
const groupData = await readRowGroup(options, rowGroup, groupStart, rowLimit)
const groupData = await readRowGroup(options, rowGroup, groupStart)
if (onComplete) {
// filter to rows in range
const start = Math.max(rowStart - groupStart, 0)
@@ -62,13 +61,14 @@ export async function parquetRead(options) {
* @param {ParquetReadOptions} options read options
* @param {RowGroup} rowGroup row group to read
* @param {number} groupStart row index of the first row in the group
* @param {number} [rowLimit] max rows to read from this group
* @returns {Promise<any[][]>} resolves to row data
*/
export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
const { file, metadata, columns } = options
export async function readRowGroup(options, rowGroup, groupStart) {
const { file, metadata, columns, rowStart, rowEnd } = options
if (!metadata) throw new Error('parquet metadata not found')
if (rowLimit === undefined || rowLimit > rowGroup.num_rows) rowLimit = Number(rowGroup.num_rows)
const numRows = Number(rowGroup.num_rows)
const rowGroupStart = Math.max((rowStart || 0) - groupStart, 0)
const rowGroupEnd = rowEnd === undefined ? numRows : Math.min(rowEnd - groupStart, numRows)
// loop through metadata to find min/max bytes to read
let [groupStartByte, groupEndByte] = [file.byteLength, 0]
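
Both bounds are expressed relative to the row group. A standalone sketch of the
arithmetic just above, with a worked example (the helper name is illustrative,
not part of the library):

function rowGroupBounds(rowStart, rowEnd, groupStart, numRows) {
  const rowGroupStart = Math.max((rowStart || 0) - groupStart, 0)
  const rowGroupEnd = rowEnd === undefined ? numRows : Math.min(rowEnd - groupStart, numRows)
  return { rowGroupStart, rowGroupEnd }
}

// Rows 1500..1600 of the file, in a 1000-row group that starts at row 1000:
// rowGroupBounds(1500, 1600, 1000, 1000) -> { rowGroupStart: 500, rowGroupEnd: 600 }
// so any page whose rows all fall before index 500 within this group is skipped.
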
@@ -134,7 +134,7 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
promises.push(buffer.then(arrayBuffer => {
const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
const reader = { view: new DataView(arrayBuffer), offset: bufferOffset }
const columnData = readColumn(reader, rowLimit, columnMetadata, schemaPath, options)
const columnData = readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, schemaPath, options)
/** @type {DecodedArray[] | undefined} */
let chunks = columnData
@@ -164,7 +164,7 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
columnName,
columnData: chunk,
rowStart: groupStart,
rowEnd: groupStart + rowLimit,
rowEnd: groupStart + chunk.length,
})
}
}))
@@ -179,8 +179,8 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
.map(name => includedColumnNames.includes(name) ? flatten(subcolumnData.get(name)) : undefined)
// transpose columns into rows
const groupData = new Array(rowLimit)
for (let row = 0; row < rowLimit; row++) {
const groupData = new Array(rowGroupEnd)
for (let row = rowGroupStart; row < rowGroupEnd; row++) {
if (options.rowFormat === 'object') {
// return each row as an object
/** @type {Record<string, any>} */

@@ -117,3 +117,17 @@ export function isMapLike(schema) {
return true
}
/**
* Returns true if a column is non-nested.
*
* @param {SchemaTree[]} schemaPath
* @returns {boolean}
*/
export function isFlatColumn(schemaPath) {
if (schemaPath.length !== 2) return false
const [, column] = schemaPath
if (column.element.repetition_type === 'REPEATED') return false
if (column.children.length) return false
return true
}
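
For example, a top-level primitive column has a two-element schema path
(root, leaf) and counts as flat; nested or repeated columns do not. A sketch
using a pared-down SchemaTree shape (only the fields isFlatColumn reads, plus
names for readability):

const flatPath = [
  { element: { name: 'root' }, children: [] },
  { element: { name: 'id', repetition_type: 'REQUIRED' }, children: [] },
]
// isFlatColumn(flatPath) === true

// A column nested in a struct has a longer path (root > user > name):
const nestedPath = [
  { element: { name: 'root' }, children: [] },
  { element: { name: 'user', repetition_type: 'OPTIONAL' }, children: [] },
  { element: { name: 'name', repetition_type: 'OPTIONAL' }, children: [] },
]
// isFlatColumn(nestedPath) === false (schemaPath.length !== 2)
// A REPEATED leaf, or a leaf with children, fails the other two checks.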

@@ -9,11 +9,10 @@ const values = [null, 1, -2, NaN, 0, -1, -0, 2]
describe('readColumn', () => {
it.for([
{ rowLimit: undefined, expected: [values] },
{ rowLimit: Infinity, expected: [values] },
{ rowLimit: 2, expected: [values.slice(0, 2)] },
{ rowLimit: 0, expected: [] },
])('readColumn with rowLimit %p', async ({ rowLimit, expected }) => {
{ rowGroupEnd: Infinity, expected: [values] },
{ rowGroupEnd: 2, expected: [values.slice(0, 2)] },
{ rowGroupEnd: 0, expected: [] },
])('readColumn with rowGroupEnd %p', async ({ rowGroupEnd, expected }) => {
const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
const asyncBuffer = await asyncBufferFromFile(testFile)
const arrayBuffer = await asyncBuffer.slice(0)
@@ -26,7 +25,7 @@ describe('readColumn', () => {
const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
const result = readColumn(reader, 0, rowGroupEnd, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
expect(result).toEqual(expected)
})
@@ -43,7 +42,7 @@ describe('readColumn', () => {
const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
const columnData = readColumn(reader, Infinity, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
const columnData = readColumn(reader, 0, Infinity, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
expect(columnData[0]).toBeInstanceOf(Int32Array)
})
})

@@ -179,6 +179,6 @@ describe('parquetRead', () => {
rowEnd: 91,
})
expect(rows).toEqual([{ col: 'bad' }])
expect(convertWithDictionary).toHaveBeenCalledTimes(10)
expect(convertWithDictionary).toHaveBeenCalledTimes(2)
})
})