mirror of https://github.com/asadbek064/hyparquet.git, synced 2026-01-10 04:56:38 +00:00
Parquet column parser
This commit is contained in:
parent c384cf469b
commit 955a5f0653
124 src/column.js Normal file
@@ -0,0 +1,124 @@
import { assembleObjects, readDataPage, readDictionaryPage } from './datapage.js'
import { parquetHeader } from './header.js'
import { getMaxDefinitionLevel, isRequired } from './schema.js'
import { snappyUncompress } from './snappy.js'
import { CompressionCodec, Encoding, PageType } from './types.js'

/**
 * @typedef {import('./types.js').SchemaElement} SchemaElement
 * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
 * @typedef {import('./types.js').RowGroup} RowGroup
 */

/**
 * Read a column from the file.
 *
 * @param {ArrayBuffer} arrayBuffer parquet file contents
 * @param {RowGroup} rowGroup row group metadata
 * @param {ColumnMetaData} columnMetadata column metadata
 * @param {SchemaElement[]} schema schema for the file
 * @returns {ArrayLike<any>} array of values
 */
export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
  // find start of column data
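  // a column chunk's dictionary page, if present, is stored before its data
  // pages, so start reading at whichever page offset comes first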
  const { codec, dictionary_page_offset, data_page_offset } = columnMetadata
  let columnOffset = dictionary_page_offset
  if (dictionary_page_offset === undefined || data_page_offset < dictionary_page_offset) {
    columnOffset = data_page_offset
  }
  columnOffset = Number(columnOffset) // cast bigint to number

  // parse column data
  let valuesSeen = 0
  let byteOffset = 0 // byteOffset within the column
  let dictionary = undefined
  const rowIndex = [0] // map/list object index
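  // walk the pages of the column chunk until all rows in the row group are seen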
  while (valuesSeen < rowGroup.num_rows) {
    // parse page header
    const { value: header, byteLength: headerLength } = parquetHeader(arrayBuffer, columnOffset + byteOffset)
    byteOffset += headerLength
    if (!header || header.compressed_page_size === undefined) throw new Error('header is undefined')

    // read compressed_page_size bytes starting at offset
    const compressedBytes = new Uint8Array(
      arrayBuffer, columnOffset + byteOffset, header.compressed_page_size
    )
    // decompress bytes
    let page
    const uncompressed_page_size = Number(header.uncompressed_page_size) // TODO: unsafe cast
    if (codec === CompressionCodec.GZIP) {
      throw new Error('GZIP compression not supported')
    } else if (codec === CompressionCodec.SNAPPY) {
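      // allocate the output buffer from the header's uncompressed size;
      // snappyUncompress decompresses into it in place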
      page = new Uint8Array(uncompressed_page_size)
      snappyUncompress(compressedBytes, page)
    } else if (codec === CompressionCodec.LZO) {
      throw new Error('LZO compression not supported')
    }
    if (!page || page.length !== uncompressed_page_size) {
      throw new Error('decompressed page size does not match header')
    }

    // parse page data by type
    if (header.type === PageType.DATA_PAGE) {
      const daph = header.data_page_header
      if (!daph) throw new Error('data page header is undefined')

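      // a data page carries repetition levels, definition levels, and encoded values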
      const { definitionLevels, repetitionLevels, value } = readDataPage(page, daph, schema, columnMetadata)
      valuesSeen += daph.num_values

      // construct output values: skip nulls and construct lists
      let values
      if (repetitionLevels.length) {
        // Use repetition levels to construct lists
        if ([Encoding.PLAIN_DICTIONARY, Encoding.RLE_DICTIONARY].includes(daph.encoding)) {
          // TODO: dereference dictionary values
        }
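        // repetition levels mark where nested lists start and end; assembleObjects
        // rebuilds the nested structure from the flat value array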
        const isNull = columnMetadata && !isRequired(schema, [columnMetadata.path_in_schema[0]])
        const nullValue = false // TODO: unused?
        const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
        values = assembleObjects(definitionLevels, repetitionLevels, value, isNull, nullValue, maxDefinitionLevel, rowIndex[0])
      } else if (definitionLevels) {
        const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
        // Use definition levels to skip nulls
        let index = 0
        values = []
        const decoder = new TextDecoder()
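        // the value array is dense (nulls are not stored), so only advance the
        // value index when the definition level marks a present value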
        for (let i = 0; i < definitionLevels.length; i++) {
          if (definitionLevels[i] === maxDefinitionLevel) {
            if (index >= value.length) throw new Error('parquet index out of bounds')
            let v = value[index++]
            // map to dictionary value
            if (dictionary) {
              v = dictionary[v]
              if (v instanceof Uint8Array) {
                try {
                  v = decoder.decode(v)
                } catch (e) {
                  console.warn('parquet failed to decode byte array as string', e)
                }
              }
            }
            values[i] = v
          } else {
            values[i] = undefined
          }
        }
      } else {
        // TODO: use dictionary
        values = value
      }

      // TODO: check that we are at the end of the page
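      // note: as written, this returns after assembling the first data page of the chunk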
      return values
    } else if (header.type === PageType.DICTIONARY_PAGE) {
      const diph = header.dictionary_page_header
      if (!diph) throw new Error('dictionary page header is undefined')

      dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
    } else {
      throw new Error(`parquet unsupported page type: ${header.type}`)
    }
    byteOffset += header.compressed_page_size
  }
  throw new Error('parquet error reading column should have returned')
}
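
// Illustrative usage sketch (assumes the file's metadata has already been
// parsed into `metadata` with row_groups and schema, e.g. by a metadata
// reader elsewhere in this package):
//
//   const rowGroup = metadata.row_groups[0]
//   const columnMetadata = rowGroup.columns[0].meta_data
//   const values = readColumn(arrayBuffer, rowGroup, columnMetadata, metadata.schema)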