Refactor api to support arbitrary parquet schemas (#3)

* Refactor api to support arbitrary parquet schemas

* More detail in README and error messages

* Export schema tools with optional overrides

* Test for basic timestamp type

* Fix time-type tests

* schemaFromColumnData should take an options object

* Update README with schemaFromColumnData options
Kenny Daniel 2025-05-09 14:47:22 -07:00 committed by GitHub
parent 43d8061d92
commit 791a70b983
11 changed files with 318 additions and 107 deletions

README.md (103 lines changed)

@ -24,23 +24,27 @@ import { parquetWriteBuffer } from 'hyparquet-writer'
const arrayBuffer = parquetWriteBuffer({
columnData: [
{ name: 'name', data: ['Alice', 'Bob', 'Charlie'], type: 'BYTE_ARRAY' },
{ name: 'name', data: ['Alice', 'Bob', 'Charlie'], type: 'STRING' },
{ name: 'age', data: [25, 30, 35], type: 'INT32' },
],
})
```
Note: if `type` is not provided, the type will be guessed from the data. The supported parquet types are:
Note: if `type` is not provided, the type will be guessed from the data. The supported types are a superset of the parquet types:
- `BOOLEAN`
- `INT32`
- `INT64`
- `FLOAT`
- `DOUBLE`
- `BYTE_ARRAY`
- `FIXED_LEN_BYTE_ARRAY`
| Type | Parquet schema element |
| --- | --- |
| `BOOLEAN` | `{ type: 'BOOLEAN' }` |
| `INT32` | `{ type: 'INT32' }` |
| `INT64` | `{ type: 'INT64' }` |
| `FLOAT` | `{ type: 'FLOAT' }` |
| `DOUBLE` | `{ type: 'DOUBLE' }` |
| `BYTE_ARRAY` | `{ type: 'BYTE_ARRAY' }` |
| `STRING` | `{ type: 'BYTE_ARRAY', converted_type: 'UTF8' }` |
| `JSON` | `{ type: 'BYTE_ARRAY', converted_type: 'JSON' }` |
| `TIMESTAMP` | `{ type: 'INT64', converted_type: 'TIMESTAMP_MILLIS' }` |
| `UUID` | `{ type: 'FIXED_LEN_BYTE_ARRAY', type_length: 16, logical_type: { type: 'UUID' } }` |
| `FLOAT16` | `{ type: 'FIXED_LEN_BYTE_ARRAY', type_length: 2, logical_type: { type: 'FLOAT16' } }` |
Strings are represented in parquet as type `BYTE_ARRAY`.
More types are supported but require defining the `schema` explicitly. See the [advanced usage](#advanced-usage) section for more details.
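For example, the `TIMESTAMP` hint from the table above expands to an `INT64` column with converted type `TIMESTAMP_MILLIS`. A minimal sketch:

```javascript
import { parquetWriteBuffer } from 'hyparquet-writer'

// 'TIMESTAMP' is a convenience type that maps to
// { type: 'INT64', converted_type: 'TIMESTAMP_MILLIS' }
const arrayBuffer = parquetWriteBuffer({
  columnData: [
    { name: 'timestamps', data: [new Date(1000000), new Date(2000000)], type: 'TIMESTAMP' },
  ],
})
```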
### Node.js Write to Local Parquet File
@ -52,7 +56,7 @@ const { parquetWriteFile } = await import('hyparquet-writer')
parquetWriteFile({
filename: 'example.parquet',
columnData: [
{ name: 'name', data: ['Alice', 'Bob', 'Charlie'], type: 'BYTE_ARRAY' },
{ name: 'name', data: ['Alice', 'Bob', 'Charlie'], type: 'STRING' },
{ name: 'age', data: [25, 30, 35], type: 'INT32' },
],
})
@ -65,6 +69,7 @@ Note: hyparquet-writer is published as an ES module, so dynamic `import()` may b
Options can be passed to `parquetWrite` to adjust parquet file writing behavior:
- `writer`: a generic writer object
- `schema`: parquet schema object (optional)
- `compressed`: use snappy compression (default true)
- `statistics`: write column statistics (default true)
- `rowGroupSize`: number of rows in each row group (default 100000)
@ -74,11 +79,19 @@ Options can be passed to `parquetWrite` to adjust parquet file writing behavior:
import { ByteWriter, parquetWrite } from 'hyparquet-writer'
const writer = new ByteWriter()
const arrayBuffer = parquetWrite({
parquetWrite({
writer,
columnData: [
{ name: 'name', data: ['Alice', 'Bob', 'Charlie'], type: 'BYTE_ARRAY' },
{ name: 'age', data: [25, 30, 35], type: 'INT32' },
{ name: 'name', data: ['Alice', 'Bob', 'Charlie'] },
{ name: 'age', data: [25, 30, 35] },
{ name: 'dob', data: [new Date(1000000), new Date(2000000), new Date(3000000)] },
],
// explicit schema:
schema: [
{ name: 'root', num_children: 3 },
{ name: 'name', type: 'BYTE_ARRAY', converted_type: 'UTF8' },
{ name: 'age', type: 'FIXED_LEN_BYTE_ARRAY', type_length: 4, converted_type: 'DECIMAL', scale: 2, precision: 4 },
{ name: 'dob', type: 'INT32', converted_type: 'DATE' },
],
compressed: false,
statistics: false,
@ -88,33 +101,55 @@ const arrayBuffer = parquetWrite({
{ key: 'key2', value: 'value2' },
],
})
const arrayBuffer = writer.getBuffer()
```
### Converted Types
### Types
You can provide additional type hints by providing a `converted_type` to the `columnData` elements:
Parquet requires an explicit schema to be defined. You can provide schema information in three ways:
```javascript
parquetWrite({
columnData: [
{
name: 'dates',
data: [new Date(1000000), new Date(2000000)],
type: 'INT64',
converted_type: 'TIMESTAMP_MILLIS',
},
{
name: 'json',
data: [{ foo: 'bar' }, { baz: 3 }, 'imastring'],
type: 'BYTE_ARRAY',
converted_type: 'JSON',
},
]
})
```
1. **Type**: You can provide a `type` in the `columnData` elements; it will be used as the schema type (see the sketch after this list).
2. **Schema**: You can provide a `schema` parameter that explicitly defines the parquet schema. The schema should be an array of `SchemaElement` objects (see [parquet-format](https://github.com/apache/parquet-format)), each containing the following properties:
- `name`: column name
- `type`: parquet type
 - `num_children`: number of children in a parquet nested schema (optional)
- `converted_type`: parquet converted type (optional)
- `logical_type`: parquet logical type (optional)
- `repetition_type`: parquet repetition type (optional)
 - `type_length`: length for the `FIXED_LEN_BYTE_ARRAY` type (optional)
- `scale`: the scale factor for `DECIMAL` converted types (optional)
- `precision`: the precision for `DECIMAL` converted types (optional)
- `field_id`: the field id for the column (optional)
3. **Auto-detect**: If you provide no type or schema, the type will be auto-detected from the data. However, it is recommended that you provide type information when possible: a column with zero rows throws an exception, whole-number floats may be detected as int, etc.
Most converted types will be auto-detected if you just provide data with no types. However, it is still recommended that you provide type information when possible. (zero rows would throw an exception, floats might be typed as int, etc)
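For instance, a short sketch, mirroring the new `write.schema.test.js` tests, showing that a basic type hint with `nullable: false` (option 1) and an explicit `schema` (option 2) produce the same column schema:

```javascript
import { parquetWriteBuffer } from 'hyparquet-writer'

// Option 1: basic type hint; nullable: false maps to repetition_type 'REQUIRED'
const a = parquetWriteBuffer({
  columnData: [{ name: 'numbers', data: [1, 2, 3], type: 'FLOAT', nullable: false }],
})

// Option 2: explicit parquet schema for the same data
const b = parquetWriteBuffer({
  columnData: [{ name: 'numbers', data: [1, 2, 3] }],
  schema: [
    { name: 'root', num_children: 1 },
    { name: 'numbers', type: 'FLOAT', repetition_type: 'REQUIRED' },
  ],
})
// both files contain a single REQUIRED FLOAT column named 'numbers'
```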
#### Schema Overrides
You can rely on automatic schema detection for most columns while overriding the schema for specific ones. This is useful when most column types can be determined automatically, but you want a specific schema element for one particular column.
```javascript
import { parquetWrite, schemaFromColumnData } from 'hyparquet-writer'
const columnData = [
{ name: 'unsigned_int', data: [1000000, 2000000] },
{ name: 'signed_int', data: [1000000, 2000000] },
]
parquetWrite({
columnData,
// override schema for uint column
schema: schemaFromColumnData({
columnData,
schemaOverrides: {
unsigned_int: {
name: 'unsigned_int',
type: 'INT32',
converted_type: 'UINT_32',
},
},
}),
})
```
## References
- https://github.com/hyparam/hyparquet

@ -38,9 +38,7 @@ console.log(`parsed ${filename} ${rows.length.toLocaleString()} rows in ${ms.toF
// transpose rows
const schema = parquetSchema(metadata)
const columnData = schema.children.map(({ element }) => ({
// name: element.name,
// type: element.type,
...element,
name: element.name,
data: [],
})) // .filter(({ name }) => name === 'l_comment')
for (const row of rows) {
@ -56,6 +54,7 @@ startTime = performance.now()
parquetWriteFile({
filename: outputFilename,
columnData,
schema: metadata.schema,
})
ms = performance.now() - startTime
stat = await fs.stat(outputFilename)

@ -1,3 +1,4 @@
export { parquetWrite, parquetWriteBuffer } from './write.js'
export { autoSchemaElement, schemaFromColumnData } from './schema.js'
export { ByteWriter } from './bytewriter.js'
export { ParquetWriter } from './parquet-writer.js'
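With `schemaFromColumnData` and `autoSchemaElement` now exported, the inferred schema can be inspected directly. A small sketch, assuming string data as in the auto-detect test below:

```javascript
import { autoSchemaElement, schemaFromColumnData } from 'hyparquet-writer'

// Infer a single schema element from raw values
const element = autoSchemaElement('strings', ['1', '2', '3'])
// expected: { name: 'strings', type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type: 'REQUIRED' }

// Or build a full schema (root element plus one element per column)
const schema = schemaFromColumnData({
  columnData: [{ name: 'strings', data: ['1', '2', '3'] }],
})
```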

@ -1,10 +1,13 @@
/**
* Convert column data to schema.
* Infer a schema from column data.
* Accepts optional schemaOverrides to override the type of columns by name.
*
* @param {ColumnData[]} columnData
* @param {object} options
* @param {ColumnData[]} options.columnData
* @param {Record<string,SchemaElement>} [options.schemaOverrides]
* @returns {SchemaElement[]}
*/
export function schemaFromColumnData(columnData) {
export function schemaFromColumnData({ columnData, schemaOverrides }) {
/** @type {SchemaElement[]} */
const schema = [{
name: 'root',
@ -12,20 +15,26 @@ export function schemaFromColumnData(columnData) {
}]
let num_rows = 0
for (const column of columnData) {
for (const { name, data, type, nullable } of columnData) {
// check if all columns have the same length
num_rows = num_rows || column.data.length
if (num_rows !== column.data.length) {
num_rows = num_rows || data.length
if (num_rows !== data.length) {
throw new Error('columns must have the same length')
}
const { data, ...schemaElement } = column
if (column.type) {
if (schemaOverrides?.[name]) {
// use schema override
const override = schemaOverrides[name]
if (override.name !== name) throw new Error('schema override name does not match column name')
if (override.num_children) throw new Error('schema override cannot have children')
if (override.repetition_type === 'REPEATED') throw new Error('schema override cannot be repeated')
schema.push(override)
} else if (type) {
// use provided type
schema.push(schemaElement)
schema.push(basicTypeToSchemaElement(name, type, nullable))
} else {
// auto-detect type
schema.push(autoSchemaElement(column.name, data))
schema.push(autoSchemaElement(name, data))
}
}
@ -33,15 +42,41 @@ export function schemaFromColumnData(columnData) {
}
/**
* Deduce a ParquetType from JS values
*
* @import {ConvertedType, DecodedArray, FieldRepetitionType, ParquetType, SchemaElement} from 'hyparquet'
* @import {ColumnData} from '../src/types.js'
* @import {BasicType, ColumnData} from '../src/types.js'
* @param {string} name
* @param {BasicType} type
* @param {boolean} [nullable]
* @returns {SchemaElement}
*/
function basicTypeToSchemaElement(name, type, nullable) {
const repetition_type = nullable === false ? 'REQUIRED' : 'OPTIONAL'
if (type === 'STRING') {
return { name, type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type }
}
if (type === 'JSON') {
return { name, type: 'BYTE_ARRAY', converted_type: 'JSON', repetition_type }
}
if (type === 'TIMESTAMP') {
return { name, type: 'INT64', converted_type: 'TIMESTAMP_MILLIS', repetition_type }
}
if (type === 'UUID') {
return { name, type: 'FIXED_LEN_BYTE_ARRAY', type_length: 16, logical_type: { type: 'UUID' }, repetition_type }
}
if (type === 'FLOAT16') {
return { name, type: 'FIXED_LEN_BYTE_ARRAY', type_length: 2, logical_type: { type: 'FLOAT16' }, repetition_type }
}
return { name, type, repetition_type }
}
/**
* Automatically determine a SchemaElement from an array of values.
*
* @param {string} name
* @param {DecodedArray} values
* @returns {SchemaElement}
*/
function autoSchemaElement(name, values) {
export function autoSchemaElement(name, values) {
/** @type {ParquetType | undefined} */
let type
/** @type {FieldRepetitionType} */

src/types.d.ts (vendored, 28 lines changed)

@ -1,8 +1,23 @@
import type { ConvertedType, DecodedArray, FieldRepetitionType, KeyValue, LogicalType, ParquetType } from 'hyparquet'
import type { DecodedArray, KeyValue, SchemaElement } from 'hyparquet'
// Superset of parquet types with automatic conversions
export type BasicType =
'BOOLEAN' |
'INT32' |
'INT64' |
'FLOAT' |
'DOUBLE' |
'BYTE_ARRAY' |
'STRING' |
'JSON' |
'TIMESTAMP' |
'UUID' |
'FLOAT16'
export interface ParquetWriteOptions {
writer: Writer
columnData: ColumnData[]
schema?: SchemaElement[]
compressed?: boolean
statistics?: boolean
rowGroupSize?: number
@ -12,15 +27,8 @@ export interface ParquetWriteOptions {
export interface ColumnData {
name: string
data: DecodedArray
// fields from SchemaElement:
type?: ParquetType
type_length?: number
repetition_type?: FieldRepetitionType
converted_type?: ConvertedType
scale?: number
precision?: number
field_id?: number
logical_type?: LogicalType
type?: BasicType
nullable?: boolean
}
export interface Writer {

@ -197,6 +197,7 @@ export function unconvertDecimal({ type, type_length }, value) {
*/
export function unconvertFloat16(value) {
if (value === undefined || value === null) return
if (typeof value !== 'number') throw new Error('parquet float16 expected number value')
if (Number.isNaN(value)) return new Uint8Array([0x00, 0x7e])
const sign = value < 0 || Object.is(value, -0) ? 1 : 0

@ -11,12 +11,19 @@ import { schemaFromColumnData } from './schema.js'
export function parquetWrite({
writer,
columnData,
schema,
compressed = true,
statistics = true,
rowGroupSize = 100000,
kvMetadata,
}) {
const schema = schemaFromColumnData(columnData)
if (!schema) {
schema = schemaFromColumnData({ columnData })
} else if (columnData.some(({ type }) => type)) {
throw new Error('cannot provide both schema and columnData type')
} else {
// TODO: validate schema
}
const pq = new ParquetWriter({
writer,
schema,

@ -3,7 +3,7 @@ export const exampleData = [
{ name: 'bool', data: [true, false, true, false] },
{ name: 'int', data: [0, 127, 0x7fff, 0x7fffffff] },
{ name: 'bigint', data: [0n, 127n, 0x7fffn, 0x7fffffffffffffffn] },
{ name: 'float', data: [0, 0.0001, 123.456, 1e100], type: 'FLOAT', repetition_type: 'REQUIRED' },
{ name: 'float', data: [0, 0.0001, 123.456, 1e100], type: 'FLOAT', nullable: false },
{ name: 'double', data: [0, 0.0001, 123.456, 1e100] },
{ name: 'string', data: ['a', 'b', 'c', 'd'] },
{ name: 'nullable', data: [true, false, null, null] },

@ -8,10 +8,11 @@ import { exampleData, exampleMetadata } from './example.js'
*
* @import {ColumnData} from '../src/types.js'
* @param {ColumnData[]} columnData
* @param {import('hyparquet').SchemaElement[]} [schema]
* @returns {Promise<Record<string, any>>}
*/
async function roundTripDeserialize(columnData) {
const file = parquetWriteBuffer({ columnData })
async function roundTripDeserialize(columnData, schema) {
const file = parquetWriteBuffer({ columnData, schema })
return await parquetReadObjects({ file, utf8: false })
}
@ -32,10 +33,10 @@ describe('parquetWriteBuffer', () => {
])
})
it('serializes a string without converted_type', () => {
it('serializes a string as a BYTE_ARRAY', () => {
const data = ['string1', 'string2', 'string3']
const file = parquetWriteBuffer({ columnData: [{ name: 'string', data, type: 'BYTE_ARRAY' }] })
expect(file.byteLength).toBe(162)
expect(file.byteLength).toBe(164)
})
it('serializes booleans as RLE', async () => {
@ -141,23 +142,28 @@ describe('parquetWriteBuffer', () => {
})
it('serializes time types', async () => {
const result = await roundTripDeserialize([
{
name: 'time32',
data: [100000, 200000, 300000],
logical_type: { type: 'TIME', isAdjustedToUTC: false, unit: 'MILLIS' },
},
{
name: 'time64',
data: [100000000n, 200000000n, 300000000n],
logical_type: { type: 'TIME', isAdjustedToUTC: false, unit: 'MICROS' },
},
{
name: 'interval',
data: [1000000000n, 2000000000n, 3000000000n],
logical_type: { type: 'INTERVAL' },
},
])
const result = await roundTripDeserialize(
[
{
name: 'time32',
data: [100000, 200000, 300000],
},
{
name: 'time64',
data: [100000000n, 200000000n, 300000000n],
},
{
name: 'interval',
data: [1000000000n, 2000000000n, 3000000000n],
},
],
[
{ name: 'root', num_children: 3 },
{ name: 'time32', repetition_type: 'OPTIONAL', type: 'INT32', logical_type: { type: 'TIME', isAdjustedToUTC: false, unit: 'MILLIS' } },
{ name: 'time64', repetition_type: 'OPTIONAL', type: 'INT64', logical_type: { type: 'TIME', isAdjustedToUTC: false, unit: 'MICROS' } },
{ name: 'interval', repetition_type: 'OPTIONAL', type: 'INT64', logical_type: { type: 'INTERVAL' } },
]
)
expect(result).toEqual([
{ time32: 100000, time64: 100000000n, interval: 1000000000n },
{ time32: 200000, time64: 200000000n, interval: 2000000000n },
@ -186,9 +192,7 @@ describe('parquetWriteBuffer', () => {
new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]),
new Uint8Array([17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]),
],
type: 'FIXED_LEN_BYTE_ARRAY',
type_length: 16,
logical_type: { type: 'UUID' },
type: 'UUID',
},
{
name: 'string',
@ -196,9 +200,7 @@ describe('parquetWriteBuffer', () => {
'00000000-0000-0000-0000-000000000001',
'00010002-0003-0004-0005-000600070008',
],
type: 'FIXED_LEN_BYTE_ARRAY',
type_length: 16,
logical_type: { type: 'UUID' },
type: 'UUID',
},
])
expect(result).toEqual([
@ -277,20 +279,10 @@ describe('parquetWriteBuffer', () => {
.toThrow('parquet expected number value')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'BYTE_ARRAY' }] }))
.toThrow('parquet expected Uint8Array value')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'float16', data: [1, 2, 3], type: 'FIXED_LEN_BYTE_ARRAY' }] }))
.toThrow('parquet FIXED_LEN_BYTE_ARRAY expected type_length')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'float16', data: [1, 2, 3], type: 'FIXED_LEN_BYTE_ARRAY', type_length: 4 }] }))
.toThrow('parquet expected Uint8Array value')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'float16', data: [1, 2, 3], type: 'FIXED_LEN_BYTE_ARRAY', type_length: 4, logical_type: { type: 'FLOAT16' } }] }))
.toThrow('FLOAT16 expected type_length to be 2 bytes')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: [new Uint8Array(4)], type: 'FIXED_LEN_BYTE_ARRAY', logical_type: { type: 'UUID' } }] }))
.toThrow('UUID expected type_length to be 16 bytes')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: [new Uint8Array(4)], type: 'FIXED_LEN_BYTE_ARRAY', type_length: 16, logical_type: { type: 'UUID' } }] }))
expect(() => parquetWriteBuffer({ columnData: [{ name: 'float16', data: [1n, 2n, 3n], type: 'FLOAT16' }] }))
.toThrow('parquet float16 expected number value')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: [new Uint8Array(4)], type: 'UUID' }] }))
.toThrow('parquet expected Uint8Array of length 16')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: [new Uint8Array(16)], type: 'FIXED_LEN_BYTE_ARRAY', type_length: 4, logical_type: { type: 'UUID' } }] }))
.toThrow('UUID expected type_length to be 16 bytes')
expect(() => parquetWriteBuffer({ columnData: [{ name: 'uuid', data: ['0000'], type: 'FIXED_LEN_BYTE_ARRAY', logical_type: { type: 'UUID' } }] }))
.toThrow('UUID expected type_length to be 16 bytes')
})
it('throws for empty column with no type specified', () => {

@ -13,9 +13,9 @@ describe('parquetWrite round-trip', () => {
const rows = await parquetReadObjects({ file })
// transpose the row data
const schema = parquetSchema(metadata)
const columnData = schema.children.map(({ element }) => ({
...element,
const schemaTree = parquetSchema(metadata)
const columnData = schemaTree.children.map(({ element }) => ({
name: element.name,
data: /** @type {any[]} */ ([]),
}))
for (const row of rows) {
@ -24,7 +24,7 @@ describe('parquetWrite round-trip', () => {
}
}
const buffer = parquetWriteBuffer({ columnData })
const buffer = parquetWriteBuffer({ columnData, schema: metadata.schema })
const output = await parquetReadObjects({ file: buffer })
expect(output.length).toBe(rows.length)

test/write.schema.test.js (new file, 133 lines)

@ -0,0 +1,133 @@
import { parquetMetadata } from 'hyparquet'
import { describe, expect, it } from 'vitest'
import { parquetWriteBuffer, schemaFromColumnData } from '../src/index.js'
describe('parquet schema', () => {
it('auto detects types', () => {
const file = parquetWriteBuffer({ columnData: [
{ name: 'strings', data: ['1', '2', '3'] },
] })
const metadata = parquetMetadata(file)
expect(metadata.schema).toEqual([
{
name: 'root',
num_children: 1,
},
{
converted_type: 'UTF8',
name: 'strings',
repetition_type: 'REQUIRED',
type: 'BYTE_ARRAY',
},
])
})
it('accepts basic type hints', () => {
const file = parquetWriteBuffer({ columnData: [
{
name: 'timestamps',
data: [new Date(1000000), new Date(2000000), new Date(3000000)],
type: 'TIMESTAMP',
},
] })
const metadata = parquetMetadata(file)
expect(metadata.schema).toEqual([
{
name: 'root',
num_children: 1,
},
{
converted_type: 'TIMESTAMP_MILLIS',
name: 'timestamps',
repetition_type: 'OPTIONAL',
type: 'INT64',
},
])
})
it('accepts nullable basic type hints', () => {
const file = parquetWriteBuffer({ columnData: [
{ name: 'numbers', data: [1, 2, 3], type: 'FLOAT', nullable: false },
] })
const metadata = parquetMetadata(file)
expect(metadata.schema).toEqual([
{
name: 'root',
num_children: 1,
},
{
name: 'numbers',
repetition_type: 'REQUIRED',
type: 'FLOAT',
},
])
})
it('accepts explicit schema', () => {
const file = parquetWriteBuffer({ columnData: [
{ name: 'numbers', data: [1, 2, 3] },
], schema: [
{ name: 'root', num_children: 1 },
{ name: 'numbers', type: 'FLOAT', repetition_type: 'REQUIRED' },
] })
const metadata = parquetMetadata(file)
expect(metadata.schema).toEqual([
{
name: 'root',
num_children: 1,
},
{
name: 'numbers',
repetition_type: 'REQUIRED',
type: 'FLOAT',
},
])
})
it('accepts schema override', () => {
const columnData = [
{ name: 'numbers', data: [1, 2, 3] },
]
const file = parquetWriteBuffer({
columnData,
schema: schemaFromColumnData({
columnData,
schemaOverrides: {
numbers: {
name: 'numbers',
type: 'DOUBLE',
repetition_type: 'OPTIONAL',
field_id: 1,
},
},
}),
})
const metadata = parquetMetadata(file)
expect(metadata.schema).toEqual([
{
name: 'root',
num_children: 1,
},
{
field_id: 1,
name: 'numbers',
repetition_type: 'OPTIONAL',
type: 'DOUBLE',
},
])
})
it('throws if basic types conflict with schema', () => {
expect(() => {
parquetWriteBuffer({
columnData: [
{ name: 'numbers', data: [1, 2, 3], type: 'FLOAT' },
],
schema: [
{ name: 'root', num_children: 1 },
{ name: 'numbers', type: 'DOUBLE', repetition_type: 'OPTIONAL' },
],
})
}).toThrow('cannot provide both schema and columnData type')
})
})