diff --git a/package.json b/package.json
index 36c88c6..9400101 100644
--- a/package.json
+++ b/package.json
@@ -52,7 +52,7 @@
     "test": "vitest run"
   },
   "dependencies": {
-    "hyparquet": "1.22.0"
+    "hyparquet": "1.22.1"
   },
   "devDependencies": {
     "@babel/eslint-parser": "7.28.5",
diff --git a/src/datapage.js b/src/datapage.js
index 7069eed..2654ded 100644
--- a/src/datapage.js
+++ b/src/datapage.js
@@ -4,6 +4,7 @@ import { deltaBinaryPack, deltaByteArray, deltaLengthByteArray } from './delta.j
 import { writeRleBitPackedHybrid } from './encoding.js'
 import { writePlain } from './plain.js'
 import { snappyCompress } from './snappy.js'
+import { writeByteStreamSplit } from './splitstream.js'
 import { serializeTCompactProtocol } from './thrift.js'
 import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
 
@@ -62,6 +63,8 @@ export function writeDataPageV2(writer, values, column, encoding, listValues) {
       throw new Error('DELTA_BYTE_ARRAY encoding only supported for BYTE_ARRAY type')
     }
     deltaByteArray(page, nonnull)
+  } else if (encoding === 'BYTE_STREAM_SPLIT') {
+    writeByteStreamSplit(page, nonnull, type, type_length)
   } else {
     throw new Error(`parquet unsupported encoding: ${encoding}`)
   }
diff --git a/src/splitstream.js b/src/splitstream.js
new file mode 100644
index 0000000..c63bf67
--- /dev/null
+++ b/src/splitstream.js
@@ -0,0 +1,78 @@
+/**
+ * Write values using BYTE_STREAM_SPLIT encoding.
+ * This encoding writes all first bytes of values, then all second bytes, etc.
+ * Can improve compression for floating-point and fixed-width numeric data.
+ *
+ * @import {DecodedArray, ParquetType} from 'hyparquet'
+ * @import {Writer} from '../src/types.js'
+ * @param {Writer} writer
+ * @param {DecodedArray} values
+ * @param {ParquetType} type
+ * @param {number | undefined} typeLength
+ */
+export function writeByteStreamSplit(writer, values, type, typeLength) {
+  const count = values.length
+
+  // Get bytes from values based on type
+  /** @type {Uint8Array} */
+  let bytes
+  /** @type {number} */
+  let width
+  if (type === 'FLOAT') {
+    const typed = values instanceof Float32Array ? values : new Float32Array(numberArray(values))
+    bytes = new Uint8Array(typed.buffer, typed.byteOffset, typed.byteLength)
+    width = 4
+  } else if (type === 'DOUBLE') {
+    const typed = values instanceof Float64Array ? values : new Float64Array(numberArray(values))
+    bytes = new Uint8Array(typed.buffer, typed.byteOffset, typed.byteLength)
+    width = 8
+  } else if (type === 'INT32') {
+    const typed = values instanceof Int32Array ? values : new Int32Array(numberArray(values))
+    bytes = new Uint8Array(typed.buffer, typed.byteOffset, typed.byteLength)
+    width = 4
+  } else if (type === 'INT64') {
+    const typed = bigIntArray(values)
+    bytes = new Uint8Array(typed.buffer, typed.byteOffset, typed.byteLength)
+    width = 8
+  } else if (type === 'FIXED_LEN_BYTE_ARRAY') {
+    if (!typeLength) throw new Error('parquet byte_stream_split missing type_length')
+    width = typeLength
+    bytes = new Uint8Array(count * width)
+    for (let i = 0; i < count; i++) {
+      bytes.set(values[i], i * width)
+    }
+  } else {
+    throw new Error(`parquet byte_stream_split unsupported type: ${type}`)
+  }
+
+  // Write bytes in column format (all byte 0 from all values, then byte 1, etc.)
+  for (let b = 0; b < width; b++) {
+    for (let i = 0; i < count; i++) {
+      writer.appendUint8(bytes[i * width + b])
+    }
+  }
+}
+
+/**
+ * @param {DecodedArray} values
+ * @returns {number[]}
+ */
+function numberArray(values) {
+  if (Array.isArray(values) && values.every(v => typeof v === 'number')) {
+    return values
+  }
+  throw new Error('Expected number array for BYTE_STREAM_SPLIT encoding')
+}
+
+/**
+ * @param {DecodedArray} values
+ * @returns {BigInt64Array}
+ */
+function bigIntArray(values) {
+  if (values instanceof BigInt64Array) return values
+  if (Array.isArray(values) && values.every(v => typeof v === 'bigint')) {
+    return new BigInt64Array(values)
+  }
+  throw new Error('Expected bigint array for BYTE_STREAM_SPLIT encoding')
+}
+
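For intuition about the transposition loop above, a small sketch (illustration
only, not part of the patch; assumes a little-endian host, which is what JS
typed arrays expose on virtually every platform). Two FLOAT values yield four
byte streams of two bytes each:

    // 1.0f = 00 00 80 3f (LE), 2.0f = 00 00 00 40 (LE)
    const writer = new ByteWriter()
    writeByteStreamSplit(writer, new Float32Array([1.0, 2.0]), 'FLOAT', undefined)
    new Uint8Array(writer.getBuffer())
    // => 00 00 | 00 00 | 80 00 | 3f 40   (all byte 0s, then byte 1s, 2s, 3s)
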
diff --git a/test/splitstream.test.js b/test/splitstream.test.js
new file mode 100644
index 0000000..fa7de70
--- /dev/null
+++ b/test/splitstream.test.js
@@ -0,0 +1,107 @@
+import { describe, expect, it } from 'vitest'
+import { ByteWriter } from '../src/bytewriter.js'
+import { writeByteStreamSplit } from '../src/splitstream.js'
+import { byteStreamSplit } from 'hyparquet/src/encoding.js'
+
+/**
+ * @import {DecodedArray, ParquetType} from 'hyparquet'
+ * @param {DecodedArray} values
+ * @param {ParquetType} type
+ * @param {number} [typeLength]
+ * @returns {DecodedArray}
+ */
+function roundTrip(values, type, typeLength) {
+  const writer = new ByteWriter()
+  writeByteStreamSplit(writer, values, type, typeLength)
+  const buffer = writer.getBuffer()
+  const reader = { view: new DataView(buffer), offset: 0 }
+  return byteStreamSplit(reader, values.length, type, typeLength)
+}
+
+describe('BYTE_STREAM_SPLIT encoding', () => {
+  describe('FLOAT', () => {
+    it('should round-trip float values', () => {
+      const original = [1.5, 2.25, 3.125, -4.5, 0.0, 100.75]
+      expect(Array.from(roundTrip(original, 'FLOAT'))).toEqual(original)
+    })
+
+    it('should round-trip an empty array', () => {
+      expect(Array.from(roundTrip([], 'FLOAT'))).toEqual([])
+    })
+
+    it('should round-trip special float values', () => {
+      const decoded = roundTrip([0.0, -0.0, Infinity, -Infinity], 'FLOAT')
+      expect(decoded[0]).toBe(0.0)
+      expect(decoded[1]).toBe(-0.0)
+      expect(decoded[2]).toBe(Infinity)
+      expect(decoded[3]).toBe(-Infinity)
+    })
+  })
+
+  describe('DOUBLE', () => {
+    it('should round-trip double values', () => {
+      const original = [1.5, 2.25, 3.125, -4.5, 0.0, 100.75, 1e100, -1e-100]
+      expect(Array.from(roundTrip(original, 'DOUBLE'))).toEqual(original)
+    })
+
+    it('should round-trip an empty array', () => {
+      expect(Array.from(roundTrip([], 'DOUBLE'))).toEqual([])
+    })
+  })
+
+  describe('INT32', () => {
+    it('should round-trip int32 values', () => {
+      const original = [1, 2, 3, -100, 0, 2147483647, -2147483648]
+      expect(Array.from(roundTrip(original, 'INT32'))).toEqual(original)
+    })
+
+    it('should round-trip an empty array', () => {
+      expect(Array.from(roundTrip([], 'INT32'))).toEqual([])
+    })
+  })
+
+  describe('INT64', () => {
+    it('should round-trip int64 values', () => {
+      const original = [1n, 2n, 3n, -100n, 0n, 9223372036854775807n, -9223372036854775808n]
+      expect(Array.from(roundTrip(original, 'INT64'))).toEqual(original)
+    })
+
+    it('should round-trip an empty array', () => {
+      expect(Array.from(roundTrip([], 'INT64'))).toEqual([])
+    })
+  })
+
+  describe('FIXED_LEN_BYTE_ARRAY', () => {
+    it('should round-trip fixed-length byte arrays', () => {
+      const original = [
+        new Uint8Array([1, 2, 3, 4]),
+        new Uint8Array([5, 6, 7, 8]),
+        new Uint8Array([9, 10, 11, 12]),
+      ]
+      const decoded = roundTrip(original, 'FIXED_LEN_BYTE_ARRAY', 4)
+      expect(decoded).toHaveLength(3)
+      expect(Array.from(decoded[0])).toEqual([1, 2, 3, 4])
+      expect(Array.from(decoded[1])).toEqual([5, 6, 7, 8])
+      expect(Array.from(decoded[2])).toEqual([9, 10, 11, 12])
+    })
+
+    it('should round-trip an empty array', () => {
+      const decoded = roundTrip([], 'FIXED_LEN_BYTE_ARRAY', 4)
+      expect(Array.from(decoded)).toEqual([])
+    })
+
+    it('should throw without typeLength', () => {
+      const writer = new ByteWriter()
+      expect(() => writeByteStreamSplit(writer, [], 'FIXED_LEN_BYTE_ARRAY', undefined))
+        .toThrow('missing type_length')
+    })
+  })
+
+  describe('errors', () => {
+    it('should throw for unsupported type', () => {
+      const writer = new ByteWriter()
+      expect(() => writeByteStreamSplit(writer, [], 'BOOLEAN', undefined))
+        .toThrow('unsupported type')
+    })
+  })
+})
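The FIXED_LEN_BYTE_ARRAY branch is the same transposition at arbitrary width.
A hypothetical width-2 example via the roundTrip helper above (assuming the
decoder returns one Uint8Array per value, as the tests assert):

    // values ab cd and 12 34 are written as streams ab 12 | cd 34
    roundTrip([new Uint8Array([0xab, 0xcd]), new Uint8Array([0x12, 0x34])], 'FIXED_LEN_BYTE_ARRAY', 2)
    // => [Uint8Array [0xab, 0xcd], Uint8Array [0x12, 0x34]]
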
diff --git a/test/write.buffer.test.js b/test/write.buffer.test.js
index a8ad221..2b4dffd 100644
--- a/test/write.buffer.test.js
+++ b/test/write.buffer.test.js
@@ -332,8 +332,11 @@ describe('parquetWriteBuffer', () => {
     expect(result).toEqual(data.map(int => ({ int })))
   })
 
-  it('throws for BYTE_STREAM_SPLIT encoding', () => {
-    expect(() => parquetWriteBuffer({ columnData: [{ name: 'float', data: [1.0, 2.0, 3.0], encoding: 'BYTE_STREAM_SPLIT' }] }))
-      .toThrow('parquet unsupported encoding: BYTE_STREAM_SPLIT')
+  it('writes BYTE_STREAM_SPLIT encoding', () => {
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'float', data: [1.0, 2.0, 3.0], encoding: 'BYTE_STREAM_SPLIT' }],
+    })
+    const metadata = parquetMetadata(file)
+    expect(metadata.row_groups[0].columns[0].meta_data?.encodings).toEqual(['BYTE_STREAM_SPLIT'])
   })
 })
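Note that writeDataPageV2 passes only the non-null values (nonnull) to
writeByteStreamSplit, so nulls travel entirely in the definition levels.
That is why the nulls test below checks toContain rather than toEqual: the
column's encodings list presumably also reports the levels encoding (RLE)
alongside BYTE_STREAM_SPLIT. Roughly:

    // data [1.5, null, 3.125] -> definition levels [1, 0, 1],
    // byte-split payload encodes just [1.5, 3.125]
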
diff --git a/test/write.splitstream.test.js b/test/write.splitstream.test.js
new file mode 100644
index 0000000..dd2c7d6
--- /dev/null
+++ b/test/write.splitstream.test.js
@@ -0,0 +1,82 @@
+import { parquetMetadata, parquetReadObjects } from 'hyparquet'
+import { describe, expect, it } from 'vitest'
+import { parquetWriteBuffer } from '../src/index.js'
+
+describe('BYTE_STREAM_SPLIT encoding', () => {
+  it('writes BYTE_STREAM_SPLIT encoding for FLOAT', async () => {
+    const data = [1.5, 2.25, 3.125, -4.5, 0.0, 100.75]
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'float', data, encoding: 'BYTE_STREAM_SPLIT' }],
+    })
+    const metadata = parquetMetadata(file)
+    const columnMetadata = metadata.row_groups[0].columns[0].meta_data
+    expect(columnMetadata?.encodings).toEqual(['BYTE_STREAM_SPLIT'])
+    const result = await parquetReadObjects({ file })
+    expect(result).toEqual(data.map(float => ({ float })))
+  })
+
+  it('writes BYTE_STREAM_SPLIT encoding for DOUBLE', async () => {
+    const data = [1.5, 2.25, 3.125, -4.5, 0.0, 100.75, 1e100, -1e-100]
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'double', data, type: 'DOUBLE', encoding: 'BYTE_STREAM_SPLIT' }],
+    })
+    const metadata = parquetMetadata(file)
+    const columnMetadata = metadata.row_groups[0].columns[0].meta_data
+    expect(columnMetadata?.encodings).toEqual(['BYTE_STREAM_SPLIT'])
+    const result = await parquetReadObjects({ file })
+    expect(result).toEqual(data.map(double => ({ double })))
+  })
+
+  it('writes BYTE_STREAM_SPLIT encoding for INT32', async () => {
+    const data = [1, 2, 3, -100, 0, 2147483647, -2147483648]
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'int', data, encoding: 'BYTE_STREAM_SPLIT' }],
+    })
+    const metadata = parquetMetadata(file)
+    const columnMetadata = metadata.row_groups[0].columns[0].meta_data
+    expect(columnMetadata?.encodings).toEqual(['BYTE_STREAM_SPLIT'])
+    const result = await parquetReadObjects({ file })
+    expect(result).toEqual(data.map(int => ({ int })))
+  })
+
+  it('writes BYTE_STREAM_SPLIT encoding for INT64', async () => {
+    const data = [1n, 2n, 3n, -100n, 0n, 9223372036854775807n, -9223372036854775808n]
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'bigint', data, encoding: 'BYTE_STREAM_SPLIT' }],
+    })
+    const metadata = parquetMetadata(file)
+    const columnMetadata = metadata.row_groups[0].columns[0].meta_data
+    expect(columnMetadata?.encodings).toEqual(['BYTE_STREAM_SPLIT'])
+    const result = await parquetReadObjects({ file })
+    expect(result).toEqual(data.map(bigint => ({ bigint })))
+  })
+
+  it('writes BYTE_STREAM_SPLIT encoding with nulls', async () => {
+    const data = [1.5, null, 3.125, null, 0.0, 100.75]
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'float', data, encoding: 'BYTE_STREAM_SPLIT' }],
+    })
+    const metadata = parquetMetadata(file)
+    const columnMetadata = metadata.row_groups[0].columns[0].meta_data
+    expect(columnMetadata?.encodings).toContain('BYTE_STREAM_SPLIT')
+    const result = await parquetReadObjects({ file })
+    expect(result).toEqual(data.map(float => ({ float })))
+  })
+
+  it('writes BYTE_STREAM_SPLIT encoding with compression', async () => {
+    const data = Array.from({ length: 1000 }, (_, i) => i * 0.1)
+    const file = parquetWriteBuffer({
+      columnData: [{ name: 'float', data, encoding: 'BYTE_STREAM_SPLIT' }],
+      compressed: true,
+    })
+    const metadata = parquetMetadata(file)
+    const columnMetadata = metadata.row_groups[0].columns[0].meta_data
+    expect(columnMetadata?.encodings).toEqual(['BYTE_STREAM_SPLIT'])
+    expect(columnMetadata?.codec).toBe('SNAPPY')
+    const result = await parquetReadObjects({ file })
+    expect(result.length).toBe(1000)
+    result.forEach((row, i) => {
+      expect(row.float).toBeCloseTo(i * 0.1, 5)
+    })
+  })
+})
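End to end, opting a column into the new encoding from the public API looks
like this (a sketch distilled from the tests above, assuming the package is
consumed under its published name 'hyparquet-writer'):

    import { parquetReadObjects } from 'hyparquet'
    import { parquetWriteBuffer } from 'hyparquet-writer'

    const file = parquetWriteBuffer({
      columnData: [{ name: 'x', data: [0.5, 1.5, 2.5], type: 'DOUBLE', encoding: 'BYTE_STREAM_SPLIT' }],
      compressed: true, // byte streams tend to be locally uniform, so snappy does well
    })
    const rows = await parquetReadObjects({ file })
    // => [{ x: 0.5 }, { x: 1.5 }, { x: 2.5 }]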