Custom decompressors

Kenny Daniel 2024-02-23 10:25:06 -08:00
parent ea3d1a8bbb
commit e3b5fca883
10 changed files with 160 additions and 13 deletions

@ -77,7 +77,19 @@ You can provide an `AsyncBuffer` which is like a js `ArrayBuffer` but the `slice
## Supported Parquet Files
The parquet format supports a number of different compression and encoding types.
Hyparquet does not support 100% of all parquet files, and probably never will, since supporting all possible compression types would increase the size of the library, and many are rarely used in practice.
Hyparquet does not support 100% of all parquet files.
Supporting every possible compression codec available in parquet would blow up the size of the hyparquet library.
In practice, most parquet files use snappy compression.
You can extend support for parquet files that use other compression codecs by passing the `compressors` option.
```js
import { gunzipSync } from 'zlib'
parquetRead({ file, compressors: {
// add gzip support:
GZIP: (input, output) => output.set(gunzipSync(input)),
}})
```
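The same pattern works for any codec named in the file's metadata. For example, here is a minimal sketch (not part of this commit) adding brotli support with Node's built-in `zlib`, assuming `BROTLI` is among the codecs your files declare:

```js
import { brotliDecompressSync, gunzipSync } from 'zlib'

// Each entry receives the compressed page bytes and a preallocated output
// buffer of the expected uncompressed size, and must fill the output in place.
const compressors = {
  GZIP: (input, output) => output.set(gunzipSync(input)),
  BROTLI: (input, output) => output.set(brotliDecompressSync(input)),
}
```

Pass this object as the `compressors` option to `parquetRead`.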
Compression:
- [X] Uncompressed

@ -9,6 +9,7 @@ import { snappyUncompress } from './snappy.js'
/**
* @typedef {import('./types.js').SchemaElement} SchemaElement
* @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
* @typedef {import('./types.js').Compressors} Compressors
* @typedef {import('./types.js').RowGroup} RowGroup
*/
@ -20,9 +21,10 @@ import { snappyUncompress } from './snappy.js'
* @param {RowGroup} rowGroup row group metadata
* @param {ColumnMetaData} columnMetadata column metadata
* @param {SchemaElement[]} schema schema for the file
* @param {Compressors} [compressors] custom decompressors
* @returns {ArrayLike<any>} array of values
*/
export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schema) {
export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schema, compressors) {
/** @type {ArrayLike<any> | undefined} */
let dictionary = undefined
let valuesSeen = 0
@ -50,7 +52,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
if (!daph) throw new Error('parquet data page header is undefined')
const page = decompressPage(
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
)
const { definitionLevels, repetitionLevels, value: dataPage } = readDataPage(page, daph, schema, columnMetadata)
valuesSeen += daph.num_values
@ -96,7 +98,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
if (!diph) throw new Error('parquet dictionary page header is undefined')
const page = decompressPage(
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
)
dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
} else if (header.type === PageType.DATA_PAGE_V2) {
@ -104,7 +106,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
if (!daph2) throw new Error('parquet data page header v2 is undefined')
const { definitionLevels, repetitionLevels, value: dataPage } = readDataPageV2(
compressedBytes, header, schema, columnMetadata
compressedBytes, header, schema, columnMetadata, compressors
)
valuesSeen += daph2.num_values
@ -170,13 +172,18 @@ export function getColumnOffset(columnMetadata) {
* @param {Uint8Array} compressedBytes
* @param {number} uncompressed_page_size
* @param {CompressionCodec} codec
* @param {Compressors | undefined} compressors
* @returns {Uint8Array}
*/
export function decompressPage(compressedBytes, uncompressed_page_size, codec) {
export function decompressPage(compressedBytes, uncompressed_page_size, codec, compressors) {
/** @type {Uint8Array | undefined} */
let page
const customDecompressor = compressors?.[codec]
if (codec === 'UNCOMPRESSED') {
page = compressedBytes
} else if (customDecompressor) {
page = new Uint8Array(uncompressed_page_size)
customDecompressor(compressedBytes, page)
} else if (codec === 'SNAPPY') {
page = new Uint8Array(uncompressed_page_size)
snappyUncompress(compressedBytes, page)
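The hunks above thread the optional `compressors` map through `readColumn` into `decompressPage`, which looks up a custom callback for the page's codec and uses it before falling back to the built-in snappy handling. The callback contract is write-in-place: it receives the compressed bytes plus an output `Uint8Array` already sized to `uncompressed_page_size`. As a hedged sketch (the helper name is illustrative, not part of hyparquet), a decompressor that returns a fresh buffer can be adapted to that shape like this:

```js
// Wrap a (Uint8Array) => Uint8Array decompressor into the (input, output)
// in-place shape that decompressPage dispatches to, with a size sanity check.
function toInPlaceDecompressor(decompress) {
  return (input, output) => {
    const result = decompress(input)
    if (result.length !== output.length) {
      throw new Error(`decompressed ${result.length} bytes, expected ${output.length}`)
    }
    output.set(result)
  }
}
```

For example, `toInPlaceDecompressor(gunzipSync)` behaves like the README's GZIP entry, with the length check added.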

@ -14,6 +14,7 @@ import { readVarInt, readZigZag } from './thrift.js'
*
* @typedef {import("./types.d.ts").DataPage} DataPage
* @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
* @typedef {import("./types.d.ts").Compressors} Compressors
* @typedef {import("./types.d.ts").DataPageHeaderV2} DataPageHeaderV2
* @typedef {import("./types.d.ts").PageHeader} PageHeader
* @typedef {import("./types.d.ts").SchemaElement} SchemaElement
@ -21,9 +22,10 @@ import { readVarInt, readZigZag } from './thrift.js'
* @param {PageHeader} ph page header
* @param {SchemaElement[]} schema schema for the file
* @param {ColumnMetaData} columnMetadata metadata for the column
* @param {Compressors | undefined} compressors
* @returns {DataPage} definition levels, repetition levels, and array of values
*/
export function readDataPageV2(compressedBytes, ph, schema, columnMetadata) {
export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, compressors) {
const dataView = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
let offset = 0
/** @type {any} */
@ -48,10 +50,15 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata) {
if (daph2.encoding === Encoding.PLAIN) {
const se = schemaElement(schema, columnMetadata.path_in_schema)
const utf8 = se.converted_type === 'UTF8'
const plainObj = readPlain(dataView, columnMetadata.type, nValues, offset, utf8)
let page = compressedBytes.slice(offset)
if (daph2.is_compressed && columnMetadata.codec !== 'UNCOMPRESSED') {
page = decompressPage(page, uncompressedPageSize, columnMetadata.codec, compressors)
}
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const plainObj = readPlain(pageView, columnMetadata.type, nValues, 0, utf8)
values = plainObj.value
} else if (daph2.encoding === Encoding.RLE) {
const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec)
const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec, compressors)
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const bitWidth = 1
if (daph2.num_nulls) {
@ -66,7 +73,7 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata) {
daph2.encoding === Encoding.RLE_DICTIONARY
) {
compressedBytes = compressedBytes.subarray(offset)
const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec)
const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec, compressors)
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const bitWidth = pageView.getUint8(0)
@ -77,7 +84,7 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata) {
} else if (daph2.encoding === Encoding.DELTA_BINARY_PACKED) {
if (daph2.num_nulls) throw new Error('parquet delta-int not supported')
const codec = daph2.is_compressed ? columnMetadata.codec : 'UNCOMPRESSED'
const page = decompressPage(compressedBytes, uncompressedPageSize, codec)
const page = decompressPage(compressedBytes, uncompressedPageSize, codec, compressors)
deltaBinaryUnpack(page, nValues, values)
} else {
throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)

src/hyparquet.d.ts (vendored, 2 changes)

@ -19,6 +19,7 @@ export { AsyncBuffer, FileMetaData, SchemaTree } from './types'
* @param {number} [options.rowEnd] last requested row index (exclusive)
* @param {Function} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
* @param {Function} [options.onComplete] called when all requested rows and columns are parsed
* @param {Compressors} [options.compressors] custom decompressors
* @returns {Promise<void>} resolves when all requested rows and columns are parsed
*/
export async function parquetRead(options: ParquetReadOptions): Promise<void>
@ -96,6 +97,7 @@ export interface ParquetReadOptions {
rowEnd?: number // exclusive
onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may be outside the requested range.
onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
compressors?: Compressors // custom decompressors
}
/**

@ -14,6 +14,7 @@ import { parquetMetadataAsync } from './metadata.js'
* the chunks.
*
* @typedef {import('./hyparquet.js').ColumnData} ColumnData
* @typedef {import('./types.js').Compressors} Compressors
* @typedef {import('./types.js').AsyncBuffer} AsyncBuffer
* @typedef {import('./types.js').FileMetaData} FileMetaData
* @param {object} options read options
@ -24,6 +25,7 @@ import { parquetMetadataAsync } from './metadata.js'
* @param {number} [options.rowEnd] last requested row index (exclusive)
* @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
* @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
* @param {Compressors} [options.compressors] custom decompressors
* @returns {Promise<void>} resolves when all requested rows and columns are parsed
*/
export async function parquetRead(options) {
@ -70,11 +72,12 @@ export async function parquetRead(options) {
* @param {number[]} [options.columns] columns to read, all columns if undefined
* @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
* @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
* @param {Compressors} [options.compressors] custom decompressors
* @param {RowGroup} rowGroup row group to read
* @returns {Promise<any[][]>} resolves to row data
*/
async function readRowGroup(options, rowGroup) {
const { file, metadata, columns } = options
const { file, metadata, columns, compressors } = options
if (!metadata) throw new Error('parquet metadata not found')
// loop through metadata to find min/max bytes to read
@ -107,6 +110,7 @@ async function readRowGroup(options, rowGroup) {
for (let columnIndex = 0; columnIndex < rowGroup.columns.length; columnIndex++) {
// skip columns that are not requested
if (columns && !columns.includes(columnIndex)) continue
const columnMetadata = rowGroup.columns[columnIndex].meta_data
if (!columnMetadata) throw new Error('parquet column metadata is undefined')
@ -135,7 +139,7 @@ async function readRowGroup(options, rowGroup) {
promises.push(buffer.then(arrayBuffer => {
// TODO: extract SchemaElement for this column
const columnData = readColumn(
arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema
arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema, compressors
)
if (columnData.length !== Number(rowGroup.num_rows)) {
throw new Error(`parquet column length ${columnData.length} does not match row group length ${rowGroup.num_rows}`)
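Putting the pieces together, here is a hedged end-to-end sketch of the new option in Node, using the `file`, `compressors`, and `onComplete` options documented above (the `example.parquet` path and the hand-rolled AsyncBuffer-like wrapper are illustrative assumptions, not part of this commit):

```js
import { readFileSync } from 'fs'
import { gunzipSync } from 'zlib'
import { parquetRead } from 'hyparquet'

// Read the whole file and expose it through a minimal AsyncBuffer-like object:
// byteLength plus a slice() that resolves to an ArrayBuffer.
const buf = readFileSync('example.parquet')
const arrayBuffer = buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength)
const file = {
  byteLength: arrayBuffer.byteLength,
  slice: (start, end) => Promise.resolve(arrayBuffer.slice(start, end)),
}

await parquetRead({
  file,
  compressors: {
    // delegate gzip pages to node's zlib, writing into the preallocated output
    GZIP: (input, output) => output.set(gunzipSync(input)),
  },
  onComplete: rows => console.log(rows),
})
```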

src/types.d.ts (vendored, 4 changes)

@ -135,6 +135,10 @@ export type CompressionCodec =
'ZSTD' |
'LZ4_RAW'
export type Compressors = {
[K in CompressionCodec]?: (input: Uint8Array, output: Uint8Array) => void
}
interface KeyValue {
key: string
value?: string

@ -0,0 +1,54 @@
[
[1], [2], [3], [4], [5], [6], [7], [8], [9], [10],
[11], [12], [13], [14], [15], [16], [17], [18], [19], [20],
[21], [22], [23], [24], [25], [26], [27], [28], [29], [30],
[31], [32], [33], [34], [35], [36], [37], [38], [39], [40],
[41], [42], [43], [44], [45], [46], [47], [48], [49], [50],
[51], [52], [53], [54], [55], [56], [57], [58], [59], [60],
[61], [62], [63], [64], [65], [66], [67], [68], [69], [70],
[71], [72], [73], [74], [75], [76], [77], [78], [79], [80],
[81], [82], [83], [84], [85], [86], [87], [88], [89], [90],
[91], [92], [93], [94], [95], [96], [97], [98], [99], [100],
[101], [102], [103], [104], [105], [106], [107], [108], [109], [110],
[111], [112], [113], [114], [115], [116], [117], [118], [119], [120],
[121], [122], [123], [124], [125], [126], [127], [128], [129], [130],
[131], [132], [133], [134], [135], [136], [137], [138], [139], [140],
[141], [142], [143], [144], [145], [146], [147], [148], [149], [150],
[151], [152], [153], [154], [155], [156], [157], [158], [159], [160],
[161], [162], [163], [164], [165], [166], [167], [168], [169], [170],
[171], [172], [173], [174], [175], [176], [177], [178], [179], [180],
[181], [182], [183], [184], [185], [186], [187], [188], [189], [190],
[191], [192], [193], [194], [195], [196], [197], [198], [199], [200],
[201], [202], [203], [204], [205], [206], [207], [208], [209], [210],
[211], [212], [213], [214], [215], [216], [217], [218], [219], [220],
[221], [222], [223], [224], [225], [226], [227], [228], [229], [230],
[231], [232], [233], [234], [235], [236], [237], [238], [239], [240],
[241], [242], [243], [244], [245], [246], [247], [248], [249], [250],
[251], [252], [253], [254], [255], [256], [257], [258], [259], [260],
[261], [262], [263], [264], [265], [266], [267], [268], [269], [270],
[271], [272], [273], [274], [275], [276], [277], [278], [279], [280],
[281], [282], [283], [284], [285], [286], [287], [288], [289], [290],
[291], [292], [293], [294], [295], [296], [297], [298], [299], [300],
[301], [302], [303], [304], [305], [306], [307], [308], [309], [310],
[311], [312], [313], [314], [315], [316], [317], [318], [319], [320],
[321], [322], [323], [324], [325], [326], [327], [328], [329], [330],
[331], [332], [333], [334], [335], [336], [337], [338], [339], [340],
[341], [342], [343], [344], [345], [346], [347], [348], [349], [350],
[351], [352], [353], [354], [355], [356], [357], [358], [359], [360],
[361], [362], [363], [364], [365], [366], [367], [368], [369], [370],
[371], [372], [373], [374], [375], [376], [377], [378], [379], [380],
[381], [382], [383], [384], [385], [386], [387], [388], [389], [390],
[391], [392], [393], [394], [395], [396], [397], [398], [399], [400],
[401], [402], [403], [404], [405], [406], [407], [408], [409], [410],
[411], [412], [413], [414], [415], [416], [417], [418], [419], [420],
[421], [422], [423], [424], [425], [426], [427], [428], [429], [430],
[431], [432], [433], [434], [435], [436], [437], [438], [439], [440],
[441], [442], [443], [444], [445], [446], [447], [448], [449], [450],
[451], [452], [453], [454], [455], [456], [457], [458], [459], [460],
[461], [462], [463], [464], [465], [466], [467], [468], [469], [470],
[471], [472], [473], [474], [475], [476], [477], [478], [479], [480],
[481], [482], [483], [484], [485], [486], [487], [488], [489], [490],
[491], [492], [493], [494], [495], [496], [497], [498], [499], [500],
[501], [502], [503], [504], [505], [506], [507], [508], [509], [510],
[511], [512], [513]
]

@ -0,0 +1,44 @@
{
"version": 2,
"metadata_length": 115,
"num_rows": 513,
"row_groups": [
{
"columns": [
{
"file_offset": 1471,
"meta_data": {
"codec": "GZIP",
"data_page_offset": 4,
"encodings": [
0,
3
],
"num_values": 513,
"path_in_schema": [
"long_col"
],
"statistics": {},
"total_compressed_size": 1467,
"total_uncompressed_size": 4155,
"type": 2
}
}
],
"num_rows": 513,
"total_byte_size": 4155
}
],
"schema": [
{
"name": "root",
"num_children": 1
},
{
"converted_type": "UINT_64",
"name": "long_col",
"repetition_type": "OPTIONAL",
"type": 2
}
]
}

Binary file not shown.

@ -1,9 +1,21 @@
import fs from 'fs'
import { describe, expect, it } from 'vitest'
import { gunzipSync } from 'zlib'
import { parquetRead } from '../src/hyparquet.js'
import { toJson } from '../src/toJson.js'
import { fileToAsyncBuffer, fileToJson } from './helpers.js'
/**
* @typedef {import('../src/types.js').Compressors} Compressors
* @type {Compressors}
*/
const compressors = {
GZIP: (/** @type {Uint8Array} */ input, /** @type {Uint8Array} */ output) => {
const result = gunzipSync(input)
output.set(result)
},
}
describe('parquetRead', () => {
const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
@ -12,6 +24,7 @@ describe('parquetRead', () => {
const asyncBuffer = fileToAsyncBuffer(`test/files/${file}`)
await parquetRead({
file: asyncBuffer,
compressors,
onComplete: (rows) => {
const base = file.replace('.parquet', '')
const expected = fileToJson(`test/files/${base}.json`)