diff --git a/README.md b/README.md index cda39ed..cd50f63 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,21 @@ # Hyparquet Writer [![mit license](https://img.shields.io/badge/License-MIT-orange.svg)](https://opensource.org/licenses/MIT) +![coverage](https://img.shields.io/badge/Coverage-96-darkred) + +## Usage + +```javascript +import { parquetWrite } from 'hyparquet-writer' + +const arrayBuffer = parquetWrite({ + name: ['Alice', 'Bob', 'Charlie'], + age: [25, 30, 35], +}) +``` + +## References + + - https://github.com/hyparam/hyparquet + - https://github.com/apache/parquet-format + - https://github.com/apache/parquet-testing diff --git a/src/column.js b/src/column.js index 54dbe84..a496a83 100644 --- a/src/column.js +++ b/src/column.js @@ -5,17 +5,16 @@ import { serializeTCompactProtocol } from './thrift.js' import { Writer } from './writer.js' /** - * @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType} from 'hyparquet/src/types.js' + * @import {ColumnMetaData, DecodedArray, FieldRepetitionType, PageHeader, ParquetType, SchemaElement} from 'hyparquet/src/types.js' * @param {Writer} writer - * @param {string} columnName + * @param {SchemaElement[]} schemaPath schema path for the column * @param {DecodedArray} values * @param {ParquetType} type * @returns {ColumnMetaData} */ -export function writeColumn(writer, columnName, values, type) { - // Get data stats - const num_nulls = values.filter(v => v === null).length +export function writeColumn(writer, schemaPath, values, type) { const offsetStart = writer.offset + let num_nulls = 0 // Write page to temp buffer const page = new Writer() @@ -24,28 +23,39 @@ export function writeColumn(writer, columnName, values, type) { const encoding = 'PLAIN' // TODO: repetition levels - const maxRepetitionLevel = 0 + const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) let repetition_levels_byte_length = 0 if (maxRepetitionLevel) { repetition_levels_byte_length = writeRleBitPackedHybrid(page, []) } // TODO: 
definition levels - const maxDefinitionLevel = 0 + const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) let definition_levels_byte_length = 0 if (maxDefinitionLevel) { - definition_levels_byte_length = writeRleBitPackedHybrid(page, []) + const definitionLevels = [] + for (const value of values) { + if (value === null || value === undefined) { + definitionLevels.push(maxDefinitionLevel - 1) + num_nulls++ + } else { + definitionLevels.push(maxDefinitionLevel) + } + } + definition_levels_byte_length = writeRleBitPackedHybrid(page, definitionLevels) } - // write page data (TODO: compressed) - const { uncompressed_page_size, compressed_page_size } = writePageData(page, values, type) + // write page data + writePageData(page, values, type) + + // TODO: compress page data // write page header /** @type {PageHeader} */ const header = { type: 'DATA_PAGE_V2', - uncompressed_page_size, - compressed_page_size, + uncompressed_page_size: page.offset, + compressed_page_size: page.offset, data_page_header_v2: { num_values: values.length, num_nulls, @@ -64,7 +74,7 @@ export function writeColumn(writer, columnName, values, type) { return { type, encodings: ['PLAIN'], - path_in_schema: [columnName], + path_in_schema: schemaPath.slice(1).map(s => s.name), codec: 'UNCOMPRESSED', num_values: BigInt(values.length), total_compressed_size: BigInt(writer.offset - offsetStart), @@ -74,18 +84,50 @@ export function writeColumn(writer, columnName, values, type) { } /** - * Deduce a ParquetType from the JS value + * Deduce a ParquetType from JS values * * @param {DecodedArray} values - * @returns {ParquetType} + * @returns {{ type: ParquetType, repetition_type: 'REQUIRED' | 'OPTIONAL' }} */ export function getParquetTypeForValues(values) { - if (values.every(v => typeof v === 'boolean')) return 'BOOLEAN' - if (values.every(v => typeof v === 'bigint')) return 'INT64' - if (values.every(v => Number.isInteger(v))) return 'INT32' - if (values.every(v => typeof v === 'number')) return 
'DOUBLE' - if (values.every(v => typeof v === 'string')) return 'BYTE_ARRAY' - throw new Error(`Cannot determine parquet type for: ${values}`) + if (values instanceof Int32Array) return { type: 'INT32', repetition_type: 'REQUIRED' } + if (values instanceof BigInt64Array) return { type: 'INT64', repetition_type: 'REQUIRED' } + if (values instanceof Float32Array) return { type: 'FLOAT', repetition_type: 'REQUIRED' } + if (values instanceof Float64Array) return { type: 'DOUBLE', repetition_type: 'REQUIRED' } + /** @type {ParquetType | undefined} */ + let type = undefined + /** @type {FieldRepetitionType} */ + let repetition_type = 'REQUIRED' + for (const value of values) { + const valueType = getParquetTypeForValue(value) + if (!valueType) { + repetition_type = 'OPTIONAL' + } else if (type === undefined) { + type = valueType + } else if (type === 'INT32' && valueType === 'DOUBLE') { + type = 'DOUBLE' + } else if (type === 'DOUBLE' && valueType === 'INT32') { + // keep + } else if (type !== valueType) { + throw new Error(`parquet cannot write mixed types: ${type} and ${valueType}`) + } + } + if (!type) throw new Error('parquetWrite: empty column cannot determine type') + return { type, repetition_type } +} + +/** + * @param {any} value + * @returns {ParquetType | undefined} + */ +function getParquetTypeForValue(value) { + if (value === null || value === undefined) return undefined + if (value === true || value === false) return 'BOOLEAN' + if (typeof value === 'bigint') return 'INT64' + if (Number.isInteger(value)) return 'INT32' + if (typeof value === 'number') return 'DOUBLE' + if (typeof value === 'string') return 'BYTE_ARRAY' + throw new Error(`Cannot determine parquet type for: ${value}`) } /** @@ -114,13 +156,40 @@ function writePageHeader(writer, header) { * @param {Writer} writer * @param {DecodedArray} values * @param {ParquetType} type - * @returns {{ uncompressed_page_size: number, compressed_page_size: number }} */ function writePageData(writer, values, 
type) { // write plain data - const startOffset = writer.offset writePlain(writer, values, type) - const size = writer.offset - startOffset - - return { uncompressed_page_size: size, compressed_page_size: size } +} + +/** + * Get the max repetition level for a given schema path. + * + * @param {SchemaElement[]} schemaPath + * @returns {number} max repetition level + */ +function getMaxRepetitionLevel(schemaPath) { + let maxLevel = 0 + for (const element of schemaPath) { + if (element.repetition_type === 'REPEATED') { + maxLevel++ + } + } + return maxLevel +} + +/** + * Get the max definition level for a given schema path. + * + * @param {SchemaElement[]} schemaPath + * @returns {number} max definition level + */ +function getMaxDefinitionLevel(schemaPath) { + let maxLevel = 0 + for (const element of schemaPath.slice(1)) { + if (element.repetition_type !== 'REQUIRED') { + maxLevel++ + } + } + return maxLevel } diff --git a/src/write.js b/src/write.js index a0ade0b..df324a8 100644 --- a/src/write.js +++ b/src/write.js @@ -5,7 +5,7 @@ import { writeMetadata } from './metadata.js' /** * Write data as parquet to an ArrayBuffer * - * @import {ColumnChunk, DecodedArray, FileMetaData, SchemaElement} from 'hyparquet' + * @import {ColumnChunk, DecodedArray, FileMetaData, SchemaElement, SchemaTree} from 'hyparquet' * @param {Record} columnData * @returns {ArrayBuffer} */ @@ -29,7 +29,6 @@ export function parquetWrite(columnData) { const schema = [{ name: 'root', num_children: columnNames.length, - repetition_type: 'REQUIRED', }] // row group columns @@ -39,10 +38,15 @@ export function parquetWrite(columnData) { // Write columns for (const name of columnNames) { const values = columnData[name] - const type = getParquetTypeForValues(values) + const { type, repetition_type } = getParquetTypeForValues(values) + if (!type) throw new Error(`parquetWrite: empty column ${name} cannot determine type`) const file_offset = BigInt(writer.offset) - const meta_data = writeColumn(writer, 
name, values, type) - const repetition_type = 'REQUIRED' + /** @type {SchemaElement[]} */ + const schemaElements = [ + schema[0], + { type, name, repetition_type, num_children: 0 }, + ] + const meta_data = writeColumn(writer, schemaElements, values, type) // save metadata schema.push({ type, name, repetition_type }) diff --git a/test/metadata.test.js b/test/metadata.test.js index 9b70ae1..582c1d5 100644 --- a/test/metadata.test.js +++ b/test/metadata.test.js @@ -11,12 +11,13 @@ export const exampleMetadata = { version: 2, created_by: 'hyparquet', schema: [ - { name: 'root', num_children: 5, repetition_type: 'REQUIRED' }, + { name: 'root', num_children: 6 }, { name: 'bool', type: 'BOOLEAN', repetition_type: 'REQUIRED' }, { name: 'int', type: 'INT32', repetition_type: 'REQUIRED' }, { name: 'bigint', type: 'INT64', repetition_type: 'REQUIRED' }, { name: 'double', type: 'DOUBLE', repetition_type: 'REQUIRED' }, { name: 'string', type: 'BYTE_ARRAY', repetition_type: 'REQUIRED' }, + { name: 'nullable', type: 'BOOLEAN', repetition_type: 'OPTIONAL' }, ], num_rows: 4n, row_groups: [{ @@ -91,11 +92,25 @@ export const exampleMetadata = { data_page_offset: 173n, }, }, + { + file_path: 'nullable', + file_offset: 215n, + meta_data: { + type: 'BOOLEAN', + encodings: ['PLAIN'], + path_in_schema: ['nullable'], + codec: 'UNCOMPRESSED', + num_values: 4n, + total_uncompressed_size: 25n, + total_compressed_size: 25n, + data_page_offset: 215n, + }, + }, ], - total_byte_size: 211n, + total_byte_size: 236n, num_rows: 4n, }], - metadata_length: 280, + metadata_length: 336, } describe('writeMetadata', () => { diff --git a/test/write.test.js b/test/write.test.js index 44f7b9d..a031a83 100644 --- a/test/write.test.js +++ b/test/write.test.js @@ -20,6 +20,7 @@ const data = { bigint: [0n, 127n, 0x7fffn, 0x7fffffffffffffffn], // INT64 double: [0, 0.0001, 123.456, 1e100], // DOUBLE string: ['a', 'b', 'c', 'd'], // BYTE_ARRAY + nullable: [true, false, null, null], // BOOLEAN nullable } 
describe('parquetWrite', () => { @@ -32,10 +33,10 @@ describe('parquetWrite', () => { it('serializes basic types correctly', async () => { const result = await roundTripDeserialize(data) expect(result).toEqual([ - { bool: true, int: 0, bigint: 0n, double: 0, string: 'a' }, - { bool: false, int: 127, bigint: 127n, double: 0.0001, string: 'b' }, - { bool: true, int: 0x7fff, bigint: 0x7fffn, double: 123.456, string: 'c' }, - { bool: false, int: 0x7fffffff, bigint: 0x7fffffffffffffffn, double: 1e100, string: 'd' }, + { bool: true, int: 0, bigint: 0n, double: 0, string: 'a', nullable: true }, + { bool: false, int: 127, bigint: 127n, double: 0.0001, string: 'b', nullable: false }, + { bool: true, int: 0x7fff, bigint: 0x7fffn, double: 123.456, string: 'c', nullable: null }, + { bool: false, int: 0x7fffffff, bigint: 0x7fffffffffffffffn, double: 1e100, string: 'd', nullable: null }, ]) }) })