Add onPage callback to parquetRead

Kenny Daniel 2025-04-10 23:29:58 -07:00
parent 90be536e05
commit f5274904b7
5 changed files with 130 additions and 32 deletions

README.md

@@ -112,8 +112,7 @@ const metadata = parquetMetadata(arrayBuffer)
### AsyncBuffer
-Hyparquet accepts argument `file` of type `AsyncBuffer` which is like a js `ArrayBuffer` but the `slice` method can return `Promise<ArrayBuffer>`.
-You can pass an `ArrayBuffer` anywhere that an `AsyncBuffer` is expected, if you have the entire file in memory.
+Hyparquet requires an argument `file` of type `AsyncBuffer`. An `AsyncBuffer` is similar to a js `ArrayBuffer`, but its `slice` method can return an async `Promise<ArrayBuffer>`.
```typescript
type Awaitable<T> = T | Promise<T>
@@ -123,7 +122,39 @@ interface AsyncBuffer {
}
```
-You can define your own `AsyncBuffer` to create a virtual file that can be read asynchronously. In most cases, you should probably use `asyncBufferFromUrl` or `asyncBufferFromFile`.
+In most cases, you should probably use `asyncBufferFromUrl` or `asyncBufferFromFile` to create an `AsyncBuffer` for hyparquet.
+#### asyncBufferFromFile
+If you are in a node.js environment, use `asyncBufferFromFile` to wrap a local file as an `AsyncBuffer`:
+```typescript
+const file: AsyncBuffer = await asyncBufferFromFile('local.parquet')
+const data = await parquetReadObjects({ file })
+```
+#### asyncBufferFromUrl
+If you want to read a parquet file remotely over http, use `asyncBufferFromUrl` to wrap an http url as an `AsyncBuffer` using http range requests.
+- Pass the `requestInit` option to provide additional fetch headers, e.g. for authentication (optional)
+- Pass `byteLength` if you know the file size, to save a round-trip HEAD request (optional)
+```typescript
+const url = 'https://s3.hyperparam.app/wiki_en.parquet'
+const requestInit = { headers: { Authorization: 'Bearer my_token' } }
+const byteLength = 415958713
+const file: AsyncBuffer = await asyncBufferFromUrl({ url, requestInit, byteLength })
+const data = await parquetReadObjects({ file })
+```
+#### ArrayBuffer
+You can provide an `ArrayBuffer` anywhere that an `AsyncBuffer` is expected. This is useful if you already have the entire parquet file in memory.
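For example, a minimal sketch of reading a fully buffered file (the fetch URL is illustrative):

```typescript
// The whole parquet file is already in memory, so a plain ArrayBuffer works
const res = await fetch('https://example.com/data.parquet') // illustrative URL
const arrayBuffer = await res.arrayBuffer()
const data = await parquetReadObjects({ file: arrayBuffer })
```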
+#### Custom AsyncBuffer
+You can implement your own `AsyncBuffer` to create a virtual file that can be read asynchronously by hyparquet.
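For example, a minimal sketch of a custom `AsyncBuffer` that caches byte-range reads over another source (the caching scheme is illustrative, and assumes the `AsyncBuffer` type is exported by the package):

```typescript
import type { AsyncBuffer } from 'hyparquet' // assumes exported types

// Illustrative: wrap any AsyncBuffer so repeated range reads hit a cache
function cachingBuffer(source: AsyncBuffer): AsyncBuffer {
  const cache = new Map<string, Promise<ArrayBuffer>>()
  return {
    byteLength: source.byteLength,
    slice(start: number, end?: number) {
      const key = `${start}-${end}`
      let hit = cache.get(key)
      if (!hit) {
        hit = Promise.resolve(source.slice(start, end))
        cache.set(key, hit)
      }
      return hit
    },
  }
}
```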
### parquetRead vs parquetReadObjects
@@ -139,11 +170,17 @@ parquetReadObjects({ file }): Promise<Record<string, any>[]>
`parquetRead` is the "base" function for reading parquet files.
It returns a `Promise<void>` that resolves when the file has been read, or rejects if an error occurs.
-Data is returned via `onComplete` or `onChunk` callbacks passed as arguments.
+Data is returned via the `onComplete`, `onChunk`, or `onPage` callbacks passed as arguments.
The reason for this design is that parquet is a column-oriented format, and returning data in row-oriented format requires transposing the column data. This is an expensive operation in javascript. If you don't pass in an `onComplete` argument to `parquetRead`, hyparquet will skip this transpose step and save memory.
-The `onChunk` callback allows column-oriented data to be streamed back as it is read.
+### Chunk Streaming
+The `onChunk` callback returns column-oriented data as it is ready. `onChunk` always returns top-level columns, including structs, assembled as a single column. This may require waiting for multiple sub-columns to load before assembly can occur.
+The `onPage` callback returns column-oriented page data as it is ready. `onPage` will NOT assemble struct columns and always returns individual sub-column data. Note that `onPage` _will_ assemble nested lists.
+In some cases, `onPage` can return data sooner than `onChunk`.
```typescript
interface ColumnData {
@@ -152,25 +189,20 @@ interface ColumnData {
  rowStart: number
  rowEnd: number
}
-function onChunk(chunk: ColumnData): void {
-  console.log(chunk)
-}
-await parquetRead({ file, onChunk })
-```
-### Authorization
-Pass the `requestInit` option to `asyncBufferFromUrl` to provide authentication information to a remote web server. For example:
-```javascript
-const requestInit = { headers: { Authorization: 'Bearer my_token' } }
-const file = await asyncBufferFromUrl({ url, requestInit })
+await parquetRead({
+  file,
+  onChunk(chunk: ColumnData) {
+    console.log('chunk', chunk)
+  },
+  onPage(chunk: ColumnData) {
+    console.log('page', chunk)
+  },
+})
```
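For instance, because every `ColumnData` callback reports `rowStart` and `rowEnd`, a rough progress indicator can be driven from `onPage` alone. A minimal sketch (the column name `'col'` is illustrative):

```typescript
// Sketch: use the rowEnd watermark of each page to report loading progress
let rowsLoaded = 0
await parquetRead({
  file,
  columns: ['col'], // illustrative: watch a single column
  onPage({ rowEnd }: ColumnData) {
    rowsLoaded = Math.max(rowsLoaded, rowEnd)
    console.log(`${rowsLoaded} rows loaded`)
  },
})
```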
### Returned row format
-By default, data returned by `parquetRead` in the `onComplete` function will be one **array** of columns per row.
-If you would like each row to be an **object** with each key the name of the column, set the option `rowFormat` to `object`.
+By default, the `onComplete` callback receives an **array** of values for each row: `[value]`. If you would prefer each row to be an **object**: `{ columnName: value }`, set the option `rowFormat` to `'object'`.
```javascript
import { parquetRead } from 'hyparquet'
@@ -182,7 +214,7 @@ await parquetRead({
})
```
-The `parquetReadObjects` function defaults to returning an array of objects.
+The `parquetReadObjects` function defaults to `rowFormat: 'object'`.
## Supported Parquet Files

src/column.js

@@ -12,16 +12,28 @@ import { deserializeTCompactProtocol } from './thrift.js'
 * @param {DataReader} reader
 * @param {RowGroupSelect} rowGroupSelect row group selection
 * @param {ColumnDecoder} columnDecoder column decoder params
+ * @param {(chunk: ColumnData) => void} [onPage] callback for each page
 * @returns {DecodedArray[]}
 */
-export function readColumn(reader, { selectStart, selectEnd }, columnDecoder) {
-  const { element, utf8 } = columnDecoder
+export function readColumn(reader, { groupStart, selectStart, selectEnd }, columnDecoder, onPage) {
+  const { columnName, element, utf8 } = columnDecoder
  /** @type {DecodedArray[]} */
  const chunks = []
  /** @type {DecodedArray | undefined} */
  let dictionary = undefined
+  /** @type {DecodedArray | undefined} */
+  let lastChunk = undefined
  let rowCount = 0
+  const emitLastChunk = onPage && (() => {
+    lastChunk && onPage({
+      columnName,
+      columnData: lastChunk,
+      rowStart: groupStart + rowCount - lastChunk.length,
+      rowEnd: groupStart + rowCount,
+    })
+  })
  while (rowCount < selectEnd) {
    if (reader.offset >= reader.view.byteLength - 1) break // end of reader
@@ -32,22 +44,23 @@ export function readColumn(reader, { selectStart, selectEnd }, columnDecoder) {
      dictionary = readPage(reader, header, columnDecoder, dictionary, undefined, 0)
      dictionary = convert(dictionary, element, utf8)
    } else {
-      const lastChunk = chunks.at(-1)
      const lastChunkLength = lastChunk?.length || 0
      const values = readPage(reader, header, columnDecoder, dictionary, lastChunk, selectStart - rowCount)
      if (lastChunk === values) {
        // continued from previous page
        rowCount += values.length - lastChunkLength
      } else {
+        emitLastChunk?.()
        chunks.push(values)
        rowCount += values.length
+        lastChunk = values
      }
    }
  }
+  emitLastChunk?.()
  // assert(rowCount >= selectEnd)
-  if (rowCount > selectEnd) {
+  if (rowCount > selectEnd && lastChunk) {
    // truncate last chunk to row limit
-    const lastChunk = chunks[chunks.length - 1]
    chunks[chunks.length - 1] = lastChunk.slice(0, selectEnd - (rowCount - lastChunk.length))
  }
  return chunks
@@ -145,7 +158,7 @@ export function getColumnRange({ dictionary_page_offset, data_page_offset, total
/**
 * Read parquet header from a buffer.
 *
- * @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ColumnDecoder, RowGroupSelect} from '../src/types.d.ts'
+ * @import {ColumnData, ColumnDecoder, ColumnMetaData, DataReader, DecodedArray, PageHeader, RowGroupSelect} from '../src/types.d.ts'
 * @param {DataReader} reader
 * @returns {PageHeader}
 */

src/rowgroup.js

@@ -146,9 +146,11 @@ export async function readRowGroup(options, rowGroup, groupStart) {
        compressors: options.compressors,
        utf8: options.utf8,
      }
-      const columnData = readColumn(reader, rowGroupSelect, columnDecoder)
      /** @type {DecodedArray[] | undefined} */
-      let chunks = columnData
+      let chunks = readColumn(reader, rowGroupSelect, columnDecoder, options.onPage)
+      // skip assembly if no onComplete or onChunk
+      if (!options.onComplete && !options.onChunk) return
+      // TODO: fast path for non-nested columns
      // save column data for assembly
@@ -172,12 +174,12 @@ export async function readRowGroup(options, rowGroup, groupStart) {
      if (!chunks) return
      // notify caller of column data
      if (options.onChunk) {
-        for (const chunk of chunks) {
+        for (const columnData of chunks) {
          options.onChunk({
            columnName,
-            columnData: chunk,
+            columnData,
            rowStart: groupStart,
-            rowEnd: groupStart + chunk.length,
+            rowEnd: groupStart + columnData.length,
          })
        }
      }

src/types.d.ts

@@ -9,6 +9,7 @@ export interface ParquetReadOptions {
  rowStart?: number // first requested row index (inclusive)
  rowEnd?: number // last requested row index (exclusive)
  onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may contain data outside the requested range.
+  onPage?: (chunk: ColumnData) => void // called when a data page is parsed. pages may contain data outside the requested range.
  onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
  compressors?: Compressors // custom decompressors
  utf8?: boolean // decode byte arrays as utf8 strings (default true)
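Since pages for a given column arrive in row order, a caller could accumulate `columnData` from each page to rebuild full columns. A minimal sketch, assuming flat (non-nested) columns, no row filtering, and an illustrative filename:

```typescript
import { asyncBufferFromFile, parquetRead } from 'hyparquet'

// Sketch: accumulate page data per column as pages arrive in row order
const columns: Record<string, any[]> = {}
const file = await asyncBufferFromFile('local.parquet') // illustrative filename
await parquetRead({
  file,
  onPage({ columnName, columnData }) {
    columns[columnName] ??= []
    for (const value of columnData) columns[columnName].push(value)
  },
})
```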

test/read.test.js

@@ -181,4 +181,54 @@ describe('parquetRead', () => {
    expect(rows).toEqual([{ col: 'bad' }])
    expect(convertWithDictionary).toHaveBeenCalledTimes(2)
  })
+  it('reads individual pages', async () => {
+    const file = await asyncBufferFromFile('test/files/page_indexed.parquet')
+    /** @type {import('../src/types.js').ColumnData[]} */
+    const pages = []
+    await parquetRead({
+      file,
+      onPage(page) {
+        pages.push(page)
+      },
+    })
+    expect(pages).toEqual([
+      {
+        columnName: 'col',
+        columnData: [
+          'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', 'bad', 'bad',
+          'good', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad',
+          'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good',
+          'bad', 'bad', 'good', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', 'bad',
+          'bad', 'bad', 'good', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad',
+          'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', 'good', 'bad',
+          'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good', 'bad',
+          'bad', 'bad', 'good', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', 'bad',
+          'bad', 'bad', 'bad', 'good', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad',
+          'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good', 'bad',
+        ],
+        rowStart: 0,
+        rowEnd: 100,
+      },
+      {
+        columnName: 'col',
+        columnData: [
+          'good', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good',
+          'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad',
+          'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'good', 'bad',
+          'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad',
+          'bad', 'bad', 'bad', 'bad', 'bad', 'good', 'bad', 'bad', 'good', 'bad',
+          'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad',
+          'bad', 'bad', 'bad', 'bad', 'good', 'bad', 'bad', 'bad', 'good', 'bad',
+          'bad', 'good', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad',
+          'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad',
+          'good', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad', 'bad',
+        ],
+        rowStart: 100,
+        rowEnd: 200,
+      },
+    ])
+  })
})