diff --git a/src/column.js b/src/column.js
index 3666ee2..3b0b02a 100644
--- a/src/column.js
+++ b/src/column.js
@@ -1,7 +1,8 @@
 import { assembleLists } from './assemble.js'
 import { Encoding, PageType } from './constants.js'
 import { convertWithDictionary } from './convert.js'
-import { decompressPage, readDataPage, readDataPageV2, readDictionaryPage } from './datapage.js'
+import { decompressPage, readDataPage, readDataPageV2 } from './datapage.js'
+import { readPlain } from './plain.js'
 import { getMaxDefinitionLevel } from './schema.js'
 import { deserializeTCompactProtocol } from './thrift.js'

@@ -15,7 +16,7 @@ import { deserializeTCompactProtocol } from './thrift.js'
  * @param {ParquetReadOptions} options read options
  * @returns {DecodedArray[]}
  */
-export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compressors, utf8 }) {
+export function readColumn(reader, rowLimit, columnMetadata, schemaPath, options) {
   const { element } = schemaPath[schemaPath.length - 1]
   /** @type {DecodedArray[]} */
   const chunks = []
@@ -24,78 +25,16 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
   const hasRowLimit = rowLimit !== undefined && rowLimit >= 0 && isFinite(rowLimit)
   let rowCount = 0

+  // read dictionary
+  if (hasDictionary(columnMetadata)) {
+    dictionary = readPage(reader, columnMetadata, schemaPath, element, dictionary, options)
+  }
+
   while (!hasRowLimit || rowCount < rowLimit) {
     if (reader.offset >= reader.view.byteLength - 1) break // end of reader
-    const header = parquetHeader(reader) // column header
-
-    // read compressed_page_size bytes starting at offset
-    const compressedBytes = new Uint8Array(
-      reader.view.buffer, reader.view.byteOffset + reader.offset, header.compressed_page_size
-    )
-
-    // parse page data by type
-    if (header.type === 'DATA_PAGE') {
-      const daph = header.data_page_header
-      if (!daph) throw new Error('parquet data page header is undefined')
-
-      const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors)
-      const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
-      // assert(!daph.statistics?.null_count || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
-
-      // convert types, dereference dictionary, and assemble lists
-      let values = convertWithDictionary(dataPage, dictionary, element, daph.encoding, utf8)
-      if (repetitionLevels.length || definitionLevels?.length) {
-        const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
-        const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
-        const chunk = assembleLists(
-          [], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
-        )
-        chunks.push(chunk)
-        rowCount += chunk.length
-      } else {
-        // wrap nested flat data by depth
-        for (let i = 2; i < schemaPath.length; i++) {
-          if (schemaPath[i].element.repetition_type !== 'REQUIRED') {
-            values = Array.from(values, e => [e])
-          }
-        }
-        chunks.push(values)
-        rowCount += values.length
-      }
-    } else if (header.type === 'DATA_PAGE_V2') {
-      const daph2 = header.data_page_header_v2
-      if (!daph2) throw new Error('parquet data page header v2 is undefined')
-
-      const { definitionLevels, repetitionLevels, dataPage } = readDataPageV2(
-        compressedBytes, header, schemaPath, columnMetadata, compressors
-      )
-
-      // convert types, dereference dictionary, and assemble lists
-      const values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8)
-      if (repetitionLevels.length || definitionLevels?.length) {
-        const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
-        const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
-        const chunk = assembleLists(
-          [], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
-        )
-        chunks.push(chunk)
-        rowCount += chunk.length
-      } else {
-        chunks.push(values)
-        rowCount += values.length
-      }
-    } else if (header.type === 'DICTIONARY_PAGE') {
-      const diph = header.dictionary_page_header
-      if (!diph) throw new Error('parquet dictionary page header is undefined')
-
-      const page = decompressPage(
-        compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
-      )
-      dictionary = readDictionaryPage(page, diph, columnMetadata, element.type_length)
-    } else {
-      throw new Error(`parquet unsupported page type: ${header.type}`)
-    }
-    reader.offset += header.compressed_page_size
+    const values = readPage(reader, columnMetadata, schemaPath, element, dictionary, options)
+    chunks.push(values)
+    rowCount += values.length
   }
   if (hasRowLimit) {
     if (rowCount < rowLimit) {
@@ -110,6 +49,90 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
   return chunks
 }

+/**
+ * Read a page (data or dictionary) from a buffer.
+ *
+ * @param {DataReader} reader
+ * @param {ColumnMetaData} columnMetadata
+ * @param {SchemaTree[]} schemaPath
+ * @param {SchemaElement} element
+ * @param {DecodedArray | undefined} dictionary
+ * @param {ParquetReadOptions} options
+ * @returns {DecodedArray}
+ */
+export function readPage(reader, columnMetadata, schemaPath, element, dictionary, { utf8, compressors }) {
+  const header = parquetHeader(reader) // column header
+
+  // read compressed_page_size bytes
+  const compressedBytes = new Uint8Array(
+    reader.view.buffer, reader.view.byteOffset + reader.offset, header.compressed_page_size
+  )
+  reader.offset += header.compressed_page_size
+
+  // parse page data by type
+  if (header.type === 'DATA_PAGE') {
+    const daph = header.data_page_header
+    if (!daph) throw new Error('parquet data page header is undefined')
+
+    const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors)
+    const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
+    // assert(!daph.statistics?.null_count || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
+
+    // convert types, dereference dictionary, and assemble lists
+    let values = convertWithDictionary(dataPage, dictionary, element, daph.encoding, utf8)
+    if (repetitionLevels.length || definitionLevels?.length) {
+      const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
+      const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
+      return assembleLists([], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel)
+    } else {
+      // wrap nested flat data by depth
+      for (let i = 2; i < schemaPath.length; i++) {
+        if (schemaPath[i].element.repetition_type !== 'REQUIRED') {
+          values = Array.from(values, e => [e])
+        }
+      }
+      return values
+    }
+  } else if (header.type === 'DATA_PAGE_V2') {
+    const daph2 = header.data_page_header_v2
+    if (!daph2) throw new Error('parquet data page header v2 is undefined')
+
+    const { definitionLevels, repetitionLevels, dataPage } = readDataPageV2(
+      compressedBytes, header, schemaPath, columnMetadata, compressors
+    )
+
+    // convert types, dereference dictionary, and assemble lists
+    const values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8)
+    if (repetitionLevels.length || definitionLevels?.length) {
+      const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
+      const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
+      return assembleLists([], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel)
+    } else {
+      return values
+    }
+  } else if (header.type === 'DICTIONARY_PAGE') {
+    const diph = header.dictionary_page_header
+    if (!diph) throw new Error('parquet dictionary page header is undefined')
+
+    const page = decompressPage(
+      compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
+    )
+
+    const reader = { view: new DataView(page.buffer, page.byteOffset, page.byteLength), offset: 0 }
+    return readPlain(reader, columnMetadata.type, diph.num_values, element.type_length)
+  } else {
+    throw new Error(`parquet unsupported page type: ${header.type}`)
+  }
+}
+
+/**
+ * @param {ColumnMetaData} columnMetadata
+ * @returns {boolean}
+ */
+function hasDictionary(columnMetadata) {
+  return columnMetadata.encodings.some(e => e.endsWith('_DICTIONARY'))
+}
+
 /**
  * Find the start byte offset for a column chunk.
  *
  * @param {ColumnMetaData} columnMetadata
@@ -117,17 +140,14 @@ export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compr
  * @returns {[bigint, bigint]} byte offset range
  */
 export function getColumnRange({ dictionary_page_offset, data_page_offset, total_compressed_size }) {
-  let columnOffset = dictionary_page_offset
-  if (!columnOffset || data_page_offset < columnOffset) {
-    columnOffset = data_page_offset
-  }
+  const columnOffset = dictionary_page_offset || data_page_offset
   return [columnOffset, columnOffset + total_compressed_size]
 }

 /**
  * Read parquet header from a buffer.
  *
- * @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ParquetReadOptions, SchemaTree} from '../src/types.d.ts'
+ * @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ParquetReadOptions, SchemaElement, SchemaTree} from '../src/types.d.ts'
  * @param {DataReader} reader
  * @returns {PageHeader}
  */
