Write metadata

Kenny Daniel 2025-03-25 17:49:59 -07:00
parent a0560ee412
commit c7d84e0e9d
3 changed files with 192 additions and 0 deletions

src/metadata.js (new file, +81)

@@ -0,0 +1,81 @@
import { Encoding, ParquetType } from 'hyparquet/src/constants.js'
import { serializeTCompactProtocol } from './thrift.js'
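// Order matches the CompressionCodec enum in parquet.thrift, so indexOf yields the enum value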
const CompressionCodec = [
'UNCOMPRESSED',
'SNAPPY',
'GZIP',
'LZO',
'BROTLI',
'LZ4',
'ZSTD',
'LZ4_RAW',
]
/**
* @import {FileMetaData} from 'hyparquet'
* @import {Writer} from './writer.js'
* @param {Writer} writer
* @param {FileMetaData} metadata
*/
export function writeMetadata(writer, metadata) {
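// Field IDs mirror the FileMetaData struct in parquet.thrift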
const compact = {
field_1: metadata.version,
field_2: metadata.schema && metadata.schema.map(element => ({
field_1: element.type && ParquetType.indexOf(element.type),
field_2: element.type_length,
field_3: element.repetition_type,
field_4: element.name,
field_5: element.num_children,
field_6: element.converted_type,
field_7: element.scale,
field_8: element.precision,
field_9: element.field_id,
field_10: element.logical_type,
})),
field_3: metadata.num_rows,
field_4: metadata.row_groups.map(rg => ({
field_1: rg.columns.map(c => ({
field_1: c.file_path,
field_2: c.file_offset,
field_3: c.meta_data && {
field_1: ParquetType.indexOf(c.meta_data.type),
field_2: c.meta_data.encodings.map(e => Encoding.indexOf(e)), // encoding names to Thrift enum values
field_3: c.meta_data.path_in_schema,
field_4: CompressionCodec.indexOf(c.meta_data.codec),
field_5: c.meta_data.num_values,
field_6: c.meta_data.total_uncompressed_size,
field_7: c.meta_data.total_compressed_size,
field_8: c.meta_data.key_value_metadata,
field_9: c.meta_data.data_page_offset,
field_10: c.meta_data.index_page_offset,
field_11: c.meta_data.dictionary_page_offset,
field_12: c.meta_data.statistics,
field_13: c.meta_data.encoding_stats,
field_14: c.meta_data.bloom_filter_offset,
field_15: c.meta_data.bloom_filter_length,
field_16: c.meta_data.size_statistics,
},
field_4: c.offset_index_offset,
field_5: c.offset_index_length,
field_6: c.column_index_offset,
field_7: c.column_index_length,
field_8: c.crypto_metadata,
field_9: c.encrypted_column_metadata,
})),
field_2: rg.total_byte_size,
field_3: rg.num_rows,
field_4: rg.sorting_columns,
field_5: rg.file_offset,
field_6: rg.total_compressed_size,
field_7: rg.ordinal,
})),
field_5: metadata.key_value_metadata,
field_6: metadata.created_by,
}
const metadataStart = writer.offset
serializeTCompactProtocol(writer, compact)
const metadataLength = writer.offset - metadataStart
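// the parquet footer stores this length as a 4-byte little-endian uint32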
writer.appendUint32(metadataLength)
}
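
writeMetadata serializes the FileMetaData struct with the Thrift compact protocol, then appends the struct's byte length, so a caller only needs to add the surrounding "PAR1" magic bytes to form a valid footer. A minimal usage sketch, assuming the Writer API exercised in the test below (appendUint32, getBuffer) and a hypothetical metadata object shaped like exampleMetadata:

import { Writer } from './writer.js'
import { writeMetadata } from './metadata.js'

const writer = new Writer()
writer.appendUint32(0x31524150) // header magic "PAR1" (little-endian)
// ...column chunk data would be written here in a real file...
writeMetadata(writer, metadata) // metadata: a FileMetaData value (hypothetical here)
writer.appendUint32(0x31524150) // footer magic "PAR1"
const file = writer.getBuffer()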

src/thrift.js (+4)

@@ -29,6 +29,8 @@ export function serializeTCompactProtocol(writer, data) {
let lastFid = 0
// Write each field
for (const [key, value] of Object.entries(data)) {
if (value === undefined) continue
// We expect key = "field_N" so we can extract N as the field ID
const fid = parseInt(key.replace(/^field_/, ''), 10)
if (Number.isNaN(fid)) {
@@ -179,6 +181,8 @@ function writeElement(writer, type, value) {
// Recursively write sub-fields as "field_N: val", end with STOP
let lastFid = 0
for (const [k, v] of Object.entries(value)) {
if (v === undefined) continue
const fid = parseInt(k.replace(/^field_/, ''), 10)
if (Number.isNaN(fid)) {
throw new Error(`Invalid sub-field name: ${k}. Expected "field_###"`)
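
Both guards serve the same convention: serializeTCompactProtocol and its nested writeElement walk plain objects whose keys encode Thrift field IDs as "field_N", and skipping undefined values lets optional Thrift fields be omitted simply by leaving them out of the object. A sketch of the expected input shape (field IDs and values below are illustrative, not taken from parquet.thrift):

// keys carry the Thrift field ID; undefined values are skipped entirely
const data = {
  field_1: 2, // i32
  field_2: 'hyparquet', // string
  field_3: undefined, // optional field, omitted from the output
  field_4: { // nested struct, same field_N convention
    field_1: 4n, // i64 values are BigInt
  },
}
serializeTCompactProtocol(writer, data)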

test/metadata.test.js (new file, +107)

@@ -0,0 +1,107 @@
import { parquetMetadata } from 'hyparquet'
import { describe, expect, it } from 'vitest'
import { Writer } from '../src/writer.js'
import { writeMetadata } from '../src/metadata.js'
/**
* @import {FileMetaData} from 'hyparquet'
* @type {FileMetaData}
*/
export const exampleMetadata = {
version: 2,
created_by: 'hyparquet',
schema: [
{ name: 'root', num_children: 4 },
{ name: 'bool', type: 'BOOLEAN' },
{ name: 'int', type: 'INT32' },
{ name: 'bigint', type: 'INT64' },
{ name: 'double', type: 'DOUBLE' },
],
num_rows: 4n,
row_groups: [{
columns: [
{
file_path: 'bool',
file_offset: 32n,
meta_data: {
type: 'BOOLEAN',
encodings: ['PLAIN'],
path_in_schema: ['bool'],
codec: 'UNCOMPRESSED',
num_values: 4n,
total_uncompressed_size: 28n,
total_compressed_size: 28n,
data_page_offset: 4n,
},
},
{
file_path: 'int',
file_offset: 75n,
meta_data: {
type: 'INT32',
encodings: ['PLAIN'],
path_in_schema: ['int'],
codec: 'UNCOMPRESSED',
num_values: 4n,
total_uncompressed_size: 43n,
total_compressed_size: 43n,
data_page_offset: 32n,
},
},
{
file_path: 'bigint',
file_offset: 134n,
meta_data: {
type: 'INT64',
encodings: ['PLAIN'],
path_in_schema: ['bigint'],
codec: 'UNCOMPRESSED',
num_values: 4n,
total_uncompressed_size: 59n,
total_compressed_size: 59n,
data_page_offset: 75n,
},
},
{
file_path: 'double',
file_offset: 193n,
meta_data: {
type: 'DOUBLE',
encodings: ['PLAIN'],
path_in_schema: ['double'],
codec: 'UNCOMPRESSED',
num_values: 4n,
total_uncompressed_size: 59n,
total_compressed_size: 59n,
data_page_offset: 134n,
},
},
],
total_byte_size: 189n,
num_rows: 4n,
}],
metadata_length: 219,
}
describe('writeMetadata', () => {
it('writes metadata and parses in hyparquet', () => {
const writer = new Writer()
// Write header PAR1
writer.appendUint32(0x31524150)
// Write metadata
writeMetadata(writer, exampleMetadata)
// Write footer PAR1
writer.appendUint32(0x31524150)
const file = writer.getBuffer()
const output = parquetMetadata(file)
expect(output).toEqual(exampleMetadata)
})
})
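
As an extra spot check on the footer layout, the length written by writeMetadata can be read straight back out of the buffer: it occupies the 4 bytes immediately before the trailing magic. A sketch, assuming getBuffer returns the ArrayBuffer that parquetMetadata accepts:

const view = new DataView(file)
// the last 4 bytes are the footer magic "PAR1"; the 4 before that hold the length
const footerLength = view.getUint32(file.byteLength - 8, true)
// for exampleMetadata this should equal metadata_length above (219)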