Move decompressPage to avoid circular dependency chain

This commit is contained in:
Kenny Daniel 2024-05-26 06:00:20 -07:00
parent e44078ac9e
commit 36f5b4f043
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
3 changed files with 30 additions and 30 deletions

@@ -1,10 +1,9 @@
import { assembleLists } from './assemble.js'
import { convertWithDictionary } from './convert.js'
import { readDataPage, readDictionaryPage } from './datapage.js'
import { decompressPage, readDataPage, readDictionaryPage } from './datapage.js'
import { readDataPageV2 } from './datapageV2.js'
import { parquetHeader } from './header.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
import { snappyUncompress } from './snappy.js'
import { concat } from './utils.js'
/**
@@ -120,30 +119,3 @@ export function getColumnOffset({ dictionary_page_offset, data_page_offset }) {
}
return Number(columnOffset)
}
/**
 * Decompress a parquet page and verify its size against the page header.
 *
 * Codec resolution order: UNCOMPRESSED passes the bytes through untouched,
 * a user-registered decompressor for the codec takes precedence next, and
 * SNAPPY falls back to the built-in implementation.
 *
 * @param {Uint8Array} compressedBytes raw page bytes as read from the file
 * @param {number} uncompressed_page_size expected byte length after decompression
 * @param {import('./types.js').CompressionCodec} codec codec named in the page header
 * @param {import('./types.js').Compressors | undefined} compressors optional user-supplied decompressors keyed by codec
 * @returns {Uint8Array} decompressed page bytes of exactly uncompressed_page_size length
 */
export function decompressPage(compressedBytes, uncompressed_page_size, codec, compressors) {
  /**
   * Reject any result whose length disagrees with the page header — this also
   * catches a custom decompressor that returns undefined or the wrong size.
   * @param {Uint8Array} bytes
   * @returns {Uint8Array}
   */
  const checked = bytes => {
    if (bytes?.length !== uncompressed_page_size) {
      throw new Error(`parquet decompressed page length ${bytes?.length} does not match header ${uncompressed_page_size}`)
    }
    return bytes
  }
  if (codec === 'UNCOMPRESSED') return checked(compressedBytes)
  const customDecompressor = compressors?.[codec]
  if (customDecompressor) return checked(customDecompressor(compressedBytes, uncompressed_page_size))
  if (codec === 'SNAPPY') {
    const page = new Uint8Array(uncompressed_page_size)
    snappyUncompress(compressedBytes, page)
    return checked(page)
  }
  throw new Error(`parquet unsupported compression codec: ${codec}`)
}

@@ -1,6 +1,7 @@
import { bitWidth, byteStreamSplit, readRleBitPackedHybrid } from './encoding.js'
import { readPlain } from './plain.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
import { snappyUncompress } from './snappy.js'
/**
* Read a data page from uncompressed reader.
@@ -107,3 +108,30 @@ function readDefinitionLevels(reader, daph, schemaPath) {
return { definitionLevels, numNulls }
}
/**
 * Decompress one parquet page to the exact size promised by its header.
 *
 * UNCOMPRESSED data is returned as-is; a custom decompressor registered in
 * compressors for this codec wins over the built-ins; SNAPPY uses the
 * bundled snappy implementation. Anything else is unsupported.
 *
 * @param {Uint8Array} compressedBytes raw page bytes as read from the file
 * @param {number} uncompressed_page_size expected byte length after decompression
 * @param {import('./types.js').CompressionCodec} codec codec named in the page header
 * @param {import('./types.js').Compressors | undefined} compressors optional user-supplied decompressors keyed by codec
 * @returns {Uint8Array} decompressed page bytes of exactly uncompressed_page_size length
 */
export function decompressPage(compressedBytes, uncompressed_page_size, codec, compressors) {
  const page = inflate()
  // a custom decompressor may misbehave (wrong size, or no return value at
  // all) — validate every path against the header before handing bytes back
  if (page?.length !== uncompressed_page_size) {
    throw new Error(`parquet decompressed page length ${page?.length} does not match header ${uncompressed_page_size}`)
  }
  return page

  /**
   * Produce the raw decompressed bytes for the requested codec.
   * @returns {Uint8Array}
   */
  function inflate() {
    if (codec === 'UNCOMPRESSED') return compressedBytes
    const customDecompressor = compressors?.[codec]
    if (customDecompressor) return customDecompressor(compressedBytes, uncompressed_page_size)
    if (codec === 'SNAPPY') {
      const out = new Uint8Array(uncompressed_page_size)
      snappyUncompress(compressedBytes, out)
      return out
    }
    throw new Error(`parquet unsupported compression codec: ${codec}`)
  }
}

@@ -1,4 +1,4 @@
import { decompressPage } from './column.js'
import { decompressPage } from './datapage.js'
import { deltaBinaryUnpack, deltaByteArray, deltaLengthByteArray } from './delta.js'
import { bitWidth, byteStreamSplit, readRleBitPackedHybrid } from './encoding.js'
import { readPlain } from './plain.js'