diff --git a/src/datapage.js b/src/datapage.js
index b29effe..af04498 100644
--- a/src/datapage.js
+++ b/src/datapage.js
@@ -59,20 +59,7 @@ export function readDataPage(bytes, daph, schemaPath, { type }) {
 }

 /**
- * @param {Uint8Array} bytes raw page data
- * @param {DictionaryPageHeader} diph dictionary page header
- * @param {ColumnMetaData} columnMetadata
- * @param {number | undefined} typeLength - type_length from schema
- * @returns {DecodedArray}
- */
-export function readDictionaryPage(bytes, diph, columnMetadata, typeLength) {
-  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
-  const reader = { view, offset: 0 }
-  return readPlain(reader, columnMetadata.type, diph.num_values, typeLength)
-}
-
-/**
- * @import {ColumnMetaData, CompressionCodec, Compressors, DataPage, DataPageHeader, DataPageHeaderV2, DataReader, DecodedArray, DictionaryPageHeader, PageHeader, SchemaTree} from '../src/types.d.ts'
+ * @import {ColumnMetaData, CompressionCodec, Compressors, DataPage, DataPageHeader, DataPageHeaderV2, DataReader, DecodedArray, PageHeader, SchemaTree} from '../src/types.d.ts'
  * @param {DataReader} reader data view for the page
  * @param {DataPageHeader} daph data page header
  * @param {SchemaTree[]} schemaPath
diff --git a/test/files/dict-page-offset-zero.column_indexes.json b/test/files/dict-page-offset-zero.column_indexes.json
deleted file mode 100644
index 9cbd1e7..0000000
--- a/test/files/dict-page-offset-zero.column_indexes.json
+++ /dev/null
@@ -1,19 +0,0 @@
-[
-  [
-    {
-      "null_pages": [
-        false
-      ],
-      "min_values": [
-        1552
-      ],
-      "max_values": [
-        1552
-      ],
-      "boundary_order": "ASCENDING",
-      "null_counts": [
-        0
-      ]
-    }
-  ]
-]
\ No newline at end of file
diff --git a/test/files/dict-page-offset-zero.json b/test/files/dict-page-offset-zero.json
deleted file mode 100644
index 5e5d65e..0000000
--- a/test/files/dict-page-offset-zero.json
+++ /dev/null
@@ -1,41 +0,0 @@
-[
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552],
-  [1552]
-]
diff --git a/test/files/dict-page-offset-zero.metadata.json b/test/files/dict-page-offset-zero.metadata.json
deleted file mode 100644
index 343d067..0000000
--- a/test/files/dict-page-offset-zero.metadata.json
+++ /dev/null
@@ -1,93 +0,0 @@
-{
-  "version": 1,
-  "schema": [
-    {
-      "name": "root",
-      "num_children": 1
-    },
-    {
-      "type": "INT32",
-      "repetition_type": "OPTIONAL",
-      "name": "l_partkey"
-    }
-  ],
-  "num_rows": 39,
-  "row_groups": [
-    {
-      "columns": [
-        {
-          "file_offset": 4,
-          "meta_data": {
-            "type": "INT32",
-            "encodings": [
-              "PLAIN",
-              "BIT_PACKED",
-              "RLE"
-            ],
-            "path_in_schema": [
-              "l_partkey"
-            ],
-            "codec": "SNAPPY",
-            "num_values": 39,
-            "total_uncompressed_size": 180,
-            "total_compressed_size": 40,
-            "data_page_offset": 4,
-            "dictionary_page_offset": 0,
-            "statistics": {
-              "max": 1552,
-              "min": 1552,
-              "null_count": 0,
-              "max_value": 1552,
-              "min_value": 1552
-            },
-            "encoding_stats": [
-              {
-                "page_type": "DATA_PAGE",
-                "encoding": "PLAIN",
-                "count": 1
-              }
-            ],
-            "bloom_filter_length": [
-              {
-                "field_1": {
-                  "field_1": 0,
-                  "field_2": 162,
-                  "field_3": 22,
-                  "field_5": {
-                    "field_1": 39,
-                    "field_2": 0,
-                    "field_3": 3,
-                    "field_4": 4
-                  }
-                },
-                "field_2": 22
-              }
-            ]
-          },
-          "offset_index_offset": 67,
-          "offset_index_length": 10,
-          "column_index_offset": 44,
-          "column_index_length": 23
-        }
-      ],
-      "total_byte_size": 180,
-      "num_rows": 39
-    }
-  ],
-  "key_value_metadata": [
-    {
-      "key": "is.date.correct",
-      "value": "true"
-    },
-    {
-      "key": "dremio.arrow.schema.2.1",
-      "value": "{\n  \"fields\" : [ {\n    \"name\" : \"l_partkey\",\n    \"nullable\" : true,\n    \"type\" : {\n      \"name\" : \"int\",\n      \"bitWidth\" : 32,\n      \"isSigned\" : true\n    },\n    \"children\" : [ ]\n  } ]\n}"
-    },
-    {
-      "key": "dremio.version",
-      "value": "3.2.0-201905102005330382-0598733"
-    }
-  ],
-  "created_by": "parquet-mr version 1.12.0-201812210311360288-a86293f (build cec1a483e9dcd545e09170ae787d3dcb13744433)",
-  "metadata_length": 550
-}
diff --git a/test/files/dict-page-offset-zero.offset_indexes.json b/test/files/dict-page-offset-zero.offset_indexes.json
deleted file mode 100644
index fadc4ee..0000000
--- a/test/files/dict-page-offset-zero.offset_indexes.json
+++ /dev/null
@@ -1,13 +0,0 @@
-[
-  [
-    {
-      "page_locations": [
-        {
-          "offset": 4,
-          "compressed_page_size": 40,
-          "first_row_index": 0
-        }
-      ]
-    }
-  ]
-]
\ No newline at end of file
diff --git a/test/files/dict-page-offset-zero.parquet b/test/files/dict-page-offset-zero.parquet
deleted file mode 100644
index f9dbd7f..0000000
Binary files a/test/files/dict-page-offset-zero.parquet and /dev/null differ