diff --git a/package.json b/package.json
index 8f6fa9b..63b9a6f 100644
--- a/package.json
+++ b/package.json
@@ -27,15 +27,15 @@
     "typecheck": "tsc"
   },
   "devDependencies": {
-    "@types/node": "20.12.5",
-    "@typescript-eslint/eslint-plugin": "7.5.0",
+    "@types/node": "20.12.7",
+    "@typescript-eslint/eslint-plugin": "7.6.0",
     "@vitest/coverage-v8": "1.4.0",
     "eslint": "8.57.0",
     "eslint-plugin-import": "2.29.1",
     "eslint-plugin-jsdoc": "48.2.3",
     "http-server": "14.1.1",
     "hysnappy": "0.3.0",
-    "typescript": "5.4.4",
+    "typescript": "5.4.5",
     "vitest": "1.4.0"
   }
 }
diff --git a/src/read.js b/src/read.js
index ff2289c..33c03d6 100644
--- a/src/read.js
+++ b/src/read.js
@@ -51,7 +51,7 @@ export async function parquetRead(options) {
     // if row group overlaps with row range, read it
     if (groupStart + groupRows >= rowStart && groupStart < rowEnd) {
       // read row group
-      const groupData = await readRowGroup(options, rowGroup)
+      const groupData = await readRowGroup(options, rowGroup, groupStart)
       if (onComplete) {
         // filter to rows in range
         const start = Math.max(rowStart - groupStart, 0)
@@ -78,9 +78,10 @@ export async function parquetRead(options) {
  * @param {(rows: any[][]) => void} [options.onComplete] called when all requested rows and columns are parsed
  * @param {Compressors} [options.compressors] custom decompressors
  * @param {RowGroup} rowGroup row group to read
+ * @param {number} groupStart row index of the first row in the group
  * @returns {Promise<any[][]>} resolves to row data
  */
-async function readRowGroup(options, rowGroup) {
+async function readRowGroup(options, rowGroup, groupStart) {
   const { file, metadata, columns, compressors } = options
   if (!metadata) throw new Error('parquet metadata not found')
 
@@ -201,8 +202,13 @@ async function readRowGroup(options, rowGroup) {
       // do not emit column data until structs are fully parsed
       if (!columnData) return
       // notify caller of column data
-      if (options.onChunk) options.onChunk({ columnName, columnData, rowStart: 0, rowEnd: columnData.length })
+      options.onChunk?.({
+        columnName,
+        columnData,
+        rowStart: groupStart,
+        rowEnd: groupStart + columnData.length,
+      })
       // add column data to group data only if onComplete is defined
       if (options.onComplete) addColumn(groupData, outputColumnIndex, columnData)
       outputColumnIndex++
     }))
diff --git a/test/metadata.test.js b/test/metadata.test.js
index 2bed334..539970f 100644
--- a/test/metadata.test.js
+++ b/test/metadata.test.js
@@ -8,7 +8,7 @@ const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
 
 describe('parquetMetadata', () => {
   files.forEach(file => {
-    it(`should parse metadata from ${file}`, async () => {
+    it(`parse metadata from ${file}`, async () => {
       const arrayBuffer = await readFileToArrayBuffer(`test/files/${file}`)
       const result = toJson(parquetMetadata(arrayBuffer))
       const base = file.replace('.parquet', '')
@@ -52,7 +52,7 @@ describe('parquetMetadata', () => {
 
 describe('parquetMetadataAsync', () => {
   files.forEach(file => {
-    it(`should parse metadata async from ${file}`, async () => {
+    it(`parse metadata async from ${file}`, async () => {
      const asyncBuffer = fileToAsyncBuffer(`test/files/${file}`)
       const result = await parquetMetadataAsync(asyncBuffer)
       const base = file.replace('.parquet', '')
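
The `src/read.js` change above is the heart of this patch: `groupStart` is threaded into `readRowGroup` so that `onChunk` reports `rowStart`/`rowEnd` as absolute row indices within the file, where previously every chunk was reported relative to its own row group (always starting at 0). A minimal sketch of a caller that relies on the new behavior to assemble a column across row groups; the file name is hypothetical, and `fileToAsyncBuffer` stands in for a helper like the one in the tests (sketched at the end of this diff):

```js
import { parquetMetadataAsync, parquetRead } from 'hyparquet'

const file = fileToAsyncBuffer('data.parquet') // hypothetical file
const metadata = await parquetMetadataAsync(file)

// preallocate one slot per row; each chunk lands at its absolute offset
const column = new Array(Number(metadata.num_rows))
await parquetRead({
  file,
  metadata,
  columns: ['c'],
  onChunk({ columnData, rowStart }) {
    // rowStart is now file-absolute, so chunks from later row groups
    // no longer collide with rows 0..n of earlier ones
    for (let i = 0; i < columnData.length; i++) {
      column[rowStart + i] = columnData[i]
    }
  },
})
```
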
diff --git a/test/read.test.js b/test/read.test.js
index 22446c8..7f2bf32 100644
--- a/test/read.test.js
+++ b/test/read.test.js
@@ -1,52 +1,36 @@
-import fs from 'fs'
 import { describe, expect, it } from 'vitest'
-import { gunzipSync } from 'zlib'
 import { parquetRead } from '../src/hyparquet.js'
 import { toJson } from '../src/utils.js'
-import { fileToAsyncBuffer, fileToJson } from './helpers.js'
-
-/**
- * @typedef {import('../src/types.js').Compressors} Compressors
- * @type {Compressors}
- */
-const compressors = {
-  GZIP: (/** @type {Uint8Array} */ input, /** @type {number} */ outputLength) => {
-    const result = gunzipSync(input)
-    return new Uint8Array(result.buffer, result.byteOffset, outputLength)
-  },
-}
+import { fileToAsyncBuffer } from './helpers.js'
 
 describe('parquetRead', () => {
-  const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
-
-  files.forEach(file => {
-    it(`should parse data from ${file}`, async () => {
-      const asyncBuffer = fileToAsyncBuffer(`test/files/${file}`)
-      await parquetRead({
-        file: asyncBuffer,
-        compressors,
-        onComplete: (rows) => {
-          const base = file.replace('.parquet', '')
-          const expected = fileToJson(`test/files/${base}.json`)
-          expect(toJson(rows)).toEqual(expected)
-        },
-      })
-    })
-  })
-
-  it('throws reasonable error messages', async () => {
+  it('throws error for undefined file', async () => {
     const file = undefined
     await expect(parquetRead({ file }))
       .rejects.toThrow('parquet file is required')
   })
 
-  it('should read a single column from a file', async () => {
-    const asyncBuffer = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
+  it('filter by row', async () => {
+    const file = fileToAsyncBuffer('test/files/rowgroups.parquet')
     await parquetRead({
-      file: asyncBuffer,
+      file,
+      rowEnd: 2,
+      onComplete: rows => {
+        /* eslint-disable no-sparse-arrays */
+        expect(toJson(rows)).toEqual([
+          [1], [2],
+        ])
+      },
+    })
+  })
+
+  it('read a single column', async () => {
+    const file = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
+    await parquetRead({
+      file,
       columns: ['c'],
-      onChunk: (rows) => {
-        expect(toJson(rows)).toEqual({
+      onChunk: chunk => {
+        expect(toJson(chunk)).toEqual({
           columnName: 'c',
           columnData: [2, 3, 4, 5, 2],
           rowStart: 0,
@@ -66,20 +50,20 @@
     })
   })
 
-  it('should read a list-like column from a file', async () => {
-    const asyncBuffer = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
+  it('read a list-like column', async () => {
+    const file = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
     await parquetRead({
-      file: asyncBuffer,
+      file,
       columns: ['e'],
-      onChunk: (rows) => {
-        expect(toJson(rows)).toEqual({
+      onChunk: chunk => {
+        expect(toJson(chunk)).toEqual({
           columnName: 'e',
           columnData: [[1, 2, 3], null, null, [1, 2, 3], [1, 2]],
           rowStart: 0,
           rowEnd: 5,
         })
       },
-      onComplete: (rows) => {
+      onComplete: rows => {
         /* eslint-disable no-sparse-arrays */
         expect(toJson(rows)).toEqual([
           [[1, 2, 3]],
@@ -92,28 +76,28 @@
     })
   })
 
-  it('should read a map-like column from a file', async () => {
-    const asyncBuffer = fileToAsyncBuffer('test/files/Int_Map.parquet')
+  it('read a map-like column', async () => {
+    const file = fileToAsyncBuffer('test/files/Int_Map.parquet')
     await parquetRead({
-      file: asyncBuffer,
+      file,
       columns: ['int_map'],
-      onChunk: (rows) => {
-        expect(toJson(rows)).toEqual({
+      onChunk: chunk => {
+        expect(toJson(chunk)).toEqual({
           columnName: 'int_map',
           columnData: [
            { k1: 1, k2: 100 },
            { k1: 2, k2: null },
            null,
            null,
            null,
            { k1: null, k2: 1 },
            null,
           ],
           rowStart: 0,
           rowEnd: 7,
         })
       },
-      onComplete: (rows) => {
+      onComplete: rows => {
         /* eslint-disable no-sparse-arrays */
         expect(toJson(rows)).toEqual([
           [{ k1: 1, k2: 100 }],
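
The new `filter by row` test above exercises the range check shown in `src/read.js`: row groups that do not overlap `[rowStart, rowEnd)` are skipped, and `onComplete` receives only the requested slice. The same options in application code might look like this (file name hypothetical):

```js
import { parquetRead } from 'hyparquet'

// read only rows [100, 200); non-overlapping row groups are never decoded
await parquetRead({
  file: fileToAsyncBuffer('large.parquet'), // hypothetical file
  rowStart: 100,
  rowEnd: 200,
  onComplete: rows => {
    console.log(rows.length) // 100, assuming the file has at least 200 rows
  },
})
```
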
diff --git a/test/readFiles.test.js b/test/readFiles.test.js
new file mode 100644
index 0000000..c05a596
--- /dev/null
+++ b/test/readFiles.test.js
@@ -0,0 +1,36 @@
+import fs from 'fs'
+import { describe, expect, it } from 'vitest'
+import { gunzipSync } from 'zlib'
+import { parquetRead } from '../src/hyparquet.js'
+import { toJson } from '../src/utils.js'
+import { fileToAsyncBuffer, fileToJson } from './helpers.js'
+
+/**
+ * @typedef {import('../src/types.js').Compressors} Compressors
+ * @type {Compressors}
+ */
+const compressors = {
+  GZIP: (/** @type {Uint8Array} */ input, /** @type {number} */ outputLength) => {
+    const result = gunzipSync(input)
+    return new Uint8Array(result.buffer, result.byteOffset, outputLength)
+  },
+}
+
+describe('parquetRead test files', () => {
+  const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
+
+  files.forEach(filename => {
+    it(`parse data from ${filename}`, async () => {
+      const file = fileToAsyncBuffer(`test/files/${filename}`)
+      await parquetRead({
+        file,
+        compressors,
+        onComplete: (rows) => {
+          const base = filename.replace('.parquet', '')
+          const expected = fileToJson(`test/files/${base}.json`)
+          expect(toJson(rows)).toEqual(expected)
+        },
+      })
+    })
+  })
+})
diff --git a/test/schemaTree.test.js b/test/schemaTree.test.js
index 7c73ba8..9c1f8d9 100644
--- a/test/schemaTree.test.js
+++ b/test/schemaTree.test.js
@@ -3,14 +3,14 @@ import { parquetMetadata, parquetSchema } from '../src/hyparquet.js'
 import { readFileToArrayBuffer } from './helpers.js'
 
 describe('schemaTree', () => {
-  it('should parse schema tree from addrtype-missing-value.parquet', async () => {
+  it('parse schema tree from addrtype-missing-value.parquet', async () => {
     const arrayBuffer = await readFileToArrayBuffer('test/files/addrtype-missing-value.parquet')
     const metadata = parquetMetadata(arrayBuffer)
     const result = parquetSchema(metadata)
     expect(result).toEqual(addrtypeSchema)
   })
 
-  it('should parse schema tree from rowgroups.parquet', async () => {
+  it('parse schema tree from rowgroups.parquet', async () => {
     const arrayBuffer = await readFileToArrayBuffer('test/files/rowgroups.parquet')
     const metadata = parquetMetadata(arrayBuffer)
     const result = parquetSchema(metadata)
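
The tests rely on `fileToAsyncBuffer` and `fileToJson` from `./helpers.js`, which this diff does not touch. For orientation, a plausible sketch of `fileToAsyncBuffer`, assuming hyparquet's `AsyncBuffer` shape (`byteLength` plus `slice(start, end)` resolving to an `ArrayBuffer`); the real helper may differ:

```js
import fs from 'fs'

/**
 * Adapt a file on disk to an AsyncBuffer so that only the requested
 * byte ranges are read. Sketch only, not the actual helpers.js code.
 * @param {string} filename
 */
function fileToAsyncBuffer(filename) {
  const { size } = fs.statSync(filename)
  return {
    byteLength: size,
    async slice(start, end = size) {
      // createReadStream treats end as inclusive, hence end - 1
      const stream = fs.createReadStream(filename, { start, end: end - 1 })
      const chunks = []
      for await (const chunk of stream) chunks.push(chunk)
      const buf = Buffer.concat(chunks)
      return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength)
    },
  }
}
```
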