hyparquet/src/read.js

225 lines
8.0 KiB
JavaScript
Raw Normal View History

2024-05-18 05:44:03 +00:00
import { assembleNested } from './assemble.js'
import { readColumn } from './column.js'
2024-01-15 19:08:48 +00:00
import { parquetMetadataAsync } from './metadata.js'
import { getColumnRange, parquetPlan, prefetchAsyncBuffer } from './plan.js'
2024-05-18 05:44:03 +00:00
import { getSchemaPath } from './schema.js'
2024-04-07 16:33:57 +00:00
import { concat } from './utils.js'
2024-01-15 19:08:48 +00:00
/**
 * Read parquet data rows from a file-like object.
 * Reads the minimal number of row groups and columns to satisfy the request.
 *
 * Returns a void promise when complete.
 * Errors are thrown on the returned promise.
 * Data is returned in callbacks onComplete, onChunk, onPage, NOT the return promise.
 * See parquetReadObjects for a more convenient API.
 *
 * Note: the passed options object is updated in place. `options.metadata` is
 * filled in when it was not provided, and `options.file` is replaced by the
 * buffer returned from prefetchAsyncBuffer.
 *
 * Row ranges are half-open: rows in [rowStart, rowEnd) are returned, and only
 * row groups that contain at least one row of that range are read.
 *
 * @param {ParquetReadOptions} options read options
 * @returns {Promise<void>} resolves when all requested rows and columns are parsed, all errors are thrown here
 */
export async function parquetRead(options) {
  if (!options.file || !(options.file.byteLength >= 0)) {
    throw new Error('parquetRead expected file AsyncBuffer')
  }

  // load metadata if not provided
  options.metadata ??= await parquetMetadataAsync(options.file)
  const { metadata, onComplete, rowStart = 0, rowEnd } = options
  if (rowStart < 0) throw new Error('parquetRead rowStart must be positive')

  // prefetch byte ranges
  const plan = parquetPlan(options)
  options.file = prefetchAsyncBuffer(options.file, plan)

  /** @type {any[][]} */
  const rowData = []

  // read row groups
  let groupStart = 0 // first row index of the current group
  for (const rowGroup of metadata.row_groups) {
    // number of rows in this row group
    const groupRows = Number(rowGroup.num_rows)
    // one past the last row index of this group
    const groupEnd = groupStart + groupRows

    // The group covers rows [groupStart, groupEnd). It overlaps the requested
    // range only if it ends strictly after rowStart: a group ending exactly at
    // rowStart contributes no rows and must not be fetched or decoded.
    if (groupEnd > rowStart && (rowEnd === undefined || groupStart < rowEnd)) {
      // read row group
      const groupData = await readRowGroup(options, rowGroup, groupStart)
      if (onComplete) {
        // filter to rows in range (indexes are relative to the group)
        const start = Math.max(rowStart - groupStart, 0)
        const end = rowEnd === undefined ? undefined : rowEnd - groupStart
        concat(rowData, groupData.slice(start, end))
      }
    }
    groupStart = groupEnd
  }

  if (onComplete) onComplete(rowData)
}
/**
 * Read a row group from a file-like object.
 *
 * Every requested column chunk of the row group is sliced from the file and
 * decoded asynchronously. Decoded pages are reported through options.onPage
 * (forwarded to readColumn). When onComplete or onChunk is set, sub-columns
 * are assembled into their top-level column once all of them are decoded, and
 * each assembled column is reported through options.onChunk.
 *
 * Row data is only materialized when options.onComplete is set; otherwise an
 * empty array is returned. The returned array is indexed by row position
 * within the group: only indexes in [selectStart, selectEnd) are populated,
 * earlier indexes are left empty.
 *
 * @param {ParquetReadOptions} options read options
 * @param {RowGroup} rowGroup row group to read
 * @param {number} groupStart row index of the first row in the group
 * @returns {Promise<any[][]>} resolves to row data
 */
