// hyparquet-writer/src/column.js
import { Encoding, PageType } from 'hyparquet/src/constants.js'
import { unconvert } from './convert.js'
import { writeRleBitPackedHybrid } from './encoding.js'
import { writePlain } from './plain.js'
import { serializeTCompactProtocol } from './thrift.js'
import { Writer } from './writer.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
/**
 * Write one column chunk: repetition levels, definition levels, plain-encoded
 * page data, and a DATA_PAGE_V2 page header, then return the chunk metadata.
 *
 * @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType, SchemaElement} from 'hyparquet'
 * @param {Writer} writer - destination writer for the parquet file
 * @param {SchemaElement[]} schemaPath - schema elements from root to this column
 * @param {DecodedArray} values - column values to write (may contain nulls)
 * @returns {ColumnMetaData} metadata describing the written column chunk
 */
export function writeColumn(writer, schemaPath, values) {
  const schemaElement = schemaPath[schemaPath.length - 1]
  const { type } = schemaElement
  if (!type) throw new Error(`column ${schemaElement.name} cannot determine type`)
  const offsetStart = writer.offset
  let num_nulls = 0

  // Unconvert type if necessary
  values = unconvert(schemaElement, values)

  // Write page to a temp buffer first, so sizes are known when the header is written
  const page = new Writer()
  /** @type {import('hyparquet/src/types.js').Encoding} */
  const encoding = 'PLAIN'

  // repetition levels
  // TODO: real repetition levels for repeated/nested columns
  const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
  let repetition_levels_byte_length = 0
  if (maxRepetitionLevel) {
    repetition_levels_byte_length = writeRleBitPackedHybrid(page, [])
  }

  // definition levels: mark nulls, and collect the defined values because
  // parquet page data stores only non-null values (nulls are represented
  // solely by definition levels)
  const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
  let definition_levels_byte_length = 0
  let pageValues = values
  if (maxDefinitionLevel) {
    const definitionLevels = []
    const definedValues = []
    for (const value of values) {
      if (value === null || value === undefined) {
        definitionLevels.push(maxDefinitionLevel - 1)
        num_nulls++
      } else {
        definitionLevels.push(maxDefinitionLevel)
        definedValues.push(value)
      }
    }
    definition_levels_byte_length = writeRleBitPackedHybrid(page, definitionLevels)
    pageValues = definedValues
  }

  // write page data (non-null values only)
  writePageData(page, pageValues, type)
  // TODO: compress page data

  // write page header
  /** @type {PageHeader} */
  const header = {
    type: 'DATA_PAGE_V2',
    uncompressed_page_size: page.offset,
    compressed_page_size: page.offset,
    data_page_header_v2: {
      num_values: values.length, // total including nulls
      num_nulls,
      num_rows: values.length,
      encoding,
      definition_levels_byte_length,
      repetition_levels_byte_length,
      is_compressed: false,
    },
  }
  writePageHeader(writer, header)
  // write page data after the header
  writer.appendBuffer(page.getBuffer())
  return {
    type,
    encodings: [encoding],
    path_in_schema: schemaPath.slice(1).map(s => s.name),
    codec: 'UNCOMPRESSED',
    num_values: BigInt(values.length),
    total_compressed_size: BigInt(writer.offset - offsetStart),
    total_uncompressed_size: BigInt(writer.offset - offsetStart),
    data_page_offset: BigInt(offsetStart),
  }
}
/**
 * Serialize a PageHeader to the writer via the thrift compact protocol.
 *
 * Field numbers follow the parquet thrift definition of PageHeader and
 * DataPageHeaderV2.
 *
 * @param {Writer} writer
 * @param {PageHeader} header
 */
function writePageHeader(writer, header) {
  const v2 = header.data_page_header_v2
  const compact = {
    field_1: PageType.indexOf(header.type),
    field_2: header.uncompressed_page_size,
    field_3: header.compressed_page_size,
    field_8: v2 && {
      field_1: v2.num_values,
      field_2: v2.num_nulls,
      field_3: v2.num_rows,
      field_4: Encoding.indexOf(v2.encoding),
      field_5: v2.definition_levels_byte_length,
      field_6: v2.repetition_levels_byte_length,
      // thrift default is true, so only emit the field when false
      field_7: v2.is_compressed ? undefined : false,
    },
  }
  serializeTCompactProtocol(writer, compact)
}
/**
 * Encode the page values into the page buffer.
 *
 * Currently PLAIN is the only supported encoding.
 *
 * @param {Writer} writer - page buffer to append encoded bytes to
 * @param {DecodedArray} values - values to encode
 * @param {ParquetType} type - parquet physical type of the column
 */
function writePageData(writer, values, type) {
  writePlain(writer, values, type)
}