diff --git a/eslint.config.js b/eslint.config.js
index 8987d34..c077825 100644
--- a/eslint.config.js
+++ b/eslint.config.js
@@ -53,6 +53,7 @@ export default [
       'no-useless-return': 'error',
       'no-var': 'error',
       'object-curly-spacing': ['error', 'always'],
+      'object-shorthand': 'error',
       'prefer-const': 'error',
       'prefer-destructuring': ['warn', {
         object: true,
diff --git a/src/column.js b/src/column.js
index eee8d51..3e52940 100644
--- a/src/column.js
+++ b/src/column.js
@@ -3,46 +3,47 @@ import { Encoding, PageType } from './constants.js'
 import { convertWithDictionary } from './convert.js'
 import { decompressPage, readDataPage, readDataPageV2 } from './datapage.js'
 import { readPlain } from './plain.js'
+import { isFlatColumn } from './schema.js'
 import { deserializeTCompactProtocol } from './thrift.js'
 
 /**
  * Parse column data from a buffer.
  *
  * @param {DataReader} reader
- * @param {number | undefined} rowLimit maximum number of rows to read (undefined reads all rows)
+ * @param {number} rowGroupStart skip this many rows in the row group
+ * @param {number} rowGroupEnd read up to this index in the row group (Infinity reads all rows)
  * @param {ColumnMetaData} columnMetadata column metadata
  * @param {SchemaTree[]} schemaPath schema path for the column
  * @param {ParquetReadOptions} options read options
  * @returns {DecodedArray[]}
  */
-export function readColumn(reader, rowLimit, columnMetadata, schemaPath, options) {
+export function readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, schemaPath, options) {
   const { element } = schemaPath[schemaPath.length - 1]
   /** @type {DecodedArray[]} */
   const chunks = []
   /** @type {DecodedArray | undefined} */
   let dictionary = undefined
-  const hasRowLimit = rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)
   let rowCount = 0
 
   // read dictionary
   if (hasDictionary(columnMetadata)) {
-    dictionary = readPage(reader, columnMetadata, schemaPath, element, dictionary, options)
+    dictionary = readPage(reader, columnMetadata, schemaPath, element, dictionary, 0, options)
   }
 
-  while (!hasRowLimit || rowCount < rowLimit) {
+  while (rowCount < rowGroupEnd) {
     if (reader.offset >= reader.view.byteLength - 1) break // end of reader
-    const values = readPage(reader, columnMetadata, schemaPath, element, dictionary, options)
+    const values = readPage(reader, columnMetadata, schemaPath, element, dictionary, rowGroupStart - rowCount, options)
     chunks.push(values)
     rowCount += values.length
   }
-  if (hasRowLimit) {
-    if (rowCount < rowLimit) {
-      throw new Error(`parquet row data length ${rowCount} does not match row group limit ${rowLimit}}`)
+  if (isFinite(rowGroupEnd)) {
+    if (rowCount < rowGroupEnd) {
+      throw new Error(`parquet row data length ${rowCount} does not match row group limit ${rowGroupEnd}}`)
     }
-    if (rowCount > rowLimit) {
+    if (rowCount > rowGroupEnd) {
       // truncate last chunk to row limit
       const lastChunk = chunks[chunks.length - 1]
-      chunks[chunks.length - 1] = lastChunk.slice(0, rowLimit - (rowCount - lastChunk.length))
+      chunks[chunks.length - 1] = lastChunk.slice(0, rowGroupEnd - (rowCount - lastChunk.length))
     }
   }
   return chunks
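For context (not part of the patch), a minimal sketch of the truncation arithmetic in `readColumn` above, with made-up page sizes:

```js
// Sketch only: two pages of 4 values were decoded, but the window ends at row 6,
// so the final chunk is cut back to the window boundary.
const chunks = [[0, 1, 2, 3], [4, 5, 6, 7]]
const rowGroupEnd = 6
const rowCount = chunks.reduce((n, c) => n + c.length, 0) // 8 rows decoded
if (rowCount > rowGroupEnd) {
  const lastChunk = chunks[chunks.length - 1]
  // keep 6 - (8 - 4) = 2 values from the last page
  chunks[chunks.length - 1] = lastChunk.slice(0, rowGroupEnd - (rowCount - lastChunk.length))
}
console.log(chunks) // [[0, 1, 2, 3], [4, 5]]
```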
@@ -56,10 +57,11 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, options
  * @param {SchemaTree[]} schemaPath
  * @param {SchemaElement} element
  * @param {DecodedArray | undefined} dictionary
+ * @param {number} pageStart skip this many rows in the page
  * @param {ParquetReadOptions} options
  * @returns {DecodedArray}
  */
-export function readPage(reader, columnMetadata, schemaPath, element, dictionary, { utf8, compressors }) {
+export function readPage(reader, columnMetadata, schemaPath, element, dictionary, pageStart, { utf8, compressors }) {
   const header = parquetHeader(reader) // column header
 
   // read compressed_page_size bytes
@@ -73,6 +75,11 @@ export function readPage(reader, columnMetadata, schemaPath, element, dictionary
     const daph = header.data_page_header
     if (!daph) throw new Error('parquet data page header is undefined')
 
+    // skip unnecessary non-nested pages
+    if (pageStart > daph.num_values && isFlatColumn(schemaPath)) {
+      return new Array(daph.num_values) // TODO: don't allocate array
+    }
+
     const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors)
     const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
     // assert(!daph.statistics?.null_count || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
@@ -94,6 +101,11 @@ export function readPage(reader, columnMetadata, schemaPath, element, dictionary
     const daph2 = header.data_page_header_v2
     if (!daph2) throw new Error('parquet data page header v2 is undefined')
 
+    // skip unnecessary pages
+    if (pageStart > daph2.num_rows) {
+      return new Array(daph2.num_values) // TODO: don't allocate array
+    }
+
     const { definitionLevels, repetitionLevels, dataPage } = readDataPageV2(
       compressedBytes, header, schemaPath, columnMetadata, compressors
     )
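A rough illustration of the skip condition the two new checks in `readPage` apply (`pageCanBeSkipped` is a made-up name, not part of the codebase):

```js
// Sketch only: mirrors the v1 data-page check above. pageStart is how many
// window rows remain to be skipped when this page is reached; a flat
// (non-nested) page with fewer rows than that is never decoded.
// v2 pages carry an exact num_rows, so they skip without the flatness check.
function pageCanBeSkipped(pageStart, pageRows, flat) {
  return flat && pageStart > pageRows
}
pageCanBeSkipped(100, 50, true)  // true: all 50 rows fall before the window
pageCanBeSkipped(20, 50, true)   // false: the window starts inside this page
pageCanBeSkipped(100, 50, false) // false: nested v1 pages are still decoded
```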
diff --git a/src/read.js b/src/read.js
index ccb3bdc..3fbbbf6 100644
--- a/src/read.js
+++ b/src/read.js
@@ -41,8 +41,7 @@ export async function parquetRead(options) {
     // if row group overlaps with row range, read it
     if (groupStart + groupRows >= rowStart && (rowEnd === undefined || groupStart < rowEnd)) {
       // read row group
-      const rowLimit = rowEnd && rowEnd - groupStart
-      const groupData = await readRowGroup(options, rowGroup, groupStart, rowLimit)
+      const groupData = await readRowGroup(options, rowGroup, groupStart)
       if (onComplete) {
         // filter to rows in range
         const start = Math.max(rowStart - groupStart, 0)
@@ -62,13 +61,14 @@ export async function parquetRead(options) {
  * @param {ParquetReadOptions} options read options
  * @param {RowGroup} rowGroup row group to read
  * @param {number} groupStart row index of the first row in the group
- * @param {number} [rowLimit] max rows to read from this group
  * @returns {Promise} resolves to row data
  */
-export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
-  const { file, metadata, columns } = options
+export async function readRowGroup(options, rowGroup, groupStart) {
+  const { file, metadata, columns, rowStart, rowEnd } = options
   if (!metadata) throw new Error('parquet metadata not found')
-  if (rowLimit === undefined || rowLimit > rowGroup.num_rows) rowLimit = Number(rowGroup.num_rows)
+  const numRows = Number(rowGroup.num_rows)
+  const rowGroupStart = Math.max((rowStart || 0) - groupStart, 0)
+  const rowGroupEnd = rowEnd === undefined ? numRows : Math.min(rowEnd - groupStart, numRows)
 
   // loop through metadata to find min/max bytes to read
   let [groupStartByte, groupEndByte] = [file.byteLength, 0]
@@ -134,7 +134,7 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
     promises.push(buffer.then(arrayBuffer => {
       const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
       const reader = { view: new DataView(arrayBuffer), offset: bufferOffset }
-      const columnData = readColumn(reader, rowLimit, columnMetadata, schemaPath, options)
+      const columnData = readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, schemaPath, options)
 
       /** @type {DecodedArray[] | undefined} */
       let chunks = columnData
@@ -164,7 +164,7 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
           columnName,
           columnData: chunk,
           rowStart: groupStart,
-          rowEnd: groupStart + rowLimit,
+          rowEnd: groupStart + chunk.length,
         })
       }
     }))
@@ -179,8 +179,8 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
     .map(name => includedColumnNames.includes(name) ? flatten(subcolumnData.get(name)) : undefined)
 
   // transpose columns into rows
-  const groupData = new Array(rowLimit)
-  for (let row = 0; row < rowLimit; row++) {
+  const groupData = new Array(rowGroupEnd)
+  for (let row = rowGroupStart; row < rowGroupEnd; row++) {
     if (options.rowFormat === 'object') {
       // return each row as an object
       /** @type {Record} */
diff --git a/src/schema.js b/src/schema.js
index 2917e5b..a63199d 100644
--- a/src/schema.js
+++ b/src/schema.js
@@ -117,3 +117,17 @@ export function isMapLike(schema) {
 
   return true
 }
+
+/**
+ * Returns true if a column is non-nested.
+ *
+ * @param {SchemaTree[]} schemaPath
+ * @returns {boolean}
+ */
+export function isFlatColumn(schemaPath) {
+  if (schemaPath.length !== 2) return false
+  const [, column] = schemaPath
+  if (column.element.repetition_type === 'REPEATED') return false
+  if (column.children.length) return false
+  return true
+}
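A rough sketch of what the new `isFlatColumn` distinguishes; the object literals are simplified stand-ins for `SchemaTree` nodes, and the import path assumes running beside the source:

```js
import { isFlatColumn } from './schema.js' // assumed local path, for illustration

// Sketch only: a flat column path is [root, leaf] with a non-repeated,
// childless leaf; anything repeated or deeper is treated as nested.
const flat = [
  { element: { name: 'root' }, children: [] },
  { element: { name: 'id', repetition_type: 'OPTIONAL' }, children: [] },
]
const repeated = [
  { element: { name: 'root' }, children: [] },
  { element: { name: 'tags', repetition_type: 'REPEATED' }, children: [] },
]
console.log(isFlatColumn(flat))     // true
console.log(isFlatColumn(repeated)) // false
```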
diff --git a/test/column.test.js b/test/column.test.js
index 6e8eded..62967a4 100644
--- a/test/column.test.js
+++ b/test/column.test.js
@@ -9,11 +9,10 @@ const values = [null, 1, -2, NaN, 0, -1, -0, 2]
 
 describe('readColumn', () => {
   it.for([
-    { rowLimit: undefined, expected: [values] },
-    { rowLimit: Infinity, expected: [values] },
-    { rowLimit: 2, expected: [values.slice(0, 2)] },
-    { rowLimit: 0, expected: [] },
-  ])('readColumn with rowLimit %p', async ({ rowLimit, expected }) => {
+    { rowGroupEnd: Infinity, expected: [values] },
+    { rowGroupEnd: 2, expected: [values.slice(0, 2)] },
+    { rowGroupEnd: 0, expected: [] },
+  ])('readColumn with rowGroupEnd %p', async ({ rowGroupEnd, expected }) => {
     const testFile = 'test/files/float16_nonzeros_and_nans.parquet'
     const asyncBuffer = await asyncBufferFromFile(testFile)
     const arrayBuffer = await asyncBuffer.slice(0)
@@ -26,7 +25,7 @@ describe('readColumn', () => {
     const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
 
     const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
-    const result = readColumn(reader, rowLimit, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
+    const result = readColumn(reader, 0, rowGroupEnd, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
 
     expect(result).toEqual(expected)
   })
@@ -43,7 +42,7 @@ describe('readColumn', () => {
     const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
 
     const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
-    const columnData = readColumn(reader, Infinity, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
+    const columnData = readColumn(reader, 0, Infinity, column.meta_data, schemaPath, { file: asyncBuffer, compressors })
     expect(columnData[0]).toBeInstanceOf(Int32Array)
   })
 })
diff --git a/test/read.test.js b/test/read.test.js
index cea3d85..2991a63 100644
--- a/test/read.test.js
+++ b/test/read.test.js
@@ -179,6 +179,6 @@ describe('parquetRead', () => {
       rowEnd: 91,
     })
     expect(rows).toEqual([{ col: 'bad' }])
-    expect(convertWithDictionary).toHaveBeenCalledTimes(10)
+    expect(convertWithDictionary).toHaveBeenCalledTimes(2)
   })
 })
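For a sense of the end-to-end effect, a row-range read like the one in the updated test now decodes only the pages that overlap the requested rows (the file name below is hypothetical):

```js
import { parquetRead, asyncBufferFromFile } from 'hyparquet'

// Sketch only: with page skipping, pages of each column that end before
// row 90 are returned as placeholders instead of being decompressed and
// converted, which is why the dictionary-conversion call count drops.
const file = await asyncBufferFromFile('rows.parquet') // hypothetical file
await parquetRead({
  file,
  rowStart: 90,
  rowEnd: 91,
  onComplete: rows => console.log(rows), // only the requested row range
})
```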