Refactor isListLike and isMapLike to use schemaPath

This commit is contained in:
Kenny Daniel 2024-04-29 18:45:29 -07:00
parent 2c6a111113
commit 93ff9a9f99
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
6 changed files with 57 additions and 82 deletions

@ -3,12 +3,12 @@ import { convert } from './convert.js'
import { readDataPage, readDictionaryPage } from './datapage.js'
import { readDataPageV2 } from './datapageV2.js'
import { parquetHeader } from './header.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel, getSchemaPath, isRequired } from './schema.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired } from './schema.js'
import { snappyUncompress } from './snappy.js'
import { concat } from './utils.js'
/**
* @typedef {import('./types.js').SchemaElement} SchemaElement
* @typedef {import('./types.js').SchemaTree} SchemaTree
* @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
* @typedef {import('./types.js').Compressors} Compressors
* @typedef {import('./types.js').RowGroup} RowGroup
@ -21,20 +21,18 @@ import { concat } from './utils.js'
* @param {number} columnOffset offset to start reading from
* @param {RowGroup} rowGroup row group metadata
* @param {ColumnMetaData} columnMetadata column metadata
* @param {SchemaElement[]} schema schema for the file
* @param {SchemaTree[]} schemaPath schema path for the column
* @param {Compressors} [compressors] custom decompressors
* @returns {ArrayLike<any>} array of values
*/
export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schema, compressors) {
export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schemaPath, compressors) {
/** @type {ArrayLike<any> | undefined} */
let dictionary = undefined
let valuesSeen = 0
let byteOffset = 0 // byteOffset within the column
/** @type {any[]} */
const rowData = []
const schemaPath = getSchemaPath(schema, columnMetadata.path_in_schema)
const schemaElement = schemaPath[schemaPath.length - 1].element
const { element } = schemaPath[schemaPath.length - 1]
while (valuesSeen < rowGroup.num_rows) {
// parse column header
@ -73,7 +71,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
// convert primitive types to rich types
values = convert(dataPage, schemaElement)
values = convert(dataPage, element)
values = assembleObjects(
definitionLevels, repetitionLevels, values, isNullable, maxDefinitionLevel, maxRepetitionLevel
)
@ -85,10 +83,10 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
} else {
if (dictionaryEncoding && dictionary) {
dereferenceDictionary(dictionary, dataPage)
values = convert(dataPage, schemaElement)
values = convert(dataPage, element)
} else if (Array.isArray(dataPage)) {
// convert primitive types to rich types
values = convert(dataPage, schemaElement)
values = convert(dataPage, element)
} else {
values = dataPage // TODO: data page shouldn't be a fixed byte array?
}
@ -161,7 +159,7 @@ function dereferenceDictionary(dictionary, dataPage) {
/**
* Find the start byte offset for a column chunk.
*
* @param {ColumnMetaData} columnMetadata column metadata
* @param {ColumnMetaData} columnMetadata
* @returns {number} byte offset
*/
export function getColumnOffset(columnMetadata) {

@ -13,8 +13,8 @@ const skipNulls = false // TODO
* @typedef {import("./types.d.ts").SchemaTree} SchemaTree
* @param {Uint8Array} bytes raw page data (should already be decompressed)
* @param {DataPageHeader} daph data page header
* @param {SchemaTree[]} schemaPath schema for the file
* @param {ColumnMetaData} columnMetadata metadata for the column
* @param {SchemaTree[]} schemaPath
* @param {ColumnMetaData} columnMetadata
* @returns {DataPage} definition levels, repetition levels, and array of values
*/
export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
@ -83,7 +83,7 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
* @typedef {import("./types.d.ts").DictionaryPageHeader} DictionaryPageHeader
* @param {Uint8Array} bytes raw page data
* @param {DictionaryPageHeader} diph dictionary page header
* @param {ColumnMetaData} columnMetadata metadata for the column
* @param {ColumnMetaData} columnMetadata
* @returns {ArrayLike<any>} array of values
*/
export function readDictionaryPage(bytes, diph, columnMetadata) {
@ -98,7 +98,7 @@ export function readDictionaryPage(bytes, diph, columnMetadata) {
* @typedef {import("./types.d.ts").DataReader} DataReader
* @param {DataReader} reader data view for the page
* @param {DataPageHeader} daph data page header
* @param {SchemaTree[]} schemaPath schema path for the column
* @param {SchemaTree[]} schemaPath
* @returns {any[]} repetition levels and number of bytes read
*/
function readRepetitionLevels(reader, daph, schemaPath) {
@ -119,7 +119,7 @@ function readRepetitionLevels(reader, daph, schemaPath) {
*
* @param {DataReader} reader data view for the page
* @param {DataPageHeader} daph data page header
* @param {SchemaTree[]} schemaPath schema path for the column
* @param {SchemaTree[]} schemaPath
* @returns {DefinitionLevels} definition levels and number of bytes read
*/
function readDefinitionLevels(reader, daph, schemaPath) {

@ -13,8 +13,8 @@ import { readVarInt, readZigZag } from './thrift.js'
* @typedef {import("./types.d.ts").SchemaTree} SchemaTree
* @param {Uint8Array} compressedBytes raw page data (should already be decompressed)
* @param {import("./types.d.ts").PageHeader} ph page header
* @param {SchemaTree[]} schemaPath schema path for the column
* @param {ColumnMetaData} columnMetadata metadata for the column
* @param {SchemaTree[]} schemaPath
* @param {ColumnMetaData} columnMetadata
* @param {Compressors | undefined} compressors
* @returns {DataPage} definition levels, repetition levels, and array of values
*/
@ -99,7 +99,7 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
* @typedef {import("./types.d.ts").DataReader} DataReader
* @param {DataReader} reader data view for the page
* @param {DataPageHeaderV2} daph2 data page header
* @param {SchemaTree[]} schemaPath schema path for the column
* @param {SchemaTree[]} schemaPath
* @returns {any[]} repetition levels and number of bytes read
*/
export function readRepetitionLevelsV2(reader, daph2, schemaPath) {
@ -118,7 +118,7 @@ export function readRepetitionLevelsV2(reader, daph2, schemaPath) {
*
* @param {DataReader} reader data view for the page
* @param {DataPageHeaderV2} daph2 data page header v2
* @param {number} maxDefinitionLevel maximum definition level for this column
* @param {number} maxDefinitionLevel
* @returns {number[] | undefined} definition levels and number of bytes read
*/
function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) {

@ -1,7 +1,7 @@
import { getColumnOffset, readColumn } from './column.js'
import { parquetMetadataAsync } from './metadata.js'
import { getColumnName, isMapLike } from './schema.js'
import { getColumnName, getSchemaPath, isMapLike } from './schema.js'
import { concat } from './utils.js'
/**
@ -75,7 +75,7 @@ export async function parquetRead(options) {
* @param {string[]} [options.columns] columns to read, all columns if undefined
* @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
* @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
* @param {Compressors} [options.compressors] custom decompressors
* @param {Compressors} [options.compressors]
* @param {RowGroup} rowGroup row group to read
* @param {number} groupStart row index of the first row in the group
* @returns {Promise<any[][]>} resolves to row data
@ -88,9 +88,8 @@ async function readRowGroup(options, rowGroup, groupStart) {
let [groupStartByte, groupEndByte] = [file.byteLength, 0]
rowGroup.columns.forEach(({ meta_data: columnMetadata }) => {
if (!columnMetadata) throw new Error('parquet column metadata is undefined')
const columnName = getColumnName(metadata.schema, columnMetadata.path_in_schema)
// skip columns that are not requested
if (columns && !columns.includes(columnName)) return
if (columns && !columns.includes(getColumnName(columnMetadata.path_in_schema))) return
const startByte = getColumnOffset(columnMetadata)
const endByte = startByte + Number(columnMetadata.total_compressed_size)
@ -120,8 +119,7 @@ async function readRowGroup(options, rowGroup, groupStart) {
if (!columnMetadata) throw new Error('parquet column metadata is undefined')
// skip columns that are not requested
const columnName = getColumnName(metadata.schema, columnMetadata.path_in_schema)
// skip columns that are not requested
const columnName = getColumnName(columnMetadata.path_in_schema)
if (columns && !columns.includes(columnName)) continue
const columnStartByte = getColumnOffset(columnMetadata)
@ -150,15 +148,16 @@ async function readRowGroup(options, rowGroup, groupStart) {
// read column data async
promises.push(buffer.then(arrayBuffer => {
const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
/** @type {ArrayLike<any> | undefined} */
let columnData = readColumn(
arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema, compressors
arrayBuffer, bufferOffset, rowGroup, columnMetadata, schemaPath, compressors
)
if (columnData.length !== Number(rowGroup.num_rows)) {
throw new Error(`parquet column length ${columnData.length} does not match row group length ${rowGroup.num_rows}`)
}
if (isMapLike(metadata.schema, columnMetadata.path_in_schema)) {
if (isMapLike(schemaPath)) {
const name = columnMetadata.path_in_schema.slice(0, -2).join('.')
if (!maps.has(name)) {
maps.set(name, columnData)

@ -28,29 +28,11 @@ function schemaTree(schema, rootIndex) {
}
/**
* Get the schema element with the given name.
* Get schema elements from the root to the given element name.
*
* @param {SchemaElement[]} schema
* @param {string[]} name path to the element
* @returns {SchemaTree} schema element
*/
function schemaElement(schema, name) {
  // walk down from the root node, matching one path segment per level
  let node = schemaTree(schema, 0)
  name.forEach(part => {
    const next = node.children.find(c => c.element.name === part)
    if (!next) throw new Error(`parquet schema element not found: ${name}`)
    node = next
  })
  return node
}
/**
* Get each schema element from the root to the given element name.
*
* @param {SchemaElement[]} schema
* @param {string[]} name path to the element
* @returns {SchemaTree[]} schema element
* @returns {SchemaTree[]} list of schema elements
*/
export function getSchemaPath(schema, name) {
let tree = schemaTree(schema, 0)
@ -65,8 +47,7 @@ export function getSchemaPath(schema, name) {
}
/**
* Check if the schema element with the given name is required.
* An element is required if all of its ancestors are required.
* Check if the schema path and all its ancestors are required.
*
* @param {SchemaTree[]} schemaPath
* @returns {boolean} true if the element is required
@ -88,7 +69,7 @@ export function isRequired(schemaPath) {
*/
export function getMaxRepetitionLevel(schemaPath) {
let maxLevel = 0
for (const { element } of schemaPath.slice(1)) {
for (const { element } of schemaPath) {
if (element.repetition_type === 'REPEATED') {
maxLevel++
}
@ -129,29 +110,27 @@ export function skipDefinitionBytes(num) {
}
/**
* Get the column name as foo.bar and handle list-like columns.
* @param {SchemaElement[]} schema
* Get the column name as foo.bar and handle list and map like columns.
*
* @param {string[]} path
* @returns {string} column name
*/
export function getColumnName(schema, path) {
if (isListLike(schema, path) || isMapLike(schema, path)) {
return path.slice(0, -2).join('.')
} else {
return path.join('.')
}
export function getColumnName(path) {
  // strip the synthetic .list.element and .key_value.key/.value wrapper
  // segments that parquet inserts for list- and map-typed columns
  const wrapperPatterns = [
    /(\.list\.element)+/g,
    /\.key_value\.key/g,
    /\.key_value\.value/g,
  ]
  return wrapperPatterns.reduce((name, pattern) => name.replace(pattern, ''), path.join('.'))
}
/**
* Check if a column is list-like.
*
* @param {SchemaElement[]} schemaElements parquet schema elements
* @param {string[]} path column path
* @param {SchemaTree[]} schemaPath
* @returns {boolean} true if list-like
*/
export function isListLike(schemaElements, path) {
const schema = schemaElement(schemaElements, path.slice(0, -2))
if (path.length < 3) return false
export function isListLike(schemaPath) {
const schema = schemaPath.at(-3)
if (!schema) return false
if (schema.element.converted_type !== 'LIST') return false
if (schema.children.length > 1) return false
@ -168,13 +147,12 @@ export function isListLike(schemaElements, path) {
/**
* Check if a column is map-like.
*
* @param {SchemaElement[]} schemaElements parquet schema elements
* @param {string[]} path column path
* @param {SchemaTree[]} schemaPath
* @returns {boolean} true if map-like
*/
export function isMapLike(schemaElements, path) {
const schema = schemaElement(schemaElements, path.slice(0, -2))
if (path.length < 3) return false
export function isMapLike(schemaPath) {
const schema = schemaPath.at(-3)
if (!schema) return false
if (schema.element.converted_type !== 'MAP') return false
if (schema.children.length > 1) return false

@ -69,21 +69,21 @@ describe('Parquet schema utils', () => {
})
it('isListLike', () => {
expect(isListLike(schema, [])).toBe(false)
expect(isListLike(schema, ['child1'])).toBe(false)
expect(isListLike(schema, ['child2'])).toBe(false)
expect(isListLike(schema, ['child2', 'list', 'element'])).toBe(true)
expect(isListLike(schema, ['child3'])).toBe(false)
expect(isListLike(schema, ['child3', 'map', 'key'])).toBe(false)
expect(isListLike(getSchemaPath(schema, []))).toBe(false)
expect(isListLike(getSchemaPath(schema, ['child1']))).toBe(false)
expect(isListLike(getSchemaPath(schema, ['child2']))).toBe(false)
expect(isListLike(getSchemaPath(schema, ['child2', 'list', 'element']))).toBe(true)
expect(isListLike(getSchemaPath(schema, ['child3']))).toBe(false)
expect(isListLike(getSchemaPath(schema, ['child3', 'map', 'key']))).toBe(false)
})
it('isMapLike', () => {
expect(isMapLike(schema, [])).toBe(false)
expect(isMapLike(schema, ['child1'])).toBe(false)
expect(isMapLike(schema, ['child2'])).toBe(false)
expect(isMapLike(schema, ['child2', 'list', 'element'])).toBe(false)
expect(isMapLike(schema, ['child3'])).toBe(false)
expect(isMapLike(schema, ['child3', 'map', 'key'])).toBe(true)
expect(isMapLike(schema, ['child3', 'map', 'value'])).toBe(true)
expect(isMapLike(getSchemaPath(schema, []))).toBe(false)
expect(isMapLike(getSchemaPath(schema, ['child1']))).toBe(false)
expect(isMapLike(getSchemaPath(schema, ['child2']))).toBe(false)
expect(isMapLike(getSchemaPath(schema, ['child2', 'list', 'element']))).toBe(false)
expect(isMapLike(getSchemaPath(schema, ['child3']))).toBe(false)
expect(isMapLike(getSchemaPath(schema, ['child3', 'map', 'key']))).toBe(true)
expect(isMapLike(getSchemaPath(schema, ['child3', 'map', 'value']))).toBe(true)
})
})