mirror of
https://github.com/asadbek064/hyparquet.git
synced 2025-12-30 08:56:37 +00:00
Query filter (#56)
* implement ParquetQueryFilter types * implement parquetQuery filter tests * implement parquetQuery filter * filter before ordering * apply filters before sorting/slicing * format types * add deep equality utility * document and format equals utility * use deep equality checks * update filter tests * support more types for equality * make $not unary * ensure arrays are correctly compared * support both forms of $not * add operator tests * Filter operator tests --------- Co-authored-by: Brian Park <park-brian@users.noreply.github.com> Co-authored-by: Kenny Daniel <platypii@gmail.com>
This commit is contained in:
parent
cb639a0b45
commit
c9727a4246
73
src/query.js
73
src/query.js
@ -1,22 +1,31 @@
|
||||
import { parquetReadObjects } from './hyparquet.js'
|
||||
import { parquetMetadataAsync } from './metadata.js'
|
||||
import { equals } from './utils.js'
|
||||
|
||||
/**
|
||||
* Wraps parquetRead with orderBy support.
|
||||
* Wraps parquetRead with filter and orderBy support.
|
||||
* This is a parquet-aware query engine that can read a subset of rows and columns.
|
||||
* Accepts an optional orderBy column name to sort the results.
|
||||
* Accepts optional filter object to filter the results and orderBy column name to sort the results.
|
||||
* Note that using orderBy may SIGNIFICANTLY increase the query time.
|
||||
*
|
||||
* @param {ParquetReadOptions & { orderBy?: string }} options
|
||||
* @import {ParquetQueryFilter} from '../src/types.d.ts'
|
||||
* @param {ParquetReadOptions & { filter?: ParquetQueryFilter, orderBy?: string }} options
|
||||
* @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
|
||||
*/
|
||||
export async function parquetQuery(options) {
|
||||
const { file, rowStart, rowEnd, orderBy } = options
|
||||
const { file, rowStart, rowEnd, orderBy, filter } = options
|
||||
options.metadata ||= await parquetMetadataAsync(file)
|
||||
|
||||
// TODO: Faster path for: no orderBy, no rowStart/rowEnd, one row group
|
||||
|
||||
if (typeof orderBy === 'string') {
|
||||
if (filter) {
|
||||
// TODO: Move filter to parquetRead for performance
|
||||
const results = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined })
|
||||
return results
|
||||
.filter(row => matchQuery(row, filter))
|
||||
.sort((a, b) => orderBy ? compare(a[orderBy], b[orderBy]) : 0)
|
||||
.slice(rowStart, rowEnd)
|
||||
} else if (typeof orderBy === 'string') {
|
||||
// Fetch orderBy column first
|
||||
const orderColumn = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] })
|
||||
|
||||
@ -98,3 +107,57 @@ function compare(a, b) {
|
||||
if (a > b) return 1
|
||||
return 1 // TODO: how to handle nulls?
|
||||
}
|
||||
|
||||
/**
 * Match a record against a query filter.
 *
 * A top-level `$not`, `$and` or `$or` is evaluated on its own: the first one
 * present (checked in that order) decides the result, and any other keys in
 * the same filter object are not consulted.
 *
 * Otherwise every `field: condition` pair must hold. A condition that is a
 * plain object is read as a bag of operators (`$gt`, `$gte`, `$lt`, `$lte`,
 * `$ne`, `$in`, `$nin`, `$not`); unrecognised operator keys are ignored.
 * Any other condition (primitive, null, array, Date, typed array, ...) is an
 * implicit deep-equality test against the field value.
 *
 * @param {any} record row object keyed by column name
 * @param {ParquetQueryFilter} query filter to evaluate
 * @returns {boolean} true if the record satisfies the filter
 * @example matchQuery({ id: 1 }, { id: {$gte: 1} }) // true
 */
