TypedArrays

Kenny Daniel 2024-05-01 23:23:50 -07:00
parent b1f24c3892
commit 4d5c8324aa
11 changed files with 68 additions and 84 deletions

@@ -56,13 +56,13 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const page = decompressPage(
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
)
-const { definitionLevels, repetitionLevels, value: dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
+const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
valuesSeen += daph.num_values
const dictionaryEncoding = daph.encoding === 'PLAIN_DICTIONARY' || daph.encoding === 'RLE_DICTIONARY'
// construct output values: skip nulls and construct lists
-/** @type {any[]} */
+/** @type {DecodedArray} */
let values
if (repetitionLevels.length) {
dereferenceDictionary(dictionary, dataPage)
@@ -109,7 +109,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const daph2 = header.data_page_header_v2
if (!daph2) throw new Error('parquet data page header v2 is undefined')
-const { definitionLevels, repetitionLevels, value: dataPage } = readDataPageV2(
+const { definitionLevels, repetitionLevels, dataPage } = readDataPageV2(
compressedBytes, header, schemaPath, columnMetadata, compressors
)
valuesSeen += daph2.num_values
@@ -145,8 +145,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
/**
* Map data to dictionary values in place.
*
+* @typedef {import('./types.js').DecodedArray} DecodedArray
* @param {ArrayLike<any> | undefined} dictionary
-* @param {number[]} dataPage
+* @param {DecodedArray} dataPage
*/
function dereferenceDictionary(dictionary, dataPage) {
if (dictionary) {

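For context (not part of the diff): dictionary-encoded pages store integer indexes into a separately decoded dictionary page, and dereferenceDictionary swaps each index for its value in place. A minimal sketch of that idea, consistent with the signature above — the loop body is assumed, since this hunk truncates before it:

/**
 * @param {ArrayLike<any> | undefined} dictionary
 * @param {DecodedArray} dataPage
 */
function dereferenceDictionary(dictionary, dataPage) {
  if (dictionary) {
    for (let i = 0; i < dataPage.length; i++) {
      // replace each dictionary index with its looked-up value
      dataPage[i] = dictionary[dataPage[i]]
    }
  }
}
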
@@ -1,13 +1,15 @@
-const dayMillis = 86400000000000 // 1 day in ms
+const dayMillis = 86400000000000 // 1 day in milliseconds
/**
* Convert known types from primitive to rich.
*
-* @param {any[]} data series of primitive types
+* @typedef {import('./types.js').DecodedArray} DecodedArray
+* @param {DecodedArray} data series of primitive types
* @param {import('./types.js').SchemaElement} schemaElement schema element for the data
-* @returns {any[]} series of rich types
+* @returns {DecodedArray} series of rich types
*/
export function convert(data, schemaElement) {
+if (!Array.isArray(data)) return data
const ctype = schemaElement.converted_type
if (ctype === 'UTF8') {
const decoder = new TextDecoder()

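A hypothetical usage sketch (the schema element objects are made up for illustration): typed arrays of plain numbers now pass straight through the new Array.isArray guard, while a plain array of UTF8 byte arrays still gets decoded:

// hypothetical example; assumes a minimal SchemaElement shape
const encoder = new TextEncoder()
convert([encoder.encode('hello')], { name: 'name', converted_type: 'UTF8' }) // ['hello']
convert(new Int32Array([1, 2, 3]), { name: 'id' }) // returned as-is: not an Array
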
@@ -1,17 +1,15 @@
import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
import { readPlain } from './plain.js'
-import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, skipDefinitionBytes } from './schema.js'
-const skipNulls = false // TODO
+import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired } from './schema.js'
/**
* Read a data page from the given Uint8Array.
*
-* @typedef {{ definitionLevels: number[], numNulls: number }} DefinitionLevels
* @typedef {import("./types.d.ts").DataPage} DataPage
* @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
* @typedef {import("./types.d.ts").DataPageHeader} DataPageHeader
* @typedef {import("./types.d.ts").SchemaTree} SchemaTree
+* @typedef {import("./types.d.ts").DecodedArray} DecodedArray
* @param {Uint8Array} bytes raw page data (should already be decompressed)
* @param {DataPageHeader} daph data page header
* @param {SchemaTree[]} schemaPath
@@ -21,25 +19,14 @@ const skipNulls = false // TODO
export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
const reader = { view, offset: 0 }
-/** @type {any[]} */
-let values = []
+/** @type {DecodedArray} */
+let dataPage = []
// repetition levels
const repetitionLevels = readRepetitionLevels(reader, daph, schemaPath)
// definition levels
-let definitionLevels = undefined
-let numNulls = 0
-// let maxDefinitionLevel = -1
-// TODO: move into readDefinitionLevels
-if (skipNulls && !isRequired(schemaPath)) {
-// skip_definition_bytes
-reader.offset += skipDefinitionBytes(daph.num_values)
-} else {
-const dl = readDefinitionLevels(reader, daph, schemaPath)
-definitionLevels = dl.definitionLevels
-numNulls = dl.numNulls
-}
+const { definitionLevels, numNulls } = readDefinitionLevels(reader, daph, schemaPath)
// read values based on encoding
const nValues = daph.num_values - numNulls
@@ -47,7 +34,7 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
const { element } = schemaPath[schemaPath.length - 1]
const utf8 = element.converted_type === 'UTF8'
const plainObj = readPlain(reader, columnMetadata.type, nValues, utf8)
-values = Array.isArray(plainObj) ? plainObj : Array.from(plainObj)
+dataPage = plainObj
} else if (
daph.encoding === 'PLAIN_DICTIONARY' ||
daph.encoding === 'RLE_DICTIONARY' ||
@@ -63,17 +50,17 @@ export function readDataPage(bytes, daph, schemaPath, columnMetadata) {
reader.offset++
}
if (bitWidth) {
-values = new Array(nValues)
-readRleBitPackedHybrid(reader, bitWidth, view.byteLength - reader.offset, values)
+dataPage = new Array(nValues)
+readRleBitPackedHybrid(reader, bitWidth, view.byteLength - reader.offset, dataPage)
} else {
// nval zeros
-values = new Array(nValues).fill(0)
+dataPage = new Array(nValues).fill(0)
}
} else {
throw new Error(`parquet unsupported encoding: ${daph.encoding}`)
}
-return { definitionLevels, repetitionLevels, value: values }
+return { definitionLevels, repetitionLevels, dataPage }
}
/**
@@ -119,7 +106,7 @@ function readRepetitionLevels(reader, daph, schemaPath) {
* @param {DataReader} reader data view for the page
* @param {DataPageHeader} daph data page header
* @param {SchemaTree[]} schemaPath
-* @returns {DefinitionLevels} definition levels and number of bytes read
+* @returns {{ definitionLevels: number[], numNulls: number }} definition levels
*/
function readDefinitionLevels(reader, daph, schemaPath) {
if (!isRequired(schemaPath)) {

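For reference, the dictionary-index branch above follows the Parquet spec: the page begins with one byte giving the index bit width, followed by an RLE/bit-packed hybrid run of dictionary indexes. A hedged sketch of decoding such a page by hand, reusing the helpers named in this diff (bytes and nValues are assumed inputs):

// sketch: decode a dictionary-encoded data page (no nulls)
const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
const reader = { view, offset: 0 }
const bitWidth = view.getUint8(reader.offset++) // first byte: index bit width
const indexes = new Array(nValues)
readRleBitPackedHybrid(reader, bitWidth, view.byteLength - reader.offset, indexes)
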
@@ -23,7 +23,7 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
const view = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
const reader = { view, offset: 0 }
/** @type {any} */
-let values = []
+let dataPage = []
const daph2 = ph.data_page_header_v2
if (!daph2) throw new Error('parquet data page header v2 is undefined')
@@ -56,7 +56,7 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
}
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const pageReader = { view: pageView, offset: 0 }
-values = readPlain(pageReader, columnMetadata.type, nValues, utf8)
+dataPage = readPlain(pageReader, columnMetadata.type, nValues, utf8)
} else if (daph2.encoding === 'RLE') {
const page = decompressPage(compressedBytes, uncompressedPageSize, columnMetadata.codec, compressors)
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
@@ -65,8 +65,8 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
throw new Error('parquet RLE encoding with nulls not supported')
} else {
const pageReader = { view: pageView, offset: 4 }
-values = new Array(nValues)
-readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, values)
+dataPage = new Array(nValues)
+readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, dataPage)
}
} else if (
daph2.encoding === 'PLAIN_DICTIONARY' ||
@@ -77,18 +77,18 @@ export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata,
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const bitWidth = pageView.getUint8(0)
const pageReader = { view: pageView, offset: 1 }
-values = new Array(nValues)
-readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, values)
+dataPage = new Array(nValues)
+readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, dataPage)
} else if (daph2.encoding === 'DELTA_BINARY_PACKED') {
if (daph2.num_nulls) throw new Error('parquet delta-int not supported')
const codec = daph2.is_compressed ? columnMetadata.codec : 'UNCOMPRESSED'
const page = decompressPage(compressedBytes, uncompressedPageSize, codec, compressors)
-deltaBinaryUnpack(page, nValues, values)
+deltaBinaryUnpack(page, nValues, dataPage)
} else {
throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
}
-return { definitionLevels, repetitionLevels, value: values }
+return { definitionLevels, repetitionLevels, dataPage }
}
/**

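A note for readers (from the parquet-format Thrift spec, not this diff): v2 page headers record the level byte lengths explicitly, which is why repetition and definition levels can be read from the uncompressed prefix before the values are decompressed. Roughly:

// sketch of the v2 page layout; daph2 field names are from the Thrift spec
const levelsByteLength = daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length
const compressedValues = compressedBytes.subarray(levelsByteLength) // values follow the levels
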
@@ -16,10 +16,11 @@ export function widthFromMaxInt(value) {
* If length is zero, then read as int32 at the start of the encoded data.
*
* @typedef {import("./types.d.ts").DataReader} DataReader
+* @typedef {number[]} DecodedArray
* @param {DataReader} reader - buffer to read data from
* @param {number} width - width of each bit-packed group
* @param {number} length - length of the encoded data
-* @param {number[]} values - output array
+* @param {DecodedArray} values - output array
*/
export function readRleBitPackedHybrid(reader, width, length, values) {
if (!length) {
@@ -52,7 +53,7 @@ export function readRleBitPackedHybrid(reader, width, length, values) {
* @param {DataReader} reader - buffer to read data from
* @param {number} count - number of values to read
* @param {number} bitWidth - width of each bit-packed group
-* @param {number[]} values - output array
+* @param {DecodedArray} values - output array
* @param {number} seen - number of values seen so far
*/
function readRle(reader, count, bitWidth, values, seen) {

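As a worked example of the hybrid format (taken from the Parquet spec, not this diff): each run starts with a ULEB128 header whose low bit selects the mode, 0 for RLE and 1 for bit-packed, and an RLE run is followed by the repeated value in ceil(bitWidth / 8) little-endian bytes. Assuming the reader/values calling convention shown above:

// sketch: header 0x06 = (3 << 1) | 0, an RLE run of length 3,
// followed by the 1-byte repeated value 0x01 at bitWidth 1
const view = new DataView(new Uint8Array([0x06, 0x01]).buffer)
const reader = { view, offset: 0 }
const out = new Array(3)
readRleBitPackedHybrid(reader, 1, 2, out) // out should become [1, 1, 1]
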
@@ -23,10 +23,15 @@ function readPlainBoolean(reader, count) {
*
* @param {DataReader} reader - buffer to read data from
* @param {number} count - number of values to read
-* @returns {number[]} array of int32 values
+* @returns {Int32Array} array of int32 values
*/
function readPlainInt32(reader, count) {
-const values = new Array(count)
+if ((reader.view.byteOffset + reader.offset) % 4 === 0) {
+const values = new Int32Array(reader.view.buffer, reader.view.byteOffset + reader.offset, count)
+reader.offset += count * 4
+return values
+}
+const values = new Int32Array(count)
for (let i = 0; i < count; i++) {
values[i] = reader.view.getInt32(reader.offset + i * 4, true)
}
@@ -39,10 +44,15 @@ function readPlainInt32(reader, count) {
*
* @param {DataReader} reader - buffer to read data from
* @param {number} count - number of values to read
-* @returns {bigint[]} array of int64 values
+* @returns {BigInt64Array} array of int64 values
*/
function readPlainInt64(reader, count) {
-const values = new Array(count)
+if ((reader.view.byteOffset + reader.offset) % 8 === 0) {
+const values = new BigInt64Array(reader.view.buffer, reader.view.byteOffset + reader.offset, count)
+reader.offset += count * 8
+return values
+}
+const values = new BigInt64Array(count)
for (let i = 0; i < count; i++) {
values[i] = reader.view.getBigInt64(reader.offset + i * 8, true)
}
@@ -73,13 +83,10 @@ function readPlainInt96(reader, count) {
*
* @param {DataReader} reader - buffer to read data from
* @param {number} count - number of values to read
-* @returns {number[]} array of float values
+* @returns {Float32Array} array of float values
*/
function readPlainFloat(reader, count) {
-const values = new Array(count)
-for (let i = 0; i < count; i++) {
-values[i] = reader.view.getFloat32(reader.offset + i * 4, true)
-}
+const values = new Float32Array(reader.view.buffer, reader.view.byteOffset + reader.offset, count)
reader.offset += count * 4
return values
}
@@ -89,13 +96,10 @@ function readPlainFloat(reader, count) {
*
* @param {DataReader} reader - buffer to read data from
* @param {number} count - number of values to read
-* @returns {number[]} array of double values
+* @returns {Float64Array} array of double values
*/
function readPlainDouble(reader, count) {
-const values = new Array(count)
-for (let i = 0; i < count; i++) {
-values[i] = reader.view.getFloat64(reader.offset + i * 8, true)
-}
+const values = new Float64Array(reader.view.buffer, reader.view.byteOffset + reader.offset, count)
reader.offset += count * 8
return values
}
@@ -137,12 +141,13 @@ function readPlainByteArrayFixed(reader, fixedLength) {
/**
* Read `count` values of the given type from the reader.view.
*
+* @typedef {import("./types.d.ts").DecodedArray} DecodedArray
* @typedef {import("./types.d.ts").ParquetType} ParquetType
* @param {DataReader} reader - buffer to read data from
* @param {ParquetType} type - parquet type of the data
* @param {number} count - number of values to read
* @param {boolean} utf8 - whether to decode byte arrays as UTF-8
-* @returns {ArrayLike<any>} array of values
+* @returns {DecodedArray} array of values
*/
export function readPlain(reader, type, count, utf8) {
if (count === 0) return []

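The pattern introduced in readPlainInt32 and readPlainInt64 above is the core of this commit: when the byte offset happens to be aligned to the element size, construct a typed-array view directly over the underlying ArrayBuffer (zero copy); otherwise fall back to element-wise DataView reads. A generic sketch of the same technique (the helper name is illustrative, and it assumes a little-endian host, since typed-array views use platform byte order while parquet data is little-endian):

/**
 * Sketch: read `count` little-endian int32 values, zero-copy when aligned.
 * @param {{ view: DataView, offset: number }} reader
 * @param {number} count
 * @returns {Int32Array}
 */
function readInt32Aligned(reader, count) {
  const { view } = reader
  const offset = reader.offset // view-relative offset
  reader.offset += count * 4
  if ((view.byteOffset + offset) % Int32Array.BYTES_PER_ELEMENT === 0) {
    // aligned: view the buffer in place, no copy
    return new Int32Array(view.buffer, view.byteOffset + offset, count)
  }
  // unaligned: copy element-wise through the DataView
  const values = new Int32Array(count)
  for (let i = 0; i < count; i++) {
    values[i] = view.getInt32(offset + i * 4, true)
  }
  return values
}

By contrast, readPlainFloat and readPlainDouble in this commit take the zero-copy view unconditionally, which relies on the page offset being 4- or 8-byte aligned: a typed-array constructor throws a RangeError on a misaligned byteOffset.
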
@@ -93,22 +93,6 @@ export function getMaxDefinitionLevel(schemaPath) {
return maxLevel
}
/**
-* Get the number of bytes to skip for definition levels.
-*
-* @param {number} num number of values
-* @returns {number} number of bytes to skip
-*/
-export function skipDefinitionBytes(num) {
-let byteLength = 6
-let n = num >>> 8
-while (n !== 0) {
-byteLength++
-n >>>= 7
-}
-return byteLength
-}
/**
* Get the column name as foo.bar and handle list and map like columns.
*

src/types.d.ts

@@ -242,5 +242,13 @@ interface DataPageHeaderV2 {
interface DataPage {
definitionLevels: number[] | undefined
repetitionLevels: number[]
-value: any[]
+dataPage: DecodedArray
}
+export type DecodedArray =
+Uint8Array |
+Int32Array |
+BigInt64Array |
+Float32Array |
+Float64Array |
+any[]

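Call sites that need Array-only behavior must narrow the DecodedArray union first, which is exactly what the new Array.isArray guard in convert does. A trivial illustration (the helper name is hypothetical):

/** @param {import('./types.js').DecodedArray} data */
function toPlainArray(data) {
  // typed arrays are ArrayLike but not Array; copy only when needed
  return Array.isArray(data) ? data : Array.from(data)
}
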
@@ -24,8 +24,10 @@ export function toJson(obj) {
/**
* Concatenate two arrays fast.
*
+* @typedef {import('./types.js').DecodedArray} DecodedArray
* @param {any[]} aaa first array
-* @param {any[]} bbb second array
+* @param {DecodedArray} bbb second array
*/
export function concat(aaa, bbb) {
const chunk = 10000

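The chunking in concat (its body is truncated in the hunk above) matters because aaa.push(...bbb) passes every element as a separate argument and can overflow the call stack on large pages; pushing bounded slices avoids that, and .slice works on both plain and typed arrays. A sketch consistent with the signature above — the loop is assumed, not shown in this diff:

export function concat(aaa, bbb) {
  const chunk = 10000
  for (let i = 0; i < bbb.length; i += chunk) {
    aaa.push(...bbb.slice(i, i + chunk)) // bounded spread stays under argument limits
  }
}
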
@@ -17,7 +17,7 @@ describe('readPlain', () => {
view.setInt32(0, 123456789, true) // little-endian
const reader = { view, offset: 0 }
const result = readPlain(reader, 'INT32', 1, false)
-expect(result).toEqual([123456789])
+expect(result).toEqual(new Int32Array([123456789]))
expect(reader.offset).toBe(4)
})
@@ -26,7 +26,7 @@ describe('readPlain', () => {
view.setBigInt64(0, BigInt('1234567890123456789'), true)
const reader = { view, offset: 0 }
const result = readPlain(reader, 'INT64', 1, false)
-expect(result).toEqual([1234567890123456789n])
+expect(result).toEqual(new BigInt64Array([1234567890123456789n]))
expect(reader.offset).toBe(8)
})
@@ -51,7 +51,7 @@ describe('readPlain', () => {
view.setFloat32(0, 1234.5, true) // little-endian
const reader = { view, offset: 0 }
const result = readPlain(reader, 'FLOAT', 1, false)
-expect(result).toEqual([1234.5])
+expect(result).toEqual(new Float32Array([1234.5]))
expect(reader.offset).toBe(4)
})
@@ -60,7 +60,7 @@ describe('readPlain', () => {
view.setFloat64(0, 12345.6789, true) // little-endian
const reader = { view, offset: 0 }
const result = readPlain(reader, 'DOUBLE', 1, false)
-expect(result).toEqual([12345.6789])
+expect(result).toEqual(new Float64Array([12345.6789]))
expect(reader.offset).toBe(8)
})

@@ -6,7 +6,6 @@ import {
isListLike,
isMapLike,
isRequired,
-skipDefinitionBytes,
} from '../src/schema.js'
describe('Parquet schema utils', () => {
@@ -63,11 +62,6 @@ describe('Parquet schema utils', () => {
expect(getMaxDefinitionLevel(getSchemaPath(schema, ['child3']))).toBe(1)
})
-it('skipDefinitionBytes', () => {
-expect(skipDefinitionBytes(100)).toBe(6)
-expect(skipDefinitionBytes(1000)).toBe(7)
-})
it('isListLike', () => {
expect(isListLike(getSchemaPath(schema, []))).toBe(false)
expect(isListLike(getSchemaPath(schema, ['child1']))).toBe(false)