No copy readRleBitPackedHybrid

Kenny Daniel 2024-04-30 14:40:18 -07:00
parent 195ee9bc80
commit f16e23f501
4 changed files with 29 additions and 65 deletions
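In short, readRleBitPackedHybrid no longer allocates and returns a fresh array: every call site pre-allocates the output and the decoder fills it in place, taking the value count from values.length. A minimal sketch of the calling convention (illustrative only; reader, bitWidth, length, and nValues stand in for whatever each call site passes):

// Before: decode into a new array and copy/normalize the result
// const value = readRleBitPackedHybrid(reader, bitWidth, length, nValues)
// values = Array.isArray(value) ? value : Array.from(value)

// After: allocate once, decode directly into the output array
const values = new Array(nValues)
readRleBitPackedHybrid(reader, bitWidth, length, values)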

@@ -1,4 +1,4 @@
-import { readData, readPlain, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
+import { readPlain, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
 import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, skipDefinitionBytes } from './schema.js'
 
 const skipNulls = false // TODO
@@ -62,10 +62,8 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
       reader.offset++
     }
     if (bitWidth) {
-      const value = readRleBitPackedHybrid(
-        reader, bitWidth, view.byteLength - reader.offset, nValues
-      )
-      values = Array.isArray(value) ? value : Array.from(value)
+      values = new Array(nValues)
+      readRleBitPackedHybrid(reader, bitWidth, view.byteLength - reader.offset, values)
     } else {
       // nval zeros
       values = new Array(nValues).fill(0)
@@ -106,9 +104,9 @@ function readRepetitionLevels(reader, daph, schemaPath) {
     const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
     if (maxRepetitionLevel) {
       const bitWidth = widthFromMaxInt(maxRepetitionLevel)
-      return readData(
-        reader, daph.repetition_level_encoding, daph.num_values, bitWidth
-      )
+      const values = new Array(daph.num_values)
+      readRleBitPackedHybrid(reader, bitWidth, 0, values)
+      return values
     }
   }
   return []
@@ -128,9 +126,8 @@ function readDefinitionLevels(reader, daph, schemaPath) {
     const bitWidth = widthFromMaxInt(maxDefinitionLevel)
     if (bitWidth) {
       // num_values is index 1 for either type of page header
-      const definitionLevels = readData(
-        reader, daph.definition_level_encoding, daph.num_values, bitWidth
-      )
+      const definitionLevels = new Array(daph.num_values)
+      readRleBitPackedHybrid(reader, bitWidth, 0, definitionLevels)
 
       // count nulls
       let numNulls = daph.num_values

@@ -64,9 +64,8 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
       throw new Error('parquet RLE encoding with nulls not supported')
     } else {
       const pageReader = { view: pageView, offset: 4 }
-      values = readRleBitPackedHybrid(
-        pageReader, bitWidth, uncompressedPageSize, nValues
-      )
+      values = new Array(nValues)
+      readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, values)
     }
   } else if (
     daph2.encoding === 'PLAIN_DICTIONARY' ||
@@ -77,10 +76,8 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
     const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
     const bitWidth = pageView.getUint8(0)
     const pageReader = { view: pageView, offset: 1 }
-    const value = readRleBitPackedHybrid(
-      pageReader, bitWidth, uncompressedPageSize, nValues
-    )
-    values = value
+    values = new Array(nValues)
+    readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, values)
   } else if (daph2.encoding === 'DELTA_BINARY_PACKED') {
     if (daph2.num_nulls) throw new Error('parquet delta-int not supported')
     const codec = daph2.is_compressed ? columnMetadata.codec : 'UNCOMPRESSED'
@@ -108,9 +105,11 @@ export function readRepetitionLevelsV2(reader, daph2, schemaPath) {
   const bitWidth = widthFromMaxInt(maxRepetitionLevel)
   // num_values is index 1 for either type of page header
-  return readRleBitPackedHybrid(
-    reader, bitWidth, daph2.repetition_levels_byte_length, daph2.num_values
-  )
+  const values = new Array(daph2.num_values)
+  readRleBitPackedHybrid(
+    reader, bitWidth, daph2.repetition_levels_byte_length, values
+  )
+  return values
 }
 
 /**
@@ -125,9 +124,9 @@ function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) {
   if (maxDefinitionLevel) {
     // not the same as V1, because we know the length
     const bitWidth = widthFromMaxInt(maxDefinitionLevel)
-    return readRleBitPackedHybrid(
-      reader, bitWidth, daph2.definition_levels_byte_length, daph2.num_values
-    )
+    const values = new Array(daph2.num_values)
+    readRleBitPackedHybrid(reader, bitWidth, daph2.definition_levels_byte_length, values)
+    return values
   }
 }
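Both page versions now share this fill-in-place pattern; they differ only in how the encoded length is supplied. A sketch of the two call shapes, assuming a DataReader of the form { view, offset } as used throughout the diff (variable names here are illustrative):

// V1 levels: pass length 0, so the decoder reads a 4-byte little-endian
// length prefix from the stream (see the readRleBitPackedHybrid hunk below)
const v1Levels = new Array(daph.num_values)
readRleBitPackedHybrid(reader, bitWidth, 0, v1Levels)

// V2 levels: the byte length is known from the page header and passed directly
const v2Levels = new Array(daph2.num_values)
readRleBitPackedHybrid(reader, bitWidth, daph2.definition_levels_byte_length, v2Levels)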

@@ -185,33 +185,6 @@ export function widthFromMaxInt(value) {
   return Math.ceil(Math.log2(value + 1))
 }
 
-/**
- * Read data from the file-object using the given encoding.
- * The data could be definition levels, repetition levels, or actual values.
- *
- * @typedef {import("./types.d.ts").Encoding} Encoding
- * @param {DataReader} reader - buffer to read data from
- * @param {Encoding} encoding - encoding type
- * @param {number} count - number of values to read
- * @param {number} bitWidth - width of each bit-packed group
- * @returns {any[]} array of values
- */
-export function readData(reader, encoding, count, bitWidth) {
-  const values = new Array(count)
-  if (encoding === 'RLE') {
-    let seen = 0
-    while (seen < count) {
-      const rle = readRleBitPackedHybrid(reader, bitWidth, 0, count)
-      if (!rle.length) break // EOF
-      splice(values, rle, seen)
-      seen += rle.length
-    }
-  } else {
-    throw new Error(`parquet encoding not supported ${encoding}`)
-  }
-  return values
-}
-
 /**
  * Read values from a run-length encoded/bit-packed hybrid encoding.
  *
@@ -221,20 +194,17 @@ export function readData(reader, encoding, count, bitWidth) {
  * @param {DataReader} reader - buffer to read data from
  * @param {number} width - width of each bit-packed group
  * @param {number} length - length of the encoded data
- * @param {number} numValues - number of values to read
- * @returns {number[]} array of rle/bit-packed values
+ * @param {number[]} values - output array
  */
-export function readRleBitPackedHybrid(reader, width, length, numValues) {
+export function readRleBitPackedHybrid(reader, width, length, values) {
   if (!length) {
     length = reader.view.getInt32(reader.offset, true)
     reader.offset += 4
     if (length < 0) throw new Error(`parquet invalid rle/bitpack length ${length}`)
   }
-  /** @type {number[]} */
-  const values = new Array(numValues)
   let seen = 0
   const startOffset = reader.offset
-  while (reader.offset - startOffset < length && seen < numValues) {
+  while (reader.offset - startOffset < length && seen < values.length) {
     const [header, newOffset] = readVarInt(reader.view, reader.offset)
     reader.offset = newOffset
     if ((header & 1) === 0) {
@@ -244,15 +214,11 @@ export function readRleBitPackedHybrid(reader, width, length, numValues) {
       seen += rle.length
     } else {
       // bit-packed
-      const bitPacked = readBitPacked(
-        reader, header, width, numValues - seen
-      )
+      const bitPacked = readBitPacked(reader, header, width, values.length - seen)
       splice(values, bitPacked, seen)
       seen += bitPacked.length
     }
   }
-  return values
 }
 
 /**
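With the old readData helper gone, its RLE loop is no longer needed: readRleBitPackedHybrid itself accumulates runs until the output array is full or the encoded bytes are exhausted. A self-contained sketch of the new signature, modeled on the implicit-length test below; the first three data bytes are reconstructed here for illustration and the import path is assumed:

import { readRleBitPackedHybrid } from '../src/encoding.js' // assumed path

// 4-byte little-endian length prefix (4), then an RLE run of three 1s
// (header 0b00000110, value 0x01) and a bit-packed group (header 0b00000011,
// byte 0b00000100) contributing 0, 0, 1
const bytes = new Uint8Array([4, 0, 0, 0, 0b00000110, 1, 0b00000011, 0b00000100])
const reader = { view: new DataView(bytes.buffer), offset: 0 }

const values = new Array(6)
readRleBitPackedHybrid(reader, /* width */ 1, /* length */ 0, values)
console.log(values) // [1, 1, 1, 0, 0, 1]
console.log(reader.offset) // 8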

@@ -111,9 +111,10 @@ describe('readRleBitPackedHybrid', () => {
     view.setUint8(3, 0b00000100) // Bit-packed values (false, false, true)
     const reader = { view, offset: 0 }
-    const value = readRleBitPackedHybrid(reader, 1, 3, 6)
+    const values = new Array(6)
+    readRleBitPackedHybrid(reader, 1, 3, values)
     expect(reader.offset).toBe(4)
-    expect(value).toEqual([1, 1, 1, 0, 0, 1])
+    expect(values).toEqual([1, 1, 1, 0, 0, 1])
   })
 
   it('reads RLE bit-packed hybrid values with implicit length', () => {
@@ -127,8 +128,9 @@ describe('readRleBitPackedHybrid', () => {
     view.setUint8(7, 0b00000100) // Bit-packed values (false, false, true)
     const reader = { view, offset: 0 }
-    const value = readRleBitPackedHybrid(reader, 1, 0, 6)
+    const values = new Array(6)
+    readRleBitPackedHybrid(reader, 1, 0, values)
     expect(reader.offset).toBe(8)
-    expect(value).toEqual([1, 1, 1, 0, 0, 1])
+    expect(values).toEqual([1, 1, 1, 0, 0, 1])
   })
 })