Byte stream split

Kenny Daniel 2025-11-26 12:17:59 -08:00
parent c30f8fa8eb
commit 11efa2849a
6 changed files with 277 additions and 4 deletions

@@ -52,7 +52,7 @@
"test": "vitest run"
},
"dependencies": {
"hyparquet": "1.22.0"
"hyparquet": "1.22.1"
},
"devDependencies": {
"@babel/eslint-parser": "7.28.5",

@@ -4,6 +4,7 @@ import { deltaBinaryPack, deltaByteArray, deltaLengthByteArray } from './delta.js'
 import { writeRleBitPackedHybrid } from './encoding.js'
 import { writePlain } from './plain.js'
 import { snappyCompress } from './snappy.js'
+import { writeByteStreamSplit } from './splitstream.js'
 import { serializeTCompactProtocol } from './thrift.js'
 import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
@@ -62,6 +63,8 @@ export function writeDataPageV2(writer, values, column, encoding, listValues) {
       throw new Error('DELTA_BYTE_ARRAY encoding only supported for BYTE_ARRAY type')
     }
     deltaByteArray(page, nonnull)
+  } else if (encoding === 'BYTE_STREAM_SPLIT') {
+    writeByteStreamSplit(page, nonnull, type, type_length)
   } else {
     throw new Error(`parquet unsupported encoding: ${encoding}`)
   }
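
For orientation, here is a minimal sketch of how a caller reaches this new branch, based on the parquetWriteBuffer usage in the tests further down in this commit (the import specifier 'hyparquet-writer' is assumed to be the package's public entry point):

import { parquetWriteBuffer } from 'hyparquet-writer'

// Request BYTE_STREAM_SPLIT per column via the encoding option;
// writeDataPageV2 then dispatches to writeByteStreamSplit.
const file = parquetWriteBuffer({
  columnData: [{ name: 'float', data: [1.5, 2.25, 3.125], encoding: 'BYTE_STREAM_SPLIT' }],
})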

src/splitstream.js Normal file

@@ -0,0 +1,78 @@
/**
 * Write values using BYTE_STREAM_SPLIT encoding.
 * This encoding writes the first byte of every value, then the second byte of
 * every value, and so on. It can improve compression for floating-point and
 * fixed-width numeric data.
 *
 * @import {DecodedArray, ParquetType} from 'hyparquet'
 * @import {Writer} from '../src/types.js'
 * @param {Writer} writer
 * @param {DecodedArray} values
 * @param {ParquetType} type
 * @param {number | undefined} typeLength
 */
export function writeByteStreamSplit(writer, values, type, typeLength) {
  const count = values.length
  // Get bytes from values based on type
  /** @type {Uint8Array} */
  let bytes
  /** @type {number} */
  let width
  if (type === 'FLOAT') {
    const typed = values instanceof Float32Array ? values : new Float32Array(numberArray(values))
    bytes = new Uint8Array(typed.buffer, typed.byteOffset, typed.byteLength)
    width = 4
  } else if (type === 'DOUBLE') {
    const typed = values instanceof Float64Array ? values : new Float64Array(numberArray(values))
    bytes = new Uint8Array(typed.buffer, typed.byteOffset, typed.byteLength)
    width = 8
  } else if (type === 'INT32') {
    const typed = values instanceof Int32Array ? values : new Int32Array(numberArray(values))
    bytes = new Uint8Array(typed.buffer, typed.byteOffset, typed.byteLength)
    width = 4
  } else if (type === 'INT64') {
    const typed = bigIntArray(values)
    bytes = new Uint8Array(typed.buffer, typed.byteOffset, typed.byteLength)
    width = 8
  } else if (type === 'FIXED_LEN_BYTE_ARRAY') {
    if (!typeLength) throw new Error('parquet byte_stream_split missing type_length')
    width = typeLength
    bytes = new Uint8Array(count * width)
    for (let i = 0; i < count; i++) {
      bytes.set(values[i], i * width)
    }
  } else {
    throw new Error(`parquet byte_stream_split unsupported type: ${type}`)
  }
  // Write bytes in column format (all byte 0 from all values, then byte 1, etc.)
  for (let b = 0; b < width; b++) {
    for (let i = 0; i < count; i++) {
      writer.appendUint8(bytes[i * width + b])
    }
  }
}

/**
 * @param {DecodedArray} values
 * @returns {number[]}
 */
function numberArray(values) {
  if (Array.isArray(values) && values.every(v => typeof v === 'number')) {
    return values
  }
  throw new Error('Expected number array for BYTE_STREAM_SPLIT encoding')
}

/**
 * @param {DecodedArray} values
 * @returns {BigInt64Array}
 */
function bigIntArray(values) {
  if (values instanceof BigInt64Array) return values
  if (Array.isArray(values) && values.every(v => typeof v === 'bigint')) {
    return new BigInt64Array(values)
  }
  throw new Error('Expected bigint array for BYTE_STREAM_SPLIT encoding')
}
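
To make the transposition concrete, here is a standalone sketch (not part of the commit) that mirrors the inner loop above for two FLOAT values:

// Two little-endian floats: 1.0 -> 00 00 80 3f, 2.0 -> 00 00 00 40.
// BYTE_STREAM_SPLIT emits byte 0 of every value, then byte 1, and so on.
const typed = new Float32Array([1.0, 2.0])
const bytes = new Uint8Array(typed.buffer)
const out = []
for (let b = 0; b < 4; b++) {
  for (let i = 0; i < typed.length; i++) {
    out.push(bytes[i * 4 + b])
  }
}
console.log(out.map(x => x.toString(16).padStart(2, '0')).join(' '))
// -> 00 00 00 00 80 00 3f 40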

test/splitstream.test.js Normal file

@@ -0,0 +1,107 @@
import { describe, expect, it } from 'vitest'
import { ByteWriter } from '../src/bytewriter.js'
import { writeByteStreamSplit } from '../src/splitstream.js'
import { byteStreamSplit } from 'hyparquet/src/encoding.js'

/**
 * @import {DecodedArray, ParquetType} from 'hyparquet'
 * @param {DecodedArray} values
 * @param {ParquetType} type
 * @param {number} [typeLength]
 * @returns {DecodedArray}
 */
function roundTrip(values, type, typeLength) {
  const writer = new ByteWriter()
  writeByteStreamSplit(writer, values, type, typeLength)
  const buffer = writer.getBuffer()
  const reader = { view: new DataView(buffer), offset: 0 }
  return byteStreamSplit(reader, values.length, type, typeLength)
}

describe('BYTE_STREAM_SPLIT encoding', () => {
  describe('FLOAT', () => {
    it('should round-trip float values', () => {
      const original = [1.5, 2.25, 3.125, -4.5, 0.0, 100.75]
      expect(Array.from(roundTrip(original, 'FLOAT'))).toEqual(original)
    })

    it('should round-trip an empty array', () => {
      expect(Array.from(roundTrip([], 'FLOAT'))).toEqual([])
    })

    it('should round-trip special float values', () => {
      const decoded = roundTrip([0.0, -0.0, Infinity, -Infinity], 'FLOAT')
      expect(decoded[0]).toBe(0.0)
      expect(decoded[1]).toBe(-0.0)
      expect(decoded[2]).toBe(Infinity)
      expect(decoded[3]).toBe(-Infinity)
    })
  })

  describe('DOUBLE', () => {
    it('should round-trip double values', () => {
      const original = [1.5, 2.25, 3.125, -4.5, 0.0, 100.75, 1e100, -1e-100]
      expect(Array.from(roundTrip(original, 'DOUBLE'))).toEqual(original)
    })

    it('should round-trip an empty array', () => {
      expect(Array.from(roundTrip([], 'DOUBLE'))).toEqual([])
    })
  })

  describe('INT32', () => {
    it('should round-trip int32 values', () => {
      const original = [1, 2, 3, -100, 0, 2147483647, -2147483648]
      expect(Array.from(roundTrip(original, 'INT32'))).toEqual(original)
    })

    it('should round-trip an empty array', () => {
      expect(Array.from(roundTrip([], 'INT32'))).toEqual([])
    })
  })

  describe('INT64', () => {
    it('should round-trip int64 values', () => {
      const original = [1n, 2n, 3n, -100n, 0n, 9223372036854775807n, -9223372036854775808n]
      expect(Array.from(roundTrip(original, 'INT64'))).toEqual(original)
    })

    it('should round-trip an empty array', () => {
      expect(Array.from(roundTrip([], 'INT64'))).toEqual([])
    })
  })

  describe('FIXED_LEN_BYTE_ARRAY', () => {
    it('should round-trip fixed-length byte arrays', () => {
      const original = [
        new Uint8Array([1, 2, 3, 4]),
        new Uint8Array([5, 6, 7, 8]),
        new Uint8Array([9, 10, 11, 12]),
      ]
      const decoded = roundTrip(original, 'FIXED_LEN_BYTE_ARRAY', 4)
      expect(decoded).toHaveLength(3)
      expect(Array.from(decoded[0])).toEqual([1, 2, 3, 4])
      expect(Array.from(decoded[1])).toEqual([5, 6, 7, 8])
      expect(Array.from(decoded[2])).toEqual([9, 10, 11, 12])
    })

    it('should round-trip an empty array', () => {
      const decoded = roundTrip([], 'FIXED_LEN_BYTE_ARRAY', 4)
      expect(Array.from(decoded)).toEqual([])
    })

    it('should throw without typeLength', () => {
      const writer = new ByteWriter()
      expect(() => writeByteStreamSplit(writer, [], 'FIXED_LEN_BYTE_ARRAY', undefined))
        .toThrow('missing type_length')
    })
  })

  describe('errors', () => {
    it('should throw for unsupported type', () => {
      const writer = new ByteWriter()
      expect(() => writeByteStreamSplit(writer, [], 'BOOLEAN', undefined))
        .toThrow('unsupported type')
    })
  })
})

@@ -332,8 +332,11 @@ describe('parquetWriteBuffer', () => {
     expect(result).toEqual(data.map(int => ({ int })))
   })
-  it('throws for BYTE_STREAM_SPLIT encoding', () => {
-    expect(() => parquetWriteBuffer({ columnData: [{ name: 'float', data: [1.0, 2.0, 3.0], encoding: 'BYTE_STREAM_SPLIT' }] }))
-      .toThrow('parquet unsupported encoding: BYTE_STREAM_SPLIT')
+  it('writes BYTE_STREAM_SPLIT encoding', () => {
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'float', data: [1.0, 2.0, 3.0], encoding: 'BYTE_STREAM_SPLIT' }],
+    })
+    const metadata = parquetMetadata(file)
+    expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['BYTE_STREAM_SPLIT'])
   })
 })

@@ -0,0 +1,82 @@
import { parquetMetadata, parquetReadObjects } from 'hyparquet'
import { describe, expect, it } from 'vitest'
import { parquetWriteBuffer } from '../src/index.js'

describe('BYTE_STREAM_SPLIT encoding', () => {
  it('writes BYTE_STREAM_SPLIT encoding for FLOAT', async () => {
    const data = [1.5, 2.25, 3.125, -4.5, 0.0, 100.75]
    const file = parquetWriteBuffer({
      columnData: [{ name: 'float', data, encoding: 'BYTE_STREAM_SPLIT' }],
    })
    const metadata = parquetMetadata(file)
    const columnMetadata = metadata.row_groups[0].columns[0].meta_data
    expect(columnMetadata?.encodings).toEqual(['BYTE_STREAM_SPLIT'])
    const result = await parquetReadObjects({ file })
    expect(result).toEqual(data.map(float => ({ float })))
  })

  it('writes BYTE_STREAM_SPLIT encoding for DOUBLE', async () => {
    const data = [1.5, 2.25, 3.125, -4.5, 0.0, 100.75, 1e100, -1e-100]
    const file = parquetWriteBuffer({
      columnData: [{ name: 'double', data, type: 'DOUBLE', encoding: 'BYTE_STREAM_SPLIT' }],
    })
    const metadata = parquetMetadata(file)
    const columnMetadata = metadata.row_groups[0].columns[0].meta_data
    expect(columnMetadata?.encodings).toEqual(['BYTE_STREAM_SPLIT'])
    const result = await parquetReadObjects({ file })
    expect(result).toEqual(data.map(double => ({ double })))
  })

  it('writes BYTE_STREAM_SPLIT encoding for INT32', async () => {
    const data = [1, 2, 3, -100, 0, 2147483647, -2147483648]
    const file = parquetWriteBuffer({
      columnData: [{ name: 'int', data, encoding: 'BYTE_STREAM_SPLIT' }],
    })
    const metadata = parquetMetadata(file)
    const columnMetadata = metadata.row_groups[0].columns[0].meta_data
    expect(columnMetadata?.encodings).toEqual(['BYTE_STREAM_SPLIT'])
    const result = await parquetReadObjects({ file })
    expect(result).toEqual(data.map(int => ({ int })))
  })

  it('writes BYTE_STREAM_SPLIT encoding for INT64', async () => {
    const data = [1n, 2n, 3n, -100n, 0n, 9223372036854775807n, -9223372036854775808n]
    const file = parquetWriteBuffer({
      columnData: [{ name: 'bigint', data, encoding: 'BYTE_STREAM_SPLIT' }],
    })
    const metadata = parquetMetadata(file)
    const columnMetadata = metadata.row_groups[0].columns[0].meta_data
    expect(columnMetadata?.encodings).toEqual(['BYTE_STREAM_SPLIT'])
    const result = await parquetReadObjects({ file })
    expect(result).toEqual(data.map(bigint => ({ bigint })))
  })

  it('writes BYTE_STREAM_SPLIT encoding with nulls', async () => {
    const data = [1.5, null, 3.125, null, 0.0, 100.75]
    const file = parquetWriteBuffer({
      columnData: [{ name: 'float', data, encoding: 'BYTE_STREAM_SPLIT' }],
    })
    const metadata = parquetMetadata(file)
    const columnMetadata = metadata.row_groups[0].columns[0].meta_data
    expect(columnMetadata?.encodings).toContain('BYTE_STREAM_SPLIT')
    const result = await parquetReadObjects({ file })
    expect(result).toEqual(data.map(float => ({ float })))
  })

  it('writes BYTE_STREAM_SPLIT encoding with compression', async () => {
    const data = Array.from({ length: 1000 }, (_, i) => i * 0.1)
    const file = parquetWriteBuffer({
      columnData: [{ name: 'float', data, encoding: 'BYTE_STREAM_SPLIT' }],
      compressed: true,
    })
    const metadata = parquetMetadata(file)
    const columnMetadata = metadata.row_groups[0].columns[0].meta_data
    expect(columnMetadata?.encodings).toEqual(['BYTE_STREAM_SPLIT'])
    expect(columnMetadata?.codec).toBe('SNAPPY')
    const result = await parquetReadObjects({ file })
    expect(result.length).toBe(1000)
    result.forEach((row, i) => {
      expect(row.float).toBeCloseTo(i * 0.1, 5)
    })
  })
})
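
The compression claim in the JSDoc is easy to see in miniature: values of similar magnitude share their sign and exponent bytes, so after the split those byte streams become near-constant runs that codecs like SNAPPY handle well. An illustrative sketch (not part of the commit):

// Floats of similar magnitude share their high (exponent) bytes.
const typed = new Float32Array([10.0, 10.5, 11.0, 11.5])
const bytes = new Uint8Array(typed.buffer)
// The stream of byte 3 (the highest byte of each little-endian float):
const stream3 = [...typed.keys()].map(i => bytes[i * 4 + 3])
console.log(stream3) // [65, 65, 65, 65], a constant run that compresses well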