diff --git a/src/column.js b/src/column.js
index be15467..68169f9 100644
--- a/src/column.js
+++ b/src/column.js
@@ -12,13 +12,11 @@ import { deserializeTCompactProtocol } from './thrift.js'
  * @param {DataReader} reader
  * @param {number} rowGroupStart skip this many rows in the row group
  * @param {number} rowGroupEnd read up to this index in the row group (Infinity reads all rows)
- * @param {ColumnMetaData} columnMetadata column metadata
- * @param {SchemaTree[]} schemaPath schema path for the column
- * @param {ParquetReadOptions} options read options
+ * @param {ColumnDecoder} columnDecoder column decoder params
  * @returns {DecodedArray[]}
  */
-export function readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, schemaPath, options) {
-  const { element } = schemaPath[schemaPath.length - 1]
+export function readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder) {
+  const { element, utf8 } = columnDecoder
   /** @type {DecodedArray[]} */
   const chunks = []
   /** @type {DecodedArray | undefined} */
@@ -32,12 +30,12 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, s
     const header = parquetHeader(reader)
     if (header.type === 'DICTIONARY_PAGE') {
       // assert(!dictionary)
-      dictionary = readPage(reader, header, columnMetadata, schemaPath, element, dictionary, undefined, 0, options)
-      dictionary = convert(dictionary, element, options.utf8)
+      dictionary = readPage(reader, header, columnDecoder, dictionary, undefined, 0)
+      dictionary = convert(dictionary, element, utf8)
     } else {
       const lastChunk = chunks.at(-1)
       const lastChunkLength = lastChunk?.length || 0
-      const values = readPage(reader, header, columnMetadata, schemaPath, element, dictionary, lastChunk, rowGroupStart - rowCount, options)
+      const values = readPage(reader, header, columnDecoder, dictionary, lastChunk, rowGroupStart - rowCount)
       if (lastChunk === values) {
         // continued from previous page
         rowCount += values.length - lastChunkLength
@@ -65,16 +63,14 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, s
  *
  * @param {DataReader} reader
  * @param {PageHeader} header
- * @param {ColumnMetaData} columnMetadata
- * @param {SchemaTree[]} schemaPath
- * @param {SchemaElement} element
+ * @param {ColumnDecoder} columnDecoder
  * @param {DecodedArray | undefined} dictionary
  * @param {DecodedArray | undefined} previousChunk
  * @param {number} pageStart skip this many rows in the page
- * @param {ParquetReadOptions} options
  * @returns {DecodedArray}
  */
-export function readPage(reader, header, columnMetadata, schemaPath, element, dictionary, previousChunk, pageStart, { utf8, compressors }) {
+export function readPage(reader, header, columnDecoder, dictionary, previousChunk, pageStart) {
+  const { type, element, schemaPath, codec, compressors, utf8 } = columnDecoder
   // read compressed_page_size bytes
   const compressedBytes = new Uint8Array(
     reader.view.buffer, reader.view.byteOffset + reader.offset, header.compressed_page_size
@@ -91,8 +87,8 @@ export function readPage(reader, header, columnMetadata, schemaPath, element, di
       return new Array(daph.num_values) // TODO: don't allocate array
     }
 
-    const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors)
-    const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
+    const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), codec, compressors)
+    const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, columnDecoder)
     // assert(!daph.statistics?.null_count || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
 
     // convert types, dereference dictionary, and assemble lists
@@ -118,9 +114,8 @@ export function readPage(reader, header, columnMetadata, schemaPath, element, di
       return new Array(daph2.num_values) // TODO: don't allocate array
     }
 
-    const { definitionLevels, repetitionLevels, dataPage } = readDataPageV2(
-      compressedBytes, header, schemaPath, columnMetadata, compressors
-    )
+    const { definitionLevels, repetitionLevels, dataPage } =
+      readDataPageV2(compressedBytes, header, columnDecoder)
 
     // convert types, dereference dictionary, and assemble lists
     const values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8)
@@ -131,11 +126,11 @@ export function readPage(reader, header, columnMetadata, schemaPath, element, di
     if (!diph) throw new Error('parquet dictionary page header is undefined')
 
     const page = decompressPage(
-      compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
+      compressedBytes, Number(header.uncompressed_page_size), codec, compressors
     )
 
     const reader = { view: new DataView(page.buffer, page.byteOffset, page.byteLength), offset: 0 }
-    return readPlain(reader, columnMetadata.type, diph.num_values, element.type_length)
+    return readPlain(reader, type, diph.num_values, element.type_length)
   } else {
     throw new Error(`parquet unsupported page type: ${header.type}`)
   }
@@ -155,7 +150,7 @@ export function getColumnRange({ dictionary_page_offset, data_page_offset, total
 /**
  * Read parquet header from a buffer.
  *
- * @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ParquetReadOptions, SchemaElement, SchemaTree} from '../src/types.d.ts'
+ * @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ColumnDecoder} from '../src/types.d.ts'
  * @param {DataReader} reader
  * @returns {PageHeader}
  */
diff --git a/src/datapage.js b/src/datapage.js
index af04498..d0cc989 100644
--- a/src/datapage.js
+++ b/src/datapage.js
@@ -9,11 +9,10 @@ import { snappyUncompress } from './snappy.js'
  *
  * @param {Uint8Array} bytes raw page data (should already be decompressed)
  * @param {DataPageHeader} daph data page header
- * @param {SchemaTree[]} schemaPath
- * @param {ColumnMetaData} columnMetadata
+ * @param {ColumnDecoder} columnDecoder
  * @returns {DataPage} definition levels, repetition levels, and array of values
  */
-export function readDataPage(bytes, daph, schemaPath, { type }) {
+export function readDataPage(bytes, daph, { type, element, schemaPath }) {
   const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
   const reader = { view, offset: 0 }
   /** @type {DecodedArray} */
@@ -28,8 +27,8 @@ export function readDataPage(bytes, daph, schemaPath, { type }) {
   // read values based on encoding
   const nValues = daph.num_values - numNulls
   if (daph.encoding === 'PLAIN') {
-    const { type_length } = schemaPath[schemaPath.length - 1].element
-    dataPage = readPlain(reader, type, nValues, type_length)
+    dataPage = readPlain(reader, type, nValues, element.type_length)
   } else if (
     daph.encoding === 'PLAIN_DICTIONARY' ||
     daph.encoding === 'RLE_DICTIONARY' ||
@@ -49,8 +47,7 @@ export function readDataPage(bytes, daph, schemaPath, { type }) {
       dataPage = new Uint8Array(nValues) // nValue zeroes
     }
   } else if (daph.encoding === 'BYTE_STREAM_SPLIT') {
-    const { type_length } = schemaPath[schemaPath.length - 1].element
-    dataPage = byteStreamSplit(reader, nValues, type, type_length)
+    dataPage = byteStreamSplit(reader, nValues, type, element.type_length)
   } else {
     throw new Error(`parquet unsupported encoding: ${daph.encoding}`)
   }
@@ -59,7 +56,7 @@ export function readDataPage(bytes, daph, schemaPath, { type }) {
 }
 
 /**
- * @import {ColumnMetaData, CompressionCodec, Compressors, DataPage, DataPageHeader, DataPageHeaderV2, DataReader, DecodedArray, PageHeader, SchemaTree} from '../src/types.d.ts'
+ * @import {ColumnDecoder, CompressionCodec, Compressors, DataPage, DataPageHeader, DataPageHeaderV2, DataReader, DecodedArray, PageHeader, SchemaTree} from '../src/types.d.ts'
  * @param {DataReader} reader data view for the page
 * @param {DataPageHeader} daph data page header
 * @param {SchemaTree[]} schemaPath
@@ -133,15 +130,13 @@ export function decompressPage(compressedBytes, uncompressed_page_size, codec, c
 *
 * @param {Uint8Array} compressedBytes raw page data
 * @param {PageHeader} ph page header
- * @param {SchemaTree[]} schemaPath
- * @param {ColumnMetaData} columnMetadata
- * @param {Compressors | undefined} compressors
+ * @param {ColumnDecoder} columnDecoder
 * @returns {DataPage} definition levels, repetition levels, and array of values
 */
-export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata, compressors) {
+export function readDataPageV2(compressedBytes, ph, columnDecoder) {
   const view = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
   const reader = { view, offset: 0 }
-  const { codec, type } = columnMetadata
+  const { type, element, schemaPath, codec, compressors } = columnDecoder
 
   const daph2 = ph.data_page_header_v2
   if (!daph2) throw new Error('parquet data page header v2 is undefined')
@@ -167,10 +162,9 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
   let dataPage
   const nValues = daph2.num_values - daph2.num_nulls
   if (daph2.encoding === 'PLAIN') {
-    const { type_length } = schemaPath[schemaPath.length - 1].element
-    dataPage = readPlain(pageReader, type, nValues, type_length)
+    dataPage = readPlain(pageReader, type, nValues, element.type_length)
   } else if (daph2.encoding === 'RLE') {
-    // assert(columnMetadata.type === 'BOOLEAN')
+    // assert(type === 'BOOLEAN')
     dataPage = new Array(nValues)
     readRleBitPackedHybrid(pageReader, 1, 0, dataPage)
     dataPage = dataPage.map(x => !!x)
@@ -192,8 +186,7 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
     dataPage = new Array(nValues)
     deltaByteArray(pageReader, nValues, dataPage)
   } else if (daph2.encoding === 'BYTE_STREAM_SPLIT') {
-    const { type_length } = schemaPath[schemaPath.length - 1].element
-    dataPage = byteStreamSplit(reader, nValues, type, type_length)
+    dataPage = byteStreamSplit(reader, nValues, type, element.type_length)
   } else {
     throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
   }
diff --git a/src/read.js b/src/read.js
index 3fbbbf6..0505b3e 100644
--- a/src/read.js
+++ b/src/read.js
@@ -134,7 +134,16 @@ export async function readRowGroup(options, rowGroup, groupStart) {
     promises.push(buffer.then(arrayBuffer => {
       const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
       const reader = { view: new DataView(arrayBuffer), offset: bufferOffset }
-      const columnData = readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, schemaPath, options)
+      const columnDecoder = {
+        columnName: columnMetadata.path_in_schema.join('.'),
+        type: columnMetadata.type,
+        element: schemaPath[schemaPath.length - 1].element,
+        schemaPath,
+        codec: columnMetadata.codec,
+        compressors: options.compressors,
+        utf8: options.utf8,
+      }
+      const columnData = readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder)
 
       /** @type {DecodedArray[] | undefined} */
       let chunks = columnData
diff --git a/src/types.d.ts b/src/types.d.ts
index 60c8cee..23b1339 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -376,3 +376,13 @@ export type BoundaryOrder = 'UNORDERED' | 'ASCENDING' | 'DESCENDING'
 
 export type ThriftObject = { [ key: `field_${number}` ]: ThriftType }
 export type ThriftType = boolean | number | bigint | Uint8Array | ThriftType[] | ThriftObject
+
+export interface ColumnDecoder {
+  columnName: string
+  type: ParquetType
+  element: SchemaElement
+  schemaPath: SchemaTree[]
+  codec: CompressionCodec
+  compressors?: Compressors
+  utf8?: boolean
+}
diff --git a/test/column.test.js b/test/column.test.js
index b5bb3cd..b6e4ed3 100644
--- a/test/column.test.js
+++ b/test/column.test.js
@@ -23,8 +23,15 @@ describe('readColumn', () => {
     const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
     const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
     const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
+    const columnDecoder = {
+      columnName: column.meta_data.path_in_schema.join('.'),
+      type: column.meta_data.type,
+      element: schemaPath[schemaPath.length - 1].element,
+      schemaPath,
+      codec: column.meta_data.codec,
+    }
 
-    const result = readColumn(reader, 0, rowGroupEnd, column.meta_data, schemaPath, { file })
+    const result = readColumn(reader, 0, rowGroupEnd, columnDecoder)
 
     expect(result).toEqual(expected)
   })
@@ -40,8 +47,15 @@ describe('readColumn', () => {
     const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
     const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
     const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
+    const columnDecoder = {
+      columnName: column.meta_data.path_in_schema.join('.'),
+      type: column.meta_data.type,
+      element: schemaPath[schemaPath.length - 1].element,
+      schemaPath,
+      codec: column.meta_data.codec,
+    }
 
-    const columnData = readColumn(reader, 0, Infinity, column.meta_data, schemaPath, { file })
+    const columnData = readColumn(reader, 0, Infinity, columnDecoder)
     expect(columnData[0]).toBeInstanceOf(Int32Array)
   })
 })
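
Usage sketch (illustrative, not part of the diff): with this refactor a caller builds one ColumnDecoder object instead of passing columnMetadata, schemaPath, and read options separately, mirroring the read.js hunk above. The surrounding variables (metadata, columnMetadata, reader, options) and the import paths are assumptions about the caller's existing row-group plumbing, not new API.

    import { readColumn } from './src/column.js'
    import { getSchemaPath } from './src/schema.js'

    // reader is a DataReader ({ view, offset }) positioned at the start of the column chunk bytes
    const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
    /** @type {import('./src/types.d.ts').ColumnDecoder} */
    const columnDecoder = {
      columnName: columnMetadata.path_in_schema.join('.'),
      type: columnMetadata.type,
      element: schemaPath[schemaPath.length - 1].element, // leaf schema element for the column
      schemaPath,
      codec: columnMetadata.codec,
      compressors: options.compressors, // optional custom decompressors
      utf8: options.utf8, // optional utf8 decoding of byte arrays
    }
    const columnData = readColumn(reader, 0, Infinity, columnDecoder) // read all rows of the chunk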