Handle more types (date and json)

Converts list and object columns to JSON.
This commit is contained in:
Kenny Daniel 2025-03-26 00:11:14 -07:00
parent 947a78f72d
commit abe825ded4
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
6 changed files with 177 additions and 93 deletions

@ -1,21 +1,28 @@
import { Encoding, PageType } from 'hyparquet/src/constants.js'
import { unconvert } from './convert.js'
import { writeRleBitPackedHybrid } from './encoding.js'
import { writePlain } from './plain.js'
import { serializeTCompactProtocol } from './thrift.js'
import { Writer } from './writer.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
/**
* @import {ColumnMetaData, DecodedArray, FieldRepetitionType, PageHeader, ParquetType, SchemaElement} from 'hyparquet/src/types.js'
* @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType, SchemaElement} from 'hyparquet'
* @param {Writer} writer
* @param {SchemaElement[]} schemaPath schema path for the column
* @param {SchemaElement[]} schemaPath
* @param {DecodedArray} values
* @param {ParquetType} type
* @returns {ColumnMetaData}
*/
export function writeColumn(writer, schemaPath, values, type) {
export function writeColumn(writer, schemaPath, values) {
const schemaElement = schemaPath[schemaPath.length - 1]
const { type } = schemaElement
if (!type) throw new Error(`column ${schemaElement.name} cannot determine type`)
const offsetStart = writer.offset
let num_nulls = 0
// Unconvert type if necessary
values = unconvert(schemaElement, values)
// Write page to temp buffer
const page = new Writer()
@ -29,7 +36,7 @@ export function writeColumn(writer, schemaPath, values, type) {
repetition_levels_byte_length = writeRleBitPackedHybrid(page, [])
}
// TODO: definition levels
// definition levels
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
let definition_levels_byte_length = 0
if (maxDefinitionLevel) {
@ -83,53 +90,6 @@ export function writeColumn(writer, schemaPath, values, type) {
}
}
/**
 * Deduce a ParquetType from JS values
 *
 * @param {DecodedArray} values
 * @returns {{ type: ParquetType, repetition_type: 'REQUIRED' | 'OPTIONAL' }}
 */
export function getParquetTypeForValues(values) {
  // Typed arrays map directly to a non-nullable physical type
  if (values instanceof Int32Array) return { type: 'INT32', repetition_type: 'REQUIRED' }
  if (values instanceof BigInt64Array) return { type: 'INT64', repetition_type: 'REQUIRED' }
  if (values instanceof Float32Array) return { type: 'FLOAT', repetition_type: 'REQUIRED' }
  if (values instanceof Float64Array) return { type: 'DOUBLE', repetition_type: 'REQUIRED' }
  /** @type {ParquetType | undefined} */
  let type = undefined
  /** @type {FieldRepetitionType} */
  let repetition_type = 'REQUIRED'
  for (const value of values) {
    const valueType = getParquetTypeForValue(value)
    if (valueType === undefined) {
      // a null/undefined value makes the column nullable
      repetition_type = 'OPTIONAL'
      continue
    }
    if (type === undefined) {
      type = valueType
    } else if (type === 'INT32' && valueType === 'DOUBLE') {
      type = 'DOUBLE' // widen int column to double
    } else if (type === 'DOUBLE' && valueType === 'INT32') {
      // integers fit in a double column, keep DOUBLE
    } else if (type !== valueType) {
      throw new Error(`parquet cannot write mixed types: ${type} and ${valueType}`)
    }
  }
  if (!type) throw new Error('parquetWrite: empty column cannot determine type')
  return { type, repetition_type }
}
/**
 * Map a single JS value to its parquet physical type.
 *
 * @param {any} value
 * @returns {ParquetType | undefined} undefined for null/undefined values
 */
function getParquetTypeForValue(value) {
  // nulls carry no type information
  if (value === null || value === undefined) return undefined
  switch (typeof value) {
    case 'boolean': return 'BOOLEAN'
    case 'bigint': return 'INT64'
    case 'number': return Number.isInteger(value) ? 'INT32' : 'DOUBLE'
    case 'string': return 'BYTE_ARRAY'
    default: throw new Error(`Cannot determine parquet type for: ${value}`)
  }
}
/**
* @param {Writer} writer
* @param {PageHeader} header
@ -161,35 +121,3 @@ function writePageData(writer, values, type) {
// write plain data
writePlain(writer, values, type)
}
/**
 * Get the max repetition level for a given schema path.
 *
 * @param {SchemaElement[]} schemaPath
 * @returns {number} max repetition level
 */
function getMaxRepetitionLevel(schemaPath) {
  // Each REPEATED element along the path contributes one repetition level
  return schemaPath.reduce(
    (level, element) => element.repetition_type === 'REPEATED' ? level + 1 : level,
    0
  )
}
/**
 * Get the max definition level for a given schema path.
 *
 * @param {SchemaElement[]} schemaPath
 * @returns {number} max definition level
 */
function getMaxDefinitionLevel(schemaPath) {
  // Root is skipped; each non-REQUIRED element below it adds one level
  return schemaPath
    .slice(1)
    .filter(element => element.repetition_type !== 'REQUIRED')
    .length
}

21
src/convert.js Normal file

@ -0,0 +1,21 @@
/**
 * Convert from rich to primitive types.
 *
 * Null/undefined entries are passed through untouched so that nullable
 * columns can represent them via definition levels instead of crashing here.
 *
 * @import {DecodedArray, SchemaElement} from 'hyparquet'
 * @param {SchemaElement} schemaElement
 * @param {DecodedArray} values
 * @returns {DecodedArray}
 */
export function unconvert(schemaElement, values) {
  const ctype = schemaElement.converted_type
  if (ctype === 'DATE') {
    // Date objects are stored as epoch milliseconds.
    // NOTE(review): schema deduction emits TIMESTAMP_MILLIS for Date values —
    // confirm whether that converted type should also be unconverted here.
    return values.map(v => v == null ? v : v.getTime())
  }
  if (ctype === 'JSON') {
    const encoder = new TextEncoder()
    if (!Array.isArray(values)) throw new Error('JSON must be an array')
    // Serialize each value to UTF-8 JSON bytes
    return values.map(v => v == null ? v : encoder.encode(JSON.stringify(v)))
  }
  return values
}

@ -84,8 +84,9 @@ function writePlainDouble(writer, values) {
* @param {DecodedArray} values
*/
function writePlainByteArray(writer, values) {
const encoder = new TextEncoder()
for (const value of values) {
const bytes = new TextEncoder().encode(value)
const bytes = typeof value === 'string' ? encoder.encode(value) : value
writer.appendUint32(bytes.length)
writer.appendBytes(bytes)
}

92
src/schema.js Normal file

@ -0,0 +1,92 @@
/**
 * Deduce a ParquetType from JS values
 *
 * @import {ConvertedType, DecodedArray, FieldRepetitionType, ParquetType, SchemaElement} from 'hyparquet'
 * @param {string} name
 * @param {DecodedArray} values
 * @returns {SchemaElement}
 */
export function getSchemaElementForValues(name, values) {
  // Typed arrays map directly to a non-nullable physical type
  if (values instanceof Int32Array) return { name, type: 'INT32', repetition_type: 'REQUIRED' }
  if (values instanceof BigInt64Array) return { name, type: 'INT64', repetition_type: 'REQUIRED' }
  if (values instanceof Float32Array) return { name, type: 'FLOAT', repetition_type: 'REQUIRED' }
  if (values instanceof Float64Array) return { name, type: 'DOUBLE', repetition_type: 'REQUIRED' }
  /** @type {ParquetType | undefined} */
  let type = undefined
  /** @type {FieldRepetitionType} */
  let repetition_type = 'REQUIRED'
  /** @type {ConvertedType | undefined} */
  let converted_type = undefined
  for (const value of values) {
    // Nulls only affect the repetition type
    if (value === null || value === undefined) {
      repetition_type = 'OPTIONAL'
      continue
    }
    /** @type {ParquetType | undefined} */
    let valueType = undefined
    if (typeof value === 'boolean') {
      valueType = 'BOOLEAN'
    } else if (typeof value === 'bigint') {
      valueType = 'INT64'
    } else if (Number.isInteger(value)) {
      valueType = 'INT32'
    } else if (typeof value === 'number') {
      valueType = 'DOUBLE'
    } else if (typeof value === 'string') {
      valueType = 'BYTE_ARRAY'
    } else if (value instanceof Date) {
      // Dates are written as INT64 with a timestamp converted type
      valueType = 'INT64'
      // a non-date type was already deduced: reject the mix
      if (type && !converted_type) throw new Error('mixed types not supported')
      converted_type = 'TIMESTAMP_MILLIS'
    } else if (typeof value === 'object') {
      // use json (TODO: native list and object types)
      converted_type = 'JSON'
      valueType = 'BYTE_ARRAY'
    } else {
      throw new Error(`Cannot determine parquet type for: ${value}`)
    }
    // Widen or validate against the type deduced so far
    if (type === undefined) {
      type = valueType
    } else if (type === 'INT32' && valueType === 'DOUBLE') {
      type = 'DOUBLE' // widen int column to double
    } else if (type === 'DOUBLE' && valueType === 'INT32') {
      // integers fit in a double column, keep DOUBLE
    } else if (type !== valueType) {
      throw new Error(`parquet cannot write mixed types: ${type} and ${valueType}`)
    }
  }
  if (!type) throw new Error(`column ${name} cannot determine type`)
  return { name, type, repetition_type, converted_type }
}
/**
 * Get the max repetition level for a given schema path.
 *
 * @param {SchemaElement[]} schemaPath
 * @returns {number} max repetition level
 */
export function getMaxRepetitionLevel(schemaPath) {
  // Each REPEATED element along the path contributes one repetition level
  return schemaPath
    .filter(({ repetition_type }) => repetition_type === 'REPEATED')
    .length
}
/**
 * Get the max definition level for a given schema path.
 *
 * @param {SchemaElement[]} schemaPath
 * @returns {number} max definition level
 */
export function getMaxDefinitionLevel(schemaPath) {
  // Root is skipped; each non-REQUIRED element below it adds one level
  let level = 0
  for (let i = 1; i < schemaPath.length; i++) {
    if (schemaPath[i].repetition_type !== 'REQUIRED') level++
  }
  return level
}

@ -1,6 +1,7 @@
import { getParquetTypeForValues, writeColumn } from './column.js'
import { writeColumn } from './column.js'
import { Writer } from './writer.js'
import { writeMetadata } from './metadata.js'
import { getSchemaElementForValues } from './schema.js'
/**
* Write data as parquet to an ArrayBuffer
@ -38,18 +39,18 @@ export function parquetWrite(columnData) {
// Write columns
for (const name of columnNames) {
const values = columnData[name]
const { type, repetition_type } = getParquetTypeForValues(values)
if (!type) throw new Error(`parquetWrite: empty column ${name} cannot determine type`)
const schemaElement = getSchemaElementForValues(name, values)
if (!schemaElement.type) throw new Error(`column ${name} cannot determine type`)
const file_offset = BigInt(writer.offset)
/** @type {SchemaElement[]} */
const schemaElements = [
const schemaPath = [
schema[0],
{ type, name, repetition_type, num_children: 0 },
schemaElement,
]
const meta_data = writeColumn(writer, schemaElements, values, type)
const meta_data = writeColumn(writer, schemaPath, values)
// save metadata
schema.push({ type, name, repetition_type })
schema.push(schemaElement)
columns.push({
file_path: name,
file_offset,

@ -30,7 +30,7 @@ describe('parquetWrite', () => {
expect(metadata).toEqual(exampleMetadata)
})
it('serializes basic types correctly', async () => {
it('serializes basic types', async () => {
const result = await roundTripDeserialize(data)
expect(result).toEqual([
{ bool: true, int: 0, bigint: 0n, double: 0, string: 'a', nullable: true },
@ -51,4 +51,45 @@ describe('parquetWrite', () => {
const metadata = parquetMetadata(buffer)
expect(metadata.metadata_length).toBe(89)
})
it('serializes list types', async () => {
const result = await roundTripDeserialize({
list: [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]],
})
expect(result).toEqual([
{ list: [1, 2, 3] },
{ list: [4, 5, 6] },
{ list: [7, 8, 9] },
{ list: [10, 11, 12] },
])
})
it('serializes object types', async () => {
const result = await roundTripDeserialize({
obj: [{ a: 1, b: 2 }, { a: 3, b: 4 }, { a: 5, b: 6 }, { a: 7, b: 8 }],
})
expect(result).toEqual([
{ obj: { a: 1, b: 2 } },
{ obj: { a: 3, b: 4 } },
{ obj: { a: 5, b: 6 } },
{ obj: { a: 7, b: 8 } },
])
})
it('serializes date types', async () => {
const result = await roundTripDeserialize({
date: [new Date(0), new Date(100000), new Date(200000), new Date(300000)],
})
expect(result).toEqual([
{ date: new Date(0) },
{ date: new Date(100000) },
{ date: new Date(200000) },
{ date: new Date(300000) },
])
})
it('throws for mixed types', () => {
expect(() => parquetWrite({ mixed: [1, 2, 3, 'boom'] }))
.toThrow('parquet cannot write mixed types: INT32 and BYTE_ARRAY')
})
})