diff --git a/src/column.js b/src/column.js index 87d5165..1ba1b41 100644 --- a/src/column.js +++ b/src/column.js @@ -1,15 +1,11 @@ -import { Encoding, PageType } from 'hyparquet/src/constants.js' import { unconvert } from './unconvert.js' import { writeRleBitPackedHybrid } from './encoding.js' import { writePlain } from './plain.js' -import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' import { snappyCompress } from './snappy.js' -import { serializeTCompactProtocol } from './thrift.js' import { ByteWriter } from './bytewriter.js' +import { writeLevels, writePageHeader } from './datapage.js' /** - * @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType, SchemaElement, Statistics} from 'hyparquet' - * @import {ThriftObject, Writer} from '../src/types.js' * @param {Writer} writer * @param {SchemaElement[]} schemaPath * @param {DecodedArray} values @@ -21,40 +17,21 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) { const schemaElement = schemaPath[schemaPath.length - 1] const { type } = schemaElement if (!type) throw new Error(`column ${schemaElement.name} cannot determine type`) - let dataType = type const offsetStart = writer.offset const num_values = values.length - /** @type {Statistics | undefined} */ - let statistics = undefined // Compute statistics - if (stats) { - let min_value = undefined - let max_value = undefined - let null_count = 0n - for (const value of values) { - if (value === null || value === undefined) { - null_count++ - continue - } - if (min_value === undefined || value < min_value) { - min_value = value - } - if (max_value === undefined || value > max_value) { - max_value = value - } - } - statistics = { min_value, max_value, null_count } - } + const statistics = stats ? getStatistics(values) : undefined // Write levels to temp buffer const levels = new ByteWriter() - const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls } = writeLevels(levels, schemaPath, values) + const { definition_levels_byte_length, repetition_levels_byte_length, num_nulls } + = writeLevels(levels, schemaPath, values) // dictionary encoding let dictionary_page_offset = undefined /** @type {DecodedArray | undefined} */ - let dictionary = useDictionary(values, dataType) + const dictionary = useDictionary(values, type) if (dictionary) { dictionary_page_offset = BigInt(writer.offset) @@ -64,14 +41,10 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) { indexes[i] = dictionary.indexOf(values[i]) } values = indexes - dataType = 'INT32' - // unconvert dictionary and filter out nulls - dictionary = unconvert(schemaElement, dictionary) - .filter(v => v !== null && v !== undefined) - - // write dictionary page data - writeDictionaryPage(writer, dictionary, type, compressed) + // write unconverted dictionary page + const unconverted = unconvert(schemaElement, dictionary) + writeDictionaryPage(writer, unconverted, type, compressed) } else { // unconvert type and filter out nulls values = unconvert(schemaElement, values) @@ -136,33 +109,6 @@ export function writeColumn(writer, schemaPath, values, compressed, stats) { } } -/** - * @param {Writer} writer - * @param {PageHeader} header - */ -function writePageHeader(writer, header) { - /** @type {ThriftObject} */ - const compact = { - field_1: PageType.indexOf(header.type), - field_2: header.uncompressed_page_size, - field_3: header.compressed_page_size, - field_7: header.dictionary_page_header && { - field_1: header.dictionary_page_header.num_values, - field_2: Encoding.indexOf(header.dictionary_page_header.encoding), - }, - field_8: header.data_page_header_v2 && { - field_1: header.data_page_header_v2.num_values, - field_2: header.data_page_header_v2.num_nulls, - field_3: header.data_page_header_v2.num_rows, - field_4: Encoding.indexOf(header.data_page_header_v2.encoding), - field_5: header.data_page_header_v2.definition_levels_byte_length, - field_6: header.data_page_header_v2.repetition_levels_byte_length, - field_7: header.data_page_header_v2.is_compressed ? undefined : false, // default true - }, - } - serializeTCompactProtocol(writer, compact) -} - /** * @param {DecodedArray} values * @param {ParquetType} type @@ -171,6 +117,8 @@ function writePageHeader(writer, header) { function useDictionary(values, type) { if (type === 'BOOLEAN') return const unique = new Set(values) + unique.delete(undefined) + unique.delete(null) if (values.length > 10 && values.length / unique.size > 0.1) { if (unique.size < values.length) { // TODO: sort by frequency @@ -212,35 +160,26 @@ function writeDictionaryPage(writer, dictionary, type, compressed) { } /** - * @param {Writer} writer - * @param {SchemaElement[]} schemaPath + * @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType, SchemaElement, Statistics} from 'hyparquet' + * @import {Writer} from '../src/types.js' * @param {DecodedArray} values - * @returns {{ definition_levels_byte_length: number, repetition_levels_byte_length: number, num_nulls: number}} + * @returns {Statistics} */ -function writeLevels(writer, schemaPath, values) { - let num_nulls = 0 - - // TODO: repetition levels - const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) - let repetition_levels_byte_length = 0 - if (maxRepetitionLevel) { - repetition_levels_byte_length = writeRleBitPackedHybrid(writer, []) - } - - // definition levels - const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) - let definition_levels_byte_length = 0 - if (maxDefinitionLevel) { - const definitionLevels = [] - for (const value of values) { - if (value === null || value === undefined) { - definitionLevels.push(maxDefinitionLevel - 1) - num_nulls++ - } else { - definitionLevels.push(maxDefinitionLevel) - } +function getStatistics(values) { + let min_value = undefined + let max_value = undefined + let null_count = 0n + for (const value of values) { + if (value === null || value === undefined) { + null_count++ + continue + } + if (min_value === undefined || value < min_value) { + min_value = value + } + if (max_value === undefined || value > max_value) { + max_value = value } - definition_levels_byte_length = writeRleBitPackedHybrid(writer, definitionLevels) } - return { definition_levels_byte_length, repetition_levels_byte_length, num_nulls } + return { min_value, max_value, null_count } } diff --git a/src/datapage.js b/src/datapage.js new file mode 100644 index 0000000..953b2b8 --- /dev/null +++ b/src/datapage.js @@ -0,0 +1,75 @@ +import { Encoding, PageType } from 'hyparquet/src/constants.js' +import { writeRleBitPackedHybrid } from './encoding.js' +import { serializeTCompactProtocol } from './thrift.js' +import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' + +/** + * @import {DecodedArray, PageHeader, SchemaElement} from 'hyparquet' + * @import {Writer} from '../src/types.js' + * @param {Writer} writer + * @param {PageHeader} header + */ +export function writePageHeader(writer, header) { + /** @type {import('../src/types.js').ThriftObject} */ + const compact = { + field_1: PageType.indexOf(header.type), + field_2: header.uncompressed_page_size, + field_3: header.compressed_page_size, + field_4: header.crc, + field_5: header.data_page_header && { + field_1: header.data_page_header.num_values, + field_2: Encoding.indexOf(header.data_page_header.encoding), + field_3: Encoding.indexOf(header.data_page_header.definition_level_encoding), + field_4: Encoding.indexOf(header.data_page_header.repetition_level_encoding), + // field_5: header.data_page_header.statistics, + }, + field_7: header.dictionary_page_header && { + field_1: header.dictionary_page_header.num_values, + field_2: Encoding.indexOf(header.dictionary_page_header.encoding), + }, + field_8: header.data_page_header_v2 && { + field_1: header.data_page_header_v2.num_values, + field_2: header.data_page_header_v2.num_nulls, + field_3: header.data_page_header_v2.num_rows, + field_4: Encoding.indexOf(header.data_page_header_v2.encoding), + field_5: header.data_page_header_v2.definition_levels_byte_length, + field_6: header.data_page_header_v2.repetition_levels_byte_length, + field_7: header.data_page_header_v2.is_compressed ? undefined : false, // default true + }, + } + serializeTCompactProtocol(writer, compact) +} + +/** + * @param {Writer} writer + * @param {SchemaElement[]} schemaPath + * @param {DecodedArray} values + * @returns {{ definition_levels_byte_length: number, repetition_levels_byte_length: number, num_nulls: number}} + */ +export function writeLevels(writer, schemaPath, values) { + let num_nulls = 0 + + // TODO: repetition levels + const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) + let repetition_levels_byte_length = 0 + if (maxRepetitionLevel) { + repetition_levels_byte_length = writeRleBitPackedHybrid(writer, []) + } + + // definition levels + const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) + let definition_levels_byte_length = 0 + if (maxDefinitionLevel) { + const definitionLevels = [] + for (const value of values) { + if (value === null || value === undefined) { + definitionLevels.push(maxDefinitionLevel - 1) + num_nulls++ + } else { + definitionLevels.push(maxDefinitionLevel) + } + } + definition_levels_byte_length = writeRleBitPackedHybrid(writer, definitionLevels) + } + return { definition_levels_byte_length, repetition_levels_byte_length, num_nulls } +} diff --git a/src/metadata.js b/src/metadata.js index 76cdebb..ba21a1c 100644 --- a/src/metadata.js +++ b/src/metadata.js @@ -1,6 +1,6 @@ import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, PageType, ParquetType } from 'hyparquet/src/constants.js' import { serializeTCompactProtocol } from './thrift.js' -import { unconvertMetadata } from './unconvert.js' +import { unconvertStatistics } from './unconvert.js' /** * @import {FileMetaData, LogicalType, TimeUnit} from 'hyparquet' @@ -44,16 +44,7 @@ export function writeMetadata(writer, metadata) { field_9: c.meta_data.data_page_offset, field_10: c.meta_data.index_page_offset, field_11: c.meta_data.dictionary_page_offset, - field_12: c.meta_data.statistics && { - field_1: unconvertMetadata(c.meta_data.statistics.max, metadata.schema[columnIndex + 1]), - field_2: unconvertMetadata(c.meta_data.statistics.min, metadata.schema[columnIndex + 1]), - field_3: c.meta_data.statistics.null_count, - field_4: c.meta_data.statistics.distinct_count, - field_5: unconvertMetadata(c.meta_data.statistics.max_value, metadata.schema[columnIndex + 1]), - field_6: unconvertMetadata(c.meta_data.statistics.min_value, metadata.schema[columnIndex + 1]), - field_7: c.meta_data.statistics.is_max_value_exact, - field_8: c.meta_data.statistics.is_min_value_exact, - }, + field_12: c.meta_data.statistics && unconvertStatistics(c.meta_data.statistics, metadata.schema[columnIndex + 1]), field_13: c.meta_data.encoding_stats && c.meta_data.encoding_stats.map(es => ({ field_1: PageType.indexOf(es.page_type), field_2: Encoding.indexOf(es.encoding), diff --git a/src/unconvert.js b/src/unconvert.js index 3c56cdd..16844e5 100644 --- a/src/unconvert.js +++ b/src/unconvert.js @@ -3,7 +3,7 @@ const dayMillis = 86400000 // 1 day in milliseconds /** * Convert from rich to primitive types. * - * @import {DecodedArray, SchemaElement} from 'hyparquet' + * @import {DecodedArray, SchemaElement, Statistics} from 'hyparquet' * @param {SchemaElement} element * @param {DecodedArray} values * @returns {DecodedArray} @@ -48,7 +48,7 @@ export function unconvert(element, values) { * @param {SchemaElement} element * @returns {Uint8Array | undefined} */ -export function unconvertMetadata(value, element) { +export function unconvertMinMax(value, element) { if (value === undefined || value === null) return undefined const { type, converted_type } = element if (type === 'BOOLEAN') return new Uint8Array([value ? 1 : 0]) @@ -90,6 +90,24 @@ export function unconvertMetadata(value, element) { throw new Error(`unsupported type for statistics: ${type} with value ${value}`) } +/** + * @param {Statistics} stats + * @param {SchemaElement} element + * @returns {import('../src/types.js').ThriftObject} + */ +export function unconvertStatistics(stats, element) { + return { + field_1: unconvertMinMax(stats.max, element), + field_2: unconvertMinMax(stats.min, element), + field_3: stats.null_count, + field_4: stats.distinct_count, + field_5: unconvertMinMax(stats.max_value, element), + field_6: unconvertMinMax(stats.min_value, element), + field_7: stats.is_max_value_exact, + field_8: stats.is_min_value_exact, + } +} + /** * @param {SchemaElement} element * @param {bigint} value diff --git a/test/unconvert.test.js b/test/unconvert.test.js index bd4fcb9..31ff2db 100644 --- a/test/unconvert.test.js +++ b/test/unconvert.test.js @@ -1,5 +1,5 @@ import { describe, expect, it } from 'vitest' -import { unconvert, unconvertDecimal, unconvertMetadata } from '../src/unconvert.js' +import { unconvert, unconvertDecimal, unconvertMinMax } from '../src/unconvert.js' import { convertMetadata } from 'hyparquet/src/metadata.js' /** @@ -60,18 +60,18 @@ describe('unconvert', () => { }) }) -describe('unconvertMetadata', () => { +describe('unconvertMinMax', () => { it('should return undefined if value is undefined or null', () => { /** @type {SchemaElement} */ const schema = { name: 'test', type: 'INT32' } - expect(unconvertMetadata(undefined, schema)).toBeUndefined() + expect(unconvertMinMax(undefined, schema)).toBeUndefined() }) it('should handle BOOLEAN type', () => { /** @type {SchemaElement} */ const schema = { name: 'test', type: 'BOOLEAN' } - expect(unconvertMetadata(true, schema)).toEqual(new Uint8Array([1])) - expect(unconvertMetadata(false, schema)).toEqual(new Uint8Array([0])) + expect(unconvertMinMax(true, schema)).toEqual(new Uint8Array([1])) + expect(unconvertMinMax(false, schema)).toEqual(new Uint8Array([0])) }) it('should truncate BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY to 16 bytes', () => { @@ -80,21 +80,21 @@ describe('unconvertMetadata', () => { const longStrUint8 = new TextEncoder().encode(longStr) // value is a Uint8Array - const result1 = unconvertMetadata(longStrUint8, { name: 'test', type: 'BYTE_ARRAY' }) + const result1 = unconvertMinMax(longStrUint8, { name: 'test', type: 'BYTE_ARRAY' }) expect(result1).toBeInstanceOf(Uint8Array) - expect(result1?.length).toBe(16) // truncated + expect(result1?.length).toBe(16) // value is a string - const result2 = unconvertMetadata(longStr, { name: 'test', type: 'FIXED_LEN_BYTE_ARRAY' }) + const result2 = unconvertMinMax(longStr, { name: 'test', type: 'FIXED_LEN_BYTE_ARRAY' }) expect(result2).toBeInstanceOf(Uint8Array) - expect(result2?.length).toBe(16) // truncated + expect(result2?.length).toBe(16) }) it('should correctly encode FLOAT values in little-endian', () => { /** @type {SchemaElement} */ const schema = { name: 'test', type: 'FLOAT' } const value = 1.5 - const result = unconvertMetadata(value, schema) + const result = unconvertMinMax(value, schema) expect(result).toBeInstanceOf(Uint8Array) const roundtrip = convertMetadata(result, schema) expect(roundtrip).toEqual(1.5) @@ -104,7 +104,7 @@ describe('unconvertMetadata', () => { /** @type {SchemaElement} */ const schema = { name: 'test', type: 'DOUBLE' } const value = 1.123456789 - const result = unconvertMetadata(value, schema) + const result = unconvertMinMax(value, schema) expect(result).toBeInstanceOf(Uint8Array) const roundtrip = convertMetadata(result, schema) expect(roundtrip).toEqual(1.123456789) @@ -114,7 +114,7 @@ describe('unconvertMetadata', () => { /** @type {SchemaElement} */ const schema = { name: 'test', type: 'INT32' } const value = 123456 - const result = unconvertMetadata(value, schema) + const result = unconvertMinMax(value, schema) const roundtrip = convertMetadata(result, schema) expect(roundtrip).toEqual(123456) }) @@ -123,7 +123,7 @@ describe('unconvertMetadata', () => { /** @type {SchemaElement} */ const schema = { name: 'test', type: 'INT64' } const value = 1234567890123456789n - const result = unconvertMetadata(value, schema) + const result = unconvertMinMax(value, schema) const roundtrip = convertMetadata(result, schema) expect(roundtrip).toEqual(1234567890123456789n) }) @@ -132,7 +132,7 @@ describe('unconvertMetadata', () => { /** @type {SchemaElement} */ const schema = { name: 'test', type: 'INT64', converted_type: 'TIMESTAMP_MILLIS' } const date = new Date('2023-01-01T00:00:00Z') - const result = unconvertMetadata(date, schema) + const result = unconvertMinMax(date, schema) const roundtrip = convertMetadata(result, schema) expect(roundtrip).toEqual(date) }) @@ -140,14 +140,14 @@ describe('unconvertMetadata', () => { it('should throw an error for unsupported types', () => { /** @type {SchemaElement} */ const schema = { name: 'test', type: 'INT96' } - expect(() => unconvertMetadata(123, schema)) + expect(() => unconvertMinMax(123, schema)) .toThrow('unsupported type for statistics: INT96 with value 123') }) it('should throw an error for INT64 if value is a number instead of bigint or Date', () => { /** @type {SchemaElement} */ const schema = { name: 'test', type: 'INT64' } - expect(() => unconvertMetadata(123, schema)) + expect(() => unconvertMinMax(123, schema)) .toThrow('unsupported type for statistics: INT64 with value 123') }) })