demo: use web worker for parquet parsing to avoid blocking main thread

Kenny Daniel 2024-09-25 01:59:21 -07:00
parent 4f1b0bb74d
commit e6301a8bc8
11 changed files with 250 additions and 45 deletions

README.md

@@ -6,7 +6,7 @@
 [![workflow status](https://github.com/hyparam/hyparquet/actions/workflows/ci.yml/badge.svg)](https://github.com/hyparam/hyparquet/actions)
 [![mit license](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
 [![dependencies](https://img.shields.io/badge/Dependencies-0-blueviolet)](https://www.npmjs.com/package/hyparquet?activeTab=dependencies)
-![coverage](https://img.shields.io/badge/Coverage-95-darkred)
+![coverage](https://img.shields.io/badge/Coverage-96-darkred)
 
 Dependency free since 2023!

demo/App.tsx

@@ -1,15 +1,13 @@
 import HighTable, { DataFrame } from 'hightable'
-import { compressors } from 'hyparquet-compressors'
 import React, { useEffect, useState } from 'react'
 import { FileMetaData, parquetMetadataAsync, parquetSchema } from '../src/metadata.js'
-import { parquetQuery } from '../src/query.js'
-import type { AsyncBuffer } from '../src/types.js'
-import { asyncBufferFromUrl } from '../src/utils.js'
+import { byteLengthFromUrl } from '../src/utils.js'
 import Dropdown from './Dropdown.js'
 import Dropzone from './Dropzone.js'
 import Layout from './Layout.js'
 import ParquetLayout from './ParquetLayout.js'
 import ParquetMetadata from './ParquetMetadata.js'
+import { AsyncBufferFrom, asyncBufferFrom, parquetQueryWorker } from './workers/parquetWorkerClient.js'
 
 type Lens = 'table' | 'metadata' | 'layout'
@@ -30,29 +28,35 @@ export default function App({ url }: { url?: string }) {
   useEffect(() => {
     if (!df && url) {
-      asyncBufferFromUrl(url).then(asyncBuffer => setAsyncBuffer(url, asyncBuffer))
+      onUrlDrop(url)
     }
   }, [ url ])
 
   async function onFileDrop(file: File) {
     // Clear query string
     history.pushState({}, '', location.pathname)
-    setAsyncBuffer(file.name, await file.arrayBuffer())
+    setAsyncBuffer(file.name, { file, byteLength: file.size })
   }
 
   async function onUrlDrop(url: string) {
     // Add key=url to query string
     const params = new URLSearchParams(location.search)
     params.set('key', url)
     history.pushState({}, '', `${location.pathname}?${params}`)
-    setAsyncBuffer(url, await asyncBufferFromUrl(url))
+    try {
+      const byteLength = await byteLengthFromUrl(url)
+      setAsyncBuffer(url, { url, byteLength })
+    } catch (e) {
+      setError(e as Error)
+    }
   }
 
-  async function setAsyncBuffer(name: string, asyncBuffer: AsyncBuffer) {
+  async function setAsyncBuffer(name: string, from: AsyncBufferFrom) {
     // TODO: Replace welcome with spinner
+    const asyncBuffer = await asyncBufferFrom(from)
     const metadata = await parquetMetadataAsync(asyncBuffer)
     setMetadata(metadata)
     setName(name)
-    setByteLength(asyncBuffer.byteLength)
-    const df = parquetDataFrame(asyncBuffer, metadata)
+    setByteLength(from.byteLength)
+    const df = parquetDataFrame(from, metadata)
     setDf(df)
     document.getElementById('welcome')?.remove()
   }
@@ -83,12 +87,8 @@ export default function App({ url }: { url?: string }) {
 
 /**
  * Convert a parquet file into a dataframe.
- *
- * @param {AsyncBuffer} file - parquet file asyncbuffer
- * @param {FileMetaData} metadata - parquet file metadata
- * @returns {DataFrame} dataframe
  */
-function parquetDataFrame(file: AsyncBuffer, metadata: FileMetaData): DataFrame {
+function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): DataFrame {
   const { children } = parquetSchema(metadata)
   return {
     header: children.map(child => child.element.name),
@@ -101,7 +101,7 @@ function parquetDataFrame(file: AsyncBuffer, metadata: FileMetaData): DataFrame
      */
     rows(rowStart, rowEnd, orderBy) {
       console.log(`reading rows ${rowStart}-${rowEnd}`, orderBy)
-      return parquetQuery({ file, compressors, rowStart, rowEnd, orderBy })
+      return parquetQueryWorker({ asyncBuffer: from, rowStart, rowEnd, orderBy })
     },
     sortable: true,
   }
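
Why the dataframe now holds an AsyncBufferFrom rather than an AsyncBuffer: an AsyncBuffer exposes its bytes through a slice() function, and functions cannot be structured-cloned across postMessage. A hypothetical sketch of the constraint (the buffer and worker here are stand-ins, not code from this commit):

    // An AsyncBuffer wraps bytes behind an async slice() function:
    const asyncBuffer = { byteLength: 1024, slice: async (start: number, end?: number) => new ArrayBuffer(0) }
    // Functions are not structured-cloneable, so this would throw a DataCloneError:
    // worker.postMessage({ asyncBuffer })
    // A plain descriptor clones fine; the worker rebuilds the AsyncBuffer itself:
    // worker.postMessage({ asyncBuffer: { url: 'https://example.com/data.parquet', byteLength: 1024 } })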

demo/bundle.min.js vendored (4 changes)

File diff suppressed because one or more lines are too long

demo/bundle.min.js.map vendored

File diff suppressed because one or more lines are too long

demo/workers/parquetWorker.js (new file)

@@ -0,0 +1,16 @@
+import { compressors } from 'hyparquet-compressors'
+import { parquetQuery } from '../../src/query.js'
+import { asyncBufferFrom } from './parquetWorkerClient.js'
+
+self.onmessage = async ({ data }) => {
+  const { metadata, asyncBuffer, rowStart, rowEnd, orderBy } = data
+  const file = await asyncBufferFrom(asyncBuffer)
+  try {
+    const result = await parquetQuery({
+      metadata, file, rowStart, rowEnd, orderBy, compressors,
+    })
+    self.postMessage({ result })
+  } catch (error) {
+    self.postMessage({ error })
+  }
+}
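
Sketched from the main-thread side, the message protocol this worker speaks (the URL and byte length are assumed values; parquetQueryWorker below wraps exactly this exchange):

    const worker = new Worker(new URL('demo/workers/worker.min.js', import.meta.url))
    worker.onmessage = ({ data }) => {
      // The worker answers with either { result } or { error }
      if (data.error) console.error(data.error)
      else console.table(data.result)
    }
    worker.postMessage({
      asyncBuffer: { url: 'https://example.com/data.parquet', byteLength: 1024 },
      rowStart: 0,
      rowEnd: 10,
    })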

demo/workers/parquetWorkerClient.ts (new file)

@@ -0,0 +1,64 @@
+import type { AsyncBuffer, FileMetaData } from '../../src/hyparquet.js'
+import { asyncBufferFromUrl } from '../../src/utils.js'
+
+// Serializable constructors for AsyncBuffers
+interface AsyncBufferFromFile {
+  file: File
+  byteLength: number
+}
+interface AsyncBufferFromUrl {
+  url: string
+  byteLength: number
+}
+export type AsyncBufferFrom = AsyncBufferFromFile | AsyncBufferFromUrl
+
+// Same as ParquetReadOptions, but AsyncBufferFrom instead of AsyncBuffer
+interface ParquetReadWorkerOptions {
+  asyncBuffer: AsyncBufferFrom
+  metadata?: FileMetaData // parquet metadata, will be parsed if not provided
+  columns?: number[] // columns to read, all columns if undefined
+  rowStart?: number // inclusive
+  rowEnd?: number // exclusive
+  orderBy?: string // column to sort by
+}
+
+let worker: Worker | undefined
+
+/**
+ * Presents almost the same interface as parquetRead, but runs in a worker.
+ * This is useful for reading large parquet files without blocking the main thread.
+ * Instead of taking an AsyncBuffer, it takes an AsyncBufferFrom, because
+ * the arguments must be serializable to post to the worker.
+ */
+export function parquetQueryWorker(
+  { metadata, asyncBuffer, rowStart, rowEnd, orderBy }: ParquetReadWorkerOptions
+): Promise<Record<string, any>[]> {
+  return new Promise((resolve, reject) => {
+    // Create a worker
+    if (!worker) {
+      worker = new Worker(new URL('demo/workers/worker.min.js', import.meta.url))
+    }
+    worker.onmessage = ({ data }) => {
+      // Convert postmessage data to callbacks
+      if (data.error) {
+        reject(data.error)
+      } else if (data.result) {
+        resolve(data.result)
+      } else {
+        reject(new Error('Unexpected message from worker'))
+      }
+    }
+    worker.postMessage({ metadata, asyncBuffer, rowStart, rowEnd, orderBy })
+  })
+}
+
+/**
+ * Convert AsyncBufferFrom to AsyncBuffer.
+ */
+export async function asyncBufferFrom(from: AsyncBufferFrom): Promise<AsyncBuffer> {
+  if ('url' in from) {
+    return asyncBufferFromUrl(from.url)
+  } else {
+    return from.file.arrayBuffer()
+  }
+}
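
A minimal usage sketch of the client API (URL, byte length, and column name are assumed values):

    const rows = await parquetQueryWorker({
      asyncBuffer: { url: 'https://example.com/data.parquet', byteLength: 1024 },
      rowStart: 0,
      rowEnd: 10,
      orderBy: 'name', // sorts by a column named 'name', if present
    })
    console.log(rows) // array of row objects, one per row in [rowStart, rowEnd)

Note the module keeps a single lazily-created worker and reassigns onmessage per query, so queries are effectively serialized through that one worker.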

demo/workers/worker.min.js vendored (new file, 2 changes)

File diff suppressed because one or more lines are too long

demo/workers/worker.min.js.map vendored (new file)

File diff suppressed because one or more lines are too long

rollup.config.js

@@ -4,21 +4,39 @@ import replace from '@rollup/plugin-replace'
 import terser from '@rollup/plugin-terser'
 import typescript from '@rollup/plugin-typescript'
 
-export default {
-  input: 'demo/demo.js',
-  output: {
-    file: 'demo/bundle.min.js',
-    format: 'umd',
-    sourcemap: true,
+export default [
+  // demo bundle
+  {
+    input: 'demo/demo.js',
+    output: {
+      file: 'demo/bundle.min.js',
+      format: 'umd',
+      sourcemap: true,
+    },
+    plugins: [
+      commonjs(),
+      replace({
+        'process.env.NODE_ENV': JSON.stringify('production'), // or 'development' based on your build environment
+        preventAssignment: true,
+      }),
+      resolve({ browser: true }),
+      terser(),
+      typescript(),
+    ],
   },
-  plugins: [
-    commonjs(),
-    replace({
-      'process.env.NODE_ENV': JSON.stringify('production'), // or 'development' based on your build environment
-      preventAssignment: true,
-    }),
-    resolve({ browser: true }),
-    terser(),
-    typescript(),
-  ],
-}
+  // web worker
+  {
+    input: 'demo/workers/parquetWorker.js',
+    output: {
+      file: 'demo/workers/worker.min.js',
+      format: 'umd',
+      sourcemap: true,
+    },
+    plugins: [
+      commonjs(),
+      resolve({ browser: true }),
+      terser(),
+      typescript(),
+    ],
+  },
+]

src/utils.js

@@ -37,21 +37,32 @@ export function concat(aaa, bbb) {
 }
 
 /**
- * Construct an AsyncBuffer for a URL.
+ * Get the byte length of a URL using a HEAD request.
  *
- * @typedef {import('./types.js').AsyncBuffer} AsyncBuffer
  * @param {string} url
- * @returns {Promise<AsyncBuffer>}
+ * @returns {Promise<number>}
  */
-export async function asyncBufferFromUrl(url) {
-  // byte length from HEAD request
-  const byteLength = await fetch(url, { method: 'HEAD' })
+export async function byteLengthFromUrl(url) {
+  return await fetch(url, { method: 'HEAD' })
     .then(res => {
       if (!res.ok) throw new Error(`fetch head failed ${res.status}`)
       const length = res.headers.get('Content-Length')
       if (!length) throw new Error('missing content length')
       return parseInt(length)
     })
+}
+
+/**
+ * Construct an AsyncBuffer for a URL.
+ *
+ * @typedef {import('./types.js').AsyncBuffer} AsyncBuffer
+ * @param {string} url
+ * @param {number} [byteLength]
+ * @returns {Promise<AsyncBuffer>}
+ */
+export async function asyncBufferFromUrl(url, byteLength) {
+  // byte length from HEAD request
+  byteLength ||= await byteLengthFromUrl(url)
   return {
     byteLength,
     async slice(start, end) {
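
A usage sketch of the split helpers (the URL is an assumed value); passing a known byteLength lets asyncBufferFromUrl skip the extra HEAD request:

    const url = 'https://example.com/data.parquet'
    const byteLength = await byteLengthFromUrl(url) // HEAD request for Content-Length
    const file = await asyncBufferFromUrl(url, byteLength) // no second HEAD request
    const magic = await file.slice(0, 4) // first 4 bytes, 'PAR1' for a parquet file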

test/utils.test.js

@@ -1,5 +1,5 @@
-import { describe, expect, it } from 'vitest'
-import { toJson } from '../src/utils.js'
+import { describe, expect, it, vi } from 'vitest'
+import { asyncBufferFromUrl, byteLengthFromUrl, toJson } from '../src/utils.js'
 
 describe('toJson', () => {
   it('convert undefined to null', () => {
@@ -37,3 +37,96 @@ describe('toJson', () => {
     expect(toJson(true)).toBe(true)
   })
 })
+
+describe('byteLengthFromUrl', () => {
+  it('returns the byte length from Content-Length header', async () => {
+    global.fetch = vi.fn().mockResolvedValue({
+      ok: true,
+      headers: new Map([['Content-Length', '1024']]),
+    })
+
+    const result = await byteLengthFromUrl('https://example.com')
+    expect(result).toBe(1024)
+    expect(fetch).toHaveBeenCalledWith('https://example.com', { method: 'HEAD' })
+  })
+
+  it('throws an error if the response is not ok', async () => {
+    global.fetch = vi.fn().mockResolvedValue({
+      ok: false,
+      status: 404,
+    })
+
+    await expect(byteLengthFromUrl('https://example.com')).rejects.toThrow('fetch head failed 404')
+  })
+
+  it('throws an error if Content-Length header is missing', async () => {
+    global.fetch = vi.fn().mockResolvedValue({
+      ok: true,
+      headers: new Map(),
+    })
+
+    await expect(byteLengthFromUrl('https://example.com')).rejects.toThrow('missing content length')
+  })
+})
+
+describe('asyncBufferFromUrl', () => {
+  it('creates an AsyncBuffer with the correct byte length', async () => {
+    global.fetch = vi.fn()
+      .mockResolvedValueOnce({
+        ok: true,
+        headers: new Map([['Content-Length', '1024']]),
+      })
+
+    const buffer = await asyncBufferFromUrl('https://example.com')
+    expect(buffer.byteLength).toBe(1024)
+  })
+
+  it('uses provided byte length if given', async () => {
+    const buffer = await asyncBufferFromUrl('https://example.com', 2048)
+    expect(buffer.byteLength).toBe(2048)
+    expect(fetch).toHaveBeenCalledOnce()
+  })
+
+  it('slice method fetches correct byte range', async () => {
+    const mockArrayBuffer = new ArrayBuffer(100)
+    global.fetch = vi.fn().mockResolvedValue({
+      ok: true,
+      body: {},
+      arrayBuffer: () => Promise.resolve(mockArrayBuffer),
+    })
+
+    const buffer = await asyncBufferFromUrl('https://example.com', 1024)
+    const result = await buffer.slice(0, 100)
+    expect(result).toBe(mockArrayBuffer)
+    expect(fetch).toHaveBeenCalledWith('https://example.com', {
+      headers: new Headers({ Range: 'bytes=0-99' }),
+    })
+  })
+
+  it('slice method handles undefined end parameter', async () => {
+    const mockArrayBuffer = new ArrayBuffer(100)
+    global.fetch = vi.fn().mockResolvedValue({
+      ok: true,
+      body: {},
+      arrayBuffer: () => Promise.resolve(mockArrayBuffer),
+    })
+
+    const buffer = await asyncBufferFromUrl('https://example.com', 1024)
+    await buffer.slice(100)
+    expect(fetch).toHaveBeenCalledWith('https://example.com', {
+      headers: new Headers({ Range: 'bytes=100-' }),
+    })
+  })
+
+  it('slice method throws an error if fetch fails', async () => {
+    global.fetch = vi.fn().mockResolvedValue({
+      ok: false,
+      status: 404,
+    })
+
+    const buffer = await asyncBufferFromUrl('https://example.com', 1024)
+    await expect(buffer.slice(0, 100)).rejects.toThrow('fetch failed 404')
+  })
+})