From d70f7278259b66cbe2bd173f4300992e7d64e8bf Mon Sep 17 00:00:00 2001
From: Kenny Daniel
Date: Thu, 27 Mar 2025 23:30:32 -0700
Subject: [PATCH] Dictionary encoding

---
 package.json       |   2 +-
 src/column.js      | 154 ++++++++++++++++++++++++++++++++++-----------
 src/encoding.js    |   7 ++-
 test/write.test.js |   8 ++-
 4 files changed, 133 insertions(+), 38 deletions(-)

diff --git a/package.json b/package.json
index 6407dce..d3f82a7 100644
--- a/package.json
+++ b/package.json
@@ -41,7 +41,7 @@
   },
   "devDependencies": {
     "@babel/eslint-parser": "7.27.0",
-    "@types/node": "22.13.13",
+    "@types/node": "22.13.14",
     "@vitest/coverage-v8": "3.0.9",
     "eslint": "9.23.0",
     "eslint-plugin-jsdoc": "50.6.9",
diff --git a/src/column.js b/src/column.js
index f41c000..f6a044a 100644
--- a/src/column.js
+++ b/src/column.js
@@ -19,43 +19,56 @@ export function writeColumn(writer, schemaPath, values, compressed) {
   const schemaElement = schemaPath[schemaPath.length - 1]
   const { type } = schemaElement
   if (!type) throw new Error(`column ${schemaElement.name} cannot determine type`)
+  let dataType = type
   const offsetStart = writer.offset
   const num_values = values.length
-  let num_nulls = 0
 
   // Write levels to temp buffer
   const levels = new Writer()
+  const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls } = writeLevels(levels, schemaPath, values)
 
-  // TODO: repetition levels
-  const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
-  let repetition_levels_byte_length = 0
-  if (maxRepetitionLevel) {
-    repetition_levels_byte_length = writeRleBitPackedHybrid(levels, [])
-  }
+  // dictionary encoding
+  let dictionary_page_offset = undefined
+  /** @type {DecodedArray | undefined} */
+  let dictionary = useDictionary(values, dataType)
+  if (dictionary) {
+    dictionary_page_offset = BigInt(writer.offset)
 
-  // definition levels
-  const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
-  let definition_levels_byte_length = 0
-  if (maxDefinitionLevel) {
-    const definitionLevels = []
-    for (const value of values) {
-      if (value === null || value === undefined) {
-        definitionLevels.push(maxDefinitionLevel - 1)
-        num_nulls++
-      } else {
-        definitionLevels.push(maxDefinitionLevel)
-      }
+    // drop nulls from the dictionary before indexing, so indices stay stable
+    dictionary = dictionary.filter(v => v !== null && v !== undefined)
+
+    // replace non-null values with dictionary indices
+    // (nulls are already recorded in the definition levels)
+    const nonNull = values.filter(v => v !== null && v !== undefined)
+    const indexes = new Int32Array(nonNull.length)
+    for (let i = 0; i < nonNull.length; i++) {
+      indexes[i] = dictionary.indexOf(nonNull[i])
     }
-    definition_levels_byte_length = writeRleBitPackedHybrid(levels, definitionLevels)
-  }
+    values = indexes
+    dataType = 'INT32'
 
-  // Unconvert type and filter out nulls
-  values = unconvert(schemaElement, values)
-    .filter(v => v !== null && v !== undefined)
+    // unconvert dictionary page values
+    dictionary = unconvert(schemaElement, dictionary)
+
+    // write dictionary page data
+    writeDictionaryPage(writer, dictionary, type, compressed)
+  } else {
+    // unconvert type and filter out nulls
+    values = unconvert(schemaElement, values)
+      .filter(v => v !== null && v !== undefined)
+  }
 
   // write page data to temp buffer
   const page = new Writer()
-  writePageData(page, values, type)
+  /** @type {import('hyparquet').Encoding} */
+  const encoding = dictionary ? 'RLE_DICTIONARY' : 'PLAIN'
+  if (dictionary) {
+    const bitWidth = Math.ceil(Math.log2(dictionary.length))
+    page.appendUint8(bitWidth)
+    writeRleBitPackedHybrid(page, values)
+  } else {
+    writePlain(page, values, type)
+  }
 
   // compress page data
   let compressedPage = page
@@ -65,6 +78,7 @@ export function writeColumn(writer, schemaPath, values, compressed) {
   }
 
   // write page header
+  const data_page_offset = BigInt(writer.offset)
   /** @type {PageHeader} */
   const header = {
     type: 'DATA_PAGE_V2',
@@ -74,7 +88,7 @@
       num_values,
       num_nulls,
       num_rows: num_values,
-      encoding: 'PLAIN',
+      encoding,
       definition_levels_byte_length,
       repetition_levels_byte_length,
       is_compressed: true,
@@ -90,13 +104,14 @@
   return {
     type,
-    encodings: ['PLAIN'],
+    encodings: [encoding],
     path_in_schema: schemaPath.slice(1).map(s => s.name),
     codec: compressed ? 'SNAPPY' : 'UNCOMPRESSED',
     num_values: BigInt(num_values),
     total_compressed_size: BigInt(writer.offset - offsetStart),
     total_uncompressed_size: BigInt(writer.offset - offsetStart),
-    data_page_offset: BigInt(offsetStart),
+    data_page_offset,
+    dictionary_page_offset,
   }
 }
@@ -109,6 +124,10 @@ function writePageHeader(writer, header) {
     field_1: PageType.indexOf(header.type),
     field_2: header.uncompressed_page_size,
     field_3: header.compressed_page_size,
+    field_7: header.dictionary_page_header && {
+      field_1: header.dictionary_page_header.num_values,
+      field_2: Encoding.indexOf(header.dictionary_page_header.encoding),
+    },
     field_8: header.data_page_header_v2 && {
       field_1: header.data_page_header_v2.num_values,
       field_2: header.data_page_header_v2.num_nulls,
@@ -123,11 +142,82 @@ function writePageHeader(writer, header) {
 }
 
 /**
- * @param {Writer} writer
  * @param {DecodedArray} values
  * @param {ParquetType} type
+ * @returns {any[] | undefined}
  */
-function writePageData(writer, values, type) {
-  // write plain data
-  writePlain(writer, values, type)
+function useDictionary(values, type) {
+  if (type === 'BOOLEAN') return
+  const unique = new Set(values)
+  // use a dictionary when values repeat
+  if (values.length > 10 && unique.size < values.length) {
+    // TODO: sort by frequency
+    return Array.from(unique)
+  }
+}
+
+/**
+ * @param {Writer} writer
+ * @param {DecodedArray} dictionary
+ * @param {ParquetType} type
+ * @param {boolean} compressed
+ */
+function writeDictionaryPage(writer, dictionary, type, compressed) {
+  const dictionaryPage = new Writer()
+  writePlain(dictionaryPage, dictionary, type)
+
+  // compress dictionary page data
+  let compressedDictionaryPage = dictionaryPage
+  if (compressed) {
+    compressedDictionaryPage = new Writer()
+    snappyCompress(compressedDictionaryPage, new Uint8Array(dictionaryPage.getBuffer()))
+  }
+
+  // write dictionary page header
+  /** @type {PageHeader} */
+  const dictionaryHeader = {
+    type: 'DICTIONARY_PAGE',
+    uncompressed_page_size: dictionaryPage.offset,
+    compressed_page_size: compressedDictionaryPage.offset,
+    dictionary_page_header: {
+      num_values: dictionary.length,
+      encoding: 'PLAIN',
+    },
+  }
+  writePageHeader(writer, dictionaryHeader)
+  writer.appendBuffer(compressedDictionaryPage.getBuffer())
+}
+
+/**
+ * @param {Writer} writer
+ * @param {SchemaElement[]} schemaPath
+ * @param {DecodedArray} values
+ * @returns {{ definition_levels_byte_length: number, repetition_levels_byte_length: number, num_nulls: number }}
+ */
+function writeLevels(writer, schemaPath, values) {
+  let num_nulls = 0
+
+  // TODO: repetition levels
+  const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
+  let repetition_levels_byte_length = 0
+  if (maxRepetitionLevel) {
+    repetition_levels_byte_length = writeRleBitPackedHybrid(writer, [])
+  }
+
+  // definition levels
+  const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
+  let definition_levels_byte_length = 0
+  if (maxDefinitionLevel) {
+    const definitionLevels = []
+    for (const value of values) {
+      if (value === null || value === undefined) {
+        definitionLevels.push(maxDefinitionLevel - 1)
+        num_nulls++
+      } else {
+        definitionLevels.push(maxDefinitionLevel)
+      }
+    }
+    definition_levels_byte_length = writeRleBitPackedHybrid(writer, definitionLevels)
+  }
+  return { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
+}
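
Note on the writeColumn changes above: stripped of the writer plumbing, the dictionary transformation is small. The sketch below (hypothetical dictionaryEncode helper, not part of this patch) shows the same idea: distinct values become the dictionary, each value is replaced by its Int32 index, and the data page records the bit width needed for the largest index. The indexOf scan is quadratic in the worst case; a Map from value to index would make it linear at the cost of a little memory.

// Standalone sketch of the index-building step in writeColumn above.
function dictionaryEncode(values) {
  // first-occurrence order, as in useDictionary's Array.from(new Set(values))
  const dictionary = Array.from(new Set(values))
  const indexes = new Int32Array(values.length)
  for (let i = 0; i < values.length; i++) {
    indexes[i] = dictionary.indexOf(values[i])
  }
  // RLE_DICTIONARY data pages begin with one byte giving the index bit width
  const bitWidth = Math.ceil(Math.log2(dictionary.length))
  return { dictionary, indexes, bitWidth }
}

dictionaryEncode(['a', 'b', 'a', 'a', 'c'])
// { dictionary: ['a', 'b', 'c'], indexes: Int32Array [0, 1, 0, 0, 2], bitWidth: 2 }
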
diff --git a/src/encoding.js b/src/encoding.js
index fa30f0e..661fc1e 100644
--- a/src/encoding.js
+++ b/src/encoding.js
@@ -1,8 +1,9 @@
 import { Writer } from './writer.js'
 
 /**
+ * @import {DecodedArray} from 'hyparquet'
 * @param {Writer} writer
- * @param {number[]} values
+ * @param {DecodedArray} values
 * @returns {number} bytes written
 */
 export function writeRleBitPackedHybrid(writer, values) {
@@ -27,7 +28,7 @@ export function writeRleBitPackedHybrid(writer, values) {
 
 /**
 * @param {Writer} writer
- * @param {number[]} values
+ * @param {DecodedArray} values
 * @param {number} bitWidth
 */
 function writeBitPacked(writer, values, bitWidth) {
@@ -87,7 +88,7 @@ function writeBitPacked(writer, values, bitWidth) {
 * Run-length encoding: write repeated values by encoding the value and its count.
 *
 * @param {Writer} writer
- * @param {number[]} values
+ * @param {DecodedArray} values
 * @param {number} bitWidth
 */
 function writeRle(writer, values, bitWidth) {
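
A note on the encoding.js change above: widening the signatures from number[] to DecodedArray lets writeRleBitPackedHybrid consume the Int32Array of dictionary indices directly. For intuition about the payoff, here is a rough sketch of just the RLE-run case of Parquet's RLE/bit-packed hybrid format (hypothetical encodeRleRun helper, assuming bitWidth <= 8; the real writer also falls back to bit-packed groups when values do not repeat):

// One RLE run: a ULEB128 header whose low bit is 0 (count << 1),
// followed by the repeated value in ceil(bitWidth / 8) bytes.
// Assumes bitWidth <= 8 so the value fits in a single byte.
function encodeRleRun(value, count, bitWidth) {
  const bytes = []
  let header = count << 1 // low bit 0 marks an RLE run
  while (header > 0x7f) {
    bytes.push(header & 0x7f | 0x80) // ULEB128 continuation byte
    header >>>= 7
  }
  bytes.push(header)
  if (bitWidth > 0) bytes.push(value & 0xff)
  return new Uint8Array(bytes)
}

encodeRleRun(0, 10000, 0) // Uint8Array(3) [ 0xa0, 0x9c, 0x01 ]

Ten thousand repeats of dictionary index 0 at bit width 0 collapse to the three header bytes, which helps explain why the expected file size in the test below drops from 3908 to 161 bytes.
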
diff --git a/test/write.test.js b/test/write.test.js
index 285c8be..d00dabb 100644
--- a/test/write.test.js
+++ b/test/write.test.js
@@ -74,10 +74,14 @@ describe('parquetWrite', () => {
     expect(file.byteLength).toBe(10135)
   })
 
-  it('efficiently represents column with few distinct values', () => {
+  it('efficiently serializes column with few distinct values', async () => {
     const data = Array(10000).fill('aaaa')
     const file = parquetWrite({ columnData: [{ name: 'string', data }] })
-    expect(file.byteLength).toBe(3908)
+    expect(file.byteLength).toBe(161)
+    // round trip
+    const result = await parquetReadObjects({ file })
+    expect(result.length).toBe(10000)
+    expect(result[0]).toEqual({ string: 'aaaa' })
   })
 
   it('serializes list types', async () => {
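
The updated test doubles as a round-trip check. As a library consumer the same flow looks like this (a sketch assuming the published hyparquet-writer entry point and hyparquet's parquetReadObjects, mirroring the test above; the column name and data are made up):

import { parquetWrite } from 'hyparquet-writer'
import { parquetReadObjects } from 'hyparquet'

// a low-cardinality column, which now takes the dictionary code path
const data = Array.from({ length: 10000 }, (_, i) => i % 3 ? 'yes' : 'no')
const file = parquetWrite({ columnData: [{ name: 'answer', data }] })

const rows = await parquetReadObjects({ file })
console.log(rows.length, rows[0]) // 10000 { answer: 'no' }
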