Mirror of https://github.com/asadbek064/hyparquet-writer.git (synced 2025-12-05 23:31:54 +00:00)
Initial writer implementation
This commit is contained in: parent c20356087b, commit 05ee5e274f
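
For orientation, the new API in a minimal usage sketch (column data mirrors test/write.test.js below; importing by package name is an assumption, in-repo the entry point is src/index.js):

import { parquetWrite } from 'hyparquet-writer' // assumed published entry; resolves to src/index.js here

// each key becomes a parquet column; all arrays must have the same length
const arrayBuffer = parquetWrite({
  bool: [true, false],
  string: ['a', 'b'],
})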
src/column.js (new file, 126 lines)
@@ -0,0 +1,126 @@
import { Encoding, PageType } from 'hyparquet/src/constants.js'
import { writeRleBitPackedHybrid } from './encoding.js'
import { writePlain } from './plain.js'
import { serializeTCompactProtocol } from './thrift.js'
import { Writer } from './writer.js'

/**
 * @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType} from 'hyparquet/src/types.js'
 * @param {Writer} writer
 * @param {string} columnName
 * @param {DecodedArray} values
 * @param {ParquetType} type
 * @returns {ColumnMetaData}
 */
export function writeColumn(writer, columnName, values, type) {
  // Get data stats
  const num_nulls = values.filter(v => v === null).length
  const offsetStart = writer.offset

  // Write page to temp buffer
  const page = new Writer()

  /** @type {import('hyparquet/src/types.js').Encoding} */
  const encoding = 'PLAIN'

  // TODO: repetition levels
  const maxRepetitionLevel = 0
  let repetition_levels_byte_length = 0
  if (maxRepetitionLevel) {
    repetition_levels_byte_length = writeRleBitPackedHybrid(page, [])
  }

  // TODO: definition levels
  const maxDefinitionLevel = 0
  let definition_levels_byte_length = 0
  if (maxDefinitionLevel) {
    definition_levels_byte_length = writeRleBitPackedHybrid(page, [])
  }

  // write page data (TODO: compressed)
  const { uncompressed_page_size, compressed_page_size } = writePageData(page, values, type)

  // write page header
  /** @type {PageHeader} */
  const header = {
    type: 'DATA_PAGE_V2',
    uncompressed_page_size,
    compressed_page_size,
    data_page_header_v2: {
      num_values: values.length,
      num_nulls,
      num_rows: values.length,
      encoding,
      definition_levels_byte_length,
      repetition_levels_byte_length,
      is_compressed: false,
    },
  }
  writePageHeader(writer, header)

  // write page data
  writer.appendBuffer(page.getBuffer())

  return {
    type,
    encodings: ['PLAIN'],
    path_in_schema: [columnName],
    codec: 'UNCOMPRESSED',
    num_values: BigInt(values.length),
    total_compressed_size: BigInt(writer.offset - offsetStart),
    total_uncompressed_size: BigInt(writer.offset - offsetStart),
    data_page_offset: BigInt(offsetStart),
  }
}

/**
 * Deduce a ParquetType from the JS values
 *
 * @param {DecodedArray} values
 * @returns {ParquetType}
 */
export function getParquetTypeForValues(values) {
  if (values.every(v => typeof v === 'boolean')) return 'BOOLEAN'
  if (values.every(v => typeof v === 'bigint')) return 'INT64'
  if (values.every(v => Number.isInteger(v))) return 'INT32'
  if (values.every(v => typeof v === 'number')) return 'DOUBLE'
  if (values.every(v => typeof v === 'string')) return 'BYTE_ARRAY'
  throw new Error(`Cannot determine parquet type for: ${values}`)
}
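
Note the ordering of the checks above: Number.isInteger runs before the typeof v === 'number' test, so an all-integer column becomes INT32 while a single fractional value demotes the whole column to DOUBLE. For example (expected results, per that logic):

getParquetTypeForValues([1, 2, 3])   // 'INT32'
getParquetTypeForValues([1, 2.5, 3]) // 'DOUBLE'
getParquetTypeForValues([1n, 2n])    // 'INT64'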
/**
 * @param {Writer} writer
 * @param {PageHeader} header
 */
function writePageHeader(writer, header) {
  const compact = {
    field_1: PageType.indexOf(header.type),
    field_2: header.uncompressed_page_size,
    field_3: header.compressed_page_size,
    field_8: header.data_page_header_v2 && {
      field_1: header.data_page_header_v2.num_values,
      field_2: header.data_page_header_v2.num_nulls,
      field_3: header.data_page_header_v2.num_rows,
      field_4: Encoding.indexOf(header.data_page_header_v2.encoding),
      field_5: header.data_page_header_v2.definition_levels_byte_length,
      field_6: header.data_page_header_v2.repetition_levels_byte_length,
      field_7: header.data_page_header_v2.is_compressed ? undefined : false, // default true
    },
  }
  serializeTCompactProtocol(writer, compact)
}

/**
 * @param {Writer} writer
 * @param {DecodedArray} values
 * @param {ParquetType} type
 * @returns {{ uncompressed_page_size: number, compressed_page_size: number }}
 */
function writePageData(writer, values, type) {
  // write plain data
  const startOffset = writer.offset
  writePlain(writer, values, type)
  const size = writer.offset - startOffset

  return { uncompressed_page_size: size, compressed_page_size: size }
}
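
For context, the Parquet spec lays out a DATA_PAGE_V2 body as repetition levels, then definition levels (both RLE/bit-packed and never compressed in v2), then the values; the header's two byte-length fields let a reader slice the body apart. A reader-side sketch under those rules (hypothetical inputs, not code from this commit; this commit writes both level lengths as 0):

const body = new Uint8Array([/* page body bytes */])
const repetition_levels_byte_length = 0
const definition_levels_byte_length = 0
const defEnd = repetition_levels_byte_length + definition_levels_byte_length
const repetitionLevels = body.subarray(0, repetition_levels_byte_length)
const definitionLevels = body.subarray(repetition_levels_byte_length, defEnd)
const values = body.subarray(defEnd) // in v2, only this section may be compressed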
src/index.js (new file, 1 line)
@@ -0,0 +1 @@
export { parquetWrite } from './write.js'
src/metadata.js
@@ -1,4 +1,4 @@
-import { Encoding, FieldRepetitionType, ParquetType } from 'hyparquet/src/constants.js'
+import { ConvertedType, Encoding, FieldRepetitionType, ParquetType } from 'hyparquet/src/constants.js'
 import { serializeTCompactProtocol } from './thrift.js'

 const CompressionCodec = [
@@ -27,7 +27,7 @@ export function writeMetadata(writer, metadata) {
   field_3: element.repetition_type && FieldRepetitionType.indexOf(element.repetition_type),
   field_4: element.name,
   field_5: element.num_children,
-  field_6: element.converted_type,
+  field_6: element.converted_type && ConvertedType.indexOf(element.converted_type),
   field_7: element.scale,
   field_8: element.precision,
   field_9: element.field_id,
@@ -40,7 +40,7 @@ export function writeMetadata(writer, metadata) {
   field_2: c.file_offset,
   field_3: c.meta_data && {
     field_1: ParquetType.indexOf(c.meta_data.type),
-    field_2: c.meta_data.encodings.map(e => Encoding.indexOf(e)), // WTF simplfy?
+    field_2: c.meta_data.encodings.map(e => Encoding.indexOf(e)),
     field_3: c.meta_data.path_in_schema,
     field_4: CompressionCodec.indexOf(c.meta_data.codec),
     field_5: c.meta_data.num_values,
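
The ConvertedType fix reflects how Thrift compact protocol encodes enums: as their integer position in the parquet-format ordering. hyparquet's constants.js exposes each enum as an ordered array of names, so indexOf gives the wire value. A small check (array contents assumed to follow the spec ordering, e.g. ConvertedType starting ['UTF8', 'MAP', 'MAP_KEY_VALUE', 'LIST', ...]):

import { ConvertedType } from 'hyparquet/src/constants.js'
const wireValue = ConvertedType.indexOf('UTF8') // 0 in the parquet-format spec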
src/write.js (new file, 78 lines)
@@ -0,0 +1,78 @@
import { getParquetTypeForValues, writeColumn } from './column.js'
import { Writer } from './writer.js'
import { writeMetadata } from './metadata.js'

/**
 * Write data as parquet to an ArrayBuffer
 *
 * @import {ColumnChunk, DecodedArray, FileMetaData, SchemaElement} from 'hyparquet'
 * @param {Record<string, DecodedArray>} columnData
 * @returns {ArrayBuffer}
 */
export function parquetWrite(columnData) {
  const writer = new Writer()

  // Check if all columns have the same length
  const columnNames = Object.keys(columnData)
  const num_rows = columnNames.length ? BigInt(columnData[columnNames[0]].length) : 0n
  for (const name of columnNames) {
    if (BigInt(columnData[name].length) !== num_rows) {
      throw new Error('parquetWrite: all columns must have the same length')
    }
  }

  // Write header PAR1
  writer.appendUint32(0x31524150)

  // schema
  /** @type {SchemaElement[]} */
  const schema = [{
    name: 'root',
    num_children: columnNames.length,
    repetition_type: 'REQUIRED',
  }]

  // row group columns
  /** @type {ColumnChunk[]} */
  const columns = []

  // Write columns
  for (const name of columnNames) {
    const values = columnData[name]
    const type = getParquetTypeForValues(values)
    const file_offset = BigInt(writer.offset)
    const meta_data = writeColumn(writer, name, values, type)
    const repetition_type = 'REQUIRED'

    // save metadata
    schema.push({ type, name, repetition_type })
    columns.push({
      file_path: name,
      file_offset,
      meta_data,
    })
  }

  // Write metadata
  /** @type {FileMetaData} */
  const metadata = {
    version: 2,
    created_by: 'hyparquet',
    schema,
    num_rows,
    row_groups: [{
      columns,
      total_byte_size: BigInt(writer.offset - 4),
      num_rows,
    }],
    metadata_length: 0,
  }
  // @ts-ignore don't want to actually serialize metadata_length
  delete metadata.metadata_length
  writeMetadata(writer, metadata)

  // Write footer PAR1
  writer.appendUint32(0x31524150)

  return writer.getBuffer()
}
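
The 0x31524150 literal is the 4-byte magic 'PAR1' read as a little-endian uint32, written once as the header and once after the footer metadata (writeMetadata is presumably expected to append the serialized metadata plus its 4-byte length just before the trailing magic, per the Parquet footer layout). A quick sanity check of the constant:

const view = new DataView(new ArrayBuffer(4))
view.setUint32(0, 0x31524150, true) // little-endian, as the Writer is assumed to write it
console.log(new TextDecoder().decode(view.buffer)) // 'PAR1'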
test/write.test.js (new file, 41 lines)
@@ -0,0 +1,41 @@
import { parquetMetadata, parquetReadObjects } from 'hyparquet'
import { describe, expect, it } from 'vitest'
import { parquetWrite } from '../src/index.js'
import { exampleMetadata } from './metadata.test.js'

/**
 * Utility to encode a parquet file and then read it back as an array of row objects.
 *
 * @param {Record<string, any[]>} columnData
 * @returns {Promise<Record<string, any>[]>}
 */
async function roundTripDeserialize(columnData) {
  const file = parquetWrite(columnData)
  return await parquetReadObjects({ file })
}

const data = {
  bool: [true, false, true, false], // BOOLEAN
  int: [0, 127, 0x7fff, 0x7fffffff], // INT32
  bigint: [0n, 127n, 0x7fffn, 0x7fffffffffffffffn], // INT64
  double: [0, 0.0001, 123.456, 1e100], // DOUBLE
  string: ['a', 'b', 'c', 'd'], // BYTE_ARRAY
}

describe('parquetWrite', () => {
  it('writes expected metadata', () => {
    const file = parquetWrite(data)
    const metadata = parquetMetadata(file)
    expect(metadata).toEqual(exampleMetadata)
  })

  it('serializes basic types correctly', async () => {
    const result = await roundTripDeserialize(data)
    expect(result).toEqual([
      { bool: true, int: 0, bigint: 0n, double: 0, string: 'a' },
      { bool: false, int: 127, bigint: 127n, double: 0.0001, string: 'b' },
      { bool: true, int: 0x7fff, bigint: 0x7fffn, double: 123.456, string: 'c' },
      { bool: false, int: 0x7fffffff, bigint: 0x7fffffffffffffffn, double: 1e100, string: 'd' },
    ])
  })
})