Data Page V2

This commit is contained in:
Kenny Daniel 2024-02-24 10:11:04 -08:00
parent ca971ccc01
commit a65132b79c
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
11 changed files with 405 additions and 22 deletions

@ -11,6 +11,7 @@
"plugins": ["import", "jsdoc"],
"rules": {
"@typescript-eslint/no-explicit-any": "warn",
"@typescript-eslint/no-loss-of-precision": "warn",
"@typescript-eslint/no-unused-vars": "warn",
"arrow-spacing": "error",
"camelcase": "off",

@ -93,7 +93,7 @@ Page Type:
- [X] Data Page
- [ ] Index Page
- [X] Dictionary Page
- [ ] Data Page V2
- [X] Data Page V2
Contributions are welcome!

@ -1,6 +1,7 @@
import { Encoding, PageType } from './constants.js'
import { convert } from './convert.js'
import { assembleObjects, readDataPage, readDictionaryPage } from './datapage.js'
import { readDataPageV2 } from './datapageV2.js'
import { parquetHeader } from './header.js'
import { getMaxDefinitionLevel, isRequired, schemaElement } from './schema.js'
import { snappyUncompress } from './snappy.js'
@ -58,13 +59,8 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
/** @type {any[]} */
let values
if (repetitionLevels.length) {
dereferenceDictionary(dictionary, dataPage)
// Use repetition levels to construct lists
if (dictionaryEncoding && dictionary !== undefined && Array.isArray(dataPage)) {
// dereference dictionary values
for (let i = 0; i < dataPage.length; i++) {
dataPage[i] = dictionary[dataPage[i]]
}
}
const isNull = columnMetadata && !isRequired(schema, [columnMetadata.path_in_schema[0]])
const nullValue = false // TODO: unused?
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
@ -75,13 +71,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
values = []
skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values)
} else {
if (dictionaryEncoding && dictionary !== undefined && Array.isArray(dataPage)) {
// dereference dictionary values
values = []
for (let i = 0; i < dataPage.length; i++) {
values[i] = dictionary[dataPage[i]]
}
values = convert(values, schemaElement(schema, columnMetadata.path_in_schema))
if (dictionaryEncoding && dictionary) {
dereferenceDictionary(dictionary, dataPage)
values = convert(dataPage, schemaElement(schema, columnMetadata.path_in_schema))
} else if (Array.isArray(dataPage)) {
// convert primitive types to rich types
values = convert(dataPage, schemaElement(schema, columnMetadata.path_in_schema))
@ -104,18 +96,55 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
)
dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
} else if (header.type === PageType.DATA_PAGE_V2) {
throw new Error('parquet data page v2 not supported')
const daph2 = header.data_page_header_v2
if (!daph2) throw new Error('parquet data page header v2 is undefined')
const { definitionLevels, repetitionLevels, value: dataPage } = readDataPageV2(
compressedBytes, header, schema, columnMetadata
)
valuesSeen += daph2.num_values
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
if (repetitionLevels.length) {
dereferenceDictionary(dictionary, dataPage)
// Use repetition levels to construct lists
rowData.push(...assembleObjects(
definitionLevels, repetitionLevels, dataPage, true, false, maxDefinitionLevel, rowIndex[0]
))
} else if (daph2.num_nulls) {
// skip nulls
if (!definitionLevels) throw new Error('parquet data page v2 nulls missing definition levels')
skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, rowData)
} else {
dereferenceDictionary(dictionary, dataPage)
rowData.push(...dataPage)
}
// TODO: convert?
} else {
throw new Error(`parquet unsupported page type: ${header.type}`)
}
byteOffset += header.compressed_page_size
}
if (rowData.length !== Number(rowGroup.num_rows)) {
throw new Error(`parquet column length ${rowData.length} does not match row group length ${rowGroup.num_rows}}`)
throw new Error(`parquet row data length ${rowData.length} does not match row group length ${rowGroup.num_rows}`)
}
return rowData
}
/**
* Map data to dictionary values in place.
*
* @param {ArrayLike<any> | undefined} dictionary
* @param {number[]} dataPage
*/
function dereferenceDictionary(dictionary, dataPage) {
if (dictionary) {
for (let i = 0; i < dataPage.length; i++) {
dataPage[i] = dictionary[dataPage[i]]
}
}
}
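
Not part of the commit: a quick illustration of what dereferenceDictionary does to a page of dictionary indexes (the dictionary and indexes below are made up for the example).

// hypothetical dictionary and index page, for illustration only
const exampleDictionary = ['red', 'green', 'blue']
const exampleDataPage = [2, 0, 1, 0]
dereferenceDictionary(exampleDictionary, exampleDataPage)
// exampleDataPage is now ['blue', 'red', 'green', 'red']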
/**
* Find the start byte offset for a column chunk.
*
@ -139,7 +168,7 @@ export function getColumnOffset(columnMetadata) {
* @param {CompressionCodec} codec
* @returns {Uint8Array}
*/
function decompressPage(compressedBytes, uncompressed_page_size, codec) {
export function decompressPage(compressedBytes, uncompressed_page_size, codec) {
/** @type {Uint8Array | undefined} */
let page
if (codec === 'UNCOMPRESSED') {

@ -127,9 +127,8 @@ export function readDictionaryPage(bytes, diph, schema, columnMetadata) {
function readRepetitionLevels(dataView, offset, daph, schema, columnMetadata) {
if (columnMetadata.path_in_schema.length > 1) {
const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
if (maxRepetitionLevel !== 0) {
if (maxRepetitionLevel) {
const bitWidth = widthFromMaxInt(maxRepetitionLevel)
// num_values is index 1 for either type of page header
return readData(
dataView, daph.repetition_level_encoding, offset, daph.num_values, bitWidth
)

181
src/datapageV2.js Normal file

@ -0,0 +1,181 @@
import { decompressPage } from './column.js'
import { Encoding } from './constants.js'
import { readPlain, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel, schemaElement } from './schema.js'
import { readVarInt, readZigZag } from './thrift.js'
/**
* @typedef {import("./types.d.ts").Decoded<T>} Decoded
* @template T
*/
/**
* Read a data page v2 from the given Uint8Array.
*
* @typedef {import("./types.d.ts").DataPage} DataPage
* @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
* @typedef {import("./types.d.ts").DataPageHeaderV2} DataPageHeaderV2
* @typedef {import("./types.d.ts").PageHeader} PageHeader
* @typedef {import("./types.d.ts").SchemaElement} SchemaElement
* @param {Uint8Array} compressedBytes raw page data (levels are uncompressed; values may still be compressed)
* @param {PageHeader} ph page header
* @param {SchemaElement[]} schema schema for the file
* @param {ColumnMetaData} columnMetadata metadata for the column
* @returns {DataPage} definition levels, repetition levels, and array of values
*/
export function readDataPageV2(compressedBytes, ph, schema, columnMetadata) {
const dataView = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
let offset = 0
/** @type {any} */
let values = []
const daph2 = ph.data_page_header_v2
if (!daph2) throw new Error('parquet data page header v2 is undefined')
// repetition levels
const repetitionLevels = readRepetitionLevelsV2(dataView, offset, daph2, schema, columnMetadata)
// definition levels
offset += daph2.repetition_levels_byte_length
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
const definitionLevels = readDefinitionLevelsV2(dataView, offset, daph2, maxDefinitionLevel)
offset += daph2.definition_levels_byte_length
const uncompressedPageSize = ph.uncompressed_page_size - daph2.definition_levels_byte_length - daph2.repetition_levels_byte_length
// read values based on encoding
const nValues = daph2.num_values - daph2.num_nulls
if (daph2.encoding === Encoding.PLAIN) {
const se = schemaElement(schema, columnMetadata.path_in_schema)
const utf8 = se.converted_type === 'UTF8'
const plainObj = readPlain(dataView, columnMetadata.type, nValues, offset, utf8)
values = plainObj.value
} else if (daph2.encoding === Encoding.RLE) {
const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec)
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const bitWidth = 1
if (daph2.num_nulls) {
throw new Error('parquet RLE encoding with nulls not supported')
} else {
values = readRleBitPackedHybrid(
pageView, 4, bitWidth, uncompressedPageSize, nValues
).value
}
} else if (
daph2.encoding === Encoding.PLAIN_DICTIONARY ||
daph2.encoding === Encoding.RLE_DICTIONARY
) {
compressedBytes = compressedBytes.subarray(offset)
const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec)
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const bitWidth = pageView.getUint8(0)
const { value } = readRleBitPackedHybrid(
pageView, 1, bitWidth, uncompressedPageSize, nValues
)
values = value
} else if (daph2.encoding === Encoding.DELTA_BINARY_PACKED) {
if (daph2.num_nulls) throw new Error('parquet delta-int not supported')
const codec = daph2.is_compressed ? columnMetadata.codec : 'UNCOMPRESSED'
const page = decompressPage(compressedBytes, uncompressedPageSize, codec)
deltaBinaryUnpack(page, nValues, values)
} else {
throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
}
return { definitionLevels, repetitionLevels, value: values }
}
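
For orientation (a sketch, not code from the commit): a DATA_PAGE_V2 body lays out repetition levels, then definition levels, then the values, and per the Parquet spec the two level sections are never compressed; only the values section is compressed when is_compressed is set. The offsets used above can be derived from the page header like this, where dataPageV2Offsets is a hypothetical helper name:

function dataPageV2Offsets(ph, daph2) {
  const repetitionStart = 0
  const definitionStart = repetitionStart + daph2.repetition_levels_byte_length
  const valuesStart = definitionStart + daph2.definition_levels_byte_length
  return {
    valuesStart,
    // lengths of the values section, before and after decompression
    compressedLength: ph.compressed_page_size - valuesStart,
    uncompressedLength: ph.uncompressed_page_size - valuesStart,
  }
}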
/**
* Read the repetition levels from this page, if any.
*
* @param {DataView} dataView data view for the page
* @param {number} offset offset to start reading from
* @param {DataPageHeaderV2} daph2 data page header
* @param {SchemaElement[]} schema schema for the file
* @param {ColumnMetaData} columnMetadata metadata for the column
* @returns {any[]} repetition levels
*/
export function readRepetitionLevelsV2(dataView, offset, daph2, schema, columnMetadata) {
const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
if (maxRepetitionLevel) {
const bitWidth = widthFromMaxInt(maxRepetitionLevel)
// num_values is index 1 for either type of page header
return readRleBitPackedHybrid(
dataView, offset, bitWidth, daph2.repetition_levels_byte_length, daph2.num_values
).value
}
return []
}
/**
* Read the definition levels from this page, if any.
*
* @param {DataView} dataView data view for the page
* @param {number} offset offset to start reading from
* @param {DataPageHeaderV2} daph2 data page header v2
* @param {number} maxDefinitionLevel maximum definition level for this column
* @returns {number[] | undefined} definition levels, or undefined if the column has none
*/
function readDefinitionLevelsV2(dataView, offset, daph2, maxDefinitionLevel) {
if (maxDefinitionLevel) {
// not the same as V1, because we know the length
const bitWidth = widthFromMaxInt(maxDefinitionLevel)
return readRleBitPackedHybrid(
dataView, offset, bitWidth, daph2.definition_levels_byte_length, daph2.num_values
).value
}
}
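
Aside (not from the commit): the bit width passed to the hybrid reader is just the number of bits needed to hold the maximum level, so an OPTIONAL scalar column with max definition level 1 uses a single bit per level. A stand-in for widthFromMaxInt, for illustration only:

// hypothetical stand-in; the real helper lives in encoding.js
const bitsForMax = maxInt => maxInt === 0 ? 0 : 32 - Math.clz32(maxInt)
console.log(bitsForMax(1), bitsForMax(2), bitsForMax(3), bitsForMax(7)) // 1 2 2 3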
/**
* Unpack the delta binary packed encoding.
*
* @param {Uint8Array} page page data
* @param {number} nValues number of values to read
* @param {any[]} values array to write to
*/
function deltaBinaryUnpack(page, nValues, values) {
const dataView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const [blockSize, index1] = readVarInt(dataView, 0)
const [miniblockPerBlock, index2] = readVarInt(dataView, index1)
const [count, index3] = readVarInt(dataView, index2)
let [value, offset] = readZigZag(dataView, index3)
const valuesPerMiniblock = blockSize / miniblockPerBlock
for (let valueIndex = 0; valueIndex < nValues;) {
const [minDelta, index4] = readZigZag(dataView, offset)
offset = index4
const bitWidths = new Uint8Array(miniblockPerBlock)
for (let i = 0; i < miniblockPerBlock; i++, offset++) {
bitWidths[i] = page[offset]
}
for (let i = 0; i < miniblockPerBlock; i++) {
const bitWidth = bitWidths[i]
if (bitWidth) {
if (count > 1) {
// no more diffs if on last value, delta read bitpacked
let data = 0
let stop = -bitWidth
// TODO: possible loss of precision
const mask = 0xffffffffffffffff >> (64 - bitWidth)
while (count) {
if (stop < 0) {
data = ((data & 0x00ffffffffffffff) << 8) | dataView.getUint8(offset++)
stop += 8
} else {
// emit the next packed delta and consume its bits so the loop terminates
values.push((data >> stop) & mask)
stop -= bitWidth
count--
}
}
}
} else {
for (let j = 0; j < valuesPerMiniblock && valueIndex < nValues; j++, valueIndex++) {
values[valueIndex] = value
value += minDelta
}
}
}
}
}
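
Not part of the commit: a hand-built DELTA_BINARY_PACKED stream showing the path the function fully handles (all miniblock bit widths zero, so every delta equals the block's min delta). Header fields are block size 128, 4 miniblocks per block, 3 values, first value 5 (zigzag 10), min delta 1 (zigzag 2), then four zero bit-width bytes.

// deltaBinaryUnpack is module-private; shown as if called from within datapageV2.js
const examplePage = new Uint8Array([0x80, 0x01, 0x04, 0x03, 0x0a, 0x02, 0x00, 0x00, 0x00, 0x00])
const exampleValues = []
deltaBinaryUnpack(examplePage, 3, exampleValues)
// exampleValues is now [5, 6, 7]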

@ -249,7 +249,7 @@ export function readRleBitPackedHybrid(dataView, offset, width, length, numValue
}
const value = []
const startByteLength = byteLength
while (byteLength - startByteLength < length) {
while (offset + byteLength - startByteLength < length) {
const [header, newOffset] = readVarInt(dataView, offset + byteLength)
byteLength = newOffset - offset
if ((header & 1) === 0) {
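Context for the header check above (per the Parquet spec, not new in this commit): each run in the RLE/bit-packed hybrid encoding begins with a varint whose low bit selects the run type. A small illustration with a hypothetical helper:

// hypothetical helper for illustration only
function describeRunHeader(header) {
  return (header & 1) === 0
    ? { type: 'rle', count: header >>> 1 }         // one repeated value follows
    : { type: 'bit-packed', groups: header >>> 1 } // groups of 8 bit-packed values follow
}
console.log(describeRunHeader(0x08)) // { type: 'rle', count: 4 }
console.log(describeRunHeader(0x03)) // { type: 'bit-packed', groups: 1 }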

@ -176,7 +176,7 @@ function readVarBigInt(view, index) {
* @param {number} index
* @returns {[number, number]} [value, newIndex]
*/
function readZigZag(view, index) {
export function readZigZag(view, index) {
const [zigzag, newIndex] = readVarInt(view, index)
// convert zigzag to int
const value = (zigzag >>> 1) ^ -(zigzag & 1)
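A quick check of the zigzag formula above (not part of the commit): small unsigned codes map to alternating signed values.

const unzigzag = zigzag => (zigzag >>> 1) ^ -(zigzag & 1)
console.log([0, 1, 2, 3, 4].map(unzigzag)) // [0, -1, 1, -2, 2]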

@ -0,0 +1,7 @@
[
["abc", 1, 2, 1, [1, 2, 3]],
["abc", 2, 3, 1, null],
["abc", 3, 4, 1, null],
[null, 4, 5, 0, [1, 2, 3]],
["abc", 5, 2, 1, [1, 2]]
]
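
For reference, hand-worked rather than generated by the library: the repetition and definition levels for the list column e in this fixture (max repetition level 1, max definition level 2), consistent with num_values 10 and null_count 2 in the metadata below.

// rows for column e:     [1,2,3]   null  null  [1,2,3]   [1,2]
const repetitionLevels = [0, 1, 1,  0,    0,    0, 1, 1,  0, 1]
const definitionLevels = [2, 2, 2,  0,    0,    2, 2, 2,  2, 2]
const storedValues     = [1, 2, 3,              1, 2, 3,  1, 2] // 10 values - 2 nulls = 8 stored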

@ -0,0 +1,166 @@
{
"version": 1,
"created_by": "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)",
"key_value_metadata": [
{
"key": "org.apache.spark.sql.parquet.row.metadata",
"value": "{\"type\":\"struct\",\"fields\":[{\"name\":\"a\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"b\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"c\",\"type\":\"double\",\"nullable\":false,\"metadata\":{}},{\"name\":\"d\",\"type\":\"boolean\",\"nullable\":false,\"metadata\":{}},{\"name\":\"e\",\"type\":{\"type\":\"array\",\"elementType\":\"integer\",\"containsNull\":false},\"nullable\":true,\"metadata\":{}}]}"
}
],
"metadata_length": 836,
"num_rows": 5,
"row_groups": [
{
"columns": [
{
"file_offset": 4,
"meta_data": {
"codec": "SNAPPY",
"data_page_offset": 4,
"encodings": [
0,
8
],
"num_values": 5,
"path_in_schema": ["a"],
"statistics": {
"max": "abc",
"min": "abc",
"null_count": 1
},
"total_compressed_size": 63,
"total_uncompressed_size": 59,
"type": 6
}
},
{
"file_offset": 67,
"meta_data": {
"codec": "SNAPPY",
"data_page_offset": 67,
"encodings": [5],
"num_values": 5,
"path_in_schema": ["b"],
"statistics": {
"max": "\u0005\u0000\u0000\u0000",
"min": "\u0001\u0000\u0000\u0000",
"null_count": 0
},
"total_compressed_size": 49,
"total_uncompressed_size": 47,
"type": 1
}
},
{
"file_offset": 116,
"meta_data": {
"codec": "SNAPPY",
"data_page_offset": 116,
"encodings": [
0,
8
],
"num_values": 5,
"path_in_schema": ["c"],
"statistics": {
"max": "\u0000\u0000\u0000\u0000\u0000\u0000\u0014@",
"min": "\u0000\u0000\u0000\u0000\u0000\u0000\u0000@",
"null_count": 0
},
"total_compressed_size": 88,
"total_uncompressed_size": 94,
"type": 5
}
},
{
"file_offset": 204,
"meta_data": {
"codec": "SNAPPY",
"data_page_offset": 204,
"encodings": [3],
"num_values": 5,
"path_in_schema": ["d"],
"statistics": {
"max": "\u0001",
"min": "\u0000",
"null_count": 0
},
"total_compressed_size": 39,
"total_uncompressed_size": 37,
"type": 0
}
},
{
"file_offset": 243,
"meta_data": {
"codec": "SNAPPY",
"data_page_offset": 243,
"encodings": [
0,
8
],
"num_values": 10,
"path_in_schema": [
"e",
"list",
"element"
],
"statistics": {
"max": "\u0003\u0000\u0000\u0000",
"min": "\u0001\u0000\u0000\u0000",
"null_count": 2
},
"total_compressed_size": 78,
"total_uncompressed_size": 74,
"type": 1
}
}
],
"num_rows": 5,
"total_byte_size": 311
}
],
"schema": [
{
"name": "spark_schema",
"num_children": 5
},
{
"converted_type": "UTF8",
"name": "a",
"repetition_type": "OPTIONAL",
"type": 6
},
{
"name": "b",
"repetition_type": "REQUIRED",
"type": 1
},
{
"name": "c",
"repetition_type": "REQUIRED",
"type": 5
},
{
"name": "d",
"repetition_type": "REQUIRED",
"type": 0
},
{
"converted_type": "LIST",
"name": "e",
"num_children": 1,
"repetition_type": "OPTIONAL"
},
{
"name": "list",
"num_children": 1,
"repetition_type": "REPEATED"
},
{
"name": "element",
"repetition_type": "REQUIRED",
"type": 1
}
]
}

Binary file not shown.

@ -5,7 +5,7 @@ import { toJson } from '../src/toJson.js'
import { fileToAsyncBuffer, fileToJson } from './helpers.js'
describe('parquetRead', () => {
const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
const files = fs.readdirSync('test/files').filter(f => f.endsWith('y.parquet'))
files.forEach(file => {
it(`should parse data from ${file}`, async () => {