Fix fixed length byte array type

Kenny Daniel 2024-05-12 21:11:57 -07:00
parent bd4e7c1699
commit 7639b8ca7f
9 changed files with 202 additions and 30 deletions

@@ -96,7 +96,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const page = decompressPage(
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
)
dictionary = readDictionaryPage(page, diph, columnMetadata)
dictionary = readDictionaryPage(page, diph, columnMetadata, element.type_length)
} else if (header.type === 'DATA_PAGE_V2') {
const daph2 = header.data_page_header_v2
if (!daph2) throw new Error('parquet data page header v2 is undefined')

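For context on where that new last argument comes from (an illustrative sketch, not part of the diff): type_length is declared on the column's SchemaElement in the file metadata, as in the test fixture added later in this commit.

// Hedged illustration of the relevant SchemaElement fields for a fixed-length
// column (field names per the parquet Thrift schema; values match the test
// metadata later in this commit).
const element = {
  name: 'value',
  type: 'FIXED_LEN_BYTE_ARRAY',
  type_length: 11, // every encoded value is exactly 11 bytes
  converted_type: 'DECIMAL',
  scale: 2,
  precision: 25,
}
// The hunk above now threads this width through to the dictionary reader:
// readDictionaryPage(page, diph, columnMetadata, element.type_length)
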
@@ -22,8 +22,6 @@ export function convert(data, schemaElement) {
return Array.from(data).map(v => v * factor)
} else if (typeof data[0] === 'bigint') {
if (factor === 1) return data
// @ts-expect-error data is either BigInt64Array or bigint[]
if (factor > 1) return data.map(v => v * BigInt(factor))
return Array.from(data).map(v => Number(v) * factor)
} else {
return Array.from(data).map(v => parseDecimal(v) * factor)

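The decimal branch above multiplies an unscaled integer by a precomputed factor; a minimal worked example, assuming factor is 10^-scale per the parquet DECIMAL definition and using a simplified stand-in for parseDecimal (positive values only, not the library's implementation):

// Sketch only: a FIXED_LEN_BYTE_ARRAY decimal stores a big-endian unscaled
// integer; the converted value is unscaled * 10^-scale.
function parseDecimalSketch(bytes) {
  let unscaled = 0
  for (const byte of bytes) unscaled = unscaled * 256 + byte
  return unscaled
}
const scale = 2
const factor = Math.pow(10, -scale)
console.log(parseDecimalSketch(new Uint8Array([0, 9, 96])) * factor) // 2400 * 0.01 = 24
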
@@ -29,7 +29,8 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
// read values based on encoding
const nValues = daph.num_values - numNulls
if (daph.encoding === 'PLAIN') {
dataPage = readPlain(reader, columnMetadata.type, nValues)
const { type_length } = schemaPath[schemaPath.length - 1].element
dataPage = readPlain(reader, columnMetadata.type, nValues, type_length)
} else if (
daph.encoding === 'PLAIN_DICTIONARY' ||
daph.encoding === 'RLE_DICTIONARY' ||
@@ -62,12 +63,13 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
* @param {Uint8Array} bytes raw page data
* @param {import("./types.d.ts").DictionaryPageHeader} diph dictionary page header
* @param {ColumnMetaData} columnMetadata
* @param {number | undefined} typeLength - type_length from schema
* @returns {ArrayLike<any>} array of values
*/
export function readDictionaryPage(bytes, diph, columnMetadata) {
export function readDictionaryPage(bytes, diph, columnMetadata, typeLength) {
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
const reader = { view, offset: 0 }
return readPlain(reader, columnMetadata.type, diph.num_values)
return readPlain(reader, columnMetadata.type, diph.num_values, typeLength)
}
/**

@@ -49,7 +49,8 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
const nValues = daph2.num_values - daph2.num_nulls
if (daph2.encoding === 'PLAIN') {
const pageReader = { view: pageView, offset: 0 }
dataPage = readPlain(pageReader, columnMetadata.type, nValues)
const { type_length } = schemaPath[schemaPath.length - 1].element
dataPage = readPlain(pageReader, columnMetadata.type, nValues, type_length)
} else if (daph2.encoding === 'RLE') {
const bitWidth = 1
const pageReader = { view: pageView, offset: 4 }

@@ -7,9 +7,10 @@
* @param {DataReader} reader - buffer to read data from
* @param {ParquetType} type - parquet type of the data
* @param {number} count - number of values to read
* @param {number | undefined} fixedLength - length of each fixed length byte array
* @returns {DecodedArray} array of values
*/
export function readPlain(reader, type, count) {
export function readPlain(reader, type, count, fixedLength) {
if (count === 0) return []
if (type === 'BOOLEAN') {
return readPlainBoolean(reader, count)
@@ -26,7 +27,8 @@ export function readPlain(reader, type, count) {
} else if (type === 'BYTE_ARRAY') {
return readPlainByteArray(reader, count)
} else if (type === 'FIXED_LEN_BYTE_ARRAY') {
return readPlainByteArrayFixed(reader, count)
if (!fixedLength) throw new Error('parquet missing fixed length')
return readPlainByteArrayFixed(reader, count, fixedLength)
} else {
throw new Error(`parquet unhandled type: ${type}`)
}
@@ -151,16 +153,18 @@ function readPlainByteArray(reader, count) {
* Read a fixed length byte array.
*
* @param {DataReader} reader
* @param {number} count
* @param {number} fixedLength
* @returns {Uint8Array}
* @returns {Uint8Array[]}
*/
function readPlainByteArrayFixed(reader, fixedLength) {
reader.offset += fixedLength
return new Uint8Array(
reader.view.buffer,
reader.view.byteOffset + reader.offset - fixedLength,
fixedLength
)
function readPlainByteArrayFixed(reader, count, fixedLength) {
// assert(reader.view.byteLength - reader.offset >= count * fixedLength)
const values = new Array(count)
for (let i = 0; i < count; i++) {
values[i] = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, fixedLength)
reader.offset += fixedLength
}
return values
}
/**

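A hedged usage sketch of the rewritten fixed-length path (import path borrowed from the tests below): each returned element is a Uint8Array view over the page buffer rather than a copy.

import { readPlain } from '../src/plain.js'

const page = new Uint8Array([4, 5, 6, 7, 8, 9])
const reader = { view: new DataView(page.buffer), offset: 0 }
// count 2, fixedLength 3 -> two 3-byte views
const values = readPlain(reader, 'FIXED_LEN_BYTE_ARRAY', 2, 3)
console.log(values) // [ Uint8Array [ 4, 5, 6 ], Uint8Array [ 7, 8, 9 ] ]
console.log(values[0].buffer === page.buffer) // true: zero-copy views

Returning views keeps decoding zero-copy; anything that needs the bytes to outlive the page buffer would have to copy them.
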
@@ -0,0 +1,26 @@
[
[1],
[2],
[3],
[4],
[5],
[6],
[7],
[8],
[9],
[10],
[11],
[12],
[13],
[14],
[15],
[16],
[17],
[18],
[19],
[20],
[21],
[22],
[23],
[24]
]

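The array above is the expected decode of the new fixture; a rough end-to-end sketch of producing it, assuming the library's parquetRead({ file, onComplete }) entry point and a hypothetical fixture filename (the actual binary added by this commit is only noted below as not shown):

import { readFileSync } from 'node:fs'
import { parquetRead } from 'hyparquet'

// Hypothetical path; substitute the fixture added in this commit.
const buf = readFileSync('test/files/fixed_length_decimal.parquet')
const file = buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength)
await parquetRead({
  file,
  onComplete: rows => console.log(JSON.stringify(rows)), // [[1],[2],...,[24]]
})
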
@@ -0,0 +1,90 @@
{
"version": 1,
"schema": [
{
"name": "spark_schema",
"num_children": 1
},
{
"type": "FIXED_LEN_BYTE_ARRAY",
"type_length": 11,
"repetition_type": "OPTIONAL",
"name": "value",
"converted_type": "DECIMAL",
"scale": 2,
"precision": 25
}
],
"num_rows": 24,
"row_groups": [
{
"columns": [
{
"file_offset": 4,
"meta_data": {
"type": "FIXED_LEN_BYTE_ARRAY",
"encodings": [
"BIT_PACKED",
"RLE",
"PLAIN"
],
"path_in_schema": [
"value"
],
"codec": "UNCOMPRESSED",
"num_values": 24,
"total_uncompressed_size": 319,
"total_compressed_size": 319,
"data_page_offset": 4,
"statistics": {
"max": [
0,
0,
0,
0,
0,
0,
0,
0,
0,
9,
96
],
"min": [
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
200
],
"null_count": 0
},
"encoding_stats": [
{
"page_type": 0,
"encoding": "PLAIN",
"count": 1
}
]
}
}
],
"total_byte_size": 319,
"num_rows": 24
}
],
"key_value_metadata": [
{
"key": "org.apache.spark.sql.parquet.row.metadata",
"value": "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"decimal(25,2)\",\"nullable\":true,\"metadata\":{}}]}"
}
],
"created_by": "parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)",
"metadata_length": 346
}

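A quick arithmetic check against the metadata above (a sanity sketch, not library code): fixed-length values carry no per-value length prefix, which is exactly why the readers now need type_length.

const typeLength = 11 // schema type_length
const numValues = 24  // column num_values
console.log(typeLength * numValues) // 264 bytes of raw FIXED_LEN_BYTE_ARRAY values
// The remaining 319 - 264 = 55 bytes of total_uncompressed_size are presumably
// the data page header plus definition levels for this OPTIONAL column.
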
Binary file not shown.

@@ -2,12 +2,19 @@ import { describe, expect, it } from 'vitest'
import { readPlain } from '../src/plain.js'
describe('readPlain', () => {
it('returns empty array for count 0', () => {
const view = new DataView(new ArrayBuffer(4))
const reader = { view, offset: 0 }
const result = readPlain(reader, 'INT32', 0, undefined)
expect(result).toEqual([])
expect(reader.offset).toBe(0)
})
it('reads BOOLEAN values', () => {
const view = new DataView(new ArrayBuffer(1))
view.setUint8(0, 0b00000101) // true, false, true
const reader = { view, offset: 0 }
const result = readPlain(reader, 'BOOLEAN', 3)
const result = readPlain(reader, 'BOOLEAN', 3, undefined)
expect(result).toEqual([true, false, true])
expect(reader.offset).toBe(1)
})
@@ -16,20 +23,38 @@ describe('readPlain', () => {
const view = new DataView(new ArrayBuffer(4))
view.setInt32(0, 123456789, true) // little-endian
const reader = { view, offset: 0 }
const result = readPlain(reader, 'INT32', 1)
const result = readPlain(reader, 'INT32', 1, undefined)
expect(result).toEqual(new Int32Array([123456789]))
expect(reader.offset).toBe(4)
})
it('reads unaligned INT32 values', () => {
const view = new DataView(new ArrayBuffer(5))
view.setInt32(1, 123456789, true) // little-endian
const reader = { view, offset: 1 }
const result = readPlain(reader, 'INT32', 1, undefined)
expect(result).toEqual(new Int32Array([123456789]))
expect(reader.offset).toBe(5)
})
it('reads INT64 values', () => {
const view = new DataView(new ArrayBuffer(8))
view.setBigInt64(0, BigInt('1234567890123456789'), true)
const reader = { view, offset: 0 }
const result = readPlain(reader, 'INT64', 1)
const result = readPlain(reader, 'INT64', 1, undefined)
expect(result).toEqual(new BigInt64Array([1234567890123456789n]))
expect(reader.offset).toBe(8)
})
it('reads unaligned INT64 values', () => {
const view = new DataView(new ArrayBuffer(9))
view.setBigInt64(1, BigInt('1234567890123456789'), true)
const reader = { view, offset: 1 }
const result = readPlain(reader, 'INT64', 1, undefined)
expect(result).toEqual(new BigInt64Array([1234567890123456789n]))
expect(reader.offset).toBe(9)
})
it('reads INT96 values', () => {
const buffer = new ArrayBuffer(12)
const view = new DataView(buffer)
@@ -40,7 +65,7 @@ describe('readPlain', () => {
view.setBigInt64(0, low, true)
view.setInt32(8, high, true)
const reader = { view, offset: 0 }
const result = readPlain(reader, 'INT96', 1)
const result = readPlain(reader, 'INT96', 1, undefined)
const expectedValue = (BigInt(high) << 64n) | low
expect(result).toEqual([expectedValue])
expect(reader.offset).toBe(12)
@@ -50,20 +75,38 @@ describe('readPlain', () => {
const view = new DataView(new ArrayBuffer(4))
view.setFloat32(0, 1234.5, true) // little-endian
const reader = { view, offset: 0 }
const result = readPlain(reader, 'FLOAT', 1)
const result = readPlain(reader, 'FLOAT', 1, undefined)
expect(result).toEqual(new Float32Array([1234.5]))
expect(reader.offset).toBe(4)
})
it('reads unaligned FLOAT values', () => {
const view = new DataView(new ArrayBuffer(5))
view.setFloat32(1, 1234.5, true) // little-endian
const reader = { view, offset: 1 }
const result = readPlain(reader, 'FLOAT', 1, undefined)
expect(result).toEqual(new Float32Array([1234.5]))
expect(reader.offset).toBe(5)
})
it('reads DOUBLE values', () => {
const view = new DataView(new ArrayBuffer(8))
view.setFloat64(0, 12345.6789, true) // little-endian
const reader = { view, offset: 0 }
const result = readPlain(reader, 'DOUBLE', 1)
const result = readPlain(reader, 'DOUBLE', 1, undefined)
expect(result).toEqual(new Float64Array([12345.6789]))
expect(reader.offset).toBe(8)
})
it('reads unaligned DOUBLE values', () => {
const view = new DataView(new ArrayBuffer(9))
view.setFloat64(1, 12345.6789, true) // little-endian
const reader = { view, offset: 1 }
const result = readPlain(reader, 'DOUBLE', 1, undefined)
expect(result).toEqual(new Float64Array([12345.6789]))
expect(reader.offset).toBe(9)
})
it('reads BYTE_ARRAY values', () => {
const view = new DataView(new ArrayBuffer(10))
view.setInt32(0, 3, true) // length 3
@@ -71,21 +114,29 @@ describe('readPlain', () => {
view.setUint8(5, 2)
view.setUint8(6, 3)
const reader = { view, offset: 0 }
const result = readPlain(reader, 'BYTE_ARRAY', 1)
const result = readPlain(reader, 'BYTE_ARRAY', 1, undefined)
expect(result).toEqual([new Uint8Array([1, 2, 3])])
expect(reader.offset).toBe(7)
})
it('reads FIXED_LEN_BYTE_ARRAY values', () => {
const fixedLength = 3
const view = new DataView(new ArrayBuffer(fixedLength))
const fixedLength = 2
const view = new DataView(new ArrayBuffer(fixedLength * 2))
view.setUint8(0, 4)
view.setUint8(1, 5)
view.setUint8(2, 6)
view.setUint8(3, 7)
const reader = { view, offset: 0 }
const result = readPlain(reader, 'FIXED_LEN_BYTE_ARRAY', fixedLength)
expect(result).toEqual(new Uint8Array([4, 5, 6]))
expect(reader.offset).toBe(fixedLength)
const result = readPlain(reader, 'FIXED_LEN_BYTE_ARRAY', 2, fixedLength)
expect(result).toEqual([new Uint8Array([4, 5]), new Uint8Array([6, 7])])
expect(reader.offset).toBe(4)
})
it('throws for missing fixedLength', () => {
const view = new DataView(new ArrayBuffer(1))
const reader = { view, offset: 0 }
expect(() => readPlain(reader, 'FIXED_LEN_BYTE_ARRAY', 1, undefined))
.toThrow('parquet missing fixed length')
})
it('throws an error for unhandled types', () => {
@@ -93,7 +144,7 @@ describe('readPlain', () => {
const reader = { view, offset: 0 }
/** @type any */
const invalidType = 'invalidType'
expect(() => readPlain(reader, invalidType, 1))
expect(() => readPlain(reader, invalidType, 1, undefined))
.toThrow(`parquet unhandled type: ${invalidType}`)
})
})