diff --git a/.eslintrc.json b/.eslintrc.json
index 14b410c..884b158 100644
--- a/.eslintrc.json
+++ b/.eslintrc.json
@@ -42,6 +42,9 @@
     "no-extra-parens": "error",
     "no-multi-spaces": "error",
     "no-trailing-spaces": "error",
+    "no-useless-concat": "error",
+    "no-useless-rename": "error",
+    "no-useless-return": "error",
     "no-var": "error",
     "object-curly-spacing": ["error", "always"],
     "prefer-const": "error",
diff --git a/package.json b/package.json
index b0e7547..be5cc93 100644
--- a/package.json
+++ b/package.json
@@ -34,7 +34,7 @@
     "eslint-plugin-import": "2.29.1",
     "eslint-plugin-jsdoc": "48.2.5",
     "http-server": "14.1.1",
-    "hyparquet-compressors": "0.1.2",
+    "hyparquet-compressors": "0.1.3",
     "typescript": "5.4.5",
     "vitest": "1.6.0"
   }
diff --git a/src/assemble.js b/src/assemble.js
index fd29dac..8c70148 100644
--- a/src/assemble.js
+++ b/src/assemble.js
@@ -162,7 +162,6 @@ export function assembleNested(subcolumnData, schema, depth = 0) {
     const inverted = invertStruct(struct, invertDepth)
     if (optional) flattenAtDepth(inverted, depth)
     subcolumnData.set(path, inverted)
-    return
   }
   // assert(schema.element.repetition_type !== 'REPEATED')
 }
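The `src/assemble.js` change above is fallout from the new `no-useless-return` rule: a bare `return` as the last statement of a function or branch changes nothing. A minimal sketch of what the rule flags, using a hypothetical function rather than code from this repo:

```js
// Flagged by no-useless-return: control falls off the end either way.
function logValue(value) {
  console.log(value)
  return // <-- useless
}

// After the rule's autofix:
function logValueFixed(value) {
  console.log(value)
}
```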
diff --git a/src/datapage.js b/src/datapage.js
index 93a95a1..03cf1af 100644
--- a/src/datapage.js
+++ b/src/datapage.js
@@ -1,6 +1,6 @@
-import { byteStreamSplit, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
+import { bitWidth, byteStreamSplit, readRleBitPackedHybrid } from './encoding.js'
 import { readPlain } from './plain.js'
-import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired } from './schema.js'
+import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
 
 /**
  * Read a data page from the given Uint8Array.
@@ -69,8 +69,6 @@ export function readDictionaryPage(bytes, diph, columnMetadata, typeLength) {
 }
 
 /**
- * Read the repetition levels from this page, if any.
- *
  * @typedef {import("./types.d.ts").DataReader} DataReader
  * @param {DataReader} reader data view for the page
  * @param {DataPageHeader} daph data page header
@@ -81,9 +79,8 @@ function readRepetitionLevels(reader, daph, schemaPath) {
   if (schemaPath.length > 1) {
     const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
     if (maxRepetitionLevel) {
-      const bitWidth = widthFromMaxInt(maxRepetitionLevel)
       const values = new Array(daph.num_values)
-      readRleBitPackedHybrid(reader, bitWidth, 0, values)
+      readRleBitPackedHybrid(reader, bitWidth(maxRepetitionLevel), 0, values)
       return values
     }
   }
@@ -91,33 +88,24 @@
 }
 
 /**
- * Read the definition levels from this page, if any.
- *
  * @param {DataReader} reader data view for the page
 * @param {DataPageHeader} daph data page header
 * @param {SchemaTree[]} schemaPath
 * @returns {{ definitionLevels: number[], numNulls: number }} definition levels
 */
 function readDefinitionLevels(reader, daph, schemaPath) {
-  if (!isRequired(schemaPath)) {
-    const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
-    const bitWidth = widthFromMaxInt(maxDefinitionLevel)
-    if (bitWidth) {
-      // num_values is index 1 for either type of page header
-      const definitionLevels = new Array(daph.num_values)
-      readRleBitPackedHybrid(reader, bitWidth, 0, definitionLevels)
+  const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
+  if (!maxDefinitionLevel) return { definitionLevels: [], numNulls: 0 }
 
-      // count nulls
-      let numNulls = daph.num_values
-      for (const def of definitionLevels) {
-        if (def === maxDefinitionLevel) numNulls--
-      }
-      if (numNulls === 0) {
-        definitionLevels.length = 0
-      }
+  const definitionLevels = new Array(daph.num_values)
+  readRleBitPackedHybrid(reader, bitWidth(maxDefinitionLevel), 0, definitionLevels)
 
-      return { definitionLevels, numNulls }
-    }
+  // count nulls
+  let numNulls = daph.num_values
+  for (const def of definitionLevels) {
+    if (def === maxDefinitionLevel) numNulls--
   }
-  return { definitionLevels: [], numNulls: 0 }
+  if (numNulls === 0) definitionLevels.length = 0
+
+  return { definitionLevels, numNulls }
 }
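Some context for the rewrite above: in a Parquet data page, a value is present (non-null) exactly when its definition level equals the column's maximum definition level, and a max level of 0 means the column can hold no nulls at all. That invariant is what lets the early return replace the old `isRequired` check, and what the single null-counting pass relies on. A standalone sketch of the counting step, with decoded levels passed in directly:

```js
// Sketch: count nulls from already-decoded definition levels.
// A value is non-null only where level === maxDefinitionLevel.
function countNulls(definitionLevels, maxDefinitionLevel) {
  let numNulls = 0
  for (const level of definitionLevels) {
    if (level !== maxDefinitionLevel) numNulls++
  }
  return numNulls
}

countNulls([1, 0, 1, 1, 0], 1) // 2, for an OPTIONAL column (max level 1)
```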
diff --git a/src/datapageV2.js b/src/datapageV2.js
index 7a008d0..ff4c0a6 100644
--- a/src/datapageV2.js
+++ b/src/datapageV2.js
@@ -1,6 +1,6 @@
 import { decompressPage } from './column.js'
 import { deltaBinaryUnpack, deltaByteArray, deltaLengthByteArray } from './delta.js'
-import { byteStreamSplit, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
+import { bitWidth, byteStreamSplit, readRleBitPackedHybrid } from './encoding.js'
 import { readPlain } from './plain.js'
 import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
 
@@ -31,8 +31,7 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
   // assert(reader.offset === daph2.repetition_levels_byte_length)
 
   // definition levels
-  const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
-  const definitionLevels = readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel)
+  const definitionLevels = readDefinitionLevelsV2(reader, daph2, schemaPath)
   // assert(reader.offset === daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length)
 
   const uncompressedPageSize = ph.uncompressed_page_size - daph2.definition_levels_byte_length - daph2.repetition_levels_byte_length
@@ -92,11 +91,9 @@ export function readRepetitionLevelsV2(reader, daph2, schemaPath) {
   const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
   if (!maxRepetitionLevel) return []
 
-  const bitWidth = widthFromMaxInt(maxRepetitionLevel)
-  // num_values is index 1 for either type of page header
   const values = new Array(daph2.num_values)
   readRleBitPackedHybrid(
-    reader, bitWidth, daph2.repetition_levels_byte_length, values
+    reader, bitWidth(maxRepetitionLevel), daph2.repetition_levels_byte_length, values
   )
   return values
 }
@@ -104,15 +101,15 @@
 /**
  * @param {DataReader} reader
  * @param {DataPageHeaderV2} daph2 data page header v2
- * @param {number} maxDefinitionLevel
+ * @param {SchemaTree[]} schemaPath
  * @returns {number[] | undefined} definition levels
 */
-function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) {
+function readDefinitionLevelsV2(reader, daph2, schemaPath) {
+  const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
   if (maxDefinitionLevel) {
-    // not the same as V1, because we know the length
-    const bitWidth = widthFromMaxInt(maxDefinitionLevel)
+    // unlike V1, the levels length is known up front
    const values = new Array(daph2.num_values)
-    readRleBitPackedHybrid(reader, bitWidth, daph2.definition_levels_byte_length, values)
+    readRleBitPackedHybrid(reader, bitWidth(maxDefinitionLevel), daph2.definition_levels_byte_length, values)
     return values
   }
 }
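The V1/V2 split visible above comes down to the `length` argument: a V2 page header records `repetition_levels_byte_length` and `definition_levels_byte_length`, so the levels can be decoded without any prefix, while a V1 page stores the RLE/bit-packed levels behind a 4-byte little-endian length. That is why `datapage.js` passes `0` and `datapageV2.js` passes the header field. A sketch of how a reader might resolve that convention, assuming the same `{ view, offset }` reader shape used above:

```js
// Sketch: resolve the encoded-levels byte length for a page.
// V1 pages pass length = 0, meaning an int32 prefix holds the length;
// V2 pages pass the byte length straight from the page header.
function resolveLevelsLength(reader, length) {
  if (!length) {
    length = reader.view.getInt32(reader.offset, true) // little-endian
    reader.offset += 4
  }
  return length
}
```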
diff --git a/src/delta.js b/src/delta.js
index 7424baa..6b07395 100644
--- a/src/delta.js
+++ b/src/delta.js
@@ -3,21 +3,21 @@ import { readVarInt, readZigZagBigInt } from './thrift.js'
 /**
  * @typedef {import('./types.d.ts').DataReader} DataReader
  * @param {DataReader} reader
- * @param {number} nValues number of values to read
- * @param {Int32Array | BigInt64Array} output output array
+ * @param {number} count number of values to read
+ * @param {Int32Array | BigInt64Array} output
 */
-export function deltaBinaryUnpack(reader, nValues, output) {
+export function deltaBinaryUnpack(reader, count, output) {
   const int32 = output instanceof Int32Array
   const blockSize = readVarInt(reader)
   const miniblockPerBlock = readVarInt(reader)
-  readVarInt(reader) // assert(count === nValues)
+  readVarInt(reader) // total value count (asserted === count)
   let value = readZigZagBigInt(reader) // first value
   let outputIndex = 0
   output[outputIndex++] = int32 ? Number(value) : value
 
   const valuesPerMiniblock = blockSize / miniblockPerBlock
 
-  while (outputIndex < nValues) {
+  while (outputIndex < count) {
     // new block
     const minDelta = readZigZagBigInt(reader)
     const bitWidths = new Uint8Array(miniblockPerBlock)
@@ -25,14 +25,14 @@
       bitWidths[i] = reader.view.getUint8(reader.offset++)
     }
 
-    for (let i = 0; i < miniblockPerBlock && outputIndex < nValues; i++) {
+    for (let i = 0; i < miniblockPerBlock && outputIndex < count; i++) {
       // new miniblock
       const bitWidth = BigInt(bitWidths[i])
       if (bitWidth) {
         let bitpackPos = 0n
         let miniblockCount = valuesPerMiniblock
         const mask = (1n << bitWidth) - 1n
-        while (miniblockCount && outputIndex < nValues) {
+        while (miniblockCount && outputIndex < count) {
           let bits = BigInt(reader.view.getUint8(reader.offset)) >> bitpackPos & mask // TODO: don't re-read value every time
           bitpackPos += bitWidth
           while (bitpackPos >= 8) {
@@ -52,7 +52,7 @@
           reader.offset += Math.ceil((miniblockCount * Number(bitWidth) + Number(bitpackPos)) / 8)
         }
       } else {
-        for (let j = 0; j < valuesPerMiniblock && outputIndex < nValues; j++) {
+        for (let j = 0; j < valuesPerMiniblock && outputIndex < count; j++) {
           value += minDelta
           output[outputIndex++] = int32 ? Number(value) : value
         }
@@ -63,13 +63,13 @@
 
 /**
  * @param {DataReader} reader
- * @param {number} nValues
+ * @param {number} count
  * @param {Uint8Array[]} output
 */
-export function deltaLengthByteArray(reader, nValues, output) {
-  const lengths = new Int32Array(nValues)
-  deltaBinaryUnpack(reader, nValues, lengths)
-  for (let i = 0; i < nValues; i++) {
+export function deltaLengthByteArray(reader, count, output) {
+  const lengths = new Int32Array(count)
+  deltaBinaryUnpack(reader, count, lengths)
+  for (let i = 0; i < count; i++) {
     output[i] = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, lengths[i])
     reader.offset += lengths[i]
   }
@@ -77,16 +77,16 @@
 
 /**
  * @param {DataReader} reader
- * @param {number} nValues
+ * @param {number} count
  * @param {Uint8Array[]} output
 */
-export function deltaByteArray(reader, nValues, output) {
-  const prefixData = new Int32Array(nValues)
-  deltaBinaryUnpack(reader, nValues, prefixData)
-  const suffixData = new Int32Array(nValues)
-  deltaBinaryUnpack(reader, nValues, suffixData)
+export function deltaByteArray(reader, count, output) {
+  const prefixData = new Int32Array(count)
+  deltaBinaryUnpack(reader, count, prefixData)
+  const suffixData = new Int32Array(count)
+  deltaBinaryUnpack(reader, count, suffixData)
 
-  for (let i = 0; i < nValues; i++) {
+  for (let i = 0; i < count; i++) {
     const suffix = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, suffixData[i])
     if (prefixData[i]) {
       // copy from previous value
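For background on the header fields parsed above: DELTA_BINARY_PACKED begins with varint block size, varint miniblocks-per-block, varint total value count, then a zigzag-varint first value; each block then stores a zigzag min delta and per-miniblock bit widths. Zigzag encoding is what keeps small negative deltas small: 0 maps to 0, -1 to 1, 1 to 2, -2 to 3, and so on. A self-contained BigInt sketch of that mapping (mirroring `readZigZagBigInt`, not imported from it):

```js
// Zigzag interleaves negative and non-negative integers so that
// values of small magnitude encode to small unsigned varints.
function zigzagEncode(n) {
  return n >= 0n ? n << 1n : (-n << 1n) - 1n
}
function zigzagDecode(u) {
  return u & 1n ? -((u + 1n) >> 1n) : u >> 1n
}

zigzagEncode(-2n) // 3n
zigzagDecode(3n)  // -2n
```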
diff --git a/src/encoding.js b/src/encoding.js
index 82d328f..1a1acdd 100644
--- a/src/encoding.js
+++ b/src/encoding.js
@@ -6,18 +6,18 @@ import { readVarInt } from './thrift.js'
  * @param {number} value
  * @returns {number}
 */
-export function widthFromMaxInt(value) {
-  return Math.ceil(Math.log2(value + 1))
+export function bitWidth(value) {
+  return 32 - Math.clz32(value)
 }
 
 /**
  * Read values from a run-length encoded/bit-packed hybrid encoding.
  *
- * If length is zero, then read as int32 at the start of the encoded data.
+ * If length is zero, then read int32 length at the start.
  *
 * @typedef {import("./types.d.ts").DataReader} DataReader
 * @typedef {import("./types.d.ts").DecodedArray} DecodedArray
- * @param {DataReader} reader - buffer to read data from
+ * @param {DataReader} reader
 * @param {number} width - width of each bit-packed group
 * @param {number} length - length of the encoded data
 * @param {DecodedArray} output
diff --git a/src/plain.js b/src/plain.js
index 4e61389..762e404 100644
--- a/src/plain.js
+++ b/src/plain.js
@@ -44,7 +44,7 @@ export function readPlain(reader, type, count, fixedLength) {
 function readPlainBoolean(reader, count) {
   const values = new Array(count)
   for (let i = 0; i < count; i++) {
-    const byteOffset = reader.offset + Math.floor(i / 8)
+    const byteOffset = reader.offset + (i / 8 | 0)
     const bitOffset = i % 8
     const byte = reader.view.getUint8(byteOffset)
     values[i] = (byte & 1 << bitOffset) !== 0
diff --git a/src/schema.js b/src/schema.js
index c6a89a7..c888d9f 100644
--- a/src/schema.js
+++ b/src/schema.js
@@ -45,21 +45,6 @@ export function getSchemaPath(schema, name) {
   return path
 }
 
-/**
- * Check if the schema path and all its ancestors are required.
- *
- * @param {SchemaTree[]} schemaPath
- * @returns {boolean} true if the element is required
- */
-export function isRequired(schemaPath) {
-  for (const { element } of schemaPath.slice(1)) {
-    if (element.repetition_type !== 'REQUIRED') {
-      return false
-    }
-  }
-  return true
-}
-
 /**
  * Get the max repetition level for a given schema path.
  *
diff --git a/test/encoding.test.js b/test/encoding.test.js
index 3c459e1..399de76 100644
--- a/test/encoding.test.js
+++ b/test/encoding.test.js
@@ -1,5 +1,5 @@
 import { describe, expect, it } from 'vitest'
-import { readRleBitPackedHybrid, widthFromMaxInt } from '../src/encoding.js'
+import { bitWidth, readRleBitPackedHybrid } from '../src/encoding.js'
 
 describe('readRleBitPackedHybrid', () => {
   it('reads RLE values with explicit length', () => {
@@ -100,14 +100,15 @@ describe('readRleBitPackedHybrid', () => {
   })
 })
 
-describe('widthFromMaxInt', () => {
+describe('bitWidth', () => {
   it('calculates bit widths', () => {
-    // Test a range of inputs and their expected outputs
-    expect(widthFromMaxInt(0)).toBe(0)
-    expect(widthFromMaxInt(1)).toBe(1)
-    expect(widthFromMaxInt(255)).toBe(8)
-    expect(widthFromMaxInt(256)).toBe(9)
-    expect(widthFromMaxInt(1023)).toBe(10)
-    expect(widthFromMaxInt(1048575)).toBe(20)
+    expect(bitWidth(0)).toBe(0)
+    expect(bitWidth(1)).toBe(1)
+    expect(bitWidth(7)).toBe(3)
+    expect(bitWidth(8)).toBe(4)
+    expect(bitWidth(255)).toBe(8)
+    expect(bitWidth(256)).toBe(9)
+    expect(bitWidth(1023)).toBe(10)
+    expect(bitWidth(1048575)).toBe(20)
   })
 })
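On the rename tested above: `bitWidth(value)` via `Math.clz32` (the count of leading zero bits in the 32-bit representation) returns exactly the same widths as the old `Math.ceil(Math.log2(value + 1))`, but stays in integer arithmetic instead of floating point. A quick equivalence check, separate from the test suite:

```js
// 32 - clz32(v) is the number of bits needed to represent v,
// which equals ceil(log2(v + 1)) for any unsigned 32-bit integer.
const oldWidth = v => Math.ceil(Math.log2(v + 1))
const newWidth = v => 32 - Math.clz32(v)

for (const v of [0, 1, 7, 8, 255, 256, 1023, 1048575, 2 ** 31 - 1]) {
  console.assert(oldWidth(v) === newWidth(v), `mismatch at ${v}`)
}
```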
diff --git a/test/schema.test.js b/test/schema.test.js
index 071a403..c22e213 100644
--- a/test/schema.test.js
+++ b/test/schema.test.js
@@ -5,7 +5,6 @@ import {
   getSchemaPath,
   isListLike,
   isMapLike,
-  isRequired,
 } from '../src/schema.js'
 
 describe('Parquet schema utils', () => {
@@ -14,71 +13,98 @@
   /**
    * @type {SchemaElement[]}
    */
   const schema = [
-    { name: 'root', num_children: 3 },
-    { name: 'child1', repetition_type: 'OPTIONAL' },
-    { name: 'child2', repetition_type: 'OPTIONAL', num_children: 1, converted_type: 'LIST' },
+    { name: 'root', num_children: 7 },
+    { name: 'flat', repetition_type: 'OPTIONAL' },
+    { name: 'listy', repetition_type: 'OPTIONAL', num_children: 1, converted_type: 'LIST' },
     { name: 'list', repetition_type: 'REPEATED', num_children: 1 },
     { name: 'element', repetition_type: 'REQUIRED' },
-    { name: 'child3', repetition_type: 'OPTIONAL', num_children: 1, converted_type: 'MAP' },
+    { name: 'mappy', repetition_type: 'OPTIONAL', num_children: 1, converted_type: 'MAP' },
     { name: 'map', repetition_type: 'REPEATED', num_children: 2 },
     { name: 'key', repetition_type: 'REQUIRED' },
     { name: 'value', repetition_type: 'OPTIONAL' },
+    { name: 'invalid_list', repetition_type: 'OPTIONAL', num_children: 2, converted_type: 'LIST' },
+    { name: 'list1', repetition_type: 'REPEATED' },
+    { name: 'list2', repetition_type: 'REPEATED' },
+    { name: 'structy', repetition_type: 'OPTIONAL', num_children: 2, converted_type: 'LIST' },
+    { name: 'element1', repetition_type: 'REQUIRED' },
+    { name: 'element2', repetition_type: 'REQUIRED' },
+    { name: 'list_structy', repetition_type: 'OPTIONAL', num_children: 1, converted_type: 'LIST' },
+    { name: 'list', repetition_type: 'REPEATED', num_children: 2 },
+    { name: 'element1', repetition_type: 'REQUIRED' },
+    { name: 'element2', repetition_type: 'REQUIRED' },
+    { name: 'invalid_list', repetition_type: 'OPTIONAL', num_children: 1, converted_type: 'LIST' },
+    { name: 'list', repetition_type: 'OPTIONAL', num_children: 1 },
+    { name: 'element', repetition_type: 'OPTIONAL' },
   ]
 
   describe('getSchemaPath', () => {
-    it('should return the schema path', () => {
-      const path = getSchemaPath(schema, ['child1'])
-      expect(path[path.length - 1]).toEqual({
-        children: [],
-        count: 1,
-        element: { name: 'child1', repetition_type: 'OPTIONAL' },
-        path: ['child1'],
+    it('returns the root schema path', () => {
+      const root = getSchemaPath(schema, []).at(-1)
+      expect(root?.children.length).toEqual(7)
+      expect(root).toMatchObject({
+        count: 22,
+        element: { name: 'root', num_children: 7 },
+        path: [],
       })
     })
 
-    it('should throw an error if element not found', () => {
+    it('returns the schema path', () => {
+      expect(getSchemaPath(schema, ['flat']).at(-1)).toEqual({
+        children: [],
+        count: 1,
+        element: { name: 'flat', repetition_type: 'OPTIONAL' },
+        path: ['flat'],
+      })
+    })
+
+    it('throws an error if element not found', () => {
       expect(() => getSchemaPath(schema, ['nonexistent']))
         .toThrow('parquet schema element not found: nonexistent')
     })
   })
 
-  it('isRequired', () => {
-    expect(isRequired(getSchemaPath(schema, []))).toBe(true)
-    expect(isRequired(getSchemaPath(schema, ['child1']))).toBe(false)
-    expect(isRequired(getSchemaPath(schema, ['child2']))).toBe(false)
-    expect(isRequired(getSchemaPath(schema, ['child3']))).toBe(false)
-  })
-
   it('getMaxRepetitionLevel', () => {
-    expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child1']))).toBe(0)
-    expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child2']))).toBe(0)
-    expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child2', 'list', 'element']))).toBe(1)
-    expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child3']))).toBe(0)
-    expect(getMaxRepetitionLevel(getSchemaPath(schema, ['child3', 'map', 'key']))).toBe(1)
+    expect(getMaxRepetitionLevel(getSchemaPath(schema, ['flat']))).toBe(0)
+    expect(getMaxRepetitionLevel(getSchemaPath(schema, ['listy']))).toBe(0)
+    expect(getMaxRepetitionLevel(getSchemaPath(schema, ['listy', 'list', 'element']))).toBe(1)
+    expect(getMaxRepetitionLevel(getSchemaPath(schema, ['mappy']))).toBe(0)
+    expect(getMaxRepetitionLevel(getSchemaPath(schema, ['mappy', 'map', 'key']))).toBe(1)
   })
 
   it('getMaxDefinitionLevel', () => {
-    expect(getMaxDefinitionLevel(getSchemaPath(schema, ['child1']))).toBe(1)
-    expect(getMaxDefinitionLevel(getSchemaPath(schema, ['child2']))).toBe(1)
-    expect(getMaxDefinitionLevel(getSchemaPath(schema, ['child3']))).toBe(1)
+    expect(getMaxDefinitionLevel(getSchemaPath(schema, ['flat']))).toBe(1)
+    expect(getMaxDefinitionLevel(getSchemaPath(schema, ['listy']))).toBe(1)
+    expect(getMaxDefinitionLevel(getSchemaPath(schema, ['mappy']))).toBe(1)
   })
 
   it('isListLike', () => {
     expect(isListLike(getSchemaPath(schema, [])[1])).toBe(false)
-    expect(isListLike(getSchemaPath(schema, ['child1'])[1])).toBe(false)
-    expect(isListLike(getSchemaPath(schema, ['child2'])[1])).toBe(true)
-    expect(isListLike(getSchemaPath(schema, ['child2', 'list', 'element'])[1])).toBe(true)
-    expect(isListLike(getSchemaPath(schema, ['child3'])[1])).toBe(false)
-    expect(isListLike(getSchemaPath(schema, ['child3', 'map', 'key'])[1])).toBe(false)
+    expect(isListLike(getSchemaPath(schema, ['flat'])[1])).toBe(false)
+    expect(isListLike(getSchemaPath(schema, ['listy'])[1])).toBe(true)
+    expect(isListLike(getSchemaPath(schema, ['listy', 'list', 'element'])[1])).toBe(true)
+    expect(isListLike(getSchemaPath(schema, ['mappy'])[1])).toBe(false)
+    expect(isListLike(getSchemaPath(schema, ['mappy', 'map', 'key'])[1])).toBe(false)
+    expect(isListLike(getSchemaPath(schema, ['invalid_list'])[1])).toBe(false)
+    expect(isListLike(getSchemaPath(schema, ['invalid_list', 'list1'])[1])).toBe(false)
+    expect(isListLike(getSchemaPath(schema, ['invalid_list', 'list2'])[1])).toBe(false)
+    expect(isListLike(getSchemaPath(schema, ['structy'])[1])).toBe(false)
+    expect(isListLike(getSchemaPath(schema, ['list_structy'])[1])).toBe(false)
+    expect(isListLike(getSchemaPath(schema, ['invalid_list'])[1])).toBe(false)
   })
 
   it('isMapLike', () => {
     expect(isMapLike(getSchemaPath(schema, [])[1])).toBe(false)
-    expect(isMapLike(getSchemaPath(schema, ['child1'])[1])).toBe(false)
-    expect(isMapLike(getSchemaPath(schema, ['child2'])[1])).toBe(false)
-    expect(isMapLike(getSchemaPath(schema, ['child2', 'list', 'element'])[1])).toBe(false)
-    expect(isMapLike(getSchemaPath(schema, ['child3'])[1])).toBe(true)
-    expect(isMapLike(getSchemaPath(schema, ['child3', 'map', 'key'])[1])).toBe(true)
-    expect(isMapLike(getSchemaPath(schema, ['child3', 'map', 'value'])[1])).toBe(true)
+    expect(isMapLike(getSchemaPath(schema, ['flat'])[1])).toBe(false)
+    expect(isMapLike(getSchemaPath(schema, ['listy'])[1])).toBe(false)
+    expect(isMapLike(getSchemaPath(schema, ['listy', 'list', 'element'])[1])).toBe(false)
+    expect(isMapLike(getSchemaPath(schema, ['mappy'])[1])).toBe(true)
+    expect(isMapLike(getSchemaPath(schema, ['mappy', 'map', 'key'])[1])).toBe(true)
+    expect(isMapLike(getSchemaPath(schema, ['mappy', 'map', 'value'])[1])).toBe(true)
+    expect(isMapLike(getSchemaPath(schema, ['invalid_list'])[1])).toBe(false)
+    expect(isMapLike(getSchemaPath(schema, ['invalid_list', 'list1'])[1])).toBe(false)
+    expect(isMapLike(getSchemaPath(schema, ['invalid_list', 'list2'])[1])).toBe(false)
+    expect(isMapLike(getSchemaPath(schema, ['structy'])[1])).toBe(false)
+    expect(isMapLike(getSchemaPath(schema, ['list_structy'])[1])).toBe(false)
+    expect(isMapLike(getSchemaPath(schema, ['invalid_list'])[1])).toBe(false)
  })
 })
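A closing note on why `isRequired` could be deleted outright: along a schema path, the max repetition level counts REPEATED ancestors, and the max definition level counts ancestors that are not REQUIRED, so an all-required path is precisely one whose max definition level is 0; `readDefinitionLevels` now branches on `!maxDefinitionLevel` instead. A sketch of both walks (mirroring the shape of `src/schema.js`, not its exact code):

```js
// Sketch: derive level maxima from a schema path, skipping the root
// element (matching the getSchemaPath output tested above).
function maxLevels(schemaPath) {
  let maxRepetitionLevel = 0
  let maxDefinitionLevel = 0
  for (const { element } of schemaPath.slice(1)) {
    if (element.repetition_type === 'REPEATED') maxRepetitionLevel++
    if (element.repetition_type !== 'REQUIRED') maxDefinitionLevel++
  }
  return { maxRepetitionLevel, maxDefinitionLevel }
}
```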