mirror of
https://github.com/asadbek064/hyparquet-writer.git
synced 2026-02-23 21:01:33 +00:00
Support more SchemaElement options
This commit is contained in:
parent
702b8fe3ec
commit
fff0b1c6d9
6
.github/workflows/ci.yml
vendored
6
.github/workflows/ci.yml
vendored
@ -7,20 +7,20 @@ jobs:
|
||||
lint:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- run: npm i
|
||||
- run: npm run lint
|
||||
|
||||
typecheck:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- run: npm i
|
||||
- run: npx tsc
|
||||
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- run: npm i
|
||||
- run: npm run coverage
|
||||
|
||||
@ -5,8 +5,11 @@
|
||||
"author": "Hyperparam",
|
||||
"homepage": "https://hyperparam.app",
|
||||
"keywords": [
|
||||
"ai",
|
||||
"data",
|
||||
"hyperparam",
|
||||
"hyparquet",
|
||||
"ml",
|
||||
"parquet",
|
||||
"snappy",
|
||||
"thrift"
|
||||
@ -42,7 +45,7 @@
|
||||
"test": "vitest run"
|
||||
},
|
||||
"dependencies": {
|
||||
"hyparquet": "1.11.0"
|
||||
"hyparquet": "1.11.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@babel/eslint-parser": "7.27.0",
|
||||
|
||||
@ -29,25 +29,22 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) {
|
||||
|
||||
// Compute statistics
|
||||
if (stats) {
|
||||
statistics = {
|
||||
min_value: undefined,
|
||||
max_value: undefined,
|
||||
null_count: 0n,
|
||||
}
|
||||
let min_value = undefined
|
||||
let max_value = undefined
|
||||
let null_count = 0n
|
||||
for (const value of values) {
|
||||
if (value === null || value === undefined) {
|
||||
null_count++
|
||||
continue
|
||||
}
|
||||
if (statistics.min_value === undefined || value < statistics.min_value) {
|
||||
statistics.min_value = value
|
||||
if (min_value === undefined || value < min_value) {
|
||||
min_value = value
|
||||
}
|
||||
if (statistics.max_value === undefined || value > statistics.max_value) {
|
||||
statistics.max_value = value
|
||||
if (max_value === undefined || value > max_value) {
|
||||
max_value = value
|
||||
}
|
||||
}
|
||||
statistics.null_count = null_count
|
||||
statistics = { min_value, max_value, null_count }
|
||||
}
|
||||
|
||||
// Write levels to temp buffer
|
||||
|
||||
@ -4,8 +4,7 @@ import { writeMetadata } from './metadata.js'
|
||||
/**
|
||||
* ParquetWriter class allows incremental writing of parquet files.
|
||||
*
|
||||
* @import {ColumnChunk, FileMetaData, RowGroup, SchemaElement} from 'hyparquet'
|
||||
* @import {KeyValue} from 'hyparquet/src/types.js' // TODO export from hyparquet
|
||||
* @import {ColumnChunk, FileMetaData, KeyValue, RowGroup, SchemaElement} from 'hyparquet'
|
||||
* @import {ColumnData, Writer} from '../src/types.js'
|
||||
* @param {object} options
|
||||
* @param {Writer} options.writer
|
||||
|
||||
13
src/plain.js
13
src/plain.js
@ -19,6 +19,8 @@ export function writePlain(writer, values, type) {
|
||||
writePlainDouble(writer, values)
|
||||
} else if (type === 'BYTE_ARRAY') {
|
||||
writePlainByteArray(writer, values)
|
||||
} else if (type === 'FIXED_LEN_BYTE_ARRAY') {
|
||||
writePlainByteArray(writer, values)
|
||||
} else {
|
||||
throw new Error(`parquet unsupported type: ${type}`)
|
||||
}
|
||||
@ -32,20 +34,21 @@ function writePlainBoolean(writer, values) {
|
||||
let currentByte = 0
|
||||
|
||||
for (let i = 0; i < values.length; i++) {
|
||||
if (typeof values[i] !== 'boolean') throw new Error('parquet expected boolean value')
|
||||
const bitOffset = i % 8
|
||||
|
||||
if (values[i]) {
|
||||
currentByte |= 1 << bitOffset
|
||||
}
|
||||
|
||||
// Once we've packed 8 bits or are at a multiple of 8, we write out the byte
|
||||
// once we've packed 8 bits or are at a multiple of 8, we write out the byte
|
||||
if (bitOffset === 7) {
|
||||
writer.appendUint8(currentByte)
|
||||
currentByte = 0
|
||||
}
|
||||
}
|
||||
|
||||
// If the array length is not a multiple of 8, write the leftover bits
|
||||
// if the array length is not a multiple of 8, write the leftover bits
|
||||
if (values.length % 8 !== 0) {
|
||||
writer.appendUint8(currentByte)
|
||||
}
|
||||
@ -57,6 +60,7 @@ function writePlainBoolean(writer, values) {
|
||||
*/
|
||||
function writePlainInt32(writer, values) {
  // Write each value as a 32-bit integer, rejecting anything that is not one.
  for (const value of values) {
    if (typeof value !== 'number') throw new Error('parquet expected number value')
    // typeof alone lets NaN, Infinity and fractional numbers through.
    // NOTE(review): appendInt32 is presumably backed by DataView.setInt32, which
    // would silently coerce such values rather than fail; confirm against the writer.
    if (!Number.isInteger(value)) throw new Error('parquet expected integer value')
    writer.appendInt32(value)
  }
}
|
||||
@ -67,6 +71,7 @@ function writePlainInt32(writer, values) {
|
||||
*/
|
||||
function writePlainInt64(writer, values) {
  // Every entry must be a bigint; the first offending entry aborts the write.
  for (const entry of values) {
    if (typeof entry === 'bigint') {
      writer.appendInt64(entry)
    } else {
      throw new Error('parquet expected bigint value')
    }
  }
}
|
||||
@ -77,6 +82,7 @@ function writePlainInt64(writer, values) {
|
||||
*/
|
||||
function writePlainFloat(writer, values) {
  // Every entry must be a number; the first offending entry aborts the write.
  for (const entry of values) {
    if (typeof entry === 'number') {
      writer.appendFloat32(entry)
    } else {
      throw new Error('parquet expected number value')
    }
  }
}
|
||||
@ -87,6 +93,7 @@ function writePlainFloat(writer, values) {
|
||||
*/
|
||||
function writePlainDouble(writer, values) {
  // Every entry must be a number; the first offending entry aborts the write.
  for (const entry of values) {
    if (typeof entry === 'number') {
      writer.appendFloat64(entry)
    } else {
      throw new Error('parquet expected number value')
    }
  }
}
|
||||
@ -97,7 +104,7 @@ function writePlainDouble(writer, values) {
|
||||
*/
|
||||
function writePlainByteArray(writer, values) {
|
||||
for (const value of values) {
|
||||
if (!(value instanceof Uint8Array)) throw new Error('BYTE_ARRAY must be Uint8Array')
|
||||
if (!(value instanceof Uint8Array)) throw new Error('parquet expected Uint8Array value')
|
||||
writer.appendUint32(value.length)
|
||||
writer.appendBytes(value)
|
||||
}
|
||||
|
||||
@ -1,24 +1,56 @@
|
||||
/**
 * Convert column data to schema.
 *
 * The result is a flat list: a synthetic `root` element followed by one
 * element per column. A column that specifies `type` is used as its own
 * schema element; otherwise the element is auto-detected from its values.
 *
 * @param {ColumnData[]} columnData
 * @returns {SchemaElement[]}
 * @throws {Error} if columns differ in length, or a column's type cannot be determined
 */
export function schemaFromColumnData(columnData) {
  /** @type {SchemaElement[]} */
  const schema = [{
    name: 'root',
    num_children: columnData.length,
  }]
  // undefined (rather than 0) marks "no column seen yet", so a leading empty
  // column still pins the expected row count at zero
  /** @type {number | undefined} */
  let num_rows = undefined

  for (const column of columnData) {
    // check if all columns have the same length
    if (num_rows === undefined) num_rows = column.data.length
    if (num_rows !== column.data.length) {
      throw new Error('columns must have the same length')
    }

    // auto-detect type if not provided
    /** @type {SchemaElement} */
    const schemaElement = column.type ? column : autoSchemaElement(column.name, column.data)
    if (!schemaElement.type) throw new Error(`column ${column.name} cannot determine type`)
    schema.push(schemaElement)
  }

  return schema
}
|
||||
|
||||
/**
|
||||
* Deduce a ParquetType from JS values
|
||||
*
|
||||
* @import {ConvertedType, DecodedArray, FieldRepetitionType, ParquetType, SchemaElement} from 'hyparquet'
|
||||
* @import {ColumnData} from '../src/types.js'
|
||||
* @param {string} name
|
||||
* @param {DecodedArray} values
|
||||
* @param {ParquetType | undefined} type
|
||||
* @returns {SchemaElement}
|
||||
*/
|
||||
export function getSchemaElementForValues(name, values, type) {
|
||||
if (values instanceof Int32Array) return { name, type: 'INT32', repetition_type: 'REQUIRED' }
|
||||
if (values instanceof BigInt64Array) return { name, type: 'INT64', repetition_type: 'REQUIRED' }
|
||||
if (values instanceof Float32Array) return { name, type: 'FLOAT', repetition_type: 'REQUIRED' }
|
||||
if (values instanceof Float64Array) return { name, type: 'DOUBLE', repetition_type: 'REQUIRED' }
|
||||
|
||||
function autoSchemaElement(name, values) {
|
||||
/** @type {ParquetType | undefined} */
|
||||
let type
|
||||
/** @type {FieldRepetitionType} */
|
||||
let repetition_type = 'REQUIRED'
|
||||
/** @type {ConvertedType | undefined} */
|
||||
let converted_type = undefined
|
||||
|
||||
if (values instanceof Int32Array) return { name, type: 'INT32', repetition_type }
|
||||
if (values instanceof BigInt64Array) return { name, type: 'INT64', repetition_type }
|
||||
if (values instanceof Float32Array) return { name, type: 'FLOAT', repetition_type }
|
||||
if (values instanceof Float64Array) return { name, type: 'DOUBLE', repetition_type }
|
||||
|
||||
for (const value of values) {
|
||||
if (value === null || value === undefined) {
|
||||
repetition_type = 'OPTIONAL'
|
||||
@ -55,10 +87,6 @@ export function getSchemaElementForValues(name, values, type) {
|
||||
type = valueType
|
||||
} else if (type === 'INT32' && valueType === 'DOUBLE') {
|
||||
type = 'DOUBLE'
|
||||
} else if (type === 'FLOAT' && valueType === 'INT32') {
|
||||
valueType = 'FLOAT'
|
||||
} else if (type === 'FLOAT' && valueType === 'DOUBLE') {
|
||||
valueType = 'FLOAT'
|
||||
} else if (type === 'DOUBLE' && valueType === 'INT32') {
|
||||
valueType = 'DOUBLE'
|
||||
}
|
||||
@ -102,32 +130,3 @@ export function getMaxDefinitionLevel(schemaPath) {
|
||||
}
|
||||
return maxLevel
|
||||
}
|
||||
|
||||
/**
 * Convert column data to schema.
 *
 * The result is a flat list: a synthetic `root` element followed by one
 * element per column, as returned by getSchemaElementForValues.
 *
 * @import {ColumnData} from '../src/types.js'
 * @param {ColumnData[]} columnData
 * @returns {SchemaElement[]}
 * @throws {Error} if columns differ in length, or a column's type cannot be determined
 */
export function schemaFromColumnData(columnData) {
  /** @type {SchemaElement[]} */
  const schema = [{
    name: 'root',
    num_children: columnData.length,
  }]
  // undefined (rather than 0) marks "no column seen yet", so a leading empty
  // column still pins the expected row count at zero
  /** @type {number | undefined} */
  let num_rows = undefined
  for (const { name, data, type } of columnData) {
    // check if all columns have the same length
    if (num_rows === undefined) {
      num_rows = data.length
    } else if (num_rows !== data.length) {
      throw new Error('columns must have the same length')
    }
    // auto-detect type
    const schemaElement = getSchemaElementForValues(name, data, type)
    if (!schemaElement.type) throw new Error(`column ${name} cannot determine type`)
    schema.push(schemaElement)
  }
  return schema
}
|
||||
|
||||
11
src/types.d.ts
vendored
11
src/types.d.ts
vendored
@ -1,5 +1,4 @@
|
||||
import type { DecodedArray, ParquetType } from 'hyparquet'
|
||||
import type { KeyValue } from 'hyparquet/src/types.js' // TODO export from hyparquet
|
||||
import type { ConvertedType, DecodedArray, FieldRepetitionType, KeyValue, LogicalType, ParquetType } from 'hyparquet'
|
||||
|
||||
export interface ParquetWriteOptions {
|
||||
writer: Writer
|
||||
@ -13,7 +12,15 @@ export interface ParquetWriteOptions {
|
||||
export interface ColumnData {
|
||||
name: string
|
||||
data: DecodedArray
|
||||
// fields from SchemaElement:
|
||||
type?: ParquetType
|
||||
type_length?: number
|
||||
repetition_type?: FieldRepetitionType
|
||||
converted_type?: ConvertedType
|
||||
scale?: number
|
||||
precision?: number
|
||||
field_id?: number
|
||||
logical_type?: LogicalType
|
||||
}
|
||||
|
||||
export interface Writer {
|
||||
|
||||
@ -9,9 +9,16 @@
|
||||
*/
|
||||
export function unconvert(schemaElement, values) {
|
||||
const ctype = schemaElement.converted_type
|
||||
// TODO: DECIMAL
|
||||
if (ctype === 'DATE') {
|
||||
return values.map(v => v.getTime())
|
||||
}
|
||||
if (ctype === 'TIMESTAMP_MILLIS') {
|
||||
return Array.from(values).map(v => BigInt(v.getTime()))
|
||||
}
|
||||
if (ctype === 'TIMESTAMP_MICROS') {
|
||||
return Array.from(values).map(v => BigInt(v.getTime() * 1000))
|
||||
}
|
||||
if (ctype === 'JSON') {
|
||||
if (!Array.isArray(values)) throw new Error('JSON must be an array')
|
||||
const encoder = new TextEncoder()
|
||||
|
||||
@ -20,7 +20,7 @@ export const basicData = [
|
||||
{ name: 'bool', data: [true, false, true, false] },
|
||||
{ name: 'int', data: [0, 127, 0x7fff, 0x7fffffff] },
|
||||
{ name: 'bigint', data: [0n, 127n, 0x7fffn, 0x7fffffffffffffffn] },
|
||||
{ name: 'float', data: [0, 0.0001, 123.456, 1e100], type: 'FLOAT' },
|
||||
{ name: 'float', data: [0, 0.0001, 123.456, 1e100], type: 'FLOAT', repetition_type: 'REQUIRED' },
|
||||
{ name: 'double', data: [0, 0.0001, 123.456, 1e100] },
|
||||
{ name: 'string', data: ['a', 'b', 'c', 'd'] },
|
||||
{ name: 'nullable', data: [true, false, null, null] },
|
||||
@ -199,7 +199,7 @@ describe('parquetWriteBuffer', () => {
|
||||
|
||||
it('throws for wrong type specified', () => {
|
||||
expect(() => parquetWriteBuffer({ columnData: [{ name: 'int', data: [1, 2, 3], type: 'BOOLEAN' }] }))
|
||||
.toThrow('parquet cannot write mixed types')
|
||||
.toThrow('parquet expected boolean value')
|
||||
})
|
||||
|
||||
it('throws for empty column with no type specified', () => {
|
||||
|
||||
@ -8,7 +8,7 @@ import { basicData } from './write.buffer.test.js'
|
||||
const filedir = 'data/'
|
||||
const filename = 'data/write.file.parquet'
|
||||
|
||||
describe('parquetWrite with FileWriter', () => {
|
||||
describe('parquetWriteFile', () => {
|
||||
beforeEach(() => {
|
||||
// ensure data directory exists
|
||||
if (!fs.existsSync(filedir)) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user