From 17f7ace840f0b2d1bb5d9394d1bc50951b29f144 Mon Sep 17 00:00:00 2001
From: Kenny Daniel
Date: Sun, 11 Feb 2024 10:05:21 -0800
Subject: [PATCH] Fix bug in readData, support more parquet files

---
 .eslintrc.json  |  5 +++--
 src/column.js   |  6 +++++-
 src/datapage.js | 16 ++++++++++++----
 src/encoding.js | 24 +++++++++++++-----------
 4 files changed, 33 insertions(+), 18 deletions(-)

diff --git a/.eslintrc.json b/.eslintrc.json
index e0e1ea6..a23944f 100644
--- a/.eslintrc.json
+++ b/.eslintrc.json
@@ -47,13 +47,14 @@
     "prefer-const": "error",
     "prefer-destructuring": ["warn", {"object": true, "array": false}],
    "prefer-promise-reject-errors": "error",
-    "require-await": "error",
     "quotes": ["error", "single"],
+    "require-await": "error",
     "semi": ["error", "never"],
     "sort-imports": ["error", {
       "ignoreDeclarationSort": true,
       "ignoreMemberSort": false,
       "memberSyntaxSortOrder": ["none", "all", "multiple", "single"]
-    }]
+    }],
+    "space-infix-ops": "error"
   }
 }
diff --git a/src/column.js b/src/column.js
index 32f70c0..74ccdd4 100644
--- a/src/column.js
+++ b/src/column.js
@@ -49,7 +49,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schema) {
     let page
     const uncompressed_page_size = Number(header.uncompressed_page_size)
     const { codec } = columnMetadata
-    if (codec === CompressionCodec.SNAPPY) {
+    if (codec === CompressionCodec.UNCOMPRESSED) {
+      page = compressedBytes
+    } else if (codec === CompressionCodec.SNAPPY) {
       page = new Uint8Array(uncompressed_page_size)
       snappyUncompress(compressedBytes, page)
     } else {
@@ -138,6 +140,8 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schema) {
       if (!diph) throw new Error('parquet dictionary page header is undefined')
 
       dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
+    } else if (header.type === PageType.DATA_PAGE_V2) {
+      throw new Error('parquet data page v2 not supported')
     } else {
       throw new Error(`parquet unsupported page type: ${header.type}`)
     }
diff --git a/src/datapage.js b/src/datapage.js
index 9867b34..9e49276 100644
--- a/src/datapage.js
+++ b/src/datapage.js
@@ -32,7 +32,9 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
   let values = []
 
   // repetition levels
-  const { value: repetitionLevels, byteLength } = readRepetitionLevels(dataView, offset, daph, schema, columnMetadata)
+  const { value: repetitionLevels, byteLength } = readRepetitionLevels(
+    dataView, offset, daph, schema, columnMetadata
+  )
   offset += byteLength
 
   // definition levels
@@ -52,9 +54,14 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
   // read values based on encoding
   const nval = daph.num_values - numNulls
   if (daph.encoding === Encoding.PLAIN) {
-    const plainObj = readPlain(dataView, columnMetadata.type, daph.num_values - numNulls, offset)
+    const plainObj = readPlain(dataView, columnMetadata.type, nval, offset)
     values = plainObj.value
     offset += plainObj.byteLength
+  } else if (daph.encoding === Encoding.PLAIN_DICTIONARY) {
+    const plainObj = readPlain(dataView, columnMetadata.type, nval, offset)
+    values = plainObj.value
+    offset += plainObj.byteLength
+    // TODO: dictionary decoding
   } else if (daph.encoding === Encoding.RLE_DICTIONARY) {
     // bit width is stored as single byte
     let bitWidth
@@ -66,7 +73,9 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
       offset += 1
     }
     if (bitWidth) {
-      const { value, byteLength } = readRleBitPackedHybrid(dataView, offset, bitWidth, dataView.byteLength - offset, daph.num_values - numNulls)
+      const { value, byteLength } = readRleBitPackedHybrid(
+        dataView, offset, bitWidth, dataView.byteLength - offset, nval
+      )
       offset += byteLength
       values = value
     } else {
@@ -124,7 +133,6 @@ function readRepetitionLevels(dataView, offset, daph, schema, columnMetadata) {
 
 /**
  * Read the definition levels from this page, if any.
- * Other implementations read the definition levels and num nulls, but we don't need em.
  *
  * @param {DataView} dataView data view for the page
  * @param {number} offset offset to start reading from
diff --git a/src/encoding.js b/src/encoding.js
index 948a96a..2265997 100644
--- a/src/encoding.js
+++ b/src/encoding.js
@@ -206,11 +206,11 @@ export function readData(dataView, encoding, offset, count, bitWidth) {
   if (encoding === ParquetEncoding.RLE) {
     let seen = 0
     while (seen < count) {
-      const { value: rleValues, byteLength: rleByteLength } = readRleBitPackedHybrid(dataView, offset + byteLength, bitWidth, 0, 1)
-      if (!rleValues.length) break // EOF
-      value.push(...rleValues)
-      seen += rleValues.length
-      byteLength += rleByteLength
+      const rle = readRleBitPackedHybrid(dataView, offset + byteLength, bitWidth, 0, count)
+      if (!rle.value.length) break // EOF
+      value.push(...rle.value)
+      seen += rle.value.length
+      byteLength += rle.byteLength
     }
   } else {
     throw new Error(`parquet encoding not supported ${encoding}`)
@@ -244,14 +244,16 @@ export function readRleBitPackedHybrid(dataView, offset, width, length, numValues) {
     byteLength = newOffset - offset
     if ((header & 1) === 0) {
       // rle
-      const { value: rleValues, byteLength: rleByteLength } = readRle(dataView, offset + byteLength, header, width)
-      value.push(...rleValues)
-      byteLength += rleByteLength
+      const rle = readRle(dataView, offset + byteLength, header, width)
+      value.push(...rle.value)
+      byteLength += rle.byteLength
     } else {
       // bit-packed
-      const { value: bitPackedValues, byteLength: bitPackedByteLength } = readBitPacked(dataView, offset + byteLength, header, width, numValues-value.length)
-      value.push(...bitPackedValues)
-      byteLength += bitPackedByteLength
+      const bitPacked = readBitPacked(
+        dataView, offset + byteLength, header, width, numValues - value.length
+      )
+      value.push(...bitPacked.value)
+      byteLength += bitPacked.byteLength
     }
   }
   return { value, byteLength }