export function matchQuery(record, query = {}) {
  if (query.$not) {
    return !matchQuery(record, query.$not)
  }

  if (query.$and) {
    return query.$and.every(subQuery => matchQuery(record, subQuery))
  }

  if (query.$or) {
    return query.$or.some(subQuery => matchQuery(record, subQuery))
  }

  return Object.entries(query).every(([field, condition]) => {
    const value = record[field]

    // Implicit equality: null used to fall through and match every row, and
    // Date / typed-array conditions were misread as (empty) operator bags.
    if (!isOperatorObject(condition)) {
      return equals(value, condition)
    }

    return Object.entries(condition).every(([operator, target]) => {
      switch (operator) {
        case '$gt':
          return value > target
        case '$gte':
          return value >= target
        case '$lt':
          return value < target
        case '$lte':
          return value <= target
        case '$ne':
          return !equals(value, target)
        case '$in':
          return Array.isArray(target) && includesDeep(target, value)
        case '$nin':
          return Array.isArray(target) && !includesDeep(target, value)
        case '$not':
          // Re-run the match on just this field so target may be a value or an operator object
          return !matchQuery({ [field]: value }, { [field]: target })
        default:
          return true
      }
    })
  })
}

/**
 * True when a condition should be read as a set of `$operator` entries,
 * i.e. it is a plain object rather than a primitive, null, array or class instance.
 *
 * @param {any} condition
 * @returns {boolean}
 */
function isOperatorObject(condition) {
  if (condition === null || typeof condition !== 'object') return false
  const proto = Object.getPrototypeOf(condition)
  return proto === Object.prototype || proto === null
}

/**
 * Membership test that keeps the semantics of Array.prototype.includes and
 * additionally accepts deep-equal members, so array and object values can match.
 *
 * @param {any[]} list candidate values
 * @param {any} value value to look for
 * @returns {boolean}
 */
function includesDeep(list, value) {
  return list.includes(value) || list.some(item => equals(value, item))
}
|
||||
|
||||
19
src/types.d.ts
vendored
19
src/types.d.ts
vendored
@ -350,3 +350,22 @@ export interface ParquetReadOptions {
|
||||
compressors?: Compressors // custom decompressors
|
||||
utf8?: boolean // decode byte arrays as utf8 strings (default true)
|
||||
}
|
||||
|
||||
/** Literal value that a column can be compared against in a query filter. */
export type ParquetQueryValue = string | number | boolean | object | null | undefined
|
||||
|
||||
/**
 * Comparison operators that can be applied to a single column.
 * When several operators are given, all of them must hold.
 */
export type ParquetQueryOperator = {
  $gt?: ParquetQueryValue // column > value
  $gte?: ParquetQueryValue // column >= value
  $lt?: ParquetQueryValue // column < value
  $lte?: ParquetQueryValue // column <= value
  $ne?: ParquetQueryValue // column is not deeply equal to value
  $in?: ParquetQueryValue[] // column is one of the listed values
  $nin?: ParquetQueryValue[] // column is none of the listed values
  $not?: ParquetQueryValue | ParquetQueryOperator // negates a value match or a nested operator object
}
|
||||
|
||||
/**
 * Filter object accepted by parquetQuery.
 * Keys are column names mapped to either a literal value (equality)
 * or a ParquetQueryOperator object.
 * $and requires every sub-filter to match, $or requires at least one,
 * and $not negates the nested filter.
 */
export interface ParquetQueryFilter {
|
||||
[key: string]: ParquetQueryValue | ParquetQueryOperator | ParquetQueryFilter[] | undefined
|
||||
$and?: ParquetQueryFilter[]
|
||||
$or?: ParquetQueryFilter[]
|
||||
$not?: ParquetQueryFilter
|
||||
}
|
||||
16
src/utils.js
16
src/utils.js
@ -37,6 +37,22 @@ export function concat(aaa, bbb) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Deep equality comparison.
 *
 * Primitives are compared with `===` (so NaN is never equal to itself).
 * Uint8Arrays are compared byte by byte, Dates by their timestamp,
 * arrays element by element, and other objects by their own enumerable keys.
 *
 * @param {any} a First object to compare
 * @param {any} b Second object to compare
 * @returns {boolean} true if objects are equal
 */
