diff --git a/src/column.js b/src/column.js index a71d8d7..dfa2439 100644 --- a/src/column.js +++ b/src/column.js @@ -3,12 +3,12 @@ import { convert } from './convert.js' import { readDataPage, readDictionaryPage } from './datapage.js' import { readDataPageV2 } from './datapageV2.js' import { parquetHeader } from './header.js' -import { getMaxDefinitionLevel, getMaxRepetitionLevel, getSchemaPath, isRequired } from './schema.js' +import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired } from './schema.js' import { snappyUncompress } from './snappy.js' import { concat } from './utils.js' /** - * @typedef {import('./types.js').SchemaElement} SchemaElement + * @typedef {import('./types.js').SchemaTree} SchemaTree * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData * @typedef {import('./types.js').Compressors} Compressors * @typedef {import('./types.js').RowGroup} RowGroup @@ -21,20 +21,18 @@ import { concat } from './utils.js' * @param {number} columnOffset offset to start reading from * @param {RowGroup} rowGroup row group metadata * @param {ColumnMetaData} columnMetadata column metadata - * @param {SchemaElement[]} schema schema for the file + * @param {SchemaTree[]} schemaPath schema path for the column * @param {Compressors} [compressors] custom decompressors * @returns {ArrayLike} array of values */ -export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schema, compressors) { +export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schemaPath, compressors) { /** @type {ArrayLike | undefined} */ let dictionary = undefined let valuesSeen = 0 let byteOffset = 0 // byteOffset within the column /** @type {any[]} */ const rowData = [] - - const schemaPath = getSchemaPath(schema, columnMetadata.path_in_schema) - const schemaElement = schemaPath[schemaPath.length - 1].element + const { element } = schemaPath[schemaPath.length - 1] while (valuesSeen < rowGroup.num_rows) { // parse column header @@ -73,7 +71,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) // convert primitive types to rich types - values = convert(dataPage, schemaElement) + values = convert(dataPage, element) values = assembleObjects( definitionLevels, repetitionLevels, values, isNullable, maxDefinitionLevel, maxRepetitionLevel ) @@ -85,10 +83,10 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, } else { if (dictionaryEncoding && dictionary) { dereferenceDictionary(dictionary, dataPage) - values = convert(dataPage, schemaElement) + values = convert(dataPage, element) } else if (Array.isArray(dataPage)) { // convert primitive types to rich types - values = convert(dataPage, schemaElement) + values = convert(dataPage, element) } else { values = dataPage // TODO: data page shouldn't be a fixed byte array? } @@ -161,7 +159,7 @@ function dereferenceDictionary(dictionary, dataPage) { /** * Find the start byte offset for a column chunk. * - * @param {ColumnMetaData} columnMetadata column metadata + * @param {ColumnMetaData} columnMetadata * @returns {number} byte offset */ export function getColumnOffset(columnMetadata) { diff --git a/src/datapage.js b/src/datapage.js index 6fd8a09..7cca676 100644 --- a/src/datapage.js +++ b/src/datapage.js @@ -13,8 +13,8 @@ const skipNulls = false // TODO * @typedef {import("./types.d.ts").SchemaTree} SchemaTree * @param {Uint8Array} bytes raw page data (should already be decompressed) * @param {DataPageHeader} daph data page header - * @param {SchemaTree[]} schemaPath schema for the file - * @param {ColumnMetaData} columnMetadata metadata for the column + * @param {SchemaTree[]} schemaPath + * @param {ColumnMetaData} columnMetadata * @returns {DataPage} definition levels, repetition levels, and array of values */ export function readDataPage(bytes, daph, schemaPath, columnMetadata) { @@ -83,7 +83,7 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) { * @typedef {import("./types.d.ts").DictionaryPageHeader} DictionaryPageHeader * @param {Uint8Array} bytes raw page data * @param {DictionaryPageHeader} diph dictionary page header - * @param {ColumnMetaData} columnMetadata metadata for the column + * @param {ColumnMetaData} columnMetadata * @returns {ArrayLike} array of values */ export function readDictionaryPage(bytes, diph, columnMetadata) { @@ -98,7 +98,7 @@ export function readDictionaryPage(bytes, diph, columnMetadata) { * @typedef {import("./types.d.ts").DataReader} DataReader * @param {DataReader} reader data view for the page * @param {DataPageHeader} daph data page header - * @param {SchemaTree[]} schemaPath schema path for the column + * @param {SchemaTree[]} schemaPath * @returns {any[]} repetition levels and number of bytes read */ function readRepetitionLevels(reader, daph, schemaPath) { @@ -119,7 +119,7 @@ function readRepetitionLevels(reader, daph, schemaPath) { * * @param {DataReader} reader data view for the page * @param {DataPageHeader} daph data page header - * @param {SchemaTree[]} schemaPath schema path for the column + * @param {SchemaTree[]} schemaPath * @returns {DefinitionLevels} definition levels and number of bytes read */ function readDefinitionLevels(reader, daph, schemaPath) { diff --git a/src/datapageV2.js b/src/datapageV2.js index cf69579..84f5357 100644 --- a/src/datapageV2.js +++ b/src/datapageV2.js @@ -13,8 +13,8 @@ import { readVarInt, readZigZag } from './thrift.js' * @typedef {import("./types.d.ts").SchemaTree} SchemaTree * @param {Uint8Array} compressedBytes raw page data (should already be decompressed) * @param {import("./types.d.ts").PageHeader} ph page header - * @param {SchemaTree[]} schemaPath schema path for the column - * @param {ColumnMetaData} columnMetadata metadata for the column + * @param {SchemaTree[]} schemaPath + * @param {ColumnMetaData} columnMetadata * @param {Compressors | undefined} compressors * @returns {DataPage} definition levels, repetition levels, and array of values */ @@ -99,7 +99,7 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata, * @typedef {import("./types.d.ts").DataReader} DataReader * @param {DataReader} reader data view for the page * @param {DataPageHeaderV2} daph2 data page header - * @param {SchemaTree[]} schemaPath schema path for the column + * @param {SchemaTree[]} schemaPath * @returns {any[]} repetition levels and number of bytes read */ export function readRepetitionLevelsV2(reader, daph2, schemaPath) { @@ -118,7 +118,7 @@ export function readRepetitionLevelsV2(reader, daph2, schemaPath) { * * @param {DataReader} reader data view for the page * @param {DataPageHeaderV2} daph2 data page header v2 - * @param {number} maxDefinitionLevel maximum definition level for this column + * @param {number} maxDefinitionLevel * @returns {number[] | undefined} definition levels and number of bytes read */ function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) { diff --git a/src/read.js b/src/read.js index 56f4630..d4b3b99 100644 --- a/src/read.js +++ b/src/read.js @@ -1,7 +1,7 @@ import { getColumnOffset, readColumn } from './column.js' import { parquetMetadataAsync } from './metadata.js' -import { getColumnName, isMapLike } from './schema.js' +import { getColumnName, getSchemaPath, isMapLike } from './schema.js' import { concat } from './utils.js' /** @@ -75,7 +75,7 @@ export async function parquetRead(options) { * @param {string[]} [options.columns] columns to read, all columns if undefined * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range. * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed - * @param {Compressors} [options.compressors] custom decompressors + * @param {Compressors} [options.compressors] * @param {RowGroup} rowGroup row group to read * @param {number} groupStart row index of the first row in the group * @returns {Promise} resolves to row data @@ -88,9 +88,8 @@ async function readRowGroup(options, rowGroup, groupStart) { let [groupStartByte, groupEndByte] = [file.byteLength, 0] rowGroup.columns.forEach(({ meta_data: columnMetadata }) => { if (!columnMetadata) throw new Error('parquet column metadata is undefined') - const columnName = getColumnName(metadata.schema, columnMetadata.path_in_schema) // skip columns that are not requested - if (columns && !columns.includes(columnName)) return + if (columns && !columns.includes(getColumnName(columnMetadata.path_in_schema))) return const startByte = getColumnOffset(columnMetadata) const endByte = startByte + Number(columnMetadata.total_compressed_size) @@ -120,8 +119,7 @@ async function readRowGroup(options, rowGroup, groupStart) { if (!columnMetadata) throw new Error('parquet column metadata is undefined') // skip columns that are not requested - const columnName = getColumnName(metadata.schema, columnMetadata.path_in_schema) - // skip columns that are not requested + const columnName = getColumnName(columnMetadata.path_in_schema) if (columns && !columns.includes(columnName)) continue const columnStartByte = getColumnOffset(columnMetadata) @@ -150,15 +148,16 @@ async function readRowGroup(options, rowGroup, groupStart) { // read column data async promises.push(buffer.then(arrayBuffer => { + const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema) /** @type {ArrayLike | undefined} */ let columnData = readColumn( - arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema, compressors + arrayBuffer, bufferOffset, rowGroup, columnMetadata, schemaPath, compressors ) if (columnData.length !== Number(rowGroup.num_rows)) { throw new Error(`parquet column length ${columnData.length} does not match row group length ${rowGroup.num_rows}`) } - if (isMapLike(metadata.schema, columnMetadata.path_in_schema)) { + if (isMapLike(schemaPath)) { const name = columnMetadata.path_in_schema.slice(0, -2).join('.') if (!maps.has(name)) { maps.set(name, columnData) diff --git a/src/schema.js b/src/schema.js index ad9b7f1..06a65c2 100644 --- a/src/schema.js +++ b/src/schema.js @@ -28,29 +28,11 @@ function schemaTree(schema, rootIndex) { } /** - * Get the schema element with the given name. + * Get schema elements from the root to the given element name. * * @param {SchemaElement[]} schema * @param {string[]} name path to the element - * @returns {SchemaTree} schema element - */ -function schemaElement(schema, name) { - let tree = schemaTree(schema, 0) - // traverse the tree to find the element - for (const part of name) { - const child = tree.children.find(child => child.element.name === part) - if (!child) throw new Error(`parquet schema element not found: ${name}`) - tree = child - } - return tree -} - -/** - * Get each schema element from the root to the given element name. - * - * @param {SchemaElement[]} schema - * @param {string[]} name path to the element - * @returns {SchemaTree[]} schema element + * @returns {SchemaTree[]} list of schema elements */ export function getSchemaPath(schema, name) { let tree = schemaTree(schema, 0) @@ -65,8 +47,7 @@ export function getSchemaPath(schema, name) { } /** - * Check if the schema element with the given name is required. - * An element is required if all of its ancestors are required. + * Check if the schema path and all its ancestors are required. * * @param {SchemaTree[]} schemaPath * @returns {boolean} true if the element is required @@ -88,7 +69,7 @@ export function isRequired(schemaPath) { */ export function getMaxRepetitionLevel(schemaPath) { let maxLevel = 0 - for (const { element } of schemaPath.slice(1)) { + for (const { element } of schemaPath) { if (element.repetition_type === 'REPEATED') { maxLevel++ } @@ -129,29 +110,27 @@ export function skipDefinitionBytes(num) { } /** - * Get the column name as foo.bar and handle list-like columns. - * @param {SchemaElement[]} schema + * Get the column name as foo.bar and handle list and map like columns. + * * @param {string[]} path * @returns {string} column name */ -export function getColumnName(schema, path) { - if (isListLike(schema, path) || isMapLike(schema, path)) { - return path.slice(0, -2).join('.') - } else { - return path.join('.') - } +export function getColumnName(path) { + return path.join('.') + .replace(/(\.list\.element)+/g, '') + .replace(/\.key_value\.key/g, '') + .replace(/\.key_value\.value/g, '') } /** * Check if a column is list-like. * - * @param {SchemaElement[]} schemaElements parquet schema elements - * @param {string[]} path column path + * @param {SchemaTree[]} schemaPath * @returns {boolean} true if map-like */ -export function isListLike(schemaElements, path) { - const schema = schemaElement(schemaElements, path.slice(0, -2)) - if (path.length < 3) return false +export function isListLike(schemaPath) { + const schema = schemaPath.at(-3) + if (!schema) return false if (schema.element.converted_type !== 'LIST') return false if (schema.children.length > 1) return false @@ -168,13 +147,12 @@ export function isListLike(schemaElements, path) { /** * Check if a column is map-like. * - * @param {SchemaElement[]} schemaElements parquet schema elements - * @param {string[]} path column path + * @param {SchemaTree[]} schemaPath * @returns {boolean} true if map-like */ -export function isMapLike(schemaElements, path) { - const schema = schemaElement(schemaElements, path.slice(0, -2)) - if (path.length < 3) return false +export function isMapLike(schemaPath) { + const schema = schemaPath.at(-3) + if (!schema) return false if (schema.element.converted_type !== 'MAP') return false if (schema.children.length > 1) return false diff --git a/test/schema.test.js b/test/schema.test.js index 8be0c49..856adb6 100644 --- a/test/schema.test.js +++ b/test/schema.test.js @@ -69,21 +69,21 @@ describe('Parquet schema utils', () => { }) it('isListLike', () => { - expect(isListLike(schema, [])).toBe(false) - expect(isListLike(schema, ['child1'])).toBe(false) - expect(isListLike(schema, ['child2'])).toBe(false) - expect(isListLike(schema, ['child2', 'list', 'element'])).toBe(true) - expect(isListLike(schema, ['child3'])).toBe(false) - expect(isListLike(schema, ['child3', 'map', 'key'])).toBe(false) + expect(isListLike(getSchemaPath(schema, []))).toBe(false) + expect(isListLike(getSchemaPath(schema, ['child1']))).toBe(false) + expect(isListLike(getSchemaPath(schema, ['child2']))).toBe(false) + expect(isListLike(getSchemaPath(schema, ['child2', 'list', 'element']))).toBe(true) + expect(isListLike(getSchemaPath(schema, ['child3']))).toBe(false) + expect(isListLike(getSchemaPath(schema, ['child3', 'map', 'key']))).toBe(false) }) it('isMapLike', () => { - expect(isMapLike(schema, [])).toBe(false) - expect(isMapLike(schema, ['child1'])).toBe(false) - expect(isMapLike(schema, ['child2'])).toBe(false) - expect(isMapLike(schema, ['child2', 'list', 'element'])).toBe(false) - expect(isMapLike(schema, ['child3'])).toBe(false) - expect(isMapLike(schema, ['child3', 'map', 'key'])).toBe(true) - expect(isMapLike(schema, ['child3', 'map', 'value'])).toBe(true) + expect(isMapLike(getSchemaPath(schema, []))).toBe(false) + expect(isMapLike(getSchemaPath(schema, ['child1']))).toBe(false) + expect(isMapLike(getSchemaPath(schema, ['child2']))).toBe(false) + expect(isMapLike(getSchemaPath(schema, ['child2', 'list', 'element']))).toBe(false) + expect(isMapLike(getSchemaPath(schema, ['child3']))).toBe(false) + expect(isMapLike(getSchemaPath(schema, ['child3', 'map', 'key']))).toBe(true) + expect(isMapLike(getSchemaPath(schema, ['child3', 'map', 'value']))).toBe(true) }) })