From 8050e0e38dc27d53718809b56077ca79fa326e79 Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Mon, 30 Jun 2025 01:47:05 -0700 Subject: [PATCH] Fix filter on unselected column (#95) --- eslint.config.js | 2 + src/query.js | 100 ++++++++++++++++++++++++++++++++++++++------- src/types.d.ts | 14 ++++--- test/query.test.js | 61 ++++++++++++++++++--------- 4 files changed, 137 insertions(+), 40 deletions(-) diff --git a/eslint.config.js b/eslint.config.js index f3790ed..1b37493 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -39,6 +39,8 @@ export default [ 'jsdoc/require-returns': 'error', 'jsdoc/require-returns-type': 'error', 'jsdoc/sort-tags': 'error', + 'key-spacing': 'error', + 'keyword-spacing': 'error', 'no-constant-condition': 'off', 'no-extra-parens': 'error', 'no-multi-spaces': 'error', diff --git a/src/query.js b/src/query.js index 1832422..abea2c4 100644 --- a/src/query.js +++ b/src/query.js @@ -1,5 +1,5 @@ import { parquetReadObjects } from './index.js' -import { parquetMetadataAsync } from './metadata.js' +import { parquetMetadataAsync, parquetSchema } from './metadata.js' import { parquetReadColumn } from './read.js' import { equals } from './utils.js' @@ -17,10 +17,28 @@ export async function parquetQuery(options) { throw new Error('parquet expected AsyncBuffer') } options.metadata ??= await parquetMetadataAsync(options.file) - const { metadata, rowStart = 0, orderBy, filter } = options + + const { metadata, rowStart = 0, columns, orderBy, filter } = options if (rowStart < 0) throw new Error('parquet rowStart must be positive') const rowEnd = options.rowEnd ?? Number(metadata.num_rows) + // Collect columns needed for the query + const filterColumns = columnsNeededForFilter(filter) + const allColumns = parquetSchema(options.metadata).children.map(c => c.element.name) + // Check if all filter columns exist + const missingColumns = filterColumns.filter(column => !allColumns.includes(column)) + if (missingColumns.length) { + throw new Error(`parquet filter columns not found: ${missingColumns.join(', ')}`) + } + if (orderBy && !allColumns.includes(orderBy)) { + throw new Error(`parquet orderBy column not found: ${orderBy}`) + } + const relevantColumns = columns ? allColumns.filter(column => + columns.includes(column) || filterColumns.includes(column) || column === orderBy + ) : undefined + // Is the output a subset of the relevant columns? + const requiresProjection = columns && relevantColumns ? columns.length < relevantColumns.length : false + if (filter && !orderBy && rowEnd < metadata.num_rows) { // iterate through row groups and filter until we have enough rows const filteredRows = new Array() @@ -28,9 +46,21 @@ export async function parquetQuery(options) { for (const group of metadata.row_groups) { const groupEnd = groupStart + Number(group.num_rows) // TODO: if expected > group size, start fetching next groups - const groupData = await parquetReadObjects({ ...options, rowStart: groupStart, rowEnd: groupEnd }) + const groupData = await parquetReadObjects({ + ...options, + rowStart: groupStart, + rowEnd: groupEnd, + columns: relevantColumns, + }) for (const row of groupData) { if (matchQuery(row, filter)) { + if (requiresProjection && relevantColumns) { + for (const column of relevantColumns) { + if (columns && !columns.includes(column)) { + delete row[column] // remove columns not in the projection + } + } + } filteredRows.push(row) } } @@ -40,10 +70,27 @@ export async function parquetQuery(options) { return filteredRows.slice(rowStart, rowEnd) } else if (filter) { // read all rows, sort, and filter - const results = (await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined })) - .filter(row => matchQuery(row, filter)) + const results = await parquetReadObjects({ + ...options, + rowStart: undefined, + rowEnd: undefined, + columns: relevantColumns, + }) if (orderBy) results.sort((a, b) => compare(a[orderBy], b[orderBy])) - return results.slice(rowStart, rowEnd) + const filteredRows = new Array() + for (const row of results) { + if (matchQuery(row, filter)) { + if (requiresProjection && relevantColumns) { + for (const column of relevantColumns) { + if (columns && !columns.includes(column)) { + delete row[column] // remove columns not in the projection + } + } + } + filteredRows.push(row) + } + } + return filteredRows.slice(rowStart, rowEnd) } else if (typeof orderBy === 'string') { // sorted but unfiltered: fetch orderBy column first const orderColumn = await parquetReadColumn({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] }) @@ -136,23 +183,21 @@ function compare(a, b) { * @example matchQuery({ id: 1 }, { id: {$gte: 1} }) // true */ export function matchQuery(record, query = {}) { - - if (query.$not) { - return !matchQuery(record, query.$not) - } - - if (query.$and) { + if ('$and' in query && Array.isArray(query.$and)) { return query.$and.every(subQuery => matchQuery(record, subQuery)) } - - if (query.$or) { + if ('$or' in query && Array.isArray(query.$or)) { return query.$or.some(subQuery => matchQuery(record, subQuery)) } + if ('$nor' in query && Array.isArray(query.$nor)) { + return !query.$nor.some(subQuery => matchQuery(record, subQuery)) + } return Object.entries(query).every(([field, condition]) => { const value = record[field] - if (condition !== null && (Array.isArray(condition) || typeof condition !== 'object')) { + // implicit $eq for non-object conditions + if (typeof condition !== 'object' || condition === null || Array.isArray(condition)) { return equals(value, condition) } @@ -166,6 +211,8 @@ export function matchQuery(record, query = {}) { return value < target case '$lte': return value <= target + case '$eq': + return equals(value, target) case '$ne': return !equals(value, target) case '$in': @@ -180,3 +227,26 @@ export function matchQuery(record, query = {}) { }) }) } + +/** + * Returns an array of column names that are needed to evaluate the mongo filter. + * + * @param {ParquetQueryFilter} [filter] + * @returns {string[]} + */ +function columnsNeededForFilter(filter) { + if (!filter) return [] + /** @type {string[]} */ + const columns = [] + if ('$and' in filter && Array.isArray(filter.$and)) { + columns.push(...filter.$and.flatMap(columnsNeededForFilter)) + } else if ('$or' in filter && Array.isArray(filter.$or)) { + columns.push(...filter.$or.flatMap(columnsNeededForFilter)) + } else if ('$nor' in filter && Array.isArray(filter.$nor)) { + columns.push(...filter.$nor.flatMap(columnsNeededForFilter)) + } else { + // Column filters + columns.push(...Object.keys(filter)) + } + return columns +} diff --git a/src/types.d.ts b/src/types.d.ts index e874ff0..ed7f6f8 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -36,21 +36,23 @@ export interface ParquetReadOptions { /** * Parquet query options for filtering data */ -export interface ParquetQueryFilter { - [key: string]: ParquetQueryValue | ParquetQueryOperator | ParquetQueryFilter[] | undefined - $and?: ParquetQueryFilter[] - $or?: ParquetQueryFilter[] - $not?: ParquetQueryFilter -} +export type ParquetQueryFilter = + | ParquetQueryColumnsFilter + | { $and: ParquetQueryFilter[] } + | { $or: ParquetQueryFilter[] } + | { $nor: ParquetQueryFilter[] } +type ParquetQueryColumnsFilter = { [key: string]: ParquetQueryOperator } export type ParquetQueryValue = string | number | boolean | object | null | undefined export type ParquetQueryOperator = { $gt?: ParquetQueryValue $gte?: ParquetQueryValue $lt?: ParquetQueryValue $lte?: ParquetQueryValue + $eq?: ParquetQueryValue $ne?: ParquetQueryValue $in?: ParquetQueryValue[] $nin?: ParquetQueryValue[] + $not?: ParquetQueryOperator } /** diff --git a/test/query.test.js b/test/query.test.js index d2a4be8..2b3ae02 100644 --- a/test/query.test.js +++ b/test/query.test.js @@ -54,15 +54,9 @@ describe('parquetQuery', () => { ]) }) - it('throws for invalid orderBy column', async () => { - const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') - const futureRows = parquetQuery({ file, orderBy: 'nonexistent' }) - await expect(futureRows).rejects.toThrow('parquet columns not found: nonexistent') - }) - it('reads data with filter', async () => { const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') - const rows = await parquetQuery({ file, filter: { c: 2 } }) + const rows = await parquetQuery({ file, filter: { c: { $eq: 2 } } }) expect(rows).toEqual([ { a: 'abc', b: 1, c: 2, d: true, e: [ 1, 2, 3 ] }, { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] }, @@ -71,13 +65,13 @@ describe('parquetQuery', () => { it('reads data with filter and rowStart/rowEnd', async () => { const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') - const rows = await parquetQuery({ file, filter: { c: 2 }, rowStart: 1, rowEnd: 5 }) + const rows = await parquetQuery({ file, filter: { c: { $eq: 2 } }, rowStart: 1, rowEnd: 5 }) expect(rows).toEqual([ { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] } ]) }) it('reads data with filter and orderBy', async () => { const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') - const rows = await parquetQuery({ file, filter: { c: 2 }, orderBy: 'b' }) + const rows = await parquetQuery({ file, filter: { c: { $eq: 2 } }, orderBy: 'b' }) expect(rows).toEqual([ { a: 'abc', b: 1, c: 2, d: true, e: [ 1, 2, 3 ] }, { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] }, @@ -86,13 +80,23 @@ describe('parquetQuery', () => { it('reads data with filter, orderBy, and rowStart/rowEnd', async () => { const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') - const rows = await parquetQuery({ file, filter: { c: 2 }, orderBy: 'b', rowStart: 1, rowEnd: 2 }) + const rows = await parquetQuery({ file, filter: { c: { $eq: 2 } }, orderBy: 'b', rowStart: 1, rowEnd: 2 }) expect(rows).toEqual([ { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] } ]) }) + it('reads data with multiple column filter operators', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { c: { $gt: 1, $lt: 4 }, d: { $eq: true } } }) + expect(rows).toEqual([ + { a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] }, + { a: 'abc', b: 2, c: 3, d: true }, + { a: 'abc', b: 5, c: 2, d: true, e: [1, 2] }, + ]) + }) + it('reads data with $and filter', async () => { const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') - const rows = await parquetQuery({ file, filter: { $and: [{ c: 2 }, { e: [1, 2, 3] }] } }) + const rows = await parquetQuery({ file, filter: { $and: [{ c: { $eq: 2 } }, { e: { $eq: [1, 2, 3] } }] } }) expect(rows).toEqual([ { a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] }, ]) @@ -100,7 +104,7 @@ describe('parquetQuery', () => { it('reads data with $or filter', async () => { const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') - const rows = await parquetQuery({ file, filter: { $or: [{ c: 2 }, { d: false }] } }) + const rows = await parquetQuery({ file, filter: { $or: [{ c: { $eq: 2 } }, { d: { $eq: false } }] } }) expect(rows).toEqual([ { a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] }, { a: null, b: 4, c: 5, d: false, e: [1, 2, 3] }, @@ -108,19 +112,17 @@ describe('parquetQuery', () => { ]) }) - it('reads data with $not filter', async () => { + it('reads data with $nor filter', async () => { const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') - const rows = await parquetQuery({ file, filter: { $not: { c: 2 } } }) + const rows = await parquetQuery({ file, filter: { $nor: [{ c: { $eq: 2 } }, { d: { $eq: true } }] } }) expect(rows).toEqual([ - { a: 'abc', b: 2, c: 3, d: true }, - { a: 'abc', b: 3, c: 4, d: true }, { a: null, b: 4, c: 5, d: false, e: [1, 2, 3] }, ]) }) - it('reads data with $not value filter', async () => { + it('reads data with $not filter', async () => { const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') - const rows = await parquetQuery({ file, filter: { c: { $not: 2 } } }) + const rows = await parquetQuery({ file, filter: { c: { $not: { $eq: 2 } } } }) expect(rows).toEqual([ { a: 'abc', b: 2, c: 3, d: true }, { a: 'abc', b: 3, c: 4, d: true }, @@ -199,7 +201,7 @@ describe('parquetQuery', () => { it('reads data efficiently with filter', async () => { const file = countingBuffer(await asyncBufferFromFile('test/files/page_indexed.parquet')) - const rows = await parquetQuery({ file, filter: { quality: 'good' }, rowStart: 1, rowEnd: 5 }) + const rows = await parquetQuery({ file, filter: { quality: { $eq: 'good' } }, rowStart: 1, rowEnd: 5 } ) expect(rows).toEqual([ { row: 10n, quality: 'good' }, { row: 29n, quality: 'good' }, @@ -210,4 +212,25 @@ describe('parquetQuery', () => { expect(file.fetches).toBe(2) // 1 metadata, 1 row group expect(file.bytes).toBe(5261) }) + + it('filter on columns that are not selected', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, columns: ['a', 'b'], filter: { c: { $eq: 2 } } }) + expect(rows).toEqual([ + { a: 'abc', b: 1 }, + { a: 'abc', b: 5 }, + ]) + }) + + it('throws on non-existent column in filter', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + await expect(parquetQuery({ file, filter: { nonExistent: { $eq: 1 } } })) + .rejects.toThrow('parquet filter columns not found: nonExistent') + }) + + it('throws on non-existent column in orderBy', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + await expect(parquetQuery({ file, orderBy: 'nonExistent' })) + .rejects.toThrow('parquet orderBy column not found: nonExistent') + }) })