diff --git a/package.json b/package.json
index 14a3eb2..92c8852 100644
--- a/package.json
+++ b/package.json
@@ -56,10 +56,10 @@
   },
   "devDependencies": {
     "@babel/eslint-parser": "7.28.4",
-    "@types/node": "24.8.1",
+    "@types/node": "24.9.1",
     "@vitest/coverage-v8": "3.2.4",
     "eslint": "9.38.0",
-    "eslint-plugin-jsdoc": "61.1.4",
+    "eslint-plugin-jsdoc": "61.1.5",
     "typescript": "5.9.3",
     "vitest": "3.2.4"
   }
diff --git a/src/column.js b/src/column.js
index a781290..1d97545 100644
--- a/src/column.js
+++ b/src/column.js
@@ -1,22 +1,22 @@
-import { unconvert } from './unconvert.js'
-import { writePlain } from './plain.js'
-import { snappyCompress } from './snappy.js'
 import { ByteWriter } from './bytewriter.js'
 import { writeDataPageV2, writePageHeader } from './datapage.js'
+import { writePlain } from './plain.js'
+import { snappyCompress } from './snappy.js'
+import { unconvert } from './unconvert.js'
 
 /**
  * @param {Writer} writer
- * @param {SchemaElement[]} schemaPath
+ * @param {ColumnEncoder} column
  * @param {DecodedArray} values
- * @param {boolean} compressed
  * @param {boolean} stats
  * @returns {ColumnMetaData}
  */
-export function writeColumn(writer, schemaPath, values, compressed, stats) {
-  const element = schemaPath[schemaPath.length - 1]
-  const { type, type_length } = element
-  if (!type) throw new Error(`column ${element.name} cannot determine type`)
+export function writeColumn(writer, column, values, stats) {
+  const { columnName, element, schemaPath, compressed } = column
+  const { type } = element
+  if (!type) throw new Error(`column ${columnName} cannot determine type`)
   const offsetStart = writer.offset
+  const num_values = values.length
 
   /** @type {Encoding[]} */
   const encodings = []
@@ -42,11 +42,11 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) {
 
     // write unconverted dictionary page
     const unconverted = unconvert(element, dictionary)
-    writeDictionaryPage(writer, unconverted, type, type_length, compressed)
+    writeDictionaryPage(writer, column, unconverted)
 
     // write data page with dictionary indexes
    data_page_offset = BigInt(writer.offset)
-    writeDataPageV2(writer, indexes, schemaPath, 'RLE_DICTIONARY', compressed)
+    writeDataPageV2(writer, indexes, column, 'RLE_DICTIONARY')
     encodings.push('RLE_DICTIONARY')
   } else {
     // unconvert values from rich types to simple
@@ -54,7 +54,7 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) {
 
     // write data page
     const encoding = type === 'BOOLEAN' && values.length > 16 ? 'RLE' : 'PLAIN'
-    writeDataPageV2(writer, values, schemaPath, encoding, compressed)
+    writeDataPageV2(writer, values, column, encoding)
     encodings.push(encoding)
   }
 
@@ -90,14 +90,15 @@ function useDictionary(values, type) {
 
 /**
  * @param {Writer} writer
+ * @param {ColumnEncoder} column
  * @param {DecodedArray} dictionary
- * @param {ParquetType} type
- * @param {number | undefined} fixedLength
- * @param {boolean} compressed
  */
-function writeDictionaryPage(writer, dictionary, type, fixedLength, compressed) {
+function writeDictionaryPage(writer, column, dictionary) {
+  const { element, compressed } = column
+  const { type, type_length } = element
+  if (!type) throw new Error(`column ${column.columnName} cannot determine type`)
   const dictionaryPage = new ByteWriter()
-  writePlain(dictionaryPage, dictionary, type, fixedLength)
+  writePlain(dictionaryPage, dictionary, type, type_length)
 
   // compress dictionary page data
   let compressedDictionaryPage = dictionaryPage
@@ -120,8 +121,8 @@ function writeDictionaryPage(writer, dictionary, type, fixedLength, compressed)
 }
 
 /**
- * @import {ColumnMetaData, DecodedArray, Encoding, ParquetType, SchemaElement, Statistics} from 'hyparquet'
- * @import {Writer} from '../src/types.js'
+ * @import {ColumnMetaData, DecodedArray, Encoding, ParquetType, Statistics} from 'hyparquet'
+ * @import {ColumnEncoder, Writer} from '../src/types.js'
  * @param {DecodedArray} values
  * @returns {Statistics}
  */
diff --git a/src/datapage.js b/src/datapage.js
index bb4704d..c1464e4 100644
--- a/src/datapage.js
+++ b/src/datapage.js
@@ -7,23 +7,22 @@ import { serializeTCompactProtocol } from './thrift.js'
 import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
 
 /**
- * @import {Writer} from '../src/types.js'
  * @param {Writer} writer
  * @param {DecodedArray} values
- * @param {SchemaElement[]} schemaPath
+ * @param {ColumnEncoder} column
  * @param {import('hyparquet').Encoding} encoding
- * @param {boolean} compressed
 */
-export function writeDataPageV2(writer, values, schemaPath, encoding, compressed) {
-  const { name, type, type_length, repetition_type } = schemaPath[schemaPath.length - 1]
+export function writeDataPageV2(writer, values, column, encoding) {
+  const { columnName, element, compressed } = column
+  const { type, type_length, repetition_type } = element
 
-  if (!type) throw new Error(`column ${name} cannot determine type`)
-  if (repetition_type === 'REPEATED') throw new Error(`column ${name} repeated types not supported`)
+  if (!type) throw new Error(`column ${columnName} cannot determine type`)
+  if (repetition_type === 'REPEATED') throw new Error(`column ${columnName} repeated types not supported`)
 
   // write levels to temp buffer
-  const levels = new ByteWriter()
+  const levelWriter = new ByteWriter()
   const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
-    = writeLevels(levels, schemaPath, values)
+    = writeLevels(levelWriter, column, values)
 
   const nonnull = values.filter(v => v !== null && v !== undefined)
 
@@ -54,8 +53,8 @@ export function writeDataPageV2(writer, values, schemaPath, encoding, compressed
   // write page header
   writePageHeader(writer, {
     type: 'DATA_PAGE_V2',
-    uncompressed_page_size: levels.offset + page.offset,
-    compressed_page_size: levels.offset + compressedPage.offset,
+    uncompressed_page_size: levelWriter.offset + page.offset,
+    compressed_page_size: levelWriter.offset + compressedPage.offset,
     data_page_header_v2: {
       num_values: values.length,
       num_nulls,
@@ -68,7 +67,7 @@ export function writeDataPageV2(writer, values, schemaPath, encoding, compressed
   })
 
   // write levels
-  writer.appendBuffer(levels.getBuffer())
+  writer.appendBuffer(levelWriter.getBuffer())
 
   // write page data
   writer.appendBuffer(compressedPage.getBuffer())
@@ -111,12 +110,19 @@ export function writePageHeader(writer, header) {
 
 /**
  * @import {DecodedArray, PageHeader, SchemaElement} from 'hyparquet'
+ * @import {ColumnEncoder, Writer} from '../src/types.js'
  * @param {Writer} writer
- * @param {SchemaElement[]} schemaPath
+ * @param {ColumnEncoder} column
  * @param {DecodedArray} values
- * @returns {{ definition_levels_byte_length: number, repetition_levels_byte_length: number, num_nulls: number}}
+ * @returns {{
+ *   definition_levels_byte_length: number
+ *   repetition_levels_byte_length: number
+ *   num_nulls: number
+ * }}
 */
-function writeLevels(writer, schemaPath, values) {
+function writeLevels(writer, column, values) {
+  const { schemaPath } = column
+
   let num_nulls = 0
 
   // TODO: repetition levels
diff --git a/src/parquet-writer.js b/src/parquet-writer.js
index 4c6ff20..64da80d 100644
--- a/src/parquet-writer.js
+++ b/src/parquet-writer.js
@@ -1,3 +1,4 @@
+import { getSchemaPath } from 'hyparquet/src/schema.js'
 import { writeColumn } from './column.js'
 import { writeMetadata } from './metadata.js'
 
@@ -5,7 +6,7 @@ import { writeMetadata } from './metadata.js'
  * ParquetWriter class allows incremental writing of parquet files.
 *
  * @import {ColumnChunk, FileMetaData, KeyValue, RowGroup, SchemaElement} from 'hyparquet'
- * @import {ColumnSource, Writer} from '../src/types.js'
+ * @import {ColumnEncoder, ColumnSource, Writer} from '../src/types.js'
  * @param {object} options
  * @param {Writer} options.writer
  * @param {SchemaElement[]} options.schema
@@ -47,11 +48,39 @@ ParquetWriter.prototype.write = function({ columnData, rowGroupSize = 100000 })
 
   // write columns
   for (let j = 0; j < columnData.length; j++) {
-    const { data } = columnData[j]
-    const schemaPath = [this.schema[0], this.schema[j + 1]]
+    const { name, data } = columnData[j]
     const groupData = data.slice(groupStartIndex, groupStartIndex + groupSize)
+
+    const schemaTree = getSchemaPath(this.schema, [name])
+    // Dive into the leaf element
+    while (true) {
+      const child = schemaTree[schemaTree.length - 1]
+      if (!child.element.num_children) {
+        break
+      } else if (child.element.num_children === 1) {
+        schemaTree.push(child.children[0])
+      } else {
+        throw new Error(`parquet column ${name} struct unsupported`)
+      }
+    }
+    const schemaPath = schemaTree.map(node => node.element)
+    const element = schemaPath.at(-1)
+    if (!element) throw new Error(`parquet column ${name} missing schema element`)
+    /** @type {ColumnEncoder} */
+    const column = {
+      columnName: name,
+      element,
+      schemaPath,
+      compressed: this.compressed,
+    }
+
     const file_offset = BigInt(this.writer.offset)
-    const meta_data = writeColumn(this.writer, schemaPath, groupData, this.compressed, this.statistics)
+    const meta_data = writeColumn(
+      this.writer,
+      column,
+      groupData,
+      this.statistics
+    )
 
     // save column chunk metadata
     columns.push({
diff --git a/src/types.d.ts b/src/types.d.ts
index f5a168e..0656e36 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -31,6 +31,13 @@ export interface ColumnSource {
   nullable?: boolean
 }
 
+export interface ColumnEncoder {
+  columnName: string
+  element: SchemaElement
+  schemaPath: SchemaElement[]
+  compressed: boolean
+}
+
 export interface Writer {
   buffer: ArrayBuffer
   view: DataView
diff --git a/test/write.roundtrip.test.js b/test/write.roundtrip.test.js
index f79b40f..a6b4c4f 100644
--- a/test/write.roundtrip.test.js
+++ b/test/write.roundtrip.test.js
@@ -16,7 +16,7 @@ describe('parquetWrite round-trip', () => {
     const schemaTree = parquetSchema(metadata)
     const columnData = schemaTree.children.map(({ element }) => ({
       name: element.name,
-      data: /** @type {any[]} */ ([]),
+      data: new Array(),
     }))
     for (const row of rows) {
       for (const { name, data } of columnData) {
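
Usage note: a minimal sketch of driving the new writeColumn signature directly, assuming a Writer, a root-to-leaf schemaPath, and column values are already in hand. The ColumnEncoder fields follow src/types.d.ts above; the column name 'id' is a placeholder, not taken from the patch:

    import { writeColumn } from './column.js'

    /** @type {import('./types.js').ColumnEncoder} */
    const column = {
      columnName: 'id',                            // placeholder leaf column name
      element: schemaPath[schemaPath.length - 1],  // leaf SchemaElement (must have a type)
      schemaPath,                                  // SchemaElement[] from root to leaf
      compressed: true,                            // snappy-compress pages
    }
    // final argument is the stats flag; true computes column statistics
    const metaData = writeColumn(writer, column, values, true)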