User encoding options

Kenny Daniel 2025-11-26 02:42:39 -08:00
parent 4da31a5f83
commit c30f8fa8eb
6 changed files with 130 additions and 9 deletions
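
In short: each columnData entry now accepts an optional encoding field that pins the data page encoding for that column, rather than letting the writer choose between PLAIN, RLE, and dictionary encoding on its own. A minimal usage sketch, based on the tests in this diff (parquetWriteBuffer and the encoding field come from this change; the import path, column names, and data are illustrative):

import { parquetWriteBuffer } from '../src/index.js'

// Force delta encoding for the integer column; let the string column
// fall through to the writer's default encoding heuristic.
const file = parquetWriteBuffer({
  columnData: [
    { name: 'id', data: [1, 2, 3], encoding: 'DELTA_BINARY_PACKED' },
    { name: 'name', data: ['a', 'b', 'c'] },
  ],
})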

@@ -14,7 +14,7 @@ import { unconvert } from './unconvert.js'
* @returns {ColumnChunk}
*/
export function writeColumn(writer, column, values, stats) {
-const { columnName, element, schemaPath, compressed } = column
+const { columnName, element, schemaPath, compressed, encoding: userEncoding } = column
const { type } = element
if (!type) throw new Error(`column ${columnName} cannot determine type`)
const offsetStart = writer.offset
@@ -42,8 +42,7 @@ export function writeColumn(writer, column, values, stats) {
// dictionary encoding
let dictionary_page_offset
let data_page_offset = BigInt(writer.offset)
-/** @type {DecodedArray | undefined} */
-const dictionary = useDictionary(values, type)
+const dictionary = useDictionary(values, type, userEncoding)
if (dictionary) {
dictionary_page_offset = BigInt(writer.offset)
@@ -68,7 +67,7 @@ export function writeColumn(writer, column, values, stats) {
values = unconvert(element, values)
// write data page
-const encoding = type === 'BOOLEAN' && values.length > 16 ? 'RLE' : 'PLAIN'
+const encoding = userEncoding ?? (type === 'BOOLEAN' && values.length > 16 ? 'RLE' : 'PLAIN')
writeDataPageV2(writer, values, column, encoding, pageData)
encodings.push(encoding)
}
@@ -94,9 +93,11 @@
/**
* @param {DecodedArray} values
* @param {ParquetType} type
+* @param {Encoding | undefined} encoding
* @returns {any[] | undefined}
*/
-function useDictionary(values, type) {
+function useDictionary(values, type, encoding) {
+if (encoding && encoding !== 'RLE_DICTIONARY') return
if (type === 'BOOLEAN') return
const unique = new Set(values)
unique.delete(undefined)
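
Note: the early return added above means any explicit encoding other than RLE_DICTIONARY bypasses the dictionary heuristic entirely, even for low-cardinality data. A quick sketch of the effect, mirroring the PLAIN test added to test/write.test.js below (the column name is illustrative):

// 1000 values with only 10 distinct entries would normally get a dictionary page,
// but an explicit PLAIN encoding writes the raw values instead.
const data = Array(1000).fill(1).map((_, i) => i % 10)
const file = parquetWriteBuffer({ columnData: [{ name: 'int', data, encoding: 'PLAIN' }] })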
@@ -140,7 +141,7 @@ function writeDictionaryPage(writer, column, dictionary) {
}
/**
-* @import {ColumnChunk, ColumnMetaData, DecodedArray, Encoding, ParquetType, SchemaElement, Statistics} from 'hyparquet'
+* @import {ColumnChunk, DecodedArray, Encoding, ParquetType, SchemaElement, Statistics} from 'hyparquet'
* @import {ColumnEncoder, PageData, Writer} from '../src/types.js'
* @param {DecodedArray} values
* @returns {Statistics}

@@ -1,5 +1,6 @@
import { Encodings, PageTypes } from 'hyparquet/src/constants.js'
import { ByteWriter } from './bytewriter.js'
+import { deltaBinaryPack, deltaByteArray, deltaLengthByteArray } from './delta.js'
import { writeRleBitPackedHybrid } from './encoding.js'
import { writePlain } from './plain.js'
import { snappyCompress } from './snappy.js'
@@ -33,7 +34,9 @@ export function writeDataPageV2(writer, values, column, encoding, listValues) {
// write page data to temp buffer
const page = new ByteWriter()
-if (encoding === 'RLE') {
+if (encoding === 'PLAIN') {
+writePlain(page, nonnull, type, type_length)
+} else if (encoding === 'RLE') {
if (type !== 'BOOLEAN') throw new Error('RLE encoding only supported for BOOLEAN type')
page.appendUint32(nonnull.length) // prepend length
writeRleBitPackedHybrid(page, nonnull, 1)
@@ -44,8 +47,23 @@ export function writeDataPageV2(writer, values, column, encoding, listValues) {
const bitWidth = Math.ceil(Math.log2(maxValue + 1))
page.appendUint8(bitWidth) // prepend bitWidth
writeRleBitPackedHybrid(page, nonnull, bitWidth)
+} else if (encoding === 'DELTA_BINARY_PACKED') {
+if (type !== 'INT32' && type !== 'INT64') {
+throw new Error('DELTA_BINARY_PACKED encoding only supported for INT32 and INT64 types')
+}
+deltaBinaryPack(page, nonnull)
+} else if (encoding === 'DELTA_LENGTH_BYTE_ARRAY') {
+if (type !== 'BYTE_ARRAY') {
+throw new Error('DELTA_LENGTH_BYTE_ARRAY encoding only supported for BYTE_ARRAY type')
+}
+deltaLengthByteArray(page, nonnull)
+} else if (encoding === 'DELTA_BYTE_ARRAY') {
+if (type !== 'BYTE_ARRAY') {
+throw new Error('DELTA_BYTE_ARRAY encoding only supported for BYTE_ARRAY type')
+}
+deltaByteArray(page, nonnull)
} else {
-writePlain(page, nonnull, type, type_length)
+throw new Error(`parquet unsupported encoding: ${encoding}`)
}
// compress page data
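
To summarize the dispatch above, a user-specified encoding is now validated against the column's physical type:

- PLAIN: any type
- RLE: BOOLEAN only
- DELTA_BINARY_PACKED: INT32 and INT64 only
- DELTA_LENGTH_BYTE_ARRAY and DELTA_BYTE_ARRAY: BYTE_ARRAY only
- RLE_DICTIONARY: not handled here; requesting it (or omitting encoding) keeps the dictionary path chosen by useDictionary in writeColumn
- anything else (e.g. BYTE_STREAM_SPLIT): throws 'parquet unsupported encoding'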

@@ -48,7 +48,7 @@ ParquetWriter.prototype.write = function({ columnData, rowGroupSize = 100000 })
// write columns
for (let j = 0; j < columnData.length; j++) {
-const { name, data } = columnData[j]
+const { name, data, encoding } = columnData[j]
const groupData = data.slice(groupStartIndex, groupStartIndex + groupSize)
const schemaTree = getSchemaPath(this.schema, [name])
@@ -72,6 +72,7 @@ ParquetWriter.prototype.write = function({ columnData, rowGroupSize = 100000 })
element,
schemaPath,
compressed: this.compressed,
+encoding,
}
const columnChunk = writeColumn(

src/types.d.ts (vendored, 1 addition)

@@ -46,6 +46,7 @@ export interface ColumnEncoder {
element: SchemaElement
schemaPath: SchemaElement[]
compressed: boolean
+encoding?: Encoding // user-specified encoding
}
export interface Writer {

@@ -321,4 +321,19 @@ describe('parquetWriteBuffer', () => {
expect(() => parquetWriteBuffer({ columnData: [{ name: 'func', data: [() => {}] }] }))
.toThrow('cannot determine parquet type for: () => {}')
})
+it('skips dictionary encoding when encoding is specified', async () => {
+// This data would normally use dictionary encoding due to low cardinality
+const data = Array(1000).fill(1).map((_, i) => i % 10)
+const file = parquetWriteBuffer({ columnData: [{ name: 'int', data, encoding: 'PLAIN' }] })
+const metadata = parquetMetadata(file)
+expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['PLAIN'])
+const result = await parquetReadObjects({ file })
+expect(result).toEqual(data.map(int => ({ int })))
+})
+it('throws for BYTE_STREAM_SPLIT encoding', () => {
+expect(() => parquetWriteBuffer({ columnData: [{ name: 'float', data: [1.0, 2.0, 3.0], encoding: 'BYTE_STREAM_SPLIT' }] }))
+.toThrow('parquet unsupported encoding: BYTE_STREAM_SPLIT')
+})
})

test/write.delta.test.js (new file, 85 additions)

@@ -0,0 +1,85 @@
import { parquetMetadata, parquetReadObjects } from 'hyparquet'
import { describe, expect, it } from 'vitest'
import { parquetWriteBuffer } from '../src/index.js'
describe('DELTA_BINARY_PACKED encoding', () => {
it('writes DELTA_BINARY_PACKED encoding for INT32', async () => {
const data = [1, 2, 3, 100, 200, 300]
const file = parquetWriteBuffer({
columnData: [{ name: 'int', data, encoding: 'DELTA_BINARY_PACKED' }],
})
const metadata = parquetMetadata(file)
expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['DELTA_BINARY_PACKED'])
const result = await parquetReadObjects({ file })
expect(result).toEqual(data.map(int => ({ int })))
})
it('writes DELTA_BINARY_PACKED encoding for INT64', async () => {
const data = [1n, 2n, 3n, 100n, 200n, 300n]
const file = parquetWriteBuffer({
columnData: [{ name: 'bigint', data, encoding: 'DELTA_BINARY_PACKED' }],
})
const metadata = parquetMetadata(file)
expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['DELTA_BINARY_PACKED'])
const result = await parquetReadObjects({ file })
expect(result).toEqual(data.map(bigint => ({ bigint })))
})
})
describe('DELTA_LENGTH_BYTE_ARRAY encoding', () => {
it('writes DELTA_LENGTH_BYTE_ARRAY encoding for strings', async () => {
const data = ['hello', 'world', 'foo', 'bar', 'baz', 'qux']
const file = parquetWriteBuffer({
columnData: [{ name: 'string', data, encoding: 'DELTA_LENGTH_BYTE_ARRAY' }],
})
const metadata = parquetMetadata(file)
expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['DELTA_LENGTH_BYTE_ARRAY'])
const result = await parquetReadObjects({ file })
expect(result).toEqual(data.map(string => ({ string })))
})
it('writes DELTA_LENGTH_BYTE_ARRAY encoding for byte arrays', async () => {
const data = [
Uint8Array.of(1, 2, 3),
Uint8Array.of(4, 5, 6, 7),
Uint8Array.of(8, 9),
Uint8Array.of(10, 11, 12, 13, 14),
]
const file = parquetWriteBuffer({
columnData: [{ name: 'bytes', data, encoding: 'DELTA_LENGTH_BYTE_ARRAY' }],
})
const metadata = parquetMetadata(file)
expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['DELTA_LENGTH_BYTE_ARRAY'])
const result = await parquetReadObjects({ file, utf8: false })
expect(result).toEqual(data.map(bytes => ({ bytes })))
})
})
describe('DELTA_BYTE_ARRAY encoding', () => {
it('writes DELTA_BYTE_ARRAY encoding for strings with common prefixes', async () => {
const data = ['apple', 'application', 'apply', 'banana', 'band', 'bandana']
const file = parquetWriteBuffer({
columnData: [{ name: 'string', data, encoding: 'DELTA_BYTE_ARRAY' }],
})
const metadata = parquetMetadata(file)
expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['DELTA_BYTE_ARRAY'])
const result = await parquetReadObjects({ file })
expect(result).toEqual(data.map(string => ({ string })))
})
it('writes DELTA_BYTE_ARRAY encoding for byte arrays', async () => {
const data = [
Uint8Array.of(1, 2, 3, 4),
Uint8Array.of(1, 2, 5, 6),
Uint8Array.of(1, 2, 7, 8),
Uint8Array.of(10, 11, 12, 13),
]
const file = parquetWriteBuffer({
columnData: [{ name: 'bytes', data, encoding: 'DELTA_BYTE_ARRAY' }],
})
const metadata = parquetMetadata(file)
expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['DELTA_BYTE_ARRAY'])
const result = await parquetReadObjects({ file, utf8: false })
expect(result).toEqual(data.map(bytes => ({ bytes })))
})
})