From 0a20750193631acaa271d15177cccd4feb3b016d Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Fri, 21 Nov 2025 03:07:56 -0800 Subject: [PATCH] Pushdown filter (#141) --- src/filter.js | 102 +++++++++++++++++++++++++++++++++++++++++++++ src/plan.js | 8 +++- src/query.js | 85 +++++++------------------------------ src/schema.js | 23 ++++++++++ src/types.d.ts | 1 + test/query.test.js | 33 ++++++++++----- 6 files changed, 170 insertions(+), 82 deletions(-) create mode 100644 src/filter.js diff --git a/src/filter.js b/src/filter.js new file mode 100644 index 0000000..b292f40 --- /dev/null +++ b/src/filter.js @@ -0,0 +1,102 @@ +import { equals } from './utils.js' + +/** + * Match a record against a query filter + * + * @param {Record<string, any>} record + * @param {ParquetQueryFilter} filter + * @returns {boolean} + * @example matchFilter({ id: 1 }, { id: {$gte: 1} }) // true + */ +export function matchFilter(record, filter = {}) { + if ('$and' in filter && Array.isArray(filter.$and)) { + return filter.$and.every(subQuery => matchFilter(record, subQuery)) + } + if ('$or' in filter && Array.isArray(filter.$or)) { + return filter.$or.some(subQuery => matchFilter(record, subQuery)) + } + if ('$nor' in filter && Array.isArray(filter.$nor)) { + return !filter.$nor.some(subQuery => matchFilter(record, subQuery)) + } + + return Object.entries(filter).every(([field, condition]) => { + const value = record[field] + + // implicit $eq for non-object conditions + if (typeof condition !== 'object' || condition === null || Array.isArray(condition)) { + return equals(value, condition) + } + + return Object.entries(condition || {}).every(([operator, target]) => { + if (operator === '$gt') return value > target + if (operator === '$gte') return value >= target + if (operator === '$lt') return value < target + if (operator === '$lte') return value <= target + if (operator === '$eq') return equals(value, target) + if (operator === '$ne') return !equals(value, target) + if (operator === '$in') return 
Array.isArray(target) && target.includes(value) + if (operator === '$nin') return Array.isArray(target) && !target.includes(value) + if (operator === '$not') return !matchFilter({ [field]: value }, { [field]: target }) + return true + }) + }) +} + +/** + * Check if a row group can be skipped based on filter and column statistics. + * + * @import {ParquetQueryFilter, RowGroup} from '../src/types.js' + * @param {ParquetQueryFilter | undefined} filter + * @param {RowGroup} group + * @param {string[]} physicalColumns + * @returns {boolean} true if the row group can be skipped + */ +export function canSkipRowGroup(filter, group, physicalColumns) { + if (!filter) return false + + // Handle logical operators + if ('$and' in filter && Array.isArray(filter.$and)) { + // For AND, we can skip if ANY condition allows skipping + return filter.$and.some(subFilter => canSkipRowGroup(subFilter, group, physicalColumns)) + } + if ('$or' in filter && Array.isArray(filter.$or)) { + // For OR, we can skip only if ALL conditions allow skipping + return filter.$or.every(subFilter => canSkipRowGroup(subFilter, group, physicalColumns)) + } + if ('$nor' in filter && Array.isArray(filter.$nor)) { + // For NOR, skipping is only safe if every row in the group matches at least one sub-filter + // The checks here cannot prove that, so we'll be conservative and not skip + return false + } + + // Check column filters + for (const [field, condition] of Object.entries(filter)) { + // Find the column chunk for this field + const columnIndex = physicalColumns.indexOf(field) + if (columnIndex === -1) continue + + const columnChunk = group.columns[columnIndex] + const stats = columnChunk.meta_data?.statistics + if (!stats) continue // No statistics available, can't skip + + const { min, max, min_value, max_value } = stats + const minVal = min_value !== undefined ? min_value : min + const maxVal = max_value !== undefined ? 
max_value : max + + if (minVal === undefined || maxVal === undefined) continue + + // Handle operators + for (const [operator, target] of Object.entries(condition || {})) { + if (operator === '$gt' && maxVal <= target) return true + if (operator === '$gte' && maxVal < target) return true + if (operator === '$lt' && minVal >= target) return true + if (operator === '$lte' && minVal > target) return true + if (operator === '$eq' && (target < minVal || target > maxVal)) return true + if (operator === '$ne' && equals(minVal, maxVal) && equals(minVal, target)) return true + if (operator === '$in' && Array.isArray(target) && target.every(v => v < minVal || v > maxVal)) return true + if (operator === '$nin' && Array.isArray(target) && equals(minVal, maxVal) && target.includes(minVal)) return true + } + } + + return false +} diff --git a/src/plan.js b/src/plan.js index 5304559..615f501 100644 --- a/src/plan.js +++ b/src/plan.js @@ -1,3 +1,6 @@ +import { canSkipRowGroup } from './filter.js' +import { parquetSchema } from './metadata.js' +import { getPhysicalColumns } from './schema.js' import { concat } from './utils.js' // Combine column chunks into a single byte range if less than 32mb @@ -13,12 +16,13 @@ const columnChunkAggregation = 1 << 25 // 32mb * @param {ParquetReadOptions} options * @returns {QueryPlan} */ -export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns }) { +export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns, filter }) { if (!metadata) throw new Error('parquetPlan requires metadata') /** @type {GroupPlan[]} */ const groups = [] /** @type {ByteRange[]} */ const fetches = [] + const physicalColumns = getPhysicalColumns(parquetSchema(metadata)) // find which row groups to read let groupStart = 0 // first row index of the current group @@ -26,7 +30,7 @@ export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns const groupRows = Number(rowGroup.num_rows) const groupEnd = groupStart + 
groupRows // if row group overlaps with row range, add it to the plan - if (groupRows > 0 && groupEnd > rowStart && groupStart < rowEnd) { + if (groupRows > 0 && groupEnd > rowStart && groupStart < rowEnd && !canSkipRowGroup(filter, rowGroup, physicalColumns)) { /** @type {ByteRange[]} */ const ranges = [] // loop through each column chunk diff --git a/src/query.js b/src/query.js index bffd891..1157fbf 100644 --- a/src/query.js +++ b/src/query.js @@ -1,17 +1,17 @@ +import { matchFilter } from './filter.js' import { parquetMetadataAsync, parquetSchema } from './metadata.js' import { parquetReadColumn, parquetReadObjects } from './read.js' -import { equals } from './utils.js' /** * @import {ParquetQueryFilter, BaseParquetReadOptions} from '../src/types.js' */ /** - * Wraps parquetRead with filter and orderBy support. + * Wraps parquetRead with orderBy support. * This is a parquet-aware query engine that can read a subset of rows and columns. - * Accepts optional filter object to filter the results and orderBy column name to sort the results. + * Accepts optional orderBy column name to sort the results. * Note that using orderBy may SIGNIFICANTLY increase the query time. 
* - * @param {BaseParquetReadOptions & { filter?: ParquetQueryFilter, orderBy?: string }} options + * @param {BaseParquetReadOptions & { orderBy?: string }} options * @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed */ export async function parquetQuery(options) { @@ -50,13 +50,11 @@ export async function parquetQuery(options) { const groupEnd = groupStart + Number(group.num_rows) // TODO: if expected > group size, start fetching next groups const groupData = await parquetReadObjects({ - ...options, - rowStart: groupStart, - rowEnd: groupEnd, - columns: relevantColumns, + ...options, rowStart: groupStart, rowEnd: groupEnd, columns: relevantColumns, }) + // filter and project rows for (const row of groupData) { - if (matchQuery(row, filter)) { + if (matchFilter(row, filter)) { if (requiresProjection && relevantColumns) { for (const column of relevantColumns) { if (columns && !columns.includes(column)) { @@ -74,16 +72,17 @@ export async function parquetQuery(options) { } else if (filter) { // read all rows, sort, and filter const results = await parquetReadObjects({ - ...options, - rowStart: undefined, - rowEnd: undefined, - columns: relevantColumns, + ...options, rowStart: undefined, rowEnd: undefined, columns: relevantColumns, }) + + // sort if (orderBy) results.sort((a, b) => compare(a[orderBy], b[orderBy])) + + // filter and project rows /** @type {Record<string, any>[]} */ const filteredRows = new Array() for (const row of results) { - if (matchQuery(row, filter)) { + if (matchFilter(row, filter)) { if (requiresProjection && relevantColumns) { for (const column of relevantColumns) { if (columns && !columns.includes(column)) { @@ -97,7 +96,9 @@ export async function parquetQuery(options) { return filteredRows.slice(rowStart, rowEnd) } else if (typeof orderBy === 'string') { // sorted but unfiltered: fetch orderBy column first - const orderColumn = await parquetReadColumn({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] }) + 
const orderColumn = await parquetReadColumn({ + ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy], + }) // compute row groups to fetch const sortedIndices = Array.from(orderColumn, (_, index) => index) @@ -180,60 +181,6 @@ function compare(a, b) { return 0 // TODO: null handling } -/** - * Match a record against a query filter - * - * @param {any} record - * @param {ParquetQueryFilter} query - * @returns {boolean} - * @example matchQuery({ id: 1 }, { id: {$gte: 1} }) // true - */ -export function matchQuery(record, query = {}) { - if ('$and' in query && Array.isArray(query.$and)) { - return query.$and.every(subQuery => matchQuery(record, subQuery)) - } - if ('$or' in query && Array.isArray(query.$or)) { - return query.$or.some(subQuery => matchQuery(record, subQuery)) - } - if ('$nor' in query && Array.isArray(query.$nor)) { - return !query.$nor.some(subQuery => matchQuery(record, subQuery)) - } - - return Object.entries(query).every(([field, condition]) => { - const value = record[field] - - // implicit $eq for non-object conditions - if (typeof condition !== 'object' || condition === null || Array.isArray(condition)) { - return equals(value, condition) - } - - return Object.entries(condition || {}).every(([operator, target]) => { - switch (operator) { - case '$gt': - return value > target - case '$gte': - return value >= target - case '$lt': - return value < target - case '$lte': - return value <= target - case '$eq': - return equals(value, target) - case '$ne': - return !equals(value, target) - case '$in': - return Array.isArray(target) && target.includes(value) - case '$nin': - return Array.isArray(target) && !target.includes(value) - case '$not': - return !matchQuery({ [field]: value }, { [field]: target }) - default: - return true - } - }) - }) -} - /** * Returns an array of column names that are needed to evaluate the mongo filter. 
* diff --git a/src/schema.js b/src/schema.js index a63199d..9e24f6b 100644 --- a/src/schema.js +++ b/src/schema.js @@ -44,6 +44,29 @@ export function getSchemaPath(schema, name) { return path } +/** + * Get all physical (leaf) column names. + * + * @param {SchemaTree} schemaTree + * @returns {string[]} list of physical column names + */ +export function getPhysicalColumns(schemaTree) { + /** @type {string[]} */ + const columns = [] + /** @param {SchemaTree} node */ + function traverse(node) { + if (node.children.length) { + for (const child of node.children) { + traverse(child) + } + } else { + columns.push(node.element.name) + } + } + traverse(schemaTree) + return columns +} + /** * Get the max repetition level for a given schema path. * diff --git a/src/types.d.ts b/src/types.d.ts index a124ad5..b1cacae 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -27,6 +27,7 @@ export interface BaseParquetReadOptions { file: AsyncBuffer // file-like object containing parquet data metadata?: FileMetaData // parquet metadata, will be parsed if not provided columns?: string[] // columns to read, all columns if undefined + filter?: ParquetQueryFilter // best-effort pushdown filter, NOT GUARANTEED to be applied rowStart?: number // first requested row index (inclusive) rowEnd?: number // last requested row index (exclusive) onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may contain data outside the requested range. 
diff --git a/test/query.test.js b/test/query.test.js index 2b3ae02..ecb075c 100644 --- a/test/query.test.js +++ b/test/query.test.js @@ -1,6 +1,7 @@ import { describe, expect, it } from 'vitest' import { parquetQuery } from '../src/query.js' import { asyncBufferFromFile } from '../src/node.js' +import { parquetMetadataAsync } from '../src/metadata.js' import { countingBuffer } from './helpers.js' describe('parquetQuery', () => { @@ -200,17 +201,27 @@ describe('parquetQuery', () => { }) it('reads data efficiently with filter', async () => { - const file = countingBuffer(await asyncBufferFromFile('test/files/page_indexed.parquet')) - const rows = await parquetQuery({ file, filter: { quality: { $eq: 'good' } }, rowStart: 1, rowEnd: 5 } ) - expect(rows).toEqual([ - { row: 10n, quality: 'good' }, - { row: 29n, quality: 'good' }, - { row: 32n, quality: 'good' }, - { row: 37n, quality: 'good' }, - ]) - // if we weren't streaming row groups, this would be 3: - expect(file.fetches).toBe(2) // 1 metadata, 1 row group - expect(file.bytes).toBe(5261) + const originalFile = await asyncBufferFromFile('test/files/alpha.parquet') + // don't count metadata reads + const metadata = await parquetMetadataAsync(originalFile) + const file = countingBuffer(await asyncBufferFromFile('test/files/alpha.parquet')) + // Query for rows where id = 'kk' + const rows = await parquetQuery({ file, metadata, filter: { id: { $eq: 'kk' } } }) + expect(rows).toEqual([{ id: 'kk' }]) + // if we weren't skipping row groups, this would be higher + expect(file.fetches).toBe(1) // 1 row group + expect(file.bytes).toBe(437) // 3rd row group + }) + + it('reads data efficiently with filter and sort', async () => { + const originalFile = await asyncBufferFromFile('test/files/alpha.parquet') + // don't count metadata reads + const metadata = await parquetMetadataAsync(originalFile) + const file = countingBuffer(await asyncBufferFromFile('test/files/alpha.parquet')) + const rows = await parquetQuery({ file, 
metadata, filter: { id: { $gt: 'xx' } }, orderBy: 'id' } ) + expect(rows[0]).toEqual({ id: 'xy' }) + expect(file.fetches).toBe(1) // 1 row group + expect(file.bytes).toBe(335) }) it('filter on columns that are not selected', async () => {