From 4e2f76df09f7375a110c889078e582e82b9ebb49 Mon Sep 17 00:00:00 2001
From: Kenny Daniel
Date: Mon, 26 May 2025 17:27:15 -0700
Subject: [PATCH] parquetReadAsync (#83)

---
 src/hyparquet.js |   2 +-
 src/query.js     |   5 +-
 src/read.js      | 124 ++++++++++++++++++++------
 src/rowgroup.js  | 224 ++++++++++++++++++++++++-----------------------
 src/types.d.ts   |  10 +++
 5 files changed, 224 insertions(+), 141 deletions(-)

diff --git a/src/hyparquet.js b/src/hyparquet.js
index ea5f2b8..893445a 100644
--- a/src/hyparquet.js
+++ b/src/hyparquet.js
@@ -4,7 +4,7 @@ export { parquetMetadata, parquetMetadataAsync, parquetSchema } from './metadata
 export { parquetRead }
 export { parquetQuery } from './query.js'
 export { snappyUncompress } from './snappy.js'
-export { asyncBufferFromFile, asyncBufferFromUrl, byteLengthFromUrl, cachedAsyncBuffer, toJson } from './utils.js'
+export { asyncBufferFromFile, asyncBufferFromUrl, byteLengthFromUrl, cachedAsyncBuffer, flatten, toJson } from './utils.js'
 
 /**
  * This is a helper function to read parquet row data as a promise.
diff --git a/src/query.js b/src/query.js
index 846c4cc..f40d495 100644
--- a/src/query.js
+++ b/src/query.js
@@ -1,5 +1,6 @@
 import { parquetReadObjects } from './hyparquet.js'
 import { parquetMetadataAsync } from './metadata.js'
+import { parquetReadColumn } from './read.js'
 import { equals } from './utils.js'
 
 /**
@@ -45,11 +46,11 @@ export async function parquetQuery(options) {
     return results.slice(rowStart, rowEnd)
   } else if (typeof orderBy === 'string') {
     // sorted but unfiltered: fetch orderBy column first
-    const orderColumn = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] })
+    const orderColumn = await parquetReadColumn({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] })
 
     // compute row groups to fetch
     const sortedIndices = Array.from(orderColumn, (_, index) => index)
-      .sort((a, b) => compare(orderColumn[a][orderBy], orderColumn[b][orderBy]))
+      .sort((a, b) => compare(orderColumn[a], orderColumn[b]))
       .slice(rowStart, rowEnd)
     const sparseData = await parquetReadRows({ ...options, rows: sortedIndices })
 
diff --git a/src/read.js b/src/read.js
index 1bbefd7..ae017c6 100644
--- a/src/read.js
+++ b/src/read.js
@@ -1,10 +1,10 @@
-import { parquetMetadataAsync } from './metadata.js'
+import { parquetMetadataAsync, parquetSchema } from './metadata.js'
 import { parquetPlan, prefetchAsyncBuffer } from './plan.js'
-import { readRowGroup } from './rowgroup.js'
-import { concat } from './utils.js'
+import { assembleAsync, asyncGroupToRows, readRowGroup } from './rowgroup.js'
+import { concat, flatten } from './utils.js'
 
 /**
- * @import {ParquetReadOptions} from '../src/types.d.ts'
+ * @import {AsyncBuffer, AsyncRowGroup, DecodedArray, FileMetaData, ParquetReadOptions} from '../src/types.js'
  */
 /**
  * Read parquet data rows from a file-like object.
@@ -21,34 +21,102 @@ import { concat } from './utils.js'
 export async function parquetRead(options) {
   // load metadata if not provided
   options.metadata ??= await parquetMetadataAsync(options.file)
-  const { metadata, onComplete, rowStart = 0, rowEnd } = options
-  if (rowStart < 0) throw new Error('parquetRead rowStart must be positive')
+
+  // read row groups
+  const asyncGroups = await parquetReadAsync(options)
+
+  const { rowStart = 0, rowEnd, columns, onChunk, onComplete, rowFormat } = options
+
+  // skip assembly if no onComplete or onChunk, but wait for reading to finish
+  if (!onComplete && !onChunk) {
+    for (const { asyncColumns } of asyncGroups) {
+      for (const { data } of asyncColumns) await data
+    }
+    return
+  }
+
+  // assemble struct columns
+  const schemaTree = parquetSchema(options.metadata)
+  const assembled = asyncGroups.map(arg => assembleAsync(arg, schemaTree))
+
+  // onChunk emit all chunks (don't await)
+  if (onChunk) {
+    for (const asyncGroup of assembled) {
+      for (const asyncColumn of asyncGroup.asyncColumns) {
+        asyncColumn.data.then(columnDatas => {
+          let rowStart = asyncGroup.groupStart
+          for (const columnData of columnDatas) {
+            onChunk({
+              columnName: asyncColumn.pathInSchema[0],
+              columnData,
+              rowStart,
+              rowEnd: rowStart + columnData.length,
+            })
+            rowStart += columnData.length
+          }
+        })
+      }
+    }
+  }
+
+  // onComplete transpose column chunks to rows
+  if (onComplete) {
+    /** @type {any[][]} */
+    const rows = []
+    for (const asyncGroup of assembled) {
+      // filter to rows in range
+      const selectStart = Math.max(rowStart - asyncGroup.groupStart, 0)
+      const selectEnd = Math.min((rowEnd ?? Infinity) - asyncGroup.groupStart, asyncGroup.groupRows)
+      // transpose column chunks to rows in output
+      const groupData = await asyncGroupToRows(asyncGroup, selectStart, selectEnd, columns, rowFormat)
+      concat(rows, groupData.slice(selectStart, selectEnd))
+    }
+    onComplete(rows)
+  } else {
+    // wait for all async groups to finish (complete takes care of this)
+    for (const { asyncColumns } of assembled) {
+      for (const { data } of asyncColumns) await data
+    }
+  }
+}
+
+/**
+ * @param {ParquetReadOptions} options read options
+ * @returns {AsyncRowGroup[]}
+ */
+export function parquetReadAsync(options) {
+  if (!options.metadata) throw new Error('parquet requires metadata')
+  // TODO: validate options (start, end, columns, etc)
 
   // prefetch byte ranges
   const plan = parquetPlan(options)
   options.file = prefetchAsyncBuffer(options.file, plan)
 
-  /** @type {any[][]} */
-  const rowData = []
-
   // read row groups
-  let groupStart = 0 // first row index of the current group
-  for (const rowGroup of metadata.row_groups) {
-    // number of rows in this row group
-    const groupRows = Number(rowGroup.num_rows)
-    // if row group overlaps with row range, read it
-    if (groupStart + groupRows >= rowStart && (rowEnd === undefined || groupStart < rowEnd)) {
-      // read row group
-      const groupData = await readRowGroup(options, rowGroup, groupStart)
-      if (onComplete) {
-        // filter to rows in range
-        const start = Math.max(rowStart - groupStart, 0)
-        const end = rowEnd === undefined ? undefined : rowEnd - groupStart
-        concat(rowData, groupData.slice(start, end))
-      }
-    }
-    groupStart += groupRows
-  }
-
-  if (onComplete) onComplete(rowData)
+  return plan.groups.map(groupPlan => readRowGroup(options, plan, groupPlan))
+}
+
+/**
+ * Reads a single column from a parquet file.
+ *
+ * @param {ParquetReadOptions} options
+ * @returns {Promise<DecodedArray>}
+ */
+export async function parquetReadColumn(options) {
+  if (options.columns?.length !== 1) {
+    throw new Error('parquetReadColumn expected columns: [columnName]')
+  }
+  options.metadata ??= await parquetMetadataAsync(options.file)
+  const asyncGroups = parquetReadAsync(options)
+
+  // assemble struct columns
+  const schemaTree = parquetSchema(options.metadata)
+  const assembled = asyncGroups.map(arg => assembleAsync(arg, schemaTree))
+
+  /** @type {DecodedArray[]} */
+  const columnData = []
+  for (const rg of assembled) {
+    columnData.push(flatten(await rg.asyncColumns[0].data))
+  }
+  return flatten(columnData)
 }
diff --git a/src/rowgroup.js b/src/rowgroup.js
index 9f2580c..12fdcae 100644
--- a/src/rowgroup.js
+++ b/src/rowgroup.js
@@ -4,33 +4,25 @@ import { getColumnRange } from './plan.js'
 import { getSchemaPath } from './schema.js'
 import { flatten } from './utils.js'
 
+/**
+ * @import {AsyncColumn, AsyncRowGroup, DecodedArray, GroupPlan, ParquetReadOptions, QueryPlan, RowGroup, SchemaTree} from './types.js'
+ */
 /**
  * Read a row group from a file-like object.
  *
- * @param {ParquetReadOptions} options read options
- * @param {RowGroup} rowGroup row group to read
- * @param {number} groupStart row index of the first row in the group
- * @returns {Promise<any[][]>} resolves to row data
+ * @param {ParquetReadOptions} options
+ * @param {QueryPlan} plan
+ * @param {GroupPlan} groupPlan
+ * @returns {AsyncRowGroup} resolves to column data
  */
-export async function readRowGroup(options, rowGroup, groupStart) {
-  const { file, metadata, columns, rowStart = 0, rowEnd } = options
-  if (!metadata) throw new Error('parquet metadata not found')
-  const groupRows = Number(rowGroup.num_rows)
-  // indexes within the group to read:
-  const selectStart = Math.max(rowStart - groupStart, 0)
-  const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, groupRows)
-  /** @type {RowGroupSelect} */
-  const rowGroupSelect = { groupStart, selectStart, selectEnd, groupRows }
+export function readRowGroup(options, { metadata, columns }, groupPlan) {
+  const { file, compressors, utf8 } = options
+
+  /** @type {AsyncColumn[]} */
+  const asyncColumns = []
 
-  /** @type {Promise<void>[]} */
-  const promises = []
-  // top-level columns to assemble
-  const { children } = getSchemaPath(metadata.schema, [])[0]
-  const subcolumnNames = new Map(children.map(child => [child.element.name, getSubcolumns(child)]))
-  /** @type {Map<string, DecodedArray[]>} */
-  const subcolumnData = new Map() // columns to assemble as maps
   // read column data
-  for (const { file_path, meta_data } of rowGroup.columns) {
+  for (const { file_path, meta_data } of groupPlan.rowGroup.columns) {
     if (file_path) throw new Error('parquet file_path not supported')
     if (!meta_data) throw new Error('parquet column metadata is undefined')
 
@@ -54,102 +46,114 @@
     const buffer = Promise.resolve(file.slice(startByte, endByte))
 
     // read column data async
-    promises.push(buffer.then(arrayBuffer => {
-      const schemaPath = getSchemaPath(metadata.schema, meta_data.path_in_schema)
-      const reader = { view: new DataView(arrayBuffer), offset: 0 }
-      const subcolumn = meta_data.path_in_schema.join('.')
-      const columnDecoder = {
-        columnName: subcolumn,
-        type: meta_data.type,
-        element: schemaPath[schemaPath.length - 1].element,
-        schemaPath,
-        codec: meta_data.codec,
-        compressors: options.compressors,
-        utf8: options.utf8,
-      }
-      /** @type {DecodedArray[] | undefined} */
-      let chunks = readColumn(reader, rowGroupSelect, columnDecoder, options.onPage)
-
-      // skip assembly if no onComplete or onChunk
-      if (!options.onComplete && !options.onChunk) return
-
-      // TODO: fast path for non-nested columns
-      // save column data for assembly
-      subcolumnData.set(subcolumn, chunks)
-      chunks = undefined
-
-      const subcolumns = subcolumnNames.get(columnName)
-      if (subcolumns?.every(name => subcolumnData.has(name))) {
-        // For every subcolumn, flatten and assemble the column
-        const flatData = new Map(subcolumns.map(name => [name, flatten(subcolumnData.get(name))]))
-        assembleNested(flatData, schemaPath[1])
-        const flatColumn = flatData.get(columnName)
-        if (!flatColumn) throw new Error(`parquet column data not assembled: ${columnName}`)
-        chunks = [flatColumn]
-        subcolumns.forEach(name => subcolumnData.delete(name))
-        subcolumnData.set(columnName, chunks)
-      }
-
-      // do not emit column data until structs are fully parsed
-      if (!chunks) return
-      // notify caller of column data
-      if (options.onChunk) {
-        for (const columnData of chunks) {
-          options.onChunk({
-            columnName,
-            columnData,
-            rowStart: groupStart,
-            rowEnd: groupStart + columnData.length,
-          })
+    asyncColumns.push({
+      pathInSchema: meta_data.path_in_schema,
+      data: buffer.then(arrayBuffer => {
+        const schemaPath = getSchemaPath(metadata.schema, meta_data.path_in_schema)
+        const reader = { view: new DataView(arrayBuffer), offset: 0 }
+        const subcolumn = meta_data.path_in_schema.join('.')
+        const columnDecoder = {
+          columnName: subcolumn,
+          type: meta_data.type,
+          element: schemaPath[schemaPath.length - 1].element,
+          schemaPath,
+          codec: meta_data.codec,
+          compressors,
+          utf8,
         }
-      }
-    }))
+        return readColumn(reader, groupPlan, columnDecoder, options.onPage)
+      }),
+    })
   }
-  await Promise.all(promises)
-  if (options.onComplete) {
-    const includedColumnNames = children
-      .map(child => child.element.name)
-      .filter(name => !columns || columns.includes(name))
-    const columnOrder = columns || includedColumnNames
-    const includedColumns = columnOrder
-      .map(name => includedColumnNames.includes(name) ? flatten(subcolumnData.get(name)) : undefined)
-
-    // transpose columns into rows
-    const groupData = new Array(selectEnd)
-    for (let row = selectStart; row < selectEnd; row++) {
-      if (options.rowFormat === 'object') {
-        // return each row as an object
-        /** @type {Record<string, any>} */
-        const rowData = {}
-        for (let i = 0; i < columnOrder.length; i++) {
-          rowData[columnOrder[i]] = includedColumns[i]?.[row]
-        }
-        groupData[row] = rowData
-      } else {
-        // return each row as an array
-        groupData[row] = includedColumns.map(column => column?.[row])
-      }
-    }
-    return groupData
-  }
-  return []
+  return { groupStart: groupPlan.groupStart, groupRows: groupPlan.groupRows, asyncColumns }
 }
 
 /**
- * Return a list of sub-columns needed to construct a top-level column.
- *
- * @import {DecodedArray, ParquetReadOptions, RowGroup, RowGroupSelect, SchemaTree} from '../src/types.d.ts'
- * @param {SchemaTree} schema
- * @param {string[]} output
- * @returns {string[]}
+ * @param {AsyncRowGroup} asyncGroup
+ * @param {number} selectStart
+ * @param {number} selectEnd
+ * @param {string[] | undefined} columns
+ * @param {'object' | 'array'} [rowFormat]
+ * @returns {Promise<Record<string, any>[]>} resolves to row data
  */
-function getSubcolumns(schema, output = []) {
-  if (schema.children.length) {
-    for (const child of schema.children) {
-      getSubcolumns(child, output)
+export async function asyncGroupToRows({ asyncColumns }, selectStart, selectEnd, columns, rowFormat) {
+  const groupData = new Array(selectEnd)
+
+  // columnData[i] for asyncColumns[i]
+  // TODO: do it without flatten
+  const columnDatas = await Promise.all(asyncColumns.map(({ data }) => data.then(flatten)))
+
+  // careful mapping of column order for rowFormat: array
+  const includedColumnNames = asyncColumns
+    .map(child => child.pathInSchema[0])
+    .filter(name => !columns || columns.includes(name))
+  const columnOrder = columns ?? includedColumnNames
+  const columnIndexes = columnOrder.map(name => asyncColumns.findIndex(column => column.pathInSchema[0] === name))
+
+  // transpose columns into rows
+  for (let row = selectStart; row < selectEnd; row++) {
+    if (rowFormat === 'object') {
+      // return each row as an object
+      /** @type {Record<string, any>} */
+      const rowData = {}
+      for (let i = 0; i < asyncColumns.length; i++) {
+        rowData[asyncColumns[i].pathInSchema[0]] = columnDatas[i][row]
+      }
+      groupData[row] = rowData
+    } else {
+      // return each row as an array
+      const rowData = new Array(asyncColumns.length)
+      for (let i = 0; i < columnOrder.length; i++) {
+        if (columnIndexes[i] >= 0) {
+          rowData[i] = columnDatas[columnIndexes[i]][row]
+        }
+      }
+      groupData[row] = rowData
     }
-  } else {
-    output.push(schema.path.join('.'))
   }
-  return output
+  return groupData
+}
+
+/**
+ * Assemble physical columns into top-level columns asynchronously.
+ *
+ * @param {AsyncRowGroup} asyncRowGroup
+ * @param {SchemaTree} schemaTree
+ * @returns {AsyncRowGroup}
+ */
+export function assembleAsync(asyncRowGroup, schemaTree) {
+  const { asyncColumns } = asyncRowGroup
+  /** @type {AsyncColumn[]} */
+  const assembled = []
+  for (const child of schemaTree.children) {
+    if (child.children.length) {
+      const childColumns = asyncColumns.filter(column => column.pathInSchema[0] === child.element.name)
+      if (!childColumns.length) continue
+
+      // wait for all child columns to be read
+      /** @type {Map<string, DecodedArray>} */
+      const flatData = new Map()
+      const data = Promise.all(childColumns.map(column => {
+        return column.data.then(columnData => {
+          flatData.set(column.pathInSchema.join('.'), flatten(columnData))
+        })
+      })).then(() => {
+        // assemble the column
+        assembleNested(flatData, child)
+        const flatColumn = flatData.get(child.path.join('.'))
+        if (!flatColumn) throw new Error('parquet column data not assembled')
+        return [flatColumn]
+      })
+
+      assembled.push({ pathInSchema: child.path, data })
+    } else {
+      // leaf node, return the column
+      const asyncColumn = asyncColumns.find(column => column.pathInSchema[0] === child.element.name)
+      if (asyncColumn) {
+        assembled.push(asyncColumn)
+      }
+    }
+  }
+  return { ...asyncRowGroup, asyncColumns: assembled }
 }
diff --git a/src/types.d.ts b/src/types.d.ts
index 75c4ab7..d110367 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -419,3 +419,13 @@ export interface RowGroupSelect {
   selectEnd: number // row index in the group to stop reading
   groupRows: number
 }
+
+export interface AsyncColumn {
+  pathInSchema: string[]
+  data: Promise<DecodedArray[]>
+}
+export interface AsyncRowGroup {
+  groupStart: number
+  groupRows: number
+  asyncColumns: AsyncColumn[]
+}
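
Usage sketch (not part of the diff above): the new onChunk callback in
parquetRead fires once per assembled column chunk as each row group decodes,
and parquetReadColumn returns a single column as one flat DecodedArray. This
is a minimal sketch assuming an ESM context with top-level await; the
'data.parquet' filename and 'id' column are illustrative, and since this
patch does not add parquetReadColumn to the public exports in
src/hyparquet.js, the deep import path below is an assumption:

  import { asyncBufferFromFile, parquetRead } from 'hyparquet'
  // assumed deep import; parquetReadColumn is defined in src/read.js
  import { parquetReadColumn } from 'hyparquet/src/read.js'

  // open a local parquet file (illustrative filename)
  const file = await asyncBufferFromFile('data.parquet')

  // stream column chunks as they decode, without waiting for full row assembly
  await parquetRead({
    file,
    columns: ['id'],
    onChunk({ columnName, columnData, rowStart, rowEnd }) {
      console.log(columnName, rowStart, rowEnd, columnData.length)
    },
  })

  // read one column across all row groups as a single flat array
  const ids = await parquetReadColumn({ file, columns: ['id'] })
  console.log(ids.length)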