From f5274904b70d811e96d462460df741a4b4cc32ed Mon Sep 17 00:00:00 2001
From: Kenny Daniel
Date: Thu, 10 Apr 2025 23:29:58 -0700
Subject: [PATCH] Add onPage callback to parquetRead

---
 README.md         | 74 +++++++++++++++++++++++++++++++++--------------
 src/column.js     | 25 ++++++++++++----
 src/read.js       | 12 ++++----
 src/types.d.ts    |  1 +
 test/read.test.js | 50 ++++++++++++++++++++++++++++++++
 5 files changed, 130 insertions(+), 32 deletions(-)

diff --git a/README.md b/README.md
index 5acf45e..64feafb 100644
--- a/README.md
+++ b/README.md
@@ -112,8 +112,7 @@ const metadata = parquetMetadata(arrayBuffer)
 
 ### AsyncBuffer
 
-Hyparquet accepts argument `file` of type `AsyncBuffer` which is like a js `ArrayBuffer` but the `slice` method can return `Promise`.
-You can pass an `ArrayBuffer` anywhere that an `AsyncBuffer` is expected, if you have the entire file in memory.
+Hyparquet requires a `file` argument of type `AsyncBuffer`. An `AsyncBuffer` is similar to a js `ArrayBuffer`, but its `slice` method can return a `Promise<ArrayBuffer>`.
 
 ```typescript
 type Awaitable<T> = T | Promise<T>
@@ -123,7 +122,39 @@ interface AsyncBuffer {
 }
 ```
 
-You can define your own `AsyncBuffer` to create a virtual file that can be read asynchronously. In most cases, you should probably use `asyncBufferFromUrl` or `asyncBufferFromFile`.
+In most cases, you should probably use `asyncBufferFromUrl` or `asyncBufferFromFile` to create an `AsyncBuffer` for hyparquet.
+
+#### asyncBufferFromFile
+
+If you are in a node.js environment, use `asyncBufferFromFile` to wrap a local file as an `AsyncBuffer`:
+
+```typescript
+const file: AsyncBuffer = await asyncBufferFromFile('local.parquet')
+const data = await parquetReadObjects({ file })
+```
+
+#### asyncBufferFromUrl
+
+If you want to read a parquet file remotely over http, use `asyncBufferFromUrl` to wrap an http url as an `AsyncBuffer` backed by http range requests.
+
+ - Pass the `requestInit` option to provide additional fetch headers for authentication (optional)
+ - Pass `byteLength` if you know the file size, to save a round-trip HEAD request (optional)
+
+```typescript
+const url = 'https://s3.hyperparam.app/wiki_en.parquet'
+const requestInit = { headers: { Authorization: 'Bearer my_token' } }
+const byteLength = 415958713
+const file: AsyncBuffer = await asyncBufferFromUrl({ url, requestInit, byteLength })
+const data = await parquetReadObjects({ file })
+```
+
+#### ArrayBuffer
+
+You can provide an `ArrayBuffer` anywhere that an `AsyncBuffer` is expected. This is useful if you already have the entire parquet file in memory.
+
+#### Custom AsyncBuffer
+
+You can implement your own `AsyncBuffer` to create a virtual file that hyparquet can read asynchronously.
 
 ### parquetRead vs parquetReadObjects
 
@@ -139,11 +170,17 @@
 parquetReadObjects({ file }): Promise<Record<string, any>[]>
 ```
 
 `parquetRead` is the "base" function for reading parquet files.
 It returns a `Promise` that resolves when the file has been read, or rejects if an error occurs.
-Data is returned via `onComplete` or `onChunk` callbacks passed as arguments.
+Data is returned via the `onComplete`, `onChunk`, or `onPage` callbacks passed as arguments.
 The reason for this design is that parquet is a column-oriented format, and returning data in row-oriented format requires transposing the column data.
 This is an expensive operation in javascript.
 If you don't pass in an `onComplete` argument to `parquetRead`, hyparquet will skip this transpose step and save memory.
 
-The `onChunk` callback allows column-oriented data to be streamed back as it is read.
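+For example, here is a minimal sketch of the base callback style, where passing `onComplete` opts in to the row transpose (assuming `file` is an `AsyncBuffer` from above):
+
+```javascript
+import { parquetRead } from 'hyparquet'
+
+await parquetRead({
+  file,
+  // rows are transposed to row-oriented format only because onComplete is provided
+  onComplete(rows) {
+    console.log(rows)
+  },
+})
+```
+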
+### Chunk Streaming
+
+The `onChunk` callback streams column-oriented data back as soon as it is ready. `onChunk` always emits top-level columns, with structs assembled as a single column. This may require waiting for multiple sub-columns to finish loading before assembly can occur.
+
+The `onPage` callback emits column-oriented page data as soon as it is ready. `onPage` will NOT assemble struct columns; it always emits individual sub-column data. Note that `onPage` _will_ assemble nested lists.
+
+In some cases, `onPage` can deliver data sooner than `onChunk`.
 
 ```typescript
 interface ColumnData {
@@ -152,25 +189,20 @@
   columnName: string
   columnData: DecodedArray
   rowStart: number
   rowEnd: number
 }
-function onChunk(chunk: ColumnData): void {
-  console.log(chunk)
-}
-await parquetRead({ file, onChunk })
-```
-
-### Authorization
-
-Pass the `requestInit` option to `asyncBufferFromUrl` to provide authentication information to a remote web server. For example:
-
-```javascript
-const requestInit = { headers: { Authorization: 'Bearer my_token' } }
-const file = await asyncBufferFromUrl({ url, requestInit })
+await parquetRead({
+  file,
+  onChunk(chunk: ColumnData) {
+    console.log('chunk', chunk)
+  },
+  onPage(page: ColumnData) {
+    console.log('page', page)
+  },
+})
 ```
 
 ### Returned row format
 
-By default, data returned by `parquetRead` in the `onComplete` function will be one **array** of columns per row.
-If you would like each row to be an **object** with each key the name of the column, set the option `rowFormat` to `object`.
+By default, `onComplete` receives each row as an **array** of values: `[value1, value2, ...]`. If you would prefer each row to be an **object** keyed by column name: `{ columnName: value }`, set the option `rowFormat` to `'object'`.
 
 ```javascript
 import { parquetRead } from 'hyparquet'
@@ -182,7 +214,7 @@ await parquetRead({
 })
 ```
 
-The `parquetReadObjects` function defaults to returning an array of objects.
+The `parquetReadObjects` function defaults to `rowFormat: 'object'`.
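+
+For example, a minimal sketch using `parquetReadObjects` (assuming the same `file` as above):
+
+```javascript
+import { parquetReadObjects } from 'hyparquet'
+
+// each row is an object keyed by column name; no rowFormat option needed
+const rows = await parquetReadObjects({ file })
+```
+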
 ## Supported Parquet Files

diff --git a/src/column.js b/src/column.js
index 8ab09a8..e50f8df 100644
--- a/src/column.js
+++ b/src/column.js
@@ -12,16 +12,28 @@ import { deserializeTCompactProtocol } from './thrift.js'
  * @param {DataReader} reader
  * @param {RowGroupSelect} rowGroupSelect row group selection
  * @param {ColumnDecoder} columnDecoder column decoder params
+ * @param {(chunk: ColumnData) => void} [onPage] callback for each page
  * @returns {DecodedArray[]}
  */
-export function readColumn(reader, { selectStart, selectEnd }, columnDecoder) {
-  const { element, utf8 } = columnDecoder
+export function readColumn(reader, { groupStart, selectStart, selectEnd }, columnDecoder, onPage) {
+  const { columnName, element, utf8 } = columnDecoder
   /** @type {DecodedArray[]} */
   const chunks = []
   /** @type {DecodedArray | undefined} */
   let dictionary = undefined
+  /** @type {DecodedArray | undefined} */
+  let lastChunk = undefined
   let rowCount = 0
+  // emit the most recently completed chunk to onPage, reporting its absolute
+  // row range in the file (groupStart is the first row of this row group)
+  const emitLastChunk = onPage && (() => {
+    lastChunk && onPage({
+      columnName,
+      columnData: lastChunk,
+      rowStart: groupStart + rowCount - lastChunk.length,
+      rowEnd: groupStart + rowCount,
+    })
+  })
+
   while (rowCount < selectEnd) {
     if (reader.offset >= reader.view.byteLength - 1) break // end of reader
 
@@ -32,22 +44,23 @@
       dictionary = readPage(reader, header, columnDecoder, dictionary, undefined, 0)
       dictionary = convert(dictionary, element, utf8)
     } else {
-      const lastChunk = chunks.at(-1)
       const lastChunkLength = lastChunk?.length || 0
       const values = readPage(reader, header, columnDecoder, dictionary, lastChunk, selectStart - rowCount)
       if (lastChunk === values) {
         // continued from previous page
         rowCount += values.length - lastChunkLength
       } else {
+        emitLastChunk?.()
         chunks.push(values)
         rowCount += values.length
+        lastChunk = values
       }
     }
   }
+  emitLastChunk?.() // flush the final chunk
   // assert(rowCount >= selectEnd)
-  if (rowCount > selectEnd) {
+  if (rowCount > selectEnd && lastChunk) {
     // truncate last chunk to row limit
-    const lastChunk = chunks[chunks.length - 1]
     chunks[chunks.length - 1] = lastChunk.slice(0, selectEnd - (rowCount - lastChunk.length))
   }
   return chunks
@@ -145,7 +158,7 @@ export function getColumnRange({ dictionary_page_offset, data_page_offset, total
 /**
  * Read parquet header from a buffer.
* - * @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ColumnDecoder, RowGroupSelect} from '../src/types.d.ts' + * @import {ColumnData, ColumnDecoder, ColumnMetaData, DataReader, DecodedArray, PageHeader, RowGroupSelect} from '../src/types.d.ts' * @param {DataReader} reader * @returns {PageHeader} */ diff --git a/src/read.js b/src/read.js index 3f691cf..533e274 100644 --- a/src/read.js +++ b/src/read.js @@ -146,9 +146,11 @@ export async function readRowGroup(options, rowGroup, groupStart) { compressors: options.compressors, utf8: options.utf8, } - const columnData = readColumn(reader, rowGroupSelect, columnDecoder) /** @type {DecodedArray[] | undefined} */ - let chunks = columnData + let chunks = readColumn(reader, rowGroupSelect, columnDecoder, options.onPage) + + // skip assembly if no onComplete or onChunk + if (!options.onComplete && !options.onChunk) return // TODO: fast path for non-nested columns // save column data for assembly @@ -172,12 +174,12 @@ export async function readRowGroup(options, rowGroup, groupStart) { if (!chunks) return // notify caller of column data if (options.onChunk) { - for (const chunk of chunks) { + for (const columnData of chunks) { options.onChunk({ columnName, - columnData: chunk, + columnData, rowStart: groupStart, - rowEnd: groupStart + chunk.length, + rowEnd: groupStart + columnData.length, }) } } diff --git a/src/types.d.ts b/src/types.d.ts index 961435b..1c1a6ca 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -9,6 +9,7 @@ export interface ParquetReadOptions { rowStart?: number // first requested row index (inclusive) rowEnd?: number // last requested row index (exclusive) onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may contain data outside the requested range. + onPage?: (chunk: ColumnData) => void // called when a data page is parsed. pages may contain data outside the requested range. 
onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed compressors?: Compressors // custom decompressors utf8?: boolean // decode byte arrays as utf8 strings (default true) diff --git a/test/read.test.js b/test/read.test.js index 2991a63..b80ea05 100644 --- a/test/read.test.js +++ b/test/read.test.js @@ -181,4 +181,54 @@ describe('parquetRead', () => { expect(rows).toEqual([{ col: 'bad' }]) expect(convertWithDictionary).toHaveBeenCalledTimes(2) }) + + it('reads individual pages', async () => { + const file = await asyncBufferFromFile('test/files/page_indexed.parquet') + /** @type {import('../src/types.js').ColumnData[]} */ + const pages = [] + + await parquetRead({ + file, + onPage(page) { + pages.push(page) + }, + }) + + expect(pages).toEqual([ + { + columnName: 'col', + columnData: [ + 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', 'bad', 'bad', + 'good', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', + 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good', + 'bad', 'bad', 'good', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', 'bad', + 'bad', 'bad', 'good', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', + 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', 'good', 'bad', + 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', + 'bad', 'bad', 'good', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', 'bad', + 'bad', 'bad', 'bad', 'good', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', + 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', + ], + rowStart: 0, + rowEnd: 100, + }, + { + columnName: 'col', + columnData: [ + 'good', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good', + 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', + 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', + 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', + 'bad', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', 'bad', 'good', 'bad', + 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', + 'bad', 'bad', 'bad', 'bad', 'good', 'bad', 'bad', 'bad', 'good', 'bad', + 'bad', 'good', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', + 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', + 'good', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', + ], + rowStart: 100, + rowEnd: 200, + }, + ]) + }) })