Prepare for data page v2

This commit is contained in:
Kenny Daniel 2024-02-26 10:32:53 -08:00
parent cf1f0595e1
commit c70b3b2227
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
8 changed files with 110 additions and 64 deletions

@ -69,6 +69,11 @@ const metadata = parquetMetadata(arrayBuffer)
To parse parquet files from a user drag-and-drop action, see example in [index.html](index.html).
## Async
Hyparquet supports asynchronously fetching parquet files over a network.
You can provide an `AsyncBuffer` which is like a js `ArrayBuffer` but the `slice` method returns `Promise<ArrayBuffer>`.
## Supported Parquet Files
The parquet format supports a number of different compression and encoding types.

@ -57,6 +57,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const dictionaryEncoding = daph.encoding === Encoding.PLAIN_DICTIONARY || daph.encoding === Encoding.RLE_DICTIONARY
// construct output values: skip nulls and construct lists
/** @type {any[]} */
let values
if (repetitionLevels.length) {
// Use repetition levels to construct lists
@ -73,32 +74,8 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
} else if (definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
// Use definition levels to skip nulls
let index = 0
values = []
const decoder = new TextDecoder()
for (let i = 0; i < definitionLevels.length; i++) {
if (definitionLevels[i] === maxDefinitionLevel) {
if (index > dataPage.length) {
throw new Error(`parquet index ${index} exceeds data page length ${dataPage.length}`)
}
let v = dataPage[index++]
// map to dictionary value
if (dictionary) {
v = dictionary[v]
if (v instanceof Uint8Array) {
try {
v = decoder.decode(v)
} catch (e) {
console.warn('parquet failed to decode byte array as string', e)
}
}
}
values[i] = v
} else {
values[i] = undefined
}
}
skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values)
} else {
if (dictionaryEncoding && dictionary !== undefined && Array.isArray(dataPage)) {
// dereference dictionary values
@ -124,7 +101,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const diph = header.dictionary_page_header
if (!diph) throw new Error('parquet dictionary page header is undefined')
const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec)
const page = decompressPage(
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec
)
dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
} else if (header.type === PageType.DATA_PAGE_V2) {
throw new Error('parquet data page v2 not supported')
@ -235,3 +214,41 @@ function decompressPage(compressedBytes, uncompressed_page_size, codec) {
}
return page
}
/**
 * Expand data page list with nulls and convert to utf8.
 *
 * Walks the definition levels: each position at the max definition level is
 * non-null and consumes the next value from the densely packed data page;
 * every other position becomes undefined in the output.
 *
 * @param {number[]} definitionLevels one level per output row
 * @param {number} maxDefinitionLevel level marking a non-null value
 * @param {ArrayLike<any>} dataPage densely packed non-null values
 * @param {any} dictionary optional dictionary to dereference indices through
 * @param {any[]} output array to fill in place; must be empty on entry
 */
function skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, output) {
  if (output.length) throw new Error('parquet output array is not empty')
  // Use definition levels to skip nulls
  let index = 0
  const decoder = new TextDecoder()
  for (let i = 0; i < definitionLevels.length; i++) {
    if (definitionLevels[i] === maxDefinitionLevel) {
      // guard must be >= : with > the check never fires and reading at
      // index === dataPage.length silently yields undefined
      if (index >= dataPage.length) {
        throw new Error(`parquet index ${index} exceeds data page length ${dataPage.length}`)
      }
      let v = dataPage[index++]
      // map to dictionary value
      if (dictionary) {
        v = dictionary[v]
        // dictionary byte arrays are decoded to utf8 strings; best-effort,
        // raw bytes are kept if decoding fails
        if (v instanceof Uint8Array) {
          try {
            v = decoder.decode(v)
          } catch (e) {
            console.warn('parquet failed to decode byte array as string', e)
          }
        }
      }
      output[i] = v
    } else {
      output[i] = undefined
    }
  }
}

