diff --git a/benchmark.js b/benchmark.js index 489d14d..ca1a22c 100644 --- a/benchmark.js +++ b/benchmark.js @@ -1,7 +1,8 @@ -import { createReadStream, createWriteStream, promises as fs } from 'fs' +import { createWriteStream, promises as fs } from 'fs' import { compressors } from 'hyparquet-compressors' import { pipeline } from 'stream/promises' import { parquetRead } from './src/hyparquet.js' +import { asyncBufferFromFile } from './src/utils.js' const url = 'https://huggingface.co/datasets/wikimedia/wikipedia/resolve/main/20231101.en/train-00000-of-00041.parquet' const filename = 'example.parquet' @@ -19,14 +20,7 @@ if (!stat) { } // asyncBuffer -const file = { - byteLength: stat.size, - async slice(start, end) { - // read file slice - const readStream = createReadStream(filename, { start, end }) - return await readStreamToArrayBuffer(readStream) - }, -} +const file = await asyncBufferFromFile(filename) const startTime = performance.now() console.log('parsing example.parquet data...') @@ -37,21 +31,3 @@ await parquetRead({ }) const ms = performance.now() - startTime console.log(`parsed ${stat.size.toLocaleString()} bytes in ${ms.toFixed(0)} ms`) - -/** - * Convert a web ReadableStream to ArrayBuffer. - * - * @param {ReadStream} input - * @returns {Promise<ArrayBuffer>} - */ -function readStreamToArrayBuffer(input) { - return new Promise((resolve, reject) => { - const chunks = [] - input.on('data', chunk => chunks.push(chunk)) - input.on('end', () => { - const buffer = Buffer.concat(chunks) - resolve(buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength)) - }) - input.on('error', reject) - }) -} diff --git a/src/utils.js b/src/utils.js index 55f057f..c839143 100644 --- a/src/utils.js +++ b/src/utils.js @@ -65,3 +65,41 @@ export async function asyncBufferFromUrl(url) { }, } } + +/** + * Construct an AsyncBuffer for a local file using node fs package. 
+ * + * @param {string} filename + * @returns {Promise<import('./types.js').AsyncBuffer>} + */ +export async function asyncBufferFromFile(filename) { + const fs = await import('fs') + const stat = await fs.promises.stat(filename) + return { + byteLength: stat.size, + async slice(start, end) { + // read file slice + const readStream = fs.createReadStream(filename, { start, end }) + return await readStreamToArrayBuffer(readStream) + }, + } +} + +/** + * Convert a node ReadStream to ArrayBuffer. + * + * @param {import('stream').Readable} input + * @returns {Promise<ArrayBuffer>} + */ +function readStreamToArrayBuffer(input) { + return new Promise((resolve, reject) => { + /** @type {Buffer[]} */ + const chunks = [] + input.on('data', chunk => chunks.push(chunk)) + input.on('end', () => { + const buffer = Buffer.concat(chunks) + resolve(buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength)) + }) + input.on('error', reject) + }) +} diff --git a/test/helpers.js b/test/helpers.js index fb575fb..85fe9a3 100644 --- a/test/helpers.js +++ b/test/helpers.js @@ -1,29 +1,5 @@ import fs from 'fs' -/** - * Helper function to read .parquet file into ArrayBuffer - * - * @param {string} filePath - * @returns {Promise<ArrayBuffer>} - */ -export async function readFileToArrayBuffer(filePath) { - const buffer = await fs.promises.readFile(filePath) - return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength) -} - -/** - * Wrap .parquet file in an AsyncBuffer - * - * @param {string} filePath - * @returns {import('../src/types.js').AsyncBuffer} - */ -export function fileToAsyncBuffer(filePath) { - return { - byteLength: fs.statSync(filePath).size, - slice: async (start, end) => (await readFileToArrayBuffer(filePath)).slice(start, end), - } -} - /** * Read .parquet file into JSON * diff --git a/test/metadata.test.js b/test/metadata.test.js index fb5aa5a..56049a9 100644 --- a/test/metadata.test.js +++ b/test/metadata.test.js @@ -1,15 +1,16 @@ import fs from 'fs' import { describe, expect, it } from 
'vitest' import { parquetMetadata, parquetMetadataAsync } from '../src/hyparquet.js' -import { toJson } from '../src/utils.js' -import { fileToAsyncBuffer, fileToJson, readFileToArrayBuffer } from './helpers.js' +import { asyncBufferFromFile, toJson } from '../src/utils.js' +import { fileToJson } from './helpers.js' const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet')) describe('parquetMetadata', () => { files.forEach(file => { it(`parse metadata from ${file}`, async () => { - const arrayBuffer = await readFileToArrayBuffer(`test/files/${file}`) + const asyncBuffer = await asyncBufferFromFile(`test/files/${file}`) + const arrayBuffer = await asyncBuffer.slice(0) const result = toJson(parquetMetadata(arrayBuffer)) const base = file.replace('.parquet', '') const expected = fileToJson(`test/files/${base}.metadata.json`) @@ -53,7 +54,7 @@ describe('parquetMetadata', () => { describe('parquetMetadataAsync', () => { files.forEach(file => { it(`parse metadata async from ${file}`, async () => { - const asyncBuffer = fileToAsyncBuffer(`test/files/${file}`) + const asyncBuffer = await asyncBufferFromFile(`test/files/${file}`) const result = await parquetMetadataAsync(asyncBuffer) const base = file.replace('.parquet', '') const expected = fileToJson(`test/files/${base}.metadata.json`) diff --git a/test/read.test.js b/test/read.test.js index 6d1b472..82ff09d 100644 --- a/test/read.test.js +++ b/test/read.test.js @@ -1,7 +1,6 @@ import { describe, expect, it } from 'vitest' import { parquetRead } from '../src/hyparquet.js' -import { toJson } from '../src/utils.js' -import { fileToAsyncBuffer } from './helpers.js' +import { asyncBufferFromFile, toJson } from '../src/utils.js' describe('parquetRead', () => { it('throws error for undefined file', async () => { @@ -18,7 +17,7 @@ describe('parquetRead', () => { }) it('filter by row', async () => { - const file = fileToAsyncBuffer('test/files/rowgroups.parquet') + const file = await 
asyncBufferFromFile('test/files/rowgroups.parquet') await parquetRead({ file, rowStart: 2, @@ -30,7 +29,7 @@ }) it('filter by row overestimate', async () => { - const file = fileToAsyncBuffer('test/files/rowgroups.parquet') + const file = await asyncBufferFromFile('test/files/rowgroups.parquet') await parquetRead({ file, rowEnd: 100, @@ -41,7 +40,7 @@ }) it('read a single column', async () => { - const file = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet') + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') await parquetRead({ file, columns: ['c'], @@ -66,7 +65,7 @@ }) it('read a list-like column', async () => { - const file = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet') + const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet') await parquetRead({ file, columns: ['e'], @@ -91,7 +90,7 @@ }) it('read a map-like column', async () => { - const file = fileToAsyncBuffer('test/files/nullable.impala.parquet') + const file = await asyncBufferFromFile('test/files/nullable.impala.parquet') await parquetRead({ file, columns: ['int_map'], diff --git a/test/readFiles.test.js b/test/readFiles.test.js index 688a2e5..5dc0e02 100644 --- a/test/readFiles.test.js +++ b/test/readFiles.test.js @@ -2,15 +2,15 @@ import fs from 'fs' import { compressors } from 'hyparquet-compressors' import { describe, expect, it } from 'vitest' import { parquetRead } from '../src/hyparquet.js' -import { toJson } from '../src/utils.js' -import { fileToAsyncBuffer, fileToJson } from './helpers.js' +import { asyncBufferFromFile, toJson } from '../src/utils.js' +import { fileToJson } from './helpers.js' describe('parquetRead test files', () => { const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet')) files.forEach(filename => { it(`parse data from ${filename}`, async () => { - const 
file = fileToAsyncBuffer(`test/files/${filename}`) + const file = await asyncBufferFromFile(`test/files/${filename}`) await parquetRead({ file, compressors, diff --git a/test/schemaTree.test.js b/test/schemaTree.test.js index 0ccadec..4189314 100644 --- a/test/schemaTree.test.js +++ b/test/schemaTree.test.js @@ -1,11 +1,11 @@ import { describe, expect, it } from 'vitest' -import { parquetMetadata, parquetSchema } from '../src/hyparquet.js' -import { readFileToArrayBuffer } from './helpers.js' +import { parquetMetadataAsync, parquetSchema } from '../src/hyparquet.js' +import { asyncBufferFromFile } from '../src/utils.js' describe('parquetSchema', () => { it('parse schema tree from rowgroups.parquet', async () => { - const arrayBuffer = await readFileToArrayBuffer('test/files/rowgroups.parquet') - const metadata = parquetMetadata(arrayBuffer) + const arrayBuffer = await asyncBufferFromFile('test/files/rowgroups.parquet') + const metadata = await parquetMetadataAsync(arrayBuffer) const result = parquetSchema(metadata) expect(result).toEqual(rowgroupsSchema) })