diff --git a/src/column.js b/src/column.js
new file mode 100644
index 0000000..cbcef79
--- /dev/null
+++ b/src/column.js
@@ -0,0 +1,129 @@
+import { assembleObjects, readDataPage, readDictionaryPage } from './datapage.js'
+import { parquetHeader } from './header.js'
+import { getMaxDefinitionLevel, isRequired } from './schema.js'
+import { snappyUncompress } from './snappy.js'
+import { CompressionCodec, Encoding, PageType } from './types.js'
+
+/**
+ * @typedef {import('./types.js').SchemaElement} SchemaElement
+ * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
+ * @typedef {import('./types.js').RowGroup} RowGroup
+ */
+
+/**
+ * Read a column from the file.
+ *
+ * @param {ArrayBuffer} arrayBuffer parquet file contents
+ * @param {RowGroup} rowGroup row group metadata
+ * @param {ColumnMetaData} columnMetadata column metadata
+ * @param {SchemaElement[]} schema schema for the file
+ * @returns {ArrayLike} array of values
+ */
+export function readColumn(arrayBuffer, rowGroup, columnMetadata, schema) {
+  // find the start of the column data: the dictionary page if present, else the first data page
+  const { codec, dictionary_page_offset, data_page_offset } = columnMetadata
+  let columnOffset = dictionary_page_offset
+  if (dictionary_page_offset === undefined || data_page_offset < dictionary_page_offset) {
+    columnOffset = data_page_offset
+  }
+  columnOffset = Number(columnOffset) // cast bigint to number
+
+  // parse column data, one page at a time
+  let valuesSeen = 0
+  let byteOffset = 0 // byte offset within the column
+  let dictionary = undefined
+  const rowIndex = [0] // map/list object index
+  while (valuesSeen < rowGroup.num_rows) {
+    // parse page header
+    const { value: header, byteLength: headerLength } = parquetHeader(arrayBuffer, columnOffset + byteOffset)
+    byteOffset += headerLength
+    if (!header || header.compressed_page_size === undefined) throw new Error('parquet page header is undefined')
+
+    // read compressed_page_size bytes starting at byteOffset
+    const compressedBytes = new Uint8Array(
+      arrayBuffer, columnOffset + byteOffset, header.compressed_page_size
+    )
+    // decompress bytes
+    let page
+    const uncompressed_page_size = Number(header.uncompressed_page_size) // TODO: unsafe cast
+    if (codec === CompressionCodec.GZIP) {
+      throw new Error('GZIP compression not supported')
+    } else if (codec === CompressionCodec.SNAPPY) {
+      page = new Uint8Array(uncompressed_page_size)
+      snappyUncompress(compressedBytes, page)
+    } else if (codec === CompressionCodec.LZO) {
+      throw new Error('LZO compression not supported')
+    } else {
+      throw new Error(`parquet unsupported compression codec: ${codec}`)
+    }
+    if (!page || page.length !== uncompressed_page_size) {
+      throw new Error('decompressed page size does not match header')
+    }
+
+    // parse page data by type
+    if (header.type === PageType.DATA_PAGE) {
+      const daph = header.data_page_header
+      if (!daph) throw new Error('data page header is undefined')
+
+      const { definitionLevels, repetitionLevels, value } = readDataPage(page, daph, schema, columnMetadata)
+      valuesSeen += daph.num_values
+
+      // construct output values: skip nulls and construct lists
+      let values
+      if (repetitionLevels.length) {
+        // use repetition levels to construct nested lists
+        if ([Encoding.PLAIN_DICTIONARY, Encoding.RLE_DICTIONARY].includes(daph.encoding)) {
+          // TODO: dereference dictionary values
+        }
+        const isNull = columnMetadata && !isRequired(schema, [columnMetadata.path_in_schema[0]])
+        const nullValue = false // TODO: unused?
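+        // Illustrative example: in a list column, repetition level 0 starts a new
+        // row and repetition level 1 continues the current list, so values [1, 2, 3]
+        // with repetitionLevels [0, 1, 0] assemble into rows [[1, 2], [3]].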
+        const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
+        values = assembleObjects(definitionLevels, repetitionLevels, value, isNull, nullValue, maxDefinitionLevel, rowIndex[0])
+      } else if (definitionLevels) {
+        const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
+        // use definition levels to skip nulls
+        let index = 0
+        values = []
+        const decoder = new TextDecoder()
+        for (let i = 0; i < definitionLevels.length; i++) {
+          if (definitionLevels[i] === maxDefinitionLevel) {
+            if (index >= value.length) throw new Error('parquet index out of bounds')
+            let v = value[index++]
+            // map to dictionary value
+            if (dictionary) {
+              v = dictionary[v]
+              if (v instanceof Uint8Array) {
+                try {
+                  v = decoder.decode(v)
+                } catch (e) {
+                  console.warn('parquet failed to decode byte array as string', e)
+                }
+              }
+            }
+            values[i] = v
+          } else {
+            values[i] = undefined
+          }
+        }
+      } else {
+        // TODO: use dictionary
+        values = value
+      }
+
+      // TODO: check that we are at the end of the page
+      return values
+    } else if (header.type === PageType.DICTIONARY_PAGE) {
+      const diph = header.dictionary_page_header
+      if (!diph) throw new Error('dictionary page header is undefined')
+
+      dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
+    } else {
+      throw new Error(`parquet unsupported page type: ${header.type}`)
+    }
+    byteOffset += header.compressed_page_size
+  }
+  throw new Error('parquet error reading column should have returned')
+}
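
Usage sketch (illustrative, not part of the diff): readColumn assumes the caller has
already parsed the parquet footer. Assuming a parquetMetadata helper that returns the
thrift FileMetaData (the helper name and module path are hypothetical here), reading
the first column of the first row group might look like:

    import { readColumn } from './column.js'
    import { parquetMetadata } from './metadata.js' // hypothetical footer parser

    const arrayBuffer = await fetch('example.parquet').then(res => res.arrayBuffer())
    const metadata = parquetMetadata(arrayBuffer)
    const rowGroup = metadata.row_groups[0]
    const columnMetadata = rowGroup.columns[0].meta_data
    const values = readColumn(arrayBuffer, rowGroup, columnMetadata, metadata.schema)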