Write statistics

This commit is contained in:
Kenny Daniel 2025-04-03 13:21:57 -07:00
parent ae829f7c9b
commit 2bd2e396d2
No known key found for this signature in database
GPG Key ID: FDF16101AF5AFD3A
8 changed files with 230 additions and 17 deletions

@ -26,9 +26,10 @@ const arrayBuffer = parquetWrite({
## Options
- `compression`: Boolean defaults to `true`. Set to `false` to disable snappy compression.
- `rowGroupSize`: Integer number of rows to include in each row group.
- `kvMetadata`: Extra key-value metadata to store in the parquet footer.
- `compression`: use snappy compression (default true)
- `statistics`: write column statistics (default true)
- `rowGroupSize`: number of rows in each row group (default 100000)
- `kvMetadata`: extra key-value metadata
## References

@ -8,20 +8,46 @@ import { serializeTCompactProtocol } from './thrift.js'
import { Writer } from './writer.js'
/**
* @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType, SchemaElement} from 'hyparquet'
* @import {ColumnMetaData, DecodedArray, PageHeader, ParquetType, SchemaElement, Statistics} from 'hyparquet'
* @param {Writer} writer
* @param {SchemaElement[]} schemaPath
* @param {DecodedArray} values
* @param {boolean} compressed
* @param {boolean} stats
* @returns {ColumnMetaData}
*/
export function writeColumn(writer, schemaPath, values, compressed) {
export function writeColumn(writer, schemaPath, values, compressed, stats) {
const schemaElement = schemaPath[schemaPath.length - 1]
const { type } = schemaElement
if (!type) throw new Error(`column ${schemaElement.name} cannot determine type`)
let dataType = type
const offsetStart = writer.offset
const num_values = values.length
/** @type {Statistics | undefined} */
let statistics = undefined
// Compute statistics
if (stats) {
statistics = {
min_value: undefined,
max_value: undefined,
null_count: 0n,
}
let null_count = 0n
for (const value of values) {
if (value === null || value === undefined) {
null_count++
continue
}
if (statistics.min_value === undefined || value < statistics.min_value) {
statistics.min_value = value
}
if (statistics.max_value === undefined || value > statistics.max_value) {
statistics.max_value = value
}
}
statistics.null_count = null_count
}
// Write levels to temp buffer
const levels = new Writer()
@ -108,6 +134,7 @@ export function writeColumn(writer, schemaPath, values, compressed) {
total_uncompressed_size: BigInt(writer.offset - offsetStart),
data_page_offset,
dictionary_page_offset,
statistics,
}
}

@ -1,5 +1,6 @@
import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, PageType, ParquetType } from 'hyparquet/src/constants.js'
import { serializeTCompactProtocol } from './thrift.js'
import { unconvertMetadata } from './unconvert.js'
/**
* @import {FileMetaData} from 'hyparquet'
@ -24,7 +25,7 @@ export function writeMetadata(writer, metadata) {
})),
field_3: metadata.num_rows,
field_4: metadata.row_groups.map(rg => ({
field_1: rg.columns.map(c => ({
field_1: rg.columns.map((c, columnIndex) => ({
field_1: c.file_path,
field_2: c.file_offset,
field_3: c.meta_data && {
@ -39,7 +40,16 @@ export function writeMetadata(writer, metadata) {
field_9: c.meta_data.data_page_offset,
field_10: c.meta_data.index_page_offset,
field_11: c.meta_data.dictionary_page_offset,
field_12: c.meta_data.statistics,
field_12: c.meta_data.statistics && {
field_1: unconvertMetadata(c.meta_data.statistics.max, metadata.schema[columnIndex + 1]),
field_2: unconvertMetadata(c.meta_data.statistics.min, metadata.schema[columnIndex + 1]),
field_3: c.meta_data.statistics.null_count,
field_4: c.meta_data.statistics.distinct_count,
field_5: unconvertMetadata(c.meta_data.statistics.max_value, metadata.schema[columnIndex + 1]),
field_6: unconvertMetadata(c.meta_data.statistics.min_value, metadata.schema[columnIndex + 1]),
field_7: c.meta_data.statistics.is_max_value_exact,
field_8: c.meta_data.statistics.is_min_value_exact,
},
field_13: c.meta_data.encoding_stats && c.meta_data.encoding_stats.map(es => ({
field_1: PageType.indexOf(es.page_type),
field_2: Encoding.indexOf(es.encoding),

@ -24,3 +24,47 @@ export function unconvert(schemaElement, values) {
}
return values
}
/**
* Uncovert from rich type to byte array for metadata statistics.
*
* @param {import('hyparquet/src/types.js').MinMaxType | undefined} value
* @param {SchemaElement} schema
* @returns {Uint8Array | undefined}
*/
export function unconvertMetadata(value, schema) {
if (value === undefined || value === null) return undefined
const { type, converted_type } = schema
if (type === 'BOOLEAN') return new Uint8Array([value ? 1 : 0])
if (type === 'BYTE_ARRAY' || type === 'FIXED_LEN_BYTE_ARRAY') {
// truncate byte arrays to 16 bytes for statistics
if (value instanceof Uint8Array) return value.slice(0, 16)
return new TextEncoder().encode(value.toString().slice(0, 16))
}
if (type === 'FLOAT' && typeof value === 'number') {
const buffer = new ArrayBuffer(4)
new DataView(buffer).setFloat32(0, value, true)
return new Uint8Array(buffer)
}
if (type === 'DOUBLE' && typeof value === 'number') {
const buffer = new ArrayBuffer(8)
new DataView(buffer).setFloat64(0, value, true)
return new Uint8Array(buffer)
}
if (type === 'INT32' && typeof value === 'number') {
const buffer = new ArrayBuffer(4)
new DataView(buffer).setInt32(0, value, true)
return new Uint8Array(buffer)
}
if (type === 'INT64' && typeof value === 'bigint') {
const buffer = new ArrayBuffer(8)
new DataView(buffer).setBigInt64(0, value, true)
return new Uint8Array(buffer)
}
if (type === 'INT64' && converted_type === 'TIMESTAMP_MILLIS' && value instanceof Date) {
const buffer = new ArrayBuffer(8)
new DataView(buffer).setBigInt64(0, BigInt(value.getTime()), true)
return new Uint8Array(buffer)
}
throw new Error(`unsupported type for statistics: ${type} with value ${value}`)
}

@ -12,11 +12,12 @@ import { getSchemaElementForValues } from './schema.js'
* @param {object} options
* @param {ColumnData[]} options.columnData
* @param {boolean} [options.compressed]
* @param {boolean} [options.statistics]
* @param {number} [options.rowGroupSize]
* @param {KeyValue[]} [options.kvMetadata]
* @returns {ArrayBuffer}
*/
export function parquetWrite({ columnData, compressed = true, rowGroupSize = 100000, kvMetadata }) {
export function parquetWrite({ columnData, compressed = true, statistics = true, rowGroupSize = 100000, kvMetadata }) {
const num_rows = columnData.length ? BigInt(columnData[0].data.length) : 0n
const writer = new Writer()
@ -54,7 +55,7 @@ export function parquetWrite({ columnData, compressed = true, rowGroupSize = 100
const { name, data } = columnData[i]
const file_offset = BigInt(writer.offset)
const schemaPath = [schema[0], schema[i + 1]]
const meta_data = writeColumn(writer, schemaPath, data, compressed)
const meta_data = writeColumn(writer, schemaPath, data, compressed, statistics)
// save metadata
columns.push({

@ -34,6 +34,11 @@ export const exampleMetadata = {
total_uncompressed_size: 24n,
total_compressed_size: 24n,
data_page_offset: 4n,
statistics: {
null_count: 0n,
min_value: false,
max_value: true,
},
},
},
{
@ -48,6 +53,11 @@ export const exampleMetadata = {
total_uncompressed_size: 39n,
total_compressed_size: 39n,
data_page_offset: 28n,
statistics: {
null_count: 0n,
min_value: 0,
max_value: 0x7fffffff,
},
},
},
{
@ -62,6 +72,11 @@ export const exampleMetadata = {
total_uncompressed_size: 43n,
total_compressed_size: 43n,
data_page_offset: 67n,
statistics: {
null_count: 0n,
min_value: 0n,
max_value: 0x7fffffffffffffffn,
},
},
},
{
@ -76,6 +91,11 @@ export const exampleMetadata = {
total_uncompressed_size: 51n,
total_compressed_size: 51n,
data_page_offset: 110n,
statistics: {
null_count: 0n,
min_value: 0,
max_value: 1e100,
},
},
},
{
@ -90,6 +110,11 @@ export const exampleMetadata = {
total_uncompressed_size: 42n,
total_compressed_size: 42n,
data_page_offset: 161n,
statistics: {
null_count: 0n,
min_value: 'a',
max_value: 'd',
},
},
},
{
@ -104,13 +129,18 @@ export const exampleMetadata = {
total_uncompressed_size: 26n,
total_compressed_size: 26n,
data_page_offset: 203n,
statistics: {
null_count: 2n,
min_value: false,
max_value: true,
},
},
},
],
total_byte_size: 225n,
num_rows: 4n,
}],
metadata_length: 338,
metadata_length: 432,
}
describe('writeMetadata', () => {
@ -127,7 +157,7 @@ describe('writeMetadata', () => {
{ key: 'key1', value: 'value1' },
{ key: 'key2', value: 'value2' },
],
metadata_length: 370,
metadata_length: 464,
}
writeMetadata(writer, withKvMetadata)

@ -1,5 +1,6 @@
import { describe, expect, it } from 'vitest'
import { unconvert } from '../src/unconvert.js'
import { unconvert, unconvertMetadata } from '../src/unconvert.js'
import { convertMetadata } from 'hyparquet/src/metadata.js'
/**
* @import {SchemaElement} from 'hyparquet'
@ -62,3 +63,95 @@ describe('unconvert', () => {
expect(result).toEqual(input)
})
})
describe('unconvertMetadata', () => {
it('should return undefined if value is undefined or null', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'INT32' }
expect(unconvertMetadata(undefined, schema)).toBeUndefined()
})
it('should handle BOOLEAN type', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'BOOLEAN' }
expect(unconvertMetadata(true, schema)).toEqual(new Uint8Array([1]))
expect(unconvertMetadata(false, schema)).toEqual(new Uint8Array([0]))
})
it('should truncate BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY to 16 bytes', () => {
// longer string to test truncation
const longStr = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
const longStrUint8 = new TextEncoder().encode(longStr)
// value is a Uint8Array
const result1 = unconvertMetadata(longStrUint8, { name: 'test', type: 'BYTE_ARRAY' })
expect(result1).toBeInstanceOf(Uint8Array)
expect(result1?.length).toBe(16) // truncated
// value is a string
const result2 = unconvertMetadata(longStr, { name: 'test', type: 'FIXED_LEN_BYTE_ARRAY' })
expect(result2).toBeInstanceOf(Uint8Array)
expect(result2?.length).toBe(16) // truncated
})
it('should correctly encode FLOAT values in little-endian', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'FLOAT' }
const value = 1.5
const result = unconvertMetadata(value, schema)
expect(result).toBeInstanceOf(Uint8Array)
const roundtrip = convertMetadata(result, schema)
expect(roundtrip).toEqual(1.5)
})
it('should correctly encode DOUBLE values in little-endian', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'DOUBLE' }
const value = 1.123456789
const result = unconvertMetadata(value, schema)
expect(result).toBeInstanceOf(Uint8Array)
const roundtrip = convertMetadata(result, schema)
expect(roundtrip).toEqual(1.123456789)
})
it('should correctly encode INT32 values in little-endian', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'INT32' }
const value = 123456
const result = unconvertMetadata(value, schema)
const roundtrip = convertMetadata(result, schema)
expect(roundtrip).toEqual(123456)
})
it('should correctly encode INT64 values when given a bigint', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'INT64' }
const value = 1234567890123456789n
const result = unconvertMetadata(value, schema)
const roundtrip = convertMetadata(result, schema)
expect(roundtrip).toEqual(1234567890123456789n)
})
it('should correctly encode a Date as TIMESTAMP_MILLIS for INT64', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'INT64', converted_type: 'TIMESTAMP_MILLIS' }
const date = new Date('2023-01-01T00:00:00Z')
const result = unconvertMetadata(date, schema)
const roundtrip = convertMetadata(result, schema)
expect(roundtrip).toEqual(date)
})
it('should throw an error for unsupported types', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'INT96' }
expect(() => unconvertMetadata(123, schema))
.toThrow('unsupported type for statistics: INT96 with value 123')
})
it('should throw an error for INT64 if value is a number instead of bigint or Date', () => {
/** @type {SchemaElement} */
const schema = { name: 'test', type: 'INT64' }
expect(() => unconvertMetadata(123, schema))
.toThrow('unsupported type for statistics: INT64 with value 123')
})
})

@ -48,9 +48,9 @@ describe('parquetWrite', () => {
bool[500] = true
bool[9999] = false
const file = parquetWrite({ columnData: [{ name: 'bool', data: bool }] })
expect(file.byteLength).toBe(148)
expect(file.byteLength).toBe(160)
const metadata = parquetMetadata(file)
expect(metadata.metadata_length).toBe(86)
expect(metadata.metadata_length).toBe(98)
const result = await parquetReadObjects({ file })
expect(result.length).toBe(10000)
expect(result[0]).toEqual({ bool: null })
@ -64,21 +64,21 @@ describe('parquetWrite', () => {
it('efficiently serializes long string', () => {
const str = 'a'.repeat(10000)
const file = parquetWrite({ columnData: [{ name: 'string', data: [str] }] })
expect(file.byteLength).toBe(606)
expect(file.byteLength).toBe(646)
})
it('less efficiently serializes string without compression', () => {
const str = 'a'.repeat(10000)
const columnData = [{ name: 'string', data: [str] }]
const file = parquetWrite({ columnData, compressed: false })
expect(file.byteLength).toBe(10135)
expect(file.byteLength).toBe(10175)
})
it('efficiently serializes column with few distinct values', async () => {
const data = Array(100000)
.fill('aaaa', 0, 50000)
.fill('bbbb', 50000, 100000)
const file = parquetWrite({ columnData: [{ name: 'string', data }] })
const file = parquetWrite({ columnData: [{ name: 'string', data }], statistics: false })
expect(file.byteLength).toBe(178)
// round trip
const result = await parquetReadObjects({ file })
@ -87,6 +87,13 @@ describe('parquetWrite', () => {
expect(result[50000]).toEqual({ string: 'bbbb' })
})
it('writes statistics when enabled', () => {
const withStats = parquetWrite({ columnData: basicData, statistics: true })
const noStats = parquetWrite({ columnData: basicData, statistics: false })
expect(withStats.byteLength).toBe(669)
expect(noStats.byteLength).toBe(575)
})
it('serializes list types', async () => {
const result = await roundTripDeserialize([{
name: 'list',