Group row group selection parameters into a RowGroupSelect object

Kenny Daniel 2025-04-10 15:51:24 -07:00
parent 4df7095ab4
commit 90be536e05
4 changed files with 56 additions and 34 deletions
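
This refactor replaces readColumn's separate rowGroupStart / rowGroupEnd parameters with a single RowGroupSelect object that also carries groupStart and numRows. A minimal sketch of the new call shape, with illustrative values (the reader and columnDecoder setup is omitted):

    /** @type {RowGroupSelect} */
    const rowGroupSelect = {
      groupStart: 0,   // absolute row index of the group's first row
      selectStart: 2,  // group-relative index to start reading
      selectEnd: 10,   // group-relative index to stop reading
      numRows: 10,     // total rows in the group
    }
    const chunks = readColumn(reader, rowGroupSelect, columnDecoder)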

@@ -10,12 +10,11 @@ import { deserializeTCompactProtocol } from './thrift.js'
  * Parse column data from a buffer.
  *
  * @param {DataReader} reader
- * @param {number} rowGroupStart skip this many rows in the row group
- * @param {number} rowGroupEnd read up to this index in the row group (Infinity reads all rows)
+ * @param {RowGroupSelect} rowGroupSelect row group selection
  * @param {ColumnDecoder} columnDecoder column decoder params
  * @returns {DecodedArray[]}
  */
-export function readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder) {
+export function readColumn(reader, { selectStart, selectEnd }, columnDecoder) {
   const { element, utf8 } = columnDecoder
   /** @type {DecodedArray[]} */
   const chunks = []
@@ -23,7 +22,7 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder) {
   let dictionary = undefined
   let rowCount = 0
-  while (rowCount < rowGroupEnd) {
+  while (rowCount < selectEnd) {
     if (reader.offset >= reader.view.byteLength - 1) break // end of reader
     // read page header
@@ -35,7 +34,7 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder) {
     } else {
       const lastChunk = chunks.at(-1)
       const lastChunkLength = lastChunk?.length || 0
-      const values = readPage(reader, header, columnDecoder, dictionary, lastChunk, rowGroupStart - rowCount)
+      const values = readPage(reader, header, columnDecoder, dictionary, lastChunk, selectStart - rowCount)
       if (lastChunk === values) {
         // continued from previous page
         rowCount += values.length - lastChunkLength
@@ -45,15 +44,11 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder) {
       }
     }
   }
-  if (isFinite(rowGroupEnd)) {
-    if (rowCount < rowGroupEnd) {
-      throw new Error(`parquet row data length ${rowCount} does not match row group limit ${rowGroupEnd}}`)
-    }
-    if (rowCount > rowGroupEnd) {
-      // truncate last chunk to row limit
-      const lastChunk = chunks[chunks.length - 1]
-      chunks[chunks.length - 1] = lastChunk.slice(0, rowGroupEnd - (rowCount - lastChunk.length))
-    }
-  }
+  // assert(rowCount >= selectEnd)
+  if (rowCount > selectEnd) {
+    // truncate last chunk to row limit
+    const lastChunk = chunks[chunks.length - 1]
+    chunks[chunks.length - 1] = lastChunk.slice(0, selectEnd - (rowCount - lastChunk.length))
+  }
   return chunks
 }
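
The truncation arithmetic deserves a worked example: pages are decoded whole, so rowCount can overshoot selectEnd, and the slice cuts the last chunk back by the overshoot. With illustrative values:

    const selectEnd = 5
    const chunks = [[0, 1, 2, 3], [4, 5, 6]] // two decoded pages, so rowCount = 7
    const rowCount = 7
    const lastChunk = chunks[chunks.length - 1]
    // rows decoded before the last chunk: rowCount - lastChunk.length = 4
    // keep selectEnd - 4 = 1 row of the last chunk:
    chunks[chunks.length - 1] = lastChunk.slice(0, selectEnd - (rowCount - lastChunk.length))
    // chunks is now [[0, 1, 2, 3], [4]]: exactly selectEnd rows in total

Note that the old out-of-range throw is gone; the new code only records, as a comment, the assumption that rowCount >= selectEnd holds after the loop.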
@@ -150,7 +145,7 @@ export function getColumnRange({ dictionary_page_offset, data_page_offset, total
 /**
  * Read parquet header from a buffer.
  *
- * @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ColumnDecoder} from '../src/types.d.ts'
+ * @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ColumnDecoder, RowGroupSelect} from '../src/types.d.ts'
  * @param {DataReader} reader
  * @returns {PageHeader}
  */

@@ -67,8 +67,11 @@ export async function readRowGroup(options, rowGroup, groupStart) {
   const { file, metadata, columns, rowStart, rowEnd } = options
   if (!metadata) throw new Error('parquet metadata not found')
   const numRows = Number(rowGroup.num_rows)
-  const rowGroupStart = Math.max((rowStart || 0) - groupStart, 0)
-  const rowGroupEnd = rowEnd === undefined ? numRows : Math.min(rowEnd - groupStart, numRows)
+  // index within the group to start and stop reading:
+  const selectStart = Math.max((rowStart || 0) - groupStart, 0)
+  const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, numRows)
+  /** @type {RowGroupSelect} */
+  const rowGroupSelect = { groupStart, selectStart, selectEnd, numRows }
   // loop through metadata to find min/max bytes to read
   let [groupStartByte, groupEndByte] = [file.byteLength, 0]
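
The clamping maps the caller's absolute rowStart / rowEnd onto group-relative indices. For a 50-row group whose first row is absolute row 100, a request for rows 120 to 130 works out as follows (illustrative numbers):

    const groupStart = 100, numRows = 50
    const rowStart = 120, rowEnd = 130
    const selectStart = Math.max((rowStart || 0) - groupStart, 0)           // 20
    const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, numRows)  // 30

The switch from the old ternary to rowEnd ?? Infinity is behavior-preserving: an undefined rowEnd still clamps to numRows.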
@@ -143,7 +146,7 @@ export async function readRowGroup(options, rowGroup, groupStart) {
       compressors: options.compressors,
       utf8: options.utf8,
     }
-    const columnData = readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder)
+    const columnData = readColumn(reader, rowGroupSelect, columnDecoder)
     /** @type {DecodedArray[] | undefined} */
     let chunks = columnData
@@ -168,13 +171,15 @@ export async function readRowGroup(options, rowGroup, groupStart) {
       // do not emit column data until structs are fully parsed
       if (!chunks) return
       // notify caller of column data
-      for (const chunk of chunks) {
-        options.onChunk?.({
-          columnName,
-          columnData: chunk,
-          rowStart: groupStart,
-          rowEnd: groupStart + chunk.length,
-        })
+      if (options.onChunk) {
+        for (const chunk of chunks) {
+          options.onChunk({
+            columnName,
+            columnData: chunk,
+            rowStart: groupStart,
+            rowEnd: groupStart + chunk.length,
+          })
+        }
       }
     }))
 }
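
Hoisting the onChunk check out of the loop skips chunk iteration entirely when no callback was given, instead of testing options.onChunk?. once per chunk. A sketch of a consumer, assuming the package's parquetRead entry point (the logging is illustrative):

    await parquetRead({
      file,
      onChunk({ columnName, columnData, rowStart, rowEnd }) {
        console.log(`${columnName} rows ${rowStart}..${rowEnd}:`, columnData)
      },
    })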
@@ -188,8 +193,8 @@ export async function readRowGroup(options, rowGroup, groupStart) {
     .map(name => includedColumnNames.includes(name) ? flatten(subcolumnData.get(name)) : undefined)
   // transpose columns into rows
-  const groupData = new Array(rowGroupEnd)
-  for (let row = rowGroupStart; row < rowGroupEnd; row++) {
+  const groupData = new Array(selectEnd)
+  for (let row = selectStart; row < selectEnd; row++) {
     if (options.rowFormat === 'object') {
       // return each row as an object
       /** @type {Record<string, any>} */
@@ -228,7 +233,7 @@ function flatten(chunks) {
 /**
  * Return a list of sub-columns needed to construct a top-level column.
  *
- * @import {DecodedArray, ParquetReadOptions, RowGroup, SchemaTree} from '../src/types.d.ts'
+ * @import {DecodedArray, ParquetReadOptions, RowGroup, RowGroupSelect, SchemaTree} from '../src/types.d.ts'
  * @param {SchemaTree} schema
  * @param {string[]} output
  * @returns {string[]}

src/types.d.ts

@@ -386,3 +386,10 @@ export interface ColumnDecoder {
   compressors?: Compressors
   utf8?: boolean
 }
+
+export interface RowGroupSelect {
+  groupStart: number // row index of the first row in the group
+  selectStart: number // row index in the group to start reading
+  selectEnd: number // row index in the group to stop reading
+  numRows: number
+}
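
The intended invariant, not enforced by the type, is 0 <= selectStart <= selectEnd <= numRows once readRowGroup has clamped the bounds (the tests below pass selectEnd: Infinity to readColumn directly). For instance, a full read of a 50-row group starting at absolute row 100 would be (illustrative values):

    /** @type {RowGroupSelect} */
    const fullGroup = { groupStart: 100, selectStart: 0, selectEnd: 50, numRows: 50 }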

@@ -6,12 +6,15 @@ import { asyncBufferFromFile } from '../src/utils.js'
 const values = [null, 1, -2, NaN, 0, -1, -0, 2]
+/**
+ * @import {RowGroupSelect} from '../src/types.d.ts'
+ */
 describe('readColumn', () => {
   it.for([
-    { rowGroupEnd: Infinity, expected: [values] },
-    { rowGroupEnd: 2, expected: [values.slice(0, 2)] },
-    { rowGroupEnd: 0, expected: [] },
-  ])('readColumn with rowGroupEnd %p', async ({ rowGroupEnd, expected }) => {
+    { selectEnd: Infinity, expected: [values] },
+    { selectEnd: 2, expected: [values.slice(0, 2)] },
+    { selectEnd: 0, expected: [] },
+  ])('readColumn with rowGroupEnd %p', async ({ selectEnd, expected }) => {
     const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
     const file = await asyncBufferFromFile(testFile)
     const arrayBuffer = await file.slice(0)
@@ -30,8 +33,14 @@ describe('readColumn', () => {
       schemaPath,
       codec: column.meta_data.codec,
     }
+    const rowGroupSelect = {
+      groupStart: 0,
+      selectStart: 0,
+      selectEnd,
+      numRows: expected.length,
+    }
-    const result = readColumn(reader, 0, rowGroupEnd, columnDecoder)
+    const result = readColumn(reader, rowGroupSelect, columnDecoder)
     expect(result).toEqual(expected)
   })
@@ -54,8 +63,14 @@ describe('readColumn', () => {
       schemaPath,
       codec: column.meta_data.codec,
     }
+    const rowGroupSelect = {
+      groupStart: 0,
+      selectStart: 0,
+      selectEnd: Infinity,
+      numRows: Number(column.meta_data.num_values),
+    }
-    const columnData = readColumn(reader, 0, Infinity, columnDecoder)
+    const columnData = readColumn(reader, rowGroupSelect, columnDecoder)
     expect(columnData[0]).toBeInstanceOf(Int32Array)
   })
 })