From 3f958ed25d54a4ade230b173d81c4f2a14d5d294 Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Fri, 17 May 2024 19:41:40 -0700 Subject: [PATCH] Handle skipNulls in assembleLists --- src/assemble.js | 21 ++++----- src/column.js | 99 ++++++++++++------------------------------- test/assemble.test.js | 38 ++++++++--------- 3 files changed, 57 insertions(+), 101 deletions(-) diff --git a/src/assemble.js b/src/assemble.js index fbe5a98..253d6a9 100644 --- a/src/assemble.js +++ b/src/assemble.js @@ -4,26 +4,28 @@ * Reconstructs a complex nested structure from flat arrays of definition and repetition levels, * according to Dremel encoding. * - * @param {number[] | undefined} definitionLevels definition levels - * @param {number[]} repetitionLevels repetition levels - * @param {ArrayLike} values values to process + * @typedef {import('./types.d.ts').DecodedArray} DecodedArray + * @param {number[] | undefined} definitionLevels + * @param {number[]} repetitionLevels + * @param {DecodedArray} values * @param {boolean} isNullable can entries be null? * @param {number} maxDefinitionLevel definition level that corresponds to non-null * @param {number} maxRepetitionLevel repetition level that corresponds to a new row - * @returns {any[]} array of values + * @returns {DecodedArray} array of values */ -export function assembleObjects( +export function assembleLists( definitionLevels, repetitionLevels, values, isNullable, maxDefinitionLevel, maxRepetitionLevel ) { + const n = definitionLevels?.length || repetitionLevels.length let valueIndex = 0 /** @type {any[]} */ const output = [] + + // Track state of nested structures + const containerStack = [output] let currentContainer = output - // Trackers for nested structures. - const containerStack = [output] - - for (let i = 0; i < repetitionLevels.length; i++) { + for (let i = 0; i < n; i++) { const def = definitionLevels?.length ? definitionLevels[i] : maxDefinitionLevel const rep = repetitionLevels[i] @@ -67,7 +69,6 @@ export function assembleObjects( return [values] } // return max definition level of nested lists - /** @type {any[]} */ for (let i = 0; i < maxDefinitionLevel; i++) { /** @type {any[]} */ const newList = [] diff --git a/src/column.js b/src/column.js index 0d8c86b..4b9f4c0 100644 --- a/src/column.js +++ b/src/column.js @@ -1,4 +1,4 @@ -import { assembleObjects } from './assemble.js' +import { assembleLists } from './assemble.js' import { convert } from './convert.js' import { readDataPage, readDictionaryPage } from './datapage.js' import { readDataPageV2 } from './datapageV2.js' @@ -62,38 +62,26 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, // assert(!daph.statistics || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length)) // construct output values: skip nulls and construct lists - if (repetitionLevels.length) { - dereferenceDictionary(dictionary, dataPage) + dereferenceDictionary(dictionary, dataPage) + values = convert(dataPage, element) + if (repetitionLevels.length || definitionLevels?.length) { // Use repetition levels to construct lists - const isNullable = columnMetadata && !isRequired(schemaPath.slice(0, 2)) const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) - // convert primitive types to rich types - values = convert(dataPage, element) - values = assembleObjects( + const isNullable = columnMetadata && !isRequired(schemaPath.slice(0, 2)) + values = assembleLists( definitionLevels, repetitionLevels, values, isNullable, maxDefinitionLevel, maxRepetitionLevel ) - } else if (definitionLevels?.length) { - const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) - // Use definition levels to skip nulls - values = [] - skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values) - values = convert(values, element) } else { - dereferenceDictionary(dictionary, dataPage) - values = convert(dataPage, element) + // wrap nested flat data by depth + for (let i = 2; i < schemaPath.length; i++) { + if (schemaPath[i].element.repetition_type !== 'REQUIRED') { + values = [values] + } + } } // assert(BigInt(values.length) === rowGroup.num_rows) - concat(rowData, values) - } else if (header.type === 'DICTIONARY_PAGE') { - const diph = header.dictionary_page_header - if (!diph) throw new Error('parquet dictionary page header is undefined') - - const page = decompressPage( - compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors - ) - dictionary = readDictionaryPage(page, diph, columnMetadata, element.type_length) } else if (header.type === 'DATA_PAGE_V2') { const daph2 = header.data_page_header_v2 if (!daph2) throw new Error('parquet data page header v2 is undefined') @@ -103,26 +91,26 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, ) valuesSeen += daph2.num_values - const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) - const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) - if (repetitionLevels.length) { - dereferenceDictionary(dictionary, dataPage) - values = convert(dataPage, element) + dereferenceDictionary(dictionary, dataPage) + values = convert(dataPage, element) + if (repetitionLevels.length || definitionLevels?.length) { // Use repetition levels to construct lists - values = assembleObjects( - definitionLevels, repetitionLevels, values, true, maxDefinitionLevel, maxRepetitionLevel + const isNullable = columnMetadata && !isRequired(schemaPath.slice(0, 2)) + const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) + const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) + values = assembleLists( + definitionLevels, repetitionLevels, values, isNullable, maxDefinitionLevel, maxRepetitionLevel ) - } else if (daph2.num_nulls) { - // skip nulls - if (!definitionLevels) throw new Error('parquet data page v2 nulls missing definition levels') - values = [] // TODO: copy straight into rowData, combine convert into skipNulls - skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values) - values = convert(values, element) - } else { - dereferenceDictionary(dictionary, dataPage) - values = convert(dataPage, element) } concat(rowData, values) + } else if (header.type === 'DICTIONARY_PAGE') { + const diph = header.dictionary_page_header + if (!diph) throw new Error('parquet dictionary page header is undefined') + + const page = decompressPage( + compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors + ) + dictionary = readDictionaryPage(page, diph, columnMetadata, element.type_length) } else { throw new Error(`parquet unsupported page type: ${header.type}`) } @@ -190,34 +178,3 @@ export function decompressPage(compressedBytes, uncompressed_page_size, codec, c } return page } - -/** - * Expand data page list with nulls. - * - * @param {number[]} definitionLevels - * @param {number} maxDefinitionLevel - * @param {ArrayLike} dataPage - * @param {any} dictionary - * @param {any[]} output - */ -function skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, output) { - if (output.length) throw new Error('parquet output array is not empty') - // Use definition levels to skip nulls - let index = 0 - for (let i = 0; i < definitionLevels.length; i++) { - if (definitionLevels[i] === maxDefinitionLevel) { - if (index > dataPage.length) { - throw new Error(`parquet index ${index} exceeds data page length ${dataPage.length}`) - } - let v = dataPage[index++] - - // map to dictionary value - if (dictionary) { - v = dictionary[v] - } - output[i] = v - } else { - output[i] = undefined - } - } -} diff --git a/test/assemble.test.js b/test/assemble.test.js index b561164..23d24f1 100644 --- a/test/assemble.test.js +++ b/test/assemble.test.js @@ -1,11 +1,11 @@ import { describe, expect, it } from 'vitest' -import { assembleObjects } from '../src/assemble.js' +import { assembleLists } from '../src/assemble.js' describe('assembleObjects', () => { it('should assemble objects with non-null values', () => { const repetitionLevels = [0, 1] const values = ['a', 'b'] - const result = assembleObjects([], repetitionLevels, values, false, 3, 1) + const result = assembleLists([], repetitionLevels, values, false, 1, 1) expect(result).toEqual([['a', 'b']]) }) @@ -13,26 +13,26 @@ describe('assembleObjects', () => { const definitionLevels = [3, 0, 3] const repetitionLevels = [0, 1, 1] const values = ['a', 'c'] - const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 3, 1) + const result = assembleLists(definitionLevels, repetitionLevels, values, true, 3, 1) expect(result).toEqual([['a', undefined, 'c']]) }) it('should handle empty lists', () => { - const result = assembleObjects([], [], [], false, 0, 0) - expect(result).toEqual([]) + expect(assembleLists([], [], [], false, 0, 0)).toEqual([]) + expect(assembleLists([], [], [], false, 1, 0)).toEqual([[]]) }) it('should handle multiple lists', () => { const repetitionLevels = [0, 0] const values = [22, 33] - const result = assembleObjects([], repetitionLevels, values, false, 3, 1) + const result = assembleLists([], repetitionLevels, values, false, 1, 1) expect(result).toEqual([[22], [33]]) }) it('should handle multiple lists (6)', () => { const repetitionLevels = [0, 1, 1, 0, 1, 1] const values = [1, 2, 3, 4, 5, 6] - const result = assembleObjects([], repetitionLevels, values, false, 3, 1) + const result = assembleLists([], repetitionLevels, values, false, 1, 1) expect(result).toEqual([[1, 2, 3], [4, 5, 6]]) }) @@ -40,7 +40,7 @@ describe('assembleObjects', () => { const definitionLevels = [3, 3, 0, 3, 3] const repetitionLevels = [0, 1, 0, 0, 1] const values = ['a', 'b', 'd', 'e'] - const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 3, 1) + const result = assembleLists(definitionLevels, repetitionLevels, values, true, 3, 1) expect(result).toEqual([['a', 'b'], undefined, ['d', 'e']]) }) @@ -56,7 +56,7 @@ describe('assembleObjects', () => { // from nullable.impala.parquet const repetitionLevels = [0, 2, 1, 2] const values = [1, 2, 3, 4] - const result = assembleObjects([], repetitionLevels, values, false, 3, 2) + const result = assembleLists([], repetitionLevels, values, false, 2, 2) expect(result).toEqual([[[1, 2], [3, 4]]]) }) @@ -65,7 +65,7 @@ describe('assembleObjects', () => { const definitionLevels = [2, 2, 2, 2, 1, 1, 1, 0, 2, 2] const repetitionLevels = [0, 1, 0, 1, 0, 0, 0, 0, 0, 1] const values = ['k1', 'k2', 'k1', 'k2', 'k1', 'k3'] - const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 2, 1) + const result = assembleLists(definitionLevels, repetitionLevels, values, true, 2, 1) expect(result).toEqual([ ['k1', 'k2'], ['k1', 'k2'], @@ -79,14 +79,12 @@ describe('assembleObjects', () => { it('should handle empty lists with definition level', () => { // from nonnullable.impala.parquet - const result = assembleObjects([0], [0], [], false, 2, 2) - expect(result).toEqual([[[]]]) + expect(assembleLists([0], [0], [], false, 1, 2)).toEqual([[[]]]) }) it('should handle nonnullable lists', () => { // from nonnullable.impala.parquet - const result = assembleObjects([2], [0], [-1], false, 2, 2) - expect(result).toEqual([[[-1]]]) + expect(assembleLists([1], [0], [-1], false, 1, 2)).toEqual([[[-1]]]) }) it('should handle nullable int_array', () => { @@ -95,7 +93,7 @@ describe('assembleObjects', () => { const definitionLevels = [3, 3, 3, 2, 3, 3, 2, 3, 2, 1, 0, 0] const repetitionLevels = [0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 0] const values = [1, 2, 3, 1, 2, 3] - const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 3, 1) + const result = assembleLists(definitionLevels, repetitionLevels, values, true, 3, 1) expect(result).toEqual([ [1, 2, 3], [undefined, 1, 2, undefined, 3, undefined], @@ -111,7 +109,7 @@ describe('assembleObjects', () => { const definitionLevels = [5, 5, 5, 5, 4, 5, 5, 4, 5, 4, 5, 3, 2, 2, 1, 0, 0, 2, 5, 5] const repetitionLevels = [0, 2, 1, 2, 0, 2, 2, 2, 1, 2, 2, 1, 1, 0, 0, 0, 0, 0, 1, 2] const values = [1, 2, 3, 4, 1, 2, 3, 4, 5, 6] - const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 5, 2) + const result = assembleLists(definitionLevels, repetitionLevels, values, true, 5, 2) expect(result).toEqual([ [[1, 2], [3, 4]], [[undefined, 1, 2, undefined], [3, undefined, 4], [], undefined], @@ -127,7 +125,7 @@ describe('assembleObjects', () => { const definitionLevels = [3, 4, 3, 3] const repetitionLevels = [0, 1, 1, 1] const values = ['k1'] - const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 4, 2) + const result = assembleLists(definitionLevels, repetitionLevels, values, true, 4, 2) expect(result).toEqual([[[], ['k1'], [], []]]) }) @@ -135,7 +133,7 @@ describe('assembleObjects', () => { const definitionLevels = [3, 5, 3, 3] const repetitionLevels = [0, 1, 1, 1] const values = ['v1'] - const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 5, 2) + const result = assembleLists(definitionLevels, repetitionLevels, values, true, 5, 2) expect(result).toEqual([[[], ['v1'], [], []]]) }) @@ -144,7 +142,7 @@ describe('assembleObjects', () => { const definitionLevels = [2, 2, 2, 0, 0, 2, 2, 2, 2, 2] const repetitionLevels = [0, 1, 1, 0, 0, 0, 1, 1, 0, 1] const values = [1, 2, 3, 1, 2, 3, 1, 2] - const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 2, 1) + const result = assembleLists(definitionLevels, repetitionLevels, values, true, 2, 1) expect(result).toEqual([[1, 2, 3], undefined, undefined, [1, 2, 3], [1, 2]]) }) @@ -152,7 +150,7 @@ describe('assembleObjects', () => { // from nonnullable.impala.parquet nested_Struct i const definitionLevels = [0] const repetitionLevels = [0] - const result = assembleObjects(definitionLevels, repetitionLevels, [], false, 2, 2) + const result = assembleLists(definitionLevels, repetitionLevels, [], false, 2, 2) expect(result).toEqual([[[]]]) }) })