hyparquet/demo/workers/parquetWorkerClient.ts

89 lines
2.7 KiB
TypeScript
Raw Normal View History

2024-10-16 08:09:18 +00:00
import { cachedAsyncBuffer } from '../../src/asyncBuffer.js'
2024-10-20 00:02:03 +00:00
import type { AsyncBuffer, ParquetReadOptions } from '../../src/hyparquet.js'
import { asyncBufferFromUrl } from '../../src/utils.js'
// Serializable constructors for AsyncBuffers
//
// These plain descriptors stand in for an AsyncBuffer wherever the source has
// to be serialized to the worker; asyncBufferFrom() turns one back into an
// AsyncBuffer.

/** Parquet source described by a File object. */
interface AsyncBufferFromFile {
  /** File whose contents are read via file.arrayBuffer() */
  file: File
  /** Presumably the size of the file in bytes (not read in this module) */
  byteLength: number
}

/** Parquet source described by a url. */
interface AsyncBufferFromUrl {
  /** Location handed to asyncBufferFromUrl() */
  url: string
  /** Handed to asyncBufferFromUrl() together with the url; presumably the remote size in bytes */
  byteLength: number
}

/** Either kind of serializable source; the two are told apart by the presence of `url`. */
export type AsyncBufferFrom =
  | AsyncBufferFromFile
  | AsyncBufferFromUrl
/**
 * Options accepted by parquetQueryWorker.
 *
 * Same as ParquetReadOptions, but AsyncBufferFrom (`from`) instead of
 * AsyncBuffer (`file`), plus an optional `orderBy`.
 */
interface ParquetReadWorkerOptions extends Omit<ParquetReadOptions, 'file'> {
  /** Serializable description of the parquet source */
  from: AsyncBufferFrom
  /** Forwarded to the worker as-is; presumably the name of the column to sort rows by */
  orderBy?: string
}
// Worker shared by every query; stays undefined until getWorker() creates it
let worker: Worker | undefined

// Id given to the next query; incremented for each call to parquetQueryWorker()
let nextQueryId = 0

/**
 * Callbacks kept on the main thread for one in-flight query. The worker's
 * replies carry a queryId, which is used to find the matching agent.
 */
interface QueryAgent {
  /** Settles the query's promise with the worker's result */
  resolve: (value: any) => void
  /** Settles the query's promise with an error */
  reject: (error: any) => void
  /** Receives chunks sent by the worker, when the caller asked for them */
  onChunk?: (chunk: any) => void
}

// In-flight queries, keyed by queryId
const pending = new Map<number, QueryAgent>()
/**
 * Return the shared worker, creating it on first use.
 *
 * When the worker is created, a message handler is installed that routes each
 * reply to the pending query with the same queryId:
 * - `error`: the query is rejected with that error
 * - `result`: the query is resolved with that result
 * - `chunk`: the chunk is passed to the query's onChunk callback, if it has one
 * - anything else: the query is rejected with 'Unexpected message from worker'
 *
 * A query is removed from `pending` as soon as it is resolved or rejected, so
 * the map only holds queries that are still in flight. Replies whose queryId
 * is not pending are ignored rather than throwing inside the handler.
 */
function getWorker(): Worker {
  if (worker) return worker

  const created = new Worker(new URL('demo/workers/worker.min.js', import.meta.url))
  created.onmessage = ({ data }) => {
    const { queryId } = data
    const agent = pending.get(queryId)
    // Unknown or already settled query: there is nobody left to notify
    if (!agent) return

    if (data.error) {
      pending.delete(queryId)
      agent.reject(data.error)
    } else if (data.result) {
      pending.delete(queryId)
      agent.resolve(data.result)
    } else if (data.chunk) {
      // Chunks are intermediate messages: the query stays pending
      agent.onChunk?.(data.chunk)
    } else {
      pending.delete(queryId)
      agent.reject(new Error('Unexpected message from worker'))
    }
  }
  worker = created
  return created
}
/**
 * Presents almost the same interface as parquetRead, but runs in a worker.
 * This is useful for reading large parquet files without blocking the main thread.
 * Instead of taking an AsyncBuffer, it takes an AsyncBufferFrom, because it needs
 * to be serialized to the worker.
 *
 * Only metadata, from, rowStart, rowEnd and orderBy are posted to the worker,
 * together with the query id and a `chunks` flag. The onChunk callback itself
 * stays on this side and is invoked when the worker sends a chunk.
 *
 * @param options see ParquetReadWorkerOptions
 * @returns promise settled by the worker's reply for this query: resolved with
 * its `result`, or rejected with its `error`. It is also rejected if the
 * request could not be posted to the worker.
 */
export function parquetQueryWorker(
  { metadata, from, rowStart, rowEnd, orderBy, onChunk }: ParquetReadWorkerOptions
): Promise<Record<string, any>[]> {
  return new Promise((resolve, reject) => {
    const queryId = nextQueryId++
    pending.set(queryId, { resolve, reject, onChunk })
    // If caller provided an onChunk callback, worker will send chunks as they are parsed
    const chunks = onChunk !== undefined
    try {
      getWorker().postMessage({
        queryId, metadata, from, rowStart, rowEnd, orderBy, chunks,
      })
    } catch (error) {
      // The request never reached the worker, so no reply will ever settle
      // this query: drop it from the pending map before rejecting
      pending.delete(queryId)
      reject(error)
    }
  })
}
// AsyncBuffers already requested for url sources, keyed by the JSON of the
// descriptor. The promise is stored (not the resolved value) so that callers
// asking for the same url while it is still loading share one
// asyncBufferFromUrl() call.
const cache = new Map<string, Promise<AsyncBuffer>>()

/**
 * Convert AsyncBufferFrom to AsyncBuffer.
 *
 * - url source: built with asyncBufferFromUrl() and wrapped by
 *   cachedAsyncBuffer(). The resulting promise is memoized per descriptor, so
 *   repeated calls with the same url and byteLength return the same promise.
 *   A promise that rejects is evicted from the cache, so a later call retries
 *   instead of receiving the stored failure again.
 * - file source: the result of file.arrayBuffer() is returned directly and is
 *   never cached.
 *
 * @param from serializable description of the source
 * @returns promise of the AsyncBuffer for that source
 */
export async function asyncBufferFrom(from: AsyncBufferFrom): Promise<AsyncBuffer> {
  if ('url' in from) {
    // Cached asyncBuffer for urls only
    const key = JSON.stringify(from)
    const cached = cache.get(key)
    if (cached) return cached
    const asyncBuffer = asyncBufferFromUrl(from).then(cachedAsyncBuffer)
    cache.set(key, asyncBuffer)
    // Forget failed attempts. The identity check avoids removing a newer entry
    // that may have replaced this one under the same key.
    asyncBuffer.catch(() => {
      if (cache.get(key) === asyncBuffer) cache.delete(key)
    })
    return asyncBuffer
  } else {
    return from.file.arrayBuffer()
  }
}