UUID and improved fixed length byte array support

This commit is contained in:
Kenny Daniel 2025-04-21 12:16:28 -07:00
parent edabe74bd6
commit b11b92ffb9
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
6 changed files with 141 additions and 30 deletions

@ -14,7 +14,7 @@ import { writeDataPageV2, writePageHeader } from './datapage.js'
*/
export function writeColumn(writer, schemaPath, values, compressed, stats) {
const schemaElement = schemaPath[schemaPath.length - 1]
const { type } = schemaElement
const { type, type_length } = schemaElement
if (!type) throw new Error(`column ${schemaElement.name} cannot determine type`)
const offsetStart = writer.offset
const num_values = values.length
@ -42,7 +42,7 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) {
// write unconverted dictionary page
const unconverted = unconvert(schemaElement, dictionary)
writeDictionaryPage(writer, unconverted, type, compressed)
writeDictionaryPage(writer, unconverted, type, type_length, compressed)
// write data page with dictionary indexes
data_page_offset = BigInt(writer.offset)
@ -92,11 +92,12 @@ function useDictionary(values, type) {
* @param {Writer} writer
* @param {DecodedArray} dictionary
* @param {ParquetType} type
* @param {number | undefined} fixedLength
* @param {boolean} compressed
*/
function writeDictionaryPage(writer, dictionary, type, compressed) {
function writeDictionaryPage(writer, dictionary, type, fixedLength, compressed) {
const dictionaryPage = new ByteWriter()
writePlain(dictionaryPage, dictionary, type)
writePlain(dictionaryPage, dictionary, type, fixedLength)
// compress dictionary page data
let compressedDictionaryPage = dictionaryPage

@ -16,7 +16,7 @@ import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
* @param {boolean} compressed
*/
export function writeDataPageV2(writer, values, type, schemaPath, encoding, compressed) {
const num_values = values.length
const fixedLength = schemaPath.at(-1)?.type_length
// write levels to temp buffer
const levels = new ByteWriter()
@ -39,7 +39,7 @@ export function writeDataPageV2(writer, values, type, schemaPath, encoding, comp
page.appendUint8(bitWidth) // prepend bitWidth
writeRleBitPackedHybrid(page, nonnull, bitWidth)
} else {
writePlain(page, nonnull, type)
writePlain(page, nonnull, type, fixedLength)
}
// compress page data
@ -55,9 +55,9 @@ export function writeDataPageV2(writer, values, type, schemaPath, encoding, comp
uncompressed_page_size: levels.offset + page.offset,
compressed_page_size: levels.offset + compressedPage.offset,
data_page_header_v2: {
num_values,
num_values: values.length,
num_nulls,
num_rows: num_values,
num_rows: values.length,
encoding,
definition_levels_byte_length,
repetition_levels_byte_length,

@ -5,8 +5,9 @@
* @param {Writer} writer
* @param {DecodedArray} values
* @param {ParquetType} type
* @param {number | undefined} fixedLength
*/
export function writePlain(writer, values, type) {
export function writePlain(writer, values, type, fixedLength) {
if (type === 'BOOLEAN') {
writePlainBoolean(writer, values)
} else if (type === 'INT32') {
@ -20,7 +21,8 @@ export function writePlain(writer, values, type) {
} else if (type === 'BYTE_ARRAY') {
writePlainByteArray(writer, values)
} else if (type === 'FIXED_LEN_BYTE_ARRAY') {
writePlainByteArrayFixed(writer, values)
if (!fixedLength) throw new Error('parquet FIXED_LEN_BYTE_ARRAY expected type_length')
writePlainByteArrayFixed(writer, values, fixedLength)
} else {
throw new Error(`parquet unsupported type: ${type}`)
}
@ -120,10 +122,12 @@ function writePlainByteArray(writer, values) {
/**
* @param {Writer} writer
* @param {DecodedArray} values
* @param {number} fixedLength
*/
function writePlainByteArrayFixed(writer, values) {
function writePlainByteArrayFixed(writer, values, fixedLength) {
for (const value of values) {
if (!(value instanceof Uint8Array)) throw new Error('parquet expected Uint8Array value')
if (value.length !== fixedLength) throw new Error(`parquet expected Uint8Array of length ${fixedLength}`)
writer.appendBytes(value)
}
}

@ -9,7 +9,7 @@ const dayMillis = 86400000 // 1 day in milliseconds
* @returns {DecodedArray}
*/
export function unconvert(element, values) {
const { converted_type: ctype, logical_type: ltype } = element
const { type, converted_type: ctype, logical_type: ltype } = element
if (ctype === 'DECIMAL') {
const factor = 10 ** (element.scale || 0)
return values.map(v => {
@ -32,17 +32,47 @@ export function unconvert(element, values) {
const encoder = new TextEncoder()
return values.map(v => encoder.encode(JSON.stringify(v)))
}
if (ltype?.type === 'FLOAT16') {
return Array.from(values).map(unconvertFloat16)
}
if (ctype === 'UTF8') {
if (!Array.isArray(values)) throw new Error('strings must be an array')
const encoder = new TextEncoder()
return values.map(v => encoder.encode(v))
}
if (ltype?.type === 'FLOAT16') {
if (type !== 'FIXED_LEN_BYTE_ARRAY') throw new Error('FLOAT16 must be FIXED_LEN_BYTE_ARRAY type')
if (element.type_length !== 2) throw new Error('FLOAT16 expected type_length to be 2 bytes')
return Array.from(values).map(unconvertFloat16)
}
if (ltype?.type === 'UUID') {
if (!Array.isArray(values)) throw new Error('UUID must be an array')
if (type !== 'FIXED_LEN_BYTE_ARRAY') throw new Error('UUID must be FIXED_LEN_BYTE_ARRAY type')
if (element.type_length !== 16) throw new Error('UUID expected type_length to be 16 bytes')
return values.map(unconvertUuid)
}
return values
}
/**
* @param {Uint8Array | string | undefined} value
* @returns {Uint8Array | undefined}
*/
function unconvertUuid(value) {
if (value === undefined || value === null) return
if (value instanceof Uint8Array) return value
if (typeof value === 'string') {
const uuidRegex = /^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$/i
if (!uuidRegex.test(value)) {
throw new Error('UUID must be a valid UUID string')
}
value = value.replace(/-/g, '').toLowerCase()
const bytes = new Uint8Array(16)
for (let i = 0; i < 16; i++) {
bytes[i] = parseInt(value.slice(i * 2, i * 2 + 2), 16)
}
return bytes
}
throw new Error('UUID must be a string or Uint8Array')
}
/**
* Uncovert from rich type to byte array for metadata statistics.
*

@ -6,7 +6,7 @@ describe('writePlain', () => {
it('writes BOOLEAN (multiple of 8 bits, plus leftover)', () => {
const writer = new ByteWriter()
const booleans = [true, false, true, true, false, false, false, true, true]
writePlain(writer, booleans, 'BOOLEAN')
writePlain(writer, booleans, 'BOOLEAN', undefined)
expect(writer.offset).toBe(2)
expect(writer.view.getUint8(0)).toBe(0b10001101)
@ -16,7 +16,7 @@ describe('writePlain', () => {
it('writes INT32', () => {
const writer = new ByteWriter()
const ints = [0, 1, 255, 256, 65535, -1, -2147483648, 2147483647]
writePlain(writer, ints, 'INT32')
writePlain(writer, ints, 'INT32', undefined)
// 4 bytes per int
expect(writer.offset).toBe(4 * ints.length)
@ -30,7 +30,7 @@ describe('writePlain', () => {
it('writes INT64', () => {
const writer = new ByteWriter()
const bigints = [0n, 1n, 42n, BigInt(2 ** 53 - 1)]
writePlain(writer, bigints, 'INT64')
writePlain(writer, bigints, 'INT64', undefined)
// 8 bytes per int64
expect(writer.offset).toBe(8 * bigints.length)
@ -44,7 +44,7 @@ describe('writePlain', () => {
it('writes FLOAT', () => {
const writer = new ByteWriter()
const floats = [0, 300.5, -2.7100000381469727, Infinity, -Infinity, NaN]
writePlain(writer, floats, 'FLOAT')
writePlain(writer, floats, 'FLOAT', undefined)
// 4 bytes per float
expect(writer.offset).toBe(4 * floats.length)
@ -62,7 +62,7 @@ describe('writePlain', () => {
it('writes DOUBLE', () => {
const writer = new ByteWriter()
const doubles = [0, 3.14, -2.71, Infinity, -Infinity, NaN]
writePlain(writer, doubles, 'DOUBLE')
writePlain(writer, doubles, 'DOUBLE', undefined)
// 8 bytes per double
expect(writer.offset).toBe(8 * doubles.length)
@ -80,7 +80,7 @@ describe('writePlain', () => {
it('writes BYTE_ARRAY', () => {
const writer = new ByteWriter()
const strings = ['a', 'b', 'c', 'd']
writePlain(writer, strings, 'BYTE_ARRAY')
writePlain(writer, strings, 'BYTE_ARRAY', undefined)
let offset = 0
for (const s of strings) {
@ -100,7 +100,7 @@ describe('writePlain', () => {
const encoder = new TextEncoder()
const strings = ['abcd', 'efgh', 'ijkl']
.map(s => encoder.encode(s))
writePlain(writer, strings, 'FIXED_LEN_BYTE_ARRAY')
writePlain(writer, strings, 'FIXED_LEN_BYTE_ARRAY', 4)
let offset = 0
for (const s of strings) {
@ -113,25 +113,27 @@ describe('writePlain', () => {
it('throws error on unsupported type', () => {
const writer = new ByteWriter()
expect(() => writePlain(writer, [1, 2, 3], 'INT96'))
expect(() => writePlain(writer, [1, 2, 3], 'INT96', undefined))
.toThrow(/parquet unsupported type/i)
})
it('throws error on type mismatch', () => {
const writer = new ByteWriter()
expect(() => writePlain(writer, [1, 2, 3], 'BOOLEAN'))
expect(() => writePlain(writer, [1, 2, 3], 'BOOLEAN', undefined))
.toThrow('parquet expected boolean value')
expect(() => writePlain(writer, [1, 2, 3.5], 'INT32'))
expect(() => writePlain(writer, [1, 2, 3.5], 'INT32', undefined))
.toThrow('parquet expected integer value')
expect(() => writePlain(writer, [1n, 2n, 3], 'INT64'))
expect(() => writePlain(writer, [1n, 2n, 3], 'INT64', undefined))
.toThrow('parquet expected bigint value')
expect(() => writePlain(writer, [1, 2, 3n], 'FLOAT'))
expect(() => writePlain(writer, [1, 2, 3n], 'FLOAT', undefined))
.toThrow('parquet expected number value')
expect(() => writePlain(writer, [1, 2, 3n], 'DOUBLE'))
expect(() => writePlain(writer, [1, 2, 3n], 'DOUBLE', undefined))
.toThrow('parquet expected number value')
expect(() => writePlain(writer, [1, 2, 3], 'BYTE_ARRAY'))
expect(() => writePlain(writer, [1, 2, 3], 'BYTE_ARRAY', undefined))
.toThrow('parquet expected Uint8Array value')
expect(() => writePlain(writer, [1, 2, 3], 'FIXED_LEN_BYTE_ARRAY'))
expect(() => writePlain(writer, [1, 2, 3], 'FIXED_LEN_BYTE_ARRAY', undefined))
.toThrow('parquet FIXED_LEN_BYTE_ARRAY expected type_length')
expect(() => writePlain(writer, [1, 2, 3], 'FIXED_LEN_BYTE_ARRAY', 16))
.toThrow('parquet expected Uint8Array value')
})
})

@ -140,6 +140,31 @@ describe('parquetWriteBuffer', () => {
])
})
it('serializes time types', async () => {
const result = await roundTripDeserialize([
{
name: 'time32',
data: [100000, 200000, 300000],
logical_type: { type: 'TIME', isAdjustedToUTC: false, unit: 'MILLIS' },
},
{
name: 'time64',
data: [100000000n, 200000000n, 300000000n],
logical_type: { type: 'TIME', isAdjustedToUTC: false, unit: 'MICROS' },
},
{
name: 'interval',
data: [1000000000n, 2000000000n, 3000000000n],
logical_type: { type: 'INTERVAL' },
},
])
expect(result).toEqual([
{ time32: 100000, time64: 100000000n, interval: 1000000000n },
{ time32: 200000, time64: 200000000n, interval: 2000000000n },
{ time32: 300000, time64: 300000000n, interval: 3000000000n },
])
})
it('serializes byte array types', async () => {
const result = await roundTripDeserialize([{
name: 'bytes',
@ -153,6 +178,41 @@ describe('parquetWriteBuffer', () => {
])
})
it('serializes uuid types', async () => {
const result = await roundTripDeserialize([
{
name: 'uuid',
data: [
new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]),
new Uint8Array([17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]),
],
type: 'FIXED_LEN_BYTE_ARRAY',
type_length: 16,
logical_type: { type: 'UUID' },
},
{
name: 'string',
data: [
'00000000-0000-0000-0000-000000000001',
'00010002-0003-0004-0005-000600070008',
],
type: 'FIXED_LEN_BYTE_ARRAY',
type_length: 16,
logical_type: { type: 'UUID' },
},
])
expect(result).toEqual([
{
uuid: new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]),
string: new Uint8Array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]),
},
{
uuid: new Uint8Array([17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]),
string: new Uint8Array([0, 1, 0, 2, 0, 3, 0, 4, 0, 5, 0, 6, 0, 7, 0, 8]),
},
])
})
it('serializes empty column', async () => {
const result = await roundTripDeserialize([{
name: 'empty',
@ -217,6 +277,20 @@ describe('parquetWriteBuffer', () => {
.toThrow('parquet expected number value')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'BYTE_ARRAY' }] }))
.toThrow('parquet expected Uint8Array value')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'float16', data: [1, 2, 3], type: 'FIXED_LEN_BYTE_ARRAY' }] }))
.toThrow('parquet FIXED_LEN_BYTE_ARRAY expected type_length')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'float16', data: [1, 2, 3], type: 'FIXED_LEN_BYTE_ARRAY', type_length: 4 }] }))
.toThrow('parquet expected Uint8Array value')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'float16', data: [1, 2, 3], type: 'FIXED_LEN_BYTE_ARRAY', type_length: 4, logical_type: { type: 'FLOAT16' } }] }))
.toThrow('FLOAT16 expected type_length to be 2 bytes')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: [new Uint8Array(4)], type: 'FIXED_LEN_BYTE_ARRAY', logical_type: { type: 'UUID' } }] }))
.toThrow('UUID expected type_length to be 16 bytes')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: [new Uint8Array(4)], type: 'FIXED_LEN_BYTE_ARRAY', type_length: 16, logical_type: { type: 'UUID' } }] }))
.toThrow('parquet expected Uint8Array of length 16')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: [new Uint8Array(16)], type: 'FIXED_LEN_BYTE_ARRAY', type_length: 4, logical_type: { type: 'UUID' } }] }))
.toThrow('UUID expected type_length to be 16 bytes')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: ['0000'], type: 'FIXED_LEN_BYTE_ARRAY', logical_type: { type: 'UUID' } }] }))
.toThrow('UUID expected type_length to be 16 bytes')
})
it('throws for empty column with no type specified', () => {