Fix filter on unselected column (#95)

This commit is contained in:
Kenny Daniel 2025-06-30 01:47:05 -07:00 committed by GitHub
parent 52f056f6be
commit 8050e0e38d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 137 additions and 40 deletions

@ -39,6 +39,8 @@ export default [
'jsdoc/require-returns': 'error',
'jsdoc/require-returns-type': 'error',
'jsdoc/sort-tags': 'error',
'key-spacing': 'error',
'keyword-spacing': 'error',
'no-constant-condition': 'off',
'no-extra-parens': 'error',
'no-multi-spaces': 'error',

@ -1,5 +1,5 @@
import { parquetReadObjects } from './index.js'
import { parquetMetadataAsync } from './metadata.js'
import { parquetMetadataAsync, parquetSchema } from './metadata.js'
import { parquetReadColumn } from './read.js'
import { equals } from './utils.js'
@ -17,10 +17,28 @@ export async function parquetQuery(options) {
throw new Error('parquet expected AsyncBuffer')
}
options.metadata ??= await parquetMetadataAsync(options.file)
const { metadata, rowStart = 0, orderBy, filter } = options
const { metadata, rowStart = 0, columns, orderBy, filter } = options
if (rowStart < 0) throw new Error('parquet rowStart must be positive')
const rowEnd = options.rowEnd ?? Number(metadata.num_rows)
// Collect columns needed for the query
const filterColumns = columnsNeededForFilter(filter)
const allColumns = parquetSchema(options.metadata).children.map(c => c.element.name)
// Check if all filter columns exist
const missingColumns = filterColumns.filter(column => !allColumns.includes(column))
if (missingColumns.length) {
throw new Error(`parquet filter columns not found: ${missingColumns.join(', ')}`)
}
if (orderBy && !allColumns.includes(orderBy)) {
throw new Error(`parquet orderBy column not found: ${orderBy}`)
}
const relevantColumns = columns ? allColumns.filter(column =>
columns.includes(column) || filterColumns.includes(column) || column === orderBy
) : undefined
// Is the output a subset of the relevant columns?
const requiresProjection = columns && relevantColumns ? columns.length < relevantColumns.length : false
if (filter && !orderBy && rowEnd < metadata.num_rows) {
// iterate through row groups and filter until we have enough rows
const filteredRows = new Array()
@ -28,9 +46,21 @@ export async function parquetQuery(options) {
for (const group of metadata.row_groups) {
const groupEnd = groupStart + Number(group.num_rows)
// TODO: if expected > group size, start fetching next groups
const groupData = await parquetReadObjects({ ...options, rowStart: groupStart, rowEnd: groupEnd })
const groupData = await parquetReadObjects({
...options,
rowStart: groupStart,
rowEnd: groupEnd,
columns: relevantColumns,
})
for (const row of groupData) {
if (matchQuery(row, filter)) {
if (requiresProjection && relevantColumns) {
for (const column of relevantColumns) {
if (columns && !columns.includes(column)) {
delete row[column] // remove columns not in the projection
}
}
}
filteredRows.push(row)
}
}
@ -40,10 +70,27 @@ export async function parquetQuery(options) {
return filteredRows.slice(rowStart, rowEnd)
} else if (filter) {
// read all rows, sort, and filter
const results = (await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined }))
.filter(row => matchQuery(row, filter))
const results = await parquetReadObjects({
...options,
rowStart: undefined,
rowEnd: undefined,
columns: relevantColumns,
})
if (orderBy) results.sort((a, b) => compare(a[orderBy], b[orderBy]))
return results.slice(rowStart, rowEnd)
const filteredRows = new Array()
for (const row of results) {
if (matchQuery(row, filter)) {
if (requiresProjection && relevantColumns) {
for (const column of relevantColumns) {
if (columns && !columns.includes(column)) {
delete row[column] // remove columns not in the projection
}
}
}
filteredRows.push(row)
}
}
return filteredRows.slice(rowStart, rowEnd)
} else if (typeof orderBy === 'string') {
// sorted but unfiltered: fetch orderBy column first
const orderColumn = await parquetReadColumn({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] })
@ -136,23 +183,21 @@ function compare(a, b) {
* @example matchQuery({ id: 1 }, { id: {$gte: 1} }) // true
*/
export function matchQuery(record, query = {}) {
if (query.$not) {
return !matchQuery(record, query.$not)
}
if (query.$and) {
if ('$and' in query && Array.isArray(query.$and)) {
return query.$and.every(subQuery => matchQuery(record, subQuery))
}
if (query.$or) {
if ('$or' in query && Array.isArray(query.$or)) {
return query.$or.some(subQuery => matchQuery(record, subQuery))
}
if ('$nor' in query && Array.isArray(query.$nor)) {
return !query.$nor.some(subQuery => matchQuery(record, subQuery))
}
return Object.entries(query).every(([field, condition]) => {
const value = record[field]
if (condition !== null && (Array.isArray(condition) || typeof condition !== 'object')) {
// implicit $eq for non-object conditions
if (typeof condition !== 'object' || condition === null || Array.isArray(condition)) {
return equals(value, condition)
}
@ -166,6 +211,8 @@ export function matchQuery(record, query = {}) {
return value < target
case '$lte':
return value <= target
case '$eq':
return equals(value, target)
case '$ne':
return !equals(value, target)
case '$in':
@ -180,3 +227,26 @@ export function matchQuery(record, query = {}) {
})
})
}
/**
* Returns an array of column names that are needed to evaluate the mongo filter.
*
* @param {ParquetQueryFilter} [filter]
* @returns {string[]}
*/
function columnsNeededForFilter(filter) {
if (!filter) return []
/** @type {string[]} */
const columns = []
if ('$and' in filter && Array.isArray(filter.$and)) {
columns.push(...filter.$and.flatMap(columnsNeededForFilter))
} else if ('$or' in filter && Array.isArray(filter.$or)) {
columns.push(...filter.$or.flatMap(columnsNeededForFilter))
} else if ('$nor' in filter && Array.isArray(filter.$nor)) {
columns.push(...filter.$nor.flatMap(columnsNeededForFilter))
} else {
// Column filters
columns.push(...Object.keys(filter))
}
return columns
}

14
src/types.d.ts vendored

@ -36,21 +36,23 @@ export interface ParquetReadOptions {
/**
* Parquet query options for filtering data
*/
export interface ParquetQueryFilter {
[key: string]: ParquetQueryValue | ParquetQueryOperator | ParquetQueryFilter[] | undefined
$and?: ParquetQueryFilter[]
$or?: ParquetQueryFilter[]
$not?: ParquetQueryFilter
}
export type ParquetQueryFilter =
| ParquetQueryColumnsFilter
| { $and: ParquetQueryFilter[] }
| { $or: ParquetQueryFilter[] }
| { $nor: ParquetQueryFilter[] }
type ParquetQueryColumnsFilter = { [key: string]: ParquetQueryOperator }
export type ParquetQueryValue = string | number | boolean | object | null | undefined
export type ParquetQueryOperator = {
$gt?: ParquetQueryValue
$gte?: ParquetQueryValue
$lt?: ParquetQueryValue
$lte?: ParquetQueryValue
$eq?: ParquetQueryValue
$ne?: ParquetQueryValue
$in?: ParquetQueryValue[]
$nin?: ParquetQueryValue[]
$not?: ParquetQueryOperator
}
/**

@ -54,15 +54,9 @@ describe('parquetQuery', () => {
])
})
it('throws for invalid orderBy column', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const futureRows = parquetQuery({ file, orderBy: 'nonexistent' })
await expect(futureRows).rejects.toThrow('parquet columns not found: nonexistent')
})
it('reads data with filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { c: 2 } })
const rows = await parquetQuery({ file, filter: { c: { $eq: 2 } } })
expect(rows).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [ 1, 2, 3 ] },
{ a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] },
@ -71,13 +65,13 @@ describe('parquetQuery', () => {
it('reads data with filter and rowStart/rowEnd', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { c: 2 }, rowStart: 1, rowEnd: 5 })
const rows = await parquetQuery({ file, filter: { c: { $eq: 2 } }, rowStart: 1, rowEnd: 5 })
expect(rows).toEqual([ { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] } ])
})
it('reads data with filter and orderBy', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { c: 2 }, orderBy: 'b' })
const rows = await parquetQuery({ file, filter: { c: { $eq: 2 } }, orderBy: 'b' })
expect(rows).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [ 1, 2, 3 ] },
{ a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] },
@ -86,13 +80,23 @@ describe('parquetQuery', () => {
it('reads data with filter, orderBy, and rowStart/rowEnd', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { c: 2 }, orderBy: 'b', rowStart: 1, rowEnd: 2 })
const rows = await parquetQuery({ file, filter: { c: { $eq: 2 } }, orderBy: 'b', rowStart: 1, rowEnd: 2 })
expect(rows).toEqual([ { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] } ])
})
it('reads data with multiple column filter operators', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { c: { $gt: 1, $lt: 4 }, d: { $eq: true } } })
expect(rows).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
{ a: 'abc', b: 2, c: 3, d: true },
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
])
})
it('reads data with $and filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { $and: [{ c: 2 }, { e: [1, 2, 3] }] } })
const rows = await parquetQuery({ file, filter: { $and: [{ c: { $eq: 2 } }, { e: { $eq: [1, 2, 3] } }] } })
expect(rows).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
])
@ -100,7 +104,7 @@ describe('parquetQuery', () => {
it('reads data with $or filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { $or: [{ c: 2 }, { d: false }] } })
const rows = await parquetQuery({ file, filter: { $or: [{ c: { $eq: 2 } }, { d: { $eq: false } }] } })
expect(rows).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
@ -108,19 +112,17 @@ describe('parquetQuery', () => {
])
})
it('reads data with $not filter', async () => {
it('reads data with $nor filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { $not: { c: 2 } } })
const rows = await parquetQuery({ file, filter: { $nor: [{ c: { $eq: 2 } }, { d: { $eq: true } }] } })
expect(rows).toEqual([
{ a: 'abc', b: 2, c: 3, d: true },
{ a: 'abc', b: 3, c: 4, d: true },
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
])
})
it('reads data with $not value filter', async () => {
it('reads data with $not filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, filter: { c: { $not: 2 } } })
const rows = await parquetQuery({ file, filter: { c: { $not: { $eq: 2 } } } })
expect(rows).toEqual([
{ a: 'abc', b: 2, c: 3, d: true },
{ a: 'abc', b: 3, c: 4, d: true },
@ -199,7 +201,7 @@ describe('parquetQuery', () => {
it('reads data efficiently with filter', async () => {
const file = countingBuffer(await asyncBufferFromFile('test/files/page_indexed.parquet'))
const rows = await parquetQuery({ file, filter: { quality: 'good' }, rowStart: 1, rowEnd: 5 })
const rows = await parquetQuery({ file, filter: { quality: { $eq: 'good' } }, rowStart: 1, rowEnd: 5 } )
expect(rows).toEqual([
{ row: 10n, quality: 'good' },
{ row: 29n, quality: 'good' },
@ -210,4 +212,25 @@ describe('parquetQuery', () => {
expect(file.fetches).toBe(2) // 1 metadata, 1 row group
expect(file.bytes).toBe(5261)
})
it('filter on columns that are not selected', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, columns: ['a', 'b'], filter: { c: { $eq: 2 } } })
expect(rows).toEqual([
{ a: 'abc', b: 1 },
{ a: 'abc', b: 5 },
])
})
it('throws on non-existent column in filter', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
await expect(parquetQuery({ file, filter: { nonExistent: { $eq: 1 } } }))
.rejects.toThrow('parquet filter columns not found: nonExistent')
})
it('throws on non-existent column in orderBy', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
await expect(parquetQuery({ file, orderBy: 'nonExistent' }))
.rejects.toThrow('parquet orderBy column not found: nonExistent')
})
})