import { decompressPage } from './column.js'
import { Encoding } from './constants.js'
import { readPlain, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel, schemaElement } from './schema.js'
import { readVarInt, readZigZag } from './thrift.js'

/**
 * @typedef {import("./types.d.ts").Decoded<T>} Decoded
 * @template T
 */

/**
 * Read a data page v2 from the given Uint8Array.
 *
 * @typedef {import("./types.d.ts").DataPage} DataPage
 * @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
 * @typedef {import("./types.d.ts").DataPageHeaderV2} DataPageHeaderV2
 * @typedef {import("./types.d.ts").PageHeader} PageHeader
 * @typedef {import("./types.d.ts").SchemaElement} SchemaElement
 * @param {Uint8Array} compressedBytes raw page data; v2 level data is stored uncompressed, but the values section may be compressed
 * @param {PageHeader} ph page header
 * @param {SchemaElement[]} schema schema for the file
 * @param {ColumnMetaData} columnMetadata metadata for the column
 * @returns {DataPage} definition levels, repetition levels, and array of values
 */
export function readDataPageV2(compressedBytes, ph, schema, columnMetadata) {
  const dataView = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
  let offset = 0
  /** @type {any} */
  let values = []

  const daph2 = ph.data_page_header_v2
  if (!daph2) throw new Error('parquet data page header v2 is undefined')

  // repetition levels
  const repetitionLevels = readRepetitionLevelsV2(dataView, offset, daph2, schema, columnMetadata)

  // definition levels
  offset += daph2.repetition_levels_byte_length
  const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
  const definitionLevels = readDefinitionLevelsV2(dataView, offset, daph2, maxDefinitionLevel)
  offset += daph2.definition_levels_byte_length

  const uncompressedPageSize = ph.uncompressed_page_size - daph2.definition_levels_byte_length - daph2.repetition_levels_byte_length

  // read values based on encoding
  const nValues = daph2.num_values - daph2.num_nulls
  if (daph2.encoding === Encoding.PLAIN) {
    const se = schemaElement(schema, columnMetadata.path_in_schema)
    const utf8 = se.converted_type === 'UTF8'
    // in v2, the values section (after the levels) may be compressed
    const codec = daph2.is_compressed ? columnMetadata.codec : 'UNCOMPRESSED'
    const page = decompressPage(compressedBytes.subarray(offset), uncompressedPageSize, codec)
    const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
    const plainObj = readPlain(pageView, columnMetadata.type, nValues, 0, utf8)
    values = plainObj.value
  } else if (daph2.encoding === Encoding.RLE) {
    // RLE-encoded values here are booleans, so the bit width is 1
    const page = decompressPage(compressedBytes.subarray(offset), uncompressedPageSize, columnMetadata.codec)
    const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
    const bitWidth = 1
    if (daph2.num_nulls) {
      throw new Error('parquet RLE encoding with nulls not supported')
    } else {
      // skip the 4-byte length prefix before the RLE run data
      values = readRleBitPackedHybrid(
        pageView, 4, bitWidth, uncompressedPageSize, nValues
      ).value
    }
  } else if (
    daph2.encoding === Encoding.PLAIN_DICTIONARY ||
    daph2.encoding === Encoding.RLE_DICTIONARY
  ) {
    compressedBytes = compressedBytes.subarray(offset)
    const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec)
    const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)

    // the first byte is the bit width of the dictionary indexes
    const bitWidth = pageView.getUint8(0)
    const { value } = readRleBitPackedHybrid(
      pageView, 1, bitWidth, uncompressedPageSize, nValues
    )
    values = value
  } else if (daph2.encoding === Encoding.DELTA_BINARY_PACKED) {
    if (daph2.num_nulls) throw new Error('parquet delta-int not supported')
    const codec = daph2.is_compressed ? columnMetadata.codec : 'UNCOMPRESSED'
    const page = decompressPage(compressedBytes.subarray(offset), uncompressedPageSize, codec)
    deltaBinaryUnpack(page, nValues, values)
  } else {
    throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
  }

  return { definitionLevels, repetitionLevels, value: values }
}
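
/**
 * Usage sketch (illustrative, not part of the original module): how a caller
 * might hand one v2 page to readDataPageV2. It assumes the thrift page header,
 * file schema, and column metadata were already parsed elsewhere, and that
 * `pageStart` points at the first byte after the page header; those inputs
 * are hypothetical stand-ins, not an API this file defines.
 *
 * @param {Uint8Array} chunkBytes bytes of the column chunk
 * @param {PageHeader} ph parsed page header
 * @param {SchemaElement[]} schema schema for the file
 * @param {ColumnMetaData} columnMetadata metadata for the column
 * @param {number} pageStart offset of the page data within chunkBytes
 * @returns {DataPage} decoded page
 */
function exampleReadPage(chunkBytes, ph, schema, columnMetadata, pageStart) {
  // a v2 page occupies compressed_page_size bytes after its thrift header
  const pageBytes = chunkBytes.subarray(pageStart, pageStart + ph.compressed_page_size)
  return readDataPageV2(pageBytes, ph, schema, columnMetadata)
}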

/**
 * Read the repetition levels from this page, if any.
 *
 * @param {DataView} dataView data view for the page
 * @param {number} offset offset to start reading from
 * @param {DataPageHeaderV2} daph2 data page header v2
 * @param {SchemaElement[]} schema schema for the file
 * @param {ColumnMetaData} columnMetadata metadata for the column
 * @returns {any[]} repetition levels, empty if the column is not repeated
 */
export function readRepetitionLevelsV2(dataView, offset, daph2, schema, columnMetadata) {
  const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
  if (maxRepetitionLevel) {
    const bitWidth = widthFromMaxInt(maxRepetitionLevel)
    // v2 level data has no length prefix; the byte length comes from the page header
    return readRleBitPackedHybrid(
      dataView, offset, bitWidth, daph2.repetition_levels_byte_length, daph2.num_values
    ).value
  }
  return []
}

/**
 * Read the definition levels from this page, if any.
 *
 * @param {DataView} dataView data view for the page
 * @param {number} offset offset to start reading from
 * @param {DataPageHeaderV2} daph2 data page header v2
 * @param {number} maxDefinitionLevel maximum definition level for this column
 * @returns {number[] | undefined} definition levels, or undefined if the column is required
 */
function readDefinitionLevelsV2(dataView, offset, daph2, maxDefinitionLevel) {
  if (maxDefinitionLevel) {
    // not the same as v1, because the byte length is known from the header
    const bitWidth = widthFromMaxInt(maxDefinitionLevel)
    return readRleBitPackedHybrid(
      dataView, offset, bitWidth, daph2.definition_levels_byte_length, daph2.num_values
    ).value
  }
}
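
/**
 * Worked example (illustrative, not part of the original module): decoding a
 * tiny run of definition levels. The two bytes form one RLE run: header 0x08
 * (four repeats) followed by the level value 1, so a column with max
 * definition level 1 decodes to [1, 1, 1, 1]. The header object is a minimal
 * hypothetical stand-in for a parsed DataPageHeaderV2.
 */
function exampleDefinitionLevels() {
  const bytes = new Uint8Array([0x08, 0x01])
  const dataView = new DataView(bytes.buffer)
  /** @type {any} */
  const daph2 = { num_values: 4, definition_levels_byte_length: 2 }
  return readDefinitionLevelsV2(dataView, 0, daph2, 1) // [1, 1, 1, 1]
}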

/**
 * Unpack the delta binary packed encoding.
 *
 * @param {Uint8Array} page page data
 * @param {number} nValues number of values to read
 * @param {any[]} values array to write to
 */
function deltaBinaryUnpack(page, nValues, values) {
  const dataView = new DataView(page.buffer, page.byteOffset, page.byteLength)
  const [blockSize, index1] = readVarInt(dataView, 0)
  const [miniblockPerBlock, index2] = readVarInt(dataView, index1)
  const [count, index3] = readVarInt(dataView, index2) // total value count in the stream
  let [value, offset] = readZigZag(dataView, index3)

  const valuesPerMiniblock = blockSize / miniblockPerBlock

  // the first value is stored directly in the header
  let valueIndex = 0
  if (nValues > 0) {
    values[valueIndex++] = value
  }

  while (valueIndex < nValues) {
    // each block starts with a zigzag min delta and one bit width per miniblock
    const [minDelta, index4] = readZigZag(dataView, offset)
    offset = index4
    const bitWidths = new Uint8Array(miniblockPerBlock)
    for (let i = 0; i < miniblockPerBlock; i++, offset++) {
      bitWidths[i] = page[offset]
    }

    for (let i = 0; i < miniblockPerBlock && valueIndex < nValues; i++) {
      const bitWidth = bitWidths[i]
      if (bitWidth) {
        // deltas are bit-packed least-significant-bit first
        // TODO: 32-bit bitwise ops lose precision for bit widths near 32 or wide values
        const mask = (1 << bitWidth) - 1
        let data = 0
        let bits = 0 // number of valid low bits in data
        for (let j = 0; j < valuesPerMiniblock && valueIndex < nValues; j++) {
          while (bits < bitWidth) {
            data |= dataView.getUint8(offset++) << bits
            bits += 8
          }
          // each value is the previous value plus minDelta plus the packed delta
          value += minDelta + (data & mask)
          values[valueIndex++] = value
          data >>>= bitWidth
          bits -= bitWidth
        }
      } else {
        // bit width zero means every delta in the miniblock is exactly minDelta
        for (let j = 0; j < valuesPerMiniblock && valueIndex < nValues; j++) {
          value += minDelta
          values[valueIndex++] = value
        }
      }
    }
  }
}
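
/**
 * Worked example (illustrative, not part of the original module): a minimal
 * DELTA_BINARY_PACKED stream. The header varints encode blockSize 128,
 * 4 miniblocks per block, and a count of 3, followed by the zigzag first
 * value 5 (0x0a). The single block has zigzag min delta 1 (0x02) and four
 * zero bit widths, so every delta is minDelta and the page decodes to
 * [5, 6, 7].
 */
function exampleDeltaBinaryUnpack() {
  const page = new Uint8Array([0x80, 0x01, 0x04, 0x03, 0x0a, 0x02, 0x00, 0x00, 0x00, 0x00])
  /** @type {number[]} */
  const values = []
  deltaBinaryUnpack(page, 3, values)
  return values // [5, 6, 7]
}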