diff --git a/.eslintrc.json b/.eslintrc.json
index 3b4cedc..5aac237 100644
--- a/.eslintrc.json
+++ b/.eslintrc.json
@@ -11,6 +11,7 @@
   "plugins": ["import", "jsdoc"],
   "rules": {
     "@typescript-eslint/no-explicit-any": "warn",
+    "@typescript-eslint/no-loss-of-precision": "warn",
     "@typescript-eslint/no-unused-vars": "warn",
     "arrow-spacing": "error",
     "camelcase": "off",
diff --git a/README.md b/README.md
index ee12b39..b44f892 100644
--- a/README.md
+++ b/README.md
@@ -93,7 +93,7 @@
 Page Type:
  - [X] Data Page
  - [ ] Index Page
  - [X] Dictionary Page
- - [ ] Data Page V2
+ - [X] Data Page V2
 
 Contributions are welcome!
diff --git a/src/column.js b/src/column.js
index b2129ea..da13870 100644
--- a/src/column.js
+++ b/src/column.js
@@ -1,6 +1,7 @@
 import { Encoding, PageType } from './constants.js'
 import { convert } from './convert.js'
 import { assembleObjects, readDataPage, readDictionaryPage } from './datapage.js'
+import { readDataPageV2 } from './datapageV2.js'
 import { parquetHeader } from './header.js'
 import { getMaxDefinitionLevel, isRequired, schemaElement } from './schema.js'
 import { snappyUncompress } from './snappy.js'
@@ -58,13 +59,8 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
     /** @type {any[]} */
     let values
     if (repetitionLevels.length) {
+      dereferenceDictionary(dictionary, dataPage)
       // Use repetition levels to construct lists
-      if (dictionaryEncoding && dictionary !== undefined && Array.isArray(dataPage)) {
-        // dereference dictionary values
-        for (let i = 0; i < dataPage.length; i++) {
-          dataPage[i] = dictionary[dataPage[i]]
-        }
-      }
       const isNull = columnMetadata && !isRequired(schema, [columnMetadata.path_in_schema[0]])
       const nullValue = false // TODO: unused?
       const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
@@ -75,13 +71,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
       values = []
       skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values)
     } else {
-      if (dictionaryEncoding && dictionary !== undefined && Array.isArray(dataPage)) {
-        // dereference dictionary values
-        values = []
-        for (let i = 0; i < dataPage.length; i++) {
-          values[i] = dictionary[dataPage[i]]
-        }
-        values = convert(values, schemaElement(schema, columnMetadata.path_in_schema))
+      if (dictionaryEncoding && dictionary) {
+        dereferenceDictionary(dictionary, dataPage)
+        values = convert(dataPage, schemaElement(schema, columnMetadata.path_in_schema))
       } else if (Array.isArray(dataPage)) {
         // convert primitive types to rich types
         values = convert(dataPage, schemaElement(schema, columnMetadata.path_in_schema))
@@ -104,18 +96,55 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
       )
       dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
     } else if (header.type === PageType.DATA_PAGE_V2) {
-      throw new Error('parquet data page v2 not supported')
+      const daph2 = header.data_page_header_v2
+      if (!daph2) throw new Error('parquet data page header v2 is undefined')
+
+      const { definitionLevels, repetitionLevels, value: dataPage } = readDataPageV2(
+        compressedBytes, header, schema, columnMetadata
+      )
+      valuesSeen += daph2.num_values
+
+      const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
+      if (repetitionLevels.length) {
+        dereferenceDictionary(dictionary, dataPage)
+        // Use repetition levels to construct lists
+        rowData.push(...assembleObjects(
+          definitionLevels, repetitionLevels, dataPage, true, false, maxDefinitionLevel,
+          rowIndex[0]
+        ))
+      } else if (daph2.num_nulls) {
+        // skip nulls
+        if (!definitionLevels) throw new Error('parquet data page v2 nulls missing definition levels')
+        skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, rowData)
+      } else {
+        dereferenceDictionary(dictionary, dataPage)
+        rowData.push(...dataPage)
+      }
+      // TODO: convert?
     } else {
       throw new Error(`parquet unsupported page type: ${header.type}`)
     }
     byteOffset += header.compressed_page_size
   }
   if (rowData.length !== Number(rowGroup.num_rows)) {
-    throw new Error(`parquet column length ${rowData.length} does not match row group length ${rowGroup.num_rows}}`)
+    throw new Error(`parquet row data length ${rowData.length} does not match row group length ${rowGroup.num_rows}`)
   }
   return rowData
 }
 
+/**
+ * Map data to dictionary values in place.
+ *
+ * @param {ArrayLike<any> | undefined} dictionary
+ * @param {number[]} dataPage
+ */
+function dereferenceDictionary(dictionary, dataPage) {
+  if (dictionary) {
+    for (let i = 0; i < dataPage.length; i++) {
+      dataPage[i] = dictionary[dataPage[i]]
+    }
+  }
+}
+
 /**
  * Find the start byte offset for a column chunk.
  *
@@ -139,7 +168,7 @@ export function getColumnOffset(columnMetadata) {
  * @param {CompressionCodec} codec
  * @returns {Uint8Array}
  */
-function decompressPage(compressedBytes, uncompressed_page_size, codec) {
+export function decompressPage(compressedBytes, uncompressed_page_size, codec) {
   /** @type {Uint8Array | undefined} */
   let page
   if (codec === 'UNCOMPRESSED') {
diff --git a/src/datapage.js b/src/datapage.js
index 881d495..a275d2d 100644
--- a/src/datapage.js
+++ b/src/datapage.js
@@ -127,9 +127,8 @@ export function readDictionaryPage(bytes, diph, schema, columnMetadata) {
 function readRepetitionLevels(dataView, offset, daph, schema, columnMetadata) {
   if (columnMetadata.path_in_schema.length > 1) {
     const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
-    if (maxRepetitionLevel !== 0) {
+    if (maxRepetitionLevel) {
       const bitWidth = widthFromMaxInt(maxRepetitionLevel)
-      // num_values is index 1 for either type of page header
       return readData(
         dataView, daph.repetition_level_encoding, offset, daph.num_values, bitWidth
       )
diff --git a/src/datapageV2.js b/src/datapageV2.js
new file mode 100644
index 0000000..27a9843
--- /dev/null
+++ b/src/datapageV2.js
@@ -0,0 +1,193 @@
+import { decompressPage } from './column.js'
+import { Encoding } from './constants.js'
+import { readPlain, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
+import { getMaxDefinitionLevel, getMaxRepetitionLevel, schemaElement } from './schema.js'
+import { readVarInt, readZigZag } from './thrift.js'
+
+/**
+ * @typedef {import("./types.d.ts").Decoded<T>} Decoded
+ * @template T
+ */
+
+/**
+ * Read a data page v2 from the given Uint8Array.
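+ *
+ * Page layout, per the parquet-format spec: repetition levels and definition
+ * levels come first and are never compressed; the values section follows, and
+ * is compressed with the column codec when daph2.is_compressed is set.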
+ *
+ * @typedef {import("./types.d.ts").DataPage} DataPage
+ * @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
+ * @typedef {import("./types.d.ts").DataPageHeaderV2} DataPageHeaderV2
+ * @typedef {import("./types.d.ts").PageHeader} PageHeader
+ * @typedef {import("./types.d.ts").SchemaElement} SchemaElement
+ * @param {Uint8Array} compressedBytes raw page data; level data is uncompressed, the values section may be compressed
+ * @param {PageHeader} ph page header
+ * @param {SchemaElement[]} schema schema for the file
+ * @param {ColumnMetaData} columnMetadata metadata for the column
+ * @returns {DataPage} definition levels, repetition levels, and array of values
+ */
+export function readDataPageV2(compressedBytes, ph, schema, columnMetadata) {
+  const dataView = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
+  let offset = 0
+  /** @type {any} */
+  let values = []
+
+  const daph2 = ph.data_page_header_v2
+  if (!daph2) throw new Error('parquet data page header v2 is undefined')
+
+  // repetition levels
+  const repetitionLevels = readRepetitionLevelsV2(dataView, offset, daph2, schema, columnMetadata)
+
+  // definition levels
+  offset += daph2.repetition_levels_byte_length
+  const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
+  const definitionLevels = readDefinitionLevelsV2(dataView, offset, daph2, maxDefinitionLevel)
+  offset += daph2.definition_levels_byte_length
+
+  const uncompressedPageSize = ph.uncompressed_page_size - daph2.definition_levels_byte_length - daph2.repetition_levels_byte_length
+
+  // read values based on encoding
+  const nValues = daph2.num_values - daph2.num_nulls
+  if (daph2.encoding === Encoding.PLAIN) {
+    const se = schemaElement(schema, columnMetadata.path_in_schema)
+    const utf8 = se.converted_type === 'UTF8'
+    const plainObj = readPlain(dataView, columnMetadata.type, nValues, offset, utf8)
+    values = plainObj.value
+  } else if (daph2.encoding === Encoding.RLE) {
+    // RLE data pages are only used for booleans, so the bit width is 1
+    const page = decompressPage(compressedBytes.subarray(offset), uncompressedPageSize, columnMetadata.codec)
+    const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
+    const bitWidth = 1
+    if (daph2.num_nulls) {
+      throw new Error('parquet RLE encoding with nulls not supported')
+    } else {
+      values = readRleBitPackedHybrid(
+        pageView, 4, bitWidth, uncompressedPageSize, nValues
+      ).value
+    }
+  } else if (
+    daph2.encoding === Encoding.PLAIN_DICTIONARY ||
+    daph2.encoding === Encoding.RLE_DICTIONARY
+  ) {
+    compressedBytes = compressedBytes.subarray(offset)
+    const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec)
+    const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
+
+    // the first byte of the dictionary index data gives the bit width
+    const bitWidth = pageView.getUint8(0)
+    const { value } = readRleBitPackedHybrid(
+      pageView, 1, bitWidth, uncompressedPageSize, nValues
+    )
+    values = value
+  } else if (daph2.encoding === Encoding.DELTA_BINARY_PACKED) {
+    if (daph2.num_nulls) throw new Error('parquet delta binary packed with nulls not supported')
+    const codec = daph2.is_compressed ? columnMetadata.codec : 'UNCOMPRESSED'
+    const page = decompressPage(compressedBytes.subarray(offset), uncompressedPageSize, codec)
+    deltaBinaryUnpack(page, nValues, values)
+  } else {
+    throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
+  }
+
+  return { definitionLevels, repetitionLevels, value: values }
+}
+
+/**
+ * Read the repetition levels from this page, if any.
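+ *
+ * Unlike v1, the RLE/bit-packed levels have no 4-byte length prefix, because
+ * repetition_levels_byte_length is given in the page header.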
+ *
+ * @param {DataView} dataView data view for the page
+ * @param {number} offset offset to start reading from
+ * @param {DataPageHeaderV2} daph2 data page header v2
+ * @param {SchemaElement[]} schema schema for the file
+ * @param {ColumnMetaData} columnMetadata metadata for the column
+ * @returns {any[]} repetition levels
+ */
+export function readRepetitionLevelsV2(dataView, offset, daph2, schema, columnMetadata) {
+  const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
+  if (maxRepetitionLevel) {
+    const bitWidth = widthFromMaxInt(maxRepetitionLevel)
+    return readRleBitPackedHybrid(
+      dataView, offset, bitWidth, daph2.repetition_levels_byte_length, daph2.num_values
+    ).value
+  }
+  return []
+}
+
+/**
+ * Read the definition levels from this page, if any.
+ *
+ * @param {DataView} dataView data view for the page
+ * @param {number} offset offset to start reading from
+ * @param {DataPageHeaderV2} daph2 data page header v2
+ * @param {number} maxDefinitionLevel maximum definition level for this column
+ * @returns {number[] | undefined} definition levels, or undefined if none
+ */
+function readDefinitionLevelsV2(dataView, offset, daph2, maxDefinitionLevel) {
+  if (maxDefinitionLevel) {
+    // unlike v1, the byte length of the levels is known up front
+    const bitWidth = widthFromMaxInt(maxDefinitionLevel)
+    return readRleBitPackedHybrid(
+      dataView, offset, bitWidth, daph2.definition_levels_byte_length, daph2.num_values
+    ).value
+  }
+}
+
+/**
+ * Unpack the delta binary packed encoding.
+ *
+ * @param {Uint8Array} page page data
+ * @param {number} nValues number of values to read
+ * @param {any[]} values array to write to
+ */
+function deltaBinaryUnpack(page, nValues, values) {
+  const dataView = new DataView(page.buffer, page.byteOffset, page.byteLength)
+  const [blockSize, index1] = readVarInt(dataView, 0)
+  const [miniblockPerBlock, index2] = readVarInt(dataView, index1)
+  const [, index3] = readVarInt(dataView, index2) // total value count (nValues when there are no nulls)
+  let [value, offset] = readZigZag(dataView, index3)
+
+  const valuesPerMiniblock = blockSize / miniblockPerBlock
+
+  // the first value is stored in the page header
+  if (nValues) values.push(value)
+
+  while (values.length < nValues) {
+    // each block stores a min delta, then one bit width byte per miniblock
+    const [minDelta, index4] = readZigZag(dataView, offset)
+    offset = index4
+    const bitWidths = new Uint8Array(miniblockPerBlock)
+    for (let i = 0; i < miniblockPerBlock; i++, offset++) {
+      bitWidths[i] = page[offset]
+    }
+
+    for (let i = 0; i < miniblockPerBlock && values.length < nValues; i++) {
+      const bitWidth = bitWidths[i]
+      if (bitWidth) {
+        // deltas are bit-packed little-endian, bitWidth bits each
+        // TODO: possible loss of precision for bit widths > 31
+        for (let j = 0; j < valuesPerMiniblock && values.length < nValues; j++) {
+          let delta = 0
+          for (let b = 0; b < bitWidth; b++) {
+            const bit = j * bitWidth + b
+            if (page[offset + (bit >> 3)] & (1 << (bit & 7))) {
+              delta |= 1 << b
+            }
+          }
+          value += minDelta + delta
+          values.push(value)
+        }
+        // miniblocks are always stored at full length
+        offset += valuesPerMiniblock * bitWidth / 8
+      } else {
+        // all deltas in this miniblock are zero: each value just adds minDelta
+        for (let j = 0; j < valuesPerMiniblock && values.length < nValues; j++) {
+          value += minDelta
+          values.push(value)
+        }
+      }
+    }
+  }
+}
diff --git a/src/encoding.js b/src/encoding.js
index cde1d53..36599a9 100644
--- a/src/encoding.js
+++ b/src/encoding.js
@@ -249,7 +249,7 @@ export function readRleBitPackedHybrid(dataView, offset, width, length, numValues
   }
   const value = []
   const startByteLength = byteLength
-  while (byteLength - startByteLength < length) {
+  while (offset + byteLength - startByteLength < length) {
     const [header, newOffset] = readVarInt(dataView, offset + byteLength)
     byteLength = newOffset - offset
     if ((header & 1) === 0) {
diff --git a/src/thrift.js b/src/thrift.js
index ff6676e..d7c26a8 100644
--- a/src/thrift.js
+++ b/src/thrift.js
@@ -176,7 +176,7 @@ function readVarBigInt(view, index) {
  * @param {number} index
  * @returns {[number, number]} [value, newIndex]
  */
-function readZigZag(view, index) {
+export function readZigZag(view, index) {
   const [zigzag, newIndex] = readVarInt(view, index)
   // convert zigzag to int
   const value = (zigzag >>> 1) ^ -(zigzag & 1)
diff --git a/test/files/datapage_v2.snappy.json b/test/files/datapage_v2.snappy.json
new file mode 100644
index 0000000..3d5427f
--- /dev/null
+++ b/test/files/datapage_v2.snappy.json
@@ -0,0 +1,7 @@
+[
+  ["abc", 1, 2, 1, [1, 2, 3]],
+  ["abc", 2, 3, 1, null],
+  ["abc", 3, 4, 1, null],
+  [null, 4, 5, 0, [1, 2, 3]],
+  ["abc", 5, 2, 1, [1, 2]]
+]
diff --git a/test/files/datapage_v2.snappy.metadata.json b/test/files/datapage_v2.snappy.metadata.json
new file mode 100644
index 0000000..1527643
--- /dev/null
+++ b/test/files/datapage_v2.snappy.metadata.json
@@ -0,0 +1,166 @@
+{
+  "version": 1,
+  "created_by": "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)",
+  "key_value_metadata": [
+    {
+      "key": "org.apache.spark.sql.parquet.row.metadata",
+      "value": "{\"type\":\"struct\",\"fields\":[{\"name\":\"a\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"b\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"c\",\"type\":\"double\",\"nullable\":false,\"metadata\":{}},{\"name\":\"d\",\"type\":\"boolean\",\"nullable\":false,\"metadata\":{}},{\"name\":\"e\",\"type\":{\"type\":\"array\",\"elementType\":\"integer\",\"containsNull\":false},\"nullable\":true,\"metadata\":{}}]}"
+    }
+  ],
+  "metadata_length": 836,
+  "num_rows": 5,
+  "row_groups": [
+    {
+      "columns": [
+        {
+          "file_offset": 4,
+          "meta_data": {
+            "codec": "SNAPPY",
+            "data_page_offset": 4,
+            "encodings": [
+              0,
+              8
+            ],
+            "num_values": 5,
+            "path_in_schema": ["a"],
+            "statistics": {
+              "max": "abc",
+              "min": "abc",
+              "null_count": 1
+            },
+            "total_compressed_size": 63,
+            "total_uncompressed_size": 59,
+            "type": 6
+          }
+        },
+        {
+          "file_offset": 67,
+          "meta_data": {
+            "codec": "SNAPPY",
+            "data_page_offset": 67,
+            "encodings": [5],
+            "num_values": 5,
+            "path_in_schema": ["b"],
+            "statistics": {
+              "max": "\u0005\u0000\u0000\u0000",
+              "min": "\u0001\u0000\u0000\u0000",
+              "null_count": 0
+            },
+            "total_compressed_size": 49,
+            "total_uncompressed_size": 47,
+            "type": 1
+          }
+        },
+        {
+          "file_offset": 116,
+          "meta_data": {
+            "codec": "SNAPPY",
+            "data_page_offset": 116,
+            "encodings": [
+              0,
+              8
+            ],
+            "num_values": 5,
+            "path_in_schema": ["c"],
+            "statistics": {
+              "max": "\u0000\u0000\u0000\u0000\u0000\u0000\u0014@",
+              "min": "\u0000\u0000\u0000\u0000\u0000\u0000\u0000@",
+              "null_count": 0
+            },
+            "total_compressed_size": 88,
+            "total_uncompressed_size": 94,
+            "type": 5
+          }
+        },
+        {
+          "file_offset": 204,
+          "meta_data": {
+            "codec": "SNAPPY",
+            "data_page_offset": 204,
+            "encodings": [3],
+            "num_values": 5,
+            "path_in_schema": ["d"],
+            "statistics": {
+              "max": "\u0001",
+              "min": "\u0000",
+              "null_count": 0
+            },
+            "total_compressed_size": 39,
+            "total_uncompressed_size": 37,
+            "type": 0
+          }
+        },
+        {
+          "file_offset": 243,
+          "meta_data": {
+            "codec": "SNAPPY",
+            "data_page_offset": 243,
+            "encodings": [
+              0,
+              8
+            ],
+            "num_values": 10,
+            "path_in_schema": [
+              "e",
+              "list",
+              "element"
+            ],
+            "statistics": {
+              "max": "\u0003\u0000\u0000\u0000",
+              "min": "\u0001\u0000\u0000\u0000",
"null_count": 2 + }, + "total_compressed_size": 78, + "total_uncompressed_size": 74, + "type": 1 + } + } + ], + "num_rows": 5, + "total_byte_size": 311 + } + ], + "schema": [ + { + "name": "spark_schema", + "num_children": 5 + }, + { + "converted_type": "UTF8", + "name": "a", + "repetition_type": "OPTIONAL", + "type": 6 + }, + { + "name": "b", + "repetition_type": "REQUIRED", + "type": 1 + }, + { + "name": "c", + "repetition_type": "REQUIRED", + "type": 5 + }, + { + "name": "d", + "repetition_type": "REQUIRED", + "type": 0 + }, + { + "converted_type": "LIST", + "name": "e", + "num_children": 1, + "repetition_type": "OPTIONAL" + }, + { + "name": "list", + "num_children": 1, + "repetition_type": "REPEATED" + }, + { + "name": "element", + "repetition_type": "REQUIRED", + "type": 1 + } + ] +} diff --git a/test/files/datapage_v2.snappy.parquet b/test/files/datapage_v2.snappy.parquet new file mode 100644 index 0000000..2b77bb1 Binary files /dev/null and b/test/files/datapage_v2.snappy.parquet differ diff --git a/test/read.test.js b/test/read.test.js index 06fb6f6..ec86940 100644 --- a/test/read.test.js +++ b/test/read.test.js @@ -5,7 +5,7 @@ import { toJson } from '../src/toJson.js' import { fileToAsyncBuffer, fileToJson } from './helpers.js' describe('parquetRead', () => { - const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet')) + const files = fs.readdirSync('test/files').filter(f => f.endsWith('y.parquet')) files.forEach(file => { it(`should parse data from ${file}`, async () => {