@ -11,7 +11,8 @@ import {
const skipNulls = false // TODO
/**
* @typedef {{ definitionLevels: number[] | undefined, repetitionLevels: number[], value: ArrayLike<any> }} DataPage
* @typedef {{ byteLength: number, definitionLevels: number[], numNulls: number }} DefinitionLevels
* @typedef {import("./types.d.ts").DataPage} DataPage
* @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
* @typedef {import("./types.d.ts").DataPageHeader} DataPageHeader
* @typedef {import("./types.d.ts").DictionaryPageHeader} DictionaryPageHeader
@ -34,7 +35,7 @@ const skipNulls = false // TODO
export function readDataPage(bytes, daph, schema, columnMetadata) {
const dataView = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
let offset = 0
/** @type {ArrayLike<any>} */
/** @type {any[]} */
let values = []
// repetition levels
@ -47,23 +48,24 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
let definitionLevels = undefined
let numNulls = 0
// let maxDefinitionLevel = -1
// TODO: move into readDefinitionLevels
if (skipNulls && !isRequired(schema, columnMetadata.path_in_schema)) {
// skip_definition_bytes
offset += skipDefinitionBytes(daph.num_values)
} else {
const dl = readDefinitionLevels(dataView, offset, daph, schema, columnMetadata)
const dl = readDefinitionLevels(dataView, offset, daph, schema, columnMetadata.path_in_schema)
definitionLevels = dl.definitionLevels
numNulls = dl.numNulls
offset += dl.byteLength
}
// read values based on encoding
const nval = daph.num_values - numNulls
const nValues = daph.num_values - numNulls
if (daph.encoding === Encoding.PLAIN) {
const se = schemaElement(schema, columnMetadata.path_in_schema)
const utf8 = se.converted_type === 'UTF8'
const plainObj = readPlain(dataView, columnMetadata.type, nval, offset, utf8)
values = plainObj.value
const plainObj = readPlain(dataView, columnMetadata.type, nValues, offset, utf8)
values = Array.isArray(plainObj.value) ? plainObj.value : Array.from(plainObj.value)
offset += plainObj.byteLength
} else if (
daph.encoding === Encoding.PLAIN_DICTIONARY ||
@ -81,13 +83,13 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
}
if (bitWidth) {
const { value, byteLength } = readRleBitPackedHybrid(
dataView, offset, bitWidth, dataView.byteLength - offset, nval
dataView, offset, bitWidth, dataView.byteLength - offset, nValues
)
offset += byteLength
values = value
values = Array.isArray(value) ? value : Array.from(value)
} else {
// nval zeros
values = new Array(nval).fill(0)
values = new Array(nValues).fill(0)
}
} else {
throw new Error(`parquet unsupported encoding: ${daph.encoding}`)
@ -136,8 +138,6 @@ function readRepetitionLevels(dataView, offset, daph, schema, columnMetadata) {
return { value: [], byteLength: 0 }
}
/** @typedef {{ byteLength: number, definitionLevels: number[], numNulls: number }} DefinitionLevels */
/**
* Read the definition levels from this page, if any.
*
@ -145,12 +145,12 @@ function readRepetitionLevels(dataView, offset, daph, schema, columnMetadata) {
* @param {number} offset offset to start reading from
* @param {DataPageHeader} daph data page header
* @param {SchemaElement[]} schema schema for the file
* @param {ColumnMetaData} columnMetadata metadata for the column
* @param {string[]} path_in_schema path in the schema
* @returns {DefinitionLevels} definition levels and number of bytes read
*/
function readDefinitionLevels(dataView, offset, daph, schema, columnMetadata) {
if (!isRequired(schema, columnMetadata.path_in_schema)) {
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
function readDefinitionLevels(dataView, offset, daph, schema, path_in_schema) {
if (!isRequired(schema, path_in_schema)) {
const maxDefinitionLevel = getMaxDefinitionLevel(schema, path_in_schema)
const bitWidth = widthFromMaxInt(maxDefinitionLevel)
if (bitWidth) {
// num_values is index 1 for either type of page header

@ -149,12 +149,13 @@ function readPlainByteArrayFixed(dataView, offset, fixedLength) {
/**
* Read `count` values of the given type from the dataView.
*
* @typedef {import("./types.d.ts").DecodedArray} DecodedArray
* @param {DataView} dataView - buffer to read data from
* @param {number} type - parquet type of the data
* @param {number} count - number of values to read
* @param {number} offset - offset to start reading from the DataView
* @param {boolean} utf8 - whether to decode byte arrays as UTF-8
* @returns {Decoded<ArrayLike<any>>} array of values
* @returns {Decoded<DecodedArray>} array of values
*/
export function readPlain(dataView, type, count, offset, utf8) {
if (count === 0) return { value: [], byteLength: 0 }
@ -323,7 +324,13 @@ function readBitPacked(dataView, offset, header, bitWidth, remaining) {
let count = (header >> 1) << 3
const mask = maskForBits(bitWidth)
let data = dataView.getUint8(offset)
// Sometimes it tries to read outside of available memory, but it will be masked out anyway
let data = 0
if (offset < dataView.byteLength) {
data = dataView.getUint8(offset)
} else if (mask) {
throw new Error(`parquet bitpack offset ${offset} out of range`)
}
let byteLength = 1
let left = 8
let right = 0

@ -52,7 +52,7 @@ export function parquetHeader(arrayBuffer, offset) {
encoding: header.field_8.field_4,
definition_levels_byte_length: header.field_8.field_5,
repetition_levels_byte_length: header.field_8.field_6,
is_compressed: header.field_8.field_7,
is_compressed: header.field_8.field_7 === undefined ? true : header.field_8.field_7, // default to true
statistics: header.field_8.field_8,
}

@ -138,7 +138,7 @@ async function readRowGroup(options, rowGroup) {
arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema
)
if (columnData.length !== Number(rowGroup.num_rows)) {
throw new Error('parquet column length does not match row group length')
throw new Error(`parquet column length ${columnData.length} does not match row group length ${rowGroup.num_rows}`)
}
// notify caller of column data
if (options.onChunk) options.onChunk({ column: columnIndex, data: columnData, rowStart: 0, rowEnd: columnData.length })

8
src/types.d.ts vendored

@ -212,3 +212,11 @@ interface DataPageHeaderV2 {
is_compressed?: boolean
statistics?: Statistics
}
/** Values decoded from a parquet page: a plain array or raw bytes. */
type DecodedArray = any[] | Uint8Array
/** Decoded contents of a parquet data page. */
interface DataPage {
  // one level per row when the column is optional; undefined otherwise
  definitionLevels: number[] | undefined
  // NOTE(review): appears to be empty for non-nested columns — confirm against readRepetitionLevels
  repetitionLevels: number[]
  // decoded values for the page
  value: any[]
}

@ -4,20 +4,26 @@ import { snappyUncompress } from '../src/snappy.js'
describe('snappy uncompress', () => {
it('decompresses valid input correctly', async () => {
const testCases = [
{ compressed: new Uint8Array([0x00]), expected: '' },
{ compressed: new Uint8Array([0x01, 0x00, 0x68]), expected: 'h' },
{ compressed: new Uint8Array([0x02, 0x04, 0x68, 0x79]), expected: 'hy' },
{ compressed: new Uint8Array([0x03, 0x08, 0x68, 0x79, 0x70]), expected: 'hyp' },
{ compressed: new Uint8Array([0x05, 0x10, 0x68, 0x79, 0x70, 0x65, 0x72]), expected: 'hyper' },
{ compressed: new Uint8Array([0x0a, 0x24, 0x68, 0x79, 0x70, 0x65, 0x72, 0x70, 0x61, 0x72, 0x61, 0x6d]), expected: 'hyperparam' },
{ compressed: new Uint8Array([0x15, 0x08, 0x68, 0x79, 0x70, 0x46, 0x03, 0x00]), expected: 'hyphyphyphyphyphyphyp' },
{ compressed: [0x00], expected: '' },
{ compressed: [0x01, 0x00, 0x68], expected: 'h' },
{ compressed: [0x02, 0x04, 0x68, 0x79], expected: 'hy' },
{ compressed: [0x03, 0x08, 0x68, 0x79, 0x70], expected: 'hyp' },
{ compressed: [0x05, 0x10, 0x68, 0x79, 0x70, 0x65, 0x72], expected: 'hyper' },
{
compressed: [0x0a, 0x24, 0x68, 0x79, 0x70, 0x65, 0x72, 0x70, 0x61, 0x72, 0x61, 0x6d],
expected: 'hyperparam',
},
{
compressed: [0x15, 0x08, 0x68, 0x79, 0x70, 0x46, 0x03, 0x00],
expected: 'hyphyphyphyphyphyphyp',
},
{
// from rowgroups.parquet
compressed: new Uint8Array([
compressed: [
80, 4, 1, 0, 9, 1, 0, 2, 9, 7, 4, 0, 3, 13, 8, 0, 4, 13, 8, 0, 5, 13,
8, 0, 6, 13, 8, 0, 7, 13, 8, 0, 8, 13, 8, 60, 9, 0, 0, 0, 0, 0, 0, 0,
10, 0, 0, 0, 0, 0, 0, 0,
]),
],
expected: new Uint8Array([
1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0,
0, 4, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0,
@ -25,16 +31,19 @@ describe('snappy uncompress', () => {
0, 0, 0, 10, 0, 0, 0, 0, 0, 0, 0,
]),
},
// from datapage_v2.snappy.parquet
{ compressed: [2, 4, 0, 3], expected: new Uint8Array([0, 3]) },
{ compressed: [ 6, 20, 2, 0, 0, 0, 3, 23], expected: new Uint8Array([2, 0, 0, 0, 3, 23]) },
]
const futures = testCases.map(async ({ compressed, expected }) => {
const outputArray = new Uint8Array(expected.length)
await snappyUncompress(compressed, outputArray)
const output = new Uint8Array(expected.length)
await snappyUncompress(new Uint8Array(compressed), output)
if (typeof expected === 'string') {
const outputStr = new TextDecoder().decode(outputArray)
const outputStr = new TextDecoder().decode(output)
expect(outputStr).toBe(expected)
} else {
expect(outputArray).toEqual(expected) // Uint8Array
expect(output).toEqual(expected) // Uint8Array
}
})
@ -42,16 +51,16 @@ describe('snappy uncompress', () => {
})
it('throws for invalid input', () => {
const outputArray = new Uint8Array(10)
expect(() => snappyUncompress(new Uint8Array([]), outputArray))
const output = new Uint8Array(10)
expect(() => snappyUncompress(new Uint8Array([]), output))
.toThrow('invalid snappy length header')
expect(() => snappyUncompress(new Uint8Array([0xff]), outputArray))
expect(() => snappyUncompress(new Uint8Array([0xff]), output))
.toThrow('invalid snappy length header')
expect(() => snappyUncompress(new Uint8Array([0x03, 0x61]), outputArray))
expect(() => snappyUncompress(new Uint8Array([0x03, 0x61]), output))
.toThrow('missing eof marker')
expect(() => snappyUncompress(new Uint8Array([0x03, 0xf1]), outputArray))
expect(() => snappyUncompress(new Uint8Array([0x03, 0xf1]), output))
.toThrow('missing eof marker')
expect(() => snappyUncompress(new Uint8Array([0x02, 0x00, 0x68]), outputArray))
expect(() => snappyUncompress(new Uint8Array([0x02, 0x00, 0x68]), output))
.toThrow('premature end of input')
})
})