From e3b5fca8834697b235d50366f0055d616bb61743 Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Fri, 23 Feb 2024 10:25:06 -0800 Subject: [PATCH] Custom decompressors --- README.md | 14 ++++- src/column.js | 17 ++++-- src/datapageV2.js | 17 ++++-- src/hyparquet.d.ts | 2 + src/read.js | 8 ++- src/types.d.ts | 4 ++ test/files/concatenated_gzip_members.json | 54 ++++++++++++++++++ .../concatenated_gzip_members.metadata.json | 44 ++++++++++++++ test/files/concatenated_gzip_members.parquet | Bin 0 -> 1647 bytes test/read.test.js | 13 +++++ 10 files changed, 160 insertions(+), 13 deletions(-) create mode 100644 test/files/concatenated_gzip_members.json create mode 100644 test/files/concatenated_gzip_members.metadata.json create mode 100644 test/files/concatenated_gzip_members.parquet diff --git a/README.md b/README.md index b44f892..3981b8c 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,19 @@ You can provide an `AsyncBuffer` which is like a js `ArrayBuffer` but the `slice ## Supported Parquet Files The parquet format supports a number of different compression and encoding types. -Hyparquet does not support 100% of all parquet files, and probably never will, since supporting all possible compression types will increase the size of the library, and are rarely used in practice. +Hyparquet does not support 100% of all parquet files. +Supporting every possible compression codec available in parquet would blow up the size of the hyparquet library. +In practice, most parquet files use snappy compression. + +You can extend support for parquet files with other compression codec using the `compressors` option. + +```js +import { gunzipSync } from 'zlib' +parquetRead({ file, compressors: { + // add gzip support: + GZIP: (input, output) => output.set(gunzipSync(input)), +}}) +``` Compression: - [X] Uncompressed diff --git a/src/column.js b/src/column.js index 8af7515..a428f1a 100644 --- a/src/column.js +++ b/src/column.js @@ -9,6 +9,7 @@ import { snappyUncompress } from './snappy.js' /** * @typedef {import('./types.js').SchemaElement} SchemaElement * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData + * @typedef {import('./types.js').Compressors} Compressors * @typedef {import('./types.js').RowGroup} RowGroup */ @@ -20,9 +21,10 @@ import { snappyUncompress } from './snappy.js' * @param {RowGroup} rowGroup row group metadata * @param {ColumnMetaData} columnMetadata column metadata * @param {SchemaElement[]} schema schema for the file + * @param {Compressors} [compressors] custom decompressors * @returns {ArrayLike} array of values */ -export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schema) { +export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schema, compressors) { /** @type {ArrayLike | undefined} */ let dictionary = undefined let valuesSeen = 0 @@ -50,7 +52,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, if (!daph) throw new Error('parquet data page header is undefined') const page = decompressPage( - compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec + compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors ) const { definitionLevels, repetitionLevels, value: dataPage } = readDataPage(page, daph, schema, columnMetadata) valuesSeen += daph.num_values @@ -96,7 +98,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, if (!diph) throw new Error('parquet dictionary page header is undefined') const page = 
decompressPage( - compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec + compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors ) dictionary = readDictionaryPage(page, diph, schema, columnMetadata) } else if (header.type === PageType.DATA_PAGE_V2) { @@ -104,7 +106,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, if (!daph2) throw new Error('parquet data page header v2 is undefined') const { definitionLevels, repetitionLevels, value: dataPage } = readDataPageV2( - compressedBytes, header, schema, columnMetadata + compressedBytes, header, schema, columnMetadata, compressors ) valuesSeen += daph2.num_values @@ -170,13 +172,18 @@ export function getColumnOffset(columnMetadata) { * @param {Uint8Array} compressedBytes * @param {number} uncompressed_page_size * @param {CompressionCodec} codec + * @param {Compressors | undefined} compressors * @returns {Uint8Array} */ -export function decompressPage(compressedBytes, uncompressed_page_size, codec) { +export function decompressPage(compressedBytes, uncompressed_page_size, codec, compressors) { /** @type {Uint8Array | undefined} */ let page + const customDecompressor = compressors?.[codec] if (codec === 'UNCOMPRESSED') { page = compressedBytes + } else if (customDecompressor) { + page = new Uint8Array(uncompressed_page_size) + customDecompressor(compressedBytes, page) } else if (codec === 'SNAPPY') { page = new Uint8Array(uncompressed_page_size) snappyUncompress(compressedBytes, page) diff --git a/src/datapageV2.js b/src/datapageV2.js index 04774f2..d715b92 100644 --- a/src/datapageV2.js +++ b/src/datapageV2.js @@ -14,6 +14,7 @@ import { readVarInt, readZigZag } from './thrift.js' * * @typedef {import("./types.d.ts").DataPage} DataPage * @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData + * @typedef {import("./types.d.ts").Compressors} Compressors * @typedef {import("./types.d.ts").DataPageHeaderV2} DataPageHeaderV2 * @typedef {import("./types.d.ts").PageHeader} PageHeader * @typedef {import("./types.d.ts").SchemaElement} SchemaElement @@ -21,9 +22,10 @@ import { readVarInt, readZigZag } from './thrift.js' * @param {PageHeader} ph page header * @param {SchemaElement[]} schema schema for the file * @param {ColumnMetaData} columnMetadata metadata for the column + * @param {Compressors | undefined} compressors * @returns {DataPage} definition levels, repetition levels, and array of values */ -export function readDataPageV2(compressedBytes, ph, schema, columnMetadata) { +export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, compressors) { const dataView = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength) let offset = 0 /** @type {any} */ @@ -48,10 +50,15 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata) { if (daph2.encoding === Encoding.PLAIN) { const se = schemaElement(schema, columnMetadata.path_in_schema) const utf8 = se.converted_type === 'UTF8' - const plainObj = readPlain(dataView, columnMetadata.type, nValues, offset, utf8) + let page = compressedBytes.slice(offset) + if (daph2.is_compressed && columnMetadata.codec !== 'UNCOMPRESSED') { + page = decompressPage(page, uncompressedPageSize, columnMetadata.codec, compressors) + } + const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength) + const plainObj = readPlain(pageView, columnMetadata.type, nValues, 0, utf8) values = plainObj.value } else if (daph2.encoding === Encoding.RLE) { 
- const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec) + const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec, compressors) const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength) const bitWidth = 1 if (daph2.num_nulls) { @@ -66,7 +73,7 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata) { daph2.encoding === Encoding.RLE_DICTIONARY ) { compressedBytes = compressedBytes.subarray(offset) - const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec) + const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec, compressors) const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength) const bitWidth = pageView.getUint8(0) @@ -77,7 +84,7 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata) { } else if (daph2.encoding === Encoding.DELTA_BINARY_PACKED) { if (daph2.num_nulls) throw new Error('parquet delta-int not supported') const codec = daph2.is_compressed ? columnMetadata.codec : 'UNCOMPRESSED' - const page = decompressPage(compressedBytes, uncompressedPageSize, codec) + const page = decompressPage(compressedBytes, uncompressedPageSize, codec, compressors) deltaBinaryUnpack(page, nValues, values) } else { throw new Error(`parquet unsupported encoding: ${daph2.encoding}`) diff --git a/src/hyparquet.d.ts b/src/hyparquet.d.ts index 2928590..c7d17b9 100644 --- a/src/hyparquet.d.ts +++ b/src/hyparquet.d.ts @@ -19,6 +19,7 @@ export { AsyncBuffer, FileMetaData, SchemaTree } from './types' * @param {number} [options.rowEnd] last requested row index (exclusive) * @param {Function} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range. * @param {Function} [options.onComplete] called when all requested rows and columns are parsed + * @param {Compressors} [options.compressor] custom decompressors * @returns {Promise} resolves when all requested rows and columns are parsed */ export async function parquetRead(options: ParquetReadOptions): Promise @@ -96,6 +97,7 @@ export interface ParquetReadOptions { rowEnd?: number // exclusive onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may be outside the requested range. onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed + compressors?: Compressors // custom decompressors } /** diff --git a/src/read.js b/src/read.js index f1a22c1..dcc4a3b 100644 --- a/src/read.js +++ b/src/read.js @@ -14,6 +14,7 @@ import { parquetMetadataAsync } from './metadata.js' * the chunks. * * @typedef {import('./hyparquet.js').ColumnData} ColumnData + * @typedef {import('./types.js').Compressors} Compressors * @typedef {import('./types.js').AsyncBuffer} AsyncBuffer * @typedef {import('./types.js').FileMetaData} FileMetaData * @param {object} options read options @@ -24,6 +25,7 @@ import { parquetMetadataAsync } from './metadata.js' * @param {number} [options.rowEnd] last requested row index (exclusive) * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range. 
* @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed + * @param {Compressors} [options.compressors] custom decompressors * @returns {Promise} resolves when all requested rows and columns are parsed */ export async function parquetRead(options) { @@ -70,11 +72,12 @@ export async function parquetRead(options) { * @param {number[]} [options.columns] columns to read, all columns if undefined * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range. * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed + * @param {Compressors} [options.compressors] custom decompressors * @param {RowGroup} rowGroup row group to read * @returns {Promise} resolves to row data */ async function readRowGroup(options, rowGroup) { - const { file, metadata, columns } = options + const { file, metadata, columns, compressors } = options if (!metadata) throw new Error('parquet metadata not found') // loop through metadata to find min/max bytes to read @@ -107,6 +110,7 @@ async function readRowGroup(options, rowGroup) { for (let columnIndex = 0; columnIndex < rowGroup.columns.length; columnIndex++) { // skip columns that are not requested if (columns && !columns.includes(columnIndex)) continue + const columnMetadata = rowGroup.columns[columnIndex].meta_data if (!columnMetadata) throw new Error('parquet column metadata is undefined') @@ -135,7 +139,7 @@ async function readRowGroup(options, rowGroup) { promises.push(buffer.then(arrayBuffer => { // TODO: extract SchemaElement for this column const columnData = readColumn( - arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema + arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema, compressors ) if (columnData.length !== Number(rowGroup.num_rows)) { throw new Error(`parquet column length ${columnData.length} does not match row group length ${rowGroup.num_rows}`) diff --git a/src/types.d.ts b/src/types.d.ts index db057b0..4f1396f 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -135,6 +135,10 @@ export type CompressionCodec = 'ZSTD' | 'LZ4_RAW' +export type Compressors = { + [K in CompressionCodec]?: (input: Uint8Array, output: Uint8Array) => void +} + interface KeyValue { key: string value?: string diff --git a/test/files/concatenated_gzip_members.json b/test/files/concatenated_gzip_members.json new file mode 100644 index 0000000..6f9ddd2 --- /dev/null +++ b/test/files/concatenated_gzip_members.json @@ -0,0 +1,54 @@ +[ + [1], [2], [3], [4], [5], [6], [7], [8], [9], [10], + [11], [12], [13], [14], [15], [16], [17], [18], [19], [20], + [21], [22], [23], [24], [25], [26], [27], [28], [29], [30], + [31], [32], [33], [34], [35], [36], [37], [38], [39], [40], + [41], [42], [43], [44], [45], [46], [47], [48], [49], [50], + [51], [52], [53], [54], [55], [56], [57], [58], [59], [60], + [61], [62], [63], [64], [65], [66], [67], [68], [69], [70], + [71], [72], [73], [74], [75], [76], [77], [78], [79], [80], + [81], [82], [83], [84], [85], [86], [87], [88], [89], [90], + [91], [92], [93], [94], [95], [96], [97], [98], [99], [100], + [101], [102], [103], [104], [105], [106], [107], [108], [109], [110], + [111], [112], [113], [114], [115], [116], [117], [118], [119], [120], + [121], [122], [123], [124], [125], [126], [127], [128], [129], [130], + [131], [132], [133], [134], [135], [136], [137], [138], [139], [140], + [141], [142], [143], [144], [145], 
[146], [147], [148], [149], [150], + [151], [152], [153], [154], [155], [156], [157], [158], [159], [160], + [161], [162], [163], [164], [165], [166], [167], [168], [169], [170], + [171], [172], [173], [174], [175], [176], [177], [178], [179], [180], + [181], [182], [183], [184], [185], [186], [187], [188], [189], [190], + [191], [192], [193], [194], [195], [196], [197], [198], [199], [200], + [201], [202], [203], [204], [205], [206], [207], [208], [209], [210], + [211], [212], [213], [214], [215], [216], [217], [218], [219], [220], + [221], [222], [223], [224], [225], [226], [227], [228], [229], [230], + [231], [232], [233], [234], [235], [236], [237], [238], [239], [240], + [241], [242], [243], [244], [245], [246], [247], [248], [249], [250], + [251], [252], [253], [254], [255], [256], [257], [258], [259], [260], + [261], [262], [263], [264], [265], [266], [267], [268], [269], [270], + [271], [272], [273], [274], [275], [276], [277], [278], [279], [280], + [281], [282], [283], [284], [285], [286], [287], [288], [289], [290], + [291], [292], [293], [294], [295], [296], [297], [298], [299], [300], + [301], [302], [303], [304], [305], [306], [307], [308], [309], [310], + [311], [312], [313], [314], [315], [316], [317], [318], [319], [320], + [321], [322], [323], [324], [325], [326], [327], [328], [329], [330], + [331], [332], [333], [334], [335], [336], [337], [338], [339], [340], + [341], [342], [343], [344], [345], [346], [347], [348], [349], [350], + [351], [352], [353], [354], [355], [356], [357], [358], [359], [360], + [361], [362], [363], [364], [365], [366], [367], [368], [369], [370], + [371], [372], [373], [374], [375], [376], [377], [378], [379], [380], + [381], [382], [383], [384], [385], [386], [387], [388], [389], [390], + [391], [392], [393], [394], [395], [396], [397], [398], [399], [400], + [401], [402], [403], [404], [405], [406], [407], [408], [409], [410], + [411], [412], [413], [414], [415], [416], [417], [418], [419], [420], + [421], [422], [423], [424], [425], [426], [427], [428], [429], [430], + [431], [432], [433], [434], [435], [436], [437], [438], [439], [440], + [441], [442], [443], [444], [445], [446], [447], [448], [449], [450], + [451], [452], [453], [454], [455], [456], [457], [458], [459], [460], + [461], [462], [463], [464], [465], [466], [467], [468], [469], [470], + [471], [472], [473], [474], [475], [476], [477], [478], [479], [480], + [481], [482], [483], [484], [485], [486], [487], [488], [489], [490], + [491], [492], [493], [494], [495], [496], [497], [498], [499], [500], + [501], [502], [503], [504], [505], [506], [507], [508], [509], [510], + [511], [512], [513] +] diff --git a/test/files/concatenated_gzip_members.metadata.json b/test/files/concatenated_gzip_members.metadata.json new file mode 100644 index 0000000..29da81a --- /dev/null +++ b/test/files/concatenated_gzip_members.metadata.json @@ -0,0 +1,44 @@ +{ + "version": 2, + "metadata_length": 115, + "num_rows": 513, + "row_groups": [ + { + "columns": [ + { + "file_offset": 1471, + "meta_data": { + "codec": "GZIP", + "data_page_offset": 4, + "encodings": [ + 0, + 3 + ], + "num_values": 513, + "path_in_schema": [ + "long_col" + ], + "statistics": {}, + "total_compressed_size": 1467, + "total_uncompressed_size": 4155, + "type": 2 + } + } + ], + "num_rows": 513, + "total_byte_size": 4155 + } + ], + "schema": [ + { + "name": "root", + "num_children": 1 + }, + { + "converted_type": "UINT_64", + "name": "long_col", + "repetition_type": "OPTIONAL", + "type": 2 + } + ] +} diff --git 
a/test/files/concatenated_gzip_members.parquet b/test/files/concatenated_gzip_members.parquet new file mode 100644 index 0000000000000000000000000000000000000000..9cdd999559ff8834d77ad89f39f193e7af1cf7e9 GIT binary patch literal 1647 zcmeHIPiP!<6rPz)H?uo4iT`%f;D%Z?$uHULE@Ejh%9`JZC{zDD6s>Me3Pz1Xn=Y8q z;uKfO90k2RQYbXex4f%w^lOtZ zy`v0>C6k^Hx8?DbAo>LkC!87EEs7u@O%*c0TsnDYi$Qb_7Bpv< z&BJ2cqP?M4z+*vl1r}6ihs^_G#iZS#m&g4a zoTm3B{|IphmQ(bk^nv6H5O2a-iuOo81F?lQgU(963ULBkIz1@)B*X@`()5|+ zOAvQrD@AWhJ_~U-whUU6yatEW%H?})_l8_=$n}39*H`;$%Axx{(v+JN<)420qLNS= zCwj{34xH)e?N^e$oArIw{Zo4;tNSt?*;zf?9LqMd1NsC1Zs6^qkw-gTt5&^?)@JC} t{vPkoJ@)afW4E-K+cdqo-rPVdoBK~ol=UNK_3h2 { + const result = gunzipSync(input) + output.set(result) + }, +} + describe('parquetRead', () => { const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet')) @@ -12,6 +24,7 @@ describe('parquetRead', () => { const asyncBuffer = fileToAsyncBuffer(`test/files/${file}`) await parquetRead({ file: asyncBuffer, + compressors, onComplete: (rows) => { const base = file.replace('.parquet', '') const expected = fileToJson(`test/files/${base}.json`)
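
A minimal usage sketch of the `compressors` option introduced by this patch, assuming an installed `hyparquet` package, a hypothetical local file path, and a hand-rolled `AsyncBuffer` object: each registered decompressor receives the compressed page bytes as `input` and fills `output`, which `decompressPage` pre-allocates to the page's uncompressed size.

```js
import fs from 'fs'
import { gunzipSync } from 'zlib'
import { parquetRead } from 'hyparquet'

// Hypothetical local file path, for illustration only.
const buffer = fs.readFileSync('data/gzip_example.parquet')
const arrayBuffer = buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength)

// Minimal AsyncBuffer: byteLength plus a slice() that resolves to an ArrayBuffer.
const file = {
  byteLength: arrayBuffer.byteLength,
  slice: async (start, end) => arrayBuffer.slice(start, end),
}

await parquetRead({
  file,
  compressors: {
    // Fill `output` (pre-allocated to uncompressed_page_size) with the
    // decompressed page bytes. gunzipSync returns a Buffer, which is a Uint8Array.
    GZIP: (input, output) => output.set(gunzipSync(input)),
  },
  onComplete: rows => console.log(`read ${rows.length} rows`),
})
```

Node's `gunzipSync` also decodes concatenated gzip members, which is the case exercised by the new `concatenated_gzip_members.parquet` test file above.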