Split out readPage

Remove the dict-page-offset-zero test because its fixture is a malformed parquet file: the footer sets dictionary_page_offset to 0 even though the column chunk contains no dictionary page.
Kenny Daniel 2025-04-02 19:11:11 -07:00
parent a4b5c2a943
commit 1247f5d606
7 changed files with 98 additions and 257 deletions
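For orientation, a hedged usage sketch of the newly exported readPage helper, assembled from the hunks below. The readFirstDataPage wrapper and its columnChunkBytes, columnMetadata, schemaPath, and options inputs are hypothetical (assumed to come from an already-parsed parquet footer), and the './column.js' module path is an assumption since the diff does not show file names; this illustrates the intended call pattern rather than code taken from the repository.

// Hedged sketch: one possible caller of the new readPage export.
// columnChunkBytes, columnMetadata, schemaPath, and options are assumed inputs
// from an already-parsed parquet footer; './column.js' is an assumed path.
import { readPage } from './column.js'

function readFirstDataPage(columnChunkBytes, columnMetadata, schemaPath, options) {
  // DataReader shape as used throughout the diff: a DataView plus a byte offset
  const reader = {
    view: new DataView(columnChunkBytes.buffer, columnChunkBytes.byteOffset, columnChunkBytes.byteLength),
    offset: 0,
  }
  const { element } = schemaPath[schemaPath.length - 1]
  // if the column declares a dictionary encoding, the dictionary page comes first
  let dictionary
  if (columnMetadata.encodings.some(e => e.endsWith('_DICTIONARY'))) {
    dictionary = readPage(reader, columnMetadata, schemaPath, element, undefined, options)
  }
  // each call decodes one page and advances reader.offset past it
  return readPage(reader, columnMetadata, schemaPath, element, dictionary, options)
}

The design point the diff makes explicit: readPage owns header parsing, decompression, and decoding for a single page and advances reader.offset itself, so readColumn reduces to a loop that accumulates the returned chunks.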

@@ -1,7 +1,8 @@
import { assembleLists } from './assemble.js'
import { Encoding, PageType } from './constants.js'
import { convertWithDictionary } from './convert.js'
import { decompressPage, readDataPage, readDataPageV2, readDictionaryPage } from './datapage.js'
import { decompressPage, readDataPage, readDataPageV2 } from './datapage.js'
import { readPlain } from './plain.js'
import { getMaxDefinitionLevel } from './schema.js'
import { deserializeTCompactProtocol } from './thrift.js'
@@ -15,7 +16,7 @@ import { deserializeTCompactProtocol } from './thrift.js'
* @param {ParquetReadOptions} options read options
* @returns {DecodedArray[]}
*/
export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compressors, utf8 }) {
export function readColumn(reader, rowLimit, columnMetadata, schemaPath, options) {
const { element } = schemaPath[schemaPath.length - 1]
/** @type {DecodedArray[]} */
const chunks = []
@@ -24,78 +25,16 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
const hasRowLimit = rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)
let rowCount = 0
// read dictionary
if (hasDictionary(columnMetadata)) {
dictionary = readPage(reader, columnMetadata, schemaPath, element, dictionary, options)
}
while (!hasRowLimit || rowCount < rowLimit) {
if (reader.offset >= reader.view.byteLength - 1) break // end of reader
const header = parquetHeader(reader) // column header
// read compressed_page_size bytes starting at offset
const compressedBytes = new Uint8Array(
reader.view.buffer, reader.view.byteOffset + reader.offset, header.compressed_page_size
)
// parse page data by type
if (header.type === 'DATA_PAGE') {
const daph = header.data_page_header
if (!daph) throw new Error('parquet data page header is undefined')
const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors)
const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
// assert(!daph.statistics?.null_count || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
// convert types, dereference dictionary, and assemble lists
let values = convertWithDictionary(dataPage, dictionary, element, daph.encoding, utf8)
if (repetitionLevels.length || definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
const chunk = assembleLists(
[], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
)
chunks.push(chunk)
rowCount += chunk.length
} else {
// wrap nested flat data by depth
for (let i = 2; i < schemaPath.length; i++) {
if (schemaPath[i].element.repetition_type !== 'REQUIRED') {
values = Array.from(values, e => [e])
}
}
chunks.push(values)
rowCount += values.length
}
} else if (header.type === 'DATA_PAGE_V2') {
const daph2 = header.data_page_header_v2
if (!daph2) throw new Error('parquet data page header v2 is undefined')
const { definitionLevels, repetitionLevels, dataPage } = readDataPageV2(
compressedBytes, header, schemaPath, columnMetadata, compressors
)
// convert types, dereference dictionary, and assemble lists
const values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8)
if (repetitionLevels.length || definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
const chunk = assembleLists(
[], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
)
chunks.push(chunk)
rowCount += chunk.length
} else {
chunks.push(values)
rowCount += values.length
}
} else if (header.type === 'DICTIONARY_PAGE') {
const diph = header.dictionary_page_header
if (!diph) throw new Error('parquet dictionary page header is undefined')
const page = decompressPage(
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
)
dictionary = readDictionaryPage(page, diph, columnMetadata, element.type_length)
} else {
throw new Error(`parquet unsupported page type: ${header.type}`)
}
reader.offset += header.compressed_page_size
const values = readPage(reader, columnMetadata, schemaPath, element, dictionary, options)
chunks.push(values)
rowCount += values.length
}
if (hasRowLimit) {
if (rowCount < rowLimit) {
@@ -110,6 +49,90 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
return chunks
}
/**
* Read a page (data or dictionary) from a buffer.
*
* @param {DataReader} reader
* @param {ColumnMetaData} columnMetadata
* @param {SchemaTree[]} schemaPath
* @param {SchemaElement} element
* @param {DecodedArray | undefined} dictionary
* @param {ParquetReadOptions} options
* @returns {DecodedArray}
*/
export function readPage(reader, columnMetadata, schemaPath, element, dictionary, { utf8, compressors }) {
const header = parquetHeader(reader) // column header
// read compressed_page_size bytes
const compressedBytes = new Uint8Array(
reader.view.buffer, reader.view.byteOffset + reader.offset, header.compressed_page_size
)
reader.offset += header.compressed_page_size
// parse page data by type
if (header.type === 'DATA_PAGE') {
const daph = header.data_page_header
if (!daph) throw new Error('parquet data page header is undefined')
const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors)
const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
// assert(!daph.statistics?.null_count || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
// convert types, dereference dictionary, and assemble lists
let values = convertWithDictionary(dataPage, dictionary, element, daph.encoding, utf8)
if (repetitionLevels.length || definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
return assembleLists([], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel)
} else {
// wrap nested flat data by depth
for (let i = 2; i < schemaPath.length; i++) {
if (schemaPath[i].element.repetition_type !== 'REQUIRED') {
values = Array.from(values, e => [e])
}
}
return values
}
} else if (header.type === 'DATA_PAGE_V2') {
const daph2 = header.data_page_header_v2
if (!daph2) throw new Error('parquet data page header v2 is undefined')
const { definitionLevels, repetitionLevels, dataPage } = readDataPageV2(
compressedBytes, header, schemaPath, columnMetadata, compressors
)
// convert types, dereference dictionary, and assemble lists
const values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8)
if (repetitionLevels.length || definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
return assembleLists([], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel)
} else {
return values
}
} else if (header.type === 'DICTIONARY_PAGE') {
const diph = header.dictionary_page_header
if (!diph) throw new Error('parquet dictionary page header is undefined')
const page = decompressPage(
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
)
const reader = { view: new DataView(page.buffer, page.byteOffset, page.byteLength), offset: 0 }
return readPlain(reader, columnMetadata.type, diph.num_values, element.type_length)
} else {
throw new Error(`parquet unsupported page type: ${header.type}`)
}
}
/**
* @param {ColumnMetaData} columnMetadata
* @returns {boolean}
*/
function hasDictionary(columnMetadata) {
return columnMetadata.encodings.some(e => e.endsWith('_DICTIONARY'))
}
/**
* Find the start byte offset for a column chunk.
*
@@ -117,17 +140,14 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
* @returns {[bigint, bigint]} byte offset range
*/
export function getColumnRange({ dictionary_page_offset, data_page_offset, total_compressed_size }) {
let columnOffset = dictionary_page_offset
if (!columnOffset || data_page_offset < columnOffset) {
columnOffset = data_page_offset
}
const columnOffset = dictionary_page_offset || data_page_offset
return [columnOffset, columnOffset + total_compressed_size]
}
/**
* Read parquet header from a buffer.
*
* @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ParquetReadOptions, SchemaTree} from '../src/types.d.ts'
* @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ParquetReadOptions, SchemaElement, SchemaTree} from '../src/types.d.ts'
* @param {DataReader} reader
* @returns {PageHeader}
*/
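A short note on the getColumnRange simplification above: a zero or missing dictionary_page_offset is falsy, so the || fallback yields the data page offset in both cases. A minimal sketch of the arithmetic, using the numbers from the deleted dict-page-offset-zero fixture metadata further below; plain numbers are used here for clarity, while the real code operates on bigint values from the thrift metadata.

// Sketch mirroring getColumnRange from the diff above (plain numbers, not bigint).
function getColumnRange({ dictionary_page_offset, data_page_offset, total_compressed_size }) {
  const columnOffset = dictionary_page_offset || data_page_offset
  return [columnOffset, columnOffset + total_compressed_size]
}

// Values from the deleted dict-page-offset-zero fixture metadata (see below):
// dictionary_page_offset 0 is falsy, so data_page_offset is used instead.
console.log(getColumnRange({
  dictionary_page_offset: 0,
  data_page_offset: 4,
  total_compressed_size: 40,
})) // [4, 44]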

@@ -59,20 +59,7 @@ export function readDataPage(bytes, daph, schemaPath, { type }) {
}
/**
* @param {Uint8Array} bytes raw page data
* @param {DictionaryPageHeader} diph dictionary page header
* @param {ColumnMetaData} columnMetadata
* @param {number | undefined} typeLength - type_length from schema
* @returns {DecodedArray}
*/
export function readDictionaryPage(bytes, diph, columnMetadata, typeLength) {
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
const reader = { view, offset: 0 }
return readPlain(reader, columnMetadata.type, diph.num_values, typeLength)
}
/**
* @import {ColumnMetaData, CompressionCodec, Compressors, DataPage, DataPageHeader, DataPageHeaderV2, DataReader, DecodedArray, DictionaryPageHeader, PageHeader, SchemaTree} from '../src/types.d.ts'
* @import {ColumnMetaData, CompressionCodec, Compressors, DataPage, DataPageHeader, DataPageHeaderV2, DataReader, DecodedArray, PageHeader, SchemaTree} from '../src/types.d.ts'
* @param {DataReader} reader data view for the page
* @param {DataPageHeader} daph data page header
* @param {SchemaTree[]} schemaPath

@@ -1,19 +0,0 @@
[
[
{
"null_pages": [
false
],
"min_values": [
1552
],
"max_values": [
1552
],
"boundary_order": "ASCENDING",
"null_counts": [
0
]
}
]
]

@@ -1,41 +0,0 @@
[
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552],
[1552]
]

@@ -1,93 +0,0 @@
{
"version": 1,
"schema": [
{
"name": "root",
"num_children": 1
},
{
"type": "INT32",
"repetition_type": "OPTIONAL",
"name": "l_partkey"
}
],
"num_rows": 39,
"row_groups": [
{
"columns": [
{
"file_offset": 4,
"meta_data": {
"type": "INT32",
"encodings": [
"PLAIN",
"BIT_PACKED",
"RLE"
],
"path_in_schema": [
"l_partkey"
],
"codec": "SNAPPY",
"num_values": 39,
"total_uncompressed_size": 180,
"total_compressed_size": 40,
"data_page_offset": 4,
"dictionary_page_offset": 0,
"statistics": {
"max": 1552,
"min": 1552,
"null_count": 0,
"max_value": 1552,
"min_value": 1552
},
"encoding_stats": [
{
"page_type": "DATA_PAGE",
"encoding": "PLAIN",
"count": 1
}
],
"bloom_filter_length": [
{
"field_1": {
"field_1": 0,
"field_2": 162,
"field_3": 22,
"field_5": {
"field_1": 39,
"field_2": 0,
"field_3": 3,
"field_4": 4
}
},
"field_2": 22
}
]
},
"offset_index_offset": 67,
"offset_index_length": 10,
"column_index_offset": 44,
"column_index_length": 23
}
],
"total_byte_size": 180,
"num_rows": 39
}
],
"key_value_metadata": [
{
"key": "is.date.correct",
"value": "true"
},
{
"key": "dremio.arrow.schema.2.1",
"value": "{\n \"fields\" : [ {\n \"name\" : \"l_partkey\",\n \"nullable\" : true,\n \"type\" : {\n \"name\" : \"int\",\n \"bitWidth\" : 32,\n \"isSigned\" : true\n },\n \"children\" : [ ]\n } ]\n}"
},
{
"key": "dremio.version",
"value": "3.2.0-201905102005330382-0598733"
}
],
"created_by": "parquet-mr version 1.12.0-201812210311360288-a86293f (build cec1a483e9dcd545e09170ae787d3dcb13744433)",
"metadata_length": 550
}

@@ -1,13 +0,0 @@
[
[
{
"page_locations": [
{
"offset": 4,
"compressed_page_size": 40,
"first_row_index": 0
}
]
}
]
]