Never copy data

Kenny Daniel 2024-02-09 13:44:35 -08:00
parent 2e11ab275d
commit 961b92650c
9 changed files with 26 additions and 98 deletions

@@ -80,10 +80,10 @@ function processFile(file) {
     const metadata = parquetMetadata(arrayBuffer)
     renderSidebar(arrayBuffer, metadata, file.name)
     const startTime = performance.now()
-    parquetRead({ file: arrayBuffer, onComplete(data) {
-      const ms = performance.now() - startTime
-      console.log(`parsed ${file.name} in ${ms.toFixed(0)} ms`)
-    } }) // TODO
+    // parquetRead({ file: arrayBuffer, onComplete(data) {
+    //   const ms = performance.now() - startTime
+    //   console.log(`parsed ${file.name} in ${ms.toFixed(0)} ms`)
+    // } }) // TODO
   } catch (e) {
     console.error('Error parsing file', e)
     dropzone.innerHTML = `<strong>${file.name}</strong>`

@@ -1,30 +0,0 @@
-/**
- * Return an offset view into an existing array buffer.
- * If slice is called on data outside the original array buffer, an error is thrown.
- *
- * This is useful for pre-loading a section of a file into memory,
- * then reading slices from it, but indexed relative to the original file.
- *
- * @typedef {import('./types.js').ArrayBufferLike} ArrayBufferLike
- * @param {ArrayBuffer} arrayBuffer array buffer to place at offset
- * @param {number} offset offset in bytes
- * @returns {ArrayBufferLike} array buffer view with offset
- */
-export function offsetArrayBuffer(arrayBuffer, offset) {
-  if (offset < 0) throw new Error(`offset must be positive ${offset}`)
-  return {
-    byteLength: offset + arrayBuffer.byteLength,
-    slice(start, end) {
-      if (start < offset || start > offset + arrayBuffer.byteLength) {
-        throw new Error(`start out of bounds: ${start} not in ${offset}..${offset + arrayBuffer.byteLength}`)
-      }
-      if (end) {
-        if (end < offset || end > offset + arrayBuffer.byteLength) {
-          throw new Error(`end out of bounds: ${end} not in ${offset}..${offset + arrayBuffer.byteLength}`)
-        }
-        end -= offset
-      }
-      return arrayBuffer.slice(start - offset, end)
-    },
-  }
-}

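The helper deleted above let callers keep using absolute file offsets against a partially loaded buffer, at the cost of an allocation and a copy on every slice() call. A minimal sketch of the replacement pattern, with illustrative values that are not part of the commit:

const partial = new ArrayBuffer(80) // stands in for bytes 20..99 of a larger file
const fileOffset = 20               // where `partial` begins in the file
const absStart = 30                 // absolute range to read
const absEnd = 40
// before: offsetArrayBuffer(partial, fileOffset).slice(absStart, absEnd) copies bytes
// after: translate to relative indices and take a zero-copy view
const bytes = new Uint8Array(partial).subarray(absStart - fileOffset, absEnd - fileOffset)
console.log(bytes.byteLength) // 10
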
@@ -16,16 +16,14 @@ const dayMillis = 86400000000000 // 1 day in milliseconds
 /**
  * Read a column from the file.
  *
- * @param {ArrayBufferLike} arrayBuffer parquet file contents
+ * @param {ArrayBuffer} arrayBuffer parquet file contents
+ * @param {number} columnOffset offset to start reading from
  * @param {RowGroup} rowGroup row group metadata
  * @param {ColumnMetaData} columnMetadata column metadata
  * @param {SchemaElement[]} schema schema for the file
  * @returns {ArrayLike<any>} array of values
  */
-export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
-  // find start of column data
-  const columnOffset = getColumnOffset(columnMetadata)
+export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schema) {
   // parse column data
   let valuesSeen = 0
   let byteOffset = 0 // byteOffset within the column
@@ -42,10 +40,10 @@ export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
   }
   // read compressed_page_size bytes starting at offset
-  const compressedBytes = new Uint8Array(arrayBuffer.slice(
+  const compressedBytes = new Uint8Array(arrayBuffer).subarray(
     columnOffset + byteOffset,
     columnOffset + byteOffset + header.compressed_page_size
-  ))
+  )
   // decompress bytes
   /** @type {Uint8Array | undefined} */
   let page

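The slice-to-subarray swap above is the heart of the commit: ArrayBuffer.prototype.slice allocates a new buffer and copies bytes into it, while Uint8Array.prototype.subarray returns a view over the same memory. A standalone demonstration, not taken from the repo:

const buf = new ArrayBuffer(8)
const copy = new Uint8Array(buf.slice(0, 4))    // copies 4 bytes into a fresh buffer
const view = new Uint8Array(buf).subarray(0, 4) // zero-copy window onto buf
view[0] = 42
console.log(new Uint8Array(buf)[0]) // 42, the view aliases buf
console.log(copy[0])                // 0, the copy is independent of buf
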
@@ -13,13 +13,12 @@ import { deserializeTCompactProtocol } from './thrift.js'
  *
  * @typedef {import("./types.d.ts").ArrayBufferLike} ArrayBufferLike
  * @typedef {import("./types.d.ts").PageHeader} PageHeader
- * @param {ArrayBufferLike} arrayBuffer parquet file contents
+ * @param {ArrayBuffer} arrayBuffer parquet file contents
  * @param {number} offset offset to start reading from
  * @returns {Decoded<PageHeader>} metadata object and bytes read
  */
 export function parquetHeader(arrayBuffer, offset) {
-  const headerBuffer = arrayBuffer.slice(offset)
-  const { value: header, byteLength } = deserializeTCompactProtocol(headerBuffer)
+  const { value: header, byteLength } = deserializeTCompactProtocol(arrayBuffer, offset)
   // Parse parquet header from thrift data
   const type = header.field_1

@@ -90,8 +90,7 @@ export function parquetMetadata(arrayBuffer) {
   }
   const metadataOffset = metadataLengthOffset - metadataLength
-  const metadataBuffer = view.buffer.slice(metadataOffset, metadataLengthOffset)
-  const { value: metadata } = deserializeTCompactProtocol(metadataBuffer)
+  const { value: metadata } = deserializeTCompactProtocol(view.buffer, view.byteOffset + metadataOffset)
   // Parse parquet metadata from thrift data
   const version = metadata.field_1

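Both call sites above make the same move: rather than materializing a trimmed copy for the Thrift decoder, they pass the original buffer plus a starting offset (rebased through view.byteOffset in metadata.js, in case the DataView itself windows a larger buffer). A stand-in decoder showing only the API shape, not the real deserializeTCompactProtocol logic:

function decodeAt(arrayBuffer, byteOffset) {
  const view = new DataView(arrayBuffer, byteOffset) // a window, not a copy
  return view.getUint8(0) // placeholder for actual Thrift decoding
}
const file = new Uint8Array([9, 8, 7, 6]).buffer
// before: the equivalent of decodeAt(file.slice(2), 0) allocated a copy first
console.log(decodeAt(file, 2)) // 7, read in place
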
@@ -1,5 +1,4 @@
-import { offsetArrayBuffer } from './asyncbuffer.js'
 import { getColumnOffset, readColumn } from './column.js'
 import { parquetMetadataAsync } from './metadata.js'
@@ -94,11 +93,11 @@ async function readRowGroup(options, rowGroup) {
     throw new Error('parquet missing row group metadata')
   }
   // if row group size is less than 128mb, pre-load in one read
-  let groupBuffer = undefined
+  let groupBuffer
   if (groupEndByte - groupStartByte <= 1 << 27) {
     // pre-load row group byte data in one big read,
     // otherwise read column data individually
-    groupBuffer = offsetArrayBuffer(await file.slice(groupStartByte, groupEndByte), groupStartByte)
+    groupBuffer = await file.slice(groupStartByte, groupEndByte)
   }
   /** @type {any[][]} */
@@ -122,18 +121,18 @@ async function readRowGroup(options, rowGroup) {
     // use pre-loaded row group byte data if available, else read column data
     let buffer
-    if (!groupBuffer) {
-      buffer = file.slice(columnStartByte, columnEndByte).then(arrayBuffer => {
-        return offsetArrayBuffer(arrayBuffer, columnStartByte)
-      })
-    } else {
+    let bufferOffset = 0
+    if (groupBuffer) {
       buffer = Promise.resolve(groupBuffer)
+      bufferOffset = columnStartByte - groupStartByte
+    } else {
+      buffer = file.slice(columnStartByte, columnEndByte)
     }
     // read column data async
     promises.push(buffer.then(arrayBuffer => {
       // TODO: extract SchemaElement for this column
-      const columnData = readColumn(arrayBuffer, rowGroup, columnMetadata, metadata.schema)
+      const columnData = readColumn(arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema)
       if (columnData.length !== Number(rowGroup.num_rows)) {
         throw new Error('parquet column length does not match row group length')
       }

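A note on the arithmetic above: 1 << 27 bytes is the 128 MiB threshold the "128mb" comment refers to, and when a row group has been pre-loaded, a column's absolute file offset must be rebased to an index inside groupBuffer; when it has not, the column's own buffer starts at its data, so bufferOffset stays 0. With illustrative numbers:

const groupStartByte = 1000  // absolute offset where the pre-loaded read began
const columnStartByte = 1250 // absolute offset of one column chunk
console.log(columnStartByte - groupStartByte) // 250, the column's index within groupBuffer
console.log((1 << 27) / 2 ** 20)              // 128, the pre-load threshold in MiB
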
@@ -22,16 +22,17 @@ const CompactType = {
  * @typedef {import("./types.d.ts").Decoded<T>} Decoded
  * @template T
  * @param {ArrayBuffer} arrayBuffer
+ * @param {number} byteOffset offset into the buffer
  * @returns {Decoded<Record<string, any>>}
  */
-export function deserializeTCompactProtocol(arrayBuffer) {
-  const view = new DataView(arrayBuffer)
+export function deserializeTCompactProtocol(arrayBuffer, byteOffset) {
+  const view = new DataView(arrayBuffer, byteOffset)
   let byteLength = 0
   let lastFid = 0
   /** @type {Record<string, any>} */
   const value = {}
-  while (byteLength < arrayBuffer.byteLength) {
+  while (byteLength < arrayBuffer.byteLength - byteOffset) {
     // Parse each field based on its type and add to the result object
     const [type, fid, newIndex, newLastFid] = readFieldBegin(view, byteLength, lastFid)
     byteLength = newIndex
@@ -77,7 +78,7 @@ function readElement(view, type, index) {
     case CompactType.BINARY: {
       // strings are encoded as utf-8, no \0 delimiter
       const [stringLength, stringIndex] = readVarInt(view, index)
-      const strBytes = new Uint8Array(view.buffer, stringIndex, stringLength)
+      const strBytes = new Uint8Array(view.buffer, view.byteOffset + stringIndex, stringLength)
       return [new TextDecoder().decode(strBytes), stringIndex + stringLength]
     }
     case CompactType.LIST: {

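Two DataView details make the change above safe. A DataView constructed with a byteOffset indexes relative to that offset without copying, and its byteOffset property records the shift, which is why readElement now adds view.byteOffset when re-wrapping view.buffer in a Uint8Array. A quick check:

const buf = new Uint8Array([0, 0, 0, 255]).buffer
const view = new DataView(buf, 3) // zero-copy window starting at byte 3
console.log(view.getUint8(0))     // 255, index 0 maps to buffer byte 3
console.log(view.byteOffset)      // 3, needed to rebase indices onto view.buffer
console.log(new Uint8Array(view.buffer, view.byteOffset, 1)[0]) // 255, same byte
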
@@ -1,38 +0,0 @@
-import { describe, expect, it } from 'vitest'
-import { offsetArrayBuffer } from '../src/asyncbuffer.js'
-
-describe('offsetArrayBuffer', () => {
-  it('creates a valid offset array buffer', () => {
-    const buffer = new ArrayBuffer(10)
-    const offsetBuffer = offsetArrayBuffer(buffer, 5)
-    expect(offsetBuffer.byteLength).toBe(15)
-  })
-
-  it('correctly slices the array buffer with offset', () => {
-    const buffer = new ArrayBuffer(10)
-    const offsetBuffer = offsetArrayBuffer(buffer, 5)
-    const view = new Uint8Array(buffer)
-    for (let i = 0; i < view.length; i++) {
-      view[i] = i // Populate the buffer with data [0, 1, 2, ...]
-    }
-    const slicedBuffer = offsetBuffer.slice(5, 10) // This should give us [0, 1, 2, 3, 4] from the original buffer
-    const slicedView = new Uint8Array(slicedBuffer)
-    for (let i = 0; i < slicedView.length; i++) {
-      expect(slicedView[i]).toBe(i) // Each item should match its index
-    }
-  })
-
-  it('throws error for negative offset', () => {
-    const buffer = new ArrayBuffer(10)
-    expect(() => offsetArrayBuffer(buffer, -5)).toThrow('offset must be positive')
-  })
-
-  it('throws error for out of bounds slice', () => {
-    const buffer = new ArrayBuffer(10)
-    const offsetBuffer = offsetArrayBuffer(buffer, 5)
-    expect(() => offsetBuffer.slice(3, 7)).toThrow('start out of bounds')
-    expect(() => offsetBuffer.slice(5, 20)).toThrow('end out of bounds')
-  })
-})

@@ -63,7 +63,7 @@ describe('deserializeTCompactProtocol function', () => {
   // Mark the end of the structure
   view.setUint8(index, 0x00) // STOP field
 
-  const { byteLength, value } = deserializeTCompactProtocol(buffer)
+  const { byteLength, value } = deserializeTCompactProtocol(buffer, 0)
 
   expect(byteLength).toBe(index + 1)
   // Assertions for each basic type