Parquet Query Planner: plan byte ranges, pre-fetch in parallel (#75)

* Parquet Query Planner: plan byte ranges, pre-fetch in parallel.

 - parquetPlan() returns the byte ranges needed to satisfy a read request.
 - prefetchAsyncBuffer() pre-fetches all planned byte ranges in parallel and
   throws if a slice outside the pre-fetched ranges is requested later.
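For context, a minimal usage sketch of the new planner API, using only the import paths, test file, and byte offsets that appear in this diff (applications can keep calling parquetRead, which now plans and prefetches internally):

import { parquetMetadataAsync } from '../src/hyparquet.js'
import { parquetPlan, prefetchAsyncBuffer } from '../src/plan.js'
import { asyncBufferFromFile } from '../src/utils.js'

const file = await asyncBufferFromFile('test/files/page_indexed.parquet')
const metadata = await parquetMetadataAsync(file)

// plan which byte ranges are needed (all rows and columns here)
const plan = parquetPlan({ file, metadata })
// plan.ranges -> [{ startByte: 4, endByte: 1166 }, { startByte: 1166, endByte: 2326 }]
// (offsets as asserted in test/plan.test.js below)

// start fetching every planned range in parallel, wrapped as an AsyncBuffer
const prefetched = prefetchAsyncBuffer(file, plan)
await prefetched.slice(4, 832)  // served as a sub-slice of the prefetched range [4, 1166)
await prefetched.slice(0, 4)    // throws: no prefetch for range [0, 4]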
Kenny Daniel 2025-04-30 00:49:40 -07:00 committed by GitHub
parent 1d65bc68bb
commit 0e6d7dee6f
7 changed files with 188 additions and 66 deletions

src/column.js
@@ -144,21 +144,10 @@ export function readPage(reader, header, columnDecoder, dictionary, previousChunk
}
}
/**
* Find the start byte offset for a column chunk.
*
* @param {ColumnMetaData} columnMetadata
* @returns {[bigint, bigint]} byte offset range
*/
export function getColumnRange({ dictionary_page_offset, data_page_offset, total_compressed_size }) {
const columnOffset = dictionary_page_offset || data_page_offset
return [columnOffset, columnOffset + total_compressed_size]
}
/**
* Read parquet header from a buffer.
*
* @import {ColumnData, ColumnDecoder, ColumnMetaData, DataReader, DecodedArray, PageHeader, RowGroupSelect} from '../src/types.d.ts'
* @import {ColumnData, ColumnDecoder, DataReader, DecodedArray, PageHeader, RowGroupSelect} from '../src/types.d.ts'
* @param {DataReader} reader
* @returns {PageHeader}
*/

src/plan.js Normal file (+104 lines)

@@ -0,0 +1,104 @@
import { concat } from './utils.js'
// Combine column chunks into a single byte range if less than 32mb
const columnChunkAggregation = 1 << 25 // 32mb
/**
* @import {AsyncBuffer, ByteRange, ColumnMetaData, GroupPlan, ParquetReadOptions, QueryPlan} from '../src/types.js'
*/
/**
* Plan which byte ranges to read to satisfy a read request.
* Metadata must be non-null.
*
* @param {ParquetReadOptions} options
* @returns {QueryPlan}
*/
export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns }) {
if (!metadata) throw new Error('parquetPlan requires metadata')
/** @type {GroupPlan[]} */
const groups = []
/** @type {ByteRange[]} */
const ranges = []
// find which row groups to read
let groupStart = 0 // first row index of the current group
for (const rowGroup of metadata.row_groups) {
const groupEnd = groupStart + Number(rowGroup.num_rows)
// if row group overlaps with row range, add it to the plan
if (groupEnd >= rowStart && groupStart < rowEnd) {
/** @type {ByteRange[]} */
const plan = []
// loop through each column chunk
for (const { meta_data } of rowGroup.columns) {
if (!meta_data) throw new Error('parquet column metadata is undefined')
// add included columns to the plan
if (!columns || columns.includes(meta_data.path_in_schema[0])) {
plan.push(getColumnRange(meta_data))
}
}
groups.push({ plan })
// map group plan to ranges
const groupSize = plan[plan.length - 1]?.endByte - plan[0]?.startByte
if (!columns && groupSize < columnChunkAggregation) {
// full row group
ranges.push({
startByte: plan[0].startByte,
endByte: plan[plan.length - 1].endByte,
})
} else if (plan.length) {
concat(ranges, plan)
} else if (columns?.length) {
throw new Error(`parquet columns not found: ${columns.join(', ')}`)
}
}
groupStart = groupEnd
}
return { ranges, groups }
}
/**
* @param {ColumnMetaData} columnMetadata
* @returns {ByteRange}
*/
export function getColumnRange({ dictionary_page_offset, data_page_offset, total_compressed_size }) {
const columnOffset = dictionary_page_offset || data_page_offset
return {
startByte: Number(columnOffset),
endByte: Number(columnOffset + total_compressed_size),
}
}
/**
* Prefetch byte ranges from an AsyncBuffer.
*
* @param {AsyncBuffer} file
* @param {QueryPlan} plan
* @returns {AsyncBuffer}
*/
export function prefetchAsyncBuffer(file, { ranges }) {
// fetch byte ranges from the file
const promises = ranges.map(({ startByte, endByte }) => file.slice(startByte, endByte))
return {
byteLength: file.byteLength,
slice(start, end = file.byteLength) {
// find matching slice
const index = ranges.findIndex(({ startByte, endByte }) => startByte <= start && end <= endByte)
if (index < 0) throw new Error(`no prefetch for range [${start}, ${end}]`)
if (ranges[index].startByte !== start || ranges[index].endByte !== end) {
// slice a subrange of the prefetch
const startOffset = start - ranges[index].startByte
const endOffset = end - ranges[index].startByte
if (promises[index] instanceof Promise) {
return promises[index].then(buffer => buffer.slice(startOffset, endOffset))
} else {
return promises[index].slice(startOffset, endOffset)
}
} else {
return promises[index]
}
},
}
}

src/read.js
@@ -1,6 +1,7 @@
import { assembleNested } from './assemble.js'
import { getColumnRange, readColumn } from './column.js'
import { readColumn } from './column.js'
import { parquetMetadataAsync } from './metadata.js'
import { getColumnRange, parquetPlan, prefetchAsyncBuffer } from './plan.js'
import { getSchemaPath } from './schema.js'
import { concat } from './utils.js'
@@ -22,18 +23,20 @@ export async function parquetRead(options) {
if (!options.file || !(options.file.byteLength >= 0)) {
throw new Error('parquetRead expected file AsyncBuffer')
}
const rowStart = options.rowStart || 0
if (rowStart < 0) throw new Error('parquetRead rowStart must be positive')
// load metadata if not provided
options.metadata ||= await parquetMetadataAsync(options.file)
if (!options.metadata) throw new Error('parquet metadata not found')
options.metadata ??= await parquetMetadataAsync(options.file)
const { metadata, onComplete, rowStart = 0, rowEnd } = options
if (rowStart < 0) throw new Error('parquetRead rowStart must be positive')
// prefetch byte ranges
const plan = parquetPlan(options)
options.file = prefetchAsyncBuffer(options.file, plan)
const { metadata, onComplete, rowEnd } = options
/** @type {any[][]} */
const rowData = []
// find which row groups to read
// read row groups
let groupStart = 0 // first row index of the current group
for (const rowGroup of metadata.row_groups) {
// number of rows in this row group
@@ -64,37 +67,16 @@ export async function readRowGroup(options, rowGroup, groupStart) {
* @returns {Promise<any[][]>} resolves to row data
*/
export async function readRowGroup(options, rowGroup, groupStart) {
const { file, metadata, columns, rowStart, rowEnd } = options
const { file, metadata, columns, rowStart = 0, rowEnd } = options
if (!metadata) throw new Error('parquet metadata not found')
const numRows = Number(rowGroup.num_rows)
// index within the group to start and stop reading:
const selectStart = Math.max((rowStart || 0) - groupStart, 0)
// indexes within the group to read:
const selectStart = Math.max(rowStart - groupStart, 0)
const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, numRows)
/** @type {RowGroupSelect} */
const rowGroupSelect = { groupStart, selectStart, selectEnd, numRows }
// loop through metadata to find min/max bytes to read
let [groupStartByte, groupEndByte] = [file.byteLength, 0]
for (const { meta_data } of rowGroup.columns) {
if (!meta_data) throw new Error('parquet column metadata is undefined')
// skip columns that are not requested
if (columns && !columns.includes(meta_data.path_in_schema[0])) continue
const [columnStartByte, columnEndByte] = getColumnRange(meta_data).map(Number)
groupStartByte = Math.min(groupStartByte, columnStartByte)
groupEndByte = Math.max(groupEndByte, columnEndByte)
}
if (groupStartByte >= groupEndByte && columns?.length) {
throw new Error(`parquet columns not found: ${columns.join(', ')}`)
}
// if row group size is less than 32mb, pre-load in one read
let groupBuffer
if (groupEndByte - groupStartByte <= 1 << 25) {
// pre-load row group byte data in one big read,
// otherwise read column data individually
groupBuffer = await file.slice(groupStartByte, groupEndByte)
}
/** @type {Promise<void>[]} */
const promises = []
// top-level columns to assemble
const { children } = getSchemaPath(metadata.schema, [])[0]
@@ -110,33 +92,25 @@ export async function readRowGroup(options, rowGroup, groupStart) {
const columnName = columnMetadata.path_in_schema[0]
if (columns && !columns.includes(columnName)) continue
const [columnStartByte, columnEndByte] = getColumnRange(columnMetadata).map(Number)
const columnBytes = columnEndByte - columnStartByte
const { startByte, endByte } = getColumnRange(columnMetadata)
const columnBytes = endByte - startByte
// skip columns larger than 1gb
// TODO: stream process the data, returning only the requested rows
if (columnBytes > 1 << 30) {
console.warn(`parquet skipping huge column "${columnMetadata.path_in_schema}" ${columnBytes.toLocaleString()} bytes`)
console.warn(`parquet skipping huge column "${columnMetadata.path_in_schema}" ${columnBytes} bytes`)
// TODO: set column to new Error('parquet column too large')
continue
}
// use pre-loaded row group byte data if available, else read column data
// wrap awaitable to ensure it's a promise
/** @type {Promise<ArrayBuffer>} */
let buffer
let bufferOffset = 0
if (groupBuffer) {
buffer = Promise.resolve(groupBuffer)
bufferOffset = columnStartByte - groupStartByte
} else {
// wrap awaitable to ensure it's a promise
buffer = Promise.resolve(file.slice(columnStartByte, columnEndByte))
}
const buffer = Promise.resolve(file.slice(startByte, endByte))
// read column data async
promises.push(buffer.then(arrayBuffer => {
const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
const reader = { view: new DataView(arrayBuffer), offset: bufferOffset }
const reader = { view: new DataView(arrayBuffer), offset: 0 }
const columnDecoder = {
columnName: columnMetadata.path_in_schema.join('.'),
type: columnMetadata.type,

src/types.d.ts vendored (17 changed lines)

@@ -42,7 +42,7 @@ export interface ColumnData {
columnName: string
columnData: DecodedArray
rowStart: number
rowEnd: number
rowEnd: number // exclusive
}
/**
@@ -53,6 +53,21 @@ export interface AsyncBuffer {
slice(start: number, end?: number): Awaitable<ArrayBuffer>
}
export type Awaitable<T> = T | Promise<T>
export interface ByteRange {
startByte: number
endByte: number // exclusive
}
/**
* Query plan for which byte ranges to read.
*/
export interface QueryPlan {
ranges: ByteRange[] // byte ranges to fetch
groups: GroupPlan[] // byte ranges by row group
}
interface GroupPlan {
plan: ByteRange[]
}
export interface DataReader {
view: DataView

@@ -1,6 +1,7 @@
import { describe, expect, it } from 'vitest'
import { getColumnRange, readColumn } from '../src/column.js'
import { readColumn } from '../src/column.js'
import { parquetMetadata } from '../src/hyparquet.js'
import { getColumnRange } from '../src/plan.js'
import { getSchemaPath } from '../src/schema.js'
import { asyncBufferFromFile } from '../src/utils.js'
@@ -19,8 +20,8 @@ describe('readColumn', () => {
const column = metadata.row_groups[0].columns[0]
if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
const { startByte, endByte } = getColumnRange(column.meta_data)
const columnArrayBuffer = arrayBuffer.slice(startByte, endByte)
const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
const columnDecoder = {
@@ -49,8 +50,8 @@ describe('readColumn', () => {
const column = metadata.row_groups[0].columns[1] // second column
if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`)
const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number)
const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
const { startByte, endByte } = getColumnRange(column.meta_data)
const columnArrayBuffer = arrayBuffer.slice(startByte, endByte)
const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
const columnDecoder = {

test/plan.test.js Normal file (+32 lines)

@@ -0,0 +1,32 @@
import { describe, expect, it } from 'vitest'
import { parquetMetadataAsync } from '../src/hyparquet.js'
import { asyncBufferFromFile } from '../src/utils.js'
import { parquetPlan } from '../src/plan.js'
describe('parquetPlan', () => {
it('generates a query plan', async () => {
const file = await asyncBufferFromFile('test/files/page_indexed.parquet')
const metadata = await parquetMetadataAsync(file)
const plan = parquetPlan({ file, metadata })
expect(plan).toEqual({
ranges: [
{ startByte: 4, endByte: 1166 },
{ startByte: 1166, endByte: 2326 },
],
groups: [
{
plan: [
{ startByte: 4, endByte: 832 },
{ startByte: 832, endByte: 1166 },
],
},
{
plan: [
{ startByte: 1166, endByte: 1998 },
{ startByte: 1998, endByte: 2326 },
],
},
],
})
})
})

@@ -187,6 +187,7 @@ describe('parquetRead', () => {
/** @type {ColumnData[]} */
const pages = []
// check onPage callback
await parquetRead({
file,
onPage(page) {
@@ -194,7 +195,7 @@ describe('parquetRead', () => {
},
})
expect(pages).toEqual([
const expectedPages = [
{
columnName: 'row',
columnData: Array.from({ length: 100 }, (_, i) => BigInt(i)),
@@ -241,7 +242,13 @@ describe('parquetRead', () => {
rowStart: 100,
rowEnd: 200,
},
])
]
// each expected page should be present (page order may vary)
for (const expected of expectedPages) {
const page = pages.find(p => p.columnName === expected.columnName && p.rowStart === expected.rowStart)
expect(page).toEqual(expected)
}
expect(file.fetches).toBe(3) // 1 metadata, 2 rowgroups
})
})