Mirror of https://github.com/asadbek064/hyparquet-writer.git
Dictionary encoding
This commit is contained in:
parent 2e0431d815
commit d70f727825
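This commit adds Parquet dictionary encoding to the column writer: the distinct values of a column are written once as a PLAIN-encoded dictionary page, and the data page then carries only INT32 indices into that dictionary, encoded as RLE_DICTIONARY. A minimal standalone sketch of the idea follows; buildDictionary is a hypothetical helper for illustration, not part of hyparquet-writer.

// Illustrative sketch (not the library's API): split a column into a
// dictionary of distinct values plus one index per row.
function buildDictionary(values) {
  const dictionary = []
  const position = new Map() // value -> index in dictionary
  const indexes = new Int32Array(values.length)
  for (let i = 0; i < values.length; i++) {
    let index = position.get(values[i])
    if (index === undefined) {
      index = dictionary.length
      dictionary.push(values[i])
      position.set(values[i], index)
    }
    indexes[i] = index
  }
  return { dictionary, indexes }
}

// buildDictionary(['a', 'b', 'a', 'a'])
// -> { dictionary: ['a', 'b'], indexes: Int32Array [0, 1, 0, 0] }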
package.json
@@ -41,7 +41,7 @@
   },
   "devDependencies": {
     "@babel/eslint-parser": "7.27.0",
-    "@types/node": "22.13.13",
+    "@types/node": "22.13.14",
     "@vitest/coverage-v8": "3.0.9",
     "eslint": "9.23.0",
     "eslint-plugin-jsdoc": "50.6.9",
src/column.js
@@ -19,43 +19,52 @@ export function writeColumn(writer, schemaPath, values, compressed) {
   const schemaElement = schemaPath[schemaPath.length - 1]
   const { type } = schemaElement
   if (!type) throw new Error(`column ${schemaElement.name} cannot determine type`)
+  let dataType = type
   const offsetStart = writer.offset
   const num_values = values.length
-  let num_nulls = 0

   // Write levels to temp buffer
   const levels = new Writer()
+  const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls } = writeLevels(levels, schemaPath, values)

-  // TODO: repetition levels
-  const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
-  let repetition_levels_byte_length = 0
-  if (maxRepetitionLevel) {
-    repetition_levels_byte_length = writeRleBitPackedHybrid(levels, [])
-  }
+  // dictionary encoding
+  let dictionary_page_offset = undefined
+  /** @type {DecodedArray | undefined} */
+  let dictionary = useDictionary(values, dataType)
+  if (dictionary) {
+    dictionary_page_offset = BigInt(writer.offset)

-  // definition levels
-  const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
-  let definition_levels_byte_length = 0
-  if (maxDefinitionLevel) {
-    const definitionLevels = []
-    for (const value of values) {
-      if (value === null || value === undefined) {
-        definitionLevels.push(maxDefinitionLevel - 1)
-        num_nulls++
-      } else {
-        definitionLevels.push(maxDefinitionLevel)
-      }
+    // replace values with dictionary indices
+    const indexes = new Int32Array(values.length)
+    for (let i = 0; i < values.length; i++) {
+      indexes[i] = dictionary.indexOf(values[i])
     }
-    definition_levels_byte_length = writeRleBitPackedHybrid(levels, definitionLevels)
-  }
+    values = indexes
+    dataType = 'INT32'

-  // Unconvert type and filter out nulls
-  values = unconvert(schemaElement, values)
-    .filter(v => v !== null && v !== undefined)
+    // unconvert dictionary and filter out nulls
+    dictionary = unconvert(schemaElement, dictionary)
+      .filter(v => v !== null && v !== undefined)

+    // write dictionary page data
+    writeDictionaryPage(writer, dictionary, type, compressed)
+  } else {
+    // unconvert type and filter out nulls
+    values = unconvert(schemaElement, values)
+      .filter(v => v !== null && v !== undefined)
+  }

   // write page data to temp buffer
   const page = new Writer()
-  writePageData(page, values, type)
+  /** @type {import('hyparquet').Encoding} */
+  const encoding = dictionary ? 'RLE_DICTIONARY' : 'PLAIN'
+  if (dictionary) {
+    const bitWidth = Math.ceil(Math.log2(dictionary.length))
+    page.appendUint8(bitWidth)
+    writeRleBitPackedHybrid(page, values)
+  } else {
+    writePlain(page, values, type)
+  }

   // compress page data
   let compressedPage = page
@@ -65,6 +74,7 @@ export function writeColumn(writer, schemaPath, values, compressed) {
   }

   // write page header
+  const data_page_offset = BigInt(writer.offset)
   /** @type {PageHeader} */
   const header = {
     type: 'DATA_PAGE_V2',
@@ -74,7 +84,7 @@ export function writeColumn(writer, schemaPath, values, compressed) {
       num_values,
       num_nulls,
       num_rows: num_values,
-      encoding: 'PLAIN',
+      encoding: dictionary ? 'RLE_DICTIONARY' : encoding,
       definition_levels_byte_length,
       repetition_levels_byte_length,
       is_compressed: true,
@@ -90,13 +100,14 @@ export function writeColumn(writer, schemaPath, values, compressed) {

   return {
     type,
-    encodings: ['PLAIN'],
+    encodings: [encoding],
     path_in_schema: schemaPath.slice(1).map(s => s.name),
     codec: compressed ? 'SNAPPY' : 'UNCOMPRESSED',
     num_values: BigInt(num_values),
     total_compressed_size: BigInt(writer.offset - offsetStart),
     total_uncompressed_size: BigInt(writer.offset - offsetStart),
-    data_page_offset: BigInt(offsetStart),
+    data_page_offset,
+    dictionary_page_offset,
   }
 }
@@ -109,6 +120,10 @@ function writePageHeader(writer, header) {
     field_1: PageType.indexOf(header.type),
     field_2: header.uncompressed_page_size,
     field_3: header.compressed_page_size,
+    field_7: header.dictionary_page_header && {
+      field_1: header.dictionary_page_header.num_values,
+      field_2: Encoding.indexOf(header.dictionary_page_header.encoding),
+    },
     field_8: header.data_page_header_v2 && {
       field_1: header.data_page_header_v2.num_values,
       field_2: header.data_page_header_v2.num_nulls,
@@ -123,11 +138,83 @@ function writePageHeader(writer, header) {
 }

 /**
- * @param {Writer} writer
  * @param {DecodedArray} values
  * @param {ParquetType} type
+ * @returns {any[] | undefined}
  */
-function writePageData(writer, values, type) {
-  // write plain data
-  writePlain(writer, values, type)
+function useDictionary(values, type) {
+  if (type === 'BOOLEAN') return
+  const unique = new Set(values)
+  if (values.length > 10 && values.length / unique.size > 0.1) {
+    if (unique.size < values.length) {
+      // TODO: sort by frequency
+      return Array.from(unique)
+    }
+  }
 }
+
+/**
+ * @param {Writer} writer
+ * @param {DecodedArray} dictionary
+ * @param {ParquetType} type
+ * @param {boolean} compressed
+ */
+function writeDictionaryPage(writer, dictionary, type, compressed) {
+  const dictionaryPage = new Writer()
+  writePlain(dictionaryPage, dictionary, type)
+
+  // compress dictionary page data
+  let compressedDictionaryPage = dictionaryPage
+  if (compressed) {
+    compressedDictionaryPage = new Writer()
+    snappyCompress(compressedDictionaryPage, new Uint8Array(dictionaryPage.getBuffer()))
+  }
+
+  // write dictionary page header
+  /** @type {PageHeader} */
+  const dictionaryHeader = {
+    type: 'DICTIONARY_PAGE',
+    uncompressed_page_size: dictionaryPage.offset,
+    compressed_page_size: compressedDictionaryPage.offset,
+    dictionary_page_header: {
+      num_values: dictionary.length,
+      encoding: 'PLAIN',
+    },
+  }
+  writePageHeader(writer, dictionaryHeader)
+  writer.appendBuffer(compressedDictionaryPage.getBuffer())
+}
+
+/**
+ * @param {Writer} writer
+ * @param {SchemaElement[]} schemaPath
+ * @param {DecodedArray} values
+ * @returns {{ definition_levels_byte_length: number, repetition_levels_byte_length: number, num_nulls: number }}
+ */
+function writeLevels(writer, schemaPath, values) {
+  let num_nulls = 0
+
+  // TODO: repetition levels
+  const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
+  let repetition_levels_byte_length = 0
+  if (maxRepetitionLevel) {
+    repetition_levels_byte_length = writeRleBitPackedHybrid(writer, [])
+  }
+
+  // definition levels
+  const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
+  let definition_levels_byte_length = 0
+  if (maxDefinitionLevel) {
+    const definitionLevels = []
+    for (const value of values) {
+      if (value === null || value === undefined) {
+        definitionLevels.push(maxDefinitionLevel - 1)
+        num_nulls++
+      } else {
+        definitionLevels.push(maxDefinitionLevel)
+      }
+    }
+    definition_levels_byte_length = writeRleBitPackedHybrid(writer, definitionLevels)
+  }
+  return { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
+}
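As a reference for the dictionary branch above: the data page body starts with a single byte holding the index bit width, Math.ceil(Math.log2(dictionary.length)), followed by the RLE/bit-packed hybrid stream of indices, while the dictionary itself goes into a separate PLAIN-encoded (and optionally Snappy-compressed) DICTIONARY_PAGE. The sketch below only restates the bit-width rule; indexBitWidth is a hypothetical helper, not code from this commit.

// Bits needed per index so that every dictionary entry is addressable,
// mirroring Math.ceil(Math.log2(dictionary.length)) in the diff above.
function indexBitWidth(dictionarySize) {
  return Math.ceil(Math.log2(dictionarySize))
}

// indexBitWidth(2) === 1, indexBitWidth(5) === 3, indexBitWidth(256) === 8
// A one-entry dictionary gives width 0: the indices occupy no bits at all.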
@@ -1,8 +1,9 @@
 import { Writer } from './writer.js'

 /**
+ * @import {DecodedArray} from 'hyparquet'
  * @param {Writer} writer
- * @param {number[]} values
+ * @param {DecodedArray} values
  * @returns {number} bytes written
  */
 export function writeRleBitPackedHybrid(writer, values) {
@@ -27,7 +28,7 @@ export function writeRleBitPackedHybrid(writer, values) {

 /**
  * @param {Writer} writer
- * @param {number[]} values
+ * @param {DecodedArray} values
  * @param {number} bitWidth
  */
 function writeBitPacked(writer, values, bitWidth) {
@@ -87,7 +88,7 @@ function writeBitPacked(writer, values, bitWidth) {
  * Run-length encoding: write repeated values by encoding the value and its count.
  *
  * @param {Writer} writer
- * @param {number[]} values
+ * @param {DecodedArray} values
  * @param {number} bitWidth
  */
 function writeRle(writer, values, bitWidth) {
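The writeRle doc comment above summarizes run-length encoding; in the Parquet RLE/bit-packed hybrid format a run is a ULEB128 header holding the run length shifted left one bit (low bit 0 marks an RLE run), followed by the repeated value in ceil(bitWidth / 8) little-endian bytes. The sketch below encodes a single run under that assumption; encodeRleRun is a hypothetical standalone helper, not the writeRle in this file.

// Hypothetical sketch of one RLE run, assuming the standard Parquet
// RLE/bit-packed hybrid layout (not hyparquet-writer's writeRle).
function encodeRleRun(value, count, bitWidth) {
  const bytes = []
  // run header: count << 1 with low bit 0, written as a ULEB128 varint
  let header = count << 1
  do {
    bytes.push((header & 0x7f) | (header > 0x7f ? 0x80 : 0))
    header >>>= 7
  } while (header)
  // repeated value, ceil(bitWidth / 8) bytes, little-endian
  for (let i = 0; i < Math.ceil(bitWidth / 8); i++) {
    bytes.push((value >> (8 * i)) & 0xff)
  }
  return new Uint8Array(bytes)
}

// encodeRleRun(0, 10000, 1) -> Uint8Array [0xa0, 0x9c, 0x01, 0x00]
// (10000 repetitions of index 0 collapse into four bytes)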
@@ -74,10 +74,14 @@ describe('parquetWrite', () => {
     expect(file.byteLength).toBe(10135)
   })

-  it('efficiently represents column with few distinct values', () => {
+  it('efficiently serializes column with few distinct values', async () => {
     const data = Array(10000).fill('aaaa')
     const file = parquetWrite({ columnData: [{ name: 'string', data }] })
-    expect(file.byteLength).toBe(3908)
+    expect(file.byteLength).toBe(161)
+    // round trip
+    const result = await parquetReadObjects({ file })
+    expect(result.length).toBe(10000)
+    expect(result[0]).toEqual({ string: 'aaaa' })
   })

   it('serializes list types', async () => {
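The new test doubles as a usage example: 10,000 copies of the string 'aaaa' shrink the file from 3,908 bytes to 161 because the dictionary page stores the string once and the data page is a tiny run of indices. A round-trip sketch along the same lines, assuming the package entry points used by the test (parquetWrite from hyparquet-writer, parquetReadObjects from hyparquet):

import { parquetWrite } from 'hyparquet-writer'
import { parquetReadObjects } from 'hyparquet'

// Write a highly repetitive column; dictionary encoding stores 'aaaa' once.
const data = Array(10000).fill('aaaa')
const file = parquetWrite({ columnData: [{ name: 'string', data }] })
console.log(file.byteLength) // a few hundred bytes rather than several KB

// Read it back with hyparquet to confirm the round trip.
const rows = await parquetReadObjects({ file })
console.log(rows.length, rows[0]) // 10000 { string: 'aaaa' }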