diff --git a/src/column.js b/src/column.js index e50f8df..7174944 100644 --- a/src/column.js +++ b/src/column.js @@ -144,21 +144,10 @@ export function readPage(reader, header, columnDecoder, dictionary, previousChun } } -/** - * Find the start byte offset for a column chunk. - * - * @param {ColumnMetaData} columnMetadata - * @returns {[bigint, bigint]} byte offset range - */ -export function getColumnRange({ dictionary_page_offset, data_page_offset, total_compressed_size }) { - const columnOffset = dictionary_page_offset || data_page_offset - return [columnOffset, columnOffset + total_compressed_size] -} - /** * Read parquet header from a buffer. * - * @import {ColumnData, ColumnDecoder, ColumnMetaData, DataReader, DecodedArray, PageHeader, RowGroupSelect} from '../src/types.d.ts' + * @import {ColumnData, ColumnDecoder, DataReader, DecodedArray, PageHeader, RowGroupSelect} from '../src/types.d.ts' * @param {DataReader} reader * @returns {PageHeader} */ diff --git a/src/plan.js b/src/plan.js new file mode 100644 index 0000000..7c6e572 --- /dev/null +++ b/src/plan.js @@ -0,0 +1,104 @@ +import { concat } from './utils.js' + +// Combine column chunks into a single byte range if less than 32mb +const columnChunkAggregation = 1 << 25 // 32mb + +/** + * @import {AsyncBuffer, ByteRange, ColumnMetaData, GroupPlan, ParquetReadOptions, QueryPlan} from '../src/types.js' + */ +/** + * Plan which byte ranges to read to satisfy a read request. + * Metadata must be non-null. + * + * @param {ParquetReadOptions} options + * @returns {QueryPlan} + */ +export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns }) { + if (!metadata) throw new Error('parquetPlan requires metadata') + /** @type {GroupPlan[]} */ + const groups = [] + /** @type {ByteRange[]} */ + const ranges = [] + + // find which row groups to read + let groupStart = 0 // first row index of the current group + for (const rowGroup of metadata.row_groups) { + const groupEnd = groupStart + Number(rowGroup.num_rows) + // if row group overlaps with row range, add it to the plan + if (groupEnd >= rowStart && groupStart < rowEnd) { + /** @type {ByteRange[]} */ + const plan = [] + // loop through each column chunk + for (const { meta_data } of rowGroup.columns) { + if (!meta_data) throw new Error('parquet column metadata is undefined') + // add included columns to the plan + if (!columns || columns.includes(meta_data.path_in_schema[0])) { + plan.push(getColumnRange(meta_data)) + } + } + groups.push({ plan }) + + // map group plan to ranges + const groupSize = plan[plan.length - 1]?.endByte - plan[0]?.startByte + if (!columns && groupSize < columnChunkAggregation) { + // full row group + ranges.push({ + startByte: plan[0].startByte, + endByte: plan[plan.length - 1].endByte, + }) + } else if (plan.length) { + concat(ranges, plan) + } else if (columns?.length) { + throw new Error(`parquet columns not found: ${columns.join(', ')}`) + } + } + + groupStart = groupEnd + } + + return { ranges, groups } +} + +/** + * @param {ColumnMetaData} columnMetadata + * @returns {ByteRange} + */ +export function getColumnRange({ dictionary_page_offset, data_page_offset, total_compressed_size }) { + const columnOffset = dictionary_page_offset || data_page_offset + return { + startByte: Number(columnOffset), + endByte: Number(columnOffset + total_compressed_size), + } +} + +/** + * Prefetch byte ranges from an AsyncBuffer. 
+ * + * @param {AsyncBuffer} file + * @param {QueryPlan} plan + * @returns {AsyncBuffer} + */ +export function prefetchAsyncBuffer(file, { ranges }) { + // fetch byte ranges from the file + const promises = ranges.map(({ startByte, endByte }) => file.slice(startByte, endByte)) + return { + byteLength: file.byteLength, + slice(start, end = file.byteLength) { + // find matching slice + const index = ranges.findIndex(({ startByte, endByte }) => startByte <= start && end <= endByte) + if (index < 0) throw new Error(`no prefetch for range [${start}, ${end}]`) + if (ranges[index].startByte !== start || ranges[index].endByte !== end) { + // slice a subrange of the prefetch + const startOffset = start - ranges[index].startByte + const endOffset = end - ranges[index].startByte + if (promises[index] instanceof Promise) { + return promises[index].then(buffer => buffer.slice(startOffset, endOffset)) + } else { + return promises[index].slice(startOffset, endOffset) + } + } else { + return promises[index] + } + }, + } +} diff --git a/src/read.js b/src/read.js index 533e274..fdb53e2 100644 --- a/src/read.js +++ b/src/read.js @@ -1,6 +1,7 @@ import { assembleNested } from './assemble.js' -import { getColumnRange, readColumn } from './column.js' +import { readColumn } from './column.js' import { parquetMetadataAsync } from './metadata.js' +import { getColumnRange, parquetPlan, prefetchAsyncBuffer } from './plan.js' import { getSchemaPath } from './schema.js' import { concat } from './utils.js' @@ -22,18 +23,20 @@ export async function parquetRead(options) { if (!options.file || !(options.file.byteLength >= 0)) { throw new Error('parquetRead expected file AsyncBuffer') } - const rowStart = options.rowStart || 0 - if (rowStart < 0) throw new Error('parquetRead rowStart must be postive') // load metadata if not provided - options.metadata ||= await parquetMetadataAsync(options.file) - if (!options.metadata) throw new Error('parquet metadata not found') + options.metadata ??= await parquetMetadataAsync(options.file) + const { metadata, onComplete, rowStart = 0, rowEnd } = options + if (rowStart < 0) throw new Error('parquetRead rowStart must be postive') + + // prefetch byte ranges + const plan = parquetPlan(options) + options.file = prefetchAsyncBuffer(options.file, plan) - const { metadata, onComplete, rowEnd } = options /** @type {any[][]} */ const rowData = [] - // find which row groups to read + // read row groups let groupStart = 0 // first row index of the current group for (const rowGroup of metadata.row_groups) { // number of rows in this row group @@ -64,37 +67,16 @@ export async function parquetRead(options) { * @returns {Promise} resolves to row data */ export async function readRowGroup(options, rowGroup, groupStart) { - const { file, metadata, columns, rowStart, rowEnd } = options + const { file, metadata, columns, rowStart = 0, rowEnd } = options if (!metadata) throw new Error('parquet metadata not found') const numRows = Number(rowGroup.num_rows) - // index within the group to start and stop reading: - const selectStart = Math.max((rowStart || 0) - groupStart, 0) + // indexes within the group to read: + const selectStart = Math.max(rowStart - groupStart, 0) const selectEnd = Math.min((rowEnd ?? 
Infinity) - groupStart, numRows) /** @type {RowGroupSelect} */ const rowGroupSelect = { groupStart, selectStart, selectEnd, numRows } - // loop through metadata to find min/max bytes to read - let [groupStartByte, groupEndByte] = [file.byteLength, 0] - for (const { meta_data } of rowGroup.columns) { - if (!meta_data) throw new Error('parquet column metadata is undefined') - // skip columns that are not requested - if (columns && !columns.includes(meta_data.path_in_schema[0])) continue - - const [columnStartByte, columnEndByte] = getColumnRange(meta_data).map(Number) - groupStartByte = Math.min(groupStartByte, columnStartByte) - groupEndByte = Math.max(groupEndByte, columnEndByte) - } - if (groupStartByte >= groupEndByte && columns?.length) { - throw new Error(`parquet columns not found: ${columns.join(', ')}`) - } - // if row group size is less than 32mb, pre-load in one read - let groupBuffer - if (groupEndByte - groupStartByte <= 1 << 25) { - // pre-load row group byte data in one big read, - // otherwise read column data individually - groupBuffer = await file.slice(groupStartByte, groupEndByte) - } - + /** @type {Promise[]} */ const promises = [] // top-level columns to assemble const { children } = getSchemaPath(metadata.schema, [])[0] @@ -110,33 +92,25 @@ export async function readRowGroup(options, rowGroup, groupStart) { const columnName = columnMetadata.path_in_schema[0] if (columns && !columns.includes(columnName)) continue - const [columnStartByte, columnEndByte] = getColumnRange(columnMetadata).map(Number) - const columnBytes = columnEndByte - columnStartByte + const { startByte, endByte } = getColumnRange(columnMetadata) + const columnBytes = endByte - startByte // skip columns larger than 1gb // TODO: stream process the data, returning only the requested rows if (columnBytes > 1 << 30) { - console.warn(`parquet skipping huge column "${columnMetadata.path_in_schema}" ${columnBytes.toLocaleString()} bytes`) + console.warn(`parquet skipping huge column "${columnMetadata.path_in_schema}" ${columnBytes} bytes`) // TODO: set column to new Error('parquet column too large') continue } - // use pre-loaded row group byte data if available, else read column data + // wrap awaitable to ensure it's a promise /** @type {Promise} */ - let buffer - let bufferOffset = 0 - if (groupBuffer) { - buffer = Promise.resolve(groupBuffer) - bufferOffset = columnStartByte - groupStartByte - } else { - // wrap awaitable to ensure it's a promise - buffer = Promise.resolve(file.slice(columnStartByte, columnEndByte)) - } + const buffer = Promise.resolve(file.slice(startByte, endByte)) // read column data async promises.push(buffer.then(arrayBuffer => { const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema) - const reader = { view: new DataView(arrayBuffer), offset: bufferOffset } + const reader = { view: new DataView(arrayBuffer), offset: 0 } const columnDecoder = { columnName: columnMetadata.path_in_schema.join('.'), type: columnMetadata.type, diff --git a/src/types.d.ts b/src/types.d.ts index f5e44db..68da9b6 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -42,7 +42,7 @@ export interface ColumnData { columnName: string columnData: DecodedArray rowStart: number - rowEnd: number + rowEnd: number // exclusive } /** @@ -53,6 +53,21 @@ export interface AsyncBuffer { slice(start: number, end?: number): Awaitable } export type Awaitable = T | Promise +export interface ByteRange { + startByte: number + endByte: number // exclusive +} + +/** + * Query plan for which byte ranges to 
read. + */ +export interface QueryPlan { + ranges: ByteRange[] // byte ranges to fetch + groups: GroupPlan[] // byte ranges by row group +} +interface GroupPlan { + plan: ByteRange[] +} export interface DataReader { view: DataView diff --git a/test/column.test.js b/test/column.test.js index de86d88..bc8b567 100644 --- a/test/column.test.js +++ b/test/column.test.js @@ -1,6 +1,7 @@ import { describe, expect, it } from 'vitest' -import { getColumnRange, readColumn } from '../src/column.js' +import { readColumn } from '../src/column.js' import { parquetMetadata } from '../src/hyparquet.js' +import { getColumnRange } from '../src/plan.js' import { getSchemaPath } from '../src/schema.js' import { asyncBufferFromFile } from '../src/utils.js' @@ -19,8 +20,8 @@ describe('readColumn', () => { const column = metadata.row_groups[0].columns[0] if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`) - const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number) - const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte) + const { startByte, endByte } = getColumnRange(column.meta_data) + const columnArrayBuffer = arrayBuffer.slice(startByte, endByte) const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? []) const reader = { view: new DataView(columnArrayBuffer), offset: 0 } const columnDecoder = { @@ -49,8 +50,8 @@ describe('readColumn', () => { const column = metadata.row_groups[0].columns[1] // second column if (!column.meta_data) throw new Error(`No column metadata for ${testFile}`) - const [columnStartByte, columnEndByte] = getColumnRange(column.meta_data).map(Number) - const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte) + const { startByte, endByte } = getColumnRange(column.meta_data) + const columnArrayBuffer = arrayBuffer.slice(startByte, endByte) const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? 
[]) const reader = { view: new DataView(columnArrayBuffer), offset: 0 } const columnDecoder = { diff --git a/test/plan.test.js b/test/plan.test.js new file mode 100644 index 0000000..1bfdb5e --- /dev/null +++ b/test/plan.test.js @@ -0,0 +1,32 @@ +import { describe, expect, it } from 'vitest' +import { parquetMetadataAsync } from '../src/hyparquet.js' +import { asyncBufferFromFile } from '../src/utils.js' +import { parquetPlan } from '../src/plan.js' + +describe('parquetPlan', () => { + it('generates a query plan', async () => { + const file = await asyncBufferFromFile('test/files/page_indexed.parquet') + const metadata = await parquetMetadataAsync(file) + const plan = parquetPlan({ file, metadata }) + expect(plan).toEqual({ + ranges: [ + { startByte: 4, endByte: 1166 }, + { startByte: 1166, endByte: 2326 }, + ], + groups: [ + { + plan: [ + { startByte: 4, endByte: 832 }, + { startByte: 832, endByte: 1166 }, + ], + }, + { + plan: [ + { startByte: 1166, endByte: 1998 }, + { startByte: 1998, endByte: 2326 }, + ], + }, + ], + }) + }) +}) diff --git a/test/read.test.js b/test/read.test.js index 5540102..212a34b 100644 --- a/test/read.test.js +++ b/test/read.test.js @@ -187,6 +187,7 @@ describe('parquetRead', () => { /** @type {ColumnData[]} */ const pages = [] + // check onPage callback await parquetRead({ file, onPage(page) { @@ -194,7 +195,7 @@ describe('parquetRead', () => { }, }) - expect(pages).toEqual([ + const expectedPages = [ { columnName: 'row', columnData: Array.from({ length: 100 }, (_, i) => BigInt(i)), @@ -241,7 +242,13 @@ describe('parquetRead', () => { rowStart: 100, rowEnd: 200, }, - ]) + ] + + // expect each page to exist in expected + for (const expected of expectedPages) { + const page = pages.find(p => p.columnName === expected.columnName && p.rowStart === expected.rowStart) + expect(page).toEqual(expected) + } expect(file.fetches).toBe(3) // 1 metadata, 2 rowgroups }) })
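Note: a minimal usage sketch of the planning/prefetch flow introduced in src/plan.js above. The file path, row range, and import paths (mirroring the test files) are illustrative assumptions, not part of the diff; only parquetMetadataAsync, parquetPlan, prefetchAsyncBuffer, and asyncBufferFromFile from the patched sources are used.

import { parquetMetadataAsync } from '../src/hyparquet.js'
import { parquetPlan, prefetchAsyncBuffer } from '../src/plan.js'
import { asyncBufferFromFile } from '../src/utils.js'

// illustrative file path (assumption); any AsyncBuffer works here
const file = await asyncBufferFromFile('test/files/page_indexed.parquet')
const metadata = await parquetMetadataAsync(file)

// plan which byte ranges are needed for the requested rows and columns
const plan = parquetPlan({ metadata, rowStart: 0, rowEnd: 100 })

// start fetching all planned ranges up front; later slice() calls
// resolve against the in-flight prefetch promises
const prefetched = prefetchAsyncBuffer(file, plan)
const { startByte, endByte } = plan.ranges[0]
const buffer = await prefetched.slice(startByte, endByte)

As the new code comments, a whole row group is coalesced into a single range only when no column projection is given and the group spans less than 32 MiB (1 << 25 bytes); otherwise each column chunk is fetched as its own range.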