Convert consistently

This commit is contained in:
Kenny Daniel 2024-05-05 17:51:31 -07:00
parent e398e66dd4
commit 4d960e0533
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
3 changed files with 17 additions and 30 deletions

@ -49,6 +49,8 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
)
// parse page data by type
/** @type {DecodedArray} */
let values
if (header.type === 'DATA_PAGE') {
const daph = header.data_page_header
if (!daph) throw new Error('parquet data page header is undefined')
@ -59,11 +61,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
valuesSeen += daph.num_values
const dictionaryEncoding = daph.encoding === 'PLAIN_DICTIONARY' || daph.encoding === 'RLE_DICTIONARY'
// construct output values: skip nulls and construct lists
/** @type {DecodedArray} */
let values
if (repetitionLevels.length) {
dereferenceDictionary(dictionary, dataPage)
// Use repetition levels to construct lists
@ -80,16 +78,10 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
// Use definition levels to skip nulls
values = []
skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values)
values = convert(values, element)
} else {
if (dictionaryEncoding && dictionary) {
dereferenceDictionary(dictionary, dataPage)
values = convert(dataPage, element)
} else if (Array.isArray(dataPage)) {
// convert primitive types to rich types
values = convert(dataPage, element)
} else {
values = dataPage // TODO: data page shouldn't be a fixed byte array?
}
dereferenceDictionary(dictionary, dataPage)
values = convert(dataPage, element)
}
// TODO: check that we are at the end of the page
@ -118,19 +110,22 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
if (repetitionLevels.length) {
dereferenceDictionary(dictionary, dataPage)
values = convert(dataPage, element)
// Use repetition levels to construct lists
concat(rowData, assembleObjects(
definitionLevels, repetitionLevels, dataPage, true, maxDefinitionLevel, maxRepetitionLevel
))
values = assembleObjects(
definitionLevels, repetitionLevels, values, true, maxDefinitionLevel, maxRepetitionLevel
)
} else if (daph2.num_nulls) {
// skip nulls
if (!definitionLevels) throw new Error('parquet data page v2 nulls missing definition levels')
skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, rowData)
values = [] // TODO: copy straight into rowData, combine convert into skipNulls
skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values)
values = convert(values, element)
} else {
dereferenceDictionary(dictionary, dataPage)
concat(rowData, dataPage)
values = convert(dataPage, element)
}
// TODO: convert?
concat(rowData, values)
} else {
throw new Error(`parquet unsupported page type: ${header.type}`)
}
@ -200,7 +195,7 @@ export function decompressPage(compressedBytes, uncompressed_page_size, codec, c
}
/**
* Expand data page list with nulls and convert to utf8.
* Expand data page list with nulls.
*
* @param {number[]} definitionLevels
* @param {number} maxDefinitionLevel
@ -212,7 +207,6 @@ function skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, o
if (output.length) throw new Error('parquet output array is not empty')
// Use definition levels to skip nulls
let index = 0
const decoder = new TextDecoder()
for (let i = 0; i < definitionLevels.length; i++) {
if (definitionLevels[i] === maxDefinitionLevel) {
if (index > dataPage.length) {
@ -223,13 +217,6 @@ function skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, o
// map to dictionary value
if (dictionary) {
v = dictionary[v]
if (v instanceof Uint8Array) {
try {
v = decoder.decode(v)
} catch (e) {
console.warn('parquet failed to decode byte array as string', e)
}
}
}
output[i] = v
} else {

@ -13,7 +13,7 @@ export function convert(data, schemaElement) {
const ctype = schemaElement.converted_type
if (ctype === 'UTF8') {
const decoder = new TextDecoder()
return data.map(v => typeof v === 'string' ? v : decoder.decode(v))
return data.map(v => v && decoder.decode(v))
}
if (ctype === 'DECIMAL') {
const scaleFactor = schemaElement.scale ? Math.pow(10, schemaElement.scale) : 1

2
src/hyparquet.d.ts vendored

@ -1,6 +1,6 @@
import type { AsyncBuffer, Compressors, FileMetaData, SchemaTree } from './types.d.ts'
export type { AsyncBuffer, FileMetaData, SchemaTree }
export type { AsyncBuffer, Compressors, FileMetaData, SchemaTree }
/**
* Read parquet data rows from a file-like object.