Fast filter by loading each row group and filtering until rowEnd (#78)

Kenny Daniel 2025-05-19 02:13:37 -07:00 committed by GitHub
parent 6c1b49819b
commit e4504c524d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 73 additions and 41 deletions
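
The change adds a fast path to parquetQuery: when a query has a filter, no orderBy, and a rowEnd below the total row count, row groups are read one at a time and reading stops as soon as rowEnd matching rows have been collected. A minimal usage sketch of the affected API, mirroring the test added in this commit (import paths follow the repo layout used by the tests; the file path and filter values are taken from the new test):

import { parquetQuery } from './src/query.js'
import { asyncBufferFromFile } from './src/utils.js'

const file = await asyncBufferFromFile('test/files/page_indexed.parquet')
// with this commit, only enough row groups to yield 5 matching rows are fetched
const rows = await parquetQuery({ file, filter: { quality: 'good' }, rowStart: 1, rowEnd: 5 })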

@@ -12,26 +12,42 @@ import { equals } from './utils.js'
* @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
*/
export async function parquetQuery(options) {
-const { file, rowStart, rowEnd, orderBy, filter } = options
-if (!file || !(file.byteLength >= 0)) {
+if (!options.file || !(options.file.byteLength >= 0)) {
throw new Error('parquetQuery expected file AsyncBuffer')
}
-options.metadata ||= await parquetMetadataAsync(file)
+options.metadata ??= await parquetMetadataAsync(options.file)
+const { metadata, rowStart = 0, orderBy, filter } = options
+if (rowStart < 0) throw new Error('parquetQuery rowStart must be positive')
+const rowEnd = options.rowEnd ?? Number(metadata.num_rows)
// TODO: Faster path for: no orderBy, no rowStart/rowEnd, one row group
-if (filter) {
-// TODO: Move filter to parquetRead for performance
-const results = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined })
-return results
+if (filter && !orderBy && rowEnd < metadata.num_rows) {
+// iterate through row groups and filter until we have enough rows
+const filteredRows = new Array()
+let groupStart = 0
+for (const group of metadata.row_groups) {
+const groupEnd = groupStart + Number(group.num_rows)
+// TODO: if expected > group size, start fetching next groups
+const groupData = await parquetReadObjects({ ...options, rowStart: groupStart, rowEnd: groupEnd })
+for (const row of groupData) {
+if (matchQuery(row, filter)) {
+filteredRows.push(row)
+}
+}
+if (filteredRows.length >= rowEnd) break
+groupStart = groupEnd
+}
+return filteredRows.slice(rowStart, rowEnd)
+} else if (filter) {
+// read all rows, sort, and filter
+const results = (await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined }))
.filter(row => matchQuery(row, filter))
-.sort((a, b) => orderBy ? compare(a[orderBy], b[orderBy]) : 0)
-.slice(rowStart, rowEnd)
+if (orderBy) results.sort((a, b) => compare(a[orderBy], b[orderBy]))
+return results.slice(rowStart, rowEnd)
} else if (typeof orderBy === 'string') {
-// Fetch orderBy column first
+// sorted but unfiltered: fetch orderBy column first
const orderColumn = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] })
-// Compute row groups to fetch
+// compute row groups to fetch
const sortedIndices = Array.from(orderColumn, (_, index) => index)
.sort((a, b) => compare(orderColumn[a][orderBy], orderColumn[b][orderBy]))
.slice(rowStart, rowEnd)
@@ -107,7 +123,7 @@ async function parquetReadRows(options) {
function compare(a, b) {
if (a < b) return -1
if (a > b) return 1
-return 1 // TODO: how to handle nulls?
+return 0 // TODO: null handling
}
/**

@@ -9,12 +9,10 @@ import { concat } from './utils.js'
* Read parquet data rows from a file-like object.
* Reads the minimal number of row groups and columns to satisfy the request.
*
-* Returns a void promise when complete, and to throw errors.
-* Data is returned in onComplete, not the return promise, because
-* if onComplete is undefined, we parse the data, and emit chunks, but skip
-* computing the row view directly. This saves on allocation if the caller
-* wants to cache the full chunks, and make their own view of the data from
-* the chunks.
+* Returns a void promise when complete.
+* Errors are thrown on the returned promise.
+* Data is returned in callbacks onComplete, onChunk, onPage, NOT the return promise.
+* See parquetReadObjects for a more convenient API.
*
* @param {ParquetReadOptions} options read options
* @returns {Promise<void>} resolves when all requested rows and columns are parsed, all errors are thrown here
@@ -27,7 +25,7 @@ export async function parquetRead(options) {
// load metadata if not provided
options.metadata ??= await parquetMetadataAsync(options.file)
const { metadata, onComplete, rowStart = 0, rowEnd } = options
-if (rowStart < 0) throw new Error('parquetRead rowStart must be postive')
+if (rowStart < 0) throw new Error('parquetRead rowStart must be positive')
// prefetch byte ranges
const plan = parquetPlan(options)
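
The reworded doc comment above is the behavioral contract for parquetRead: the returned promise only signals completion (and carries errors), while data arrives through the onComplete, onChunk, and onPage callbacks; parquetReadObjects is the convenience wrapper it points to. A minimal sketch of the two call styles, assuming a hypothetical local file data.parquet (import paths mirror those used in the test files):

import { parquetRead, parquetReadObjects } from './src/hyparquet.js'
import { asyncBufferFromFile } from './src/utils.js'

const file = await asyncBufferFromFile('data.parquet') // hypothetical input file

// callback style: the promise resolves with void once all requested rows are parsed
await parquetRead({ file, onComplete: rows => console.log('row count', rows.length) })

// convenience wrapper: resolves directly with the parsed rows
const rows = await parquetReadObjects({ file })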

@@ -21,3 +21,23 @@ export function fileToJson(filePath) {
export function reader(bytes) {
return { view: new DataView(new Uint8Array(bytes).buffer), offset: 0 }
}
+/**
+* Wraps an AsyncBuffer to count the number of fetches made
+*
+* @import {AsyncBuffer} from '../src/types.js'
+* @param {AsyncBuffer} asyncBuffer
+* @returns {AsyncBuffer & {fetches: number, bytes: number}}
+*/
+export function countingBuffer(asyncBuffer) {
+return {
+...asyncBuffer,
+fetches: 0,
+bytes: 0,
+slice(start, end) {
+this.fetches++
+this.bytes += (end ?? asyncBuffer.byteLength) - start
+return asyncBuffer.slice(start, end)
+},
+}
+}

@@ -1,6 +1,7 @@
import { describe, expect, it } from 'vitest'
import { parquetQuery } from '../src/query.js'
import { asyncBufferFromFile } from '../src/utils.js'
+import { countingBuffer } from './helpers.js'
describe('parquetQuery', () => {
it('throws error for undefined file', async () => {
@@ -195,4 +196,18 @@ describe('parquetQuery', () => {
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
])
})
+it('reads data efficiently with filter', async () => {
+const file = countingBuffer(await asyncBufferFromFile('test/files/page_indexed.parquet'))
+const rows = await parquetQuery({ file, filter: { quality: 'good' }, rowStart: 1, rowEnd: 5 })
+expect(rows).toEqual([
+{ row: 10n, quality: 'good' },
+{ row: 29n, quality: 'good' },
+{ row: 32n, quality: 'good' },
+{ row: 37n, quality: 'good' },
+])
+// if we weren't streaming row groups, this would be 3:
+expect(file.fetches).toBe(2) // 1 metadata, 1 row group
+expect(file.bytes).toBe(5261)
+})
})

@@ -2,6 +2,7 @@ import { describe, expect, it, vi } from 'vitest'
import { convertWithDictionary } from '../src/convert.js'
import { parquetMetadataAsync, parquetRead, parquetReadObjects } from '../src/hyparquet.js'
import { asyncBufferFromFile } from '../src/utils.js'
+import { countingBuffer } from './helpers.js'
vi.mock('../src/convert.js', { spy: true })
@@ -184,7 +185,7 @@ describe('parquetRead', () => {
it('reads individual pages', async () => {
const file = countingBuffer(await asyncBufferFromFile('test/files/page_indexed.parquet'))
-/** @type {ColumnData[]} */
+/** @type {import('../src/types.js').ColumnData[]} */
const pages = []
// check onPage callback
@@ -250,24 +251,6 @@
expect(page).toEqual(expected)
}
expect(file.fetches).toBe(3) // 1 metadata, 2 rowgroups
+expect(file.bytes).toBe(6421)
})
})
-/**
-* Wraps an AsyncBuffer to count the number of fetches made
-*
-* @import {AsyncBuffer, ColumnData} from '../src/types.js'
-* @param {AsyncBuffer} asyncBuffer
-* @returns {AsyncBuffer & {fetches: number}}
-*/
-function countingBuffer(asyncBuffer) {
-return {
-...asyncBuffer,
-fetches: 0,
-slice(start, end) {
-this.fetches++
-return asyncBuffer.slice(start, end)
-},
-}
-}