From e64e66abaade136223d3e3a0471c1183c389679a Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Tue, 21 Oct 2025 00:37:26 -0700 Subject: [PATCH] Dremel list encoding --- src/column.js | 34 ++++++++++-- src/datapage.js | 65 ++++++++++++++++------- src/dremel.js | 111 +++++++++++++++++++++++++++++++++++++++ src/metadata.js | 22 +++++++- src/types.d.ts | 7 +++ test/files/listy.parquet | Bin 0 -> 1515 bytes test/write.list.test.js | 78 +++++++++++++++++++++++++++ 7 files changed, 292 insertions(+), 25 deletions(-) create mode 100644 src/dremel.js create mode 100644 test/files/listy.parquet create mode 100644 test/write.list.test.js diff --git a/src/column.js b/src/column.js index 1d97545..874c7cd 100644 --- a/src/column.js +++ b/src/column.js @@ -1,5 +1,6 @@ import { ByteWriter } from './bytewriter.js' import { writeDataPageV2, writePageHeader } from './datapage.js' +import { encodeListValues } from './dremel.js' import { writePlain } from './plain.js' import { snappyCompress } from './snappy.js' import { unconvert } from './unconvert.js' @@ -17,6 +18,16 @@ export function writeColumn(writer, column, values, stats) { if (!type) throw new Error(`column ${columnName} cannot determine type`) const offsetStart = writer.offset + /** @type {ListValues | undefined} */ + let listValues + if (isListLike(schemaPath)) { + if (!Array.isArray(values)) { + throw new Error(`parquet column ${columnName} expects array values for list encoding`) + } + listValues = encodeListValues(schemaPath, values) + values = listValues.values + } + const num_values = values.length /** @type {Encoding[]} */ const encodings = [] @@ -46,7 +57,7 @@ export function writeColumn(writer, column, values, stats) { // write data page with dictionary indexes data_page_offset = BigInt(writer.offset) - writeDataPageV2(writer, indexes, column, 'RLE_DICTIONARY') + writeDataPageV2(writer, indexes, column, 'RLE_DICTIONARY', listValues) encodings.push('RLE_DICTIONARY') } else { // unconvert values from rich types to simple @@ -54,7 +65,7 @@ export function writeColumn(writer, column, values, stats) { // write data page const encoding = type === 'BOOLEAN' && values.length > 16 ? 'RLE' : 'PLAIN' - writeDataPageV2(writer, values, column, encoding) + writeDataPageV2(writer, values, column, encoding, listValues) encodings.push(encoding) } @@ -121,8 +132,8 @@ function writeDictionaryPage(writer, column, dictionary) { } /** - * @import {ColumnMetaData, DecodedArray, Encoding, ParquetType, Statistics} from 'hyparquet' - * @import {ColumnEncoder, Writer} from '../src/types.js' + * @import {ColumnMetaData, DecodedArray, Encoding, ParquetType, SchemaElement, Statistics} from 'hyparquet' + * @import {ColumnEncoder, ListValues, Writer} from '../src/types.js' * @param {DecodedArray} values * @returns {Statistics} */ @@ -144,3 +155,18 @@ function getStatistics(values) { } return { min_value, max_value, null_count } } + +/** + * @param {SchemaElement[]} schemaPath + * @returns {boolean} + */ +function isListLike(schemaPath) { + for (let i = 1; i < schemaPath.length; i++) { + const element = schemaPath[i] + if (element?.converted_type === 'LIST') { + const repeatedChild = schemaPath[i + 1] + return repeatedChild?.repetition_type === 'REPEATED' + } + } + return false +} diff --git a/src/datapage.js b/src/datapage.js index c1464e4..92ed892 100644 --- a/src/datapage.js +++ b/src/datapage.js @@ -11,8 +11,9 @@ import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' * @param {DecodedArray} values * @param {ColumnEncoder} column * @param {import('hyparquet').Encoding} encoding + * @param {ListValues} [listValues] */ -export function writeDataPageV2(writer, values, column, encoding) { +export function writeDataPageV2(writer, values, column, encoding, listValues) { const { columnName, element, compressed } = column const { type, type_length, repetition_type } = element @@ -21,8 +22,12 @@ export function writeDataPageV2(writer, values, column, encoding) { // write levels to temp buffer const levelWriter = new ByteWriter() - const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls } - = writeLevels(levelWriter, column, values) + const { + definition_levels_byte_length, + repetition_levels_byte_length, + num_nulls, + num_values, + } = writeLevels(levelWriter, column, values, listValues) const nonnull = values.filter(v => v !== null && v !== undefined) @@ -56,7 +61,7 @@ export function writeDataPageV2(writer, values, column, encoding) { uncompressed_page_size: levelWriter.offset + page.offset, compressed_page_size: levelWriter.offset + compressedPage.offset, data_page_header_v2: { - num_values: values.length, + num_values, num_nulls, num_rows: values.length, encoding, @@ -110,43 +115,63 @@ export function writePageHeader(writer, header) { /** * @import {DecodedArray, PageHeader, SchemaElement} from 'hyparquet' - * @import {ColumnEncoder, Writer} from '../src/types.js' + * @import {ColumnEncoder, ListValues, Writer} from '../src/types.js' * @param {Writer} writer * @param {ColumnEncoder} column * @param {DecodedArray} values + * @param {ListValues} [listValues] * @returns {{ * definition_levels_byte_length: number * repetition_levels_byte_length: number * num_nulls: number + * num_values: number * }} */ -function writeLevels(writer, column, values) { +function writeLevels(writer, column, values, listValues) { const { schemaPath } = column + const definitionLevels = listValues?.definitionLevels + const repetitionLevels = listValues?.repetitionLevels - let num_nulls = 0 + let num_nulls = listValues?.numNulls ?? 0 + let num_values = definitionLevels?.length ?? values.length - // TODO: repetition levels const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) let repetition_levels_byte_length = 0 if (maxRepetitionLevel) { - repetition_levels_byte_length = writeRleBitPackedHybrid(writer, [], 0) + const bitWidth = Math.ceil(Math.log2(maxRepetitionLevel + 1)) + const reps = repetitionLevels ?? [] + repetition_levels_byte_length = writeRleBitPackedHybrid(writer, reps, bitWidth) } // definition levels const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) let definition_levels_byte_length = 0 if (maxDefinitionLevel) { - const definitionLevels = [] - for (const value of values) { - if (value === null || value === undefined) { - definitionLevels.push(maxDefinitionLevel - 1) - num_nulls++ - } else { - definitionLevels.push(maxDefinitionLevel) - } - } const bitWidth = Math.ceil(Math.log2(maxDefinitionLevel + 1)) - definition_levels_byte_length = writeRleBitPackedHybrid(writer, definitionLevels, bitWidth) + const defs = definitionLevels ?? (() => { + const generated = [] + for (const value of values) { + if (value === null || value === undefined) { + generated.push(maxDefinitionLevel - 1) + num_nulls++ + } else { + generated.push(maxDefinitionLevel) + } + } + num_values = generated.length + return generated + })() + + if (definitionLevels && listValues === undefined) { + num_nulls = definitionLevels.reduce( + (count, def) => def === maxDefinitionLevel ? count : count + 1, + 0 + ) + } + + definition_levels_byte_length = writeRleBitPackedHybrid(writer, defs, bitWidth) + } else { + num_nulls = values.filter(value => value === null || value === undefined).length } - return { definition_levels_byte_length, repetition_levels_byte_length, num_nulls } + return { definition_levels_byte_length, repetition_levels_byte_length, num_nulls, num_values } } diff --git a/src/dremel.js b/src/dremel.js new file mode 100644 index 0000000..3f776fb --- /dev/null +++ b/src/dremel.js @@ -0,0 +1,111 @@ +import { getMaxDefinitionLevel } from './schema.js' + +/** + * Encode nested list values into repetition and definition levels. + * + * @import {SchemaElement} from 'hyparquet' + * @import {ListValues} from '../src/types.js' + * @param {SchemaElement[]} schemaPath schema elements from root to leaf + * @param {any[]} rows column data for the current row group + * @returns {ListValues} + */ +export function encodeListValues(schemaPath, rows) { + if (schemaPath.length < 2) throw new Error('parquet list schema path must include column') + /** @type {any[]} */ + const values = [] + /** @type {number[]} */ + const definitionLevels = [] + /** @type {number[]} */ + const repetitionLevels = [] + + // Track repetition depth prior to each level + const repLevelPrior = new Array(schemaPath.length) + let repeatedCount = 0 + for (let i = 0; i < schemaPath.length; i++) { + repLevelPrior[i] = repeatedCount + if (schemaPath[i].repetition_type === 'REPEATED') repeatedCount++ + } + + const leafIndex = schemaPath.length - 1 + const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) + + for (let row = 0; row < rows.length; row++) { + visit(1, rows[row], 0, 0, false) + } + + const numNulls = definitionLevels.reduce( + (count, def) => def === maxDefinitionLevel ? count : count + 1, + 0 + ) + + return { values, definitionLevels, repetitionLevels, numNulls } + + /** + * Recursively walk the schema path, emitting definition/repetition pairs. + * + * @param {number} depth index into schemaPath + * @param {any} value value at the current depth + * @param {number} defLevel definition level accumulated so far + * @param {number} repLevel repetition level for the next emitted slot + * @param {boolean} allowNull whether the current value is allowed to be null + */ + function visit(depth, value, defLevel, repLevel, allowNull) { + const element = schemaPath[depth] + const repetition = element.repetition_type || 'REQUIRED' + const isLeaf = depth === leafIndex + + if (isLeaf) { + if (value === null || value === undefined) { + if (repetition === 'REQUIRED' && !allowNull) { + throw new Error('parquet required value is undefined') + } + definitionLevels.push(defLevel) + repetitionLevels.push(repLevel) + values.push(null) + } else { + const finalDef = repetition === 'REQUIRED' ? defLevel : defLevel + 1 + definitionLevels.push(finalDef) + repetitionLevels.push(repLevel) + values.push(value) + } + return + } + + if (repetition === 'REPEATED') { + if (value === null || value === undefined) { + if (!allowNull) throw new Error('parquet required value is undefined') + visit(depth + 1, undefined, defLevel, repLevel, true) + return + } + if (!Array.isArray(value)) { + throw new Error(`parquet repeated field ${element.name} must be an array`) + } + if (!value.length) { + visit(depth + 1, undefined, defLevel, repLevel, true) + return + } + for (let i = 0; i < value.length; i++) { + const childRep = i === 0 ? repLevel : repLevelPrior[depth] + 1 + visit(depth + 1, value[i], defLevel + 1, childRep, false) + } + return + } + + if (repetition === 'OPTIONAL') { + if (value === null || value === undefined) { + visit(depth + 1, undefined, defLevel, repLevel, true) + } else { + visit(depth + 1, value, defLevel + 1, repLevel, false) + } + return + } + + // REQUIRED + if (value === null || value === undefined) { + if (!allowNull) throw new Error('parquet required value is undefined') + visit(depth + 1, undefined, defLevel, repLevel, true) + } else { + visit(depth + 1, value, defLevel, repLevel, false) + } + } +} diff --git a/src/metadata.js b/src/metadata.js index 390c0e4..0e3dbbc 100644 --- a/src/metadata.js +++ b/src/metadata.js @@ -1,3 +1,4 @@ +import { getSchemaPath } from 'hyparquet/src/schema.js' import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, PageType, ParquetType } from 'hyparquet/src/constants.js' import { serializeTCompactProtocol } from './thrift.js' import { unconvertStatistics } from './unconvert.js' @@ -44,7 +45,10 @@ export function writeMetadata(writer, metadata) { field_9: c.meta_data.data_page_offset, field_10: c.meta_data.index_page_offset, field_11: c.meta_data.dictionary_page_offset, - field_12: c.meta_data.statistics && unconvertStatistics(c.meta_data.statistics, metadata.schema[columnIndex + 1]), + field_12: c.meta_data.statistics && unconvertStatistics( + c.meta_data.statistics, + schemaElement(metadata.schema, c.meta_data.path_in_schema, columnIndex + 1) + ), field_13: c.meta_data.encoding_stats && c.meta_data.encoding_stats.map(es => ({ field_1: PageType.indexOf(es.page_type), field_2: Encoding.indexOf(es.encoding), @@ -91,6 +95,22 @@ export function writeMetadata(writer, metadata) { writer.appendUint32(metadataLength) } +/** + * Resolve schema element for statistics using the stored path. + * + * @param {import('hyparquet').SchemaElement[]} schema + * @param {string[] | undefined} path + * @param {number} fallbackIndex + * @returns {import('hyparquet').SchemaElement} + */ +function schemaElement(schema, path, fallbackIndex) { + if (path?.length) { + const resolved = getSchemaPath(schema, path).at(-1)?.element + if (resolved) return resolved + } + return schema[fallbackIndex] +} + /** * @param {LogicalType | undefined} type * @returns {ThriftObject | undefined} diff --git a/src/types.d.ts b/src/types.d.ts index 0656e36..e7e7402 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -31,6 +31,13 @@ export interface ColumnSource { nullable?: boolean } +export interface ListValues { + values: any[] + definitionLevels: number[] + repetitionLevels: number[] + numNulls: number +} + export interface ColumnEncoder { columnName: string element: SchemaElement diff --git a/test/files/listy.parquet b/test/files/listy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..7f6eef4dfe6f6d97216b8bb667fcdc6145c82afc GIT binary patch literal 1515 zcmaJ>&rcIU6rSDgZcEc>A28LOQ!++K z^o>9W5y5<55||98fce1!U?X5bFiL2g&QDR5hN(i8h$f*|laQ~9(Q`9!+=^g0S@p|; zKmP9RJtE2Bx!vPaaDsk{E@Wtc>rViGL?FHqe~K#ufUDyOinpmT1$uqO0Oo~F3!o}C zH4weT3k#zuzNy!khwsr((RYT*&uJk;gIswgg*&yJ#-WAVF4}EN7!6gdT1_5FmTjkP zVFm0)tLoHUNaYTV{!3+EiOLAIG?_;LJdF@kO&p^1X05J!i8*khJ(zgsH*BOL473}JtJ`4h0v#< z_OLBH(l89kyGAgi&l7PRL{U!a%U#^)MZK{4Lr}j9#%Iwu5hL0qM}NhIxGV!jUlm0O z?>MFq>Zfq7Y{Tkb1occXzDdS!Uu5VG3~8uw-3W3O5x*7nqpSvvfci-$dPEDcT<&T9 zp?5X3Q|Ugoc*SIl?SNr>!dRD?6}HM)g)LXO&ji30zAZDhfg{WZBg}9qeSm)6@lfUp zh6b0JHhE|s356J2!Hz?D{PpcvspfPFv0kI}0I=yaFc)J3zL`U~hxhn=zK(q%-*bxj z{Z5&g2Z(pG&~;&9r@M#io7j0<<5W&!3*a+5<$SMokl$<-_MA?6#nV1Q{vzrlW@VXY z3dO()c3vyNU^>OM{dOUrh_UsfKAw3y1tlmR)aQViU9WCH9@KYGk2=OWR}ajZrBP}v zaa??nm@TgLHj8d<%X+pQ11}H3@qgn3e>gs4hZqSC51JPr-`ItEVSZgaPw;anVoZLh zY-x#T6Ai2RvSYWWOO3{~)oj+g6UTP5Rjs=dbF(wCnV6o0kJo!h3w~x<_|5+T6FV@d literal 0 HcmV?d00001 diff --git a/test/write.list.test.js b/test/write.list.test.js new file mode 100644 index 0000000..f0c9f3b --- /dev/null +++ b/test/write.list.test.js @@ -0,0 +1,78 @@ +import { parquetReadObjects } from 'hyparquet' +import { describe, expect, it } from 'vitest' +import { parquetWriteBuffer } from '../src/index.js' + +describe('parquetWrite lists', () => { + it('writes optional list columns', async () => { + const listy = [ + [1, 2], + null, + [], + [3, null, 4], + [null], + ] + + const buffer = parquetWriteBuffer({ + columnData: [{ name: 'listy', data: listy }], + schema: [ + { name: 'root', num_children: 1 }, + { + name: 'listy', + repetition_type: 'OPTIONAL', + num_children: 1, + converted_type: 'LIST', + }, + { + name: 'list', + repetition_type: 'REPEATED', + num_children: 1, + }, + { + name: 'element', + repetition_type: 'OPTIONAL', + type: 'INT32', + }, + ], + }) + + const rows = await parquetReadObjects({ file: buffer }) + expect(rows).toEqual([ + { listy: [1, 2] }, + { listy: undefined }, + { listy: [] }, + { listy: [3, null, 4] }, + { listy: [null] }, + ]) + }) + + it('throws on null data for required list columns', () => { + /** + * Schema for a required list of required INT32 values. + * @type {import('hyparquet').SchemaElement[]} + */ + const requiredListSchema = [ + { name: 'root', num_children: 1 }, + { + name: 'numbers', + repetition_type: 'REQUIRED', + num_children: 1, + converted_type: 'LIST', + }, + { + name: 'list', + repetition_type: 'REPEATED', + num_children: 1, + }, + { + name: 'element', + repetition_type: 'REQUIRED', + type: 'INT32', + }, + ] + + expect(() => parquetWriteBuffer({ + columnData: [{ name: 'numbers', data: [[420], null] }], + schema: requiredListSchema, + })).toThrow('parquet required value is undefined') + }) +})