export async function readRowGroup(options, rowGroup, groupStart) {
  const { file, metadata, columns, rowStart = 0, rowEnd } = options
  if (!metadata) throw new Error('parquet metadata not found')
  const numRows = Number(rowGroup.num_rows)
  // indexes within the group to read:
  const selectStart = Math.max(rowStart - groupStart, 0)
  const selectEnd = Math.min((rowEnd ?? Infinity) - groupStart, numRows)
  /** @type {RowGroupSelect} */
  const rowGroupSelect = { groupStart, selectStart, selectEnd, numRows }
  // requested top-level column names as a set, so membership checks are O(1)
  const requested = columns && new Set(columns)

  /** @type {Promise<void>[]} */
  const promises = []
  // top-level columns to assemble
  const { children } = getSchemaPath(metadata.schema, [])[0]
  const subcolumnNames = new Map(children.map(child => [child.element.name, getSubcolumns(child)]))
  /** @type {Map<string, DecodedArray[]>} */
  const subcolumnData = new Map() // columns to assemble as maps

  // read column data
  for (const { file_path, meta_data } of rowGroup.columns) {
    if (file_path) throw new Error('parquet file_path not supported')
    if (!meta_data) throw new Error('parquet column metadata is undefined')

    // skip columns that are not requested
    const columnName = meta_data.path_in_schema[0]
    if (requested && !requested.has(columnName)) continue

    const { startByte, endByte } = getColumnRange(meta_data)
    const columnBytes = endByte - startByte

    // skip columns larger than 1gb
    // TODO: stream process the data, returning only the requested rows
    if (columnBytes > 1 << 30) {
      console.warn(`parquet skipping huge column "${meta_data.path_in_schema}" ${columnBytes} bytes`)
      // TODO: set column to new Error('parquet column too large')
      continue
    }

    // wrap awaitable to ensure it's a promise
    /** @type {Promise<ArrayBuffer>} */
    const buffer = Promise.resolve(file.slice(startByte, endByte))

    // read column data async
    promises.push(buffer.then(arrayBuffer => {
      const schemaPath = getSchemaPath(metadata.schema, meta_data.path_in_schema)
      const reader = { view: new DataView(arrayBuffer), offset: 0 }
      const subcolumn = meta_data.path_in_schema.join('.')
      const columnDecoder = {
        columnName: subcolumn,
        type: meta_data.type,
        element: schemaPath[schemaPath.length - 1].element,
        schemaPath,
        codec: meta_data.codec,
        compressors: options.compressors,
        utf8: options.utf8,
      }

      /** @type {DecodedArray[] | undefined} */
      let chunks = readColumn(reader, rowGroupSelect, columnDecoder, options.onPage)

      // skip assembly if no onComplete or onChunk
      if (!options.onComplete && !options.onChunk) return

      // TODO: fast path for non-nested columns
      // save column data for assembly
      subcolumnData.set(subcolumn, chunks)
      chunks = undefined

      const subcolumns = subcolumnNames.get(columnName)
      if (subcolumns?.every(name => subcolumnData.has(name))) {
        // For every subcolumn, flatten and assemble the column
        const flatData = new Map(subcolumns.map(name => [name, flatten(subcolumnData.get(name))]))
        assembleNested(flatData, schemaPath[1])
        const flatColumn = flatData.get(columnName)
        if (!flatColumn) throw new Error(`parquet column data not assembled: ${columnName}`)
        chunks = [flatColumn]
        // replace the raw sub-column entries with the assembled top-level column
        subcolumns.forEach(name => subcolumnData.delete(name))
        subcolumnData.set(columnName, chunks)
      }

      // do not emit column data until structs are fully parsed
      if (!chunks) return

      // notify caller of column data
      if (options.onChunk) {
        for (const columnData of chunks) {
          options.onChunk({
            columnName,
            columnData,
            rowStart: groupStart,
            rowEnd: groupStart + columnData.length,
          })
        }
      }
    }))
  }
  await Promise.all(promises)

  // rows are only materialized for onComplete
  if (!options.onComplete) return []

  // Output column order: the caller's order when columns were requested,
  // otherwise schema order. A requested name that is not a top-level schema
  // column has no data and yields undefined cells.
  const columnOrder = columns || children.map(child => child.element.name)
  const includedColumns = columnOrder
    .map(name => subcolumnNames.has(name) ? flatten(subcolumnData.get(name)) : undefined)

  // transpose columns into rows
  // selectEnd is negative when rowEnd lies before this group; clamp so that an
  // empty selection returns an empty array instead of throwing a RangeError.
  const groupData = new Array(Math.max(selectEnd, 0))
  const asObject = options.rowFormat === 'object'
  for (let row = selectStart; row < selectEnd; row++) {
    if (asObject) {
      // return each row as an object
      /** @type {Record<string, any>} */
      const rowData = {}
      for (let i = 0; i < columnOrder.length; i++) {
        rowData[columnOrder[i]] = includedColumns[i]?.[row]
      }
      groupData[row] = rowData
    } else {
      // return each row as an array
      groupData[row] = includedColumns.map(column => column?.[row])
    }
  }
  return groupData
}
2024-05-18 05:44:03 +00:00
/**
 * Merge a list of decoded chunks into one list.
 *
 * A missing list yields a new empty array. A list holding exactly one chunk
 * returns that chunk itself (no copy). Otherwise the chunks are appended, in
 * order, to a fresh array.
 *
 * @param {DecodedArray[] | undefined} chunks
 * @returns {DecodedArray}
 */
function flatten(chunks) {
  if (!chunks) return []
  const count = chunks.length
  // single chunk: hand it back untouched
  if (count === 1) return chunks[0]
  /** @type {any[]} */
  const merged = []
  for (let i = 0; i < count; i++) {
    concat(merged, chunks[i])
  }
  return merged
}
2024-05-18 05:44:03 +00:00
/**
 * Collect the dotted paths of every leaf beneath a schema node, in
 * depth-first order. These are the sub-columns needed to construct the
 * node's top-level column. A node without children contributes its own path.
 *
 * @import {DecodedArray, ParquetReadOptions, RowGroup, RowGroupSelect, SchemaTree} from '../src/types.d.ts'
 * @param {SchemaTree} schema
 * @param {string[]} output accumulator the paths are appended to
 * @returns {string[]} the same array that was passed as output
 */
function getSubcolumns(schema, output = []) {
  const { children } = schema
  if (!children.length) {
    // leaf node: record its full dotted path
    output.push(schema.path.join('.'))
    return output
  }
  // inner node: descend into each child, sharing the accumulator
  children.forEach(child => {
    getSubcolumns(child, output)
  })
  return output
}