diff --git a/demo.js b/demo.js
index dbc2cfe..1797dd7 100644
--- a/demo.js
+++ b/demo.js
@@ -80,10 +80,10 @@ function processFile(file) {
       const metadata = parquetMetadata(arrayBuffer)
       renderSidebar(arrayBuffer, metadata, file.name)
       const startTime = performance.now()
-      parquetRead({ file: arrayBuffer, onComplete(data) {
-        const ms = performance.now() - startTime
-        console.log(`parsed ${file.name} in ${ms.toFixed(0)} ms`)
-      } }) // TODO
+      // parquetRead({ file: arrayBuffer, onComplete(data) {
+      //   const ms = performance.now() - startTime
+      //   console.log(`parsed ${file.name} in ${ms.toFixed(0)} ms`)
+      // } }) // TODO
     } catch (e) {
       console.error('Error parsing file', e)
       dropzone.innerHTML = `${file.name}`
diff --git a/src/asyncbuffer.js b/src/asyncbuffer.js
deleted file mode 100644
index a69dd66..0000000
--- a/src/asyncbuffer.js
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Return an offset view into an existing array buffer.
- * If slice is called on data outside the original array buffer, an error is thrown.
- *
- * This is useful for pre-loading a section of a file into memory,
- * then reading slices from it, but indexed relative to the original file.
- *
- * @typedef {import('./types.js').ArrayBufferLike} ArrayBufferLike
- * @param {ArrayBuffer} arrayBuffer array buffer to place at offset
- * @param {number} offset offset in bytes
- * @returns {ArrayBufferLike} array buffer view with offset
- */
-export function offsetArrayBuffer(arrayBuffer, offset) {
-  if (offset < 0) throw new Error(`offset must be positive ${offset}`)
-  return {
-    byteLength: offset + arrayBuffer.byteLength,
-    slice(start, end) {
-      if (start < offset || start > offset + arrayBuffer.byteLength) {
-        throw new Error(`start out of bounds: ${start} not in ${offset}..${offset + arrayBuffer.byteLength}`)
-      }
-      if (end) {
-        if (end < offset || end > offset + arrayBuffer.byteLength) {
-          throw new Error(`end out of bounds: ${end} not in ${offset}..${offset + arrayBuffer.byteLength}`)
-        }
-        end -= offset
-      }
-      return arrayBuffer.slice(start - offset, end)
-    },
-  }
-}
diff --git a/src/column.js b/src/column.js
index 3f3b523..32f70c0 100644
--- a/src/column.js
+++ b/src/column.js
@@ -16,16 +16,14 @@ const dayMillis = 86400000000000 // 1 day in milliseconds
 /**
  * Read a column from the file.
  *
- * @param {ArrayBufferLike} arrayBuffer parquet file contents
+ * @param {ArrayBuffer} arrayBuffer parquet file contents
+ * @param {number} columnOffset offset to start reading from
  * @param {RowGroup} rowGroup row group metadata
  * @param {ColumnMetaData} columnMetadata column metadata
  * @param {SchemaElement[]} schema schema for the file
  * @returns {ArrayLike<any>} array of values
  */
-export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
-  // find start of column data
-  const columnOffset = getColumnOffset(columnMetadata)
-
+export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schema) {
   // parse column data
   let valuesSeen = 0
   let byteOffset = 0 // byteOffset within the column
@@ -42,10 +40,10 @@ export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
     }
 
     // read compressed_page_size bytes starting at offset
-    const compressedBytes = new Uint8Array(arrayBuffer.slice(
+    const compressedBytes = new Uint8Array(arrayBuffer).subarray(
       columnOffset + byteOffset,
       columnOffset + byteOffset + header.compressed_page_size
-    ))
+    )
 
     // decompress bytes
     /** @type {Uint8Array | undefined} */
     let page
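Why the column.js change to `subarray` is safe, and what it buys: `ArrayBuffer.slice` allocates and copies the requested byte range, while `Uint8Array.subarray` returns a zero-copy view over the same memory, which is all the read-only decompression step needs. A minimal sketch of the difference (illustrative, not part of the patch):

```js
const arrayBuffer = new ArrayBuffer(8)
new Uint8Array(arrayBuffer).set([10, 11, 12, 13, 14, 15, 16, 17])

// slice copies bytes 2..5 into a fresh buffer; subarray shares memory
const copied = new Uint8Array(arrayBuffer.slice(2, 6))
const viewed = new Uint8Array(arrayBuffer).subarray(2, 6)

new Uint8Array(arrayBuffer)[2] = 99
console.log(copied[0]) // 12: the copy is unaffected by later writes
console.log(viewed[0]) // 99: the view sees the same memory
```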
diff --git a/src/header.js b/src/header.js
index 61be0a6..7ba54cf 100644
--- a/src/header.js
+++ b/src/header.js
@@ -13,13 +13,12 @@ import { deserializeTCompactProtocol } from './thrift.js'
  *
  * @typedef {import("./types.d.ts").ArrayBufferLike} ArrayBufferLike
  * @typedef {import("./types.d.ts").PageHeader} PageHeader
- * @param {ArrayBufferLike} arrayBuffer parquet file contents
+ * @param {ArrayBuffer} arrayBuffer parquet file contents
  * @param {number} offset offset to start reading from
  * @returns {Decoded<PageHeader>} metadata object and bytes read
  */
 export function parquetHeader(arrayBuffer, offset) {
-  const headerBuffer = arrayBuffer.slice(offset)
-  const { value: header, byteLength } = deserializeTCompactProtocol(headerBuffer)
+  const { value: header, byteLength } = deserializeTCompactProtocol(arrayBuffer, offset)
 
   // Parse parquet header from thrift data
   const type = header.field_1
diff --git a/src/metadata.js b/src/metadata.js
index 937bbae..1b02da7 100644
--- a/src/metadata.js
+++ b/src/metadata.js
@@ -90,8 +90,7 @@ export function parquetMetadata(arrayBuffer) {
   }
 
   const metadataOffset = metadataLengthOffset - metadataLength
-  const metadataBuffer = view.buffer.slice(metadataOffset, metadataLengthOffset)
-  const { value: metadata } = deserializeTCompactProtocol(metadataBuffer)
+  const { value: metadata } = deserializeTCompactProtocol(view.buffer, view.byteOffset + metadataOffset)
 
   // Parse parquet metadata from thrift data
   const version = metadata.field_1
diff --git a/src/read.js b/src/read.js
index 47780ff..8ad9bab 100644
--- a/src/read.js
+++ b/src/read.js
@@ -1,5 +1,4 @@
-import { offsetArrayBuffer } from './asyncbuffer.js'
 import { getColumnOffset, readColumn } from './column.js'
 import { parquetMetadataAsync } from './metadata.js'
 
 /**
@@ -94,11 +93,11 @@ async function readRowGroup(options, rowGroup) {
     throw new Error('parquet missing row group metadata')
   }
   // if row group size is less than 128mb, pre-load in one read
-  let groupBuffer = undefined
+  let groupBuffer
   if (groupEndByte - groupStartByte <= 1 << 27) {
     // pre-load row group byte data in one big read,
     // otherwise read column data individually
-    groupBuffer = offsetArrayBuffer(await file.slice(groupStartByte, groupEndByte), groupStartByte)
+    groupBuffer = await file.slice(groupStartByte, groupEndByte)
   }
 
   /** @type {any[][]} */
@@ -122,18 +121,18 @@ async function readRowGroup(options, rowGroup) {
 
     // use pre-loaded row group byte data if available, else read column data
     let buffer
-    if (!groupBuffer) {
-      buffer = file.slice(columnStartByte, columnEndByte).then(arrayBuffer => {
-        return offsetArrayBuffer(arrayBuffer, columnStartByte)
-      })
-    } else {
+    let bufferOffset = 0
+    if (groupBuffer) {
       buffer = Promise.resolve(groupBuffer)
+      bufferOffset = columnStartByte - groupStartByte
+    } else {
+      buffer = file.slice(columnStartByte, columnEndByte)
     }
 
     // read column data async
     promises.push(buffer.then(arrayBuffer => {
       // TODO: extract SchemaElement for this column
-      const columnData = readColumn(arrayBuffer, rowGroup, columnMetadata, metadata.schema)
+      const columnData = readColumn(arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema)
       if (columnData.length !== Number(rowGroup.num_rows)) {
         throw new Error('parquet column length does not match row group length')
       }
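With offsets passed explicitly, `file` no longer needs the `offsetArrayBuffer` remapping wrapper; `readRowGroup` only relies on a `{ byteLength, slice(start, end) }` shape whose `slice` resolves to the ArrayBuffer for an absolute byte range. A hedged sketch of one such object, backed by HTTP range requests (the `httpRangeBuffer` name and setup are illustrative, not from this repo):

```js
// Illustrative async-buffer-shaped object: slice(start, end) fetches the
// absolute byte range [start, end) and resolves to an ArrayBuffer.
function httpRangeBuffer(url, byteLength) {
  return {
    byteLength,
    async slice(start, end) {
      const res = await fetch(url, {
        headers: { Range: `bytes=${start}-${end - 1}` }, // HTTP ranges are inclusive
      })
      if (res.status !== 206) throw new Error(`expected 206 partial content, got ${res.status}`)
      return res.arrayBuffer()
    },
  }
}
```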
diff --git a/src/thrift.js b/src/thrift.js
index cc866fa..ff6676e 100644
--- a/src/thrift.js
+++ b/src/thrift.js
@@ -22,16 +22,17 @@ const CompactType = {
  * @typedef {import("./types.d.ts").Decoded<T>} Decoded
  * @template T
  * @param {ArrayBuffer} arrayBuffer
+ * @param {number} byteOffset offset into the buffer
  * @returns {Decoded<Record<string, any>>}
  */
-export function deserializeTCompactProtocol(arrayBuffer) {
-  const view = new DataView(arrayBuffer)
+export function deserializeTCompactProtocol(arrayBuffer, byteOffset) {
+  const view = new DataView(arrayBuffer, byteOffset)
   let byteLength = 0
   let lastFid = 0
   /** @type {Record<string, any>} */
   const value = {}
 
-  while (byteLength < arrayBuffer.byteLength) {
+  while (byteLength < arrayBuffer.byteLength - byteOffset) {
     // Parse each field based on its type and add to the result object
     const [type, fid, newIndex, newLastFid] = readFieldBegin(view, byteLength, lastFid)
     byteLength = newIndex
@@ -77,7 +78,7 @@ function readElement(view, type, index) {
     case CompactType.BINARY: {
       // strings are encoded as utf-8, no \0 delimiter
       const [stringLength, stringIndex] = readVarInt(view, index)
-      const strBytes = new Uint8Array(view.buffer, stringIndex, stringLength)
+      const strBytes = new Uint8Array(view.buffer, view.byteOffset + stringIndex, stringLength)
       return [new TextDecoder().decode(strBytes), stringIndex + stringLength]
     }
     case CompactType.LIST: {
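The `view.byteOffset + stringIndex` correction in the BINARY case is subtle: a DataView constructed with a byte offset indexes relative to that offset, but `view.buffer` still refers to the entire underlying ArrayBuffer, so typed arrays built from `view.buffer` must add `view.byteOffset` back in. A small demonstration (illustrative values):

```js
const buffer = new Uint8Array([0, 0, 0, 65, 66, 67]).buffer // "ABC" at byte 3
const view = new DataView(buffer, 3) // indexes are relative to byte 3
console.log(view.getUint8(0)) // 65: view index 0 is buffer byte 3

// Without the correction this reads bytes 0..2 of the full buffer:
const wrong = new Uint8Array(view.buffer, 0, 3)
console.log(wrong) // Uint8Array [0, 0, 0]
// Adding view.byteOffset lands on the intended bytes:
const right = new Uint8Array(view.buffer, view.byteOffset + 0, 3)
console.log(new TextDecoder().decode(right)) // "ABC"
```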
diff --git a/test/asyncbuffer.test.js b/test/asyncbuffer.test.js
deleted file mode 100644
index b5dc2cb..0000000
--- a/test/asyncbuffer.test.js
+++ /dev/null
@@ -1,38 +0,0 @@
-import { describe, expect, it } from 'vitest'
-import { offsetArrayBuffer } from '../src/asyncbuffer.js'
-
-describe('offsetArrayBuffer', () => {
-  it('creates a valid offset array buffer', () => {
-    const buffer = new ArrayBuffer(10)
-    const offsetBuffer = offsetArrayBuffer(buffer, 5)
-    expect(offsetBuffer.byteLength).toBe(15)
-  })
-
-  it('correctly slices the array buffer with offset', () => {
-    const buffer = new ArrayBuffer(10)
-    const offsetBuffer = offsetArrayBuffer(buffer, 5)
-    const view = new Uint8Array(buffer)
-    for (let i = 0; i < view.length; i++) {
-      view[i] = i // Populate the buffer with data [0, 1, 2, ...]
-    }
-
-    const slicedBuffer = offsetBuffer.slice(5, 10) // This should give us [0, 1, 2, 3, 4] from the original buffer
-    const slicedView = new Uint8Array(slicedBuffer)
-
-    for (let i = 0; i < slicedView.length; i++) {
-      expect(slicedView[i]).toBe(i) // Each item should match its index
-    }
-  })
-
-  it('throws error for negative offset', () => {
-    const buffer = new ArrayBuffer(10)
-    expect(() => offsetArrayBuffer(buffer, -5)).toThrow('offset must be positive')
-  })
-
-  it('throws error for out of bounds slice', () => {
-    const buffer = new ArrayBuffer(10)
-    const offsetBuffer = offsetArrayBuffer(buffer, 5)
-    expect(() => offsetBuffer.slice(3, 7)).toThrow('start out of bounds')
-    expect(() => offsetBuffer.slice(5, 20)).toThrow('end out of bounds')
-  })
-})
diff --git a/test/thrift.test.js b/test/thrift.test.js
index 05046cc..f2fe29f 100644
--- a/test/thrift.test.js
+++ b/test/thrift.test.js
@@ -63,7 +63,7 @@ describe('deserializeTCompactProtocol function', () => {
     // Mark the end of the structure
     view.setUint8(index, 0x00) // STOP field
 
-    const { byteLength, value } = deserializeTCompactProtocol(buffer)
+    const { byteLength, value } = deserializeTCompactProtocol(buffer, 0)
    expect(byteLength).toBe(index + 1)
 
     // Assertions for each basic type
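Finally, a sketch of how callers use the two-argument form after this change, mirroring the updated `parquetHeader`: pass the whole file buffer plus an absolute offset, and use the returned `byteLength` to locate the end of the struct. `readStructAt` is a hypothetical helper shown only to illustrate the contract:

```js
import { deserializeTCompactProtocol } from '../src/thrift.js'

// Hypothetical helper: decode a thrift compact struct embedded at `offset`
// without first copying it out with slice(). byteLength counts the bytes
// consumed starting at offset, so the next struct begins at offset + byteLength.
function readStructAt(arrayBuffer, offset) {
  const { value, byteLength } = deserializeTCompactProtocol(arrayBuffer, offset)
  return { value, next: offset + byteLength }
}
```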