From 2bd2e396d2f6d0b913965e60956056729ad0308c Mon Sep 17 00:00:00 2001
From: Kenny Daniel
Date: Thu, 3 Apr 2025 13:21:57 -0700
Subject: [PATCH] Write statistics

---
 README.md              |  7 ++--
 src/column.js          | 31 +++++++++++++-
 src/metadata.js        | 14 ++++++-
 src/unconvert.js       | 44 +++++++++++++++++++
 src/write.js           |  5 ++-
 test/metadata.test.js  | 34 ++++++++++++++-
 test/unconvert.test.js | 95 +++++++++++++++++++++++++++++++++++++++++-
 test/write.test.js     | 17 +++---
 8 files changed, 230 insertions(+), 17 deletions(-)

diff --git a/README.md b/README.md
index 18a3b54..4697851 100644
--- a/README.md
+++ b/README.md
@@ -26,9 +26,10 @@ const arrayBuffer = parquetWrite({
 
 ## Options
 
- - `compression`: Boolean defaults to `true`. Set to `false` to disable snappy compression.
- - `rowGroupSize`: Integer number of rows to include in each row group.
- - `kvMetadata`: Extra key-value metadata to store in the parquet footer.
+ - `compression`: use snappy compression (default true)
+ - `statistics`: write column statistics (default true)
+ - `rowGroupSize`: number of rows in each row group (default 100000)
+ - `kvMetadata`: extra key-value metadata
 
 ## References
 
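These options correspond to the `parquetWrite` signature updated in `src/write.js` below. A minimal usage sketch of the new flag (the `hyparquet-writer` import path and the sample columns are illustrative assumptions, not part of this patch):

```js
import { parquetWrite } from 'hyparquet-writer' // assumed package entry point

// Statistics are written by default; statistics: false skips them and shrinks
// the footer (see the byte-size assertions in test/write.test.js below).
const arrayBuffer = parquetWrite({
  columnData: [
    { name: 'id', data: [1, 2, 3, 4] },
    { name: 'name', data: ['alice', 'bob', 'carol', 'dave'] },
  ],
  statistics: false,
})
```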
diff --git a/src/column.js b/src/column.js
index 41e8ada..8a3aeac 100644
--- a/src/column.js
+++ b/src/column.js
@@ -8,20 +8,46 @@ import { serializeTCompactProtocol } from './thrift.js'
 import { Writer } from './writer.js'
 
 /**
- * @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType, SchemaElement} from 'hyparquet'
+ * @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType, SchemaElement, Statistics} from 'hyparquet'
  * @param {Writer} writer
  * @param {SchemaElement[]} schemaPath
  * @param {DecodedArray} values
  * @param {boolean} compressed
+ * @param {boolean} stats
  * @returns {ColumnMetaData}
  */
-export function writeColumn(writer, schemaPath, values, compressed) {
+export function writeColumn(writer, schemaPath, values, compressed, stats) {
   const schemaElement = schemaPath[schemaPath.length - 1]
   const { type } = schemaElement
   if (!type) throw new Error(`column ${schemaElement.name} cannot determine type`)
   let dataType = type
   const offsetStart = writer.offset
   const num_values = values.length
+  /** @type {Statistics | undefined} */
+  let statistics = undefined
+
+  // Compute statistics
+  if (stats) {
+    statistics = {
+      min_value: undefined,
+      max_value: undefined,
+      null_count: 0n,
+    }
+    let null_count = 0n
+    for (const value of values) {
+      if (value === null || value === undefined) {
+        null_count++
+        continue
+      }
+      if (statistics.min_value === undefined || value < statistics.min_value) {
+        statistics.min_value = value
+      }
+      if (statistics.max_value === undefined || value > statistics.max_value) {
+        statistics.max_value = value
+      }
+    }
+    statistics.null_count = null_count
+  }
 
   // Write levels to temp buffer
   const levels = new Writer()
@@ -108,6 +134,7 @@ export function writeColumn(writer, schemaPath, values, compressed) {
     total_uncompressed_size: BigInt(writer.offset - offsetStart),
     data_page_offset,
     dictionary_page_offset,
+    statistics,
   }
 }
 
diff --git a/src/metadata.js b/src/metadata.js
index 4ba9025..f6a8456 100644
--- a/src/metadata.js
+++ b/src/metadata.js
@@ -1,5 +1,6 @@
 import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, PageType, ParquetType } from 'hyparquet/src/constants.js'
 import { serializeTCompactProtocol } from './thrift.js'
+import { unconvertMetadata } from './unconvert.js'
 
 /**
  * @import {FileMetaData} from 'hyparquet'
@@ -24,7 +25,7 @@ export function writeMetadata(writer, metadata) {
     })),
     field_3: metadata.num_rows,
     field_4: metadata.row_groups.map(rg => ({
-      field_1: rg.columns.map(c => ({
+      field_1: rg.columns.map((c, columnIndex) => ({
         field_1: c.file_path,
         field_2: c.file_offset,
         field_3: c.meta_data && {
@@ -39,7 +40,16 @@ export function writeMetadata(writer, metadata) {
           field_9: c.meta_data.data_page_offset,
           field_10: c.meta_data.index_page_offset,
           field_11: c.meta_data.dictionary_page_offset,
-          field_12: c.meta_data.statistics,
+          field_12: c.meta_data.statistics && {
+            field_1: unconvertMetadata(c.meta_data.statistics.max, metadata.schema[columnIndex + 1]),
+            field_2: unconvertMetadata(c.meta_data.statistics.min, metadata.schema[columnIndex + 1]),
+            field_3: c.meta_data.statistics.null_count,
+            field_4: c.meta_data.statistics.distinct_count,
+            field_5: unconvertMetadata(c.meta_data.statistics.max_value, metadata.schema[columnIndex + 1]),
+            field_6: unconvertMetadata(c.meta_data.statistics.min_value, metadata.schema[columnIndex + 1]),
+            field_7: c.meta_data.statistics.is_max_value_exact,
+            field_8: c.meta_data.statistics.is_min_value_exact,
+          },
           field_13: c.meta_data.encoding_stats && c.meta_data.encoding_stats.map(es => ({
             field_1: PageType.indexOf(es.page_type),
             field_2: Encoding.indexOf(es.encoding),
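Together, `writeColumn` above computes per-column `min_value`, `max_value`, and `null_count`, and `writeMetadata` serializes them into the thrift `Statistics` struct (field 3 is the null count, fields 5 and 6 are `max_value` and `min_value`). A hypothetical round trip through hyparquet's `parquetMetadata`, with made-up column data and an assumed import path, would look roughly like:

```js
import { parquetMetadata } from 'hyparquet'
import { parquetWrite } from 'hyparquet-writer' // assumed entry point

// One column containing a null: expect min_value 1, max_value 3, null_count 1n.
const file = parquetWrite({ columnData: [{ name: 'n', data: [3, null, 1, 2] }] })
const { row_groups } = parquetMetadata(file)
console.log(row_groups[0].columns[0].meta_data?.statistics)
// -> roughly { null_count: 1n, min_value: 1, max_value: 3 }
```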
diff --git a/src/unconvert.js b/src/unconvert.js
index 6973990..51a356c 100644
--- a/src/unconvert.js
+++ b/src/unconvert.js
@@ -24,3 +24,47 @@ export function unconvert(schemaElement, values) {
   }
   return values
 }
+
+/**
+ * Unconvert from rich type to byte array for metadata statistics.
+ *
+ * @param {import('hyparquet/src/types.js').MinMaxType | undefined} value
+ * @param {SchemaElement} schema
+ * @returns {Uint8Array | undefined}
+ */
+export function unconvertMetadata(value, schema) {
+  if (value === undefined || value === null) return undefined
+  const { type, converted_type } = schema
+  if (type === 'BOOLEAN') return new Uint8Array([value ? 1 : 0])
+  if (type === 'BYTE_ARRAY' || type === 'FIXED_LEN_BYTE_ARRAY') {
+    // truncate byte arrays to 16 bytes for statistics
+    if (value instanceof Uint8Array) return value.slice(0, 16)
+    return new TextEncoder().encode(value.toString().slice(0, 16))
+  }
+  if (type === 'FLOAT' && typeof value === 'number') {
+    const buffer = new ArrayBuffer(4)
+    new DataView(buffer).setFloat32(0, value, true)
+    return new Uint8Array(buffer)
+  }
+  if (type === 'DOUBLE' && typeof value === 'number') {
+    const buffer = new ArrayBuffer(8)
+    new DataView(buffer).setFloat64(0, value, true)
+    return new Uint8Array(buffer)
+  }
+  if (type === 'INT32' && typeof value === 'number') {
+    const buffer = new ArrayBuffer(4)
+    new DataView(buffer).setInt32(0, value, true)
+    return new Uint8Array(buffer)
+  }
+  if (type === 'INT64' && typeof value === 'bigint') {
+    const buffer = new ArrayBuffer(8)
+    new DataView(buffer).setBigInt64(0, value, true)
+    return new Uint8Array(buffer)
+  }
+  if (type === 'INT64' && converted_type === 'TIMESTAMP_MILLIS' && value instanceof Date) {
+    const buffer = new ArrayBuffer(8)
+    new DataView(buffer).setBigInt64(0, BigInt(value.getTime()), true)
+    return new Uint8Array(buffer)
+  }
+  throw new Error(`unsupported type for statistics: ${type} with value ${value}`)
+}
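As a quick sanity check of the little-endian encoding performed by `unconvertMetadata` (values chosen for illustration; the import path is an assumption):

```js
import { unconvertMetadata } from 'hyparquet-writer/src/unconvert.js' // assumed path

unconvertMetadata(123456, { name: 'n', type: 'INT32' })
// -> Uint8Array [0x40, 0xe2, 0x01, 0x00], since 123456 = 0x0001e240 little-endian

unconvertMetadata(true, { name: 'b', type: 'BOOLEAN' })
// -> Uint8Array [1]

unconvertMetadata('ABCDEFGHIJKLMNOPQRSTUVWXYZ', { name: 's', type: 'BYTE_ARRAY' })
// -> 16-byte Uint8Array for 'ABCDEFGHIJKLMNOP' (statistics values are truncated to 16 bytes)
```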
diff --git a/src/write.js b/src/write.js
index ed12ac2..0bffb47 100644
--- a/src/write.js
+++ b/src/write.js
@@ -12,11 +12,12 @@ import { getSchemaElementForValues } from './schema.js'
  * @param {object} options
  * @param {ColumnData[]} options.columnData
  * @param {boolean} [options.compressed]
+ * @param {boolean} [options.statistics]
  * @param {number} [options.rowGroupSize]
  * @param {KeyValue[]} [options.kvMetadata]
  * @returns {ArrayBuffer}
  */
-export function parquetWrite({ columnData, compressed = true, rowGroupSize = 100000, kvMetadata }) {
+export function parquetWrite({ columnData, compressed = true, statistics = true, rowGroupSize = 100000, kvMetadata }) {
   const num_rows = columnData.length ? BigInt(columnData[0].data.length) : 0n
   const writer = new Writer()
 
@@ -54,7 +55,7 @@ export function parquetWrite({ columnData, compressed = true, rowGroupSize = 100
     const { name, data } = columnData[i]
     const file_offset = BigInt(writer.offset)
     const schemaPath = [schema[0], schema[i + 1]]
-    const meta_data = writeColumn(writer, schemaPath, data, compressed)
+    const meta_data = writeColumn(writer, schemaPath, data, compressed, statistics)
 
     // save metadata
     columns.push({
diff --git a/test/metadata.test.js b/test/metadata.test.js
index 1543e78..3a3d2f5 100644
--- a/test/metadata.test.js
+++ b/test/metadata.test.js
@@ -34,6 +34,11 @@ export const exampleMetadata = {
          total_uncompressed_size: 24n,
          total_compressed_size: 24n,
          data_page_offset: 4n,
+         statistics: {
+           null_count: 0n,
+           min_value: false,
+           max_value: true,
+         },
        },
      },
      {
@@ -48,6 +53,11 @@ export const exampleMetadata = {
          total_uncompressed_size: 39n,
          total_compressed_size: 39n,
          data_page_offset: 28n,
+         statistics: {
+           null_count: 0n,
+           min_value: 0,
+           max_value: 0x7fffffff,
+         },
        },
      },
      {
@@ -62,6 +72,11 @@ export const exampleMetadata = {
          total_uncompressed_size: 43n,
          total_compressed_size: 43n,
          data_page_offset: 67n,
+         statistics: {
+           null_count: 0n,
+           min_value: 0n,
+           max_value: 0x7fffffffffffffffn,
+         },
        },
      },
      {
@@ -76,6 +91,11 @@ export const exampleMetadata = {
          total_uncompressed_size: 51n,
          total_compressed_size: 51n,
          data_page_offset: 110n,
+         statistics: {
+           null_count: 0n,
+           min_value: 0,
+           max_value: 1e100,
+         },
        },
      },
      {
@@ -90,6 +110,11 @@ export const exampleMetadata = {
          total_uncompressed_size: 42n,
          total_compressed_size: 42n,
          data_page_offset: 161n,
+         statistics: {
+           null_count: 0n,
+           min_value: 'a',
+           max_value: 'd',
+         },
        },
      },
      {
@@ -104,13 +129,18 @@ export const exampleMetadata = {
          total_uncompressed_size: 26n,
          total_compressed_size: 26n,
          data_page_offset: 203n,
+         statistics: {
+           null_count: 2n,
+           min_value: false,
+           max_value: true,
+         },
        },
      },
    ],
    total_byte_size: 225n,
    num_rows: 4n,
  }],
-  metadata_length: 338,
+  metadata_length: 432,
 }
 
 describe('writeMetadata', () => {
@@ -127,7 +157,7 @@ describe('writeMetadata', () => {
       { key: 'key1', value: 'value1' },
       { key: 'key2', value: 'value2' },
     ],
-    metadata_length: 370,
+    metadata_length: 464,
   }
 
   writeMetadata(writer, withKvMetadata)
diff --git a/test/unconvert.test.js b/test/unconvert.test.js
index 79180b0..4c4741d 100644
--- a/test/unconvert.test.js
+++ b/test/unconvert.test.js
@@ -1,5 +1,6 @@
 import { describe, expect, it } from 'vitest'
-import { unconvert } from '../src/unconvert.js'
+import { unconvert, unconvertMetadata } from '../src/unconvert.js'
+import { convertMetadata } from 'hyparquet/src/metadata.js'
 
 /**
  * @import {SchemaElement} from 'hyparquet'
@@ -62,3 +63,95 @@
     expect(result).toEqual(input)
   })
 })
+
+describe('unconvertMetadata', () => {
+  it('should return undefined if value is undefined or null', () => {
+    /** @type {SchemaElement} */
+    const schema = { name: 'test', type: 'INT32' }
+    expect(unconvertMetadata(undefined, schema)).toBeUndefined()
+  })
+
+  it('should handle BOOLEAN type', () => {
+    /** @type {SchemaElement} */
+    const schema = { name: 'test', type: 'BOOLEAN' }
+    expect(unconvertMetadata(true, schema)).toEqual(new Uint8Array([1]))
+    expect(unconvertMetadata(false, schema)).toEqual(new Uint8Array([0]))
+  })
+
+  it('should truncate BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY to 16 bytes', () => {
+    // longer string to test truncation
+    const longStr = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
+    const longStrUint8 = new TextEncoder().encode(longStr)
+
+    // value is a Uint8Array
+    const result1 = unconvertMetadata(longStrUint8, { name: 'test', type: 'BYTE_ARRAY' })
+    expect(result1).toBeInstanceOf(Uint8Array)
+    expect(result1?.length).toBe(16) // truncated
+
+    // value is a string
+    const result2 = unconvertMetadata(longStr, { name: 'test', type: 'FIXED_LEN_BYTE_ARRAY' })
+    expect(result2).toBeInstanceOf(Uint8Array)
+    expect(result2?.length).toBe(16) // truncated
+  })
+
+  it('should correctly encode FLOAT values in little-endian', () => {
+    /** @type {SchemaElement} */
+    const schema = { name: 'test', type: 'FLOAT' }
+    const value = 1.5
+    const result = unconvertMetadata(value, schema)
+    expect(result).toBeInstanceOf(Uint8Array)
+    const roundtrip = convertMetadata(result, schema)
+    expect(roundtrip).toEqual(1.5)
+  })
+
+  it('should correctly encode DOUBLE values in little-endian', () => {
+    /** @type {SchemaElement} */
+    const schema = { name: 'test', type: 'DOUBLE' }
+    const value = 1.123456789
+    const result = unconvertMetadata(value, schema)
+    expect(result).toBeInstanceOf(Uint8Array)
+    const roundtrip = convertMetadata(result, schema)
+    expect(roundtrip).toEqual(1.123456789)
+  })
+
+  it('should correctly encode INT32 values in little-endian', () => {
+    /** @type {SchemaElement} */
+    const schema = { name: 'test', type: 'INT32' }
+    const value = 123456
+    const result = unconvertMetadata(value, schema)
+    const roundtrip = convertMetadata(result, schema)
+    expect(roundtrip).toEqual(123456)
+  })
+
+  it('should correctly encode INT64 values when given a bigint', () => {
+    /** @type {SchemaElement} */
+    const schema = { name: 'test', type: 'INT64' }
+    const value = 1234567890123456789n
+    const result = unconvertMetadata(value, schema)
+    const roundtrip = convertMetadata(result, schema)
+    expect(roundtrip).toEqual(1234567890123456789n)
+  })
+
+  it('should correctly encode a Date as TIMESTAMP_MILLIS for INT64', () => {
+    /** @type {SchemaElement} */
+    const schema = { name: 'test', type: 'INT64', converted_type: 'TIMESTAMP_MILLIS' }
+    const date = new Date('2023-01-01T00:00:00Z')
+    const result = unconvertMetadata(date, schema)
+    const roundtrip = convertMetadata(result, schema)
+    expect(roundtrip).toEqual(date)
+  })
+
+  it('should throw an error for unsupported types', () => {
+    /** @type {SchemaElement} */
+    const schema = { name: 'test', type: 'INT96' }
+    expect(() => unconvertMetadata(123, schema))
+      .toThrow('unsupported type for statistics: INT96 with value 123')
+  })
+
+  it('should throw an error for INT64 if value is a number instead of bigint or Date', () => {
+    /** @type {SchemaElement} */
+    const schema = { name: 'test', type: 'INT64' }
+    expect(() => unconvertMetadata(123, schema))
+      .toThrow('unsupported type for statistics: INT64 with value 123')
+  })
+})
diff --git a/test/write.test.js b/test/write.test.js
index 2bd5d3f..6ca53dc 100644
--- a/test/write.test.js
+++ b/test/write.test.js
@@ -48,9 +48,9 @@ describe('parquetWrite', () => {
     bool[500] = true
     bool[9999] = false
     const file = parquetWrite({ columnData: [{ name: 'bool', data: bool }] })
-    expect(file.byteLength).toBe(148)
+    expect(file.byteLength).toBe(160)
     const metadata = parquetMetadata(file)
-    expect(metadata.metadata_length).toBe(86)
+    expect(metadata.metadata_length).toBe(98)
     const result = await parquetReadObjects({ file })
     expect(result.length).toBe(10000)
     expect(result[0]).toEqual({ bool: null })
@@ -64,21 +64,21 @@ describe('parquetWrite', () => {
   it('efficiently serializes long string', () => {
     const str = 'a'.repeat(10000)
     const file = parquetWrite({ columnData: [{ name: 'string', data: [str] }] })
-    expect(file.byteLength).toBe(606)
+    expect(file.byteLength).toBe(646)
   })
 
   it('less efficiently serializes string without compression', () => {
     const str = 'a'.repeat(10000)
     const columnData = [{ name: 'string', data: [str] }]
     const file = parquetWrite({ columnData, compressed: false })
-    expect(file.byteLength).toBe(10135)
+    expect(file.byteLength).toBe(10175)
   })
 
   it('efficiently serializes column with few distinct values', async () => {
     const data = Array(100000)
       .fill('aaaa', 0, 50000)
       .fill('bbbb', 50000, 100000)
-    const file = parquetWrite({ columnData: [{ name: 'string', data }] })
+    const file = parquetWrite({ columnData: [{ name: 'string', data }], statistics: false })
     expect(file.byteLength).toBe(178)
     // round trip
     const result = await parquetReadObjects({ file })
@@ -87,6 +87,13 @@ describe('parquetWrite', () => {
     expect(result[50000]).toEqual({ string: 'bbbb' })
   })
 
+  it('writes statistics when enabled', () => {
+    const withStats = parquetWrite({ columnData: basicData, statistics: true })
+    const noStats = parquetWrite({ columnData: basicData, statistics: false })
+    expect(withStats.byteLength).toBe(669)
+    expect(noStats.byteLength).toBe(575)
+  })
+
   it('serializes list types', async () => {
     const result = await roundTripDeserialize([{ name: 'list',