diff --git a/src/column.js b/src/column.js
index 073c433..37466f2 100644
--- a/src/column.js
+++ b/src/column.js
@@ -13,7 +13,7 @@ import { CompressionCodec, Encoding, PageType } from './types.js'
 /**
  * Read a column from the file.
  *
- * @param {ArrayBuffer} arrayBuffer parquet file contents
+ * @param {ArrayBufferLike} arrayBuffer parquet file contents
  * @param {RowGroup} rowGroup row group metadata
  * @param {ColumnMetaData} columnMetadata column metadata
  * @param {SchemaElement[]} schema schema for the file
@@ -21,18 +21,14 @@ import { CompressionCodec, Encoding, PageType } from './types.js'
  */
 export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
   // find start of column data
-  const { codec, dictionary_page_offset, data_page_offset } = columnMetadata
-  let columnOffset = dictionary_page_offset
-  if (dictionary_page_offset === undefined || data_page_offset < dictionary_page_offset) {
-    columnOffset = data_page_offset
-  }
-  columnOffset = Number(columnOffset)
+  const columnOffset = getColumnOffset(columnMetadata)
 
   // parse column data
   let valuesSeen = 0
   let byteOffset = 0 // byteOffset within the column
   let dictionary = undefined
   const rowIndex = [0] // map/list object index
+  const rowData = []
   while (valuesSeen < rowGroup.num_rows) {
     // parse column header
     const { value: header, byteLength: headerLength } = parquetHeader(arrayBuffer, columnOffset + byteOffset)
@@ -40,12 +36,14 @@ export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
     if (!header || header.compressed_page_size === undefined) throw new Error('parquet header is undefined')
 
     // read compressed_page_size bytes starting at offset
-    const compressedBytes = new Uint8Array(
-      arrayBuffer, columnOffset + byteOffset, header.compressed_page_size
-    )
+    const compressedBytes = new Uint8Array(arrayBuffer.slice(
+      columnOffset + byteOffset,
+      columnOffset + byteOffset + header.compressed_page_size
+    ))
     // decompress bytes
     let page
     const uncompressed_page_size = Number(header.uncompressed_page_size)
+    const { codec } = columnMetadata
     if (codec === CompressionCodec.GZIP) {
       throw new Error('parquet gzip compression not supported')
     } else if (codec === CompressionCodec.SNAPPY) {
@@ -108,8 +106,11 @@ export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
         values = value
       }
 
-      // TODO: check that we are at the end of the page
-      return values
+      // check that we are at the end of the page
+      if (values.length !== daph.num_values) {
+        throw new Error('parquet column length does not match page header')
+      }
+      rowData.push(...Array.from(values))
     } else if (header.type === PageType.DICTIONARY_PAGE) {
       const diph = header.dictionary_page_header
       if (!diph) throw new Error('parquet dictionary page header is undefined')
@@ -120,5 +121,23 @@ export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
     }
     byteOffset += header.compressed_page_size
   }
-  throw new Error('parquet error reading column should have returned')
+  if (rowData.length !== Number(rowGroup.num_rows)) {
+    throw new Error('parquet column length does not match row group length')
+  }
+  return rowData
+}
+
+/**
+ * Find the start byte offset for a column chunk.
+ *
+ * @param {ColumnMetaData} columnMetadata column metadata
+ * @returns {number} byte offset
+ */
+export function getColumnOffset(columnMetadata) {
+  const { dictionary_page_offset, data_page_offset } = columnMetadata
+  let columnOffset = dictionary_page_offset
+  if (dictionary_page_offset === undefined || data_page_offset < dictionary_page_offset) {
+    columnOffset = data_page_offset
+  }
+  return Number(columnOffset)
 }
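The extracted getColumnOffset helper encodes the chunk-start rule: begin at the dictionary page when one exists and precedes the data pages, otherwise at the first data page. A minimal usage sketch with hypothetical metadata fragments, assuming the thrift decoder surfaces the i64 offsets as BigInt (which would explain the Number() conversion at the end of the helper):

// hypothetical ColumnMetaData fragments, for illustration only
const withDict = { dictionary_page_offset: 4n, data_page_offset: 100n }
const noDict = { data_page_offset: 100n }
getColumnOffset(withDict) // 4: the dictionary page precedes the data pages
getColumnOffset(noDict)   // 100: falls back to the first data page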
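The compressedBytes change swaps a zero-copy Uint8Array view for an arrayBuffer.slice() copy. A view shares the whole file's memory and carries a nonzero byteOffset, so a decompressor that reads .buffer directly or ignores .byteOffset would see the entire file rather than one page; a slice is a self-contained copy of just the page bytes. It also lines up with widening the @param to ArrayBufferLike, since slice() exists on SharedArrayBuffer as well. A quick sketch of the difference on a throwaway buffer (standard JavaScript behavior, not code from this diff):

const file = new ArrayBuffer(16)
const view = new Uint8Array(file, 4, 4)       // no copy; view.buffer is the whole 16-byte file
const copy = new Uint8Array(file.slice(4, 8)) // copy; copy.buffer holds only the 4 page bytes
console.log(view.byteOffset, view.buffer.byteLength) // 4 16
console.log(copy.byteOffset, copy.buffer.byteLength) // 0 4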