mirror of
https://github.com/asadbek064/hyparquet.git
synced 2025-12-06 06:51:54 +00:00
Move deltaBinaryUnpack to delta.js and always consume miniblock
This commit is contained in:
parent
561f06f701
commit
48d79e6a1d
11
README.md
11
README.md
@ -174,6 +174,17 @@ parquetRead({ file, compressors: {
|
||||
}})
|
||||
```
|
||||
|
||||
Parquet encodings:
|
||||
- [X] PLAIN
|
||||
- [X] PLAIN_DICTIONARY
|
||||
- [X] RLE_DICTIONARY
|
||||
- [X] RLE
|
||||
- [X] BIT_PACKED
|
||||
- [X] DELTA_BINARY_PACKED
|
||||
- [ ] DELTA_BYTE_ARRAY
|
||||
- [ ] DELTA_LENGTH_BYTE_ARRAY
|
||||
- [ ] BYTE_STREAM_SPLIT
|
||||
|
||||
## Hysnappy
|
||||
|
||||
The most common compression codec used in parquet is snappy compression.
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
import { decompressPage } from './column.js'
|
||||
import { deltaBinaryUnpack } from './delta.js'
|
||||
import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
|
||||
import { readPlain } from './plain.js'
|
||||
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
|
||||
import { readVarInt, readZigZagBigInt } from './thrift.js'
|
||||
|
||||
/**
|
||||
* Read a data page from the given Uint8Array.
|
||||
@ -66,7 +66,7 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
|
||||
} else if (daph2.encoding === 'DELTA_BINARY_PACKED') {
|
||||
const int32 = columnMetadata.type === 'INT32'
|
||||
dataPage = int32 ? new Int32Array(nValues) : new BigInt64Array(nValues)
|
||||
deltaBinaryUnpack(page, nValues, dataPage)
|
||||
deltaBinaryUnpack(pageReader, nValues, dataPage)
|
||||
} else {
|
||||
throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
|
||||
}
|
||||
@ -109,60 +109,3 @@ function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) {
|
||||
return values
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Decode a DELTA_BINARY_PACKED page into a typed array.
 *
 * The page starts with a header (block size, miniblocks per block, total
 * value count, first value), followed by blocks. Each block holds a zigzag
 * min delta, one bit-width byte per miniblock, and the bit-packed deltas.
 * Decoding is bounded by nValues, not by the count stored in the header.
 *
 * @param {Uint8Array} page page data
 * @param {number} nValues number of values to read
 * @param {Int32Array | BigInt64Array} values array to write to
 */
function deltaBinaryUnpack(page, nValues, values) {
  const int32 = values instanceof Int32Array
  const view = new DataView(page.buffer, page.byteOffset, page.byteLength)
  const reader = { view, offset: 0 }
  const blockSize = readVarInt(reader)
  const miniblockPerBlock = readVarInt(reader)
  readVarInt(reader) // total value count from the header; nValues bounds the decode
  let value = readZigZagBigInt(reader) // first value
  let valueIndex = 0
  values[valueIndex++] = int32 ? Number(value) : value

  const valuesPerMiniblock = blockSize / miniblockPerBlock

  while (valueIndex < nValues) {
    // new block
    const minDelta = readZigZagBigInt(reader)
    const bitWidths = new Uint8Array(miniblockPerBlock)
    for (let i = 0; i < miniblockPerBlock; i++) {
      bitWidths[i] = page[reader.offset++]
    }

    // stop as soon as nValues is reached so trailing miniblocks are never decoded
    for (let i = 0; i < miniblockPerBlock && valueIndex < nValues; i++) {
      const bitWidth = BigInt(bitWidths[i])
      if (bitWidth) {
        const mask = (1n << bitWidth) - 1n
        let bitpackPos = 0n // bit position inside the byte at reader.offset
        for (let j = 0; j < valuesPerMiniblock && valueIndex < nValues; j++) {
          let bits = BigInt(view.getUint8(reader.offset)) >> bitpackPos & mask
          bitpackPos += bitWidth
          while (bitpackPos >= 8n) {
            bitpackPos -= 8n
            reader.offset++
            // only touch the next byte when the value really spills into it,
            // otherwise a value ending on the last byte would read past the page
            if (bitpackPos) {
              bits |= BigInt(view.getUint8(reader.offset)) << bitWidth - bitpackPos & mask
            }
          }
          value += minDelta + bits
          values[valueIndex++] = int32 ? Number(value) : value
        }
      } else {
        // zero bit width: every delta in this miniblock equals minDelta
        for (let j = 0; j < valuesPerMiniblock && valueIndex < nValues; j++) {
          value += minDelta
          values[valueIndex++] = int32 ? Number(value) : value
        }
      }
    }
  }
}
|
||||
|
||||
62
src/delta.js
Normal file
62
src/delta.js
Normal file
@ -0,0 +1,62 @@
|
||||
import { readVarInt, readZigZagBigInt } from './thrift.js'
|
||||
|
||||
/**
 * Decode DELTA_BINARY_PACKED values from a reader into a typed array.
 *
 * Reads the header (block size, miniblocks per block, total value count,
 * first value) and then decodes blocks of bit-packed deltas until nValues
 * values have been written. When decoding stops part-way through a
 * bit-packed miniblock, the unread remainder of that miniblock is skipped
 * so reader.offset ends up after it.
 *
 * @typedef {import('./types.d.ts').DataReader} DataReader
 * @param {DataReader} reader
 * @param {number} nValues number of values to read
 * @param {Int32Array | BigInt64Array} output output array
 * @throws {Error} if deltas must be decoded but the header does not describe
 *   a positive whole number of values per miniblock
 */
export function deltaBinaryUnpack(reader, nValues, output) {
  const int32 = output instanceof Int32Array
  const blockSize = readVarInt(reader)
  const miniblockPerBlock = readVarInt(reader)
  readVarInt(reader) // assert(count === nValues)
  let value = readZigZagBigInt(reader) // first value
  let outputIndex = 0
  output[outputIndex++] = int32 ? Number(value) : value

  const valuesPerMiniblock = blockSize / miniblockPerBlock

  // Only validate when blocks actually need decoding. A zero miniblock count
  // or a non-dividing block size would otherwise yield Infinity/NaN/fractions
  // and decode garbage until the reader runs off the buffer.
  if (outputIndex < nValues && !(Number.isInteger(valuesPerMiniblock) && valuesPerMiniblock > 0)) {
    throw new Error(`parquet invalid delta binary packed header: block size ${blockSize}, miniblocks per block ${miniblockPerBlock}`)
  }

  while (outputIndex < nValues) {
    // new block
    const minDelta = readZigZagBigInt(reader)
    const bitWidths = new Uint8Array(miniblockPerBlock)
    for (let i = 0; i < miniblockPerBlock; i++) {
      bitWidths[i] = reader.view.getUint8(reader.offset++)
    }

    for (let i = 0; i < miniblockPerBlock && outputIndex < nValues; i++) {
      // new miniblock
      const bitWidth = BigInt(bitWidths[i])
      if (bitWidth) {
        const mask = (1n << bitWidth) - 1n
        let bitpackPos = 0n // bit position inside the byte at reader.offset
        let remaining = valuesPerMiniblock
        while (remaining && outputIndex < nValues) {
          let bits = BigInt(reader.view.getUint8(reader.offset)) >> bitpackPos & mask // TODO: don't re-read value every time
          bitpackPos += bitWidth
          while (bitpackPos >= 8n) {
            bitpackPos -= 8n
            reader.offset++
            // only read the next byte when the value spills into it
            if (bitpackPos) {
              bits |= BigInt(reader.view.getUint8(reader.offset)) << bitWidth - bitpackPos & mask
            }
          }
          value += minDelta + bits
          output[outputIndex++] = int32 ? Number(value) : value
          remaining--
        }
        if (remaining) {
          // consume leftover miniblock: bits still unread plus the bits
          // already used in the current byte, rounded up to whole bytes
          reader.offset += Math.ceil((remaining * Number(bitWidth) + Number(bitpackPos)) / 8)
        }
      } else {
        // zero bit width: every delta in this miniblock equals minDelta
        for (let j = 0; j < valuesPerMiniblock && outputIndex < nValues; j++) {
          value += minDelta
          output[outputIndex++] = int32 ? Number(value) : value
        }
      }
    }
  }
}
|
||||
Loading…
Reference in New Issue
Block a user