Query api

This commit is contained in:
Kenny Daniel 2024-09-14 21:12:30 -07:00
parent 9a2f4fdcba
commit 9d49dabc15
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
8 changed files with 184 additions and 12 deletions

@ -1,8 +1,8 @@
import HighTable, { DataFrame, sortableDataFrame } from 'hightable'
import HighTable, { DataFrame } from 'hightable'
import { compressors } from 'hyparquet-compressors'
import React, { useEffect, useState } from 'react'
import { parquetReadObjects } from '../src/hyparquet.js'
import { FileMetaData, parquetMetadataAsync, parquetSchema } from '../src/metadata.js'
import { parquetQuery } from '../src/query.js'
import type { AsyncBuffer } from '../src/types.js'
import { asyncBufferFromUrl } from '../src/utils.js'
import Dropdown from './Dropdown.js'
@ -52,10 +52,7 @@ export default function App({ url }: { url?: string }) {
setMetadata(metadata)
setName(name)
setByteLength(asyncBuffer.byteLength)
let df = parquetDataFrame(asyncBuffer, metadata)
if (df.numRows <= 10000) {
df = sortableDataFrame(df)
}
const df = parquetDataFrame(asyncBuffer, metadata)
setDf(df)
document.getElementById('welcome')?.remove()
}
@ -99,12 +96,14 @@ function parquetDataFrame(file: AsyncBuffer, metadata: FileMetaData): DataFrame
/**
* @param {number} rowStart
* @param {number} rowEnd
* @param {string} orderBy
* @returns {Promise<any[][]>}
*/
rows(rowStart, rowEnd) {
console.log(`reading rows ${rowStart}-${rowEnd}`)
return parquetReadObjects({ file, compressors, rowStart, rowEnd })
rows(rowStart, rowEnd, orderBy) {
console.log(`reading rows ${rowStart}-${rowEnd}`, orderBy)
return parquetQuery({ file, compressors, rowStart, rowEnd, orderBy })
},
sortable: true,
}
}

2
demo/bundle.min.js vendored

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@ -40,7 +40,7 @@
"eslint": "8.57.0",
"eslint-plugin-import": "2.30.0",
"eslint-plugin-jsdoc": "50.2.4",
"hightable": "0.4.2",
"hightable": "0.4.3",
"http-server": "14.1.1",
"hyparquet-compressors": "0.1.4",
"react": "18.3.1",

11
src/hyparquet.d.ts vendored

@ -41,6 +41,17 @@ export function parquetRead(options: ParquetReadOptions): Promise<void>
*/
export function parquetReadObjects(options: ParquetReadOptions): Promise<Array<Record<string, any>>>
/**
* Wraps parquetRead with orderBy support.
* This is a parquet-aware query engine that can read a subset of rows and columns.
* Accepts an optional orderBy column name to sort the results.
* Note that using orderBy may SIGNIFICANTLY increase the query time.
*
* @param {ParquetReadOptions & { orderBy?: string }} options
* @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
*/
export function parquetQuery(options: ParquetReadOptions & { orderBy?: string }): Promise<Array<Record<string, any>>>
/**
* Read parquet metadata from an async buffer.
*

@ -3,6 +3,8 @@ export { parquetMetadata, parquetMetadataAsync, parquetSchema } from './metadata
import { parquetRead } from './read.js'
export { parquetRead }
export { parquetQuery } from './query.js'
export { snappyUncompress } from './snappy.js'
export { asyncBufferFromFile, asyncBufferFromUrl, toJson } from './utils.js'

99
src/query.js Normal file

@ -0,0 +1,99 @@
import { parquetReadObjects } from './hyparquet.js'
import { parquetMetadataAsync } from './metadata.js'
/**
 * Wraps parquetRead with orderBy support.
 * This is a parquet-aware query engine that can read a subset of rows,
 * with an optional orderBy clause.
 *
 * @typedef {import('./hyparquet.js').ParquetReadOptions} ParquetReadOptions
 * @param {ParquetReadOptions & { orderBy?: string }} options
 * @returns {Promise<Record<string, any>[]>}
 */
export async function parquetQuery(options) {
  const { file, rowStart, rowEnd, orderBy } = options
  options.metadata ||= await parquetMetadataAsync(file)
  // TODO: Faster path for: no orderBy, no rowStart/rowEnd, one row group
  if (typeof orderBy !== 'string') {
    // No sort requested: defer directly to the plain reader
    return await parquetReadObjects(options)
  }
  // Read only the orderBy column for every row so rows can be ranked cheaply
  const orderColumn = await parquetReadObjects({ ...options, rowStart: undefined, rowEnd: undefined, columns: [orderBy] })
  // Rank all row indices by orderBy value, then keep the requested window
  const ranked = orderColumn.map((_, rowIndex) => rowIndex)
  ranked.sort((left, right) => compare(orderColumn[left][orderBy], orderColumn[right][orderBy]))
  const windowed = ranked.slice(rowStart, rowEnd)
  // Fetch only the row groups containing the selected rows, then project
  // the sparse result back into sorted order
  const sparseData = await parquetReadRows({ ...options, rows: windowed })
  return windowed.map(rowIndex => sparseData[rowIndex])
}
/**
 * Reads a list of rows from a parquet file, fetching only the row groups
 * that contain those rows. Returns a sparse array indexed by absolute row
 * number; each fetched row is tagged with its original position as __index__.
 *
 * @param {ParquetReadOptions & { rows: number[] }} options
 * @returns {Promise<Record<string, any>[]>} sparse array of rows
 */
