diff --git a/src/column.js b/src/column.js index 92cacb3..901bf8a 100644 --- a/src/column.js +++ b/src/column.js @@ -96,7 +96,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, const page = decompressPage( compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors ) - dictionary = readDictionaryPage(page, diph, columnMetadata) + dictionary = readDictionaryPage(page, diph, columnMetadata, element.type_length) } else if (header.type === 'DATA_PAGE_V2') { const daph2 = header.data_page_header_v2 if (!daph2) throw new Error('parquet data page header v2 is undefined') diff --git a/src/convert.js b/src/convert.js index 353020e..188be41 100644 --- a/src/convert.js +++ b/src/convert.js @@ -22,8 +22,6 @@ export function convert(data, schemaElement) { return Array.from(data).map(v => v * factor) } else if (typeof data[0] === 'bigint') { if (factor === 1) return data - // @ts-expect-error data is either BigInt64Array or bigint[] - if (factor > 1) return data.map(v => v * BigInt(factor)) return Array.from(data).map(v => Number(v) * factor) } else { return Array.from(data).map(v => parseDecimal(v) * factor) diff --git a/src/datapage.js b/src/datapage.js index 63efb8e..a3b99f9 100644 --- a/src/datapage.js +++ b/src/datapage.js @@ -29,7 +29,8 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) { // read values based on encoding const nValues = daph.num_values - numNulls if (daph.encoding === 'PLAIN') { - dataPage = readPlain(reader, columnMetadata.type, nValues) + const { type_length } = schemaPath[schemaPath.length - 1].element + dataPage = readPlain(reader, columnMetadata.type, nValues, type_length) } else if ( daph.encoding === 'PLAIN_DICTIONARY' || daph.encoding === 'RLE_DICTIONARY' || @@ -62,12 +63,13 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) { * @param {Uint8Array} bytes raw page data * @param {import("./types.d.ts").DictionaryPageHeader} diph dictionary page header * @param {ColumnMetaData} columnMetadata + * @param {number | undefined} typeLength - type_length from schema * @returns {ArrayLike} array of values */ -export function readDictionaryPage(bytes, diph, columnMetadata) { +export function readDictionaryPage(bytes, diph, columnMetadata, typeLength) { const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength) const reader = { view, offset: 0 } - return readPlain(reader, columnMetadata.type, diph.num_values) + return readPlain(reader, columnMetadata.type, diph.num_values, typeLength) } /** diff --git a/src/datapageV2.js b/src/datapageV2.js index 47f8ff5..900343f 100644 --- a/src/datapageV2.js +++ b/src/datapageV2.js @@ -49,7 +49,8 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata, const nValues = daph2.num_values - daph2.num_nulls if (daph2.encoding === 'PLAIN') { const pageReader = { view: pageView, offset: 0 } - dataPage = readPlain(pageReader, columnMetadata.type, nValues) + const { type_length } = schemaPath[schemaPath.length - 1].element + dataPage = readPlain(pageReader, columnMetadata.type, nValues, type_length) } else if (daph2.encoding === 'RLE') { const bitWidth = 1 const pageReader = { view: pageView, offset: 4 } diff --git a/src/plain.js b/src/plain.js index a01fa90..2d8c429 100644 --- a/src/plain.js +++ b/src/plain.js @@ -7,9 +7,10 @@ * @param {DataReader} reader - buffer to read data from * @param {ParquetType} type - parquet type of the data * @param {number} count - number of values to read + * @param {number | undefined} fixedLength - length of each fixed length byte array * @returns {DecodedArray} array of values */ -export function readPlain(reader, type, count) { +export function readPlain(reader, type, count, fixedLength) { if (count === 0) return [] if (type === 'BOOLEAN') { return readPlainBoolean(reader, count) @@ -26,7 +27,8 @@ export function readPlain(reader, type, count) { } else if (type === 'BYTE_ARRAY') { return readPlainByteArray(reader, count) } else if (type === 'FIXED_LEN_BYTE_ARRAY') { - return readPlainByteArrayFixed(reader, count) + if (!fixedLength) throw new Error('parquet missing fixed length') + return readPlainByteArrayFixed(reader, count, fixedLength) } else { throw new Error(`parquet unhandled type: ${type}`) } @@ -151,16 +153,18 @@ function readPlainByteArray(reader, count) { * Read a fixed length byte array. * * @param {DataReader} reader + * @param {number} count * @param {number} fixedLength - * @returns {Uint8Array} + * @returns {Uint8Array[]} */ -function readPlainByteArrayFixed(reader, fixedLength) { - reader.offset += fixedLength - return new Uint8Array( - reader.view.buffer, - reader.view.byteOffset + reader.offset - fixedLength, - fixedLength - ) +function readPlainByteArrayFixed(reader, count, fixedLength) { + // assert(reader.view.byteLength - reader.offset >= count * fixedLength) + const values = new Array(count) + for (let i = 0; i < count; i++) { + values[i] = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, fixedLength) + reader.offset += fixedLength + } + return values } /** diff --git a/test/files/fixed_length_decimal.json b/test/files/fixed_length_decimal.json new file mode 100644 index 0000000..7aa76e6 --- /dev/null +++ b/test/files/fixed_length_decimal.json @@ -0,0 +1,26 @@ +[ + [1], + [2], + [3], + [4], + [5], + [6], + [7], + [8], + [9], + [10], + [11], + [12], + [13], + [14], + [15], + [16], + [17], + [18], + [19], + [20], + [21], + [22], + [23], + [24] +] diff --git a/test/files/fixed_length_decimal.metadata.json b/test/files/fixed_length_decimal.metadata.json new file mode 100644 index 0000000..eb1b00e --- /dev/null +++ b/test/files/fixed_length_decimal.metadata.json @@ -0,0 +1,90 @@ +{ + "version": 1, + "schema": [ + { + "name": "spark_schema", + "num_children": 1 + }, + { + "type": "FIXED_LEN_BYTE_ARRAY", + "type_length": 11, + "repetition_type": "OPTIONAL", + "name": "value", + "converted_type": "DECIMAL", + "scale": 2, + "precision": 25 + } + ], + "num_rows": 24, + "row_groups": [ + { + "columns": [ + { + "file_offset": 4, + "meta_data": { + "type": "FIXED_LEN_BYTE_ARRAY", + "encodings": [ + "BIT_PACKED", + "RLE", + "PLAIN" + ], + "path_in_schema": [ + "value" + ], + "codec": "UNCOMPRESSED", + "num_values": 24, + "total_uncompressed_size": 319, + "total_compressed_size": 319, + "data_page_offset": 4, + "statistics": { + "max": [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 9, + 96 + ], + "min": [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 200 + ], + "null_count": 0 + }, + "encoding_stats": [ + { + "page_type": 0, + "encoding": "PLAIN", + "count": 1 + } + ] + } + } + ], + "total_byte_size": 319, + "num_rows": 24 + } + ], + "key_value_metadata": [ + { + "key": "org.apache.spark.sql.parquet.row.metadata", + "value": "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"decimal(25,2)\",\"nullable\":true,\"metadata\":{}}]}" + } + ], + "created_by": "parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)", + "metadata_length": 346 + } diff --git a/test/files/fixed_length_decimal.parquet b/test/files/fixed_length_decimal.parquet new file mode 100644 index 0000000..69fce53 Binary files /dev/null and b/test/files/fixed_length_decimal.parquet differ diff --git a/test/plain.test.js b/test/plain.test.js index 63d27af..6d04f1b 100644 --- a/test/plain.test.js +++ b/test/plain.test.js @@ -2,12 +2,19 @@ import { describe, expect, it } from 'vitest' import { readPlain } from '../src/plain.js' describe('readPlain', () => { + it('returns empty array for count 0', () => { + const view = new DataView(new ArrayBuffer(4)) + const reader = { view, offset: 0 } + const result = readPlain(reader, 'INT32', 0, undefined) + expect(result).toEqual([]) + expect(reader.offset).toBe(0) + }) it('reads BOOLEAN values', () => { const view = new DataView(new ArrayBuffer(1)) view.setUint8(0, 0b00000101) // true, false, true const reader = { view, offset: 0 } - const result = readPlain(reader, 'BOOLEAN', 3) + const result = readPlain(reader, 'BOOLEAN', 3, undefined) expect(result).toEqual([true, false, true]) expect(reader.offset).toBe(1) }) @@ -16,20 +23,38 @@ describe('readPlain', () => { const view = new DataView(new ArrayBuffer(4)) view.setInt32(0, 123456789, true) // little-endian const reader = { view, offset: 0 } - const result = readPlain(reader, 'INT32', 1) + const result = readPlain(reader, 'INT32', 1, undefined) expect(result).toEqual(new Int32Array([123456789])) expect(reader.offset).toBe(4) }) + it('reads unaligned INT32 values', () => { + const view = new DataView(new ArrayBuffer(5)) + view.setInt32(1, 123456789, true) // little-endian + const reader = { view, offset: 1 } + const result = readPlain(reader, 'INT32', 1, undefined) + expect(result).toEqual(new Int32Array([123456789])) + expect(reader.offset).toBe(5) + }) + it('reads INT64 values', () => { const view = new DataView(new ArrayBuffer(8)) view.setBigInt64(0, BigInt('1234567890123456789'), true) const reader = { view, offset: 0 } - const result = readPlain(reader, 'INT64', 1) + const result = readPlain(reader, 'INT64', 1, undefined) expect(result).toEqual(new BigInt64Array([1234567890123456789n])) expect(reader.offset).toBe(8) }) + it('reads unaligned INT64 values', () => { + const view = new DataView(new ArrayBuffer(9)) + view.setBigInt64(1, BigInt('1234567890123456789'), true) + const reader = { view, offset: 1 } + const result = readPlain(reader, 'INT64', 1, undefined) + expect(result).toEqual(new BigInt64Array([1234567890123456789n])) + expect(reader.offset).toBe(9) + }) + it('reads INT96 values', () => { const buffer = new ArrayBuffer(12) const view = new DataView(buffer) @@ -40,7 +65,7 @@ describe('readPlain', () => { view.setBigInt64(0, low, true) view.setInt32(8, high, true) const reader = { view, offset: 0 } - const result = readPlain(reader, 'INT96', 1) + const result = readPlain(reader, 'INT96', 1, undefined) const expectedValue = (BigInt(high) << 64n) | low expect(result).toEqual([expectedValue]) expect(reader.offset).toBe(12) @@ -50,20 +75,38 @@ describe('readPlain', () => { const view = new DataView(new ArrayBuffer(4)) view.setFloat32(0, 1234.5, true) // little-endian const reader = { view, offset: 0 } - const result = readPlain(reader, 'FLOAT', 1) + const result = readPlain(reader, 'FLOAT', 1, undefined) expect(result).toEqual(new Float32Array([1234.5])) expect(reader.offset).toBe(4) }) + it('reads unaligned FLOAT values', () => { + const view = new DataView(new ArrayBuffer(5)) + view.setFloat32(1, 1234.5, true) // little-endian + const reader = { view, offset: 1 } + const result = readPlain(reader, 'FLOAT', 1, undefined) + expect(result).toEqual(new Float32Array([1234.5])) + expect(reader.offset).toBe(5) + }) + it('reads DOUBLE values', () => { const view = new DataView(new ArrayBuffer(8)) view.setFloat64(0, 12345.6789, true) // little-endian const reader = { view, offset: 0 } - const result = readPlain(reader, 'DOUBLE', 1) + const result = readPlain(reader, 'DOUBLE', 1, undefined) expect(result).toEqual(new Float64Array([12345.6789])) expect(reader.offset).toBe(8) }) + it('reads unaligned DOUBLE values', () => { + const view = new DataView(new ArrayBuffer(9)) + view.setFloat64(1, 12345.6789, true) // little-endian + const reader = { view, offset: 1 } + const result = readPlain(reader, 'DOUBLE', 1, undefined) + expect(result).toEqual(new Float64Array([12345.6789])) + expect(reader.offset).toBe(9) + }) + it('reads BYTE_ARRAY values', () => { const view = new DataView(new ArrayBuffer(10)) view.setInt32(0, 3, true) // length 3 @@ -71,21 +114,29 @@ describe('readPlain', () => { view.setUint8(5, 2) view.setUint8(6, 3) const reader = { view, offset: 0 } - const result = readPlain(reader, 'BYTE_ARRAY', 1) + const result = readPlain(reader, 'BYTE_ARRAY', 1, undefined) expect(result).toEqual([new Uint8Array([1, 2, 3])]) expect(reader.offset).toBe(7) }) it('reads FIXED_LEN_BYTE_ARRAY values', () => { - const fixedLength = 3 - const view = new DataView(new ArrayBuffer(fixedLength)) + const fixedLength = 2 + const view = new DataView(new ArrayBuffer(fixedLength * 2)) view.setUint8(0, 4) view.setUint8(1, 5) view.setUint8(2, 6) + view.setUint8(3, 7) const reader = { view, offset: 0 } - const result = readPlain(reader, 'FIXED_LEN_BYTE_ARRAY', fixedLength) - expect(result).toEqual(new Uint8Array([4, 5, 6])) - expect(reader.offset).toBe(fixedLength) + const result = readPlain(reader, 'FIXED_LEN_BYTE_ARRAY', 2, fixedLength) + expect(result).toEqual([new Uint8Array([4, 5]), new Uint8Array([6, 7])]) + expect(reader.offset).toBe(4) + }) + + it('throws for missing fixedLength', () => { + const view = new DataView(new ArrayBuffer(1)) + const reader = { view, offset: 0 } + expect(() => readPlain(reader, 'FIXED_LEN_BYTE_ARRAY', 1, undefined)) + .toThrow('parquet missing fixed length') }) it('throws an error for unhandled types', () => { @@ -93,7 +144,7 @@ describe('readPlain', () => { const reader = { view, offset: 0 } /** @type any */ const invalidType = 'invalidType' - expect(() => readPlain(reader, invalidType, 1)) + expect(() => readPlain(reader, invalidType, 1, undefined)) .toThrow(`parquet unhandled type: ${invalidType}`) }) })