diff --git a/src/column.js b/src/column.js
index 4de8c80..83a8535 100644
--- a/src/column.js
+++ b/src/column.js
@@ -1,8 +1,8 @@
+import { CompressionCodec, Encoding, PageType } from './constants.js'
 import { assembleObjects, readDataPage, readDictionaryPage } from './datapage.js'
 import { parquetHeader } from './header.js'
 import { getMaxDefinitionLevel, isRequired } from './schema.js'
 import { snappyUncompress } from './snappy.js'
-import { CompressionCodec, Encoding, PageType } from './types.js'
 
 /**
  * @typedef {import('./types.js').ArrayBufferLike} ArrayBufferLike
diff --git a/src/constants.js b/src/constants.js
index 61e785c..c742c7b 100644
--- a/src/constants.js
+++ b/src/constants.js
@@ -26,3 +26,33 @@ export const FieldRepetitionType = {
   OPTIONAL: 1,
   REPEATED: 2,
 }
+
+export const CompressionCodec = {
+  UNCOMPRESSED: 0,
+  SNAPPY: 1,
+  GZIP: 2,
+  LZO: 3,
+  BROTLI: 4,
+  LZ4: 5,
+  ZSTD: 6,
+  LZ4_RAW: 7,
+}
+
+export const PageType = {
+  DATA_PAGE: 0,
+  INDEX_PAGE: 1,
+  DICTIONARY_PAGE: 2,
+  DATA_PAGE_V2: 3,
+}
+
+export const Encoding = {
+  PLAIN: 0,
+  PLAIN_DICTIONARY: 2,
+  RLE: 3,
+  BIT_PACKED: 4, // deprecated
+  DELTA_BINARY_PACKED: 5,
+  DELTA_LENGTH_BYTE_ARRAY: 6,
+  DELTA_BYTE_ARRAY: 7,
+  RLE_DICTIONARY: 8,
+  BYTE_STREAM_SPLIT: 9,
+}
diff --git a/src/datapage.js b/src/datapage.js
index 2d0d38c..eb1c91b 100644
--- a/src/datapage.js
+++ b/src/datapage.js
@@ -1,6 +1,6 @@
+import { Encoding, ParquetType } from './constants.js'
 import { readData, readPlain, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
 import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, skipDefinitionBytes } from './schema.js'
-import { Encoding, ParquetType } from './types.js'
 
 const skipNulls = false // TODO
 
diff --git a/src/hyparquet.d.ts b/src/hyparquet.d.ts
index 4e728de..3dcc0fe 100644
--- a/src/hyparquet.d.ts
+++ b/src/hyparquet.d.ts
@@ -1,12 +1,27 @@
 export { AsyncBuffer, FileMetaData } from './types'
 
 /**
- * Read parquet data rows from a file
+ * Read parquet data rows from a file-like object.
+ * Reads the minimal number of row groups and columns to satisfy the request.
  *
- * @param {ArrayBuffer} arrayBuffer parquet file contents
- * @returns {any[][]} row data
+ * Returns a promise which resolves when all requested rows and columns
+ * are parsed, and rejects on error.
+ * Data is returned via the onComplete callback, not the return promise:
+ * if onComplete is undefined, the data is still parsed and emitted as chunks,
+ * but the row view is never assembled. This saves allocations when the caller
+ * wants to cache the chunks and build their own view of the data.
+ *
+ * @param {object} options read options
+ * @param {AsyncBuffer} options.file file-like object containing parquet data
+ * @param {FileMetaData} [options.metadata] parquet file metadata
+ * @param {number[]} [options.columns] columns to read, all columns if undefined
+ * @param {number} [options.rowStart] first requested row index (inclusive)
+ * @param {number} [options.rowEnd] last requested row index (exclusive)
+ * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
+ * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
+ * @returns {Promise<void>} resolves when all requested rows and columns are parsed
  */
-export function parquetRead(arrayBuffer: ArrayBuffer): any[][]
+export function parquetRead(options: ParquetReadOptions): Promise<void>
 
 /**
  * Read parquet metadata from an async buffer.
@@ -54,3 +69,26 @@ export function snappyUncompress(inputArray: Uint8Array, outputArray: Uint8Array
  * @returns {unknown} converted object
  */
 export function toJson(obj: any): unknown
+
+/**
+ * Parquet query options for reading data
+ */
+export interface ParquetReadOptions {
+  file: AsyncBuffer // file-like object containing parquet data
+  metadata?: FileMetaData // parquet metadata, will be parsed if not provided
+  columns?: number[] // columns to read, all columns if undefined
+  rowStart?: number // inclusive
+  rowEnd?: number // exclusive
+  onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may be outside the requested range.
+  onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
+}
+
+/**
+ * A run of column data
+ */
+export interface ColumnData {
+  column: number
+  data: ArrayLike<any>
+  rowStart: number
+  rowEnd: number
+}
diff --git a/src/hyparquet.js b/src/hyparquet.js
index 1a80a59..5f826d2 100644
--- a/src/hyparquet.js
+++ b/src/hyparquet.js
@@ -1,19 +1,11 @@
 import { parquetMetadata, parquetMetadataAsync } from './metadata.js'
 export { parquetMetadata, parquetMetadataAsync }
 
+import { parquetRead } from './read.js'
+export { parquetRead }
+
 import { snappyUncompress } from './snappy.js'
 export { snappyUncompress }
 
 import { toJson } from './toJson.js'
 export { toJson }
-
-/**
- * Read parquet data rows from a buffer.
- *
- * @param {ArrayBuffer} arrayBuffer parquet file contents
- * @returns {any[][]} row data
- */
-export function parquetRead(arrayBuffer) {
-  const metadata = parquetMetadata(arrayBuffer)
-  throw new Error('not implemented')
-}
diff --git a/src/read.js b/src/read.js
new file mode 100644
index 0000000..d4648ec
--- /dev/null
+++ b/src/read.js
@@ -0,0 +1,159 @@
+
+import { offsetArrayBuffer } from './asyncbuffer.js'
+import { getColumnOffset, readColumn } from './column.js'
+import { parquetMetadataAsync } from './metadata.js'
+
+/**
+ * Read parquet data rows from a file-like object.
+ * Reads the minimal number of row groups and columns to satisfy the request.
+ *
+ * Returns a promise which resolves when all requested rows and columns
+ * are parsed, and rejects on error.
+ * Data is returned via the onComplete callback, not the return promise:
+ * if onComplete is undefined, the data is still parsed and emitted as chunks,
+ * but the row view is never assembled. This saves allocations when the caller
+ * wants to cache the chunks and build their own view of the data.
+ *
+ * @typedef {import('./hyparquet.js').ColumnData} ColumnData
+ * @typedef {import('./types.js').AsyncBuffer} AsyncBuffer
+ * @typedef {import('./types.js').FileMetaData} FileMetaData
+ * @param {object} options read options
+ * @param {AsyncBuffer} options.file file-like object containing parquet data
+ * @param {FileMetaData} [options.metadata] parquet file metadata
+ * @param {number[]} [options.columns] columns to read, all columns if undefined
+ * @param {number} [options.rowStart] first requested row index (inclusive)
+ * @param {number} [options.rowEnd] last requested row index (exclusive)
+ * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
+ * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
+ * @returns {Promise<void>} resolves when all requested rows and columns are parsed
+ */
+export async function parquetRead(options) {
+  // load metadata if not provided
+  options.metadata ||= await parquetMetadataAsync(options.file)
+  if (!options.metadata) throw new Error('parquet metadata not found')
+
+  const { metadata, onComplete } = options
+  /** @type {any[][]} */
+  const rowData = []
+  const rowStart = options.rowStart || 0
+  const rowEnd = options.rowEnd || Number(metadata.num_rows)
+
+  // find which row groups to read
+  let groupStart = 0 // first row index of the current group
+  for (const rowGroup of metadata.row_groups) {
+    // number of rows in this row group
+    const groupRows = Number(rowGroup.num_rows)
+    // if row group overlaps with row range, read it
+    if (groupStart + groupRows >= rowStart && groupStart < rowEnd) {
+      // read row group
+      const groupData = await readRowGroup(options, rowGroup)
+      if (onComplete) {
+        // filter to rows in range
+        const start = Math.max(rowStart - groupStart, 0)
+        const end = Math.min(rowEnd - groupStart, groupRows)
+        rowData.push(...groupData.slice(start, end))
+      }
+    }
+    groupStart += groupRows
+  }
+
+  if (onComplete) onComplete(rowData)
+}
+
+/**
+ * Read a row group from a file-like object.
+ * Reads the minimal number of columns to satisfy the request.
+ *
+ * @typedef {import('./types.js').RowGroup} RowGroup
+ * @param {object} options read options
+ * @param {AsyncBuffer} options.file file-like object containing parquet data
+ * @param {FileMetaData} [options.metadata] parquet file metadata
+ * @param {number[]} [options.columns] columns to read, all columns if undefined
+ * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
+ * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
+ * @param {RowGroup} rowGroup row group to read
+ * @returns {Promise<any[][]>} resolves to row data
+ */
+async function readRowGroup(options, rowGroup) {
+  const { file, metadata, columns } = options
+  if (!metadata) throw new Error('parquet metadata not found')
+
+  // loop through metadata to find min/max bytes to read
+  let [groupStartByte, groupEndByte] = [file.byteLength, 0]
+  rowGroup.columns.forEach((columnChunk, columnIndex) => {
+    // skip columns that are not requested or lack metadata
+    if (columns && !columns.includes(columnIndex)) return
+    if (!columnChunk.meta_data) return
+
+    const startByte = getColumnOffset(columnChunk.meta_data)
+    const endByte = startByte + Number(columnChunk.meta_data.total_compressed_size)
+    groupStartByte = Math.min(groupStartByte, startByte)
+    groupEndByte = Math.max(groupEndByte, endByte)
+  })
+  if (groupStartByte >= groupEndByte) {
+    throw new Error('parquet missing row group metadata')
+  }
+  // if row group size is less than 128mb, pre-load in one read
+  let groupBuffer = undefined
+  if (groupEndByte - groupStartByte <= 1 << 27) {
+    // pre-load row group byte data in one big read,
+    // otherwise read column data individually
+    groupBuffer = offsetArrayBuffer(await file.slice(groupStartByte, groupEndByte), groupStartByte)
+  }
+
+  /** @type {any[][]} */
+  const groupData = []
+  const promises = []
+  // read column data
+  for (let columnIndex = 0; columnIndex < rowGroup.columns.length; columnIndex++) {
+    // skip columns that are not requested
+    if (columns && !columns.includes(columnIndex)) continue
+    const columnMetadata = rowGroup.columns[columnIndex].meta_data
+    if (!columnMetadata) throw new Error('parquet column metadata is undefined')
+    const columnStartByte = getColumnOffset(columnMetadata)
+    const columnEndByte = columnStartByte + Number(columnMetadata.total_compressed_size)
+    const columnBytes = columnEndByte - columnStartByte
+    // skip columns larger than 1gb
+    if (columnBytes > 1 << 30) {
+      console.warn(`parquet skipping huge column "${columnMetadata.path_in_schema}" ${columnBytes.toLocaleString()} bytes`)
+      continue
+    }
+    // use pre-loaded row group byte data if available, else read column data
+    let buffer
+    if (!groupBuffer) {
+      buffer = file.slice(columnStartByte, columnEndByte).then(arrayBuffer => {
+        return offsetArrayBuffer(arrayBuffer, columnStartByte)
+      })
+    } else {
+      buffer = Promise.resolve(groupBuffer)
+    }
+    // read column data async
+    promises.push(buffer.then(arrayBuffer => {
+      // TODO: extract SchemaElement for this column
+      const columnData = readColumn(arrayBuffer, rowGroup, columnMetadata, metadata.schema)
+      if (columnData.length !== Number(rowGroup.num_rows)) {
+        throw new Error('parquet column length does not match row group length')
+      }
+      // notify caller of column data
+      if (options.onChunk) options.onChunk({ column: columnIndex, data: columnData, rowStart: 0, rowEnd: columnData.length })
+      // add column data to group data only if onComplete is defined
+      if (options.onComplete) addColumn(groupData, columnIndex, columnData)
+    }))
+  }
+  await Promise.all(promises)
+  return groupData
+}
+
+/**
+ * Add a column to rows.
+ *
+ * @param {any[][]} rows rows to add column data to
+ * @param {number} columnIndex column index to add
+ * @param {ArrayLike<any>} columnData column data to add
+ */
+function addColumn(rows, columnIndex, columnData) {
+  for (let i = 0; i < columnData.length; i++) {
+    if (!rows[i]) rows[i] = []
+    rows[i][columnIndex] = columnData[i]
+  }
+}
diff --git a/test/metadata.test.js b/test/metadata.test.js
index ff0bd9c..040c73e 100644
--- a/test/metadata.test.js
+++ b/test/metadata.test.js
@@ -1,6 +1,6 @@
 import fs from 'fs'
 import { describe, expect, it } from 'vitest'
-import { parquetMetadata, parquetMetadataAsync } from '../src/metadata.js'
+import { parquetMetadata, parquetMetadataAsync } from '../src/hyparquet.js'
 import { toJson } from '../src/toJson.js'
 
 /**
@@ -29,13 +29,13 @@ function fileToAsyncBuffer(filePath) {
 }
 
 describe('parquetMetadata', () => {
-  it('should correctly decode metadata from addrtype-missing-value.parquet', async () => {
+  it('should parse metadata from addrtype-missing-value.parquet', async () => {
     const arrayBuffer = await readFileToArrayBuffer('test/files/addrtype-missing-value.parquet')
     const result = parquetMetadata(arrayBuffer)
     expect(toJson(result)).toEqual(addrtypeMetadata)
   })
 
-  it('should correctly decode metadata from rowgroups.parquet', async () => {
+  it('should parse metadata from rowgroups.parquet', async () => {
     const arrayBuffer = await readFileToArrayBuffer('test/files/rowgroups.parquet')
     const result = parquetMetadata(arrayBuffer)
     expect(toJson(result)).containSubset(rowgroupsMetadata)
@@ -63,13 +63,13 @@ describe('parquetMetadata', () => {
 })
 
 describe('parquetMetadataAsync', () => {
-  it('should correctly decode metadata from addrtype-missing-value.parquet', async () => {
+  it('should parse metadata asynchronously from addrtype-missing-value.parquet', async () => {
     const asyncBuffer = fileToAsyncBuffer('test/files/addrtype-missing-value.parquet')
     const result = await parquetMetadataAsync(asyncBuffer)
     expect(toJson(result)).toEqual(addrtypeMetadata)
   })
 
-  it('should correctly decode metadata from rowgroups.parquet', async () => {
+  it('should parse metadata asynchronously from rowgroups.parquet', async () => {
     const asyncBuffer = fileToAsyncBuffer('test/files/rowgroups.parquet')
     // force two fetches
     const result = await parquetMetadataAsync(asyncBuffer, 1609)
diff --git a/test/read.test.js b/test/read.test.js
new file mode 100644
index 0000000..7fb63a6
--- /dev/null
+++ b/test/read.test.js
@@ -0,0 +1,83 @@
+import fs from 'fs'
+import { describe, expect, it } from 'vitest'
+import { parquetRead } from '../src/hyparquet.js'
+import { toJson } from '../src/toJson.js'
+
+/**
+ * Helper function to read .parquet file into ArrayBuffer
+ *
+ * @param {string} filePath
+ * @returns {Promise<ArrayBuffer>}
+ */
+async function readFileToArrayBuffer(filePath) {
+  const buffer = await fs.promises.readFile(filePath)
+  return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength)
+}
+
+/**
+ * Wrap .parquet file in an AsyncBuffer
+ *
+ * @typedef {import('../src/types.js').AsyncBuffer} AsyncBuffer
+ * @param {string} filePath
+ * @returns {AsyncBuffer}
+ */
+function fileToAsyncBuffer(filePath) {
+  return {
+    byteLength: fs.statSync(filePath).size,
+    slice: async (start, end) => (await readFileToArrayBuffer(filePath)).slice(start, end),
+  }
+}
+
+describe('parquetRead', () => {
+  it('should parse data from addrtype-missing-value.parquet', async () => {
+    const asyncBuffer = fileToAsyncBuffer('test/files/addrtype-missing-value.parquet')
+    await parquetRead({
+      file: asyncBuffer,
+      onComplete: (rows) => {
+        expect(toJson(rows)).toEqual(addrtypeData)
+      },
+    })
+  })
+
+  it('should parse data from rowgroups.parquet', async () => {
+    const asyncBuffer = fileToAsyncBuffer('test/files/rowgroups.parquet')
+    await parquetRead({
+      file: asyncBuffer,
+      onComplete: (rows) => {
+        expect(toJson(rows)).toEqual(rowgroupsData)
+      },
+    })
+  })
+})
+
+// Parquet v1 from DuckDB
+const addrtypeData = [
+  ['Block'],
+  ['Intersection'],
+  ['Block'],
+  ['Block'],
+  [undefined],
+  ['Block'],
+  ['Intersection'],
+  ['Block'],
+  ['Block'],
+  ['Intersection'],
+]
+
+const rowgroupsData = [
+  [1],
+  [2],
+  [3],
+  [4],
+  [5],
+  [6],
+  [7],
+  [8],
+  [9],
+  [10],
+  [11],
+  [12],
+  [13],
+  [14],
+  [15],
+]
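
Example usage (not part of the diff above): a minimal sketch of how a caller might drive the new parquetRead API, based on the ParquetReadOptions and ColumnData types and the fileToAsyncBuffer helper from test/read.test.js. The hyparquet import name, the data.parquet path, and the column/row ranges are illustrative assumptions.

import fs from 'fs'
import { parquetRead } from 'hyparquet' // assumed package import name

// Wrap a local file in an AsyncBuffer (byteLength + slice), mirroring test/read.test.js
function fileToAsyncBuffer(filePath) {
  return {
    byteLength: fs.statSync(filePath).size,
    slice: async (start, end) => {
      const buffer = await fs.promises.readFile(filePath)
      const arrayBuffer = buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength)
      return arrayBuffer.slice(start, end)
    },
  }
}

const file = fileToAsyncBuffer('data.parquet') // placeholder path

// Read the first 10 rows of columns 0 and 1.
// onChunk fires per column chunk (which may cover rows outside the requested range);
// onComplete receives the assembled any[][] row view.
await parquetRead({
  file,
  columns: [0, 1],
  rowStart: 0,
  rowEnd: 10,
  onChunk: ({ column, rowStart, rowEnd }) => {
    console.log(`parsed column ${column} rows ${rowStart}..${rowEnd}`)
  },
  onComplete: rows => {
    console.log(rows)
  },
})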