Refactor to use schemaPath

This commit is contained in:
Kenny Daniel 2024-04-29 17:38:26 -07:00
parent 6ebb6a9ee8
commit 2c6a111113
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
7 changed files with 94 additions and 88 deletions

@ -3,7 +3,7 @@ import { convert } from './convert.js'
import { readDataPage, readDictionaryPage } from './datapage.js'
import { readDataPageV2 } from './datapageV2.js'
import { parquetHeader } from './header.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, schemaElement } from './schema.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel, getSchemaPath, isRequired } from './schema.js'
import { snappyUncompress } from './snappy.js'
import { concat } from './utils.js'
@ -33,6 +33,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
/** @type {any[]} */
const rowData = []
const schemaPath = getSchemaPath(schema, columnMetadata.path_in_schema)
const schemaElement = schemaPath[schemaPath.length - 1].element
while (valuesSeen < rowGroup.num_rows) {
// parse column header
const { value: header, byteLength: headerLength } = parquetHeader(arrayBuffer, columnOffset + byteOffset)
@ -55,7 +58,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const page = decompressPage(
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
)
const { definitionLevels, repetitionLevels, value: dataPage } = readDataPage(page, daph, schema, columnMetadata)
const { definitionLevels, repetitionLevels, value: dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
valuesSeen += daph.num_values
const dictionaryEncoding = daph.encoding === 'PLAIN_DICTIONARY' || daph.encoding === 'RLE_DICTIONARY'
@ -66,26 +69,26 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
if (repetitionLevels.length) {
dereferenceDictionary(dictionary, dataPage)
// Use repetition levels to construct lists
const isNullable = columnMetadata && !isRequired(schema, [columnMetadata.path_in_schema[0]])
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
const isNullable = columnMetadata && !isRequired(schemaPath.slice(0, 2))
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
// convert primitive types to rich types
values = convert(dataPage, schemaElement)
values = assembleObjects(
definitionLevels, repetitionLevels, values, isNullable, maxDefinitionLevel, maxRepetitionLevel
)
} else if (definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
// Use definition levels to skip nulls
values = []
skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values)
} else {
if (dictionaryEncoding && dictionary) {
dereferenceDictionary(dictionary, dataPage)
values = convert(dataPage, schemaElement(schema, columnMetadata.path_in_schema).element)
values = convert(dataPage, schemaElement)
} else if (Array.isArray(dataPage)) {
// convert primitive types to rich types
values = convert(dataPage, schemaElement(schema, columnMetadata.path_in_schema).element)
values = convert(dataPage, schemaElement)
} else {
values = dataPage // TODO: data page shouldn't be a fixed byte array?
}
@ -103,18 +106,18 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const page = decompressPage(
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
)
dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
dictionary = readDictionaryPage(page, diph, columnMetadata)
} else if (header.type === 'DATA_PAGE_V2') {
const daph2 = header.data_page_header_v2
if (!daph2) throw new Error('parquet data page header v2 is undefined')
const { definitionLevels, repetitionLevels, value: dataPage } = readDataPageV2(
compressedBytes, header, schema, columnMetadata, compressors
compressedBytes, header, schemaPath, columnMetadata, compressors
)
valuesSeen += daph2.num_values
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
if (repetitionLevels.length) {
dereferenceDictionary(dictionary, dataPage)
// Use repetition levels to construct lists

@ -1,5 +1,5 @@
import { readData, readPlain, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, schemaElement, skipDefinitionBytes } from './schema.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, skipDefinitionBytes } from './schema.js'
const skipNulls = false // TODO
@ -10,34 +10,32 @@ const skipNulls = false // TODO
* @typedef {import("./types.d.ts").DataPage} DataPage
* @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
* @typedef {import("./types.d.ts").DataPageHeader} DataPageHeader
* @typedef {import("./types.d.ts").SchemaElement} SchemaElement
* @typedef {import("./types.d.ts").SchemaTree} SchemaTree
* @param {Uint8Array} bytes raw page data (should already be decompressed)
* @param {DataPageHeader} daph data page header
* @param {SchemaElement[]} schema schema for the file
 * @param {SchemaTree[]} schemaPath schema path for the column
* @param {ColumnMetaData} columnMetadata metadata for the column
* @returns {DataPage} definition levels, repetition levels, and array of values
*/
export function readDataPage(bytes, daph, schema, columnMetadata) {
export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
const reader = { view, offset: 0 }
/** @type {any[]} */
let values = []
// repetition levels
const repetitionLevels = readRepetitionLevels(
reader, daph, schema, columnMetadata
)
const repetitionLevels = readRepetitionLevels(reader, daph, schemaPath)
// definition levels
let definitionLevels = undefined
let numNulls = 0
// let maxDefinitionLevel = -1
// TODO: move into readDefinitionLevels
if (skipNulls && !isRequired(schema, columnMetadata.path_in_schema)) {
if (skipNulls && !isRequired(schemaPath)) {
// skip_definition_bytes
reader.offset += skipDefinitionBytes(daph.num_values)
} else {
const dl = readDefinitionLevels(reader, daph, schema, columnMetadata.path_in_schema)
const dl = readDefinitionLevels(reader, daph, schemaPath)
definitionLevels = dl.definitionLevels
numNulls = dl.numNulls
}
@ -45,7 +43,7 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
// read values based on encoding
const nValues = daph.num_values - numNulls
if (daph.encoding === 'PLAIN') {
const { element } = schemaElement(schema, columnMetadata.path_in_schema)
const { element } = schemaPath[schemaPath.length - 1]
const utf8 = element.converted_type === 'UTF8'
const plainObj = readPlain(reader, columnMetadata.type, nValues, utf8)
values = Array.isArray(plainObj) ? plainObj : Array.from(plainObj)
@ -85,11 +83,10 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
* @typedef {import("./types.d.ts").DictionaryPageHeader} DictionaryPageHeader
* @param {Uint8Array} bytes raw page data
* @param {DictionaryPageHeader} diph dictionary page header
* @param {SchemaElement[]} schema schema for the file
* @param {ColumnMetaData} columnMetadata metadata for the column
* @returns {ArrayLike<any>} array of values
*/
export function readDictionaryPage(bytes, diph, schema, columnMetadata) {
export function readDictionaryPage(bytes, diph, columnMetadata) {
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
const reader = { view, offset: 0 }
return readPlain(reader, columnMetadata.type, diph.num_values, false)
@ -101,13 +98,12 @@ export function readDictionaryPage(bytes, diph, schema, columnMetadata) {
* @typedef {import("./types.d.ts").DataReader} DataReader
* @param {DataReader} reader data view for the page
* @param {DataPageHeader} daph data page header
* @param {SchemaElement[]} schema schema for the file
* @param {ColumnMetaData} columnMetadata metadata for the column
* @param {SchemaTree[]} schemaPath schema path for the column
* @returns {any[]} repetition levels and number of bytes read
*/
function readRepetitionLevels(reader, daph, schema, columnMetadata) {
if (columnMetadata.path_in_schema.length > 1) {
const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
function readRepetitionLevels(reader, daph, schemaPath) {
if (schemaPath.length > 1) {
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
if (maxRepetitionLevel) {
const bitWidth = widthFromMaxInt(maxRepetitionLevel)
return readData(
@ -123,13 +119,12 @@ function readRepetitionLevels(reader, daph, schema, columnMetadata) {
*
* @param {DataReader} reader data view for the page
* @param {DataPageHeader} daph data page header
* @param {SchemaElement[]} schema schema for the file
* @param {string[]} path_in_schema path in the schema
* @param {SchemaTree[]} schemaPath schema path for the column
* @returns {DefinitionLevels} definition levels and number of bytes read
*/
function readDefinitionLevels(reader, daph, schema, path_in_schema) {
if (!isRequired(schema, path_in_schema)) {
const maxDefinitionLevel = getMaxDefinitionLevel(schema, path_in_schema)
function readDefinitionLevels(reader, daph, schemaPath) {
if (!isRequired(schemaPath)) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const bitWidth = widthFromMaxInt(maxDefinitionLevel)
if (bitWidth) {
// num_values is index 1 for either type of page header

@ -1,6 +1,6 @@
import { decompressPage } from './column.js'
import { readPlain, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel, schemaElement } from './schema.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
import { readVarInt, readZigZag } from './thrift.js'
/**
@ -10,15 +10,15 @@ import { readVarInt, readZigZag } from './thrift.js'
* @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
* @typedef {import("./types.d.ts").Compressors} Compressors
* @typedef {import("./types.d.ts").DataPageHeaderV2} DataPageHeaderV2
* @typedef {import("./types.d.ts").SchemaElement} SchemaElement
* @typedef {import("./types.d.ts").SchemaTree} SchemaTree
* @param {Uint8Array} compressedBytes raw page data (should already be decompressed)
* @param {import("./types.d.ts").PageHeader} ph page header
* @param {SchemaElement[]} schema schema for the file
* @param {SchemaTree[]} schemaPath schema path for the column
* @param {ColumnMetaData} columnMetadata metadata for the column
* @param {Compressors | undefined} compressors
* @returns {DataPage} definition levels, repetition levels, and array of values
*/
export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, compressors) {
export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata, compressors) {
const view = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
const reader = { view, offset: 0 }
/** @type {any} */
@ -28,14 +28,14 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, comp
if (!daph2) throw new Error('parquet data page header v2 is undefined')
// repetition levels
const repetitionLevels = readRepetitionLevelsV2(reader, daph2, schema, columnMetadata)
const repetitionLevels = readRepetitionLevelsV2(reader, daph2, schemaPath)
if (reader.offset !== daph2.repetition_levels_byte_length) {
throw new Error(`parquet repetition levels byte length ${reader.offset} does not match expected ${daph2.repetition_levels_byte_length}`)
}
// definition levels
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const definitionLevels = readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel)
if (reader.offset !== daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length) {
@ -47,7 +47,7 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, comp
// read values based on encoding
const nValues = daph2.num_values - daph2.num_nulls
if (daph2.encoding === 'PLAIN') {
const { element } = schemaElement(schema, columnMetadata.path_in_schema)
const { element } = schemaPath[schemaPath.length - 1]
const utf8 = element.converted_type === 'UTF8'
let page = compressedBytes.slice(reader.offset)
if (daph2.is_compressed && columnMetadata.codec !== 'UNCOMPRESSED') {
@ -99,12 +99,11 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata, comp
* @typedef {import("./types.d.ts").DataReader} DataReader
* @param {DataReader} reader data view for the page
* @param {DataPageHeaderV2} daph2 data page header
* @param {SchemaElement[]} schema schema for the file
* @param {ColumnMetaData} columnMetadata metadata for the column
* @param {SchemaTree[]} schemaPath schema path for the column
* @returns {any[]} repetition levels and number of bytes read
*/
export function readRepetitionLevelsV2(reader, daph2, schema, columnMetadata) {
const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
export function readRepetitionLevelsV2(reader, daph2, schemaPath) {
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
if (!maxRepetitionLevel) return []
const bitWidth = widthFromMaxInt(maxRepetitionLevel)

@ -1,5 +1,5 @@
import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, ParquetType } from './constants.js'
import { schemaElement } from './schema.js'
import { getSchemaPath } from './schema.js'
import { deserializeTCompactProtocol } from './thrift.js'
/**
@ -172,7 +172,7 @@ export function parquetMetadata(arrayBuffer) {
* @returns {import("./types.d.ts").SchemaTree} tree of schema elements
*/
export function parquetSchema(metadata) {
return schemaElement(metadata.schema, [])
return getSchemaPath(metadata.schema, [])[0]
}
/**

@ -150,7 +150,6 @@ async function readRowGroup(options, rowGroup, groupStart) {
// read column data async
promises.push(buffer.then(arrayBuffer => {
// TODO: extract SchemaElement for this column
/** @type {ArrayLike<any> | undefined} */
let columnData = readColumn(
arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema, compressors

@ -34,7 +34,7 @@ function schemaTree(schema, rootIndex) {
* @param {string[]} name path to the element
* @returns {SchemaTree} schema element
*/
export function schemaElement(schema, name) {
function schemaElement(schema, name) {
let tree = schemaTree(schema, 0)
// traverse the tree to find the element
for (const part of name) {
@ -46,21 +46,34 @@ export function schemaElement(schema, name) {
}
/**
* Check if the schema element with the given name is required.
* An element is required if all of its ancestors are required.
* Get each schema element from the root to the given element name.
*
* @param {SchemaElement[]} schema
* @param {string[]} name path to the element
 * @returns {SchemaTree[]} list of schema elements from the root to the named element
*/
export function getSchemaPath(schema, name) {
let tree = schemaTree(schema, 0)
const path = [tree]
for (const part of name) {
const child = tree.children.find(child => child.element.name === part)
if (!child) throw new Error(`parquet schema element not found: ${name}`)
path.push(child)
tree = child
}
return path
}
/**
* Check if the schema element with the given name is required.
* An element is required if all of its ancestors are required.
*
* @param {SchemaTree[]} schemaPath
* @returns {boolean} true if the element is required
*/
export function isRequired(schema, name) {
/** @type {SchemaTree | undefined} */
let tree = schemaTree(schema, 0)
for (let i = 0; i < name.length; i++) {
// Find schema child with the given name
tree = tree.children.find(child => child.element.name === name[i])
if (!tree) throw new Error(`parquet schema element not found: ${name}`)
if (tree.element.repetition_type !== 'REQUIRED') {
export function isRequired(schemaPath) {
for (const { element } of schemaPath.slice(1)) {
if (element.repetition_type !== 'REQUIRED') {
return false
}
}
@ -70,36 +83,32 @@ export function isRequired(schema, name) {
/**
* Get the max repetition level for a given schema path.
*
* @param {SchemaElement[]} schema
* @param {string[]} parts path to the element
* @param {SchemaTree[]} schemaPath
* @returns {number} max repetition level
*/
export function getMaxRepetitionLevel(schema, parts) {
export function getMaxRepetitionLevel(schemaPath) {
let maxLevel = 0
parts.forEach((part, i) => {
const { element } = schemaElement(schema, parts.slice(0, i + 1))
for (const { element } of schemaPath.slice(1)) {
if (element.repetition_type === 'REPEATED') {
maxLevel++
}
})
}
return maxLevel
}
/**
* Get the max definition level for a given schema path.
*
* @param {SchemaElement[]} schema
* @param {string[]} parts path to the element
* @param {SchemaTree[]} schemaPath
* @returns {number} max definition level
*/
export function getMaxDefinitionLevel(schema, parts) {
export function getMaxDefinitionLevel(schemaPath) {
let maxLevel = 0
parts.forEach((part, i) => {
const { element } = schemaElement(schema, parts.slice(0, i + 1))
for (const { element } of schemaPath.slice(1)) {
if (element.repetition_type !== 'REQUIRED') {
maxLevel++
}
})
}
return maxLevel
}

@ -2,10 +2,10 @@ import { describe, expect, it } from 'vitest'
import {
getMaxDefinitionLevel,
getMaxRepetitionLevel,
getSchemaPath,
isListLike,
isMapLike,
isRequired,
schemaElement,
skipDefinitionBytes,
} from '../src/schema.js'
@ -26,9 +26,10 @@ describe('Parquet schema utils', () => {
{ name: 'value', repetition_type: 'OPTIONAL' },
]
describe('schemaElement', () => {
it('should return the schema element', () => {
expect(schemaElement(schema, ['child1'])).toEqual({
describe('getSchemaPath', () => {
it('should return the schema path', () => {
const path = getSchemaPath(schema, ['child1'])
expect(path[path.length - 1]).toEqual({
children: [],
count: 1,
element: { name: 'child1', repetition_type: 'OPTIONAL' },
@ -36,30 +37,30 @@ describe('Parquet schema utils', () => {
})
it('should throw an error if element not found', () => {
expect(() => schemaElement(schema, ['nonexistent']))
expect(() => getSchemaPath(schema, ['nonexistent']))
.toThrow('parquet schema element not found: nonexistent')
})
})
it('isRequired', () => {
expect(isRequired(schema, [])).toBe(true)
expect(isRequired(schema, ['child1'])).toBe(false)
expect(isRequired(schema, ['child2'])).toBe(false)
expect(isRequired(schema, ['child3'])).toBe(false)
expect(isRequired(getSchemaPath(schema, []))).toBe(true)
expect(isRequired(getSchemaPath(schema, ['child1']))).toBe(false)
expect(isRequired(getSchemaPath(schema, ['child2']))).toBe(false)
expect(isRequired(getSchemaPath(schema, ['child3']))).toBe(false)
})
it('getMaxRepetitionLevel', () => {
expect(getMaxRepetitionLevel(schema, ['child1'])).toBe(0)
expect(getMaxRepetitionLevel(schema, ['child2'])).toBe(0)
expect(getMaxRepetitionLevel(schema, ['child2', 'list', 'element'])).toBe(1)
expect(getMaxRepetitionLevel(schema, ['child3'])).toBe(0)
expect(getMaxRepetitionLevel(schema, ['child3', 'map', 'key'])).toBe(1)
expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child1']))).toBe(0)
expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child2']))).toBe(0)
expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child2', 'list', 'element']))).toBe(1)
expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child3']))).toBe(0)
expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child3', 'map', 'key']))).toBe(1)
})
it('getMaxDefinitionLevel', () => {
expect(getMaxDefinitionLevel(schema, ['child1'])).toBe(1)
expect(getMaxDefinitionLevel(schema, ['child2'])).toBe(1)
expect(getMaxDefinitionLevel(schema, ['child3'])).toBe(1)
expect(getMaxDefinitionLevel(getSchemaPath(schema, ['child1']))).toBe(1)
expect(getMaxDefinitionLevel(getSchemaPath(schema, ['child2']))).toBe(1)
expect(getMaxDefinitionLevel(getSchemaPath(schema, ['child3']))).toBe(1)
})
it('skipDefinitionBytes', () => {