diff --git a/src/column.js b/src/column.js
index cf95cab..ee83125 100644
--- a/src/column.js
+++ b/src/column.js
@@ -56,13 +56,13 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
     const page = decompressPage(
       compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
     )
-    const { definitionLevels, repetitionLevels, value: dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
+    const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
     valuesSeen += daph.num_values
 
     const dictionaryEncoding = daph.encoding === 'PLAIN_DICTIONARY' || daph.encoding === 'RLE_DICTIONARY'
 
     // construct output values: skip nulls and construct lists
-    /** @type {any[]} */
+    /** @type {DecodedArray} */
     let values
     if (repetitionLevels.length) {
       dereferenceDictionary(dictionary, dataPage)
@@ -109,7 +109,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
     const daph2 = header.data_page_header_v2
     if (!daph2) throw new Error('parquet data page header v2 is undefined')
 
-    const { definitionLevels, repetitionLevels, value: dataPage } = readDataPageV2(
+    const { definitionLevels, repetitionLevels, dataPage } = readDataPageV2(
      compressedBytes, header, schemaPath, columnMetadata, compressors
     )
     valuesSeen += daph2.num_values
@@ -145,8 +145,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
 /**
  * Map data to dictionary values in place.
  *
+ * @typedef {import('./types.js').DecodedArray} DecodedArray
  * @param {ArrayLike | undefined} dictionary
- * @param {number[]} dataPage
+ * @param {DecodedArray} dataPage
  */
 function dereferenceDictionary(dictionary, dataPage) {
   if (dictionary) {
diff --git a/src/convert.js b/src/convert.js
index 364d7de..7697fd8 100644
--- a/src/convert.js
+++ b/src/convert.js
@@ -1,13 +1,15 @@
-const dayMillis = 86400000000000 // 1 day in ms
+const dayMillis = 86400000000000 // 1 day in nanoseconds
 
 /**
  * Convert known types from primitive to rich.
  *
- * @param {any[]} data series of primitive types
+ * @typedef {import('./types.js').DecodedArray} DecodedArray
+ * @param {DecodedArray} data series of primitive types
  * @param {import('./types.js').SchemaElement} schemaElement schema element for the data
- * @returns {any[]} series of rich types
+ * @returns {DecodedArray} series of rich types
 */
 export function convert(data, schemaElement) {
+  if (!Array.isArray(data)) return data
   const ctype = schemaElement.converted_type
   if (ctype === 'UTF8') {
     const decoder = new TextDecoder()
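Note on the new `Array.isArray` guard: typed-array pages now bypass converted-type handling entirely. A minimal sketch of the resulting behavior (the schema-element literals are hypothetical, reduced to the one field `convert` reads):

```js
import { convert } from './src/convert.js'

// Plain arrays still get rich conversion, e.g. UTF8 byte arrays become strings:
convert([new TextEncoder().encode('hi')], { converted_type: 'UTF8' })
// => ['hi']

// Typed arrays short-circuit on the guard and come back untouched, so an
// INT32-backed converted type like DATE is returned as raw day counts:
convert(new Int32Array([19000]), { converted_type: 'DATE' })
// => Int32Array [19000]
```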
diff --git a/src/datapage.js b/src/datapage.js
index 1cbcf1e..af0d49c 100644
--- a/src/datapage.js
+++ b/src/datapage.js
@@ -1,17 +1,15 @@
 import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
 import { readPlain } from './plain.js'
-import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, skipDefinitionBytes } from './schema.js'
-
-const skipNulls = false // TODO
+import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired } from './schema.js'
 
 /**
  * Read a data page from the given Uint8Array.
  *
- * @typedef {{ definitionLevels: number[], numNulls: number }} DefinitionLevels
  * @typedef {import("./types.d.ts").DataPage} DataPage
  * @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
  * @typedef {import("./types.d.ts").DataPageHeader} DataPageHeader
  * @typedef {import("./types.d.ts").SchemaTree} SchemaTree
+ * @typedef {import("./types.d.ts").DecodedArray} DecodedArray
  * @param {Uint8Array} bytes raw page data (should already be decompressed)
  * @param {DataPageHeader} daph data page header
  * @param {SchemaTree[]} schemaPath
@@ -21,25 +19,14 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
   const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
   const reader = { view, offset: 0 }
-  /** @type {any[]} */
-  let values = []
+  /** @type {DecodedArray} */
+  let dataPage = []
 
   // repetition levels
   const repetitionLevels = readRepetitionLevels(reader, daph, schemaPath)
 
   // definition levels
-  let definitionLevels = undefined
-  let numNulls = 0
-  // let maxDefinitionLevel = -1
-  // TODO: move into readDefinitionLevels
-  if (skipNulls && !isRequired(schemaPath)) {
-    // skip_definition_bytes
-    reader.offset += skipDefinitionBytes(daph.num_values)
-  } else {
-    const dl = readDefinitionLevels(reader, daph, schemaPath)
-    definitionLevels = dl.definitionLevels
-    numNulls = dl.numNulls
-  }
+  const { definitionLevels, numNulls } = readDefinitionLevels(reader, daph, schemaPath)
 
   // read values based on encoding
   const nValues = daph.num_values - numNulls
 
@@ -47,7 +34,7 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
     const { element } = schemaPath[schemaPath.length - 1]
     const utf8 = element.converted_type === 'UTF8'
     const plainObj = readPlain(reader, columnMetadata.type, nValues, utf8)
-    values = Array.isArray(plainObj) ? plainObj : Array.from(plainObj)
+    dataPage = plainObj
   } else if (
     daph.encoding === 'PLAIN_DICTIONARY' ||
     daph.encoding === 'RLE_DICTIONARY' ||
@@ -63,17 +50,17 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
       reader.offset++
     }
     if (bitWidth) {
-      values = new Array(nValues)
-      readRleBitPackedHybrid(reader, bitWidth, view.byteLength - reader.offset, values)
+      dataPage = new Array(nValues)
+      readRleBitPackedHybrid(reader, bitWidth, view.byteLength - reader.offset, dataPage)
     } else {
       // nval zeros
-      values = new Array(nValues).fill(0)
+      dataPage = new Array(nValues).fill(0)
     }
   } else {
     throw new Error(`parquet unsupported encoding: ${daph.encoding}`)
   }
 
-  return { definitionLevels, repetitionLevels, value: values }
+  return { definitionLevels, repetitionLevels, dataPage }
 }
 
 /**
@@ -119,7 +106,7 @@ function readRepetitionLevels(reader, daph, schemaPath) {
  * @param {DataReader} reader data view for the page
  * @param {DataPageHeader} daph data page header
  * @param {SchemaTree[]} schemaPath
- * @returns {DefinitionLevels} definition levels and number of bytes read
+ * @returns {{ definitionLevels: number[], numNulls: number }} definition levels and null count
 */
 function readDefinitionLevels(reader, daph, schemaPath) {
   if (!isRequired(schemaPath)) {
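The removed `skipNulls` branch was dead code: the flag was hard-coded to `false`, so only the `else` arm could ever run. The collapse is therefore behavior-preserving, as this sketch of the old control flow (in the context of `readDataPage`) shows:

```js
// Old flow, with `const skipNulls = false` at module scope:
//
//   if (skipNulls && !isRequired(schemaPath)) {   // never true
//     reader.offset += skipDefinitionBytes(daph.num_values)
//   } else {
//     const dl = readDefinitionLevels(reader, daph, schemaPath)
//     definitionLevels = dl.definitionLevels
//     numNulls = dl.numNulls
//   }
//
// New flow: decode definition levels unconditionally.
const { definitionLevels, numNulls } = readDefinitionLevels(reader, daph, schemaPath)
```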
diff --git a/src/datapageV2.js b/src/datapageV2.js
index 880da08..d0a9106 100644
--- a/src/datapageV2.js
+++ b/src/datapageV2.js
@@ -23,7 +23,7 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
   const view = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
   const reader = { view, offset: 0 }
   /** @type {any} */
-  let values = []
+  let dataPage = []
 
   const daph2 = ph.data_page_header_v2
   if (!daph2) throw new Error('parquet data page header v2 is undefined')
@@ -56,7 +56,7 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
     }
     const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
     const pageReader = { view: pageView, offset: 0 }
-    values = readPlain(pageReader, columnMetadata.type, nValues, utf8)
+    dataPage = readPlain(pageReader, columnMetadata.type, nValues, utf8)
   } else if (daph2.encoding === 'RLE') {
     const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec, compressors)
     const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
@@ -65,8 +65,8 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
       throw new Error('parquet RLE encoding with nulls not supported')
     } else {
       const pageReader = { view: pageView, offset: 4 }
-      values = new Array(nValues)
-      readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, values)
+      dataPage = new Array(nValues)
+      readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, dataPage)
     }
   } else if (
     daph2.encoding === 'PLAIN_DICTIONARY' ||
@@ -77,18 +77,18 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
     const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
     const bitWidth = pageView.getUint8(0)
     const pageReader = { view: pageView, offset: 1 }
-    values = new Array(nValues)
-    readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, values)
+    dataPage = new Array(nValues)
+    readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, dataPage)
   } else if (daph2.encoding === 'DELTA_BINARY_PACKED') {
     if (daph2.num_nulls) throw new Error('parquet delta-int not supported')
     const codec = daph2.is_compressed ? columnMetadata.codec : 'UNCOMPRESSED'
     const page = decompressPage(compressedBytes, uncompressedPageSize, codec, compressors)
-    deltaBinaryUnpack(page, nValues, values)
+    deltaBinaryUnpack(page, nValues, dataPage)
   } else {
     throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
   }
 
-  return { definitionLevels, repetitionLevels, value: values }
+  return { definitionLevels, repetitionLevels, dataPage }
 }
 
 /**
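For the dictionary-indexed branch, the page body being parsed is one bit-width byte followed by RLE/bit-packed hybrid runs of dictionary indexes, which is what the `offset: 1` skips past. A toy buffer (contents hypothetical):

```js
// byte 0:      bit width of the packed dictionary indexes (uint8)
// bytes 1...:  RLE/bit-packed hybrid runs of indexes into the dictionary
const page = new Uint8Array([0x01, 0x06, 0x01]) // width 1; header 0x06 = RLE run of 3; value 1
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const bitWidth = pageView.getUint8(0) // => 1
const pageReader = { view: pageView, offset: 1 } // index data starts after the width byte
```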
diff --git a/src/encoding.js b/src/encoding.js
index 3c037a9..1fa6c1d 100644
--- a/src/encoding.js
+++ b/src/encoding.js
@@ -16,10 +16,11 @@ export function widthFromMaxInt(value) {
  * If length is zero, then read as int32 at the start of the encoded data.
  *
  * @typedef {import("./types.d.ts").DataReader} DataReader
+ * @typedef {import("./types.d.ts").DecodedArray} DecodedArray
  * @param {DataReader} reader - buffer to read data from
  * @param {number} width - width of each bit-packed group
  * @param {number} length - length of the encoded data
- * @param {number[]} values - output array
+ * @param {DecodedArray} values - output array
 */
 export function readRleBitPackedHybrid(reader, width, length, values) {
   if (!length) {
@@ -52,7 +53,7 @@ export function readRleBitPackedHybrid(reader, width, length, values) {
  * @param {DataReader} reader - buffer to read data from
  * @param {number} count - number of values to read
  * @param {number} bitWidth - width of each bit-packed group
- * @param {DecodedArray} values - output array
+ * @param {DecodedArray} values - output array
  * @param {number} seen - number of values seen so far
 */
 function readRle(reader, count, bitWidth, values, seen) {
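A toy decode of the hybrid format this function implements, assuming the standard parquet layout (varint header; LSB 0 means an RLE run of `header >> 1` repeats, LSB 1 means bit-packed groups):

```js
import { readRleBitPackedHybrid } from './src/encoding.js'

// Header 0x06 = 3 << 1 | 0, so an RLE run of 3; the run value is read in
// ceil(width / 8) = 1 byte, here 0x01.
const buf = new Uint8Array([0x06, 0x01])
const reader = { view: new DataView(buf.buffer), offset: 0 }
const values = new Array(3)
readRleBitPackedHybrid(reader, 1, buf.byteLength, values)
// values => [1, 1, 1]
```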
diff --git a/src/plain.js b/src/plain.js
index de1fc9a..8be0ccf 100644
--- a/src/plain.js
+++ b/src/plain.js
@@ -23,10 +23,15 @@ function readPlainBoolean(reader, count) {
  *
  * @param {DataReader} reader - buffer to read data from
  * @param {number} count - number of values to read
- * @returns {number[]} array of int32 values
+ * @returns {Int32Array} array of int32 values
 */
 function readPlainInt32(reader, count) {
-  const values = new Array(count)
+  if ((reader.view.byteOffset + reader.offset) % 4 === 0) {
+    const values = new Int32Array(reader.view.buffer, reader.view.byteOffset + reader.offset, count)
+    reader.offset += count * 4
+    return values
+  }
+  const values = new Int32Array(count)
   for (let i = 0; i < count; i++) {
     values[i] = reader.view.getInt32(reader.offset + i * 4, true)
   }
@@ -39,10 +44,15 @@ function readPlainInt32(reader, count) {
  *
  * @param {DataReader} reader - buffer to read data from
  * @param {number} count - number of values to read
- * @returns {bigint[]} array of int64 values
+ * @returns {BigInt64Array} array of int64 values
 */
 function readPlainInt64(reader, count) {
-  const values = new Array(count)
+  if ((reader.view.byteOffset + reader.offset) % 8 === 0) {
+    const values = new BigInt64Array(reader.view.buffer, reader.view.byteOffset + reader.offset, count)
+    reader.offset += count * 8
+    return values
+  }
+  const values = new BigInt64Array(count)
   for (let i = 0; i < count; i++) {
     values[i] = reader.view.getBigInt64(reader.offset + i * 8, true)
   }
@@ -73,13 +83,18 @@ function readPlainInt96(reader, count) {
  *
  * @param {DataReader} reader - buffer to read data from
  * @param {number} count - number of values to read
- * @returns {number[]} array of float values
+ * @returns {Float32Array} array of float values
 */
 function readPlainFloat(reader, count) {
-  const values = new Array(count)
+  if ((reader.view.byteOffset + reader.offset) % 4 === 0) {
+    const values = new Float32Array(reader.view.buffer, reader.view.byteOffset + reader.offset, count)
+    reader.offset += count * 4
+    return values
+  }
+  const values = new Float32Array(count)
   for (let i = 0; i < count; i++) {
     values[i] = reader.view.getFloat32(reader.offset + i * 4, true)
   }
   reader.offset += count * 4
   return values
 }
@@ -89,13 +104,18 @@ function readPlainFloat(reader, count) {
  *
  * @param {DataReader} reader - buffer to read data from
  * @param {number} count - number of values to read
- * @returns {number[]} array of double values
+ * @returns {Float64Array} array of double values
 */
 function readPlainDouble(reader, count) {
-  const values = new Array(count)
+  if ((reader.view.byteOffset + reader.offset) % 8 === 0) {
+    const values = new Float64Array(reader.view.buffer, reader.view.byteOffset + reader.offset, count)
+    reader.offset += count * 8
+    return values
+  }
+  const values = new Float64Array(count)
   for (let i = 0; i < count; i++) {
     values[i] = reader.view.getFloat64(reader.offset + i * 8, true)
   }
   reader.offset += count * 8
   return values
 }
@@ -137,12 +157,13 @@ function readPlainByteArrayFixed(reader, fixedLength) {
 /**
  * Read `count` values of the given type from the reader.view.
  *
+ * @typedef {import("./types.d.ts").DecodedArray} DecodedArray
  * @typedef {import("./types.d.ts").ParquetType} ParquetType
  * @param {DataReader} reader - buffer to read data from
  * @param {ParquetType} type - parquet type of the data
  * @param {number} count - number of values to read
  * @param {boolean} utf8 - whether to decode byte arrays as UTF-8
- * @returns {ArrayLike} array of values
+ * @returns {DecodedArray} array of values
 */
 export function readPlain(reader, type, count, utf8) {
   if (count === 0) return []
diff --git a/src/schema.js b/src/schema.js
index 06a65c2..ba62673 100644
--- a/src/schema.js
+++ b/src/schema.js
@@ -93,22 +93,6 @@ export function getMaxDefinitionLevel(schemaPath) {
   return maxLevel
 }
 
-/**
- * Get the number of bytes to skip for definition levels.
- *
- * @param {number} num number of values
- * @returns {number} number of bytes to skip
- */
-export function skipDefinitionBytes(num) {
-  let byteLength = 6
-  let n = num >>> 8
-  while (n !== 0) {
-    byteLength++
-    n >>>= 7
-  }
-  return byteLength
-}
-
 /**
  * Get the column name as foo.bar and handle list and map like columns.
  *
diff --git a/src/types.d.ts b/src/types.d.ts
index 886ba1e..64f337b 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -242,5 +242,13 @@ interface DataPageHeaderV2 {
 interface DataPage {
   definitionLevels: number[] | undefined
   repetitionLevels: number[]
-  value: any[]
+  dataPage: DecodedArray
 }
+
+export type DecodedArray =
+  Uint8Array |
+  Int32Array |
+  BigInt64Array |
+  Float32Array |
+  Float64Array |
+  any[]
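The modulo guards in `readPlainInt32`/`readPlainInt64` (mirrored above for float and double) exist because a typed-array view must start at a byte offset that is a multiple of `BYTES_PER_ELEMENT`; construction throws otherwise. A standalone illustration:

```js
const buf = new ArrayBuffer(12)

new Int32Array(buf, 4, 2) // ok: 4 % 4 === 0, zero-copy view over the buffer

try {
  new Int32Array(buf, 2, 2) // throws RangeError: start offset 2 is misaligned
} catch (e) {
  // fall back to copying values out one at a time via DataView,
  // which is what the unaligned path does
  const view = new DataView(buf)
  const values = new Int32Array(2)
  for (let i = 0; i < 2; i++) values[i] = view.getInt32(2 + i * 4, true)
}
```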
diff --git a/src/utils.js b/src/utils.js
index e1160b0..850baed 100644
--- a/src/utils.js
+++ b/src/utils.js
@@ -24,8 +24,10 @@ export function toJson(obj) {
 
 /**
  * Concatenate two arrays fast.
+ *
+ * @typedef {import('./types.js').DecodedArray} DecodedArray
  * @param {any[]} aaa first array
- * @param {any[]} bbb second array
+ * @param {DecodedArray} bbb second array
 */
 export function concat(aaa, bbb) {
   const chunk = 10000
diff --git a/test/plain.test.js b/test/plain.test.js
index 9d1de6b..1a29ab8 100644
--- a/test/plain.test.js
+++ b/test/plain.test.js
@@ -17,7 +17,7 @@ describe('readPlain', () => {
     view.setInt32(0, 123456789, true) // little-endian
     const reader = { view, offset: 0 }
     const result = readPlain(reader, 'INT32', 1, false)
-    expect(result).toEqual([123456789])
+    expect(result).toEqual(new Int32Array([123456789]))
     expect(reader.offset).toBe(4)
   })
 
@@ -26,7 +26,7 @@ describe('readPlain', () => {
     view.setBigInt64(0, BigInt('1234567890123456789'), true)
     const reader = { view, offset: 0 }
     const result = readPlain(reader, 'INT64', 1, false)
-    expect(result).toEqual([1234567890123456789n])
+    expect(result).toEqual(new BigInt64Array([1234567890123456789n]))
     expect(reader.offset).toBe(8)
   })
 
@@ -51,7 +51,7 @@ describe('readPlain', () => {
     view.setFloat32(0, 1234.5, true) // little-endian
     const reader = { view, offset: 0 }
     const result = readPlain(reader, 'FLOAT', 1, false)
-    expect(result).toEqual([1234.5])
+    expect(result).toEqual(new Float32Array([1234.5]))
     expect(reader.offset).toBe(4)
   })
 
@@ -60,7 +60,7 @@ describe('readPlain', () => {
     view.setFloat64(0, 12345.6789, true) // little-endian
     const reader = { view, offset: 0 }
     const result = readPlain(reader, 'DOUBLE', 1, false)
-    expect(result).toEqual([12345.6789])
+    expect(result).toEqual(new Float64Array([12345.6789]))
     expect(reader.offset).toBe(8)
   })
 
diff --git a/test/schema.test.js b/test/schema.test.js
index 856adb6..55fab3d 100644
--- a/test/schema.test.js
+++ b/test/schema.test.js
@@ -6,7 +6,6 @@ import {
   isListLike,
   isMapLike,
   isRequired,
-  skipDefinitionBytes,
 } from '../src/schema.js'
 
 describe('Parquet schema utils', () => {
@@ -63,11 +62,6 @@ describe('Parquet schema utils', () => {
     expect(getMaxDefinitionLevel(getSchemaPath(schema, ['child3']))).toBe(1)
   })
 
-  it('skipDefinitionBytes', () => {
-    expect(skipDefinitionBytes(100)).toBe(6)
-    expect(skipDefinitionBytes(1000)).toBe(7)
-  })
-
   it('isListLike', () => {
     expect(isListLike(getSchemaPath(schema, []))).toBe(false)
     expect(isListLike(getSchemaPath(schema, ['child1']))).toBe(false)
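The test updates follow from `toEqual` semantics in vitest (assumed here): a typed array and a plain array with the same contents are not considered equal, because the comparison checks the object type as well as the elements:

```js
import { expect, test } from 'vitest'

test('typed arrays compare by type and contents', () => {
  expect(new Int32Array([123456789])).toEqual(new Int32Array([123456789])) // passes
  expect(new Int32Array([123456789])).not.toEqual([123456789]) // type mismatch
})
```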