From e4504c524d13737122c7def988052730b4d7b17f Mon Sep 17 00:00:00 2001
From: Kenny Daniel
Date: Mon, 19 May 2025 02:13:37 -0700
Subject: [PATCH] Fast filter by loading each row group and filtering until rowEnd (#78)

---
 src/query.js       | 44 ++++++++++++++++++++++++++++++--------------
 src/read.js        | 12 +++++-------
 test/helpers.js    | 20 ++++++++++++++++++++
 test/query.test.js | 15 +++++++++++++++
 test/read.test.js  | 23 +++--------------------
 5 files changed, 73 insertions(+), 41 deletions(-)

diff --git a/src/query.js b/src/query.js
index 7379ade..77f6b12 100644
--- a/src/query.js
+++ b/src/query.js
@@ -12,26 +12,42 @@ import { equals } from './utils.js'
  * @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
  */
 export async function parquetQuery(options) {
-  const { file, rowStart, rowEnd, orderBy, filter } = options
-  if (!file || !(file.byteLength >= 0)) {
+  if (!options.file || !(options.file.byteLength >= 0)) {
     throw new Error('parquetQuery expected file AsyncBuffer')
   }
-  options.metadata ||= await parquetMetadataAsync(file)
+  options.metadata ??= await parquetMetadataAsync(options.file)
+  const { metadata, rowStart = 0, orderBy, filter } = options
+  if (rowStart < 0) throw new Error('parquetQuery rowStart must be positive')
+  const rowEnd = options.rowEnd ?? Number(metadata.num_rows)
 
-  // TODO: Faster path for: no orderBy, no rowStart/rowEnd, one row group
-
-  if (filter) {
-    // TODO: Move filter to parquetRead for performance
-    const results = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined })
-    return results
+  if (filter && !orderBy && rowEnd < metadata.num_rows) {
+    // iterate through row groups and filter until we have enough rows
+    const filteredRows = new Array()
+    let groupStart = 0
+    for (const group of metadata.row_groups) {
+      const groupEnd = groupStart + Number(group.num_rows)
+      // TODO: if expected > group size, start fetching next groups
+      const groupData = await parquetReadObjects({ ...options, rowStart: groupStart, rowEnd: groupEnd })
+      for (const row of groupData) {
+        if (matchQuery(row, filter)) {
+          filteredRows.push(row)
+        }
+      }
+      if (filteredRows.length >= rowEnd) break
+      groupStart = groupEnd
+    }
+    return filteredRows.slice(rowStart, rowEnd)
+  } else if (filter) {
+    // read all rows, sort, and filter
+    const results = (await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined }))
       .filter(row => matchQuery(row, filter))
-      .sort((a, b) => orderBy ? compare(a[orderBy], b[orderBy]) : 0)
-      .slice(rowStart, rowEnd)
+    if (orderBy) results.sort((a, b) => compare(a[orderBy], b[orderBy]))
+    return results.slice(rowStart, rowEnd)
   } else if (typeof orderBy === 'string') {
-    // Fetch orderBy column first
+    // sorted but unfiltered: fetch orderBy column first
     const orderColumn = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] })
-    // Compute row groups to fetch
+    // compute row groups to fetch
     const sortedIndices = Array.from(orderColumn, (_, index) => index)
       .sort((a, b) => compare(orderColumn[a][orderBy], orderColumn[b][orderBy]))
       .slice(rowStart, rowEnd)
@@ -107,7 +123,7 @@ async function parquetReadRows(options) {
 function compare(a, b) {
   if (a < b) return -1
   if (a > b) return 1
-  return 1 // TODO: how to handle nulls?
+  return 0 // TODO: null handling
 }
 
 /**
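The fast path above only engages when a filter is present, no orderBy is requested, and rowEnd stops short of num_rows. Note that rowStart and rowEnd index into the filtered result set: the loop keeps scanning row groups until it has collected rowEnd matching rows, then slices off the first rowStart of them. A minimal usage sketch (the data file name is hypothetical; the imports mirror the repo-relative paths used by the tests below):

```js
import { parquetQuery } from './src/query.js'
import { asyncBufferFromFile } from './src/utils.js'

// filter + bounded rowEnd + no orderBy triggers the streaming fast path:
// row groups are read one at a time, and reading stops as soon as
// five matching rows have been collected
const file = await asyncBufferFromFile('data.parquet') // hypothetical file
const rows = await parquetQuery({
  file,
  filter: { quality: 'good' }, // plain values match by equality
  rowStart: 0,
  rowEnd: 5,
})
console.log(rows)
```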
diff --git a/src/read.js b/src/read.js
index 3a8587f..9c83683 100644
--- a/src/read.js
+++ b/src/read.js
@@ -9,12 +9,10 @@ import { concat } from './utils.js'
  * Read parquet data rows from a file-like object.
  * Reads the minimal number of row groups and columns to satisfy the request.
  *
- * Returns a void promise when complete, and to throw errors.
- * Data is returned in onComplete, not the return promise, because
- * if onComplete is undefined, we parse the data, and emit chunks, but skip
- * computing the row view directly. This saves on allocation if the caller
- * wants to cache the full chunks, and make their own view of the data from
- * the chunks.
+ * Returns a void promise when complete.
+ * Errors are thrown on the returned promise.
+ * Data is returned in callbacks onComplete, onChunk, onPage, NOT the return promise.
+ * See parquetReadObjects for a more convenient API.
  *
  * @param {ParquetReadOptions} options read options
  * @returns {Promise<void>} resolves when all requested rows and columns are parsed, all errors are thrown here
@@ -27,7 +25,7 @@ export async function parquetRead(options) {
   // load metadata if not provided
   options.metadata ??= await parquetMetadataAsync(options.file)
   const { metadata, onComplete, rowStart = 0, rowEnd } = options
-  if (rowStart < 0) throw new Error('parquetRead rowStart must be postive')
+  if (rowStart < 0) throw new Error('parquetRead rowStart must be positive')
 
   // prefetch byte ranges
   const plan = parquetPlan(options)
diff --git a/test/helpers.js b/test/helpers.js
index d76e324..0f89fff 100644
--- a/test/helpers.js
+++ b/test/helpers.js
@@ -21,3 +21,23 @@ export function fileToJson(filePath) {
 export function reader(bytes) {
   return { view: new DataView(new Uint8Array(bytes).buffer), offset: 0 }
 }
+
+/**
+ * Wraps an AsyncBuffer to count the number of fetches made
+ *
+ * @import {AsyncBuffer} from '../src/types.js'
+ * @param {AsyncBuffer} asyncBuffer
+ * @returns {AsyncBuffer & {fetches: number, bytes: number}}
+ */
+export function countingBuffer(asyncBuffer) {
+  return {
+    ...asyncBuffer,
+    fetches: 0,
+    bytes: 0,
+    slice(start, end) {
+      this.fetches++
+      this.bytes += (end ?? asyncBuffer.byteLength) - start
+      return asyncBuffer.slice(start, end)
+    },
+  }
+}
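Because countingBuffer spreads the wrapped object and only overrides slice, it works with any AsyncBuffer, i.e. anything exposing byteLength and an async slice(start, end); the `end ?? asyncBuffer.byteLength` fallback credits open-ended slices with the correct byte count. A sketch against a minimal in-memory AsyncBuffer (the asyncBufferFromBytes helper below is illustrative, not part of this patch):

```js
import { countingBuffer } from './test/helpers.js'

// minimal in-memory AsyncBuffer: byteLength plus an async slice
// returning an ArrayBuffer (illustrative only)
function asyncBufferFromBytes(bytes) {
  return {
    byteLength: bytes.byteLength,
    slice: async (start, end) => bytes.slice(start, end).buffer,
  }
}

const file = countingBuffer(asyncBufferFromBytes(new Uint8Array(1024)))
await file.slice(0, 100) // counts 100 bytes
await file.slice(1000) // open-ended slice counts byteLength - start = 24 bytes
console.log(file.fetches, file.bytes) // 2 124
```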
diff --git a/test/query.test.js b/test/query.test.js
index aff7e3a..d46ded8 100644
--- a/test/query.test.js
+++ b/test/query.test.js
@@ -1,6 +1,7 @@
 import { describe, expect, it } from 'vitest'
 import { parquetQuery } from '../src/query.js'
 import { asyncBufferFromFile } from '../src/utils.js'
+import { countingBuffer } from './helpers.js'
 
 describe('parquetQuery', () => {
   it('throws error for undefined file', async () => {
@@ -195,4 +196,18 @@ describe('parquetQuery', () => {
       { a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
     ])
   })
+
+  it('reads data efficiently with filter', async () => {
+    const file = countingBuffer(await asyncBufferFromFile('test/files/page_indexed.parquet'))
+    const rows = await parquetQuery({ file, filter: { quality: 'good' }, rowStart: 1, rowEnd: 5 })
+    expect(rows).toEqual([
+      { row: 10n, quality: 'good' },
+      { row: 29n, quality: 'good' },
+      { row: 32n, quality: 'good' },
+      { row: 37n, quality: 'good' },
+    ])
+    // if we weren't streaming row groups, this would be 3:
+    expect(file.fetches).toBe(2) // 1 metadata, 1 row group
+    expect(file.bytes).toBe(5261)
+  })
 })
diff --git a/test/read.test.js b/test/read.test.js
index 212a34b..117482e 100644
--- a/test/read.test.js
+++ b/test/read.test.js
@@ -2,6 +2,7 @@ import { describe, expect, it, vi } from 'vitest'
 import { convertWithDictionary } from '../src/convert.js'
 import { parquetMetadataAsync, parquetRead, parquetReadObjects } from '../src/hyparquet.js'
 import { asyncBufferFromFile } from '../src/utils.js'
+import { countingBuffer } from './helpers.js'
 
 vi.mock('../src/convert.js', { spy: true })
 
@@ -184,7 +185,7 @@ describe('parquetRead', () => {
   it('reads individual pages', async () => {
     const file = countingBuffer(await asyncBufferFromFile('test/files/page_indexed.parquet'))
-    /** @type {ColumnData[]} */
+    /** @type {import('../src/types.js').ColumnData[]} */
     const pages = []
 
     // check onPage callback
@@ -250,24 +251,6 @@ describe('parquetRead', () => {
       expect(page).toEqual(expected)
     }
     expect(file.fetches).toBe(3) // 1 metadata, 2 rowgroups
+    expect(file.bytes).toBe(6421)
   })
 })
-
-/**
- * Wraps an AsyncBuffer to count the number of fetches made
- *
- * @import {AsyncBuffer, ColumnData} from '../src/types.js'
- * @param {AsyncBuffer} asyncBuffer
- * @returns {AsyncBuffer & {fetches: number}}
- */
-
-function countingBuffer(asyncBuffer) {
-  return {
-    ...asyncBuffer,
-    fetches: 0,
-    slice(start, end) {
-      this.fetches++
-      return asyncBuffer.slice(start, end)
-    },
-  }
-}
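The test numbers line up with the read.test.js comment that page_indexed.parquet holds two row groups: the first row group already contains the five matches requested (rowEnd: 5), so the filtered query stops after one metadata fetch plus one row-group fetch, reading 5261 bytes, compared with the 6421 bytes the unfiltered page test reads across both row groups.

The remaining `TODO: if expected > group size, start fetching next groups` points at the one latency cost of this change: row groups are now fetched strictly serially, so sparse matches pay a full round trip per group. One possible follow-up, sketched here as it might look inside src/query.js (an assumption, not part of this patch), keeps the next group's read in flight while the current one is filtered:

```js
// hypothetical pipelined variant of the fast-path loop (not part of this
// patch): issue the read for row group i+1 before filtering row group i,
// trading a possibly wasted fetch on early exit for lower latency.
// assumes parquetReadObjects and matchQuery in scope as in src/query.js
async function filterRowGroupsPipelined(options, metadata, filter, rowEnd) {
  // precompute the [start, end) row range of each row group
  const ranges = []
  let start = 0
  for (const group of metadata.row_groups) {
    ranges.push([start, start + Number(group.num_rows)])
    start += Number(group.num_rows)
  }
  const read = ([lo, hi]) => parquetReadObjects({ ...options, rowStart: lo, rowEnd: hi })
  const filteredRows = []
  let pending = ranges.length ? read(ranges[0]) : Promise.resolve([])
  for (let i = 0; i < ranges.length; i++) {
    // start the next group's read before filtering the current one
    const next = i + 1 < ranges.length ? read(ranges[i + 1]) : Promise.resolve([])
    for (const row of await pending) {
      if (matchQuery(row, filter)) filteredRows.push(row)
    }
    if (filteredRows.length >= rowEnd) break // the in-flight fetch is wasted
    pending = next
  }
  return filteredRows
}
```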