diff --git a/src/column.js b/src/column.js index 0128ab3..f82cd17 100644 --- a/src/column.js +++ b/src/column.js @@ -14,7 +14,7 @@ import { snappyUncompress } from './snappy.js' const dayMillis = 86400000000000 // 1 day in milliseconds /** - * Read a column from the file. + * Parse column data from a buffer. * * @param {ArrayBuffer} arrayBuffer parquet file contents * @param {number} columnOffset offset to start reading from @@ -24,13 +24,13 @@ const dayMillis = 86400000000000 // 1 day in milliseconds * @returns {ArrayLike} array of values */ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schema) { - // parse column data - let valuesSeen = 0 - let byteOffset = 0 // byteOffset within the column /** @type {ArrayLike | undefined} */ let dictionary = undefined + let valuesSeen = 0 + let byteOffset = 0 // byteOffset within the column const rowIndex = [0] // map/list object index const rowData = [] + while (valuesSeen < rowGroup.num_rows) { // parse column header const { value: header, byteLength: headerLength } = parquetHeader(arrayBuffer, columnOffset + byteOffset) @@ -44,28 +44,13 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, columnOffset + byteOffset, columnOffset + byteOffset + header.compressed_page_size ) - // decompress bytes - /** @type {Uint8Array | undefined} */ - let page - const uncompressed_page_size = Number(header.uncompressed_page_size) - const { codec } = columnMetadata - if (codec === 'UNCOMPRESSED') { - page = compressedBytes - } else if (codec === 'SNAPPY') { - page = new Uint8Array(uncompressed_page_size) - snappyUncompress(compressedBytes, page) - } else { - throw new Error(`parquet unsupported compression codec: ${codec}`) - } - if (page?.length !== uncompressed_page_size) { - throw new Error(`parquet decompressed page length ${page?.length} does not match header ${uncompressed_page_size}`) - } // parse page data by type if (header.type === PageType.DATA_PAGE) { const daph = header.data_page_header if (!daph) throw new Error('parquet data page header is undefined') + const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec) const { definitionLevels, repetitionLevels, value: dataPage } = readDataPage(page, daph, schema, columnMetadata) valuesSeen += daph.num_values @@ -97,6 +82,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, throw new Error(`parquet index ${index} exceeds data page length ${dataPage.length}`) } let v = dataPage[index++] + // map to dictionary value if (dictionary) { v = dictionary[v] @@ -138,6 +124,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, const diph = header.dictionary_page_header if (!diph) throw new Error('parquet dictionary page header is undefined') + const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec) dictionary = readDictionaryPage(page, diph, schema, columnMetadata) } else if (header.type === PageType.DATA_PAGE_V2) { throw new Error('parquet data page v2 not supported') @@ -223,3 +210,28 @@ function parseDecimal(bytes) { } return value } + +/** + * @typedef {import('./types.js').PageHeader} PageHeader + * @typedef {import('./types.js').CompressionCodec} CompressionCodec + * @param {Uint8Array} compressedBytes + * @param {number} uncompressed_page_size + * @param {CompressionCodec} codec + * @returns {Uint8Array} + */ +function decompressPage(compressedBytes, uncompressed_page_size, codec) { + /** 
@type {Uint8Array | undefined} */ + let page + if (codec === 'UNCOMPRESSED') { + page = compressedBytes + } else if (codec === 'SNAPPY') { + page = new Uint8Array(uncompressed_page_size) + snappyUncompress(compressedBytes, page) + } else { + throw new Error(`parquet unsupported compression codec: ${codec}`) + } + if (page?.length !== uncompressed_page_size) { + throw new Error(`parquet decompressed page length ${page?.length} does not match header ${uncompressed_page_size}`) + } + return page +} diff --git a/src/hyparquet.d.ts b/src/hyparquet.d.ts index 55582ff..2928590 100644 --- a/src/hyparquet.d.ts +++ b/src/hyparquet.d.ts @@ -17,8 +17,8 @@ export { AsyncBuffer, FileMetaData, SchemaTree } from './types' * @param {number[]} [options.columns] columns to read, all columns if undefined * @param {number} [options.rowStart] first requested row index (inclusive) * @param {number} [options.rowEnd] last requested row index (exclusive) - * @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range. - * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed + * @param {Function} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range. + * @param {Function} [options.onComplete] called when all requested rows and columns are parsed * @returns {Promise} resolves when all requested rows and columns are parsed */ export async function parquetRead(options: ParquetReadOptions): Promise @@ -69,11 +69,11 @@ export function parquetSchema(metadata: FileMetaData): SchemaTree * Decompress snappy data. * Accepts an output buffer to avoid allocating a new buffer for each call. * - * @param {Uint8Array} inputArray compressed data - * @param {Uint8Array} outputArray output buffer + * @param {Uint8Array} input compressed data + * @param {Uint8Array} output output buffer * @returns {boolean} true if successful */ -export function snappyUncompress(inputArray: Uint8Array, outputArray: Uint8Array): boolean +export function snappyUncompress(input: Uint8Array, output: Uint8Array): boolean /** * Replace bigints with numbers. diff --git a/src/read.js b/src/read.js index b9bf1ea..64bace5 100644 --- a/src/read.js +++ b/src/read.js @@ -109,11 +109,13 @@ async function readRowGroup(options, rowGroup) { if (columns && !columns.includes(columnIndex)) continue const columnMetadata = rowGroup.columns[columnIndex].meta_data if (!columnMetadata) throw new Error('parquet column metadata is undefined') + const columnStartByte = getColumnOffset(columnMetadata) const columnEndByte = columnStartByte + Number(columnMetadata.total_compressed_size) const columnBytes = columnEndByte - columnStartByte // skip columns larger than 1gb + // TODO: stream process the data, returning only the requested rows if (columnBytes > 1 << 30) { console.warn(`parquet skipping huge column "${columnMetadata.path_in_schema}" ${columnBytes.toLocaleString()} bytes`) continue diff --git a/src/snappy.js b/src/snappy.js index 36aefcd..d0dc4e7 100644 --- a/src/snappy.js +++ b/src/snappy.js @@ -41,19 +41,19 @@ function selfCopyBytes(array, pos, offset, length) { * Decompress snappy data. * Accepts an output buffer to avoid allocating a new buffer for each call. 
* - * @param {Uint8Array} inputArray compressed data - * @param {Uint8Array} outputArray output buffer + * @param {Uint8Array} input compressed data + * @param {Uint8Array} output output buffer * @returns {void} */ -export function snappyUncompress(inputArray, outputArray) { - const inputLength = inputArray.byteLength - const outputLength = outputArray.byteLength +export function snappyUncompress(input, output) { + const inputLength = input.byteLength + const outputLength = output.byteLength let pos = 0 let outPos = 0 // skip preamble (contains uncompressed length as varint) while (pos < inputLength) { - const c = inputArray[pos] + const c = input[pos] pos += 1 if (c < 128) { break @@ -64,7 +64,7 @@ export function snappyUncompress(inputArray, outputArray) { } while (pos < inputLength) { - const c = inputArray[pos] + const c = input[pos] let len = 0 pos += 1 @@ -82,17 +82,17 @@ export function snappyUncompress(inputArray, outputArray) { throw new Error('snappy error literal pos + 3 >= inputLength') } const lengthSize = len - 60 // length bytes - 1 - len = inputArray[pos] - + (inputArray[pos + 1] << 8) - + (inputArray[pos + 2] << 16) - + (inputArray[pos + 3] << 24) + len = input[pos] + + (input[pos + 1] << 8) + + (input[pos + 2] << 16) + + (input[pos + 3] << 24) len = (len & WORD_MASK[lengthSize]) + 1 pos += lengthSize } if (pos + len > inputLength) { throw new Error('snappy error literal exceeds input length') } - copyBytes(inputArray, pos, outputArray, outPos, len) + copyBytes(input, pos, output, outPos, len) pos += len outPos += len } else { @@ -102,7 +102,7 @@ export function snappyUncompress(inputArray, outputArray) { case 1: // Copy with 1-byte offset len = ((c >>> 2) & 0x7) + 4 - offset = inputArray[pos] + ((c >>> 5) << 8) + offset = input[pos] + ((c >>> 5) << 8) pos += 1 break case 2: @@ -111,7 +111,7 @@ export function snappyUncompress(inputArray, outputArray) { throw new Error('snappy error end of input') } len = (c >>> 2) + 1 - offset = inputArray[pos] + (inputArray[pos + 1] << 8) + offset = input[pos] + (input[pos + 1] << 8) pos += 2 break case 3: @@ -120,10 +120,10 @@ export function snappyUncompress(inputArray, outputArray) { throw new Error('snappy error end of input') } len = (c >>> 2) + 1 - offset = inputArray[pos] - + (inputArray[pos + 1] << 8) - + (inputArray[pos + 2] << 16) - + (inputArray[pos + 3] << 24) + offset = input[pos] + + (input[pos + 1] << 8) + + (input[pos + 2] << 16) + + (input[pos + 3] << 24) pos += 4 break default: @@ -135,7 +135,7 @@ export function snappyUncompress(inputArray, outputArray) { if (offset > outPos) { throw new Error('cannot copy from before start of buffer') } - selfCopyBytes(outputArray, outPos, offset, len) + selfCopyBytes(output, outPos, offset, len) outPos += len } }
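
Usage sketch (not part of the patch): a minimal example of the renamed snappyUncompress signature, assuming it is imported from src/snappy.js the way column.js does. The input bytes are a hand-built snappy frame (a varint uncompressed length of 5 followed by a 5-byte literal) chosen only to show that the caller allocates the output buffer at the known uncompressed size and the function fills it in place.

import { snappyUncompress } from './snappy.js'

// Hand-built snappy frame: 0x05 = uncompressed length as a varint (5),
// 0x10 = literal tag for 5 bytes ((5 - 1) << 2), then the bytes of 'hello'.
const input = new Uint8Array([0x05, 0x10, 0x68, 0x65, 0x6c, 0x6c, 0x6f])

// The caller sizes the output buffer up front (here from the known length 5;
// in readColumn it comes from header.uncompressed_page_size), since the API
// accepts an output buffer so callers can avoid allocating one per call.
const output = new Uint8Array(5)

snappyUncompress(input, output)
new TextDecoder().decode(output) // 'hello'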