From d13d52b6062d4d693b649401058fed9256589404 Mon Sep 17 00:00:00 2001
From: ctranstrum
Date: Tue, 13 Aug 2024 09:15:59 -0700
Subject: [PATCH] Add an option to return each row as an object keyed by
 column name (#25)

* Add an option to return each row as an object keyed by column name

* rename option to rowFormat and address feedback
---
 README.md          | 15 +++++++++++++++
 demo/demo.js       |  4 ++--
 src/hyparquet.d.ts |  4 +++-
 src/read.js        | 24 +++++++++++++++++++-----
 test/read.test.js  | 26 ++++++++++++++++++++++++++
 5 files changed, 65 insertions(+), 8 deletions(-)

diff --git a/README.md b/README.md
index eb191c3..912a0e8 100644
--- a/README.md
+++ b/README.md
@@ -117,6 +117,21 @@ await parquetRead({
 })
 ```
 
+## Column names
+
+By default, each row passed to the `onComplete` function is an array of values in column order.
+If you would like each row to be an object keyed by column name, set the option `rowFormat` to `object`.
+
+```js
+import { parquetRead } from 'hyparquet'
+
+await parquetRead({
+  file,
+  rowFormat: 'object',
+  onComplete: data => console.log(data),
+})
+```
+
 ## Advanced Usage
 
 ### AsyncBuffer
diff --git a/demo/demo.js b/demo/demo.js
index 3913d8a..d68d911 100644
--- a/demo/demo.js
+++ b/demo/demo.js
@@ -112,7 +112,7 @@ async function render(asyncBuffer, metadata, name) {
     compressors,
     file: asyncBuffer,
     rowEnd: 1000,
-    onComplete(/** @type {any[][]} */ data) {
+    onComplete(/** @type {any[][] | Record<string, any>[]} */ data) {
       const ms = performance.now() - startTime
       console.log(`parsed ${name} in ${ms.toFixed(0)} ms`)
       content.appendChild(renderTable(header, data))
@@ -144,7 +144,7 @@ fileInput?.addEventListener('change', () => {
 
 /**
  * @param {string[]} header
- * @param {any[][]} data
+ * @param {any[][] | Record<string, any>[]} data
  * @returns {HTMLTableElement}
  */
 function renderTable(header, data) {
diff --git a/src/hyparquet.d.ts b/src/hyparquet.d.ts
index 7957e01..2ae394c 100644
--- a/src/hyparquet.d.ts
+++ b/src/hyparquet.d.ts
@@ -17,6 +17,7 @@ export type { AsyncBuffer, Compressors, FileMetaData, SchemaTree }
 * @param {AsyncBuffer} options.file file-like object containing parquet data
 * @param {FileMetaData} [options.metadata] parquet file metadata
 * @param {string[]} [options.columns] columns to read, all columns if undefined
+* @param {string} [options.rowFormat] desired format of each row passed to the onComplete function
 * @param {number} [options.rowStart] first requested row index (inclusive)
 * @param {number} [options.rowEnd] last requested row index (exclusive)
 * @param {Function} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
@@ -111,10 +112,11 @@ export interface ParquetReadOptions {
   file: AsyncBuffer // file-like object containing parquet data
   metadata?: FileMetaData // parquet metadata, will be parsed if not provided
   columns?: string[] // columns to read, all columns if undefined
+  rowFormat?: string // format of each row passed to the onComplete function
   rowStart?: number // inclusive
   rowEnd?: number // exclusive
   onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may be outside the requested range.
-  onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
+  onComplete?: (rows: any[][] | Record<string, any>[]) => void // called when all requested rows and columns are parsed
   compressors?: Compressors // custom decompressors
   utf8?: boolean // decode byte arrays as utf8 strings (default true)
 }
diff --git a/src/read.js b/src/read.js
index 057ab01..544e49b 100644
--- a/src/read.js
+++ b/src/read.js
@@ -1,4 +1,3 @@
-
 import { assembleNested } from './assemble.js'
 import { getColumnRange, readColumn } from './column.js'
 import { parquetMetadataAsync } from './metadata.js'
@@ -24,10 +23,11 @@ import { concat } from './utils.js'
  * @param {AsyncBuffer} options.file file-like object containing parquet data
  * @param {FileMetaData} [options.metadata] parquet file metadata
  * @param {string[]} [options.columns] columns to read, all columns if undefined
+ * @param {string} [options.rowFormat] format of each row passed to the onComplete function
  * @param {number} [options.rowStart] first requested row index (inclusive)
  * @param {number} [options.rowEnd] last requested row index (exclusive)
  * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
- * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
+ * @param {(rows: any[][] | Record<string, any>[]) => void} [options.onComplete] called when all requested rows and columns are parsed
  * @param {Compressors} [options.compressors] custom decompressors
  * @returns {Promise<void>} resolves when all requested rows and columns are parsed
  */
@@ -74,8 +74,9 @@ export async function parquetRead(options) {
  * @param {AsyncBuffer} options.file file-like object containing parquet data
  * @param {FileMetaData} [options.metadata] parquet file metadata
  * @param {string[]} [options.columns] columns to read, all columns if undefined
+ * @param {string} [options.rowFormat] format of each row passed to the onComplete function
  * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
- * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
+ * @param {(rows: any[][] | Record<string, any>[]) => void} [options.onComplete] called when all requested rows and columns are parsed
  * @param {Compressors} [options.compressors]
  * @param {RowGroup} rowGroup row group to read
  * @param {number} groupStart row index of the first row in the group
@@ -186,12 +187,25 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
   if (options.onComplete) {
     // transpose columns into rows
     const groupData = new Array(rowLimit)
-    const includedColumns = children
+    const includedColumnNames = children
       .map(child => child.element.name)
       .filter(name => !columns || columns.includes(name))
+    const includedColumns = includedColumnNames
       .map(name => subcolumnData.get(name))
+
     for (let row = 0; row < rowLimit; row++) {
-      groupData[row] = includedColumns.map(column => column[row])
+      if (options.rowFormat === 'object') {
+        // return each row as an object
+        /** @type {Record<string, any>} */
+        const rowData = {}
+        includedColumnNames.forEach((name, index) => {
+          rowData[name] = includedColumns[index][row]
+        })
+        groupData[row] = rowData
+      } else {
+        // return each row as an array
+        groupData[row] = includedColumns.map(column => column[row])
+      }
     }
     return groupData
   }
diff --git a/test/read.test.js b/test/read.test.js
index 82ff09d..cd1177d 100644
--- a/test/read.test.js
+++ b/test/read.test.js
@@ -123,4 +123,30 @@ describe('parquetRead', () => {
       },
     })
   })
+
+  it('format row as object', async () => {
+    const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
+    await parquetRead({
+      file,
+      columns: ['c'],
+      rowFormat: 'object',
+      onChunk: chunk => {
+        expect(toJson(chunk)).toEqual({
+          columnName: 'c',
+          columnData: [2, 3, 4, 5, 2],
+          rowStart: 0,
+          rowEnd: 5,
+        })
+      },
+      onComplete: (rows) => {
+        expect(toJson(rows)).toEqual([
+          { c: 2 },
+          { c: 3 },
+          { c: 4 },
+          { c: 5 },
+          { c: 2 },
+        ])
+      },
+    })
+  })
 })
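
For a quick end-to-end picture of the new option, here is a minimal usage sketch, separate from the patch itself. It assumes `asyncBufferFromFile` is exported by hyparquet (the test above uses the same helper) and that `example.parquet` is a placeholder path:

```js
import { parquetRead, asyncBufferFromFile } from 'hyparquet'

// read all rows, each one as an object keyed by column name
const file = await asyncBufferFromFile('example.parquet') // placeholder path
await parquetRead({
  file,
  rowFormat: 'object',
  onComplete: rows => {
    // rows is e.g. [{ id: 1, name: 'a' }, { id: 2, name: 'b' }]
    for (const row of rows) console.log(row)
  },
})
```

Omitting `rowFormat` (or passing any value other than `object`) keeps the default array-per-row behavior, so existing callers are unaffected.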