Dremel list encoding

This commit is contained in:
Kenny Daniel 2025-10-21 00:37:26 -07:00
parent 1ac0c70197
commit e64e66abaa
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
7 changed files with 292 additions and 25 deletions

@ -1,5 +1,6 @@
import { ByteWriter } from './bytewriter.js'
import { writeDataPageV2, writePageHeader } from './datapage.js'
import { encodeListValues } from './dremel.js'
import { writePlain } from './plain.js'
import { snappyCompress } from './snappy.js'
import { unconvert } from './unconvert.js'
@ -17,6 +18,16 @@ export function writeColumn(writer, column, values, stats) {
if (!type) throw new Error(`column ${columnName} cannot determine type`)
const offsetStart = writer.offset
/** @type {ListValues | undefined} */
let listValues
if (isListLike(schemaPath)) {
if (!Array.isArray(values)) {
throw new Error(`parquet column ${columnName} expects array values for list encoding`)
}
listValues = encodeListValues(schemaPath, values)
values = listValues.values
}
const num_values = values.length
/** @type {Encoding[]} */
const encodings = []
@ -46,7 +57,7 @@ export function writeColumn(writer, column, values, stats) {
// write data page with dictionary indexes
data_page_offset = BigInt(writer.offset)
writeDataPageV2(writer, indexes, column, 'RLE_DICTIONARY')
writeDataPageV2(writer, indexes, column, 'RLE_DICTIONARY', listValues)
encodings.push('RLE_DICTIONARY')
} else {
// unconvert values from rich types to simple
@ -54,7 +65,7 @@ export function writeColumn(writer, column, values, stats) {
// write data page
const encoding = type === 'BOOLEAN' && values.length > 16 ? 'RLE' : 'PLAIN'
writeDataPageV2(writer, values, column, encoding)
writeDataPageV2(writer, values, column, encoding, listValues)
encodings.push(encoding)
}
@ -121,8 +132,8 @@ function writeDictionaryPage(writer, column, dictionary) {
}
/**
* @import {ColumnMetaData, DecodedArray, Encoding, ParquetType, Statistics} from 'hyparquet'
* @import {ColumnEncoder, Writer} from '../src/types.js'
* @import {ColumnMetaData, DecodedArray, Encoding, ParquetType, SchemaElement, Statistics} from 'hyparquet'
* @import {ColumnEncoder, ListValues, Writer} from '../src/types.js'
* @param {DecodedArray} values
* @returns {Statistics}
*/
@ -144,3 +155,18 @@ function getStatistics(values) {
}
return { min_value, max_value, null_count }
}
/**
* @param {SchemaElement[]} schemaPath
* @returns {boolean}
*/
function isListLike(schemaPath) {
for (let i = 1; i < schemaPath.length; i++) {
const element = schemaPath[i]
if (element?.converted_type === 'LIST') {
const repeatedChild = schemaPath[i + 1]
return repeatedChild?.repetition_type === 'REPEATED'
}
}
return false
}

@ -11,8 +11,9 @@ import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
* @param {DecodedArray} values
* @param {ColumnEncoder} column
* @param {import('hyparquet').Encoding} encoding
* @param {ListValues} [listValues]
*/
export function writeDataPageV2(writer, values, column, encoding) {
export function writeDataPageV2(writer, values, column, encoding, listValues) {
const { columnName, element, compressed } = column
const { type, type_length, repetition_type } = element
@ -21,8 +22,12 @@ export function writeDataPageV2(writer, values, column, encoding) {
// write levels to temp buffer
const levelWriter = new ByteWriter()
const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
= writeLevels(levelWriter, column, values)
const {
definition_levels_byte_length,
repetition_levels_byte_length,
num_nulls,
num_values,
} = writeLevels(levelWriter, column, values, listValues)
const nonnull = values.filter(v => v !== null && v !== undefined)
@ -56,7 +61,7 @@ export function writeDataPageV2(writer, values, column, encoding) {
uncompressed_page_size: levelWriter.offset + page.offset,
compressed_page_size: levelWriter.offset + compressedPage.offset,
data_page_header_v2: {
num_values: values.length,
num_values,
num_nulls,
num_rows: values.length,
encoding,
@ -110,43 +115,63 @@ export function writePageHeader(writer, header) {
/**
* @import {DecodedArray, PageHeader, SchemaElement} from 'hyparquet'
* @import {ColumnEncoder, Writer} from '../src/types.js'
* @import {ColumnEncoder, ListValues, Writer} from '../src/types.js'
* @param {Writer} writer
* @param {ColumnEncoder} column
* @param {DecodedArray} values
* @param {ListValues} [listValues]
* @returns {{
* definition_levels_byte_length: number
* repetition_levels_byte_length: number
* num_nulls: number
* num_values: number
* }}
*/
function writeLevels(writer, column, values) {
function writeLevels(writer, column, values, listValues) {
const { schemaPath } = column
const definitionLevels = listValues?.definitionLevels
const repetitionLevels = listValues?.repetitionLevels
let num_nulls = 0
let num_nulls = listValues?.numNulls ?? 0
let num_values = definitionLevels?.length ?? values.length
// TODO: repetition levels
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
let repetition_levels_byte_length = 0
if (maxRepetitionLevel) {
repetition_levels_byte_length = writeRleBitPackedHybrid(writer, [], 0)
const bitWidth = Math.ceil(Math.log2(maxRepetitionLevel + 1))
const reps = repetitionLevels ?? []
repetition_levels_byte_length = writeRleBitPackedHybrid(writer, reps, bitWidth)
}
// definition levels
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
let definition_levels_byte_length = 0
if (maxDefinitionLevel) {
const definitionLevels = []
for (const value of values) {
if (value === null || value === undefined) {
definitionLevels.push(maxDefinitionLevel - 1)
num_nulls++
} else {
definitionLevels.push(maxDefinitionLevel)
}
}
const bitWidth = Math.ceil(Math.log2(maxDefinitionLevel + 1))
definition_levels_byte_length = writeRleBitPackedHybrid(writer, definitionLevels, bitWidth)
const defs = definitionLevels ?? (() => {
const generated = []
for (const value of values) {
if (value === null || value === undefined) {
generated.push(maxDefinitionLevel - 1)
num_nulls++
} else {
generated.push(maxDefinitionLevel)
}
}
num_values = generated.length
return generated
})()
if (definitionLevels && listValues === undefined) {
num_nulls = definitionLevels.reduce(
(count, def) => def === maxDefinitionLevel ? count : count + 1,
0
)
}
definition_levels_byte_length = writeRleBitPackedHybrid(writer, defs, bitWidth)
} else {
num_nulls = values.filter(value => value === null || value === undefined).length
}
return { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
return { definition_levels_byte_length, repetition_levels_byte_length, num_nulls, num_values }
}

111
src/dremel.js Normal file

@ -0,0 +1,111 @@
import { getMaxDefinitionLevel } from './schema.js'
/**
* Encode nested list values into repetition and definition levels.
*
* @import {SchemaElement} from 'hyparquet'
* @import {ListValues} from '../src/types.js'
* @param {SchemaElement[]} schemaPath schema elements from root to leaf
* @param {any[]} rows column data for the current row group
* @returns {ListValues}
*/
export function encodeListValues(schemaPath, rows) {
if (schemaPath.length < 2) throw new Error('parquet list schema path must include column')
/** @type {any[]} */
const values = []
/** @type {number[]} */
const definitionLevels = []
/** @type {number[]} */
const repetitionLevels = []
// Track repetition depth prior to each level
const repLevelPrior = new Array(schemaPath.length)
let repeatedCount = 0
for (let i = 0; i < schemaPath.length; i++) {
repLevelPrior[i] = repeatedCount
if (schemaPath[i].repetition_type === 'REPEATED') repeatedCount++
}
const leafIndex = schemaPath.length - 1
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
for (let row = 0; row < rows.length; row++) {
visit(1, rows[row], 0, 0, false)
}
const numNulls = definitionLevels.reduce(
(count, def) => def === maxDefinitionLevel ? count : count + 1,
0
)
return { values, definitionLevels, repetitionLevels, numNulls }
/**
* Recursively walk the schema path, emitting definition/repetition pairs.
*
* @param {number} depth index into schemaPath
* @param {any} value value at the current depth
* @param {number} defLevel definition level accumulated so far
* @param {number} repLevel repetition level for the next emitted slot
* @param {boolean} allowNull whether the current value is allowed to be null
*/
function visit(depth, value, defLevel, repLevel, allowNull) {
const element = schemaPath[depth]
const repetition = element.repetition_type || 'REQUIRED'
const isLeaf = depth === leafIndex
if (isLeaf) {
if (value === null || value === undefined) {
if (repetition === 'REQUIRED' && !allowNull) {
throw new Error('parquet required value is undefined')
}
definitionLevels.push(defLevel)
repetitionLevels.push(repLevel)
values.push(null)
} else {
const finalDef = repetition === 'REQUIRED' ? defLevel : defLevel + 1
definitionLevels.push(finalDef)
repetitionLevels.push(repLevel)
values.push(value)
}
return
}
if (repetition === 'REPEATED') {
if (value === null || value === undefined) {
if (!allowNull) throw new Error('parquet required value is undefined')
visit(depth + 1, undefined, defLevel, repLevel, true)
return
}
if (!Array.isArray(value)) {
throw new Error(`parquet repeated field ${element.name} must be an array`)
}
if (!value.length) {
visit(depth + 1, undefined, defLevel, repLevel, true)
return
}
for (let i = 0; i < value.length; i++) {
const childRep = i === 0 ? repLevel : repLevelPrior[depth] + 1
visit(depth + 1, value[i], defLevel + 1, childRep, false)
}
return
}
if (repetition === 'OPTIONAL') {
if (value === null || value === undefined) {
visit(depth + 1, undefined, defLevel, repLevel, true)
} else {
visit(depth + 1, value, defLevel + 1, repLevel, false)
}
return
}
// REQUIRED
if (value === null || value === undefined) {
if (!allowNull) throw new Error('parquet required value is undefined')
visit(depth + 1, undefined, defLevel, repLevel, true)
} else {
visit(depth + 1, value, defLevel, repLevel, false)
}
}
}

@ -1,3 +1,4 @@
import { getSchemaPath } from 'hyparquet/src/schema.js'
import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, PageType, ParquetType } from 'hyparquet/src/constants.js'
import { serializeTCompactProtocol } from './thrift.js'
import { unconvertStatistics } from './unconvert.js'
@ -44,7 +45,10 @@ export function writeMetadata(writer, metadata) {
field_9: c.meta_data.data_page_offset,
field_10: c.meta_data.index_page_offset,
field_11: c.meta_data.dictionary_page_offset,
field_12: c.meta_data.statistics && unconvertStatistics(c.meta_data.statistics, metadata.schema[columnIndex + 1]),
field_12: c.meta_data.statistics && unconvertStatistics(
c.meta_data.statistics,
schemaElement(metadata.schema, c.meta_data.path_in_schema, columnIndex + 1)
),
field_13: c.meta_data.encoding_stats && c.meta_data.encoding_stats.map(es => ({
field_1: PageType.indexOf(es.page_type),
field_2: Encoding.indexOf(es.encoding),
@ -91,6 +95,22 @@ export function writeMetadata(writer, metadata) {
writer.appendUint32(metadataLength)
}
/**
* Resolve schema element for statistics using the stored path.
*
* @param {import('hyparquet').SchemaElement[]} schema
* @param {string[] | undefined} path
* @param {number} fallbackIndex
* @returns {import('hyparquet').SchemaElement}
*/
function schemaElement(schema, path, fallbackIndex) {
if (path?.length) {
const resolved = getSchemaPath(schema, path).at(-1)?.element
if (resolved) return resolved
}
return schema[fallbackIndex]
}
/**
* @param {LogicalType | undefined} type
* @returns {ThriftObject | undefined}

7
src/types.d.ts vendored

@ -31,6 +31,13 @@ export interface ColumnSource {
nullable?: boolean
}
export interface ListValues {
values: any[]
definitionLevels: number[]
repetitionLevels: number[]
numNulls: number
}
export interface ColumnEncoder {
columnName: string
element: SchemaElement

BIN
test/files/listy.parquet Normal file

Binary file not shown.

78
test/write.list.test.js Normal file

@ -0,0 +1,78 @@
import { parquetReadObjects } from 'hyparquet'
import { describe, expect, it } from 'vitest'
import { parquetWriteBuffer } from '../src/index.js'
describe('parquetWrite lists', () => {
it('writes optional list columns', async () => {
const listy = [
[1, 2],
null,
[],
[3, null, 4],
[null],
]
const buffer = parquetWriteBuffer({
columnData: [{ name: 'listy', data: listy }],
schema: [
{ name: 'root', num_children: 1 },
{
name: 'listy',
repetition_type: 'OPTIONAL',
num_children: 1,
converted_type: 'LIST',
},
{
name: 'list',
repetition_type: 'REPEATED',
num_children: 1,
},
{
name: 'element',
repetition_type: 'OPTIONAL',
type: 'INT32',
},
],
})
const rows = await parquetReadObjects({ file: buffer })
expect(rows).toEqual([
{ listy: [1, 2] },
{ listy: undefined },
{ listy: [] },
{ listy: [3, null, 4] },
{ listy: [null] },
])
})
it('throws on null data for required list columns', () => {
/**
* Schema for a required list of required INT32 values.
* @type {import('hyparquet').SchemaElement[]}
*/
const requiredListSchema = [
{ name: 'root', num_children: 1 },
{
name: 'numbers',
repetition_type: 'REQUIRED',
num_children: 1,
converted_type: 'LIST',
},
{
name: 'list',
repetition_type: 'REPEATED',
num_children: 1,
},
{
name: 'element',
repetition_type: 'REQUIRED',
type: 'INT32',
},
]
expect(() => parquetWriteBuffer({
columnData: [{ name: 'numbers', data: [[420], null] }],
schema: requiredListSchema,
})).toThrow('parquet required value is undefined')
})
})