hyparquet/src/datapage.js

import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
import { readPlain } from './plain.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired } from './schema.js'
/**
* Read a data page from the given Uint8Array.
*
* @typedef {import("./types.d.ts").DataPage} DataPage
* @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
* @typedef {import("./types.d.ts").DataPageHeader} DataPageHeader
* @typedef {import("./types.d.ts").SchemaTree} SchemaTree
* @typedef {import("./types.d.ts").DecodedArray} DecodedArray
* @param {Uint8Array} bytes raw page data (should already be decompressed)
* @param {DataPageHeader} daph data page header
* @param {SchemaTree[]} schemaPath
* @param {ColumnMetaData} columnMetadata
* @returns {DataPage} definition levels, repetition levels, and array of values
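*
* @example
* // usage sketch only: pageBytes, daph, schemaPath, and columnMetadata are
* // illustrative names for the decompressed page bytes, its data page header,
* // and the column being read
* const { definitionLevels, repetitionLevels, dataPage } =
*   readDataPage(pageBytes, daph, schemaPath, columnMetadata)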
*/
export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
const reader = { view, offset: 0 }
/** @type {DecodedArray} */
let dataPage = []
// repetition and definition levels
const repetitionLevels = readRepetitionLevels(reader, daph, schemaPath)
const { definitionLevels, numNulls } = readDefinitionLevels(reader, daph, schemaPath)
// read values based on encoding
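// null values are not stored in the page data, so only the non-null values follow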
const nValues = daph.num_values - numNulls
if (daph.encoding === 'PLAIN') {
const { type_length } = schemaPath[schemaPath.length - 1].element
dataPage = readPlain(reader, columnMetadata.type, nValues, type_length)
} else if (
daph.encoding === 'PLAIN_DICTIONARY' ||
daph.encoding === 'RLE_DICTIONARY' ||
daph.encoding === 'RLE'
) {
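// dictionary-encoded pages store indexes into the dictionary page as an
// RLE/bit-packed hybrid run; readDataPage returns the raw indexes, and
// resolving them against the dictionary is left to the caller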
// bit width is stored as a single byte
let bitWidth = 1
// TODO: RLE encoding uses bitWidth = schemaElement.type_length
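// BOOLEAN pages skip the width byte and keep the default bit width of 1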
if (columnMetadata.type !== 'BOOLEAN') {
bitWidth = view.getUint8(reader.offset)
reader.offset++
}
if (bitWidth) {
dataPage = new Array(nValues)
readRleBitPackedHybrid(reader, bitWidth, view.byteLength - reader.offset, dataPage)
} else {
// a bit width of zero means every value is zero, with no data to read
dataPage = new Array(nValues).fill(0)
}
} else {
throw new Error(`parquet unsupported encoding: ${daph.encoding}`)
}
return { definitionLevels, repetitionLevels, dataPage }
}
/**
* Read a page containing dictionary data.
*
* @param {Uint8Array} bytes raw page data
* @param {import("./types.d.ts").DictionaryPageHeader} diph dictionary page header
* @param {ColumnMetaData} columnMetadata
* @param {number | undefined} typeLength type_length from the schema element
* @returns {ArrayLike<any>} array of values
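*
* @example
* // sketch only: dictBytes and diph are illustrative names for the decompressed
* // page bytes and the dictionary page header; typeLength may be undefined
* const dictionary = readDictionaryPage(dictBytes, diph, columnMetadata, typeLength)
* // dictionary-encoded data pages hold indexes into this array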
*/
export function readDictionaryPage(bytes, diph, columnMetadata, typeLength) {
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
const reader = { view, offset: 0 }
return readPlain(reader, columnMetadata.type, diph.num_values, typeLength)
}
/**
* Read the repetition levels from this page, if any.
*
* @typedef {import("./types.d.ts").DataReader} DataReader
* @param {DataReader} reader data view for the page
* @param {DataPageHeader} daph data page header
* @param {SchemaTree[]} schemaPath
* @returns {any[]} repetition levels
*/
function readRepetitionLevels(reader, daph, schemaPath) {
if (schemaPath.length > 1) {
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
if (maxRepetitionLevel) {
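// one repetition level per value; a level of 0 marks the start of a new row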
const bitWidth = widthFromMaxInt(maxRepetitionLevel)
const values = new Array(daph.num_values)
readRleBitPackedHybrid(reader, bitWidth, 0, values)
return values
}
}
return []
}
/**
* Read the definition levels from this page, if any.
*
* @param {DataReader} reader data view for the page
* @param {DataPageHeader} daph data page header
* @param {SchemaTree[]} schemaPath
* @returns {{ definitionLevels: number[], numNulls: number }} definition levels and count of nulls
*/
function readDefinitionLevels(reader, daph, schemaPath) {
if (!isRequired(schemaPath)) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const bitWidth = widthFromMaxInt(maxDefinitionLevel)
if (bitWidth) {
// definition levels are encoded for every value, including nulls
const definitionLevels = new Array(daph.num_values)
readRleBitPackedHybrid(reader, bitWidth, 0, definitionLevels)
// count nulls
let numNulls = daph.num_values
for (const def of definitionLevels) {
if (def === maxDefinitionLevel) numNulls--
}
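// when nothing is null, the levels carry no information, so return them empty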
if (numNulls === 0) {
definitionLevels.length = 0
}
return { definitionLevels, numNulls }
}
}
return { definitionLevels: [], numNulls: 0 }
}