try to fix the types again (#120)

* try to fix the types again

* fix test (breaking)

* [breaking] only support object format for parquetReadObjects and parquetQuery

* remove internal types

* remove redundant test

* override __index__ with original data if present

Also: add comments to explain special cases.

* remove the need to slice arrays

* loosen the types to avoid code duplication

* always write the index, because the results should be consistent

* Revert "always write the index, because the results should be consistent"

This reverts commit fd4e3060674fa6e81bd32fc894d7c366103e004a.
This commit is contained in:
Sylvain Lesage 2025-09-16 18:29:44 -04:00 committed by GitHub
parent 6f1b0b53e4
commit c6429d5abe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 75 additions and 57 deletions

@ -2,13 +2,16 @@ import { parquetMetadataAsync, parquetSchema } from './metadata.js'
import { parquetReadColumn, parquetReadObjects } from './read.js'
import { equals } from './utils.js'
/**
* @import {ParquetQueryFilter, BaseParquetReadOptions} from '../src/types.js'
*/
/**
* Wraps parquetRead with filter and orderBy support.
* This is a parquet-aware query engine that can read a subset of rows and columns.
* Accepts optional filter object to filter the results and orderBy column name to sort the results.
* Note that using orderBy may SIGNIFICANTLY increase the query time.
*
* @param {ParquetReadOptions & { filter?: ParquetQueryFilter, orderBy?: string }} options
* @param {BaseParquetReadOptions & { filter?: ParquetQueryFilter, orderBy?: string }} options
* @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
*/
export async function parquetQuery(options) {
@ -40,6 +43,7 @@ export async function parquetQuery(options) {
if (filter && !orderBy && rowEnd < metadata.num_rows) {
// iterate through row groups and filter until we have enough rows
/** @type {Record<string, any>[]} */
const filteredRows = new Array()
let groupStart = 0
for (const group of metadata.row_groups) {
@ -47,7 +51,6 @@ export async function parquetQuery(options) {
// TODO: if expected > group size, start fetching next groups
const groupData = await parquetReadObjects({
...options,
rowFormat: 'object',
rowStart: groupStart,
rowEnd: groupEnd,
columns: relevantColumns,
@ -72,12 +75,12 @@ export async function parquetQuery(options) {
// read all rows, sort, and filter
const results = await parquetReadObjects({
...options,
rowFormat: 'object',
rowStart: undefined,
rowEnd: undefined,
columns: relevantColumns,
})
if (orderBy) results.sort((a, b) => compare(a[orderBy], b[orderBy]))
/** @type {Record<string, any>[]} */
const filteredRows = new Array()
for (const row of results) {
if (matchQuery(row, filter)) {
@ -102,6 +105,8 @@ export async function parquetQuery(options) {
.slice(rowStart, rowEnd)
const sparseData = await parquetReadRows({ ...options, rows: sortedIndices })
// warning: the type Record<string, any> & {__index__: number})[] is simplified into Record<string, any>[]
// when returning. The data contains the __index__ property, but it's not exposed as such.
const data = sortedIndices.map(index => sparseData[index])
return data
} else {
@ -112,9 +117,8 @@ export async function parquetQuery(options) {
/**
* Reads a list of rows from a parquet file, reading only the row groups that contain the rows.
* Returns a sparse array of rows.
* @import {ParquetQueryFilter, ParquetReadOptions} from '../src/types.d.ts'
* @param {ParquetReadOptions & { rows: number[] }} options
* @returns {Promise<Record<string, any>[]>}
* @param {BaseParquetReadOptions & { rows: number[] }} options
* @returns {Promise<(Record<string, any> & {__index__: number})[]>}
*/
async function parquetReadRows(options) {
const { file, rows } = options
@ -152,13 +156,14 @@ async function parquetReadRows(options) {
}
// Fetch by row group and map to rows
/** @type {(Record<string, any> & {__index__: number})[]} */
const sparseData = new Array(Number(options.metadata.num_rows))
for (const [rangeStart, rangeEnd] of rowRanges) {
// TODO: fetch in parallel
const groupData = await parquetReadObjects({ ...options, rowStart: rangeStart, rowEnd: rangeEnd })
for (let i = rangeStart; i < rangeEnd; i++) {
sparseData[i] = groupData[i - rangeStart]
sparseData[i].__index__ = i
// warning: if the row contains a column named __index__, it will overwrite the index.
sparseData[i] = { __index__: i, ...groupData[i - rangeStart] }
}
}
return sparseData

@ -4,7 +4,7 @@ import { assembleAsync, asyncGroupToRows, readRowGroup } from './rowgroup.js'
import { concat, flatten } from './utils.js'
/**
* @import {AsyncRowGroup, DecodedArray, ParquetReadOptions} from '../src/types.js'
* @import {AsyncRowGroup, DecodedArray, ParquetReadOptions, BaseParquetReadOptions} from '../src/types.js'
*/
/**
* Read parquet data rows from a file-like object.
@ -61,15 +61,18 @@ export async function parquetRead(options) {
// onComplete transpose column chunks to rows
if (onComplete) {
/** @type {any[][]} */
// loosen the types to avoid duplicate code
/** @type {any[]} */
const rows = []
for (const asyncGroup of assembled) {
// filter to rows in range
const selectStart = Math.max(rowStart - asyncGroup.groupStart, 0)
const selectEnd = Math.min((rowEnd ?? Infinity) - asyncGroup.groupStart, asyncGroup.groupRows)
// transpose column chunks to rows in output
const groupData = await asyncGroupToRows(asyncGroup, selectStart, selectEnd, columns, rowFormat)
concat(rows, groupData.slice(selectStart, selectEnd))
const groupData = rowFormat === 'object' ?
await asyncGroupToRows(asyncGroup, selectStart, selectEnd, columns, 'object') :
await asyncGroupToRows(asyncGroup, selectStart, selectEnd, columns, 'array')
concat(rows, groupData)
}
onComplete(rows)
} else {
@ -99,7 +102,7 @@ export function parquetReadAsync(options) {
/**
* Reads a single column from a parquet file.
*
* @param {ParquetReadOptions} options
* @param {BaseParquetReadOptions} options
* @returns {Promise<DecodedArray>}
*/
export async function parquetReadColumn(options) {
@ -127,12 +130,12 @@ export async function parquetReadColumn(options) {
*
* @param {Omit<ParquetReadOptions, 'onComplete'>} options
* @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
*/
*/
export function parquetReadObjects(options) {
return new Promise((onComplete, reject) => {
parquetRead({
rowFormat: 'object',
...options,
rowFormat: 'object', // force object output
onComplete,
}).catch(reject)
})

@ -73,17 +73,33 @@ export function readRowGroup(options, { metadata, columns }, groupPlan) {
return { groupStart: groupPlan.groupStart, groupRows: groupPlan.groupRows, asyncColumns }
}
/**
* @overload
* @param {AsyncRowGroup} asyncGroup
* @param {number} selectStart
* @param {number} selectEnd
* @param {string[] | undefined} columns
* @param {'object'} rowFormat
* @returns {Promise<Record<string, any>[]>} resolves to row data
*/
/**
* @overload
* @param {AsyncRowGroup} asyncGroup
* @param {number} selectStart
* @param {number} selectEnd
* @param {string[] | undefined} columns
* @param {'array'} [rowFormat]
* @returns {Promise<any[][]>} resolves to row data
*/
/**
* @param {AsyncRowGroup} asyncGroup
* @param {number} selectStart
* @param {number} selectEnd
* @param {string[] | undefined} columns
* @param {'object' | 'array'} [rowFormat]
* @returns {Promise<Record<string, any>[]>} resolves to row data
* @returns {Promise<Record<string, any>[] | any[][]>} resolves to row data
*/
export async function asyncGroupToRows({ asyncColumns }, selectStart, selectEnd, columns, rowFormat) {
const groupData = new Array(selectEnd)
// columnData[i] for asyncColumns[i]
// TODO: do it without flatten
const columnDatas = await Promise.all(asyncColumns.map(({ data }) => data.then(flatten)))
@ -96,25 +112,35 @@ export async function asyncGroupToRows({ asyncColumns }, selectStart, selectEnd,
const columnIndexes = columnOrder.map(name => asyncColumns.findIndex(column => column.pathInSchema[0] === name))
// transpose columns into rows
for (let row = selectStart; row < selectEnd; row++) {
if (rowFormat === 'object') {
const selectCount = selectEnd - selectStart
if (rowFormat === 'object') {
/** @type {Record<string, any>[]} */
const groupData = new Array(selectCount)
for (let selectRow = 0; selectRow < selectCount; selectRow++) {
const row = selectStart + selectRow
// return each row as an object
/** @type {Record<string, any>} */
const rowData = {}
for (let i = 0; i < asyncColumns.length; i++) {
rowData[asyncColumns[i].pathInSchema[0]] = columnDatas[i][row]
}
groupData[row] = rowData
} else {
// return each row as an array
const rowData = new Array(asyncColumns.length)
for (let i = 0; i < columnOrder.length; i++) {
if (columnIndexes[i] >= 0) {
rowData[i] = columnDatas[columnIndexes[i]][row]
}
}
groupData[row] = rowData
groupData[selectRow] = rowData
}
return groupData
}
/** @type {any[][]} */
const groupData = new Array(selectCount)
for (let selectRow = 0; selectRow < selectCount; selectRow++) {
const row = selectStart + selectRow
// return each row as an array
const rowData = new Array(asyncColumns.length)
for (let i = 0; i < columnOrder.length; i++) {
if (columnIndexes[i] >= 0) {
rowData[i] = columnDatas[columnIndexes[i]][row]
}
}
groupData[selectRow] = rowData
}
return groupData
}

14
src/types.d.ts vendored

@ -18,21 +18,29 @@ export interface MetadataOptions {
/**
* Parquet query options for reading data
*/
export interface ParquetReadOptions {
export interface BaseParquetReadOptions {
file: AsyncBuffer // file-like object containing parquet data
metadata?: FileMetaData // parquet metadata, will be parsed if not provided
columns?: string[] // columns to read, all columns if undefined
rowFormat?: 'object' | 'array' // format of each row passed to the onComplete function
rowStart?: number // first requested row index (inclusive)
rowEnd?: number // last requested row index (exclusive)
onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may contain data outside the requested range.
onPage?: (chunk: ColumnData) => void // called when a data page is parsed. pages may contain data outside the requested range.
onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
compressors?: Compressors // custom decompressors
utf8?: boolean // decode byte arrays as utf8 strings (default true)
parsers?: ParquetParsers // custom parsers to decode advanced types
}
interface ArrayRowFormat {
rowFormat?: 'array' // format of each row passed to the onComplete function. Can be omitted, as it's the default.
onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
}
interface ObjectRowFormat {
rowFormat: 'object' // format of each row passed to the onComplete function
onComplete?: (rows: Record<string, any>[]) => void // called when all requested rows and columns are parsed
}
export type ParquetReadOptions = BaseParquetReadOptions & (ArrayRowFormat | ObjectRowFormat)
/**
* Parquet query options for filtering data
*/

@ -22,18 +22,6 @@ describe('parquetQuery', () => {
])
})
it('returns rows in "array" format if asked', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, rowFormat: 'array' })
expect(rows).toEqual([
[ 'abc', 1, 2, true, [1, 2, 3] ],
[ 'abc', 2, 3, true, undefined ],
[ 'abc', 3, 4, true, undefined ],
[ null, 4, 5, false, [1, 2, 3] ],
[ 'abc', 5, 2, true, [1, 2] ],
])
})
it('reads data with orderBy', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, orderBy: 'c' })
@ -75,18 +63,6 @@ describe('parquetQuery', () => {
])
})
it('always returns rows in "object" format if filter is provided', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const expected = [
{ a: 'abc', b: 1, c: 2, d: true, e: [ 1, 2, 3 ] },
{ a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] },
]
const filter = { c: { $eq: 2 } }
expect(await parquetQuery({ file, filter, rowFormat: 'array' })).toEqual(expected)
expect(await parquetQuery({ file, filter, rowFormat: 'object' })).toEqual(expected)
expect(await parquetQuery({ file, filter })).toEqual(expected)
})
it('reads data with filter and rowStart/rowEnd', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { c: { $eq: 2 } }, rowStart: 1, rowEnd: 5 })