Rewrite dremel assembly

This commit is contained in:
Kenny Daniel 2024-03-20 17:24:25 -07:00
parent ef9e6a40f3
commit 46df1ab454
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
5 changed files with 126 additions and 54 deletions

@ -1,59 +1,81 @@
/**
* Dremel-assembly of arrays of values into lists
*
* Reconstructs a complex nested structure from flat arrays of definition and repetition levels,
* according to Dremel encoding. This simplified version focuses on arrays and scalar values,
* with optional support for null values.
*
* @param {number[] | undefined} definitionLevels definition levels, max 3
* @param {number[]} repetitionLevels repetition levels, max 1
* @param {ArrayLike<any>} values values to process
* @param {boolean} isNull can an entry be null?
* @param {number} maxDefinitionLevel definition level that corresponds to non-null
* @param {number} maxRepetitionLevel repetition level that corresponds to a new row
* @returns {any[]} array of values
*/
export function assembleObjects(
definitionLevels, repetitionLevels, values, isNull, maxDefinitionLevel
definitionLevels, repetitionLevels, values, isNull, maxDefinitionLevel, maxRepetitionLevel
) {
let valueIndex = 0
let started = false
let haveNull = false
let outputIndex = 0
let part = []
/** @type {any[]} */
const output = []
let currentContainer = output
for (let counter = 0; counter < repetitionLevels.length; counter++) {
const def = definitionLevels?.length ? definitionLevels[counter] : maxDefinitionLevel
const rep = repetitionLevels[counter]
// Trackers for nested structures.
const containerStack = [output]
if (!rep) {
// new row - save what we have
if (started) {
output[outputIndex] = haveNull ? undefined : part
part = []
outputIndex++
} else {
// first time: no row to save yet, unless it's a row continued from previous page
if (valueIndex > 0) {
output[outputIndex - 1] = output[outputIndex - 1]?.concat(part) // add items to previous row
part = []
// don't increment i since we only filled i-1
for (let i = 0; i < repetitionLevels.length; i++) {
const def = definitionLevels?.length ? definitionLevels[i] : maxDefinitionLevel
const rep = repetitionLevels[i]
if (rep !== maxRepetitionLevel) {
// Move back to the parent container
while (rep < containerStack.length - 1) {
containerStack.pop()
}
// Construct new lists up to max repetition level
// @ts-expect-error won't be empty
currentContainer = containerStack.at(-1)
if (def) {
for (let j = rep; j < maxRepetitionLevel; j++) {
/** @type {any[]} */
const newList = []
currentContainer.push(newList)
currentContainer = newList
containerStack.push(newList)
}
started = true
}
}
// Add value or null based on definition level
if (def === maxDefinitionLevel) {
// append real value to current item
part.push(values[valueIndex])
valueIndex++
} else if (def > 0) {
// append null to current item
part.push(undefined)
currentContainer.push(values[valueIndex++])
} else if (isNull && def < maxDefinitionLevel) {
// Go up one level to add null
if (def) {
containerStack.pop()
// @ts-expect-error won't be empty
currentContainer = containerStack.at(-1)
} else {
currentContainer.push(undefined)
}
}
haveNull = def === 0 && isNull
}
if (started) {
output[outputIndex] = haveNull ? undefined : part
// Handle edge cases for empty inputs or single-level data
if (output.length === 0) {
if (values.length > 0 && maxRepetitionLevel === 0) {
// All values belong to the same (root) list
return [values]
}
// return max definition level of nested lists
/** @type {any[]} */
for (let i = 0; i < maxDefinitionLevel; i++) {
/** @type {any[]} */
const newList = []
currentContainer.push(newList)
currentContainer = newList
}
}
return output

@ -4,7 +4,7 @@ import { convert } from './convert.js'
import { readDataPage, readDictionaryPage } from './datapage.js'
import { readDataPageV2 } from './datapageV2.js'
import { parquetHeader } from './header.js'
import { getMaxDefinitionLevel, isRequired, schemaElement } from './schema.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, schemaElement } from './schema.js'
import { snappyUncompress } from './snappy.js'
/**
@ -67,8 +67,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
// Use repetition levels to construct lists
const isNull = columnMetadata && !isRequired(schema, [columnMetadata.path_in_schema[0]])
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
values = assembleObjects(
definitionLevels, repetitionLevels, dataPage, isNull, maxDefinitionLevel
definitionLevels, repetitionLevels, dataPage, isNull, maxDefinitionLevel, maxRepetitionLevel
)
} else if (definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
@ -110,11 +111,12 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
valuesSeen += daph2.num_values
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
if (repetitionLevels.length) {
dereferenceDictionary(dictionary, dataPage)
// Use repetition levels to construct lists
rowData.push(...assembleObjects(
definitionLevels, repetitionLevels, dataPage, true, maxDefinitionLevel
definitionLevels, repetitionLevels, dataPage, true, maxDefinitionLevel, maxRepetitionLevel
))
} else if (daph2.num_nulls) {
// skip nulls

@ -172,11 +172,16 @@ async function readRowGroup(options, rowGroup) {
// assemble map-like column data
for (let i = 0; i < keys.length; i++) {
// keys will be empty for {} and undefined for null
if (keys[i] !== undefined) {
if (keys[i]) {
/** @type {Record<string, any>} */
const obj = {}
for (let j = 0; j < keys[i].length; j++) {
if (keys[i][j] === undefined) continue
if (Array.isArray(keys[i][j])) {
// TODO: key should not be an array, this is an assemble bug
keys[i][j] = keys[i][j][0]
values[i][j] = values[i][j][0]
}
if (!keys[i][j]) continue
obj[keys[i][j]] = values[i][j] === undefined ? null : values[i][j]
}
out.push(obj)

@ -5,28 +5,34 @@ describe('assembleObjects', () => {
it('should assemble objects with non-null values', () => {
const repetitionLevels = [0, 1]
const values = ['a', 'b']
const result = assembleObjects([], repetitionLevels, values, false, 3)
const result = assembleObjects([], repetitionLevels, values, false, 3, 1)
expect(result).toEqual([['a', 'b']])
})
it('should handle null values', () => {
const definitionLevels = [3, 0, 3]
const repetitionLevels = [0, 0, 1]
const repetitionLevels = [0, 1, 1]
const values = ['a', 'c']
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 3)
expect(result).toEqual([['a'], ['c']])
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 3, 1)
expect(result).toEqual([['a', undefined, 'c']])
})
it('should handle empty lists', () => {
const result = assembleObjects([], [], [], false, 3)
const result = assembleObjects([], [], [], false, 0, 0)
expect(result).toEqual([])
})
it('should handle multiple lists', () => {
const definitionLevels = [3, 3, 3, 3, 3, 3]
const repetitionLevels = [0, 0]
const values = [22, 33]
const result = assembleObjects([], repetitionLevels, values, false, 3, 1)
expect(result).toEqual([[22], [33]])
})
it('should handle multiple lists (6)', () => {
const repetitionLevels = [0, 1, 1, 0, 1, 1]
const values = [1, 2, 3, 4, 5, 6]
const result = assembleObjects(definitionLevels, repetitionLevels, values, false, 3)
const result = assembleObjects([], repetitionLevels, values, false, 3, 1)
expect(result).toEqual([[1, 2, 3], [4, 5, 6]])
})
@ -34,15 +40,52 @@ describe('assembleObjects', () => {
const definitionLevels = [3, 3, 0, 3, 3]
const repetitionLevels = [0, 1, 0, 0, 1]
const values = ['a', 'b', 'd', 'e']
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 3)
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 3, 1)
expect(result).toEqual([['a', 'b'], undefined, ['d', 'e']])
})
it('should handle continuing a row from the previous page', () => {
const definitionLevels = [3, 3, 3, 1]
const repetitionLevels = [1, 0, 1, 0]
const values = ['a', 'b', 'c', 'd']
const result = assembleObjects(definitionLevels, repetitionLevels, values, false, 3)
expect(result).toEqual([['b', 'c'], [undefined]])
// it('should handle continuing a row from the previous page', () => {
// const definitionLevels = [3, 3, 3, 1]
// const repetitionLevels = [1, 0, 1, 0]
// const values = ['a', 'b', 'c', 'd']
// const result = assembleObjects(definitionLevels, repetitionLevels, values, false, 3, 1)
// expect(result).toEqual([['b', 'c'], [undefined]])
// })
it('should handle nested arrays', () => {
// from nullable.impala.parquet
const repetitionLevels = [0, 2, 1, 2]
const values = [1, 2, 3, 4]
const result = assembleObjects([], repetitionLevels, values, false, 3, 2)
expect(result).toEqual([[[1, 2], [3, 4]]])
})
it('should handle top repetition level', () => {
// from int_map.parquet
const definitionLevels = [2, 2, 2, 2, 1, 1, 1, 0, 2, 2]
const repetitionLevels = [0, 1, 0, 1, 0, 0, 0, 0, 0, 1]
const values = ['k1', 'k2', 'k1', 'k2', 'k1', 'k3']
const result = assembleObjects(definitionLevels, repetitionLevels, values, true, 2, 1)
expect(result).toEqual([
['k1', 'k2'],
['k1', 'k2'],
[],
[],
[],
undefined,
['k1', 'k3'],
])
})
it('should handle empty lists with definition level', () => {
// from nonnullable.impala.parquet
const result = assembleObjects([0], [0], [], false, 2, 2)
expect(result).toEqual([[[]]])
})
it('should handle isNull correctly', () => {
// from nonnullable.impala.parquet
const result = assembleObjects([2], [0], [-1], false, 2, 2)
expect(result).toEqual([[[-1]]])
})
})

@ -2,13 +2,13 @@
[
8,
[-1],
[-1, -2, null],
[[-1, -2], []],
{ "k1": -1 },
{ "k1": 1 },
-1,
[-1],
[-1],
["nonnullable"],
[]
[[-1]],
[["nonnullable"]],
[[]]
]
]