Fix handling of dictionary pages from parquet.net

This commit is contained in:
Kenny Daniel 2025-04-09 16:38:18 -07:00
parent c2adfba626
commit 972402d083
No known key found for this signature in database
GPG Key ID: FDF16101AF5AFD3A
5 changed files with 81 additions and 27 deletions

@ -1,6 +1,6 @@
import { assembleLists } from './assemble.js'
import { Encoding, PageType } from './constants.js'
import { convertWithDictionary } from './convert.js'
import { convert, convertWithDictionary } from './convert.js'
import { decompressPage, readDataPage, readDataPageV2 } from './datapage.js'
import { readPlain } from './plain.js'
import { isFlatColumn } from './schema.js'
@ -25,22 +25,26 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, s
let dictionary = undefined
let rowCount = 0
// read dictionary
if (hasDictionary(columnMetadata)) {
dictionary = readPage(reader, columnMetadata, schemaPath, element, dictionary, undefined, 0, options)
}
while (rowCount < rowGroupEnd) {
if (reader.offset >= reader.view.byteLength - 1) break // end of reader
const lastChunk = chunks.at(-1)
const lastChunkLength = lastChunk ? lastChunk.length : 0
const values = readPage(reader, columnMetadata, schemaPath, element, dictionary, lastChunk, rowGroupStart - rowCount, options)
if (lastChunk === values) {
// continued from previous page
rowCount += values.length - lastChunkLength
// read page header
const header = parquetHeader(reader)
if (header.type === 'DICTIONARY_PAGE') {
// assert(!dictionary)
dictionary = readPage(reader, header, columnMetadata, schemaPath, element, dictionary, undefined, 0, options)
dictionary = convert(dictionary, element, options.utf8)
} else {
chunks.push(values)
rowCount += values.length
const lastChunk = chunks.at(-1)
const lastChunkLength = lastChunk?.length || 0
const values = readPage(reader, header, columnMetadata, schemaPath, element, dictionary, lastChunk, rowGroupStart - rowCount, options)
if (lastChunk === values) {
// continued from previous page
rowCount += values.length - lastChunkLength
} else {
chunks.push(values)
rowCount += values.length
}
}
}
if (isFinite(rowGroupEnd)) {
@ -60,6 +64,7 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, s
* Read a page (data or dictionary) from a buffer.
*
* @param {DataReader} reader
* @param {PageHeader} header
* @param {ColumnMetaData} columnMetadata
* @param {SchemaTree[]} schemaPath
* @param {SchemaElement} element
@ -69,9 +74,7 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, s
* @param {ParquetReadOptions} options
* @returns {DecodedArray}
*/
export function readPage(reader, columnMetadata, schemaPath, element, dictionary, previousChunk, pageStart, { utf8, compressors }) {
const header = parquetHeader(reader) // column header
export function readPage(reader, header, columnMetadata, schemaPath, element, dictionary, previousChunk, pageStart, { utf8, compressors }) {
// read compressed_page_size bytes
const compressedBytes = new Uint8Array(
reader.view.buffer, reader.view.byteOffset + reader.offset, header.compressed_page_size
@ -138,14 +141,6 @@ export function readPage(reader, columnMetadata, schemaPath, element, dictionary
}
}
/**
 * Check whether a column chunk was written with any dictionary encoding
 * (e.g. PLAIN_DICTIONARY or RLE_DICTIONARY), meaning a dictionary page
 * precedes its data pages.
 *
 * @param {ColumnMetaData} columnMetadata
 * @returns {boolean}
 */
function hasDictionary(columnMetadata) {
  for (const encoding of columnMetadata.encodings) {
    if (encoding.endsWith('_DICTIONARY')) return true
  }
  return false
}
/**
* Find the start byte offset for a column chunk.
*

@ -13,8 +13,6 @@ const dayMillis = 86400000 // 1 day in milliseconds
*/
export function convertWithDictionary(data, dictionary, schemaElement, encoding, utf8 = true) {
if (dictionary && encoding.endsWith('_DICTIONARY')) {
// convert dictionary
dictionary = convert(dictionary, schemaElement, utf8)
let output = data
if (data instanceof Uint8Array && !(dictionary instanceof Uint8Array)) {
// @ts-expect-error upgrade data to match dictionary type with fancy constructor

5
test/files/issue72.json Normal file

@ -0,0 +1,5 @@
[
["258d7fff-6418-499f-af07-c6611937d7d8"],
["086f2968-327b-48a8-8cdf-64f46bcd8173"],
["258d7fff-6418-499f-af07-c6611937d7d8"]
]

@ -0,0 +1,56 @@
{
"version": 1,
"schema": [
{
"name": "root",
"num_children": 1
},
{
"type": "BYTE_ARRAY",
"repetition_type": "OPTIONAL",
"name": "TextColumn",
"converted_type": "UTF8",
"logical_type": {
"type": "STRING"
}
}
],
"num_rows": 3,
"row_groups": [
{
"columns": [
{
"file_offset": 4,
"meta_data": {
"type": "BYTE_ARRAY",
"encodings": [
"RLE",
"BIT_PACKED",
"PLAIN"
],
"path_in_schema": [
"TextColumn"
],
"codec": "SNAPPY",
"num_values": 3,
"total_uncompressed_size": 283,
"total_compressed_size": 288,
"data_page_offset": 4,
"statistics": {
"max": "258d7fff-6418-499f-af07-c6611937d7d8",
"min": "086f2968-327b-48a8-8cdf-64f46bcd8173",
"null_count": 0,
"distinct_count": 2,
"max_value": "258d7fff-6418-499f-af07-c6611937d7d8",
"min_value": "086f2968-327b-48a8-8cdf-64f46bcd8173"
}
}
}
],
"total_byte_size": 288,
"num_rows": 3
}
],
"created_by": "Parquet.Net version 4.25.0 (build 687fbb462e94eddd1dc5a0aa26f33ba8e53f60e3)",
"metadata_length": 321
}

BIN
test/files/issue72.parquet Normal file

Binary file not shown.