Parquet metadata parser

This commit is contained in:
Kenny Daniel 2024-01-03 09:56:17 -08:00
parent a032785ea6
commit 88ff71d924
No known key found for this signature in database
GPG Key ID: 6A3C5E318BE71391
5 changed files with 262 additions and 8 deletions

@ -9,6 +9,8 @@ JavaScript parser for [Apache Parquet](https://parquet.apache.org) files.
Apache Parquet is an open source, column-oriented data file format designed for efficient data storage and retrieval.
Dependency free since 2023!
## References
- https://github.com/apache/parquet-format

@ -1,3 +1,5 @@
import { parquetMetadata } from './metadata'
/**
* Read parquet data rows from a file
*
@ -5,5 +7,6 @@
* @returns array of rows
*/
export function parquetRead(arrayBuffer: ArrayBuffer): any[][] {
  // Parse the footer metadata first; parquetMetadata throws if the buffer
  // is not a structurally valid parquet file.
  const metadata = parquetMetadata(arrayBuffer)
  // TODO: decode row group / column data using the parsed metadata
  throw new Error('not implemented')
}

112
src/metadata.ts Normal file

@ -0,0 +1,112 @@
import { deserializeTCompactProtocol } from './thrift'
import type { FileMetaData, SchemaElement } from './types'
/**
* Read parquet header, metadata, and schema information from a file
* @param arrayBuffer parquet file contents
* @returns metadata object
*/
/* eslint-disable camelcase */
export function parquetMetadata(arrayBuffer: ArrayBuffer): FileMetaData {
// DataView for easier manipulation of the buffer
const view = new DataView(arrayBuffer)
// Validate footer magic number "PAR1"
if (view.byteLength < 8) {
throw new Error('parquet file is too short')
}
if (view.getUint32(view.byteLength - 4, true) !== 0x31524150) {
throw new Error('parquet file invalid magic number')
}
// Parquet files store metadata at the end of the file
// Metadata length is 4 bytes before the last PAR1
const metadataLengthOffset = view.byteLength - 8
const metadataLength = view.getUint32(view.byteLength - 8, true)
if (metadataLength <= 0 || metadataLength > metadataLengthOffset) {
throw new Error('parquet file invalid metadata length')
}
const metadataOffset = metadataLengthOffset - metadataLength
const metadataBuffer = view.buffer.slice(metadataOffset, metadataLengthOffset)
const { value: metadata } = deserializeTCompactProtocol(metadataBuffer)
// Parse parquet metadata from thrift data
const version = metadata.field_1
const schema = metadata.field_2.map((field: any) => ({
type: field.field_1,
type_length: field.field_2,
repetition_type: field.field_3,
name: field.field_4,
num_children: field.field_5,
converted_type: field.field_6,
scale: field.field_7,
precision: field.field_8,
field_id: field.field_9,
}))
const num_rows = metadata.field_3
const row_groups = metadata.field_4.map((rowGroup: any) => ({
columns: rowGroup.field_1.map((column: any) => ({
file_path: column.field_1,
file_offset: column.field_2,
meta_data: column.field_3 && {
type: column.field_3.field_1,
encodings: column.field_3.field_2,
path_in_schema: column.field_3.field_3,
codec: column.field_3.field_4,
num_values: column.field_3.field_5,
total_uncompressed_size: column.field_3.field_6,
total_compressed_size: column.field_3.field_7,
key_value_metadata: column.field_3.field_8,
data_page_offset: column.field_3.field_9,
index_page_offset: column.field_3.field_10,
dictionary_page_offset: column.field_3.field_11,
statistics: column.field_3.field_12 && {
max: column.field_3.field_12.field_1,
min: column.field_3.field_12.field_2,
null_count: column.field_3.field_12.field_3,
distinct_count: column.field_3.field_12.field_4,
},
encoding_stats: column.field_3.field_13?.map((encodingStat: any) => ({
page_type: encodingStat.field_1,
encoding: encodingStat.field_2,
count: encodingStat.field_3,
})),
},
})),
total_byte_size: rowGroup.field_2,
num_rows: rowGroup.field_3,
sorting_columns: rowGroup.field_4?.map((sortingColumn: any) => ({
column_idx: sortingColumn.field_1,
descending: sortingColumn.field_2,
nulls_first: sortingColumn.field_3,
})),
}))
const key_value_metadata = metadata.field_5?.map((keyValue: any) => ({
key: keyValue.field_1,
value: keyValue.field_2,
}))
const created_by = metadata.field_6
return {
version,
schema,
num_rows,
row_groups,
key_value_metadata,
created_by,
}
}
/**
* Get the schema element with the given name.
*/
/**
 * Get the schema element with the given name.
 *
 * @param schema schema elements from the file metadata
 * @param name path to the schema element
 * @returns the matching schema element
 * @throws Error if no element matches
 */
export function schemaElement(schema: SchemaElement[], name: string[]): SchemaElement {
  // NOTE(review): the map is keyed by each element's own name, so only
  // single-segment paths can resolve; a nested dotted path like ['a','b']
  // will never match a key — TODO confirm intended behavior for nested schemas
  const schemaElementByName = new Map(schema.map(se => [se.name, se]))
  const element = schemaElementByName.get(name.join('.'))
  if (!element) {
    throw new Error(`schema element not found: ${name}`)
  }
  return element
}

@ -1,11 +1,4 @@
/**
* Represents a decoded value, and includes the number of bytes read.
* This is used to read data from the file and advance a virtual file pointer.
*/
interface Decoded<T> {
value: T
byteLength: number
}
import { Decoded } from './types'
// TCompactProtocol types
const CompactType = {

144
src/types.ts Normal file

@ -0,0 +1,144 @@
/**
 * Represents a decoded value, and includes the number of bytes read.
 * This is used to read data from the file and advance a virtual file pointer.
 */
export interface Decoded<T> {
  value: T // the decoded value
  byteLength: number // number of bytes consumed producing `value`
}
// Parquet file metadata types
// NOTE: snake_case property names mirror the thrift field names
// (see the camelcase eslint exemption in metadata.ts)

/** Top-level footer metadata of a parquet file. */
export interface FileMetaData {
  version: number
  schema: SchemaElement[]
  num_rows: number
  row_groups: RowGroup[]
  key_value_metadata?: KeyValue[]
  created_by?: string // writer identification string, e.g. library name/version
}

/** One node of the flattened schema tree. */
export interface SchemaElement {
  type?: ParquetType
  type_length?: number
  repetition_type?: FieldRepetitionType
  name: string
  num_children?: number // set on group nodes; leaf nodes omit it
  converted_type?: ConvertedType
  scale?: number // for DECIMAL converted type
  precision?: number // for DECIMAL converted type
  field_id?: number
}

/** Physical storage type of a column. */
export enum ParquetType {
  BOOLEAN = 0,
  INT32 = 1,
  INT64 = 2,
  INT96 = 3,
  FLOAT = 4,
  DOUBLE = 5,
  BYTE_ARRAY = 6,
  FIXED_LEN_BYTE_ARRAY = 7,
}

/** How often a field may appear within a record. */
export enum FieldRepetitionType {
  REQUIRED = 0,
  OPTIONAL = 1,
  REPEATED = 2,
}
/**
 * Logical type annotation layered on top of the physical ParquetType.
 * Values follow the ConvertedType enum in the parquet-format thrift spec.
 */
enum ConvertedType {
  UTF8 = 0,
  MAP = 1,
  MAP_KEY_VALUE = 2,
  LIST = 3,
  ENUM = 4,
  DECIMAL = 5,
  DATE = 6,
  TIME_MILLIS = 7,
  TIME_MICROS = 8,
  TIMESTAMP_MILLIS = 9,
  TIMESTAMP_MICROS = 10,
  UINT_8 = 11,
  UINT_16 = 12,
  UINT_32 = 13,
  UINT_64 = 14,
  INT_8 = 15,
  INT_16 = 16,
  INT_32 = 17,
  INT_64 = 18,
  JSON = 19,
  BSON = 20,
  INTERVAL = 21,
}
/** A horizontal slice of the file: one chunk per column. */
export interface RowGroup {
  columns: ColumnChunk[]
  total_byte_size: number
  num_rows: number
  sorting_columns?: SortingColumn[]
}

/** Location of one column's data within a row group. */
export interface ColumnChunk {
  file_path?: string // set when column data lives in a separate file
  file_offset: number
  meta_data?: ColumnMetaData
}

/** Per-column-chunk metadata parsed from the thrift footer. */
export interface ColumnMetaData {
  type: ParquetType
  encodings: Encoding[]
  path_in_schema: string[]
  codec: CompressionCodec
  num_values: number
  total_uncompressed_size: number
  total_compressed_size: number
  key_value_metadata?: KeyValue[]
  data_page_offset: number
  index_page_offset?: number
  dictionary_page_offset?: number
  statistics?: Statistics
  encoding_stats?: PageEncodingStats[]
}

/** Page-level value encodings. Value 1 is deliberately unused (deprecated GROUP_VAR_INT in the parquet spec). */
export enum Encoding {
  PLAIN = 0,
  PLAIN_DICTIONARY = 2,
  RLE = 3,
  BIT_PACKED = 4,
  DELTA_BINARY_PACKED = 5,
  DELTA_LENGTH_BYTE_ARRAY = 6,
  DELTA_BYTE_ARRAY = 7,
  RLE_DICTIONARY = 8,
  BYTE_STREAM_SPLIT = 9,
}

/** Compression applied to column chunk pages. */
export enum CompressionCodec {
  UNCOMPRESSED = 0,
  SNAPPY = 1,
  GZIP = 2,
  LZO = 3,
  BROTLI = 4,
  LZ4 = 5,
  ZSTD = 6,
  LZ4_RAW = 7,
}

/** Arbitrary key-value pair attached to file or column metadata. */
interface KeyValue {
  key: string
  value?: string
}

/** Min/max/null statistics for a column chunk. */
export interface Statistics {
  max?: Uint8Array // binary representation
  min?: Uint8Array // binary representation
  null_count?: number
  distinct_count?: number
}

/** Count of pages by page type and encoding within a column chunk. */
interface PageEncodingStats {
  page_type: PageType
  encoding: Encoding
  count: number
}

/** Kind of page within a column chunk. */
export enum PageType {
  DATA_PAGE = 0,
  INDEX_PAGE = 1,
  DICTIONARY_PAGE = 2,
  DATA_PAGE_V2 = 3,
}

/** Sort order of a row group by one column. */
interface SortingColumn {
  column_idx: number // index into the row group's columns
  descending: boolean
  nulls_first: boolean
}