From d093b0dcaacd47e6e21ec2936ccdd1ae958d2457 Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Wed, 1 May 2024 00:55:16 -0700 Subject: [PATCH] Use DataReader for thrift --- src/column.js | 12 ++-- src/datapageV2.js | 20 +++--- src/encoding.js | 3 +- src/header.js | 34 ++++------ src/metadata.js | 3 +- src/plain.js | 3 +- src/thrift.js | 154 +++++++++++++++++++------------------------- src/types.d.ts | 11 ---- test/thrift.test.js | 5 +- 9 files changed, 104 insertions(+), 141 deletions(-) diff --git a/src/column.js b/src/column.js index dfa2439..cf95cab 100644 --- a/src/column.js +++ b/src/column.js @@ -29,23 +29,23 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, /** @type {ArrayLike | undefined} */ let dictionary = undefined let valuesSeen = 0 - let byteOffset = 0 // byteOffset within the column /** @type {any[]} */ const rowData = [] const { element } = schemaPath[schemaPath.length - 1] + // column reader: + const reader = { view: new DataView(arrayBuffer, columnOffset), offset: 0 } while (valuesSeen < rowGroup.num_rows) { // parse column header - const { value: header, byteLength: headerLength } = parquetHeader(arrayBuffer, columnOffset + byteOffset) - byteOffset += headerLength + const header = parquetHeader(reader) if (header.compressed_page_size === undefined) { throw new Error(`parquet compressed page size is undefined in column '${columnMetadata.path_in_schema}'`) } // read compressed_page_size bytes starting at offset const compressedBytes = new Uint8Array(arrayBuffer).subarray( - columnOffset + byteOffset, - columnOffset + byteOffset + header.compressed_page_size + columnOffset + reader.offset, + columnOffset + reader.offset + header.compressed_page_size ) // parse page data by type @@ -134,7 +134,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, } else { throw new Error(`parquet unsupported page type: ${header.type}`) } - byteOffset += header.compressed_page_size + reader.offset += header.compressed_page_size } if (rowData.length !== Number(rowGroup.num_rows)) { throw new Error(`parquet row data length ${rowData.length} does not match row group length ${rowGroup.num_rows}}`) diff --git a/src/datapageV2.js b/src/datapageV2.js index 52fdaf6..880da08 100644 --- a/src/datapageV2.js +++ b/src/datapageV2.js @@ -139,20 +139,20 @@ function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) { * @param {any[]} values array to write to */ function deltaBinaryUnpack(page, nValues, values) { - const dataView = new DataView(page.buffer, page.byteOffset, page.byteLength) - const [blockSize, index1] = readVarInt(dataView, 0) - const [miniblockPerBlock, index2] = readVarInt(dataView, index1) - const [count, index3] = readVarInt(dataView, index2) - let [value, offset] = readZigZag(dataView, index3) + const view = new DataView(page.buffer, page.byteOffset, page.byteLength) + const reader = { view, offset: 0 } + const blockSize = readVarInt(reader) + const miniblockPerBlock = readVarInt(reader) + const count = readVarInt(reader) + let value = readZigZag(reader) const valuesPerMiniblock = blockSize / miniblockPerBlock for (let valueIndex = 0; valueIndex < nValues;) { - const [minDelta, index4] = readZigZag(dataView, offset) - offset = index4 + const minDelta = readZigZag(reader) const bitWidths = new Uint8Array(miniblockPerBlock) - for (let i = 0; i < miniblockPerBlock; i++, offset++) { - bitWidths[i] = page[offset] + for (let i = 0; i < miniblockPerBlock; i++, reader.offset++) { + bitWidths[i] = page[reader.offset] } for (let i = 0; i < miniblockPerBlock; i++) { @@ -167,7 +167,7 @@ function deltaBinaryUnpack(page, nValues, values) { while (count) { if (stop < 0) { // fails when data gets too large - data = (data << 8) | dataView.getUint8(offset++) + data = (data << 8) | view.getUint8(reader.offset++) stop += 8 } else { values.push((data >> stop) & mask) diff --git a/src/encoding.js b/src/encoding.js index 7ceb84f..3c037a9 100644 --- a/src/encoding.js +++ b/src/encoding.js @@ -30,8 +30,7 @@ export function readRleBitPackedHybrid(reader, width, length, values) { let seen = 0 const startOffset = reader.offset while (reader.offset - startOffset < length && seen < values.length) { - const [header, newOffset] = readVarInt(reader.view, reader.offset) - reader.offset = newOffset + const header = readVarInt(reader) if ((header & 1) === 0) { // rle const count = header >>> 1 diff --git a/src/header.js b/src/header.js index f96f070..d68702f 100644 --- a/src/header.js +++ b/src/header.js @@ -1,21 +1,16 @@ import { Encoding, PageType } from './constants.js' import { deserializeTCompactProtocol } from './thrift.js' -/** - * @typedef {import("./types.d.ts").Decoded} Decoded - * @template T - */ - /** * Read parquet header from a buffer. * + * @typedef {import("./types.d.ts").DataReader} DataReader * @typedef {import("./types.d.ts").PageHeader} PageHeader - * @param {ArrayBuffer} arrayBuffer parquet file contents - * @param {number} offset offset to start reading from - * @returns {Decoded} metadata object and bytes read + * @param {DataReader} reader - parquet file reader + * @returns {PageHeader} metadata object and bytes read */ -export function parquetHeader(arrayBuffer, offset) { - const { value: header, byteLength } = deserializeTCompactProtocol(arrayBuffer, offset) +export function parquetHeader(reader) { + const header = deserializeTCompactProtocol(reader) // Parse parquet header from thrift data const type = PageType[header.field_1] @@ -54,16 +49,13 @@ export function parquetHeader(arrayBuffer, offset) { } return { - byteLength, - value: { - type, - uncompressed_page_size, - compressed_page_size, - crc, - data_page_header, - index_page_header, - dictionary_page_header, - data_page_header_v2, - }, + type, + uncompressed_page_size, + compressed_page_size, + crc, + data_page_header, + index_page_header, + dictionary_page_header, + data_page_header_v2, } } diff --git a/src/metadata.js b/src/metadata.js index 5e33ac8..ce1f2fd 100644 --- a/src/metadata.js +++ b/src/metadata.js @@ -94,7 +94,8 @@ export function parquetMetadata(arrayBuffer) { } const metadataOffset = metadataLengthOffset - metadataLength - const { value: metadata } = deserializeTCompactProtocol(view.buffer, view.byteOffset + metadataOffset) + const reader = { view, offset: metadataOffset } + const metadata = deserializeTCompactProtocol(reader) // Parse metadata from thrift data const version = metadata.field_1 diff --git a/src/plain.js b/src/plain.js index cfd6e37..de1fc9a 100644 --- a/src/plain.js +++ b/src/plain.js @@ -137,13 +137,12 @@ function readPlainByteArrayFixed(reader, fixedLength) { /** * Read `count` values of the given type from the reader.view. * - * @typedef {import("./types.d.ts").DecodedArray} DecodedArray * @typedef {import("./types.d.ts").ParquetType} ParquetType * @param {DataReader} reader - buffer to read data from * @param {ParquetType} type - parquet type of the data * @param {number} count - number of values to read * @param {boolean} utf8 - whether to decode byte arrays as UTF-8 - * @returns {DecodedArray} array of values + * @returns {ArrayLike} array of values */ export function readPlain(reader, type, count, utf8) { if (count === 0) return [] diff --git a/src/thrift.js b/src/thrift.js index 8161002..2a9b80a 100644 --- a/src/thrift.js +++ b/src/thrift.js @@ -19,23 +19,17 @@ const CompactType = { /** * Parse TCompactProtocol * - * @typedef {import("./types.d.ts").Decoded} Decoded - * @template T - * @param {ArrayBuffer} arrayBuffer - * @param {number} byteOffset offset into the buffer - * @returns {Decoded>} + * @param {DataReader} reader + * @returns {Record} */ -export function deserializeTCompactProtocol(arrayBuffer, byteOffset) { - const view = new DataView(arrayBuffer, byteOffset) - let byteLength = 0 +export function deserializeTCompactProtocol(reader) { let lastFid = 0 /** @type {Record} */ const value = {} - while (byteLength < arrayBuffer.byteLength - byteOffset) { + while (reader.offset < reader.view.byteLength) { // Parse each field based on its type and add to the result object - const [type, fid, newIndex, newLastFid] = readFieldBegin(view, byteLength, lastFid) - byteLength = newIndex + const [type, fid, newLastFid] = readFieldBegin(reader, lastFid) lastFid = newLastFid if (type === CompactType.STOP) { @@ -43,80 +37,75 @@ export function deserializeTCompactProtocol(arrayBuffer, byteOffset) { } // Handle the field based on its type - let fieldValue - [fieldValue, byteLength] = readElement(view, type, byteLength) - value[`field_${fid}`] = fieldValue + value[`field_${fid}`] = readElement(reader, type) } - return { value, byteLength } + return value } /** * Read a single element based on its type * - * @param {DataView} view + * @param {DataReader} reader * @param {number} type - * @param {number} index - * @returns {[any, number]} [value, newIndex] + * @returns {any} value */ -function readElement(view, type, index) { +function readElement(reader, type) { switch (type) { case CompactType.TRUE: - return [true, index] + return true case CompactType.FALSE: - return [false, index] + return false case CompactType.BYTE: // read byte directly - return [view.getInt8(index), index + 1] + return reader.view.getInt8(reader.offset++) case CompactType.I16: case CompactType.I32: - return readZigZag(view, index) + return readZigZag(reader) case CompactType.I64: - return readZigZagBigInt(view, index) - case CompactType.DOUBLE: - return [view.getFloat64(index, true), index + 8] + return readZigZagBigInt(reader) + case CompactType.DOUBLE: { + const value = reader.view.getFloat64(reader.offset, true) + reader.offset += 8 + return value + } case CompactType.BINARY: { // strings are encoded as utf-8, no \0 delimiter - const [stringLength, stringIndex] = readVarInt(view, index) - const strBytes = new Uint8Array(view.buffer, view.byteOffset + stringIndex, stringLength) - return [new TextDecoder().decode(strBytes), stringIndex + stringLength] + const stringLength = readVarInt(reader) + const strBytes = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, stringLength) + reader.offset += stringLength + return new TextDecoder().decode(strBytes) } case CompactType.LIST: { - const [elemType, listSize, listIndex] = readCollectionBegin(view, index) - index = listIndex + const [elemType, listSize] = readCollectionBegin(reader) const values = new Array(listSize) for (let i = 0; i < listSize; i++) { - let listElem - [listElem, index] = readElement(view, elemType, index) - values[i] = listElem + values[i] = readElement(reader, elemType) } - return [values, index] + return values } case CompactType.STRUCT: { /** @type {Record} */ const structValues = {} let structLastFid = 0 while (true) { - let structFieldType, structFid, structIndex - [structFieldType, structFid, structIndex, structLastFid] = readFieldBegin(view, index, structLastFid) - index = structIndex + let structFieldType, structFid + [structFieldType, structFid, structLastFid] = readFieldBegin(reader, structLastFid) if (structFieldType === CompactType.STOP) { break } - let structFieldValue - [structFieldValue, index] = readElement(view, structFieldType, index) - structValues[`field_${structFid}`] = structFieldValue + structValues[`field_${structFid}`] = readElement(reader, structFieldType) } - return [structValues, index] + return structValues } // TODO: MAP and SET case CompactType.UUID: { // Read 16 bytes to uuid string let uuid = '' for (let i = 0; i < 16; i++) { - uuid += view.getUint8(index++).toString(16).padStart(2, '0') + uuid += reader.view.getUint8(reader.offset++).toString(16).padStart(2, '0') } - return [uuid, index] + return uuid } default: throw new Error(`thrift unhandled type: ${type}`) @@ -131,18 +120,18 @@ function readElement(view, type, index) { * 7-bit group with the 0 bit, prefixing the remaining 7-bit groups with the * 1 bit and encode the resulting bit-string as Little Endian. * - * @param {DataView} view - * @param {number} index - * @returns {[number, number]} [value, newIndex] + * @typedef {import("./types.d.ts").DataReader} DataReader + * @param {DataReader} reader + * @returns {number} value */ -export function readVarInt(view, index) { +export function readVarInt(reader) { let result = 0 let shift = 0 while (true) { - const byte = view.getUint8(index++) + const byte = reader.view.getUint8(reader.offset++) result |= (byte & 0x7f) << shift if ((byte & 0x80) === 0) { - return [result, index] + return result } shift += 7 } @@ -151,18 +140,17 @@ export function readVarInt(view, index) { /** * Read a varint as a bigint. * - * @param {DataView} view - * @param {number} index - * @returns {[bigint, number]} [value, newIndex] + * @param {DataReader} reader + * @returns {bigint} value */ -function readVarBigInt(view, index) { +function readVarBigInt(reader) { let result = BigInt(0) let shift = BigInt(0) while (true) { - const byte = BigInt(view.getUint8(index++)) + const byte = BigInt(reader.view.getUint8(reader.offset++)) result |= (byte & BigInt(0x7f)) << shift if ((byte & BigInt(0x80)) === BigInt(0)) { - return [result, index] + return result } shift += BigInt(7) } @@ -172,30 +160,26 @@ function readVarBigInt(view, index) { * Values of type int32 and int64 are transformed to a zigzag int. * A zigzag int folds positive and negative numbers into the positive number space. * - * @param {DataView} view - * @param {number} index - * @returns {[number, number]} [value, newIndex] + * @param {DataReader} reader + * @returns {number} value */ -export function readZigZag(view, index) { - const [zigzag, newIndex] = readVarInt(view, index) +export function readZigZag(reader) { + const zigzag = readVarInt(reader) // convert zigzag to int - const value = (zigzag >>> 1) ^ -(zigzag & 1) - return [value, newIndex] + return (zigzag >>> 1) ^ -(zigzag & 1) } /** * A zigzag int folds positive and negative numbers into the positive number space. * This version returns a BigInt. * - * @param {DataView} view - * @param {number} index - * @returns {[bigint, number]} [value, newIndex] + * @param {DataReader} reader + * @returns {bigint} value */ -function readZigZagBigInt(view, index) { - const [zigzag, newIndex] = readVarBigInt(view, index) +function readZigZagBigInt(reader) { + const zigzag = readVarBigInt(reader) // convert zigzag to int - const value = (zigzag >> BigInt(1)) ^ -(zigzag & BigInt(1)) - return [value, newIndex] + return (zigzag >> BigInt(1)) ^ -(zigzag & BigInt(1)) } /** @@ -211,45 +195,43 @@ function getCompactType(byte) { /** * Read field type and field id * - * @param {DataView} view - * @param {number} index + * @param {DataReader} reader * @param {number} lastFid - * @returns {[number, number, number, number]} [type, fid, newIndex, newLastFid] + * @returns {[number, number, number]} [type, fid, newLastFid] */ -function readFieldBegin(view, index, lastFid) { - const type = view.getUint8(index++) +function readFieldBegin(reader, lastFid) { + const type = reader.view.getUint8(reader.offset++) if ((type & 0x0f) === CompactType.STOP) { // STOP also ends a struct - return [0, 0, index, lastFid] + return [0, 0, lastFid] } const delta = type >> 4 let fid // field id if (delta === 0) { // not a delta, read zigzag varint field id - [fid, index] = readZigZag(view, index) + fid = readZigZag(reader) } else { // add delta to last field id fid = lastFid + delta } - return [getCompactType(type), fid, index, fid] + return [getCompactType(type), fid, fid] } /** * Read collection type and size * - * @param {DataView} view - * @param {number} index - * @returns {[number, number, number]} [type, size, newIndex] + * @param {DataReader} reader + * @returns {[number, number]} [type, size] */ -function readCollectionBegin(view, index) { - const sizeType = view.getUint8(index++) +function readCollectionBegin(reader) { + const sizeType = reader.view.getUint8(reader.offset++) const size = sizeType >> 4 const type = getCompactType(sizeType) if (size === 15) { - const [newSize, newIndex] = readVarInt(view, index) - return [type, newSize, newIndex] + const newSize = readVarInt(reader) + return [type, newSize] } - return [type, size, index] + return [type, size] } /** diff --git a/src/types.d.ts b/src/types.d.ts index 8ad741c..886ba1e 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -8,15 +8,6 @@ export interface AsyncBuffer { slice(start: number, end?: number): Awaitable } -/** - * Represents a decoded value, and includes the number of bytes read. - * This is used to read data from the file and advance a virtual file pointer. - */ -export interface Decoded { - value: T - byteLength: number -} - export interface DataReader { view: DataView offset: number @@ -248,8 +239,6 @@ interface DataPageHeaderV2 { statistics?: Statistics } -type DecodedArray = any[] | Uint8Array - interface DataPage { definitionLevels: number[] | undefined repetitionLevels: number[] diff --git a/test/thrift.test.js b/test/thrift.test.js index f2fe29f..b37b25c 100644 --- a/test/thrift.test.js +++ b/test/thrift.test.js @@ -63,8 +63,9 @@ describe('deserializeTCompactProtocol function', () => { // Mark the end of the structure view.setUint8(index, 0x00) // STOP field - const { byteLength, value } = deserializeTCompactProtocol(buffer, 0) - expect(byteLength).toBe(index + 1) + const reader = { view, offset: 0 } + const value = deserializeTCompactProtocol(reader) + expect(reader.offset).toBe(index + 1) // Assertions for each basic type expect(value.field_1).toBe(true) // TRUE