mirror of
https://github.com/asadbek064/hyparquet-writer.git
synced 2026-01-04 19:06:37 +00:00
Delta binary packed
This commit is contained in:
parent
5dadd5f7ef
commit
4da31a5f83
@ -151,3 +151,16 @@ ByteWriter.prototype.appendVarBigInt = function(value) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert number to zigzag encoding and write as varint.
|
||||
*
|
||||
* @param {number | bigint} value
|
||||
*/
|
||||
ByteWriter.prototype.appendZigZag = function(value) {
|
||||
if (typeof value === 'number') {
|
||||
this.appendVarInt(value << 1 ^ value >> 31)
|
||||
} else {
|
||||
this.appendVarBigInt(value << 1n ^ value >> 63n)
|
||||
}
|
||||
}
|
||||
|
||||
@ -10,7 +10,7 @@ import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
|
||||
* @param {Writer} writer
|
||||
* @param {DecodedArray} values
|
||||
* @param {ColumnEncoder} column
|
||||
* @param {import('hyparquet').Encoding} encoding
|
||||
* @param {Encoding} encoding
|
||||
* @param {PageData} [listValues]
|
||||
*/
|
||||
export function writeDataPageV2(writer, values, column, encoding, listValues) {
|
||||
@ -83,7 +83,7 @@ export function writeDataPageV2(writer, values, column, encoding, listValues) {
|
||||
* @param {PageHeader} header
|
||||
*/
|
||||
export function writePageHeader(writer, header) {
|
||||
/** @type {import('../src/types.js').ThriftObject} */
|
||||
/** @type {ThriftObject} */
|
||||
const compact = {
|
||||
field_1: PageTypes.indexOf(header.type),
|
||||
field_2: header.uncompressed_page_size,
|
||||
@ -114,8 +114,8 @@ export function writePageHeader(writer, header) {
|
||||
}
|
||||
|
||||
/**
|
||||
* @import {DecodedArray, PageHeader, SchemaElement} from 'hyparquet'
|
||||
* @import {ColumnEncoder, PageData, Writer} from '../src/types.js'
|
||||
* @import {DecodedArray, Encoding, PageHeader} from 'hyparquet'
|
||||
* @import {ColumnEncoder, PageData, ThriftObject, Writer} from '../src/types.js'
|
||||
* @param {Writer} writer
|
||||
* @param {ColumnEncoder} column
|
||||
* @param {DecodedArray} values
|
||||
|
||||
205
src/delta.js
Normal file
205
src/delta.js
Normal file
@ -0,0 +1,205 @@
|
||||
/**
|
||||
* Delta Binary Packed encoding for parquet.
|
||||
* Encodes integers as deltas with variable bit-width packing.
|
||||
*
|
||||
* @import {DecodedArray} from 'hyparquet'
|
||||
* @import {Writer} from '../src/types.js'
|
||||
*/
|
||||
|
||||
const BLOCK_SIZE = 128
|
||||
const MINIBLOCKS_PER_BLOCK = 4
|
||||
const VALUES_PER_MINIBLOCK = BLOCK_SIZE / MINIBLOCKS_PER_BLOCK // 32
|
||||
|
||||
/**
|
||||
* Write values using delta binary packed encoding.
|
||||
*
|
||||
* @param {Writer} writer
|
||||
* @param {DecodedArray} values
|
||||
*/
|
||||
export function deltaBinaryPack(writer, values) {
|
||||
const count = values.length
|
||||
if (count === 0) {
|
||||
// Write header with zero count
|
||||
writer.appendVarInt(BLOCK_SIZE)
|
||||
writer.appendVarInt(MINIBLOCKS_PER_BLOCK)
|
||||
writer.appendVarInt(0)
|
||||
writer.appendVarInt(0)
|
||||
return
|
||||
}
|
||||
if (typeof values[0] !== 'number' && typeof values[0] !== 'bigint') {
|
||||
throw new Error('deltaBinaryPack only supports number or bigint arrays')
|
||||
}
|
||||
|
||||
// Write header
|
||||
writer.appendVarInt(BLOCK_SIZE)
|
||||
writer.appendVarInt(MINIBLOCKS_PER_BLOCK)
|
||||
writer.appendVarInt(count)
|
||||
writer.appendZigZag(values[0])
|
||||
|
||||
// Process blocks
|
||||
let index = 1
|
||||
while (index < count) {
|
||||
const blockEnd = Math.min(index + BLOCK_SIZE, count)
|
||||
const blockSize = blockEnd - index
|
||||
|
||||
// Compute deltas for this block
|
||||
const blockDeltas = new BigInt64Array(blockSize)
|
||||
let minDelta = BigInt(values[index]) - BigInt(values[index - 1])
|
||||
blockDeltas[0] = minDelta
|
||||
for (let i = 1; i < blockSize; i++) {
|
||||
const delta = BigInt(values[index + i]) - BigInt(values[index + i - 1])
|
||||
blockDeltas[i] = delta
|
||||
if (delta < minDelta) minDelta = delta
|
||||
}
|
||||
writer.appendZigZag(minDelta)
|
||||
|
||||
// Calculate bit widths for each miniblock
|
||||
const bitWidths = new Uint8Array(MINIBLOCKS_PER_BLOCK)
|
||||
for (let mb = 0; mb < MINIBLOCKS_PER_BLOCK; mb++) {
|
||||
const mbStart = mb * VALUES_PER_MINIBLOCK
|
||||
const mbEnd = Math.min(mbStart + VALUES_PER_MINIBLOCK, blockSize)
|
||||
|
||||
let maxAdjusted = 0n
|
||||
for (let i = mbStart; i < mbEnd; i++) {
|
||||
const adjusted = blockDeltas[i] - minDelta
|
||||
if (adjusted > maxAdjusted) maxAdjusted = adjusted
|
||||
}
|
||||
bitWidths[mb] = bitWidth(maxAdjusted)
|
||||
}
|
||||
|
||||
// Write bit widths
|
||||
writer.appendBytes(bitWidths)
|
||||
|
||||
// Write packed miniblocks
|
||||
for (let mb = 0; mb < MINIBLOCKS_PER_BLOCK; mb++) {
|
||||
const bitWidth = bitWidths[mb]
|
||||
if (bitWidth === 0) continue // No data needed for zero bit width
|
||||
|
||||
const mbStart = mb * VALUES_PER_MINIBLOCK
|
||||
const mbEnd = Math.min(mbStart + VALUES_PER_MINIBLOCK, blockSize)
|
||||
|
||||
// Bit pack the adjusted deltas
|
||||
let buffer = 0n
|
||||
let bitsUsed = 0
|
||||
|
||||
for (let i = 0; i < VALUES_PER_MINIBLOCK; i++) {
|
||||
const adjusted = mbStart + i < mbEnd ? blockDeltas[mbStart + i] - minDelta : 0n
|
||||
buffer |= adjusted << BigInt(bitsUsed)
|
||||
bitsUsed += bitWidth
|
||||
|
||||
// Flush complete bytes
|
||||
while (bitsUsed >= 8) {
|
||||
writer.appendUint8(Number(buffer & 0xffn))
|
||||
buffer >>= 8n
|
||||
bitsUsed -= 8
|
||||
}
|
||||
}
|
||||
// assert(bitsUsed === 0) // because multiple of 8
|
||||
}
|
||||
|
||||
index = blockEnd
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write byte arrays using delta length encoding.
|
||||
* Encodes lengths using delta binary packed, then writes raw bytes.
|
||||
*
|
||||
* @param {Writer} writer
|
||||
* @param {DecodedArray} values
|
||||
*/
|
||||
export function deltaLengthByteArray(writer, values) {
|
||||
// Extract lengths
|
||||
const lengths = new Int32Array(values.length)
|
||||
for (let i = 0; i < values.length; i++) {
|
||||
const value = values[i]
|
||||
if (!(value instanceof Uint8Array)) {
|
||||
throw new Error('deltaLengthByteArray expects Uint8Array values')
|
||||
}
|
||||
lengths[i] = value.length
|
||||
}
|
||||
|
||||
// Write delta-packed lengths
|
||||
deltaBinaryPack(writer, lengths)
|
||||
|
||||
// Write raw byte data
|
||||
for (const value of values) {
|
||||
writer.appendBytes(value)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write byte arrays using delta encoding with prefix compression.
|
||||
* Stores common prefixes with previous value to improve compression.
|
||||
*
|
||||
* @param {Writer} writer
|
||||
* @param {DecodedArray} values
|
||||
*/
|
||||
export function deltaByteArray(writer, values) {
|
||||
if (values.length === 0) {
|
||||
deltaBinaryPack(writer, [])
|
||||
deltaBinaryPack(writer, [])
|
||||
return
|
||||
}
|
||||
|
||||
// Calculate prefix lengths and suffixes
|
||||
const prefixLengths = new Int32Array(values.length)
|
||||
const suffixLengths = new Int32Array(values.length)
|
||||
/** @type {Uint8Array[]} */
|
||||
const suffixes = new Array(values.length)
|
||||
|
||||
// First value has no prefix
|
||||
const value = values[0]
|
||||
if (!(value instanceof Uint8Array)) {
|
||||
throw new Error('deltaByteArray expects Uint8Array values')
|
||||
}
|
||||
prefixLengths[0] = 0
|
||||
suffixLengths[0] = values[0].length
|
||||
suffixes[0] = values[0]
|
||||
|
||||
for (let i = 1; i < values.length; i++) {
|
||||
const prev = values[i - 1]
|
||||
const curr = values[i]
|
||||
if (!(curr instanceof Uint8Array)) {
|
||||
throw new Error('deltaByteArray expects Uint8Array values')
|
||||
}
|
||||
|
||||
// Find common prefix length
|
||||
let prefixLen = 0
|
||||
const maxPrefix = Math.min(prev.length, curr.length)
|
||||
while (prefixLen < maxPrefix && prev[prefixLen] === curr[prefixLen]) {
|
||||
prefixLen++
|
||||
}
|
||||
|
||||
prefixLengths[i] = prefixLen
|
||||
suffixLengths[i] = curr.length - prefixLen
|
||||
suffixes[i] = curr.subarray(prefixLen)
|
||||
}
|
||||
|
||||
// Write delta-packed prefix lengths
|
||||
deltaBinaryPack(writer, prefixLengths)
|
||||
|
||||
// Write delta-packed suffix lengths
|
||||
deltaBinaryPack(writer, suffixLengths)
|
||||
|
||||
// Write suffix bytes
|
||||
for (const suffix of suffixes) {
|
||||
writer.appendBytes(suffix)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Minimum bits needed to store value.
|
||||
*
|
||||
* @param {bigint} value
|
||||
* @returns {number}
|
||||
*/
|
||||
function bitWidth(value) {
|
||||
if (value === 0n) return 0
|
||||
let bits = 0
|
||||
while (value > 0n) {
|
||||
bits++
|
||||
value >>= 1n
|
||||
}
|
||||
return bits
|
||||
}
|
||||
4
src/types.d.ts
vendored
4
src/types.d.ts
vendored
@ -1,4 +1,4 @@
|
||||
import type { DecodedArray, KeyValue, SchemaElement } from 'hyparquet'
|
||||
import type { DecodedArray, Encoding, KeyValue, SchemaElement } from 'hyparquet'
|
||||
|
||||
// Superset of parquet types with automatic conversions
|
||||
export type BasicType =
|
||||
@ -31,6 +31,7 @@ export interface ColumnSource {
|
||||
data: DecodedArray
|
||||
type?: BasicType
|
||||
nullable?: boolean
|
||||
encoding?: Encoding
|
||||
}
|
||||
|
||||
export interface PageData {
|
||||
@ -65,6 +66,7 @@ export interface Writer {
|
||||
appendBytes(value: Uint8Array): void
|
||||
appendVarInt(value: number): void
|
||||
appendVarBigInt(value: bigint): void
|
||||
appendZigZag(value: number | bigint): void
|
||||
}
|
||||
|
||||
export type ThriftObject = { [ key: `field_${number}` ]: ThriftType }
|
||||
|
||||
247
test/delta.test.js
Normal file
247
test/delta.test.js
Normal file
@ -0,0 +1,247 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import { ByteWriter } from '../src/bytewriter.js'
|
||||
import { deltaBinaryPack, deltaByteArray, deltaLengthByteArray } from '../src/delta.js'
|
||||
import { deltaBinaryUnpack, deltaByteArray as deltaByteArrayRead, deltaLengthByteArray as deltaLengthByteArrayRead } from 'hyparquet/src/delta.js'
|
||||
|
||||
/**
|
||||
* Round-trip test for deltaBinaryPack with Int32Array output.
|
||||
*
|
||||
* @param {number[]} values
|
||||
* @returns {number[]}
|
||||
*/
|
||||
function roundTripInt32(values) {
|
||||
const writer = new ByteWriter()
|
||||
deltaBinaryPack(writer, values)
|
||||
const buffer = writer.getBuffer()
|
||||
const reader = { view: new DataView(buffer), offset: 0 }
|
||||
|
||||
const output = new Int32Array(values.length)
|
||||
deltaBinaryUnpack(reader, values.length, output)
|
||||
return Array.from(output)
|
||||
}
|
||||
|
||||
/**
|
||||
* Round-trip test for deltaBinaryPack with BigInt64Array output.
|
||||
*
|
||||
* @param {bigint[]} values
|
||||
* @returns {bigint[]}
|
||||
*/
|
||||
function roundTripBigInt(values) {
|
||||
const writer = new ByteWriter()
|
||||
deltaBinaryPack(writer, values)
|
||||
const buffer = writer.getBuffer()
|
||||
const reader = { view: new DataView(buffer), offset: 0 }
|
||||
|
||||
const output = new BigInt64Array(values.length)
|
||||
deltaBinaryUnpack(reader, values.length, output)
|
||||
return Array.from(output)
|
||||
}
|
||||
|
||||
/**
|
||||
* Round-trip test for deltaLengthByteArray.
|
||||
*
|
||||
* @param {Uint8Array[]} values
|
||||
* @returns {Uint8Array[]}
|
||||
*/
|
||||
function roundTripLengthByteArray(values) {
|
||||
const writer = new ByteWriter()
|
||||
deltaLengthByteArray(writer, values)
|
||||
const buffer = writer.getBuffer()
|
||||
const reader = { view: new DataView(buffer), offset: 0 }
|
||||
|
||||
/** @type {Uint8Array[]} */
|
||||
const output = new Array(values.length)
|
||||
deltaLengthByteArrayRead(reader, values.length, output)
|
||||
return output
|
||||
}
|
||||
|
||||
/**
|
||||
* Round-trip test for deltaByteArray.
|
||||
*
|
||||
* @param {Uint8Array[]} values
|
||||
* @returns {Uint8Array[]}
|
||||
*/
|
||||
function roundTripByteArray(values) {
|
||||
const writer = new ByteWriter()
|
||||
deltaByteArray(writer, values)
|
||||
const buffer = writer.getBuffer()
|
||||
const reader = { view: new DataView(buffer), offset: 0 }
|
||||
|
||||
/** @type {Uint8Array[]} */
|
||||
const output = new Array(values.length)
|
||||
deltaByteArrayRead(reader, values.length, output)
|
||||
return output
|
||||
}
|
||||
|
||||
describe('deltaBinaryPack', () => {
|
||||
it('should round-trip empty array', () => {
|
||||
const decoded = roundTripInt32([])
|
||||
expect(decoded).toEqual([])
|
||||
})
|
||||
|
||||
it('should round-trip single value', () => {
|
||||
const decoded = roundTripInt32([42])
|
||||
expect(decoded).toEqual([42])
|
||||
})
|
||||
|
||||
it('should round-trip monotonically increasing values', () => {
|
||||
const original = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
|
||||
const decoded = roundTripInt32(original)
|
||||
expect(decoded).toEqual(original)
|
||||
})
|
||||
|
||||
it('should round-trip constant values', () => {
|
||||
const original = Array(100).fill(42)
|
||||
const decoded = roundTripInt32(original)
|
||||
expect(decoded).toEqual(original)
|
||||
})
|
||||
|
||||
it('should round-trip negative deltas', () => {
|
||||
const original = [100, 90, 80, 70, 60, 50, 40, 30, 20, 10]
|
||||
const decoded = roundTripInt32(original)
|
||||
expect(decoded).toEqual(original)
|
||||
})
|
||||
|
||||
it('should round-trip mixed deltas', () => {
|
||||
const original = [0, 5, 3, 8, 2, 9, 1, 7, 4, 6]
|
||||
const decoded = roundTripInt32(original)
|
||||
expect(decoded).toEqual(original)
|
||||
})
|
||||
|
||||
it('should round-trip values spanning multiple blocks', () => {
|
||||
// More than 128 values to test multiple blocks
|
||||
const original = Array.from({ length: 300 }, (_, i) => i * 2)
|
||||
const decoded = roundTripInt32(original)
|
||||
expect(decoded).toEqual(original)
|
||||
})
|
||||
|
||||
it('should round-trip large values', () => {
|
||||
const original = [1000000, 1000001, 1000002, 1000003]
|
||||
const decoded = roundTripInt32(original)
|
||||
expect(decoded).toEqual(original)
|
||||
})
|
||||
|
||||
it('should round-trip negative values', () => {
|
||||
const original = [-10, -5, 0, 5, 10]
|
||||
const decoded = roundTripInt32(original)
|
||||
expect(decoded).toEqual(original)
|
||||
})
|
||||
|
||||
it('should round-trip bigint values', () => {
|
||||
const original = [1n, 2n, 3n, 4n, 5n]
|
||||
const decoded = roundTripBigInt(original)
|
||||
expect(decoded).toEqual(original)
|
||||
})
|
||||
|
||||
it('should round-trip large bigint values', () => {
|
||||
const original = [10000000000n, 10000000001n, 10000000002n]
|
||||
const decoded = roundTripBigInt(original)
|
||||
expect(decoded).toEqual(original)
|
||||
})
|
||||
|
||||
it('should round-trip random values', () => {
|
||||
const original = Array.from({ length: 200 }, () => Math.floor(Math.random() * 10000))
|
||||
const decoded = roundTripInt32(original)
|
||||
expect(decoded).toEqual(original)
|
||||
})
|
||||
|
||||
it('should throw for unsupported types', () => {
|
||||
const writer = new ByteWriter()
|
||||
expect(() => deltaBinaryPack(writer, ['string'])).toThrow('deltaBinaryPack only supports number or bigint arrays')
|
||||
})
|
||||
|
||||
it('should handle values requiring bit flush at end of miniblock', () => {
|
||||
// Values with varying bit widths to exercise the bitsUsed > 0 flush path
|
||||
const original = Array.from({ length: 32 }, (_, i) => i * 7)
|
||||
const decoded = roundTripInt32(original)
|
||||
expect(decoded).toEqual(original)
|
||||
})
|
||||
})
|
||||
|
||||
describe('deltaLengthByteArray', () => {
|
||||
it('should round-trip empty array', () => {
|
||||
const decoded = roundTripLengthByteArray([])
|
||||
expect(decoded).toEqual([])
|
||||
})
|
||||
|
||||
it('should round-trip single byte array', () => {
|
||||
const original = [new Uint8Array([1, 2, 3])]
|
||||
const decoded = roundTripLengthByteArray(original)
|
||||
expect(decoded.length).toBe(1)
|
||||
expect(Array.from(decoded[0])).toEqual([1, 2, 3])
|
||||
})
|
||||
|
||||
it('should round-trip multiple byte arrays', () => {
|
||||
const original = [
|
||||
new Uint8Array([1, 2, 3]),
|
||||
new Uint8Array([4, 5]),
|
||||
new Uint8Array([6, 7, 8, 9]),
|
||||
]
|
||||
const decoded = roundTripLengthByteArray(original)
|
||||
expect(decoded.length).toBe(3)
|
||||
expect(Array.from(decoded[0])).toEqual([1, 2, 3])
|
||||
expect(Array.from(decoded[1])).toEqual([4, 5])
|
||||
expect(Array.from(decoded[2])).toEqual([6, 7, 8, 9])
|
||||
})
|
||||
|
||||
it('should round-trip strings as byte arrays', () => {
|
||||
const encoder = new TextEncoder()
|
||||
const original = ['hello', 'world', 'test'].map(s => encoder.encode(s))
|
||||
const decoded = roundTripLengthByteArray(original)
|
||||
const decoder = new TextDecoder()
|
||||
expect(decoded.map(d => decoder.decode(d))).toEqual(['hello', 'world', 'test'])
|
||||
})
|
||||
|
||||
it('should throw for non-Uint8Array values', () => {
|
||||
const writer = new ByteWriter()
|
||||
expect(() => deltaLengthByteArray(writer, ['string'])).toThrow('deltaLengthByteArray expects Uint8Array values')
|
||||
})
|
||||
})
|
||||
|
||||
describe('deltaByteArray', () => {
|
||||
it('should round-trip empty array', () => {
|
||||
const decoded = roundTripByteArray([])
|
||||
expect(decoded).toEqual([])
|
||||
})
|
||||
|
||||
it('should round-trip single byte array', () => {
|
||||
const original = [new Uint8Array([1, 2, 3])]
|
||||
const decoded = roundTripByteArray(original)
|
||||
expect(decoded.length).toBe(1)
|
||||
expect(Array.from(decoded[0])).toEqual([1, 2, 3])
|
||||
})
|
||||
|
||||
it('should round-trip arrays with common prefixes', () => {
|
||||
const encoder = new TextEncoder()
|
||||
const original = ['prefix_a', 'prefix_b', 'prefix_c'].map(s => encoder.encode(s))
|
||||
const decoded = roundTripByteArray(original)
|
||||
const decoder = new TextDecoder()
|
||||
expect(decoded.map(d => decoder.decode(d))).toEqual(['prefix_a', 'prefix_b', 'prefix_c'])
|
||||
})
|
||||
|
||||
it('should round-trip arrays with no common prefix', () => {
|
||||
const encoder = new TextEncoder()
|
||||
const original = ['abc', 'xyz', '123'].map(s => encoder.encode(s))
|
||||
const decoded = roundTripByteArray(original)
|
||||
const decoder = new TextDecoder()
|
||||
expect(decoded.map(d => decoder.decode(d))).toEqual(['abc', 'xyz', '123'])
|
||||
})
|
||||
|
||||
it('should round-trip sorted strings efficiently', () => {
|
||||
const encoder = new TextEncoder()
|
||||
const original = ['apple', 'application', 'apply', 'banana', 'bandana'].map(s => encoder.encode(s))
|
||||
const decoded = roundTripByteArray(original)
|
||||
const decoder = new TextDecoder()
|
||||
expect(decoded.map(d => decoder.decode(d))).toEqual(['apple', 'application', 'apply', 'banana', 'bandana'])
|
||||
})
|
||||
|
||||
it('should throw for non-Uint8Array first value', () => {
|
||||
const writer = new ByteWriter()
|
||||
expect(() => deltaByteArray(writer, ['string'])).toThrow('deltaByteArray expects Uint8Array values')
|
||||
})
|
||||
|
||||
it('should throw for non-Uint8Array subsequent value', () => {
|
||||
const writer = new ByteWriter()
|
||||
expect(() => deltaByteArray(writer, [new Uint8Array([1]), 'string'])).toThrow('deltaByteArray expects Uint8Array values')
|
||||
})
|
||||
})
|
||||
Loading…
Reference in New Issue
Block a user