decompressPage for dictionary and data page v1 only

Kenny Daniel 2024-02-24 11:55:04 -08:00
parent cfaa0bca90
commit a7e5aef31f
4 changed files with 58 additions and 44 deletions

@@ -14,7 +14,7 @@ import { snappyUncompress } from './snappy.js'
const dayMillis = 86400000000000 // 1 day in milliseconds
/**
* Read a column from the file.
* Parse column data from a buffer.
*
* @param {ArrayBuffer} arrayBuffer parquet file contents
* @param {number} columnOffset offset to start reading from
@@ -24,13 +24,13 @@ const dayMillis = 86400000000000 // 1 day in milliseconds
* @returns {ArrayLike<any>} array of values
*/
export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schema) {
// parse column data
let valuesSeen = 0
let byteOffset = 0 // byteOffset within the column
/** @type {ArrayLike<any> | undefined} */
let dictionary = undefined
let valuesSeen = 0
let byteOffset = 0 // byteOffset within the column
const rowIndex = [0] // map/list object index
const rowData = []
while (valuesSeen < rowGroup.num_rows) {
// parse column header
const { value: header, byteLength: headerLength } = parquetHeader(arrayBuffer, columnOffset + byteOffset)
@@ -44,28 +44,13 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
columnOffset + byteOffset,
columnOffset + byteOffset + header.compressed_page_size
)
// decompress bytes
/** @type {Uint8Array | undefined} */
let page
const uncompressed_page_size = Number(header.uncompressed_page_size)
const { codec } = columnMetadata
if (codec === 'UNCOMPRESSED') {
page = compressedBytes
} else if (codec === 'SNAPPY') {
page = new Uint8Array(uncompressed_page_size)
snappyUncompress(compressedBytes, page)
} else {
throw new Error(`parquet unsupported compression codec: ${codec}`)
}
if (page?.length !== uncompressed_page_size) {
throw new Error(`parquet decompressed page length ${page?.length} does not match header ${uncompressed_page_size}`)
}
// parse page data by type
if (header.type === PageType.DATA_PAGE) {
const daph = header.data_page_header
if (!daph) throw new Error('parquet data page header is undefined')
const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec)
const { definitionLevels, repetitionLevels, value: dataPage } = readDataPage(page, daph, schema, columnMetadata)
valuesSeen += daph.num_values
@@ -97,6 +82,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
throw new Error(`parquet index ${index} exceeds data page length ${dataPage.length}`)
}
let v = dataPage[index++]
// map to dictionary value
if (dictionary) {
v = dictionary[v]
@@ -138,6 +124,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const diph = header.dictionary_page_header
if (!diph) throw new Error('parquet dictionary page header is undefined')
const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec)
dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
} else if (header.type === PageType.DATA_PAGE_V2) {
throw new Error('parquet data page v2 not supported')
@@ -223,3 +210,28 @@ function parseDecimal(bytes) {
}
return value
}
/**
* @typedef {import('./types.js').PageHeader} PageHeader
* @typedef {import('./types.js').CompressionCodec} CompressionCodec
* @param {Uint8Array} compressedBytes
* @param {number} uncompressed_page_size
* @param {CompressionCodec} codec
* @returns {Uint8Array}
*/
function decompressPage(compressedBytes, uncompressed_page_size, codec) {
/** @type {Uint8Array | undefined} */
let page
if (codec === 'UNCOMPRESSED') {
page = compressedBytes
} else if (codec === 'SNAPPY') {
page = new Uint8Array(uncompressed_page_size)
snappyUncompress(compressedBytes, page)
} else {
throw new Error(`parquet unsupported compression codec: ${codec}`)
}
if (page?.length !== uncompressed_page_size) {
throw new Error(`parquet decompressed page length ${page?.length} does not match header ${uncompressed_page_size}`)
}
return page
}
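
For reference, a minimal sketch of calling the extracted helper. The header and metadata objects below are hypothetical placeholders rather than values from this diff; with the UNCOMPRESSED codec the input bytes pass through unchanged, and with SNAPPY the helper allocates the output buffer and delegates to snappyUncompress.

// hypothetical page header and column metadata, for illustration only
const header = { uncompressed_page_size: 4, compressed_page_size: 4 }
const columnMetadata = { codec: 'UNCOMPRESSED' }
const compressedBytes = new Uint8Array([1, 2, 3, 4])
// returns the page bytes, or throws if the decompressed length does not match the header
const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec)
console.log(page.length) // 4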

10 src/hyparquet.d.ts vendored

@@ -17,8 +17,8 @@ export { AsyncBuffer, FileMetaData, SchemaTree } from './types'
* @param {number[]} [options.columns] columns to read, all columns if undefined
* @param {number} [options.rowStart] first requested row index (inclusive)
* @param {number} [options.rowEnd] last requested row index (exclusive)
* @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
* @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
* @param {Function} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
* @param {Function} [options.onComplete] called when all requested rows and columns are parsed
* @returns {Promise<void>} resolves when all requested rows and columns are parsed
*/
export async function parquetRead(options: ParquetReadOptions): Promise<void>
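
A minimal usage sketch of the options documented above. The file field and the fetch URL are assumptions for illustration, not part of this hunk; the loosened Function types still accept the same callback shapes.

import { parquetRead } from 'hyparquet'

const file = await fetch('data.parquet').then(res => res.arrayBuffer()) // hypothetical source
await parquetRead({
  file, // assumed option: the parquet file contents
  columns: [0, 1], // read only the first two columns
  rowStart: 0,
  rowEnd: 100,
  onChunk: chunk => console.log('column chunk', chunk), // may include rows outside the requested range
  onComplete: rows => console.log('first rows', rows.slice(0, 5)),
})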
@@ -69,11 +69,11 @@ export function parquetSchema(metadata: FileMetaData): SchemaTree
* Decompress snappy data.
* Accepts an output buffer to avoid allocating a new buffer for each call.
*
* @param {Uint8Array} inputArray compressed data
* @param {Uint8Array} outputArray output buffer
* @param {Uint8Array} input compressed data
* @param {Uint8Array} output output buffer
* @returns {boolean} true if successful
*/
export function snappyUncompress(inputArray: Uint8Array, outputArray: Uint8Array): boolean
export function snappyUncompress(input: Uint8Array, output: Uint8Array): boolean
/**
* Replace bigints with numbers.

@@ -109,11 +109,13 @@ async function readRowGroup(options, rowGroup) {
if (columns && !columns.includes(columnIndex)) continue
const columnMetadata = rowGroup.columns[columnIndex].meta_data
if (!columnMetadata) throw new Error('parquet column metadata is undefined')
const columnStartByte = getColumnOffset(columnMetadata)
const columnEndByte = columnStartByte + Number(columnMetadata.total_compressed_size)
const columnBytes = columnEndByte - columnStartByte
// skip columns larger than 1gb
// TODO: stream process the data, returning only the requested rows
if (columnBytes > 1 << 30) {
console.warn(`parquet skipping huge column "${columnMetadata.path_in_schema}" ${columnBytes.toLocaleString()} bytes`)
continue
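
For context on the threshold above, a small worked example with hypothetical metadata values: 1 << 30 is 1,073,741,824 bytes (1 GiB), so a roughly 2 GB column chunk takes the skip branch.

// hypothetical values, for illustration only
const columnStartByte = 4 // would come from getColumnOffset(columnMetadata)
const columnEndByte = columnStartByte + 2_000_000_000 // plus Number(columnMetadata.total_compressed_size)
const columnBytes = columnEndByte - columnStartByte
console.log(columnBytes > (1 << 30)) // true, so this column would be skipped with a warning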

@@ -41,19 +41,19 @@ function selfCopyBytes(array, pos, offset, length) {
* Decompress snappy data.
* Accepts an output buffer to avoid allocating a new buffer for each call.
*
* @param {Uint8Array} inputArray compressed data
* @param {Uint8Array} outputArray output buffer
* @param {Uint8Array} input compressed data
* @param {Uint8Array} output output buffer
* @returns {void}
*/
export function snappyUncompress(inputArray, outputArray) {
const inputLength = inputArray.byteLength
const outputLength = outputArray.byteLength
export function snappyUncompress(input, output) {
const inputLength = input.byteLength
const outputLength = output.byteLength
let pos = 0
let outPos = 0
// skip preamble (contains uncompressed length as varint)
while (pos < inputLength) {
const c = inputArray[pos]
const c = input[pos]
pos += 1
if (c < 128) {
break
@@ -64,7 +64,7 @@ export function snappyUncompress(inputArray, outputArray) {
}
while (pos < inputLength) {
const c = inputArray[pos]
const c = input[pos]
let len = 0
pos += 1
@@ -82,17 +82,17 @@ export function snappyUncompress(inputArray, outputArray) {
throw new Error('snappy error literal pos + 3 >= inputLength')
}
const lengthSize = len - 60 // length bytes - 1
len = inputArray[pos]
+ (inputArray[pos + 1] << 8)
+ (inputArray[pos + 2] << 16)
+ (inputArray[pos + 3] << 24)
len = input[pos]
+ (input[pos + 1] << 8)
+ (input[pos + 2] << 16)
+ (input[pos + 3] << 24)
len = (len & WORD_MASK[lengthSize]) + 1
pos += lengthSize
}
if (pos + len > inputLength) {
throw new Error('snappy error literal exceeds input length')
}
copyBytes(inputArray, pos, outputArray, outPos, len)
copyBytes(input, pos, output, outPos, len)
pos += len
outPos += len
} else {
@@ -102,7 +102,7 @@ export function snappyUncompress(inputArray, outputArray) {
case 1:
// Copy with 1-byte offset
len = ((c >>> 2) & 0x7) + 4
offset = inputArray[pos] + ((c >>> 5) << 8)
offset = input[pos] + ((c >>> 5) << 8)
pos += 1
break
case 2:
@@ -111,7 +111,7 @@ export function snappyUncompress(inputArray, outputArray) {
throw new Error('snappy error end of input')
}
len = (c >>> 2) + 1
offset = inputArray[pos] + (inputArray[pos + 1] << 8)
offset = input[pos] + (input[pos + 1] << 8)
pos += 2
break
case 3:
@@ -120,10 +120,10 @@ export function snappyUncompress(inputArray, outputArray) {
throw new Error('snappy error end of input')
}
len = (c >>> 2) + 1
offset = inputArray[pos]
+ (inputArray[pos + 1] << 8)
+ (inputArray[pos + 2] << 16)
+ (inputArray[pos + 3] << 24)
offset = input[pos]
+ (input[pos + 1] << 8)
+ (input[pos + 2] << 16)
+ (input[pos + 3] << 24)
pos += 4
break
default:
@@ -135,7 +135,7 @@ export function snappyUncompress(inputArray, outputArray) {
if (offset > outPos) {
throw new Error('cannot copy from before start of buffer')
}
selfCopyBytes(outputArray, outPos, offset, len)
selfCopyBytes(output, outPos, offset, len)
outPos += len
}
}
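
A minimal round trip through the renamed function, using a hand-assembled snappy block; the byte values below are constructed for this example and do not come from the diff.

import { snappyUncompress } from './snappy.js'

// preamble: varint uncompressed length (2), then a tag byte for a 2-byte literal, then 'h' and 'i'
const input = new Uint8Array([0x02, 0x04, 0x68, 0x69])
const output = new Uint8Array(2) // caller allocates the output buffer at the known uncompressed size
snappyUncompress(input, output)
console.log(new TextDecoder().decode(output)) // 'hi'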