async function parquetReadRows(options) {
  const { file, rows } = options
  options.metadata ||= await parquetMetadataAsync(file)
  const { row_groups: rowGroups } = options.metadata
  // Mark which row groups contain at least one requested row
  const groupIncluded = Array(rowGroups.length).fill(false)
  let groupStart = 0
  const groupEnds = rowGroups.map(group => groupStart += Number(group.num_rows))
  for (const index of rows) {
    const groupIndex = groupEnds.findIndex(end => index < end)
    groupIncluded[groupIndex] = true
  }
  // Coalesce runs of included groups into half-open [start, end) row ranges
  const rowRanges = []
  let rangeStart
  groupStart = 0
  for (let i = 0; i < groupIncluded.length; i++) {
    const groupEnd = groupStart + Number(rowGroups[i].num_rows)
    if (groupIncluded[i]) {
      if (rangeStart === undefined) {
        rangeStart = groupStart
      }
    } else if (rangeStart !== undefined) {
      // Close the range at the START of this excluded group, not its end,
      // so the excluded group's rows are not fetched (was a bug: groupEnd)
      rowRanges.push([rangeStart, groupStart])
      rangeStart = undefined
    }
    groupStart = groupEnd
  }
  if (rangeStart !== undefined) {
    rowRanges.push([rangeStart, groupStart])
  }
  // Fetch all ranges in parallel and scatter rows into the sparse array
  const sparseData = new Array(Number(options.metadata.num_rows))
  await Promise.all(rowRanges.map(async ([start, end]) => {
    const groupData = await parquetReadObjects({ ...options, rowStart: start, rowEnd: end })
    for (let i = start; i < end; i++) {
      sparseData[i] = groupData[i - start]
      sparseData[i].__index__ = i // remember pre-sort position
    }
  }))
  return sparseData
}
/**
 * Default ascending comparator, with null/undefined sorted last.
 * Returns 0 for ties so Array.prototype.sort stays stable and consistent
 * (the previous `return 1` for equal values made the comparator
 * inconsistent, so tie order was unspecified).
 *
 * @param {any} a
 * @param {any} b
 * @returns {number} negative if a < b, positive if a > b, 0 if equal
 */
function compare(a, b) {
  // == null matches both null and undefined; sort them after real values
  if (a == null) return b == null ? 0 : 1
  if (b == null) return -1
  if (a < b) return -1
  if (a > b) return 1
  return 0 // equal values: 0 keeps the sort stable
}

61
test/query.test.js Normal file

@ -0,0 +1,61 @@
import { describe, expect, it } from 'vitest'
import { parquetQuery } from '../src/query.js'
import { asyncBufferFromFile, toJson } from '../src/utils.js'
// Integration tests for parquetQuery (vitest).
// Fixture test/files/datapage_v2.snappy.parquet has 5 rows with columns a-e;
// column c holds [2, 3, 4, 5, 2] — note the tie on c=2 at rows 0 and 4.
describe('parquetQuery', () => {
// A missing file must be rejected with a clear error, not crash downstream
it('throws error for undefined file', async () => {
// @ts-expect-error testing invalid input
await expect(parquetQuery({ file: undefined }))
.rejects.toThrow('parquet file is required')
})
// Without orderBy, rows come back in file order and carry no __index__ tag
it('reads data without orderBy', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
{ a: 'abc', b: 2, c: 3, d: true },
{ a: 'abc', b: 3, c: 4, d: true },
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
{ a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
])
})
// With orderBy, rows are sorted by column c and tagged with their
// original position in __index__; the c=2 tie keeps file order (0 before 4)
it('reads data with orderBy', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, orderBy: 'c' })
expect(toJson(rows)).toEqual([
{ __index__: 0, a: 'abc', b: 1, c: 2, d: true, e: [1, 2, 3] },
{ __index__: 4, a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
{ __index__: 1, a: 'abc', b: 2, c: 3, d: true },
{ __index__: 2, a: 'abc', b: 3, c: 4, d: true },
{ __index__: 3, a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
])
})
// rowStart/rowEnd window applies AFTER sorting (rows 1-3 of sorted order)
it('reads data with orderBy and limits', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, orderBy: 'c', rowStart: 1, rowEnd: 4 })
expect(toJson(rows)).toEqual([
{ __index__: 4, a: 'abc', b: 5, c: 2, d: true, e: [1, 2] },
{ __index__: 1, a: 'abc', b: 2, c: 3, d: true },
{ __index__: 2, a: 'abc', b: 3, c: 4, d: true },
])
})
// Without orderBy the window applies in plain file order
it('reads data with rowStart and rowEnd without orderBy', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const rows = await parquetQuery({ file, rowStart: 1, rowEnd: 4 })
expect(toJson(rows)).toEqual([
{ a: 'abc', b: 2, c: 3, d: true },
{ a: 'abc', b: 3, c: 4, d: true },
{ a: null, b: 4, c: 5, d: false, e: [1, 2, 3] },
])
})
// Sorting by a column that doesn't exist must surface the reader's error
it('throws for invalid orderBy column', async () => {
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
const futureRows = parquetQuery({ file, orderBy: 'nonexistent' })
await expect(futureRows).rejects.toThrow('parquet columns not found: nonexistent')
})
})