Mirror of https://github.com/asadbek064/hyparquet-writer.git (synced 2025-12-06 07:31:55 +00:00)
Support nullable columns
commit 44d0d0c77a (parent 05ee5e274f)
README.md (18 lines changed)

@@ -1,3 +1,21 @@
 # Hyparquet Writer

+[](https://opensource.org/licenses/MIT)
+
+## Usage
+
+```javascript
+import { writeParquet } from 'hyparquet-writer'
+
+const arrayBuffer = writeParquet({
+  name: ['Alice', 'Bob', 'Charlie'],
+  age: [25, 30, 35],
+})
+```
+
+## References
+
+- https://github.com/hyparam/hyparquet
+- https://github.com/apache/parquet-format
+- https://github.com/apache/parquet-testing
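As an aside (not part of the diff): a minimal sketch of what this commit enables, assuming the `writeParquet` entry point shown in the README above (the internal function in `src/write.js` below is named `parquetWrite`). A column whose values include `null` or `undefined` is now deduced as OPTIONAL and its nulls are recorded via definition levels.

```javascript
import { writeParquet } from 'hyparquet-writer'

// 'nickname' contains nulls, so it is deduced as BYTE_ARRAY with repetition_type OPTIONAL;
// 'name' and 'age' stay REQUIRED (BYTE_ARRAY and INT32).
const arrayBuffer = writeParquet({
  name: ['Alice', 'Bob', 'Charlie'],
  nickname: ['Ally', null, null],
  age: [25, 30, 35],
})
```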
src/column.js (121 lines changed)

@@ -5,17 +5,16 @@ import { serializeTCompactProtocol } from './thrift.js'
 import { Writer } from './writer.js'

 /**
- * @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType} from 'hyparquet/src/types.js'
+ * @import {ColumnMetaData, DecodedArray, FieldRepetitionType, PageHeader, ParquetType, SchemaElement} from 'hyparquet/src/types.js'
  * @param {Writer} writer
- * @param {string} columnName
+ * @param {SchemaElement[]} schemaPath schema path for the column
  * @param {DecodedArray} values
  * @param {ParquetType} type
  * @returns {ColumnMetaData}
  */
-export function writeColumn(writer, columnName, values, type) {
-  // Get data stats
-  const num_nulls = values.filter(v => v === null).length
+export function writeColumn(writer, schemaPath, values, type) {
   const offsetStart = writer.offset
+  let num_nulls = 0

   // Write page to temp buffer
   const page = new Writer()
@@ -24,28 +23,39 @@ export function writeColumn(writer, columnName, values, type) {
   const encoding = 'PLAIN'

   // TODO: repetition levels
-  const maxRepetitionLevel = 0
+  const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
   let repetition_levels_byte_length = 0
   if (maxRepetitionLevel) {
     repetition_levels_byte_length = writeRleBitPackedHybrid(page, [])
   }

   // TODO: definition levels
-  const maxDefinitionLevel = 0
+  const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
   let definition_levels_byte_length = 0
   if (maxDefinitionLevel) {
-    definition_levels_byte_length = writeRleBitPackedHybrid(page, [])
+    const definitionLevels = []
+    for (const value of values) {
+      if (value === null || value === undefined) {
+        definitionLevels.push(maxDefinitionLevel - 1)
+        num_nulls++
+      } else {
+        definitionLevels.push(maxDefinitionLevel)
+      }
+    }
+    definition_levels_byte_length = writeRleBitPackedHybrid(page, definitionLevels)
   }

-  // write page data (TODO: compressed)
-  const { uncompressed_page_size, compressed_page_size } = writePageData(page, values, type)
+  // write page data
+  writePageData(page, values, type)

+  // TODO: compress page data
+
   // write page header
   /** @type {PageHeader} */
   const header = {
     type: 'DATA_PAGE_V2',
-    uncompressed_page_size,
-    compressed_page_size,
+    uncompressed_page_size: page.offset,
+    compressed_page_size: page.offset,
     data_page_header_v2: {
       num_values: values.length,
       num_nulls,
@@ -64,7 +74,7 @@ export function writeColumn(writer, columnName, values, type) {
   return {
     type,
     encodings: ['PLAIN'],
-    path_in_schema: [columnName],
+    path_in_schema: schemaPath.slice(1).map(s => s.name),
     codec: 'UNCOMPRESSED',
     num_values: BigInt(values.length),
     total_compressed_size: BigInt(writer.offset - offsetStart),
@@ -74,18 +84,50 @@ export function writeColumn(writer, columnName, values, type) {
 }

 /**
- * Deduce a ParquetType from the JS value
+ * Deduce a ParquetType from JS values
  *
  * @param {DecodedArray} values
- * @returns {ParquetType}
+ * @returns {{ type: ParquetType, repetition_type: 'REQUIRED' | 'OPTIONAL' }}
  */
 export function getParquetTypeForValues(values) {
-  if (values.every(v => typeof v === 'boolean')) return 'BOOLEAN'
-  if (values.every(v => typeof v === 'bigint')) return 'INT64'
-  if (values.every(v => Number.isInteger(v))) return 'INT32'
-  if (values.every(v => typeof v === 'number')) return 'DOUBLE'
-  if (values.every(v => typeof v === 'string')) return 'BYTE_ARRAY'
-  throw new Error(`Cannot determine parquet type for: ${values}`)
+  if (values instanceof Int32Array) return { type: 'INT32', repetition_type: 'REQUIRED' }
+  if (values instanceof BigInt64Array) return { type: 'INT64', repetition_type: 'REQUIRED' }
+  if (values instanceof Float32Array) return { type: 'FLOAT', repetition_type: 'REQUIRED' }
+  if (values instanceof Float64Array) return { type: 'DOUBLE', repetition_type: 'REQUIRED' }
+  /** @type {ParquetType | undefined} */
+  let type = undefined
+  /** @type {FieldRepetitionType} */
+  let repetition_type = 'REQUIRED'
+  for (const value of values) {
+    const valueType = getParquetTypeForValue(value)
+    if (!valueType) {
+      repetition_type = 'OPTIONAL'
+    } else if (type === undefined) {
+      type = valueType
+    } else if (type === 'INT32' && valueType === 'DOUBLE') {
+      type = 'DOUBLE'
+    } else if (type === 'DOUBLE' && valueType === 'INT32') {
+      // keep
+    } else if (type !== valueType) {
+      throw new Error(`parquet cannot write mixed types: ${type} and ${valueType}`)
+    }
+  }
+  if (!type) throw new Error('parquetWrite: empty column cannot determine type')
+  return { type, repetition_type }
 }

+/**
+ * @param {any} value
+ * @returns {ParquetType | undefined}
+ */
+function getParquetTypeForValue(value) {
+  if (value === null || value === undefined) return undefined
+  if (value === true || value === false) return 'BOOLEAN'
+  if (typeof value === 'bigint') return 'INT64'
+  if (Number.isInteger(value)) return 'INT32'
+  if (typeof value === 'number') return 'DOUBLE'
+  if (typeof value === 'string') return 'BYTE_ARRAY'
+  throw new Error(`Cannot determine parquet type for: ${value}`)
+}
+
 /**
@@ -114,13 +156,40 @@ function writePageHeader(writer, header) {
  * @param {Writer} writer
  * @param {DecodedArray} values
  * @param {ParquetType} type
- * @returns {{ uncompressed_page_size: number, compressed_page_size: number }}
  */
 function writePageData(writer, values, type) {
   // write plain data
-  const startOffset = writer.offset
   writePlain(writer, values, type)
-  const size = writer.offset - startOffset
-
-  return { uncompressed_page_size: size, compressed_page_size: size }
 }
+
+/**
+ * Get the max repetition level for a given schema path.
+ *
+ * @param {SchemaElement[]} schemaPath
+ * @returns {number} max repetition level
+ */
+function getMaxRepetitionLevel(schemaPath) {
+  let maxLevel = 0
+  for (const element of schemaPath) {
+    if (element.repetition_type === 'REPEATED') {
+      maxLevel++
+    }
+  }
+  return maxLevel
+}
+
+/**
+ * Get the max definition level for a given schema path.
+ *
+ * @param {SchemaElement[]} schemaPath
+ * @returns {number} max definition level
+ */
+function getMaxDefinitionLevel(schemaPath) {
+  let maxLevel = 0
+  for (const element of schemaPath.slice(1)) {
+    if (element.repetition_type !== 'REQUIRED') {
+      maxLevel++
+    }
+  }
+  return maxLevel
+}
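A worked illustration (not from the diff) of the definition-level logic added to writeColumn above: for a flat column whose schema path is the root plus one OPTIONAL element, getMaxDefinitionLevel returns 1, so present values encode as level 1 and nulls as level 0. The sketch below replays that loop standalone on the nullable column used in the tests.

```javascript
// Standalone sketch of the definition-level computation added above, assuming the flat
// schema path [{ name: 'root' }, { name: 'nullable', repetition_type: 'OPTIONAL' }],
// for which getMaxDefinitionLevel returns 1.
const maxDefinitionLevel = 1
const values = [true, false, null, null]
const definitionLevels = []
let num_nulls = 0
for (const value of values) {
  if (value === null || value === undefined) {
    definitionLevels.push(maxDefinitionLevel - 1) // 0 = value missing at the optional level
    num_nulls++
  } else {
    definitionLevels.push(maxDefinitionLevel) // 1 = value present
  }
}
console.log(definitionLevels, num_nulls) // [ 1, 1, 0, 0 ] 2
```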
src/write.js (14 lines changed)

@@ -5,7 +5,7 @@ import { writeMetadata } from './metadata.js'
 /**
  * Write data as parquet to an ArrayBuffer
  *
- * @import {ColumnChunk, DecodedArray, FileMetaData, SchemaElement} from 'hyparquet'
+ * @import {ColumnChunk, DecodedArray, FileMetaData, SchemaElement, SchemaTree} from 'hyparquet'
  * @param {Record<string, DecodedArray>} columnData
  * @returns {ArrayBuffer}
  */
@@ -29,7 +29,6 @@ export function parquetWrite(columnData) {
   const schema = [{
     name: 'root',
     num_children: columnNames.length,
-    repetition_type: 'REQUIRED',
   }]

   // row group columns
@@ -39,10 +38,15 @@ export function parquetWrite(columnData) {
   // Write columns
   for (const name of columnNames) {
     const values = columnData[name]
-    const type = getParquetTypeForValues(values)
+    const { type, repetition_type } = getParquetTypeForValues(values)
+    if (!type) throw new Error(`parquetWrite: empty column ${name} cannot determine type`)
     const file_offset = BigInt(writer.offset)
-    const meta_data = writeColumn(writer, name, values, type)
-    const repetition_type = 'REQUIRED'
+    /** @type {SchemaElement[]} */
+    const schemaElements = [
+      schema[0],
+      { type, name, repetition_type, num_children: 0 },
+    ]
+    const meta_data = writeColumn(writer, schemaElements, values, type)

     // save metadata
     schema.push({ type, name, repetition_type })
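To show how the pieces above fit together, here are illustrative calls only, assuming the functions behave exactly as written in the diff: parquetWrite now lets getParquetTypeForValues deduce both the physical type and the repetition type, then builds a two-element schema path for writeColumn.

```javascript
// Import path is an assumption; adjust to wherever column.js is exposed by the package.
import { getParquetTypeForValues } from 'hyparquet-writer/src/column.js'

// A null anywhere makes the column OPTIONAL; mixed INT32/DOUBLE widens to DOUBLE.
getParquetTypeForValues([25, 30, 35])        // { type: 'INT32', repetition_type: 'REQUIRED' }
getParquetTypeForValues([true, false, null]) // { type: 'BOOLEAN', repetition_type: 'OPTIONAL' }
getParquetTypeForValues([1, 2.5, 3])         // { type: 'DOUBLE', repetition_type: 'REQUIRED' }

// writeColumn then receives the root element plus the column's own element, which is
// all getMaxRepetitionLevel / getMaxDefinitionLevel need for a flat (non-nested) column.
const schemaElements = [
  { name: 'root', num_children: 1 },
  { type: 'BOOLEAN', name: 'nullable', repetition_type: 'OPTIONAL', num_children: 0 },
]
```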
@@ -11,12 +11,13 @@ export const exampleMetadata = {
   version: 2,
   created_by: 'hyparquet',
   schema: [
-    { name: 'root', num_children: 5, repetition_type: 'REQUIRED' },
+    { name: 'root', num_children: 6 },
     { name: 'bool', type: 'BOOLEAN', repetition_type: 'REQUIRED' },
     { name: 'int', type: 'INT32', repetition_type: 'REQUIRED' },
     { name: 'bigint', type: 'INT64', repetition_type: 'REQUIRED' },
     { name: 'double', type: 'DOUBLE', repetition_type: 'REQUIRED' },
     { name: 'string', type: 'BYTE_ARRAY', repetition_type: 'REQUIRED' },
+    { name: 'nullable', type: 'BOOLEAN', repetition_type: 'OPTIONAL' },
   ],
   num_rows: 4n,
   row_groups: [{
@@ -91,11 +92,25 @@ export const exampleMetadata = {
          data_page_offset: 173n,
        },
      },
+      {
+        file_path: 'nullable',
+        file_offset: 215n,
+        meta_data: {
+          type: 'BOOLEAN',
+          encodings: ['PLAIN'],
+          path_in_schema: ['nullable'],
+          codec: 'UNCOMPRESSED',
+          num_values: 4n,
+          total_uncompressed_size: 25n,
+          total_compressed_size: 25n,
+          data_page_offset: 215n,
+        },
+      },
     ],
-    total_byte_size: 211n,
+    total_byte_size: 236n,
     num_rows: 4n,
   }],
-  metadata_length: 280,
+  metadata_length: 336,
 }

 describe('writeMetadata', () => {
@@ -20,6 +20,7 @@ const data = {
   bigint: [0n, 127n, 0x7fffn, 0x7fffffffffffffffn], // INT64
   double: [0, 0.0001, 123.456, 1e100], // DOUBLE
   string: ['a', 'b', 'c', 'd'], // BYTE_ARRAY
+  nullable: [true, false, null, null], // BOOLEAN nullable
 }

 describe('parquetWrite', () => {
@@ -32,10 +33,10 @@ describe('parquetWrite', () => {
   it('serializes basic types correctly', async () => {
     const result = await roundTripDeserialize(data)
     expect(result).toEqual([
-      { bool: true, int: 0, bigint: 0n, double: 0, string: 'a' },
-      { bool: false, int: 127, bigint: 127n, double: 0.0001, string: 'b' },
-      { bool: true, int: 0x7fff, bigint: 0x7fffn, double: 123.456, string: 'c' },
-      { bool: false, int: 0x7fffffff, bigint: 0x7fffffffffffffffn, double: 1e100, string: 'd' },
+      { bool: true, int: 0, bigint: 0n, double: 0, string: 'a', nullable: true },
+      { bool: false, int: 127, bigint: 127n, double: 0.0001, string: 'b', nullable: false },
+      { bool: true, int: 0x7fff, bigint: 0x7fffn, double: 123.456, string: 'c', nullable: null },
+      { bool: false, int: 0x7fffffff, bigint: 0x7fffffffffffffffn, double: 1e100, string: 'd', nullable: null },
     ])
   })
 })