Fix bug in readData, support more parquet files

This commit is contained in:
Kenny Daniel 2024-02-11 10:05:21 -08:00
parent 0bc1a46b9d
commit 17f7ace840
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
4 changed files with 33 additions and 18 deletions

@@ -47,13 +47,14 @@
"prefer-const": "error",
"prefer-destructuring": ["warn", {"object": true, "array": false}],
"prefer-promise-reject-errors": "error",
"require-await": "error",
"quotes": ["error", "single"],
"require-await": "error",
"semi": ["error", "never"],
"sort-imports": ["error", {
"ignoreDeclarationSort": true,
"ignoreMemberSort": false,
"memberSyntaxSortOrder": ["none", "all", "multiple", "single"]
}]
}],
"space-infix-ops": "error"
}
}

@@ -49,7 +49,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
let page
const uncompressed_page_size = Number(header.uncompressed_page_size)
const { codec } = columnMetadata
if (codec === CompressionCodec.SNAPPY) {
if (codec === CompressionCodec.UNCOMPRESSED) {
page = compressedBytes
} else if (codec === CompressionCodec.SNAPPY) {
page = new Uint8Array(uncompressed_page_size)
snappyUncompress(compressedBytes, page)
} else {
@@ -138,6 +140,8 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
if (!diph) throw new Error('parquet dictionary page header is undefined')
dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
} else if (header.type === PageType.DATA_PAGE_V2) {
throw new Error('parquet data page v2 not supported')
} else {
throw new Error(`parquet unsupported page type: ${header.type}`)
}

@@ -32,7 +32,9 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
let values = []
// repetition levels
const { value: repetitionLevels, byteLength } = readRepetitionLevels(dataView, offset, daph, schema, columnMetadata)
const { value: repetitionLevels, byteLength } = readRepetitionLevels(
dataView, offset, daph, schema, columnMetadata
)
offset += byteLength
// definition levels
@@ -52,9 +54,14 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
// read values based on encoding
const nval = daph.num_values - numNulls
if (daph.encoding === Encoding.PLAIN) {
const plainObj = readPlain(dataView, columnMetadata.type, daph.num_values - numNulls, offset)
const plainObj = readPlain(dataView, columnMetadata.type, nval, offset)
values = plainObj.value
offset += plainObj.byteLength
} else if (daph.encoding === Encoding.PLAIN_DICTIONARY) {
const plainObj = readPlain(dataView, columnMetadata.type, nval, offset)
values = plainObj.value
offset += plainObj.byteLength
// TODO: dictionary decoding
} else if (daph.encoding === Encoding.RLE_DICTIONARY) {
// bit width is stored as single byte
let bitWidth
@@ -66,7 +73,9 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
offset += 1
}
if (bitWidth) {
const { value, byteLength } = readRleBitPackedHybrid(dataView, offset, bitWidth, dataView.byteLength - offset, daph.num_values - numNulls)
const { value, byteLength } = readRleBitPackedHybrid(
dataView, offset, bitWidth, dataView.byteLength - offset, nval
)
offset += byteLength
values = value
} else {
@@ -124,7 +133,6 @@ function readRepetitionLevels(dataView, offset, daph, schema, columnMetadata) {
/**
* Read the definition levels from this page, if any.
* Other implementations read the definition levels and num nulls, but we don't need em.
*
* @param {DataView} dataView data view for the page
* @param {number} offset offset to start reading from

@@ -206,11 +206,11 @@ export function readData(dataView, encoding, offset, count, bitWidth) {
if (encoding === ParquetEncoding.RLE) {
let seen = 0
while (seen < count) {
const { value: rleValues, byteLength: rleByteLength } = readRleBitPackedHybrid(dataView, offset + byteLength, bitWidth, 0, 1)
if (!rleValues.length) break // EOF
value.push(...rleValues)
seen += rleValues.length
byteLength += rleByteLength
const rle = readRleBitPackedHybrid(dataView, offset + byteLength, bitWidth, 0, count)
if (!rle.value.length) break // EOF
value.push(...rle.value)
seen += rle.value.length
byteLength += rle.byteLength
}
} else {
throw new Error(`parquet encoding not supported ${encoding}`)
@@ -244,14 +244,16 @@ export function readRleBitPackedHybrid(dataView, offset, width, length, numValue
byteLength = newOffset - offset
if ((header & 1) === 0) {
// rle
const { value: rleValues, byteLength: rleByteLength } = readRle(dataView, offset + byteLength, header, width)
value.push(...rleValues)
byteLength += rleByteLength
const rle = readRle(dataView, offset + byteLength, header, width)
value.push(...rle.value)
byteLength += rle.byteLength
} else {
// bit-packed
const { value: bitPackedValues, byteLength: bitPackedByteLength } = readBitPacked(dataView, offset + byteLength, header, width, numValues-value.length)
value.push(...bitPackedValues)
byteLength += bitPackedByteLength
const bitPacked = readBitPacked(
dataView, offset + byteLength, header, width, numValues - value.length
)
value.push(...bitPacked.value)
byteLength += bitPacked.byteLength
}
}