diff --git a/src/query.js b/src/query.js index b82fe7e..76a0934 100644 --- a/src/query.js +++ b/src/query.js @@ -2,13 +2,16 @@ import { parquetMetadataAsync, parquetSchema } from './metadata.js' import { parquetReadColumn, parquetReadObjects } from './read.js' import { equals } from './utils.js' +/** + * @import {ParquetQueryFilter, BaseParquetReadOptions} from '../src/types.js' + */ /** * Wraps parquetRead with filter and orderBy support. * This is a parquet-aware query engine that can read a subset of rows and columns. * Accepts optional filter object to filter the results and orderBy column name to sort the results. * Note that using orderBy may SIGNIFICANTLY increase the query time. * - * @param {ParquetReadOptions & { filter?: ParquetQueryFilter, orderBy?: string }} options + * @param {BaseParquetReadOptions & { filter?: ParquetQueryFilter, orderBy?: string }} options * @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed */ export async function parquetQuery(options) { @@ -40,6 +43,7 @@ export async function parquetQuery(options) { if (filter && !orderBy && rowEnd < metadata.num_rows) { // iterate through row groups and filter until we have enough rows + /** @type {Record<string, any>[]} */ const filteredRows = new Array() let groupStart = 0 for (const group of metadata.row_groups) { @@ -47,7 +51,6 @@ export async function parquetQuery(options) { // TODO: if expected > group size, start fetching next groups const groupData = await parquetReadObjects({ ...options, - rowFormat: 'object', rowStart: groupStart, rowEnd: groupEnd, columns: relevantColumns, @@ -72,12 +75,12 @@ export async function parquetQuery(options) { // read all rows, sort, and filter const results = await parquetReadObjects({ ...options, - rowFormat: 'object', rowStart: undefined, rowEnd: undefined, columns: relevantColumns, }) if (orderBy) results.sort((a, b) => compare(a[orderBy], b[orderBy])) + /** @type {Record<string, any>[]} */ const filteredRows = new Array() for (const row of results) {
if (matchQuery(row, filter)) { @@ -102,6 +105,8 @@ export async function parquetQuery(options) { .slice(rowStart, rowEnd) const sparseData = await parquetReadRows({ ...options, rows: sortedIndices }) + // warning: the type (Record<string, any> & {__index__: number})[] is simplified into Record<string, any>[] + // when returning. The data contains the __index__ property, but it's not exposed as such. const data = sortedIndices.map(index => sparseData[index]) return data } else { @@ -112,9 +117,8 @@ export async function parquetQuery(options) { /** * Reads a list rows from a parquet file, reading only the row groups that contain the rows. * Returns a sparse array of rows. - * @import {ParquetQueryFilter, ParquetReadOptions} from '../src/types.d.ts' - * @param {ParquetReadOptions & { rows: number[] }} options - * @returns {Promise<Record<string, any>[]>} + * @param {BaseParquetReadOptions & { rows: number[] }} options + * @returns {Promise<(Record<string, any> & {__index__: number})[]>} */ async function parquetReadRows(options) { const { file, rows } = options @@ -152,13 +156,14 @@ async function parquetReadRows(options) { } // Fetch by row group and map to rows + /** @type {(Record<string, any> & {__index__: number})[]} */ const sparseData = new Array(Number(options.metadata.num_rows)) for (const [rangeStart, rangeEnd] of rowRanges) { // TODO: fetch in parallel const groupData = await parquetReadObjects({ ...options, rowStart: rangeStart, rowEnd: rangeEnd }) for (let i = rangeStart; i < rangeEnd; i++) { - sparseData[i] = groupData[i - rangeStart] - sparseData[i].__index__ = i + // warning: if the row contains a column named __index__, it will overwrite the index.
+ sparseData[i] = { __index__: i, ...groupData[i - rangeStart] } } } return sparseData diff --git a/src/read.js b/src/read.js index bc3ec77..7d3e7d6 100644 --- a/src/read.js +++ b/src/read.js @@ -4,7 +4,7 @@ import { assembleAsync, asyncGroupToRows, readRowGroup } from './rowgroup.js' import { concat, flatten } from './utils.js' /** - * @import {AsyncRowGroup, DecodedArray, ParquetReadOptions} from '../src/types.js' + * @import {AsyncRowGroup, DecodedArray, ParquetReadOptions, BaseParquetReadOptions} from '../src/types.js' */ /** * Read parquet data rows from a file-like object. @@ -61,15 +61,18 @@ export async function parquetRead(options) { // onComplete transpose column chunks to rows if (onComplete) { - /** @type {any[][]} */ + // loosen the types to avoid duplicate code + /** @type {any[]} */ const rows = [] for (const asyncGroup of assembled) { // filter to rows in range const selectStart = Math.max(rowStart - asyncGroup.groupStart, 0) const selectEnd = Math.min((rowEnd ?? Infinity) - asyncGroup.groupStart, asyncGroup.groupRows) // transpose column chunks to rows in output - const groupData = await asyncGroupToRows(asyncGroup, selectStart, selectEnd, columns, rowFormat) - concat(rows, groupData.slice(selectStart, selectEnd)) + const groupData = rowFormat === 'object' ? + await asyncGroupToRows(asyncGroup, selectStart, selectEnd, columns, 'object') : + await asyncGroupToRows(asyncGroup, selectStart, selectEnd, columns, 'array') + concat(rows, groupData) } onComplete(rows) } else { @@ -99,7 +102,7 @@ export function parquetReadAsync(options) { /** * Reads a single column from a parquet file. 
* - * @param {ParquetReadOptions} options + * @param {BaseParquetReadOptions} options * @returns {Promise<DecodedArray>} */ export async function parquetReadColumn(options) { @@ -127,12 +130,12 @@ export async function parquetReadColumn(options) { * * @param {Omit<ParquetReadOptions, 'rowFormat' | 'onComplete'>} options * @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed -*/ + */ export function parquetReadObjects(options) { return new Promise((onComplete, reject) => { parquetRead({ - rowFormat: 'object', ...options, + rowFormat: 'object', // force object output onComplete, }).catch(reject) }) diff --git a/src/rowgroup.js b/src/rowgroup.js index 916f556..ecdea1b 100644 --- a/src/rowgroup.js +++ b/src/rowgroup.js @@ -73,17 +73,33 @@ export function readRowGroup(options, { metadata, columns }, groupPlan) { return { groupStart: groupPlan.groupStart, groupRows: groupPlan.groupRows, asyncColumns } } +/** + * @overload + * @param {AsyncRowGroup} asyncGroup + * @param {number} selectStart + * @param {number} selectEnd + * @param {string[] | undefined} columns + * @param {'object'} rowFormat + * @returns {Promise<Record<string, any>[]>} resolves to row data + */ +/** + * @overload + * @param {AsyncRowGroup} asyncGroup + * @param {number} selectStart + * @param {number} selectEnd + * @param {string[] | undefined} columns + * @param {'array'} [rowFormat] + * @returns {Promise<any[][]>} resolves to row data + */ /** * @param {AsyncRowGroup} asyncGroup * @param {number} selectStart * @param {number} selectEnd * @param {string[] | undefined} columns * @param {'object' | 'array'} [rowFormat] - * @returns {Promise<Record<string, any>[]>} resolves to row data + * @returns {Promise<Record<string, any>[] | any[][]>} resolves to row data */ export async function asyncGroupToRows({ asyncColumns }, selectStart, selectEnd, columns, rowFormat) { - const groupData = new Array(selectEnd) - // columnData[i] for asyncColumns[i] // TODO: do it without flatten const columnDatas = await Promise.all(asyncColumns.map(({ data }) => data.then(flatten))) @@ -96,25 +112,35 @@ export async function
asyncGroupToRows({ asyncColumns }, selectStart, selectEnd, const columnIndexes = columnOrder.map(name => asyncColumns.findIndex(column => column.pathInSchema[0] === name)) // transpose columns into rows - for (let row = selectStart; row < selectEnd; row++) { - if (rowFormat === 'object') { + const selectCount = selectEnd - selectStart + if (rowFormat === 'object') { + /** @type {Record<string, any>[]} */ + const groupData = new Array(selectCount) + for (let selectRow = 0; selectRow < selectCount; selectRow++) { + const row = selectStart + selectRow // return each row as an object /** @type {Record<string, any>} */ const rowData = {} for (let i = 0; i < asyncColumns.length; i++) { rowData[asyncColumns[i].pathInSchema[0]] = columnDatas[i][row] } - groupData[row] = rowData - } else { - // return each row as an array - const rowData = new Array(asyncColumns.length) - for (let i = 0; i < columnOrder.length; i++) { - if (columnIndexes[i] >= 0) { - rowData[i] = columnDatas[columnIndexes[i]][row] - } - } - groupData[row] = rowData + groupData[selectRow] = rowData } + return groupData + } + + /** @type {any[][]} */ + const groupData = new Array(selectCount) + for (let selectRow = 0; selectRow < selectCount; selectRow++) { + const row = selectStart + selectRow + // return each row as an array + const rowData = new Array(asyncColumns.length) + for (let i = 0; i < columnOrder.length; i++) { + if (columnIndexes[i] >= 0) { + rowData[i] = columnDatas[columnIndexes[i]][row] + } + } + groupData[selectRow] = rowData } return groupData } diff --git a/src/types.d.ts b/src/types.d.ts index ed7f6f8..ea2b446 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -18,21 +18,29 @@ export interface MetadataOptions { /** * Parquet query options for reading data */ -export interface ParquetReadOptions { +export interface BaseParquetReadOptions { file: AsyncBuffer // file-like object containing parquet data metadata?: FileMetaData // parquet metadata, will be parsed if not provided columns?: string[] // columns to read,
all columns if undefined - rowFormat?: 'object' | 'array' // format of each row passed to the onComplete function rowStart?: number // first requested row index (inclusive) rowEnd?: number // last requested row index (exclusive) onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may contain data outside the requested range. onPage?: (chunk: ColumnData) => void // called when a data page is parsed. pages may contain data outside the requested range. - onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed compressors?: Compressors // custom decompressors utf8?: boolean // decode byte arrays as utf8 strings (default true) parsers?: ParquetParsers // custom parsers to decode advanced types } +interface ArrayRowFormat { + rowFormat?: 'array' // format of each row passed to the onComplete function. Can be omitted, as it's the default. + onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed +} +interface ObjectRowFormat { + rowFormat: 'object' // format of each row passed to the onComplete function + onComplete?: (rows: Record<string, any>[]) => void // called when all requested rows and columns are parsed +} +export type ParquetReadOptions = BaseParquetReadOptions & (ArrayRowFormat | ObjectRowFormat) + /** * Parquet query options for filtering data */ diff --git a/test/query.test.js b/test/query.test.js index eeb478d..2b3ae02 100644 --- a/test/query.test.js +++ b/test/query.test.js @@ -22,18 +22,6 @@ describe('parquetQuery', () => { ]) }) - it('returns rows in "array" format if asked', async () => { - const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') - const rows = await parquetQuery({ file, rowFormat: 'array' }) - expect(rows).toEqual([ - [ 'abc', 1, 2, true, [1, 2, 3] ], - [ 'abc', 2, 3, true, undefined ], - [ 'abc', 3, 4, true, undefined ], - [ null, 4, 5, false, [1, 2, 3] ], - [ 'abc', 5, 2, true, [1, 2] ], - ]) - }) - it('reads data
with orderBy', async () => { const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') const rows = await parquetQuery({ file, orderBy: 'c' }) @@ -75,18 +63,6 @@ describe('parquetQuery', () => { ]) }) - it('always returns rows in "object" format if filter is provided', async () => { - const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') - const expected = [ - { a: 'abc', b: 1, c: 2, d: true, e: [ 1, 2, 3 ] }, - { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] }, - ] - const filter = { c: { $eq: 2 } } - expect(await parquetQuery({ file, filter, rowFormat: 'array' })).toEqual(expected) - expect(await parquetQuery({ file, filter, rowFormat: 'object' })).toEqual(expected) - expect(await parquetQuery({ file, filter })).toEqual(expected) - }) - it('reads data with filter and rowStart/rowEnd', async () => { const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') const rows = await parquetQuery({ file, filter: { c: { $eq: 2 } }, rowStart: 1, rowEnd: 5 })