mirror of
https://github.com/asadbek064/hyparquet-writer.git
synced 2025-12-05 23:31:54 +00:00
Thrift writer
This commit is contained in:
parent
3a29bb7bca
commit
045256b478
@ -9,7 +9,8 @@ export default [
|
||||
|
||||
languageOptions: {
|
||||
globals: {
|
||||
'atob': false,
|
||||
'TextDecoder': false,
|
||||
'TextEncoder': false,
|
||||
},
|
||||
},
|
||||
|
||||
|
||||
@ -31,7 +31,7 @@
|
||||
"@vitest/coverage-v8": "3.0.9",
|
||||
"eslint": "9.22.0",
|
||||
"eslint-plugin-jsdoc": "50.6.8",
|
||||
"hyparquet": "1.9.1",
|
||||
"hyparquet": "1.10.0",
|
||||
"typescript": "5.8.2",
|
||||
"vitest": "3.0.9"
|
||||
}
|
||||
|
||||
214
src/thrift.js
Normal file
214
src/thrift.js
Normal file
@ -0,0 +1,214 @@
|
||||
// TCompactProtocol types
|
||||
const CompactType = {
|
||||
STOP: 0,
|
||||
TRUE: 1,
|
||||
FALSE: 2,
|
||||
BYTE: 3,
|
||||
I16: 4,
|
||||
I32: 5,
|
||||
I64: 6,
|
||||
DOUBLE: 7,
|
||||
BINARY: 8,
|
||||
LIST: 9,
|
||||
SET: 10,
|
||||
MAP: 11,
|
||||
STRUCT: 12,
|
||||
UUID: 13,
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialize a JS object in TCompactProtocol format.
|
||||
*
|
||||
* Expects keys named like "field_1", "field_2", etc. in ascending order.
|
||||
*
|
||||
* @import {Writer} from './types.js'
|
||||
* @param {Writer} writer
|
||||
* @param {Record<string, any>} data
|
||||
*/
|
||||
export function serializeTCompactProtocol(writer, data) {
|
||||
let lastFid = 0
|
||||
// Write each field
|
||||
for (const [key, value] of Object.entries(data)) {
|
||||
// We expect key = "field_N" so we can extract N as the field ID
|
||||
const fid = parseInt(key.replace(/^field_/, ''), 10)
|
||||
if (Number.isNaN(fid)) {
|
||||
throw new Error(`Invalid field name: ${key}. Expected "field_###" format.`)
|
||||
}
|
||||
|
||||
// Figure out which compact type to use
|
||||
const type = getCompactTypeForValue(value)
|
||||
|
||||
// Write the field-begin header: (delta << 4) | type
|
||||
const delta = fid - lastFid
|
||||
if (delta <= 0) {
|
||||
throw new Error(`Non-monotonic field ID. fid=${fid}, lastFid=${lastFid}`)
|
||||
}
|
||||
// High nibble = delta, low nibble = type
|
||||
writer.appendUint8(delta << 4 | type & 0x0f)
|
||||
|
||||
// Write the field content itself
|
||||
writeElement(writer, type, value)
|
||||
|
||||
lastFid = fid
|
||||
}
|
||||
|
||||
// Finally write STOP
|
||||
writer.appendUint8(CompactType.STOP)
|
||||
}
|
||||
|
||||
/**
|
||||
* Deduce a TCompactProtocol type from the JS value
|
||||
*
|
||||
* @param {any} value
|
||||
* @returns {number} CompactType
|
||||
*/
|
||||
function getCompactTypeForValue(value) {
|
||||
if (value === true) {
|
||||
return CompactType.TRUE
|
||||
}
|
||||
if (value === false) {
|
||||
return CompactType.FALSE
|
||||
}
|
||||
if (typeof value === 'number') {
|
||||
// We'll store integer as I32, otherwise DOUBLE
|
||||
return Number.isInteger(value) ? CompactType.I32 : CompactType.DOUBLE
|
||||
}
|
||||
if (typeof value === 'bigint') {
|
||||
return CompactType.I64
|
||||
}
|
||||
if (typeof value === 'string') {
|
||||
// Possibly treat 32-hex as a 16-byte UUID
|
||||
if (/^[0-9a-fA-F]{32}$/.test(value)) {
|
||||
return CompactType.UUID
|
||||
}
|
||||
return CompactType.BINARY
|
||||
}
|
||||
if (value instanceof Uint8Array) {
|
||||
return CompactType.BINARY
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
return CompactType.LIST
|
||||
}
|
||||
if (value && typeof value === 'object') {
|
||||
return CompactType.STRUCT
|
||||
}
|
||||
throw new Error(`Cannot determine thrift compact type for: ${value}`)
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a single value of a given compact type.
|
||||
*
|
||||
* @param {Writer} writer
|
||||
* @param {number} type
|
||||
* @param {any} value
|
||||
*/
|
||||
function writeElement(writer, type, value) {
|
||||
switch (type) {
|
||||
case CompactType.TRUE:
|
||||
case CompactType.FALSE:
|
||||
return // true/false is stored in the type
|
||||
case CompactType.BYTE:
|
||||
writer.appendUint8(value)
|
||||
return
|
||||
case CompactType.I16:
|
||||
case CompactType.I32: {
|
||||
// ZigZag -> varint
|
||||
// For 32-bit int: zigzag = (n << 1) ^ (n >> 31)
|
||||
const zigzag = value << 1 ^ value >> 31
|
||||
writer.appendVarInt(zigzag)
|
||||
return
|
||||
}
|
||||
case CompactType.I64: {
|
||||
// For 64-bit (bigint) we do (value << 1n) ^ (value >> 63n) in zigzag
|
||||
const n = BigInt(value)
|
||||
const zigzag = n << 1n ^ n >> 63n
|
||||
writer.appendVarBigInt(zigzag)
|
||||
return
|
||||
}
|
||||
case CompactType.DOUBLE:
|
||||
writer.appendFloat64(value)
|
||||
return
|
||||
case CompactType.BINARY: {
|
||||
// store length as a varint, then raw bytes
|
||||
let bytes
|
||||
if (typeof value === 'string') {
|
||||
bytes = new TextEncoder().encode(value)
|
||||
} else {
|
||||
// e.g. Uint8Array
|
||||
bytes = value
|
||||
}
|
||||
writer.appendVarInt(bytes.length)
|
||||
writer.appendBuffer(bytes)
|
||||
return
|
||||
}
|
||||
case CompactType.LIST: {
|
||||
// Must store (size << 4) | elementType
|
||||
// We'll guess the element type from the first element
|
||||
const arr = value
|
||||
const size = arr.length
|
||||
if (size === 0) {
|
||||
// (0 << 4) | type for an empty list – pick BYTE arbitrarily
|
||||
writer.appendUint8(0 << 4 | CompactType.BYTE)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: Check for heterogeneous lists?
|
||||
const elemType = getCompactTypeForValue(arr[0])
|
||||
|
||||
const sizeNibble = size > 14 ? 15 : size
|
||||
writer.appendUint8(sizeNibble << 4 | elemType)
|
||||
if (size > 14) {
|
||||
writer.appendVarInt(size)
|
||||
}
|
||||
|
||||
// Special trick for booleans in a list
|
||||
if (elemType === CompactType.TRUE || elemType === CompactType.FALSE) {
|
||||
// Write each boolean as a single 0 or 1 byte
|
||||
for (const v of arr) {
|
||||
writer.appendUint8(v ? 1 : 0)
|
||||
}
|
||||
} else {
|
||||
// Otherwise write them out normally
|
||||
for (const v of arr) {
|
||||
writeElement(writer, elemType, v)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
case CompactType.STRUCT: {
|
||||
// Recursively write sub-fields as "field_N: val", end with STOP
|
||||
let lastFid = 0
|
||||
for (const [k, v] of Object.entries(value)) {
|
||||
const fid = parseInt(k.replace(/^field_/, ''), 10)
|
||||
if (Number.isNaN(fid)) {
|
||||
throw new Error(`Invalid sub-field name: ${k}. Expected "field_###"`)
|
||||
}
|
||||
const t = getCompactTypeForValue(v)
|
||||
const delta = fid - lastFid
|
||||
if (delta <= 0) {
|
||||
throw new Error(`Non-monotonic fid in struct: fid=${fid}, lastFid=${lastFid}`)
|
||||
}
|
||||
writer.appendUint8(delta << 4 | t & 0x0f)
|
||||
writeElement(writer, t, v)
|
||||
lastFid = fid
|
||||
}
|
||||
// Write STOP
|
||||
writer.appendUint8(CompactType.STOP)
|
||||
return
|
||||
}
|
||||
case CompactType.UUID: {
|
||||
// Expect a 32-hex string. Write 16 bytes
|
||||
if (typeof value !== 'string' || value.length !== 32) {
|
||||
throw new Error(`Expected 32-hex string for UUID, got ${value}`)
|
||||
}
|
||||
for (let i = 0; i < 16; i++) {
|
||||
const byte = parseInt(value.slice(i * 2, i * 2 + 2), 16)
|
||||
writer.appendUint8(byte)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
default:
|
||||
throw new Error(`Unhandled type in writeElement: ${type}`)
|
||||
}
|
||||
}
|
||||
11
src/types.d.ts
vendored
Normal file
11
src/types.d.ts
vendored
Normal file
@ -0,0 +1,11 @@
|
||||
|
||||
export interface Writer {
|
||||
buffer: ArrayBuffer
|
||||
offset: number
|
||||
appendUint8(value: number): void
|
||||
appendUint32(value: number): void
|
||||
appendFloat64(value: number): void
|
||||
appendBuffer(buffer: ArrayBuffer): void
|
||||
appendVarInt(value: number): void
|
||||
appendVarBigInt(value: bigint): void
|
||||
}
|
||||
101
src/writer.js
Normal file
101
src/writer.js
Normal file
@ -0,0 +1,101 @@
|
||||
|
||||
/**
|
||||
* Self-expanding buffer view
|
||||
*
|
||||
* @import {Writer} from './types.js'
|
||||
* @returns {Writer}
|
||||
*/
|
||||
export function Writer() {
|
||||
this.buffer = new ArrayBuffer(1024)
|
||||
this.offset = 0
|
||||
this.view = new DataView(this.buffer)
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {number} size
|
||||
*/
|
||||
Writer.prototype.ensure = function(size) {
|
||||
if (this.offset + size > this.buffer.byteLength) {
|
||||
const newSize = Math.max(this.buffer.byteLength * 2, this.offset + size)
|
||||
const newBuffer = new ArrayBuffer(newSize)
|
||||
new Uint8Array(newBuffer).set(new Uint8Array(this.buffer))
|
||||
this.buffer = newBuffer
|
||||
this.view = new DataView(this.buffer)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {number} value
|
||||
*/
|
||||
Writer.prototype.appendUint8 = function(value) {
|
||||
this.ensure(this.offset + 1)
|
||||
this.view.setUint8(this.offset, value)
|
||||
this.offset++
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {number} value
|
||||
*/
|
||||
Writer.prototype.appendUint32 = function(value) {
|
||||
this.ensure(this.offset + 4)
|
||||
this.view.setUint32(this.offset, value, true)
|
||||
this.offset += 4
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {number} value
|
||||
*/
|
||||
Writer.prototype.appendFloat64 = function(value) {
|
||||
this.ensure(this.offset + 8)
|
||||
this.view.setFloat64(this.offset, value, true)
|
||||
this.offset += 8
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {ArrayBuffer} value
|
||||
*/
|
||||
Writer.prototype.appendBuffer = function(value) {
|
||||
this.ensure(this.offset + value.byteLength)
|
||||
new Uint8Array(this.buffer, this.offset, value.byteLength).set(new Uint8Array(value))
|
||||
this.offset += value.byteLength
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a 32-bit signed integer to varint (1-5 bytes).
|
||||
* Writes out groups of 7 bits at a time, setting high bit if more to come.
|
||||
*
|
||||
* @param {number} value
|
||||
*/
|
||||
Writer.prototype.appendVarInt = function(value) {
|
||||
while (true) {
|
||||
if ((value & ~0x7f) === 0) {
|
||||
// fits in 7 bits
|
||||
this.appendUint8(value)
|
||||
return
|
||||
} else {
|
||||
// write 7 bits and set high bit
|
||||
this.appendUint8(value & 0x7f | 0x80)
|
||||
value >>>= 7
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a bigint to varint (1-10 bytes for 64-bit range).
|
||||
*
|
||||
* @param {bigint} value
|
||||
*/
|
||||
Writer.prototype.appendVarBigInt = function(value) {
|
||||
while (true) {
|
||||
if ((value & ~0x7fn) === 0n) {
|
||||
// fits in 7 bits
|
||||
this.appendUint8(Number(value))
|
||||
return
|
||||
} else {
|
||||
// write 7 bits and set high bit
|
||||
this.appendUint8(Number(value & 0x7fn | 0x80n))
|
||||
value >>= 7n
|
||||
}
|
||||
}
|
||||
}
|
||||
108
test/thrift.test.js
Normal file
108
test/thrift.test.js
Normal file
@ -0,0 +1,108 @@
|
||||
import { deserializeTCompactProtocol } from 'hyparquet/src/thrift.js'
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import { serializeTCompactProtocol } from '../src/thrift.js'
|
||||
import { Writer } from '../src/writer.js'
|
||||
|
||||
/**
|
||||
* Utility to decode a Thrift-serialized buffer and return the parsed object.
|
||||
* @param {ArrayBuffer} buf
|
||||
* @returns {Record<string, any>}
|
||||
*/
|
||||
function roundTripDeserialize(buf) {
|
||||
const view = new DataView(buf)
|
||||
const reader = { view, offset: 0 }
|
||||
return deserializeTCompactProtocol(reader)
|
||||
}
|
||||
|
||||
describe('serializeTCompactProtocol', () => {
|
||||
it('serializes basic types correctly', () => {
|
||||
const data = {
|
||||
field_1: true, // BOOL -> TRUE
|
||||
field_2: false, // BOOL -> FALSE
|
||||
field_3: 127, // BYTE / I32
|
||||
field_4: 0x7fff, // I16 / I32
|
||||
field_5: 0x7fffffff, // I32
|
||||
field_6: BigInt('0x7fffffffffffffff'), // I64
|
||||
field_7: 123.456, // DOUBLE
|
||||
// BINARY (string as Uint8Array):
|
||||
field_8: new TextEncoder().encode('Hello, Thrift!'),
|
||||
}
|
||||
|
||||
const writer = new Writer()
|
||||
serializeTCompactProtocol(writer, data)
|
||||
const buf = writer.buffer.slice(0, writer.offset)
|
||||
const result = roundTripDeserialize(buf)
|
||||
|
||||
expect(result.field_1).toBe(true)
|
||||
expect(result.field_2).toBe(false)
|
||||
expect(result.field_3).toBe(127)
|
||||
expect(result.field_4).toBe(0x7fff)
|
||||
expect(result.field_5).toBe(0x7fffffff)
|
||||
expect(result.field_6).toBe(BigInt('0x7fffffffffffffff'))
|
||||
expect(result.field_7).toBeCloseTo(123.456)
|
||||
// Decode the binary back into a string
|
||||
const decodedString = new TextDecoder().decode(result.field_8)
|
||||
expect(decodedString).toBe('Hello, Thrift!')
|
||||
})
|
||||
|
||||
it('serializes a nested STRUCT and LIST of booleans', () => {
|
||||
const data = {
|
||||
field_1: {
|
||||
field_1: 42,
|
||||
field_2: {
|
||||
field_1: true,
|
||||
field_2: false,
|
||||
},
|
||||
},
|
||||
// List of booleans
|
||||
field_2: [true, false, true, false],
|
||||
}
|
||||
|
||||
const writer = new Writer()
|
||||
serializeTCompactProtocol(writer, data)
|
||||
const buf = writer.buffer.slice(0, writer.offset)
|
||||
const result = roundTripDeserialize(buf)
|
||||
|
||||
expect(result.field_1.field_1).toBe(42)
|
||||
expect(result.field_1.field_2.field_1).toBe(true)
|
||||
expect(result.field_1.field_2.field_2).toBe(false)
|
||||
expect(result.field_2).toEqual([true, false, true, false])
|
||||
})
|
||||
|
||||
it('serializes a UUID correctly', () => {
|
||||
// 32 hex chars => 16 bytes
|
||||
const uuidHex = '00112233445566778899aabbccddeeff'
|
||||
const data = { field_1: uuidHex }
|
||||
|
||||
const writer = new Writer()
|
||||
serializeTCompactProtocol(writer, data)
|
||||
const buf = writer.buffer.slice(0, writer.offset)
|
||||
const result = roundTripDeserialize(buf)
|
||||
|
||||
// Should come back as the same string
|
||||
expect(result.field_1).toBe(uuidHex)
|
||||
})
|
||||
|
||||
it('handles empty object (only STOP)', () => {
|
||||
const data = {}
|
||||
const writer = new Writer()
|
||||
serializeTCompactProtocol(writer, data)
|
||||
const buf = writer.buffer.slice(0, writer.offset)
|
||||
const arr = new Uint8Array(buf)
|
||||
// The entire buffer should just be [0x00] = STOP
|
||||
expect(arr).toEqual(new Uint8Array([0x00]))
|
||||
|
||||
// Round-trip: should deserialize to an empty object
|
||||
const result = roundTripDeserialize(buf)
|
||||
expect(result).toEqual({})
|
||||
})
|
||||
|
||||
it('throws on non-monotonic field IDs', () => {
|
||||
const invalidData = {
|
||||
field_2: 2,
|
||||
field_1: 1, // field_1 is out of order (less than field_2)
|
||||
}
|
||||
const writer = new Writer()
|
||||
expect(() => serializeTCompactProtocol(writer, invalidData)).toThrow()
|
||||
})
|
||||
})
|
||||
@ -6,9 +6,7 @@
|
||||
"module": "nodenext",
|
||||
"noEmit": true,
|
||||
"resolveJsonModule": true,
|
||||
"skipLibCheck": false,
|
||||
"strict": true,
|
||||
"target": "esnext"
|
||||
"strict": true
|
||||
},
|
||||
"include": ["src", "test"]
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user