Add more details to QueryPlan. (#82)

- Add metadata
- Add rowStart and rowEnd
 - Add columns
 - Add groupStart, selectStart, selectEnd, and groupRows to GroupPlan
 - Rename ranges to fetches
 - Rename numRows to groupRows in ColumnDecoder
Kenny Daniel 2025-05-25 15:21:58 -07:00 committed by GitHub
parent 78f19aaf6d
commit 9a9519f0b7
6 changed files with 63 additions and 43 deletions
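
For orientation, a sketch of how the plan object changes shape with this commit. The byte ranges and row counts are illustrative, borrowed from the `page_indexed.parquet` expectations in the updated test; `metadata` and `rowGroup` stand in for the parsed footer objects:

```js
// Before: QueryPlan carried only byte ranges.
// { ranges: ByteRange[], groups: [{ plan: ByteRange[] }] }

// After: the plan echoes the query and adds per-group row bookkeeping.
const plan = {
  metadata,           // FileMetaData the plan was built from
  rowStart: 0,        // requested row range
  rowEnd: 200,
  columns: undefined, // undefined = all columns
  fetches: [          // byte ranges to fetch (formerly `ranges`)
    { startByte: 4, endByte: 1166 },
    { startByte: 1166, endByte: 2326 },
  ],
  groups: [{          // one GroupPlan per overlapping row group
    ranges: [{ startByte: 4, endByte: 832 }], // formerly `plan`
    rowGroup,         // row group metadata
    groupStart: 0,    // global index of the group's first row
    groupRows: 100,   // rows in the group
    selectStart: 0,   // group-relative rows to read
    selectEnd: 100,
  }],
}
```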

@@ -18,37 +18,40 @@ export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns
   /** @type {GroupPlan[]} */
   const groups = []
   /** @type {ByteRange[]} */
-  const ranges = []
+  const fetches = []
   // find which row groups to read
   let groupStart = 0 // first row index of the current group
   for (const rowGroup of metadata.row_groups) {
-    const groupEnd = groupStart + Number(rowGroup.num_rows)
+    const groupRows = Number(rowGroup.num_rows)
+    const groupEnd = groupStart + groupRows
     // if row group overlaps with row range, add it to the plan
     if (groupEnd >= rowStart && groupStart < rowEnd) {
       /** @type {ByteRange[]} */
-      const plan = []
+      const ranges = []
       // loop through each column chunk
       for (const { file_path, meta_data } of rowGroup.columns) {
         if (file_path) throw new Error('parquet file_path not supported')
         if (!meta_data) throw new Error('parquet column metadata is undefined')
         // add included columns to the plan
         if (!columns || columns.includes(meta_data.path_in_schema[0])) {
-          plan.push(getColumnRange(meta_data))
+          ranges.push(getColumnRange(meta_data))
         }
       }
-      groups.push({ plan })
+      const selectStart = Math.max(rowStart - groupStart, 0)
+      const selectEnd = Math.min(rowEnd - groupStart, groupRows)
+      groups.push({ ranges, rowGroup, groupStart, groupRows, selectStart, selectEnd })
       // map group plan to ranges
-      const groupSize = plan[plan.length - 1]?.endByte - plan[0]?.startByte
+      const groupSize = ranges[ranges.length - 1]?.endByte - ranges[0]?.startByte
       if (!columns && groupSize < columnChunkAggregation) {
         // full row group
-        ranges.push({
-          startByte: plan[0].startByte,
-          endByte: plan[plan.length - 1].endByte,
+        fetches.push({
+          startByte: ranges[0].startByte,
+          endByte: ranges[ranges.length - 1].endByte,
         })
-      } else if (plan.length) {
-        concat(ranges, plan)
+      } else if (ranges.length) {
+        concat(fetches, ranges)
       } else if (columns?.length) {
         throw new Error(`parquet columns not found: ${columns.join(', ')}`)
       }
@@ -56,8 +59,9 @@ export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns
     groupStart = groupEnd
   }
+  if (!isFinite(rowEnd)) rowEnd = groupStart
-  return { ranges, groups }
+  return { metadata, rowStart, rowEnd, columns, fetches, groups }
 }
 /**
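
A hedged usage sketch of the updated planner; the import paths and the `'id'` column are assumptions for illustration, not taken from this diff:

```js
import { asyncBufferFromFile, parquetMetadataAsync } from 'hyparquet' // assumed entry point
import { parquetPlan } from './plan.js' // internal module, path assumed

const file = await asyncBufferFromFile('test/files/page_indexed.parquet')
const metadata = await parquetMetadataAsync(file)

// Plan rows [0, 150) of a hypothetical 'id' column:
const plan = parquetPlan({ metadata, rowStart: 0, rowEnd: 150, columns: ['id'] })

// `fetches` (renamed from `ranges`) lists the bytes to request up front,
// and each group now records which of its rows the query selects.
for (const group of plan.groups) {
  console.log(group.groupStart, group.groupRows, group.selectStart, group.selectEnd)
}
```

Note the aggregation rule in the hunk above: when no column projection is given and a row group spans fewer bytes than `columnChunkAggregation`, its per-column ranges are coalesced into a single fetch covering the whole group.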
@@ -79,19 +83,19 @@ export function getColumnRange({ dictionary_page_offset, data_page_offset, total_compressed_size })
  * @param {QueryPlan} plan
  * @returns {AsyncBuffer}
  */
-export function prefetchAsyncBuffer(file, { ranges }) {
+export function prefetchAsyncBuffer(file, { fetches }) {
   // fetch byte ranges from the file
-  const promises = ranges.map(({ startByte, endByte }) => file.slice(startByte, endByte))
+  const promises = fetches.map(({ startByte, endByte }) => file.slice(startByte, endByte))
   return {
     byteLength: file.byteLength,
     slice(start, end = file.byteLength) {
       // find matching slice
-      const index = ranges.findIndex(({ startByte, endByte }) => startByte <= start && end <= endByte)
+      const index = fetches.findIndex(({ startByte, endByte }) => startByte <= start && end <= endByte)
       if (index < 0) throw new Error(`no prefetch for range [${start}, ${end}]`)
-      if (ranges[index].startByte !== start || ranges[index].endByte !== end) {
+      if (fetches[index].startByte !== start || fetches[index].endByte !== end) {
         // slice a subrange of the prefetch
-        const startOffset = start - ranges[index].startByte
-        const endOffset = end - ranges[index].startByte
+        const startOffset = start - fetches[index].startByte
+        const endOffset = end - fetches[index].startByte
         if (promises[index] instanceof Promise) {
           return promises[index].then(buffer => buffer.slice(startOffset, endOffset))
         } else {
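
A minimal sketch of the intended wiring for the prefetch wrapper above, assuming `file` is an AsyncBuffer and `metadata` its parsed footer:

```js
import { parquetPlan, prefetchAsyncBuffer } from './plan.js'

// Plan the read, then wrap the file so later slice() calls are served
// from the byte ranges fetched eagerly via plan.fetches.
const plan = parquetPlan({ metadata, rowStart: 0, rowEnd: 100 })
const prefetched = prefetchAsyncBuffer(file, plan)

// A slice inside a prefetched range reuses the in-flight fetch;
// a slice outside every range throws 'no prefetch for range [...]'.
const bytes = await prefetched.slice(4, 832)
```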

@@ -3,6 +3,9 @@ import { parquetPlan, prefetchAsyncBuffer } from './plan.js'
 import { readRowGroup } from './rowgroup.js'
 import { concat } from './utils.js'
+/**
+ * @import {ParquetReadOptions} from '../src/types.d.ts'
+ */
 /**
  * Read parquet data rows from a file-like object.
  * Reads the minimal number of row groups and columns to satisfy the request.
@@ -53,7 +56,3 @@ export async function parquetRead(options) {
   if (onComplete) onComplete(rowData)
 }
-/**
- * @import {DecodedArray, ParquetReadOptions, RowGroup, RowGroupSelect, SchemaTree} from '../src/types.d.ts'
- */

@@ -15,12 +15,12 @@ import { flatten } from './utils.js'
 export async function readRowGroup(options, rowGroup, groupStart) {
   const { file, metadata, columns, rowStart = 0, rowEnd } = options
   if (!metadata) throw new Error('parquet metadata not found')
-  const numRows = Number(rowGroup.num_rows)
+  const groupRows = Number(rowGroup.num_rows)
   // indexes within the group to read:
   const selectStart = Math.max(rowStart - groupStart, 0)
-  const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, numRows)
+  const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, groupRows)
   /** @type {RowGroupSelect} */
-  const rowGroupSelect = { groupStart, selectStart, selectEnd, numRows }
+  const rowGroupSelect = { groupStart, selectStart, selectEnd, groupRows }
   /** @type {Promise<void>[]} */
   const promises = []
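
The `selectStart`/`selectEnd` arithmetic clips the global row window to group-relative indexes; a worked example with assumed numbers:

```js
// A group of 100 rows starting at global row 100,
// queried for global rows [150, 180):
const groupStart = 100
const groupRows = 100
const rowStart = 150
const rowEnd = 180

const selectStart = Math.max(rowStart - groupStart, 0)                   // 50
const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, groupRows) // 80
// readRowGroup decodes group-relative rows [50, 80) from this group.
```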

src/types.d.ts (vendored)

@@ -58,17 +58,6 @@ export interface ByteRange {
   endByte: number // exclusive
 }
-/**
- * Query plan for which byte ranges to read.
- */
-export interface QueryPlan {
-  ranges: ByteRange[] // byte ranges to fetch
-  groups: GroupPlan[] // byte ranges by row group
-}
-interface GroupPlan {
-  plan: ByteRange[]
-}
 export interface DataReader {
   view: DataView
   offset: number
@@ -393,6 +382,27 @@ export type BoundaryOrder = 'UNORDERED' | 'ASCENDING' | 'DESCENDING'
 export type ThriftObject = { [ key: `field_${number}` ]: ThriftType }
 export type ThriftType = boolean | number | bigint | Uint8Array | ThriftType[] | ThriftObject
+/**
+ * Query plan for which byte ranges to read.
+ */
+export interface QueryPlan {
+  metadata: FileMetaData
+  rowStart: number
+  rowEnd?: number
+  columns?: string[] // columns to read
+  fetches: ByteRange[] // byte ranges to fetch
+  groups: GroupPlan[] // byte ranges by row group
+}
+// Plan for one group
+interface GroupPlan {
+  ranges: ByteRange[]
+  rowGroup: RowGroup // row group metadata
+  groupStart: number // row index of the first row in the group
+  selectStart: number // row index in the group to start reading
+  selectEnd: number // row index in the group to stop reading
+  groupRows: number
+}
 export interface ColumnDecoder {
   columnName: string
   type: ParquetType
@@ -407,5 +417,5 @@ export interface RowGroupSelect {
   groupStart: number // row index of the first row in the group
   selectStart: number // row index in the group to start reading
   selectEnd: number // row index in the group to stop reading
-  numRows: number
+  groupRows: number
 }
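
Note that `GroupPlan` now carries the same row bookkeeping as `RowGroupSelect` (`groupStart`, `selectStart`, `selectEnd`, `groupRows`) plus the byte ranges and row group metadata. A minimal literal for illustration, with assumed values:

```js
/** @type {GroupPlan} */
const groupPlan = {
  ranges: [{ startByte: 4, endByte: 832 }], // column chunk bytes to read
  rowGroup,       // the RowGroup metadata object from the file footer
  groupStart: 0,  // global index of the group's first row
  selectStart: 0, // group-relative read window
  selectEnd: 100,
  groupRows: 100,
}
```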

@@ -35,7 +35,7 @@ describe('readColumn', () => {
       groupStart: 0,
       selectStart: 0,
       selectEnd,
-      numRows: expected.length,
+      groupRows: expected.length,
     }
     const result = readColumn(reader, rowGroupSelect, columnDecoder)
@@ -65,7 +65,7 @@ describe('readColumn', () => {
       groupStart: 0,
       selectStart: 0,
       selectEnd: Infinity,
-      numRows: Number(column.meta_data.num_values),
+      groupRows: Number(column.meta_data.num_values),
     }
     const columnData = readColumn(reader, rowGroupSelect, columnDecoder)

@@ -8,20 +8,27 @@ describe('parquetPlan', () => {
     const file = await asyncBufferFromFile('test/files/page_indexed.parquet')
     const metadata = await parquetMetadataAsync(file)
     const plan = parquetPlan({ file, metadata })
-    expect(plan).toEqual({
-      ranges: [
+    expect(plan).toMatchObject({
+      metadata,
+      rowStart: 0,
+      rowEnd: 200,
+      fetches: [
         { startByte: 4, endByte: 1166 },
         { startByte: 1166, endByte: 2326 },
       ],
       groups: [
         {
-          plan: [
+          groupRows: 100,
+          groupStart: 0,
+          ranges: [
             { startByte: 4, endByte: 832 },
             { startByte: 832, endByte: 1166 },
           ],
         },
         {
-          plan: [
+          groupRows: 100,
+          groupStart: 100,
+          ranges: [
             { startByte: 1166, endByte: 1998 },
             { startByte: 1998, endByte: 2326 },
           ],