Use DataReader for thrift

This commit is contained in:
Kenny Daniel 2024-05-01 00:55:16 -07:00
parent b8660baea1
commit d093b0dcaa
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
9 changed files with 104 additions and 141 deletions

@ -29,23 +29,23 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
/** @type {ArrayLike<any> | undefined} */
let dictionary = undefined
let valuesSeen = 0
let byteOffset = 0 // byteOffset within the column
/** @type {any[]} */
const rowData = []
const { element } = schemaPath[schemaPath.length - 1]
// column reader:
const reader = { view: new DataView(arrayBuffer, columnOffset), offset: 0 }
while (valuesSeen < rowGroup.num_rows) {
// parse column header
const { value: header, byteLength: headerLength } = parquetHeader(arrayBuffer, columnOffset + byteOffset)
byteOffset += headerLength
const header = parquetHeader(reader)
if (header.compressed_page_size === undefined) {
throw new Error(`parquet compressed page size is undefined in column '${columnMetadata.path_in_schema}'`)
}
// read compressed_page_size bytes starting at offset
const compressedBytes = new Uint8Array(arrayBuffer).subarray(
columnOffset + byteOffset,
columnOffset + byteOffset + header.compressed_page_size
columnOffset + reader.offset,
columnOffset + reader.offset + header.compressed_page_size
)
// parse page data by type
@ -134,7 +134,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
} else {
throw new Error(`parquet unsupported page type: ${header.type}`)
}
byteOffset += header.compressed_page_size
reader.offset += header.compressed_page_size
}
if (rowData.length !== Number(rowGroup.num_rows)) {
throw new Error(`parquet row data length ${rowData.length} does not match row group length ${rowGroup.num_rows}}`)

@ -139,20 +139,20 @@ function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) {
* @param {any[]} values array to write to
*/
function deltaBinaryUnpack(page, nValues, values) {
const dataView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const [blockSize, index1] = readVarInt(dataView, 0)
const [miniblockPerBlock, index2] = readVarInt(dataView, index1)
const [count, index3] = readVarInt(dataView, index2)
let [value, offset] = readZigZag(dataView, index3)
const view = new DataView(page.buffer, page.byteOffset, page.byteLength)
const reader = { view, offset: 0 }
const blockSize = readVarInt(reader)
const miniblockPerBlock = readVarInt(reader)
const count = readVarInt(reader)
let value = readZigZag(reader)
const valuesPerMiniblock = blockSize / miniblockPerBlock
for (let valueIndex = 0; valueIndex < nValues;) {
const [minDelta, index4] = readZigZag(dataView, offset)
offset = index4
const minDelta = readZigZag(reader)
const bitWidths = new Uint8Array(miniblockPerBlock)
for (let i = 0; i < miniblockPerBlock; i++, offset++) {
bitWidths[i] = page[offset]
for (let i = 0; i < miniblockPerBlock; i++, reader.offset++) {
bitWidths[i] = page[reader.offset]
}
for (let i = 0; i < miniblockPerBlock; i++) {
@ -167,7 +167,7 @@ function deltaBinaryUnpack(page, nValues, values) {
while (count) {
if (stop < 0) {
// fails when data gets too large
data = (data << 8) | dataView.getUint8(offset++)
data = (data << 8) | view.getUint8(reader.offset++)
stop += 8
} else {
values.push((data >> stop) & mask)

@ -30,8 +30,7 @@ export function readRleBitPackedHybrid(reader, width, length, values) {
let seen = 0
const startOffset = reader.offset
while (reader.offset - startOffset < length && seen < values.length) {
const [header, newOffset] = readVarInt(reader.view, reader.offset)
reader.offset = newOffset
const header = readVarInt(reader)
if ((header & 1) === 0) {
// rle
const count = header >>> 1

@ -1,21 +1,16 @@
import { Encoding, PageType } from './constants.js'
import { deserializeTCompactProtocol } from './thrift.js'
/**
* @typedef {import("./types.d.ts").Decoded<T>} Decoded
* @template T
*/
/**
* Read parquet header from a buffer.
*
* @typedef {import("./types.d.ts").DataReader} DataReader
* @typedef {import("./types.d.ts").PageHeader} PageHeader
* @param {ArrayBuffer} arrayBuffer parquet file contents
* @param {number} offset offset to start reading from
* @returns {Decoded<PageHeader>} metadata object and bytes read
* @param {DataReader} reader - parquet file reader
 * @returns {PageHeader} parsed page header (bytes consumed are tracked via reader.offset)
*/
export function parquetHeader(arrayBuffer, offset) {
const { value: header, byteLength } = deserializeTCompactProtocol(arrayBuffer, offset)
export function parquetHeader(reader) {
const header = deserializeTCompactProtocol(reader)
// Parse parquet header from thrift data
const type = PageType[header.field_1]
@ -54,16 +49,13 @@ export function parquetHeader(arrayBuffer, offset) {
}
return {
byteLength,
value: {
type,
uncompressed_page_size,
compressed_page_size,
crc,
data_page_header,
index_page_header,
dictionary_page_header,
data_page_header_v2,
},
type,
uncompressed_page_size,
compressed_page_size,
crc,
data_page_header,
index_page_header,
dictionary_page_header,
data_page_header_v2,
}
}

@ -94,7 +94,8 @@ export function parquetMetadata(arrayBuffer) {
}
const metadataOffset = metadataLengthOffset - metadataLength
const { value: metadata } = deserializeTCompactProtocol(view.buffer, view.byteOffset + metadataOffset)
const reader = { view, offset: metadataOffset }
const metadata = deserializeTCompactProtocol(reader)
// Parse metadata from thrift data
const version = metadata.field_1

@ -137,13 +137,12 @@ function readPlainByteArrayFixed(reader, fixedLength) {
/**
* Read `count` values of the given type from the reader.view.
*
* @typedef {import("./types.d.ts").DecodedArray} DecodedArray
* @typedef {import("./types.d.ts").ParquetType} ParquetType
 * @param {DataReader} reader - data reader (view and offset) to read from
* @param {ParquetType} type - parquet type of the data
* @param {number} count - number of values to read
* @param {boolean} utf8 - whether to decode byte arrays as UTF-8
* @returns {DecodedArray} array of values
* @returns {ArrayLike<any>} array of values
*/
export function readPlain(reader, type, count, utf8) {
if (count === 0) return []

@ -19,23 +19,17 @@ const CompactType = {
/**
* Parse TCompactProtocol
*
* @typedef {import("./types.d.ts").Decoded<T>} Decoded
* @template T
* @param {ArrayBuffer} arrayBuffer
* @param {number} byteOffset offset into the buffer
* @returns {Decoded<Record<string, any>>}
* @param {DataReader} reader
* @returns {Record<string, any>}
*/
export function deserializeTCompactProtocol(arrayBuffer, byteOffset) {
const view = new DataView(arrayBuffer, byteOffset)
let byteLength = 0
export function deserializeTCompactProtocol(reader) {
let lastFid = 0
/** @type {Record<string, any>} */
const value = {}
while (byteLength < arrayBuffer.byteLength - byteOffset) {
while (reader.offset < reader.view.byteLength) {
// Parse each field based on its type and add to the result object
const [type, fid, newIndex, newLastFid] = readFieldBegin(view, byteLength, lastFid)
byteLength = newIndex
const [type, fid, newLastFid] = readFieldBegin(reader, lastFid)
lastFid = newLastFid
if (type === CompactType.STOP) {
@ -43,80 +37,75 @@ export function deserializeTCompactProtocol(arrayBuffer, byteOffset) {
}
// Handle the field based on its type
let fieldValue
[fieldValue, byteLength] = readElement(view, type, byteLength)
value[`field_${fid}`] = fieldValue
value[`field_${fid}`] = readElement(reader, type)
}
return { value, byteLength }
return value
}
/**
* Read a single element based on its type
*
* @param {DataView} view
* @param {DataReader} reader
* @param {number} type
* @param {number} index
* @returns {[any, number]} [value, newIndex]
* @returns {any} value
*/
function readElement(view, type, index) {
function readElement(reader, type) {
switch (type) {
case CompactType.TRUE:
return [true, index]
return true
case CompactType.FALSE:
return [false, index]
return false
case CompactType.BYTE:
// read byte directly
return [view.getInt8(index), index + 1]
return reader.view.getInt8(reader.offset++)
case CompactType.I16:
case CompactType.I32:
return readZigZag(view, index)
return readZigZag(reader)
case CompactType.I64:
return readZigZagBigInt(view, index)
case CompactType.DOUBLE:
return [view.getFloat64(index, true), index + 8]
return readZigZagBigInt(reader)
case CompactType.DOUBLE: {
const value = reader.view.getFloat64(reader.offset, true)
reader.offset += 8
return value
}
case CompactType.BINARY: {
// strings are encoded as utf-8, no \0 delimiter
const [stringLength, stringIndex] = readVarInt(view, index)
const strBytes = new Uint8Array(view.buffer, view.byteOffset + stringIndex, stringLength)
return [new TextDecoder().decode(strBytes), stringIndex + stringLength]
const stringLength = readVarInt(reader)
const strBytes = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, stringLength)
reader.offset += stringLength
return new TextDecoder().decode(strBytes)
}
case CompactType.LIST: {
const [elemType, listSize, listIndex] = readCollectionBegin(view, index)
index = listIndex
const [elemType, listSize] = readCollectionBegin(reader)
const values = new Array(listSize)
for (let i = 0; i < listSize; i++) {
let listElem
[listElem, index] = readElement(view, elemType, index)
values[i] = listElem
values[i] = readElement(reader, elemType)
}
return [values, index]
return values
}
case CompactType.STRUCT: {
/** @type {Record<string, any>} */
const structValues = {}
let structLastFid = 0
while (true) {
let structFieldType, structFid, structIndex
[structFieldType, structFid, structIndex, structLastFid] = readFieldBegin(view, index, structLastFid)
index = structIndex
let structFieldType, structFid
[structFieldType, structFid, structLastFid] = readFieldBegin(reader, structLastFid)
if (structFieldType === CompactType.STOP) {
break
}
let structFieldValue
[structFieldValue, index] = readElement(view, structFieldType, index)
structValues[`field_${structFid}`] = structFieldValue
structValues[`field_${structFid}`] = readElement(reader, structFieldType)
}
return [structValues, index]
return structValues
}
// TODO: MAP and SET
case CompactType.UUID: {
// Read 16 bytes to uuid string
let uuid = ''
for (let i = 0; i < 16; i++) {
uuid += view.getUint8(index++).toString(16).padStart(2, '0')
uuid += reader.view.getUint8(reader.offset++).toString(16).padStart(2, '0')
}
return [uuid, index]
return uuid
}
default:
throw new Error(`thrift unhandled type: ${type}`)
@ -131,18 +120,18 @@ function readElement(view, type, index) {
* 7-bit group with the 0 bit, prefixing the remaining 7-bit groups with the
* 1 bit and encode the resulting bit-string as Little Endian.
*
* @param {DataView} view
* @param {number} index
* @returns {[number, number]} [value, newIndex]
* @typedef {import("./types.d.ts").DataReader} DataReader
* @param {DataReader} reader
* @returns {number} value
*/
export function readVarInt(view, index) {
export function readVarInt(reader) {
let result = 0
let shift = 0
while (true) {
const byte = view.getUint8(index++)
const byte = reader.view.getUint8(reader.offset++)
result |= (byte & 0x7f) << shift
if ((byte & 0x80) === 0) {
return [result, index]
return result
}
shift += 7
}
@ -151,18 +140,17 @@ export function readVarInt(view, index) {
/**
* Read a varint as a bigint.
*
* @param {DataView} view
* @param {number} index
* @returns {[bigint, number]} [value, newIndex]
* @param {DataReader} reader
* @returns {bigint} value
*/
function readVarBigInt(view, index) {
function readVarBigInt(reader) {
let result = BigInt(0)
let shift = BigInt(0)
while (true) {
const byte = BigInt(view.getUint8(index++))
const byte = BigInt(reader.view.getUint8(reader.offset++))
result |= (byte & BigInt(0x7f)) << shift
if ((byte & BigInt(0x80)) === BigInt(0)) {
return [result, index]
return result
}
shift += BigInt(7)
}
@ -172,30 +160,26 @@ function readVarBigInt(view, index) {
* Values of type int32 and int64 are transformed to a zigzag int.
* A zigzag int folds positive and negative numbers into the positive number space.
*
* @param {DataView} view
* @param {number} index
* @returns {[number, number]} [value, newIndex]
* @param {DataReader} reader
* @returns {number} value
*/
export function readZigZag(view, index) {
const [zigzag, newIndex] = readVarInt(view, index)
export function readZigZag(reader) {
const zigzag = readVarInt(reader)
// convert zigzag to int
const value = (zigzag >>> 1) ^ -(zigzag & 1)
return [value, newIndex]
return (zigzag >>> 1) ^ -(zigzag & 1)
}
/**
* A zigzag int folds positive and negative numbers into the positive number space.
* This version returns a BigInt.
*
* @param {DataView} view
* @param {number} index
* @returns {[bigint, number]} [value, newIndex]
* @param {DataReader} reader
* @returns {bigint} value
*/
function readZigZagBigInt(view, index) {
const [zigzag, newIndex] = readVarBigInt(view, index)
function readZigZagBigInt(reader) {
const zigzag = readVarBigInt(reader)
// convert zigzag to int
const value = (zigzag >> BigInt(1)) ^ -(zigzag & BigInt(1))
return [value, newIndex]
return (zigzag >> BigInt(1)) ^ -(zigzag & BigInt(1))
}
/**
@ -211,45 +195,43 @@ function getCompactType(byte) {
/**
* Read field type and field id
*
* @param {DataView} view
* @param {number} index
* @param {DataReader} reader
* @param {number} lastFid
* @returns {[number, number, number, number]} [type, fid, newIndex, newLastFid]
* @returns {[number, number, number]} [type, fid, newLastFid]
*/
function readFieldBegin(view, index, lastFid) {
const type = view.getUint8(index++)
function readFieldBegin(reader, lastFid) {
const type = reader.view.getUint8(reader.offset++)
if ((type & 0x0f) === CompactType.STOP) {
// STOP also ends a struct
return [0, 0, index, lastFid]
return [0, 0, lastFid]
}
const delta = type >> 4
let fid // field id
if (delta === 0) {
// not a delta, read zigzag varint field id
[fid, index] = readZigZag(view, index)
fid = readZigZag(reader)
} else {
// add delta to last field id
fid = lastFid + delta
}
return [getCompactType(type), fid, index, fid]
return [getCompactType(type), fid, fid]
}
/**
* Read collection type and size
*
* @param {DataView} view
* @param {number} index
* @returns {[number, number, number]} [type, size, newIndex]
* @param {DataReader} reader
* @returns {[number, number]} [type, size]
*/
function readCollectionBegin(view, index) {
const sizeType = view.getUint8(index++)
function readCollectionBegin(reader) {
const sizeType = reader.view.getUint8(reader.offset++)
const size = sizeType >> 4
const type = getCompactType(sizeType)
if (size === 15) {
const [newSize, newIndex] = readVarInt(view, index)
return [type, newSize, newIndex]
const newSize = readVarInt(reader)
return [type, newSize]
}
return [type, size, index]
return [type, size]
}
/**

11
src/types.d.ts vendored

@ -8,15 +8,6 @@ export interface AsyncBuffer {
slice(start: number, end?: number): Awaitable<ArrayBuffer>
}
/**
* Represents a decoded value, and includes the number of bytes read.
* This is used to read data from the file and advance a virtual file pointer.
*/
export interface Decoded<T> {
value: T
byteLength: number
}
export interface DataReader {
view: DataView
offset: number
@ -248,8 +239,6 @@ interface DataPageHeaderV2 {
statistics?: Statistics
}
type DecodedArray = any[] | Uint8Array
interface DataPage {
definitionLevels: number[] | undefined
repetitionLevels: number[]

@ -63,8 +63,9 @@ describe('deserializeTCompactProtocol function', () => {
// Mark the end of the structure
view.setUint8(index, 0x00) // STOP field
const { byteLength, value } = deserializeTCompactProtocol(buffer, 0)
expect(byteLength).toBe(index + 1)
const reader = { view, offset: 0 }
const value = deserializeTCompactProtocol(reader)
expect(reader.offset).toBe(index + 1)
// Assertions for each basic type
expect(value.field_1).toBe(true) // TRUE