parquetReadAsync (#83)

Kenny Daniel 2025-05-26 17:27:15 -07:00 committed by GitHub
parent bf6ac3b644
commit 4e2f76df09
5 changed files with 224 additions and 141 deletions

src/hyparquet.js
@@ -4,7 +4,7 @@ export { parquetMetadata, parquetMetadataAsync, parquetSchema } from './metadata
export { parquetRead }
export { parquetQuery } from './query.js'
export { snappyUncompress } from './snappy.js'
export { asyncBufferFromFile, asyncBufferFromUrl, byteLengthFromUrl, cachedAsyncBuffer, toJson } from './utils.js'
export { asyncBufferFromFile, asyncBufferFromUrl, byteLengthFromUrl, cachedAsyncBuffer, flatten, toJson } from './utils.js'
/**
* This is a helper function to read parquet row data as a promise.
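
For reference, a minimal sketch of the newly exported flatten helper. This assumes, as its use in read.js below suggests, that flatten concatenates a list of decoded chunks into one flat array; the values are illustrative:

import { flatten } from 'hyparquet'

// concatenate per-chunk column data into a single array (assumed behavior)
const chunks = [[1, 2], [3], [4, 5]]
const column = flatten(chunks) // [1, 2, 3, 4, 5]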

src/query.js
@@ -1,5 +1,6 @@
import { parquetReadObjects } from './hyparquet.js'
import { parquetMetadataAsync } from './metadata.js'
import { parquetReadColumn } from './read.js'
import { equals } from './utils.js'
/**
@@ -45,11 +46,11 @@ export async function parquetQuery(options) {
return results.slice(rowStart, rowEnd)
} else if (typeof orderBy === 'string') {
// sorted but unfiltered: fetch orderBy column first
const orderColumn = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] })
const orderColumn = await parquetReadColumn({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] })
// compute row groups to fetch
const sortedIndices = Array.from(orderColumn, (_, index) => index)
.sort((a, b) => compare(orderColumn[a][orderBy], orderColumn[b][orderBy]))
.sort((a, b) => compare(orderColumn[a], orderColumn[b]))
.slice(rowStart, rowEnd)
const sparseData = await parquetReadRows({ ...options, rows: sortedIndices })
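
Since parquetReadColumn returns a flat array of values rather than row objects, the comparator above indexes orderColumn directly. A usage sketch of this sorted-but-unfiltered path, with a hypothetical file and column name:

import { asyncBufferFromFile, parquetQuery } from 'hyparquet'

const file = await asyncBufferFromFile('example.parquet') // hypothetical path
// fetches only the 'price' column to compute sort order, then reads just the selected rows
const top10 = await parquetQuery({ file, orderBy: 'price', rowStart: 0, rowEnd: 10 })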

src/read.js
@@ -1,10 +1,10 @@
import { parquetMetadataAsync } from './metadata.js'
import { parquetMetadataAsync, parquetSchema } from './metadata.js'
import { parquetPlan, prefetchAsyncBuffer } from './plan.js'
import { readRowGroup } from './rowgroup.js'
import { concat } from './utils.js'
import { assembleAsync, asyncGroupToRows, readRowGroup } from './rowgroup.js'
import { concat, flatten } from './utils.js'
/**
* @import {ParquetReadOptions} from '../src/types.d.ts'
* @import {AsyncBuffer, AsyncRowGroup, DecodedArray, FileMetaData, ParquetReadOptions} from '../src/types.js'
*/
/**
* Read parquet data rows from a file-like object.
@@ -21,34 +21,102 @@ import { concat } from './utils.js'
export async function parquetRead(options) {
// load metadata if not provided
options.metadata ??= await parquetMetadataAsync(options.file)
const { metadata, onComplete, rowStart = 0, rowEnd } = options
if (rowStart < 0) throw new Error('parquetRead rowStart must be positive')
// read row groups
const asyncGroups = await parquetReadAsync(options)
const { rowStart = 0, rowEnd, columns, onChunk, onComplete, rowFormat } = options
// skip assembly if no onComplete or onChunk, but wait for reading to finish
if (!onComplete && !onChunk) {
for (const { asyncColumns } of asyncGroups) {
for (const { data } of asyncColumns) await data
}
return
}
// assemble struct columns
const schemaTree = parquetSchema(options.metadata)
const assembled = asyncGroups.map(arg => assembleAsync(arg, schemaTree))
// onChunk emit all chunks (don't await)
if (onChunk) {
for (const asyncGroup of assembled) {
for (const asyncColumn of asyncGroup.asyncColumns) {
asyncColumn.data.then(columnDatas => {
let rowStart = asyncGroup.groupStart
for (const columnData of columnDatas) {
onChunk({
columnName: asyncColumn.pathInSchema[0],
columnData,
rowStart,
rowEnd: rowStart + columnData.length,
})
rowStart += columnData.length
}
})
}
}
}
// onComplete transpose column chunks to rows
if (onComplete) {
/** @type {any[][]} */
const rows = []
for (const asyncGroup of assembled) {
// filter to rows in range
const selectStart = Math.max(rowStart - asyncGroup.groupStart, 0)
const selectEnd = Math.min((rowEnd ?? Infinity) - asyncGroup.groupStart, asyncGroup.groupRows)
// transpose column chunks to rows in output
const groupData = await asyncGroupToRows(asyncGroup, selectStart, selectEnd, columns, rowFormat)
concat(rows, groupData.slice(selectStart, selectEnd))
}
onComplete(rows)
} else {
// wait for all async groups to finish (complete takes care of this)
for (const { asyncColumns } of assembled) {
for (const { data } of asyncColumns) await data
}
}
}
/**
* @param {ParquetReadOptions} options read options
* @returns {AsyncRowGroup[]}
*/
export function parquetReadAsync(options) {
if (!options.metadata) throw new Error('parquet requires metadata')
// TODO: validate options (start, end, columns, etc)
// prefetch byte ranges
const plan = parquetPlan(options)
options.file = prefetchAsyncBuffer(options.file, plan)
/** @type {any[][]} */
const rowData = []
// read row groups
let groupStart = 0 // first row index of the current group
for (const rowGroup of metadata.row_groups) {
// number of rows in this row group
const groupRows = Number(rowGroup.num_rows)
// if row group overlaps with row range, read it
if (groupStart + groupRows >= rowStart && (rowEnd === undefined || groupStart < rowEnd)) {
// read row group
const groupData = await readRowGroup(options, rowGroup, groupStart)
if (onComplete) {
// filter to rows in range
const start = Math.max(rowStart - groupStart, 0)
const end = rowEnd === undefined ? undefined : rowEnd - groupStart
concat(rowData, groupData.slice(start, end))
}
}
groupStart += groupRows
}
if (onComplete) onComplete(rowData)
return plan.groups.map(groupPlan => readRowGroup(options, plan, groupPlan))
}
/**
* Reads a single column from a parquet file.
*
* @param {ParquetReadOptions} options
* @returns {Promise<DecodedArray>}
*/
export async function parquetReadColumn(options) {
if (options.columns?.length !== 1) {
throw new Error('parquetReadColumn expected columns: [columnName]')
}
options.metadata ??= await parquetMetadataAsync(options.file)
const asyncGroups = parquetReadAsync(options)
// assemble struct columns
const schemaTree = parquetSchema(options.metadata)
const assembled = asyncGroups.map(arg => assembleAsync(arg, schemaTree))
/** @type {DecodedArray[]} */
const columnData = []
for (const rg of assembled) {
columnData.push(flatten(await rg.asyncColumns[0].data))
}
return flatten(columnData)
}
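
A minimal usage sketch of the reworked read path, assuming a local file and a hypothetical column name; parquetReadColumn is imported from src/read.js here because this diff does not re-export it from the package root:

import { asyncBufferFromFile, parquetRead } from 'hyparquet'
import { parquetReadColumn } from 'hyparquet/src/read.js' // path assumes the package exposes src/

const file = await asyncBufferFromFile('example.parquet') // hypothetical path
await parquetRead({
  file,
  columns: ['name'], // hypothetical column
  // onChunk fires per column as each row group decodes, without waiting for the whole file
  onChunk: ({ columnName, columnData, rowStart, rowEnd }) => console.log(columnName, rowStart, rowEnd),
  // onComplete receives rows transposed from the assembled columns
  onComplete: rows => console.log('total rows:', rows.length),
})

// read a single column as one flat DecodedArray
const names = await parquetReadColumn({ file, columns: ['name'] })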

src/rowgroup.js
@@ -4,33 +4,25 @@ import { getColumnRange } from './plan.js'
import { getSchemaPath } from './schema.js'
import { flatten } from './utils.js'
/**
* @import {AsyncColumn, AsyncRowGroup, DecodedArray, GroupPlan, ParquetReadOptions, QueryPlan, RowGroup, SchemaTree} from './types.js'
*/
/**
* Read a row group from a file-like object.
*
* @param {ParquetReadOptions} options read options
* @param {RowGroup} rowGroup row group to read
* @param {number} groupStart row index of the first row in the group
* @returns {Promise<any[][]>} resolves to row data
* @param {ParquetReadOptions} options
* @param {QueryPlan} plan
* @param {GroupPlan} groupPlan
* @returns {AsyncRowGroup} resolves to column data
*/
export async function readRowGroup(options, rowGroup, groupStart) {
const { file, metadata, columns, rowStart = 0, rowEnd } = options
if (!metadata) throw new Error('parquet metadata not found')
const groupRows = Number(rowGroup.num_rows)
// indexes within the group to read:
const selectStart = Math.max(rowStart - groupStart, 0)
const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, groupRows)
/** @type {RowGroupSelect} */
const rowGroupSelect = { groupStart, selectStart, selectEnd, groupRows }
export function readRowGroup(options, { metadata, columns }, groupPlan) {
const { file, compressors, utf8 } = options
/** @type {AsyncColumn[]} */
const asyncColumns = []
/** @type {Promise<void>[]} */
const promises = []
// top-level columns to assemble
const { children } = getSchemaPath(metadata.schema, [])[0]
const subcolumnNames = new Map(children.map(child => [child.element.name, getSubcolumns(child)]))
/** @type {Map<string, DecodedArray[]>} */
const subcolumnData = new Map() // columns to assemble as maps
// read column data
for (const { file_path, meta_data } of rowGroup.columns) {
for (const { file_path, meta_data } of groupPlan.rowGroup.columns) {
if (file_path) throw new Error('parquet file_path not supported')
if (!meta_data) throw new Error('parquet column metadata is undefined')
@@ -54,102 +46,114 @@ export async function readRowGroup(options, rowGroup, groupStart) {
const buffer = Promise.resolve(file.slice(startByte, endByte))
// read column data async
promises.push(buffer.then(arrayBuffer => {
const schemaPath = getSchemaPath(metadata.schema, meta_data.path_in_schema)
const reader = { view: new DataView(arrayBuffer), offset: 0 }
const subcolumn = meta_data.path_in_schema.join('.')
const columnDecoder = {
columnName: subcolumn,
type: meta_data.type,
element: schemaPath[schemaPath.length - 1].element,
schemaPath,
codec: meta_data.codec,
compressors: options.compressors,
utf8: options.utf8,
}
/** @type {DecodedArray[] | undefined} */
let chunks = readColumn(reader, rowGroupSelect, columnDecoder, options.onPage)
// skip assembly if no onComplete or onChunk
if (!options.onComplete && !options.onChunk) return
// TODO: fast path for non-nested columns
// save column data for assembly
subcolumnData.set(subcolumn, chunks)
chunks = undefined
const subcolumns = subcolumnNames.get(columnName)
if (subcolumns?.every(name => subcolumnData.has(name))) {
// For every subcolumn, flatten and assemble the column
const flatData = new Map(subcolumns.map(name => [name, flatten(subcolumnData.get(name))]))
assembleNested(flatData, schemaPath[1])
const flatColumn = flatData.get(columnName)
if (!flatColumn) throw new Error(`parquet column data not assembled: ${columnName}`)
chunks = [flatColumn]
subcolumns.forEach(name => subcolumnData.delete(name))
subcolumnData.set(columnName, chunks)
}
// do not emit column data until structs are fully parsed
if (!chunks) return
// notify caller of column data
if (options.onChunk) {
for (const columnData of chunks) {
options.onChunk({
columnName,
columnData,
rowStart: groupStart,
rowEnd: groupStart + columnData.length,
})
asyncColumns.push({
pathInSchema: meta_data.path_in_schema,
data: buffer.then(arrayBuffer => {
const schemaPath = getSchemaPath(metadata.schema, meta_data.path_in_schema)
const reader = { view: new DataView(arrayBuffer), offset: 0 }
const subcolumn = meta_data.path_in_schema.join('.')
const columnDecoder = {
columnName: subcolumn,
type: meta_data.type,
element: schemaPath[schemaPath.length - 1].element,
schemaPath,
codec: meta_data.codec,
compressors,
utf8,
}
}
}))
return readColumn(reader, groupPlan, columnDecoder, options.onPage)
}),
})
}
await Promise.all(promises)
if (options.onComplete) {
const includedColumnNames = children
.map(child => child.element.name)
.filter(name => !columns || columns.includes(name))
const columnOrder = columns || includedColumnNames
const includedColumns = columnOrder
.map(name => includedColumnNames.includes(name) ? flatten(subcolumnData.get(name)) : undefined)
// transpose columns into rows
const groupData = new Array(selectEnd)
for (let row = selectStart; row < selectEnd; row++) {
if (options.rowFormat === 'object') {
// return each row as an object
/** @type {Record<string, any>} */
const rowData = {}
for (let i = 0; i < columnOrder.length; i++) {
rowData[columnOrder[i]] = includedColumns[i]?.[row]
}
groupData[row] = rowData
} else {
// return each row as an array
groupData[row] = includedColumns.map(column => column?.[row])
}
}
return groupData
}
return []
return { groupStart: groupPlan.groupStart, groupRows: groupPlan.groupRows, asyncColumns }
}
/**
* Return a list of sub-columns needed to construct a top-level column.
*
* @import {DecodedArray, ParquetReadOptions, RowGroup, RowGroupSelect, SchemaTree} from '../src/types.d.ts'
* @param {SchemaTree} schema
* @param {string[]} output
* @returns {string[]}
* @param {AsyncRowGroup} asyncGroup
* @param {number} selectStart
* @param {number} selectEnd
* @param {string[] | undefined} columns
* @param {'object' | 'array'} [rowFormat]
* @returns {Promise<Record<string, any>[]>} resolves to row data
*/
function getSubcolumns(schema, output = []) {
if (schema.children.length) {
for (const child of schema.children) {
getSubcolumns(child, output)
export async function asyncGroupToRows({ asyncColumns }, selectStart, selectEnd, columns, rowFormat) {
const groupData = new Array(selectEnd)
// columnData[i] for asyncColumns[i]
// TODO: do it without flatten
const columnDatas = await Promise.all(asyncColumns.map(({ data }) => data.then(flatten)))
// careful mapping of column order for rowFormat: array
const includedColumnNames = asyncColumns
.map(child => child.pathInSchema[0])
.filter(name => !columns || columns.includes(name))
const columnOrder = columns ?? includedColumnNames
const columnIndexes = columnOrder.map(name => asyncColumns.findIndex(column => column.pathInSchema[0] === name))
// transpose columns into rows
for (let row = selectStart; row < selectEnd; row++) {
if (rowFormat === 'object') {
// return each row as an object
/** @type {Record<string, any>} */
const rowData = {}
for (let i = 0; i < asyncColumns.length; i++) {
rowData[asyncColumns[i].pathInSchema[0]] = columnDatas[i][row]
}
groupData[row] = rowData
} else {
// return each row as an array
const rowData = new Array(asyncColumns.length)
for (let i = 0; i < columnOrder.length; i++) {
if (columnIndexes[i] >= 0) {
rowData[i] = columnDatas[columnIndexes[i]][row]
}
}
groupData[row] = rowData
}
} else {
output.push(schema.path.join('.'))
}
return output
return groupData
}
/**
* Assemble physical columns into top-level columns asynchronously.
*
* @param {AsyncRowGroup} asyncRowGroup
* @param {SchemaTree} schemaTree
* @returns {AsyncRowGroup}
*/
export function assembleAsync(asyncRowGroup, schemaTree) {
const { asyncColumns } = asyncRowGroup
/** @type {AsyncColumn[]} */
const assembled = []
for (const child of schemaTree.children) {
if (child.children.length) {
const childColumns = asyncColumns.filter(column => column.pathInSchema[0] === child.element.name)
if (!childColumns.length) continue
// wait for all child columns to be read
/** @type {Map<string, DecodedArray>} */
const flatData = new Map()
const data = Promise.all(childColumns.map(column => {
return column.data.then(columnData => {
flatData.set(column.pathInSchema.join('.'), flatten(columnData))
})
})).then(() => {
// assemble the column
assembleNested(flatData, child)
const flatColumn = flatData.get(child.path.join('.'))
if (!flatColumn) throw new Error('parquet column data not assembled')
return [flatColumn]
})
assembled.push({ pathInSchema: child.path, data })
} else {
// leaf node, return the column
const asyncColumn = asyncColumns.find(column => column.pathInSchema[0] === child.element.name)
if (asyncColumn) {
assembled.push(asyncColumn)
}
}
}
return { ...asyncRowGroup, asyncColumns: assembled }
}
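
To see how these pieces compose, a sketch of the pipeline that parquetRead itself follows, using only names from this diff (error handling omitted):

// plan byte ranges -> async columns per row group -> assembled structs -> rows
options.metadata ??= await parquetMetadataAsync(options.file)
const asyncGroups = parquetReadAsync(options)
const schemaTree = parquetSchema(options.metadata)
const assembled = asyncGroups.map(g => assembleAsync(g, schemaTree))
const first = assembled[0]
const rows = await asyncGroupToRows(first, 0, first.groupRows, undefined, 'object')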

src/types.d.ts (vendored, 10 changes)

@@ -419,3 +419,13 @@ export interface RowGroupSelect {
selectEnd: number // row index in the group to stop reading
groupRows: number
}
export interface AsyncColumn {
pathInSchema: string[]
data: Promise<DecodedArray[]>
}
export interface AsyncRowGroup {
groupStart: number
groupRows: number
asyncColumns: AsyncColumn[]
}
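
These types make the intermediate representation explicit: each AsyncColumn resolves its decoded chunks independently, so a caller can await one column without blocking the rest. A small sketch, reusing the flatten helper from utils.js:

import { flatten } from 'hyparquet'

/**
 * Await a single top-level column of an AsyncRowGroup.
 * @param {AsyncRowGroup} group
 * @param {string} name
 * @returns {Promise<DecodedArray | undefined>}
 */
async function awaitColumn(group, name) {
  const column = group.asyncColumns.find(c => c.pathInSchema[0] === name)
  // each column's data is an independent promise of chunked arrays
  return column && flatten(await column.data)
}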