From 9a9519f0b7f67319d250af146b5f03160fb0c479 Mon Sep 17 00:00:00 2001
From: Kenny Daniel
Date: Sun, 25 May 2025 15:21:58 -0700
Subject: [PATCH] Add more details to QueryPlan. (#82)

- Add metadata
- Add rowStart and rowEnd
- Add columns
- Add groupStart, selectStart, selectEnd, and groupRows to GroupPlan
- Rename ranges to fetches
- Rename numRows to groupRows in ColumnDecoder
---
 src/plan.js         | 40 ++++++++++++++++++++++------------------
 src/read.js         |  7 +++----
 src/rowgroup.js     |  6 +++---
 src/types.d.ts      | 34 ++++++++++++++++++++++------------
 test/column.test.js |  4 ++--
 test/plan.test.js   | 15 +++++++++++----
 6 files changed, 63 insertions(+), 43 deletions(-)

diff --git a/src/plan.js b/src/plan.js
index 11f7d84..a7ca7c8 100644
--- a/src/plan.js
+++ b/src/plan.js
@@ -18,37 +18,40 @@ export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns
   /** @type {GroupPlan[]} */
   const groups = []
   /** @type {ByteRange[]} */
-  const ranges = []
+  const fetches = []

   // find which row groups to read
   let groupStart = 0 // first row index of the current group
   for (const rowGroup of metadata.row_groups) {
-    const groupEnd = groupStart + Number(rowGroup.num_rows)
+    const groupRows = Number(rowGroup.num_rows)
+    const groupEnd = groupStart + groupRows
     // if row group overlaps with row range, add it to the plan
     if (groupEnd >= rowStart && groupStart < rowEnd) {
       /** @type {ByteRange[]} */
-      const plan = []
+      const ranges = []
       // loop through each column chunk
       for (const { file_path, meta_data } of rowGroup.columns) {
         if (file_path) throw new Error('parquet file_path not supported')
         if (!meta_data) throw new Error('parquet column metadata is undefined')

         // add included columns to the plan
         if (!columns || columns.includes(meta_data.path_in_schema[0])) {
-          plan.push(getColumnRange(meta_data))
+          ranges.push(getColumnRange(meta_data))
         }
       }
-      groups.push({ plan })
+      const selectStart = Math.max(rowStart - groupStart, 0)
+      const selectEnd = Math.min(rowEnd - groupStart, groupRows)
+      groups.push({ ranges, rowGroup, groupStart, groupRows, selectStart, selectEnd })

       // map group plan to ranges
-      const groupSize = plan[plan.length - 1]?.endByte - plan[0]?.startByte
+      const groupSize = ranges[ranges.length - 1]?.endByte - ranges[0]?.startByte
       if (!columns && groupSize < columnChunkAggregation) {
         // full row group
-        ranges.push({
-          startByte: plan[0].startByte,
-          endByte: plan[plan.length - 1].endByte,
+        fetches.push({
+          startByte: ranges[0].startByte,
+          endByte: ranges[ranges.length - 1].endByte,
         })
-      } else if (plan.length) {
-        concat(ranges, plan)
+      } else if (ranges.length) {
+        concat(fetches, ranges)
       } else if (columns?.length) {
         throw new Error(`parquet columns not found: ${columns.join(', ')}`)
       }
@@ -56,8 +59,9 @@ export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns
     groupStart = groupEnd
   }
+  if (!isFinite(rowEnd)) rowEnd = groupStart

-  return { ranges, groups }
+  return { metadata, rowStart, rowEnd, columns, fetches, groups }
 }

 /**
@@ -79,19 +83,19 @@ export function getColumnRange({ dictionary_page_offset, data_page_offset, total
  * @param {QueryPlan} plan
  * @returns {AsyncBuffer}
  */
-export function prefetchAsyncBuffer(file, { ranges }) {
+export function prefetchAsyncBuffer(file, { fetches }) {
   // fetch byte ranges from the file
-  const promises = ranges.map(({ startByte, endByte }) => file.slice(startByte, endByte))
+  const promises = fetches.map(({ startByte, endByte }) => file.slice(startByte, endByte))
   return {
     byteLength: file.byteLength,
     slice(start, end = file.byteLength) {
       // find matching slice
-      const index = ranges.findIndex(({ startByte, endByte }) => startByte <= start && end <= endByte)
+      const index = fetches.findIndex(({ startByte, endByte }) => startByte <= start && end <= endByte)
       if (index < 0) throw new Error(`no prefetch for range [${start}, ${end}]`)
-      if (ranges[index].startByte !== start || ranges[index].endByte !== end) {
+      if (fetches[index].startByte !== start || fetches[index].endByte !== end) {
         // slice a subrange of the prefetch
-        const startOffset = start - ranges[index].startByte
-        const endOffset = end - ranges[index].startByte
+        const startOffset = start - fetches[index].startByte
+        const endOffset = end - fetches[index].startByte
         if (promises[index] instanceof Promise) {
           return promises[index].then(buffer => buffer.slice(startOffset, endOffset))
         } else {
diff --git a/src/read.js b/src/read.js
index 477e868..beb31af 100644
--- a/src/read.js
+++ b/src/read.js
@@ -3,6 +3,9 @@ import { parquetPlan, prefetchAsyncBuffer } from './plan.js'
 import { readRowGroup } from './rowgroup.js'
 import { concat } from './utils.js'

+/**
+ * @import {ParquetReadOptions} from '../src/types.d.ts'
+ */
 /**
  * Read parquet data rows from a file-like object.
  * Reads the minimal number of row groups and columns to satisfy the request.
@@ -53,7 +56,3 @@ export async function parquetRead(options) {

   if (onComplete) onComplete(rowData)
 }
-
-/**
- * @import {DecodedArray, ParquetReadOptions, RowGroup, RowGroupSelect, SchemaTree} from '../src/types.d.ts'
- */
diff --git a/src/rowgroup.js b/src/rowgroup.js
index 08bad5e..9f2580c 100644
--- a/src/rowgroup.js
+++ b/src/rowgroup.js
@@ -15,12 +15,12 @@ import { flatten } from './utils.js'
 export async function readRowGroup(options, rowGroup, groupStart) {
   const { file, metadata, columns, rowStart = 0, rowEnd } = options
   if (!metadata) throw new Error('parquet metadata not found')
-  const numRows = Number(rowGroup.num_rows)
+  const groupRows = Number(rowGroup.num_rows)
   // indexes within the group to read:
   const selectStart = Math.max(rowStart - groupStart, 0)
-  const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, numRows)
+  const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, groupRows)
   /** @type {RowGroupSelect} */
-  const rowGroupSelect = { groupStart, selectStart, selectEnd, numRows }
+  const rowGroupSelect = { groupStart, selectStart, selectEnd, groupRows }

   /** @type {Promise[]} */
   const promises = []
diff --git a/src/types.d.ts b/src/types.d.ts
index 10b12ef..75c4ab7 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -58,17 +58,6 @@ export interface ByteRange {
   endByte: number // exclusive
 }

-/**
- * Query plan for which byte ranges to read.
- */
-export interface QueryPlan {
-  ranges: ByteRange[] // byte ranges to fetch
-  groups: GroupPlan[] // byte ranges by row group
-}
-interface GroupPlan {
-  plan: ByteRange[]
-}
-
 export interface DataReader {
   view: DataView
   offset: number
@@ -393,6 +382,27 @@ export type BoundaryOrder = 'UNORDERED' | 'ASCENDING' | 'DESCENDING'
 export type ThriftObject = { [ key: `field_${number}` ]: ThriftType }
 export type ThriftType = boolean | number | bigint | Uint8Array | ThriftType[] | ThriftObject

+/**
+ * Query plan for which byte ranges to read.
+ */
+export interface QueryPlan {
+  metadata: FileMetaData
+  rowStart: number
+  rowEnd?: number
+  columns?: string[] // columns to read
+  fetches: ByteRange[] // byte ranges to fetch
+  groups: GroupPlan[] // byte ranges by row group
+}
+// Plan for one group
+interface GroupPlan {
+  ranges: ByteRange[]
+  rowGroup: RowGroup // row group metadata
+  groupStart: number // row index of the first row in the group
+  selectStart: number // row index in the group to start reading
+  selectEnd: number // row index in the group to stop reading
+  groupRows: number
+}
+
 export interface ColumnDecoder {
   columnName: string
   type: ParquetType
@@ -407,5 +417,5 @@ export interface RowGroupSelect {
   groupStart: number // row index of the first row in the group
   selectStart: number // row index in the group to start reading
   selectEnd: number // row index in the group to stop reading
-  numRows: number
+  groupRows: number
 }
diff --git a/test/column.test.js b/test/column.test.js
index bc8b567..47800c4 100644
--- a/test/column.test.js
+++ b/test/column.test.js
@@ -35,7 +35,7 @@ describe('readColumn', () => {
       groupStart: 0,
       selectStart: 0,
       selectEnd,
-      numRows: expected.length,
+      groupRows: expected.length,
     }

     const result = readColumn(reader, rowGroupSelect, columnDecoder)
@@ -65,7 +65,7 @@ describe('readColumn', () => {
       groupStart: 0,
       selectStart: 0,
       selectEnd: Infinity,
-      numRows: Number(column.meta_data.num_values),
+      groupRows: Number(column.meta_data.num_values),
     }

     const columnData = readColumn(reader, rowGroupSelect, columnDecoder)
diff --git a/test/plan.test.js b/test/plan.test.js
index 1bfdb5e..d7be474 100644
--- a/test/plan.test.js
+++ b/test/plan.test.js
@@ -8,20 +8,27 @@ describe('parquetPlan', () => {
     const file = await asyncBufferFromFile('test/files/page_indexed.parquet')
     const metadata = await parquetMetadataAsync(file)
     const plan = parquetPlan({ file, metadata })
-    expect(plan).toEqual({
-      ranges: [
+    expect(plan).toMatchObject({
+      metadata,
+      rowStart: 0,
+      rowEnd: 200,
+      fetches: [
         { startByte: 4, endByte: 1166 },
         { startByte: 1166, endByte: 2326 },
       ],
       groups: [
         {
-          plan: [
+          groupRows: 100,
+          groupStart: 0,
+          ranges: [
             { startByte: 4, endByte: 832 },
             { startByte: 832, endByte: 1166 },
           ],
         },
         {
-          plan: [
+          groupRows: 100,
+          groupStart: 100,
+          ranges: [
             { startByte: 1166, endByte: 1998 },
             { startByte: 1998, endByte: 2326 },
           ],
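
A minimal sketch of how the reshaped QueryPlan returned by parquetPlan might be consumed, mirroring the fields exercised in test/plan.test.js; the import paths, the rowEnd value, and the reuse of the page_indexed.parquet fixture are illustrative assumptions, not part of this patch:

import { asyncBufferFromFile, parquetMetadataAsync } from 'hyparquet'
import { parquetPlan, prefetchAsyncBuffer } from './src/plan.js'

// Plan which byte ranges are needed to read rows [0, 150) across all columns.
// (rowEnd: 150 is an arbitrary example value.)
const file = await asyncBufferFromFile('test/files/page_indexed.parquet')
const metadata = await parquetMetadataAsync(file)
const plan = parquetPlan({ metadata, rowStart: 0, rowEnd: 150 })

// The plan now echoes metadata, rowStart, rowEnd, and columns, and exposes
// fetches (byte ranges to fetch) plus one GroupPlan per overlapping row group.
console.log(plan.fetches)
for (const { groupStart, groupRows, selectStart, selectEnd, ranges } of plan.groups) {
  // groupStart/groupRows locate the group; selectStart/selectEnd are the row
  // indexes within the group to start and stop reading.
  console.log(groupStart, groupRows, selectStart, selectEnd, ranges.length)
}

// Prefetch all planned byte ranges; later slice() calls are served from those fetches.
const prefetched = prefetchAsyncBuffer(file, plan)
const firstChunk = await prefetched.slice(plan.fetches[0].startByte, plan.fetches[0].endByte)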