diff --git a/src/column.js b/src/column.js index 9de7b66..8006fd1 100644 --- a/src/column.js +++ b/src/column.js @@ -14,7 +14,7 @@ import { writeDataPageV2, writePageHeader } from './datapage.js' */ export function writeColumn(writer, schemaPath, values, compressed, stats) { const schemaElement = schemaPath[schemaPath.length - 1] - const { type } = schemaElement + const { type, type_length } = schemaElement if (!type) throw new Error(`column ${schemaElement.name} cannot determine type`) const offsetStart = writer.offset const num_values = values.length @@ -42,7 +42,7 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) { // write unconverted dictionary page const unconverted = unconvert(schemaElement, dictionary) - writeDictionaryPage(writer, unconverted, type, compressed) + writeDictionaryPage(writer, unconverted, type, type_length, compressed) // write data page with dictionary indexes data_page_offset = BigInt(writer.offset) @@ -92,11 +92,12 @@ function useDictionary(values, type) { * @param {Writer} writer * @param {DecodedArray} dictionary * @param {ParquetType} type + * @param {number | undefined} fixedLength * @param {boolean} compressed */ -function writeDictionaryPage(writer, dictionary, type, compressed) { +function writeDictionaryPage(writer, dictionary, type, fixedLength, compressed) { const dictionaryPage = new ByteWriter() - writePlain(dictionaryPage, dictionary, type) + writePlain(dictionaryPage, dictionary, type, fixedLength) // compress dictionary page data let compressedDictionaryPage = dictionaryPage diff --git a/src/datapage.js b/src/datapage.js index f4246f8..df4883e 100644 --- a/src/datapage.js +++ b/src/datapage.js @@ -16,7 +16,7 @@ import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' * @param {boolean} compressed */ export function writeDataPageV2(writer, values, type, schemaPath, encoding, compressed) { - const num_values = values.length + const fixedLength = schemaPath.at(-1)?.type_length // write levels to temp buffer const levels = new ByteWriter() @@ -39,7 +39,7 @@ export function writeDataPageV2(writer, values, type, schemaPath, encoding, comp page.appendUint8(bitWidth) // prepend bitWidth writeRleBitPackedHybrid(page, nonnull, bitWidth) } else { - writePlain(page, nonnull, type) + writePlain(page, nonnull, type, fixedLength) } // compress page data @@ -55,9 +55,9 @@ export function writeDataPageV2(writer, values, type, schemaPath, encoding, comp uncompressed_page_size: levels.offset + page.offset, compressed_page_size: levels.offset + compressedPage.offset, data_page_header_v2: { - num_values, + num_values: values.length, num_nulls, - num_rows: num_values, + num_rows: values.length, encoding, definition_levels_byte_length, repetition_levels_byte_length, diff --git a/src/plain.js b/src/plain.js index ce2a367..680f337 100644 --- a/src/plain.js +++ b/src/plain.js @@ -5,8 +5,9 @@ * @param {Writer} writer * @param {DecodedArray} values * @param {ParquetType} type + * @param {number | undefined} fixedLength */ -export function writePlain(writer, values, type) { +export function writePlain(writer, values, type, fixedLength) { if (type === 'BOOLEAN') { writePlainBoolean(writer, values) } else if (type === 'INT32') { @@ -20,7 +21,8 @@ export function writePlain(writer, values, type) { } else if (type === 'BYTE_ARRAY') { writePlainByteArray(writer, values) } else if (type === 'FIXED_LEN_BYTE_ARRAY') { - writePlainByteArrayFixed(writer, values) + if (!fixedLength) throw new Error('parquet FIXED_LEN_BYTE_ARRAY expected type_length') + writePlainByteArrayFixed(writer, values, fixedLength) } else { throw new Error(`parquet unsupported type: ${type}`) } @@ -120,10 +122,12 @@ function writePlainByteArray(writer, values) { /** * @param {Writer} writer * @param {DecodedArray} values + * @param {number} fixedLength */ -function writePlainByteArrayFixed(writer, values) { +function writePlainByteArrayFixed(writer, values, fixedLength) { for (const value of values) { if (!(value instanceof Uint8Array)) throw new Error('parquet expected Uint8Array value') + if (value.length !== fixedLength) throw new Error(`parquet expected Uint8Array of length ${fixedLength}`) writer.appendBytes(value) } } diff --git a/src/unconvert.js b/src/unconvert.js index b0ef3ed..c0c6bf0 100644 --- a/src/unconvert.js +++ b/src/unconvert.js @@ -9,7 +9,7 @@ const dayMillis = 86400000 // 1 day in milliseconds * @returns {DecodedArray} */ export function unconvert(element, values) { - const { converted_type: ctype, logical_type: ltype } = element + const { type, converted_type: ctype, logical_type: ltype } = element if (ctype === 'DECIMAL') { const factor = 10 ** (element.scale || 0) return values.map(v => { @@ -32,17 +32,47 @@ export function unconvert(element, values) { const encoder = new TextEncoder() return values.map(v => encoder.encode(JSON.stringify(v))) } - if (ltype?.type === 'FLOAT16') { - return Array.from(values).map(unconvertFloat16) - } if (ctype === 'UTF8') { if (!Array.isArray(values)) throw new Error('strings must be an array') const encoder = new TextEncoder() return values.map(v => encoder.encode(v)) } + if (ltype?.type === 'FLOAT16') { + if (type !== 'FIXED_LEN_BYTE_ARRAY') throw new Error('FLOAT16 must be FIXED_LEN_BYTE_ARRAY type') + if (element.type_length !== 2) throw new Error('FLOAT16 expected type_length to be 2 bytes') + return Array.from(values).map(unconvertFloat16) + } + if (ltype?.type === 'UUID') { + if (!Array.isArray(values)) throw new Error('UUID must be an array') + if (type !== 'FIXED_LEN_BYTE_ARRAY') throw new Error('UUID must be FIXED_LEN_BYTE_ARRAY type') + if (element.type_length !== 16) throw new Error('UUID expected type_length to be 16 bytes') + return values.map(unconvertUuid) + } return values } +/** + * @param {Uint8Array | string | undefined} value + * @returns {Uint8Array | undefined} + */ +function unconvertUuid(value) { + if (value === undefined || value === null) return + if (value instanceof Uint8Array) return value + if (typeof value === 'string') { + const uuidRegex = /^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$/i + if (!uuidRegex.test(value)) { + throw new Error('UUID must be a valid UUID string') + } + value = value.replace(/-/g, '').toLowerCase() + const bytes = new Uint8Array(16) + for (let i = 0; i < 16; i++) { + bytes[i] = parseInt(value.slice(i * 2, i * 2 + 2), 16) + } + return bytes + } + throw new Error('UUID must be a string or Uint8Array') +} + /** * Uncovert from rich type to byte array for metadata statistics. * diff --git a/test/plain.test.js b/test/plain.test.js index c627fc7..9597fbf 100644 --- a/test/plain.test.js +++ b/test/plain.test.js @@ -6,7 +6,7 @@ describe('writePlain', () => { it('writes BOOLEAN (multiple of 8 bits, plus leftover)', () => { const writer = new ByteWriter() const booleans = [true, false, true, true, false, false, false, true, true] - writePlain(writer, booleans, 'BOOLEAN') + writePlain(writer, booleans, 'BOOLEAN', undefined) expect(writer.offset).toBe(2) expect(writer.view.getUint8(0)).toBe(0b10001101) @@ -16,7 +16,7 @@ describe('writePlain', () => { it('writes INT32', () => { const writer = new ByteWriter() const ints = [0, 1, 255, 256, 65535, -1, -2147483648, 2147483647] - writePlain(writer, ints, 'INT32') + writePlain(writer, ints, 'INT32', undefined) // 4 bytes per int expect(writer.offset).toBe(4 * ints.length) @@ -30,7 +30,7 @@ describe('writePlain', () => { it('writes INT64', () => { const writer = new ByteWriter() const bigints = [0n, 1n, 42n, BigInt(2 ** 53 - 1)] - writePlain(writer, bigints, 'INT64') + writePlain(writer, bigints, 'INT64', undefined) // 8 bytes per int64 expect(writer.offset).toBe(8 * bigints.length) @@ -44,7 +44,7 @@ describe('writePlain', () => { it('writes FLOAT', () => { const writer = new ByteWriter() const floats = [0, 300.5, -2.7100000381469727, Infinity, -Infinity, NaN] - writePlain(writer, floats, 'FLOAT') + writePlain(writer, floats, 'FLOAT', undefined) // 4 bytes per float expect(writer.offset).toBe(4 * floats.length) @@ -62,7 +62,7 @@ describe('writePlain', () => { it('writes DOUBLE', () => { const writer = new ByteWriter() const doubles = [0, 3.14, -2.71, Infinity, -Infinity, NaN] - writePlain(writer, doubles, 'DOUBLE') + writePlain(writer, doubles, 'DOUBLE', undefined) // 8 bytes per double expect(writer.offset).toBe(8 * doubles.length) @@ -80,7 +80,7 @@ describe('writePlain', () => { it('writes BYTE_ARRAY', () => { const writer = new ByteWriter() const strings = ['a', 'b', 'c', 'd'] - writePlain(writer, strings, 'BYTE_ARRAY') + writePlain(writer, strings, 'BYTE_ARRAY', undefined) let offset = 0 for (const s of strings) { @@ -100,7 +100,7 @@ describe('writePlain', () => { const encoder = new TextEncoder() const strings = ['abcd', 'efgh', 'ijkl'] .map(s => encoder.encode(s)) - writePlain(writer, strings, 'FIXED_LEN_BYTE_ARRAY') + writePlain(writer, strings, 'FIXED_LEN_BYTE_ARRAY', 4) let offset = 0 for (const s of strings) { @@ -113,25 +113,27 @@ describe('writePlain', () => { it('throws error on unsupported type', () => { const writer = new ByteWriter() - expect(() => writePlain(writer, [1, 2, 3], 'INT96')) + expect(() => writePlain(writer, [1, 2, 3], 'INT96', undefined)) .toThrow(/parquet unsupported type/i) }) it('throws error on type mismatch', () => { const writer = new ByteWriter() - expect(() => writePlain(writer, [1, 2, 3], 'BOOLEAN')) + expect(() => writePlain(writer, [1, 2, 3], 'BOOLEAN', undefined)) .toThrow('parquet expected boolean value') - expect(() => writePlain(writer, [1, 2, 3.5], 'INT32')) + expect(() => writePlain(writer, [1, 2, 3.5], 'INT32', undefined)) .toThrow('parquet expected integer value') - expect(() => writePlain(writer, [1n, 2n, 3], 'INT64')) + expect(() => writePlain(writer, [1n, 2n, 3], 'INT64', undefined)) .toThrow('parquet expected bigint value') - expect(() => writePlain(writer, [1, 2, 3n], 'FLOAT')) + expect(() => writePlain(writer, [1, 2, 3n], 'FLOAT', undefined)) .toThrow('parquet expected number value') - expect(() => writePlain(writer, [1, 2, 3n], 'DOUBLE')) + expect(() => writePlain(writer, [1, 2, 3n], 'DOUBLE', undefined)) .toThrow('parquet expected number value') - expect(() => writePlain(writer, [1, 2, 3], 'BYTE_ARRAY')) + expect(() => writePlain(writer, [1, 2, 3], 'BYTE_ARRAY', undefined)) .toThrow('parquet expected Uint8Array value') - expect(() => writePlain(writer, [1, 2, 3], 'FIXED_LEN_BYTE_ARRAY')) + expect(() => writePlain(writer, [1, 2, 3], 'FIXED_LEN_BYTE_ARRAY', undefined)) + .toThrow('parquet FIXED_LEN_BYTE_ARRAY expected type_length') + expect(() => writePlain(writer, [1, 2, 3], 'FIXED_LEN_BYTE_ARRAY', 16)) .toThrow('parquet expected Uint8Array value') }) }) diff --git a/test/write.buffer.test.js b/test/write.buffer.test.js index 24139cf..2a9d9d3 100644 --- a/test/write.buffer.test.js +++ b/test/write.buffer.test.js @@ -140,6 +140,31 @@ describe('parquetWriteBuffer', () => { ]) }) + it('serializes time types', async () => { + const result = await roundTripDeserialize([ + { + name: 'time32', + data: [100000, 200000, 300000], + logical_type: { type: 'TIME', isAdjustedToUTC: false, unit: 'MILLIS' }, + }, + { + name: 'time64', + data: [100000000n, 200000000n, 300000000n], + logical_type: { type: 'TIME', isAdjustedToUTC: false, unit: 'MICROS' }, + }, + { + name: 'interval', + data: [1000000000n, 2000000000n, 3000000000n], + logical_type: { type: 'INTERVAL' }, + }, + ]) + expect(result).toEqual([ + { time32: 100000, time64: 100000000n, interval: 1000000000n }, + { time32: 200000, time64: 200000000n, interval: 2000000000n }, + { time32: 300000, time64: 300000000n, interval: 3000000000n }, + ]) + }) + it('serializes byte array types', async () => { const result = await roundTripDeserialize([{ name: 'bytes', @@ -153,6 +178,41 @@ describe('parquetWriteBuffer', () => { ]) }) + it('serializes uuid types', async () => { + const result = await roundTripDeserialize([ + { + name: 'uuid', + data: [ + new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]), + new Uint8Array([17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]), + ], + type: 'FIXED_LEN_BYTE_ARRAY', + type_length: 16, + logical_type: { type: 'UUID' }, + }, + { + name: 'string', + data: [ + '00000000-0000-0000-0000-000000000001', + '00010002-0003-0004-0005-000600070008', + ], + type: 'FIXED_LEN_BYTE_ARRAY', + type_length: 16, + logical_type: { type: 'UUID' }, + }, + ]) + expect(result).toEqual([ + { + uuid: new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]), + string: new Uint8Array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]), + }, + { + uuid: new Uint8Array([17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]), + string: new Uint8Array([0, 1, 0, 2, 0, 3, 0, 4, 0, 5, 0, 6, 0, 7, 0, 8]), + }, + ]) + }) + it('serializes empty column', async () => { const result = await roundTripDeserialize([{ name: 'empty', @@ -217,6 +277,20 @@ describe('parquetWriteBuffer', () => { .toThrow('parquet expected number value') expect(() => parquetWriteBuffer({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'BYTE_ARRAY' }] })) .toThrow('parquet expected Uint8Array value') + expect(() => parquetWriteBuffer({ columnData: [{ name: 'float16', data: [1, 2, 3], type: 'FIXED_LEN_BYTE_ARRAY' }] })) + .toThrow('parquet FIXED_LEN_BYTE_ARRAY expected type_length') + expect(() => parquetWriteBuffer({ columnData: [{ name: 'float16', data: [1, 2, 3], type: 'FIXED_LEN_BYTE_ARRAY', type_length: 4 }] })) + .toThrow('parquet expected Uint8Array value') + expect(() => parquetWriteBuffer({ columnData: [{ name: 'float16', data: [1, 2, 3], type: 'FIXED_LEN_BYTE_ARRAY', type_length: 4, logical_type: { type: 'FLOAT16' } }] })) + .toThrow('FLOAT16 expected type_length to be 2 bytes') + expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: [new Uint8Array(4)], type: 'FIXED_LEN_BYTE_ARRAY', logical_type: { type: 'UUID' } }] })) + .toThrow('UUID expected type_length to be 16 bytes') + expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: [new Uint8Array(4)], type: 'FIXED_LEN_BYTE_ARRAY', type_length: 16, logical_type: { type: 'UUID' } }] })) + .toThrow('parquet expected Uint8Array of length 16') + expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: [new Uint8Array(16)], type: 'FIXED_LEN_BYTE_ARRAY', type_length: 4, logical_type: { type: 'UUID' } }] })) + .toThrow('UUID expected type_length to be 16 bytes') + expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: ['0000'], type: 'FIXED_LEN_BYTE_ARRAY', logical_type: { type: 'UUID' } }] })) + .toThrow('UUID expected type_length to be 16 bytes') }) it('throws for empty column with no type specified', () => {