Fix rowStart in onChunk callback.

Also split out readFiles tests.
Kenny Daniel 2024-04-12 13:09:31 -07:00
parent dd91122753
commit 00cbb6a3e6
6 changed files with 86 additions and 60 deletions
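For context, the fix changes what onChunk reports: chunks now carry file-wide row indices instead of indices relative to the current row group, so data from a second row group no longer claims to start at row 0. A minimal consumer-side sketch, assuming the AsyncBuffer shape used by the tests (byteLength plus an async slice) and the chunk payload shown in the hyparquet.js diff below:

import fs from 'fs'
import { parquetRead } from 'hyparquet'

// Build an AsyncBuffer over one of the test files (assumed interface: byteLength + async slice).
const buffer = fs.readFileSync('test/files/rowgroups.parquet')
const arrayBuffer = buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength)
const file = {
  byteLength: arrayBuffer.byteLength,
  slice: async (start, end) => arrayBuffer.slice(start, end),
}

// Collect column values at their absolute row positions.
const columnCells = {} // columnCells[columnName][rowIndex] = value
await parquetRead({
  file,
  onChunk: ({ columnName, columnData, rowStart }) => {
    const cells = columnCells[columnName] ??= []
    // rowStart is now the index of the chunk's first row in the whole file,
    // so chunks from later row groups land after earlier ones instead of overwriting row 0.
    columnData.forEach((value, i) => { cells[rowStart + i] = value })
  },
})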

@@ -27,15 +27,15 @@
"typecheck": "tsc"
},
"devDependencies": {
"@types/node": "20.12.5",
"@typescript-eslint/eslint-plugin": "7.5.0",
"@types/node": "20.12.7",
"@typescript-eslint/eslint-plugin": "7.6.0",
"@vitest/coverage-v8": "1.4.0",
"eslint": "8.57.0",
"eslint-plugin-import": "2.29.1",
"eslint-plugin-jsdoc": "48.2.3",
"http-server": "14.1.1",
"hysnappy": "0.3.0",
"typescript": "5.4.4",
"typescript": "5.4.5",
"vitest": "1.4.0"
}
}

@@ -51,7 +51,7 @@ export async function parquetRead(options) {
// if row group overlaps with row range, read it
if (groupStart + groupRows >= rowStart && groupStart < rowEnd) {
// read row group
const groupData = await readRowGroup(options, rowGroup)
const groupData = await readRowGroup(options, rowGroup, groupStart)
if (onComplete) {
// filter to rows in range
const start = Math.max(rowStart - groupStart, 0)
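To make the overlap check and slicing concrete, a worked example with assumed numbers (not taken from the test files): a file with two row groups of 100 rows each, read with rowStart = 150 and rowEnd = 250.

// First group:  groupStart = 0    -> 0 + 100 >= 150 is false, group skipped entirely
// Second group: groupStart = 100  -> 100 + 100 >= 150 and 100 < 250, group is read
//   readRowGroup(options, rowGroup, 100)    // groupStart is now forwarded to the reader
//   start = Math.max(150 - 100, 0) = 50     // only rows 50..99 of the group go to onComplete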
@@ -78,9 +78,10 @@ export async function parquetRead(options) {
* @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
* @param {Compressors} [options.compressors] custom decompressors
* @param {RowGroup} rowGroup row group to read
* @param {number} groupStart row index of the first row in the group
* @returns {Promise<any[][]>} resolves to row data
*/
async function readRowGroup(options, rowGroup) {
async function readRowGroup(options, rowGroup, groupStart) {
const { file, metadata, columns, compressors } = options
if (!metadata) throw new Error('parquet metadata not found')
@@ -201,8 +202,13 @@ async function readRowGroup(options, rowGroup) {
// do not emit column data until structs are fully parsed
if (!columnData) return
// notify caller of column data
if (options.onChunk) options.onChunk({ columnName, columnData, rowStart: 0, rowEnd: columnData.length })
// add column data to group data only if onComplete is defined
options.onChunk?.({
columnName,
columnData,
rowStart: groupStart,
rowEnd: groupStart + columnData.length,
})
// add column data to group data only if onComplete is defined
if (options.onComplete) addColumn(groupData, outputColumnIndex, columnData)
outputColumnIndex++
}))
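In short, the call site above switches the chunk payload from group-relative to absolute indices. Roughly, with N standing for however many rows precede the group in the file (illustrative, not a value from the diff):

// before: every row group reported its chunk as starting at row 0
//   onChunk({ columnName, columnData, rowStart: 0, rowEnd: columnData.length })
// after: the chunk is anchored at the group's first row in the file
//   onChunk({ columnName, columnData, rowStart: N, rowEnd: N + columnData.length })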

@@ -8,7 +8,7 @@ const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
describe('parquetMetadata', () => {
files.forEach(file => {
it(`should parse metadata from ${file}`, async () => {
it(`parse metadata from ${file}`, async () => {
const arrayBuffer = await readFileToArrayBuffer(`test/files/${file}`)
const result = toJson(parquetMetadata(arrayBuffer))
const base = file.replace('.parquet', '')
@@ -52,7 +52,7 @@ describe('parquetMetadata', () => {
describe('parquetMetadataAsync', () => {
files.forEach(file => {
it(`should parse metadata async from ${file}`, async () => {
it(`parse metadata async from ${file}`, async () => {
const asyncBuffer = fileToAsyncBuffer(`test/files/${file}`)
const result = await parquetMetadataAsync(asyncBuffer)
const base = file.replace('.parquet', '')

@@ -1,52 +1,36 @@
import fs from 'fs'
import { describe, expect, it } from 'vitest'
import { gunzipSync } from 'zlib'
import { parquetRead } from '../src/hyparquet.js'
import { toJson } from '../src/utils.js'
import { fileToAsyncBuffer, fileToJson } from './helpers.js'
/**
* @typedef {import('../src/types.js').Compressors} Compressors
* @type {Compressors}
*/
const compressors = {
GZIP: (/** @type {Uint8Array} */ input, /** @type {number} */ outputLength) => {
const result = gunzipSync(input)
return new Uint8Array(result.buffer, result.byteOffset, outputLength)
},
}
import { fileToAsyncBuffer } from './helpers.js'
describe('parquetRead', () => {
const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
files.forEach(file => {
it(`should parse data from ${file}`, async () => {
const asyncBuffer = fileToAsyncBuffer(`test/files/${file}`)
await parquetRead({
file: asyncBuffer,
compressors,
onComplete: (rows) => {
const base = file.replace('.parquet', '')
const expected = fileToJson(`test/files/${base}.json`)
expect(toJson(rows)).toEqual(expected)
},
})
})
})
it('throws reasonable error messages', async () => {
it('throws error for undefined file', async () => {
const file = undefined
await expect(parquetRead({ file }))
.rejects.toThrow('parquet file is required')
})
it('should read a single column from a file', async () => {
const asyncBuffer = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
it('filter by row', async () => {
const file = fileToAsyncBuffer('test/files/rowgroups.parquet')
await parquetRead({
file: asyncBuffer,
file,
rowEnd: 2,
onComplete: rows => {
/* eslint-disable no-sparse-arrays */
expect(toJson(rows)).toEqual([
[1], [2],
])
},
})
})
it('read a single column', async () => {
const file = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
await parquetRead({
file,
columns: ['c'],
onChunk: (rows) => {
expect(toJson(rows)).toEqual({
onChunk: chunk => {
expect(toJson(chunk)).toEqual({
columnName: 'c',
columnData: [2, 3, 4, 5, 2],
rowStart: 0,
@@ -66,20 +50,20 @@ describe('parquetRead', () => {
})
})
it('should read a list-like column from a file', async () => {
const asyncBuffer = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
it('read a list-like column', async () => {
const file = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
await parquetRead({
file: asyncBuffer,
file,
columns: ['e'],
onChunk: (rows) => {
expect(toJson(rows)).toEqual({
onChunk: chunk => {
expect(toJson(chunk)).toEqual({
columnName: 'e',
columnData: [[1, 2, 3], null, null, [1, 2, 3], [1, 2]],
rowStart: 0,
rowEnd: 5,
})
},
onComplete: (rows) => {
onComplete: rows => {
/* eslint-disable no-sparse-arrays */
expect(toJson(rows)).toEqual([
[[1, 2, 3]],
@@ -92,13 +76,13 @@ describe('parquetRead', () => {
})
})
it('should read a map-like column from a file', async () => {
const asyncBuffer = fileToAsyncBuffer('test/files/Int_Map.parquet')
it('read a map-like column', async () => {
const file = fileToAsyncBuffer('test/files/Int_Map.parquet')
await parquetRead({
file: asyncBuffer,
file,
columns: ['int_map'],
onChunk: (rows) => {
expect(toJson(rows)).toEqual({
onChunk: chunk => {
expect(toJson(chunk)).toEqual({
columnName: 'int_map',
columnData: [
{ k1: 1, k2: 100 },
@@ -113,7 +97,7 @@ describe('parquetRead', () => {
rowEnd: 7,
})
},
onComplete: (rows) => {
onComplete: rows => {
/* eslint-disable no-sparse-arrays */
expect(toJson(rows)).toEqual([
[{ k1: 1, k2: 100 }],

test/readFiles.test.js (new file, 36 lines)

@@ -0,0 +1,36 @@
import fs from 'fs'
import { describe, expect, it } from 'vitest'
import { gunzipSync } from 'zlib'
import { parquetRead } from '../src/hyparquet.js'
import { toJson } from '../src/utils.js'
import { fileToAsyncBuffer, fileToJson } from './helpers.js'
/**
* @typedef {import('../src/types.js').Compressors} Compressors
* @type {Compressors}
*/
const compressors = {
GZIP: (/** @type {Uint8Array} */ input, /** @type {number} */ outputLength) => {
const result = gunzipSync(input)
return new Uint8Array(result.buffer, result.byteOffset, outputLength)
},
}
describe('parquetRead test files', () => {
const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
files.forEach(filename => {
it(`parse data from ${filename}`, async () => {
const file = fileToAsyncBuffer(`test/files/${filename}`)
await parquetRead({
file,
compressors,
onComplete: (rows) => {
const base = filename.replace('.parquet', '')
const expected = fileToJson(`test/files/${base}.json`)
expect(toJson(rows)).toEqual(expected)
},
})
})
})
})

@@ -3,14 +3,14 @@ import { parquetMetadata, parquetSchema } from '../src/hyparquet.js'
import { readFileToArrayBuffer } from './helpers.js'
describe('schemaTree', () => {
it('should parse schema tree from addrtype-missing-value.parquet', async () => {
it('parse schema tree from addrtype-missing-value.parquet', async () => {
const arrayBuffer = await readFileToArrayBuffer('test/files/addrtype-missing-value.parquet')
const metadata = parquetMetadata(arrayBuffer)
const result = parquetSchema(metadata)
expect(result).toEqual(addrtypeSchema)
})
it('should parse schema tree from rowgroups.parquet', async () => {
it('parse schema tree from rowgroups.parquet', async () => {
const arrayBuffer = await readFileToArrayBuffer('test/files/rowgroups.parquet')
const metadata = parquetMetadata(arrayBuffer)
const result = parquetSchema(metadata)