Pushdown filter (#141)

Kenny Daniel 2025-11-21 03:07:56 -08:00 committed by GitHub
parent e827a849ad
commit 0a20750193
6 changed files with 170 additions and 82 deletions

src/filter.js Normal file

@@ -0,0 +1,102 @@
import { equals } from './utils.js'
/**
* Match a record against a query filter
*
* @param {Record<string, any>} record
* @param {ParquetQueryFilter} filter
* @returns {boolean}
* @example matchFilter({ id: 1 }, { id: {$gte: 1} }) // true
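* @example matchFilter({ id: 2 }, { $or: [{ id: { $lt: 1 } }, { id: { $gte: 2 } }] }) // true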
*/
export function matchFilter(record, filter = {}) {
if ('$and' in filter && Array.isArray(filter.$and)) {
return filter.$and.every(subQuery => matchFilter(record, subQuery))
}
if ('$or' in filter && Array.isArray(filter.$or)) {
return filter.$or.some(subQuery => matchFilter(record, subQuery))
}
if ('$nor' in filter && Array.isArray(filter.$nor)) {
return !filter.$nor.some(subQuery => matchFilter(record, subQuery))
}
return Object.entries(filter).every(([field, condition]) => {
const value = record[field]
// implicit $eq for non-object conditions (null and arrays compare as literal values)
if (typeof condition !== 'object' || condition === null || Array.isArray(condition)) {
return equals(value, condition)
}
return Object.entries(condition || {}).every(([operator, target]) => {
if (operator === '$gt') return value > target
if (operator === '$gte') return value >= target
if (operator === '$lt') return value < target
if (operator === '$lte') return value <= target
if (operator === '$eq') return equals(value, target)
if (operator === '$ne') return !equals(value, target)
if (operator === '$in') return Array.isArray(target) && target.includes(value)
if (operator === '$nin') return Array.isArray(target) && !target.includes(value)
if (operator === '$not') return !matchFilter({ [field]: value }, { [field]: target })
return true
})
})
}
/**
* Check if a row group can be skipped based on filter and column statistics.
*
* @import {ParquetQueryFilter, RowGroup} from '../src/types.js'
* @param {ParquetQueryFilter | undefined} filter
* @param {RowGroup} group
* @param {string[]} physicalColumns
* @returns {boolean} true if the row group can be skipped
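* @example canSkipRowGroup({ id: { $gt: 100 } }, group, ['id']) // true if the group's max id statistic is <= 100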
*/
export function canSkipRowGroup(filter, group, physicalColumns) {
if (!filter) return false
// Handle logical operators
if ('$and' in filter && Array.isArray(filter.$and)) {
// For AND, we can skip if ANY condition allows skipping
return filter.$and.some(subFilter => canSkipRowGroup(subFilter, group, physicalColumns))
}
if ('$or' in filter && Array.isArray(filter.$or)) {
// For OR, we can skip only if ALL conditions allow skipping
return filter.$or.every(subFilter => canSkipRowGroup(subFilter, group, physicalColumns))
}
if ('$nor' in filter && Array.isArray(filter.$nor)) {
// For NOR, skipping requires proving that some subfilter matches every
// row in the group, which min/max stats rarely establish, so never skip
return false
}
// Check column filters
for (const [field, condition] of Object.entries(filter)) {
// Find the column chunk for this field
const columnIndex = physicalColumns.indexOf(field)
if (columnIndex === -1) continue
const columnChunk = group.columns[columnIndex]
const stats = columnChunk.meta_data?.statistics
if (!stats) continue // No statistics available, can't skip
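// prefer min_value/max_value; min/max are the deprecated statistics fields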
const { min, max, min_value, max_value } = stats
const minVal = min_value !== undefined ? min_value : min
const maxVal = max_value !== undefined ? max_value : max
if (minVal === undefined || maxVal === undefined) continue
// Handle operators
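// note: bare values (implicit $eq) have no recognized operator keys, so they never prune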
for (const [operator, target] of Object.entries(condition || {})) {
if (operator === '$gt' && maxVal <= target) return true
if (operator === '$gte' && maxVal < target) return true
if (operator === '$lt' && minVal >= target) return true
if (operator === '$lte' && minVal > target) return true
if (operator === '$eq' && (target < minVal || target > maxVal)) return true
if (operator === '$ne' && equals(minVal, maxVal) && equals(minVal, target)) return true
if (operator === '$in' && Array.isArray(target) && target.every(v => v < minVal || v > maxVal)) return true
if (operator === '$nin' && Array.isArray(target) && equals(minVal, maxVal) && target.includes(minVal)) return true
}
}
return false
}
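
Taken together: matchFilter gives exact per-row semantics, while canSkipRowGroup is a conservative pre-check over column statistics. A minimal sketch of the pair (hypothetical stats values; the group literal carries only the fields canSkipRowGroup actually reads):

import { canSkipRowGroup, matchFilter } from './filter.js'

matchFilter({ quality: 'good', score: 12 }, { score: { $gte: 10 } }) // true
matchFilter({ quality: 'bad', score: 3 }, { score: { $gte: 10 } }) // false

// a row group whose score column spans [0, 5] can never satisfy score >= 10
const group = { columns: [{ meta_data: { statistics: { min_value: 0, max_value: 5 } } }] }
canSkipRowGroup({ score: { $gte: 10 } }, group, ['score']) // true: max 5 < 10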

src/plan.js
@@ -1,3 +1,6 @@
+import { canSkipRowGroup } from './filter.js'
+import { parquetSchema } from './metadata.js'
+import { getPhysicalColumns } from './schema.js'
import { concat } from './utils.js'
// Combine column chunks into a single byte range if less than 32mb
@@ -13,12 +16,13 @@ const columnChunkAggregation = 1 << 25 // 32mb
* @param {ParquetReadOptions} options
* @returns {QueryPlan}
*/
-export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns }) {
+export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns, filter }) {
if (!metadata) throw new Error('parquetPlan requires metadata')
/** @type {GroupPlan[]} */
const groups = []
/** @type {ByteRange[]} */
const fetches = []
+const physicalColumns = getPhysicalColumns(parquetSchema(metadata))
// find which row groups to read
let groupStart = 0 // first row index of the current group
@@ -26,7 +30,7 @@ export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns
const groupRows = Number(rowGroup.num_rows)
const groupEnd = groupStart + groupRows
// if row group overlaps with row range, add it to the plan
-if (groupRows > 0 && groupEnd > rowStart && groupStart < rowEnd) {
+if (groupRows > 0 && groupEnd > rowStart && groupStart < rowEnd && !canSkipRowGroup(filter, rowGroup, physicalColumns)) {
/** @type {ByteRange[]} */
const ranges = []
// loop through each column chunk
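
Pruning happens at planning time, so a skipped group contributes no byte ranges at all. A sketch (hypothetical file whose three row groups have id stats [aa..ff], [gg..pp], [qq..zz]):

// parquetPlan({ metadata, filter: { id: { $eq: 'kk' } } })
// plans fetches only for the middle group; canSkipRowGroup prunes the
// other two because 'kk' lies outside their min/max range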

src/query.js
@@ -1,17 +1,17 @@
+import { matchFilter } from './filter.js'
import { parquetMetadataAsync, parquetSchema } from './metadata.js'
import { parquetReadColumn, parquetReadObjects } from './read.js'
-import { equals } from './utils.js'
/**
* @import {ParquetQueryFilter, BaseParquetReadOptions} from '../src/types.js'
*/
/**
-* Wraps parquetRead with filter and orderBy support.
+* Wraps parquetRead with orderBy support.
* This is a parquet-aware query engine that can read a subset of rows and columns.
-* Accepts optional filter object to filter the results and orderBy column name to sort the results.
+* Accepts optional orderBy column name to sort the results.
* Note that using orderBy may SIGNIFICANTLY increase the query time.
*
-* @param {BaseParquetReadOptions & { filter?: ParquetQueryFilter, orderBy?: string }} options
+* @param {BaseParquetReadOptions & { orderBy?: string }} options
* @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
*/
export async function parquetQuery(options) {
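
From the caller's side the query API is unchanged; filter simply rides along in the options. A sketch (file is any AsyncBuffer; the values mirror the tests below):

const rows = await parquetQuery({ file, filter: { id: { $gt: 'xx' } }, orderBy: 'id' })
// rows are exactly filtered via matchFilter, while row groups ruled out
// by statistics are never fetched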
@@ -50,13 +50,11 @@ export async function parquetQuery(options) {
const groupEnd = groupStart + Number(group.num_rows)
// TODO: if expected > group size, start fetching next groups
const groupData = await parquetReadObjects({
-...options,
-rowStart: groupStart,
-rowEnd: groupEnd,
-columns: relevantColumns,
+...options, rowStart: groupStart, rowEnd: groupEnd, columns: relevantColumns,
})
// filter and project rows
for (const row of groupData) {
-if (matchQuery(row, filter)) {
+if (matchFilter(row, filter)) {
if (requiresProjection && relevantColumns) {
for (const column of relevantColumns) {
if (columns && !columns.includes(column)) {
@@ -74,16 +72,17 @@ export async function parquetQuery(options) {
} else if (filter) {
// read all rows, sort, and filter
const results = await parquetReadObjects({
-...options,
-rowStart: undefined,
-rowEnd: undefined,
-columns: relevantColumns,
+...options, rowStart: undefined, rowEnd: undefined, columns: relevantColumns,
})
// sort
if (orderBy) results.sort((a, b) => compare(a[orderBy], b[orderBy]))
// filter and project rows
/** @type {Record<string, any>[]} */
const filteredRows = new Array()
for (const row of results) {
-if (matchQuery(row, filter)) {
+if (matchFilter(row, filter)) {
if (requiresProjection && relevantColumns) {
for (const column of relevantColumns) {
if (columns && !columns.includes(column)) {
@@ -97,7 +96,9 @@ export async function parquetQuery(options) {
return filteredRows.slice(rowStart, rowEnd)
} else if (typeof orderBy === 'string') {
// sorted but unfiltered: fetch orderBy column first
-const orderColumn = await parquetReadColumn({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] })
+const orderColumn = await parquetReadColumn({
+...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy],
+})
// compute row groups to fetch
const sortedIndices = Array.from(orderColumn, (_, index) => index)
@@ -180,60 +181,6 @@ function compare(a, b) {
return 0 // TODO: null handling
}
-/**
- * Match a record against a query filter
- *
- * @param {any} record
- * @param {ParquetQueryFilter} query
- * @returns {boolean}
- * @example matchQuery({ id: 1 }, { id: {$gte: 1} }) // true
- */
-export function matchQuery(record, query = {}) {
-if ('$and' in query && Array.isArray(query.$and)) {
-return query.$and.every(subQuery => matchQuery(record, subQuery))
-}
-if ('$or' in query && Array.isArray(query.$or)) {
-return query.$or.some(subQuery => matchQuery(record, subQuery))
-}
-if ('$nor' in query && Array.isArray(query.$nor)) {
-return !query.$nor.some(subQuery => matchQuery(record, subQuery))
-}
-return Object.entries(query).every(([field, condition]) => {
-const value = record[field]
-// implicit $eq for non-object conditions
-if (typeof condition !== 'object' || condition === null || Array.isArray(condition)) {
-return equals(value, condition)
-}
-return Object.entries(condition || {}).every(([operator, target]) => {
-switch (operator) {
-case '$gt':
-return value > target
-case '$gte':
-return value >= target
-case '$lt':
-return value < target
-case '$lte':
-return value <= target
-case '$eq':
-return equals(value, target)
-case '$ne':
-return !equals(value, target)
-case '$in':
-return Array.isArray(target) && target.includes(value)
-case '$nin':
-return Array.isArray(target) && !target.includes(value)
-case '$not':
-return !matchQuery({ [field]: value }, { [field]: target })
-default:
-return true
-}
-})
-})
-}
/**
* Returns an array of column names that are needed to evaluate the mongo filter.
*

src/schema.js
@@ -44,6 +44,29 @@ export function getSchemaPath(schema, name) {
return path
}
+/**
+ * Get all physical (leaf) column names.
+ *
+ * @param {SchemaTree} schemaTree
+ * @returns {string[]} list of physical column names
+ */
+export function getPhysicalColumns(schemaTree) {
+/** @type {string[]} */
+const columns = []
+/** @param {SchemaTree} node */
+function traverse(node) {
+if (node.children.length) {
+for (const child of node.children) {
+traverse(child)
+}
+} else {
+columns.push(node.element.name)
+}
+}
+traverse(schemaTree)
+return columns
+}
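
Note this collects leaf element names, not dotted paths, and canSkipRowGroup matches filter fields against exactly these names. A sketch with a hypothetical nested tree:

// const tree = { element: { name: 'root' }, children: [
//   { element: { name: 'id' }, children: [] },
//   { element: { name: 'user' }, children: [
//     { element: { name: 'name' }, children: [] },
//   ] },
// ] }
// getPhysicalColumns(tree) // ['id', 'name']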
/**
* Get the max repetition level for a given schema path.
*

src/types.d.ts vendored

@@ -27,6 +27,7 @@ export interface BaseParquetReadOptions {
file: AsyncBuffer // file-like object containing parquet data
metadata?: FileMetaData // parquet metadata, will be parsed if not provided
columns?: string[] // columns to read, all columns if undefined
+filter?: ParquetQueryFilter // best-effort pushdown filter, NOT GUARANTEED to be applied
rowStart?: number // first requested row index (inclusive)
rowEnd?: number // last requested row index (exclusive)
onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may contain data outside the requested range.
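
With filter on BaseParquetReadOptions, plain reads can use it for row-group pruning, while exact filtering remains parquetQuery's job. A sketch of the distinction (using parquetReadObjects and parquetQuery as imported in the hunks above):

// best-effort: may still return non-matching rows from groups it does read
const approx = await parquetReadObjects({ file, filter: { id: { $eq: 'kk' } } })
// exact: applies matchFilter to every row after reading
const exact = await parquetQuery({ file, filter: { id: { $eq: 'kk' } } })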

test/query.test.js
@@ -1,6 +1,7 @@
import { describe, expect, it } from 'vitest'
import { parquetQuery } from '../src/query.js'
import { asyncBufferFromFile } from '../src/node.js'
+import { parquetMetadataAsync } from '../src/metadata.js'
import { countingBuffer } from './helpers.js'
describe('parquetQuery', () => {
@@ -200,17 +201,27 @@ describe('parquetQuery', () => {
})
it('reads data efficiently with filter', async () => {
-const file = countingBuffer(await asyncBufferFromFile('test/files/page_indexed.parquet'))
-const rows = await parquetQuery({ file, filter: { quality: { $eq: 'good' } }, rowStart: 1, rowEnd: 5 } )
-expect(rows).toEqual([
-{ row: 10n, quality: 'good' },
-{ row: 29n, quality: 'good' },
-{ row: 32n, quality: 'good' },
-{ row: 37n, quality: 'good' },
-])
-// if we weren't streaming row groups, this would be 3:
-expect(file.fetches).toBe(2) // 1 metadata, 1 row group
-expect(file.bytes).toBe(5261)
+const originalFile = await asyncBufferFromFile('test/files/alpha.parquet')
+// don't count metadata reads
+const metadata = await parquetMetadataAsync(originalFile)
+const file = countingBuffer(await asyncBufferFromFile('test/files/alpha.parquet'))
+// Query for rows where id = 'kk'
+const rows = await parquetQuery({ file, metadata, filter: { id: { $eq: 'kk' } } })
+expect(rows).toEqual([{ id: 'kk' }])
+// if we weren't skipping row groups, this would be higher
+expect(file.fetches).toBe(1) // 1 row group
+expect(file.bytes).toBe(437) // 3rd row group
})
+it('reads data efficiently with filter and sort', async () => {
+const originalFile = await asyncBufferFromFile('test/files/alpha.parquet')
+// don't count metadata reads
+const metadata = await parquetMetadataAsync(originalFile)
+const file = countingBuffer(await asyncBufferFromFile('test/files/alpha.parquet'))
+const rows = await parquetQuery({ file, metadata, filter: { id: { $gt: 'xx' } }, orderBy: 'id' } )
+expect(rows[0]).toEqual({ id: 'xy' })
+expect(file.fetches).toBe(1) // 1 row group
+expect(file.bytes).toBe(335)
+})
it('filter on columns that are not selected', async () => {