Mirror of https://github.com/asadbek064/hyparquet.git

Group column decoding params into an object

commit 4df7095ab4 (parent 4645e34f97)
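In short, this change collapses the long positional parameter lists of readColumn, readPage, readDataPage, and readDataPageV2 into a single columnDecoder object built once per column chunk. A minimal sketch of the new calling convention, assembled from the hunks below — the import path is illustrative (these are internal modules, not the public hyparquet API), and reader, rowGroupStart, rowGroupEnd, columnMetadata, schemaPath, and options are assumed to be in scope as they are in src/read.js:

  // Illustrative import path; readColumn is an internal module, not the package entry point.
  import { readColumn } from './src/column.js'

  // Build the decoder params once per column chunk (mirrors the object built in src/read.js below).
  const columnDecoder = {
    columnName: columnMetadata.path_in_schema.join('.'),
    type: columnMetadata.type,                          // parquet physical type
    element: schemaPath[schemaPath.length - 1].element, // leaf schema element for this column
    schemaPath,                                         // schema path from root to leaf
    codec: columnMetadata.codec,                        // compression codec
    compressors: options.compressors,                   // optional custom decompressors
    utf8: options.utf8,                                 // decode BYTE_ARRAY values as utf8 strings
  }
  const columnData = readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder)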
@@ -12,13 +12,11 @@ import { deserializeTCompactProtocol } from './thrift.js'
  * @param {DataReader} reader
  * @param {number} rowGroupStart skip this many rows in the row group
  * @param {number} rowGroupEnd read up to this index in the row group (Infinity reads all rows)
- * @param {ColumnMetaData} columnMetadata column metadata
- * @param {SchemaTree[]} schemaPath schema path for the column
- * @param {ParquetReadOptions} options read options
+ * @param {ColumnDecoder} columnDecoder column decoder params
  * @returns {DecodedArray[]}
  */
-export function readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, schemaPath, options) {
-  const { element } = schemaPath[schemaPath.length - 1]
+export function readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder) {
+  const { element, utf8 } = columnDecoder
   /** @type {DecodedArray[]} */
   const chunks = []
   /** @type {DecodedArray | undefined} */
@@ -32,12 +30,12 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, s
     const header = parquetHeader(reader)
     if (header.type === 'DICTIONARY_PAGE') {
       // assert(!dictionary)
-      dictionary = readPage(reader, header, columnMetadata, schemaPath, element, dictionary, undefined, 0, options)
-      dictionary = convert(dictionary, element, options.utf8)
+      dictionary = readPage(reader, header, columnDecoder, dictionary, undefined, 0)
+      dictionary = convert(dictionary, element, utf8)
     } else {
       const lastChunk = chunks.at(-1)
       const lastChunkLength = lastChunk?.length || 0
-      const values = readPage(reader, header, columnMetadata, schemaPath, element, dictionary, lastChunk, rowGroupStart - rowCount, options)
+      const values = readPage(reader, header, columnDecoder, dictionary, lastChunk, rowGroupStart - rowCount)
       if (lastChunk === values) {
         // continued from previous page
         rowCount += values.length - lastChunkLength
@@ -65,16 +63,14 @@ export function readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, s
  *
  * @param {DataReader} reader
  * @param {PageHeader} header
- * @param {ColumnMetaData} columnMetadata
- * @param {SchemaTree[]} schemaPath
- * @param {SchemaElement} element
+ * @param {ColumnDecoder} columnDecoder
  * @param {DecodedArray | undefined} dictionary
  * @param {DecodedArray | undefined} previousChunk
  * @param {number} pageStart skip this many rows in the page
- * @param {ParquetReadOptions} options
  * @returns {DecodedArray}
  */
-export function readPage(reader, header, columnMetadata, schemaPath, element, dictionary, previousChunk, pageStart, { utf8, compressors }) {
+export function readPage(reader, header, columnDecoder, dictionary, previousChunk, pageStart) {
+  const { type, element, schemaPath, codec, compressors, utf8 } = columnDecoder
   // read compressed_page_size bytes
   const compressedBytes = new Uint8Array(
     reader.view.buffer, reader.view.byteOffset + reader.offset, header.compressed_page_size
@@ -91,8 +87,8 @@ export function readPage(reader, header, columnMetadata, schemaPath, element, di
       return new Array(daph.num_values) // TODO: don't allocate array
     }
 
-    const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors)
-    const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
+    const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), codec, compressors)
+    const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, columnDecoder)
     // assert(!daph.statistics?.null_count || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
 
     // convert types, dereference dictionary, and assemble lists
@@ -118,9 +114,8 @@ export function readPage(reader, header, columnMetadata, schemaPath, element, di
       return new Array(daph2.num_values) // TODO: don't allocate array
     }
 
-    const { definitionLevels, repetitionLevels, dataPage } = readDataPageV2(
-      compressedBytes, header, schemaPath, columnMetadata, compressors
-    )
+    const { definitionLevels, repetitionLevels, dataPage } =
+      readDataPageV2(compressedBytes, header, columnDecoder)
 
     // convert types, dereference dictionary, and assemble lists
     const values = convertWithDictionary(dataPage, dictionary, element, daph2.encoding, utf8)
@@ -131,11 +126,11 @@ export function readPage(reader, header, columnMetadata, schemaPath, element, di
     if (!diph) throw new Error('parquet dictionary page header is undefined')
 
     const page = decompressPage(
-      compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
+      compressedBytes, Number(header.uncompressed_page_size), codec, compressors
     )
 
     const reader = { view: new DataView(page.buffer, page.byteOffset, page.byteLength), offset: 0 }
-    return readPlain(reader, columnMetadata.type, diph.num_values, element.type_length)
+    return readPlain(reader, type, diph.num_values, element.type_length)
   } else {
     throw new Error(`parquet unsupported page type: ${header.type}`)
   }
@@ -155,7 +150,7 @@ export function getColumnRange({ dictionary_page_offset, data_page_offset, total
 /**
  * Read parquet header from a buffer.
  *
- * @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ParquetReadOptions, SchemaElement, SchemaTree} from '../src/types.d.ts'
+ * @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ColumnDecoder} from '../src/types.d.ts'
  * @param {DataReader} reader
  * @returns {PageHeader}
  */
@@ -9,11 +9,10 @@ import { snappyUncompress } from './snappy.js'
  *
  * @param {Uint8Array} bytes raw page data (should already be decompressed)
  * @param {DataPageHeader} daph data page header
- * @param {SchemaTree[]} schemaPath
- * @param {ColumnMetaData} columnMetadata
+ * @param {ColumnDecoder} columnDecoder
  * @returns {DataPage} definition levels, repetition levels, and array of values
  */
-export function readDataPage(bytes, daph, schemaPath, { type }) {
+export function readDataPage(bytes, daph, { type, element, schemaPath }) {
   const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
   const reader = { view, offset: 0 }
   /** @type {DecodedArray} */
@@ -28,8 +27,7 @@ export function readDataPage(bytes, daph, schemaPath, { type }) {
   // read values based on encoding
   const nValues = daph.num_values - numNulls
   if (daph.encoding === 'PLAIN') {
-    const { type_length } = schemaPath[schemaPath.length - 1].element
-    dataPage = readPlain(reader, type, nValues, type_length)
+    dataPage = readPlain(reader, type, nValues, element.type_length)
   } else if (
     daph.encoding === 'PLAIN_DICTIONARY' ||
     daph.encoding === 'RLE_DICTIONARY' ||
@@ -49,8 +47,7 @@ export function readDataPage(bytes, daph, schemaPath, { type }) {
       dataPage = new Uint8Array(nValues) // nValue zeroes
     }
   } else if (daph.encoding === 'BYTE_STREAM_SPLIT') {
-    const { type_length } = schemaPath[schemaPath.length - 1].element
-    dataPage = byteStreamSplit(reader, nValues, type, type_length)
+    dataPage = byteStreamSplit(reader, nValues, type, element.type_length)
   } else {
     throw new Error(`parquet unsupported encoding: ${daph.encoding}`)
   }
@@ -59,7 +56,7 @@ export function readDataPage(bytes, daph, schemaPath, { type }) {
 }
 
 /**
- * @import {ColumnMetaData, CompressionCodec, Compressors, DataPage, DataPageHeader, DataPageHeaderV2, DataReader, DecodedArray, PageHeader, SchemaTree} from '../src/types.d.ts'
+ * @import {ColumnDecoder, CompressionCodec, Compressors, DataPage, DataPageHeader, DataPageHeaderV2, DataReader, DecodedArray, PageHeader, SchemaTree} from '../src/types.d.ts'
  * @param {DataReader} reader data view for the page
  * @param {DataPageHeader} daph data page header
  * @param {SchemaTree[]} schemaPath
@@ -133,15 +130,13 @@ export function decompressPage(compressedBytes, uncompressed_page_size, codec, c
  *
  * @param {Uint8Array} compressedBytes raw page data
  * @param {PageHeader} ph page header
- * @param {SchemaTree[]} schemaPath
- * @param {ColumnMetaData} columnMetadata
- * @param {Compressors | undefined} compressors
+ * @param {ColumnDecoder} columnDecoder
  * @returns {DataPage} definition levels, repetition levels, and array of values
  */
-export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata, compressors) {
+export function readDataPageV2(compressedBytes, ph, columnDecoder) {
   const view = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
   const reader = { view, offset: 0 }
-  const { codec, type } = columnMetadata
+  const { type, element, schemaPath, codec, compressors } = columnDecoder
   const daph2 = ph.data_page_header_v2
   if (!daph2) throw new Error('parquet data page header v2 is undefined')
 
@@ -167,10 +162,9 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
   let dataPage
   const nValues = daph2.num_values - daph2.num_nulls
   if (daph2.encoding === 'PLAIN') {
-    const { type_length } = schemaPath[schemaPath.length - 1].element
-    dataPage = readPlain(pageReader, type, nValues, type_length)
+    dataPage = readPlain(pageReader, type, nValues, element.type_length)
   } else if (daph2.encoding === 'RLE') {
-    // assert(columnMetadata.type === 'BOOLEAN')
+    // assert(type === 'BOOLEAN')
     dataPage = new Array(nValues)
     readRleBitPackedHybrid(pageReader, 1, 0, dataPage)
     dataPage = dataPage.map(x => !!x)
@@ -192,8 +186,7 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
     dataPage = new Array(nValues)
     deltaByteArray(pageReader, nValues, dataPage)
   } else if (daph2.encoding === 'BYTE_STREAM_SPLIT') {
-    const { type_length } = schemaPath[schemaPath.length - 1].element
-    dataPage = byteStreamSplit(reader, nValues, type, type_length)
+    dataPage = byteStreamSplit(reader, nValues, type, element.type_length)
   } else {
     throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
   }
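A recurring simplification in both page readers above: the leaf element no longer has to be re-derived from schemaPath inside each encoding branch, because the decoder object already carries it. A before/after sketch of that pattern, using the same names as the diff:

  // before: look up the leaf element on every PLAIN / BYTE_STREAM_SPLIT branch
  const { type_length } = schemaPath[schemaPath.length - 1].element
  dataPage = readPlain(reader, type, nValues, type_length)

  // after: the leaf SchemaElement is already on the decoder object
  dataPage = readPlain(reader, type, nValues, element.type_length)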
src/read.js (11 changed lines)
@@ -134,7 +134,16 @@ export async function readRowGroup(options, rowGroup, groupStart) {
     promises.push(buffer.then(arrayBuffer => {
       const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
       const reader = { view: new DataView(arrayBuffer), offset: bufferOffset }
-      const columnData = readColumn(reader, rowGroupStart, rowGroupEnd, columnMetadata, schemaPath, options)
+      const columnDecoder = {
+        columnName: columnMetadata.path_in_schema.join('.'),
+        type: columnMetadata.type,
+        element: schemaPath[schemaPath.length - 1].element,
+        schemaPath,
+        codec: columnMetadata.codec,
+        compressors: options.compressors,
+        utf8: options.utf8,
+      }
+      const columnData = readColumn(reader, rowGroupStart, rowGroupEnd, columnDecoder)
       /** @type {DecodedArray[] | undefined} */
       let chunks = columnData
 
src/types.d.ts (vendored, 10 changed lines)
@@ -376,3 +376,13 @@ export type BoundaryOrder = 'UNORDERED' | 'ASCENDING' | 'DESCENDING'
 
 export type ThriftObject = { [ key: `field_${number}` ]: ThriftType }
 export type ThriftType = boolean | number | bigint | Uint8Array | ThriftType[] | ThriftObject
+
+export interface ColumnDecoder {
+  columnName: string
+  type: ParquetType
+  element: SchemaElement
+  schemaPath: SchemaTree[]
+  codec: CompressionCodec
+  compressors?: Compressors
+  utf8?: boolean
+}
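Since hyparquet is typed via JSDoc, call sites can annotate the params object against the new interface. A hypothetical annotation sketch — the import path and the literal values ('my_column', 'INT32', 'SNAPPY') are illustrative, not taken from the diff, and element/schemaPath are assumed to be in scope:

  /** @type {import('./src/types.d.ts').ColumnDecoder} */
  const columnDecoder = {
    columnName: 'my_column',   // hypothetical leaf column name
    type: 'INT32',             // ParquetType (physical type)
    element,                   // SchemaElement for the leaf column
    schemaPath,                // SchemaTree[] from root to leaf
    codec: 'SNAPPY',           // CompressionCodec
    // compressors and utf8 are optional
  }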
@@ -23,8 +23,15 @@ describe('readColumn', () => {
     const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
     const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
     const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
+    const columnDecoder = {
+      columnName: column.meta_data.path_in_schema.join('.'),
+      type: column.meta_data.type,
+      element: schemaPath[schemaPath.length - 1].element,
+      schemaPath,
+      codec: column.meta_data.codec,
+    }
 
-    const result = readColumn(reader, 0, rowGroupEnd, column.meta_data, schemaPath, { file })
+    const result = readColumn(reader, 0, rowGroupEnd, columnDecoder)
     expect(result).toEqual(expected)
   })
 
@@ -40,8 +47,15 @@ describe('readColumn', () => {
     const columnArrayBuffer = arrayBuffer.slice(columnStartByte, columnEndByte)
     const schemaPath = getSchemaPath(metadata.schema, column.meta_data?.path_in_schema ?? [])
     const reader = { view: new DataView(columnArrayBuffer), offset: 0 }
+    const columnDecoder = {
+      columnName: column.meta_data.path_in_schema.join('.'),
+      type: column.meta_data.type,
+      element: schemaPath[schemaPath.length - 1].element,
+      schemaPath,
+      codec: column.meta_data.codec,
+    }
 
-    const columnData = readColumn(reader, 0, Infinity, column.meta_data, schemaPath, { file })
+    const columnData = readColumn(reader, 0, Infinity, columnDecoder)
     expect(columnData[0]).toBeInstanceOf(Int32Array)
   })
 })