Refactor ColumnEncoder to pass around column info

Kenny Daniel 2025-10-20 23:46:33 -07:00
parent 1ba902f0ed
commit 1ac0c70197
6 changed files with 84 additions and 41 deletions
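Note: the heart of this change is collapsing the loose (schemaPath, compressed) parameter pairs into a single ColumnEncoder object threaded through writeColumn, writeDataPageV2, writeDictionaryPage, and writeLevels. A minimal sketch of the call-shape change; the signatures come from the diff below, while the field values ('id', compressed: true) are illustrative:

  // Before: schema path and compression flag passed separately
  writeColumn(writer, schemaPath, values, compressed, stats)

  // After: one object carries the per-column context
  /** @type {ColumnEncoder} */
  const column = { columnName: 'id', element, schemaPath, compressed: true } // element = leaf SchemaElement
  writeColumn(writer, column, values, stats)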

@@ -56,10 +56,10 @@
   },
   "devDependencies": {
     "@babel/eslint-parser": "7.28.4",
-    "@types/node": "24.8.1",
+    "@types/node": "24.9.1",
     "@vitest/coverage-v8": "3.2.4",
     "eslint": "9.38.0",
-    "eslint-plugin-jsdoc": "61.1.4",
+    "eslint-plugin-jsdoc": "61.1.5",
     "typescript": "5.9.3",
     "vitest": "3.2.4"
   }

@@ -1,22 +1,22 @@
-import { unconvert } from './unconvert.js'
-import { writePlain } from './plain.js'
-import { snappyCompress } from './snappy.js'
 import { ByteWriter } from './bytewriter.js'
 import { writeDataPageV2, writePageHeader } from './datapage.js'
+import { writePlain } from './plain.js'
+import { snappyCompress } from './snappy.js'
+import { unconvert } from './unconvert.js'
 /**
  * @param {Writer} writer
- * @param {SchemaElement[]} schemaPath
+ * @param {ColumnEncoder} column
  * @param {DecodedArray} values
- * @param {boolean} compressed
  * @param {boolean} stats
  * @returns {ColumnMetaData}
  */
-export function writeColumn(writer, schemaPath, values, compressed, stats) {
-  const element = schemaPath[schemaPath.length - 1]
-  const { type, type_length } = element
-  if (!type) throw new Error(`column ${element.name} cannot determine type`)
+export function writeColumn(writer, column, values, stats) {
+  const { columnName, element, schemaPath, compressed } = column
+  const { type } = element
+  if (!type) throw new Error(`column ${columnName} cannot determine type`)
   const offsetStart = writer.offset
   const num_values = values.length
   /** @type {Encoding[]} */
   const encodings = []
@@ -42,11 +42,11 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) {
     // write unconverted dictionary page
     const unconverted = unconvert(element, dictionary)
-    writeDictionaryPage(writer, unconverted, type, type_length, compressed)
+    writeDictionaryPage(writer, column, unconverted)
     // write data page with dictionary indexes
     data_page_offset = BigInt(writer.offset)
-    writeDataPageV2(writer, indexes, schemaPath, 'RLE_DICTIONARY', compressed)
+    writeDataPageV2(writer, indexes, column, 'RLE_DICTIONARY')
     encodings.push('RLE_DICTIONARY')
   } else {
     // unconvert values from rich types to simple
@@ -54,7 +54,7 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) {
     // write data page
     const encoding = type === 'BOOLEAN' && values.length > 16 ? 'RLE' : 'PLAIN'
-    writeDataPageV2(writer, values, schemaPath, encoding, compressed)
+    writeDataPageV2(writer, values, column, encoding)
     encodings.push(encoding)
   }
@@ -90,14 +90,15 @@ function useDictionary(values, type) {
 /**
  * @param {Writer} writer
+ * @param {ColumnEncoder} column
  * @param {DecodedArray} dictionary
- * @param {ParquetType} type
- * @param {number | undefined} fixedLength
- * @param {boolean} compressed
  */
-function writeDictionaryPage(writer, dictionary, type, fixedLength, compressed) {
+function writeDictionaryPage(writer, column, dictionary) {
+  const { element, compressed } = column
+  const { type, type_length } = element
+  if (!type) throw new Error(`column ${column.columnName} cannot determine type`)
   const dictionaryPage = new ByteWriter()
-  writePlain(dictionaryPage, dictionary, type, fixedLength)
+  writePlain(dictionaryPage, dictionary, type, type_length)
   // compress dictionary page data
   let compressedDictionaryPage = dictionaryPage
@@ -120,8 +121,8 @@ function writeDictionaryPage(writer, dictionary, type, fixedLength, compressed) {
 }
 /**
- * @import {ColumnMetaData, DecodedArray, Encoding, ParquetType, SchemaElement, Statistics} from 'hyparquet'
- * @import {Writer} from '../src/types.js'
+ * @import {ColumnMetaData, DecodedArray, Encoding, ParquetType, Statistics} from 'hyparquet'
+ * @import {ColumnEncoder, Writer} from '../src/types.js'
  * @param {DecodedArray} values
  * @returns {Statistics}
  */

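Note on the dictionary path above: when useDictionary opts in, the distinct values are written once as a dictionary page and the data page stores only RLE_DICTIONARY indexes into it. A hypothetical illustration of that index encoding (not the library's actual helper):

  // Dictionary encoding in miniature: dedupe values, then store indexes
  const values = ['a', 'b', 'a', 'a']
  const dictionary = [...new Set(values)]                 // ['a', 'b']
  const indexes = values.map(v => dictionary.indexOf(v))  // [0, 1, 0, 0]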
@@ -7,23 +7,22 @@ import { serializeTCompactProtocol } from './thrift.js'
 import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
 /**
  * @import {Writer} from '../src/types.js'
  * @param {Writer} writer
  * @param {DecodedArray} values
- * @param {SchemaElement[]} schemaPath
+ * @param {ColumnEncoder} column
  * @param {import('hyparquet').Encoding} encoding
- * @param {boolean} compressed
  */
-export function writeDataPageV2(writer, values, schemaPath, encoding, compressed) {
-  const { name, type, type_length, repetition_type } = schemaPath[schemaPath.length - 1]
+export function writeDataPageV2(writer, values, column, encoding) {
+  const { columnName, element, compressed } = column
+  const { type, type_length, repetition_type } = element
-  if (!type) throw new Error(`column ${name} cannot determine type`)
-  if (repetition_type === 'REPEATED') throw new Error(`column ${name} repeated types not supported`)
+  if (!type) throw new Error(`column ${columnName} cannot determine type`)
+  if (repetition_type === 'REPEATED') throw new Error(`column ${columnName} repeated types not supported`)
   // write levels to temp buffer
-  const levels = new ByteWriter()
+  const levelWriter = new ByteWriter()
   const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
-    = writeLevels(levels, schemaPath, values)
+    = writeLevels(levelWriter, column, values)
   const nonnull = values.filter(v => v !== null && v !== undefined)
@@ -54,8 +53,8 @@ export function writeDataPageV2(writer, values, schemaPath, encoding, compressed) {
   // write page header
   writePageHeader(writer, {
     type: 'DATA_PAGE_V2',
-    uncompressed_page_size: levels.offset + page.offset,
-    compressed_page_size: levels.offset + compressedPage.offset,
+    uncompressed_page_size: levelWriter.offset + page.offset,
+    compressed_page_size: levelWriter.offset + compressedPage.offset,
     data_page_header_v2: {
       num_values: values.length,
       num_nulls,
@@ -68,7 +67,7 @@ export function writeDataPageV2(writer, values, schemaPath, encoding, compressed) {
   })
   // write levels
-  writer.appendBuffer(levels.getBuffer())
+  writer.appendBuffer(levelWriter.getBuffer())
   // write page data
   writer.appendBuffer(compressedPage.getBuffer())
@@ -111,12 +110,19 @@ export function writePageHeader(writer, header) {
 /**
  * @import {DecodedArray, PageHeader, SchemaElement} from 'hyparquet'
+ * @import {ColumnEncoder, Writer} from '../src/types.js'
  * @param {Writer} writer
- * @param {SchemaElement[]} schemaPath
+ * @param {ColumnEncoder} column
  * @param {DecodedArray} values
- * @returns {{ definition_levels_byte_length: number, repetition_levels_byte_length: number, num_nulls: number}}
+ * @returns {{
+ *   definition_levels_byte_length: number
+ *   repetition_levels_byte_length: number
+ *   num_nulls: number
+ * }}
  */
-function writeLevels(writer, schemaPath, values) {
+function writeLevels(writer, column, values) {
+  const { schemaPath } = column
   let num_nulls = 0
   // TODO: repetition levels

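Note on writeLevels above: Parquet encodes nullability as definition levels. For a flat optional column (max definition level 1), present values encode level 1 and nulls level 0, which is also where num_nulls comes from; the real code derives the maximum via getMaxDefinitionLevel. A minimal sketch under that flat-column assumption:

  // Illustrative only: definition levels for a flat optional column
  const maxDef = 1
  const values = ['x', null, 'y']
  const defLevels = values.map(v => v === null || v === undefined ? 0 : maxDef) // [1, 0, 1]
  const num_nulls = defLevels.filter(d => d < maxDef).length                    // 1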
@@ -1,3 +1,4 @@
+import { getSchemaPath } from 'hyparquet/src/schema.js'
 import { writeColumn } from './column.js'
 import { writeMetadata } from './metadata.js'
@@ -5,7 +6,7 @@ import { writeMetadata } from './metadata.js'
  * ParquetWriter class allows incremental writing of parquet files.
  *
  * @import {ColumnChunk, FileMetaData, KeyValue, RowGroup, SchemaElement} from 'hyparquet'
- * @import {ColumnSource, Writer} from '../src/types.js'
+ * @import {ColumnEncoder, ColumnSource, Writer} from '../src/types.js'
  * @param {object} options
  * @param {Writer} options.writer
  * @param {SchemaElement[]} options.schema
@@ -47,11 +48,39 @@ ParquetWriter.prototype.write = function({ columnData, rowGroupSize = 100000 }) {
     // write columns
     for (let j = 0; j < columnData.length; j++) {
-      const { data } = columnData[j]
-      const schemaPath = [this.schema[0], this.schema[j + 1]]
+      const { name, data } = columnData[j]
       const groupData = data.slice(groupStartIndex, groupStartIndex + groupSize)
+      const schemaTree = getSchemaPath(this.schema, [name])
+      // Dive into the leaf element
+      while (true) {
+        const child = schemaTree[schemaTree.length - 1]
+        if (!child.element.num_children) {
+          break
+        } else if (child.element.num_children === 1) {
+          schemaTree.push(child.children[0])
+        } else {
+          throw new Error(`parquet column ${name} struct unsupported`)
+        }
+      }
+      const schemaPath = schemaTree.map(node => node.element)
+      const element = schemaPath.at(-1)
+      if (!element) throw new Error(`parquet column ${name} missing schema element`)
+      /** @type {ColumnEncoder} */
+      const column = {
+        columnName: name,
+        element,
+        schemaPath,
+        compressed: this.compressed,
+      }
       const file_offset = BigInt(this.writer.offset)
-      const meta_data = writeColumn(this.writer, schemaPath, groupData, this.compressed, this.statistics)
+      const meta_data = writeColumn(
+        this.writer,
+        column,
+        groupData,
+        this.statistics
+      )
       // save column chunk metadata
       columns.push({

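Note on the leaf dive above: getSchemaPath returns the node path from the root to the named column, and the while loop extends it through single-child wrappers until it reaches a leaf (multi-child structs throw). A hypothetical tree showing what the loop produces; the node shape ({ element, children }, element.num_children) is inferred from the fields the loop reads:

  const leaf = { element: { name: 'element', type: 'BYTE_ARRAY' }, children: [] }
  const tags = { element: { name: 'tags', num_children: 1 }, children: [leaf] }
  const root = { element: { name: 'root', num_children: 1 }, children: [tags] }
  const schemaTree = [root, tags] // as if from getSchemaPath(schema, ['tags'])
  while (true) {
    const child = schemaTree[schemaTree.length - 1]
    if (!child.element.num_children) break
    schemaTree.push(child.children[0]) // single-child wrapper: keep diving
  }
  const schemaPath = schemaTree.map(node => node.element) // [root, tags, element]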
src/types.d.ts (vendored, 7 changes)

@@ -31,6 +31,13 @@ export interface ColumnSource {
   nullable?: boolean
 }
+export interface ColumnEncoder {
+  columnName: string
+  element: SchemaElement
+  schemaPath: SchemaElement[]
+  compressed: boolean
+}
 export interface Writer {
   buffer: ArrayBuffer
   view: DataView

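Note on ColumnEncoder above: each consumer destructures only the fields it needs, so a future per-column option means extending one interface rather than every signature. Gathering the destructuring patterns from the hunks above (column being the ColumnEncoder argument in each case):

  const { columnName, element, schemaPath, compressed } = column  // writeColumn
  const { element, compressed } = column                          // writeDictionaryPage
  const { columnName, element, compressed } = column              // writeDataPageV2
  const { schemaPath } = column                                   // writeLevels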
@@ -16,7 +16,7 @@ describe('parquetWrite round-trip', () => {
    const schemaTree = parquetSchema(metadata)
    const columnData = schemaTree.children.map(({ element }) => ({
      name: element.name,
-      data: /** @type {any[]} */ ([]),
+      data: new Array(),
    }))
    for (const row of rows) {
      for (const { name, data } of columnData) {