Async parquetRead with options

parent e701cfa4a1
commit 647056da8b
@@ -1,8 +1,8 @@
-import { CompressionCodec, Encoding, PageType } from './constants.js'
 import { assembleObjects, readDataPage, readDictionaryPage } from './datapage.js'
 import { parquetHeader } from './header.js'
 import { getMaxDefinitionLevel, isRequired } from './schema.js'
 import { snappyUncompress } from './snappy.js'
+import { CompressionCodec, Encoding, PageType } from './types.js'

 /**
  * @typedef {import('./types.js').ArrayBufferLike} ArrayBufferLike
@@ -26,3 +26,33 @@ export const FieldRepetitionType = {
   OPTIONAL: 1,
   REPEATED: 2,
 }
+
+export const CompressionCodec = {
+  UNCOMPRESSED: 0,
+  SNAPPY: 1,
+  GZIP: 2,
+  LZO: 3,
+  BROTLI: 4,
+  LZ4: 5,
+  ZSTD: 6,
+  LZ4_RAW: 7,
+}
+
+export const PageType = {
+  DATA_PAGE: 0,
+  INDEX_PAGE: 1,
+  DICTIONARY_PAGE: 2,
+  DATA_PAGE_V2: 3,
+}
+
+export const Encoding = {
+  PLAIN: 0,
+  PLAIN_DICTIONARY: 2,
+  RLE: 3,
+  BIT_PACKED: 4, // deprecated
+  DELTA_BINARY_PACKED: 5,
+  DELTA_LENGTH_BYTE_ARRAY: 6,
+  DELTA_BYTE_ARRAY: 7,
+  RLE_DICTIONARY: 8,
+  BYTE_STREAM_SPLIT: 9,
+}
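These constants mirror the parquet-format Thrift enums, binding each name to the integer id stored in file metadata. A minimal sketch of a reverse lookup for debugging, assuming the exports above land in src/types.js as the import changes elsewhere in this commit suggest (the `enumName` helper is illustrative, not part of this commit):

```js
import { CompressionCodec, Encoding, PageType } from './types.js'

// Hypothetical helper: map a numeric Thrift id back to its enum name.
function enumName(enumObj, id) {
  for (const [name, value] of Object.entries(enumObj)) {
    if (value === id) return name
  }
  return `unknown(${id})`
}

console.log(enumName(CompressionCodec, 1)) // 'SNAPPY'
console.log(enumName(PageType, 2)) // 'DICTIONARY_PAGE'
console.log(enumName(Encoding, 8)) // 'RLE_DICTIONARY'
```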
@@ -1,6 +1,6 @@
-import { Encoding, ParquetType } from './constants.js'
 import { readData, readPlain, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
 import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, skipDefinitionBytes } from './schema.js'
+import { Encoding, ParquetType } from './types.js'

 const skipNulls = false // TODO

src/hyparquet.d.ts (vendored): 46 lines changed
@@ -1,12 +1,27 @@
 export { AsyncBuffer, FileMetaData } from './types'

 /**
- * Read parquet data rows from a file
+ * Read parquet data rows from a file-like object.
+ * Reads the minimal number of row groups and columns to satisfy the request.
  *
- * @param {ArrayBuffer} arrayBuffer parquet file contents
- * @returns {any[][]} row data
+ * Returns a void promise when complete, and to throw errors.
+ * Data is returned in onComplete, not the return promise, because
+ * if onComplete is undefined, we parse the data, and emit chunks, but skip
+ * computing the row view directly. This saves on allocation if the caller
+ * wants to cache the full chunks, and make their own view of the data from
+ * the chunks.
+ *
+ * @param {object} options read options
+ * @param {AsyncBuffer} options.file file-like object containing parquet data
+ * @param {FileMetaData} [options.metadata] parquet file metadata
+ * @param {number[]} [options.columns] columns to read, all columns if undefined
+ * @param {number} [options.rowStart] first requested row index (inclusive)
+ * @param {number} [options.rowEnd] last requested row index (exclusive)
+ * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
+ * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
+ * @returns {Promise<void>} resolves when all requested rows and columns are parsed
  */
-export function parquetRead(arrayBuffer: ArrayBuffer): any[][]
+export async function parquetRead(options: ParquetReadOptions): Promise<void>

 /**
  * Read parquet metadata from an async buffer.
@@ -54,3 +69,26 @@ export function snappyUncompress(inputArray: Uint8Array, outputArray: Uint8Array
  * @returns {unknown} converted object
  */
 export function toJson(obj: any): unknown
+
+/**
+ * Parquet query options for reading data
+ */
+export interface ParquetReadOptions {
+  file: AsyncBuffer // file-like object containing parquet data
+  metadata?: FileMetaData // parquet metadata, will be parsed if not provided
+  columns?: number[] // columns to read, all columns if undefined
+  rowStart?: number // inclusive
+  rowEnd?: number // exclusive
+  onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may be outside the requested range.
+  onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
+}
+
+/**
+ * A run of column data
+ */
+export interface ColumnData {
+  column: number
+  data: ArrayLike<any>
+  rowStart: number
+  rowEnd: number
+}
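A hedged usage sketch of the new options-based API (the package-name import, the in-memory AsyncBuffer wrapper, and the row/column selection are illustrative; `arrayBuffer` is an assumed input holding a parquet file):

```js
import { parquetRead } from 'hyparquet'

// Any { byteLength, slice } pair satisfies AsyncBuffer; here we wrap
// an ArrayBuffer already in memory.
const file = {
  byteLength: arrayBuffer.byteLength,
  slice: async (start, end) => arrayBuffer.slice(start, end),
}

await parquetRead({
  file,
  columns: [0, 2], // read only the first and third columns
  rowStart: 100, // inclusive
  rowEnd: 200, // exclusive
  onChunk: chunk => console.log('column', chunk.column, 'rows', chunk.rowStart, 'to', chunk.rowEnd),
  onComplete: rows => console.log('parsed', rows.length, 'rows'),
})
```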
@@ -1,19 +1,11 @@
 import { parquetMetadata, parquetMetadataAsync } from './metadata.js'
 export { parquetMetadata, parquetMetadataAsync }

+import { parquetRead } from './read.js'
+export { parquetRead }
+
 import { snappyUncompress } from './snappy.js'
 export { snappyUncompress }

 import { toJson } from './toJson.js'
 export { toJson }
-
-/**
- * Read parquet data rows from a buffer.
- *
- * @param {ArrayBuffer} arrayBuffer parquet file contents
- * @returns {any[][]} row data
- */
-export function parquetRead(arrayBuffer) {
-  const metadata = parquetMetadata(arrayBuffer)
-  throw new Error('not implemented')
-}
src/read.js (new file): 159 lines
@@ -0,0 +1,159 @@
+import { offsetArrayBuffer } from './asyncbuffer.js'
+import { getColumnOffset, readColumn } from './column.js'
+import { parquetMetadataAsync } from './metadata.js'
+
+/**
+ * Read parquet data rows from a file-like object.
+ * Reads the minimal number of row groups and columns to satisfy the request.
+ *
+ * Returns a void promise when complete, and to throw errors.
+ * Data is returned in onComplete, not the return promise, because
+ * if onComplete is undefined, we parse the data, and emit chunks, but skip
+ * computing the row view directly. This saves on allocation if the caller
+ * wants to cache the full chunks, and make their own view of the data from
+ * the chunks.
+ *
+ * @typedef {import('./hyparquet.js').ColumnData} ColumnData
+ * @typedef {import('./types.js').AsyncBuffer} AsyncBuffer
+ * @typedef {import('./types.js').FileMetaData} FileMetaData
+ * @param {object} options read options
+ * @param {AsyncBuffer} options.file file-like object containing parquet data
+ * @param {FileMetaData} [options.metadata] parquet file metadata
+ * @param {number[]} [options.columns] columns to read, all columns if undefined
+ * @param {number} [options.rowStart] first requested row index (inclusive)
+ * @param {number} [options.rowEnd] last requested row index (exclusive)
+ * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
+ * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
+ * @returns {Promise<void>} resolves when all requested rows and columns are parsed
+ */
+export async function parquetRead(options) {
+  // load metadata if not provided
+  options.metadata ||= await parquetMetadataAsync(options.file)
+  if (!options.metadata) throw new Error('parquet metadata not found')
+
+  const { metadata, onComplete } = options
+  /** @type {any[][]} */
+  const rowData = []
+  const rowStart = options.rowStart || 0
+  const rowEnd = options.rowEnd || Number(metadata.num_rows)
+
+  // find which row groups to read
+  let groupStart = 0 // first row index of the current group
+  for (const rowGroup of metadata.row_groups) {
+    // number of rows in this row group
+    const groupRows = Number(rowGroup.num_rows)
+    // if row group overlaps with row range, read it
+    if (groupStart + groupRows >= rowStart && groupStart < rowEnd) {
+      // read row group
+      const groupData = await readRowGroup(options, rowGroup)
+      if (onComplete) {
+        // filter to rows in range
+        const start = Math.max(rowStart - groupStart, 0)
+        const end = Math.min(rowEnd - groupStart, groupRows)
+        rowData.push(...groupData.slice(start, end))
+      }
+    }
+    groupStart += groupRows
+  }
+
+  if (onComplete) onComplete(rowData)
+}
+
+/**
+ * Read a row group from a file-like object.
+ * Reads the minimal number of columns to satisfy the request.
+ *
+ * @typedef {import('./types.js').RowGroup} RowGroup
+ * @param {object} options read options
+ * @param {AsyncBuffer} options.file file-like object containing parquet data
+ * @param {FileMetaData} [options.metadata] parquet file metadata
+ * @param {number[]} [options.columns] columns to read, all columns if undefined
+ * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
+ * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
+ * @param {RowGroup} rowGroup row group to read
+ * @returns {Promise<any[][]>} resolves to row data
+ */
+async function readRowGroup(options, rowGroup) {
+  const { file, metadata, columns } = options
+  if (!metadata) throw new Error('parquet metadata not found')
+
+  // loop through metadata to find min/max bytes to read
+  let [groupStartByte, groupEndByte] = [file.byteLength, 0]
+  rowGroup.columns.forEach((columnChunk, columnIndex) => {
+    // skip columns that are not requested or lack metadata
+    if (columns && !columns.includes(columnIndex)) return
+    if (!columnChunk.meta_data) return
+
+    const startByte = getColumnOffset(columnChunk.meta_data)
+    const endByte = startByte + Number(columnChunk.meta_data.total_compressed_size)
+    groupStartByte = Math.min(groupStartByte, startByte)
+    groupEndByte = Math.max(groupEndByte, endByte)
+  })
+  if (groupStartByte >= groupEndByte) {
+    throw new Error('parquet missing row group metadata')
+  }
+  // if row group size is less than 128mb, pre-load in one read
+  let groupBuffer = undefined
+  if (groupEndByte - groupStartByte <= 1 << 27) {
+    // pre-load row group byte data in one big read,
+    // otherwise read column data individually
+    groupBuffer = offsetArrayBuffer(await file.slice(groupStartByte, groupEndByte), groupStartByte)
+  }
+
+  /** @type {any[][]} */
+  const groupData = []
+  const promises = []
+  // read column data
+  for (let columnIndex = 0; columnIndex < rowGroup.columns.length; columnIndex++) {
+    // skip columns that are not requested
+    if (columns && !columns.includes(columnIndex)) continue
+    const columnMetadata = rowGroup.columns[columnIndex].meta_data
+    if (!columnMetadata) throw new Error('parquet column metadata is undefined')
+    const columnStartByte = getColumnOffset(columnMetadata)
+    const columnEndByte = columnStartByte + Number(columnMetadata.total_compressed_size)
+    const columnBytes = columnEndByte - columnStartByte
+    // skip columns larger than 1gb
+    if (columnBytes > 1 << 30) {
+      console.warn(`parquet skipping huge column "${columnMetadata.path_in_schema}" ${columnBytes.toLocaleString()} bytes`)
+      continue
+    }
+    // use pre-loaded row group byte data if available, else read column data
+    let buffer
+    if (!groupBuffer) {
+      buffer = file.slice(columnStartByte, columnEndByte).then(arrayBuffer => {
+        return offsetArrayBuffer(arrayBuffer, columnStartByte)
+      })
+    } else {
+      buffer = Promise.resolve(groupBuffer)
+    }
+    // read column data async
+    promises.push(buffer.then(arrayBuffer => {
+      // TODO: extract SchemaElement for this column
+      const columnData = readColumn(arrayBuffer, rowGroup, columnMetadata, metadata.schema)
+      if (columnData.length !== Number(rowGroup.num_rows)) {
+        throw new Error('parquet column length does not match row group length')
+      }
+      // notify caller of column data
+      if (options.onChunk) options.onChunk({ column: columnIndex, data: columnData, rowStart: 0, rowEnd: columnData.length })
+      // add column data to group data only if onComplete is defined
+      if (options.onComplete) addColumn(groupData, columnIndex, columnData)
+    }))
+  }
+  await Promise.all(promises)
+  return groupData
+}
+
+/**
+ * Add a column to rows.
+ *
+ * @param {any[][]} rows rows to add column data to
+ * @param {number} columnIndex column index to add
+ * @param {ArrayLike<any>} columnData column data to add
+ */
+function addColumn(rows, columnIndex, columnData) {
+  for (let i = 0; i < columnData.length; i++) {
+    if (!rows[i]) rows[i] = []
+    rows[i][columnIndex] = columnData[i]
+  }
+}
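Note the pre-load threshold `1 << 27` is 134,217,728 bytes (128 MiB), and the per-column skip threshold `1 << 30` is 1 GiB. As the doc comment above explains, when onComplete is omitted the row view is never assembled, so callers can keep the data column-oriented. A minimal sketch, assuming `file` is an AsyncBuffer as elsewhere in this commit (the `columnStore` shape is illustrative):

```js
// Collect per-column values across row groups without building row tuples.
const columnStore = new Map() // column index -> array of values
await parquetRead({
  file, // assumed AsyncBuffer
  onChunk: ({ column, data }) => {
    // onChunk fires once per column chunk per row group, so append;
    // chunks cover whole row groups and may exceed a requested row range
    const prev = columnStore.get(column) || []
    columnStore.set(column, prev.concat(Array.from(data)))
  },
  // no onComplete: parquetRead skips computing the row view entirely
})
```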
@@ -1,6 +1,6 @@
 import fs from 'fs'
 import { describe, expect, it } from 'vitest'
-import { parquetMetadata, parquetMetadataAsync } from '../src/metadata.js'
+import { parquetMetadata, parquetMetadataAsync } from '../src/hyparquet.js'
 import { toJson } from '../src/toJson.js'

 /**
@@ -29,13 +29,13 @@ function fileToAsyncBuffer(filePath) {
 }

 describe('parquetMetadata', () => {
-  it('should correctly decode metadata from addrtype-missing-value.parquet', async () => {
+  it('should parse metadata from addrtype-missing-value.parquet', async () => {
     const arrayBuffer = await readFileToArrayBuffer('test/files/addrtype-missing-value.parquet')
     const result = parquetMetadata(arrayBuffer)
     expect(toJson(result)).toEqual(addrtypeMetadata)
   })

-  it('should correctly decode metadata from rowgroups.parquet', async () => {
+  it('should parse metadata from rowgroups.parquet', async () => {
     const arrayBuffer = await readFileToArrayBuffer('test/files/rowgroups.parquet')
     const result = parquetMetadata(arrayBuffer)
     expect(toJson(result)).containSubset(rowgroupsMetadata)
@@ -63,13 +63,13 @@ describe('parquetMetadata', () => {
 })

 describe('parquetMetadataAsync', () => {
-  it('should correctly decode metadata from addrtype-missing-value.parquet', async () => {
+  it('should parse metadata asynchronously from addrtype-missing-value.parquet', async () => {
     const asyncBuffer = fileToAsyncBuffer('test/files/addrtype-missing-value.parquet')
     const result = await parquetMetadataAsync(asyncBuffer)
     expect(toJson(result)).toEqual(addrtypeMetadata)
   })

-  it('should correctly decode metadata from rowgroups.parquet', async () => {
+  it('should parse metadata asynchronously from rowgroups.parquet', async () => {
     const asyncBuffer = fileToAsyncBuffer('test/files/rowgroups.parquet')
     // force two fetches
     const result = await parquetMetadataAsync(asyncBuffer, 1609)
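The `1609` above is the optional second argument to parquetMetadataAsync, capping the initial footer fetch so it does not cover this file's whole metadata and a second range read is forced. A hedged sketch of the common case, relying on the library's default initial fetch size:

```js
// default initial fetch size; a second read happens only if the footer is larger
const metadata = await parquetMetadataAsync(asyncBuffer)
console.log(metadata.num_rows)
```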
test/read.test.js (new file): 83 lines
@@ -0,0 +1,83 @@
+import fs from 'fs'
+import { describe, expect, it } from 'vitest'
+import { parquetRead } from '../src/hyparquet.js'
+import { toJson } from '../src/toJson.js'
+
+/**
+ * Helper function to read .parquet file into ArrayBuffer
+ *
+ * @param {string} filePath
+ * @returns {Promise<ArrayBuffer>}
+ */
+async function readFileToArrayBuffer(filePath) {
+  const buffer = await fs.promises.readFile(filePath)
+  return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength)
+}
+
+/**
+ * Wrap .parquet file in an AsyncBuffer
+ *
+ * @typedef {import('../src/types.js').AsyncBuffer} AsyncBuffer
+ * @param {string} filePath
+ * @returns {AsyncBuffer}
+ */
+function fileToAsyncBuffer(filePath) {
+  return {
+    byteLength: fs.statSync(filePath).size,
+    slice: async (start, end) => (await readFileToArrayBuffer(filePath)).slice(start, end),
+  }
+}
+
+describe('parquetMetadataAsync', () => {
+  it('should parse data from addrtype-missing-value.parquet', async () => {
+    const asyncBuffer = fileToAsyncBuffer('test/files/addrtype-missing-value.parquet')
+    await parquetRead({
+      file: asyncBuffer,
+      onComplete: (rows) => {
+        expect(toJson(rows)).toEqual(addrtypeData)
+      },
+    })
+  })
+
+  it('should parse data from rowgroups.parquet', async () => {
+    const asyncBuffer = fileToAsyncBuffer('test/files/rowgroups.parquet')
+    await parquetRead({
+      file: asyncBuffer,
+      onComplete: (rows) => {
+        expect(toJson(rows)).toEqual(rowgroupsData)
+      },
+    })
+  })
+})
+
+// Parquet v1 from DuckDB
+const addrtypeData = [
+  ['Block'],
+  ['Intersection'],
+  ['Block'],
+  ['Block'],
+  [undefined],
+  ['Block'],
+  ['Intersection'],
+  ['Block'],
+  ['Block'],
+  ['Intersection'],
+]
+
+const rowgroupsData = [
+  [1],
+  [2],
+  [3],
+  [4],
+  [5],
+  [6],
+  [7],
+  [8],
+  [9],
+  [10],
+  [11],
+  [12],
+  [13],
+  [14],
+  [15],
+]
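The fileToAsyncBuffer helper above fulfills the AsyncBuffer contract from the local filesystem; the same contract could be satisfied over HTTP. A hedged sketch using Range requests (`urlToAsyncBuffer` is illustrative, and assumes the server supports Range, exposes Content-Length, and that slice is always called with both bounds, as in this commit's call sites):

```js
// Hypothetical: wrap a remote parquet file as an AsyncBuffer.
async function urlToAsyncBuffer(url) {
  const head = await fetch(url, { method: 'HEAD' })
  const byteLength = Number(head.headers.get('Content-Length'))
  return {
    byteLength,
    slice: async (start, end) => {
      // AsyncBuffer end is exclusive; HTTP Range end is inclusive
      const res = await fetch(url, { headers: { Range: `bytes=${start}-${end - 1}` } })
      return res.arrayBuffer()
    },
  }
}
```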