From 4da31a5f8380400f0d1988c2c77f58b05df225d4 Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Wed, 26 Nov 2025 02:31:10 -0800 Subject: [PATCH] Delta binary packed --- src/bytewriter.js | 13 +++ src/datapage.js | 8 +- src/delta.js | 205 +++++++++++++++++++++++++++++++++++++ src/types.d.ts | 4 +- test/delta.test.js | 247 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 472 insertions(+), 5 deletions(-) create mode 100644 src/delta.js create mode 100644 test/delta.test.js diff --git a/src/bytewriter.js b/src/bytewriter.js index 9124334..96eec05 100644 --- a/src/bytewriter.js +++ b/src/bytewriter.js @@ -151,3 +151,16 @@ ByteWriter.prototype.appendVarBigInt = function(value) { } } } + +/** + * Convert number to zigzag encoding and write as varint. + * + * @param {number | bigint} value + */ +ByteWriter.prototype.appendZigZag = function(value) { + if (typeof value === 'number') { + this.appendVarInt(value << 1 ^ value >> 31) + } else { + this.appendVarBigInt(value << 1n ^ value >> 63n) + } +} diff --git a/src/datapage.js b/src/datapage.js index 8717547..b9dca2f 100644 --- a/src/datapage.js +++ b/src/datapage.js @@ -10,7 +10,7 @@ import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' * @param {Writer} writer * @param {DecodedArray} values * @param {ColumnEncoder} column - * @param {import('hyparquet').Encoding} encoding + * @param {Encoding} encoding * @param {PageData} [listValues] */ export function writeDataPageV2(writer, values, column, encoding, listValues) { @@ -83,7 +83,7 @@ export function writeDataPageV2(writer, values, column, encoding, listValues) { * @param {PageHeader} header */ export function writePageHeader(writer, header) { - /** @type {import('../src/types.js').ThriftObject} */ + /** @type {ThriftObject} */ const compact = { field_1: PageTypes.indexOf(header.type), field_2: header.uncompressed_page_size, @@ -114,8 +114,8 @@ export function writePageHeader(writer, header) { } /** - * @import {DecodedArray, PageHeader, SchemaElement} from 'hyparquet' - * @import {ColumnEncoder, PageData, Writer} from '../src/types.js' + * @import {DecodedArray, Encoding, PageHeader} from 'hyparquet' + * @import {ColumnEncoder, PageData, ThriftObject, Writer} from '../src/types.js' * @param {Writer} writer * @param {ColumnEncoder} column * @param {DecodedArray} values diff --git a/src/delta.js b/src/delta.js new file mode 100644 index 0000000..ef28ccd --- /dev/null +++ b/src/delta.js @@ -0,0 +1,205 @@ +/** + * Delta Binary Packed encoding for parquet. + * Encodes integers as deltas with variable bit-width packing. + * + * @import {DecodedArray} from 'hyparquet' + * @import {Writer} from '../src/types.js' + */ + +const BLOCK_SIZE = 128 +const MINIBLOCKS_PER_BLOCK = 4 +const VALUES_PER_MINIBLOCK = BLOCK_SIZE / MINIBLOCKS_PER_BLOCK // 32 + +/** + * Write values using delta binary packed encoding. 
+ * + * @param {Writer} writer + * @param {DecodedArray} values + */ +export function deltaBinaryPack(writer, values) { + const count = values.length + if (count === 0) { + // Write header with zero count + writer.appendVarInt(BLOCK_SIZE) + writer.appendVarInt(MINIBLOCKS_PER_BLOCK) + writer.appendVarInt(0) + writer.appendVarInt(0) + return + } + if (typeof values[0] !== 'number' && typeof values[0] !== 'bigint') { + throw new Error('deltaBinaryPack only supports number or bigint arrays') + } + + // Write header + writer.appendVarInt(BLOCK_SIZE) + writer.appendVarInt(MINIBLOCKS_PER_BLOCK) + writer.appendVarInt(count) + writer.appendZigZag(values[0]) + + // Process blocks + let index = 1 + while (index < count) { + const blockEnd = Math.min(index + BLOCK_SIZE, count) + const blockSize = blockEnd - index + + // Compute deltas for this block + const blockDeltas = new BigInt64Array(blockSize) + let minDelta = BigInt(values[index]) - BigInt(values[index - 1]) + blockDeltas[0] = minDelta + for (let i = 1; i < blockSize; i++) { + const delta = BigInt(values[index + i]) - BigInt(values[index + i - 1]) + blockDeltas[i] = delta + if (delta < minDelta) minDelta = delta + } + writer.appendZigZag(minDelta) + + // Calculate bit widths for each miniblock + const bitWidths = new Uint8Array(MINIBLOCKS_PER_BLOCK) + for (let mb = 0; mb < MINIBLOCKS_PER_BLOCK; mb++) { + const mbStart = mb * VALUES_PER_MINIBLOCK + const mbEnd = Math.min(mbStart + VALUES_PER_MINIBLOCK, blockSize) + + let maxAdjusted = 0n + for (let i = mbStart; i < mbEnd; i++) { + const adjusted = blockDeltas[i] - minDelta + if (adjusted > maxAdjusted) maxAdjusted = adjusted + } + bitWidths[mb] = bitWidth(maxAdjusted) + } + + // Write bit widths + writer.appendBytes(bitWidths) + + // Write packed miniblocks + for (let mb = 0; mb < MINIBLOCKS_PER_BLOCK; mb++) { + const bitWidth = bitWidths[mb] + if (bitWidth === 0) continue // No data needed for zero bit width + + const mbStart = mb * VALUES_PER_MINIBLOCK + const mbEnd = Math.min(mbStart + VALUES_PER_MINIBLOCK, blockSize) + + // Bit pack the adjusted deltas + let buffer = 0n + let bitsUsed = 0 + + for (let i = 0; i < VALUES_PER_MINIBLOCK; i++) { + const adjusted = mbStart + i < mbEnd ? blockDeltas[mbStart + i] - minDelta : 0n + buffer |= adjusted << BigInt(bitsUsed) + bitsUsed += bitWidth + + // Flush complete bytes + while (bitsUsed >= 8) { + writer.appendUint8(Number(buffer & 0xffn)) + buffer >>= 8n + bitsUsed -= 8 + } + } + // assert(bitsUsed === 0) // because multiple of 8 + } + + index = blockEnd + } +} + +/** + * Write byte arrays using delta length encoding. + * Encodes lengths using delta binary packed, then writes raw bytes. + * + * @param {Writer} writer + * @param {DecodedArray} values + */ +export function deltaLengthByteArray(writer, values) { + // Extract lengths + const lengths = new Int32Array(values.length) + for (let i = 0; i < values.length; i++) { + const value = values[i] + if (!(value instanceof Uint8Array)) { + throw new Error('deltaLengthByteArray expects Uint8Array values') + } + lengths[i] = value.length + } + + // Write delta-packed lengths + deltaBinaryPack(writer, lengths) + + // Write raw byte data + for (const value of values) { + writer.appendBytes(value) + } +} + +/** + * Write byte arrays using delta encoding with prefix compression. + * Stores common prefixes with previous value to improve compression. 
+ * + * @param {Writer} writer + * @param {DecodedArray} values + */ +export function deltaByteArray(writer, values) { + if (values.length === 0) { + deltaBinaryPack(writer, []) + deltaBinaryPack(writer, []) + return + } + + // Calculate prefix lengths and suffixes + const prefixLengths = new Int32Array(values.length) + const suffixLengths = new Int32Array(values.length) + /** @type {Uint8Array[]} */ + const suffixes = new Array(values.length) + + // First value has no prefix + const value = values[0] + if (!(value instanceof Uint8Array)) { + throw new Error('deltaByteArray expects Uint8Array values') + } + prefixLengths[0] = 0 + suffixLengths[0] = values[0].length + suffixes[0] = values[0] + + for (let i = 1; i < values.length; i++) { + const prev = values[i - 1] + const curr = values[i] + if (!(curr instanceof Uint8Array)) { + throw new Error('deltaByteArray expects Uint8Array values') + } + + // Find common prefix length + let prefixLen = 0 + const maxPrefix = Math.min(prev.length, curr.length) + while (prefixLen < maxPrefix && prev[prefixLen] === curr[prefixLen]) { + prefixLen++ + } + + prefixLengths[i] = prefixLen + suffixLengths[i] = curr.length - prefixLen + suffixes[i] = curr.subarray(prefixLen) + } + + // Write delta-packed prefix lengths + deltaBinaryPack(writer, prefixLengths) + + // Write delta-packed suffix lengths + deltaBinaryPack(writer, suffixLengths) + + // Write suffix bytes + for (const suffix of suffixes) { + writer.appendBytes(suffix) + } +} + +/** + * Minimum bits needed to store value. + * + * @param {bigint} value + * @returns {number} + */ +function bitWidth(value) { + if (value === 0n) return 0 + let bits = 0 + while (value > 0n) { + bits++ + value >>= 1n + } + return bits +} diff --git a/src/types.d.ts b/src/types.d.ts index 829aef1..adbfd32 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -1,4 +1,4 @@ -import type { DecodedArray, KeyValue, SchemaElement } from 'hyparquet' +import type { DecodedArray, Encoding, KeyValue, SchemaElement } from 'hyparquet' // Superset of parquet types with automatic conversions export type BasicType = @@ -31,6 +31,7 @@ export interface ColumnSource { data: DecodedArray type?: BasicType nullable?: boolean + encoding?: Encoding } export interface PageData { @@ -65,6 +66,7 @@ export interface Writer { appendBytes(value: Uint8Array): void appendVarInt(value: number): void appendVarBigInt(value: bigint): void + appendZigZag(value: number | bigint): void } export type ThriftObject = { [ key: `field_${number}` ]: ThriftType } diff --git a/test/delta.test.js b/test/delta.test.js new file mode 100644 index 0000000..6515ae6 --- /dev/null +++ b/test/delta.test.js @@ -0,0 +1,247 @@ +import { describe, expect, it } from 'vitest' +import { ByteWriter } from '../src/bytewriter.js' +import { deltaBinaryPack, deltaByteArray, deltaLengthByteArray } from '../src/delta.js' +import { deltaBinaryUnpack, deltaByteArray as deltaByteArrayRead, deltaLengthByteArray as deltaLengthByteArrayRead } from 'hyparquet/src/delta.js' + +/** + * Round-trip test for deltaBinaryPack with Int32Array output. + * + * @param {number[]} values + * @returns {number[]} + */ +function roundTripInt32(values) { + const writer = new ByteWriter() + deltaBinaryPack(writer, values) + const buffer = writer.getBuffer() + const reader = { view: new DataView(buffer), offset: 0 } + + const output = new Int32Array(values.length) + deltaBinaryUnpack(reader, values.length, output) + return Array.from(output) +} + +/** + * Round-trip test for deltaBinaryPack with BigInt64Array output. 
+ * + * @param {bigint[]} values + * @returns {bigint[]} + */ +function roundTripBigInt(values) { + const writer = new ByteWriter() + deltaBinaryPack(writer, values) + const buffer = writer.getBuffer() + const reader = { view: new DataView(buffer), offset: 0 } + + const output = new BigInt64Array(values.length) + deltaBinaryUnpack(reader, values.length, output) + return Array.from(output) +} + +/** + * Round-trip test for deltaLengthByteArray. + * + * @param {Uint8Array[]} values + * @returns {Uint8Array[]} + */ +function roundTripLengthByteArray(values) { + const writer = new ByteWriter() + deltaLengthByteArray(writer, values) + const buffer = writer.getBuffer() + const reader = { view: new DataView(buffer), offset: 0 } + + /** @type {Uint8Array[]} */ + const output = new Array(values.length) + deltaLengthByteArrayRead(reader, values.length, output) + return output +} + +/** + * Round-trip test for deltaByteArray. + * + * @param {Uint8Array[]} values + * @returns {Uint8Array[]} + */ +function roundTripByteArray(values) { + const writer = new ByteWriter() + deltaByteArray(writer, values) + const buffer = writer.getBuffer() + const reader = { view: new DataView(buffer), offset: 0 } + + /** @type {Uint8Array[]} */ + const output = new Array(values.length) + deltaByteArrayRead(reader, values.length, output) + return output +} + +describe('deltaBinaryPack', () => { + it('should round-trip empty array', () => { + const decoded = roundTripInt32([]) + expect(decoded).toEqual([]) + }) + + it('should round-trip single value', () => { + const decoded = roundTripInt32([42]) + expect(decoded).toEqual([42]) + }) + + it('should round-trip monotonically increasing values', () => { + const original = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + const decoded = roundTripInt32(original) + expect(decoded).toEqual(original) + }) + + it('should round-trip constant values', () => { + const original = Array(100).fill(42) + const decoded = roundTripInt32(original) + expect(decoded).toEqual(original) + }) + + it('should round-trip negative deltas', () => { + const original = [100, 90, 80, 70, 60, 50, 40, 30, 20, 10] + const decoded = roundTripInt32(original) + expect(decoded).toEqual(original) + }) + + it('should round-trip mixed deltas', () => { + const original = [0, 5, 3, 8, 2, 9, 1, 7, 4, 6] + const decoded = roundTripInt32(original) + expect(decoded).toEqual(original) + }) + + it('should round-trip values spanning multiple blocks', () => { + // More than 128 values to test multiple blocks + const original = Array.from({ length: 300 }, (_, i) => i * 2) + const decoded = roundTripInt32(original) + expect(decoded).toEqual(original) + }) + + it('should round-trip large values', () => { + const original = [1000000, 1000001, 1000002, 1000003] + const decoded = roundTripInt32(original) + expect(decoded).toEqual(original) + }) + + it('should round-trip negative values', () => { + const original = [-10, -5, 0, 5, 10] + const decoded = roundTripInt32(original) + expect(decoded).toEqual(original) + }) + + it('should round-trip bigint values', () => { + const original = [1n, 2n, 3n, 4n, 5n] + const decoded = roundTripBigInt(original) + expect(decoded).toEqual(original) + }) + + it('should round-trip large bigint values', () => { + const original = [10000000000n, 10000000001n, 10000000002n] + const decoded = roundTripBigInt(original) + expect(decoded).toEqual(original) + }) + + it('should round-trip random values', () => { + const original = Array.from({ length: 200 }, () => Math.floor(Math.random() * 10000)) + const decoded = 
roundTripInt32(original) + expect(decoded).toEqual(original) + }) + + it('should throw for unsupported types', () => { + const writer = new ByteWriter() + expect(() => deltaBinaryPack(writer, ['string'])).toThrow('deltaBinaryPack only supports number or bigint arrays') + }) + + it('should handle values requiring bit flush at end of miniblock', () => { + // Values with varying bit widths to exercise the bitsUsed > 0 flush path + const original = Array.from({ length: 32 }, (_, i) => i * 7) + const decoded = roundTripInt32(original) + expect(decoded).toEqual(original) + }) +}) + +describe('deltaLengthByteArray', () => { + it('should round-trip empty array', () => { + const decoded = roundTripLengthByteArray([]) + expect(decoded).toEqual([]) + }) + + it('should round-trip single byte array', () => { + const original = [new Uint8Array([1, 2, 3])] + const decoded = roundTripLengthByteArray(original) + expect(decoded.length).toBe(1) + expect(Array.from(decoded[0])).toEqual([1, 2, 3]) + }) + + it('should round-trip multiple byte arrays', () => { + const original = [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5]), + new Uint8Array([6, 7, 8, 9]), + ] + const decoded = roundTripLengthByteArray(original) + expect(decoded.length).toBe(3) + expect(Array.from(decoded[0])).toEqual([1, 2, 3]) + expect(Array.from(decoded[1])).toEqual([4, 5]) + expect(Array.from(decoded[2])).toEqual([6, 7, 8, 9]) + }) + + it('should round-trip strings as byte arrays', () => { + const encoder = new TextEncoder() + const original = ['hello', 'world', 'test'].map(s => encoder.encode(s)) + const decoded = roundTripLengthByteArray(original) + const decoder = new TextDecoder() + expect(decoded.map(d => decoder.decode(d))).toEqual(['hello', 'world', 'test']) + }) + + it('should throw for non-Uint8Array values', () => { + const writer = new ByteWriter() + expect(() => deltaLengthByteArray(writer, ['string'])).toThrow('deltaLengthByteArray expects Uint8Array values') + }) +}) + +describe('deltaByteArray', () => { + it('should round-trip empty array', () => { + const decoded = roundTripByteArray([]) + expect(decoded).toEqual([]) + }) + + it('should round-trip single byte array', () => { + const original = [new Uint8Array([1, 2, 3])] + const decoded = roundTripByteArray(original) + expect(decoded.length).toBe(1) + expect(Array.from(decoded[0])).toEqual([1, 2, 3]) + }) + + it('should round-trip arrays with common prefixes', () => { + const encoder = new TextEncoder() + const original = ['prefix_a', 'prefix_b', 'prefix_c'].map(s => encoder.encode(s)) + const decoded = roundTripByteArray(original) + const decoder = new TextDecoder() + expect(decoded.map(d => decoder.decode(d))).toEqual(['prefix_a', 'prefix_b', 'prefix_c']) + }) + + it('should round-trip arrays with no common prefix', () => { + const encoder = new TextEncoder() + const original = ['abc', 'xyz', '123'].map(s => encoder.encode(s)) + const decoded = roundTripByteArray(original) + const decoder = new TextDecoder() + expect(decoded.map(d => decoder.decode(d))).toEqual(['abc', 'xyz', '123']) + }) + + it('should round-trip sorted strings efficiently', () => { + const encoder = new TextEncoder() + const original = ['apple', 'application', 'apply', 'banana', 'bandana'].map(s => encoder.encode(s)) + const decoded = roundTripByteArray(original) + const decoder = new TextDecoder() + expect(decoded.map(d => decoder.decode(d))).toEqual(['apple', 'application', 'apply', 'banana', 'bandana']) + }) + + it('should throw for non-Uint8Array first value', () => { + const writer = new 
ByteWriter() + expect(() => deltaByteArray(writer, ['string'])).toThrow('deltaByteArray expects Uint8Array values') + }) + + it('should throw for non-Uint8Array subsequent value', () => { + const writer = new ByteWriter() + expect(() => deltaByteArray(writer, [new Uint8Array([1]), 'string'])).toThrow('deltaByteArray expects Uint8Array values') + }) +})
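
A minimal round-trip sketch for reviewers (not part of the diff), showing how the output of the new encoder is laid out. It assumes the snippet is run from the repository root and that ByteWriter.getBuffer() returns an ArrayBuffer trimmed to the written length, as the round-trip helpers in test/delta.test.js rely on:

    import { ByteWriter } from './src/bytewriter.js'
    import { deltaBinaryPack } from './src/delta.js'

    const writer = new ByteWriter()
    deltaBinaryPack(writer, [1, 2, 3, 4, 5])

    // Layout produced by deltaBinaryPack: varint block size (128), varint
    // miniblocks per block (4), varint total value count, zigzag varint of the
    // first value, then per block a zigzag varint min delta, one bit-width
    // byte per miniblock, and the bit-packed adjusted deltas.
    const bytes = new Uint8Array(writer.getBuffer())
    console.log(bytes)

    // For [1, 2, 3, 4, 5] every delta equals the min delta, so all adjusted
    // deltas are 0, every miniblock bit width is 0, and no packed data
    // follows the four bit-width bytes.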