Oops fix the other tests

Kenny Daniel 2024-02-26 19:33:38 -08:00
parent 5147dbe709
commit 87d78ab06e
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
6 changed files with 47 additions and 20 deletions

@@ -11,7 +11,6 @@
"plugins": ["import", "jsdoc"],
"rules": {
"@typescript-eslint/no-explicit-any": "warn",
"@typescript-eslint/no-loss-of-precision": "warn",
"@typescript-eslint/no-unused-vars": "warn",
"arrow-spacing": "error",
"camelcase": "off",

@@ -49,7 +49,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const daph = header.data_page_header
if (!daph) throw new Error('parquet data page header is undefined')
- const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec)
+ const page = decompressPage(
+   compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec
+ )
const { definitionLevels, repetitionLevels, value: dataPage } = readDataPage(page, daph, schema, columnMetadata)
valuesSeen += daph.num_values
@@ -64,7 +66,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
const isNull = columnMetadata && !isRequired(schema, [columnMetadata.path_in_schema[0]])
const nullValue = false // TODO: unused?
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
- values = assembleObjects(definitionLevels, repetitionLevels, dataPage, isNull, nullValue, maxDefinitionLevel, rowIndex[0])
+ values = assembleObjects(
+   definitionLevels, repetitionLevels, dataPage, isNull, nullValue, maxDefinitionLevel, rowIndex[0]
+ )
} else if (definitionLevels?.length) {
const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
// Use definition levels to skip nulls
@@ -86,7 +90,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
// values.length !== daph.num_values isn't right. In cases like arrays,
// you need the total number of children, not the number of top-level values.
- rowData.push(...Array.from(values))
+ rowData.push(...values)
} else if (header.type === PageType.DICTIONARY_PAGE) {
const diph = header.dictionary_page_header
if (!diph) throw new Error('parquet dictionary page header is undefined')
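A note on the rowData.push change above: spreading values directly avoids the extra copy that Array.from made, and the comment about values.length vs daph.num_values can be made concrete. A minimal illustration (plain JavaScript, not library code) of why the two counts legitimately differ for a nested column:

const assembledRows = [[1, 2, 3], [4]] // two top-level rows, as assembled from a list column
const leafCount = assembledRows.flat().length // 4, the count of children that num_values tracks
console.log(assembledRows.length === leafCount) // false, and that is expected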

@@ -109,9 +109,7 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
*/
export function readDictionaryPage(bytes, diph, schema, columnMetadata) {
const dataView = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
- // read values based on encoding
- const { value } = readPlain(dataView, columnMetadata.type, diph.num_values, 0, false)
- return value
+ return readPlain(dataView, columnMetadata.type, diph.num_values, 0, false).value
}
/**

@@ -98,14 +98,13 @@ export function readDataPageV2(compressedBytes, ph, schema, columnMetadata) {
*/
export function readRepetitionLevelsV2(dataView, offset, daph2, schema, columnMetadata) {
const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
- if (maxRepetitionLevel) {
-   const bitWidth = widthFromMaxInt(maxRepetitionLevel)
-   // num_values is index 1 for either type of page header
-   return readRleBitPackedHybrid(
-     dataView, offset, bitWidth, daph2.repetition_levels_byte_length, daph2.num_values
-   ).value
- }
- return []
+ if (!maxRepetitionLevel) return []
+ const bitWidth = widthFromMaxInt(maxRepetitionLevel)
+ // num_values is index 1 for either type of page header
+ return readRleBitPackedHybrid(
+   dataView, offset, bitWidth, daph2.repetition_levels_byte_length, daph2.num_values
+ ).value
}
/**
@@ -158,11 +157,12 @@ function deltaBinaryUnpack(page, nValues, values) {
// no more diffs if on last value, delta read bitpacked
let data = 0
let stop = -bitWidth
- // TODO: possible loss of precision
- const mask = 0xffffffffffffffff >> (64 - bitWidth)
+ // only works for bitWidth < 31
+ const mask = (1 << bitWidth) - 1
while (count) {
if (stop < 0) {
- data = ((data & 0x00ffffffffffffff) << 8) | dataView.getUint8(offset++)
+ // fails when data gets too large
+ data = (data << 8) | dataView.getUint8(offset++)
stop += 8
} else {
values.push((data >> stop) & mask)
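For context on the mask change above: JavaScript bitwise operators truncate their operands to 32-bit signed integers, and the literal 0xffffffffffffffff already exceeds Number.MAX_SAFE_INTEGER, so the removed expression could not produce the intended 64-bit mask. The same truncation is why the new accumulator comment warns that (data << 8) fails once data grows too large. A quick sketch (plain JavaScript, illustration only, not library code):

const bitWidth = 5
// removed form: the literal rounds to 2**64, which ToInt32 truncates to 0 (and the shift count 59 is reduced mod 32), so the mask was 0
const oldMask = 0xffffffffffffffff >> (64 - bitWidth)
// new form: exact for bitWidth < 31, matching the comment in the diff
const newMask = (1 << bitWidth) - 1
console.log(oldMask, newMask) // 0 31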

@@ -249,7 +249,7 @@ export function readRleBitPackedHybrid(dataView, offset, width, length, numValue
}
const value = []
const startByteLength = byteLength
- while (offset + byteLength - startByteLength < length) {
+ while (byteLength - startByteLength < length && value.length < numValues) {
const [header, newOffset] = readVarInt(dataView, offset + byteLength)
byteLength = newOffset - offset
if ((header & 1) === 0) {
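Two things change in the loop condition above: the byte-count comparison no longer adds the absolute offset (only bytes consumed since the call started should count against length), and decoding now also stops once numValues values have been produced. A minimal sketch of that termination pattern, with hypothetical readRuns and readRun names, assuming each run reports how many bytes it consumed:

// Hypothetical helper, illustration only: decode run after run until the
// section's byte budget is spent or enough values have been produced.
function readRuns(readRun, length, numValues) {
  const values = []
  let consumed = 0
  while (consumed < length && values.length < numValues) {
    const { runValues, bytesRead } = readRun(consumed)
    values.push(...runValues)
    consumed += bytesRead
  }
  return values
}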

@@ -5,7 +5,7 @@ import { toJson } from '../src/toJson.js'
import { fileToAsyncBuffer, fileToJson } from './helpers.js'
describe('parquetRead', () => {
- const files = fs.readdirSync('test/files').filter(f => f.endsWith('y.parquet'))
+ const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
files.forEach(file => {
it(`should parse data from ${file}`, async () => {
@@ -20,4 +20,30 @@
})
})
})
+ it('should read a single column from a file', async () => {
+   const asyncBuffer = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
+   await parquetRead({
+     file: asyncBuffer,
+     columns: [2],
+     onChunk: (rows) => {
+       expect(toJson(rows)).toEqual({
+         column: 2,
+         data: [2, 3, 4, 5, 2],
+         rowStart: 0,
+         rowEnd: 5,
+       })
+     },
+     onComplete: (rows) => {
+       /* eslint-disable no-sparse-arrays */
+       expect(toJson(rows)).toEqual([
+         [,, 2],
+         [,, 3],
+         [,, 4],
+         [,, 5],
+         [,, 2],
+       ])
+     },
+   })
+ })
})
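On the sparse-array expectations in the new test: when only column index 2 is requested, indices 0 and 1 of each output row are left as holes, which is why the literals look like [,, 2] and why the no-sparse-arrays rule has to be disabled. A small reminder of how such a literal behaves (plain JavaScript, unrelated to the library's toJson):

const row = [,, 2]
console.log(row.length) // 3
console.log(0 in row, 2 in row) // false true
console.log(JSON.stringify(row)) // [null,null,2]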