From c30f8fa8eba21142930a789c3a701965c61a8045 Mon Sep 17 00:00:00 2001
From: Kenny Daniel
Date: Wed, 26 Nov 2025 02:42:39 -0800
Subject: [PATCH] User encoding options

---
 src/column.js             | 13 +++---
 src/datapage.js           | 22 +++++++++-
 src/parquet-writer.js     |  3 +-
 src/types.d.ts            |  1 +
 test/write.buffer.test.js | 15 +++++++
 test/write.delta.test.js  | 85 +++++++++++++++++++++++++++++++++++++++
 6 files changed, 130 insertions(+), 9 deletions(-)
 create mode 100644 test/write.delta.test.js

diff --git a/src/column.js b/src/column.js
index 5cecea3..99502a1 100644
--- a/src/column.js
+++ b/src/column.js
@@ -14,7 +14,7 @@ import { unconvert } from './unconvert.js'
  * @returns {ColumnChunk}
  */
 export function writeColumn(writer, column, values, stats) {
-  const { columnName, element, schemaPath, compressed } = column
+  const { columnName, element, schemaPath, compressed, encoding: userEncoding } = column
   const { type } = element
   if (!type) throw new Error(`column ${columnName} cannot determine type`)
   const offsetStart = writer.offset
@@ -42,8 +42,7 @@ export function writeColumn(writer, column, values, stats) {
   // dictionary encoding
   let dictionary_page_offset
   let data_page_offset = BigInt(writer.offset)
-  /** @type {DecodedArray | undefined} */
-  const dictionary = useDictionary(values, type)
+  const dictionary = useDictionary(values, type, userEncoding)
 
   if (dictionary) {
     dictionary_page_offset = BigInt(writer.offset)
@@ -68,7 +67,7 @@ export function writeColumn(writer, column, values, stats) {
     values = unconvert(element, values)
 
     // write data page
-    const encoding = type === 'BOOLEAN' && values.length > 16 ? 'RLE' : 'PLAIN'
+    const encoding = userEncoding ?? (type === 'BOOLEAN' && values.length > 16 ? 'RLE' : 'PLAIN')
     writeDataPageV2(writer, values, column, encoding, pageData)
     encodings.push(encoding)
   }
@@ -94,9 +93,11 @@ export function writeColumn(writer, column, values, stats) {
 /**
  * @param {DecodedArray} values
  * @param {ParquetType} type
+ * @param {Encoding | undefined} encoding
  * @returns {any[] | undefined}
  */
-function useDictionary(values, type) {
+function useDictionary(values, type, encoding) {
+  if (encoding && encoding !== 'RLE_DICTIONARY') return
   if (type === 'BOOLEAN') return
   const unique = new Set(values)
   unique.delete(undefined)
@@ -140,7 +141,7 @@ function writeDictionaryPage(writer, column, dictionary) {
 }
 
 /**
- * @import {ColumnChunk, ColumnMetaData, DecodedArray, Encoding, ParquetType, SchemaElement, Statistics} from 'hyparquet'
+ * @import {ColumnChunk, DecodedArray, Encoding, ParquetType, SchemaElement, Statistics} from 'hyparquet'
  * @import {ColumnEncoder, PageData, Writer} from '../src/types.js'
  * @param {DecodedArray} values
  * @returns {Statistics}
diff --git a/src/datapage.js b/src/datapage.js
index b9dca2f..7069eed 100644
--- a/src/datapage.js
+++ b/src/datapage.js
@@ -1,5 +1,6 @@
 import { Encodings, PageTypes } from 'hyparquet/src/constants.js'
 import { ByteWriter } from './bytewriter.js'
+import { deltaBinaryPack, deltaByteArray, deltaLengthByteArray } from './delta.js'
 import { writeRleBitPackedHybrid } from './encoding.js'
 import { writePlain } from './plain.js'
 import { snappyCompress } from './snappy.js'
@@ -33,7 +34,9 @@ export function writeDataPageV2(writer, values, column, encoding, listValues) {
 
   // write page data to temp buffer
   const page = new ByteWriter()
-  if (encoding === 'RLE') {
+  if (encoding === 'PLAIN') {
+    writePlain(page, nonnull, type, type_length)
+  } else if (encoding === 'RLE') {
     if (type !== 'BOOLEAN') throw new Error('RLE encoding only supported for BOOLEAN type')
     page.appendUint32(nonnull.length) // prepend length
     writeRleBitPackedHybrid(page, nonnull, 1)
@@ -44,8 +47,23 @@
     const bitWidth = Math.ceil(Math.log2(maxValue + 1))
     page.appendUint8(bitWidth) // prepend bitWidth
     writeRleBitPackedHybrid(page, nonnull, bitWidth)
+  } else if (encoding === 'DELTA_BINARY_PACKED') {
+    if (type !== 'INT32' && type !== 'INT64') {
+      throw new Error('DELTA_BINARY_PACKED encoding only supported for INT32 and INT64 types')
+    }
+    deltaBinaryPack(page, nonnull)
+  } else if (encoding === 'DELTA_LENGTH_BYTE_ARRAY') {
+    if (type !== 'BYTE_ARRAY') {
+      throw new Error('DELTA_LENGTH_BYTE_ARRAY encoding only supported for BYTE_ARRAY type')
+    }
+    deltaLengthByteArray(page, nonnull)
+  } else if (encoding === 'DELTA_BYTE_ARRAY') {
+    if (type !== 'BYTE_ARRAY') {
+      throw new Error('DELTA_BYTE_ARRAY encoding only supported for BYTE_ARRAY type')
+    }
+    deltaByteArray(page, nonnull)
   } else {
-    writePlain(page, nonnull, type, type_length)
+    throw new Error(`parquet unsupported encoding: ${encoding}`)
   }
 
   // compress page data
diff --git a/src/parquet-writer.js b/src/parquet-writer.js
index c940522..fb241af 100644
--- a/src/parquet-writer.js
+++ b/src/parquet-writer.js
@@ -48,7 +48,7 @@ ParquetWriter.prototype.write = function({ columnData, rowGroupSize = 100000 })
 
   // write columns
   for (let j = 0; j < columnData.length; j++) {
-    const { name, data } = columnData[j]
+    const { name, data, encoding } = columnData[j]
     const groupData = data.slice(groupStartIndex, groupStartIndex + groupSize)
 
     const schemaTree = getSchemaPath(this.schema, [name])
@@ -72,6 +72,7 @@ ParquetWriter.prototype.write = function({ columnData, rowGroupSize = 100000 })
       element,
      schemaPath,
       compressed: this.compressed,
+      encoding,
     }
 
     const columnChunk = writeColumn(
diff --git a/src/types.d.ts b/src/types.d.ts
index adbfd32..9cdc239 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -46,6 +46,7 @@ export interface ColumnEncoder {
   element: SchemaElement
   schemaPath: SchemaElement[]
   compressed: boolean
+  encoding?: Encoding // user-specified encoding
 }
 
 export interface Writer {
diff --git a/test/write.buffer.test.js b/test/write.buffer.test.js
index 0099307..a8ad221 100644
--- a/test/write.buffer.test.js
+++ b/test/write.buffer.test.js
@@ -321,4 +321,19 @@ describe('parquetWriteBuffer', () => {
     expect(() => parquetWriteBuffer({ columnData: [{ name: 'func', data: [() => {}] }] }))
       .toThrow('cannot determine parquet type for: () => {}')
   })
+
+  it('skips dictionary encoding when encoding is specified', async () => {
+    // This data would normally use dictionary encoding due to low cardinality
+    const data = Array(1000).fill(1).map((_, i) => i % 10)
+    const file = parquetWriteBuffer({ columnData: [{ name: 'int', data, encoding: 'PLAIN' }] })
+    const metadata = parquetMetadata(file)
+    expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['PLAIN'])
+    const result = await parquetReadObjects({ file })
+    expect(result).toEqual(data.map(int => ({ int })))
+  })
+
+  it('throws for BYTE_STREAM_SPLIT encoding', () => {
+    expect(() => parquetWriteBuffer({ columnData: [{ name: 'float', data: [1.0, 2.0, 3.0], encoding: 'BYTE_STREAM_SPLIT' }] }))
+      .toThrow('parquet unsupported encoding: BYTE_STREAM_SPLIT')
+  })
 })
diff --git a/test/write.delta.test.js b/test/write.delta.test.js
new file mode 100644
index 0000000..564c540
--- /dev/null
+++ b/test/write.delta.test.js
@@ -0,0 +1,85 @@
+import { parquetMetadata, parquetReadObjects } from 'hyparquet'
+import { describe, expect, it } from 'vitest'
+import { parquetWriteBuffer } from '../src/index.js'
+
+describe('DELTA_BINARY_PACKED encoding', () => {
+  it('writes DELTA_BINARY_PACKED encoding for INT32', async () => {
+    const data = [1, 2, 3, 100, 200, 300]
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'int', data, encoding: 'DELTA_BINARY_PACKED' }],
+    })
+    const metadata = parquetMetadata(file)
+    expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['DELTA_BINARY_PACKED'])
+    const result = await parquetReadObjects({ file })
+    expect(result).toEqual(data.map(int => ({ int })))
+  })
+
+  it('writes DELTA_BINARY_PACKED encoding for INT64', async () => {
+    const data = [1n, 2n, 3n, 100n, 200n, 300n]
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'bigint', data, encoding: 'DELTA_BINARY_PACKED' }],
+    })
+    const metadata = parquetMetadata(file)
+    expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['DELTA_BINARY_PACKED'])
+    const result = await parquetReadObjects({ file })
+    expect(result).toEqual(data.map(bigint => ({ bigint })))
+  })
+})
+
+describe('DELTA_LENGTH_BYTE_ARRAY encoding', () => {
+  it('writes DELTA_LENGTH_BYTE_ARRAY encoding for strings', async () => {
+    const data = ['hello', 'world', 'foo', 'bar', 'baz', 'qux']
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'string', data, encoding: 'DELTA_LENGTH_BYTE_ARRAY' }],
+    })
+    const metadata = parquetMetadata(file)
+    expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['DELTA_LENGTH_BYTE_ARRAY'])
+    const result = await parquetReadObjects({ file })
+    expect(result).toEqual(data.map(string => ({ string })))
+  })
+
+  it('writes DELTA_LENGTH_BYTE_ARRAY encoding for byte arrays', async () => {
+    const data = [
+      Uint8Array.of(1, 2, 3),
+      Uint8Array.of(4, 5, 6, 7),
+      Uint8Array.of(8, 9),
+      Uint8Array.of(10, 11, 12, 13, 14),
+    ]
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'bytes', data, encoding: 'DELTA_LENGTH_BYTE_ARRAY' }],
+    })
+    const metadata = parquetMetadata(file)
+    expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['DELTA_LENGTH_BYTE_ARRAY'])
+    const result = await parquetReadObjects({ file, utf8: false })
+    expect(result).toEqual(data.map(bytes => ({ bytes })))
+  })
+})
+
+describe('DELTA_BYTE_ARRAY encoding', () => {
+  it('writes DELTA_BYTE_ARRAY encoding for strings with common prefixes', async () => {
+    const data = ['apple', 'application', 'apply', 'banana', 'band', 'bandana']
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'string', data, encoding: 'DELTA_BYTE_ARRAY' }],
+    })
+    const metadata = parquetMetadata(file)
+    expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['DELTA_BYTE_ARRAY'])
+    const result = await parquetReadObjects({ file })
+    expect(result).toEqual(data.map(string => ({ string })))
+  })
+
+  it('writes DELTA_BYTE_ARRAY encoding for byte arrays', async () => {
+    const data = [
+      Uint8Array.of(1, 2, 3, 4),
+      Uint8Array.of(1, 2, 5, 6),
+      Uint8Array.of(1, 2, 7, 8),
+      Uint8Array.of(10, 11, 12, 13),
+    ]
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'bytes', data, encoding: 'DELTA_BYTE_ARRAY' }],
+    })
+    const metadata = parquetMetadata(file)
+    expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['DELTA_BYTE_ARRAY'])
+    const result = await parquetReadObjects({ file, utf8: false })
+    expect(result).toEqual(data.map(bytes => ({ bytes })))
+  })
+})
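
Usage sketch (not part of the patch, not applied by it): the snippet below shows how the new per-column "encoding" option introduced by this change could be used from application code, mirroring the parquetWriteBuffer / parquetReadObjects calls exercised in the tests above. The 'hyparquet-writer' package name, the column names, and the example data are illustrative assumptions, not taken from this diff.

// Sketch only: per-column encodings via the new `encoding` option.
import { parquetReadObjects } from 'hyparquet'
import { parquetWriteBuffer } from 'hyparquet-writer' // assumed package name

const file = parquetWriteBuffer({
  columnData: [
    // small increasing integers suit DELTA_BINARY_PACKED
    { name: 'id', data: [1, 2, 3, 4, 5], encoding: 'DELTA_BINARY_PACKED' },
    // strings with shared prefixes suit DELTA_BYTE_ARRAY
    { name: 'path', data: ['a/1', 'a/2', 'a/3', 'b/4', 'b/5'], encoding: 'DELTA_BYTE_ARRAY' },
    // omit `encoding` to keep the writer's default choice (dictionary, RLE, or PLAIN)
    { name: 'flag', data: [true, false, true, true, false] },
  ],
})

// Read the buffer back to verify the round trip.
const rows = await parquetReadObjects({ file })
console.log(rows)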