From 2d061392b9bf1824d0af287883c0dd4b22a80e1c Mon Sep 17 00:00:00 2001
From: Kenny Daniel
Date: Thu, 14 Mar 2024 15:39:00 -0700
Subject: [PATCH] Column filter by name

---
 package.json       |  2 +-
 src/hyparquet.d.ts |  8 ++++----
 src/read.js        | 31 +++++++++++++++++--------------
 test/read.test.js  |  6 +++---
 4 files changed, 25 insertions(+), 22 deletions(-)

diff --git a/package.json b/package.json
index 485b3c8..d67ce61 100644
--- a/package.json
+++ b/package.json
@@ -27,7 +27,7 @@
     "typecheck": "tsc"
   },
   "devDependencies": {
-    "@types/node": "20.11.26",
+    "@types/node": "20.11.27",
     "@typescript-eslint/eslint-plugin": "7.2.0",
     "@vitest/coverage-v8": "1.3.1",
     "eslint": "8.57.0",
diff --git a/src/hyparquet.d.ts b/src/hyparquet.d.ts
index c7d17b9..40ce2ed 100644
--- a/src/hyparquet.d.ts
+++ b/src/hyparquet.d.ts
@@ -14,7 +14,7 @@ export { AsyncBuffer, FileMetaData, SchemaTree } from './types'
  * @param {object} options read options
  * @param {AsyncBuffer} options.file file-like object containing parquet data
  * @param {FileMetaData} [options.metadata] parquet file metadata
- * @param {number[]} [options.columns] columns to read, all columns if undefined
+ * @param {string[]} [options.columns] columns to read, all columns if undefined
  * @param {number} [options.rowStart] first requested row index (inclusive)
  * @param {number} [options.rowEnd] last requested row index (exclusive)
  * @param {Function} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
@@ -92,7 +92,7 @@ export function toJson(obj: any): unknown
 export interface ParquetReadOptions {
   file: AsyncBuffer // file-like object containing parquet data
   metadata?: FileMetaData // parquet metadata, will be parsed if not provided
-  columns?: number[] // columns to read, all columns if undefined
+  columns?: string[] // columns to read, all columns if undefined
   rowStart?: number // inclusive
   rowEnd?: number // exclusive
   onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may be outside the requested range.
@@ -104,8 +104,8 @@ export interface ParquetReadOptions {
  * A run of column data
  */
 export interface ColumnData {
-  column: number
-  data: ArrayLike<any>
+  columnName: string
+  columnData: ArrayLike<any>
   rowStart: number
   rowEnd: number
 }
diff --git a/src/read.js b/src/read.js
index dcc4a3b..0d2dd99 100644
--- a/src/read.js
+++ b/src/read.js
@@ -20,7 +20,7 @@ import { parquetMetadataAsync } from './metadata.js'
  * @param {object} options read options
  * @param {AsyncBuffer} options.file file-like object containing parquet data
  * @param {FileMetaData} [options.metadata] parquet file metadata
- * @param {number[]} [options.columns] columns to read, all columns if undefined
+ * @param {string[]} [options.columns] columns to read, all columns if undefined
  * @param {number} [options.rowStart] first requested row index (inclusive)
  * @param {number} [options.rowEnd] last requested row index (exclusive)
  * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
@@ -69,7 +69,7 @@ export async function parquetRead(options) {
  * @param {object} options read options
  * @param {AsyncBuffer} options.file file-like object containing parquet data
  * @param {FileMetaData} [options.metadata] parquet file metadata
- * @param {number[]} [options.columns] columns to read, all columns if undefined
+ * @param {string[]} [options.columns] columns to read, all columns if undefined
  * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
  * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
  * @param {Compressors} [options.compressors] custom decompressors
@@ -82,18 +82,20 @@ async function readRowGroup(options, rowGroup) {
 
   // loop through metadata to find min/max bytes to read
   let [groupStartByte, groupEndByte] = [file.byteLength, 0]
-  rowGroup.columns.forEach((columnChunk, columnIndex) => {
-    // skip columns that are not requested or lack metadata
-    if (columns && !columns.includes(columnIndex)) return
-    if (!columnChunk.meta_data) return
+  rowGroup.columns.forEach(({ meta_data: columnMetadata }) => {
+    if (!columnMetadata) throw new Error('parquet column metadata is undefined')
+    const columnName = columnMetadata.path_in_schema.join('.')
+    // skip columns that are not requested
+    if (columns && !columns.includes(columnName)) return
 
-    const startByte = getColumnOffset(columnChunk.meta_data)
-    const endByte = startByte + Number(columnChunk.meta_data.total_compressed_size)
+    const startByte = getColumnOffset(columnMetadata)
+    const endByte = startByte + Number(columnMetadata.total_compressed_size)
     groupStartByte = Math.min(groupStartByte, startByte)
     groupEndByte = Math.max(groupEndByte, endByte)
   })
-  if (groupStartByte >= groupEndByte) {
-    throw new Error('parquet missing row group metadata')
+  if (groupStartByte >= groupEndByte && columns?.length) {
+    // TODO: should throw if any column is missing
+    throw new Error(`parquet columns not found: ${columns.join(', ')}`)
   }
   // if row group size is less than 128mb, pre-load in one read
   let groupBuffer
@@ -108,12 +110,13 @@ async function readRowGroup(options, rowGroup) {
   const promises = []
   // read column data
   for (let columnIndex = 0; columnIndex < rowGroup.columns.length; columnIndex++) {
-    // skip columns that are not requested
-    if (columns && !columns.includes(columnIndex)) continue
-
     const columnMetadata = rowGroup.columns[columnIndex].meta_data
     if (!columnMetadata) throw new Error('parquet column metadata is undefined')
 
+    // skip columns that are not requested
+    const columnName = columnMetadata.path_in_schema.join('.')
+    if (columns && !columns.includes(columnName)) continue
+
     const columnStartByte = getColumnOffset(columnMetadata)
     const columnEndByte = columnStartByte + Number(columnMetadata.total_compressed_size)
     const columnBytes = columnEndByte - columnStartByte
@@ -145,7 +148,7 @@ async function readRowGroup(options, rowGroup) {
         throw new Error(`parquet column length ${columnData.length} does not match row group length ${rowGroup.num_rows}`)
       }
       // notify caller of column data
-      if (options.onChunk) options.onChunk({ column: columnIndex, data: columnData, rowStart: 0, rowEnd: columnData.length })
+      if (options.onChunk) options.onChunk({ columnName, columnData, rowStart: 0, rowEnd: columnData.length })
       // add column data to group data only if onComplete is defined
       if (options.onComplete) addColumn(groupData, columnIndex, columnData)
     }))
diff --git a/test/read.test.js b/test/read.test.js
index 93bf14e..769ea8a 100644
--- a/test/read.test.js
+++ b/test/read.test.js
@@ -38,11 +38,11 @@ describe('parquetRead', () => {
     const asyncBuffer = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
     await parquetRead({
       file: asyncBuffer,
-      columns: [2],
+      columns: ['c'],
       onChunk: (rows) => {
         expect(toJson(rows)).toEqual({
-          column: 2,
-          data: [2, 3, 4, 5, 2],
+          columnName: 'c',
+          columnData: [2, 3, 4, 5, 2],
           rowStart: 0,
           rowEnd: 5,
         })