Column filter by name
commit 2d061392b9 (parent c6ad30b59a)
mirror of https://github.com/asadbek064/hyparquet.git
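In short: the `columns` read option now takes column names (`string[]`, matched against the dotted `path_in_schema`) instead of column indices (`number[]`); the `ColumnData` chunk fields `column`/`data` become `columnName`/`columnData`; an unmatched column filter now raises `parquet columns not found: ...` instead of `parquet missing row group metadata`; and `@types/node` is bumped from 20.11.26 to 20.11.27.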
package.json
@@ -27,7 +27,7 @@
     "typecheck": "tsc"
   },
   "devDependencies": {
-    "@types/node": "20.11.26",
+    "@types/node": "20.11.27",
     "@typescript-eslint/eslint-plugin": "7.2.0",
     "@vitest/coverage-v8": "1.3.1",
     "eslint": "8.57.0",
src/hyparquet.d.ts (vendored, 8 changed lines)
@@ -14,7 +14,7 @@ export { AsyncBuffer, FileMetaData, SchemaTree } from './types'
  * @param {object} options read options
  * @param {AsyncBuffer} options.file file-like object containing parquet data
  * @param {FileMetaData} [options.metadata] parquet file metadata
- * @param {number[]} [options.columns] columns to read, all columns if undefined
+ * @param {string[]} [options.columns] columns to read, all columns if undefined
  * @param {number} [options.rowStart] first requested row index (inclusive)
  * @param {number} [options.rowEnd] last requested row index (exclusive)
  * @param {Function} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
@@ -92,7 +92,7 @@ export function toJson(obj: any): unknown
 export interface ParquetReadOptions {
   file: AsyncBuffer // file-like object containing parquet data
   metadata?: FileMetaData // parquet metadata, will be parsed if not provided
-  columns?: number[] // columns to read, all columns if undefined
+  columns?: string[] // columns to read, all columns if undefined
   rowStart?: number // inclusive
   rowEnd?: number // exclusive
   onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may be outside the requested range.
@@ -104,8 +104,8 @@ export interface ParquetReadOptions {
  * A run of column data
  */
 export interface ColumnData {
-  column: number
-  data: ArrayLike<any>
+  columnName: string
+  columnData: ArrayLike<any>
   rowStart: number
   rowEnd: number
 }
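For context, a minimal sketch of calling the renamed API. The `{ byteLength, slice }` shape of `AsyncBuffer` is inferred from how read.js uses `options.file`, and `example.parquet` is a placeholder path; neither is spelled out in this diff.

import { readFile } from 'node:fs/promises'
import { parquetRead } from 'hyparquet'

// Wrap a local file as an AsyncBuffer (assumed shape: { byteLength, slice }).
const buf = await readFile('example.parquet') // hypothetical file
const arrayBuffer = buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength)
const file = {
  byteLength: arrayBuffer.byteLength,
  slice: async (start, end) => arrayBuffer.slice(start, end),
}

await parquetRead({
  file,
  columns: ['c'], // column names now, e.g. ['c'] instead of the old index form [2]
  onChunk({ columnName, columnData, rowStart, rowEnd }) {
    // chunk fields renamed from column/data to columnName/columnData
    console.log(columnName, rowStart, rowEnd, columnData)
  },
})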
src/read.js (32 changed lines)
@@ -20,7 +20,7 @@ import { parquetMetadataAsync } from './metadata.js'
  * @param {object} options read options
  * @param {AsyncBuffer} options.file file-like object containing parquet data
  * @param {FileMetaData} [options.metadata] parquet file metadata
- * @param {number[]} [options.columns] columns to read, all columns if undefined
+ * @param {string[]} [options.columns] columns to read, all columns if undefined
  * @param {number} [options.rowStart] first requested row index (inclusive)
  * @param {number} [options.rowEnd] last requested row index (exclusive)
  * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
@@ -69,7 +69,7 @@ export async function parquetRead(options) {
  * @param {object} options read options
  * @param {AsyncBuffer} options.file file-like object containing parquet data
  * @param {FileMetaData} [options.metadata] parquet file metadata
- * @param {number[]} [options.columns] columns to read, all columns if undefined
+ * @param {string[]} [options.columns] columns to read, all columns if undefined
  * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
  * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
  * @param {Compressors} [options.compressors] custom decompressors
@@ -82,18 +82,20 @@ async function readRowGroup(options, rowGroup) {
 
   // loop through metadata to find min/max bytes to read
   let [groupStartByte, groupEndByte] = [file.byteLength, 0]
-  rowGroup.columns.forEach((columnChunk, columnIndex) => {
-    // skip columns that are not requested or lack metadata
-    if (columns && !columns.includes(columnIndex)) return
-    if (!columnChunk.meta_data) return
+  rowGroup.columns.forEach(({ meta_data: columnMetadata }) => {
+    if (!columnMetadata) throw new Error('parquet column metadata is undefined')
+    const columnName = columnMetadata.path_in_schema.join('.')
+    // skip columns that are not requested
+    if (columns && !columns.includes(columnName)) return
 
-    const startByte = getColumnOffset(columnChunk.meta_data)
-    const endByte = startByte + Number(columnChunk.meta_data.total_compressed_size)
+    const startByte = getColumnOffset(columnMetadata)
+    const endByte = startByte + Number(columnMetadata.total_compressed_size)
     groupStartByte = Math.min(groupStartByte, startByte)
     groupEndByte = Math.max(groupEndByte, endByte)
   })
-  if (groupStartByte >= groupEndByte) {
-    throw new Error('parquet missing row group metadata')
+  if (groupStartByte >= groupEndByte && columns?.length) {
+    // TODO: should throw if any column is missing
+    throw new Error(`parquet columns not found: ${columns.join(', ')}`)
   }
   // if row group size is less than 128mb, pre-load in one read
   let groupBuffer
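To make the read-window logic above concrete, here is a toy reduction over invented chunk metadata (the names, offsets, and sizes are made up for illustration): the loop widens [groupStartByte, groupEndByte) to cover every requested column chunk, so one contiguous read can serve them all.

// Invented chunk metadata; mirrors the min/max reduction in readRowGroup.
const chunks = [
  { name: 'a', offset: 100, size: 50 },  // not requested, ignored
  { name: 'b', offset: 150, size: 200 },
  { name: 'c', offset: 350, size: 100 },
]
const requested = ['b', 'c']
let start = Infinity
let end = 0
for (const { name, offset, size } of chunks) {
  if (!requested.includes(name)) continue
  start = Math.min(start, offset)     // earliest byte of any requested chunk
  end = Math.max(end, offset + size)  // one past the last requested byte
}
console.log(start, end) // 150 450: a single read of bytes [150, 450) covers b and c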
@@ -108,12 +110,14 @@ async function readRowGroup(options, rowGroup) {
   const promises = []
   // read column data
   for (let columnIndex = 0; columnIndex < rowGroup.columns.length; columnIndex++) {
-    // skip columns that are not requested
-    if (columns && !columns.includes(columnIndex)) continue
 
     const columnMetadata = rowGroup.columns[columnIndex].meta_data
     if (!columnMetadata) throw new Error('parquet column metadata is undefined')
 
+    // skip columns that are not requested
+    const columnName = columnMetadata.path_in_schema.join('.')
+    if (columns && !columns.includes(columnName)) continue
+
     const columnStartByte = getColumnOffset(columnMetadata)
     const columnEndByte = columnStartByte + Number(columnMetadata.total_compressed_size)
     const columnBytes = columnEndByte - columnStartByte
@@ -145,7 +149,7 @@ async function readRowGroup(options, rowGroup) {
       throw new Error(`parquet column length ${columnData.length} does not match row group length ${rowGroup.num_rows}`)
     }
     // notify caller of column data
-    if (options.onChunk) options.onChunk({ column: columnIndex, data: columnData, rowStart: 0, rowEnd: columnData.length })
+    if (options.onChunk) options.onChunk({ columnName, columnData, rowStart: 0, rowEnd: columnData.length })
     // add column data to group data only if onComplete is defined
     if (options.onComplete) addColumn(groupData, columnIndex, columnData)
   }))
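One behavioral consequence worth noting: when a name filter matches no column chunk, the byte range stays empty and the new, more specific error fires. A hedged sketch, reusing the `file` wrapper from the earlier example:

try {
  // 'no_such_column' is a deliberately bogus name for illustration
  await parquetRead({ file, columns: ['no_such_column'], onComplete: () => {} })
} catch (err) {
  console.error(err.message) // "parquet columns not found: no_such_column"
}

Per the TODO in the diff, a filter that matches only some of its names still reads silently rather than throwing.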
@@ -38,11 +38,11 @@ describe('parquetRead', () => {
     const asyncBuffer = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
     await parquetRead({
       file: asyncBuffer,
-      columns: [2],
+      columns: ['c'],
       onChunk: (rows) => {
         expect(toJson(rows)).toEqual({
-          column: 2,
-          data: [2, 3, 4, 5, 2],
+          columnName: 'c',
+          columnData: [2, 3, 4, 5, 2],
           rowStart: 0,
           rowEnd: 5,
         })