diff --git a/src/column.js b/src/column.js index a496a83..b579f51 100644 --- a/src/column.js +++ b/src/column.js @@ -1,21 +1,28 @@ import { Encoding, PageType } from 'hyparquet/src/constants.js' +import { unconvert } from './convert.js' import { writeRleBitPackedHybrid } from './encoding.js' import { writePlain } from './plain.js' import { serializeTCompactProtocol } from './thrift.js' import { Writer } from './writer.js' +import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' /** - * @import {ColumnMetaData, DecodedArray, FieldRepetitionType, PageHeader, ParquetType, SchemaElement} from 'hyparquet/src/types.js' + * @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType, SchemaElement} from 'hyparquet' * @param {Writer} writer - * @param {SchemaElement[]} schemaPath schema path for the column + * @param {SchemaElement[]} schemaPath * @param {DecodedArray} values - * @param {ParquetType} type * @returns {ColumnMetaData} */ -export function writeColumn(writer, schemaPath, values, type) { +export function writeColumn(writer, schemaPath, values) { + const schemaElement = schemaPath[schemaPath.length - 1] + const { type } = schemaElement + if (!type) throw new Error(`column ${schemaElement.name} cannot determine type`) const offsetStart = writer.offset let num_nulls = 0 + // Unconvert type if necessary + values = unconvert(schemaElement, values) + // Write page to temp buffer const page = new Writer() @@ -29,7 +36,7 @@ export function writeColumn(writer, schemaPath, values, type) { repetition_levels_byte_length = writeRleBitPackedHybrid(page, []) } - // TODO: definition levels + // definition levels const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) let definition_levels_byte_length = 0 if (maxDefinitionLevel) { @@ -83,53 +90,6 @@ export function writeColumn(writer, schemaPath, values, type) { } } -/** - * Deduce a ParquetType from JS values - * - * @param {DecodedArray} values - * @returns {{ type: ParquetType, repetition_type: 
'REQUIRED' | 'OPTIONAL' }} - */ -export function getParquetTypeForValues(values) { - if (values instanceof Int32Array) return { type: 'INT32', repetition_type: 'REQUIRED' } - if (values instanceof BigInt64Array) return { type: 'INT64', repetition_type: 'REQUIRED' } - if (values instanceof Float32Array) return { type: 'FLOAT', repetition_type: 'REQUIRED' } - if (values instanceof Float64Array) return { type: 'DOUBLE', repetition_type: 'REQUIRED' } - /** @type {ParquetType | undefined} */ - let type = undefined - /** @type {FieldRepetitionType} */ - let repetition_type = 'REQUIRED' - for (const value of values) { - const valueType = getParquetTypeForValue(value) - if (!valueType) { - repetition_type = 'OPTIONAL' - } else if (type === undefined) { - type = valueType - } else if (type === 'INT32' && valueType === 'DOUBLE') { - type = 'DOUBLE' - } else if (type === 'DOUBLE' && valueType === 'INT32') { - // keep - } else if (type !== valueType) { - throw new Error(`parquet cannot write mixed types: ${type} and ${valueType}`) - } - } - if (!type) throw new Error('parquetWrite: empty column cannot determine type') - return { type, repetition_type } -} - -/** - * @param {any} value - * @returns {ParquetType | undefined} - */ -function getParquetTypeForValue(value) { - if (value === null || value === undefined) return undefined - if (value === true || value === false) return 'BOOLEAN' - if (typeof value === 'bigint') return 'INT64' - if (Number.isInteger(value)) return 'INT32' - if (typeof value === 'number') return 'DOUBLE' - if (typeof value === 'string') return 'BYTE_ARRAY' - throw new Error(`Cannot determine parquet type for: ${value}`) -} - /** * @param {Writer} writer * @param {PageHeader} header @@ -161,35 +121,3 @@ function writePageData(writer, values, type) { // write plain data writePlain(writer, values, type) } - -/** - * Get the max repetition level for a given schema path. 
- * - * @param {SchemaElement[]} schemaPath - * @returns {number} max repetition level - */ -function getMaxRepetitionLevel(schemaPath) { - let maxLevel = 0 - for (const element of schemaPath) { - if (element.repetition_type === 'REPEATED') { - maxLevel++ - } - } - return maxLevel -} - -/** - * Get the max definition level for a given schema path. - * - * @param {SchemaElement[]} schemaPath - * @returns {number} max definition level - */ -function getMaxDefinitionLevel(schemaPath) { - let maxLevel = 0 - for (const element of schemaPath.slice(1)) { - if (element.repetition_type !== 'REQUIRED') { - maxLevel++ - } - } - return maxLevel -} diff --git a/src/convert.js b/src/convert.js new file mode 100644 index 0000000..74e4787 --- /dev/null +++ b/src/convert.js @@ -0,0 +1,21 @@ + +/** + * Convert from rich to primitive types. + * + * @import {DecodedArray, SchemaElement} from 'hyparquet' + * @param {SchemaElement} schemaElement + * @param {DecodedArray} values + * @returns {DecodedArray} + */ +export function unconvert(schemaElement, values) { + const ctype = schemaElement.converted_type + if (ctype === 'TIMESTAMP_MILLIS') { + return values.map(v => BigInt(v.getTime())) + } + if (ctype === 'JSON') { + const encoder = new TextEncoder() + if (!Array.isArray(values)) throw new Error('JSON must be an array') + return values.map(v => encoder.encode(JSON.stringify(v))) + } + return values +} diff --git a/src/plain.js b/src/plain.js index de179a5..c8be649 100644 --- a/src/plain.js +++ b/src/plain.js @@ -84,8 +84,9 @@ function writePlainDouble(writer, values) { * @param {DecodedArray} values */ function writePlainByteArray(writer, values) { + const encoder = new TextEncoder() for (const value of values) { - const bytes = new TextEncoder().encode(value) + const bytes = typeof value === 'string' ? 
encoder.encode(value) : value writer.appendUint32(bytes.length) writer.appendBytes(bytes) } diff --git a/src/schema.js b/src/schema.js new file mode 100644 index 0000000..bc68035 --- /dev/null +++ b/src/schema.js @@ -0,0 +1,92 @@ + +/** + * Deduce a ParquetType from JS values + * + * @import {ConvertedType, DecodedArray, FieldRepetitionType, ParquetType, SchemaElement} from 'hyparquet' + * @param {string} name + * @param {DecodedArray} values + * @returns {SchemaElement} + */ +export function getSchemaElementForValues(name, values) { + if (values instanceof Int32Array) return { name, type: 'INT32', repetition_type: 'REQUIRED' } + if (values instanceof BigInt64Array) return { name, type: 'INT64', repetition_type: 'REQUIRED' } + if (values instanceof Float32Array) return { name, type: 'FLOAT', repetition_type: 'REQUIRED' } + if (values instanceof Float64Array) return { name, type: 'DOUBLE', repetition_type: 'REQUIRED' } + /** @type {ParquetType | undefined} */ + let type = undefined + /** @type {FieldRepetitionType} */ + let repetition_type = 'REQUIRED' + /** @type {ConvertedType | undefined} */ + let converted_type = undefined + for (const value of values) { + if (value === null || value === undefined) { + repetition_type = 'OPTIONAL' + } else { + // value is defined + /** @type {ParquetType | undefined} */ + let valueType = undefined + if (value === true || value === false) valueType = 'BOOLEAN' + else if (typeof value === 'bigint') valueType = 'INT64' + else if (Number.isInteger(value)) valueType = 'INT32' + else if (typeof value === 'number') valueType = 'DOUBLE' + else if (typeof value === 'string') valueType = 'BYTE_ARRAY' + else if (value instanceof Date) { + valueType = 'INT64' + // make sure they are all dates + if (type && !converted_type) throw new Error('mixed types not supported') + converted_type = 'TIMESTAMP_MILLIS' + } + else if (typeof value === 'object') { + // use json (TODO: native list and object types) + converted_type = 'JSON' + valueType = 
'BYTE_ARRAY' + } + else if (!valueType) throw new Error(`Cannot determine parquet type for: ${value}`) + + // expand type if necessary + if (type === undefined) { + type = valueType + } else if (type === 'INT32' && valueType === 'DOUBLE') { + type = 'DOUBLE' + } else if (type === 'DOUBLE' && valueType === 'INT32') { + // keep + } else if (type !== valueType) { + throw new Error(`parquet cannot write mixed types: ${type} and ${valueType}`) + } + } + } + if (!type) throw new Error(`column ${name} cannot determine type`) + return { name, type, repetition_type, converted_type } +} + +/** + * Get the max repetition level for a given schema path. + * + * @param {SchemaElement[]} schemaPath + * @returns {number} max repetition level + */ +export function getMaxRepetitionLevel(schemaPath) { + let maxLevel = 0 + for (const element of schemaPath) { + if (element.repetition_type === 'REPEATED') { + maxLevel++ + } + } + return maxLevel +} + +/** + * Get the max definition level for a given schema path. 
+ * + * @param {SchemaElement[]} schemaPath + * @returns {number} max definition level + */ +export function getMaxDefinitionLevel(schemaPath) { + let maxLevel = 0 + for (const element of schemaPath.slice(1)) { + if (element.repetition_type !== 'REQUIRED') { + maxLevel++ + } + } + return maxLevel +} diff --git a/src/write.js b/src/write.js index df324a8..93e5587 100644 --- a/src/write.js +++ b/src/write.js @@ -1,6 +1,7 @@ -import { getParquetTypeForValues, writeColumn } from './column.js' +import { writeColumn } from './column.js' import { Writer } from './writer.js' import { writeMetadata } from './metadata.js' +import { getSchemaElementForValues } from './schema.js' /** * Write data as parquet to an ArrayBuffer @@ -38,18 +39,18 @@ export function parquetWrite(columnData) { // Write columns for (const name of columnNames) { const values = columnData[name] - const { type, repetition_type } = getParquetTypeForValues(values) - if (!type) throw new Error(`parquetWrite: empty column ${name} cannot determine type`) + const schemaElement = getSchemaElementForValues(name, values) + if (!schemaElement.type) throw new Error(`column ${name} cannot determine type`) const file_offset = BigInt(writer.offset) /** @type {SchemaElement[]} */ - const schemaElements = [ + const schemaPath = [ schema[0], - { type, name, repetition_type, num_children: 0 }, + schemaElement, ] - const meta_data = writeColumn(writer, schemaElements, values, type) + const meta_data = writeColumn(writer, schemaPath, values) // save metadata - schema.push({ type, name, repetition_type }) + schema.push(schemaElement) columns.push({ file_path: name, file_offset, diff --git a/test/write.test.js b/test/write.test.js index 59d98ee..1afd009 100644 --- a/test/write.test.js +++ b/test/write.test.js @@ -30,7 +30,7 @@ describe('parquetWrite', () => { expect(metadata).toEqual(exampleMetadata) }) - it('serializes basic types correctly', async () => { + it('serializes basic types', async () => { const result = await 
roundTripDeserialize(data) expect(result).toEqual([ { bool: true, int: 0, bigint: 0n, double: 0, string: 'a', nullable: true }, @@ -51,4 +51,45 @@ describe('parquetWrite', () => { const metadata = parquetMetadata(buffer) expect(metadata.metadata_length).toBe(89) }) + + it('serializes list types', async () => { + const result = await roundTripDeserialize({ + list: [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]], + }) + expect(result).toEqual([ + { list: [1, 2, 3] }, + { list: [4, 5, 6] }, + { list: [7, 8, 9] }, + { list: [10, 11, 12] }, + ]) + }) + + it('serializes object types', async () => { + const result = await roundTripDeserialize({ + obj: [{ a: 1, b: 2 }, { a: 3, b: 4 }, { a: 5, b: 6 }, { a: 7, b: 8 }], + }) + expect(result).toEqual([ + { obj: { a: 1, b: 2 } }, + { obj: { a: 3, b: 4 } }, + { obj: { a: 5, b: 6 } }, + { obj: { a: 7, b: 8 } }, + ]) + }) + + it('serializes date types', async () => { + const result = await roundTripDeserialize({ + date: [new Date(0), new Date(100000), new Date(200000), new Date(300000)], + }) + expect(result).toEqual([ + { date: new Date(0) }, + { date: new Date(100000) }, + { date: new Date(200000) }, + { date: new Date(300000) }, + ]) + }) + + it('throws for mixed types', () => { + expect(() => parquetWrite({ mixed: [1, 2, 3, 'boom'] })) + .toThrow('parquet cannot write mixed types: INT32 and BYTE_ARRAY') + }) })