Factor out getColumnOffset

Kenny Daniel 2024-01-14 11:14:04 -08:00
parent 6ca1d2a85c
commit 4708ffca2f

@@ -13,7 +13,7 @@ import { CompressionCodec, Encoding, PageType } from './types.js'
 /**
  * Read a column from the file.
  *
- * @param {ArrayBuffer} arrayBuffer parquet file contents
+ * @param {ArrayBufferLike} arrayBuffer parquet file contents
  * @param {RowGroup} rowGroup row group metadata
  * @param {ColumnMetaData} columnMetadata column metadata
  * @param {SchemaElement[]} schema schema for the file
@@ -21,18 +21,14 @@ import { CompressionCodec, Encoding, PageType } from './types.js'
  */
 export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
   // find start of column data
-  const { codec, dictionary_page_offset, data_page_offset } = columnMetadata
-  let columnOffset = dictionary_page_offset
-  if (dictionary_page_offset === undefined || data_page_offset < dictionary_page_offset) {
-    columnOffset = data_page_offset
-  }
-  columnOffset = Number(columnOffset)
+  const columnOffset = getColumnOffset(columnMetadata)
 
   // parse column data
   let valuesSeen = 0
   let byteOffset = 0 // byteOffset within the column
   let dictionary = undefined
   const rowIndex = [0] // map/list object index
+  const rowData = []
   while (valuesSeen < rowGroup.num_rows) {
     // parse column header
     const { value: header, byteLength: headerLength } = parquetHeader(arrayBuffer, columnOffset + byteOffset)
@@ -40,12 +36,14 @@ export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
     if (!header || header.compressed_page_size === undefined) throw new Error('parquet header is undefined')
     // read compressed_page_size bytes starting at offset
-    const compressedBytes = new Uint8Array(
-      arrayBuffer, columnOffset + byteOffset, header.compressed_page_size
-    )
+    const compressedBytes = new Uint8Array(arrayBuffer.slice(
+      columnOffset + byteOffset,
+      columnOffset + byteOffset + header.compressed_page_size
+    ))
 
     // decompress bytes
     let page
     const uncompressed_page_size = Number(header.uncompressed_page_size)
+    const { codec } = columnMetadata
     if (codec === CompressionCodec.GZIP) {
       throw new Error('parquet gzip compression not supported')
     } else if (codec === CompressionCodec.SNAPPY) {
@@ -108,8 +106,11 @@ export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
         values = value
       }
-      // TODO: check that we are at the end of the page
-      return values
+      // check that we are at the end of the page
+      if (values.length !== daph.num_values) {
+        throw new Error('parquet column length does not match page header')
+      }
+      rowData.push(...Array.from(values))
     } else if (header.type === PageType.DICTIONARY_PAGE) {
       const diph = header.dictionary_page_header
       if (!diph) throw new Error('parquet dictionary page header is undefined')
@@ -120,5 +121,23 @@ export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
     }
     byteOffset += header.compressed_page_size
   }
-  throw new Error('parquet error reading column should have returned')
+  if (rowData.length !== Number(rowGroup.num_rows)) {
+    throw new Error('parquet column length does not match row group length')
+  }
+  return rowData
 }
+
+/**
+ * Find the start byte offset for a column chunk.
+ *
+ * @param {ColumnMetaData} columnMetadata column metadata
+ * @returns {number} byte offset
+ */
+export function getColumnOffset(columnMetadata) {
+  const { dictionary_page_offset, data_page_offset } = columnMetadata
+  let columnOffset = dictionary_page_offset
+  if (dictionary_page_offset === undefined || data_page_offset < dictionary_page_offset) {
+    columnOffset = data_page_offset
+  }
+  return Number(columnOffset)
+}
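
Factoring out getColumnOffset lets callers locate a column chunk's starting byte without parsing any pages, e.g. to fetch just that byte range from remote storage. A minimal sketch of how a caller might use the new export, assuming the parquet footer has already been parsed into thrift-style metadata (the readFirstColumn helper and the metadata argument shape are hypothetical, not part of this commit):

import { getColumnOffset, readColumn } from './column.js'

// Hypothetical helper: read the first column of the first row group,
// given FileMetaData already parsed from the parquet footer.
function readFirstColumn(arrayBuffer, metadata) {
  const rowGroup = metadata.row_groups[0]
  const columnMetadata = rowGroup.columns[0].meta_data
  // byte where the chunk's first page (dictionary or data) begins
  const offset = getColumnOffset(columnMetadata)
  console.log(`column chunk starts at byte ${offset}`)
  return readColumn(arrayBuffer, rowGroup, columnMetadata, metadata.schema)
}

Note the Number() conversions here and in readColumn: dictionary_page_offset, data_page_offset, and num_rows are int64 thrift fields, so the helper normalizes the chosen offset to a plain number before it is used for buffer arithmetic.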