From fff0b1c6d988817f15e76084c35137b44aa88bdc Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Fri, 11 Apr 2025 01:41:56 -0700 Subject: [PATCH] Support more SchemaElement options --- .github/workflows/ci.yml | 6 +-- package.json | 5 ++- src/column.js | 17 ++++----- src/parquet-writer.js | 3 +- src/plain.js | 13 +++++-- src/schema.js | 79 +++++++++++++++++++-------------------- src/types.d.ts | 11 +++++- src/unconvert.js | 7 ++++ test/write.buffer.test.js | 4 +- test/write.file.test.js | 2 +- 10 files changed, 83 insertions(+), 64 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6eb5edb..c21f27b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,20 +7,20 @@ jobs: lint: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - run: npm i - run: npm run lint typecheck: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - run: npm i - run: npx tsc test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - run: npm i - run: npm run coverage diff --git a/package.json b/package.json index 9296495..9c96ee0 100644 --- a/package.json +++ b/package.json @@ -5,8 +5,11 @@ "author": "Hyperparam", "homepage": "https://hyperparam.app", "keywords": [ + "ai", + "data", "hyperparam", "hyparquet", + "ml", "parquet", "snappy", "thrift" @@ -42,7 +45,7 @@ "test": "vitest run" }, "dependencies": { - "hyparquet": "1.11.0" + "hyparquet": "1.11.1" }, "devDependencies": { "@babel/eslint-parser": "7.27.0", diff --git a/src/column.js b/src/column.js index fc575e7..cef283e 100644 --- a/src/column.js +++ b/src/column.js @@ -29,25 +29,22 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) { // Compute statistics if (stats) { - statistics = { - min_value: undefined, - max_value: undefined, - null_count: 0n, - } + let min_value = undefined + let max_value = undefined let null_count = 0n for (const value 
of values) { if (value === null || value === undefined) { null_count++ continue } - if (statistics.min_value === undefined || value < statistics.min_value) { - statistics.min_value = value + if (min_value === undefined || value < min_value) { + min_value = value } - if (statistics.max_value === undefined || value > statistics.max_value) { - statistics.max_value = value + if (max_value === undefined || value > max_value) { + max_value = value } } - statistics.null_count = null_count + statistics = { min_value, max_value, null_count } } // Write levels to temp buffer diff --git a/src/parquet-writer.js b/src/parquet-writer.js index 819d536..ed198db 100644 --- a/src/parquet-writer.js +++ b/src/parquet-writer.js @@ -4,8 +4,7 @@ import { writeMetadata } from './metadata.js' /** * ParquetWriter class allows incremental writing of parquet files. * - * @import {ColumnChunk, FileMetaData, RowGroup, SchemaElement} from 'hyparquet' - * @import {KeyValue} from 'hyparquet/src/types.js' // TODO export from hyparquet + * @import {ColumnChunk, FileMetaData, KeyValue, RowGroup, SchemaElement} from 'hyparquet' * @import {ColumnData, Writer} from '../src/types.js' * @param {object} options * @param {Writer} options.writer diff --git a/src/plain.js b/src/plain.js index acaf441..d4776a7 100644 --- a/src/plain.js +++ b/src/plain.js @@ -19,6 +19,8 @@ export function writePlain(writer, values, type) { writePlainDouble(writer, values) } else if (type === 'BYTE_ARRAY') { writePlainByteArray(writer, values) + } else if (type === 'FIXED_LEN_BYTE_ARRAY') { + for (const value of values) { if (!(value instanceof Uint8Array)) throw new Error('parquet expected Uint8Array value'); writer.appendBytes(value) } } else { throw new Error(`parquet unsupported type: ${type}`) } @@ -32,20 +34,21 @@ function writePlainBoolean(writer, values) { let currentByte = 0 for (let i = 0; i < values.length; i++) { + if (typeof values[i] !== 'boolean') throw new Error('parquet expected boolean value') const bitOffset = i % 8 if (values[i]) { currentByte |= 1 << bitOffset } - // Once we've packed 8 bits or are at a multiple of 8, 
we write out the byte + // once we've packed 8 bits or are at a multiple of 8, we write out the byte if (bitOffset === 7) { writer.appendUint8(currentByte) currentByte = 0 } } - // If the array length is not a multiple of 8, write the leftover bits + // if the array length is not a multiple of 8, write the leftover bits if (values.length % 8 !== 0) { writer.appendUint8(currentByte) } @@ -57,6 +60,7 @@ function writePlainBoolean(writer, values) { */ function writePlainInt32(writer, values) { for (const value of values) { + if (typeof value !== 'number') throw new Error('parquet expected number value') writer.appendInt32(value) } } @@ -67,6 +71,7 @@ function writePlainInt32(writer, values) { */ function writePlainInt64(writer, values) { for (const value of values) { + if (typeof value !== 'bigint') throw new Error('parquet expected bigint value') writer.appendInt64(value) } } @@ -77,6 +82,7 @@ function writePlainInt64(writer, values) { */ function writePlainFloat(writer, values) { for (const value of values) { + if (typeof value !== 'number') throw new Error('parquet expected number value') writer.appendFloat32(value) } } @@ -87,6 +93,7 @@ function writePlainFloat(writer, values) { */ function writePlainDouble(writer, values) { for (const value of values) { + if (typeof value !== 'number') throw new Error('parquet expected number value') writer.appendFloat64(value) } } @@ -97,7 +104,7 @@ function writePlainDouble(writer, values) { */ function writePlainByteArray(writer, values) { for (const value of values) { - if (!(value instanceof Uint8Array)) throw new Error('BYTE_ARRAY must be Uint8Array') + if (!(value instanceof Uint8Array)) throw new Error('parquet expected Uint8Array value') writer.appendUint32(value.length) writer.appendBytes(value) } diff --git a/src/schema.js b/src/schema.js index 279d8bb..bae468f 100644 --- a/src/schema.js +++ b/src/schema.js @@ -1,24 +1,56 @@ +/** + * Convert column data to schema. 
+ * + * @param {ColumnData[]} columnData + * @returns {SchemaElement[]} + */ +export function schemaFromColumnData(columnData) { + /** @type {SchemaElement[]} */ + const schema = [{ + name: 'root', + num_children: columnData.length, + }] + let num_rows = 0 + + for (const column of columnData) { + // check if all columns have the same length + num_rows = num_rows || column.data.length + if (num_rows !== column.data.length) { + throw new Error('columns must have the same length') + } + + // auto-detect type if not provided + /** @type {SchemaElement} */ + const schemaElement = column.type ? column : autoSchemaElement(column.name, column.data) + if (!schemaElement.type) throw new Error(`column ${column.name} cannot determine type`) + schema.push(schemaElement) + } + + return schema +} /** * Deduce a ParquetType from JS values * * @import {ConvertedType, DecodedArray, FieldRepetitionType, ParquetType, SchemaElement} from 'hyparquet' + * @import {ColumnData} from '../src/types.js' * @param {string} name * @param {DecodedArray} values - * @param {ParquetType | undefined} type * @returns {SchemaElement} */ -export function getSchemaElementForValues(name, values, type) { - if (values instanceof Int32Array) return { name, type: 'INT32', repetition_type: 'REQUIRED' } - if (values instanceof BigInt64Array) return { name, type: 'INT64', repetition_type: 'REQUIRED' } - if (values instanceof Float32Array) return { name, type: 'FLOAT', repetition_type: 'REQUIRED' } - if (values instanceof Float64Array) return { name, type: 'DOUBLE', repetition_type: 'REQUIRED' } - +function autoSchemaElement(name, values) { + /** @type {ParquetType | undefined} */ + let type /** @type {FieldRepetitionType} */ let repetition_type = 'REQUIRED' /** @type {ConvertedType | undefined} */ let converted_type = undefined + if (values instanceof Int32Array) return { name, type: 'INT32', repetition_type } + if (values instanceof BigInt64Array) return { name, type: 'INT64', repetition_type } + if (values 
instanceof Float32Array) return { name, type: 'FLOAT', repetition_type } + if (values instanceof Float64Array) return { name, type: 'DOUBLE', repetition_type } + for (const value of values) { if (value === null || value === undefined) { repetition_type = 'OPTIONAL' @@ -55,10 +87,6 @@ export function getSchemaElementForValues(name, values, type) { type = valueType } else if (type === 'INT32' && valueType === 'DOUBLE') { type = 'DOUBLE' - } else if (type === 'FLOAT' && valueType === 'INT32') { - valueType = 'FLOAT' - } else if (type === 'FLOAT' && valueType === 'DOUBLE') { - valueType = 'FLOAT' } else if (type === 'DOUBLE' && valueType === 'INT32') { valueType = 'DOUBLE' } @@ -102,32 +130,3 @@ export function getMaxDefinitionLevel(schemaPath) { } return maxLevel } - -/** - * Convert column data to schema. - * - * @import {ColumnData} from '../src/types.js' - * @param {ColumnData[]} columnData - * @returns {SchemaElement[]} - */ -export function schemaFromColumnData(columnData) { - /** @type {SchemaElement[]} */ - const schema = [{ - name: 'root', - num_children: columnData.length, - }] - let num_rows = 0 - for (const { name, data, type } of columnData) { - // check if all columns have the same length - if (num_rows === 0) { - num_rows = data.length - } else if (num_rows !== data.length) { - throw new Error('columns must have the same length') - } - // auto-detect type - const schemaElement = getSchemaElementForValues(name, data, type) - if (!schemaElement.type) throw new Error(`column ${name} cannot determine type`) - schema.push(schemaElement) - } - return schema -} diff --git a/src/types.d.ts b/src/types.d.ts index 776da78..576ea1d 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -1,5 +1,4 @@ -import type { DecodedArray, ParquetType } from 'hyparquet' -import type { KeyValue } from 'hyparquet/src/types.js' // TODO export from hyparquet +import type { ConvertedType, DecodedArray, FieldRepetitionType, KeyValue, LogicalType, ParquetType } from 'hyparquet' export 
interface ParquetWriteOptions { writer: Writer @@ -13,7 +12,15 @@ export interface ParquetWriteOptions { export interface ColumnData { name: string data: DecodedArray + // fields from SchemaElement: type?: ParquetType + type_length?: number + repetition_type?: FieldRepetitionType + converted_type?: ConvertedType + scale?: number + precision?: number + field_id?: number + logical_type?: LogicalType } export interface Writer { diff --git a/src/unconvert.js b/src/unconvert.js index 51a356c..0a772ea 100644 --- a/src/unconvert.js +++ b/src/unconvert.js @@ -9,9 +9,16 @@ */ export function unconvert(schemaElement, values) { const ctype = schemaElement.converted_type + // TODO: DECIMAL if (ctype === 'DATE') { return values.map(v => v.getTime()) } + if (ctype === 'TIMESTAMP_MILLIS') { + return Array.from(values).map(v => BigInt(v.getTime())) + } + if (ctype === 'TIMESTAMP_MICROS') { + return Array.from(values).map(v => BigInt(v.getTime()) * 1000n) + } if (ctype === 'JSON') { if (!Array.isArray(values)) throw new Error('JSON must be an array') const encoder = new TextEncoder() diff --git a/test/write.buffer.test.js b/test/write.buffer.test.js index 68ea11c..6d64c0d 100644 --- a/test/write.buffer.test.js +++ b/test/write.buffer.test.js @@ -20,7 +20,7 @@ export const basicData = [ { name: 'bool', data: [true, false, true, false] }, { name: 'int', data: [0, 127, 0x7fff, 0x7fffffff] }, { name: 'bigint', data: [0n, 127n, 0x7fffn, 0x7fffffffffffffffn] }, - { name: 'float', data: [0, 0.0001, 123.456, 1e100], type: 'FLOAT' }, + { name: 'float', data: [0, 0.0001, 123.456, 1e100], type: 'FLOAT', repetition_type: 'REQUIRED' }, { name: 'double', data: [0, 0.0001, 123.456, 1e100] }, { name: 'string', data: ['a', 'b', 'c', 'd'] }, { name: 'nullable', data: [true, false, null, null] }, @@ -199,7 +199,7 @@ describe('parquetWriteBuffer', () => { it('throws for wrong type specified', () => { expect(() => parquetWriteBuffer({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'BOOLEAN' }] })) - 
.toThrow('parquet cannot write mixed types') + .toThrow('parquet expected boolean value') }) it('throws for empty column with no type specified', () => { diff --git a/test/write.file.test.js b/test/write.file.test.js index b9f853d..edac247 100644 --- a/test/write.file.test.js +++ b/test/write.file.test.js @@ -8,7 +8,7 @@ import { basicData } from './write.buffer.test.js' const filedir = 'data/' const filename = 'data/write.file.parquet' -describe('parquetWrite with FileWriter', () => { +describe('parquetWriteFile', () => { beforeEach(() => { // ensure data directory exists if (!fs.existsSync(filedir)) {