diff --git a/src/query.js b/src/query.js index 5bb3f56..b6d1622 100644 --- a/src/query.js +++ b/src/query.js @@ -1,22 +1,31 @@ import { parquetReadObjects } from './hyparquet.js' import { parquetMetadataAsync } from './metadata.js' +import { equals } from './utils.js' /** - * Wraps parquetRead with orderBy support. + * Wraps parquetRead with filter and orderBy support. * This is a parquet-aware query engine that can read a subset of rows and columns. - * Accepts an optional orderBy column name to sort the results. + * Accepts optional filter object to filter the results and orderBy column name to sort the results. * Note that using orderBy may SIGNIFICANTLY increase the query time. * - * @param {ParquetReadOptions & { orderBy?: string }} options + * @import {ParquetQueryFilter} from '../src/types.d.ts' + * @param {ParquetReadOptions & { filter?: ParquetQueryFilter, orderBy?: string }} options * @returns {Promise[]>} resolves when all requested rows and columns are parsed */ export async function parquetQuery(options) { - const { file, rowStart, rowEnd, orderBy } = options + const { file, rowStart, rowEnd, orderBy, filter } = options options.metadata ||= await parquetMetadataAsync(file) // TODO: Faster path for: no orderBy, no rowStart/rowEnd, one row group - if (typeof orderBy === 'string') { + if (filter) { + // TODO: Move filter to parquetRead for performance + const results = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined }) + return results + .filter(row => matchQuery(row, filter)) + .sort((a, b) => orderBy ? compare(a[orderBy], b[orderBy]) : 0) + .slice(rowStart, rowEnd) + } else if (typeof orderBy === 'string') { // Fetch orderBy column first const orderColumn = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] }) @@ -98,3 +107,57 @@ function compare(a, b) { if (a > b) return 1 return 1 // TODO: how to handle nulls? 
/**
 * Match a record against a query filter.
 *
 * Every entry of the query must hold (implicit AND), including the logical
 * operators `$and`, `$or` and `$not`, which combine with sibling conditions.
 * A field condition is either a literal (compared with deep equality) or a
 * plain object of `$`-prefixed operators:
 * `$gt`, `$gte`, `$lt`, `$lte`, `$ne`, `$in`, `$nin`, `$not`.
 *
 * @param {any} record row object keyed by column name
 * @param {ParquetQueryFilter} query filter to evaluate
 * @returns {boolean} true if the record satisfies every condition
 * @example matchQuery({ id: 1 }, { id: {$gte: 1} }) // true
 */
export function matchQuery(record, query = {}) {
  return Object.entries(query).every(([field, condition]) => {
    // Logical operators are evaluated alongside sibling conditions rather than
    // short-circuiting the whole query, so { $or: [...], c: 2 } checks both.
    if (field === '$not' || field === '$and' || field === '$or') {
      if (!condition) return true // unset logical operator imposes no constraint
      if (field === '$not') return !matchQuery(record, condition)
      if (field === '$and') return condition.every(subQuery => matchQuery(record, subQuery))
      return condition.some(subQuery => matchQuery(record, subQuery))
    }

    const value = record[field]

    // Literals (primitives, null, arrays, dates, byte arrays, nested structs)
    // are an implicit equality test.
    if (!isOperatorObject(condition)) {
      return equals(value, condition)
    }

    // null < 3 is true in JavaScript because null coerces to 0, so range
    // operators must never match a null or missing cell.
    const comparable = value !== null && value !== undefined

    return Object.entries(condition).every(([operator, target]) => {
      switch (operator) {
        case '$gt':
          return comparable && value > target
        case '$gte':
          return comparable && value >= target
        case '$lt':
          return comparable && value < target
        case '$lte':
          return comparable && value <= target
        case '$ne':
          return !equals(value, target)
        case '$in':
          return Array.isArray(target) && includesDeep(target, value)
        case '$nin':
          return Array.isArray(target) && !includesDeep(target, value)
        case '$not':
          return !matchQuery({ [field]: value }, { [field]: target })
        default:
          // Unknown operators are ignored, as before.
          return true
      }
    })
  })
}

/**
 * Decide whether a field condition is a map of `$` operators rather than a
 * literal value to compare against.
 *
 * @param {any} condition
 * @returns {boolean} true for an empty plain object or a plain object with at least one `$` key
 */
function isOperatorObject(condition) {
  if (condition === null || typeof condition !== 'object' || Array.isArray(condition)) return false
  // Date, typed arrays and other class instances are values, not operator maps
  const proto = Object.getPrototypeOf(condition)
  if (proto !== null && proto !== Object.prototype) return false
  const keys = Object.keys(condition)
  return keys.length === 0 || keys.some(key => key.startsWith('$'))
}

/**
 * Membership test that falls back to deep equality, so `$in` / `$nin` agree
 * with `$ne` and implicit equality for arrays and objects.
 *
 * @param {any[]} list candidate values
 * @param {any} value value to look for
 * @returns {boolean} true if list contains value
 */
function includesDeep(list, value) {
  return list.includes(value) || list.some(item => equals(value, item))
}
/**
 * Deep equality comparison.
 *
 * Primitives are compared with `===`. Dates are compared by timestamp.
 * Arrays and typed arrays are compared element by element, and other objects
 * by their own enumerable keys, recursively.
 *
 * @param {any} a First object to compare
 * @param {any} b Second object to compare
 * @returns {boolean} true if objects are equal
 */
export function equals(a, b) {
  if (a === b) return true
  // Past this point only two non-null objects can still be equal
  if (!a || !b || typeof a !== 'object' || typeof b !== 'object') return false

  // Dates have no enumerable keys, so a key-wise comparison would call any two dates equal
  if (a instanceof Date || b instanceof Date) {
    return a instanceof Date && b instanceof Date && a.getTime() === b.getTime()
  }

  // A list never equals a keyed object, even when the keys look like indices
  const aIsList = isListLike(a)
  if (aIsList !== isListLike(b)) return false
  if (aIsList) {
    if (a.length !== b.length) return false
    for (let i = 0; i < a.length; i++) {
      if (!equals(a[i], b[i])) return false
    }
    return true
  }

  const keys = Object.keys(a)
  // Require the key to exist on b: otherwise { x: undefined } would equal { y: undefined }
  return keys.length === Object.keys(b).length &&
    keys.every(k => Object.prototype.hasOwnProperty.call(b, k) && equals(a[k], b[k]))
}

/**
 * True for arrays and typed arrays (e.g. Uint8Array), which are compared by index.
 *
 * @param {any} value
 * @returns {boolean}
 */
function isListLike(value) {
  return Array.isArray(value) || ArrayBuffer.isView(value) && !(value instanceof DataView)
}
diff --git a/test/query.test.js b/test/query.test.js index 5679507..d5e50cd 100644 --- a/test/query.test.js +++ b/test/query.test.js @@ -58,4 +58,141 @@ describe('parquetQuery', () => { const futureRows = parquetQuery({ file, orderBy: 'nonexistent' }) await expect(futureRows).rejects.toThrow('parquet columns not found: nonexistent') }) + + it('reads data with filter', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { c: 2 } }) + expect(toJson(rows)).toEqual([ + { a: 'abc', b: 1, c: 2, d: true, e: [ 1, 2, 3 ] }, + { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] }, + ]) + }) + + it('reads data with filter and rowStart/rowEnd', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { c: 2 }, rowStart: 1, rowEnd: 5 }) + expect(toJson(rows)).toEqual([ { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] } ]) + }) + + it('reads data with filter and orderBy', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { c: 2 }, orderBy: 'b' }) + expect(toJson(rows)).toEqual([ + { a: 'abc', b: 1, c: 2, d: true, e: [ 1, 2, 3 ] }, + { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] }, + ]) + }) + + it('reads data with filter, orderBy, and rowStart/rowEnd', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { c: 2 }, orderBy: 'b', rowStart: 1, rowEnd: 2 }) + expect(toJson(rows)).toEqual([ { a: 'abc', b: 5, c: 2, d: true, e: [ 1, 2 ] } ]) + }) + + it('reads data with $and filter', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { $and: [{ c: 2 }, { e: [1, 2, 3] }] } }) + expect(toJson(rows)).toEqual([ + { a: 'abc', b: 1, c: 2, d: 
true, e: [1, 2, 3] }, + ]) + }) + + it('reads data with $or filter', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { $or: [{ c: 2 }, { d: false }] } }) + expect(toJson(rows)).toEqual([ + { a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] }, + { a: null, b: 4, c: 5, d: false, e: [1, 2, 3] }, + { a: 'abc', b: 5, c: 2, d: true, e: [1, 2] }, + ]) + }) + + it('reads data with $not filter', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { $not: { c: 2 } } }) + expect(toJson(rows)).toEqual([ + { a: 'abc', b: 2, c: 3, d: true }, + { a: 'abc', b: 3, c: 4, d: true }, + { a: null, b: 4, c: 5, d: false, e: [1, 2, 3] }, + ]) + }) + + it('reads data with $not value filter', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { c: { $not: 2 } } }) + expect(toJson(rows)).toEqual([ + { a: 'abc', b: 2, c: 3, d: true }, + { a: 'abc', b: 3, c: 4, d: true }, + { a: null, b: 4, c: 5, d: false, e: [1, 2, 3] }, + ]) + }) + + it('reads data with $gt filter', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { b: { $gt: 3 } } }) + expect(toJson(rows)).toEqual([ + { a: null, b: 4, c: 5, d: false, e: [1, 2, 3] }, + { a: 'abc', b: 5, c: 2, d: true, e: [1, 2] }, + ]) + }) + + + it('reads data with $gte filter', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { b: { $gte: 3 } } }) + expect(toJson(rows)).toEqual([ + { a: 'abc', b: 3, c: 4, d: true }, + { a: null, b: 4, c: 5, d: false, e: [1, 2, 3] }, + { a: 'abc', b: 5, c: 2, d: true, e: [1, 2] }, + ]) + }) + + it('reads data with $lt filter', async () => { + const file = 
await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { b: { $lt: 3 } } }) + expect(toJson(rows)).toEqual([ + { a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] }, + { a: 'abc', b: 2, c: 3, d: true }, + ]) + }) + + it('reads data with $lte filter', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { b: { $lte: 3 } } }) + expect(toJson(rows)).toEqual([ + { a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] }, + { a: 'abc', b: 2, c: 3, d: true }, + { a: 'abc', b: 3, c: 4, d: true }, + ]) + }) + + it('reads data with $ne filter', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { b: { $ne: 3 } } }) + expect(toJson(rows)).toEqual([ + { a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] }, + { a: 'abc', b: 2, c: 3, d: true }, + { a: null, b: 4, c: 5, d: false, e: [1, 2, 3] }, + { a: 'abc', b: 5, c: 2, d: true, e: [1, 2] }, + ]) + }) + + it('reads data with $in filter', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { b: { $in: [2, 4] } } }) + expect(toJson(rows)).toEqual([ + { a: 'abc', b: 2, c: 3, d: true }, + { a: null, b: 4, c: 5, d: false, e: [1, 2, 3] }, + ]) + }) + + it('reads data with $nin filter', async () => { + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') + const rows = await parquetQuery({ file, filter: { b: { $nin: [2, 4] } } }) + expect(toJson(rows)).toEqual([ + { a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] }, + { a: 'abc', b: 3, c: 4, d: true }, + { a: 'abc', b: 5, c: 2, d: true, e: [1, 2] }, + ]) + }) })