Fix onComplete return type (#104)

* attempt to fix #28

* remove breaking changes

* loosen the types a bit, but no breaking change

* fix format and doc

* fix format

* fix format

* remove unused import and add space

Co-authored-by: Mario <mario@autarc.energy>
This commit is contained in:
Sylvain Lesage 2025-08-22 15:09:28 -04:00 committed by GitHub
parent d8a9317875
commit 49bd895fb5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 25 additions and 11 deletions

@ -9,7 +9,7 @@ import { equals } from './utils.js'
* Note that using orderBy may SIGNIFICANTLY increase the query time.
*
* @param {ParquetReadOptions & { filter?: ParquetQueryFilter, orderBy?: string }} options
* @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
* @returns {Promise<Record<string, any>[] | any[][]>} resolves when all requested rows and columns are parsed
*/
export async function parquetQuery(options) {
if (!options.file || !(options.file.byteLength >= 0)) {
@ -45,12 +45,15 @@ export async function parquetQuery(options) {
for (const group of metadata.row_groups) {
const groupEnd = groupStart + Number(group.num_rows)
// TODO: if expected > group size, start fetching next groups
const groupData = await parquetReadObjects({
// eslint-disable-next-line no-extra-parens
const groupData = /** @type {Record<string,any>[]} */ (await parquetReadObjects({
...options,
rowFormat: 'object',
rowStart: groupStart,
rowEnd: groupEnd,
columns: relevantColumns,
})
}))
for (const row of groupData) {
if (matchQuery(row, filter)) {
if (requiresProjection && relevantColumns) {
@ -69,12 +72,15 @@ export async function parquetQuery(options) {
return filteredRows.slice(rowStart, rowEnd)
} else if (filter) {
// read all rows, sort, and filter
const results = await parquetReadObjects({
// eslint-disable-next-line no-extra-parens
const results = /** @type {Record<string,any>[]} */ (await parquetReadObjects({
...options,
rowFormat: 'object',
rowStart: undefined,
rowEnd: undefined,
columns: relevantColumns,
})
}))
if (orderBy) results.sort((a, b) => compare(a[orderBy], b[orderBy]))
const filteredRows = new Array()
for (const row of results) {
@ -112,7 +118,7 @@ export async function parquetQuery(options) {
* Returns a sparse array of rows.
* @import {ParquetQueryFilter, ParquetReadOptions} from '../src/types.d.ts'
* @param {ParquetReadOptions & { rows: number[] }} options
* @returns {Promise<Record<string, any>[]>}
* @returns {Promise<Record<string, any>[] | any[][]>}
*/
async function parquetReadRows(options) {
const { file, rows } = options

@ -126,7 +126,8 @@ export async function parquetReadColumn(options) {
* It is a wrapper around the more configurable parquetRead function.
*
* @param {Omit<ParquetReadOptions, 'onComplete'>} options
* @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
* @returns {Promise<Record<string, any>[] | any[][]>} resolves when all requested rows and columns are parsed.
* Resolves to an array of arrays if 'rowFormat' is 'array', else to an array of objects (default).
*/
export function parquetReadObjects(options) {
return new Promise((onComplete, reject) => {

15
src/types.d.ts vendored

@ -15,23 +15,30 @@ export interface MetadataOptions {
parsers?: ParquetParsers // custom parsers to decode advanced types
}
/**
 * Row-format variant in which each row is delivered as an array of values.
 *
 * `rowFormat` is optional here: leaving it out selects this variant, so
 * `onComplete` receives `any[][]` (one inner array per row).
 * This is one of the two alternatives combined into `ParquetReadOptions`.
 */
interface ArrayRowFormat {
rowFormat?: 'array' // format of each row passed to the onComplete function. Can be omitted, as it's the default.
onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
}
/**
 * Row-format variant in which each row is delivered as an object.
 *
 * `rowFormat` is required and must be the literal `'object'`; with it,
 * `onComplete` receives `Record<string, any>[]` (one object per row).
 * This is one of the two alternatives combined into `ParquetReadOptions`.
 */
interface ObjectRowFormat {
rowFormat: 'object' // format of each row passed to the onComplete function
onComplete?: (rows: Record<string, any>[]) => void // called when all requested rows and columns are parsed
}
/**
* Parquet query options for reading data
*/
export interface ParquetReadOptions {
export type ParquetReadOptions = {
file: AsyncBuffer // file-like object containing parquet data
metadata?: FileMetaData // parquet metadata, will be parsed if not provided
columns?: string[] // columns to read, all columns if undefined
rowFormat?: 'object' | 'array' // format of each row passed to the onComplete function
rowStart?: number // first requested row index (inclusive)
rowEnd?: number // last requested row index (exclusive)
onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may contain data outside the requested range.
onPage?: (chunk: ColumnData) => void // called when a data page is parsed. pages may contain data outside the requested range.
onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
compressors?: Compressors // custom decompressors
utf8?: boolean // decode byte arrays as utf8 strings (default true)
parsers?: ParquetParsers // custom parsers to decode advanced types
}
} & (ArrayRowFormat | ObjectRowFormat)
/**
* Parquet query options for filtering data