diff --git a/src/column.js b/src/column.js index 2747736..a71d8d7 100644 --- a/src/column.js +++ b/src/column.js @@ -3,7 +3,7 @@ import { convert } from './convert.js' import { readDataPage, readDictionaryPage } from './datapage.js' import { readDataPageV2 } from './datapageV2.js' import { parquetHeader } from './header.js' -import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, schemaElement } from './schema.js' +import { getMaxDefinitionLevel, getMaxRepetitionLevel, getSchemaPath, isRequired } from './schema.js' import { snappyUncompress } from './snappy.js' import { concat } from './utils.js' @@ -33,6 +33,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, /** @type {any[]} */ const rowData = [] + const schemaPath = getSchemaPath(schema, columnMetadata.path_in_schema) + const schemaElement = schemaPath[schemaPath.length - 1].element + while (valuesSeen < rowGroup.num_rows) { // parse column header const { value: header, byteLength: headerLength } = parquetHeader(arrayBuffer, columnOffset + byteOffset) @@ -55,7 +58,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, const page = decompressPage( compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors ) - const { definitionLevels, repetitionLevels, value: dataPage } = readDataPage(page, daph, schema, columnMetadata) + const { definitionLevels, repetitionLevels, value: dataPage } = readDataPage(page, daph, schemaPath, columnMetadata) valuesSeen += daph.num_values const dictionaryEncoding = daph.encoding === 'PLAIN_DICTIONARY' || daph.encoding === 'RLE_DICTIONARY' @@ -66,26 +69,26 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, if (repetitionLevels.length) { dereferenceDictionary(dictionary, dataPage) // Use repetition levels to construct lists - const isNullable = columnMetadata && !isRequired(schema, [columnMetadata.path_in_schema[0]]) - const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema) - const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema) + const isNullable = columnMetadata && !isRequired(schemaPath.slice(0, 2)) + const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) + const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) // convert primitive types to rich types values = convert(dataPage, schemaElement) values = assembleObjects( definitionLevels, repetitionLevels, values, isNullable, maxDefinitionLevel, maxRepetitionLevel ) } else if (definitionLevels?.length) { - const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema) + const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) // Use definition levels to skip nulls values = [] skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values) } else { if (dictionaryEncoding && dictionary) { dereferenceDictionary(dictionary, dataPage) - values = convert(dataPage, schemaElement(schema, columnMetadata.path_in_schema).element) + values = convert(dataPage, schemaElement) } else if (Array.isArray(dataPage)) { // convert primitive types to rich types - values = convert(dataPage, schemaElement(schema, columnMetadata.path_in_schema).element) + values = convert(dataPage, schemaElement) } else { values = dataPage // TODO: data page shouldn't be a fixed byte array? } @@ -103,18 +106,18 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, const page = decompressPage( compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors ) - dictionary = readDictionaryPage(page, diph, schema, columnMetadata) + dictionary = readDictionaryPage(page, diph, columnMetadata) } else if (header.type === 'DATA_PAGE_V2') { const daph2 = header.data_page_header_v2 if (!daph2) throw new Error('parquet data page header v2 is undefined') const { definitionLevels, repetitionLevels, value: dataPage } = readDataPageV2( - compressedBytes, header, schema, columnMetadata, compressors + compressedBytes, header, schemaPath, columnMetadata, compressors ) valuesSeen += daph2.num_values - const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema) - const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema) + const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) + const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) if (repetitionLevels.length) { dereferenceDictionary(dictionary, dataPage) // Use repetition levels to construct lists diff --git a/src/datapage.js b/src/datapage.js index ecb5b4d..6fd8a09 100644 --- a/src/datapage.js +++ b/src/datapage.js @@ -1,5 +1,5 @@ import { readData, readPlain, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js' -import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, schemaElement, skipDefinitionBytes } from './schema.js' +import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, skipDefinitionBytes } from './schema.js' const skipNulls = false // TODO @@ -10,34 +10,32 @@ const skipNulls = false // TODO * @typedef {import("./types.d.ts").DataPage} DataPage * @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData * @typedef {import("./types.d.ts").DataPageHeader} DataPageHeader - * @typedef {import("./types.d.ts").SchemaElement} SchemaElement + * @typedef {import("./types.d.ts").SchemaTree} SchemaTree * @param {Uint8Array} bytes raw page data (should already be decompressed) * @param {DataPageHeader} daph data page header - * @param {SchemaElement[]} schema schema for the file + * @param {SchemaTree[]} schemaPath schema for the file * @param {ColumnMetaData} columnMetadata metadata for the column * @returns {DataPage} definition levels, repetition levels, and array of values */ -export function readDataPage(bytes, daph, schema, columnMetadata) { +export function readDataPage(bytes, daph, schemaPath, columnMetadata) { const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength) const reader = { view, offset: 0 } /** @type {any[]} */ let values = [] // repetition levels - const repetitionLevels = readRepetitionLevels( - reader, daph, schema, columnMetadata - ) + const repetitionLevels = readRepetitionLevels(reader, daph, schemaPath) // definition levels let definitionLevels = undefined let numNulls = 0 // let maxDefinitionLevel = -1 // TODO: move into readDefinitionLevels - if (skipNulls && !isRequired(schema, columnMetadata.path_in_schema)) { + if (skipNulls && !isRequired(schemaPath)) { // skip_definition_bytes reader.offset += skipDefinitionBytes(daph.num_values) } else { - const dl = readDefinitionLevels(reader, daph, schema, columnMetadata.path_in_schema) + const dl = readDefinitionLevels(reader, daph, schemaPath) definitionLevels = dl.definitionLevels numNulls = dl.numNulls } @@ -45,7 +43,7 @@ export function readDataPage(bytes, daph, schema, columnMetadata) { // read values based on encoding const nValues = daph.num_values - numNulls if (daph.encoding === 'PLAIN') { - const { element } = schemaElement(schema, columnMetadata.path_in_schema) + const { element } = schemaPath[schemaPath.length - 1] const utf8 = element.converted_type === 'UTF8' const plainObj = readPlain(reader, columnMetadata.type, nValues, utf8) values = Array.isArray(plainObj) ? plainObj : Array.from(plainObj) @@ -85,11 +83,10 @@ export function readDataPage(bytes, daph, schema, columnMetadata) { * @typedef {import("./types.d.ts").DictionaryPageHeader} DictionaryPageHeader * @param {Uint8Array} bytes raw page data * @param {DictionaryPageHeader} diph dictionary page header - * @param {SchemaElement[]} schema schema for the file * @param {ColumnMetaData} columnMetadata metadata for the column * @returns {ArrayLike} array of values */ -export function readDictionaryPage(bytes, diph, schema, columnMetadata) { +export function readDictionaryPage(bytes, diph, columnMetadata) { const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength) const reader = { view, offset: 0 } return readPlain(reader, columnMetadata.type, diph.num_values, false) @@ -101,13 +98,12 @@ export function readDictionaryPage(bytes, diph, schema, columnMetadata) { * @typedef {import("./types.d.ts").DataReader} DataReader * @param {DataReader} reader data view for the page * @param {DataPageHeader} daph data page header - * @param {SchemaElement[]} schema schema for the file - * @param {ColumnMetaData} columnMetadata metadata for the column + * @param {SchemaTree[]} schemaPath schema path for the column * @returns {any[]} repetition levels and number of bytes read */ -function readRepetitionLevels(reader, daph, schema, columnMetadata) { - if (columnMetadata.path_in_schema.length > 1) { - const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema) +function readRepetitionLevels(reader, daph, schemaPath) { + if (schemaPath.length > 1) { + const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) if (maxRepetitionLevel) { const bitWidth = widthFromMaxInt(maxRepetitionLevel) return readData( @@ -123,13 +119,12 @@ function readRepetitionLevels(reader, daph, schema, columnMetadata) { * * @param {DataReader} reader data view for the page * @param {DataPageHeader} daph data page header - * @param {SchemaElement[]} schema schema for the file - * @param {string[]} path_in_schema path in the schema + * @param {SchemaTree[]} schemaPath schema path for the column * @returns {DefinitionLevels} definition levels and number of bytes read */ -function readDefinitionLevels(reader, daph, schema, path_in_schema) { - if (!isRequired(schema, path_in_schema)) { - const maxDefinitionLevel = getMaxDefinitionLevel(schema, path_in_schema) +function readDefinitionLevels(reader, daph, schemaPath) { + if (!isRequired(schemaPath)) { + const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) const bitWidth = widthFromMaxInt(maxDefinitionLevel) if (bitWidth) { // num_values is index 1 for either type of page header diff --git a/src/datapageV2.js b/src/datapageV2.js index e09d860..cf69579 100644 --- a/src/datapageV2.js +++ b/src/datapageV2.js @@ -1,6 +1,6 @@ import { decompressPage } from './column.js' import { readPlain, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js' -import { getMaxDefinitionLevel, getMaxRepetitionLevel, schemaElement } from './schema.js' +import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' import { readVarInt, readZigZag } from './thrift.js' /** @@ -10,15 +10,15 @@ import { readVarInt, readZigZag } from './thrift.js' * @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData * @typedef {import("./types.d.ts").Compressors} Compressors * @typedef {import("./types.d.ts").DataPageHeaderV2} DataPageHeaderV2 - * @typedef {import("./types.d.ts").SchemaElement} SchemaElement + * @typedef {import("./types.d.ts").SchemaTree} SchemaTree * @param {Uint8Array} compressedBytes raw page data (should already be decompressed) * @param {import("./types.d.ts").PageHeader} ph page header - * @param {SchemaElement[]} schema schema for the file + * @param {SchemaTree[]} schemaPath schema path for the column * @param {ColumnMetaData} columnMetadata metadata for the column * @param {Compressors | undefined} compressors * @returns {DataPage} definition levels, repetition levels, and array of values */ -export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, compressors) { +export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata, compressors) { const view = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength) const reader = { view, offset: 0 } /** @type {any} */ @@ -28,14 +28,14 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, comp if (!daph2) throw new Error('parquet data page header v2 is undefined') // repetition levels - const repetitionLevels = readRepetitionLevelsV2(reader, daph2, schema, columnMetadata) + const repetitionLevels = readRepetitionLevelsV2(reader, daph2, schemaPath) if (reader.offset !== daph2.repetition_levels_byte_length) { throw new Error(`parquet repetition levels byte length ${reader.offset} does not match expected ${daph2.repetition_levels_byte_length}`) } // definition levels - const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema) + const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) const definitionLevels = readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) if (reader.offset !== daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length) { @@ -47,7 +47,7 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, comp // read values based on encoding const nValues = daph2.num_values - daph2.num_nulls if (daph2.encoding === 'PLAIN') { - const { element } = schemaElement(schema, columnMetadata.path_in_schema) + const { element } = schemaPath[schemaPath.length - 1] const utf8 = element.converted_type === 'UTF8' let page = compressedBytes.slice(reader.offset) if (daph2.is_compressed && columnMetadata.codec !== 'UNCOMPRESSED') { @@ -99,12 +99,11 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, comp * @typedef {import("./types.d.ts").DataReader} DataReader * @param {DataReader} reader data view for the page * @param {DataPageHeaderV2} daph2 data page header - * @param {SchemaElement[]} schema schema for the file - * @param {ColumnMetaData} columnMetadata metadata for the column + * @param {SchemaTree[]} schemaPath schema path for the column * @returns {any[]} repetition levels and number of bytes read */ -export function readRepetitionLevelsV2(reader, daph2, schema, columnMetadata) { - const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema) +export function readRepetitionLevelsV2(reader, daph2, schemaPath) { + const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) if (!maxRepetitionLevel) return [] const bitWidth = widthFromMaxInt(maxRepetitionLevel) diff --git a/src/metadata.js b/src/metadata.js index f1919e8..5e33ac8 100644 --- a/src/metadata.js +++ b/src/metadata.js @@ -1,5 +1,5 @@ import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, ParquetType } from './constants.js' -import { schemaElement } from './schema.js' +import { getSchemaPath } from './schema.js' import { deserializeTCompactProtocol } from './thrift.js' /** @@ -172,7 +172,7 @@ export function parquetMetadata(arrayBuffer) { * @returns {import("./types.d.ts").SchemaTree} tree of schema elements */ export function parquetSchema(metadata) { - return schemaElement(metadata.schema, []) + return getSchemaPath(metadata.schema, [])[0] } /** diff --git a/src/read.js b/src/read.js index b839158..56f4630 100644 --- a/src/read.js +++ b/src/read.js @@ -150,7 +150,6 @@ async function readRowGroup(options, rowGroup, groupStart) { // read column data async promises.push(buffer.then(arrayBuffer => { - // TODO: extract SchemaElement for this column /** @type {ArrayLike | undefined} */ let columnData = readColumn( arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema, compressors diff --git a/src/schema.js b/src/schema.js index f5d58ee..ad9b7f1 100644 --- a/src/schema.js +++ b/src/schema.js @@ -34,7 +34,7 @@ function schemaTree(schema, rootIndex) { * @param {string[]} name path to the element * @returns {SchemaTree} schema element */ -export function schemaElement(schema, name) { +function schemaElement(schema, name) { let tree = schemaTree(schema, 0) // traverse the tree to find the element for (const part of name) { @@ -46,21 +46,34 @@ export function schemaElement(schema, name) { } /** - * Check if the schema element with the given name is required. - * An element is required if all of its ancestors are required. + * Get each schema element from the root to the given element name. * * @param {SchemaElement[]} schema * @param {string[]} name path to the element + * @returns {SchemaTree[]} schema element + */ +export function getSchemaPath(schema, name) { + let tree = schemaTree(schema, 0) + const path = [tree] + for (const part of name) { + const child = tree.children.find(child => child.element.name === part) + if (!child) throw new Error(`parquet schema element not found: ${name}`) + path.push(child) + tree = child + } + return path +} + +/** + * Check if the schema element with the given name is required. + * An element is required if all of its ancestors are required. + * + * @param {SchemaTree[]} schemaPath * @returns {boolean} true if the element is required */ -export function isRequired(schema, name) { - /** @type {SchemaTree | undefined} */ - let tree = schemaTree(schema, 0) - for (let i = 0; i < name.length; i++) { - // Find schema child with the given name - tree = tree.children.find(child => child.element.name === name[i]) - if (!tree) throw new Error(`parquet schema element not found: ${name}`) - if (tree.element.repetition_type !== 'REQUIRED') { +export function isRequired(schemaPath) { + for (const { element } of schemaPath.slice(1)) { + if (element.repetition_type !== 'REQUIRED') { return false } } @@ -70,36 +83,32 @@ export function isRequired(schema, name) { /** * Get the max repetition level for a given schema path. * - * @param {SchemaElement[]} schema - * @param {string[]} parts path to the element + * @param {SchemaTree[]} schemaPath * @returns {number} max repetition level */ -export function getMaxRepetitionLevel(schema, parts) { +export function getMaxRepetitionLevel(schemaPath) { let maxLevel = 0 - parts.forEach((part, i) => { - const { element } = schemaElement(schema, parts.slice(0, i + 1)) + for (const { element } of schemaPath.slice(1)) { if (element.repetition_type === 'REPEATED') { maxLevel++ } - }) + } return maxLevel } /** * Get the max definition level for a given schema path. * - * @param {SchemaElement[]} schema - * @param {string[]} parts path to the element + * @param {SchemaTree[]} schemaPath * @returns {number} max definition level */ -export function getMaxDefinitionLevel(schema, parts) { +export function getMaxDefinitionLevel(schemaPath) { let maxLevel = 0 - parts.forEach((part, i) => { - const { element } = schemaElement(schema, parts.slice(0, i + 1)) + for (const { element } of schemaPath.slice(1)) { if (element.repetition_type !== 'REQUIRED') { maxLevel++ } - }) + } return maxLevel } diff --git a/test/schema.test.js b/test/schema.test.js index 5fbbf87..8be0c49 100644 --- a/test/schema.test.js +++ b/test/schema.test.js @@ -2,10 +2,10 @@ import { describe, expect, it } from 'vitest' import { getMaxDefinitionLevel, getMaxRepetitionLevel, + getSchemaPath, isListLike, isMapLike, isRequired, - schemaElement, skipDefinitionBytes, } from '../src/schema.js' @@ -26,9 +26,10 @@ describe('Parquet schema utils', () => { { name: 'value', repetition_type: 'OPTIONAL' }, ] - describe('schemaElement', () => { - it('should return the schema element', () => { - expect(schemaElement(schema, ['child1'])).toEqual({ + describe('getSchemaPath', () => { + it('should return the schema path', () => { + const path = getSchemaPath(schema, ['child1']) + expect(path[path.length - 1]).toEqual({ children: [], count: 1, element: { name: 'child1', repetition_type: 'OPTIONAL' }, @@ -36,30 +37,30 @@ describe('Parquet schema utils', () => { }) it('should throw an error if element not found', () => { - expect(() => schemaElement(schema, ['nonexistent'])) + expect(() => getSchemaPath(schema, ['nonexistent'])) .toThrow('parquet schema element not found: nonexistent') }) }) it('isRequired', () => { - expect(isRequired(schema, [])).toBe(true) - expect(isRequired(schema, ['child1'])).toBe(false) - expect(isRequired(schema, ['child2'])).toBe(false) - expect(isRequired(schema, ['child3'])).toBe(false) + expect(isRequired(getSchemaPath(schema, []))).toBe(true) + expect(isRequired(getSchemaPath(schema, ['child1']))).toBe(false) + expect(isRequired(getSchemaPath(schema, ['child2']))).toBe(false) + expect(isRequired(getSchemaPath(schema, ['child3']))).toBe(false) }) it('getMaxRepetitionLevel', () => { - expect(getMaxRepetitionLevel(schema, ['child1'])).toBe(0) - expect(getMaxRepetitionLevel(schema, ['child2'])).toBe(0) - expect(getMaxRepetitionLevel(schema, ['child2', 'list', 'element'])).toBe(1) - expect(getMaxRepetitionLevel(schema, ['child3'])).toBe(0) - expect(getMaxRepetitionLevel(schema, ['child3', 'map', 'key'])).toBe(1) + expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child1']))).toBe(0) + expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child2']))).toBe(0) + expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child2', 'list', 'element']))).toBe(1) + expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child3']))).toBe(0) + expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child3', 'map', 'key']))).toBe(1) }) it('getMaxDefinitionLevel', () => { - expect(getMaxDefinitionLevel(schema, ['child1'])).toBe(1) - expect(getMaxDefinitionLevel(schema, ['child2'])).toBe(1) - expect(getMaxDefinitionLevel(schema, ['child3'])).toBe(1) + expect(getMaxDefinitionLevel(getSchemaPath(schema, ['child1']))).toBe(1) + expect(getMaxDefinitionLevel(getSchemaPath(schema, ['child2']))).toBe(1) + expect(getMaxDefinitionLevel(getSchemaPath(schema, ['child3']))).toBe(1) }) it('skipDefinitionBytes', () => {