diff --git a/README.md b/README.md index fbf99d7..cf554b6 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,17 @@ parquetRead({ file, compressors: { }}) ``` +Parquet encodings: + - [X] PLAIN + - [X] PLAIN_DICTIONARY + - [X] RLE_DICTIONARY + - [X] RLE + - [X] BIT_PACKED + - [X] DELTA_BINARY_PACKED + - [ ] DELTA_BYTE_ARRAY + - [ ] DELTA_LENGTH_BYTE_ARRAY + - [ ] BYTE_STREAM_SPLIT + ## Hysnappy The most common compression codec used in parquet is snappy compression. diff --git a/src/datapageV2.js b/src/datapageV2.js index 7e1f252..9c652aa 100644 --- a/src/datapageV2.js +++ b/src/datapageV2.js @@ -1,8 +1,8 @@ import { decompressPage } from './column.js' +import { deltaBinaryUnpack } from './delta.js' import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js' import { readPlain } from './plain.js' import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' -import { readVarInt, readZigZagBigInt } from './thrift.js' /** * Read a data page from the given Uint8Array. @@ -66,7 +66,7 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata, } else if (daph2.encoding === 'DELTA_BINARY_PACKED') { const int32 = columnMetadata.type === 'INT32' dataPage = int32 ? new Int32Array(nValues) : new BigInt64Array(nValues) - deltaBinaryUnpack(page, nValues, dataPage) + deltaBinaryUnpack(pageReader, nValues, dataPage) } else { throw new Error(`parquet unsupported encoding: ${daph2.encoding}`) } @@ -109,60 +109,3 @@ function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) { return values } } - -/** - * @param {Uint8Array} page page data - * @param {number} nValues number of values to read - * @param {Int32Array | BigInt64Array} values array to write to - */ -function deltaBinaryUnpack(page, nValues, values) { - const int32 = values instanceof Int32Array - const view = new DataView(page.buffer, page.byteOffset, page.byteLength) - const reader = { view, offset: 0 } - const blockSize = readVarInt(reader) - const miniblockPerBlock = readVarInt(reader) - let count = readVarInt(reader) - let value = readZigZagBigInt(reader) // first value - let valueIndex = 0 - values[valueIndex++] = int32 ? Number(value) : value - - const valuesPerMiniblock = blockSize / miniblockPerBlock - - while (valueIndex < nValues) { - const minDelta = readZigZagBigInt(reader) - const bitWidths = new Uint8Array(miniblockPerBlock) - for (let i = 0; i < miniblockPerBlock; i++) { - bitWidths[i] = page[reader.offset++] - } - - for (let i = 0; i < miniblockPerBlock; i++) { - let miniblockCount = Math.min(count, valuesPerMiniblock) - const bitWidth = BigInt(bitWidths[i]) - if (bitWidth) { - if (count > 1) { - const mask = (1n << bitWidth) - 1n - let bitpackPos = 0n - while (count && miniblockCount) { - let bits = BigInt(view.getUint8(reader.offset)) >> bitpackPos & mask // TODO: don't re-read value every time - bitpackPos += bitWidth - while (bitpackPos >= 8) { - bitpackPos -= 8n - reader.offset++ - bits |= BigInt(view.getUint8(reader.offset)) << bitWidth - bitpackPos & mask - } - const delta = minDelta + bits - value += delta - values[valueIndex++] = int32 ? Number(value) : value - count-- - miniblockCount-- - } - } - } else { - for (let j = 0; j < valuesPerMiniblock && valueIndex < nValues; j++) { - value += minDelta - values[valueIndex++] = int32 ? Number(value) : value - } - } - } - } -} diff --git a/src/delta.js b/src/delta.js new file mode 100644 index 0000000..828880a --- /dev/null +++ b/src/delta.js @@ -0,0 +1,62 @@ +import { readVarInt, readZigZagBigInt } from './thrift.js' + +/** + * @typedef {import('./types.d.ts').DataReader} DataReader + * @param {DataReader} reader + * @param {number} nValues number of values to read + * @param {Int32Array | BigInt64Array} output output array + */ +export function deltaBinaryUnpack(reader, nValues, output) { + const int32 = output instanceof Int32Array + const blockSize = readVarInt(reader) + const miniblockPerBlock = readVarInt(reader) + readVarInt(reader) // assert(count === nValues) + let value = readZigZagBigInt(reader) // first value + let outputIndex = 0 + output[outputIndex++] = int32 ? Number(value) : value + + const valuesPerMiniblock = blockSize / miniblockPerBlock + + while (outputIndex < nValues) { + // new block + const minDelta = readZigZagBigInt(reader) + const bitWidths = new Uint8Array(miniblockPerBlock) + for (let i = 0; i < miniblockPerBlock; i++) { + bitWidths[i] = reader.view.getUint8(reader.offset++) + } + + for (let i = 0; i < miniblockPerBlock && outputIndex < nValues; i++) { + // new miniblock + const bitWidth = BigInt(bitWidths[i]) + if (bitWidth) { + let bitpackPos = 0n + let miniblockCount = valuesPerMiniblock + const mask = (1n << bitWidth) - 1n + while (miniblockCount && outputIndex < nValues) { + let bits = BigInt(reader.view.getUint8(reader.offset)) >> bitpackPos & mask // TODO: don't re-read value every time + bitpackPos += bitWidth + while (bitpackPos >= 8) { + bitpackPos -= 8n + reader.offset++ + if (bitpackPos) { + bits |= BigInt(reader.view.getUint8(reader.offset)) << bitWidth - bitpackPos & mask + } + } + const delta = minDelta + bits + value += delta + output[outputIndex++] = int32 ? Number(value) : value + miniblockCount-- + } + if (miniblockCount) { + // consume leftover miniblock + reader.offset += Math.ceil((miniblockCount * Number(bitWidth) + Number(bitpackPos)) / 8) + } + } else { + for (let j = 0; j < valuesPerMiniblock && outputIndex < nValues; j++) { + value += minDelta + output[outputIndex++] = int32 ? Number(value) : value + } + } + } + } +}