export function equals(a, b) {
  if (a === b) return true
  if (a instanceof Uint8Array && b instanceof Uint8Array) {
    return a.length === b.length && a.every((byte, i) => byte === b[i])
  }
  // From here on only two non-null objects can still be equal
  if (!a || !b || typeof a !== typeof b || typeof a !== 'object') return false
  // Dates expose no enumerable keys, so a key-by-key comparison would call any two Dates equal
  if (a instanceof Date || b instanceof Date) {
    return a instanceof Date && b instanceof Date && a.getTime() === b.getTime()
  }
  if (Array.isArray(a) && Array.isArray(b)) {
    return a.length === b.length && a.every((v, i) => equals(v, b[i]))
  }
  const keys = Object.keys(a)
  return keys.length === Object.keys(b).length &&
    // require the key to exist on b so { x: undefined } does not equal { y: undefined }
    keys.every(k => Object.prototype.hasOwnProperty.call(b, k) && equals(a[k], b[k]))
}
|
||||
|
||||
/**
|
||||
* Get the byte length of a URL using a HEAD request.
|
||||
* If requestInit is provided, it will be passed to fetch.
|
||||
|
||||
@ -58,4 +58,141 @@ describe('parquetQuery', () => {
|
||||
const futureRows = parquetQuery({ file, orderBy: 'nonexistent' })
|
||||
await expect(futureRows).rejects.toThrow('parquet columns not found: nonexistent')
|
||||
})
|
||||
|
||||
it('reads data with filter', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { c: 2 } })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: 'abc', b: 1, c: 2, d: true, e: [ 1, 2, 3 ] },
|
||||
{ a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] },
|
||||
])
|
||||
})
|
||||
|
||||
it('reads data with filter and rowStart/rowEnd', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { c: 2 }, rowStart: 1, rowEnd: 5 })
|
||||
expect(toJson(rows)).toEqual([ { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] } ])
|
||||
})
|
||||
|
||||
it('reads data with filter and orderBy', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { c: 2 }, orderBy: 'b' })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: 'abc', b: 1, c: 2, d: true, e: [ 1, 2, 3 ] },
|
||||
{ a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] },
|
||||
])
|
||||
})
|
||||
|
||||
it('reads data with filter, orderBy, and rowStart/rowEnd', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { c: 2 }, orderBy: 'b', rowStart: 1, rowEnd: 2 })
|
||||
expect(toJson(rows)).toEqual([ { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] } ])
|
||||
})
|
||||
|
||||
it('reads data with $and filter', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { $and: [{ c: 2 }, { e: [1, 2, 3] }] } })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
|
||||
])
|
||||
})
|
||||
|
||||
it('reads data with $or filter', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { $or: [{ c: 2 }, { d: false }] } })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
|
||||
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
|
||||
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
|
||||
])
|
||||
})
|
||||
|
||||
it('reads data with $not filter', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { $not: { c: 2 } } })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: 'abc', b: 2, c: 3, d: true },
|
||||
{ a: 'abc', b: 3, c: 4, d: true },
|
||||
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
|
||||
])
|
||||
})
|
||||
|
||||
it('reads data with $not value filter', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { c: { $not: 2 } } })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: 'abc', b: 2, c: 3, d: true },
|
||||
{ a: 'abc', b: 3, c: 4, d: true },
|
||||
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
|
||||
])
|
||||
})
|
||||
|
||||
it('reads data with $gt filter', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { b: { $gt: 3 } } })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
|
||||
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
|
||||
])
|
||||
})
|
||||
|
||||
|
||||
it('reads data with $gte filter', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { b: { $gte: 3 } } })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: 'abc', b: 3, c: 4, d: true },
|
||||
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
|
||||
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
|
||||
])
|
||||
})
|
||||
|
||||
it('reads data with $lt filter', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { b: { $lt: 3 } } })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
|
||||
{ a: 'abc', b: 2, c: 3, d: true },
|
||||
])
|
||||
})
|
||||
|
||||
it('reads data with $lte filter', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { b: { $lte: 3 } } })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
|
||||
{ a: 'abc', b: 2, c: 3, d: true },
|
||||
{ a: 'abc', b: 3, c: 4, d: true },
|
||||
])
|
||||
})
|
||||
|
||||
it('reads data with $ne filter', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { b: { $ne: 3 } } })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
|
||||
{ a: 'abc', b: 2, c: 3, d: true },
|
||||
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
|
||||
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
|
||||
])
|
||||
})
|
||||
|
||||
it('reads data with $in filter', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { b: { $in: [2, 4] } } })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: 'abc', b: 2, c: 3, d: true },
|
||||
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
|
||||
])
|
||||
})
|
||||
|
||||
it('reads data with $nin filter', async () => {
|
||||
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
|
||||
const rows = await parquetQuery({ file, filter: { b: { $nin: [2, 4] } } })
|
||||
expect(toJson(rows)).toEqual([
|
||||
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
|
||||
{ a: 'abc', b: 3, c: 4, d: true },
|
||||
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
|
||||
])
|
||||
})
|
||||
})
|
||||
|
||||
Loading…
Reference in New Issue
Block a user