Move helpers to datapage.js

This commit is contained in:
Kenny Daniel 2025-04-13 14:44:48 -06:00
parent 4070d0dc49
commit 6226d50734
No known key found for this signature in database
GPG Key ID: FDF16101AF5AFD3A
5 changed files with 141 additions and 118 deletions

@ -1,15 +1,11 @@
import { Encoding, PageType } from 'hyparquet/src/constants.js'
import { unconvert } from './unconvert.js'
import { writeRleBitPackedHybrid } from './encoding.js'
import { writePlain } from './plain.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
import { snappyCompress } from './snappy.js'
import { serializeTCompactProtocol } from './thrift.js'
import { ByteWriter } from './bytewriter.js'
import { writeLevels, writePageHeader } from './datapage.js'
/**
* @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType, SchemaElement, Statistics} from 'hyparquet'
* @import {ThriftObject, Writer} from '../src/types.js'
* @param {Writer} writer
* @param {SchemaElement[]} schemaPath
* @param {DecodedArray} values
@ -21,40 +17,21 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) {
const schemaElement = schemaPath[schemaPath.length - 1]
const { type } = schemaElement
if (!type) throw new Error(`column ${schemaElement.name} cannot determine type`)
let dataType = type
const offsetStart = writer.offset
const num_values = values.length
/** @type {Statistics | undefined} */
let statistics = undefined
// Compute statistics
if (stats) {
let min_value = undefined
let max_value = undefined
let null_count = 0n
for (const value of values) {
if (value === null || value === undefined) {
null_count++
continue
}
if (min_value === undefined || value < min_value) {
min_value = value
}
if (max_value === undefined || value > max_value) {
max_value = value
}
}
statistics = { min_value, max_value, null_count }
}
const statistics = stats ? getStatistics(values) : undefined
// Write levels to temp buffer
const levels = new ByteWriter()
const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls } = writeLevels(levels, schemaPath, values)
const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
= writeLevels(levels, schemaPath, values)
// dictionary encoding
let dictionary_page_offset = undefined
/** @type {DecodedArray | undefined} */
let dictionary = useDictionary(values, dataType)
const dictionary = useDictionary(values, type)
if (dictionary) {
dictionary_page_offset = BigInt(writer.offset)
@ -64,14 +41,10 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) {
indexes[i] = dictionary.indexOf(values[i])
}
values = indexes
dataType = 'INT32'
// unconvert dictionary and filter out nulls
dictionary = unconvert(schemaElement, dictionary)
.filter(v => v !== null && v !== undefined)
// write dictionary page data
writeDictionaryPage(writer, dictionary, type, compressed)
// write unconverted dictionary page
const unconverted = unconvert(schemaElement, dictionary)
writeDictionaryPage(writer, unconverted, type, compressed)
} else {
// unconvert type and filter out nulls
values = unconvert(schemaElement, values)
@ -136,33 +109,6 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) {
}
}
/**
* @param {Writer} writer
* @param {PageHeader} header
*/
function writePageHeader(writer, header) {
/** @type {ThriftObject} */
const compact = {
field_1: PageType.indexOf(header.type),
field_2: header.uncompressed_page_size,
field_3: header.compressed_page_size,
field_7: header.dictionary_page_header && {
field_1: header.dictionary_page_header.num_values,
field_2: Encoding.indexOf(header.dictionary_page_header.encoding),
},
field_8: header.data_page_header_v2 && {
field_1: header.data_page_header_v2.num_values,
field_2: header.data_page_header_v2.num_nulls,
field_3: header.data_page_header_v2.num_rows,
field_4: Encoding.indexOf(header.data_page_header_v2.encoding),
field_5: header.data_page_header_v2.definition_levels_byte_length,
field_6: header.data_page_header_v2.repetition_levels_byte_length,
field_7: header.data_page_header_v2.is_compressed ? undefined : false, // default true
},
}
serializeTCompactProtocol(writer, compact)
}
/**
* @param {DecodedArray} values
* @param {ParquetType} type
@ -171,6 +117,8 @@ function writePageHeader(writer, header) {
function useDictionary(values, type) {
if (type === 'BOOLEAN') return
const unique = new Set(values)
unique.delete(undefined)
unique.delete(null)
if (values.length > 10 && values.length / unique.size > 0.1) {
if (unique.size < values.length) {
// TODO: sort by frequency
@ -212,35 +160,26 @@ function writeDictionaryPage(writer, dictionary, type, compressed) {
}
/**
* @param {Writer} writer
* @param {SchemaElement[]} schemaPath
* @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType, SchemaElement, Statistics} from 'hyparquet'
* @import {Writer} from '../src/types.js'
* @param {DecodedArray} values
* @returns {{ definition_levels_byte_length: number, repetition_levels_byte_length: number, num_nulls: number}}
* @returns {Statistics}
*/
function writeLevels(writer, schemaPath, values) {
let num_nulls = 0
// TODO: repetition levels
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
let repetition_levels_byte_length = 0
if (maxRepetitionLevel) {
repetition_levels_byte_length = writeRleBitPackedHybrid(writer, [])
}
// definition levels
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
let definition_levels_byte_length = 0
if (maxDefinitionLevel) {
const definitionLevels = []
for (const value of values) {
if (value === null || value === undefined) {
definitionLevels.push(maxDefinitionLevel - 1)
num_nulls++
} else {
definitionLevels.push(maxDefinitionLevel)
}
function getStatistics(values) {
let min_value = undefined
let max_value = undefined
let null_count = 0n
for (const value of values) {
if (value === null || value === undefined) {
null_count++
continue
}
if (min_value === undefined || value < min_value) {
min_value = value
}
if (max_value === undefined || value > max_value) {
max_value = value
}
definition_levels_byte_length = writeRleBitPackedHybrid(writer, definitionLevels)
}
return { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
return { min_value, max_value, null_count }
}

75
src/datapage.js Normal file

@ -0,0 +1,75 @@
import { Encoding, PageType } from 'hyparquet/src/constants.js'
import { writeRleBitPackedHybrid } from './encoding.js'
import { serializeTCompactProtocol } from './thrift.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
/**
* @import {DecodedArray, PageHeader, SchemaElement} from 'hyparquet'
* @import {Writer} from '../src/types.js'
* @param {Writer} writer
* @param {PageHeader} header
*/
export function writePageHeader(writer, header) {
/** @type {import('../src/types.js').ThriftObject} */
const compact = {
field_1: PageType.indexOf(header.type),
field_2: header.uncompressed_page_size,
field_3: header.compressed_page_size,
field_4: header.crc,
field_5: header.data_page_header && {
field_1: header.data_page_header.num_values,
field_2: Encoding.indexOf(header.data_page_header.encoding),
field_3: Encoding.indexOf(header.data_page_header.definition_level_encoding),
field_4: Encoding.indexOf(header.data_page_header.repetition_level_encoding),
// field_5: header.data_page_header.statistics,
},
field_7: header.dictionary_page_header && {
field_1: header.dictionary_page_header.num_values,
field_2: Encoding.indexOf(header.dictionary_page_header.encoding),
},
field_8: header.data_page_header_v2 && {
field_1: header.data_page_header_v2.num_values,
field_2: header.data_page_header_v2.num_nulls,
field_3: header.data_page_header_v2.num_rows,
field_4: Encoding.indexOf(header.data_page_header_v2.encoding),
field_5: header.data_page_header_v2.definition_levels_byte_length,
field_6: header.data_page_header_v2.repetition_levels_byte_length,
field_7: header.data_page_header_v2.is_compressed ? undefined : false, // default true
},
}
serializeTCompactProtocol(writer, compact)
}
/**
* @param {Writer} writer
* @param {SchemaElement[]} schemaPath
* @param {DecodedArray} values
* @returns {{ definition_levels_byte_length: number, repetition_levels_byte_length: number, num_nulls: number}}
*/
export function writeLevels(writer, schemaPath, values) {
let num_nulls = 0
// TODO: repetition levels
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
let repetition_levels_byte_length = 0
if (maxRepetitionLevel) {
repetition_levels_byte_length = writeRleBitPackedHybrid(writer, [])
}
// definition levels
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
let definition_levels_byte_length = 0
if (maxDefinitionLevel) {
const definitionLevels = []
for (const value of values) {
if (value === null || value === undefined) {
definitionLevels.push(maxDefinitionLevel - 1)
num_nulls++
} else {
definitionLevels.push(maxDefinitionLevel)
}
}
definition_levels_byte_length = writeRleBitPackedHybrid(writer, definitionLevels)
}
return { definition_levels_byte_length, repetition_levels_byte_length, num_nulls }
}

@ -1,6 +1,6 @@
import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, PageType, ParquetType } from 'hyparquet/src/constants.js'
import { serializeTCompactProtocol } from './thrift.js'
import { unconvertMetadata } from './unconvert.js'
import { unconvertStatistics } from './unconvert.js'
/**
* @import {FileMetaData, LogicalType, TimeUnit} from 'hyparquet'
@ -44,16 +44,7 @@ export function writeMetadata(writer, metadata) {
field_9: c.meta_data.data_page_offset,
field_10: c.meta_data.index_page_offset,
field_11: c.meta_data.dictionary_page_offset,
field_12: c.meta_data.statistics && {
field_1: unconvertMetadata(c.meta_data.statistics.max, metadata.schema[columnIndex + 1]),
field_2: unconvertMetadata(c.meta_data.statistics.min, metadata.schema[columnIndex + 1]),
field_3: c.meta_data.statistics.null_count,
field_4: c.meta_data.statistics.distinct_count,
field_5: unconvertMetadata(c.meta_data.statistics.max_value, metadata.schema[columnIndex + 1]),
field_6: unconvertMetadata(c.meta_data.statistics.min_value, metadata.schema[columnIndex + 1]),
field_7: c.meta_data.statistics.is_max_value_exact,
field_8: c.meta_data.statistics.is_min_value_exact,
},
field_12: c.meta_data.statistics && unconvertStatistics(c.meta_data.statistics, metadata.schema[columnIndex + 1]),
field_13: c.meta_data.encoding_stats && c.meta_data.encoding_stats.map(es => ({
field_1: PageType.indexOf(es.page_type),
field_2: Encoding.indexOf(es.encoding),

@ -3,7 +3,7 @@ const dayMillis = 86400000 // 1 day in milliseconds
/**
* Convert from rich to primitive types.
*
* @import {DecodedArray, SchemaElement} from 'hyparquet'
* @import {DecodedArray, SchemaElement, Statistics} from 'hyparquet'
* @param {SchemaElement} element
* @param {DecodedArray} values
* @returns {DecodedArray}
@ -48,7 +48,7 @@ export function unconvert(element, values) {
* @param {SchemaElement} element
* @returns {Uint8Array | undefined}
*/
export function unconvertMetadata(value, element) {
export function unconvertMinMax(value, element) {
if (value === undefined || value === null) return undefined
const { type, converted_type } = element
if (type === 'BOOLEAN') return new Uint8Array([value ? 1 : 0])
@ -90,6 +90,24 @@ export function unconvertMetadata(value, element) {
throw new Error(`unsupported type for statistics: ${type} with value ${value}`)
}
/**
* @param {Statistics} stats
* @param {SchemaElement} element
* @returns {import('../src/types.js').ThriftObject}
*/
export function unconvertStatistics(stats, element) {
return {
field_1: unconvertMinMax(stats.max, element),
field_2: unconvertMinMax(stats.min, element),
field_3: stats.null_count,
field_4: stats.distinct_count,
field_5: unconvertMinMax(stats.max_value, element),
field_6: unconvertMinMax(stats.min_value, element),
field_7: stats.is_max_value_exact,
field_8: stats.is_min_value_exact,
}
}
/**
* @param {SchemaElement} element
* @param {bigint} value

@ -1,5 +1,5 @@
import { describe, expect, it } from 'vitest'
import { unconvert, unconvertDecimal, unconvertMetadata } from '../src/unconvert.js'
import { unconvert, unconvertDecimal, unconvertMinMax } from '../src/unconvert.js'
import { convertMetadata } from 'hyparquet/src/metadata.js'
/**
@ -60,18 +60,18 @@ describe('unconvert', () => {
})
})
describe('unconvertMetadata', () => {
describe('unconvertMinMax', () => {
it('should return undefined if value is undefined or null', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'INT32' }
expect(unconvertMetadata(undefined, schema)).toBeUndefined()
expect(unconvertMinMax(undefined, schema)).toBeUndefined()
})
it('should handle BOOLEAN type', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'BOOLEAN' }
expect(unconvertMetadata(true, schema)).toEqual(new Uint8Array([1]))
expect(unconvertMetadata(false, schema)).toEqual(new Uint8Array([0]))
expect(unconvertMinMax(true, schema)).toEqual(new Uint8Array([1]))
expect(unconvertMinMax(false, schema)).toEqual(new Uint8Array([0]))
})
it('should truncate BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY to 16 bytes', () => {
@ -80,21 +80,21 @@ describe('unconvertMetadata', () => {
const longStrUint8 = new TextEncoder().encode(longStr)
// value is a Uint8Array
const result1 = unconvertMetadata(longStrUint8, { name: 'test', type: 'BYTE_ARRAY' })
const result1 = unconvertMinMax(longStrUint8, { name: 'test', type: 'BYTE_ARRAY' })
expect(result1).toBeInstanceOf(Uint8Array)
expect(result1?.length).toBe(16) // truncated
expect(result1?.length).toBe(16)
// value is a string
const result2 = unconvertMetadata(longStr, { name: 'test', type: 'FIXED_LEN_BYTE_ARRAY' })
const result2 = unconvertMinMax(longStr, { name: 'test', type: 'FIXED_LEN_BYTE_ARRAY' })
expect(result2).toBeInstanceOf(Uint8Array)
expect(result2?.length).toBe(16) // truncated
expect(result2?.length).toBe(16)
})
it('should correctly encode FLOAT values in little-endian', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'FLOAT' }
const value = 1.5
const result = unconvertMetadata(value, schema)
const result = unconvertMinMax(value, schema)
expect(result).toBeInstanceOf(Uint8Array)
const roundtrip = convertMetadata(result, schema)
expect(roundtrip).toEqual(1.5)
@ -104,7 +104,7 @@ describe('unconvertMetadata', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'DOUBLE' }
const value = 1.123456789
const result = unconvertMetadata(value, schema)
const result = unconvertMinMax(value, schema)
expect(result).toBeInstanceOf(Uint8Array)
const roundtrip = convertMetadata(result, schema)
expect(roundtrip).toEqual(1.123456789)
@ -114,7 +114,7 @@ describe('unconvertMetadata', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'INT32' }
const value = 123456
const result = unconvertMetadata(value, schema)
const result = unconvertMinMax(value, schema)
const roundtrip = convertMetadata(result, schema)
expect(roundtrip).toEqual(123456)
})
@ -123,7 +123,7 @@ describe('unconvertMetadata', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'INT64' }
const value = 1234567890123456789n
const result = unconvertMetadata(value, schema)
const result = unconvertMinMax(value, schema)
const roundtrip = convertMetadata(result, schema)
expect(roundtrip).toEqual(1234567890123456789n)
})
@ -132,7 +132,7 @@ describe('unconvertMetadata', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'INT64', converted_type: 'TIMESTAMP_MILLIS' }
const date = new Date('2023-01-01T00:00:00Z')
const result = unconvertMetadata(date, schema)
const result = unconvertMinMax(date, schema)
const roundtrip = convertMetadata(result, schema)
expect(roundtrip).toEqual(date)
})
@ -140,14 +140,14 @@ describe('unconvertMetadata', () => {
it('should throw an error for unsupported types', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'INT96' }
expect(() => unconvertMetadata(123, schema))
expect(() => unconvertMinMax(123, schema))
.toThrow('unsupported type for statistics: INT96 with value 123')
})
it('should throw an error for INT64 if value is a number instead of bigint or Date', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'INT64' }
expect(() => unconvertMetadata(123, schema))
expect(() => unconvertMinMax(123, schema))
.toThrow('unsupported type for statistics: INT64 with value 123')
})
})