From c38151e32f3c6db2ad4646261731a8521fc68ceb Mon Sep 17 00:00:00 2001
From: Kenny Daniel
Date: Tue, 8 Apr 2025 00:53:45 -0700
Subject: [PATCH] TPCH benchmark

---
 benchmark.js     | 75 ++++++++++++++++++++++++++++++++++++++++++++++++
 eslint.config.js |  4 ++++
 src/plain.js     | 13 +++++++--
 3 files changed, 89 insertions(+), 3 deletions(-)
 create mode 100644 benchmark.js

diff --git a/benchmark.js b/benchmark.js
new file mode 100644
index 0000000..817b5e2
--- /dev/null
+++ b/benchmark.js
@@ -0,0 +1,75 @@
+import { createWriteStream, promises as fs } from 'fs'
+import { pipeline } from 'stream/promises'
+import { asyncBufferFromFile, parquetMetadataAsync, parquetReadObjects, parquetSchema } from 'hyparquet'
+import { parquetWriteFile } from './src/write.js'
+
+const url = 'https://s3.hyperparam.app/tpch-lineitem-v2.parquet'
+const filename = 'data/tpch-lineitem-v2.parquet'
+
+// download test parquet file if needed
+let stat = await fs.stat(filename).catch(() => undefined)
+if (!stat) {
+  console.log('downloading ' + url)
+  const res = await fetch(url)
+  if (!res.ok) throw new Error(res.statusText)
+  // write to file async
+  await pipeline(res.body, createWriteStream(filename))
+  stat = await fs.stat(filename)
+  console.log('downloaded ' + filename, stat.size)
+}
+
+// asyncBuffer
+const file = await asyncBufferFromFile(filename)
+console.log(`parsing ${filename} (${stat.size.toLocaleString()} bytes)`)
+let startTime = performance.now()
+
+// read parquet file
+const metadata = await parquetMetadataAsync(file)
+const rows = await parquetReadObjects({
+  file,
+  metadata,
+  columns: ['l_comment'],
+  rowStart: 0,
+  rowEnd: 100_000,
+})
+let ms = performance.now() - startTime
+console.log(`parsed ${filename} ${rows.length.toLocaleString()} rows in ${ms.toFixed(0)} ms`)
+
+// transpose rows
+const schema = parquetSchema(metadata)
+const columnData = schema.children.map(({ element }) => ({
+  // name: element.name,
+  // type: element.type,
+  ...element,
+  data: [],
+})).filter(({ name }) => name === 'l_comment')
+for (const row of rows) {
+  for (const { name, data } of columnData) {
+    data.push(row[name])
+  }
+}
+
+// write parquet file
+const outputFilename = 'data/output-tpch.parquet'
+console.log(`writing ${outputFilename} (${rows.length.toLocaleString()} rows)`)
+startTime = performance.now()
+parquetWriteFile({
+  filename: outputFilename,
+  columnData,
+})
+ms = performance.now() - startTime
+stat = await fs.stat(outputFilename)
+console.log(`wrote ${outputFilename} (${stat.size.toLocaleString()} bytes) in ${ms.toFixed(0)} ms`)
+
+// check data is the same
+const outputFile = await asyncBufferFromFile(outputFilename)
+const outputRows = await parquetReadObjects({ file: outputFile })
+for (let i = 0; i < rows.length; i++) {
+  const inputRow = JSON.stringify(rows[i])
+  const outputRow = JSON.stringify(outputRows[i])
+  if (inputRow !== outputRow) {
+    console.log(`row ${i} mismatch`)
+    console.log('input ', inputRow)
+    console.log('output', outputRow)
+  }
+}
diff --git a/eslint.config.js b/eslint.config.js
index 8f2cde6..becee16 100644
--- a/eslint.config.js
+++ b/eslint.config.js
@@ -11,6 +11,10 @@ export default [
     globals: {
       'TextDecoder': false,
       'TextEncoder': false,
+      // for benchmark:
+      'console': false,
+      'fetch': false,
+      'performance': false,
     },
   },
 },
diff --git a/src/plain.js b/src/plain.js
index 97b91e3..e5bd8ae 100644
--- a/src/plain.js
+++ b/src/plain.js
@@ -104,9 +104,16 @@ function writePlainDouble(writer, values) {
  */
 function writePlainByteArray(writer, values) {
   for (const value of values) {
-    if (!(value instanceof Uint8Array)) throw new Error('parquet expected Uint8Array value')
-    writer.appendUint32(value.length)
-    writer.appendBytes(value)
+    let bytes = value
+    if (typeof bytes === 'string') {
+      // convert string to Uint8Array
+      bytes = new TextEncoder().encode(value)
+    }
+    if (!(bytes instanceof Uint8Array)) {
+      throw new Error('parquet byte array expected Uint8Array value')
+    }
+    writer.appendUint32(bytes.length)
+    writer.appendBytes(bytes)
   }
 }
 