Refactor assembleLists to take a schemaPath

This commit is contained in:
Kenny Daniel 2025-04-02 23:03:46 -07:00
parent 1247f5d606
commit b38b65f7c7
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
4 changed files with 66 additions and 64 deletions

@ -1,24 +1,22 @@
import { isListLike, isMapLike } from './schema.js'
import { getMaxDefinitionLevel, isListLike, isMapLike } from './schema.js'
/**
* Dremel-assembly of arrays of values into lists
* Reconstructs a complex nested structure from flat arrays of values and
* definition and repetition levels, according to Dremel encoding.
*
* Reconstructs a complex nested structure from flat arrays of definition and repetition levels,
* according to Dremel encoding.
*
* @import {DecodedArray, FieldRepetitionType} from '../src/types.d.ts'
* @import {DecodedArray} from '../src/types.d.ts'
* @param {any[]} output
* @param {number[] | undefined} definitionLevels
* @param {number[]} repetitionLevels
* @param {DecodedArray} values
* @param {(FieldRepetitionType | undefined)[]} repetitionPath
* @param {number} maxDefinitionLevel definition level that corresponds to non-null
* @returns {any[]}
* @param {SchemaTree[]} schemaPath
* @returns {DecodedArray}
*/
export function assembleLists(
output, definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel
) {
export function assembleLists(output, definitionLevels, repetitionLevels, values, schemaPath) {
const n = definitionLevels?.length || repetitionLevels.length
if (!n) return values
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
let valueIndex = 0
// Track state of nested structures

@ -3,7 +3,6 @@ import { Encoding, PageType } from './constants.js'
import { convertWithDictionary } from './convert.js'
import { decompressPage, readDataPage, readDataPageV2 } from './datapage.js'
import { readPlain } from './plain.js'
import { getMaxDefinitionLevel } from './schema.js'
import { deserializeTCompactProtocol } from './thrift.js'
/**
@ -81,9 +80,7 @@ export function readPage(reader, columnMetadata, schemaPath, element, dictionary
// convert types, dereference dictionary, and assemble lists
let values = convertWithDictionary(dataPage, dictionary, element, daph.encoding, utf8)
if (repetitionLevels.length || definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
return assembleLists([], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel)
return assembleLists([], definitionLevels, repetitionLevels, values, schemaPath)
} else {
// wrap nested flat data by depth
for (let i = 2; i < schemaPath.length; i++) {
@ -103,13 +100,7 @@ export function readPage(reader, columnMetadata, schemaPath, element, dictionary
// convert types, dereference dictionary, and assemble lists
const values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8)
if (repetitionLevels.length || definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const repetitionPath = schemaPath.map(({ element }) => element.repetition_type)
return assembleLists([], definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel)
} else {
return values
}
return assembleLists([], definitionLevels, repetitionLevels, values, schemaPath)
} else if (header.type === 'DICTIONARY_PAGE') {
const diph = header.dictionary_page_header
if (!diph) throw new Error('parquet dictionary page header is undefined')

@ -92,7 +92,7 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
}
const promises = []
// Top-level columns to assemble
// top-level columns to assemble
const { children } = getSchemaPath(metadata.schema, [])[0]
const subcolumnNames = new Map(children.map(child => [child.element.name, getSubcolumns(child)]))
/** @type {Map<string, DecodedArray[]>} */
@ -138,7 +138,7 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
let chunks = columnData
// TODO: fast path for non-nested columns
// Save column data for assembly
// save column data for assembly
const subcolumn = columnMetadata.path_in_schema.join('.')
subcolumnData.set(subcolumn, chunks)
chunks = undefined

@ -1,22 +1,33 @@
import { describe, expect, it } from 'vitest'
import { assembleLists } from '../src/assemble.js'
/** @typedef {import('../src/types.js').FieldRepetitionType | undefined} FieldRepetitionType */
describe('assembleLists', () => {
/** @type {FieldRepetitionType[]} */
const nonnullable = [undefined, 'REQUIRED', 'REPEATED', 'REQUIRED']
/** @type {FieldRepetitionType[]} */
const nullable = [undefined, 'OPTIONAL', 'REPEATED', 'OPTIONAL']
/** @type {FieldRepetitionType[]} */
const nestedRequired = [undefined, 'REQUIRED', 'REPEATED', 'REQUIRED', 'REPEATED', 'REQUIRED']
/** @type {FieldRepetitionType[]} */
const nestedOptional = [undefined, 'OPTIONAL', 'REPEATED', 'OPTIONAL', 'REPEATED', 'OPTIONAL']
const nonnullable = toSchemaPath([undefined, 'REQUIRED', 'REPEATED', 'REQUIRED'])
const nullable = toSchemaPath([undefined, 'OPTIONAL', 'REPEATED', 'OPTIONAL'])
const nestedRequired = toSchemaPath([undefined, 'REQUIRED', 'REPEATED', 'REQUIRED', 'REPEATED', 'REQUIRED'])
const nestedOptional = toSchemaPath([undefined, 'OPTIONAL', 'REPEATED', 'OPTIONAL', 'REPEATED', 'OPTIONAL'])
/**
* @import {FieldRepetitionType, SchemaTree} from '../src/types.js'
* @param {(FieldRepetitionType | undefined)[]} repetitionPath
* @returns {SchemaTree[]}
*/
function toSchemaPath(repetitionPath) {
return repetitionPath.map(repetition_type => ({
element: {
name: 'name',
repetition_type,
},
count: 1,
children: [],
path: [],
}))
}
it('should assemble objects with non-null values', () => {
const repetitionLevels = [0, 1]
const values = ['a', 'b']
const result = assembleLists([], [], repetitionLevels, values, nonnullable, 1)
const result = assembleLists([], [], repetitionLevels, values, nonnullable)
expect(result).toEqual([['a', 'b']])
})
@ -24,26 +35,26 @@ describe('assembleLists', () => {
const definitionLevels = [3, 0, 3]
const repetitionLevels = [0, 1, 1]
const values = ['a', 'c']
const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable, 3)
const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable)
expect(result).toEqual([[['a', null, 'c']]])
})
it('should handle empty lists', () => {
expect(assembleLists([], [], [], [], nonnullable, 0)).toEqual([])
expect(assembleLists([], [], [], [], nonnullable, 1)).toEqual([[]])
expect(assembleLists([], [], [], [], nonnullable)).toEqual([])
expect(assembleLists([], [], [], [], nullable)).toEqual([])
})
it('should handle multiple lists', () => {
const repetitionLevels = [0, 0]
const values = [22, 33]
const result = assembleLists([], [], repetitionLevels, values, nonnullable, 1)
const result = assembleLists([], [], repetitionLevels, values, nonnullable)
expect(result).toEqual([[22], [33]])
})
it('should handle multiple lists (6)', () => {
const repetitionLevels = [0, 1, 1, 0, 1, 1]
const values = [1, 2, 3, 4, 5, 6]
const result = assembleLists([], [], repetitionLevels, values, nonnullable, 1)
const result = assembleLists([], [], repetitionLevels, values, nonnullable)
expect(result).toEqual([[1, 2, 3], [4, 5, 6]])
})
@ -51,7 +62,7 @@ describe('assembleLists', () => {
const definitionLevels = [3, 3, 0, 3, 3]
const repetitionLevels = [0, 1, 0, 0, 1]
const values = ['a', 'b', 'd', 'e']
const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable, 3)
const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable)
expect(result).toEqual([[['a', 'b']], [], [['d', 'e']]])
})
@ -60,7 +71,7 @@ describe('assembleLists', () => {
const repetitionLevels = [1, 0, 1, 0]
const values = ['b', 'c', 'd', 'e']
const prev = [[['a']]]
const result = assembleLists(prev, definitionLevels, repetitionLevels, values, nullable, 3)
const result = assembleLists(prev, definitionLevels, repetitionLevels, values, nullable)
expect(result).toEqual([[['a', 'b']], [['c', 'd']], [[]]])
})
@ -68,7 +79,7 @@ describe('assembleLists', () => {
// from nullable.impala.parquet
const repetitionLevels = [0, 2, 1, 2]
const values = [1, 2, 3, 4]
const result = assembleLists([], [], repetitionLevels, values, nestedRequired, 2)
const result = assembleLists([], [], repetitionLevels, values, nestedRequired)
expect(result).toEqual([[[1, 2], [3, 4]]])
})
@ -77,9 +88,8 @@ describe('assembleLists', () => {
const definitionLevels = [2, 2, 2, 2, 1, 1, 1, 0, 2, 2]
const repetitionLevels = [0, 1, 0, 1, 0, 0, 0, 0, 0, 1]
const values = ['k1', 'k2', 'k1', 'k2', 'k1', 'k3']
/** @type {FieldRepetitionType[]} */
const repetitionPath = ['REQUIRED', 'OPTIONAL', 'REPEATED', 'REQUIRED'] // map key required
const result = assembleLists([], definitionLevels, repetitionLevels, values, repetitionPath, 2)
const schemaPath = toSchemaPath(['REQUIRED', 'OPTIONAL', 'REPEATED', 'REQUIRED'])
const result = assembleLists([], definitionLevels, repetitionLevels, values, schemaPath)
expect(result).toEqual([
[['k1', 'k2']],
[['k1', 'k2']],
@ -93,12 +103,12 @@ describe('assembleLists', () => {
it('should handle empty lists with definition level', () => {
// from nonnullable.impala.parquet
expect(assembleLists([], [0], [0], [], nonnullable, 1)).toEqual([[]])
expect(assembleLists([], [0], [0], [], nonnullable)).toEqual([[]])
})
it('should handle nonnullable lists', () => {
// from nonnullable.impala.parquet
expect(assembleLists([], [1], [0], [-1], nonnullable, 1)).toEqual([[-1]])
expect(assembleLists([], [1], [0], [-1], nonnullable)).toEqual([[-1]])
})
it('should handle nullable int_array', () => {
@ -107,7 +117,7 @@ describe('assembleLists', () => {
const definitionLevels = [3, 3, 3, 2, 3, 3, 2, 3, 2, 1, 0, 0]
const repetitionLevels = [0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 0]
const values = [1, 2, 3, 1, 2, 3]
const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable, 3)
const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable)
expect(result).toEqual([
[[1, 2, 3]],
[[null, 1, 2, null, 3, null]],
@ -123,7 +133,7 @@ describe('assembleLists', () => {
const definitionLevels = [5, 5, 5, 5, 4, 5, 5, 4, 5, 4, 5, 3, 2, 2, 1, 0, 0, 2, 5, 5]
const repetitionLevels = [0, 2, 1, 2, 0, 2, 2, 2, 1, 2, 2, 1, 1, 0, 0, 0, 0, 0, 1, 2]
const values = [1, 2, 3, 4, 1, 2, 3, 4, 5, 6]
const result = assembleLists([], definitionLevels, repetitionLevels, values, nestedOptional, 5)
const result = assembleLists([], definitionLevels, repetitionLevels, values, nestedOptional)
expect(result).toEqual([
[[[[1, 2]], [[3, 4]]]],
[[[[null, 1, 2, null]], [[3, null, 4]], [[]], []]],
@ -139,16 +149,18 @@ describe('assembleLists', () => {
const definitionLevels = [3, 4, 3, 3]
const repetitionLevels = [0, 1, 1, 1]
const values = ['k1']
const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable, 4)
expect(result).toEqual([[[null, 'k1', null, null]]])
const schemaPath = toSchemaPath([undefined, 'OPTIONAL', 'REPEATED', 'OPTIONAL', 'REPEATED', 'REQUIRED'])
const result = assembleLists([], definitionLevels, repetitionLevels, values, schemaPath)
expect(result).toEqual([[[[[]], [['k1']], [[]], [[]]]]])
})
it('should handle nonnullable int_map_array values', () => {
const definitionLevels = [3, 5, 3, 3]
const repetitionLevels = [0, 1, 1, 1]
const values = ['v1']
const result = assembleLists([], definitionLevels, repetitionLevels, values, nullable, 5)
expect(result).toEqual([[[null, 'v1', null, null]]])
const values = [1]
const schemaPath = toSchemaPath([undefined, 'OPTIONAL', 'REPEATED', 'OPTIONAL', 'REPEATED', 'OPTIONAL'])
const result = assembleLists([], definitionLevels, repetitionLevels, values, schemaPath)
expect(result).toEqual([[[[[]], [[1]], [[]], [[]]]]])
})
it('should handle mixed optional and required', () => {
@ -156,9 +168,8 @@ describe('assembleLists', () => {
const definitionLevels = [2, 2, 2, 0, 0, 2, 2, 2, 2, 2]
const repetitionLevels = [0, 1, 1, 0, 0, 0, 1, 1, 0, 1]
const values = [1, 2, 3, 1, 2, 3, 1, 2]
/** @type {FieldRepetitionType[]} */
const repetitionPath = [undefined, 'OPTIONAL', 'REPEATED', 'REQUIRED']
const result = assembleLists([], definitionLevels, repetitionLevels, values, repetitionPath, 2)
const schemaPath = toSchemaPath([undefined, 'OPTIONAL', 'REPEATED', 'REQUIRED'])
const result = assembleLists([], definitionLevels, repetitionLevels, values, schemaPath)
expect(result).toEqual([[[1, 2, 3]], [], [], [[1, 2, 3]], [[1, 2]]])
})
@ -166,23 +177,25 @@ describe('assembleLists', () => {
// from nonnullable.impala.parquet nested_Struct i
const definitionLevels = [0]
const repetitionLevels = [0]
/** @type {FieldRepetitionType[]} */
const repetitionPath = [undefined, 'REQUIRED', 'REQUIRED', 'REPEATED', 'REQUIRED', 'REQUIRED', 'REPEATED', 'REQUIRED']
const result = assembleLists([], definitionLevels, repetitionLevels, [], repetitionPath, 2)
const schemaPath = toSchemaPath([
undefined, 'REQUIRED', 'REQUIRED', 'REPEATED', 'REQUIRED', 'REQUIRED', 'REPEATED', 'REQUIRED',
])
const result = assembleLists([], definitionLevels, repetitionLevels, [], schemaPath)
expect(result).toEqual([[]])
})
it('should handle dzenilee', () => {
const repetitionLevels = [0, 1, 1, 0, 1, 1]
const values = ['a', 'b', 'c', 'd', 'e', 'f']
const result = assembleLists([], [], repetitionLevels, values, nullable, 3)
const result = assembleLists([], [], repetitionLevels, values, nullable)
expect(result).toEqual([[['a', 'b', 'c']], [['d', 'e', 'f']]])
})
it('handle complex.parquet with nested require', () => {
const definitionLevels = [1, 1]
const values = ['a', 'b']
const result = assembleLists([], definitionLevels, [], values, [undefined, 'OPTIONAL', 'REQUIRED', 'REQUIRED'], 1)
const schemaPath = toSchemaPath([undefined, 'OPTIONAL', 'REQUIRED', 'REQUIRED'])
const result = assembleLists([], definitionLevels, [], values, schemaPath)
expect(result).toEqual([['a'], ['b']])
})
})