Use DataReader over Decoded. Fewer allocations, slightly faster.

This commit is contained in:
Kenny Daniel 2024-04-17 00:48:33 -07:00
parent d000142cd7
commit f826bff757
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
7 changed files with 256 additions and 271 deletions

@ -10,17 +10,13 @@ import {
const skipNulls = false // TODO
/**
* @typedef {{ byteLength: number, definitionLevels: number[], numNulls: number }} DefinitionLevels
* @typedef {{ definitionLevels: number[], numNulls: number }} DefinitionLevels
* @typedef {import("./types.d.ts").DataPage} DataPage
* @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
* @typedef {import("./types.d.ts").DataPageHeader} DataPageHeader
* @typedef {import("./types.d.ts").DictionaryPageHeader} DictionaryPageHeader
* @typedef {import("./types.d.ts").SchemaElement} SchemaElement
*/
/**
* @typedef {import("./types.d.ts").Decoded<T>} Decoded
* @template T
*/
/**
* Read a data page from the given Uint8Array.
@ -32,16 +28,15 @@ const skipNulls = false // TODO
* @returns {DataPage} definition levels, repetition levels, and array of values
*/
export function readDataPage(bytes, daph, schema, columnMetadata) {
const dataView = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
let offset = 0
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
const reader = { view, offset: 0 }
/** @type {any[]} */
let values = []
// repetition levels
const { value: repetitionLevels, byteLength } = readRepetitionLevels(
dataView, offset, daph, schema, columnMetadata
const repetitionLevels = readRepetitionLevels(
reader, daph, schema, columnMetadata
)
offset += byteLength
// definition levels
let definitionLevels = undefined
@ -50,12 +45,11 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
// TODO: move into readDefinitionLevels
if (skipNulls && !isRequired(schema, columnMetadata.path_in_schema)) {
// skip_definition_bytes
offset += skipDefinitionBytes(daph.num_values)
reader.offset += skipDefinitionBytes(daph.num_values)
} else {
const dl = readDefinitionLevels(dataView, offset, daph, schema, columnMetadata.path_in_schema)
const dl = readDefinitionLevels(reader, daph, schema, columnMetadata.path_in_schema)
definitionLevels = dl.definitionLevels
numNulls = dl.numNulls
offset += dl.byteLength
}
// read values based on encoding
@ -63,9 +57,8 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
if (daph.encoding === 'PLAIN') {
const { element } = schemaElement(schema, columnMetadata.path_in_schema)
const utf8 = element.converted_type === 'UTF8'
const plainObj = readPlain(dataView, columnMetadata.type, nValues, offset, utf8)
values = Array.isArray(plainObj.value) ? plainObj.value : Array.from(plainObj.value)
offset += plainObj.byteLength
const plainObj = readPlain(reader, columnMetadata.type, nValues, utf8)
values = Array.isArray(plainObj) ? plainObj : Array.from(plainObj)
} else if (
daph.encoding === 'PLAIN_DICTIONARY' ||
daph.encoding === 'RLE_DICTIONARY' ||
@ -77,14 +70,13 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
if (columnMetadata.type === 'BOOLEAN') {
bitWidth = 1
} else {
bitWidth = dataView.getUint8(offset)
offset += 1
bitWidth = view.getUint8(reader.offset)
reader.offset++
}
if (bitWidth) {
const { value, byteLength } = readRleBitPackedHybrid(
dataView, offset, bitWidth, dataView.byteLength - offset, nValues
const value = readRleBitPackedHybrid(
reader, bitWidth, view.byteLength - reader.offset, nValues
)
offset += byteLength
values = Array.isArray(value) ? value : Array.from(value)
} else {
// nval zeros
@ -107,51 +99,51 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
* @returns {ArrayLike<any>} array of values
*/
export function readDictionaryPage(bytes, diph, schema, columnMetadata) {
const dataView = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
return readPlain(dataView, columnMetadata.type, diph.num_values, 0, false).value
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
const reader = { view, offset: 0 }
return readPlain(reader, columnMetadata.type, diph.num_values, false)
}
/**
* Read the repetition levels from this page, if any.
*
* @param {DataView} dataView data view for the page
* @param {number} offset offset to start reading from
* @typedef {import("./types.d.ts").DataReader} DataReader
 * @param {DataReader} reader data reader for the page
* @param {DataPageHeader} daph data page header
* @param {SchemaElement[]} schema schema for the file
* @param {ColumnMetaData} columnMetadata metadata for the column
* @returns {Decoded<any[]>} repetition levels and number of bytes read
 * @returns {any[]} repetition levels
*/
function readRepetitionLevels(dataView, offset, daph, schema, columnMetadata) {
function readRepetitionLevels(reader, daph, schema, columnMetadata) {
if (columnMetadata.path_in_schema.length > 1) {
const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
if (maxRepetitionLevel) {
const bitWidth = widthFromMaxInt(maxRepetitionLevel)
return readData(
dataView, daph.repetition_level_encoding, offset, daph.num_values, bitWidth
reader, daph.repetition_level_encoding, daph.num_values, bitWidth
)
}
}
return { value: [], byteLength: 0 }
return []
}
/**
* Read the definition levels from this page, if any.
*
* @param {DataView} dataView data view for the page
* @param {number} offset offset to start reading from
 * @param {DataReader} reader data reader for the page
* @param {DataPageHeader} daph data page header
* @param {SchemaElement[]} schema schema for the file
* @param {string[]} path_in_schema path in the schema
 * @returns {DefinitionLevels} definition levels and null count
*/
function readDefinitionLevels(dataView, offset, daph, schema, path_in_schema) {
function readDefinitionLevels(reader, daph, schema, path_in_schema) {
if (!isRequired(schema, path_in_schema)) {
const maxDefinitionLevel = getMaxDefinitionLevel(schema, path_in_schema)
const bitWidth = widthFromMaxInt(maxDefinitionLevel)
if (bitWidth) {
// num_values is index 1 for either type of page header
const { value: definitionLevels, byteLength } = readData(
dataView, daph.definition_level_encoding, offset, daph.num_values, bitWidth
const definitionLevels = readData(
reader, daph.definition_level_encoding, daph.num_values, bitWidth
)
// count nulls
@ -163,8 +155,8 @@ function readDefinitionLevels(dataView, offset, daph, schema, path_in_schema) {
definitionLevels.length = 0
}
return { byteLength, definitionLevels, numNulls }
return { definitionLevels, numNulls }
}
}
return { byteLength: 0, definitionLevels: [], numNulls: 0 }
return { definitionLevels: [], numNulls: 0 }
}

@ -3,11 +3,6 @@ import { readPlain, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.j
import { getMaxDefinitionLevel, getMaxRepetitionLevel, schemaElement } from './schema.js'
import { readVarInt, readZigZag } from './thrift.js'
/**
* @typedef {import("./types.d.ts").Decoded<T>} Decoded
* @template T
*/
/**
* Read a data page from the given Uint8Array.
*
@ -25,8 +20,8 @@ import { readVarInt, readZigZag } from './thrift.js'
* @returns {DataPage} definition levels, repetition levels, and array of values
*/
export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, compressors) {
const dataView = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
let offset = 0
const view = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
const reader = { view, offset: 0 }
/** @type {any} */
let values = []
@ -34,13 +29,19 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, comp
if (!daph2) throw new Error('parquet data page header v2 is undefined')
// repetition levels
const repetitionLevels = readRepetitionLevelsV2(dataView, offset, daph2, schema, columnMetadata)
const repetitionLevels = readRepetitionLevelsV2(reader, daph2, schema, columnMetadata)
if (reader.offset !== daph2.repetition_levels_byte_length) {
throw new Error(`parquet repetition levels byte length ${reader.offset} does not match expected ${daph2.repetition_levels_byte_length}`)
}
// definition levels
offset += daph2.repetition_levels_byte_length
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
const definitionLevels = readDefinitionLevelsV2(dataView, offset, daph2, maxDefinitionLevel)
offset += daph2.definition_levels_byte_length
const definitionLevels = readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel)
if (reader.offset !== daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length) {
throw new Error(`parquet definition levels byte length ${reader.offset} does not match expected ${daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length}`)
}
const uncompressedPageSize = ph.uncompressed_page_size - daph2.definition_levels_byte_length - daph2.repetition_levels_byte_length
@ -49,13 +50,13 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, comp
if (daph2.encoding === 'PLAIN') {
const { element } = schemaElement(schema, columnMetadata.path_in_schema)
const utf8 = element.converted_type === 'UTF8'
let page = compressedBytes.slice(offset)
let page = compressedBytes.slice(reader.offset)
if (daph2.is_compressed && columnMetadata.codec !== 'UNCOMPRESSED') {
page = decompressPage(page, uncompressedPageSize, columnMetadata.codec, compressors)
}
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const plainObj = readPlain(pageView, columnMetadata.type, nValues, 0, utf8)
values = plainObj.value
const pageReader = { view: pageView, offset: 0 }
values = readPlain(pageReader, columnMetadata.type, nValues, utf8)
} else if (daph2.encoding === 'RLE') {
const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec, compressors)
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
@ -63,21 +64,22 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, comp
if (daph2.num_nulls) {
throw new Error('parquet RLE encoding with nulls not supported')
} else {
const pageReader = { view: pageView, offset: 4 }
values = readRleBitPackedHybrid(
pageView, 4, bitWidth, uncompressedPageSize, nValues
).value
pageReader, bitWidth, uncompressedPageSize, nValues
)
}
} else if (
daph2.encoding === 'PLAIN_DICTIONARY' ||
daph2.encoding === 'RLE_DICTIONARY'
) {
compressedBytes = compressedBytes.subarray(offset)
compressedBytes = compressedBytes.subarray(reader.offset)
const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec, compressors)
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const bitWidth = pageView.getUint8(0)
const { value } = readRleBitPackedHybrid(
pageView, 1, bitWidth, uncompressedPageSize, nValues
const pageReader = { view: pageView, offset: 1 }
const value = readRleBitPackedHybrid(
pageReader, bitWidth, uncompressedPageSize, nValues
)
values = value
} else if (daph2.encoding === 'DELTA_BINARY_PACKED') {
@ -95,40 +97,39 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, comp
/**
* Read the repetition levels from this page, if any.
*
* @param {DataView} dataView data view for the page
* @param {number} offset offset to start reading from
* @typedef {import("./types.d.ts").DataReader} DataReader
 * @param {DataReader} reader data reader for the page
* @param {DataPageHeaderV2} daph2 data page header
* @param {SchemaElement[]} schema schema for the file
* @param {ColumnMetaData} columnMetadata metadata for the column
 * @returns {any[]} repetition levels
*/
export function readRepetitionLevelsV2(dataView, offset, daph2, schema, columnMetadata) {
export function readRepetitionLevelsV2(reader, daph2, schema, columnMetadata) {
const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
if (!maxRepetitionLevel) return []
const bitWidth = widthFromMaxInt(maxRepetitionLevel)
// num_values is index 1 for either type of page header
return readRleBitPackedHybrid(
dataView, offset, bitWidth, daph2.repetition_levels_byte_length, daph2.num_values
).value
reader, bitWidth, daph2.repetition_levels_byte_length, daph2.num_values
)
}
/**
* Read the definition levels from this page, if any.
*
* @param {DataView} dataView data view for the page
* @param {number} offset offset to start reading from
 * @param {DataReader} reader data reader for the page
* @param {DataPageHeaderV2} daph2 data page header v2
* @param {number} maxDefinitionLevel maximum definition level for this column
 * @returns {number[] | undefined} definition levels, or undefined if the column has none
*/
function readDefinitionLevelsV2(dataView, offset, daph2, maxDefinitionLevel) {
function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) {
if (maxDefinitionLevel) {
// not the same as V1, because we know the length
const bitWidth = widthFromMaxInt(maxDefinitionLevel)
return readRleBitPackedHybrid(
dataView, offset, bitWidth, daph2.definition_levels_byte_length, daph2.num_values
).value
reader, bitWidth, daph2.definition_levels_byte_length, daph2.num_values
)
}
}

@ -1,189 +1,176 @@
import { readVarInt } from './thrift.js'
import { concat } from './utils.js'
/**
* Return type with bytes read.
* This is useful to advance an offset through a buffer.
*
* @typedef {import("./types.d.ts").Decoded<T>} Decoded
* @template T
*/
/**
* Read `count` boolean values.
*
* @param {DataView} dataView - buffer to read data from
* @param {number} offset - offset to start reading from the DataView
* @param {DataReader} reader - buffer to read data from
* @param {number} count - number of values to read
* @returns {Decoded<boolean[]>} array of boolean values
* @returns {boolean[]} array of boolean values
*/
function readPlainBoolean(dataView, offset, count) {
function readPlainBoolean(reader, count) {
const value = []
for (let i = 0; i < count; i++) {
const byteOffset = offset + Math.floor(i / 8)
const byteOffset = reader.offset + Math.floor(i / 8)
const bitOffset = i % 8
const byte = dataView.getUint8(byteOffset)
const byte = reader.view.getUint8(byteOffset)
value.push((byte & (1 << bitOffset)) !== 0)
}
return { value, byteLength: Math.ceil(count / 8) }
reader.offset += Math.ceil(count / 8)
return value
}
/**
* Read `count` int32 values.
*
* @param {DataView} dataView - buffer to read data from
* @param {number} offset - offset to start reading from the DataView
* @param {DataReader} reader - buffer to read data from
* @param {number} count - number of values to read
* @returns {Decoded<number[]>} array of int32 values
* @returns {number[]} array of int32 values
*/
function readPlainInt32(dataView, offset, count) {
function readPlainInt32(reader, count) {
const value = []
for (let i = 0; i < count; i++) {
value.push(dataView.getInt32(offset + i * 4, true))
value.push(reader.view.getInt32(reader.offset + i * 4, true))
}
return { value, byteLength: count * 4 }
reader.offset += count * 4
return value
}
/**
* Read `count` int64 values.
*
* @param {DataView} dataView - buffer to read data from
* @param {number} offset - offset to start reading from the DataView
* @param {DataReader} reader - buffer to read data from
* @param {number} count - number of values to read
* @returns {Decoded<bigint[]>} array of int64 values
* @returns {bigint[]} array of int64 values
*/
function readPlainInt64(dataView, offset, count) {
function readPlainInt64(reader, count) {
const value = []
for (let i = 0; i < count; i++) {
value.push(dataView.getBigInt64(offset + i * 8, true))
value.push(reader.view.getBigInt64(reader.offset + i * 8, true))
}
return { value, byteLength: count * 8 }
reader.offset += count * 8
return value
}
/**
* Read `count` int96 values.
*
* @param {DataView} dataView - buffer to read data from
* @param {number} offset - offset to start reading from the DataView
* @param {DataReader} reader - buffer to read data from
* @param {number} count - number of values to read
* @returns {Decoded<bigint[]>} array of int96 values
* @returns {bigint[]} array of int96 values
*/
function readPlainInt96(dataView, offset, count) {
function readPlainInt96(reader, count) {
const value = []
for (let i = 0; i < count; i++) {
const low = dataView.getBigInt64(offset + i * 12, true)
const high = dataView.getInt32(offset + i * 12 + 8, true)
const low = reader.view.getBigInt64(reader.offset + i * 12, true)
const high = reader.view.getInt32(reader.offset + i * 12 + 8, true)
value.push((BigInt(high) << BigInt(32)) | low)
}
return { value, byteLength: count * 12 }
reader.offset += count * 12
return value
}
/**
* Read `count` float values.
*
* @param {DataView} dataView - buffer to read data from
* @param {number} offset - offset to start reading from the DataView
* @param {DataReader} reader - buffer to read data from
* @param {number} count - number of values to read
* @returns {Decoded<number[]>} array of float values
* @returns {number[]} array of float values
*/
function readPlainFloat(dataView, offset, count) {
function readPlainFloat(reader, count) {
const value = []
for (let i = 0; i < count; i++) {
value.push(dataView.getFloat32(offset + i * 4, true))
value.push(reader.view.getFloat32(reader.offset + i * 4, true))
}
return { value, byteLength: count * 4 }
reader.offset += count * 4
return value
}
/**
* Read `count` double values.
*
* @param {DataView} dataView - buffer to read data from
* @param {number} offset - offset to start reading from the DataView
* @param {DataReader} reader - buffer to read data from
* @param {number} count - number of values to read
* @returns {Decoded<number[]>} array of double values
* @returns {number[]} array of double values
*/
function readPlainDouble(dataView, offset, count) {
function readPlainDouble(reader, count) {
const value = []
for (let i = 0; i < count; i++) {
value.push(dataView.getFloat64(offset + i * 8, true))
value.push(reader.view.getFloat64(reader.offset + i * 8, true))
}
return { value, byteLength: count * 8 }
reader.offset += count * 8
return value
}
/**
* Read `count` byte array values.
*
* @param {DataView} dataView - buffer to read data from
* @param {number} offset - offset to start reading from the DataView
* @param {DataReader} reader - buffer to read data from
* @param {number} count - number of values to read
* @returns {Decoded<Uint8Array[]>} array of byte arrays
* @returns {Uint8Array[]} array of byte arrays
*/
function readPlainByteArray(dataView, offset, count) {
function readPlainByteArray(reader, count) {
const value = []
let byteLength = 0 // byte length of all data read
for (let i = 0; i < count; i++) {
const length = dataView.getInt32(offset + byteLength, true)
byteLength += 4
const bytes = new Uint8Array(dataView.buffer, dataView.byteOffset + offset + byteLength, length)
const length = reader.view.getInt32(reader.offset, true)
reader.offset += 4
const bytes = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, length)
value.push(bytes)
byteLength += length
reader.offset += length
}
return { value, byteLength }
return value
}
/**
* Read a fixed length byte array.
*
* @param {DataView} dataView - buffer to read data from
* @param {number} offset - offset to start reading from the DataView
* @param {DataReader} reader - buffer to read data from
* @param {number} fixedLength - length of each fixed length byte array
* @returns {Decoded<Uint8Array>} array of fixed length byte arrays
 * @returns {Uint8Array} fixed length byte array
*/
function readPlainByteArrayFixed(dataView, offset, fixedLength) {
return {
value: new Uint8Array(dataView.buffer, dataView.byteOffset + offset, fixedLength),
byteLength: fixedLength,
}
function readPlainByteArrayFixed(reader, fixedLength) {
reader.offset += fixedLength
return new Uint8Array(
reader.view.buffer,
reader.view.byteOffset + reader.offset - fixedLength,
fixedLength
)
}
/**
* Read `count` values of the given type from the dataView.
 * Read `count` values of the given type from the reader.
*
* @typedef {import("./types.d.ts").DecodedArray} DecodedArray
* @typedef {import("./types.d.ts").ParquetType} ParquetType
* @param {DataView} dataView - buffer to read data from
* @param {DataReader} reader - buffer to read data from
* @param {ParquetType} type - parquet type of the data
* @param {number} count - number of values to read
* @param {number} offset - offset to start reading from the DataView
* @param {boolean} utf8 - whether to decode byte arrays as UTF-8
* @returns {Decoded<DecodedArray>} array of values
* @returns {DecodedArray} array of values
*/
export function readPlain(dataView, type, count, offset, utf8) {
if (count === 0) return { value: [], byteLength: 0 }
export function readPlain(reader, type, count, utf8) {
if (count === 0) return []
if (type === 'BOOLEAN') {
return readPlainBoolean(dataView, offset, count)
return readPlainBoolean(reader, count)
} else if (type === 'INT32') {
return readPlainInt32(dataView, offset, count)
return readPlainInt32(reader, count)
} else if (type === 'INT64') {
return readPlainInt64(dataView, offset, count)
return readPlainInt64(reader, count)
} else if (type === 'INT96') {
return readPlainInt96(dataView, offset, count)
return readPlainInt96(reader, count)
} else if (type === 'FLOAT') {
return readPlainFloat(dataView, offset, count)
return readPlainFloat(reader, count)
} else if (type === 'DOUBLE') {
return readPlainDouble(dataView, offset, count)
return readPlainDouble(reader, count)
} else if (type === 'BYTE_ARRAY') {
const byteArray = readPlainByteArray(dataView, offset, count)
const byteArray = readPlainByteArray(reader, count)
if (utf8) {
const decoder = new TextDecoder()
return {
value: byteArray.value.map(bytes => decoder.decode(bytes)),
byteLength: byteArray.byteLength,
}
return byteArray.map(bytes => decoder.decode(bytes))
}
return byteArray
} else if (type === 'FIXED_LEN_BYTE_ARRAY') {
return readPlainByteArrayFixed(dataView, offset, count)
return readPlainByteArrayFixed(reader, count)
} else {
throw new Error(`parquet unhandled type: ${type}`)
}
@ -204,30 +191,27 @@ export function widthFromMaxInt(value) {
* The data could be definition levels, repetition levels, or actual values.
*
* @typedef {import("./types.d.ts").Encoding} Encoding
* @param {DataView} dataView - buffer to read data from
* @param {DataReader} reader - buffer to read data from
* @param {Encoding} encoding - encoding type
* @param {number} offset - offset to start reading from the DataView
* @param {number} count - number of values to read
* @param {number} bitWidth - width of each bit-packed group
* @returns {Decoded<any>} array of values
* @returns {any[]} array of values
*/
export function readData(dataView, encoding, offset, count, bitWidth) {
export function readData(reader, encoding, count, bitWidth) {
/** @type {any[]} */
const value = []
let byteLength = 0
if (encoding === 'RLE') {
let seen = 0
while (seen < count) {
const rle = readRleBitPackedHybrid(dataView, offset + byteLength, bitWidth, 0, count)
if (!rle.value.length) break // EOF
concat(value, rle.value)
seen += rle.value.length
byteLength += rle.byteLength
const rle = readRleBitPackedHybrid(reader, bitWidth, 0, count)
if (!rle.length) break // EOF
concat(value, rle)
seen += rle.length
}
} else {
throw new Error(`parquet encoding not supported ${encoding}`)
}
return { value, byteLength }
return value
}
/**
@ -235,42 +219,39 @@ export function readData(dataView, encoding, offset, count, bitWidth) {
*
* If length is zero, then read as int32 at the start of the encoded data.
*
* @param {DataView} dataView - buffer to read data from
* @param {number} offset - offset to start reading from the DataView
* @typedef {import("./types.d.ts").DataReader} DataReader
* @param {DataReader} reader - buffer to read data from
* @param {number} width - width of each bit-packed group
* @param {number} length - length of the encoded data
* @param {number} numValues - number of values to read
* @returns {Decoded<number[]>} array of rle/bit-packed values
* @returns {number[]} array of rle/bit-packed values
*/
export function readRleBitPackedHybrid(dataView, offset, width, length, numValues) {
let byteLength = 0
export function readRleBitPackedHybrid(reader, width, length, numValues) {
if (!length) {
length = dataView.getInt32(offset, true)
length = reader.view.getInt32(reader.offset, true)
reader.offset += 4
if (length < 0) throw new Error(`parquet invalid rle/bitpack length ${length}`)
byteLength += 4
}
/** @type {number[]} */
const value = []
const startByteLength = byteLength
while (byteLength - startByteLength < length && value.length < numValues) {
const [header, newOffset] = readVarInt(dataView, offset + byteLength)
byteLength = newOffset - offset
const startOffset = reader.offset
while (reader.offset - startOffset < length && value.length < numValues) {
const [header, newOffset] = readVarInt(reader.view, reader.offset)
reader.offset = newOffset
if ((header & 1) === 0) {
// rle
const rle = readRle(dataView, offset + byteLength, header, width)
concat(value, rle.value)
byteLength += rle.byteLength
const rle = readRle(reader, header, width)
concat(value, rle)
} else {
// bit-packed
const bitPacked = readBitPacked(
dataView, offset + byteLength, header, width, numValues - value.length
reader, header, width, numValues - value.length
)
concat(value, bitPacked.value)
byteLength += bitPacked.byteLength
concat(value, bitPacked)
}
}
return { value, byteLength }
return value
}
/**
@ -279,26 +260,24 @@ export function readRleBitPackedHybrid(dataView, offset, width, length, numValue
* The count is determined from the header and the width is used to grab the
* value that's repeated. Yields the value repeated count times.
*
* @param {DataView} dataView - buffer to read data from
* @param {number} offset - offset to start reading from the DataView
* @param {DataReader} reader - buffer to read data from
* @param {number} header - header information
* @param {number} bitWidth - width of each bit-packed group
* @returns {Decoded<number[]>} array of rle values
* @returns {number[]} array of rle values
*/
function readRle(dataView, offset, header, bitWidth) {
function readRle(reader, header, bitWidth) {
const count = header >>> 1
const width = (bitWidth + 7) >> 3
let byteLength = 0
let readValue
if (width === 1) {
readValue = dataView.getUint8(offset)
byteLength += 1
readValue = reader.view.getUint8(reader.offset)
reader.offset++
} else if (width === 2) {
readValue = dataView.getUint16(offset, true)
byteLength += 2
readValue = reader.view.getUint16(reader.offset, true)
reader.offset += 2
} else if (width === 4) {
readValue = dataView.getUint32(offset, true)
byteLength += 4
readValue = reader.view.getUint32(reader.offset, true)
reader.offset += 4
} else {
throw new Error(`parquet invalid rle width ${width}`)
}
@ -308,33 +287,32 @@ function readRle(dataView, offset, header, bitWidth) {
for (let i = 0; i < count; i++) {
value.push(readValue)
}
return { value, byteLength }
return value
}
/**
* Read a bit-packed run of the rle/bitpack hybrid.
* Supports width > 8 (crossing bytes).
*
* @param {DataView} dataView - buffer to read data from
* @param {number} offset - offset to start reading from the DataView
* @param {DataReader} reader - buffer to read data from
* @param {number} header - header information
* @param {number} bitWidth - width of each bit-packed group
* @param {number} remaining - number of values remaining to be read
* @returns {Decoded<number[]>} array of bit-packed values
* @returns {number[]} array of bit-packed values
*/
function readBitPacked(dataView, offset, header, bitWidth, remaining) {
function readBitPacked(reader, header, bitWidth, remaining) {
// extract number of values to read from header
let count = (header >> 1) << 3
const mask = maskForBits(bitWidth)
// Sometimes it tries to read outside of available memory, but it will be masked out anyway
let data = 0
if (offset < dataView.byteLength) {
data = dataView.getUint8(offset)
if (reader.offset < reader.view.byteLength) {
data = reader.view.getUint8(reader.offset)
reader.offset++
} else if (mask) {
throw new Error(`parquet bitpack offset ${offset} out of range`)
throw new Error(`parquet bitpack offset ${reader.offset} out of range`)
}
let byteLength = 1
let left = 8
let right = 0
/** @type {number[]} */
@ -349,8 +327,8 @@ function readBitPacked(dataView, offset, header, bitWidth, remaining) {
data >>= 8
} else if (left - right < bitWidth) {
// if we don't have bitWidth number of bits to read, read next byte
data |= dataView.getUint8(offset + byteLength) << left
byteLength++
data |= reader.view.getUint8(reader.offset) << left
reader.offset++
left += 8
} else {
// otherwise, read bitWidth number of bits
@ -367,7 +345,7 @@ function readBitPacked(dataView, offset, header, bitWidth, remaining) {
}
  // return the decoded values
return { value, byteLength }
return value
}
/**

@ -79,7 +79,7 @@ export function getMaxRepetitionLevel(schema, parts) {
parts.forEach((part, i) => {
const { element } = schemaElement(schema, parts.slice(0, i + 1))
if (element.repetition_type === 'REPEATED') {
maxLevel += 1
maxLevel++
}
})
return maxLevel
@ -97,7 +97,7 @@ export function getMaxDefinitionLevel(schema, parts) {
parts.forEach((part, i) => {
const { element } = schemaElement(schema, parts.slice(0, i + 1))
if (element.repetition_type !== 'REQUIRED') {
maxLevel += 1
maxLevel++
}
})
return maxLevel
@ -113,7 +113,7 @@ export function skipDefinitionBytes(num) {
let byteLength = 6
let n = num >>> 8
while (n !== 0) {
byteLength += 1
byteLength++
n >>>= 7
}
return byteLength

@ -54,7 +54,7 @@ export function snappyUncompress(input, output) {
// skip preamble (contains uncompressed length as varint)
while (pos < inputLength) {
const c = input[pos]
pos += 1
pos++
if (c < 128) {
break
}
@ -66,7 +66,7 @@ export function snappyUncompress(input, output) {
while (pos < inputLength) {
const c = input[pos]
let len = 0
pos += 1
pos++
if (pos >= inputLength) {
throw new Error('missing eof marker')
@ -103,7 +103,7 @@ export function snappyUncompress(input, output) {
// Copy with 1-byte offset
len = ((c >>> 2) & 0x7) + 4
offset = input[pos] + ((c >>> 5) << 8)
pos += 1
pos++
break
case 2:
// Copy with 2-byte offset

5
src/types.d.ts vendored

@ -15,6 +15,11 @@ export interface Decoded<T> {
byteLength: number
}
export interface DataReader {
view: DataView
offset: number
}
// Parquet file metadata types
export interface FileMetaData {
version: number

@ -4,89 +4,96 @@ import { readPlain, readRleBitPackedHybrid } from '../src/encoding.js'
// Unit tests for readPlain: each case builds a little-endian DataView,
// wraps it in a DataReader ({ view, offset }), decodes `count` values of the
// given physical type, and checks both the decoded values and that the
// reader's offset advanced by the number of bytes consumed.
// NOTE(review): this span was corrupted diff residue (old dataView-based and
// new reader-based statements interleaved, causing duplicate `const result`
// declarations); reconstructed here as the DataReader-based version.
describe('readPlain', () => {
  it('reads BOOLEAN values correctly', () => {
    const view = new DataView(new ArrayBuffer(1))
    view.setUint8(0, 0b00000001) // Set the first bit to 1
    const reader = { view, offset: 0 }
    const result = readPlain(reader, 'BOOLEAN', 1, false)
    expect(result).toEqual([true])
    expect(reader.offset).toBe(1)
  })
  it('reads INT32 values correctly', () => {
    const view = new DataView(new ArrayBuffer(4))
    view.setInt32(0, 123456789, true) // little-endian
    const reader = { view, offset: 0 }
    const result = readPlain(reader, 'INT32', 1, false)
    expect(result).toEqual([123456789])
    expect(reader.offset).toBe(4)
  })
  it('reads INT64 values correctly', () => {
    const view = new DataView(new ArrayBuffer(8))
    view.setBigInt64(0, BigInt('1234567890123456789'), true)
    const reader = { view, offset: 0 }
    const result = readPlain(reader, 'INT64', 1, false)
    expect(result).toEqual([1234567890123456789n])
    expect(reader.offset).toBe(8)
  })
  it('reads INT96 values correctly', () => {
    const buffer = new ArrayBuffer(12)
    const view = new DataView(buffer)
    // Example INT96 value split into 64-bit low part and 32-bit high part
    const low = BigInt('0x0123456789ABCDEF')
    const high = 0x02345678
    view.setBigInt64(0, low, true)
    view.setInt32(8, high, true)
    const reader = { view, offset: 0 }
    const result = readPlain(reader, 'INT96', 1, false)
    const expectedValue = (BigInt(high) << BigInt(32)) | low
    expect(result).toEqual([expectedValue])
    expect(reader.offset).toBe(12)
  })
  it('reads FLOAT values correctly', () => {
    const view = new DataView(new ArrayBuffer(4))
    view.setFloat32(0, 1234.5, true) // little-endian
    const reader = { view, offset: 0 }
    const result = readPlain(reader, 'FLOAT', 1, false)
    expect(result).toEqual([1234.5])
    expect(reader.offset).toBe(4)
  })
  it('reads DOUBLE values correctly', () => {
    const view = new DataView(new ArrayBuffer(8))
    view.setFloat64(0, 12345.6789, true) // little-endian
    const reader = { view, offset: 0 }
    const result = readPlain(reader, 'DOUBLE', 1, false)
    expect(result).toEqual([12345.6789])
    expect(reader.offset).toBe(8)
  })
  it('reads BYTE_ARRAY values correctly', () => {
    const view = new DataView(new ArrayBuffer(10))
    view.setInt32(0, 3, true) // length of the first byte array
    view.setUint8(4, 1) // first byte array data
    view.setUint8(5, 2)
    view.setUint8(6, 3)
    const reader = { view, offset: 0 }
    const result = readPlain(reader, 'BYTE_ARRAY', 1, false)
    expect(result).toEqual([new Uint8Array([1, 2, 3])])
    // 4-byte length prefix + 3 data bytes
    expect(reader.offset).toBe(7)
  })
  it('reads FIXED_LEN_BYTE_ARRAY values correctly', () => {
    const fixedLength = 3
    const view = new DataView(new ArrayBuffer(fixedLength))
    view.setUint8(0, 4)
    view.setUint8(1, 5)
    view.setUint8(2, 6)
    const reader = { view, offset: 0 }
    const result = readPlain(reader, 'FIXED_LEN_BYTE_ARRAY', fixedLength, false)
    expect(result).toEqual(new Uint8Array([4, 5, 6]))
    expect(reader.offset).toBe(fixedLength)
  })
  it('throws an error for unhandled types', () => {
    const view = new DataView(new ArrayBuffer(0))
    const reader = { view, offset: 0 }
    /** @type any */
    const invalidType = 'invalidType'
    expect(() => readPlain(reader, invalidType, 1, false))
      .toThrow(`parquet unhandled type: ${invalidType}`)
  })
})
@ -97,29 +104,31 @@ describe('readRleBitPackedHybrid', () => {
// RLE values: true x3
// Bit-packed values: false, false, true
const buffer = new ArrayBuffer(4)
const dataView = new DataView(buffer)
dataView.setUint8(0, 0b00000110) // RLE header for 3 true values
dataView.setUint8(1, 0b00000001) // RLE value (true)
dataView.setUint8(2, 0b00000011) // Bit-packed header for 3 values
dataView.setUint8(3, 0b00000100) // Bit-packed values (false, false, true)
const view = new DataView(buffer)
view.setUint8(0, 0b00000110) // RLE header for 3 true values
view.setUint8(1, 0b00000001) // RLE value (true)
view.setUint8(2, 0b00000011) // Bit-packed header for 3 values
view.setUint8(3, 0b00000100) // Bit-packed values (false, false, true)
const reader = { view, offset: 0 }
const { byteLength, value } = readRleBitPackedHybrid(dataView, 0, 1, 3, 6)
expect(byteLength).toBe(4)
const value = readRleBitPackedHybrid(reader, 1, 3, 6)
expect(reader.offset).toBe(4)
expect(value).toEqual([1, 1, 1, 0, 0, 1])
})
// Same payload as the explicit-length case, but the 4-byte little-endian
// length prefix is part of the buffer (length argument 0 means "read it
// from the stream"), so the reader consumes all 8 bytes.
// NOTE(review): this span was corrupted diff residue (old dataView-based and
// new reader-based statements interleaved); reconstructed as the
// DataReader-based version.
it('reads RLE bit-packed hybrid values with implicit length', () => {
  // Example buffer: same as previous test, but with implicit length
  const buffer = new ArrayBuffer(8)
  const view = new DataView(buffer)
  view.setInt32(0, 3, true) // length 3 little-endian
  view.setUint8(4, 0b00000110) // RLE header for 3 true values
  view.setUint8(5, 0b00000001) // RLE value (true)
  view.setUint8(6, 0b00000011) // Bit-packed header for 3 values
  view.setUint8(7, 0b00000100) // Bit-packed values (false, false, true)
  const reader = { view, offset: 0 }
  const value = readRleBitPackedHybrid(reader, 1, 0, 6)
  expect(reader.offset).toBe(8)
  expect(value).toEqual([1, 1, 1, 0, 0, 1])
})
})