Handle skipNulls in assembleLists

This commit is contained in:
Kenny Daniel 2024-05-17 19:41:40 -07:00
parent 0cfcc6331f
commit 3f958ed25d
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
3 changed files with 57 additions and 101 deletions

@ -4,26 +4,28 @@
* Reconstructs a complex nested structure from flat arrays of definition and repetition levels,
* according to Dremel encoding.
*
* @param {number[] | undefined} definitionLevels definition levels
* @param {number[]} repetitionLevels repetition levels
* @param {ArrayLike<any>} values values to process
* @typedef {import('./types.d.ts').DecodedArray} DecodedArray
* @param {number[] | undefined} definitionLevels
* @param {number[]} repetitionLevels
* @param {DecodedArray} values
* @param {boolean} isNullable can entries be null?
* @param {number} maxDefinitionLevel definition level that corresponds to non-null
* @param {number} maxRepetitionLevel repetition level that corresponds to a new row
* @returns {any[]} array of values
* @returns {DecodedArray} array of values
*/
export function assembleObjects(
export function assembleLists(
definitionLevels, repetitionLevels, values, isNullable, maxDefinitionLevel, maxRepetitionLevel
) {
const n = definitionLevels?.length || repetitionLevels.length
let valueIndex = 0
/** @type {any[]} */
const output = []
// Track state of nested structures
const containerStack = [output]
let currentContainer = output
// Trackers for nested structures.
const containerStack = [output]
for (let i = 0; i < repetitionLevels.length; i++) {
for (let i = 0; i < n; i++) {
const def = definitionLevels?.length ? definitionLevels[i] : maxDefinitionLevel
const rep = repetitionLevels[i]
@ -67,7 +69,6 @@ export function assembleObjects(
return [values]
}
// return max definition level of nested lists
/** @type {any[]} */
for (let i = 0; i < maxDefinitionLevel; i++) {
/** @type {any[]} */
const newList = []

@ -1,4 +1,4 @@
import { assembleObjects } from './assemble.js'
import { assembleLists } from './assemble.js'
import { convert } from './convert.js'
import { readDataPage, readDictionaryPage } from './datapage.js'
import { readDataPageV2 } from './datapageV2.js'
@ -62,38 +62,26 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
// assert(!daph.statistics || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
// construct output values: skip nulls and construct lists
if (repetitionLevels.length) {
dereferenceDictionary(dictionary, dataPage)
dereferenceDictionary(dictionary, dataPage)
values = convert(dataPage, element)
if (repetitionLevels.length || definitionLevels?.length) {
// Use repetition levels to construct lists
const isNullable = columnMetadata && !isRequired(schemaPath.slice(0, 2))
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
// convert primitive types to rich types
values = convert(dataPage, element)
values = assembleObjects(
const isNullable = columnMetadata && !isRequired(schemaPath.slice(0, 2))
values = assembleLists(
definitionLevels, repetitionLevels, values, isNullable, maxDefinitionLevel, maxRepetitionLevel
)
} else if (definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
// Use definition levels to skip nulls
values = []
skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values)
values = convert(values, element)
} else {
dereferenceDictionary(dictionary, dataPage)
values = convert(dataPage, element)
// wrap nested flat data by depth
for (let i = 2; i < schemaPath.length; i++) {
if (schemaPath[i].element.repetition_type !== 'REQUIRED') {
values = [values]
}
}
}
// assert(BigInt(values.length) === rowGroup.num_rows)
concat(rowData, values)
} else if (header.type === 'DICTIONARY_PAGE') {
const diph = header.dictionary_page_header
if (!diph) throw new Error('parquet dictionary page header is undefined')
const page = decompressPage(
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
)
dictionary = readDictionaryPage(page, diph, columnMetadata, element.type_length)
} else if (header.type === 'DATA_PAGE_V2') {
const daph2 = header.data_page_header_v2
if (!daph2) throw new Error('parquet data page header v2 is undefined')
@ -103,26 +91,26 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
)
valuesSeen += daph2.num_values
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
if (repetitionLevels.length) {
dereferenceDictionary(dictionary, dataPage)
values = convert(dataPage, element)
dereferenceDictionary(dictionary, dataPage)
values = convert(dataPage, element)
if (repetitionLevels.length || definitionLevels?.length) {
// Use repetition levels to construct lists
values = assembleObjects(
definitionLevels, repetitionLevels, values, true, maxDefinitionLevel, maxRepetitionLevel
const isNullable = columnMetadata && !isRequired(schemaPath.slice(0, 2))
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
values = assembleLists(
definitionLevels, repetitionLevels, values, isNullable, maxDefinitionLevel, maxRepetitionLevel
)
} else if (daph2.num_nulls) {
// skip nulls
if (!definitionLevels) throw new Error('parquet data page v2 nulls missing definition levels')
values = [] // TODO: copy straight into rowData, combine convert into skipNulls
skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values)
values = convert(values, element)
} else {
dereferenceDictionary(dictionary, dataPage)
values = convert(dataPage, element)
}
concat(rowData, values)
} else if (header.type === 'DICTIONARY_PAGE') {
const diph = header.dictionary_page_header
if (!diph) throw new Error('parquet dictionary page header is undefined')
const page = decompressPage(
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
)
dictionary = readDictionaryPage(page, diph, columnMetadata, element.type_length)
} else {
throw new Error(`parquet unsupported page type: ${header.type}`)
}
@ -190,34 +178,3 @@ export function decompressPage(compressedBytes, uncompressed_page_size, codec, c
}
return page
}
/**
 * Expand data page list with nulls.
 *
 * Walks the definition levels and writes one entry into `output` per level:
 * when the level equals `maxDefinitionLevel` the next value is consumed from
 * `dataPage` (and mapped through `dictionary` when one is given), otherwise
 * the entry is set to `undefined`.
 *
 * @param {number[]} definitionLevels definition level for each output entry
 * @param {number} maxDefinitionLevel definition level that marks a present value
 * @param {ArrayLike<any>} dataPage values (or dictionary indices) to consume in order
 * @param {any} dictionary optional lookup applied to each consumed value
 * @param {any[]} output array to fill in place; must be empty on entry
 * @throws {Error} if `output` is not empty, or if the definition levels require
 *   more values than `dataPage` contains
 */
function skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, output) {
  if (output.length) throw new Error('parquet output array is not empty')
  // Use definition levels to skip nulls
  let index = 0
  for (let i = 0; i < definitionLevels.length; i++) {
    if (definitionLevels[i] === maxDefinitionLevel) {
      // index === dataPage.length is already out of range, so compare with >=
      // rather than > to avoid silently emitting undefined for a missing value
      if (index >= dataPage.length) {
        throw new Error(`parquet index ${index} exceeds data page length ${dataPage.length}`)
      }
      let v = dataPage[index++]
      // map to dictionary value
      if (dictionary) {
        v = dictionary[v]
      }
      output[i] = v
    } else {
      output[i] = undefined
    }
  }
}

@ -1,11 +1,11 @@
import { describe, expect, it } from 'vitest'
import { assembleObjects } from '../src/assemble.js'
import { assembleLists } from '../src/assemble.js'
describe('assembleObjects', () => {
it('should assemble objects with non-null values', () => {
const repetitionLevels = [0, 1]
const values = ['a', 'b']
const result = assembleObjects([], repetitionLevels, values, false, 3, 1)
const result = assembleLists([], repetitionLevels, values, false, 1, 1)
expect(result).toEqual([['a', 'b']])
})
@ -13,26 +13,26 @@ describe('assembleObjects', () => {
const definitionLevels = [3, 0, 3]
const repetitionLevels = [0, 1, 1]
const values = ['a', 'c']
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 3, 1)
const result = assembleLists(definitionLevels, repetitionLevels, values, true, 3, 1)
expect(result).toEqual([['a', undefined, 'c']])
})
it('should handle empty lists', () => {
const result = assembleObjects([], [], [], false, 0, 0)
expect(result).toEqual([])
expect(assembleLists([], [], [], false, 0, 0)).toEqual([])
expect(assembleLists([], [], [], false, 1, 0)).toEqual([[]])
})
it('should handle multiple lists', () => {
const repetitionLevels = [0, 0]
const values = [22, 33]
const result = assembleObjects([], repetitionLevels, values, false, 3, 1)
const result = assembleLists([], repetitionLevels, values, false, 1, 1)
expect(result).toEqual([[22], [33]])
})
it('should handle multiple lists (6)', () => {
const repetitionLevels = [0, 1, 1, 0, 1, 1]
const values = [1, 2, 3, 4, 5, 6]
const result = assembleObjects([], repetitionLevels, values, false, 3, 1)
const result = assembleLists([], repetitionLevels, values, false, 1, 1)
expect(result).toEqual([[1, 2, 3], [4, 5, 6]])
})
@ -40,7 +40,7 @@ describe('assembleObjects', () => {
const definitionLevels = [3, 3, 0, 3, 3]
const repetitionLevels = [0, 1, 0, 0, 1]
const values = ['a', 'b', 'd', 'e']
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 3, 1)
const result = assembleLists(definitionLevels, repetitionLevels, values, true, 3, 1)
expect(result).toEqual([['a', 'b'], undefined, ['d', 'e']])
})
@ -56,7 +56,7 @@ describe('assembleObjects', () => {
// from nullable.impala.parquet
const repetitionLevels = [0, 2, 1, 2]
const values = [1, 2, 3, 4]
const result = assembleObjects([], repetitionLevels, values, false, 3, 2)
const result = assembleLists([], repetitionLevels, values, false, 2, 2)
expect(result).toEqual([[[1, 2], [3, 4]]])
})
@ -65,7 +65,7 @@ describe('assembleObjects', () => {
const definitionLevels = [2, 2, 2, 2, 1, 1, 1, 0, 2, 2]
const repetitionLevels = [0, 1, 0, 1, 0, 0, 0, 0, 0, 1]
const values = ['k1', 'k2', 'k1', 'k2', 'k1', 'k3']
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 2, 1)
const result = assembleLists(definitionLevels, repetitionLevels, values, true, 2, 1)
expect(result).toEqual([
['k1', 'k2'],
['k1', 'k2'],
@ -79,14 +79,12 @@ describe('assembleObjects', () => {
it('should handle empty lists with definition level', () => {
// from nonnullable.impala.parquet
const result = assembleObjects([0], [0], [], false, 2, 2)
expect(result).toEqual([[[]]])
expect(assembleLists([0], [0], [], false, 1, 2)).toEqual([[[]]])
})
it('should handle nonnullable lists', () => {
// from nonnullable.impala.parquet
const result = assembleObjects([2], [0], [-1], false, 2, 2)
expect(result).toEqual([[[-1]]])
expect(assembleLists([1], [0], [-1], false, 1, 2)).toEqual([[[-1]]])
})
it('should handle nullable int_array', () => {
@ -95,7 +93,7 @@ describe('assembleObjects', () => {
const definitionLevels = [3, 3, 3, 2, 3, 3, 2, 3, 2, 1, 0, 0]
const repetitionLevels = [0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 0]
const values = [1, 2, 3, 1, 2, 3]
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 3, 1)
const result = assembleLists(definitionLevels, repetitionLevels, values, true, 3, 1)
expect(result).toEqual([
[1, 2, 3],
[undefined, 1, 2, undefined, 3, undefined],
@ -111,7 +109,7 @@ describe('assembleObjects', () => {
const definitionLevels = [5, 5, 5, 5, 4, 5, 5, 4, 5, 4, 5, 3, 2, 2, 1, 0, 0, 2, 5, 5]
const repetitionLevels = [0, 2, 1, 2, 0, 2, 2, 2, 1, 2, 2, 1, 1, 0, 0, 0, 0, 0, 1, 2]
const values = [1, 2, 3, 4, 1, 2, 3, 4, 5, 6]
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 5, 2)
const result = assembleLists(definitionLevels, repetitionLevels, values, true, 5, 2)
expect(result).toEqual([
[[1, 2], [3, 4]],
[[undefined, 1, 2, undefined], [3, undefined, 4], [], undefined],
@ -127,7 +125,7 @@ describe('assembleObjects', () => {
const definitionLevels = [3, 4, 3, 3]
const repetitionLevels = [0, 1, 1, 1]
const values = ['k1']
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 4, 2)
const result = assembleLists(definitionLevels, repetitionLevels, values, true, 4, 2)
expect(result).toEqual([[[], ['k1'], [], []]])
})
@ -135,7 +133,7 @@ describe('assembleObjects', () => {
const definitionLevels = [3, 5, 3, 3]
const repetitionLevels = [0, 1, 1, 1]
const values = ['v1']
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 5, 2)
const result = assembleLists(definitionLevels, repetitionLevels, values, true, 5, 2)
expect(result).toEqual([[[], ['v1'], [], []]])
})
@ -144,7 +142,7 @@ describe('assembleObjects', () => {
const definitionLevels = [2, 2, 2, 0, 0, 2, 2, 2, 2, 2]
const repetitionLevels = [0, 1, 1, 0, 0, 0, 1, 1, 0, 1]
const values = [1, 2, 3, 1, 2, 3, 1, 2]
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 2, 1)
const result = assembleLists(definitionLevels, repetitionLevels, values, true, 2, 1)
expect(result).toEqual([[1, 2, 3], undefined, undefined, [1, 2, 3], [1, 2]])
})
@ -152,7 +150,7 @@ describe('assembleObjects', () => {
// from nonnullable.impala.parquet nested_Struct i
const definitionLevels = [0]
const repetitionLevels = [0]
const result = assembleObjects(definitionLevels, repetitionLevels, [], false, 2, 2)
const result = assembleLists(definitionLevels, repetitionLevels, [], false, 2, 2)
expect(result).toEqual([[[]]])
})
})