// hyparquet/src/datapageV2.js
import { decompressPage } from './datapage.js'
import { deltaBinaryUnpack, deltaByteArray, deltaLengthByteArray } from './delta.js'
import { bitWidth, byteStreamSplit, readRleBitPackedHybrid } from './encoding.js'
import { readPlain } from './plain.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
/**
 * Read a v2 data page from the given Uint8Array.
 *
 * Layout per the parquet spec: repetition levels, then definition levels
 * (both RLE/bit-packed, never compressed), then the values, which are
 * compressed with the column codec unless is_compressed is explicitly false.
 *
 * @typedef {import("./types.d.ts").DataPage} DataPage
 * @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
 * @typedef {import("./types.d.ts").Compressors} Compressors
 * @typedef {import("./types.d.ts").DataPageHeaderV2} DataPageHeaderV2
 * @typedef {import("./types.d.ts").SchemaTree} SchemaTree
 * @param {Uint8Array} compressedBytes raw page data
 * @param {import("./types.d.ts").PageHeader} ph page header
 * @param {SchemaTree[]} schemaPath
 * @param {ColumnMetaData} columnMetadata
 * @param {Compressors | undefined} compressors
 * @returns {DataPage} definition levels, repetition levels, and array of values
 * @throws {Error} if the v2 page header is missing or the encoding is unsupported
 */
export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata, compressors) {
  const view = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
  const reader = { view, offset: 0 }
  const { codec, type } = columnMetadata

  const daph2 = ph.data_page_header_v2
  if (!daph2) throw new Error('parquet data page header v2 is undefined')

  // repetition levels
  const repetitionLevels = readRepetitionLevelsV2(reader, daph2, schemaPath)
  // skip to the definition levels regardless of how many bytes were consumed
  reader.offset = daph2.repetition_levels_byte_length

  // definition levels
  const definitionLevels = readDefinitionLevelsV2(reader, daph2, schemaPath)
  // assert(reader.offset === daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length)

  // uncompressed size of the values section only (levels are excluded)
  const uncompressedPageSize = ph.uncompressed_page_size - daph2.definition_levels_byte_length - daph2.repetition_levels_byte_length

  let page = compressedBytes.subarray(reader.offset)
  // in v2 only the values may be compressed; is_compressed defaults to true
  if (daph2.is_compressed !== false) {
    page = decompressPage(page, uncompressedPageSize, codec, compressors)
  }
  const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
  const pageReader = { view: pageView, offset: 0 }

  // read values based on encoding
  /** @type {import('./types.d.ts').DecodedArray} */
  let dataPage
  // num_nulls is known up front in v2, so only non-null values are stored
  const nValues = daph2.num_values - daph2.num_nulls
  if (daph2.encoding === 'PLAIN') {
    const { type_length } = schemaPath[schemaPath.length - 1].element
    dataPage = readPlain(pageReader, type, nValues, type_length)
  } else if (daph2.encoding === 'RLE') {
    // assert(columnMetadata.type === 'BOOLEAN')
    dataPage = new Array(nValues)
    readRleBitPackedHybrid(pageReader, 1, 0, dataPage)
    dataPage = dataPage.map(x => !!x)
  } else if (
    daph2.encoding === 'PLAIN_DICTIONARY' ||
    daph2.encoding === 'RLE_DICTIONARY'
  ) {
    // first byte of the values section is the dictionary index bit width
    const bitWidth = pageView.getUint8(pageReader.offset++)
    dataPage = new Array(nValues)
    readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize - 1, dataPage)
  } else if (daph2.encoding === 'DELTA_BINARY_PACKED') {
    const int32 = type === 'INT32'
    dataPage = int32 ? new Int32Array(nValues) : new BigInt64Array(nValues)
    deltaBinaryUnpack(pageReader, nValues, dataPage)
  } else if (daph2.encoding === 'DELTA_LENGTH_BYTE_ARRAY') {
    dataPage = new Array(nValues)
    deltaLengthByteArray(pageReader, nValues, dataPage)
  } else if (daph2.encoding === 'DELTA_BYTE_ARRAY') {
    dataPage = new Array(nValues)
    deltaByteArray(pageReader, nValues, dataPage)
  } else if (daph2.encoding === 'BYTE_STREAM_SPLIT') {
    const { type_length } = schemaPath[schemaPath.length - 1].element
    // fix: read from pageReader (decompressed values), not reader (raw page
    // bytes) — the old code decoded garbage for compressed pages
    dataPage = byteStreamSplit(pageReader, nValues, type, type_length)
  } else {
    throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
  }

  return { definitionLevels, repetitionLevels, dataPage }
}
/**
 * Decode the repetition levels section of a v2 data page.
 * Returns an empty array for non-repeated columns, which store no levels.
 *
 * @typedef {import("./types.d.ts").DataReader} DataReader
 * @param {DataReader} reader
 * @param {DataPageHeaderV2} daph2 data page header v2
 * @param {SchemaTree[]} schemaPath
 * @returns {any[]} repetition levels
 */
export function readRepetitionLevelsV2(reader, daph2, schemaPath) {
  const maxLevel = getMaxRepetitionLevel(schemaPath)
  if (!maxLevel) return []

  // v2 stores the byte length of the levels section in the page header
  const levels = new Array(daph2.num_values)
  const width = bitWidth(maxLevel)
  readRleBitPackedHybrid(reader, width, daph2.repetition_levels_byte_length, levels)
  return levels
}
/**
 * Decode the definition levels section of a v2 data page.
 *
 * @param {DataReader} reader
 * @param {DataPageHeaderV2} daph2 data page header v2
 * @param {SchemaTree[]} schemaPath
 * @returns {number[] | undefined} definition levels, or undefined when the
 *   column is required at every level and stores none
 */
function readDefinitionLevelsV2(reader, daph2, schemaPath) {
  const maxLevel = getMaxDefinitionLevel(schemaPath)
  if (!maxLevel) return undefined

  // in v2 the byte length of the levels section is known from the header
  const levels = new Array(daph2.num_values)
  readRleBitPackedHybrid(reader, bitWidth(maxLevel), daph2.definition_levels_byte_length, levels)
  return levels
}