Fix definition level parsing

Kenny Daniel 2024-01-20 13:52:36 -08:00
parent 8484426bc8
commit 6d03bd6d86
5 changed files with 34 additions and 12 deletions

@@ -27,6 +27,7 @@ export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
   // parse column data
   let valuesSeen = 0
   let byteOffset = 0 // byteOffset within the column
+  /** @type {ArrayLike<any> | undefined} */
   let dictionary = undefined
   const rowIndex = [0] // map/list object index
   const rowData = []
@@ -65,18 +66,23 @@ export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
   const { definitionLevels, repetitionLevels, value: dataPage } = readDataPage(page, daph, schema, columnMetadata)
   valuesSeen += daph.num_values
+  const dictionaryEncoding = daph.encoding === Encoding.PLAIN_DICTIONARY || daph.encoding === Encoding.RLE_DICTIONARY
   // construct output values: skip nulls and construct lists
   let values
   if (repetitionLevels.length) {
     // Use repetition levels to construct lists
-    if ([Encoding.PLAIN_DICTIONARY, Encoding.RLE_DICTIONARY].includes(daph.encoding)) {
-      // TODO: dereference dictionary values
+    if (dictionaryEncoding && dictionary !== undefined && Array.isArray(dataPage)) {
+      // dereference dictionary values
+      for (let i = 0; i < dataPage.length; i++) {
+        dataPage[i] = dictionary[dataPage[i]]
+      }
     }
     const isNull = columnMetadata && !isRequired(schema, [columnMetadata.path_in_schema[0]])
     const nullValue = false // TODO: unused?
     const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
     values = assembleObjects(definitionLevels, repetitionLevels, dataPage, isNull, nullValue, maxDefinitionLevel, rowIndex[0])
-  } else if (definitionLevels) {
+  } else if (definitionLevels?.length) {
     const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
     // Use definition levels to skip nulls
     let index = 0
@@ -105,7 +111,12 @@ export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
       }
     }
   } else {
-    // TODO: use dictionary
+    if (dictionaryEncoding && dictionary !== undefined && Array.isArray(dataPage)) {
+      // dereference dictionary values
+      for (let i = 0; i < dataPage.length; i++) {
+        dataPage[i] = dictionary[dataPage[i]]
+      }
+    }
     values = dataPage
   }
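
A note on the dictionary changes above: with PLAIN_DICTIONARY or RLE_DICTIONARY encoding, a data page stores integer indexes into the column's dictionary page, so producing output values is a single in-place dereference pass. A minimal sketch of that step, assuming the dictionary page has already been decoded (the function name is illustrative, not part of the library):

// Sketch: swap each dictionary index for its value, in place.
// `dictionary` is assumed to be the decoded dictionary page.
function dereferenceDictionary(dictionary, indexes) {
  for (let i = 0; i < indexes.length; i++) {
    indexes[i] = dictionary[indexes[i]]
  }
  return indexes
}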

@@ -23,7 +23,7 @@ const skipNulls = false // TODO
  * @param {DataPageHeader} daph data page header
  * @param {SchemaElement[]} schema schema for the file
  * @param {ColumnMetaData} columnMetadata metadata for the column
- * @returns {DataPage} array of values
+ * @returns {DataPage} definition levels, repetition levels, and array of values
  */
 export function readDataPage(bytes, daph, schema, columnMetadata) {
   const dataView = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
@@ -142,8 +142,16 @@ function readDefinitionLevels(dataView, offset, daph, schema, columnMetadata) {
     const { value: definitionLevels, byteLength } = readData(
       dataView, daph.definition_level_encoding, offset, daph.num_values, bitWidth
     )
-    const numNulls = daph.num_values - definitionLevels
-      .filter((/** @type number */ d) => d === maxDefinitionLevel).length
+    // count nulls
+    let numNulls = daph.num_values
+    for (const def of definitionLevels) {
+      if (def === maxDefinitionLevel) numNulls--
+    }
+    if (numNulls === 0) {
+      definitionLevels.length = 0
+    }
     return { byteLength, definitionLevels, numNulls }
   }
 }
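
For background, a value in a data page is non-null exactly when its definition level equals the column's maximum definition level; any lower level marks a null (or an empty list) at some nesting depth. A sketch of the count the hunk above performs, with illustrative names:

// Sketch: count nulls by comparing each definition level against
// the column's max definition level; equal means a value is present.
function countNulls(definitionLevels, maxDefinitionLevel, numValues) {
  let numNulls = numValues
  for (const def of definitionLevels) {
    if (def === maxDefinitionLevel) numNulls--
  }
  return numNulls
}

Clearing definitionLevels when numNulls is zero lets readColumn's new `definitionLevels?.length` check skip the null-handling path entirely for all-defined pages.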

@@ -206,7 +206,7 @@ export function readData(dataView, encoding, offset, count, bitWidth) {
   if (encoding === ParquetEncoding.RLE) {
     let seen = 0
     while (seen < count) {
-      const { value: rleValues, byteLength: rleByteLength } = readRleBitPackedHybrid(dataView, offset + byteLength, bitWidth, 0, count)
+      const { value: rleValues, byteLength: rleByteLength } = readRleBitPackedHybrid(dataView, offset + byteLength, bitWidth, 0, 1)
       if (!rleValues.length) break // EOF
       value.push(...rleValues)
       seen += rleValues.length
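
The loop above drains one run per call: each readRleBitPackedHybrid invocation decodes the next RLE or bit-packed run and reports its byte length, and the offset advances until count values have been seen. A sketch of that accumulate-until-count pattern, with illustrative names:

// Sketch: `readOne` decodes a single run at the given byte offset
// and returns { value, byteLength }; loop until `count` values.
function readRuns(readOne, count) {
  const values = []
  let byteLength = 0
  while (values.length < count) {
    const { value, byteLength: runBytes } = readOne(byteLength)
    if (!value.length) break // EOF
    values.push(...value)
    byteLength += runBytes
  }
  return { values, byteLength }
}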
@@ -220,13 +220,13 @@
 /**
  * Read values from a run-length encoded/bit-packed hybrid encoding.
- * If length is not specified, then a 32-bit int is read first to grab the
- * length of the encoded data.
+ * If length is zero, then read as int32 at the start of the encoded data.
  *
  * @param {DataView} dataView - buffer to read data from
  * @param {number} offset - offset to start reading from the DataView
  * @param {number} width - width of each bit-packed group
- * @param {number | undefined} length - length of the encoded data
+ * @param {number} length - length of the encoded data
  * @param {number} numValues - number of values to read
  * @returns {Decoded<number[]>} array of rle/bit-packed values
  */
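
For reference, each run in the parquet RLE/bit-packed hybrid encoding starts with a ULEB128 header whose low bit selects the run type: 1 means a bit-packed run of (header >> 1) groups of 8 values, 0 means an RLE run of (header >> 1) repeats of one value. A sketch of that convention, simplified to single-byte headers:

// Sketch: classify a hybrid run header (single-byte case only).
function parseHybridHeader(header) {
  if (header & 1) {
    // bit-packed run: (header >> 1) groups of 8 values
    return { type: 'bitpacked', count: (header >> 1) * 8 }
  }
  // RLE run: (header >> 1) copies of a single repeated value
  return { type: 'rle', count: header >> 1 }
}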

@@ -113,11 +113,13 @@ async function readRowGroup(options, rowGroup) {
   const columnStartByte = getColumnOffset(columnMetadata)
   const columnEndByte = columnStartByte + Number(columnMetadata.total_compressed_size)
   const columnBytes = columnEndByte - columnStartByte
+  // skip columns larger than 1gb
   if (columnBytes > 1 << 30) {
     console.warn(`parquet skipping huge column "${columnMetadata.path_in_schema}" ${columnBytes.toLocaleString()} bytes`)
     continue
   }
+  // use pre-loaded row group byte data if available, else read column data
   let buffer
   if (!groupBuffer) {
@@ -127,6 +129,7 @@ async function readRowGroup(options, rowGroup) {
   } else {
     buffer = Promise.resolve(groupBuffer)
   }
+  // read column data async
   promises.push(buffer.then(arrayBuffer => {
     // TODO: extract SchemaElement for this column
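
The byte-range arithmetic above locates a column chunk: it begins at its first page (the dictionary page when present, otherwise the first data page) and spans total_compressed_size bytes; the 1 << 30 threshold is a 1 GiB cutoff. A sketch under the assumption that getColumnOffset prefers the dictionary page offset; the field names come from the parquet ColumnMetaData thrift struct:

// Sketch: compute a column chunk's byte range from its metadata.
// The ?? fallback is an assumption about getColumnOffset's choice.
function columnByteRange(columnMetadata) {
  const start = Number(columnMetadata.dictionary_page_offset ?? columnMetadata.data_page_offset)
  const end = start + Number(columnMetadata.total_compressed_size)
  return { start, end }
}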

@@ -117,7 +117,7 @@ describe('readRleBitPackedHybrid', () => {
   dataView.setUint8(6, 0b00000011) // Bit-packed header for 3 values
   dataView.setUint8(7, 0b00000100) // Bit-packed values (false, false, true)
-  const { byteLength, value } = readRleBitPackedHybrid(dataView, 0, 1, undefined, 6)
+  const { byteLength, value } = readRleBitPackedHybrid(dataView, 0, 1, 0, 6)
   expect(byteLength).toBe(8)
   expect(value).toEqual([1, 1, 1, 0, 0, 1])
 })
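
As the expected values show, bit-packed groups unpack least-significant bit first: the byte 0b00000100 with width 1 yields 0, 0, 1 for the final three values. A sketch for a single byte and small widths:

// Sketch: unpack `count` values of `width` bits from one byte,
// least-significant bit first (single-byte case for illustration).
function unpackBits(byte, width, count) {
  const mask = (1 << width) - 1
  const out = []
  for (let i = 0; i < count; i++) {
    out.push((byte >> (i * width)) & mask)
  }
  return out
}
// unpackBits(0b00000100, 1, 3) => [0, 0, 1]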