From 045256b47836b4a4bb952d3a6b54e8f0de15f8ef Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Tue, 25 Mar 2025 10:27:15 -0700 Subject: [PATCH] Thrift writer --- eslint.config.js | 3 +- package.json | 2 +- src/thrift.js | 214 ++++++++++++++++++++++++++++++++++++++++++++ src/types.d.ts | 11 +++ src/writer.js | 101 +++++++++++++++++++++ test/thrift.test.js | 108 ++++++++++++++++++++++ tsconfig.json | 4 +- 7 files changed, 438 insertions(+), 5 deletions(-) create mode 100644 src/thrift.js create mode 100644 src/types.d.ts create mode 100644 src/writer.js create mode 100644 test/thrift.test.js diff --git a/eslint.config.js b/eslint.config.js index b4bcb8e..8f2cde6 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -9,7 +9,8 @@ export default [ languageOptions: { globals: { - 'atob': false, + 'TextDecoder': false, + 'TextEncoder': false, }, }, diff --git a/package.json b/package.json index a176b9b..1a706f8 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "@vitest/coverage-v8": "3.0.9", "eslint": "9.22.0", "eslint-plugin-jsdoc": "50.6.8", - "hyparquet": "1.9.1", + "hyparquet": "1.10.0", "typescript": "5.8.2", "vitest": "3.0.9" } diff --git a/src/thrift.js b/src/thrift.js new file mode 100644 index 0000000..a07af5d --- /dev/null +++ b/src/thrift.js @@ -0,0 +1,214 @@ +// TCompactProtocol types +const CompactType = { + STOP: 0, + TRUE: 1, + FALSE: 2, + BYTE: 3, + I16: 4, + I32: 5, + I64: 6, + DOUBLE: 7, + BINARY: 8, + LIST: 9, + SET: 10, + MAP: 11, + STRUCT: 12, + UUID: 13, +} + +/** + * Serialize a JS object in TCompactProtocol format. + * + * Expects keys named like "field_1", "field_2", etc. in ascending order. + * + * @import {Writer} from './types.js' + * @param {Writer} writer + * @param {Record} data + */ +export function serializeTCompactProtocol(writer, data) { + let lastFid = 0 + // Write each field + for (const [key, value] of Object.entries(data)) { + // We expect key = "field_N" so we can extract N as the field ID + const fid = parseInt(key.replace(/^field_/, ''), 10) + if (Number.isNaN(fid)) { + throw new Error(`Invalid field name: ${key}. Expected "field_###" format.`) + } + + // Figure out which compact type to use + const type = getCompactTypeForValue(value) + + // Write the field-begin header: (delta << 4) | type + const delta = fid - lastFid + if (delta <= 0) { + throw new Error(`Non-monotonic field ID. fid=${fid}, lastFid=${lastFid}`) + } + // High nibble = delta, low nibble = type + writer.appendUint8(delta << 4 | type & 0x0f) + + // Write the field content itself + writeElement(writer, type, value) + + lastFid = fid + } + + // Finally write STOP + writer.appendUint8(CompactType.STOP) +} + +/** + * Deduce a TCompactProtocol type from the JS value + * + * @param {any} value + * @returns {number} CompactType + */ +function getCompactTypeForValue(value) { + if (value === true) { + return CompactType.TRUE + } + if (value === false) { + return CompactType.FALSE + } + if (typeof value === 'number') { + // We'll store integer as I32, otherwise DOUBLE + return Number.isInteger(value) ? CompactType.I32 : CompactType.DOUBLE + } + if (typeof value === 'bigint') { + return CompactType.I64 + } + if (typeof value === 'string') { + // Possibly treat 32-hex as a 16-byte UUID + if (/^[0-9a-fA-F]{32}$/.test(value)) { + return CompactType.UUID + } + return CompactType.BINARY + } + if (value instanceof Uint8Array) { + return CompactType.BINARY + } + if (Array.isArray(value)) { + return CompactType.LIST + } + if (value && typeof value === 'object') { + return CompactType.STRUCT + } + throw new Error(`Cannot determine thrift compact type for: ${value}`) +} + +/** + * Write a single value of a given compact type. + * + * @param {Writer} writer + * @param {number} type + * @param {any} value + */ +function writeElement(writer, type, value) { + switch (type) { + case CompactType.TRUE: + case CompactType.FALSE: + return // true/false is stored in the type + case CompactType.BYTE: + writer.appendUint8(value) + return + case CompactType.I16: + case CompactType.I32: { + // ZigZag -> varint + // For 32-bit int: zigzag = (n << 1) ^ (n >> 31) + const zigzag = value << 1 ^ value >> 31 + writer.appendVarInt(zigzag) + return + } + case CompactType.I64: { + // For 64-bit (bigint) we do (value << 1n) ^ (value >> 63n) in zigzag + const n = BigInt(value) + const zigzag = n << 1n ^ n >> 63n + writer.appendVarBigInt(zigzag) + return + } + case CompactType.DOUBLE: + writer.appendFloat64(value) + return + case CompactType.BINARY: { + // store length as a varint, then raw bytes + let bytes + if (typeof value === 'string') { + bytes = new TextEncoder().encode(value) + } else { + // e.g. Uint8Array + bytes = value + } + writer.appendVarInt(bytes.length) + writer.appendBuffer(bytes) + return + } + case CompactType.LIST: { + // Must store (size << 4) | elementType + // We'll guess the element type from the first element + const arr = value + const size = arr.length + if (size === 0) { + // (0 << 4) | type for an empty list – pick BYTE arbitrarily + writer.appendUint8(0 << 4 | CompactType.BYTE) + return + } + + // TODO: Check for heterogeneous lists? + const elemType = getCompactTypeForValue(arr[0]) + + const sizeNibble = size > 14 ? 15 : size + writer.appendUint8(sizeNibble << 4 | elemType) + if (size > 14) { + writer.appendVarInt(size) + } + + // Special trick for booleans in a list + if (elemType === CompactType.TRUE || elemType === CompactType.FALSE) { + // Write each boolean as a single 0 or 1 byte + for (const v of arr) { + writer.appendUint8(v ? 1 : 0) + } + } else { + // Otherwise write them out normally + for (const v of arr) { + writeElement(writer, elemType, v) + } + } + return + } + case CompactType.STRUCT: { + // Recursively write sub-fields as "field_N: val", end with STOP + let lastFid = 0 + for (const [k, v] of Object.entries(value)) { + const fid = parseInt(k.replace(/^field_/, ''), 10) + if (Number.isNaN(fid)) { + throw new Error(`Invalid sub-field name: ${k}. Expected "field_###"`) + } + const t = getCompactTypeForValue(v) + const delta = fid - lastFid + if (delta <= 0) { + throw new Error(`Non-monotonic fid in struct: fid=${fid}, lastFid=${lastFid}`) + } + writer.appendUint8(delta << 4 | t & 0x0f) + writeElement(writer, t, v) + lastFid = fid + } + // Write STOP + writer.appendUint8(CompactType.STOP) + return + } + case CompactType.UUID: { + // Expect a 32-hex string. Write 16 bytes + if (typeof value !== 'string' || value.length !== 32) { + throw new Error(`Expected 32-hex string for UUID, got ${value}`) + } + for (let i = 0; i < 16; i++) { + const byte = parseInt(value.slice(i * 2, i * 2 + 2), 16) + writer.appendUint8(byte) + } + return + } + + default: + throw new Error(`Unhandled type in writeElement: ${type}`) + } +} diff --git a/src/types.d.ts b/src/types.d.ts new file mode 100644 index 0000000..4e17945 --- /dev/null +++ b/src/types.d.ts @@ -0,0 +1,11 @@ + +export interface Writer { + buffer: ArrayBuffer + offset: number + appendUint8(value: number): void + appendUint32(value: number): void + appendFloat64(value: number): void + appendBuffer(buffer: ArrayBuffer): void + appendVarInt(value: number): void + appendVarBigInt(value: bigint): void +} diff --git a/src/writer.js b/src/writer.js new file mode 100644 index 0000000..858b6a7 --- /dev/null +++ b/src/writer.js @@ -0,0 +1,101 @@ + +/** + * Self-expanding buffer view + * + * @import {Writer} from './types.js' + * @returns {Writer} + */ +export function Writer() { + this.buffer = new ArrayBuffer(1024) + this.offset = 0 + this.view = new DataView(this.buffer) + return this +} + +/** + * @param {number} size + */ +Writer.prototype.ensure = function(size) { + if (this.offset + size > this.buffer.byteLength) { + const newSize = Math.max(this.buffer.byteLength * 2, this.offset + size) + const newBuffer = new ArrayBuffer(newSize) + new Uint8Array(newBuffer).set(new Uint8Array(this.buffer)) + this.buffer = newBuffer + this.view = new DataView(this.buffer) + } +} + +/** + * @param {number} value + */ +Writer.prototype.appendUint8 = function(value) { + this.ensure(this.offset + 1) + this.view.setUint8(this.offset, value) + this.offset++ +} + +/** + * @param {number} value + */ +Writer.prototype.appendUint32 = function(value) { + this.ensure(this.offset + 4) + this.view.setUint32(this.offset, value, true) + this.offset += 4 +} + +/** + * @param {number} value + */ +Writer.prototype.appendFloat64 = function(value) { + this.ensure(this.offset + 8) + this.view.setFloat64(this.offset, value, true) + this.offset += 8 +} + +/** + * @param {ArrayBuffer} value + */ +Writer.prototype.appendBuffer = function(value) { + this.ensure(this.offset + value.byteLength) + new Uint8Array(this.buffer, this.offset, value.byteLength).set(new Uint8Array(value)) + this.offset += value.byteLength +} + +/** + * Convert a 32-bit signed integer to varint (1-5 bytes). + * Writes out groups of 7 bits at a time, setting high bit if more to come. + * + * @param {number} value + */ +Writer.prototype.appendVarInt = function(value) { + while (true) { + if ((value & ~0x7f) === 0) { + // fits in 7 bits + this.appendUint8(value) + return + } else { + // write 7 bits and set high bit + this.appendUint8(value & 0x7f | 0x80) + value >>>= 7 + } + } +} + +/** + * Convert a bigint to varint (1-10 bytes for 64-bit range). + * + * @param {bigint} value + */ +Writer.prototype.appendVarBigInt = function(value) { + while (true) { + if ((value & ~0x7fn) === 0n) { + // fits in 7 bits + this.appendUint8(Number(value)) + return + } else { + // write 7 bits and set high bit + this.appendUint8(Number(value & 0x7fn | 0x80n)) + value >>= 7n + } + } +} diff --git a/test/thrift.test.js b/test/thrift.test.js new file mode 100644 index 0000000..049429c --- /dev/null +++ b/test/thrift.test.js @@ -0,0 +1,108 @@ +import { deserializeTCompactProtocol } from 'hyparquet/src/thrift.js' +import { describe, expect, it } from 'vitest' +import { serializeTCompactProtocol } from '../src/thrift.js' +import { Writer } from '../src/writer.js' + +/** + * Utility to decode a Thrift-serialized buffer and return the parsed object. + * @param {ArrayBuffer} buf + * @returns {Record} + */ +function roundTripDeserialize(buf) { + const view = new DataView(buf) + const reader = { view, offset: 0 } + return deserializeTCompactProtocol(reader) +} + +describe('serializeTCompactProtocol', () => { + it('serializes basic types correctly', () => { + const data = { + field_1: true, // BOOL -> TRUE + field_2: false, // BOOL -> FALSE + field_3: 127, // BYTE / I32 + field_4: 0x7fff, // I16 / I32 + field_5: 0x7fffffff, // I32 + field_6: BigInt('0x7fffffffffffffff'), // I64 + field_7: 123.456, // DOUBLE + // BINARY (string as Uint8Array): + field_8: new TextEncoder().encode('Hello, Thrift!'), + } + + const writer = new Writer() + serializeTCompactProtocol(writer, data) + const buf = writer.buffer.slice(0, writer.offset) + const result = roundTripDeserialize(buf) + + expect(result.field_1).toBe(true) + expect(result.field_2).toBe(false) + expect(result.field_3).toBe(127) + expect(result.field_4).toBe(0x7fff) + expect(result.field_5).toBe(0x7fffffff) + expect(result.field_6).toBe(BigInt('0x7fffffffffffffff')) + expect(result.field_7).toBeCloseTo(123.456) + // Decode the binary back into a string + const decodedString = new TextDecoder().decode(result.field_8) + expect(decodedString).toBe('Hello, Thrift!') + }) + + it('serializes a nested STRUCT and LIST of booleans', () => { + const data = { + field_1: { + field_1: 42, + field_2: { + field_1: true, + field_2: false, + }, + }, + // List of booleans + field_2: [true, false, true, false], + } + + const writer = new Writer() + serializeTCompactProtocol(writer, data) + const buf = writer.buffer.slice(0, writer.offset) + const result = roundTripDeserialize(buf) + + expect(result.field_1.field_1).toBe(42) + expect(result.field_1.field_2.field_1).toBe(true) + expect(result.field_1.field_2.field_2).toBe(false) + expect(result.field_2).toEqual([true, false, true, false]) + }) + + it('serializes a UUID correctly', () => { + // 32 hex chars => 16 bytes + const uuidHex = '00112233445566778899aabbccddeeff' + const data = { field_1: uuidHex } + + const writer = new Writer() + serializeTCompactProtocol(writer, data) + const buf = writer.buffer.slice(0, writer.offset) + const result = roundTripDeserialize(buf) + + // Should come back as the same string + expect(result.field_1).toBe(uuidHex) + }) + + it('handles empty object (only STOP)', () => { + const data = {} + const writer = new Writer() + serializeTCompactProtocol(writer, data) + const buf = writer.buffer.slice(0, writer.offset) + const arr = new Uint8Array(buf) + // The entire buffer should just be [0x00] = STOP + expect(arr).toEqual(new Uint8Array([0x00])) + + // Round-trip: should deserialize to an empty object + const result = roundTripDeserialize(buf) + expect(result).toEqual({}) + }) + + it('throws on non-monotonic field IDs', () => { + const invalidData = { + field_2: 2, + field_1: 1, // field_1 is out of order (less than field_2) + } + const writer = new Writer() + expect(() => serializeTCompactProtocol(writer, invalidData)).toThrow() + }) +}) diff --git a/tsconfig.json b/tsconfig.json index 03324cb..04d5724 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -6,9 +6,7 @@ "module": "nodenext", "noEmit": true, "resolveJsonModule": true, - "skipLibCheck": false, - "strict": true, - "target": "esnext" + "strict": true }, "include": ["src", "test"] }