demo: upgrade hightable

This commit is contained in:
Kenny Daniel 2024-10-28 23:03:31 -07:00
parent d8cc46b915
commit ecacb2b03b
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
8 changed files with 46 additions and 37 deletions

@ -102,7 +102,7 @@ function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): DataFr
*/
rows(rowStart, rowEnd, orderBy) {
console.log(`reading rows ${rowStart}-${rowEnd}`, orderBy)
return parquetQueryWorker({ asyncBuffer: from, metadata, rowStart, rowEnd, orderBy })
return parquetQueryWorker({ from, metadata, rowStart, rowEnd, orderBy })
},
sortable: true,
}

2
demo/bundle.min.js vendored

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@ -3,8 +3,8 @@ import { parquetQuery } from '../../src/query.js'
import { asyncBufferFrom } from './parquetWorkerClient.js'
self.onmessage = async ({ data }) => {
const { metadata, asyncBuffer, rowStart, rowEnd, orderBy, queryId, chunks } = data
const file = await asyncBufferFrom(asyncBuffer)
const { metadata, from, rowStart, rowEnd, orderBy, columns, queryId, chunks } = data
const file = await asyncBufferFrom(from)
/**
* @typedef {import('../../src/hyparquet.js').ColumnData} ColumnData
* @type {((chunk: ColumnData) => void) | undefined}
@ -12,7 +12,7 @@ self.onmessage = async ({ data }) => {
const onChunk = chunks ? chunk => self.postMessage({ chunk, queryId }) : undefined
try {
const result = await parquetQuery({
metadata, file, rowStart, rowEnd, orderBy, compressors, onChunk,
metadata, file, rowStart, rowEnd, orderBy, columns, compressors, onChunk,
})
self.postMessage({ result, queryId })
} catch (error) {

@ -15,47 +15,56 @@ export type AsyncBufferFrom = AsyncBufferFromFile | AsyncBufferFromUrl
// Same as ParquetReadOptions, but AsyncBufferFrom instead of AsyncBuffer
interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'file'> {
asyncBuffer: AsyncBufferFrom
from: AsyncBufferFrom
orderBy?: string
}
let worker: Worker | undefined
let nextQueryId = 0
const pending = new Map<number, { resolve: (value: any) => void, reject: (error: any) => void }>()
interface QueryAgent {
resolve: (value: any) => void
reject: (error: any) => void
onChunk?: (chunk: any) => void
}
const pending = new Map<number, QueryAgent>()
function getWorker() {
if (!worker) {
worker = new Worker(new URL('demo/workers/worker.min.js', import.meta.url))
worker.onmessage = ({ data }) => {
const { resolve, reject, onChunk } = pending.get(data.queryId)!
if (data.error) {
reject(data.error)
} else if (data.result) {
resolve(data.result)
} else if (data.chunk) {
onChunk?.(data.chunk)
} else {
reject(new Error('Unexpected message from worker'))
}
}
}
return worker
}
/**
* Presents almost the same interface as parquetRead, but runs in a worker.
* This is useful for reading large parquet files without blocking the main thread.
* Instead of taking an AsyncBuffer, it takes a FileContent, because it needs
* Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs
* to be serialized to the worker.
*/
export function parquetQueryWorker(
{ metadata, asyncBuffer, rowStart, rowEnd, orderBy, onChunk }: ParquetReadWorkerOptions
{ metadata, from, rowStart, rowEnd, orderBy, onChunk }: ParquetReadWorkerOptions
): Promise<Record<string, any>[]> {
return new Promise((resolve, reject) => {
const queryId = nextQueryId++
pending.set(queryId, { resolve, reject })
// Create a worker
if (!worker) {
worker = new Worker(new URL('demo/workers/worker.min.js', import.meta.url))
worker.onmessage = ({ data }) => {
const { resolve, reject } = pending.get(data.queryId)!
// Convert postmessage data to callbacks
if (data.error) {
reject(data.error)
} else if (data.result) {
resolve(data.result)
} else if (data.chunk) {
onChunk?.(data.chunk)
} else {
reject(new Error('Unexpected message from worker'))
}
}
}
pending.set(queryId, { resolve, reject, onChunk })
const worker = getWorker()
// If caller provided an onChunk callback, worker will send chunks as they are parsed
const chunks = onChunk !== undefined
worker.postMessage({
queryId, metadata, asyncBuffer, rowStart, rowEnd, orderBy, chunks
queryId, metadata, from, rowStart, rowEnd, orderBy, chunks,
})
})
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@ -32,20 +32,20 @@
"@rollup/plugin-replace": "6.0.1",
"@rollup/plugin-terser": "0.4.4",
"@rollup/plugin-typescript": "12.1.1",
"@types/node": "22.7.9",
"@types/node": "22.8.2",
"@types/react": "18.3.12",
"@types/react-dom": "18.3.1",
"@vitest/coverage-v8": "2.1.3",
"@vitest/coverage-v8": "2.1.4",
"eslint": "9.13.0",
"eslint-plugin-jsdoc": "50.4.3",
"hightable": "0.6.0",
"hightable": "0.6.3",
"http-server": "14.1.1",
"hyparquet-compressors": "0.1.4",
"react": "18.3.1",
"react-dom": "18.3.1",
"rollup": "4.24.0",
"rollup": "4.24.2",
"typescript": "5.6.3",
"typescript-eslint": "8.11.0",
"vitest": "2.1.3"
"typescript-eslint": "8.12.1",
"vitest": "2.1.4"
}
}